s3fs-fuse/s3fs/s3fs.cpp

1436 lines
40 KiB
C++
Raw Normal View History

/*
* s3fs - FUSE-based file system backed by Amazon S3
*
* Copyright 2007-2008 Randy Rizun <rrizun@gmail.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#define FUSE_USE_VERSION 26
#include <fuse.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <dirent.h>
#include <errno.h>
#include <sys/time.h>
#include <libgen.h>
#include <pthread.h>
#include <curl/curl.h>
#include <libxml/parser.h>
#include <libxml/tree.h>
#include <syslog.h>
#include <fstream>
#include <iostream>
#include <map>
#include <sstream>
#include <stack>
#include <string>
#include <vector>
#include <algorithm>
using namespace std;
// VERIFY(s): evaluate `s` exactly once and, when it yields a non-zero status,
// return that status from the enclosing function; otherwise fall through.
// The temporary has a deliberately unusual name so that an argument which
// itself mentions a variable called `result` is not captured by the macro.
// NOTE: expands to a bare `if` block, so do not use it as the unbraced body
// of an if/else.
#define VERIFY(s) if (true) { \
  const int verify_status_ = (s); \
  if (verify_status_ != 0) \
    return verify_status_; \
}
// Yikes(result): log the source line and the status to syslog, then return the
// status from the enclosing function.
// The argument is evaluated exactly once and saved before syslog() runs, so
// Yikes(-errno) returns the same value that was logged even if syslog()
// changes errno.
// NOTE: expands to a bare `if` block, so do not use it as the unbraced body
// of an if/else.
#define Yikes(result) if (true) { \
  const int yikes_status_ = (result); \
  syslog(LOG_ERR,"%d###result=%d", __LINE__, yikes_status_); \
  return yikes_status_; \
}
// Scope guard that closes a file descriptor when it goes out of scope.
// Negative values (e.g. a -errno code returned by get_local_fd) are treated as
// "no descriptor" and are never passed to close(), which would only fail with
// EBADF and clobber errno.
class auto_fd {
  int fd;
  // Not copyable: a copy would close the same descriptor a second time.
  // Declared but intentionally left undefined.
  auto_fd(const auto_fd&);
  auto_fd& operator=(const auto_fd&);
public:
  auto_fd(int fd): fd(fd) {
  }
  ~auto_fd() {
    if (fd >= 0)
      close(fd);
  }
  // Returns the wrapped descriptor; ownership stays with this object.
  int get() const {
    return fd;
  }
};
// Formats any streamable value as text using operator<<.
template<typename T>
string
stringificationizer(T value) {
  ostringstream formatter;
  formatter << value;
  return formatter.str();
}
// Default set of characters removed by the trim helpers.
#define SPACES " \t\r\n"

// Returns a copy of s without leading characters that appear in t.
inline string trim_left (const string & s, const string & t = SPACES)
{
  const string::size_type first = s.find_first_not_of (t);
  if (first == string::npos)
    return string ();
  return s.substr (first);
} // end of trim_left

// Returns a copy of s without trailing characters that appear in t.
inline string trim_right (const string & s, const string & t = SPACES)
{
  const string::size_type last = s.find_last_not_of (t);
  if (last == string::npos)
    return string ();
  return s.substr (0, last + 1);
} // end of trim_right

// Returns a copy of s with characters from t stripped from both ends.
inline string trim (const string & s, const string & t = SPACES)
{
  return trim_left (trim_right (s, t), t);
} // end of trim
// Scope guard for a pthread mutex: locks in the constructor and unlocks in the
// destructor.
class auto_lock {
  pthread_mutex_t& lock;
  // Not copyable: a copied guard would unlock the mutex a second time.
  // Declared but intentionally left undefined.
  auto_lock(const auto_lock&);
  auto_lock& operator=(const auto_lock&);
public:
  auto_lock(pthread_mutex_t& lock): lock(lock) {
    pthread_mutex_lock(&lock);
  }
  ~auto_lock() {
    pthread_mutex_unlock(&lock);
  }
};
// Pool of idle libcurl easy handles: alloc_curl_handle() pops from it and
// return_curl_handle() pushes handles back for reuse.
stack<CURL*> curl_handles;
// Guards curl_handles; initialised in s3fs_init() and destroyed in s3fs_destroy().
static pthread_mutex_t curl_handles_lock;
// Hands out a libcurl easy handle, reusing a pooled one when available and
// creating a new one otherwise. The handle is reset and given the options
// shared by every request before it is returned. The pool lock is held for
// the whole call.
static CURL*
alloc_curl_handle() {
  CURL* handle = 0;
  auto_lock guard(curl_handles_lock);
  if (curl_handles.empty()) {
    handle = curl_easy_init();
  } else {
    handle = curl_handles.top();
    curl_handles.pop();
  }
  curl_easy_reset(handle);
  const long no_signal = 1;
  curl_easy_setopt(handle, CURLOPT_NOSIGNAL, no_signal);
  const long connect_timeout_seconds = 10;
  curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT, connect_timeout_seconds);
  // CURLOPT_TIMEOUT is deliberately not set: no single overall limit suits
  // both tiny requests and e.g. a 5G upload.
  return handle;
}
// Puts an easy handle back into the shared pool; a null handle is ignored.
static void
return_curl_handle(CURL* curl_handle) {
  if (curl_handle == 0)
    return;
  auto_lock guard(curl_handles_lock);
  curl_handles.push(curl_handle);
}
// Scope guard around a pooled libcurl easy handle: takes one from
// alloc_curl_handle() on construction and gives it back with
// return_curl_handle() on destruction.
class auto_curl {
  CURL* curl_handle;
public:
  auto_curl(): curl_handle(alloc_curl_handle()) {
  }
  ~auto_curl() {
    if (curl_handle == 0)
      return;
    return_curl_handle(curl_handle);
  }
  // Borrow the underlying handle; ownership stays with this object.
  CURL* get() const {
    return curl_handle;
  }
  // Lets an auto_curl be passed wherever a CURL* is expected.
  operator CURL*() const {
    return get();
  }
};
// Function object that detaches an easy handle from a given multi handle and
// then returns the easy handle to the shared pool.
struct curl_multi_remove_handle_functor {
  CURLM* multi_handle;
  curl_multi_remove_handle_functor(CURLM* multi_handle)
    : multi_handle(multi_handle) {
  }
  void operator()(CURL* curl_handle) {
    // detach first, then hand the easy handle back for reuse
    curl_multi_remove_handle(multi_handle, curl_handle);
    return_curl_handle(curl_handle);
  }
};
class auto_curl_multi {
CURLM* multi_handle;
vector<CURL*> curl_handles;
public:
auto_curl_multi(): multi_handle(curl_multi_init()) {
}
~auto_curl_multi() {
curl_multi_cleanup(for_each(curl_handles.begin(), curl_handles.end(), curl_multi_remove_handle_functor(multi_handle)).multi_handle);
}
CURLM* get() const {
return multi_handle;
}
void add_curl(CURL* curl_handle) {
curl_handles.push_back(curl_handle);
curl_multi_add_handle(multi_handle, curl_handle);
}
};
// Scope guard around a libcurl string list (used for request headers); the
// whole list is freed when the object goes out of scope.
class auto_curl_slist {
  struct curl_slist* slist;
  // Not copyable: a copy would free the same list twice.
  // Declared but intentionally left undefined.
  auto_curl_slist(const auto_curl_slist&);
  auto_curl_slist& operator=(const auto_curl_slist&);
public:
  auto_curl_slist(): slist(0) {
  }
  ~auto_curl_slist() {
    curl_slist_free_all(slist);
  }
  // Borrow the list head (null while the list is empty).
  struct curl_slist* get() const {
    return slist;
  }
  // Appends a copy of s to the list. curl_slist_append() returns NULL on
  // failure; in that case the existing list is kept (instead of being
  // overwritten with NULL, which would leak it and drop every header added
  // so far) and the failure is logged.
  void append(const string& s) {
    struct curl_slist* grown = curl_slist_append(slist, s.c_str());
    if (grown == 0) {
      syslog(LOG_ERR, "###curl_slist_append failed");
      return;
    }
    slist = grown;
  }
};
// -oretries=2
static int retries = 2;
/**
 * Runs the prepared request, retrying on failures that may be transient.
 *
 * One initial attempt plus `retries` further attempts are made. A timeout, an
 * HTTP status of 500 or above, or any other libcurl error is retried; HTTP 404
 * maps to -ENOENT and any other status below 500 to -EIO without retrying.
 *
 * @param curl fully configured easy handle
 * @param f optional stream used by the transfer; rewound before every attempt
 * @return fuse return code
 */
