From 1c93dd30c16482c424866605e2ff6d15f56db820 Mon Sep 17 00:00:00 2001 From: "ggtakec@gmail.com" Date: Fri, 12 Jul 2013 00:33:36 +0000 Subject: [PATCH] Changes codes 1) For uploading performance(part 2) Changed a codes about uploading large object(multipart uploading). This revision does not make temporary file when s3fs uploads large object by multipart uploading. Before this revision, s3fs made temporary file(/tmp/s3fs.XXXXX) for multipart, but it was not good for performance. So that, new codes do not use those files, and s3fs reads directly large object from s3fs's cache file. 2) Some value to symbol Changed some value to symbol(define). git-svn-id: http://s3fs.googlecode.com/svn/trunk@457 df820570-a93a-0410-bd06-b72b767a4274 --- src/curl.cpp | 136 ++++++++++++++++++++++++++-------------------- src/curl.h | 27 ++++----- src/s3fs.cpp | 7 ++- src/s3fs_util.cpp | 61 --------------------- src/s3fs_util.h | 1 - 5 files changed, 95 insertions(+), 137 deletions(-) diff --git a/src/curl.cpp b/src/curl.cpp index a8c3144..4b7b8fb 100644 --- a/src/curl.cpp +++ b/src/curl.cpp @@ -521,6 +521,39 @@ size_t S3fsCurl::HeaderCallback(void *data, size_t blockSize, size_t numBlocks, return blockSize * numBlocks; } +size_t S3fsCurl::UploadReadCallback(void *ptr, size_t size, size_t nmemb, void *userp) +{ + S3fsCurl* pCurl = reinterpret_cast(userp); + + if(1 > (size * nmemb)){ + return 0; + } + if(-1 == pCurl->partdata.fd || 0 >= pCurl->partdata.size){ + return 0; + } + // read size + ssize_t copysize = (size * nmemb) < (size_t)pCurl->partdata.size ? (size * nmemb) : (size_t)pCurl->partdata.size; + ssize_t readbytes; + ssize_t totalread; + // read and set + for(totalread = 0, readbytes = 0; totalread < copysize; totalread += readbytes){ + readbytes = pread(pCurl->partdata.fd, &((char*)ptr)[totalread], (copysize - totalread), pCurl->partdata.startpos + totalread); + if(0 == readbytes){ + // eof + break; + }else if(-1 == readbytes){ + // error + FGPRINT("S3fsCurl::UploadReadCallback: read file error(%d).\n", errno); + SYSLOGERR("read file error(%d).", errno); + return 0; + } + } + pCurl->partdata.startpos += totalread; + pCurl->partdata.size -= totalread; + + return totalread; +} + bool S3fsCurl::SetDnsCache(bool isCache) { bool old = S3fsCurl::is_dns_cache; @@ -646,20 +679,19 @@ S3fsCurl* S3fsCurl::UploadMultipartPostRetryCallback(S3fsCurl* s3fscurl) // duplicate request S3fsCurl* newcurl = new S3fsCurl(); - newcurl->partdata.partfile = s3fscurl->partdata.partfile; newcurl->partdata.etaglist = s3fscurl->partdata.etaglist; newcurl->partdata.etagpos = s3fscurl->partdata.etagpos; + newcurl->partdata.fd = s3fscurl->partdata.fd; + newcurl->partdata.startpos = s3fscurl->partdata.startpos; + newcurl->partdata.size = s3fscurl->partdata.size; // setup new curl object if(!newcurl->UploadMultipartPostSetup(s3fscurl->path.c_str(), part_num, upload_id)){ FGPRINT(" S3fsCurl::UploadMultipartPostRetryCallback : Could not duplicate curl object(%s:%d).\n", s3fscurl->path.c_str(), part_num); SYSLOGERR("Could not duplicate curl object(%s:%d).", s3fscurl->path.c_str(), part_num); - newcurl->partdata.partfile = ""; // for do not removing tmp file. delete newcurl; return NULL; } - s3fscurl->partdata.partfile = ""; // for do not removing tmp file. - return newcurl; } @@ -725,19 +757,12 @@ int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta, chunk = remaining_bytes > MULTIPART_SIZE ? MULTIPART_SIZE : remaining_bytes; // s3fscurl sub object - S3fsCurl* s3fscurl_para = new S3fsCurl(); + S3fsCurl* s3fscurl_para = new S3fsCurl(); + s3fscurl_para->partdata.fd = fd2; + s3fscurl_para->partdata.startpos = st.st_size - remaining_bytes; + s3fscurl_para->partdata.size = chunk; s3fscurl_para->partdata.add_etag_list(&list); - // make temp file - if(0 != (result = copy_chunk_tempfile(file, buf, chunk, s3fscurl_para->partdata.partfile))){ - FGPRINT("S3fsCurl::ParallelMultipartUploadRequest: failed to make temp file(%d)\n", ferror(file)); - SYSLOGERR("failed to make temp file(%d)", ferror(file)); - free(buf); - fclose(file); - delete s3fscurl_para; - return result; - } - // initiate upload part for parallel if(0 != (result = s3fscurl_para->UploadMultipartPostSetup(tpath, list.size(), upload_id))){ FGPRINT("S3fsCurl::ParallelMultipartUploadRequest: failed uploading part setup(%d)\n", result); @@ -1907,44 +1932,30 @@ int S3fsCurl::MultipartListRequest(string& body) // Content-MD5: pUNXr/BjKK5G2UKvaRRrOA== // Authorization: AWS VGhpcyBtZXNzYWdlIHNpZ25lZGGieSRlbHZpbmc= // + int S3fsCurl::UploadMultipartPostSetup(const char* tpath, int part_num, string& upload_id) { - int para_fd; - struct stat st; + FGPRINT(" S3fsCurl::UploadMultipartPostSetup[tpath=%s][start=%zd][size=%zd][part=%d]\n", + SAFESTRPTR(tpath), partdata.startpos, partdata.size, part_num); - FGPRINT(" S3fsCurl::UploadMultipartPostSetup[tpath=%s][fpath=%s][part=%d]\n", SAFESTRPTR(tpath), partdata.partfile.c_str(), part_num); + if(-1 == partdata.fd || -1 == partdata.startpos || -1 == partdata.size){ + return -1; + } // make md5 and file pointer - if(-1 == (para_fd = open(partdata.partfile.c_str(), O_RDONLY))){ - FGPRINT("S3fsCurl::UploadMultipartPostSetup: Could not open file(%s) - errorno(%d)\n", partdata.partfile.c_str(), errno); - SYSLOGERR("Could not open file(%s) - errno(%d)", partdata.partfile.c_str(), errno); - return -errno; - } - if(-1 == fstat(para_fd, &st)){ - FGPRINT("S3fsCurl::UploadMultipartPostSetup: Invalid file(%s) discriptor(errno=%d)\n", partdata.partfile.c_str(), errno); - SYSLOGERR("Invalid file(%s) discriptor(errno=%d)", partdata.partfile.c_str(), errno); - close(para_fd); - return -errno; - } - partdata.etag = md5sum(para_fd); + partdata.etag = md5sum(partdata.fd, partdata.startpos, partdata.size); if(partdata.etag.empty()){ - FGPRINT("S3fsCurl::UploadMultipartPostSetup: Could not make md5 for file(%s)\n", partdata.partfile.c_str()); - SYSLOGERR("Could not make md5 for file(%s)", partdata.partfile.c_str()); - close(para_fd); + FGPRINT("S3fsCurl::UploadMultipartPostSetup: Could not make md5 for file(part %d)\n", part_num); + SYSLOGERR("Could not make md5 for file(part %d)", part_num); return -1; } - if(NULL == (partdata.fppart = fdopen(para_fd, "rb"))){ - FGPRINT("S3fsCurl::UploadMultipartPostSetup: Invalid file(%s) discriptor(errno=%d)\n", partdata.partfile.c_str(), errno); - SYSLOGERR("Invalid file(%s) discriptor(errno=%d)", partdata.partfile.c_str(), errno); - close(para_fd); - return -errno; - } + // create handle if(!CreateCurlHandle(true)){ - fclose(partdata.fppart); - partdata.fppart = NULL; return -1; } + + // make request string urlargs = "?partNumber=" + IntToStr(part_num) + "&uploadId=" + upload_id; string resource; string turl; @@ -1977,8 +1988,9 @@ int S3fsCurl::UploadMultipartPostSetup(const char* tpath, int part_num, string& curl_easy_setopt(hCurl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback); curl_easy_setopt(hCurl, CURLOPT_HEADERDATA, (void*)headdata); curl_easy_setopt(hCurl, CURLOPT_HEADERFUNCTION, WriteMemoryCallback); - curl_easy_setopt(hCurl, CURLOPT_INFILESIZE_LARGE, static_cast(st.st_size)); // Content-Length - curl_easy_setopt(hCurl, CURLOPT_INFILE, partdata.fppart); + curl_easy_setopt(hCurl, CURLOPT_INFILESIZE_LARGE, partdata.size); // Content-Length + curl_easy_setopt(hCurl, CURLOPT_READFUNCTION, S3fsCurl::UploadReadCallback); + curl_easy_setopt(hCurl, CURLOPT_READDATA, (void*)this); curl_easy_setopt(hCurl, CURLOPT_HTTPHEADER, requestHeaders); return 0; @@ -1988,7 +2000,8 @@ int S3fsCurl::UploadMultipartPostRequest(const char* tpath, int part_num, string { int result; - FGPRINT(" S3fsCurl::UploadMultipartPostRequest[tpath=%s][fpath=%s][part=%d]\n", SAFESTRPTR(tpath), partdata.partfile.c_str(), part_num); + FGPRINT(" S3fsCurl::UploadMultipartPostRequest[tpath=%s][start=%zd][size=%zd][part=%d]\n", + SAFESTRPTR(tpath), partdata.startpos, partdata.size, part_num); // setup if(0 != (result = S3fsCurl::UploadMultipartPostSetup(tpath, part_num, upload_id))){ @@ -2190,14 +2203,10 @@ int S3fsCurl::MultipartUploadRequest(const char* tpath, headers_t& meta, int fd, // chunk size chunk = remaining_bytes > MULTIPART_SIZE ? MULTIPART_SIZE : remaining_bytes; - // make temp file - if(0 != (result = copy_chunk_tempfile(file, buf, chunk, partdata.partfile))){ - FGPRINT("S3fsCurl::MultipartUploadRequest: failed to make temp file(%d)\n", ferror(file)); - SYSLOGERR("failed to make temp file(%d)", ferror(file)); - free(buf); - fclose(file); - return result; - } + // set + partdata.fd = fd2; + partdata.startpos = st.st_size - remaining_bytes; + partdata.size = chunk; // upload part if(0 != (result = UploadMultipartPostRequest(tpath, (list.size() + 1), upload_id))){ @@ -2550,7 +2559,7 @@ string GetContentMD5(int fd) return Signature; } -unsigned char* md5hexsum(int fd) +unsigned char* md5hexsum(int fd, off_t start, off_t size) { MD5_CTX c; char buf[512]; @@ -2558,19 +2567,30 @@ unsigned char* md5hexsum(int fd) unsigned char* result = (unsigned char*)malloc(MD5_DIGEST_LENGTH); // seek to top of file. - if(-1 == lseek(fd, 0, SEEK_SET)){ + if(-1 == lseek(fd, start, SEEK_SET)){ return NULL; } memset(buf, 0, 512); MD5_Init(&c); - while((bytes = read(fd, buf, 512)) > 0) { + for(ssize_t total = 0; total < size; total += bytes){ + bytes = 512 < (size - total) ? 512 : (size - total); + bytes = read(fd, buf, bytes); + if(0 == bytes){ + // end of file + break; + }else if(-1 == bytes){ + // error + FGPRINT("md5hexsum: : file read error(%d)\n", errno); + free(result); + return NULL; + } MD5_Update(&c, buf, bytes); memset(buf, 0, 512); } MD5_Final(result, &c); - if(-1 == lseek(fd, 0, SEEK_SET)){ + if(-1 == lseek(fd, start, SEEK_SET)){ free(result); return NULL; } @@ -2578,13 +2598,13 @@ unsigned char* md5hexsum(int fd) return result; } -string md5sum(int fd) +string md5sum(int fd, off_t start, off_t size) { char md5[2 * MD5_DIGEST_LENGTH + 1]; char hexbuf[3]; unsigned char* md5hex; - if(NULL == (md5hex = md5hexsum(fd))){ + if(NULL == (md5hex = md5hexsum(fd, start, size))){ return string(""); } diff --git a/src/curl.h b/src/curl.h index 0ecc7f3..9733eda 100644 --- a/src/curl.h +++ b/src/curl.h @@ -44,14 +44,15 @@ typedef std::vector etaglist_t; // Each part information for Multipart upload struct filepart { - bool uploaded; - std::string partfile; - std::string etag; - FILE* fppart; + bool uploaded; // does finish uploading + std::string etag; // expected etag value + int fd; // base file(temporary full file) discriptor + off_t startpos; // seek fd point for uploading + ssize_t size; // uploading size etaglist_t* etaglist; // use only parallel upload int etagpos; // use only parallel upload - filepart() : uploaded(false), fppart(NULL), etaglist(NULL), etagpos(-1) {} + filepart() : uploaded(false), fd(-1), startpos(0), size(-1), etaglist(NULL), etagpos(-1) {} ~filepart() { clear(); @@ -59,16 +60,11 @@ struct filepart void clear(bool isfree = true) { - if(isfree && fppart){ - fclose(fppart); - } - if(isfree && 0 < partfile.size()){ - remove(partfile.c_str()); - } uploaded = false; - partfile = ""; etag = ""; - fppart = NULL; + fd = -1; + startpos = 0; + size = -1; etaglist = NULL; etagpos = - 1; } @@ -165,6 +161,7 @@ class S3fsCurl static size_t HeaderCallback(void *data, size_t blockSize, size_t numBlocks, void *userPtr); static size_t WriteMemoryCallback(void *ptr, size_t blockSize, size_t numBlocks, void *data); static size_t ReadCallback(void *ptr, size_t size, size_t nmemb, void *userp); + static size_t UploadReadCallback(void *ptr, size_t size, size_t nmemb, void *userp); static bool UploadMultipartPostCallback(S3fsCurl* s3fscurl); static S3fsCurl* UploadMultipartPostRetryCallback(S3fsCurl* s3fscurl); @@ -288,8 +285,8 @@ class S3fsMultiCurl // Utility Functions //---------------------------------------------- std::string GetContentMD5(int fd); -unsigned char* md5hexsum(int fd); -std::string md5sum(int fd); +unsigned char* md5hexsum(int fd, off_t start = 0, ssize_t size = -1); +std::string md5sum(int fd, off_t start = 0, ssize_t size = -1); struct curl_slist* curl_slist_sort_insert(struct curl_slist* list, const char* data); bool MakeUrlResource(const char* realpath, std::string& resourcepath, std::string& url); diff --git a/src/s3fs.cpp b/src/s3fs.cpp index fdcc0d9..2f1b5a4 100644 --- a/src/s3fs.cpp +++ b/src/s3fs.cpp @@ -63,6 +63,9 @@ using namespace std; #define IS_REPLACEDIR(type) (DIRTYPE_OLD == type || DIRTYPE_FOLDER == type || DIRTYPE_NOOBJ == type) #define IS_RMTYPEDIR(type) (DIRTYPE_OLD == type || DIRTYPE_FOLDER == type) +#define MAX_OBJECT_SIZE 68719476735LL // 64GB - 1L +#define MULTIPART_LOWLIMIT (20 * 1024 * 1024) // 20MB + //------------------------------------------------------------------- // Global valiables //------------------------------------------------------------------- @@ -810,7 +813,7 @@ static int put_local_fd(const char* path, headers_t meta, int fd, bool ow_sse_fl * * If file is > 20MB, then multipart will kick in */ - if(st.st_size > 68719476735LL){ // 64GB - 1 + if(st.st_size > MAX_OBJECT_SIZE){ // 64GB - 1 // close f ? return -ENOTSUP; } @@ -822,7 +825,7 @@ static int put_local_fd(const char* path, headers_t meta, int fd, bool ow_sse_fl return -errno; } - if(st.st_size >= 20971520 && !nomultipart){ // 20MB + if(st.st_size >= MULTIPART_LOWLIMIT && !nomultipart){ // 20MB // Additional time is needed for large files time_t backup = 0; if(120 > S3fsCurl::GetReadwriteTimeout()){ diff --git a/src/s3fs_util.cpp b/src/s3fs_util.cpp index bede586..3eef28b 100644 --- a/src/s3fs_util.cpp +++ b/src/s3fs_util.cpp @@ -672,67 +672,6 @@ bool is_need_check_obj_detail(headers_t& meta) return true; } -//------------------------------------------------------------------- -// Utility functions for convert -//------------------------------------------------------------------- -// Copy file(file pointer) to tmpfile -int copy_chunk_tempfile(FILE* file, unsigned char* buf, off_t chunk, string& tmpname) -{ - char tmpfile[256]; - off_t copy_total; - off_t copied; - int fd; - FILE* fptmp; - - if(!file || !buf || 0 == chunk){ - return 0; - } - - // copy the file portion into the buffer - for(copy_total = 0; copy_total < chunk; copy_total += copied){ - copied = fread(&buf[copy_total], sizeof(unsigned char), (chunk - copy_total), file); - if(copied != (chunk - copy_total)){ - if(0 != ferror(file) || feof(file)){ - FGPRINT("copy_part_tempfile: read file error(%d)\n", ferror(file)); - SYSLOGERR("read file error(%d)", ferror(file)); - return -(ferror(file)); - } - } - } - - // create uniq temporary file - strncpy(tmpfile, "/tmp/s3fs.XXXXXX", sizeof(tmpfile)); - if(-1 == (fd = mkstemp(tmpfile))){ - FGPRINT("copy_part_tempfile: Could not open tempolary file(%s) - errno(%d)\n", tmpfile, errno); - SYSLOGERR("Could not open tempolary file(%s) - errno(%d)", tmpfile, errno); - return -errno; - } - if(NULL == (fptmp = fdopen(fd, "wb"))){ - FGPRINT("copy_part_tempfile: Could not open tempolary file(%s) - errno(%d)\n", tmpfile, errno); - SYSLOGERR("Could not open tempolary file(%s) - errno(%d)", tmpfile, errno); - close(fd); - return -errno; - } - - // copy buffer to temporary file - for(copy_total = 0; copy_total < chunk; copy_total += copied){ - copied = fwrite(&buf[copy_total], sizeof(unsigned char), (chunk - copy_total), fptmp); - if(copied != (chunk - copy_total)){ - if(0 != ferror(fptmp)){ - FGPRINT("copy_part_tempfile: write file error(%d)\n", ferror(fptmp)); - SYSLOGERR("write file error(%d)", ferror(fptmp)); - fclose(fptmp); - remove(tmpfile); - return -(ferror(fptmp)); - } - } - } - fclose(fptmp); - tmpname = tmpfile; - - return 0; -} - //------------------------------------------------------------------- // Help //------------------------------------------------------------------- diff --git a/src/s3fs_util.h b/src/s3fs_util.h index 0741ebd..90fde86 100644 --- a/src/s3fs_util.h +++ b/src/s3fs_util.h @@ -94,7 +94,6 @@ blkcnt_t get_blocks(off_t size); time_t get_lastmodified(const char* s); time_t get_lastmodified(headers_t& meta); bool is_need_check_obj_detail(headers_t& meta); -int copy_chunk_tempfile(FILE* file, unsigned char* buf, off_t chunk, std::string& tmpname); void show_usage(void); void show_help(void);