RabbitMQ Broker¶
The RabbitMQ broker provides enterprise-grade message queuing with advanced features like priority queues, flexible routing, and native high availability.
Installation¶
Install Chicory with RabbitMQ support:
Quick Start¶
Basic Setup¶
from chicory import Chicory, BrokerType
app = Chicory(broker=BrokerType.RABBITMQ)
@app.task()
async def my_task(x: int) -> int:
return x * 2
# Start worker
# chicory worker tasks:app
Configuration¶
Configure via environment variables:
export CHICORY_BROKER_RABBITMQ_HOST=localhost
export CHICORY_BROKER_RABBITMQ_PORT=5672
export CHICORY_BROKER_RABBITMQ_USERNAME=guest
export CHICORY_BROKER_RABBITMQ_PASSWORD=guest
Or programmatically:
from chicory.config import ChicoryConfig, RabbitMQBrokerConfig
from pydantic import SecretStr
config = ChicoryConfig(
broker=BrokerConfig(
rabbitmq=RabbitMQBrokerConfig(
host="localhost",
port=5672,
username="guest",
password=SecretStr("guest"),
)
)
)
app = Chicory(broker=BrokerType.RABBITMQ, config=config)
Configuration Options¶
Connection Settings¶
from chicory.config import RabbitMQBrokerConfig
from pydantic import SecretStr
config = RabbitMQBrokerConfig(
# Basic connection
host="localhost",
port=5672,
username="guest",
password=SecretStr("guest"),
vhost="/", # Virtual host (optional)
# Or use full AMQP URL (overrides above)
url="amqp://user:pass@localhost:5672//vhost",
# Connection pooling
connection_pool_size=1,
channel_pool_size=10,
channel_acquire_timeout=10.0,
)
Environment variables:
CHICORY_BROKER_RABBITMQ_HOST=localhost
CHICORY_BROKER_RABBITMQ_PORT=5672
CHICORY_BROKER_RABBITMQ_USERNAME=guest
CHICORY_BROKER_RABBITMQ_PASSWORD=guest
CHICORY_BROKER_RABBITMQ_VHOST=/
# Or
CHICORY_BROKER_RABBITMQ_URL=amqp://guest:guest@localhost:5672/
Queue Settings¶
config = RabbitMQBrokerConfig(
# Queue durability
durable_queues=True, # Survive broker restarts
# Queue limits
queue_max_length=100000, # Max messages
queue_max_length_bytes=None, # Max bytes (optional)
# Message TTL
message_ttl=3600000, # 1 hour in milliseconds
# Queue mode
queue_mode="default", # or "lazy" for disk-first
# Priority queues
max_priority=10, # Enable priority 0-10 (None = disabled)
# Prefetch count (QoS)
prefetch_count=1, # Messages per consumer
)
Environment variables:
CHICORY_BROKER_RABBITMQ_DURABLE_QUEUES=true
CHICORY_BROKER_RABBITMQ_QUEUE_MAX_LENGTH=100000
CHICORY_BROKER_RABBITMQ_MESSAGE_TTL=3600000
CHICORY_BROKER_RABBITMQ_QUEUE_MODE=lazy
CHICORY_BROKER_RABBITMQ_MAX_PRIORITY=10
CHICORY_BROKER_RABBITMQ_PREFETCH_COUNT=1
Dead Letter Queue¶
config = RabbitMQBrokerConfig(
# DLQ limits
dlq_max_length=10000,
dlq_message_ttl=None, # No expiration
# DLQ scanning
max_dlq_scan_limit=10000, # Max messages to scan
)
Reconnection¶
config = RabbitMQBrokerConfig(
# Reconnection backoff
reconnect_delay_base=1.0, # Initial delay (seconds)
reconnect_delay_max=60.0, # Maximum delay (seconds)
)
How It Works¶
Queue Structure¶
Chicory creates these RabbitMQ queues:
chicory.queue.{queue} # Main task queue
chicory.dlq.{queue} # Dead-letter queue
chicory.delayed-queue.{queue} # Delayed task staging
Message Flow¶
-
Publishing:
-
Consuming:
-
Dead Letter:
Channel Pooling¶
Chicory maintains connection and channel pools:
# 1 connection pool (for publishing)
# 10 channels (shared across publish operations)
# 1 dedicated connection for consuming
# 1 dedicated channel for consuming
This provides better performance and connection efficiency.
Delivery Modes¶
At-Least-Once Delivery¶
Messages are acknowledged after successful processing:
from chicory.types import DeliveryMode
app = Chicory(
broker=BrokerType.RABBITMQ,
delivery_mode=DeliveryMode.AT_LEAST_ONCE,
)
Flow: 1. Message delivered to worker 2. Worker processes task 3. Worker sends ACK 4. Message removed from queue
On failure: - Worker sends NACK - Message requeued or moved to DLQ
Guarantees: No message loss, possible duplicates
At-Most-Once Delivery¶
Messages are acknowledged before processing:
Flow: 1. Message delivered to worker 2. Worker sends immediate ACK 3. Worker processes task
On failure: - Task lost (already ACKed)
Guarantees: No duplicates, possible message loss
Priority Queues¶
Enable priority-based task processing:
Configuration¶
Publishing Priority Tasks¶
from chicory.types import TaskMessage
# High priority task
message = TaskMessage(
id="task-1",
name="urgent_task",
args=[],
kwargs={},
retries=0,
priority=10, # Highest priority
)
await app.broker.publish(message)
# Normal priority task
message = TaskMessage(..., priority=5)
await app.broker.publish(message)
# Low priority task
message = TaskMessage(..., priority=1)
await app.broker.publish(message)
Priority Overhead
Priority queues have slight performance overhead. Use only when needed.
Dead Letter Queue¶
Native DLX Support¶
RabbitMQ provides native dead-letter exchange (DLX) support:
How it works:
1. Task fails after max retries
2. Worker rejects message (NACK with requeue=False)
3. RabbitMQ routes to DLX automatically
4. Message appears in chicory.dlq.{queue}
DLQ Operations¶
# List DLQ messages
dlq_messages = await app.broker.get_dlq_messages("default", count=100)
for msg in dlq_messages:
print(f"Task: {msg.original_message.name}")
print(f"Error: {msg.error}")
print(f"Failed at: {msg.failed_at}")
# Replay a message
success = await app.broker.replay_from_dlq(
message_id="task-123",
queue="default",
reset_retries=True,
)
# Delete from DLQ
success = await app.broker.delete_from_dlq("task-123", queue="default")
# Purge DLQ
count = await app.broker.purge_dlq("default")
DLQ Scanning
get_dlq_messages() scans the queue, which can be slow for large DLQs. Consider using the RabbitMQ Management API for better performance.
Delayed Tasks¶
Chicory implements delayed tasks using TTL + dead-letter-exchange pattern:
from datetime import datetime, timedelta, UTC
# Schedule task
eta = datetime.now(UTC) + timedelta(hours=1)
message = TaskMessage(..., eta=eta)
await app.broker.publish(message)
How it works:
1. Message published to chicory.delayed-queue.{queue}
2. Message has per-message TTL
3. After TTL expires, RabbitMQ routes to main queue via DLX
4. Worker processes task
Limitations: - Minimum delay: ~1 second - Maximum delay: Limited by RabbitMQ's 32-bit TTL (24 days)
Queue Modes¶
Default Mode¶
Messages kept in memory for fast access:
Use for: - High-throughput, low-latency workloads - Small message sizes - Sufficient RAM available
Lazy Mode¶
Messages moved to disk as soon as possible:
Use for: - Large queue depths - Limited RAM - Large message payloads - Can tolerate slight latency increase
Trade-offs: - Lower memory usage - Slightly higher latency (~10-20%) - Disk I/O becomes bottleneck
Next Steps¶
- Configure Backends for result storage
- Configure Retry Policies for error handling
- Review Configuration for advanced tuning