Merge pull request #6 from github/master

Updates from upstream
This commit is contained in:
Shlomi Noach 2020-07-23 14:01:49 +03:00 committed by GitHub
commit 87595b1780
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 320 additions and 233 deletions

View File

@ -19,6 +19,7 @@ import (
"github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql" "github.com/github/gh-ost/go/sql"
"github.com/outbrain/golib/log"
"gopkg.in/gcfg.v1" "gopkg.in/gcfg.v1"
gcfgscanner "gopkg.in/gcfg.v1/scanner" gcfgscanner "gopkg.in/gcfg.v1/scanner"
@ -217,6 +218,25 @@ type MigrationContext struct {
ForceTmpTableName string ForceTmpTableName string
recentBinlogCoordinates mysql.BinlogCoordinates recentBinlogCoordinates mysql.BinlogCoordinates
Log Logger
}
type Logger interface {
Debug(args ...interface{})
Debugf(format string, args ...interface{})
Info(args ...interface{})
Infof(format string, args ...interface{})
Warning(args ...interface{}) error
Warningf(format string, args ...interface{}) error
Error(args ...interface{}) error
Errorf(format string, args ...interface{}) error
Errore(err error) error
Fatal(args ...interface{}) error
Fatalf(format string, args ...interface{}) error
Fatale(err error) error
SetLevel(level log.LogLevel)
SetPrintStackTrace(printStackTraceFlag bool)
} }
type ContextConfig struct { type ContextConfig struct {
@ -251,6 +271,7 @@ func NewMigrationContext() *MigrationContext {
pointOfInterestTimeMutex: &sync.Mutex{}, pointOfInterestTimeMutex: &sync.Mutex{},
ColumnRenameMap: make(map[string]string), ColumnRenameMap: make(map[string]string),
PanicAbort: make(chan error), PanicAbort: make(chan error),
Log: NewDefaultLogger(),
} }
} }

73
go/base/default_logger.go Normal file
View File

@ -0,0 +1,73 @@
package base
import (
"github.com/outbrain/golib/log"
)
type simpleLogger struct{}
func NewDefaultLogger() *simpleLogger {
return &simpleLogger{}
}
func (*simpleLogger) Debug(args ...interface{}) {
log.Debug(args[0].(string), args[1:])
return
}
func (*simpleLogger) Debugf(format string, args ...interface{}) {
log.Debugf(format, args...)
return
}
func (*simpleLogger) Info(args ...interface{}) {
log.Info(args[0].(string), args[1:])
return
}
func (*simpleLogger) Infof(format string, args ...interface{}) {
log.Infof(format, args...)
return
}
func (*simpleLogger) Warning(args ...interface{}) error {
return log.Warning(args[0].(string), args[1:])
}
func (*simpleLogger) Warningf(format string, args ...interface{}) error {
return log.Warningf(format, args...)
}
func (*simpleLogger) Error(args ...interface{}) error {
return log.Error(args[0].(string), args[1:])
}
func (*simpleLogger) Errorf(format string, args ...interface{}) error {
return log.Errorf(format, args...)
}
func (*simpleLogger) Errore(err error) error {
return log.Errore(err)
}
func (*simpleLogger) Fatal(args ...interface{}) error {
return log.Fatal(args[0].(string), args[1:])
}
func (*simpleLogger) Fatalf(format string, args ...interface{}) error {
return log.Fatalf(format, args...)
}
func (*simpleLogger) Fatale(err error) error {
return log.Fatale(err)
}
func (*simpleLogger) SetLevel(level log.LogLevel) {
log.SetLevel(level)
return
}
func (*simpleLogger) SetPrintStackTrace(printStackTraceFlag bool) {
log.SetPrintStackTrace(printStackTraceFlag)
return
}

View File

