Skip to content

API Reference

Core

policy

Policy engine — registration and lookup of authorization policies.

policy(resource_type, action, *, predicate=None, registry=None, query_only=False)

Decorator that registers a policy function for (model, action).

The decorated function receives an actor and returns a ColumnElement[bool] filter expression.

When predicate is provided, the predicate's __call__ is registered as the policy function instead of the decorated function body. The decorated function is still used for its name and docstring.

Parameters:

Name Type Description Default
resource_type type

The SQLAlchemy model class.

required
action str

The action string (e.g., "read", "update").

required
predicate Predicate | None

Optional composable predicate to use as the policy function.

None
registry PolicyRegistry | None

Optional custom registry. Defaults to the global registry.

None
query_only bool

If True, this policy uses SQL constructs not supported by the in-memory evaluator. can() and authorize() will raise QueryOnlyPolicyError instead of attempting evaluation. Defaults to False.

False

Returns:

Type Description
Callable[[F], F]

A decorator that registers the function and returns it unchanged.

Example::

@policy(Post, "read")
def post_read(actor: User) -> ColumnElement[bool]:
    return or_(
        Post.is_published == True,
        Post.author_id == actor.id,
    )

@policy(Post, "update", predicate=is_author)
def post_update(actor: User) -> ColumnElement[bool]:
    ...

@policy(Post, "read", query_only=True)
def complex_read(actor: User) -> ColumnElement[bool]:
    return func.lower(Post.category) == "public"
Source code in src/sqla_authz/policy/_decorator.py
def policy(
    resource_type: type,
    action: str,
    *,
    predicate: Predicate | None = None,
    registry: PolicyRegistry | None = None,
    query_only: bool = False,
) -> Callable[[F], F]:
    """Build a decorator that records a policy for ``(resource_type, action)``.

    The function being decorated takes an actor and produces a
    ``ColumnElement[bool]`` filter expression.  It is handed back to the
    caller untouched, so it remains directly callable.

    If ``predicate`` is supplied, the predicate object is what gets
    registered as the policy callable; the decorated function then only
    contributes its ``__name__`` and docstring to the registration.

    Args:
        resource_type: The SQLAlchemy model class the policy protects.
        action: The action string (e.g., "read", "update").
        predicate: Optional composable predicate registered in place of the
            decorated function body.
        registry: Registry to register into.  When ``None``, the registry
            returned by ``get_default_registry()`` is used.
        query_only: Forwarded to the registry.  Marks the policy as relying
            on SQL constructs the in-memory evaluator does not support, so
            ``can()`` and ``authorize()`` raise ``QueryOnlyPolicyError``
            rather than evaluating it.  Defaults to ``False``.

    Returns:
        A decorator that performs the registration and returns the
        decorated function unchanged.

    Example::

        @policy(Post, "read")
        def post_read(actor: User) -> ColumnElement[bool]:
            return or_(
                Post.is_published == True,
                Post.author_id == actor.id,
            )

        @policy(Post, "update", predicate=is_author)
        def post_update(actor: User) -> ColumnElement[bool]:
            ...

        @policy(Post, "read", query_only=True)
        def complex_read(actor: User) -> ColumnElement[bool]:
            return func.lower(Post.category) == "public"
    """

    def decorator(fn: F) -> F:
        # The registry is resolved when the decorator is applied, not when
        # ``policy(...)`` itself is called.
        if registry is None:
            destination = get_default_registry()
        else:
            destination = registry

        # A supplied predicate takes the place of the function body.
        registered_callable: Callable[..., ColumnElement[bool]]
        if predicate is None:
            registered_callable = fn
        else:
            registered_callable = predicate

        destination.register(
            resource_type,
            action,
            registered_callable,
            name=fn.__name__,
            # Functions without a docstring register an empty description.
            description=fn.__doc__ or "",
            query_only=query_only,
        )
        return fn

    return decorator

authorize_query(stmt, *, actor, action, registry=None)

Apply authorization filters to a SQLAlchemy SELECT statement.

Looks up registered policies for the statement's entities and the given action, evaluates them with the actor, and applies the resulting WHERE clauses.

Parameters:

Name Type Description Default
stmt Select[Any]

A SQLAlchemy 2.0 Select statement.

required
actor ActorLike

The user/principal. Must satisfy ActorLike protocol.

required
action str

The action being performed (e.g., "read", "update").

required
registry PolicyRegistry | None

Optional custom registry. Defaults to the global registry.

None

Returns:

Type Description
Select[Any]

A new Select with authorization filters applied.

Example::

stmt = select(Post).where(Post.category == "tech")
stmt = authorize_query(stmt, actor=current_user, action="read")
# SQL: SELECT ... WHERE category = 'tech'
#      AND (is_published OR author_id = :id)
Source code in src/sqla_authz/compiler/_query.py
def authorize_query(
    stmt: Select[Any],
    *,
    actor: ActorLike,
    action: str,
    registry: PolicyRegistry | None = None,
) -> Select[Any]:
    """Apply authorization filters to a SQLAlchemy SELECT statement.

    Looks up registered policies for the statement's entities and the
    given action, evaluates them with the actor, and applies the
    resulting WHERE clauses.  Each distinct entity is filtered exactly
    once, even when several of its columns are selected individually.

    Args:
        stmt: A SQLAlchemy 2.0 Select statement.
        actor: The user/principal. Must satisfy ActorLike protocol.
        action: The action being performed (e.g., "read", "update").
        registry: Optional custom registry. Defaults to the global registry.

    Returns:
        A new Select with authorization filters applied.

    Example::

        stmt = select(Post).where(Post.category == "tech")
        stmt = authorize_query(stmt, actor=current_user, action="read")
        # SQL: SELECT ... WHERE category = 'tech'
        #      AND (is_published OR author_id = :id)
    """
    target_registry = registry if registry is not None else get_default_registry()

    check_unknown_action(target_registry, action)

    # ``column_descriptions`` has one entry per selected column, so a
    # statement such as ``select(Post.id, Post.title)`` reports ``Post``
    # twice.  Remember which entities were already handled so the policy is
    # evaluated once per entity and no duplicate WHERE clause is emitted.
    handled_entities: list[Any] = []

    desc_list: list[dict[str, Any]] = stmt.column_descriptions
    for desc in desc_list:
        entity: type | None = desc.get("entity")
        if entity is None:
            # Plain SQL expressions (e.g. func.count()) carry no entity.
            continue

        # Compare by identity so nothing is assumed about how entity
        # objects implement hashing or equality.
        if any(entity is seen for seen in handled_entities):
            continue
        handled_entities.append(entity)

        filter_expr = evaluate_policies(target_registry, entity, action, actor)
        stmt = stmt.where(filter_expr)

    return stmt

can(actor, action, resource, *, registry=None, session=None)

Check if actor can perform action on a specific resource instance.

Returns True if the policy filter matches the resource, False otherwise. The real application database is never touched — the filter expression is evaluated in-memory by walking the SQLAlchemy ColumnElement AST.

Parameters:

Name Type Description Default
actor ActorLike

The user/principal performing the action.

required
action str

The action string (e.g., "read", "update").

required
resource DeclarativeBase

A mapped SQLAlchemy model instance.

required
registry PolicyRegistry | None

Optional custom registry. Defaults to the global registry.

None
session Session | None

Optional session (reserved for future use).

None

Returns:

Type Description
bool

True if access is granted, False if denied.

Raises:

Type Description
QueryOnlyPolicyError

If any matching policy is marked query_only=True.

Example::

post = session.get(Post, 1)
if can(current_user, "read", post):
    return post
Source code in src/sqla_authz/_checks.py
def can(
    actor: ActorLike,
    action: str,
    resource: DeclarativeBase,
    *,
    registry: PolicyRegistry | None = None,
    session: Session | None = None,
) -> bool:
    """Decide whether *actor* may perform *action* on one resource instance.

    The policies registered for ``type(resource)`` and *action* are combined
    into a single filter expression, which is then evaluated against the
    instance in memory (by walking the SQLAlchemy ColumnElement AST) rather
    than by querying the application database.

    Args:
        actor: The user/principal performing the action.
        action: The action string (e.g., ``"read"``, ``"update"``).
        resource: A mapped SQLAlchemy model instance.
        registry: Optional custom registry.  Defaults to the global registry.
        session: Optional session (reserved for future use).

    Returns:
        ``True`` when the policy filter matches the resource, ``False``
        when it does not.

    Raises:
        QueryOnlyPolicyError: If any matching policy is marked
            ``query_only=True``.

    Example::

        post = session.get(Post, 1)
        if can(current_user, "read", post):
            return post
    """
    if registry is None:
        active_registry = get_default_registry()
    else:
        active_registry = registry

    check_unknown_action(active_registry, action)

    model = type(resource)
    candidates = active_registry.lookup(model, action)

    # Query-only policies cannot be evaluated in memory, so refuse up front
    # and report every offending policy by name.
    query_only_names = [p.name for p in candidates if p.query_only]
    if query_only_names:
        raise QueryOnlyPolicyError(
            resource_type=model.__name__,
            action=action,
            query_only_policies=query_only_names,
        )

    # Hand over the policies already fetched above.
    combined_filter = evaluate_policies(
        active_registry, model, action, actor, policies=candidates
    )
    return eval_expression(combined_filter, resource)

can_create(actor, resource, *, registry=None, session=None)

Check whether actor can create resource in its current state.

This is a convenience wrapper around can(..., action="create") for pending or transient ORM instances. Populate the object first, then call can_create() before flushing or committing.

Parameters:

Name Type Description Default
actor ActorLike

The user/principal attempting the create.

required
resource DeclarativeBase

The pending mapped SQLAlchemy model instance.

required
registry PolicyRegistry | None

Optional custom registry. Defaults to the global registry.

None
session Session | None

Optional session (reserved for future use).

None

Returns:

Type Description
bool

True if creation is allowed, False otherwise.