static int
my_curl_easy_perform(CURL* curl, FILE* f = 0) {
  for (int attempts_left = 1 + retries; attempts_left > 0; --attempts_left) {
    if (f != 0)
      rewind(f);
    const CURLcode status = curl_easy_perform(curl);
    if (status == CURLE_OK)
      return 0;
    switch (status) {
    case CURLE_OPERATION_TIMEDOUT:
      syslog(LOG_ERR, "###timeout");
      break;
    case CURLE_HTTP_RETURNED_ERROR: {
      long httpStatus;
      if (curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpStatus) != 0)
        return -EIO;
      if (httpStatus == 404)
        return -ENOENT;
      syslog(LOG_ERR, "###response=%ld", httpStatus);
      if (httpStatus < 500)
        return -EIO;
      break;
    }
    default:
      syslog(LOG_ERR, "###%s", curl_easy_strerror(status));
      break;
    }
    syslog(LOG_ERR, "###retrying...");
  }
  syslog(LOG_ERR, "###giving up");
  return -EIO;
}
// Bucket name; it is the first path component of every request resource.
static string bucket;
// Access key id sent in the "Authorization: AWS <id>:<signature>" header.
static string AWSAccessKeyId;
// Secret key used as the HMAC key in calc_signature().
static string AWSSecretAccessKey;
// Base URL every request URL is built from.
static const string host = "http://s3.amazonaws.com";
// Permission bits reported by s3fs_getattr() for the mount root "/".
static mode_t root_mode = 0;
// if .size()==0 then local file cache is disabled
// (otherwise it is the directory prefix under which get_local_fd() keeps cached copies)
static string use_cache;
// private, public-read, public-read-write, authenticated-read
// (sent as the x-amz-acl header on uploads)
static string default_acl("private");
// key=path
// Filled by s3fs_readdir() and consumed (looked up, then erased) by s3fs_getattr().
typedef map<string, struct stat> stat_cache_t;
static stat_cache_t stat_cache;
// Guards stat_cache.
static pthread_mutex_t stat_cache_lock;
static const char hexAlphabet[] = "0123456789ABCDEF";
/**
 * urlEncode a fuse path,
 * taking into special consideration "/",
 * otherwise regular urlEncode.
 *
 * '/', ASCII letters and digits and the characters ". - * _" are copied
 * unchanged, a space becomes '+', and every other byte becomes %XX with
 * upper-case hex digits.
 *
 * Each byte is converted to unsigned char before it is classified: calling
 * isalnum() with a negative char (any byte >= 0x80 where char is signed) is
 * undefined behaviour.
 *
 * @param s text to encode
 * @return the encoded text
 */
string
urlEncode(const string &s) {
  string result;
  result.reserve(s.length());
  for (string::size_type i = 0; i < s.length(); ++i) {
    const unsigned char c = static_cast<unsigned char>(s[i]);
    if (c == '/') // Note- special case for fuse paths...
      result += s[i];
    else if (isalnum(c))
      result += s[i];
    else if (c == '.' || c == '-' || c == '*' || c == '_')
      result += s[i];
    else if (c == ' ')
      result += '+';
    else {
      result += '%';
      result += hexAlphabet[c / 16];
      result += hexAlphabet[c % 16];
    }
  }
  return result;
}
// http headers
// Maps a header name to its value; used for parsed response headers and for
// the object metadata passed to put_local_fd().
typedef map<string, string> headers_t;
#include <openssl/bio.h>
#include <openssl/buffer.h>
#include <openssl/evp.h>
#include <openssl/hmac.h>
// Digest (SHA-1) used for the HMAC computed in calc_signature().
static const EVP_MD* evp_md = EVP_sha1();
/**
 * Returns the current date
 * in a format suitable for a HTTP request header,
 * e.g. "Tue, 15 Nov 1994 08:12:31 GMT".
 *
 * Uses gmtime_r() rather than gmtime(): gmtime() returns a pointer to shared
 * static storage, which is unsafe when several filesystem threads build
 * requests at the same time.
 *
 * @return the formatted date, or an empty string if the time cannot be
 *         converted or formatted
 */
