Skip to content

MCP Integration Design

Overview

This document specifies the MCP (Model Context Protocol) servers that Reeve uses to interact with the Pulse Queue system and send notifications. These servers expose tools that Reeve can call directly from within Hapi/Claude Code sessions.

MCP Server 1: Pulse Queue Server

Module: src/reeve/mcp/pulse_server.py

Purpose: Allow Reeve to manage its own schedule by creating, viewing, and modifying pulses.

Connection Type: stdio (spawned on-demand by Reeve)

Implementation

"""
MCP Server for Pulse Queue Management

This server exposes tools that allow Reeve (Claude Code) to proactively manage
its own scheduling. Reeve can set alarms, check its upcoming schedule, and
cancel or reschedule tasks.

Usage:
    Configure in ~/.config/claude-code/mcp_config.json:
    {
      "mcpServers": {
        "pulse-queue": {
          "command": "uv",
          "args": ["run", "--directory", "/path/to/reeve-bot", "python", "-m", "reeve.mcp.pulse_server"]
        }
      }
    }
"""

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import TextContent
from pydantic import Field
from typing import Annotated, Literal, Optional
from datetime import datetime, timezone, timedelta
from contextlib import asynccontextmanager
import os

from reeve.pulse.queue import PulseQueue
from reeve.pulse.enums import PulsePriority, PulseStatus

# Initialize the MCP server
app = Server("pulse-queue")

# Initialize the pulse queue (database connection)
DB_PATH = os.getenv("PULSE_DB_PATH", "~/.reeve/pulse_queue.db")
queue = PulseQueue(f"sqlite+aiosqlite:///{os.path.expanduser(DB_PATH)}")


# ============================================================================
# Tool Definitions
# ============================================================================

