feat(messaging): subscribe to live ChatEvent SSE updates #204

Merged
rawdaGastan merged 1 commit from development_messaging_sse_subscribe into development 2026-05-04 13:59:59 +00:00
Member

Summary

Closes the user-visible side of hero_archipelagos #182: the messaging archipelago no longer requires a page reload to see new messages. Recipients see incoming bubbles within ~1-2 seconds via Server-Sent Events.

This is the third and final PR in the multi-repo fix:

  1. hero_rpc #39 — generator hook + per-domain web_events.sock listener (merged).
  2. hero_osis #44 — server-side SSE broadcaster + SDK subscribe() (merged).
  3. This PR — UI consumer in the messaging archipelago.

Closes #182

Changes

archipelagos/messaging/src/archipelago.rs

After the existing fetch_chats use_effect, a new use_effect spawns a long-lived task that:

  • Calls services::subscribe_chat_events(osis_url, context_name, public_key) (thin wrapper around CommunicationClient::subscribe).
  • Loops on incoming events with reconnect-with-backoff (1s → 30s, 2x).
  • On Ok(ChatEvent::ChatmessageNew { message, .. }):
    • Updates the conversation preview row in conversations with truncated last_message, last_message_time, and (when not actively viewing the chat) unread_count += 1.
    • When viewing the chat, converts the message via services::chatmessage_to_messagedata and pushes to active_messages with the same dedupe rule as the optimistic-insert path (by message id).
  • On Ok(ChatEvent::ConversationUpdated(conv)):
    • Patches the matching preview row or inserts a new one at the head if the conversation is unknown locally.
  • On Err(SubscribeError::Resync(_)): re-fetches conversations and the active chat's messages without closing the stream.
  • On Err(SubscribeError::Disconnected): breaks the inner loop, sleeps with backoff, reconnects.
  • On Err(SubscribeError::SseError(msg)): logs via tracing::warn!, continues consuming.

Sender race fix in handle_send

When the SSE event echo of the user's own send arrived before the optimistic-insert in handle_send ran, the existing dedupe (!already_present) skipped the is_own = true override and the sender briefly saw a gray bubble for their own message. The fix:

// Race: SSE may have already pushed this message (rendered as is_own=false
// because the placeholder identity doesn't match). If so, upgrade the
// existing entry instead of skipping.
let mut upgraded = false;
active_messages.with_mut(|msgs| {
    if let Some(existing) = msgs.iter_mut().find(|m| m.id == msg_data.id) {
        existing.is_own = true;
        existing.sender_name = "Me".to_string();
        upgraded = true;
    }
});
if !upgraded {
    active_messages.write().push(msg_data);
}

archipelagos/messaging/src/services/messaging_service.rs

Adds subscribe_chat_events(osis_url, context_name, public_key) -> Result<impl Stream<Item = Result<ChatEvent, SubscribeError>> + Unpin, String> — thin wrapper around CommunicationClient::subscribe so the archipelago doesn't have to construct the client directly.

archipelagos/messaging/Cargo.toml

  • Native (cfg(not(target_arch = "wasm32"))): tokio = { version = "1", features = ["time", "rt"] } for tokio::time::sleep in the backoff loop.
  • WASM (cfg(target_arch = "wasm32")): gloo-timers = { version = "0.3", features = ["futures"] } for gloo_timers::future::TimeoutFuture in the backoff loop.

Test plan

  • cargo check -p hero_archipelagos_messaging --target x86_64-unknown-linux-gnu — clean.
  • cargo check -p hero_archipelagos_messaging --target wasm32-unknown-unknown — clean.
  • Manual end-to-end browser smoke (two profiles + full hero_proc/hero_osis/hero_router/hero_os stack):
    • User A sends a message in the active chat → User B sees the bubble appear live in their chat panel within ~1-2s, no reload.
    • User A creates a new DM → User B sees the conversation row appear at the head of their list, no reload.
    • Sender's optimistic-insert + SSE echo dedupe correctly: A sees exactly one bubble for their own send, rendered as is_own = true.
    • Conversation list preview + unread count update live for users not currently viewing the chat.