string
get_date() {
  char buf[100];
  const time_t now = time(NULL);
  struct tm utc;
  if (gmtime_r(&now, &utc) == NULL)
    return "";
  if (strftime(buf, sizeof(buf), "%a, %d %b %Y %H:%M:%S GMT", &utc) == 0)
    return "";
  return buf;
}
/**
* Returns the Amazon AWS signature for the given parameters.
*
* @param method e.g., "GET"
* @param content_type e.g., "application/x-directory"
* @param date e.g., get_date()
* @param resource e.g., "/pub"
*/
string
calc_signature(string method, string content_type, string date, curl_slist* headers, string resource) {
string Signature;
string StringToSign;
StringToSign += method + "\n";
StringToSign += "\n"; // md5
StringToSign += content_type + "\n";
StringToSign += date + "\n";
int count = 0;
if (headers != 0) {
do {
//###cout << headers->data << endl;
if (strncmp(headers->data, "x-amz", 5) == 0) {
++count;
StringToSign += headers->data;
StringToSign += 10; // linefeed
}
} while ((headers = headers->next) != 0);
}
StringToSign += resource;
const void* key = AWSSecretAccessKey.data();
int key_len = AWSSecretAccessKey.size();
const unsigned char* d = reinterpret_cast<const unsigned char*>(StringToSign.data());
int n = StringToSign.size();
unsigned int md_len;
unsigned char md[EVP_MAX_MD_SIZE];
HMAC(evp_md, key, key_len, d, n, md, &md_len);
BIO* b64 = BIO_new(BIO_f_base64());
BIO* bmem = BIO_new(BIO_s_mem());
b64 = BIO_push(b64, bmem);
BIO_write(b64, md, md_len);
BIO_flush(b64);
BUF_MEM *bptr;
BIO_get_mem_ptr(b64, &bptr);
Signature.resize(bptr->length - 1);
memcpy(&Signature[0], bptr->data, bptr->length-1);
BIO_free_all(b64);
return Signature;
}
// libcurl read callback: userPtr is the std::string still to be sent. Copies
// as much of it as fits into data, removes that part from the string and
// returns the number of bytes copied (0 once the string is empty).
static size_t
readCallback(void *data, size_t blockSize, size_t numBlocks, void *userPtr) {
  string& pending = *static_cast<string *>(userPtr);
  const size_t capacity = blockSize * numBlocks;
  const size_t chunk = pending.size() < capacity ? pending.size() : capacity;
  memcpy(data, pending.data(), chunk);
  pending.erase(0, chunk);
  return chunk;
}
// libcurl write callback: userPtr is the std::string collecting the response
// body. Appends the received bytes and reports all of them as consumed.
static size_t
writeCallback(void* data, size_t blockSize, size_t numBlocks, void* userPtr) {
  const size_t received = blockSize * numBlocks;
  string& sink = *static_cast<string*>(userPtr);
  sink.append(reinterpret_cast<const char*>(data), received);
  return received;
}
static size_t header_callback(void *data,
size_t blockSize,
size_t numBlocks,
void *userPtr) {
headers_t* headers = reinterpret_cast<headers_t*>(userPtr);
string header(reinterpret_cast<char *>(data), blockSize*numBlocks);
string key;
stringstream ss(header);
if (getline(ss, key, ':')) {
string value;
getline(ss, value);
(*headers)[key] = trim(value);
}
return blockSize*numBlocks;
}
// safe variant of dirname
// dirname() may modify the buffer it is given, so it only ever sees this
// function's private by-value copy of the path.
static string
mydirname(string path) {
  char* scratch = &path[0];
  const char* parent = dirname(scratch);
  return string(parent);
}
// safe variant of basename
// basename() may modify the buffer it is given, so it only ever sees this
// function's private by-value copy of the path.
static string
mybasename(string path) {
  char* scratch = &path[0];
  const char* leaf = basename(scratch);
  return string(leaf);
}
// mkdir --parents
// Walks the '/'-separated components of path and calls mkdir() on each
// successively longer prefix. Every prefix is built with a leading "/".
// mkdir() failures (e.g. the directory already exists) are deliberately
// ignored, so the function always returns 0.
static int
mkdirp(const string& path, mode_t mode) {
  string prefix;
  istringstream parts(path);
  for (string part; getline(parts, part, '/'); ) {
    prefix += "/" + part;
    (void) mkdir(prefix.c_str(), mode);
  }
  return 0;
}
#include <openssl/md5.h>
/**
* @return fuse return code
* TODO return pair<int, headers_t>?!?
*/
int
get_headers(const char* path, headers_t& meta) {
string resource(urlEncode("/"+bucket + path));
string url(host + resource);
auto_curl curl;
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_FAILONERROR, true);
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, true);
curl_easy_setopt(curl, CURLOPT_NOBODY, true); // HEAD
curl_easy_setopt(curl, CURLOPT_FILETIME, true); // Last-Modified
headers_t responseHeaders;
curl_easy_setopt(curl, CURLOPT_HEADERDATA, &responseHeaders);
curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, header_callback);
auto_curl_slist headers;
string date = get_date();
headers.append("Date: "+date);
headers.append("Authorization: AWS "+AWSAccessKeyId+":"+calc_signature("HEAD", "", date, headers.get(), resource));
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers.get());
VERIFY(my_curl_easy_perform((curl.get())));
// at this point we know the file exists in s3
for (headers_t::iterator iter = responseHeaders.begin(); iter != responseHeaders.end(); ++iter) {
string key = (*iter).first;
string value = (*iter).second;
if (key == "Content-Type")
meta[key] = value;
if (key == "ETag")
meta[key] = value;
if (key.substr(0, 5) == "x-amz")
meta[key] = value;
}
return 0;
}
/**
 * get_local_fd
 *
 * Returns a read/write descriptor for a local copy of the object at `path`.
 *
 * With the local cache enabled (use_cache non-empty) the cached file is reused
 * when its MD5 equals the object's ETag; otherwise the object is downloaded
 * into the cache file. With the cache disabled the object is downloaded into
 * an anonymous temporary file.
 *
 * Compared with the earlier implementation, every error path now closes the
 * descriptor it opened, a failed tmpfile() is reported instead of being
 * dereferenced, and the stdio stream handed to libcurl wraps a duplicate of
 * the descriptor so it can be fclose()d (no FILE leak) without closing the
 * descriptor that is returned.
 *
 * @param path fuse path of the object (starts with "/")
 * @return an open descriptor (owned by the caller) or a negative fuse error code
 */