Source code in src/sqla_authz/_checks.py
def can_create(
    actor: ActorLike,
    resource: DeclarativeBase,
    *,
    registry: PolicyRegistry | None = None,
    session: Session | None = None,
) -> bool:
    """Report whether *actor* may create *resource* as it currently stands.

    Thin shortcut for ``can(..., action="create")`` aimed at pending or
    transient ORM instances: fill in the object's attributes first, then
    call ``can_create()`` before flushing or committing.

    Args:
        actor: The user/principal attempting the create.
        resource: The pending mapped SQLAlchemy model instance.
        registry: Optional custom registry. Defaults to the global registry.
        session: Optional session (reserved for future use).

    Returns:
        ``True`` if creation is allowed, ``False`` otherwise.
    """
    # All arguments are forwarded untouched; only the action is fixed.
    allowed = can(
        actor=actor,
        action=CREATE,
        resource=resource,
        registry=registry,
        session=session,
    )
    return allowed

authorize(actor, action, resource, *, registry=None, message=None, session=None)

Assert that actor is authorized to perform action on resource.

Raises sqla_authz.exceptions.AuthorizationDenied when access is denied. Returns None on success.

Parameters:

Name Type Description Default
actor ActorLike

The user/principal performing the action.

required
action str

The action string (e.g., "read", "update").

required
resource DeclarativeBase

A mapped SQLAlchemy model instance.

required
registry PolicyRegistry | None

Optional custom registry. Defaults to the global registry.

None
message str | None

Optional custom error message for the exception.

None
session Session | None

Optional session (reserved for future use).

None

Raises:

Type Description
AuthorizationDenied

If the actor is not authorized.

QueryOnlyPolicyError

If any matching policy is marked query_only=True.

Example::

authorize(current_user, "update", post)  # raises if denied
Source code in src/sqla_authz/_checks.py
def authorize(
    actor: ActorLike,
    action: str,
    resource: DeclarativeBase,
    *,
    registry: PolicyRegistry | None = None,
    message: str | None = None,
    session: Session | None = None,
) -> None:
    """Require that *actor* may perform *action* on *resource*.

    Delegates the decision to ``can()``.  On success nothing is returned;
    on denial :class:`~sqla_authz.exceptions.AuthorizationDenied` is raised.

    Args:
        actor: The user/principal performing the action.
        action: The action string (e.g., ``"read"``, ``"update"``).
        resource: A mapped SQLAlchemy model instance.
        registry: Optional custom registry.  Defaults to the global registry.
        message: Optional custom error message for the exception.
        session: Optional session (reserved for future use).

    Raises:
        AuthorizationDenied: If the actor is not authorized.
        QueryOnlyPolicyError: If any matching policy is marked
            ``query_only=True``.

    Example::

        authorize(current_user, "update", post)  # raises if denied
    """
    permitted = can(actor, action, resource, registry=registry, session=session)
    if permitted:
        return

    # Only the model's class name is reported, not the instance itself.
    raise AuthorizationDenied(
        actor=actor,
        action=action,
        resource_type=type(resource).__name__,
        message=message,
    )

authorize_create(actor, resource, *, registry=None, message=None, session=None)

Assert that actor can create resource in its current state.

This is a convenience wrapper around authorize(..., action="create") for pending or transient ORM instances.

Parameters:

Name Type Description Default
actor ActorLike

The user/principal attempting the create.

required
resource DeclarativeBase

The pending mapped SQLAlchemy model instance.

required
registry PolicyRegistry | None

Optional custom registry. Defaults to the global registry.

None
message str | None

Optional custom error message for the exception.

None
session Session | None

Optional session (reserved for future use).

None
Source code in src/sqla_authz/_checks.py
def authorize_create(
    actor: ActorLike,
    resource: DeclarativeBase,
    *,
    registry: PolicyRegistry | None = None,
    message: str | None = None,
    session: Session | None = None,
) -> None:
    """Require that *actor* may create *resource* as it currently stands.

    Thin shortcut for ``authorize(..., action="create")`` aimed at pending
    or transient ORM instances.

    Args:
        actor: The user/principal attempting the create.
        resource: The pending mapped SQLAlchemy model instance.
        registry: Optional custom registry. Defaults to the global registry.
        message: Optional custom error message for the exception.
        session: Optional session (reserved for future use).
    """
    # All arguments are forwarded untouched; only the action is fixed.
    authorize(
        actor=actor,
        action=CREATE,
        resource=resource,
        registry=registry,
        message=message,
        session=session,
    )

configure(*, on_missing_policy=None, default_action=None, log_policy_decisions=None, on_unloaded_relationship=None, strict_mode=None, on_unprotected_get=None, on_text_query=None, on_skip_authz=None, audit_bypasses=None, intercept_creates=None, intercept_updates=None, intercept_deletes=None, on_write_denied=None, on_unknown_action=None)

Update the global configuration by merging overrides.

Only non-None values are applied. Returns the new global config.

Parameters:

Name Type Description Default
on_missing_policy OnMissingPolicy | None

Set to "deny" or "raise".

None
default_action str | None

Set the default action string.

None
log_policy_decisions bool | None

Enable/disable audit logging of policy decisions.

None
on_unloaded_relationship OnUnloadedRelationship | None

Set to "deny", "raise", or "warn".

None
strict_mode bool | None

Enable strict mode (applies convenience defaults).

None
on_unprotected_get OnBypassAction | None

Set to "ignore", "warn", or "raise".

None
on_text_query OnBypassAction | None

Set to "ignore", "warn", or "raise".

None
on_skip_authz OnSkipAuthz | None

Set to "ignore", "warn", or "log".

None
audit_bypasses bool | None

Enable/disable bypass audit logging.

None
intercept_creates bool | None

Enable interception of ORM create/insert flushes.

None
intercept_updates bool | None

Enable interception of UPDATE statements.

None
intercept_deletes bool | None

Enable interception of DELETE statements.

None
on_write_denied OnWriteDenied | None

Set to "raise" or "filter".

None
on_unknown_action OnUnknownAction | None

Set to "ignore", "warn", or "raise".

None

Returns:

Type Description
AuthzConfig

The updated global AuthzConfig.

Example::

configure(on_missing_policy="raise")
# Now missing policies raise NoPolicyError instead of denying
Source code in src/sqla_authz/config/_config.py
def configure(
    *,
    on_missing_policy: OnMissingPolicy | None = None,
    default_action: str | None = None,
    log_policy_decisions: bool | None = None,
    on_unloaded_relationship: OnUnloadedRelationship | None = None,
    strict_mode: bool | None = None,
    on_unprotected_get: OnBypassAction | None = None,
    on_text_query: OnBypassAction | None = None,
    on_skip_authz: OnSkipAuthz | None = None,
    audit_bypasses: bool | None = None,
    intercept_creates: bool | None = None,
    intercept_updates: bool | None = None,
    intercept_deletes: bool | None = None,
    on_write_denied: OnWriteDenied | None = None,
    on_unknown_action: OnUnknownAction | None = None,
) -> AuthzConfig:
    """Merge the given overrides into the global configuration.

    Arguments left at ``None`` do not change the corresponding setting.
    The merged configuration replaces the module-level global and is also
    returned.

    Args:
        on_missing_policy: Set to ``"deny"`` or ``"raise"``.
        default_action: Set the default action string.
        log_policy_decisions: Enable/disable audit logging of policy decisions.
        on_unloaded_relationship: Set to ``"deny"``, ``"raise"``, or ``"warn"``.
        strict_mode: Enable strict mode (applies convenience defaults).
        on_unprotected_get: Set to ``"ignore"``, ``"warn"``, or ``"raise"``.
        on_text_query: Set to ``"ignore"``, ``"warn"``, or ``"raise"``.
        on_skip_authz: Set to ``"ignore"``, ``"warn"``, or ``"log"``.
        audit_bypasses: Enable/disable bypass audit logging.
        intercept_creates: Enable interception of ORM create/insert flushes.
        intercept_updates: Enable interception of UPDATE statements.
        intercept_deletes: Enable interception of DELETE statements.
        on_write_denied: Set to ``"raise"`` or ``"filter"``.
        on_unknown_action: Set to ``"ignore"``, ``"warn"``, or ``"raise"``.

    Returns:
        The updated global ``AuthzConfig``.

    Example::

        configure(on_missing_policy="raise")
        # Now missing policies raise NoPolicyError instead of denying
    """
    global _global_config

    # Every argument is passed straight through to ``merge``.
    updated = _global_config.merge(
        on_missing_policy=on_missing_policy,
        default_action=default_action,
        log_policy_decisions=log_policy_decisions,
        on_unloaded_relationship=on_unloaded_relationship,
        strict_mode=strict_mode,
        on_unprotected_get=on_unprotected_get,
        on_text_query=on_text_query,
        on_skip_authz=on_skip_authz,
        audit_bypasses=audit_bypasses,
        intercept_creates=intercept_creates,
        intercept_updates=intercept_updates,
        intercept_deletes=intercept_deletes,
        on_write_denied=on_write_denied,
        on_unknown_action=on_unknown_action,
    )

    # Rebind the module-level config to the merged result.
    _global_config = updated
    return updated

Actions

actions

Action constants and factory for sqla-authz.

Provides well-known action constants for use with @policy, authorize_query, can, and other APIs. Using constants instead of bare strings gives IDE autocomplete and prevents typos.

Example::

from sqla_authz.actions import READ, UPDATE, action

PUBLISH = action("publish")

@policy(Post, READ)
def post_read(actor: User) -> ColumnElement[bool]:
    return Post.is_published == True

@policy(Post, PUBLISH)
def post_publish(actor: User) -> ColumnElement[bool]:
    return Post.author_id == actor.id

READ = 'read' module-attribute

Built-in action constant for read operations.

UPDATE = 'update' module-attribute

Built-in action constant for update operations.

DELETE = 'delete' module-attribute

Built-in action constant for delete operations.

CREATE = 'create' module-attribute

Built-in action constant for create operations.

action(name)

Create a validated action name for use in policies and queries.

Validates that the name follows conventions: non-empty, lowercase, alphabetic (underscores allowed). This catches common mistakes like action("Read Posts") or action("") at definition time.

Custom actions created with this factory work identically to the built-in constants — they're plain strings.

Parameters:

Name Type Description Default
name str

The action name. Must be lowercase alphabetic with optional underscores (e.g., "approve", "soft_delete").

required

Returns:

Type Description
str

The validated action name string.

Raises:

Type Description
ValueError

If the name doesn't follow naming conventions.

Example::

APPROVE = action("approve")
SOFT_DELETE = action("soft_delete")
PUBLISH = action("publish")

