turbo-tasks-backend: prevent duplicate task restores with restoring bits (#92389)
### What?
Adds `data_restoring` and `meta_restoring` transient flag bits to
`TaskStorage`, a shared `restored: Event` on `Storage`, and a
synchronous `EventListener::wait()` primitive. Together these coordinate
concurrent restore attempts so that a task is never loaded from the
backing store by more than one thread at a time.
### Why?
When multiple threads raced to access an unrestored task, each thread
would independently call into the backing store to load the task's data.
This caused redundant I/O and could leave one thread's write silently
discarded when another thread applied a restore for the same task. The
fix ensures that only one thread performs the actual I/O while the
others wait for it to finish.
### How?
**New flag bits** (`storage_schema.rs`):
`meta_restoring` and `data_restoring` are transient (not persisted) bits
that track whether a restore is currently in progress for each category
of a task. `meta_restored` and `data_restored` are also transient bits
that indicate when a category has been fully restored.
**New `restored` event** (`storage.rs`):
A single `Event` on `Storage` that is notified after every restore
attempt (success or failure). Waiting threads subscribe to it, then
re-check the flags under lock after waking.
**`EventListener::wait()`** (`event.rs`):
A synchronous blocking wrapper around `event_listener`'s `.wait()`,
needed because restore logic runs in synchronous backend operation
contexts (not async). Implemented for both the `hanging_detection` and
non-`hanging_detection` builds.
**Restore protocol** (`operation/mod.rs`):
All three restore entry points (`task()`, `task_pair()`,
`prepare_tasks_with_callback()`) follow the same pattern:
1. **Classify under lock** — for each category that needs restoring: if
the `*_restoring` bit is already set, mark the task as "wait";
otherwise, set the bit and claim the I/O.
2. **Drop lock and do I/O** — only the thread that claimed a category
performs the backing-store lookup.
3. **Wait for other threads** — done *after* our own I/O so both
threads' work can overlap. Uses a fast path (check flags under lock
before registering a listener) to skip listener allocation when the
other thread has already finished. Returns the `StorageWriteGuard`
directly to the caller to avoid a second lock acquisition.
4. **Apply results under lock** — merge the loaded data into the task
via `apply_restore_result()` (returns `Result<()>`), set `*_restored`,
clear `*_restoring`.
5. **Notify once after all tasks** — a single `notify(usize::MAX)` on
the `restored` event after all I/O results have been applied, so every
waiter is woken at once and sees every result.
**Batch restores** (`prepare_tasks_with_callback`):
Uses a `TaskRestoreEntry` struct to track per-task state through phases:
- **Phase 1a** — classify each task under lock: already restored →
callback immediately; another thread restoring → mark as wait; unclaimed
→ set `*_restoring` bit and claim.
- **Phase 1b** — batch I/O for all claimed tasks (single-task or batch
backing-store API).
- **Phase 1c** — apply I/O results under lock, clear restoring bits.
Errors are collected (rather than panicking immediately) so that all
restoring bits are cleared first; otherwise other threads waiting on
those bits would hang.
- **Phase 2** — callbacks for self-restored tasks (separated from 1c so
the notify happens first and other threads are unblocked as early as
possible). Also inserts task types into the task cache.
- **Phase 3** — per-task loop: for each task being restored by another
thread, wait (using `wait_for_restore_or_panic`), then immediately call
the callback with the returned write guard.
A fast path at the top of `prepare_tasks_with_callback` short-circuits
the entire pipeline when `!should_restore()`, marking tasks as restored
and calling callbacks directly.
**Shared helpers:**
- `apply_restore_result()` — returns `Result<()>`; clears the restoring
bit and merges storage on success, or returns `Err` for the caller to
handle after notifying waiters.
- `wait_for_restoring_task()` — fast path (check under write lock,
return guard if already restored) + slow path (listener loop). Accepts
`TaskDataCategory` to handle waiting for both categories in a single
call with `All`.
- `wait_for_restore_or_panic()` — wrapper that panics on error,
returning the `StorageWriteGuard` on success.
**Other changes:**
- `TurboPersistence` (`turbo-persistence/src/db.rs`): removed the
`is_empty: AtomicBool` field; emptiness is now checked via a read lock
on inner state, avoiding a stale flag that could prevent restores after
writes.
<!-- NEXT_JS_LLM_PR -->
---------
Co-authored-by: Tobias Koppers <sokra@users.noreply.github.com>
Co-authored-by: Claude <noreply@anthropic.com>