/* para - para - manage a batch of jobs in parallel on a compute cluster.. */
#include "paraCommon.h"
#include "errAbort.h"
#include "linefile.h"
#include "options.h"
#include "hash.h"
#include "localmem.h"
#include "dystring.h"
#include "obscure.h"
#include "portable.h"
#include "net.h"
#include "paraLib.h"
#include "paraMessage.h"
#include "jobDb.h"
#include "jobResult.h"
#include "verbose.h"
#include "sqlNum.h"


/* Command line option specifications.  Each entry pairs an option name with
 * the OPTION_* type of its value; the table ends with a {NULL, 0} entry.
 * NOTE(review): checkMaxJobSetting() also looks up a "maxNode" option for
 * backwards compatibility, but there is no "maxNode" entry here -- confirm
 * whether one should be added. */
static struct optionSpec optionSpecs[] = {
    {"retries"  , OPTION_INT},
    {"maxQueue" , OPTION_INT},
    {"minPush"  , OPTION_INT},
    {"maxPush"  , OPTION_INT},
    {"warnTime" , OPTION_INT},
    {"killTime" , OPTION_INT},
    {"delayTime", OPTION_INT},
    {"eta"      , OPTION_BOOLEAN},  /* default, no longer used */
    {"pri"      , OPTION_STRING},   /* short form of "priority" */
    {"priority" , OPTION_STRING},
    {"maxJob"   , OPTION_STRING},
    {"cpu"      , OPTION_FLOAT},
    {"ram"      , OPTION_STRING},
    {"batch"    , OPTION_STRING},
    {"jobCwd"   , OPTION_STRING},
    {NULL, 0}
};

char *version = PARA_VERSION;   /* Version number, printed by usage(). */

static int numHappyDots;       /* Number of happy dots written since the last beginHappy()/endHappy(). */

void usage()
/* Explain usage and exit.
 * The help text is a single format string handed to errAbort().  It has one
 * %s (the version) followed by four %d slots, filled in order with
 * NORMAL_PRIORITY (default), NORMAL_PRIORITY (medium),
 * NORMAL_PRIORITY * NORMAL_PRIORITY (low) and NORMAL_PRIORITY-1 (upper end of
 * the "higher than normal" range).  Keep the argument list in step with the
 * text when editing either. */
{
errAbort(
  "para - version %s\n"
  "Manage a batch of jobs in parallel on a compute cluster.\n"
  "Normal usage is to do a 'para create' followed by 'para push' until\n"
  "job is done.  Use 'para check' to check status.\n"
  "usage:\n\n"
  "   para [options] command [command-specific arguments]\n\n"
  "The commands are:\n"
  "\n"
  "para create jobList\n"
  "   This makes the job-tracking database from a text file with the\n"
  "   command line for each job on a separate line.\n"
  "   options:\n"
  "      -cpu=N  Number of CPUs used by the jobs, default 1.\n"
  "      -ram=N  Number of bytes of RAM used by the jobs.\n"
  "         Default is RAM on node divided by number of cpus on node.\n"
  "         Shorthand expressions allow t,g,m,k for tera, giga, mega, kilo.\n"
  "         e.g. 4g = 4 Gigabytes.\n"
  "      -batch=batchDir - specify the directory path that is used to store the\n"
  "       batch control files.  The batchDir can be an absolute path or a path\n"
  "       relative to the current directory.  The resulting path is use as the\n"
  "       batch name.  The directory is created if it doesn't exist.  When\n"
  "       creating a new batch, batchDir should not have been previously used as\n"
  "       a batch name.  The batchDir must be writable by the paraHub process.\n"
  "       This does not affect the working directory assigned to jobs.  It defaults\n"
  "       to the directory where para is run.  If used, this option must be specified\n"
  "       on all para commands for the  batch.  For example to run two batches in the\n"
  "       same directory:\n"
  "          para -batch=b1 make jobs1\n"
  "          para -batch=b2 make jobs2\n"
  "para push \n"
  "   This pushes forward the batch of jobs by submitting jobs to parasol\n"
  "   It will limit parasol queue size to something not too big and\n"
  "   retry failed jobs.\n"
  "   options:\n"
  "      -retries=N  Number of retries per job - default 4.\n"
  "      -maxQueue=N  Number of jobs to allow on parasol queue. \n"
  "         Default 2000000.\n"
  "      -minPush=N  Minimum number of jobs to queue. \n"
  "         Default 1.  Overrides maxQueue.\n"
  "      -maxPush=N  Maximum number of jobs to queue - default 100000.\n"
  "      -warnTime=N  Number of minutes job runs before hang warning. \n"
  "         Default 4320 (3 days).\n"
  "      -killTime=N  Number of minutes hung job runs before push kills it.\n"
  "         By default kill off for backwards compatibility.\n" /* originally to be 2 weeks */
  "      -delayTime=N  Number of seconds to delay before submitting next job \n"
  "         to minimize i/o load at startup - default 0.\n"
  "      -priority=x  Set batch priority to high, medium, or low.\n"
  "         Default medium (use high only with approval).\n"
  "         If needed, use with make, push, create, shove, or try.\n"
  "         Or, set batch priority to a specific numeric value - default %d.\n"
  "         1 is emergency high priority, \n"
  "         %d is normal medium, \n"
  "         %d is low for bottomfeeders.\n"
  "         Setting priority higher than normal (1-%d) will be logged.\n"
  "         Please keep low priority jobs short, they won't be pre-empted.\n"
  "      -maxJob=x  Limit the number of jobs the batch can run.\n"
  "         Specify number of jobs, for example 10 or 'unlimited'.\n"
  "         Default unlimited displays as -1.\n"
  "      -jobCwd=dir - specify the directory path to use as the current working\n"
  "       directory for each job.  The dir can be an absolute path or a path\n"
  "       relative to the current directory. It defaults to the directory where\n"
  "       para is run.\n"
  "para try \n"
  "   This is like para push, but only submits up to 10 jobs.\n"
  "para shove\n"
  "   Push jobs in this database until all are done or one fails after N retries.\n"
  "para make jobList\n"
  "   Create database and run all jobs in it if possible.  If one job\n"
  "   fails repeatedly this will fail.  Suitable for inclusion in makefiles.\n"
  "   Same as a 'create' followed by a 'shove'.\n"
  "para check\n"
  "   This checks on the progress of the jobs.\n"
  "para stop\n"
  "   This stops all the jobs in the batch.\n"
  "para chill\n"
  "   Tells system to not launch more jobs in this batch, but\n"
  "   does not stop jobs that are already running.\n"
  "para finished\n"
  "   List jobs that have finished.\n"
  "para hung\n"
  "   List hung jobs in the batch (running > killTime).\n"
  "para slow\n"
  "   List slow jobs in the batch (running > warnTime).\n"
  "para crashed\n"
  "   List jobs that crashed or failed output checks the last time they were run.\n"
  "para failed\n"
  "   List jobs that crashed after repeated restarts.\n"
  "para status\n"
  "   List individual job status, including times.\n"
  "para problems\n"
  "   List jobs that had problems (even if successfully rerun).\n"
  "   Includes host info.\n"
  "para running\n"
  "   Print info on currently running jobs.\n"
  "para hippos time\n"
  "   Print info on currently running jobs taking > 'time' (minutes) to run.\n"
  "para time\n"
  "   List timing information.\n"
  "para recover jobList newJobList\n"
  "   Generate a job list by selecting jobs from an existing list where\n"
  "   the `check out' tests fail.\n"
  "para priority 999\n"
  "   Set batch priority. Values explained under 'push' options above.\n"
  "para maxJob 999\n"
  "   Set batch maxJob. Values explained under 'push' options above.\n"
  "para ram 999\n"
  "   Set batch ram usage. Values explained under 'push' options above.\n"
  "para cpu 999\n"
  "   Set batch cpu usage. Values explained under 'push' options above.\n"
  "para resetCounts\n"
  "   Set batch done and crash counters to 0.\n"
  "para flushResults\n"
  "   Flush results file.  Warns if batch has jobs queued or running.\n"
  "para freeBatch\n"
  "   Free all batch info on hub.  Works only if batch has nothing queued or running.\n"
  "para showSickNodes\n"
  "   Show sick nodes which have failed when running this batch.\n"
  "para clearSickNodes\n"
  "   Clear sick nodes statistics and consecutive crash counts of batch.\n"
  "\n"
  "Common options\n"
  "   -verbose=1 - set verbosity level.\n",
  version,
  NORMAL_PRIORITY,
  NORMAL_PRIORITY,
  NORMAL_PRIORITY * NORMAL_PRIORITY,
  NORMAL_PRIORITY-1
  );
}

/* Variables that can be set from command line.  The initial values here are
 * the defaults quoted in the usage() text. */

int retries = 4;           /* -retries: retries per job */
int maxQueue = 2000000;    /* -maxQueue: jobs allowed on the parasol queue */
int minPush = 1;           /* -minPush: minimum number of jobs to queue */
int maxPush = 100000;      /* -maxPush: maximum number of jobs to queue */
int warnTime = 3*24*60;    /* -warnTime: minutes a job runs before it counts as slow */
int killTime = 0;  /* 0 = off, was originally going to be 2 weeks or 14*24*60; */
int sleepTime = 5*60;      /* NOTE(review): not used in this part of the file; looks like seconds -- confirm */
int delayTime = 0;         /* -delayTime: seconds to delay before submitting next job */
int priority = NORMAL_PRIORITY;  /* batch priority, see paraPriority() */
int maxJob  = -1;          /* batch job limit, -1 means unlimited; see paraMaxJob() */
float cpuUsage = 0;        /* if non-zero, seeds job->cpusUsed in parseUsage() */
long long ramUsage = 0;    /* if non-zero, seeds job->ramUsed in parseUsage() */
char *batchDir = NULL;     /* batch directory; doubles as the batch name, see thisBatchRunning() */
char *jobCwd = NULL;       /* working directory handed to the hub in submitJob() */
char resultsName[PATH_LEN];  /* path to results file; also identifies the batch in hub messages */
char errDir[PATH_LEN];     /* directory created alongside batchDir by paraCreate() */

