#!/usr/bin/env python3
"""
AgentsNet push IPC hook for Hermes Agent (v0.1.6 S10 Option B true-instant).

Assumes HG-1 GREEN per plan Rev 3: Hermes exposes its send-message tool
as a direct Python import. The import path actually used by this file is
`tools.send_message_tool.send_message_tool`; the plan's original
`hermes_agent.tools.send_message` path raises ImportError. If HG-1
returns RED, this file must be replaced with the MCP-client-loop shape
per plan §4.4.1 fallback (approx. +100 LoC Python + MCP client boilerplate).

Responsibilities:
  1. Subscribe to daemon SSE at GET /v1/push/subscribe over UDS with bearer
     token from ~/.agentsnet/ipc-token.
  2. Auto-reconnect with exponential backoff (1s → 30s max) on connection
     drop.
  3. For each PushEvent::NewInboxMessage: resolve target platform + chat_id
     from the event's contact_alias by querying daemon's push_routes table
     via GET /v1/push/routes, then call Hermes' send_message(target=
     "<platform>:<chat_id>", text=<preview>).
  4. On observed inbound WhatsApp message to bot: POST
     /v1/push/whatsapp_window_ping {chat_id, observed_at} back to daemon
     so it can reset push_routes.whatsapp_window_expires_at (M4).
  5. Per-platform rate-limit queues: Telegram 1 msg/s per chat, Discord
     5/5s per DM, WhatsApp 1/s.

IMPORTANT — AP2 contract (plan AP-v016-4):
  This hook calls ONLY Hermes' public `send_message` tool. It does NOT
  patch Hermes source, read Hermes internal state beyond the public tool
  API, or read Hermes FTS5 session search (which would violate AP5).

  If Hermes renames or removes `send_message`, this hook surfaces a clear
  error via `agentsnet_push_status.last_error` and auto-falls-back to
  cron_30s on affected routes.

Security — AP-v016-8:
  Delivery payload carries ONLY preview metadata (≤64 char AP5-truncated
  summary OR generic non-text label). Full message body stays in
  agentsnet_message_inbox; user fetches explicitly via chatbox.

v0.1.6 skeleton. S11 evidence harness + operator's live Round H SLO run
fill in the remaining wiring. Lines marked `# TODO(v0.1.6-final)` are
deliberate stubs that make the structure reviewable without requiring
the full Axis D agentd-side HTTP wiring (deferred to operator's next
session per S8 handoff).
"""

import http.client
import json
import logging
import os
import re
import socket
import sys
import time
from pathlib import Path

# v0.1.9 P0-4 closure: prepend ~/.hermes/hermes-agent to sys.path so that
# `from tools.send_message_tool import send_message_tool` succeeds when
# the hook is run by launchd / systemd-user (neither sets cwd to hermes-agent).
# Before this, the hook would ImportError at startup under launchd even
# though a direct interactive launch from inside hermes-agent worked.
#
# The entry is inserted at index 0, so it is searched before every other
# sys.path entry. Nothing is inserted when the directory does not exist or
# when the path is already present (no duplicate entries).
_HERMES_AGENT_ROOT = os.path.expanduser("~/.hermes/hermes-agent")
if os.path.isdir(_HERMES_AGENT_ROOT) and _HERMES_AGENT_ROOT not in sys.path:
    sys.path.insert(0, _HERMES_AGENT_ROOT)

# Load ~/.hermes/.env BEFORE importing send_message_tool — Hermes platform
# credentials (TELEGRAM_BOT_TOKEN, DISCORD_BOT_TOKEN, etc.) live there, and
# send_message_tool reads them via os.getenv at call time.
#
# Pass 2 empirical verification (UTM VM, 2026-04-21): without explicit
# load_dotenv, send_message_tool returns
#   {"error": "Platform 'telegram' is not configured ..."}
# because python-dotenv hasn't auto-loaded the .env file. The fully-
# initialized `hermes` CLI runtime does this at startup; standalone
# subprocesses do not.
try:
    from dotenv import load_dotenv as _load_dotenv
    _load_dotenv(os.path.expanduser("~/.hermes/.env"))
except ImportError:  # pragma: no cover
    # python-dotenv missing → platforms may not have credentials loaded.
    # NOTE: no warning is emitted here — this branch only passes, so the
    # condition is silent at import time. It becomes visible when an
    # individual send attempt returns send_message_tool's own
    # "Platform X is not configured" error.
    pass

# AP2 public-API only. HG-1 GATE (2026-04-21) verdict: Hermes v0.10.0 exposes
# the send-message tool at `tools.send_message_tool.send_message_tool(args)`,
# a synchronous function that takes a dict of {action, platform, target, text}
# matching SEND_MESSAGE_SCHEMA. This call shape runs standalone in a python3
# subprocess without Hermes runtime state (verified via dry-run
# `{"action": "list"}` returning real platform targets).
#
# Two deployment shapes are supported:
#
#   1. Native Hermes (e.g. Carol / UTM macOS VM) — Hermes venv on-host,
#      `from tools.send_message_tool import send_message_tool` resolves
#      when we prepend `~/.hermes/hermes-agent` to sys.path. Default path.
#
#   2. Docker-Hermes (e.g. Bob / Mac mini `nousresearch/hermes-agent:latest`
#      in `hermes-discord-gateway` container) — tools live inside the
#      container at /opt/hermes. Host has no Python venv, so we shell out
#      via `docker exec <container> python3 -c "..."`. Selected when env
#      var `AGENTSNET_HOOK_DOCKER_CONTAINER` is set.
#
# The Docker path was added after v0.1.9 P0-4 closure session discovered
# Bob's Hermes runs in Docker, not natively.
# Docker-Hermes routing knobs, each overridable through the environment.
# A non-empty container name is what routes `send_message` onto the
# `docker exec` path; whitespace-only values count as unset.
_DOCKER_CONTAINER = os.environ.get("AGENTSNET_HOOK_DOCKER_CONTAINER", "").strip()
# Default docker binary: the /usr/local/bin install when that file exists,
# otherwise the bare command name `docker`.
_DEFAULT_DOCKER_BIN = (
    "/usr/local/bin/docker" if os.path.exists("/usr/local/bin/docker") else "docker"
)
_DOCKER_BIN = os.environ.get("AGENTSNET_HOOK_DOCKER_BIN", _DEFAULT_DOCKER_BIN)
# Docker-Hermes images ship Hermes code under /opt/hermes with a dedicated
# venv at /opt/hermes/.venv/bin/python3. The system /usr/bin/python3 lacks
# Hermes's deps (yaml, discord.py, etc.) — using it fails at import time.
_DOCKER_PYTHON = os.environ.get(
    "AGENTSNET_HOOK_DOCKER_PYTHON",
    "/opt/hermes/.venv/bin/python3",
)
# Working directory passed to `docker exec -w`.
_DOCKER_CWD = os.environ.get("AGENTSNET_HOOK_DOCKER_CWD", "/opt/hermes")

