# Purpose: New ParExecute using Darwin IPC
# Author:  Lukas Knecht
# Created:  1 Nov 1993
#

# Debug is (re)assigned to false every time this file is read.
Debug := false:

# DevNull is the shell output redirection appended to the command lines that
# this file hands to CallSystem / the IPC daemon.  It is computed once, right
# here, from the value Debug has at this point: with Debug = true the output
# is appended to ~cbrg/debug/cbrg.trace, otherwise it is discarded.
# Because Debug is assigned just above, trace output is obtained by changing
# that assignment to  Debug := true  in this file; assigning the global before
# reading the file is overwritten, and assigning it afterwards does not
# recompute DevNull.
DevNull := If( Debug=true, '>>~cbrg/debug/cbrg.trace', '>/dev/null' );

# Machine -- constructor for the record describing one host of the pool.
#
# Two calling forms are accepted:
#   Machine(Name, User, ..., LastProcess)   all 13 fields supplied; the
#                                           record is returned unchanged.
#   Machine(spec)   one string, searched for '@', ':' and '/':
#                   text up to and including '@' becomes User, the number
#                   after ':' becomes Class (default 0), the number after
#                   '/' becomes MaxProcesses (default 1), and the text in
#                   front of the first ':' or '/' becomes Name.
# For the string form the remaining fields start as: Processes [],
# LoginControl false, OffHours 0..0, LoadRange MaxProcesses-0.3 ..
# MaxProcesses+1, ForcedRun false, NiceValue 19, StartCycle DBL_MAX,
# DownCount 0, LastProcess 0.
# Any other argument pattern raises 'Invalid Machine format'.
Machine := proc( Name:string, User:string, Class:integer,
	Processes:list(Process), MaxProcesses:posint,
	LoginControl:boolean, OffHours:integer..integer,
	LoadRange:numeric..numeric, ForcedRun:boolean,
	NiceValue:integer, StartCycle:numeric,
	DownCount:integer, LastProcess:integer )

  if nargs = 13 then return(noeval(Machine(args))) fi;
  if not (nargs = 1 and type(args[1], string)) then
    error('Invalid Machine format')
  fi;

  host := args[1];

  # optional user prefix (kept together with its '@')
  user := '';
  at := CaseSearchString('@', host);
  if at > 0 then
    user := host[1..at+1];
    host := host[at+2..-1]
  fi;

  # nameEnd = number of leading characters of host that form the name
  nameEnd := length(host);

  # optional :Class suffix
  cls := 0;
  colon := CaseSearchString(':', host);
  if colon > 0 then
    cls := atoi(host[colon+2..-1]);
    nameEnd := colon
  fi;

  # optional /MaxProcesses suffix
  limit := 1;
  slash := CaseSearchString('/', host);
  if slash > 0 then
    limit := atoi(host[slash+2..-1]);
    nameEnd := min(nameEnd, slash)
  fi;

  return(noeval(Machine(host[1..nameEnd], user, cls, [], limit, false, 0..0,
                        limit-0.3..limit+1, false, 19, DBL_MAX, 0, 0)))
end:

Machine_type := noeval(structure(anything,Machine)):

# Machine_select -- named field access for Machine records.
#   m    the Machine record
#   sel  one of the 13 field names listed below
#   val  (optional) new value; when given, the field is assigned
# Without val the current field value is returned.
# An unknown selector raises 'Invalid selector'.
Machine_select := proc(m, sel, val)
  fields := ['Name', 'User', 'Class', 'Processes', 'MaxProcesses',
             'LoginControl', 'OffHours', 'LoadRange', 'ForcedRun',
             'NiceValue', 'StartCycle', 'DownCount', 'LastProcess'];
  pos := SearchArray(sel, fields);
  if pos = 0 then error('Invalid selector', sel) fi;
  if nargs <> 3 then
    m[pos]
  else
    m[pos] := val
  fi
end:
  
# Process -- constructor for the record describing one slave process.
#
#   Process(Pid, Job, Stopped, EventTime, JobTime)   full form
#   Process(Pid)                                     Job 0, Stopped false,
#                                                    EventTime 0, JobTime 0
# Any other argument pattern raises 'Invalid Process format'.
Process := proc( Pid:integer, Job:integer, Stopped:boolean,
	EventTime:numeric, JobTime:numeric )
  if nargs = 1 and type(args[1], integer) then
    return(noeval(Process(args[1], 0, false, 0, 0)))
  fi;
  if not type([args], [integer, integer, boolean, numeric, numeric]) then
    error('Invalid Process format')
  fi;
  return(noeval(Process(args)))
end:

Process_type := noeval(structure(anything,Process)):

# Process_select -- named field access for Process records.
#   p    the Process record
#   sel  'Pid', 'Job', 'Stopped', 'EventTime', 'JobTime' or 'ElapsedTime'
#   val  (optional) new value for one of the five stored fields
# 'ElapsedTime' is read-only and computed: JobTime for a stopped process,
# otherwise UTCTime() - EventTime + JobTime, rounded and formatted as
# m:ss, or h:mm:ss from one hour on.
# Errors: 'Invalid selector' for an unknown name, 'Cannot assign to' when
# a value is supplied for ElapsedTime.
Process_select := proc(p, sel, val)
  names := ['Pid', 'Job', 'Stopped', 'EventTime', 'JobTime', 'ElapsedTime'];
  k := SearchArray(sel, names);
  if k = 0 then error('Invalid selector', sel) fi;
  if k <= 5 then
    # plain stored field
    if nargs = 3 then p[k] := val else p[k] fi
  else
    if nargs = 3 then error('Cannot assign to', sel) fi;
    secs := round(If(p[Stopped], p[5], UTCTime() - p[4] + p[5]));
    if secs < 3600 then
      sprintf('%d:%02d', trunc(secs/60), mod(secs,60))
    else
      sprintf('%d:%02d:%02d', trunc(secs/3600), trunc(mod(secs,3600)/60),
              mod(secs,60))
    fi
  fi
end:

# my_sscanf -- sscanf restricted to the first 400 characters of t.
#   t  text to scan
#   f  sscanf format
# Returns whatever sscanf returns for the (possibly truncated) text.
my_sscanf := proc(t, f)
  head := t;
  if length(head) > 400 then head := head[1..400] fi;
  sscanf(head, f)
end:

# log_line -- print one time-stamped line on the current output.
# The line starts with date() from its 5th character on, followed by ':',
# then every argument printed with ' %a', and ends with a newline.
log_line := proc()
  stamp := date();
  printf('%s:', stamp[5..-1]);
  for k to nargs do
    printf(' %a', args[k])
  od;
  lprint()
end:

# check_getpid -- give up waiting for the pid of a process being started.
# When more than two polling cycles (global nrCycles) have passed since
# m[StartCycle], the event is logged and StartCycle is reset to DBL_MAX,
# the value used throughout this file for "no start pending".
check_getpid := proc(m: Machine)
  global nrCycles;
  if nrCycles > m[StartCycle] + 2 then
    log_line('Did not get pid for', m[Name]);
    m[StartCycle] := DBL_MAX
  fi
end:

