/* Copyright 2016 GitHub Inc. See https://github.com/github/gh-osc/blob/master/LICENSE */ package logic import ( "fmt" "os" "os/signal" "sync/atomic" "syscall" "time" "github.com/github/gh-osc/go/base" "github.com/github/gh-osc/go/binlog" "github.com/github/gh-osc/go/sql" "github.com/outbrain/golib/log" ) type ChangelogState string const ( TablesInPlace ChangelogState = "TablesInPlace" AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" ) type tableWriteFunc func() error const ( applyEventsQueueBuffer = 100 heartbeatIntervalMilliseconds = 1000 ) // Migrator is the main schema migration flow manager. type Migrator struct { inspector *Inspector applier *Applier eventsStreamer *EventsStreamer migrationContext *base.MigrationContext tablesInPlace chan bool rowCopyComplete chan bool allEventsUpToLockProcessed chan bool panicAbort chan error // copyRowsQueue should not be buffered; if buffered some non-damaging but // excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete copyRowsQueue chan tableWriteFunc applyEventsQueue chan tableWriteFunc handledChangelogStates map[string]bool } func NewMigrator() *Migrator { migrator := &Migrator{ migrationContext: base.GetMigrationContext(), tablesInPlace: make(chan bool), rowCopyComplete: make(chan bool), allEventsUpToLockProcessed: make(chan bool), panicAbort: make(chan error), copyRowsQueue: make(chan tableWriteFunc), applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer), handledChangelogStates: make(map[string]bool), } return migrator } // acceptSignals registers for OS signals func (this *Migrator) acceptSignals() { c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGHUP) go func() { for sig := range c { switch sig { case syscall.SIGHUP: log.Debugf("Received SIGHUP. Reloading configuration") } } }() } func (this *Migrator) shouldThrottle() (result bool, reason string) { lag := atomic.LoadInt64(&this.migrationContext.CurrentLag) if time.Duration(lag) > time.Duration(this.migrationContext.MaxLagMillisecondsThrottleThreshold)*time.Millisecond { return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds()) } if this.migrationContext.ThrottleFlagFile != "" { if _, err := os.Stat(this.migrationContext.ThrottleFlagFile); err == nil { // Throttle file defined and exists! return true, "flag-file" } } if this.migrationContext.ThrottleAdditionalFlagFile != "" { if _, err := os.Stat(this.migrationContext.ThrottleAdditionalFlagFile); err == nil { // 2nd Throttle file defined and exists! return true, "flag-file" } } for variableName, threshold := range this.migrationContext.MaxLoad { value, err := this.applier.ShowStatusVariable(variableName) if err != nil { return true, fmt.Sprintf("%s %s", variableName, err) } if value > threshold { return true, fmt.Sprintf("%s=%d", variableName, value) } } return false, "" } func (this *Migrator) initiateThrottler() error { throttlerTick := time.Tick(1 * time.Second) throttlerFunction := func() { alreadyThrottling, currentReason := this.migrationContext.IsThrottled() shouldThrottle, throttleReason := this.shouldThrottle() if shouldThrottle && !alreadyThrottling { // New throttling this.applier.WriteAndLogChangelog("throttle", throttleReason) } else if shouldThrottle && alreadyThrottling && (currentReason != throttleReason) { // Change of reason this.applier.WriteAndLogChangelog("throttle", throttleReason) } else if alreadyThrottling && !shouldThrottle { // End of throttling this.applier.WriteAndLogChangelog("throttle", "done throttling") } this.migrationContext.SetThrottled(shouldThrottle, throttleReason) } throttlerFunction() for range throttlerTick { throttlerFunction() } return nil } // throttle initiates a throttling event, if need be, updates the Context and // calls callback functions, if any func (this *Migrator) throttle(onThrottled func()) { for { if shouldThrottle, _ := this.migrationContext.IsThrottled(); !shouldThrottle { return } if onThrottled != nil { onThrottled() } time.Sleep(time.Second) } } // retryOperation attempts up to `count` attempts at running given function, // exiting as soon as it returns with non-error. func (this *Migrator) retryOperation(operation func() error) (err error) { maxRetries := this.migrationContext.MaxRetries() for i := 0; i < maxRetries; i++ { if i != 0 { // sleep after previous iteration time.Sleep(1 * time.Second) } err = operation() if err == nil { return nil } // there's an error. Let's try again. } this.panicAbort <- err return err } func (this *Migrator) canStopStreaming() bool { return false } func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) { // Hey, I created the changlog table, I know the type of columns it has! if hint := dmlEvent.NewColumnValues.StringColumn(2); hint != "state" { return nil } changelogState := ChangelogState(dmlEvent.NewColumnValues.StringColumn(3)) switch changelogState { case TablesInPlace: { this.tablesInPlace <- true } case AllEventsUpToLockProcessed: { this.allEventsUpToLockProcessed <- true } default: { return fmt.Errorf("Unknown changelog state: %+v", changelogState) } } log.Debugf("Received state %+v", changelogState) return nil } func (this *Migrator) onChangelogState(stateValue string) (err error) { log.Fatalf("I shouldn't be here") if this.handledChangelogStates[stateValue] { return nil } this.handledChangelogStates[stateValue] = true changelogState := ChangelogState(stateValue) switch changelogState { case TablesInPlace: { this.tablesInPlace <- true } case AllEventsUpToLockProcessed: { this.allEventsUpToLockProcessed <- true } default: { return fmt.Errorf("Unknown changelog state: %+v", changelogState) } } log.Debugf("Received state %+v", changelogState) return nil } func (this *Migrator) onChangelogHeartbeat(heartbeatValue string) (err error) { heartbeatTime, err := time.Parse(time.RFC3339Nano, heartbeatValue) if err != nil { return log.Errore(err) } lag := time.Now().Sub(heartbeatTime) atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag)) return nil } // func (this *Migrator) listenOnPanicAbort() { err := <-this.panicAbort log.Fatale(err) } func (this *Migrator) Migrate() (err error) { this.migrationContext.StartTime = time.Now() go this.listenOnPanicAbort() if err := this.initiateInspector(); err != nil { return err } if err := this.initiateStreaming(); err != nil { return err } if err := this.initiateApplier(); err != nil { return err } log.Debugf("Waiting for tables to be in place") <-this.tablesInPlace log.Debugf("Tables are in place") // Yay! We now know the Ghost and Changelog tables are good to examine! // When running on replica, this means the replica has those tables. When running // on master this is always true, of course, and yet it also implies this knowledge // is in the binlogs. if err := this.inspector.InspectOriginalAndGhostTables(); err != nil { return err } go this.initiateHeartbeatListener() if err := this.applier.ReadMigrationRangeValues(); err != nil { return err } go this.initiateThrottler() go this.executeWriteFuncs() go this.iterateChunks() this.migrationContext.RowCopyStartTime = time.Now() go this.initiateStatus() log.Debugf("Operating until row copy is complete") <-this.rowCopyComplete log.Debugf("Row copy complete") this.printStatus() this.stopWritesAndCompleteMigration() return nil } func (this *Migrator) stopWritesAndCompleteMigration() (err error) { if this.migrationContext.Noop { log.Debugf("Noop operation; not really swapping tables") return nil } this.throttle(func() { log.Debugf("throttling before swapping tables") }) if this.migrationContext.TestOnReplica { return this.stopWritesAndCompleteMigrationOnReplica() } // Running on master if this.migrationContext.QuickAndBumpySwapTables { return this.stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() } return this.stopWritesAndCompleteMigrationOnMasterViaLock() } func (this *Migrator) stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() (err error) { if err := this.retryOperation(this.applier.LockTables); err != nil { return err } this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed)) log.Debugf("Waiting for events up to lock") <-this.allEventsUpToLockProcessed log.Debugf("Done waiting for events up to lock") if err := this.retryOperation(this.applier.SwapTables); err != nil { return err } if err := this.retryOperation(this.applier.UnlockTables); err != nil { return err } if this.migrationContext.OkToDropTable { dropTableFunc := func() error { return this.applier.dropTable(this.migrationContext.GetOldTableName()) } if err := this.retryOperation(dropTableFunc); err != nil { return err } } lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime) renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime) log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName)) return nil } func (this *Migrator) stopWritesAndCompleteMigrationOnMasterViaLock() (err error) { if err := this.retryOperation(this.applier.LockTables); err != nil { return err } this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed)) log.Debugf("Waiting for events up to lock") <-this.allEventsUpToLockProcessed log.Debugf("Done waiting for events up to lock") if err := this.retryOperation(this.applier.SwapTables); err != nil { return err } if err := this.retryOperation(this.applier.UnlockTables); err != nil { return err } if this.migrationContext.OkToDropTable { dropTableFunc := func() error { return this.applier.dropTable(this.migrationContext.GetOldTableName()) } if err := this.retryOperation(dropTableFunc); err != nil { return err } } lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime) renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime) log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName)) return nil } func (this *Migrator) stopWritesAndCompleteMigrationOnReplica() (err error) { log.Debugf("testing on replica. Instead of LOCK tables I will STOP SLAVE") if err := this.retryOperation(this.applier.StopSlaveIOThread); err != nil { return err } this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed)) log.Debugf("Waiting for events up to lock") <-this.allEventsUpToLockProcessed log.Debugf("Done waiting for events up to lock") log.Info("Table duplicated with new schema. Am not touching the original table. Replication is stopped. You may now compare the two tables to gain trust into this tool's operation") return nil } func (this *Migrator) initiateInspector() (err error) { this.inspector = NewInspector() if err := this.inspector.InitDBConnections(); err != nil { return err } if err := this.inspector.ValidateOriginalTable(); err != nil { return err } if err := this.inspector.InspectOriginalTable(); err != nil { return err } // So far so good, table is accessible and valid. // Let's get master connection config if this.migrationContext.ApplierConnectionConfig, err = this.inspector.getMasterConnectionConfig(); err != nil { return err } if this.migrationContext.TestOnReplica { if this.migrationContext.InspectorIsAlsoApplier() { return fmt.Errorf("Instructed to --test-on-replica, but the server we connect to doesn't seem to be a replica") } log.Infof("--test-on-replica given. Will not execute on master %+v but rather on replica %+v itself", this.migrationContext.ApplierConnectionConfig.Key, this.migrationContext.InspectorConnectionConfig.Key, ) this.migrationContext.ApplierConnectionConfig = this.migrationContext.InspectorConnectionConfig.Duplicate() } else if this.migrationContext.InspectorIsAlsoApplier() && !this.migrationContext.AllowedRunningOnMaster { return fmt.Errorf("It seems like this migration attempt to run directly on master. Preferably it would be executed on a replica (and this reduces load from the master). To proceed please provide --allow-on-master") } log.Infof("Master found to be %+v", this.migrationContext.ApplierConnectionConfig.Key) return nil } func (this *Migrator) initiateStatus() error { this.printStatus() statusTick := time.Tick(1 * time.Second) for range statusTick { go this.printStatus() } return nil } func (this *Migrator) printStatus() { elapsedTime := this.migrationContext.ElapsedTime() elapsedSeconds := int64(elapsedTime.Seconds()) totalRowsCopied := this.migrationContext.GetTotalRowsCopied() rowsEstimate := this.migrationContext.RowsEstimate progressPct := 100.0 * float64(totalRowsCopied) / float64(rowsEstimate) shouldPrintStatus := false if elapsedSeconds <= 60 { shouldPrintStatus = true } else if progressPct >= 99.0 { shouldPrintStatus = true } else if progressPct >= 95.0 { shouldPrintStatus = (elapsedSeconds%5 == 0) } else if elapsedSeconds <= 120 { shouldPrintStatus = (elapsedSeconds%5 == 0) } else { shouldPrintStatus = (elapsedSeconds%30 == 0) } if !shouldPrintStatus { return } eta := "N/A" if isThrottled, throttleReason := this.migrationContext.IsThrottled(); isThrottled { eta = fmt.Sprintf("throttled, %s", throttleReason) } status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Elapsed: %+v(copy), %+v(total); ETA: %s", totalRowsCopied, rowsEstimate, progressPct, atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied), len(this.applyEventsQueue), cap(this.applyEventsQueue), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), base.PrettifyDurationOutput(elapsedTime), eta, ) this.applier.WriteChangelog( fmt.Sprintf("copy iteration %d at %d", this.migrationContext.GetIteration(), time.Now().Unix()), status, ) fmt.Println(status) } func (this *Migrator) initiateHeartbeatListener() { ticker := time.Tick((heartbeatIntervalMilliseconds * time.Millisecond) / 2) for range ticker { go func() error { changelogState, err := this.inspector.readChangelogState() if err != nil { return log.Errore(err) } for hint, value := range changelogState { switch hint { case "heartbeat": { this.onChangelogHeartbeat(value) } } } return nil }() } } // initiateStreaming begins treaming of binary log events and registers listeners for such events func (this *Migrator) initiateStreaming() error { this.eventsStreamer = NewEventsStreamer() if err := this.eventsStreamer.InitDBConnections(); err != nil { return err } this.eventsStreamer.AddListener( false, this.migrationContext.DatabaseName, this.migrationContext.GetChangelogTableName(), func(dmlEvent *binlog.BinlogDMLEvent) error { return this.onChangelogStateEvent(dmlEvent) }, ) this.eventsStreamer.AddListener( true, this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, func(dmlEvent *binlog.BinlogDMLEvent) error { applyEventFunc := func() error { return this.applier.ApplyDMLEventQuery(dmlEvent) } this.applyEventsQueue <- applyEventFunc return nil }, ) go func() { log.Debugf("Beginning streaming") this.eventsStreamer.StreamEvents(func() bool { return this.canStopStreaming() }) }() return nil } func (this *Migrator) initiateApplier() error { this.applier = NewApplier() if err := this.applier.InitDBConnections(); err != nil { return err } if err := this.applier.CreateGhostTable(); err != nil { log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") return err } if err := this.applier.AlterGhost(); err != nil { log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out") return err } if err := this.applier.CreateChangelogTable(); err != nil { log.Errorf("Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out") return err } this.applier.WriteChangelogState(string(TablesInPlace)) go this.applier.InitiateHeartbeat(heartbeatIntervalMilliseconds) return nil } func (this *Migrator) iterateChunks() error { terminateRowIteration := func(err error) error { this.rowCopyComplete <- true return log.Errore(err) } if this.migrationContext.Noop { log.Debugf("Noop operation; not really copying data") return terminateRowIteration(nil) } if this.migrationContext.MigrationRangeMinValues == nil { log.Debugf("No rows found in table. Rowcopy will be implicitly empty") return terminateRowIteration(nil) } for { copyRowsFunc := func() error { hasFurtherRange, err := this.applier.CalculateNextIterationRangeEndValues() if err != nil { return terminateRowIteration(err) } if !hasFurtherRange { return terminateRowIteration(nil) } _, rowsAffected, _, err := this.applier.ApplyIterationInsertQuery() if err != nil { return terminateRowIteration(err) } atomic.AddInt64(&this.migrationContext.TotalRowsCopied, rowsAffected) atomic.AddInt64(&this.migrationContext.Iteration, 1) return nil } this.copyRowsQueue <- copyRowsFunc } return nil } func (this *Migrator) executeWriteFuncs() error { if this.migrationContext.Noop { log.Debugf("Noop operation; not really executing write funcs") return nil } for { this.throttle(nil) // We give higher priority to event processing, then secondary priority to // rowcopy select { case applyEventFunc := <-this.applyEventsQueue: { if err := this.retryOperation(applyEventFunc); err != nil { return log.Errore(err) } } default: { select { case copyRowsFunc := <-this.copyRowsQueue: { if err := this.retryOperation(copyRowsFunc); err != nil { return log.Errore(err) } } default: { // Hmmmmm... nothing in the queue; no events, but also no row copy. // This is possible upon load. Let's just sleep it over. log.Debugf("Getting nothing in the write queue. Sleeping...") time.Sleep(time.Second) } } } } } return nil }