# The name `send_message` was the plan's wrong assumption. Earlier drafts of
# this file used `from hermes_agent.tools import send_message` which
# ImportErrors on every Hermes install.
#
# Outcome of this guarded import, as module-level state:
#   - success → `_send_message_tool` is the Hermes callable and
#               `_HERMES_IMPORT_ERROR` is None
#   - failure → `_send_message_tool` is None and `_HERMES_IMPORT_ERROR`
#               holds repr() of the ImportError for later reporting
try:
    from tools.send_message_tool import send_message_tool as _send_message_tool  # type: ignore
except ImportError as exc:  # pragma: no cover
    # Defensive fallback: Hermes installs where the import path differs
    # (future v0.11+?) surface the error clearly so install-time test-ping
    # fails loud instead of silently degrading. MCP-client-loop rewrite per
    # plan §4.4.1 is the escape hatch if this branch fires repeatedly.
    # The import is not fatal here: the module keeps loading and the
    # failure is only recorded in the two names below.
    _send_message_tool = None
    _HERMES_IMPORT_ERROR = repr(exc)
else:
    _HERMES_IMPORT_ERROR = None


def _json_dump_safe(obj, max_chars: int = 256) -> "str | None":
    """Serialize a send_message_tool result as bounded, always-parseable JSON.

    v0.1.14 RC fix F2 — used for install-proof-status.json's
    `delivery_evidence` field. The earlier `str(obj)[:256]` emitted Python
    repr() (single-quote keys, bare True/False), which downstream consumers
    could not decode as JSON.

    Behaviour:
      - Falsy input (None, {}, "", 0, False) → None, so the field stays
        `null` in the parent JSON instead of `"None"`.
      - Otherwise `json.dumps(obj, default=str, ensure_ascii=False)`;
        `default=str` stringifies values json cannot encode (datetime,
        Path, exceptions). If that still raises, the whole object is
        encoded as the JSON string of `str(obj)`.
      - If the encoded text fits in `max_chars` it is returned unchanged.
      - If it is longer, a raw slice would cut the document mid-token and
        yield invalid JSON. Instead the leading part of the encoded text is
        re-wrapped as a JSON *string literal* ending in "…", sized so the
        final result never exceeds `max_chars` and still parses.
      - `max_chars` below 3 cannot hold even `"…"`; in that degenerate case
        the raw slice is returned as before.

    Args:
        obj: the value to serialize (normally a dict returned by Hermes).
        max_chars: upper bound on the length of the returned string.

    Returns:
        The JSON text, or None for falsy input.
    """
    if not obj:
        return None
    try:
        encoded = json.dumps(obj, default=str, ensure_ascii=False)
    except (TypeError, ValueError):
        # Last-resort fallback: str() of the object as a JSON string.
        encoded = json.dumps(str(obj))
    if len(encoded) <= max_chars:
        return encoded
    marker = "…"
    # Two quotes plus the marker is the smallest possible wrapped preview.
    budget = max_chars - 2 - len(marker)
    if budget < 0:
        return encoded[:max_chars]
    wrapped = json.dumps(encoded[:budget] + marker, ensure_ascii=False)
    overshoot = len(wrapped) - max_chars
    if overshoot > 0:
        # Escaping `"` and `\` costs one extra char each. Dropping
        # `overshoot` source chars removes at least that many output chars,
        # so a single correction is enough to get back under the cap.
        budget = max(budget - overshoot, 0)
        wrapped = json.dumps(encoded[:budget] + marker, ensure_ascii=False)
    return wrapped


def _send_via_docker(target: str, text: str, container: str) -> dict:
    """Deliver one message by running send_message_tool inside a container.

    Runs `<docker> exec -i -w <cwd> <container> <python> -c <script>` and
    feeds the request as JSON on stdin. The inline script loads `.env`
    from `$HERMES_HOME` (default `/opt/data`) before importing the tool,
    because `docker exec` only inherits the container's process env and
    send_message_tool reads platform credentials via os.getenv.

    Returns:
        The JSON-decoded last stdout line on success, otherwise a dict with
        an `error` key (plus `stderr` for a non-zero exit, or `raw` when the
        last line is not JSON). Timeout is 30 seconds.
    """
    import subprocess

    # One source line per element; joined with newlines (and a trailing
    # newline) this is the program handed to `python -c`.
    inline_lines = [
        "import json, os, sys",
        "try:",
        "    from dotenv import load_dotenv",
        "    load_dotenv(os.path.join(os.environ.get('HERMES_HOME', '/opt/data'), '.env'))",
        "except Exception:",
        "    pass",
        "from tools.send_message_tool import send_message_tool",
        "args = json.loads(sys.stdin.read())",
        "result = send_message_tool(args)",
        "print(json.dumps(result))",
    ]
    script = "\n".join(inline_lines) + "\n"
    request = json.dumps({
        "action": "send",
        "target": target,
        "message": text,
    })
    try:
        completed = subprocess.run(
            [
                _DOCKER_BIN, "exec", "-i", "-w", _DOCKER_CWD, container,
                _DOCKER_PYTHON, "-c", script,
            ],
            input=request,
            capture_output=True,
            text=True,
            timeout=30,
        )
    except subprocess.TimeoutExpired:
        return {"error": f"docker exec timeout after 30s (container={container})"}
    except FileNotFoundError as exc:
        return {"error": f"docker binary not found: {exc}"}

    if completed.returncode != 0:
        # Keep only the tail of stderr so the error dict stays small.
        stderr_tail = completed.stderr[-400:] if completed.stderr else ""
        return {
            "error": f"docker exec rc={completed.returncode}",
            "stderr": stderr_tail,
        }

    stdout_lines = completed.stdout.strip().splitlines()
    if not stdout_lines:
        return {"error": "docker exec produced no output"}
    # Only the final line is treated as the result; anything printed
    # earlier is ignored.
    final_line = stdout_lines[-1]
    try:
        return json.loads(final_line)
    except json.JSONDecodeError:
        return {"error": "non-json result", "raw": final_line[:400]}


def send_message(target: str, text: str) -> dict:
    """Send `text` to `target` through Hermes's send-message tool.

    The request handed to Hermes is always
    `{"action": "send", "target": <target>, "message": <text>}`:
      - `target` is the fully-qualified `<platform>:<chat_id>` string; no
        separate `platform` key is sent.
      - the body travels under `message` (NOT `text`).

    Routing:
      - `AGENTSNET_HOOK_DOCKER_CONTAINER` set → delegate to
        `_send_via_docker` (Docker-Hermes exec path).
      - unset → call the in-process `send_message_tool` (default).

    Returns:
        Whatever the selected path returns.

    Raises:
        RuntimeError: native path selected but the Hermes import failed at
            module load; the message carries the recorded import error.
    """
    if _DOCKER_CONTAINER:
        return _send_via_docker(target, text, _DOCKER_CONTAINER)

    tool = _send_message_tool
    if tool is None:
        raise RuntimeError(
            f"send_message_tool unavailable — import failed: {_HERMES_IMPORT_ERROR}"
        )

    request = {"action": "send", "target": target, "message": text}
    return tool(request)