int
get_local_fd(const char* path) {
  const string resource(urlEncode("/"+bucket + path));
  const string url(host + resource);
  const string resolved_path(use_cache + "/" + bucket);
  const string cache_path(resolved_path + path);
  const bool caching = use_cache.size() > 0;
  headers_t responseHeaders;
  int fd = -1;

  if (caching) {
    VERIFY(get_headers(path, responseHeaders));
    fd = open(cache_path.c_str(), O_RDWR); // ### TODO should really somehow obey flags here
    if (fd != -1) {
      // md5 of the cached copy
      MD5_CTX c;
      bool md5_ok = MD5_Init(&c) == 1;
      char buf[1024];
      ssize_t count;
      while (md5_ok && (count = read(fd, buf, sizeof(buf))) > 0)
        md5_ok = MD5_Update(&c, buf, count) == 1;
      unsigned char md[MD5_DIGEST_LENGTH];
      if (md5_ok)
        md5_ok = MD5_Final(md, &c) == 1;
      if (!md5_ok) {
        close(fd);
        Yikes(-EIO);
      }
      char localMd5[2*MD5_DIGEST_LENGTH+1];
      for (int i = 0; i < MD5_DIGEST_LENGTH; ++i)
        sprintf(localMd5 + 2*i, "%02x", md[i]);
      const string remoteMd5(trim(responseHeaders["ETag"], "\""));
      // md5 match?
      if (string(localMd5) != remoteMd5) {
        // no! prepare to download
        if (close(fd) == -1) {
          const int err = -errno;
          Yikes(err);
        }
        fd = -1;
      }
    }
  }

  // cached copy is current: nothing to download
  if (fd != -1)
    return fd;

  if (caching) {
    // best effort: a real failure shows up in the open() below
    mkdirp(resolved_path + mydirname(path), 0777);
    const mode_t mode = strtoul(responseHeaders["x-amz-meta-mode"].c_str(), (char **)NULL, 10);
    fd = open(cache_path.c_str(), O_CREAT|O_RDWR|O_TRUNC, mode);
    if (fd == -1) {
      const int err = -errno;
      Yikes(err);
    }
  } else {
    FILE* tmp = tmpfile();
    if (tmp == 0) {
      const int err = -errno;
      Yikes(err);
    }
    // keep a private duplicate so the FILE object can be released right away
    fd = dup(fileno(tmp));
    const int err = -errno;
    fclose(tmp);
    if (fd == -1)
      Yikes(err);
  }

  // libcurl writes through a stdio stream; wrap a duplicate so that fclose()
  // below releases the stream without closing the descriptor we return
  const int stream_fd = dup(fd);
  if (stream_fd == -1) {
    const int err = -errno;
    close(fd);
    Yikes(err);
  }
  FILE* f = fdopen(stream_fd, "w+");
  if (f == 0) {
    const int err = -errno;
    close(stream_fd);
    close(fd);
    Yikes(err);
  }

  auto_curl curl;
  curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
  curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1L);
  curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
  curl_easy_setopt(curl, CURLOPT_FILE, f);

  auto_curl_slist headers;
  const string date = get_date();
  headers.append("Date: "+date);
  headers.append("Authorization: AWS "+AWSAccessKeyId+":"+calc_signature("GET", "", date, headers.get(), resource));
  curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers.get());

  cout << "downloading[path=" << path << "][fd=" << fd << "]" << endl;

  int result = my_curl_easy_perform(curl.get(), f);
  // fclose() flushes what libcurl wrote; a failure here means the local copy
  // is incomplete (e.g. disk full)
  if (fclose(f) != 0 && result == 0)
    result = (errno != 0) ? -errno : -EIO;
  if (result != 0) {
    close(fd);
    return result;
  }
  fsync(fd);
  return fd;
}
/**
* create or update s3 object
* @return fuse return code
*/
static int
put_local_fd(const char* path, headers_t meta, int fd) {
string resource = urlEncode("/"+bucket + path);
string url = host + resource;
struct stat st;
if (fstat(fd, &st) == -1)
Yikes(-errno);
auto_curl curl;
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_FAILONERROR, true);
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, true);
string responseText;
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &responseText);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeCallback);
curl_easy_setopt(curl, CURLOPT_UPLOAD, true); // HTTP PUT
curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(st.st_size)); // Content-Length
FILE* f = fdopen(fd, "rb");
if (f == 0)
Yikes(-errno);
curl_easy_setopt(curl, CURLOPT_INFILE, f);
string ContentType = meta["Content-Type"];
auto_curl_slist headers;
string date = get_date();
headers.append("Date: "+date);
meta["x-amz-acl"] = default_acl;
for (headers_t::iterator iter = meta.begin(); iter != meta.end(); ++iter) {
string key = (*iter).first;
string value = (*iter).second;
if (key == "Content-Type")
headers.append(key+":"+value);
if (key.substr(0,9) == "x-amz-acl")
headers.append(key+":"+value);
if (key.substr(0,10) == "x-amz-meta")
headers.append(key+":"+value);
}
headers.append("Authorization: AWS "+AWSAccessKeyId+":"+calc_signature("PUT", ContentType, date, headers.get(), resource));
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers.get());
//###rewind(f);
syslog(LOG_INFO, "upload path=%s", path);
cout << "uploading[path=" << path << "][fd=" << fd << "][size="<<st.st_size <<"]" << endl;
VERIFY(my_curl_easy_perform(curl.get(), f));
return 0;
}
// FUSE getattr: fills *stbuf for `path`.
// - "/" is answered locally (directory with root_mode permissions).
// - An entry left in stat_cache by s3fs_readdir() is used once and removed.
// - Otherwise a signed HEAD request is issued; mode and mtime come from the
//   x-amz-meta-mode / x-amz-meta-mtime headers (mtime falls back to the
//   Last-Modified time), the file type from Content-Type and the size from
//   Content-Length.
// Boolean curl options are passed as long (1L) as curl_easy_setopt() requires.
// Returns a fuse return code.
static int
s3fs_getattr(const char *path, struct stat *stbuf) {
  cout << "getattr[path=" << path << "]" << endl;
  memset(stbuf, 0, sizeof(struct stat));
  if (strcmp(path, "/") == 0) {
    stbuf->st_nlink = 1; // see fuse faq
    stbuf->st_mode = root_mode | S_IFDIR;
    return 0;
  }

  {
    auto_lock lock(stat_cache_lock);
    stat_cache_t::iterator iter = stat_cache.find(path);
    if (iter != stat_cache.end()) {
      *stbuf = (*iter).second;
      stat_cache.erase(iter);
      return 0;
    }
  }

  const string resource = urlEncode("/"+bucket + path);
  const string url = host + resource;

  auto_curl curl;
  curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
  curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1L);
  curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
  curl_easy_setopt(curl, CURLOPT_NOBODY, 1L); // HEAD
  curl_easy_setopt(curl, CURLOPT_FILETIME, 1L); // Last-Modified

  headers_t responseHeaders;
  curl_easy_setopt(curl, CURLOPT_HEADERDATA, &responseHeaders);
  curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, header_callback);

  auto_curl_slist headers;
  const string date = get_date();
  headers.append("Date: "+date);
  headers.append("Authorization: AWS "+AWSAccessKeyId+":"+calc_signature("HEAD", "", date, headers.get(), resource));
  curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers.get());

  VERIFY(my_curl_easy_perform(curl.get()));

  stbuf->st_nlink = 1; // see fuse faq

  stbuf->st_mtime = strtoul(responseHeaders["x-amz-meta-mtime"].c_str(), (char **)NULL, 10);
  if (stbuf->st_mtime == 0) {
    long LastModified;
    // libcurl reports -1 when the remote time is unknown; keep 0 in that case
    if (curl_easy_getinfo(curl, CURLINFO_FILETIME, &LastModified) == 0 && LastModified != -1)
      stbuf->st_mtime = LastModified;
  }

  stbuf->st_mode = strtoul(responseHeaders["x-amz-meta-mode"].c_str(), (char **)NULL, 10);
  char* ContentType = 0;
  if (curl_easy_getinfo(curl, CURLINFO_CONTENT_TYPE, &ContentType) == 0) {
    if (ContentType)
      stbuf->st_mode |= strcmp(ContentType, "application/x-directory") == 0 ? S_IFDIR : S_IFREG;
  }

  double ContentLength;
  if (curl_easy_getinfo(curl, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &ContentLength) == 0)
    stbuf->st_size = static_cast<off_t>(ContentLength);

  if (S_ISREG(stbuf->st_mode))
    stbuf->st_blocks = stbuf->st_size / 512 + 1;

  stbuf->st_uid = getuid();
  stbuf->st_gid = getgid();
  return 0;
}
// FUSE readlink: symbolic links are not supported, so the request is traced
// and always answered with -ENOENT; buf and size are not used.
static int
s3fs_readlink(const char *path, char *buf, size_t size) {
  cout << "###readlink[path=" << path;
  cout << "]" << endl;
  (void) buf;
  (void) size;
  return -ENOENT;
}
// FUSE mknod: creates an empty object for `path` with a signed zero-length PUT.
// The requested mode and the current time are stored as x-amz-meta-mode and
// x-amz-meta-mtime; rdev is not used.
// curl options that take a long are passed as long (1L / 0L):
// curl_easy_setopt() is variadic, so a plain int or bool argument does not
// match what libcurl reads.
// Returns a fuse return code.
static int
s3fs_mknod(const char *path, mode_t mode, dev_t rdev) {
  // see man 2 mknod
  // If pathname already exists, or is a symbolic link, this call fails with an EEXIST error.
  cout << "mknod[path="<< path << "][mode=" << mode << "]" << endl;

  const string resource = urlEncode("/"+bucket + path);
  const string url = host + resource;

  auto_curl curl;
  curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
  curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1L);
  curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
  curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L); // HTTP PUT
  curl_easy_setopt(curl, CURLOPT_INFILESIZE, 0L); // Content-Length: 0

  auto_curl_slist headers;
  const string date = get_date();
  headers.append("Date: "+date);
  headers.append("Content-Type: application/octet-stream");
  // x-amz headers: (a) alphabetical order and (b) no spaces after colon
  headers.append("x-amz-acl:"+default_acl);
  headers.append("x-amz-meta-mode:"+stringificationizer(mode));
  headers.append("x-amz-meta-mtime:"+stringificationizer(time(NULL)));
  headers.append("Authorization: AWS "+AWSAccessKeyId+":"+calc_signature("PUT", "application/octet-stream", date, headers.get(), resource));
  curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers.get());

  VERIFY(my_curl_easy_perform(curl.get()));
  return 0;
}
// FUSE mkdir: creates a zero-length object with Content-Type
// "application/x-directory" for `path` using a signed PUT. The requested mode
// and the current time are stored as x-amz-meta-mode and x-amz-meta-mtime.
// curl options that take a long are passed as long (1L / 0L):
// curl_easy_setopt() is variadic, so a plain int or bool argument does not
// match what libcurl reads.
// Returns a fuse return code.
static int
s3fs_mkdir(const char *path, mode_t mode) {
  cout << "mkdir[path=" << path << "][mode=" << mode << "]" << endl;

  const string resource = urlEncode("/"+bucket + path);
  const string url = host + resource;

  auto_curl curl;
  curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
  curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1L);
  curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
  curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L); // HTTP PUT
  curl_easy_setopt(curl, CURLOPT_INFILESIZE, 0L); // Content-Length: 0

  auto_curl_slist headers;
  const string date = get_date();
  headers.append("Date: "+date);
  headers.append("Content-Type: application/x-directory");
  // x-amz headers: (a) alphabetical order and (b) no spaces after colon
  headers.append("x-amz-acl:"+default_acl);
  headers.append("x-amz-meta-mode:"+stringificationizer(mode));
  headers.append("x-amz-meta-mtime:"+stringificationizer(time(NULL)));
  headers.append("Authorization: AWS "+AWSAccessKeyId+":"+calc_signature("PUT", "application/x-directory", date, headers.get(), resource));
  curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers.get());

  VERIFY(my_curl_easy_perform(curl.get()));
  return 0;
}
// aka rm
// FUSE unlink: removes the object for `path` with a signed DELETE request.
// Boolean curl options are passed as long (1L) as curl_easy_setopt() requires.
// Returns a fuse return code.
static int
s3fs_unlink(const char *path) {
  cout << "unlink[path=" << path << "]" << endl;

  const string resource = urlEncode("/"+bucket + path);
  const string url = host + resource;

  auto_curl curl;
  curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
  curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1L);
  curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
  curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");

  auto_curl_slist headers;
  const string date = get_date();
  headers.append("Date: "+date);
  headers.append("Authorization: AWS "+AWSAccessKeyId+":"+calc_signature("DELETE", "", date, headers.get(), resource));
  curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers.get());

  VERIFY(my_curl_easy_perform(curl.get()));
  return 0;
}
// FUSE rmdir: removes the directory object for `path` with a signed DELETE
// request. Nothing here checks whether the directory still has entries.
// The trace line now says "rmdir" (it was a copy of the unlink message), and
// boolean curl options are passed as long (1L) as curl_easy_setopt() requires.
// Returns a fuse return code.
static int
s3fs_rmdir(const char *path) {
  cout << "rmdir[path=" << path << "]" << endl;

  const string resource = urlEncode("/"+bucket + path);
  const string url = host + resource;

  auto_curl curl;
  curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
  curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1L);
  curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
  curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");

  auto_curl_slist headers;
  const string date = get_date();
  headers.append("Date: "+date);
  headers.append("Authorization: AWS "+AWSAccessKeyId+":"+calc_signature("DELETE", "", date, headers.get(), resource));
  curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers.get());

  VERIFY(my_curl_easy_perform(curl.get()));
  return 0;
}
// FUSE symlink: not supported; the request is traced and refused with -EPERM.
static int
s3fs_symlink(const char *from, const char *to) {
  cout << "symlink[from=" << from << "]";
  cout << "[to=" << to << "]" << endl;
  return -EPERM;
}
static int
s3fs_rename(const char *from, const char *to) {
int result = 0;
cout << "rename[from=" << from << "][to=" << to << "]" << endl;
auto_fd fd(get_local_fd(from));
// preserve meta headers across rename
headers_t meta;
VERIFY(get_headers(from, meta));
result = put_local_fd(to, meta, fd.get());
if (result != 0)
return result;
return s3fs_unlink(from);
}
// FUSE link: hard links are not supported; the request is traced and refused
// with -EPERM.
static int
s3fs_link(const char *from, const char *to) {
  cout << "link[from=" << from << "]";
  cout << "[to=" << to << "]" << endl;
  return -EPERM;
}
static int
s3fs_chmod(const char *path, mode_t mode) {
cout << "chmod[path=" << path << "][mode=" << mode << "]" << endl;
auto_fd fd(get_local_fd(path));
headers_t meta;
VERIFY(get_headers(path, meta));
meta["x-amz-meta-mode"] = stringificationizer(mode);
return put_local_fd(path, meta, fd.get());
}
// FUSE chown: ownership is not stored anywhere, so the request is traced and
// reported as successful; uid and gid are not used.
static int
s3fs_chown(const char *path, uid_t uid, gid_t gid) {
  cout << "###chown[path=" << path;
  cout << "]" << endl;
  (void) uid;
  (void) gid;
  // hook this into s3's acl?!?
  return 0;
}
static int
s3fs_truncate(const char *path, off_t size) {
//###TODO honor size?!?
cout << "truncate[path=" << path << "][size=" << size << "]" << endl;
// preserve headers across truncate
headers_t meta;
VERIFY(get_headers(path, meta));
auto_fd fd(fileno(tmpfile()));
//###verify fd here?!?
VERIFY(put_local_fd(path, meta, fd.get()));
return 0;
}
// fd -> flags
// Remembers the open(2) flags of each descriptor handed out by s3fs_open(),
// because s3fs_flush() cannot read them from fuse_file_info (see get_flags()).
typedef map<int, int> s3fs_descriptors_t;
static s3fs_descriptors_t s3fs_descriptors;
// Guards s3fs_descriptors.
static pthread_mutex_t s3fs_descriptors_lock;
// FUSE open: obtains a local copy of the object, stores its descriptor in
// fi->fh and remembers the open flags for s3fs_flush().
// A failure of get_local_fd() is now returned to the caller; previously the
// negative error code was stored in fi->fh and success was reported.
// Returns a fuse return code.
static int
s3fs_open(const char *path, struct fuse_file_info *fi) {
  cout << "open[path=" << path << "][flags=" << fi->flags << "]" << endl;

  const int fd = get_local_fd(path);
  if (fd < 0)
    return fd;
  fi->fh = fd;

  // remember flags...
  auto_lock lock(s3fs_descriptors_lock);
  s3fs_descriptors[fd] = fi->flags;
  return 0;
}
// FUSE read: reads up to `size` bytes at `offset` from the local descriptor
// stored in fi->fh. Returns the number of bytes read, or -errno on failure.
static int
s3fs_read(const char *path, char *buf, size_t size, off_t offset, struct fuse_file_info *fi) {
  //###cout << "read: " << path << endl;
  const int bytes_read = pread(fi->fh, buf, size, offset);
  if (bytes_read == -1)
    Yikes(-errno);
  return bytes_read;
}
// FUSE write: writes `size` bytes at `offset` to the local descriptor stored
// in fi->fh. Returns the number of bytes written, or -errno on failure.
static int
s3fs_write(const char *path, const char *buf, size_t size, off_t offset, struct fuse_file_info *fi) {
  //###cout << "write: " << path << endl;
  const int bytes_written = pwrite(fi->fh, buf, size, offset);
  if (bytes_written == -1)
    Yikes(-errno);
  return bytes_written;
}
// FUSE statfs: reports a fixed size of 0x1000000 blocks of 0x1000000 bytes
// each (256T), all of them free and available. Only f_bsize, f_blocks,
// f_bfree and f_bavail are written.
static int
s3fs_statfs(const char *path, struct statvfs *stbuf) {
  // 256T
  const unsigned long unit = 0x1000000;
  stbuf->f_bsize = unit;
  stbuf->f_blocks = unit;
  stbuf->f_bfree = unit;
  stbuf->f_bavail = unit;
  return 0;
}
// Looks up the open flags remembered for fd, under the descriptor-table lock.
// An fd with no entry yields 0 (operator[] creates the entry).
static int
get_flags(int fd) {
  auto_lock guard(s3fs_descriptors_lock);
  const int remembered = s3fs_descriptors[fd];
  return remembered;
}
// FUSE flush: when the descriptor was opened with O_RDWR or O_WRONLY, uploads
// the local file back to the object with x-amz-meta-mtime set to the current
// time and its other metadata headers kept. Descriptors opened read-only are
// left alone. Returns a fuse return code.
static int
s3fs_flush(const char *path, struct fuse_file_info *fi) {
  const int fd = fi->fh;
  cout << "flush[path=" << path << "][fd=" << fd << "]" << endl;
  // NOTE- fi->flags is not available here
  const int flags = get_flags(fd);
  const bool opened_for_writing = (flags & O_RDWR) || (flags & O_WRONLY);
  if (!opened_for_writing)
    return 0;
  headers_t meta;
  VERIFY(get_headers(path, meta));
  meta["x-amz-meta-mtime"]=stringificationizer(time(NULL));
  return put_local_fd(path, meta, fd);
}
static int
s3fs_release(const char *path, struct fuse_file_info *fi) {
int fd = fi->fh;
cout << "release[path=" << path << "][fd=" << fd << "]" << endl;
if (close(fd) == -1)
Yikes(-errno);
return 0;
}
// Converts a broken-down UTC time to time_t by running mktime() with the TZ
// environment variable temporarily set to the empty string, then restoring
// the previous TZ setting.
// The previous TZ value is copied into a string before the environment is
// changed: the pointer returned by getenv() is not guaranteed to stay valid
// once setenv() has modified the environment.
// NOTE: changes process-wide state (environment, tzset), so concurrent calls
// from several threads can interfere with each other.
time_t
my_timegm (struct tm *tm) {
  const char* current = getenv("TZ");
  const bool had_tz = current != NULL;
  const string saved_tz(had_tz ? current : "");

  setenv("TZ", "", 1);
  tzset();
  const time_t ret = mktime(tm);

  if (had_tz)
    setenv("TZ", saved_tz.c_str(), 1);
  else
    unsetenv("TZ");
  tzset();
  return ret;
}
// All this "stuff" stuff is kinda ugly... it works though... needs cleanup
// Per-request bookkeeping for the parallel HEAD requests issued by s3fs_readdir().
struct stuff_t {
// default ctor works
// fuse path of the entry ("/" + key); used as the stat_cache key
string path;
// heap copy of the request URL, kept alive for the duration of the transfer;
// freed by cleanup_stuff
string* url;
// request header list passed to libcurl; freed by cleanup_stuff
struct curl_slist* requestHeaders;
// receives the parsed response headers; freed by cleanup_stuff
headers_t* responseHeaders;
};
// easy handle -> bookkeeping for the request running on that handle
typedef map<CURL*, stuff_t> stuffMap_t;
struct cleanup_stuff {
void operator()(pair<CURL*, stuff_t> qqq) {
stuff_t stuff = qqq.second;
delete stuff.url;
curl_slist_free_all(stuff.requestHeaders);
delete stuff.responseHeaders;
}
};
class auto_stuff {
stuffMap_t stuffMap;
public:
auto_stuff() {
}
~auto_stuff() {
for_each(stuffMap.begin(), stuffMap.end(), cleanup_stuff());
}
stuffMap_t& get() {
return stuffMap;
}
};
// FUSE readdir: lists the entries directly below `path`.
//
// The bucket is listed page by page (delimiter "/", prefix = directory, 20
// keys per page). For every key of a page the entry name is handed to
// `filler`, and a signed HEAD request is queued on a curl multi handle; once
// all of a page's requests have finished, the resulting attributes are stored
// in stat_cache so that the s3fs_getattr() calls which typically follow can be
// answered without another round trip. `offset` and `fi` are not used.
//
// Changes from the earlier implementation:
// - empty <IsTruncated/> or <NextMarker/> elements are skipped instead of
//   dereferencing a null child node;
// - a page that cannot be parsed, lacks IsTruncated, or does not advance the
//   marker ends the listing instead of requesting the same page forever;
// - curl_multi_* failures are reported as -EIO instead of returning a positive
//   CURLMcode as the fuse status;
// - boolean curl options are passed as long (1L);
// - the unused LastModified/Size values of each listing entry are no longer
//   collected.
//
// Returns a fuse return code.
static int
s3fs_readdir(const char *path, void *buf, fuse_fill_dir_t filler, off_t offset, struct fuse_file_info *fi) {
  //cout << "readdir:"<< " path="<< path << endl;

  string NextMarker;
  string IsTruncated("true");

  while (IsTruncated == "true") {
    string responseText;
    string resource = urlEncode("/"+bucket); // this is what gets signed
    string query = "delimiter=/&prefix=";

    if (strcmp(path, "/") != 0)
      query += urlEncode(string(path).substr(1) + "/");

    if (NextMarker.size() > 0)
      query += "&marker=" + urlEncode(NextMarker);

    query += "&max-keys=20";

    string url = host + resource + "?"+ query;

    {
      auto_curl curl;
      curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
      curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1L);
      curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
      curl_easy_setopt(curl, CURLOPT_WRITEDATA, &responseText);
      curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeCallback);

      auto_curl_slist headers;
      string date = get_date();
      headers.append("Date: "+date);
      headers.append("Authorization: AWS "+AWSAccessKeyId+":"+calc_signature("GET", "", date, headers.get(), resource));
      curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers.get());

      VERIFY(my_curl_easy_perform(curl.get()));
    }

    // Unless this page explicitly says it is truncated, it is the last one.
    const string previousMarker = NextMarker;
    IsTruncated = "false";

    // NOTE: declaration order matters. multi_handle is destroyed first
    // (detaching the easy handles), then curlMap frees the per-request data.
    auto_stuff curlMap;
    auto_curl_multi multi_handle;

    {
      xmlDocPtr doc = xmlReadMemory(responseText.c_str(), responseText.size(), "", NULL, 0);
      if (doc != NULL && doc->children != NULL) {
        for (xmlNodePtr cur_node = doc->children->children; cur_node != NULL; cur_node = cur_node->next) {
          string cur_node_name(reinterpret_cast<const char *>(cur_node->name));
          const bool has_text = cur_node->children != NULL && cur_node->children->content != NULL;
          if (cur_node_name == "IsTruncated" && has_text)
            IsTruncated = reinterpret_cast<const char *>(cur_node->children->content);
          if (cur_node_name == "NextMarker" && has_text)
            NextMarker = reinterpret_cast<const char *>(cur_node->children->content);
          if (cur_node_name == "Contents") {
            if (cur_node->children != NULL) {
              string Key;
              for (xmlNodePtr sub_node = cur_node->children; sub_node != NULL; sub_node = sub_node->next) {
                if (sub_node->type != XML_ELEMENT_NODE)
                  continue;
                if (sub_node->children == NULL || sub_node->children->type != XML_TEXT_NODE)
                  continue;
                string elementName = reinterpret_cast<const char*>(sub_node->name);
                if (elementName == "Key")
                  Key = reinterpret_cast<const char *>(sub_node->children->content);
              }
              if (Key.size() > 0) {
                // filler returns non-zero when its buffer is full
                if (filler(buf, mybasename(Key).c_str(), 0, 0))
                  break;

                CURL* curl_handle = alloc_curl_handle();

                string resource = urlEncode("/"+bucket + "/" + Key);
                string url = host + resource;

                stuff_t stuff;
                stuff.path = "/"+Key;

                // libcurl 7.17 does deep copy of url... e.g., fc7 has libcurl 7.16... therefore, must deep copy "stable" url...
                stuff.url = new string(url);
                stuff.requestHeaders = 0;
                stuff.responseHeaders = new headers_t;

                curl_easy_setopt(curl_handle, CURLOPT_URL, stuff.url->c_str());
                curl_easy_setopt(curl_handle, CURLOPT_FAILONERROR, 1L);
                curl_easy_setopt(curl_handle, CURLOPT_FOLLOWLOCATION, 1L);
                curl_easy_setopt(curl_handle, CURLOPT_NOBODY, 1L); // HEAD
                curl_easy_setopt(curl_handle, CURLOPT_FILETIME, 1L); // Last-Modified

                // requestHeaders
                string date = get_date();
                stuff.requestHeaders = curl_slist_append(stuff.requestHeaders, string("Date: "+date).c_str());
                stuff.requestHeaders = curl_slist_append(stuff.requestHeaders, string("Authorization: AWS "+AWSAccessKeyId+":"+calc_signature("HEAD", "", date, stuff.requestHeaders, resource)).c_str());
                curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, stuff.requestHeaders);

                // responseHeaders
                curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, stuff.responseHeaders);
                curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, header_callback);

                curlMap.get()[curl_handle] = stuff;
                multi_handle.add_curl(curl_handle);
              }
            }
          }
        }
      }
      xmlFreeDoc(doc);
    }

    int running_handles = 0;

    while (curl_multi_perform(multi_handle.get(), &running_handles) == CURLM_CALL_MULTI_PERFORM)
      ;

    while (running_handles) {
      fd_set read_fd_set;
      fd_set write_fd_set;
      fd_set exc_fd_set;

      FD_ZERO(&read_fd_set);
      FD_ZERO(&write_fd_set);
      FD_ZERO(&exc_fd_set);

      long milliseconds;
      if (curl_multi_timeout(multi_handle.get(), &milliseconds) != CURLM_OK)
        Yikes(-EIO);

      if (milliseconds > 0) {
        struct timeval timeout;
        timeout.tv_sec = milliseconds / 1000;
        timeout.tv_usec = (milliseconds % 1000) * 1000;

        int max_fd;
        if (curl_multi_fdset(multi_handle.get(), &read_fd_set, &write_fd_set, &exc_fd_set, &max_fd) != CURLM_OK)
          Yikes(-EIO);

        if (select(max_fd + 1, &read_fd_set, &write_fd_set, &exc_fd_set, &timeout) == -1) {
          const int err = -errno;
          Yikes(err);
        }
      }

      while (curl_multi_perform(multi_handle.get(), &running_handles) == CURLM_CALL_MULTI_PERFORM)
        ;
    }

    int remaining_msgs = 1;
    while (remaining_msgs) {
      // this next line pegs cpu for directories w/lotsa files
      CURLMsg* msg = curl_multi_info_read(multi_handle.get(), &remaining_msgs);
      if (msg == NULL)
        continue;

      CURLcode code = msg->data.result;
      if (code != 0) {
        syslog(LOG_ERR, "###%d %s", code, curl_easy_strerror(code));
        continue;
      }

      CURL* curl_handle = msg->easy_handle;
      const stuff_t& stuff = curlMap.get()[curl_handle];

      struct stat st;
      memset(&st, 0, sizeof(st));
      st.st_nlink = 1; // see fuse faq

      // mode
      st.st_mode = strtoul((*stuff.responseHeaders)["x-amz-meta-mode"].c_str(), (char **)NULL, 10);
      char* ContentType = 0;
      if (curl_easy_getinfo(curl_handle, CURLINFO_CONTENT_TYPE, &ContentType) == 0) {
        if (ContentType)
          st.st_mode |= strcmp(ContentType, "application/x-directory") == 0 ? S_IFDIR : S_IFREG;
      }

      // mtime
      st.st_mtime = strtoul((*stuff.responseHeaders)["x-amz-meta-mtime"].c_str(), (char **)NULL, 10);
      if (st.st_mtime == 0) {
        long LastModified;
        if (curl_easy_getinfo(curl_handle, CURLINFO_FILETIME, &LastModified) == 0)
          st.st_mtime = LastModified;
      }

      // size
      double ContentLength;
      if (curl_easy_getinfo(curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &ContentLength) == 0)
        st.st_size = static_cast<off_t>(ContentLength);

      // blocks
      if (S_ISREG(st.st_mode))
        st.st_blocks = st.st_size / 512 + 1;

      st.st_uid = getuid();
      st.st_gid = getgid();

      auto_lock lock(stat_cache_lock);
      stat_cache[stuff.path] = st;
    }

    // A truncated page whose marker did not move would be requested again and
    // again; stop instead.
    if (IsTruncated == "true" && NextMarker == previousMarker) {
      syslog(LOG_ERR, "###truncated listing without a new marker");
      break;
    }
  } // IsTruncated

  return 0;
}
// Array of mutexes used by locking_function() on behalf of OpenSSL; allocated
// with CRYPTO_num_locks() entries in s3fs_init() and freed in s3fs_destroy().
static pthread_mutex_t *mutex_buf = NULL;
/**
 * Lock callback handed to OpenSSL through CRYPTO_set_locking_callback().
 *
 * Acquires mutex_buf[n] when the CRYPTO_LOCK bit is set in mode and
 * releases it otherwise.
 *
 * @param mode bit flags; only CRYPTO_LOCK is examined
 * @param n index of the mutex inside mutex_buf
 * @param file source file name of the caller (unused)
 * @param line source file line number of the caller (unused)
 * @return none
 */
