Skip to content

FastAPI Task Manager

FastAPI Task Manager

Lightweight, efficient and fast to code scheduled task management system built on FastAPI

PyPI - Version PyPI - Downloads PyPI - License PyPI - Python Version Pepy Total Downloads Coveralls


Documentation: https://fastapi-task-manager.morry98.com

Source Code: https://github.com/Morry98/fastapi-task-manager


Overview

FastAPI Task Manager is a lightweight and efficient scheduled task management system built on top of FastAPI and Redis. It is designed to help developers easily create, manage, and execute scheduled tasks within their FastAPI applications.

Key Features

  • FastAPI Extension - Built as an extension to FastAPI, making it easy to integrate into existing FastAPI applications and leverage its features
  • Redis Streams Architecture - Uses Redis Streams with leader election for distributed task scheduling and single-instance execution safety
  • Fast to Code - Increase the speed to develop scheduled tasks by about 400% to 500%, only a wrapper function is needed*
  • Fewer Bugs - Reduce about 60% of human (developer) induced errors managing lock, Redis keys and task execution*
  • Scheduled Tasks - Provides a simple and intuitive API for defining and scheduling tasks to run at specific intervals or times
  • Task Management - Includes FastAPI router with 12 endpoints to manage tasks: disable, enable, trigger, monitor health, and more
  • Dynamic Tasks - Create and delete tasks at runtime via REST API without redeploying
  • Retry with Backoff - Automatic exponential backoff on task failures with configurable per-task overrides
  • Fault Tolerant - Task heartbeat monitoring, automatic reconciliation of stale tasks, and leader failover
  • Easy to Use - Designed to be easy to use and learn. Less time reading docs
  • Robust - Get production-ready code

* estimation based on real production task migrated to FastAPI task manager from custom "cron job" solution.

Requirements

FastAPI Task Manager stands on the shoulders of giants:

  • FastAPI - Modern, fast web framework for building APIs
  • Redis - In-memory data structure store for task storage and locking

Installation

Prerequisites

You need to have a FastAPI project set up. If you don't have one, check the FastAPI installation tutorial.

Install FastAPI Task Manager:

uv add fastapi-task-managerResolved XX packages in XXms
Installed 1 package in XXms
+ fastapi-task-manager==x.x.x
pip install fastapi-task-managerSuccessfully installed fastapi-task-manager
poetry add fastapi-task-managerUsing version ^x.x.x for fastapi-task-manager

Updating dependencies
Resolving dependencies... (0.1s)

Package operations: 1 install, 0 updates, 0 removals

Quick Example

Here's a one file simple example to get you started.

In this example, we create a FastAPI application with FastAPI Task Manager integrated.
We define a scheduled task that runs every five minutes and we include the optional task management router to manage our tasks.

Non production example

This is a simple example contained in a single file for demonstration purposes only. In a real-world application, you would typically organize your code into multiple files and modules for better maintainability and scalability.
In learn section of the docs, you can find a more complete explanation of how to set up FastAPI Task Manager in a more structured way.

Imports

# Code above omitted 👆

from fastapi_task_manager import Config as ManagerConfig  # renamed to avoid conflict with our app Config class
from fastapi_task_manager import TaskGroup, TaskManager

# Code below omitted 👇
👀 Full file preview
from fastapi import APIRouter, FastAPI
from pydantic_settings import BaseSettings, SettingsConfigDict

from fastapi_task_manager import Config as ManagerConfig  # renamed to avoid conflict with our app Config class
from fastapi_task_manager import TaskGroup, TaskManager


class Config(BaseSettings):
    model_config = SettingsConfigDict(
        # `.env.prod` takes priority over `.env`
        env_file=(".env", ".env.prod"),
        extra="forbid",
    )

    # --------- Redis config variables ---------
    redis_host: str
    redis_port: int = 6379
    redis_password: str | None = None
    redis_db: int = 0
    # --------- End of redis config variables ---------

    # --------- App config variables ---------
    app_name: str = "my_fastapi_app"
    concurrent_tasks: int = 3
    # --------- End of app config variables ---------