@policy(Article, PUBLISH)
def article_publish(actor: User) -> ColumnElement[bool]:
    return Article.author_id == actor.id
Source code in src/sqla_authz/actions.py
def action(name: str) -> str:
    """Validate *name* as an action name and return it unchanged.

    A valid name is non-empty, lowercase and alphabetic, with underscores
    allowed as separators.  Validating at definition time surfaces mistakes
    such as ``action("Read Posts")`` or ``action("")`` immediately.

    The returned value is the same plain string that was passed in, so
    custom actions behave exactly like the built-in constants.

    Args:
        name: The action name. Must be lowercase alphabetic with
            optional underscores (e.g., ``"approve"``, ``"soft_delete"``).

    Returns:
        The validated action name string.

    Raises:
        ValueError: If the name is empty, contains characters other than
            letters and underscores, or is not lowercase.

    Example::

        APPROVE = action("approve")
        SOFT_DELETE = action("soft_delete")
        PUBLISH = action("publish")

        @policy(Article, PUBLISH)
        def article_publish(actor: User) -> ColumnElement[bool]:
            return Article.author_id == actor.id
    """
    if not name:
        raise ValueError("Action name must be non-empty")

    # Strip the underscores; what remains must consist solely of letters.
    # A name made only of underscores leaves nothing and is rejected here.
    letters = "".join(name.split("_"))
    if not letters.isalpha():
        raise ValueError(
            f"Action name must contain only lowercase letters and underscores, got {name!r}"
        )

    if name.islower():
        return name

    # Letters only but wrong case: suggest the lowercase spelling.
    raise ValueError(
        f"Action name must be lowercase, got {name!r}. Use {name.lower()!r} instead."
    )

Types

ActorLike

Bases: Protocol

Structural type for authorization actors.

Any object with an id attribute satisfies this protocol. Works with SQLAlchemy models, dataclasses, Pydantic models, named tuples — no inheritance required.

Example::

@dataclass
class User:
    id: int
    name: str

user = User(id=1, name="Alice")
assert isinstance(user, ActorLike)

PolicyRegistry()

Registry that maps (model, action) pairs to policy functions.

All public methods are thread-safe via an internal lock.

Example::

registry = PolicyRegistry()
registry.register(Post, "read", my_policy_fn, name="p", description="")
policies = registry.lookup(Post, "read")
Source code in src/sqla_authz/policy/_registry.py
def __init__(self) -> None:
    # Lock held by the registry's methods while they touch the containers.
    self._lock = threading.Lock()
    # Cross-cutting scope registrations; starts empty.
    self._scopes: list[ScopeRegistration] = []
    # Maps a (model, action) pair to the policies registered for it.
    self._policies: dict[tuple[type, str], list[PolicyRegistration]] = {}

register(resource_type, action, fn, *, name, description, validate_signature=True, query_only=False)

Register a policy function for a (model, action) pair.

Multiple policies can be registered for the same key; they will be OR'd together at evaluation time.

Parameters:

Name Type Description Default
resource_type type

The SQLAlchemy model class.

required
action str

The action string (e.g., "read", "update").

required
fn Callable[..., ColumnElement[bool]]

A callable that takes an actor and returns a ColumnElement[bool] filter expression.

required
name str

Human-readable name for the policy (used in logging).

required
description str

Description of the policy (typically the docstring).

required
validate_signature bool

If True (default), validate that fn accepts at least one positional parameter.

True
query_only bool

If True, marks this policy as query-only. can() and authorize() will raise QueryOnlyPolicyError instead of attempting in-memory evaluation. Defaults to False.

False

Returns:

Type Description
None

None

Example::

registry = PolicyRegistry()
registry.register(
    Post, "read",
    lambda actor: Post.is_published == True,
    name="published_only",
    description="Allow reading published posts",
)
Source code in src/sqla_authz/policy/_registry.py
def register(
    self,
    resource_type: type,
    action: str,
    fn: Callable[..., ColumnElement[bool]],
    *,
    name: str,
    description: str,
    validate_signature: bool = True,
    query_only: bool = False,
) -> None:
    """Add a policy callable under the ``(resource_type, action)`` key.

    Registrations accumulate: registering again for the same key appends
    to the existing list rather than replacing it, and the policies are
    OR'd together at evaluation time.

    Args:
        resource_type: The SQLAlchemy model class.
        action: The action string (e.g., ``"read"``, ``"update"``).
        fn: A callable that takes an actor and returns a
            ``ColumnElement[bool]`` filter expression.
        name: Human-readable name for the policy (used in logging).
        description: Description of the policy (typically the docstring).
        validate_signature: If ``True`` (default), *fn* is passed through
            ``_validate_policy_signature`` before being stored.
        query_only: If ``True``, marks this policy as query-only.
            ``can()`` and ``authorize()`` will raise
            ``QueryOnlyPolicyError`` instead of attempting in-memory
            evaluation. Defaults to ``False``.

    Returns:
        None

    Example::

        registry = PolicyRegistry()
        registry.register(
            Post, "read",
            lambda actor: Post.is_published == True,
            name="published_only",
            description="Allow reading published posts",
        )
    """
    # Validate before building anything so a rejected callable leaves the
    # registry untouched.
    if validate_signature:
        _validate_policy_signature(fn)

    entry = PolicyRegistration(
        resource_type=resource_type,
        action=action,
        fn=fn,
        name=name,
        description=description,
        query_only=query_only,
    )

    with self._lock:
        # Create the per-key list on first use, then append to it.
        self._policies.setdefault((resource_type, action), []).append(entry)

lookup(resource_type, action)

Look up all policies for a (model, action) pair.

Returns a copy of the internal list so callers cannot mutate the registry state.

Parameters:

Name Type Description Default
resource_type type

The SQLAlchemy model class to look up.

required
action str

The action string to look up.

required

Returns:

Type Description
list[PolicyRegistration]

A list of PolicyRegistration objects. Empty list if no policies are registered for the given key.

Example::

policies = registry.lookup(Post, "read")
for p in policies:
    print(p.name)
Source code in src/sqla_authz/policy/_registry.py
def lookup(self, resource_type: type, action: str) -> list[PolicyRegistration]:
    """Fetch every policy registered for a (model, action) pair.

    The result is a fresh list, so mutating it does not affect the
    registry's internal state.

    Args:
        resource_type: The SQLAlchemy model class to look up.
        action: The action string to look up.

    Returns:
        A list of ``PolicyRegistration`` objects, or an empty list when
        nothing is registered for the given key.

    Example::

        policies = registry.lookup(Post, "read")
        for p in policies:
            print(p.name)
    """
    key = (resource_type, action)
    with self._lock:
        # Unpack into a new list so callers never receive the stored one.
        return [*self._policies.get(key, ())]

has_policy(resource_type, action)

Check whether at least one policy exists for (model, action).

Parameters:

Name Type Description Default
resource_type type

The SQLAlchemy model class.

required
action str

The action string.

required

Returns:

Type Description
bool

True if at least one policy is registered, False otherwise.

Example::

if not registry.has_policy(Post, "delete"):
    print("No delete policy for Post")
Source code in src/sqla_authz/policy/_registry.py
def has_policy(self, resource_type: type, action: str) -> bool:
    """Tell whether the (model, action) key is present in the registry.

    Args:
        resource_type: The SQLAlchemy model class.
        action: The action string.

    Returns:
        ``True`` if at least one policy is registered, ``False`` otherwise.

    Example::

        if not registry.has_policy(Post, "delete"):
            print("No delete policy for Post")
    """
    key = (resource_type, action)
    with self._lock:
        present = key in self._policies
    return present

registered_entities(action)

Return all entity types that have policies registered for action.

Useful for applying loader criteria to relationship loads that are not part of the main query.

Parameters:

Name Type Description Default
action str

The action string to filter by.

required

Returns:

Type Description
set[type]

A set of model classes with registered policies for the action.

Example::

entities = registry.registered_entities("read")
# e.g., {Post, User}
Source code in src/sqla_authz/policy/_registry.py
def registered_entities(self, action: str) -> set[type]:
    """Collect the entity types that have a policy key for *action*.

    Useful for applying loader criteria to relationship loads
    that are not part of the main query.

    Args:
        action: The action string to filter by.

    Returns:
        A set of model classes with registered policies for the action.

    Example::

        entities = registry.registered_entities("read")
        # e.g., {Post, User}
    """
    matches: set[type] = set()
    with self._lock:
        # Keys are (model, action) tuples; keep the model of each match.
        for model, registered_action in self._policies:
            if registered_action == action:
                matches.add(model)
    return matches

register_scope(scope_reg)

Register a cross-cutting scope filter.

Parameters:

Name Type Description Default
scope_reg ScopeRegistration

A ScopeRegistration containing the scope function, target models, and optional action restriction.

required

Example::

from sqla_authz.policy._scope import ScopeRegistration
reg = ScopeRegistration(
    applies_to=(Post, Comment),
    fn=my_scope_fn,
    name="tenant",
    description="Tenant isolation",
    actions=None,
)
registry.register_scope(reg)
Source code in src/sqla_authz/policy/_registry.py
def register_scope(self, scope_reg: ScopeRegistration) -> None:
    """Append a cross-cutting scope filter to the registry.

    Args:
        scope_reg: A ``ScopeRegistration`` containing the scope
            function, target models, and optional action restriction.

    Example::

        from sqla_authz.policy._scope import ScopeRegistration
        reg = ScopeRegistration(
            applies_to=(Post, Comment),
            fn=my_scope_fn,
            name="tenant",
            description="Tenant isolation",
            actions=None,
        )
        registry.register_scope(reg)
    """
    with self._lock:
        # Scopes are kept in the order they were registered.
        scopes = self._scopes
        scopes.append(scope_reg)

lookup_scopes(resource_type, action=None)

Look up all scopes that apply to a model and optional action.

Parameters:

Name Type Description Default
resource_type type

The SQLAlchemy model class.

required
action str | None

Optional action string to filter by. Scopes with no actions restriction always match. Scopes with an actions list only match if action is included.

None

Returns:

Type Description
list[ScopeRegistration]

A list of matching ScopeRegistration objects.

Example::

