/*
 * Copyright (c) 2013      Mellanox Technologies, Inc.
 *                         All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "oshmem_config.h"
#include <stdio.h>
#include <stdlib.h>

#include "opal/util/bit_ops.h"

#include "oshmem/constants.h"
#include "oshmem/op/op.h"
#include "oshmem/mca/spml/spml.h"
#include "oshmem/mca/scoll/scoll.h"
#include "oshmem/mca/scoll/base/base.h"
#include "scoll_basic.h"

static int _algorithm_central_counter(struct oshmem_group_t *group,
                                       struct oshmem_op_t *op,
                                       void *target,
                                       const void *source,
                                       size_t nlong,
                                       long *pSync,
                                       void *pWrk);
static int _algorithm_tournament(struct oshmem_group_t *group,
                                  struct oshmem_op_t *op,
                                  void *target,
                                  const void *source,
                                  size_t nlong,
                                  long *pSync,
                                  void *pWrk);
static int _algorithm_recursive_doubling(struct oshmem_group_t *group,
                                          struct oshmem_op_t *op,
                                          void *target,
                                          const void *source,
                                          size_t nlong,
                                          long *pSync,
                                          void *pWrk);
static int _algorithm_linear(struct oshmem_group_t *group,
                              struct oshmem_op_t *op,
                              void *target,
                              const void *source,
                              size_t nlong,
                              long *pSync,
                              void *pWrk);
static int _algorithm_log(struct oshmem_group_t *group,
                           struct oshmem_op_t *op,
                           void *target,
                           const void *source,
                           size_t nlong,
                           long *pSync,
                           void *pWrk);

int mca_scoll_basic_reduce(struct oshmem_group_t *group,
                           struct oshmem_op_t *op,
                           void *target,
                           const void *source,
                           size_t nlong,
                           long *pSync,
                           void *pWrk,
                           int alg)
{
    int rc = OSHMEM_SUCCESS;

    /* Arguments validation */
    if (!group) {
        SCOLL_ERROR("Active set (group) of PE is not defined");
        rc = OSHMEM_ERR_BAD_PARAM;
    }

    /* Check if this PE is part of the group */
    if ((rc == OSHMEM_SUCCESS) && oshmem_proc_group_is_member(group)) {
        int i = 0;

        /* Do nothing on zero-length request */
        if (OPAL_UNLIKELY(!nlong)) {
            return OSHMEM_SUCCESS;
        }

        if (pSync) {
            alg = (alg == SCOLL_DEFAULT_ALG ?
                    mca_scoll_basic_param_reduce_algorithm : alg);
            switch (alg) {
            case SCOLL_ALG_REDUCE_CENTRAL_COUNTER:
                {
                    rc = _algorithm_central_counter(group,
                                                     op,
                                                     target,
                                                     source,
                                                     nlong,
                                                     pSync,
                                                     pWrk);
                    break;
                }
            case SCOLL_ALG_REDUCE_TOURNAMENT:
                {
                    rc = _algorithm_tournament(group,
                                                op,
                                                target,
                                                source,
                                                nlong,
                                                pSync,
                                                pWrk);
                    break;
                }
            case SCOLL_ALG_REDUCE_RECURSIVE_DOUBLING:
                {
                    rc = _algorithm_recursive_doubling(group,
                                                        op,
                                                        target,
                                                        source,
                                                        nlong,
                                                        pSync,
                                                        pWrk);
                    break;
                }
            case SCOLL_ALG_REDUCE_LEGACY_LINEAR:
                {
                    rc = _algorithm_linear(group,
                                            op,
                                            target,
                                            source,
                                            nlong,
                                            pSync,
                                            pWrk);
                    break;
                }
            case SCOLL_ALG_REDUCE_LEGACY_LOG:
                {
                    rc = _algorithm_log(group,
                                         op,
                                         target,
                                         source,
                                         nlong,
                                         pSync,
                                         pWrk);
                    break;
                }
            default:
                {
                    rc = _algorithm_central_counter(group,
                                                     op,
                                                     target,
                                                     source,
                                                     nlong,
                                                     pSync,
                                                     pWrk);
                }
            }
        } else {
            SCOLL_ERROR("Incorrect argument pSync");
            rc = OSHMEM_ERR_BAD_PARAM;
        }

        /* Restore initial values */
        SCOLL_VERBOSE(12,
                      "PE#%d Restore special synchronization array",
                      group->my_pe);
        for (i = 0; pSync && (i < _SHMEM_REDUCE_SYNC_SIZE); i++) {
            pSync[i] = _SHMEM_SYNC_VALUE;
        }
    }

    return rc;
}