# other_job -- choose an already running job to be started a second time.
#   class  class of the machine that would run the duplicate
# Returns 0 when todo is not empty or class <= 0.  Otherwise it looks at
# all processes of all machines (global mach):
#   1. for every job, the highest machine class currently working on it;
#   2. for every job, the largest now - EventTime + JobTime among the
#      processes running it on a machine of that highest class;
#   3. among the jobs that have a non-stopped process, the one whose
#      highest class is below `class' and smallest, ties being broken in
#      favour of the smaller time from step 2.
# Returns that job number, or 0 when no job qualifies.
other_job := proc(class: integer)
  global mach, todo, Queue;
  if length(todo) <> 0 or class <= 0 then return(0) fi;

  njobs := length(Queue);

  # pass 1: best (highest) class per job
  topClass := CreateArray(1..njobs);
  for mc in mach do
    for pr in mc[Processes] do
      jb := pr[Job];
      if jb > 0 then
        topClass[jb] := max(topClass[jb], mc['Class'])
      fi
    od
  od;

  # pass 2: longest running time per job on a machine of that class
  now := UTCTime();
  topTime := CreateArray(1..njobs);
  for mc in mach do
    for pr in mc[Processes] do
      jb := pr[Job];
      if jb > 0 and mc['Class'] = topClass[jb] then
        topTime[jb] := max(topTime[jb], now-pr[EventTime]+pr[JobTime])
      fi
    od
  od;

  # pass 3: pick the candidate
  pick := 0; pickClass := class; pickTime := 0;
  for mc in mach do
    for pr in mc[Processes] do
      jb := pr[Job];
      if jb > 0 and not pr[Stopped] and
         (topClass[jb] < pickClass or topClass[jb] = pickClass and
          topTime[jb] < pickTime) then
        pick := jb; pickClass := topClass[jb]; pickTime := topTime[jb]
      fi
    od
  od;
  pick
end:

# send_Text -- send text t with SendTcp; if the call raises an error,
# abnormal_termination() is invoked.
send_Text := proc(t: string)
  outcome := traperror(SendTcp(t));
  if outcome = lasterror then
    abnormal_termination()
  fi
end:

# send_DATA -- send `data' to process `pid' on host `name' with
# SendDataTcp; if the call raises an error, abnormal_termination() is
# invoked.
send_DATA := proc(name: string, pid: integer, data: string)
  outcome := traperror(SendDataTcp(name, pid, data));
  if outcome = lasterror then abnormal_termination() fi
end:

# check_machine -- send the next status request for machine m.
# Returns true when a request (MSTAT or PSTAT) was sent, false otherwise.
#
# While a process start is pending (StartCycle < DBL_MAX) only the pid
# timeout is checked.  Otherwise LastProcess is advanced; it cycles over
# the processes of m, and a machine status request is sent instead of a
# process status request when the condition below holds.
check_machine := proc(m: Machine)
  global todo;
  if m[StartCycle] < DBL_MAX then
    check_getpid(m);
    return(false)
  fi;

  m[LastProcess] := m[LastProcess] + 1;
  nproc := length(m[Processes]);
  # NOTE(review): the condition is written without parentheses, exactly
  # as in the original; if `and' binds tighter than `or' (as in Maple --
  # TODO confirm for Darwin) it reads
  #   (past last process and room left and jobs waiting) or other_job > 0.
  if m[LastProcess] > nproc and
    nproc < m[MaxProcesses] and
    length(todo) > 0 or other_job(m['Class']) > 0 then
    m[LastProcess] := 0;
    send_Text('MSTAT '.m[User].m[Name]);
    return(true)
  fi;

  # wrap around, then poll the selected process (if there is any)
  if m[LastProcess] > nproc then m[LastProcess] := 1 fi;
  if m[LastProcess] <= nproc then
    send_Text('PSTAT '.m[Name].' '.m[Processes,m[LastProcess],Pid]);
    return(true)
  fi;
  return(false)
end:

# create_process -- ask the daemon to start one more slave on machine m.
# Nothing is started when m already runs MaxProcesses processes (this is
# logged) or while an earlier start is still pending.  Otherwise a RUN
# (or RUNP when m[ForcedRun]) request launching ParExecSlave is sent,
# StartCycle is set to the current cycle and nrCreated is incremented.
create_process := proc(m: Machine)
  global nrCreated, nrCycles;
  if length(m[Processes]) >= m[MaxProcesses] then
    log_line('Machine', m[Name], 'already runs', length(m[Processes]),
             'processes')
  else
    check_getpid(m);
    if m[StartCycle] = DBL_MAX then
      log_line(m[Name], 'creates parallel process');
      verb := If(m[ForcedRun], 'RUNP ', 'RUN ');
      slave := ' echo "ParExecSlave(180,'.m[NiceValue].');quit" | darwin -q ';
      send_Text(verb.m[Name].slave.DevNull);
      m[StartCycle] := nrCycles;
      nrCreated := nrCreated + 1
    fi
  fi
end:

# same_machine -- do two host names denote the same machine?
# True when the names are equal, or when the shorter one is a prefix of
# the longer one and is followed there by '.' (short name versus fully
# qualified name).  False otherwise.
same_machine := proc(m1: string, m2: string)
  if m1 = m2 then return(true) fi;
  n1 := length(m1);
  n2 := length(m2);
  if n1 = n2 then return(false) fi;
  if n1 < n2 then
    short := m1; long := m2
  else
    short := m2; long := m1
  fi;
  k := length(short);
  evalb(short = long[1..k] and long[k+1] = '.')
end:

# machine_number -- index in the global list mach of the machine named m.
# Anything from the first ':' or '/' on is ignored; names are compared
# with same_machine.  Returns 0 when no machine matches.
machine_number := proc(m: string)
  global mach;
  cut := length(m);
  colon := CaseSearchString(':', m);
  if colon >= 0 then cut := min(cut, colon) fi;
  slash := CaseSearchString('/', m);
  if slash >= 0 then cut := min(cut, slash) fi;
  host := m[1..cut];
  for k to length(mach) do
    if same_machine(host, mach[k,Name]) then return(k) fi
  od;
  0
end:

# ident_mach -- label 'name(pid)' used in log lines for process p on
# machine m.
ident_mach := proc(m, p)
  sprintf('%s(%d)', m[Name], p)
end:

# index_by_pid -- position in m[Processes] of the first process whose Pid
# equals pid, or 0 when there is none.
index_by_pid := proc(m: Machine, pid: integer)
  procs := m[Processes];
  for k to length(procs) do
    if procs[k,Pid] = pid then return(k) fi
  od;
  0
end:

# index_by_job -- position in m[Processes] of the first process whose Job
# equals job, or 0 when there is none.
index_by_job := proc(m: Machine, job: integer)
  procs := m[Processes];
  for k to length(procs) do
    if procs[k,Job] = job then return(k) fi
  od;
  0
end:

# redo_job -- put jobs of machine m back on the global todo list.
#   redo_job(m, pid)  considers only the process with that pid (if known)
#   redo_job(m)       considers every process of m
# A job is appended to todo only when it is a real job (> 0), is still
# marked open in istodo, and is not already listed in todo.
redo_job := proc(m: Machine, pid: integer)
  global todo, istodo;
  affected := m[Processes];
  if nargs > 1 then
    k := index_by_pid(m, pid);
    if k > 0 then
      affected := [affected[k]]
    else
      affected := []
    fi
  fi;
  for pr in affected do
    jb := pr[Job];
    if jb > 0 and istodo[jb] = 1 and SearchArray(jb, todo) = 0 then
      todo := append(todo, jb)
    fi
  od
end:

# kill_job -- ask the daemon to kill slave processes of machine m and drop
# them from m[Processes].
#   kill_job(m, i)  kills the i-th process; nothing happens for i <= 0
#   kill_job(m)     kills all processes of m
kill_job := proc(m: Machine, i: integer)
  procs := m[Processes];
  if nargs <= 1 then
    for pr in procs do
      send_Text('RSH '.m[Name].' kill '.pr[Pid])
    od;
    m[Processes] := []
  elif i > 0 then
    send_Text('RSH '.m[Name].' kill '.procs[i,Pid]);
    m[Processes] := [op(procs[1..i-1]), op(procs[i+1..-1])]
  fi
