working on user access for observe_fd

This commit is contained in:
Axel Kittenberger 2010-12-01 12:19:17 +00:00
parent 84bc9de865
commit ded00765c6
4 changed files with 442 additions and 143 deletions

112
examples/lsayirc.lua Normal file
View File

@ -0,0 +1,112 @@
require("socket")
settings.nodaemon = true
--hostname = "irc.freenode.org"
hostname = "127.0.0.1"
port = 6667
nick = "lbot01"
chan = "##lfile01"
-----
-- this blocks until the connection is established
-- for once this ok since Lsyncd didnt yet start.
local ircSocket, err = socket.connect(hostname, port)
if not ircSocket then
log("Error", "Cannot connect to IRC: ", err)
terminate(-1)
end
-----
-- from now it must not block!
ircSocket:settimeout(0)
------
-- Buffer for stuff to send and receive on IRC:
local ircWBuf = ""
local ircRBuf = ""
local writeIrc
-----
-- Called when the IRC socket can be written
local function ircWritey(fd)
writeIrc()
end
----
-- Called when socket can be read
local function ircReady(socket)
local l, err, ircRBuf = ircSocket:receive("*l", ircRBuf)
log("Normal", "XXX", l)
if not l then
if errr ~= "timeout" then
log("Error", "IRC connection failed: ", err)
terminate(-1)
end
else
ircRBuf = ""
return
end
log("Normal", "ircin :", l)
--- answers ping messages
local ping = l:match("PING :(.*)")
if ping then
writeIRC("PONG :", ping)
end
end
-----
-- Writes on IRC socket
function writeIrc(...)
ircWBuf = ircWBuf..table.concat({...})
local s, err = ircSocket:send(ircWBuf)
if not s then
log("Error", "IRC connection failed: ", err)
terminate(-1)
end
--- log what has been send, but dont log the linefeed.
if (ircWBuf:sub(s, s) == "\n") then
log("Normal", "ircout: ", ircWBuf:sub(1, s - 1))
else
log("Normal", "ircout: ", ircWBuf:sub(1, s), "\\")
end
ircWBuf = ircWBuf:sub(s + 1, -1)
-- when the write buffer is empty unregister from core
-- this script no longer wants to be called when it can write
-- on the socket. If the buffer is filled register at the core.
if ircWBuf == "" then
observefd(ircSocket:getfd(), ircReady, nil)
else
observefd(ircSocket:getfd(), ircReady, ircWritey)
end
end
writeIrc("NICK ", nick, "\n")
writeIrc("USER ", nick, " 0 * :testbot", "\n")
writeIrc("JOIN ", chan, "\n")
----
-- Lets the Lsyncd core watch the IRCs filedescriptor
-- and call ircReady and ircWritey when they are
-- ready for transfer
observefd(ircSocket:getfd(), ircReady, ircWritey)
local function action(inlet)
-- event2 is the target of a move event
local event, event2 = inlet.getEvent()
if not event2 then
writeIrc("PRIVMSG ",chan," :",event.etype," ",
event.path)
else
writeIrc("PRIVMSG ",chan," :",event.etype," ",
event.path," -> ",event2.path)
end
inlet.discardEvent(event)
end
sync{source= "src", action= action, delay= 1 }

View File

@ -337,8 +337,6 @@ inotify_tidy(struct observance *obs) {
readbuf = NULL;
}
/**
* opens and initalizes inotify.
*/
@ -358,6 +356,7 @@ open_inotify(lua_State *L) {
errno, strerror(errno));
exit(-1); // ERRNO
}
printlogf(L, "Inotify", "inotify fd = %d", inotify_fd);
close_exec_fd(inotify_fd);
non_block_fd(inotify_fd);

454
lsyncd.c
View File

