fix(imessage): keep split-send coalescing opt-in (#93143)

Merged via squash.

Prepared head SHA: 7fc3eca084
Co-authored-by: omarshahine <10343873+omarshahine@users.noreply.github.com>
Co-authored-by: omarshahine <10343873+omarshahine@users.noreply.github.com>
Reviewed-by: @omarshahine
This commit is contained in:
Omar Shahine
2026-06-21 17:12:21 -07:00
committed by GitHub
parent f719813a7e
commit a0714a3d68
10 changed files with 680 additions and 295 deletions

View File

@@ -1,4 +1,4 @@
24f11880cec619997ff93d303c32431bf4fb2bfefb56c9f0ece35ff91b329a80 config-baseline.json
3ac3be8b7e201eb577854806a9806ba90acbfb2616e14b3ffd1169f188620303 config-baseline.json
2923c1120c0369aeca6646cd67f7264590c6a1f4e5bc3157a04d7661324c6868 config-baseline.core.json
2d735389858305509528e74329b6f8c65d311e1471c3b4e91dc17aaab8e63a80 config-baseline.channel.json
769899651e2769833ae7e9c8fbf402e55f3d5e32da6bfe21a9659cc35d1f07bb config-baseline.channel.json
d2e2114f1cd43dc894fe1a4836677b42a2a5af825537d6c4a932da832d58a590 config-baseline.plugin.json

View File

@@ -94,28 +94,28 @@ Use this checklist when you already know your old BlueBubbles config and want th
iMessage and BlueBubbles share a lot of channel-level config. The keys that change are mostly transport (REST server vs local CLI). Behavior keys (`dmPolicy`, `groupPolicy`, `allowFrom`, etc.) keep the same meaning.
| BlueBubbles | bundled iMessage | Notes |
| ---------------------------------------------------------- | ----------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `channels.bluebubbles.enabled` | `channels.imessage.enabled` | Same semantics. |
| `channels.bluebubbles.serverUrl` | _(removed)_ | No REST server — the plugin spawns `imsg rpc` over stdio. |
| `channels.bluebubbles.password` | _(removed)_ | No webhook authentication needed. |
| _(implicit)_ | `channels.imessage.cliPath` | Path to `imsg` (default `imsg`); use a wrapper script for SSH. |
| _(implicit)_ | `channels.imessage.dbPath` | Optional Messages.app `chat.db` override; auto-detected when omitted. |
| _(implicit)_ | `channels.imessage.remoteHost` | `host` or `user@host` — only needed when `cliPath` is an SSH wrapper and you want SCP attachment fetches. |
| `channels.bluebubbles.dmPolicy` | `channels.imessage.dmPolicy` | Same values (`pairing` / `allowlist` / `open` / `disabled`). |
| `channels.bluebubbles.allowFrom` | `channels.imessage.allowFrom` | Pairing approvals carry over by handle, not by token. |
| `channels.bluebubbles.groupPolicy` | `channels.imessage.groupPolicy` | Same values (`allowlist` / `open` / `disabled`). |
| `channels.bluebubbles.groupAllowFrom` | `channels.imessage.groupAllowFrom` | Same. |
| `channels.bluebubbles.groups` | `channels.imessage.groups` | **Copy this verbatim, including any `groups: { "*": { ... } }` wildcard entry.** Per-group `requireMention`, `tools`, `toolsBySender` carry over. With `groupPolicy: "allowlist"`, an empty or missing `groups` block silently drops every group message — see "Group registry footgun" below. |
| `channels.bluebubbles.sendReadReceipts` | `channels.imessage.sendReadReceipts` | Default `true`. With the bundled plugin this only fires when the private API probe is up. |
| `channels.bluebubbles.includeAttachments` | `channels.imessage.includeAttachments` | Same shape, **same off-by-default**. If you had attachments flowing on BlueBubbles you must re-set this explicitly on the iMessage block — it does not carry over implicitly, and inbound photos/media will be silently dropped with no `Inbound message` log line until you do. |
| `channels.bluebubbles.attachmentRoots` | `channels.imessage.attachmentRoots` | Local roots; same wildcard rules. |
| _(N/A)_ | `channels.imessage.remoteAttachmentRoots` | Only used when `remoteHost` is set for SCP fetches. |
| `channels.bluebubbles.mediaMaxMb` | `channels.imessage.mediaMaxMb` | Default 16 MB on iMessage (BlueBubbles default was 8 MB). Set explicitly if you want to keep the lower cap. |
| `channels.bluebubbles.textChunkLimit` | `channels.imessage.textChunkLimit` | Default 4000 on both. |
| `channels.bluebubbles.coalesceSameSenderDms` | `channels.imessage.coalesceSameSenderDms` | Same opt-in. DM-only — group chats keep instant per-message dispatch on both channels. Widens the default inbound debounce to 2500 ms when enabled without an explicit `messages.inbound.byChannel.imessage`. See [iMessage docs § Coalescing split-send DMs](/channels/imessage#coalescing-split-send-dms-command--url-in-one-composition). |
| `channels.bluebubbles.enrichGroupParticipantsFromContacts` | _(N/A)_ | iMessage already reads sender display names from `chat.db`. |
| `channels.bluebubbles.actions.*` | `channels.imessage.actions.*` | Per-action toggles: `reactions`, `edit`, `unsend`, `reply`, `sendWithEffect`, `renameGroup`, `setGroupIcon`, `addParticipant`, `removeParticipant`, `leaveGroup`, `sendAttachment`. |
| BlueBubbles | bundled iMessage | Notes |
| ---------------------------------------------------------- | ----------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `channels.bluebubbles.enabled` | `channels.imessage.enabled` | Same semantics. |
| `channels.bluebubbles.serverUrl` | _(removed)_ | No REST server — the plugin spawns `imsg rpc` over stdio. |
| `channels.bluebubbles.password` | _(removed)_ | No webhook authentication needed. |
| _(implicit)_ | `channels.imessage.cliPath` | Path to `imsg` (default `imsg`); use a wrapper script for SSH. |
| _(implicit)_ | `channels.imessage.dbPath` | Optional Messages.app `chat.db` override; auto-detected when omitted. |
| _(implicit)_ | `channels.imessage.remoteHost` | `host` or `user@host` — only needed when `cliPath` is an SSH wrapper and you want SCP attachment fetches. |
| `channels.bluebubbles.dmPolicy` | `channels.imessage.dmPolicy` | Same values (`pairing` / `allowlist` / `open` / `disabled`). |
| `channels.bluebubbles.allowFrom` | `channels.imessage.allowFrom` | Pairing approvals carry over by handle, not by token. |
| `channels.bluebubbles.groupPolicy` | `channels.imessage.groupPolicy` | Same values (`allowlist` / `open` / `disabled`). |
| `channels.bluebubbles.groupAllowFrom` | `channels.imessage.groupAllowFrom` | Same. |
| `channels.bluebubbles.groups` | `channels.imessage.groups` | **Copy this verbatim, including any `groups: { "*": { ... } }` wildcard entry.** Per-group `requireMention`, `tools`, `toolsBySender` carry over. With `groupPolicy: "allowlist"`, an empty or missing `groups` block silently drops every group message — see "Group registry footgun" below. |
| `channels.bluebubbles.sendReadReceipts` | `channels.imessage.sendReadReceipts` | Default `true`. With the bundled plugin this only fires when the private API probe is up. |
| `channels.bluebubbles.includeAttachments` | `channels.imessage.includeAttachments` | Same shape, **same off-by-default**. If you had attachments flowing on BlueBubbles you must re-set this explicitly on the iMessage block — it does not carry over implicitly, and inbound photos/media will be silently dropped with no `Inbound message` log line until you do. |
| `channels.bluebubbles.attachmentRoots` | `channels.imessage.attachmentRoots` | Local roots; same wildcard rules. |
| _(N/A)_ | `channels.imessage.remoteAttachmentRoots` | Only used when `remoteHost` is set for SCP fetches. |
| `channels.bluebubbles.mediaMaxMb` | `channels.imessage.mediaMaxMb` | Default 16 MB on iMessage (BlueBubbles default was 8 MB). Set explicitly if you want to keep the lower cap. |
| `channels.bluebubbles.textChunkLimit` | `channels.imessage.textChunkLimit` | Default 4000 on both. |
| `channels.bluebubbles.coalesceSameSenderDms` | `channels.imessage.coalesceSameSenderDms` | Same opt-in. DM-only — group chats keep instant per-message dispatch on both channels. Widens the default inbound debounce to 7000 ms when enabled without an explicit `messages.inbound.byChannel.imessage` or global `messages.inbound.debounceMs`. See [iMessage docs § Coalescing split-send DMs](/channels/imessage#coalescing-split-send-dms-command--url-in-one-composition). |
| `channels.bluebubbles.enrichGroupParticipantsFromContacts` | _(N/A)_ | iMessage already reads sender display names from `chat.db`. |
| `channels.bluebubbles.actions.*` | `channels.imessage.actions.*` | Per-action toggles: `reactions`, `edit`, `unsend`, `reply`, `sendWithEffect`, `renameGroup`, `setGroupIcon`, `addParticipant`, `removeParticipant`, `leaveGroup`, `sendAttachment`. |
Multi-account configs (`channels.bluebubbles.accounts.*`) translate one-to-one to `channels.imessage.accounts.*`.

View File

@@ -681,7 +681,7 @@ The two rows arrive at OpenClaw ~0.8-2.0 s apart on most setups. Without coalesc
}
```
With the flag on and no explicit `messages.inbound.byChannel.imessage`, the debounce window widens to **2500 ms** (the legacy default is 0 ms — no debouncing). The wider window is required because Apple's split-send cadence of 0.8-2.0 s does not fit in a tighter default.
With the flag on and no explicit `messages.inbound.byChannel.imessage` or global `messages.inbound.debounceMs`, the debounce window widens to **7000 ms** (the legacy default is 0 ms — no debouncing). The wider window is required because Apple's URL-preview split-send cadence can stretch to several seconds while Messages.app emits the preview row.
To tune the window yourself:
@@ -690,10 +690,8 @@ The two rows arrive at OpenClaw ~0.8-2.0 s apart on most setups. Without coalesc
messages: {
inbound: {
byChannel: {
// 2500 ms works for most setups; raise to 4000 ms if your Mac is
// slow or under memory pressure (observed gap can stretch past 2 s
// then).
imessage: 2500,
// 7000 ms covers observed Messages.app URL-preview delays.
imessage: 7000,
},
},
},
@@ -715,15 +713,15 @@ The two rows arrive at OpenClaw ~0.8-2.0 s apart on most setups. Without coalesc
The "Flag on" column shows behavior on an `imsg` build that emits `balloon_bundle_id`. On older `imsg` builds that emit no balloon metadata at all, the rows below marked "Two turns" / "N turns" instead fall back to a legacy merge (one turn): OpenClaw cannot structurally tell a split-send from separate sends, so it preserves the pre-metadata merge. Precise separation activates once the build emits balloon metadata.
| User composes | `chat.db` produces | Flag off (default) | Flag on + window (imsg emits balloon metadata) |
| ------------------------------------------------------------------ | ----------------------------------- | --------------------------------------- | ------------------------------------------------ |
| `Dump https://example.com` (one send) | 2 rows ~1 s apart | Two agent turns: "Dump" alone, then URL | One turn: merged text `Dump https://example.com` |
| `Save this 📎image.jpg caption` (attachment + text) | 2 rows without URL balloon metadata | Two turns | Two turns (legacy merge on metadata-less builds) |
| `/status` (standalone command) | 1 row | Instant dispatch | **Wait up to window, then dispatch** |
| URL pasted alone | 1 row | Instant dispatch | Wait up to window, then dispatch |
| Text + URL sent as two deliberate separate messages, minutes apart | 2 rows outside window | Two turns | Two turns (window expires between them) |
| Rapid flood (>10 small DMs inside window) | N rows without URL balloon metadata | N turns | N turns (legacy merge on metadata-less builds) |
| Two people typing in a group chat | N rows from M senders | M+ turns (one per sender bucket) | M+ turns — group chats are not coalesced |
| User composes | `chat.db` produces | Flag off (default) | Flag on + window (imsg emits balloon metadata) |
| ------------------------------------------------------------------ | ----------------------------------- | --------------------------------------- | --------------------------------------------------------------------------------------------------- |
| `Dump https://example.com` (one send) | 2 rows ~1 s apart | Two agent turns: "Dump" alone, then URL | One turn: merged text `Dump https://example.com` |
| `Save this 📎image.jpg caption` (attachment + text) | 2 rows without URL balloon metadata | Two turns | Two turns after metadata is observed; one merged turn on old/pre-latch metadata-less sessions |
| `/status` (standalone command) | 1 row | Instant dispatch | **Wait up to window, then dispatch** |
| URL pasted alone | 1 row | Instant dispatch | Wait up to window, then dispatch |
| Text + URL sent as two deliberate separate messages, minutes apart | 2 rows outside window | Two turns | Two turns (window expires between them) |
| Rapid flood (>10 small DMs inside window) | N rows without URL balloon metadata | N turns | N turns after metadata is observed; one bounded merged turn on old/pre-latch metadata-less sessions |
| Two people typing in a group chat | N rows from M senders | M+ turns (one per sender bucket) | M+ turns — group chats are not coalesced |
## Inbound recovery after a bridge or gateway restart

View File

@@ -160,6 +160,7 @@ must be paired with `--lint`; regular doctor and repair runs reject them.
- State integrity and permissions checks (sessions, transcripts, state dir).
- Config file permission checks (chmod 600) when running locally.
- Model auth health: checks OAuth expiry, can refresh expiring tokens, and reports auth-profile cooldown/disabled states.
</Accordion>
<Accordion title="Gateway, services, and supervisors">
- Sandbox image repair when sandboxing is enabled.

View File

@@ -38,12 +38,40 @@ const debouncerControl = vi.hoisted(() => ({
holdEntries: false,
entries: [] as unknown[],
flush: undefined as undefined | (() => Promise<void>),
flushEach: undefined as undefined | (() => Promise<void>),
reset() {
this.holdEntries = false;
this.entries = [];
this.flush = undefined;
this.flushEach = undefined;
},
}));
const createChannelInboundDebouncerMock = vi.hoisted(() =>
vi.fn((opts: { onFlush: (entries: unknown[]) => Promise<void> }) => ({
debouncer: {
enqueue: async (entry: unknown) => {
if (!debouncerControl.holdEntries) {
await opts.onFlush([entry]);
return;
}
debouncerControl.entries.push(entry);
debouncerControl.flush = async () => {
const entries = debouncerControl.entries.splice(0);
await opts.onFlush(entries);
};
// Flush each collected entry as its own single-entry bucket, modeling
// the real non-debounced path (shouldDebounceTextInbound is mocked to
// false here) where every row dispatches individually.
debouncerControl.flushEach = async () => {
const entries = debouncerControl.entries.splice(0);
for (const queued of entries) {
await opts.onFlush([queued]);
}
};
},
},
})),
);
vi.mock("openclaw/plugin-sdk/transport-ready-runtime", () => ({
waitForTransportReady: waitForTransportReadyMock,
@@ -63,21 +91,7 @@ vi.mock("openclaw/plugin-sdk/channel-inbound", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/channel-inbound")>();
return {
...actual,
createChannelInboundDebouncer: vi.fn((opts) => ({
debouncer: {
enqueue: async (entry: unknown) => {
if (!debouncerControl.holdEntries) {
await opts.onFlush([entry]);
return;
}
debouncerControl.entries.push(entry);
debouncerControl.flush = async () => {
const entries = debouncerControl.entries.splice(0);
await opts.onFlush(entries);
};
},
},
})),
createChannelInboundDebouncer: createChannelInboundDebouncerMock,
shouldDebounceTextInbound: vi.fn(() => false),
};
});
@@ -108,6 +122,7 @@ describe("iMessage monitor last-route updates", () => {
readChannelAllowFromStoreMock.mockReset().mockResolvedValue([]);
recordInboundSessionMock.mockClear();
dispatchInboundMessageMock.mockClear();
createChannelInboundDebouncerMock.mockClear();
debouncerControl.reset();
clearCachedIMessagePrivateApiStatus();
});
@@ -213,6 +228,7 @@ describe("iMessage monitor last-route updates", () => {
config: {
channels: {
imessage: {
coalesceSameSenderDms: true,
dmPolicy: "allowlist",
allowFrom: ["+15550001111"],
sendReadReceipts: false,
@@ -382,6 +398,7 @@ describe("iMessage monitor last-route updates", () => {
config: {
channels: {
imessage: {
coalesceSameSenderDms: true,
dmPolicy: "allowlist",
allowFrom: ["+15550001111"],
sendReadReceipts: false,
@@ -1204,7 +1221,6 @@ describe("iMessage monitor last-route updates", () => {
is_from_me: false,
text: `missed during downtime ${id}`,
is_group: false,
balloon_bundle_id: "com.apple.messages.Handwriting",
created_at: thirtyMinAgo,
},
},
@@ -1238,7 +1254,7 @@ describe("iMessage monitor last-route updates", () => {
await vi.waitFor(() => {
expect(debouncerControl.entries).toHaveLength(2);
});
await debouncerControl.flush?.();
await debouncerControl.flushEach?.();
await vi.waitFor(() => {
expect(dispatchInboundMessageMock).toHaveBeenCalledTimes(2);
});
@@ -1277,7 +1293,6 @@ describe("iMessage monitor last-route updates", () => {
is_from_me: false,
text: `missed during downtime ${id}`,
is_group: false,
balloon_bundle_id: "com.apple.messages.Handwriting",
created_at: thirtyMinAgo,
},
},
@@ -1307,7 +1322,7 @@ describe("iMessage monitor last-route updates", () => {
expect(debouncerControl.entries).toHaveLength(2);
});
debouncerControl.entries.reverse();
await debouncerControl.flush?.();
await debouncerControl.flushEach?.();
await vi.waitFor(() => {
expect(dispatchInboundMessageMock).toHaveBeenCalledTimes(2);
});
@@ -1408,12 +1423,10 @@ describe("iMessage monitor last-route updates", () => {
expect(dispatchParams?.ctx.To).not.toBe("imessage:+15550001111");
});
it("legacy-merges coalesce buckets when imsg emits no balloon metadata (older builds)", async () => {
// Back-compat: older imsg builds emit no balloon_bundle_id, so a Dump + URL
// split-send arrives as two fieldless rows. We cannot structurally tell that
// apart from separate sends, so we preserve the pre-metadata merge rather
// than regress split-send users to two turns. Removed once imsg coalesces
// upstream (openclaw/imsg#141, tracked by #91243).
it("merges a command row with the following URL balloon row", async () => {
// Apple's command+URL composition can arrive as a command row followed by a
// URL-preview balloon row. The opt-in coalescer keeps the pair as one agent
// turn and uses balloon metadata to avoid collapsing ordinary rows.
debouncerControl.holdEntries = true;
let onNotification: ((message: { method: string; params: unknown }) => void) | undefined;
@@ -1426,97 +1439,18 @@ describe("iMessage monitor last-route updates", () => {
}),
waitForClose: vi.fn(async () => {
// Fresh dates relative to now so the stale-backlog age fence lets the
// live split-send through to the coalescer.
// live rows through to the debouncer.
for (const row of [
{
id: 91,
guid: "LIVE-GUID-91",
text: "Dump",
text: "summarize",
created_at: new Date(Date.now() - 2000).toISOString(),
},
{
id: 92,
guid: "LIVE-GUID-92",
text: "https://example.com",
created_at: new Date(Date.now() - 1000).toISOString(),
},
]) {
onNotification?.({
method: "message",
params: {
message: {
...row,
chat_id: 123,
sender: "+15550001111",
is_from_me: false,
is_group: false,
},
},
});
}
await vi.waitFor(() => {
expect(debouncerControl.flush).toBeDefined();
});
await debouncerControl.flush?.();
await Promise.resolve();
}),
stop: vi.fn(async () => {}),
};
createIMessageRpcClientMock.mockImplementation(async (params) => {
if (!params?.onNotification) {
throw new Error("expected iMessage notification handler");
}
onNotification = params.onNotification;
return client as never;
});
await monitorIMessageProvider({
config: {
channels: {
imessage: {
coalesceSameSenderDms: true,
dmPolicy: "allowlist",
allowFrom: ["+15550001111"],
sendReadReceipts: false,
},
},
messages: { inbound: { debounceMs: 2500 } },
session: { mainKey: "main" },
} as never,
runtime: { error: vi.fn(), exit: vi.fn(), log: vi.fn() },
});
expect(dispatchInboundMessageMock).toHaveBeenCalledTimes(1);
const mergedBody = dispatchInboundMessageMock.mock.calls[0]?.[0].ctx.Body ?? "";
expect(mergedBody).toContain("Dump");
expect(mergedBody).toContain("https://example.com");
});
it("merges coalesce buckets when imsg marks the URL balloon row structurally", async () => {
debouncerControl.holdEntries = true;
let onNotification: ((message: { method: string; params: unknown }) => void) | undefined;
const client = {
request: vi.fn(async (method: string) => {
if (method === "watch.subscribe") {
return { subscription: 1 };
}
throw new Error(`unexpected imsg method ${method}`);
}),
waitForClose: vi.fn(async () => {
// Fresh dates relative to now so the stale-backlog age fence lets the
// live split-send through to the coalescer.
for (const row of [
{
id: 93,
guid: "LIVE-GUID-93",
text: "Dump",
created_at: new Date(Date.now() - 2000).toISOString(),
},
{
id: 94,
guid: "LIVE-GUID-94",
text: "https://example.com",
text: "https://example.com/article",
balloon_bundle_id: "com.apple.messages.URLBalloonProvider",
created_at: new Date(Date.now() - 1000).toISOString(),
},
@@ -1560,15 +1494,388 @@ describe("iMessage monitor last-route updates", () => {
sendReadReceipts: false,
},
},
messages: { inbound: { debounceMs: 2500 } },
session: { mainKey: "main" },
} as never,
runtime: { error: vi.fn(), exit: vi.fn(), log: vi.fn() },
});
const debouncerOptions = createChannelInboundDebouncerMock.mock.calls.at(-1)?.[0] as
| { debounceMsOverride?: number }
| undefined;
expect(debouncerOptions?.debounceMsOverride).toBe(7000);
expect(dispatchInboundMessageMock).toHaveBeenCalledTimes(1);
expect(dispatchInboundMessageMock.mock.calls[0]?.[0].ctx.Body).toContain(
"Dump https://example.com",
);
const mergedBody = dispatchInboundMessageMock.mock.calls[0]?.[0].ctx.Body ?? "";
expect(mergedBody).toContain("summarize");
expect(mergedBody).toContain("https://example.com/article");
});
it("keeps ordinary buffered DMs separate after balloon metadata is observed", async () => {
debouncerControl.holdEntries = true;
let onNotification: ((message: { method: string; params: unknown }) => void) | undefined;
const client = {
request: vi.fn(async (method: string) => {
if (method === "watch.subscribe") {
return { subscription: 1 };
}
throw new Error(`unexpected imsg method ${method}`);
}),
waitForClose: vi.fn(async () => {
for (const row of [
{
id: 101,
guid: "LIVE-GUID-101",
text: "handwriting",
balloon_bundle_id: "com.apple.messages.HandwritingProvider",
created_at: new Date(Date.now() - 3000).toISOString(),
},
{
id: 102,
guid: "LIVE-GUID-102",
text: "first thought",
created_at: new Date(Date.now() - 2000).toISOString(),
},
{
id: 103,
guid: "LIVE-GUID-103",
text: "second thought",
created_at: new Date(Date.now() - 1000).toISOString(),
},
]) {
onNotification?.({
method: "message",
params: {
message: {
...row,
chat_id: 123,
sender: "+15550001111",
is_from_me: false,
is_group: false,
},
},
});
}
await vi.waitFor(() => {
expect(debouncerControl.flush).toBeDefined();
});
await debouncerControl.flush?.();
await Promise.resolve();
}),
stop: vi.fn(async () => {}),
};
createIMessageRpcClientMock.mockImplementation(async (params) => {
if (!params?.onNotification) {
throw new Error("expected iMessage notification handler");
}
onNotification = params.onNotification;
return client as never;
});
await monitorIMessageProvider({
config: {
channels: {
imessage: {
coalesceSameSenderDms: true,
dmPolicy: "allowlist",
allowFrom: ["+15550001111"],
sendReadReceipts: false,
},
},
session: { mainKey: "main" },
} as never,
runtime: { error: vi.fn(), exit: vi.fn(), log: vi.fn() },
});
expect(dispatchInboundMessageMock).toHaveBeenCalledTimes(3);
const bodies = dispatchInboundMessageMock.mock.calls.map((call) => call[0].ctx.Body ?? "");
expect(bodies.some((body) => body.includes("handwriting"))).toBe(true);
expect(bodies.some((body) => body.includes("first thought"))).toBe(true);
expect(bodies.some((body) => body.includes("second thought"))).toBe(true);
});
it("uses stale balloon rows as metadata support without dispatching them", async () => {
debouncerControl.holdEntries = true;
let onNotification: ((message: { method: string; params: unknown }) => void) | undefined;
const client = {
request: vi.fn(async (method: string) => {
if (method === "watch.subscribe") {
return { subscription: 1 };
}
throw new Error(`unexpected imsg method ${method}`);
}),
waitForClose: vi.fn(async () => {
for (const row of [
{
id: 201,
guid: "STALE-BALLOON-GUID-201",
text: "old handwriting",
balloon_bundle_id: "com.apple.messages.HandwritingProvider",
created_at: new Date(Date.now() - 60 * 60 * 1000).toISOString(),
},
{
id: 202,
guid: "LIVE-GUID-202",
text: "first fresh thought",
created_at: new Date(Date.now() - 2000).toISOString(),
},
{
id: 203,
guid: "LIVE-GUID-203",
text: "second fresh thought",
created_at: new Date(Date.now() - 1000).toISOString(),
},
]) {
onNotification?.({
method: "message",
params: {
message: {
...row,
chat_id: 123,
sender: "+15550001111",
is_from_me: false,
is_group: false,
},
},
});
}
await vi.waitFor(() => {
expect(debouncerControl.flush).toBeDefined();
});
await debouncerControl.flush?.();
await Promise.resolve();
}),
stop: vi.fn(async () => {}),
};
createIMessageRpcClientMock.mockImplementation(async (params) => {
if (!params?.onNotification) {
throw new Error("expected iMessage notification handler");
}
onNotification = params.onNotification;
return client as never;
});
await monitorIMessageProvider({
config: {
channels: {
imessage: {
coalesceSameSenderDms: true,
dbPath: path.join(os.tmpdir(), `openclaw-missing-chat-${Date.now()}.db`),
dmPolicy: "allowlist",
allowFrom: ["+15550001111"],
sendReadReceipts: false,
},
},
session: { mainKey: "main" },
} as never,
runtime: { error: vi.fn(), exit: vi.fn(), log: vi.fn() },
});
expect(dispatchInboundMessageMock).toHaveBeenCalledTimes(2);
const bodies = dispatchInboundMessageMock.mock.calls.map((call) => call[0].ctx.Body ?? "");
expect(bodies.some((body) => body.includes("old handwriting"))).toBe(false);
expect(bodies.some((body) => body.includes("first fresh thought"))).toBe(true);
expect(bodies.some((body) => body.includes("second fresh thought"))).toBe(true);
});
it("does not merge unrelated buffered rows into a following URL split-send", async () => {
debouncerControl.holdEntries = true;
let onNotification: ((message: { method: string; params: unknown }) => void) | undefined;
const client = {
request: vi.fn(async (method: string) => {
if (method === "watch.subscribe") {
return { subscription: 1 };
}
throw new Error(`unexpected imsg method ${method}`);
}),
waitForClose: vi.fn(async () => {
for (const row of [
{
id: 111,
guid: "LIVE-GUID-111",
text: "unrelated thought",
created_at: new Date(Date.now() - 3000).toISOString(),
},
{
id: 112,
guid: "LIVE-GUID-112",
text: "summarize",
created_at: new Date(Date.now() - 2000).toISOString(),
},
{
id: 113,
guid: "LIVE-GUID-113",
text: "https://example.com/article",
balloon_bundle_id: "com.apple.messages.URLBalloonProvider",
created_at: new Date(Date.now() - 1000).toISOString(),
},
]) {
onNotification?.({
method: "message",
params: {
message: {
...row,
chat_id: 123,
sender: "+15550001111",
is_from_me: false,
is_group: false,
},
},
});
}
await vi.waitFor(() => {
expect(debouncerControl.flush).toBeDefined();
});
await debouncerControl.flush?.();
await Promise.resolve();
}),
stop: vi.fn(async () => {}),
};
createIMessageRpcClientMock.mockImplementation(async (params) => {
if (!params?.onNotification) {
throw new Error("expected iMessage notification handler");
}
onNotification = params.onNotification;
return client as never;
});
await monitorIMessageProvider({
config: {
channels: {
imessage: {
coalesceSameSenderDms: true,
dmPolicy: "allowlist",
allowFrom: ["+15550001111"],
sendReadReceipts: false,
},
},
session: { mainKey: "main" },
} as never,
runtime: { error: vi.fn(), exit: vi.fn(), log: vi.fn() },
});
expect(dispatchInboundMessageMock).toHaveBeenCalledTimes(2);
const bodies = dispatchInboundMessageMock.mock.calls.map((call) => call[0].ctx.Body ?? "");
expect(bodies[0]).toContain("unrelated thought");
expect(bodies[0]).not.toContain("summarize");
expect(bodies[1]).toContain("summarize");
expect(bodies[1]).toContain("https://example.com/article");
expect(bodies[1]).not.toContain("unrelated thought");
});
it("does not merge unrelated buffered rows into an already-complete URL balloon message", async () => {
debouncerControl.holdEntries = true;
let onNotification: ((message: { method: string; params: unknown }) => void) | undefined;
const client = {
request: vi.fn(async (method: string) => {
if (method === "watch.subscribe") {
return { subscription: 1 };
}
throw new Error(`unexpected imsg method ${method}`);
}),
waitForClose: vi.fn(async () => {
for (const row of [
{
id: 211,
guid: "LIVE-GUID-211",
text: "unrelated thought",
created_at: new Date(Date.now() - 2000).toISOString(),
},
{
id: 212,
guid: "LIVE-GUID-212",
text: "summarize https://example.com/article",
balloon_bundle_id: "com.apple.messages.URLBalloonProvider",
created_at: new Date(Date.now() - 1000).toISOString(),
},
]) {
onNotification?.({
method: "message",
params: {
message: {
...row,
chat_id: 123,
sender: "+15550001111",
is_from_me: false,
is_group: false,
},
},
});
}
await vi.waitFor(() => {
expect(debouncerControl.flush).toBeDefined();
});
await debouncerControl.flush?.();
await Promise.resolve();
}),
stop: vi.fn(async () => {}),
};
createIMessageRpcClientMock.mockImplementation(async (params) => {
if (!params?.onNotification) {
throw new Error("expected iMessage notification handler");
}
onNotification = params.onNotification;
return client as never;
});
await monitorIMessageProvider({
config: {
channels: {
imessage: {
coalesceSameSenderDms: true,
dmPolicy: "allowlist",
allowFrom: ["+15550001111"],
sendReadReceipts: false,
},
},
session: { mainKey: "main" },
} as never,
runtime: { error: vi.fn(), exit: vi.fn(), log: vi.fn() },
});
expect(dispatchInboundMessageMock).toHaveBeenCalledTimes(2);
const bodies = dispatchInboundMessageMock.mock.calls.map((call) => call[0].ctx.Body ?? "");
expect(bodies[0]).toContain("unrelated thought");
expect(bodies[0]).not.toContain("summarize");
expect(bodies[1]).toContain("summarize");
expect(bodies[1]).toContain("https://example.com/article");
expect(bodies[1]).not.toContain("unrelated thought");
});
it("respects explicit iMessage inbound debounce timing", async () => {
const client = {
request: vi.fn(async (method: string) => {
if (method === "watch.subscribe") {
return { subscription: 1 };
}
throw new Error(`unexpected imsg method ${method}`);
}),
waitForClose: vi.fn(async () => {}),
stop: vi.fn(async () => {}),
};
createIMessageRpcClientMock.mockImplementation(async () => client as never);
await monitorIMessageProvider({
config: {
channels: {
imessage: {
coalesceSameSenderDms: true,
dmPolicy: "allowlist",
allowFrom: ["+15550001111"],
sendReadReceipts: false,
},
},
messages: { inbound: { byChannel: { imessage: 0 } } },
session: { mainKey: "main" },
} as never,
runtime: { error: vi.fn(), exit: vi.fn(), log: vi.fn() },
});
const debouncerOptions = createChannelInboundDebouncerMock.mock.calls.at(-1)?.[0] as
| { debounceMsOverride?: number }
| undefined;
expect(debouncerOptions?.debounceMsOverride).toBeUndefined();
});
});

View File

@@ -2,8 +2,8 @@
import { describe, expect, it } from "vitest";
import {
combineIMessagePayloads,
hasIMessageUrlBalloonBundleID,
IMESSAGE_URL_BALLOON_BUNDLE_ID,
isStandaloneIMessageUrlPreviewPayload,
MAX_COALESCED_ATTACHMENTS,
MAX_COALESCED_ENTRIES,
MAX_COALESCED_TEXT_CHARS,
@@ -24,52 +24,6 @@ const makePayload = (overrides: Partial<IMessagePayload> = {}): IMessagePayload
});
describe("combineIMessagePayloads", () => {
it("recognizes URL balloon rows from imsg structural metadata", () => {
const text = makePayload({ text: "Dump" });
const balloon = makePayload({
text: "https://example.com/article",
balloon_bundle_id: IMESSAGE_URL_BALLOON_BUNDLE_ID,
});
expect(hasIMessageUrlBalloonBundleID(text)).toBe(false);
expect(hasIMessageUrlBalloonBundleID(balloon)).toBe(true);
// A real URL split-send merges regardless of the session capability latch.
expect(shouldCombineIMessagePayloadBucket([text, balloon], false)).toBe(true);
expect(shouldCombineIMessagePayloadBucket([text, balloon], true)).toBe(true);
});
it("falls back to a legacy merge when the build has never emitted balloon metadata (older imsg)", () => {
// Older imsg builds emit no balloon_bundle_id at all. We cannot tell a URL
// split-send from separate sends, so we preserve the pre-metadata merge
// rather than regress split-send users to two turns. Back-compat path,
// removed once imsg coalesces upstream (openclaw/imsg#141, tracked by #91243).
const text = makePayload({ text: "Dump" });
const url = makePayload({ text: "https://example.com/article" });
expect(shouldCombineIMessagePayloadBucket([text, url], false)).toBe(true);
});
it("keeps a plain bucket separate once the build is known to emit balloon metadata", () => {
// Capability latch is true (a prior row this session carried metadata), so a
// plain bucket with no URL marker is genuinely not a split-send. imsg omits
// the field for plain rows, so this case is indistinguishable per-bucket and
// depends on the session-level signal.
const a = makePayload({ text: "first" });
const b = makePayload({ text: "second" });
expect(shouldCombineIMessagePayloadBucket([a, b], true)).toBe(false);
});
it("keeps a bucket separate when imsg exposes balloon metadata in the bucket but no URL marker", () => {
// New imsg surfaced balloon metadata in this very bucket, proving this build
// emits the field, but the bucket is not a URL split-send. Keep separate even
// if the latch had not flipped yet.
const text = makePayload({ text: "hi" });
const nonUrlBalloon = makePayload({
text: "tap to vote",
balloon_bundle_id: "com.apple.messages.MSMessageExtensionBalloonPlugin",
});
expect(shouldCombineIMessagePayloadBucket([text, nonUrlBalloon], false)).toBe(false);
});
it("throws on empty input", () => {
expect(() => combineIMessagePayloads([])).toThrow(
"combineIMessagePayloads: cannot combine empty payloads",
@@ -83,23 +37,22 @@ describe("combineIMessagePayloads", () => {
expect(result.guid).toBe("solo");
});
it("merges Dump + URL split-send into one payload anchored on the first GUID", () => {
const text = makePayload({
it("merges two same-sender rows into one payload anchored on the first GUID", () => {
const first = makePayload({
id: 41,
text: "Dump",
text: "summarize",
guid: "row-1",
created_at: "2025-01-01T00:00:00Z",
});
const balloon = makePayload({
const second = makePayload({
id: 42,
text: "https://example.com/article",
balloon_bundle_id: IMESSAGE_URL_BALLOON_BUNDLE_ID,
guid: "row-2",
created_at: "2025-01-01T00:00:01.500Z",
});
const merged = combineIMessagePayloads([text, balloon]);
const merged = combineIMessagePayloads([first, second]);
expect(merged.text).toBe("Dump https://example.com/article");
expect(merged.text).toBe("summarize https://example.com/article");
expect(merged.guid).toBe("row-1");
expect(merged.created_at).toBe("2025-01-01T00:00:01.500Z");
expect(merged.coalescedMessageGuids).toEqual(["row-1", "row-2"]);
@@ -209,3 +162,87 @@ describe("combineIMessagePayloads", () => {
expect(MAX_COALESCED_ENTRIES).toBeGreaterThan(1);
});
});
describe("isStandaloneIMessageUrlPreviewPayload", () => {
it("matches URL balloon rows that only carry the preview URL", () => {
expect(
isStandaloneIMessageUrlPreviewPayload(
makePayload({
text: "https://example.com/article",
balloon_bundle_id: IMESSAGE_URL_BALLOON_BUNDLE_ID,
}),
),
).toBe(true);
});
it("matches scheme-less www URL preview rows", () => {
expect(
isStandaloneIMessageUrlPreviewPayload(
makePayload({
text: "www.example.com/article",
balloon_bundle_id: IMESSAGE_URL_BALLOON_BUNDLE_ID,
}),
),
).toBe(true);
});
it("does not match already-complete URL balloon messages with text context", () => {
expect(
isStandaloneIMessageUrlPreviewPayload(
makePayload({
text: "summarize https://example.com/article",
balloon_bundle_id: IMESSAGE_URL_BALLOON_BUNDLE_ID,
}),
),
).toBe(false);
});
it("does not match non-URL balloon payloads", () => {
expect(
isStandaloneIMessageUrlPreviewPayload(
makePayload({
text: "https://example.com/article",
balloon_bundle_id: "com.apple.messages.HandwritingProvider",
}),
),
).toBe(false);
});
});
describe("shouldCombineIMessagePayloadBucket", () => {
it("combines a command row with a structurally marked URL balloon row", () => {
const command = makePayload({ text: "summarize", guid: "row-1" });
const preview = makePayload({
text: "https://example.com/article",
guid: "row-2",
balloon_bundle_id: IMESSAGE_URL_BALLOON_BUNDLE_ID,
});
expect(shouldCombineIMessagePayloadBucket([command, preview], true)).toBe(true);
});
it("keeps ordinary buffered rows separate once the bridge emits balloon metadata", () => {
const first = makePayload({ text: "first thought", guid: "row-1" });
const second = makePayload({ text: "second thought", guid: "row-2" });
expect(shouldCombineIMessagePayloadBucket([first, second], true)).toBe(false);
});
it("keeps non-URL balloon rows separate", () => {
const first = makePayload({ text: "first thought", guid: "row-1" });
const second = makePayload({
text: "second thought",
guid: "row-2",
balloon_bundle_id: "com.apple.messages.HandwritingProvider",
});
expect(shouldCombineIMessagePayloadBucket([first, second], false)).toBe(false);
});
it("falls back to combining old bridge buckets with no balloon metadata", () => {
const command = makePayload({ text: "summarize", guid: "row-1" });
const url = makePayload({ text: "https://example.com/article", guid: "row-2" });
expect(shouldCombineIMessagePayloadBucket([command, url], false)).toBe(true);
});
});

View File

@@ -1,10 +1,10 @@
// Imessage plugin module implements coalesce behavior.
// Imessage plugin module implements the same-sender inbound debounce merge.
import type { IMessagePayload } from "./types.js";
// Keep the coalescing contract narrow (caps, ID tracking, reply-context
// preference) so a future SDK lift into `openclaw/plugin-sdk/channel-inbound`
// is a mechanical extraction instead of a behavioral redesign. Apple's
// split-send pipeline is the behavior this protects.
// Keep the merge contract narrow (caps, ID tracking, reply-context preference)
// so a future SDK lift into `openclaw/plugin-sdk/channel-inbound` is a
// mechanical extraction instead of a behavioral redesign. Apple's URL-preview
// split-send pipeline is the iMessage-only behavior this still protects.
/**
* Bounds on the merged output when multiple inbound iMessage payloads are
@@ -22,52 +22,57 @@ export function hasIMessageUrlBalloonBundleID(payload: IMessagePayload): boolean
return payload.balloon_bundle_id === IMESSAGE_URL_BALLOON_BUNDLE_ID;
}
// imsg only emits `balloon_bundle_id` for rows that actually carry a balloon
// (the nil case is omitted on the wire), so a present, non-empty value is the
// signal that this build exposes balloon metadata at all.
function isSingleUrlToken(text: string): boolean {
if (/\s/.test(text)) {
return false;
}
if (/^www\.[^\s.]+\.[^\s]+$/i.test(text)) {
return true;
}
try {
const url = new URL(text);
return url.protocol === "http:" || url.protocol === "https:";
} catch {
return false;
}
}
export function isStandaloneIMessageUrlPreviewPayload(payload: IMessagePayload): boolean {
if (!hasIMessageUrlBalloonBundleID(payload)) {
return false;
}
const text = (payload.text ?? "").trim();
return text.length === 0 || isSingleUrlToken(text);
}
// imsg omits `balloon_bundle_id` for non-balloon rows, so a present value is
// the session signal that this bridge build exposes structural balloon
// metadata. Once latched, missing URL metadata is meaningful.
export function hasIMessageBalloonMetadata(payload: IMessagePayload): boolean {
return typeof payload.balloon_bundle_id === "string" && payload.balloon_bundle_id.length > 0;
}
/**
* Decide whether a debounced same-sender bucket should merge into one turn.
* Decide whether a debounced same-sender iMessage bucket should merge.
*
* `buildEmitsBalloonMetadata` is a session-level capability latch: once any
* inbound row from this imsg build has carried balloon metadata, absence of a
* URL marker is meaningful (the row genuinely is not a URL split-send), so we
* can keep ordinary buffered DMs separate. It must be session-scoped, not
* per-bucket: imsg omits `balloon_bundle_id` on the wire for non-balloon rows,
* so a bucket of plain text rows looks identical on old and new builds.
* URL-preview rows are merged with their preceding command row so Apple's
* command+URL split-send still reaches the agent as one turn. Once a bridge
* session has emitted balloon metadata, ordinary same-sender DMs without the
* URL marker flush separately instead of being collapsed.
*/
export function shouldCombineIMessagePayloadBucket(
payloads: readonly IMessagePayload[],
buildEmitsBalloonMetadata: boolean,
): boolean {
// Precise path: a real Apple URL-preview split-send carries the URL-balloon
// marker on the preview row — merge it into one turn.
if (payloads.some(hasIMessageUrlBalloonBundleID)) {
return true;
}
// Metadata-capable build (observed earlier this session or in this bucket):
// the missing URL marker is trustworthy, so keep ordinary buffered DMs as
// separate turns. This is the precision the structural gate exists for.
if (buildEmitsBalloonMetadata || payloads.some(hasIMessageBalloonMetadata)) {
return false;
}
// Back-compat (remove once imsg coalesces split-sends upstream — see
// openclaw/imsg#141, tracked by #91243): a build that has never emitted any
// balloon metadata cannot structurally tell a `Dump <url>` split-send from
// separate sends. Preserve the pre-metadata merge so split-send users do not
// regress to two turns on a released imsg that lacks the field.
//
// This never merges more than the shipped behavior already did: with
// `coalesceSameSenderDms` enabled, `main` debounces every same-sender DM and
// merges each multi-entry bucket unconditionally. So an unlatched session
// (old build, or a metadata-capable build before its first balloon row) is
// identical to today, not a new regression. Flushing these buckets instead
// would re-break old-imsg split-sends — the very case this guards. Fully
// closing the pre-latch window needs an imsg-advertised capability flag, which
// is part of the upstream #141 work.
// Older imsg builds expose no balloon metadata, so a command+URL split-send
// is indistinguishable from two ordinary text rows. Keep the internal fallback
// until imsg advertises upstream coalescing for that exact shape.
return true;
}
@@ -86,9 +91,8 @@ export type CoalescedIMessagePayload = IMessagePayload & {
/**
* Combine consecutive same-sender iMessage payloads into a single payload for
* downstream dispatch. Used when the debouncer flushes a bucket containing
* more than one event — e.g. Apple's split-send for `Dump https://example.com`
* arriving as two separate `chat.db` rows ~0.8-2.0 s apart.
* downstream dispatch. Used for Apple's URL-preview split-send, and for the
* general inbound debounce (`messages.inbound`, off by default) when configured.
*
* The first payload anchors the merged shape (preserving its GUID for reply
* threading). Text is concatenated with deduplication, attachments are merged

View File

@@ -70,6 +70,8 @@ import { advanceIMessageCatchupCursor, resolveCatchupConfig } from "./catchup.js
import {
combineIMessagePayloads,
hasIMessageBalloonMetadata,
hasIMessageUrlBalloonBundleID,
isStandaloneIMessageUrlPreviewPayload,
shouldCombineIMessagePayloadBucket,
} from "./coalesce.js";
import { repairIMessageConversationAnchor } from "./conversation-repair.js";
@@ -113,12 +115,31 @@ const APPROVAL_REACTION_POLL_INTERVAL_MS = 2_000;
const APPROVAL_REACTION_DISCOVERY_INTERVAL_MS = 60_000;
const IMESSAGE_TYPING_KEEPALIVE_INTERVAL_MS = 8_000;
const IMESSAGE_TYPING_KEEPALIVE_MAX_DURATION_MS = 10 * 60_000;
const IMESSAGE_SPLIT_SEND_COMPAT_DEBOUNCE_MS = 7_000;
type IMessageTypingController = Parameters<NonNullable<GetReplyOptions["onTypingController"]>>[0];
function resolveConfiguredIMessageTypingMode(cfg: OpenClawConfig) {
return cfg.session?.typingMode ?? cfg.agents?.defaults?.typingMode;
}
function resolveIMessageSplitSendCompatDebounceMs(
cfg: OpenClawConfig,
coalesceSameSenderDms: boolean,
): number | undefined {
if (!coalesceSameSenderDms) {
return undefined;
}
const inbound = cfg.messages?.inbound;
const channelOverride = inbound?.byChannel?.imessage;
if (typeof channelOverride === "number" && Number.isFinite(channelOverride)) {
return undefined;
}
if (typeof inbound?.debounceMs === "number" && Number.isFinite(inbound.debounceMs)) {
return undefined;
}
return IMESSAGE_SPLIT_SEND_COMPAT_DEBOUNCE_MS;
}
function isIMessagePluginPayloadAttachment(attachment: {
original_path?: string | null;
transfer_name?: string | null;
@@ -457,23 +478,11 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
: recoveryCursorRowid
: recoveryBoundaryRowid;
// When `coalesceSameSenderDms` is enabled and the user has not set an
// explicit inbound debounce for this channel, widen the window to 2500 ms.
// Apple's split-send for `<command> <URL>` arrives ~0.8-2.0 s apart on most
// setups, so the legacy 0 ms default would flush the command alone before
// the URL row reaches the debouncer.
const coalesceSameSenderDms = imessageCfg.coalesceSameSenderDms === true;
const inboundCfg = cfg.messages?.inbound;
const hasExplicitInboundDebounce =
typeof inboundCfg?.debounceMs === "number" ||
typeof inboundCfg?.byChannel?.imessage === "number";
const debounceMsOverride =
coalesceSameSenderDms && !hasExplicitInboundDebounce ? 2500 : undefined;
const debounceMsOverride = resolveIMessageSplitSendCompatDebounceMs(cfg, coalesceSameSenderDms);
// Session capability latch: flips true once any inbound row from this imsg
// build carries balloon metadata. The coalesce flush gate needs a build-level
// (not per-bucket) signal because imsg omits `balloon_bundle_id` for plain
// rows, so a bucket of plain text looks identical on old and new builds.
// signal because imsg omits `balloon_bundle_id` for plain rows.
let imsgEmitsBalloonMetadata = false;
let recoveryCursorHoldBeforeRowid: number | null = null;
let latestAdvancedRecoveryCursorRowid = recoveryCursorRowid ?? -1;
@@ -585,8 +594,8 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
message: IMessagePayload;
// Exact replay-guard key claimed for this row at ingestion (GUID or, for a
// GUID-less row, the composite fallback). Carried through so flush commits
// or releases the same key it claimed, even after coalescing rewrites the
// payload identity. null when the row had no derivable key (fail open).
// or releases the same key it claimed, even after a debounce merge rewrites
// the payload identity. null when the row had no derivable key (fail open).
replayKey: string | null;
}>({
cfg,
@@ -603,10 +612,6 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
? `chat:${msg.chat_id}`
: (msg.chat_guid ?? msg.chat_identifier ?? "unknown");
// With coalesceSameSenderDms enabled, DMs key on chat:sender so Apple's
// split text row and URL-balloon row land in the same bucket. The flush
// path still requires imsg's structural balloon metadata before merging.
// Group chats keep the legacy key to preserve multi-user turn structure.
if (coalesceSameSenderDms && msg.is_group !== true) {
return `imessage:${accountInfo.accountId}:dm:${conversationId}:${sender}`;
}
@@ -623,15 +628,14 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
return false;
}
// Hold opt-in DMs long enough for a following URL-balloon row to arrive.
// The flush gate (shouldCombineIMessagePayloadBucket) decides merge vs.
// separate: it merges precisely on imsg's balloon marker, and falls back
// to a legacy merge only when the build emits no balloon metadata at all.
// Opt-in DM coalescing holds rows long enough for Apple's command+URL
// split-send to arrive. Group chats keep instant per-message dispatch.
if (coalesceSameSenderDms) {
return msg.is_group !== true;
}
// Legacy gate: text-only, no control commands, no media.
// General same-sender inbound debounce: text-only, no control commands,
// no media. Off by default unless messages.inbound is configured.
return shouldDebounceTextInbound({
text: msg.text,
cfg,
@@ -644,7 +648,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
if (entries.length === 0) {
return;
}
// Dispatch one unit (a single row or a coalesced bucket), then commit the
// Dispatch one unit (a single row or a merged bucket), then commit the
// exact replay keys that were claimed at ingestion, or release them if
// dispatch throws so a transient failure can retry on a later re-emit. Per
// unit so a failure in one bucket entry cannot strand another's claim.
@@ -687,13 +691,47 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
}
return;
}
// The bucket-level gate only says this window contains URL-balloon work.
// Standalone URL preview rows merge with the immediately preceding row;
// already-complete URL messages flush any pending ordinary row first.
if (messages.some(hasIMessageUrlBalloonBundleID)) {
let pending: { message: IMessagePayload; replayKey: string | null } | null = null;
for (const entry of entries) {
if (isStandaloneIMessageUrlPreviewPayload(entry.message) && pending) {
const unitEntries = [pending, entry];
await dispatchUnit(
unitEntries,
combineIMessagePayloads(unitEntries.map((e) => e.message)),
);
pending = null;
continue;
}
if (hasIMessageUrlBalloonBundleID(entry.message)) {
if (pending) {
await dispatchUnit([pending], pending.message);
pending = null;
}
await dispatchUnit([entry], entry.message);
continue;
}
if (pending) {
await dispatchUnit([pending], pending.message);
}
pending = entry;
}
if (pending) {
await dispatchUnit([pending], pending.message);
}
return;
}
const combined = combineIMessagePayloads(messages);
if (shouldLogVerbose()) {
const text = combined.text ?? "";
const preview = text.slice(0, 50);
const ellipsis = text.length > 50 ? "..." : "";
logVerbose(`[imessage] coalesced ${entries.length} messages: "${preview}${ellipsis}"`);
logVerbose(
`[imessage] merged ${entries.length} debounced messages: "${preview}${ellipsis}"`,
);
}
await dispatchUnit(entries, combined);
},
@@ -1332,8 +1370,6 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
runtime.error?.(`imessage: dropping malformed RPC message payload (keys=${shape})`);
return;
}
// Latch build capability from any row that carries balloon metadata so the
// coalesce flush gate can trust a missing URL marker on later plain buckets.
if (!imsgEmitsBalloonMetadata && hasIMessageBalloonMetadata(message)) {
imsgEmitsBalloonMetadata = true;
}

View File

@@ -68,28 +68,6 @@ describe("parseIMessageNotification", () => {
expect(parsed?.reacted_to_guid).toBe("target-guid");
});
it("preserves imsg balloon bundle metadata when present", () => {
const parsed = parseIMessageNotification({
message: {
id: 1,
guid: "link-preview-guid",
chat_id: 2,
sender: "+10000000000",
is_from_me: false,
text: "https://example.com/article",
balloon_bundle_id: "com.apple.messages.URLBalloonProvider",
attachments: null,
chat_identifier: null,
chat_guid: null,
chat_name: null,
participants: null,
is_group: false,
},
});
expect(parsed?.balloon_bundle_id).toBe("com.apple.messages.URLBalloonProvider");
});
it("accepts iMessage attachment transfer_name and uti metadata", () => {
const parsed = parseIMessageNotification({
message: {
@@ -122,4 +100,27 @@ describe("parseIMessageNotification", () => {
uti: "com.apple.messages.pluginPayloadAttachment",
});
});
it("preserves imsg balloon bundle metadata when present", () => {
const parsed = parseIMessageNotification({
message: {
id: 1,
guid: "link-preview-guid",
chat_id: 2,
sender: "+10000000000",
destination_caller_id: null,
balloon_bundle_id: "com.apple.messages.URLBalloonProvider",
is_from_me: false,
text: "https://example.com/article",
attachments: null,
chat_identifier: null,
chat_guid: null,
chat_name: null,
participants: null,
is_group: false,
},
});
expect(parsed?.balloon_bundle_id).toBe("com.apple.messages.URLBalloonProvider");
});
});

View File

@@ -116,10 +116,11 @@ export type IMessageAccountConfig = {
/**
* Merge consecutive same-sender DM rows from `chat.db` into a single agent
* turn, so Apple's split-send (`<command> <URL>` arriving as two separate
* rows ~0.8-2.0 s apart) lands as one merged message. DM-only — group chats
* rows several seconds apart) lands as one merged message. DM-only — group chats
* keep instant per-message dispatch. Widens the default inbound debounce
* window to 2500 ms when enabled without an explicit
* `messages.inbound.byChannel.imessage`. Default: `false`.
* window to 7000 ms when enabled without an explicit
* `messages.inbound.byChannel.imessage` or global
* `messages.inbound.debounceMs`. Default: `false`.
*/
coalesceSameSenderDms?: boolean;
groups?: Record<