diff --git a/src/curl.cpp b/src/curl.cpp index a8c3144..4b7b8fb 100644 --- a/src/curl.cpp +++ b/src/curl.cpp @@ -521,6 +521,39 @@ size_t S3fsCurl::HeaderCallback(void *data, size_t blockSize, size_t numBlocks, return blockSize * numBlocks; } +size_t S3fsCurl::UploadReadCallback(void *ptr, size_t size, size_t nmemb, void *userp) +{ + S3fsCurl* pCurl = reinterpret_cast(userp); + + if(1 > (size * nmemb)){ + return 0; + } + if(-1 == pCurl->partdata.fd || 0 >= pCurl->partdata.size){ + return 0; + } + // read size + ssize_t copysize = (size * nmemb) < (size_t)pCurl->partdata.size ? (size * nmemb) : (size_t)pCurl->partdata.size; + ssize_t readbytes; + ssize_t totalread; + // read and set + for(totalread = 0, readbytes = 0; totalread < copysize; totalread += readbytes){ + readbytes = pread(pCurl->partdata.fd, &((char*)ptr)[totalread], (copysize - totalread), pCurl->partdata.startpos + totalread); + if(0 == readbytes){ + // eof + break; + }else if(-1 == readbytes){ + // error + FGPRINT("S3fsCurl::UploadReadCallback: read file error(%d).\n", errno); + SYSLOGERR("read file error(%d).", errno); + return 0; + } + } + pCurl->partdata.startpos += totalread; + pCurl->partdata.size -= totalread; + + return totalread; +} + bool S3fsCurl::SetDnsCache(bool isCache) { bool old = S3fsCurl::is_dns_cache; @@ -646,20 +679,19 @@ S3fsCurl* S3fsCurl::UploadMultipartPostRetryCallback(S3fsCurl* s3fscurl) // duplicate request S3fsCurl* newcurl = new S3fsCurl(); - newcurl->partdata.partfile = s3fscurl->partdata.partfile; newcurl->partdata.etaglist = s3fscurl->partdata.etaglist; newcurl->partdata.etagpos = s3fscurl->partdata.etagpos; + newcurl->partdata.fd = s3fscurl->partdata.fd; + newcurl->partdata.startpos = s3fscurl->partdata.startpos; + newcurl->partdata.size = s3fscurl->partdata.size; // setup new curl object if(!newcurl->UploadMultipartPostSetup(s3fscurl->path.c_str(), part_num, upload_id)){ FGPRINT(" S3fsCurl::UploadMultipartPostRetryCallback : Could not duplicate curl object(%s:%d).\n", s3fscurl->path.c_str(), part_num); SYSLOGERR("Could not duplicate curl object(%s:%d).", s3fscurl->path.c_str(), part_num); - newcurl->partdata.partfile = ""; // for do not removing tmp file. delete newcurl; return NULL; } - s3fscurl->partdata.partfile = ""; // for do not removing tmp file. - return newcurl; } @@ -725,19 +757,12 @@ int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta, chunk = remaining_bytes > MULTIPART_SIZE ? MULTIPART_SIZE : remaining_bytes; // s3fscurl sub object - S3fsCurl* s3fscurl_para = new S3fsCurl(); + S3fsCurl* s3fscurl_para = new S3fsCurl(); + s3fscurl_para->partdata.fd = fd2; + s3fscurl_para->partdata.startpos = st.st_size - remaining_bytes; + s3fscurl_para->partdata.size = chunk; s3fscurl_para->partdata.add_etag_list(&list); - // make temp file - if(0 != (result = copy_chunk_tempfile(file, buf, chunk, s3fscurl_para->partdata.partfile))){ - FGPRINT("S3fsCurl::ParallelMultipartUploadRequest: failed to make temp file(%d)\n", ferror(file)); - SYSLOGERR("failed to make temp file(%d)", ferror(file)); - free(buf); - fclose(file); - delete s3fscurl_para; - return result; - } - // initiate upload part for parallel if(0 != (result = s3fscurl_para->UploadMultipartPostSetup(tpath, list.size(), upload_id))){ FGPRINT("S3fsCurl::ParallelMultipartUploadRequest: failed uploading part setup(%d)\n", result); @@ -1907,44 +1932,30 @@ int S3fsCurl::MultipartListRequest(string& body) // Content-MD5: pUNXr/BjKK5G2UKvaRRrOA== // Authorization: AWS VGhpcyBtZXNzYWdlIHNpZ25lZGGieSRlbHZpbmc= // + int S3fsCurl::UploadMultipartPostSetup(const char* tpath, int part_num, string& upload_id) { - int para_fd; - struct stat st; + FGPRINT(" S3fsCurl::UploadMultipartPostSetup[tpath=%s][start=%zd][size=%zd][part=%d]\n", + SAFESTRPTR(tpath), partdata.startpos, partdata.size, part_num); - FGPRINT(" S3fsCurl::UploadMultipartPostSetup[tpath=%s][fpath=%s][part=%d]\n", SAFESTRPTR(tpath), partdata.partfile.c_str(), part_num); + if(-1 == partdata.fd || -1 == partdata.startpos || -1 == partdata.size){ + return -1; + } // make md5 and file pointer - if(-1 == (para_fd = open(partdata.partfile.c_str(), O_RDONLY))){ - FGPRINT("S3fsCurl::UploadMultipartPostSetup: Could not open file(%s) - errorno(%d)\n", partdata.partfile.c_str(), errno); - SYSLOGERR("Could not open file(%s) - errno(%d)", partdata.partfile.c_str(), errno); - return -errno; - } - if(-1 == fstat(para_fd, &st)){ - FGPRINT("S3fsCurl::UploadMultipartPostSetup: Invalid file(%s) discriptor(errno=%d)\n", partdata.partfile.c_str(), errno); - SYSLOGERR("Invalid file(%s) discriptor(errno=%d)", partdata.partfile.c_str(), errno); - close(para_fd); - return -errno; - } - partdata.etag = md5sum(para_fd); + partdata.etag = md5sum(partdata.fd, partdata.startpos, partdata.size); if(partdata.etag.empty()){ - FGPRINT("S3fsCurl::UploadMultipartPostSetup: Could not make md5 for file(%s)\n", partdata.partfile.c_str()); - SYSLOGERR("Could not make md5 for file(%s)", partdata.partfile.c_str()); - close(para_fd); + FGPRINT("S3fsCurl::UploadMultipartPostSetup: Could not make md5 for file(part %d)\n", part_num); + SYSLOGERR("Could not make md5 for file(part %d)", part_num); return -1; } - if(NULL == (partdata.fppart = fdopen(para_fd, "rb"))){ - FGPRINT("S3fsCurl::UploadMultipartPostSetup: Invalid file(%s) discriptor(errno=%d)\n", partdata.partfile.c_str(), errno); - SYSLOGERR("Invalid file(%s) discriptor(errno=%d)", partdata.partfile.c_str(), errno); - close(para_fd); - return -errno; - } + // create handle if(!CreateCurlHandle(true)){ - fclose(partdata.fppart); - partdata.fppart = NULL; return -1; } + + // make request string urlargs = "?partNumber=" + IntToStr(part_num) + "&uploadId=" + upload_id; string resource; string turl; @@ -1977,8 +1988,9 @@ int S3fsCurl::UploadMultipartPostSetup(const char* tpath, int part_num, string& curl_easy_setopt(hCurl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback); curl_easy_setopt(hCurl, CURLOPT_HEADERDATA, (void*)headdata); curl_easy_setopt(hCurl, CURLOPT_HEADERFUNCTION, WriteMemoryCallback); - curl_easy_setopt(hCurl, CURLOPT_INFILESIZE_LARGE, static_cast(st.st_size)); // Content-Length - curl_easy_setopt(hCurl, CURLOPT_INFILE, partdata.fppart); + curl_easy_setopt(hCurl, CURLOPT_INFILESIZE_LARGE, partdata.size); // Content-Length + curl_easy_setopt(hCurl, CURLOPT_READFUNCTION, S3fsCurl::UploadReadCallback); + curl_easy_setopt(hCurl, CURLOPT_READDATA, (void*)this); curl_easy_setopt(hCurl, CURLOPT_HTTPHEADER, requestHeaders); return 0; @@ -1988,7 +2000,8 @@ int S3fsCurl::UploadMultipartPostRequest(const char* tpath, int part_num, string { int result; - FGPRINT(" S3fsCurl::UploadMultipartPostRequest[tpath=%s][fpath=%s][part=%d]\n", SAFESTRPTR(tpath), partdata.partfile.c_str(), part_num); + FGPRINT(" S3fsCurl::UploadMultipartPostRequest[tpath=%s][start=%zd][size=%zd][part=%d]\n", + SAFESTRPTR(tpath), partdata.startpos, partdata.size, part_num); // setup if(0 != (result = S3fsCurl::UploadMultipartPostSetup(tpath, part_num, upload_id))){ @@ -2190,14 +2203,10 @@ int S3fsCurl::MultipartUploadRequest(const char* tpath, headers_t& meta, int fd, // chunk size chunk = remaining_bytes > MULTIPART_SIZE ? MULTIPART_SIZE : remaining_bytes; - // make temp file - if(0 != (result = copy_chunk_tempfile(file, buf, chunk, partdata.partfile))){ - FGPRINT("S3fsCurl::MultipartUploadRequest: failed to make temp file(%d)\n", ferror(file)); - SYSLOGERR("failed to make temp file(%d)", ferror(file)); - free(buf); - fclose(file); - return result; - } + // set + partdata.fd = fd2; + partdata.startpos = st.st_size - remaining_bytes; + partdata.size = chunk; // upload part if(0 != (result = UploadMultipartPostRequest(tpath, (list.size() + 1), upload_id))){ @@ -2550,7 +2559,7 @@ string GetContentMD5(int fd) return Signature; } -unsigned char* md5hexsum(int fd) +unsigned char* md5hexsum(int fd, off_t start, off_t size) { MD5_CTX c; char buf[512]; @@ -2558,19 +2567,30 @@ unsigned char* md5hexsum(int fd) unsigned char* result = (unsigned char*)malloc(MD5_DIGEST_LENGTH); // seek to top of file. - if(-1 == lseek(fd, 0, SEEK_SET)){ + if(-1 == lseek(fd, start, SEEK_SET)){ return NULL; } memset(buf, 0, 512); MD5_Init(&c); - while((bytes = read(fd, buf, 512)) > 0) { + for(ssize_t total = 0; total < size; total += bytes){ + bytes = 512 < (size - total) ? 512 : (size - total); + bytes = read(fd, buf, bytes); + if(0 == bytes){ + // end of file + break; + }else if(-1 == bytes){ + // error + FGPRINT("md5hexsum: : file read error(%d)\n", errno); + free(result); + return NULL; + } MD5_Update(&c, buf, bytes); memset(buf, 0, 512); } MD5_Final(result, &c); - if(-1 == lseek(fd, 0, SEEK_SET)){ + if(-1 == lseek(fd, start, SEEK_SET)){ free(result); return NULL; } @@ -2578,13 +2598,13 @@ unsigned char* md5hexsum(int fd) return result; } -string md5sum(int fd) +string md5sum(int fd, off_t start, off_t size) { char md5[2 * MD5_DIGEST_LENGTH + 1]; char hexbuf[3]; unsigned char* md5hex; - if(NULL == (md5hex = md5hexsum(fd))){ + if(NULL == (md5hex = md5hexsum(fd, start, size))){ return string(""); } diff --git a/src/curl.h b/src/curl.h index 0ecc7f3..9733eda 100644 --- a/src/curl.h +++ b/src/curl.h @@ -44,14 +44,15 @@ typedef std::vector etaglist_t; // Each part information for Multipart upload struct filepart { - bool uploaded; - std::string partfile; - std::string etag; - FILE* fppart; + bool uploaded; // does finish uploading + std::string etag; // expected etag value + int fd; // base file(temporary full file) discriptor + off_t startpos; // seek fd point for uploading + ssize_t size; // uploading size etaglist_t* etaglist; // use only parallel upload int etagpos; // use only parallel upload - filepart() : uploaded(false), fppart(NULL), etaglist(NULL), etagpos(-1) {} + filepart() : uploaded(false), fd(-1), startpos(0), size(-1), etaglist(NULL), etagpos(-1) {} ~filepart() { clear(); @@ -59,16 +60,11 @@ struct filepart void clear(bool isfree = true) { - if(isfree && fppart){ - fclose(fppart); - } - if(isfree && 0 < partfile.size()){ - remove(partfile.c_str()); - } uploaded = false; - partfile = ""; etag = ""; - fppart = NULL; + fd = -1; + startpos = 0; + size = -1; etaglist = NULL; etagpos = - 1; } @@ -165,6 +161,7 @@ class S3fsCurl static size_t HeaderCallback(void *data, size_t blockSize, size_t numBlocks, void *userPtr); static size_t WriteMemoryCallback(void *ptr, size_t blockSize, size_t numBlocks, void *data); static size_t ReadCallback(void *ptr, size_t size, size_t nmemb, void *userp); + static size_t UploadReadCallback(void *ptr, size_t size, size_t nmemb, void *userp); static bool UploadMultipartPostCallback(S3fsCurl* s3fscurl); static S3fsCurl* UploadMultipartPostRetryCallback(S3fsCurl* s3fscurl); @@ -288,8 +285,8 @@ class S3fsMultiCurl // Utility Functions //---------------------------------------------- std::string GetContentMD5(int fd); -unsigned char* md5hexsum(int fd); -std::string md5sum(int fd); +unsigned char* md5hexsum(int fd, off_t start = 0, ssize_t size = -1); +std::string md5sum(int fd, off_t start = 0, ssize_t size = -1); struct curl_slist* curl_slist_sort_insert(struct curl_slist* list, const char* data); bool MakeUrlResource(const char* realpath, std::string& resourcepath, std::string& url); diff --git a/src/s3fs.cpp b/src/s3fs.cpp index fdcc0d9..2f1b5a4 100644 --- a/src/s3fs.cpp +++ b/src/s3fs.cpp @@ -63,6 +63,9 @@ using namespace std; #define IS_REPLACEDIR(type) (DIRTYPE_OLD == type || DIRTYPE_FOLDER == type || DIRTYPE_NOOBJ == type) #define IS_RMTYPEDIR(type) (DIRTYPE_OLD == type || DIRTYPE_FOLDER == type) +#define MAX_OBJECT_SIZE 68719476735LL // 64GB - 1L +#define MULTIPART_LOWLIMIT (20 * 1024 * 1024) // 20MB + //------------------------------------------------------------------- // Global valiables //------------------------------------------------------------------- @@ -810,7 +813,7 @@ static int put_local_fd(const char* path, headers_t meta, int fd, bool ow_sse_fl * * If file is > 20MB, then multipart will kick in */ - if(st.st_size > 68719476735LL){ // 64GB - 1 + if(st.st_size > MAX_OBJECT_SIZE){ // 64GB - 1 // close f ? return -ENOTSUP; } @@ -822,7 +825,7 @@ static int put_local_fd(const char* path, headers_t meta, int fd, bool ow_sse_fl return -errno; } - if(st.st_size >= 20971520 && !nomultipart){ // 20MB + if(st.st_size >= MULTIPART_LOWLIMIT && !nomultipart){ // 20MB // Additional time is needed for large files time_t backup = 0; if(120 > S3fsCurl::GetReadwriteTimeout()){ diff --git a/src/s3fs_util.cpp b/src/s3fs_util.cpp index bede586..3eef28b 100644 --- a/src/s3fs_util.cpp +++ b/src/s3fs_util.cpp @@ -672,67 +672,6 @@ bool is_need_check_obj_detail(headers_t& meta) return true; } -//------------------------------------------------------------------- -// Utility functions for convert -//------------------------------------------------------------------- -// Copy file(file pointer) to tmpfile -int copy_chunk_tempfile(FILE* file, unsigned char* buf, off_t chunk, string& tmpname) -{ - char tmpfile[256]; - off_t copy_total; - off_t copied; - int fd; - FILE* fptmp; - - if(!file || !buf || 0 == chunk){ - return 0; - } - - // copy the file portion into the buffer - for(copy_total = 0; copy_total < chunk; copy_total += copied){ - copied = fread(&buf[copy_total], sizeof(unsigned char), (chunk - copy_total), file); - if(copied != (chunk - copy_total)){ - if(0 != ferror(file) || feof(file)){ - FGPRINT("copy_part_tempfile: read file error(%d)\n", ferror(file)); - SYSLOGERR("read file error(%d)", ferror(file)); - return -(ferror(file)); - } - } - } - - // create uniq temporary file - strncpy(tmpfile, "/tmp/s3fs.XXXXXX", sizeof(tmpfile)); - if(-1 == (fd = mkstemp(tmpfile))){ - FGPRINT("copy_part_tempfile: Could not open tempolary file(%s) - errno(%d)\n", tmpfile, errno); - SYSLOGERR("Could not open tempolary file(%s) - errno(%d)", tmpfile, errno); - return -errno; - } - if(NULL == (fptmp = fdopen(fd, "wb"))){ - FGPRINT("copy_part_tempfile: Could not open tempolary file(%s) - errno(%d)\n", tmpfile, errno); - SYSLOGERR("Could not open tempolary file(%s) - errno(%d)", tmpfile, errno); - close(fd); - return -errno; - } - - // copy buffer to temporary file - for(copy_total = 0; copy_total < chunk; copy_total += copied){ - copied = fwrite(&buf[copy_total], sizeof(unsigned char), (chunk - copy_total), fptmp); - if(copied != (chunk - copy_total)){ - if(0 != ferror(fptmp)){ - FGPRINT("copy_part_tempfile: write file error(%d)\n", ferror(fptmp)); - SYSLOGERR("write file error(%d)", ferror(fptmp)); - fclose(fptmp); - remove(tmpfile); - return -(ferror(fptmp)); - } - } - } - fclose(fptmp); - tmpname = tmpfile; - - return 0; -} - //------------------------------------------------------------------- // Help //------------------------------------------------------------------- diff --git a/src/s3fs_util.h b/src/s3fs_util.h index 0741ebd..90fde86 100644 --- a/src/s3fs_util.h +++ b/src/s3fs_util.h @@ -94,7 +94,6 @@ blkcnt_t get_blocks(off_t size); time_t get_lastmodified(const char* s); time_t get_lastmodified(headers_t& meta); bool is_need_check_obj_detail(headers_t& meta); -int copy_chunk_tempfile(FILE* file, unsigned char* buf, off_t chunk, std::string& tmpname); void show_usage(void); void show_help(void);