diff --git a/vendor/github.com/ngaut/deadline/rw.go b/vendor/github.com/ngaut/deadline/rw.go new file mode 100644 index 0000000..19d4368 --- /dev/null +++ b/vendor/github.com/ngaut/deadline/rw.go @@ -0,0 +1,50 @@ +package deadline + +import ( + "io" + "time" +) + +type DeadlineReader interface { + io.Reader + SetReadDeadline(t time.Time) error +} + +type DeadlineWriter interface { + io.Writer + SetWriteDeadline(t time.Time) error +} + +type DeadlineReadWriter interface { + io.ReadWriter + SetReadDeadline(t time.Time) error + SetWriteDeadline(t time.Time) error +} + +type deadlineReader struct { + DeadlineReader + timeout time.Duration +} + +func (r *deadlineReader) Read(p []byte) (int, error) { + r.DeadlineReader.SetReadDeadline(time.Now().Add(r.timeout)) + return r.DeadlineReader.Read(p) +} + +func NewDeadlineReader(r DeadlineReader, timeout time.Duration) io.Reader { + return &deadlineReader{DeadlineReader: r, timeout: timeout} +} + +type deadlineWriter struct { + DeadlineWriter + timeout time.Duration +} + +func (r *deadlineWriter) Write(p []byte) (int, error) { + r.DeadlineWriter.SetWriteDeadline(time.Now().Add(r.timeout)) + return r.DeadlineWriter.Write(p) +} + +func NewDeadlineWriter(r DeadlineWriter, timeout time.Duration) io.Writer { + return &deadlineWriter{DeadlineWriter: r, timeout: timeout} +} diff --git a/vendor/github.com/ngaut/log/LICENSE b/vendor/github.com/ngaut/log/LICENSE new file mode 100644 index 0000000..6600f1c --- /dev/null +++ b/vendor/github.com/ngaut/log/LICENSE @@ -0,0 +1,165 @@ +GNU LESSER GENERAL PUBLIC LICENSE + Version 3, 29 June 2007 + + Copyright (C) 2007 Free Software Foundation, Inc. + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + + This version of the GNU Lesser General Public License incorporates +the terms and conditions of version 3 of the GNU General Public +License, supplemented by the additional permissions listed below. + + 0. Additional Definitions. + + As used herein, "this License" refers to version 3 of the GNU Lesser +General Public License, and the "GNU GPL" refers to version 3 of the GNU +General Public License. + + "The Library" refers to a covered work governed by this License, +other than an Application or a Combined Work as defined below. + + An "Application" is any work that makes use of an interface provided +by the Library, but which is not otherwise based on the Library. +Defining a subclass of a class defined by the Library is deemed a mode +of using an interface provided by the Library. + + A "Combined Work" is a work produced by combining or linking an +Application with the Library. The particular version of the Library +with which the Combined Work was made is also called the "Linked +Version". + + The "Minimal Corresponding Source" for a Combined Work means the +Corresponding Source for the Combined Work, excluding any source code +for portions of the Combined Work that, considered in isolation, are +based on the Application, and not on the Linked Version. + + The "Corresponding Application Code" for a Combined Work means the +object code and/or source code for the Application, including any data +and utility programs needed for reproducing the Combined Work from the +Application, but excluding the System Libraries of the Combined Work. + + 1. Exception to Section 3 of the GNU GPL. + + You may convey a covered work under sections 3 and 4 of this License +without being bound by section 3 of the GNU GPL. + + 2. Conveying Modified Versions. + + If you modify a copy of the Library, and, in your modifications, a +facility refers to a function or data to be supplied by an Application +that uses the facility (other than as an argument passed when the +facility is invoked), then you may convey a copy of the modified +version: + + a) under this License, provided that you make a good faith effort to + ensure that, in the event an Application does not supply the + function or data, the facility still operates, and performs + whatever part of its purpose remains meaningful, or + + b) under the GNU GPL, with none of the additional permissions of + this License applicable to that copy. + + 3. Object Code Incorporating Material from Library Header Files. + + The object code form of an Application may incorporate material from +a header file that is part of the Library. You may convey such object +code under terms of your choice, provided that, if the incorporated +material is not limited to numerical parameters, data structure +layouts and accessors, or small macros, inline functions and templates +(ten or fewer lines in length), you do both of the following: + + a) Give prominent notice with each copy of the object code that the + Library is used in it and that the Library and its use are + covered by this License. + + b) Accompany the object code with a copy of the GNU GPL and this license + document. + + 4. Combined Works. + + You may convey a Combined Work under terms of your choice that, +taken together, effectively do not restrict modification of the +portions of the Library contained in the Combined Work and reverse +engineering for debugging such modifications, if you also do each of +the following: + + a) Give prominent notice with each copy of the Combined Work that + the Library is used in it and that the Library and its use are + covered by this License. + + b) Accompany the Combined Work with a copy of the GNU GPL and this license + document. + + c) For a Combined Work that displays copyright notices during + execution, include the copyright notice for the Library among + these notices, as well as a reference directing the user to the + copies of the GNU GPL and this license document. + + d) Do one of the following: + + 0) Convey the Minimal Corresponding Source under the terms of this + License, and the Corresponding Application Code in a form + suitable for, and under terms that permit, the user to + recombine or relink the Application with a modified version of + the Linked Version to produce a modified Combined Work, in the + manner specified by section 6 of the GNU GPL for conveying + Corresponding Source. + + 1) Use a suitable shared library mechanism for linking with the + Library. A suitable mechanism is one that (a) uses at run time + a copy of the Library already present on the user's computer + system, and (b) will operate properly with a modified version + of the Library that is interface-compatible with the Linked + Version. + + e) Provide Installation Information, but only if you would otherwise + be required to provide such information under section 6 of the + GNU GPL, and only to the extent that such information is + necessary to install and execute a modified version of the + Combined Work produced by recombining or relinking the + Application with a modified version of the Linked Version. (If + you use option 4d0, the Installation Information must accompany + the Minimal Corresponding Source and Corresponding Application + Code. If you use option 4d1, you must provide the Installation + Information in the manner specified by section 6 of the GNU GPL + for conveying Corresponding Source.) + + 5. Combined Libraries. + + You may place library facilities that are a work based on the +Library side by side in a single library together with other library +facilities that are not Applications and are not covered by this +License, and convey such a combined library under terms of your +choice, if you do both of the following: + + a) Accompany the combined library with a copy of the same work based + on the Library, uncombined with any other library facilities, + conveyed under the terms of this License. + + b) Give prominent notice with the combined library that part of it + is a work based on the Library, and explaining where to find the + accompanying uncombined form of the same work. + + 6. Revised Versions of the GNU Lesser General Public License. + + The Free Software Foundation may publish revised and/or new versions +of the GNU Lesser General Public License from time to time. Such new +versions will be similar in spirit to the present version, but may +differ in detail to address new problems or concerns. + + Each version is given a distinguishing version number. If the +Library as you received it specifies that a certain numbered version +of the GNU Lesser General Public License "or any later version" +applies to it, you have the option of following the terms and +conditions either of that published version or of any later version +published by the Free Software Foundation. If the Library as you +received it does not specify a version number of the GNU Lesser +General Public License, you may choose any version of the GNU Lesser +General Public License ever published by the Free Software Foundation. + + If the Library as you received it specifies that a proxy can decide +whether future versions of the GNU Lesser General Public License shall +apply, that proxy's public statement of acceptance of any version is +permanent authorization for you to choose that version for the +Library. diff --git a/vendor/github.com/ngaut/log/README.md b/vendor/github.com/ngaut/log/README.md new file mode 100644 index 0000000..e0e857e --- /dev/null +++ b/vendor/github.com/ngaut/log/README.md @@ -0,0 +1,2 @@ +logging +======= diff --git a/vendor/github.com/ngaut/log/crash_unix.go b/vendor/github.com/ngaut/log/crash_unix.go new file mode 100644 index 0000000..37f407d --- /dev/null +++ b/vendor/github.com/ngaut/log/crash_unix.go @@ -0,0 +1,18 @@ +// +build freebsd openbsd netbsd dragonfly darwin linux + +package log + +import ( + "log" + "os" + "syscall" +) + +func CrashLog(file string) { + f, err := os.OpenFile(file, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) + if err != nil { + log.Println(err.Error()) + } else { + syscall.Dup2(int(f.Fd()), 2) + } +} diff --git a/vendor/github.com/ngaut/log/crash_win.go b/vendor/github.com/ngaut/log/crash_win.go new file mode 100644 index 0000000..7d612ee --- /dev/null +++ b/vendor/github.com/ngaut/log/crash_win.go @@ -0,0 +1,37 @@ +// +build windows + +package log + +import ( + "log" + "os" + "syscall" +) + +var ( + kernel32 = syscall.MustLoadDLL("kernel32.dll") + procSetStdHandle = kernel32.MustFindProc("SetStdHandle") +) + +func setStdHandle(stdhandle int32, handle syscall.Handle) error { + r0, _, e1 := syscall.Syscall(procSetStdHandle.Addr(), 2, uintptr(stdhandle), uintptr(handle), 0) + if r0 == 0 { + if e1 != 0 { + return error(e1) + } + return syscall.EINVAL + } + return nil +} + +func CrashLog(file string) { + f, err := os.OpenFile(file, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) + if err != nil { + log.Println(err.Error()) + } else { + err = setStdHandle(syscall.STD_ERROR_HANDLE, syscall.Handle(f.Fd())) + if err != nil { + log.Println(err.Error()) + } + } +} diff --git a/vendor/github.com/ngaut/log/log.go b/vendor/github.com/ngaut/log/log.go new file mode 100644 index 0000000..896b393 --- /dev/null +++ b/vendor/github.com/ngaut/log/log.go @@ -0,0 +1,380 @@ +//high level log wrapper, so it can output different log based on level +package log + +import ( + "fmt" + "io" + "log" + "os" + "runtime" + "sync" + "time" +) + +const ( + Ldate = log.Ldate + Llongfile = log.Llongfile + Lmicroseconds = log.Lmicroseconds + Lshortfile = log.Lshortfile + LstdFlags = log.LstdFlags + Ltime = log.Ltime +) + +type ( + LogLevel int + LogType int +) + +const ( + LOG_FATAL = LogType(0x1) + LOG_ERROR = LogType(0x2) + LOG_WARNING = LogType(0x4) + LOG_INFO = LogType(0x8) + LOG_DEBUG = LogType(0x10) +) + +const ( + LOG_LEVEL_NONE = LogLevel(0x0) + LOG_LEVEL_FATAL = LOG_LEVEL_NONE | LogLevel(LOG_FATAL) + LOG_LEVEL_ERROR = LOG_LEVEL_FATAL | LogLevel(LOG_ERROR) + LOG_LEVEL_WARN = LOG_LEVEL_ERROR | LogLevel(LOG_WARNING) + LOG_LEVEL_INFO = LOG_LEVEL_WARN | LogLevel(LOG_INFO) + LOG_LEVEL_DEBUG = LOG_LEVEL_INFO | LogLevel(LOG_DEBUG) + LOG_LEVEL_ALL = LOG_LEVEL_DEBUG +) + +const FORMAT_TIME_DAY string = "20060102" +const FORMAT_TIME_HOUR string = "2006010215" + +var _log *logger = New() + +func init() { + SetFlags(Ldate | Ltime | Lshortfile) + SetHighlighting(runtime.GOOS != "windows") +} + +func Logger() *log.Logger { + return _log._log +} + +func SetLevel(level LogLevel) { + _log.SetLevel(level) +} +func GetLogLevel() LogLevel { + return _log.level +} + +func SetOutput(out io.Writer) { + _log.SetOutput(out) +} + +func SetOutputByName(path string) error { + return _log.SetOutputByName(path) +} + +func SetFlags(flags int) { + _log._log.SetFlags(flags) +} + +func Info(v ...interface{}) { + _log.Info(v...) +} + +func Infof(format string, v ...interface{}) { + _log.Infof(format, v...) +} + +func Debug(v ...interface{}) { + _log.Debug(v...) +} + +func Debugf(format string, v ...interface{}) { + _log.Debugf(format, v...) +} + +func Warn(v ...interface{}) { + _log.Warning(v...) +} + +func Warnf(format string, v ...interface{}) { + _log.Warningf(format, v...) +} + +func Warning(v ...interface{}) { + _log.Warning(v...) +} + +func Warningf(format string, v ...interface{}) { + _log.Warningf(format, v...) +} + +func Error(v ...interface{}) { + _log.Error(v...) +} + +func Errorf(format string, v ...interface{}) { + _log.Errorf(format, v...) +} + +func Fatal(v ...interface{}) { + _log.Fatal(v...) +} + +func Fatalf(format string, v ...interface{}) { + _log.Fatalf(format, v...) +} + +func SetLevelByString(level string) { + _log.SetLevelByString(level) +} + +func SetHighlighting(highlighting bool) { + _log.SetHighlighting(highlighting) +} + +func SetRotateByDay() { + _log.SetRotateByDay() +} + +func SetRotateByHour() { + _log.SetRotateByHour() +} + +type logger struct { + _log *log.Logger + level LogLevel + highlighting bool + + dailyRolling bool + hourRolling bool + + fileName string + logSuffix string + fd *os.File + + lock sync.Mutex +} + +func (l *logger) SetHighlighting(highlighting bool) { + l.highlighting = highlighting +} + +func (l *logger) SetLevel(level LogLevel) { + l.level = level +} + +func (l *logger) SetLevelByString(level string) { + l.level = StringToLogLevel(level) +} + +func (l *logger) SetRotateByDay() { + l.dailyRolling = true + l.logSuffix = genDayTime(time.Now()) +} + +func (l *logger) SetRotateByHour() { + l.hourRolling = true + l.logSuffix = genHourTime(time.Now()) +} + +func (l *logger) rotate() error { + l.lock.Lock() + defer l.lock.Unlock() + + var suffix string + if l.dailyRolling { + suffix = genDayTime(time.Now()) + } else if l.hourRolling { + suffix = genHourTime(time.Now()) + } else { + return nil + } + + // Notice: if suffix is not equal to l.LogSuffix, then rotate + if suffix != l.logSuffix { + err := l.doRotate(suffix) + if err != nil { + return err + } + } + + return nil +} + +func (l *logger) doRotate(suffix string) error { + // Notice: Not check error, is this ok? + l.fd.Close() + + lastFileName := l.fileName + "." + l.logSuffix + err := os.Rename(l.fileName, lastFileName) + if err != nil { + return err + } + + err = l.SetOutputByName(l.fileName) + if err != nil { + return err + } + + l.logSuffix = suffix + + return nil +} + +func (l *logger) SetOutput(out io.Writer) { + l._log = log.New(out, l._log.Prefix(), l._log.Flags()) +} + +func (l *logger) SetOutputByName(path string) error { + f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666) + if err != nil { + log.Fatal(err) + } + + l.SetOutput(f) + + l.fileName = path + l.fd = f + + return err +} + +func (l *logger) log(t LogType, v ...interface{}) { + if l.level|LogLevel(t) != l.level { + return + } + + err := l.rotate() + if err != nil { + fmt.Fprintf(os.Stderr, "%s\n", err.Error()) + return + } + + v1 := make([]interface{}, len(v)+2) + logStr, logColor := LogTypeToString(t) + if l.highlighting { + v1[0] = "\033" + logColor + "m[" + logStr + "]" + copy(v1[1:], v) + v1[len(v)+1] = "\033[0m" + } else { + v1[0] = "[" + logStr + "]" + copy(v1[1:], v) + v1[len(v)+1] = "" + } + + s := fmt.Sprintln(v1...) + l._log.Output(4, s) +} + +func (l *logger) logf(t LogType, format string, v ...interface{}) { + if l.level|LogLevel(t) != l.level { + return + } + + err := l.rotate() + if err != nil { + fmt.Fprintf(os.Stderr, "%s\n", err.Error()) + return + } + + logStr, logColor := LogTypeToString(t) + var s string + if l.highlighting { + s = "\033" + logColor + "m[" + logStr + "] " + fmt.Sprintf(format, v...) + "\033[0m" + } else { + s = "[" + logStr + "] " + fmt.Sprintf(format, v...) + } + l._log.Output(4, s) +} + +func (l *logger) Fatal(v ...interface{}) { + l.log(LOG_FATAL, v...) + os.Exit(-1) +} + +func (l *logger) Fatalf(format string, v ...interface{}) { + l.logf(LOG_FATAL, format, v...) + os.Exit(-1) +} + +func (l *logger) Error(v ...interface{}) { + l.log(LOG_ERROR, v...) +} + +func (l *logger) Errorf(format string, v ...interface{}) { + l.logf(LOG_ERROR, format, v...) +} + +func (l *logger) Warning(v ...interface{}) { + l.log(LOG_WARNING, v...) +} + +func (l *logger) Warningf(format string, v ...interface{}) { + l.logf(LOG_WARNING, format, v...) +} + +func (l *logger) Debug(v ...interface{}) { + l.log(LOG_DEBUG, v...) +} + +func (l *logger) Debugf(format string, v ...interface{}) { + l.logf(LOG_DEBUG, format, v...) +} + +func (l *logger) Info(v ...interface{}) { + l.log(LOG_INFO, v...) +} + +func (l *logger) Infof(format string, v ...interface{}) { + l.logf(LOG_INFO, format, v...) +} + +func StringToLogLevel(level string) LogLevel { + switch level { + case "fatal": + return LOG_LEVEL_FATAL + case "error": + return LOG_LEVEL_ERROR + case "warn": + return LOG_LEVEL_WARN + case "warning": + return LOG_LEVEL_WARN + case "debug": + return LOG_LEVEL_DEBUG + case "info": + return LOG_LEVEL_INFO + } + return LOG_LEVEL_ALL +} + +func LogTypeToString(t LogType) (string, string) { + switch t { + case LOG_FATAL: + return "fatal", "[0;31" + case LOG_ERROR: + return "error", "[0;31" + case LOG_WARNING: + return "warning", "[0;33" + case LOG_DEBUG: + return "debug", "[0;36" + case LOG_INFO: + return "info", "[0;37" + } + return "unknown", "[0;37" +} + +func genDayTime(t time.Time) string { + return t.Format(FORMAT_TIME_DAY) +} + +func genHourTime(t time.Time) string { + return t.Format(FORMAT_TIME_HOUR) +} + +func New() *logger { + return Newlogger(os.Stderr, "") +} + +func Newlogger(w io.Writer, prefix string) *logger { + return &logger{_log: log.New(w, prefix, LstdFlags), level: LOG_LEVEL_ALL, highlighting: true} +} diff --git a/vendor/github.com/ngaut/log/log_test.go b/vendor/github.com/ngaut/log/log_test.go new file mode 100644 index 0000000..adbcfb3 --- /dev/null +++ b/vendor/github.com/ngaut/log/log_test.go @@ -0,0 +1,197 @@ +package log + +import ( + "bufio" + "fmt" + "io" + "os" + "strings" + "testing" + "time" +) + +func isFileExists(name string) bool { + f, err := os.Stat(name) + if err != nil { + if os.IsNotExist(err) { + return false + } + } + + if f.IsDir() { + return false + } + + return true +} + +func parseDate(value string, format string) (time.Time, error) { + tt, err := time.ParseInLocation(format, value, time.Local) + if err != nil { + fmt.Println("[Error]" + err.Error()) + return tt, err + } + + return tt, nil +} + +func checkLogData(fileName string, containData string, num int64) error { + input, err := os.OpenFile(fileName, os.O_RDONLY, 0) + if err != nil { + return err + } + defer input.Close() + + var lineNum int64 + br := bufio.NewReader(input) + for { + line, err := br.ReadString('\n') + if err == io.EOF { + break + } + + realLine := strings.TrimRight(line, "\n") + if strings.Contains(realLine, containData) { + lineNum += 1 + } + } + + // check whether num is equal to lineNum + if lineNum != num { + return fmt.Errorf("checkLogData fail - %d vs %d", lineNum, num) + } + + return nil +} + +func TestDayRotateCase(t *testing.T) { + _log = New() + + logName := "example_day_test.log" + if isFileExists(logName) { + err := os.Remove(logName) + if err != nil { + t.Errorf("Remove old log file fail - %s, %s\n", err.Error(), logName) + } + } + + SetRotateByDay() + err := SetOutputByName(logName) + if err != nil { + t.Errorf("SetOutputByName fail - %s, %s\n", err.Error(), logName) + } + + if _log.logSuffix == "" { + t.Errorf("bad log suffix fail - %s\n", _log.logSuffix) + } + + day, err := parseDate(_log.logSuffix, FORMAT_TIME_DAY) + if err != nil { + t.Errorf("parseDate fail - %s, %s\n", err.Error(), _log.logSuffix) + } + + _log.Info("Test data") + _log.Infof("Test data - %s", day.String()) + + // mock log suffix to check rotate + lastDay := day.AddDate(0, 0, -1) + _log.logSuffix = genDayTime(lastDay) + oldLogSuffix := _log.logSuffix + + _log.Info("Test new data") + _log.Infof("Test new data - %s", day.String()) + + err = _log.fd.Close() + if err != nil { + t.Errorf("close log fd fail - %s, %s\n", err.Error(), _log.fileName) + } + + // check both old and new log file datas + oldLogName := logName + "." + oldLogSuffix + err = checkLogData(oldLogName, "Test data", 2) + if err != nil { + t.Errorf("old log file checkLogData fail - %s, %s\n", err.Error(), oldLogName) + } + + err = checkLogData(logName, "Test new data", 2) + if err != nil { + t.Errorf("new log file checkLogData fail - %s, %s\n", err.Error(), logName) + } + + // remove test log files + err = os.Remove(oldLogName) + if err != nil { + t.Errorf("Remove final old log file fail - %s, %s\n", err.Error(), oldLogName) + } + + err = os.Remove(logName) + if err != nil { + t.Errorf("Remove final new log file fail - %s, %s\n", err.Error(), logName) + } +} + +func TestHourRotateCase(t *testing.T) { + _log = New() + + logName := "example_hour_test.log" + if isFileExists(logName) { + err := os.Remove(logName) + if err != nil { + t.Errorf("Remove old log file fail - %s, %s\n", err.Error(), logName) + } + } + + SetRotateByHour() + err := SetOutputByName(logName) + if err != nil { + t.Errorf("SetOutputByName fail - %s, %s\n", err.Error(), logName) + } + + if _log.logSuffix == "" { + t.Errorf("bad log suffix fail - %s\n", _log.logSuffix) + } + + hour, err := parseDate(_log.logSuffix, FORMAT_TIME_HOUR) + if err != nil { + t.Errorf("parseDate fail - %s, %s\n", err.Error(), _log.logSuffix) + } + + _log.Info("Test data") + _log.Infof("Test data - %s", hour.String()) + + // mock log suffix to check rotate + lastHour := hour.Add(time.Duration(-1 * time.Hour)) + _log.logSuffix = genHourTime(lastHour) + oldLogSuffix := _log.logSuffix + + _log.Info("Test new data") + _log.Infof("Test new data - %s", hour.String()) + + err = _log.fd.Close() + if err != nil { + t.Errorf("close log fd fail - %s, %s\n", err.Error(), _log.fileName) + } + + // check both old and new log file datas + oldLogName := logName + "." + oldLogSuffix + err = checkLogData(oldLogName, "Test data", 2) + if err != nil { + t.Errorf("old log file checkLogData fail - %s, %s\n", err.Error(), oldLogName) + } + + err = checkLogData(logName, "Test new data", 2) + if err != nil { + t.Errorf("new log file checkLogData fail - %s, %s\n", err.Error(), logName) + } + + // remove test log files + err = os.Remove(oldLogName) + if err != nil { + t.Errorf("Remove final old log file fail - %s, %s\n", err.Error(), oldLogName) + } + + err = os.Remove(logName) + if err != nil { + t.Errorf("Remove final new log file fail - %s, %s\n", err.Error(), logName) + } +} diff --git a/vendor/github.com/ngaut/pools/id_pool.go b/vendor/github.com/ngaut/pools/id_pool.go new file mode 100644 index 0000000..31db606 --- /dev/null +++ b/vendor/github.com/ngaut/pools/id_pool.go @@ -0,0 +1,72 @@ +// Copyright 2014, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package pools + +import ( + "fmt" + "sync" +) + +// IDPool is used to ensure that the set of IDs in use concurrently never +// contains any duplicates. The IDs start at 1 and increase without bound, but +// will never be larger than the peak number of concurrent uses. +// +// IDPool's Get() and Set() methods can be used concurrently. +type IDPool struct { + sync.Mutex + + // used holds the set of values that have been returned to us with Put(). + used map[uint32]bool + // maxUsed remembers the largest value we've given out. + maxUsed uint32 +} + +// NewIDPool creates and initializes an IDPool. +func NewIDPool() *IDPool { + return &IDPool{ + used: make(map[uint32]bool), + } +} + +// Get returns an ID that is unique among currently active users of this pool. +func (pool *IDPool) Get() (id uint32) { + pool.Lock() + defer pool.Unlock() + + // Pick a value that's been returned, if any. + for key, _ := range pool.used { + delete(pool.used, key) + return key + } + + // No recycled IDs are available, so increase the pool size. + pool.maxUsed += 1 + return pool.maxUsed +} + +// Put recycles an ID back into the pool for others to use. Putting back a value +// or 0, or a value that is not currently "checked out", will result in a panic +// because that should never happen except in the case of a programming error. +func (pool *IDPool) Put(id uint32) { + pool.Lock() + defer pool.Unlock() + + if id < 1 || id > pool.maxUsed { + panic(fmt.Errorf("IDPool.Put(%v): invalid value, must be in the range [1,%v]", id, pool.maxUsed)) + } + + if pool.used[id] { + panic(fmt.Errorf("IDPool.Put(%v): can't put value that was already recycled", id)) + } + + // If we're recycling maxUsed, just shrink the pool. + if id == pool.maxUsed { + pool.maxUsed = id - 1 + return + } + + // Add it to the set of recycled IDs. + pool.used[id] = true +} diff --git a/vendor/github.com/ngaut/pools/id_pool_test.go b/vendor/github.com/ngaut/pools/id_pool_test.go new file mode 100644 index 0000000..a437874 --- /dev/null +++ b/vendor/github.com/ngaut/pools/id_pool_test.go @@ -0,0 +1,118 @@ +// Copyright 2014, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package pools + +import ( + "reflect" + "strings" + "testing" +) + +func (pool *IDPool) want(want *IDPool, t *testing.T) { + if pool.maxUsed != want.maxUsed { + t.Errorf("pool.maxUsed = %#v, want %#v", pool.maxUsed, want.maxUsed) + } + + if !reflect.DeepEqual(pool.used, want.used) { + t.Errorf("pool.used = %#v, want %#v", pool.used, want.used) + } +} + +func TestIDPoolFirstGet(t *testing.T) { + pool := NewIDPool() + + if got := pool.Get(); got != 1 { + t.Errorf("pool.Get() = %v, want 1", got) + } + + pool.want(&IDPool{used: map[uint32]bool{}, maxUsed: 1}, t) +} + +func TestIDPoolSecondGet(t *testing.T) { + pool := NewIDPool() + pool.Get() + + if got := pool.Get(); got != 2 { + t.Errorf("pool.Get() = %v, want 2", got) + } + + pool.want(&IDPool{used: map[uint32]bool{}, maxUsed: 2}, t) +} + +func TestIDPoolPutToUsedSet(t *testing.T) { + pool := NewIDPool() + id1 := pool.Get() + pool.Get() + pool.Put(id1) + + pool.want(&IDPool{used: map[uint32]bool{1: true}, maxUsed: 2}, t) +} + +func TestIDPoolPutMaxUsed1(t *testing.T) { + pool := NewIDPool() + id1 := pool.Get() + pool.Put(id1) + + pool.want(&IDPool{used: map[uint32]bool{}, maxUsed: 0}, t) +} + +func TestIDPoolPutMaxUsed2(t *testing.T) { + pool := NewIDPool() + pool.Get() + id2 := pool.Get() + pool.Put(id2) + + pool.want(&IDPool{used: map[uint32]bool{}, maxUsed: 1}, t) +} + +func TestIDPoolGetFromUsedSet(t *testing.T) { + pool := NewIDPool() + id1 := pool.Get() + pool.Get() + pool.Put(id1) + + if got := pool.Get(); got != 1 { + t.Errorf("pool.Get() = %v, want 1", got) + } + + pool.want(&IDPool{used: map[uint32]bool{}, maxUsed: 2}, t) +} + +func wantError(want string, t *testing.T) { + rec := recover() + if rec == nil { + t.Errorf("expected panic, but there wasn't one") + } + err, ok := rec.(error) + if !ok || !strings.Contains(err.Error(), want) { + t.Errorf("wrong error, got '%v', want '%v'", err, want) + } +} + +func TestIDPoolPut0(t *testing.T) { + pool := NewIDPool() + pool.Get() + + defer wantError("invalid value", t) + pool.Put(0) +} + +func TestIDPoolPutInvalid(t *testing.T) { + pool := NewIDPool() + pool.Get() + + defer wantError("invalid value", t) + pool.Put(5) +} + +func TestIDPoolPutDuplicate(t *testing.T) { + pool := NewIDPool() + pool.Get() + pool.Get() + pool.Put(1) + + defer wantError("already recycled", t) + pool.Put(1) +} diff --git a/vendor/github.com/ngaut/pools/numbered.go b/vendor/github.com/ngaut/pools/numbered.go new file mode 100644 index 0000000..e170e03 --- /dev/null +++ b/vendor/github.com/ngaut/pools/numbered.go @@ -0,0 +1,149 @@ +// Copyright 2012, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package pools + +import ( + "fmt" + "sync" + "time" +) + +// Numbered allows you to manage resources by tracking them with numbers. +// There are no interface restrictions on what you can track. +type Numbered struct { + mu sync.Mutex + empty *sync.Cond // Broadcast when pool becomes empty + resources map[int64]*numberedWrapper +} + +type numberedWrapper struct { + val interface{} + inUse bool + purpose string + timeCreated time.Time + timeUsed time.Time +} + +func NewNumbered() *Numbered { + n := &Numbered{resources: make(map[int64]*numberedWrapper)} + n.empty = sync.NewCond(&n.mu) + return n +} + +// Register starts tracking a resource by the supplied id. +// It does not lock the object. +// It returns an error if the id already exists. +func (nu *Numbered) Register(id int64, val interface{}) error { + nu.mu.Lock() + defer nu.mu.Unlock() + if _, ok := nu.resources[id]; ok { + return fmt.Errorf("already present") + } + now := time.Now() + nu.resources[id] = &numberedWrapper{ + val: val, + timeCreated: now, + timeUsed: now, + } + return nil +} + +// Unregiester forgets the specified resource. +// If the resource is not present, it's ignored. +func (nu *Numbered) Unregister(id int64) { + nu.mu.Lock() + defer nu.mu.Unlock() + delete(nu.resources, id) + if len(nu.resources) == 0 { + nu.empty.Broadcast() + } +} + +// Get locks the resource for use. It accepts a purpose as a string. +// If it cannot be found, it returns a "not found" error. If in use, +// it returns a "in use: purpose" error. +func (nu *Numbered) Get(id int64, purpose string) (val interface{}, err error) { + nu.mu.Lock() + defer nu.mu.Unlock() + nw, ok := nu.resources[id] + if !ok { + return nil, fmt.Errorf("not found") + } + if nw.inUse { + return nil, fmt.Errorf("in use: %s", nw.purpose) + } + nw.inUse = true + nw.purpose = purpose + return nw.val, nil +} + +// Put unlocks a resource for someone else to use. +func (nu *Numbered) Put(id int64) { + nu.mu.Lock() + defer nu.mu.Unlock() + if nw, ok := nu.resources[id]; ok { + nw.inUse = false + nw.purpose = "" + nw.timeUsed = time.Now() + } +} + +// GetOutdated returns a list of resources that are older than age, and locks them. +// It does not return any resources that are already locked. +func (nu *Numbered) GetOutdated(age time.Duration, purpose string) (vals []interface{}) { + nu.mu.Lock() + defer nu.mu.Unlock() + now := time.Now() + for _, nw := range nu.resources { + if nw.inUse { + continue + } + if nw.timeCreated.Add(age).Sub(now) <= 0 { + nw.inUse = true + nw.purpose = purpose + vals = append(vals, nw.val) + } + } + return vals +} + +// GetIdle returns a list of resurces that have been idle for longer +// than timeout, and locks them. It does not return any resources that +// are already locked. +func (nu *Numbered) GetIdle(timeout time.Duration, purpose string) (vals []interface{}) { + nu.mu.Lock() + defer nu.mu.Unlock() + now := time.Now() + for _, nw := range nu.resources { + if nw.inUse { + continue + } + if nw.timeUsed.Add(timeout).Sub(now) <= 0 { + nw.inUse = true + nw.purpose = purpose + vals = append(vals, nw.val) + } + } + return vals +} + +// WaitForEmpty returns as soon as the pool becomes empty +func (nu *Numbered) WaitForEmpty() { + nu.mu.Lock() + defer nu.mu.Unlock() + for len(nu.resources) != 0 { + nu.empty.Wait() + } +} + +func (nu *Numbered) StatsJSON() string { + return fmt.Sprintf("{\"Size\": %v}", nu.Size()) +} + +func (nu *Numbered) Size() (size int64) { + nu.mu.Lock() + defer nu.mu.Unlock() + return int64(len(nu.resources)) +} diff --git a/vendor/github.com/ngaut/pools/numbered_test.go b/vendor/github.com/ngaut/pools/numbered_test.go new file mode 100644 index 0000000..54d5946 --- /dev/null +++ b/vendor/github.com/ngaut/pools/numbered_test.go @@ -0,0 +1,84 @@ +// Copyright 2012, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package pools + +import ( + "testing" + "time" +) + +func TestNumbered(t *testing.T) { + id := int64(0) + p := NewNumbered() + + var err error + if err = p.Register(id, id); err != nil { + t.Errorf("Error %v", err) + } + if err = p.Register(id, id); err.Error() != "already present" { + t.Errorf("want 'already present', got '%v'", err) + } + var v interface{} + if v, err = p.Get(id, "test"); err != nil { + t.Errorf("Error %v", err) + } + if v.(int64) != id { + t.Errorf("want %v, got %v", id, v.(int64)) + } + if v, err = p.Get(id, "test1"); err.Error() != "in use: test" { + t.Errorf("want 'in use: test', got '%v'", err) + } + p.Put(id) + if v, err = p.Get(1, "test2"); err.Error() != "not found" { + t.Errorf("want 'not found', got '%v'", err) + } + p.Unregister(1) // Should not fail + p.Unregister(0) + // p is now empty + + p.Register(id, id) + id++ + p.Register(id, id) + time.Sleep(300 * time.Millisecond) + id++ + p.Register(id, id) + time.Sleep(100 * time.Millisecond) + + // p has 0, 1, 2 (0 & 1 are aged) + vals := p.GetOutdated(200*time.Millisecond, "by outdated") + if len(vals) != 2 { + t.Errorf("want 2, got %v", len(vals)) + } + if v, err = p.Get(vals[0].(int64), "test1"); err.Error() != "in use: by outdated" { + t.Errorf("want 'in use: by outdated', got '%v'", err) + } + for _, v := range vals { + p.Put(v.(int64)) + } + time.Sleep(100 * time.Millisecond) + + // p has 0, 1, 2 (2 is idle) + vals = p.GetIdle(200*time.Millisecond, "by idle") + if len(vals) != 1 { + t.Errorf("want 1, got %v", len(vals)) + } + if v, err = p.Get(vals[0].(int64), "test1"); err.Error() != "in use: by idle" { + t.Errorf("want 'in use: by idle', got '%v'", err) + } + if vals[0].(int64) != 2 { + t.Errorf("want 2, got %v", vals[0]) + } + p.Unregister(vals[0].(int64)) + + // p has 0 & 1 + if p.Size() != 2 { + t.Errorf("want 2, got %v", p.Size()) + } + go func() { + p.Unregister(0) + p.Unregister(1) + }() + p.WaitForEmpty() +} diff --git a/vendor/github.com/ngaut/pools/resource_pool.go b/vendor/github.com/ngaut/pools/resource_pool.go new file mode 100644 index 0000000..b02cf04 --- /dev/null +++ b/vendor/github.com/ngaut/pools/resource_pool.go @@ -0,0 +1,228 @@ +// Copyright 2012, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package pools provides functionality to manage and reuse resources +// like connections. +package pools + +import ( + "fmt" + "time" + + "github.com/ngaut/sync2" +) + +var ( + CLOSED_ERR = fmt.Errorf("ResourcePool is closed") +) + +// Factory is a function that can be used to create a resource. +type Factory func() (Resource, error) + +// Every resource needs to suport the Resource interface. +// Thread synchronization between Close() and IsClosed() +// is the responsibility the caller. +type Resource interface { + Close() +} + +// ResourcePool allows you to use a pool of resources. +type ResourcePool struct { + resources chan resourceWrapper + factory Factory + capacity sync2.AtomicInt64 + idleTimeout sync2.AtomicDuration + + // stats + waitCount sync2.AtomicInt64 + waitTime sync2.AtomicDuration +} + +type resourceWrapper struct { + resource Resource + timeUsed time.Time +} + +// NewResourcePool creates a new ResourcePool pool. +// capacity is the initial capacity of the pool. +// maxCap is the maximum capacity. +// If a resource is unused beyond idleTimeout, it's discarded. +// An idleTimeout of 0 means that there is no timeout. +func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Duration) *ResourcePool { + if capacity <= 0 || maxCap <= 0 || capacity > maxCap { + panic(fmt.Errorf("Invalid/out of range capacity")) + } + rp := &ResourcePool{ + resources: make(chan resourceWrapper, maxCap), + factory: factory, + capacity: sync2.AtomicInt64(capacity), + idleTimeout: sync2.AtomicDuration(idleTimeout), + } + for i := 0; i < capacity; i++ { + rp.resources <- resourceWrapper{} + } + return rp +} + +// Close empties the pool calling Close on all its resources. +// You can call Close while there are outstanding resources. +// It waits for all resources to be returned (Put). +// After a Close, Get and TryGet are not allowed. +func (rp *ResourcePool) Close() { + rp.SetCapacity(0) +} + +func (rp *ResourcePool) IsClosed() (closed bool) { + return rp.capacity.Get() == 0 +} + +// Get will return the next available resource. If capacity +// has not been reached, it will create a new one using the factory. Otherwise, +// it will indefinitely wait till the next resource becomes available. +func (rp *ResourcePool) Get() (resource Resource, err error) { + return rp.get(true) +} + +// TryGet will return the next available resource. If none is available, and capacity +// has not been reached, it will create a new one using the factory. Otherwise, +// it will return nil with no error. +func (rp *ResourcePool) TryGet() (resource Resource, err error) { + return rp.get(false) +} + +func (rp *ResourcePool) get(wait bool) (resource Resource, err error) { + // Fetch + var wrapper resourceWrapper + var ok bool + select { + case wrapper, ok = <-rp.resources: + default: + if !wait { + return nil, nil + } + startTime := time.Now() + wrapper, ok = <-rp.resources + rp.recordWait(startTime) + } + if !ok { + return nil, CLOSED_ERR + } + + // Unwrap + timeout := rp.idleTimeout.Get() + if wrapper.resource != nil && timeout > 0 && wrapper.timeUsed.Add(timeout).Sub(time.Now()) < 0 { + wrapper.resource.Close() + wrapper.resource = nil + } + if wrapper.resource == nil { + wrapper.resource, err = rp.factory() + if err != nil { + rp.resources <- resourceWrapper{} + } + } + return wrapper.resource, err +} + +// Put will return a resource to the pool. For every successful Get, +// a corresponding Put is required. If you no longer need a resource, +// you will need to call Put(nil) instead of returning the closed resource. +// The will eventually cause a new resource to be created in its place. +func (rp *ResourcePool) Put(resource Resource) { + var wrapper resourceWrapper + if resource != nil { + wrapper = resourceWrapper{resource, time.Now()} + } + select { + case rp.resources <- wrapper: + default: + panic(fmt.Errorf("Attempt to Put into a full ResourcePool")) + } +} + +// SetCapacity changes the capacity of the pool. +// You can use it to shrink or expand, but not beyond +// the max capacity. If the change requires the pool +// to be shrunk, SetCapacity waits till the necessary +// number of resources are returned to the pool. +// A SetCapacity of 0 is equivalent to closing the ResourcePool. +func (rp *ResourcePool) SetCapacity(capacity int) error { + if capacity < 0 || capacity > cap(rp.resources) { + return fmt.Errorf("capacity %d is out of range", capacity) + } + + // Atomically swap new capacity with old, but only + // if old capacity is non-zero. + var oldcap int + for { + oldcap = int(rp.capacity.Get()) + if oldcap == 0 { + return CLOSED_ERR + } + if oldcap == capacity { + return nil + } + if rp.capacity.CompareAndSwap(int64(oldcap), int64(capacity)) { + break + } + } + + if capacity < oldcap { + for i := 0; i < oldcap-capacity; i++ { + wrapper := <-rp.resources + if wrapper.resource != nil { + wrapper.resource.Close() + } + } + } else { + for i := 0; i < capacity-oldcap; i++ { + rp.resources <- resourceWrapper{} + } + } + if capacity == 0 { + close(rp.resources) + } + return nil +} + +func (rp *ResourcePool) recordWait(start time.Time) { + rp.waitCount.Add(1) + rp.waitTime.Add(time.Now().Sub(start)) +} + +func (rp *ResourcePool) SetIdleTimeout(idleTimeout time.Duration) { + rp.idleTimeout.Set(idleTimeout) +} + +func (rp *ResourcePool) StatsJSON() string { + c, a, mx, wc, wt, it := rp.Stats() + return fmt.Sprintf(`{"Capacity": %v, "Available": %v, "MaxCapacity": %v, "WaitCount": %v, "WaitTime": %v, "IdleTimeout": %v}`, c, a, mx, wc, int64(wt), int64(it)) +} + +func (rp *ResourcePool) Stats() (capacity, available, maxCap, waitCount int64, waitTime, idleTimeout time.Duration) { + return rp.Capacity(), rp.Available(), rp.MaxCap(), rp.WaitCount(), rp.WaitTime(), rp.IdleTimeout() +} + +func (rp *ResourcePool) Capacity() int64 { + return rp.capacity.Get() +} + +func (rp *ResourcePool) Available() int64 { + return int64(len(rp.resources)) +} + +func (rp *ResourcePool) MaxCap() int64 { + return int64(cap(rp.resources)) +} + +func (rp *ResourcePool) WaitCount() int64 { + return rp.waitCount.Get() +} + +func (rp *ResourcePool) WaitTime() time.Duration { + return rp.waitTime.Get() +} + +func (rp *ResourcePool) IdleTimeout() time.Duration { + return rp.idleTimeout.Get() +} diff --git a/vendor/github.com/ngaut/pools/resource_pool_test.go b/vendor/github.com/ngaut/pools/resource_pool_test.go new file mode 100644 index 0000000..27820dd --- /dev/null +++ b/vendor/github.com/ngaut/pools/resource_pool_test.go @@ -0,0 +1,487 @@ +// Copyright 2012, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package pools + +import ( + "errors" + "testing" + "time" + + "github.com/ngaut/sync2" +) + +var lastId, count sync2.AtomicInt64 + +type TestResource struct { + num int64 + closed bool +} + +func (tr *TestResource) Close() { + if !tr.closed { + count.Add(-1) + tr.closed = true + } +} + +func (tr *TestResource) IsClosed() bool { + return tr.closed +} + +func PoolFactory() (Resource, error) { + count.Add(1) + return &TestResource{lastId.Add(1), false}, nil +} + +func FailFactory() (Resource, error) { + return nil, errors.New("Failed") +} + +func SlowFailFactory() (Resource, error) { + time.Sleep(10 * time.Nanosecond) + return nil, errors.New("Failed") +} + +func TestOpen(t *testing.T) { + lastId.Set(0) + count.Set(0) + p := NewResourcePool(PoolFactory, 6, 6, time.Second) + p.SetCapacity(5) + var resources [10]Resource + + // Test Get + for i := 0; i < 5; i++ { + r, err := p.Get() + resources[i] = r + if err != nil { + t.Errorf("Unexpected error %v", err) + } + _, available, _, waitCount, waitTime, _ := p.Stats() + if available != int64(5-i-1) { + t.Errorf("expecting %d, received %d", 5-i-1, available) + } + if waitCount != 0 { + t.Errorf("expecting 0, received %d", waitCount) + } + if waitTime != 0 { + t.Errorf("expecting 0, received %d", waitTime) + } + if lastId.Get() != int64(i+1) { + t.Errorf("Expecting %d, received %d", i+1, lastId.Get()) + } + if count.Get() != int64(i+1) { + t.Errorf("Expecting %d, received %d", i+1, count.Get()) + } + } + + // Test TryGet + r, err := p.TryGet() + if err != nil { + t.Errorf("Unexpected error %v", err) + } + if r != nil { + t.Errorf("Expecting nil") + } + for i := 0; i < 5; i++ { + p.Put(resources[i]) + _, available, _, _, _, _ := p.Stats() + if available != int64(i+1) { + t.Errorf("expecting %d, received %d", 5-i-1, available) + } + } + for i := 0; i < 5; i++ { + r, err := p.TryGet() + resources[i] = r + if err != nil { + t.Errorf("Unexpected error %v", err) + } + if r == nil { + t.Errorf("Expecting non-nil") + } + if lastId.Get() != 5 { + t.Errorf("Expecting 5, received %d", lastId.Get()) + } + if count.Get() != 5 { + t.Errorf("Expecting 5, received %d", count.Get()) + } + } + + // Test that Get waits + ch := make(chan bool) + go func() { + for i := 0; i < 5; i++ { + r, err := p.Get() + if err != nil { + t.Errorf("Get failed: %v", err) + } + resources[i] = r + } + for i := 0; i < 5; i++ { + p.Put(resources[i]) + } + ch <- true + }() + for i := 0; i < 5; i++ { + // Sleep to ensure the goroutine waits + time.Sleep(10 * time.Nanosecond) + p.Put(resources[i]) + } + <-ch + _, _, _, waitCount, waitTime, _ := p.Stats() + if waitCount != 5 { + t.Errorf("Expecting 5, received %d", waitCount) + } + if waitTime == 0 { + t.Errorf("Expecting non-zero") + } + if lastId.Get() != 5 { + t.Errorf("Expecting 5, received %d", lastId.Get()) + } + + // Test Close resource + r, err = p.Get() + if err != nil { + t.Errorf("Unexpected error %v", err) + } + r.Close() + p.Put(nil) + if count.Get() != 4 { + t.Errorf("Expecting 4, received %d", count.Get()) + } + for i := 0; i < 5; i++ { + r, err := p.Get() + if err != nil { + t.Errorf("Get failed: %v", err) + } + resources[i] = r + } + for i := 0; i < 5; i++ { + p.Put(resources[i]) + } + if count.Get() != 5 { + t.Errorf("Expecting 5, received %d", count.Get()) + } + if lastId.Get() != 6 { + t.Errorf("Expecting 6, received %d", lastId.Get()) + } + + // SetCapacity + p.SetCapacity(3) + if count.Get() != 3 { + t.Errorf("Expecting 3, received %d", count.Get()) + } + if lastId.Get() != 6 { + t.Errorf("Expecting 6, received %d", lastId.Get()) + } + capacity, available, _, _, _, _ := p.Stats() + if capacity != 3 { + t.Errorf("Expecting 3, received %d", capacity) + } + if available != 3 { + t.Errorf("Expecting 3, received %d", available) + } + p.SetCapacity(6) + capacity, available, _, _, _, _ = p.Stats() + if capacity != 6 { + t.Errorf("Expecting 6, received %d", capacity) + } + if available != 6 { + t.Errorf("Expecting 6, received %d", available) + } + for i := 0; i < 6; i++ { + r, err := p.Get() + if err != nil { + t.Errorf("Get failed: %v", err) + } + resources[i] = r + } + for i := 0; i < 6; i++ { + p.Put(resources[i]) + } + if count.Get() != 6 { + t.Errorf("Expecting 5, received %d", count.Get()) + } + if lastId.Get() != 9 { + t.Errorf("Expecting 9, received %d", lastId.Get()) + } + + // Close + p.Close() + capacity, available, _, _, _, _ = p.Stats() + if capacity != 0 { + t.Errorf("Expecting 0, received %d", capacity) + } + if available != 0 { + t.Errorf("Expecting 0, received %d", available) + } + if count.Get() != 0 { + t.Errorf("Expecting 0, received %d", count.Get()) + } +} + +func TestShrinking(t *testing.T) { + lastId.Set(0) + count.Set(0) + p := NewResourcePool(PoolFactory, 5, 5, time.Second) + var resources [10]Resource + // Leave one empty slot in the pool + for i := 0; i < 4; i++ { + r, err := p.Get() + if err != nil { + t.Errorf("Get failed: %v", err) + } + resources[i] = r + } + go p.SetCapacity(3) + time.Sleep(10 * time.Nanosecond) + stats := p.StatsJSON() + expected := `{"Capacity": 3, "Available": 0, "MaxCapacity": 5, "WaitCount": 0, "WaitTime": 0, "IdleTimeout": 1000000000}` + if stats != expected { + t.Errorf(`expecting '%s', received '%s'`, expected, stats) + } + + // TryGet is allowed when shrinking + r, err := p.TryGet() + if err != nil { + t.Errorf("Unexpected error %v", err) + } + if r != nil { + t.Errorf("Expecting nil") + } + + // Get is allowed when shrinking, but it will wait + getdone := make(chan bool) + go func() { + r, err := p.Get() + if err != nil { + t.Errorf("Unexpected error %v", err) + } + p.Put(r) + getdone <- true + }() + + // Put is allowed when shrinking. It's necessary. + for i := 0; i < 4; i++ { + p.Put(resources[i]) + } + // Wait for Get test to complete + <-getdone + stats = p.StatsJSON() + expected = `{"Capacity": 3, "Available": 3, "MaxCapacity": 5, "WaitCount": 0, "WaitTime": 0, "IdleTimeout": 1000000000}` + if stats != expected { + t.Errorf(`expecting '%s', received '%s'`, expected, stats) + } + if count.Get() != 3 { + t.Errorf("Expecting 3, received %d", count.Get()) + } + + // Ensure no deadlock if SetCapacity is called after we start + // waiting for a resource + for i := 0; i < 3; i++ { + resources[i], err = p.Get() + if err != nil { + t.Errorf("Unexpected error %v", err) + } + } + // This will wait because pool is empty + go func() { + r, err := p.Get() + if err != nil { + t.Errorf("Unexpected error %v", err) + } + p.Put(r) + getdone <- true + }() + time.Sleep(10 * time.Nanosecond) + + // This will wait till we Put + go p.SetCapacity(2) + time.Sleep(10 * time.Nanosecond) + + // This should not hang + for i := 0; i < 3; i++ { + p.Put(resources[i]) + } + <-getdone + capacity, available, _, _, _, _ := p.Stats() + if capacity != 2 { + t.Errorf("Expecting 2, received %d", capacity) + } + if available != 2 { + t.Errorf("Expecting 2, received %d", available) + } + if count.Get() != 2 { + t.Errorf("Expecting 2, received %d", count.Get()) + } + + // Test race condition of SetCapacity with itself + p.SetCapacity(3) + for i := 0; i < 3; i++ { + resources[i], err = p.Get() + if err != nil { + t.Errorf("Unexpected error %v", err) + } + } + // This will wait because pool is empty + go func() { + r, err := p.Get() + if err != nil { + t.Errorf("Unexpected error %v", err) + } + p.Put(r) + getdone <- true + }() + time.Sleep(10 * time.Nanosecond) + + // This will wait till we Put + go p.SetCapacity(2) + time.Sleep(10 * time.Nanosecond) + go p.SetCapacity(4) + time.Sleep(10 * time.Nanosecond) + + // This should not hang + for i := 0; i < 3; i++ { + p.Put(resources[i]) + } + <-getdone + + err = p.SetCapacity(-1) + if err == nil { + t.Errorf("Expecting error") + } + err = p.SetCapacity(255555) + if err == nil { + t.Errorf("Expecting error") + } + + capacity, available, _, _, _, _ = p.Stats() + if capacity != 4 { + t.Errorf("Expecting 4, received %d", capacity) + } + if available != 4 { + t.Errorf("Expecting 4, received %d", available) + } +} + +func TestClosing(t *testing.T) { + lastId.Set(0) + count.Set(0) + p := NewResourcePool(PoolFactory, 5, 5, time.Second) + var resources [10]Resource + for i := 0; i < 5; i++ { + r, err := p.Get() + if err != nil { + t.Errorf("Get failed: %v", err) + } + resources[i] = r + } + ch := make(chan bool) + go func() { + p.Close() + ch <- true + }() + + // Wait for goroutine to call Close + time.Sleep(10 * time.Nanosecond) + stats := p.StatsJSON() + expected := `{"Capacity": 0, "Available": 0, "MaxCapacity": 5, "WaitCount": 0, "WaitTime": 0, "IdleTimeout": 1000000000}` + if stats != expected { + t.Errorf(`expecting '%s', received '%s'`, expected, stats) + } + + // Put is allowed when closing + for i := 0; i < 5; i++ { + p.Put(resources[i]) + } + + // Wait for Close to return + <-ch + + // SetCapacity must be ignored after Close + err := p.SetCapacity(1) + if err == nil { + t.Errorf("expecting error") + } + + stats = p.StatsJSON() + expected = `{"Capacity": 0, "Available": 0, "MaxCapacity": 5, "WaitCount": 0, "WaitTime": 0, "IdleTimeout": 1000000000}` + if stats != expected { + t.Errorf(`expecting '%s', received '%s'`, expected, stats) + } + if lastId.Get() != 5 { + t.Errorf("Expecting 5, received %d", count.Get()) + } + if count.Get() != 0 { + t.Errorf("Expecting 0, received %d", count.Get()) + } +} + +func TestIdleTimeout(t *testing.T) { + lastId.Set(0) + count.Set(0) + p := NewResourcePool(PoolFactory, 1, 1, 10*time.Nanosecond) + defer p.Close() + + r, err := p.Get() + if err != nil { + t.Errorf("Unexpected error %v", err) + } + p.Put(r) + if lastId.Get() != 1 { + t.Errorf("Expecting 1, received %d", count.Get()) + } + if count.Get() != 1 { + t.Errorf("Expecting 1, received %d", count.Get()) + } + time.Sleep(20 * time.Nanosecond) + r, err = p.Get() + if err != nil { + t.Errorf("Unexpected error %v", err) + } + if lastId.Get() != 2 { + t.Errorf("Expecting 2, received %d", count.Get()) + } + if count.Get() != 1 { + t.Errorf("Expecting 1, received %d", count.Get()) + } + p.Put(r) +} + +func TestCreateFail(t *testing.T) { + lastId.Set(0) + count.Set(0) + p := NewResourcePool(FailFactory, 5, 5, time.Second) + defer p.Close() + if _, err := p.Get(); err.Error() != "Failed" { + t.Errorf("Expecting Failed, received %v", err) + } + stats := p.StatsJSON() + expected := `{"Capacity": 5, "Available": 5, "MaxCapacity": 5, "WaitCount": 0, "WaitTime": 0, "IdleTimeout": 1000000000}` + if stats != expected { + t.Errorf(`expecting '%s', received '%s'`, expected, stats) + } +} + +func TestSlowCreateFail(t *testing.T) { + lastId.Set(0) + count.Set(0) + p := NewResourcePool(SlowFailFactory, 2, 2, time.Second) + defer p.Close() + ch := make(chan bool) + // The third Get should not wait indefinitely + for i := 0; i < 3; i++ { + go func() { + p.Get() + ch <- true + }() + } + for i := 0; i < 3; i++ { + <-ch + } + _, available, _, _, _, _ := p.Stats() + if available != 2 { + t.Errorf("Expecting 2, received %d", available) + } +} diff --git a/vendor/github.com/ngaut/pools/roundrobin.go b/vendor/github.com/ngaut/pools/roundrobin.go new file mode 100644 index 0000000..b06985f --- /dev/null +++ b/vendor/github.com/ngaut/pools/roundrobin.go @@ -0,0 +1,214 @@ +// Copyright 2012, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package pools + +import ( + "fmt" + "sync" + "time" +) + +// RoundRobin is deprecated. Use ResourcePool instead. +// RoundRobin allows you to use a pool of resources in a round robin fashion. +type RoundRobin struct { + mu sync.Mutex + available *sync.Cond + resources chan fifoWrapper + size int64 + factory Factory + idleTimeout time.Duration + + // stats + waitCount int64 + waitTime time.Duration +} + +type fifoWrapper struct { + resource Resource + timeUsed time.Time +} + +// NewRoundRobin creates a new RoundRobin pool. +// capacity is the maximum number of resources RoundRobin will create. +// factory will be the function used to create resources. +// If a resource is unused beyond idleTimeout, it's discarded. +func NewRoundRobin(capacity int, idleTimeout time.Duration) *RoundRobin { + r := &RoundRobin{ + resources: make(chan fifoWrapper, capacity), + size: 0, + idleTimeout: idleTimeout, + } + r.available = sync.NewCond(&r.mu) + return r +} + +// Open starts allowing the creation of resources +func (rr *RoundRobin) Open(factory Factory) { + rr.mu.Lock() + defer rr.mu.Unlock() + rr.factory = factory +} + +// Close empties the pool calling Close on all its resources. +// It waits for all resources to be returned (Put). +func (rr *RoundRobin) Close() { + rr.mu.Lock() + defer rr.mu.Unlock() + for rr.size > 0 { + select { + case fw := <-rr.resources: + go fw.resource.Close() + rr.size-- + default: + rr.available.Wait() + } + } + rr.factory = nil +} + +func (rr *RoundRobin) IsClosed() bool { + return rr.factory == nil +} + +// Get will return the next available resource. If none is available, and capacity +// has not been reached, it will create a new one using the factory. Otherwise, +// it will indefinitely wait till the next resource becomes available. +func (rr *RoundRobin) Get() (resource Resource, err error) { + return rr.get(true) +} + +// TryGet will return the next available resource. If none is available, and capacity +// has not been reached, it will create a new one using the factory. Otherwise, +// it will return nil with no error. +func (rr *RoundRobin) TryGet() (resource Resource, err error) { + return rr.get(false) +} + +func (rr *RoundRobin) get(wait bool) (resource Resource, err error) { + rr.mu.Lock() + defer rr.mu.Unlock() + // Any waits in this loop will release the lock, and it will be + // reacquired before the waits return. + for { + select { + case fw := <-rr.resources: + // Found a free resource in the channel + if rr.idleTimeout > 0 && fw.timeUsed.Add(rr.idleTimeout).Sub(time.Now()) < 0 { + // resource has been idle for too long. Discard & go for next. + go fw.resource.Close() + rr.size-- + // Nobody else should be waiting, but signal anyway. + rr.available.Signal() + continue + } + return fw.resource, nil + default: + // resource channel is empty + if rr.size >= int64(cap(rr.resources)) { + // The pool is full + if wait { + start := time.Now() + rr.available.Wait() + rr.recordWait(start) + continue + } + return nil, nil + } + // Pool is not full. Create a resource. + if resource, err = rr.waitForCreate(); err != nil { + // size was decremented, and somebody could be waiting. + rr.available.Signal() + return nil, err + } + // Creation successful. Account for this by incrementing size. + rr.size++ + return resource, err + } + } +} + +func (rr *RoundRobin) recordWait(start time.Time) { + rr.waitCount++ + rr.waitTime += time.Now().Sub(start) +} + +func (rr *RoundRobin) waitForCreate() (resource Resource, err error) { + // Prevent thundering herd: increment size before creating resource, and decrement after. + rr.size++ + rr.mu.Unlock() + defer func() { + rr.mu.Lock() + rr.size-- + }() + return rr.factory() +} + +// Put will return a resource to the pool. You MUST return every resource to the pool, +// even if it's closed. If a resource is closed, you should call Put(nil). +func (rr *RoundRobin) Put(resource Resource) { + rr.mu.Lock() + defer rr.available.Signal() + defer rr.mu.Unlock() + + if rr.size > int64(cap(rr.resources)) { + if resource != nil { + go resource.Close() + } + rr.size-- + } else if resource == nil { + rr.size-- + } else { + if len(rr.resources) == cap(rr.resources) { + panic("unexpected") + } + rr.resources <- fifoWrapper{resource, time.Now()} + } +} + +// Set capacity changes the capacity of the pool. +// You can use it to expand or shrink. +func (rr *RoundRobin) SetCapacity(capacity int) error { + rr.mu.Lock() + defer rr.available.Broadcast() + defer rr.mu.Unlock() + + nr := make(chan fifoWrapper, capacity) + // This loop transfers resources from the old channel + // to the new one, until it fills up or runs out. + // It discards extras, if any. + for { + select { + case fw := <-rr.resources: + if len(nr) < cap(nr) { + nr <- fw + } else { + go fw.resource.Close() + rr.size-- + } + continue + default: + } + break + } + rr.resources = nr + return nil +} + +func (rr *RoundRobin) SetIdleTimeout(idleTimeout time.Duration) { + rr.mu.Lock() + defer rr.mu.Unlock() + rr.idleTimeout = idleTimeout +} + +func (rr *RoundRobin) StatsJSON() string { + s, c, a, wc, wt, it := rr.Stats() + return fmt.Sprintf("{\"Size\": %v, \"Capacity\": %v, \"Available\": %v, \"WaitCount\": %v, \"WaitTime\": %v, \"IdleTimeout\": %v}", s, c, a, wc, int64(wt), int64(it)) +} + +func (rr *RoundRobin) Stats() (size, capacity, available, waitCount int64, waitTime, idleTimeout time.Duration) { + rr.mu.Lock() + defer rr.mu.Unlock() + return rr.size, int64(cap(rr.resources)), int64(len(rr.resources)), rr.waitCount, rr.waitTime, rr.idleTimeout +} diff --git a/vendor/github.com/ngaut/pools/roundrobin_test.go b/vendor/github.com/ngaut/pools/roundrobin_test.go new file mode 100644 index 0000000..870503d --- /dev/null +++ b/vendor/github.com/ngaut/pools/roundrobin_test.go @@ -0,0 +1,126 @@ +// Copyright 2012, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package pools + +import ( + "testing" + "time" +) + +func TestPool(t *testing.T) { + lastId.Set(0) + p := NewRoundRobin(5, time.Duration(10e9)) + p.Open(PoolFactory) + defer p.Close() + + for i := 0; i < 2; i++ { + r, err := p.TryGet() + if err != nil { + t.Errorf("TryGet failed: %v", err) + } + if r.(*TestResource).num != 1 { + t.Errorf("Expecting 1, received %d", r.(*TestResource).num) + } + p.Put(r) + } + // p = [1] + + all := make([]Resource, 5) + for i := 0; i < 5; i++ { + if all[i], _ = p.TryGet(); all[i] == nil { + t.Errorf("TryGet failed with nil") + } + } + // all = [1-5], p is empty + if none, _ := p.TryGet(); none != nil { + t.Errorf("TryGet failed with non-nil") + } + + ch := make(chan bool) + go ResourceWait(p, t, ch) + time.Sleep(1e8) + for i := 0; i < 5; i++ { + p.Put(all[i]) + } + // p = [1-5] + <-ch + // p = [1-5] + if p.waitCount != 1 { + t.Errorf("Expecting 1, received %d", p.waitCount) + } + + for i := 0; i < 5; i++ { + all[i], _ = p.Get() + } + // all = [1-5], p is empty + all[0].(*TestResource).Close() + p.Put(nil) + for i := 1; i < 5; i++ { + p.Put(all[i]) + } + // p = [2-5] + + for i := 0; i < 4; i++ { + r, _ := p.Get() + if r.(*TestResource).num != int64(i+2) { + t.Errorf("Expecting %d, received %d", i+2, r.(*TestResource).num) + } + p.Put(r) + } + + p.SetCapacity(3) + // p = [2-4] + if p.size != 3 { + t.Errorf("Expecting 3, received %d", p.size) + } + + p.SetIdleTimeout(time.Duration(1e8)) + time.Sleep(2e8) + r, _ := p.Get() + if r.(*TestResource).num != 6 { + t.Errorf("Expecting 6, received %d", r.(*TestResource).num) + } + p.Put(r) + // p = [6] +} + +func TestPoolFail(t *testing.T) { + p := NewRoundRobin(5, time.Duration(10e9)) + p.Open(FailFactory) + defer p.Close() + if _, err := p.Get(); err.Error() != "Failed" { + t.Errorf("Expecting Failed, received %v", err) + } +} + +func TestPoolFullFail(t *testing.T) { + p := NewRoundRobin(2, time.Duration(10e9)) + p.Open(SlowFailFactory) + defer p.Close() + ch := make(chan bool) + // The third get should not wait indefinitely + for i := 0; i < 3; i++ { + go func() { + p.Get() + ch <- true + }() + } + for i := 0; i < 3; i++ { + <-ch + } +} + +func ResourceWait(p *RoundRobin, t *testing.T, ch chan bool) { + for i := 0; i < 5; i++ { + if r, err := p.Get(); err != nil { + t.Errorf("TryGet failed: %v", err) + } else if r.(*TestResource).num != int64(i+1) { + t.Errorf("Expecting %d, received %d", i+1, r.(*TestResource).num) + } else { + p.Put(r) + } + } + ch <- true +} diff --git a/vendor/github.com/ngaut/pools/vitess_license b/vendor/github.com/ngaut/pools/vitess_license new file mode 100644 index 0000000..989d02e --- /dev/null +++ b/vendor/github.com/ngaut/pools/vitess_license @@ -0,0 +1,28 @@ +Copyright 2012, Google Inc. +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/ngaut/sync2/atomic.go b/vendor/github.com/ngaut/sync2/atomic.go new file mode 100644 index 0000000..909f3b1 --- /dev/null +++ b/vendor/github.com/ngaut/sync2/atomic.go @@ -0,0 +1,114 @@ +// Copyright 2013, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync2 + +import ( + "sync" + "sync/atomic" + "time" +) + +type AtomicInt32 int32 + +func (i *AtomicInt32) Add(n int32) int32 { + return atomic.AddInt32((*int32)(i), n) +} + +func (i *AtomicInt32) Set(n int32) { + atomic.StoreInt32((*int32)(i), n) +} + +func (i *AtomicInt32) Get() int32 { + return atomic.LoadInt32((*int32)(i)) +} + +func (i *AtomicInt32) CompareAndSwap(oldval, newval int32) (swapped bool) { + return atomic.CompareAndSwapInt32((*int32)(i), oldval, newval) +} + +type AtomicUint32 uint32 + +func (i *AtomicUint32) Add(n uint32) uint32 { + return atomic.AddUint32((*uint32)(i), n) +} + +func (i *AtomicUint32) Set(n uint32) { + atomic.StoreUint32((*uint32)(i), n) +} + +func (i *AtomicUint32) Get() uint32 { + return atomic.LoadUint32((*uint32)(i)) +} + +func (i *AtomicUint32) CompareAndSwap(oldval, newval uint32) (swapped bool) { + return atomic.CompareAndSwapUint32((*uint32)(i), oldval, newval) +} + +type AtomicInt64 int64 + +func (i *AtomicInt64) Add(n int64) int64 { + return atomic.AddInt64((*int64)(i), n) +} + +func (i *AtomicInt64) Set(n int64) { + atomic.StoreInt64((*int64)(i), n) +} + +func (i *AtomicInt64) Get() int64 { + return atomic.LoadInt64((*int64)(i)) +} + +func (i *AtomicInt64) CompareAndSwap(oldval, newval int64) (swapped bool) { + return atomic.CompareAndSwapInt64((*int64)(i), oldval, newval) +} + +type AtomicDuration int64 + +func (d *AtomicDuration) Add(duration time.Duration) time.Duration { + return time.Duration(atomic.AddInt64((*int64)(d), int64(duration))) +} + +func (d *AtomicDuration) Set(duration time.Duration) { + atomic.StoreInt64((*int64)(d), int64(duration)) +} + +func (d *AtomicDuration) Get() time.Duration { + return time.Duration(atomic.LoadInt64((*int64)(d))) +} + +func (d *AtomicDuration) CompareAndSwap(oldval, newval time.Duration) (swapped bool) { + return atomic.CompareAndSwapInt64((*int64)(d), int64(oldval), int64(newval)) +} + +// AtomicString gives you atomic-style APIs for string, but +// it's only a convenience wrapper that uses a mutex. So, it's +// not as efficient as the rest of the atomic types. +type AtomicString struct { + mu sync.Mutex + str string +} + +func (s *AtomicString) Set(str string) { + s.mu.Lock() + s.str = str + s.mu.Unlock() +} + +func (s *AtomicString) Get() string { + s.mu.Lock() + str := s.str + s.mu.Unlock() + return str +} + +func (s *AtomicString) CompareAndSwap(oldval, newval string) (swqpped bool) { + s.mu.Lock() + defer s.mu.Unlock() + if s.str == oldval { + s.str = newval + return true + } + return false +} diff --git a/vendor/github.com/ngaut/sync2/atomic_test.go b/vendor/github.com/ngaut/sync2/atomic_test.go new file mode 100644 index 0000000..7261159 --- /dev/null +++ b/vendor/github.com/ngaut/sync2/atomic_test.go @@ -0,0 +1,32 @@ +// Copyright 2013, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync2 + +import ( + "testing" +) + +func TestAtomicString(t *testing.T) { + var s AtomicString + if s.Get() != "" { + t.Errorf("want empty, got %s", s.Get()) + } + s.Set("a") + if s.Get() != "a" { + t.Errorf("want a, got %s", s.Get()) + } + if s.CompareAndSwap("b", "c") { + t.Errorf("want false, got true") + } + if s.Get() != "a" { + t.Errorf("want a, got %s", s.Get()) + } + if !s.CompareAndSwap("a", "c") { + t.Errorf("want true, got false") + } + if s.Get() != "c" { + t.Errorf("want c, got %s", s.Get()) + } +} diff --git a/vendor/github.com/ngaut/sync2/cond.go b/vendor/github.com/ngaut/sync2/cond.go new file mode 100644 index 0000000..dea11ae --- /dev/null +++ b/vendor/github.com/ngaut/sync2/cond.go @@ -0,0 +1,56 @@ +// Copyright 2013, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync2 + +import ( + "sync" +) + +// Cond is an alternate implementation of sync.Cond +type Cond struct { + L sync.Locker + sema chan struct{} + waiters AtomicInt64 +} + +func NewCond(l sync.Locker) *Cond { + return &Cond{L: l, sema: make(chan struct{})} +} + +func (c *Cond) Wait() { + c.waiters.Add(1) + c.L.Unlock() + <-c.sema + c.L.Lock() +} + +func (c *Cond) Signal() { + for { + w := c.waiters.Get() + if w == 0 { + return + } + if c.waiters.CompareAndSwap(w, w-1) { + break + } + } + c.sema <- struct{}{} +} + +func (c *Cond) Broadcast() { + var w int64 + for { + w = c.waiters.Get() + if w == 0 { + return + } + if c.waiters.CompareAndSwap(w, 0) { + break + } + } + for i := int64(0); i < w; i++ { + c.sema <- struct{}{} + } +} diff --git a/vendor/github.com/ngaut/sync2/cond_test.go b/vendor/github.com/ngaut/sync2/cond_test.go new file mode 100644 index 0000000..4dd3929 --- /dev/null +++ b/vendor/github.com/ngaut/sync2/cond_test.go @@ -0,0 +1,276 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +package sync2 + +import ( + "fmt" + "runtime" + "sync" + "testing" +) + +func TestCondSignal(t *testing.T) { + var m sync.Mutex + c := NewCond(&m) + n := 2 + running := make(chan bool, n) + awake := make(chan bool, n) + for i := 0; i < n; i++ { + go func() { + m.Lock() + running <- true + c.Wait() + awake <- true + m.Unlock() + }() + } + for i := 0; i < n; i++ { + <-running // Wait for everyone to run. + } + for n > 0 { + select { + case <-awake: + t.Fatal("goroutine not asleep") + default: + } + m.Lock() + c.Signal() + m.Unlock() + <-awake // Will deadlock if no goroutine wakes up + select { + case <-awake: + t.Fatal("too many goroutines awake") + default: + } + n-- + } + c.Signal() +} + +func TestCondSignalGenerations(t *testing.T) { + var m sync.Mutex + c := NewCond(&m) + n := 100 + running := make(chan bool, n) + awake := make(chan int, n) + for i := 0; i < n; i++ { + go func(i int) { + m.Lock() + running <- true + c.Wait() + awake <- i + m.Unlock() + }(i) + if i > 0 { + a := <-awake + if a != i-1 { + t.Fatalf("wrong goroutine woke up: want %d, got %d", i-1, a) + } + } + <-running + m.Lock() + c.Signal() + m.Unlock() + } +} + +func TestCondBroadcast(t *testing.T) { + var m sync.Mutex + c := NewCond(&m) + n := 200 + running := make(chan int, n) + awake := make(chan int, n) + exit := false + for i := 0; i < n; i++ { + go func(g int) { + m.Lock() + for !exit { + running <- g + c.Wait() + awake <- g + } + m.Unlock() + }(i) + } + for i := 0; i < n; i++ { + for i := 0; i < n; i++ { + <-running // Will deadlock unless n are running. + } + if i == n-1 { + m.Lock() + exit = true + m.Unlock() + } + select { + case <-awake: + t.Fatal("goroutine not asleep") + default: + } + m.Lock() + c.Broadcast() + m.Unlock() + seen := make([]bool, n) + for i := 0; i < n; i++ { + g := <-awake + if seen[g] { + t.Fatal("goroutine woke up twice") + } + seen[g] = true + } + } + select { + case <-running: + t.Fatal("goroutine did not exit") + default: + } + c.Broadcast() +} + +func TestRace(t *testing.T) { + x := 0 + c := NewCond(&sync.Mutex{}) + done := make(chan bool) + go func() { + c.L.Lock() + x = 1 + c.Wait() + if x != 2 { + t.Fatal("want 2") + } + x = 3 + c.Signal() + c.L.Unlock() + done <- true + }() + go func() { + c.L.Lock() + for { + if x == 1 { + x = 2 + c.Signal() + break + } + c.L.Unlock() + runtime.Gosched() + c.L.Lock() + } + c.L.Unlock() + done <- true + }() + go func() { + c.L.Lock() + for { + if x == 2 { + c.Wait() + if x != 3 { + t.Fatal("want 3") + } + break + } + if x == 3 { + break + } + c.L.Unlock() + runtime.Gosched() + c.L.Lock() + } + c.L.Unlock() + done <- true + }() + <-done + <-done + <-done +} + +// Bench: Rename this function to TestBench for running benchmarks +func Bench(t *testing.T) { + waitvals := []int{1, 2, 4, 8} + maxprocs := []int{1, 2, 4} + fmt.Printf("procs\twaiters\told\tnew\tdelta\n") + for _, procs := range maxprocs { + runtime.GOMAXPROCS(procs) + for _, waiters := range waitvals { + oldbench := func(b *testing.B) { + benchmarkCond(b, waiters) + } + oldbr := testing.Benchmark(oldbench) + newbench := func(b *testing.B) { + benchmarkCond2(b, waiters) + } + newbr := testing.Benchmark(newbench) + oldns := oldbr.NsPerOp() + newns := newbr.NsPerOp() + percent := float64(newns-oldns) * 100.0 / float64(oldns) + fmt.Printf("%d\t%d\t%d\t%d\t%6.2f%%\n", procs, waiters, oldns, newns, percent) + } + } +} + +func benchmarkCond2(b *testing.B, waiters int) { + c := NewCond(&sync.Mutex{}) + done := make(chan bool) + id := 0 + + for routine := 0; routine < waiters+1; routine++ { + go func() { + for i := 0; i < b.N; i++ { + c.L.Lock() + if id == -1 { + c.L.Unlock() + break + } + id++ + if id == waiters+1 { + id = 0 + c.Broadcast() + } else { + c.Wait() + } + c.L.Unlock() + } + c.L.Lock() + id = -1 + c.Broadcast() + c.L.Unlock() + done <- true + }() + } + for routine := 0; routine < waiters+1; routine++ { + <-done + } +} + +func benchmarkCond(b *testing.B, waiters int) { + c := sync.NewCond(&sync.Mutex{}) + done := make(chan bool) + id := 0 + + for routine := 0; routine < waiters+1; routine++ { + go func() { + for i := 0; i < b.N; i++ { + c.L.Lock() + if id == -1 { + c.L.Unlock() + break + } + id++ + if id == waiters+1 { + id = 0 + c.Broadcast() + } else { + c.Wait() + } + c.L.Unlock() + } + c.L.Lock() + id = -1 + c.Broadcast() + c.L.Unlock() + done <- true + }() + } + for routine := 0; routine < waiters+1; routine++ { + <-done + } +} diff --git a/vendor/github.com/ngaut/sync2/semaphore.go b/vendor/github.com/ngaut/sync2/semaphore.go new file mode 100644 index 0000000..190a27d --- /dev/null +++ b/vendor/github.com/ngaut/sync2/semaphore.go @@ -0,0 +1,55 @@ +// Copyright 2012, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync2 + +// What's in a name? Channels have all you need to emulate a counting +// semaphore with a boatload of extra functionality. However, in some +// cases, you just want a familiar API. + +import ( + "time" +) + +// Semaphore is a counting semaphore with the option to +// specify a timeout. +type Semaphore struct { + slots chan struct{} + timeout time.Duration +} + +// NewSemaphore creates a Semaphore. The count parameter must be a positive +// number. A timeout of zero means that there is no timeout. +func NewSemaphore(count int, timeout time.Duration) *Semaphore { + sem := &Semaphore{ + slots: make(chan struct{}, count), + timeout: timeout, + } + for i := 0; i < count; i++ { + sem.slots <- struct{}{} + } + return sem +} + +// Acquire returns true on successful acquisition, and +// false on a timeout. +func (sem *Semaphore) Acquire() bool { + if sem.timeout == 0 { + <-sem.slots + return true + } + select { + case <-sem.slots: + return true + case <-time.After(sem.timeout): + return false + } +} + +// Release releases the acquired semaphore. You must +// not release more than the number of semaphores you've +// acquired. +func (sem *Semaphore) Release() { + sem.slots <- struct{}{} +} diff --git a/vendor/github.com/ngaut/sync2/semaphore_test.go b/vendor/github.com/ngaut/sync2/semaphore_test.go new file mode 100644 index 0000000..207c8f9 --- /dev/null +++ b/vendor/github.com/ngaut/sync2/semaphore_test.go @@ -0,0 +1,41 @@ +// Copyright 2012, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync2 + +import ( + "testing" + "time" +) + +func TestSemaNoTimeout(t *testing.T) { + s := NewSemaphore(1, 0) + s.Acquire() + released := false + go func() { + time.Sleep(10 * time.Millisecond) + released = true + s.Release() + }() + s.Acquire() + if !released { + t.Errorf("want true, got false") + } +} + +func TestSemaTimeout(t *testing.T) { + s := NewSemaphore(1, 5*time.Millisecond) + s.Acquire() + go func() { + time.Sleep(10 * time.Millisecond) + s.Release() + }() + if ok := s.Acquire(); ok { + t.Errorf("want false, got true") + } + time.Sleep(10 * time.Millisecond) + if ok := s.Acquire(); !ok { + t.Errorf("want true, got false") + } +} diff --git a/vendor/github.com/ngaut/sync2/service_manager.go b/vendor/github.com/ngaut/sync2/service_manager.go new file mode 100644 index 0000000..4b85e01 --- /dev/null +++ b/vendor/github.com/ngaut/sync2/service_manager.go @@ -0,0 +1,121 @@ +// Copyright 2013, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync2 + +import ( + "sync" +) + +// These are the three predefined states of a service. +const ( + SERVICE_STOPPED = iota + SERVICE_RUNNING + SERVICE_SHUTTING_DOWN +) + +var stateNames = []string{ + "Stopped", + "Running", + "ShuttingDown", +} + +// ServiceManager manages the state of a service through its lifecycle. +type ServiceManager struct { + mu sync.Mutex + wg sync.WaitGroup + err error // err is the error returned from the service function. + state AtomicInt64 + // shutdown is created when the service starts and is closed when the service + // enters the SERVICE_SHUTTING_DOWN state. + shutdown chan struct{} +} + +// Go tries to change the state from SERVICE_STOPPED to SERVICE_RUNNING. +// +// If the current state is not SERVICE_STOPPED (already running), it returns +// false immediately. +// +// On successful transition, it launches the service as a goroutine and returns +// true. The service function is responsible for returning on its own when +// requested, either by regularly checking svc.IsRunning(), or by waiting for +// the svc.ShuttingDown channel to be closed. +// +// When the service func returns, the state is reverted to SERVICE_STOPPED. +func (svm *ServiceManager) Go(service func(svc *ServiceContext) error) bool { + svm.mu.Lock() + defer svm.mu.Unlock() + if !svm.state.CompareAndSwap(SERVICE_STOPPED, SERVICE_RUNNING) { + return false + } + svm.wg.Add(1) + svm.err = nil + svm.shutdown = make(chan struct{}) + go func() { + svm.err = service(&ServiceContext{ShuttingDown: svm.shutdown}) + svm.state.Set(SERVICE_STOPPED) + svm.wg.Done() + }() + return true +} + +// Stop tries to change the state from SERVICE_RUNNING to SERVICE_SHUTTING_DOWN. +// If the current state is not SERVICE_RUNNING, it returns false immediately. +// On successul transition, it waits for the service to finish, and returns true. +// You are allowed to Go() again after a Stop(). +func (svm *ServiceManager) Stop() bool { + svm.mu.Lock() + defer svm.mu.Unlock() + if !svm.state.CompareAndSwap(SERVICE_RUNNING, SERVICE_SHUTTING_DOWN) { + return false + } + // Signal the service that we've transitioned to SERVICE_SHUTTING_DOWN. + close(svm.shutdown) + svm.shutdown = nil + svm.wg.Wait() + return true +} + +// Wait waits for the service to terminate if it's currently running. +func (svm *ServiceManager) Wait() { + svm.wg.Wait() +} + +// Join waits for the service to terminate and returns the value returned by the +// service function. +func (svm *ServiceManager) Join() error { + svm.wg.Wait() + return svm.err +} + +// State returns the current state of the service. +// This should only be used to report the current state. +func (svm *ServiceManager) State() int64 { + return svm.state.Get() +} + +// StateName returns the name of the current state. +func (svm *ServiceManager) StateName() string { + return stateNames[svm.State()] +} + +// ServiceContext is passed into the service function to give it access to +// information about the running service. +type ServiceContext struct { + // ShuttingDown is a channel that the service can select on to be notified + // when it should shut down. The channel is closed when the state transitions + // from SERVICE_RUNNING to SERVICE_SHUTTING_DOWN. + ShuttingDown chan struct{} +} + +// IsRunning returns true if the ServiceContext.ShuttingDown channel has not +// been closed yet. +func (svc *ServiceContext) IsRunning() bool { + select { + case <-svc.ShuttingDown: + return false + default: + return true + } +} diff --git a/vendor/github.com/ngaut/sync2/service_manager_test.go b/vendor/github.com/ngaut/sync2/service_manager_test.go new file mode 100644 index 0000000..a41912e --- /dev/null +++ b/vendor/github.com/ngaut/sync2/service_manager_test.go @@ -0,0 +1,176 @@ +// Copyright 2013, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync2 + +import ( + "fmt" + "testing" + "time" +) + +type testService struct { + activated AtomicInt64 + t *testing.T +} + +func (ts *testService) service(svc *ServiceContext) error { + if !ts.activated.CompareAndSwap(0, 1) { + ts.t.Fatalf("service called more than once") + } + for svc.IsRunning() { + time.Sleep(10 * time.Millisecond) + + } + if !ts.activated.CompareAndSwap(1, 0) { + ts.t.Fatalf("service ended more than once") + } + return nil +} + +func (ts *testService) selectService(svc *ServiceContext) error { + if !ts.activated.CompareAndSwap(0, 1) { + ts.t.Fatalf("service called more than once") + } +serviceLoop: + for svc.IsRunning() { + select { + case <-time.After(1 * time.Second): + ts.t.Errorf("service didn't stop when shutdown channel was closed") + case <-svc.ShuttingDown: + break serviceLoop + } + } + if !ts.activated.CompareAndSwap(1, 0) { + ts.t.Fatalf("service ended more than once") + } + return nil +} + +func TestServiceManager(t *testing.T) { + ts := &testService{t: t} + var sm ServiceManager + if sm.StateName() != "Stopped" { + t.Errorf("want Stopped, got %s", sm.StateName()) + } + result := sm.Go(ts.service) + if !result { + t.Errorf("want true, got false") + } + if sm.StateName() != "Running" { + t.Errorf("want Running, got %s", sm.StateName()) + } + time.Sleep(5 * time.Millisecond) + if val := ts.activated.Get(); val != 1 { + t.Errorf("want 1, got %d", val) + } + result = sm.Go(ts.service) + if result { + t.Errorf("want false, got true") + } + result = sm.Stop() + if !result { + t.Errorf("want true, got false") + } + if val := ts.activated.Get(); val != 0 { + t.Errorf("want 0, got %d", val) + } + result = sm.Stop() + if result { + t.Errorf("want false, got true") + } + sm.state.Set(SERVICE_SHUTTING_DOWN) + if sm.StateName() != "ShuttingDown" { + t.Errorf("want ShuttingDown, got %s", sm.StateName()) + } +} + +func TestServiceManagerSelect(t *testing.T) { + ts := &testService{t: t} + var sm ServiceManager + if sm.StateName() != "Stopped" { + t.Errorf("want Stopped, got %s", sm.StateName()) + } + result := sm.Go(ts.selectService) + if !result { + t.Errorf("want true, got false") + } + if sm.StateName() != "Running" { + t.Errorf("want Running, got %s", sm.StateName()) + } + time.Sleep(5 * time.Millisecond) + if val := ts.activated.Get(); val != 1 { + t.Errorf("want 1, got %d", val) + } + result = sm.Go(ts.service) + if result { + t.Errorf("want false, got true") + } + result = sm.Stop() + if !result { + t.Errorf("want true, got false") + } + if val := ts.activated.Get(); val != 0 { + t.Errorf("want 0, got %d", val) + } + result = sm.Stop() + if result { + t.Errorf("want false, got true") + } + sm.state.Set(SERVICE_SHUTTING_DOWN) + if sm.StateName() != "ShuttingDown" { + t.Errorf("want ShuttingDown, got %s", sm.StateName()) + } +} + +func TestServiceManagerWaitNotRunning(t *testing.T) { + done := make(chan struct{}) + var sm ServiceManager + go func() { + sm.Wait() + close(done) + }() + select { + case <-done: + case <-time.After(1 * time.Second): + t.Errorf("Wait() blocked even though service wasn't running.") + } +} + +func TestServiceManagerWait(t *testing.T) { + done := make(chan struct{}) + stop := make(chan struct{}) + var sm ServiceManager + sm.Go(func(*ServiceContext) error { + <-stop + return nil + }) + go func() { + sm.Wait() + close(done) + }() + time.Sleep(100 * time.Millisecond) + select { + case <-done: + t.Errorf("Wait() didn't block while service was still running.") + default: + } + close(stop) + select { + case <-done: + case <-time.After(100 * time.Millisecond): + t.Errorf("Wait() didn't unblock when service stopped.") + } +} + +func TestServiceManagerJoin(t *testing.T) { + want := "error 123" + var sm ServiceManager + sm.Go(func(*ServiceContext) error { + return fmt.Errorf("error 123") + }) + if got := sm.Join().Error(); got != want { + t.Errorf("Join().Error() = %#v, want %#v", got, want) + } +} diff --git a/vendor/github.com/ngaut/sync2/vitess_license b/vendor/github.com/ngaut/sync2/vitess_license new file mode 100644 index 0000000..989d02e --- /dev/null +++ b/vendor/github.com/ngaut/sync2/vitess_license @@ -0,0 +1,28 @@ +Copyright 2012, Google Inc. +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/ngaut/systimemon/LICENSE b/vendor/github.com/ngaut/systimemon/LICENSE new file mode 100644 index 0000000..8dada3e --- /dev/null +++ b/vendor/github.com/ngaut/systimemon/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/ngaut/systimemon/README.md b/vendor/github.com/ngaut/systimemon/README.md new file mode 100644 index 0000000..39e3630 --- /dev/null +++ b/vendor/github.com/ngaut/systimemon/README.md @@ -0,0 +1,2 @@ +# systimemon +System time monitor diff --git a/vendor/github.com/ngaut/systimemon/systime_mon.go b/vendor/github.com/ngaut/systimemon/systime_mon.go new file mode 100644 index 0000000..0d3defe --- /dev/null +++ b/vendor/github.com/ngaut/systimemon/systime_mon.go @@ -0,0 +1,24 @@ +package systimemon + +import ( + "time" + + "github.com/ngaut/log" +) + +// StartMonitor will call systimeErrHandler if system time jump backward. +func StartMonitor(now func() time.Time, systimeErrHandler func()) { + log.Info("start system time monitor") + tick := time.NewTicker(100 * time.Millisecond) + defer tick.Stop() + for { + last := now() + select { + case <-tick.C: + if now().Sub(last) < 0 { + log.Errorf("system time jump backward, last:%v", last) + systimeErrHandler() + } + } + } +} diff --git a/vendor/github.com/ngaut/systimemon/systime_mon_test.go b/vendor/github.com/ngaut/systimemon/systime_mon_test.go new file mode 100644 index 0000000..3f6dc1b --- /dev/null +++ b/vendor/github.com/ngaut/systimemon/systime_mon_test.go @@ -0,0 +1,28 @@ +package systimemon + +import ( + "testing" + "time" +) + +func TestSystimeMonitor(t *testing.T) { + jumpForward := false + trigged := false + go StartMonitor( + func() time.Time { + if !trigged { + trigged = true + return time.Now() + } + + return time.Now().Add(-2 * time.Second) + }, func() { + jumpForward = true + }) + + time.Sleep(1 * time.Second) + + if !jumpForward { + t.Error("should detect time error") + } +}