CONFIG = Config()  # ty: ignore[missing-argument]

app = FastAPI()
router = APIRouter()

task_manager = TaskManager(
    config=ManagerConfig(
        redis_host=CONFIG.redis_host,
        redis_port=CONFIG.redis_port,
        redis_password=CONFIG.redis_password,
        redis_db=CONFIG.redis_db,
        concurrent_tasks=CONFIG.concurrent_tasks,
        redis_key_prefix=CONFIG.app_name,
    ),
    app=app,
)
my_example_task_group = TaskGroup(
    tags=["example"],
    name="My Example Task Group",
)
task_manager.add_task_group(my_example_task_group)

manager_router = task_manager.get_manager_router()
router.include_router(
    manager_router,
    prefix="/task-manager",
    tags=["task-manager"],
)

app.include_router(router)


@my_example_task_group.add_task(
    "*/5 * * * *",  # Run every 5 minutes
    name="my_scheduled_task",
    description="This is my scheduled task",
)
async def my_scheduled_task():
    pass
    # Your task logic here

FastAPI Task Manager setup

# Code above omitted 👆

task_manager = TaskManager(
    config=ManagerConfig(
        redis_host=CONFIG.redis_host,
        redis_port=CONFIG.redis_port,
        redis_password=CONFIG.redis_password,
        redis_db=CONFIG.redis_db,
        concurrent_tasks=CONFIG.concurrent_tasks,
        redis_key_prefix=CONFIG.app_name,
    ),
    app=app,
)
my_example_task_group = TaskGroup(
    tags=["example"],
    name="My Example Task Group",
)
task_manager.add_task_group(my_example_task_group)

# Code below omitted 👇
👀 Full file preview
from fastapi import APIRouter, FastAPI
from pydantic_settings import BaseSettings, SettingsConfigDict

from fastapi_task_manager import Config as ManagerConfig  # renamed to avoid conflict with our app Config class
from fastapi_task_manager import TaskGroup, TaskManager


class Config(BaseSettings):
    model_config = SettingsConfigDict(
        # `.env.prod` takes priority over `.env`
        env_file=(".env", ".env.prod"),
        extra="forbid",
    )

    # --------- Redis config variables ---------
    redis_host: str
    redis_port: int = 6379
    redis_password: str | None = None
    redis_db: int = 0
    # --------- End of redis config variables ---------

    # --------- App config variables ---------
    app_name: str = "my_fastapi_app"
    concurrent_tasks: int = 3
    # --------- End of app config variables ---------


CONFIG = Config()  # ty: ignore[missing-argument]

app = FastAPI()
router = APIRouter()

task_manager = TaskManager(
    config=ManagerConfig(
        redis_host=CONFIG.redis_host,
        redis_port=CONFIG.redis_port,
        redis_password=CONFIG.redis_password,
        redis_db=CONFIG.redis_db,
        concurrent_tasks=CONFIG.concurrent_tasks,
        redis_key_prefix=CONFIG.app_name,
    ),
    app=app,
)
my_example_task_group = TaskGroup(
    tags=["example"],
    name="My Example Task Group",
)
task_manager.add_task_group(my_example_task_group)

manager_router = task_manager.get_manager_router()
router.include_router(
    manager_router,
    prefix="/task-manager",
    tags=["task-manager"],
)

app.include_router(router)


@my_example_task_group.add_task(
    "*/5 * * * *",  # Run every 5 minutes
    name="my_scheduled_task",
    description="This is my scheduled task",
)
async def my_scheduled_task():
    pass
    # Your task logic here

Add task management router

# Code above omitted 👆

manager_router = task_manager.get_manager_router()
router.include_router(
    manager_router,
    prefix="/task-manager",
    tags=["task-manager"],
)

# Code below omitted 👇
👀 Full file preview
from fastapi import APIRouter, FastAPI
from pydantic_settings import BaseSettings, SettingsConfigDict