static void locking_function(int mode, int n, const char *file, int line)
{
  pthread_mutex_t *const target = &mutex_buf[n];
  if ((mode & CRYPTO_LOCK) == 0) {
    // not a lock request, so it is a release
    pthread_mutex_unlock(target);
    return;
  }
  pthread_mutex_lock(target);
}
/**
 * Thread id callback handed to OpenSSL.
 *
 * @return the calling thread's pthread_self() value converted to unsigned long
 */
static unsigned long id_function(void)
{
  const pthread_t self = pthread_self();
  return (unsigned long) self;
}
/**
 * FUSE init callback.
 *
 * Allocates and initializes the OpenSSL lock array, registers the OpenSSL
 * locking/id callbacks, initializes libcurl, and initializes the three
 * file-level mutexes (curl_handles_lock, s3fs_descriptors_lock,
 * stat_cache_lock).
 *
 * @param conn FUSE connection info (unused)
 * @return always NULL
 */
static void* s3fs_init(struct fuse_conn_info *conn) {
  syslog(LOG_INFO, "init");
  // openssl
  const int num_locks = CRYPTO_num_locks();
  mutex_buf = static_cast<pthread_mutex_t*>(malloc(static_cast<size_t>(num_locks) * sizeof(pthread_mutex_t)));
  if (mutex_buf == NULL && num_locks > 0) {
    // Without the lock array the callbacks registered below would
    // dereference NULL; fail loudly instead of crashing later.
    syslog(LOG_CRIT, "init: unable to allocate %d openssl locks", num_locks);
    exit(EXIT_FAILURE);
  }
  for (int i = 0; i < num_locks; i++)
    pthread_mutex_init(&mutex_buf[i], NULL);
  CRYPTO_set_locking_callback(locking_function);
  CRYPTO_set_id_callback(id_function);
  curl_global_init(CURL_GLOBAL_ALL);
  pthread_mutex_init(&curl_handles_lock, NULL);
  pthread_mutex_init(&s3fs_descriptors_lock, NULL);
  pthread_mutex_init(&stat_cache_lock, NULL);
  return NULL;
}
/**
 * FUSE destroy callback.
 *
 * Unregisters the OpenSSL callbacks, destroys and frees the OpenSSL lock
 * array, shuts down libcurl, and destroys the three file-level mutexes.
 */
