Merge pull request #51509 from NousResearch/salvage/49041-compression-session-lineage

fix(tui): preserve live session identity across compression (#49041)
This commit is contained in:
kshitij
2026-06-24 02:04:48 +05:30
committed by GitHub
4 changed files with 238 additions and 4 deletions

View File

@@ -78,7 +78,7 @@ def active_session_limit_message(active_count: int, max_sessions: int) -> str:
def _state_dir() -> Path:
    """Return the ``runtime`` directory located under the Hermes home directory."""
    # Wrap in Path() before joining: harmless when get_hermes_home() already
    # returns a Path, and it keeps the "/" join working if it returns a plain
    # string instead.
    return Path(get_hermes_home()) / "runtime"
def _state_path() -> Path:
@@ -311,6 +311,43 @@ def release_active_session(lease: ActiveSessionLease) -> None:
lease.released = True
def transfer_active_session(
    lease: ActiveSessionLease,
    *,
    session_id: str,
    metadata: Optional[dict[str, Any]] = None,
) -> bool:
    """Re-point ``lease`` at ``session_id`` while keeping its registry slot.

    Args:
        lease: The lease to move.
        session_id: The session id the lease should carry afterwards.
        metadata: When non-empty, replaces the registry entry's metadata
            (only string keys are kept).

    Returns:
        ``True`` when the lease now carries the new id.  ``False`` when the
        id is empty, the lease was already released, or no registry entry
        with the lease's ``lease_id`` is left after pruning.
    """
    target_id = str(session_id or "")
    if not target_id or lease.released:
        return False

    if not lease.enabled:
        # A lease that is not enabled is only updated in memory; the registry
        # file is not touched on this path.
        lease.session_id = target_id
        return True

    registry_file = _state_path()
    with _FileLock(_lock_path()):
        live_entries = _prune_dead(_read_entries(registry_file))
        match = next(
            (
                candidate
                for candidate in live_entries
                if str(candidate.get("lease_id") or "") == lease.lease_id
            ),
            None,
        )
        if match is None:
            # Nothing to rewrite; leave both the file and the lease untouched.
            return False

        match["session_id"] = target_id
        match["updated_at"] = time.time()
        if metadata:
            match["metadata"] = {
                str(key): value
                for key, value in metadata.items()
                if isinstance(key, str)
            }
        _write_entries(registry_file, live_entries)
        lease.session_id = target_id
    return True
def active_session_registry_snapshot() -> list[dict[str, Any]]:
"""Return the pruned active-session registry for diagnostics/tests."""
state_path = _state_path()

View File

@@ -113,6 +113,33 @@ def test_active_session_registry_prunes_dead_pids(tmp_path, monkeypatch):
lease.release()
def test_transfer_active_session_reanchors_existing_lease(tmp_path, monkeypatch):
    """A transferred lease keeps a single registry entry under the new session id."""
    hermes_home = tmp_path / ".hermes"
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))

    lease, message = active_sessions.try_acquire_active_session(
        session_id="session-old",
        surface="tui",
        config={"max_concurrent_sessions": 1},
        metadata={"live_session_id": "ui-1"},
    )
    assert message is None
    assert lease is not None

    transferred = active_sessions.transfer_active_session(
        lease,
        session_id="session-new",
        metadata={"live_session_id": "ui-1"},
    )
    assert transferred

    registry = active_sessions.active_session_registry_snapshot()
    assert lease.session_id == "session-new"
    assert len(registry) == 1
    only_entry = registry[0]
    assert only_entry["session_id"] == "session-new"
    assert only_entry["metadata"] == {"live_session_id": "ui-1"}

    lease.release()
def test_pid_alive_uses_safe_pid_exists_without_signalling(monkeypatch):
checked: list[int] = []

View File

