Fix TensorPipe agent trying to double-set error (#52837)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/52837
After https://github.com/pytorch/pytorch/pull/52749 we started seeing increased flakiness in the TensorPipeDistAutogradTestWithSpawn.test_backward_node_failure_python_udf test, with failures like this one:
https://app.circleci.com/pipelines/github/pytorch/pytorch/277824/workflows/cfcbef5a-544e-43bd-b3b0-ebc7b95134fe/jobs/11145394
https://gist.github.com/lw/a0b48900673b5ae0f5d03aca1e72ffff
The logs are very clear and point to the changes in the error-handling code upon a write error. Namely, the bug is triggered when an incoming read fails while there is an outgoing write. In that case the read callback (invoked first) flushes all pending futures, which then causes the write callback (invoked after) to not find the future it's looking for.
In a sense this bug wasn't introduced by https://github.com/pytorch/pytorch/pull/52749; however, that PR introduced a check for whether the outgoing message was found, whereas before we would silently ignore such a condition.
A fix for this could be to just resume silencing the error. However, I'm trying to go a bit further: when an outgoing write fails, we know that all subsequent callbacks will fail too, and thus all pending operations should be flushed. Hence we can do so, instead of just trying to flush a single given operation. This allows us to merge the error-handling code of both the read and write paths.
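To illustrate the idea, here is a minimal toy sketch in Python. It is not the actual TensorPipe agent code (which is C++): the class ToyAgent and its methods send, handle_pipe_error, on_read_error and on_write_error are hypothetical names made up for this example. It only shows why flushing all pending futures from a single shared handler makes the second callback a harmless no-op instead of a failed lookup or a double-set.

```python
from concurrent.futures import Future


class ToyAgent:
    """Toy model of one pipe's pending futures (hypothetical, not the real agent)."""

    def __init__(self):
        # Maps a message id to the future awaiting its response.
        self.pending = {}

    def send(self, message_id):
        fut = Future()
        self.pending[message_id] = fut
        return fut

    def handle_pipe_error(self, error):
        # Shared by the read and write paths: once the pipe has failed,
        # every pending operation is doomed, so flush them all at once.
        pending, self.pending = self.pending, {}
        for fut in pending.values():
            fut.set_exception(error)

    def on_read_error(self, error):
        self.handle_pipe_error(error)

    def on_write_error(self, message_id, error):
        # No lookup of message_id: if the read callback already flushed
        # everything, the map is empty and this does nothing.
        self.handle_pipe_error(error)


agent = ToyAgent()
first = agent.send(1)
second = agent.send(2)

# The read callback fires first, then the write callback for message 1.
agent.on_read_error(ConnectionError("pipe closed"))
agent.on_write_error(1, ConnectionError("pipe closed"))
```

In this sketch each future is set exactly once, regardless of the order in which the two callbacks run.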
ghstack-source-id: 122509550
Test Plan: Will export to GitHub, run on CircleCI, and manually SSH into a machine and stress-run the test that was flaky.
Reviewed By: mrshenli
Differential Revision: D26663448
fbshipit-source-id: fbff0f6aff0d98994c08018a27c47c97149b920c