embedder
Converts article and intent text to 1024-dim BGE-M3 vectors via the SiliconFlow `/v1/embeddings` API. No local model, no thread pool: pure async `httpx`.
Responsibility
- Define the `BaseEmbedder` ABC so callers (matcher, intent CRUD) depend only on an interface, not a backend
- Provide the `SiliconFlowEmbedder` concrete backend for the OpenAI-compatible `/v1/embeddings` protocol
- Run a startup probe (`load()`) that validates API key, model, and embedding dimension before flipping `is_loaded` to True
- Drive the embed-then-upsert pipeline: pull pending articles from SQLite, embed them, upsert to Qdrant, then delete from `pending_articles`
- Emit per-batch observability events (`log_embed_event`) for the dashboard
Not in scope
- Vector storage / search (lives in `vector_store`)
- Local-model inference (no MLX / sentence-transformers; remote API only in this release)
- LLM summary embedding (the summarizer does not embed; it generates text)
- Schema migration or article ingestion (lives in `collector` + `db`)
Public interface
Base ABC (base.py)

```python
from abc import ABC, abstractmethod


class BaseEmbedder(ABC):
    @property
    @abstractmethod
    def model_version(self) -> str: ...  # persisted in payload as embedding_model_version

    @property
    @abstractmethod
    def dim(self) -> int: ...  # vector dimensionality; vector store uses this to size collections

    @property
    @abstractmethod
    def max_input_chars(self) -> int: ...  # per-text char cap the worker enforces before aembed

    @property
    @abstractmethod
    def is_loaded(self) -> bool: ...  # False until startup probe succeeds

    @abstractmethod
    def embed(self, texts: list[str]) -> list[list[float]]: ...

    async def aembed(self, texts: list[str], *, timeout: float | None = None) -> list[list[float]]: ...
```

The default `aembed` offloads `embed` to a thread pool (suitable for local/CPU-bound backends). Remote backends override `aembed` directly and raise from `embed` to make the misuse loud.
SiliconFlow backend (openai_compat.py)

```python
from typing import Literal


class SiliconFlowEmbedder(BaseEmbedder):
    MODEL_VERSION = "bge-m3_v1"
    EXPECTED_DIM = 1024
    MAX_INPUT_CHARS = 8_000  # BGE-M3 8192-token ctx, conservative for pure-Chinese

    def __init__(self, *, api_key, base_url="https://api.siliconflow.cn/v1",
                 model="BAAI/bge-m3", timeout=30.0): ...

    async def load(self) -> None: ...  # idempotent probe; never raises

    async def aembed(self, texts, *, timeout=None): ...  # raises if not loaded

    async def aclose(self) -> None: ...  # for shutdown / restart

    @property
    def status(self) -> Literal["loading", "ok", "error", "closed"]: ...
```

`__init__` rejects any model whose name does not contain `bge-m3`: the class is bge-m3-specific (1024-dim probe, fixed `MODEL_VERSION`). Other models need their own subclass.
`load()` never raises: a failed probe sets `status="error"` and leaves `is_loaded=False`, so the caller's `/health` endpoint reports 503 until operators fix the credentials / network.
`aclose()` sets `status="closed"` (not `"error"`) so observability can distinguish a deliberate shutdown from a probe failure.
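The status transitions described above can be sketched in isolation. This is an illustrative stand-in, not the real class: `ProbeStatusSketch` and its injected `probe` callable are hypothetical, and treating any call to `load()` after the first as a no-op is an assumed reading of "idempotent".

```python
import asyncio


class ProbeStatusSketch:
    """Hypothetical stand-in showing only the status transitions."""

    def __init__(self, probe):
        self._probe = probe      # async callable; raises when the probe fails
        self.status = "loading"

    @property
    def is_loaded(self) -> bool:
        return self.status == "ok"

    async def load(self) -> None:
        if self.status != "loading":
            return  # assumed: the probe runs once; later calls are no-ops
        try:
            await self._probe()
        except Exception:
            self.status = "error"  # swallowed on purpose: load() never raises
            return
        self.status = "ok"

    async def aclose(self) -> None:
        self.status = "closed"     # deliberate shutdown, distinct from "error"
```

Because `is_loaded` is derived from `status`, both `"error"` and `"closed"` keep the worker idle while still being distinguishable on the dashboard.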
Errors raised from `aembed`:

| Exception | Meaning | Operator action |
|---|---|---|
| `EmbedderTransportError` | non-2xx HTTP, connect/read failure | check API key, network, SiliconFlow status |
| `EmbedderSchemaError` | response missing `data` / wrong shape | sembr bug or incompatible endpoint |
| `RuntimeError("embedder not loaded")` | called before `load()` succeeded | fix the upstream cause; `/health` is gated on the probe |

The API key is stripped from logged response bodies (`safe_body = response.text[:200].replace(self._api_key, "***")`).
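As a small illustration of that expression, the helper below applies the same truncate-then-replace order to a plain string. The function name `redact_body` and the sample key are hypothetical.

```python
def redact_body(body: str, api_key: str, limit: int = 200) -> str:
    """Truncate a response body for logging and mask the API key."""
    # Same order as the expression above: truncate first, then replace.
    return body[:limit].replace(api_key, "***")
```

For example, `redact_body("error: key sk-abc123 rejected", "sk-abc123")` yields `"error: key *** rejected"`.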
Factory (factory.py)
Selects the backend by `settings.embedder_backend`. Only `"siliconflow"` is registered today; future backends (Voyage / Jina) are anticipated but not implemented. Raises `ValueError` on an empty API key or unknown backend.
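A registry-based factory with those two failure modes might look like the sketch below. Everything here is an assumption for illustration: `SketchSettings`, `_build_siliconflow` and `build_embedder_sketch` are hypothetical names, the builder returns a placeholder string instead of a real embedder, and the order of the two checks is a guess.

```python
from dataclasses import dataclass


@dataclass
class SketchSettings:
    """Hypothetical stand-in for the pydantic-settings object."""
    embedder_backend: str = "siliconflow"
    embedder_api_key: str = ""


def _build_siliconflow(settings: SketchSettings) -> str:
    # Placeholder: a real factory would construct the SiliconFlow backend here.
    return "siliconflow-embedder"


# Registry of backend name -> builder; only one entry, matching the text.
_BACKENDS = {"siliconflow": _build_siliconflow}


def build_embedder_sketch(settings: SketchSettings):
    if not settings.embedder_api_key:
        raise ValueError("embedder_api_key is empty")
    try:
        builder = _BACKENDS[settings.embedder_backend]
    except KeyError:
        raise ValueError(f"unknown embedder backend: {settings.embedder_backend!r}") from None
    return builder(settings)
```

Adding a future backend would then mean registering one more builder rather than editing the selection logic.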
Embed-then-upsert worker (scheduler.py)

```python
def add_embedder_worker_job(scheduler, embedder, qdrant, app=None) -> None: ...
async def embedder_worker(embedder, qdrant, app=None) -> None: ...
```

`add_embedder_worker_job` registers an `IntervalTrigger(seconds=30)` job under id `embedder_worker` with `coalesce=True`, `max_instances=1` and an initial delay of 30 s (gives `load()` time to settle).
`embedder_worker` is the coroutine. One tick = one batch:
- Skip silently if `embedder.is_loaded` is False (no event row written; only actual call attempts produce events)
- `pull_pending_batch(BATCH_SIZE=32, MAX_ATTEMPTS=3)`: an empty queue is a no-op, no event row
- Truncate each `(title + "\n\n" + body)` to `embedder.max_input_chars` (8000 for the bge-m3 backend)
- Compute dynamic timeout `max(30, total_chars / 1500)`: ~1500 chars/sec is a conservative throughput floor that covers server queueing + BGE-M3 forward + RTT
- `embedder.aembed(texts, timeout=...)`: on failure, `increment_retry`; if retry+1 ≥ `MAX_ATTEMPTS` for a row, `demote_md5s_to_dead` for that row only (per-row attribution, not whole batch)
- Build `PointStruct` with deterministic UUID = `UUID(hex=md5)` so re-runs are idempotent
- `qdrant.upsert(collection_name="news_current", points=points, wait=True)`: transient `httpx.ConnectError` / `TimeoutException` → log + return without incrementing the retry counter (next tick retries); other errors → increment retry, demote if at limit
- Emit success event row with batch size + total chars + timeout
- If `app` is passed, fire `event_match_batch` before delete (event-driven matching path)
- `delete_pending(md5s)`: failure here is logged but swallowed; the deterministic UUID makes the next tick's re-embed safe
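The pure-computation steps of a tick (truncation, dynamic timeout, deterministic point id) can be sketched with the standard library alone. The helper names below are hypothetical, and the UTF-8 encoding in `article_md5` is an assumption; the document only states that the md5 is `MD5(url + title)`.

```python
import hashlib
from uuid import UUID

MAX_INPUT_CHARS = 8_000       # embedder.max_input_chars for the bge-m3 backend
MIN_TIMEOUT_SECONDS = 30.0
CHARS_PER_SECOND = 1_500      # conservative throughput floor from the step list


def build_text(title: str, body: str, max_chars: int = MAX_INPUT_CHARS) -> str:
    """Join title and body, then cut to the per-text character cap."""
    return (title + "\n\n" + body)[:max_chars]


def dynamic_timeout(texts: list[str]) -> float:
    """max(30, total_chars / 1500), in seconds."""
    total_chars = sum(len(t) for t in texts)
    return max(MIN_TIMEOUT_SECONDS, total_chars / CHARS_PER_SECOND)


def article_md5(url: str, title: str) -> str:
    # Assumption: UTF-8 encoding of the concatenated string.
    return hashlib.md5((url + title).encode("utf-8")).hexdigest()


def md5_to_uuid(md5_hex: str) -> UUID:
    """A 32-char md5 hex digest maps one-to-one onto a UUID."""
    return UUID(hex=md5_hex)
```

Because the same md5 always yields the same UUID, re-embedding a row after a failed `delete_pending` overwrites the existing Qdrant point instead of creating a second one.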
Module constants:

| Constant | Value | Why |
|---|---|---|
| `BATCH_SIZE` | 32 | SiliconFlow single-request input cap |
| `MAX_ATTEMPTS` | 3 | Embed+upsert attempts before demote to `dead_articles` |
| `POLL_INTERVAL_SECONDS` | 30 | Interval trigger; matches the matcher cadence |
| `ALIAS_NAME` | `"news_current"` | Qdrant alias targeted; actual collection switched at upgrade time |

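How `MAX_ATTEMPTS` interacts with per-row attribution can be shown with an in-memory sketch. The function `apply_failure` and the `retry_counts` dict are hypothetical stand-ins for `increment_retry`, `demote_md5s_to_dead` and the retry counter kept in SQLite.

```python
MAX_ATTEMPTS = 3


def apply_failure(retry_counts: dict[str, int], md5s: list[str]) -> list[str]:
    """Record one failed attempt per row and return the md5s to demote.

    retry_counts maps md5 -> attempts already recorded before this failure.
    """
    to_demote = []
    for md5 in md5s:
        previous = retry_counts.get(md5, 0)
        if previous + 1 >= MAX_ATTEMPTS:
            to_demote.append(md5)  # only this row has exhausted its attempts
        retry_counts[md5] = previous + 1
    return to_demote
```

With `{"a": 2, "b": 0}` and a failed batch containing both rows, only `"a"` is demoted; `"b"` stays in the queue with one recorded attempt.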
Configuration
pydantic-settings fields (see `sembr/config.py`):

| Field | Default | Notes |
|---|---|---|
| `embedder_backend` | `"siliconflow"` | only this value is registered |
| `embedder_api_base_url` | `https://api.siliconflow.cn/v1` | non-HTTPS / non-localhost emits a startup warning (cleartext key risk) |
| `embedder_api_key` | `""` (`SecretStr`) | empty fails fast in `build_embedder` |
| `embedder_model` | `BAAI/bge-m3` | must contain `bge-m3`; assertion in `__init__` |
| `embedder_timeout_seconds` | `30.0` | probe timeout + httpx default; batch path overrides via dynamic calculation |

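The base-URL check behind the startup warning could be approximated as below. This is a sketch under assumptions: the function name `is_cleartext_risk` is hypothetical, and the exact host matching in the real code is not shown in this document; the rule implemented here (HTTPS is fine, plain HTTP only for `localhost` and `127.x`, IPv6 loopback not exempt) follows the notes in this page.

```python
from urllib.parse import urlsplit


def is_cleartext_risk(base_url: str) -> bool:
    """True when the API key would travel over plain HTTP to a non-local host."""
    parts = urlsplit(base_url)
    if parts.scheme == "https":
        return False
    host = parts.hostname or ""
    # localhost and 127.x are allowed for development; [::1] is not exempt.
    return not (host == "localhost" or host.startswith("127."))
```

A caller would log a one-time warning when this returns `True` and continue starting up, since the condition is a warning rather than a hard failure.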
Upstream dependencies
- `db.articles`: `pull_pending_batch`, `delete_pending`, `increment_retry`, `demote_md5s_to_dead`
- `db.sqlite`: `get_conn` for the shared aiosqlite connection
- `dashboard.events.log_embed_event`: best-effort observability (failures never poison the worker)
Downstream consumers
- `vector_store.qdrant.QdrantHandle`: receives `upsert` calls with the `news_current` alias
- `matcher.event_match.event_match_batch`: invoked synchronously after the Qdrant upsert when `app` is passed (event-driven matching path). The import is local to break a circular dependency between `embedder` and `matcher`; the planned long-term fix is to invert the direction via the in-process log bus so the embedder only emits events and the matcher subscribes.
- `api.health` and `dashboard.read_model`: read `embedder.status` for `/health` and dashboard render
Known constraints
- bge-m3 only: `MODEL_VERSION` and `EXPECTED_DIM=1024` are class constants; swapping models requires a new subclass, not a config flip.
- Non-HTTPS warning, not a hard fail: localhost / `127.x` are allowed for development; everything else triggers a one-time warning that the API key will travel cleartext. IPv6 localhost (`http://[::1]`) is not whitelisted.
- No retry on probe failure: `load()` runs the probe exactly once. A bad key or network outage stays as `status="error"` until the process restarts. This is by design, so operators see a persistent 503 instead of a silent retry loop.
- `aembed` empty input → empty output: returns `[]` without a network call, but `texts=[""]` (single empty string) does call SiliconFlow.
- Idempotency relies on deterministic UUID: `_md5_to_uuid(md5)` is the Qdrant point id. If the article md5 changes (it shouldn't; md5 is `MD5(url + title)`), the same article would write a duplicate point.
- Worker `max_instances=1`: long-running batches block the next tick. The per-batch timeout is `total_chars / 1500`, so a full batch (`BATCH_SIZE=32` × 8000 chars = 256,000 chars) can run for about 171 s in the worst case; the 30 s tick can drift, but `coalesce=True` prevents backlog.
- Event-driven matching coupling: `embedder_worker` calls into `matcher` via a local import to avoid a top-level cycle. This is a deliberate, documented smell: the matcher should subscribe to embedder-emitted events through the log bus rather than be invoked synchronously. Until that flip lands, treat the local import as load-bearing, not accidental.
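The worst-case figure in the `max_instances=1` constraint follows from the module constants. The short calculation below reproduces it; the variable names are illustrative only.

```python
BATCH_SIZE = 32
MAX_INPUT_CHARS = 8_000
CHARS_PER_SECOND = 1_500
POLL_INTERVAL_SECONDS = 30

# Every text in the batch is truncated to the cap, so this is the upper bound.
worst_case_chars = BATCH_SIZE * MAX_INPUT_CHARS

# Same formula as the worker's dynamic timeout: max(30, total_chars / 1500).
worst_case_timeout = max(30, worst_case_chars / CHARS_PER_SECOND)

# How many 30 s ticks a single worst-case batch can span.
ticks_spanned = worst_case_timeout / POLL_INTERVAL_SECONDS
```

A single worst-case batch can therefore overlap between five and six scheduled ticks, which `coalesce=True` collapses into one run once the batch finishes.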