Operational workflows for running INDW in development, CI, staging, and production environments.
Development workflow
Iterate on quality profiles and custom gates locally:
# 1. Verify install
indw doctor
# 2. Run merge on sample corpus
indw merge ./examples/raw ./examples/out/filtered.jsonl \
  --work-dir ./examples/work --workers 2 --fresh
# 3. Run unit tests
indw test --profile unit
# 4. Inspect reject log
head -n 20 ./examples/work/audit/reject_log.jsonl
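For more than a quick look at the first 20 lines, the reject log can be summarized with a few lines of Python. This is a minimal sketch, assuming each line of the log is a JSON object and that rejections carry a `reason` field. That field name is a guess, not something this page documents, so adjust `key` to match the actual log schema.

```python
import json
from collections import Counter
from pathlib import Path


def summarize_rejects(log_path, key="reason", limit=None):
    """Count values of `key` across a JSONL reject log.

    `key` is a hypothetical field name; records that lack it (or that
    are not JSON objects) are counted under "<missing>".
    `limit` caps the number of lines read from the start of the file.
    """
    counts = Counter()
    with Path(log_path).open(encoding="utf-8") as fh:
        for i, line in enumerate(fh):
            if limit is not None and i >= limit:
                break
            line = line.strip()
            if not line:
                continue  # skip blank lines
            record = json.loads(line)
            value = record.get(key, "<missing>") if isinstance(record, dict) else "<missing>"
            counts[str(value)] += 1
    return counts


if __name__ == "__main__":
    log = Path("./examples/work/audit/reject_log.jsonl")
    if log.exists():
        # Print the ten most frequent values, most common first.
        for value, n in summarize_rejects(log).most_common(10):
            print(f"{n:8d}  {value}")
```

Grouping by a single field is usually enough to see whether one gate is rejecting far more than expected after a profile change.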
CI validation workflow
Gate pipeline changes on parity and critical tests:
#!/bin/bash
set -euo pipefail
indw doctor
indw test --profile critical
indw test --profile parity
indw validate
Run this script in your CI pipeline on every change to indw.filter, indw.schedule, or indw.clean.
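Where the CI system runs Python steps rather than shell, the same fail-fast gate can be expressed with `subprocess`. This is a minimal sketch: the command list is copied from the shell script above, while `run_gate` and its `runner` parameter are illustrative names and not part of INDW.

```python
import subprocess

# Same commands, in the same order, as the shell gate.
GATE_STEPS = [
    ["indw", "doctor"],
    ["indw", "test", "--profile", "critical"],
    ["indw", "test", "--profile", "parity"],
    ["indw", "validate"],
]


def run_gate(steps=GATE_STEPS, runner=subprocess.run):
    """Run each step in order; return the first non-zero exit code, else 0.

    `runner` is injectable so the gate logic can be tested without
    the indw CLI being installed.
    """
    for cmd in steps:
        print("+", " ".join(cmd), flush=True)
        result = runner(cmd)
        if result.returncode != 0:
            return result.returncode  # stop at the first failure, like `set -e`
    return 0


# Typical CI entry point:
#     raise SystemExit(run_gate())
```

Returning the failing step's exit code, rather than a generic 1, keeps the CI log pointing at the command that actually failed.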
Custom config workflow
Load a YAML quality profile for reproducible runs:
python examples/merge_custom_config.py
Or programmatically:
from pathlib import Path

import yaml
from indw.filter.spec.quality import QualityPipelineConfig
from indw.schedule import merge_with_quality

# Load the YAML profile and build the quality config from it.
cfg = QualityPipelineConfig.from_dict(
    yaml.safe_load(Path("configs/filtering/quality_fast_first.yaml").read_text())
)
merge_with_quality(
    "./raw", "./out/filtered.jsonl",
    quality_config=cfg, work_dir="./work", fresh=True,
)
Resume workflow
Interrupted merges resume from checkpoint:
# First run (interrupted)
indw merge ./raw ./out/filtered.jsonl --work-dir ./work --workers 4
# Resume (omit --fresh)
indw merge ./raw ./out/filtered.jsonl --work-dir ./work --workers 4
Checkpoint state tracks per-source line offsets. Changing --workers on resume is safe: the output hash remains stable.
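One way to confirm that claim on your own data is to hash the output of an uninterrupted run and the output of a resumed run, then compare the two. The sketch below uses SHA-256 from the standard library. Which hash INDW itself reports is not stated here, so treat this as an independent byte-level check and not as a reproduction of INDW's own hash.

```python
import hashlib
from pathlib import Path


def file_sha256(path, chunk_size=1 << 20):
    """Return the hex SHA-256 of a file, read in chunks to bound memory."""
    digest = hashlib.sha256()
    with Path(path).open("rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def same_output(path_a, path_b):
    """True when the two files have identical SHA-256 digests."""
    return file_sha256(path_a) == file_sha256(path_b)
```

For example, `same_output("./out/filtered.jsonl", "./out-resumed/filtered.jsonl")` compares a reference run with a resumed one. The second path is hypothetical and stands for wherever the resumed run's output was copied.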
Production certification workflow
Certify a pipeline before scaling to full corpus:
# 1. Run on acceptance sample
indw merge ./raw-sample ./out/sample.jsonl --work-dir ./work-sample --fresh --workers 4
# 2. Validate determinism
indw validate
# 3. Audit pipeline health
indw audit --kind pipeline --work-dir ./work-sample
# 4. Stage0 throughput check
indw audit --kind stage0 --workers 4
# 5. Production benchmark
indw benchmark
# 6. Scale to full corpus
indw merge ./raw ./out/filtered.jsonl --work-dir ./work --workers 16 --backend multiprocess
Distributed workflow
Scale to a Dask cluster for large corpora:
# Start Dask scheduler (on cluster head node)
dask scheduler --host 0.0.0.0 --port 8786
# Start workers
dask worker tcp://scheduler:8786 --nworkers 4 --memory-limit 8GB
# Run merge from client
export INSTANT_PIPELINE_BACKEND=dask
export INSTANT_DASK_SCHEDULER=tcp://scheduler:8786
indw merge ./raw ./out/filtered.jsonl --workers 16 --work-dir ./work
# Verify Dask integration
indw audit --kind dask
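Before launching a long merge, it can help to confirm that the scheduler address in INSTANT_DASK_SCHEDULER is reachable from the client machine. The following is a minimal preflight sketch using only the standard library. It checks that a TCP connection can be opened, not that a healthy Dask scheduler is answering, and the helper names are illustrative and not part of INDW or Dask.

```python
import os
import socket
from urllib.parse import urlsplit


def parse_scheduler(address):
    """Split an address such as tcp://scheduler:8786 into (host, port)."""
    parts = urlsplit(address)
    if parts.hostname is None or parts.port is None:
        raise ValueError(f"expected tcp://host:port, got {address!r}")
    return parts.hostname, parts.port


def scheduler_reachable(address, timeout=3.0):
    """Return True if a TCP connection to the scheduler can be opened."""
    host, port = parse_scheduler(address)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    addr = os.environ.get("INSTANT_DASK_SCHEDULER")
    if addr:
        state = "reachable" if scheduler_reachable(addr) else "unreachable"
        print(addr, state)
```

A failed check here usually points to a network or hostname problem, which is quicker to diagnose before the merge starts than from a stalled run.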
Ingest + merge workflow
End-to-end from Hugging Face download to filtered output:
from indw.ingest.run import FastDatasetPipeline

pipeline = FastDatasetPipeline(
    work_dir="./work",
    quality_config_path="configs/filtering/quality_fast_first.yaml",
)
# sources_yaml is not defined in this snippet; set it before calling run().
pipeline.run(sources_yaml, merge_workers=4, fresh_merge=True)
Triage workflow
Diagnose failed or stuck merges:
from pathlib import Path
from indw.schedule.state.checkpoint import triage_merge

report = triage_merge(Path("./work"), raw_dir=Path("./raw"))
print(report)
Triage reports checkpoint state, lock status, output sync, and recommended recovery actions.
© 2026 Instant Developers. All rights reserved.