Scheduler

INDW's merge scheduler: admission tiers, ordered apply, checkpoints, and mixture orchestration.

The scheduler (indw.schedule) coordinates the full merge lifecycle: source interleaving, stage pool dispatch, admission tier assignment, dedup stack management, and ordered apply to output. Because survivors are written in strict document sequence, the output is deterministic regardless of the order in which parallel workers finish.

Merge lifecycle

Bootstrap

bootstrap_merge_run initializes the merge context:

  • Loads QualityPipelineConfig and resolves PipelineConfigContext
  • Creates CorpusCleaningPipeline, QualityGate, and dedup stack
  • Opens or restores checkpoint state from work directory
  • Acquires merge run lock to prevent concurrent writes
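The lock step can be illustrated with an exclusive lock file. This is a minimal sketch and not INDW's actual implementation: the file name merge.lock, the MergeRunLockError class, and both function names are hypothetical, and the force argument only mimics the --force behavior described under Checkpoints.

```python
import os
from pathlib import Path


class MergeRunLockError(RuntimeError):
    """Raised when another run already holds the lock (hypothetical class)."""


def acquire_run_lock(work_dir: str, force: bool = False) -> Path:
    """Create <work_dir>/merge.lock exclusively and record our PID in it."""
    lock_path = Path(work_dir) / "merge.lock"  # hypothetical file name
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    if force:
        # Take over a stale lock by removing it first.
        lock_path.unlink(missing_ok=True)
    try:
        # O_EXCL makes creation fail if the file already exists.
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError as exc:
        raise MergeRunLockError(f"lock already held: {lock_path}") from exc
    with os.fdopen(fd, "w") as handle:
        handle.write(str(os.getpid()))
    return lock_path


def release_run_lock(lock_path: Path) -> None:
    """Remove the lock file; a missing file is not an error."""
    lock_path.unlink(missing_ok=True)
```

Creating the file with O_EXCL means that, on a local filesystem, only one of two racing processes can succeed, which is the property a run lock needs.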

Source interleaving

_InterleavedSources reads multiple raw/<source>/data.jsonl files in weighted round-robin order. Each source maintains an independent line offset for checkpoint resume.

python
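from indw.schedule.core import _InterleavedSources

# source_paths, checkpoint, and mix_weights are assumed to be prepared by the caller.
sources = _InterleavedSources.open(source_paths, checkpoint, mix_weights)
# Each item carries the source name, its file path, the line number, and the raw line.
for name, path, line_no, line in sources:
    process_document(name, line_no, line)  # caller-defined handler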
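To make the interleaving order concrete, here is a small standalone sketch of weighted round-robin over in-memory line lists. It does not reproduce _InterleavedSources: the function name, the integer weights, and the 1-based line numbers are assumptions chosen for illustration.

```python
from typing import Dict, Iterable, Iterator, Tuple


def interleave_weighted(
    sources: Dict[str, Iterable[str]],
    weights: Dict[str, int],
    offsets: Dict[str, int],
) -> Iterator[Tuple[str, int, str]]:
    """Yield (source, line_no, line) in weighted round-robin order.

    Each cycle takes up to weights[name] lines from every active source.
    offsets[name] lines are skipped first, as on a checkpoint resume.
    """
    iterators = {}
    line_nos = {}
    for name, lines in sources.items():
        it = iter(lines)
        skip = offsets.get(name, 0)
        for _ in range(skip):
            next(it, None)  # discard lines that were already processed
        iterators[name] = it
        line_nos[name] = skip
    active = list(sources)
    while active:
        for name in list(active):
            for _ in range(weights.get(name, 1)):
                line = next(iterators[name], None)
                if line is None:
                    active.remove(name)  # source exhausted
                    break
                line_nos[name] += 1
                yield name, line_nos[name], line
```

Because each source keeps its own counter, a resume only needs one stored offset per source to continue where the previous run stopped.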

Apply coordinator

The apply coordinator writes survivors to output in strict document sequence. Out-of-order results from parallel workers are buffered and applied when their sequence number is reached. This guarantees deterministic ordering regardless of worker completion order.
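The buffering described above is essentially a reorder buffer keyed by sequence number. The sketch below is an illustration under assumptions, not the coordinator's real code: the OrderedApplier class, its submit method, and the convention that None marks a rejected document are all hypothetical.

```python
from typing import Callable, Dict, Optional


class OrderedApplier:
    """Buffer out-of-order results and emit them in sequence order."""

    def __init__(self, write: Callable[[str], None], start_seq: int = 0) -> None:
        self._write = write
        self._next_seq = start_seq
        self._pending: Dict[int, Optional[str]] = {}

    def submit(self, seq: int, doc: Optional[str]) -> None:
        """Record a worker result; doc=None marks a rejected document."""
        self._pending[seq] = doc
        # Drain every contiguous result starting at the next expected number.
        while self._next_seq in self._pending:
            ready = self._pending.pop(self._next_seq)
            if ready is not None:
                self._write(ready)
            self._next_seq += 1

    @property
    def buffered(self) -> int:
        """Number of results still waiting for an earlier sequence number."""
        return len(self._pending)
```

Rejected documents still consume their sequence number, so a gap left by a reject never stalls the documents queued behind it.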

Admission tiers

PCI admission assigns documents to tiers before heavy processing:

| Tier | Name | Behavior |
| --- | --- | --- |
| TIER0 | Fast pass | Proceeds to heavy stage pools immediately |
| TIER1 | Rejected | Terminal: written to reject log, not output |

evaluate_admission in Stage0 assigns tiers based on meaningful character count and structural signals. Documents rejected at Stage0 receive TIER1.
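As a rough illustration of tier assignment from a character count plus one structural signal, consider the sketch below. It is not the real evaluate_admission: the thresholds, the alphanumeric-ratio signal, and every name in the block are assumptions.

```python
from enum import Enum


class Tier(Enum):
    TIER0 = "fast_pass"
    TIER1 = "rejected"


def meaningful_chars(text: str) -> int:
    """Count letters and digits, ignoring whitespace and punctuation."""
    return sum(1 for ch in text if ch.isalnum())


def evaluate_admission_sketch(
    text: str,
    min_chars: int = 200,          # hypothetical threshold
    min_alnum_ratio: float = 0.5,  # hypothetical structural signal
) -> Tier:
    """Assign TIER1 to short or mostly-symbol documents, else TIER0."""
    count = meaningful_chars(text)
    if count < min_chars:
        return Tier.TIER1
    non_space = sum(1 for ch in text if not ch.isspace())
    # Structural signal: share of alphanumerics among non-whitespace characters.
    if count / non_space < min_alnum_ratio:
        return Tier.TIER1
    return Tier.TIER0
```

Both checks are single passes over the text, which is why a filter of this kind can run before any heavy stage.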

Checkpoints

MergeCheckpoint tracks per-source progress:

python
from indw.schedule.state.checkpoint import MergeCheckpoint

# merge_work is assumed to be the merge work directory, defined by the caller.
checkpoint = MergeCheckpoint(merge_work)
offset = checkpoint.line_offset("web-crawl")  # lines already processed
checkpoint.advance("web-crawl", lines_processed=500)
checkpoint.flush()  # write checkpoint state

Resume behavior:

  • Default: resume from last checkpoint (resume=True)
  • --fresh: clear checkpoints and start clean
  • Checkpoint interval configurable via checkpoint_interval

Concurrent merges:

Two merge processes writing to the same work directory can corrupt checkpoints. The merge run lock prevents this by default. Use --force only when you intend to take over a stale lock.
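The per-source offsets and resume behavior in this section can be sketched with a small JSON-backed class. This is not MergeCheckpoint itself: the file name checkpoint.json, the fresh argument, and the choice to treat lines_processed as an increment are assumptions. The flush writes to a temporary file and renames it so that an interrupted write cannot leave a half-written checkpoint.

```python
import json
import os
from pathlib import Path
from typing import Dict


class CheckpointSketch:
    """Track per-source line offsets in <work_dir>/checkpoint.json."""

    def __init__(self, work_dir: str, fresh: bool = False) -> None:
        self._path = Path(work_dir) / "checkpoint.json"  # hypothetical name
        self._offsets: Dict[str, int] = {}
        if fresh:
            # Start clean: discard any previous state.
            self._path.unlink(missing_ok=True)
        elif self._path.exists():
            # Resume: restore the offsets written by the previous run.
            self._offsets = json.loads(self._path.read_text(encoding="utf-8"))

    def line_offset(self, source: str) -> int:
        return self._offsets.get(source, 0)

    def advance(self, source: str, lines_processed: int) -> None:
        # Assumption: lines_processed is added to the stored offset.
        self._offsets[source] = self.line_offset(source) + lines_processed

    def flush(self) -> None:
        self._path.parent.mkdir(parents=True, exist_ok=True)
        tmp = self._path.with_suffix(".tmp")
        tmp.write_text(json.dumps(self._offsets, sort_keys=True), encoding="utf-8")
        os.replace(tmp, self._path)  # atomic rename on the same filesystem
```

The atomic rename protects against a crash in the middle of a flush. It does not protect against two writers, which is the case the merge run lock covers.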

Dedup stack

The scheduler builds and restores a three-layer dedup stack during bootstrap:

| Layer | Type | Scope |
| --- | --- | --- |
| Exact | SHA-256 content hash | Global across corpus |
| Fuzzy | MinHash LSH | Near-duplicate detection |
| Semantic | Embedding + FAISS | Paraphrase detection |

Dedup state persists in the work directory for resume. restore_merge_dedup_from_output rebuilds indexes from prior output when resuming.
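For the exact layer, rebuilding state from prior output amounts to re-hashing every document already written. The sketch below shows that idea only. It is not restore_merge_dedup_from_output, and the ExactDedup class, the function names, and the "text" field name are assumptions. The fuzzy and semantic layers are not covered.

```python
import hashlib
import json
from typing import Iterable, Set


def content_hash(text: str) -> str:
    """SHA-256 hex digest of the UTF-8 encoded document text."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


class ExactDedup:
    """Remember content hashes and flag repeats."""

    def __init__(self) -> None:
        self._seen: Set[str] = set()

    def is_duplicate(self, text: str) -> bool:
        """Return True if text was seen before; otherwise remember it."""
        digest = content_hash(text)
        if digest in self._seen:
            return True
        self._seen.add(digest)
        return False


def restore_exact_from_output(
    lines: Iterable[str], text_field: str = "text"
) -> ExactDedup:
    """Rebuild the hash set from JSONL lines of an earlier output file."""
    dedup = ExactDedup()
    for line in lines:
        if line.strip():
            dedup.is_duplicate(json.loads(line)[text_field])
    return dedup
```

After a restore, a document that already reached the output in an earlier run is recognized as a duplicate without being written twice.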

Mixture orchestration

MixtureOrchestrationConfig controls source weighting and domain balance:

yaml
balance:
  enabled: true
  domain_caps:
    web: 0.55
    code: 0.20
  soft_cap_overflow: 0.15
  quality_cap_bypass_score: 0.75

build_corpus_mixture_plan computes target mixture fractions. The apply coordinator enforces caps during ordered write, bypassing caps for documents above the quality threshold.
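One possible reading of these settings is sketched below. The exact semantics are assumptions: a domain's running share of the output may exceed its cap by at most soft_cap_overflow, documents scoring at or above quality_cap_bypass_score are always admitted, and a hypothetical warmup count delays enforcement until enough documents exist for shares to be meaningful. The class and method names are also hypothetical.

```python
from collections import Counter
from typing import Dict


class DomainCapSketch:
    """Admit or reject documents based on running domain shares."""

    def __init__(
        self,
        domain_caps: Dict[str, float],
        soft_cap_overflow: float,
        quality_cap_bypass_score: float,
        warmup: int = 100,  # hypothetical: no enforcement before this many docs
    ) -> None:
        self.caps = domain_caps
        self.overflow = soft_cap_overflow
        self.bypass = quality_cap_bypass_score
        self.warmup = warmup
        self.counts: Counter = Counter()
        self.total = 0

    def admit(self, domain: str, quality: float) -> bool:
        cap = self.caps.get(domain)
        enforce = (
            cap is not None
            and self.total >= self.warmup
            and quality < self.bypass  # high-quality documents bypass the cap
        )
        if enforce:
            share_if_admitted = (self.counts[domain] + 1) / (self.total + 1)
            if share_if_admitted > cap + self.overflow:
                return False
        self.counts[domain] += 1
        self.total += 1
        return True
```

Running the check inside the ordered write keeps the decision deterministic, because the counts then depend only on document sequence and not on worker timing.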

Finalize

run_merge_finalize completes the merge:

  • Flushes dedup indexes and checkpoint state
  • Computes sorted_output_hash for determinism verification
  • Emits stage profile and reject log summaries
  • Releases merge run lock
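How sorted_output_hash is actually computed is not specified here. A plausible sketch, assuming it is a SHA-256 digest over the output lines in sorted order, looks like this (the function name is hypothetical):

```python
import hashlib
from typing import Iterable


def sorted_output_hash_sketch(lines: Iterable[str]) -> str:
    """Hash output lines in sorted order, ignoring trailing newlines."""
    digest = hashlib.sha256()
    for line in sorted(raw.rstrip("\n") for raw in lines):
        digest.update(line.encode("utf-8"))
        digest.update(b"\n")  # separator so adjacent lines cannot merge
    return digest.hexdigest()
```

Because the lines are sorted first, a digest of this kind confirms that two runs kept the same set of documents. On its own it does not check write order.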

Python API

python
from indw.schedule import merge_with_quality
from indw.filter.spec.quality import QualityPipelineConfig
 
result = merge_with_quality(
    "./raw", "./out/filtered.jsonl",
    quality_config=QualityPipelineConfig(),
    work_dir="./work",
    workers=4,
    chunk_size=500,
    resume=True,
)
print(result["sorted_output_hash"])