1525f7f8 - feat(coglet): IPC bridge enhancements for file outputs, large payloads, and upload URL (#2746)

Commit
32 days ago
feat(coglet): IPC bridge enhancements for file outputs, large payloads, and upload URL (#2746)

* fix(coglet): ensure logs are under 4MiB across bridge

  This commit truncates log lines at 4MiB of data and appends a truncation notice to the log line. This protects against a panic and slot poisoning that can happen if we exceed the codec's configured (8MiB) frame size. Any log line that exceeds 1MiB borders on useless; to ensure that even extremely long log lines are kept without disruption, we settled on a 4MiB limit.

* feat(coglet): wire file-based output spilling for large IPC frames

  Outputs > 6MiB are spilled to disk and sent as FileOutput path references over IPC instead of inline, avoiding the 8MiB LengthDelimitedCodec frame limit. Coglet creates per-prediction output dirs at /tmp/coglet/outputs/{prediction_id}/ and passes the path to the worker via SlotRequest::Predict.

* feat(coglet): wire orchestrator FileOutput handling from disk

  When the worker spills output to disk (FileOutput), the orchestrator reads it back and integrates it into the prediction via append_output. Handles both Oversized (JSON spill) and FileType (path reference) variants.

* feat(coglet): stream generator yields over IPC as they happen

  Generator outputs were collected into a Vec and bundled into a single Done frame, which could exceed the 8MiB IPC limit. Now each yield is sent immediately via slot_sender.send_output(), streaming through SlotResponse::Output frames. The Done frame carries an empty array for generators since the outputs were already streamed. Plumbs slot_sender from PythonPredictHandler::predict through to process_generator_output and process_async_result for both the predict and train paths.

* feat(coglet): route file outputs to disk via SlotSender

  File-type outputs (os.PathLike, io.IOBase) are now detected in the worker and written to the per-prediction output dir instead of being base64-encoded in-process.
  This keeps network I/O out of the worker subprocess; the parent process handles uploads.

  - Add SlotSender::write_file_output(bytes, ext) for language-agnostic file writing from any FFI worker (Python, Node, etc.)
  - Add a send_output_item() helper that routes by type: PathLike sends the existing path, IOBase reads the bytes and writes them via SlotSender, everything else goes through the existing JSON serialization path
  - Update process_single_output to also detect and route file types

* feat(coglet): base64 data URI fallback for file outputs in orchestrator

  The orchestrator's FileOutput handler now distinguishes between FileOutputKind::Oversized (a JSON spill, deserialized as before) and FileOutputKind::FileType (a binary file, base64-encoded as a data URI). This is the fallback path when no upload_url is configured. File outputs from the worker are read from disk and encoded as data:{mime};base64,{data} URIs, using mime_guess for content type detection.

* feat(coglet): add optional mime_type to FileOutput protocol

  Add mime_type: Option<String> to SlotResponse::FileOutput so FFI workers can pass an explicit MIME type. When None, the orchestrator falls back to mime_guess from the file extension (matching old cog behavior). The plumbing is in place for future use; all current callers pass None.

* feat(coglet): wire upload_url from CLI to orchestrator for file uploads

  Thread the --upload-url CLI arg through coglet.server.serve() to the orchestrator event loop. When set, file outputs are PUT to the signed endpoint per yield as received (matching old Python cog behavior).
  - upload_file() does a PUT with Content-Type, extracts the Location header, strips query params, and follows redirects via reqwest
  - Uploads are spawned as tokio tasks so they don't block the event loop
  - The Done handler awaits pending uploads before finalizing the prediction
  - The Failed/Cancelled/Error handlers abort pending uploads immediately
  - Falls back to base64 data URI encoding when no upload_url is set

* fix(coglet): fix stub generation and venv setup

  - The root .venv is now created by the _setup_venv task (used by build:coglet, build:coglet:wheel, build:sdk, generate:stubs, stub:check)
  - _.python.venv uses REPO_ROOT so all tasks share the same venv
  - mise clean:python removes .venv, coglet-python/.venv, and *.so
  - stub_gen.rs: write coglet/_impl.pyi (not __init__.pyi) so the native module stubs don't get overwritten by mypy stubgen
  - module_variable! declarations for __version__, __build__, and server so ty can resolve members imported from coglet._impl
  - stub:check depends on generate:stubs to avoid duplicating logic
  - #[allow(clippy::too_many_arguments)] on serve_impl (8 args after upload_url)

* chore(coglet): regenerate Python type stubs

* fix(coglet): decouple output from Done message to prevent IPC frame overflow

  Output is now always sent as a separate message before Done, with automatic spill-to-disk for values exceeding the 6MiB IPC frame limit. The orchestrator reconstructs the final output from accumulated per-yield values (generators, file uploads) rather than from the Done message payload.
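A minimal sketch of that spill decision (the 6MiB constant comes from this commit; the dict message shapes and function names are illustrative, not the actual coglet protocol types):

```python
import json
import os
import tempfile

SPILL_LIMIT = 6 * 1024 * 1024  # keep frames well under the 8MiB codec limit

def encode_output(value, output_dir):
    # Serialize one output value; if it fits, send it inline, otherwise
    # spill the JSON to the per-prediction dir and send a path reference.
    payload = json.dumps(value).encode("utf-8")
    if len(payload) <= SPILL_LIMIT:
        return {"type": "Output", "json": payload.decode("utf-8")}
    fd, path = tempfile.mkstemp(dir=output_dir, suffix=".json")
    with os.fdopen(fd, "wb") as f:
        f.write(payload)
    return {"type": "FileOutput", "kind": "Oversized", "path": path}

def decode_output(msg):
    # Orchestrator side: read an Oversized spill back from disk so it can
    # be integrated exactly as if it had arrived inline.
    if msg["type"] == "Output":
        return json.loads(msg["json"])
    with open(msg["path"], "rb") as f:
        return json.loads(f.read())
```

Because Done no longer carries the payload, even a 9MiB string crosses the bridge as a small path-reference frame instead of an oversized inline one.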
  This fixes three issues:

  - Large single outputs (>8MiB) caused "frame size too big" panics that poisoned slots, since the entire output was embedded in the Done message
  - Generator/iterator outputs were lost because the per-yield values accumulated in outputs() were ignored by the Done handler in favor of the empty Stream(vec![]) from the worker
  - File upload outputs (Path, IOBase) yielded from generators were silently dropped for the same reason

  Also adds an --upload-url flag to `cog serve`, a mock upload server in the integration test harness, and three new integration tests:

  - coglet_large_output: 9MiB string output (would poison the slot without spill)
  - coglet_iterator_upload_url: generator Path yields uploaded to --upload-url
  - coglet_iterator_path_output: generator Path yields returned as base64 data URIs

* chore: go fmt

* fix(test): correct coglet_iterator_path_output assertion and go formatting

  cog predict writes file outputs to disk, not as base64 JSON to stdout. Fix the assertion to check stderr for "Written output to" messages.

* fix(coglet): complete prediction synchronously when no uploads pending

  The Done handler unconditionally used tokio::spawn to wait for pending uploads before calling set_succeeded(). This introduced a race with service.rs: Notify::notified() could miss the wakeup if the spawned task called notify_waiters() before the service registered its waiter, causing the prediction to hang indefinitely.

  Split the Done handler into two paths:

  - No uploads: call set_succeeded() synchronously in the event loop
  - Has uploads: spawn a task to await them (preserving existing behavior)

  This restores the synchronous notification path for the common case and fixes the CI hang in the coglet_large_output integration test.

* chore: update llm docs

* fix(coglet): use notify_one instead of notify_waiters to eliminate race

  notify_waiters() only wakes currently-registered waiters.
  In the predict flow, service.rs checks is_terminal() and then awaits on a separate Notify; if the orchestrator fires notify_waiters() between those two steps, the notification is lost and the prediction hangs. notify_one() stores a permit that a future .notified().await consumes immediately, closing the race window entirely. There is exactly one waiter per prediction, so notify_one is semantically correct.

* fix(build): add source caching, fix output globs, and propagate COG_CA_CERT

  - build:cog: add sources/outputs so mise skips the rebuild when Go sources are unchanged
  - build:coglet:wheel:linux-x64: fix the output glob to match maturin's actual manylinux filename (it was never matching, so the wheel was always rebuilt)
  - build:coglet:wheel:linux-arm64: same glob fix
  - test:integration: depend on the linux-x64 wheel instead of the native macOS wheel (only the linux wheel is needed for the Docker tests)
  - test:integration: pass extra args through to go test (e.g. -count=4)
  - test:integration: propagate COG_CA_CERT for custom CA certs (WARP)
  - Helper tasks (_setup_dist, _setup_venv, _clean_dist): use silent instead of quiet to suppress timing output
  - AGENTS.md: correct outdated references to the embedded Python wheel (wheels are resolved from dist/ at Docker build time)

* fix(serve): unify cog serve build path with cog build via ExcludeSource

  All CLI dev-mode commands (serve, predict, run, train) now use resolver.Build() with ExcludeSource=true instead of the separate BuildBase() path. This ensures the coglet and SDK wheels are properly installed in dev-mode images, sharing the Docker layer cache with cog build (all layers before COPY . /src are identical).

  Key changes:

  - Add an ExcludeSource field to BuildOptions; when true, the Dockerfile is generated via GenerateModelBase (no COPY . /src)
  - Update the serve, predict, run, and train CLI commands to use Build() with serveBuildOptions() instead of BuildBase()
  - Fix GenerateOpenAPISchema to accept an optional sourceDir for volume-mounting /src during schema validation
  - Centralize harness env var propagation (propagatedEnvVars), used by both Setup() and cmdCogServe
  - Add the coglet_large_output integration test: an async webhook-based test for a 9MiB output that exceeds the IPC frame limit

* refactor: remove BuildBase; all dev-mode commands use Build(ExcludeSource)

  Remove the entire BuildBase code path, which is now dead:

  - Factory.BuildBase interface method
  - DockerfileFactory.BuildBase implementation
  - Resolver.BuildBase method
  - image.BuildBase function
  - BuildBaseOptions struct and WithDefaults
  - buildBaseOptionsFromFlags CLI helper
  - config.BaseDockerImageName helper
  - mockFactory.buildBaseFunc in tests

  All dev-mode CLI commands (serve, predict, run, train) now use Build(ExcludeSource=true), which shares the Docker layer cache with cog build and properly installs all wheels.

* chore: remove unused containerInfo type (lint fix)

* fix(run): skip schema validation for cog run

  cog run executes arbitrary commands (echo, cat, etc.) and may not have a predictor or trainer in cog.yaml. Add SkipSchemaValidation to BuildOptions so schema generation, validation, and bundling are all skipped for cog run, which previously worked because BuildBase never validated schemas.
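The lost-wakeup race that the notify_one change closes can be modeled with a small asyncio analogue (this Notify class is an illustrative stand-in for tokio::sync::Notify, not a real API):

```python
import asyncio

class Notify:
    """Illustrative asyncio model of tokio::sync::Notify semantics."""

    def __init__(self):
        self._waiters = []
        self._permit = False

    def notify_waiters(self):
        # Wakes only waiters registered *right now*; nothing is stored,
        # so a notification fired before notified() is simply lost.
        for w in self._waiters:
            if not w.done():
                w.set_result(None)
        self._waiters.clear()

    def notify_one(self):
        # Wakes one waiter, or stores a permit that the next notified()
        # call consumes immediately, closing the check-then-wait window.
        for w in self._waiters:
            if not w.done():
                w.set_result(None)
                return
        self._permit = True

    async def notified(self):
        if self._permit:
            self._permit = False
            return
        fut = asyncio.get_running_loop().create_future()
        self._waiters.append(fut)
        await fut

async def demo():
    racy = Notify()
    racy.notify_waiters()  # fires between the is_terminal() check and the wait
    try:
        await asyncio.wait_for(racy.notified(), timeout=0.05)
        lost = False
    except asyncio.TimeoutError:
        lost = True  # the wakeup was lost; the prediction would hang

    fixed = Notify()
    fixed.notify_one()  # permit stored even though nobody is waiting yet
    await asyncio.wait_for(fixed.notified(), timeout=0.05)  # returns at once
    return lost

print(asyncio.run(demo()))  # prints True: the notify_waiters wakeup was lost
```

With notify_waiters-style semantics the waiter blocks forever; with the stored permit the same sequence completes immediately, which is why having exactly one waiter per prediction makes notify_one the correct choice.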