Skip to content

Task

chicory.Task

Task(
    fn: Callable[P, R], app: Chicory, options: TaskOptions
)

Bases: Generic[P, R]

Wrapper around a task function providing invocation APIs.

Source code in src/chicory/task.py
def __init__(self, fn: Callable[P, R], app: Chicory, options: TaskOptions) -> None:
    """Wrap ``fn`` as a task bound to ``app``.

    Args:
        fn: The function to wrap; coroutine functions are detected and
            recorded in ``is_async``.
        app: The application this task belongs to.
        options: Per-task options. A truthy ``options.name`` is used as
            the task name; otherwise the name is resolved automatically.
    """
    self.fn = fn
    self.app = app
    self.options = options
    self.is_async = inspect.iscoroutinefunction(fn)

    self.name = self.options.name or self._resolve_task_name()

    # Look at the raw signature (not resolved type hints) so detection works
    # even when TaskContext is only imported under TYPE_CHECKING.
    sig = inspect.signature(fn)
    params = list(sig.parameters.values())
    self.has_context = False
    if params:
        annotation = params[0].annotation
        # The annotation is either the TaskContext class itself (runtime
        # import) or the literal string 'TaskContext' (deferred annotation).
        if annotation is TaskContext or (
            isinstance(annotation, str) and annotation == "TaskContext"
        ):
            self.has_context = True

    # Build Pydantic model for validation
    self._input_model = self._build_input_model()
    # Signature.empty is a sentinel marker: compare by identity so that an
    # annotation object's own __eq__/__ne__ is never invoked.
    self.output_model = (
        None
        if sig.return_annotation is inspect.Signature.empty
        else sig.return_annotation
    )

delay async

delay(*args: P.args, **kwargs: P.kwargs) -> AsyncResult[R]

Dispatch this task for asynchronous execution.

Publishes a message to the broker and returns an AsyncResult that can be used to poll or await the outcome.

Parameters:

Name Type Description Default
*args P.args

Positional arguments forwarded to the task function.

()
**kwargs P.kwargs

Keyword arguments forwarded to the task function.

{}

Returns:

Type Description
AsyncResult[R]

An AsyncResult instance for tracking the task's state and retrieving its return value.

Raises:

Type Description
ValidationError

If input validation is enabled and the arguments fail validation.

Source code in src/chicory/task.py
async def delay(self, *args: P.args, **kwargs: P.kwargs) -> AsyncResult[R]:
    """Queue this task on the broker and return a handle to its result.

    The call arguments are optionally validated, wrapped in a fresh
    ``TaskMessage`` with a newly generated UUID, and published through the
    application's broker.

    Args:
        *args: Positional arguments forwarded to the task function.
        **kwargs: Keyword arguments forwarded to the task function.

    Returns:
        An ``AsyncResult`` bound to the new task id and the application's
        backend.

    Raises:
        ValidationError: If input validation is enabled and the
            arguments fail validation.
    """
    # The task-level setting wins; a falsy value defers to the app config.
    mode = self.options.validation_mode or self.app.config.validation_mode
    input_checking_modes = (ValidationMode.INPUTS, ValidationMode.STRICT)
    if mode in input_checking_modes:
        # Only the raising side effect matters; the result is discarded.
        self._validate_inputs(*args, **kwargs)

    new_id = str(uuid.uuid4())
    # First dispatch: no retries so far and no scheduled execution time.
    envelope = TaskMessage(
        id=new_id,
        name=self.name,
        args=list(args),
        kwargs=kwargs,
        retries=0,
        eta=None,
    )
    await self.app.broker.publish(envelope)

    return AsyncResult[R](new_id, self.app.backend)

send async

send(*args: P.args, **kwargs: P.kwargs) -> str

Fire-and-forget task invocation.

Returns task_id for logging/tracing.

Source code in src/chicory/task.py
async def send(self, *args: P.args, **kwargs: P.kwargs) -> str:
    """Dispatch the task without keeping a result handle.

    Delegates to ``delay`` and discards everything but the task id.

    Args:
        *args: Positional arguments forwarded to ``delay``.
        **kwargs: Keyword arguments forwarded to ``delay``.

    Returns:
        The ``task_id`` of the dispatched task, for logging/tracing.
    """
    return (await self.delay(*args, **kwargs)).task_id

TaskContext

chicory.TaskContext dataclass

TaskContext(
    task_id: str,
    task_name: str,
    retries: int,
    max_retries: int,
    retry_policy: RetryPolicy | None = None,
)

Runtime context injected into tasks.

is_last_retry property

is_last_retry: bool

Check if this is the last retry attempt.

Returns:

Type Description
bool

True if the current attempt is the final one before the task would exceed its maximum retry count.

remaining_retries property

remaining_retries: int

Get the number of remaining retry attempts.

Returns:

Type Description
int

The number of retries still available (0 when exhausted).

retry

retry(
    countdown: float | None = None,
    exc: Exception | None = None,
) -> NoReturn

Request a task retry by raising a RetryError.

Parameters:

Name Type Description Default
countdown float | None

Optional delay in seconds before retry. If None and a retry_policy is configured, the delay will be calculated from the policy.

None
exc Exception | None

Optional exception that caused the retry. Used to check if the exception is retryable according to the retry policy.

None
Source code in src/chicory/context.py
def retry(
    self,
    countdown: float | None = None,
    exc: Exception | None = None,
) -> NoReturn:
    """
    Ask for this task to be retried; always raises.

    Args:
        countdown: Delay in seconds before the retry. When omitted and a
                    retry_policy is set, the policy computes the delay
                    for the next attempt.
        exc: The exception that triggered the retry, if any. When a
                retry_policy is set, it decides whether this exception
                is retryable.

    Raises:
        MaxRetriesExceededError: If ``retries`` has reached the effective
            maximum (the policy's ``max_retries`` when a policy is set,
            otherwise ``max_retries``).
        Exception: ``exc`` itself, when the policy reports it as not
            retryable.
        RetryError: In every other case, carrying the retry count, the
            effective maximum and the countdown.
    """
    policy = self.retry_policy

    # A configured policy overrides the context's own retry ceiling.
    if policy:
        retry_limit = policy.max_retries
    else:
        retry_limit = self.max_retries

    if self.retries >= retry_limit:
        raise MaxRetriesExceededError(
            f"Task {self.task_name!r} exceeded max retries ({retry_limit})"
        )

    if policy is not None:
        # Non-retryable exceptions propagate unchanged (will go to DLQ).
        if exc is not None and not policy.should_retry(exc):
            raise exc

        # No explicit delay given: derive it from the upcoming attempt number.
        if countdown is None:
            countdown = policy.calculate_delay(self.retries + 1)

    raise RetryError(
        retries=self.retries,
        max_retries=retry_limit,
        countdown=countdown,
    )

fail

fail(exc: Exception) -> NoReturn

Explicitly fail the task (will be moved to DLQ).

Source code in src/chicory/context.py
def fail(self, exc: Exception) -> NoReturn:
    """Explicitly fail the task by raising ``exc``.

    The exception is raised unchanged; the retry counters and retry
    policy are not consulted. The failed task is expected to be moved to
    the DLQ; that routing happens outside this method.

    Args:
        exc: The exception to raise.

    Raises:
        Exception: Always; the ``exc`` object that was passed in.
    """
    raise exc