/*
 This algorithm is quite simple and straightforward for PEs with identical data size.
 One node gathers data from peers and send final result to them.
 Outlay:
 NP-1 competing network transfers are needed.
 */
static int _algorithm_central_counter(struct oshmem_group_t *group,
                                       struct oshmem_op_t *op,
                                       void *target,
                                       const void *source,
                                       size_t nlong,
                                       long *pSync,
                                       void *pWrk)
{
    int rc = OSHMEM_SUCCESS;
    int i = 0;
    int PE_root = oshmem_proc_pe(group->proc_array[0]);

    SCOLL_VERBOSE(12, "[#%d] Reduce algorithm: Central Counter", group->my_pe);

    if (PE_root == group->my_pe) {
        int pe_cur = 0;
        void *target_cur = NULL;

        target_cur = malloc(nlong);
        if (target_cur) {
            memcpy(target, (void *) source, nlong);

            SCOLL_VERBOSE(14,
                          "[#%d] Gather data from all PEs in the group",
                          group->my_pe);
            for (i = 0; (i < group->proc_count) && (rc == OSHMEM_SUCCESS);
                    i++) {
                /* Get PE ID of a peer from the group */
                pe_cur = oshmem_proc_pe(group->proc_array[i]);

                if (pe_cur == group->my_pe)
                    continue;

                SCOLL_VERBOSE(14,
                              "[#%d] Gather data (%d bytes) from #%d",
                              group->my_pe, (int)nlong, pe_cur);

                /* Clean up temporary buffer */
                memset(target_cur, 0, nlong);

                /* Get data from the current peer */
                rc = MCA_SPML_CALL(get(oshmem_ctx_default, (void *)source, nlong, target_cur, pe_cur));

                /* Do reduction operation */
                if (rc == OSHMEM_SUCCESS) {
                    op->o_func.c_fn(target_cur, target, nlong / op->dt_size);
                }
            }

            free(target_cur);
        } else {
            rc = OSHMEM_ERR_OUT_OF_RESOURCE;
        }
    }

    /* Send result to all PE in group */
    if (rc == OSHMEM_SUCCESS) {
        SCOLL_VERBOSE(14,
                      "[#%d] Broadcast from the root #%d",
                      group->my_pe, PE_root);
        rc = BCAST_FUNC(group,
                PE_root,
                target,
                target,
                nlong,
                (pSync + 1),
                true,
                SCOLL_DEFAULT_ALG);
    }

    return rc;
}

