matcher¶
Per-intent ANN search and event-driven scoring. Cron-mode intents run an APScheduler tick that queries Qdrant for new articles in a recency window; event-mode intents are scored in-process against every embedding batch as it lands. Both paths emit a
Matchlist toapp.state.on_match, which the notifier replaces at startup.
Responsibility¶
- Run one APScheduler job per enabled cron-mode intent on its configured trigger (hourly / daily / weekly), each tick filtering Qdrant
news_currentbyingested_at_tsrecency window and the intent'sfeed_filter - Maintain an in-process cache of event-mode intent vectors and score every freshly embedded article batch against them in pure Python (cosine via dot product)
- Buffer event-mode hits per intent in
event_pending, merging similar titles into groups, and flush either ontrigger_countreached or onmax_wait_secondselapsed (sweeper job) - Track which
(intent_id, article_id)pairs have already fired throughmatch_seenso cron-mode notifications don't repeat across ticks (event-mode does not writematch_seen) - Provide a manual fire path that reuses the cron scan logic but bypasses
match_seenso an operator can see what an intent would match right now - Surface a stable
on_matchinjection seam so notifier wiring happens at lifespan startup, not at module import time
Not in scope¶
- Article ingestion (lives in
collector) - Embedding (lives in
embedder) - Collection bootstrap and alias management (lives in
vector_store) - Summary text generation and channel delivery (lives in
summarizerandnotifier) — matcher only produces a list ofMatchobjects - Cross-process coordination — the event cache, sweeper, and fire-task store are single-process
Public interface¶
Match callback (callback.py)¶
@dataclass
class Match:
intent_id: int
article_id: str # UUID string matching Qdrant point id
score: float
payload: dict # title / url / body / feed_id / published_at
OnMatchCallback = Callable[[list[Match]], Awaitable[None]]
async def log_matches(matches: list[Match]) -> None
Match.payload carries the article payload as Qdrant returned it; downstream consumers should not assume more fields than title, url, body, feed_id, published_at. log_matches is the placeholder default — a real notifier replaces app.state.on_match during lifespan startup.
Cron scan (scan.py)¶
@dataclass
class ScanOptions:
lookback_seconds: int
threshold: float
skip_seen: bool # filter already-seen ids before returning
feed_ids: list[int] | None # None = all feeds, [] = nothing
write_match_seen: bool # cron path = True, fire path = False
async def scan_once(intent, options, conn, qdrant_client) -> list[Match]
async def run_intent_scan(intent_id: int, app: FastAPI) -> None
scan_once is the shared core for scheduled ticks and manual fires. run_intent_scan is what APScheduler invokes — it loads the Intent from SQLite, short-circuits if the intent has been disabled or deleted between job registration and tick, runs scan_once with write_match_seen=True, and forwards the result to app.state.on_match.
A failed on_match is logged but never re-raised: match_seen has already been written, so the tick is "done" from the matcher's perspective even if the downstream pipeline blew up.
Job lifecycle (jobs.py)¶
def register_intent_job(scheduler, intent, app, *, fire_immediately=False) -> None
def unregister_intent_job(scheduler, intent_id) -> None
def reregister_intent_job(scheduler, intent, app) -> None # alias for register
async def register_all_enabled(scheduler, intents, app, qdrant_client) -> None
Job id is matcher-intent-{intent_id}; replace_existing=True makes re-registration idempotent. Triggers are built with coalesce=True, max_instances=1, and misfire_grace_time=None so a delayed wakeup never silently skips a fire. The trigger's tzinfo is passed as a string (not a ZoneInfo object) — APScheduler 3.11 sorts due jobs by datetime and a tz-mismatch silently prevents firing.
register_all_enabled is the startup entry point: for every enabled cron-mode intent it confirms a vector is present in Qdrant before registering. An intent whose Qdrant point was lost (partial-delete failure, manual cleanup) is skipped with an error so it does not produce an unbounded stream of "no vector" warnings every tick.
Event-mode intents do NOT register an APScheduler job — they live entirely in the event-driven path below.
Event-driven path (event_match.py, event_cache.py, event_buffer.py)¶
class EventIntentCache:
def add(intent_id, entry) -> None
def remove(intent_id) -> None
def get(intent_id) -> EventIntentEntry | None
@dataclass
class EventIntentEntry:
vector: list[float]
threshold: float
feed_filter_ids: list[int] | None
schedule: EventSchedule
async def load_event_cache(cache, qdrant_handle, conn) -> None
async def event_match_batch(app, points, conn) -> None
async def absorb(conn, intent_id, batch_matches, schedule) -> bool
async def flush(conn, app, intent_id) -> None
async def sweep_timed_out(conn, app, event_intent_cache) -> None
event_match_batch is called by the embedder worker after each Qdrant upsert. It iterates the cache, scores every (intent, article) pair via dot product (valid as cosine because the embedder is contracted to be unit-normalized — see BaseEmbedder.is_unit_normalized), and feeds hits to absorb. The whole call is wrapped in a try/except that logs and swallows: an event-path bug must not abort embedder ingestion.
absorb opens an explicit BEGIN IMMEDIATE transaction, merges the new batch_matches into event_pending by title similarity (≥0.85 SequenceMatcher ratio), and returns True when the per-intent buffered group count meets schedule.trigger_count. Article-level dedup inside each group prevents double-counting when the embedder retries a batch.
flush performs an atomic DELETE … RETURNING members_json on event_pending for the intent, commits, and only then awaits on_match. A failed on_match is logged but never re-raised — same contract as the cron path: the buffer is already cleared.
sweep_timed_out runs every 30 s as a separate APScheduler job (event_y_sweeper in main.lifespan). It picks the oldest buffered group per intent and flushes any whose age exceeds schedule.max_wait_seconds. Each intent's flush is isolated so one bad pipeline call does not stop the others.
Manual fire (fire_tasks.py)¶
@dataclass
class FireTask:
task_id: str
intent_id: int
status: str # "running" | "done" | "error"
started_at: datetime
finished_at: datetime | None
match_count: int
matches: list[dict]
pushed: bool
push_error: str | None
def create_task(intent_id) -> FireTask
def get_task(task_id) -> FireTask | None
def throttle_check(intent_id, rate_seconds=60) -> bool
def sweep_expired(ttl_seconds=3600) -> int
In-memory state, single-process. throttle_check consults _last_fire_at[intent_id]; create_task updates that map at task creation, so the rate-limit window starts at the accepted request, not at each rejection. APScheduler runs sweep_expired every five minutes to keep the task map bounded to ~1 h of recent fires.
The fire path runs scan_once with write_match_seen=False, so manually firing an intent never updates match_seen — it shows the operator what would match right now without affecting the next scheduled tick.
Cron recall (cron_recall.py)¶
def past_n_fire_times(
schedule: CronSchedule,
timezone: str,
n: int,
*,
now: datetime | None = None,
) -> list[datetime]
Computes the n most recent past fire-times for a cron schedule in a given timezone, returning them newest-first as UTC-aware datetimes. The current interval is always excluded (a daily 09:00 schedule queried at 14:00 still skips today's 09:00). Used by the backfill orchestrator and the history-list date-bounding logic.
The now parameter allows pinning wall-clock for deterministic tests. DST transitions are handled on a best-effort basis — the function documents this limitation explicitly in its docstring.
Backfill (backfill.py, backfill_tasks.py)¶
async def probe_oldest_news_ts(qdrant_client) -> int | None
async def run_backfill(
scheduler: AsyncIOScheduler,
intent: Intent,
app,
*,
task_id: str,
since_utc: str | None = None,
until_utc: str | None = None,
) -> None
Replays past cron fire-times through the standard scan+summarize pipeline. The orchestrator:
- Takes a schedule snapshot from the intent row before any concurrent PUT can mutate it
- Computes past fire-times via
past_n_fire_times - If
since_utcis not specified, bounds the lower end by Qdrant's oldestingested_at_tsso fire-times before any article existed are skipped - For each past fire-time, runs
scan_oncewithnowpinned so the lookback window is anchored to the replayed tick, not to wall-clock - Writes a
summary_historyrow per tick
In-memory task tracking mirrors the fire-task registry. A running backfill can be observed via GET /intents/{intent_id}/backfill/{task_id}.
Configuration¶
| Field | Default | Notes |
|---|---|---|
Intent.threshold |
model-dependent (typically 0.5–0.7 for BGE-M3) | per-intent; passed to Qdrant query_points(score_threshold=...) |
CronSchedule.lookback_seconds |
configured per intent | window of ingested_at_ts the cron scan considers |
CronSchedule.skip_seen |
True |
when False, the scan re-emits matches every tick (notify-every-time mode) |
EventSchedule.trigger_count |
configured per intent | flush when this many groups accumulate |
EventSchedule.max_wait_seconds |
configured per intent | flush when the oldest group reaches this age |
_SEARCH_LIMIT |
100 |
upper bound on Qdrant results per cron tick |
_FIRE_RATE_LIMIT_SECONDS |
60 |
per-intent manual fire rate limit |
_TASK_TTL_SECONDS |
3600 |
manual fire task retention |
SEMBR_DEBUG_MATCHER |
unset | env var; when set, an empty cron tick runs two extra Qdrant queries to log whether the time filter or the threshold was the limiting factor — off by default because empty results are common and the extra queries multiply Qdrant load |
Upstream dependencies¶
db.intents—get_intent,list_intentsdb.match_seen—insert_unseen_returning_newdb.sqlite.get_conn— shared aiosqlite connectionvector_store.intents.ALIAS_NAMEandvector_store.news.ALIAS_NAME— alias names for retrieve/queryvector_store.qdrant.extract_point_vector— named-vector-safe vector extractionsummarizer.grouping.GroupingStepandnormalize— title-similarity grouping for event bufferembedder.base.BaseEmbedder.is_unit_normalized— contract that lets_dotstand in for cosine
Downstream consumers¶
app.state.on_match— set by the notifier (orlog_matchesas the no-op default); both cron and event paths call this with alist[Match]api.fire— usesfire_tasks.create_task/get_taskfor the manual-fire endpoint, andscan_once(withwrite_match_seen=False) for the actual runembedder.scheduler.embedder_worker— callsevent_match_batchafter each Qdrant upsertmain.lifespan— registersregister_all_enabled,load_event_cache, the 30 ssweep_timed_outjob, and the 5 minsweep_expiredjob
Known constraints¶
on_matchis single-handler:app.state.on_matchis a single async callable. The notifier owns it; if a future feature wants two consumers (e.g., notification + analytics sink) it must fan out inside its own handler. Both cron and event paths callon_matchonce per drain.- Event scoring is pure Python:
_dotwalks 1024 floats per (intent, article) pair without numpy. At 1.0 scale (≤50 event-mode intents, batches up to ~32 articles) the cost is negligible, but a backend swap that increases dim, or scaling event-mode intents into the hundreds, will want a vectorized implementation. - Unit-normalization is a contract, not a check:
_dotequals cosine only when the embedder returns L2-normalized vectors. The matcher refuses to score whenembedder.is_unit_normalizedis False, but a backend that returns False from that property and silently emits unnormalized vectors will produce wrong scores. A new embedder must implement this property truthfully. match_seenis best-effort dedup: cron-mode writesmatch_seenbefore callingon_match. Ifon_matchfails, the tick is silently lost — the row is already marked seen and won't fire again. This is the deliberate trade-off (Risk E1 in the design): better to drop one tick than to flood the user whenon_matchrecovers from a transient error.- Group merge is first-match-wins: in
absorb, a batch group whose normalized title is ≥0.85 similar to two existing groups merges into the first one ingroup_idorder. There is no transitive cross-group merging at absorb time — that happens implicitly when the buffer is flushed and the summarizer regroups everything. - Event cache and
match_seenare independent: deleting an intent removes it from the event cache and unregisters the cron job, but does not deletematch_seenrows orevent_pendingrows. The intent FK does the cleanup at the SQLite level, so a re-created intent (same id) will not see stale dedup state. - APScheduler job lookup is by id only:
matcher-intent-{intent_id}reuses the SQLite intent id. Restarting the service while an intent's id is reused (e.g., test fixtures) will produce silent overwrites —replace_existing=Trueis the intended behavior, but it does mean an older job's args are silently replaced. - No diagnostic by default: an empty cron tick used to run two extra Qdrant queries to log whether the time filter or the threshold was the limiting factor. That probe now runs only when
SEMBR_DEBUG_MATCHERis set in the environment, because empty results are the normal case under cron mode and the extra queries multiplied Qdrant load by ~3× when many intents had nothing new this window.