Documentation Index
Fetch the complete documentation index at: https://docs.chataigne.ai/llms.txt
Use this file to discover all available pages before exploring further.
This guide explains how to implement queue-aware logic in your chatbot’s agent.py. The queue system ensures messages are processed sequentially per conversation while allowing the AI to check for new messages and events during execution.
System Architecture
Dual Queue Design
Each conversation maintains two separate Redis queues:
conv:{conversation_id}:messages → User messages (WhatsApp)
conv:{conversation_id}:events → System events (payments, orders)
conv:{conversation_id}:processing → Processing lock (TTL: 2 minutes)
Control Flow Diagram
Processing Priority
- Messages always process first - User intent takes priority
- Events process after messages - System updates handled secondarily
- AI can peek/pop events during message processing - Optional integration
- Maximum 10 iterations, 2-minute timeout - Prevents infinite loops
Environment Variables
REDIS_URL=redis://localhost:6379 # Redis connection URL
How It Works: Step by Step
1. Message Arrival
When a message arrives at /message endpoint:
# main.py - FastAPI endpoint
@app.post("/message")
async def handle_message(payload: ChatbotPayloadDTO):
# Check if conversation is already processing
if await queue_manager.is_processing(payload.conversation.id):
# Add to message queue
await queue_manager.add_message_to_queue(
payload.conversation.id,
payload
)
return {"status": "queued"}
# Not processing - start new processing
await conversation_processor.handle_new_message(
payload.conversation.id,
process_conversation_messages
)
return {"status": "processing"}
2. Event Arrival
When an event arrives at /nexus/event endpoint:
# main.py - FastAPI endpoint
@app.post("/nexus/event")
async def handle_event(event: NexusEvent):
# Events always queue if messages are processing
if await queue_manager.is_processing(event.conversation.id):
await queue_manager.add_event_to_queue(
event.conversation.id,
event
)
return {"status": "queued"}
# Process immediately if no messages
await conversation_processor.handle_new_event(
event.conversation.id,
process_conversation_events
)
return {"status": "processing"}
3. Processing Logic
# Simplified processing flow
async def process_conversation_messages(conversation_id: str):
# Pop all queued messages
messages = await queue_manager.pop_all_messages_from_queue(conversation_id)
for batch in messages:
# Create agent with queue helpers
agent = ChatAgent()
agent.set_queue_helpers(AgentQueueHelpers(queue_manager))
# Process the message batch
await agent.process_message(batch.get_payload())
Understanding Queue Helpers in Agent.py
What Are Queue Helpers?
Queue helpers are your interface to the queue system from within agent.py. They’re automatically injected when your agent processes a message, giving you methods to check for additional messages that arrived while you were thinking/processing.
class ChatAgent:
def __init__(self):
self.queue_helpers = None # Automatically set by main.py
def set_queue_helpers(self, queue_helpers: AgentQueueHelpers):
"""Called by main.py before processing each message"""
self.queue_helpers = queue_helpers
Key Concept: Why Check for New Messages?
While your agent is processing (calling APIs, thinking, generating responses), users might send additional messages:
- “I want pizza” → [Agent starts processing]
- “Make it large” → [Queued]
- “Actually, pepperoni” → [Queued]
Without queue checks, you’d create a small pizza. With queue checks, you can catch these modifications.
Queue Helper Methods Explained
The queue helpers provide several methods to interact with queued messages. Here’s what each does:
# 1. has_new_messages() - Quick boolean check
if await self.queue_helpers.has_new_messages(conversation_id):
# Returns True/False - doesn't tell you what the messages are
print("User sent more messages!")
# 2. check_message_queue() - Look without taking (PEEK)
peeked = await self.queue_helpers.check_message_queue(conversation_id, count=5)
# Returns list of QueuedMessage objects
# Messages STAY in queue - you're just looking
for msg in peeked:
content = msg.get_payload().lastMessage.content
print(f"User said: {content}")
# 3. get_new_messages() - Take all messages (POP)
new_messages = await self.queue_helpers.get_new_messages(conversation_id)
# Returns list of QueuedMessage objects
# Messages are REMOVED from queue
# This is what you'll use 90% of the time
# 4. pop_message_from_queue() - Take just one message
single_msg = await self.queue_helpers.pop_message_from_queue(conversation_id)
# Rarely used - usually you want all messages
Working with QueuedMessage Objects
When you get messages from the queue, they come wrapped in a QueuedMessage object:
# Get messages from queue
messages = await self.queue_helpers.get_new_messages(conversation_id)
for queued_msg in messages:
# Extract the actual ChatbotPayloadDTO
payload = queued_msg.get_payload()
# Now you can access all the usual fields
user_text = payload.lastMessage.content
user_phone = payload.customer.whatsappNumber
timestamp = payload.lastMessage.timestamp
# The payload has the same structure as your main process_message payload
print(f"User {user_phone} said: {user_text}")
Practical Implementation Patterns
Pattern 1: Check Before Critical Actions
The most important pattern is checking for new messages before any action that would be hard to undo:
async def process_message(self, payload: ChatbotPayloadDTO):
conversation_id = payload.conversation.id
# Initial processing of "I want a pizza"
intent = self.analyze_intent(payload)
if intent == "create_order":
# Extract initial order details
order = self.build_order_from_message(payload)
# CRITICAL: Check for new messages before creating order
if self.queue_helpers: # Always check if helpers are available
new_messages = await self.queue_helpers.get_new_messages(conversation_id)
if new_messages:
# User sent "make it large" and "extra cheese" while we processed
self.logger.info(f"Found {len(new_messages)} additional messages")
# Merge all messages together
all_messages = [payload] + [m.get_payload() for m in new_messages]
# Rebuild order with ALL information
order = self.build_order_from_messages(all_messages)
# Let user know we saw everything
await self.send_message(
f"Got it! I've included your {len(new_messages)} additional requests."
)
# Now create order with complete information
await self.create_order(order)
Pattern 2: Continuous Context Building
For longer conversations, continuously aggregate messages:
async def handle_complex_order(self, initial_payload: ChatbotPayloadDTO):
conversation_id = initial_payload.conversation.id
context = {
'messages': [initial_payload.lastMessage.content],
'order_items': [],
'special_requests': []
}
# Process initial message
await self.process_initial_request(initial_payload, context)
# Check for follow-up messages multiple times
while context['status'] != 'complete':
# Simulate some processing time (API calls, etc.)
await self.call_some_api()
# Check if user sent more messages
new_messages = await self.queue_helpers.get_new_messages(conversation_id)
if new_messages:
for msg in new_messages:
payload = msg.get_payload()
user_text = payload.lastMessage.content.lower()
# Add to our running context
context['messages'].append(payload.lastMessage.content)
# Check for special keywords
if 'cancel' in user_text:
return await self.handle_cancellation()
elif 'change' in user_text or 'actually' in user_text:
context['has_modifications'] = True
elif any(size in user_text for size in ['small', 'medium', 'large']):
context['size_preference'] = self.extract_size(user_text)
# Reprocess with new context
await self.update_order_with_context(context)
# Continue processing...
context['status'] = self.check_if_complete(context)
When using tools (like update_order_draft), always check for new messages first:
async def execute_tool_call(self, tool_name: str, tool_args: dict, context: dict):
conversation_id = context['conversation_id']
# Special handling for order-related tools
if tool_name in ['update_order_draft', 'create_order', 'send_order_form']:
# Check for last-minute changes
new_messages = await self.queue_helpers.get_new_messages(conversation_id)
if new_messages:
self.logger.info(f"Found {len(new_messages)} new messages before {tool_name}")
# Extract any modifications from new messages
for msg in new_messages:
payload = msg.get_payload()
content = payload.lastMessage.content
# Quick parse for common modifications
if 'no ' in content.lower():
# "no onions", "no ice", etc.
tool_args = self.apply_negation_to_order(tool_args, content)
elif any(word in content.lower() for word in ['add', 'extra', 'more']):
# "add bacon", "extra cheese"
tool_args = self.apply_addition_to_order(tool_args, content)
elif 'deliver' in content.lower() or 'pickup' in content.lower():
# Service type change
tool_args = self.update_service_type(tool_args, content)
# Add note about modifications
tool_args['customer_note'] = self.merge_customer_notes(
tool_args.get('customer_note', ''),
f"Modified based on {len(new_messages)} additional messages"
)
# Execute tool with potentially modified args
return await self.nexus.execute_tool(tool_name, tool_args)
Pattern 4: Handling Rapid-Fire Messages
Users often send multiple short messages quickly. Aggregate them intelligently:
async def handle_rapid_messages(self, initial_payload: ChatbotPayloadDTO):
conversation_id = initial_payload.conversation.id
messages_buffer = [initial_payload.lastMessage.content]
# Wait a brief moment for rapid-fire messages
await asyncio.sleep(0.5) # Small delay to let messages queue up
# Get all queued messages
new_messages = await self.queue_helpers.get_new_messages(conversation_id)
if new_messages:
# Add all to buffer
for msg in new_messages:
messages_buffer.append(msg.get_payload().lastMessage.content)
# Combine related messages intelligently
combined_intent = self.intelligent_message_combination(messages_buffer)
# Example: ["I want pizza", "large", "pepperoni", "no wait", "hawaiian"]
# Becomes: "I want a large hawaiian pizza" (recognized the correction)
return await self.process_combined_intent(combined_intent)
def intelligent_message_combination(self, messages: List[str]) -> str:
"""Combine multiple messages into coherent intent"""
# Handle corrections (last relevant message wins)
if any('no wait' in m.lower() or 'actually' in m.lower() for m in messages):
# Find the correction point and use messages after it
correction_index = next(
i for i, m in enumerate(messages)
if 'no wait' in m.lower() or 'actually' in m.lower()
)
messages = messages[correction_index:]
# Simple combination for now
return ' '.join(messages)
When to Check for New Messages
Critical Decision Points
Always check for new messages at these key moments:
# 1. Before ANY tool call that creates/modifies data
if tool_name in ['update_order_draft', 'create_order', 'send_order_form']:
new_messages = await self.queue_helpers.get_new_messages(conversation_id)
# 2. After time-consuming operations
response = await self.call_menu_api() # Takes 1-2 seconds
new_messages = await self.queue_helpers.get_new_messages(conversation_id)
# 3. Before sending confirmations or summaries
if "confirm" in response_text or "is this correct" in response_text:
# User might have sent corrections
new_messages = await self.queue_helpers.get_new_messages(conversation_id)
# 4. When conversation seems complete
if intent == "order_complete":
# Last chance to catch "wait, one more thing..."
final_messages = await self.queue_helpers.get_new_messages(conversation_id)
Real Example from Agent.py
Here’s how this actually looks in a real agent implementation:
async def process_message(self, payload: ChatbotPayloadDTO):
"""Main entry point for message processing"""
conversation_id = payload.conversation.id
# Analyze what the user wants
intent = self.extract_intent(payload)
if intent == "order_food":
# Start building the order
order_context = {
'items': self.extract_items(payload),
'conversation_id': conversation_id
}
# Maybe we need to look up menu items (slow operation)
menu_matches = await self.search_menu(order_context['items'])
# CHECK POINT 1: After slow operation
new_msgs = await self.queue_helpers.get_new_messages(conversation_id)
if new_msgs:
# User might have added "make it spicy" or "large size"
for msg in new_msgs:
additional_text = msg.get_payload().lastMessage.content
order_context['modifications'] = self.extract_modifications(additional_text)
# Prepare to create order
order_data = self.build_order_data(menu_matches, order_context)
# CHECK POINT 2: Before tool call
final_msgs = await self.queue_helpers.get_new_messages(conversation_id)
if final_msgs:
# Last-second changes like "actually, cancel the drinks"
order_data = self.apply_final_modifications(order_data, final_msgs)
# Now safe to create order
result = await self.tools.update_order_draft(order_data)
return f"Great! I've added {len(order_data['items'])} items to your order."
Common Pitfalls and Solutions
Pitfall 1: Forgetting to Check
# ❌ BAD: No queue check before order creation
async def create_order_bad(self, payload):
order = self.parse_order(payload)
return await self.tools.create_order(order) # Misses "wait, add fries!"
# ✅ GOOD: Always check before critical actions
async def create_order_good(self, payload):
order = self.parse_order(payload)
# Check for last-minute additions
new_messages = await self.queue_helpers.get_new_messages(payload.conversation.id)
if new_messages:
order = self.merge_order_updates(order, new_messages)
return await self.tools.create_order(order)
Pitfall 2: Checking Too Late
# ❌ BAD: Checking after the action
async def process_order_bad(self, payload):
result = await self.tools.create_order(order_data) # Already created!
# Too late - order already placed
new_messages = await self.queue_helpers.get_new_messages(conversation_id)
# ✅ GOOD: Check before irreversible actions
async def process_order_good(self, payload):
# Check BEFORE creating
new_messages = await self.queue_helpers.get_new_messages(conversation_id)
if new_messages:
order_data = self.update_with_new_messages(order_data, new_messages)
result = await self.tools.create_order(order_data)
Pitfall 3: Not Handling Message Context
# ❌ BAD: Treating each message independently
for msg in new_messages:
content = msg.get_payload().lastMessage.content
if "large" in content:
size = "large"
if "small" in content:
size = "small" # Oops, overwrote large!
# ✅ GOOD: Consider message relationships
messages_text = [m.get_payload().lastMessage.content for m in new_messages]
# ["make it large", "no wait", "small please"]
# Recognize "no wait" as a correction indicator
final_size = self.extract_final_intent(messages_text) # Returns "small"
A Quick Note on Events
While this guide focuses on messages, the queue system also handles events (payment updates, order status changes, etc.). Events work similarly but have lower priority:
# Events are processed after all messages are done
if await self.queue_helpers.has_pending_events(conversation_id):
events = await self.queue_helpers.pop_all_events(conversation_id)
# Handle payment failures, order updates, etc.
The key difference: Messages interrupt processing, events wait their turn.
Summary: Queue-Aware Agent Development
The Mental Model
Think of your agent as having “checkpoints” where you pause and ask: “Did the user say anything else while I was thinking?”
# Your agent's thought process:
1. Receive: "I want pizza"
2. Think: "Let me look up pizzas..." (2 seconds pass)
3. CHECKPOINT: Check queue → Found: "large please"
4. Think: "Okay, large pizza it is"
5. CHECKPOINT: Check queue → Found: "with pepperoni"
6. Action: Create large pepperoni pizza order
Essential Checklist
Final Implementation Tip
The best place to add queue checks is right before you’re about to do something you can’t easily undo. When in doubt, check the queue - it’s a fast operation and prevents user frustration.
# The golden rule:
if about_to_do_something_important:
new_messages = await self.queue_helpers.get_new_messages(conversation_id)
if new_messages:
# Update your understanding before proceeding
context = merge_new_information(context, new_messages)