Implement backend-agnostic rpc._wait_all_workers() utility (#30710)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/30710
We need a backend-agnostic mechanism to do barrier-like operation before locally destroy RRef context and shutdown RPC Agent.
- Sort worker names.
- Elect the first name as the leader in the ordered worker names.
- Followers reports therir intent to synchronize to the leader.
- Leader also reports to itself, when `_wait_all_workers()` called.
- If all workers report their intent to proceed, leader send the command to every one to proceed.
Test Plan:
# Unit tests
```
buck test mode/dev-nosan //caffe2/test:rpc_fork -- test_wait_all_workers
buck-out/gen/caffe2/test/rpc_fork\#binary.par -r test_wait_all_workers$
buck-out/gen/caffe2/test/rpc_fork\#binary.par -r test_rref_leak
buck-out/gen/caffe2/test/rpc_fork\#binary.par -r test_rref_forward_chain
```
```
buck test mode/dev-nosan //caffe2/test:rpc_fork_thrift -- test_wait_all_workers
buck-out/gen/caffe2/test/rpc_fork_thrift\#binary.par -r test_wait_all_workers$
```
# Stress runs
```
buck test mode/dev-nosan //caffe2/test:rpc_fork_thrift -- test_stress_light_rpc --stress-runs 10
```
```
buck test mode/dev-nosan //caffe2/test:rpc_spawn_thrift -- test_stress_light_rpc --stress-runs 10
```
```
buck test mode/dev-nosan //caffe2/test:rpc_fork_thrift -- test_stress_heavy_rpc --stress-runs 10
```
```
buck test mode/dev-nosan //caffe2/test:rpc_spawn_thrift -- test_stress_heavy_rpc --stress-runs 10
```
# Debug
```
buck test mode/dev-nosan caffe2/test:rpc_fork -- test_shutdown
```
```
buck test mode/dev-nosan //caffe2/test:dist_autograd_fork -- test_clean_context_during_backward
buck build mode/dev-nosan //caffe2/test:dist_autograd_fork
buck-out/gen/caffe2/test/dist_autograd_fork\#binary.par -r test_clean_context_during_backward
```
https://our.intern.facebook.com/intern/testinfra/diagnostics/281475127895800.844424945328750.1575664368/
```
I1206 12:27:47.491420 185619 process_group_agent.cpp:211] Shutting down ProcessGroupAgent.
I1206 12:27:47.493880 185630 process_group_agent.cpp:211] Shutting down ProcessGroupAgent.
I1206 12:27:47.494526 185625 process_group_agent.cpp:211] Shutting down ProcessGroupAgent.
I1206 12:27:47.495390 185636 process_group_agent.cpp:211] Shutting down ProcessGroupAgent.
E1206 12:27:47.544198 185627 pair.cc:642] 1 --->>> 0, read ERROR: AsyncSocketException: Network error, type = Network error, errno = 104 (Connection reset by peer)
E1206 12:27:47.544203 185633 pair.cc:642] 2 --->>> 0, read ERROR: AsyncSocketException: Network error, type = Network error, errno = 104 (Connection reset by peer)
E1206 12:27:47.544210 185639 pair.cc:642] 3 --->>> 0, read ERROR: AsyncSocketException: Network error, type = Network error, errno = 104 (Connection reset by peer)
```
This should mean the UDF in the request has been run, so Python proceeded and ran to `_agent.shutdown()`.
While the RpcAgents on followers wanted to send back the response, but the leader has closed RPC.
Need to re-trigger "pytorch_rpc-buck" to reproduce the rare-seen issue.
Differential Revision: D18643137
fbshipit-source-id: d669d4fc9ad65ed48bed1329a4eb1c32ba51323c