static int _algorithm_tournament(struct oshmem_group_t *group,
                                  struct oshmem_op_t *op,
                                  void *target,
                                  const void *source,
                                  size_t nlong,
                                  long *pSync,
                                  void *pWrk)
{
    int rc = OSHMEM_SUCCESS;
    int round = 0;
    int exit_flag = group->proc_count - 1;
    long value = SHMEM_SYNC_INIT;
    int my_id = oshmem_proc_group_find_id(group, group->my_pe);
    int peer_id = 0;
    int peer_pe = 0;
    void *target_cur = NULL;
    int PE_root = oshmem_proc_pe(group->proc_array[0]);

    SCOLL_VERBOSE(12, "[#%d] Reduce algorithm: Tournament", group->my_pe);
    SCOLL_VERBOSE(15, "[#%d] pSync[0] = %ld", group->my_pe, pSync[0]);

    /* Set current state as WAIT */
    pSync[0] = SHMEM_SYNC_WAIT;

    target_cur = malloc(nlong);
    if (target_cur) {
        memcpy(target_cur, (void *) source, nlong);
    } else {
        return OSHMEM_ERR_OUT_OF_RESOURCE;
    }

    while (exit_flag && (rc == OSHMEM_SUCCESS)) {
        /* Define a peer for competition */
        peer_id = my_id ^ (1 << round);

        /* Update exit condition and round counter */
        exit_flag >>= 1;
        round++;

        /* Do not have peer for tournament */
        if (peer_id >= group->proc_count)
            continue;

        if (my_id < peer_id) {
            pSync[0] = peer_id;
            value = my_id;

            SCOLL_VERBOSE(14, "[#%d] round = %d wait", group->my_pe, round);
            rc = MCA_SPML_CALL(wait((void*)pSync, SHMEM_CMP_EQ, (void*)&value, SHMEM_LONG));

            /* Do reduction operation */
            if (rc == OSHMEM_SUCCESS) {
                op->o_func.c_fn(target, target_cur, nlong / op->dt_size);
            }
        } else {
            peer_pe = oshmem_proc_pe(group->proc_array[peer_id]);

#if 1 /* It is ugly implementation of compare and swap operation
         Usage of this hack does not give performance improvement but
         it is expected that shmem_long_cswap() will make it faster.
       */
            do {
                MCA_SPML_CALL(get(oshmem_ctx_default, (void*)pSync, sizeof(value), (void*)&value, peer_pe));
            } while (value != my_id);

            SCOLL_VERBOSE(14,
                          "[#%d] round = %d send data to #%d",
                          group->my_pe, round, peer_pe);
            rc = MCA_SPML_CALL(put(oshmem_ctx_default, target, nlong, target_cur, peer_pe));

            MCA_SPML_CALL(fence(oshmem_ctx_default));

            SCOLL_VERBOSE(14,
                          "[#%d] round = %d signals to #%d",
                          group->my_pe, round, peer_pe);
            value = peer_id;
            rc = MCA_SPML_CALL(put(oshmem_ctx_default, (void*)pSync, sizeof(value), (void*)&value, peer_pe));
#endif
            SCOLL_VERBOSE(14, "[#%d] round = %d wait", group->my_pe, round);
            value = SHMEM_SYNC_RUN;
            rc = MCA_SPML_CALL(wait((void*)pSync, SHMEM_CMP_EQ, (void*)&value, SHMEM_LONG));

            break;
        }
    }

    /* Send result to all PE in group */
    if ((my_id == 0) && (rc == OSHMEM_SUCCESS)) {
        SCOLL_VERBOSE(14, "[#%d] signals to all", group->my_pe);

        memcpy(target, target_cur, nlong);

        value = SHMEM_SYNC_RUN;
        for (peer_id = 1;
                (peer_id < group->proc_count) && (rc == OSHMEM_SUCCESS);
                peer_id++) {
            peer_pe = oshmem_proc_pe(group->proc_array[peer_id]);
            rc = MCA_SPML_CALL(put(oshmem_ctx_default, (void*)pSync, sizeof(value), (void*)&value, peer_pe));
        }
    }

    /* Send result to all PE in group */
    if (rc == OSHMEM_SUCCESS) {
        SCOLL_VERBOSE(14,
                      "[#%d] Broadcast from the root #%d",
                      group->my_pe, PE_root);
        rc = BCAST_FUNC(group,
                PE_root,
                target,
                target,
                nlong,
                (pSync + 1),
                true,
                SCOLL_DEFAULT_ALG);
    }

    free(target_cur);

    SCOLL_VERBOSE(15, "[#%d] pSync[0] = %ld", group->my_pe, pSync[0]);

    return rc;
}

