Use finer-grained mutexes in TensorPipe RPC agent (#52749)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/52749
TensorPipe recently changed some implementation details of how it schedules callbacks, and this exposed an issue in the RPC agent. Previously the callbacks of each pipe were executed independently and possibly simultaneously. For safety reasons (especially during shutdown), TensorPipe now synchronizes the pipes and thus invokes one callback at a time. Another characteristic of TensorPipe is that it "hijacks" some user threads to run callbacks inline (e.g., if a low-level event loop completes an operation while a pipe is already busy, this completion is queued up and the user callback could be invoked later by a different thread, including the user's own thread).
These two effects combined caused a "reentrancy" phenomenon, where calling `context->connect` (formerly on line 850) to create a new client-side pipe could end up invoking a read callback on another pipe, inline on the calling thread. Since we were holding `mutex_` when calling `context->connect`, and we were trying to re-acquire `mutex_` inside the read callback, this led to a deadlock.
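To make the reentrancy concrete, here is a minimal self-contained sketch of the old locking pattern. `FakeContext`, `Agent`, `send` and `onRead` are hypothetical stand-ins, not the real TensorPipe context or agent code; only the `mutex_`/`connect` interaction mirrors the description above.

```cpp
#include <functional>
#include <mutex>

// Stand-in for the TensorPipe context: connect() may run a queued callback
// inline on the caller's own thread before returning.
struct FakeContext {
  std::function<void()> pendingReadCallback;
  void connect() {
    if (pendingReadCallback) {
      pendingReadCallback();
    }
  }
};

struct Agent {
  std::mutex mutex_;  // single coarse-grained mutex guarding all pipe state
  FakeContext context_;

  void onRead() {
    // If this runs inline inside connect(), the calling thread already holds
    // mutex_; std::mutex is not recursive, so re-locking it here deadlocks
    // (formally, it is undefined behavior).
    std::lock_guard<std::mutex> lock(mutex_);
  }

  void send() {
    std::lock_guard<std::mutex> lock(mutex_);
    context_.connect();  // may invoke onRead() inline on this very thread
  }
};

int main() {
  Agent agent;
  agent.context_.pendingReadCallback = [&agent] { agent.onRead(); };
  agent.send();  // never returns: self-deadlock on mutex_
}
```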
One solution to this problem is to use finer-grained mutexes. In particular, we introduce a mutex for each outgoing pipe (rather than a global one), which then becomes the only mutex we need to acquire inside callbacks. At that point the old `mutex_` only guards the vector of ClientPipes, so we can rename it accordingly and release it earlier.
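A minimal sketch of the finer-grained scheme, with illustrative names (`ClientPipe`'s layout, `clientPipesMutex_`, `clientPipes_`, `addPipe` and `onRead` are assumptions, not the agent's real members):

```cpp
#include <memory>
#include <mutex>
#include <string>
#include <vector>

struct ClientPipe {
  std::mutex mutex_;  // guards this pipe's in-flight message bookkeeping
  std::string address;
  // ... per-pipe state (e.g. pending responses) would live here ...
};

struct Agent {
  std::mutex clientPipesMutex_;  // only guards the container below
  std::vector<std::shared_ptr<ClientPipe>> clientPipes_;

  std::shared_ptr<ClientPipe> addPipe(std::string address) {
    auto pipe = std::make_shared<ClientPipe>();
    pipe->address = std::move(address);
    {
      std::lock_guard<std::mutex> lock(clientPipesMutex_);
      clientPipes_.push_back(pipe);
    }
    // clientPipesMutex_ is released before calling into TensorPipe
    // (e.g. context->connect), so an inline callback cannot deadlock on it.
    return pipe;
  }

  void onRead(const std::shared_ptr<ClientPipe>& pipe) {
    // Callbacks only ever acquire the pipe's own mutex.
    std::lock_guard<std::mutex> lock(pipe->mutex_);
    // ... handle the incoming response ...
  }
};

int main() {}
```

The intent is that no lock needed by callbacks is held across the call into `context->connect`, so a read callback that runs inline on another pipe can take that pipe's own mutex without contending with the caller.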
I also fixed the agent not acquiring any mutex when marking a message as errored after a failed write (and not removing that message from the timeout map either).
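A hypothetical sketch of that corrected error path, under the same kind of assumptions as above; `pendingResponses_`, `timeoutMap_`, `removeFromTimeoutMap` and `onWriteError` are illustrative names rather than the agent's actual members:

```cpp
#include <cstdint>
#include <iterator>
#include <map>
#include <mutex>

struct ClientPipe {
  std::mutex mutex_;
  std::map<uint64_t, int> pendingResponses_;  // messageId -> (placeholder)
};

struct Agent {
  std::mutex timeoutMapMutex_;
  std::multimap<int64_t, uint64_t> timeoutMap_;  // expiration -> messageId

  void removeFromTimeoutMap(uint64_t messageId) {
    std::lock_guard<std::mutex> lock(timeoutMapMutex_);
    for (auto it = timeoutMap_.begin(); it != timeoutMap_.end();) {
      it = (it->second == messageId) ? timeoutMap_.erase(it) : std::next(it);
    }
  }

  void onWriteError(ClientPipe& pipe, uint64_t messageId) {
    {
      // Previously this bookkeeping was modified without holding any mutex;
      // the pipe's own mutex now protects it.
      std::lock_guard<std::mutex> lock(pipe.mutex_);
      pipe.pendingResponses_.erase(messageId);
    }
    // Previously the entry was left behind in the timeout map.
    removeFromTimeoutMap(messageId);
    // ... complete the message's future with the write error ...
  }
};

int main() {}
```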
ghstack-source-id: 122410367
Test Plan: Ran CI in #52677 together with the TensorPipe submodule update.
Reviewed By: mrshenli
Differential Revision: D26636345
fbshipit-source-id: d36da989f2aab51f4acb92d2e81bb15b76088df1