end:

# remove_job -- forget the process with the given pid on machine m.
# Only the bookkeeping entry is removed; no kill request is sent.
# Unknown pids are ignored.
remove_job := proc(m: Machine, pid: integer)
  k := index_by_pid(m, pid);
  if k > 0 then
    procs := m[Processes];
    m[Processes] := [op(procs[1..k-1]), op(procs[k+1..-1])]
  fi
end:

# init_program -- a freshly started slave reported its pid.
# When a start was pending on m (StartCycle < DBL_MAX) the process is
# registered, the pending mark is cleared, the program text prog is sent
# to the slave and nrStarted is incremented.  A pid that arrives without
# a pending start is only logged.
init_program := proc(m: Machine, pid: integer, prog: string)
  global nrStarted;
  if m[StartCycle] >= DBL_MAX then
    log_line('Cannot handle pid', pid, 'from', m[Name])
  else
    m[Processes] := append(m[Processes], Process(pid));
    m[StartCycle] := DBL_MAX;
    send_DATA(m[Name], pid, prog);
    nrStarted := nrStarted + 1;
    log_line(ident_mach(m, pid), 'started')
  fi
end:

# default_handler -- result handler used when the caller supplies none.
# Writes the result text t to the file <base>.out.<job>, where <base> is
# the global logfile name without its last four characters, and then
# switches output back to the previous destination.
default_handler := proc(job: posint, t: string)
  global logfile;
  outname := logfile[1..-5].'.out.'.job;
  OpenWriting(outname);
  printf('%s', t);
  OpenWriting(previous);
end:

# complete_job -- record that a job has finished on machine m.
#   job  the job number; callers pass it negated when the job ended with
#        an error (the sign is written to the .done file as given)
# Steps: append 'job:' to <base>.done, mark abs(job) as done in istodo,
# clear the Job field of the process of m that ran it, and kill every
# other process in the pool that is still working on the same job
# (each kill is logged to logfile).
complete_job := proc(m: Machine, job: integer)
  global logfile, todo, mach, istodo;
  donefile := logfile[1..-5].'.done';
  OpenAppending(donefile);
  printf('%d:\n', job);
  OpenWriting(previous);

  OpenAppending(logfile);
  id := abs(job);
  istodo[id] := 0;
  k := index_by_job(m, id);
  if k > 0 then m[Processes,k,Job] := 0 fi;
  # after the line above m itself no longer matches; only duplicates do
  for other in mach do
    k := index_by_job(other, id);
    if k > 0 then
      who := ident_mach(other, other[Processes,k,Pid]);
      log_line(who, sprintf('killed (also doing %d)', id));
      kill_job(other, k)
    fi
  od;
  OpenWriting(previous);
end:

# handle_result -- process the data message `result' received from slave
# process `pid' on machine m.
#
# Recognised messages:
#   'INIT DONE <cpu>'      slave initialised; cpu is added to initCPU
#   'RESULT <job> <cpu>:…' job finished; the text after ':' is passed to
#                          resultHandler (unless the job was already done)
#                          and the job is completed
#   'error…'               slave problem; its job is re-queued and the
#                          process forgotten
#   'CONTINUE'             slave repeats its request for work
#   'RERROR <job>:…'       job ended with an error; if it is the job the
#                          process was given it is completed as failed,
#                          otherwise the process' job is re-queued
#   anything else          logged as invalid; the process' job is re-queued
# Unless the branch returned early, finished jobs are then purged from
# todo and, if the process is known and idle (Job = 0), it is handed the
# next job (possibly a duplicate chosen by other_job) or told to end.
#
# Fix: the purge loop used to advance its index after deleting an entry,
# so the entry that moved into the freed slot was never examined and a
# finished job could stay in todo and be started again.
handle_result := proc(m: Machine, pid: integer, result: string)
  global logfile, todo, mach, istodo, Queue, initCPU, jobCPU, nrCompleted,
  resultHandler, nrError;
  lprint('Enter handle_result',m, pid, result);
  r := my_sscanf(result, 'INIT DONE %g');
  # p becomes the Process record of pid, or stays 0 when pid is unknown
  p := index_by_pid(m, pid);
  if p > 0 then p := m[Processes,p] fi;
  if length(r) = 1 then
    log_line(ident_mach(m, pid), 'initialized',
             sprintf('(%.1f s CPU)', r[1]));
    initCPU := initCPU + r[1]
  else
    r := my_sscanf(result, 'RESULT %d %g');
    if length(r) = 2 then
      jobCPU := jobCPU + r[2];
      log_line(ident_mach(m, pid), 'completed job', r[1],
               sprintf('(%.1f s CPU)', r[2]));
      # a duplicate may already have delivered this job's result
      if istodo[r[1]] <> 0 then
        resultHandler(r[1], result[CaseSearchString(':', result)+2..-1])
      fi;
      nrCompleted := nrCompleted + 1;
      complete_job(m, r[1])
    elif length(result) > 5 and result[1..5] = 'error' then
      log_line(ident_mach(m, pid), 'problem:', result[7..-1]);
      redo_job(m, pid); remove_job(m, pid);
      return()
    elif result = 'CONTINUE' then
      log_line(ident_mach(m, pid), 'sent 2nd request')
    else
      r := my_sscanf(result, 'RERROR %d');
      if length(r) = 1 then
        err := result[CaseSearchString(':', result)+2..-1];
        if p <> 0 and p[Job] = r[1] then
          if r[1] = 0 then
            log_line(ident_mach(m, pid), 'initialized with error:')
          else
            log_line(ident_mach(m, pid), 'completed job', r[1], 'with error:')
          fi;
          printf('%22s%s', ' ', err);
          lprint();
          if r[1] <> 0 then
            nrError := nrError + 1;
            # negative number: recorded in the .done file as failed
            complete_job(m, -r[1])
          fi;
          return()
        else
          log_line(ident_mach(m, pid),
                   sprintf('incorrectly ended job %d(%d!) with error:',
                           If(p <> 0, p[Job], 0), r[1]));
          printf('%22s%s', ' ', err);
          lprint();
          redo_job(m, pid);
          if p <> 0 then p[Job] := 0 fi;
          return()
        fi
      else
        log_line(ident_mach(m, pid), 'sent invalid response');
        lprint(result);
        redo_job(m, pid);
        if p <> 0 then p[Job] := 0 fi
      fi
    fi
  fi;
  # Drop jobs that are done from todo.  The index only advances when the
  # current entry is kept, so that consecutive finished jobs are all
  # removed.
  i := 1;
  while i <= length(todo) do
    if istodo[todo[i]] = 0 then
      todo := [op(todo[1..i-1]), op(todo[i+1..-1])]
    else
      i := i + 1
    fi
  od;
  # Give the (known, idle) process something to do.
  if p <> 0 and p[Job] = 0 then
    newJob := other_job(m['Class']);
    if newJob > 0 then
      todo := [newJob]
    fi;
    if length(todo) > 0 then
      p[Job] := todo[1];
      log_line(ident_mach(m, pid), 'started job', todo[1]);
      job := traperror(eval(Queue[todo[1]]));
      if job = lasterror then abnormal_termination() fi;
      send_DATA(m[Name], pid, 'PE_job:='.todo[1].':'.job);
      OpenAppending(logfile);
      p[EventTime] := UTCTime(); p[JobTime] := 0;
      todo := todo[2..-1];
      OpenWriting(previous);
    else
      send_DATA(m[Name], pid, 'nojob := true:');
      log_line(ident_mach(m, pid), 'ending')
    fi
  fi
