This section assumes you have a basic understanding of the core concepts of task queues and TacoQ. Read the Core Concepts section if you haven't already.
We recommend using UV to run Python projects.
TacoQ requires Postgres, RabbitMQ, and the Relay to be running. Let's start by creating a docker-compose.yml file to launch them:
volumes:
  rabbitmq_data: {}
  postgres_data: {}

services:
  # ================================================
  # TacoQ Relay
  # The relay has two functions:
  # 1. Reads task updates from the message broker
  #    and writes them to the database.
  # 2. Has a REST API for getting tasks by ID.
  # ================================================
  relay:
    image: ghcr.io/taco-xyz/tacoq-relay:latest
    ports:
      - "3000:3000"
    depends_on:
      rabbitmq:
        condition: service_healthy
      postgres:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:3000/health"]
      interval: 5s
      timeout: 5s
      retries: 5
    environment:
      # Inside the compose network, the relay must reach Postgres and
      # RabbitMQ by their service names rather than localhost.
      TACOQ_DATABASE_URL: postgresql://user:password@postgres:5432/tacoq
      TACOQ_BROKER_URL: amqp://user:password@rabbitmq:5672

  # ================================================
  # Broker
  # This is the message broker through which all
  # tasks get routed to the appropriate worker and
  # to the relay so it can save them to the database.
  # ================================================
  rabbitmq:
    image: rabbitmq:4-management
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: user
      RABBITMQ_DEFAULT_PASS: password
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "check_port_connectivity"]
      interval: 5s
      timeout: 5s
      retries: 5

  # ================================================
  # Storage
  # This is the database where all tasks get saved.
  # ================================================
  postgres:
    image: postgres:latest
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
      POSTGRES_DB: tacoq
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U user -d tacoq"]
      interval: 5s
      timeout: 5s
      retries: 5
Run docker compose up to start the services and we're ready to go!
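To confirm the relay came up correctly, you can hit its health endpoint from the host — the same one the compose healthcheck uses, assuming the default port mapping above:

curl -f http://localhost:3000/health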
With the infrastructure running, we want to create a worker that can execute tasks. Let's start by installing the TacoQ Python SDK:
pip install tacoq
or, for UV users:
uv init
uv add tacoq
The worker must know how to receive new task assignments and send updates through the broker, so let's start by configuring that:
from tacoq import (
    WorkerApplication,
    BrokerConfig,
    WorkerApplicationConfig,
)

broker_config = BrokerConfig(url="amqp://user:password@localhost:5672")

worker_config = WorkerApplicationConfig(
    name="worker_waiter_1",
    kind="worker_waiter_kind",
    broker_config=broker_config,
    broker_prefetch_count=5,
)

worker_app = WorkerApplication(config=worker_config)
Note that the worker field kind is set to "worker_waiter_kind". This field will be used by the publisher to know which set of workers to send the task to. We recommend using environment variables to align these values.
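For example, a minimal sketch of that pattern — the WORKER_KIND environment variable name here is something you define yourself, not a TacoQ built-in:

import os

# Hypothetical environment variable shared by the worker and the publisher
# so that their `kind` values never drift apart.
WORKER_KIND = os.environ.get("WORKER_KIND", "worker_waiter_kind")

worker_config = WorkerApplicationConfig(
    name="worker_waiter_1",
    kind=WORKER_KIND,
    broker_config=broker_config,
    broker_prefetch_count=5,
)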
The worker application has been created, but it doesn't know how to handle any tasks that come its way. So, let's teach it to handle a task:
import asyncio
import json
from typing import Any

from tacoq import TaskInput, TaskOutput

@worker_app.task(kind="task_wait_n_seconds")
async def task_wait_n_seconds(input_data: TaskInput) -> TaskOutput:
    # The input data must be de-serialized from a string into your preferred
    # data structure. In this case, we're using JSON. You could use Avro or Proto!
    input_data_dict: dict[str, Any] = json.loads(input_data)
    seconds = input_data_dict.get("seconds", 0)

    # The task is now executed. Here we simply wait for the specified number of
    # seconds and then return a results dictionary.
    await asyncio.sleep(seconds)

    # The results are serialized back into a string so that they can be
    # transported back to whoever requested the task.
    return json.dumps(
        {
            "result": "Hello, world! You waited for %d seconds" % seconds,
            "seconds": seconds,
        }
    )
Note that the task field kind is set to "task_wait_n_seconds". You can think about it the following way:
- Worker Kind: Helps the publisher know which set of workers to send the task to.
- Task Kind: Helps the worker know which method to use to handle a task (see the sketch below).
If you're familiar with task queues, you're probably used to only specifying the task kind and not the worker kind. You can read about this design decision in the System Architecture section.
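Because routing inside the worker happens by task kind, a single worker application can register several handlers. A minimal sketch, reusing the imports from above — the second task kind here is made up for illustration:

@worker_app.task(kind="task_say_hello")
async def task_say_hello(input_data: TaskInput) -> TaskOutput:
    # A second handler on the same worker: the broker delivers the task to a
    # worker of the right kind, and the worker then dispatches it to this
    # function based on the task's kind.
    name = json.loads(input_data).get("name", "world")
    return json.dumps({"result": f"Hello, {name}!"})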
Now that our worker is ready to handle tasks, we can boot it up via its entrypoint method:
if __name__ == "__main__":
    import asyncio

    asyncio.run(worker_app.entrypoint())
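Assuming the worker code above lives in a file called worker.py (the filename is just an example), you can now run it:

uv run worker.py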
The worker is running and ready to handle tasks. Now, let's publish some tasks for it to take care of!
We'll start by setting up the publisher and its configuration. The publisher's one and only responsibility is to publish tasks via the message broker so that the relay and the worker can take care of the rest.
from tacoq import PublisherClient, BrokerConfig
broker_config = BrokerConfig(url="amqp://user:password@localhost:5672")
publisher = PublisherClient(broker_config=broker_config)
With the publisher application created, we don't need to run an entrypoint. Instead, we will simply use the publisher to publish the task.
Let's publish a new task, wait for it to complete, and then retrieve the results:
import json

# We must serialize the input data into a string so that it can be passed to
# and interpreted by the worker!
task_input = json.dumps({"seconds": 2})

# The task is published to the message broker. Note that the worker kind and
# task kind must be properly configured and match the worker and task kinds
# in the worker application.
task = await publisher.publish_task(
    worker_kind="worker_waiter_kind",
    task_kind="task_wait_n_seconds",
    input_data=task_input,
)

# The task's ID is important for later!
task_id = task.id
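Note that publish_task is a coroutine, so when running this as a standalone script you need an event loop. A minimal sketch, using the publisher setup from above:

import asyncio
import json

from tacoq import PublisherClient, BrokerConfig

async def main() -> None:
    publisher = PublisherClient(
        broker_config=BrokerConfig(url="amqp://user:password@localhost:5672")
    )
    task = await publisher.publish_task(
        worker_kind="worker_waiter_kind",
        task_kind="task_wait_n_seconds",
        input_data=json.dumps({"seconds": 2}),
    )
    print(f"Published task {task.id}")

if __name__ == "__main__":
    asyncio.run(main())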
Our task has now been published and is being worked on. But how do we retrieve the task's status and results?
When the worker is done with the task, it sends the results to the relay, which saves them in the database. The relay can then be queried via REST for the task's current state.
To communicate with the relay, we can use the RelayClient class:
from tacoq import RelayClient

# The relay's URL is passed as an argument to the constructor.
relay_client = RelayClient(url="http://localhost:3000")

# We can now retrieve the task's status and results. You can optionally
# set `retry_until_complete` to `True` to have the client retry the request
# until the task has been completed by the worker.
completed_task = await relay_client.get_task(task_id)

# Let's load the results into a dictionary and print them.
result = json.loads(completed_task.results)
print(result)
# Hurray!
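If you'd rather have a single call that blocks until the worker is done, the retry_until_complete flag mentioned in the comment above can do that. A sketch, assuming it is accepted as a keyword argument of get_task:

# Keep re-fetching the task until the worker has reported a final state.
completed_task = await relay_client.get_task(task_id, retry_until_complete=True)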
Congratulations! You've just published, executed, and retrieved a task using TacoQ. You can keep learning more about TacoQ in the Technical Reference section.