/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
 * Copyright (c) 2007      The Trustees of Indiana University.
 *                         All rights reserved.
 * Copyright (c) 2011-2016 Cisco Systems, Inc.  All rights reserved.
 * Copyright (c) 2011-2017 Los Alamos National Security, LLC. All
 *                         rights reserved.
 * Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
 * Copyright (c) 2014-2016 Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "opal_config.h"
#include "opal/constants.h"
#include "opal/types.h"
#include "opal_stdint.h"
#include "opal/mca/hwloc/base/base.h"
#include "opal/util/argv.h"
#include "opal/util/opal_environ.h"
#include "opal/util/output.h"
#include "opal/util/proc.h"
#include "opal/util/output.h"
#include "opal/util/show_help.h"
#include "opal/util/opal_getcwd.h"
#include "opal/constants.h"
#include "opal/mca/pmix/base/base.h"
#include "opal/mca/pmix/base/pmix_base_hash.h"
#include "pmix_cray.h"

static char cray_pmi_version[128];

static int cray_init(opal_list_t *ilist);
static int cray_fini(void);
static int cray_initialized(void);
static int cray_abort(int flat, const char *msg,
                      opal_list_t *procs);
static int cray_spawn(opal_list_t *jobinfo, opal_list_t *apps, opal_jobid_t *jobid);
static int cray_spawn_nb(opal_list_t *jobinfo, opal_list_t *apps,
                         opal_pmix_spawn_cbfunc_t cbfunc,
                         void *cbdata);
static int cray_job_connect(opal_list_t *procs);
static int cray_job_disconnect(opal_list_t *procs);
static int cray_job_disconnect_nb(opal_list_t *procs,
                                  opal_pmix_op_cbfunc_t cbfunc,
                                  void *cbdata);
static int cray_resolve_peers(const char *nodename,
                              opal_jobid_t jobid,
                              opal_list_t *procs);
