/* paraNode - parasol node server. */

#include "paraCommon.h"
#include <grp.h>
#include "errAbort.h"
#include "dystring.h"
#include "dlist.h"
#include "hash.h"
#include "localmem.h"
#include "options.h"
#include "subText.h"
#include "paraLib.h"
#include "net.h"
#include "portable.h"
#include "rudp.h"
#include "paraMessage.h"
#include "internet.h"
#include "log.h"
#include <sys/resource.h>

/* Command line option specifications, handed to optionInit() in main().
 * Each entry pairs an option name with the kind of value it accepts.
 * The table ends with a {NULL, 0} entry, presumably the terminator that
 * the option parser looks for. */
static struct optionSpec optionSpecs[] = {
    {"logFacility", OPTION_STRING},
    {"logMinPriority", OPTION_STRING},
    {"log", OPTION_STRING},
    {"debug", OPTION_BOOLEAN},
    {"hub", OPTION_STRING},
    {"umask", OPTION_INT},
    {"userPath", OPTION_STRING},
    {"sysPath", OPTION_STRING},
    {"env", OPTION_STRING|OPTION_MULTI},   /* -env may be given several times */
    {"randomDelay", OPTION_INT},
    {"cpu", OPTION_INT},
    {"localhost", OPTION_STRING},   /* NOTE(review): accepted, but not read anywhere in the visible code */
    {NULL, 0}
};

char *version = PARA_VERSION;   /* Version string, taken from PARA_VERSION and printed by usage(). */

void usage()
/* Explain usage and exit.  The help text is reported through errAbort(),
 * so this function is not expected to return.  The default shown for
 * -logMinPriority matches the "info" default that paraNode() passes to
 * logSetMinPriority(). */
{
errAbort("paraNode - version %s\n"
	 "Parasol node server.\n"
         "usage:\n"
	 "    paraNode start\n"
	 "options:\n"
	 "    -logFacility=facility  Log to the specified syslog facility - default local0.\n"
         "    -logMinPriority=pri minimum syslog priority to log, also filters file logging.\n"
         "     defaults to \"info\"\n"
         "    -log=file  Log to file instead of syslog.\n"
         "    -debug  Don't daemonize\n"
	 "    -hub=host  Restrict access to connections from hub.\n"
	 "    -umask=000  Set umask to run under - default 002.\n"
	 "    -userPath=bin:bin/i386  User dirs to add to path.\n"
	 "    -sysPath=/sbin:/local/bin  System dirs to add to path.\n"
         "    -env=name=value - add environment variable to jobs.  Maybe repeated.\n"
	 "    -randomDelay=N  Up to this many milliseconds of random delay before\n"
	 "        starting a job.  This is mostly to avoid swamping NFS with\n"
	 "        file opens when loading up an idle cluster.  Also it limits\n"
	 "        the impact on the hub of very short jobs. Default 5000.\n"
	 "    -cpu=N  Number of CPUs to use - default 1.\n"
	, version
	);
}


/* Variables that can be overridden from the command line (see main()). */
char *hubName;			/* Name of hub machine from -hub, may be NULL. */
int umaskVal = 0002;		/* File creation mask applied to jobs. */
int maxProcs = 1;		/* Number of processors this node is allowed to use. */
char *userPath = "";		/* User dirs to add to PATH, relative to the user's home. */
char *sysPath = "";		/* System dirs to add to PATH. */
struct slName *envExtra = NULL; /* name=value strings from -env to add to job environment. */ 
int randomDelay = 5000;		/* Upper bound, in milliseconds, on random delay before job start. */

/* Other globals. */
char *hostName;			/* Name of this machine. */
char   hubIp[NI_MAXHOST];	/* Hub IP address as a string; only filled in when hubName is set. */
char localIp[NI_MAXHOST];	/* localhost IP address as a string. */
int busyProcs = 0;		/* Number of processors in use. */
struct rudp *mainRudp;		/* Rudp wrapper around main socket. */ 
struct paraMessage pmIn;	/* Input message; also reused as the reply buffer by
                                 * doFetch, doStatus and listJobs. */
double ticksToHundreths;	/* Conversion factor from system ticks
                                 * to hundredths of a second. */

struct job
/* Info on one job in this node.  A job lives on exactly one of the two
 * lists below, linked through its 'node' member. */
    {
    struct job *next;	/* Next job. */
    int jobId;		/* Job ID for hub. */
    int pid;		/* Process ID of the child forked by doRun for this job. */
    char *startMessage;	/* Full message that started this job. */
    char *doneMessage;  /* Full message that ended this job if any. */
    struct dlNode *node; /* Node for list this is on; freed together with the job by jobFree. */
    };
