feat(assets): register output files as assets after prompt execution#12812
Conversation
Force-pushed from b9599b9 to 42eda2b
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the CodeRabbit settings.
📝 Walkthrough: Adds a pending enrichment queue to the asset seeder.
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@app/assets/services/ingest.py`:
- Around line 153-184: ingest_existing_file currently ignores the user_metadata
parameter (it sets spec["metadata"]=None) which drops prompt_id; update
ingest_existing_file so spec["metadata"] preserves the passed user_metadata
(convert it to the expected ExtractedMetadata form if needed) before calling
batch_insert_seed_assets: i.e., set spec["metadata"] = user_metadata if it's
already the right type, otherwise call the conversion method (e.g.,
user_metadata.to_user_metadata() or equivalent) so batch_insert_seed_assets can
extract prompt_id correctly. Ensure you reference the ingest_existing_file
function, the spec["metadata"] field, and
batch_insert_seed_assets/.to_user_metadata() handling when making the change.
In `@main.py`:
- Around line 234-249: The function _collect_output_absolute_paths should guard
against folder_paths.get_directory_by_type("output") returning None: capture
base_dir = folder_paths.get_directory_by_type("output") and if base_dir is None
either raise a clear exception (e.g., ValueError) or return an empty list to
avoid calling os.path.join(None, ...); update the logic in
_collect_output_absolute_paths to check base_dir before the loop and use
base_dir with os.path.join only when non-None so filenames are not turned into
"None/..." paths.
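The metadata-preserving change suggested for `ingest_existing_file` above can be sketched as follows. This is a toy model, not the actual implementation: `ExtractedMetadata` and `to_user_metadata()` are names taken from the review comment, and `build_spec` is a hypothetical helper standing in for the spec-building step.

```python
from dataclasses import dataclass

@dataclass
class ExtractedMetadata:
    """Stand-in for the real metadata type mentioned in the comment."""
    data: dict

    def to_user_metadata(self) -> dict:
        return dict(self.data)

def build_spec(path: str, user_metadata) -> dict:
    """Preserve caller-supplied metadata instead of hardcoding None,
    so batch_insert_seed_assets can extract prompt_id from it."""
    spec = {"path": path}
    if user_metadata is None:
        spec["metadata"] = None
    elif isinstance(user_metadata, dict):
        spec["metadata"] = user_metadata  # already the expected form
    else:
        spec["metadata"] = user_metadata.to_user_metadata()  # convert first
    return spec
```

The key point is the three-way branch: `None` stays `None`, a dict passes through, and anything else is converted rather than silently dropped.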
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 50c7b5df-d9de-4f07-8ab5-1c92f13bc666
📒 Files selected for processing (5)
- app/assets/seeder.py
- app/assets/services/__init__.py
- app/assets/services/ingest.py
- main.py
- tests/test_asset_seeder.py
Force-pushed from 42eda2b to f3b8454
♻️ Duplicate comments (1)
main.py (1)
244-259: ⚠️ Potential issue | 🟡 Minor: Guard against `None` from `get_directory_by_type`.
`folder_paths.get_directory_by_type("output")` returns `str | None`. If it returns `None`, `os.path.join(None, ...)` produces paths like `"None/subfolder/filename"`, which would silently fail during file registration.
🛡️ Defensive fix
```diff
 def _collect_output_absolute_paths(history_result: dict) -> list[str]:
     """Extract absolute file paths for output items from a history result."""
     paths = []
     base_dir = folder_paths.get_directory_by_type("output")
+    if base_dir is None:
+        return paths
     for node_output in history_result.get("outputs", {}).values():
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@main.py` around lines 244 - 259, The function _collect_output_absolute_paths uses folder_paths.get_directory_by_type("output") which may return None; guard against that by checking base_dir (the result of folder_paths.get_directory_by_type) before using os.path.join: if base_dir is None, return an empty list (or otherwise handle the missing output directory) so you don't build paths like "None/..." and silently register invalid file paths; update the logic in _collect_output_absolute_paths to perform this check and only call os.path.join when base_dir is a valid str.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@main.py`:
- Around line 244-259: The function _collect_output_absolute_paths uses
folder_paths.get_directory_by_type("output") which may return None; guard
against that by checking base_dir (the result of
folder_paths.get_directory_by_type) before using os.path.join: if base_dir is
None, return an empty list (or otherwise handle the missing output directory) so
you don't build paths like "None/..." and silently register invalid file paths;
update the logic in _collect_output_absolute_paths to perform this check and
only call os.path.join when base_dir is a valid str.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 29a281d2-1801-414d-81a8-268bb70dac99
📒 Files selected for processing (7)
- app/assets/scanner.py
- app/assets/seeder.py
- app/assets/services/__init__.py
- app/assets/services/bulk_ingest.py
- app/assets/services/ingest.py
- main.py
- tests/test_asset_seeder.py
Force-pushed from f3b8454 to b1ebcd0
♻️ Duplicate comments (1)
main.py (1)
244-259: ⚠️ Potential issue | 🟡 Minor: Guard against `base_dir` being `None`.
`folder_paths.get_directory_by_type("output")` returns `str | None`. If it returns `None`, `os.path.join(None, ...)` produces paths like `"None/subfolder/filename"`, which will silently fail downstream when `register_output_files` checks `os.path.isfile()`.
🛡️ Suggested fix
```diff
 def _collect_output_absolute_paths(history_result: dict) -> list[str]:
     """Extract absolute file paths for output items from a history result."""
     paths = []
     base_dir = folder_paths.get_directory_by_type("output")
+    if base_dir is None:
+        return paths
     for node_output in history_result.get("outputs", {}).values():
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@main.py` around lines 244 - 259, The _collect_output_absolute_paths function currently assumes folder_paths.get_directory_by_type("output") returns a string, which can be None; guard against base_dir being None by checking the result of folder_paths.get_directory_by_type("output") at the start of _collect_output_absolute_paths and either return an empty list (or raise a clear error) if base_dir is None, so you never call os.path.join with None; update references in the function that use base_dir and ensure downstream callers like register_output_files receive only valid filesystem paths.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@main.py`:
- Around line 244-259: The _collect_output_absolute_paths function currently
assumes folder_paths.get_directory_by_type("output") returns a string, which can
be None; guard against base_dir being None by checking the result of
folder_paths.get_directory_by_type("output") at the start of
_collect_output_absolute_paths and either return an empty list (or raise a clear
error) if base_dir is None, so you never call os.path.join with None; update
references in the function that use base_dir and ensure downstream callers like
register_output_files receive only valid filesystem paths.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: e3a7a79c-2bee-454f-bb23-c399556eb70a
📒 Files selected for processing (7)
- app/assets/scanner.py
- app/assets/seeder.py
- app/assets/services/__init__.py
- app/assets/services/bulk_ingest.py
- app/assets/services/ingest.py
- main.py
- tests/test_asset_seeder.py
🚧 Files skipped from review as they are similar to previous changes (3)
- app/assets/scanner.py
- app/assets/services/bulk_ingest.py
- app/assets/services/ingest.py
Force-pushed from b1ebcd0 to ca66115
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@app/assets/seeder.py`:
- Around line 637-645: The drain currently clears _pending_enrich and releases
_lock before calling start_enrich, allowing enqueue_enrich to race and drop
work; make the handoff atomic by invoking start_enrich while still holding
self._lock (or by looping: keep _pending_enrich set and call start_enrich under
the lock until it returns True, only then clear _pending_enrich and call
_reset_to_idle). Update the block that references _pending_enrich, self._lock,
start_enrich, enqueue_enrich and _reset_to_idle so the transfer of the pending
payload to start_enrich cannot be interleaved with enqueue_enrich.
In `@app/assets/services/bulk_ingest.py`:
- Around line 37-40: The winner path assumes extracted_metadata has a
to_meta_rows() method but SeedAssetSpec.metadata can be a plain dict; update the
winner loop and ingest_existing_file() handling (and the similar spots around
the other occurrences referenced) to detect dict-backed metadata (e.g., check
isinstance(extracted_metadata, dict) or hasattr(extracted_metadata,
"to_meta_rows")) and only call extracted_metadata.to_meta_rows() when present;
for dicts, convert the dict into the expected metadata rows format (or pass
through/create rows from key/value pairs) so no AttributeError is raised when
inserting references.
In `@app/assets/services/ingest.py`:
- Around line 143-149: register_output_files currently only routes to
batch_insert_seed_assets which skips existing paths, so overwritten output
filenames keep stale asset rows (mtime_ns, job_id, hash) and later enrich can't
repair them; update the logic so ingest_existing_file (and register_output_files
loop) detects when an output path already maps to an asset and performs an
update of that asset's metadata (mtime_ns, job_id, checksum/hash) instead of
relying solely on batch_insert_seed_assets for new inserts—introduce or call an
update_existing_asset(path, user_metadata, job_id, new_hash, new_mtime_ns) path
from ingest_existing_file, ensure register_output_files increments registered
only when an insert or an update occurs, and keep batch_insert_seed_assets only
for brand-new file paths.
In `@main.py`:
- Around line 343-346: The output registration block is incorrectly nested
inside the 10-second GC branch and is therefore subject to overwritten
e.history_result and prompt_id when multiple prompts finish; move this logic out
of the periodic GC path so it runs immediately after each prompt completes
(i.e., invoke the sequence where the prompt completion is handled), calling
_collect_output_absolute_paths(e.history_result) and then
register_output_files(paths, job_id=prompt_id) and, if the return > 0,
asset_seeder.enqueue_enrich(roots=("output",), compute_hashes=True); ensure you
check asset_seeder.is_disabled() first and use the local e.history_result and
prompt_id (not shared/outer variables) so each prompt’s outputs are registered
independently.
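The atomic handoff requested in the seeder comment above can be modeled with a small sketch. This is a toy state machine, not the real seeder: `_busy` stands in for the full scan lifecycle, and the names mirror those in the comment. `_lock` is an `RLock` because `_drain_pending` calls `start_enrich` while already holding it.

```python
import threading

class SeederSketch:
    """Minimal model of the lock-held pending-enrich drain."""

    def __init__(self):
        self._lock = threading.RLock()
        self._pending_enrich = None  # (roots, compute_hashes) or None
        self._busy = False

    def start_enrich(self, roots, compute_hashes) -> bool:
        with self._lock:
            if self._busy:
                return False
            self._busy = True  # a worker thread would be launched here
            return True

    def enqueue_enrich(self, roots, compute_hashes) -> None:
        with self._lock:
            if self.start_enrich(roots, compute_hashes):
                return
            if self._pending_enrich is None:
                self._pending_enrich = (set(roots), compute_hashes)
            else:  # merge: union roots, sticky-true compute_hashes
                prev_roots, prev_hashes = self._pending_enrich
                self._pending_enrich = (prev_roots | set(roots),
                                        prev_hashes or compute_hashes)

    def _drain_pending(self) -> None:
        # Start the pending scan while still holding the lock, so a
        # concurrent enqueue_enrich cannot slip in between the clear
        # and the start and have its payload silently dropped.
        with self._lock:
            self._busy = False
            if self._pending_enrich is not None:
                roots, compute_hashes = self._pending_enrich
                if self.start_enrich(roots, compute_hashes):
                    self._pending_enrich = None
```

Because both `enqueue_enrich` and `_drain_pending` do their decide-then-act under the same lock, there is no window in which pending work can be lost.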
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: cb4e2bb5-ba06-4709-933c-31114300eab3
📒 Files selected for processing (7)
- app/assets/scanner.py
- app/assets/seeder.py
- app/assets/services/__init__.py
- app/assets/services/bulk_ingest.py
- app/assets/services/ingest.py
- main.py
- tests/test_asset_seeder.py
🚧 Files skipped from review as they are similar to previous changes (2)
- app/assets/services/__init__.py
- app/assets/scanner.py
Force-pushed from 99480ca to a1233b1
Actionable comments posted: 4
♻️ Duplicate comments (1)
app/assets/services/ingest.py (1)
150-151: ⚠️ Potential issue | 🟡 Minor: Either persist `user_metadata` here or drop the parameter.
The payload is passed all the way into `ingest_existing_file()`, but the update branch never writes it and the insert branch hardcodes `"metadata": None`. Any caller-supplied metadata is silently discarded.
Also applies to: 159-227
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/assets/services/ingest.py` around lines 150 - 151, ingest_existing_file is receiving user_metadata but never persisting it: in the insert branch it hardcodes "metadata": None and the update branch doesn't write user_metadata; either remove the user_metadata parameter or persist it—update the insert branch to set "metadata": user_metadata (or merge with existing metadata) and modify the update branch to write/merge user_metadata into the stored record (or explicitly document/ignore it if you choose to drop the param); ensure changes are made inside the ingest_existing_file implementation and any callers remain consistent with the new behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@app/assets/scanner.py`:
- Line 430: The stale-result guard is wrongly comparing against the file's
current mtime (via get_mtime_ns(stat_p)) which causes the normal “file changed
after stub registration” path to be treated as stale and leave the DB row at
ENRICHMENT_STUB; instead capture and compare against the mtime that was recorded
when enrichment started (use the saved value that's stored with the row or the
variable initial_mtime_ns) or update/persist the new mtime_ns into the row
before re-queueing; change the comparison logic in the enrichment retry path
(places around initial_mtime_ns / get_mtime_ns(stat_p) and the ENRICHMENT_STUB
handling) to use the original-start mtime or persist the fresh mtime prior to
re-queue so the row can't remain stuck at ENRICHMENT_STUB.
In `@app/assets/seeder.py`:
- Around line 220-236: start_enrich() is being called before taking self._lock
which allows a race where the active scan finishes and the pending request is
enqueued after the worker stopped; make the enqueue-vs-start decision atomic by
taking self._lock before calling start_enrich (or implement a double-check: call
start_enrich, and if it returns False acquire self._lock and call start_enrich
again while holding the lock, enqueuing into self._pending_enrich only if the
second call still returns False). Update the logic in the method containing the
call to start_enrich, referencing self._lock, self._pending_enrich, and
start_enrich to ensure no gap between deciding to start a scan and setting
pending work.
In `@app/assets/services/ingest.py`:
- Around line 183-210: The branch that revives an existing reference (found via
get_reference_by_file_path) clears existing_ref.is_missing but never clears the
asset's missing tag; update the logic in the existing_ref handling to call
remove_missing_tag_for_asset_id(session, asset.id) whenever the reference is
revived and an asset is present (both in the siblings > 0 path after creating
new_asset — call remove_missing_tag_for_asset_id for new_asset.id — and in the
siblings == 0 path after reusing/mutating asset — call
remove_missing_tag_for_asset_id for asset.id), mirroring what
_ingest_file_from_path() does so the stale missing tag is removed immediately.
- Around line 213-225: The new path builds a spec dict and calls
batch_insert_seed_assets() without pre-creating derived tag rows, which causes
missing tag rows or insert failures; update this flow to call the same
tag-seeding logic used by insert_asset_specs() before invoking
batch_insert_seed_assets(): ensure derived tags for the provided tags list are
created (e.g., reuse the tag seeding function or invoke the same helper that
insert_asset_specs uses) so that the spec dict (built in this block) references
existing tag rows when batch_insert_seed_assets(session, [spec],
owner_id=owner_id) runs.
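The stale-result guard described for `@app/assets/scanner.py` above can be sketched as a pure check. This is an assumption-laden toy: `result_is_stale` and `ENRICHMENT_STUB` are names lifted from the comment, and the real code compares row state rather than a bare function argument. The essential fix is comparing against the mtime captured when enrichment started, not a freshly re-read one.

```python
import os

ENRICHMENT_STUB = 0  # hypothetical enrichment-level constant

def result_is_stale(path: str, initial_mtime_ns: int) -> bool:
    """Return True if the file changed (or vanished) since enrichment
    began. Comparing against initial_mtime_ns, the value captured at
    enrichment start, means a legitimate 'file changed after stub
    registration' is detected and re-queued instead of the row being
    left stuck at ENRICHMENT_STUB."""
    try:
        return os.stat(path).st_mtime_ns != initial_mtime_ns
    except FileNotFoundError:
        return True  # deleted mid-enrichment counts as stale
```

A caller that sees `True` would discard the computed hash and re-queue the row with a freshly persisted `mtime_ns`.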
---
Duplicate comments:
In `@app/assets/services/ingest.py`:
- Around line 150-151: ingest_existing_file is receiving user_metadata but never
persisting it: in the insert branch it hardcodes "metadata": None and the update
branch doesn't write user_metadata; either remove the user_metadata parameter or
persist it—update the insert branch to set "metadata": user_metadata (or merge
with existing metadata) and modify the update branch to write/merge
user_metadata into the stored record (or explicitly document/ignore it if you
choose to drop the param); ensure changes are made inside the
ingest_existing_file implementation and any callers remain consistent with the
new behavior.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 854ce9df-1479-48f2-a086-e7ee08896729
📒 Files selected for processing (11)
- app/assets/database/queries/__init__.py
- app/assets/database/queries/asset.py
- app/assets/database/queries/asset_reference.py
- app/assets/scanner.py
- app/assets/seeder.py
- app/assets/services/__init__.py
- app/assets/services/bulk_ingest.py
- app/assets/services/ingest.py
- main.py
- tests-unit/seeder_test/test_seeder.py
- tests/test_asset_seeder.py
Force-pushed from a1bcd7e to 939f9d7
Force-pushed from fd16fa2 to d350441
Add ingest_existing_file() to services/ingest.py as a public wrapper for registering on-disk files (stat, BLAKE3 hash, MIME detection, path-based tag derivation).

After each prompt execution in the main loop, iterate history_result['outputs'] and register files with type 'output' as assets. Runs while the asset seeder is paused, gated behind asset_seeder.is_disabled(). Populates job_id on the asset_references table for provenance tracking.

Ingest uses a two-phase approach: insert a stub record (hash=NULL) first for instant visibility, then defer hashing to the background seeder enrich phase to avoid blocking the prompt worker thread.

When multiple enrich scans are enqueued while the seeder is busy, roots are now unioned and compute_hashes uses sticky-true (OR) logic so no queued work is silently dropped.

Extract _reset_to_idle helper in the asset seeder to deduplicate the state reset pattern shared by _run_scan and mark_missing_outside_prefixes.

Separate history parsing from output file registration: move generic file registration logic into register_output_files() in app/assets/services/ingest.py, keeping only the ComfyUI history format parsing (_collect_output_absolute_paths) in main.py.

Amp-Thread-ID: https://ampcode.com/threads/T-019cf842-5199-71f0-941d-b420b5cf4d57
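The merge rule this commit describes (union roots, sticky-true compute_hashes) can be expressed as a pure function. A sketch only; the field names are assumptions based on the commit message:

```python
def merge_enrich_requests(pending: dict, incoming: dict) -> dict:
    """Merge a queued enrich request with a newly enqueued one:
    roots are unioned, and compute_hashes uses logical OR so a
    queued hash request is never downgraded by a later hashless one."""
    return {
        "roots": set(pending["roots"]) | set(incoming["roots"]),
        "compute_hashes": pending["compute_hashes"] or incoming["compute_hashes"],
    }
```

The OR makes `compute_hashes` "sticky": once any queued request asked for hashing, the merged request keeps asking for it.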
…ng it

Pass user_metadata through spec['metadata'] to batch_insert_seed_assets. Update batch_insert_seed_assets to accept raw dicts (UserMetadata) in addition to ExtractedMetadata, passing them through as-is without calling .to_user_metadata().

Amp-Thread-ID: https://ampcode.com/threads/T-019cfbe3-6440-724b-a17b-66ce09ecd1ed
Return early with an empty list when folder_paths.get_directory_by_type('output')
returns None to avoid os.path.join(None, ...) producing invalid paths.
Amp-Thread-ID: https://ampcode.com/threads/T-019cfbe5-dffc-760a-9a37-9b041dd71e73
Move register_output_files() out of the periodic GC branch so it runs right after each prompt completes, using the local e.history_result and prompt_id. This prevents stale/overwritten values when multiple prompts finish before GC triggers. Keep enqueue_enrich() in the GC path since it's heavier and benefits from batching via the 10-second interval. Amp-Thread-ID: https://ampcode.com/threads/T-019cfdf4-c52c-771c-a920-57bac15c68be
ingest_existing_file now detects when a reference already exists for the path and updates mtime_ns, job_id, size_bytes, resets enrichment_level and clears the asset hash so the enricher re-hashes the new content. Only brand-new paths fall through to batch_insert_seed_assets. register_output_files only increments its counter on actual insert or update. Amp-Thread-ID: https://ampcode.com/threads/T-019cfdf4-c52c-771c-a920-57bac15c68be
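The update-or-insert routing this commit describes can be modeled with a dict-backed toy store. This is not the real implementation (which works against asset and asset_references rows); `register_path` and `store` are hypothetical stand-ins illustrating the branch structure:

```python
ENRICHMENT_STUB = 0  # hypothetical enrichment-level constant

def register_path(store: dict, path: str, job_id: str,
                  mtime_ns: int, size_bytes: int) -> bool:
    """Update an existing reference in place (refreshing provenance and
    clearing the hash so the enricher re-hashes the new content); only
    brand-new paths take the insert branch. Returns True when a row was
    actually inserted or updated, so callers can count registrations."""
    row = store.get(path)
    if row is not None:
        # Existing path: the file was overwritten, so refresh its row.
        row.update(mtime_ns=mtime_ns, job_id=job_id, size_bytes=size_bytes,
                   enrichment_level=ENRICHMENT_STUB, hash=None)
        return True
    # Brand-new path: stub insert (hash deferred to the enrich phase).
    store[path] = {"mtime_ns": mtime_ns, "job_id": job_id,
                   "size_bytes": size_bytes,
                   "enrichment_level": ENRICHMENT_STUB, "hash": None}
    return True
```

Clearing `hash` on the update branch is what lets the background enricher notice the row needs re-hashing.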
…dict Have ingest_existing_file call extract_file_metadata() to build a proper ExtractedMetadata object, matching what the scanner does. This tightens SeedAssetSpec.metadata to ExtractedMetadata | None and removes dict-handling branches in bulk_ingest.py that would have raised AttributeError on to_meta_rows()/to_user_metadata(). Amp-Thread-ID: https://ampcode.com/threads/T-019cfdf9-2379-723a-82cf-306755e54396
Change _lock from Lock to RLock and move the start_enrich call inside the lock-held block so that enqueue_enrich cannot interleave between clearing _pending_enrich and starting the enrichment scan. This prevents a concurrent enqueue_enrich from stealing the IDLE slot and causing the drained payload to be silently dropped.

Add tests covering:
- pending enrich runs after scan completes
- enqueue during drain does not lose work
- concurrent enqueue during drain is queued for the next cycle

Amp-Thread-ID: https://ampcode.com/threads/T-019cfe02-5710-7506-ae80-34bf16c0171a
…h validation

- Detach ref to new stub asset on overwrite when siblings share the asset
- Add optimistic mtime_ns guard in enrich_asset to discard stale results
- Normalize and validate output paths stay under output root, deduplicate
- Skip metadata extraction for stub-only registration (align with fast scan)
- Add RLock comment explaining re-entrant drain requirement
- Log warning when pending enrich drain fails to start
- Add create_stub_asset and count_active_siblings query functions

Amp-Thread-ID: https://ampcode.com/threads/T-019cfe06-f0dc-776f-81ad-e9f3d71be597
…t mtime

- Make enqueue_enrich atomic by moving the start_enrich call inside self._lock, preventing pending work from being lost when a scan finishes between the start attempt and the queue write.
- Call ensure_tags_exist before batch_insert_seed_assets in ingest_existing_file to avoid FK violations on asset_reference_tags.
- Fix the test_enrich helper to use the real file mtime instead of a hardcoded value so the optimistic staleness guard in enrich_asset passes correctly.
- Add a db_engine_fk fixture (SQLite with PRAGMA foreign_keys=ON) and a regression test proving ingest_existing_file seeds Tag rows before inserting reference tags.
Force-pushed from d350441 to 4266105
Summary
Register output files as assets immediately after prompt execution, replacing the need for a filesystem sweep to discover new outputs.
Changes
- app/assets/services/ingest.py — Add ingest_existing_file(), a public wrapper that handles stat, MIME detection, path-based tag derivation, and stub record insertion. Add register_output_files() to batch-register a list of absolute paths as output assets with job_id for provenance.
- app/assets/services/bulk_ingest.py — Add a job_id field to the SeedAssetSpec and ReferenceRow TypedDicts, wired through batch_insert_seed_assets to populate the job_id column on asset_references.
- app/assets/services/__init__.py — Export ingest_existing_file and register_output_files.
- app/assets/seeder.py — Add start_enrich() to trigger background hash computation. Add a _reset_to_idle() helper to deduplicate state reset logic. Merge pending enrich requests (union roots, sticky-true compute_hashes) so no queued work is silently dropped.
- app/assets/scanner.py — Pass job_id: None in spec dicts for scanned files (no associated prompt).
- main.py — Add _collect_output_absolute_paths() to parse history_result['outputs'] and collect file paths. Call register_output_files() with job_id=prompt_id after each prompt execution, then trigger enqueue_enrich() for background hashing.
- tests/test_asset_seeder.py — Test coverage for enqueue/merge/drain behavior and thread safety of the enrich flow.

How it works
After each prompt execution, the executor's history_result['outputs'] contains a dict of node_id → ui_data with every file each output node wrote. We parse that structure and register each file as an asset — no filesystem sweep needed.
- Only files with type == "output" are registered (skips temp/preview files)
- The job_id column on asset_references is populated with prompt_id for provenance tracking
- Gated behind asset_seeder.is_disabled()