PTY WebSocket select! leaks three of four spawned tasks on disconnect #50

Closed
opened 2026-04-22 14:55:30 +00:00 by rawdaGastan · 5 comments
Member

Category: resource / low-medium

What

The PTY WebSocket proxy spawns four concurrent tasks and races them with tokio::select!. When one completes (e.g. the browser closes the WebSocket), the select! returns, but the other three tasks are not aborted — dropping a JoinHandle does not cancel the task. They continue running until their channels naturally drain.
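
A minimal standalone sketch of that last point (everything here is illustrative, not from the codebase): dropping a JoinHandle detaches the task, which keeps running on the runtime.

use std::time::Duration;

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        println!("task still ran after its JoinHandle was dropped");
    });
    drop(handle); // detaches only; the task keeps running on the runtime
    tokio::time::sleep(Duration::from_millis(100)).await; // the println fires anyway
}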

Where

crates/hero_router/src/server/terminal.rs:618-632

tokio::select! {
    _ = browser_to_server => {}
    _ = bytes_bridge => {}
    _ = forwarder => {}
    _ = server_to_browser => {}
}

// Unregister regardless of which side tore down first.
{
    let mut map = attached().lock().await;
    map.remove(&name);
}

Ok(())

The four tasks are:

Task                 Exits when
browser_to_server    browser WS stream yields None/Err
bytes_bridge         all senders into bytes_tx are dropped
forwarder            msg_tx is dropped (owns server_sink)
server_to_browser    hero_proc WS server stream yields None/Err

The channels are shared between tasks, so even after one task exits the others may keep their senders alive — e.g. if browser_to_server dies first, forwarder still holds server_sink and keeps writing to hero_proc's WS socket until msg_tx is dropped. Since msg_tx is captured by browser_to_server (which is finishing) and bytes_bridge, it takes both to drop before forwarder unblocks.
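
The channel rule doing the work here, as a small hedged sketch: a tokio mpsc receiver only observes closure once every sender clone has dropped.

use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<u8>();
    let tx2 = tx.clone();              // second sender, like msg_tx held by two tasks

    drop(tx);                          // one sender gone...
    let still_open = timeout(Duration::from_millis(20), rx.recv()).await;
    assert!(still_open.is_err());      // ...recv() keeps waiting while tx2 is alive

    drop(tx2);                         // last sender gone
    assert_eq!(rx.recv().await, None); // channel now reports closed
}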

Why it's wrong

  • Under normal use this self-heals within a few hundred ms. Under session churn (user rapidly reconnecting, or a dead connection that times out at tungstenite's keepalive layer) three tasks linger per dead session, each holding a WS handle, a Vec buffer, and an Arc into the session map. The map entry is removed on L628, but the tasks keep live references to their captured variables.
  • Worse: the leftover tasks outlive map.remove(&name), so they keep operating on a session that no longer exists in the registry. A resize message or inject_reply() racing with cleanup can briefly find the entry gone and fail-fast.
  • If the upstream hero_proc WS is wedged (e.g. network partition), server_to_browser can block forever on .next().await with no way to unstick it.
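
A sketch of that last bullet, with std::future::pending standing in for a read on a wedged socket: .abort() is the external lever that unsticks such a task.

#[tokio::main]
async fn main() {
    let stuck = tokio::spawn(async {
        std::future::pending::<()>().await; // never resolves, like .next() on a wedged WS
    });
    stuck.abort();                          // external cancellation
    assert!(stuck.await.unwrap_err().is_cancelled());
}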

Reproduction

Open 50 terminal sessions in a loop, close each immediately after connecting:

for i in $(seq 50); do
  (timeout 1 websocat "ws://127.0.0.1:9988/api/terminal/foo${i}/ws" >/dev/null &)
done
wait
# `ps` / `tokio-console` will show lingering tasks for several seconds.

Suggested fix

Collect the JoinHandles, then on the winning arm .abort() the rest:

let mut b2s = browser_to_server;
let mut bb  = bytes_bridge;
let mut fw  = forwarder;
let mut s2b = server_to_browser;

tokio::select! {
    _ = &mut b2s => { bb.abort(); fw.abort(); s2b.abort(); }
    _ = &mut bb  => { b2s.abort(); fw.abort(); s2b.abort(); }
    _ = &mut fw  => { b2s.abort(); bb.abort(); s2b.abort(); }
    _ = &mut s2b => { b2s.abort(); bb.abort(); fw.abort(); }
}

Or use a CancellationToken shared across all four tasks (cleaner if the task count grows).
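
For comparison, a hedged sketch of the token variant (this would require the tokio_util crate, which is not currently a dependency; the sleeps stand in for the real bridge loops):

use std::time::Duration;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let mut handles = Vec::new();

    for ms in [10u64, 500, 500, 500] {
        let token = token.clone();
        handles.push(tokio::spawn(async move {
            tokio::select! {
                _ = token.cancelled() => {}  // a sibling finished first
                _ = tokio::time::sleep(Duration::from_millis(ms)) => {
                    token.cancel();          // we finished: tell the others to stop
                }
            }
        }));
    }

    for h in handles {
        let _ = h.await; // all four return shortly after the first finisher
    }
}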

Either way, explicitly drop any msg_tx / bytes_tx senders that are no longer needed before the select!, so the other side naturally unblocks too.

rawdaGastan added this to the ACTIVE project 2026-04-23 11:14:26 +00:00
Author
Member

Implementation Spec for Issue #50

Objective

Abort the three remaining pty_proxy tasks when any one of the four completes, so WebSocket handles, channel senders, and captured buffers are released promptly on disconnect instead of lingering across session churn.

Requirements

  • When any one of browser_to_server, bytes_bridge, forwarder, or server_to_browser finishes, the other three MUST be aborted.
  • After select! returns, the four handles must be awaited (results ignored) so each task's captured stack frame — including server_sink, browser_sink, browser_source, server_source, and msg_tx / reply_tx clones — drops before pty_proxy returns. This prevents the tasks from outliving the function.
  • The existing attached().remove(&name) call must stay after the select! (and can run concurrently with or just before the final awaits). It must NOT be moved before select! — the map entry is actively read during the session by inject_reply (external, via terminal.reply RPC) and by browser_to_server (internal, for resize state at line 493). Moving the remove earlier would silently break CPR/DA reply forwarding. A sketch of this registry contract follows the list.
  • No new crates: use tokio::pin! + JoinHandle::abort(). Confirmed tokio_util (which would provide CancellationToken) is not a dependency of hero_router.
  • Behavior preserved: pty_proxy's return type (Result<()>), error handling in the caller (pty_handler), and the JOB_NAME_PREFIX/session-registry contract are all unchanged.
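
For context, an illustrative sketch of the registry contract behind the remove-after-select requirement (the map's value type and signatures are guesses for illustration, not the crate's actual definitions):

use std::collections::HashMap;
use std::sync::OnceLock;
use tokio::sync::{mpsc, Mutex};

type Registry = Mutex<HashMap<String, mpsc::UnboundedSender<Vec<u8>>>>;

fn attached() -> &'static Registry {
    static MAP: OnceLock<Registry> = OnceLock::new();
    MAP.get_or_init(|| Mutex::new(HashMap::new()))
}

// inject_reply-style lookup: fails softly once the entry is removed, which
// is why the remove must stay after the select! that ends the session.
async fn inject_reply(name: &str, bytes: Vec<u8>) -> Result<(), String> {
    let map = attached().lock().await;
    match map.get(name) {
        Some(tx) => tx.send(bytes).map_err(|_| "receiver gone".to_string()),
        None => Err(format!("no active PTY for session '{name}'")),
    }
}

#[tokio::main]
async fn main() {
    let err = inject_reply("gone", b"\x1b[6n".to_vec()).await.unwrap_err();
    assert!(err.contains("no active PTY"));
}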

Files to Modify/Create

  • crates/hero_router/src/server/terminal.rs — rewrite the tokio::select! block at roughly lines 617-623 to pin handles and abort siblings on the winning arm; follow with sequential .await on all four handles so captured resources drop; keep the attached().remove(&name) block where it is today (just before the trailing Ok(())).

Implementation Plan

Step 1: Rewrite the select-and-cleanup tail of pty_proxy

Files: crates/hero_router/src/server/terminal.rs

Current shape (lines 617-631):

tokio::select! {
    _ = browser_to_server => {}
    _ = bytes_bridge => {}
    _ = forwarder => {}
    _ = server_to_browser => {}
}

// Unregister regardless of which side tore down first.
{
    let mut map = attached().lock().await;
    map.remove(&name);
}

Ok(())

Replace with:

// When any one of the four tasks completes, abort the other three so
// their captured WebSocket sinks/sources and channel senders are released
// promptly. Dropping a JoinHandle does NOT abort the underlying task
// (issue #50).
tokio::pin!(browser_to_server, bytes_bridge, forwarder, server_to_browser);

tokio::select! {
    _ = &mut browser_to_server => {
        bytes_bridge.abort();
        forwarder.abort();
        server_to_browser.abort();
    }
    _ = &mut bytes_bridge => {
        browser_to_server.abort();
        forwarder.abort();
        server_to_browser.abort();
    }
    _ = &mut forwarder => {
        browser_to_server.abort();
        bytes_bridge.abort();
        server_to_browser.abort();
    }
    _ = &mut server_to_browser => {
        browser_to_server.abort();
        bytes_bridge.abort();
        forwarder.abort();
    }
}

// Unregister regardless of which side tore down first. Staying after the
// select is intentional: `inject_reply` (called from `terminal.reply` RPC)
// and the internal resize handler both need the map entry to be present
// for the duration of the session.
{
    let mut map = attached().lock().await;
    map.remove(&name);
}

// Wait for the aborted handles to finish unwinding so their captured
// stack frames drop before pty_proxy returns. `JoinError::is_cancelled()`
// on the aborted tasks is expected and intentionally ignored.
let _ = browser_to_server.await;
let _ = bytes_bridge.await;
let _ = forwarder.await;
let _ = server_to_browser.await;

Ok(())

Subtasks:

  • Replace the 14-line block at lines 617-631 of crates/hero_router/src/server/terminal.rs with the block above.
  • No changes anywhere else in the file (imports stay as-is; tokio::pin! is a macro re-export from tokio which is already fully-featured in this crate).
  • Do NOT move the attached().remove(&name) call to before the select!.
  • Do NOT introduce tokio_util::sync::CancellationToken or add any dependency to Cargo.toml.

Dependencies: none.

Step 2: Tests

Files: none (no new test file is practical).

Rationale: the four tasks close over WebSocket sinks / SplitSink<WebSocketStream<..>> types that are not trivially mockable, and attached() is a process-global OnceLock. A meaningful unit test would require an in-process axum WebSocket harness plus a tokio-tungstenite echo server, which is heavier than the fix itself. The change is verifiable by inspection — each select! arm demonstrably aborts exactly the three sibling handles.

For the reviewer, a light manual smoke test:

# Open a session, connect WS, close immediately, repeat rapidly.
for i in $(seq 50); do
  # The URL below is a placeholder; with --unix-socket the host part is ignored.
  curl -s -X POST --unix-socket ~/hero/var/sockets/hero_router/rpc.sock \
       http://localhost/ -H 'Content-Type: application/json' \
       -d '{"jsonrpc":"2.0","id":1,"method":"terminal.create","params":["t'$i'"]}'
done
# Open each terminal in a browser tab, close each after first keystroke.
# Watch /proc/<pid>/task count (or `tokio-console`) — should not grow
# monotonically across sessions.

Dependencies: Step 1.

Acceptance Criteria

  • When one of the four tasks exits, the other three are aborted. Verified by inspection: each of the four select! arms calls .abort() on the other three JoinHandles.
  • Every aborted handle is awaited (let _ = ...await) so captured resources (server_sink, browser_sink, browser_source, server_source, and all sender clones) drop before pty_proxy returns.
  • attached().remove(&name) stays AFTER the select! — inject_reply and the resize handler continue to see the session during its full lifetime.
  • Cargo.toml is unchanged (no new crate dependencies).
  • cargo test --workspace passes.
  • cargo clippy --workspace --all-targets -- -D warnings passes.
  • Diff scope: changes confined to the tail of pty_proxy (lines 617-631). No changes elsewhere.

Notes

  • JoinHandle::abort() is synchronous and idempotent. Calling it on a handle whose task already completed normally (the winner) is a no-op. Awaiting an aborted task returns Err(JoinError) with is_cancelled() == true; we discard via let _ = ...await. A sketch demonstrating this contract follows these notes.
  • Whether to also drop(reply_tx) / drop(msg_tx) on the parent stack before the select! was considered. With .abort() in place, these drops are redundant — aborts forcibly terminate downstream tasks regardless of channel state, and the parent senders are dropped implicitly when pty_proxy returns. Skipping the explicit drops keeps the diff minimal.
  • Why not tokio_util::sync::CancellationToken: the dependency would be new to this crate, and the 4-task case is cleanly expressed with tokio::pin! + .abort(). If the task count grows or nested cancellation is needed in the future, migration to CancellationToken is straightforward.
  • The inject_reply race window is unchanged by this fix — it matches today's behavior. Once the aborts land and bytes_bridge is cancelled, any in-flight inject_reply that already cloned the sender will have its send silently dropped when the unbounded mpsc receiver is gone. inject_reply already returns Err("no active PTY for session '...'") when the map entry is missing, which callers (the terminal.reply RPC) already handle as a soft error.
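
A sketch demonstrating the abort/await contract from the first note above:

#[tokio::main]
async fn main() {
    // Completed task: abort() afterwards is a no-op and the output survives.
    // (The sleep gives the trivial task time to finish first.)
    let winner = tokio::spawn(async { 42 });
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    winner.abort();
    assert_eq!(winner.await.unwrap(), 42);

    // Pending task: abort() is idempotent; the JoinError reports cancellation.
    let loser = tokio::spawn(std::future::pending::<()>());
    loser.abort();
    loser.abort(); // second call is a no-op
    assert!(loser.await.unwrap_err().is_cancelled());
}
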
Author
Member

Spec correction — avoid double-poll of the winning handle

The pseudo-code in the previous comment had the .await calls AFTER the select!:

tokio::select! {
    _ = &mut browser_to_server => { bytes_bridge.abort(); forwarder.abort(); server_to_browser.abort(); }
    // ...
}

let _ = browser_to_server.await;  // <-- bug: winner was already polled to completion
let _ = bytes_bridge.await;
let _ = forwarder.await;
let _ = server_to_browser.await;

Awaiting a JoinHandle that was already polled to completion by tokio::select! is not safe — the output has already been consumed, so the second .await either deadlocks (returns Pending forever) or panics depending on the tokio version.

Corrected pattern

Move the .awaits INSIDE each arm, awaiting only the three non-winners:

tokio::pin!(browser_to_server, bytes_bridge, forwarder, server_to_browser);

tokio::select! {
    _ = &mut browser_to_server => {
        bytes_bridge.abort();
        forwarder.abort();
        server_to_browser.abort();
        let _ = bytes_bridge.await;
        let _ = forwarder.await;
        let _ = server_to_browser.await;
    }
    _ = &mut bytes_bridge => {
        browser_to_server.abort();
        forwarder.abort();
        server_to_browser.abort();
        let _ = browser_to_server.await;
        let _ = forwarder.await;
        let _ = server_to_browser.await;
    }
    _ = &mut forwarder => {
        browser_to_server.abort();
        bytes_bridge.abort();
        server_to_browser.abort();
        let _ = browser_to_server.await;
        let _ = bytes_bridge.await;
        let _ = server_to_browser.await;
    }
    _ = &mut server_to_browser => {
        browser_to_server.abort();
        bytes_bridge.abort();
        forwarder.abort();
        let _ = browser_to_server.await;
        let _ = bytes_bridge.await;
        let _ = forwarder.await;
    }
}

{
    let mut map = attached().lock().await;
    map.remove(&name);
}

Ok(())

Each arm aborts and then awaits exactly the three sibling handles (never the winner). All other parts of the spec (pin!, map-remove ordering after select, no new deps) stay as before.
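
As an aside (not what this spec does): if the task count ever grows, tokio's own JoinSet expresses the same first-finisher-cancels-the-rest shape without tokio_util. A hedged sketch, with sleeps standing in for the bridges:

use std::time::Duration;
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut set: JoinSet<()> = JoinSet::new();
    for ms in [10u64, 500, 500, 500] {
        set.spawn(async move {
            tokio::time::sleep(Duration::from_millis(ms)).await;
        });
    }
    let _ = set.join_next().await; // first task finishes
    set.shutdown().await;          // abort + await the remaining three
}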

Author
Member

Test Results

  • Total: 71
  • Passed: 71
  • Failed: 0
  • Ignored: 0

Lint

  • cargo clippy --workspace --all-targets -- -D warnings: pass
  • cargo fmt --all --check: pass
  • cargo build --workspace: pass

No new tests added

The four tasks close over WebSocket sinks / SplitSink<WebSocketStream<..>> types that are not trivially mockable, and attached() is a process-global OnceLock. A meaningful end-to-end test would require an in-process axum WebSocket harness plus a tokio-tungstenite echo server, which is heavier than the fix itself. The change is verifiable by inspection:

  • Each of the four select! arms aborts exactly the three sibling JoinHandles.
  • Each arm awaits exactly the three aborted handles (never the winner), avoiding a double-poll of a completed JoinHandle.
  • The attached().remove(&name) block is kept AFTER the select so inject_reply and the internal resize handler continue to see the session during its full lifetime.
  • No Cargo.toml changes.

Recommended manual smoke test (rapid reconnect):

for i in $(seq 50); do
  (timeout 1 websocat "ws://127.0.0.1:9988/api/terminal/foo${i}/ws" >/dev/null &)
done
wait
# Watch /proc/<pid>/task count (or tokio-console) - should NOT grow
# monotonically across sessions.
Author
Member

Implementation Summary

Closing #50 via PR (linked in a follow-up comment).

Changes

crates/hero_router/src/server/terminal.rs (+50 / -5)

Rewrote the tokio::select! block at the tail of pty_proxy to abort the three non-winning tasks when any one of the four completes, and to await only those three handles (never the winner).

Before:

tokio::select! {
    _ = browser_to_server => {}
    _ = bytes_bridge => {}
    _ = forwarder => {}
    _ = server_to_browser => {}
}
// unregister
Ok(())

After:

  • tokio::pin!(browser_to_server, bytes_bridge, forwarder, server_to_browser);
  • Four arms, each of shape:
    • _ = &mut X => { abort(others); let _ = others.await; }
  • attached().remove(&name) kept AFTER the select so inject_reply and the resize handler continue to see the session for its full lifetime.

Why this pattern

  • .abort() is required: dropping a JoinHandle does not cancel the underlying task. Each select! arm now explicitly aborts the three sibling handles.
  • Arm-scoped .await of non-winners only: awaiting a JoinHandle that was already polled to completion by select! is not safe (can deadlock or panic depending on tokio version). The winner of each arm is never awaited again.
  • Sequential .await after .abort(): ensures each aborted task finishes unwinding before pty_proxy returns, so captured SplitSinks, sources, and channel senders drop promptly instead of lingering across session churn.
  • Map-remove stays AFTER select: inject_reply (called from terminal.reply RPC) and the internal resize handler both read the map entry during normal session operation. Moving the remove earlier would silently break CPR/DA reply forwarding and resize updates. The teardown-time race window for inject_reply is unchanged from today.
  • No new crates: uses tokio::pin! + JoinHandle::abort(). tokio_util's CancellationToken was considered and rejected — the 4-task case is cleanly expressed in ~40 lines without adding a dependency.

Verification

  • cargo build --workspace: pass.
  • cargo test --workspace: 71 passed / 0 failed / 0 ignored.
  • cargo clippy --workspace --all-targets -- -D warnings: clean.
  • cargo fmt --all --check: clean.

Review focus

Manual smoke test (rapid reconnect):

for i in $(seq 50); do
  (timeout 1 websocat "ws://127.0.0.1:9988/api/terminal/foo${i}/ws" >/dev/null &)
done
wait
# Expected: /proc/<pid>/task count is bounded across sessions.

Code review should confirm:

  1. Each arm aborts exactly the three non-winners.
  2. Each arm awaits exactly the three non-winners (no double-poll).
  3. attached().remove(&name) is still after the select!.
  4. No Cargo.toml changes.
Author
Member

Pull request opened: #56

This PR implements the changes discussed in this issue.
