Skip to content

Integrations API

driftwatch.integrations.fastapi.DriftMiddleware

DriftMiddleware(app: ASGIApp, monitor: Monitor, feature_extractor: Callable[[dict[str, Any]], dict[str, Any]] | None = None, prediction_extractor: Callable[[dict[str, Any]], dict[str, Any]] | None = None, check_interval: int = 100, min_samples: int = 50, buffer_size: int = 10000, enabled: bool = True)

Bases: BaseHTTPMiddleware

FastAPI middleware for automatic drift monitoring.

Collects input features from requests and runs drift detection on a configurable schedule.

Parameters:

Name Type Description Default
app ASGIApp

The ASGI application

required
monitor Monitor

DriftWatch Monitor instance with reference data

required
feature_extractor Callable[[dict[str, Any]], dict[str, Any]] | None

Function to extract features from request body. Defaults to returning the entire request body as features.

None
check_interval int

Number of requests between drift checks. Set to 0 to disable automatic checks.

100
min_samples int

Minimum samples required before running drift check.

50
enabled bool

Whether drift collection is enabled.

True
Example
from fastapi import FastAPI
from driftwatch import Monitor
from driftwatch.integrations.fastapi import DriftMiddleware

monitor = Monitor(reference_data=train_df)
app = FastAPI()

app.add_middleware(
    DriftMiddleware,
    monitor=monitor,
    check_interval=100,
)
Source code in src/driftwatch/integrations/fastapi.py
def __init__(
    self,
    app: ASGIApp,
    monitor: Monitor,
    feature_extractor: Callable[[dict[str, Any]], dict[str, Any]] | None = None,
    prediction_extractor: Callable[[dict[str, Any]], dict[str, Any]] | None = None,
    check_interval: int = 100,
    min_samples: int = 50,
    buffer_size: int = 10000,
    enabled: bool = True,
) -> None:
    super().__init__(app)
    self.monitor = monitor
    self.feature_extractor = feature_extractor or (lambda x: x)
    self.prediction_extractor = prediction_extractor
    self.check_interval = check_interval
    self.min_samples = min_samples
    self.buffer_size = buffer_size
    self.enabled = enabled
    self.state = DriftState(
        samples=deque(maxlen=buffer_size),
        predictions=deque(maxlen=buffer_size),
    )
    self._background_tasks: set[asyncio.Task[None]] = set()

dispatch async

dispatch(request: Request, call_next: Callable) -> Response

Process request and collect features for drift monitoring.

Source code in src/driftwatch/integrations/fastapi.py
async def dispatch(self, request: Request, call_next: Callable) -> Response:
    """Process request and collect features for drift monitoring."""
    if not self.enabled:
        return cast("Response", await call_next(request))

    # Skip non-POST requests and internal endpoints
    if request.method != "POST" or request.url.path.startswith("/drift"):
        return cast("Response", await call_next(request))

    # Try to extract features from request body
    try:
        body = await request.json()
        features = self.feature_extractor(body)

        if features and isinstance(features, dict):
            # Filter to only monitored features
            monitored = {
                k: v
                for k, v in features.items()
                if k in self.monitor.monitored_features
            }
            if monitored:
                self.state.add_sample(monitored)

    except Exception:
        # Don't fail the request if feature extraction fails
        pass

    # Process the request
    response = cast("Response", await call_next(request))

    # Try to extract predictions from response
    if self.prediction_extractor is not None:
        try:
            # For JSONResponse, we can access the body
            if hasattr(response, "body"):
                import json

                response_body = json.loads(response.body)
                prediction = self.prediction_extractor(response_body)
                if prediction and isinstance(prediction, dict):
                    self.state.add_prediction(prediction)
        except Exception:
            pass

    # Check if we should run drift detection
    if self._should_check_drift():
        task = asyncio.create_task(self._run_drift_check())
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    return response

driftwatch.integrations.alerting.SlackAlerter

SlackAlerter(webhook_url: str, throttle_minutes: int = 60, mention_user: str | None = None, channel_override: str | None = None)

Send drift alerts to Slack via webhook.

Formats drift reports as Slack Block Kit messages with feature-level details and supports alert throttling to avoid spam.

Parameters:

Name Type Description Default
webhook_url str

Slack webhook URL (https://hooks.slack.com/...)

required
throttle_minutes int

Minimum minutes between alerts (default: 60)

60
mention_user str | None

Optional Slack user ID to mention (@U123ABC)

None
channel_override str | None

Optional channel to post to (overrides webhook default)

None
Example
from driftwatch.integrations.alerting import SlackAlerter

alerter = SlackAlerter(
    webhook_url="https://hooks.slack.com/services/...",
    throttle_minutes=60
)

if report.has_drift():
    alerter.send(report)
Source code in src/driftwatch/integrations/alerting.py
def __init__(
    self,
    webhook_url: str,
    throttle_minutes: int = 60,
    mention_user: str | None = None,
    channel_override: str | None = None,
) -> None:
    self.webhook_url = webhook_url
    self.throttle_seconds = throttle_minutes * 60
    self.mention_user = mention_user
    self.channel_override = channel_override
    self._last_alert_time: float = 0.0

send

send(report: DriftReport, force: bool = False, custom_message: str | None = None) -> bool

Send drift report to Slack.

Parameters:

Name Type Description Default
report DriftReport

DriftReport to send

required
force bool

Skip throttling check

False
custom_message str | None

Optional custom message prefix

None

Returns:

Type Description
bool

True if alert was sent, False if throttled

Raises:

Type Description
HTTPError

If webhook request fails

Source code in src/driftwatch/integrations/alerting.py
def send(
    self,
    report: DriftReport,
    force: bool = False,
    custom_message: str | None = None,
) -> bool:
    """
    Send drift report to Slack.

    Args:
        report: DriftReport to send
        force: Skip throttling check
        custom_message: Optional custom message prefix

    Returns:
        True if alert was sent, False if throttled

    Raises:
        httpx.HTTPError: If webhook request fails
    """
    # Check throttling
    if not force and self._is_throttled():
        return False

    # Build Slack message
    blocks = self._build_blocks(report, custom_message)
    payload: dict[str, Any] = {"blocks": blocks}

    if self.channel_override:
        payload["channel"] = self.channel_override

    # Send to Slack
    response = httpx.post(
        self.webhook_url, json=payload, timeout=10.0, follow_redirects=True
    )
    response.raise_for_status()

    # Update throttle timestamp
    self._last_alert_time = time.time()

    return True

get_next_alert_time

get_next_alert_time() -> datetime | None

Get the earliest time the next alert can be sent.

Source code in src/driftwatch/integrations/alerting.py
def get_next_alert_time(self) -> datetime | None:
    """Get the earliest time the next alert can be sent."""
    if self._last_alert_time == 0.0:
        return None

    next_time = self._last_alert_time + self.throttle_seconds
    return datetime.fromtimestamp(next_time, tz=timezone.utc)

reset_throttle

reset_throttle() -> None

Reset throttle timer (allows immediate next alert).

Source code in src/driftwatch/integrations/alerting.py
def reset_throttle(self) -> None:
    """Reset throttle timer (allows immediate next alert)."""
    self._last_alert_time = 0.0