/* paraRun - stuff to help manage para operations on collections.  */
/* The overall design of this system involves a number of threads,
 * all synchronized on synQueues. The threads come in three sorts,
 * a manager, workers, and customers.  The customers send a
 * run-request message to the manager. This message includes a
 * description of a collection and the job to be run on each
 * item in the collection. The manager breaks up the job into
 * bundles which typically contain more than a single item, and
 * assigns bundles to workers. The workers send a bundle-done message
 * containing the bundle and information on the time they spent on
 * it back to the manager when they are done. When the job is finished
 * the manager sends a run done message back to the client.
 *
 * There are three types of message queues:
 *     manager's queue:
 *        reader: manager
 *        writers: workers (bundle message)
 *                 customers (run message)
 *     workers's queue:
 *        reader: workers
 *        writer: manager (bundle message)
 *     customer's queue:
 *        reader: customer
 *        writer: manager (run message)
 * Each of the three types of threads will wait on their respective
 * queues.  The customer queue is transient though.  It only exists
 * for the single run done message.
 *
 * A wrinkle in all of this is that the worker threads can become
 * customers when they encounter another para statement during the
 * course of their execution. To keep the CPUs occupied, the manager
 * will add a new worker thread to the workers queue when a customer
 * comes in, which corresponds to the CPU that the customer used to
 * be using. When a worker comes in with the final bundle to finish
 * the customer request, the manager removes that worker thread.
 * The removed threads aren't actually destroyed, but put in a
 * free thread pool, since creating threads does actually take a 
 * little time. */

#include <sys/time.h>
#include "common.h"
#include "hash.h"
#include "dlist.h"
#include "../compiler/pfPreamble.h"
#include "pthreadWrap.h"
#include "synQueue.h"
#include "cacheQueue.h"

enum messageType
/* Different message types. */
    {
    mRun = 111,
    mBundle = 222,
    };

enum collectionType
/* Different types of collections. */
    {
    cDir,
    cArray,
    cRange,
    };

struct messageHeader
/* Shared start of message */
    {
    enum messageType type;
    };

struct paraRun
/* Information on a job to do to all items in a collection, and
 * some stuff to help us manage the work. */
    {
    enum messageType messageType;	/* Always mRun */
    enum paraRunType prtType;		/* do/get/filter etc. */
    long itemCount;			/* Element count */
    int itemSize;				/* Size each element */
    double totalRunTime;	/* Total job run time (finished only). */
    long itemsSubmitted;	/* Number of items submitted to threads. */
    long itemsFinished;		/* Number of items finished by threads. */
    struct synQueue *customerQueue;	/* Where customer waits. */
    void *localVars;		/* Local variables in caller. */
    int localSize;		/* Size of local vars. */
    void (*process)(_pf_Stack *stack, char *key, void *item, void *localVars);  
    	/* A function that processes one item. */
    struct dlNode *node;	/* Node in managers activeRun list */
    enum collectionType collectionType;	   /* Collection type. */

    /* The rest of this depends on the type.  A union here would be
     * slightly more efficienct in terms of memory, but not enough to
     * justify the increased lines of code. */
    struct _pf_dir *dir;			/* Dir for dirs. */
    struct hashCookie hashCookie;		/* Position in dir */
    struct _pf_array *array;			/* Array for arrays */
    long arrayIx;				/* Position in array */
    long rangeStart;				/* Range info */
    long rangeEnd;				/*   "    "   */
    long rangeIx;				/* Position in range */

    /* Output of run may go here. */
    struct _pf_type *expType;		/* Type returned by process */
    union _pf_stack result;
    };

struct jobBundle
/* A bundle of jobs that can be taken by our worker threads. */
    {
    enum messageType messageType;	/* Always mBundle */
    enum paraRunType prtType;		/* do/get/filter etc. */
    int itemCount;	/* Count of items. */
    int itemSize;	/* Size of each item. */
    void *items;	/* An array of items to process. */
    void *itemsToFree;	/* For locally allocated items, we'll free these */
    char **keyStrings;	/* Keys if a string */
    char *keyAsIx;	/* Keys if an index. */
    void *localVars;	/* Local variables in caller. */
    int localSize;	/* Size of local vars. */
    void *localOrig;	/* Original copy of local vars. */
    void (*process)(_pf_Stack *stack, char *key, void *item, void *localVars);  
    	/* A function that processes one item. */
    struct timeval startTime;	/* Start time (wall clock) */
    struct timeval endTime;		/* End time (wall clock). */
    time_t startCpu;		/* Start CPU time. */
    time_t endCpu;		/* End CPU time. */
    struct worker *worker;	/* The worker on this one. */
    struct paraRun *run;	/* Run this is part of. */

    /* Output of run may go here. */
    enum _pf_single_type stResult;
    union _pf_stack singleResult;
    void *multiResult;
    };