struct dlList *jobsRunning;		/* List of currently running jobs. */
struct dlList *jobsFinished;	/* List of recently finished jobs, newest at head;
                                 * jobDone trims it once it holds 4*maxProcs entries. */

struct job *findJobOnList(struct dlList *list, int jobId)
/* Walk list from its head looking for a job whose jobId matches.
 * Returns the first matching job, or NULL if none is on the list. */
{
struct dlNode *cur = list->head;
while (!dlEnd(cur))
    {
    struct job *candidate = cur->val;
    if (candidate->jobId == jobId)
        return candidate;
    cur = cur->next;
    }
return NULL;
}

struct job *findRunningJob(int jobId)
/* Look jobId up on the jobsRunning list.  Returns the job, or NULL
 * when no running job has that id. */
{
struct job *running = findJobOnList(jobsRunning, jobId);
return running;
}

struct job *findFinishedJob(int jobId)
/* Return recently finished job or NULL if it doesn't exist */
{
return findJobOnList(jobsFinished, jobId);
}

extern char **environ;	/* The process environment strings; read and then replaced by execProc. */

char **hashToEnviron(struct hash *hash)
/* Build an environ-style array from hash: one freshly cloned
 * "name=value" string per hash element.  The array is allocated with
 * one slot more than the element count; that last slot is never
 * written here (presumably AllocArray leaves it zeroed, giving the
 * NULL terminator - confirm against AllocArray). */
{
struct hashEl *elList = hashElListHash(hash);
struct hashEl *hel;
int count = slCount(elList);
int ix = 0;
char **envArray;
struct dyString *buf = dyStringNew(512);

AllocArray(envArray, count+1);
for (hel = elList; ix < count; hel = hel->next)
    {
    /* Reuse one scratch buffer to assemble each entry. */
    dyStringClear(buf);
    dyStringAppend(buf, hel->name);
    dyStringAppend(buf, "=");
    dyStringAppend(buf, hel->val);
    envArray[ix++] = cloneString(buf->string);
    }
hashElFreeList(&elList);
dyStringFree(&buf);
return envArray;
}

struct hash *environToHash(char **env)
/* Load a NULL-terminated array of "name=value" strings into a new hash
 * keyed by name, with a cloned copy of the value stored as the hash
 * value.  Entries that contain no '=' are skipped. */
{
struct hash *hash = newHash(7);
char **pEntry;
for (pEntry = env; *pEntry != NULL; ++pEntry)
    {
    /* Work on a copy so the caller's string is not modified. */
    char *copy = cloneString(*pEntry);
    char *eq = strchr(copy, '=');
    if (eq != NULL)
        {
        *eq = 0;
        hashAdd(hash, copy, cloneString(eq+1));
        }
    freez(&copy);
    }
return hash;
}

unsigned randomNumber;	/* Last random number generated by nextRandom(); read by randomSleep(). */

void initRandom()
/* Seed the random number generator.  The seed is the low 20 bits of
 * hashCrc() applied to the global hostName, so hostName must be set
 * before this is called. */
{
srand(hashCrc(hostName) & 0xfffff);
}

void nextRandom()
/* Draw the next random number and store it in randomNumber.  doRun
 * calls this before forking, so the forked processes see the value
 * that was drawn in the main process. */
{
int drawn = rand();
randomNumber = drawn;
}

void randomSleep()
/* Sleep for randomNumber modulo randomDelay milliseconds, to help keep
 * many nodes from hitting the hub and the file server at the same
 * instant.  Does nothing when randomDelay is zero or negative. */
{
int millis;
if (randomDelay <= 0)
    return;
millis = randomNumber%randomDelay;
sleep1000(millis);
}


void hashUpdate(struct hash *hash, char *name, char *val)
/* Store a cloned copy of val under name.  If name is already in hash,
 * its old value is freed and replaced; otherwise a new element is
 * added. */
{
char *valCopy = cloneString(val);
struct hashEl *existing = hashLookup(hash, name);
if (existing != NULL)
    {
    freez(&existing->val);
    existing->val = valCopy;
    }
else
    hashAdd(hash, name, valCopy);
}

void changeUid(char *name, char **retDir)
/* Look up user name in the passwd database, aborting if there is no
 * entry.  When the effective uid is root, switch supplementary groups,
 * gid and then uid to that user's, aborting on any failure.  If retDir
 * is not NULL it receives the user's home directory in every case; the
 * pointer refers to the passwd entry returned by getpwnam() and is not
 * a copy. */
{
struct passwd *entry = getpwnam(name);
int runningAsRoot;
if (entry == NULL)
    errnoAbort("can't obtain user passwd entry for user %s", name);
runningAsRoot = (geteuid() == 0);
if (runningAsRoot)
    {
    /* Groups first, then gid, and uid last. */
    if (initgroups(name, entry->pw_gid) < 0)
        errnoAbort("initgroups failed");
    if (setgid(entry->pw_gid) < 0)
        errnoAbort("setgid failed");
    if (setuid(entry->pw_uid) < 0)
        errnoAbort("setuid failed");
    }
if (retDir == NULL)
    return;
*retDir = entry->pw_dir;
}

