Task Context¶
The TaskContext provides runtime metadata and advanced control over task execution. It gives you access to information about the current task and allows manual control over retries, failures, and execution flow.
Overview¶
When you need more control over your task's behavior, inject TaskContext as the first parameter of your task function. This special parameter is automatically provided by Chicory and gives you access to:
- Task metadata (ID, name, retry count)
- Manual retry triggering
- Explicit failure handling
- Retry state information
Basic Task Context Usage¶
To use task context, add it as the first parameter with type hint TaskContext:
from chicory import TaskContext
@app.task(
name="tasks.context_aware",
retry_policy=RetryPolicy(max_retries=3, retry_delay=1.0),
)
async def context_aware_task(ctx: TaskContext, value: int) -> dict[str, Any]:
"""
Task that uses context for runtime information.
Args:
ctx: Automatically injected by Chicory
value: Your regular task parameter
"""
# Access task metadata
print(f"Task ID: {ctx.task_id}")
print(f"Task Name: {ctx.task_name}")
print(f"Current attempt: {ctx.retries + 1}")
print(f"Max retries: {ctx.max_retries}")
return {
"value": value,
"task_id": ctx.task_id,
"attempts": ctx.retries + 1,
}
When calling the task, don't pass the context:
# ✅ Correct - context is injected automatically
result = await context_aware_task.delay(value=42)
# ❌ Wrong - don't pass context manually
# result = await context_aware_task.delay(ctx, value=42)
Automatic Injection
The TaskContext parameter is automatically injected by Chicory. You never pass it when calling .delay() or .send().
Task Metadata¶
The context provides access to task identification and retry state:
@app.task(
name="tasks.retry_info",
retry_policy=RetryPolicy(max_retries=5, retry_delay=1.0),
)
async def retry_info_task(ctx: TaskContext, value: int) -> dict:
print(f"Task ID: {ctx.task_id}") # Unique execution ID
print(f"Task Name: {ctx.task_name}") # Registered name
print(f"Attempt: {ctx.retries + 1}") # Current retry count (0-based)
print(f"Max retries: {ctx.max_retries}")
print(f"Remaining: {ctx.remaining_retries}")
if ctx.is_last_retry:
print("This is the final attempt!")
return {"attempt": ctx.retries + 1}
Manual Retry Control¶
Use ctx.retry() to manually trigger a retry:
Basic Manual Retry¶
@app.task(
name="tasks.manual_retry",
retry_policy=RetryPolicy(max_retries=3, retry_delay=1.0),
)
async def manual_retry_task(ctx: TaskContext, value: int) -> dict[str, Any]:
"""Task with manual retry logic."""
print(f"Processing value: {value}")
# Conditional retry logic
if value < 0:
print("Negative value detected, retrying...")
ctx.retry() # Manually trigger retry
# Check if this is the last retry
if ctx.is_last_retry and value < 0:
print("Last attempt with negative value - will fail")
# Don't retry on last attempt, let it fail naturally
return {
"value": value,
"attempts": ctx.retries + 1,
"status": "success",
}
Usage:
# Positive value - succeeds immediately
result = await manual_retry_task.delay(value=10)
data = await result.get(timeout=10.0)
print(f"Attempts: {data['attempts']}") # Output: 1
# Negative value - will retry
result = await manual_retry_task.delay(value=-5)
try:
data = await result.get(timeout=15.0)
print(f"Eventually succeeded after {data['attempts']} attempts")
except Exception as e:
print(f"Failed after {ctx.max_retries + 1} attempts")
Retry with Custom Delay¶
Override the default retry delay for specific situations:
@app.task(
name="tasks.custom_delay_retry",
retry_policy=RetryPolicy(max_retries=5, retry_delay=2.0),
)
async def custom_delay_retry(ctx: TaskContext, data: dict) -> dict:
"""Task with conditional custom retry delays."""
# Check for required field
if "required_field" not in data:
if ctx.retries < 2:
print("Missing required_field, retrying in 5 seconds...")
ctx.retry(countdown=5.0) # Wait 5 seconds before retry
else:
print("Still missing required_field after retries")
# After 2 retries, fail explicitly
ctx.fail(ValueError("required_field is mandatory"))
return {
"data": data,
"processed_at": datetime.now(UTC).isoformat(),
}
Example scenarios:
# Missing required field - will retry with 5s delay
incomplete_data = {"user_id": 123}
result = await custom_delay_retry.delay(data=incomplete_data)
# Retries after 5s, 5s, then fails
# Complete data - succeeds immediately
complete_data = {"user_id": 123, "required_field": "present"}
result = await custom_delay_retry.delay(data=complete_data)
data = await result.get(timeout=10.0) # Success on first attempt
Custom Countdown
Use ctx.retry(countdown=N) to override the retry policy's delay for specific situations, such as:
- Waiting for eventual consistency
- Rate limit cooldowns
- Service-specific backoff requirements
Explicit Failure¶
Use ctx.fail() to explicitly fail a task without retrying:
@app.task(
name="tasks.explicit_failure",
retry_policy=RetryPolicy(max_retries=3, retry_delay=1.0),
)
async def validate_and_process(ctx: TaskContext, user_input: str) -> dict:
"""Task that can fail explicitly on validation errors."""
# Validation that should NOT be retried
if len(user_input) < 3:
# Explicitly fail without retry
ctx.fail(ValueError("Input must be at least 3 characters"))
# Transient error that SHOULD be retried
try:
result = await external_api_call(user_input)
except ConnectionError as e:
# Let retry policy handle this
raise e
return {"result": result, "input": user_input}
Example usage:
# Validation error - fails immediately without retry
result = await validate_and_process.delay("ab") # Too short
try:
await result.get(timeout=10.0)
except Exception as e:
print(f"Failed immediately: {e}")
# No retries happened
# Valid input but API fails - will retry
result = await validate_and_process.delay("valid_input")
data = await result.get(timeout=30.0) # May retry on network errors
When to Use ctx.fail()
Use ctx.fail() for permanent errors that won't be fixed by retrying:
- Validation errors
- Authentication failures
- Malformed input data
- Business logic violations
Don't use it for transient errors that might succeed on retry:
- Network failures
- Temporary service outages
- Rate limiting
Next Steps¶
Now that you understand task context, explore:
- Delivery Modes - Delivery guarantees and acknowledgment