LOG = logging.getLogger("agentsnet.push")

# Daemon IPC endpoints live under ~/.agentsnet: a Unix-domain socket plus
# the bearer token file used to authenticate against it.
_AGENTSNET_HOME = Path.home() / ".agentsnet"
IPC_SOCK = _AGENTSNET_HOME / "ipc.sock"
IPC_TOKEN = _AGENTSNET_HOME / "ipc-token"

# Daemon HTTP paths.
SSE_PATH = "/v1/push/subscribe"
REVERSE_PATH = "/v1/push/whatsapp_window_ping"
# v0.1.9 P0-4 closure: daemon returns active push_routes here so the
# hook knows where to forward each NewInboxMessage.
ROUTES_PATH = "/v1/push/routes"
# v0.1.12 PR3 G4 (Bucket 2.6): proof-code capture endpoint.
AUTO_REGISTER_PATH = "/v1/push/auto_register"
# Proof codes look like `AN-PUSH-` followed by six uppercase hex digits.
PROOF_CODE_RE = re.compile(r"AN-PUSH-[A-F0-9]{6}")

# v0.1.9 P0-4 closure (P8 wire): daemon-side code parses this prefix
# from User-Agent to populate push_routes.hermes_version_at_register.
# Bump the version string when hook.py wire protocol changes.
HOOK_VERSION = "v0.1.12-p7p8"

# Reconnect delay schedule in seconds; the last entry is the ceiling.
RECONNECT_BACKOFF_SECONDS = [1, 2, 5, 10, 30]


def load_token() -> str:
    """Read the IPC bearer token from `IPC_TOKEN`.

    Returns:
        The file content with surrounding whitespace stripped, or an empty
        string when the file is missing, unreadable, or not valid UTF-8.
        Callers treat the empty string as "no token available".
    """
    try:
        # Decode explicitly as UTF-8 so the result does not depend on the
        # locale of whichever service manager launched the hook.
        return IPC_TOKEN.read_text(encoding="utf-8").strip()
    except (OSError, UnicodeDecodeError):
        # UnicodeDecodeError is a ValueError, not an OSError: without it a
        # corrupt token file would raise instead of yielding "".
        return ""


def unix_http_connect(sock_path: Path) -> http.client.HTTPConnection:
    """Build an `http.client` connection that talks over a Unix socket.

    Returns an unconnected `HTTPConnection` subclass instance (host name
    `localhost`) whose `connect()` opens an AF_UNIX stream socket to
    `sock_path` instead of a TCP connection. No timeout is applied to the
    socket.
    """

    class UnixHTTPConnection(http.client.HTTPConnection):
        """HTTPConnection whose transport is an AF_UNIX stream socket."""

        def connect(self):
            # Publish the socket on `self.sock` before connecting, so a
            # failed connect still leaves it reachable through close().
            uds = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            self.sock = uds
            uds.connect(str(sock_path))

    connection = UnixHTTPConnection("localhost")
    return connection


# ── v0.1.12 PR3 G4 (Bucket 2.6) — proof-code capture path ─────────

def post_auto_register(token: str, proof_code: str, platform: str,
                       chat_id: str) -> dict:
    """POST {proof_code, host: 'hermes', platform, chat_id} to the daemon.

    The request goes to `AUTO_REGISTER_PATH` over the IPC Unix socket with
    a bearer `Authorization` header and a `User-Agent` carrying
    `HOOK_VERSION`.

    Returns:
        The daemon's JSON object on HTTP 200, otherwise `{"error": "..."}`:
          - `transport: ...`        any exception while building, sending
                                    or reading the request
          - `daemon status=...`     non-200 response (body truncated to 200)
          - `non-json: ...`         200 response that is not JSON
          - `non-object json: ...`  200 response whose JSON is not an object
        Never raises; the caller decides what to reply. The connection is
        closed on every path, including failures.
    """
    try:
        payload = json.dumps({
            "proof_code": proof_code,
            "host": "hermes",
            "platform": platform,
            "chat_id": chat_id,
        })
        conn = unix_http_connect(IPC_SOCK)
        try:
            conn.request(
                "POST", AUTO_REGISTER_PATH,
                body=payload,
                headers={
                    "Content-Type": "application/json",
                    "Authorization": f"Bearer {token}",
                    "User-Agent": f"agentsnet-hermes-hook/{HOOK_VERSION}",
                },
            )
            resp = conn.getresponse()
            status = resp.status
            raw = resp.read().decode("utf-8", errors="replace")
        finally:
            # Release the socket even when request/getresponse/read raised;
            # previously only the success path closed it.
            conn.close()
    except Exception as exc:
        return {"error": f"transport: {exc}"}

    if status != 200:
        return {"error": f"daemon status={status} body={raw[:200]}"}
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        return {"error": f"non-json: {raw[:200]}"}
    if not isinstance(parsed, dict):
        # Callers use `.get(...)` on the result; a JSON list/string/number
        # would make them raise, so report it as an error instead.
        return {"error": f"non-object json: {raw[:200]}"}
    return parsed


def auto_register_from_inbound(platform: str, chat_id: str, body_text: str) -> None:
    """Handle an inbound IM message that may carry an `AN-PUSH-XXXXXX` code.

    When `body_text` contains a proof code, the code is POSTed to the
    daemon together with `platform` and `chat_id`, and a human-readable
    outcome is sent back to `<platform>:<chat_id>` through `send_message`.
    Messages without a code are ignored; a missing IPC token aborts with a
    warning. A failing reply send is logged and swallowed.

    This function does not subscribe to inbound messages itself — the
    caller wires it into whatever delivers Hermes's inbound events.
    """
    found = PROOF_CODE_RE.search(body_text or "")
    if found is None:
        return
    proof_code = found.group(0)
    LOG.info("proof-code captured: %s on %s:%s", proof_code, platform, chat_id)

    token = load_token()
    if not token:
        LOG.warning("auto_register skipped: no IPC token")
        return

    outcome = post_auto_register(token, proof_code, platform, chat_id)
    status = (outcome.get("status") or "").strip()

    # Fixed replies for the statuses the daemon is known to return;
    # anything else falls through to the generic failure text below.
    canned_replies = {
        "registered": "AgentsNet push is configured. New messages will be delivered to this chat.",
        "already_registered": "AgentsNet push is already configured for this chat.",
        "expired_proof": "AgentsNet push setup failed: verification code expired. Generate a fresh code and try again.",
        "invalid_proof": "AgentsNet push setup failed: verification code is invalid or already used. Generate a fresh code and try again.",
    }
    reply = canned_replies.get(status)
    if reply is None:
        reply = f"AgentsNet push setup failed: {outcome.get('error') or status or 'unknown'}"

    target = f"{platform}:{chat_id}"
    try:
        send_message(target, reply)
    except Exception:
        LOG.exception("auto_register reply send failed")


