Distributed in-memory map-reduce for data analyzer (#5129)
Adds class `DistributedDataAnalyzer` that implements a map-reduce in
distributed memory.
- instead of writing hundreds or thousands of temp files as intermediate
storage in the map/reduce step, as `DataAnalyzer` does, each node holds
disjoint, ordered subsets of `(metric, sample id)` pairs as a
distributed tensor.
- Also removes the need to specify `metric_dtypes`, as it is automatically
inferred from the return value of `metric_function(data)` (see the sketch below).
- Removes the need for a distributed file system that all nodes
can write to: here only rank 0 does the writing.
- Much faster than the original map-reduce based on writing and loading
several temp files to and from disk; it requires less memory, creates no
temp files, and is simpler.
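
The dtype inference mentioned above can be as simple as inspecting what `metric_function` returns. A minimal sketch of the idea (not the actual DeepSpeed code; `infer_metric_dtype` is just an illustrative helper):

```python
import torch

def infer_metric_dtype(metric_function, data):
    value = metric_function(data)        # e.g. one scalar or tensor per sample
    if isinstance(value, torch.Tensor):
        return value.dtype               # dtype of the returned tensor
    return torch.as_tensor(value).dtype  # let torch infer it for plain numbers/lists
```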
## How it works
- for each metric, the only result storage is `metric_result`, a list
of `(sample_id, metric_value)` tuples.
- `metric_result` is converted to a 2D tensor once the whole dataset has
been iterated.
- `sample_idx_dtype` and `metric_value_dtype` are determined by `all_reduce`
calls with ops `MIN` and `MAX` over the `metric_result` of all nodes (see
the first sketch after this list).
- each node holds a `metric_result` tensor of `N` samples as `N x
(metric, sample)`, sorted by metric, with disjoint `metric` values
across nodes. E.g.:
  - node 1 holds `[[1, 20], [1, 30], [2, 10], [2, 30]]`, node 2 holds
`[[3, 10], [3, 20], [3, 15], [3, 40]]`, and node 3 holds `[[4, 20], [4, 30],
[5, 25], [5, 50]]`.
- to convert the list of `(metric, sample)` pairs to a `dict{metric:
[samples]}`, each node iterates only over its own subset, as dictionary keys
do not overlap across nodes (see the second sketch after this list). In this
case, node 1 builds `{1: [20, 30], 2: [10, 30]}`, node 2 builds
`{3: [10, 20, 15, 40]}`, and node 3 builds `{4: [20, 30], 5: [25, 50]}`.
- To write the merged files: (1) rank 0 opens the file, (2) iteratively
receives buffers of values, dict keys, and dict values from the other ranks
and writes them, and (3) closes the file (see `file_write_ordered()` under
References).
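
A minimal sketch of the dtype selection step, assuming `torch.distributed` is initialized on a backend that supports CPU tensors (e.g. gloo); the helper names are illustrative, not the DeepSpeed API:

```python
import torch
import torch.distributed as dist

def global_min_max(local_values: torch.Tensor):
    # Reduce the local extrema so every rank learns the global value range.
    lo = local_values.min().to(torch.int64)
    hi = local_values.max().to(torch.int64)
    dist.all_reduce(lo, op=dist.ReduceOp.MIN)
    dist.all_reduce(hi, op=dist.ReduceOp.MAX)
    return int(lo), int(hi)

def smallest_int_dtype(lo: int, hi: int) -> torch.dtype:
    # Pick the narrowest integer dtype that can represent every value.
    for dtype in (torch.int16, torch.int32, torch.int64):
        info = torch.iinfo(dtype)
        if info.min <= lo and hi <= info.max:
            return dtype
    return torch.int64
```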
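
And a minimal sketch of the per-node list-to-dict conversion, reproducing the node 1 example above (the function name is again illustrative):

```python
from collections import defaultdict
import torch

def rows_to_dict(rows: torch.Tensor) -> dict:
    # rows is an N x 2 tensor: column 0 is the metric value, column 1 the sample id.
    # Keys never overlap across nodes because each node owns a disjoint metric range.
    result = defaultdict(list)
    for metric, sample_id in rows.tolist():
        result[metric].append(sample_id)
    return dict(result)

# Node 1 from the example above:
# rows_to_dict(torch.tensor([[1, 20], [1, 30], [2, 10], [2, 30]]))
# -> {1: [20, 30], 2: [10, 30]}
```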
## Future work
Ideally, one could take this and do the curriculum setup on the fly when
calling deepspeed `initialize`, i.e. without writing/loading map-reduce
files and without forcing the user to call `.map()` and `.reduce()`
beforehand. The whole map-reduce takes less than 10 seconds, so this is
entirely feasible.
## References
- `file_write_ordered()` implements a sequential shared write similar to
[`MPI_File_write_ordered`](https://www.open-mpi.org/doc/v3.0/man3/MPI_File_write_ordered.3.php).
It is, however, adapted to communicate and write a list of tensors instead
of a single tensor, and to have only rank 0 write to the file instead of
using a shared file pointer. A sketch of the idea follows below.
- `dist_sample_sort()` implements a distributed sample sort, as detailed
[here](https://brunomaga.github.io/Distributed-Sort) and sketched in code
below. The ranges in step 3 guarantee disjoint subsets of keys (metric
values) across nodes.
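
A minimal sketch of the sequential, rank-0-only write pattern described above; it handles a single tensor per rank rather than a list, and the function name and signature are illustrative rather than the actual `file_write_ordered()` API:

```python
import torch
import torch.distributed as dist

def write_ordered(path: str, local_tensor: torch.Tensor):
    # Rank 0 opens the file once, writes its own buffer, then receives and
    # writes every other rank's buffer in rank order; no shared file pointer.
    rank, world_size = dist.get_rank(), dist.get_world_size()
    if rank == 0:
        with open(path, "wb") as f:
            f.write(local_tensor.contiguous().numpy().tobytes())
            for src in range(1, world_size):
                size = torch.zeros(1, dtype=torch.int64)
                dist.recv(size, src=src)                    # how many elements follow
                buf = torch.empty(int(size), dtype=local_tensor.dtype)
                dist.recv(buf, src=src)                     # the payload itself
                f.write(buf.numpy().tobytes())
    else:
        dist.send(torch.tensor([local_tensor.numel()], dtype=torch.int64), dst=0)
        dist.send(local_tensor.contiguous(), dst=0)
```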
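And a minimal sketch of a distributed sample sort over a 1D key tensor, following the outline of the linked post: local sort, per-rank splitter candidates, global splitters that define disjoint ranges, and an `all_to_all` exchange. It assumes a backend with `all_to_all` support (e.g. MPI, or NCCL with CUDA tensors) and is not the actual `dist_sample_sort()` implementation:

```python
import torch
import torch.distributed as dist

def sample_sort(keys: torch.Tensor) -> torch.Tensor:
    world_size = dist.get_world_size()
    keys, _ = keys.sort()                                     # step 1: local sort
    # step 2: each rank proposes world_size - 1 evenly spaced splitter candidates
    idx = torch.linspace(0, keys.numel() - 1, world_size + 1)[1:-1].long()
    gathered = [torch.empty_like(keys[idx]) for _ in range(world_size)]
    dist.all_gather(gathered, keys[idx])
    # step 3: global splitters define disjoint key ranges, one range per rank
    candidates, _ = torch.cat(gathered).sort()
    sel = torch.linspace(0, candidates.numel() - 1, world_size + 1)[1:-1].long()
    splitters = candidates[sel]
    # step 4: bucket local keys by destination rank and exchange them
    dest = torch.bucketize(keys, splitters)
    send = [keys[dest == r] for r in range(world_size)]
    sizes = [torch.zeros(1, dtype=torch.int64) for _ in range(world_size)]
    dist.all_to_all(sizes, [torch.tensor([b.numel()]) for b in send])
    recv = [torch.empty(int(s), dtype=keys.dtype) for s in sizes]
    dist.all_to_all(recv, send)
    merged, _ = torch.cat(recv).sort()                        # step 5: local merge
    return merged
```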

---------
Co-authored-by: Logan Adams <114770087+loadams@users.noreply.github.com>