static int grandChildId = 0;	/* Return value of the fork in execProc; termHandler
				 * signals the process group with this id.  0 means
				 * there is nothing to signal. */

void termHandler()
/* SIGTERM handler installed by execProc.  If a grandchild id has been
 * recorded, send SIGTERM to the process group with that id and clear
 * the record so the group is only signalled once. */
{
int pgid = grandChildId;
if (pgid == 0)
    return;
kill(-pgid, SIGTERM);
grandChildId = 0;
}

void updatePath(struct hash *hash, char *userPath, 
	char *homeDir, char *sysPath)
/* Rebuild the PATH entry in hash.  The new value is, in order:
 *   - each colon separated element of userPath, prefixed by homeDir/
 *   - sysPath (a ':' is added after it unless it already ends in one)
 *   - the PATH already in hash, or "/bin:/usr/bin" if that is missing
 *     or empty. */
{
struct dyString *newPath = dyStringNew(1024);
char *pathCopy = cloneString(userPath);	/* Scratch copy that gets chopped at colons. */
char *dirName = pathCopy;
char *inherited;

/* User directories, each anchored at homeDir. */
while (dirName != NULL && dirName[0] != 0)
    {
    char *colon = strchr(dirName, ':');
    if (colon != NULL)
        *colon++ = 0;
    dyStringAppend(newPath, homeDir);
    dyStringAppend(newPath, "/");
    dyStringAppend(newPath, dirName);
    dyStringAppend(newPath, ":");
    dirName = colon;
    }

/* System directories come next. */
if (sysPath != NULL && sysPath[0] != 0)
    {
    dyStringAppend(newPath, sysPath);
    if (lastChar(sysPath) != ':')
        dyStringAppend(newPath, ":");
    }

/* Finish with whatever PATH was inherited. */
inherited = hashFindVal(hash, "PATH");
if (inherited != NULL && inherited[0] != 0)
    dyStringAppend(newPath, inherited);
else
    dyStringAppend(newPath, "/bin:/usr/bin");

hashUpdate(hash, "PATH", newPath->string);
freez(&pathCopy);
dyStringFree(&newPath);
}

void addEnvExtra(struct hash *hash, char *nameVal)
/* Parse one "name=value" string from -env and store it in hash.
 * Aborts if there is no '='.  nameVal is split in place at the first
 * '=' for the duration of the call and restored before returning. */
{
char *sep = strchr(nameVal, '=');
char *value;
if (sep == NULL)
    errAbort("invalid -env argument, expected -env=name=value, got -env=%s", nameVal);
value = sep + 1;
*sep = '\0';
hashUpdate(hash, nameVal, value);
*sep = '=';
}

void addEnvExtras(struct hash *hash)
/* add environment extras */
{
struct slName *nameVal;
for (nameVal = envExtra; nameVal != NULL; nameVal = nameVal->next)
    addEnvExtra(hash, nameVal->name);
}

void getTicksToHundreths()
/* Set the global ticksToHundreths to the number of hundredths of a
 * second per system clock tick.  CLK_TCK is used when the platform
 * defines it; otherwise the tick rate is asked for via sysconf(). */
{
#ifndef CLK_TCK
    ticksToHundreths = 100.0/sysconf(_SC_CLK_TCK);
#else
    ticksToHundreths = 100.0/CLK_TCK;
#endif /* CLK_TCK */
}

static void redirectStdFd(int fd, int target, char *what)
/* Make descriptor target refer to the file open on fd, then close fd so
 * that the job does not inherit a stray extra descriptor.  fd is left
 * alone when open() happened to return target itself.  what names the
 * stream for the error message.  Aborts if dup2 fails. */
{
if (dup2(fd, target) < 0)
    errnoAbort("can't dup2 %s", what);
if (fd != target)
    close(fd);
}

void setupProcStdio(char *in, char *out, char *err)
/* Set up stdio for a job process (after fork).  Closes the main rudp
 * socket, then points stderr at file err, stdin at file in and stdout
 * at file out.  The output files are created if missing and are not
 * truncated.  Aborts if any file can't be opened or redirected. */
{
int newStdin, newStdout, newStderr;

close(mainRudp->socket);

/* Do stderr first so that later failures have somewhere to report to. */
newStderr = open(err, O_WRONLY | O_CREAT, 0666);
if (newStderr < 0)
    errnoAbort("can't open job stderr file %s", err);
redirectStdFd(newStderr, STDERR_FILENO, "stderr");

newStdin = open(in, O_RDONLY);
if (newStdin < 0)
    errnoAbort("can't open job stdin file %s", in);
redirectStdFd(newStdin, STDIN_FILENO, "stdin");

newStdout = open(out, O_WRONLY | O_CREAT, 0666);
if (newStdout < 0)
    errnoAbort("can't open job stdout file %s", out);
redirectStdFd(newStdout, STDOUT_FILENO, "stdout");
}