@app.tool()
async def schedule_pulse(
    scheduled_at: Annotated[
        str,
        Field(
            description=(
                "When to execute this pulse. Accepts multiple formats:\n"
                "- ISO 8601 timestamp: '2026-01-20T09:00:00Z'\n"
                "- Relative time: 'in 2 hours', 'in 30 minutes', 'tomorrow at 9am'\n"
                "- Special keywords: 'now' (immediate), 'tonight' (today at 10pm), 'tomorrow morning' (tomorrow at 8am)\n\n"
                "IMPORTANT: All times are interpreted in the user's local timezone unless a UTC 'Z' suffix is provided."
            ),
            examples=["2026-01-20T09:00:00Z", "in 2 hours", "tomorrow at 9am", "now"]
        )
    ],
    prompt: Annotated[
        str,
        Field(
            description=(
                "The instruction or context for Reeve when this pulse fires. "
                "This becomes the initial message in the spawned Hapi session.\n\n"
                "Be specific and action-oriented. Good examples:\n"
                "- 'Check flight status for UA123 and notify user if delayed'\n"
                "- 'Daily morning briefing: review calendar, check email, summarize priorities'\n"
                "- 'Follow up: Did user reply to the snowboarding trip proposal?'\n\n"
                "Avoid vague prompts like 'check things' or 'do stuff'."
            ),
            min_length=10,
            max_length=2000
        )
    ],
    priority: Annotated[
        Literal["critical", "high", "normal", "low", "deferred"],
        Field(
            description=(
                "Urgency level for this pulse. Determines execution order when multiple pulses are due:\n\n"
                "- 'critical' (🚨): Emergencies, user messages, system failures. Interrupts deep work.\n"
                "- 'high' (🔔): Important external events, user-facing tasks, scheduled alarms.\n"
                "- 'normal' (⏰): Regular maintenance, periodic checks, calendar reminders. (DEFAULT)\n"
                "- 'low' (📋): Background tasks, cleanup, non-urgent maintenance.\n"
                "- 'deferred' (🕐): Intentionally postponed tasks, snoozed items.\n\n"
                "Default: 'normal'"
            ),
        )
    ] = "normal",
    resume_in_current_session: Annotated[
        bool,
        Field(
            description=(
                "Whether to resume the pulse in the current session (default: False).\n\n"
                "- False (default): The pulse starts a brand new session. Recommended for most use cases.\n"
                "- True: The pulse resumes in the current session, injecting the prompt as if it were a new user message at the scheduled time.\n\n"
                "WARNING: Setting this to True means the prompt will be injected into the current session context. "
                "This is useful when there's rich context to preserve, but be careful - if the user advances the conversation, "
                "the pulse prompt might not be relevant anymore."
            ),
        )
    ] = False,
    sticky_notes: Annotated[
        Optional[list[str]],
        Field(
            description=(
                "Optional list of reminder strings to inject into the prompt when the pulse fires. "
                "Use this to carry forward context between pulses.\n\n"
                "Examples:\n"
                "- ['User asked about snowboarding trip on Monday', 'Check if anyone replied in group chat']\n"
                "- ['Flight departs at 6:45 AM', 'TSA PreCheck lane available']\n\n"
                "These will be prepended to the main prompt with clear formatting."
            ),
        )
    ] = None,
    tags: Annotated[
        Optional[list[str]],
        Field(
            description=(
                "Optional categorization tags for filtering and organization. "
                "Useful for querying related pulses or understanding patterns.\n\n"
                "Examples: ['daily', 'morning_routine'], ['trip_planning', 'japan'], ['follow_up', 'github']"
            ),
        )
    ] = None,
) -> str:
    """
    Schedule a new pulse (wake-up event) for Reeve.

    This is Reeve's primary tool for proactive behavior. Use this to schedule
    future tasks, set reminders, or create follow-up checks.

    When to use:
    - Set alarms: "Remind me to check ticket prices tomorrow at 8am"
    - Schedule follow-ups: "Check if user replied in 2 hours"
    - Create recurring checks: "Every morning at 9am, review calendar"
    - Defer tasks: "I can't handle this now, wake me up tonight to finish"

    When NOT to use:
    - Immediate actions (just do them now)
    - One-time informational tasks (use memory/notes instead)

    Examples:
        # Set a morning briefing
        schedule_pulse(
            scheduled_at="tomorrow at 9am",
            prompt="Daily morning briefing: review calendar, check email, summarize priorities",
            priority="normal",
            tags=["daily", "morning_routine"]
        )

        # Follow up on a pending task
        schedule_pulse(
            scheduled_at="in 2 hours",
            prompt="Check if user replied to the snowboarding trip proposal in group chat",
            priority="high",
            sticky_notes=["Sent message at 2:30 PM", "Waiting for Alex and Jamie to confirm"],
            tags=["follow_up", "social"]
        )

        # Critical pre-departure check
        schedule_pulse(
            scheduled_at="2026-01-20T06:00:00Z",
            prompt="Check flight status for UA123 and notify user immediately if delayed",
            priority="critical",
            tags=["travel", "urgent"]
        )

    Returns:
        Confirmation message with the pulse ID and scheduled time
    """
    # Parse scheduled_at (handle relative times, keywords, etc.)
    parsed_time = _parse_time_string(scheduled_at)

    # Determine session ID based on resume_in_current_session
    session_id = None
    if resume_in_current_session:
        try:
            session_id = ctx.session_id
        except (RuntimeError, AttributeError):
            # Session ID not available - fall back to new session
            pass

    # Create the pulse
    pulse_id = await queue.schedule_pulse(
        scheduled_at=parsed_time,
        prompt=prompt,
        priority=PulsePriority(priority),
        session_id=session_id,
        sticky_notes=sticky_notes,
        tags=tags,
        created_by="reeve",
    )

    # Format response
    time_str = parsed_time.strftime("%Y-%m-%d %H:%M:%S %Z")
    return (
        f"✓ Pulse scheduled successfully\n\n"
        f"Pulse ID: {pulse_id}\n"
        f"Scheduled: {time_str}\n"
        f"Priority: {priority} {_priority_emoji(priority)}\n"
        f"Prompt: {prompt[:100]}{'...' if len(prompt) > 100 else ''}"
    )