def fetch_routes(token: str) -> list:
    """Fetch the current push_routes list from the daemon (v0.1.9 P0-4).

    Issues `GET ROUTES_PATH` over the IPC Unix socket with a bearer token
    and returns the `routes` array of the JSON reply — a list of dicts
    `{host, platform, chat_id, scope, transport, hermes_version_at_register}`.

    Called once per subscribe/reconnect cycle so newly-added routes become
    active without restarting the hook.

    Returns:
        The route list, or an empty list when the daemon answers non-200,
        the reply is not a JSON object with a list under `routes`, or any
        exception occurs (the hook stays subscribed and retries on the next
        reconnect). Never raises. The connection is closed on every path.
    """
    try:
        conn = unix_http_connect(IPC_SOCK)
        try:
            conn.request(
                "GET",
                ROUTES_PATH,
                headers={
                    "Accept": "application/json",
                    "Authorization": f"Bearer {token}",
                },
            )
            resp = conn.getresponse()
            if resp.status != 200:
                LOG.warning("fetch_routes: status=%s", resp.status)
                return []
            body = resp.read().decode("utf-8", errors="replace")
        finally:
            # This runs once per reconnect cycle; without the close every
            # cycle would leave a socket open until garbage collection.
            conn.close()
        data = json.loads(body)
        routes = data.get("routes", []) if isinstance(data, dict) else None
        if not isinstance(routes, list):
            # The route picker iterates this value and calls `.get` on each
            # element, so anything but a list is unusable.
            LOG.warning("fetch_routes: unexpected payload shape; ignoring")
            return []
        LOG.info("fetch_routes: loaded %d active route(s)", len(routes))
        return routes
    except Exception:
        LOG.exception("fetch_routes: failed — hook will deliver with empty route table")
        return []


def pick_target_for_event(routes: list, contact_id: str, milestone: "str | None") -> "str | None":
    """Pick the delivery target for one event from the route table.

    Args:
        routes: route dicts as returned by the daemon; each is expected to
            carry `scope`, `platform` and `chat_id`. None is treated as an
            empty table.
        contact_id: the event's contact id, matched against list scopes.
        milestone: optional milestone tag; any truthy value satisfies
            `milestones_only` routes.

    Scope matching:
      - scope == "all"                 → always matches
      - scope == "milestones_only"     → matches iff milestone is truthy
      - scope == list of contact_ids   → matches iff contact_id in list
      - anything else                  → route skipped with a warning

    Returns:
        `<platform>:<chat_id>` of the FIRST usable matching route (table
        order), or None when nothing matches. Routes that are not dicts, or
        that match but lack a platform or chat_id, are skipped with a
        warning rather than producing a bogus target such as "None:None".
    """
    for route in routes or ():
        if not isinstance(route, dict):
            LOG.warning("pick_target_for_event: skipping non-dict route entry")
            continue
        scope = route.get("scope")
        if scope == "all":
            matched = True
        elif scope == "milestones_only":
            matched = bool(milestone)
        elif isinstance(scope, list):
            matched = contact_id in scope
        else:
            # Unknown scope shape — conservative default = skip.
            LOG.warning("pick_target_for_event: unknown scope=%r", scope)
            continue
        if not matched:
            continue
        platform = route.get("platform")
        chat_id = route.get("chat_id")
        # chat_id may legitimately be a number, so only None / "" count as
        # missing; platform must be a non-empty value.
        if not platform or chat_id is None or chat_id == "":
            LOG.warning(
                "pick_target_for_event: matched route lacks platform/chat_id; skipping"
            )
            continue
        return f"{platform}:{chat_id}"
    return None


def _periodic_install_proof_drain_loop() -> None:
    """Drain the install-proof queue forever, once every 5 seconds.

    v0.1.13 RC fix (Sonnet 4.6 / Hermes-Neonize WhatsApp 2026-04-27 23:43):
    the drain used to run only at the top of each SSE reconnect, so while
    the subscription was steady-state connected a freshly written queue
    file could sit undelivered for minutes until the stream happened to
    drop. This loop decouples draining from the SSE lifecycle.

    Each pass calls `_drain_install_proof_queue()`; the first pass runs
    immediately, before any sleep. An exception raised by a pass is logged
    and swallowed so one bad pass cannot end the loop. The function never
    returns — it is meant to run on a daemon thread so it ends with the
    process.
    """
    interval_seconds = 5
    while True:
        try:
            _drain_install_proof_queue()
        except Exception:
            # Non-fatal by design: log with traceback and try again after
            # the usual pause.
            LOG.exception("periodic install-proof drain failed (non-fatal)")
        time.sleep(interval_seconds)


def subscribe_and_dispatch() -> None:
    """Main loop. Subscribes to daemon SSE + dispatches each event.

    Start-up checks (each logs an error and returns, leaving the restart to
    the LaunchAgent / systemd-user unit):
      1. Native mode (no Docker container configured) with a failed Hermes
         import — no send path exists, so push is disabled.
      2. IPC token missing or empty — the SSE stream cannot authenticate.

    After the checks it starts the install-proof drain thread and then
    loops forever: refresh the route table, run one SSE subscription, sleep
    according to `RECONNECT_BACKOFF_SECONDS`, reconnect. A clean disconnect
    resets the backoff; an exception advances it, capped at the last entry.
    """
    # The availability check must look at the imported tool, not at the
    # `send_message` adapter: the adapter is a module-level function and is
    # therefore never None. Docker mode does not need the in-process import.
    if not _DOCKER_CONTAINER and _send_message_tool is None:
        LOG.error(
            "tools.send_message_tool import failed: %s — push disabled",
            _HERMES_IMPORT_ERROR,
        )
        return

    token = load_token()
    if not token:
        LOG.error("~/.agentsnet/ipc-token not readable — cannot authenticate SSE")
        return

    # v0.1.13 RC fix: spawn the periodic drain thread. It drains the
    # install-proof queue every 5 s independently of SSE state (the first
    # pass runs immediately at thread start).
    #
    # v0.1.13 Codex review fix #6 / Sonnet round-6 dedup close-out
    # (2026-04-28, Task S close): this thread is the ONLY drainer. The
    # per-reconnect drain that used to run in the loop below was removed
    # because running both concurrently delivered every install-proof
    # message twice; the periodic thread already covers queue entries
    # written during a reconnect window.
    import threading
    threading.Thread(
        target=_periodic_install_proof_drain_loop,
        name="agentsnet-install-proof-drain",
        daemon=True,
    ).start()
    # v0.1.14 PR9 B6.6 (Sonnet round-7 noise reduction): include the PID so
    # an operator reading the logs can tell which hook process emitted the
    # line if more than one is somehow running. Cosmetic only.
    LOG.info("install-proof queue periodic drain thread started (5 s interval, sole drainer, pid=%d)", os.getpid())

    last_step = len(RECONNECT_BACKOFF_SECONDS) - 1
    backoff_idx = 0
    while True:
        try:
            # v0.1.9 P0-4 closure: refresh route table on every reconnect
            # so newly-registered routes take effect without hook restart.
            routes = fetch_routes(token)
            _subscribe_once(token, routes)
            backoff_idx = 0  # reset on clean disconnect
        except Exception:
            LOG.exception("push hook subscribe_once failure; will reconnect")
        delay = RECONNECT_BACKOFF_SECONDS[min(backoff_idx, last_step)]
        backoff_idx += 1
        time.sleep(delay)