scopes = registry.lookup_scopes(Post, "read")
Source code in src/sqla_authz/policy/_registry.py
def lookup_scopes(
    self, resource_type: type, action: str | None = None
) -> list[ScopeRegistration]:
    """Collect the scopes covering a model, optionally narrowed by action.

    Args:
        resource_type: The SQLAlchemy model class.
        action: Action string to narrow by. A scope whose ``actions``
            is ``None`` matches every action; a scope with an
            ``actions`` restriction matches only when *action* is one
            of them. When *action* is ``None`` no action narrowing is
            applied at all.

    Returns:
        The matching ``ScopeRegistration`` objects, in the order they
        are stored in the registry.

    Example::

        scopes = registry.lookup_scopes(Post, "read")
    """

    def covers(candidate: ScopeRegistration) -> bool:
        # A scope must target the model before its action list matters.
        if resource_type not in candidate.applies_to:
            return False
        return action is None or candidate.actions is None or action in candidate.actions

    with self._lock:
        return [candidate for candidate in self._scopes if covers(candidate)]

has_scopes(resource_type)

Check whether at least one scope exists for a model.

Parameters:

Name Type Description Default
resource_type type

The SQLAlchemy model class.

required

Returns:

Type Description
bool

True if at least one scope covers this model.

Example::

if registry.has_scopes(Post):
    print("Post has scope coverage")
Source code in src/sqla_authz/policy/_registry.py
def has_scopes(self, resource_type: type) -> bool:
    """Report whether any registered scope targets the given model.

    Only the ``applies_to`` membership of each scope is consulted; the
    scope's action restriction is not taken into account.

    Args:
        resource_type: The SQLAlchemy model class.

    Returns:
        ``True`` as soon as one scope lists this model, ``False`` if
        none does.

    Example::

        if registry.has_scopes(Post):
            print("Post has scope coverage")
    """
    with self._lock:
        for registration in self._scopes:
            if resource_type in registration.applies_to:
                return True
        return False

known_actions()

Return all action strings that have registered policies.

Thread-safe. Useful for introspection and validation.

Returns:

Type Description
set[str]

A set of action strings.

Example::

actions = registry.known_actions()
# e.g., {"read", "update", "delete"}
Source code in src/sqla_authz/policy/_registry.py
def known_actions(self) -> set[str]:
    """Collect every action string that appears in the policy table.

    The policy table is read while the registry lock is held. Useful
    for introspection and validation.

    Returns:
        A set of action strings (empty when no policies are registered).

    Example::

        actions = registry.known_actions()
        # e.g., {"read", "update", "delete"}
    """
    with self._lock:
        actions: set[str] = set()
        # Policy keys are (model, action) pairs; only the action is kept.
        for _model, registered_action in self._policies:
            actions.add(registered_action)
        return actions

known_actions_for(resource_type)

Return all action strings registered for a specific model.

Parameters:

Name Type Description Default
resource_type type

The SQLAlchemy model class.

required

Returns:

Type Description
set[str]

A set of action strings registered for that model.

Example::

actions = registry.known_actions_for(Post)
# e.g., {"read", "update"}
Source code in src/sqla_authz/policy/_registry.py
def known_actions_for(self, resource_type: type) -> set[str]:
    """Collect the action strings registered for one specific model.

    Models are compared by identity, so policies keyed on a different
    class (including a subclass or base class) are not included.

    Args:
        resource_type: The SQLAlchemy model class.

    Returns:
        A set of action strings registered for that model (empty when
        there are none).

    Example::

        actions = registry.known_actions_for(Post)
        # e.g., {"read", "update"}
    """
    with self._lock:
        matching: set[str] = set()
        for model, registered_action in self._policies:
            if model is resource_type:
                matching.add(registered_action)
        return matching

clear()

Remove all registered policies and scopes.

Primarily useful in test teardown to reset the registry state between tests.

Returns:

Type Description
None

None

Example::

registry.clear()
assert registry.lookup(Post, "read") == []
Source code in src/sqla_authz/policy/_registry.py
def clear(self) -> None:
    """Empty the registry of all policies and all scopes.

    Both containers are cleared in place while the registry lock is
    held. Primarily useful in test teardown to reset the registry
    state between tests.

    Returns:
        None

    Example::

        registry.clear()
        assert registry.lookup(Post, "read") == []
    """
    with self._lock:
        for store in (self._policies, self._scopes):
            store.clear()

Configuration

AuthzConfig(on_missing_policy='deny', default_action='read', log_policy_decisions=False, on_unloaded_relationship='deny', strict_mode=False, on_unprotected_get='ignore', on_text_query='ignore', on_skip_authz='ignore', audit_bypasses=False, intercept_creates=False, intercept_updates=False, intercept_deletes=False, on_write_denied='raise', on_unknown_action='ignore') dataclass

Layered configuration with merge semantics (global -> session -> query).

Attributes:

Name Type Description
on_missing_policy OnMissingPolicy

Behavior when no policy is registered. "deny" returns zero rows (WHERE FALSE). "raise" raises NoPolicyError.

default_action str

The default action string used when none is specified.

Example::

config = AuthzConfig(on_missing_policy="raise")
merged = config.merge(default_action="update")

merge(*, on_missing_policy=None, default_action=None, log_policy_decisions=None, on_unloaded_relationship=None, strict_mode=None, on_unprotected_get=None, on_text_query=None, on_skip_authz=None, audit_bypasses=None, intercept_creates=None, intercept_updates=None, intercept_deletes=None, on_write_denied=None, on_unknown_action=None)

Return a new config with non-None overrides applied.

Parameters:

Name Type Description Default
on_missing_policy OnMissingPolicy | None

Override for on_missing_policy (ignored if None).

None
default_action str | None

Override for default_action (ignored if None).

None
log_policy_decisions bool | None

Override for log_policy_decisions (ignored if None).

None
on_unloaded_relationship OnUnloadedRelationship | None

Override for on_unloaded_relationship (ignored if None).

None
strict_mode bool | None

Override for strict_mode (ignored if None).

None
on_unprotected_get OnBypassAction | None

Override for on_unprotected_get (ignored if None).

None
on_text_query OnBypassAction | None

Override for on_text_query (ignored if None).

None
on_skip_authz OnSkipAuthz | None

Override for on_skip_authz (ignored if None).

None
audit_bypasses bool | None

Override for audit_bypasses (ignored if None).

None
intercept_creates bool | None

Override for intercept_creates (ignored if None).

None
intercept_updates bool | None

Override for intercept_updates (ignored if None).

None
intercept_deletes bool | None

Override for intercept_deletes (ignored if None).

None
on_write_denied OnWriteDenied | None

Override for on_write_denied (ignored if None).

None
on_unknown_action OnUnknownAction | None

Override for on_unknown_action (ignored if None).

None

Returns:

Type Description
AuthzConfig

A new AuthzConfig with overrides merged.

Example::

base = AuthzConfig()
session_cfg = base.merge(on_missing_policy="raise")
query_cfg = session_cfg.merge(default_action="update")
Source code in src/sqla_authz/config/_config.py
def merge(
    self,
    *,
    on_missing_policy: OnMissingPolicy | None = None,
    default_action: str | None = None,
    log_policy_decisions: bool | None = None,
    on_unloaded_relationship: OnUnloadedRelationship | None = None,
    strict_mode: bool | None = None,
    on_unprotected_get: OnBypassAction | None = None,
    on_text_query: OnBypassAction | None = None,
    on_skip_authz: OnSkipAuthz | None = None,
    audit_bypasses: bool | None = None,
    intercept_creates: bool | None = None,
    intercept_updates: bool | None = None,
    intercept_deletes: bool | None = None,
    on_write_denied: OnWriteDenied | None = None,
    on_unknown_action: OnUnknownAction | None = None,
) -> AuthzConfig:
    """Build a new config from this one plus every override that is not ``None``.

    Each keyword matches the attribute of the same name. Leaving a
    keyword at ``None`` keeps this config's current value for that
    attribute; any other value is used in the returned config instead.
    ``self`` is only read.

    Args:
        on_missing_policy: Replacement for on_missing_policy; ``None`` keeps the current value.
        default_action: Replacement for default_action; ``None`` keeps the current value.
        log_policy_decisions: Replacement for log_policy_decisions; ``None`` keeps the
            current value.
        on_unloaded_relationship: Replacement for on_unloaded_relationship; ``None`` keeps
            the current value.
        strict_mode: Replacement for strict_mode; ``None`` keeps the current value.
        on_unprotected_get: Replacement for on_unprotected_get; ``None`` keeps the current value.
        on_text_query: Replacement for on_text_query; ``None`` keeps the current value.
        on_skip_authz: Replacement for on_skip_authz; ``None`` keeps the current value.
        audit_bypasses: Replacement for audit_bypasses; ``None`` keeps the current value.
        intercept_creates: Replacement for intercept_creates; ``None`` keeps the current value.
        intercept_updates: Replacement for intercept_updates; ``None`` keeps the current value.
        intercept_deletes: Replacement for intercept_deletes; ``None`` keeps the current value.
        on_write_denied: Replacement for on_write_denied; ``None`` keeps the current value.
        on_unknown_action: Replacement for on_unknown_action; ``None`` keeps the current value.

    Returns:
        A newly constructed ``AuthzConfig`` carrying the merged values.

    Example::

        base = AuthzConfig()
        session_cfg = base.merge(on_missing_policy="raise")
        query_cfg = session_cfg.merge(default_action="update")
    """
    # Fill each omitted override from this instance. ``is None`` (not
    # truthiness) is used so that falsy overrides such as ``False`` win.
    if on_missing_policy is None:
        on_missing_policy = self.on_missing_policy
    if default_action is None:
        default_action = self.default_action
    if log_policy_decisions is None:
        log_policy_decisions = self.log_policy_decisions
    if on_unloaded_relationship is None:
        on_unloaded_relationship = self.on_unloaded_relationship
    if strict_mode is None:
        strict_mode = self.strict_mode
    if on_unprotected_get is None:
        on_unprotected_get = self.on_unprotected_get
    if on_text_query is None:
        on_text_query = self.on_text_query
    if on_skip_authz is None:
        on_skip_authz = self.on_skip_authz
    if audit_bypasses is None:
        audit_bypasses = self.audit_bypasses
    if intercept_creates is None:
        intercept_creates = self.intercept_creates
    if intercept_updates is None:
        intercept_updates = self.intercept_updates
    if intercept_deletes is None:
        intercept_deletes = self.intercept_deletes
    if on_write_denied is None:
        on_write_denied = self.on_write_denied
    if on_unknown_action is None:
        on_unknown_action = self.on_unknown_action

    return AuthzConfig(
        on_missing_policy=on_missing_policy,
        default_action=default_action,
        log_policy_decisions=log_policy_decisions,
        on_unloaded_relationship=on_unloaded_relationship,
        strict_mode=strict_mode,
        on_unprotected_get=on_unprotected_get,
        on_text_query=on_text_query,
        on_skip_authz=on_skip_authz,
        audit_bypasses=audit_bypasses,
        intercept_creates=intercept_creates,
        intercept_updates=intercept_updates,
        intercept_deletes=intercept_deletes,
        on_write_denied=on_write_denied,
        on_unknown_action=on_unknown_action,
    )