@ -14,7 +14,6 @@ import (
gosql "database/sql" gosql "database/sql"
"github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/mysql"
"github.com/outbrain/golib/log"
) )
var ( var (
@ -86,7 +85,7 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig,
} }
if connectionConfig.Key.Port == port || (extraPort > 0 && connectionConfig.Key.Port == extraPort) { if connectionConfig.Key.Port == port || (extraPort > 0 && connectionConfig.Key.Port == extraPort) {
log.Infof("connection validated on %+v", connectionConfig.Key) migrationContext.Log.Infof("connection validated on %+v", connectionConfig.Key)
return version, nil return version, nil
} else if extraPort == 0 { } else if extraPort == 0 {
return "", fmt.Errorf("Unexpected database port reported: %+v", port) return "", fmt.Errorf("Unexpected database port reported: %+v", port)

View File

@ -13,13 +13,13 @@ import (
"github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql" "github.com/github/gh-ost/go/sql"
"github.com/outbrain/golib/log"
gomysql "github.com/siddontang/go-mysql/mysql" gomysql "github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication" "github.com/siddontang/go-mysql/replication"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
type GoMySQLReader struct { type GoMySQLReader struct {
migrationContext *base.MigrationContext
connectionConfig *mysql.ConnectionConfig connectionConfig *mysql.ConnectionConfig
binlogSyncer *replication.BinlogSyncer binlogSyncer *replication.BinlogSyncer
binlogStreamer *replication.BinlogStreamer binlogStreamer *replication.BinlogStreamer
@ -30,6 +30,7 @@ type GoMySQLReader struct {
func NewGoMySQLReader(migrationContext *base.MigrationContext) (binlogReader *GoMySQLReader, err error) { func NewGoMySQLReader(migrationContext *base.MigrationContext) (binlogReader *GoMySQLReader, err error) {
binlogReader = &GoMySQLReader{ binlogReader = &GoMySQLReader{
migrationContext: migrationContext,
connectionConfig: migrationContext.InspectorConnectionConfig, connectionConfig: migrationContext.InspectorConnectionConfig,
currentCoordinates: mysql.BinlogCoordinates{}, currentCoordinates: mysql.BinlogCoordinates{},
currentCoordinatesMutex: &sync.Mutex{}, currentCoordinatesMutex: &sync.Mutex{},
@ -57,11 +58,11 @@ func NewGoMySQLReader(migrationContext *base.MigrationContext) (binlogReader *Go
// ConnectBinlogStreamer // ConnectBinlogStreamer
func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordinates) (err error) { func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordinates) (err error) {
if coordinates.IsEmpty() { if coordinates.IsEmpty() {
return log.Errorf("Empty coordinates at ConnectBinlogStreamer()") return this.migrationContext.Log.Errorf("Empty coordinates at ConnectBinlogStreamer()")
} }
this.currentCoordinates = coordinates this.currentCoordinates = coordinates
log.Infof("Connecting binlog streamer at %+v", this.currentCoordinates) this.migrationContext.Log.Infof("Connecting binlog streamer at %+v", this.currentCoordinates)
// Start sync with specified binlog file and position // Start sync with specified binlog file and position
this.binlogStreamer, err = this.binlogSyncer.StartSync(gomysql.Position{this.currentCoordinates.LogFile, uint32(this.currentCoordinates.LogPos)}) this.binlogStreamer, err = this.binlogSyncer.StartSync(gomysql.Position{this.currentCoordinates.LogFile, uint32(this.currentCoordinates.LogPos)})
@ -78,7 +79,7 @@ func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinate
// StreamEvents // StreamEvents
func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error { func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error {
if this.currentCoordinates.SmallerThanOrEquals(&this.LastAppliedRowsEventHint) { if this.currentCoordinates.SmallerThanOrEquals(&this.LastAppliedRowsEventHint) {
log.Debugf("Skipping handled query at %+v", this.currentCoordinates) this.migrationContext.Log.Debugf("Skipping handled query at %+v", this.currentCoordinates)
return nil return nil
} }
@ -147,14 +148,14 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
defer this.currentCoordinatesMutex.Unlock() defer this.currentCoordinatesMutex.Unlock()
this.currentCoordinates.LogFile = string(rotateEvent.NextLogName) this.currentCoordinates.LogFile = string(rotateEvent.NextLogName)
}() }()
log.Infof("rotate to next log from %s:%d to %s", this.currentCoordinates.LogFile, int64(ev.Header.LogPos), rotateEvent.NextLogName) this.migrationContext.Log.Infof("rotate to next log from %s:%d to %s", this.currentCoordinates.LogFile, int64(ev.Header.LogPos), rotateEvent.NextLogName)
} else if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok { } else if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok {
if err := this.handleRowsEvent(ev, rowsEvent, entriesChannel); err != nil { if err := this.handleRowsEvent(ev, rowsEvent, entriesChannel); err != nil {
return err return err
} }
} }
} }
log.Debugf("done streaming events") this.migrationContext.Log.Debugf("done streaming events")
return nil return nil
} }

View File

@ -31,7 +31,7 @@ func acceptSignals(migrationContext *base.MigrationContext) {
for sig := range c { for sig := range c {
switch sig { switch sig {
case syscall.SIGHUP: case syscall.SIGHUP:
log.Infof("Received SIGHUP. Reloading configuration") migrationContext.Log.Infof("Received SIGHUP. Reloading configuration")
if err := migrationContext.ReadConfigFile(); err != nil { if err := migrationContext.ReadConfigFile(); err != nil {
log.Errore(err) log.Errore(err)
} else { } else {
@ -157,69 +157,69 @@ func main() {
return return
} }
log.SetLevel(log.ERROR) migrationContext.Log.SetLevel(log.ERROR)
if *verbose { if *verbose {
log.SetLevel(log.INFO) migrationContext.Log.SetLevel(log.INFO)
} }
if *debug { if *debug {
log.SetLevel(log.DEBUG) migrationContext.Log.SetLevel(log.DEBUG)
} }
if *stack { if *stack {
log.SetPrintStackTrace(*stack) migrationContext.Log.SetPrintStackTrace(*stack)
} }
if *quiet { if *quiet {
// Override!! // Override!!
log.SetLevel(log.ERROR) migrationContext.Log.SetLevel(log.ERROR)
} }
if migrationContext.DatabaseName == "" { if migrationContext.DatabaseName == "" {
log.Fatalf("--database must be provided and database name must not be empty") migrationContext.Log.Fatalf("--database must be provided and database name must not be empty")
} }
if migrationContext.OriginalTableName == "" { if migrationContext.OriginalTableName == "" {
log.Fatalf("--table must be provided and table name must not be empty") migrationContext.Log.Fatalf("--table must be provided and table name must not be empty")
} }
if migrationContext.AlterStatement == "" { if migrationContext.AlterStatement == "" {
log.Fatalf("--alter must be provided and statement must not be empty") migrationContext.Log.Fatalf("--alter must be provided and statement must not be empty")
} }
migrationContext.Noop = !(*executeFlag) migrationContext.Noop = !(*executeFlag)
if migrationContext.AllowedRunningOnMaster && migrationContext.TestOnReplica { if migrationContext.AllowedRunningOnMaster && migrationContext.TestOnReplica {
log.Fatalf("--allow-on-master and --test-on-replica are mutually exclusive") migrationContext.Log.Fatalf("--allow-on-master and --test-on-replica are mutually exclusive")
} }
if migrationContext.AllowedRunningOnMaster && migrationContext.MigrateOnReplica { if migrationContext.AllowedRunningOnMaster && migrationContext.MigrateOnReplica {
log.Fatalf("--allow-on-master and --migrate-on-replica are mutually exclusive") migrationContext.Log.Fatalf("--allow-on-master and --migrate-on-replica are mutually exclusive")
} }
if migrationContext.MigrateOnReplica && migrationContext.TestOnReplica { if migrationContext.MigrateOnReplica && migrationContext.TestOnReplica {
log.Fatalf("--migrate-on-replica and --test-on-replica are mutually exclusive") migrationContext.Log.Fatalf("--migrate-on-replica and --test-on-replica are mutually exclusive")
} }
if migrationContext.SwitchToRowBinlogFormat && migrationContext.AssumeRBR { if migrationContext.SwitchToRowBinlogFormat && migrationContext.AssumeRBR {
log.Fatalf("--switch-to-rbr and --assume-rbr are mutually exclusive") migrationContext.Log.Fatalf("--switch-to-rbr and --assume-rbr are mutually exclusive")
} }
if migrationContext.TestOnReplicaSkipReplicaStop { if migrationContext.TestOnReplicaSkipReplicaStop {
if !migrationContext.TestOnReplica { if !migrationContext.TestOnReplica {
log.Fatalf("--test-on-replica-skip-replica-stop requires --test-on-replica to be enabled") migrationContext.Log.Fatalf("--test-on-replica-skip-replica-stop requires --test-on-replica to be enabled")
} }
log.Warning("--test-on-replica-skip-replica-stop enabled. We will not stop replication before cut-over. Ensure you have a plugin that does this.") migrationContext.Log.Warning("--test-on-replica-skip-replica-stop enabled. We will not stop replication before cut-over. Ensure you have a plugin that does this.")
} }
if migrationContext.CliMasterUser != "" && migrationContext.AssumeMasterHostname == "" { if migrationContext.CliMasterUser != "" && migrationContext.AssumeMasterHostname == "" {
log.Fatalf("--master-user requires --assume-master-host") migrationContext.Log.Fatalf("--master-user requires --assume-master-host")
} }
if migrationContext.CliMasterPassword != "" && migrationContext.AssumeMasterHostname == "" { if migrationContext.CliMasterPassword != "" && migrationContext.AssumeMasterHostname == "" {
log.Fatalf("--master-password requires --assume-master-host") migrationContext.Log.Fatalf("--master-password requires --assume-master-host")
} }
if migrationContext.TLSCACertificate != "" && !migrationContext.UseTLS { if migrationContext.TLSCACertificate != "" && !migrationContext.UseTLS {
log.Fatalf("--ssl-ca requires --ssl") migrationContext.Log.Fatalf("--ssl-ca requires --ssl")
} }
if migrationContext.TLSCertificate != "" && !migrationContext.UseTLS { if migrationContext.TLSCertificate != "" && !migrationContext.UseTLS {
log.Fatalf("--ssl-cert requires --ssl") migrationContext.Log.Fatalf("--ssl-cert requires --ssl")
} }
if migrationContext.TLSKey != "" && !migrationContext.UseTLS { if migrationContext.TLSKey != "" && !migrationContext.UseTLS {
log.Fatalf("--ssl-key requires --ssl") migrationContext.Log.Fatalf("--ssl-key requires --ssl")
} }
if migrationContext.TLSAllowInsecure && !migrationContext.UseTLS { if migrationContext.TLSAllowInsecure && !migrationContext.UseTLS {
log.Fatalf("--ssl-allow-insecure requires --ssl") migrationContext.Log.Fatalf("--ssl-allow-insecure requires --ssl")
} }
if *replicationLagQuery != "" { if *replicationLagQuery != "" {
log.Warningf("--replication-lag-query is deprecated") migrationContext.Log.Warningf("--replication-lag-query is deprecated")
} }
switch *cutOver { switch *cutOver {
@ -228,19 +228,19 @@ func main() {
case "two-step": case "two-step":
migrationContext.CutOverType = base.CutOverTwoStep migrationContext.CutOverType = base.CutOverTwoStep
default: default:
log.Fatalf("Unknown cut-over: %s", *cutOver) migrationContext.Log.Fatalf("Unknown cut-over: %s", *cutOver)
} }
if err := migrationContext.ReadConfigFile(); err != nil { if err := migrationContext.ReadConfigFile(); err != nil {
log.Fatale(err) migrationContext.Log.Fatale(err)
} }
if err := migrationContext.ReadThrottleControlReplicaKeys(*throttleControlReplicas); err != nil { if err := migrationContext.ReadThrottleControlReplicaKeys(*throttleControlReplicas); err != nil {
log.Fatale(err) migrationContext.Log.Fatale(err)
} }
if err := migrationContext.ReadMaxLoad(*maxLoad); err != nil { if err := migrationContext.ReadMaxLoad(*maxLoad); err != nil {
log.Fatale(err) migrationContext.Log.Fatale(err)
} }
if err := migrationContext.ReadCriticalLoad(*criticalLoad); err != nil { if err := migrationContext.ReadCriticalLoad(*criticalLoad); err != nil {
log.Fatale(err) migrationContext.Log.Fatale(err)
} }
if migrationContext.ServeSocketFile == "" { if migrationContext.ServeSocketFile == "" {
migrationContext.ServeSocketFile = fmt.Sprintf("/tmp/gh-ost.%s.%s.sock", migrationContext.DatabaseName, migrationContext.OriginalTableName) migrationContext.ServeSocketFile = fmt.Sprintf("/tmp/gh-ost.%s.%s.sock", migrationContext.DatabaseName, migrationContext.OriginalTableName)
@ -249,7 +249,7 @@ func main() {
fmt.Println("Password:") fmt.Println("Password:")
bytePassword, err := terminal.ReadPassword(int(syscall.Stdin)) bytePassword, err := terminal.ReadPassword(int(syscall.Stdin))
if err != nil { if err != nil {
log.Fatale(err) migrationContext.Log.Fatale(err)
} }
migrationContext.CliPassword = string(bytePassword) migrationContext.CliPassword = string(bytePassword)
} }
@ -264,13 +264,13 @@ func main() {
migrationContext.SetDefaultNumRetries(*defaultRetries) migrationContext.SetDefaultNumRetries(*defaultRetries)
migrationContext.ApplyCredentials() migrationContext.ApplyCredentials()
if err := migrationContext.SetupTLS(); err != nil { if err := migrationContext.SetupTLS(); err != nil {
log.Fatale(err) migrationContext.Log.Fatale(err)
} }
if err := migrationContext.SetCutOverLockTimeoutSeconds(*cutOverLockTimeoutSeconds); err != nil { if err := migrationContext.SetCutOverLockTimeoutSeconds(*cutOverLockTimeoutSeconds); err != nil {
log.Errore(err) migrationContext.Log.Errore(err)
} }
if err := migrationContext.SetExponentialBackoffMaxInterval(*exponentialBackoffMaxInterval); err != nil { if err := migrationContext.SetExponentialBackoffMaxInterval(*exponentialBackoffMaxInterval); err != nil {
log.Errore(err) migrationContext.Log.Errore(err)
} }
log.Infof("starting gh-ost %+v", AppVersion) log.Infof("starting gh-ost %+v", AppVersion)
@ -280,7 +280,7 @@ func main() {
err := migrator.Migrate() err := migrator.Migrate()
if err != nil { if err != nil {
migrator.ExecOnFailureHook() migrator.ExecOnFailureHook()
log.Fatale(err) migrationContext.Log.Fatale(err)
} }
fmt.Fprintf(os.Stdout, "# Done\n") fmt.Fprintf(os.Stdout, "# Done\n")
} }

View File

@ -16,7 +16,6 @@ import (
"github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql" "github.com/github/gh-ost/go/sql"
"github.com/outbrain/golib/log"
"github.com/outbrain/golib/sqlutils" "github.com/outbrain/golib/sqlutils"
) )
@ -99,7 +98,7 @@ func (this *Applier) InitDBConnections() (err error) {
if err := this.readTableColumns(); err != nil { if err := this.readTableColumns(); err != nil {
return err return err
} }
log.Infof("Applier initiated on %+v, version %+v", this.connectionConfig.ImpliedKey, this.migrationContext.ApplierMySQLVersion) this.migrationContext.Log.Infof("Applier initiated on %+v, version %+v", this.connectionConfig.ImpliedKey, this.migrationContext.ApplierMySQLVersion)
return nil return nil
} }
@ -110,13 +109,13 @@ func (this *Applier) validateAndReadTimeZone() error {
return err return err
} }
log.Infof("will use time_zone='%s' on applier", this.migrationContext.ApplierTimeZone) this.migrationContext.Log.Infof("will use time_zone='%s' on applier", this.migrationContext.ApplierTimeZone)
return nil return nil
} }
// readTableColumns reads table columns on applier // readTableColumns reads table columns on applier
func (this *Applier) readTableColumns() (err error) { func (this *Applier) readTableColumns() (err error) {
log.Infof("Examining table structure on applier") this.migrationContext.Log.Infof("Examining table structure on applier")
this.migrationContext.OriginalTableColumnsOnApplier, _, err = mysql.GetTableColumns(this.db, this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName) this.migrationContext.OriginalTableColumnsOnApplier, _, err = mysql.GetTableColumns(this.db, this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName)
if err != nil { if err != nil {
return err return err
@ -157,7 +156,7 @@ func (this *Applier) ValidateOrDropExistingTables() error {
} }
} }
if len(this.migrationContext.GetOldTableName()) > mysql.MaxTableNameLength { if len(this.migrationContext.GetOldTableName()) > mysql.MaxTableNameLength {
log.Fatalf("--timestamp-old-table defined, but resulting table name (%s) is too long (only %d characters allowed)", this.migrationContext.GetOldTableName(), mysql.MaxTableNameLength) this.migrationContext.Log.Fatalf("--timestamp-old-table defined, but resulting table name (%s) is too long (only %d characters allowed)", this.migrationContext.GetOldTableName(), mysql.MaxTableNameLength)
} }
if this.tableExists(this.migrationContext.GetOldTableName()) { if this.tableExists(this.migrationContext.GetOldTableName()) {
@ -175,14 +174,14 @@ func (this *Applier) CreateGhostTable() error {
sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName), sql.EscapeName(this.migrationContext.OriginalTableName),
) )
log.Infof("Creating ghost table %s.%s", this.migrationContext.Log.Infof("Creating ghost table %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()), sql.EscapeName(this.migrationContext.GetGhostTableName()),
) )
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err return err
} }
log.Infof("Ghost table created") this.migrationContext.Log.Infof("Ghost table created")
return nil return nil
} }
@ -193,15 +192,15 @@ func (this *Applier) AlterGhost() error {
sql.EscapeName(this.migrationContext.GetGhostTableName()), sql.EscapeName(this.migrationContext.GetGhostTableName()),
this.migrationContext.AlterStatement, this.migrationContext.AlterStatement,
) )
log.Infof("Altering ghost table %s.%s", this.migrationContext.Log.Infof("Altering ghost table %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()), sql.EscapeName(this.migrationContext.GetGhostTableName()),
) )
log.Debugf("ALTER statement: %s", query) this.migrationContext.Log.Debugf("ALTER statement: %s", query)
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err return err
} }
log.Infof("Ghost table altered") this.migrationContext.Log.Infof("Ghost table altered")
return nil return nil
} }
@ -222,14 +221,14 @@ func (this *Applier) CreateChangelogTable() error {
sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetChangelogTableName()), sql.EscapeName(this.migrationContext.GetChangelogTableName()),
) )
log.Infof("Creating changelog table %s.%s", this.migrationContext.Log.Infof("Creating changelog table %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetChangelogTableName()), sql.EscapeName(this.migrationContext.GetChangelogTableName()),
) )
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err return err
} }
log.Infof("Changelog table created") this.migrationContext.Log.Infof("Changelog table created")
return nil return nil
} }
@ -239,14 +238,14 @@ func (this *Applier) dropTable(tableName string) error {
sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(tableName), sql.EscapeName(tableName),
) )
log.Infof("Dropping table %s.%s", this.migrationContext.Log.Infof("Dropping table %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(tableName), sql.EscapeName(tableName),
) )
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err return err
} }
log.Infof("Table dropped") this.migrationContext.Log.Infof("Table dropped")
return nil return nil
} }
@ -313,7 +312,7 @@ func (this *Applier) InitiateHeartbeat() {
if _, err := this.WriteChangelog("heartbeat", time.Now().Format(time.RFC3339Nano)); err != nil { if _, err := this.WriteChangelog("heartbeat", time.Now().Format(time.RFC3339Nano)); err != nil {
numSuccessiveFailures++ numSuccessiveFailures++
if numSuccessiveFailures > this.migrationContext.MaxRetries() { if numSuccessiveFailures > this.migrationContext.MaxRetries() {
return log.Errore(err) return this.migrationContext.Log.Errore(err)
} }
} else { } else {
numSuccessiveFailures = 0 numSuccessiveFailures = 0
@ -348,14 +347,14 @@ func (this *Applier) ExecuteThrottleQuery() (int64, error) {
} }
var result int64 var result int64
if err := this.db.QueryRow(throttleQuery).Scan(&result); err != nil { if err := this.db.QueryRow(throttleQuery).Scan(&result); err != nil {
return 0, log.Errore(err) return 0, this.migrationContext.Log.Errore(err)
} }
return result, nil return result, nil
} }
// ReadMigrationMinValues returns the minimum values to be iterated on rowcopy // ReadMigrationMinValues returns the minimum values to be iterated on rowcopy
func (this *Applier) ReadMigrationMinValues(uniqueKey *sql.UniqueKey) error { func (this *Applier) ReadMigrationMinValues(uniqueKey *sql.UniqueKey) error {
log.Debugf("Reading migration range according to key: %s", uniqueKey.Name) this.migrationContext.Log.Debugf("Reading migration range according to key: %s", uniqueKey.Name)
query, err := sql.BuildUniqueKeyMinValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, &uniqueKey.Columns) query, err := sql.BuildUniqueKeyMinValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, &uniqueKey.Columns)
if err != nil { if err != nil {
return err return err
@ -370,13 +369,13 @@ func (this *Applier) ReadMigrationMinValues(uniqueKey *sql.UniqueKey) error {
return err return err
} }
} }
log.Infof("Migration min values: [%s]", this.migrationContext.MigrationRangeMinValues) this.migrationContext.Log.Infof("Migration min values: [%s]", this.migrationContext.MigrationRangeMinValues)
return err return err
} }
// ReadMigrationMaxValues returns the maximum values to be iterated on rowcopy // ReadMigrationMaxValues returns the maximum values to be iterated on rowcopy
func (this *Applier) ReadMigrationMaxValues(uniqueKey *sql.UniqueKey) error { func (this *Applier) ReadMigrationMaxValues(uniqueKey *sql.UniqueKey) error {
log.Debugf("Reading migration range according to key: %s", uniqueKey.Name) this.migrationContext.Log.Debugf("Reading migration range according to key: %s", uniqueKey.Name)
query, err := sql.BuildUniqueKeyMaxValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, &uniqueKey.Columns) query, err := sql.BuildUniqueKeyMaxValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, &uniqueKey.Columns)
if err != nil { if err != nil {
return err return err
@ -391,7 +390,7 @@ func (this *Applier) ReadMigrationMaxValues(uniqueKey *sql.UniqueKey) error {
return err return err
} }
} }
log.Infof("Migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues) this.migrationContext.Log.Infof("Migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues)
return err return err
} }
@ -449,7 +448,7 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
return hasFurtherRange, nil return hasFurtherRange, nil
} }
} }
log.Debugf("Iteration complete: no further range to iterate") this.migrationContext.Log.Debugf("Iteration complete: no further range to iterate")
return hasFurtherRange, nil return hasFurtherRange, nil
} }
@ -507,7 +506,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
} }
rowsAffected, _ = sqlResult.RowsAffected() rowsAffected, _ = sqlResult.RowsAffected()
duration = time.Since(startTime) duration = time.Since(startTime)
log.Debugf( this.migrationContext.Log.Debugf(
"Issued INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d", "Issued INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d",
this.migrationContext.MigrationIterationRangeMinValues, this.migrationContext.MigrationIterationRangeMinValues,
this.migrationContext.MigrationIterationRangeMaxValues, this.migrationContext.MigrationIterationRangeMaxValues,
@ -522,7 +521,7 @@ func (this *Applier) LockOriginalTable() error {
sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName), sql.EscapeName(this.migrationContext.OriginalTableName),
) )
log.Infof("Locking %s.%s", this.migrationContext.Log.Infof("Locking %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName), sql.EscapeName(this.migrationContext.OriginalTableName),
) )
@ -530,18 +529,18 @@ func (this *Applier) LockOriginalTable() error {
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil { if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
return err return err
} }
log.Infof("Table locked") this.migrationContext.Log.Infof("Table locked")
return nil return nil
} }
// UnlockTables makes tea. No wait, it unlocks tables. // UnlockTables makes tea. No wait, it unlocks tables.
func (this *Applier) UnlockTables() error { func (this *Applier) UnlockTables() error {
query := `unlock /* gh-ost */ tables` query := `unlock /* gh-ost */ tables`
log.Infof("Unlocking tables") this.migrationContext.Log.Infof("Unlocking tables")
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil { if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
return err return err
} }
log.Infof("Tables unlocked") this.migrationContext.Log.Infof("Tables unlocked")
return nil return nil
} }
@ -555,7 +554,7 @@ func (this *Applier) SwapTablesQuickAndBumpy() error {
sql.EscapeName(this.migrationContext.OriginalTableName), sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.GetOldTableName()), sql.EscapeName(this.migrationContext.GetOldTableName()),
) )
log.Infof("Renaming original table") this.migrationContext.Log.Infof("Renaming original table")
this.migrationContext.RenameTablesStartTime = time.Now() this.migrationContext.RenameTablesStartTime = time.Now()
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil { if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
return err return err
@ -565,13 +564,13 @@ func (this *Applier) SwapTablesQuickAndBumpy() error {
sql.EscapeName(this.migrationContext.GetGhostTableName()), sql.EscapeName(this.migrationContext.GetGhostTableName()),
sql.EscapeName(this.migrationContext.OriginalTableName), sql.EscapeName(this.migrationContext.OriginalTableName),
) )
log.Infof("Renaming ghost table") this.migrationContext.Log.Infof("Renaming ghost table")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err return err
} }
this.migrationContext.RenameTablesEndTime = time.Now() this.migrationContext.RenameTablesEndTime = time.Now()
log.Infof("Tables renamed") this.migrationContext.Log.Infof("Tables renamed")
return nil return nil
} }
@ -590,7 +589,7 @@ func (this *Applier) RenameTablesRollback() (renameError error) {
sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName), sql.EscapeName(this.migrationContext.OriginalTableName),
) )
log.Infof("Renaming back both tables") this.migrationContext.Log.Infof("Renaming back both tables")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err == nil { if _, err := sqlutils.ExecNoPrepare(this.db, query); err == nil {
return nil return nil
} }
@ -601,7 +600,7 @@ func (this *Applier) RenameTablesRollback() (renameError error) {
sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()), sql.EscapeName(this.migrationContext.GetGhostTableName()),
) )
log.Infof("Renaming back to ghost table") this.migrationContext.Log.Infof("Renaming back to ghost table")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
renameError = err renameError = err
} }
@ -611,11 +610,11 @@ func (this *Applier) RenameTablesRollback() (renameError error) {
sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName), sql.EscapeName(this.migrationContext.OriginalTableName),
) )
log.Infof("Renaming back to original table") this.migrationContext.Log.Infof("Renaming back to original table")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
renameError = err renameError = err
} }
return log.Errore(renameError) return this.migrationContext.Log.Errore(renameError)
} }
// StopSlaveIOThread is applicable with --test-on-replica; it stops the IO thread, duh. // StopSlaveIOThread is applicable with --test-on-replica; it stops the IO thread, duh.
@ -623,44 +622,44 @@ func (this *Applier) RenameTablesRollback() (renameError error) {
// and have them written to the binary log, so that we can then read them via streamer. // and have them written to the binary log, so that we can then read them via streamer.
func (this *Applier) StopSlaveIOThread() error { func (this *Applier) StopSlaveIOThread() error {
query := `stop /* gh-ost */ slave io_thread` query := `stop /* gh-ost */ slave io_thread`
log.Infof("Stopping replication IO thread") this.migrationContext.Log.Infof("Stopping replication IO thread")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err return err
} }
log.Infof("Replication IO thread stopped") this.migrationContext.Log.Infof("Replication IO thread stopped")
return nil return nil
} }
// StartSlaveIOThread is applicable with --test-on-replica // StartSlaveIOThread is applicable with --test-on-replica
func (this *Applier) StartSlaveIOThread() error { func (this *Applier) StartSlaveIOThread() error {
query := `start /* gh-ost */ slave io_thread` query := `start /* gh-ost */ slave io_thread`
log.Infof("Starting replication IO thread") this.migrationContext.Log.Infof("Starting replication IO thread")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err return err
} }
log.Infof("Replication IO thread started") this.migrationContext.Log.Infof("Replication IO thread started")
return nil return nil
} }
// StartSlaveSQLThread is applicable with --test-on-replica // StartSlaveSQLThread is applicable with --test-on-replica
func (this *Applier) StopSlaveSQLThread() error { func (this *Applier) StopSlaveSQLThread() error {
query := `stop /* gh-ost */ slave sql_thread` query := `stop /* gh-ost */ slave sql_thread`
log.Infof("Verifying SQL thread is stopped") this.migrationContext.Log.Infof("Verifying SQL thread is stopped")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err return err
} }
log.Infof("SQL thread stopped") this.migrationContext.Log.Infof("SQL thread stopped")
return nil return nil
} }
// StartSlaveSQLThread is applicable with --test-on-replica // StartSlaveSQLThread is applicable with --test-on-replica
func (this *Applier) StartSlaveSQLThread() error { func (this *Applier) StartSlaveSQLThread() error {
query := `start /* gh-ost */ slave sql_thread` query := `start /* gh-ost */ slave sql_thread`
log.Infof("Verifying SQL thread is running") this.migrationContext.Log.Infof("Verifying SQL thread is running")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err return err
} }
log.Infof("SQL thread started") this.migrationContext.Log.Infof("SQL thread started")
return nil return nil
} }
@ -677,7 +676,7 @@ func (this *Applier) StopReplication() error {
if err != nil { if err != nil {
return err return err
} }
log.Infof("Replication IO thread at %+v. SQL thread is at %+v", *readBinlogCoordinates, *executeBinlogCoordinates) this.migrationContext.Log.Infof("Replication IO thread at %+v. SQL thread is at %+v", *readBinlogCoordinates, *executeBinlogCoordinates)
return nil return nil
} }
@ -689,7 +688,7 @@ func (this *Applier) StartReplication() error {
if err := this.StartSlaveSQLThread(); err != nil { if err := this.StartSlaveSQLThread(); err != nil {
return err return err
} }
log.Infof("Replication started") this.migrationContext.Log.Infof("Replication started")
return nil return nil
} }
@ -703,7 +702,7 @@ func (this *Applier) ExpectUsedLock(sessionId int64) error {
var result int64 var result int64
query := `select is_used_lock(?)` query := `select is_used_lock(?)`
lockName := this.GetSessionLockName(sessionId) lockName := this.GetSessionLockName(sessionId)
log.Infof("Checking session lock: %s", lockName) this.migrationContext.Log.Infof("Checking session lock: %s", lockName)
if err := this.db.QueryRow(query, lockName).Scan(&result); err != nil || result != sessionId { if err := this.db.QueryRow(query, lockName).Scan(&result); err != nil || result != sessionId {
return fmt.Errorf("Session lock %s expected to be found but wasn't", lockName) return fmt.Errorf("Session lock %s expected to be found but wasn't", lockName)
} }
@ -738,7 +737,7 @@ func (this *Applier) ExpectProcess(sessionId int64, stateHint, infoHint string)
// DropAtomicCutOverSentryTableIfExists checks if the "old" table name // DropAtomicCutOverSentryTableIfExists checks if the "old" table name
// happens to be a cut-over magic table; if so, it drops it. // happens to be a cut-over magic table; if so, it drops it.
func (this *Applier) DropAtomicCutOverSentryTableIfExists() error { func (this *Applier) DropAtomicCutOverSentryTableIfExists() error {
log.Infof("Looking for magic cut-over table") this.migrationContext.Log.Infof("Looking for magic cut-over table")
tableName := this.migrationContext.GetOldTableName() tableName := this.migrationContext.GetOldTableName()
rowMap := this.showTableStatus(tableName) rowMap := this.showTableStatus(tableName)
if rowMap == nil { if rowMap == nil {
@ -748,7 +747,7 @@ func (this *Applier) DropAtomicCutOverSentryTableIfExists() error {
if rowMap["Comment"].String != atomicCutOverMagicHint { if rowMap["Comment"].String != atomicCutOverMagicHint {
return fmt.Errorf("Expected magic comment on %s, did not find it", tableName) return fmt.Errorf("Expected magic comment on %s, did not find it", tableName)
} }
log.Infof("Dropping magic cut-over table") this.migrationContext.Log.Infof("Dropping magic cut-over table")
return this.dropTable(tableName) return this.dropTable(tableName)
} }
@ -768,14 +767,14 @@ func (this *Applier) CreateAtomicCutOverSentryTable() error {
this.migrationContext.TableEngine, this.migrationContext.TableEngine,
atomicCutOverMagicHint, atomicCutOverMagicHint,
) )
log.Infof("Creating magic cut-over table %s.%s", this.migrationContext.Log.Infof("Creating magic cut-over table %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(tableName), sql.EscapeName(tableName),
) )
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err return err
} }
log.Infof("Magic cut-over table created") this.migrationContext.Log.Infof("Magic cut-over table created")
return nil return nil
} }
@ -804,7 +803,7 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
lockResult := 0 lockResult := 0
query := `select get_lock(?, 0)` query := `select get_lock(?, 0)`
lockName := this.GetSessionLockName(sessionId) lockName := this.GetSessionLockName(sessionId)
log.Infof("Grabbing voluntary lock: %s", lockName) this.migrationContext.Log.Infof("Grabbing voluntary lock: %s", lockName)
if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil || lockResult != 1 { if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil || lockResult != 1 {
err := fmt.Errorf("Unable to acquire lock %s", lockName) err := fmt.Errorf("Unable to acquire lock %s", lockName)
tableLocked <- err tableLocked <- err
@ -812,7 +811,7 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
} }
tableLockTimeoutSeconds := this.migrationContext.CutOverLockTimeoutSeconds * 2 tableLockTimeoutSeconds := this.migrationContext.CutOverLockTimeoutSeconds * 2
log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds) this.migrationContext.Log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds)
query = fmt.Sprintf(`set session lock_wait_timeout:=%d`, tableLockTimeoutSeconds) query = fmt.Sprintf(`set session lock_wait_timeout:=%d`, tableLockTimeoutSeconds)
if _, err := tx.Exec(query); err != nil { if _, err := tx.Exec(query); err != nil {
tableLocked <- err tableLocked <- err
@ -830,7 +829,7 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()), sql.EscapeName(this.migrationContext.GetOldTableName()),
) )
log.Infof("Locking %s.%s, %s.%s", this.migrationContext.Log.Infof("Locking %s.%s, %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName), sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.DatabaseName),
@ -841,7 +840,7 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
tableLocked <- err tableLocked <- err
return err return err
} }
log.Infof("Tables locked") this.migrationContext.Log.Infof("Tables locked")
tableLocked <- nil // No error. tableLocked <- nil // No error.
// From this point on, we are committed to UNLOCK TABLES. No matter what happens, // From this point on, we are committed to UNLOCK TABLES. No matter what happens,
@ -850,22 +849,22 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
// The cut-over phase will proceed to apply remaining backlog onto ghost table, // The cut-over phase will proceed to apply remaining backlog onto ghost table,
// and issue RENAME. We wait here until told to proceed. // and issue RENAME. We wait here until told to proceed.
<-okToUnlockTable <-okToUnlockTable
log.Infof("Will now proceed to drop magic table and unlock tables") this.migrationContext.Log.Infof("Will now proceed to drop magic table and unlock tables")
// The magic table is here because we locked it. And we are the only ones allowed to drop it. // The magic table is here because we locked it. And we are the only ones allowed to drop it.
// And in fact, we will: // And in fact, we will:
log.Infof("Dropping magic cut-over table") this.migrationContext.Log.Infof("Dropping magic cut-over table")
query = fmt.Sprintf(`drop /* gh-ost */ table if exists %s.%s`, query = fmt.Sprintf(`drop /* gh-ost */ table if exists %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()), sql.EscapeName(this.migrationContext.GetOldTableName()),
) )
if _, err := tx.Exec(query); err != nil { if _, err := tx.Exec(query); err != nil {
log.Errore(err) this.migrationContext.Log.Errore(err)
// We DO NOT return here because we must `UNLOCK TABLES`! // We DO NOT return here because we must `UNLOCK TABLES`!
} }
// Tables still locked // Tables still locked
log.Infof("Releasing lock from %s.%s, %s.%s", this.migrationContext.Log.Infof("Releasing lock from %s.%s, %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName), sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.DatabaseName),
@ -874,9 +873,9 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
query = `unlock tables` query = `unlock tables`
if _, err := tx.Exec(query); err != nil { if _, err := tx.Exec(query); err != nil {
tableUnlocked <- err tableUnlocked <- err
return log.Errore(err) return this.migrationContext.Log.Errore(err)
} }
log.Infof("Tables unlocked") this.migrationContext.Log.Infof("Tables unlocked")
tableUnlocked <- nil tableUnlocked <- nil
return nil return nil
} }
@ -898,7 +897,7 @@ func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed
} }
sessionIdChan <- sessionId sessionIdChan <- sessionId
log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.CutOverLockTimeoutSeconds) this.migrationContext.Log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.CutOverLockTimeoutSeconds)
query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.CutOverLockTimeoutSeconds) query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
if _, err := tx.Exec(query); err != nil { if _, err := tx.Exec(query); err != nil {
return err return err
@ -914,13 +913,13 @@ func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed
sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName), sql.EscapeName(this.migrationContext.OriginalTableName),
) )
log.Infof("Issuing and expecting this to block: %s", query) this.migrationContext.Log.Infof("Issuing and expecting this to block: %s", query)
if _, err := tx.Exec(query); err != nil { if _, err := tx.Exec(query); err != nil {
tablesRenamed <- err tablesRenamed <- err
return log.Errore(err) return this.migrationContext.Log.Errore(err)
} }
tablesRenamed <- nil tablesRenamed <- nil
log.Infof("Tables renamed") this.migrationContext.Log.Infof("Tables renamed")
return nil return nil
} }
@ -1026,19 +1025,19 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
}() }()
if err != nil { if err != nil {
return log.Errore(err) return this.migrationContext.Log.Errore(err)
} }
// no error // no error
atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, int64(len(dmlEvents))) atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, int64(len(dmlEvents)))
if this.migrationContext.CountTableRows { if this.migrationContext.CountTableRows {
atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, totalDelta) atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, totalDelta)
} }
log.Debugf("ApplyDMLEventQueries() applied %d events in one transaction", len(dmlEvents)) this.migrationContext.Log.Debugf("ApplyDMLEventQueries() applied %d events in one transaction", len(dmlEvents))
return nil return nil
} }
func (this *Applier) Teardown() { func (this *Applier) Teardown() {
log.Debugf("Tearing down...") this.migrationContext.Log.Debugf("Tearing down...")
this.db.Close() this.db.Close()
this.singletonDB.Close() this.singletonDB.Close()
atomic.StoreInt64(&this.finishedMigrating, 1) atomic.StoreInt64(&this.finishedMigrating, 1)

View File

@ -17,7 +17,6 @@ import (
"github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql" "github.com/github/gh-ost/go/sql"
"github.com/outbrain/golib/log"
"github.com/outbrain/golib/sqlutils" "github.com/outbrain/golib/sqlutils"
) )
@ -69,7 +68,7 @@ func (this *Inspector) InitDBConnections() (err error) {
if err := this.applyBinlogFormat(); err != nil { if err := this.applyBinlogFormat(); err != nil {
return err return err
} }
log.Infof("Inspector initiated on %+v, version %+v", this.connectionConfig.ImpliedKey, this.migrationContext.InspectorMySQLVersion) this.migrationContext.Log.Infof("Inspector initiated on %+v, version %+v", this.connectionConfig.ImpliedKey, this.migrationContext.InspectorMySQLVersion)
return nil return nil
} }
@ -137,14 +136,14 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) {
switch column.Type { switch column.Type {
case sql.FloatColumnType: case sql.FloatColumnType:
{ {
log.Warning("Will not use %+v as shared key due to FLOAT data type", sharedUniqueKey.Name) this.migrationContext.Log.Warning("Will not use %+v as shared key due to FLOAT data type", sharedUniqueKey.Name)
uniqueKeyIsValid = false uniqueKeyIsValid = false
} }
case sql.JSONColumnType: case sql.JSONColumnType:
{ {
// Noteworthy that at this time MySQL does not allow JSON indexing anyhow, but this code // Noteworthy that at this time MySQL does not allow JSON indexing anyhow, but this code
// will remain in place to potentially handle the future case where JSON is supported in indexes. // will remain in place to potentially handle the future case where JSON is supported in indexes.
log.Warning("Will not use %+v as shared key due to JSON data type", sharedUniqueKey.Name) this.migrationContext.Log.Warning("Will not use %+v as shared key due to JSON data type", sharedUniqueKey.Name)
uniqueKeyIsValid = false uniqueKeyIsValid = false
} }
} }
@ -157,17 +156,17 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) {
if this.migrationContext.UniqueKey == nil { if this.migrationContext.UniqueKey == nil {
return fmt.Errorf("No shared unique key can be found after ALTER! Bailing out") return fmt.Errorf("No shared unique key can be found after ALTER! Bailing out")
} }
log.Infof("Chosen shared unique key is %s", this.migrationContext.UniqueKey.Name) this.migrationContext.Log.Infof("Chosen shared unique key is %s", this.migrationContext.UniqueKey.Name)
if this.migrationContext.UniqueKey.HasNullable { if this.migrationContext.UniqueKey.HasNullable {
if this.migrationContext.NullableUniqueKeyAllowed { if this.migrationContext.NullableUniqueKeyAllowed {
log.Warningf("Chosen key (%s) has nullable columns. You have supplied with --allow-nullable-unique-key and so this migration proceeds. As long as there aren't NULL values in this key's column, migration should be fine. NULL values will corrupt migration's data", this.migrationContext.UniqueKey) this.migrationContext.Log.Warningf("Chosen key (%s) has nullable columns. You have supplied with --allow-nullable-unique-key and so this migration proceeds. As long as there aren't NULL values in this key's column, migration should be fine. NULL values will corrupt migration's data", this.migrationContext.UniqueKey)
} else { } else {
return fmt.Errorf("Chosen key (%s) has nullable columns. Bailing out. To force this operation to continue, supply --allow-nullable-unique-key flag. Only do so if you are certain there are no actual NULL values in this key. As long as there aren't, migration should be fine. NULL values in columns of this key will corrupt migration's data", this.migrationContext.UniqueKey) return fmt.Errorf("Chosen key (%s) has nullable columns. Bailing out. To force this operation to continue, supply --allow-nullable-unique-key flag. Only do so if you are certain there are no actual NULL values in this key. As long as there aren't, migration should be fine. NULL values in columns of this key will corrupt migration's data", this.migrationContext.UniqueKey)
} }
} }
this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns = this.getSharedColumns(this.migrationContext.OriginalTableColumns, this.migrationContext.GhostTableColumns, this.migrationContext.OriginalTableVirtualColumns, this.migrationContext.GhostTableVirtualColumns, this.migrationContext.ColumnRenameMap) this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns = this.getSharedColumns(this.migrationContext.OriginalTableColumns, this.migrationContext.GhostTableColumns, this.migrationContext.OriginalTableVirtualColumns, this.migrationContext.GhostTableVirtualColumns, this.migrationContext.ColumnRenameMap)
log.Infof("Shared columns are %s", this.migrationContext.SharedColumns) this.migrationContext.Log.Infof("Shared columns are %s", this.migrationContext.SharedColumns)
// By fact that a non-empty unique key exists we also know the shared columns are non-empty // By fact that a non-empty unique key exists we also know the shared columns are non-empty
// This additional step looks at which columns are unsigned. We could have merged this within // This additional step looks at which columns are unsigned. We could have merged this within
@ -250,19 +249,19 @@ func (this *Inspector) validateGrants() error {
this.migrationContext.HasSuperPrivilege = foundSuper this.migrationContext.HasSuperPrivilege = foundSuper
if foundAll { if foundAll {
log.Infof("User has ALL privileges") this.migrationContext.Log.Infof("User has ALL privileges")
return nil return nil
} }
if foundSuper && foundReplicationSlave && foundDBAll { if foundSuper && foundReplicationSlave && foundDBAll {
log.Infof("User has SUPER, REPLICATION SLAVE privileges, and has ALL privileges on %s.*", sql.EscapeName(this.migrationContext.DatabaseName)) this.migrationContext.Log.Infof("User has SUPER, REPLICATION SLAVE privileges, and has ALL privileges on %s.*", sql.EscapeName(this.migrationContext.DatabaseName))
return nil return nil
} }
if foundReplicationClient && foundReplicationSlave && foundDBAll { if foundReplicationClient && foundReplicationSlave && foundDBAll {
log.Infof("User has REPLICATION CLIENT, REPLICATION SLAVE privileges, and has ALL privileges on %s.*", sql.EscapeName(this.migrationContext.DatabaseName)) this.migrationContext.Log.Infof("User has REPLICATION CLIENT, REPLICATION SLAVE privileges, and has ALL privileges on %s.*", sql.EscapeName(this.migrationContext.DatabaseName))
return nil return nil
} }
log.Debugf("Privileges: Super: %t, REPLICATION CLIENT: %t, REPLICATION SLAVE: %t, ALL on *.*: %t, ALL on %s.*: %t", foundSuper, foundReplicationClient, foundReplicationSlave, foundAll, sql.EscapeName(this.migrationContext.DatabaseName), foundDBAll) this.migrationContext.Log.Debugf("Privileges: Super: %t, REPLICATION CLIENT: %t, REPLICATION SLAVE: %t, ALL on *.*: %t, ALL on %s.*: %t", foundSuper, foundReplicationClient, foundReplicationSlave, foundAll, sql.EscapeName(this.migrationContext.DatabaseName), foundDBAll)
return log.Errorf("User has insufficient privileges for migration. Needed: SUPER|REPLICATION CLIENT, REPLICATION SLAVE and ALL on %s.*", sql.EscapeName(this.migrationContext.DatabaseName)) return this.migrationContext.Log.Errorf("User has insufficient privileges for migration. Needed: SUPER|REPLICATION CLIENT, REPLICATION SLAVE and ALL on %s.*", sql.EscapeName(this.migrationContext.DatabaseName))
} }
// restartReplication is required so that we are _certain_ the binlog format and // restartReplication is required so that we are _certain_ the binlog format and
@ -270,7 +269,7 @@ func (this *Inspector) validateGrants() error {
// It is entirely possible, for example, that the replication is using 'STATEMENT' // It is entirely possible, for example, that the replication is using 'STATEMENT'
// binlog format even as the variable says 'ROW' // binlog format even as the variable says 'ROW'
func (this *Inspector) restartReplication() error { func (this *Inspector) restartReplication() error {
log.Infof("Restarting replication on %s:%d to make sure binlog settings apply to replication thread", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port) this.migrationContext.Log.Infof("Restarting replication on %s:%d to make sure binlog settings apply to replication thread", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
masterKey, _ := mysql.GetMasterKeyFromSlaveStatus(this.connectionConfig) masterKey, _ := mysql.GetMasterKeyFromSlaveStatus(this.connectionConfig)
if masterKey == nil { if masterKey == nil {
@ -289,7 +288,7 @@ func (this *Inspector) restartReplication() error {
} }
time.Sleep(startSlavePostWaitMilliseconds) time.Sleep(startSlavePostWaitMilliseconds)
log.Debugf("Replication restarted") this.migrationContext.Log.Debugf("Replication restarted")
return nil return nil
} }
@ -309,7 +308,7 @@ func (this *Inspector) applyBinlogFormat() error {
if err := this.restartReplication(); err != nil { if err := this.restartReplication(); err != nil {
return err return err
} }
log.Debugf("'ROW' binlog format applied") this.migrationContext.Log.Debugf("'ROW' binlog format applied")
return nil return nil
} }
// We already have RBR, no explicit switch // We already have RBR, no explicit switch
@ -347,7 +346,7 @@ func (this *Inspector) validateBinlogs() error {
if countReplicas > 0 { if countReplicas > 0 {
return fmt.Errorf("%s:%d has %s binlog_format, but I'm too scared to change it to ROW because it has replicas. Bailing out", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port, this.migrationContext.OriginalBinlogFormat) return fmt.Errorf("%s:%d has %s binlog_format, but I'm too scared to change it to ROW because it has replicas. Bailing out", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port, this.migrationContext.OriginalBinlogFormat)
} }
log.Infof("%s:%d has %s binlog_format. I will change it to ROW, and will NOT change it back, even in the event of failure.", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port, this.migrationContext.OriginalBinlogFormat) this.migrationContext.Log.Infof("%s:%d has %s binlog_format. I will change it to ROW, and will NOT change it back, even in the event of failure.", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port, this.migrationContext.OriginalBinlogFormat)
} }
query = `select @@global.binlog_row_image` query = `select @@global.binlog_row_image`
if err := this.db.QueryRow(query).Scan(&this.migrationContext.OriginalBinlogRowImage); err != nil { if err := this.db.QueryRow(query).Scan(&this.migrationContext.OriginalBinlogRowImage); err != nil {
@ -359,7 +358,7 @@ func (this *Inspector) validateBinlogs() error {
return fmt.Errorf("%s:%d has '%s' binlog_row_image, and only 'FULL' is supported. This operation cannot proceed. You may `set global binlog_row_image='full'` and try again", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port, this.migrationContext.OriginalBinlogRowImage) return fmt.Errorf("%s:%d has '%s' binlog_row_image, and only 'FULL' is supported. This operation cannot proceed. You may `set global binlog_row_image='full'` and try again", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port, this.migrationContext.OriginalBinlogRowImage)
} }
log.Infof("binary logs validated on %s:%d", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port) this.migrationContext.Log.Infof("binary logs validated on %s:%d", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
return nil return nil
} }
@ -372,12 +371,12 @@ func (this *Inspector) validateLogSlaveUpdates() error {
} }
if logSlaveUpdates { if logSlaveUpdates {
log.Infof("log_slave_updates validated on %s:%d", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port) this.migrationContext.Log.Infof("log_slave_updates validated on %s:%d", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
return nil return nil
} }
if this.migrationContext.IsTungsten { if this.migrationContext.IsTungsten {
log.Warningf("log_slave_updates not found on %s:%d, but --tungsten provided, so I'm proceeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port) this.migrationContext.Log.Warningf("log_slave_updates not found on %s:%d, but --tungsten provided, so I'm proceeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
return nil return nil
} }
@ -386,7 +385,7 @@ func (this *Inspector) validateLogSlaveUpdates() error {
} }
if this.migrationContext.InspectorIsAlsoApplier() { if this.migrationContext.InspectorIsAlsoApplier() {
log.Warningf("log_slave_updates not found on %s:%d, but executing directly on master, so I'm proceeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port) this.migrationContext.Log.Warningf("log_slave_updates not found on %s:%d, but executing directly on master, so I'm proceeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
return nil return nil
} }
@ -413,17 +412,17 @@ func (this *Inspector) validateTable() error {
return err return err
} }
if !tableFound { if !tableFound {
return log.Errorf("Cannot find table %s.%s!", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) return this.migrationContext.Log.Errorf("Cannot find table %s.%s!", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
} }
log.Infof("Table found. Engine=%s", this.migrationContext.TableEngine) this.migrationContext.Log.Infof("Table found. Engine=%s", this.migrationContext.TableEngine)
log.Debugf("Estimated number of rows via STATUS: %d", this.migrationContext.RowsEstimate) this.migrationContext.Log.Debugf("Estimated number of rows via STATUS: %d", this.migrationContext.RowsEstimate)
return nil return nil
} }
// validateTableForeignKeys makes sure no foreign keys exist on the migrated table // validateTableForeignKeys makes sure no foreign keys exist on the migrated table
func (this *Inspector) validateTableForeignKeys(allowChildForeignKeys bool) error { func (this *Inspector) validateTableForeignKeys(allowChildForeignKeys bool) error {
if this.migrationContext.SkipForeignKeyChecks { if this.migrationContext.SkipForeignKeyChecks {
log.Warning("--skip-foreign-key-checks provided: will not check for foreign keys") this.migrationContext.Log.Warning("--skip-foreign-key-checks provided: will not check for foreign keys")
return nil return nil
} }
query := ` query := `
@ -457,16 +456,16 @@ func (this *Inspector) validateTableForeignKeys(allowChildForeignKeys bool) erro
return err return err
} }
if numParentForeignKeys > 0 { if numParentForeignKeys > 0 {
return log.Errorf("Found %d parent-side foreign keys on %s.%s. Parent-side foreign keys are not supported. Bailing out", numParentForeignKeys, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) return this.migrationContext.Log.Errorf("Found %d parent-side foreign keys on %s.%s. Parent-side foreign keys are not supported. Bailing out", numParentForeignKeys, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
} }
if numChildForeignKeys > 0 { if numChildForeignKeys > 0 {
if allowChildForeignKeys { if allowChildForeignKeys {
log.Debugf("Foreign keys found and will be dropped, as per given --discard-foreign-keys flag") this.migrationContext.Log.Debugf("Foreign keys found and will be dropped, as per given --discard-foreign-keys flag")
return nil return nil
} }
return log.Errorf("Found %d child-side foreign keys on %s.%s. Child-side foreign keys are not supported. Bailing out", numChildForeignKeys, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) return this.migrationContext.Log.Errorf("Found %d child-side foreign keys on %s.%s. Child-side foreign keys are not supported. Bailing out", numChildForeignKeys, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
} }
log.Debugf("Validated no foreign keys exist on table") this.migrationContext.Log.Debugf("Validated no foreign keys exist on table")
return nil return nil
} }
@ -492,9 +491,9 @@ func (this *Inspector) validateTableTriggers() error {
return err return err
} }
if numTriggers > 0 { if numTriggers > 0 {
return log.Errorf("Found triggers on %s.%s. Triggers are not supported at this time. Bailing out", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) return this.migrationContext.Log.Errorf("Found triggers on %s.%s. Triggers are not supported at this time. Bailing out", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
} }
log.Debugf("Validated no triggers exist on table") this.migrationContext.Log.Debugf("Validated no triggers exist on table")
return nil return nil
} }
@ -514,9 +513,9 @@ func (this *Inspector) estimateTableRowsViaExplain() error {
return err return err
} }
if !outputFound { if !outputFound {
return log.Errorf("Cannot run EXPLAIN on %s.%s!", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) return this.migrationContext.Log.Errorf("Cannot run EXPLAIN on %s.%s!", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
} }
log.Infof("Estimated number of rows via EXPLAIN: %d", this.migrationContext.RowsEstimate) this.migrationContext.Log.Infof("Estimated number of rows via EXPLAIN: %d", this.migrationContext.RowsEstimate)
return nil return nil
} }
@ -525,7 +524,7 @@ func (this *Inspector) CountTableRows() error {
atomic.StoreInt64(&this.migrationContext.CountingRowsFlag, 1) atomic.StoreInt64(&this.migrationContext.CountingRowsFlag, 1)
defer atomic.StoreInt64(&this.migrationContext.CountingRowsFlag, 0) defer atomic.StoreInt64(&this.migrationContext.CountingRowsFlag, 0)
log.Infof("As instructed, I'm issuing a SELECT COUNT(*) on the table. This may take a while") this.migrationContext.Log.Infof("As instructed, I'm issuing a SELECT COUNT(*) on the table. This may take a while")
query := fmt.Sprintf(`select /* gh-ost */ count(*) as rows from %s.%s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) query := fmt.Sprintf(`select /* gh-ost */ count(*) as rows from %s.%s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
var rowsEstimate int64 var rowsEstimate int64
@ -535,7 +534,7 @@ func (this *Inspector) CountTableRows() error {
atomic.StoreInt64(&this.migrationContext.RowsEstimate, rowsEstimate) atomic.StoreInt64(&this.migrationContext.RowsEstimate, rowsEstimate)
this.migrationContext.UsedRowsEstimateMethod = base.CountRowsEstimate this.migrationContext.UsedRowsEstimateMethod = base.CountRowsEstimate
log.Infof("Exact number of rows via COUNT: %d", rowsEstimate) this.migrationContext.Log.Infof("Exact number of rows via COUNT: %d", rowsEstimate)
return nil return nil
} }
@ -663,7 +662,7 @@ func (this *Inspector) getCandidateUniqueKeys(tableName string) (uniqueKeys [](*
if err != nil { if err != nil {
return uniqueKeys, err return uniqueKeys, err
} }
log.Debugf("Potential unique keys in %+v: %+v", tableName, uniqueKeys) this.migrationContext.Log.Debugf("Potential unique keys in %+v: %+v", tableName, uniqueKeys)
return uniqueKeys, nil return uniqueKeys, nil
} }
@ -753,7 +752,7 @@ func (this *Inspector) readChangelogState(hint string) (string, error) {
} }
func (this *Inspector) getMasterConnectionConfig() (applierConfig *mysql.ConnectionConfig, err error) { func (this *Inspector) getMasterConnectionConfig() (applierConfig *mysql.ConnectionConfig, err error) {
log.Infof("Recursively searching for replication master") this.migrationContext.Log.Infof("Recursively searching for replication master")
visitedKeys := mysql.NewInstanceKeyMap() visitedKeys := mysql.NewInstanceKeyMap()
return mysql.GetMasterConnectionConfigSafe(this.connectionConfig, visitedKeys, this.migrationContext.AllowedMasterMaster) return mysql.GetMasterConnectionConfigSafe(this.connectionConfig, visitedKeys, this.migrationContext.AllowedMasterMaster)
} }

View File

@ -18,8 +18,6 @@ import (
"github.com/github/gh-ost/go/binlog" "github.com/github/gh-ost/go/binlog"
"github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql" "github.com/github/gh-ost/go/sql"
"github.com/outbrain/golib/log"
) )
type ChangelogState string type ChangelogState string
@ -216,7 +214,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
} }
changelogStateString := dmlEvent.NewColumnValues.StringColumn(3) changelogStateString := dmlEvent.NewColumnValues.StringColumn(3)
changelogState := ReadChangelogState(changelogStateString) changelogState := ReadChangelogState(changelogStateString)
log.Infof("Intercepted changelog state %s", changelogState) this.migrationContext.Log.Infof("Intercepted changelog state %s", changelogState)
switch changelogState { switch changelogState {
case GhostTableMigrated: case GhostTableMigrated:
{ {
@ -242,14 +240,14 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
return fmt.Errorf("Unknown changelog state: %+v", changelogState) return fmt.Errorf("Unknown changelog state: %+v", changelogState)
} }
} }
log.Infof("Handled changelog state %s", changelogState) this.migrationContext.Log.Infof("Handled changelog state %s", changelogState)
return nil return nil
} }
// listenOnPanicAbort aborts on abort request // listenOnPanicAbort aborts on abort request
func (this *Migrator) listenOnPanicAbort() { func (this *Migrator) listenOnPanicAbort() {
err := <-this.migrationContext.PanicAbort err := <-this.migrationContext.PanicAbort
log.Fatale(err) this.migrationContext.Log.Fatale(err)
} }
// validateStatement validates the `alter` statement meets criteria. // validateStatement validates the `alter` statement meets criteria.
@ -265,7 +263,7 @@ func (this *Migrator) validateStatement() (err error) {
if !this.migrationContext.ApproveRenamedColumns { if !this.migrationContext.ApproveRenamedColumns {
return fmt.Errorf("gh-ost believes the ALTER statement renames columns, as follows: %v; as precaution, you are asked to confirm gh-ost is correct, and provide with `--approve-renamed-columns`, and we're all happy. Or you can skip renamed columns via `--skip-renamed-columns`, in which case column data may be lost", this.parser.GetNonTrivialRenames()) return fmt.Errorf("gh-ost believes the ALTER statement renames columns, as follows: %v; as precaution, you are asked to confirm gh-ost is correct, and provide with `--approve-renamed-columns`, and we're all happy. Or you can skip renamed columns via `--skip-renamed-columns`, in which case column data may be lost", this.parser.GetNonTrivialRenames())
} }
log.Infof("Alter statement has column(s) renamed. gh-ost finds the following renames: %v; --approve-renamed-columns is given and so migration proceeds.", this.parser.GetNonTrivialRenames()) this.migrationContext.Log.Infof("Alter statement has column(s) renamed. gh-ost finds the following renames: %v; --approve-renamed-columns is given and so migration proceeds.", this.parser.GetNonTrivialRenames())
} }
this.migrationContext.DroppedColumnsMap = this.parser.DroppedColumnsMap() this.migrationContext.DroppedColumnsMap = this.parser.DroppedColumnsMap()
return nil return nil
@ -277,7 +275,7 @@ func (this *Migrator) countTableRows() (err error) {
return nil return nil
} }
if this.migrationContext.Noop { if this.migrationContext.Noop {
log.Debugf("Noop operation; not really counting table rows") this.migrationContext.Log.Debugf("Noop operation; not really counting table rows")
return nil return nil
} }
@ -292,7 +290,7 @@ func (this *Migrator) countTableRows() (err error) {
} }
if this.migrationContext.ConcurrentCountTableRows { if this.migrationContext.ConcurrentCountTableRows {
log.Infof("As instructed, counting rows in the background; meanwhile I will use an estimated count, and will update it later on") this.migrationContext.Log.Infof("As instructed, counting rows in the background; meanwhile I will use an estimated count, and will update it later on")
go countRowsFunc() go countRowsFunc()
// and we ignore errors, because this turns to be a background job // and we ignore errors, because this turns to be a background job
return nil return nil
@ -304,9 +302,9 @@ func (this *Migrator) createFlagFiles() (err error) {
if this.migrationContext.PostponeCutOverFlagFile != "" { if this.migrationContext.PostponeCutOverFlagFile != "" {
if !base.FileExists(this.migrationContext.PostponeCutOverFlagFile) { if !base.FileExists(this.migrationContext.PostponeCutOverFlagFile) {
if err := base.TouchFile(this.migrationContext.PostponeCutOverFlagFile); err != nil { if err := base.TouchFile(this.migrationContext.PostponeCutOverFlagFile); err != nil {
return log.Errorf("--postpone-cut-over-flag-file indicated by gh-ost is unable to create said file: %s", err.Error()) return this.migrationContext.Log.Errorf("--postpone-cut-over-flag-file indicated by gh-ost is unable to create said file: %s", err.Error())
} }
log.Infof("Created postpone-cut-over-flag-file: %s", this.migrationContext.PostponeCutOverFlagFile) this.migrationContext.Log.Infof("Created postpone-cut-over-flag-file: %s", this.migrationContext.PostponeCutOverFlagFile)
} }
} }
return nil return nil
@ -314,7 +312,7 @@ func (this *Migrator) createFlagFiles() (err error) {
// Migrate executes the complete migration logic. This is *the* major gh-ost function. // Migrate executes the complete migration logic. This is *the* major gh-ost function.
func (this *Migrator) Migrate() (err error) { func (this *Migrator) Migrate() (err error) {
log.Infof("Migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) this.migrationContext.Log.Infof("Migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
this.migrationContext.StartTime = time.Now() this.migrationContext.StartTime = time.Now()
if this.migrationContext.Hostname, err = os.Hostname(); err != nil { if this.migrationContext.Hostname, err = os.Hostname(); err != nil {
return err return err
@ -353,9 +351,9 @@ func (this *Migrator) Migrate() (err error) {
} }
initialLag, _ := this.inspector.getReplicationLag() initialLag, _ := this.inspector.getReplicationLag()
log.Infof("Waiting for ghost table to be migrated. Current lag is %+v", initialLag) this.migrationContext.Log.Infof("Waiting for ghost table to be migrated. Current lag is %+v", initialLag)
<-this.ghostTableMigrated <-this.ghostTableMigrated
log.Debugf("ghost table migrated") this.migrationContext.Log.Debugf("ghost table migrated")
// Yay! We now know the Ghost and Changelog tables are good to examine! // Yay! We now know the Ghost and Changelog tables are good to examine!
// When running on replica, this means the replica has those tables. When running // When running on replica, this means the replica has those tables. When running
// on master this is always true, of course, and yet it also implies this knowledge // on master this is always true, of course, and yet it also implies this knowledge
@ -393,9 +391,9 @@ func (this *Migrator) Migrate() (err error) {
this.migrationContext.MarkRowCopyStartTime() this.migrationContext.MarkRowCopyStartTime()
go this.initiateStatus() go this.initiateStatus()
log.Debugf("Operating until row copy is complete") this.migrationContext.Log.Debugf("Operating until row copy is complete")
this.consumeRowCopyComplete() this.consumeRowCopyComplete()
log.Infof("Row copy complete") this.migrationContext.Log.Infof("Row copy complete")
if err := this.hooksExecutor.onRowCopyComplete(); err != nil { if err := this.hooksExecutor.onRowCopyComplete(); err != nil {
return err return err
} }
@ -421,7 +419,7 @@ func (this *Migrator) Migrate() (err error) {
if err := this.hooksExecutor.onSuccess(); err != nil { if err := this.hooksExecutor.onSuccess(); err != nil {
return err return err
} }
log.Infof("Done migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) this.migrationContext.Log.Infof("Done migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
return nil return nil
} }
@ -447,14 +445,14 @@ func (this *Migrator) handleCutOverResult(cutOverError error) (err error) {
// and swap the tables. // and swap the tables.
// The difference is that we will later swap the tables back. // The difference is that we will later swap the tables back.
if err := this.hooksExecutor.onStartReplication(); err != nil { if err := this.hooksExecutor.onStartReplication(); err != nil {
return log.Errore(err) return this.migrationContext.Log.Errore(err)
} }
if this.migrationContext.TestOnReplicaSkipReplicaStop { if this.migrationContext.TestOnReplicaSkipReplicaStop {
log.Warningf("--test-on-replica-skip-replica-stop enabled, we are not starting replication.") this.migrationContext.Log.Warningf("--test-on-replica-skip-replica-stop enabled, we are not starting replication.")
} else { } else {
log.Debugf("testing on replica. Starting replication IO thread after cut-over failure") this.migrationContext.Log.Debugf("testing on replica. Starting replication IO thread after cut-over failure")
if err := this.retryOperation(this.applier.StartReplication); err != nil { if err := this.retryOperation(this.applier.StartReplication); err != nil {
return log.Errore(err) return this.migrationContext.Log.Errore(err)
} }
} }
} }
@ -465,16 +463,16 @@ func (this *Migrator) handleCutOverResult(cutOverError error) (err error) {
// type (on replica? atomic? safe?) // type (on replica? atomic? safe?)
func (this *Migrator) cutOver() (err error) { func (this *Migrator) cutOver() (err error) {
if this.migrationContext.Noop { if this.migrationContext.Noop {
log.Debugf("Noop operation; not really swapping tables") this.migrationContext.Log.Debugf("Noop operation; not really swapping tables")
return nil return nil
} }
this.migrationContext.MarkPointOfInterest() this.migrationContext.MarkPointOfInterest()
this.throttler.throttle(func() { this.throttler.throttle(func() {
log.Debugf("throttling before swapping tables") this.migrationContext.Log.Debugf("throttling before swapping tables")
}) })
this.migrationContext.MarkPointOfInterest() this.migrationContext.MarkPointOfInterest()
log.Debugf("checking for cut-over postpone") this.migrationContext.Log.Debugf("checking for cut-over postpone")
this.sleepWhileTrue( this.sleepWhileTrue(
func() (bool, error) { func() (bool, error) {
if this.migrationContext.PostponeCutOverFlagFile == "" { if this.migrationContext.PostponeCutOverFlagFile == "" {
@ -499,7 +497,7 @@ func (this *Migrator) cutOver() (err error) {
) )
atomic.StoreInt64(&this.migrationContext.IsPostponingCutOver, 0) atomic.StoreInt64(&this.migrationContext.IsPostponingCutOver, 0)
this.migrationContext.MarkPointOfInterest() this.migrationContext.MarkPointOfInterest()
log.Debugf("checking for cut-over postpone: complete") this.migrationContext.Log.Debugf("checking for cut-over postpone: complete")
if this.migrationContext.TestOnReplica { if this.migrationContext.TestOnReplica {
// With `--test-on-replica` we stop replication thread, and then proceed to use // With `--test-on-replica` we stop replication thread, and then proceed to use
@ -510,9 +508,9 @@ func (this *Migrator) cutOver() (err error) {
return err return err
} }
if this.migrationContext.TestOnReplicaSkipReplicaStop { if this.migrationContext.TestOnReplicaSkipReplicaStop {
log.Warningf("--test-on-replica-skip-replica-stop enabled, we are not stopping replication.") this.migrationContext.Log.Warningf("--test-on-replica-skip-replica-stop enabled, we are not stopping replication.")
} else { } else {
log.Debugf("testing on replica. Stopping replication IO thread") this.migrationContext.Log.Debugf("testing on replica. Stopping replication IO thread")
if err := this.retryOperation(this.applier.StopReplication); err != nil { if err := this.retryOperation(this.applier.StopReplication); err != nil {
return err return err
} }
@ -530,7 +528,7 @@ func (this *Migrator) cutOver() (err error) {
this.handleCutOverResult(err) this.handleCutOverResult(err)
return err return err
} }
return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType) return this.migrationContext.Log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType)
} }
// Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
@ -542,32 +540,32 @@ func (this *Migrator) waitForEventsUpToLock() (err error) {
waitForEventsUpToLockStartTime := time.Now() waitForEventsUpToLockStartTime := time.Now()
allEventsUpToLockProcessedChallenge := fmt.Sprintf("%s:%d", string(AllEventsUpToLockProcessed), waitForEventsUpToLockStartTime.UnixNano()) allEventsUpToLockProcessedChallenge := fmt.Sprintf("%s:%d", string(AllEventsUpToLockProcessed), waitForEventsUpToLockStartTime.UnixNano())
log.Infof("Writing changelog state: %+v", allEventsUpToLockProcessedChallenge) this.migrationContext.Log.Infof("Writing changelog state: %+v", allEventsUpToLockProcessedChallenge)
if _, err := this.applier.WriteChangelogState(allEventsUpToLockProcessedChallenge); err != nil { if _, err := this.applier.WriteChangelogState(allEventsUpToLockProcessedChallenge); err != nil {
return err return err
} }
log.Infof("Waiting for events up to lock") this.migrationContext.Log.Infof("Waiting for events up to lock")
atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 1) atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 1)
for found := false; !found; { for found := false; !found; {
select { select {
case <-timeout.C: case <-timeout.C:
{ {
return log.Errorf("Timeout while waiting for events up to lock") return this.migrationContext.Log.Errorf("Timeout while waiting for events up to lock")
} }
case state := <-this.allEventsUpToLockProcessed: case state := <-this.allEventsUpToLockProcessed:
{ {
if state == allEventsUpToLockProcessedChallenge { if state == allEventsUpToLockProcessedChallenge {
log.Infof("Waiting for events up to lock: got %s", state) this.migrationContext.Log.Infof("Waiting for events up to lock: got %s", state)
found = true found = true
} else { } else {
log.Infof("Waiting for events up to lock: skipping %s", state) this.migrationContext.Log.Infof("Waiting for events up to lock: skipping %s", state)
} }
} }
} }
} }
waitForEventsUpToLockDuration := time.Since(waitForEventsUpToLockStartTime) waitForEventsUpToLockDuration := time.Since(waitForEventsUpToLockStartTime)
log.Infof("Done waiting for events up to lock; duration=%+v", waitForEventsUpToLockDuration) this.migrationContext.Log.Infof("Done waiting for events up to lock; duration=%+v", waitForEventsUpToLockDuration)
this.printStatus(ForcePrintStatusAndHintRule) this.printStatus(ForcePrintStatusAndHintRule)
return nil return nil
@ -598,7 +596,7 @@ func (this *Migrator) cutOverTwoStep() (err error) {
lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime) lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime) renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime)
log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName)) this.migrationContext.Log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
return nil return nil
} }
@ -620,18 +618,18 @@ func (this *Migrator) atomicCutOver() (err error) {
tableUnlocked := make(chan error, 2) tableUnlocked := make(chan error, 2)
go func() { go func() {
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil { if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil {
log.Errore(err) this.migrationContext.Log.Errore(err)
} }
}() }()
if err := <-tableLocked; err != nil { if err := <-tableLocked; err != nil {
return log.Errore(err) return this.migrationContext.Log.Errore(err)
} }
lockOriginalSessionId := <-lockOriginalSessionIdChan lockOriginalSessionId := <-lockOriginalSessionIdChan
log.Infof("Session locking original & magic tables is %+v", lockOriginalSessionId) this.migrationContext.Log.Infof("Session locking original & magic tables is %+v", lockOriginalSessionId)
// At this point we know the original table is locked. // At this point we know the original table is locked.
// We know any newly incoming DML on original table is blocked. // We know any newly incoming DML on original table is blocked.
if err := this.waitForEventsUpToLock(); err != nil { if err := this.waitForEventsUpToLock(); err != nil {
return log.Errore(err) return this.migrationContext.Log.Errore(err)
} }
// Step 2 // Step 2
@ -649,7 +647,7 @@ func (this *Migrator) atomicCutOver() (err error) {
} }
}() }()
renameSessionId := <-renameSessionIdChan renameSessionId := <-renameSessionIdChan
log.Infof("Session renaming tables is %+v", renameSessionId) this.migrationContext.Log.Infof("Session renaming tables is %+v", renameSessionId)
waitForRename := func() error { waitForRename := func() error {
if atomic.LoadInt64(&tableRenameKnownToHaveFailed) == 1 { if atomic.LoadInt64(&tableRenameKnownToHaveFailed) == 1 {
@ -666,13 +664,13 @@ func (this *Migrator) atomicCutOver() (err error) {
return err return err
} }
if atomic.LoadInt64(&tableRenameKnownToHaveFailed) == 0 { if atomic.LoadInt64(&tableRenameKnownToHaveFailed) == 0 {
log.Infof("Found atomic RENAME to be blocking, as expected. Double checking the lock is still in place (though I don't strictly have to)") this.migrationContext.Log.Infof("Found atomic RENAME to be blocking, as expected. Double checking the lock is still in place (though I don't strictly have to)")
} }
if err := this.applier.ExpectUsedLock(lockOriginalSessionId); err != nil { if err := this.applier.ExpectUsedLock(lockOriginalSessionId); err != nil {
// Abort operation. Just make sure to drop the magic table. // Abort operation. Just make sure to drop the magic table.
return log.Errore(err) return this.migrationContext.Log.Errore(err)
} }
log.Infof("Connection holding lock on original table still exists") this.migrationContext.Log.Infof("Connection holding lock on original table still exists")
// Now that we've found the RENAME blocking, AND the locking connection still alive, // Now that we've found the RENAME blocking, AND the locking connection still alive,
// we know it is safe to proceed to release the lock // we know it is safe to proceed to release the lock
@ -681,16 +679,16 @@ func (this *Migrator) atomicCutOver() (err error) {
// BAM! magic table dropped, original table lock is released // BAM! magic table dropped, original table lock is released
// -> RENAME released -> queries on original are unblocked. // -> RENAME released -> queries on original are unblocked.
if err := <-tableUnlocked; err != nil { if err := <-tableUnlocked; err != nil {
return log.Errore(err) return this.migrationContext.Log.Errore(err)
} }
if err := <-tablesRenamed; err != nil { if err := <-tablesRenamed; err != nil {
return log.Errore(err) return this.migrationContext.Log.Errore(err)
} }
this.migrationContext.RenameTablesEndTime = time.Now() this.migrationContext.RenameTablesEndTime = time.Now()
// ooh nice! We're actually truly and thankfully done // ooh nice! We're actually truly and thankfully done
lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime) lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
log.Infof("Lock & rename duration: %s. During this time, queries on %s were blocked", lockAndRenameDuration, sql.EscapeName(this.migrationContext.OriginalTableName)) this.migrationContext.Log.Infof("Lock & rename duration: %s. During this time, queries on %s were blocked", lockAndRenameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
return nil return nil
} }
@ -736,7 +734,7 @@ func (this *Migrator) initiateInspector() (err error) {
if this.migrationContext.ApplierConnectionConfig, err = this.inspector.getMasterConnectionConfig(); err != nil { if this.migrationContext.ApplierConnectionConfig, err = this.inspector.getMasterConnectionConfig(); err != nil {
return err return err
} }
log.Infof("Master found to be %+v", *this.migrationContext.ApplierConnectionConfig.ImpliedKey) this.migrationContext.Log.Infof("Master found to be %+v", *this.migrationContext.ApplierConnectionConfig.ImpliedKey)
} else { } else {
// Forced master host. // Forced master host.
key, err := mysql.ParseRawInstanceKeyLoose(this.migrationContext.AssumeMasterHostname) key, err := mysql.ParseRawInstanceKeyLoose(this.migrationContext.AssumeMasterHostname)
@ -750,14 +748,14 @@ func (this *Migrator) initiateInspector() (err error) {
if this.migrationContext.CliMasterPassword != "" { if this.migrationContext.CliMasterPassword != "" {
this.migrationContext.ApplierConnectionConfig.Password = this.migrationContext.CliMasterPassword this.migrationContext.ApplierConnectionConfig.Password = this.migrationContext.CliMasterPassword
} }
log.Infof("Master forced to be %+v", *this.migrationContext.ApplierConnectionConfig.ImpliedKey) this.migrationContext.Log.Infof("Master forced to be %+v", *this.migrationContext.ApplierConnectionConfig.ImpliedKey)
} }
// validate configs // validate configs
if this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica { if this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica {
if this.migrationContext.InspectorIsAlsoApplier() { if this.migrationContext.InspectorIsAlsoApplier() {
return fmt.Errorf("Instructed to --test-on-replica or --migrate-on-replica, but the server we connect to doesn't seem to be a replica") return fmt.Errorf("Instructed to --test-on-replica or --migrate-on-replica, but the server we connect to doesn't seem to be a replica")
} }
log.Infof("--test-on-replica or --migrate-on-replica given. Will not execute on master %+v but rather on replica %+v itself", this.migrationContext.Log.Infof("--test-on-replica or --migrate-on-replica given. Will not execute on master %+v but rather on replica %+v itself",
*this.migrationContext.ApplierConnectionConfig.ImpliedKey, *this.migrationContext.InspectorConnectionConfig.ImpliedKey, *this.migrationContext.ApplierConnectionConfig.ImpliedKey, *this.migrationContext.InspectorConnectionConfig.ImpliedKey,
) )
this.migrationContext.ApplierConnectionConfig = this.migrationContext.InspectorConnectionConfig.Duplicate() this.migrationContext.ApplierConnectionConfig = this.migrationContext.InspectorConnectionConfig.Duplicate()
@ -998,12 +996,12 @@ func (this *Migrator) initiateStreaming() error {
) )
go func() { go func() {
log.Debugf("Beginning streaming") this.migrationContext.Log.Debugf("Beginning streaming")
err := this.eventsStreamer.StreamEvents(this.canStopStreaming) err := this.eventsStreamer.StreamEvents(this.canStopStreaming)
if err != nil { if err != nil {
this.migrationContext.PanicAbort <- err this.migrationContext.PanicAbort <- err
} }
log.Debugf("Done streaming") this.migrationContext.Log.Debugf("Done streaming")
}() }()
go func() { go func() {
@ -1038,11 +1036,11 @@ func (this *Migrator) initiateThrottler() error {
this.throttler = NewThrottler(this.migrationContext, this.applier, this.inspector) this.throttler = NewThrottler(this.migrationContext, this.applier, this.inspector)
go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected) go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected)
log.Infof("Waiting for first throttle metrics to be collected") this.migrationContext.Log.Infof("Waiting for first throttle metrics to be collected")
<-this.firstThrottlingCollected // replication lag <-this.firstThrottlingCollected // replication lag
<-this.firstThrottlingCollected // HTTP status <-this.firstThrottlingCollected // HTTP status
<-this.firstThrottlingCollected // other, general metrics <-this.firstThrottlingCollected // other, general metrics
log.Infof("First throttle metrics collected") this.migrationContext.Log.Infof("First throttle metrics collected")
go this.throttler.initiateThrottlerChecks() go this.throttler.initiateThrottlerChecks()
return nil return nil
@ -1057,16 +1055,16 @@ func (this *Migrator) initiateApplier() error {
return err return err
} }
if err := this.applier.CreateChangelogTable(); err != nil { if err := this.applier.CreateChangelogTable(); err != nil {
log.Errorf("Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out") this.migrationContext.Log.Errorf("Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out")
return err return err
} }
if err := this.applier.CreateGhostTable(); err != nil { if err := this.applier.CreateGhostTable(); err != nil {
log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") this.migrationContext.Log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out")
return err return err
} }
if err := this.applier.AlterGhost(); err != nil { if err := this.applier.AlterGhost(); err != nil {
log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out") this.migrationContext.Log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out")
return err return err
} }
@ -1080,14 +1078,14 @@ func (this *Migrator) initiateApplier() error {
func (this *Migrator) iterateChunks() error { func (this *Migrator) iterateChunks() error {
terminateRowIteration := func(err error) error { terminateRowIteration := func(err error) error {
this.rowCopyComplete <- err this.rowCopyComplete <- err
return log.Errore(err) return this.migrationContext.Log.Errore(err)
} }
if this.migrationContext.Noop { if this.migrationContext.Noop {
log.Debugf("Noop operation; not really copying data") this.migrationContext.Log.Debugf("Noop operation; not really copying data")
return terminateRowIteration(nil) return terminateRowIteration(nil)
} }
if this.migrationContext.MigrationRangeMinValues == nil { if this.migrationContext.MigrationRangeMinValues == nil {
log.Debugf("No rows found in table. Rowcopy will be implicitly empty") this.migrationContext.Log.Debugf("No rows found in table. Rowcopy will be implicitly empty")
return terminateRowIteration(nil) return terminateRowIteration(nil)
} }
@ -1155,7 +1153,7 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
handleNonDMLEventStruct := func(eventStruct *applyEventStruct) error { handleNonDMLEventStruct := func(eventStruct *applyEventStruct) error {
if eventStruct.writeFunc != nil { if eventStruct.writeFunc != nil {
if err := this.retryOperation(*eventStruct.writeFunc); err != nil { if err := this.retryOperation(*eventStruct.writeFunc); err != nil {
return log.Errore(err) return this.migrationContext.Log.Errore(err)
} }
} }
return nil return nil
@ -1189,13 +1187,13 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
return this.applier.ApplyDMLEventQueries(dmlEvents) return this.applier.ApplyDMLEventQueries(dmlEvents)
} }
if err := this.retryOperation(applyEventFunc); err != nil { if err := this.retryOperation(applyEventFunc); err != nil {
return log.Errore(err) return this.migrationContext.Log.Errore(err)
} }
if nonDmlStructToApply != nil { if nonDmlStructToApply != nil {
// We pulled DML events from the queue, and then we hit a non-DML event. Wait! // We pulled DML events from the queue, and then we hit a non-DML event. Wait!
// We need to handle it! // We need to handle it!
if err := handleNonDMLEventStruct(nonDmlStructToApply); err != nil { if err := handleNonDMLEventStruct(nonDmlStructToApply); err != nil {
return log.Errore(err) return this.migrationContext.Log.Errore(err)
} }
} }
} }
@ -1207,7 +1205,7 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
// Both event backlog and rowcopy events are polled; the backlog events have precedence. // Both event backlog and rowcopy events are polled; the backlog events have precedence.
func (this *Migrator) executeWriteFuncs() error { func (this *Migrator) executeWriteFuncs() error {
if this.migrationContext.Noop { if this.migrationContext.Noop {
log.Debugf("Noop operation; not really executing write funcs") this.migrationContext.Log.Debugf("Noop operation; not really executing write funcs")
return nil return nil
} }
for { for {
@ -1234,7 +1232,7 @@ func (this *Migrator) executeWriteFuncs() error {
copyRowsStartTime := time.Now() copyRowsStartTime := time.Now()
// Retries are handled within the copyRowsFunc // Retries are handled within the copyRowsFunc
if err := copyRowsFunc(); err != nil { if err := copyRowsFunc(); err != nil {
return log.Errore(err) return this.migrationContext.Log.Errore(err)
} }
if niceRatio := this.migrationContext.GetNiceRatio(); niceRatio > 0 { if niceRatio := this.migrationContext.GetNiceRatio(); niceRatio > 0 {
copyRowsDuration := time.Since(copyRowsStartTime) copyRowsDuration := time.Since(copyRowsStartTime)
@ -1247,7 +1245,7 @@ func (this *Migrator) executeWriteFuncs() error {
{ {
// Hmmmmm... nothing in the queue; no events, but also no row copy. // Hmmmmm... nothing in the queue; no events, but also no row copy.
// This is possible upon load. Let's just sleep it over. // This is possible upon load. Let's just sleep it over.
log.Debugf("Getting nothing in the write queue. Sleeping...") this.migrationContext.Log.Debugf("Getting nothing in the write queue. Sleeping...")
time.Sleep(time.Second) time.Sleep(time.Second)
} }
} }
@ -1263,14 +1261,14 @@ func (this *Migrator) finalCleanup() error {
if this.migrationContext.Noop { if this.migrationContext.Noop {
if createTableStatement, err := this.inspector.showCreateTable(this.migrationContext.GetGhostTableName()); err == nil { if createTableStatement, err := this.inspector.showCreateTable(this.migrationContext.GetGhostTableName()); err == nil {
log.Infof("New table structure follows") this.migrationContext.Log.Infof("New table structure follows")
fmt.Println(createTableStatement) fmt.Println(createTableStatement)
} else { } else {
log.Errore(err) this.migrationContext.Log.Errore(err)
} }
} }
if err := this.eventsStreamer.Close(); err != nil { if err := this.eventsStreamer.Close(); err != nil {
log.Errore(err) this.migrationContext.Log.Errore(err)
} }
if err := this.retryOperation(this.applier.DropChangelogTable); err != nil { if err := this.retryOperation(this.applier.DropChangelogTable); err != nil {
@ -1282,8 +1280,8 @@ func (this *Migrator) finalCleanup() error {
} }
} else { } else {
if !this.migrationContext.Noop { if !this.migrationContext.Noop {
log.Infof("Am not dropping old table because I want this operation to be as live as possible. If you insist I should do it, please add `--ok-to-drop-table` next time. But I prefer you do not. To drop the old table, issue:") this.migrationContext.Log.Infof("Am not dropping old table because I want this operation to be as live as possible. If you insist I should do it, please add `--ok-to-drop-table` next time. But I prefer you do not. To drop the old table, issue:")
log.Infof("-- drop table %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.GetOldTableName())) this.migrationContext.Log.Infof("-- drop table %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.GetOldTableName()))
} }
} }
if this.migrationContext.Noop { if this.migrationContext.Noop {
@ -1299,22 +1297,22 @@ func (this *Migrator) teardown() {
atomic.StoreInt64(&this.finishedMigrating, 1) atomic.StoreInt64(&this.finishedMigrating, 1)
if this.inspector != nil { if this.inspector != nil {
log.Infof("Tearing down inspector") this.migrationContext.Log.Infof("Tearing down inspector")
this.inspector.Teardown() this.inspector.Teardown()
} }
if this.applier != nil { if this.applier != nil {
log.Infof("Tearing down applier") this.migrationContext.Log.Infof("Tearing down applier")
this.applier.Teardown() this.applier.Teardown()
} }
if this.eventsStreamer != nil { if this.eventsStreamer != nil {
log.Infof("Tearing down streamer") this.migrationContext.Log.Infof("Tearing down streamer")
this.eventsStreamer.Teardown() this.eventsStreamer.Teardown()
} }
if this.throttler != nil { if this.throttler != nil {
log.Infof("Tearing down throttler") this.migrationContext.Log.Infof("Tearing down throttler")
this.throttler.Teardown() this.throttler.Teardown()
} }
} }

View File

@ -16,7 +16,6 @@ import (
"sync/atomic" "sync/atomic"
"github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/base"
"github.com/outbrain/golib/log"
) )
type printStatusFunc func(PrintStatusRule, io.Writer) type printStatusFunc func(PrintStatusRule, io.Writer)
@ -49,12 +48,12 @@ func (this *Server) BindSocketFile() (err error) {
if err != nil { if err != nil {
return err return err
} }
log.Infof("Listening on unix socket file: %s", this.migrationContext.ServeSocketFile) this.migrationContext.Log.Infof("Listening on unix socket file: %s", this.migrationContext.ServeSocketFile)
return nil return nil
} }
func (this *Server) RemoveSocketFile() (err error) { func (this *Server) RemoveSocketFile() (err error) {
log.Infof("Removing socket file: %s", this.migrationContext.ServeSocketFile) this.migrationContext.Log.Infof("Removing socket file: %s", this.migrationContext.ServeSocketFile)
return os.Remove(this.migrationContext.ServeSocketFile) return os.Remove(this.migrationContext.ServeSocketFile)
} }
@ -66,7 +65,7 @@ func (this *Server) BindTCPPort() (err error) {
if err != nil { if err != nil {
return err return err
} }
log.Infof("Listening on tcp port: %d", this.migrationContext.ServeTCPPort) this.migrationContext.Log.Infof("Listening on tcp port: %d", this.migrationContext.ServeTCPPort)
return nil return nil
} }
@ -76,7 +75,7 @@ func (this *Server) Serve() (err error) {
for { for {
conn, err := this.unixListener.Accept() conn, err := this.unixListener.Accept()
if err != nil { if err != nil {
log.Errore(err) this.migrationContext.Log.Errore(err)
} }
go this.handleConnection(conn) go this.handleConnection(conn)
} }
@ -88,7 +87,7 @@ func (this *Server) Serve() (err error) {
for { for {
conn, err := this.tcpListener.Accept() conn, err := this.tcpListener.Accept()
if err != nil { if err != nil {
log.Errore(err) this.migrationContext.Log.Errore(err)
} }
go this.handleConnection(conn) go this.handleConnection(conn)
} }
@ -118,7 +117,7 @@ func (this *Server) onServerCommand(command string, writer *bufio.Writer) (err e
} else { } else {
fmt.Fprintf(writer, "%s\n", err.Error()) fmt.Fprintf(writer, "%s\n", err.Error())
} }
return log.Errore(err) return this.migrationContext.Log.Errore(err)
} }
// applyServerCommand parses and executes commands by user // applyServerCommand parses and executes commands by user

View File

@ -16,7 +16,6 @@ import (
"github.com/github/gh-ost/go/binlog" "github.com/github/gh-ost/go/binlog"
"github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/mysql"
"github.com/outbrain/golib/log"
"github.com/outbrain/golib/sqlutils" "github.com/outbrain/golib/sqlutils"
) )
@ -160,7 +159,7 @@ func (this *EventsStreamer) readCurrentBinlogCoordinates() error {
if !foundMasterStatus { if !foundMasterStatus {
return fmt.Errorf("Got no results from SHOW MASTER STATUS. Bailing out") return fmt.Errorf("Got no results from SHOW MASTER STATUS. Bailing out")
} }
log.Debugf("Streamer binlog coordinates: %+v", *this.initialBinlogCoordinates) this.migrationContext.Log.Debugf("Streamer binlog coordinates: %+v", *this.initialBinlogCoordinates)
return nil return nil
} }
@ -186,7 +185,7 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
return nil return nil
} }
log.Infof("StreamEvents encountered unexpected error: %+v", err) this.migrationContext.Log.Infof("StreamEvents encountered unexpected error: %+v", err)
this.migrationContext.MarkPointOfInterest() this.migrationContext.MarkPointOfInterest()
time.Sleep(ReconnectStreamerSleepSeconds * time.Second) time.Sleep(ReconnectStreamerSleepSeconds * time.Second)
@ -202,7 +201,7 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
// Reposition at same binlog file. // Reposition at same binlog file.
lastAppliedRowsEventHint = this.binlogReader.LastAppliedRowsEventHint lastAppliedRowsEventHint = this.binlogReader.LastAppliedRowsEventHint
log.Infof("Reconnecting... Will resume at %+v", lastAppliedRowsEventHint) this.migrationContext.Log.Infof("Reconnecting... Will resume at %+v", lastAppliedRowsEventHint)
if err := this.initBinlogReader(this.GetReconnectBinlogCoordinates()); err != nil { if err := this.initBinlogReader(this.GetReconnectBinlogCoordinates()); err != nil {
return err return err
} }
@ -213,7 +212,7 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
func (this *EventsStreamer) Close() (err error) { func (this *EventsStreamer) Close() (err error) {
err = this.binlogReader.Close() err = this.binlogReader.Close()
log.Infof("Closed streamer connection. err=%+v", err) this.migrationContext.Log.Infof("Closed streamer connection. err=%+v", err)
return err return err
} }

View File

@ -15,7 +15,6 @@ import (
"github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql" "github.com/github/gh-ost/go/sql"
"github.com/outbrain/golib/log"
) )
var ( var (
@ -123,7 +122,7 @@ func parseChangelogHeartbeat(heartbeatValue string) (lag time.Duration, err erro
// parseChangelogHeartbeat parses a string timestamp and deduces replication lag // parseChangelogHeartbeat parses a string timestamp and deduces replication lag
func (this *Throttler) parseChangelogHeartbeat(heartbeatValue string) (err error) { func (this *Throttler) parseChangelogHeartbeat(heartbeatValue string) (err error) {
if lag, err := parseChangelogHeartbeat(heartbeatValue); err != nil { if lag, err := parseChangelogHeartbeat(heartbeatValue); err != nil {
return log.Errore(err) return this.migrationContext.Log.Errore(err)
} else { } else {
atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag)) atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))
return nil return nil
@ -145,13 +144,13 @@ func (this *Throttler) collectReplicationLag(firstThrottlingCollected chan<- boo
// This means we will always get a good heartbeat value. // This means we will always get a good heartbeat value.
// When running on replica, we should instead check the `SHOW SLAVE STATUS` output. // When running on replica, we should instead check the `SHOW SLAVE STATUS` output.
if lag, err := mysql.GetReplicationLagFromSlaveStatus(this.inspector.informationSchemaDb); err != nil { if lag, err := mysql.GetReplicationLagFromSlaveStatus(this.inspector.informationSchemaDb); err != nil {
return log.Errore(err) return this.migrationContext.Log.Errore(err)
} else { } else {
atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag)) atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))
} }
} else { } else {
if heartbeatValue, err := this.inspector.readChangelogState("heartbeat"); err != nil { if heartbeatValue, err := this.inspector.readChangelogState("heartbeat"); err != nil {
return log.Errore(err) return this.migrationContext.Log.Errore(err)
} else { } else {
this.parseChangelogHeartbeat(heartbeatValue) this.parseChangelogHeartbeat(heartbeatValue)
} }
@ -348,7 +347,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
hibernateDuration := time.Duration(this.migrationContext.CriticalLoadHibernateSeconds) * time.Second hibernateDuration := time.Duration(this.migrationContext.CriticalLoadHibernateSeconds) * time.Second
hibernateUntilTime := time.Now().Add(hibernateDuration) hibernateUntilTime := time.Now().Add(hibernateDuration)
atomic.StoreInt64(&this.migrationContext.HibernateUntil, hibernateUntilTime.UnixNano()) atomic.StoreInt64(&this.migrationContext.HibernateUntil, hibernateUntilTime.UnixNano())
log.Errorf("critical-load met: %s=%d, >=%d. Will hibernate for the duration of %+v, until %+v", variableName, value, threshold, hibernateDuration, hibernateUntilTime) this.migrationContext.Log.Errorf("critical-load met: %s=%d, >=%d. Will hibernate for the duration of %+v, until %+v", variableName, value, threshold, hibernateDuration, hibernateUntilTime)
go func() { go func() {
time.Sleep(hibernateDuration) time.Sleep(hibernateDuration)
this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(true, "leaving hibernation", base.LeavingHibernationThrottleReasonHint)) this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(true, "leaving hibernation", base.LeavingHibernationThrottleReasonHint))
@ -361,7 +360,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold) this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
} }
if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds > 0 { if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds > 0 {
log.Errorf("critical-load met once: %s=%d, >=%d. Will check again in %d millis", variableName, value, threshold, this.migrationContext.CriticalLoadIntervalMilliseconds) this.migrationContext.Log.Errorf("critical-load met once: %s=%d, >=%d. Will check again in %d millis", variableName, value, threshold, this.migrationContext.CriticalLoadIntervalMilliseconds)
go func() { go func() {
timer := time.NewTimer(time.Millisecond * time.Duration(this.migrationContext.CriticalLoadIntervalMilliseconds)) timer := time.NewTimer(time.Millisecond * time.Duration(this.migrationContext.CriticalLoadIntervalMilliseconds))
<-timer.C <-timer.C
@ -479,6 +478,6 @@ func (this *Throttler) throttle(onThrottled func()) {
} }
func (this *Throttler) Teardown() { func (this *Throttler) Teardown() {
log.Debugf("Tearing down...") this.migrationContext.Log.Debugf("Tearing down...")
atomic.StoreInt64(&this.finishedMigrating, 1) atomic.StoreInt64(&this.finishedMigrating, 1)
} }