Further multipart cleanup/error checking, in preparation for multi-threaded uploads.

"Last-Modified" is now returned with get_headers() data


git-svn-id: http://s3fs.googlecode.com/svn/trunk@320 df820570-a93a-0410-bd06-b72b767a4274
ben.lemasurier@gmail.com 2011-02-17 17:31:43 +00:00
parent cfa0fd2992
commit 1496f6a81e
3 changed files with 52 additions and 97 deletions
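For context, a minimal sketch of how a caller might consume the new "Last-Modified" entry exposed by get_headers() (illustrative only, not part of this commit; it assumes headers_t maps header names to header values, as the rest of s3fs uses it):

#include <curl/curl.h>   // curl_getdate()

// Illustrative helper: fetch an object's headers and convert the new
// "Last-Modified" value into a time_t.
static time_t remote_mtime(const char *path) {
  headers_t meta;
  if(get_headers(path, meta) != 0)
    return (time_t) -1;                                  // lookup failed
  return curl_getdate(meta["Last-Modified"].c_str(), NULL);
}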

View File

@@ -1,7 +1,7 @@
dnl Process this file with autoconf to produce a configure script.
AC_PREREQ(2.59)
AC_INIT(s3fs, 1.45)
AC_INIT(s3fs, 1.46)
AC_CANONICAL_SYSTEM

View File

@@ -63,8 +63,6 @@ class auto_curl_slist {
struct curl_slist* slist;
};
// Memory structure for the alternate
// write memory callback used with curl_easy_perform
struct BodyStruct {
@@ -78,9 +76,6 @@ struct WriteThis {
int sizeleft;
};
typedef struct curlhll {
CURL *handle;
struct curlhll *next;
@@ -309,9 +304,6 @@ void free_mvnodes(MVNODE *head) {
return;
}
/////////////////////////////////////////////////
// Multi CURL stuff
/////////////////////////////////////////////////
@@ -341,7 +333,6 @@ void add_h_element(CURLHLL *head, CURL *handle) {
return;
}
CURLMHLL *create_mh_element(CURLM *handle) {
CURLMHLL *p;
p = (CURLMHLL *) malloc(sizeof(CURLMHLL));
@@ -622,7 +613,6 @@ static int my_curl_easy_perform(CURL* curl, BodyStruct* body = NULL, FILE* f = 0
}
// Service response codes which are >= 400 && < 500
switch(responseCode) {
case 400:
if(debug) syslog(LOG_ERR, "HTTP response code 400 was returned");
@@ -769,7 +759,6 @@ static int my_curl_easy_perform(CURL* curl, BodyStruct* body = NULL, FILE* f = 0
return -EIO;
}
/**
* urlEncode a fuse path,
* taking into special consideration "/",
@@ -1058,6 +1047,8 @@ int get_headers(const char* path, headers_t& meta) {
meta[key] = value;
if (key == "ETag")
meta[key] = value;
if(key == "Last-Modified")
meta[key] = value;
if (key.substr(0, 5) == "x-amz")
meta[key] = value;
}
@@ -1087,9 +1078,8 @@ int get_local_fd(const char* path) {
if (use_cache.size() > 0) {
result = get_headers(path, responseHeaders);
if(result != 0) {
if(result != 0)
return -result;
}
fd = open(cache_path.c_str(), O_RDWR); // ### TODO should really somehow obey flags here
@@ -1358,7 +1348,6 @@ static int put_local_fd_small_file(const char* path, headers_t meta, int fd) {
static int put_local_fd_big_file(const char* path, headers_t meta, int fd) {
struct stat st;
int result;
off_t lSize;
int partfd = -1;
FILE* pSourceFile;
@@ -1367,17 +1356,12 @@ static int put_local_fd_big_file(const char* path, headers_t meta, int fd) {
unsigned long lBufferSize = 0;
size_t bytesRead;
size_t bytesWritten;
unsigned int partNumber;
string uploadId;
vector <string> etags;
char partFileName[17] = "";
vector <file_part> parts;
if(foreground)
cout << " put_local_fd_big_file[path=" << path << "][fd=" << fd << "]" << endl;
// Initialization of variables
etags.clear();
if(fstat(fd, &st) == -1)
YIKES(-errno);
@@ -1399,20 +1383,19 @@ static int put_local_fd_big_file(const char* path, headers_t meta, int fd) {
lSize = st.st_size;
lBufferSize = 0;
partNumber = 0;
// cycle through open fd, pulling off 10MB chunks at a time
while(lSize > 0) {
partNumber++;
if(lSize >= 10485760) // 10MB
lBufferSize = 10485760;
file_part part;
if(lSize >= MULTIPART_SIZE)
lBufferSize = MULTIPART_SIZE;
else
lBufferSize = lSize;
lSize = lSize - lBufferSize;
buffer = (char*) malloc(sizeof(char) * lBufferSize);
if(buffer == NULL) {
if((buffer = (char *) malloc(sizeof(char) * lBufferSize)) == NULL) {
syslog(LOG_CRIT, "Could not allocate memory for buffer\n");
exit(1);
}
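For illustration, each pass of the loop above peels at most MULTIPART_SIZE (10,485,760 bytes) off the remaining size, so a 25 MB source file (26,214,400 bytes) is split into three parts:

  part 1: 10,485,760 bytes
  part 2: 10,485,760 bytes
  part 3:  5,242,880 bytes

leaving parts.size() == 3, with upload_part() invoked (in the later hunks) for part numbers 1 through 3.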
@@ -1429,22 +1412,17 @@ static int put_local_fd_big_file(const char* path, headers_t meta, int fd) {
return(-EIO);
}
// printf("bytesRead: %i\n", bytesRead);
// source file portion is now in memory
// open a temporary file for upload
strncpy(partFileName, "/tmp/s3fs.XXXXXX", sizeof partFileName);
partfd = mkstemp(partFileName);
if(partfd == -1) {
syslog(LOG_ERR, "%d ### Could not create temporary file\n", __LINE__);
// create uniq temporary file
strncpy(part.path, "/tmp/s3fs.XXXXXX", sizeof part.path);
if((partfd = mkstemp(part.path)) == -1) {
if(buffer)
free(buffer);
return(-errno);
YIKES(-errno);
}
pPartFile = fdopen(partfd, "wb");
if(pPartFile == NULL) {
// open a temporary file for upload
if((pPartFile = fdopen(partfd, "wb")) == NULL) {
syslog(LOG_ERR, "%d ### Could not open temporary file: errno %i\n",
__LINE__, errno);
if(buffer)
@@ -1453,13 +1431,8 @@ static int put_local_fd_big_file(const char* path, headers_t meta, int fd) {
return(-errno);
}
// printf ("Tempfile opened: %s\n", partFileName);
// Copy buffer to tmpfile
// printf("bufferSize: %lu\n", lBufferSize);
// copy buffer to temporary file
bytesWritten = fwrite(buffer, 1, (size_t)lBufferSize, pPartFile);
// printf("bytesWritten: %u\n", bytesWritten);
if(bytesWritten != lBufferSize) {
syslog(LOG_ERR, "%d ### bytesWritten:%zu does not match lBufferSize: %lu\n",
__LINE__, bytesWritten, lBufferSize);
@@ -1471,24 +1444,20 @@ static int put_local_fd_big_file(const char* path, headers_t meta, int fd) {
return(-EIO);
}
// printf ("Tempfile written: %s\n", partFileName);
fclose(pPartFile);
if(buffer)
free(buffer);
etags.push_back(upload_part(path, partFileName, partNumber, uploadId));
part.etag = upload_part(path, part.path, parts.size() + 1, uploadId);
// delete temporary part file
result = remove(partFileName);
if(result != 0) {
syslog(LOG_ERR, "Could not remove temporary file: %s errno: %i\n",
partFileName, errno);
if(remove(part.path) != 0)
YIKES(-errno);
return(-errno);
}
parts.push_back(part);
} // while(lSize > 0)
return complete_multipart_upload(path, uploadId, partNumber, etags);
return complete_multipart_upload(path, uploadId, parts);
}
/**
@@ -1677,11 +1646,11 @@ string initiate_multipart_upload(const char *path, off_t size, headers_t meta) {
}
static int complete_multipart_upload(const char *path, string upload_id,
int n_parts, vector <string> etags) {
vector <file_part> parts) {
CURL *curl = NULL;
char *pData;
int result;
int i;
int i, j;
string auth;
string date;
string raw_date;
@@ -1700,24 +1669,23 @@ static int complete_multipart_upload(const char *path, string upload_id,
body.text = (char *)malloc(1);
body.size = 0;
curl = NULL;
etags.clear();
postContent.clear();
postContent.append("<CompleteMultipartUpload>\n");
for(i = 0; i < n_parts; i++) {
for(i = 0, j = parts.size(); i < j; i++) {
postContent.append(" <Part>\n");
postContent.append(" <PartNumber>");
postContent.append(IntToStr(i+1));
postContent.append("</PartNumber>\n");
postContent.append(" <ETag>");
postContent.append(etags[i]);
postContent.append(parts[i].etag.insert(0, "\"").append("\""));
postContent.append("</ETag>\n");
postContent.append(" </Part>\n");
}
postContent.append("</CompleteMultipartUpload>\n");
// error check pData
pData = (char *)malloc( postContent.size() + 1);
if((pData = (char *)malloc(postContent.size() + 1)) == NULL)
YIKES(-errno)
pooh.readptr = pData;
pooh.sizeleft = postContent.size();
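For a two-part upload, the loop above would assemble the standard S3 CompleteMultipartUpload body along these lines (ETag values are placeholders, shown only for illustration):

<CompleteMultipartUpload>
 <Part>
  <PartNumber>1</PartNumber>
  <ETag>"0123456789abcdef0123456789abcdef"</ETag>
 </Part>
 <Part>
  <PartNumber>2</PartNumber>
  <ETag>"fedcba9876543210fedcba9876543210"</ETag>
 </Part>
</CompleteMultipartUpload>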
@@ -1737,6 +1705,7 @@ static int complete_multipart_upload(const char *path, string upload_id,
curl = create_curl_handle();
// curl_easy_setopt(curl, CURLOPT_VERBOSE, true);
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, true);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&body);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
@@ -1884,7 +1853,7 @@ string upload_part(const char *path, const char *source, int part_number, string
// calculate local md5sum, if it matches the header
// ETag value, the upload was successful.
string md5 = md5sum(source).insert(0, "\"").append("\"");
string md5 = md5sum(source);
if(!md5.empty() && strstr(header.text, md5.c_str())) {
ETag.assign(md5);
} else {
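In other words, upload_part() now returns the bare hex digest (stored as part.etag by its caller), and the surrounding quotes are added exactly once, in complete_multipart_upload() via the parts[i].etag.insert(0, "\"").append("\"") call above. The strstr() check still matches because the unquoted digest is a substring of the quoted header value, e.g. (digest shown is the MD5 of an empty file, purely for illustration):

  response header: ETag: "d41d8cd98f00b204e9800998ecf8427e"
  stored etag:            d41d8cd98f00b204e9800998ecf8427e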
@@ -3589,43 +3558,21 @@ static int s3fs_utimens(const char *path, const struct timespec ts[2]) {
///////////////////////////////////////////////////////////
static int list_multipart_uploads(void) {
CURL *curl = NULL;
printf("List Multipart Uploads\n");
// struct stat st;
string resource;
string url;
struct BodyStruct body;
// struct BodyStruct header;
// string ETag;
int result;
// unsigned long lSize;
// FILE* pSourceFile;
// FILE* pPartFile;
// char * buffer;
// unsigned long lBufferSize = 0;
// size_t bytesRead;
// size_t bytesWritten;
// char * partFileName;
// int partNumber;
string date;
// string expires;
string raw_date;
string auth;
string my_url;
// const char * UploadId;
// string uploadId;
struct curl_slist *slist=NULL;
// vector <string> etags;
// int i;
// Initialization of variables
body.text = (char *)malloc(1);
body.size = 0;
curl = NULL;
printf("List Multipart Uploads\n");
//////////////////////////////////////////
// Syntax:
@@ -3648,8 +3595,6 @@ static int list_multipart_uploads(void) {
url = host + resource;
// printf("url: %s\n", url.c_str());
my_url = prepare_url(url.c_str());
// printf("my_url: %s\n", my_url.c_str());

View File

@@ -3,6 +3,8 @@
#define FUSE_USE_VERSION 26
#define MULTIPART_SIZE 10485760 // 10MB
#include <map>
#include <string>
#include <vector>
@@ -73,6 +75,14 @@ typedef map<string, struct stat_cache_entry> stat_cache_t;
static stat_cache_t stat_cache;
static pthread_mutex_t stat_cache_lock;
struct file_part {
char path[17];
string etag;
bool uploaded;
file_part() : uploaded(false) {}
};
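A sizing note, for illustration: path[17] is exactly large enough for the "/tmp/s3fs.XXXXXX" mkstemp() template used in put_local_fd_big_file() (5 + 5 + 6 characters plus the terminating NUL), and uploaded defaults to false, presumably so the forthcoming multi-threaded upload code can mark each part complete independently.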
static const char hexAlphabet[] = "0123456789ABCDEF";
// http headers
@@ -93,7 +103,7 @@ string urlEncode(const string &s);
string lookupMimeType(string);
string initiate_multipart_upload(const char *path, off_t size, headers_t meta);
string upload_part(const char *path, const char *source, int part_number, string upload_id);
static int complete_multipart_upload(const char *path, string upload_id, int n_parts, vector <string> etags);
static int complete_multipart_upload(const char *path, string upload_id, vector <file_part> parts);
string md5sum(const char *path);
static int get_stat_cache_entry(const char *path, struct stat *buf);