Predicates

Predicate(fn, *, name='')

A composable authorization predicate.

Wraps a callable that takes an actor and returns a ColumnElement[bool]. Supports & (AND), | (OR), and ~ (NOT) composition.

Example::

is_published = Predicate(lambda actor: Post.is_published == True)
is_author = Predicate(lambda actor: Post.author_id == actor.id)

combined = is_published | is_author
expr = combined(current_user)  # ColumnElement[bool]
Source code in src/sqla_authz/policy/_predicate.py
def __init__(self, fn: Callable[..., ColumnElement[bool]], *, name: str = "") -> None:
    """Wrap *fn* as a predicate and settle on its display name.

    Args:
        fn: The callable to wrap; annotated as returning a
            ``ColumnElement[bool]``.
        name: Explicit name for the predicate. When empty, the
            callable's ``__name__`` is used, or ``"<anonymous>"`` if
            the callable has no ``__name__``.
    """
    self._fn = fn
    if name:
        self._name = name
    else:
        self._name = getattr(fn, "__name__", "<anonymous>")

name property

The human-readable name of this predicate.

predicate(fn)

Decorator/factory that creates a Predicate from a callable.

Example::

@predicate
def is_published(actor: User) -> ColumnElement[bool]:
    return Post.is_published == True

# Or as a factory:
is_author = predicate(lambda actor: Post.author_id == actor.id)
Source code in src/sqla_authz/policy/_predicate.py
def predicate(fn: Callable[..., ColumnElement[bool]]) -> Predicate:
    """Decorator/factory that creates a Predicate from a callable.

    Naming is delegated to ``Predicate`` itself, so both construction
    paths agree: the predicate is named after ``fn.__name__`` when the
    callable has one (lambdas report ``"<lambda>"``), and
    ``"<anonymous>"`` otherwise (e.g. ``functools.partial`` objects,
    which have no ``__name__``).

    Args:
        fn: The callable to wrap.

    Returns:
        A ``Predicate`` wrapping *fn*.

    Example::

        @predicate
        def is_published(actor: User) -> ColumnElement[bool]:
            return Post.is_published == True

        # Or as a factory:
        is_author = predicate(lambda actor: Post.author_id == actor.id)
    """
    # No explicit name: Predicate.__init__ derives it from fn, so a
    # callable without __name__ is no longer mislabelled as "<lambda>".
    return Predicate(fn)

always_allow = Predicate(_always_allow, name='always_allow') module-attribute

always_deny = Predicate(_always_deny, name='always_deny') module-attribute

Scopes

scope(applies_to, *, actions=None, registry=None)

Decorator that registers a cross-cutting scope filter.

Scopes are AND'd with OR'd policy results for matching models. They enforce invariants like tenant isolation that must not be accidentally bypassed by adding a new policy.

The decorated function receives (actor, Model) where Model is the class currently being filtered, and returns a ColumnElement[bool].

Parameters:

Name Type Description Default
applies_to Sequence[type]

List of SQLAlchemy model classes this scope covers.

required
actions Sequence[str] | None

Optional list of actions to restrict this scope to. If None (default), the scope applies to all actions.

None
registry PolicyRegistry | None

Optional custom registry. Defaults to the global registry.

None

Returns:

Type Description
Callable[[F], F]

A decorator that registers the function and returns it unchanged.

Example::

@scope(applies_to=[Post, Comment, Document])
def tenant_scope(actor: User, Model: type) -> ColumnElement[bool]:
    return Model.org_id == actor.org_id

@scope(applies_to=[Post, Comment], actions=["read"])
def soft_delete(actor: User, Model: type) -> ColumnElement[bool]:
    return Model.deleted_at.is_(None)
Source code in src/sqla_authz/policy/_scope.py
def scope(
    applies_to: Sequence[type],
    *,
    actions: Sequence[str] | None = None,
    registry: PolicyRegistry | None = None,
) -> Callable[[F], F]:
    """Decorator factory that registers a cross-cutting scope filter.

    Scopes are AND'd with OR'd policy results for matching models.
    They enforce invariants like tenant isolation that must not be
    accidentally bypassed by adding a new policy.

    The decorated function receives ``(actor, Model)`` where ``Model``
    is the class currently being filtered, and returns a
    ``ColumnElement[bool]``. Its name and docstring are recorded on the
    registration as ``name`` and ``description``.

    Args:
        applies_to: Non-empty list of SQLAlchemy model classes this
            scope covers.
        actions: Optional list of actions to restrict this scope to.
            If ``None`` (default), the scope applies to all actions.
        registry: Optional custom registry. Defaults to the global registry.

    Returns:
        A decorator that registers the function and returns it unchanged.

    Raises:
        ValueError: If *applies_to* is empty.

    Example::

        @scope(applies_to=[Post, Comment, Document])
        def tenant_scope(actor: User, Model: type) -> ColumnElement[bool]:
            return Model.org_id == actor.org_id

        @scope(applies_to=[Post, Comment], actions=["read"])
        def soft_delete(actor: User, Model: type) -> ColumnElement[bool]:
            return Model.deleted_at.is_(None)
    """
    # Reject an empty model list up front, before any function is decorated.
    if not applies_to:
        raise ValueError("applies_to must be a non-empty sequence of model classes")

    def register(fn: F) -> F:
        _validate_scope_signature(fn)

        # The registry is resolved at decoration time, not when scope() is called.
        if registry is None:
            destination = get_default_registry()
        else:
            destination = registry

        destination.register_scope(
            ScopeRegistration(
                applies_to=tuple(applies_to),
                fn=fn,
                name=fn.__name__,
                description=fn.__doc__ or "",
                actions=None if actions is None else tuple(actions),
            )
        )
        return fn

    return register

ScopeRegistration(applies_to, fn, name, description, actions) dataclass

A registered scope function with its metadata.

Attributes:

Name Type Description
applies_to tuple[type, ...]

The model classes this scope applies to.

fn Callable[..., ColumnElement[bool]]

The scope function (actor, Model) -> ColumnElement[bool].

name str

The scope function name (for debugging/logging).

description str

Human-readable description (from docstring).

actions tuple[str, ...] | None

Actions this scope is restricted to, or None for all.

verify_scopes(base, *, field=None, when=None, registry=None)

Verify that all matching models have scope coverage.

Scans all concrete subclasses of base and checks that each matching model has at least one registered scope.

Exactly one of field or when must be provided.

Parameters:

Name Type Description Default
base type[DeclarativeBase]

The SQLAlchemy DeclarativeBase class to scan.

required
field str | None

Column name to match on. Models with this column are expected to have a scope.

None
when Callable[[type], bool] | None

Predicate (Model) -> bool. Models for which the predicate returns True are expected to have a scope.

None
registry PolicyRegistry | None

Optional custom registry. Defaults to the global registry.

None

Raises:

Type Description
ValueError

If neither or both of field and when are provided.

UnscopedModelError

If any matching models lack scope coverage.

Example::

from sqla_authz import verify_scopes
verify_scopes(Base, field="org_id")
Source code in src/sqla_authz/_verify.py
def verify_scopes(
    base: type[DeclarativeBase],
    *,
    field: str | None = None,
    when: Callable[[type], bool] | None = None,
    registry: PolicyRegistry | None = None,
) -> None:
    """Check that every model selected by the criterion has a scope.

    Walks the concrete subclasses of *base*, keeps the ones selected by
    the criterion, and fails if any of them has no registered scope.

    The criterion is given by exactly one of *field* or *when*.

    Args:
        base: The SQLAlchemy ``DeclarativeBase`` class to scan.
        field: Column name to match on.  Models with this column
            are expected to have a scope.
        when: Predicate ``(Model) -> bool``.  Models for which
            the predicate returns ``True`` are expected to have a scope.
        registry: Optional custom registry.  Defaults to the global registry.

    Raises:
        ValueError: If neither or both of *field* and *when* are provided.
        UnscopedModelError: If any matching models lack scope coverage.

    Example::

        from sqla_authz import verify_scopes
        verify_scopes(Base, field="org_id")
    """
    # Argument validation happens before any registry or model access.
    if field is None and when is None:
        raise ValueError("Either 'field' or 'when' must be provided")
    if field is not None and when is not None:
        raise ValueError("Only one of 'field' or 'when' may be provided, not both")

    if registry is None:
        registry_in_use = get_default_registry()
    else:
        registry_in_use = registry

    if when is not None:
        needs_scope = when
    else:
        # Guaranteed by the checks above; kept for static type narrowing.
        assert field is not None
        needs_scope = _field_predicate(field)

    missing: list[type] = [
        model
        for model in _concrete_subclasses(base)
        if needs_scope(model) and not registry_in_use.has_scopes(model)
    ]

    if missing:
        raise UnscopedModelError(models=missing, field=field)

Session

authorized_sessionmaker(bind, *, actor_provider, action='read', registry=None, config=None, **kwargs)

Create a sessionmaker with automatic authorization interception.

Convenience factory that creates a sessionmaker and installs authorization interceptors in one step.

Parameters:

Name Type Description Default
bind Any

The engine or connection to bind to.

required
actor_provider Callable[[], ActorLike]

A callable returning the current actor.

required
action str

Default action string.

'read'
registry PolicyRegistry | None

Policy registry. Defaults to the global registry.

None
config AuthzConfig | None

Configuration. Defaults to the global config.

None
**kwargs Any

Additional keyword arguments passed to sessionmaker.

{}

Returns:

Type Description
sessionmaker[Session]

A configured sessionmaker with authorization interception.

