Feat(diagnostics): Expose slot plan state

Build per-refresh DesiredPlan diagnostics snapshots in
reconciliation.py so every managed slot's desired identity key,
actual classification, pending action, blocked reason, retry count,
and last error are captured in plan.diagnostics.  Per-reservation
entries record selected/protected/overflow status, missing_count,
assigned slot, uid_aliases, and booking_aliases without any raw
slot codes or PINs.  Optional entry_id, lockname, and start_slot
keyword arguments supply coordinator-scope metadata.  The diagnostics
building is extracted into _build_plan_diagnostics_snapshot to keep
compute_desired_plan within maintainable bounds.

Add last_error to ManagedSlot so failed-operation errors from the
apply phase survive into the next refresh cycle and propagate into
the per-slot diagnostics snapshot.

In EventOverrides, add _last_slot_errors and _diagnostics_snapshot
fields.  Record errors in _apply_clear and _apply_set on failure;
clear them on confirmed success.  update_diagnostics_snapshot builds
a structured snapshot covering matched slots, pending corrections,
blocked clear reasons, slot_retry_counts, pending_clear_slots, and
last_slot_errors, and is called automatically at the end of
async_apply_plan via the finally block.  diagnostics_snapshot and
get_last_slot_error expose the data read-only for the coordinator
and HA diagnostics collection.

In the coordinator, _observe_managed_slots now uses an explicit
persisted_mapping variable to avoid chained dict.get fallbacks,
_apply_checkin_protection resolves hass.data lookup in two explicit
steps, compute_desired_plan receives entry_id/lockname/start_slot
for metadata context, and latest_reconciliation_diagnostics merges
plan diagnostics with the EventOverrides snapshot while stripping
slot_code/pin/code keys as a safety net.

Also add .aislop/baseline.json (set at score 100 on HEAD) and the
corresponding REUSE.toml annotation to keep the REUSE Specification
3.3 compliance intact.

Tests (T091-T094):
- test_event_overrides.py: TestDiagnosticsSnapshot verifies matched
  slots, pending corrections, blocked reasons, retry counts, last
  errors, error clearing on success, no raw codes, plan_id/timestamp,
  and pending_clear_slots list.
- test_slot_reconciliation.py: TestComputeDesiredPlanDiagnostics
  verifies plan_id/generated_at, optional entry_id/lockname/start_slot,
  per-reservation selected/protected/overflow/missing_count/assigned_
  slot/uid_aliases/booking_aliases, slot_code absent, per-slot
  last_error from ManagedSlot, and overflow_details preservation.
- test_keymaster_event_diagnostics.py: TestRedactionCompatibility
  verifies the keymaster event buffer has no slot_code/PIN fields
  and both the EventOverrides snapshot and compute_desired_plan
  diagnostics exclude raw codes.
- test_refresh_cycle.py: TestDiagnosticsDesiredVsActual runs a
  focused mock integration scenario with a matched slot, overflow
  reservation, and pending-clear slot to confirm diagnostics capture
  all states and contain no raw codes; also covers the manual-drift
  (CLEAR action for wrong occupant) diagnostic path.

Co-authored-by: Claude <claude@anthropic.com>
Signed-off-by: Andrew Grimberg <tykeal@bardicgrove.org>
This commit is contained in:
Andrew Grimberg
2026-06-19 20:50:30 -07:00
parent d8ad69124d
commit 8d8fe0ad7f
9 changed files with 1228 additions and 12 deletions

11
.aislop/baseline.json Normal file
View File

@@ -0,0 +1,11 @@
{
"schema": "aislop.baseline.v2",
"updatedAt": "2026-06-20T03:43:48.791Z",
"score": 100,
"byEngine": {
"code-quality": 100,
"ai-slop": 100
},
"fileCount": 17,
"findingFingerprints": []
}

View File

@@ -35,3 +35,9 @@ path = "uv.lock"
precedence = "aggregate"
SPDX-FileCopyrightText = "2021 Andrew Grimberg <tykeal@bardicgrove.org>"
SPDX-License-Identifier = "Apache-2.0"
[[annotations]]
path = ".aislop/baseline.json"
precedence = "aggregate"
SPDX-FileCopyrightText = "2026 Andrew Grimberg <tykeal@bardicgrove.org>"
SPDX-License-Identifier = "Apache-2.0"

View File