end:

# send_loadrange -- send the two bounds of m[LoadRange] to the daemon in
# a LOADC request for machine m.
send_loadrange := proc(m: Machine)
  lr := m[LoadRange];
  send_Text(sprintf('LOADC %s %g %g', m[Name], lr[1], lr[2]))
end:

# send_loginctrl -- send m[LoginControl] to the daemon as
# 'LOGINC <name> ON' or 'LOGINC <name> OFF'.
send_loginctrl := proc(m: Machine)
  state := If(m[LoginControl], ' ON', ' OFF');
  send_Text('LOGINC '.m[Name].state)
end:

# send_offhours -- send the two bounds of m[OffHours] to the daemon in an
# OFFHR request for machine m.
send_offhours := proc(m: Machine)
  send_Text('OFFHR '.m[Name].' '.m[OffHours,1].' '.m[OffHours,2])
end:

# send_maxjobs -- send m[MaxProcesses] to the daemon in a MAXJB request
# for machine m.
send_maxjobs := proc(m: Machine)
  limit := m[MaxProcesses];
  send_Text('MAXJB '.m[Name].' '.limit)
end:

# ended_job -- a slave process reported that it has ended.
# The event is logged, the job of that process (if still open) goes back
# to todo, the process is forgotten, a pending start on m is cancelled
# and one more process may be started (startable_processes).
ended_job := proc(m: Machine, pid: integer)
  global startable_processes;
  who := ident_mach(m, pid);
  log_line(who, 'ended');
  # re-queue first: remove_job discards the record redo_job reads
  redo_job(m, pid);
  remove_job(m, pid);
  m[StartCycle] := DBL_MAX;
  startable_processes := startable_processes + 1
end:

# stopped_job -- a slave process was reported as stopped.
# For a known process that was not already marked stopped: log it, mark
# it, and fold the time since EventTime into JobTime.  If its job is not
# queued and no other process in the pool is running that job unstopped,
# the job is appended to todo and check_machine is called for the first
# machine that does not hold exactly MaxProcesses processes.
stopped_job := proc(m: Machine, pid: integer)
  global mach, todo;
  k := index_by_pid(m, pid);
  if k <= 0 then return() fi;
  pr := m[Processes,k];
  if pr[Stopped] then return() fi;

  log_line(ident_mach(m, pid), 'stopped job', pr[Job]);
  pr[Stopped] := true;
  now := UTCTime();
  pr[JobTime] := pr[JobTime] + now - pr[EventTime];
  pr[EventTime] := now;

  if pr[Job] > 0 and SearchArray(pr[Job], todo) = 0 then
    # is the same job still running somewhere?
    alive := false;
    for j to length(mach) do
      q := index_by_job(mach[j], pr[Job]);
      if q > 0 and not mach[j,Processes,q,Stopped] then
        alive := true;
        break
      fi
    od;
    if not alive then
      todo := append(todo, pr[Job]);
      # Pick up best machine to restart
      for j to length(mach) do
        if length(mach[j,Processes]) <> mach[j,MaxProcesses] then
          check_machine(mach[j]);
          break
        fi
      od
    fi
  fi
end:

# continued_job -- a slave process was reported as running.
# If the process is known and was marked stopped, the event is logged,
# the mark is cleared and EventTime restarts at the current time.
continued_job := proc(m: Machine, pid: integer)
  k := index_by_pid(m, pid);
  if k <= 0 then return() fi;
  pr := m[Processes,k];
  if pr[Stopped] then
    log_line(ident_mach(m, pid), 'continued job', If(pr <> 0, pr[Job], '?'));
    pr[Stopped] := false;
    pr[EventTime] := UTCTime()
  fi
end:

# notify_state -- handle a process state notification for machine m.
# t is tried, in this order, against 'DONE <pid>', 'STOPPED <pid>' and
# 'CONTINUED <pid>'; the first match calls ended_job, stopped_job or
# continued_job and the result is true.  Returns false when t is none of
# the three.
notify_state := proc(m: Machine, t: string)
  formats := ['DONE %d', 'STOPPED %d', 'CONTINUED %d'];
  for k to 3 do
    got := my_sscanf(t, formats[k]);
    if length(got) = 1 then
      if k = 1 then
        ended_job(m, got[1])
      elif k = 2 then
        stopped_job(m, got[1])
      else
        continued_job(m, got[1])
      fi;
      return(true)
    fi
  od;
  false
end:

