Getting started with Celery and RabbitMQ

If you have a job that's computationally intensive, it wouldn't be a great idea to keep a user waiting; rather, it's best to do that in the background. Task queues are great tools that allow for async processing, outside of an HTTP request. Since DIVE works with potentially large amounts of data, we rely heavily on task queues to process it.

This post gives a basic overview of how to implement a task queue using Celery, a popular task queue library for Python, together with RabbitMQ. It assumes little knowledge of task queues and basic knowledge of Python and Flask.

The Github repository for this tutorial can be found here, if you want to play with it directly.

Overview

There are a few roles in the world of task queues you should know:

Producer: produces tasks to execute
Broker: creates the task queue, dispatches tasks to it, and delivers them from the queue to the consumer
Consumer: consists of workers that execute the tasks

Celery is the standard choice for implementing task queue workers in Python. While there's a slight learning curve, it's worth learning because it scales nicely to whatever needs you might have in the future. As for message brokers, Redis and RabbitMQ are both popular choices. For this tutorial, we'll use Flask as the producer, Celery as the consumer of tasks, and RabbitMQ as the broker.

Set up

RabbitMQ

Let's get RabbitMQ up and running first. If it's not already installed, install it with Homebrew by running brew install rabbitmq from the command line.

Get RabbitMQ running in the background with:

$ sudo rabbitmq-server -detached

If you're getting an error with the rabbitmq-server command, make sure PATH=$PATH:/usr/local/sbin is in your .profile.

Next, create a RabbitMQ user and virtual host (vhost) with rabbitmqctl, RabbitMQ's command line management tool. vhosts are essentially namespaces that group queues and user permissions, which helps keep the broker organized.

$ sudo rabbitmqctl add_user [YOUR_NAME] [PASSWORD]
$ sudo rabbitmqctl add_vhost [VHOST_NAME]
$ sudo rabbitmqctl set_permissions -p [VHOST_NAME] [YOUR_NAME] ".*" ".*" ".*"

The last command gives your user configure, write, and read permissions in this vhost. You'll need to remember the username, password, and vhost when specifying the broker URL in the server script. (In the example repo, the username is admin, the password is password, and the vhost is test.) More commands here.
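Put together, those three pieces form the AMQP URL we'll hand to Celery later on; with the example repo's credentials it looks like this:

amqp://admin:password@localhost/test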

Virtual environment

Create a new directory, cd into it, and create the virtualenv:

$ mkdir celery-example
$ cd celery-example
$ virtualenv venv
$ source venv/bin/activate

Then, install the dependencies and save to requirements.txt:

$ pip install flask celery flask-cors pandas numpy
$ pip freeze > requirements.txt

Finally, create your files, along with an uploads folder for the server to save incoming files into:

$ touch server.py index.html
$ mkdir uploads

Server API

Now that dependencies are installed, let's open server.py and get into the code. The task for this example will be to upload a csv file, save it as a local file, then run some simple operations on it using pandas and numpy.

We'll start by importing the dependencies and instantiating the Flask app:

import os  
from flask import Flask, url_for, jsonify, request, make_response  
from celery import Celery, states  
from flask_cors import CORS, cross_origin  
import numpy as np  
import pandas as pd

app = Flask(__name__)  
CORS(app)  

Plug in the Celery configuration:

app.config['CELERY_BROKER_URL'] = 'amqp://[YOUR_NAME]:[PASSWORD]@localhost/[VHOST_NAME]'  
app.config['CELERY_RESULT_BACKEND'] = 'amqp://[YOUR_NAME]:[PASSWORD]@localhost/[VHOST_NAME]'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])  
celery.conf.update(app.config)  

celery.conf.update(app.config) pulls any additional Celery settings out of Flask's config; in particular, it's what picks up CELERY_RESULT_BACKEND, since the Celery constructor was only given the broker URL.
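Any other Celery setting you drop into app.config before this call gets applied the same way. For instance (purely illustrative, not something this tutorial needs), an old-style serializer setting could be added like so:

app.config['CELERY_TASK_SERIALIZER'] = 'json'  # picked up by celery.conf.update(app.config)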

Next, let's define the POST endpoint that receives the file upload.

@app.route('/upload', methods=['POST'])
def upload():
    file_obj = request.files.get('file')
    file_name = file_obj.filename
    path = os.path.join('./uploads', file_name)
    file_obj.seek(0)  # rewind the stream before saving, in case it was already read
    try:
        file_obj.save(path)
    except IOError:
        print('I/O Error')
    file_obj.close()

    # Queue the task and hand the task id back to the client
    file_task = read_csv_task.apply_async(args=[path])
    return make_response(jsonify({'task_id': file_task.task_id}))

Here, the server receives the CSV file from the POST request and saves it to the /uploads folder. Then we kick off a Celery task asynchronously with .apply_async(), where read_csv_task is the Celery task. Once that's sent off, we send the task's id back to the client.
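The client ends up with a small JSON payload along these lines (the task id below is just a made-up example):

{"task_id": "c2f7e1a4-3b8d-4f6e-9a21-7d5c0e8b1f3a"}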

Creating a Celery task

Time to create a task! Continuing in server.py, we'll define a background task that extracts information from the csv dataset.

Each task needs the @celery.task decorator. By setting bind=True, the task function receives self as its first argument, which we can use to update the task's status with useful information.

@celery.task(bind=True)
def read_csv_task(self, path):  
    self.update_state(state=states.PENDING)
    df = pd.read_csv(path)
    result = compute_properties(df)
    return result
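
For a quick single-step task like this, update_state isn't doing much, but the same hook is how a longer-running task would report progress. Here's a minimal sketch (long_task is a hypothetical example, not part of this tutorial's repo):

@celery.task(bind=True)
def long_task(self, paths):
    # Hypothetical example: process several CSV files and report progress as we go
    results = []
    for i, path in enumerate(paths):
        results.append(compute_properties(pd.read_csv(path)))
        self.update_state(state='PROGRESS',
                          meta={'current': i + 1, 'total': len(paths)})
    return results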

compute_properties() is fairly simple for now, giving summary stats about each column with pandas and numpy.

def compute_properties(df):
    properties = {}
    properties['num_rows'] = len(df)
    properties['num_columns'] = len(df.columns)
    properties['column_data'] = get_column_data(df)
    return properties

def get_column_data(df):
    result = []

    for c in df:
        info = {}
        col = df[c]
        info['name'] = c
        # Cast numpy scalars to plain Python types so the result stays JSON-serializable
        info['num_null'] = int(col.isnull().sum())

        if col.dtype == 'int64':
            # Numeric column: report summary statistics
            info['mean'] = float(np.mean(col))
            info['median'] = float(np.median(col))
            info['stddev'] = float(np.std(col))
            info['min'] = int(col.min())
            info['max'] = int(col.max())
        else:
            # Non-numeric column: report its unique values,
            # unless every value is unique (e.g. an id column)
            unique_values = col.unique().tolist()
            if len(unique_values) < len(df):
                info['unique_values'] = unique_values
            else:
                info['unique_values'] = True
        result.append(info)
    return result
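
To make the shape of the result concrete, here's roughly what compute_properties() would return for a hypothetical three-row CSV with an integer age column and a categorical city column (the values are illustrative):

{
  "num_rows": 3,
  "num_columns": 2,
  "column_data": [
    {"name": "age", "num_null": 0, "mean": 30.0, "median": 29.0,
     "stddev": 3.74, "min": 26, "max": 35},
    {"name": "city", "num_null": 0, "unique_values": ["Boston", "Cambridge"]}
  ]
}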

Finally, specify the app's port and have it run when the script is executed directly:

if __name__ == '__main__':  
    app.run(port=8889, debug=True)

Great! Now you should be able to get the server and worker up and running, each in its own terminal, with:

$ python server.py
$ celery -A server.celery worker --loglevel=info

User interface

The interface is very simple: just an upload form that sends the file to the server with a fetch request. Open up index.html, add jQuery, and a simple form:

<!DOCTYPE html>  
<html>  
<head>  
  <title>Celery Example</title>
  <script src="https://ajax.googleapis.com/ajax/libs/jquery/3.1.1/jquery.min.js"></script>
</head>  
<body>  
  <h1>Upload a csv file!</h1>
  <div>
    <form>
      <input type="file" accept="text/csv">
      <input type="submit">
    </form>
  </div>
  <h2>Results:</h2>
  <div id="result"></div>
</body>  
</html>  

In a script tag just before the closing </body> tag (so the form exists when the handler is attached), add the JavaScript to handle the file submission:

<script>  
  $("form").on("submit", processForm);

  function processForm(e) {
      e.preventDefault();

      var data = new FormData();
      var input = document.querySelector('input[type="file"]')

      data.append('file', input.files[0])

      window.fetch('http://localhost:8889/upload', {
          method: 'POST',
          body: data
      }).then(function(response) {
          return response.json()
      }).then(function(data) {
          checkTask(data.task_id)
      }).catch(function(error) {
          console.log('Error', error);
      });  
  }
</script>  

When the form is submitted, we send the file in a POST request and get the task id back from the server.

But there's one more thing: how do we know when the task is completed? That's where checkTask() in the second .then() comes in. We'll have to add a little more complexity to the front end and back end.

Checking for task completion

Let's revisit server.py, and specify the endpoint for checking the status of a task.

@app.route('/task/<task_id>', methods=['GET'])
def check_task_status(task_id):
    task = read_csv_task.AsyncResult(task_id)
    state = task.state
    response = {}
    response['state'] = state

    if state == states.SUCCESS:
        response['result'] = task.get()
    elif state == states.FAILURE:
        # On failure, task.info holds the raised exception rather than a dict
        try:
            response['error'] = task.info.get('error')
        except Exception:
            response['error'] = 'Unknown error occurred'

    return make_response(jsonify(response))

.AsyncResult() gives you access to the task state (pending, success, or failure), using the task id. Our response object includes the state, and the result if the task is complete.
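As a rough illustration, the response looks like the first line below while the task is still running, and like the second once it succeeds (result abbreviated):

{"state": "PENDING"}
{"state": "SUCCESS", "result": { ... }}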

On the client side, the checkTask() function looks like this:

  function checkTask(taskId) {
    window.fetch('http://localhost:8889/task/' + taskId 
    ).then(function(response) {
        return response.json()
    }).then(function(data) {
        if(data.state === 'PENDING') {
          setTimeout(function() {
            checkTask(taskId)
          }, 1000)
        } else if(data.state === 'FAILURE') {
          alert('Failure occurred')
        } else if(data.state === 'SUCCESS') {
          $("#result").html('<div>' + JSON.stringify(data.result) + '</div>')
        }
    }).catch(function(error) {
        console.log('Error', error);
    });  
  }

The task id is attached to the GET request; if the state is still pending, we check again after another second. When the task completes successfully, the results are rendered into the DOM.

Running the app

Now that everything should be set up, let's see this in action! Open index.html in your browser. If all goes well, you upload a CSV file, the browser sends it to the Flask server, the server produces a task for RabbitMQ (our broker), and RabbitMQ delivers it to the consumer, the Celery worker, which executes the task. Once it's finished, the client receives the results.

Conclusion

Celery has a slight learning curve, but its features make it well worth it. There's so much more to it than discussed in this example, but hopefully it gives a good idea of how things work together. This article has some great links for further reading.

N.B. The repo is a work in progress— cleaner code and better interface coming soon!