Building A Distributed App for Computer Vision Tasks

Gaurav Mohan
4 min read · Mar 20, 2023

The ability to streamline computer vision tasks is incredibly important for scaling real-time processes across different workflows. With the underlying architecture handling processing in the backend, the frontend can function like an API: a user simply makes a request to process an image or video using a specific integrated task. The focus of this project is to explain how to build a simplified version of this concept and show how the functionality can be scaled.

This is a Flask app that uses Celery for task management and processing, and Redis both for brokering messages between the Celery worker and the Flask app and for caching data from the task results. A message broker is a service that acts as a middleman between the sender and receiver of a message. By using a message broker, Celery can distribute tasks across multiple machines and ensure that each task is executed only once. In this case, the task being processed is object tracking using YOLO object detection and SORT, a method designed for tracking applications where only past and current frames are available and which produces object identities on the fly.

# instantiate the Flask app and create a message broker to communicate
# between the task queue and the worker node
from flask import Flask, request
from celery import Celery
import redis

app = Flask(__name__)

redis_client = redis.Redis(host='localhost', port=6379, db=0)

app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379'
app.debug = True

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'])
celery.conf.update(app.config)

The app receives a POST request with the video URL and launches a Celery task asynchronously using the delay function. The frontend stores the generated task id and repeatedly polls the app for the task's status (a sketch of such a status endpoint follows the push() route below). Finally, when the task is finished, the results are displayed neatly in a scrollable box.

# Celery task that downloads a video and runs YOLOv5 + SORT object tracking
import cv2
import torch
import requests
from sort import Sort  # SORT tracker, e.g. from the abewley/sort repository

@celery.task()
def process(video_url):
    # load a pretrained YOLOv5 model and instantiate the SORT tracker
    model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)
    tracker = Sort()

    # download the video and open it with OpenCV
    response = requests.get(str(video_url))
    with open('video.mp4', 'wb') as f:
        f.write(response.content)
    cap = cv2.VideoCapture('video.mp4')

    frame_count = 0
    video_dict = {}
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # run detection on the frame and extract bounding boxes and class names
        results = model(frame, size=640)
        outputs = results.pandas().xyxy[0]

        bboxes = outputs[['xmin', 'ymin', 'xmax', 'ymax']].values
        class_names = outputs['name'].tolist()

        # update the tracker with the new detections; each tracked object is
        # [x1, y1, x2, y2, track_id]
        tracked_objects = tracker.update(bboxes)

        # record each tracked object's class and position, keyed by track id
        # (note: SORT may reorder or drop detections, so this pairing of
        # class names with tracks is approximate)
        for obj, class_name in zip(tracked_objects, class_names):
            key = str(obj[4])
            new_dict = {
                "class": class_name,
                "frame_number": frame_count,
                "positions": {
                    # cast numpy floats to plain floats so the result can be
                    # JSON-serialized by the Celery result backend
                    "x1": round(float(obj[0]), 0),
                    "x2": round(float(obj[2]), 0),
                    "y1": round(float(obj[1]), 0),
                    "y2": round(float(obj[3]), 0),
                }
            }
            if key in video_dict:
                video_dict[key].append(new_dict)
            else:
                video_dict[key] = [new_dict]
        frame_count += 1

    cap.release()
    cv2.destroyAllWindows()
    return video_dict

# Flask route that accepts the form data and queues the Celery task
@app.route('/push', methods=['POST'])
def push():
    source_name = request.form['source_name']
    video_url = request.form['source_url']
    task = process.delay(video_url)
    return {'id': task.id, 'status': 'queued', 'source_name': source_name}, 202
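
The article doesn't show the polling endpoint itself, but a minimal sketch, assuming a hypothetical /status/<task_id> route and Celery's AsyncResult API, might look like this:

# minimal sketch of a polling endpoint (route name is illustrative);
# the frontend calls this with the task id returned by push()
@app.route('/status/<task_id>', methods=['GET'])
def status(task_id):
    result = celery.AsyncResult(task_id)
    response = {'id': task_id, 'status': result.state}
    if result.state == 'SUCCESS':
        # the tracking dictionary returned by process()
        response['result'] = result.get()
    return response, 200

The frontend can keep polling this endpoint until the state becomes SUCCESS or FAILURE and then render the results.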

Scaling the App

The demo above utilizes one worker node for a single task, meaning it isn’t distributed in nature. However, Celery is horizontally scalable, which makes it possible to create a distributed system. To make a Celery application distributed, you need to add more Celery worker nodes to your system and run them in parallel. The worker nodes share the same task queue, and Redis stores the tasks in the order they are received.
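
For instance, you might launch two workers in separate terminals (the worker names below are arbitrary); both connect to the same Redis broker and consume from the shared queue:

# start two workers in parallel; -n gives each a unique name
celery -A main.celery worker --loglevel=info -n worker1@%h
celery -A main.celery worker --loglevel=info -n worker2@%h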

You can edit the push() function to send a task directly to a worker using send_task rather than delay. This way you can specify, by name, which task is being added to the queue.

# In terminal run the following to add a new worker
celery -A main.celery worker --loglevel=info

# updated push function in main.py
@app.route('/push', methods=['POST'])
def push():
    source_name = request.form['source_name']
    video_url = request.form['source_url']
    # send_task takes the registered task name; by default this is
    # '<module>.<function>', e.g. 'main.process' if the task lives in main.py
    task = celery.send_task('main.process', args=[video_url])
    return {'id': task.id, 'status': 'queued', 'source_name': source_name}, 202

I simply apply the dependencies that are supported for Python 3.8 and then build my model, where the setup function is deployed first and the predict function is deployed after. The result is that a task that would normally take over 10 minutes to complete takes only about 2 minutes with their API, which utilizes Celery, Redis, and Kubernetes. A simple app that leverages distributed systems can go a long way in scaling applications for niche needs.

To dynamically balance the number of tasks each worker is handling, you can edit the concurrency settings to ‘autoscale’ in the configuration. This option automatically adjusts the number of worker processes in a pool based on the number of active tasks. CELERYD_POOL_RESTARTS enables automatic worker pool restarts, which can be useful for freeing up memory and other resources.

# update configuration of celery to include autoscaling

app = Flask(__name__)

redis_client = redis.Redis(host='localhost', port=6379, db=0)

app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379'
app.debug = True

# enable pool restarts and set the autoscaler's concurrency bounds
app.config['CELERYD_POOL_RESTARTS'] = True
app.config['CELERYD_AUTOSCALER'] = 'celery.worker.autoscale:Autoscaler'
app.config['CELERYD_AUTOSCALER_SETTINGS'] = {
    'min_concurrency': 2,
    'max_concurrency': 8,
}

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'])
celery.conf.update(app.config)
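
Autoscaling can also be requested directly on the worker command line with Celery's --autoscale option, which takes the maximum and minimum number of pool processes; a minimal example matching the bounds above:

# start a worker that scales between 2 and 8 processes based on load
celery -A main.celery worker --loglevel=info --autoscale=8,2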

Now you can effectively scale your app to handle more tasks and automatically distribute the work across the workers handling those tasks in parallel.

Choosing a Message Broker

Redis is a popular choice for a message broker due to its high performance and low latency. It is an in-memory data store that can persist data to disk, which makes it a good choice for message queuing. It also supports data structures like lists, sets, and hashes, which can be useful for building more complex messaging patterns. It can also cache results and provides a simple, flexible API. However, it is not ideal for handling large amounts of data.
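
As a small illustration of the caching side, the redis_client created earlier could be used to cache a finished task's tracking dictionary under its task id so repeated status requests don't have to hit the Celery result backend; the helper names, key format, and expiry below are assumptions for the sketch:

import json

# hypothetical helper: cache a completed task's result for one hour
def cache_result(task_id, video_dict):
    redis_client.set(f"result:{task_id}", json.dumps(video_dict), ex=3600)

# hypothetical helper: return the cached result, or None on a cache miss
def get_cached_result(task_id):
    cached = redis_client.get(f"result:{task_id}")
    return json.loads(cached) if cached else None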

If your application is dealing with a large amount of data, it may be better to use Apache Kafka, which is designed to handle large volumes of data. It does, however, require significant resources to run efficiently and has a more complicated setup process.
