messaging: no live updates — recipient must reload to see new messages #182
Labels
No labels
prio_critical
prio_low
type_bug
type_contact
type_issue
type_lead
type_question
type_story
type_task
No milestone
No project
No assignees
1 participant
Notifications
Due date
No due date set.
Dependencies
No dependencies set.
Reference
lhumina_code/hero_archipelagos#182
Loading…
Add table
Add a link
Reference in a new issue
No description provided.
Delete branch "%!s()"
Deleting a branch is permanent. Although the deleted branch may continue to exist for a short time before it actually gets removed, it CANNOT be undone in most cases. Continue?
Problem
The messaging archipelago is pull-only — there is no SSE / WebSocket / poll loop. After mount the conversation list and active-chat messages are fetched once; they are never refetched on their own. A user who sends a message can see it (optimistic update from the closed #45 patch), but the recipient sees nothing until they reload the page or navigate away and back.
Grep for
subscribe|interval|poll|watch|stream|sseunderarchipelagos/messaging/src/returns nothing.Repro
Why this is worse than it sounds
Combined with hero_archipelagos #62, on the next reload tab B sees the new message and its own historical messages all rendered as
bubble-other. Effectively no one can tell who sent what without auth.Suggested fix
Short-term: poll
conversation.list+conversation.list_messages(active_sid)every N seconds while the island is focused.Medium-term: add an SSE/WS subscription on the
hero_osis_communicationsocket and streamchatmessage.new/conversation.updatedevents to subscribed clients.Found via QA session 2026-04-29 (
qa/messaging_2026-04-29/FINDINGS.md).Implementation Spec for Issue #182
Objective
Replace the messaging archipelago's pull-only data flow with a server-pushed event stream so that a recipient sees new messages without reloading the page. The medium-term path is chosen: hero_osis exposes a per-domain Server-Sent Events (SSE) endpoint at
/events, thehero_osis_sdk::communication::CommunicationClientgains asubscribe()method that yields aStream<Item = Result<ChatEvent, ClientError>>, and the messaging archipelago consumes that stream after the existing initial fetch.Requirements
hero_osis_server'sOsisCommunicationdomain. Events are emitted by the existingchatmessage_trigger_save_postandconversation_trigger_save_posthooks (already hand-written inrpc.rs, lines 1501 and 1534), so no*_generated.rsfile is touched and the codegen does not need a new feature.hero_rpc/crates/server/src/server/domain_server.rs) and SSE rides over the same axum router as aGET /eventsroute — the simplest coexistence with the existingOpenRpcTransport. WebSocket would require a second protocol upgrade path that hero_router already proxies generically but the SDK does not. gRPC streaming is rejected because nothing in the stack speaks it.hero_rpc::server::DomainServer::spawn(inhero_rpc, not inhero_osis) hard-codes the four routes (/rpc,/health,/openrpc.json,/.well-known/heroservice.json) and there is noextra_routeshook today, we add one to the generator: an optionaldomain_router_extensionconfigured viaOschemaBuildConfig::with_domain_router_ext("hero_osis_server::communication::sse_router"). The extension function returns anaxum::Router<()>that ismerged in. This generator change is the only change required tohero_rpcand follows the same pattern already used bybin_ui.(context_name, conversation participation). The route handler readsX-Hero-ContextfromRequestContext::from_headers(already done indomain_server.rs), then for every event filters byconversation.participant_keys.contains(&caller_user_key). With hero_osis #40 closed,X-Hero-Contextis now respected by the data layer end-to-end; the SSE filter sits on top of that as a delivery-time check.CommunicationClient::subscribe()is hand-written incrates/hero_osis_sdk/src/communication/mod.rs(the existingpub mod communication { ... }already alongside generated code). It is not generated. Codegen today produces only request-response RPCs (seeosis_client_generated.rs's uniformrpc_callshape) and adding stream support there would touch the generator for a single domain.archipelago.rslines 297–301 is the canonical rule).Files to Modify/Create
Repo
hero_osiscrates/hero_osis_server/src/communication/server/rpc.rs— modify: emit events from the existingchatmessage_trigger_save_postandconversation_trigger_save_postlifecycle triggers (lines 1501, 1534).crates/hero_osis_server/src/communication/server/mod.rs— modify: addpub mod sse;(declares the new hand-written supplementary module).crates/hero_osis_server/src/communication/server/sse.rs— create: holds the per-contexttokio::sync::broadcastregistry, theChatEventenum, apub fn sse_router() -> axum::Router<()>, and the GET/eventshandler.crates/hero_osis_server/build.rs— modify: call.with_domain_router_ext("communication", "crate::communication::server::sse_router").crates/hero_osis_server/Cargo.toml— modify: addaxum(forSse/KeepAlive) andtokio-streamfeatures as needed;tokio = { features = [..., "sync"] }is already present transitively.crates/hero_osis_sdk/src/communication/mod.rs— modify: addpub use events::*;and the hand-writtenimpl CommunicationClient { pub async fn subscribe(...) -> impl Stream<Item = Result<ChatEvent, ClientError>> { ... } }.crates/hero_osis_sdk/src/communication/events.rs— create: holds theChatEventJSON shape (mirrors the server's enum), the parser, and the WASM-vs-native split foreventsource-style consumption.crates/hero_osis_sdk/Cargo.toml— modify: addfutures = "0.3",reqwest = { features = ["stream"] }(native),gloo-netSSE /web-sysEventSource(wasm).tests/communication_subscribe.rs— create: integration test that bootshero_osis_serverin-process, callssubscribe(), performs asend_message, and asserts the receiver gets achatmessage.newevent.Repo
hero_archipelagosarchipelagos/messaging/src/services/messaging_service.rs— modify: addpub async fn subscribe_chat_events(osis_url, context_name) -> Result<impl Stream<Item = ChatEvent>, String>thin wrapper.archipelagos/messaging/src/archipelago.rs— modify: after the existingfetch_chatsuse_effect (line 107), spawn a long-lived task that consumes the stream and updatesconversations,active_messages(filtered by currentRoute::Chat { sid, .. }), with reconnect-with-backoff.archipelagos/messaging/Cargo.toml— modify: ensurefuturesand a path-pinnedhero_osis_sdk(during dev — see Notes).Repo
hero_rpc(one upstream change required)crates/generator/src/build/build.rs— modify: addwith_domain_router_ext(domain, fqfn)builder method (mirrorsbin_ui), and store it inOschemaBuildConfig.crates/server/src/server/domain_server.rs— modify: after the existingRouter::new() ... .with_state(state);block, if arouter_extension: Option<fn() -> Router<()>>was passed via the spawn API,router = router.merge(extension()).Implementation Plan
Step 1 — Generator: add
with_domain_router_exthero_rpc/crates/generator/src/build/build.rs,hero_rpc/crates/server/src/server/domain_server.rs,hero_rpc/crates/server/src/server/spawn.rs.hero_rpc.HashMap<String, String>fielddomain_router_extsonOschemaBuildConfigand awith_domain_router_ext(domain, fqfn)builder method.let extra = <fqfn>(); router.merge(extra)if the domain has an entry.DomainServer::spawn_with_extension(...)overload acceptingOption<axum::Router>; have the existingspawndelegate to it withNone.Step 2 — Server: define
ChatEventand broadcasterhero_osis/crates/hero_osis_server/src/communication/server/sse.rs(new),hero_osis/crates/hero_osis_server/src/communication/server/mod.rs.hero_osis.pub enum ChatEvent { ChatmessageNew(ChatMessage), ConversationUpdated(Conversation) }derivingSerialize. OptionalConversationRead { sid: String, by: String }if mark-as-read should also stream.pub struct CommunicationBroadcaster { senders: RwLock<HashMap<String /* context_name */, tokio::sync::broadcast::Sender<ChatEvent>>> }exposed as aOnceLockglobal (the trigger functionschatmessage_trigger_save_postare staticfnand have no access to the domainArc, so the broadcaster must be a process-wide singleton).pub fn publish(context_name: &str, event: ChatEvent)andpub fn subscribe(context_name: &str) -> broadcast::Receiver<ChatEvent>.pub fn sse_router() -> axum::Router<()>returning a router withGET /eventsthat:X-Hero-Contextfrom headers viaRequestContext::from_headers.?subscribe=conversation:<sid>,...query params or accepts no filter (subscribes to all events visible to the context).ChatmessageNew, look up theConversationbyevent.conversation_sidand only forward ifparticipant_keys.contains(&caller_self_key); forConversationUpdated, the conversation is already in hand.axum::response::sse::Ssewith aKeepAliveheartbeat (~15s).Step 3 — Server: emit events from lifecycle triggers
hero_osis/crates/hero_osis_server/src/communication/server/rpc.rs.hero_osis.OsisCommunication::chatmessage_trigger_save_post(obj: &ChatMessage)(line 1501), emitChatmessageNew(obj.clone())viasse::publish(...). Because the trigger fn is static and has noRequestContext, publish on a global channel; the SSE handler does the per-context filter at delivery time usingConversation.participant_keysagainst the caller'sself_key. With hero_osis #40 closed, the caller'sself_keyis correctly context-isolated upstream, so this delivery-time filter is sound.conversation_trigger_save_post(obj: &Conversation)(line 1534), publishingConversationUpdated.Step 4 — SDK:
ChatEventtypes andsubscribe()hero_osis/crates/hero_osis_sdk/src/communication/events.rs(new),hero_osis/crates/hero_osis_sdk/src/communication/mod.rs,hero_osis/crates/hero_osis_sdk/Cargo.toml.hero_osis.ChatEventenum (tagged JSON shape, e.g.{ "type": "chatmessage.new", "data": ChatMessage }).reqwestwithtext/event-streamaccept, parse SSE frames intoChatEventviaserde_json, expose asimpl Stream<Item = Result<ChatEvent, ClientError>>. Endpoint:{base_url}/hero_osis_communication/rpc/events(sibling to/rpc; goes through hero_router's existing per-service proxy unchanged).web_sys::EventSourcewithwith_credentials = false, wrap theonmessagecallback into afutures::channel::mpsc::UnboundedReceiver<ChatEvent>exposed asStream.pub async fn subscribe(&self) -> Result<impl Stream<Item = Result<ChatEvent, ClientError>> + Unpin, ClientError>onCommunicationClient.Step 5 — Server unit and SDK integration tests
hero_osis/tests/communication_subscribe.rs(new),hero_osis/crates/hero_osis_server/src/communication/server/sse.rs(extend with#[cfg(test)] mod tests).hero_osis.publish, assert the receiver gets the event.hero_osisserver in-process on a temp socket dir, create aCommunicationClient, subscribe, thensend_messagefrom a second client, assert the first receives aChatmessageNewevent with the correct content.ctx_1must not receive events for conversations whose participants do not include thectx_1caller key.Step 6 — UI: subscribe-on-mount in messaging archipelago
hero_archipelagos/archipelagos/messaging/src/services/messaging_service.rs,hero_archipelagos/archipelagos/messaging/src/archipelago.rs.hero_archipelagos.messaging_service.rs, addpub async fn subscribe_chat_events(osis_url, context_name) -> Result<impl Stream<Item = ChatEvent>, String>that delegates toclient.subscribe().archipelago.rs, after the existingfetch_chatsuse_effectblock (around line 107–127), add a newuse_effectthat:ChatEvent::ChatmessageNew(msg):conversations(last_message,last_message_time = msg.created_at * 1000,unread_count += 1unless this is the active chat).conversation_sidmatches the currentRoute::Chat { sid, .. }(read fromroute.peek()), convert viaservices::chatmessage_to_messagedataand push toactive_messages— but only if!active_messages.peek().iter().any(|m| m.id == msg_data.id)(mirrors the existing dedupe at lines 297–301).ChatEvent::ConversationUpdated(conv): replace the matching preview row inconversations, or insert at the head if it's a brand-new conversation.Step 7 — Manual smoke test
hero_archipelagos.hero_osisinstance, log in as different users, send from A, observe live append on B.Acceptance Criteria
hero_osisserver exposesGET /eventson the per-domaincommunicationsocket and emits SSE frames typed asChatEvent.chatmessage.newevents are emitted wheneverchatmessage_setsucceeds, andconversation.updatedevents wheneverconversation_setsucceeds.Conversation.participant_keys(filter is enforced server-side, not client-side).hero_osis_sdk::communication::CommunicationClient::subscribe()returns aStream<Item = Result<ChatEvent, ClientError>>on both native (reqwest) and WASM (EventSource).active_messagesand an updated preview row in B's conversation list within 2 seconds, with no manual refresh.hero_osis/tests/communication_subscribe.rsexercises send → receive end-to-end via in-process server and passes in CI.ctx_1does not receive any event produced forctx_2.Notes
Multi-repo PR strategy
Three PRs, ordered: hero_rpc → hero_osis → hero_archipelagos.
Rationale: the cargo workspace dependency direction is one-way (
hero_archipelagos→hero_osis_sdk→hero_rpc). Coordinating across repos with[patch]rev pins is the standard Hero pattern (see thecargo_depsskill).Sequence:
hero_rpc: addswith_domain_router_exttoOschemaBuildConfigand the optional router extension parameter toDomainServer::spawn. Lands first; backwards-compatible (existing callers passNone).hero_osis: server SSE route + broadcaster, SDKsubscribe(), server+SDK tests. While PR 1 is in review, PR 2's branch carries a temporary[patch.'https://forge.ourworld.tf/lhumina_code/hero_rpc.git']entry.hero_archipelagos: UI consumer. While PR 2 is in review, PR 3 carries a temporary[patch.'https://forge.ourworld.tf/lhumina_code/hero_osis.git']entry pointing to PR 2's branch. The patch line is removed in the final commit before merge, once PR 2 is ondevelopment.Relationship to hero_osis #40 (closed)
hero_osis #40 (server ignored
X-Hero-Contextfor communication) was resolved. With it closed, the data layer correctly isolates by context, which means the SSE handler's delivery-time filter (participant_keys vs caller's self_key) sits on top of an already context-isolated store rather than being the only line of defense. The trigger fn signature remains&ChatMessage/&Conversation(no&RequestContext), so the broadcaster still fans out on a global channel and the SSE handler does the per-caller filter — that is intentional and correct given the closed state of #40. A future cleanup could pipe&RequestContextinto triggers via a generator change and shard the broadcaster by context, but it is not required for this issue.Why SSE and not WebSocket
reqwestandEventSourcegive plain SSE on both targets with no extra deps; WebSocket requirestokio-tungstenitenatively and theweb_sys::WebSocketceremony in WASM, plus a framing protocol.Generated vs hand-written boundary
osis_server_generated.rs,rpc_generated.rs,osis_client_generated.rs,types.rs.rpc.rs(existing trigger fns get bodies),mod.rs(server, SDK), newsse.rs, newevents.rs.OschemaBuildConfig::with_domain_router_extand theDomainServer::spawnextension parameter. Pattern mirrors the existingbin_uihook.Implementation complete — three PRs
The fix shipped as a coordinated three-PR change across hero_rpc, hero_osis, and hero_archipelagos. Each PR linked below.
Architecture summary
hero_rpc (#39) — generator-level hook so any domain server can register supplementary axum routes without modifying the per-domain spawn template.
OschemaBuildConfig::with_domain_router_ext(domain, fqfn)— new builder.DomainService.extension: Option<Router>— when set,spawn_domain_serverbinds a dedicatedweb_events.sockalongsiderpc.sockandui.sock.hero_routerauto-discovers it via the standardweb_<name>.sockconvention so browsers can reach it at/<service>/<name>(e.g./hero_osis_communication/events).hero_osis (#44) — server-side broadcaster + SSE endpoint + SDK
subscribe().crates/hero_osis_server/src/communication/server/sse.rs: tagged-JSONChatEventenum (chatmessage.new/conversation.updated), process-widetokio::sync::broadcastwithLagged → resyncSSE-frame conversion, in-memory(conversation_sid → participant_keys)cache primed byconversation_trigger_save_postand_get_post, and an axum router exposing the SSE stream at/.chatmessage_trigger_save_postreads the cache to embed participants in the published event so the SSE delivery filter is O(participants) at delivery time, no storage round-trip on the hot path.events_handlerresolves the caller's pubkey fromX-Public-Key(or query-string fallback forweb_sys::EventSourcewhich can't set headers), filters per-subscriber, wraps asaxum::response::sse::Ssewith 15s keepalive.CommunicationClient::subscribe(public_key)method on both native (reqwesttext/event-streamconsumer with line-oriented SSE parser) and WASM (web_sys::EventSourcewith query-string params + Drop guard). All existing RPC methods remain available viaDerefto the inner generated client.resolve_caller_pubkeyprecedence.hero_archipelagos (#204) — UI consumer.
fetch_chatsuse_effect, the messaging archipelago opens an SSE subscription with reconnect-with-backoff (1s → 30s, 2x).chatmessage.newevents update the conversation preview row (last_message,last_message_time,unread_count) and append toactive_messageswhen the user is currently viewing that chat. Dedupe by message id keeps the optimistic-insert clean.conversation.updatedevents patch the matching preview row or insert a new one at the head.Resynctriggers a refetch without closing the stream;Disconnectedtriggers reconnect;SseErroris logged.handle_sendupgrades the existing entry tois_own = truewhen SSE wins the race against the optimistic insert, so the sender always sees their own bubble blue.Validation
Pre-auth carve-out (TODO post-auth)
The server-side
should_deliverfilter insse.rscurrently delivers all events to anonymous subscribers (noX-Public-Key). This is the documented pre-auth carve-out: the messaging archipelago still runs on theSYSTEM_KEY = "system"placeholder, so without the carve-out the QA repro for this issue would silently drop everything at the SSE filter. ATODOis flagged inline to flip back to fail-closed once real authentication lands. The participant-filter path is fully exercised whenever the caller IS authenticated.Out of scope
*_generated.rsfiles churn after PR #38 + #39 land in hero_rpc) was deliberately not committed in PR #44 — would have buried the actual SSE work in ~30k lines of mechanical churn. It will regenerate on firstcargo buildand can be committed in a follow-up if needed.bubble-otherbecause the server hardcodessender_public_key = "system") is a separate root cause and remains open. The race fix in this PR'shandle_sendkeeps the just-sent bubble blue at send time, but reload-time rendering is unchanged.*_generated.rsfiles are emitted formatted) is deferred to a separate PR.