@app.tool()
async def list_upcoming_pulses(
    limit: Annotated[
        int,
        Field(
            description="Maximum number of pulses to return (default: 20, max: 100)",
            ge=1,
            le=100
        )
    ] = 20,
    include_completed: Annotated[
        bool,
        Field(
            description="Whether to include recently completed pulses (default: False, only show pending)"
        )
    ] = False,
) -> str:
    """
    List upcoming scheduled pulses.

    Use this to check Reeve's schedule and understand what tasks are coming up.
    Useful for:
    - Seeing what's on the agenda
    - Verifying a pulse was scheduled correctly
    - Detecting scheduling conflicts
    - Understanding workload distribution

    Examples:
        # Check what's coming up
        list_upcoming_pulses(limit=10)

        # Review recent history
        list_upcoming_pulses(limit=20, include_completed=True)

    Returns:
        Formatted list of pulses with time, priority, and prompt preview
    """
    statuses = [PulseStatus.PENDING]
    if include_completed:
        statuses.extend([PulseStatus.COMPLETED, PulseStatus.PROCESSING])

    pulses = await queue.get_upcoming_pulses(limit=limit, include_statuses=statuses)

    if not pulses:
        return "No upcoming pulses scheduled. The schedule is clear."

    # Format as a table
    lines = ["Upcoming Pulses:\n"]
    now = datetime.now(timezone.utc)

    for pulse in pulses:
        # Calculate time until pulse
        time_delta = pulse.scheduled_at - now
        if time_delta.total_seconds() < 0:
            time_str = "OVERDUE"
        elif time_delta.total_seconds() < 3600:
            time_str = f"in {int(time_delta.total_seconds() / 60)}m"
        elif time_delta.total_seconds() < 86400:
            time_str = f"in {int(time_delta.total_seconds() / 3600)}h"
        else:
            time_str = pulse.scheduled_at.strftime("%b %d %H:%M")

        emoji = _priority_emoji(pulse.priority.value)
        status_emoji = _status_emoji(pulse.status.value)
        prompt_preview = pulse.prompt[:60] + "..." if len(pulse.prompt) > 60 else pulse.prompt

        lines.append(
            f"{status_emoji} [{pulse.id:04d}] {emoji} {time_str:12s} | {prompt_preview}"
        )

    return "\n".join(lines)


@app.tool()
async def cancel_pulse(
    pulse_id: Annotated[
        int,
        Field(
            description="The ID of the pulse to cancel (from list_upcoming_pulses)",
            gt=0
        )
    ],
) -> str:
    """
    Cancel a scheduled pulse.

    Use this when a task is no longer needed or circumstances have changed.

    Examples:
        # User already handled the task manually
        cancel_pulse(pulse_id=42)

        # Event was cancelled
        cancel_pulse(pulse_id=123)

    Returns:
        Confirmation message or error if pulse couldn't be cancelled
    """
    success = await queue.cancel_pulse(pulse_id)

    if success:
        return f"✓ Pulse {pulse_id} cancelled successfully"
    else:
        return f"✗ Could not cancel pulse {pulse_id} (may be already completed or not found)"


@app.tool()
async def reschedule_pulse(
    pulse_id: Annotated[
        int,
        Field(
            description="The ID of the pulse to reschedule (from list_upcoming_pulses)",
            gt=0
        )
    ],
    new_scheduled_at: Annotated[
        str,
        Field(
            description=(
                "New execution time. Accepts same formats as schedule_pulse:\n"
                "- ISO 8601: '2026-01-20T09:00:00Z'\n"
                "- Relative: 'in 2 hours', 'tomorrow at 9am'\n"
                "- Keywords: 'tonight', 'tomorrow morning'"
            ),
            examples=["in 2 hours", "tomorrow at 9am", "2026-01-20T15:00:00Z"]
        )
    ],
) -> str:
    """
    Reschedule a pulse to a different time.

    Use this when timing needs to change but the task itself remains relevant.

    Examples:
        # Postpone to tomorrow
        reschedule_pulse(pulse_id=42, new_scheduled_at="tomorrow at 9am")

        # Move earlier
        reschedule_pulse(pulse_id=123, new_scheduled_at="in 30 minutes")

    Returns:
        Confirmation message with old and new times
    """
    parsed_time = _parse_time_string(new_scheduled_at)
    success = await queue.reschedule_pulse(pulse_id, parsed_time)

    if success:
        time_str = parsed_time.strftime("%Y-%m-%d %H:%M:%S %Z")
        return f"✓ Pulse {pulse_id} rescheduled to {time_str}"
    else:
        return f"✗ Could not reschedule pulse {pulse_id} (may be already completed or not found)"


