Quick Start¶
Get Chicory up and running in under 5 minutes!
Prerequisites¶
- Python 3.11 or higher
- Redis server (we'll use Docker)
Step 1: Install Chicory¶
This installs Chicory with Redis support and the CLI tools.
Step 2: Start Redis¶
Using Docker (easiest):
Tip
Already have Redis? Skip this step and make sure it's running on localhost:6379
Step 3: Create Your Tasks¶
Create a file called tasks.py:
tasks.py
import asyncio
from chicory import Chicory, BrokerType, BackendType
# Initialize Chicory
app = Chicory(
broker=BrokerType.REDIS,
backend=BackendType.REDIS,
)
@app.task()
async def add(x: int, y: int) -> int:
"""Add two numbers."""
await asyncio.sleep(1) # Simulate work
return x + y
@app.task()
async def greet(name: str) -> str:
"""Generate a greeting."""
return f"Hello, {name}!"
Step 4: Start a Worker¶
Open a new terminal and run:
Step 5: Dispatch Tasks¶
Create another file called client.py:
client.py
import asyncio
from tasks import app, add, greet
async def main():
# Connect to broker and backend
await app.connect()
try:
# Dispatch a task
print("Dispatching add(10, 32)...")
result = await add.delay(10, 32)
# Wait for the result
print(f"Task ID: {result.task_id}")
answer = await result.get(timeout=10.0)
print(f"Result: {answer}")
# Dispatch another task
print("\nDispatching greet('World')...")
result = await greet.delay("World")
greeting = await result.get(timeout=10.0)
print(f"Result: {greeting}")
finally:
await app.disconnect()
if __name__ == "__main__":
asyncio.run(main())
Run it:
What Just Happened?¶
- Created Tasks: Defined async functions with
@app.task() - Started Worker: Worker connected to Redis and waits for tasks
- Dispatched Tasks: Used
.delay()to send tasks to the queue - Got Results: Used
.get()to wait for and retrieve results
Next Steps¶
Congratulations! You now have a working distributed task queue.
Learn More¶
- Tutorial - Understand core concepts
- Tutorial - In-depth guide with examples
- User Guide - Complete documentation
Try More Features¶
Add retry policies:
from chicory import RetryPolicy, RetryBackoff
@app.task(
retry_policy=RetryPolicy(
max_retries=3,
backoff=RetryBackoff.EXPONENTIAL,
)
)
async def flaky_task() -> str:
# Will retry up to 3 times if it fails
...
Add input validation:
from pydantic import BaseModel, EmailStr
class EmailData(BaseModel):
to: EmailStr
subject: str
body: str
@app.task()
async def send_email(data: EmailData) -> dict:
# Input is automatically validated
...
Use task context:
from chicory import TaskContext
@app.task()
async def advanced_task(context: TaskContext, value: int) -> int:
# Access task metadata
print(f"Task ID: {context.task_id}")
print(f"Attempt: {context.retries + 1}")
return value * 2
That's it! You're now ready to build distributed applications with Chicory.