static int cray_resolve_nodes(opal_jobid_t jobid, char **nodelist);
static int cray_put(opal_pmix_scope_t scope, opal_value_t *kv);
static int cray_fence(opal_list_t *procs, int collect_data);
static int cray_fencenb(opal_list_t *procs, int collect_data,
                        opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
static int cray_commit(void);
static int cray_get(const opal_process_name_t *id,
                    const char *key, opal_list_t *info,
                    opal_value_t **kv);
static int cray_get_nb(const opal_process_name_t *id, const char *key,
                       opal_list_t *info,
                       opal_pmix_value_cbfunc_t cbfunc, void *cbdata);
static int cray_publish(opal_list_t *info);
static int cray_publish_nb(opal_list_t *info,
                           opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
static int cray_lookup(opal_list_t *data, opal_list_t *info);
static int cray_lookup_nb(char **keys, opal_list_t *info,
                          opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata);
static int cray_unpublish(char **keys, opal_list_t *info);
static int cray_unpublish_nb(char **keys, opal_list_t *info,
                            opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
static const char *cray_get_version(void);
static int cray_store_local(const opal_process_name_t *proc,
                          opal_value_t *val);
static const char *cray_get_nspace(opal_jobid_t jobid);
static void cray_register_jobid(opal_jobid_t jobid, const char *nspace);

#if 0
static bool cray_get_attr(const char *attr, opal_value_t **kv);
#endif

const opal_pmix_base_module_t opal_pmix_cray_module = {
    .init = cray_init,
    .finalize = cray_fini,
    .initialized = cray_initialized,
    .abort = cray_abort,
    .commit = cray_commit,
    .fence = cray_fence,
    .fence_nb = cray_fencenb,
    .put = cray_put,
    .get = cray_get,
    .get_nb = cray_get_nb,
    .publish = cray_publish,
    .publish_nb = cray_publish_nb,
    .lookup = cray_lookup,
    .lookup_nb = cray_lookup_nb,
    .unpublish = cray_unpublish,
    .unpublish_nb = cray_unpublish_nb,
    .spawn = cray_spawn,
    .spawn_nb = cray_spawn_nb,
    .connect = cray_job_connect,
    .disconnect = cray_job_disconnect,
    .disconnect_nb = cray_job_disconnect_nb,
    .resolve_peers = cray_resolve_peers,
    .resolve_nodes = cray_resolve_nodes,
    .get_version = cray_get_version,
    .register_evhandler = opal_pmix_base_register_handler,
    .deregister_evhandler = opal_pmix_base_deregister_handler,
    .store_local = cray_store_local,
    .get_nspace = cray_get_nspace,
    .register_jobid = cray_register_jobid
};

// usage accounting
static int pmix_init_count = 0;

// local object
typedef struct {
    opal_object_t super;
    opal_event_t ev;
    opal_pmix_op_cbfunc_t opcbfunc;
    void *cbdata;
} pmi_opcaddy_t;
static OBJ_CLASS_INSTANCE(pmi_opcaddy_t,
                          opal_object_t,
                          NULL, NULL);

struct fence_result {
    volatile int flag;
    int status;
};

// PMI constant values:
static int pmix_kvslen_max = 0;
static int pmix_keylen_max = 0;
static int pmix_vallen_max = 0;
static int pmix_vallen_threshold = INT_MAX;

// Job environment description
static int pmix_size = 0;
static int pmix_rank = 0;
static int pmix_lrank = 0;
static int pmix_nrank = 0;
static int pmix_nlranks = 0;
static int pmix_appnum = 0;
static int pmix_usize = 0;
static char *pmix_kvs_name = NULL;
static int *pmix_lranks = NULL;
static opal_process_name_t pmix_pname;
static uint32_t pmix_jobid = -1;

static char* pmix_error(int pmix_err);
#define OPAL_PMI_ERROR(pmi_err, pmi_func)                       \
    do {                                                        \
        opal_output(0, "%s [%s:%d:%s]: %s\n",                   \
                    pmi_func, __FILE__, __LINE__, __func__,     \
                    pmix_error(pmi_err));                       \
    } while(0);

#define CRAY_WAIT_FOR_COMPLETION(a)               \
    do {                                          \
        while ((a)) {                             \
            usleep(10);                           \
        }                                         \
    } while (0)

static void cray_get_more_info(void)
{
    int alps_status = 0, i;
    uint64_t apid;
    size_t alps_count;
    int lli_ret = 0, place_ret;
    alpsAppLayout_t layout;
    char *npstring;
    char *firstrankstring;
    char **nps, **firstranks;
    int *base_pe_in_app;
    int *pes_in_app;
    char pbuf[OPAL_PATH_MAX];

    /*
     * First get our apid
     */

    lli_ret = alps_app_lli_lock();
    if (0 != lli_ret) {
        OPAL_OUTPUT_VERBOSE((20, opal_pmix_base_framework.framework_output,
                             "%s pmix:cray: alps_app_lli_lock returned %d",
                             OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), lli_ret));
        goto fn_exit;
    }

    lli_ret = alps_app_lli_put_request(ALPS_APP_LLI_ALPS_REQ_APID, NULL, 0);
    if (ALPS_APP_LLI_ALPS_STAT_OK != lli_ret) {
        OPAL_OUTPUT_VERBOSE((20, opal_pmix_base_framework.framework_output,
                             "%s pmix:cray: alps_app_lli_put_request - APID returned %d",
                             OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), lli_ret));
        goto fn_exit_w_lock;
    }

    lli_ret = alps_app_lli_get_response (&alps_status, &alps_count);
    if (ALPS_APP_LLI_ALPS_STAT_OK != alps_status) {
        OPAL_OUTPUT_VERBOSE((20, opal_pmix_base_framework.framework_output,
                             "%s pmix:cray: alps_app_lli_get_response returned %d",
                             OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), alps_status));
        goto fn_exit_w_lock;
    }

    lli_ret = alps_app_lli_get_response_bytes (&apid, sizeof(apid));
    if (ALPS_APP_LLI_ALPS_STAT_OK != lli_ret) {
        OPAL_OUTPUT_VERBOSE((20, opal_pmix_base_framework.framework_output,
                             "%s pmix:cray: alps_app_lli_get_response_bytes returned %d",
                             OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), lli_ret));
        goto fn_exit_w_lock;
    }

    /*
     * get some items from alps placement file
     */

    place_ret = alps_get_placement_info(apid,
                                        &layout,
                                        NULL,
                                        NULL,
                                        NULL,
                                        NULL,
                                        NULL,
                                        &base_pe_in_app,
                                        &pes_in_app,
                                        NULL,
                                        NULL);
    if (1 != place_ret) {
        OPAL_OUTPUT_VERBOSE((20, opal_pmix_base_framework.framework_output,
                             "%s pmix:cray: alps_get_placement_info returned %d (%s)",
                             OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), place_ret, strerror(errno)));
        goto fn_exit;
    }

    OPAL_OUTPUT_VERBOSE((2, opal_pmix_base_framework.framework_output,
                           "%s pmix:cray: alps_get_placement_info returned %d first pe on node is %d",
                            OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), place_ret, layout.firstPe));

    nps = NULL;
    firstranks = NULL;
    for (i=0; i < layout.numCmds; i++) {
        snprintf(pbuf, sizeof(pbuf), "%d", pes_in_app[i]);
        opal_argv_append_nosize(&nps, pbuf);
        snprintf(pbuf, sizeof(pbuf), "%d", base_pe_in_app[i]);
        opal_argv_append_nosize(&firstranks, pbuf);
    }

    npstring = opal_argv_join(nps, ' ');
    firstrankstring = opal_argv_join(firstranks, ' ');
    opal_argv_free(nps);
    opal_argv_free(firstranks);

    /*
     * stuff values into environment variables
     */

    /* add these envars to prep MPI-2 info pre-defined key/values */
    snprintf(pbuf, sizeof(pbuf), "%d", layout.numCmds);
    opal_setenv("OMPI_NUM_APP_CTX", pbuf, true, &environ);
    opal_setenv("OMPI_FIRST_RANKS", firstrankstring, true, &environ);
    opal_setenv("OMPI_APP_CTX_NUM_PROCS", npstring, true, &environ);
    free(firstrankstring);
    free(npstring);
    free(base_pe_in_app);
    free(pes_in_app);

    /*
     * ALPS always starts the application in the directory
     * where the aprun command was run to do the launch.
     * For SLURM, we have to check the SLURM_WORKING_DIR env.
     * variable.  If it is set, we can't set wdir since
     * we can't assume PWD is where we started.
     */
    if(getenv("SLURM_WORKING_DIR") == NULL) {
        opal_getcwd(pbuf, OPAL_PATH_MAX);
        opal_setenv("OMPI_MCA_initial_wdir", pbuf, true, &environ);
    }

   fn_exit_w_lock:
    lli_ret = alps_app_lli_unlock();
    if (ALPS_APP_LLI_ALPS_STAT_OK != lli_ret) {
        OPAL_OUTPUT_VERBOSE((20, opal_pmix_base_framework.framework_output,
                             "%s pmix:cray: alps_app_lli_unlock returned %d",
                             OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), lli_ret));
    }

   fn_exit:
    return;
}

