Source code for formatparse.validation

"""Post-parse validation."""

from __future__ import annotations

import re
import warnings
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
from urllib.parse import urlparse

from ._native import ParseResult
from .exceptions import MultipleValidationErrors, ValidationError, ValidationWarning
from .types import ValidationMode, ValidatorMap


[docs] def validator(func: Callable[..., Any]) -> Callable[..., Any]: """Mark a function as a post-parse validator (metadata for tooling / docs). Validators are invoked with the parsed value for one field; they should raise :exc:`ValidationError` on failure or return normally on success. :param func: Callable taking the parsed value (and optional extra args if you wrap it yourself before passing to :func:`apply_validators`). """ setattr(func, "_formatparse_validator", True) return func
def _sorted_validator_keys(keys: Iterable[Union[str, int]]) -> List[Union[str, int]]: ints = sorted(k for k in keys if isinstance(k, int)) strs = sorted(k for k in keys if isinstance(k, str)) return ints + strs def _warn_validation_failure(err: ValidationError, *, kind: str) -> None: """Emit :exc:`ValidationWarning` for lenient mode (``kind`` is e.g. ``\"field\"`` or ``\"hook\"``).""" field = err.field base = str(err) if field is not None: msg = f"{kind} validation failed (field={field!r}): {base}" else: msg = f"{kind} validation failed: {base}" warnings.warn(msg, ValidationWarning, stacklevel=3) def _validator_field_value(result: ParseResult, key: Union[str, int]) -> Any: if isinstance(key, str): if key not in result.named: raise ValidationError( f"no named field {key!r} in parse result", field=key, ) return result.named[key] fixed = result.fixed if not isinstance(key, int) or key < 0 or key >= len(fixed): raise ValidationError( f"fixed field index {key!r} out of range (len(fixed)={len(fixed)})", field=key, ) return fixed[key] def _collect_field_validator_errors( result: ParseResult, validators: ValidatorMap, ) -> List[ValidationError]: """Run every per-field validator and return all failures (never raises).""" errors: List[ValidationError] = [] for key in _sorted_validator_keys(validators.keys()): fn = validators[key] try: value = _validator_field_value(result, key) except ValidationError as e: errors.append(e) continue try: fn(value) except ValidationError as e: err = ValidationError(str(e), field=key) err.__cause__ = e.__cause__ if e.__cause__ is not None else e errors.append(err) except Exception as e: err = ValidationError(f"validator failed: {e}", field=key) err.__cause__ = e errors.append(err) return errors
[docs] def apply_validators( result: Optional[ParseResult], validators: Optional[ValidatorMap], *, mode: ValidationMode = "strict", ) -> Optional[ParseResult]: """Run post-parse validators on ``named`` / ``fixed`` values. Validators are **raise-based**: on success the callable returns (typically ``None``). On failure raise :exc:`ValidationError` (recommended) or any exception (wrapped in :exc:`ValidationError`). Replacing values in ``fixed`` slots is not supported; use named fields or mutate ``result.named`` yourself after validation if you need coercion. :param result: Output of :func:`parse` / :meth:`FormatParser.parse`, or ``None``. :param validators: Map from **field key** to validator. Keys are ``str`` field names for :attr:`ParseResult.named` or ``int`` indices for :attr:`ParseResult.fixed`. :param mode: ``\"strict\"`` — stop on first error. ``\"collect\"`` — run all validators, then raise :exc:`MultipleValidationErrors` if any failed. ``\"lenient\"`` — run all validators, emit :exc:`ValidationWarning` for each failure, and always return ``result``. :returns: The same ``result`` reference after validation (including lenient runs with failures). :raises ValidationError: In ``strict`` mode when a validator fails. :raises MultipleValidationErrors: In ``collect`` mode when any validator fails. """ if result is None or not validators: return result if mode == "lenient": for key in _sorted_validator_keys(validators.keys()): fn = validators[key] try: value = _validator_field_value(result, key) except ValidationError as e: _warn_validation_failure(e, kind="field") continue try: fn(value) except ValidationError as e: err = ValidationError(str(e), field=key) err.__cause__ = e.__cause__ if e.__cause__ is not None else e _warn_validation_failure(err, kind="field") except Exception as e: err = ValidationError(f"validator failed: {e}", field=key) err.__cause__ = e _warn_validation_failure(err, kind="field") return result if mode == "strict": for key in _sorted_validator_keys(validators.keys()): fn = validators[key] try: value = _validator_field_value(result, key) except ValidationError: raise try: fn(value) except ValidationError as e: err = ValidationError(str(e), field=key) err.__cause__ = e.__cause__ if e.__cause__ is not None else e raise err from err.__cause__ except Exception as e: err = ValidationError(f"validator failed: {e}", field=key) err.__cause__ = e raise err from e return result errors = _collect_field_validator_errors(result, validators) if errors: raise MultipleValidationErrors(errors) return result
[docs] class ValidationPipeline: """Ordered registry of per-field validators and whole-result hooks (issue #11). Build with :meth:`add_validator` and/or :meth:`add_hook`, then :meth:`apply` on a :class:`ParseResult`. Per-field keys follow :func:`apply_validators` (``str`` for named fields, ``int`` for ``fixed`` indices). If the same field is registered twice, the **last** registration wins. Hooks run in registration order **after** the per-field validator pass completes (on success, or in ``lenient`` mode after every field validator has been attempted). Async validators and inline ``{...:validator(...)}`` syntax remain deferred (see issue #11). """ __slots__ = ("_steps", "_hooks")
[docs] def __init__(self) -> None: self._steps: List[Tuple[Union[str, int], Callable[..., Any]]] = [] self._hooks: List[Callable[[ParseResult], None]] = []
[docs] def add_validator( self, field: Union[str, int], fn: Callable[..., Any], ) -> ValidationPipeline: """Register ``fn`` for ``field``; returns ``self`` for chaining.""" self._steps.append((field, fn)) return self
[docs] def add_hook(self, fn: Callable[[ParseResult], None]) -> ValidationPipeline: """Register a whole-result hook; runs after per-field validators. Chainable.""" self._hooks.append(fn) return self
[docs] def as_mapping(self) -> Dict[Union[str, int], Callable[..., Any]]: """Last registration per field wins (dict built in ``add_validator`` order).""" m: Dict[Union[str, int], Callable[..., Any]] = {} for k, fn in self._steps: m[k] = fn return m
[docs] def apply( self, result: Optional[ParseResult], *, mode: ValidationMode = "strict", ) -> Optional[ParseResult]: """Run per-field validators, then registered hooks. If ``result`` is ``None``, returns ``None`` immediately (no validators or hooks). Hooks receive the full :class:`ParseResult` and use the same raise-based contract as :func:`apply_validators`. Failures are :exc:`ValidationError` (``field`` preserved when the raised error had one; otherwise ``None`` for generic hook failures). Other exceptions become :exc:`ValidationError` with ``field=None``. ``mode`` matches :func:`apply_validators`: ``strict`` stops on the first failure (field or hook). ``collect`` runs **all** per-field validators and **all** hooks, then raises a single :exc:`MultipleValidationErrors` listing field failures first (same key order as :func:`apply_validators`), then hook failures in hook registration order. ``lenient`` runs all field validators (warning on each failure), then all hooks (warning on each failure), and returns ``result`` without raising validation exceptions. """ if result is None: return result if mode == "collect": errors = _collect_field_validator_errors(result, self.as_mapping()) for h in self._hooks: try: h(result) except ValidationError as e: err = ValidationError(str(e), field=e.field) err.__cause__ = e.__cause__ if e.__cause__ is not None else e errors.append(err) except Exception as e: err = ValidationError(f"hook failed: {e}", field=None) err.__cause__ = e errors.append(err) if errors: raise MultipleValidationErrors(errors) return result apply_validators(result, self.as_mapping(), mode=mode) if not self._hooks: return result if mode == "lenient": for h in self._hooks: try: h(result) except ValidationError as e: err = ValidationError(str(e), field=e.field) err.__cause__ = e.__cause__ if e.__cause__ is not None else e _warn_validation_failure(err, kind="hook") except Exception as e: err = ValidationError(f"hook failed: {e}", field=None) err.__cause__ = e _warn_validation_failure(err, kind="hook") return result if mode == "strict": for h in self._hooks: try: h(result) except ValidationError as e: err = ValidationError(str(e), field=e.field) err.__cause__ = e.__cause__ if e.__cause__ is not None else e raise err from err.__cause__ except Exception as e: err = ValidationError(f"hook failed: {e}", field=None) err.__cause__ = e raise err from e return result
[docs] def in_range( min_value: Optional[Union[int, float]] = None, max_value: Optional[Union[int, float]] = None, ) -> Callable[[Union[int, float]], None]: """Return a validator that accepts numeric ``value`` within ``[min_value, max_value]``.""" def check(value: Union[int, float]) -> None: if min_value is not None and value < min_value: raise ValidationError( f"expected value >= {min_value!r}, got {value!r}", ) if max_value is not None and value > max_value: raise ValidationError( f"expected value <= {max_value!r}, got {value!r}", ) return check
[docs] def non_empty_str(value: Any) -> None: """Reject ``None``, non-strings, or blank/whitespace-only strings.""" if not isinstance(value, str) or not value.strip(): raise ValidationError("expected non-empty string")
# Practical ``user@host`` check (not full RFC 5322 / internationalized email). _EMAIL_RE = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
[docs] def is_valid_email(value: Any) -> None: """Reject values that are not plausible ``user@domain`` mailbox strings. Uses a simple ASCII pattern suitable for post-parse validation only. It does not implement full RFC 5322 (no quoted-string local parts, no comments) and is not a deliverability or security check. :raises ValidationError: If ``value`` is not a non-empty string matching the pattern. """ if not isinstance(value, str) or not value.strip(): raise ValidationError("expected a non-empty string for email") if not _EMAIL_RE.fullmatch(value.strip()): raise ValidationError("invalid email address")
[docs] def is_valid_url(value: Any) -> None: """Reject values that are not ``http`` or ``https`` URLs with a non-empty host. Uses :func:`urllib.parse.urlparse`. This does not verify reachability, TLS, or that the resource exists. :raises ValidationError: If the string is empty, the scheme is not ``http``/``https``, or the parsed URL has no network location (host). """ if not isinstance(value, str) or not value.strip(): raise ValidationError("expected a non-empty string for URL") parsed = urlparse(value.strip()) if parsed.scheme not in ("http", "https"): raise ValidationError("expected URL with http or https scheme") if not parsed.netloc: raise ValidationError("invalid URL: missing host")
def _validation_source_exclusive( *, validators: Optional[ValidatorMap], pipeline: Optional[ValidationPipeline], ) -> None: if validators is not None and pipeline is not None: raise ValueError("pass only one of validators= or pipeline=") def validation_source_exclusive( *, validators: Optional[ValidatorMap], pipeline: Optional["ValidationPipeline"], ) -> None: if validators is not None and pipeline is not None: raise ValueError("pass only one of validators= or pipeline=") def post_parse_validate( result: Optional[ParseResult], *, validators: Optional[ValidatorMap] = None, pipeline: Optional["ValidationPipeline"] = None, validation_mode: ValidationMode = "strict", ) -> Optional[ParseResult]: """Run optional validators or pipeline after parse (shared by parse / ValidatedParser).""" validation_source_exclusive(validators=validators, pipeline=pipeline) if pipeline is not None: return pipeline.apply(result, mode=validation_mode) return apply_validators(result, validators, mode=validation_mode)