/* Some variables we might want to move to a config file someday. */
char *resultsFileName = "para.results"; /* Name of results file. */
char *statusCommand = "parasol pstat2"; /* Named in the statusOutputChanged() error message. */
char *killCommand = "parasol remove job";

boolean sickBatch = FALSE;  /* Set by markQueuedJobs() when the hub reports "Sick Batch:". */

char *bookMarkFileName = "para.bookmark";  /* name of bookmark file */
char bookMarkName[PATH_LEN];  /* path to bookmark file */
off_t bookMark = 0;  /* faster to resume from bookmark only reading new para.results */ 
off_t resultsSize = 0;  /* where to stop reading results for the current cycle */

void beginHappy()
/* Reset the happy-dot counter.  Call this ahead of any loop that may
 * emit progress dots through occassionalDot(). */
{
numHappyDots = 0;
}

void endHappy()
/* Finish off a run of happy dots.  If any dots were written since the
 * counter was last reset, emit a newline, flush both standard streams and
 * reset the counter; otherwise do nothing.  Safe to call several times per
 * beginHappy(), and worth calling before printing low-level messages so
 * they do not end up glued to a row of dots. */
{
if (numHappyDots <= 0)
    return;
verbose(1, "\n");
fflush(stderr);
fflush(stdout);
numHappyDots = 0;
}

void vpprintf(FILE *f, char *format, va_list args)
/* va_list flavour of pprintf: terminate any pending line of happy dots,
 * then format the message to f. */
{
endHappy();   /* dots first, so the message starts on its own line */
vfprintf(f, format, args);
}

void pprintf(FILE *f, char *format, ...)
/* printf-style output to f that first terminates any pending line of
 * happy dots. */
{
va_list ap;
endHappy();
va_start(ap, format);
vfprintf(f, format, ap);
va_end(ap);
}

void paraVaWarn(char *format, va_list args)
/* Warning handler.  Always terminates a pending line of happy dots; when
 * format is non-NULL it then writes the formatted message plus a newline
 * to stderr. */
{
endHappy();
if (format == NULL)
    return;
fflush(stdout);
vfprintf(stderr, format, args);
fputc('\n', stderr);
fflush(stderr);  /* forces to log files */
}

enum jaState 
/* A job is in one of these states.  The values index jaStateShortDesc, so
 * the two must be kept in the same order.  figureState() derives the state
 * from the job's first-listed submission. */
 {
 jaUnsubmitted,   /* no submissions at all */
 jaQueued,        /* submission flagged inQueue */
 jaRunning,       /* submission flagged running */
 jaHung,          /* submission flagged hung */
 jaCrashed,       /* none of the above and not ranOk */
 jaFinished,      /* submission flagged ranOk */
 };

char *jaStateShortDesc[] =
/* short description of job state; indexed by enum jaState, so entries must
 * stay in the same order as the enum constants */
    {
    "unsub",
    "queue",
    "run",
    "hung",
    "crash",
    "done"
    };

enum jaState figureState(struct job *job)
/* Classify a job from the submission at the head of its submission list.
 * Flags are tested in the order inQueue, running, hung, ranOk; a submission
 * with none of them set counts as crashed.  A job with no submissions is
 * unsubmitted. */
{
struct submission *head = job->submissionList;
if (head == NULL)
    return jaUnsubmitted;
if (head->inQueue)
    return jaQueued;
if (head->running)
    return jaRunning;
if (head->hung)
    return jaHung;
return head->ranOk ? jaFinished : jaCrashed;
}


/* Resources that may be named in a {use ...} clause.  The list is
 * NULL-terminated because it is handed to stringIx() without an element
 * count, so the search needs a sentinel to stop on. */
char *useWhats[] = {"cpu", "ram", NULL};

void parseUsage(struct job* job, struct lineFile *lf)
/* Parse out and keep CPU and RAM usage from the job command.
 * Every "{use <what> <value>}" clause is removed from job->command and its
 * value stored in job->cpusUsed (what == "cpu") or job->ramUsed
 * (what == "ram").  Non-zero command-line values (cpuUsage, ramUsage) are
 * applied first, so a clause in the job spec overrides them.
 * lf is used only for the file name and line number in error messages.
 * On return job->command is a newly allocated string without the clauses;
 * the previous string is freed.  Aborts on a malformed clause. */
{
char *pattern = "{use";
char *s, *e, *z;
struct dyString *dy = dyStringNew(1024);

/* allow command-line options to be overridden by spec */
if (cpuUsage != 0)
    job->cpusUsed = cpuUsage;
if (ramUsage != 0)
    job->ramUsed = ramUsage;

s = job->command;
for (;;)
    {
    char *parts[4];
    int partCount;

    e = stringIn(pattern, s);
    if (e == NULL)
        {
        /* No more clauses: keep the remainder of the command as is. */
        dyStringAppend(dy, s);
        break;
        }

    /* Keep the text ahead of the clause, then cut the clause out. */
    dyStringAppendN(dy, s, e-s);
    z = strchr(e, '}');
    if (z == NULL)
        errAbort("{use without } line %d of %s", lf->lineIx, lf->fileName);
    *z = 0;   /* terminate the clause in place so it can be chopped into words */
    partCount = chopLine(e, parts);
    if (partCount != 3)
        errAbort("Badly formatted 'use' clause in line %d of %s", lf->lineIx, lf->fileName);
    if (stringIx(parts[1], useWhats) < 0)
        errAbort("Unrecognized word '%s' in 'use' clause line %d of %s", 
            parts[1], lf->lineIx, lf->fileName);
    if (sameString(parts[1], "cpu"))
        {
        job->cpusUsed = sqlFloat(parts[2]);
        }
    if (sameString(parts[1], "ram"))
        {
        job->ramUsed = paraParseRam(parts[2]);
        if (job->ramUsed == -1)
            errAbort("Invalid RAM expression '%s' in 'use' clause line %d of %s", 
                parts[2], lf->lineIx, lf->fileName);
        }
    s = z+1;   /* resume scanning just past the closing brace */
    }
freeMem(job->command);
job->command = cloneString(dy->string);
dyStringFree(&dy);
}


/* Places that can be checked.  NULL-terminated because the list is handed
 * to stringIx() without an element count. */
char *checkWhens[] = {"in", "out", NULL};

/* Types of checks.  NULL-terminated for the same reason. */
char *checkTypes[] = {"exists", "exists+", "line", "line+", NULL};

struct job *jobFromLine(struct lineFile *lf, char *line)
/* Parse out the beginnings of a job from input line.
 * Parse out and keep checks.
 * Each "{check <when> <what> <file>}" clause is replaced in the command by
 * just <file>, and recorded as a struct check on job->checkList (kept in the
 * order the clauses appear).  The unmodified line is saved in job->spec.
 * {use ...} clauses are then handled by parseUsage().
 * line is modified in place while parsing.  lf is used for the file name and
 * line number in error messages.  Aborts on a malformed clause. */
{
struct check *check;
char *pattern = "{check";
char *s, *e, *z;
struct dyString *dy = dyStringNew(1024);
struct job *job;

AllocVar(job);
job->spec = cloneString(line);   /* saved before the line gets chopped up below */
s = line;
for (;;)
    {
    char *parts[5];
    int partCount;

    e = stringIn(pattern, s);
    if (e == NULL)
        {
        /* No more clauses: keep the remainder of the line as is. */
        dyStringAppend(dy, s);
        break;
        }

    dyStringAppendN(dy, s, e-s);
    z = strchr(e, '}');
    if (z == NULL)
        errAbort("{check without } line %d of %s", lf->lineIx, lf->fileName);
    *z = 0;   /* terminate the clause in place so it can be chopped into words */
    partCount = chopLine(e, parts);
    if (partCount != 4)
        errAbort("Badly formatted check line %d of %s", lf->lineIx, lf->fileName);
    AllocVar(check);
    slAddHead(&job->checkList, check);
    job->checkCount += 1;
    if (stringIx(parts[1], checkWhens) < 0)
        errAbort("Unrecognized word '%s' in check line %d of %s", 
            parts[1], lf->lineIx, lf->fileName);
    check->when = cloneString(parts[1]);
    if (stringIx(parts[2], checkTypes) < 0)
        errAbort("Unrecognized word '%s' in check line %d of %s", 
            parts[2], lf->lineIx, lf->fileName);
    check->what = cloneString(parts[2]);
    check->file = cloneString(parts[3]);
    dyStringAppend(dy, check->file);   /* the file name stays in the command */
    s = z+1;
    }
job->command = cloneString(dy->string);
slReverse(&job->checkList);   /* slAddHead built the list backwards */
dyStringFree(&dy);
parseUsage(job, lf);
return job;
}

struct fileStatus
/* Some info on a file, as gathered by basicFileCheck() and
 * completeLastLineCheck() and consumed by doOneCheck(). */
    {
    bool exists;	/* TRUE if file exists. */
    bool hasData;	/* TRUE if nonempty. */
    bool completeLastLine; /* TRUE if last line ends with <lf> */
    bool checkedLastLine;   /* TRUE if done completeLastLine check */
    bool reported;	/* TRUE if reported error. */
    };

struct fileStatus *basicFileCheck(char *file, struct hash *hash)
/* Do quick file checks caching result */
{
struct fileStatus *fi = hashFindVal(hash, file);
struct stat statBuf;
if (fi != NULL)
    return fi;
lmAllocVar(hash->lm, fi);
if (stat(file, &statBuf) == 0)
    {
    fi->exists = TRUE;
    fi->hasData = (statBuf.st_size > 0);
    }
return fi;
}

struct fileStatus *completeLastLineCheck(char *file, struct hash *hash)
/* Do slower checks for complete last line, caching result */
{
struct fileStatus *fi = hashFindVal(hash, file);
FILE *f;

if ((fi != NULL) && (fi->checkedLastLine))
    return fi;
if (fi == NULL)
    lmAllocVar(hash->lm, fi);
if ((f = fopen(file, "rb")) != NULL)
    {
    fi->exists = TRUE;
    if (fseek(f, -1, SEEK_END) == 0)
        {
	int c = fgetc(f);
	if (c >= 0)
	    {
	    fi->hasData = TRUE;
	    if (c == '\n')
	        fi->completeLastLine = TRUE;
	    }
	}
    fclose(f);
    }
fi->checkedLastLine = TRUE;
return fi;
}