void execProc(char *managingHost, char *jobIdString, char *reserved,
	char *user, char *dir, char *in, char *out, char *err, long long memLimit,
	char *exe, char **params)
/* This routine runs in the child process forked by doRun.
 * It spawns a grandchild that actually does the
 * work and waits on it.  It sends message to the
 * main message loop here when done.
 *
 * Parameters:
 *   managingHost - echoed back as the first word after "jobDone".
 *   jobIdString  - job id; exported to the job as JOB_ID and echoed
 *                  back in the jobDone message.
 *   reserved     - not used in this function.
 *   user         - user to run the job as (see changeUid).
 *   dir          - working directory for the job.
 *   in, out, err - files to attach to the job's stdin/stdout/stderr.
 *   memLimit     - value applied to both RLIMIT_DATA and RLIMIT_AS.
 *   exe, params  - program and argument vector handed to execvp. */
{
if ((grandChildId = forkOrDie()) == 0)
    {
    /* Grandchild: set limits, become the user, redirect stdio, fix up
     * the environment and exec the job. */
    // use rlimit to limit RAM available to process so it will not hog the machine unfairly.
    if (sizeof(long) == 4 && memLimit > 4294967295LL)  // 4GB is max value on 32-bit platform.
	{
	logWarn("memLimit %lld exceeds address space on 32-bit machine, treating it as 4GB limit", memLimit);
	memLimit = 4294967295LL;
	}
    struct rlimit rlim;
    rlim.rlim_cur = rlim.rlim_max = memLimit;
    /* A failure to set either limit is only logged; the job still runs. */
    if (setrlimit(RLIMIT_DATA, &rlim) < 0)
	logWarn("setrlimit failed with RLIMIT_DATA rlim_cur=%lld rlim_max=%lld"
	    , (long long) rlim.rlim_cur , (long long) rlim.rlim_max); 
    // although RLIMIT_AS is not supported/enforced on all platforms,
    // it is useful for linux and some other unix OSes. 
    if (setrlimit(RLIMIT_AS, &rlim) < 0)
	logWarn("setrlimit failed with RLIMIT_AS rlim_cur=%lld rlim_max=%lld"
	    , (long long) rlim.rlim_cur , (long long) rlim.rlim_max); 

    /* Disabled diagnostic block that reads the limits back and logs them. */
    if (0) { // change to 1 for debugging
    logInfo("memLimit=%lld", memLimit);
    struct rlimit rlim; 
    int rv; 
    rv = getrlimit(RLIMIT_DATA,&rlim); 
    if ( rv == -1 ) 
	logWarn("error getrlimit RLIMIT_DATA %s", strerror(errno)); 
    else 
    	logInfo("rlimit_data:%lu,%lu\n", (unsigned long) rlim.rlim_max, (unsigned long) rlim.rlim_cur); 
    rv = getrlimit(RLIMIT_AS,&rlim); 
    if ( rv == -1 ) 
	logWarn("error getrlimit RLIMIT_AS %s", strerror(errno)); 
    else 
    	logInfo("rlimit_as:%lu,%lu\n", (unsigned long) rlim.rlim_max, (unsigned long) rlim.rlim_cur); 
    }


    char *homeDir = "";

    /* Change to given user (if root) */
    changeUid(user, &homeDir);

    /* create output files just after becoming user so that errors in the rest
     * of this proc will go to the err file and be available via para
     * problems */
    setupProcStdio(in, out, err);

    if (chdir(dir) < 0)
        errnoAbort("can't chdir to %s", dir);
    /* Start a new session; termHandler in the waiting parent signals
     * the process group -grandChildId. */
    setsid();
    // setpgid(0,0);
    umask(umaskVal); 


    /* Update environment: start from the inherited one, add the job
     * specific variables, the extended PATH and any -env extras, then
     * install the result as environ. */
        {
	struct hash *hash = environToHash(environ);
	hashUpdate(hash, "JOB_ID", jobIdString);
	hashUpdate(hash, "USER", user);
	hashUpdate(hash, "HOME", homeDir);
	hashUpdate(hash, "HOST", hostName);
	hashUpdate(hash, "PARASOL", "7");
	updatePath(hash, userPath, homeDir, sysPath);
        addEnvExtras(hash);
	environ = hashToEnviron(hash);
	freeHashAndVals(&hash);
	}

    randomSleep();	/* Sleep a random bit before executing this thing
                         * to help spread out i/o when a big batch of jobs
			 * hit idle cluster */
    execvp(exe, params);
    /* Only reached if execvp failed. */
    errnoAbort("execvp'ing %s", exe);
    }
else
    {
    /* Wait on executed job and send jobID and status back to 
     * main process. */
    int status = -1;	/* Reported as-is if the wait below is skipped. */
    int cid;
    struct paraMessage pm;
    struct rudp *ru = NULL;
    struct tms tms;
    unsigned long uTime = 0;
    unsigned long sTime = 0;

    if (grandChildId >= 0)
	{
	/* Forward SIGTERM (sent by doKill) on to the job. */
	signal(SIGTERM, termHandler);
	cid = waitpid(grandChildId, &status, 0);
        if (cid < 0)
            errnoAbort("wait on grandchild failed");
	/* CPU time of waited-for children, converted from clock ticks
	 * to hundredths of a second. */
	times(&tms);
	uTime = ticksToHundreths*tms.tms_cutime;
	sTime = ticksToHundreths*tms.tms_cstime;
	}
    /* Report through a fresh rudp connection to the node port at
     * localIp; paraNode() dispatches the message to jobDone(). */
    ru = rudpOpen();
    if (ru != NULL)
	{
	ru->maxRetries = 20;
	pmInit(&pm, localIp, paraNodePortStr);
	pmPrintf(&pm, "jobDone %s %s %d %lu %lu", managingHost, 
	    jobIdString, status, uTime, sTime);
	pmSend(&pm, ru);
	rudpClose(&ru);
	}
    }
}