static void s3fs_destroy(void*) {
  syslog(LOG_INFO, "destroy");
  // openssl: detach the callbacks before tearing down the locks they use
  CRYPTO_set_id_callback(NULL);
  CRYPTO_set_locking_callback(NULL);
  int idx = 0;
  while (idx < CRYPTO_num_locks()) {
    pthread_mutex_destroy(&mutex_buf[idx]);
    ++idx;
  }
  free(mutex_buf);
  mutex_buf = NULL;
  curl_global_cleanup();
  // file-level locks, destroyed in the same order they are listed here
  pthread_mutex_t *const file_locks[] = {
    &curl_handles_lock,
    &s3fs_descriptors_lock,
    &stat_cache_lock
  };
  for (size_t k = 0; k < sizeof(file_locks) / sizeof(file_locks[0]); ++k)
    pthread_mutex_destroy(file_locks[k]);
}
/**
 * FUSE access callback; every request is reported as permitted.
 *
 * @param path path being checked (unused)
 * @param mask requested access bits (unused)
 * @return always 0
 */
static int s3fs_access(const char *path, int mask) {
  (void) path;
  (void) mask;
  const int granted = 0;
  return granted;
}
// aka touch
static int
s3fs_utimens(const char *path, const struct timespec ts[2]) {
cout << "utimens[path=" << path << "][mtime=" << stringificationizer(ts[1].tv_sec) << "]" << endl;
auto_fd fd(get_local_fd(path));
headers_t meta;
VERIFY(get_headers(path, meta));
meta["x-amz-meta-mtime"] = stringificationizer(ts[1].tv_sec);
return put_local_fd(path, meta, fd.get());
}
/**
 * Returns the value part of a "name=value" option.
 *
 * @param arg option text as passed by fuse_opt_parse
 * @param prefix option name including the trailing '=' (e.g. "retries=")
 * @return pointer to the first character after prefix when arg starts with
 *         prefix, otherwise NULL
 */