@@ -734,6 +734,100 @@ def test_session_resume_reuses_existing_live_session(server, monkeypatch):
assert all(sid == winner for sid in server._sessions)
def test_session_resume_reuses_live_agent_after_compression_rotation(server, monkeypatch):
    """Resuming by the agent's rotated session id must reuse the live session.

    The live entry still carries the pre-rotation ``session_key``; only the
    agent's ``session_id`` equals the id being resumed.
    """
    rotated_id = "20260409_020202_child"
    pre_rotation_key = "20260409_010101_parent"
    live_sid = "live-rotated"

    live_agent = types.SimpleNamespace(model="test/model", session_id=rotated_id)
    server._sessions[live_sid] = {
        "agent": live_agent,
        "created_at": 123.0,
        "display_history_prefix": [],
        "history": [{"role": "assistant", "content": "live child"}],
        "history_lock": threading.RLock(),
        "last_active": 123.0,
        "running": False,
        "session_key": pre_rotation_key,
        "transport": server._stdio_transport,
    }

    class _DB:
        """Minimal stand-in that resolves every lookup to the rotated id."""

        def get_session(self, _sid):
            return {"id": rotated_id}

        def get_session_by_title(self, _title):
            return None

        def resolve_resume_session_id(self, _target):
            return rotated_id

    monkeypatch.setattr(server, "_get_db", lambda: _DB())
    monkeypatch.setattr(server, "_emit", lambda *_args, **_kwargs: None)
    monkeypatch.setattr(
        server,
        "_session_info",
        lambda _agent, _session=None: {"model": "test/model"},
    )

    resume_request = {
        "id": "r1",
        "method": "session.resume",
        "params": {"session_id": rotated_id, "cols": 100},
    }
    response = server.handle_request(resume_request)

    assert "error" not in response
    assert response["result"]["session_id"] == live_sid
    assert response["result"]["session_key"] == rotated_id
    # No second live session may have been created for the same agent.
    assert len(server._sessions) == 1
def test_sync_session_key_after_compress_reanchors_active_session_lease(
    server, monkeypatch, tmp_path
):
    """Syncing the session key after compression also moves the lease's registry entry."""
    hermes_home = tmp_path / ".hermes"
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))

    from hermes_cli.active_sessions import (
        active_session_registry_snapshot,
        try_acquire_active_session,
    )

    lease, message = try_acquire_active_session(
        session_id="session-old",
        surface="tui",
        config={"max_concurrent_sessions": 1},
        metadata={"live_session_id": "ui-1"},
    )
    assert message is None
    assert lease is not None

    # The agent already reports the post-compression id while the session
    # dict still holds the old key.
    session = {
        "active_session_lease": lease,
        "agent": types.SimpleNamespace(session_id="session-new"),
        "session_key": "session-old",
    }

    def _noop(*_args, **_kwargs):
        return None

    fake_approval = types.SimpleNamespace(
        disable_session_yolo=_noop,
        enable_session_yolo=_noop,
        is_session_yolo_enabled=lambda *_args, **_kwargs: False,
        register_gateway_notify=_noop,
        unregister_gateway_notify=_noop,
    )
    monkeypatch.setattr(server, "_restart_slash_worker", _noop)

    with patch.dict(sys.modules, {"tools.approval": fake_approval}):
        server._sync_session_key_after_compress("ui-1", session)

    registered_ids = [
        entry["session_id"] for entry in active_session_registry_snapshot()
    ]
    assert session["session_key"] == "session-new"
    assert lease.session_id == "session-new"
    assert registered_ids == ["session-new"]

    lease.release()
def test_session_resume_live_payload_uses_current_history_with_ancestors(server, monkeypatch):
"""Live resume should not reuse a stale ancestor-inclusive snapshot."""

View File

