logbus
In-process log capture, tagging, ring-buffering, and asyncio fan-out. The dashboard's Logs tab subscribes to it via Server-Sent Events to show a live, tag-filtered stream without any external broker.
Responsibility
- Capture every logging.LogRecord in the process via a logging.Handler attached to the root logger
- Route each record to one of seven UI tags (collector, embedder, matcher, notifier, api, scheduler, http) by logger-name prefix
- Keep the most recent N records per tag in an in-memory ring buffer so a freshly opened dashboard tab can show context without round-tripping to disk
- Fan out new records to async subscribers (one queue per dashboard SSE connection) without blocking the logging thread
- Expose runtime per-tag level changes (e.g. raise embedder to DEBUG for ten minutes during an incident) without persisting them; a process restart returns to the configured default
Not in scope
- Persisting logs to disk or shipping them to an external system (the stdout/stderr StreamHandler continues to do that independently)
- Cross-process aggregation: each FastAPI worker has its own bus
- Authentication or authorization on the SSE stream — that's the dashboard router's job
- Structured-logging formatting (JSON, OTel) — records are stored as plain Python dicts
Architecture at a glance
```
logging.LogRecord
      │
      ▼
RingBufferHandler.emit            (any thread; sync logging API)
      │
      ▼  router.route(record) → tag
LogBus.emit(tag, entry)           (still on the calling thread)
      │   ├── deque[tag].append(entry)              (lock held ~µs)
      │   └── for each subscriber whose tag filter matches:
      │          loop.call_soon_threadsafe(_put_drop_oldest, q, entry)
      ▼
asyncio.Queue per subscriber      (only the matching tag, never others)
      │
      ▼
SSE generator (dashboard)         (async; no work on the logging thread)
```
Public interface
Tag routing (router.py)
```python
import logging

ALL_TAGS: tuple[str, ...] = (
    "collector", "embedder", "matcher", "notifier",
    "api", "scheduler", "http",
)
TAG_PREFIX_MAP: list[tuple[str, str]]  # logger-name prefix → tag
THIRD_PARTY_LOGGERS_BY_TAG: dict[str, tuple[str, ...]]

def route(record: logging.LogRecord) -> str: ...
```
route() walks TAG_PREFIX_MAP sorted by prefix length descending — the longest match always wins, so the order in which entries are written in source is not load-bearing. An unknown logger name falls back to the api tag.
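The longest-prefix rule can be sketched as below. The map entries are hypothetical (the real TAG_PREFIX_MAP is not reproduced in this document), and treating a prefix as matching either the exact logger name or a dotted child, so that "httpx" does not capture a logger named "httpxtra", is an assumption about how "prefix" is interpreted.

```python
import logging

# Hypothetical entries; the real TAG_PREFIX_MAP lives in router.py.
TAG_PREFIX_MAP: list[tuple[str, str]] = [
    ("app", "api"),
    ("app.collector", "collector"),
    ("app.summarizer", "matcher"),
    ("httpx", "http"),
]

# Sort once, longest prefix first, so the most specific entry wins no
# matter where it appears in the list above.
_BY_LENGTH = sorted(TAG_PREFIX_MAP, key=lambda item: len(item[0]), reverse=True)


def route(record: logging.LogRecord) -> str:
    name = record.name
    for prefix, tag in _BY_LENGTH:
        # Assumption: a prefix matches the logger itself or any dotted child.
        if name == prefix or name.startswith(prefix + "."):
            return tag
    return "api"  # unknown logger names fall back to the api tag
```

For example, route(logging.makeLogRecord({"name": "app.collector.rss"})) returns "collector" even though the shorter "app" entry is listed first.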
THIRD_PARTY_LOGGERS_BY_TAG is the single source of truth for third-party loggers (httpx, httpcore, uvicorn.access) whose level must follow a UI tag. install_logbus() reads it at startup to silence them at WARNING; the dashboard's PUT /api/dashboard/logs/level endpoint reads the same map to resync them when the operator changes a tag's level.
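The resync matters because a stdlib Logger discards records below its own effective level before any handler sees them: if httpx stays at WARNING, raising the http tag to DEBUG on the bus alone would surface nothing new. A minimal sketch, assuming a hypothetical map shape and a hypothetical helper name, and assuming the library loggers simply mirror the tag's new level (the real policy, e.g. whether lowering a tag restores WARNING, is not specified here):

```python
import logging

# Hypothetical shape: UI tag -> third-party logger names that follow it.
THIRD_PARTY_LOGGERS_BY_TAG: dict[str, tuple[str, ...]] = {
    "http": ("httpx", "httpcore"),
    "api": ("uvicorn.access",),
}


def resync_third_party(tag: str, level: int) -> None:
    """Hypothetical helper: make the library loggers for `tag` emit at `level`."""
    for name in THIRD_PARTY_LOGGERS_BY_TAG.get(tag, ()):
        # Assumption: the library logger mirrors the tag's new level exactly.
        logging.getLogger(name).setLevel(level)
```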
Note that summarizer is intentionally tagged as matcher and vector_store / db / dashboard are tagged as api. The seven tags exist to give an operator a small, fixed set of streams to switch between in the UI; further granularity would clutter the tab strip without adding diagnostic value.
Ring buffer + fan-out (bus.py)
```python
from __future__ import annotations

import asyncio
from typing import Any


class LogBus:
    def emit(self, tag: str, entry: dict[str, Any]) -> None: ...
    def subscribe(
        self,
        q: asyncio.Queue[dict[str, Any] | None],
        *,
        tag: str | None = None,
    ) -> list[dict[str, Any]]: ...
    def unsubscribe(self, q: asyncio.Queue) -> None: ...
    def set_tag_level(self, tag: str, level: int) -> None: ...
    def get_tag_levels(self) -> dict[str, int]: ...
    def tag_info(self) -> list[dict[str, Any]]: ...


def get_bus() -> LogBus: ...  # process-wide singleton
```
emit() is called from arbitrary threads (the logging machinery is sync). It:
- Bails out silently if no event loop has been registered yet (pre-lifespan-start records — there's nothing to fan out to)
- Drops the entry if its level is below the tag's current threshold
- Appends to the per-tag deque (maxlen = buffer_per_tag, FIFO eviction)
- For each subscriber whose tag filter is None or equals tag, schedules _put_drop_oldest on the loop via call_soon_threadsafe
The deque append and the fan-out scheduling happen under a single threading.Lock. The lock is held for microseconds (a dict lookup, a deque append, and one call_soon_threadsafe call per matching subscriber) and is required to keep the deque write and the subscriber snapshot consistent.
subscribe(q, tag="embedder") is the important shape for SSE consumers: the snapshot is just that tag's deque (no flatten, no sort) and the subscriber's queue subsequently only receives entries for that one tag. A subscriber that streams embedder does not pay for traffic on the other six tags. subscribe(q) (no tag) is preserved for tests and any future "raw firehose" consumer; in that mode the snapshot flattens all tag deques and is sorted by timestamp outside the lock.
_put_drop_oldest is the queue-write helper. If the subscriber's queue is full it evicts the oldest entry and tries the new one — preferring to lose stale history rather than block emit. If the queue stays full after the eviction (two concurrent emits both racing the same full queue) it drops the new entry. Subscribers see an unsignalled gap, never a stalled stream.
Handler (handler.py)
RingBufferHandler is set to logging.DEBUG so every record reaches the bus; per-tag level filtering happens inside LogBus.emit() so the operator can raise or lower a tag at runtime without changing handler state. Each record's level is carried into the bus entry as level_no, which is the value the per-tag threshold compares against.
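A minimal handler along these lines is sketched below. It assumes the router and the bus are injected as plain callables so the example is self-contained (the real RingBufferHandler presumably calls route() and get_bus() directly), and every entry key except level_no is hypothetical.

```python
from __future__ import annotations

import logging
from typing import Any, Callable


class RingBufferHandler(logging.Handler):
    """Sketch: convert each LogRecord to a dict and hand it to the bus."""

    def __init__(
        self,
        route: Callable[[logging.LogRecord], str],
        emit_to_bus: Callable[[str, dict[str, Any]], None],
    ) -> None:
        super().__init__(level=logging.DEBUG)  # let every record through
        self._route = route
        self._emit_to_bus = emit_to_bus

    def emit(self, record: logging.LogRecord) -> None:
        try:
            entry = {
                "ts": record.created,          # hypothetical key
                "level": record.levelname,     # hypothetical key
                "level_no": record.levelno,    # compared against the tag threshold
                "logger": record.name,         # hypothetical key
                "message": record.getMessage(),
            }
            self._emit_to_bus(self._route(record), entry)
        except Exception:
            self.handleError(record)  # never let logging raise into callers
```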
Lifespan installation (install.py)
```python
import asyncio
import logging


def install_logbus(
    loop: asyncio.AbstractEventLoop,
    *,
    buffer_per_tag: int = 1000,
    default_level: int = logging.INFO,
) -> None: ...
```
Called as the first line of the FastAPI lifespan coroutine. Steps it performs, in order:
- Hand the running event loop to the bus (so emit() knows where to schedule fan-out)
- Resize all tag deques to buffer_per_tag
- Set every tag's level to default_level
- Silence each logger named in THIRD_PARTY_LOGGERS_BY_TAG.values() to WARNING (httpx / httpcore / uvicorn.access are extremely chatty at INFO)
- If a RingBufferHandler is already attached to the root logger, return early (idempotent on test reruns within the same process)
- Pin any pre-existing root StreamHandler whose level is NOTSET to INFO before lowering the root logger to DEBUG. Without this step, lowering the root would let DEBUG records flood docker logs / stderr through the inherited stream handler
- Attach a fresh RingBufferHandler and lower the root to DEBUG
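The root-logger half of those steps (third-party silencing through handler attachment) can be sketched as follows. wire_root_logger() is a hypothetical name, the flat THIRD_PARTY tuple stands in for THIRD_PARTY_LOGGERS_BY_TAG.values(), and the RingBufferHandler here is a do-nothing placeholder; the loop handoff and the bus's deque/level setup are omitted.

```python
import logging

# Illustrative flattened view of THIRD_PARTY_LOGGERS_BY_TAG.values().
THIRD_PARTY = ("httpx", "httpcore", "uvicorn.access")


class RingBufferHandler(logging.Handler):
    """Placeholder; the real handler routes records into the bus."""

    def emit(self, record: logging.LogRecord) -> None:
        pass


def wire_root_logger() -> None:
    """Hypothetical helper covering the root-logger steps of install_logbus()."""
    for name in THIRD_PARTY:
        logging.getLogger(name).setLevel(logging.WARNING)  # chatty at INFO

    root = logging.getLogger()
    if any(isinstance(h, RingBufferHandler) for h in root.handlers):
        return  # already installed in this process (e.g. a test rerun)

    for h in root.handlers:
        # A NOTSET handler forwards everything the root lets through, so pin
        # it before lowering the root, or DEBUG records would flood stderr.
        if isinstance(h, logging.StreamHandler) and h.level == logging.NOTSET:
            h.setLevel(logging.INFO)

    root.addHandler(RingBufferHandler(level=logging.DEBUG))
    root.setLevel(logging.DEBUG)
```

Note that isinstance(h, logging.StreamHandler) also matches logging.FileHandler, which subclasses StreamHandler; whether the real install step pins file handlers too is not stated above.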
Configuration
| Field | Default | Notes |
|---|---|---|
| dashboard_log_buffer_per_tag | 1000 | Records retained per tag in the ring buffer. Bounded to [100, 10000]. Memory is roughly 7 × buffer_per_tag × ~500 B, so the maximum sits around 35 MB |
| dashboard_log_level | "INFO" | One of DEBUG / INFO / WARNING / ERROR. Applied to all 7 tags at startup; the dashboard PUT /level endpoint can adjust each tag at runtime |
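The bounds and the memory estimate in the table can be checked with a small stdlib validator. DashboardLogSettings and its methods are hypothetical (the real settings object is not shown), and the ~500 B per entry figure is the table's own rough estimate rather than a measured value.

```python
import logging
from dataclasses import dataclass

_LEVELS = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
}


@dataclass(frozen=True)
class DashboardLogSettings:
    """Hypothetical settings holder mirroring the two fields in the table."""

    dashboard_log_buffer_per_tag: int = 1000
    dashboard_log_level: str = "INFO"

    def __post_init__(self) -> None:
        if not 100 <= self.dashboard_log_buffer_per_tag <= 10_000:
            raise ValueError("dashboard_log_buffer_per_tag must be in [100, 10000]")
        if self.dashboard_log_level not in _LEVELS:
            raise ValueError(f"dashboard_log_level must be one of {sorted(_LEVELS)}")

    @property
    def default_level(self) -> int:
        """Numeric level suitable for install_logbus(default_level=...)."""
        return _LEVELS[self.dashboard_log_level]

    def approx_buffer_bytes(self, tags: int = 7, bytes_per_entry: int = 500) -> int:
        # tags x buffer_per_tag entries x ~bytes_per_entry each
        return tags * self.dashboard_log_buffer_per_tag * bytes_per_entry
```

At the upper bound this gives 35,000,000 B, matching the ~35 MB ceiling in the table; the default of 1000 comes to about 3.5 MB.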
Runtime tag-level changes via PUT /api/dashboard/logs/level are process-memory only — they survive until the next restart, then the dashboard_log_level default reapplies. This is intentional: the bus is a debugging affordance, not a configuration surface.
Upstream dependencies
- Python's logging module (root logger, handler attachment)
- The asyncio event loop owned by FastAPI's lifespan
That's it. The bus is deliberately self-contained.
Downstream consumers
- dashboard.logs_routes: GET /api/dashboard/logs/stream?tag=... opens an SSE connection that calls bus.subscribe(q, tag=tag); GET /tags reports bus.tag_info(); PUT /level calls bus.set_tag_level(...)
- Every other module logs through the standard `logging.getLogger(__name__)` and is unaware that a bus exists; capture happens via the handler attached to the root logger
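On the consuming side, the stream endpoint's generator might look like the sketch below. It is framework-neutral (the FastAPI StreamingResponse wiring is omitted), sse_events() and BusLike are hypothetical names, and three details are assumptions: a None entry is treated as a shutdown sentinel (suggested by the queue's dict | None element type), the 2000-slot queue and 1 s poll interval are the figures quoted under Known constraints, and the keepalive is sent as an SSE comment line (a line starting with ":").

```python
from __future__ import annotations

import asyncio
import json
from typing import Any, AsyncIterator, Protocol


class BusLike(Protocol):
    def subscribe(self, q: asyncio.Queue, *, tag: str | None = None) -> list[dict[str, Any]]: ...
    def unsubscribe(self, q: asyncio.Queue) -> None: ...


async def sse_events(bus: BusLike, tag: str, poll_s: float = 1.0) -> AsyncIterator[str]:
    """Hypothetical SSE body: backlog first, then live entries for one tag."""
    q: asyncio.Queue[dict[str, Any] | None] = asyncio.Queue(maxsize=2000)
    backlog = bus.subscribe(q, tag=tag)
    try:
        for entry in backlog:
            yield f"data: {json.dumps(entry)}\n\n"
        while True:
            try:
                entry = await asyncio.wait_for(q.get(), timeout=poll_s)
            except asyncio.TimeoutError:
                yield ": keepalive\n\n"  # SSE comment keeps idle proxies open
                continue
            if entry is None:  # assumed shutdown sentinel
                return
            yield f"data: {json.dumps(entry)}\n\n"
    finally:
        bus.unsubscribe(q)  # runs on normal exit, client disconnect, or aclose()
```

The finally block is what keeps the subscriber list from leaking when a browser tab closes mid-stream.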
Known constraints
- Single-process only: the bus is an in-memory ring per Python process. Running multiple uvicorn workers means the dashboard sees only the worker that fielded the SSE request. The 1.0 topology is single-process, so this is acceptable. A multi-worker deployment needs an external aggregator (Vector, Loki, etc.) — at which point the bus's role narrows to "live preview within one worker"
- No persistence: a process restart drops the ring buffer and all runtime tag-level overrides. Anyone investigating last night's incident reads the underlying stream handler's destination (docker logs / stderr / the file sink), not the bus
- Drop-oldest under backpressure: a subscriber that can't keep up with emit() silently loses history; there is no "lagged" indication on the wire. The 2000-slot subscriber queue paired with a 1 s SSE poll interval gives several seconds of slack before an eviction happens; in practice we have not seen evictions outside synthetic stress tests
- Pre-lifespan records are silently dropped: anything logged before install_logbus() runs (e.g. during top-level imports) does not reach the bus, because the RingBufferHandler is not yet attached to the root logger and no event loop is registered. Module-level code should keep its log volume to a minimum; operationally, those records still go to the existing stream handler / docker logs
- Tag granularity is fixed at seven: adding an eighth tag means changing ALL_TAGS, TAG_PREFIX_MAP, the dashboard TagName enum, and the dashboard's tab strip. The choice is deliberate: a small fixed set keeps the UI readable