static const char* s3fs_opt_value(const char *arg, const char *prefix) {
  const size_t len = strlen(prefix);
  return strncmp(arg, prefix, len) == 0 ? arg + len : NULL;
}

/**
 * Option callback for fuse_opt_parse.
 *
 * The first non-option argument becomes the bucket name. For any later
 * non-option argument (the mountpoint) its mode is recorded in root_mode
 * and the argument is left for FUSE. The options accessKeyId=,
 * secretAccessKey=, default_acl=, retries= and use_cache= are consumed here.
 *
 * Options are matched as prefixes of arg, so an unrelated option that
 * merely contains one of these names somewhere in its text is not
 * captured by mistake.
 *
 * @param data user data passed to fuse_opt_parse (unused)
 * @param arg the argument or option text
 * @param key FUSE_OPT_KEY_NONOPT or FUSE_OPT_KEY_OPT for the cases handled
 * @param outargs the output argument list (unused)
 * @return 0 when the argument was consumed here, 1 when FUSE should keep it
 */
static int
my_fuse_opt_proc(void *data, const char *arg, int key, struct fuse_args *outargs) {
  if (key == FUSE_OPT_KEY_NONOPT) {
    if (bucket.size() == 0) {
      bucket = arg;
      return 0;
    }
    struct stat buf;
    // its the mountpoint... what is its mode?
    if (stat(arg, &buf) != -1)
      root_mode = buf.st_mode;
  }
  if (key == FUSE_OPT_KEY_OPT) {
    const char *value = NULL;
    if ((value = s3fs_opt_value(arg, "accessKeyId=")) != NULL) {
      AWSAccessKeyId = value;
      return 0;
    }
    if ((value = s3fs_opt_value(arg, "secretAccessKey=")) != NULL) {
      AWSSecretAccessKey = value;
      return 0;
    }
    if ((value = s3fs_opt_value(arg, "default_acl=")) != NULL) {
      default_acl = value;
      return 0;
    }
    // ### TODO: prefix
    if ((value = s3fs_opt_value(arg, "retries=")) != NULL) {
      retries = atoi(value);
      return 0;
    }
    if ((value = s3fs_opt_value(arg, "use_cache=")) != NULL) {
      use_cache = value;
      return 0;
    }
  }
  return 1;
}
// Table of FUSE callbacks; filled in by main() before fuse_main() is called.
static struct fuse_operations s3fs_oper;