@@ -381,6 +381,59 @@ def _release_active_session_slot(session: dict | None) -> None:
logger.debug("Failed to release active session slot", exc_info=True)
def _transfer_active_session_slot(
    sid: str,
    session: dict,
    *,
    new_session_id: str,
) -> bool:
    """Re-anchor the session's active-session lease onto ``new_session_id``.

    Returns ``True`` when the session has no lease to move, when the lease was
    transferred in place, or when a replacement lease was claimed and swapped
    in.  Returns ``False`` when ``new_session_id`` is empty or when no
    replacement could be claimed, in which case the existing lease is kept.
    """
    if not new_session_id:
        return False

    current_lease = session.get("active_session_lease")
    if current_lease is None:
        return True

    moved = False
    try:
        from hermes_cli.active_sessions import transfer_active_session

        moved = transfer_active_session(
            current_lease,
            session_id=new_session_id,
            metadata={"live_session_id": sid},
        )
    except Exception:
        logger.debug("Failed to transfer active session slot", exc_info=True)
    if moved:
        return True

    # Fallback: the in-place transfer could not move the lease (entry pruned /
    # pid-check transiently failed). Reserve the new slot BEFORE releasing the
    # old one, so a concurrent gateway at the session cap cannot grab the freed
    # slot in a release-then-reacquire window and leave this session with no
    # lease at all (#49041 review). If the reserve fails, KEEP the old lease.
    replacement, limit_message = _claim_active_session_slot(
        new_session_id,
        live_session_id=sid,
    )
    if replacement is None:
        # Reserve failed — retain the existing lease rather than dropping it.
        if limit_message:
            logger.warning(
                "Compression session lease re-anchor failed (kept old lease): "
                "sid=%s new_session_id=%s reason=%s",
                sid,
                new_session_id,
                limit_message,
            )
        return False

    stale_lease = session.pop("active_session_lease", None)
    if stale_lease is not None:
        try:
            stale_lease.release()
        except Exception:
            logger.debug("Failed to release stale active session slot", exc_info=True)
    session["active_session_lease"] = replacement
    return True
def _finalize_session(session: dict | None, end_reason: str = "tui_close") -> None:
"""Best-effort finalize hook + memory commit for a session.
@@ -2543,6 +2596,19 @@ def _sync_session_key_after_compress(
if not new_session_id or new_session_id == old_key:
return
lease_reanchored = _transfer_active_session_slot(
sid,
session,
new_session_id=new_session_id,
)
if not lease_reanchored:
logger.warning(
"Compression session lease did not re-anchor: sid=%s old_session_id=%s new_session_id=%s",
sid,
old_key,
new_session_id,
)
try:
from tools.approval import (
disable_session_yolo,
@@ -4940,7 +5006,7 @@ def _session_live_title(session: dict, key: str) -> str:
def _session_live_item(sid: str, session: dict, current_sid: str = "") -> dict:
key = str(session.get("session_key") or sid)
key = _session_lookup_key(session, fallback=sid)
agent = session.get("agent")
history = list(session.get("history") or [])
status = _session_live_status(sid, session)
@@ -4964,11 +5030,21 @@ def _session_live_item(sid: str, session: dict, current_sid: str = "") -> dict:
}
def _session_lookup_key(session: dict, *, fallback: str = "") -> str:
    """Return the key under which a live session should be looked up.

    Preference order: the ``session_id`` attribute of the session's agent,
    then the session's ``session_key`` entry, then ``fallback``.  Falsy
    candidates are skipped; the result is an empty string when all are falsy.
    """
    agent_session_id = getattr(session.get("agent"), "session_id", None)
    if agent_session_id:
        return str(agent_session_id)

    stored_key = session.get("session_key")
    if stored_key:
        return str(stored_key)

    return str(fallback or "")
def _find_live_session_by_key(session_key: str) -> tuple[str, dict] | None:
    """Find the first non-finalized live session whose lookup key matches.

    Args:
        session_key: The key to search for.

    Returns:
        A ``(sid, session)`` pair for the first match, or ``None`` when no
        live session matches.
    """
    # Iterate over a copy so the registry can change while we scan it.
    for sid, session in list(_sessions.items()):
        if session.get("_finalized"):
            continue
        # Compare against the lookup key (agent session id first) rather than
        # the raw "session_key" entry, which can lag behind the agent's
        # current session id.
        if _session_lookup_key(session, fallback=sid) == session_key:
            return sid, session
    return None
@@ -5012,7 +5088,7 @@ def _live_session_payload(
"messages": _history_to_messages(history),
"running": running,
"session_id": sid,
"session_key": session.get("session_key") or sid,
"session_key": _session_lookup_key(session, fallback=sid),
"started_at": float(session.get("created_at") or time.time()),
"status": _session_live_status(sid, session),
}