def _rewrite_status_to_delivered(entry: dict, send_result: dict) -> None:
    """Rewrite ~/.agentsnet/install-proof-status.json as `delivered`.

    v0.1.13 RC fix Task U: the daemon writes the status as `queued`; this
    upgrades it once send_message_tool has returned positive confirmation.
    Key names follow the snake_case schema of install_proof.rs::DispatchStatus.

    Args:
        entry: the queue entry that was sent; `target`, `channel` and
            `chat_id` are consulted as fallbacks.
        send_result: the send result recorded as `delivery_evidence`.

    Field sources:
      - `host_type`, `channel`, `chat_id_redacted` are preserved from the
        existing status file when present.
      - `channel` otherwise comes from `entry["channel"]`, then from the
        platform prefix of `target`.
      - `chat_id_redacted` otherwise is computed from `entry["chat_id"]`
        (or the part of `target` after the first colon): first 4 + "…" +
        last 2 characters when longer than 6, all asterisks when shorter.

    An unreadable, corrupt, or non-object status file is treated as absent
    and overwritten. The new content is written to a sibling `.json.tmp`
    file, chmod-ed to 0600 (best effort), then renamed over the final path.

    Raises:
        OSError: if writing or renaming the temporary file fails.
    """
    status_path = Path.home() / ".agentsnet" / "install-proof-status.json"
    # Read existing status (likely "queued" written by daemon).
    existing: dict = {}
    try:
        if status_path.exists():
            loaded = json.loads(status_path.read_text(encoding="utf-8"))
            # Valid JSON that is not an object (e.g. `[]`, `null`) is as
            # useless as a corrupt file; without this check `.get` below
            # would raise AttributeError.
            if isinstance(loaded, dict):
                existing = loaded
    except Exception:
        # Corrupt status file is acceptable — we overwrite it.
        existing = {}

    # Coerce to str so a numeric or null target/chat_id cannot break the
    # string operations below.
    target = str(entry.get("target") or "")
    platform_prefix, colon, target_chat = target.partition(":")

    channel = existing.get("channel") or entry.get("channel", "")
    if not channel and colon:
        channel = platform_prefix

    chat_id_redacted = existing.get("chat_id_redacted", "")
    if not chat_id_redacted:
        # Compute simple redaction: head 4 + tail 2 separated by …
        chat_id = str(entry.get("chat_id") or "") or (
            target_chat if colon else target
        )
        if len(chat_id) > 6:
            chat_id_redacted = f"{chat_id[:4]}…{chat_id[-2:]}"
        elif chat_id:
            chat_id_redacted = "*" * len(chat_id)

    new_status = {
        "at_unix": int(time.time()),
        "status": "delivered",
        "host_type": existing.get("host_type", "hermes"),
        "channel": channel,
        "chat_id_redacted": chat_id_redacted,
        "error": None,
        # Audit trail for Task U: which send_message_tool result triggered
        # the upgrade, serialized as size-bounded JSON text
        # (v0.1.14 RC fix F2 — str(send_result) produced Python repr, which
        # downstream JSON consumers could not decode).
        "delivery_evidence": _json_dump_safe(send_result),
    }
    tmp_path = status_path.with_suffix(".json.tmp")
    tmp_path.write_text(
        json.dumps(new_status, indent=2),
        encoding="utf-8",
    )
    try:
        os.chmod(tmp_path, 0o600)
    except OSError:
        # Permission tightening is best effort; the rename still proceeds.
        pass
    os.replace(tmp_path, status_path)