void clearZombies()
/* Reap, without blocking, every child process that has already
 * terminated.  Stops as soon as waitpid reports no more such children
 * or returns an error. */
{
int status;
while (waitpid(-1, &status, WNOHANG) > 0)
    ;
}

void lookupIp(char *host, char *ipStr, int ipStrSize)
/* Resolve host and write its address, formatted as a string, into
 * ipStr (which holds ipStrSize bytes).  Aborts if the lookup fails. */
{
struct sockaddr_storage addr;
int found = internetFillInAddress6n4(host, NULL, AF_UNSPEC, SOCK_DGRAM, &addr, FALSE);
if (!found)
    errAbort("host %s lookup failed.", host);
getAddrAsString6n4(&addr, ipStr, ipStrSize);
}


void tellManagerJobIsDone(char *managingHost, char *jobIdString, char *line)
/* Try and send message to host saying job is done. */
{
struct paraMessage pm;
pmInitFromName(&pm, managingHost, paraHubPortStr);
pmPrintf(&pm, "jobDone %s %s", jobIdString, line);
if (!pmSend(&pm, mainRudp))
    warn("Couldn't send message to %s to say %s is done\n", managingHost, jobIdString);
}

void jobFree(struct job **pJob)
/* Free the job *pJob together with its start message, done message and
 * list node, and set *pJob to NULL.  Does nothing if *pJob is already
 * NULL.  The node is not unlinked here; callers take it off its list
 * first. */
{
struct job *doomed = *pJob;
if (doomed == NULL)
    return;
freeMem(doomed->startMessage);
freeMem(doomed->doneMessage);
freeMem(doomed->node);
freez(pJob);
}

void jobDone(char *line)
/* Handle a job-done message of the form "<managingHost> <jobId> <rest>".
 * If the job is on the running list, its child process is reaped and
 * the job is moved to the head of the recently finished list (dropping
 * the oldest entry once that list holds 4*maxProcs jobs).  The message
 * is then forwarded to the managing host whether or not the job was
 * found.  Messages lacking a job id or any text after it are ignored. */
{
char *managingHost = nextWord(&line);
char *jobIdString = nextWord(&line);
struct job *job;

// clearZombies();
if (jobIdString == NULL || line == NULL || line[0] == 0)
    return;

job = findRunningJob(atoi(jobIdString));
if (job != NULL)
    {
    int status;
    int err = waitpid(job->pid, &status, 0);
    /* Note anything other than a clean zero exit of the child. */
    if (!(err != -1 && WIFEXITED(status) && WEXITSTATUS(status) == 0))
        {
        logDebug("paraNode sheparding %s pid %d status %d err %d errno %d", 
                 jobIdString, job->pid, status, err, errno);
        }
    job->doneMessage = cloneString(line);

    /* Move from the running list to the finished list. */
    dlRemove(job->node);
    if (dlCount(jobsFinished) >= 4*maxProcs)
        {
        struct dlNode *oldest = dlPopTail(jobsFinished);
        struct job *expired = oldest->val;
        jobFree(&expired);
        }
    dlAddHead(jobsFinished, job->node);
    busyProcs -= 1;
    }
tellManagerJobIsDone(managingHost, jobIdString, line);
}