# ============================================================================
# Helper Functions
# ============================================================================

def _parse_time_string(time_str: str) -> datetime:
    """
    Parse a flexible time string into a UTC datetime.

    Supports:
    - ISO 8601: "2026-01-20T09:00:00Z"
    - Relative: "in 2 hours", "in 30 minutes"
    - Keywords: "now", "tonight", "tomorrow morning"

    TODO: Integrate with a proper NLP library (dateparser, parsedatetime)
    For now, implement basic cases.
    """
    time_str = time_str.strip().lower()

    # Keyword: "now"
    if time_str == "now":
        return datetime.now(timezone.utc)

    # ISO 8601
    if "T" in time_str or time_str.endswith("Z"):
        return datetime.fromisoformat(time_str.replace("Z", "+00:00"))

    # Relative: "in X hours/minutes"
    if time_str.startswith("in "):
        parts = time_str[3:].split()
        if len(parts) == 2:
            amount = int(parts[0])
            unit = parts[1].rstrip("s")  # "hours" -> "hour"

            if unit == "minute":
                return datetime.now(timezone.utc) + timedelta(minutes=amount)
            elif unit == "hour":
                return datetime.now(timezone.utc) + timedelta(hours=amount)
            elif unit == "day":
                return datetime.now(timezone.utc) + timedelta(days=amount)

    # Fallback: raise error for unimplemented formats
    raise ValueError(
        f"Could not parse time string: '{time_str}'. "
        f"Supported formats: ISO 8601, 'now', 'in X hours/minutes/days'"
    )


def _priority_emoji(priority: str) -> str:
    """Map priority to emoji for visual scanning."""
    return {
        "critical": "🚨",
        "high": "🔔",
        "normal": "⏰",
        "low": "📋",
        "deferred": "🕐",
    }.get(priority, "")


def _status_emoji(status: str) -> str:
    """Map status to emoji for visual scanning."""
    return {
        "pending": "⏳",
        "processing": "⚙️",
        "completed": "✅",
        "failed": "❌",
        "cancelled": "🚫",
    }.get(status, "")


# ============================================================================
# Server Entry Point
# ============================================================================

async def main():
    """Run the MCP server on stdio."""
    async with stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            app.create_initialization_options()
        )


if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

MCP Server 2: Telegram Notification Server

Module: src/reeve/mcp/notification_server.py

Purpose: Allow Reeve to send push notifications to the user via Telegram.

Connection Type: stdio (spawned on-demand by Reeve)

Implementation

"""
MCP Server for Telegram Notifications

This server exposes tools that allow Reeve to send push notifications to the
user via Telegram. This is Reeve's "voice" - how it communicates proactively.

Environment Variables:
    TELEGRAM_BOT_TOKEN: Telegram bot token (required)
    TELEGRAM_CHAT_ID: User's Telegram chat ID (required)
    HAPI_BASE_URL: Base URL for Hapi sessions (optional, defaults to https://hapi.run)

Usage:
    Configure in ~/.config/claude-code/mcp_config.json:
    {
      "mcpServers": {
        "telegram-notifier": {
          "command": "uv",
          "args": ["run", "--directory", "/path/to/reeve-bot", "python", "-m", "reeve.mcp.notification_server"],
          "env": {
            "TELEGRAM_BOT_TOKEN": "your_bot_token",
            "TELEGRAM_CHAT_ID": "your_chat_id",
            "HAPI_BASE_URL": "https://hapi.run"
          }
        }
      }
    }
"""

import os
from typing import Annotated, Literal

