datasets
fix: loading of datasets from Disk (#7373) #7489
Open

sam-hey wants to merge 5 commits into huggingface:main from sam-hey:fix/concatenate_datasets

sam-hey · 96 days ago

Fixes dataset loading from disk by ensuring that memory maps and streams are properly closed.

For more details, see #7373.
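For reference, the closing pattern this PR applies can be sketched as follows. This is a minimal sketch based on the helper named in the commits below and on the call site shown in the review further down, not the exact implementation; the wrapper name `_read_table` is illustrative, and the file is assumed to be in Arrow IPC stream format:

import pyarrow as pa


def _memory_mapped_record_batch_reader_from_file(filename: str):
    """Open a memory-mapped Arrow IPC stream and return both the reader and
    the underlying memory map so the caller can close both."""
    memory_mapped_stream = pa.memory_map(filename)
    opened_stream = pa.ipc.open_stream(memory_mapped_stream)
    return opened_stream, memory_mapped_stream


def _read_table(filename: str) -> pa.Table:
    # Read the (memory-mapped) table, then close the reader and the memory map
    # so no file handles are left open for each loaded Arrow file.
    opened_stream, memory_mapped_stream = _memory_mapped_record_batch_reader_from_file(filename)
    pa_table = opened_stream.read_all()
    opened_stream.close()
    memory_mapped_stream.close()
    return pa_table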

sam-hey fix: concat of datasets
52461406
sam-hey Merge remote-tracking branch 'source/main' into fix/concatenate_datasets
d2898318
sam-hey add _memory_mapped_record_batch_reader_from_file
02c3cf82
sam-hey · 94 days ago (edited 94 days ago) · ❤ 1

@nepfaff Could you confirm if this fixes the issue for you? I checked Memray, and everything looked good on my end.

Install: pip install git+https://github.com/sam-hey/datasets.git@fix/concatenate_datasets

nepfaff · 92 days ago · ❤ 1

Will aim to get to this soon. I don't have a rapid testing pipeline set up and need to wait for some AWS nodes to become free.

nepfaff · 91 days ago (edited 91 days ago)

I now set up a small experiment:

import logging
import os

import psutil
from datasets import Dataset, concatenate_datasets

# dataset_path (a dataset saved with save_to_disk) and N (number of copies to load)
# are set per run.

# Log initial RAM usage
process = psutil.Process(os.getpid())
initial_ram = process.memory_info().rss / (1024 * 1024)  # Convert to MB
logging.info(f"Initial RAM usage: {initial_ram:.2f} MB")

chunk_datasets = [
    Dataset.load_from_disk(dataset_path, keep_in_memory=False) for _ in range(N)
]
combined_dataset = concatenate_datasets(chunk_datasets)

# Log final RAM usage
final_ram = process.memory_info().rss / (1024 * 1024)  # Convert to MB
ram_diff = final_ram - initial_ram
logging.info(f"Final RAM usage: {final_ram:.2f} MB")
logging.info(f"RAM usage increase: {ram_diff:.2f} MB")

The RAM usage is linearly correlated with N on datasets master!

For my test dataset:

  • N=5 => RAM usage increase: 26302.91 MB
  • N=10 => RAM usage increase: 52315.18 MB
  • N=20 => RAM usage increase: 104510.65 MB
  • N=40 => RAM usage increase: 209166.30 MB

Unfortunately, your patch doesn't seem to change this:

pip install git+https://github.com/sam-hey/datasets.git@fix/concatenate_datasets
pip list | grep datasets
datasets                 3.5.1.dev0

Gives exactly the same RAM statistics.

Edit: The results are a bit flawed, as the memory increase seems to come entirely from Dataset.load_from_disk(dataset_path, keep_in_memory=False) (which I don't think should happen either?) and not from concatenate_datasets. This differs from my large-scale setup, which runs out of memory during concatenate_datasets, but I haven't been able to replicate that behavior here...
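One way to separate the two effects is to record RSS after the load phase and again after the concatenation phase. A minimal sketch, assuming dataset_path and N are defined as in the experiment above:

import os

import psutil
from datasets import Dataset, concatenate_datasets

process = psutil.Process(os.getpid())


def rss_mb() -> float:
    # Resident set size of this process, in MB
    return process.memory_info().rss / (1024 * 1024)


start = rss_mb()
chunk_datasets = [
    Dataset.load_from_disk(dataset_path, keep_in_memory=False) for _ in range(N)
]
after_load = rss_mb()
combined_dataset = concatenate_datasets(chunk_datasets)
after_concat = rss_mb()

print(f"load_from_disk increase:       {after_load - start:.2f} MB")
print(f"concatenate_datasets increase: {after_concat - after_load:.2f} MB")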

sam-hey marked this pull request as draft · 91 days ago
sam-hey fix: close more streams
1d7133b5
sam-hey marked this pull request as ready for review · 91 days ago
sam-hey doc: add some docs
5d2da73d
sam-hey · 91 days ago (edited 91 days ago)

Thanks a lot, @nepfaff, for taking a look at this! It seems that concatenate_datasets() is fixed with this PR. I can also confirm that loading a large number of files requires significant memory. However, as I understand it, this is either expected behavior or an upstream issue, since the memory consumption stems from pa.memory_map(), which returns a memory-mapped file.

This behavior might be related to this bug: apache/arrow#34423

[Screenshot 2025-04-03 at 16:01:11]
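To check whether the growth reported by RSS is backed by the memory map rather than by Arrow allocations, the process RSS can be compared with pyarrow's allocation counter. A minimal sketch, assuming dataset_path points to a dataset previously saved with save_to_disk:

import os

import psutil
import pyarrow as pa
from datasets import Dataset

process = psutil.Process(os.getpid())
rss_before = process.memory_info().rss

ds = Dataset.load_from_disk(dataset_path, keep_in_memory=False)

rss_after = process.memory_info().rss
# RSS counts memory-mapped pages touched by the process; if the Arrow counter
# stays near zero while RSS grows, the increase comes from the memory map,
# not from heap allocations.
print(f"RSS increase:          {(rss_after - rss_before) >> 20} MB")
print(f"Arrow allocated bytes: {pa.total_allocated_bytes() >> 20} MB")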
lhoestq · commented on 2025-04-15
src/datasets/table.py

opened_stream, memory_mapped_stream = _memory_mapped_record_batch_reader_from_file(filename)
pa_table = opened_stream.read_all()
opened_stream.close()
memory_mapped_stream.close()
return pa_table
lhoestq · 79 days ago

Is it really ok to close the memory map, given the memory-mapped table is still in use?

sam-hey · 79 days ago

Yes, pa_table includes all the information and is a copy; see the read_all() example at https://arrow.apache.org/docs/python/ipc.html.

lhoestq · 79 days ago

I think read_all() doesn't load the data into memory here; it loads buffers that are memory-mapped from disk.

lasuomela · 71 days ago (edited 71 days ago) · ❤ 2

I can confirm that this works:

import pyarrow as pa

BATCH_SIZE = 10000
NUM_BATCHES = 1000

schema = pa.schema([pa.field('nums', pa.int32())])

# Write in stream format
with pa.OSFile('bigfile.arrow', 'wb') as sink:
    with pa.ipc.new_stream(sink, schema) as writer:
        for _ in range(NUM_BATCHES):
            batch = pa.record_batch([pa.array(range(BATCH_SIZE), type=pa.int32())], schema)
            writer.write(batch)

# Read the stream back
with pa.memory_map('bigfile.arrow', 'rb') as source:
    with pa.ipc.open_stream(source) as reader:
        table = reader.read_all()

print("LEN:", table.num_rows)
print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))

# Read the first batch
print("")
print("First batch:")
print(table[0][0:BATCH_SIZE])

Out:

LEN: 10000000
RSS: 0MB

First batch:
[
  [
    0,
    1,
    2,
    3,
    4,
    ...
    9995,
    9996,
    9997,
    9998,
    9999
  ]
]
sam-hey · 70 days ago · 👍 1

@lasuomela Thanks a lot for checking! I’m currently on vacation and without a laptop to verify it myself.
@lhoestq Would this be sufficient proof for you?

lhoestq · 70 days ago (edited 70 days ago) · ❤ 1

Great! Have you tested that it also fixes the memory issue in your case, @iamollas?

Happy to know that it works for you @sam-hey ! Looking forward to merging this

HuggingFaceDocBuilderDev · 70 days ago

The docs for this PR live here. All of your documentation changes will be reflected on that endpoint. The docs are available until 30 days after the last update.