void doCheck(char *line, struct sockaddr_storage *ipAddress)
/* Send back check result - either a check in message or
 * jobDone. */
{
char *jobIdString = nextWord(&line);
if (jobIdString != NULL)
    {
    int jobId = atoi(jobIdString);
    struct job *job = findRunningJob(jobId);
    struct paraMessage pm;

    char     ipStr[NI_MAXHOST];
    getAddrAsString6n4(ipAddress, ipStr, sizeof ipStr);
    pmInit(&pm, ipStr, paraHubPortStr);
    if (job != NULL)
	pmPrintf(&pm, "checkIn %s %s running", hostName, jobIdString);
    else
	{
	struct job *job = findFinishedJob(jobId);
	if (job == NULL)
	    pmPrintf(&pm, "checkIn %s %s free", hostName, jobIdString);
	else
	    pmPrintf(&pm, "jobDone %s %s", jobIdString, job->doneMessage);
	}
    pmSend(&pm, mainRudp);
    }
}

void doResurrect(char *line, struct sockaddr_storage *ipAddress)
/* Send back I'm alive message */
{
struct paraMessage pm;
struct dlNode *node;
int jobsReported = 0;
char     ipStr[NI_MAXHOST];
getAddrAsString6n4(ipAddress, ipStr, sizeof ipStr);
pmInit(&pm, ipStr, paraHubPortStr);
pmPrintf(&pm, "alive %s", hostName);
for (node = jobsRunning->head; !dlEnd(node); node = node->next)
    {
    struct job *job = node->val;
    pmPrintf(&pm, " %d", job->jobId);
    ++jobsReported;
    }
for (node = jobsFinished->head; !dlEnd(node); node = node->next)
    {
    struct job *job = node->val;
    if (jobsReported >= maxProcs)
	break;
    pmPrintf(&pm, " %d", job->jobId);
    ++jobsReported;
    }
pmSend(&pm, mainRudp);
}

void doRun(char *line, struct sockaddr_storage *ipAddress)
/* Execute command.  line is the run message with the leading command
 * word already removed; ipAddress is the sender, whose address string
 * is handed to execProc as the managing host.
 *
 * The job is started only if the message parses, its job id is on
 * neither the running nor the finished list, a processor is free and
 * the command has fewer arguments than args can hold; otherwise a
 * warning is issued and nothing runs.  On success a child is forked to
 * run execProc and the parent records the job on jobsRunning. */
{
/* Keep a pristine copy of the message for job->startMessage;
 * presumably parseRunJobMessage chops line up in place - confirm. */
char *jobMessage = cloneString(line);
static char *args[1024];
int argCount;
char ipStr[NI_MAXHOST];
getAddrAsString6n4(ipAddress, ipStr, sizeof ipStr);

/* Draw the random number here, before the fork, so the forked
 * process inherits it for randomSleep(). */
nextRandom();
if (line == NULL)
    warn("Executing nothing...");
else
    {
    struct runJobMessage rjm;
    if (parseRunJobMessage(line, &rjm))
	{
	int jobId = atoi(rjm.jobIdString);
	/* Ignore a job id that is already running or recently finished. */
	if (findRunningJob(jobId) == NULL && findFinishedJob(jobId) == NULL)
	    {
	    if (busyProcs < maxProcs)
		{
		int childPid;
		argCount = chopLine(rjm.command, args);
		/* A full array leaves no room for the NULL terminator. */
		if (argCount >= ArraySize(args))
		    warn("Too many arguments to run");
		else
		    {
		    args[argCount] = NULL;
		    if ((childPid = forkOrDie()) == 0)
			{
			/* Child process. */
			/* Do JOB_ID substitutions */
			struct subText *st = subTextNew("$JOB_ID", rjm.jobIdString);
			int i;
			rjm.in = subTextString(st, rjm.in);
			rjm.out = subTextString(st, rjm.out);
			rjm.err = subTextString(st, rjm.err);
			for (i=0; i<argCount; ++i)
			    args[i] = subTextString(st, args[i]);

			execProc(ipStr, rjm.jobIdString, rjm.reserved,
			    rjm.user, rjm.dir, rjm.in, rjm.out, rjm.err, rjm.ram,
			    args[0], args);
			/* execProc returns in the process that waited on the
			 * job; it has nothing more to do. */
			exit(0);
			}
		    else
			{
			/* Parent: record the job as running. */
			struct job *job;
			AllocVar(job);
			job->jobId = atoi(rjm.jobIdString);
			job->pid = childPid;
			job->startMessage = jobMessage;
			jobMessage = NULL;	/* No longer own memory. */
			job->node = dlAddValTail(jobsRunning, job);
			++busyProcs;
			}
		    }
		}
	    else
		{
		warn("Trying to run when busy.");
		}
	    }
	else
	    {
	    warn("Duplicate run-job %d\n", jobId);
	    }
	}
    }
/* Still non-NULL here unless ownership passed to the job above. */
freez(&jobMessage);
}