struct worker
/* Information on one worker. */
    {
    struct synQueue *queue;	/* Worker waits on this. */
    pthread_t thread;	/* Worker's thread. */
    struct dlNode *node;	/* Node in double linked list. */
    _pf_Stack stack[16*1024];   /* Expression stack. */
    };
static struct worker *workerNew();

struct manager
/* Information on the manager. */ 
    {
    struct synQueue *queue;	/* Can contain two types of message */
    pthread_t thread;	/* Manager's thread. */
    struct dlList *activeWorkers;	/* Workers in action. */
    struct dlList *readyWorkers; 	/* Workers wanting a job. */
    struct dlList *reserveWorkers;	/* Workers in reserve. */
    struct dlList *activeRuns;		/* Currently active runs. */
    int cpuCount;
    };
static struct manager *managerNew(int cpuCount);

static struct manager *theManager;
static int paraCpuCount = 2;
// static int paraCpuCount = 1;


void _pf_paraRunInit()
/* Initialize job scheduler. */
{
/* TODO - figure out actual CPU count. */
theManager = managerNew(paraCpuCount);
}

void jobBundleRun(struct jobBundle *job, _pf_Stack *stack)
/* Run job on all items. */
{
char *item = job->items;
int i;
job->startCpu = clock();
gettimeofday(&job->startTime, NULL);
for (i=0; i<job->itemCount; ++i)
    {
    char *key;
    if (job->keyStrings)
        key = job->keyStrings[i];
    else
        key = job->keyAsIx + i;
    if (job->localVars)
        memcpy(job->localVars, job->localOrig, job->localSize);
    job->process(stack, key, item, job->localVars);
    item += job->itemSize;
    }
gettimeofday(&job->endTime, NULL);
job->endCpu = clock();
}

static void *workWork(void *v)
/* Work away at queue endlessly. */
{
struct worker *w = v;
for (;;)
    {
    struct jobBundle *job = synQueueGet(w->queue);
    jobBundleRun(job, w->stack);
    synQueuePut(theManager->queue, job);
    }
}

struct worker *workerNew()
/* Create a new worker */
{
struct worker *worker;
AllocVar(worker);
worker->queue = synQueueNew();
pthreadCreate(&worker->thread, NULL, workWork, worker);
AllocVar(worker->node);
worker->node->val = worker;
return worker;
}


void getDirJobs(struct _pf_dir *dir, struct hashCookie *cookie, int size,
	struct jobBundle *job)
/* Allocate an array of size, and fill it with next bunch of items from
 * hash. */
{
int i;
void **itemBuf = needLargeMem(size*sizeof(void*));
char **keyBuf = needLargeMem(size*sizeof(char *));
job->keyStrings = keyBuf;
job->items = job->itemsToFree = itemBuf;
for (i=0; i<size; ++i)
    {
    struct hashEl *hel = hashNext(cookie);
    *itemBuf++ = &hel->val;
    *keyBuf++ = hel->name;
    }
}

struct jobBundle *paraRunGetNextJob(struct paraRun *run, struct worker *worker)
/* Make up next job structure looking at average run times and
 * available threads */
{
struct jobBundle *job;
long itemsInBundle;
long itemsLeft = run->itemCount - run->itemsSubmitted;

if (itemsLeft == 0)
    return NULL;
if (run->itemsFinished == 0)
    {
    // uglyf("paraRunGetNextJob no one finished yet.\n");
    itemsInBundle = run->itemCount/(paraCpuCount*10);
    }
else if (run->totalRunTime > 0.0001)
    {
    // uglyf("total run time is %f, aveTime %f\n", run->totalRunTime, run->totalRunTime/run->itemsFinished);
    itemsInBundle = 0.005*run->itemsFinished/run->totalRunTime;
    }
else
    {
    // uglyf("Short items, assigning a bunch\n");
    itemsInBundle = run->itemCount/paraCpuCount;
    }
if (itemsInBundle < 1)
    itemsInBundle = 1;
if (itemsInBundle > itemsLeft)
    itemsInBundle = itemsLeft;

// uglyf("paraRunGetNextJob itemsInBundle %ld, itemsLeft %ld\n", itemsInBundle, itemsLeft);
AllocVar(job);
job->messageType = mBundle;
job->itemCount = itemsInBundle;
job->itemSize = run->itemSize;
if (run->localVars)
    {
    job->localOrig = run->localVars;
    job->localSize = run->localSize;
    job->localVars = needMem(run->localSize);
    }
job->process = run->process;
job->worker = worker;
job->run = run;
switch (run->collectionType)
    {
    case cDir:
	getDirJobs(run->dir, &run->hashCookie, itemsInBundle, job);
        break;
    case cArray:
	job->items = run->array->elements + run->itemSize*run->arrayIx;
	job->keyAsIx += run->arrayIx; 
	run->arrayIx += itemsInBundle;
        break;
    case cRange:
	{
	_pf_Long i, *pt, end;
	job->items = job->itemsToFree = pt = 
		needLargeMem(itemsInBundle*sizeof(_pf_Long));
	job->keyAsIx += run->rangeIx;
	end = run->rangeIx + itemsInBundle;
	for (i=run->rangeIx; i<end; ++i)
	    *pt++ = i;
	run->rangeIx = end;
        break;
	}
    }
run->itemsSubmitted += itemsInBundle;
return job;
}