static int cray_init(opal_list_t *ilist)
{
    int i, spawned, size, rank, appnum, my_node;
    int rc, ret = OPAL_ERROR;
    char *pmapping = NULL;
    char buf[PMI2_MAX_ATTRVALUE];
    int found;
    int major, minor, revision;
    uint32_t jobfam;
    opal_value_t kv;
    opal_process_name_t ldr;
    char nmtmp[64];
    char *str, **localranks = NULL;
    opal_process_name_t name;

    ++pmix_init_count;

    /* if we can't startup PMI, we can't be used */
    if ( PMI2_Initialized () ) {
        opal_output_verbose(10, opal_pmix_base_framework.framework_output,
                        "%s pmix:cray: pmi already initialized",
                        OPAL_NAME_PRINT(pmix_pname));
        return OPAL_SUCCESS;
    }
    size = -1;
    rank = -1;
    appnum = -1;
    if (PMI_SUCCESS != (rc = PMI2_Init(&spawned, &size, &rank, &appnum))) {
        opal_show_help("help-pmix-base.txt", "pmix2-init-failed", true, rc);
        return OPAL_ERROR;
    }
    if( size < 0 || rank < 0 ){
        opal_show_help("help-pmix-base.txt", "pmix2-init-returned-bad-values", true);
        goto err_exit;
    }

    pmix_size = size;
    pmix_rank = rank;
    pmix_appnum = appnum;

    pmix_vallen_max = PMI2_MAX_VALLEN;
    pmix_kvslen_max = PMI2_MAX_VALLEN; // FIX ME: What to put here for versatility?
    pmix_keylen_max = PMI2_MAX_KEYLEN;
    pmix_vallen_threshold = PMI2_MAX_VALLEN * 3;
    pmix_vallen_threshold >>= 2;

    /*
     * get the version info
     */

    if (PMI_SUCCESS != PMI_Get_version_info(&major,&minor,&revision)) {
        return OPAL_ERROR;
    }

    snprintf(cray_pmi_version, sizeof(cray_pmi_version),
             "%d.%d.%d", major, minor, revision);

    pmix_kvs_name = (char*)malloc(pmix_kvslen_max);
    if( pmix_kvs_name == NULL ){
         PMI2_Finalize();
         ret = OPAL_ERR_OUT_OF_RESOURCE;
         goto err_exit;
    }

    rc = PMI2_Job_GetId(pmix_kvs_name, pmix_kvslen_max);
    if( PMI_SUCCESS != rc ) {
        OPAL_PMI_ERROR(rc, "PMI2_Job_GetId");
        goto err_exit;
    }

    rc = sscanf(pmix_kvs_name,"kvs_%u",&jobfam);
    if (rc != 1) {
        opal_output_verbose(10, opal_pmix_base_framework.framework_output,
                           "%s pmix:cray: pmix_kvs_name %s",
                            OPAL_NAME_PRINT(pmix_pname), pmix_kvs_name);
        rc = OPAL_ERROR;
        goto err_exit;
    }

    pmix_jobid = jobfam << 16;

    /* store our name in the opal_proc_t so that
     * debug messages will make sense - an upper
     * layer will eventually overwrite it, but that
     * won't do any harm */
    pmix_pname.jobid = pmix_jobid;
    pmix_pname.vpid = pmix_rank;
    opal_proc_set_name(&pmix_pname);
    opal_output_verbose(10, opal_pmix_base_framework.framework_output,
                        "%s pmix:cray: assigned tmp name %d %d pmix_kvs_name %s",
                        OPAL_NAME_PRINT(pmix_pname),pmix_pname.jobid,pmix_pname.vpid,pmix_kvs_name);

    pmapping = (char*)malloc(PMI2_MAX_VALLEN);
    if( pmapping == NULL ){
        rc = OPAL_ERR_OUT_OF_RESOURCE;
        OPAL_ERROR_LOG(rc);
        return rc;
    }

    rc = PMI2_Info_GetJobAttr("PMI_process_mapping", pmapping, PMI2_MAX_VALLEN, &found);
    if( !found || PMI_SUCCESS != rc ) {
        OPAL_PMI_ERROR(rc,"PMI2_Info_GetJobAttr");
        return OPAL_ERROR;
    }

    pmix_lranks = pmix_cray_parse_pmap(pmapping, pmix_rank, &my_node, &pmix_nlranks);
    if (NULL == pmix_lranks) {
        rc = OPAL_ERR_OUT_OF_RESOURCE;
        OPAL_ERROR_LOG(rc);
        return rc;
    }

    free(pmapping);

    // setup hash table
    opal_pmix_base_hash_init();

    /* setup a name for retrieving data associated with the job */
    name.jobid = pmix_jobid;
    name.vpid = OPAL_VPID_WILDCARD;

    /* save the job size */
    OBJ_CONSTRUCT(&kv, opal_value_t);
    kv.key = strdup(OPAL_PMIX_JOB_SIZE);
    kv.type = OPAL_UINT32;
    kv.data.uint32 = pmix_size;
    if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&name, &kv))) {
        OPAL_ERROR_LOG(rc);
        OBJ_DESTRUCT(&kv);
        goto err_exit;
    }
    OBJ_DESTRUCT(&kv);

    /* save the appnum */
    OBJ_CONSTRUCT(&kv, opal_value_t);
    kv.key = strdup(OPAL_PMIX_APPNUM);
    kv.type = OPAL_UINT32;
    kv.data.uint32 = pmix_appnum;
    if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) {
        OPAL_ERROR_LOG(ret);
        OBJ_DESTRUCT(&kv);
        goto err_exit;
    }
    OBJ_DESTRUCT(&kv);

    rc = PMI2_Info_GetJobAttr("universeSize", buf, 16, &found);
    if( PMI_SUCCESS != rc ) {
        OPAL_PMI_ERROR(rc, "PMI_Get_universe_size");
        goto err_exit;
    }

    pmix_usize = atoi(buf);

    OBJ_CONSTRUCT(&kv, opal_value_t);
    kv.key = strdup(OPAL_PMIX_UNIV_SIZE);
    kv.type = OPAL_UINT32;
    kv.data.uint32 = pmix_usize;
    if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) {
        OPAL_ERROR_LOG(rc);
        OBJ_DESTRUCT(&kv);
        goto err_exit;
    }
    OBJ_DESTRUCT(&kv);

    /* push this into the dstore for subsequent fetches */
    OBJ_CONSTRUCT(&kv, opal_value_t);
    kv.key = strdup(OPAL_PMIX_MAX_PROCS);
    kv.type = OPAL_UINT32;
    kv.data.uint32 = pmix_usize;
    if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&name, &kv))) {
        OPAL_ERROR_LOG(ret);
        OBJ_DESTRUCT(&kv);
        goto err_exit;
    }
    OBJ_DESTRUCT(&kv);

    OBJ_CONSTRUCT(&kv, opal_value_t);
    kv.key = strdup(OPAL_PMIX_JOBID);
    kv.type = OPAL_UINT32;
    kv.data.uint32 = pmix_jobid;
    if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&name, &kv))) {
        OPAL_ERROR_LOG(ret);
        OBJ_DESTRUCT(&kv);
        goto err_exit;
    }
    OBJ_DESTRUCT(&kv);

    /* save the local size */
    OBJ_CONSTRUCT(&kv, opal_value_t);
    kv.key = strdup(OPAL_PMIX_LOCAL_SIZE);
    kv.type = OPAL_UINT32;
    kv.data.uint32 = pmix_nlranks;
    if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&name, &kv))) {
        OPAL_ERROR_LOG(rc);
        OBJ_DESTRUCT(&kv);
        goto err_exit;
    }
    OBJ_DESTRUCT(&kv);

    ldr.vpid = pmix_lranks[0];
    ldr.jobid = pmix_pname.jobid;

    /* find ourselves and build up a string for local peer info */
    memset(nmtmp, 0, 64);
    for (i=0; i < pmix_nlranks; i++) {
        ret = snprintf(nmtmp, 64, "%d", pmix_lranks[i]);
        opal_argv_append_nosize(&localranks, nmtmp);
        if (pmix_rank == pmix_lranks[i]) {
            pmix_lrank = i;
            pmix_nrank = i;
        }
    }

    str = opal_argv_join(localranks, ',');
    opal_argv_free(localranks);

    OBJ_CONSTRUCT(&kv, opal_value_t);
    kv.key = strdup(OPAL_PMIX_LOCAL_PEERS);
    kv.type = OPAL_STRING;
    kv.data.string = str;
    if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&name, &kv))) {
        OPAL_ERROR_LOG(ret);
        OBJ_DESTRUCT(&kv);
        goto err_exit;
    }
    OBJ_DESTRUCT(&kv);

    /* save the local leader */
    OBJ_CONSTRUCT(&kv, opal_value_t);
    kv.key = strdup(OPAL_PMIX_LOCALLDR);
    kv.type = OPAL_UINT64;
    kv.data.uint64 = *(uint64_t*)&ldr;
    if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&name, &kv))) {
        OPAL_ERROR_LOG(ret);
        OBJ_DESTRUCT(&kv);
        goto err_exit;
    }

    /* save our local rank */
    OBJ_CONSTRUCT(&kv, opal_value_t);
    kv.key = strdup(OPAL_PMIX_LOCAL_RANK);
    kv.type = OPAL_UINT16;
    kv.data.uint16 = pmix_lrank;
    if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) {
        OPAL_ERROR_LOG(ret);
        OBJ_DESTRUCT(&kv);
        goto err_exit;
    }

    /* and our node rank */
    OBJ_CONSTRUCT(&kv, opal_value_t);
    kv.key = strdup(OPAL_PMIX_NODE_RANK);
    kv.type = OPAL_UINT16;
    kv.data.uint16 = pmix_nrank;
    if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) {
        OPAL_ERROR_LOG(ret);
        OBJ_DESTRUCT(&kv);
        goto err_exit;
    }
    OBJ_DESTRUCT(&kv);

    cray_get_more_info();

    return OPAL_SUCCESS;