void doFetch(char *line)
/* Fetch first part of file.  Protocol is to send the
 * file one UDP packet at a time.  A zero length packet
 * indicates end of file.
 *
 * line holds "<user> <fileName>".  If the file can't be opened, one
 * packet with an error message is sent, followed by the zero length
 * packet.  A message that lacks the user or the file name is ignored
 * and nothing is sent. */
{
/* paraNode() passes a line that points into pmIn.data, and pmIn is
 * reused below for the replies, so copy both words out first. */
char *user = cloneString(nextWord(&line));
char *fileName = cloneString(nextWord(&line));
if (user != NULL && fileName != NULL)
    {
    FILE *f = fopen(fileName, "r");
    pmClear(&pmIn);
    if (f == NULL)
	{
	/* Save errno so both messages report the fopen failure even if
	 * an intervening call changes it. */
	int openErrno = errno;
	pmPrintf(&pmIn, "Couldn't open fetch file: \"%s\" %s for user %s",
                 fileName, strerror(openErrno), user);
	warn("Couldn't open fetch file: \"%s\" %s for user %s",
             fileName, strerror(openErrno), user);
	pmSend(&pmIn, mainRudp);
	pmClear(&pmIn);
	pmSend(&pmIn, mainRudp);
	}
    else
	{
	for (;;)
	    {
	    /* fread never returns a negative count; a short or empty read
	     * caused by an error is detected with ferror instead. */
	    size_t size = fread(pmIn.data, 1, sizeof(pmIn.data)-1, f);
	    if (size == 0 && ferror(f))
		{
		warn("Couldn't read fetch file: \"%s\" %s",
                     fileName, strerror(errno));
		}
	    pmIn.size = size;
	    pmSend(&pmIn, mainRudp);
	    if (size == 0)
		break;
	    }
	fclose(f);
	}
    }
freez(&user);
freez(&fileName);
}

void doKill(char *line)
/* Kill a specific job.  line starts with the job id.  If that job is
 * on the running list, SIGTERM is sent to its child process, the job
 * is taken off the list and freed, and its processor is released.  A
 * missing id, or one that atoi turns into 0, only produces a warning;
 * an id that is not running is ignored silently. */
{
char *jobIdString = nextWord(&line);
/* A kill message without a job id must not reach atoi(NULL). */
int jobId = (jobIdString != NULL) ? atoi(jobIdString) : 0;
if (jobId != 0)
    {
    struct job *job = findRunningJob(jobId);
    if (job != NULL)
        {
	kill(job->pid, SIGTERM);
	dlRemove(job->node);
	jobFree(&job);
	--busyProcs;
	}
    }
else
    {
    warn("Nothing to kill\n");
    }
}


void doStatus()
/* Report status: a single message giving how many of the CPUs are
 * busy and, when any are, the ids of the running jobs.  The reply is
 * built in pmIn and sent over the main rudp socket. */
{
struct dlNode *cur;

pmClear(&pmIn);
pmPrintf(&pmIn, "%d of %d CPUs busy", busyProcs, maxProcs);
if (busyProcs > 0)
    {
    pmPrintf(&pmIn, ". Jobs:");
    cur = jobsRunning->head;
    while (!dlEnd(cur))
        {
        struct job *running = cur->val;
        pmPrintf(&pmIn, " %d", running->jobId);
        cur = cur->next;
        }
    }
pmSend(&pmIn, mainRudp);
}

static int listJobsSendText(char *text)
/* Send text as one message through pmIn over the main rudp socket.
 * Returns nonzero if pmSend reported success. */
{
pmClear(&pmIn);
pmPrintf(&pmIn, "%s", text);
return pmSend(&pmIn, mainRudp) ? 1 : 0;
}