int doOneCheck(struct check *check, struct hash *hash, FILE *f)
/* Do one check.  Return error count from check. f maybe NULL to discard
 * errors. */
{
struct fileStatus *fi;
char *file = check->file;
char *what = check->what;

/* only do slower completeLastLine check if needed */
if (startsWith("line", what))
    fi = completeLastLineCheck(file, hash);
else
    fi = basicFileCheck(file, hash);

if (!fi->reported)
    {
    if (!fi->exists)
	{
        if (f != NULL)
            pprintf(f, "%s does not exist\n", file);
	fi->reported = TRUE;
	return 1;
	}
    if (sameWord(what, "exists+"))
	{
	if (!fi->hasData)
	    {
            if (f != NULL)
                pprintf(f, "%s is empty\n", file);
	    fi->reported = TRUE;
	    return 1;
	    }
	}
    else if (sameWord(what, "line"))
	{
	if (fi->hasData && !fi->completeLastLine)
	    {
            if (f != NULL)
                pprintf(f, "%s has an incomplete last line\n", file);
	    fi->reported = TRUE;
	    return 1;
	    }
	}
    else if (sameWord(what, "line+"))
	{
	if (!fi->hasData)
	    {
            if (f != NULL)
                pprintf(f, "%s is empty\n", file);
	    fi->reported = TRUE;
	    return 1;
	    }
	else if (!fi->completeLastLine)
	    {
            if (f != NULL)
                pprintf(f, "%s has an incomplete last line\n", file);
	    fi->reported = TRUE;
	    return 1;
	    }
	}
    else if (sameString(what, "exists"))
	{
	/* Check already made. */
	}
    else
	{
	warn("Unknown check '%s'", what);
	}
    }
return fi->reported ? 1 : 0;
}

void occassionalDot()
/* Write out a dot every 20 times this is called. */
{
static int dotMod = 20;
static int dot = 20;
if ((--dot <= 0) && verboseDotsEnabled())
    {
    verboseDot();
    dot = dotMod;
    numHappyDots++;
    }
}

void occassionalSleep()
/* Sleep every 1000 times this is called. */
{
static int dotMod = 600;
static int dot = 600;
if (--dot <= 0)
    {
    fflush(stdout);
    fflush(stderr);
    sleep(1);
    dot = dotMod;
    }
}


int checkOneJob(struct job *job, char *when, struct hash *hash, FILE *f)
/* Perform checks on one job if checks not already in hash. 
 * Returns number of errors. */
{
int errCount = 0;
struct check *check;

for (check = job->checkList; check != NULL; check = check->next)
    {
    if (sameWord(when, check->when))
	{
	errCount += doOneCheck(check, hash, f);
	occassionalDot();
	}
    }
return errCount;
}

void doChecks(struct jobDb *db, char *when)
/* Do checks on files where check->when matches when. */
{
int errCount = 0;
struct job *job;
struct hash *hash = newHash(19);

verbose(1, "Checking %sput files\n", when);
beginHappy();
for (job = db->jobList; job != NULL; job = job->next)
    errCount += checkOneJob(job, when, hash, stderr);
endHappy();
if (errCount > 0)
    errAbort("%d total errors in file check", errCount);
freeHashAndVals(&hash);
}

void writeBatch(struct jobDb *db, char *fileName)
/* Write every job in db to fileName, one comma-separated record per line.
 * Aborts if the file cannot be opened or a write error is detected. */
{
FILE *out = mustOpen(fileName, "w");
struct job *cur = db->jobList;
while (cur != NULL)
    {
    jobCommaOut(cur, out);
    fputc('\n', out);
    cur = cur->next;
    }
if (ferror(out))
    errAbort("error writing %s", fileName);
carefulClose(&out);
}

void readBookMark()
/* Load bookMark, the offset into the results file, from the bookmark file.
 * Meant to be called before the results file is read.  A bookmark file
 * left behind without a results file is deleted first.  If no bookmark
 * file can be opened, bookMark is left unchanged. */
{
struct lineFile *lf;
char *firstLine;

/* a bookmark should not exist if results file does not exist */
if ((!fileExists(resultsName)) && fileExists(bookMarkName))
    unlink(bookMarkName);

/* read bookmark position */
lf = lineFileMayOpen(bookMarkName, TRUE);
if (lf == NULL)
    return;
if (lineFileNext(lf, &firstLine, NULL))
    bookMark = sqlLongLong(firstLine);
lineFileClose(&lf);
}

void writeBookMark()
/* Store bookMark, the offset into the results file, in the bookmark file
 * as a single decimal line.  Meant to be called once the batch file has
 * been safely updated. */
{
unsigned long long offset = (unsigned long long) bookMark;
FILE *out = mustOpen(bookMarkName, "w");
fprintf(out, "%llu\n", offset);
carefulClose(&out);
}

void atomicWriteBatch(struct jobDb *db, char *fileName)
/* Write the batch to a temporary file whose name includes the process id
 * and host name, then rename it over fileName, so that two para processes
 * working in the same directory cannot leave a half-written batch file.
 * The bookmark is saved once the rename has succeeded. */
{
long startClock = clock1000();
char host[128];
char tmpPath[PATH_LEN];

/* generate a unique name for tmp file */
if (gethostname(host, sizeof(host)) < 0)
    errnoAbort("can't get host name");
safef(tmpPath, sizeof(tmpPath), "%s.%d.%s.tmp", fileName, getpid(), host);

writeBatch(db, tmpPath);

/* now rename (which is atomic) */
if (rename(tmpPath, fileName) < 0)
    errnoAbort("can't rename %s to %s", tmpPath, fileName);

writeBookMark();
verbose(2, "atomicWriteBatch time: %.2f seconds\n", (clock1000() - startClock) / 1000.0);
}

struct jobDb *readBatch(char *batch)
/* Load a batch file into a new jobDb.  Blank lines and lines starting with
 * '#' are skipped; every other line is parsed as one comma-separated job
 * record.  Jobs are kept in file order. */
{
long startClock = clock1000();
struct lineFile *lf = lineFileOpen(batch, TRUE);
struct jobDb *db;
char *line;

AllocVar(db);
for (;;)
    {
    struct job *job;
    if (!lineFileNext(lf, &line, NULL))
        break;
    line = skipLeadingSpaces(line);
    if (line[0] == 0 || line[0] == '#')
        continue;
    job = jobCommaIn(&line, NULL);
    slAddHead(&db->jobList, job);
    db->jobCount += 1;
    }
lineFileClose(&lf);
slReverse(&db->jobList);   /* restore file order after building with slAddHead */
verbose(1, "%d jobs in batch\n", db->jobCount);
verbose(2, "readBatch time: %.2f seconds\n", (clock1000() - startClock) / 1000.0);
return db;
}

char *hubSingleLineQuery(char *query)
/* Send query to the hub on localhost and hand back its one-line reply.
 * The caller should freeMem the result when done with it. */
{
char *reply = pmHubSingleLineQuery(query, "localhost");
return reply;
}

struct slName *hubMultilineQuery(char *query)
/* Send query to the hub on localhost and hand back its multi-line reply
 * as a list of strings, one per line. */
{
struct slName *replyLines = pmHubMultilineQuery(query, "localhost");
return replyLines;
}

boolean batchRunning(char *batchName)
/* Return TRUE if a batch is running.
 * Asks the hub for its batch list and looks for batchName in the last
 * column of each non-comment line.  Every line is validated, even after a
 * match is found; aborts if a line has too few columns or its last column
 * does not start with '/'. */
{
#define NUMLISTBATCHCOLUMNS 12
struct slName *lineList = hubMultilineQuery("listBatches"), *lineEl;
boolean ret = FALSE;
for (lineEl = lineList; lineEl != NULL; lineEl = lineEl->next)
    {
    char *line = lineEl->name;
    char *row[NUMLISTBATCHCOLUMNS];
    int wordCount;
    char *b;

    if (line[0] == '#')
        continue;
    wordCount = chopLine(line, row);
    /* Validate the column count before touching the last column: on a
     * short line that slot of row[] has not been filled in. */
    if (wordCount < NUMLISTBATCHCOLUMNS)
        errAbort("paraHub and para out of sync on listBatches");
    b = row[NUMLISTBATCHCOLUMNS-1];
    if (b[0] != '/')
        errAbort("paraHub and para out of sync on listBatches");
    if (sameString(b, batchName))
        ret = TRUE;
    }
slFreeList(&lineList);
return ret;
}

boolean thisBatchRunning()
/* Tell whether the batch named by batchDir is listed as running on the
 * hub. */
{
boolean running = batchRunning(batchDir);
return running;
}

struct jobDb *parseJobList(char *jobList)
/* Read a job list file into a new jobDb.  Each line is trimmed; empty
 * lines and lines starting with '#' are ignored, and every other line is
 * turned into a job by jobFromLine().  Jobs are kept in file order. */
{
struct lineFile *lf = lineFileOpen(jobList, TRUE);
struct jobDb *db;
char *line;

AllocVar(db);
for (;;)
    {
    struct job *job;
    if (!lineFileNext(lf, &line, NULL))
        break;
    line = trimSpaces(line);
    if (line == NULL || line[0] == 0 || line[0] == '#')
        continue;
    db->jobCount += 1;
    job = jobFromLine(lf, line);
    slAddHead(&db->jobList, job);
    }
lineFileClose(&lf);
slReverse(&db->jobList);   /* restore file order after building with slAddHead */
return db;
}

void sendSetPriorityMessage(int priority)
/* Tell hub to change priority on batch */
{
struct dyString *dy = dyStringNew(1024);
char *result;
if ((priority < 1) || (priority > MAX_PRIORITY))
    errAbort("Priority %d out of range, should be 1 to %d",priority,MAX_PRIORITY);
dyStringPrintf(dy, "setPriority %s %s %d", getUser(), resultsName, priority);
result = hubSingleLineQuery(dy->string);
dyStringFree(&dy);
if (result == NULL || sameString(result, "0"))
    errAbort("Couldn't set priority for %s\n", batchDir);
freez(&result);
verbose(1, "Told hub to set priority %d\n",priority);
}