# user_command -- interpret one control statement.
#   t  the statement text; its first word selects the command
# Returns true when the first word is a known command (even if its
# arguments were invalid, which is only logged) and false otherwise.
#
# Commands:
#   StartUsing <machine>          add a machine to the pool
#   StopUsing <machine>           remove a machine, re-queue and kill its jobs
#   Status                        print a status table
#   LoginControl <machine> on|off
#   ForcedRun <machine> on|off
#   NiceValue <machine> <0..19>
#   OffHours <machine> [<a>..<b>] without range: 0..0
#   MaxJobs <machine> <n>         also resets LoadRange to n-0.3 .. n+1
#   LoadThreshold [<machine>] <low> <high>
#   RunAlso <job>                 append a job to the queue
#   KillAll                       stop the main loop
#   Interrupt                     stop and write open jobs to ParExecAction.redo
#   NoMail                        suppress the termination mail
#
# Fix: StopUsing used to call kill_job(m) before redo_job(m).  kill_job
# empties m[Processes], so redo_job found nothing and the jobs running on
# the removed machine were never put back on todo.  The jobs are now
# re-queued first.
user_command := proc(t: string)
  global mach, istodo, todo, Queue, killed, startable_processes, send_mail;
  cmd := my_sscanf(t, '%s');
  if length(cmd) = 0 then
    return(false)
  fi;
  if cmd[1] = 'StartUsing' or cmd[1] = 'StopUsing' then
    cmd := my_sscanf(t, '%s %s');
    if length(cmd) = 2 then
      # third letter: 'a' for StartUsing, 'o' for StopUsing
      if cmd[1,3] = 'a' then
        if machine_number(cmd[2]) = 0 then
          m := Machine(cmd[2]);
          mach := append(mach, m);
          log_line(m[Name], 'added to machine pool');
          check_machine(m)
        else
          log_line('Machine', cmd[2], 'already in use')
        fi
      else
        i := machine_number(cmd[2]);
        if i > 0 then
          m := mach[i];
          log_line(m[Name], 'removed from machine pool');
          # re-queue before killing: kill_job(m) clears m[Processes]
          redo_job(m);
          kill_job(m);
          mach := [op(mach[1..i-1]), op(mach[i+1..-1])]
        else
          log_line('Unknown machine in', t)
        fi
      fi
    else
      log_line('Invalid command', t)
    fi
  elif cmd[1] = 'Status' then
    log_line(sprintf('Status report (%d of %d jobs not done, %d not started):',
                     sum(istodo), length(istodo), length(todo)));
    lprint();
    lprint('  # Machine      Class   Pid Status    Job      CPU string');
    now := UTCTime();
    for i to length(mach) do
      m := mach[i];
      printf('%3d %-16s%2d ', i, m[User].m[Name], m['Class']);
      if m[StartCycle] < DBL_MAX then
        printf('      STARTED')
      elif length(m[Processes]) = 0 then
        if m[DownCount] > 0 then
          printf('      DOWN(%d)', m[DownCount])
        else
          printf('      INACTIVE')
        fi
      else
        for j to length(m[Processes]) do
          p := m[Processes,j];
          printf('%5d %-8s', p[Pid], If(p[Stopped], 'STOPPED', 'RUNNING'));
          if p[Job] = 0 then
            printf(' none')
          else
            q := Queue[p[Job]];
            if type(q, string) then
              q := sprintf('"%s"', If(length(q) > 23, q[1..20].'...', q))
            fi;
            printf('%5d%9s %a', p[Job], p[ElapsedTime], q)
          fi;
          # continuation lines are aligned under the Pid column
          if j < length(m[Processes]) then printf('\n%23s', ' ') fi
        od
      fi;
      lprint()
    od;
    lprint()
  elif cmd[1] = 'LoginControl' or cmd[1] = 'ForcedRun' then
    cmd := my_sscanf(t, '%s %s %s');
    if length(cmd) = 3 then
      i := machine_number(cmd[2]);
      if i > 0 then
        m := mach[i];
        if cmd[3] = 'on' or cmd[3] = 'off' then
          # the command word is also the name of the Machine field
          m[cmd[1]] := evalb(cmd[3] = 'on');
          if cmd[1] = 'LoginControl' then
            send_loginctrl(m)
          fi;
          log_line(m[Name], 'turned', cmd[1], cmd[3])
        else
          log_line(cmd[1], 'argument must be "on" or "off"')
        fi
      else
        log_line('Unknown machine in', t)
      fi
    else
      log_line('Invalid command', t)
    fi
  elif cmd[1] = 'NiceValue' then
    cmd := my_sscanf(t, '%s %s %d');
    if length(cmd) = 3 then
      i := machine_number(cmd[2]);
      if i > 0 then
        m := mach[i];
        if cmd[3] >= 0 and cmd[3] <= 19 then
          m[NiceValue] := cmd[3];
          log_line(m[Name], 'set nice value to', cmd[3])
        else
          log_line('0 <= NiceValue <= 19')
        fi
      else
        log_line('Unknown machine in', t)
      fi
    else
      log_line('Invalid command', t)
    fi
  elif cmd[1] = 'OffHours' then
    cmd := my_sscanf(t, '%s %s %d..%d');
    if length(cmd) = 4 or length(cmd) = 2 then
      i := machine_number(cmd[2]);
      if i > 0 then
        m := mach[i];
        if length(cmd) = 2 then
          m[OffHours] := 0..0
        else
          m[OffHours] := cmd[3]..cmd[4]
        fi;
        send_offhours(m);
        log_line(m[Name], 'set off hours to', m[OffHours])
      else
        log_line('Unknown machine in', t)
      fi
    else
      log_line('Invalid command', t)
    fi
  elif cmd[1] = 'MaxJobs' then
    cmd := my_sscanf(t, '%s %s %d');
    if length(cmd) = 3 then
      i := machine_number(cmd[2]);
      if i > 0 then
        m := mach[i];
        if cmd[3] >= 1 then
          m[MaxProcesses] := cmd[3];
          m[LoadRange] := cmd[3] - 0.3 .. cmd[3] + 1;
          send_maxjobs(m);
          send_loadrange(m);
          log_line(m[Name], 'set maximal jobs to', m[MaxProcesses])
        else
          log_line('Cannot run less than 1 process on', m[Name])
        fi
      else
        log_line('Unknown machine in', t)
      fi
    else
      log_line('Invalid command', t)
    fi
  elif cmd[1] = 'LoadThreshold' then
    # first try the per-machine form, then the global form
    cmd := my_sscanf(t, '%s %s %g %g');
    if length(cmd) = 4 then
      i := machine_number(cmd[2]);
      if i > 0 then
        m := mach[i];
        if cmd[3] >= 0 and cmd[4] > cmd[3] then
          m[LoadRange] := cmd[3]..cmd[4];
          send_loadrange(m);
          log_line(m[Name],
                   sprintf('set load thresholds to %g, %g', cmd[3], cmd[4]))
        else
          log_line('Invalid load threshold values')
        fi
      else
        log_line('Unknown machine in', t)
      fi
    else
      cmd := my_sscanf(t, '%s %g %g');
      if length(cmd) = 3 then
        if cmd[2] >= 0 and cmd[3] > cmd[2] then
          for m in mach do
            m[LoadRange] := cmd[2]..cmd[3];
            send_loadrange(m)
          od;
          log_line(sprintf('Set global load thresholds to %g, %g',
                           cmd[2], cmd[3]))
        else
          log_line('Invalid load threshold values')
        fi
      else
        log_line('Invalid command', t)
      fi
    fi
  elif cmd[1] = 'RunAlso' then
    # the job is everything after the command word and one separator
    p := CaseSearchString(cmd[1], t);
    q := t[p+length(cmd[1])+2..-1];
    Queue := append(Queue, q);
    todo := append(todo, length(Queue));
    istodo := append(istodo, 1);
    if type(q, string) then
      q := sprintf('"%s"', If(length(q) > 30, q[1..27].'...', q))
    fi;
    startable_processes := startable_processes + 1;
    log_line('Job', length(Queue), q, 'added to queue')
  elif cmd[1] = 'KillAll' then
    killed := true;
    log_line('Killed all jobs')
  elif cmd[1] = 'Interrupt' then
    killed := true;
    # write every job that is not done as a RunAlso statement
    OpenWriting('ParExecAction.redo');
    for i to length(istodo) do
      if istodo[i] > 0 then
        printf('RunAlso ');
        if type(Queue[i], string) then
          printf('%s\n', Queue[i])
        else
          printf('%a\n', Queue[i])
        fi
      fi
    od;
    OpenWriting(previous);
    OpenAppending(logfile);
    log_line('Killed all jobs, not done jobs written into');
    printf('%22sParExecAction.redo\n', ' ');
    OpenWriting(previous);
  elif cmd[1] = 'NoMail' then
    send_mail := false;
    log_line('No termination mail')
  else
    return(false)
  fi;
  true
end:

# abnormal_termination -- react to an unexpected error (lasterror).
# Acts only once: while normal_termination is still true it logs the
# error, clears the flag, sends the "aborted" mail if mail is enabled and
# finally issues the 'Interrupt' user command.
abnormal_termination := proc()
  global normal_termination, send_mail;
  if not normal_termination then return() fi;
  log_line('Abnormal termination due to', lasterror);
  normal_termination := false;
  if send_mail then mail_message2(true) fi;
  user_command('Interrupt')
end:

# process_controls -- run every line of the text t through user_command.
# Lines are separated by '\n'; a line that user_command does not accept
# is logged as an invalid control statement.
process_controls := proc(t)
  total := length(t);
  pos := 0;                       # characters of t already consumed
  while pos < total do
    len := CaseSearchString('\n', pos+t);
    if len < 0 then len := total - pos fi;    # last line, no newline
    line := t[pos+1..pos+len];
    if not user_command(line) then
      log_line('Invalid control statement', line)
    fi;
    pos := pos + len + 1
  od
end:

# mail_message2 -- mail a run summary to $USER.
#   aborted  true: the subject line of the report says ABORTED,
#            false: it says terminated
# The report (open jobs, start/end time, real and CPU times, speedup and
# the process/job counters) is written to /tmp/msg.<pid>, mailed with
# Mail and the temporary file is removed afterwards.
mail_message2 := proc(aborted: boolean)
  global istodo, StartDate, StartTime, initCPU, jobCPU,
  nrCreated, nrStarted, nrVanished, nrCompleted, nrError;
  tmpname := '/tmp/msg.'.getpid();
  OpenWriting(tmpname);
  printf('ParExecuteIPC %s doing %s jobs\n',
         If(aborted, 'ABORTED', 'terminated'), logfile[1..-5]);
  if sum(istodo) <> 0 then
    printf('%d jobs remaining\n', sum(istodo))
  else
    printf('All jobs done\n')
  fi;
  printf('Started at %s\n', StartDate);
  printf('Ended at   %s\n\n', date());
  elapsed := UTCTime() - StartTime;
  printf('%9.1f s real time\n', elapsed);
  printf('%9.1f s init CPU time\n', initCPU);
  printf('%9.1f s job CPU time\n', jobCPU);
  printf('%9.1f s total CPU time\n', jobCPU + initCPU);
  printf('%9.2f speedup\n',(initCPU + jobCPU) / elapsed);
  printf('%9d processes created\n', nrCreated);
  printf('%9d processes started\n', nrStarted);
  printf('%9d processes vanished\n', nrVanished);
  printf('%9d jobs completed correctly\n', nrCompleted);
  printf('%9d jobs completed with error\n', nrError);
  OpenWriting(previous);
  subject := 'ParExecuteIPC('.logfile[1..-5].') finished';
  CallSystem('Mail -s "'.subject.'" $USER <'.tmpname.'; rm '.tmpname)
end:

# ParExecuteIPC -- run a queue of jobs in parallel on a pool of machines
# through the darwinipc daemon.
#
#   queue         the jobs (strings or structures), numbered from 1
#   ProgFileName  file with the program text sent to every new slave;
#                 also the base name of the .log, .done and .out.<n> files
#   machines      machine specifications accepted by Machine()
#   handler       procedure(job, text) called with each result, or 0 to
#                 use default_handler
#   delay         receive timeout used once the first polling cycle is
#                 complete; 10 when fewer than five arguments are given
#   controls      (optional) control statements processed before the main
#                 loop starts
#
# Jobs listed with a positive number in <ProgFileName>.done are treated
# as done already.  While the loop runs, control statements are read from
# the files ParExecAction.<pid> and ParExecAction.  Returns NULL.
#
# Fix: during the initial status round the reply's host name was used to
# index mach without checking that machine_number found it; a reply from
# a host that is not in the pool raised an indexing error.  Such replies
# are now logged, as the main loop already does.
ParExecuteIPC := proc(queue: list({string, structure}), ProgFileName: string,
		  machines: list(string), handler: {0, procedure}, delay: posint,
		  controls: string)
  global logfile, todo, mach, istodo, Queue, StartDate, StartTime, nrCycles,
  initCPU, jobCPU, nrCreated, nrStarted, nrVanished, nrCompleted,
  nrError, resultHandler, killed, startable_processes, send_mail,
  normal_termination;

  oldprintgc := Set(printgc=false);
  prog := ReadRawFile(ProgFileName);
  normal_termination := true; send_mail := false;
  if length(prog) = 0 then error(ProgFileName, 'not found') fi;

  # ---- job bookkeeping: istodo[i] = 1 while job i is open -------------
  Queue := queue;
  if length(Queue) = 0 then
    istodo := []
  else
    istodo := CreateArray(1..length(Queue), 1)
  fi;
  if length(ReadRawFile(ProgFileName.'.done')) > 0 then
    OpenReading(ProgFileName.'.done');
    do
      i := ReadLine ();
      if i = EOF then break fi;
      # negative entries (jobs that ended with an error) are done over
      if i >0 and i<= length (istodo) then istodo[i] := 0 fi
    od
  fi;
  todo := [];
  for i to length(istodo) do
    if istodo[i] = 1 then todo := append(todo, i) fi
  od;

  resultHandler := If(handler = 0, default_handler, handler);
  logfile := ProgFileName.'.log';

  OpenWriting(logfile);

  StartDate := date();
  StartTime := UTCTime();
  initCPU := jobCPU := nrCycles := 0;
  nrCreated := nrStarted := nrVanished := nrCompleted := nrError := 0;
  printf('ParExecuteIPC started on %s at %s for pid %d\n',
         hostname(), StartDate, getpid() );
  printf( '%d of %d jobs to be done\n\n', length(todo), length(Queue) );

  # ---- connect to the daemon, starting it if necessary ----------------
  r := traperror(ConnectTcp('/tmp/.ipc/darwin', false));
  if r = 'IPC [connect]: No such file or directory' or
     r = 'IPC [connect]: Connection refused' then
    log_line('Starting darwinipc daemon');
    lprint();
    CallSystem('darwinipc </dev/null '.DevNull.' &');
    sleep(5);
    ConnectTcp('/tmp/.ipc/darwin', false)
  elif r = lasterror and r <> 'Already connected' then
    error(r)
  fi;

  # ---- initial status round: one MSTAT per machine ---------------------
  lprint('  # Machine             Class MaxP Initial Status');
  mach := CreateArray(1..length(machines));
  for i to length(mach) do
    mach[i] := Machine(machines[i])
  od;
  mach := sort(mach, x->-x['Class']);
  nrgot := i := 0;
  while nrgot < length(mach) do
    if i < length(mach) then
      i := i + 1;
      send_Text('MSTAT '.mach[i,User].mach[i,Name])
    fi;
    r := traperror(ReceiveDataTcp(20));
    if r = NULL then
      # timeout: stop waiting once every machine has been asked
      if i = length(mach) then break fi
    else
      nrgot := nrgot + 1;
      if r = lasterror then
        lprint('Catched', r);
        abnormal_termination();
        return()
      elif length(r[3]) > 2 and r[3,1..2] = 'OK' then
        nr := machine_number(r[1]);
        if nr > 0 then
          printf('%3d %-20s%5d%5d %s', nr, r[1], mach[nr,'Class'],
                 mach[nr,MaxProcesses], r[3,4..-1]);
          lprint()
        else
          # reply from a host that is not part of the pool
          log_line('Unknown machine', r[1], 'sent', r[3])
        fi;
        if r[3] = 'OK STARTED' then sleep(5) fi
      fi
    fi
  od;
  lprint();
  usedDelay := 9; nextmachine := 1; killed := false; send_mail := true;
  startable_processes := length(Queue);
  if nargs > 5 then
    process_controls(controls)
  fi;

  # ---- main loop --------------------------------------------------------
  do
    # control statements dropped into the action files
    t := ReadRawFile('ParExecAction.'.getpid());
    if length(t) > 0 then
      process_controls(t);
      if Debug <> true then CallSystem('rm ParExecAction.'.getpid()) fi
    fi;
    t := ReadRawFile('ParExecAction');
    if length(t) > 0 then
      process_controls(t);
      CallSystem('rm  ParExecAction')
    fi;
    if sum(istodo) = 0 or killed then break fi;

    # handle incoming messages until the receive times out
    while sum(istodo) > 0 and not killed do
      r := traperror(ReceiveDataTcp(usedDelay));
      if r = NULL then  break fi;
      if r = 'Not connected' then
        if send_mail then mail_message2(true) fi;
        return()
      elif r = lasterror then
        log_line('Catched', r);
        abnormal_termination()
      else
        i := machine_number(r[1]);
        if i > 0 then
          m := mach[i];
          t := r[3];
          pid := my_sscanf(t, 'OK %d');
          if length(pid) = 1 then
            # a new slave reports its pid
            init_program(m, pid[1], prog);
          elif t = 'OK ALIVE' or t = 'OK BUSY' and m[ForcedRun] then
            if m[DownCount] > 0 then
              m[DownCount] := 0; check_machine(m)
            elif startable_processes > 0 then
              newJob := other_job(m['Class']);
              if newJob > 0 then
                todo := [newJob]
              fi;
              if length(todo) > 0 then
                send_loadrange(m);
                send_loginctrl(m);
                send_offhours(m);
                send_maxjobs(m);
                create_process(m);
                startable_processes := startable_processes - 1
              fi;
            fi;
          elif t = 'OK BUSY' then
            if m[DownCount] > 0 then
              m[DownCount] := 0; check_machine(m)
            fi;
          elif t = 'OK STARTED' then
            redo_job(m);
            m[Processes] := [];
            m[DownCount] := 0;
            log_line(m[Name], 'is starting')
          elif t = 'OK DOWN' or t = 'error DOWN' then
            m[DownCount] := m[DownCount] + 1;
            # only every 10th consecutive report gives the machine up
            if mod(m[DownCount], 10) = 0 then
              log_line(m[Name], 'is down');
              redo_job(m);
              m[Processes] := []
            fi
          elif t = 'OK RUNNING' then
            continued_job(m, r[2])
          elif t = 'OK STOPPED' then
            stopped_job(m, r[2])
          elif t = 'OK NONE' then
            nrVanished := nrVanished + 1;
            log_line(ident_mach(m, r[2]), 'vanished');
            redo_job(m, r[2]);
            remove_job(m, r[2])
          elif t = 'error Job limit reached' then
            log_line('Job limit reached on', m[Name]);
            m[StartCycle] := DBL_MAX;
            startable_processes := startable_processes + 1
          elif not notify_state(m, t) and not user_command(t) then
            handle_result(m, r[2], t)
          fi
        elif not user_command(r[3]) then
          if length(r[3]) > 4 and r[3,1..4] = 'DONE' then
            log_line(r[1], 'ended')
          else
            log_line('Unknown machine', r[1], 'sent', r[3])
          fi;
        fi
      fi
    od;

    # poll the machines round-robin until one request has been sent
    for i to length(mach) do
      if nextmachine <= length(mach) then
        b := check_machine(mach[nextmachine])
      else
        b := false
      fi;
      if nextmachine >= length(mach) then
        # a full cycle over the pool is complete
        nrCycles := nrCycles + 1;
        nextmachine := 1;
        usedDelay := If(nargs < 5, 10, delay);
        if nrCycles > 3 then startable_processes := startable_processes + 1 fi
      else
        nextmachine := nextmachine + 1
      fi;
      if b then break fi
    od;
  od;
  log_line('===== ParExecuteIPC terminated. =====');
  OpenWriting(previous);
  if normal_termination then
    for m in mach do
      kill_job(m)
    od;
    if send_mail then mail_message2(false) fi;
    # Wait until all kill requests have been processed
    SendTcp('PING');
    do
      r := ReceiveTcp(300);
      if r = NULL or r = 'PING OK' then break fi
    od
  fi;
  Set(printgc=oldprintgc);
  NULL
