Schema Validation for Ingested Records

Schema validation is the primary gate in the Catalog Ingestion & ILS Sync Pipelines architecture: it enforces structural integrity before bibliographic and authority records enter transactional systems. For public sector libraries and enterprise ILS deployments, deterministic validation prevents downstream corruption, keeps patron-facing metadata accurate, and supports institutional compliance mandates. This guide outlines production-ready validation patterns, orchestration strategies, and compliance synchronization workflows for high-throughput catalog environments.

Declarative Contract Definitions

Effective validation begins with declarative contract definitions. Modern Python implementations typically leverage Pydantic v2 to model MARC21, BIBFRAME, or custom JSON-LD payloads. Each record class requires explicit field-level constraints, type coercion rules, and conditional validation logic. When integrating with legacy ILS systems, validators must accommodate vendor-specific extensions while preserving core interoperability standards.

python
from datetime import datetime, timezone
from typing import List, Optional

from pydantic import BaseModel, ConfigDict, Field, model_validator

class MarcSubfield(BaseModel):
    code: str = Field(pattern=r"^[a-z0-9]$")
    value: str = Field(min_length=1, max_length=2048)

class MarcField(BaseModel):
    tag: str = Field(pattern=r"^\d{3}$")
    # Indicators are a digit or a blank; "#" is accepted as the conventional blank marker.
    indicator1: Optional[str] = Field(default=None, pattern=r"^[0-9 #]$")
    indicator2: Optional[str] = Field(default=None, pattern=r"^[0-9 #]$")
    subfields: List[MarcSubfield] = Field(default_factory=list)

    @model_validator(mode="after")
    def validate_control_vs_data(self) -> "MarcField":
        # A tag-only validator cannot see subfields, so check both on the built model.
        # Control fields (001-009) carry raw data only; reject any that list subfields.
        if self.tag.startswith("00") and self.subfields:
            raise ValueError("Control fields (00X) must not contain subfields")
        return self

class BibliographicRecord(BaseModel):
    model_config = ConfigDict(str_strip_whitespace=True, extra="forbid")
    record_id: str = Field(alias="001")
    leader: str = Field(min_length=24, max_length=24)
    fields: List[MarcField]
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

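The contract above enforces strict typing, rejects unexpected top-level keys via extra="forbid", and guards against control fields (00X) that carry subfields. Because unknown keys are rejected rather than ignored, vendor-specific extensions must be modeled as explicit fields instead of being passed through. For comprehensive schema modeling guidance, consult the official Pydantic v2 documentation.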

Ingestion Boundary & Early Rejection

The initial normalization stage relies on Parsing MARC Records with pymarc to convert raw binary or XML streams into structured dictionaries prior to schema evaluation. At this ingestion boundary, validators should immediately reject malformed indicators, duplicate control fields, and invalid subfield codes, routing non-conforming payloads to a quarantine queue rather than blocking batch execution.
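
Duplicate control fields are a cross-field condition, so they are easier to catch with a small pre-check than with per-field constraints. The following is a minimal sketch, assuming each parsed field is a dict with a "tag" key; the function name and the dict shape are illustrative, not part of pymarc. It treats 001, 003, 005, and 008 as non-repeatable, since MARC 21 allows 006 and 007 to repeat.

```python
from collections import Counter
from typing import List

# Control fields that MARC 21 defines as non-repeatable (006 and 007 may repeat).
NON_REPEATABLE_CONTROL_TAGS = frozenset({"001", "003", "005", "008"})


def duplicate_control_tags(fields: List[dict]) -> List[str]:
    """Return the non-repeatable control tags that occur more than once, sorted."""
    counts = Counter(field.get("tag") for field in fields)
    return sorted(tag for tag in NON_REPEATABLE_CONTROL_TAGS if counts[tag] > 1)


if __name__ == "__main__":
    sample = [{"tag": "001"}, {"tag": "001"}, {"tag": "007"}, {"tag": "007"}]
    print(duplicate_control_tags(sample))
```

A non-empty result would be a reason to quarantine the record before schema evaluation.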

Quarantine routing must preserve the original payload alongside a machine-readable rejection reason. This pattern enables automated reprocessing when upstream vendor feeds correct structural anomalies, while maintaining pipeline throughput.

python
import json
from datetime import datetime, timezone
from typing import Optional

from pydantic import ValidationError

from .models import BibliographicRecord  # defined in the previous section

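def ingest_and_validate(raw_record: dict, quarantine_sink) -> Optional[BibliographicRecord]:
    try:
        # model_validate takes the parsed dict directly, including the "001" alias key.
        return BibliographicRecord.model_validate(raw_record)
    except ValidationError as e:
        quarantine_payload = {
            "original": raw_record,
            "errors": e.errors(include_url=False),
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        # default=str stringifies values json cannot encode, such as the exception
        # objects Pydantic places in an error's "ctx" entry.
        quarantine_sink.send(json.dumps(quarantine_payload, default=str).encode())
        return None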
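
The reprocessing half of this pattern can be sketched with an in-memory sink. InMemoryQuarantine, replay, and the validate callable are hypothetical names chosen for illustration; a production sink would more likely wrap a message broker or object store. The sketch assumes validate returns None on failure and does not itself write to the sink.

```python
import json
from typing import Callable, List, Optional


class InMemoryQuarantine:
    """Hypothetical sink exposing a send(bytes) method."""

    def __init__(self) -> None:
        self.entries: List[bytes] = []

    def send(self, payload: bytes) -> None:
        self.entries.append(payload)

    def replay(self, validate: Callable[[dict], Optional[object]]) -> int:
        """Re-validate quarantined originals; keep the ones that still fail."""
        # Swap the list out first so iteration is unaffected by later sends.
        pending, self.entries = self.entries, []
        recovered = 0
        for raw in pending:
            original = json.loads(raw)["original"]
            if validate(original) is None:
                self.entries.append(raw)  # still invalid, stays quarantined
            else:
                recovered += 1
        return recovered
```

Because each entry keeps the untouched original payload, replay can run on a schedule or be triggered when a vendor announces a corrected feed.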

Multi-Tier Validation Architecture

Production pipelines implement a multi-tier validation strategy: syntactic, structural, and semantic. Syntactic checks verify encoding standards, byte lengths, and delimiter placement. Structural validation enforces mandatory field presence, indicator alignment, and subfield ordering. Semantic validation cross-references controlled vocabularies, authority IDs, and institutional policies.
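
The three tiers can be chained so that a record stops at the first tier it fails. This is a minimal sketch, not a full rule set: each tier holds a single illustrative check, records are assumed to be plain dicts shaped like the models above, and ALLOWED_SOURCES stands in for a hypothetical local vocabulary policy.

```python
from typing import Callable, List, Optional, Tuple

# A check returns an error message, or None when the record passes.
Check = Callable[[dict], Optional[str]]

ALLOWED_SOURCES = frozenset({"lcsh", "mesh"})  # hypothetical local policy


def check_syntax(record: dict) -> Optional[str]:
    # Syntactic tier: the leader must be a 24-character string.
    leader = record.get("leader")
    if isinstance(leader, str) and len(leader) == 24:
        return None
    return "leader must be 24 characters"


def check_structure(record: dict) -> Optional[str]:
    # Structural tier: a 245 (title statement) field must be present.
    tags = {field.get("tag") for field in record.get("fields", [])}
    return None if "245" in tags else "missing mandatory field 245"


def check_semantics(record: dict) -> Optional[str]:
    # Semantic tier: any 650 $2 (source of heading) must be an allowed vocabulary.
    for field in record.get("fields", []):
        if field.get("tag") != "650":
            continue
        for sub in field.get("subfields", []):
            if sub.get("code") == "2" and sub.get("value") not in ALLOWED_SOURCES:
                return f"unauthorized subject source: {sub.get('value')}"
    return None


TIERS: List[Tuple[str, Check]] = [
    ("syntactic", check_syntax),
    ("structural", check_structure),
    ("semantic", check_semantics),
]


def run_tiers(record: dict) -> Tuple[bool, Optional[str], Optional[str]]:
    """Return (passed, failing_tier, message), stopping at the first failure."""
    for name, check in TIERS:
        message = check(record)
        if message is not None:
            return False, name, message
    return True, None, None
```

Ordering the tiers from cheapest to most expensive means semantic lookups only run for records that are already well-formed.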

A critical early-stage gate involves Validating MARC Leader Fields Before Database Insert, which ensures record status, type of record, and bibliographic level align with ILS indexing requirements. Following structural clearance, semantic checks verify that subject headings map to authorized thesauri and that call number classifications conform to local shelving rules. Final reconciliation occurs by Validating Pipeline Outputs Against ILS Reference Data, guaranteeing that transformed records match authoritative system baselines before commit.
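
As an illustration of the leader gate, the sketch below checks the three positions named above against the MARC 21 bibliographic code sets. The function name is hypothetical, and the code sets should be verified against the current standard and narrowed to whatever the target ILS actually indexes.

```python
from typing import List

# Code sets for the MARC 21 bibliographic Leader (verify against the current standard).
RECORD_STATUS = frozenset("acdnp")            # Leader/05
TYPE_OF_RECORD = frozenset("acdefgijkmoprt")  # Leader/06
BIBLIOGRAPHIC_LEVEL = frozenset("abcdims")    # Leader/07


def leader_errors(leader: str) -> List[str]:
    """Return a list of problems found in the leader; empty means it passed."""
    if len(leader) != 24:
        return [f"leader length is {len(leader)}, expected 24"]
    errors = []
    if leader[5] not in RECORD_STATUS:
        errors.append(f"invalid record status {leader[5]!r} at position 05")
    if leader[6] not in TYPE_OF_RECORD:
        errors.append(f"invalid type of record {leader[6]!r} at position 06")
    if leader[7] not in BIBLIOGRAPHIC_LEVEL:
        errors.append(f"invalid bibliographic level {leader[7]!r} at position 07")
    return errors
```

Returning every problem at once, rather than raising on the first, gives the quarantine record a complete rejection reason.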

PII Masking & Audit-Ready Logging

Public sector infrastructure requires strict adherence to data minimization and privacy regulations. Validation layers must intercept and mask personally identifiable information (PII) before records are persisted or forwarded to downstream analytics. Common PII vectors in catalog metadata include patron annotations in 500/583 fields, restricted access URLs in 856 fields, and internal tracking codes.

Implement field-level redaction using Pydantic serializers combined with deterministic hashing for audit trail continuity:

python
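import hashlib
import re
from typing import Any, ClassVar, List

from pydantic import model_serializer

from .models import BibliographicRecord  # defined in the previous section

def _fingerprint(match: re.Match) -> str:
    # Short deterministic digest so the same value correlates across audit entries.
    # Unsalted hashes of low-entropy values (SSNs, phone numbers) can be brute-forced;
    # consider a keyed HMAC where that risk matters.
    digest = hashlib.sha256(match.group(1).encode()).hexdigest()[:8]
    return f"[REDACTED:{digest}]"

class SanitizedBibliographicRecord(BibliographicRecord):
    # ClassVar keeps the patterns out of Pydantic's field and private-attribute handling.
    PII_PATTERNS: ClassVar[List[re.Pattern]] = [
        re.compile(r"(?:patron|staff|internal)\s*id[:\s]+(\w+)", re.IGNORECASE),
        re.compile(r"(?:ssn|dob|phone)[:\s]+([\d\-]+)", re.IGNORECASE),
    ]

    @classmethod
    def _mask(cls, value: Any) -> Any:
        # Recurse through dicts and lists so nested subfield values (e.g. 500/583 notes) are masked.
        if isinstance(value, str):
            for pattern in cls.PII_PATTERNS:
                value = pattern.sub(_fingerprint, value)
            return value
        if isinstance(value, dict):
            return {key: cls._mask(item) for key, item in value.items()}
        if isinstance(value, list):
            return [cls._mask(item) for item in value]
        return value

    @model_serializer(mode="wrap")
    def mask_pii(self, handler):
        # handler(self) yields the default serialization; mask every string inside it.
        return self._mask(handler(self))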

Audit-ready logging requires structured, immutable event streams. Every validation pass must emit a JSON-formatted log entry containing a correlation ID, validation tier, pass/fail status, and masked payload fingerprints. Utilize structlog to enforce consistent key-value formatting, ensuring logs remain queryable by SIEM platforms and compliant with NIST SP 800-53 audit controls. Avoid logging raw payloads; instead, log schema hashes, field counts, and validation durations.
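
The shape of such a log entry can be sketched with the standard library alone. This is an illustrative stand-in rather than a structlog configuration: the function names and key names are assumptions, and the same dictionary could be passed to whichever structured logger the pipeline uses.

```python
import hashlib
import json
import uuid
from typing import Optional


def payload_fingerprint(masked_payload: dict) -> str:
    # Canonical JSON (sorted keys, fixed separators) so equal payloads hash equally.
    canonical = json.dumps(masked_payload, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def build_audit_event(
    masked_payload: dict,
    tier: str,
    passed: bool,
    duration_ms: float,
    correlation_id: Optional[str] = None,
) -> str:
    """Return one JSON log line describing a validation pass, without the payload itself."""
    event = {
        "correlation_id": correlation_id or str(uuid.uuid4()),
        "validation_tier": tier,
        "status": "pass" if passed else "fail",
        "payload_sha256": payload_fingerprint(masked_payload),
        "field_count": len(masked_payload.get("fields", [])),
        "duration_ms": round(duration_ms, 3),
    }
    return json.dumps(event, sort_keys=True)
```

Only the fingerprint, counts, and timings leave the validator, so the log line stays useful for correlation without carrying record content.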

Orchestration & Horizontal Scaling

Validation must be orchestrated alongside data acquisition and synchronization routines to prevent pipeline bottlenecks. In distributed architectures, validation workers consume records from message brokers or staged object storage, applying schema checks asynchronously. Rate-limited ingestion from vendor endpoints requires careful backpressure handling, as documented in ILS REST API Polling & Rate Limiting. Validation steps should remain stateless and idempotent, enabling horizontal scaling during peak cataloging windows.

Workflow orchestrators like Apache Airflow or Prefect enforce strict dependency graphs where schema validation gates subsequent enrichment, deduplication, and database commit phases. Implement exponential backoff with jitter for transient validation service failures, and configure dead-letter queues for records that exceed retry thresholds. Structured alerting with diagnostic payloads ensures rapid remediation without manual log inspection, maintaining SLA compliance across municipal and academic library consortia.
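
The retry policy can be sketched as follows, using the "full jitter" variant of exponential backoff. TransientValidationError, validate_with_retry, and the list used as a dead-letter queue are hypothetical stand-ins; orchestrators such as Airflow and Prefect provide their own retry settings, which would normally be preferred over hand-rolled loops.

```python
import random
import time
from typing import Callable, List


class TransientValidationError(Exception):
    """Hypothetical marker for retryable failures, e.g. a timed-out lookup service."""


def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    # Full jitter: choose uniformly between 0 and the capped exponential bound.
    return random.uniform(0.0, min(cap, base * (2 ** attempt)))


def validate_with_retry(
    record: dict,
    validate: Callable[[dict], None],
    dead_letters: List[dict],
    max_attempts: int = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Retry transient failures; send the record to dead_letters once attempts run out."""
    for attempt in range(max_attempts):
        try:
            validate(record)
            return True
        except TransientValidationError:
            # Sleep between attempts, but not after the final one.
            if attempt < max_attempts - 1:
                sleep(backoff_delay(attempt))
    dead_letters.append(record)
    return False
```

Only transient errors are retried here; a genuine schema failure should go straight to quarantine, since retrying it cannot change the outcome.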