[Mpi3-ft] MPI FT Scenario #2: Master-Leader-Workers

Erez Haba erezh at MICROSOFT.com
Tue Apr 21 19:59:24 CDT 2009


Hi all, here's the draft code for scenario 2, please take a look.
(I still need to beef up the text, but the entire code and concepts are there)


Scenario 2: Master-Leader-Workers

New MPI APIs in this scenario

MPI_Is_restored_rank(int* restored);
Indicates whether this rank has been restored or this is the first start of the process. It could be defined as a generation counter instead of a flag; for this scenario only the flag version was needed.

MPI_Comm_rejoin(const char* name, MPI_Comm* newcomm);
This interface recreates the communicator context from a named saved context. It does not restore error handlers or any of the attributes (possibly including the comm name). It does restore all the context the MPI library needs to continue working with this communicator, such as the context id, the comm rank to world rank mapping, and other implementation-dependent information.

MPI_Comm_save(MPI_Comm comm, const char* name);
This interface saves the communicator context under a specific name. It is a collective operation: all ranks should call it with the same name and communicator. Making it collective is a logical and practical choice, although from a theoretical point of view it does not have to be. The reasons are: (1) all ranks have to save the communicator context to be able to restore it if one fails, so saving by only some ranks in the comm has no meaning; (2) much of the data is shared between the ranks (like the rank mapping), so saving a single object scales much better than saving an object per rank; (3) separate objects would possibly require a different name for each rank, which seems like a complication of the programming model.
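
To make the save/rejoin semantics a bit more concrete, here is a toy, in-memory model in plain C. It is only a sketch of the idea described above (one shared object per name, holding a context id and the comm rank to world rank mapping). The `saved_comm` struct, `toy_comm_save` and `toy_comm_rejoin` are hypothetical names made up for this illustration; they are not part of MPI or of the proposal, and a real implementation would need persistent storage that survives a rank failure.

```c
#include <assert.h>
#include <string.h>

#define MAX_SAVED 8
#define MAX_RANKS 64

/* Hypothetical model of what a saved communicator context might hold. */
typedef struct {
    char name[32];
    int  context_id;             /* implementation-specific context id */
    int  size;                   /* number of ranks in the communicator */
    int  world_rank[MAX_RANKS];  /* comm rank -> world rank mapping */
} saved_comm;

static saved_comm g_saved[MAX_SAVED];
static int g_nsaved = 0;

/* Toy stand-in for MPI_Comm_save: store one shared object under `name`.
   Returns 0 on success, -1 if the table is full or the input does not fit. */
int toy_comm_save(const char *name, int context_id,
                  const int *world_rank, int size)
{
    assert(name != NULL && world_rank != NULL);
    if (g_nsaved == MAX_SAVED || size < 0 || size > MAX_RANKS ||
        strlen(name) >= sizeof g_saved[0].name)
        return -1;

    saved_comm *s = &g_saved[g_nsaved++];
    strcpy(s->name, name);
    s->context_id = context_id;
    s->size = size;
    memcpy(s->world_rank, world_rank, (size_t)size * sizeof *world_rank);
    return 0;
}

/* Toy stand-in for MPI_Comm_rejoin: look the context up by name.
   Returns the saved context, or NULL if nothing was saved under `name`.
   Error handlers and attributes are deliberately not part of the model. */
const saved_comm *toy_comm_rejoin(const char *name)
{
    assert(name != NULL);
    for (int i = 0; i < g_nsaved; i++)
        if (strcmp(g_saved[i].name, name) == 0)
            return &g_saved[i];
    return NULL;
}
```

A restored rank would call the rejoin side with the same name that was used at save time; looking up a name that was never saved returns NULL, which is one place where the failure semantics still need to be defined.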


Description

Helper code
This code is used by the master, leader and worker to establish the group communicator. The master has to participate as it is part of the initial split of comm world.

const int x_group_size = 10;

MPI_Comm get_workers_group_comm()
{
    MPI_Comm comm;
    int restored = 0;
    MPI_Is_restored_rank(&restored);

    if(restored)
    {
        //
        // Get an already saved communicator context
        // N.B. need to define the failure semantic.
        //
        MPI_Comm_rejoin("workers-group-comm", &comm);
    }
    else
    {
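        int rank;
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);

        //
        // N.B. for world rank 0 (the master) these expressions give color 0 and
        // key -1 under C99 truncating division, which would sort the master ahead
        // of the leader in group 0; the master probably wants MPI_UNDEFINED as
        // its color instead.
        //
        int color = (rank - 1) / x_group_size;
        int key =   (rank - 1) % x_group_size;
        MPI_Comm_split(MPI_COMM_WORLD, color, key, &comm);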


        //
        // Save the communicator context to be able to restore it later.
        // note that communicator attributes and error handler are not saved.
        //
        MPI_Comm_save(comm, "workers-group-comm");

        //
        // N.B. The barrier here guarantees that the comm was saved for all ranks and
        // that recovery is possible.
        //
        MPI_Barrier(MPI_COMM_WORLD);
    }

    return comm;
}
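
As a quick check of the color/key arithmetic in the helper, here is a small plain-C sketch with no MPI calls. `split_args` and `split_args_for` are hypothetical names used only for this illustration, and `GROUP_SIZE` mirrors `x_group_size`. The sketch covers worker and leader ranks only (world rank 1 and up).

```c
#include <assert.h>

enum { GROUP_SIZE = 10 };   /* mirrors x_group_size in the helper code */

typedef struct {
    int color;  /* which group communicator the rank ends up in */
    int key;    /* ordering of the rank inside that communicator */
} split_args;

/* Color and key that world rank `rank` would pass to MPI_Comm_split.
   Only ranks >= 1 are handled; the master (rank 0) is left out here. */
split_args split_args_for(int rank)
{
    assert(rank >= 1);
    split_args s;
    s.color = (rank - 1) / GROUP_SIZE;
    s.key   = (rank - 1) % GROUP_SIZE;
    return s;
}
```

With a group size of 10, world ranks 1 through 10 land in color 0 with keys 0 through 9, and world rank 11 starts color 1 with key 0, so the first rank of each group (the leader) gets key 0.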


Worker
The worker code is very similar to the scenario 1 worker code, except that it uses the group communicator.


//
// Assumption: fatal errors on comm x would abort only the group of processes that are part of comm x
//
//

void worker()
{
    MPI_Init(0, 0);
    MPI_Comm comm = get_workers_group_comm();
    WorkerQuery query;
    WorkerAnswer answer;

    for(;;)
    {
        MPI_Recv(src=0, &query, comm);
        if(is_done_message(query))
            break;

        process_query(&query, &answer);

        MPI_Send(dst=0, &answer, comm);
    }

    MPI_Finalize();
}



Leader
The leader code is the glue between the master and the workers: it redistributes the work to the workers and reassembles their answers to send back to the master.
It is responsible for restarting the master and the workers in case of a failure.


void leader()
{
    MPI_Init(0, 0);
    MPI_Comm comm = get_workers_group_comm();

    int my_group_size;
    MPI_Comm_size(comm, &my_group_size);

    //
    // Set the error handler on comm world to 'errors return' to be able to handle the
    // errors and restart the master or workers if they fail.  Set the error handler
    // *after* creating the group communicator, to avoid inheriting the error handler.
    //
    MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN);

    //
    // process requests coming from the master, until the *done* message arrives
    //
    for(;;)
    {
        QueryMessage query;
        int rc = MPI_Recv(src=0, &query, MPI_COMM_WORLD);
        if(rc != MPI_SUCCESS)
        {
            //
            // Communication with the master failed, try to restart it
            //
            rc = MPI_Comm_Restart_rank(MPI_COMM_WORLD, 0);
            if(rc == MPI_SUCCESS)
                continue;

            //
            // Could not restart the master, maybe other leaders could; abort this group
            // and expect the master to restart this leader.
            //
            MPI_Abort(comm, 1);
        }


        for(int i = 1; i < my_group_size; i++)
        {
            WorkerQuery worker_query;
            compose_worker_query(query, i, &worker_query);
            for(;;)
            {
                rc = MPI_Send(dst=i, &worker_query, comm);
                if(rc == MPI_SUCCESS)
                    break;

                //
                // Cannot communicate with this worker; try to restart it; if successful
                // send the work item again.
                //
                rc = MPI_Comm_Restart_rank(comm, i);
                if(rc == MPI_SUCCESS)
                    continue;

                //
                // Could not start the worker; abort this group and expect the master to
                // restart it.
                //
                MPI_Abort(comm, 2);
            }
        }

        if(is_done_message(query))
            break;

        AnswerMessage answer;
        for(int i = 1; i < my_group_size; i++)
        {
            WorkerAnswer worker_answer;
            rc = MPI_Recv(src=i, &worker_answer, comm);
            if(rc != MPI_SUCCESS)
            {
                //
                // Cannot receive the answer from this worker, and we can't really retry;
                // abort this group and expect the master to restart it.
                //
                MPI_Abort(comm, 3);
            }
            aggregate_worker_result(worker_answer, i, &answer);
        }

        //
        // Send the result to the master; no need to check for an error here, we will
        // detect it on the next receive.
        //
        MPI_Send(dst=0, &answer, MPI_COMM_WORLD);
    }

    MPI_Finalize();
}
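
The send / restart / resend loop the leader runs for each worker can be looked at in isolation. Below is a minimal sketch of that pattern in plain C with the MPI calls replaced by callbacks, so it can be exercised without an MPI runtime. `send_with_restart`, `fake_worker`, `fake_send` and `fake_restart` are all hypothetical names for this illustration; the -1 return stands for the point where the leader code above gives up and aborts its group.

```c
#include <assert.h>
#include <stddef.h>

typedef int (*send_fn)(int dst, void *state);     /* returns 0 on success */
typedef int (*restart_fn)(int dst, void *state);  /* returns 0 on success */

/* Try to send to `dst`; on failure try to restart it and send again.
   Returns the number of send attempts on success, or -1 when a restart
   fails (where the leader would abort its group). */
int send_with_restart(int dst, send_fn send, restart_fn restart, void *state)
{
    assert(send != NULL && restart != NULL);
    int attempts = 0;
    for (;;) {
        attempts++;
        if (send(dst, state) == 0)
            return attempts;
        if (restart(dst, state) != 0)
            return -1;
    }
}

/* Simulated worker for illustration: sends fail until it is restarted. */
typedef struct {
    int alive;        /* non-zero when sends to the worker succeed */
    int restartable;  /* non-zero when a restart attempt can succeed */
} fake_worker;

int fake_send(int dst, void *state)
{
    (void)dst;
    return ((fake_worker *)state)->alive ? 0 : 1;
}

int fake_restart(int dst, void *state)
{
    (void)dst;
    fake_worker *w = state;
    if (!w->restartable)
        return 1;
    w->alive = 1;
    return 0;
}
```

A healthy worker takes one attempt, a failed but restartable worker takes two, and a worker that cannot be restarted yields -1.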


Master
The master code is very similar to scenario 1. It is the master's responsibility to restart the leaders in case of a failure.

int leaderof(int group)
{
    return (group - 1) * x_group_size + 1;
}
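
For reference, here is a small plain-C sketch of the group numbering the master relies on: groups are 1-based, and each leader is the first world rank of its group. `leader_of` mirrors `leaderof` above; `worker_groups` is a hypothetical helper, and rounding up to count a partial last group is my assumption about the intended behavior.

```c
#include <assert.h>

enum { GROUP_SIZE = 10 };   /* mirrors x_group_size */

/* World rank of the leader of 1-based group `group`. */
int leader_of(int group)
{
    assert(group >= 1);
    return (group - 1) * GROUP_SIZE + 1;
}

/* Number of worker groups for `world_size` ranks, master excluded.
   Rounds up so that a partially filled last group is still counted. */
int worker_groups(int world_size)
{
    assert(world_size >= 1);
    return (world_size - 1 + GROUP_SIZE - 1) / GROUP_SIZE;
}
```

With a group size of 10, a world of 11 ranks (one master plus ten workers) has exactly one group led by world rank 1, and a twelfth rank opens a second group led by world rank 11.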


void master()
{
    MPI_Init(0, 0);

    //
    // The master does not need a workers group comm, but it needs to participate in the
    // creation of that comm.
    //
    get_workers_group_comm();

    MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
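    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    //
    // Number of worker groups, rounded up so a partial last group is counted,
    // plus one slot for the master as group 0
    //
    int ngroups = (world_size - 1 + x_group_size - 1) / x_group_size + 1;
    MPI_Request r[ngroups];
    QueryMessage q[ngroups];
    AnswerMessage a[ngroups];
    bool restarting[ngroups];
    int active_groups = 0;
    int rc;

    //
    // Variable-length arrays cannot take an initializer; set each slot explicitly
    //
    for(int i = 0; i < ngroups; i++)
    {
        r[i] = MPI_REQUEST_NULL;
        restarting[i] = false;
    }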

    //
    // Phase 1: send initial requests
    //
    for(int i = 1; i < ngroups; i++)
    {
        if(get_next_query(stream, &q[i]) == eof)
            break;

        active_groups++;
        MPI_Send(dest=leaderof(i), &q[i], MPI_COMM_WORLD);
        rc = MPI_Irecv(src=leaderof(i), buffer=&a[i], request=&r[i], MPI_COMM_WORLD);
        if(rc != MPI_SUCCESS)
        {
            restart_leader(i, restarting, q, a, r, stream);
        }
    }

    //
    // Phase 2: finalize any unnecessary ranks
    //
    for(int i = active_groups + 1; i < ngroups; i++)
    {
        MPI_Send(dest=leaderof(i), &done_msg, MPI_COMM_WORLD);
    }

    //
    // The progress engine. Get answers; send new requests and handle
    // process restarts
    //
    while(active_groups != 0)
    {
        int i;
        rc = MPI_Waitany(ngroups, r, &i, MPI_STATUS_IGNORE);

        if(rc != MPI_SUCCESS)
        {
            if(!restarting[i])
            {
                restart_leader(i, restarting, q, a, r, stream);
            }
            else
            {
                active_groups--;
            }
            continue;
        }

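        if(restarting[i])
        {
            //
            // The completed request was the restart, not an answer; a[i] holds
            // nothing new, so skip straight to sending the re-queued query.
            //
            restarting[i] = false;
        }
        else
        {
            process_answer(&a[i]);
        }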

        if(get_next_query(stream, &q[i]) == eof)
        {
            active_groups--;
            MPI_Send(dest=leaderof(i), &done_msg, MPI_COMM_WORLD);
        }
        else
        {
            MPI_Send(dest=leaderof(i), &q[i], MPI_COMM_WORLD);
            rc = MPI_Irecv(src=leaderof(i), buffer=&a[i], request=&r[i], MPI_COMM_WORLD);
            if(rc != MPI_SUCCESS)
            {
                restart_leader(i, restarting, q, a, r, stream);
            }
        }
    }

    MPI_Finalize();
}


void restart_leader(int i, bool restarting[], QueryMessage q[], AnswerMessage a[], MPI_Request r[], Stream stream)
{
    restarting[i] = true;
    push_query_back(stream, &q[i]);
    MPI_Comm_Irestart_rank(MPI_COMM_WORLD, leaderof(i), &r[i]);
}
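
The per-group bookkeeping in the master's progress loop boils down to a small state machine over the `restarting` flag. Here is one possible reading of it as a plain-C sketch, assuming that a completed restart request carries no answer and the re-queued query simply has to be sent again. The `action` enum and `on_completion` are hypothetical names for this illustration only.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum {
    ACT_PROCESS_ANSWER,  /* an answer arrived: consume it, then send the next query */
    ACT_RESEND_QUERY,    /* the leader was restarted: send it a query again */
    ACT_START_RESTART,   /* the leader failed: start an asynchronous restart */
    ACT_DROP_GROUP       /* the restart itself failed: stop using this group */
} action;

/* Decide what the master does when the request of one group completes.
   `ok` is whether the completion was successful; `*restarting` is the
   group's flag and is updated in place. */
action on_completion(bool ok, bool *restarting)
{
    assert(restarting != NULL);
    if (!ok) {
        if (*restarting)
            return ACT_DROP_GROUP;      /* second failure in a row */
        *restarting = true;
        return ACT_START_RESTART;
    }
    if (*restarting) {
        *restarting = false;
        return ACT_RESEND_QUERY;        /* restart finished, nothing to process */
    }
    return ACT_PROCESS_ANSWER;
}
```

The design point is that a group gets exactly one restart attempt per failure: a failure while already restarting retires the group, which corresponds to decrementing active_groups in the loop above.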


Thanks,
.Erez