void paraPriority(char *val)
/* Translate val into a numeric priority, store it in the global priority
 * and send it to the hub.  "high", "medium" and "low" (case-insensitive)
 * map to 1, NORMAL_PRIORITY and NORMAL_PRIORITY squared; anything else is
 * converted with atoi(). */
{
int chosen;
if (sameWord(val,"high"))
    chosen = 1;
else if (sameWord(val,"medium"))
    chosen = NORMAL_PRIORITY;
else if (sameWord(val,"low"))
    chosen = NORMAL_PRIORITY * NORMAL_PRIORITY;
else
    chosen = atoi(val);
priority = chosen;
sendSetPriorityMessage(priority);
}


void checkPrioritySetting()
/* Apply the batch priority from the command line, if given.  Both -pri
 * and -priority are honoured, in that order. */
{
char *shortForm = optionVal("pri", NULL);
if (shortForm != NULL)
    paraPriority(shortForm);

char *longForm = optionVal("priority", NULL);
if (longForm != NULL)
    paraPriority(longForm);
}

void sendSetMaxJobMessage(int maxJob)
/* Ask the hub to set the job limit of this batch (identified by user and
 * results file).  Aborts if maxJob is below -1, or if the hub gives no
 * reply or replies "-2". */
{
struct dyString *msg = dyStringNew(1024);
char *reply;

if (maxJob < -1)
    errAbort("maxJob %d out of range, should be >=-1", maxJob);
dyStringPrintf(msg, "setMaxJob %s %s %d", getUser(), resultsName, maxJob);
reply = hubSingleLineQuery(msg->string);
dyStringFree(&msg);
if (reply == NULL || sameString(reply, "-2"))
    errAbort("Couldn't set maxJob %d for %s", maxJob, batchDir);
freez(&reply);
verbose(1, "Told hub to set maxJob %d\n",maxJob);
}

void paraMaxJob(char *val)
/* Translate val into a job limit, store it in the global maxJob and send
 * it to the hub.  "unlimited" (case-insensitive) maps to -1; anything
 * else is converted with atoi(). */
{
maxJob = sameWord(val,"unlimited") ? -1 : atoi(val);
sendSetMaxJobMessage(maxJob);
}

void checkMaxJobSetting()
/* Apply the batch job limit from the command line, if given.  -maxJob is
 * looked at first, then -maxNode, which is kept for backwards
 * compatibility. */
{
char *limit = optionVal("maxJob", NULL);
if (limit != NULL)
    paraMaxJob(limit);

/* backwards compatibility */
char *oldLimit = optionVal("maxNode", NULL);
if (oldLimit != NULL)
    paraMaxJob(oldLimit);
}


void paraCreate(char *batch, char *jobList)
/* Build the batch database file 'batch' from the job list file 'jobList'.
 * Refuses to run while this batch is running on the hub.  Creates the
 * batch and error directories, runs the "in" checks, writes the batch to
 * "<batch>.bak" and then to batch itself, and finally applies any
 * priority and maxJob settings from the command line. */
{
struct jobDb *db;
char bakName[PATH_LEN];

if (thisBatchRunning())
    errAbort("This batch is currently running.  Please para stop first.");

makeDir(batchDir);
makeDir(errDir);

db = parseJobList(jobList);
doChecks(db, "in");

safef(bakName, sizeof(bakName), "%s.bak", batch);
atomicWriteBatch(db, bakName);
atomicWriteBatch(db, batch);
verbose(1, "%d jobs written to %s\n", db->jobCount, batch);

checkPrioritySetting();
checkMaxJobSetting();
}

void paraRecover(char *batch, char *jobList, char *newJobList)
/* Write to newJobList the original spec line of every job in jobList that
 * fails at least one of its "out" checks.  Check messages are discarded.
 * The batch argument is not used. */
{
struct hash *checkCache = newHash(19);
FILE *out = mustOpen(newJobList, "w");
struct jobDb *db = parseJobList(jobList);
struct job *cur;

beginHappy();
for (cur = db->jobList; cur != NULL; cur = cur->next)
    {
    int failures = checkOneJob(cur, "out", checkCache, NULL);
    if (failures != 0)
        fprintf(out, "%s\n", cur->spec);
    }
endHappy();
if (ferror(out))
    errAbort("write failed: %s", newJobList);
carefulClose(&out);
jobDbFree(&db);
freeHash(&checkCache);
}

boolean submitJob(struct job *job)
/* Attempt to submit job.
 * Sends an addJob2 message to the hub.  On success a new submission, marked
 * as in queue and carrying the id returned by the hub, is put at the head
 * of job->submissionList and TRUE is returned.  Returns FALSE, leaving the
 * job untouched, when the hub gives no reply or replies "0". */
{
struct dyString *cmd = dyStringNew(1024);
struct submission *sub;
char *jobId = NULL;

dyStringPrintf(cmd, "addJob2 %s %s /dev/null /dev/null %s %f %lld %s",
               getUser(), jobCwd, resultsName, job->cpusUsed, job->ramUsed, job->command);
pmCheckCommandSize(cmd->string, cmd->stringSize);
jobId = hubSingleLineQuery(cmd->string);
/* The reply may be NULL (other callers of hubSingleLineQuery test for it),
 * so it must be checked before being compared. */
if (jobId != NULL && sameString(jobId,"0"))
    {
    warn("addJob failed - if batch is bad, correct problem and run para clearSickNodes.");
    freez(&jobId);   /* also sets jobId to NULL, which signals failure below */
    }
if (jobId != NULL)
    {
    AllocVar(sub);
    slAddHead(&job->submissionList, sub);
    job->submissionCount += 1;
    sub->submitTime = time(NULL);
    sub->host = cloneString("n/a");
    sub->id = jobId;   /* the submission takes ownership of the id string */
    sub->inQueue = TRUE;
    sub->errFile = cloneString("n/a");
    }
dyStringFree(&cmd);
return jobId != NULL;
}

boolean killJob(char *jobId)
/* Tell hub to kill a job.  Return TRUE on
 * success. */
{
char buf[256];
char *result = NULL;
boolean ok;
snprintf(buf, sizeof(buf), "removeJob %s", jobId);
result = hubSingleLineQuery(buf);
ok = (result != NULL && sameString(result, "ok"));
freez(&result);
return ok;
}


void statusOutputChanged()
/* Abort with a message saying the status command's output no longer has
 * the layout markQueuedJobs expects. */
{
errAbort("\n%s output format changed, please update markQueuedJobs in para.c", 
    statusCommand);
}

int markQueuedJobs(struct jobDb *db)
/* Mark jobs that are queued up. Return total number of jobs in queue.
 * Queries the hub with pstat2 and uses the reply to set the inQueue and
 * running flags of every submission in db (both are cleared first).
 * For running jobs the start time and host are recorded; a job running
 * longer than killTime minutes (when killTime > 0) is marked hung and a
 * kill request is sent, otherwise one running longer than warnTime minutes
 * is marked slow.  Also picks up the "Sick Batch:" and "Results Size:"
 * lines, setting the globals sickBatch and resultsSize.
 * Aborts if the reply does not have the expected layout. */
{
struct hash *hash = newHash(max(12,digitsBaseTwo(db->jobCount)-3));

struct job *job;
struct submission *sub;
int queueSize = 0;

long killSeconds = killTime*60;
long warnSeconds = warnTime*60;
long duration;
time_t now;

long queryTime = clock1000();

/* Get job list from paraHub. */
struct dyString *dy = dyStringNew(1024);
dyStringPrintf(dy, "pstat2 %s %s", getUser(), resultsName);
struct slName *lineList = hubMultilineQuery(dy->string), *lineEl;
dyStringFree(&dy);
now = time(NULL);  /* taken after the query so it matches when the info was current */

verbose(2, "pstat2 time: %.2f seconds\n", (clock1000() - queryTime) / 1000.0);

long hashTime = clock1000();

verbose(2, "submission hash size: %d\n", hash->size);

/* Make hash of submissions based on id and clear flags. */
for (job = db->jobList; job != NULL; job = job->next)
    {
    for (sub = job->submissionList; sub != NULL; sub = sub->next)
        {
        hashAdd(hash, sub->id, sub);
        sub->running = FALSE;
        sub->inQueue = FALSE;
        }
    }

verbose(2, "submission hash time: %.2f seconds\n", (clock1000() - hashTime) / 1000.0);

long pstatListTime = clock1000();

/* Read status output. */
for (lineEl = lineList; lineEl != NULL; lineEl = lineEl->next)
    {
    int wordCount;
    char *line = lineEl->name;
    char *row[6];
    if (startsWith("Total Jobs:", line))
        {
        wordCount = chopLine(line, row);
        if (wordCount < 3)   /* the count is the third word */
            statusOutputChanged();
        queueSize = sqlSigned(row[2]);
        verbose(1, "%d jobs (including everybody's) in Parasol queue or running.\n", queueSize);
        continue;
        }
    if (startsWith("Sick Batch:", line))
        {
        sickBatch = TRUE;
        warn("%s", line);
        continue;
        }
    if (startsWith("Results Size:", line))
        {
        wordCount = chopLine(line, row);
        if (wordCount < 3)   /* the size is the third word */
            statusOutputChanged();
        resultsSize = sqlLongLong(row[2]);
        continue;
        }
    /* Anything else is a job line: either 2 words (state, id) or at
     * least 6 (state, id, two unused here, start ticks, host). */
    wordCount = chopLine(line, row);
    if (wordCount < 6 && wordCount != 2)
        {
        warn("Expecting at least 6 words in pstat2 output,"
             " found %d. paraHub and para out of sync.", wordCount);
        statusOutputChanged();
        }
    else
        {
        char *state = row[0], *jobId = row[1], *ticks = NULL, *host = NULL;
        time_t t = 0;
        if (wordCount > 2)
            {
            ticks = row[4], host = row[5];
            t = atol(ticks);
            }
        /* Jobs the hub lists that are not in this db are ignored. */
        if ((sub = hashFindVal(hash, jobId)) != NULL)
            {
            if (sameString(state, "r"))
                {
                sub->running = TRUE;
                sub->startTime = t;
                if (sub->host)
                    freeMem(sub->host);
                sub->host = cloneString(host);
                duration = now - sub->startTime;
                if (duration < 0)
                    warn("Strange start time in jobId %s: %u (now=%u)", jobId, sub->startTime, (unsigned int) now);
                else
                    {
                    if (duration > killSeconds && killSeconds > 0)
                        {
                        sub->hung = TRUE;
                        sub->slow = FALSE;
                        /* A job being killed no longer counts as running;
                         * figureState() tests running before hung, so
                         * leaving it set would hide the hung state. */
                        sub->running = FALSE;
                        sub->endTime = now;
                        killJob(sub->id);
                        verbose(1, "killed hung jobId: %s\n", sub->id);
                        }
                    else if (duration > warnSeconds)
                        sub->slow = TRUE;
                    }
                }
            else
                {
                sub->inQueue = TRUE;
                }
            }
        }
    }
verbose(2, "pstat list time: %.2f seconds\n", (clock1000() - pstatListTime) / 1000.0);

slFreeList(&lineList);

freeHash(&hash);
verbose(2, "markQueuedJobs time (includes pstat2, hash, list): %.2f seconds\n", (clock1000() - queryTime) / 1000.0);
return queueSize;
}