err_exit:
    PMI2_Finalize();
    return ret;
}

static int cray_fini(void) {

    if (0 == pmix_init_count) {
        return OPAL_SUCCESS;
    }

    if (0 == --pmix_init_count) {

        opal_output_verbose(10, opal_pmix_base_framework.framework_output,
                        "%s pmix:cray: calling PMI2_Finalize",
                        OPAL_NAME_PRINT(pmix_pname));

        PMI2_Finalize();

        if (NULL != pmix_kvs_name) {
            free(pmix_kvs_name);
            pmix_kvs_name = NULL;
        }

        if (NULL != pmix_lranks) {
            free(pmix_lranks);
            pmix_lranks = NULL;
        }
    }

    return OPAL_SUCCESS;
}

static int cray_initialized(void)
{
    if (0 < pmix_init_count) {
        return 1;
    }
    return 0;
}

static int cray_abort(int flag, const char *msg,
                      opal_list_t *procs)
{
    PMI2_Abort(flag, msg);
    return OPAL_SUCCESS;
}

static int cray_spawn(opal_list_t *jobinfo, opal_list_t *apps, opal_jobid_t *jobid)
{
    return OPAL_ERR_NOT_SUPPORTED;
}

static int cray_spawn_nb(opal_list_t *jobinfo, opal_list_t *apps,
                         opal_pmix_spawn_cbfunc_t cbfunc,
                         void *cbdata)
{
    return OPAL_ERR_NOT_SUPPORTED;
}

