Support for modifying files > 5GB (fixes issue #215)

Modified rename_object and put_headers to handle objects larger than
5GB. Files larger than 5GB are required to use the multi interface.


git-svn-id: http://s3fs.googlecode.com/svn/trunk@363 df820570-a93a-0410-bd06-b72b767a4274
This commit is contained in:
ben.lemasurier@gmail.com 2011-08-29 22:01:32 +00:00
parent f623a395bc
commit c933b6a9b1
3 changed files with 254 additions and 8 deletions

View File

@ -311,7 +311,7 @@ size_t WriteMemoryCallback(void *ptr, size_t blockSize, size_t numBlocks, void *
memcpy(&(mem->text[mem->size]), ptr, realsize); memcpy(&(mem->text[mem->size]), ptr, realsize);
mem->size += realsize; mem->size += realsize;
mem->text[mem->size] = 0; mem->text[mem->size] = 0;
return realsize; return realsize;
} }

View File

@ -613,12 +613,18 @@ static int put_headers(const char *path, headers_t meta) {
char *s3_realpath; char *s3_realpath;
string url; string url;
string resource; string resource;
struct stat buf;
struct BodyStruct body; struct BodyStruct body;
CURL *curl = NULL; CURL *curl = NULL;
if(foreground) if(foreground)
cout << " put_headers[path=" << path << "]" << endl; cout << " put_headers[path=" << path << "]" << endl;
// files larger than 5GB must be modified via the multipart interface
s3fs_getattr(path, &buf);
if(buf.st_size >= 5368709120)
return(put_multipart_headers(path, meta));
s3_realpath = get_realpath(path); s3_realpath = get_realpath(path);
resource = urlEncode(service_path + bucket + s3_realpath); resource = urlEncode(service_path + bucket + s3_realpath);
url = host + resource; url = host + resource;
@ -697,6 +703,80 @@ static int put_headers(const char *path, headers_t meta) {
return 0; return 0;
} }
static int put_multipart_headers(const char *path, headers_t meta) {
int result;
char *s3_realpath;
string url;
string resource;
string upload_id;
struct stat buf;
struct BodyStruct body;
vector <file_part> parts;
if(foreground)
cout << " put_multipart_headers[path=" << path << "]" << endl;
s3_realpath = get_realpath(path);
resource = urlEncode(service_path + bucket + s3_realpath);
url = host + resource;
body.text = (char *)malloc(1);
body.size = 0;
s3fs_getattr(path, &buf);
upload_id = initiate_multipart_upload(path, buf.st_size, meta);
if(upload_id.size() == 0)
return(-EIO);
off_t chunk = 0;
off_t bytes_written = 0;
off_t bytes_remaining = buf.st_size;
while(bytes_remaining > 0) {
file_part part;
if(bytes_remaining > MAX_COPY_SOURCE_SIZE)
chunk = MAX_COPY_SOURCE_SIZE;
else
chunk = bytes_remaining - 1;
stringstream ss;
ss << "bytes=" << bytes_written << "-" << (bytes_written + chunk);
meta["x-amz-copy-source-range"] = ss.str();
part.etag = copy_part(path, path, parts.size() + 1, upload_id, meta);
parts.push_back(part);
bytes_written += (chunk + 1);
bytes_remaining = buf.st_size - bytes_written;
}
result = complete_multipart_upload(path, upload_id, parts);
if(result != 0) {
free(s3_realpath);
return -EIO;
}
// Update mtime in local file cache.
if(meta.count("x-amz-meta-mtime") > 0 && use_cache.size() > 0) {
struct stat st;
struct utimbuf n_mtime;
string cache_path(use_cache + "/" + bucket + path);
if((stat(cache_path.c_str(), &st)) == 0) {
n_mtime.modtime = strtoul(meta["x-amz-meta-mtime"].c_str(), (char **) NULL, 10);
n_mtime.actime = n_mtime.modtime;
if((utime(cache_path.c_str(), &n_mtime)) == -1) {
YIKES(-errno);
}
}
}
free(s3_realpath);
return 0;
}
static int put_local_fd_small_file(const char* path, headers_t meta, int fd) { static int put_local_fd_small_file(const char* path, headers_t meta, int fd) {
string resource; string resource;
string url; string url;
@ -1294,6 +1374,7 @@ string upload_part(const char *path, const char *source, int part_number, string
close(fd); close(fd);
if(!md5.empty() && strstr(header.text, md5.c_str())) { if(!md5.empty() && strstr(header.text, md5.c_str())) {
ETag.assign(md5); ETag.assign(md5);
} else { } else {
if(header.text) if(header.text)
free(header.text); free(header.text);
@ -1314,6 +1395,102 @@ string upload_part(const char *path, const char *source, int part_number, string
return ETag; return ETag;
} }
string copy_part(const char *from, const char *to, int part_number, string upload_id, headers_t meta) {
CURL *curl = NULL;
int result;
string url;
string my_url;
string auth;
string resource;
string raw_date;
string ETag;
char *s3_realpath;
struct BodyStruct body;
struct BodyStruct header;
// Now copy the file as the nth part
if(foreground)
printf("copy_part [from=%s] [to=%s]\n", from, to);
s3_realpath = get_realpath(to);
resource = urlEncode(service_path + bucket + s3_realpath);
resource.append("?partNumber=");
resource.append(IntToStr(part_number));
resource.append("&uploadId=");
resource.append(upload_id);
url = host + resource;
my_url = prepare_url(url.c_str());
body.text = (char *)malloc(1);
body.size = 0;
header.text = (char *)malloc(1);
header.size = 0;
auto_curl_slist headers;
string date = get_date();
headers.append("Date: " + date);
string ContentType = meta["Content-Type"];
meta["x-amz-acl"] = default_acl;
for(headers_t::iterator iter = meta.begin(); iter != meta.end(); ++iter) {
string key = (*iter).first;
string value = (*iter).second;
if (key == "Content-Type")
headers.append(key + ":" + value);
if (key == "x-amz-copy-source")
headers.append(key + ":" + value);
if (key == "x-amz-copy-source-range")
headers.append(key + ":" + value);
}
if(use_rrs.substr(0,1) == "1")
headers.append("x-amz-storage-class:REDUCED_REDUNDANCY");
if(public_bucket.substr(0,1) != "1")
headers.append("Authorization: AWS " + AWSAccessKeyId + ":" +
calc_signature("PUT", ContentType, date, headers.get(), resource));
curl = create_curl_handle();
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&body);
curl_easy_setopt(curl, CURLOPT_HEADERDATA, (void *)&header);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, WriteMemoryCallback);
curl_easy_setopt(curl, CURLOPT_UPLOAD, true); // HTTP PUT
curl_easy_setopt(curl, CURLOPT_INFILESIZE, 0); // Content-Length
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers.get());
curl_easy_setopt(curl, CURLOPT_URL, my_url.c_str());
result = my_curl_easy_perform(curl, &body);
destroy_curl_handle(curl);
if(result != 0) {
if(body.text)
free(body.text);
if(header.text)
free(header.text);
free(s3_realpath);
return "";
}
char *start_etag;
char *end_etag;
start_etag = strstr(body.text, "ETag");
end_etag = strstr(body.text, "/ETag>");
start_etag += 11;
ETag.assign(start_etag, (size_t)(end_etag - start_etag - 7));
// clean up
if(body.text)
free(body.text);
if(header.text)
free(header.text);
free(s3_realpath);
return ETag;
}
string md5sum(int fd) { string md5sum(int fd) {
MD5_CTX c; MD5_CTX c;
char buf[512]; char buf[512];
@ -1876,7 +2053,7 @@ static int rename_object(const char *from, const char *to) {
headers_t meta; headers_t meta;
if(foreground) if(foreground)
cout << "rename_object[from=" << from << "][to=" << to << "]" << endl; printf("rename_object [from=%s] [to=%s]\n", from , to);
if(debug) if(debug)
syslog(LOG_DEBUG, "rename_object [from=%s] [to=%s]", from, to); syslog(LOG_DEBUG, "rename_object [from=%s] [to=%s]", from, to);
@ -1900,6 +2077,62 @@ static int rename_object(const char *from, const char *to) {
return result; return result;
} }
static int rename_large_object(const char *from, const char *to) {
int result;
char *s3_realpath;
struct stat buf;
headers_t meta;
string upload_id;
vector <file_part> parts;
if(foreground)
printf("rename_large_object [from=%s] [to=%s]\n", from , to);
if(debug)
syslog(LOG_DEBUG, "rename_large_object [from=%s] [to=%s]", from, to);
s3fs_getattr(from, &buf);
s3_realpath = get_realpath(from);
if((get_headers(from, meta) != 0))
return -1;
meta["Content-Type"] = lookupMimeType(to);
meta["x-amz-copy-source"] = urlEncode("/" + bucket + s3_realpath);
upload_id = initiate_multipart_upload(to, buf.st_size, meta);
if(upload_id.size() == 0)
return(-EIO);
off_t chunk = 0;
off_t bytes_written = 0;
off_t bytes_remaining = buf.st_size;
while(bytes_remaining > 0) {
file_part part;
if(bytes_remaining > MAX_COPY_SOURCE_SIZE)
chunk = MAX_COPY_SOURCE_SIZE;
else
chunk = bytes_remaining - 1;
stringstream ss;
ss << "bytes=" << bytes_written << "-" << (bytes_written + chunk);
meta["x-amz-copy-source-range"] = ss.str();
part.etag = copy_part(from, to, parts.size() + 1, upload_id, meta);
parts.push_back(part);
bytes_written += (chunk + 1);
bytes_remaining = buf.st_size - bytes_written;
}
result = complete_multipart_upload(to, upload_id, parts);
if(result != 0)
return -EIO;
return s3fs_unlink(from);
}
static int clone_directory_object(const char *from, const char *to) { static int clone_directory_object(const char *from, const char *to) {
int result; int result;
mode_t mode; mode_t mode;
@ -2187,16 +2420,18 @@ static int s3fs_rename(const char *from, const char *to) {
int result; int result;
if(foreground) if(foreground)
cout << "rename[from=" << from << "][to=" << to << "]" << endl; printf("s3fs_rename [from=%s] [to=%s]\n", from, to);
if(debug) if(debug)
syslog(LOG_DEBUG, "rename [from=%s] [to=%s]", from, to); syslog(LOG_DEBUG, "s3fs_rename [from=%s] [to=%s]", from, to);
s3fs_getattr(from, &buf); s3fs_getattr(from, &buf);
// is a directory or a different type of file // files larger than 5GB must be modified via the multipart interface
if(S_ISDIR(buf.st_mode)) if(S_ISDIR(buf.st_mode))
result = rename_directory(from, to); result = rename_directory(from, to);
else if(buf.st_size >= 5368709120)
result = rename_large_object(from, to);
else else
result = rename_object(from, to); result = rename_object(from, to);
@ -2227,9 +2462,12 @@ static int s3fs_chmod(const char *path, mode_t mode) {
meta["x-amz-metadata-directive"] = "REPLACE"; meta["x-amz-metadata-directive"] = "REPLACE";
free(s3_realpath); free(s3_realpath);
if(put_headers(path, meta) != 0)
return -EIO;
delete_stat_cache_entry(path); delete_stat_cache_entry(path);
return put_headers(path, meta); return 0;
} }
static int s3fs_chown(const char *path, uid_t uid, gid_t gid) { static int s3fs_chown(const char *path, uid_t uid, gid_t gid) {
@ -2257,9 +2495,12 @@ static int s3fs_chown(const char *path, uid_t uid, gid_t gid) {
meta["x-amz-metadata-directive"] = "REPLACE"; meta["x-amz-metadata-directive"] = "REPLACE";
free(s3_realpath); free(s3_realpath);
if(put_headers(path, meta) != 0)
return -EIO;
delete_stat_cache_entry(path); delete_stat_cache_entry(path);
return put_headers(path, meta); return 0;
} }
static int s3fs_truncate(const char *path, off_t size) { static int s3fs_truncate(const char *path, off_t size) {

View File

@ -4,6 +4,7 @@
#define FUSE_USE_VERSION 26 #define FUSE_USE_VERSION 26
#define MULTIPART_SIZE 10485760 // 10MB #define MULTIPART_SIZE 10485760 // 10MB
#define MAX_REQUESTS 100 // max number of concurrent HTTP requests #define MAX_REQUESTS 100 // max number of concurrent HTTP requests
#define MAX_COPY_SOURCE_SIZE 524288000 // 500MB
#include <map> #include <map>
#include <string> #include <string>
@ -87,6 +88,7 @@ static struct fuse_operations s3fs_oper;
std::string lookupMimeType(std::string); std::string lookupMimeType(std::string);
std::string initiate_multipart_upload(const char *path, off_t size, headers_t meta); std::string initiate_multipart_upload(const char *path, off_t size, headers_t meta);
std::string upload_part(const char *path, const char *source, int part_number, std::string upload_id); std::string upload_part(const char *path, const char *source, int part_number, std::string upload_id);
std::string copy_part(const char *from, const char *to, int part_number, std::string upload_id, headers_t meta);
static int complete_multipart_upload(const char *path, std::string upload_id, std::vector <file_part> parts); static int complete_multipart_upload(const char *path, std::string upload_id, std::vector <file_part> parts);
std::string md5sum(int fd); std::string md5sum(int fd);
char *get_realpath(const char *path); char *get_realpath(const char *path);
@ -103,6 +105,9 @@ static int append_objects_from_xml(const char *xml, struct s3_object **head);
static const char *get_next_marker(const char *xml); static const char *get_next_marker(const char *xml);
static char *get_object_name(xmlDocPtr doc, xmlNodePtr node); static char *get_object_name(xmlDocPtr doc, xmlNodePtr node);
static int put_headers(const char *path, headers_t meta);
static int put_multipart_headers(const char *path, headers_t meta);
static int s3fs_getattr(const char *path, struct stat *stbuf); static int s3fs_getattr(const char *path, struct stat *stbuf);
static int s3fs_readlink(const char *path, char *buf, size_t size); static int s3fs_readlink(const char *path, char *buf, size_t size);
static int s3fs_mknod(const char* path, mode_t mode, dev_t rdev); static int s3fs_mknod(const char* path, mode_t mode, dev_t rdev);