static double tvToDouble(struct timeval *tv)
/* Return timeval in seconds as a double-precision floating point number. */
{
double res = tv->tv_usec * 0.000001;
res += tv->tv_sec;
return res;
}

void jobBundleFree(struct jobBundle **pJob)
/* Free up a job. */
{
struct jobBundle *job = *pJob;
if (job != NULL)
    {
    freeMem(job->itemsToFree);
    freeMem(job->keyStrings);
    freeMem(job->localVars);
    freez(pJob);
    }
}

static void scheduleRun(struct manager *manager, struct paraRun *run)
/* Add run to active list. Pull processor off of reserve list to
 * start it off.  Also grab any workers on the ready list. */
{
struct dlNode *workerNode;
struct worker *worker;
struct jobBundle *job;

/* Set up job on active run list. */
AllocVar(run->node);
run->node->val = run;
dlAddTail(manager->activeRuns, run->node);

workerNode = dlPopHead(manager->reserveWorkers);
if (workerNode == NULL)
     {
     worker = workerNew();
     workerNode = worker->node;
     }
else
    worker = workerNode->val;
dlAddTail(manager->activeWorkers, workerNode);
job = paraRunGetNextJob(run, worker);
synQueuePut(worker->queue, job);

/* Loop through putting any other ready workers on the job. */
while ((workerNode = dlPopHead(manager->readyWorkers)) != NULL)
    {
    worker = workerNode->val;
    job = paraRunGetNextJob(run, worker);
    if (job)
	{
	dlAddTail(manager->activeWorkers, workerNode);
	synQueuePut(worker->queue, job);
	}
    else
        {
	dlAddHead(manager->readyWorkers, workerNode);
	break;
	}
    }
}

void foldBundleIntoRun(struct paraRun *run, struct jobBundle *job)
/* Add information from bundle into run. */
{
double dt = tvToDouble(&job->endTime) - tvToDouble(&job->startTime);
if (dt < 0)
    {	/* We wrapped - it's a new day! */
    dt += 24*3600;
    if (dt > 1000 || dt < 0)  /* Sanity check. */
	dt = 0.01;	      /* Default to our "optimal" time */
    }
run->totalRunTime += dt;
run->itemsFinished += job->itemCount;
}

static void checkWork(struct manager *manager, struct jobBundle *job)
/* See if it's the last job in the bundle.  If so then move the
 * worker to reserve, and report job done to customer.  Otherwise
 * do more work on this run if there is work left to start. Otherwise
 * see if there's another active run worker can go to. Otherwise put
 * worker on ready list. */
{
struct paraRun *run = job->run;
struct worker *worker = job->worker;

// uglyf("checkWork %d, %d\n", job->messageType, job->itemCount);
foldBundleIntoRun(run, job);
jobBundleFree(&job);
if (run->itemsFinished == run->itemCount)
    {
    // uglyf("finished one run of %d\n", (int)run->itemCount);
    dlRemove(run->node);	/* Remove node from active run list */
    freez(&run->node);
    synQueuePut(run->customerQueue, run);
    dlRemove(worker->node);
    dlAddTail(manager->reserveWorkers, worker->node);
    }
else if (run->itemsSubmitted < run->itemCount)
    {
    // uglyf("Submitted %ld of %ld, putting worker back on same run\n", run->itemsSubmitted, run->itemCount);
    job = paraRunGetNextJob(run, worker);
    synQueuePut(worker->queue, job);
    }
else
    {
    struct dlNode *runNode;
    struct paraRun *newRun = NULL;
    for (runNode = manager->activeRuns->head; !dlEnd(runNode); 
    	runNode = runNode->next)
        {
	if (runNode->val != run)
	    {
	    newRun = runNode->val;
	    break;
	    }
	}
    if (newRun)
        {
	// uglyf("Putting worker on another run\n");
	job = paraRunGetNextJob(newRun, worker);
	synQueuePut(worker->queue, job);
	}
    else
        {
	// uglyf("Idling worker %p\n", worker);
	dlRemove(worker->node);
	dlAddTail(manager->readyWorkers, worker->node);
	}
    }
}