Example::

AuthorizedSession = authorized_sessionmaker(
    bind=engine,
    actor_provider=get_current_user,
    action="read",
)

with AuthorizedSession() as session:
    posts = session.execute(select(Post)).scalars().all()
    # Only authorized posts are returned
Source code in src/sqla_authz/session/_interceptor.py
def authorized_sessionmaker(
    bind: Any,
    *,
    actor_provider: Callable[[], ActorLike],
    action: str = "read",
    registry: PolicyRegistry | None = None,
    config: AuthzConfig | None = None,
    **kwargs: Any,
) -> sessionmaker[Session]:
    """Build a ``sessionmaker`` that already has authorization interception installed.

    Shorthand for constructing a ``sessionmaker`` and then calling
    ``install_interceptor`` on it with the same authorization options.

    Args:
        bind: The engine or connection to bind to.
        actor_provider: A callable returning the current actor.
        action: Default action string.
        registry: Policy registry. Defaults to the global registry.
        config: Configuration. Defaults to the global config.
        **kwargs: Additional keyword arguments passed to ``sessionmaker``.

    Returns:
        The configured ``sessionmaker`` with authorization interception.

    Example::

        AuthorizedSession = authorized_sessionmaker(
            bind=engine,
            actor_provider=get_current_user,
            action="read",
        )

        with AuthorizedSession() as session:
            posts = session.execute(select(Post)).scalars().all()
            # Only authorized posts are returned
    """
    maker: sessionmaker[Session] = sessionmaker(bind=bind, **kwargs)
    # registry/config are forwarded untouched; install_interceptor
    # resolves the ``None`` defaults itself.
    install_interceptor(
        maker,
        actor_provider=actor_provider,
        action=action,
        registry=registry,
        config=config,
    )
    return maker

install_interceptor(session_factory, *, actor_provider, action='read', registry=None, config=None)

Install authorization event listeners on a session factory: a do_orm_execute listener and a before_flush listener.

The listener intercepts SELECT queries and, when enabled via config, CREATE/UPDATE/DELETE writes as well.

Parameters:

Name Type Description Default
session_factory sessionmaker[Session]

A SQLAlchemy sessionmaker instance.

required
actor_provider Callable[[], ActorLike]

A callable returning the current actor. Called once per query execution.

required
action str

Default action string. Can be overridden per-query via execution_options(authz_action="...").

'read'
registry PolicyRegistry | None

Policy registry to use. Defaults to the global registry.

None
config AuthzConfig | None

Configuration to use. Defaults to the global config.

None

Example::

factory = sessionmaker(bind=engine)
install_interceptor(
    factory,
    actor_provider=get_current_user,
    action="read",
)

with factory() as session:
    # All SELECT queries are automatically authorized
    posts = session.execute(select(Post)).scalars().all()
Source code in src/sqla_authz/session/_interceptor.py
def install_interceptor(
    session_factory: sessionmaker[Session],
    *,
    actor_provider: Callable[[], ActorLike],
    action: str = "read",
    registry: PolicyRegistry | None = None,
    config: AuthzConfig | None = None,
) -> None:
    """Attach the authorization event listeners to a session factory.

    Two listeners are registered on *session_factory*: one for
    ``do_orm_execute`` and one for ``before_flush``. Together they
    intercept SELECT queries and, when enabled via config,
    CREATE/UPDATE/DELETE writes as well.

    Args:
        session_factory: A SQLAlchemy ``sessionmaker`` instance.
        actor_provider: A callable returning the current actor.
            Called once per query execution.
        action: Default action string. Can be overridden per-query
            via ``execution_options(authz_action="...")``.
        registry: Policy registry to use. Defaults to the global registry.
        config: Configuration to use. Defaults to the global config.

    Example::

        factory = sessionmaker(bind=engine)
        install_interceptor(
            factory,
            actor_provider=get_current_user,
            action="read",
        )

        with factory() as session:
            # All SELECT queries are automatically authorized
            posts = session.execute(select(Post)).scalars().all()
    """
    # Resolve the defaults once, at install time.
    if registry is None:
        effective_registry = get_default_registry()
    else:
        effective_registry = registry
    if config is None:
        effective_config = get_global_config()
    else:
        effective_config = config

    # Both handlers are built before either is attached, so a failure
    # while building leaves the factory without a partial installation.
    execute_listener = _build_authz_handler(
        actor_provider=actor_provider,
        action=action,
        target_registry=effective_registry,
        target_config=effective_config,
    )
    flush_listener = _build_create_authz_handler(
        actor_provider=actor_provider,
        target_registry=effective_registry,
        target_config=effective_config,
    )

    event.listen(session_factory, "do_orm_execute", execute_listener)
    event.listen(session_factory, "before_flush", flush_listener)

AuthorizationContext(actor, action, config) dataclass

Carries actor, action, and config through the authorization pipeline.

Attributes:

Name Type Description
actor ActorLike

The current user/principal satisfying ActorLike.

action str

The action being performed (e.g., "read", "update").

config AuthzConfig

The resolved configuration for this authorization check.

Example::

ctx = AuthorizationContext(
    actor=current_user,
    action="read",
    config=AuthzConfig(),
)

Exceptions

exceptions

Exception hierarchy for sqla-authz.

AuthzError

Bases: Exception

Base exception for all sqla-authz errors.

AuthorizationDenied(*, actor, action, resource_type, message=None)

Bases: AuthzError

Actor is not authorized to perform the requested action.

Attributes:

Name Type Description
actor

The actor that was denied.

action

The action that was attempted.

resource_type

The type of resource involved.

Example::

try:
    authorize(user, "delete", post)
except AuthorizationDenied as exc:
    print(f"{exc.actor} cannot {exc.action} {exc.resource_type}")
Source code in src/sqla_authz/exceptions.py
def __init__(
    self,
    *,
    actor: object,
    action: str,
    resource_type: str,
    message: str | None = None,
) -> None:
    """Record the denied request and set the exception message.

    Args:
        actor: The actor that was denied.
        action: The action that was attempted.
        resource_type: The type of resource involved.
        message: Custom message. When ``None``, a default message is
            built from *actor*, *action* and *resource_type*.
    """
    self.actor = actor
    self.action = action
    self.resource_type = resource_type
    # The default text is only formatted when no custom message is given.
    text = (
        message
        if message is not None
        else f"Actor {actor!r} is not authorized to {action} {resource_type}"
    )
    super().__init__(text)

NoPolicyError(*, resource_type, action)

Bases: AuthzError

No policy registered for (resource_type, action).

Raised when configured to error on missing policies instead of the default deny-by-default (WHERE FALSE) behavior.

Attributes:

Name Type Description
resource_type

The resource type with no policy.

action

The action with no policy.

Example::

configure(on_missing_policy="raise")
# Now missing policies raise instead of silently denying
Source code in src/sqla_authz/exceptions.py
def __init__(
    self,
    *,
    resource_type: str,
    action: str,
) -> None:
    """Record the (resource_type, action) pair that has no policy.

    Args:
        resource_type: The resource type with no policy.
        action: The action with no policy.
    """
    self.resource_type = resource_type
    self.action = action
    text = f"No policy registered for ({resource_type}, {action!r})"
    super().__init__(text)

PolicyCompilationError

Bases: AuthzError

Policy returned an invalid expression.

Raised when a policy function returns something other than a SQLAlchemy ColumnElement[bool].

UnknownActionError(*, action, known_actions, suggestion=None)

Bases: AuthzError

Action string has no registered policies for any model.

Raised when on_unknown_action="raise" and the action string doesn't match any registered policy. This typically indicates a typo in the action name.

Attributes:

Name Type Description
action

The unrecognized action string.

known_actions

List of valid action strings.

suggestion

Closest matching action, if any.

Example::

configure(on_unknown_action="raise")
# authorize_query(..., action="raed") now raises:
# UnknownActionError: Action 'raed' has no registered policies.
#   Did you mean 'read'?
#   Known actions: ['create', 'delete', 'read', 'update']
Source code in src/sqla_authz/exceptions.py
def __init__(
    self,
    *,
    action: str,
    known_actions: list[str],
    suggestion: str | None = None,
) -> None:
    """Record the unknown action and assemble the exception message.

    The message is a single line: the "no registered policies"
    sentence, an optional "Did you mean" hint, and the list of known
    actions, joined by spaces.

    Args:
        action: The unrecognized action string.
        known_actions: List of valid action strings.
        suggestion: Closest matching action, if any. The hint is
            omitted when this is ``None`` or empty.
    """
    self.action = action
    self.known_actions = known_actions
    self.suggestion = suggestion
    sentences = [
        f"Action {action!r} has no registered policies.",
        f"Did you mean {suggestion!r}?" if suggestion else None,
        f"Known actions: {known_actions}",
    ]
    super().__init__(" ".join(s for s in sentences if s is not None))

UnscopedModelError(*, models, field=None)

Bases: AuthzError

One or more models lack required scope coverage.

Raised by verify_scopes() when models matching the check criteria have no registered scopes.

Attributes:

Name Type Description
models

The model classes that lack scope coverage.

field

The field name used for matching (if any).

Source code in src/sqla_authz/exceptions.py
def __init__(
    self,
    *,
    models: list[type],
    field: str | None = None,
) -> None:
    """Record the uncovered models and set the exception message.

    Args:
        models: The model classes that lack scope coverage. Their
            ``__name__`` values are listed in the message.
        field: The field name used for matching (if any). When given
            and non-empty, the message mentions the column.
    """
    self.models = models
    self.field = field
    listing = ", ".join(model.__name__ for model in models)
    text = (
        f"The following models have a '{field}' column but no registered scope: {listing}"
        if field
        else f"The following models have no registered scope: {listing}"
    )
    super().__init__(text)

WriteDeniedError(*, actor, action, resource_type, message=None)

Bases: AuthzError

Write operation denied by authorization policy.

Raised when a write operation targets data that the actor is not authorized to create, update, or delete.

Attributes:

Name Type Description
actor

The actor that was denied.

action

The action that was attempted.

resource_type

The type of resource involved.