end:

# ParExecuteTest -- run one job on one machine and print what happens.
#
#   thisjob       the job (string or structure) sent to the slave as job 1
#   ProgFileName  program file; it is read locally to check that it assigns
#                 procedures to the globals init and job, and its text is
#                 sent to the slave
#   machine       machine specification accepted by Machine()
#
# The procedure connects to the darwinipc daemon (starting it if needed),
# asks for the machine status, starts one slave with RUNP, sends it the
# program and then the job, prints the result and tells the slave to end.
# Progress and problems are reported with log_line on the current output.
# Returns NULL.
ParExecuteTest := proc(thisjob: {string, structure}, ProgFileName: string, machine: string)
  global job, init;

  oldprintgc := Set(printgc=false);
  prog := ReadRawFile(ProgFileName);
#  prog := ReadRawFile(libname.'/Server/'.ProgFileName);
  if length(prog) = 0 then error(ProgFileName, 'not found') fi;
  # clear both globals so that the checks below see what the file assigns
  job := init := NULL;
# changed for consistency-  gmc oct 18 2000
   ReadProgram(ProgFileName);
#   ReadProgram(libname.'/Server/'.ProgFileName);
  if not type(init, procedure) then
    error('init is not being assigned a procedure in', ProgFileName)
  fi;
  if not type(job, procedure) then
    error('job is not being assigned a procedure in', ProgFileName)
  fi;
  # connect to the daemon; start it when the socket is missing or refused
  r := traperror(ConnectTcp('/tmp/.ipc/darwin', false));
  if r = 'IPC [connect]: No such file or directory' or
     r = 'IPC [connect]: Connection refused' then
    lprint('Starting darwinipc daemon');
    lprint();
    CallSystem('darwinipc </dev/null '.DevNull.' &');
    sleep(5);
    ConnectTcp('/tmp/.ipc/darwin', false)
  elif r = lasterror and r <> 'Already connected' then
    error(r)
  fi;
  m := Machine(machine);
  SendTcp('MSTAT '.m[User].m[Name]);
  # nrCycles counts receive attempts (messages and timeouts alike)
  nrCycles := 0;
  do
    r := traperror(ReceiveDataTcp(30));
    nrCycles := nrCycles + 1;
    if r = NULL then
      # timeout: give up, or repeat the appropriate status request
      if m[StartCycle] + 2 < nrCycles then
	log_line('error: Did not get pid');
	break
      elif length(m[Processes]) = 0 then
	if nrCycles > 2 then
	  log_line('error: Did not get status of', m[Name]);
	  break
	fi;
	SendTcp('MSTAT '.m[User].m[Name])
      else
	SendTcp('PSTAT '.m[Name].' '.m[Processes,1,Pid])
      fi
    elif r = 'Not connected' then
      log_line('error: not connected');
      break
    elif r = lasterror then
      log_line('Catched', r)
    elif same_machine(r[1], m[Name]) then
      # message from the machine under test; r[3] is its text
      t := r[3];
      pid := my_sscanf(t, 'OK %d');
      if length(pid) = 1 then
	# the slave reports its pid: register it and send the program
	if m[StartCycle] < DBL_MAX then
	  m[Processes] := [Process(pid[1])];
	  SendDataTcp(m[Name], pid[1], prog);
	  log_line(ident_mach(m, pid[1]), 'started');
	  m[StartCycle] := DBL_MAX
	else
	  log_line('Cannot handle pid', pid[1], 'from', m[Name]);
	  break
	fi
      elif t = 'OK ALIVE' or t = 'OK BUSY' then
	# start the slave once: no process yet and no start pending
	if length(m[Processes]) = 0 and m[StartCycle] = DBL_MAX then
	  log_line(m[Name], 'creates parallel process');
	  SendTcp('RUNP '.m[Name].
		  ' echo "ParExecSlave(30,0):quit" | darwin '.DevNull);
	  m[StartCycle] := nrCycles
	fi
      elif t = 'OK STARTED' then
	log_line(m[Name], 'is starting')
      elif t = 'OK DOWN' or t = 'error DOWN' then
	log_line('error:', m[Name], 'is down');
	break
      elif t = 'OK RUNNING' then
	log_line(ident_mach(m, r[2]), 'continued job');
	p := index_by_pid(m, r[2]);
	if p > 0 then m[Processes,p,Stopped] := false fi
      elif t = 'OK STOPPED' then
	log_line(ident_mach(m, r[2]), 'stopped job');
	p := index_by_pid(m, r[2]);
	if p > 0 then m[Processes,p,Stopped] := true fi
      elif t = 'OK NONE' then
	log_line('error:', ident_mach(m, r[2]), 'died');
	break
      else
	# state notifications of the form '<WORD> <pid>'
	pid := my_sscanf(t, '%s %d');
	if length(pid) = 2 and pid[1] = 'DONE' then
	  log_line(ident_mach(m, pid[2]), 'ended');
	  break
	elif length(pid) = 2 and pid[1] = 'STOPPED' then
	  log_line(ident_mach(m, pid[2]), 'stopped job');
	  p := index_by_pid(m, pid[2]);
	  if p > 0 then m[Processes,p,Stopped] := true fi
	elif length(pid) = 2 and pid[1] = 'CONTINUED' then
	  log_line(ident_mach(m, pid[2]), 'continued job');
	  p := index_by_pid(m, pid[2]);
	  if p > 0 then m[Processes,p,Stopped] := false fi
	elif length(m[Processes]) > 0 then
	  # data message from the slave; note that r is reused below for
	  # the sscanf results, so the pid is saved first
	  pid := r[2];
	  r := my_sscanf(t, 'INIT DONE %g');
	  if length(r) = 1 then
	    log_line(ident_mach(m, pid), 'initialized', 
		      sprintf('(%.1f s CPU)', r[1]));
	    p := index_by_pid(m, pid);
	    if p > 0 then m[Processes,p,Job] := 1 fi;
	    log_line(ident_mach(m, pid), 'started job');
	    SendDataTcp(m[Name], pid, 'PE_job:=1:'.eval(thisjob))
	  else
	    r := my_sscanf(t, 'RESULT %d %g');
	    if length(r) = 2 then
	      log_line(ident_mach(m, pid), 'completed job',
			sprintf('(%.1f s CPU), result:', r[2]));
	      # the result text follows the first ':' of the message
	      result := t[CaseSearchString(':', t)+2..-1];
	      printf('%s', result);
	      if length(result) = 0 or result[length(result)] <> '\n' then
		lprint()
	      fi;
	      # no further job: tell the slave to finish
	      SendDataTcp(m[Name], pid, 'nojob := true:');
	      log_line(ident_mach(m, pid), 'ending')
	    elif length(t) > 5 and t[1..5] = 'error' then
	      log_line(ident_mach(m, pid), 'problem:', t[7..-1]);
	      kill_job(m, index_by_pid(m, pid))
	    elif length(t) > 6 and t[1..6] = 'RERROR' then
	      log_line(ident_mach(m, pid), 'completed with error:');
	      printf('%22s%s', ' ', t[CaseSearchString(':', t)+2..-1]);
	      lprint();
	      kill_job(m, index_by_pid(m, pid))
	    elif t = 'CONTINUE' then
	      log_line(ident_mach(m, pid), 'sent 2nd request')
	    else
	      log_line(ident_mach(m, pid), 'sent invalid response:');
	      lprint(t);
	      kill_job(m, index_by_pid(m, pid))
	    fi
	  fi
	elif t = 'error Job limit reached' then
	  log_line('Job limit reached on', m[Name]);
	  break
	else
	  log_line('Cannot handle (for', m[Name].')', t);
	  kill_job(m, index_by_pid(m, r[2]));
	  break
	fi
      fi
    fi
  od;
  Set(printgc=oldprintgc);
  NULL
