Extend INDW pipeline stages with custom filters, gates, and cleaning logic.
INDW pipelines are extensible at several integration points: Stage0 content filters, quality gate evaluation, cleaning pipeline stages, and document gate checks.
Integration points
| Point | Module | When it runs |
| --- | --- | --- |
| Stage0 content filter | indw.filter.stage0.engine | Before heavy stages |
| Document gate | indw.clean.gate.evaluate | After cleaning |
| Quality gate | indw.filter.gate.quality | After extraction and dedup |
| Cleaning stage | indw.clean.document.clean | During clean pool |
| Admission tier | indw.filter.stage0.admission | End of Stage0 |
Custom Stage0 filter
Add rejection logic to the structural filter path either by adjusting run_stage0_content_filters behavior through configuration thresholds, or by adding a pre-filter hook. In the example, the hook runs only when the built-in filters return no rejection:
```python
from indw.filter.stage0.engine import run_stage0_content_filters, worker_gate_policy
from indw.clean.gate.evaluate import document_gate_raw


def custom_stage0_check(text: str, meaningful_chars: int) -> str | None:
    """Return rejection reason string, or None to pass."""
    if meaningful_chars < 200:
        return "too_short_custom"
    features = document_gate_raw(text)
    if features.get("custom_signal", 0) > 0.9:
        return "custom_spam_detected"
    return None


# Integrate in batch processing (text, chars, and features are supplied by the batch loop):
pol = worker_gate_policy()
reject = run_stage0_content_filters(text, meaningful_chars=chars, raw=features, pol=pol)
if reject is None:
    reject = custom_stage0_check(text, chars)
```
Custom quality gate
Extend quality evaluation by adding threshold checks to your quality profile:
```yaml
thresholds:
  min_chars: 200
  max_commercial_score: 0.30  # tighten for your corpus
  min_informative_density: 0.020
```
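To make the effect of these thresholds concrete, here is a minimal sketch of how a profile like this could be applied to per-document metrics. It is not INDW's quality gate: the function `evaluate_quality_thresholds`, the metric names `chars`, `commercial_score`, and `informative_density`, and the rejection reason strings are all assumptions chosen to mirror the YAML keys.

```python
from typing import Optional

# Illustrative stand-in for a quality gate, not INDW's implementation.
THRESHOLDS = {
    "min_chars": 200,
    "max_commercial_score": 0.30,
    "min_informative_density": 0.020,
}


def evaluate_quality_thresholds(metrics: dict, thresholds: dict) -> Optional[str]:
    """Return a rejection reason, or None if the document passes every check."""
    if metrics["chars"] < thresholds["min_chars"]:
        return "below_min_chars"
    if metrics["commercial_score"] > thresholds["max_commercial_score"]:
        return "above_max_commercial_score"
    if metrics["informative_density"] < thresholds["min_informative_density"]:
        return "below_min_informative_density"
    return None


# Hypothetical metric values for two documents.
passing = {"chars": 1200, "commercial_score": 0.12, "informative_density": 0.05}
spammy = {"chars": 1200, "commercial_score": 0.45, "informative_density": 0.05}
```

With these inputs, `passing` clears all three checks, while `spammy` is rejected because its commercial score exceeds the tightened 0.30 ceiling.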
Custom cleaning stage
Add cleaning logic to the document cleaning pipeline:
```python
from indw.clean.document.clean import clean_document_text


def custom_clean(text: str, config) -> str:
    text = clean_document_text(text, config)
    # Custom post-processing
    text = text.replace("REDACTED_TEMPLATE", "")
    return text
```
Register custom cleaning in CleaningConfig by enabling the relevant stages and adding post-clean hooks in your merge bootstrap.
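The hook registration API is not shown here, so the following is only a sketch of what a post-clean hook chain might look like. The names `PostCleanHook`, `strip_template_marker`, `collapse_blank_lines`, and `apply_post_clean_hooks` are hypothetical and are not part of `CleaningConfig`. The sketch assumes each hook is a pure function from text to text, applied in a fixed order.

```python
import re
from typing import Callable, Sequence

# Hypothetical hook type: a pure function from text to text.
PostCleanHook = Callable[[str], str]


def strip_template_marker(text: str) -> str:
    """Remove a leftover template marker."""
    return text.replace("REDACTED_TEMPLATE", "")


def collapse_blank_lines(text: str) -> str:
    """Collapse runs of three or more newlines into a single blank line."""
    return re.sub(r"\n{3,}", "\n\n", text)


def apply_post_clean_hooks(text: str, hooks: Sequence[PostCleanHook]) -> str:
    """Apply hooks in sequence order so the result is reproducible."""
    for hook in hooks:
        text = hook(text)
    return text


HOOKS = [strip_template_marker, collapse_blank_lines]
cleaned = apply_post_clean_hooks("Title\n\n\n\nREDACTED_TEMPLATEBody", HOOKS)
```

Keeping the hooks in an ordered sequence, rather than a set or dict keyed by name, makes the application order explicit.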
Custom document gate
```python
from indw.clean.gate.evaluate import document_gate_raw


def custom_document_gate(text: str, thresholds: dict) -> str | None:
    features = document_gate_raw(text)
    if features["alpha_ratio"] < thresholds.get("min_alpha_ratio", 0.45):
        return "low_alpha_ratio"
    if "required_keyword" not in text.lower():
        return "missing_required_content"
    return None
```
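The gate above can be exercised in isolation by swapping in a stub for `document_gate_raw`. In the sketch below, `stub_document_gate_raw` is hypothetical, and its definition of `alpha_ratio` (alphabetic characters divided by total characters) is an assumption; the real feature may be computed differently.

```python
from typing import Optional


def stub_document_gate_raw(text: str) -> dict:
    """Stand-in for document_gate_raw; alpha_ratio uses an assumed definition."""
    if not text:
        return {"alpha_ratio": 0.0}
    alpha = sum(ch.isalpha() for ch in text)
    return {"alpha_ratio": alpha / len(text)}


def custom_document_gate(text: str, thresholds: dict) -> Optional[str]:
    """Same logic as the gate above, wired to the stub instead of INDW."""
    features = stub_document_gate_raw(text)
    if features["alpha_ratio"] < thresholds.get("min_alpha_ratio", 0.45):
        return "low_alpha_ratio"
    if "required_keyword" not in text.lower():
        return "missing_required_content"
    return None


thresholds = {"min_alpha_ratio": 0.45}
results = {
    "numeric": custom_document_gate("1234 5678 90 !!", thresholds),
    "off_topic": custom_document_gate("Plain prose without the marker.", thresholds),
    "ok": custom_document_gate("This text mentions required_keyword once.", thresholds),
}
```

The three inputs cover each branch: a mostly numeric string fails the ratio check, ordinary prose fails the keyword check, and the last document passes with `None`.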
Stage pool registration
For deeper integration, add a new stage pool to the execution graph:
```python
# indw.schedule.stages.pools: follow existing pool patterns
# should_reject and make_terminal are placeholders for your own logic.
def process_custom_pool_batch(batch: list[dict]) -> dict:
    terminal = []
    survivors = []
    for item in batch:
        if should_reject(item):
            terminal.append(make_terminal(item, reason="custom_reject"))
        else:
            survivors.append(item)
    return {"terminal": terminal, "survivors": survivors}
```
New pools must preserve the {"terminal": [...], "survivors": [...]} contract and integrate with MergeDocumentContext for stage marks.
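A small checker can guard the contract in unit tests. This is a sketch, not an INDW utility: `check_pool_contract` and `toy_pool` are hypothetical, and the rule that every input item lands in exactly one of the two lists is an assumption that goes beyond the stated contract. Integration with `MergeDocumentContext` is not covered.

```python
def check_pool_contract(batch: list, result: dict) -> None:
    """Raise ValueError if a pool result does not have the expected shape."""
    if set(result) != {"terminal", "survivors"}:
        raise ValueError(f"unexpected keys: {sorted(result)}")
    for key in ("terminal", "survivors"):
        if not isinstance(result[key], list):
            raise ValueError(f"{key!r} must be a list")
    # Assumption: each input item ends up in exactly one of the two lists.
    if len(result["terminal"]) + len(result["survivors"]) != len(batch):
        raise ValueError("items were dropped or duplicated")


def toy_pool(batch: list) -> dict:
    """Hypothetical pool that rejects items whose text is empty."""
    terminal = []
    survivors = []
    for item in batch:
        if not item["text"]:
            terminal.append({"item": item, "reason": "custom_reject"})
        else:
            survivors.append(item)
    return {"terminal": terminal, "survivors": survivors}


batch = [{"text": "hello"}, {"text": ""}, {"text": "world"}]
result = toy_pool(batch)
check_pool_contract(batch, result)  # passes silently for a well-formed result
```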
Determinism requirement:
Custom stages must be deterministic: no randomness, no time-dependent logic, no unordered iteration. Non-deterministic stages break hash parity across backends.
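As an illustration of the unordered-iteration pitfall, the sketch below sorts a set before emitting it and compares digests of two runs. The names `stage_output_digest` and `tag_documents` are hypothetical, and hashing canonical JSON with SHA-256 is an assumed way to check parity, not INDW's hashing scheme.

```python
import hashlib
import json


def stage_output_digest(records: list) -> str:
    """Hash a stage's output using a canonical JSON encoding."""
    payload = json.dumps(records, sort_keys=True, ensure_ascii=False)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def tag_documents(docs: list) -> list:
    """Hypothetical stage: attach the distinct tokens of each document."""
    out = []
    for doc in docs:
        tokens = set(doc["text"].split())
        # sorted() fixes the order. Iterating the set directly depends on
        # string hash randomization and can differ between processes.
        out.append({"id": doc["id"], "tokens": sorted(tokens)})
    return out


docs = [{"id": 1, "text": "b a c a"}, {"id": 2, "text": "z y"}]
first = stage_output_digest(tag_documents(docs))
second = stage_output_digest(tag_documents(docs))
```

Comparing two runs inside one process catches randomness and time dependence, but not set-ordering differences, since the hash seed is fixed per process. To catch those, compare digests produced by separate processes.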