Source code in src/sqla_authz/exceptions.py
def __init__(
    self,
    *,
    actor: object,
    action: str,
    resource_type: str,
    message: str | None = None,
) -> None:
    """Record the denied write and set the exception message.

    Args:
        actor: The actor that was denied.
        action: The action that was attempted.
        resource_type: The type of resource involved.
        message: Custom message. When ``None``, a default message is
            built from *actor*, *action* and *resource_type*.
    """
    self.actor = actor
    self.action = action
    self.resource_type = resource_type
    if message is not None:
        text = message
    else:
        # Formatted lazily so repr(actor) is only evaluated when needed.
        text = f"Actor {actor!r} is not authorized to {action} {resource_type}"
    super().__init__(text)

FastAPI Integration

AuthzDep(model, action, *, id_param=None, pk_column='id', registry=None)

FastAPI dependency for authorized queries.

Returns a Depends() instance that resolves authorized model instances by applying registered policies. When id_param is None, returns a list of all authorized instances (collection endpoint). When id_param is set, fetches a single instance by primary key from the named path parameter, returning 404 if not found or not authorized.

Use directly as a default parameter value in route signatures.

Parameters:

Name Type Description Default
model type

The SQLAlchemy model class to query.

required
action str

The authorization action (e.g., "read").

required
id_param str | None

Path parameter name for single-item lookups.

None
pk_column str

Model attribute name for the primary key column. Defaults to "id". Use this when your model's PK attribute has a different name (e.g., "uuid").

'id'
registry PolicyRegistry | None

Optional per-dependency registry override.

None

Returns:

Type Description
Any

A FastAPI Depends instance.

Example::

@app.get("/posts")
async def list_posts(
    posts: list[Post] = AuthzDep(Post, "read"),
) -> list[dict]:
    return [{"id": p.id, "title": p.title} for p in posts]

@app.get("/posts/{post_id}")
async def get_post(
    post: Post = AuthzDep(Post, "read", id_param="post_id"),
) -> dict:
    return {"id": post.id, "title": post.title}

# Model with non-'id' PK:
@app.get("/documents/{doc_uuid}")
async def get_document(
    doc: Document = AuthzDep(
        Document, "read", id_param="doc_uuid", pk_column="uuid"
    ),
) -> dict:
    return {"uuid": doc.uuid}
Source code in src/sqla_authz/integrations/fastapi/_dependencies.py
def AuthzDep(
    model: type,
    action: str,
    *,
    id_param: str | None = None,
    pk_column: str = "id",
    registry: PolicyRegistry | None = None,
) -> Any:
    """Build a FastAPI dependency that yields policy-filtered model instances.

    The result is a ``Depends()`` object meant to be used as the default
    value of a route parameter. Registered policies for ``(model, action)``
    are applied when the dependency is resolved:

    - Without ``id_param`` the dependency resolves to a list of every
      authorized instance (collection endpoint).
    - With ``id_param`` the dependency looks up one instance by primary
      key, taking the key from the path parameter of that name, and
      responds with 404 when the row is missing or not authorized.

    Args:
        model: The SQLAlchemy model class to query.
        action: The authorization action (e.g., ``"read"``).
        id_param: Name of the path parameter holding the primary key for
            single-item lookups. ``None`` selects collection mode.
        pk_column: Name of the model attribute that is the primary key.
            Defaults to ``"id"``; set it when the model's PK attribute is
            named differently (e.g., ``"uuid"``).
        registry: Optional per-dependency registry override.

    Returns:
        A FastAPI ``Depends`` instance.

    Example::

        @app.get("/posts")
        async def list_posts(
            posts: list[Post] = AuthzDep(Post, "read"),
        ) -> list[dict]:
            return [{"id": p.id, "title": p.title} for p in posts]

        @app.get("/posts/{post_id}")
        async def get_post(
            post: Post = AuthzDep(Post, "read", id_param="post_id"),
        ) -> dict:
            return {"id": post.id, "title": post.title}

        # Model with non-'id' PK:
        @app.get("/documents/{doc_uuid}")
        async def get_document(
            doc: Document = AuthzDep(
                Document, "read", id_param="doc_uuid", pk_column="uuid"
            ),
        ) -> dict:
            return {"uuid": doc.uuid}
    """
    # The resolver callable is built once, here, and handed straight to Depends.
    return Depends(
        _make_dependency(
            model,
            action,
            id_param=id_param,
            pk_column=pk_column,
            registry=registry,
        )
    )

get_actor(request)

Sentinel dependency — override via app.dependency_overrides[get_actor].

Raises NotImplementedError when called directly. Override this dependency in your FastAPI app to provide the current actor.

Example::

from sqla_authz.integrations.fastapi import get_actor

app.dependency_overrides[get_actor] = my_get_current_user
Source code in src/sqla_authz/integrations/fastapi/_dependencies.py
def get_actor(request: Request) -> ActorLike:
    """Placeholder actor dependency that must be overridden by the application.

    Calling this function directly always raises ``NotImplementedError``.
    Register your own provider under
    ``app.dependency_overrides[get_actor]`` so FastAPI resolves the
    current actor through it instead.

    Raises:
        NotImplementedError: Always, because no override was installed.

    Example::

        from sqla_authz.integrations.fastapi import get_actor

        app.dependency_overrides[get_actor] = my_get_current_user
    """
    override_hint = (
        "Override get_actor via app.dependency_overrides[get_actor]. "
        "See sqla-authz docs for configuration guide."
    )
    raise NotImplementedError(override_hint)

get_session(request)

Sentinel dependency — override via app.dependency_overrides[get_session].

Raises NotImplementedError when called directly. Override this dependency in your FastAPI app to provide the current SQLAlchemy session.

Example::

from sqla_authz.integrations.fastapi import get_session

app.dependency_overrides[get_session] = my_get_db_session
Source code in src/sqla_authz/integrations/fastapi/_dependencies.py
def get_session(request: Request) -> Session:
    """Placeholder session dependency that must be overridden by the application.

    Calling this function directly always raises ``NotImplementedError``.
    Register your own provider under
    ``app.dependency_overrides[get_session]`` so FastAPI resolves the
    current SQLAlchemy session through it instead.

    Raises:
        NotImplementedError: Always, because no override was installed.

    Example::

        from sqla_authz.integrations.fastapi import get_session

        app.dependency_overrides[get_session] = my_get_db_session
    """
    override_hint = (
        "Override get_session via app.dependency_overrides[get_session]. "
        "See sqla-authz docs for configuration guide."
    )
    raise NotImplementedError(override_hint)

install_error_handlers(app)

Install exception handlers for sqla-authz errors on a FastAPI app.

Converts authorization exceptions into proper HTTP responses:

  • AuthorizationDenied -> 403 Forbidden
  • NoPolicyError -> 500 Internal Server Error

Parameters:

Name Type Description Default
app FastAPI

The FastAPI application instance.

required

Example::

from fastapi import FastAPI
from sqla_authz.integrations.fastapi import install_error_handlers

app = FastAPI()
install_error_handlers(app)
Source code in src/sqla_authz/integrations/fastapi/_errors.py
def install_error_handlers(app: FastAPI) -> None:
    """Register sqla-authz exception handlers on a FastAPI application.

    After this call the app answers the library's exceptions with JSON
    responses of the form ``{"detail": str(exc)}``:

    - ``AuthorizationDenied`` -> 403 Forbidden
    - ``NoPolicyError`` -> 500 Internal Server Error

    Args:
        app: The FastAPI application instance.

    Example::

        from fastapi import FastAPI
        from sqla_authz.integrations.fastapi import install_error_handlers

        app = FastAPI()
        install_error_handlers(app)
    """

    def _detail_response(status: int, error: Exception) -> JSONResponse:
        # Both handlers emit the same payload shape; only the status differs.
        return JSONResponse(status_code=status, content={"detail": str(error)})

    @app.exception_handler(AuthorizationDenied)
    async def authz_denied_handler(  # pyright: ignore[reportUnusedFunction]
        request: object, exc: AuthorizationDenied
    ) -> JSONResponse:
        return _detail_response(403, exc)

    @app.exception_handler(NoPolicyError)
    async def no_policy_handler(  # pyright: ignore[reportUnusedFunction]
        request: object, exc: NoPolicyError
    ) -> JSONResponse:
        return _detail_response(500, exc)

Testing

testing

sqla-authz testing utilities — MockActor, assertions, and fixtures.

Provides test helpers for verifying authorization policies:

  • MockActor / factories: Lightweight actors for tests.
  • Assertion helpers: assert_authorized, assert_denied, assert_query_contains.
  • Fixtures: authz_registry, authz_config, authz_context.

Example::

from sqla_authz.testing import MockActor, assert_authorized
from sqlalchemy import select

def test_admin_reads_all(session, sample_data):
    assert_authorized(session, select(Post), MockActor(id=1, role="admin"), "read")

MockActor(id, role='viewer', org_id=None) dataclass

Test actor that satisfies the ActorLike protocol.

A lightweight, immutable dataclass for use in tests. Provides the id property required by ActorLike, plus optional role and org_id attributes commonly used in policies.

Example::

actor = MockActor(id=1, role="admin", org_id=5)
assert isinstance(actor, ActorLike)

make_admin(id=1)

Create an admin MockActor.

Parameters:

Name Type Description Default
id int | str

The actor's identifier. Defaults to 1.

1

Returns:

Type Description
MockActor

A MockActor with role="admin".

Example::

admin = make_admin()
assert admin.role == "admin"
Source code in src/sqla_authz/testing/_actors.py
def make_admin(id: int | str = 1) -> MockActor:
    """Build a ``MockActor`` whose role is ``"admin"``.

    Args:
        id: Identifier to give the actor. Defaults to ``1``.

    Returns:
        A ``MockActor`` with the given ``id`` and ``role="admin"``.

    Example::

        admin = make_admin()
        assert admin.role == "admin"
    """
    admin_actor = MockActor(role="admin", id=id)
    return admin_actor

make_user(id=1, role='viewer', org_id=None)

Create a regular user MockActor.

Parameters:

Name Type Description Default
id int | str

The actor's identifier. Defaults to 1.

1
role str

The actor's role. Defaults to "viewer".

'viewer'
org_id int | None

Optional organization ID.

None

Returns:

Type Description
MockActor

A MockActor with the specified attributes.

Example::

