In the past weeks we've been working on a new computer vision project at Aigency. As part of the project we have to process images using OpenCV image filters. This is a rather tedious process that takes a lot of time, so we made a nice workflow for it. In this blog post I'll show you how we used Prefect and Ray to build a scalable image processing workflow.
What is Prefect?
As many will know, machine-learning and data engineering projects often use pipelines, workflows, or DAGs to process large amounts of data. There is a plethora of tools out there to perform machine learning and data processing tasks.
At Aigency we've used a number of different tools like Azure Synapse, Spark, and Airflow. Azure Synapse and Spark are on the large end of the spectrum and quite expensive. Airflow can be used for large projects, but we used it mostly for smaller things.
There is, however, a new kid on the block: Prefect, a workflow engine that lets you build workflows completely in Python. A very basic Prefect workflow looks like this:
```python
from prefect import task, flow

@task
def calculate_numbers(a, b):
    return a + b

@flow
def process_all_the_numbers():
    result = calculate_numbers(100, 200)

if __name__ == "__main__":
    process_all_the_numbers()
```
A workflow in Prefect starts with a function decorated with `@flow`. This function chains together multiple tasks. Tasks are written as Python functions decorated with `@task`.
To run a workflow, you can start the script from the command line with python. It will then run on your local machine, of course.
When you're happy with the workflow, you can deploy it to a Prefect server. This allows the workflow to be scheduled.
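With Prefect 2's deployment CLI, deploying looks roughly like the following; the file name, deployment name, and queue name are illustrative, not from the project:

```shell
# Register the flow with the Prefect server as a deployment
# (flow.py, "nightly", and the queue name are placeholders)
prefect deployment build ./flow.py:process_all_the_numbers --name nightly --apply

# Start an agent that picks up scheduled runs from the work queue
prefect agent start -q default
```

Once the deployment is registered, you can attach a schedule or trigger runs from the Prefect UI.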
Prefect is available as a Python package, and can be installed like so:
```shell
pip install prefect
```
Now that you have a basic understanding of what Prefect is, let's dive a little deeper into our use case.
Processing data in parallel with Prefect
For our solution, we need to tune the parameters of the OpenCV pipeline. We use a basic hyperparameter search algorithm for this, which works as follows:
- First, we collect the images that we want to process
- Next, we generate a collection of randomly chosen hyperparameters
- Then, we use the collection of images and the collection of parameters to run experiments
- After the experiments are completed, we combine the output of the experiments into a report
You can build such a workflow with the following code:
```python
from os import listdir, path
from typing import List

from prefect import flow, task

@task
def find_images(input_folder: str) -> List[str]:
    return [path.join(input_folder, filename) for filename in listdir(input_folder)]

@task
def process_images(image_files: List[str], settings: RunSettings) -> ProcessingResult:
    # Perform processing on the image files
    result = ProcessingResult(data=...)
    return result

@task
def generate_run_settings(num_experiments: int, space: SearchSpace) -> List[RunSettings]:
    return [space.random() for _ in range(num_experiments)]

@task
def collect_results(results: List[ProcessingResult]) -> None:
    # Collect the results, and write to a metrics file.
    ...

@flow
def tune_pipeline(input_folder: str = "data/raw", num_experiments: int = 10) -> None:
    space = SearchSpace()  # the search space for the OpenCV parameters
    image_files = find_images(input_folder)
    run_settings = generate_run_settings(num_experiments, space)
    image_files = [image_files for _ in range(num_experiments)]
    results = process_images.map(image_files, run_settings)
    collect_results(results)

if __name__ == "__main__":
    tune_pipeline()
```
There are a couple of highlights in this code:
First, notice that we parameterized the workflow with the input folder. We provide a default value for it in the script. However, when you deploy the workflow to the Prefect server, you can specify it in configuration.
Next, in the `tune_pipeline` flow we call the method `map` on `process_images` as if it were an object. We can do this because the `@task` decorator turned it into a `Task` object that we can manipulate. The `map` method will iterate over the lists of parameters and call the underlying method with the values in the lists.

The `map` method returns a list of futures that you can collect. When you do this directly in the flow, it's quite hard to work with. However, if you give the list of results to another task, in this case `collect_results`, Prefect will automatically wait for all the results to arrive and unwrap them for you.
When you run the workflow code without any additional configuration, you'll notice that it tries to parallelize the `process_images` task. It sort of works, but it isn't ideal. It's time to do something about that.
What is Ray?
Prefect uses a task runner to run each of the tasks in a flow. By default it uses the `ConcurrentTaskRunner`, which runs tasks concurrently on your local machine. It works, but not for huge amounts of tasks, and it sometimes fails for unclear reasons. Fortunately, you have the option to use other task runners, like Ray.
Ray is a distributed runtime for Python often used in machine learning projects because it's capable of training models in a distributed fashion too.
Ray has two primitives: tasks and actors. A task is a stateless function that can accept input and produce output. An actor is a class with state, running on a Ray node.
Since you can schedule python functions on Ray, it's a great candidate for running workflow tasks.
Using Ray to run tasks
To use Ray, you'll need to install two packages using pip:

```shell
pip install ray prefect-ray
```
Next, you'll need to modify the workflow code, so it looks like this:
```python
from prefect import flow
from prefect_ray import RayTaskRunner

@flow(task_runner=RayTaskRunner)
def tune_pipeline(input_folder: str) -> None:
    # Other code
    pass
```
And that's it; you can run it as you would before, by starting the script from the command line.
If you're deploying the workflow to a Prefect server, you'll need to add one more piece of configuration, the address of your Ray cluster.
```python
from prefect import flow
from prefect_ray import RayTaskRunner

@flow(task_runner=RayTaskRunner(address="my-ray-server:6379"))
def tune_pipeline(input_folder: str) -> None:
    # Other code
    pass
```
You can create a Ray cluster using the instructions in the manual. Fun fact: it runs on Kubernetes or on a set of VMs; even a single VM works.
It takes a bit of work to configure everything, but the experience is great.
Tips and tricks
There are a few things that we found are important to keep in mind when working with Ray and Prefect.
Exchange small pieces of data between tasks
Tasks can return any python object you like. However, if you pass a lot of data and you're running on Ray, you'll find that it generates a ton of network traffic.
If you have to work with large amounts of data, we recommend storing it on disk and passing along a filename. We also recommend grouping tasks that require the same large dataset, so you don't have to shuffle data around as much.
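The pattern looks something like this; it's shown with plain functions and a pickle file to keep the sketch self-contained, but in the real workflow these would be `@task`-decorated:

```python
import pickle
import tempfile
from pathlib import Path

def produce_large_dataset(work_dir: str) -> str:
    # Write the large dataset to disk and return only the file path
    data = list(range(100_000))
    output_path = Path(work_dir) / "dataset.pkl"
    with open(output_path, "wb") as f:
        pickle.dump(data, f)
    return str(output_path)

def consume_large_dataset(dataset_path: str) -> int:
    # Load the dataset from disk inside the task that needs it
    with open(dataset_path, "rb") as f:
        data = pickle.load(f)
    return len(data)

with tempfile.TemporaryDirectory() as work_dir:
    dataset_path = produce_large_dataset(work_dir)
    count = consume_large_dataset(dataset_path)  # only the path travelled between tasks
```

Keep in mind that on a real Ray cluster the path has to point at storage every node can reach, such as a network share.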
Write unit tests for your workflow

Yes, that's a thing; I know a lot of data scientists have never heard of these :P Since you can run the workflow locally, you can write unit tests for your workflow code. It will save you a lot of time.
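For example, a pytest-style test for the `find_images` logic from earlier could look like this. We test the plain function here; Prefect also exposes the undecorated function of a task as `task.fn`, which you can test the same way:

```python
import tempfile
from os import listdir, path

# The logic behind the find_images task from the workflow above
def find_images(input_folder: str) -> list:
    return [path.join(input_folder, filename) for filename in listdir(input_folder)]

# A pytest-style test: create a temporary folder and check every file is found
def test_find_images_finds_all_files():
    with tempfile.TemporaryDirectory() as folder:
        for name in ("a.png", "b.png"):
            open(path.join(folder, name), "w").close()
        found = find_images(folder)
        assert sorted(found) == sorted(path.join(folder, n) for n in ("a.png", "b.png"))

test_find_images_finds_all_files()
```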
Skip the SQLite database

Prefect uses a database to store workflow runs and task runs. By default, it creates a SQLite database in a hidden folder in your user home folder.

SQLite can't handle parallel calls well, so when you start using Ray you'll quickly run into database problems.
We replaced the local database with a Docker container running PostgreSQL. Yes, it's more work to configure, but it will save you a lot of trouble.
You can find how to set up a proper database for Prefect in the manual.
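As a rough sketch, assuming Prefect 2 and the default PostgreSQL port; the credentials and database name are placeholders you should change:

```shell
# Run PostgreSQL in a container (credentials here are placeholders)
docker run -d --name prefect-postgres \
    -e POSTGRES_USER=prefect \
    -e POSTGRES_PASSWORD=prefect \
    -e POSTGRES_DB=prefect \
    -p 5432:5432 postgres

# Point the Prefect server at the new database instead of SQLite
prefect config set PREFECT_API_DATABASE_CONNECTION_URL="postgresql+asyncpg://prefect:prefect@localhost:5432/prefect"
```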
In this post we've covered how to build workflows to process data in parallel with Prefect. We've covered how to supercharge the task runner with Ray so you can run tasks on a cluster. Finally, we've covered a few handy tips to make the most out of the setup.
Hope you enjoyed this one!