static int cray_job_connect(opal_list_t *procs)
{
    return OPAL_ERR_NOT_SUPPORTED;
}

static int cray_job_disconnect(opal_list_t *procs)
{
    return OPAL_ERR_NOT_SUPPORTED;
}

static int cray_job_disconnect_nb(opal_list_t *procs,
                                  opal_pmix_op_cbfunc_t cbfunc,
                                  void *cbdata)
{
    return OPAL_ERR_NOT_SUPPORTED;
}

static int cray_resolve_peers(const char *nodename,
                              opal_jobid_t jobid,
                              opal_list_t *procs)
{
    return OPAL_ERR_NOT_IMPLEMENTED;
}

static int cray_resolve_nodes(opal_jobid_t jobid, char **nodelist)
{
    return OPAL_ERR_NOT_IMPLEMENTED;
}

static int cray_put(opal_pmix_scope_t scope,
                  opal_value_t *kv)
{
    int rc;

    opal_output_verbose(10, opal_pmix_base_framework.framework_output,
                        "%s pmix:cray cray_put key %s scope %d\n",
                         OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kv->key, scope);

    if (!pmix_init_count) {
        return OPAL_ERROR;
    }

    /*
     * for now just always just global cache
     */

    if (NULL == mca_pmix_cray_component.cache_global) {
        mca_pmix_cray_component.cache_global = OBJ_NEW(opal_buffer_t);
    }

    opal_output_verbose(20, opal_pmix_base_framework.framework_output,
                        "%s pmix:cray put global data for key %s type %d",
                         OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kv->key, kv->type);
    if (OPAL_SUCCESS != (rc = opal_dss.pack(mca_pmix_cray_component.cache_global, &kv, 1, OPAL_VALUE))) {
        OPAL_PMI_ERROR(rc,"pmix:cray opal_dss.pack returned error");
        OPAL_ERROR_LOG(rc);
    }

    return rc;
}