static void *manageManage(void *v)
/* Manage away on our message queue. */
{
struct manager *manager = v;
for (;;)
    {
    enum messageType type;
    struct messageHeader *message = synQueueGet(manager->queue);
    // uglyf("Manager got message %d\n", (int)message->type);
    if (message->type == mRun)
	scheduleRun(manager, (struct paraRun *)message);
    else
        checkWork(manager, (struct jobBundle *)message);
    }
}

static struct manager *managerNew(int cpuCount)
/* Create a new manager for a number of CPUs. */
{
struct manager *manager;
struct worker *worker;
int i;
AllocVar(manager);
manager->queue = synQueueNew();
manager->activeRuns = dlListNew();
manager->activeWorkers = dlListNew();
manager->readyWorkers = dlListNew();
manager->reserveWorkers = dlListNew();
manager->cpuCount = cpuCount;

/* Make one less worker ready than we have CPUs.  This is
 * because one CPU is already occupied.  We'll get it back
 * when it submits a batch of jobs to us though... */
for (i=1; i<cpuCount; ++i)
    {
    worker = workerNew();
    dlAddTail(manager->readyWorkers, worker->node);
    }
pthreadCreate(&manager->thread, NULL, manageManage, manager);
return manager;
}

static void *paraDoRun(struct paraRun *run)
/* Send run to manager and wait for results. */
{
void *result = NULL;
synQueuePut(theManager->queue, run);
synQueueGet(run->customerQueue);
return result;
}

static void paraRunFree(struct paraRun **pRun)
/* Free up a paraRun. */
{
struct paraRun *run = *pRun;
if (run != NULL)
    {
    cacheQueueFree(&run->customerQueue);
    freez(pRun);
    }
}

static struct paraRun *paraRunNew(void *localVars, int localSize,
    void (*process)(_pf_Stack *stack, char *key, void *item, void *localVars),
    enum paraRunType prtType, int expTypeId)
/* Set up shared parts of paraRun structure. */
{
struct paraRun *run;
AllocVar(run);
run->messageType = mRun;
run->prtType = prtType;
run->customerQueue = cacheQueueAlloc();
run->localVars = localVars;
run->localSize = localSize;
run->process = process;
run->expType = _pf_type_table[expTypeId];
return run;
}

void *_pf_paraRunArray(struct _pf_array *array, void *localVars, int localSize,
    void (*process)(_pf_Stack *stack, char *key, void *item, void *localVars),
    enum paraRunType prtType, int expTypeId)
/* Build up run structure on array. */
{
struct paraRun *run = paraRunNew(localVars, localSize,
	process, prtType, expTypeId);
void *result;
run->collectionType = cArray;
run->itemCount = array->size;
run->itemSize = array->elSize;
run->array = array;
result = paraDoRun(run);
paraRunFree(&run);
return result;
}

void *_pf_paraRunDir(struct _pf_dir *dir, void *localVars, int localSize,
    void (*process)(_pf_Stack *stack, char *key, void *item, void *localVars),
    enum paraRunType prtType, int expTypeId)
/* Run process on each item in dir. */
{
struct paraRun *run = paraRunNew(localVars, localSize,
	process, prtType, expTypeId);
void *result;
run->collectionType = cDir;
run->itemCount = dir->hash->elCount;
run->itemSize = sizeof(void*);
run->dir = dir;
run->hashCookie = hashFirst(dir->hash);
result = paraDoRun(run);
paraRunFree(&run);
return result;
}

void *_pf_paraRunRange(long start, long end, void *localVars, int localSize,
    void (*process)(_pf_Stack *stack, char *key, void *item, void *localVars),
    enum paraRunType prtType, int expTypeId)
/* Build up run structure on range. */
{
struct paraRun *run = paraRunNew(localVars, localSize,
	process, prtType, expTypeId);
void *result;
run->collectionType = cRange;
run->itemCount = end - start;
run->itemSize = sizeof(_pf_Long);
run->rangeIx = run->rangeStart = start;
run->rangeEnd = end;
result = paraDoRun(run);
paraRunFree(&run);
return result;
}