from fastapi_task_manager import Config as ManagerConfig  # renamed to avoid conflict with our app Config class
from fastapi_task_manager import TaskGroup, TaskManager


class Config(BaseSettings):
    model_config = SettingsConfigDict(
        # `.env.prod` takes priority over `.env`
        env_file=(".env", ".env.prod"),
        extra="forbid",
    )

    # --------- Redis config variables ---------
    redis_host: str
    redis_port: int = 6379
    redis_password: str | None = None
    redis_db: int = 0
    # --------- End of redis config variables ---------

    # --------- App config variables ---------
    app_name: str = "my_fastapi_app"
    concurrent_tasks: int = 3
    # --------- End of app config variables ---------


CONFIG = Config()  # ty: ignore[missing-argument]

app = FastAPI()
router = APIRouter()

task_manager = TaskManager(
    config=ManagerConfig(
        redis_host=CONFIG.redis_host,
        redis_port=CONFIG.redis_port,
        redis_password=CONFIG.redis_password,
        redis_db=CONFIG.redis_db,
        concurrent_tasks=CONFIG.concurrent_tasks,
        redis_key_prefix=CONFIG.app_name,
    ),
    app=app,
)
my_example_task_group = TaskGroup(
    tags=["example"],
    name="My Example Task Group",
)
task_manager.add_task_group(my_example_task_group)

manager_router = task_manager.get_manager_router()
router.include_router(
    manager_router,
    prefix="/task-manager",
    tags=["task-manager"],
)

app.include_router(router)


@my_example_task_group.add_task(
    "*/5 * * * *",  # Run every 5 minutes
    name="my_scheduled_task",
    description="This is my scheduled task",
)
async def my_scheduled_task():
    pass
    # Your task logic here

Define a scheduled task

# Code above omitted 👆

@my_example_task_group.add_task(
    "*/5 * * * *",  # Run every 5 minutes
    name="my_scheduled_task",
    description="This is my scheduled task",
)
async def my_scheduled_task():
    pass
    # Your task logic here
👀 Full file preview
from fastapi import APIRouter, FastAPI
from pydantic_settings import BaseSettings, SettingsConfigDict

from fastapi_task_manager import Config as ManagerConfig  # renamed to avoid conflict with our app Config class
from fastapi_task_manager import TaskGroup, TaskManager


class Config(BaseSettings):
    model_config = SettingsConfigDict(
        # `.env.prod` takes priority over `.env`
        env_file=(".env", ".env.prod"),
        extra="forbid",
    )

    # --------- Redis config variables ---------
    redis_host: str
    redis_port: int = 6379
    redis_password: str | None = None
    redis_db: int = 0
    # --------- End of redis config variables ---------

    # --------- App config variables ---------
    app_name: str = "my_fastapi_app"
    concurrent_tasks: int = 3
    # --------- End of app config variables ---------


CONFIG = Config()  # ty: ignore[missing-argument]

app = FastAPI()
router = APIRouter()

task_manager = TaskManager(
    config=ManagerConfig(
        redis_host=CONFIG.redis_host,
        redis_port=CONFIG.redis_port,
        redis_password=CONFIG.redis_password,
        redis_db=CONFIG.redis_db,
        concurrent_tasks=CONFIG.concurrent_tasks,
        redis_key_prefix=CONFIG.app_name,
    ),
    app=app,
)
my_example_task_group = TaskGroup(
    tags=["example"],
    name="My Example Task Group",
)
task_manager.add_task_group(my_example_task_group)

manager_router = task_manager.get_manager_router()
router.include_router(
    manager_router,
    prefix="/task-manager",
    tags=["task-manager"],
)

app.include_router(router)


@my_example_task_group.add_task(
    "*/5 * * * *",  # Run every 5 minutes
    name="my_scheduled_task",
    description="This is my scheduled task",
)
async def my_scheduled_task():
    pass
    # Your task logic here

License

This project is licensed under the terms of the MIT license.