#include /* the blocking implementation of recv_accumulate is pretty straightforward */ int MPIX_Recv_accumulate(void * buf, int count, MPI_Datatype datatype, MPI_Op op, int source, int tag, MPI_Comm comm, MPI_Status * status) { if (op!=MPI_REPLACE) { void * tmp; int rcount, typesize; MPI_Type_size(datatype, &typesize); MPI_Alloc_mem(count*typesize, MPI_INFO_NULL, &tmp); MPI_Recv(tmp, count, datatype, source, tag, comm, status); MPI_Get_count(status, datatype, &rcount); MPI_Reduce_local(tmp, buf, rcountirequest), &flag, &istatus); if (flag) { /* if the irecv is done, we can do the accumulate */ int rcount; MPI_Get_count(&istatus, info->datatype, &rcount); MPI_Reduce_local(info->tmp, info->buf, rcount<(info->count) ? rcount : (info->count), info->datatype, info->op); MPI_Free_mem(info->tmp); /* set the status object appropriately */ MPI_Status_set_elements(status, info->datatype, rcount); MPI_Status_set_cancelled(status, 0); status->MPI_SOURCE = info->source; status->MPI_TAG = info->tag; status->MPI_ERROR = MPI_SUCCESS; /* assume MPI_ERRORS_ARE_FATAL is used (otherwise everything here needs error-checking */ /* complete the generalized request */ MPI_Grequest_complete( *(info->request) ); } return MPI_SUCCESS; } int free_fn(void * extra_state) { MPI_Free_mem(extra_state); return MPI_SUCCESS; } /* I am really unsure about the implementation of cancel on grequests */ int cancel_fn(void * extra_state, int complete) { if (!complete) { irecv_accumulate_info_t * info = (irecv_accumulate_info_t *) extra_state; int flag; MPI_Status istatus; MPI_Test( &(info->irequest), &flag, &istatus); if (!flag) { /* if the irecv did not complete already, then cancel it */ MPI_Cancel( &(info->irequest) ); } /* free the buffer without doing anything to the data */ MPI_Free_mem(info->tmp); } /* can I return success if the request is complete when it is cancelled? */ return MPI_SUCCESS; } int MPIX_Irecv_accumulate(void * buf, int count, MPI_Datatype datatype, MPI_Op op, int source, int tag, MPI_Comm comm, MPI_Request * request) { if (op!=MPI_REPLACE) { /* allocate the extra state for the grequest */ irecv_accumulate_info_t * info; MPI_Alloc_mem( sizeof(irecv_accumulate_info_t), MPI_INFO_NULL, &info ); /* initiate the irecv */ int typesize; MPI_Type_size(datatype, &typesize); MPI_Alloc_mem(count*typesize, MPI_INFO_NULL, &(info->tmp) ); MPI_Irecv(info->tmp, count, datatype, source, tag, comm, &(info->irequest) ); /* copy all the metadata for the irecv and subsequent accumulate into the info struct */ info->buf = buf; info->count = count; info->datatype = datatype; info->op = op; info->source = source; info->tag = tag; info->request = request; /* start the grequest */ MPI_Grequest_start(&query_fn, &free_fn, &cancel_fn, info, request); } else { /* MPI_REPLACE is equivalent to a regular irecv... */ MPI_Irecv(buf, count, datatype, source, tag, comm, request); } return MPI_SUCCESS; }