user = make_user(id=5, role="editor", org_id=3)
assert user.role == "editor"
Source code in src/sqla_authz/testing/_actors.py
def make_user(
    id: int | str = 1,
    role: str = "viewer",
    org_id: int | None = None,
) -> MockActor:
    """Build a ``MockActor`` representing an ordinary user.

    Args:
        id: Identifier to give the actor. Defaults to ``1``.
        role: Role to give the actor. Defaults to ``"viewer"``.
        org_id: Optional organization ID.

    Returns:
        A ``MockActor`` populated with exactly the supplied values.

    Example::

        user = make_user(id=5, role="editor", org_id=3)
        assert user.role == "editor"
    """
    # Collect the fields first, then hand them to the constructor by keyword.
    actor_fields = {"id": id, "role": role, "org_id": org_id}
    return MockActor(**actor_fields)

make_anonymous()

Create an anonymous MockActor with id=0.

Returns:

Type Description
MockActor

A MockActor with id=0 and role="anonymous".

Example::

anon = make_anonymous()
assert anon.id == 0
assert anon.role == "anonymous"
Source code in src/sqla_authz/testing/_actors.py
def make_anonymous() -> MockActor:
    """Build the ``MockActor`` used to stand in for an unauthenticated caller.

    Returns:
        A ``MockActor`` with ``id=0`` and ``role="anonymous"``.

    Example::

        anon = make_anonymous()
        assert anon.id == 0
        assert anon.role == "anonymous"
    """
    anonymous_actor = MockActor(role="anonymous", id=0)
    return anonymous_actor

assert_authorized(session, stmt, actor, action, *, expected_count=None, registry=None)

Assert that a query returns results after authorization.

Applies authorize_query and executes the statement. Fails with AssertionError if zero rows are returned. Optionally checks that the exact number of rows matches expected_count.

Parameters:

Name Type Description Default
session Session

A SQLAlchemy Session to execute the query.

required
stmt Select[Any]

A Select statement to authorize.

required
actor ActorLike

The actor to authorize as.

required
action str

The action string (e.g., "read").

required
expected_count int | None

If given, assert exactly this many rows.

None
registry PolicyRegistry | None

Optional custom registry.

None

Example::

assert_authorized(session, select(Post), admin, "read", expected_count=3)
Source code in src/sqla_authz/testing/_assertions.py
def assert_authorized(
    session: Session,
    stmt: Select[Any],
    actor: ActorLike,
    action: str,
    *,
    expected_count: int | None = None,
    registry: PolicyRegistry | None = None,
) -> None:
    """Assert that the authorized form of a query yields at least one row.

    The statement is passed through ``authorize_query`` and executed on
    ``session``. Zero rows always fails the assertion; when
    ``expected_count`` is supplied, the row count must also equal it.

    Args:
        session: A SQLAlchemy ``Session`` to execute the query.
        stmt: A ``Select`` statement to authorize.
        actor: The actor to authorize as.
        action: The action string (e.g., ``"read"``).
        expected_count: If given, assert exactly this many rows.
        registry: Optional custom registry; the default registry is used
            when omitted.

    Raises:
        AssertionError: If no rows come back, or if ``expected_count`` is
            given and differs from the number of rows.

    Example::

        assert_authorized(session, select(Post), admin, "read", expected_count=3)
    """
    effective_registry = get_default_registry() if registry is None else registry
    rows = (
        session.execute(
            authorize_query(
                stmt,
                actor=actor,
                action=action,
                registry=effective_registry,
            )
        )
        .scalars()
        .all()
    )
    row_total = len(rows)

    # The empty-result check comes first, so it wins even when
    # expected_count is provided.
    if not row_total:
        raise AssertionError(
            "expected authorized query to return rows, but got 0 "
            f"(actor={actor!r}, action={action!r})"
        )

    if expected_count is None or row_total == expected_count:
        return

    raise AssertionError(
        f"expected {expected_count} rows, but got {row_total} "
        f"(actor={actor!r}, action={action!r})"
    )

assert_denied(session, stmt, actor, action, *, registry=None)

Assert that a query returns zero results after authorization.

The inverse of assert_authorized — verifies deny-by-default or explicit denial.

Parameters:

Name Type Description Default
session Session

A SQLAlchemy Session to execute the query.

required
stmt Select[Any]

A Select statement to authorize.

required
actor ActorLike

The actor to authorize as.

required
action str

The action string.

required
registry PolicyRegistry | None

Optional custom registry.

None

Example::

assert_denied(session, select(Post), anonymous_user, "delete")
Source code in src/sqla_authz/testing/_assertions.py
def assert_denied(
    session: Session,
    stmt: Select[Any],
    actor: ActorLike,
    action: str,
    *,
    registry: PolicyRegistry | None = None,
) -> None:
    """Assert that the authorized form of a query yields no rows at all.

    Counterpart to ``assert_authorized``: use it to check deny-by-default
    behavior or an explicit denial.

    Args:
        session: A SQLAlchemy ``Session`` to execute the query.
        stmt: A ``Select`` statement to authorize.
        actor: The actor to authorize as.
        action: The action string.
        registry: Optional custom registry; the default registry is used
            when omitted.

    Raises:
        AssertionError: If the authorized query returns one or more rows.

    Example::

        assert_denied(session, select(Post), anonymous_user, "delete")
    """
    effective_registry = get_default_registry() if registry is None else registry
    filtered_stmt = authorize_query(
        stmt,
        actor=actor,
        action=action,
        registry=effective_registry,
    )
    row_total = len(session.execute(filtered_stmt).scalars().all())

    if row_total == 0:
        return

    raise AssertionError(
        f"expected zero rows but got {row_total} (actor={actor!r}, action={action!r})"
    )

assert_query_contains(stmt, actor, action, *, text, registry=None)

Assert that compiled SQL of an authorized query contains the given text.

Useful for structural tests that verify filter expressions are applied without requiring a database connection.

Parameters:

Name Type Description Default
stmt Select[Any]

A Select statement to authorize.

required
actor ActorLike

The actor to authorize as.

required
action str

The action string.

required
text str

The text to search for in the compiled SQL.

required
registry PolicyRegistry | None

Optional custom registry.

None

Example::

assert_query_contains(
    select(Post), admin, "read",
    text="is_published", registry=registry,
)
Source code in src/sqla_authz/testing/_assertions.py
def assert_query_contains(
    stmt: Select[Any],
    actor: ActorLike,
    action: str,
    *,
    text: str,
    registry: PolicyRegistry | None = None,
) -> None:
    """Assert that the SQL compiled from an authorized query includes ``text``.

    The statement is authorized and compiled with literal binds, then
    searched as a plain string. No session is involved, which makes this
    suitable for structural tests of the applied filter expressions.

    Args:
        stmt: A ``Select`` statement to authorize.
        actor: The actor to authorize as.
        action: The action string.
        text: The text to search for in the compiled SQL.
        registry: Optional custom registry; the default registry is used
            when omitted.

    Raises:
        AssertionError: If ``text`` is not a substring of the compiled SQL.

    Example::

        assert_query_contains(
            select(Post), admin, "read",
            text="is_published", registry=registry,
        )
    """
    effective_registry = get_default_registry() if registry is None else registry
    filtered_stmt = authorize_query(
        stmt,
        actor=actor,
        action=action,
        registry=effective_registry,
    )
    # literal_binds inlines parameter values so they are searchable too.
    compiled = filtered_stmt.compile(compile_kwargs={"literal_binds": True})
    rendered_sql = str(compiled)

    if text in rendered_sql:
        return

    raise AssertionError(f"{text!r} not found in compiled SQL:\n{rendered_sql}")

authz_registry()

Provide a fresh, isolated PolicyRegistry for each test.

The registry is created empty and is not shared with the global default registry.

Example::

def test_my_policy(authz_registry):
    authz_registry.register(Post, "read", my_fn, name="p", description="")
    assert authz_registry.has_policy(Post, "read")
Source code in src/sqla_authz/testing/_fixtures.py
@pytest.fixture()
def authz_registry() -> Generator[PolicyRegistry, None, None]:
    """Yield a brand-new ``PolicyRegistry`` for the requesting test.

    Each test gets its own empty instance constructed here, so nothing
    registered on it touches the global default registry.

    Example::

        def test_my_policy(authz_registry):
            authz_registry.register(Post, "read", my_fn, name="p", description="")
            assert authz_registry.has_policy(Post, "read")
    """
    isolated_registry = PolicyRegistry()
    yield isolated_registry

authz_config()

Provide a default authorization config for testing.

Returns an AuthzConfig instance when the config module can be imported; otherwise falls back to a simple dict with default settings.

Example::

def test_with_config(authz_config):
    assert authz_config["on_missing_policy"] == "deny"
Source code in src/sqla_authz/testing/_fixtures.py
@pytest.fixture()
def authz_config() -> AuthzConfig | dict[str, Any]:
    """Provide a default authorization config for testing.

    Returns an ``AuthzConfig`` built with no arguments when
    ``sqla_authz.config._config`` can be imported. If that import fails,
    a plain dict with ``"on_missing_policy": "deny"`` and
    ``"default_action": "read"`` is returned instead.

    Returns:
        An ``AuthzConfig`` instance, or the fallback dict when the config
        module cannot be imported.

    Example::

        def test_with_config(authz_config):
            # Subscript access only applies to the dict fallback.
            if isinstance(authz_config, dict):
                assert authz_config["on_missing_policy"] == "deny"
    """
    try:
        from sqla_authz.config._config import AuthzConfig
    except ImportError:
        # NOTE(review): ``authz_context`` in this module uses ``AuthzConfig``
        # unconditionally, so this fallback looks unreachable in practice --
        # confirm before relying on it.
        return {"on_missing_policy": "deny", "default_action": "read"}

    # Constructed outside the ``try`` so that an ImportError raised while
    # building the config is not mistaken for a missing config module.
    return AuthzConfig()

authz_context()

Provide a default AuthorizationContext for tests.

Source code in src/sqla_authz/testing/_fixtures.py
@pytest.fixture()
def authz_context() -> AuthorizationContext:
    """Supply a ready-made ``AuthorizationContext`` for tests.

    The context pairs ``MockActor(id=1)`` with the ``"read"`` action and
    an ``AuthzConfig`` constructed with no arguments.
    """
    default_actor = MockActor(id=1)
    default_config = AuthzConfig()
    return AuthorizationContext(
        actor=default_actor,
        action="read",
        config=default_config,
    )