Skip to content
This repository was archived by the owner on Feb 18, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions go/app/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,19 @@ func Cli(command string, strict bool, instance string, destination string, owner
}
fmt.Println(instanceKey.DisplayString())
}
case registerCliCommand("which-gtid-errant", "Replication, general", `Get errant GTID set (empty results if no errant GTID)`):
{
instanceKey, _ = inst.FigureInstanceKey(instanceKey, thisInstanceKey)

instance, err := inst.ReadTopologyInstance(instanceKey)
if err != nil {
log.Fatale(err)
}
if instance == nil {
log.Fatalf("Instance not found: %+v", *instanceKey)
}
fmt.Println(instance.GtidErrant)
}
case registerCliCommand("gtid-errant-reset-master", "Replication, general", `Reset master on instance, remove GTID errant transactions`):
{
instanceKey, _ = inst.FigureInstanceKey(instanceKey, thisInstanceKey)
Expand Down Expand Up @@ -828,6 +841,20 @@ func Cli(command string, strict bool, instance string, destination string, owner
}
fmt.Println(fmt.Sprintf("%+v:%s", *coordinates, text))
}
case registerCliCommand("locate-gtid-errant", "Binary logs", `List binary logs containing errant GTIDs`):
{
instanceKey, _ = inst.FigureInstanceKey(instanceKey, thisInstanceKey)
if instanceKey == nil {
log.Fatalf("Unresolved instance")
}
errantBinlogs, err := inst.LocateErrantGTID(instanceKey)
if err != nil {
log.Fatale(err)
}
for _, binlog := range errantBinlogs {
fmt.Println(binlog)
}
}
case registerCliCommand("last-executed-relay-entry", "Binary logs", `Find coordinates of last executed relay log entry`):
{
instanceKey, _ = inst.FigureInstanceKey(instanceKey, thisInstanceKey)
Expand Down
17 changes: 17 additions & 0 deletions go/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,22 @@ func (this *HttpAPI) DisableGTID(params martini.Params, r render.Render, req *ht
Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Disabled GTID on %+v", instance.Key), Details: instance})
}

// LocateErrantGTID identifies the binlog positions for errant GTIDs on an instance
func (this *HttpAPI) LocateErrantGTID(params martini.Params, r render.Render, req *http.Request, user auth.User) {
instanceKey, err := this.getInstanceKey(params["host"], params["port"])

if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
}
errantBinlogs, err := inst.LocateErrantGTID(&instanceKey)
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
}
Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("located errant GTID"), Details: errantBinlogs})
}

