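INDW exposes a Python API for programmatic pipeline execution, configuration, and export. The most commonly used symbols are re-exported from the top-level indw package (see Package exports); the remaining symbols are imported from their submodules, as shown in each section.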
Core merge
merge_with_quality
from indw.schedule import merge_with_quality
result: dict = merge_with_quality(
raw_dir, # Path | str — directory with */data.jsonl
out_path, # Path | str — output JSONL path
*,
quality_config=None, # QualityPipelineConfig
corpus_registry=None, # CorpusRegistry
work_dir=None, # Path — checkpoints and artifacts
fresh=False, # if True, clear checkpoints
resume=True, # if True, resume from checkpoint
append=False, # if True, append to existing output
workers=None, # parallel worker count
chunk_size=None, # batch size
source_filter=None, # list[str] — subset of sources
checkpoint_interval=None, # lines between checkpoint flushes
time_limit_sec=None, # optional time limit, in seconds
)
Returns a dict whose keys include kept, rejected, and sorted_output_hash, along with stage timing summaries.
Configuration
QualityPipelineConfig
from indw.filter.spec.quality import QualityPipelineConfig
cfg = QualityPipelineConfig() # defaults
cfg = QualityPipelineConfig.from_dict(yaml_dict) # from a dict loaded from YAML
cfg_dict = cfg.to_dict() # serialize
# Policy accessors
cfg.language_policy() # LanguagePolicyConfig
cfg.pii_policy() # PiiPolicyConfig
cfg.toxicity_policy() # ToxicityPolicyConfig
cfg.license_policy() # LicensePolicyConfig
PipelineConfigContext
from indw.config.resolve import PipelineConfigContext
ctx = PipelineConfigContext.resolve(quality_spec="data/filtering/quality_fast_first")
cfg = ctx.quality
gate_ctx = ctx.with_quality(cfg)
Quality gate
QualityGate
from indw.filter.gate.quality import QualityGate
gate = QualityGate(ctx=PipelineConfigContext.resolve())
decision = gate.evaluate(document_dict)
# decision.accepted: bool
# decision.score: float
# decision.reasons: list[str]
Ingest
FastDatasetPipeline
from indw.ingest.run import FastDatasetPipeline
pipeline = FastDatasetPipeline(
work_dir="./work",
config=None,
corpus_id="default",
quality_config_path="configs/filtering/quality_fast_first.yaml",
)
output_path = pipeline.run(
sources_dict,
skip_download=False,
fresh_merge=False,
merge_workers=4,
)
DatasetDownloader
from indw.ingest.download import DatasetDownloader
downloader = DatasetDownloader("./raw")
downloader.fetch_all(sources_config)
Corpus registry
CorpusRegistry
from indw.store.corpus.registry import CorpusRegistry
corpus = CorpusRegistry("./work", corpus_id="my-corpus")
Deduplication
from indw.dedup.normalize import content_hash
h = content_hash("normalized text content")
Export
from indw.store.export.fast_export import export_token_bins_fast
from indw.store.export.memmap_stream import build_pretrain_dataloader, build_val_dataloader
export_token_bins_fast(jsonl_path, output_dir, tokenizer_path="./tokenizer")
loader = build_pretrain_dataloader(token_bins_dir, batch_size=32)
Mixture planning
from indw.schedule.mix.mixture_planner import build_corpus_mixture_plan
from indw.schedule.mix.config import MixtureOrchestrationConfig
plan = build_corpus_mixture_plan(orchestration_config, source_registry)
Checkpoint and triage
from indw.schedule.state.checkpoint import MergeCheckpoint, triage_merge
checkpoint = MergeCheckpoint(work_dir)
offset = checkpoint.line_offset("source-name")
report = triage_merge(work_dir, raw_dir=raw_dir)
Backend resolution
from indw.schedule.backends.config import pipeline_execution_backend, dask_scheduler_address
from indw.schedule.backends.factory import resolve_execution_backend
backend_name = pipeline_execution_backend()
backend = resolve_execution_backend()
scheduler = dask_scheduler_address()
Package exports
from indw import (
CorpusRegistry,
QualityGate,
QualityPipelineConfig,
merge_with_quality,
FastDatasetPipeline,
DatasetDownloader,
export_token_bins_fast,
build_pretrain_dataloader,
build_val_dataloader,
)
© 2026 Instant Developers. All rights reserved.