@ -422,6 +422,19 @@ pipe_tidy(struct observance *observance)
* helper routines.
****************************************************************************/
/**
* Dummy variable whos address is used as the cores index in the lua registry
* to the lua runners function table in the lua registry.
*/
static int runner;
/**
* Dummy variable whos address is used as the cores index n the lua registry
* to the lua runners error handler.
*/
static int callError;
/**
* Sets the close-on-exit flag for an fd
*/
@ -474,13 +487,206 @@ write_pidfile(lua_State *L, const char *pidfile) {
fclose(f);
}
/*****************************************************************************
* Observances
****************************************************************************/
/**
* List of file descriptor watches.
*/
static struct observance * observances = NULL;
static int observances_len = 0;
static int observances_size = 0;
/**
* List of file descriptors to nonobserve.
* While working for the oberver lists, it may
* not be altered, thus nonobserve stores here the
* actions that will be delayed.
*/
static int *nonobservances = NULL;
static int nonobservances_len = 0;
static int nonobservances_size = 0;
/**
* true while the observances list is being handled.
*/
static bool observance_action = false;
/**
* Core watches a filedescriptor to become ready,
* one of read_ready or write_ready may be zero
*/
extern void
observe_fd(int fd,
void (*ready) (lua_State *, struct observance *),
void (*writey)(lua_State *, struct observance *),
void (*tidy) (struct observance *),
void *extra)
{
int pos;
/* looks if the fd is already there as pos or
* stores the position to insert the new fd in pos */
for(pos = 0; pos < observances_len; pos++) {
if (fd <= observances[pos].fd) {
break;
}
}
if (pos < observances_len && observances[pos].fd == fd) {
/* just update an existing observance */
logstring("Masterloop", "updating n fd observance");
observances[pos].ready = ready;
observances[pos].writey = writey;
observances[pos].tidy = tidy;
observances[pos].extra = extra;
return;
}
printf("new observance %d:%d\n", fd, pos);
if (observance_action) {
// TODO
logstring("Error",
"internal, New observances in ready/writey handlers not yet supported");
exit(-1); // ERRNO
}
if (!tidy) {
logstring("Error",
"internal, tidy() in observe_fd() must not be NULL.");
exit(-1); // ERRNO
}
if (observances_len + 1 > observances_size) {
observances_size = observances_len + 1;
observances = s_realloc(observances,
observances_size * sizeof(struct observance));
}
memmove(observances + pos + 1, observances + pos,
(observances_len - pos) * (sizeof(struct observance)));
observances_len++;
observances[pos].fd = fd;
observances[pos].ready = ready;
observances[pos].writey = writey;
observances[pos].tidy = tidy;
observances[pos].extra = extra;
}
/**
* Makes core no longer watch fd.
*/
extern void
nonobserve_fd(int fd)
{
int pos;
if (observance_action) {
/* this function is called through a ready/writey handler
* while the core works through the observance list, thus
* it does not alter the list, but stores this actions
* on a stack
*/
nonobservances_len++;
if (nonobservances_len > nonobservances_size) {
nonobservances_size = nonobservances_len;
nonobservances = s_realloc(nonobservances,
nonobservances_size * sizeof(int));
}
nonobservances[nonobservances_len - 1] = fd;
return;
}
/* looks for the fd */
for(pos = 0; pos < observances_len; pos++) {
if (observances[pos].fd == fd) {
break;
}
}
if (pos >= observances_len) {
logstring("Error",
"internal fail, not observance file descriptor in nonobserve");
exit(-1); //ERRNO
}
/* and moves the list down */
memmove(observances + pos, observances + pos + 1,
(observances_len - pos) * (sizeof(struct observance)));
observances_len--;
}
/**
* A user observance became read-ready
*/
static void
user_obs_ready(lua_State *L, struct observance *obs)
{
int fd = obs->fd;
/* pushes the ready table on table */
lua_pushlightuserdata(L, (void *) user_obs_ready);
lua_gettable(L, LUA_REGISTRYINDEX);
/* pushes the error handler */
lua_pushlightuserdata(L, (void *) &callError);
lua_gettable(L, LUA_REGISTRYINDEX);
/* pushed the user func */
lua_pushnumber(L, fd);
lua_gettable(L, -3);
/* gives the ufunc the fd */
lua_pushnumber(L, fd);
/* calls the user function */
if (lua_pcall(L, 1, 0, -3)) {
exit(-1); // ERRNO
}
lua_pop(L, 2);
}
/**
* A user observance became write-ready
*/
static void
user_obs_writey(lua_State *L, struct observance *obs)
{
int fd = obs->fd;
/* pushes the writey table on table */
lua_pushlightuserdata(L, (void *) user_obs_writey);
lua_gettable(L, LUA_REGISTRYINDEX);
/* pushes the error handler */
lua_pushlightuserdata(L, (void *) &callError);
lua_gettable(L, LUA_REGISTRYINDEX);
/* pushed the user func */
lua_pushnumber(L, fd);
lua_gettable(L, -3);
/* gives the ufunc the fd */
lua_pushnumber(L, fd);
/* calls the user function */
if (lua_pcall(L, 1, 0, -3)) {
exit(-1); // ERRNO
}
lua_pop(L, 2);
}
/**
* Tidies up a user observance
* TODO - give the user a chance to do something in that case!
*/
static void
user_obs_tidy(struct observance *obs)
{
close(obs->fd);
}
/*****************************************************************************
* Library calls for lsyncd.lua
*
* These are as minimal as possible glues to the operating system needed for
* lsyncd operation.
*
****************************************************************************/
static void daemonize(lua_State *L);
@ -913,15 +1119,109 @@ l_configure(lua_State *L)
return 0;
}
/**
* Allows the user to observe filedescriptors
*
* @param (Lua stack) filedescriptor.
* @param (Lua stack) function to call on ready
* @param (Lua stack) function to call on writey
*/
static int
l_observe_fd(lua_State *L)
{
int fd = luaL_checknumber(L, 1);
bool ready = false;
bool writey = false;
/* Stores the user function in the lua registry.
* It uses the address of the cores ready/write functions
* for the user as key */
if (!lua_isnoneornil(L, 2)) {
lua_pushlightuserdata(L, (void *) user_obs_ready);
lua_gettable(L, LUA_REGISTRYINDEX);
if lua_isnil(L, -1) {
lua_pop(L, 1);
lua_newtable(L);
lua_pushlightuserdata(L, (void *) user_obs_ready);
lua_pushvalue(L, -2);
lua_settable(L, LUA_REGISTRYINDEX);
}
lua_pushnumber(L, fd);
lua_pushvalue(L, 2);
lua_settable(L, -3);
lua_pop(L, 1);
ready = true;
}
if (!lua_isnoneornil(L, 3)) {
lua_pushlightuserdata(L, (void *) user_obs_writey);
lua_gettable(L, LUA_REGISTRYINDEX);
if lua_isnil(L, -1) {
lua_pop(L, 1);
lua_newtable(L);
lua_pushlightuserdata(L, (void *) user_obs_writey);
lua_pushvalue(L, -2);
lua_settable(L, LUA_REGISTRYINDEX);
}
lua_pushnumber(L, fd);
lua_pushvalue(L, 3);
lua_settable(L, -3);
lua_pop(L, 1);
writey = true;
}
/* tells the core to watch the fd */
observe_fd(fd,
ready ? user_obs_ready : NULL,
writey ? user_obs_writey : NULL,
user_obs_tidy,
NULL);
return 0;
}
/**
* Removes a user observance
* @param (Lua stack) filedescriptor.
*/
extern int
l_nonobserve_fd(lua_State *L)
{
int fd = luaL_checknumber(L, 1);
/* removes read func */
lua_pushlightuserdata(L, (void *) user_obs_ready);
lua_gettable(L, LUA_REGISTRYINDEX);
if (!lua_isnil(L, -1)) {
lua_pushnumber(L, fd);
lua_pushnil(L);
lua_settable(L, -2);
}
lua_pop(L, 1);
lua_pushlightuserdata(L, (void *) user_obs_writey);
lua_gettable(L, LUA_REGISTRYINDEX);
if (!lua_isnil(L, -1)) {
lua_pushnumber(L, fd);
lua_pushnil(L);
lua_settable(L, -2);
}
lua_pop(L, 1);
nonobserve_fd(fd);
return 0;
}
static const luaL_reg lsyncdlib[] = {
{"configure", l_configure },
{"exec", l_exec },
{"log", l_log },
{"now", l_now },
{"readdir", l_readdir },
{"realdir", l_realdir },
{"stackdump", l_stackdump },
{"terminate", l_terminate },
{"configure", l_configure },
{"exec", l_exec },
{"log", l_log },
{"now", l_now },
{"nonobserve_fd", l_nonobserve_fd },
{"observe_fd", l_observe_fd },
{"readdir", l_readdir },
{"realdir", l_realdir },
{"stackdump", l_stackdump },
{"terminate", l_terminate },
{NULL, NULL}
};
@ -1064,18 +1364,6 @@ register_lsyncd(lua_State *L)
* Lsyncd Core
****************************************************************************/
/**
* Dummy variable whos address is used as the cores index in the lua registry
* to the lua runners function table in the lua registry.
*/
static int runner;
/**
* Dummy variable whos address is used as the cores index n the lua registry
* to the lua runners error handler.
*/
static int callError;
/**
* Pushes a function from the runner on the stack.
* Prior it pushed the callError handler.
@ -1098,120 +1386,6 @@ load_runner_func(lua_State *L,
lua_remove(L, -2);
}
/**
* List of file descriptor watches.
*/
static struct observance * observances = NULL;
static int observances_len = 0;
static int observances_size = 0;
/**
* List of file descriptors to nonobserve.
* While working for the oberver lists, it may
* not be altered, thus nonobserve stores here the
* actions that will be delayed.
*/
static int *nonobservances = NULL;
static int nonobservances_len = 0;
static int nonobservances_size = 0;
/**
* true while the observances list is being handled.
*/
static bool observance_action = false;
/**
* Core watches a filedescriptor to become ready,
* one of read_ready or write_ready may be zero
*/
extern void
observe_fd(int fd,
void (*ready) (lua_State *, struct observance *),
void (*writey)(lua_State *, struct observance *),
void (*tidy) (struct observance *),
void *extra)
{
int pos;
if (observance_action) {
// TODO
logstring("Error",
"internal, New observances in ready/writey handlers not yet supported");
exit(-1); // ERRNO
}
if (!tidy) {
logstring("Error",
"internal, tidy() in observe_fd() must not be NULL.");
exit(-1); // ERRNO
}
if (observances_len + 1 > observances_size) {
observances_size = observances_len + 1;
observances = s_realloc(observances,
observances_size * sizeof(struct observance));
}
for(pos = 0; pos < observances_len; pos++) {
if (observances[pos].fd <= fd) {
break;
}
}
if (observances[pos].fd == fd) {
logstring("Error",
"Observing already an observed file descriptor.");
exit(-1); // ERRNO
}
memmove(observances + pos + 1, observances + pos,
(observances_len - pos) * (sizeof(struct observance)));
observances_len++;
observances[pos].fd = fd;
observances[pos].ready = ready;
observances[pos].writey = writey;
observances[pos].tidy = tidy;
observances[pos].extra = extra;
}
/**
* Makes core no longer watch fd.
*/
extern void
nonobserve_fd(int fd)
{
int pos;
if (observance_action) {
/* this function is called through a ready/writey handler
* while the core works through the observance list, thus
* it does not alter the list, but stores this actions
* on a stack
*/
nonobservances_len++;
if (nonobservances_len > nonobservances_size) {
nonobservances_size = nonobservances_len;
nonobservances = s_realloc(nonobservances,
nonobservances_size * sizeof(int));
}
nonobservances[nonobservances_len - 1] = fd;
return;
}
/* looks for the fd */
for(pos = 0; pos < observances_len; pos++) {
if (observances[pos].fd == fd) {
break;
}
}
if (pos >= observances_len) {
logstring("Error",
"internal fail, not observance file descriptor in nonobserve");
exit(-1); //ERRNO
}
/* and moves the list down */
memmove(observances + pos, observances + pos + 1,
(observances_len - pos) * (sizeof(struct observance)));
observances_len--;
}
/**
* Daemonizes.
*
@ -1338,12 +1512,12 @@ masterloop(lua_State *L)
FD_ZERO(&wfds);
for(pi = 0; pi < observances_len; pi++) {
int fd = observances[pi].fd;
if (observances[pi].ready) {
FD_SET(fd, &rfds);
struct observance *obs = observances + pi;
if (obs->ready) {
FD_SET(obs->fd, &rfds);
}
if (observances[pi].writey) {
FD_SET(fd, &wfds);
if (obs->writey) {
FD_SET(obs->fd, &wfds);
}
}

View File

@ -2644,7 +2644,7 @@ function runner.getAlarm()
-- checks for an userAlarm
checkAlarm(UserAlarms.getAlarm())
log("Debug", "getAlarm returns: ",alarm)
log("Debug","getAlarm returns: ",alarm)
return alarm
end
@ -2763,6 +2763,20 @@ function spawnShell(agent, command, ...)
return spawn(agent, "/bin/sh", "-c", command, "/bin/sh", ...)
end
-----
-- Observes a filedescriptor
--
function observefd(fd, ready, writey)
return lsyncd.observe_fd(fd, ready, writey)
end
-----
-- Nonobserves a filedescriptor
--
function nonobservefd(fd)
return lsyncd.nonobserve_fd(fd)
end
-----
-- Calls func at timestamp.
-- Use now() to receive current timestamp