Instant Developers > Reference > API Reference

API Reference

|||

Python API reference for INDW merge, configuration, ingest, and export functions.

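INDW exposes a Python API for programmatic pipeline execution, configuration, and export. The core public symbols are importable directly from the top-level indw package (see "Package exports" at the end of this page); each section below also shows the submodule that defines the symbol.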

Core merge

merge_with_quality

python
from indw.schedule import merge_with_quality
 
result: dict = merge_with_quality(
    raw_dir,           # Path | str — directory with */data.jsonl
    out_path,          # Path | str — output JSONL path
    *,
    quality_config=None,       # QualityPipelineConfig
    corpus_registry=None,      # CorpusRegistry
    work_dir=None,             # Path — checkpoints and artifacts
    fresh=False,               # clear checkpoints
    resume=True,               # resume from checkpoint
    append=False,              # append to existing output
    workers=None,              # parallel worker count
    chunk_size=None,           # batch size
    source_filter=None,        # list[str] — subset of sources
    checkpoint_interval=None,  # lines between checkpoint flushes
    time_limit_sec=None,       # optional time limit
)

Returns a dict whose keys include kept, rejected, and sorted_output_hash, along with per-stage timing summaries.

Configuration

QualityPipelineConfig

python
from indw.filter.spec.quality import QualityPipelineConfig
 
cfg = QualityPipelineConfig()                          # defaults
cfg = QualityPipelineConfig.from_dict(yaml_dict)       # from YAML
cfg_dict = cfg.to_dict()                               # serialize
 
# Policy accessors
cfg.language_policy()    # LanguagePolicyConfig
cfg.pii_policy()         # PiiPolicyConfig
cfg.toxicity_policy()    # ToxicityPolicyConfig
cfg.license_policy()     # LicensePolicyConfig

PipelineConfigContext

python
from indw.config.resolve import PipelineConfigContext
 
ctx = PipelineConfigContext.resolve(quality_spec="data/filtering/quality_fast_first")
cfg = ctx.quality
gate_ctx = ctx.with_quality(cfg)

Quality gate

QualityGate

python
from indw.filter.gate.quality import QualityGate
 
gate = QualityGate(ctx=PipelineConfigContext.resolve())
decision = gate.evaluate(document_dict)
# decision.accepted: bool
# decision.score: float
# decision.reasons: list[str]

Ingest

FastDatasetPipeline

python
from indw.ingest.run import FastDatasetPipeline
 
pipeline = FastDatasetPipeline(
    work_dir="./work",
    config=None,
    corpus_id="default",
    quality_config_path="configs/filtering/quality_fast_first.yaml",
)
output_path = pipeline.run(
    sources_dict,
    skip_download=False,
    fresh_merge=False,
    merge_workers=4,
)

DatasetDownloader

python
from indw.ingest.download import DatasetDownloader
 
downloader = DatasetDownloader("./raw")
downloader.fetch_all(sources_config)

Corpus registry

CorpusRegistry

python
from indw.store.corpus.registry import CorpusRegistry
 
corpus = CorpusRegistry("./work", corpus_id="my-corpus")

Deduplication

python
from indw.dedup.normalize import content_hash
 
h = content_hash("normalized text content")

Export

python
from indw.store.export.fast_export import export_token_bins_fast
from indw.store.export.memmap_stream import build_pretrain_dataloader, build_val_dataloader
 
export_token_bins_fast(jsonl_path, output_dir, tokenizer_path="./tokenizer")
loader = build_pretrain_dataloader(token_bins_dir, batch_size=32)

Mixture planning

python
from indw.schedule.mix.mixture_planner import build_corpus_mixture_plan
from indw.schedule.mix.config import MixtureOrchestrationConfig
 
plan = build_corpus_mixture_plan(orchestration_config, source_registry)

Checkpoint and triage

python
from indw.schedule.state.checkpoint import MergeCheckpoint, triage_merge
 
checkpoint = MergeCheckpoint(work_dir)
offset = checkpoint.line_offset("source-name")
 
report = triage_merge(work_dir, raw_dir=raw_dir)

Backend resolution

python
from indw.schedule.backends.config import pipeline_execution_backend, dask_scheduler_address
from indw.schedule.backends.factory import resolve_execution_backend
 
backend_name = pipeline_execution_backend()
backend = resolve_execution_backend()
scheduler = dask_scheduler_address()

Package exports

python
from indw import (
    CorpusRegistry,
    QualityGate,
    QualityPipelineConfig,
    merge_with_quality,
    FastDatasetPipeline,
    DatasetDownloader,
    export_token_bins_fast,
    build_pretrain_dataloader,
    build_val_dataloader,
)
© 2026 Instant Developers. All rights reserved.