
Custom Stages


Extend INDW pipeline stages with custom filters, gates, and cleaning logic.

INDW pipelines can be extended at several integration points, including Stage0 content filters, document gate checks, quality gate evaluation, cleaning stages, and the Stage0 admission tier.

Integration points

Point | Module | When it runs
--- | --- | ---
Stage0 content filter | indw.filter.stage0.engine | Before heavy stages
Document gate | indw.clean.gate.evaluate | After cleaning
Quality gate | indw.filter.gate.quality | After extraction and dedup
Cleaning stage | indw.clean.document.clean | During clean pool
Admission tier | indw.filter.stage0.admission | End of Stage0

Custom Stage0 filter

Add rejection logic to the structural filter path either by adjusting the configuration thresholds that drive run_stage0_content_filters, or by adding a hook next to it. The example below runs a custom check only when the built-in filters pass:

python
from indw.filter.stage0.engine import run_stage0_content_filters, worker_gate_policy
from indw.clean.gate.evaluate import document_gate_raw

def custom_stage0_check(text: str, meaningful_chars: int) -> str | None:
    """Return rejection reason string, or None to pass."""
    if meaningful_chars < 200:
        return "too_short_custom"
    features = document_gate_raw(text)
    if features.get("custom_signal", 0) > 0.9:
        return "custom_spam_detected"
    return None

# Integrate in batch processing:
def stage0_reject_reason(text: str, meaningful_chars: int) -> str | None:
    """Run the built-in Stage0 filters, then the custom check if they pass."""
    pol = worker_gate_policy()
    # Assumes `raw` expects the feature mapping produced by document_gate_raw.
    features = document_gate_raw(text)
    reject = run_stage0_content_filters(
        text, meaningful_chars=meaningful_chars, raw=features, pol=pol
    )
    if reject is None:
        reject = custom_stage0_check(text, meaningful_chars)
    return reject

Custom quality gate

Extend quality evaluation by adding threshold checks to your quality profile:

yaml
thresholds:
  min_chars: 200
  max_commercial_score: 0.30   # tighten for your corpus
  min_informative_density: 0.020
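
To make the intent of these thresholds concrete, here is a minimal sketch of how a profile like the one above could be checked against per-document metrics. It does not use the INDW API: the function name, the metric names (chars, commercial_score, informative_density), and the convention that a min_ or max_ prefix marks a lower or upper bound are all assumptions made for illustration.

```python
from typing import Mapping, Optional

# Hypothetical in-memory copy of the YAML thresholds shown above.
THRESHOLDS = {
    "min_chars": 200,
    "max_commercial_score": 0.30,
    "min_informative_density": 0.020,
}


def first_threshold_failure(
    metrics: Mapping[str, float], thresholds: Mapping[str, float]
) -> Optional[str]:
    """Return the name of the first violated threshold, or None if all pass.

    Assumed convention (not confirmed by INDW): "min_<metric>" is a lower
    bound and "max_<metric>" is an upper bound on metrics["<metric>"].
    """
    # Sorted iteration keeps the reported reason stable between runs.
    for key in sorted(thresholds):
        bound = thresholds[key]
        kind, _, metric = key.partition("_")
        value = metrics.get(metric)
        if value is None:
            continue  # metric was not computed for this document
        if kind == "min" and value < bound:
            return key
        if kind == "max" and value > bound:
            return key
    return None
```

Tightening max_commercial_score in the profile then changes which documents fail without touching code, which is usually the simplest way to adapt the gate to a new corpus.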

For programmatic custom scoring:

python
from indw.filter.gate.quality import QualityGate
from indw.config.resolve import PipelineConfigContext
 
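class CustomQualityGate(QualityGate):
    """Quality gate that down-weights documents from low-tier sources."""

    def evaluate(self, doc: dict):
        decision = super().evaluate(doc)
        # Reduce the score by 20% for low-tier sources.
        # Assumes the returned decision exposes a mutable `score` attribute.
        if doc.get("meta", {}).get("source_tier") == "low":
            decision.score *= 0.8
        return decision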

Custom cleaning stage

Add cleaning logic to the document cleaning pipeline:

python
from indw.clean.document.clean import clean_document_text
 
def custom_clean(text: str, config) -> str:
    text = clean_document_text(text, config)
    # Custom post-processing
    text = text.replace("REDACTED_TEMPLATE", "")
    return text

Register custom cleaning in CleaningConfig by enabling the relevant stages and adding post-clean hooks in your merge bootstrap.
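
The exact hook registration API is not shown here, so the following is only a sketch of what a post-clean hook chain could look like: an ordered tuple of pure text-to-text functions applied one after another. The names PostCleanHook, POST_CLEAN_HOOKS, apply_post_clean_hooks, and collapse_blank_lines are hypothetical and not part of INDW.

```python
from typing import Callable, Sequence

# Hypothetical hook type: a pure function from text to text.
PostCleanHook = Callable[[str], str]


def strip_template_marker(text: str) -> str:
    """Remove the placeholder marker used in the example above."""
    return text.replace("REDACTED_TEMPLATE", "")


def collapse_blank_lines(text: str) -> str:
    """Trim trailing whitespace and collapse runs of blank lines to one."""
    out = []
    for line in text.split("\n"):
        is_blank = line.strip() == ""
        if is_blank and out and out[-1] == "":
            continue  # skip consecutive blank lines
        out.append("" if is_blank else line.rstrip())
    return "\n".join(out)


def apply_post_clean_hooks(text: str, hooks: Sequence[PostCleanHook]) -> str:
    """Apply hooks in the given order; the order is part of the contract."""
    for hook in hooks:
        text = hook(text)
    return text


# A tuple keeps the hook order fixed and explicit.
POST_CLEAN_HOOKS = (strip_template_marker, collapse_blank_lines)
```

Keeping each hook a pure function of its input makes the chain easy to test in isolation and keeps it compatible with the determinism requirement.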

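Custom document gate

Write a document gate check that runs after cleaning and returns a rejection reason, or None to pass: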

python
from indw.clean.gate.evaluate import document_gate_raw
 
def custom_document_gate(text: str, thresholds: dict) -> str | None:
    features = document_gate_raw(text)
    if features["alpha_ratio"] < thresholds.get("min_alpha_ratio", 0.45):
        return "low_alpha_ratio"
    if "required_keyword" not in text.lower():
        return "missing_required_content"
    return None
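
For unit tests it can help to separate feature extraction from the decision itself. The variant below is a sketch under that assumption: it takes a precomputed feature mapping instead of calling document_gate_raw, so it runs without INDW installed. The function name gate_from_features and the missing_alpha_ratio reason are hypothetical.

```python
from typing import Mapping, Optional


def gate_from_features(
    text: str,
    features: Mapping[str, float],
    thresholds: Mapping[str, float],
) -> Optional[str]:
    """Return a rejection reason, or None to pass.

    Same rules as the gate above, but with features passed in so the
    decision logic can be tested with hand-written feature values.
    """
    alpha_ratio = features.get("alpha_ratio")
    if alpha_ratio is None:
        # Hypothetical reason: reject explicitly rather than raise KeyError.
        return "missing_alpha_ratio"
    if alpha_ratio < thresholds.get("min_alpha_ratio", 0.45):
        return "low_alpha_ratio"
    if "required_keyword" not in text.lower():
        return "missing_required_content"
    return None
```

In production code the features argument would come from document_gate_raw(text), as in the example above.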

Stage pool registration

For deeper integration, add a new stage pool to the execution graph:

python
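# indw.schedule.stages.pools: follow existing pool patterns.
# should_reject and make_terminal are not defined here; supply your own
# predicate and terminal-record builder, or reuse what existing pools use.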
def process_custom_pool_batch(batch: list[dict]) -> dict:
    terminal = []
    survivors = []
    for item in batch:
        if should_reject(item):
            terminal.append(make_terminal(item, reason="custom_reject"))
        else:
            survivors.append(item)
    return {"terminal": terminal, "survivors": survivors}

New pools must preserve the {"terminal": [...], "survivors": [...]} contract and integrate with MergeDocumentContext for stage marks.
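
The contract can be exercised in isolation before wiring a pool into the graph. The sketch below fills in the two helpers with hypothetical stand-ins (a length-based should_reject and a minimal make_terminal record) and adds a check_pool_result helper, also hypothetical, that verifies the result shape. It additionally assumes every input item ends up in exactly one of the two lists. Integration with MergeDocumentContext is not shown.

```python
def should_reject(item: dict) -> bool:
    """Hypothetical predicate: reject items with fewer than 200 characters."""
    return len(item.get("text", "")) < 200


def make_terminal(item: dict, reason: str) -> dict:
    """Hypothetical terminal record; real pools may carry more fields."""
    return {"id": item.get("id"), "reason": reason}


def process_custom_pool_batch(batch: list) -> dict:
    """Split a batch into rejected (terminal) and surviving items."""
    terminal = []
    survivors = []
    for item in batch:  # list order is preserved, so output order is stable
        if should_reject(item):
            terminal.append(make_terminal(item, reason="custom_reject"))
        else:
            survivors.append(item)
    return {"terminal": terminal, "survivors": survivors}


def check_pool_result(batch: list, result: dict) -> None:
    """Raise ValueError if the result does not match the assumed contract."""
    if set(result) != {"terminal", "survivors"}:
        raise ValueError(f"unexpected keys: {sorted(result)}")
    if len(result["terminal"]) + len(result["survivors"]) != len(batch):
        raise ValueError("every input item must be terminal or a survivor")
```

Running check_pool_result in a unit test for each new pool catches dropped or duplicated items before they reach later stages.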

Determinism requirement

Custom stages must be deterministic: no randomness, no time-dependent logic, no unordered iteration. Non-deterministic stages break hash parity across backends.
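
Two common sources of accidental non-determinism in Python are iterating over a set and using the built-in hash() on strings, which is randomized per process unless PYTHONHASHSEED is fixed. The sketch below shows deterministic alternatives using only the standard library. The helper names stable_digest and ordered_tags are hypothetical, and how INDW itself computes parity hashes is not covered here.

```python
import hashlib
import json
from typing import Iterable, List, Mapping


def stable_digest(features: Mapping[str, object]) -> str:
    """Hash a feature mapping independently of key insertion order.

    Uses SHA-256 over canonical JSON instead of the built-in hash(),
    whose value for strings can differ between processes.
    """
    payload = json.dumps(
        features, sort_keys=True, separators=(",", ":"), ensure_ascii=True
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def ordered_tags(tags: Iterable[str]) -> List[str]:
    """Return tags in sorted order so downstream iteration is stable."""
    return sorted(tags)
```

The same idea applies to any stage output that feeds a hash: fix the ordering first, then serialize.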

© 2026 Instant Developers. All rights reserved.