static int cray_commit(void)
{
    return OPAL_SUCCESS;
}

static void fencenb(int sd, short args, void *cbdata)
{
    pmi_opcaddy_t *op = (pmi_opcaddy_t*)cbdata;
    int rc, cnt;
    int32_t i;
    int *all_lens = NULL;
    opal_value_t *kp, kvn;
    opal_buffer_t *send_buffer = NULL;
    opal_buffer_t *buf = NULL;
    void *sbuf_ptr;
    char *cptr, *rcv_buff = NULL;
    opal_process_name_t id;
    typedef struct {
        uint32_t pmix_rank;
        opal_process_name_t name;
        int32_t nbytes;
    } bytes_and_rank_t;
    int32_t rcv_nbytes_tot;
    bytes_and_rank_t s_bytes_and_rank;
    bytes_and_rank_t *r_bytes_and_ranks = NULL;
    opal_hwloc_locality_t locality;
    opal_list_t vals;
    char *cpuset = NULL;

    opal_output_verbose(2, opal_pmix_base_framework.framework_output,
                        "%s pmix:cray executing fence cache_global %p cache_local %p",
                        OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
                        (void *)mca_pmix_cray_component.cache_global,
                        (void *)mca_pmix_cray_component.cache_local);


    /*
     * "unload" the cache_local/cache_global buffers, first copy
     * it so we can continue to use the local buffers if further
     * calls to put can be made
     */

    send_buffer = OBJ_NEW(opal_buffer_t);
    if (NULL == send_buffer) {
        rc = OPAL_ERR_OUT_OF_RESOURCE;
        goto fn_exit;
    }

    opal_dss.copy_payload(send_buffer, mca_pmix_cray_component.cache_global);
    opal_dss.unload(send_buffer, &sbuf_ptr, &s_bytes_and_rank.nbytes);
    s_bytes_and_rank.pmix_rank = pmix_rank;
    s_bytes_and_rank.name = OPAL_PROC_MY_NAME;

    r_bytes_and_ranks = (bytes_and_rank_t *)malloc(pmix_size * sizeof(bytes_and_rank_t));
    if (NULL == r_bytes_and_ranks) {
        rc = OPAL_ERR_OUT_OF_RESOURCE;
        goto fn_exit;
    }

    /*
     * gather up all the buffer sizes and rank order.
     * doing this step below since the cray pmi PMI_Allgather doesn't deliver
     * the gathered data necessarily in PMI rank order, although the order stays
     * the same for the duration of a job - assuming no node failures.
     */

    if (PMI_SUCCESS != (rc = PMI_Allgather(&s_bytes_and_rank,r_bytes_and_ranks,sizeof(bytes_and_rank_t)))) {
        OPAL_PMI_ERROR(rc,"PMI_Allgather");
        rc = OPAL_ERR_COMM_FAILURE;
        goto fn_exit;
    }


    for (rcv_nbytes_tot=0,i=0; i < pmix_size; i++) {
        rcv_nbytes_tot += r_bytes_and_ranks[i].nbytes;
    }

    opal_output_verbose(20, opal_pmix_base_framework.framework_output,
                        "%s pmix:cray total number of bytes to receive %d",
                        OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), rcv_nbytes_tot);

    rcv_buff = (char *) malloc(rcv_nbytes_tot * sizeof(char));
    if (NULL == rcv_buff) {
        rc = OPAL_ERR_OUT_OF_RESOURCE;
        goto fn_exit;
    }

    all_lens = (int *)malloc(sizeof(int) * pmix_size);
    if (NULL == all_lens) {
        rc = OPAL_ERR_OUT_OF_RESOURCE;
        goto fn_exit;
    }
    for (i=0; i< pmix_size; i++) {
        all_lens[r_bytes_and_ranks[i].pmix_rank] = r_bytes_and_ranks[i].nbytes;
    }

    if (PMI_SUCCESS != (rc = PMI_Allgatherv(sbuf_ptr,s_bytes_and_rank.nbytes,rcv_buff,all_lens))) {
        OPAL_PMI_ERROR(rc,"PMI_Allgatherv");
        rc = OPAL_ERR_COMM_FAILURE;
        goto fn_exit;
    }

    OBJ_RELEASE(send_buffer);
    send_buffer  = NULL;

    buf = OBJ_NEW(opal_buffer_t);
    if (buf == NULL) {
        rc = OPAL_ERR_OUT_OF_RESOURCE;
        goto fn_exit;
    }

    for (cptr = rcv_buff, i=0; i < pmix_size; i++) {

        id = r_bytes_and_ranks[i].name;

        buf->base_ptr = NULL;  /* TODO: ugh */
        if (OPAL_SUCCESS != (rc = opal_dss.load(buf, (void *)cptr, r_bytes_and_ranks[i].nbytes))) {
            OPAL_PMI_ERROR(rc,"pmix:cray opal_dss.load failed");
            goto fn_exit;
        }

        /* unpack and stuff in to the dstore */
        cnt = 1;
        while (OPAL_SUCCESS == (rc = opal_dss.unpack(buf, &kp, &cnt, OPAL_VALUE))) {
            OPAL_OUTPUT_VERBOSE((20, opal_pmix_base_framework.framework_output,
                                 "%s pmix:cray unpacked kp with key %s type(%d) for id  %s",
                                 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kp->key, kp->type, OPAL_NAME_PRINT(id)));

            if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&id, kp))) {
                OPAL_ERROR_LOG(rc);
                goto fn_exit;
            }
            OBJ_RELEASE(kp);
            cnt = 1;
        }

        cptr += r_bytes_and_ranks[i].nbytes;

    }

    buf->base_ptr = NULL;  /* TODO: ugh */
    OBJ_RELEASE(buf);

    opal_output_verbose(2, opal_pmix_base_framework.framework_output,
                        "%s pmix:cray kvs_fence complete",
                        OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));

    /* fetch my cpuset */
    OBJ_CONSTRUCT(&vals, opal_list_t);
    if (OPAL_SUCCESS == (rc = opal_pmix_base_fetch(&pmix_pname,
                                                   OPAL_PMIX_CPUSET, &vals))) {
        kp = (opal_value_t*)opal_list_get_first(&vals);
        cpuset = strdup(kp->data.string);
    } else {
        cpuset = NULL;
    }
    OPAL_LIST_DESTRUCT(&vals);

    /* Get the modex data from each local process and set the
     * localities to avoid having the MPI layer fetch data
     * for every process in the job.
     *
     *  we only need to set locality for each local rank as "not found"
     * equates to "non-local"
     */

    for (i=0; i < pmix_nlranks; i++) {
        id.vpid = pmix_lranks[i];
        id.jobid = pmix_jobid;
        OPAL_OUTPUT_VERBOSE((2, opal_pmix_base_framework.framework_output,
                             "%s checking out if %s is local to me",
                             OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
                             OPAL_NAME_PRINT(id)));
        /* fetch cpuset for this vpid */
        OBJ_CONSTRUCT(&vals, opal_list_t);
        if (OPAL_SUCCESS != (rc = opal_pmix_base_fetch(&id,
                                                    OPAL_PMIX_CPUSET, &vals))) {
            OPAL_OUTPUT_VERBOSE((2, opal_pmix_base_framework.framework_output,
                                 "%s cpuset for local proc %s not found",
                                 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
                                 OPAL_NAME_PRINT(id)));
            OPAL_LIST_DESTRUCT(&vals);
            /* even though the cpuset wasn't found, we at least know it is
             * on the same node with us */
            locality = OPAL_PROC_ON_CLUSTER | OPAL_PROC_ON_CU | OPAL_PROC_ON_NODE;
        } else {
            kp = (opal_value_t*)opal_list_get_first(&vals);
            if (NULL == kp->data.string) {
                /* if we share a node, but we don't know anything more, then
                 * mark us as on the node as this is all we know
                 */
                locality = OPAL_PROC_ON_CLUSTER | OPAL_PROC_ON_CU | OPAL_PROC_ON_NODE;
            } else {
                /* determine relative location on our node */
                locality = opal_hwloc_base_get_relative_locality(opal_hwloc_topology,
                                                                 cpuset,
                                                                 kp->data.string);
            }
            OPAL_LIST_DESTRUCT(&vals);
        }
        OPAL_OUTPUT_VERBOSE((1, opal_pmix_base_framework.framework_output,
                             "%s pmix:cray proc %s locality %s",
                             OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
                             OPAL_NAME_PRINT(id),
                             opal_hwloc_base_print_locality(locality)));

        OBJ_CONSTRUCT(&kvn, opal_value_t);
        kvn.key = strdup(OPAL_PMIX_LOCALITY);
        kvn.type = OPAL_UINT16;
        kvn.data.uint16 = locality;
        opal_pmix_base_store(&id, &kvn);
        OBJ_DESTRUCT(&kvn);
    }

