[Mpi3-ft] New FT API

Josh Hursey jjhursey at open-mpi.org
Fri Aug 12 09:15:47 CDT 2011


Comments inline below.

On Thu, Aug 11, 2011 at 3:48 PM, Darius Buntinas <buntinas at mcs.anl.gov> wrote:
>
> Hi Josh,
>
> Thanks for your comments, I made mine inline.
>
> On Aug 10, 2011, at 4:29 PM, Josh Hursey wrote:
>
>> Thanks for getting that new interface going. I have some notes below
>> for discussion.
>>
>> -- Josh
>>
>>
>> - Should we rename MPI_COMM_GET_FAILED to MPI_COMM_GROUP_FAILED() to
>> better match the MPI_Comm_group() signature?
>
> I'd be OK with that.
>
>> - Should we add an optional MPI_Info parameter to the
>> MPI_Comm_get_failed() operation to allow for implementation specific
>> optimizations - similar to what we had with the 'mask' in the previous
>> proposal.
>
> I looked through both the ticket276 document and the wiki but couldn't find a list of valid masks, so I'm not sure how it would be used.  The one mask I did see was "new", but that should be determined using MPI_GROUP_DIFFERENCE to avoid a race.  What were the other mask values we were considering?

I'm ok with dropping it, but I just wanted to note that the mask
introduced a path for implementations to help the users manage large
lists of processes by categorizing them in some non-standard way.
However, the new/old mask was the driving use case, and we have a
different way to do that. So I would keep it out for now, unless
others have use cases.
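
To make the "different way" concrete: the user keeps the failed group
it saw last time and takes the difference against the current one. In
real code that would be MPI_Group_difference() applied to the group
returned by the (still only proposed) MPI_Comm_group_failed(). The
sketch below is a plain-C model of that step, not MPI code; the
function name newly_failed() and the use of int arrays as stand-ins
for groups are assumptions made purely for illustration.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for an MPI group: a plain array of ranks.
 * Copies into `out` every rank of `now` that is absent from `before`,
 * preserving the order of `now` (MPI_Group_difference orders its
 * result like its first argument). Returns the number of ranks written.
 * `out` must have room for n_now entries. */
size_t newly_failed(const int *now, size_t n_now,
                    const int *before, size_t n_before,
                    int *out)
{
    assert(now != NULL && out != NULL);
    size_t n_out = 0;
    for (size_t i = 0; i < n_now; i++) {
        int seen = 0;
        for (size_t j = 0; j < n_before; j++) {
            if (before[j] == now[i]) { seen = 1; break; }
        }
        if (!seen) out[n_out++] = now[i];
    }
    return n_out;
}
```

Because the "before" snapshot is owned by the caller, nothing the
implementation learns between two calls can silently change what
counts as new.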

>
>> - What should we do for intercommunicators?
>> A) Should we expand the signature of MPI_Comm_get_failed to return
>> the failed sets for both the local and remote groups?
>> MPI_Comm_get_failed(comm, local_grp, remote_grp)
>> B) Should MPI_Comm_get_failed only return the remote group for
>> intercommunicators, and force the user to MPI_Comm_group() to get the
>> local group then call a MPI_Group_get_failed() to get the subset of
>> failures?
>> C) Add a MPI_Comm_get_failed_remote(comm, grp) that would return the
>> failures in the remote group, and MPI_Comm_get_failed() would only
>> ever return the failures in the local list.
>> D) Something else?
>
> I think we want C.  If we want to parallel mpi_comm_group, we can have mpi_comm_group_failed return the failed processes in the local group, then have the mpi_comm_remote_group_failed to return the failed processes in the remote group (like mpi_comm_remote_group).

I like (C) the best as well. I agree that it fits best with the
current standard.

>
>>
>> - MPI_ANY_SOURCE: I think the user should have to pass in a group
>> containing the list of failed ranks that it is allowing to participate
>> even though they are failed. If there are other failures on the
>> communicator that are not contained in this list then the
>> MPI_ANY_SOURCE will fail as before. This protects the user from
>> acknowledging more processes than it expects to, and avoids the thread
>> safety issue mentioned on the wiki.
>
> It returns the group of processes that were acknowledged.  The idea is that the user can compare the returned group with a group that it had previously to see if any new processes were detected (and acknowledged).

It is nice that this matches the behavior of
MPI_Comm_validate/MPI_Comm_group_failed, which also returns the agreed
upon list - which may include failures that the user may not be aware
of yet.

>
> We could have the user pass in a group of processes to be acknowledged, but then the implementation needs to keep track of which failures are acknowledged and which aren't, as opposed to just a flag: anysource enabled/disabled.  And it still doesn't address the thread safety problem:  A thread can still come in and acknowledge failures that occurred between the time another thread checked/acknowledged failures and when it called a blocking wildcard receive.

I don't think that we would have to store the acknowledged group on
the communicator. The call would have to check the user provided group
against the group of all known failures (something we would have to
have access to regardless). If there are known failures in the
communicator but not in the user provided group, then we do not
re-activate any_source. So there is a chance that the call returns
without reactivating MPI_ANY_SOURCE. But this puts the user in the
predicament that they might have to call MPI_Comm_recognize() and
MPI_Comm_group_failed() in a loop to reactivate MPI_ANY_SOURCE, which
is not pretty.

It is a fair point though from a programmability standpoint. I would
suggest adopting the MPI_Comm_enable_any_source() signature then to
make it abundantly clear.

>
>> - MPI_ANY_SOURCE: I do not have a preference on MPI_Comm_recognize
>> versus MPI_Comm_enable_any_source. A 'recognized' rank can be defined
>> as a rank that the application has acknowledged as failed to MPI, and
>> understands that it will not participate in any group operations like
>> MPI_ANY_SOURCE or collectives (though a special recognition operation
>> is provided for collectives).
>
> I think I'm now leaning towards MPI_Comm_enable_anysource (see above).

Me too.

>
>> - Nullify: I like keeping this separate since there is a question of
>> whether or not it is useful to provide MPI_PROC_NULL semantics for P2P
>> operations to failed peers. I think the signatures are fine. Maybe
>> change them to MPI_Comm_group_nullified/MPI_Comm_group_nullify to line
>> up with MPI_Comm_group - though I could see users getting those mixed
>> up pretty easily.
>
> OK
>
>> - MPI_Comm_validate: For this operation, are the processes identified
>> in the group 'recognized' for MPI_ANY_SOURCE? In the previous proposal
>> the collective validate would 'recognize' the failed processes
>> automatically. I would say that in this version we should -not- do
>> this. I do not see much benefit in this given the flexibility of the
>> new interface. Just a point of discussion, since it would be different
>> semantics than those in the previous proposal.
>
> Right, we can see validate as a collective function operating on a collective state (collectives enabled) and the recognize/anysource_enable function as a local function operating on local state (anysource enabled).
>
>> - Thread safety: We could force the user to specify a group of
>> processes that it wants to recognize [MPI_Comm_recognize(comm,
>> input_group, output_group)]. The input_group would allow the user to
>> specify those failed processes that it wants to acknowledge. The
>> output_group would represent the full set of recognized ranks for
>> this communicator. Requiring the user to specify an input_group
>> prevents the MPI implementation from adding more failed processes to
>> the recognized group without the user's knowledge. The user would
>> then need to make sure that the threads know how they are each
>> managing the recognition status.
>
> Yes, but it does not prevent another thread from recognizing failures before the thread can call receive(anysource):
>
> Thread A                      Thread B
> ========                      ========
> Recognize(comm, groupA)
> ----------- PROC X FAILS ----------------
>                              Recognize(comm, groupB) // groupB contains proc X
> Recv(comm, anysource)
>
> Thread A may hang if the receive can only match a message that would have been sent by process X.

Yep, but this is the user's fault and they have tools to make sure it
doesn't happen. Or put another way, this is an erroneous/dangerous
program. Since both threads are operating on the same communicator,
they need to be coordinating that activity.

>
>> - Thread safety: There is another question of what happens if:
>>  ThreadA: MPI_Recv(comm, MPI_ANY_SOURCE)
>>  --- Rank X fails ---
>>  ThreadB: Notice a failure of Rank X
>>  ThreadB: MPI_Comm_recognize(comm, {rankX})
>> There is a race between when the error of Rank X failure is reported
>> to ThreadA, and when ThreadB recognizes the failure. If ThreadB
>> recognizes the failure before ThreadA is put on the run queue, should
>> ThreadA return an error? or should it keep processing? I think it
>> should return an error, and we should discourage the users from such
>> constructs, but I could be convinced otherwise.
>
> I think failure detection should be atomic wrt threads of a process.  As soon as the failure is detected, all anysource requests should be completed with an error, regardless of which thread detected it or which thread is waiting on the request.  Now it's possible that ThreadB recognizes the new failure before ThreadA returns from its receive.  But ThreadA can still check for new failures by doing a comm_group_failed then a group_difference with a group of failed processes it requested earlier.

So this is a slight modification on the scenario that you had above.
Thread A                      Thread B
========                      ========
Recognize(comm, groupA)
Recv(comm, anysource)
----------- PROC X FAILS ----------------
                             Recognize(comm, groupB) // groupB contains proc X
Recv() returns ??

I think that the Recv() should fail since when it was posted Proc X
was not recognized yet. So to avoid unintentionally blocking ThreadA,
it should return an error. The question becomes how to implement this
semantic.

One way I thought that we could implement it is with a counter on the
request and communicator. When Recognize() is called it stores the
number of known alive processes (P_A) on the communicator, and sets
the boolean (is_any_source_enabled) to true. When a Recv(ANY_SOURCE)
is posted, it is allowed to proceed since (is_any_source_enabled) is
true, and (P_A) is copied onto the request. When a new process fails
in the communicator the boolean (is_any_source_enabled) is set to
false. Normally (is_any_source_enabled) would be enough to force the
Recv request to return an error, but if ThreadB calls Recognize()
before the request is checked, the flag is true again and the check
could conclude that all is well. However, ThreadB's Recognize() also
updates (P_A) on the communicator. So in the request check we compare
not only (is_any_source_enabled) but also the (P_A) stored on the
request against the (P_A) stored on the communicator. If they differ
then the request should return in error.

So that's a long way of saying, I think that the Recv() in ThreadA
should return an error, and I think there is a straightforward way of
providing this semantic without much overhead.

>
>> - 'notion of thread-specific state in the MPI standard?' From what I
>> could find, I do not think there is a notion of thread specific state
>> in the MPI standard. There is a concept of the 'main thread', but I
>> think that is as far as the standard goes in this regard.
>
> Yeah, the main thread was the only instance we could come up with here.
>
> I think what we really need to make this thread safe is another object that keeps track of whether the communicator is anysource enabled or of which processes have been recognized.  This object would then need to be passed in to all receive operations.  Each thread can manage its own object and enable/disable anysource receives as necessary.  Of course this means adding a new parameter to receive operations which I don't think would make the forum happy.
>
> Instead, by using thread-local storage, we make this object implicit.  If we want to make it implicit to avoid adding parameters to receive, without using thread-local storage, we run into the thread safety issues.
>
> So the options I see are:
>    1. Change the API and add a parameter to receive function
>    2. Make the state implicit but don't use thread-local storage and have thread safety
>        problems
>    3. Make the state implicit and use thread-local storage
>
> I think 3 is the least evil of the three.

I think all three are evil for different reasons, but I agree that we
must have a solution that allows the user to write a thread safe
program.

So I think (1) is something we should try to avoid. Adding a new Recv
function and a new object seems hack'ish. We got a fair amount of push
back from the Forum for adding implicit state on the communicator, so
(2) and (3) seem to bring that back a little more strongly than we
have so far in this thread. Potentially having any_source enabled in
one thread but not another on the same communicator is a weird
semantic to introduce, so we should avoid trending that way.

The (4) option would be to make this the user's problem. I think it
would be easier on all sides if we just said that the example you
cited was erroneous/dangerous since two threads are mutating the state
(is_any_source_enabled) of a shared object (comm), and the user is
responsible for coordinating that action.

The first approach I thought of is below, though I admit that it is
off-the-cuff so might be rough or wrong.

If the threads use a reader/writer like construct to make sure that
Recognize (writer) is only called when there are no outstanding
Recv(ANY_SOURCE)'s posted (readers) then I think the user can avoid
this problem.

The complexity comes in (as it does in all FT programs) when we think
of when Recognize() is expected to be called. I would expect it to be
called in response to a detected process failure.

So the program might look like (need different tags to provide thread context):
Thread A                      Thread B
========                      ========
while(...) {                  while(...) {
if(!Recv(comm, any, tagA))    if(!Recv(comm, any, tagB))
  Recognize(comm, groupA)        Recognize(comm, groupB)
}                             }

Then with a reader/writer like construct we would force the user to
complete all outstanding Recv(comm, any_source) operations before
calling Recognize() on the communicator. Since Recv(any_source) is
guaranteed to complete if there is a new error, there is no
deadlock in waiting for the Recv(any_source) to complete.

So adding the reader/writer locks we would have:
Thread A                      Thread B
========                      ========
while(...) {                  while(...) {
reader_enter()                reader_enter()
if(!Recv(comm, any, tagA))    if(!Recv(comm, any, tagB))
  reader_leave()                 reader_leave()
  writer_enter()                 writer_enter()
  Recognize(comm, groupA)        Recognize(comm, groupB)
  writer_leave()                 writer_leave()
else                          else
  reader_leave()                 reader_leave()
}                             }

And they might interleave like:

Thread A                      Thread B
========                      ========
reader_enter()                reader_enter()
Recv(comm, any, tagA)         Recv(comm, any, tagB)
----------- PROC X FAILS ----------------
// Recv fails
reader_leave()
writer_enter() // waits
                              // Recv fails
                              reader_leave()
                              writer_enter() // no-wait
                              Recognize(comm, groupB)//{X}
                              writer_leave()
                              reader_enter() // waits
----------- PROC Y FAILS ----------------
Recognize(comm, groupA)//{X,Y}
writer_leave()
                              Recv(comm, any, tagB)

Which leaves us with the same problem :/

The solution is to coordinate the action of the threads during the
Recognize() (write). If you think about it, it is a little odd that
both threads are calling Recognize(), so elect just one. And we need a
thread_barrier() to make sure that ThreadB does not starve ThreadA by
looping through numerous failed Recv(comm, any, tagB) while not
allowing ThreadA to Recognize the failures.

Thread A                      Thread B
========                      ========
while(...) {                  while(...) {
reader_enter()                reader_enter()
if(!Recv(comm, any, tagA))    if(!Recv(comm, any, tagB))
  reader_leave()                 reader_leave()
  writer_enter()                 writer_enter()
  Recognize(comm, groupA)
  thread_barrier()               thread_barrier()
  writer_leave()                 writer_leave()
else                          else
  reader_leave()                 reader_leave()
}                             }


And they might interleave like:

Thread A                      Thread B
========                      ========
reader_enter()                reader_enter()
Recv(comm, any, tagA)         Recv(comm, any, tagB)
----------- PROC X FAILS ----------------
// Recv fails
reader_leave()
writer_enter() // waits
                              // Recv fails
                              reader_leave()
                              writer_enter() // no-wait
                              thread_barrier() // wait for ThreadA
----------- PROC Y FAILS ----------------
Recognize(comm, groupA)//{X,Y}
thread_barrier() //no-wait
writer_leave()
                              writer_leave()
                              reader_enter()
                              Recv(comm, any, tagB)
reader_enter()
Recv(comm, any, tagA)


That seems to solve things (to me anyway), but we need a termination
detection fix to make sure this eventually stops in a uniform manner.
Otherwise we might get:

Thread A                      Thread B
========                      ========
reader_enter()                reader_enter()
Recv(comm, any, tagA)         Recv(comm, any, tagB)
                              // Success
                              reader_leave()
                              // Exit loop and continue
----------- PROC X FAILS ----------------
// Recv fails
reader_leave()
writer_enter() // no-wait
----------- PROC Y FAILS ----------------
Recognize(comm, groupA)//{X,Y}
thread_barrier() // wait for ThreadB to join... but it won't
// Deadlock


I think that the easiest way to do this (there are others that we
could devise to reduce the synchronization overhead between threads)
would be to not react only when a Recv reports an error, but to run
the Recognize step in every iteration, even if there are no failures.

Thread A                      Thread B
========                      ========
while(...) {                  while(...) {
 reader_enter()                reader_enter()
 Recv(comm, any, tagA)         Recv(comm, any, tagB)
 reader_leave()                reader_leave()

 writer_enter()                writer_enter()
 Recognize(comm, groupA)
 thread_barrier()              thread_barrier()
 writer_leave()                writer_leave()
}                             }

So we never get the odd mismatch between one thread exiting the loop,
and the other waiting for it to rendezvous in the error condition.
That holds as long as the two loops stay synchronized.

Is it cumbersome? Yes. But it does seem to allow the user to manage it
at the application level. Some additional complexity is required when
accounting for process failures in an application: the error
associated with a process failure can occur at any time, so the
program logic must be able to adapt to such events. I think this
threading example highlights a race condition when managing the
perceived state of a process group after an error.

It should be noted that the synchronization problem highlighted is not
exclusive to MPI_ANY_SOURCE; since we have the ability to nullify a
process, we have the same problem there.

Thread A                      Thread B
========                      ========
while(...) {                  while(...) {
if(!Recv(comm, procA, tagA))  if(!Recv(comm, procA, tagB))
  Nullify(comm, procA)          Nullify(comm, procA)
}                             }

So the two threads would need to coordinate the Nullify (writer)
with the Recv (reader) calls in the two threads.

Thoughts?

-- Josh

>
> -d
>
>
>>
>>
>>
>>
>> On Wed, Aug 10, 2011 at 4:42 PM, Darius Buntinas <buntinas at mcs.anl.gov> wrote:
>>>
>>> I've started a wiki page describing a new API based on feedback from the forum and comments during our last meeting.  It's still a work in progress, but please look over it and send me your comments, specifically on the "thread safety" section.
>>>
>>> https://svn.mpi-forum.org/trac/mpi-forum-web/wiki/ft/run_through_stabilization_2
>>>
>>> Thanks,
>>> -d
>>> _______________________________________________
>>> mpi3-ft mailing list
>>> mpi3-ft at lists.mpi-forum.org
>>> http://lists.mpi-forum.org/mailman/listinfo.cgi/mpi3-ft
>>>
>>>
>>
>>
>>
>> --
>> Joshua Hursey
>> Postdoctoral Research Associate
>> Oak Ridge National Laboratory
>> http://users.nccs.gov/~jjhursey
>>



-- 
Joshua Hursey
Postdoctoral Research Associate
Oak Ridge National Laboratory
http://users.nccs.gov/~jjhursey