import requests
from mcp.server.fastmcp import FastMCP, Context
from pydantic import Field

# Initialize the MCP server
mcp = FastMCP("telegram-notifier")

# Telegram Bot Configuration
BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")  # The user's chat ID
HAPI_BASE_URL = os.getenv("HAPI_BASE_URL", "https://hapi.run")

if not BOT_TOKEN or not CHAT_ID:
    raise ValueError("TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID environment variables are required")


# ============================================================================
# Tool Definitions
# ============================================================================

@mcp.tool()
async def send_notification(
    ctx: Context,
    message: Annotated[
        str,
        Field(
            description=(
                "The notification message to send (up to 4096 characters). "
                "Keep it concise and actionable - this is a push notification.\n\n"
                "Good examples:\n"
                "- '🔔 Powder Alert: 18 inches forecast for Mammoth this weekend'\n"
                "- '✓ Daily briefing complete. 3 meetings today, 2 high-priority emails.'\n"
                "- '🚨 Flight UA123 delayed 2 hours. New departure: 10:30 AM'\n\n"
                "If parse_mode is set, you can use formatting:\n"
                "- MarkdownV2: *bold*, _italic_, `code`, [link](url)\n"
                "- HTML: <b>bold</b>, <i>italic</i>, <code>code</code>, <a href='url'>link</a>"
            ),
            min_length=1,
            max_length=4096,
        ),
    ],
    priority: Annotated[
        Literal["silent", "normal", "critical"],
        Field(
            description=(
                "Notification priority level:\n"
                "- 'silent' (🔕): No alert, just logs to chat (no sound/vibration)\n"
                "- 'normal' (🔔): Standard push notification with sound (default)\n"
                "- 'critical' (🚨): High-priority alert with sound\n\n"
                "This controls both notification behavior and routing."
            ),
        ),
    ] = "normal",
    parse_mode: Annotated[
        Literal["MarkdownV2", "HTML", "Markdown"] | None,
        Field(
            description=(
                "Optional message formatting mode:\n"
                "- 'MarkdownV2': Markdown formatting (recommended) - *bold*, _italic_, `code`\n"
                "- 'HTML': HTML formatting - <b>bold</b>, <i>italic</i>, <code>code</code>\n"
                "- 'Markdown': Legacy Markdown (deprecated, use MarkdownV2)\n"
                "- None: Plain text (default)\n\n"
                "Use MarkdownV2 for rich notifications, None for simple alerts."
            ),
        ),
    ] = None,
) -> str:
    """
    Send a push notification to the user via Telegram.

    This is Reeve's primary communication channel with the user. Use this to:
    - Alert about important events (flight delays, weather alerts, etc.)
    - Provide task completion updates
    - Request user input or decisions
    - Share summaries and insights

    The tool automatically includes a "View in Claude Code" button linking to the current
    session, so the user can quickly jump back to the conversation context.

    Priority levels control notification behavior:
    - silent: No sound/vibration (for background updates)
    - normal: Standard notification with sound (default)
    - critical: High-priority alert with sound

    When to use:
    - Proactive alerts: "Something happened you should know about"
    - Task updates: "I finished X, here's the result"
    - Requests: "I need your input on Y"

    When NOT to use:
    - Responding to user messages (they're already in the chat)
    - Logging/debugging (use internal logs instead)
    - High-frequency updates (batch them into summaries)

    Examples:
        # Simple alert with auto-generated Claude Code link
        send_notification(
            message="✓ Daily briefing complete. 3 meetings today."
        )

        # Formatted urgent alert
        send_notification(
            message="*URGENT*: Flight UA123 delayed 2 hours\\nNew departure: 10:30 AM",
            parse_mode="MarkdownV2",
            priority="critical"
        )

        # Silent background update
        send_notification(
            message="📋 Archived 47 old notes to Diary/2026-01/",
            priority="silent"
        )

    Returns:
        Confirmation message or error details
    """
    try:
        # Determine notification sound based on priority
        disable_notification = priority == "silent"

        # Auto-generate Hapi URL from session ID
        session_link_url = None
        try:
            session_id = ctx.session_id
            session_link_url = f"{HAPI_BASE_URL}/sessions/{session_id}"
        except (RuntimeError, AttributeError):
            # Session ID not available - no link button
            pass

        # Send via Telegram Bot API
        url = f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage"
        payload = {
            "chat_id": CHAT_ID,
            "text": message,
            "disable_notification": disable_notification,
        }

        if parse_mode:
            payload["parse_mode"] = parse_mode

        # Add session link button if available
        if session_link_url:
            reply_markup = {
                "inline_keyboard": [[{"text": "View in Claude Code", "url": session_link_url}]]
            }
            payload["reply_markup"] = reply_markup

        response = requests.post(url, json=payload, timeout=10)
        response.raise_for_status()

        link_info = " with link" if session_link_url else ""
        return f"✓ Notification{link_info} sent successfully ({priority})"

    except requests.exceptions.RequestException as e:
        return f"✗ Failed to send notification: {str(e)}"