struct hash *hashResults(char *fileName)
/* Return hash of job results keyed on id as string. */
{
long time = clock1000();
struct hash *hash = newHash(0);
if (fileExists(fileName))
    {
    struct jobResult *el, *list = jobResultLoadAll(fileName, &bookMark, resultsSize);
    for (el = list; el != NULL; el = el->next)
	hashAdd(hash, el->jobId, el);
    }
verbose(2, "hashResults time: %.2f seconds\n", (clock1000() - time) / 1000.0);
return hash;
}

void freeResults(struct hash **pHash)
/* Free a results hash together with every jobResult stored in it.  Does
 * nothing when *pHash is NULL. */
{
struct hash *results = *pHash;
struct hashEl *elList, *hel;

if (results == NULL)
    return;
elList = hashElListHash(results);
for (hel = elList; hel != NULL; hel = hel->next)
    {
    struct jobResult *jr = hel->val;
    jobResultFree(&jr);
    }
hashElFreeList(&elList);
hashFree(pHash);
}


boolean isStatusOk(int status)
/* Convert wait() return status to return value. */
{
return WIFEXITED(status) && (WEXITSTATUS(status) == 0);
}

double jrCpuTime(struct jobResult *jr)
/* CPU time of a job in seconds: user plus system ticks, scaled by 0.01. */
{
double seconds = 0.01 * (jr->usrTicks + jr->sysTicks);
return seconds;
}

double subRealTime(struct submission *sub)
/* Wall-clock time of a submission in seconds (end minus start).  The
 * result is negative if the end time precedes the start time. */
{
/* The time fields are unsigned; converting each to double before
 * subtracting keeps a backwards-moving clock detectable. */
double ended = (double)sub->endTime;
double started = (double)sub->startTime;
return ended - started;
}

void showSickNodes(boolean showSummary)
/* Ask the hub for the sick nodes of this batch and print the reply.
 * With showSummary FALSE every line is printed.  With showSummary TRUE
 * only the final line (the totals) is printed, and only when the reply
 * has more than one line. */
{
struct dyString *msg = dyStringNew(1024);
struct slName *lines, *cur;
int lineNo = 0;

dyStringPrintf(msg, "showSickNodes %s %s", getUser(), resultsName);
lines = hubMultilineQuery(msg->string);
for (cur = lines; cur != NULL; cur = cur->next)
    {
    boolean isLast = (cur->next == NULL);
    ++lineNo;
    if (!showSummary || (isLast && lineNo != 1))
        printf("%s\n", cur->name);
    }
slFreeList(&lines);
dyStringFree(&msg);
}


void markRunJobStatus(struct jobDb *db)
/* Mark jobs based on results output file.
 * Only the head submission of each job is considered, and only when none
 * of its queueError, inQueue, crashed, hung, running or ranOk flags is set.
 * If a result with the submission's id is found, its times, status, host
 * and error file are copied over and the submission is marked ranOk (exit
 * status ok and all "out" checks pass) or crashed.  If no result is found
 * the submission gets trackingError 3, unless the batch is sick. */
{
struct hash *checkHash = newHash(0);
struct hash *resultsHash = hashResults(resultsName);
long startClock = clock1000();
struct job *job;

verbose(1, "Checking finished jobs\n");
beginHappy();
for (job = db->jobList; job != NULL; job = job->next)
    {
    struct submission *sub = job->submissionList;
    struct jobResult *jr;

    if (sub == NULL)
        continue;
    /* Skip anything already classified; what is left is either running
     * or possibly finished. */
    if (sub->queueError || sub->inQueue || sub->crashed ||
            sub->hung || sub->running || sub->ranOk)
        continue;

    jr = hashFindVal(resultsHash, sub->id);
    if (jr == NULL)
        {
        if (!sickBatch)
            sub->trackingError = 3;
        continue;
        }

    sub->startTime = jr->startTime;
    sub->endTime = jr->endTime;
    sub->cpuTime = jrCpuTime(jr);
    sub->status = jr->status;
    sub->gotStatus = TRUE;
    if (sub->host)
        freeMem(sub->host);
    sub->host = cloneString(jr->host);
    sub->inQueue = FALSE;
    sub->running = FALSE;
    sub->hung = FALSE;
    sub->slow = FALSE;
    if (sub->errFile)
        freeMem(sub->errFile);
    sub->errFile = cloneString(jr->errFile);
    /* The output checks are only run when the exit status is ok. */
    if (isStatusOk(sub->status) && checkOneJob(job, "out", checkHash, stderr) == 0)
        sub->ranOk = TRUE;
    else
        sub->crashed = TRUE;
    }
endHappy();
freeHash(&checkHash);
freeResults(&resultsHash);
verbose(2, "markRunJobStatus time (includes hashResults): %.2f seconds\n", (clock1000() - startClock) / 1000.0);
}

boolean needsRerun(struct submission *sub)
/* Return TRUE if submission needs to be rerun. */
{
if (sub == NULL)
    return TRUE;
return sub->submitError || sub->queueError || sub->crashed || sub->trackingError || sub->hung;
}

struct jobDb *paraCycle(char *batch)
/* Cycle forward through batch.  Return database. */
{

struct jobDb *db = readBatch(batch);
struct job *job;
int queueSize;
int pushCount = 0, retryCount = 0;
int tryCount;
boolean finished = FALSE;
long time = clock1000();

queueSize = markQueuedJobs(db);

markRunJobStatus(db);

beginHappy();
if (!sickBatch)
    for (tryCount=1; tryCount<=retries && !finished; ++tryCount)
	{
	for (job = db->jobList; job != NULL; job = job->next)
	    {
	    if (job->submissionCount < tryCount && 
	       (job->submissionList == NULL || needsRerun(job->submissionList)))
		{
		if (!submitJob(job))
		    {
		    finished = TRUE;
		    break;
		    }
		occassionalDot();
		// occassionalSleep();
		if (delayTime > 0)
		    {
		    atomicWriteBatch(db, batch);
		    sleep(delayTime);
		    }
		++pushCount;
		if (tryCount > 1)
		    ++retryCount;
		if (pushCount >= maxPush)
		    {
		    finished = TRUE;
		    break;
		    }
		if (pushCount + queueSize >= maxQueue && pushCount >= minPush)
		    {
		    finished = TRUE;
		    break;
		    }
		}
	    }
	}
endHappy();
atomicWriteBatch(db, batch);
verbose(1, "updated job database on disk\n");
if (pushCount > 0)
    verbose(1, "Pushed Jobs: %d\n", pushCount);
if (retryCount > 0)
    verbose(1, "Retried jobs: %d\n", retryCount);
verbose(2, "paraCycle time: %.2f seconds\n", (clock1000() - time) / 1000.0);
return db;
}

void paraPush(char *batch)
/* Run a single push cycle on the batch and free the resulting database. */
{
struct jobDb *db;

db = paraCycle(batch);
jobDbFree(&db);
}


void clearSickNodes()
/* Tell hub to clear sick nodes on batch.  Aborts if the hub gives no
 * reply or a reply other than "0". */
{
struct dyString *dy = dyStringNew(1024);
char *result;
dyStringPrintf(dy, "clearSickNodes %s %s", getUser(), resultsName);
result = hubSingleLineQuery(dy->string);
dyStringFree(&dy);
/* Check for NULL before comparing, as the other hub queries in this
 * file do, so a missing reply is reported instead of dereferenced. */
if (result == NULL || !sameString(result, "0"))
    errAbort("Couldn't clear sick nodes for %s", batchDir);
freez(&result);
verbose(1, "Told hub to clear sick nodes\n");
}


