This commit is contained in:
Axel Kittenberger 2010-11-12 15:39:43 +00:00
parent a2554168e6
commit 14669a70dc
3 changed files with 185 additions and 46 deletions

View File

@ -26,6 +26,10 @@ slowbash = {
event.source, event.target) event.source, event.target)
end, end,
onModify = function(event)
spawn(event, "/home/axel/lsyncd2/in", "<", "tuhutu\n", "2")
end,
onCreate = function(event) onCreate = function(event)
local s = event.sourcePathname local s = event.sourcePathname
local t = event.targetPathname local t = event.targetPathname
@ -33,12 +37,12 @@ slowbash = {
spawnShell(event, prefix..[[cp -r "$1" "$2"]], s, t) spawnShell(event, prefix..[[cp -r "$1" "$2"]], s, t)
end, end,
onModify = function(event) -- onModify = function(event)
local s = event.sourcePathname -- local s = event.sourcePathname
local t = event.targetPathname -- local t = event.targetPathname
log("Normal", "Spawning Modify ",s," -> ",t) -- log("Normal", "Spawning Modify ",s," -> ",t)
spawnShell(event, prefix..[[cp -r "$1" "$2"]], s, t) -- spawnShell(event, prefix..[[cp -r "$1" "$2"]], s, t)
end, -- end,
onDelete = function(event) onDelete = function(event)
local t = event.targetPathname local t = event.targetPathname

211
lsyncd.c
View File

@ -393,6 +393,79 @@ s_strdup(const char *src)
return s; return s;
} }
/**
* Sets the close-on-exit flag for an fd
*/
static void
close_exec_fd(int fd)
{
int flags;
flags = fcntl(fd, F_GETFD);
if (flags == -1) {
logstring("Error", "cannot get descriptor flags!");
exit(-1); // ERRNO
}
flags |= FD_CLOEXEC;
if (fcntl(fd, F_SETFD, flags) == -1) {
logstring("Error", "cannot set descripptor flags!");
exit(-1); // ERRNO
}
}
/**
* Sets the non-blocking flag for an fd
*/
static void
non_block_fd(int fd)
{
int flags;
flags = fcntl(fd, F_GETFL);
if (flags == -1) {
logstring("Error", "cannot get status flags!");
exit(-1); // ERRNO
}
flags |= O_NONBLOCK;;
if (fcntl(fd, F_SETFL, flags) == -1) {
logstring("Error", "cannot set status flags!");
exit(-1); // ERRNO
}
}
/**
* A child process gets text piped longer than on
* write() can manage.
*/
struct pipemsg {
/* pipe file descriptor */
int fd;
/* message to send */
char *text;
/* length of text */
int tlen;
/* position in message */
int pos;
};
/**
* All pipes currently active.
*/
static struct pipemsg *pipes = NULL;
/**
* amount of pipes allocated.
*/
size_t pipes_size = 0;
/**
* number of pipes used.
*/
size_t pipes_len = 0;
/***************************************************************************** /*****************************************************************************
* Library calls for lsyncd.lua * Library calls for lsyncd.lua
* *
@ -606,37 +679,31 @@ l_exec(lua_State *L)
if (argc >= 2 && !strcmp(luaL_checkstring(L, 2), "<")) { if (argc >= 2 && !strcmp(luaL_checkstring(L, 2), "<")) {
/* pipes something into stdin */ /* pipes something into stdin */
int flags;
pipe_text = luaL_checkstring(L, 3); pipe_text = luaL_checkstring(L, 3);
printf("pipe_text = %s\n", pipe_text);
/* creates the pipe */ /* creates the pipe */
if (pipe(pipefd) == -1) { if (pipe(pipefd) == -1) {
logstring("Error", "cannot create a pipe!"); logstring("Error", "cannot create a pipe!");
terminate(-1); // ERRNO exit(-1); // ERRNO
}
/* sets the write end of the pipe to close on exec */
if (flags = fcntl(fd, F_GETFD) == -1) {
logstring("Error", "cannot get pipe flage!");
terminate(-1); // ERRNO
}
flags |= FD_CLOEXEC;
if (fcntl(fd, F_SETFD, flags) == -1) {
logstring("Error", "cannot set pipe flage!");
terminate(-1); // ERRNO
} }
/* always close the write end for child processes */
close_exec_fd(pipefd[1]);
/* set the write end on non-blocking */
non_block_fd(pipefd[1]);
argc -= 2; argc -= 2;
li += 2; li += 2;
} }
argv = s_calloc(argc + 2, sizeof(char *)); {
argv[0] = binary; /* prepares the arguments */
for(i = 1; i <= argc; i++) { int i;
argv[i] = luaL_checkstring(L, i + li); argv = s_calloc(argc + 2, sizeof(char *));
argv[0] = binary;
for(i = 1; i <= argc; i++) {
argv[i] = luaL_checkstring(L, i + li);
}
argv[i] = NULL;
} }
argv[i] = NULL;
pid = fork(); pid = fork();
if (pid == 0) { if (pid == 0) {
@ -665,9 +732,33 @@ printf("pipe_text = %s\n", pipe_text);
} }
if (pipe_text) { if (pipe_text) {
int tlen = strlen(pipe_text);
int len;
/* first closes read-end of pipe, this is for child process only */ /* first closes read-end of pipe, this is for child process only */
close(pipefd[0]); close(pipefd[0]);
/* start filling the pipe */
len = write(pipefd[1], pipe_text, tlen);
if (len < 0) {
logstring("Normal", "broken pipe.");
close(pipefd[0]);
}
if (len == tlen) {
/* usual and best case, the pipe accepted all input -> close */
close(pipefd[1]);
logstring("Exec", "one-sweeped pipe");
} else {
logstring("Exec", "adding delayed pipe");
int p = pipes_len;
pipes_len++;
if (pipes_len > pipes_size) {
pipes_size = pipes_len;
pipes = s_realloc(pipes, pipes_size*sizeof(struct pipemsg));
}
pipes[p].fd = pipefd[1];
pipes[p].tlen = tlen;
pipes[p].text = s_strdup(pipe_text);
}
close(pipefd[0]);
} }
free(argv); free(argv);
lua_pushnumber(L, pid); lua_pushnumber(L, pid);
@ -1075,7 +1166,7 @@ masterloop(lua_State *L)
bool have_alarm; bool have_alarm;
clock_t now = times(NULL); clock_t now = times(NULL);
clock_t alarm_time; clock_t alarm_time;
int do_read; bool do_read = false;
ssize_t len; ssize_t len;
/* queries runner about soonest alarm */ /* queries runner about soonest alarm */
@ -1096,16 +1187,12 @@ masterloop(lua_State *L)
/* there is a delay that wants to be handled already thus do not /* there is a delay that wants to be handled already thus do not
* read from inotify_fd and jump directly to its handling */ * read from inotify_fd and jump directly to its handling */
logstring("Masterloop", "immediately handling delays."); logstring("Masterloop", "immediately handling delays.");
do_read = 0;
} else { } else {
/* use select() to determine what happens next /* use select() to determine what happens next
* + a new event on inotify * + a new event on inotify
* + an alarm on timeout * + an alarm on timeout
* + the return of a child process */ * + the return of a child process */
sigset_t sigset;
fd_set readfds;
struct timespec tv; struct timespec tv;
sigemptyset(&sigset);
if (have_alarm) { if (have_alarm) {
double d = ((double)(alarm_time - now)) / clocks_per_sec; double d = ((double)(alarm_time - now)) / clocks_per_sec;
@ -1118,18 +1205,39 @@ masterloop(lua_State *L)
} }
/* if select returns a positive number there is data on inotify /* if select returns a positive number there is data on inotify
* on zero the timemout occured. */ * on zero the timemout occured. */
FD_ZERO(&readfds); {
FD_SET(inotify_fd, &readfds); fd_set rfds;
do_read = pselect(inotify_fd + 1, &readfds, NULL, NULL, fd_set wfds;
have_alarm ? &tv : NULL, &sigset); sigset_t sigset;
sigemptyset(&sigset);
int nfds = inotify_fd;
int pi;
logstring("Masterloop", do_read > 0 ? FD_ZERO(&rfds);
"theres data on inotify." : FD_ZERO(&wfds);
"core: select() timeout or signal."); FD_SET(inotify_fd, &rfds);
for(pi = 0; pi < pipes_len; pi++) {
int pfd = pipes[pi].fd;
nfds = pfd > nfds ? pfd : nfds;
FD_SET(pfd, &wfds);
}
/* reuse pi for result */
pi = pselect(nfds + 1, &rfds, NULL, NULL,
have_alarm ? &tv : NULL, &sigset);
if (pi >= 0) {
do_read = FD_ISSET(inotify_fd, &rfds);
}
if (do_read) {
logstring("Masterloop", do_read > 0 ?
"theres data on inotify." :
"core: select() timeout or signal.");
}
}
} }
/* reads possible events from inotify stream */ /* reads possible events from inotify stream */
while(do_read > 0) { while(do_read) {
int i = 0; int i = 0;
do { do {
len = read (inotify_fd, readbuf, readbuf_size); len = read (inotify_fd, readbuf, readbuf_size);
@ -1158,8 +1266,8 @@ masterloop(lua_State *L)
FD_ZERO(&readfds); FD_ZERO(&readfds);
FD_SET(inotify_fd, &readfds); FD_SET(inotify_fd, &readfds);
do_read = pselect(inotify_fd + 1, &readfds, do_read = pselect(inotify_fd + 1, &readfds,
NULL, NULL, &tv, NULL); NULL, NULL, &tv, NULL) > 0;
if (do_read > 0) { if (do_read) {
logstring("Masterloop", "there is more data on inotify."); logstring("Masterloop", "there is more data on inotify.");
} }
} }
@ -1170,6 +1278,33 @@ masterloop(lua_State *L)
handle_event(L, NULL); handle_event(L, NULL);
} }
{
/* writes into pipes if any */
int pi;
for(pi = 0; pi < pipes_len; pi++) {
struct pipemsg *pm = pipes + pi;
int len = write(pm->fd, pm->text + pm->pos, pm->tlen - pm->pos);
bool do_close = false;
pm->pos += len;
if (len < 0) {
logstring("Normal", "broken pipe.");
do_close = true;
} else if (pm->pos >= pm->tlen) {
logstring("Debug", "finished pipe.");
do_close = true;
}
if (do_close) {
close(pm->fd);
free(pm->text);
pipes_len--;
memmove(pipes + pi, pipes + pi + 1,
(pipes_len - pi) * sizeof(struct pipemsg));
pi--;
continue;
}
}
}
/* collects zombified child processes */ /* collects zombified child processes */
while(1) { while(1) {
int status; int status;
@ -1429,14 +1564,14 @@ main(int argc, char *argv[])
} }
/* opens inotify */ /* opens inotify */
inotify_fd = inotify_init1(IN_CLOEXEC); inotify_fd = inotify_init();
if (inotify_fd == -1) { if (inotify_fd == -1) {
printlogf(L, "Error", printlogf(L, "Error",
"Cannot create inotify instance! (%d:%s)", "Cannot create inotify instance! (%d:%s)",
errno, strerror(errno)); errno, strerror(errno));
return -1; // ERRNO return -1; // ERRNO
} }
close_exec_fd(inotify_fd);
{ {
/* adds signal handlers * /* adds signal handlers *

View File

@ -1474,8 +1474,8 @@ end
-- @param ... arguments -- @param ... arguments
-- --
function spawn(agent, binary, ...) function spawn(agent, binary, ...)
if agent == nil then if agent == nil or type(agent) ~= "table" then
error("spawning with a nil agent", 2) error("spawning with an invalid agent", 2)
end end
local pid = lsyncd.exec(binary, ...) local pid = lsyncd.exec(binary, ...)
if pid and pid > 0 then if pid and pid > 0 then