static int _algorithm_recursive_doubling(struct oshmem_group_t *group,
                                          struct oshmem_op_t *op,
                                          void *target,
                                          const void *source,
                                          size_t nlong,
                                          long *pSync,
                                          void *pWrk)
{
    int rc = OSHMEM_SUCCESS;
    int round = 0;
    int floor2_proc = 0;
    int exit_flag = 0;
    long value = SHMEM_SYNC_INIT;
    void *target_cur = NULL;
    int my_id = oshmem_proc_group_find_id(group, group->my_pe);
    int peer_id = 0;
    int peer_pe = 0;
    int i = 0;

    floor2_proc = 1;
    i = group->proc_count;
    i >>= 1;
    while (i) {
        i >>= 1;
        floor2_proc <<= 1;
    }

    target_cur = malloc(nlong);
    if (target_cur) {
        memcpy(target_cur, (void *) source, nlong);
    } else {
        return OSHMEM_ERR_OUT_OF_RESOURCE;
    }

    SCOLL_VERBOSE(12,
                  "[#%d] Reduce algorithm: Recursive Doubling",
                  group->my_pe);
    SCOLL_VERBOSE(15,
                  "[#%d] pSync[0] = %ld floor2_proc = %d",
                  group->my_pe, pSync[0], floor2_proc);

    if (my_id >= floor2_proc) {
        /* I am in extra group, my partner is node (my_id-y) in basic group */
        peer_id = my_id - floor2_proc;
        peer_pe = oshmem_proc_pe(group->proc_array[peer_id]);

        /* Special procedure is needed in case target and source are the same */
        if (source == target) {
            SCOLL_VERBOSE(14,
                          "[#%d] wait for peer #%d is ready",
                          group->my_pe, peer_pe);
            value = SHMEM_SYNC_WAIT;
            rc = MCA_SPML_CALL(wait((void*)pSync, SHMEM_CMP_EQ, (void*)&value, SHMEM_LONG));
        }

        SCOLL_VERBOSE(14,
                      "[#%d] is extra send data to #%d",
                      group->my_pe, peer_pe);
        rc = MCA_SPML_CALL(put(oshmem_ctx_default, target, nlong, target_cur, peer_pe));

        MCA_SPML_CALL(fence(oshmem_ctx_default));

        SCOLL_VERBOSE(14,
                      "[#%d] is extra and signal to #%d",
                      group->my_pe, peer_pe);
        value = SHMEM_SYNC_RUN;
        rc = MCA_SPML_CALL(put(oshmem_ctx_default, (void*)pSync, sizeof(value), (void*)&value, peer_pe));

        SCOLL_VERBOSE(14, "[#%d] wait", group->my_pe);
        value = SHMEM_SYNC_RUN;
        rc = MCA_SPML_CALL(wait((void*)pSync, SHMEM_CMP_EQ, (void*)&value, SHMEM_LONG));
    } else {
        /* Wait for a peer from extra group */
        if ((group->proc_count - floor2_proc) > my_id) {
            /* I am in basic group, my partner is node (my_id+y) in extra group */
            peer_id = my_id + floor2_proc;
            peer_pe = oshmem_proc_pe(group->proc_array[peer_id]);

            /* Special procedure is needed in case target and source are the same */
            if (source == target) {
                SCOLL_VERBOSE(14,
                              "[#%d] signal to #%d that I am ready",
                              group->my_pe, peer_pe);
                value = SHMEM_SYNC_WAIT;
                rc = MCA_SPML_CALL(put(oshmem_ctx_default, (void*)pSync, sizeof(value), (void*)&value, peer_pe));
            }

            SCOLL_VERBOSE(14,
                          "[#%d] wait a signal from #%d",
                          group->my_pe, peer_pe);
            value = SHMEM_SYNC_RUN;
            rc = MCA_SPML_CALL(wait((void*)pSync, SHMEM_CMP_EQ, (void*)&value, SHMEM_LONG));

            /* Do reduction operation */
            if (rc == OSHMEM_SUCCESS) {
                op->o_func.c_fn(target, target_cur, nlong / op->dt_size);
            }
        }

        /* Pairwise exchange  */
        exit_flag = floor2_proc - 1;
        pSync[0] = round;
        while (exit_flag && (rc == OSHMEM_SUCCESS)) {
            /* Define a peer for competition */
            peer_id = my_id ^ (1 << round);

            /* Update exit condition and round counter */
            exit_flag >>= 1;
            round++;

            peer_pe = oshmem_proc_pe(group->proc_array[peer_id]);

#if 1 /* It is ugly implementation of compare and swap operation
         Usage of this hack does not give performance improvement but
         it is expected that shmem_long_cswap() will make it faster.
       */
            do {
                MCA_SPML_CALL(get(oshmem_ctx_default, (void*)pSync, sizeof(value), (void*)&value, peer_pe));
            } while (value != (round - 1));

            SCOLL_VERBOSE(14,
                          "[#%d] round = %d send data to #%d",
                          group->my_pe, round, peer_pe);
            rc = MCA_SPML_CALL(put(oshmem_ctx_default, target, nlong, target_cur, peer_pe));

            MCA_SPML_CALL(fence(oshmem_ctx_default));

            SCOLL_VERBOSE(14,
                          "[#%d] round = %d signals to #%d",
                          group->my_pe, round, peer_pe);
            value = SHMEM_SYNC_RUN;
            rc = MCA_SPML_CALL(put(oshmem_ctx_default, (void*)pSync, sizeof(value), (void*)&value, peer_pe));
#endif

            SCOLL_VERBOSE(14, "[#%d] round = %d wait", group->my_pe, round);
            value = SHMEM_SYNC_RUN;
            rc = MCA_SPML_CALL(wait((void*)pSync, SHMEM_CMP_EQ, (void*)&value, SHMEM_LONG));

            /* Do reduction operation */
            if (rc == OSHMEM_SUCCESS) {
                op->o_func.c_fn(target, target_cur, nlong / op->dt_size);
            }

            pSync[0] = round;
        }

        memcpy(target, target_cur, nlong);

        /* Notify a peer from extra group */
        if ((group->proc_count - floor2_proc) > my_id) {
            /* I am in basic group, my partner is node (my_id+y) in extra group */
            peer_id = my_id + floor2_proc;
            peer_pe = oshmem_proc_pe(group->proc_array[peer_id]);

            SCOLL_VERBOSE(14,
                          "[#%d] is extra send data to #%d",
                          group->my_pe, peer_pe);
            rc = MCA_SPML_CALL(put(oshmem_ctx_default, target, nlong, target_cur, peer_pe));

            MCA_SPML_CALL(fence(oshmem_ctx_default));

            SCOLL_VERBOSE(14, "[#%d] signals to #%d", group->my_pe, peer_pe);
            value = SHMEM_SYNC_RUN;
            rc = MCA_SPML_CALL(put(oshmem_ctx_default, (void*)pSync, sizeof(value), (void*)&value, peer_pe));
        }
    }

    free(target_cur);

    SCOLL_VERBOSE(15, "[#%d] pSync[0] = %ld", group->my_pe, pSync[0]);

    return rc;
}

