Skip to content

Worker

chicory.Worker

Worker(app: Chicory, config: WorkerConfig | None = None)

Async task worker.

Consumes messages from broker, executes tasks, and stores results.

Initialize a worker.

Parameters:

Name Type Description Default
app Chicory

The Chicory application instance

required
config WorkerConfig | None

WorkerConfig instance. If not provided, uses app.config.worker

None
Source code in src/chicory/worker.py
def __init__(
    self,
    app: Chicory,
    config: WorkerConfig | None = None,
) -> None:
    """
    Initialize a worker.

    Args:
        app: The Chicory application instance
        config: WorkerConfig instance. If not provided, uses app.config.worker
    """
    self.app = app
    self.config = config or app.config.worker

    # Set attributes from config
    self.concurrency = self.config.concurrency
    self.queue = self.config.queue
    self.use_dead_letter_queue = self.config.use_dead_letter_queue
    self.heartbeat_interval = self.config.heartbeat_interval
    self.heartbeat_ttl = self.config.heartbeat_ttl
    self.cleanup_interval = Worker._jitter(self.config.cleanup_interval, 0.15)
    self.stale_workers_timeout = self.config.stale_workers_timeout

    self._running = False
    self._executor = ThreadPoolExecutor(max_workers=self.concurrency)
    self._semaphore = asyncio.Semaphore(self.concurrency)
    self._tasks: set[asyncio.Task[Any]] = set()
    self._consume_task: asyncio.Task[None] | None = None
    self._heartbeat_task: asyncio.Task[None] | None = None
    self._cleanup_task: asyncio.Task[None] | None = None

    self.worker_id = f"{socket.gethostname()}-{os.getpid()}-{uuid.uuid4().hex[:8]}"
    self.hostname = socket.gethostname()
    self.pid = os.getpid()
    self.started_at = datetime.now(UTC)
    self.last_heartbeat: datetime | None = None

    self._tasks_processed = 0
    self._tasks_failed = 0

    self._logger = logging.getLogger(f"chicory.worker.{self.worker_id}")

    self._logger.info(
        f"Initialized worker {self.worker_id}",
        extra={
            "worker_id": self.worker_id,
            "queue": self.queue,
        },
    )

start async

start() -> None

Start the worker (non-blocking).

The worker will run in the background. Call stop() to gracefully shutdown.

Source code in src/chicory/worker.py
async def start(self) -> None:
    """
    Start the worker (non-blocking).

    The worker will run in the background. Call stop() to gracefully shutdown.
    """
    if self._running:
        self._logger.warning(
            "Worker is already running", extra={"worker_id": self.worker_id}
        )
        return

    self._logger.info(
        f"Starting worker with concurrency={self.concurrency}",
        extra={"worker_id": self.worker_id},
    )

    await self.app.connect()
    self._running = True

    # Start consume loop as a background task
    self._consume_task = asyncio.create_task(self._consume_loop())
    # Start heartbeat loop
    self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())
    # Cleanup dead workers task
    self._cleanup_task = asyncio.create_task(self._cleanup_loop())

stop async

stop(timeout: float | None = None) -> None

Stop the worker gracefully.

Parameters:

Name Type Description Default
timeout float | None

Maximum time to wait for tasks to complete (seconds). If None, uses config.shutdown_timeout