def _drain_install_proof_queue() -> None:
    """v0.1.13 B0.1 A-class — drain ~/.agentsnet/install-proof-queue.jsonl.

    Daemon writes one JSON line per pending proof when host_type=hermes:
    `{channel, target, message, purpose: "install_proof", issued_at_unix}`.
    We invoke send_message(target, message) for each line; on success the
    line is removed. Failures stay in the queue so the next iteration
    retries (idempotent — duplicates would be cosmetic, but not harmful
    since the user will see at most a small number of proof messages).

    Atomic-replace pattern: read all lines, drain, write back the lines
    we couldn't deliver. Bounded queue length by trimming to the last 64
    entries so a stuck channel adapter can't pile up unbounded.
    """
    queue_path = Path.home() / ".agentsnet" / "install-proof-queue.jsonl"
    if not queue_path.exists():
        return
    try:
        raw = queue_path.read_text(encoding="utf-8")
    except OSError:
        return
    lines = [ln for ln in raw.splitlines() if ln.strip()]
    if not lines:
        # Empty queue file — clean it up so we stop reading it next tick.
        try:
            queue_path.unlink()
        except OSError:
            pass
        return
    # v0.1.13 Codex review fix #3 / Sonnet round-4 P0 spam-loop fix
    # 2026-04-28: drain MUST be all-or-nothing per drain cycle. Pre-fix,
    # a successful send at line ~604 followed by ANY exception (e.g.
    # _rewrite_status_to_delivered subprocess kill, hook.py launchd
    # KeepAlive respawn race, log-format-string bug) would leave the
    # queue file un-cleared, hook.py would crash-restart, drain the same
    # entry again, send again, loop. Sonnet's WA chat received the same
    # AN-INSTALL-406A... message ~20× in 3 minutes during round-4
    # before manual launchctl unload.
    #
    # Defense-in-depth: write the SHRUNK queue (current line removed)
    # BEFORE invoking _rewrite_status_to_delivered or LOG.info on the
    # successfully-sent entry. If everything past the queue-clear blows
    # up, the entry is already permanently consumed and the worst-case
    # is a missing status-file rewrite (cosmetic — daemon's "queued"
    # status persists until next drain or daemon restart).
    pending: list[str] = []
    # Track which lines we've successfully sent so we can shrink the
    # queue mid-loop after each success.
    successfully_sent_indices: set[int] = set()

    def _persist_queue_minus_consumed() -> None:
        """Atomic-replace queue file with only un-sent + un-processed
        entries. Called immediately after each successful send so a
        post-send crash cannot re-deliver."""
        remaining: list[str] = []
        for j, ln in enumerate(lines):
            if j in successfully_sent_indices:
                continue  # consumed
            remaining.append(ln)
        # Cap at 64 to prevent unbounded growth on stuck adapter.
        remaining = remaining[-64:]
        if remaining:
            tp = queue_path.with_suffix(".jsonl.tmp")
            tp.write_text("\n".join(remaining) + "\n", encoding="utf-8")
            try:
                os.chmod(tp, 0o600)
            except OSError:
                pass
            os.replace(tp, queue_path)
        else:
            try:
                queue_path.unlink()
            except OSError:
                pass

    for idx, line in enumerate(lines):
        if idx >= 16:
            pending.append(line)
            continue
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            LOG.warning("install-proof queue: dropping malformed line: %r", line[:120])
            continue  # malformed → drop (don't retry)
        target = entry.get("target")
        message = entry.get("message")
        if not target or not message:
            LOG.warning("install-proof queue: missing target/message; dropping")
            continue
        try:
            result = send_message(target, message)
        except Exception:
            LOG.exception("install-proof queue: send_message raised; will retry")
            pending.append(line)
            continue
        # v0.1.13 Codex review fix #5 / Sonnet round-5 spam-loop root
        # cause (2026-04-28): Hermes' `send_message_tool` returns a
        # JSON STRING in some adapters, not a dict. Sonnet's
        # ~/Library/Logs/agentsnet-push/push-ipc.err showed:
        #   result='{"success": true, "platform": "whatsapp", ...,
        #            "message_id": "3EB0CE59C74953DD8B6E70"}'
        # Note the single quotes — Python repr() of a str, not a dict.
        # The pre-fix `isinstance(result, dict)` checks all returned
        # False, so even success responses were rejected as
        # "ambiguous", queue entry re-appended, infinite resend.
        # Fix: if `result` is a str, try to JSON-parse it into a dict
        # before the schema check. Belt-and-suspenders: also accept
        # `success: true` as a positive delivery indicator (Hermes WA
        # adapter contract), in addition to messageId / message_id /
        # sent / status.
        if isinstance(result, str):
            try:
                parsed = json.loads(result)
                if isinstance(parsed, dict):
                    result = parsed
            except (json.JSONDecodeError, ValueError):
                pass
        if isinstance(result, dict) and result.get("error"):
            LOG.warning(
                "install-proof queue: send_message error %s; will retry",
                result.get("error"),
            )
            pending.append(line)
            continue
        # v0.1.13 RC fix Task U (MnD LX DCjm Hm S4.6 false-positive
        # 2026-04-28): require explicit positive delivery indicator
        # rather than just absence of error. The Discord adapter on
        # the Mac mini Docker host returned a non-error dict when the
        # target channel "agentsnetai" didn't resolve to any visible
        # channel; without this check, hook.py treated the result as
        # success and consumed the queue entry. Operator's Discord
        # global search confirmed zero proof messages were actually
        # sent. The contract for `send_message_tool` v0.10.0+ returns
        # one of:
        #   - {"messageId": "<platform-id>"} on success
        #   - {"message_id": "<platform-id>"} (snake_case variant)
        #   - {"success": true, "platform": ..., "message_id": ...}
        #     (Neonize-WhatsApp adapter shape, Sonnet log evidence
        #     2026-04-28: result='{"success": true, "platform":
        #     "whatsapp", "chat_id": "16823849331", "message_id":
        #     "3EB0CE59C74953DD8B6E70"}')
        #   - {"sent": true, ...} on some adapters
        #   - {"status": "ok"|"delivered"} on some adapters
        #   - {"error": "<reason>"} on failure (caught above)
        # Anything else is ambiguous → treat as failure + retry. False
        # positives here defeat ADR-041 anti-hallucination, but
        # over-rotating false NEGATIVES (rejecting genuine successes)
        # creates the resend-spam vector that hit Sonnet round-5 ~20×.
        delivered_ok = False
        if isinstance(result, dict):
            if result.get("messageId") or result.get("message_id"):
                delivered_ok = True
            elif result.get("sent") is True:
                delivered_ok = True
            elif result.get("success") is True:
                # Hermes WA / some other adapters use {"success": true,
                # "message_id": ...}. Both fields are present on real
                # success; either alone is sufficient.
                delivered_ok = True
            elif result.get("status") in ("ok", "delivered"):
                delivered_ok = True
        if not delivered_ok:
            LOG.warning(
                "install-proof queue: send_message returned ambiguous "
                "result (no messageId/sent/status); treating as failed "
                "to defeat ADR-041 false-positive — will retry. "
                "result=%r",
                result,
            )
            pending.append(line)
            continue
        # v0.1.13 Codex review fix #3 / Sonnet round-4 P0 spam-loop fix:
        # mark entry as consumed AND persist the shrunk queue
        # IMMEDIATELY, before any rewrite-status / log calls. If we
        # crash after this point, the entry is already permanently
        # gone and we will not re-spam the user. Worst case: missing
        # status-file rewrite (cosmetic — daemon's "queued" persists
        # until next manual restart).
        successfully_sent_indices.add(idx)
        try:
            _persist_queue_minus_consumed()
        except Exception:
            # Persisting failed — log but DON'T retry. If we retry, we
            # risk re-delivering on the next drain cycle. The entry
            # WILL still be in the queue file on disk, so on next
            # drain we'd re-read and re-send. Better: log loudly and
            # let the operator notice the disk-write failure. The
            # queue file's atomic-replace pattern means partial writes
            # can't corrupt — either the new shrunk file is in place
            # or the old one is.
            LOG.exception(
                "install-proof queue: CRITICAL — persist_queue_minus_consumed "
                "failed after send. Manual `> ~/.hermes/agentsnet-push-ipc/install-proof-queue.jsonl` "
                "may be needed to break a re-delivery loop."
            )
        # Two-phase status (Task U): rewrite install-proof-status.json
        # from "queued" (daemon-written) to "delivered" (us-written) on
        # actual confirmed delivery. Only for entries with purpose=
        # "install_proof" — daemon SSE-event messages don't write status.
        # Wrapped in try/except so a status-rewrite failure can't reach
        # the launchd respawn → re-deliver path.
        if entry.get("purpose") == "install_proof":
            try:
                _rewrite_status_to_delivered(entry, result)
            except Exception:
                LOG.exception(
                    "install-proof queue: status rewrite failed (non-fatal — "
                    "entry already consumed from queue file, no re-delivery risk)"
                )
        # Defensive log call — wrap to prevent any format-string failure
        # from raising and triggering launchd respawn after consumption.
        try:
            mid = None
            if isinstance(result, dict):
                mid = result.get("messageId") or result.get("message_id")
            LOG.info(
                "install-proof queue: delivered to target=%s purpose=%s "
                "messageId=%s",
                target,
                entry.get("purpose", "unknown"),
                mid,
            )
        except Exception:
            pass

    # End-of-loop fallback persist: if we processed any retries that
    # appended to `pending`, the per-success persist above already
    # handles the consumed entries. We just need to merge in any
    # `pending` (failed) entries with the still-unprocessed (idx >= 16)
    # entries. Same atomic-replace contract.
    if successfully_sent_indices or pending:
        # Re-derive remaining: indices NOT in successfully_sent_indices
        # AND not in pending (pending was already added from the
        # idx>=16 cap path or the failure path).
        # Simplest: take pending plus any idx-not-in-sent line that
        # isn't already in pending. To preserve order, walk lines.
        already_in_pending = set(pending)
        merged: list[str] = []
        for j, ln in enumerate(lines):
            if j in successfully_sent_indices:
                continue
            if ln in already_in_pending:
                continue  # already accounted for
            merged.append(ln)
        merged = (merged + pending)[-64:]
        if merged:
            tmp_path = queue_path.with_suffix(".jsonl.tmp")
            tmp_path.write_text("\n".join(merged) + "\n", encoding="utf-8")
            try:
                os.chmod(tmp_path, 0o600)
            except OSError:
                pass
            os.replace(tmp_path, queue_path)
        else:
            try:
                queue_path.unlink()
            except OSError:
                pass