/**
 * Program entry point.
 *
 * Parses the command line through my_fuse_opt_proc, loads missing
 * credentials from /etc/passwd-s3fs ("accessKeyId:secretAccessKey" lines,
 * '#' starts a comment line), validates that bucket and credentials are
 * present, fills in the FUSE operation table and enters fuse_main.
 *
 * @param argc argument count
 * @param argv argument vector
 * @return the result of fuse_main; exits with status 1 on a missing bucket,
 *         missing credentials, or an option parsing failure
 */
int
main(int argc, char *argv[]) {
  // memset takes (dest, value, length); zero the whole table
  memset(&s3fs_oper, 0, sizeof(s3fs_oper));
  struct fuse_args custom_args = FUSE_ARGS_INIT(argc, argv);
  if (fuse_opt_parse(&custom_args, NULL, NULL, my_fuse_opt_proc) == -1)
    exit(1);
  if (bucket.size() == 0) {
    cout << argv[0] << ": " << "missing bucket" << endl;
    exit(1);
  }
  if (AWSSecretAccessKey.size() == 0) {
    string line;
    ifstream passwd("/etc/passwd-s3fs");
    while (getline(passwd, line)) {
      // skip comment lines; an empty line has no first character to inspect
      if (!line.empty() && line[0] == '#')
        continue;
      size_t pos = line.find(':');
      if (pos != string::npos) {
        // is accessKeyId missing?
        if (AWSAccessKeyId.size() == 0)
          AWSAccessKeyId = line.substr(0, pos);
        // is secretAccessKey missing?
        if (AWSSecretAccessKey.size() == 0) {
          // only take the secret from the line that belongs to our key id
          if (line.substr(0, pos) == AWSAccessKeyId)
            AWSSecretAccessKey = line.substr(pos + 1, string::npos);
        }
      }
    }
  }
  if (AWSAccessKeyId.size() == 0) {
    cout << argv[0] << ": " << "missing accessKeyId.. see /etc/passwd-s3fs or use, e.g., -o accessKeyId=aaa" << endl;
    exit(1);
  }
  if (AWSSecretAccessKey.size() == 0) {
    cout << argv[0] << ": " << "missing secretAccessKey... see /etc/passwd-s3fs or use, e.g., -o secretAccessKey=bbb" << endl;
    exit(1);
  }
  s3fs_oper.getattr = s3fs_getattr;
  s3fs_oper.readlink = s3fs_readlink;
  s3fs_oper.mknod = s3fs_mknod;
  s3fs_oper.mkdir = s3fs_mkdir;
  s3fs_oper.unlink = s3fs_unlink;
  s3fs_oper.rmdir = s3fs_rmdir;
  s3fs_oper.symlink = s3fs_symlink;
  s3fs_oper.rename = s3fs_rename;
  s3fs_oper.link = s3fs_link;
  s3fs_oper.chmod = s3fs_chmod;
  s3fs_oper.chown = s3fs_chown;
  s3fs_oper.truncate = s3fs_truncate;
  s3fs_oper.open = s3fs_open;
  s3fs_oper.read = s3fs_read;
  s3fs_oper.write = s3fs_write;
  s3fs_oper.statfs = s3fs_statfs;
  s3fs_oper.flush = s3fs_flush;
  s3fs_oper.release = s3fs_release;
  s3fs_oper.readdir = s3fs_readdir;
  s3fs_oper.init = s3fs_init;
  s3fs_oper.destroy = s3fs_destroy;
  s3fs_oper.access = s3fs_access;
  s3fs_oper.utimens = s3fs_utimens;
  return fuse_main(custom_args.argc, custom_args.argv, &s3fs_oper, NULL);
}