Quickstart

Setup

Get TacoQ up and running on your project using Docker and the Python SDK.

This section assumes you have a basic understanding of the core concepts of task queues and TacoQ. Read the Core Concepts section if you haven't already.

Prerequisites

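You need Docker with Docker Compose to run the infrastructure, and a Python installation for the SDK. We recommend using uv to run Python projects.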

Infrastructure

TacoQ requires Postgres, RabbitMQ, and the Relay to be running. Let's start by creating a docker-compose.yml file to launch them:

volumes:
  rabbitmq_data: {}
  postgres_data: {}

services:
  # ================================================
  # TacoQ Relay
  # The relay has two functions:
  # 1. Reads task updates from the message broker
  #    and writes them to the database.
  # 2. Has a REST API for getting tasks by ID.
  # ================================================

  relay:
    image: ghcr.io/taco-xyz/tacoq-relay:latest
    ports:
      - "3000:3000"
    depends_on:
      rabbitmq:
        condition: service_healthy
      postgres:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:3000/health"]
      interval: 5s
      timeout: 5s
      retries: 5
    environment:
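      # Inside the Compose network, other containers are reached by their
      # service name, not by localhost.
      TACOQ_DATABASE_URL: postgresql://user:password@postgres:5432/tacoq
      TACOQ_BROKER_URL: amqp://user:password@rabbitmq:5672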

  # ================================================
  # Broker
  # This is the message broker where all tasks get
  # routed through to the appropriate worker and to
  # the relay so it can save them to the database.
  # ================================================

  rabbitmq:
    image: rabbitmq:4-management
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: user
      RABBITMQ_DEFAULT_PASS: password
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "check_port_connectivity"]
      interval: 5s
      timeout: 5s
      retries: 5

  # ================================================
  # Storage
  # This is the database where all tasks get saved.
  # ================================================

  postgres:
    image: postgres:latest
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
      POSTGRES_DB: tacoq
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U user -d tacoq"]
      interval: 5s
      timeout: 5s
      retries: 5

Run docker compose up to start the services and we're ready to go!
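
Before moving on, it can help to confirm that the three services are accepting connections. The sketch below is not part of TacoQ; it only uses Python's standard library to probe the host ports published in the compose file above (3000, 5672, and 5432). The helper name `port_is_open` and the `SERVICES` mapping are made up for this example.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution errors.
        return False


# Host ports published by the docker-compose.yml above.
SERVICES = {"relay": 3000, "rabbitmq": 5672, "postgres": 5432}

if __name__ == "__main__":
    for name, port in SERVICES.items():
        status = "up" if port_is_open("localhost", port) else "not reachable"
        print(f"{name:<10} localhost:{port} {status}")
```

An open port only shows that something is listening. The health checks in the compose file remain the better signal that each service is actually ready.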

Client

Worker

With the infrastructure running, we want to create a worker that can execute tasks. Let's start by installing the TacoQ Python SDK:

pip install tacoq

or, for UV users:

uv init
uv add tacoq

The worker must know how to receive new task assignments and send updates through the broker, so let's start by configuring that:

from tacoq import (
    WorkerApplication,
    BrokerConfig,
    WorkerApplicationConfig,
)

broker_config = BrokerConfig(url="amqp://user:password@localhost:5672")
worker_config = WorkerApplicationConfig(
    name="worker_waiter_1",
    kind="worker_waiter_kind",
    broker_config=broker_config,
    broker_prefetch_count=5,
)

worker_app = WorkerApplication(config=worker_config)

Note that the worker field kind is set to "worker_waiter_kind". This field will be used by the publisher to know which set of workers to send the task to. We recommend using environment variables to align these values.
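
As a rough illustration of the environment-variable recommendation, both the worker and the publisher could read the kind from one shared variable instead of hard-coding the string. The variable name `WAITER_WORKER_KIND` and the helper `get_worker_kind` are hypothetical; TacoQ itself does not read this variable.

```python
import os

# Hypothetical variable name; pick whatever fits your deployment.
WORKER_KIND_ENV = "WAITER_WORKER_KIND"
DEFAULT_WORKER_KIND = "worker_waiter_kind"


def get_worker_kind() -> str:
    """Read the worker kind from the environment, falling back to a default."""
    return os.environ.get(WORKER_KIND_ENV, DEFAULT_WORKER_KIND)


if __name__ == "__main__":
    # The worker config (kind=...) and the publisher (worker_kind=...) would
    # both call get_worker_kind() so the two values cannot drift apart.
    print(get_worker_kind())
```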

The worker application has been created, but it doesn't know how to handle any tasks that come its way. So, let's teach it to handle a task:

import asyncio
import json
from typing import Any

from tacoq import TaskInput, TaskOutput

@worker_app.task(kind="task_wait_n_seconds")
async def task_wait_n_seconds(input_data: TaskInput) -> TaskOutput:

    # The input data must be de-serialized from a string into your preferred
    # data structure. In this case, we're using JSON. You could use Avro or Proto!
    input_data_dict: dict[str, Any] = json.loads(input_data)
    seconds = input_data_dict.get("seconds", 0)

    # The task is now executed. Here we simply wait for the specified number of
    # seconds and then return a results dictionary.
    await asyncio.sleep(seconds)

    # The results are serialized back into a string so that they can be
    # transported back to whomever requested the task.
    return json.dumps(
        {
            "result": "Hello, world! You waited for %d seconds" % seconds,
            "seconds": seconds,
        }
    )
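
Because the handler's body is ordinary async Python, its logic can be tried out without a broker. The sketch below copies that logic into a standalone coroutine (named `wait_n_seconds` here, without the `@worker_app.task` decorator). It treats the input as a plain JSON string, which is an assumption about `TaskInput` based on the `json.loads` call above.

```python
import asyncio
import json
from typing import Any


async def wait_n_seconds(input_data: str) -> str:
    """Standalone copy of the handler logic, with no TacoQ dependency."""
    payload: dict[str, Any] = json.loads(input_data)
    seconds = payload.get("seconds", 0)
    await asyncio.sleep(seconds)
    return json.dumps(
        {
            "result": "Hello, world! You waited for %d seconds" % seconds,
            "seconds": seconds,
        }
    )


if __name__ == "__main__":
    raw_output = asyncio.run(wait_n_seconds(json.dumps({"seconds": 1})))
    print(json.loads(raw_output)["result"])
```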

Note the task field kind is set to "task_wait_n_seconds". You can think about it the following way:

- Worker Kind: Helps the publisher know which set of workers to send the task to.
- Task Kind: Helps the worker know which method to use to handle a task.

If you're familiar with task queues, you're probably used to only specifying the task kind and not the worker kind. You can read about this design decision in the System Architecture section.
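
One way to picture the two kinds is as a two-level lookup. The toy model below is not how TacoQ is implemented (the real routing goes through the message broker); it only uses dictionaries to show that the worker kind picks a group of workers and the task kind picks a handler within that group. Every name in it, including the second worker kind, is invented for the illustration.

```python
from typing import Callable, Dict

Handler = Callable[[str], str]

# worker kind -> (task kind -> handler)
registry: Dict[str, Dict[str, Handler]] = {}


def register(worker_kind: str, task_kind: str) -> Callable[[Handler], Handler]:
    """Decorator that files a handler under its worker kind and task kind."""
    def decorator(func: Handler) -> Handler:
        registry.setdefault(worker_kind, {})[task_kind] = func
        return func
    return decorator


def dispatch(worker_kind: str, task_kind: str, input_data: str) -> str:
    """Route by worker kind first, then by task kind."""
    handlers = registry[worker_kind]  # which set of workers?
    handler = handlers[task_kind]     # which method on that worker?
    return handler(input_data)


@register("worker_waiter_kind", "task_wait_n_seconds")
def wait_handler(input_data: str) -> str:
    return f"waiter handled {input_data}"


@register("worker_mailer_kind", "task_send_email")
def mail_handler(input_data: str) -> str:
    return f"mailer handled {input_data}"


if __name__ == "__main__":
    print(dispatch("worker_waiter_kind", "task_wait_n_seconds", '{"seconds": 2}'))
```

In this model, a task sent with the right task kind but the wrong worker kind fails the lookup. That mirrors why both values have to match between the publisher and the worker.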

Now that our worker is ready to handle tasks, we can boot it up via its entrypoint method:

if __name__ == "__main__":
    import asyncio
    asyncio.run(worker_app.entrypoint())

The worker is running and ready to handle tasks. Now, let's publish some tasks for it to take care of!

PublisherClient

We'll start by setting up the publisher and its configuration. The publisher's one and only responsibility is to publish tasks via the message broker so that the relay and the worker can take care of the rest.

import json

from tacoq import PublisherClient, BrokerConfig

broker_config = BrokerConfig(url="amqp://user:password@localhost:5672")
publisher = PublisherClient(broker_config=broker_config)

With the publisher application created, we don't need to run an entrypoint. Instead, we will simply use the publisher to publish the task.

Let's publish a new task, wait for it to complete, and then retrieve the results:

# We must serialize the input data into a string so that it can be passed to
# and interpreted by the worker! The key must match the one the worker reads.
task_input = json.dumps({"seconds": 2})

# The task is published to the message broker. Note that the worker kind and
# task kind must be properly configured and match the worker and task kinds
# in the worker application. `await` must run inside an async function.
task = await publisher.publish_task(
    worker_kind="worker_waiter_kind",
    task_kind="task_wait_n_seconds",
    input_data=task_input,
)

# The task's ID is important for later!
task_id = task.id
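
The publisher and the worker only agree on the payload if both use the same field names. One possible way to keep them in sync, sketched here with the standard library only, is a small dataclass shared by both sides. `WaitInput` and its methods are hypothetical helpers, not part of the TacoQ SDK.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class WaitInput:
    """Payload for the task_wait_n_seconds task (hypothetical helper)."""

    seconds: float = 0

    def to_json(self) -> str:
        # Publisher side: produce the string passed as input_data.
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, raw: str) -> "WaitInput":
        # Worker side: unknown keys raise TypeError instead of being ignored.
        return cls(**json.loads(raw))


if __name__ == "__main__":
    task_input = WaitInput(seconds=2).to_json()
    print(task_input)
    print(WaitInput.from_json(task_input))
```

With this approach, a misspelled or renamed key fails loudly on the worker side instead of silently falling back to a default value.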

Our task has now been published and is being worked on. But how do we retrieve the task's status and results?

RelayClient

When the worker is done with the task, it sends the results to the relay, which saves them in the database. The relay can be queried via REST for the task's current state.

To communicate with the relay, we can use the RelayClient class:

from tacoq import RelayClient

# The relay's URL is passed as an argument to the constructor.
relay_client = RelayClient(url="http://localhost:3000")

# We can now retrieve the task's status and results. You can optionally
# set `retry_until_complete` to `True` to have the client retry the request
# until the task has been completed by the worker.
completed_task = await relay_client.get_task(task_id)

# Let's load the results into a dictionary and print them.
result = json.loads(completed_task.results)
print(result)

# Hurray!
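
If you would rather control the waiting yourself, a generic polling loop is one option. The sketch below is independent of TacoQ and makes no claim about how `retry_until_complete` works internally. `poll_until`, `fetch`, and `is_done` are hypothetical names, and the `"status"` values in the demo are made up; in real use, `fetch` would wrap a call such as `relay_client.get_task(task_id)`.

```python
import asyncio
import time
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def poll_until(
    fetch: Callable[[], Awaitable[T]],
    is_done: Callable[[T], bool],
    interval: float = 0.5,
    timeout: float = 30.0,
) -> T:
    """Call fetch() repeatedly until is_done(result) is true or timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        result = await fetch()
        if is_done(result):
            return result
        # Give up if another sleep would carry us past the deadline.
        if time.monotonic() + interval > deadline:
            raise TimeoutError("task did not complete in time")
        await asyncio.sleep(interval)


if __name__ == "__main__":
    # Stand-in for the relay: reports "completed" on the third call.
    calls = {"n": 0}

    async def fake_fetch() -> dict:
        calls["n"] += 1
        return {"status": "completed" if calls["n"] >= 3 else "running"}

    task = asyncio.run(
        poll_until(fake_fetch, lambda t: t["status"] == "completed", interval=0.01)
    )
    print(task, "after", calls["n"], "calls")
```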

Congratulations! You've just published, executed, and retrieved a task using TacoQ. You can keep learning more about TacoQ in the Technical Reference section.


© 2025 Taco Division.
All rights reserved.