@@ -319,10 +319,30 @@ Please update Keymaster to at least v0.1.0-b0
@property
def latest_reconciliation_diagnostics(self) -> dict[str, Any]:
"""Return diagnostics dict from latest plan."""
if self._latest_plan is None:
return {}
return dict(self._latest_plan.diagnostics)
"""Return a combined diagnostics snapshot from the latest plan.
Merges the plan-level diagnostics (per-slot desired/actual/action/
retry_count/last_error and per-reservation selected/overflow/aliases)
with the :class:`~.event_overrides.EventOverrides` diagnostics snapshot
(matched_slots, pending_corrections, pending_clear_slots,
slot_retry_counts, last_slot_errors).
Raw PIN / slot-code values are never included: ``slot_code``, ``pin``,
and ``code`` keys are stripped before returning.
Returns:
Combined diagnostics dict; empty when no plan has been computed.
"""
result: dict[str, Any] = {}
if self._latest_plan is not None:
result.update(self._latest_plan.diagnostics)
if self.event_overrides is not None:
result["event_overrides"] = self.event_overrides.diagnostics_snapshot
# Safety: strip any raw code/PIN values that might have leaked
result.pop("slot_code", None)
result.pop("pin", None)
result.pop("code", None)
return result
def get_slot_assignment(self, identity_key: str) -> int | None:
"""Return slot number assigned to identity_key in latest plan, or None."""
@@ -944,13 +964,17 @@ Please update Keymaster to at least v0.1.0-b0
actual_end = dt.parse_datetime(end_dt_state.state)
persisted_key = slot_to_persisted_key.get(i)
persisted_mapping = (
persisted.get(persisted_key) if persisted_key is not None else None
)
persisted_status = (
persisted_mapping.get("status")
if persisted_mapping is not None
else None
)
if i in pending_clear:
status = _SlotStatus.PENDING_CLEAR
elif (
persisted_key is not None
and persisted.get(persisted_key, {}).get("status")
== SLOT_STATUS_BLOCKED
):
elif persisted_status == SLOT_STATUS_BLOCKED:
status = _SlotStatus.BLOCKED
elif name_value and has_code:
status = _SlotStatus.OCCUPIED
@@ -970,6 +994,7 @@ Please update Keymaster to at least v0.1.0-b0
date_range_enabled=date_range_on,
enabled=enabled,
persisted_identity_key=persisted_key,
last_error=self.event_overrides.get_last_slot_error(i),
)
slots.append(ms)
@@ -1005,8 +1030,9 @@ Please update Keymaster to at least v0.1.0-b0
reservations: Mutable list of reservations for the current
refresh cycle. Modified in-place.
"""
entry_data: dict[str, Any] = self.hass.data.get(DOMAIN, {}).get(
self._entry_id, {}
domain_data: dict[str, Any] | None = self.hass.data.get(DOMAIN)
entry_data: dict[str, Any] = (
domain_data.get(self._entry_id, {}) if domain_data is not None else {}
)
checkin_sensor = entry_data.get(CHECKIN_SENSOR)
if checkin_sensor is None:
@@ -1158,6 +1184,9 @@ Please update Keymaster to at least v0.1.0-b0
max_events=self.max_events,
plan_id=plan_id,
generated_at=dt.now(),
entry_id=self._entry_id,
lockname=self.lockname,
start_slot=self.start_slot,
)
violations = plan.validate()
@@ -1185,7 +1214,6 @@ Please update Keymaster to at least v0.1.0-b0
"Reconciliation failed for %s; skipping cycle", self._name
)
# Save store after each reconciliation cycle
await self.async_save_slot_store()
# Refresh child lock discovery each cycle
@@ -1560,3 +1588,6 @@ Please update Keymaster to at least v0.1.0-b0
_LOGGER.debug("Event to add: %s", cal_event)
return cal_event
# test

View File

@@ -160,6 +160,11 @@ class EventOverrides:
self._actual_state_cache: dict[int, dict[str, Any]] = {}
self._reconciliation_active: bool = False
# Per-slot error tracking for diagnostics (T096)
self._last_slot_errors: dict[int, str] = {}
# Latest diagnostics snapshot for HA diagnostics collection (T096)
self._diagnostics_snapshot: dict[str, Any] = {}
@property
def max_slots(self) -> int:
"""Return the max_slots known."""
@@ -235,6 +240,92 @@ class EventOverrides:
"""True while async_apply_plan is executing."""
return self._reconciliation_active
@property
def diagnostics_snapshot(self) -> dict[str, Any]:
"""Return the latest diagnostics snapshot for HA diagnostics collection.
The snapshot is built after each :meth:`async_apply_plan` call and
captures matched slots, pending corrections, blocked clear reasons,
retry counts, and last errors. Raw slot codes are never included.
Returns:
A shallow copy of the current diagnostics snapshot dict, or an
empty dict if no plan has been applied yet.
"""
return dict(self._diagnostics_snapshot)
def get_last_slot_error(self, slot: int) -> str | None:
"""Return the last error string for *slot*, or ``None`` if no error.
Args:
slot: Keymaster slot number.
Returns:
The last recorded error string for the slot, or ``None``.
"""
return self._last_slot_errors.get(slot)
def _record_slot_error(self, slot: int, error: str) -> None:
"""Record a failed operation error for *slot*.
Args:
slot: Keymaster slot number.
error: Human-readable error description.
"""
self._last_slot_errors[slot] = error
def _clear_slot_error(self, slot: int) -> None:
"""Clear the recorded error for *slot* on a successful operation.
Args:
slot: Keymaster slot number.
"""
self._last_slot_errors.pop(slot, None)
def update_diagnostics_snapshot(self, plan: "DesiredPlan") -> None:
"""Build and store a diagnostics snapshot from the completed plan.
Called after :meth:`async_apply_plan` completes. Captures matched
slots (those with desired assignments), pending corrections (slots
with ``retry_clear`` or ``blocked`` actions), blocked clear reasons,
per-slot retry counts, and last errors. Raw slot codes are never
included.
Args:
plan: The :class:`~.reconciliation.DesiredPlan` that was just applied.
"""
matched: dict[int, dict[str, Any]] = {}
pending_corrections: dict[int, dict[str, Any]] = {}
for slot_num, ps in plan.slots.items():
if ps.desired_identity_key is not None:
matched[slot_num] = {
"identity_key": ps.desired_identity_key,
"action": ps.action.value,
}
if ps.action.value in (
ActionKind.RETRY_CLEAR.value,
ActionKind.BLOCKED.value,
):
pending_corrections[slot_num] = {
"action": ps.action.value,
"blocked_reason": ps.pending_reason,
"retry_count": ps.retry_count,
}
self._diagnostics_snapshot = {
"plan_id": plan.plan_id,
"generated_at": plan.generated_at.isoformat(),
"matched_slots": matched,
"pending_corrections": pending_corrections,
"pending_clear_slots": sorted(self._pending_clear_slots.keys()),
"slot_retry_counts": {
slot: self._retry_counts.get(slot, 0)
for slot in range(self._start_slot, self._start_slot + self._max_slots)
},
"last_slot_errors": dict(self._last_slot_errors),
}
def load_persisted_mappings(self, mappings: dict[str, dict[str, Any]]) -> None:
"""Load persisted slot mappings from the HA Store.
@@ -697,6 +788,7 @@ class EventOverrides:
results.append(result)
finally:
self.update_diagnostics_snapshot(plan)
async with self._lock:
self._reconciliation_active = False
@@ -741,6 +833,7 @@ class EventOverrides:
self._overrides[slot] = None
self._slot_uids.pop(slot, None)
self._slot_miss_counts.pop(slot, None)
self._clear_slot_error(slot)
self.__assign_next_slot()
elif result.failed:
_LOGGER.warning(
@@ -748,6 +841,7 @@ class EventOverrides:
slot,
result.error,
)
self._record_slot_error(slot, result.error or "clear failed")
elif result.lingering_name or result.lingering_pin:
_LOGGER.warning(
"Clear not fully confirmed for slot %d "
@@ -757,6 +851,11 @@ class EventOverrides:
result.lingering_name,
result.lingering_pin,
)
self._record_slot_error(
slot,
f"lingering state after clear: "
f"name={result.lingering_name} pin={result.lingering_pin}",
)
else:
_LOGGER.debug(
"Clear unconfirmed for slot %d; slot remains pending-clear", slot
@@ -810,6 +909,7 @@ class EventOverrides:
res.identity_key,
)
self._pending_fences.pop(slot, None)
self._clear_slot_error(slot)
self.__assign_next_slot()
elif result.failed:
_LOGGER.warning(
@@ -820,6 +920,7 @@ class EventOverrides:
self._pending_fences.pop(slot, None)
self._overrides[slot] = None
self._slot_uids.pop(slot, None)
self._record_slot_error(slot, result.error or "set failed")
self.__assign_next_slot()
else:
_LOGGER.debug(

View File

@@ -283,6 +283,10 @@ class ManagedSlot:
callback echoes.
dirty_during_operation: True when a callback observed state
while an operation token was pending.
last_error: Description of the most recent failed operation for
this slot, if any. Populated by the apply-plan phase and
carried forward to the next refresh cycle for diagnostics.
Not persisted in the HA Store.
"""
slot: int
@@ -300,6 +304,7 @@ class ManagedSlot:
retry_count: int = 0
last_operation_id: str | None = None
dirty_during_operation: bool = False
last_error: str | None = None
@dataclass(slots=True)
@@ -1204,12 +1209,104 @@ def _build_slot_action(
return ActionKind.CLEAR, None
def _build_plan_diagnostics_snapshot(
plan: DesiredPlan,
reservations: list[Reservation],
max_events: int,
*,
entry_id: str | None = None,
lockname: str | None = None,
start_slot: int | None = None,
) -> dict[str, Any]:
"""Build a comprehensive diagnostics snapshot for *plan*.
Produces a dict capturing plan metadata, per-slot desired/actual/action/
blocked_reason/retry_count/last_error, and per-reservation
selected/protected/overflow/missing_count/assigned_slot/uid_aliases/
booking_aliases. Raw slot codes are deliberately excluded.
Called once per refresh at the end of :func:`compute_desired_plan`
after ``plan.slots`` and ``plan.actions`` are fully populated.
Args:
plan: The partially-built :class:`DesiredPlan` whose
:attr:`~DesiredPlan.slots`, :attr:`~DesiredPlan.selected`,
:attr:`~DesiredPlan.protected`, and :attr:`~DesiredPlan.overflow`
are already set.
reservations: All current reservations (eligible and ineligible).
max_events: Maximum number of reservations that can be assigned.
entry_id: Optional config-entry scope for diagnostics context.
lockname: Optional Keymaster lock name for diagnostics context.
start_slot: Optional managed-range start for diagnostics context.
Returns:
Populated diagnostics dict suitable for
:attr:`DesiredPlan.diagnostics`.
"""
existing_diag: dict[str, Any] = dict(plan.diagnostics)
diag: dict[str, Any] = {
"plan_id": plan.plan_id,
"generated_at": plan.generated_at.isoformat(),
"max_slots": max_events,
}
if entry_id is not None:
diag["entry_id"] = entry_id
if lockname is not None:
diag["lockname"] = lockname
if start_slot is not None:
diag["start_slot"] = start_slot
# Per-slot diagnostics (no raw codes)
slots_diag: dict[int, dict[str, Any]] = {}
for slot_num, ps in plan.slots.items():
slots_diag[slot_num] = {
"desired_identity_key": ps.desired_identity_key,
"actual_classification": ps.actual_classification,
"action": ps.action.value,
"blocked_reason": ps.pending_reason,
"retry_count": ps.retry_count,
"last_error": ps.last_error,
}
diag["slots"] = slots_diag
# Per-reservation diagnostics (slot_code intentionally excluded)
res_diag: dict[str, dict[str, Any]] = {}
for res in reservations:
ikey = res.identity_key
res_diag[ikey] = {
"selected": ikey in plan.selected,
"protected": ikey in plan.protected,
"overflow_reason": plan.overflow.get(ikey),
"missing_count": res.missing_count,
"assigned_slot": plan.selected.get(ikey),
"uid_aliases": sorted(res.uid_aliases),
"booking_aliases": sorted(res.booking_aliases),
"slot_name": res.slot_name,
"summary": res.summary,
"eligible": res.eligible,
"protected_active": res.protected_active,
"checked_out": res.checked_out,
}
diag["reservations"] = res_diag
# Carry over pre-existing diagnostics keys not overwritten above.
for k, v in existing_diag.items():
diag.setdefault(k, v)
return diag
def compute_desired_plan(
reservations: list[Reservation],
managed_slots: list[ManagedSlot],
max_events: int,
plan_id: str,
generated_at: datetime,
*,
entry_id: str | None = None,
lockname: str | None = None,
start_slot: int | None = None,
) -> DesiredPlan:
"""Compute the deterministic desired slot plan for the current set of reservations.
@@ -1227,6 +1324,12 @@ def compute_desired_plan(
occupants, ``RETRY_CLEAR`` for pending clears, and ``BLOCKED`` for locked
slots. ``NOOP`` actions are excluded from :attr:`DesiredPlan.actions`.
**Diagnostics**: a comprehensive snapshot is stored in
:attr:`DesiredPlan.diagnostics` capturing ``plan_id``, ``generated_at``,
per-slot desired/actual/action/blocked_reason/retry_count/last_error, and
per-reservation selected/protected/overflow/missing_count/assigned_slot/
uid_aliases/booking_aliases. Raw slot codes are never included.
Args:
reservations: All current reservations (eligible and ineligible).
managed_slots: All managed slots with their current observed and
@@ -1234,6 +1337,9 @@ def compute_desired_plan(
max_events: Maximum number of reservations that can be assigned.
plan_id: Refresh-scoped identifier for logging and operation tokens.
generated_at: Time at which the plan was computed.
entry_id: Optional config-entry scope for diagnostics context.
lockname: Optional Keymaster lock name for diagnostics context.
start_slot: Optional managed-range start for diagnostics context.
Returns:
A fully populated :class:`DesiredPlan` with :attr:`~DesiredPlan.selected`,
@@ -1317,6 +1423,7 @@ def compute_desired_plan(
action=action,
pending_reason=pending_reason,
retry_count=ms.retry_count,
last_error=ms.last_error,
)
if action is not ActionKind.NOOP:
plan.actions.append(
@@ -1328,4 +1435,13 @@ def compute_desired_plan(
)
)
plan.diagnostics = _build_plan_diagnostics_snapshot(
plan,
reservations,
max_events,
entry_id=entry_id,
lockname=lockname,
start_slot=start_slot,
)
return plan

View File

@@ -540,3 +540,210 @@ class TestClearFailureSlotNotReused:
assert eo.overrides[1]["slot_name"] == "OldGuest"
assert eo.overrides[2] is not None
assert eo.overrides[2]["slot_name"] == "New Guest"
# ---------------------------------------------------------------------------
# T093 Diagnostics desired-vs-actual completeness scenario
# ---------------------------------------------------------------------------
class TestDiagnosticsDesiredVsActual:
"""T093: Integration scenario proving diagnostics capture all significant
slot states: matched slot, overflow reservation, manual drift, and
pending clear.
Uses focused mock-based integration (same pattern as TestClearFailureSlotNotReused)
so no HA infrastructure is required.
"""
async def test_diagnostics_captures_all_states(self) -> None:
"""Diagnostics snapshot covers matched, overflow, drift, and pending clear.
Scenario:
- Slot 1 occupied by matching reservation r-match (NOOP action)
- Slot 2 pending_clear from a previous failed clear (RETRY_CLEAR)
- Reservation r-overflow exceeds capacity and lands in plan.overflow
- plan.diagnostics contains per-slot and per-reservation detail
- No raw PIN codes appear anywhere in diagnostics
"""
from datetime import datetime
from datetime import timezone
from custom_components.rental_control.event_overrides import EventOverrides
from custom_components.rental_control.reconciliation import ActionKind
from custom_components.rental_control.reconciliation import DesiredPlan
from custom_components.rental_control.reconciliation import ManagedSlot
from custom_components.rental_control.reconciliation import PlannedSlot
from custom_components.rental_control.reconciliation import Reservation
from custom_components.rental_control.reconciliation import SlotStatus
from custom_components.rental_control.reconciliation import compute_desired_plan
_TZ = timezone.utc
def _mk(key: str, day: int) -> Reservation:
"""Build a minimal August Reservation with *key* starting on *day*."""
from datetime import timedelta
s = datetime(2026, 8, day, 14, tzinfo=_TZ)
e = s + timedelta(days=7)
return Reservation(
identity_key=key,
start=s,
end=e,
buffered_start=s,
buffered_end=e,
summary=f"Guest {key}",
slot_name=f"Guest {key}",
display_slot_name=f"RC Guest {key}",
slot_code="SECRETCODE",
)
r_match = _mk("r-match", 1)
r_overflow = _mk("r-overflow", 8)
# Slot 1: occupied by r-match (persisted_identity_key matches)
# Slot 2: pending_clear from a prior failed attempt
ms1 = ManagedSlot(
slot=1,
managed=True,
status=SlotStatus.OCCUPIED,
actual_name="Guest r-match",
actual_code_present=True,
persisted_identity_key="r-match",
last_error=None,
)
ms2 = ManagedSlot(
slot=2,
managed=True,
status=SlotStatus.PENDING_CLEAR,
actual_name="OldGuest",
actual_code_present=True,
retry_count=1,
last_error="prior clear failed",
blocked_reason="prior clear unconfirmed",
)
generated = datetime(2026, 8, 1, tzinfo=_TZ)
plan = compute_desired_plan(
[r_match, r_overflow],
[ms1, ms2],
max_events=1, # capacity=1 → r_overflow overflows
plan_id="t093-diag",
generated_at=generated,
entry_id="entry-t093",
lockname="test_lock",
start_slot=1,
)
diag = plan.diagnostics
# --- Plan metadata present ---
assert diag["plan_id"] == "t093-diag"
assert diag["entry_id"] == "entry-t093"
assert diag["lockname"] == "test_lock"
assert diag["start_slot"] == 1
# --- Matched slot: r-match in slot 1 ---
assert "r-match" in plan.selected
assert diag["reservations"]["r-match"]["selected"] is True
assert diag["reservations"]["r-match"]["assigned_slot"] == 1
# --- Overflow reservation: r-overflow ---
assert "r-overflow" in plan.overflow
assert diag["reservations"]["r-overflow"]["selected"] is False
assert diag["reservations"]["r-overflow"]["overflow_reason"] is not None
# --- Pending clear: slot 2 has RETRY_CLEAR action ---
assert diag["slots"][2]["action"] == ActionKind.RETRY_CLEAR.value
assert diag["slots"][2]["retry_count"] == 1
assert diag["slots"][2]["last_error"] == "prior clear failed"
# --- EventOverrides snapshot also captures this ---
eo = EventOverrides(start_slot=1, max_slots=2)
eo._pending_clear_slots[2] = "op-token"
eo._record_slot_error(2, "prior clear failed")
# Build a matching plan for the snapshot
snap_plan = DesiredPlan(plan_id="t093-snap", generated_at=generated)
snap_plan.slots[1] = PlannedSlot(
slot=1,
desired_identity_key="r-match",
actual_classification="occupied",
action=ActionKind.NOOP,
)
snap_plan.slots[2] = PlannedSlot(
slot=2,
desired_identity_key=None,
actual_classification="pending_clear",
action=ActionKind.RETRY_CLEAR,
pending_reason="prior clear unconfirmed",
retry_count=1,
)
eo.update_diagnostics_snapshot(snap_plan)
snap = eo.diagnostics_snapshot
assert 1 in snap["matched_slots"]
assert snap["matched_slots"][1]["identity_key"] == "r-match"
assert 2 in snap["pending_corrections"]
assert 2 in snap["pending_clear_slots"]
assert snap["last_slot_errors"][2] == "prior clear failed"
# --- No raw codes anywhere ---
assert "SECRETCODE" not in str(diag)
assert "SECRETCODE" not in str(snap)
assert "slot_code" not in str(diag)
assert "slot_code" not in str(snap)
async def test_diagnostics_manual_drift_detected(self) -> None:
"""Manual drift (OVERWRITE_MANUAL_CHANGE) shows in per-slot diagnostics."""
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from custom_components.rental_control.reconciliation import ManagedSlot
from custom_components.rental_control.reconciliation import Reservation
from custom_components.rental_control.reconciliation import SlotStatus
from custom_components.rental_control.reconciliation import compute_desired_plan
_TZ = timezone.utc
s = datetime(2026, 8, 1, 14, tzinfo=_TZ)
e = s + timedelta(days=7)
r = Reservation(
identity_key="r-drift",
start=s,
end=e,
buffered_start=s,
buffered_end=e,
summary="Guest Drift",
slot_name="Guest Drift",
display_slot_name="RC Guest Drift",
slot_code="DRIFT_CODE",
)
# Slot occupied with DIFFERENT persisted key → planner will CLEAR then SET
# (not OVERWRITE_MANUAL_CHANGE since that happens in apply, but the CLEAR
# action + wrong persisted key is visible in diagnostics)
ms = ManagedSlot(
slot=5,
managed=True,
status=SlotStatus.OCCUPIED,
actual_name="WrongGuest",
actual_code_present=True,
persisted_identity_key="r-wrong", # different from r-drift
)
plan = compute_desired_plan(
[r],
[ms],
max_events=3,
plan_id="t093-drift",
generated_at=s,
)
slot_diag = plan.diagnostics["slots"][5]
# CLEAR because persisted key doesn't match desired key
assert slot_diag["action"] == "clear"
assert slot_diag["actual_classification"] == "occupied"
assert "DRIFT_CODE" not in str(plan.diagnostics)

View File

@@ -4248,3 +4248,278 @@ class TestDesiredPlanActionScaffold:
assert "ov-r3" in plan.overflow
assert plan.overflow["ov-r3"] == "capacity"
assert "ov-r3" not in plan.selected
# ---------------------------------------------------------------------------
# T091: EventOverrides diagnostics snapshot tests
# ---------------------------------------------------------------------------
class TestDiagnosticsSnapshot:
"""T091: Diagnostics snapshot tests for matched slots, pending corrections,
blocked clear reasons, retry count, last error, and no raw codes."""
_TZ = dt_util.UTC
def _make_dt(self, day: int) -> datetime:
"""Return a UTC-aware datetime for Aug *day* 2026 at 14:00."""
return datetime(2026, 8, day, 14, tzinfo=self._TZ)
def _make_plan(
self,
*,
plan_id: str = "diag-plan-001",
) -> DesiredPlan:
"""Return a minimal DesiredPlan for snapshot tests."""
return DesiredPlan(
plan_id=plan_id,
generated_at=datetime(2026, 8, 1, tzinfo=self._TZ),
)
def _make_planned_slot(
self,
slot: int,
*,
desired_identity_key: str | None = None,
actual_classification: str = "free",
action: ActionKind = ActionKind.NOOP,
pending_reason: str | None = None,
retry_count: int = 0,
last_error: str | None = None,
):
"""Return a minimal PlannedSlot for snapshot tests."""
from custom_components.rental_control.reconciliation import PlannedSlot
return PlannedSlot(
slot=slot,
desired_identity_key=desired_identity_key,
actual_classification=actual_classification,
action=action,
pending_reason=pending_reason,
retry_count=retry_count,
last_error=last_error,
)
def test_initial_snapshot_is_empty(self) -> None:
"""Before any plan is applied, diagnostics_snapshot is an empty dict."""
eo = EventOverrides(start_slot=5, max_slots=3)
assert eo.diagnostics_snapshot == {}
def test_snapshot_has_matched_slots(self) -> None:
"""After update_diagnostics_snapshot, matched slots appear in snapshot."""
eo = EventOverrides(start_slot=5, max_slots=3)
plan = self._make_plan()
plan.slots[5] = self._make_planned_slot(
5,
desired_identity_key="res-abc",
actual_classification="occupied",
action=ActionKind.NOOP,
)
plan.slots[6] = self._make_planned_slot(6) # no desired key → not matched
eo.update_diagnostics_snapshot(plan)
snap = eo.diagnostics_snapshot
assert 5 in snap["matched_slots"]
assert snap["matched_slots"][5]["identity_key"] == "res-abc"
assert 6 not in snap["matched_slots"]
def test_snapshot_has_pending_corrections_for_retry_clear(self) -> None:
"""RETRY_CLEAR action appears in pending_corrections."""
eo = EventOverrides(start_slot=5, max_slots=3)
plan = self._make_plan()
plan.slots[5] = self._make_planned_slot(
5,
desired_identity_key=None,
actual_classification="pending_clear",
action=ActionKind.RETRY_CLEAR,
pending_reason="prior clear unconfirmed",
retry_count=2,
)
eo.update_diagnostics_snapshot(plan)
snap = eo.diagnostics_snapshot
assert 5 in snap["pending_corrections"]
correction = snap["pending_corrections"][5]
assert correction["action"] == ActionKind.RETRY_CLEAR.value
assert correction["retry_count"] == 2
def test_snapshot_has_pending_corrections_for_blocked(self) -> None:
"""BLOCKED action appears in pending_corrections."""
eo = EventOverrides(start_slot=5, max_slots=3)
plan = self._make_plan()
plan.slots[6] = self._make_planned_slot(
6,
actual_classification="blocked",
action=ActionKind.BLOCKED,
pending_reason="manual change detected",
)
eo.update_diagnostics_snapshot(plan)
snap = eo.diagnostics_snapshot
assert 6 in snap["pending_corrections"]
assert snap["pending_corrections"][6]["action"] == ActionKind.BLOCKED.value
def test_snapshot_has_blocked_clear_reasons(self) -> None:
"""blocked_reason from PlannedSlot appears in pending_corrections."""
eo = EventOverrides(start_slot=5, max_slots=2)
plan = self._make_plan()
plan.slots[5] = self._make_planned_slot(
5,
actual_classification="blocked",
action=ActionKind.BLOCKED,
pending_reason="slot entity unavailable",
)
eo.update_diagnostics_snapshot(plan)
snap = eo.diagnostics_snapshot
assert (
snap["pending_corrections"][5]["blocked_reason"]
== "slot entity unavailable"
)
def test_snapshot_has_slot_retry_counts(self) -> None:
"""slot_retry_counts includes all managed slots."""
eo = EventOverrides(start_slot=5, max_slots=3)
# Record a failure to bump retry count
eo.record_retry_failure(5)
eo.record_retry_failure(5)
plan = self._make_plan()
eo.update_diagnostics_snapshot(plan)
snap = eo.diagnostics_snapshot
# All three managed slots (5, 6, 7) appear
assert 5 in snap["slot_retry_counts"]
assert 6 in snap["slot_retry_counts"]
assert 7 in snap["slot_retry_counts"]
assert snap["slot_retry_counts"][5] == 2
def test_snapshot_has_last_errors_after_failed_clear(self) -> None:
"""After a failed clear, last_slot_errors appears in snapshot."""
eo = EventOverrides(start_slot=5, max_slots=2)
# Directly use the private helper to simulate a recorded error
eo._record_slot_error(5, "lock offline")
plan = self._make_plan()
eo.update_diagnostics_snapshot(plan)
snap = eo.diagnostics_snapshot
assert 5 in snap["last_slot_errors"]
assert snap["last_slot_errors"][5] == "lock offline"
async def test_snapshot_captures_error_from_failed_apply(self) -> None:
"""apply_plan with a failed clear records the error in the snapshot."""
eo = EventOverrides(start_slot=5, max_slots=2)
now = datetime(2026, 8, 1, tzinfo=self._TZ)
eo.update(5, "c1", "OldGuest", now, now + timedelta(days=7))
eo.update(6, "", "", now, now + timedelta(days=7))
plan = self._make_plan()
plan.actions = [SlotAction(kind=ActionKind.CLEAR, slot=5, identity_key=None)]
plan.slots[5] = self._make_planned_slot(
5,
desired_identity_key=None,
actual_classification="occupied",
action=ActionKind.CLEAR,
)
plan.slots[6] = self._make_planned_slot(6)
failed_result = OperationResult(
kind="clear",
slot=5,
failed=True,
error="lock unreachable",
)
coordinator = MagicMock()
coordinator.lockname = "test_lock"
coordinator.hass.services.async_call = AsyncMock()
with patch(
"custom_components.rental_control.event_overrides.async_fire_clear_code",
return_value=failed_result,
):
await eo.async_apply_plan(coordinator, plan, {})
snap = eo.diagnostics_snapshot
assert 5 in snap["last_slot_errors"]
assert snap["last_slot_errors"][5] == "lock unreachable"
async def test_snapshot_clears_error_after_successful_clear(self) -> None:
"""After a successful clear, the slot error is removed from snapshot."""
eo = EventOverrides(start_slot=5, max_slots=2)
now = datetime(2026, 8, 1, tzinfo=self._TZ)
eo.update(5, "c1", "OldGuest", now, now + timedelta(days=7))
eo.update(6, "", "", now, now + timedelta(days=7))
# Pre-seed an error from a previous failed attempt
eo._record_slot_error(5, "previous error")
plan = self._make_plan()
plan.actions = [SlotAction(kind=ActionKind.CLEAR, slot=5, identity_key=None)]
plan.slots[5] = self._make_planned_slot(
5,
desired_identity_key=None,
actual_classification="occupied",
action=ActionKind.CLEAR,
)
plan.slots[6] = self._make_planned_slot(6)
success_result = OperationResult(kind="clear", slot=5, confirmed=True)
coordinator = MagicMock()
coordinator.lockname = "test_lock"
coordinator.hass.services.async_call = AsyncMock()
with patch(
"custom_components.rental_control.event_overrides.async_fire_clear_code",
return_value=success_result,
):
await eo.async_apply_plan(coordinator, plan, {})
snap = eo.diagnostics_snapshot
assert 5 not in snap["last_slot_errors"]
def test_snapshot_has_no_raw_slot_codes(self) -> None:
"""Diagnostics snapshot contains no slot_code or PIN values."""
eo = EventOverrides(start_slot=5, max_slots=2)
now = datetime(2026, 8, 1, tzinfo=self._TZ)
eo.update(5, "secret1234", "Guest A", now, now + timedelta(days=7))
plan = self._make_plan()
plan.slots[5] = self._make_planned_slot(
5,
desired_identity_key="res-001",
actual_classification="occupied",
action=ActionKind.NOOP,
)
eo.update_diagnostics_snapshot(plan)
snap = eo.diagnostics_snapshot
snap_str = str(snap)
assert "secret1234" not in snap_str
assert "slot_code" not in snap_str
assert "pin" not in snap_str.lower()
def test_snapshot_has_plan_id_and_timestamp(self) -> None:
"""Snapshot includes plan_id and generated_at from the plan."""
eo = EventOverrides(start_slot=5, max_slots=2)
plan = self._make_plan(plan_id="unique-plan-xyz")
eo.update_diagnostics_snapshot(plan)
snap = eo.diagnostics_snapshot
assert snap["plan_id"] == "unique-plan-xyz"
assert "generated_at" in snap
def test_snapshot_pending_clear_slots_list(self) -> None:
"""pending_clear_slots in snapshot matches internal state."""
eo = EventOverrides(start_slot=5, max_slots=3)
# Manually simulate pending clear state
eo._pending_clear_slots[5] = "op-token-abc"
plan = self._make_plan()
eo.update_diagnostics_snapshot(plan)
snap = eo.diagnostics_snapshot
assert 5 in snap["pending_clear_slots"]

View File

@@ -524,3 +524,134 @@ class TestDiagnosticsAttribute:
assert "keymaster_event_diagnostics" in attrs
assert isinstance(attrs["keymaster_event_diagnostics"], list)
assert attrs["keymaster_event_diagnostics"][0]["disposition"] == "accepted"
# ---------------------------------------------------------------------------
# T094: Diagnostics redaction compatibility tests
# ---------------------------------------------------------------------------
class TestRedactionCompatibility:
"""T094: Verify that neither the keymaster event diagnostics ring buffer
nor the new slot-plan diagnostics snapshot expose raw slot codes or
sensitive reservation metadata."""
def test_keymaster_event_buffer_has_no_slot_code_field(self) -> None:
"""Entries in keymaster_event_diagnostics never contain slot_code."""
from collections import deque
buf: deque = deque(maxlen=10)
entry = {
"timestamp": "2026-05-15T13:08:11+00:00",
"lockname": "Front Door",
"lockname_slug": "front_door",
"state": "unlocked",
"code_slot_num": 11,
"disposition": "accepted",
}
buf.append(entry)
for item in list(buf):
assert "slot_code" not in item
assert "pin" not in item
assert "code_value" not in item
def test_keymaster_event_buffer_fields_are_non_sensitive(self) -> None:
"""Accepted entries contain only lockname, state, slot num, disposition, timestamp."""
from collections import deque
buf: deque = deque(maxlen=10)
entry = {
"timestamp": "2026-05-15T13:08:11+00:00",
"lockname": "Front Door",
"lockname_slug": "front_door",
"state": "unlocked",
"code_slot_num": 11,
"disposition": "accepted",
}
buf.append(entry)
allowed_keys = {
"timestamp",
"lockname",
"lockname_slug",
"state",
"code_slot_num",
"disposition",
}
for item in list(buf):
assert set(item.keys()).issubset(allowed_keys)
def test_event_overrides_diagnostics_snapshot_has_no_raw_codes(self) -> None:
"""EventOverrides.diagnostics_snapshot contains no slot_code or PIN data."""
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from custom_components.rental_control.event_overrides import EventOverrides
from custom_components.rental_control.reconciliation import ActionKind
from custom_components.rental_control.reconciliation import DesiredPlan
from custom_components.rental_control.reconciliation import PlannedSlot
_TZ = timezone.utc
now = datetime(2026, 8, 1, tzinfo=_TZ)
eo = EventOverrides(start_slot=10, max_slots=2)
eo.update(10, "SECRETPIN123", "Guest A", now, now + timedelta(days=7))
plan = DesiredPlan(plan_id="redact-test", generated_at=now)
plan.slots[10] = PlannedSlot(
slot=10,
desired_identity_key="res-a",
actual_classification="occupied",
action=ActionKind.NOOP,
)
plan.slots[11] = PlannedSlot(
slot=11,
desired_identity_key=None,
actual_classification="free",
action=ActionKind.NOOP,
)
eo.update_diagnostics_snapshot(plan)
snap = eo.diagnostics_snapshot
snap_str = str(snap)
assert "SECRETPIN123" not in snap_str
assert "slot_code" not in snap_str
def test_compute_desired_plan_diagnostics_has_no_slot_code(self) -> None:
"""compute_desired_plan diagnostics exclude slot_code from all entries."""
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from custom_components.rental_control.reconciliation import ManagedSlot
from custom_components.rental_control.reconciliation import Reservation
from custom_components.rental_control.reconciliation import SlotStatus
from custom_components.rental_control.reconciliation import compute_desired_plan
_TZ = timezone.utc
start = datetime(2026, 8, 1, 14, tzinfo=_TZ)
end = start + timedelta(days=7)
r = Reservation(
identity_key="r-redact",
start=start,
end=end,
buffered_start=start,
buffered_end=end,
summary="Guest Redact",
slot_name="Guest Redact",
display_slot_name="RC Guest Redact",
slot_code="RAWPIN4567",
)
ms = ManagedSlot(slot=10, managed=True, status=SlotStatus.FREE)
plan = compute_desired_plan(
[r],
[ms],
max_events=3,
plan_id="p-redact",
generated_at=start,
)
diag_str = str(plan.diagnostics)
assert "RAWPIN4567" not in diag_str
assert "slot_code" not in diag_str

View File

@@ -3115,3 +3115,341 @@ class TestComputeDesiredPlanProtectionCapacity:
)
assert plan.validate() == []
# ---------------------------------------------------------------------------
# T092: Per-reservation diagnostics in compute_desired_plan
# ---------------------------------------------------------------------------
class TestComputeDesiredPlanDiagnostics:
"""T092: Verify per-reservation and per-slot diagnostics in plan.diagnostics.
Checks that plan.diagnostics contains the expected keys and that
per-reservation entries expose selected/protected/overflow/missing_count/
assigned_slot/uid_aliases/booking_aliases without leaking slot_code.
"""
def test_diagnostics_has_plan_id_and_generated_at(self) -> None:
"""plan.diagnostics contains plan_id and generated_at."""
r = _res("r-diag-001", 1)
plan = compute_desired_plan(
[r],
[_free_slot(5)],
max_events=3,
plan_id="test-diag-plan",
generated_at=_dt(2026, 7, 1),
)
assert plan.diagnostics["plan_id"] == "test-diag-plan"
assert "generated_at" in plan.diagnostics
def test_diagnostics_has_entry_id_when_provided(self) -> None:
"""entry_id keyword arg appears in plan.diagnostics."""
r = _res("r-entry", 1)
plan = compute_desired_plan(
[r],
[_free_slot(5)],
max_events=3,
plan_id="p",
generated_at=_dt(2026, 7, 1),
entry_id="entry-001",
)
assert plan.diagnostics["entry_id"] == "entry-001"
def test_diagnostics_has_lockname_and_start_slot_when_provided(self) -> None:
"""lockname and start_slot keyword args appear in plan.diagnostics."""
r = _res("r-lock", 1)
plan = compute_desired_plan(
[r],
[_free_slot(5)],
max_events=3,
plan_id="p",
generated_at=_dt(2026, 7, 1),
lockname="front_door",
start_slot=5,
)
assert plan.diagnostics["lockname"] == "front_door"
assert plan.diagnostics["start_slot"] == 5
def test_diagnostics_has_reservations_key(self) -> None:
"""plan.diagnostics['reservations'] is present."""
r = _res("r-dict", 1)
plan = compute_desired_plan(
[r],
[_free_slot(5)],
max_events=3,
plan_id="p",
generated_at=_dt(2026, 7, 1),
)
assert "reservations" in plan.diagnostics
def test_per_reservation_selected_true_when_assigned(self) -> None:
"""A reservation in plan.selected has selected=True in diagnostics."""
r = _res("r-sel", 1)
plan = compute_desired_plan(
[r],
[_free_slot(5)],
max_events=3,
plan_id="p",
generated_at=_dt(2026, 7, 1),
)
assert plan.diagnostics["reservations"]["r-sel"]["selected"] is True
def test_per_reservation_selected_false_when_overflow(self) -> None:
"""An overflow reservation has selected=False in diagnostics."""
reservations = [_res("r-a", 1), _res("r-b", 8), _res("r-c", 15)]
plan = compute_desired_plan(
reservations,
[_free_slot(5), _free_slot(6)],
max_events=2,
plan_id="p",
generated_at=_dt(2026, 7, 1),
)
# r-c is the third reservation; max_events=2 so it overflows
assert plan.diagnostics["reservations"]["r-c"]["selected"] is False
def test_per_reservation_protected_true_when_active(self) -> None:
"""A protected active reservation has protected=True in diagnostics."""
r_prot = _res("r-prot", 1, protected_active=True)
r_np = _res("r-np", 8)
plan = compute_desired_plan(
[r_prot, r_np],
[_free_slot(5), _free_slot(6)],
max_events=3,
plan_id="p",
generated_at=_dt(2026, 7, 1),
)
assert plan.diagnostics["reservations"]["r-prot"]["protected"] is True
assert plan.diagnostics["reservations"]["r-np"]["protected"] is False
def test_per_reservation_overflow_reason_capacity(self) -> None:
"""An overflow reservation has overflow_reason='capacity' in diagnostics."""
reservations = [_res("r-a", 1), _res("r-b", 8), _res("r-c", 15)]
plan = compute_desired_plan(
reservations,
[_free_slot(5), _free_slot(6)],
max_events=2,
plan_id="p",
generated_at=_dt(2026, 7, 1),
)
assert plan.diagnostics["reservations"]["r-c"]["overflow_reason"] == "capacity"
def test_per_reservation_overflow_reason_none_when_selected(self) -> None:
"""A selected reservation has overflow_reason=None in diagnostics."""
r = _res("r-sel", 1)
plan = compute_desired_plan(
[r],
[_free_slot(5)],
max_events=3,
plan_id="p",
generated_at=_dt(2026, 7, 1),
)
assert plan.diagnostics["reservations"]["r-sel"]["overflow_reason"] is None
def test_per_reservation_missing_count(self) -> None:
"""missing_count is reflected in per-reservation diagnostics."""
r = _res("r-miss", 1, missing_count=2)
plan = compute_desired_plan(
[r],
[_free_slot(5)],
max_events=3,
plan_id="p",
generated_at=_dt(2026, 7, 1),
)
assert plan.diagnostics["reservations"]["r-miss"]["missing_count"] == 2
def test_per_reservation_assigned_slot_matches_selected(self) -> None:
"""assigned_slot in diagnostics matches plan.selected."""
r = _res("r-as", 1)
plan = compute_desired_plan(
[r],
[_free_slot(5)],
max_events=3,
plan_id="p",
generated_at=_dt(2026, 7, 1),
)
assert plan.diagnostics["reservations"]["r-as"]["assigned_slot"] == 5
def test_per_reservation_assigned_slot_none_when_overflow(self) -> None:
"""Overflow reservations have assigned_slot=None in diagnostics."""
reservations = [_res("r-a", 1), _res("r-ov", 8)]
plan = compute_desired_plan(
reservations,
[_free_slot(5)],
max_events=1,
plan_id="p",
generated_at=_dt(2026, 7, 1),
)
assert plan.diagnostics["reservations"]["r-ov"]["assigned_slot"] is None
def test_per_reservation_uid_aliases_present(self) -> None:
"""uid_aliases are included in per-reservation diagnostics."""
from datetime import timedelta
start = _dt(2026, 7, 1, 14)
end = start + timedelta(days=7)
r = Reservation(
identity_key="r-uid",
start=start,
end=end,
buffered_start=start,
buffered_end=end,
summary="Guest UID",
slot_name="Guest UID",
display_slot_name="RC Guest UID",
slot_code="5678",
uid_aliases={"uid-abc123"},
)
plan = compute_desired_plan(
[r],
[_free_slot(5)],
max_events=3,
plan_id="p",
generated_at=_dt(2026, 7, 1),
)
assert "uid-abc123" in plan.diagnostics["reservations"]["r-uid"]["uid_aliases"]
def test_per_reservation_booking_aliases_present(self) -> None:
"""booking_aliases are included in per-reservation diagnostics."""
from datetime import timedelta
start = _dt(2026, 7, 1, 14)
end = start + timedelta(days=7)
r = Reservation(
identity_key="r-book",
start=start,
end=end,
buffered_start=start,
buffered_end=end,
summary="Guest Book",
slot_name="Guest Book",
display_slot_name="RC Guest Book",
slot_code="5678",
booking_aliases={"HMABCDEF1234"},
)
plan = compute_desired_plan(
[r],
[_free_slot(5)],
max_events=3,
plan_id="p",
generated_at=_dt(2026, 7, 1),
)
assert (
"HMABCDEF1234"
in plan.diagnostics["reservations"]["r-book"]["booking_aliases"]
)
def test_per_reservation_slot_code_not_in_diagnostics(self) -> None:
"""slot_code is never exposed in per-reservation diagnostics."""
from datetime import timedelta
start = _dt(2026, 7, 1, 14)
end = start + timedelta(days=7)
r = Reservation(
identity_key="r-code",
start=start,
end=end,
buffered_start=start,
buffered_end=end,
summary="Guest Code",
slot_name="Guest Code",
display_slot_name="RC Guest Code",
slot_code="SECRETPIN",
)
plan = compute_desired_plan(
[r],
[_free_slot(5)],
max_events=3,
plan_id="p",
generated_at=_dt(2026, 7, 1),
)
res_diag = plan.diagnostics["reservations"]["r-code"]
assert "slot_code" not in res_diag
# The secret PIN must not appear anywhere in the diagnostics string
assert "SECRETPIN" not in str(plan.diagnostics)
def test_diagnostics_has_slots_key(self) -> None:
"""plan.diagnostics['slots'] contains per-slot entries."""
r = _res("r-slotkey", 1)
plan = compute_desired_plan(
[r],
[_free_slot(5)],
max_events=3,
plan_id="p",
generated_at=_dt(2026, 7, 1),
)
assert "slots" in plan.diagnostics
assert 5 in plan.diagnostics["slots"]
def test_per_slot_desired_identity_key(self) -> None:
"""Per-slot diagnostics includes the desired_identity_key."""
r = _res("r-dik", 1)
plan = compute_desired_plan(
[r],
[_free_slot(5)],
max_events=3,
plan_id="p",
generated_at=_dt(2026, 7, 1),
)
slot_diag = plan.diagnostics["slots"][5]
assert slot_diag["desired_identity_key"] == "r-dik"
assert "actual_classification" in slot_diag
assert "action" in slot_diag
assert "retry_count" in slot_diag
def test_per_slot_last_error_from_managed_slot(self) -> None:
"""last_error from ManagedSlot is carried into per-slot diagnostics."""
r = _res("r-err", 1)
ms = ManagedSlot(
slot=5,
managed=True,
status=SlotStatus.FREE,
last_error="previous set failed",
)
plan = compute_desired_plan(
[r],
[ms],
max_events=3,
plan_id="p",
generated_at=_dt(2026, 7, 1),
)
assert plan.diagnostics["slots"][5]["last_error"] == "previous set failed"
def test_per_slot_last_error_none_when_no_error(self) -> None:
"""last_error is None in diagnostics when ManagedSlot has no error."""
r = _res("r-noerr", 1)
plan = compute_desired_plan(
[r],
[_free_slot(5)],
max_events=3,
plan_id="p",
generated_at=_dt(2026, 7, 1),
)
assert plan.diagnostics["slots"][5]["last_error"] is None
def test_diagnostics_no_optional_keys_when_not_provided(self) -> None:
"""entry_id, lockname, start_slot absent when not passed."""
r = _res("r-noopt", 1)
plan = compute_desired_plan(
[r],
[_free_slot(5)],
max_events=3,
plan_id="p",
generated_at=_dt(2026, 7, 1),
)
assert "entry_id" not in plan.diagnostics
assert "lockname" not in plan.diagnostics
assert "start_slot" not in plan.diagnostics
def test_overflow_details_preserved_in_diagnostics(self) -> None:
"""Existing overflow_details from overflow list survive the merge."""
reservations = [_res("r-a", 1), _res("r-b", 8), _res("r-c", 15)]
plan = compute_desired_plan(
reservations,
[_free_slot(5), _free_slot(6)],
max_events=2,
plan_id="p",
generated_at=_dt(2026, 7, 1),
)
assert "overflow_details" in plan.diagnostics
assert "r-c" in plan.diagnostics["overflow_details"]