From 69c3b56bdecb97af1c222a3e2562963e824b351b Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 29 May 2026 21:19:41 +0100 Subject: [PATCH] fix: stabilize codex supervisor session listing --- CHANGELOG.md | 1 + docs/plugins/reference/codex-supervisor.md | 4 + .../codex-supervisor/src/json-rpc-client.ts | 29 ++- extensions/codex-supervisor/src/mcp-tools.ts | 4 +- .../codex-supervisor/src/plugin-tools.test.ts | 15 ++ .../codex-supervisor/src/plugin-tools.ts | 16 ++ .../codex-supervisor/src/supervisor.test.ts | 170 +++++++++++++++++- extensions/codex-supervisor/src/supervisor.ts | 58 +++++- 8 files changed, 286 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a6c81cb3d57..c8a63cf71277 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ Docs: https://docs.openclaw.ai - Tighten phone-control mutation authorization [AI]. (#87150) Thanks @pgondhi987. - Clarify directive persistence authorization policy [AI]. (#86369) Thanks @pgondhi987. - Agents/Codex: keep spawned agent cwd/workspace state separated, keep hook context prompt-local, release session locks on timeout abort, avoid session event queue self-wait, preserve shared app-server state across startup or helper failures, keep native hook relay alive across restarts, route workspace memory through tools, resolve Codex runtime models first, report quarantined dynamic tools, format `skills` command output, and bound compaction/steering retries. (#87218, #86875, #86123, #87399, #87375, #87383, #87400) Thanks @mbelinky, @Alix-007, @luoyanglang, @yetval, and @sjf. +- Codex Supervisor: keep real-home app-server MCP session listing on the loaded/state-DB path, bound stored history scans, and close WebSocket probes cleanly. - Channels: thread canonical session keys into outbound hooks, preserve Matrix room-id case, keep fallback tool warnings mention-inert, retain delivered Slack final replies during late cleanup, continue iMessage polling after denied reactions, suppress duplicate native exec approvals, preserve Telegram SecretRef prompt config, suppress Discord recovered tool warnings, and block untrusted Teams service URLs. (#73706, #75670, #87366, #87451, #87334) Thanks @zeroaltitude, @lukeboyett, @xiaotian, and @eleqtrizit. - CLI/auth/doctor/providers: reject malformed numeric/timeout/subcommand-version inputs, wait for respawn child shutdown, bound Codex and GitHub Copilot OAuth/token requests, warm provider auth off the main thread, honor Codex response timeouts, bound local service startup, resolve GPT-5.5 without cached catalog, migrate legacy memory auto-provider config, rewrite non-canonical `api_key` auth profiles, and make doctor restart follow-ups actionable. (#87398, #86281, #87361) Thanks @Patrick-Erichsen, @samzong, @giodl73-repo, and @alkor2000. - Gateway/security/session state: expire browser tokens after auth rotation, scope assistant idempotency dedupe, drain probe client closes, avoid stale restart continuation reuse, preserve retry-after fallbacks, bound webchat image and artifact transcript scans, include seconds in inbound metadata timestamps, and evict current plugin-state namespaces at row caps. diff --git a/docs/plugins/reference/codex-supervisor.md b/docs/plugins/reference/codex-supervisor.md index a8cc7074c326..801650ad859a 100644 --- a/docs/plugins/reference/codex-supervisor.md +++ b/docs/plugins/reference/codex-supervisor.md @@ -17,3 +17,7 @@ Supervise Codex app-server sessions from OpenClaw. ## Surface contracts: tools + +## Session Listing + +`codex_sessions_list` defaults to loaded Codex sessions only. Set `include_stored` to include stored history; the plugin uses Codex app-server's state-DB-only listing path and caps stored results at 200 by default. Pass `max_stored_sessions` to lower or raise that cap, up to 1000. diff --git a/extensions/codex-supervisor/src/json-rpc-client.ts b/extensions/codex-supervisor/src/json-rpc-client.ts index ac532e8e632c..2b1fc3398e7e 100644 --- a/extensions/codex-supervisor/src/json-rpc-client.ts +++ b/extensions/codex-supervisor/src/json-rpc-client.ts @@ -264,6 +264,7 @@ function websocketMessageToString(data: WebSocket.RawData): string { class WebSocketCodexJsonRpcConnection extends BaseCodexJsonRpcConnection { private readonly ws: WebSocket; private readonly openPromise: Promise; + private closing = false; constructor(endpoint: Extract) { super(); @@ -294,7 +295,11 @@ class WebSocketCodexJsonRpcConnection extends BaseCodexJsonRpcConnection { } }); this.ws.once("error", (error) => this.fail(error)); - this.ws.once("close", () => this.fail(new Error("Codex app-server websocket closed"))); + this.ws.once("close", () => { + if (!this.closing) { + this.fail(new Error("Codex app-server websocket closed")); + } + }); } async ready(): Promise { @@ -310,7 +315,27 @@ class WebSocketCodexJsonRpcConnection extends BaseCodexJsonRpcConnection { } async close(): Promise { - this.ws.close(); + this.closing = true; + this.fail(new Error("Codex app-server websocket closed")); + if (this.ws.readyState === WebSocket.CLOSED) { + return; + } + await new Promise((resolve) => { + const timeout = setTimeout(() => { + this.ws.terminate(); + resolve(); + }, 1000); + this.ws.once("close", () => { + clearTimeout(timeout); + resolve(); + }); + if (this.ws.readyState === WebSocket.CONNECTING || this.ws.readyState === WebSocket.OPEN) { + this.ws.close(); + } else { + clearTimeout(timeout); + resolve(); + } + }); } } diff --git a/extensions/codex-supervisor/src/mcp-tools.ts b/extensions/codex-supervisor/src/mcp-tools.ts index cf6f75d40be6..5613cf4236b2 100644 --- a/extensions/codex-supervisor/src/mcp-tools.ts +++ b/extensions/codex-supervisor/src/mcp-tools.ts @@ -159,10 +159,12 @@ export function registerCodexSupervisorMcpTools( "List Codex sessions visible to the OpenClaw supervisor.", { include_stored: z.boolean().optional(), + max_stored_sessions: z.number().int().min(1).max(1000).optional(), }, - async ({ include_stored }) => { + async ({ include_stored, max_stored_sessions }) => { const result = await supervisor.listSessionSnapshot({ includeStored: include_stored ?? false, + maxStoredSessions: max_stored_sessions, }); return textResult( `codex sessions: ${result.sessions.length}`, diff --git a/extensions/codex-supervisor/src/plugin-tools.test.ts b/extensions/codex-supervisor/src/plugin-tools.test.ts index d28db87f1428..debdb28cf819 100644 --- a/extensions/codex-supervisor/src/plugin-tools.test.ts +++ b/extensions/codex-supervisor/src/plugin-tools.test.ts @@ -110,6 +110,21 @@ describe("createCodexSupervisorTools", () => { ).rejects.toThrow("Codex write controls are disabled"); }); + it("rejects stored session limits outside the runtime bounds", async () => { + const { supervisor } = createSupervisorStub(); + const tools = createCodexSupervisorTools({ + supervisor, + policy: { allowRawTranscripts: false, allowWriteControls: false }, + }); + + await expect( + toolByName(tools, "codex_sessions_list").execute("call-1", { + include_stored: true, + max_stored_sessions: 1001, + }), + ).rejects.toThrow("max_stored_sessions must be between 1 and 1000"); + }); + it("allows trusted read and write tools when policy enables them", async () => { const { calls, supervisor } = createSupervisorStub(); const tools = createCodexSupervisorTools({ diff --git a/extensions/codex-supervisor/src/plugin-tools.ts b/extensions/codex-supervisor/src/plugin-tools.ts index fcad3be3a97f..515b355399d1 100644 --- a/extensions/codex-supervisor/src/plugin-tools.ts +++ b/extensions/codex-supervisor/src/plugin-tools.ts @@ -13,6 +13,7 @@ const EmptyParamsSchema = Type.Object({}, { additionalProperties: false }); const SessionsListParamsSchema = Type.Object( { include_stored: Type.Optional(Type.Boolean()), + max_stored_sessions: Type.Optional(Type.Integer({ minimum: 1, maximum: 1000 })), }, { additionalProperties: false }, ); @@ -67,6 +68,20 @@ function readBooleanParam(params: Record, key: string): boolean return params[key] === true; } +function readIntegerParam(params: Record, key: string): number | undefined { + const value = params[key]; + if (value === undefined) { + return undefined; + } + if (!Number.isInteger(value)) { + throw new Error(`${key} must be an integer`); + } + if (value < 1 || value > 1000) { + throw new Error(`${key} must be between 1 and 1000`); + } + return value as number; +} + function readModeParam(params: Record): CodexSupervisorTurnMode | undefined { const mode = readStringParam(params, "mode"); if (!mode) { @@ -122,6 +137,7 @@ export function createCodexSupervisorTools({ const params = asRecord(rawParams); const result = await supervisor.listSessionSnapshot({ includeStored: readBooleanParam(params, "include_stored"), + maxStoredSessions: readIntegerParam(params, "max_stored_sessions"), }); return jsonResult({ summary: `codex sessions: ${result.sessions.length}`, diff --git a/extensions/codex-supervisor/src/supervisor.test.ts b/extensions/codex-supervisor/src/supervisor.test.ts index 38f1eb69ce27..64fa5d78193c 100644 --- a/extensions/codex-supervisor/src/supervisor.test.ts +++ b/extensions/codex-supervisor/src/supervisor.test.ts @@ -2,6 +2,7 @@ import * as fs from "node:fs/promises"; import * as os from "node:os"; import * as path from "node:path"; import { describe, expect, it } from "vitest"; +import { WebSocketServer } from "ws"; import { loadCodexSupervisorEndpoints, resolveCodexSupervisorPluginConfig } from "./config.js"; import { connectCodexAppServerEndpoint, resolveSafeApprovalResult } from "./json-rpc-client.js"; import { CodexSupervisor } from "./supervisor.js"; @@ -350,6 +351,7 @@ describe("CodexSupervisor", () => { ]); expect(fake.calls.find((call) => call.method === "thread/list")?.params).toMatchObject({ sourceKinds: ["cli", "vscode", "exec", "appServer", "unknown"], + useStateDbOnly: true, }); }); @@ -394,6 +396,51 @@ describe("CodexSupervisor", () => { ]); }); + it("bounds stored session pagination for large real Codex homes", async () => { + const fake = new FakeCodexConnection({ + id: "thread-1", + status: { type: "idle" }, + turns: [], + }); + fake.request = async (method, params) => { + fake.calls.push({ method, params }); + if (method === "thread/loaded/list") { + return { data: [], nextCursor: null }; + } + if (method === "thread/list") { + return { + data: [ + { id: "thread-1", status: { type: "notLoaded" }, turns: [] }, + { id: "thread-2", status: { type: "notLoaded" }, turns: [] }, + ], + nextCursor: "page-2", + }; + } + throw new Error(`unexpected method: ${method}`); + }; + const supervisor = new CodexSupervisor([endpoint], async () => fake); + + await expect( + supervisor.listSessions({ includeStored: true, maxStoredSessions: 1 }), + ).resolves.toEqual([ + { + endpointId: "local", + threadId: "thread-1", + status: "notLoaded", + }, + ]); + expect(fake.calls.filter((call) => call.method === "thread/list")).toEqual([ + { + method: "thread/list", + params: { + limit: 1, + sourceKinds: ["cli", "vscode", "exec", "appServer", "unknown"], + useStateDbOnly: true, + }, + }, + ]); + }); + it("closes settled connections when evicting them", async () => { const fake = new FakeCodexConnection({ id: "thread-1", @@ -508,6 +555,88 @@ describe("CodexSupervisor", () => { }); }); + it("uses a unique loaded endpoint match even when another endpoint is down", async () => { + const upEndpoint: CodexSupervisorEndpoint = { id: "up", transport: "stdio-proxy" }; + const downEndpoint: CodexSupervisorEndpoint = { id: "down", transport: "stdio-proxy" }; + const fake = new FakeCodexConnection({ + id: "thread-1", + status: { type: "idle" }, + turns: [], + }); + const supervisor = new CodexSupervisor([upEndpoint, downEndpoint], async (target) => { + if (target.id === "down") { + throw new Error("host offline"); + } + return fake; + }); + + await expect( + supervisor.sendToSession({ threadId: "thread-1", text: "continue" }), + ).resolves.toMatchObject({ + endpointId: "up", + threadId: "thread-1", + mode: "start", + }); + }); + + it("resolves omitted endpoint ids by exact thread read without scanning stored pages", async () => { + const fake = new FakeCodexConnection({ + id: "thread-old", + status: { type: "notLoaded" }, + turns: [], + }); + fake.request = async (method, params) => { + fake.calls.push({ method, params }); + if (method === "thread/loaded/list") { + return { data: [], nextCursor: null }; + } + if (method === "thread/read" && params?.threadId === "thread-old") { + return { thread: { id: "thread-old", status: { type: "notLoaded" }, turns: [] } }; + } + throw new Error(`unexpected method: ${method}`); + }; + const supervisor = new CodexSupervisor([endpoint], async () => fake); + + await expect(supervisor.readSession({ threadId: "thread-old" })).resolves.toEqual({ + thread: { id: "thread-old", status: { type: "notLoaded" }, turns: [] }, + }); + expect(fake.calls.map((call) => call.method)).toEqual([ + "thread/loaded/list", + "thread/read", + "thread/read", + ]); + }); + + it("resolves stored threads on healthy endpoints when another endpoint is down", async () => { + const downEndpoint: CodexSupervisorEndpoint = { id: "down", transport: "stdio-proxy" }; + const upEndpoint: CodexSupervisorEndpoint = { id: "up", transport: "stdio-proxy" }; + const fake = new FakeCodexConnection({ + id: "thread-old", + status: { type: "notLoaded" }, + turns: [], + }); + fake.request = async (method, params) => { + fake.calls.push({ method, params }); + if (method === "thread/loaded/list") { + return { data: [], nextCursor: null }; + } + if (method === "thread/read" && params?.threadId === "thread-old") { + return { thread: { id: "thread-old", status: { type: "notLoaded" }, turns: [] } }; + } + throw new Error(`unexpected method: ${method}`); + }; + const supervisor = new CodexSupervisor([downEndpoint, upEndpoint], async (target) => { + if (target.id === "down") { + throw new Error("host offline"); + } + return fake; + }); + + await expect(supervisor.readSession({ threadId: "thread-old" })).resolves.toEqual({ + thread: { id: "thread-old", status: { type: "notLoaded" }, turns: [] }, + }); + }); + it("steers active sessions when the in-progress turn is readable", async () => { const fake = new FakeCodexConnection({ id: "thread-1", @@ -668,6 +797,45 @@ async function waitForFile(filePath: string): Promise { } describe("connectCodexAppServerEndpoint", () => { + it("rejects pending websocket requests when the supervisor closes intentionally", async () => { + const server = new WebSocketServer({ host: "127.0.0.1", port: 0 }); + const port = await new Promise((resolve) => { + server.once("listening", () => { + const address = server.address(); + resolve(typeof address === "object" && address ? address.port : 0); + }); + }); + const sawProbeRequest = new Promise((resolve) => { + server.once("connection", (socket) => { + socket.on("message", (data) => { + const request = JSON.parse(data.toString()) as Record; + if (request.method === "initialize") { + socket.send(JSON.stringify({ id: request.id, result: {} })); + } + if (request.method === "thread/loaded/list") { + resolve(); + } + }); + }); + }); + const supervisor = new CodexSupervisor( + [{ id: "ws", transport: "websocket", url: `ws://127.0.0.1:${port}` }], + connectCodexAppServerEndpoint, + ); + + const probe = supervisor.probeEndpoints(); + await sawProbeRequest; + await supervisor.close(); + + await expect( + Promise.race([ + probe, + new Promise((_, reject) => setTimeout(() => reject(new Error("probe timed out")), 500)), + ]), + ).resolves.toMatchObject([{ endpointId: "ws", ok: false }]); + await new Promise((resolve) => server.close(() => resolve())); + }); + it("rejects malformed stdio frames instead of throwing out of band", async () => { const markerDir = await fs.mkdtemp(path.join(os.tmpdir(), "codex-supervisor-malformed-")); const marker = path.join(markerDir, "closed"); @@ -735,7 +903,7 @@ describe("connectCodexAppServerEndpoint", () => { process.stdout.write(JSON.stringify({ id: request.id, result: {} }) + "\\n"); return; } - if (request.method === "thread/list") { + if (request.method === "thread/loaded/list") { process.stdout.write(JSON.stringify({ id: request.id, result: { threads: [] } }) + "\\n"); setTimeout(() => process.exit(0), 0); } diff --git a/extensions/codex-supervisor/src/supervisor.ts b/extensions/codex-supervisor/src/supervisor.ts index f43f69dab5e9..0d8560aec90c 100644 --- a/extensions/codex-supervisor/src/supervisor.ts +++ b/extensions/codex-supervisor/src/supervisor.ts @@ -13,6 +13,7 @@ import type { type EndpointConnector = (endpoint: CodexSupervisorEndpoint) => Promise; const ALL_CODEX_THREAD_SOURCE_KINDS = ["cli", "vscode", "exec", "appServer", "unknown"]; +const DEFAULT_MAX_STORED_SESSIONS = 200; function isRecord(value: unknown): value is Record { return Boolean(value) && typeof value === "object" && !Array.isArray(value); @@ -137,7 +138,7 @@ export class CodexSupervisor { this.endpoints.map(async (endpoint) => { try { const connection = await this.connectionFor(endpoint.id); - await connection.request("thread/list", { limit: 1 }); + await connection.request("thread/loaded/list", { limit: 1 }); return { endpointId: endpoint.id, ok: true }; } catch (error) { this.forgetEndpoint(endpoint.id); @@ -151,12 +152,14 @@ export class CodexSupervisor { ); } - async listSessions(params: { includeStored?: boolean } = {}): Promise { + async listSessions( + params: { includeStored?: boolean; maxStoredSessions?: number } = {}, + ): Promise { return (await this.listSessionSnapshot(params)).sessions; } async listSessionSnapshot( - params: { includeStored?: boolean } = {}, + params: { includeStored?: boolean; maxStoredSessions?: number } = {}, ): Promise { const sessions: CodexSupervisorSession[] = []; const errors: CodexSupervisorEndpointHealth[] = []; @@ -273,12 +276,15 @@ export class CodexSupervisor { private async listEndpointSessions( endpoint: CodexSupervisorEndpoint, - params: { includeStored?: boolean }, + params: { includeStored?: boolean; maxStoredSessions?: number }, ): Promise { if (params.includeStored === true) { const loaded = await this.listLoadedThreadSessions(endpoint); const sessions = [...loaded]; - for (const stored of await this.listStoredThreadSessions(endpoint)) { + for (const stored of await this.listStoredThreadSessions( + endpoint, + params.maxStoredSessions, + )) { if (!sessions.some((session) => session.threadId === stored.threadId)) { sessions.push(stored); } @@ -318,14 +324,23 @@ export class CodexSupervisor { private async listStoredThreadSessions( endpoint: CodexSupervisorEndpoint, + maxStoredSessions = DEFAULT_MAX_STORED_SESSIONS, ): Promise { + const sessionLimit = Number.isFinite(maxStoredSessions) + ? Math.min(1000, Math.max(1, Math.floor(maxStoredSessions))) + : DEFAULT_MAX_STORED_SESSIONS; const sessions: CodexSupervisorSession[] = []; const connection = await this.connectionFor(endpoint.id); let cursor: string | undefined; do { + const remaining = sessionLimit - sessions.length; + if (remaining <= 0) { + break; + } const listed = await connection.request("thread/list", { - limit: 100, + limit: Math.min(100, remaining), sourceKinds: ALL_CODEX_THREAD_SOURCE_KINDS, + useStateDbOnly: true, ...(cursor ? { cursor } : {}), }); for (const thread of extractThreadList(listed)) { @@ -340,6 +355,9 @@ export class CodexSupervisor { const session = toSession(endpoint.id, thread); if (session) { sessions.push(session); + if (sessions.length >= sessionLimit) { + break; + } } } cursor = @@ -435,7 +453,7 @@ export class CodexSupervisor { if (params.endpointId) { return params.endpointId; } - const sessions = await this.listSessions({ includeStored: true }); + const sessions = await this.listSessions(); const matches = sessions.filter((session) => session.threadId === params.threadId); if (matches.length === 1) { return matches[0].endpointId; @@ -443,6 +461,32 @@ export class CodexSupervisor { if (matches.length > 1) { throw new Error(`Codex thread id is ambiguous across endpoints: ${params.threadId}`); } + const endpointIds = new Set(matches.map((match) => match.endpointId)); + for (const endpoint of this.endpoints) { + if (endpointIds.has(endpoint.id)) { + continue; + } + try { + const connection = await this.connectionFor(endpoint.id); + const read = await this.readThread(connection, params.threadId, false); + const thread = extractThread(read); + if (thread?.id === params.threadId) { + endpointIds.add(endpoint.id); + } + } catch (error) { + if (isLoadedThreadReadMiss(error)) { + continue; + } + this.forgetEndpoint(endpoint.id); + continue; + } + } + if (endpointIds.size === 1) { + return Array.from(endpointIds)[0]!; + } + if (endpointIds.size > 1) { + throw new Error(`Codex thread id is ambiguous across endpoints: ${params.threadId}`); + } throw new Error(`Codex thread not found: ${params.threadId}`); }