feat(communication): server-pushed chat events via SSE #44

Merged
rawdaGastan merged 3 commits from development_messaging_sse_subscribe into development 2026-05-04 13:46:41 +00:00

Summary

Server-pushed chat events over Server-Sent Events (SSE). Recipients now see new messages live without reloading the page, which ends the messaging archipelago's pull-only data flow.

Wires the per-domain extension hook from hero_rpc PR #39 (now merged on development) to mount a web_events.sock on the communication domain. hero_router auto-discovers it via the standard web_<name>.sock convention, exposing /hero_osis_communication/events to browsers.

Closes (UI half — see hero_archipelagos PR for the consumer): lhumina_code/hero_archipelagos#182

Changes

crates/hero_osis_server/src/communication/server/sse.rs (new, ~480 lines incl. 11 tests)

  • ChatEvent enum: tagged JSON {type: "chatmessage.new" | "conversation.updated", data: {...}} wire format.
  • CommunicationBroadcaster: process-wide tokio::sync::broadcast (capacity 256) with Lagged → resync SSE-frame conversion.
  • In-memory (conversation_sid → participant_keys) cache, primed by conversation_trigger_save_post and conversation_trigger_get_post. chatmessage_trigger_save_post reads the cache to embed the participants in the published event, so the SSE delivery filter costs O(participants) per event and needs no storage round-trip on the hot path.
  • set_conversation_lookup override hook for consumer crates that need a different resolution strategy.
  • events_handler reads the X-Public-Key header (falling back to the ?public_key=… query parameter for web_sys::EventSource, which can't set headers), filters events per subscriber, and wraps the stream as axum::response::sse::Sse with a 15 s keepalive.
  • Pre-auth carve-out: anonymous callers (no public key) currently receive every event. Documented TODO to flip back to fail-closed once real authentication lands. Rationale: the messaging archipelago still runs on the SYSTEM_KEY = "system" placeholder; without the carve-out the QA repro for #182 silently drops everything at the SSE boundary.
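
For reviewers, the key resolution and per-subscriber filter described above can be sketched roughly as follows. This is a std-only illustration, not the code in sse.rs: the signatures are hypothetical, `should_deliver` is a made-up name, and treating an empty key as absent is an assumption of the sketch.

```rust
/// Resolve the caller's public key. The header value wins over the query
/// parameter. Hypothetical signature; empty strings are treated as absent,
/// which is an assumption of this sketch.
fn resolve_caller_pubkey<'a>(
    header: Option<&'a str>,
    query: Option<&'a str>,
) -> Option<&'a str> {
    header
        .filter(|k| !k.is_empty())
        .or(query.filter(|k| !k.is_empty()))
}

/// Decide whether one event reaches one subscriber.
/// `participant_keys` is the list embedded in the published event.
fn should_deliver(caller: Option<&str>, participant_keys: &[String]) -> bool {
    match caller {
        // Pre-auth carve-out: anonymous subscribers receive every event.
        None => true,
        // Linear scan, so the cost is O(participants) per delivery.
        Some(key) => participant_keys.iter().any(|p| p == key),
    }
}
```

Flipping the carve-out back to fail-closed would amount to changing the `None` arm to `false` in a filter shaped like this one.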

crates/hero_osis_server/src/communication/server/{mod.rs, rpc.rs}

  • mod.rs: declares pub mod sse; and re-exports the public surface (ChatEvent, CommunicationBroadcaster, publish, subscribe, sse_router, etc.).
  • rpc.rs: trigger functions emit events (conversation_trigger_save_post, conversation_trigger_get_post, chatmessage_trigger_save_post). The _get_post cache priming closes the cold-start case where conversations created before process boot would otherwise have empty participant_keys.
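
The cache priming can be pictured with a minimal std-only sketch. `ParticipantCache`, `prime`, and `participants_for` are hypothetical names, and returning an empty list on a miss is an assumption about what the "cache-miss safe-fail" test covers.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// In-memory map from conversation sid to participant keys.
/// Hypothetical type; the real cache lives in sse.rs.
#[derive(Default)]
struct ParticipantCache {
    inner: Mutex<HashMap<String, Vec<String>>>,
}

impl ParticipantCache {
    /// Called from both conversation triggers (save and get), so a
    /// conversation created before process boot is cached on first read.
    fn prime(&self, conversation_sid: &str, participant_keys: &[String]) {
        self.inner
            .lock()
            .unwrap()
            .insert(conversation_sid.to_string(), participant_keys.to_vec());
    }

    /// Called when a chat message is saved. A miss yields an empty list
    /// (assumed behaviour), so keyed subscribers see nothing rather than
    /// something they should not.
    fn participants_for(&self, conversation_sid: &str) -> Vec<String> {
        self.inner
            .lock()
            .unwrap()
            .get(conversation_sid)
            .cloned()
            .unwrap_or_default()
    }
}
```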

crates/hero_osis_server/build.rs

  • Calls OschemaBuildConfig::with_domain_router_ext("communication", "hero_osis_server::communication::server::sse_router") so the generator emits extension: Some(sse_router()) for the communication domain. Other domains keep extension: None and behave unchanged.

crates/hero_osis_server/src/bin/hero_osis.rs

  • Auto-regenerated by build.rs to include the new extension: field on the DomainService literal for each domain.

crates/hero_osis_sdk/src/communication/{events.rs, mod.rs} (events.rs is new)

  • ChatEvent: SDK-side mirror of the server enum.
  • SubscribeError enum: Resync(u64) / Disconnected / SseError(String) (sibling enum because ClientError lives in the upstream hero_rpc_client crate).
  • CommunicationClient is now a thin wrapper struct around the generated client with Deref forwarding — all existing RPC methods keep working unchanged via deref coercion. Adds subscribe(public_key) with cfg-gated implementations:
    • Native (reqwest + tokio): text/event-stream consumer with a line-oriented SSE parser.
    • WASM (web_sys::EventSource): query-string fallback for context + public_key (EventSource can't set custom headers), Drop guard closes the underlying connection on stream drop.
  • Reconnect is the caller's responsibility — subscribe() ends with Err(SubscribeError::Disconnected) when the connection drops, no hidden retry surprises.
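
For context, a line-oriented SSE parser of the kind the native path uses could look like the sketch below. It is std-only and illustrative: `SseFrame` and `SseParser` are hypothetical names, not the SDK's types, and only the `event:` and `data:` fields are handled.

```rust
/// One parsed SSE frame: optional `event:` name plus the joined `data:` lines.
#[derive(Debug, PartialEq)]
struct SseFrame {
    event: Option<String>,
    data: String,
}

/// Incremental parser: feed it text chunks, get back the completed frames.
#[derive(Default)]
struct SseParser {
    buf: String,
    event: Option<String>,
    data: Vec<String>,
}

impl SseParser {
    fn feed(&mut self, chunk: &str) -> Vec<SseFrame> {
        self.buf.push_str(chunk);
        let mut frames = Vec::new();
        // Only complete lines are consumed; a partial tail stays buffered.
        while let Some(pos) = self.buf.find('\n') {
            let raw: String = self.buf.drain(..=pos).collect();
            let line = raw.trim_end_matches(|c: char| c == '\n' || c == '\r');
            if line.is_empty() {
                // A blank line ends the frame; frames without data are dropped.
                if self.data.is_empty() {
                    self.event = None;
                } else {
                    frames.push(SseFrame {
                        event: self.event.take(),
                        data: self.data.join("\n"),
                    });
                    self.data.clear();
                }
            } else if line.starts_with(':') {
                // Comment line, e.g. a keepalive: ignored.
            } else if let Some(v) = line.strip_prefix("data:") {
                self.data.push(v.strip_prefix(' ').unwrap_or(v).to_string());
            } else if let Some(v) = line.strip_prefix("event:") {
                self.event = Some(v.strip_prefix(' ').unwrap_or(v).to_string());
            }
        }
        frames
    }
}
```

Because the parser buffers partial lines, it can be fed network chunks as they arrive, regardless of where the chunk boundaries fall.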

Cargo deps

  • hero_osis_server: adds tokio-stream (for BroadcastStream) and futures (for Stream / StreamExt).
  • hero_osis_sdk: cfg-gated additions: reqwest stream feature + bytes (native); wasm-bindgen-futures + js-sys + web-sys EventSource feature (wasm32). futures is added for both targets.

Test plan

  • cargo test -p hero_osis_server -p hero_osis_sdk --lib — all 124 server tests + 5 SDK tests pass.
  • 11 communication::server::sse:: unit tests cover: broadcaster mechanics, JSON tag shape, anonymous/participant filter, trigger pipeline (save_post + get_post cache priming), cache-miss safe-fail, resolve_caller_pubkey precedence (header > query).
  • WASM cargo check -p hero_osis_sdk --target wasm32-unknown-unknown — clean.
  • Manual end-to-end browser smoke (two profiles, hero_proc + hero_osis + hero_router + hero_os): A sends → B sees the bubble appear in the active chat panel within ~1-2 s without reload. Conversation list preview + unread count also update live.

Notes

Generator regen

build.rs regenerates *_generated.rs files in every domain crate when invoked, picking up the new extension: emission from hero_rpc PR #39's generator. Those regenerated files are intentionally not included in this PR — the diff would be ~30k lines of mechanical churn (whitespace + extension: None field additions for every non-communication domain) that buries the actual SSE work.

A separate follow-up PR can land the regen snapshots if they prove necessary for CI tree-dirty checks.

Pre-existing local-dev pin

This PR doesn't touch the [patch."https://forge.ourworld.tf/lhumina_code/hero_lib.git"] block in workspace Cargo.toml (still pinned to /home/omar/hero/code0/...). That's a pre-existing local-dev artefact tracked separately.

Race fix in messaging archipelago

The companion hero_archipelagos PR includes a small race fix in archipelago.rs::handle_send: when the SSE echo of the user's own send arrives before the optimistic insert, the existing entry is now upgraded (is_own = true, sender_name = "Me") instead of the optimistic insert being dropped as a duplicate. Without that, the sender briefly saw a gray bubble for their own message, because SSE won the race and chatmessage_to_messagedata defaults is_own to false under the placeholder identity scheme.
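
The upgrade-instead-of-dedup idea can be sketched as below. This is not the hero_archipelagos code: `MessageData`'s fields other than is_own and sender_name, and the `insert_own_message` helper, are hypothetical.

```rust
/// Minimal stand-in for the UI's message record (hypothetical fields
/// apart from `is_own` and `sender_name`).
#[derive(Debug, Clone, PartialEq)]
struct MessageData {
    sid: String,
    body: String,
    is_own: bool,
    sender_name: String,
}

/// Record the sender's own message after a successful send.
/// If the SSE echo already inserted the same sid, upgrade that entry in
/// place instead of discarding the optimistic insert as a duplicate.
fn insert_own_message(messages: &mut Vec<MessageData>, sid: &str, body: &str) {
    match messages.iter().position(|m| m.sid == sid) {
        // SSE won the race: fix up ownership on the existing entry.
        Some(i) => {
            messages[i].is_own = true;
            messages[i].sender_name = "Me".to_string();
        }
        // Normal case: the optimistic insert arrives first.
        None => messages.push(MessageData {
            sid: sid.to_string(),
            body: body.to_string(),
            is_own: true,
            sender_name: "Me".to_string(),
        }),
    }
}
```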

Sequencing

This is PR 2 of 3 for issue #182:

  1. hero_rpc #39 — generator hook (merged).
  2. This PR — server SSE + SDK subscribe.
  3. hero_archipelagos — UI consumer (will open after this PR merges).
feat(communication): server-pushed chat events via SSE
Some checks failed
Build and Test / build (push) Failing after 19s
Build Linux / build-linux (linux-amd64-musl, false, x86_64-unknown-linux-musl) (push) Failing after 1m26s
Build and Test / build (pull_request) Failing after 17s
Build Linux / build-linux (linux-amd64-musl, false, x86_64-unknown-linux-musl) (pull_request) Failing after 1m19s
9fdc65735e
Adds GET / on the communication domain's web_events.sock streaming
chatmessage.new and conversation.updated events as JSON SSE frames.
Per-subscriber filter checks X-Public-Key against participant_keys;
falls back to delivering everything when caller is anonymous (pre-auth
dev mode, flagged for tightening once auth lands).

SDK CommunicationClient gains subscribe(public_key) on native (reqwest)
and WASM (web_sys::EventSource); existing RPC methods unchanged via Deref.

11 unit tests cover broadcaster, filter, trigger pipeline, and pubkey
resolution.

lhumina_code/hero_archipelagos#182
chore: bump hero_rpc + migrate seed bin to async MockGenerator
All checks were successful
Build and Test / build (push) Successful in 7m40s
Build and Test / build (pull_request) Successful in 7m45s
Build Linux / build-linux (linux-amd64-musl, false, x86_64-unknown-linux-musl) (push) Successful in 7m58s
Build Linux / build-linux (linux-amd64-musl, false, x86_64-unknown-linux-musl) (pull_request) Successful in 16m10s
90874d3011
- Cargo.lock: bump hero_rpc to e512c1bf (PR #39 — with_domain_router_ext)
  and herolib to b8b01eba (cascaded after PR #38).
- bin/seed.rs: MockGenerator::from_env now returns a Future after
  hero_rpc PR #38 migrated hero_rpc_osis to broker-first AiClient API;
  add the missing .await before .map_err so cargo check passes.
rawdaGastan merged commit 5409f15beb into development 2026-05-04 13:46:41 +00:00
rawdaGastan deleted branch development_messaging_sse_subscribe 2026-05-04 13:46:46 +00:00
Reference
lhumina_code/hero_osis!44