def _subscribe_once(token: str, routes: list) -> None:
    """Run one SSE subscription cycle against the daemon.

    Opens a connection with ``unix_http_connect(IPC_SOCK)``, issues
    ``GET SSE_PATH`` with the bearer token, and hands every
    blank-line-terminated frame to ``_handle_sse_event`` until the stream
    ends. The connection is closed on every exit path (normal end of
    stream, rejected status, or an exception while reading) so repeated
    subscription cycles do not accumulate open sockets.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        routes: Route table, passed through unchanged to
            ``_handle_sse_event``.

    Returns:
        None once the daemon closes the stream; the caller is expected to
        decide whether to reconnect.

    Raises:
        RuntimeError: if the daemon answers 401, 409, or any other
            non-200 status.
    """
    conn = unix_http_connect(IPC_SOCK)
    try:
        conn.request(
            "GET",
            SSE_PATH,
            headers={
                "Accept": "text/event-stream",
                "Authorization": f"Bearer {token}",
                # v0.1.9 P0-4 closure (P8 auto-detect): daemon parses this
                # User-Agent to populate push_routes.hermes_version_at_register.
                "User-Agent": f"agentsnet-hermes-hook/{HOOK_VERSION}",
            },
        )
        resp = conn.getresponse()
        if resp.status == 401:
            raise RuntimeError("SSE auth rejected (401) — check ~/.agentsnet/ipc-token")
        if resp.status == 409:
            raise RuntimeError(
                "SSE 409 Conflict — another push hook already subscribed; "
                "disconnect it first (launchctl unload / systemctl stop)"
            )
        if resp.status != 200:
            raise RuntimeError(f"SSE unexpected status {resp.status}")

        # Accumulate raw bytes and peel off complete frames; a frame ends at
        # the first blank line ("\n\n"). Any trailing partial frame stays in
        # `buf` until more data arrives.
        buf = b""
        while True:
            chunk = resp.read1(4096)
            if not chunk:
                LOG.info("SSE stream closed by daemon; reconnecting")
                return
            buf += chunk
            while b"\n\n" in buf:
                event, _, buf = buf.partition(b"\n\n")
                _handle_sse_event(event, routes)
    finally:
        # NOTE(review): assumes the object returned by unix_http_connect
        # exposes http.client.HTTPConnection.close(), consistent with the
        # request()/getresponse() calls above — confirm.
        conn.close()


def _handle_sse_event(raw: bytes, routes: list) -> None:
    """Parse one SSE frame and dispatch a NewInboxMessage, if it carries one.

    The frame's ``data`` field is extracted following the server-sent
    events stream format: the single space after ``data:`` is optional,
    and several ``data`` lines in one frame are joined with newlines
    before being parsed as JSON. Frames with no ``data`` field (comments /
    keep-alives, bare ``event:`` lines) and frames whose JSON payload is
    empty are ignored.

    Args:
        raw: Bytes of a single frame, without the terminating blank line.
        routes: Route table forwarded to ``_dispatch_new_inbox_message``.

    Returns:
        None. Any exception raised while decoding, parsing or dispatching
        is logged via ``LOG.exception`` and swallowed so one bad frame
        cannot end the subscription loop.
    """
    try:
        text = raw.decode("utf-8", errors="replace")
        # Collect every `data` field value. Accepting both "data: x" and
        # "data:x" matters because the space is optional in the stream
        # format; a producer that omits it would otherwise be ignored.
        data_lines: list[str] = []
        for line in text.splitlines():
            field, sep, value = line.partition(":")
            if not sep or field != "data":
                continue
            data_lines.append(value[1:] if value.startswith(" ") else value)
        if not data_lines:
            return

        payload = json.loads("\n".join(data_lines))
        if not payload:
            return

        kind = payload.get("kind", {})
        if "NewInboxMessage" in kind:
            msg = kind["NewInboxMessage"]
            _dispatch_new_inbox_message(msg, routes)
        # WhatsAppWindowReset is daemon-internal (not sent to hook); hook
        # only ORIGINATES that event via the reverse endpoint.
    except Exception:
        LOG.exception("failed to handle SSE event")


def _decode_push_send_result(result):
    """Normalise a send result: JSON strings are decoded, others pass through.

    A string that is not valid JSON is wrapped as ``{"raw": <string>}`` so
    callers can always log something meaningful.
    """
    if isinstance(result, str):
        try:
            return json.loads(result)
        except ValueError:
            return {"raw": result}
    return result