static int _algorithm_linear(struct oshmem_group_t *group,
                              struct oshmem_op_t *op,
                              void *target,
                              const void *source,
                              size_t nlong,
                              long *pSync,
                              void *pWrk)
{
    int rc = OSHMEM_SUCCESS;
    int i, rank, size;
    char *free_buffer = NULL;
    char *pml_buffer = NULL;
    char *inbuf;
    int peer_id = 0;
    int peer_pe = 0;

    /* Initialize */
    rank = group->my_pe;
    size = group->proc_count;
    int root_id = size - 1;
    int root_pe = oshmem_proc_pe(group->proc_array[root_id]);

    SCOLL_VERBOSE(12, "[#%d] Reduce algorithm: Basic", group->my_pe);

    /* If not root, send data to the root. */

    if (rank != root_pe) {
        rc = MCA_SPML_CALL(send((void*)source, nlong, root_pe, MCA_SPML_BASE_PUT_STANDARD));
    } else {

        /* for reducing buffer allocation lengths.... */

        if (size > 1) {
            free_buffer = (char*) malloc(nlong);
            if (NULL == free_buffer) {
                return OSHMEM_ERR_OUT_OF_RESOURCE;
            }
            pml_buffer = free_buffer;
        }

        /* Initialize the receive buffer. */

        if (root_id == (size - 1)) {
            memcpy(target, (void *) source, nlong);
        } else {
            peer_id = size - 1;
            peer_pe = oshmem_proc_pe(group->proc_array[peer_id]);
            rc = MCA_SPML_CALL(recv(target, nlong, peer_pe));
        }
        if (OSHMEM_SUCCESS != rc) {
            if (NULL != free_buffer) {
                free(free_buffer);
            }
            return rc;
        }

        /* Loop receiving and calling reduction function (C or Fortran). */

        for (i = size - 2; i >= 0; --i) {
            if (root_id == i) {
                inbuf = (char*) source;
            } else {
                peer_id = i;
                peer_pe = oshmem_proc_pe(group->proc_array[peer_id]);
                rc = MCA_SPML_CALL(recv(pml_buffer, nlong, peer_pe));
                if (OSHMEM_SUCCESS != rc) {
                    if (NULL != free_buffer) {
                        free(free_buffer);
                    }
                    return rc;
                }

                inbuf = pml_buffer;
            }

            /* Perform the reduction */
            op->o_func.c_fn(inbuf, target, nlong / op->dt_size);
        }

        if (NULL != free_buffer) {
            free(free_buffer);
        }
    }

    /* Send result to all PE in group */
    if (rc == OSHMEM_SUCCESS) {
        SCOLL_VERBOSE(14,
                      "[#%d] Broadcast from the root #%d",
                      group->my_pe, root_pe);
        rc = BCAST_FUNC(group,
                root_pe,
                target,
                target,
                nlong,
                (pSync + 1),
                true,
                SCOLL_DEFAULT_ALG);
    }

    /* All done */
    return rc;
}