Notes

Pre-auth carve-out on the server

The server-side filter (hero_osis PR #44) currently delivers all events to anonymous subscribers (no X-Public-Key header) because the messaging archipelago still runs on the SYSTEM_KEY = "system" placeholder identity. This is documented as a TODO to flip back to fail-closed once real authentication lands. Post-auth, the participant filter on the server side becomes the only line of defence — that path is exercised by hero_osis's unit tests and remains correct for any caller that does supply a real public key.

Issue #192 is NOT addressed here

The pre-existing bug where every message renders as bubble-other (gray) after a page reload — including the user's own — is issue #192, a regression of #62. That's a separate root cause (server hardcodes sender_public_key = "system", breaking the is_own check on fetched messages) and is essentially blocked on auth landing. The race fix in this PR's handle_send keeps the just-sent bubble blue at send time, but reload-time rendering is unchanged from the existing behaviour.

## Summary Closes the user-visible side of [hero_archipelagos #182](https://forge.ourworld.tf/lhumina_code/hero_archipelagos/issues/182): the messaging archipelago no longer requires a page reload to see new messages. Recipients see incoming bubbles within ~1-2 seconds via Server-Sent Events. This is the third and final PR in the multi-repo fix: 1. ✅ [hero_rpc #39](https://forge.ourworld.tf/lhumina_code/hero_rpc/pulls/39) — generator hook + per-domain `web_events.sock` listener (merged). 2. ✅ [hero_osis #44](https://forge.ourworld.tf/lhumina_code/hero_osis/pulls/44) — server-side SSE broadcaster + SDK `subscribe()` (merged). 3. **This PR** — UI consumer in the messaging archipelago. ## Related Issue Closes https://forge.ourworld.tf/lhumina_code/hero_archipelagos/issues/182 ## Changes ### `archipelagos/messaging/src/archipelago.rs` After the existing `fetch_chats` `use_effect`, a new `use_effect` spawns a long-lived task that: - Calls `services::subscribe_chat_events(osis_url, context_name, public_key)` (thin wrapper around `CommunicationClient::subscribe`). - Loops on incoming events with **reconnect-with-backoff** (1s → 30s, 2x). - On `Ok(ChatEvent::ChatmessageNew { message, .. })`: - Updates the conversation preview row in `conversations` with truncated `last_message`, `last_message_time`, and (when not actively viewing the chat) `unread_count += 1`. - When viewing the chat, converts the message via `services::chatmessage_to_messagedata` and pushes to `active_messages` with the same dedupe rule as the optimistic-insert path (by message id). - On `Ok(ChatEvent::ConversationUpdated(conv))`: - Patches the matching preview row or inserts a new one at the head if the conversation is unknown locally. - On `Err(SubscribeError::Resync(_))`: re-fetches `conversations` and the active chat's messages without closing the stream. - On `Err(SubscribeError::Disconnected)`: breaks the inner loop, sleeps with backoff, reconnects. - On `Err(SubscribeError::SseError(msg))`: logs via `tracing::warn!`, continues consuming. ### Sender race fix in `handle_send` When the SSE event echo of the user's own send arrived **before** the optimistic-insert in `handle_send` ran, the existing dedupe (`!already_present`) skipped the `is_own = true` override and the sender briefly saw a gray bubble for their own message. The fix: ```rust // Race: SSE may have already pushed this message (rendered as is_own=false // because the placeholder identity doesn't match). If so, upgrade the // existing entry instead of skipping. let mut upgraded = false; active_messages.with_mut(|msgs| { if let Some(existing) = msgs.iter_mut().find(|m| m.id == msg_data.id) { existing.is_own = true; existing.sender_name = "Me".to_string(); upgraded = true; } }); if !upgraded { active_messages.write().push(msg_data); } ``` ### `archipelagos/messaging/src/services/messaging_service.rs` Adds `subscribe_chat_events(osis_url, context_name, public_key) -> Result<impl Stream<Item = Result<ChatEvent, SubscribeError>> + Unpin, String>` — thin wrapper around `CommunicationClient::subscribe` so the archipelago doesn't have to construct the client directly. ### `archipelagos/messaging/Cargo.toml` - Native (`cfg(not(target_arch = "wasm32"))`): `tokio = { version = "1", features = ["time", "rt"] }` for `tokio::time::sleep` in the backoff loop. - WASM (`cfg(target_arch = "wasm32")`): `gloo-timers = { version = "0.3", features = ["futures"] }` for `gloo_timers::future::TimeoutFuture` in the backoff loop. ## Test plan - [x] `cargo check -p hero_archipelagos_messaging --target x86_64-unknown-linux-gnu` — clean. - [x] `cargo check -p hero_archipelagos_messaging --target wasm32-unknown-unknown` — clean. - [x] Manual end-to-end browser smoke (two profiles + full hero_proc/hero_osis/hero_router/hero_os stack): - User A sends a message in the active chat → User B sees the bubble appear live in their chat panel within ~1-2s, no reload. - User A creates a new DM → User B sees the conversation row appear at the head of their list, no reload. - Sender's optimistic-insert + SSE echo dedupe correctly: A sees exactly **one** bubble for their own send, rendered as `is_own = true`. - Conversation list preview + unread count update live for users not currently viewing the chat. ## Notes ### Pre-auth carve-out on the server The server-side filter ([hero_osis PR #44](https://forge.ourworld.tf/lhumina_code/hero_osis/pulls/44)) currently delivers all events to anonymous subscribers (no `X-Public-Key` header) because the messaging archipelago still runs on the `SYSTEM_KEY = "system"` placeholder identity. This is documented as a `TODO` to flip back to fail-closed once real authentication lands. **Post-auth, the participant filter on the server side becomes the only line of defence** — that path is exercised by hero_osis's unit tests and remains correct for any caller that does supply a real public key. ### Issue #192 is NOT addressed here The pre-existing bug where every message renders as `bubble-other` (gray) after a page reload — including the user's own — is **issue [#192](https://forge.ourworld.tf/lhumina_code/hero_archipelagos/issues/192)**, a regression of #62. That's a separate root cause (server hardcodes `sender_public_key = "system"`, breaking the `is_own` check on fetched messages) and is essentially blocked on auth landing. The race fix in this PR's `handle_send` keeps the **just-sent** bubble blue at send time, but reload-time rendering is unchanged from the existing behaviour.
feat(messaging): subscribe to live ChatEvent SSE updates
All checks were successful
Build and Test / build (pull_request) Successful in 5m29s
ff181f8621
Recipients now see new messages live without reloading. After the
existing initial fetch, the messaging archipelago opens an SSE
subscription against hero_osis_communication's web_events.sock (via
hero_osis_sdk's CommunicationClient::subscribe).

Long-lived consumer task with reconnect-with-backoff (1s -> 30s, 2x):
- ChatmessageNew updates the conversation preview row + active
  messages signal, with sender-side dedupe upgrading the existing
  optimistic-insert entry to is_own=true to win the SSE-vs-RPC race.
- ConversationUpdated patches the matching preview row or inserts a
  new one at the head.
- Resync triggers a refetch of chats + active messages without
  closing the stream.
- Disconnected breaks the inner loop for reconnect.

#182
rawdaGastan merged commit dd4de92894 into development 2026-05-04 13:59:59 +00:00
rawdaGastan deleted branch development_messaging_sse_subscribe 2026-05-04 14:00:04 +00:00
Sign in to join this conversation.
No reviewers
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
lhumina_code/hero_archipelagos!204
No description provided.