2016-04-01 16:04:27 +02:00

334 lines
9.1 KiB
Go

/*
Copyright 2014 Outbrain Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sqlutils
import (
"database/sql"
"encoding/json"
"errors"
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/outbrain/golib/log"
"strconv"
"strings"
"sync"
)
// RowMap represents one row in a result set. Its objective is to allow
// for easy, typed getters by column name.
type RowMap map[string]CellData
// Cell data is the result of a single (atomic) column in a single row
type CellData sql.NullString
func (this *CellData) MarshalJSON() ([]byte, error) {
if this.Valid {
return json.Marshal(this.String)
} else {
return json.Marshal(nil)
}
}
func (this *CellData) NullString() *sql.NullString {
return (*sql.NullString)(this)
}
// RowData is the result of a single row, in positioned array format
type RowData []CellData
// MarshalJSON will marshal this map as JSON
func (this *RowData) MarshalJSON() ([]byte, error) {
cells := make([](*CellData), len(*this), len(*this))
for i, val := range *this {
d := CellData(val)
cells[i] = &d
}
return json.Marshal(cells)
}
// ResultData is an ordered row set of RowData
type ResultData []RowData
var EmptyResultData = ResultData{}
func (this *RowMap) GetString(key string) string {
return (*this)[key].String
}
// GetStringD returns a string from the map, or a default value if the key does not exist
func (this *RowMap) GetStringD(key string, def string) string {
if cell, ok := (*this)[key]; ok {
return cell.String
}
return def
}
func (this *RowMap) GetInt64(key string) int64 {
res, _ := strconv.ParseInt(this.GetString(key), 10, 0)
return res
}
func (this *RowMap) GetNullInt64(key string) sql.NullInt64 {
i, err := strconv.ParseInt(this.GetString(key), 10, 0)
if err == nil {
return sql.NullInt64{Int64: i, Valid: true}
} else {
return sql.NullInt64{Valid: false}
}
}
func (this *RowMap) GetInt(key string) int {
res, _ := strconv.Atoi(this.GetString(key))
return res
}
func (this *RowMap) GetIntD(key string, def int) int {
res, err := strconv.Atoi(this.GetString(key))
if err != nil {
return def
}
return res
}
func (this *RowMap) GetUint(key string) uint {
res, _ := strconv.Atoi(this.GetString(key))
return uint(res)
}
func (this *RowMap) GetUintD(key string, def uint) uint {
res, err := strconv.Atoi(this.GetString(key))
if err != nil {
return def
}
return uint(res)
}
func (this *RowMap) GetBool(key string) bool {
return this.GetInt(key) != 0
}
// knownDBs is a DB cache by uri
var knownDBs map[string]*sql.DB = make(map[string]*sql.DB)
var knownDBsMutex = &sync.Mutex{}
// GetDB returns a DB instance based on uri.
// bool result indicates whether the DB was returned from cache; err
func GetDB(mysql_uri string) (*sql.DB, bool, error) {
knownDBsMutex.Lock()
defer func() {
knownDBsMutex.Unlock()
}()
var exists bool
if _, exists = knownDBs[mysql_uri]; !exists {
if db, err := sql.Open("mysql", mysql_uri); err == nil {
knownDBs[mysql_uri] = db
} else {
return db, exists, err
}
}
return knownDBs[mysql_uri], exists, nil
}
// RowToArray is a convenience function, typically not called directly, which maps a
// single read database row into a NullString
func RowToArray(rows *sql.Rows, columns []string) []CellData {
buff := make([]interface{}, len(columns))
data := make([]CellData, len(columns))
for i, _ := range buff {
buff[i] = data[i].NullString()
}
rows.Scan(buff...)
return data
}
// ScanRowsToArrays is a convenience function, typically not called directly, which maps rows
// already read from the databse into arrays of NullString
func ScanRowsToArrays(rows *sql.Rows, on_row func([]CellData) error) error {
columns, _ := rows.Columns()
for rows.Next() {
arr := RowToArray(rows, columns)
err := on_row(arr)
if err != nil {
return err
}
}
return nil
}
func rowToMap(row []CellData, columns []string) map[string]CellData {
m := make(map[string]CellData)
for k, data_col := range row {
m[columns[k]] = data_col
}
return m
}
// ScanRowsToMaps is a convenience function, typically not called directly, which maps rows
// already read from the databse into RowMap entries.
func ScanRowsToMaps(rows *sql.Rows, on_row func(RowMap) error) error {
columns, _ := rows.Columns()
err := ScanRowsToArrays(rows, func(arr []CellData) error {
m := rowToMap(arr, columns)
err := on_row(m)
if err != nil {
return err
}
return nil
})
return err
}
// QueryRowsMap is a convenience function allowing querying a result set while poviding a callback
// function activated per read row.
func QueryRowsMap(db *sql.DB, query string, on_row func(RowMap) error, args ...interface{}) error {
var err error
defer func() {
if derr := recover(); derr != nil {
err = errors.New(fmt.Sprintf("QueryRowsMap unexpected error: %+v", derr))
}
}()
rows, err := db.Query(query, args...)
defer rows.Close()
if err != nil && err != sql.ErrNoRows {
return log.Errore(err)
}
err = ScanRowsToMaps(rows, on_row)
return err
}
// queryResultData returns a raw array of rows for a given query, optionally reading and returning column names
func queryResultData(db *sql.DB, query string, retrieveColumns bool, args ...interface{}) (ResultData, []string, error) {
var err error
defer func() {
if derr := recover(); derr != nil {
err = errors.New(fmt.Sprintf("QueryRowsMap unexpected error: %+v", derr))
}
}()
columns := []string{}
rows, err := db.Query(query, args...)
defer rows.Close()
if err != nil && err != sql.ErrNoRows {
return EmptyResultData, columns, log.Errore(err)
}
if retrieveColumns {
// Don't pay if you don't want to
columns, _ = rows.Columns()
}
resultData := ResultData{}
err = ScanRowsToArrays(rows, func(rowData []CellData) error {
resultData = append(resultData, rowData)
return nil
})
return resultData, columns, err
}
// QueryResultData returns a raw array of rows
func QueryResultData(db *sql.DB, query string, args ...interface{}) (ResultData, error) {
resultData, _, err := queryResultData(db, query, false, args...)
return resultData, err
}
// QueryResultDataNamed returns a raw array of rows, with column names
func QueryResultDataNamed(db *sql.DB, query string, args ...interface{}) (ResultData, []string, error) {
return queryResultData(db, query, true, args...)
}
// QueryRowsMapBuffered reads data from the database into a buffer, and only then applies the given function per row.
// This allows the application to take its time with processing the data, albeit consuming as much memory as required by
// the result set.
func QueryRowsMapBuffered(db *sql.DB, query string, on_row func(RowMap) error, args ...interface{}) error {
resultData, columns, err := queryResultData(db, query, true, args...)
if err != nil {
// Already logged
return err
}
for _, row := range resultData {
err = on_row(rowToMap(row, columns))
if err != nil {
return err
}
}
return nil
}
// ExecNoPrepare executes given query using given args on given DB, without using prepared statements.
func ExecNoPrepare(db *sql.DB, query string, args ...interface{}) (sql.Result, error) {
var err error
defer func() {
if derr := recover(); derr != nil {
err = errors.New(fmt.Sprintf("ExecNoPrepare unexpected error: %+v", derr))
}
}()
var res sql.Result
res, err = db.Exec(query, args...)
if err != nil {
log.Errore(err)
}
return res, err
}
// ExecQuery executes given query using given args on given DB. It will safele prepare, execute and close
// the statement.
func execInternal(silent bool, db *sql.DB, query string, args ...interface{}) (sql.Result, error) {
var err error
defer func() {
if derr := recover(); derr != nil {
err = errors.New(fmt.Sprintf("execInternal unexpected error: %+v", derr))
}
}()
stmt, err := db.Prepare(query)
if err != nil {
return nil, err
}
defer stmt.Close()
var res sql.Result
res, err = stmt.Exec(args...)
if err != nil && !silent {
log.Errore(err)
}
return res, err
}
// Exec executes given query using given args on given DB. It will safele prepare, execute and close
// the statement.
func Exec(db *sql.DB, query string, args ...interface{}) (sql.Result, error) {
return execInternal(false, db, query, args...)
}
// ExecSilently acts like Exec but does not report any error
func ExecSilently(db *sql.DB, query string, args ...interface{}) (sql.Result, error) {
return execInternal(true, db, query, args...)
}
func InClauseStringValues(terms []string) string {
quoted := []string{}
for _, s := range terms {
quoted = append(quoted, fmt.Sprintf("'%s'", strings.Replace(s, ",", "''", -1)))
}
return strings.Join(quoted, ", ")
}
// Convert variable length arguments into arguments array
func Args(args ...interface{}) []interface{} {
return args
}