Asynchronous tasks in Python with Celery + RabbitMQ + Redis

In this article, we are going to use Celery, RabbitMQ, and Redis to build a distributed task queue.
But what is a distributed task queue, and why would you build one?

A distributed task queue allows you to offload work to another process, where it is handled asynchronously (once you push the work onto the queue, you don’t wait for it) and in parallel (you can use other cores to process the work).

In short, it lets you execute tasks in the background while the application carries on handling other work.
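To make the idea concrete, here is a minimal single-machine sketch that uses only the Python standard library. It is not how Celery works internally, and the names send_email, worker, and sent are made up for illustration. The caller puts work on a queue and returns immediately, while a background thread does the slow part.

```python
import queue
import threading
import time

task_queue = queue.Queue()  # stands in for the message broker
sent = []                   # records finished work so we can inspect it


def send_email(address):
    """Hypothetical slow job; the sleep stands in for a network call."""
    time.sleep(0.1)
    sent.append(address)


def worker():
    """Pull jobs off the queue and run them until a None sentinel arrives."""
    while True:
        job = task_queue.get()
        if job is None:
            task_queue.task_done()
            break
        func, args = job
        func(*args)
        task_queue.task_done()


thread = threading.Thread(target=worker, daemon=True)
thread.start()

start = time.perf_counter()
task_queue.put((send_email, ("user@example.com",)))  # returns immediately
enqueue_seconds = time.perf_counter() - start

task_queue.join()     # only for the demo: wait until the worker has finished
task_queue.put(None)  # tell the worker to stop
thread.join()
```

Enqueuing takes a tiny fraction of the time the job itself needs, which is the whole point: the caller is free to move on. Celery applies the same pattern across processes and machines, with a real broker in place of the in-memory queue.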

Use Cases of Task Queues

The most basic and familiar example is sending an email after a user registers. You don’t know how long it will take to send the email: it could be 1 ms, it could be much longer, and sometimes the email is not sent at all. Delivery is handled by another provider, so you neither control it nor know in advance whether it will succeed.
Now that you have a rough idea of how task queues can help, identifying suitable tasks is as simple as checking whether they belong to one of the following categories:

  • Third-party tasks — The web app must serve users quickly without waiting for other actions to complete while the page loads, e.g., sending an email or notification or propagating updates to internal tools (such as gathering data for A/B testing or system logging).

  • Long-running jobs — Jobs that are expensive in resources, where users need to wait while they compute their results, e.g., complex workflow execution (DAG workflows), graph generation, Map-Reduce like tasks, and serving of media content (video, audio).

  • Periodic tasks — Jobs that you will schedule to run at a specific time or after an interval, e.g., monthly report generation or a web scraper that runs twice a day.

Setting up the dependencies for Celery

Celery requires a message transport, usually called a broker, to send and receive messages. Several brokers are supported, RabbitMQ and Redis being two common choices.

For this tutorial we are going to use RabbitMQ, but you can use any other supported message broker (e.g., Redis).

Since RabbitMQ is our message transport, it is worth explaining what we are going to use Redis for.
When tasks are sent to the broker and then executed by the Celery worker, we want to save their state and be able to see which tasks have been executed. That requires some kind of data store (a result backend), and for this we are going to use Redis.
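The division of labour between the two can be sketched with plain Python objects. This is a toy model and not Celery's implementation: a queue.Queue stands in for RabbitMQ, a dictionary stands in for Redis, and submit and run_next are hypothetical helpers. The state names mirror the ones Celery uses, although real Celery does not write a PENDING entry to the backend the way this toy does.

```python
import queue
import uuid

broker = queue.Queue()  # stands in for RabbitMQ: holds work not yet done
result_store = {}       # stands in for Redis: task id -> state and result


def submit(func, *args):
    """Enqueue a call and return an id the caller can look up later."""
    task_id = str(uuid.uuid4())
    result_store[task_id] = {"status": "PENDING", "result": None}
    broker.put((task_id, func, args))
    return task_id


def run_next():
    """What a worker does: take one message, run it, record the outcome."""
    task_id, func, args = broker.get()
    try:
        result_store[task_id] = {"status": "SUCCESS", "result": func(*args)}
    except Exception as exc:
        result_store[task_id] = {"status": "FAILURE", "result": repr(exc)}


task_id = submit(lambda name: f"Hello {name}", "Valon")
state_before = result_store[task_id]["status"]  # nothing has run yet
run_next()
state_after = result_store[task_id]             # outcome recorded by the worker
```

Without the result store, the caller would have no way to find out what happened to a task after handing it to the broker.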

For the result stores, we also have many candidates:

  • AMQP, Redis

  • Memcached

  • SQLAlchemy, Django ORM

  • Apache Cassandra, Elasticsearch, Riak, etc.

To set up these services we are going to use Docker: it’s easy to set up, it provides an isolated environment, and you can easily reproduce the same environment from a configuration file (a Dockerfile or docker-compose.yml).

Project setup

Let’s start a new Python project from scratch. First create a new directory for the project, then create the files it needs, and finally initialize the virtual environment.

touch docker-compose.yml requirements.txt
touch tasks.py

Create and activate a virtual environment:

python -m venv env
source env/bin/activate

Now let’s install the project requirements. For this project, we only need the celery and redis packages. You can also list them in requirements.txt and install them with pip install -r requirements.txt.

pip install celery==5.0.5 redis

Now it’s time to configure docker-compose to run RabbitMQ and Redis. Paste the following YAML configuration into docker-compose.yml.

version: "3"
services:
  rabbitmq:
    image: rabbitmq:latest
    environment:
      - RABBITMQ_DEFAULT_USER=guest
      - RABBITMQ_DEFAULT_PASS=guest
    ports:
      - "5672:5672"

  redis:
    image: redis:latest
    ports:
      - "6379:6379"

Here we simply start up two services by pointing the image key at the image on Docker Hub, mapping the ports (host:container), and adding environment variables. To see which environment variables an image supports, go to its page on Docker Hub and read the documentation. The RabbitMQ image page, for example, describes RABBITMQ_DEFAULT_USER and RABBITMQ_DEFAULT_PASS.

Now, let’s initialize the Celery app to use RabbitMQ as the message broker and Redis as the result store.
In tasks.py, paste the following code:


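from celery import Celery
from time import sleep

# RabbitMQ is the message broker; Redis is the result backend.
# Both URLs assume the docker-compose services are published on localhost.
broker_url = "amqp://localhost"
redis_url = "redis://localhost"
app = Celery('tasks', broker=broker_url, backend=redis_url)


@app.task
def say_hello(name: str):
    # Simulate slow work so the task is visibly processed in the background.
    sleep(5)
    return f"Hello {name}"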

I tried to keep the code as minimal as possible so that the focus stays on the purpose of this tutorial.
As you can see, we have defined the URLs for RabbitMQ and Redis, and then we simply initialize the Celery app with them. The first parameter, tasks, is the name of the current module.

Then we decorated the function say_hello with @app.task, which marks it as a task so that it can later be called using .delay(), as we will see in a bit.

Normally we would have a module celery_app.py that only initializes the Celery application instance, and a separate module tasks.py in which we define the tasks that we want Celery to run.

Build and run services with docker

Now we only need to run the services (RabbitMQ and Redis) with Docker. To start the containers we simply run:

docker-compose up -d

This will take a while if you don’t have these images pulled locally. Then to verify that the containers are up and running we write:

docker ps

You should see two services running, with additional information for each one. If not, check the logs for any errors.
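If you also want to confirm from Python that both services are reachable, a small check like the following can help. It only tests that something accepts TCP connections on the ports published in docker-compose.yml (5672 and 6379); it does not verify that the service behind the port is healthy. The names port_open and check_services are made up for this sketch.

```python
import socket


def port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable hosts.
        return False


def check_services(services, host="localhost"):
    """Map each service name to whether its published port is reachable."""
    return {name: port_open(host, port) for name, port in services.items()}
```

Calling check_services({"rabbitmq": 5672, "redis": 6379}) should map both names to True once the containers are up.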
Now let’s start the Celery worker, and then try running some tasks from the Python interactive shell.

Start the celery worker:

celery -A tasks worker -l info --pool=solo

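This starts the Celery worker. The --pool=solo option makes it run tasks one at a time in the worker’s own process, which is enough for this demo. If you look at the logs, they should tell you that the worker has successfully connected to the broker.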

Now let's run a Celery task from the Python interactive shell:

$ python
---------------------------------
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import say_hello
>>> say_hello.delay("Valon")
<AsyncResult: 55ad96a9-f7ea-44f4-9a47-e15b90d6d8a2>

We called the function using .delay() and passed it the name argument. This method is actually a star-argument shortcut to another method called apply_async(). What we get back is an AsyncResult, a handle to the task that was passed to the broker; Celery then consumes the task and finishes it in the background.
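The relationship between the two methods can be illustrated with a toy model. This is not Celery's source code: ToyTask and toy_task are invented for this sketch, and the toy runs the function immediately instead of sending a message. The point is only that delay(*args, **kwargs) forwards to apply_async(args, kwargs), and that apply_async is the form with room for extra execution options.

```python
class ToyTask:
    """Wraps a function and mimics the shape of Celery's calling API."""

    def __init__(self, func):
        self.func = func
        self.calls = []  # records (args, kwargs, options) for inspection

    def apply_async(self, args=(), kwargs=None, **options):
        """Explicit form: arguments arrive as a sequence and a dict,
        leaving room for options such as countdown."""
        kwargs = kwargs or {}
        self.calls.append((tuple(args), kwargs, options))
        # A real task queue would publish a message here; the toy just runs it.
        return self.func(*args, **kwargs)

    def delay(self, *args, **kwargs):
        """Shortcut: star-arguments are forwarded unchanged, with no options."""
        return self.apply_async(args, kwargs)


def toy_task(func):
    """Decorator that turns a plain function into a ToyTask."""
    return ToyTask(func)


@toy_task
def say_hello(name):
    return f"Hello {name}"


via_delay = say_hello.delay("Valon")
via_apply = say_hello.apply_async(args=["Valon"], countdown=10)
```

With the real task from tasks.py, the equivalent calls would be say_hello.delay("Valon") and say_hello.apply_async(args=["Valon"], countdown=10), where countdown asks Celery to wait at least that many seconds before executing the task.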

If you look at your worker now, you will see in the logs that the worker received a task and then after 5 seconds will tell you that the task finished successfully.

Now let’s run the same task again, this time making use of the result store. In the Python shell, store the return value of .delay() in a variable and then inspect its properties, such as its status and its return value.
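Here is a sketch of what that inspection might look like, written as a small helper so it can be reused. The name describe_result is hypothetical; the attributes it reads (id, status, ready(), successful(), get()) are part of Celery's AsyncResult interface. It assumes the worker and both containers are running.

```python
def describe_result(result, timeout=10):
    """Wait for a task to finish and collect the fields worth inspecting.

    `result` is expected to be the AsyncResult returned by .delay().
    """
    ready_before = result.ready()        # usually False right after .delay()
    value = result.get(timeout=timeout)  # blocks until the worker is done
    return {
        "id": result.id,
        "ready_before_get": ready_before,
        "status": result.status,         # e.g. "SUCCESS" once finished
        "successful": result.successful(),
        "value": value,
    }
```

In the shell, running result = say_hello.delay("Valon") followed by describe_result(result) would block for roughly five seconds and then return a dictionary whose "value" entry is the string returned by the task.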

If we hadn’t configured a result backend (Redis) for Celery, we couldn’t access these properties or functions, because by default it wouldn’t store any state. Since we have one, we can retrieve information about our tasks. If you want to dig deeper, you can browse your Redis database with a tool like TablePlus, or you can set up Flower to monitor your Celery workers and tasks.

Each task’s state and result are stored in Redis, under keys of the form celery-task-meta-<task id>.

Wrapping up

In this article, we have set up a Python application with Celery, RabbitMQ, and Redis from scratch. The purpose of the article was to show you what a task queue is, how you can benefit from one, and how to implement it.
The example task is just for demonstration, but you can reuse the same configuration, adding tasks to the tasks module and keeping the configuration in celery_app.py. See the Celery documentation for more details.

I highly encourage you to use Celery in your application: it is very useful when you have work that takes a long time, need to schedule tasks, and so on.

You can find the full source code for this article, along with instructions, in the GitHub repository.