static int _algorithm_log(struct oshmem_group_t *group,
                           struct oshmem_op_t *op,
                           void *target,
                           const void *source,
                           size_t nlong,
                           long *pSync,
                           void *pWrk)
{
    int rc = OSHMEM_SUCCESS;
    int i, size, rank, vrank;
    int mask;
    void *sbuf = (void*) source;
    void *rbuf = target;
    char *free_buffer = NULL;
    char *free_rbuf = NULL;
    char *pml_buffer = NULL;
    char *snd_buffer = NULL;
    char *rcv_buffer = (char*) rbuf;
    int my_id = oshmem_proc_group_find_id(group, group->my_pe);
    int peer_id = 0;
    int peer_pe = 0;
    int root_id = 0;
    int root_pe = oshmem_proc_pe(group->proc_array[root_id]);
    int dim = 0;

    /* Initialize */
    rank = group->my_pe;
    size = group->proc_count;
    dim = opal_cube_dim(group->proc_count);
    vrank = (my_id + size - root_id) % size;

    SCOLL_VERBOSE(12, "[#%d] Reduce algorithm: Log", rank);

    /* Allocate the incoming and resulting message buffers.  See lengthy
     * rationale above. */

    free_buffer = (char*) malloc(nlong);
    if (NULL == free_buffer) {
        return OSHMEM_ERR_OUT_OF_RESOURCE;
    }

    pml_buffer = free_buffer;
    rcv_buffer = pml_buffer;

    /* Allocate sendbuf in case the MPI_IN_PLACE option has been used. See lengthy
     * rationale above. */

    snd_buffer = (char*) sbuf;

    if (my_id != root_id && 0 == (vrank & 1)) {
        /* root is the only one required to provide a valid rbuf.
         * Assume rbuf is invalid for all other ranks, so fix it up
         * here to be valid on all non-leaf ranks */
        free_rbuf = (char*) malloc(nlong);
        if (NULL == free_rbuf) {
            rc = OSHMEM_ERR_OUT_OF_RESOURCE;
            goto cleanup_and_return;
        }
        rbuf = free_rbuf;
    }

    /* Loop over cube dimensions. High processes send to low ones in the
     * dimension. */

    for (i = 0, mask = 1; i < dim; ++i, mask <<= 1) {

        /* A high-proc sends to low-proc and stops. */
        if (vrank & mask) {
            peer_id = vrank & ~mask;
            peer_id = (peer_id + root_id) % size;
            peer_pe = oshmem_proc_pe(group->proc_array[peer_id]);

            rc = MCA_SPML_CALL(send((void*)snd_buffer, nlong, peer_pe, MCA_SPML_BASE_PUT_STANDARD));
            if (OSHMEM_SUCCESS != rc) {
                goto cleanup_and_return;
            }
            snd_buffer = (char*) rbuf;
            break;
        }

        /* A low-proc receives, reduces, and moves to a higher
         * dimension. */

        else {
            peer_id = vrank | mask;
            if (peer_id >= size) {
                continue;
            }
            peer_id = (peer_id + root_id) % size;
            peer_pe = oshmem_proc_pe(group->proc_array[peer_id]);

            /* Most of the time (all except the first one for commutative
             * operations) we receive in the user provided buffer
             * (rbuf). But the exception is here to allow us to dont have
             * to copy from the sbuf to a temporary location. If the
             * operation is commutative we dont care in which order we
             * apply the operation, so for the first time we can receive
             * the data in the pml_buffer and then apply to operation
             * between this buffer and the user provided data. */

            rc = MCA_SPML_CALL(recv(rcv_buffer, nlong, peer_pe));
            if (OSHMEM_SUCCESS != rc) {
                goto cleanup_and_return;
            }
            /* Perform the operation. The target is always the user
             * provided buffer We do the operation only if we receive it
             * not in the user buffer */
            if (snd_buffer != sbuf) {
                /* the target buffer is the locally allocated one */
                op->o_func.c_fn(rcv_buffer, pml_buffer, nlong / op->dt_size);
            } else {
                /* If we're commutative, we don't care about the order of
                 * operations and we can just reduce the operations now.
                 * If we are not commutative, we have to copy the send
                 * buffer into a temp buffer (pml_buffer) and then reduce
                 * what we just received against it. */
                {
                    op->o_func.c_fn(sbuf, pml_buffer, nlong / op->dt_size);
                }
                /* now we have to send the buffer containing the computed data */
                snd_buffer = pml_buffer;
                /* starting from now we always receive in the user
                 * provided buffer */
                rcv_buffer = (char*) rbuf;
            }
        }
    }

    /* Get the result to the root if needed. */
    rc = OSHMEM_SUCCESS;
    if (0 == vrank) {
        if (root_id == my_id) {
            memcpy(rbuf, snd_buffer, nlong);
        } else {
            rc = MCA_SPML_CALL(send((void*)snd_buffer, nlong, root_pe, MCA_SPML_BASE_PUT_STANDARD));
        }
    } else if (my_id == root_id) {
        rc = MCA_SPML_CALL(recv(rcv_buffer, nlong, root_pe));
        if (rcv_buffer != rbuf) {
            op->o_func.c_fn(rcv_buffer, rbuf, nlong / op->dt_size);
        }
    }

    cleanup_and_return: if (NULL != free_buffer) {
        free(free_buffer);
    }
    if (NULL != free_rbuf) {
        free(free_rbuf);
    }

    /* Send result to all PE in group */
    if (rc == OSHMEM_SUCCESS) {
        SCOLL_VERBOSE(14,
                      "[#%d] Broadcast from the root #%d",
                      rank, root_pe);
        rc = BCAST_FUNC(group,
                root_pe,
                target,
                target,
                nlong,
                (pSync + 1),
                true,
                SCOLL_DEFAULT_ALG);
    }

    /* All done */
    return rc;
}