def _dispatch_new_inbox_message(msg: dict, routes: list) -> None:
    """Deliver one NewInboxMessage event through Hermes (AP5 preview only).

    Flow:
      1. Drop the event if it has neither ``preview_text`` nor
         ``bytes_b64``.
      2. Resolve the delivery target with ``pick_target_for_event``;
         if that yields nothing, fall back to the
         ``AGENTSNET_PUSH_TEST_TARGET`` environment variable; if there is
         still no target, log and drop.
      3. v0.1.9 P2.5 (D1 / #62): when ``content_type`` is ``ImageInline``,
         ``rendering_hint`` is ``inline_image``, ``bytes_b64`` is present
         and the target starts with ``telegram:``, call
         ``_send_message_tool`` with the attachment. The tool's reply is
         inspected: an explicit success ends the dispatch, while an
         unavailable tool, a raised exception, or an ``error`` reply falls
         back to the text path below. A reply that is neither success nor
         error is logged and NOT retried as text, to avoid a possible
         duplicate delivery.
      4. Otherwise (and for the fallbacks above) send text via
         ``send_message``: the preview, or a bracketed image placeholder
         for inline / over-cap (``inline_image_oversize``) images.

    Args:
        msg: The ``NewInboxMessage`` payload from the SSE event.
        routes: Route table handed to ``pick_target_for_event``.

    Returns:
        None. Send failures are logged, never raised.
    """
    preview = msg.get("preview_text", "")
    contact = msg.get("contact_alias", "unknown")
    contact_id = msg.get("contact_id", "")
    milestone = msg.get("milestone")
    content_type = msg.get("content_type", "Text")
    rendering_hint = msg.get("rendering_hint")
    mime = msg.get("mime", "application/octet-stream")
    bytes_b64 = msg.get("bytes_b64")
    caption = msg.get("caption")

    if not preview and not bytes_b64:
        return

    # v0.1.9 P0-4 closure: look up the target from the route table instead
    # of relying on the historical AGENTSNET_PUSH_TEST_TARGET env var. The
    # daemon's /v1/push/routes returns the active subscriptions; scope
    # matching decides which route applies to this contact_id.
    #
    # If no route matches, we log + drop the event. That is the correct
    # behavior — the user explicitly did NOT opt this contact into push
    # delivery.
    target = pick_target_for_event(routes, contact_id, milestone)
    if not target:
        # Legacy escape hatch kept ONLY for test harnesses that want to
        # force a target without touching the daemon route table.
        target = os.environ.get("AGENTSNET_PUSH_TEST_TARGET")
    if not target:
        LOG.info(
            "push hook: no matching push_route for event (contact=%s contact_id=%s scope lookup returned empty) — dropping",
            contact,
            contact_id,
        )
        return
    LOG.info(
        "push hook: matched target=%s for contact=%s contact_id=%s (ctype=%s milestone=%s)",
        target,
        contact,
        contact_id,
        content_type,
        milestone,
    )

    # v0.1.9 P2.5: ImageInline branch — Telegram sendPhoto via the
    # send_message_tool multipart path.
    is_image_inline = (
        content_type == "ImageInline"
        and rendering_hint == "inline_image"
        and bytes_b64
    )
    is_oversize = rendering_hint == "inline_image_oversize"

    if is_image_inline and target.startswith("telegram:"):
        if not _send_message_tool:
            # Nothing was sent, so do not claim delivery; the text path
            # below still gives the user a notification.
            LOG.warning(
                "send_message_tool unavailable; falling back to text caption"
            )
        else:
            # Hermes's send_message_tool accepts `attachment_b64` + `mime` in
            # v0.11.x; older versions fall through to text. The tool itself
            # makes the multipart sendPhoto call.
            try:
                image_result = _send_message_tool({
                    "action": "send",
                    "target": target,
                    "message": caption or f"[AgentsNet/{contact}] inline image",
                    "attachment_b64": bytes_b64,
                    "attachment_mime": mime,
                })
            except Exception:
                LOG.exception(
                    "ImageInline sendPhoto failed; falling back to text caption"
                )
            else:
                image_parsed = _decode_push_send_result(image_result)
                if isinstance(image_parsed, dict) and image_parsed.get("success"):
                    LOG.info(
                        "delivered ImageInline push via sendPhoto: %s → %s (mime=%s)",
                        contact,
                        target,
                        mime,
                    )
                    return
                if isinstance(image_parsed, dict) and "error" in image_parsed:
                    # Explicit failure: nothing reached the user, so the
                    # text fallback cannot produce a duplicate.
                    LOG.error(
                        "ImageInline sendPhoto failed: %s → %s: %s; "
                        "falling back to text caption",
                        contact,
                        target,
                        image_parsed.get("error"),
                    )
                else:
                    # Neither success nor error: delivery state unknown.
                    # Do not resend as text — that could double-notify.
                    LOG.warning(
                        "ImageInline sendPhoto returned unexpected shape: "
                        "%s → %s: %r",
                        contact,
                        target,
                        image_parsed,
                    )
                    return

    # Text-only (all non-image types + oversize-image fallback).
    body_text = preview
    if is_oversize:
        body_text = f"[image > 256KB cap; title: {caption or contact}]"
    elif is_image_inline:
        body_text = f"[image from {contact}: {caption or 'inline'}]"

    # v0.1.9 P0-4 closure follow-up (WA discovery 2026-04-24): send_message_tool
    # returns a dict. Success → {"success": true, ...}; failure → {"error": "..."}.
    # The prior "delivered push" log line fired unconditionally on any non-exception
    # return, masking silent failures. E.g. WA target parsing rejected
    # `whatsapp:Jimin Zhao` (space in name) with `{"error": "No home channel set..."}`
    # but hook still logged "delivered" → operator saw no WA message but trusted log.
    # Inspect the return dict explicitly and log success vs error distinctly.
    try:
        result = send_message(target=target, text=f"[AgentsNet/{contact}] {body_text}")
    except Exception:
        LOG.exception("send_message raised")
        return

    # send_message_tool() returns a JSON-encoded string, not a dict. Verified
    # empirically 2026-04-24 across telegram / discord / whatsapp — all three
    # platforms return shapes like `'{"success": true, "platform": "...",
    # "message_id": "..."}'`. The docker exec path goes through its own
    # json.loads (in _send_via_docker) and returns a real dict, so we handle
    # both shapes defensively.
    parsed = _decode_push_send_result(result)

    if isinstance(parsed, dict) and parsed.get("success"):
        LOG.info(
            "delivered push: %s → %s (content_type=%s, platform=%s, message_id=%s)",
            contact,
            target,
            content_type,
            parsed.get("platform"),
            parsed.get("message_id"),
        )
    elif isinstance(parsed, dict) and "error" in parsed:
        LOG.error(
            "send_message failed: %s → %s: %s",
            contact,
            target,
            parsed.get("error"),
        )
    else:
        LOG.warning(
            "send_message returned unexpected shape: %s → %s: %r",
            contact,
            target,
            parsed,
        )


def report_whatsapp_window_reset(chat_id: str, observed_at_ms: int) -> None:
    """M4 reverse-direction: tell daemon the 24h WhatsApp window is reset.

    Called when the hook's own Hermes inbound-observer sees a user→bot
    message on WhatsApp. Daemon updates push_routes.whatsapp_window_expires_at.

    POSTs ``{"chat_id": ..., "observed_at_ms": ...}`` as JSON to
    ``REVERSE_PATH`` with the bearer token from ``load_token()``. The
    connection is closed on every exit path so repeated pings do not leak
    sockets.

    Args:
        chat_id: WhatsApp chat identifier the inbound message was seen on.
        observed_at_ms: Observation timestamp; the parameter name indicates
            milliseconds (epoch presumably — confirm against the daemon).

    Returns:
        None. Does nothing when no token is available; a response status
        other than 200/204 is logged as a warning, not raised.
    """
    token = load_token()
    if not token:
        return
    conn = unix_http_connect(IPC_SOCK)
    try:
        body = json.dumps({"chat_id": chat_id, "observed_at_ms": observed_at_ms})
        conn.request(
            "POST",
            REVERSE_PATH,
            body=body,
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
            },
        )
        resp = conn.getresponse()
        if resp.status not in (200, 204):
            LOG.warning(
                "whatsapp_window_ping rejected: status=%s chat=%s",
                resp.status,
                chat_id,
            )
    finally:
        # NOTE(review): assumes the object returned by unix_http_connect
        # exposes http.client.HTTPConnection.close(), consistent with the
        # request()/getresponse() calls above — confirm.
        conn.close()


def main() -> None:
    """Set up INFO-level logging, then hand control to the subscribe loop."""
    log_format = "%(asctime)s %(levelname)s agentsnet.push %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO)
    subscribe_and_dispatch()


# Script entry point: start the hook only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