void paraShove(char *batch)
/* Push batch of jobs and keep pushing until it's finished.
 * Between cycles this sleeps, starting at 15 seconds and growing by 15
 * seconds per cycle until it reaches 5 minutes.  Aborts if a job has
 * used up its retries and is not unsubmitted, queued or running.  If
 * the batch is flagged sick, sleeps SICKBATCHSLEEP seconds, clears the
 * sick nodes and retries, giving up after MAXSICKBATCHTRIES sick
 * cycles in a row. */
{
struct jobDb *db;
struct job *job;
struct submission *sub;
int maxSleep = 5*60;
int curSleep = 15;
time_t start = time(NULL), now;
int sickBatchTries = 0;
#define MAXSICKBATCHTRIES 3
/* Parenthesized so the macro is safe inside larger expressions. */
#define SICKBATCHSLEEP (10*60)

for (;;)
    {
    boolean anyUnfinished = FALSE;
    db = paraCycle(batch);
    for (job = db->jobList; job != NULL; job = job->next)
        {
        if ((sub = job->submissionList) == NULL)
            anyUnfinished = TRUE;
        else
            {
            enum jaState state = figureState(job);
            if (job->submissionCount >= retries)
                {
                /* Out of retries and not still pending: give up on the batch. */
                if (state != jaUnsubmitted && state != jaQueued && state != jaRunning)
                    errAbort("Batch failed after %d tries on %s", retries, job->command);
                }
            if (state != jaFinished)
                anyUnfinished = TRUE;
            }
        }
    jobDbFree(&db);
    if (!anyUnfinished)
        break;
    showSickNodes(TRUE);
    fflush(stdout);
    fflush(stderr);
    if (sickBatch)
        {
        ++sickBatchTries;
        if (sickBatchTries >= MAXSICKBATCHTRIES)
            break;
        warn("Sick batch! will sleep %d minutes, clear sick nodes and retry", SICKBATCHSLEEP/60);
        sleep(SICKBATCHSLEEP);
        sickBatch = FALSE;
        clearSickNodes();
        }
    else
        {
        sickBatchTries = 0;
        sleep(curSleep);
        if (curSleep < maxSleep)
            curSleep += 15;
        }
    now = time(NULL);
    verbose(1, "================\n");
    verbose(1, "Checking job status %d minutes after launch\n",  round((now-start)/60.0));
    }
if (sickBatch)
    errAbort("Sick batch! Correct problem and then run para clearSickNodes.");
else
    verbose(1, "Successful batch!\n");
}

void paraMake(char *batch, char *spec)
/* Create the batch from the job spec, then shove it. */
{
paraCreate(batch, spec);
paraShove(batch);
}

void reportOnJobs(struct jobDb *db)
/* Print a count of jobs in each state, based on the submission at the
 * head of each job's submission list.  Only non-zero counts are shown,
 * followed by the total number of jobs. */
{
int submitError = 0, queueError = 0, trackingError = 0;
int inQueue = 0, running = 0, slow = 0, hung = 0;
int crashed = 0, ranOk = 0, failed = 0;
int unsubmitted = 0, total = 0;
struct job *job;

for (job = db->jobList; job != NULL; job = job->next)
    {
    struct submission *sub = job->submissionList;   /* Most recent submission if any. */
    ++total;
    if (sub == NULL)
        {
        ++unsubmitted;
        continue;
        }
    if (sub->submitError)
        ++submitError;
    if (sub->queueError)
        ++queueError;
    if (sub->trackingError)
        ++trackingError;
    if (sub->inQueue)
        ++inQueue;
    if (sub->crashed)
        ++crashed;
    if (sub->slow)
        ++slow;
    if (sub->hung)
        ++hung;
    if (sub->running)
        ++running;
    if (sub->ranOk)
        ++ranOk;
    /* A job counts as failed once its retries are used up and it
     * still needs a rerun. */
    if (job->submissionCount >= retries && needsRerun(sub))
        ++failed;
    }

if (unsubmitted > 0)
    printf("unsubmitted jobs: %d\n", unsubmitted);
if (submitError > 0)
    printf("submission errors: %d\n", submitError);
if (queueError > 0)
    printf("queue errors: %d\n", queueError);
if (trackingError > 0)
    {
    if (fileExists(resultsName))
        printf("tracking errors: %d\n", trackingError);
    else
        printf("%s: file not found.  paraHub can't write to this dir?\n", resultsName);
    }
if (inQueue > 0)
    printf("queued and waiting: %d\n", inQueue);
if (crashed > 0)
    printf("crashed: %d\n", crashed);
if (slow > 0)
    printf("slow (> %d minutes): %d\n", warnTime, slow);
if (hung > 0)
    printf("hung (> %d minutes): %d\n", killTime, hung);
if (running > 0)
    printf("running: %d\n", running);
if (ranOk > 0)
    printf("ranOk: %d\n", ranOk);
if (failed > 0)
    printf("failed %d times: %d\n", retries, failed);
printf("total jobs in batch: %d\n", total);
}

void paraListFailed(char *batch)
/* Print the spec of every job that has been submitted at least
 * 'retries' times and still needs a rerun. */
{
struct jobDb *db = readBatch(batch);
struct job *job;

markQueuedJobs(db);
markRunJobStatus(db);
for (job = db->jobList; job != NULL; job = job->next)
    {
    struct submission *latest = job->submissionList;
    if (latest == NULL)
        continue;
    if (job->submissionCount >= retries && needsRerun(latest))
        printf("%s\n", job->spec);
    }
}

void paraListState(char *batch, enum jaState targetState)
/* Print the spec of every job whose current state equals targetState. */
{
struct jobDb *db = readBatch(batch);
struct job *job;

markQueuedJobs(db);
markRunJobStatus(db);
job = db->jobList;
while (job != NULL)
    {
    if (figureState(job) == targetState)
        printf("%s\n", job->spec);
    job = job->next;
    }
}

struct submission* getLastSubmission(struct job *job)
/* Return the final element of the job's submission list, or NULL if
 * the list is empty. */
{
struct submission *cur = job->submissionList;
if (cur != NULL)
    {
    while (cur->next != NULL)
        cur = cur->next;
    }
return cur;
}

/* Header line for job status output: tab-separated column names,
 * in the same order as the fields printed by paraJobStatus(). */
static char *jobStatusHdr =
    "#state\ttries\treal\tcpu\thost\tjobid\tcmd\n";

void paraJobStatus(struct job *job, time_t now)
/* Print one tab-separated status line for a job: state, submission
 * count, real time, cpu time, host, job id and command.  Tabs in the
 * job's command are replaced by spaces in place. */
{
enum jaState state = figureState(job);
char *stateStr = jaStateShortDesc[state];
double realTime = 0.0, cpuTime = 0.0;
struct submission *sub = getLastSubmission(job);
char *jobId = "", *host = "";

if (sub != NULL)
    {
    host = sub->host;
    if (sub->trackingError)
        stateStr = "track";
    if (state != jaRunning)
        {
        realTime = subRealTime(sub);
        cpuTime = sub->cpuTime;
        jobId = sub->id;
        }
    else
        {
        /* Still running: report elapsed time so far. */
        realTime = (now - sub->startTime);
        }
    }

subChar(job->command, '\t', ' ');  /* tabs not allowed in column */

printf("%s\t%d\t%0.2f\t%0.2f\t%s\t%s\t%s\n",
       stateStr, job->submissionCount, realTime, cpuTime,
       host, jobId, job->command);
}

void paraStatus(char *batch)
/* Print the status header followed by one status line per job. */
{
struct jobDb *db = readBatch(batch);
time_t now = time(NULL);
struct job *job;

markQueuedJobs(db);
markRunJobStatus(db);
printf("%s", jobStatusHdr);
job = db->jobList;
while (job != NULL)
    {
    paraJobStatus(job, now);
    job = job->next;
    }
}

void printErrFile(struct submission *sub)
/* Print the submission's error file to stdout under a "stderr:"
 * heading.  The file is looked for as errDir/<id>; if it is not there
 * a fetch from the submission's host is attempted first.  Prints
 * nothing if the file still does not exist. */
{
char localName[PATH_LEN];
char *contents;
size_t contentSize;

safef(localName, sizeof(localName), "%s/%s", errDir, sub->id);
if (!fileExists(localName))
    pmFetchFile(sub->host, sub->errFile, localName);
if (!fileExists(localName))
    return;
printf("stderr:\n");
readInGulp(localName, &contents, &contentSize);
mustWrite(stdout, contents, contentSize);
freez(&contents);
}

void problemReport(struct job *job, struct submission *sub, char *type)
/* Print report on one problem. */
{
struct check *check;
struct hash *hash = newHash(0);

printf("job: %s\n", job->command);
printf("id: %s\n", sub->id);
printf("failure type: %s\n", type);
if (sameString(type, "crash"))
    {
    time_t startTime = sub->startTime;
    printf("host: %s\n", sub->host);
    printf("start time: %s", ctime(&startTime)); /* ctime adds \n */
    printf("return: ");
    if (WIFEXITED(sub->status))
	printf("%d\n", WEXITSTATUS(sub->status));
    else if (WIFSIGNALED(sub->status))
	printf("signal %d\n", WTERMSIG(sub->status));
    else if (WIFSTOPPED(sub->status))
	printf("stopped %d\n", WSTOPSIG(sub->status));
    else 
	printf("unknown wait status %d\n", sub->status);
    for (check = job->checkList; check != NULL; check = check->next)
	doOneCheck(check, hash, stdout);
    printErrFile(sub);
    }
printf("\n");
hashFree(&hash);
}

void paraProblems(char *batch)
/* Report on every submission of every job that had a problem (even if
 * the job was later rerun successfully), then print the total.  Each
 * submission is classified as the first match of: hung, slow,
 * tracking error, crash. */
{
struct jobDb *db = readBatch(batch);
struct job *job;
struct submission *sub;
int problemCount = 0;

markQueuedJobs(db);
markRunJobStatus(db);
for (job = db->jobList; job != NULL; job = job->next)
    {
    for (sub = job->submissionList; sub != NULL; sub = sub->next)
        {
        char *kind = NULL;
        if (sub->hung)
            kind = "hung";
        else if (sub->slow)
            kind = "slow";
        else if (sub->trackingError)
            kind = "tracking error";
        else if (needsRerun(sub))
            kind = "crash";
        if (kind == NULL)
            continue;
        problemReport(job, sub, kind);
        ++problemCount;
        }
    }
printf("%d problems total\n", problemCount);
}

void runningReport(struct job *job, struct submission *sub)
/* Print report on one running job. */
{
time_t startTime = sub->startTime;
int duration = time(NULL) - startTime;

printf("command: %s\n", job->command);
printf("jobId: %s\n", sub->id);
printf("host: %s\n", sub->host);
printf("start time: %s", ctime(&startTime)); /* ctime adds \n */
printf("run time so far: %d sec,  %4.2f min, %4.2f hours,  %4.2f days\n", 
	duration, duration/60.0, duration/3600.0,  duration/(3600.0*24.0));
if (sub->slow)
  printf("This is a slow job, running time > %d minutes.\n", warnTime);
printf("\n");
}

