I want to share my experience with managing background tasks in a Python application, especially in the context of financial data processing. This isn’t just about choosing between file-based queues and Celery, but also about making decisions under hardware constraints and specific industry requirements.

The Challenge

My task was to process intraday data for over 4,000 equities, which is a hefty amount of data! All of it needed to be inserted efficiently into MongoDB, which was hosted on a modest server (1 core, 2 GB RAM) that was also running Docker. The catch? I had to do this on a tight budget and with very limited resources.

Why I Chose File-Based Queues

Given the server limitations, I decided against using Celery, despite its robust features. Here’s why:

  • Hardware Limitations: My server setup was quite basic — definitely not cut out for the additional load that comes with Celery and a message broker like Redis or RabbitMQ.
  • Budget Constraints: As a small-scale operation in the finance sector, keeping costs low was crucial. More complex setups would mean more resources and potentially higher costs.
  • Simplicity and Reliability: I needed something straightforward yet reliable. File-based queues allowed me to manage tasks effectively without overcomplicating the system.
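To make the reliability point concrete, here is a minimal sketch of how a file-based queue could survive a restart without a broker. It is an illustration rather than the code I ran: it assumes tasks live in a JSON file where each entry has an integer task_id, and the helper names (load_pending, mark_done) and the side file of finished IDs are hypothetical.

```python
import json
from pathlib import Path


def load_pending(tasks_path, done_path):
    """Return tasks from tasks_path whose task_id is not recorded in done_path."""
    tasks = json.loads(Path(tasks_path).read_text())
    done_file = Path(done_path)
    done_ids = set()
    if done_file.exists():
        # The done file holds one finished task_id per line.
        done_ids = {int(line) for line in done_file.read_text().split()}
    return [task for task in tasks if task["task_id"] not in done_ids]


def mark_done(done_path, task_id):
    """Append a finished task_id so a restarted run can skip it."""
    with open(done_path, "a") as fh:
        fh.write(f"{task_id}\n")
```

If workers run in a thread pool, calling mark_done from the main thread as each future completes keeps the writes to the side file serialized.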

The Setup and Results: I set up a file-based queue system with two worker threads using Python's concurrent.futures module. This approach was surprisingly efficient. The entire process of handling the intraday data for over 4,000 equities (roughly 7 lakh, i.e. 700,000, documents) and inserting it into MongoDB took about 2.5 hours. Not bad, right? Especially considering the server constraints!
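For a sense of scale, those figures can be turned into a rough throughput number. This is back-of-the-envelope arithmetic on the approximate values quoted above (about 700,000 documents, about 2.5 hours, two workers), not a measured benchmark:

```python
documents = 700_000            # roughly 7 lakh documents
elapsed_seconds = 2.5 * 3600   # about 2.5 hours
workers = 2                    # two worker threads

overall_rate = documents / elapsed_seconds
per_worker_rate = overall_rate / workers

print(f"{overall_rate:.1f} documents/second overall")       # 77.8
print(f"{per_worker_rate:.1f} documents/second per worker")  # 38.9
```

So the single-core box sustained somewhere around 78 documents per second end to end, including both processing and the MongoDB inserts.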

Here’s a Peek at the Code: To give you a better idea, here’s a simplified example of the file-based queue system I used:

Sample task file (tasks.json):

[
{"task_id": 1, "data": "Task 1 data"},
{"task_id": 2, "data": "Task 2 data"},
 ...
]

The script:

import json
import concurrent.futures

def process_task(task):
    print(f"Processing task {task['task_id']}: {task['data']}")
    # Do the real work here, e.g. transform the data and store it in the DB.


def main():
    # Only the file read is guarded here, so the message below stays accurate.
    try:
        with open('tasks.json', 'r') as file:
            tasks = json.load(file)
    except (OSError, json.JSONDecodeError) as e:
        print(f"An error occurred when reading the task file: {e}")
        return

    # Sadly, I can't increase the number of workers due to hardware limitations.
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        # Map each future back to its task so failures can be reported by task_id.
        future_to_task = {executor.submit(process_task, task): task for task in tasks}

        for future in concurrent.futures.as_completed(future_to_task):
            task = future_to_task[future]
            try:
                future.result()
            except Exception as e:
                print(f"Task {task['task_id']} generated an exception: {e}")
            else:
                print(f"Task {task['task_id']} completed successfully.")

    print("All tasks completed.")


if __name__ == "__main__":
    main()
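The example leaves the "store it in the DB" step as a comment. One plausible way to fill it in on a small server is to insert documents in fixed-size batches instead of one at a time. The sketch below is an assumption about how that could look, not the exact code I ran: insert_in_batches, chunked, and the batch size of 1000 are hypothetical, and collection is expected to be a pymongo-style collection exposing insert_many().

```python
from itertools import islice


def chunked(iterable, size):
    """Yield lists of at most `size` items from `iterable`."""
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def insert_in_batches(collection, documents, batch_size=1000):
    """Insert documents batch by batch and return how many were inserted.

    `collection` is any object with a pymongo-style insert_many(), for
    example a pymongo Collection (database and collection names are up to you).
    """
    inserted = 0
    for batch in chunked(documents, batch_size):
        # ordered=False asks the server to attempt every document in the
        # batch even if one fails; pymongo then raises BulkWriteError.
        result = collection.insert_many(batch, ordered=False)
        inserted += len(result.inserted_ids)
    return inserted
```

Because chunked consumes a plain iterator, documents can be a generator, so only one batch needs to sit in memory at a time, which matters with 2 GB of RAM.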

When Would I Consider Celery?

Now, Celery is an excellent tool, no doubt. It shines in scenarios where you have:

  • More Complex Task Management: If you’re juggling a multitude of tasks that need advanced routing or scheduling.
  • Higher Scalability Needs: For applications that need to scale and distribute tasks across multiple servers.
  • Resource Availability: If you have a server setup that can handle the extra load without breaking a sweat (or the bank!).

My journey with file-based queues showed me that simplicity and resourcefulness sometimes trump complexity, especially in finance, where every penny and every millisecond counts. Whether you’re a fellow data enthusiast in finance or just someone exploring background task management in Python, I hope my experience sheds some light on the practical side of choosing the right tool for the job!