Pulse Queue Design¶
Overview¶
The Pulse Queue is the core scheduling mechanism that enables Reeve's proactive behavior. It manages when and why Reeve should "wake up" to perform tasks, check for updates, or process information.
Core Concepts¶
1. Pulse¶
A Pulse represents a single scheduled wake-up event for Reeve. Each pulse contains:
- When to wake up (scheduled_at)
- Why to wake up (prompt: the context/task)
- How urgent it is (priority)
- What context to resume (session_id, sticky_notes)
- Current state (status)
2. Priority System¶
Pulses have different priority levels to manage Reeve's attention:
from enum import Enum


class PulsePriority(str, Enum):
    """
    Priority levels for pulses, determining execution order when multiple
    pulses are due at the same time.

    Higher priority pulses are processed first. Priority also affects
    interrupt behavior and user notification urgency.
    """
    CRITICAL = "critical"  # 🚨 Emergency: User messages, system failures
    # - Interrupts deep work
    # - Bypasses DND settings
    # - Example: "Wife texted: Emergency"
    HIGH = "high"  # 🔔 Important: External events, user-facing tasks
    # - Scheduled alarms the user set
    # - External triggers (Telegram, Email)
    # - Example: "Check flight status before departure"
    NORMAL = "normal"  # ⏰ Standard: Regular maintenance, scheduled checks
    # - Periodic pulses (hourly check)
    # - Calendar event reminders
    # - Example: "Daily 9 AM: Review calendar"
    LOW = "low"  # 📋 Background: Non-urgent maintenance
    # - Cleanup tasks
    # - Background research
    # - Example: "Weekly: Archive old notes"
    DEFERRED = "deferred"  # 🕐 Postponed: Intentionally delayed
    # - User explicitly snoozed a task
    # - Rescheduled due to conflicts
    # - Example: "Moved from Monday to Friday"
Design Rationale:
- String Enum: JSON-serializable, human-readable in DB
- Explicit semantics: Each level has clear use cases
- Emoji convention: Visual scanning in logs/UI
- 5 levels: Enough granularity without decision paralysis
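The docstring above says higher-priority pulses run first, but a string enum carries no built-in urgency order: compared as plain strings, "deferred" sorts before "high". The following is a minimal sketch of one way to make the order explicit. `PRIORITY_RANK` and `sort_due` are hypothetical names introduced for illustration, and pulses are reduced to `(priority, scheduled_at)` pairs.

```python
from enum import Enum


class PulsePriority(str, Enum):
    CRITICAL = "critical"
    HIGH = "high"
    NORMAL = "normal"
    LOW = "low"
    DEFERRED = "deferred"


# Hypothetical helper: rank derived from declaration order (0 = most urgent).
PRIORITY_RANK = {level: rank for rank, level in enumerate(PulsePriority)}


def sort_due(pulses):
    """Order (priority, scheduled_at) pairs: most urgent first, then oldest first."""
    return sorted(pulses, key=lambda p: (PRIORITY_RANK[p[0]], p[1]))


due = [
    (PulsePriority.LOW, 1),
    (PulsePriority.DEFERRED, 0),
    (PulsePriority.CRITICAL, 5),
    (PulsePriority.HIGH, 2),
    (PulsePriority.HIGH, 1),
]
ordered = sort_due(due)

# For contrast: plain string order puts "deferred" ahead of "high".
alphabetical = sorted(level.value for level in PulsePriority)
```

Deriving the rank from declaration order keeps the enum as the single source of truth; reordering the members changes the execution order.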
3. Pulse Status Lifecycle¶
class PulseStatus(str, Enum):
    """
    Status of a pulse in its lifecycle.

    State transitions:
        PENDING -> PROCESSING -> COMPLETED (success)
                              -> FAILED (error; retried as a new PENDING pulse
                                         while retries remain)
        PENDING -> CANCELLED (user/system cancelled)
    """
    PENDING = "pending"        # Waiting to be executed
    PROCESSING = "processing"  # Currently executing
    COMPLETED = "completed"    # Successfully executed
    FAILED = "failed"          # Execution failed (see error_message)
    CANCELLED = "cancelled"    # Manually cancelled by user/system
Design Rationale:
- Simple states: Easy to reason about, no complex state machine
- Processing state: Prevents duplicate execution (idempotency)
- Failed vs Cancelled: Failed = retry possible, Cancelled = intentional stop
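The lifecycle above can be written down as a small transition table. This is a sketch inferred from the states described in this section; `ALLOWED_TRANSITIONS` and `can_transition` are hypothetical names and are not part of the queue API.

```python
from enum import Enum


class PulseStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


# Hypothetical transition table inferred from the lifecycle described above.
ALLOWED_TRANSITIONS = {
    PulseStatus.PENDING: {PulseStatus.PROCESSING, PulseStatus.CANCELLED},
    PulseStatus.PROCESSING: {PulseStatus.COMPLETED, PulseStatus.FAILED},
    PulseStatus.COMPLETED: set(),
    PulseStatus.FAILED: set(),  # a retry is a new PENDING pulse, not a transition
    PulseStatus.CANCELLED: set(),
}


def can_transition(current: PulseStatus, target: PulseStatus) -> bool:
    """Return True if moving from current to target is a legal step."""
    return target in ALLOWED_TRANSITIONS[current]
```

Terminal states map to an empty set, which makes the "FAILED stays FAILED, the retry is a separate pulse" rule visible in one place.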
%%{init: {'theme': 'base', 'themeVariables': { 'lineColor': '#888'}}}%%
flowchart TD
create(["schedule_pulse()"]) --> pending["PENDING"]
pending --> |"Daemon detects due pulse"| processing
processing["PROCESSING"]
processing --> |"Executor runs"| result{Result?}
result --> |Success| completed["COMPLETED"]
result --> |Failure| failed["FAILED"]
failed --> retry{"retry_count < max?"}
retry --> |Yes| new_pending["New PENDING<br/>(exponential backoff)"]
retry --> |No| remains["Remains FAILED"]
cancel(["cancel_pulse()"]) -.-> |"While PENDING"| cancelled["CANCELLED"]
pending -.-> cancelled
%% Styles
classDef status fill:#4a90a4,stroke:#2e6b7a,color:#fff
classDef success fill:#6b9b76,stroke:#4a7a54,color:#fff
classDef failure fill:#c4955a,stroke:#a67940,color:#fff
classDef action fill:#888,stroke:#666,color:#fff
class pending,processing status
class completed,new_pending success
class failed,remains,cancelled failure
class create,cancel,result,retry action
Database Schema¶
Pulse Model¶
from sqlalchemy import Column, Integer, String, DateTime, Text, Enum as SQLEnum, JSON, Index
from sqlalchemy.orm import declarative_base
from sqlalchemy.sql import func
from datetime import datetime
from typing import Optional, List

# PulsePriority and PulseStatus are the enums defined in Core Concepts.
Base = declarative_base()
class Pulse(Base):
    """
    Represents a scheduled wake-up event for Reeve.

    A pulse is the fundamental unit of Reeve's proactive behavior. It defines
    when Reeve should wake up, what it should think about, and with what urgency.

    Pulses can be:
    - Periodic (e.g., hourly heartbeat checks)
    - Aperiodic (e.g., "check ticket prices at 6:45 AM tomorrow")
    - Event-triggered (e.g., external Telegram message arrives)

    Once a pulse executes, it launches a Hapi session with the pulse's prompt
    as the initial context, allowing Reeve to take action.
    """
    __tablename__ = "pulses"

    # Primary Key
    id = Column(Integer, primary_key=True, autoincrement=True)

    # Scheduling Information
    scheduled_at = Column(
        DateTime(timezone=True),
        nullable=False,
        index=True,
        comment="When this pulse should execute (UTC timestamp)"
    )

    # Execution Context
    prompt = Column(
        Text,
        nullable=False,
        comment="The instruction/context for Reeve when this pulse fires. "
                "This becomes the initial message in the Hapi session."
    )
    priority = Column(
        SQLEnum(PulsePriority),
        nullable=False,
        default=PulsePriority.NORMAL,
        index=True,
        comment="Execution priority (determines order when multiple pulses are due)"
    )

    # Session Continuity (Optional)
    session_id = Column(
        String(500),
        nullable=True,
        comment="Optional Hapi session ID or URL to resume existing context. "
                "If None, a new session is created."
    )
    sticky_notes = Column(
        JSON,
        nullable=True,
        comment="Optional list of reminder strings to inject into the prompt. "
                "Example: ['Check if user replied to ski trip', 'Follow up on PR review']"
    )

    # Execution State
    status = Column(
        SQLEnum(PulseStatus),
        nullable=False,
        default=PulseStatus.PENDING,
        index=True,
        comment="Current execution status of this pulse"
    )

    # Execution Results (populated after execution)
    executed_at = Column(
        DateTime(timezone=True),
        nullable=True,
        comment="When this pulse actually executed (may differ from scheduled_at)"
    )
    execution_duration_ms = Column(
        Integer,
        nullable=True,
        comment="How long the Hapi session took to complete (milliseconds)"
    )
    error_message = Column(
        Text,
        nullable=True,
        comment="Error message if status=FAILED. Used for debugging and retry logic."
    )

    # Retry Logic
    retry_count = Column(
        Integer,
        nullable=False,
        default=0,
        comment="Number of times this pulse has been retried after failure"
    )
    max_retries = Column(
        Integer,
        nullable=False,
        default=3,
        comment="Maximum retry attempts before giving up"
    )

    # Metadata
    created_at = Column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
        comment="When this pulse was created (for auditing)"
    )
    created_by = Column(
        String(100),
        nullable=False,
        default="system",
        comment="Who/what created this pulse. Examples: 'reeve', 'telegram_listener', 'user_cli'"
    )
    tags = Column(
        JSON,
        nullable=True,
        comment="Optional tags for categorization/filtering. "
                "Example: ['hourly_check', 'calendar_sync', 'snowboarding']"
    )

    # Database Indexes (in addition to individual column indexes)
    # Composite indexes for the main query patterns
    __table_args__ = (
        # Most common query: "Get all pending pulses due before now, ordered by priority"
        Index('idx_pulse_execution', 'status', 'scheduled_at', 'priority'),
        # For listing upcoming pulses: "What's on Reeve's schedule?"
        Index('idx_pulse_upcoming', 'scheduled_at', 'status'),
    )

    def __repr__(self):
        return (f"<Pulse(id={self.id}, scheduled_at={self.scheduled_at}, "
                f"priority={self.priority.value}, status={self.status.value})>")
Alembic Migration Strategy¶
Initial Migration (versions/001_create_pulses_table.py):
"""Create pulses table

Revision ID: 001
Revises:
Create Date: 2026-01-19
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import sqlite

revision = '001'
down_revision = None
branch_labels = None
depends_on = None


def upgrade():
    op.create_table(
        'pulses',
        # ... (full schema as defined above)
    )


def downgrade():
    op.drop_table('pulses')
Why Alembic?
- Version-controlled schema changes
- Safe migrations in production
- Rollback capability
- Explicit upgrade path
Queue Management Logic¶
PulseQueue Class¶
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy import select, and_, or_
from datetime import datetime, timedelta, timezone  # timedelta is used by mark_failed
from typing import List, Optional
import asyncio
class PulseQueue:
    """
    Manages the pulse queue: scheduling, retrieval, and execution tracking.

    This is the core business logic layer for pulse management. It provides
    a clean API for the MCP server, HTTP API, and daemon scheduler to interact
    with the queue.

    All database operations are async so they do not block the event loop.
    """

    def __init__(self, db_url: str):
        """
        Initialize the pulse queue with a database connection.

        Args:
            db_url: SQLAlchemy database URL. Examples:
                - "sqlite+aiosqlite:///~/.reeve/pulse_queue.db" (async SQLite)
                - "postgresql+asyncpg://user:pass@localhost/reeve" (async Postgres)
                Note: SQLite does not expand "~"; resolve it first
                (e.g. with os.path.expanduser) and pass an absolute path.
        """
        self.engine = create_async_engine(db_url, echo=False)
        self.SessionLocal = async_sessionmaker(
            self.engine,
            class_=AsyncSession,
            expire_on_commit=False
        )

    async def schedule_pulse(
        self,
        scheduled_at: datetime,
        prompt: str,
        priority: PulsePriority = PulsePriority.NORMAL,
        session_id: Optional[str] = None,
        sticky_notes: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
        created_by: str = "system",
        max_retries: int = 3
    ) -> int:
        """
        Schedule a new pulse.

        Args:
            scheduled_at: When to execute (UTC timezone-aware datetime)
            prompt: The instruction/context for Reeve
            priority: Urgency level (default: NORMAL)
            session_id: Optional Hapi session to resume
            sticky_notes: Optional reminder strings to inject
            tags: Optional categorization tags
            created_by: Who created this pulse (for auditing)
            max_retries: Max retry attempts on failure

        Returns:
            The pulse ID (integer)

        Example:
            pulse_id = await queue.schedule_pulse(
                scheduled_at=datetime(2026, 1, 20, 9, 0, tzinfo=timezone.utc),
                prompt="Daily morning briefing: Review calendar and tasks",
                priority=PulsePriority.NORMAL,
                tags=["daily", "morning_routine"]
            )
        """
        async with self.SessionLocal() as session:
            pulse = Pulse(
                scheduled_at=scheduled_at,
                prompt=prompt,
                priority=priority,
                session_id=session_id,
                sticky_notes=sticky_notes,
                tags=tags,
                created_by=created_by,
                max_retries=max_retries,
                status=PulseStatus.PENDING
            )
            session.add(pulse)
            await session.commit()
            await session.refresh(pulse)
            return pulse.id

    async def get_due_pulses(self, limit: int = 10) -> List[Pulse]:
        """
        Get pulses that are due for execution.

        Returns pulses where:
        - scheduled_at <= now
        - status = PENDING
        - Ordered by: priority (CRITICAL first), then scheduled_at ASC

        This ensures high-priority pulses execute first, and among same-priority
        pulses, older ones execute first (FIFO).

        Args:
            limit: Maximum number of pulses to return

        Returns:
            List of Pulse objects ready for execution
        """
        async with self.SessionLocal() as session:
            now = datetime.now(timezone.utc)
            stmt = (
                select(Pulse)
                .where(
                    and_(
                        Pulse.scheduled_at <= now,
                        Pulse.status == PulseStatus.PENDING
                    )
                )
                .order_by(
                    # Without a native ENUM type (e.g. on SQLite) the column is
                    # stored as a string and sorts alphabetically (DEFERRED
                    # before HIGH), so rank each level explicitly: a matching
                    # row yields TRUE, and DESC puts TRUE first.
                    (Pulse.priority == PulsePriority.CRITICAL).desc(),
                    (Pulse.priority == PulsePriority.HIGH).desc(),
                    (Pulse.priority == PulsePriority.NORMAL).desc(),
                    (Pulse.priority == PulsePriority.LOW).desc(),
                    # Then by time (oldest first)
                    Pulse.scheduled_at
                )
                .limit(limit)
            )
            result = await session.execute(stmt)
            return list(result.scalars().all())

    async def get_upcoming_pulses(
        self,
        limit: int = 20,
        include_statuses: Optional[List[PulseStatus]] = None
    ) -> List[Pulse]:
        """
        Get upcoming scheduled pulses (for visibility/introspection).

        Args:
            limit: Maximum number of pulses to return
            include_statuses: Filter by status (default: [PENDING])

        Returns:
            List of Pulse objects ordered by scheduled_at
        """
        if include_statuses is None:
            include_statuses = [PulseStatus.PENDING]
        async with self.SessionLocal() as session:
            stmt = (
                select(Pulse)
                .where(Pulse.status.in_(include_statuses))
                .order_by(Pulse.scheduled_at)
                .limit(limit)
            )
            result = await session.execute(stmt)
            return list(result.scalars().all())

    async def mark_processing(self, pulse_id: int) -> bool:
        """
        Mark a pulse as currently processing (prevents duplicate execution).

        Args:
            pulse_id: The pulse to mark

        Returns:
            True if successfully marked, False if pulse was already processing/completed
        """
        async with self.SessionLocal() as session:
            pulse = await session.get(Pulse, pulse_id)
            if not pulse or pulse.status != PulseStatus.PENDING:
                return False
            pulse.status = PulseStatus.PROCESSING
            await session.commit()
            return True

    async def mark_completed(
        self,
        pulse_id: int,
        execution_duration_ms: int
    ) -> None:
        """
        Mark a pulse as successfully completed.

        Args:
            pulse_id: The pulse to mark
            execution_duration_ms: How long execution took
        """
        async with self.SessionLocal() as session:
            pulse = await session.get(Pulse, pulse_id)
            if pulse:
                pulse.status = PulseStatus.COMPLETED
                pulse.executed_at = datetime.now(timezone.utc)
                pulse.execution_duration_ms = execution_duration_ms
                await session.commit()

    async def mark_failed(
        self,
        pulse_id: int,
        error_message: str,
        should_retry: bool = True
    ) -> Optional[int]:
        """
        Mark a pulse as failed.

        If retry is enabled and max_retries not exceeded, schedules a new
        retry pulse.

        Args:
            pulse_id: The pulse to mark as failed
            error_message: Description of the failure
            should_retry: Whether to attempt retry

        Returns:
            New pulse ID if retried, None otherwise
        """
        async with self.SessionLocal() as session:
            pulse = await session.get(Pulse, pulse_id)
            if not pulse:
                return None
            pulse.status = PulseStatus.FAILED
            pulse.error_message = error_message
            pulse.executed_at = datetime.now(timezone.utc)

            # Retry logic
            new_pulse_id = None
            if should_retry and pulse.retry_count < pulse.max_retries:
                # Schedule retry with exponential backoff: 2^retry_count minutes
                retry_delay_minutes = 2 ** pulse.retry_count
                retry_at = datetime.now(timezone.utc) + timedelta(minutes=retry_delay_minutes)
                retry_pulse = Pulse(
                    scheduled_at=retry_at,
                    prompt=pulse.prompt,
                    priority=pulse.priority,
                    session_id=pulse.session_id,
                    sticky_notes=pulse.sticky_notes,
                    tags=pulse.tags,
                    created_by=f"retry_{pulse.created_by}",
                    max_retries=pulse.max_retries,
                    retry_count=pulse.retry_count + 1,
                    status=PulseStatus.PENDING
                )
                session.add(retry_pulse)
                await session.flush()
                new_pulse_id = retry_pulse.id
            await session.commit()
            return new_pulse_id

    async def cancel_pulse(self, pulse_id: int) -> bool:
        """
        Cancel a pending pulse.

        Args:
            pulse_id: The pulse to cancel

        Returns:
            True if cancelled, False if pulse was not in cancellable state
        """
        async with self.SessionLocal() as session:
            pulse = await session.get(Pulse, pulse_id)
            if not pulse or pulse.status not in [PulseStatus.PENDING]:
                return False
            pulse.status = PulseStatus.CANCELLED
            await session.commit()
            return True

    async def reschedule_pulse(
        self,
        pulse_id: int,
        new_scheduled_at: datetime
    ) -> bool:
        """
        Reschedule a pending pulse to a different time.

        Args:
            pulse_id: The pulse to reschedule
            new_scheduled_at: New execution time

        Returns:
            True if rescheduled, False if pulse was not pending
        """
        async with self.SessionLocal() as session:
            pulse = await session.get(Pulse, pulse_id)
            if not pulse or pulse.status != PulseStatus.PENDING:
                return False
            pulse.scheduled_at = new_scheduled_at
            await session.commit()
            return True
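One caveat about mark_processing: it reads the row and then writes it, so if more than one worker ever polled the same queue, two of them could both observe PENDING before either commits. A single conditional UPDATE closes that window. The sketch below demonstrates the idea with the standard-library `sqlite3` module rather than SQLAlchemy; the two-column table and `claim_pulse` are hypothetical simplifications, not the project's code.

```python
import sqlite3


def claim_pulse(conn: sqlite3.Connection, pulse_id: int) -> bool:
    """Flip PENDING -> PROCESSING in one statement; True only for the winner."""
    cur = conn.execute(
        "UPDATE pulses SET status = 'PROCESSING' "
        "WHERE id = ? AND status = 'PENDING'",
        (pulse_id,),
    )
    conn.commit()
    # rowcount is 1 if this call changed the row, 0 if someone else got there first.
    return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE pulses (id INTEGER PRIMARY KEY, status TEXT NOT NULL)")
conn.execute("INSERT INTO pulses (id, status) VALUES (1, 'PENDING')")
conn.commit()

first_claim = claim_pulse(conn, 1)   # claims the pulse
second_claim = claim_pulse(conn, 1)  # finds it already PROCESSING
```

With the sequential, single-daemon loop described later in this document, the read-then-write version is likely sufficient; the conditional update matters once several workers share the queue.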
Design Decisions & Rationale¶
1. Why Async SQLAlchemy?¶
Decision: Use AsyncSession and async_sessionmaker
Rationale:
- The daemon runs a single asyncio event loop that hosts the MCP server, the HTTP API, and the scheduler concurrently
- A blocking DB call would stall that entire event loop
- Async calls let many operations wait on I/O at the same time without extra threads
- Leaves headroom for higher-throughput scenarios
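To illustrate the point about not stalling the event loop, here is a small standard-library sketch. `fake_db_call` and `scheduler_ticks` are hypothetical stand-ins for a database query and the scheduler; `asyncio.sleep` plays the role of non-blocking I/O.

```python
import asyncio


async def fake_db_call(log):
    log.append("db:start")
    await asyncio.sleep(0.05)  # non-blocking wait: control returns to the loop
    log.append("db:done")


async def scheduler_ticks(log, n=3):
    for i in range(n):
        log.append(f"tick:{i}")
        await asyncio.sleep(0.005)


async def main():
    log = []
    # Both coroutines share one event loop; the ticks keep running
    # while the simulated DB call is waiting.
    await asyncio.gather(fake_db_call(log), scheduler_ticks(log))
    return log


events = asyncio.run(main())
```

If the simulated query used a blocking call such as `time.sleep` instead, no tick could run until it returned, which is the stall the rationale warns about.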
2. Why String Enums?¶
Decision: class PulsePriority(str, Enum)
Rationale:
- JSON-serializable (for API responses, MCP tool returns)
- Human-readable in database (SQLite browser shows "high" not 2)
- Type-safe in Python code
- Better error messages ("got 'urgent', expected 'high'")
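A short sketch of the first and last points, using only the standard library. The enum is repeated here so the snippet runs on its own; the payload shape is an assumption for illustration.

```python
import json
from enum import Enum


class PulsePriority(str, Enum):
    CRITICAL = "critical"
    HIGH = "high"
    NORMAL = "normal"
    LOW = "low"
    DEFERRED = "deferred"


# A str-based enum member serializes as its plain string value.
payload = json.dumps({"priority": PulsePriority.HIGH})

# Parsing goes back through the enum constructor, which validates the value.
parsed = PulsePriority(json.loads(payload)["priority"])

# An unknown value is rejected with a ValueError that names the bad input.
try:
    PulsePriority("urgent")
    error_text = ""
except ValueError as exc:
    error_text = str(exc)
```

Note that the exact wording of the error comes from Python's `enum` module, so it may differ from the illustrative message quoted in the rationale.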
3. Why Composite Indexes?¶
Decision: Index('idx_pulse_execution', 'status', 'scheduled_at', 'priority')
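Rationale:
- The main query (get_due_pulses) filters by status + scheduled_at, then sorts by priority
- The composite index lets the database locate matching rows directly instead of scanning the whole table; the sort by priority then runs over that small result set
- SQLite can use the leftmost prefix of the index for other queries (for example, filtering by status alone)
- Tradeoff: Slightly slower writes (index maintenance) in exchange for faster reads; the actual speedup depends on table size and should be measured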
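One way to check how the index is used is to ask SQLite for its query plan. The sketch below uses the standard-library `sqlite3` module with a cut-down, hypothetical version of the pulses table (text columns only) and a hand-written query resembling the one get_due_pulses issues; the exact plan text varies between SQLite versions, so the snippet only looks for the index name.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE pulses ("
    " id INTEGER PRIMARY KEY,"
    " status TEXT NOT NULL,"
    " scheduled_at TEXT NOT NULL,"
    " priority TEXT NOT NULL)"
)
conn.execute(
    "CREATE INDEX idx_pulse_execution ON pulses (status, scheduled_at, priority)"
)

plan_rows = conn.execute(
    "EXPLAIN QUERY PLAN "
    "SELECT id FROM pulses "
    "WHERE status = 'PENDING' AND scheduled_at <= ? "
    "ORDER BY priority, scheduled_at LIMIT 10",
    ("2026-01-20T09:00:00+00:00",),
).fetchall()

# The last column of each row is a human-readable description of one plan step.
plan_details = [row[-1] for row in plan_rows]
uses_index = any("idx_pulse_execution" in detail for detail in plan_details)
```

Printing `plan_details` on a real database is a quick way to confirm whether the planner searches the index and whether a separate sort step remains.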
4. Why Retry Logic in DB?¶
Decision: Store retry_count and max_retries in Pulse model
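Rationale:
- Self-contained: Each pulse knows its own retry policy
- Exponential backoff: the delay is 2^retry_count minutes, so 1min → 2min → 4min with the default max_retries=3 (an 8min step appears only when max_retries is 4 or more)
- Failure forensics: Can query "which pulses failed repeatedly?"
- Future: Could add per-priority retry policies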
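The backoff arithmetic in mark_failed can be checked in isolation. In this sketch `retry_delays` is a hypothetical helper that lists the delay applied before each retry, using the same 2^retry_count formula and the same `retry_count < max_retries` condition.

```python
from datetime import timedelta
from typing import List


def retry_delays(max_retries: int) -> List[timedelta]:
    """Delay before each retry: retry_count runs from 0 to max_retries - 1."""
    return [timedelta(minutes=2 ** retry_count) for retry_count in range(max_retries)]


def as_minutes(delays: List[timedelta]) -> List[int]:
    return [int(d.total_seconds() // 60) for d in delays]


default_schedule = as_minutes(retry_delays(3))   # default max_retries
extended_schedule = as_minutes(retry_delays(4))  # one extra retry allowed
```

With the default of three retries the last attempt is scheduled about seven minutes after the first failure (1 + 2 + 4), ignoring execution time.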
5. Why Separate created_by Field?¶
Decision: Track who/what created each pulse
Rationale:
- Debugging: "Why did this pulse fire?"
- Analytics: "How many pulses come from Telegram vs. Reeve itself?"
- Audit trail: Security/compliance
- Future: Rate limiting per source
Execution Flow¶
Daemon Main Loop¶
┌─────────────────────────────────────────┐
│ While True (every 1 second): │
│ │
│ 1. Get due pulses from queue │
│ 2. For each pulse (up to 10): │
│ a. Mark as PROCESSING │
│ b. Launch Hapi session (async) │
│ c. Wait for completion │
│ d. Mark as COMPLETED/FAILED │
│ 3. Sleep 1 second │
└─────────────────────────────────────────┘
Why 1 second polling?
- Roughly one-second precision (vs. cron's 1-minute minimum)
- Low CPU overhead (DB query is cheap with indexes)
- Can batch multiple due pulses in one iteration
- Future: Could use DB triggers or a notify/listen mechanism (e.g. PostgreSQL LISTEN/NOTIFY) for near-zero latency
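The loop in the diagram can be sketched with asyncio as follows. This is an illustration under stated assumptions, not the daemon's implementation: `InMemoryQueue` is a hypothetical stand-in for PulseQueue that holds prompt strings, `fake_execute` replaces the Hapi session, and the loop is bounded by `max_iterations` so the example terminates.

```python
import asyncio
from typing import Awaitable, Callable, List


class InMemoryQueue:
    """Hypothetical stand-in for PulseQueue, holding prompts instead of DB rows."""

    def __init__(self, prompts: List[str]):
        self.pending = list(prompts)
        self.completed: List[str] = []
        self.failed: List[str] = []

    async def get_due_pulses(self, limit: int = 10) -> List[str]:
        due, self.pending = self.pending[:limit], self.pending[limit:]
        return due


async def run_daemon(
    queue: InMemoryQueue,
    execute: Callable[[str], Awaitable[None]],
    poll_interval: float = 1.0,
    max_iterations: int = 3,  # bounded here so the sketch terminates
) -> None:
    for _ in range(max_iterations):
        for prompt in await queue.get_due_pulses(limit=10):
            try:
                await execute(prompt)  # sequential: one pulse at a time
                queue.completed.append(prompt)
            except Exception:
                # The real queue would call mark_failed here.
                queue.failed.append(prompt)
        await asyncio.sleep(poll_interval)


async def fake_execute(prompt: str) -> None:
    if "boom" in prompt:
        raise RuntimeError("simulated failure")


queue = InMemoryQueue(["check calendar", "boom", "archive notes"])
asyncio.run(run_daemon(queue, fake_execute, poll_interval=0.01, max_iterations=2))
```

Catching the exception per pulse keeps one failing pulse from stopping the rest of the batch, which matches step 2d in the diagram.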
Parallel Execution¶
Option A: Sequential (Initial Implementation)
- Process one pulse at a time
- Simple, predictable
- Good for debugging
Option B: Parallel (Future Optimization)
- Process up to N pulses concurrently
- Faster overall throughput
- Risk: Resource contention (CPU, Hapi sessions)
Recommendation: Start with Sequential, add Parallel after observing real-world load.
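If Option B is adopted later, one common way to cap concurrency in asyncio is a semaphore. The following is a minimal sketch, assuming each pulse execution is an awaitable; `run_bounded` is a hypothetical helper and `asyncio.sleep` stands in for a Hapi session.

```python
import asyncio
from typing import List


async def run_bounded(prompts: List[str], max_concurrent: int) -> int:
    """Run all prompts with at most max_concurrent in flight; return the peak observed."""
    semaphore = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def run_one(prompt: str) -> None:
        nonlocal in_flight, peak
        async with semaphore:  # waits here once the cap is reached
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for a Hapi session
            in_flight -= 1

    await asyncio.gather(*(run_one(p) for p in prompts))
    return peak


peak_seen = asyncio.run(
    run_bounded([f"pulse-{i}" for i in range(6)], max_concurrent=2)
)
```

Setting `max_concurrent=1` reduces this to the sequential behavior of Option A, so the same code path could serve both modes.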
Next Steps¶
See mcp-integration.md for how Reeve interacts with this queue via MCP tools.