void paraRunning(char *batch, char *minTimeS)
/* Report on running jobs, with host and time info.  If minTimeS is
 * non-NULL it is parsed as a number of minutes, and only jobs that
 * have been running longer than that are listed. */
{
struct jobDb *db = readBatch(batch);
struct job *job;
int runCount = 0;
int minTime = (minTimeS != NULL) ? atoi(minTimeS) : 0;

markQueuedJobs(db);
markRunJobStatus(db);
for (job = db->jobList; job != NULL; job = job->next)
    {
    struct submission *sub = job->submissionList;
    time_t began;
    int elapsed;
    if (sub == NULL || !sub->running)
        continue;
    began = sub->startTime;
    elapsed = time(NULL) - began;
    if (elapsed > (minTime * 60))
        {
        runningReport(job, sub);
        ++runCount;
        }
    }
printf("total %s running: %d\n", minTimeS ? "hippos" : "jobs", runCount);
}


void sendChillMessage()
/* Send a "chill" request for this batch to the hub.  Aborts unless
 * the hub replies "ok". */
{
struct dyString *msg = dyStringNew(1024);
char *reply;

dyStringPrintf(msg, "chill %s %s", getUser(), resultsName);
reply = hubSingleLineQuery(msg->string);
dyStringFree(&msg);
if (reply == NULL)
    errAbort("Couldn't chill %s\n", batchDir);
if (!sameString(reply, "ok"))
    errAbort("Couldn't chill %s\n", batchDir);
freez(&reply);
verbose(1, "Told hub to chill out\n");
}

void paraResetCounts()
/* Ask the hub to reset the done and crashed counts on this batch.
 * Aborts if there is no reply or the reply is "-2". */
{
struct dyString *msg = dyStringNew(1024);
char *reply;

dyStringPrintf(msg, "resetCounts %s %s", getUser(), resultsName);
reply = hubSingleLineQuery(msg->string);
dyStringFree(&msg);
if (reply == NULL)
    errAbort("Couldn't reset done and crashed counts on batch %s\n", batchDir);
if (sameString(reply, "-2"))
    errAbort("Couldn't reset done and crashed counts on batch %s\n", batchDir);
freez(&reply);
verbose(1, "Told hub to reset done and crashed counts on batch %s\n", batchDir);
}


void freeBatch()
/* Send msg to hub to free all resources related to this batch.
 * Hub replies handled here: "-3" user not found (abort), "-2" batch
 * not found (abort), "-1" jobs still queued or running (warning),
 * "0" batch freed. */
{
struct dyString *msg = dyStringNew(1024);
char *reply;

dyStringPrintf(msg, "freeBatch %s %s", getUser(), resultsName);
reply = hubSingleLineQuery(msg->string);
dyStringFree(&msg);
verbose(1, "Told hub to free all batch-related resources\n");
if (reply == NULL)
    errAbort("result == NULL");
if (sameOk(reply, "-3"))
    errAbort("User not found.");
else if (sameOk(reply, "-2"))
    errAbort("Batch not found.");
else if (sameOk(reply, "-1"))
    warn("Unable to free batch.  Jobs are queued or running.");
else if (sameOk(reply, "0"))
    verbose(1, "Batch freed.\n");
freez(&reply);
}


void flushResults()
/* Send msg to hub to flush the results file of this batch.
 * Hub replies handled here: "-3" user not found (abort), "-2" batch
 * not found (abort), "-1" flushed but jobs still queued or running
 * (warning), "0" flushed. */
{
struct dyString *msg = dyStringNew(1024);
char *reply;

dyStringPrintf(msg, "flushResults %s %s", getUser(), resultsName);
reply = hubSingleLineQuery(msg->string);
dyStringFree(&msg);
verbose(1, "Told hub to flush the results file\n");
if (reply == NULL)
    errAbort("result == NULL");
if (sameOk(reply, "-3"))
    errAbort("User not found.");
else if (sameOk(reply, "-2"))
    errAbort("Batch not found.");
else if (sameOk(reply, "-1"))
    warn("Flushed results. Some jobs are still queued or running.");
else if (sameOk(reply, "0"))
    verbose(1, "Flushed results.\n");
freez(&reply);
}



void paraCheck(char *batch)
/* Check on progress of a batch: refresh job status, print the job
 * report, save the database, and show the sick node summary.  Aborts
 * if the batch is flagged sick. */
{
struct jobDb *db;

db = readBatch(batch);
markQueuedJobs(db);
markRunJobStatus(db);
reportOnJobs(db);
atomicWriteBatch(db, batch);

showSickNodes(TRUE);
if (!sickBatch)
    return;
errAbort("Sick batch! Correct problem and then run para clearSickNodes.");
}



int cleanTrackingErrors(struct jobDb *db)
/* For each job whose head submission has a tracking error, unlink
 * that submission and decrement the job's submission count.
 * Returns the number of submissions removed. */
{
int removed = 0;
struct job *job;

for (job = db->jobList; job != NULL; job = job->next)
    {
    struct submission *head = job->submissionList;
    if (head == NULL || !head->trackingError)
        continue;
    job->submissionList = head->next;
    job->submissionCount -= 1;
    ++removed;
    }
verbose(1, "Chilled %d jobs\n", removed);
return removed;
}

void removeChilledSubmissions(char *batch)
/* After a chill request, drop the submissions that now show a
 * tracking error from the job database and save it. */
{
struct jobDb *db;

db = readBatch(batch);
markQueuedJobs(db);
markRunJobStatus(db);
cleanTrackingErrors(db);    /* The returned count is not needed here. */
atomicWriteBatch(db, batch);
}

void paraChill(char *batch)
/* Tell the system not to launch more jobs in this batch, without
 * stopping jobs that are already running. */
{
sendChillMessage();
removeChilledSubmissions(batch);
}

void paraStopAll(char *batch)
/* Stop batch of jobs: remove submissions with tracking errors, then
 * try to kill every head submission that is queued or running.  A
 * killed submission is unlinked and the job's submission count
 * decremented.  The database is written back at the end. */
{
struct jobDb *db = readBatch(batch);
struct job *job;
int killCount = 0, missCount = 0;

markQueuedJobs(db);
markRunJobStatus(db);
cleanTrackingErrors(db);
for (job = db->jobList; job != NULL; job = job->next)
    {
    struct submission *head = job->submissionList;
    if (head == NULL)
        continue;
    if (!head->inQueue && !head->running)
        continue;
    if (!killJob(head->id))
        {
        missCount += 1;
        continue;
        }
    job->submissionCount -= 1;
    job->submissionList = head->next;
    killCount += 1;
    }
verbose(1, "%d running jobs stopped\n", killCount);
if (missCount > 0)
    printf("%d jobs not stopped - try another para stop in a little while\n",
        missCount);
atomicWriteBatch(db, batch);
}

void paraStop(char *batch)
/* Stop batch of jobs: chill the batch first, then stop what remains. */
{
/* Chilling first removes the waiting jobs, which is faster. */
sendChillMessage();
paraStopAll(batch);
}

void printTimes(char *title, double seconds,  boolean showYears)
/* Print one line with the title and the time given in seconds,
 * minutes, hours and days, plus years when showYears is set. */
{
double minutes = seconds/60;
double hours = seconds/3600;
double days = seconds/(3600*24);

printf("%-27s %9llds %10.2fm %8.2fh %7.2fd",
   title, roundll(seconds), minutes, hours, days);
if (showYears)
    {
    double years = seconds/(3600*24*365);
    printf(" %6.3f y", years);
    }
printf("\n");
}

long calcFirstToLast(struct jobDb *db)
/* Return seconds from the earliest submit time to the latest end
 * time, looking at the head submission of each job.  If no usable end
 * time is found, the current time is used in its place.  Submit and
 * end times that are not earlier than the current time are ignored. */
{
struct job *job;
long now = time(NULL);
long firstSub = BIGNUM, lastEnd = 0;
boolean haveFirst = FALSE, haveEnd = FALSE;

for (job = db->jobList; job != NULL; job = job->next)
    {
    struct submission *sub = job->submissionList;
    long subTime, endTime;
    if (sub == NULL)
        continue;

    subTime = sub->submitTime;
    /* Protect against wacked out clocks.  The first valid submit time
     * is always taken; later ones only when earlier than it. */
    if (subTime != 0 && subTime < now && (!haveFirst || subTime < firstSub))
        {
        firstSub = subTime;
        haveFirst = TRUE;
        }

    if (sub->endTime == 0)
        continue;
    endTime = sub->endTime;
    /* Protect against wacked out clocks. */
    if (endTime < now && endTime > lastEnd)
        {
        lastEnd = endTime;
        haveEnd = TRUE;
        }
    }
return (haveEnd ? lastEnd : now) - firstSub;
}


void paraRam(char *batch, char *val)
/* Set ramUsed on every job in the batch to the parsed value of val
 * and save the database.  Shows usage if val parses to -1. */
{
long long ramBytes = paraParseRam(val);
struct jobDb *db;
struct job *job;

if (ramBytes == -1)
    usage();
db = readBatch(batch);
job = db->jobList;
while (job != NULL)
    {
    job->ramUsed = ramBytes;
    job = job->next;
    }
atomicWriteBatch(db, batch);
}

void paraCpu(char *batch, char *val)
/* Set cpusUsed on every job in the batch to the parsed value of val
 * and save the database.  Shows usage if the value is negative. */
{
float cpuCount = sqlFloat(val);
struct jobDb *db;
struct job *job;

if (cpuCount < 0)
    usage();
db = readBatch(batch);
job = db->jobList;
while (job != NULL)
    {
    job->cpusUsed = cpuCount;
    job = job->next;
    }
atomicWriteBatch(db, batch);
}