end:

# GetMachineUsage -- print per-machine statistics extracted from a log
# file written by ParExecuteIPC.
#
#   logfile  name of the log file
#
# Only lines that look like log_line output are used: more than 22
# characters, a ':' in column 21 and a four-digit year in columns 17-20.
# The first word after the time stamp, cut at '(', is the machine name.
# For every machine the procedure counts 'is starting' lines, lines
# ending in 'started', 'initialized' lines and 'completed' lines, and
# adds up the CPU seconds found in parentheses on the last two kinds.
# The table is sorted by completed jobs (then initialisations) and ends
# with a TOTAL row.
#
# Fixes:
#  - the year test accepted only years starting with "19", so every log
#    line written from 2000 on was ignored;
#  - a log without usable lines raised an indexing error;
#  - percentages divided by zero when no job had completed;
#  - with exactly one machine the TOTAL row was printed under the
#    machine's name.
GetMachineUsage := proc(logfile: string)
  mach := [];
  cnts := [];
  OpenReading(logfile);
  do
    t := ReadLine();
    if t = EOF then break fi;
    # keep only time-stamped lines
    if length(t) <= 22 or t[21] <> ':' then next fi;
    yr := sscanf(t[17..20], '%d');
    if length(yr) <> 1 or yr[1] < 1000 then next fi;
    t := t[23..-1];
    m := sscanf(t, '%s');
    if length(m) = 0 then next fi;
    # machine name: first word, without a trailing '(pid)'
    m := m[1];  m := m[1..CaseSearchString('(', m)];
    i := SearchArray(m, mach);
    if i = 0 then
      mach := append(mach, m);
      i := length(mach);
      # [starts, pids, inits, jobs, init CPU, job CPU, index into mach]
      cnts := append(cnts, [0, 0, 0, 0, 0, 0, i])
    fi;
    c := cnts[i];
    if CaseSearchString('is starting', t) >= 0 then c[1] := c[1] + 1
    elif CaseSearchString('started', t) = length(t) - 7 then c[2] := c[2] + 1
    else
      p := CaseSearchString('initialized', t);
      if p >= 0 then
        c[3] := c[3] + 1;
        k := 5
      else
        p := CaseSearchString('completed', t);
        if p >= 0 then
          c[4] := c[4] + 1;
          k := 6
        fi
      fi;
      if p >= 0 then
        # CPU seconds follow the next '(' after the keyword
        q := CaseSearchString('(', p + t);
        if q > 0 then
          cpu := sscanf(p + q + 1 + t, '%g');
          if length(cpu) = 1 then
            c[k] := c[k] + cpu[1]
          fi
        fi
      fi
    fi
  od;
  if length(cnts) = 0 then
    printf('No machine activity found in %s\n', logfile);
    return()
  fi;
  cnts := sort(cnts, x->-(10000*x[4]+x[3]));
  # Sum into a fresh array so that no machine row is modified, then give
  # the total row an index beyond mach so that it is always recognised.
  tot := CreateArray(1..7);
  for c in cnts do
    tot := tot + c
  od;
  tot[7] := length(mach) + 1;
  printf('Machine Starts PIDs Inits    CPU    Avg  Jobs  Tot CPU     Avg %%Work  %%Cum\n');
  printf('------- ------ ---- ----- ------ ------ ----- -------- ------- ----- -----\n');
  cum := 0;
  for c in append(cnts, tot) do
    # rows without any counted event are not printed
    if sum(c) > c[7] then
      if c[7] <= length(mach) then
        machName := mach[c[7]]
      else
        machName := 'TOTAL'
      fi;
      avgInit := If(c[3] = 0, '**', sprintf('%7.1f', c[5] / c[3]));
      avgTotal := If(c[4] = 0, '**', sprintf('%8.1f', c[6] / c[4]));
      cum := cum + c[4];
      if tot[4] = 0 then
        workPct := 0;
        cumPct := 0
      else
        workPct := 100 * c[4] / tot[4];
        cumPct := 100 * cum / tot[4]
      fi;
      if machName = 'TOTAL' then
        cumValue := ''
      else
        cumValue := sprintf('%6.1f', cumPct)
      fi;
      printf('%-10s%4d%5d%6d%7d%7s%6d%9d%8s%6.1f%6s\n',
             machName, c[1], c[2], c[3], round(c[5]), avgInit,
             c[4], round(c[6]), avgTotal, workPct,
             cumValue)
    fi
  od
end:
