agent.py. The queue system ensures messages are processed sequentially per conversation while allowing the AI to check for new messages and events during execution.
System Architecture
Dual Queue Design
Each conversation maintains two separate Redis queues, one for messages and one for events.
Control Flow Diagram
Processing Priority
- Messages always process first - User intent takes priority
- Events process after messages - System updates handled secondarily
- AI can peek/pop events during message processing - Optional integration
- Maximum 10 iterations, 2-minute timeout - Prevents infinite loops
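The ordering and limits listed above can be sketched as follows. This is a minimal illustration, not the actual implementation: the `drain` function, its parameters, and the use of in-memory deques in place of the Redis queues are all assumptions, as is the reading that one iteration handles one queued item.

```python
import time
from collections import deque

MAX_ITERATIONS = 10      # iteration cap from the list above
TIMEOUT_SECONDS = 120    # 2-minute timeout from the list above


def drain(messages, events, handle, now=time.monotonic):
    """Process queued items, messages first, within the iteration and time limits."""
    started = now()
    processed = []
    for _ in range(MAX_ITERATIONS):
        if now() - started > TIMEOUT_SECONDS:
            break  # time budget exhausted
        if messages:
            item = ("message", messages.popleft())  # messages always go first
        elif events:
            item = ("event", events.popleft())      # events only when no message waits
        else:
            break  # both queues are empty
        handle(*item)
        processed.append(item)
    return processed


msgs = deque(["I want pizza", "Make it large"])
evts = deque(["payment_confirmed"])
print(drain(msgs, evts, handle=lambda kind, body: None))
# prints: [('message', 'I want pizza'), ('message', 'Make it large'), ('event', 'payment_confirmed')]
```

Passing `now` as a parameter keeps the timeout testable without waiting two minutes.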
Environment Variables
How It Works: Step by Step
1. Message Arrival
When a message arrives at the /message endpoint:
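As a rough sketch of this step, the handler might serialize the message and append it to the conversation's message queue. The key format, the payload fields, and the `enqueue_message` function are hypothetical, and a dictionary of deques stands in for Redis so the example runs without a server.

```python
import json
import time
from collections import defaultdict, deque

# In-memory stand-in for Redis lists, keyed by queue name (illustrative only).
queues = defaultdict(deque)


def message_queue_key(conversation_id):
    # Hypothetical key format; the real naming scheme is not shown in this guide.
    return f"conversation:{conversation_id}:messages"


def enqueue_message(conversation_id, text):
    """Append a message to the conversation's queue and return the queue length."""
    payload = json.dumps({"text": text, "received_at": time.time()})
    key = message_queue_key(conversation_id)
    queues[key].append(payload)  # comparable to RPUSH on a Redis list
    return len(queues[key])


enqueue_message("conv-1", "I want pizza")
enqueue_message("conv-1", "Make it large")
```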
2. Event Arrival
When an event arrives at the /nexus/event endpoint:
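A comparable sketch for events, assuming they land in a separate per-conversation queue so they never sit ahead of user messages. The key format, the `type`/`data` payload shape, and the `enqueue_event` function are assumptions for illustration; the deques again stand in for Redis.

```python
import json
from collections import defaultdict, deque

# In-memory stand-in for Redis lists, keyed by queue name (illustrative only).
queues = defaultdict(deque)


def event_queue_key(conversation_id):
    # Hypothetical key format, distinct from any message queue key.
    return f"conversation:{conversation_id}:events"


def enqueue_event(conversation_id, event_type, data):
    """Serialize an event and append it to the conversation's event queue."""
    payload = json.dumps({"type": event_type, "data": data})
    queues[event_queue_key(conversation_id)].append(payload)
    return payload


enqueue_event("conv-1", "payment_update", {"status": "paid"})
```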
3. Processing Logic
Understanding Queue Helpers in Agent.py
What Are Queue Helpers?
Queue helpers are your interface to the queue system from within agent.py. They’re automatically injected when your agent processes a message, giving you methods to check for additional messages that arrived while you were thinking/processing.
Key Concept: Why Check for New Messages?
While your agent is processing (calling APIs, thinking, generating responses), users might send additional messages:
- “I want pizza” → [Agent starts processing]
- “Make it large” → [Queued]
- “Actually, pepperoni” → [Queued]
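The scenario above can be simulated with a small stand-in. `FakeQueueHelpers` and its `pop_all_messages` method are hypothetical names used only for this sketch; the real queue helpers may expose different methods.

```python
from collections import deque


class FakeQueueHelpers:
    """Hypothetical stand-in for the injected queue helpers."""

    def __init__(self):
        self._pending = deque()

    def push(self, text):
        self._pending.append(text)

    def pop_all_messages(self):
        # Return everything queued so far and leave the queue empty.
        drained = list(self._pending)
        self._pending.clear()
        return drained


def build_request(first_message, helpers):
    """Combine the message being processed with anything queued meanwhile."""
    follow_ups = helpers.pop_all_messages()
    return " / ".join([first_message, *follow_ups])


helpers = FakeQueueHelpers()
helpers.push("Make it large")        # arrives while the agent is busy
helpers.push("Actually, pepperoni")  # arrives while the agent is busy
print(build_request("I want pizza", helpers))
# prints: I want pizza / Make it large / Actually, pepperoni
```

Without the check, the agent would act on "I want pizza" alone and handle the size and topping as separate requests.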
Queue Helper Methods Explained
The queue helpers provide several methods to interact with queued messages. Here’s what each does:
Working with QueuedMessage Objects
When you get messages from the queue, they come wrapped in a QueuedMessage object:
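The fields of the real QueuedMessage class are not listed here, so the following is only an illustrative shape. The attribute names `content`, `sender_id`, and `queued_at`, and the `texts` helper, are assumptions.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass(frozen=True)
class QueuedMessage:
    """Illustrative wrapper; the real class may expose different fields."""
    content: str
    sender_id: str = "unknown"
    queued_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


def texts(messages):
    """Unwrap QueuedMessage objects into plain strings, oldest first."""
    ordered = sorted(messages, key=lambda m: m.queued_at)
    return [m.content for m in ordered]


batch = [QueuedMessage("Make it large"), QueuedMessage("Actually, pepperoni")]
print(len(texts(batch)))
# prints: 2
```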
Practical Implementation Patterns
Pattern 1: Check Before Critical Actions
The most important pattern is checking for new messages before any action that would be hard to undo:
Pattern 2: Continuous Context Building
For longer conversations, continuously aggregate messages:
Pattern 3: Message Aggregation in Tool Calls
When using tools (like update_order_draft), always check for new messages first:
Pattern 4: Handling Rapid-Fire Messages
Users often send multiple short messages quickly. Aggregate them intelligently:
When to Check for New Messages
Critical Decision Points
Always check for new messages at these key moments:
Real Example from Agent.py
Here’s how this actually looks in a real agent implementation:
Common Pitfalls and Solutions
Pitfall 1: Forgetting to Check
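A minimal sketch of this pitfall and one possible fix. `StubHelpers`, its `pop_all_messages` method, and both order functions are hypothetical and exist only to show the contrast.

```python
class StubHelpers:
    """Hypothetical helper exposing a single pop_all_messages() method."""

    def __init__(self, pending):
        self._pending = list(pending)

    def pop_all_messages(self):
        drained, self._pending = self._pending, []
        return drained


def place_order_unchecked(item, helpers):
    # Pitfall: the queue is never consulted, so follow-up messages are ignored.
    return {"item": item, "notes": []}


def place_order_checked(item, helpers):
    # Fix: drain the queue right before committing the order.
    notes = helpers.pop_all_messages()
    return {"item": item, "notes": notes}


print(place_order_unchecked("pizza", StubHelpers(["Make it large"])))
# prints: {'item': 'pizza', 'notes': []}
print(place_order_checked("pizza", StubHelpers(["Make it large"])))
# prints: {'item': 'pizza', 'notes': ['Make it large']}
```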
Pitfall 2: Checking Too Late
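One way to picture this pitfall: the check happens only after the order has been committed. In this sketch the queued messages are modeled as already-parsed corrections (a plain dict) and a list plays the role of the order system; both are simplifying assumptions, as are the function names.

```python
def confirm_then_check(order, pending, ledger):
    # Pitfall: the order is committed first, so the correction arrives too late.
    ledger.append(dict(order))
    order.update(pending)  # the ledger entry is already stale
    return order


def check_then_confirm(order, pending, ledger):
    # Fix: apply queued corrections first, then commit once.
    order.update(pending)
    ledger.append(dict(order))
    return order


late_ledger, good_ledger = [], []
correction = {"topping": "pepperoni"}
confirm_then_check({"item": "pizza", "topping": "plain"}, correction, late_ledger)
check_then_confirm({"item": "pizza", "topping": "plain"}, correction, good_ledger)
print(late_ledger[0]["topping"], good_ledger[0]["topping"])
# prints: plain pepperoni
```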
Pitfall 3: Not Handling Message Context
A Quick Note on Events
While this guide focuses on messages, the queue system also handles events (payment updates, order status changes, etc.). Events work similarly but have lower priority:
Summary: Queue-Aware Agent Development
The Mental Model
Think of your agent as having “checkpoints” where you pause and ask: “Did the user say anything else while I was thinking?”
Essential Checklist
- Always check before tool calls that create/modify data
- Check after slow operations (API calls, complex calculations)
- Aggregate messages intelligently (handle corrections, additions)
- Test with rapid messages (users don’t wait for responses)
- Handle the no-queue case (if self.queue_helpers:)
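The last checklist item can be sketched as a guard around every queue access. The `Agent` class, the `pending_messages` method, and the `pop_all_messages` helper method are hypothetical; only the `if self.queue_helpers:` check comes from the checklist.

```python
class StubHelpers:
    """Hypothetical helper exposing a single pop_all_messages() method."""

    def __init__(self, pending):
        self._pending = list(pending)

    def pop_all_messages(self):
        drained, self._pending = self._pending, []
        return drained


class Agent:
    def __init__(self, queue_helpers=None):
        # queue_helpers may be None, for example when the agent runs
        # outside the queue system.
        self.queue_helpers = queue_helpers

    def pending_messages(self):
        if self.queue_helpers:
            return self.queue_helpers.pop_all_messages()
        return []  # no queue available: behave as if nothing was queued


print(Agent().pending_messages())
# prints: []
print(Agent(StubHelpers(["Make it large"])).pending_messages())
# prints: ['Make it large']
```

Returning an empty list in the no-queue case lets the rest of the agent use the same code path whether or not helpers were injected.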