FastAPI Task Manager
Lightweight, efficient and fast to code scheduled task management system built on FastAPI
Documentation: https://fastapi-task-manager.morry98.com
Source Code: https://github.com/Morry98/fastapi-task-manager
Overview¶
FastAPI Task Manager is a lightweight and efficient scheduled task management system built on top of FastAPI and Redis. It is designed to help developers easily create, manage, and execute scheduled tasks within their FastAPI applications.
Key Features¶
- FastAPI Extension - Built as an extension to FastAPI, making it easy to integrate into existing FastAPI applications and leverage its features
- Redis Streams Architecture - Uses Redis Streams with leader election for distributed task scheduling and single-instance execution safety
- Fast to Code - Increase the speed to develop scheduled tasks by about 400% to 500%, only a wrapper function is needed*
- Fewer Bugs - Reduce about 60% of human (developer) induced errors managing lock, Redis keys and task execution*
- Scheduled Tasks - Provides a simple and intuitive API for defining and scheduling tasks to run at specific intervals or times
- Task Management - Includes FastAPI router with 12 endpoints to manage tasks: disable, enable, trigger, monitor health, and more
- Dynamic Tasks - Create and delete tasks at runtime via REST API without redeploying
- Retry with Backoff - Automatic exponential backoff on task failures with configurable per-task overrides
- Fault Tolerant - Task heartbeat monitoring, automatic reconciliation of stale tasks, and leader failover
- Easy to Use - Designed to be easy to use and learn. Less time reading docs
- Robust - Get production-ready code
* estimation based on real production task migrated to FastAPI task manager from custom "cron job" solution.
Requirements¶
FastAPI Task Manager stands on the shoulders of giants:
- FastAPI - Modern, fast web framework for building APIs
- Redis - In-memory data structure store for task storage and locking
Installation¶
Prerequisites
You need to have a FastAPI project set up. If you don't have one, check the FastAPI installation tutorial.
Install FastAPI Task Manager:
Installed 1 package in XXms
+ fastapi-task-manager==x.x.x
Updating dependencies
Resolving dependencies... (0.1s)
Package operations: 1 install, 0 updates, 0 removals
Quick Example¶
Here's a one file simple example to get you started.
In this example, we create a FastAPI application with FastAPI Task Manager integrated.
We define a scheduled task that runs every five minutes and we include the optional task management router to manage our tasks.
Non production example
This is a simple example contained in a single file for demonstration purposes only. In a real-world application, you would typically organize your code into multiple files and modules for better maintainability and scalability.
In learn section of the docs, you can find a more complete explanation of how to set up FastAPI Task Manager in a more structured way.
Imports¶
# Code above omitted 👆
from fastapi_task_manager import Config as ManagerConfig # renamed to avoid conflict with our app Config class
from fastapi_task_manager import TaskGroup, TaskManager
# Code below omitted 👇
👀 Full file preview
from fastapi import APIRouter, FastAPI
from pydantic_settings import BaseSettings, SettingsConfigDict
from fastapi_task_manager import Config as ManagerConfig # renamed to avoid conflict with our app Config class
from fastapi_task_manager import TaskGroup, TaskManager
class Config(BaseSettings):
model_config = SettingsConfigDict(
# `.env.prod` takes priority over `.env`
env_file=(".env", ".env.prod"),
extra="forbid",
)
# --------- Redis config variables ---------
redis_host: str
redis_port: int = 6379
redis_password: str | None = None
redis_db: int = 0
# --------- End of redis config variables ---------
# --------- App config variables ---------
app_name: str = "my_fastapi_app"
concurrent_tasks: int = 3
# --------- End of app config variables ---------
CONFIG = Config() # ty: ignore[missing-argument]
app = FastAPI()
router = APIRouter()
task_manager = TaskManager(
config=ManagerConfig(
redis_host=CONFIG.redis_host,
redis_port=CONFIG.redis_port,
redis_password=CONFIG.redis_password,
redis_db=CONFIG.redis_db,
concurrent_tasks=CONFIG.concurrent_tasks,
redis_key_prefix=CONFIG.app_name,
),
app=app,
)
my_example_task_group = TaskGroup(
tags=["example"],
name="My Example Task Group",
)
task_manager.add_task_group(my_example_task_group)
manager_router = task_manager.get_manager_router()
router.include_router(
manager_router,
prefix="/task-manager",
tags=["task-manager"],
)
app.include_router(router)
@my_example_task_group.add_task(
"*/5 * * * *", # Run every 5 minutes
name="my_scheduled_task",
description="This is my scheduled task",
)
async def my_scheduled_task():
pass
# Your task logic here
FastAPI Task Manager setup¶
# Code above omitted 👆
task_manager = TaskManager(
config=ManagerConfig(
redis_host=CONFIG.redis_host,
redis_port=CONFIG.redis_port,
redis_password=CONFIG.redis_password,
redis_db=CONFIG.redis_db,
concurrent_tasks=CONFIG.concurrent_tasks,
redis_key_prefix=CONFIG.app_name,
),
app=app,
)
my_example_task_group = TaskGroup(
tags=["example"],
name="My Example Task Group",
)
task_manager.add_task_group(my_example_task_group)
# Code below omitted 👇
👀 Full file preview
from fastapi import APIRouter, FastAPI
from pydantic_settings import BaseSettings, SettingsConfigDict
from fastapi_task_manager import Config as ManagerConfig # renamed to avoid conflict with our app Config class
from fastapi_task_manager import TaskGroup, TaskManager
class Config(BaseSettings):
model_config = SettingsConfigDict(
# `.env.prod` takes priority over `.env`
env_file=(".env", ".env.prod"),
extra="forbid",
)
# --------- Redis config variables ---------
redis_host: str
redis_port: int = 6379
redis_password: str | None = None
redis_db: int = 0
# --------- End of redis config variables ---------
# --------- App config variables ---------
app_name: str = "my_fastapi_app"
concurrent_tasks: int = 3
# --------- End of app config variables ---------
CONFIG = Config() # ty: ignore[missing-argument]
app = FastAPI()
router = APIRouter()
task_manager = TaskManager(
config=ManagerConfig(
redis_host=CONFIG.redis_host,
redis_port=CONFIG.redis_port,
redis_password=CONFIG.redis_password,
redis_db=CONFIG.redis_db,
concurrent_tasks=CONFIG.concurrent_tasks,
redis_key_prefix=CONFIG.app_name,
),
app=app,
)
my_example_task_group = TaskGroup(
tags=["example"],
name="My Example Task Group",
)
task_manager.add_task_group(my_example_task_group)
manager_router = task_manager.get_manager_router()
router.include_router(
manager_router,
prefix="/task-manager",
tags=["task-manager"],
)
app.include_router(router)
@my_example_task_group.add_task(
"*/5 * * * *", # Run every 5 minutes
name="my_scheduled_task",
description="This is my scheduled task",
)
async def my_scheduled_task():
pass
# Your task logic here
Add task management router¶
# Code above omitted 👆
manager_router = task_manager.get_manager_router()
router.include_router(
manager_router,
prefix="/task-manager",
tags=["task-manager"],
)
# Code below omitted 👇
👀 Full file preview
from fastapi import APIRouter, FastAPI
from pydantic_settings import BaseSettings, SettingsConfigDict
from fastapi_task_manager import Config as ManagerConfig # renamed to avoid conflict with our app Config class
from fastapi_task_manager import TaskGroup, TaskManager
class Config(BaseSettings):
model_config = SettingsConfigDict(
# `.env.prod` takes priority over `.env`
env_file=(".env", ".env.prod"),
extra="forbid",
)
# --------- Redis config variables ---------
redis_host: str
redis_port: int = 6379
redis_password: str | None = None
redis_db: int = 0
# --------- End of redis config variables ---------
# --------- App config variables ---------
app_name: str = "my_fastapi_app"
concurrent_tasks: int = 3
# --------- End of app config variables ---------
CONFIG = Config() # ty: ignore[missing-argument]
app = FastAPI()
router = APIRouter()
task_manager = TaskManager(
config=ManagerConfig(
redis_host=CONFIG.redis_host,
redis_port=CONFIG.redis_port,
redis_password=CONFIG.redis_password,
redis_db=CONFIG.redis_db,
concurrent_tasks=CONFIG.concurrent_tasks,
redis_key_prefix=CONFIG.app_name,
),
app=app,
)
my_example_task_group = TaskGroup(
tags=["example"],
name="My Example Task Group",
)
task_manager.add_task_group(my_example_task_group)
manager_router = task_manager.get_manager_router()
router.include_router(
manager_router,
prefix="/task-manager",
tags=["task-manager"],
)
app.include_router(router)
@my_example_task_group.add_task(
"*/5 * * * *", # Run every 5 minutes
name="my_scheduled_task",
description="This is my scheduled task",
)
async def my_scheduled_task():
pass
# Your task logic here
Define a scheduled task¶
# Code above omitted 👆
@my_example_task_group.add_task(
"*/5 * * * *", # Run every 5 minutes
name="my_scheduled_task",
description="This is my scheduled task",
)
async def my_scheduled_task():
pass
# Your task logic here
👀 Full file preview
from fastapi import APIRouter, FastAPI
from pydantic_settings import BaseSettings, SettingsConfigDict
from fastapi_task_manager import Config as ManagerConfig # renamed to avoid conflict with our app Config class
from fastapi_task_manager import TaskGroup, TaskManager
class Config(BaseSettings):
model_config = SettingsConfigDict(
# `.env.prod` takes priority over `.env`
env_file=(".env", ".env.prod"),
extra="forbid",
)
# --------- Redis config variables ---------
redis_host: str
redis_port: int = 6379
redis_password: str | None = None
redis_db: int = 0
# --------- End of redis config variables ---------
# --------- App config variables ---------
app_name: str = "my_fastapi_app"
concurrent_tasks: int = 3
# --------- End of app config variables ---------
CONFIG = Config() # ty: ignore[missing-argument]
app = FastAPI()
router = APIRouter()
task_manager = TaskManager(
config=ManagerConfig(
redis_host=CONFIG.redis_host,
redis_port=CONFIG.redis_port,
redis_password=CONFIG.redis_password,
redis_db=CONFIG.redis_db,
concurrent_tasks=CONFIG.concurrent_tasks,
redis_key_prefix=CONFIG.app_name,
),
app=app,
)
my_example_task_group = TaskGroup(
tags=["example"],
name="My Example Task Group",
)
task_manager.add_task_group(my_example_task_group)
manager_router = task_manager.get_manager_router()
router.include_router(
manager_router,
prefix="/task-manager",
tags=["task-manager"],
)
app.include_router(router)
@my_example_task_group.add_task(
"*/5 * * * *", # Run every 5 minutes
name="my_scheduled_task",
description="This is my scheduled task",
)
async def my_scheduled_task():
pass
# Your task logic here
License¶
This project is licensed under the terms of the MIT license.