fn_exit:
    if (NULL != cpuset) {
        free(cpuset);
    }
    if (all_lens != NULL) {
        free(all_lens);
    }
    if (rcv_buff != NULL) {
        free(rcv_buff);
    }
    if (r_bytes_and_ranks != NULL) {
        free(r_bytes_and_ranks);
    }
    if (NULL != op->opcbfunc) {
        op->opcbfunc(rc, op->cbdata);
    }
    OBJ_RELEASE(op);
    return;
}

static void fence_release(int status, void *cbdata)
{
    struct fence_result *res = (struct fence_result*)cbdata;
    res->status = status;
    opal_atomic_wmb();
    res->flag = 0;
}

static int cray_fence(opal_list_t *procs, int collect_data)
{
    struct fence_result result = { 1, OPAL_SUCCESS };
    cray_fencenb(procs, collect_data, fence_release, (void*)&result);
    CRAY_WAIT_FOR_COMPLETION(result.flag);
    return result.status;
}


static int cray_fencenb(opal_list_t *procs, int collect_data,
                      opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
{
    pmi_opcaddy_t *op;

    /* thread-shift this so we don't block in Cray's barrier */
    op = OBJ_NEW(pmi_opcaddy_t);
    op->opcbfunc = cbfunc;
    op->cbdata = cbdata;
    event_assign(&op->ev, opal_pmix_base.evbase, -1,
                 EV_WRITE, fencenb, op);
    event_active(&op->ev, EV_WRITE, 1);

    return OPAL_SUCCESS;
}

static int cray_get(const opal_process_name_t *id, const char *key, opal_list_t *info, opal_value_t **kv)
{
    int rc;
    opal_list_t vals;

    OPAL_OUTPUT_VERBOSE((2, opal_pmix_base_framework.framework_output,
                         "%s pmix:cray getting value for proc %s key %s",
                         OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
                         OPAL_NAME_PRINT(*id), key));

    OBJ_CONSTRUCT(&vals, opal_list_t);
    rc = opal_pmix_base_fetch(id, key, &vals);
    if (OPAL_SUCCESS == rc) {
        *kv = (opal_value_t*)opal_list_remove_first(&vals);
        return OPAL_SUCCESS;
    } else {
        OPAL_OUTPUT_VERBOSE((2, opal_pmix_base_framework.framework_output,
                             "%s pmix:cray fetch from dstore failed: %d",
                             OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), rc));
    }
    OPAL_LIST_DESTRUCT(&vals);

    return rc;

}

