2016-09-13 22:20:22 +02:00

1720 lines
38 KiB
Go

// Copyright 2014 The ql Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//LATER profile mem
//LATER profile cpu
//LATER coverage
package ql
import (
"bytes"
"errors"
"fmt"
"math/big"
"strconv"
"strings"
"sync"
"time"
"github.com/cznic/strutil"
)
const (
crossJoin = iota
leftJoin
rightJoin
fullJoin
)
// NOTE: all rset implementations must be safe for concurrent use by multiple
// goroutines. If the do method requires any execution domain local data, they
// must be held out of the implementing instance.
var (
_ rset = (*distinctRset)(nil)
_ rset = (*groupByRset)(nil)
_ rset = (*joinRset)(nil)
_ rset = (*limitRset)(nil)
_ rset = (*offsetRset)(nil)
_ rset = (*orderByRset)(nil)
_ rset = (*selectRset)(nil)
_ rset = (*selectStmt)(nil)
_ rset = (*tableRset)(nil)
_ rset = (*whereRset)(nil)
isTesting bool // enables test hook: select from an index
)
type rset interface {
plan(ctx *execCtx) (plan, error)
}
type recordset struct {
ctx *execCtx
plan
tx *TCtx
}
func (r recordset) fieldNames() []interface{} {
f := r.plan.fieldNames()
a := make([]interface{}, len(f))
for i, v := range f {
a[i] = v
}
return a
}
// Do implements Recordset.
func (r recordset) Do(names bool, f func(data []interface{}) (bool, error)) error {
if names {
if more, err := f(r.fieldNames()); err != nil || !more {
return err
}
}
return r.ctx.db.do(r, f)
}
// Fields implements Recordset.
func (r recordset) Fields() (names []string, err error) {
return r.plan.fieldNames(), nil
}
// FirstRow implements Recordset.
func (r recordset) FirstRow() (row []interface{}, err error) {
rows, err := r.Rows(1, 0)
if err != nil {
return nil, err
}
if len(rows) != 0 {
return rows[0], nil
}
return nil, nil
}
// Rows implements Recordset.
func (r recordset) Rows(limit, offset int) ([][]interface{}, error) {
var rows [][]interface{}
if err := r.Do(false, func(row []interface{}) (bool, error) {
if offset > 0 {
offset--
return true, nil
}
switch {
case limit < 0:
rows = append(rows, row)
return true, nil
case limit == 0:
return false, nil
default: // limit > 0
rows = append(rows, row)
limit--
return limit > 0, nil
}
}); err != nil {
return nil, err
}
return rows, nil
}
// List represents a group of compiled statements.
type List struct {
l []stmt
params int
}
// String implements fmt.Stringer
func (l List) String() string {
var b bytes.Buffer
f := strutil.IndentFormatter(&b, "\t")
for _, s := range l.l {
switch s.(type) {
case beginTransactionStmt:
f.Format("%s\n%i", s)
case commitStmt, rollbackStmt:
f.Format("%u%s\n", s)
default:
f.Format("%s\n", s)
}
}
return b.String()
}
// IsExplainStmt reports whether l is a single EXPLAIN statement or a single EXPLAIN
// statement enclosed in a transaction.
func (l List) IsExplainStmt() bool {
switch len(l.l) {
case 1:
_, ok := l.l[0].(*explainStmt)
return ok
case 3:
if _, ok := l.l[0].(beginTransactionStmt); !ok {
return false
}
if _, ok := l.l[1].(*explainStmt); !ok {
return false
}
_, ok := l.l[2].(commitStmt)
return ok
default:
return false
}
}
type groupByRset struct {
colNames []string
src plan
}
func (r *groupByRset) plan(ctx *execCtx) (plan, error) {
fields := r.src.fieldNames()
for _, v := range r.colNames {
found := false
for _, v2 := range fields {
if v == v2 {
found = true
break
}
}
if !found {
return nil, fmt.Errorf("unknown field %s", v)
}
}
return &groupByDefaultPlan{colNames: r.colNames, src: r.src, fields: fields}, nil
}
// TCtx represents transaction context. It enables to execute multiple
// statement lists in the same context. The same context guarantees the state
// of the DB cannot change in between the separated executions.
//
// LastInsertID
//
// LastInsertID is updated by INSERT INTO statements. The value considers
// performed ROLLBACK statements, if any, even though roll backed IDs are not
// reused. QL clients should treat the field as read only.
//
// RowsAffected
//
// RowsAffected is updated by INSERT INTO, DELETE FROM and UPDATE statements.
// The value does not (yet) consider any ROLLBACK statements involved. QL
// clients should treat the field as read only.
type TCtx struct {
LastInsertID int64
RowsAffected int64
}
// NewRWCtx returns a new read/write transaction context. NewRWCtx is safe for
// concurrent use by multiple goroutines, every one of them will get a new,
// unique context.
func NewRWCtx() *TCtx { return &TCtx{} }
// Recordset is a result of a select statement. It can call a user function for
// every row (record) in the set using the Do method.
//
// Recordsets can be safely reused. Evaluation of the rows is performed lazily.
// Every invocation of Do will see the current, potentially actualized data.
//
// Do
//
// Do will call f for every row (record) in the Recordset.
//
// If f returns more == false or err != nil then f will not be called for any
// remaining rows in the set and the err value is returned from Do.
//
// If names == true then f is firstly called with a virtual row
// consisting of field (column) names of the RecordSet.
//
// Do is executed in a read only context and performs a RLock of the
// database.
//
// Do is safe for concurrent use by multiple goroutines.
//
// Fields
//
// Fields return a slice of field names of the recordset. The result is computed
// without actually computing the recordset rows.
//
// FirstRow
//
// FirstRow will return the first row of the RecordSet or an error, if any. If
// the Recordset has no rows the result is (nil, nil).
//
// Rows
//
// Rows will return rows in Recordset or an error, if any. The semantics of
// limit and offset are the same as of the LIMIT and OFFSET clauses of the
// SELECT statement. To get all rows pass limit < 0. If there are no rows to
// return the result is (nil, nil).
type Recordset interface {
Do(names bool, f func(data []interface{}) (more bool, err error)) error
Fields() (names []string, err error)
FirstRow() (row []interface{}, err error)
Rows(limit, offset int) (rows [][]interface{}, err error)
}
type assignment struct {
colName string
expr expression
}
func (a *assignment) String() string {
return fmt.Sprintf("%s=%s", a.colName, a.expr)
}
type distinctRset struct {
src plan
}
func (r *distinctRset) plan(ctx *execCtx) (plan, error) {
return &distinctDefaultPlan{src: r.src, fields: r.src.fieldNames()}, nil
}
type orderByRset struct {
asc bool
by []expression
src plan
}
func (r *orderByRset) String() string {
a := make([]string, len(r.by))
for i, v := range r.by {
a[i] = v.String()
}
s := strings.Join(a, ", ")
if !r.asc {
s += " DESC"
}
return s
}
func (r *orderByRset) plan(ctx *execCtx) (plan, error) {
if _, ok := r.src.(*nullPlan); ok {
return r.src, nil
}
var by []expression
fields := r.src.fieldNames()
for _, e := range r.by {
cols := mentionedColumns(e)
for k := range cols {
found := false
for _, v := range fields {
if k == v {
found = true
break
}
}
if !found {
return nil, fmt.Errorf("unknown field %s", k)
}
}
if len(cols) == 0 {
v, err := e.eval(ctx, nil)
if err != nil {
by = append(by, e)
continue
}
if isConstValue(v) != nil {
continue
}
}
by = append(by, e)
}
return &orderByDefaultPlan{asc: r.asc, by: by, src: r.src, fields: fields}, nil
}
type whereRset struct {
expr expression
src plan
}
func (r *whereRset) planBinOp(x *binaryOperation) (plan, error) {
p := r.src
ok, cn := isColumnExpression(x.l)
if ok && cn == "id()" {
if v := isConstValue(x.r); v != nil {
v, err := typeCheck1(v, idCol)
if err != nil {
return nil, err
}
rv := v.(int64)
switch {
case p.hasID():
switch x.op {
case '<':
if rv <= 1 {
return &nullPlan{p.fieldNames()}, nil
}
case '>':
if rv <= 0 {
return p, nil
}
case ge:
if rv >= 1 {
return p, nil
}
case neq:
if rv <= 0 {
return p, nil
}
case eq:
if rv <= 0 {
return &nullPlan{p.fieldNames()}, nil
}
case le:
if rv <= 0 {
return &nullPlan{p.fieldNames()}, nil
}
}
}
}
}
var err error
var p2 plan
var is []string
switch x.op {
case eq, ge, '>', le, '<', neq:
if p2, is, err = p.filter(x); err != nil {
return nil, err
}
if p2 != nil {
return p2, nil
}
case andand:
var in []expression
var f func(expression)
f = func(e expression) {
b, ok := e.(*binaryOperation)
if !ok || b.op != andand {
in = append(in, e)
return
}
f(b.l)
f(b.r)
}
f(x)
out := []expression{}
p := r.src
isNewPlan := false
for _, e := range in {
p2, is2, err := p.filter(e)
if err != nil {
return nil, err
}
if p2 == nil {
is = append(is, is2...)
out = append(out, e)
continue
}
p = p2
isNewPlan = true
}
if !isNewPlan {
break
}
if len(out) == 0 {
return p, nil
}
for len(out) > 1 {
n := len(out)
e, err := newBinaryOperation(andand, out[n-2], out[n-1])
if err != nil {
return nil, err
}
out = out[:n-1]
out[n-2] = e
}
return &filterDefaultPlan{p, out[0], is}, nil
}
return &filterDefaultPlan{p, x, is}, nil
}
func (r *whereRset) planIdent(x *ident) (plan, error) {
p := r.src
p2, is, err := p.filter(x)
if err != nil {
return nil, err
}
if p2 != nil {
return p2, nil
}
return &filterDefaultPlan{p, x, is}, nil
}
func (r *whereRset) planIsNull(x *isNull) (plan, error) {
p := r.src
ok, cn := isColumnExpression(x.expr)
if !ok {
return &filterDefaultPlan{p, x, nil}, nil
}
if cn == "id()" {
switch {
case p.hasID():
switch {
case x.not: // IS NOT NULL
return p, nil
default: // IS NULL
return &nullPlan{p.fieldNames()}, nil
}
default:
switch {
case x.not: // IS NOT NULL
return &nullPlan{p.fieldNames()}, nil
default: // IS NULL
return p, nil
}
}
}
p2, is, err := p.filter(x)
if err != nil {
return nil, err
}
if p2 != nil {
return p2, nil
}
return &filterDefaultPlan{p, x, is}, nil
}
func (r *whereRset) planUnaryOp(x *unaryOperation) (plan, error) {
p := r.src
p2, is, err := p.filter(x)
if err != nil {
return nil, err
}
if p2 != nil {
return p2, nil
}
return &filterDefaultPlan{p, x, is}, nil
}
func (r *whereRset) plan(ctx *execCtx) (plan, error) {
expr, err := r.expr.clone(ctx.arg)
if err != nil {
return nil, err
}
switch r.src.(type) {
case *leftJoinDefaultPlan, *rightJoinDefaultPlan, *fullJoinDefaultPlan:
return &filterDefaultPlan{r.src, expr, nil}, nil
}
switch x := expr.(type) {
case *binaryOperation:
return r.planBinOp(x)
case *ident:
return r.planIdent(x)
case *isNull:
return r.planIsNull(x)
case *pIn:
//TODO optimize
//TODO show plan
case *pLike:
//TODO optimize
case *unaryOperation:
return r.planUnaryOp(x)
}
return &filterDefaultPlan{r.src, expr, nil}, nil
}
type offsetRset struct {
expr expression
src plan
}
func (r *offsetRset) plan(ctx *execCtx) (plan, error) {
return &offsetDefaultPlan{expr: r.expr, src: r.src, fields: r.src.fieldNames()}, nil
}
type limitRset struct {
expr expression
src plan
}
func (r *limitRset) plan(ctx *execCtx) (plan, error) {
return &limitDefaultPlan{expr: r.expr, src: r.src, fields: r.src.fieldNames()}, nil
}
type selectRset struct {
flds []*fld
src plan
}
func (r *selectRset) plan(ctx *execCtx) (plan, error) {
var flds2 []*fld
if len(r.flds) != 0 {
m := map[string]struct{}{}
for _, v := range r.flds {
mentionedColumns0(v.expr, true, true, m)
}
for _, v := range r.src.fieldNames() {
delete(m, v)
}
for k := range m {
return nil, fmt.Errorf("unknown field %s", k)
}
flds2 = append(flds2, r.flds...)
}
if x, ok := r.src.(*groupByDefaultPlan); ok {
if len(r.flds) == 0 {
fields := x.fieldNames()
flds := make([]*fld, len(fields))
for i, v := range fields {
flds[i] = &fld{&ident{v}, v}
}
return &selectFieldsGroupPlan{flds: flds, src: x, fields: fields}, nil
}
p := &selectFieldsGroupPlan{flds: flds2, src: x}
for _, v := range r.flds {
p.fields = append(p.fields, v.name)
}
return p, nil
}
if len(r.flds) == 0 {
return r.src, nil
}
f0 := r.src.fieldNames()
if len(f0) == len(flds2) {
match := true
for i, v := range flds2 {
if x, ok := v.expr.(*ident); ok && x.s == f0[i] && v.name == f0[i] {
continue
}
match = false
break
}
if match {
return r.src, nil
}
}
src := r.src
if x, ok := src.(*tableDefaultPlan); ok {
isconst := true
for _, v := range flds2 {
if isConstValue(v.expr) == nil {
isconst = false
break
}
}
if isconst { // #250
src = &tableNilPlan{x.t}
}
}
p := &selectFieldsDefaultPlan{flds: flds2, src: src}
for _, v := range r.flds {
p.fields = append(p.fields, v.name)
}
return p, nil
}
type tableRset string
func (r tableRset) plan(ctx *execCtx) (plan, error) {
switch r {
case "__Table":
return &sysTableDefaultPlan{}, nil
case "__Column":
return &sysColumnDefaultPlan{}, nil
case "__Index":
return &sysIndexDefaultPlan{}, nil
}
t, ok := ctx.db.root.tables[string(r)]
if !ok && isTesting {
if _, x0 := ctx.db.root.findIndexByName(string(r)); x0 != nil {
return &selectIndexDefaultPlan{nm: string(r), x: x0}, nil
}
}
if !ok {
return nil, fmt.Errorf("table %s does not exist", r)
}
rs := &tableDefaultPlan{t: t}
for _, col := range t.cols {
rs.fields = append(rs.fields, col.name)
}
return rs, nil
}
func findFld(fields []*fld, name string) (f *fld) {
for _, f = range fields {
if f.name == name {
return
}
}
return nil
}
type col struct {
index int
name string
typ int
constraint *constraint
dflt expression
}
var idCol = &col{name: "id()", typ: qInt64}
func findCol(cols []*col, name string) (c *col) {
for _, c = range cols {
if c.name == name {
return
}
}
return nil
}
func (f *col) clone() *col {
var r col
r = *f
r.constraint = f.constraint.clone()
if f.dflt != nil {
r.dflt, _ = r.dflt.clone(nil)
}
return &r
}
func (f *col) typeCheck(x interface{}) (ok bool) { //NTYPE
switch x.(type) {
case nil:
return true
case bool:
return f.typ == qBool
case complex64:
return f.typ == qComplex64
case complex128:
return f.typ == qComplex128
case float32:
return f.typ == qFloat32
case float64:
return f.typ == qFloat64
case int8:
return f.typ == qInt8
case int16:
return f.typ == qInt16
case int32:
return f.typ == qInt32
case int64:
return f.typ == qInt64
case string:
return f.typ == qString
case uint8:
return f.typ == qUint8
case uint16:
return f.typ == qUint16
case uint32:
return f.typ == qUint32
case uint64:
return f.typ == qUint64
case []byte:
return f.typ == qBlob
case *big.Int:
return f.typ == qBigInt
case *big.Rat:
return f.typ == qBigRat
case time.Time:
return f.typ == qTime
case time.Duration:
return f.typ == qDuration
case chunk:
return true // was checked earlier
}
return
}
func cols2meta(f []*col) (s string) {
a := []string{}
for _, f := range f {
a = append(a, string(f.typ)+f.name)
}
return strings.Join(a, "|")
}
// DB represent the database capable of executing QL statements.
type DB struct {
cc *TCtx // Current transaction context
isMem bool
mu sync.Mutex
root *root
rw bool // DB FSM
rwmu sync.RWMutex
store storage
tnl int // Transaction nesting level
exprCache map[string]expression
exprCacheMu sync.Mutex
hasIndex2 int // 0: nope, 1: in progress, 2: yes.
}
var selIndex2Expr = MustCompile("select Expr from __Index2_Expr where Index2_ID == $1")
func newDB(store storage) (db *DB, err error) {
db0 := &DB{
exprCache: map[string]expression{},
store: store,
}
if db0.root, err = newRoot(store); err != nil {
return
}
ctx := &execCtx{db: db0}
for _, t := range db0.root.tables {
if err := t.constraintsAndDefaults(ctx); err != nil {
return nil, err
}
}
if !db0.hasAllIndex2() {
return db0, nil
}
db0.hasIndex2 = 2
rss, _, err := db0.Run(nil, "select id(), TableName, IndexName, IsUnique, Root from __Index2 where !IsSimple")
if err != nil {
return nil, err
}
rows, err := rss[0].Rows(-1, 0)
if err != nil {
return nil, err
}
for _, row := range rows {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("error loading DB indices: %v", e)
}
}()
id := row[0].(int64)
tn := row[1].(string)
xn := row[2].(string)
unique := row[3].(bool)
xroot := row[4].(int64)
t := db0.root.tables[tn]
if t == nil {
return nil, fmt.Errorf("DB index refers to nonexistent table: %s", tn)
}
x, err := store.OpenIndex(unique, xroot)
if err != nil {
return nil, err
}
if v := t.indices2[xn]; v != nil {
return nil, fmt.Errorf("duplicate DB index: %s", xn)
}
ix := &index2{
unique: unique,
x: x,
xroot: xroot,
}
rss, _, err := db0.Execute(nil, selIndex2Expr, id)
if err != nil {
return nil, err
}
rows, err := rss[0].Rows(-1, 0)
if err != nil {
return nil, err
}
if len(rows) == 0 {
return nil, fmt.Errorf("index has no expression: %s", xn)
}
var sources []string
var list []expression
for _, row := range rows {
src, ok := row[0].(string)
if !ok {
return nil, fmt.Errorf("index %s: expression of type %T", xn, row[0])
}
expr, err := db0.str2expr(src)
if err != nil {
return nil, fmt.Errorf("index %s: expression error: %v", xn, err)
}
sources = append(sources, src)
list = append(list, expr)
}
ix.sources = sources
ix.exprList = list
if t.indices2 == nil {
t.indices2 = map[string]*index2{}
}
t.indices2[xn] = ix
}
return db0, nil
}
func (db *DB) deleteIndex2ByIndexName(nm string) error {
for _, s := range deleteIndex2ByIndexName.l {
if _, err := s.exec(&execCtx{db: db, arg: []interface{}{nm}}); err != nil {
return err
}
}
return nil
}
func (db *DB) deleteIndex2ByTableName(nm string) error {
for _, s := range deleteIndex2ByTableName.l {
if _, err := s.exec(&execCtx{db: db, arg: []interface{}{nm}}); err != nil {
return err
}
}
return nil
}
func (db *DB) createIndex2() error {
if db.hasIndex2 != 0 {
return nil
}
db.hasIndex2 = 1
ctx := execCtx{db: db}
for _, s := range createIndex2.l {
if _, err := s.exec(&ctx); err != nil {
db.hasIndex2 = 0
return err
}
}
for t := db.root.thead; t != nil; t = t.tnext {
for i, index := range t.indices {
if index == nil {
continue
}
expr := "id()"
if i != 0 {
expr = t.cols[i-1].name
}
if err := db.insertIndex2(t.name, index.name, []string{expr}, index.unique, true, index.xroot); err != nil {
db.hasIndex2 = 0
return err
}
}
}
db.hasIndex2 = 2
return nil
}
func (db *DB) insertIndex2(tableName, indexName string, expr []string, unique, isSimple bool, h int64) error {
ctx := execCtx{db: db}
ctx.arg = []interface{}{
tableName,
indexName,
unique,
isSimple,
h,
}
if _, err := insertIndex2.l[0].exec(&ctx); err != nil {
return err
}
id := db.root.lastInsertID
for _, e := range expr {
ctx.arg = []interface{}{id, e}
if _, err := insertIndex2Expr.l[0].exec(&ctx); err != nil {
return err
}
}
return nil
}
func (db *DB) hasAllIndex2() bool {
t := db.root.tables
if _, ok := t["__Index2"]; !ok {
return false
}
_, ok := t["__Index2_Expr"]
return ok
}
func (db *DB) str2expr(expr string) (expression, error) {
db.exprCacheMu.Lock()
e := db.exprCache[expr]
db.exprCacheMu.Unlock()
if e != nil {
return e, nil
}
e, err := compileExpr(expr)
if err != nil {
return nil, err
}
db.exprCacheMu.Lock()
for k := range db.exprCache {
if len(db.exprCache) < 1000 {
break
}
delete(db.exprCache, k)
}
db.exprCache[expr] = e
db.exprCacheMu.Unlock()
return e, nil
}
// Name returns the name of the DB.
func (db *DB) Name() string { return db.store.Name() }
// Run compiles and executes a statement list. It returns, if applicable, a
// RecordSet slice and/or an index and error.
//
// For more details please see DB.Execute
//
// Run is safe for concurrent use by multiple goroutines.
func (db *DB) Run(ctx *TCtx, ql string, arg ...interface{}) (rs []Recordset, index int, err error) {
l, err := Compile(ql)
if err != nil {
return nil, -1, err
}
return db.Execute(ctx, l, arg...)
}
func (db *DB) run(ctx *TCtx, ql string, arg ...interface{}) (rs []Recordset, index int, err error) {
l, err := compile(ql)
if err != nil {
return nil, -1, err
}
return db.Execute(ctx, l, arg...)
}
// Compile parses the ql statements from src and returns a compiled list for
// DB.Execute or an error if any.
//
// Compile is safe for concurrent use by multiple goroutines.
func Compile(src string) (List, error) {
l := newLexer(src)
if yyParse(l) != 0 {
return List{}, l.errs[0]
}
return List{l.list, l.params}, nil
}
func compileExpr(src string) (expression, error) {
l := newLexer(src)
l.inj = parseExpression
if yyParse(l) != 0 {
return nil, l.errs[0]
}
return l.expr, nil
}
func compile(src string) (List, error) {
l := newLexer(src)
l.root = true
if yyParse(l) != 0 {
return List{}, l.errs[0]
}
return List{l.list, l.params}, nil
}
// MustCompile is like Compile but panics if the ql statements in src cannot be
// compiled. It simplifies safe initialization of global variables holding
// compiled statement lists for DB.Execute.
//
// MustCompile is safe for concurrent use by multiple goroutines.
func MustCompile(src string) List {
list, err := Compile(src)
if err != nil {
panic("ql: Compile(" + strconv.Quote(src) + "): " + err.Error()) // panic ok here
}
return list
}
func mustCompile(src string) List {
list, err := compile(src)
if err != nil {
panic("ql: compile(" + strconv.Quote(src) + "): " + err.Error()) // panic ok here
}
return list
}
// Execute executes statements in a list while substituting QL paramaters from
// arg.
//
// The resulting []Recordset corresponds to the SELECT FROM statements in the
// list.
//
// If err != nil then index is the zero based index of the failed QL statement.
// Empty statements do not count.
//
// The FSM STT describing the relations between DB states, statements and the
// ctx parameter.
//
// +-----------+---------------------+------------------+------------------+------------------+
// |\ Event | | | | |
// | \-------\ | BEGIN | | | Other |
// | State \| TRANSACTION | COMMIT | ROLLBACK | statement |
// +-----------+---------------------+------------------+------------------+------------------+
// | RD | if PC == nil | return error | return error | DB.RLock |
// | | return error | | | Execute(1) |
// | CC == nil | | | | DB.RUnlock |
// | TNL == 0 | DB.Lock | | | |
// | | CC = PC | | | |
// | | TNL++ | | | |
// | | DB.BeginTransaction | | | |
// | | State = WR | | | |
// +-----------+---------------------+------------------+------------------+------------------+
// | WR | if PC == nil | if PC != CC | if PC != CC | if PC == nil |
// | | return error | return error | return error | DB.Rlock |
// | CC != nil | | | | Execute(1) |
// | TNL != 0 | if PC != CC | DB.Commit | DB.Rollback | RUnlock |
// | | DB.Lock | TNL-- | TNL-- | else if PC != CC |
// | | CC = PC | if TNL == 0 | if TNL == 0 | return error |
// | | | CC = nil | CC = nil | else |
// | | TNL++ | State = RD | State = RD | Execute(2) |
// | | DB.BeginTransaction | DB.Unlock | DB.Unlock | |
// +-----------+---------------------+------------------+------------------+------------------+
// CC: Curent transaction context
// PC: Passed transaction context
// TNL: Transaction nesting level
//
// Lock, Unlock, RLock, RUnlock semantics above are the same as in
// sync.RWMutex.
//
// (1): Statement list is executed outside of a transaction. Attempts to update
// the DB will fail, the execution context is read-only. Other statements with
// read only context will execute concurrently. If any statement fails, the
// execution of the statement list is aborted.
//
// Note that the RLock/RUnlock surrounds every single "other" statement when it
// is executed outside of a transaction. If read consistency is required by a
// list of more than one statement then an explicit BEGIN TRANSACTION / COMMIT
// or ROLLBACK wrapper must be provided. Otherwise the state of the DB may
// change in between executing any two out-of-transaction statements.
//
// (2): Statement list is executed inside an isolated transaction. Execution of
// statements can update the DB, the execution context is read-write. If any
// statement fails, the execution of the statement list is aborted and the DB
// is automatically rolled back to the TNL which was active before the start of
// execution of the statement list.
//
// Execute is safe for concurrent use by multiple goroutines, but one must
// consider the blocking issues as discussed above.
//
// ACID
//
// Atomicity: Transactions are atomic. Transactions can be nested. Commit or
// rollbacks work on the current transaction level. Transactions are made
// persistent only on the top level commit. Reads made from within an open
// transaction are dirty reads.
//
// Consistency: Transactions bring the DB from one structurally consistent
// state to other structurally consistent state.
//
// Isolation: Transactions are isolated. Isolation is implemented by
// serialization.
//
// Durability: Transactions are durable. A two phase commit protocol and a
// write ahead log is used. Database is recovered after a crash from the write
// ahead log automatically on open.
func (db *DB) Execute(ctx *TCtx, l List, arg ...interface{}) (rs []Recordset, index int, err error) {
// Sanitize args
for i, v := range arg {
switch x := v.(type) {
case nil, bool, complex64, complex128, float32, float64, string,
int8, int16, int32, int64, int,
uint8, uint16, uint32, uint64, uint,
*big.Int, *big.Rat, []byte, time.Duration, time.Time:
case big.Int:
arg[i] = &x
case big.Rat:
arg[i] = &x
default:
return nil, 0, fmt.Errorf("cannot use arg[%d] (type %T):unsupported type", i, v)
}
}
tnl0 := -1
if ctx != nil {
ctx.LastInsertID, ctx.RowsAffected = 0, 0
}
list := l.l
for _, s := range list {
r, tnla, tnl, err := db.run1(ctx, s, arg...)
if tnl0 < 0 {
tnl0 = tnla
}
if err != nil {
for tnl > tnl0 {
var e2 error
if _, _, tnl, e2 = db.run1(ctx, rollbackStmt{}); e2 != nil {
err = e2
}
}
return rs, index, err
}
if r != nil {
if x, ok := r.(recordset); ok {
x.tx = ctx
r = x
}
rs = append(rs, r)
}
}
return
}
func (db *DB) run1(pc *TCtx, s stmt, arg ...interface{}) (rs Recordset, tnla, tnlb int, err error) {
db.mu.Lock()
tnla = db.tnl
tnlb = db.tnl
switch db.rw {
case false:
switch s.(type) {
case beginTransactionStmt:
defer db.mu.Unlock()
if pc == nil {
return nil, tnla, tnlb, errors.New("BEGIN TRANSACTION: cannot start a transaction in nil TransactionCtx")
}
if err = db.store.BeginTransaction(); err != nil {
return
}
db.rwmu.Lock()
db.beginTransaction()
db.cc = pc
db.tnl++
tnlb = db.tnl
db.rw = true
return
case commitStmt:
defer db.mu.Unlock()
return nil, tnla, tnlb, errCommitNotInTransaction
case rollbackStmt:
defer db.mu.Unlock()
return nil, tnla, tnlb, errRollbackNotInTransaction
default:
if s.isUpdating() {
db.mu.Unlock()
return nil, tnla, tnlb, fmt.Errorf("attempt to update the DB outside of a transaction")
}
db.rwmu.RLock() // can safely grab before Unlock
db.mu.Unlock()
defer db.rwmu.RUnlock()
rs, err = s.exec(&execCtx{db, arg}) // R/O tctx
return rs, tnla, tnlb, err
}
default: // case true:
switch s.(type) {
case beginTransactionStmt:
defer db.mu.Unlock()
if pc == nil {
return nil, tnla, tnlb, errBeginTransNoCtx
}
if pc != db.cc {
for db.rw {
db.mu.Unlock() // Transaction isolation
db.mu.Lock()
}
db.rw = true
db.rwmu.Lock()
}
if err = db.store.BeginTransaction(); err != nil {
return
}
db.beginTransaction()
db.cc = pc
db.tnl++
tnlb = db.tnl
return
case commitStmt:
defer db.mu.Unlock()
if pc != db.cc {
return nil, tnla, tnlb, fmt.Errorf("invalid passed transaction context")
}
db.commit()
err = db.store.Commit()
db.tnl--
tnlb = db.tnl
if db.tnl != 0 {
return
}
db.cc = nil
db.rw = false
db.rwmu.Unlock()
return
case rollbackStmt:
defer db.mu.Unlock()
defer func() { pc.LastInsertID = db.root.lastInsertID }()
if pc != db.cc {
return nil, tnla, tnlb, fmt.Errorf("invalid passed transaction context")
}
db.rollback()
err = db.store.Rollback()
db.tnl--
tnlb = db.tnl
if db.tnl != 0 {
return
}
db.cc = nil
db.rw = false
db.rwmu.Unlock()
return
default:
if pc == nil {
if s.isUpdating() {
db.mu.Unlock()
return nil, tnla, tnlb, fmt.Errorf("attempt to update the DB outside of a transaction")
}
db.mu.Unlock() // must Unlock before RLock
db.rwmu.RLock()
defer db.rwmu.RUnlock()
rs, err = s.exec(&execCtx{db, arg})
return rs, tnla, tnlb, err
}
defer db.mu.Unlock()
defer func() { pc.LastInsertID = db.root.lastInsertID }()
if pc != db.cc {
return nil, tnla, tnlb, fmt.Errorf("invalid passed transaction context")
}
rs, err = s.exec(&execCtx{db, arg})
return rs, tnla, tnlb, err
}
}
}
// Flush ends the transaction collecting window, if applicable. IOW, if the DB
// is dirty, it schedules a 2PC (WAL + DB file) commit on the next outer most
// DB.Commit or performs it synchronously if there's currently no open
// transaction.
//
// The collecting window is an implementation detail and future versions of
// Flush may become a no operation while keeping the operation semantics.
func (db *DB) Flush() (err error) {
return nil
}
// Close will close the DB. Successful Close is idempotent.
func (db *DB) Close() error {
db.mu.Lock()
defer db.mu.Unlock()
if db.store == nil {
return nil
}
if db.tnl != 0 {
return fmt.Errorf("cannot close DB while open transaction exist")
}
err := db.store.Close()
db.root, db.store = nil, nil
return err
}
func (db *DB) do(r recordset, f func(data []interface{}) (bool, error)) (err error) {
db.mu.Lock()
switch db.rw {
case false:
db.rwmu.RLock() // can safely grab before Unlock
db.mu.Unlock()
defer db.rwmu.RUnlock()
default: // case true:
if r.tx == nil {
db.mu.Unlock() // must Unlock before RLock
db.rwmu.RLock()
defer db.rwmu.RUnlock()
break
}
defer db.mu.Unlock()
if r.tx != db.cc {
return fmt.Errorf("invalid passed transaction context")
}
}
return r.do(r.ctx, func(id interface{}, data []interface{}) (bool, error) {
if err = expand(data); err != nil {
return false, err
}
return f(data)
})
}
func (db *DB) beginTransaction() { //TODO Rewrite, must use much smaller undo info!
root := *db.root
root.parent = db.root
root.tables = make(map[string]*table, len(db.root.tables))
var tprev *table
for t := db.root.thead; t != nil; t = t.tnext {
t2 := t.clone()
root.tables[t2.name] = t2
t2.tprev = tprev
switch {
case tprev == nil:
root.thead = t2
default:
tprev.tnext = t2
}
tprev = t2
}
db.root = &root
}
func (db *DB) rollback() {
db.root = db.root.parent
}
func (db *DB) commit() {
db.root.parent = db.root.parent.parent
}
// Type represents a QL type (bigint, int, string, ...)
type Type int
// Values of ColumnInfo.Type.
const (
BigInt Type = qBigInt
BigRat = qBigRat
Blob = qBlob
Bool = qBool
Complex128 = qComplex128
Complex64 = qComplex64
Duration = qDuration
Float32 = qFloat32
Float64 = qFloat64
Int16 = qInt16
Int32 = qInt32
Int64 = qInt64
Int8 = qInt8
String = qString
Time = qTime
Uint16 = qUint16
Uint32 = qUint32
Uint64 = qUint64
Uint8 = qUint8
)
// String implements fmt.Stringer.
func (t Type) String() string {
return typeStr(int(t))
}
// ColumnInfo provides meta data describing a table column.
type ColumnInfo struct {
Name string // Column name.
Type Type // Column type (BigInt, BigRat, ...).
NotNull bool // Column cannot be NULL.
Constraint string // Constraint expression, if any.
Default string // Default expression, if any.
}
// TableInfo provides meta data describing a DB table.
type TableInfo struct {
// Table name.
Name string
// Table schema. Columns are listed in the order in which they appear
// in the schema.
Columns []ColumnInfo
}
// IndexInfo provides meta data describing a DB index. It corresponds to the
// statement
//
// CREATE INDEX Name ON Table (Column);
type IndexInfo struct {
Name string // Index name
Table string // Table name.
Column string // Column name.
Unique bool // Whether the index is unique.
ExpressionList []string // Index expression list.
}
// DbInfo provides meta data describing a DB.
type DbInfo struct {
Name string // DB name.
Tables []TableInfo // Tables in the DB.
Indices []IndexInfo // Indices in the DB.
}
func (db *DB) info() (r *DbInfo, err error) {
_, hasColumn2 := db.root.tables["__Column2"]
r = &DbInfo{Name: db.Name()}
for nm, t := range db.root.tables {
ti := TableInfo{Name: nm}
m := map[string]*ColumnInfo{}
if hasColumn2 {
rs, err := selectColumn2.l[0].exec(&execCtx{db: db, arg: []interface{}{nm}})
if err != nil {
return nil, err
}
if err := rs.(recordset).do(
&execCtx{db: db, arg: []interface{}{nm}},
func(id interface{}, data []interface{}) (bool, error) {
ci := &ColumnInfo{NotNull: data[1].(bool), Constraint: data[2].(string), Default: data[3].(string)}
m[data[0].(string)] = ci
return true, nil
},
); err != nil {
return nil, err
}
}
for _, c := range t.cols {
ci := ColumnInfo{Name: c.name, Type: Type(c.typ)}
if c2 := m[c.name]; c2 != nil {
ci.NotNull = c2.NotNull
ci.Constraint = c2.Constraint
ci.Default = c2.Default
}
ti.Columns = append(ti.Columns, ci)
}
r.Tables = append(r.Tables, ti)
for i, x := range t.indices {
if x == nil {
continue
}
var cn string
switch {
case i == 0:
cn = "id()"
default:
cn = t.cols0[i-1].name
}
r.Indices = append(r.Indices, IndexInfo{x.name, nm, cn, x.unique, []string{cn}})
}
var a []string
for k := range t.indices2 {
a = append(a, k)
}
for _, k := range a {
x := t.indices2[k]
a = a[:0]
for _, e := range x.exprList {
a = append(a, e.String())
}
r.Indices = append(r.Indices, IndexInfo{k, nm, "", x.unique, a})
}
}
return
}
// Info provides meta data describing a DB or an error if any. It locks the DB
// to obtain the result.
func (db *DB) Info() (r *DbInfo, err error) {
db.mu.Lock()
defer db.mu.Unlock()
return db.info()
}
type constraint struct {
expr expression // If expr == nil: constraint is 'NOT NULL'
}
func (c *constraint) clone() *constraint {
if c == nil {
return nil
}
var e expression
if c.expr != nil {
e, _ = c.expr.clone(nil)
}
return &constraint{e}
}
type joinRset struct {
sources []interface{}
typ int
on expression
}
func (r *joinRset) String() string {
a := make([]string, len(r.sources))
for i, pair0 := range r.sources {
pair := pair0.([]interface{})
altName := pair[1].(string)
switch x := pair[0].(type) {
case string: // table name
switch {
case altName == "":
a[i] = x
default:
a[i] = fmt.Sprintf("%s AS %s", x, altName)
}
case *selectStmt:
switch {
case altName == "":
a[i] = fmt.Sprintf("(%s)", x)
default:
a[i] = fmt.Sprintf("(%s) AS %s", x, altName)
}
default:
panic("internal error 054")
}
}
n := len(a)
a2 := a[:n-1]
j := a[n-1]
var s string
switch r.typ {
case crossJoin:
return strings.Join(a, ", ")
case leftJoin:
s = strings.Join(a2, ",") + " LEFT"
case rightJoin:
s = strings.Join(a2, ",") + " RIGHT"
case fullJoin:
s = strings.Join(a2, ",") + " FULL"
}
s += " OUTER JOIN " + j + " ON " + r.on.String()
return s
}
func (r *joinRset) plan(ctx *execCtx) (plan, error) {
rsets := make([]plan, len(r.sources))
names := make([]string, len(r.sources))
var err error
m := map[string]bool{}
var fields []string
for i, v := range r.sources {
pair := v.([]interface{})
src := pair[0]
nm := pair[1].(string)
if s, ok := src.(string); ok {
src = tableRset(s)
if nm == "" {
nm = s
}
}
if m[nm] {
return nil, fmt.Errorf("%s: duplicate name %s", r.String(), nm)
}
if nm != "" {
m[nm] = true
}
names[i] = nm
var q plan
switch x := src.(type) {
case rset:
if q, err = x.plan(ctx); err != nil {
return nil, err
}
case plan:
q = x
default:
panic("internal error 008")
}
switch {
case len(r.sources) == 1:
fields = q.fieldNames()
default:
for _, f := range q.fieldNames() {
if strings.Contains(f, ".") {
return nil, fmt.Errorf("cannot join on recordset with already qualified field names (use the AS clause): %s", f)
}
if f != "" && nm != "" {
f = fmt.Sprintf("%s.%s", nm, f)
}
if nm == "" {
f = ""
}
fields = append(fields, f)
}
}
rsets[i] = q
}
if len(rsets) == 1 {
return rsets[0], nil
}
right := len(rsets[len(rsets)-1].fieldNames())
switch r.typ {
case crossJoin:
return &crossJoinDefaultPlan{rsets: rsets, names: names, fields: fields}, nil
case leftJoin:
return &leftJoinDefaultPlan{rsets: rsets, names: names, fields: fields, on: r.on, right: right}, nil
case rightJoin:
return &rightJoinDefaultPlan{leftJoinDefaultPlan{rsets: rsets, names: names, fields: fields, on: r.on, right: right}}, nil
case fullJoin:
return &fullJoinDefaultPlan{leftJoinDefaultPlan{rsets: rsets, names: names, fields: fields, on: r.on, right: right}}, nil
default:
panic("internal error 010")
}
}
type fld struct {
expr expression
name string
}