void paraTimes(char *batch)
/* Report times of run.  Looks at the head submission of each job and
 * prints counts of completed, running, crashed, queued and other
 * jobs, total CPU time and IO/wait time for finished jobs, time spent
 * in running jobs, average and longest job times, time from first
 * submission to last finish, and an estimate of the time remaining.
 * The database is written back at the end. */
{
struct jobDb *db = readBatch(batch);
double totalCpu = 0, totalWall = 0;
double oneWall, longestWall = 0;
struct job *job;
struct submission *sub;
int jobCount = 0;
int runningCount = 0;
int timedCount = 0;
int crashCount = 0;
int queueCount = 0;
double runTime = 0, longestRun = 0;
int otherCount = 0;
long now = time(NULL);
double ioTime;

markQueuedJobs(db);
markRunJobStatus(db);
for (job = db->jobList; job != NULL; job = job->next)
    {
    ++jobCount;
    if ((sub = job->submissionList) != NULL)
        {
        if (sub->running)
           {
           int oneTime = now - sub->startTime;
           if (oneTime < 0)
               warn("Strange start time in %s: %u", batch, sub->startTime);
           else
               runTime += oneTime;
           if (oneTime > longestRun)
             longestRun = oneTime;
           ++runningCount;
           }
        else if (sub->inQueue)
           {
           ++queueCount;
           }
        else  if (sub->crashed || sub->hung)
           {
           ++crashCount;
           }
        else if (sub->ranOk)
           {
           ++timedCount;
           totalCpu += sub->cpuTime;
           oneWall = subRealTime(sub);
           if (oneWall < 0)	/* Protect against clock reset. */
               {
               warn("End before start job %s host %s", sub->id, sub->host);
               warn("Start %u,  End %u", sub->startTime, sub->endTime);
               /* Fall back to this job's own CPU time.  Using the
                * running total here would add the CPU time of all
                * earlier jobs to this one job's wall time. */
               oneWall = sub->cpuTime;
               }
           totalWall += oneWall;
           if (oneWall > longestWall)
               {
               longestWall = oneWall;
               }
           }
       else
           {
           ++otherCount;
           }
        }
    }

printf("Completed: %d of %d jobs\n", timedCount, jobCount);
if (runningCount > 0)
    printf("Jobs currently running: %d\n", runningCount);
if (crashCount > 0)
    printf("Crashed: %d jobs\n", crashCount);
if (otherCount > 0)
    {
    if (!fileExists(resultsName))
        printf("%s: file not found.  paraHub can't write to this dir?\n", resultsName);
    else
        printf("Other count: %d jobs\n", otherCount);
    }
if (queueCount > 0)
    printf("In queue waiting: %d jobs\n", queueCount);
printTimes("CPU time in finished jobs:", totalCpu, TRUE);
/* IO and wait time is wall time not accounted for by CPU time,
 * clamped at zero. */
ioTime = totalWall - totalCpu;
if (ioTime < 0) ioTime = 0;
printTimes("IO & Wait Time:", ioTime, TRUE);
if (runningCount > 0)
    {
    printTimes("Time in running jobs:", runTime, TRUE);
    }
if (timedCount > 0)
    {
    printTimes("Average job time:", totalWall/timedCount, FALSE);
    if (runningCount > 0)
        printTimes("Longest running job:", longestRun, FALSE);
    printTimes("Longest finished job:", longestWall, FALSE);
    printTimes("Submission to last job:", calcFirstToLast(db), FALSE);
    if (runningCount < 1)
        printTimes("Estimated complete:", 0.0, FALSE);
    else
        {
        /* Estimate: average finished-job wall time, multiplied by the
         * number of unfinished jobs per currently running job. */
        int jobsInBatch = db->jobCount;
        int jobsToRun = jobsInBatch - timedCount;
        double timeMultiple = (double)jobsToRun / (double)runningCount;
        verbose(2, "inBatch: %d, toRun: %d, multiple: %.3f\n",
                jobsInBatch, jobsToRun, timeMultiple);
        printTimes("Estimated complete:",
                   timeMultiple*totalWall/timedCount, FALSE);
        }
    }
atomicWriteBatch(db, batch);
}

static char *determineCwdOptDir(char *optName)
/* determine batch or job directory from cwd and option */
{
char path[PATH_LEN];
char *opt = optionVal(optName, NULL);
if (opt == NULL)
    safecpy(path, sizeof(path), getCurrentDir());
else if (opt[0] != '/')
    {
    safecpy(path, sizeof(path), getCurrentDir());
    safecat(path, sizeof(path), "/");
    safecat(path, sizeof(path), opt);
    }
else
    safecpy(path, sizeof(path), opt);
// remove trailing "/" or "/." from name.  While the whole path should be normalized,
// this at least removes a common difference when using file name completion.
if (endsWith(path, "/."))
    path[strlen(path)-2] = '\0';
if (endsWith(path, "/"))
    path[strlen(path)-1] = '\0';

return cloneString(path);
}

int main(int argc, char *argv[])
/* Process command line: read options into the global settings, work
 * out the batch and job directories and the file names derived from
 * them, then dispatch on the command word in argv[1]. */
{
char *command;
char batch[PATH_LEN];
long startTime = clock1000();

optionInit(&argc, argv, optionSpecs);
if (argc < 2)
    usage();
retries = optionInt("retries",  retries);
maxQueue = optionInt("maxQueue",  maxQueue);
minPush = optionInt("minPush",  minPush);
maxPush = optionInt("maxPush",  maxPush);
warnTime = optionInt("warnTime", warnTime);
killTime = optionInt("killTime", killTime);
delayTime = optionInt("delayTime", delayTime);
if (optionExists("eta"))
    fprintf(stderr, "note: the -eta option is no longer required\n");
cpuUsage = optionFloat("cpu", cpuUsage);
if (cpuUsage < 0)
    usage();
ramUsage = paraParseRam(optionVal("ram","0"));
if (ramUsage == -1)
    usage();
batchDir = determineCwdOptDir("batch");
jobCwd = determineCwdOptDir("jobCwd");
safef(resultsName, sizeof(resultsName), "%s/%s", batchDir, resultsFileName);
safef(errDir, sizeof(errDir), "%s/err", batchDir);
safef(bookMarkName, sizeof(bookMarkName), "%s/%s", batchDir, bookMarkFileName);

command = argv[1];
safef(batch, sizeof(batch), "%s/batch", batchDir);

/* The priority and maxJob checks are skipped for the commands that
 * create a batch.  "creat" is accepted below as an alias of "create",
 * so it is exempted here as well. */
if (!sameWord(command,"create") && !sameWord(command,"creat")
        && !sameWord(command,"make"))
    {
    checkPrioritySetting();
    checkMaxJobSetting();
    }

pushWarnHandler(paraVaWarn);

readBookMark();  /* read the para.bookmark file to initialize bookmark */

if (sameWord(command, "create") || sameWord(command, "creat"))
    {
    if (argc != 3)
        usage();
    paraCreate(batch, argv[2]);
    }
else if (sameWord(command, "recover"))
    {
    if (argc != 4)
        usage();
    paraRecover(batch, argv[2], argv[3]);
    }
else if (sameWord(command, "check"))
    {
    paraCheck(batch);
    }
else if (sameWord(command, "push"))
    {
    paraPush(batch);
    }
else if (sameWord(command, "shove"))
    {
    paraShove(batch);
    }
else if (sameWord(command, "make"))
    {
    if (argc != 3)
        usage();
    paraMake(batch, argv[2]);
    }
else if (sameWord(command, "try"))
    {
    maxPush = 10;
    paraPush(batch);
    }
else if (sameWord(command, "stop"))
    {
    paraStop(batch);
    }
else if (sameWord(command, "chill"))
    {
    paraChill(batch);
    }
else if (sameWord(command, "hung"))
    {
    paraListState(batch, jaHung);
    }
else if (sameWord(command, "slow"))
    {
    /* "slow" is a running report with warnTime as the minimum minutes. */
    char temp[256];
    safef(temp, sizeof(temp), "%d", warnTime);
    paraRunning(batch, temp);
    }
else if (sameWord(command, "crashed"))
    {
    paraListState(batch, jaCrashed);
    }
else if (sameWord(command, "failed"))
    {
    paraListFailed(batch);
    }
else if (sameWord(command, "finished"))
    {
    paraListState(batch, jaFinished);
    }
else if (sameWord(command, "status"))
    {
    paraStatus(batch);
    }
else if (sameWord(command, "problems") || sameWord(command, "problem"))
    {
    paraProblems(batch);
    }
else if (sameWord(command, "running") || sameWord(command, "hippos") || sameWord(command, "hippo"))
    {
    /* The minimum-time argument is optional.  Only read argv[2] when
     * it is really there, rather than relying on what is left in argv
     * after option parsing. */
    paraRunning(batch, (argc > 2) ? argv[2] : NULL);
    }
else if (sameWord(command, "time") || sameWord(command, "times"))
    {
    paraTimes(batch);
    }
else if (sameWord(command, "priority"))
    {
    if (argc != 3)
        usage();
    paraPriority(argv[2]);
    }
else if (sameWord(command, "maxJob") || sameWord(command, "maxNode"))
    {
    if (argc != 3)
        usage();
    /* backwards compatibility */
    if (sameWord(command, "maxNode"))
        warn("maxNode deprecated, use maxJob");
    paraMaxJob(argv[2]);
    }
else if (sameWord(command, "ram"))
    {
    if (argc != 3)
        usage();
    paraRam(batch, argv[2]);
    }
else if (sameWord(command, "cpu"))
    {
    if (argc != 3)
        usage();
    paraCpu(batch, argv[2]);
    }
else if (sameWord(command, "resetCounts"))
    {
    if (argc != 2)
        usage();
    paraResetCounts();
    }
else if (sameWord(command, "freeBatch"))
    {
    if (argc != 2)
        usage();
    freeBatch();
    }
else if (sameWord(command, "flushResults"))
    {
    if (argc != 2)
        usage();
    flushResults();
    }
else if (sameWord(command, "clearSickNodes"))
    {
    clearSickNodes();
    }
else if (sameWord(command, "showSickNodes"))
    {
    showSickNodes(FALSE);
    }
else
    {
    errAbort("Unrecognized command '%s'.  Run para with no arguments for usage summary",
        command);
    }

verbose(2, "Total para time: %.2f seconds\n", (clock1000() - startTime) / 1000.0);

return 0;
}