static int cray_get_nb(const opal_process_name_t *id, const char *key,
                       opal_list_t *info, opal_pmix_value_cbfunc_t cbfunc, void *cbdata)
{
    return OPAL_ERR_NOT_IMPLEMENTED;
}

static int cray_publish(opal_list_t *info)
{
    return OPAL_ERR_NOT_SUPPORTED;
}

static int cray_publish_nb(opal_list_t *info,
                           opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
{
    return OPAL_ERR_NOT_SUPPORTED;
}

static int cray_lookup(opal_list_t *data, opal_list_t *info)
{
    return OPAL_ERR_NOT_SUPPORTED;
}

static int cray_lookup_nb(char **keys, opal_list_t *info,
                          opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata)
{
    return OPAL_ERR_NOT_SUPPORTED;
}

static int cray_unpublish(char **keys, opal_list_t *info)
{
    return OPAL_ERR_NOT_SUPPORTED;
}

static int cray_unpublish_nb(char **keys, opal_list_t *info,
                            opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
{
    return OPAL_ERR_NOT_SUPPORTED;
}

static const char *cray_get_version(void)
{
    return cray_pmi_version;
}

static int cray_store_local(const opal_process_name_t *proc,
                          opal_value_t *val)
{
    opal_pmix_base_store(proc, val);

    return OPAL_SUCCESS;
}

static const char *cray_get_nspace(opal_jobid_t jobid)
{
    return "N/A";
}

static void cray_register_jobid(opal_jobid_t jobid, const char *nspace)
{
    return;
}

static char* pmix_error(int pmix_err)
{
    char * err_msg;

    switch(pmix_err) {
        case PMI_FAIL: err_msg = "Operation failed"; break;
        case PMI_ERR_INIT: err_msg = "PMI is not initialized"; break;
        case PMI_ERR_NOMEM: err_msg = "Input buffer not large enough"; break;
        case PMI_ERR_INVALID_ARG: err_msg = "Invalid argument"; break;
        case PMI_ERR_INVALID_KEY: err_msg = "Invalid key argument"; break;
        case PMI_ERR_INVALID_KEY_LENGTH: err_msg = "Invalid key length argument"; break;
        case PMI_ERR_INVALID_VAL: err_msg = "Invalid value argument"; break;
        case PMI_ERR_INVALID_VAL_LENGTH: err_msg = "Invalid value length argument"; break;
        case PMI_ERR_INVALID_LENGTH: err_msg = "Invalid length argument"; break;
        case PMI_ERR_INVALID_NUM_ARGS: err_msg = "Invalid number of arguments"; break;
        case PMI_ERR_INVALID_ARGS: err_msg = "Invalid args argument"; break;
        case PMI_ERR_INVALID_NUM_PARSED: err_msg = "Invalid num_parsed length argument"; break;
        case PMI_ERR_INVALID_KEYVALP: err_msg = "Invalid keyvalp argument"; break;
        case PMI_ERR_INVALID_SIZE: err_msg = "Invalid size argument"; break;
#if defined(PMI_ERR_INVALID_KVS)
        /* pmi.h calls this a valid return code but mpich doesn't define it (slurm does). */
        case PMI_ERR_INVALID_KVS: err_msg = "Invalid kvs argument"; break;
#endif
        case PMI_SUCCESS: err_msg = "Success"; break;
        default: err_msg = "Unkown error";
    }
    return err_msg;
}
