package client import ( "context" "math" "math/rand" "sync" "time" "github.com/pingcap/errors" ) /* Pool for efficient reuse of connections. Usage: pool := client.NewPool(log.Debugf, 100, 400, 5, `127.0.0.1:3306`, `username`, `userpwd`, `dbname`) ... conn, _ := pool.GetConn(ctx) defer pool.PutConn(conn) conn.Execute/conn.Begin/etc... */ type ( Timestamp int64 LogFunc func(format string, args ...interface{}) Pool struct { logFunc LogFunc minAlive int maxAlive int maxIdle int idleCloseTimeout Timestamp idlePingTimeout Timestamp connect func() (*Conn, error) synchro struct { sync.Mutex idleConnections []Connection stats ConnectionStats } readyConnection chan Connection } ConnectionStats struct { // Uses internally TotalCount int // Only for stats IdleCount int CreatedCount int64 } Connection struct { conn *Conn lastUseAt Timestamp } ) var ( // MaxIdleTimeoutWithoutPing - If the connection has been idle for more than this time, // then ping will be performed before use to check if it alive MaxIdleTimeoutWithoutPing = 10 * time.Second // DefaultIdleTimeout - If the connection has been idle for more than this time, // we can close it (but we should remember about Pool.minAlive) DefaultIdleTimeout = 30 * time.Second // MaxNewConnectionAtOnce - If we need to create new connections, // then we will create no more than this number of connections at a time. // This restriction will be ignored on pool initialization. MaxNewConnectionAtOnce = 5 ) // NewPool initializes new connection pool and uses params: addr, user, password, dbName and options. // minAlive specifies the minimum number of open connections that the pool will try to maintain. // maxAlive specifies the maximum number of open connections // (for internal reasons, may be greater by 1 inside newConnectionProducer). // maxIdle specifies the maximum number of idle connections (see DefaultIdleTimeout). func NewPool( logFunc LogFunc, minAlive int, maxAlive int, maxIdle int, addr string, user string, password string, dbName string, options ...func(conn *Conn), ) *Pool { if minAlive > maxAlive { minAlive = maxAlive } if maxIdle > maxAlive { maxIdle = maxAlive } if maxIdle <= minAlive { maxIdle = minAlive } pool := &Pool{ logFunc: logFunc, minAlive: minAlive, maxAlive: maxAlive, maxIdle: maxIdle, idleCloseTimeout: Timestamp(math.Ceil(DefaultIdleTimeout.Seconds())), idlePingTimeout: Timestamp(math.Ceil(MaxIdleTimeoutWithoutPing.Seconds())), connect: func() (*Conn, error) { return Connect(addr, user, password, dbName, options...) }, readyConnection: make(chan Connection), } pool.synchro.idleConnections = make([]Connection, 0, pool.maxIdle) go pool.newConnectionProducer() if pool.minAlive > 0 { pool.logFunc(`Pool: Setup %d new connections (minimal pool size)...`, pool.minAlive) pool.startNewConnections(pool.minAlive) } go pool.closeOldIdleConnections() return pool } func (pool *Pool) GetStats(stats *ConnectionStats) { pool.synchro.Lock() *stats = pool.synchro.stats stats.IdleCount = len(pool.synchro.idleConnections) pool.synchro.Unlock() } // GetConn returns connection from the pool or create new func (pool *Pool) GetConn(ctx context.Context) (*Conn, error) { for { connection, err := pool.getConnection(ctx) if err != nil { return nil, err } // For long time idle connections, we do a ping check if delta := pool.nowTs() - connection.lastUseAt; delta > pool.idlePingTimeout { if err := pool.ping(connection.conn); err != nil { pool.closeConn(connection.conn) continue } } return connection.conn, nil } } // PutConn returns working connection back to pool func (pool *Pool) PutConn(conn *Conn) { pool.putConnection(Connection{ conn: conn, lastUseAt: pool.nowTs(), }) } // DropConn closes the connection without any checks func (pool *Pool) DropConn(conn *Conn) { pool.closeConn(conn) } func (pool *Pool) putConnection(connection Connection) { pool.synchro.Lock() defer pool.synchro.Unlock() // If someone is already waiting for a connection, then we return it to him select { case pool.readyConnection <- connection: return default: } // Nobody needs this connection pool.putConnectionUnsafe(connection) } func (pool *Pool) nowTs() Timestamp { return Timestamp(time.Now().Unix()) } func (pool *Pool) getConnection(ctx context.Context) (Connection, error) { pool.synchro.Lock() connection := pool.getIdleConnectionUnsafe() if connection.conn != nil { pool.synchro.Unlock() return connection, nil } pool.synchro.Unlock() // No idle connections are available select { case connection := <-pool.readyConnection: return connection, nil case <-ctx.Done(): return Connection{}, ctx.Err() } } func (pool *Pool) putConnectionUnsafe(connection Connection) { if len(pool.synchro.idleConnections) == cap(pool.synchro.idleConnections) { pool.synchro.stats.TotalCount-- _ = connection.conn.Close() // Could it be more effective to close older connections? } else { pool.synchro.idleConnections = append(pool.synchro.idleConnections, connection) } } func (pool *Pool) newConnectionProducer() { var connection Connection var err error for { connection.conn = nil pool.synchro.Lock() connection = pool.getIdleConnectionUnsafe() if connection.conn == nil { if pool.synchro.stats.TotalCount >= pool.maxAlive { // Can't create more connections pool.synchro.Unlock() time.Sleep(10 * time.Millisecond) continue } pool.synchro.stats.TotalCount++ // "Reserving" new connection } pool.synchro.Unlock() if connection.conn == nil { connection, err = pool.createNewConnection() if err != nil { pool.synchro.Lock() pool.synchro.stats.TotalCount-- // Bad luck, should try again pool.synchro.Unlock() time.Sleep(time.Duration(10+rand.Intn(90)) * time.Millisecond) continue } } pool.readyConnection <- connection } } func (pool *Pool) createNewConnection() (Connection, error) { var connection Connection var err error connection.conn, err = pool.connect() if err != nil { return Connection{}, errors.Errorf(`Could not connect to mysql: %s`, err) } connection.lastUseAt = pool.nowTs() pool.synchro.Lock() pool.synchro.stats.CreatedCount++ pool.synchro.Unlock() return connection, nil } func (pool *Pool) getIdleConnectionUnsafe() Connection { cnt := len(pool.synchro.idleConnections) if cnt == 0 { return Connection{} } last := cnt - 1 connection := pool.synchro.idleConnections[last] pool.synchro.idleConnections[last].conn = nil pool.synchro.idleConnections = pool.synchro.idleConnections[:last] return connection } func (pool *Pool) closeOldIdleConnections() { var toPing []Connection ticker := time.NewTicker(5 * time.Second) for range ticker.C { toPing = pool.getOldIdleConnections(toPing[:0]) if len(toPing) == 0 { continue } pool.recheckConnections(toPing) if !pool.spawnConnectionsIfNeeded() { pool.closeIdleConnectionsIfCan() } } } func (pool *Pool) getOldIdleConnections(dst []Connection) []Connection { dst = dst[:0] pool.synchro.Lock() synchro := &pool.synchro idleCnt := len(synchro.idleConnections) checkBefore := pool.nowTs() - pool.idlePingTimeout for i := idleCnt - 1; i >= 0; i-- { if synchro.idleConnections[i].lastUseAt > checkBefore { continue } dst = append(dst, synchro.idleConnections[i]) last := idleCnt - 1 if i < last { // Removing an item from the middle of a slice synchro.idleConnections[i], synchro.idleConnections[last] = synchro.idleConnections[last], synchro.idleConnections[i] } synchro.idleConnections[last].conn = nil synchro.idleConnections = synchro.idleConnections[:last] idleCnt-- } pool.synchro.Unlock() return dst } func (pool *Pool) recheckConnections(connections []Connection) { const workerCnt = 2 // Heuristic :) queue := make(chan Connection, len(connections)) for _, connection := range connections { queue <- connection } close(queue) var wg sync.WaitGroup wg.Add(workerCnt) for worker := 0; worker < workerCnt; worker++ { go func() { defer wg.Done() for connection := range queue { if err := pool.ping(connection.conn); err != nil { pool.closeConn(connection.conn) } else { pool.putConnection(connection) } } }() } wg.Wait() } // spawnConnectionsIfNeeded creates new connections if there are not enough of them and returns true in this case func (pool *Pool) spawnConnectionsIfNeeded() bool { pool.synchro.Lock() totalCount := pool.synchro.stats.TotalCount idleCount := len(pool.synchro.idleConnections) needSpanNew := pool.minAlive - totalCount pool.synchro.Unlock() if needSpanNew <= 0 { return false } // Не хватает соединений, нужно создать еще if needSpanNew > MaxNewConnectionAtOnce { needSpanNew = MaxNewConnectionAtOnce } pool.logFunc(`Pool: Setup %d new connections (total: %d idle: %d)...`, needSpanNew, totalCount, idleCount) pool.startNewConnections(needSpanNew) return true } func (pool *Pool) closeIdleConnectionsIfCan() { pool.synchro.Lock() canCloseCnt := pool.synchro.stats.TotalCount - pool.minAlive canCloseCnt-- // -1 to account for an open but unused connection (pool.readyConnection <- connection in newConnectionProducer) idleCnt := len(pool.synchro.idleConnections) inFly := pool.synchro.stats.TotalCount - idleCnt // We can close no more than 10% connections at a time, but at least 1, if possible idleCanCloseCnt := idleCnt / 10 if idleCanCloseCnt == 0 { idleCanCloseCnt = 1 } if canCloseCnt > idleCanCloseCnt { canCloseCnt = idleCanCloseCnt } if canCloseCnt <= 0 { pool.synchro.Unlock() return } closeFromIdx := idleCnt - canCloseCnt if closeFromIdx < 0 { // If there are enough requests in the "flight" now, then we can close all unnecessary closeFromIdx = 0 } toClose := append([]Connection{}, pool.synchro.idleConnections[closeFromIdx:]...) for i := closeFromIdx; i < idleCnt; i++ { pool.synchro.idleConnections[i].conn = nil } pool.synchro.idleConnections = pool.synchro.idleConnections[:closeFromIdx] pool.synchro.Unlock() pool.logFunc(`Pool: Close %d idle connections (in fly %d)`, len(toClose), inFly) for _, connection := range toClose { pool.closeConn(connection.conn) } } func (pool *Pool) closeConn(conn *Conn) { pool.synchro.Lock() pool.synchro.stats.TotalCount-- pool.synchro.Unlock() _ = conn.Close() // Closing is not an instant action, so do it outside the lock } func (pool *Pool) startNewConnections(count int) { connections := make([]Connection, 0, count) for i := 0; i < count; i++ { if conn, err := pool.createNewConnection(); err == nil { pool.synchro.Lock() pool.synchro.stats.TotalCount++ pool.synchro.Unlock() connections = append(connections, conn) } } pool.synchro.Lock() for _, connection := range connections { pool.putConnectionUnsafe(connection) } pool.synchro.Unlock() } func (pool *Pool) ping(conn *Conn) error { deadline := time.Now().Add(100 * time.Millisecond) _ = conn.SetWriteDeadline(deadline) _ = conn.SetReadDeadline(deadline) err := conn.Ping() if err != nil { pool.logFunc(`Pool: ping query fail: %s`, err.Error()) } return err }