// ErrantGTIDResetMaster removes errant transactions on a server by way of RESET MASTER
func (this *HttpAPI) ErrantGTIDResetMaster(params martini.Params, r render.Render, req *http.Request, user auth.User) {
if !isAuthorizedForAction(req, user) {
Expand Down Expand Up @@ -3527,6 +3543,7 @@ func (this *HttpAPI) RegisterRequests(m *martini.ClassicMartini) {
// Replication, general:
this.registerAPIRequest(m, "enable-gtid/:host/:port", this.EnableGTID)
this.registerAPIRequest(m, "disable-gtid/:host/:port", this.DisableGTID)
this.registerAPIRequest(m, "locate-gtid-errant/:host/:port", this.LocateErrantGTID)
this.registerAPIRequest(m, "gtid-errant-reset-master/:host/:port", this.ErrantGTIDResetMaster)
this.registerAPIRequest(m, "gtid-errant-inject-empty/:host/:port", this.ErrantGTIDInjectEmpty)
this.registerAPIRequest(m, "skip-query/:host/:port", this.SkipQuery)
Expand Down
24 changes: 24 additions & 0 deletions go/inst/instance_binlog_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,3 +838,27 @@ func GetNextBinlogCoordinatesToMatch(
}
// Won't get here
}

func GetPreviousGTIDs(instanceKey *InstanceKey, binlog string) (previousGTIDs *OracleGtidSet, err error) {
if binlog == "" {
return nil, log.Errorf("GetPreviousGTIDs: empty binlog file name for %+v", *instanceKey)
}
db, err := db.OpenTopology(instanceKey.Hostname, instanceKey.Port)
if err != nil {
return nil, err
}

query := fmt.Sprintf("show binlog events in '%s' LIMIT 5", binlog)

err = sqlutils.QueryRowsMapBuffered(db, query, func(m sqlutils.RowMap) error {
eventType := m.GetString("Event_type")
if eventType == "Previous_gtids" {
var e error
if previousGTIDs, e = NewOracleGtidSet(m.GetString("Info")); e != nil {
return e
}
}
return nil
})
return previousGTIDs, err
}
51 changes: 51 additions & 0 deletions go/inst/instance_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -1192,6 +1192,57 @@ func DisableGTID(instanceKey *InstanceKey) (*Instance, error) {
return instance, err
}

func LocateErrantGTID(instanceKey *InstanceKey) (errantBinlogs []string, err error) {
instance, err := ReadTopologyInstance(instanceKey)
if err != nil {
return errantBinlogs, err
}
errantSearch := instance.GtidErrant
if errantSearch == "" {
return errantBinlogs, log.Errorf("locate-errant-gtid: no errant-gtid on %+v", *instanceKey)
}
subtract, err := GTIDSubtract(instanceKey, errantSearch, instance.GtidPurged)
if err != nil {
return errantBinlogs, err
}
if subtract != errantSearch {
return errantBinlogs, fmt.Errorf("locate-errant-gtid: %+v is already purged on %+v", subtract, *instanceKey)
}
binlogs, err := ShowBinaryLogs(instanceKey)
if err != nil {
return errantBinlogs, err
}
previousGTIDs := make(map[string]*OracleGtidSet)
for _, binlog := range binlogs {
oracleGTIDSet, err := GetPreviousGTIDs(instanceKey, binlog)
if err != nil {
return errantBinlogs, err
}
previousGTIDs[binlog] = oracleGTIDSet
}
for i, binlog := range binlogs {
if errantSearch == "" {
break
}
previousGTID := previousGTIDs[binlog]
subtract, err := GTIDSubtract(instanceKey, errantSearch, previousGTID.String())
if err != nil {
return errantBinlogs, err
}
if subtract != errantSearch {
// binlogs[i-1] is safe to use when i==0. because that implies GTIDs have been purged,
// which covered by an earlier assertion
errantBinlogs = append(errantBinlogs, binlogs[i-1])
errantSearch = subtract
}
}
if errantSearch != "" {
// then it's in the last binary log
errantBinlogs = append(errantBinlogs, binlogs[len(binlogs)-1])
}
return errantBinlogs, err
}

// ErrantGTIDResetMaster will issue a safe RESET MASTER on a replica that replicates via GTID:
// It will make sure the gtid_purged set matches the executed set value as read just before the RESET.
// this will enable new replicas to be attached to given instance without complaints about missing/purged entries.
Expand Down
12 changes: 12 additions & 0 deletions go/inst/instance_topology_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -1182,3 +1182,15 @@ func ShowMasterStatus(instanceKey *InstanceKey) (masterStatusFound bool, execute
})
return masterStatusFound, executedGtidSet, err
}

func ShowBinaryLogs(instanceKey *InstanceKey) (binlogs []string, err error) {
db, err := db.OpenTopology(instanceKey.Hostname, instanceKey.Port)
if err != nil {
return binlogs, err
}
err = sqlutils.QueryRowsMap(db, "show binary logs", func(m sqlutils.RowMap) error {
binlogs = append(binlogs, m.GetString("Log_name"))
return nil
})
return binlogs, err
}
14 changes: 14 additions & 0 deletions resources/bin/orchestrator-client
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,18 @@ function purge_binary_logs {
print_details | filter_key | print_key
}

function which_gtid_errant {
assert_nonempty "instance" "$instance_hostport"
api "instance/$instance_hostport"
print_response | jq -r '.GtidErrant'
}

function locate_gtid_errant {
assert_nonempty "instance" "$instance_hostport"
api "locate-gtid-errant/$instance_hostport"
print_response | print_details | jq -r '.[]'
}

function last_pseudo_gtid {
assert_nonempty "instance" "$instance_hostport"
api "last-pseudo-gtid/$instance_hostport"
Expand Down Expand Up @@ -893,6 +905,8 @@ function run_command {
"detach-replica-master-host") general_instance_command ;; # Stops replication and modifies Master_Host into an impossible yet reversible value.
"reattach-replica-master-host") general_instance_command ;; # Undo a detach-replica-master-host operation
"skip-query") general_instance_command ;; # Skip a single statement on a replica; either when running with GTID or without
"which-gtid-errant") which_gtid_errant ;; # Get errant GTID set (empty results if no errant GTID)
"locate-gtid-errant") locate_gtid_errant ;; # List binary logs containing errant GTID
"gtid-errant-reset-master") general_instance_command ;; # Remove errant GTID transactions by way of RESET MASTER
"gtid-errant-inject-empty") general_instance_command ;; # Apply errant GTID as empty transactions on cluster's master
"enable-semi-sync-master") general_instance_command ;; # Enable semi-sync (master-side)
Expand Down