Code changes

1) Uploading performance (part 2)
   Changed the code for uploading large objects (multipart upload).
   With this revision, s3fs no longer creates a temporary file when it uploads a large object by multipart upload.
   Before this revision, s3fs made a temporary file (/tmp/s3fs.XXXXX) for each part, which was bad for performance.
   The new code does not use those files; instead, s3fs reads each part directly from its cache file (see the sketch after this list).

2) Some values to symbols
   Changed some literal values to symbols (defines).
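
A minimal sketch of the approach in item 1, assuming libcurl: the per-part temporary file is replaced by a CURLOPT_READFUNCTION callback that pread()s the part's byte range straight from the already-open source descriptor. The PartData struct and upload_part() helper below are illustrative stand-ins, not the actual s3fs types (s3fs uses its filepart struct, UploadReadCallback, and UploadMultipartPostSetup).

// Sketch only: stream one byte range of a file as an HTTP PUT body
// with pread(), instead of copying the range to a temporary file first.
#include <curl/curl.h>
#include <unistd.h>
#include <cerrno>
#include <cstdio>

struct PartData {
  int   fd;       // descriptor of the whole source file
  off_t startpos; // offset of this part within the file
  off_t size;     // bytes of this part still to send
};

// CURLOPT_READFUNCTION callback: fill curl's buffer directly from the source fd.
static size_t part_read_cb(char* ptr, size_t size, size_t nmemb, void* userp)
{
  PartData* part = static_cast<PartData*>(userp);
  size_t want = size * nmemb;
  if(0 == want || part->size <= 0){
    return 0;                              // nothing (more) to send
  }
  if(static_cast<off_t>(want) > part->size){
    want = static_cast<size_t>(part->size);
  }
  ssize_t bytes = pread(part->fd, ptr, want, part->startpos);
  if(-1 == bytes){
    fprintf(stderr, "pread error(%d)\n", errno);
    return CURL_READFUNC_ABORT;            // abort the transfer on read error
  }
  part->startpos += bytes;
  part->size     -= bytes;
  return static_cast<size_t>(bytes);
}

// Upload one byte range of fd to url (e.g. one multipart part).
static int upload_part(const char* url, int fd, off_t start, off_t size)
{
  PartData part = {fd, start, size};
  CURL* curl = curl_easy_init();
  if(!curl){
    return -1;
  }
  curl_easy_setopt(curl, CURLOPT_URL, url);
  curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
  curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(size)); // Content-Length
  curl_easy_setopt(curl, CURLOPT_READFUNCTION, part_read_cb);
  curl_easy_setopt(curl, CURLOPT_READDATA, &part);
  CURLcode res = curl_easy_perform(curl);
  curl_easy_cleanup(curl);
  return (CURLE_OK == res) ? 0 : -1;
}

The point of the design is that curl pulls data on demand through the callback, so a part is never copied to disk a second time; each part is described only by an (fd, startpos, size) triple, which is also what the retry callback duplicates instead of a temporary file name.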



git-svn-id: http://s3fs.googlecode.com/svn/trunk@457 df820570-a93a-0410-bd06-b72b767a4274
ggtakec@gmail.com 2013-07-12 00:33:36 +00:00
parent 1095b7bc52
commit 1c93dd30c1
5 changed files with 95 additions and 137 deletions

View File

@ -521,6 +521,39 @@ size_t S3fsCurl::HeaderCallback(void *data, size_t blockSize, size_t numBlocks,
return blockSize * numBlocks;
}
size_t S3fsCurl::UploadReadCallback(void *ptr, size_t size, size_t nmemb, void *userp)
{
S3fsCurl* pCurl = reinterpret_cast<S3fsCurl*>(userp);
if(1 > (size * nmemb)){
return 0;
}
if(-1 == pCurl->partdata.fd || 0 >= pCurl->partdata.size){
return 0;
}
// read size
ssize_t copysize = (size * nmemb) < (size_t)pCurl->partdata.size ? (size * nmemb) : (size_t)pCurl->partdata.size;
ssize_t readbytes;
ssize_t totalread;
// read and set
for(totalread = 0, readbytes = 0; totalread < copysize; totalread += readbytes){
readbytes = pread(pCurl->partdata.fd, &((char*)ptr)[totalread], (copysize - totalread), pCurl->partdata.startpos + totalread);
if(0 == readbytes){
// eof
break;
}else if(-1 == readbytes){
// error
FGPRINT("S3fsCurl::UploadReadCallback: read file error(%d).\n", errno);
SYSLOGERR("read file error(%d).", errno);
return 0;
}
}
pCurl->partdata.startpos += totalread;
pCurl->partdata.size -= totalread;
return totalread;
}
bool S3fsCurl::SetDnsCache(bool isCache)
{
bool old = S3fsCurl::is_dns_cache;
@ -646,20 +679,19 @@ S3fsCurl* S3fsCurl::UploadMultipartPostRetryCallback(S3fsCurl* s3fscurl)
// duplicate request
S3fsCurl* newcurl = new S3fsCurl();
newcurl->partdata.partfile = s3fscurl->partdata.partfile;
newcurl->partdata.etaglist = s3fscurl->partdata.etaglist;
newcurl->partdata.etagpos = s3fscurl->partdata.etagpos;
newcurl->partdata.fd = s3fscurl->partdata.fd;
newcurl->partdata.startpos = s3fscurl->partdata.startpos;
newcurl->partdata.size = s3fscurl->partdata.size;
// setup new curl object
if(!newcurl->UploadMultipartPostSetup(s3fscurl->path.c_str(), part_num, upload_id)){
FGPRINT(" S3fsCurl::UploadMultipartPostRetryCallback : Could not duplicate curl object(%s:%d).\n", s3fscurl->path.c_str(), part_num);
SYSLOGERR("Could not duplicate curl object(%s:%d).", s3fscurl->path.c_str(), part_num);
newcurl->partdata.partfile = ""; // for do not removing tmp file.
delete newcurl;
return NULL;
}
s3fscurl->partdata.partfile = ""; // for do not removing tmp file.
return newcurl;
}
@ -725,19 +757,12 @@ int S3fsCurl::ParallelMultipartUploadRequest(const char* tpath, headers_t& meta,
chunk = remaining_bytes > MULTIPART_SIZE ? MULTIPART_SIZE : remaining_bytes;
// s3fscurl sub object
S3fsCurl* s3fscurl_para = new S3fsCurl();
s3fscurl_para->partdata.fd = fd2;
s3fscurl_para->partdata.startpos = st.st_size - remaining_bytes;
s3fscurl_para->partdata.size = chunk;
s3fscurl_para->partdata.add_etag_list(&list);
// make temp file
if(0 != (result = copy_chunk_tempfile(file, buf, chunk, s3fscurl_para->partdata.partfile))){
FGPRINT("S3fsCurl::ParallelMultipartUploadRequest: failed to make temp file(%d)\n", ferror(file));
SYSLOGERR("failed to make temp file(%d)", ferror(file));
free(buf);
fclose(file);
delete s3fscurl_para;
return result;
}
// initiate upload part for parallel
if(0 != (result = s3fscurl_para->UploadMultipartPostSetup(tpath, list.size(), upload_id))){
FGPRINT("S3fsCurl::ParallelMultipartUploadRequest: failed uploading part setup(%d)\n", result);
@ -1907,44 +1932,30 @@ int S3fsCurl::MultipartListRequest(string& body)
// Content-MD5: pUNXr/BjKK5G2UKvaRRrOA==
// Authorization: AWS VGhpcyBtZXNzYWdlIHNpZ25lZGGieSRlbHZpbmc=
//
int S3fsCurl::UploadMultipartPostSetup(const char* tpath, int part_num, string& upload_id)
{
int para_fd;
struct stat st;
FGPRINT(" S3fsCurl::UploadMultipartPostSetup[tpath=%s][start=%zd][size=%zd][part=%d]\n",
SAFESTRPTR(tpath), partdata.startpos, partdata.size, part_num);
FGPRINT(" S3fsCurl::UploadMultipartPostSetup[tpath=%s][fpath=%s][part=%d]\n", SAFESTRPTR(tpath), partdata.partfile.c_str(), part_num);
if(-1 == partdata.fd || -1 == partdata.startpos || -1 == partdata.size){
return -1;
}
// make md5 and file pointer
if(-1 == (para_fd = open(partdata.partfile.c_str(), O_RDONLY))){
FGPRINT("S3fsCurl::UploadMultipartPostSetup: Could not open file(%s) - errorno(%d)\n", partdata.partfile.c_str(), errno);
SYSLOGERR("Could not open file(%s) - errno(%d)", partdata.partfile.c_str(), errno);
return -errno;
}
if(-1 == fstat(para_fd, &st)){
FGPRINT("S3fsCurl::UploadMultipartPostSetup: Invalid file(%s) discriptor(errno=%d)\n", partdata.partfile.c_str(), errno);
SYSLOGERR("Invalid file(%s) discriptor(errno=%d)", partdata.partfile.c_str(), errno);
close(para_fd);
return -errno;
}
partdata.etag = md5sum(para_fd);
partdata.etag = md5sum(partdata.fd, partdata.startpos, partdata.size);
if(partdata.etag.empty()){
FGPRINT("S3fsCurl::UploadMultipartPostSetup: Could not make md5 for file(%s)\n", partdata.partfile.c_str());
SYSLOGERR("Could not make md5 for file(%s)", partdata.partfile.c_str());
close(para_fd);
FGPRINT("S3fsCurl::UploadMultipartPostSetup: Could not make md5 for file(part %d)\n", part_num);
SYSLOGERR("Could not make md5 for file(part %d)", part_num);
return -1;
}
if(NULL == (partdata.fppart = fdopen(para_fd, "rb"))){
FGPRINT("S3fsCurl::UploadMultipartPostSetup: Invalid file(%s) discriptor(errno=%d)\n", partdata.partfile.c_str(), errno);
SYSLOGERR("Invalid file(%s) discriptor(errno=%d)", partdata.partfile.c_str(), errno);
close(para_fd);
return -errno;
}
// create handle
if(!CreateCurlHandle(true)){
fclose(partdata.fppart);
partdata.fppart = NULL;
return -1;
}
// make request
string urlargs = "?partNumber=" + IntToStr(part_num) + "&uploadId=" + upload_id;
string resource;
string turl;
@ -1977,8 +1988,9 @@ int S3fsCurl::UploadMultipartPostSetup(const char* tpath, int part_num, string&
curl_easy_setopt(hCurl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
curl_easy_setopt(hCurl, CURLOPT_HEADERDATA, (void*)headdata);
curl_easy_setopt(hCurl, CURLOPT_HEADERFUNCTION, WriteMemoryCallback);
curl_easy_setopt(hCurl, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(st.st_size)); // Content-Length
curl_easy_setopt(hCurl, CURLOPT_INFILE, partdata.fppart);
curl_easy_setopt(hCurl, CURLOPT_INFILESIZE_LARGE, partdata.size); // Content-Length
curl_easy_setopt(hCurl, CURLOPT_READFUNCTION, S3fsCurl::UploadReadCallback);
curl_easy_setopt(hCurl, CURLOPT_READDATA, (void*)this);
curl_easy_setopt(hCurl, CURLOPT_HTTPHEADER, requestHeaders);
return 0;
@ -1988,7 +2000,8 @@ int S3fsCurl::UploadMultipartPostRequest(const char* tpath, int part_num, string
{
int result;
FGPRINT(" S3fsCurl::UploadMultipartPostRequest[tpath=%s][fpath=%s][part=%d]\n", SAFESTRPTR(tpath), partdata.partfile.c_str(), part_num);
FGPRINT(" S3fsCurl::UploadMultipartPostRequest[tpath=%s][start=%zd][size=%zd][part=%d]\n",
SAFESTRPTR(tpath), partdata.startpos, partdata.size, part_num);
// setup
if(0 != (result = S3fsCurl::UploadMultipartPostSetup(tpath, part_num, upload_id))){
@ -2190,14 +2203,10 @@ int S3fsCurl::MultipartUploadRequest(const char* tpath, headers_t& meta, int fd,
// chunk size
chunk = remaining_bytes > MULTIPART_SIZE ? MULTIPART_SIZE : remaining_bytes;
// make temp file
if(0 != (result = copy_chunk_tempfile(file, buf, chunk, partdata.partfile))){
FGPRINT("S3fsCurl::MultipartUploadRequest: failed to make temp file(%d)\n", ferror(file));
SYSLOGERR("failed to make temp file(%d)", ferror(file));
free(buf);
fclose(file);
return result;
}
// set
partdata.fd = fd2;
partdata.startpos = st.st_size - remaining_bytes;
partdata.size = chunk;
// upload part
if(0 != (result = UploadMultipartPostRequest(tpath, (list.size() + 1), upload_id))){
@ -2550,7 +2559,7 @@ string GetContentMD5(int fd)
return Signature;
}
unsigned char* md5hexsum(int fd)
unsigned char* md5hexsum(int fd, off_t start, off_t size)
{
MD5_CTX c;
char buf[512];
@ -2558,19 +2567,30 @@ unsigned char* md5hexsum(int fd)
unsigned char* result = (unsigned char*)malloc(MD5_DIGEST_LENGTH);
// seek to top of file.
if(-1 == lseek(fd, 0, SEEK_SET)){
if(-1 == lseek(fd, start, SEEK_SET)){
return NULL;
}
memset(buf, 0, 512);
MD5_Init(&c);
while((bytes = read(fd, buf, 512)) > 0) {
for(ssize_t total = 0; total < size; total += bytes){
bytes = 512 < (size - total) ? 512 : (size - total);
bytes = read(fd, buf, bytes);
if(0 == bytes){
// end of file
break;
}else if(-1 == bytes){
// error
FGPRINT("md5hexsum: : file read error(%d)\n", errno);
free(result);
return NULL;
}
MD5_Update(&c, buf, bytes);
memset(buf, 0, 512);
}
MD5_Final(result, &c);
if(-1 == lseek(fd, 0, SEEK_SET)){
if(-1 == lseek(fd, start, SEEK_SET)){
free(result);
return NULL;
}
@ -2578,13 +2598,13 @@ unsigned char* md5hexsum(int fd)
return result;
}
string md5sum(int fd)
string md5sum(int fd, off_t start, off_t size)
{
char md5[2 * MD5_DIGEST_LENGTH + 1];
char hexbuf[3];
unsigned char* md5hex;
if(NULL == (md5hex = md5hexsum(fd))){
if(NULL == (md5hex = md5hexsum(fd, start, size))){
return string("");
}

View File

@ -44,14 +44,15 @@ typedef std::vector<std::string> etaglist_t;
// Each part information for Multipart upload
struct filepart
{
bool uploaded;
std::string partfile;
std::string etag;
FILE* fppart;
bool uploaded; // does finish uploading
std::string etag; // expected etag value
int fd; // base file(temporary full file) discriptor
off_t startpos; // seek fd point for uploading
ssize_t size; // uploading size
etaglist_t* etaglist; // use only parallel upload
int etagpos; // use only parallel upload
filepart() : uploaded(false), fppart(NULL), etaglist(NULL), etagpos(-1) {}
filepart() : uploaded(false), fd(-1), startpos(0), size(-1), etaglist(NULL), etagpos(-1) {}
~filepart()
{
clear();
@ -59,16 +60,11 @@ struct filepart
void clear(bool isfree = true)
{
if(isfree && fppart){
fclose(fppart);
}
if(isfree && 0 < partfile.size()){
remove(partfile.c_str());
}
uploaded = false;
partfile = "";
etag = "";
fppart = NULL;
fd = -1;
startpos = 0;
size = -1;
etaglist = NULL;
etagpos = - 1;
}
@ -165,6 +161,7 @@ class S3fsCurl
static size_t HeaderCallback(void *data, size_t blockSize, size_t numBlocks, void *userPtr);
static size_t WriteMemoryCallback(void *ptr, size_t blockSize, size_t numBlocks, void *data);
static size_t ReadCallback(void *ptr, size_t size, size_t nmemb, void *userp);
static size_t UploadReadCallback(void *ptr, size_t size, size_t nmemb, void *userp);
static bool UploadMultipartPostCallback(S3fsCurl* s3fscurl);
static S3fsCurl* UploadMultipartPostRetryCallback(S3fsCurl* s3fscurl);
@ -288,8 +285,8 @@ class S3fsMultiCurl
// Utility Functions
//----------------------------------------------
std::string GetContentMD5(int fd);
unsigned char* md5hexsum(int fd);
std::string md5sum(int fd);
unsigned char* md5hexsum(int fd, off_t start = 0, ssize_t size = -1);
std::string md5sum(int fd, off_t start = 0, ssize_t size = -1);
struct curl_slist* curl_slist_sort_insert(struct curl_slist* list, const char* data);
bool MakeUrlResource(const char* realpath, std::string& resourcepath, std::string& url);

View File

@ -63,6 +63,9 @@ using namespace std;
#define IS_REPLACEDIR(type) (DIRTYPE_OLD == type || DIRTYPE_FOLDER == type || DIRTYPE_NOOBJ == type)
#define IS_RMTYPEDIR(type) (DIRTYPE_OLD == type || DIRTYPE_FOLDER == type)
#define MAX_OBJECT_SIZE 68719476735LL // 64GB - 1L
#define MULTIPART_LOWLIMIT (20 * 1024 * 1024) // 20MB
//-------------------------------------------------------------------
// Global valiables
//-------------------------------------------------------------------
@ -810,7 +813,7 @@ static int put_local_fd(const char* path, headers_t meta, int fd, bool ow_sse_fl
*
* If file is > 20MB, then multipart will kick in
*/
if(st.st_size > 68719476735LL){ // 64GB - 1
if(st.st_size > MAX_OBJECT_SIZE){ // 64GB - 1
// close f ?
return -ENOTSUP;
}
@ -822,7 +825,7 @@ static int put_local_fd(const char* path, headers_t meta, int fd, bool ow_sse_fl
return -errno;
}
if(st.st_size >= 20971520 && !nomultipart){ // 20MB
if(st.st_size >= MULTIPART_LOWLIMIT && !nomultipart){ // 20MB
// Additional time is needed for large files
time_t backup = 0;
if(120 > S3fsCurl::GetReadwriteTimeout()){

View File

@ -672,67 +672,6 @@ bool is_need_check_obj_detail(headers_t& meta)
return true;
}
//-------------------------------------------------------------------
// Utility functions for convert
//-------------------------------------------------------------------
// Copy file(file pointer) to tmpfile
int copy_chunk_tempfile(FILE* file, unsigned char* buf, off_t chunk, string& tmpname)
{
char tmpfile[256];
off_t copy_total;
off_t copied;
int fd;
FILE* fptmp;
if(!file || !buf || 0 == chunk){
return 0;
}
// copy the file portion into the buffer
for(copy_total = 0; copy_total < chunk; copy_total += copied){
copied = fread(&buf[copy_total], sizeof(unsigned char), (chunk - copy_total), file);
if(copied != (chunk - copy_total)){
if(0 != ferror(file) || feof(file)){
FGPRINT("copy_part_tempfile: read file error(%d)\n", ferror(file));
SYSLOGERR("read file error(%d)", ferror(file));
return -(ferror(file));
}
}
}
// create uniq temporary file
strncpy(tmpfile, "/tmp/s3fs.XXXXXX", sizeof(tmpfile));
if(-1 == (fd = mkstemp(tmpfile))){
FGPRINT("copy_part_tempfile: Could not open tempolary file(%s) - errno(%d)\n", tmpfile, errno);
SYSLOGERR("Could not open tempolary file(%s) - errno(%d)", tmpfile, errno);
return -errno;
}
if(NULL == (fptmp = fdopen(fd, "wb"))){
FGPRINT("copy_part_tempfile: Could not open tempolary file(%s) - errno(%d)\n", tmpfile, errno);
SYSLOGERR("Could not open tempolary file(%s) - errno(%d)", tmpfile, errno);
close(fd);
return -errno;
}
// copy buffer to temporary file
for(copy_total = 0; copy_total < chunk; copy_total += copied){
copied = fwrite(&buf[copy_total], sizeof(unsigned char), (chunk - copy_total), fptmp);
if(copied != (chunk - copy_total)){
if(0 != ferror(fptmp)){
FGPRINT("copy_part_tempfile: write file error(%d)\n", ferror(fptmp));
SYSLOGERR("write file error(%d)", ferror(fptmp));
fclose(fptmp);
remove(tmpfile);
return -(ferror(fptmp));
}
}
}
fclose(fptmp);
tmpname = tmpfile;
return 0;
}
//-------------------------------------------------------------------
// Help
//-------------------------------------------------------------------

View File

@ -94,7 +94,6 @@ blkcnt_t get_blocks(off_t size);
time_t get_lastmodified(const char* s);
time_t get_lastmodified(headers_t& meta);
bool is_need_check_obj_detail(headers_t& meta);
int copy_chunk_tempfile(FILE* file, unsigned char* buf, off_t chunk, std::string& tmpname);
void show_usage(void);
void show_help(void);