# ============================================================================
# Server Entry Point
# ============================================================================

if __name__ == "__main__":
    mcp.run()

Configuration

MCP Client Configuration

Add to ~/.config/claude-code/mcp_config.json:

{
  "mcpServers": {
    "pulse-queue": {
      "command": "uv",
      "args": [
        "run",
        "--directory",
        "/home/reuben/workspace/reeve-bot",
        "python",
        "-m",
        "reeve.mcp.pulse_server"
      ],
      "env": {
        "PULSE_DB_PATH": "/home/reuben/.reeve/pulse_queue.db"
      }
    },
    "telegram-notifier": {
      "command": "uv",
      "args": [
        "run",
        "--directory",
        "/home/reuben/workspace/reeve-bot",
        "python",
        "-m",
        "reeve.mcp.notification_server"
      ],
      "env": {
        "TELEGRAM_BOT_TOKEN": "your_bot_token_here",
        "TELEGRAM_CHAT_ID": "your_chat_id_here",
        "HAPI_BASE_URL": "https://hapi.run"
      }
    }
  }
}

Security Note: The bot token is sensitive. Consider using a secrets manager or environment variable injection instead of hardcoding in the config.

Environment Variables

Required: - TELEGRAM_BOT_TOKEN: Bot API token from @BotFather - TELEGRAM_CHAT_ID: User's chat ID (get from /start message)

Optional: - PULSE_DB_PATH: Path to SQLite database (default: ~/.reeve/pulse_queue.db) - HAPI_BASE_URL: Base URL for Hapi sessions (default: https://hapi.run)

Design Principles

1. Type Safety with Pydantic

All tool parameters use Annotated[Type, Field(...)] to provide: - Runtime validation (Pydantic) - IDE autocomplete - Auto-generated MCP documentation - Clear error messages

2. Comprehensive Documentation

Each tool has: - Docstring: High-level purpose and usage - Field descriptions: Detailed parameter explanations - Examples: Concrete use cases - When to use / When NOT to use: Decision guidance

This helps Reeve (Claude) make intelligent decisions about when to call tools.

3. User-Friendly Formatting

  • Emojis: Visual priority/status indicators
  • Relative times: "in 2 hours" vs. ISO timestamps
  • Concise output: Essential info only, no clutter

4. Error Handling

  • Graceful failures: Return error strings, don't raise exceptions
  • Actionable messages: "Could not cancel pulse (not found)" vs. "Error 404"
  • Success confirmation: Always confirm what happened

Testing MCP Tools

Manual Testing

# Terminal 1: Start MCP server
uv run python -m reeve.mcp.pulse_server

# Terminal 2: Send test input (MCP JSON-RPC protocol)
echo '{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/call",
  "params": {
    "name": "schedule_pulse",
    "arguments": {
      "scheduled_at": "in 5 minutes",
      "prompt": "Test pulse",
      "priority": "normal"
    }
  }
}' | uv run python -m reeve.mcp.pulse_server

Integration Testing

Use the Hapi/Claude Code test environment to call tools and verify behavior.

Next Steps

See deployment.md for production deployment, systemd configuration, and monitoring setup.