None
Source code in src/chicory/worker.py
async def stop(self, timeout: float | None = None) -> None:
    """
    Stop the worker gracefully.

    Args:
        timeout: Maximum time to wait for tasks to complete (seconds).
                If None, uses config.shutdown_timeout
    """
    if not self._running:
        self._logger.warning(
            "Worker is not running", extra={"worker_id": self.worker_id}
        )
        return

    if timeout is None:
        timeout = self.config.shutdown_timeout

    self._logger.info("Stopping worker...", extra={"worker_id": self.worker_id})
    self._running = False
    self.app.broker.stop()

    # Stop heartbeat
    if self._heartbeat_task:
        self._heartbeat_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._heartbeat_task

    # Wait for consume loop to finish
    if self._consume_task:
        try:
            await asyncio.wait_for(self._consume_task, timeout=5.0)
        except TimeoutError:
            self._logger.warning(
                "Consume loop did not finish in time",
                extra={"worker_id": self.worker_id},
            )
            self._consume_task.cancel()

    # Wait for running tasks with timeout
    if self._tasks:
        self._logger.info(
            f"Waiting for {len(self._tasks)} tasks to complete...",
            extra={"worker_id": self.worker_id},
        )
        try:
            await asyncio.wait_for(
                asyncio.gather(*self._tasks, return_exceptions=True),
                timeout=timeout,
            )
        except TimeoutError:
            self._logger.warning(
                f"Tasks did not complete within {timeout}s, cancelling...",
                extra={"worker_id": self.worker_id},
            )
            for task in self._tasks:
                task.cancel()
            await asyncio.gather(*self._tasks, return_exceptions=True)

    await self._send_heartbeat(is_running=False)

    await self._shutdown()

run async

run() -> None

Run the worker until stopped (blocking).

This sets up signal handlers and blocks until the worker is stopped via signal (SIGINT/SIGTERM) or programmatically via stop().

Example

worker = Worker(app) await worker.run() # Blocks

Source code in src/chicory/worker.py
async def run(self) -> None:
    """
    Run the worker until stopped (blocking).

    This sets up signal handlers and blocks until the worker is stopped
    via signal (SIGINT/SIGTERM) or programmatically via stop().

    Example:
        >>> worker = Worker(app)
        >>> await worker.run()  # Blocks
    """
    # Set up signal handlers (not supported on Windows with ProactorEventLoop)
    loop = asyncio.get_event_loop()
    signals_registered = False
    try:
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, self._handle_shutdown)
        signals_registered = True
    except NotImplementedError:  # pragma: no cover
        # Windows doesn't support signal handlers with ProactorEventLoop
        self._logger.warning(
            "Signal handlers not supported on this platform. "
            "Use Ctrl+C or stop() method to terminate.",
            extra={"worker_id": self.worker_id},
        )

    try:
        await self.start()

        # Wait for worker to stop
        if self._consume_task:
            with contextlib.suppress(asyncio.CancelledError):
                await self._consume_task
    except KeyboardInterrupt:  # pragma: no cover
        # Handle Ctrl+C on Windows
        self._logger.info(
            "Keyboard interrupt received", extra={"worker_id": self.worker_id}
        )
        await self.stop()
    finally:
        # Clean up signal handlers if they were registered
        if signals_registered:
            for sig in (signal.SIGINT, signal.SIGTERM):
                loop.remove_signal_handler(sig)

get_stats

get_stats() -> WorkerStats

Get worker statistics.

Returns:

Name Type Description
WorkerStats WorkerStats

Current worker statistics.

Source code in src/chicory/worker.py
def get_stats(self) -> WorkerStats:
    """
    Get worker statistics.

    Returns:
        WorkerStats: Current worker statistics.
    """
    return WorkerStats(
        worker_id=self.worker_id,
        hostname=self.hostname,
        pid=self.pid,
        queue=self.queue,
        concurrency=self.concurrency,
        tasks_processed=self._tasks_processed,
        tasks_failed=self._tasks_failed,
        active_tasks=len(self._tasks),
        started_at=self.started_at,
        last_heartbeat=datetime.now(UTC),
        is_running=self._running,
    )

healthcheck async

healthcheck() -> WorkerStats

Perform a healthcheck on the worker.

Returns:

Name Type Description
WorkerStats WorkerStats

Current worker statistics.

Source code in src/chicory/worker.py
async def healthcheck(self) -> WorkerStats:
    """
    Perform a healthcheck on the worker.

    Returns:
        WorkerStats: Current worker statistics.
    """
    stats = self.get_stats()

    # Check broker connectivity
    stats.broker = await self.app.broker.healthcheck()

    # Check backend connectivity
    if self.app.backend:
        stats.backend = await self.app.backend.healthcheck()

    return stats

WorkerConfig

chicory.WorkerConfig

Bases: BaseSettings

Worker process configuration.