From c933b6a9b1006724f9e270f33cc964d0eb7d5f7e Mon Sep 17 00:00:00 2001
From: "ben.lemasurier@gmail.com"
Date: Mon, 29 Aug 2011 22:01:32 +0000
Subject: [PATCH] Support for modifying files > 5GB (fixes issue #215)

Modified rename_object and put_headers to handle objects larger than 5GB.
Files larger than 5GB must be modified via the multipart interface.

git-svn-id: http://s3fs.googlecode.com/svn/trunk@363 df820570-a93a-0410-bd06-b72b767a4274
---
 src/curl.cpp |   2 +-
 src/s3fs.cpp | 255 +++++++++++++++++++++++++++++++++++++++++++++++++--
 src/s3fs.h   |   5 +
 3 files changed, 254 insertions(+), 8 deletions(-)

diff --git a/src/curl.cpp b/src/curl.cpp
index 19d6dc5..54d37f3 100644
--- a/src/curl.cpp
+++ b/src/curl.cpp
@@ -311,7 +311,7 @@ size_t WriteMemoryCallback(void *ptr, size_t blockSize, size_t numBlocks, void *
   memcpy(&(mem->text[mem->size]), ptr, realsize);
   mem->size += realsize;
   mem->text[mem->size] = 0;
-  
+
   return realsize;
 }
 
diff --git a/src/s3fs.cpp b/src/s3fs.cpp
index 9c58f42..d4466f9 100644
--- a/src/s3fs.cpp
+++ b/src/s3fs.cpp
@@ -613,12 +613,18 @@ static int put_headers(const char *path, headers_t meta) {
   char *s3_realpath;
   string url;
   string resource;
+  struct stat buf;
   struct BodyStruct body;
   CURL *curl = NULL;
 
   if(foreground)
    cout << " put_headers[path=" << path << "]" << endl;
 
+  // files larger than 5GB must be modified via the multipart interface
+  s3fs_getattr(path, &buf);
+  if(buf.st_size >= 5368709120)
+    return(put_multipart_headers(path, meta));
+
   s3_realpath = get_realpath(path);
   resource = urlEncode(service_path + bucket + s3_realpath);
   url = host + resource;
@@ -697,6 +703,80 @@ static int put_headers(const char *path, headers_t meta) {
   return 0;
 }
 
+static int put_multipart_headers(const char *path, headers_t meta) {
+  int result;
+  char *s3_realpath;
+  string url;
+  string resource;
+  string upload_id;
+  struct stat buf;
+  struct BodyStruct body;
+  vector <file_part> parts;
+
+  if(foreground)
+    cout << " put_multipart_headers[path=" << path << "]" << endl;
+
+  s3_realpath = get_realpath(path);
+  resource = urlEncode(service_path + bucket + s3_realpath);
+  url = host + resource;
+
+  body.text = (char *)malloc(1);
+  body.size = 0;
+
+  s3fs_getattr(path, &buf);
+
+  upload_id = initiate_multipart_upload(path, buf.st_size, meta);
+  if(upload_id.size() == 0)
+    return(-EIO);
+
+  off_t chunk = 0;
+  off_t bytes_written = 0;
+  off_t bytes_remaining = buf.st_size;
+  while(bytes_remaining > 0) {
+    file_part part;
+
+    if(bytes_remaining > MAX_COPY_SOURCE_SIZE)
+      chunk = MAX_COPY_SOURCE_SIZE;
+    else
+      chunk = bytes_remaining - 1;
+
+    stringstream ss;
+    ss << "bytes=" << bytes_written << "-" << (bytes_written + chunk);
+    meta["x-amz-copy-source-range"] = ss.str();
+
+    part.etag = copy_part(path, path, parts.size() + 1, upload_id, meta);
+    parts.push_back(part);
+
+    bytes_written += (chunk + 1);
+    bytes_remaining = buf.st_size - bytes_written;
+  }
+
+  result = complete_multipart_upload(path, upload_id, parts);
+  if(result != 0) {
+    free(s3_realpath);
+    return -EIO;
+  }
+
+  // Update mtime in local file cache.
+  if(meta.count("x-amz-meta-mtime") > 0 && use_cache.size() > 0) {
+    struct stat st;
+    struct utimbuf n_mtime;
+    string cache_path(use_cache + "/" + bucket + path);
+
+    if((stat(cache_path.c_str(), &st)) == 0) {
+      n_mtime.modtime = strtoul(meta["x-amz-meta-mtime"].c_str(), (char **) NULL, 10);
+      n_mtime.actime = n_mtime.modtime;
+      if((utime(cache_path.c_str(), &n_mtime)) == -1) {
+        YIKES(-errno);
+      }
+    }
+  }
+
+  free(s3_realpath);
+
+  return 0;
+}
+
 static int put_local_fd_small_file(const char* path, headers_t meta, int fd) {
   string resource;
   string url;
@@ -1294,6 +1374,7 @@ string upload_part(const char *path, const char *source, int part_number, string
   close(fd);
   if(!md5.empty() && strstr(header.text, md5.c_str())) {
     ETag.assign(md5);
+
   } else {
     if(header.text)
       free(header.text);
@@ -1314,6 +1395,102 @@ string upload_part(const char *path, const char *source, int part_number, string
   return ETag;
 }
 
+string copy_part(const char *from, const char *to, int part_number, string upload_id, headers_t meta) {
+  CURL *curl = NULL;
+  int result;
+  string url;
+  string my_url;
+  string auth;
+  string resource;
+  string raw_date;
+  string ETag;
+  char *s3_realpath;
+  struct BodyStruct body;
+  struct BodyStruct header;
+
+  // Now copy the file as the nth part
+  if(foreground)
+    printf("copy_part [from=%s] [to=%s]\n", from, to);
+
+  s3_realpath = get_realpath(to);
+  resource = urlEncode(service_path + bucket + s3_realpath);
+  resource.append("?partNumber=");
+  resource.append(IntToStr(part_number));
+  resource.append("&uploadId=");
+  resource.append(upload_id);
+  url = host + resource;
+  my_url = prepare_url(url.c_str());
+
+  body.text = (char *)malloc(1);
+  body.size = 0;
+  header.text = (char *)malloc(1);
+  header.size = 0;
+
+  auto_curl_slist headers;
+  string date = get_date();
+  headers.append("Date: " + date);
+
+  string ContentType = meta["Content-Type"];
+  meta["x-amz-acl"] = default_acl;
+
+  for(headers_t::iterator iter = meta.begin(); iter != meta.end(); ++iter) {
+    string key = (*iter).first;
+    string value = (*iter).second;
+    if (key == "Content-Type")
+      headers.append(key + ":" + value);
+    if (key == "x-amz-copy-source")
+      headers.append(key + ":" + value);
+    if (key == "x-amz-copy-source-range")
+      headers.append(key + ":" + value);
+  }
+
+  if(use_rrs.substr(0,1) == "1")
+    headers.append("x-amz-storage-class:REDUCED_REDUNDANCY");
+
+  if(public_bucket.substr(0,1) != "1")
+    headers.append("Authorization: AWS " + AWSAccessKeyId + ":" +
+      calc_signature("PUT", ContentType, date, headers.get(), resource));
+
+  curl = create_curl_handle();
+  curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&body);
+  curl_easy_setopt(curl, CURLOPT_HEADERDATA, (void *)&header);
+  curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
+  curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, WriteMemoryCallback);
+  curl_easy_setopt(curl, CURLOPT_UPLOAD, true);               // HTTP PUT
+  curl_easy_setopt(curl, CURLOPT_INFILESIZE, 0);              // Content-Length
+  curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers.get());
+  curl_easy_setopt(curl, CURLOPT_URL, my_url.c_str());
+
+  result = my_curl_easy_perform(curl, &body);
+  destroy_curl_handle(curl);
+
+  if(result != 0) {
+    if(body.text)
+      free(body.text);
+    if(header.text)
+      free(header.text);
+    free(s3_realpath);
+
+    return "";
+  }
+
+  char *start_etag;
+  char *end_etag;
+  start_etag = strstr(body.text, "ETag");
+  end_etag = strstr(body.text, "/ETag>");
+  start_etag += 11;
+  ETag.assign(start_etag, (size_t)(end_etag - start_etag - 7));
+
+  // clean up
+  if(body.text)
+    free(body.text);
+  if(header.text)
+    free(header.text);
+  free(s3_realpath);
+
+  return ETag;
+}
+
 string md5sum(int fd) {
   MD5_CTX c;
   char buf[512];
@@ -1876,7 +2053,7 @@ static int rename_object(const char *from, const char *to) {
   headers_t meta;
 
   if(foreground)
-    cout << "rename_object[from=" << from << "][to=" << to << "]" << endl;
+    printf("rename_object [from=%s] [to=%s]\n", from , to);
 
   if(debug)
     syslog(LOG_DEBUG, "rename_object [from=%s] [to=%s]", from, to);
@@ -1900,6 +2077,62 @@ static int rename_object(const char *from, const char *to) {
 
   return result;
 }
 
+static int rename_large_object(const char *from, const char *to) {
+  int result;
+  char *s3_realpath;
+  struct stat buf;
+  headers_t meta;
+  string upload_id;
+  vector <file_part> parts;
+
+  if(foreground)
+    printf("rename_large_object [from=%s] [to=%s]\n", from , to);
+
+  if(debug)
+    syslog(LOG_DEBUG, "rename_large_object [from=%s] [to=%s]", from, to);
+
+  s3fs_getattr(from, &buf);
+  s3_realpath = get_realpath(from);
+
+  if((get_headers(from, meta) != 0))
+    return -1;
+
+  meta["Content-Type"] = lookupMimeType(to);
+  meta["x-amz-copy-source"] = urlEncode("/" + bucket + s3_realpath);
+
+  upload_id = initiate_multipart_upload(to, buf.st_size, meta);
+  if(upload_id.size() == 0)
+    return(-EIO);
+
+  off_t chunk = 0;
+  off_t bytes_written = 0;
+  off_t bytes_remaining = buf.st_size;
+  while(bytes_remaining > 0) {
+    file_part part;
+
+    if(bytes_remaining > MAX_COPY_SOURCE_SIZE)
+      chunk = MAX_COPY_SOURCE_SIZE;
+    else
+      chunk = bytes_remaining - 1;
+
+    stringstream ss;
+    ss << "bytes=" << bytes_written << "-" << (bytes_written + chunk);
+    meta["x-amz-copy-source-range"] = ss.str();
+
+    part.etag = copy_part(from, to, parts.size() + 1, upload_id, meta);
+    parts.push_back(part);
+
+    bytes_written += (chunk + 1);
+    bytes_remaining = buf.st_size - bytes_written;
+  }
+
+  result = complete_multipart_upload(to, upload_id, parts);
+  if(result != 0)
+    return -EIO;
+
+  return s3fs_unlink(from);
+}
+
 static int clone_directory_object(const char *from, const char *to) {
   int result;
   mode_t mode;
@@ -2187,16 +2420,18 @@ static int s3fs_rename(const char *from, const char *to) {
   int result;
 
   if(foreground)
-    cout << "rename[from=" << from << "][to=" << to << "]" << endl;
+    printf("s3fs_rename [from=%s] [to=%s]\n", from, to);
 
   if(debug)
-    syslog(LOG_DEBUG, "rename [from=%s] [to=%s]", from, to);
+    syslog(LOG_DEBUG, "s3fs_rename [from=%s] [to=%s]", from, to);
 
   s3fs_getattr(from, &buf);
-
-  // is a directory or a different type of file
+
+  // files larger than 5GB must be modified via the multipart interface
   if(S_ISDIR(buf.st_mode))
     result = rename_directory(from, to);
+  else if(buf.st_size >= 5368709120)
+    result = rename_large_object(from, to);
   else
     result = rename_object(from, to);
 
@@ -2227,9 +2462,12 @@ static int s3fs_chmod(const char *path, mode_t mode) {
   meta["x-amz-metadata-directive"] = "REPLACE";
   free(s3_realpath);
 
+  if(put_headers(path, meta) != 0)
+    return -EIO;
+
   delete_stat_cache_entry(path);
 
-  return put_headers(path, meta);
+  return 0;
 }
 
 static int s3fs_chown(const char *path, uid_t uid, gid_t gid) {
@@ -2257,9 +2495,12 @@ static int s3fs_chown(const char *path, uid_t uid, gid_t gid) {
   meta["x-amz-metadata-directive"] = "REPLACE";
   free(s3_realpath);
 
+  if(put_headers(path, meta) != 0)
+    return -EIO;
+
   delete_stat_cache_entry(path);
 
-  return put_headers(path, meta);
+  return 0;
 }
 
 static int s3fs_truncate(const char *path, off_t size) {
diff --git a/src/s3fs.h b/src/s3fs.h
index e2f5dbb..0888da1 100644
--- a/src/s3fs.h
+++ b/src/s3fs.h
@@ -4,6 +4,7 @@
 #define FUSE_USE_VERSION 26
 #define MULTIPART_SIZE 10485760 // 10MB
 #define MAX_REQUESTS 100 // max number of concurrent HTTP requests
+#define MAX_COPY_SOURCE_SIZE 524288000 // 500MB
 
 #include
 #include
@@ -87,6 +88,7 @@ static struct fuse_operations s3fs_oper;
 std::string lookupMimeType(std::string);
 std::string initiate_multipart_upload(const char *path, off_t size, headers_t meta);
 std::string upload_part(const char *path, const char *source, int part_number, std::string upload_id);
+std::string copy_part(const char *from, const char *to, int part_number, std::string upload_id, headers_t meta);
 static int complete_multipart_upload(const char *path, std::string upload_id, std::vector <file_part> parts);
 std::string md5sum(int fd);
 char *get_realpath(const char *path);
@@ -103,6 +105,9 @@ static int append_objects_from_xml(const char *xml, struct s3_object **head);
 static const char *get_next_marker(const char *xml);
 static char *get_object_name(xmlDocPtr doc, xmlNodePtr node);
 
+static int put_headers(const char *path, headers_t meta);
+static int put_multipart_headers(const char *path, headers_t meta);
+
 static int s3fs_getattr(const char *path, struct stat *stbuf);
 static int s3fs_readlink(const char *path, char *buf, size_t size);
 static int s3fs_mknod(const char* path, mode_t mode, dev_t rdev);
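
The range arithmetic shared by put_multipart_headers() and rename_large_object() can be
exercised on its own: each copy_part() call is driven by an inclusive x-amz-copy-source-range
built from MAX_COPY_SOURCE_SIZE (500MB), and the loop advances bytes_written until the whole
object is covered. Below is a minimal, self-contained sketch of that chunking; it is not part
of the patch, the helper copy_source_ranges() and the main() driver are hypothetical names used
only for illustration, and it assumes a 64-bit off_t.

// Sketch only (not part of the patch): prints the x-amz-copy-source-range
// values the loop above would generate for an object of a given size.
#include <sys/types.h>
#include <cstdio>
#include <sstream>
#include <string>
#include <vector>

static const off_t MAX_COPY_SOURCE_SIZE = 524288000; // 500MB, as in s3fs.h

// Hypothetical helper: one inclusive byte range per copied part.
std::vector<std::string> copy_source_ranges(off_t size) {
  std::vector<std::string> ranges;
  off_t chunk = 0;
  off_t bytes_written = 0;
  off_t bytes_remaining = size;

  while(bytes_remaining > 0) {
    if(bytes_remaining > MAX_COPY_SOURCE_SIZE)
      chunk = MAX_COPY_SOURCE_SIZE;
    else
      chunk = bytes_remaining - 1;   // last part ends at the final byte

    std::stringstream ss;
    ss << "bytes=" << bytes_written << "-" << (bytes_written + chunk);
    ranges.push_back(ss.str());

    bytes_written += (chunk + 1);    // both range endpoints are inclusive
    bytes_remaining = size - bytes_written;
  }

  return ranges;
}

int main() {
  // A 6GB object yields 13 parts: twelve of MAX_COPY_SOURCE_SIZE + 1 bytes
  // and one smaller final part.
  std::vector<std::string> ranges = copy_source_ranges(6442450944LL);
  for(size_t i = 0; i < ranges.size(); i++)
    printf("part %zu: %s\n", i + 1, ranges[i].c_str());
  return 0;
}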