rand[om]

Cron jobs inside your web app

When you run a web application, you almost always need to run some functions or scripts on a schedule. That is what cron is for. I wanted an easy way to run “cron-like” jobs inside a FastAPI app. The app runs inside Docker, and I just need those jobs to run as long as the app is also running.

Since the app is “async”, I found I can just run recurring background tasks as part of the app. We use the app lifespan and a few functions that each run a job, then asyncio.sleep for a predefined time. I decided to apply some random jitter to the sleep so that multiple tasks don’t keep starting at once. (Note: this can also be done in “sync” apps using threads.)

Doing this has been great: it lets me call the Python functions already available in the app, without writing a separate script or CLI entrypoint, and we also avoid having to run tasks outside the container.

Important:

Be careful if you have multiple workers or multiple container replicas running at once: each one starts its own copy of the tasks, so they may run more often than you expect.
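One simple mitigation — a sketch of my own, not from the post; the helper name and environment variable are assumptions — is to gate the task startup behind an environment variable, so that only one worker or replica starts the background loops:

```python
import os


def should_run_cron_tasks() -> bool:
    # Hypothetical guard: set RUN_CRON_TASKS=1 on exactly one
    # worker/replica so the background loops start only once
    # across the whole deployment.
    return os.environ.get("RUN_CRON_TASKS", "0") == "1"
```

Inside the lifespan, you would then wrap the task-creation loop in `if should_run_cron_tasks():`. For multi-node setups, a database-backed lock is a more robust (but more involved) option.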

from contextlib import asynccontextmanager
import random
import logging
import asyncio
from fastapi import FastAPI


def get_db_pool(): ...


async def expire_sessions(pool): ...


@asynccontextmanager
async def lifespan(app: FastAPI):

    # "cron" jobs
    TASKS = []

    async def _expire_sessions_task():
        pool = get_db_pool()
        frequency_minutes = 5
        frequency_seconds = frequency_minutes * 60
        while True:
            logging.info("Expiring sessions")
            await expire_sessions(pool)
            # apply some jitter
            await asyncio.sleep(frequency_seconds * random.uniform(0.7, 1.3))

    async def _some_other_task():
        frequency_minutes = 60
        frequency_seconds = frequency_minutes * 60
        while True:
            # do something
            # apply some jitter
            await asyncio.sleep(frequency_seconds * random.uniform(0.7, 1.3))

    funcs = [
        _expire_sessions_task,
        _some_other_task,
    ]

    for func in funcs:
        task = asyncio.create_task(func())
        TASKS.append(task)

    # Yield control back to FastAPI
    yield

    # Cleanup: Cancel the background task on shutdown
    for task in TASKS:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

app = FastAPI(lifespan=lifespan)
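One thing the loops above don’t handle is an exception inside a job: the first raise kills the `while True` loop, and that “cron” silently stops for the rest of the app’s lifetime. A sketch of a reusable wrapper that logs failures and keeps going (the function name is my own, not from the code above):

```python
import asyncio
import logging
import random


async def run_periodically(job, frequency_seconds: float):
    """Run `job` forever, logging failures instead of letting them kill the loop."""
    while True:
        try:
            await job()
        except asyncio.CancelledError:
            raise  # let shutdown cancellation propagate normally
        except Exception:
            logging.exception("Background job %s failed", job.__name__)
        # same jitter idea as above: +/- 30% around the base frequency
        await asyncio.sleep(frequency_seconds * random.uniform(0.7, 1.3))
```

With this, each entry in `funcs` shrinks to the job body itself, e.g. `asyncio.create_task(run_periodically(my_job, 300))`.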

Extra tip

You may want to write those “cron” functions in a way that doesn’t block the main application. For example, imagine you’re running a cleanup task against SQLite, which allows only a single writer at a time. It’s usually fine if cleaning the database rows takes a while, so you may want to write the code like this:

from fastapi.concurrency import run_in_threadpool


async def cleanup_rows():
    conn = get_read_connection()
    rows_to_clean = conn.execute("SELECT * FROM ... WHERE ...")
    for row in rows_to_clean:
        # one short write per row, run in the threadpool so the
        # event loop is free between deletes
        write_conn = get_write_connection()
        await run_in_threadpool(write_conn.execute, "DELETE FROM ... WHERE ...")

This will be slower than just running:

DELETE FROM ... WHERE ...

But in return, we avoid locking the database or blocking the event loop for a long time.
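A middle ground — a sketch of my own, not from the post; the `sessions` table and `expired` column are placeholders — is to delete in small batches, so each write transaction holds the SQLite lock only briefly:

```python
import sqlite3


def delete_in_batches(conn: sqlite3.Connection, batch_size: int = 100) -> int:
    """Delete flagged rows in small batches so each write lock is short-lived.

    Table and column names are placeholders for illustration.
    """
    total = 0
    while True:
        with conn:  # one short write transaction per batch
            cur = conn.execute(
                "DELETE FROM sessions WHERE rowid IN ("
                "SELECT rowid FROM sessions WHERE expired = 1 LIMIT ?)",
                (batch_size,),
            )
        total += cur.rowcount
        if cur.rowcount < batch_size:
            return total
```

Between batches, other writers get a chance to acquire the lock, and in an async app you could `await asyncio.sleep(0)` (or run each batch via `run_in_threadpool`) to yield the event loop as well.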

Personal notes

A regular cron job will probably serve you better in most cases; this solution was “good enough” for what I needed. Make sure to test and evaluate it before relying on it in production.