mirror of
https://github.com/octoleo/restic.git
synced 2024-11-02 19:49:44 +00:00
1095 lines
26 KiB
Go
1095 lines
26 KiB
Go
|
// This implements a very basic Swift server
|
||
|
// Everything is stored in memory
|
||
|
//
|
||
|
// This comes from the https://github.com/mitchellh/goamz
|
||
|
// and was adapted for Swift
|
||
|
//
|
||
|
package swifttest
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"crypto/hmac"
|
||
|
"crypto/md5"
|
||
|
"crypto/rand"
|
||
|
"crypto/sha1"
|
||
|
"encoding/hex"
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"io/ioutil"
|
||
|
"log"
|
||
|
"mime"
|
||
|
"net"
|
||
|
"net/http"
|
||
|
"net/http/httptest"
|
||
|
"net/url"
|
||
|
"path"
|
||
|
"regexp"
|
||
|
"sort"
|
||
|
"strconv"
|
||
|
"strings"
|
||
|
"sync"
|
||
|
"sync/atomic"
|
||
|
"testing"
|
||
|
"time"
|
||
|
|
||
|
"github.com/ncw/swift"
|
||
|
)
|
||
|
|
||
|
const (
|
||
|
DEBUG = false
|
||
|
TEST_ACCOUNT = "swifttest"
|
||
|
)
|
||
|
|
||
|
type HandlerOverrideFunc func(w http.ResponseWriter, r *http.Request, recorder *httptest.ResponseRecorder)
|
||
|
|
||
|
type SwiftServer struct {
|
||
|
sync.RWMutex
|
||
|
t *testing.T
|
||
|
reqId int64
|
||
|
mu sync.Mutex
|
||
|
Listener net.Listener
|
||
|
AuthURL string
|
||
|
URL string
|
||
|
Accounts map[string]*account
|
||
|
Sessions map[string]*session
|
||
|
override map[string]HandlerOverrideFunc
|
||
|
}
|
||
|
|
||
|
// The Folder type represents a container stored in an account
|
||
|
type Folder struct {
|
||
|
Count int64 `json:"count"`
|
||
|
Bytes int64 `json:"bytes"`
|
||
|
Name string `json:"name"`
|
||
|
}
|
||
|
|
||
|
// The Key type represents an item stored in an container.
|
||
|
type Key struct {
|
||
|
Key string `json:"name"`
|
||
|
LastModified string `json:"last_modified"`
|
||
|
Size int64 `json:"bytes"`
|
||
|
// ETag gives the hex-encoded MD5 sum of the contents,
|
||
|
// surrounded with double-quotes.
|
||
|
ETag string `json:"hash"`
|
||
|
ContentType string `json:"content_type"`
|
||
|
// Owner Owner
|
||
|
}
|
||
|
|
||
|
type Subdir struct {
|
||
|
Subdir string `json:"subdir"`
|
||
|
}
|
||
|
|
||
|
type swiftError struct {
|
||
|
statusCode int
|
||
|
Code string
|
||
|
Message string
|
||
|
}
|
||
|
|
||
|
type action struct {
|
||
|
srv *SwiftServer
|
||
|
w http.ResponseWriter
|
||
|
req *http.Request
|
||
|
reqId string
|
||
|
user *account
|
||
|
}
|
||
|
|
||
|
type session struct {
|
||
|
username string
|
||
|
}
|
||
|
|
||
|
type metadata struct {
|
||
|
meta http.Header // metadata to return with requests.
|
||
|
}
|
||
|
|
||
|
type account struct {
|
||
|
sync.RWMutex
|
||
|
swift.Account
|
||
|
metadata
|
||
|
password string
|
||
|
ContainersLock sync.RWMutex
|
||
|
Containers map[string]*container
|
||
|
}
|
||
|
|
||
|
type object struct {
|
||
|
sync.RWMutex
|
||
|
metadata
|
||
|
name string
|
||
|
mtime time.Time
|
||
|
checksum []byte // also held as ETag in meta.
|
||
|
data []byte
|
||
|
content_type string
|
||
|
}
|
||
|
|
||
|
type container struct {
|
||
|
sync.RWMutex
|
||
|
metadata
|
||
|
name string
|
||
|
ctime time.Time
|
||
|
objects map[string]*object
|
||
|
bytes int64
|
||
|
}
|
||
|
|
||
|
type segment struct {
|
||
|
Path string `json:"path,omitempty"`
|
||
|
Hash string `json:"hash,omitempty"`
|
||
|
Size int64 `json:"size_bytes,omitempty"`
|
||
|
// When uploading a manifest, the attributes must be named `path`, `hash` and `size`
|
||
|
// but when querying the JSON content of a manifest with the `multipart-manifest=get`
|
||
|
// parameter, Swift names those attributes `name`, `etag` and `bytes`.
|
||
|
// We use all the different attributes names in this structure to be able to use
|
||
|
// the same structure for both uploading and retrieving.
|
||
|
Name string `json:"name,omitempty"`
|
||
|
Etag string `json:"etag,omitempty"`
|
||
|
Bytes int64 `json:"bytes,omitempty"`
|
||
|
ContentType string `json:"content_type,omitempty"`
|
||
|
LastModified string `json:"last_modified,omitempty"`
|
||
|
}
|
||
|
|
||
|
// A resource encapsulates the subject of an HTTP request.
|
||
|
// The resource referred to may or may not exist
|
||
|
// when the request is made.
|
||
|
type resource interface {
|
||
|
put(a *action) interface{}
|
||
|
get(a *action) interface{}
|
||
|
post(a *action) interface{}
|
||
|
delete(a *action) interface{}
|
||
|
copy(a *action) interface{}
|
||
|
}
|
||
|
|
||
|
type objectResource struct {
|
||
|
name string
|
||
|
version string
|
||
|
container *container // always non-nil.
|
||
|
object *object // may be nil.
|
||
|
}
|
||
|
|
||
|
type containerResource struct {
|
||
|
name string
|
||
|
container *container // non-nil if the container already exists.
|
||
|
}
|
||
|
|
||
|
var responseParams = map[string]bool{
|
||
|
"content-type": true,
|
||
|
"content-language": true,
|
||
|
"expires": true,
|
||
|
"cache-control": true,
|
||
|
"content-disposition": true,
|
||
|
"content-encoding": true,
|
||
|
}
|
||
|
|
||
|
func fatalf(code int, codeStr string, errf string, a ...interface{}) {
|
||
|
panic(&swiftError{
|
||
|
statusCode: code,
|
||
|
Code: codeStr,
|
||
|
Message: fmt.Sprintf(errf, a...),
|
||
|
})
|
||
|
}
|
||
|
|
||
|
func (m metadata) setMetadata(a *action, resource string) {
|
||
|
for key, values := range a.req.Header {
|
||
|
key = http.CanonicalHeaderKey(key)
|
||
|
if metaHeaders[key] || strings.HasPrefix(key, "X-"+strings.Title(resource)+"-Meta-") {
|
||
|
if values[0] != "" || resource == "object" {
|
||
|
m.meta[key] = values
|
||
|
} else {
|
||
|
m.meta.Del(key)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (m metadata) getMetadata(a *action) {
|
||
|
h := a.w.Header()
|
||
|
for name, d := range m.meta {
|
||
|
h[name] = d
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *container) list(delimiter string, marker string, prefix string, parent string) (resp []interface{}) {
|
||
|
var tmp orderedObjects
|
||
|
|
||
|
c.RLock()
|
||
|
defer c.RUnlock()
|
||
|
|
||
|
// first get all matching objects and arrange them in alphabetical order.
|
||
|
for _, obj := range c.objects {
|
||
|
if strings.HasPrefix(obj.name, prefix) {
|
||
|
tmp = append(tmp, obj)
|
||
|
}
|
||
|
}
|
||
|
sort.Sort(tmp)
|
||
|
|
||
|
var prefixes []string
|
||
|
for _, obj := range tmp {
|
||
|
if !strings.HasPrefix(obj.name, prefix) {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
isPrefix := false
|
||
|
name := obj.name
|
||
|
if parent != "" {
|
||
|
if path.Dir(obj.name) != path.Clean(parent) {
|
||
|
continue
|
||
|
}
|
||
|
} else if delimiter != "" {
|
||
|
if i := strings.Index(obj.name[len(prefix):], delimiter); i >= 0 {
|
||
|
name = obj.name[:len(prefix)+i+len(delimiter)]
|
||
|
if prefixes != nil && prefixes[len(prefixes)-1] == name {
|
||
|
continue
|
||
|
}
|
||
|
isPrefix = true
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if name <= marker {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
if isPrefix {
|
||
|
prefixes = append(prefixes, name)
|
||
|
|
||
|
resp = append(resp, Subdir{
|
||
|
Subdir: name,
|
||
|
})
|
||
|
} else {
|
||
|
resp = append(resp, obj)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// GET on a container lists the objects in the container.
|
||
|
func (r containerResource) get(a *action) interface{} {
|
||
|
if r.container == nil {
|
||
|
fatalf(404, "NoSuchContainer", "The specified container does not exist")
|
||
|
}
|
||
|
|
||
|
r.container.RLock()
|
||
|
|
||
|
delimiter := a.req.Form.Get("delimiter")
|
||
|
marker := a.req.Form.Get("marker")
|
||
|
prefix := a.req.Form.Get("prefix")
|
||
|
format := a.req.URL.Query().Get("format")
|
||
|
parent := a.req.Form.Get("path")
|
||
|
|
||
|
a.w.Header().Set("X-Container-Bytes-Used", strconv.Itoa(int(r.container.bytes)))
|
||
|
a.w.Header().Set("X-Container-Object-Count", strconv.Itoa(len(r.container.objects)))
|
||
|
r.container.getMetadata(a)
|
||
|
|
||
|
if a.req.Method == "HEAD" {
|
||
|
r.container.RUnlock()
|
||
|
return nil
|
||
|
}
|
||
|
r.container.RUnlock()
|
||
|
|
||
|
objects := r.container.list(delimiter, marker, prefix, parent)
|
||
|
|
||
|
if format == "json" {
|
||
|
a.w.Header().Set("Content-Type", "application/json")
|
||
|
var resp []interface{}
|
||
|
for _, item := range objects {
|
||
|
if obj, ok := item.(*object); ok {
|
||
|
resp = append(resp, obj.Key())
|
||
|
} else {
|
||
|
resp = append(resp, item)
|
||
|
}
|
||
|
}
|
||
|
return resp
|
||
|
} else {
|
||
|
for _, item := range objects {
|
||
|
if obj, ok := item.(*object); ok {
|
||
|
a.w.Write([]byte(obj.name + "\n"))
|
||
|
} else if subdir, ok := item.(Subdir); ok {
|
||
|
a.w.Write([]byte(subdir.Subdir + "\n"))
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// orderedContainers holds a slice of containers that can be sorted
|
||
|
// by name.
|
||
|
type orderedContainers []*container
|
||
|
|
||
|
func (s orderedContainers) Len() int {
|
||
|
return len(s)
|
||
|
}
|
||
|
func (s orderedContainers) Swap(i, j int) {
|
||
|
s[i], s[j] = s[j], s[i]
|
||
|
}
|
||
|
func (s orderedContainers) Less(i, j int) bool {
|
||
|
return s[i].name < s[j].name
|
||
|
}
|
||
|
|
||
|
func (r containerResource) delete(a *action) interface{} {
|
||
|
b := r.container
|
||
|
if b == nil {
|
||
|
fatalf(404, "NoSuchContainer", "The specified container does not exist")
|
||
|
}
|
||
|
if len(b.objects) > 0 {
|
||
|
fatalf(409, "Conflict", "The container you tried to delete is not empty")
|
||
|
}
|
||
|
a.user.Lock()
|
||
|
delete(a.user.Containers, b.name)
|
||
|
a.user.Account.Containers--
|
||
|
a.user.Unlock()
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (r containerResource) put(a *action) interface{} {
|
||
|
if a.req.URL.Query().Get("extract-archive") != "" {
|
||
|
fatalf(403, "Operation forbidden", "Bulk upload is not supported")
|
||
|
}
|
||
|
|
||
|
if r.container == nil {
|
||
|
if !validContainerName(r.name) {
|
||
|
fatalf(400, "InvalidContainerName", "The specified container is not valid")
|
||
|
}
|
||
|
r.container = &container{
|
||
|
name: r.name,
|
||
|
objects: make(map[string]*object),
|
||
|
metadata: metadata{
|
||
|
meta: make(http.Header),
|
||
|
},
|
||
|
}
|
||
|
r.container.setMetadata(a, "container")
|
||
|
|
||
|
a.user.Lock()
|
||
|
a.user.Containers[r.name] = r.container
|
||
|
a.user.Account.Containers++
|
||
|
a.user.Unlock()
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (r containerResource) post(a *action) interface{} {
|
||
|
if r.container == nil {
|
||
|
fatalf(400, "Method", "The resource could not be found.")
|
||
|
} else {
|
||
|
r.container.RLock()
|
||
|
defer r.container.RUnlock()
|
||
|
|
||
|
r.container.setMetadata(a, "container")
|
||
|
a.w.WriteHeader(201)
|
||
|
jsonMarshal(a.w, Folder{
|
||
|
Count: int64(len(r.container.objects)),
|
||
|
Bytes: r.container.bytes,
|
||
|
Name: r.container.name,
|
||
|
})
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (containerResource) copy(a *action) interface{} { return notAllowed() }
|
||
|
|
||
|
// validContainerName returns whether name is a valid bucket name.
|
||
|
// Here are the rules, from:
|
||
|
// http://docs.openstack.org/api/openstack-object-storage/1.0/content/ch_object-storage-dev-api-storage.html
|
||
|
//
|
||
|
// Container names cannot exceed 256 bytes and cannot contain the / character.
|
||
|
//
|
||
|
func validContainerName(name string) bool {
|
||
|
if len(name) == 0 || len(name) > 256 {
|
||
|
return false
|
||
|
}
|
||
|
for _, r := range name {
|
||
|
switch {
|
||
|
case r == '/':
|
||
|
return false
|
||
|
default:
|
||
|
}
|
||
|
}
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
// orderedObjects holds a slice of objects that can be sorted
|
||
|
// by name.
|
||
|
type orderedObjects []*object
|
||
|
|
||
|
func (s orderedObjects) Len() int {
|
||
|
return len(s)
|
||
|
}
|
||
|
func (s orderedObjects) Swap(i, j int) {
|
||
|
s[i], s[j] = s[j], s[i]
|
||
|
}
|
||
|
func (s orderedObjects) Less(i, j int) bool {
|
||
|
return s[i].name < s[j].name
|
||
|
}
|
||
|
|
||
|
func (obj *object) Key() Key {
|
||
|
return Key{
|
||
|
Key: obj.name,
|
||
|
LastModified: obj.mtime.Format("2006-01-02T15:04:05"),
|
||
|
Size: int64(len(obj.data)),
|
||
|
ETag: fmt.Sprintf("%x", obj.checksum),
|
||
|
ContentType: obj.content_type,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
var metaHeaders = map[string]bool{
|
||
|
"Content-Type": true,
|
||
|
"Content-Encoding": true,
|
||
|
"Content-Disposition": true,
|
||
|
"X-Object-Manifest": true,
|
||
|
"X-Static-Large-Object": true,
|
||
|
}
|
||
|
|
||
|
var rangeRegexp = regexp.MustCompile("(bytes=)?([0-9]*)-([0-9]*)")
|
||
|
|
||
|
// GET on an object gets the contents of the object.
|
||
|
func (objr objectResource) get(a *action) interface{} {
|
||
|
var (
|
||
|
etag []byte
|
||
|
reader io.Reader
|
||
|
start int
|
||
|
end int = -1
|
||
|
)
|
||
|
obj := objr.object
|
||
|
if obj == nil {
|
||
|
fatalf(404, "Not Found", "The resource could not be found.")
|
||
|
}
|
||
|
|
||
|
obj.RLock()
|
||
|
defer obj.RUnlock()
|
||
|
|
||
|
h := a.w.Header()
|
||
|
// add metadata
|
||
|
obj.getMetadata(a)
|
||
|
|
||
|
if r := a.req.Header.Get("Range"); r != "" {
|
||
|
m := rangeRegexp.FindStringSubmatch(r)
|
||
|
if m[2] != "" {
|
||
|
start, _ = strconv.Atoi(m[2])
|
||
|
}
|
||
|
if m[3] != "" {
|
||
|
end, _ = strconv.Atoi(m[3])
|
||
|
}
|
||
|
}
|
||
|
|
||
|
max := func(a int, b int) int {
|
||
|
if a > b {
|
||
|
return a
|
||
|
}
|
||
|
return b
|
||
|
}
|
||
|
|
||
|
if manifest, ok := obj.meta["X-Object-Manifest"]; ok {
|
||
|
var segments []io.Reader
|
||
|
components := strings.SplitN(manifest[0], "/", 2)
|
||
|
a.user.RLock()
|
||
|
segContainer := a.user.Containers[components[0]]
|
||
|
a.user.RUnlock()
|
||
|
prefix := components[1]
|
||
|
resp := segContainer.list("", "", prefix, "")
|
||
|
sum := md5.New()
|
||
|
cursor := 0
|
||
|
size := 0
|
||
|
for _, item := range resp {
|
||
|
if obj, ok := item.(*object); ok {
|
||
|
length := len(obj.data)
|
||
|
size += length
|
||
|
sum.Write([]byte(hex.EncodeToString(obj.checksum)))
|
||
|
if start >= cursor+length {
|
||
|
continue
|
||
|
}
|
||
|
segments = append(segments, bytes.NewReader(obj.data[max(0, start-cursor):]))
|
||
|
cursor += length
|
||
|
}
|
||
|
}
|
||
|
etag = sum.Sum(nil)
|
||
|
if end == -1 {
|
||
|
end = size - 1
|
||
|
}
|
||
|
reader = io.LimitReader(io.MultiReader(segments...), int64(end-start+1))
|
||
|
} else if value, ok := obj.meta["X-Static-Large-Object"]; ok && value[0] == "True" && a.req.URL.Query().Get("multipart-manifest") != "get" {
|
||
|
var segments []io.Reader
|
||
|
var segmentList []segment
|
||
|
json.Unmarshal(obj.data, &segmentList)
|
||
|
cursor := 0
|
||
|
size := 0
|
||
|
sum := md5.New()
|
||
|
for _, segment := range segmentList {
|
||
|
components := strings.SplitN(segment.Name[1:], "/", 2)
|
||
|
a.user.RLock()
|
||
|
segContainer := a.user.Containers[components[0]]
|
||
|
a.user.RUnlock()
|
||
|
objectName := components[1]
|
||
|
segObject := segContainer.objects[objectName]
|
||
|
length := len(segObject.data)
|
||
|
size += length
|
||
|
sum.Write([]byte(hex.EncodeToString(segObject.checksum)))
|
||
|
if start >= cursor+length {
|
||
|
continue
|
||
|
}
|
||
|
segments = append(segments, bytes.NewReader(segObject.data[max(0, start-cursor):]))
|
||
|
cursor += length
|
||
|
}
|
||
|
etag = sum.Sum(nil)
|
||
|
if end == -1 {
|
||
|
end = size - 1
|
||
|
}
|
||
|
reader = io.LimitReader(io.MultiReader(segments...), int64(end-start+1))
|
||
|
} else {
|
||
|
if end == -1 {
|
||
|
end = len(obj.data) - 1
|
||
|
}
|
||
|
etag = obj.checksum
|
||
|
reader = bytes.NewReader(obj.data[start : end+1])
|
||
|
}
|
||
|
|
||
|
h.Set("Content-Length", fmt.Sprint(end-start+1))
|
||
|
h.Set("ETag", hex.EncodeToString(etag))
|
||
|
h.Set("Last-Modified", obj.mtime.Format(http.TimeFormat))
|
||
|
|
||
|
if a.req.Method == "HEAD" {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// TODO avoid holding the lock when writing data.
|
||
|
_, err := io.Copy(a.w, reader)
|
||
|
if err != nil {
|
||
|
// we can't do much except just log the fact.
|
||
|
log.Printf("error writing data: %v", err)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// PUT on an object creates the object.
|
||
|
func (objr objectResource) put(a *action) interface{} {
|
||
|
var expectHash []byte
|
||
|
if c := a.req.Header.Get("ETag"); c != "" {
|
||
|
var err error
|
||
|
expectHash, err = hex.DecodeString(c)
|
||
|
if err != nil || len(expectHash) != md5.Size {
|
||
|
fatalf(400, "InvalidDigest", "The ETag you specified was invalid")
|
||
|
}
|
||
|
}
|
||
|
sum := md5.New()
|
||
|
// TODO avoid holding lock while reading data.
|
||
|
data, err := ioutil.ReadAll(io.TeeReader(a.req.Body, sum))
|
||
|
if err != nil {
|
||
|
fatalf(400, "TODO", "read error")
|
||
|
}
|
||
|
gotHash := sum.Sum(nil)
|
||
|
if expectHash != nil && bytes.Compare(gotHash, expectHash) != 0 {
|
||
|
fatalf(422, "Bad ETag", "The ETag you specified did not match what we received")
|
||
|
}
|
||
|
if a.req.ContentLength >= 0 && int64(len(data)) != a.req.ContentLength {
|
||
|
fatalf(400, "IncompleteBody", "You did not provide the number of bytes specified by the Content-Length HTTP header")
|
||
|
}
|
||
|
|
||
|
// TODO is this correct, or should we erase all previous metadata?
|
||
|
obj := objr.object
|
||
|
if obj == nil {
|
||
|
obj = &object{
|
||
|
name: objr.name,
|
||
|
metadata: metadata{
|
||
|
meta: make(http.Header),
|
||
|
},
|
||
|
}
|
||
|
atomic.AddInt64(&a.user.Objects, 1)
|
||
|
} else {
|
||
|
atomic.AddInt64(&objr.container.bytes, -int64(len(obj.data)))
|
||
|
atomic.AddInt64(&a.user.BytesUsed, -int64(len(obj.data)))
|
||
|
}
|
||
|
|
||
|
var content_type string
|
||
|
if content_type = a.req.Header.Get("Content-Type"); content_type == "" {
|
||
|
content_type = mime.TypeByExtension(obj.name)
|
||
|
if content_type == "" {
|
||
|
content_type = "application/octet-stream"
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if a.req.URL.Query().Get("multipart-manifest") == "put" {
|
||
|
// TODO: check the content of the SLO
|
||
|
a.req.Header.Set("X-Static-Large-Object", "True")
|
||
|
|
||
|
var segments []segment
|
||
|
json.Unmarshal(data, &segments)
|
||
|
for i := range segments {
|
||
|
segments[i].Name = "/" + segments[i].Path
|
||
|
segments[i].Path = ""
|
||
|
segments[i].Hash = segments[i].Etag
|
||
|
segments[i].Etag = ""
|
||
|
segments[i].Bytes = segments[i].Size
|
||
|
segments[i].Size = 0
|
||
|
}
|
||
|
|
||
|
data, _ = json.Marshal(segments)
|
||
|
sum = md5.New()
|
||
|
sum.Write(data)
|
||
|
gotHash = sum.Sum(nil)
|
||
|
}
|
||
|
|
||
|
// PUT request has been successful - save data and metadata
|
||
|
obj.setMetadata(a, "object")
|
||
|
obj.content_type = content_type
|
||
|
obj.data = data
|
||
|
obj.checksum = gotHash
|
||
|
obj.mtime = time.Now().UTC()
|
||
|
objr.container.Lock()
|
||
|
objr.container.objects[objr.name] = obj
|
||
|
objr.container.bytes += int64(len(data))
|
||
|
objr.container.Unlock()
|
||
|
|
||
|
atomic.AddInt64(&a.user.BytesUsed, int64(len(data)))
|
||
|
|
||
|
h := a.w.Header()
|
||
|
h.Set("ETag", hex.EncodeToString(obj.checksum))
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (objr objectResource) delete(a *action) interface{} {
|
||
|
if objr.object == nil {
|
||
|
fatalf(404, "NoSuchKey", "The specified key does not exist.")
|
||
|
}
|
||
|
|
||
|
objr.container.Lock()
|
||
|
defer objr.container.Unlock()
|
||
|
|
||
|
objr.object.Lock()
|
||
|
defer objr.object.Unlock()
|
||
|
|
||
|
objr.container.bytes -= int64(len(objr.object.data))
|
||
|
delete(objr.container.objects, objr.name)
|
||
|
|
||
|
atomic.AddInt64(&a.user.BytesUsed, -int64(len(objr.object.data)))
|
||
|
atomic.AddInt64(&a.user.Objects, -1)
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (objr objectResource) post(a *action) interface{} {
|
||
|
objr.object.Lock()
|
||
|
defer objr.object.Unlock()
|
||
|
|
||
|
obj := objr.object
|
||
|
obj.setMetadata(a, "object")
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (objr objectResource) copy(a *action) interface{} {
|
||
|
if objr.object == nil {
|
||
|
fatalf(404, "NoSuchKey", "The specified key does not exist.")
|
||
|
}
|
||
|
|
||
|
obj := objr.object
|
||
|
obj.RLock()
|
||
|
defer obj.RUnlock()
|
||
|
|
||
|
destination := a.req.Header.Get("Destination")
|
||
|
if destination == "" {
|
||
|
fatalf(400, "Bad Request", "You must provide a Destination header")
|
||
|
}
|
||
|
|
||
|
var (
|
||
|
obj2 *object
|
||
|
objr2 objectResource
|
||
|
)
|
||
|
|
||
|
destURL, _ := url.Parse("/v1/AUTH_" + TEST_ACCOUNT + "/" + destination)
|
||
|
r := a.srv.resourceForURL(destURL)
|
||
|
switch t := r.(type) {
|
||
|
case objectResource:
|
||
|
objr2 = t
|
||
|
if objr2.object == nil {
|
||
|
obj2 = &object{
|
||
|
name: objr2.name,
|
||
|
metadata: metadata{
|
||
|
meta: make(http.Header),
|
||
|
},
|
||
|
}
|
||
|
atomic.AddInt64(&a.user.Objects, 1)
|
||
|
} else {
|
||
|
obj2 = objr2.object
|
||
|
atomic.AddInt64(&objr2.container.bytes, -int64(len(obj2.data)))
|
||
|
atomic.AddInt64(&a.user.BytesUsed, -int64(len(obj2.data)))
|
||
|
}
|
||
|
default:
|
||
|
fatalf(400, "Bad Request", "Destination must point to a valid object path")
|
||
|
}
|
||
|
|
||
|
if objr2.container.name != objr2.container.name && obj2.name != obj.name {
|
||
|
obj2.Lock()
|
||
|
defer obj2.Unlock()
|
||
|
}
|
||
|
|
||
|
obj2.content_type = obj.content_type
|
||
|
obj2.data = obj.data
|
||
|
obj2.checksum = obj.checksum
|
||
|
obj2.mtime = time.Now()
|
||
|
|
||
|
for key, values := range obj.metadata.meta {
|
||
|
obj2.metadata.meta[key] = values
|
||
|
}
|
||
|
obj2.setMetadata(a, "object")
|
||
|
|
||
|
objr2.container.Lock()
|
||
|
objr2.container.objects[objr2.name] = obj2
|
||
|
objr2.container.bytes += int64(len(obj.data))
|
||
|
objr2.container.Unlock()
|
||
|
|
||
|
atomic.AddInt64(&a.user.BytesUsed, int64(len(obj.data)))
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *SwiftServer) serveHTTP(w http.ResponseWriter, req *http.Request) {
|
||
|
// ignore error from ParseForm as it's usually spurious.
|
||
|
req.ParseForm()
|
||
|
|
||
|
if fn := s.override[req.URL.Path]; fn != nil {
|
||
|
originalRW := w
|
||
|
recorder := httptest.NewRecorder()
|
||
|
w = recorder
|
||
|
defer func() {
|
||
|
fn(originalRW, req, recorder)
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
if DEBUG {
|
||
|
log.Printf("swifttest %q %q", req.Method, req.URL)
|
||
|
}
|
||
|
a := &action{
|
||
|
srv: s,
|
||
|
w: w,
|
||
|
req: req,
|
||
|
reqId: fmt.Sprintf("%09X", atomic.LoadInt64(&s.reqId)),
|
||
|
}
|
||
|
atomic.AddInt64(&s.reqId, 1)
|
||
|
|
||
|
var r resource
|
||
|
defer func() {
|
||
|
switch err := recover().(type) {
|
||
|
case *swiftError:
|
||
|
w.Header().Set("Content-Type", `text/plain; charset=utf-8`)
|
||
|
http.Error(w, err.Message, err.statusCode)
|
||
|
case nil:
|
||
|
default:
|
||
|
panic(err)
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
var resp interface{}
|
||
|
|
||
|
if req.URL.String() == "/v1.0" {
|
||
|
username := req.Header.Get("x-auth-user")
|
||
|
key := req.Header.Get("x-auth-key")
|
||
|
s.Lock()
|
||
|
defer s.Unlock()
|
||
|
if acct, ok := s.Accounts[username]; ok {
|
||
|
if acct.password == key {
|
||
|
r := make([]byte, 16)
|
||
|
_, _ = rand.Read(r)
|
||
|
id := fmt.Sprintf("%X", r)
|
||
|
w.Header().Set("X-Storage-Url", s.URL+"/AUTH_"+username)
|
||
|
w.Header().Set("X-Auth-Token", "AUTH_tk"+string(id))
|
||
|
w.Header().Set("X-Storage-Token", "AUTH_tk"+string(id))
|
||
|
s.Sessions[id] = &session{
|
||
|
username: username,
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
panic(notAuthorized())
|
||
|
}
|
||
|
|
||
|
if req.URL.String() == "/info" {
|
||
|
jsonMarshal(w, &swift.SwiftInfo{
|
||
|
"swift": map[string]interface{}{
|
||
|
"version": "1.2",
|
||
|
},
|
||
|
"tempurl": map[string]interface{}{
|
||
|
"methods": []string{"GET", "HEAD", "PUT"},
|
||
|
},
|
||
|
"slo": map[string]interface{}{
|
||
|
"max_manifest_segments": 1000,
|
||
|
"max_manifest_size": 2097152,
|
||
|
"min_segment_size": 1,
|
||
|
},
|
||
|
})
|
||
|
return
|
||
|
}
|
||
|
|
||
|
r = s.resourceForURL(req.URL)
|
||
|
|
||
|
key := req.Header.Get("x-auth-token")
|
||
|
signature := req.URL.Query().Get("temp_url_sig")
|
||
|
expires := req.URL.Query().Get("temp_url_expires")
|
||
|
if key == "" && signature != "" && expires != "" {
|
||
|
accountName, _, _, _ := s.parseURL(req.URL)
|
||
|
secretKey := ""
|
||
|
s.RLock()
|
||
|
if account, ok := s.Accounts[accountName]; ok {
|
||
|
secretKey = account.meta.Get("X-Account-Meta-Temp-Url-Key")
|
||
|
}
|
||
|
s.RUnlock()
|
||
|
|
||
|
get_hmac := func(method string) string {
|
||
|
mac := hmac.New(sha1.New, []byte(secretKey))
|
||
|
body := fmt.Sprintf("%s\n%s\n%s", method, expires, req.URL.Path)
|
||
|
mac.Write([]byte(body))
|
||
|
return hex.EncodeToString(mac.Sum(nil))
|
||
|
}
|
||
|
|
||
|
if req.Method == "HEAD" {
|
||
|
if signature != get_hmac("GET") && signature != get_hmac("POST") && signature != get_hmac("PUT") {
|
||
|
panic(notAuthorized())
|
||
|
}
|
||
|
} else if signature != get_hmac(req.Method) {
|
||
|
panic(notAuthorized())
|
||
|
}
|
||
|
} else {
|
||
|
s.RLock()
|
||
|
session, ok := s.Sessions[key[7:]]
|
||
|
if !ok {
|
||
|
s.RUnlock()
|
||
|
panic(notAuthorized())
|
||
|
return
|
||
|
}
|
||
|
|
||
|
a.user = s.Accounts[session.username]
|
||
|
s.RUnlock()
|
||
|
}
|
||
|
|
||
|
switch req.Method {
|
||
|
case "PUT":
|
||
|
resp = r.put(a)
|
||
|
case "GET", "HEAD":
|
||
|
resp = r.get(a)
|
||
|
case "DELETE":
|
||
|
resp = r.delete(a)
|
||
|
case "POST":
|
||
|
resp = r.post(a)
|
||
|
case "COPY":
|
||
|
resp = r.copy(a)
|
||
|
default:
|
||
|
fatalf(400, "MethodNotAllowed", "unknown http request method %q", req.Method)
|
||
|
}
|
||
|
|
||
|
content_type := req.Header.Get("Content-Type")
|
||
|
if resp != nil && req.Method != "HEAD" {
|
||
|
if strings.HasPrefix(content_type, "application/json") ||
|
||
|
req.URL.Query().Get("format") == "json" {
|
||
|
jsonMarshal(w, resp)
|
||
|
} else {
|
||
|
switch r := resp.(type) {
|
||
|
case string:
|
||
|
w.Write([]byte(r))
|
||
|
default:
|
||
|
w.Write(resp.([]byte))
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *SwiftServer) SetOverride(path string, fn HandlerOverrideFunc) {
|
||
|
s.override[path] = fn
|
||
|
}
|
||
|
|
||
|
func (s *SwiftServer) UnsetOverride(path string) {
|
||
|
delete(s.override, path)
|
||
|
}
|
||
|
|
||
|
func jsonMarshal(w io.Writer, x interface{}) {
|
||
|
if err := json.NewEncoder(w).Encode(x); err != nil {
|
||
|
panic(fmt.Errorf("error marshalling %#v: %v", x, err))
|
||
|
}
|
||
|
}
|
||
|
|
||
|
var pathRegexp = regexp.MustCompile("/v1/AUTH_([a-zA-Z0-9]+)(/([^/]+)(/(.*))?)?")
|
||
|
|
||
|
func (srv *SwiftServer) parseURL(u *url.URL) (account string, container string, object string, err error) {
|
||
|
m := pathRegexp.FindStringSubmatch(u.Path)
|
||
|
if m == nil {
|
||
|
return "", "", "", fmt.Errorf("Couldn't parse the specified URI")
|
||
|
}
|
||
|
account = m[1]
|
||
|
container = m[3]
|
||
|
object = m[5]
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// resourceForURL returns a resource object for the given URL.
|
||
|
func (srv *SwiftServer) resourceForURL(u *url.URL) (r resource) {
|
||
|
accountName, containerName, objectName, err := srv.parseURL(u)
|
||
|
|
||
|
if err != nil {
|
||
|
fatalf(404, "InvalidURI", err.Error())
|
||
|
}
|
||
|
|
||
|
srv.RLock()
|
||
|
account, ok := srv.Accounts[accountName]
|
||
|
if !ok {
|
||
|
//srv.RUnlock()
|
||
|
fatalf(404, "NoSuchAccount", "The specified account does not exist")
|
||
|
}
|
||
|
srv.RUnlock()
|
||
|
|
||
|
account.RLock()
|
||
|
if containerName == "" {
|
||
|
account.RUnlock()
|
||
|
return rootResource{}
|
||
|
}
|
||
|
account.RUnlock()
|
||
|
|
||
|
b := containerResource{
|
||
|
name: containerName,
|
||
|
container: account.Containers[containerName],
|
||
|
}
|
||
|
|
||
|
if objectName == "" {
|
||
|
return b
|
||
|
}
|
||
|
|
||
|
if b.container == nil {
|
||
|
fatalf(404, "NoSuchContainer", "The specified container does not exist")
|
||
|
}
|
||
|
|
||
|
objr := objectResource{
|
||
|
name: objectName,
|
||
|
version: u.Query().Get("versionId"),
|
||
|
container: b.container,
|
||
|
}
|
||
|
|
||
|
objr.container.RLock()
|
||
|
defer objr.container.RUnlock()
|
||
|
if obj := objr.container.objects[objr.name]; obj != nil {
|
||
|
objr.object = obj
|
||
|
}
|
||
|
return objr
|
||
|
}
|
||
|
|
||
|
// nullResource has error stubs for all resource methods.
|
||
|
type nullResource struct{}
|
||
|
|
||
|
func notAllowed() interface{} {
|
||
|
fatalf(400, "MethodNotAllowed", "The specified method is not allowed against this resource")
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func notAuthorized() interface{} {
|
||
|
fatalf(401, "Unauthorized", "This server could not verify that you are authorized to access the document you requested.")
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (nullResource) put(a *action) interface{} { return notAllowed() }
|
||
|
func (nullResource) get(a *action) interface{} { return notAllowed() }
|
||
|
func (nullResource) post(a *action) interface{} { return notAllowed() }
|
||
|
func (nullResource) delete(a *action) interface{} { return notAllowed() }
|
||
|
func (nullResource) copy(a *action) interface{} { return notAllowed() }
|
||
|
|
||
|
type rootResource struct{}
|
||
|
|
||
|
func (rootResource) put(a *action) interface{} { return notAllowed() }
|
||
|
func (rootResource) get(a *action) interface{} {
|
||
|
marker := a.req.Form.Get("marker")
|
||
|
prefix := a.req.Form.Get("prefix")
|
||
|
format := a.req.URL.Query().Get("format")
|
||
|
|
||
|
h := a.w.Header()
|
||
|
|
||
|
h.Set("X-Account-Bytes-Used", strconv.Itoa(int(atomic.LoadInt64(&a.user.BytesUsed))))
|
||
|
h.Set("X-Account-Container-Count", strconv.Itoa(int(atomic.LoadInt64(&a.user.Account.Containers))))
|
||
|
h.Set("X-Account-Object-Count", strconv.Itoa(int(atomic.LoadInt64(&a.user.Objects))))
|
||
|
|
||
|
a.user.RLock()
|
||
|
defer a.user.RUnlock()
|
||
|
|
||
|
// add metadata
|
||
|
a.user.metadata.getMetadata(a)
|
||
|
|
||
|
if a.req.Method == "HEAD" {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
var tmp orderedContainers
|
||
|
// first get all matching objects and arrange them in alphabetical order.
|
||
|
for _, container := range a.user.Containers {
|
||
|
if strings.HasPrefix(container.name, prefix) {
|
||
|
tmp = append(tmp, container)
|
||
|
}
|
||
|
}
|
||
|
sort.Sort(tmp)
|
||
|
|
||
|
resp := make([]Folder, 0)
|
||
|
for _, container := range tmp {
|
||
|
if container.name <= marker {
|
||
|
continue
|
||
|
}
|
||
|
if format == "json" {
|
||
|
resp = append(resp, Folder{
|
||
|
Count: int64(len(container.objects)),
|
||
|
Bytes: container.bytes,
|
||
|
Name: container.name,
|
||
|
})
|
||
|
} else {
|
||
|
a.w.Write([]byte(container.name + "\n"))
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if format == "json" {
|
||
|
return resp
|
||
|
} else {
|
||
|
return nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (r rootResource) post(a *action) interface{} {
|
||
|
a.user.Lock()
|
||
|
a.user.metadata.setMetadata(a, "account")
|
||
|
a.user.Unlock()
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (rootResource) delete(a *action) interface{} {
|
||
|
if a.req.URL.Query().Get("bulk-delete") == "1" {
|
||
|
fatalf(403, "Operation forbidden", "Bulk delete is not supported")
|
||
|
}
|
||
|
|
||
|
return notAllowed()
|
||
|
}
|
||
|
|
||
|
func (rootResource) copy(a *action) interface{} { return notAllowed() }
|
||
|
|
||
|
func NewSwiftServer(address string) (*SwiftServer, error) {
|
||
|
if strings.Index(address, ":") == -1 {
|
||
|
address += ":0"
|
||
|
}
|
||
|
l, err := net.Listen("tcp", address)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("cannot listen on %s: %v", address, err)
|
||
|
}
|
||
|
|
||
|
server := &SwiftServer{
|
||
|
Listener: l,
|
||
|
AuthURL: "http://" + l.Addr().String() + "/v1.0",
|
||
|
URL: "http://" + l.Addr().String() + "/v1",
|
||
|
Accounts: make(map[string]*account),
|
||
|
Sessions: make(map[string]*session),
|
||
|
override: make(map[string]HandlerOverrideFunc),
|
||
|
}
|
||
|
|
||
|
server.Accounts[TEST_ACCOUNT] = &account{
|
||
|
password: TEST_ACCOUNT,
|
||
|
metadata: metadata{
|
||
|
meta: make(http.Header),
|
||
|
},
|
||
|
Containers: make(map[string]*container),
|
||
|
}
|
||
|
|
||
|
go http.Serve(l, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||
|
server.serveHTTP(w, req)
|
||
|
}))
|
||
|
|
||
|
return server, nil
|
||
|
}
|
||
|
|
||
|
func (srv *SwiftServer) Close() {
|
||
|
srv.Listener.Close()
|
||
|
}
|