void listJobs()
/* Report jobs running and recently finished, one message at a time:
 * "<n> running", the start message of each running job, "<n> recent",
 * then the start message and done message of each finished job.
 * Gives up silently at the first message that fails to send. */
{
struct dlNode *cur;

pmClear(&pmIn);
pmPrintf(&pmIn, "%d running", dlCount(jobsRunning));
if (!pmSend(&pmIn, mainRudp))
    return;
for (cur = jobsRunning->head; !dlEnd(cur); cur = cur->next)
    {
    struct job *running = cur->val;
    if (!listJobsSendText(running->startMessage))
        return;
    }

pmClear(&pmIn);
pmPrintf(&pmIn, "%d recent", dlCount(jobsFinished));
if (!pmSend(&pmIn, mainRudp))
    return;
for (cur = jobsFinished->head; !dlEnd(cur); cur = cur->next)
    {
    struct job *finished = cur->val;
    if (!listJobsSendText(finished->startMessage))
        return;
    if (!listJobsSendText(finished->doneMessage))
        return;
    }
}

void paraNode()
/* paraNode - a net server.  Initializes host name, random seed, tick
 * conversion, logging and the job lists, binds the main rudp socket to
 * the node port, then receives and dispatches messages until a "quit"
 * command arrives, at which point the socket is closed and the
 * function returns. */
{
char *line;
char *command;
struct sockaddr_storage sai;

/* We have to know who we are... */
hostName = getMachine();
initRandom();	/* Seed depends on hostName, so this must follow getMachine(). */
getTicksToHundreths();

/* log init: -log=file selects file logging, otherwise syslog is used.
 * The minimum priority defaults to "info" here. */
if (optionExists("log"))
    logOpenFile("paraNode", optionVal("log", NULL));
else    
    logOpenSyslog("paraNode", optionVal("logFacility", NULL));
logSetMinPriority(optionVal("logMinPriority", "info"));
logInfo("starting paraNode on %s", hostName);

/* Make job lists. */
jobsRunning = newDlList();
jobsFinished = newDlList();

/* Set up socket and self to listen to it. */
ZeroVar(&sai);

if (!internetFillInAddress6n4(NULL, paraNodePortStr, AF_INET6, SOCK_DGRAM, &sai, FALSE))
    errAbort("NULL host addrinfo lookup failed trying to bind listener.");

mainRudp = rudpMustOpenBound(&sai);
mainRudp->maxRetries = 12;

/* Event loop. */
findNow();
for (;;)
    {
    /* Get next incoming message and, when -hub was given, check that
     * it comes from the hub or from localhost. */
    if (pmReceive(&pmIn, mainRudp))
	{
	findNow();
	char pmIpStr[NI_MAXHOST];
	getAddrAsString6n4(&pmIn.ipAddress, pmIpStr, sizeof pmIpStr);

	/* With no hub configured every sender is accepted. */
	if (hubName == NULL || sameString(pmIpStr, hubIp) || sameString(pmIpStr, localIp))
	    {
	    /* Sender is acceptable, read a string and
	     * parse out first word as command. */
	    line = pmIn.data;
	    logDebug("message from %s: \"%s\"", pmIpStr, line);
	    command = nextWord(&line);
	    if (command != NULL)
		{
		/* line now points at whatever follows the command word. */
		if (sameString("quit", command))
		    break;
		else if (sameString("run", command))
		    doRun(line, &pmIn.ipAddress);
		else if (sameString("jobDone", command))
		    jobDone(line);
		else if (sameString("status", command))
		    doStatus();
		else if (sameString("kill", command))
		    doKill(line);
		else if (sameString("check", command))
		    doCheck(line, &pmIn.ipAddress);
		else if (sameString("resurrect", command))
		    doResurrect(line, &pmIn.ipAddress);
		else if (sameString("listJobs", command))
		    listJobs();
		else if (sameString("fetch", command))
		    doFetch(line);
                else
                    logWarn("invalid command: \"%s\"", command);
		}
	    logDebug("done command");
	    }
	else
	    {
	    logWarn("command from unauthorized host %s", pmIpStr);
	    }
	}
    }
rudpClose(&mainRudp);
}

int main(int argc, char *argv[])
/* Process command line.  Exactly one non-option argument is required
 * (its value is not examined here); anything else prints usage.
 * Option values are copied into the globals, the localhost and hub
 * addresses are resolved, and then paraDaemonize() and the paraNode()
 * server loop are run. */
{
optionInit(&argc, argv, optionSpecs);
if (argc != 2)
    usage();

/* Numeric settings. */
maxProcs = optionInt("cpu", 1);
umaskVal = optionInt("umask", 0002);
randomDelay = optionInt("randomDelay", randomDelay);

/* String settings; the path defaults are the current global values. */
userPath = optionVal("userPath", userPath);
sysPath = optionVal("sysPath", sysPath);
envExtra = optionMultiVal("env", NULL);
hubName = optionVal("hub", NULL);

/* Look up IP addresses.  hubIp is only filled in when -hub is given. */
lookupIp("localhost", localIp, sizeof localIp);
if (hubName != NULL)
    lookupIp(hubName, hubIp, sizeof hubIp);

paraDaemonize("paraNode");
paraNode();
return 0;
}


