Phase 02: LLM Client, Inference Abstraction, and Constrained Output¶
1. Overview¶
This phase builds the LLM calling infrastructure that every agent in TheAct will use. It sits between the raw OpenAI Python library and the turn engine (Phase 03). The goal is a thin, focused layer that handles the concerns unique to our system:
- Async support -- post-turn processing (memory updates + game state check) runs in parallel via asyncio.gather
- Streaming -- tokens are yielded as they arrive so the CLI (Phase 04) can display them in real time
- Thinking token separation -- The thinking model produces reasoning tokens before the actual response; we capture both streams separately
- YAML-based structured output -- small models cannot reliably produce JSON schema output; we ask for YAML in fenced blocks and parse it ourselves
- Retry with feedback -- when YAML parsing fails, we retry with the parse error so the model can self-correct
- Token estimation -- simple character-based estimation for context budget decisions in Phase 03
- Per-agent configuration -- different temperature, max_tokens, and system prompt patterns for narrator vs. character vs. post-turn agents
What this phase does NOT cover: prompt templates, context assembly, conversation history management, or turn orchestration. Those belong to Phase 03.
File Layout¶
All code from this phase lives under src/theact/llm/:
src/theact/llm/
__init__.py # Public API re-exports
config.py # LLMConfig, AgentConfig, defaults
client.py # Thin wrapper around AsyncOpenAI
inference.py # complete(), stream(), complete_structured()
streaming.py # StreamResult, async generator, thinking separation
parsing.py # YAML extraction and parsing
tokens.py # Token estimation utilities
2. Architecture¶
The layers, bottom to top:
+----------------------------------------------------------+
| Phase 03: Turn Engine / Agents |
| (calls inference functions with messages + config) |
+----------------------------------------------------------+
|
v
+----------------------------------------------------------+
| inference.py |
| complete() -- non-streaming, returns LLMResult |
| stream() -- streaming, yields StreamChunk |
| complete_structured() -- YAML parse + retry loop |
+----------------------------------------------------------+
|
v
+----------------------------------------------------------+
| client.py |
| get_client() -> AsyncOpenAI (singleton) |
| Configured from LLMConfig (base_url, api_key, model) |
+----------------------------------------------------------+
|
v
+----------------------------------------------------------+
| openai.AsyncOpenAI |
| chat.completions.create(stream=True/False) |
+----------------------------------------------------------+
|
v
+----------------------------------------------------------+
| OpenAI-compatible endpoint |
+----------------------------------------------------------+
Key design choice: everything is async. Even "non-streaming" calls use AsyncOpenAI so that the turn engine can asyncio.gather multiple calls. The sync entry point is just asyncio.run() at the CLI layer.
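The post-turn pattern can be sketched as follows. This is an illustrative standalone sketch: update_memory and check_game_state stand in for the real Phase 03 agents, which would await complete() rather than sleeping.

```python
import asyncio

# Illustrative stand-ins for the memory-update and game-state agents.
async def update_memory() -> str:
    await asyncio.sleep(0.01)  # stands in for network latency
    return "memory updated"

async def check_game_state() -> str:
    await asyncio.sleep(0.01)
    return "chapter incomplete"

async def post_turn() -> list[str]:
    # Both calls run concurrently; wall time is roughly the max of the
    # two, not the sum -- this is why even non-streaming calls are async.
    return await asyncio.gather(update_memory(), check_game_state())

memory_result, state_result = asyncio.run(post_turn())
```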
3. API Design¶
3.1 Configuration (config.py)¶
from dataclasses import dataclass, field
from typing import Optional
@dataclass(frozen=True)
class LLMConfig:
"""Global LLM configuration. One instance per game session."""
base_url: str = "https://api.openai.com/v1"
api_key: str = "" # loaded from env
model: str = "olafangensan-glm-4.7-flash-heretic"
default_temperature: float = 1.0
default_max_tokens: int = 900
context_limit: int = 8192 # model's context window size
@dataclass(frozen=True)
class AgentLLMConfig:
"""Per-agent-type overrides. Merged with LLMConfig at call time."""
temperature: Optional[float] = None
max_tokens: Optional[int] = None
structured: bool = False # whether to parse YAML from response
max_retries: int = 2 # retries on YAML parse failure
retry_temperature_bump: float = 0.1 # increase temp on each retry
# Sensible defaults for each agent type
NARRATOR_CONFIG = AgentLLMConfig(
temperature=1.0,
max_tokens=600,
structured=True,
max_retries=2,
)
CHARACTER_CONFIG = AgentLLMConfig(
temperature=1.0,
max_tokens=400,
structured=False,
)
MEMORY_UPDATE_CONFIG = AgentLLMConfig(
temperature=0.3,
max_tokens=500,
structured=True,
max_retries=2,
)
GAME_STATE_CONFIG = AgentLLMConfig(
temperature=0.2,
max_tokens=200,
structured=True,
max_retries=2,
)
SUMMARIZER_CONFIG = AgentLLMConfig(
temperature=0.3,
max_tokens=300,
structured=False,
)
3.2 Client (client.py)¶
from openai import AsyncOpenAI
from theact.llm.config import LLMConfig
_client: AsyncOpenAI | None = None
def get_client(config: LLMConfig) -> AsyncOpenAI:
"""Return a singleton AsyncOpenAI client configured ."""
# NOTE: Once created, the singleton ignores subsequent configs.
# Call reset_client() first if the config has changed.
global _client
if _client is None:
_client = AsyncOpenAI(
base_url=config.base_url,
api_key=config.api_key,
)
return _client
def reset_client() -> None:
"""Reset the singleton. Useful for tests."""
global _client
_client = None
3.3 Result Types (streaming.py)¶
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class LLMResult:
"""Complete response from a non-streaming LLM call."""
content: str
thinking: str # thinking/reasoning tokens (may be empty)
finish_reason: str # "stop", "length", etc.
prompt_tokens: Optional[int] = None
completion_tokens: Optional[int] = None
@property
def total_tokens(self) -> Optional[int]:
if self.prompt_tokens is not None and self.completion_tokens is not None:
return self.prompt_tokens + self.completion_tokens
return None
@dataclass
class StreamChunk:
"""A single chunk from a streaming response."""
content: str = "" # response content delta
thinking: str = "" # thinking content delta
finish_reason: Optional[str] = None
@property
def is_thinking(self) -> bool:
return len(self.thinking) > 0
@property
def is_content(self) -> bool:
return len(self.content) > 0
@property
def is_done(self) -> bool:
return self.finish_reason is not None
@dataclass
class StructuredResult:
"""Result from a structured (YAML-parsed) LLM call."""
data: dict # parsed YAML as a Python dict
raw_content: str # the full text response
thinking: str
attempts: int = 1 # how many tries it took
finish_reason: str = "stop"
prompt_tokens: Optional[int] = None
completion_tokens: Optional[int] = None
3.4 Inference Functions (inference.py)¶
import asyncio
from typing import AsyncIterator
from theact.llm.config import LLMConfig, AgentLLMConfig
from theact.llm.streaming import LLMResult, StreamChunk, StructuredResult
Message = dict[str, str] # {"role": "...", "content": "..."}
async def complete(
messages: list[Message],
llm_config: LLMConfig,
agent_config: AgentLLMConfig | None = None,
) -> LLMResult:
"""
Non-streaming completion. Returns the full response at once.
Used for post-turn processing where streaming isn't needed.
Note: This function must also extract thinking tokens from non-streaming
responses. It should check `message.model_extra` for a `reasoning_content`
field and also parse `<think>...</think>` tags from `message.content`,
removing them from the `content` field and placing them in the `thinking`
field of the returned LLMResult.
"""
...
async def stream(
messages: list[Message],
llm_config: LLMConfig,
agent_config: AgentLLMConfig | None = None,
) -> AsyncIterator[StreamChunk]:
"""
Streaming completion. Yields StreamChunk objects as tokens arrive.
Each chunk contains either thinking or content text (never both).
Used by narrator and character agents for real-time CLI display.
"""
...
async def complete_structured(
messages: list[Message],
llm_config: LLMConfig,
agent_config: AgentLLMConfig | None = None,
yaml_hint: str = "",
) -> StructuredResult:
"""
Non-streaming completion that parses YAML from the response.
Retries with error feedback on parse failure.
yaml_hint is an optional description of the expected YAML structure,
included in the retry prompt to help the model self-correct.
"""
...
async def stream_structured(
messages: list[Message],
llm_config: LLMConfig,
agent_config: AgentLLMConfig | None = None,
yaml_hint: str = "",
) -> tuple[AsyncIterator[StreamChunk], "asyncio.Future[StructuredResult]"]:
"""
Streaming completion that also parses YAML after the stream completes.
Returns both the stream (for live display) and a future that resolves
to the parsed StructuredResult.
The caller can iterate the stream for live display, and then await
the future to get the parsed data.
IMPORTANT: The caller MUST fully consume the stream iterator before
awaiting the future. If the stream is not fully consumed (e.g., the
caller breaks early), the future will never resolve.
"""
...
4. Structured Output¶
4.1 Strategy¶
Small 7B models cannot reliably use JSON mode or tool-call-based structured output. Instead:
- The system prompt instructs the model to output YAML inside a fenced code block.
- We extract the YAML block from the response text.
- We parse it with yaml.safe_load().
- On failure, we retry by appending the parse error as a user message.
Only certain agent types need structured output:
- Narrator -- returns narration text + metadata
- Memory update -- returns structured diff
- Game state check -- returns a simple status object
- Character -- plain text, no structure needed
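The extraction step can be illustrated with a standalone sketch. The fence string is built programmatically only so the example can live inside this document; the regex mirrors the rule used in parsing.py.

```python
import re

FENCE = "`" * 3  # triple-backtick fence, built programmatically

# A typical model reply: free-form prose first, then the fenced YAML block.
reply = (
    "The tavern falls silent as you enter.\n\n"
    + FENCE + "yaml\n"
    + "mood: tense\n"
    + "responding_characters:\n"
    + "  - barkeep\n"
    + FENCE + "\n"
)

# Same rule as parsing.py: take the LAST fenced yaml block in the reply.
pattern = FENCE + r"yaml\s*\n(.*?)" + FENCE
yaml_block = re.findall(pattern, reply, re.DOTALL)[-1].strip()
```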
4.2 YAML Templates¶
These templates are included in the system prompt for each agent type. The model sees the template and fills it in.
Narrator Response¶
The system prompt includes:
Respond with your narration, then provide metadata in a YAML block:
```yaml
narration: |
  [Your narration text here. Can be multiple lines.
  Use the YAML literal block scalar.]
responding_characters:
  - character_id_1
  - character_id_2
mood: [tense | calm | urgent | mysterious | humorous | dramatic | melancholic]
```
Memory Update Diff¶
The system prompt includes:
```yaml
add:
  - "Short factual statement about something new learned or that happened"
  - "Another fact"
remove:
  - "Exact text of a memory entry that is no longer relevant"
update:
  - old: "Exact text of existing memory entry"
    new: "Updated version of that memory entry"
summary: "One-sentence summary of what changed this turn"
```
Game State Check¶
The system prompt includes:
```yaml
chapter_complete: false
reason: "One sentence explaining progress or why not complete"
new_beats:
  - "Exact beat text that was hit this turn"
```
4.3 YAML Parsing (parsing.py)¶
import re
import yaml
from typing import Any
class YAMLParseError(Exception):
"""Raised when YAML extraction or parsing fails."""
def __init__(self, message: str, raw_content: str):
super().__init__(message)
self.raw_content = raw_content
def extract_yaml_block(text: str) -> str:
"""
Extract YAML content from a fenced code block in the response.
Looks for ```yaml ... ``` first.
Falls back to ``` ... ``` (unfenced but code-blocked).
Falls back to treating the entire response as YAML if no blocks found.
"""
# Try ```yaml ... ``` first.
# Use findall and take the LAST match -- the model may include example
# YAML blocks earlier in its response before the actual answer.
matches = re.findall(r"```yaml\s*\n(.*?)```", text, re.DOTALL)
if matches:
return matches[-1].strip()
# Try generic ``` ... ```
matches = re.findall(r"```\s*\n(.*?)```", text, re.DOTALL)
if matches:
return matches[-1].strip()
# No code block found -- try the whole text as YAML.
# WARNING: This fallback can produce false positives (e.g., plain English
# text parsed as a YAML string). The implementation should log a warning
# when this path is taken.
return text.strip()
def parse_yaml_response(text: str) -> dict[str, Any]:
"""
Extract and parse YAML from LLM response text.
Raises YAMLParseError with a descriptive message on failure.
"""
yaml_str = extract_yaml_block(text)
try:
result = yaml.safe_load(yaml_str)
except yaml.YAMLError as e:
raise YAMLParseError(
f"YAML parse error: {e}",
raw_content=text,
) from e
if not isinstance(result, dict):
raise YAMLParseError(
f"Expected YAML to parse as a dictionary, got {type(result).__name__}",
raw_content=text,
)
return result
def validate_yaml_fields(
data: dict[str, Any],
required_fields: list[str],
) -> list[str]:
"""
Check that required fields are present. Returns list of missing field names.
Does not raise -- caller decides whether to retry or proceed with partial data.
"""
return [f for f in required_fields if f not in data]
4.4 Retry Flow¶
The retry logic lives in complete_structured() in inference.py. Pseudocode:
async def complete_structured(messages, llm_config, agent_config, yaml_hint=""):
config = agent_config or AgentLLMConfig(structured=True)
attempts = 0
last_error = ""
working_messages = list(messages) # copy so retries don't pollute original
while attempts <= config.max_retries:
attempts += 1
temperature = (config.temperature or llm_config.default_temperature)
if attempts > 1:
temperature += config.retry_temperature_bump * (attempts - 1)
result = await complete(
working_messages,
llm_config,
AgentLLMConfig(
temperature=temperature,
max_tokens=config.max_tokens,
structured=False, # we parse ourselves
),
)
try:
data = parse_yaml_response(result.content)
return StructuredResult(
data=data,
raw_content=result.content,
thinking=result.thinking,
attempts=attempts,
finish_reason=result.finish_reason,
prompt_tokens=result.prompt_tokens,
completion_tokens=result.completion_tokens,
)
except YAMLParseError as e:
last_error = str(e)
# Append correction message for retry.
# NOTE on context growth: each retry adds ~2 messages (assistant
# response + user correction). With max_retries=2, this adds up
# to 4 extra messages. For small models with tight context
# windows, consider truncating the failed response to the first
# 200 chars to limit context growth.
working_messages.append({
"role": "assistant",
"content": result.content[:200] + ("..." if len(result.content) > 200 else ""),
})
correction = f"Your response could not be parsed. Error: {last_error}"
if yaml_hint:
correction += f"\n\nPlease output valid YAML matching this structure:\n{yaml_hint}"
correction += "\n\nPlease try again with valid YAML in a ```yaml``` code block."
working_messages.append({
"role": "user",
"content": correction,
})
# All retries exhausted -- raise
raise YAMLParseError(
f"Failed to parse YAML after {attempts} attempts. Last error: {last_error}",
raw_content=result.content,
)
5. Streaming¶
5.1 How Thinking Tokens Are Returned¶
The OpenAI-compatible endpoint for thinking models returns thinking/reasoning tokens as part of the streamed response. The official openai library (v2.29.0) does not have a dedicated reasoning_content field on ChoiceDelta, so the provider delivers thinking tokens through one of these mechanisms (we must handle both):
- reasoning_content as an extra field on the delta -- accessible via getattr(chunk.choices[0].delta, "reasoning_content", None) or through the raw dict representation chunk.model_extra / chunk.choices[0].delta.model_extra.
- Thinking content wrapped in <think>...</think> tags within the regular content field -- some OpenAI-compatible endpoints embed thinking this way.
Our streaming layer detects and handles both approaches.
5.2 Stream Processing¶
Note on think tag handling: Streaming chunks can split <think> or </think> tags across boundaries. The implementation should buffer the last few characters when they could be the start of a tag (<, <t, <th, etc.) and flush them on the next chunk. For v1, a simpler approach: if a chunk ends with < or starts with partial tag text, buffer it. The implementation may need to handle this pragmatically based on observed provider behavior.
from typing import AsyncIterator
from openai import AsyncOpenAI
from openai.types.chat import ChatCompletionChunk
from theact.llm.streaming import StreamChunk, LLMResult
async def _process_stream(
response, # AsyncStream[ChatCompletionChunk]
) -> AsyncIterator[StreamChunk]:
"""
Process an OpenAI streaming response, separating thinking from content.
Yields StreamChunk objects.
"""
in_think_tag = False
# Buffer for partial tag detection. If a chunk ends with characters that
# could be the start of a <think> or </think> tag, we hold them here and
# prepend them to the next chunk before processing.
tag_buffer = ""
async for chunk in response:
if not chunk.choices:
continue
delta = chunk.choices[0].delta
finish_reason = chunk.choices[0].finish_reason
# Strategy 1: Check for reasoning_content in model_extra
reasoning = None
if hasattr(delta, "model_extra") and delta.model_extra:
reasoning = delta.model_extra.get("reasoning_content")
# Also check "reasoning" as a fallback key
if reasoning is None:
reasoning = delta.model_extra.get("reasoning")
if reasoning:
yield StreamChunk(thinking=reasoning)
continue
content = delta.content or ""
# Prepend any buffered partial-tag characters from the previous chunk
if tag_buffer:
content = tag_buffer + content
tag_buffer = ""
# Buffer trailing characters that could be the start of a tag.
# Partial prefixes of "<think>" or "</think>" should be held back.
_TAG_PREFIXES = ("<", "<t", "<th", "<thi", "<thin", "<think",
"</", "</t", "</th", "</thi", "</thin", "</think")
for i in range(min(len(content), 8), 0, -1):
if content[-i:] in _TAG_PREFIXES:
tag_buffer = content[-i:]
content = content[:-i]
break
# Strategy 2: Detect <think>...</think> tags in content.
# Handle case where a single chunk contains both <think> and </think>.
if "<think>" in content:
in_think_tag = True
before, _, after = content.partition("<think>")
if before:
yield StreamChunk(content=before)
# Check if </think> also appears in the remainder (same chunk)
if "</think>" in after:
in_think_tag = False
think_text, _, post_think = after.partition("</think>")
if think_text:
yield StreamChunk(thinking=think_text)
if post_think:
yield StreamChunk(content=post_think)
elif after:
yield StreamChunk(thinking=after)
continue
if "</think>" in content:
in_think_tag = False
before, _, after = content.partition("</think>")
if before:
yield StreamChunk(thinking=before)
if after:
yield StreamChunk(content=after)
continue
if in_think_tag:
yield StreamChunk(thinking=content)
elif content:
yield StreamChunk(content=content)
if finish_reason:
# Flush any remaining buffer as-is
if tag_buffer:
if in_think_tag:
yield StreamChunk(thinking=tag_buffer)
else:
yield StreamChunk(content=tag_buffer)
tag_buffer = ""
yield StreamChunk(finish_reason=finish_reason)
async def collect_stream(
stream: AsyncIterator[StreamChunk],
) -> LLMResult:
"""
Consume an entire stream and collect into an LLMResult.
Useful when you want streaming display but also need the final result.
"""
content_parts: list[str] = []
thinking_parts: list[str] = []
finish_reason = "stop"
async for chunk in stream:
if chunk.content:
content_parts.append(chunk.content)
if chunk.thinking:
thinking_parts.append(chunk.thinking)
if chunk.finish_reason:
finish_reason = chunk.finish_reason
return LLMResult(
content="".join(content_parts),
thinking="".join(thinking_parts),
finish_reason=finish_reason,
)
5.3 Tee Pattern for Stream + Parse¶
For stream_structured(), we need to both yield chunks for live display AND collect the full text for YAML parsing. We use an async tee pattern:
import asyncio
from typing import AsyncIterator
from theact.llm.parsing import parse_yaml_response, YAMLParseError
from theact.llm.streaming import StreamChunk, StructuredResult
async def stream_structured(messages, llm_config, agent_config=None, yaml_hint=""):
"""
Returns (stream, future) where:
- stream is an AsyncIterator[StreamChunk] for live display
- future is an asyncio.Future[StructuredResult] that resolves after
the stream is fully consumed
"""
result_future: asyncio.Future[StructuredResult] = asyncio.Future()
content_parts: list[str] = []
thinking_parts: list[str] = []
raw_stream = stream(messages, llm_config, agent_config)
async def tee_stream() -> AsyncIterator[StreamChunk]:
finish_reason = "stop"
try:
async for chunk in raw_stream:
if chunk.content:
content_parts.append(chunk.content)
if chunk.thinking:
thinking_parts.append(chunk.thinking)
if chunk.finish_reason:
finish_reason = chunk.finish_reason
yield chunk
# Stream done -- parse YAML
full_content = "".join(content_parts)
full_thinking = "".join(thinking_parts)
try:
data = parse_yaml_response(full_content)
result_future.set_result(StructuredResult(
data=data,
raw_content=full_content,
thinking=full_thinking,
finish_reason=finish_reason,
))
except YAMLParseError as e:
result_future.set_exception(e)
except Exception as e:
result_future.set_exception(e)
return tee_stream(), result_future
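A self-contained toy version of the tee pattern (a dummy chunk source instead of a real LLM stream) shows why the future resolves only after the stream is fully consumed:

```python
import asyncio

async def demo() -> tuple[str, str]:
    fut: asyncio.Future = asyncio.get_running_loop().create_future()
    parts: list[str] = []

    async def source():  # stands in for the raw LLM stream
        for piece in ("key: ", "value"):
            yield piece

    async def tee():
        async for piece in source():
            parts.append(piece)  # collect for parsing...
            yield piece          # ...while also feeding live display
        fut.set_result("".join(parts))  # only reached after full consumption

    shown: list[str] = []
    async for piece in tee():
        shown.append(piece)  # the "live display" side
    collected = await fut    # resolves because the stream was fully consumed
    return "".join(shown), collected

shown_text, collected_text = asyncio.run(demo())
```

If the caller broke out of the `async for` early, `fut.set_result` would never run, which is exactly the hazard the stream_structured() docstring warns about.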
5.4 How Phase 04 (CLI) Will Consume This¶
Phase 04 will consume the stream roughly like this (shown here for context, not implemented in this phase):
# Pseudocode for CLI consumption
stream_iter = stream(messages, config)  # stream() is an async generator; do not await it
async for chunk in stream_iter:
if chunk.is_thinking:
ui.append_thinking(chunk.thinking)
elif chunk.is_content:
ui.append_content(chunk.content)
elif chunk.is_done:
ui.finalize()
6. Error Handling¶
6.1 Error Types¶
class LLMError(Exception):
"""Base exception for LLM-related errors."""
pass
class LLMConnectionError(LLMError):
"""Failed to connect to the API endpoint."""
pass
class LLMRateLimitError(LLMError):
"""Rate limited by the API."""
def __init__(self, message: str, retry_after: float | None = None):
super().__init__(message)
self.retry_after = retry_after
class LLMResponseError(LLMError):
"""The API returned an error response."""
pass
# YAMLParseError is defined in parsing.py (see section 4.3)
6.2 API Error Handling¶
The complete() and stream() functions wrap OpenAI client exceptions:
import openai
from theact.llm.client import get_client
from theact.llm.config import LLMConfig, AgentLLMConfig
async def _call_api(messages, llm_config, agent_config, stream_mode=False):
"""Internal: make the actual API call with error wrapping."""
client = get_client(llm_config)
temperature = agent_config.temperature if agent_config and agent_config.temperature is not None else llm_config.default_temperature
max_tokens = agent_config.max_tokens if agent_config and agent_config.max_tokens is not None else llm_config.default_max_tokens
try:
return await client.chat.completions.create(
model=llm_config.model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
stream=stream_mode,
)
except openai.APIConnectionError as e:
raise LLMConnectionError(f"Cannot reach {llm_config.base_url}: {e}") from e
except openai.RateLimitError as e:
retry_after = None
if hasattr(e, "response") and e.response is not None:
retry_after_header = e.response.headers.get("retry-after")
if retry_after_header:
retry_after = float(retry_after_header)
raise LLMRateLimitError(str(e), retry_after=retry_after) from e
except openai.APIStatusError as e:
raise LLMResponseError(f"API error {e.status_code}: {e.message}") from e
6.3 Retry Strategy Summary¶
| Scenario | Retry? | Strategy |
|---|---|---|
| YAML parse failure | Yes, up to max_retries | Append error + assistant response to messages, bump temperature |
| API connection error | No | Raise immediately, let caller decide |
| Rate limit (429) | No at this layer | Raise LLMRateLimitError with retry_after; Phase 03 can implement backoff |
| API 500 error | No | Raise immediately |
| finish_reason == "length" | No | Return result as-is; caller can check finish_reason and re-request with higher max_tokens |
The retry logic is deliberately kept simple. Only YAML parse failures are retried automatically because that is the most common failure mode with small models and can be self-corrected by the model.
7. Token Estimation¶
7.1 Approach¶
We use a simple character-based heuristic. For English text with typical LLM tokenizers, the ratio is approximately 1 token per 4 characters. This is imprecise but sufficient for context budget management (deciding when to summarize history, how much context to include, etc.).
We do NOT add tiktoken as a dependency. Reasons:
- The model uses its own tokenizer, not OpenAI's
- Exact counts are not needed -- we only need rough estimates for budget decisions
- Keeps dependencies minimal
7.2 Implementation (tokens.py)¶
# Approximate characters-per-token ratio for English text.
# This is a rough heuristic. Real tokenizers vary (3.5-4.5 chars/token).
CHARS_PER_TOKEN = 4
# Overhead per message in a chat completion (role, formatting, separators).
# OpenAI charges ~4 tokens per message for formatting.
MESSAGE_OVERHEAD_TOKENS = 4
def estimate_tokens(text: str) -> int:
"""Estimate the token count for a string of text."""
if not text:
return 0
return max(1, len(text) // CHARS_PER_TOKEN)
def estimate_messages_tokens(messages: list[dict[str, str]]) -> int:
"""
Estimate total token count for a list of chat messages.
Accounts for per-message overhead (role, separators).
"""
total = 0
for msg in messages:
total += MESSAGE_OVERHEAD_TOKENS
total += estimate_tokens(msg.get("content", ""))
total += estimate_tokens(msg.get("role", ""))
# Every conversation has a base overhead of ~3 tokens
total += 3
return total
def tokens_remaining(
messages: list[dict[str, str]],
context_limit: int,
max_completion_tokens: int,
) -> int:
"""
Estimate how many tokens are available for additional context
(e.g., memory injection, world description) before hitting the limit.
Returns negative if already over budget.
"""
used = estimate_messages_tokens(messages)
return context_limit - used - max_completion_tokens
7.3 Context Budget¶
The context limit is configured on LLMConfig.context_limit (default 8192). Phase 03's context assembly uses tokens_remaining() with this value to decide when to summarize and how much history to include.
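A worked example of the budget arithmetic, with inline copies of the tokens.py heuristics so it runs standalone:

```python
CHARS_PER_TOKEN = 4
MESSAGE_OVERHEAD_TOKENS = 4

def estimate_tokens(text: str) -> int:
    return max(1, len(text) // CHARS_PER_TOKEN) if text else 0

def estimate_messages_tokens(messages: list[dict[str, str]]) -> int:
    total = 3  # base conversation overhead
    for msg in messages:
        total += MESSAGE_OVERHEAD_TOKENS
        total += estimate_tokens(msg.get("content", ""))
        total += estimate_tokens(msg.get("role", ""))
    return total

messages = [
    {"role": "system", "content": "x" * 2000},  # roughly a 500-token system prompt
    {"role": "user", "content": "x" * 400},     # roughly a 100-token user turn
]
used = estimate_messages_tokens(messages)
# 8192-token window, reserving 600 tokens for the completion:
remaining = 8192 - used - 600
```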
8. Configuration¶
8.1 What Is Configurable¶
| Setting | Source | Default |
|---|---|---|
| base_url | LLM_BASE_URL env var or constructor | https://api.openai.com/v1 |
| api_key | LLM_API_KEY env var (required) | -- |
| model | LLM_MODEL env var or constructor | olafangensan-glm-4.7-flash-heretic |
| default_temperature | constructor | 1.0 |
| default_max_tokens | constructor | 900 |
| Per-agent temperature | AgentLLMConfig | Falls back to default_temperature |
| Per-agent max_tokens | AgentLLMConfig | Falls back to default_max_tokens |
| Per-agent max_retries | AgentLLMConfig | 2 |
8.2 Loading Config¶
import os
from theact.llm.config import LLMConfig
def load_llm_config() -> LLMConfig:
"""Load LLM configuration from environment variables."""
api_key = os.environ.get("LLM_API_KEY", "")
if not api_key:
raise ValueError(
"LLM_API_KEY environment variable is required. "
"Set it in your .env file or shell environment."
)
return LLMConfig(
base_url=os.environ.get("LLM_BASE_URL", "https://api.openai.com/v1"),
api_key=api_key,
model=os.environ.get("LLM_MODEL", "olafangensan-glm-4.7-flash-heretic"),
)
9. Implementation Steps¶
Build in this order. Each step is a single file or small commit.
Step 1: Package structure¶
Create src/theact/llm/ with __init__.py. Ensure the package is importable.
Add pyyaml to pyproject.toml dependencies.
Step 2: config.py¶
Implement LLMConfig, AgentLLMConfig, the five agent-type defaults (NARRATOR_CONFIG, CHARACTER_CONFIG, MEMORY_UPDATE_CONFIG, GAME_STATE_CONFIG, SUMMARIZER_CONFIG), and load_llm_config().
Step 3: client.py¶
Implement get_client() and reset_client(). Singleton AsyncOpenAI instance.
Step 4: tokens.py¶
Implement estimate_tokens(), estimate_messages_tokens(), tokens_remaining(), and the context limit constants.
Step 5: parsing.py¶
Implement extract_yaml_block(), parse_yaml_response(), validate_yaml_fields(), and YAMLParseError.
Step 6: streaming.py¶
Implement LLMResult, StreamChunk, StructuredResult dataclasses. Implement _process_stream() for thinking token separation and collect_stream().
Step 7: inference.py¶
Implement complete(), stream(), complete_structured(), stream_structured(), error types, and _call_api().
Step 8: __init__.py¶
Re-export the public API:
from theact.llm.config import (
LLMConfig,
AgentLLMConfig,
load_llm_config,
NARRATOR_CONFIG,
CHARACTER_CONFIG,
MEMORY_UPDATE_CONFIG,
GAME_STATE_CONFIG,
SUMMARIZER_CONFIG,
)
from theact.llm.inference import complete, stream, complete_structured, stream_structured
from theact.llm.streaming import LLMResult, StreamChunk, StructuredResult
from theact.llm.parsing import parse_yaml_response, YAMLParseError
from theact.llm.tokens import estimate_tokens, estimate_messages_tokens, tokens_remaining
from theact.llm.client import get_client, reset_client
Step 9: Verification script¶
Write scripts/test_llm.py (see Section 10).
10. Verification¶
10.1 Test Script (scripts/test_llm.py)¶
A standalone script that exercises all the key functionality against the live API endpoint:
"""
Smoke test for the LLM client layer.
Run: uv run python scripts/test_llm.py
Requires LLM_API_KEY in environment or .env file.
"""
import asyncio
from dotenv import load_dotenv
load_dotenv()
from theact.llm import (
load_llm_config,
complete,
stream,
complete_structured,
estimate_tokens,
estimate_messages_tokens,
LLMResult,
StreamChunk,
StructuredResult,
AgentLLMConfig,
NARRATOR_CONFIG,
)
async def test_complete():
"""Test basic non-streaming completion."""
print("=== Test: complete() ===")
config = load_llm_config()
result = await complete(
messages=[{"role": "user", "content": "Say hello in exactly 5 words."}],
llm_config=config,
)
assert isinstance(result, LLMResult)
print(f" Content: {result.content}")
print(f" Thinking: {result.thinking[:100]}..." if result.thinking else " Thinking: (none)")
print(f" Finish reason: {result.finish_reason}")
print(" PASSED\n")
async def test_stream():
"""Test streaming completion with thinking separation."""
print("=== Test: stream() ===")
config = load_llm_config()
thinking_parts = []
content_parts = []
async for chunk in stream(
messages=[{"role": "user", "content": "What is 2+2? Explain briefly."}],
llm_config=config,
):
if chunk.is_thinking:
thinking_parts.append(chunk.thinking)
elif chunk.is_content:
content_parts.append(chunk.content)
content = "".join(content_parts)
thinking = "".join(thinking_parts)
print(f" Content: {content}")
print(f" Thinking: {thinking[:100]}..." if thinking else " Thinking: (none)")
print(" PASSED\n")
async def test_structured():
"""Test YAML-parsed structured output."""
print("=== Test: complete_structured() ===")
config = load_llm_config()
result = await complete_structured(
messages=[
{"role": "system", "content": (
"You are a narrator for a text RPG. "
"Respond with a brief narration and metadata in a YAML block.\n\n"
"```yaml\n"
"narration: |\n"
" [Your narration here]\n"
"responding_characters:\n"
" - character_1\n"
"mood: calm\n"
"```"
)},
{"role": "user", "content": "I enter the tavern."},
],
llm_config=config,
agent_config=NARRATOR_CONFIG,
yaml_hint="narration: |\\n ...\\nresponding_characters:\\n - ...\\nmood: calm|tense|urgent",
)
assert isinstance(result, StructuredResult)
print(f" Parsed data: {result.data}")
print(f" Attempts: {result.attempts}")
print(f" Raw content: {result.raw_content[:200]}...")
print(" PASSED\n")
async def test_token_estimation():
"""Test token estimation utilities (no API call needed)."""
print("=== Test: token estimation ===")
text = "Hello, world! This is a test of token estimation."
tokens = estimate_tokens(text)
print(f" '{text}' -> ~{tokens} tokens")
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello!"},
]
msg_tokens = estimate_messages_tokens(messages)
print(f" 2-message conversation -> ~{msg_tokens} tokens")
print(" PASSED\n")
async def test_parallel():
"""Test parallel calls via asyncio.gather (simulates post-turn processing)."""
print("=== Test: parallel calls ===")
config = load_llm_config()
async def call(prompt: str) -> str:
result = await complete(
messages=[{"role": "user", "content": prompt}],
llm_config=config,
agent_config=AgentLLMConfig(max_tokens=100),
)
return result.content
results = await asyncio.gather(
call("Say 'alpha' and nothing else."),
call("Say 'beta' and nothing else."),
call("Say 'gamma' and nothing else."),
)
for i, r in enumerate(results):
print(f" Call {i}: {r.strip()}")
print(" PASSED\n")
async def main():
await test_token_estimation() # no API call, always safe
await test_complete()
await test_stream()
await test_structured()
await test_parallel()
print("All tests passed.")
if __name__ == "__main__":
asyncio.run(main())
10.2 What to Check Manually¶
- Thinking tokens appear in result.thinking (not empty for the thinking model)
- Streaming chunks arrive incrementally (not all at once)
- YAML parsing succeeds on first attempt for well-prompted requests
- Retry works when YAML is intentionally malformed (can test by corrupting the response in a mock)
- Parallel calls complete faster than sequential (wall-clock time)
- Token estimation gives reasonable numbers (~1 token per 4 chars)
10.3 Unit Tests (No API Required)¶
These can be plain pytest tests that don't hit the network:
- test_extract_yaml_block -- various formats (fenced, unfenced, no block)
- test_parse_yaml_response -- valid YAML, invalid YAML, non-dict YAML
- test_validate_yaml_fields -- missing fields, all present
- test_estimate_tokens -- empty string, short string, long string
- test_estimate_messages_tokens -- various message lists
- test_stream_chunk_properties -- is_thinking, is_content, is_done
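For instance, the validate_yaml_fields tests could look like this. The function is re-declared inline so the snippet runs without the package installed; the real tests would import it from theact.llm.parsing.

```python
# Inline copy of validate_yaml_fields for a self-contained example.
def validate_yaml_fields(data: dict, required_fields: list[str]) -> list[str]:
    return [f for f in required_fields if f not in data]

def test_validate_yaml_fields_missing():
    missing = validate_yaml_fields({"mood": "calm"}, ["mood", "narration"])
    assert missing == ["narration"]

def test_validate_yaml_fields_all_present():
    assert validate_yaml_fields({"a": 1, "b": 2}, ["a", "b"]) == []

test_validate_yaml_fields_missing()
test_validate_yaml_fields_all_present()
```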
10.4 Live Testing & Regression Capture¶
After the smoke test (scripts/test_llm.py) passes, perform deeper live API validation. The goal is to discover how the real API endpoint behaves and lock down edge cases as automated tests.
Step 1 — Exploratory testing against the real API:
- Run scripts/test_llm.py and carefully inspect output. Pay attention to:
  - Do thinking tokens actually appear in result.thinking? If empty, the thinking token extraction strategy may need adjustment for this endpoint.
  - Does the model output YAML in fenced blocks when asked? Try multiple prompts. Note which phrasing works best.
  - What happens when max_tokens is too low and the response is truncated mid-YAML? Does finish_reason == "length" get set?
  - Does streaming actually yield chunks incrementally, or does the endpoint buffer?
- Test complete_structured() with a deliberately bad system prompt (one that's unlikely to produce YAML). Verify retry logic works — the model gets the error feedback and self-corrects.
- Test parallel calls with asyncio.gather of 5+ simultaneous requests. Check for rate limiting behavior.
- Send a very long prompt (close to context limit). Verify the response isn't empty or garbled.
Step 2 — Fix and capture:
- For each unexpected behavior, fix the code and write a regression test.
- If the endpoint uses <think> tags instead of reasoning_content, write a test with a real captured response chunk to verify parsing.
- If YAML extraction fails on a specific model output pattern (e.g., model writes yaml after the triple backticks with no newline), add that pattern to test_extract_yaml_block.
- Save representative real model responses as test fixtures in tests/fixtures/ for offline replay tests.
Step 3 — Verify regression suite:
- Run uv run pytest tests/ -v and confirm all new regression tests pass.
- Re-run scripts/test_llm.py to confirm live behavior still works after any code changes.
11. Dependencies¶
New Dependencies to Add¶
| Package | Purpose | Version |
|---|---|---|
| pyyaml | YAML parsing for structured output | >=6.0 |
Already Present (No Changes)¶
| Package | Purpose |
|---|---|
| openai (>=2.29.0) | OpenAI-compatible API client (includes AsyncOpenAI) |
| python-dotenv (>=1.2.2) | .env file loading |
Not Adding¶
| Package | Why Not |
|---|---|
| tiktoken | Model uses its own tokenizer; char-based estimation is sufficient |
| pydantic | Already a transitive dependency via openai, but we use plain dataclasses to keep the layer thin |
| tenacity | Our retry logic is simple enough (1 loop) that a retry library is overkill |
| aiohttp | openai already handles async HTTP via httpx |