diff --git a/go/base/context.go b/go/base/context.go index d926b05..6e28cb0 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -7,6 +7,8 @@ package base import ( "fmt" + "strings" + "sync/atomic" "time" "github.com/github/gh-osc/go/mysql" @@ -36,7 +38,7 @@ type MigrationContext struct { CountTableRows bool RowsEstimate int64 UsedRowsEstimateMethod RowsEstimateMethod - ChunkSize int + ChunkSize int64 OriginalBinlogFormat string OriginalBinlogRowImage string AllowedRunningOnMaster bool @@ -52,6 +54,8 @@ type MigrationContext struct { RowCopyStartTime time.Time CurrentLag int64 MaxLagMillisecondsThrottleThreshold int64 + ThrottleFlagFile string + TotalRowsCopied int64 IsThrottled func() bool CanStopStreaming func() bool @@ -107,3 +111,33 @@ func (this *MigrationContext) HasMigrationRange() bool { func (this *MigrationContext) MaxRetries() int { return maxRetries } + +func (this *MigrationContext) IsTransactionalTable() bool { + switch strings.ToLower(this.TableEngine) { + case "innodb": + { + return true + } + case "tokudb": + { + return true + } + } + return false +} + +// ElapsedTime returns time since very beginning of the process +func (this *MigrationContext) ElapsedTime() time.Duration { + return time.Now().Sub(this.StartTime) +} + +// ElapsedRowCopyTime returns time since starting to copy chunks of rows +func (this *MigrationContext) ElapsedRowCopyTime() time.Duration { + return time.Now().Sub(this.RowCopyStartTime) +} + +// GetTotalRowsCopied returns the accurate number of rows being copied (affected) +// This is not exactly the same as the rows being iterated via chunks, but potentially close enough +func (this *MigrationContext) GetTotalRowsCopied() int64 { + return atomic.LoadInt64(&this.TotalRowsCopied) +} diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index ea06d58..4a0b586 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -120,65 +120,3 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha return nil } - -// ReadEntries will read binlog entries from parsed text output of `mysqlbinlog` utility -func (this *GoMySQLReader) ReadEntries(logFile string, startPos uint64, stopPos uint64) (entries [](*BinlogEntry), err error) { - this.currentCoordinates.LogFile = logFile - // Start sync with sepcified binlog file and position - streamer, err := this.binlogSyncer.StartSync(gomysql.Position{logFile, uint32(startPos)}) - if err != nil { - return entries, err - } - - for { - ev, err := streamer.GetEvent() - if err != nil { - return entries, err - } - this.currentCoordinates.LogPos = int64(ev.Header.LogPos) - if rotateEvent, ok := ev.Event.(*replication.RotateEvent); ok { - this.currentCoordinates.LogFile = string(rotateEvent.NextLogName) - log.Infof("rotate to next log name: %s", rotateEvent.NextLogName) - } else if tableMapEvent, ok := ev.Event.(*replication.TableMapEvent); ok { - // Actually not being used, since Table is available in RowsEvent. - // Keeping this here in case I'm wrong about this. Sometime in the near - // future I should remove this. - this.tableMap[tableMapEvent.TableID] = string(tableMapEvent.Table) - } else if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok { - dml := ToEventDML(ev.Header.EventType.String()) - if dml == NotDML { - return entries, fmt.Errorf("Unknown DML type: %s", ev.Header.EventType.String()) - } - for i, row := range rowsEvent.Rows { - if dml == UpdateDML && i%2 == 1 { - // An update has two rows (WHERE+SET) - // We do both at the same time - continue - } - binlogEntry := NewBinlogEntryAt(this.currentCoordinates) - binlogEntry.DmlEvent = NewBinlogDMLEvent( - string(rowsEvent.Table.Schema), - string(rowsEvent.Table.Table), - dml, - ) - switch dml { - case InsertDML: - { - binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(row) - } - case UpdateDML: - { - binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row) - binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(rowsEvent.Rows[i+1]) - } - case DeleteDML: - { - binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row) - } - } - } - } - } - log.Debugf("done") - return entries, err -} diff --git a/go/cmd/gh-osc/main.go b/go/cmd/gh-osc/main.go index 104699a..de7f976 100644 --- a/go/cmd/gh-osc/main.go +++ b/go/cmd/gh-osc/main.go @@ -35,7 +35,8 @@ func main() { flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)") flag.BoolVar(&migrationContext.AllowedRunningOnMaster, "allow-on-master", false, "allow this migration to run directly on master. Preferably it would run on a replica") - flag.IntVar(&migrationContext.ChunkSize, "chunk-size", 1000, "amount of rows to handle in each iteration") + flag.Int64Var(&migrationContext.ChunkSize, "chunk-size", 1000, "amount of rows to handle in each iteration") + flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists") quiet := flag.Bool("quiet", false, "quiet") verbose := flag.Bool("verbose", false, "verbose") diff --git a/go/logic/applier.go b/go/logic/applier.go index 46b759c..b4f63a9 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -8,6 +8,7 @@ package logic import ( gosql "database/sql" "fmt" + "sync/atomic" "time" "github.com/github/gh-osc/go/base" @@ -255,9 +256,9 @@ func (this *Applier) ReadMigrationRangeValues() error { return nil } -// IterationIsComplete lets us know when the copy-iteration phase is complete, i.e. +// __unused_IterationIsComplete lets us know when the copy-iteration phase is complete, i.e. // we've exhausted all rows -func (this *Applier) IterationIsComplete() (bool, error) { +func (this *Applier) __unused_IterationIsComplete() (bool, error) { if !this.migrationContext.HasMigrationRange() { return false, nil } @@ -298,7 +299,11 @@ func (this *Applier) IterationIsComplete() (bool, error) { return !moreRowsFound, nil } -func (this *Applier) CalculateNextIterationRangeEndValues() error { +// CalculateNextIterationRangeEndValues reads the next-iteration-range-end unique key values, +// which will be used for copying the next chunk of rows. Ir returns "false" if there is +// no further chunk to work through, i.e. we're past the last chunk and are done with +// itrating the range (and this done with copying row chunks) +func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) { this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues if this.migrationContext.MigrationIterationRangeMinValues == nil { this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues @@ -313,23 +318,22 @@ func (this *Applier) CalculateNextIterationRangeEndValues() error { fmt.Sprintf("iteration:%d", this.migrationContext.Iteration), ) if err != nil { - return err + return hasFurtherRange, err } rows, err := this.db.Query(query, explodedArgs...) if err != nil { - return err + return hasFurtherRange, err } iterationRangeMaxValues := sql.NewColumnValues(len(this.migrationContext.UniqueKey.Columns)) - iterationRangeEndFound := false for rows.Next() { if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil { - return err + return hasFurtherRange, err } - iterationRangeEndFound = true + hasFurtherRange = true } - if !iterationRangeEndFound { + if !hasFurtherRange { log.Debugf("Iteration complete: cannot find iteration end") - return nil + return hasFurtherRange, nil } this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues log.Debugf( @@ -339,10 +343,13 @@ func (this *Applier) CalculateNextIterationRangeEndValues() error { this.migrationContext.Iteration, this.migrationContext.ChunkSize, ) - return nil + return hasFurtherRange, nil } -func (this *Applier) ApplyIterationInsertQuery() error { +func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) { + startTime := time.Now() + chunkSize = atomic.LoadInt64(&this.migrationContext.ChunkSize) + query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery( this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, @@ -353,44 +360,55 @@ func (this *Applier) ApplyIterationInsertQuery() error { this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(), this.migrationContext.Iteration == 0, + this.migrationContext.IsTransactionalTable(), ) if err != nil { - return err + return chunkSize, rowsAffected, duration, err } - if _, err := sqlutils.Exec(this.db, query, explodedArgs...); err != nil { - return err + sqlResult, err := sqlutils.Exec(this.db, query, explodedArgs...) + if err != nil { + return chunkSize, rowsAffected, duration, err } + rowsAffected, _ = sqlResult.RowsAffected() + duration = time.Now().Sub(startTime) + this.WriteChangelog( + fmt.Sprintf("copy iteration %d", this.migrationContext.Iteration), + fmt.Sprintf("chunk: %d; affected: %d; duration: %d", chunkSize, rowsAffected, duration), + ) log.Debugf( "Issued INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d", this.migrationContext.MigrationIterationRangeMinValues, this.migrationContext.MigrationIterationRangeMaxValues, this.migrationContext.Iteration, - this.migrationContext.ChunkSize) + chunkSize) + return chunkSize, rowsAffected, duration, nil +} + +// LockTables +func (this *Applier) LockTables() error { + query := fmt.Sprintf(`lock /* gh-osc */ tables %s.%s write, %s.%s write, %s.%s write`, + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetGhostTableName()), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetChangelogTableName()), + ) + log.Infof("Locking tables") + if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { + return err + } + log.Infof("Tables locked") return nil } -// IterateTable -func (this *Applier) IterateTable(uniqueKey *sql.UniqueKey) error { - query, err := sql.BuildUniqueKeyMinValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, uniqueKey.Columns) - if err != nil { +// UnlockTables +func (this *Applier) UnlockTables() error { + query := `unlock /* gh-osc */ tables` + log.Infof("Unlocking tables") + if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { return err } - columnValues := sql.NewColumnValues(len(uniqueKey.Columns)) - - rows, err := this.db.Query(query) - if err != nil { - return err - } - for rows.Next() { - if err = rows.Scan(columnValues.ValuesPointers...); err != nil { - return err - } - } - log.Debugf("column values: %s", columnValues) - query = `insert into test.sample_data_dump (category, ts) values (?, ?)` - if _, err := sqlutils.Exec(this.db, query, columnValues.AbstractValues()...); err != nil { - return err - } - + log.Infof("Tables unlocked") return nil } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index d790117..451cb96 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -7,6 +7,7 @@ package logic import ( "fmt" + "os" "sync/atomic" "time" @@ -16,6 +17,19 @@ import ( "github.com/outbrain/golib/log" ) +type ChangelogState string + +const ( + TablesInPlace ChangelogState = "TablesInPlace" + AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" +) + +type tableWriteFunc func() error + +const ( + applyEventsQueueBuffer = 100 +) + // Migrator is the main schema migration flow manager. type Migrator struct { inspector *Inspector @@ -23,13 +37,25 @@ type Migrator struct { eventsStreamer *EventsStreamer migrationContext *base.MigrationContext - tablesInPlace chan bool + tablesInPlace chan bool + rowCopyComplete chan bool + allEventsUpToLockProcessed chan bool + + // copyRowsQueue should not be buffered; if buffered some non-damaging but + // excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete + copyRowsQueue chan tableWriteFunc + applyEventsQueue chan tableWriteFunc } func NewMigrator() *Migrator { migrator := &Migrator{ - migrationContext: base.GetMigrationContext(), - tablesInPlace: make(chan bool), + migrationContext: base.GetMigrationContext(), + tablesInPlace: make(chan bool), + rowCopyComplete: make(chan bool), + allEventsUpToLockProcessed: make(chan bool), + + copyRowsQueue: make(chan tableWriteFunc), + applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer), } migrator.migrationContext.IsThrottled = func() bool { return migrator.shouldThrottle() @@ -43,6 +69,11 @@ func (this *Migrator) shouldThrottle() bool { shouldThrottle := false if time.Duration(lag) > time.Duration(this.migrationContext.MaxLagMillisecondsThrottleThreshold)*time.Millisecond { shouldThrottle = true + } else if this.migrationContext.ThrottleFlagFile != "" { + if _, err := os.Stat(this.migrationContext.ThrottleFlagFile); err == nil { + //Throttle file defined and exists! + shouldThrottle = true + } } return shouldThrottle } @@ -62,6 +93,10 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er { this.tablesInPlace <- true } + case AllEventsUpToLockProcessed: + { + this.allEventsUpToLockProcessed <- true + } default: { return fmt.Errorf("Unknown changelog state: %+v", changelogState) @@ -83,12 +118,13 @@ func (this *Migrator) onChangelogHeartbeatEvent(dmlEvent *binlog.BinlogDMLEvent) lag := time.Now().Sub(heartbeatTime) atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag)) - log.Debugf("---- - - - - - lag %+v", lag) return nil } func (this *Migrator) Migrate() (err error) { + this.migrationContext.StartTime = time.Now() + this.inspector = NewInspector() if err := this.inspector.InitDBConnections(); err != nil { return err @@ -109,6 +145,97 @@ func (this *Migrator) Migrate() (err error) { } log.Infof("Master found to be %+v", this.migrationContext.MasterConnectionConfig.Key) + if err := this.initiateStreaming(); err != nil { + return err + } + if err := this.initiateApplier(); err != nil { + return err + } + + log.Debugf("Waiting for tables to be in place") + <-this.tablesInPlace + log.Debugf("Tables are in place") + // Yay! We now know the Ghost and Changelog tables are good to examine! + // When running on replica, this means the replica has those tables. When running + // on master this is always true, of course, and yet it also implies this knowledge + // is in the binlogs. + + this.migrationContext.UniqueKey = uniqueKeys[0] // TODO. Need to wait on replica till the ghost table exists and get shared keys + if err := this.applier.ReadMigrationRangeValues(); err != nil { + return err + } + go this.initiateStatus() + go this.executeWriteFuncs() + go this.iterateChunks() + + log.Debugf("Operating until row copy is complete") + <-this.rowCopyComplete + log.Debugf("Row copy complete") + this.printStatus() + + throttleMigration( + this.migrationContext, + func() { + log.Debugf("throttling before LOCK TABLES") + }, + nil, + func() { + log.Debugf("done throttling") + }, + ) + // TODO retries!! + this.applier.LockTables() + this.applier.WriteChangelog("state", string(AllEventsUpToLockProcessed)) + log.Debugf("Waiting for events up to lock") + <-this.allEventsUpToLockProcessed + log.Debugf("Done waiting for events up to lock") + // TODO retries!! + this.applier.UnlockTables() + + return nil +} + +func (this *Migrator) initiateStatus() error { + this.printStatus() + statusTick := time.Tick(1 * time.Second) + for range statusTick { + go this.printStatus() + } + + return nil +} + +func (this *Migrator) printStatus() { + elapsedTime := this.migrationContext.ElapsedTime() + elapsedSeconds := int64(elapsedTime.Seconds()) + totalRowsCopied := this.migrationContext.GetTotalRowsCopied() + rowsEstimate := this.migrationContext.RowsEstimate + progressPct := 100.0 * float64(totalRowsCopied) / float64(rowsEstimate) + + shouldPrintStatus := false + if elapsedSeconds <= 60 { + shouldPrintStatus = true + } else if progressPct >= 99.0 { + shouldPrintStatus = true + } else if progressPct >= 95.0 { + shouldPrintStatus = (elapsedSeconds%5 == 0) + } else if elapsedSeconds <= 120 { + shouldPrintStatus = (elapsedSeconds%5 == 0) + } else { + shouldPrintStatus = (elapsedSeconds%30 == 0) + } + if !shouldPrintStatus { + return + } + + status := fmt.Sprintf("Copy: %d/%d %.1f%% Backlog: %d/%d Elapsed: %+v(copy), %+v(total) ETA: N/A", + totalRowsCopied, rowsEstimate, progressPct, + len(this.applyEventsQueue), cap(this.applyEventsQueue), + this.migrationContext.ElapsedRowCopyTime(), elapsedTime) + fmt.Println(status) +} + +func (this *Migrator) initiateStreaming() error { this.eventsStreamer = NewEventsStreamer() if err := this.eventsStreamer.InitDBConnections(); err != nil { return err @@ -133,7 +260,10 @@ func (this *Migrator) Migrate() (err error) { log.Debugf("Beginning streaming") this.eventsStreamer.StreamEvents(func() bool { return this.canStopStreaming() }) }() + return nil +} +func (this *Migrator) initiateApplier() error { this.applier = NewApplier() if err := this.applier.InitDBConnections(); err != nil { return err @@ -153,48 +283,73 @@ func (this *Migrator) Migrate() (err error) { this.applier.WriteChangelog("state", string(TablesInPlace)) this.applier.InitiateHeartbeat() + return nil +} - <-this.tablesInPlace - // Yay! We now know the Ghost and Changelog tables are good to examine! - // When running on replica, this means the replica has those tables. When running - // on master this is always true, of course, and yet it also implies this knowledge - // is in the binlogs. - - this.migrationContext.UniqueKey = uniqueKeys[0] // TODO. Need to wait on replica till the ghost table exists and get shared keys - if err := this.applier.ReadMigrationRangeValues(); err != nil { - return err +func (this *Migrator) iterateChunks() error { + this.migrationContext.RowCopyStartTime = time.Now() + terminateRowIteration := func(err error) error { + this.rowCopyComplete <- true + return log.Errore(err) } + for { + copyRowsFunc := func() error { + hasFurtherRange, err := this.applier.CalculateNextIterationRangeEndValues() + if err != nil { + return terminateRowIteration(err) + } + if !hasFurtherRange { + return terminateRowIteration(nil) + } + _, rowsAffected, _, err := this.applier.ApplyIterationInsertQuery() + if err != nil { + return terminateRowIteration(err) + } + atomic.AddInt64(&this.migrationContext.TotalRowsCopied, rowsAffected) + this.migrationContext.Iteration++ + return nil + } + this.copyRowsQueue <- copyRowsFunc + } + return nil +} + +func (this *Migrator) executeWriteFuncs() error { for { throttleMigration( this.migrationContext, func() { - log.Debugf("throttling rowcopy") + log.Debugf("throttling writes") }, nil, func() { - log.Debugf("done throttling rowcopy") + log.Debugf("done throttling writes") }, ) - isComplete, err := this.applier.IterationIsComplete() - if err != nil { - return err + // We give higher priority to event processing, then secondary priority to + // rowcopy + select { + case applyEventFunc := <-this.applyEventsQueue: + { + retryOperation(applyEventFunc, this.migrationContext.MaxRetries()) + } + default: + { + select { + case copyRowsFunc := <-this.copyRowsQueue: + { + retryOperation(copyRowsFunc, this.migrationContext.MaxRetries()) + } + default: + { + // Hmmmmm... nothing in the queue; no events, but also no row copy. + // This is possible upon load. Let's just sleep it over. + log.Debugf("Getting nothing in the write queue. Sleeping...") + time.Sleep(time.Second) + } + } + } } - if isComplete { - break - } - if err = this.applier.CalculateNextIterationRangeEndValues(); err != nil { - return err - } - if err = this.applier.ApplyIterationInsertQuery(); err != nil { - return err - } - this.migrationContext.Iteration++ } - // temporary wait: - heartbeatTick := time.Tick(10 * time.Second) - for range heartbeatTick { - return nil - } - return nil } diff --git a/go/sql/builder.go b/go/sql/builder.go index 27a5e20..b8f0358 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -128,7 +128,7 @@ func BuildRangePreparedComparison(columns []string, args []interface{}, comparis return BuildRangeComparison(columns, values, args, comparisonSign) } -func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, uniqueKey string, uniqueKeyColumns, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool) (result string, explodedArgs []interface{}, err error) { +func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, uniqueKey string, uniqueKeyColumns, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) { if len(sharedColumns) == 0 { return "", explodedArgs, fmt.Errorf("Got 0 shared columns in BuildRangeInsertQuery") } @@ -155,28 +155,32 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin return "", explodedArgs, err } explodedArgs = append(explodedArgs, rangeExplodedArgs...) + transactionalClause := "" + if transactionalTable { + transactionalClause = "lock in share mode" + } result = fmt.Sprintf(` insert /* gh-osc %s.%s */ ignore into %s.%s (%s) (select %s from %s.%s force index (%s) - where (%s and %s) + where (%s and %s) %s ) `, databaseName, originalTableName, databaseName, ghostTableName, sharedColumnsListing, sharedColumnsListing, databaseName, originalTableName, uniqueKey, - rangeStartComparison, rangeEndComparison) + rangeStartComparison, rangeEndComparison, transactionalClause) return result, explodedArgs, nil } -func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, uniqueKey string, uniqueKeyColumns []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool) (result string, explodedArgs []interface{}, err error) { +func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, uniqueKey string, uniqueKeyColumns []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) { rangeStartValues := make([]string, len(uniqueKeyColumns), len(uniqueKeyColumns)) rangeEndValues := make([]string, len(uniqueKeyColumns), len(uniqueKeyColumns)) for i := range uniqueKeyColumns { rangeStartValues[i] = "?" rangeEndValues[i] = "?" } - return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues) + return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable) } -func BuildUniqueKeyRangeEndPreparedQuery(databaseName, tableName string, uniqueKeyColumns []string, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int, hint string) (result string, explodedArgs []interface{}, err error) { +func BuildUniqueKeyRangeEndPreparedQuery(databaseName, tableName string, uniqueKeyColumns []string, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, hint string) (result string, explodedArgs []interface{}, err error) { if len(uniqueKeyColumns) == 0 { return "", explodedArgs, fmt.Errorf("Got 0 columns in BuildUniqueKeyRangeEndPreparedQuery") }