Dynamic Tasks¶
Dynamic tasks allow you to create and delete scheduled tasks at runtime via the REST API, without redeploying your application. Task definitions are persisted in Redis and automatically restored on startup.
How It Works¶
- Register functions in your task group using the
@register_function()decorator - Create tasks via
POST /tasksspecifying a registered function, a cron expression, and optional parameters - Tasks are persisted in Redis and survive application restarts
- Delete tasks via
DELETE /taskswhen they are no longer needed
Step 1: Register Functions¶
Before you can create dynamic tasks, you need to register the functions that will be available. Use the @task_group.register_function() decorator:
Static tasks auto-register
Functions decorated with @task_group.add_task() are automatically registered in the function registry (unless you pass register=False). This means you can also create dynamic tasks from the same functions used by static tasks.
import logging
from fastapi import APIRouter, FastAPI
from fastapi_task_manager import Config, TaskGroup, TaskManager
logger = logging.getLogger(__name__)
app = FastAPI()
router = APIRouter()
# Create the task manager and a task group
task_manager = TaskManager(
app=app,
config=Config(redis_host="localhost"),
)
reports_group = TaskGroup(name="Reports", tags=["reports"])
task_manager.add_task_group(reports_group)
# Register functions available for dynamic task creation
@reports_group.register_function()
async def send_report(recipient: str = "[email protected]", report_type: str = "daily"):
# Logic to generate and send a report
logger.info("Sending %s report to %s", report_type, recipient)
@reports_group.register_function(name="generate_export")
async def export_data(file_format: str = "csv"):
# Logic to export data
logger.info("Exporting data as %s", file_format)
# Code below omitted 👇
👀 Full file preview
import logging
from fastapi import APIRouter, FastAPI
from fastapi_task_manager import Config, TaskGroup, TaskManager
logger = logging.getLogger(__name__)
app = FastAPI()
router = APIRouter()
# Create the task manager and a task group
task_manager = TaskManager(
app=app,
config=Config(redis_host="localhost"),
)
reports_group = TaskGroup(name="Reports", tags=["reports"])
task_manager.add_task_group(reports_group)
# Register functions available for dynamic task creation
@reports_group.register_function()
async def send_report(recipient: str = "[email protected]", report_type: str = "daily"):
# Logic to generate and send a report
logger.info("Sending %s report to %s", report_type, recipient)
@reports_group.register_function(name="generate_export")
async def export_data(file_format: str = "csv"):
# Logic to export data
logger.info("Exporting data as %s", file_format)
# Include the management router to expose task CRUD endpoints
manager_router = task_manager.get_manager_router()
router.include_router(
manager_router,
prefix="/task-manager",
tags=["task-manager"],
)
app.include_router(router)
Step 2: Include the Management Router¶
Make sure the management router is included in your FastAPI app so the dynamic task API endpoints are available:
# Code above omitted 👆
# Include the management router to expose task CRUD endpoints
manager_router = task_manager.get_manager_router()
router.include_router(
manager_router,
prefix="/task-manager",
tags=["task-manager"],
)
app.include_router(router)
👀 Full file preview
import logging
from fastapi import APIRouter, FastAPI
from fastapi_task_manager import Config, TaskGroup, TaskManager
logger = logging.getLogger(__name__)
app = FastAPI()
router = APIRouter()
# Create the task manager and a task group
task_manager = TaskManager(
app=app,
config=Config(redis_host="localhost"),
)
reports_group = TaskGroup(name="Reports", tags=["reports"])
task_manager.add_task_group(reports_group)
# Register functions available for dynamic task creation
@reports_group.register_function()
async def send_report(recipient: str = "[email protected]", report_type: str = "daily"):
# Logic to generate and send a report
logger.info("Sending %s report to %s", report_type, recipient)
@reports_group.register_function(name="generate_export")
async def export_data(file_format: str = "csv"):
# Logic to export data
logger.info("Exporting data as %s", file_format)
# Include the management router to expose task CRUD endpoints
manager_router = task_manager.get_manager_router()
router.include_router(
manager_router,
prefix="/task-manager",
tags=["task-manager"],
)
app.include_router(router)
Step 3: Create Tasks via API¶
Use the POST /tasks endpoint to create a new dynamic task:
curl -X POST http://localhost:8000/task-manager/tasks \
-H "Content-Type: application/json" \
-d '{
"task_group_name": "Reports",
"function_name": "send_report",
"cron_expression": "0 9 * * MON",
"kwargs": {"recipient": "[email protected]", "report_type": "weekly"},
"name": "weekly_team_report",
"description": "Send weekly report to team every Monday at 9am",
"tags": ["reports", "weekly"]
}'
The response will confirm the task creation:
{
"task_group_name": "Reports",
"task_name": "weekly_team_report",
"function_name": "send_report",
"cron_expression": "0 9 * * MON",
"kwargs": {"recipient": "[email protected]", "report_type": "weekly"},
"dynamic": true
}
Step 4: List Available Functions¶
Use the GET /functions endpoint to see which functions are available for dynamic task creation:
curl http://localhost:8000/task-manager/functions
{
"functions": [
{"task_group_name": "Reports", "function_name": "send_report"},
{"task_group_name": "Reports", "function_name": "generate_export"}
],
"count": 2
}
You can filter by task group:
curl "http://localhost:8000/task-manager/functions?task_group_name=Reports"
Step 5: Delete Dynamic Tasks¶
Use the DELETE /tasks endpoint to remove a dynamic task:
curl -X DELETE "http://localhost:8000/task-manager/tasks?task_group_name=Reports&task_name=weekly_team_report"
This will:
- Remove the task from the in-memory task list
- Clean up all associated Redis keys (next_run, statistics, retry state, heartbeat)
- Remove the persisted definition from Redis
Static tasks cannot be deleted
Only dynamic tasks (created via POST /tasks) can be deleted through the API. Attempting to delete a static task defined with @task_group.add_task() will return a 400 error.
Advanced Options¶
Per-Task Retry Backoff¶
You can override the global retry backoff settings for individual dynamic tasks:
curl -X POST http://localhost:8000/task-manager/tasks \
-H "Content-Type: application/json" \
-d '{
"task_group_name": "Reports",
"function_name": "send_report",
"cron_expression": "0 */6 * * *",
"kwargs": {"recipient": "[email protected]"},
"name": "vip_report",
"retry_backoff": 5.0,
"retry_backoff_max": 300.0
}'
High Priority Tasks¶
Set high_priority: true to give a task priority in scheduling:
curl -X POST http://localhost:8000/task-manager/tasks \
-H "Content-Type: application/json" \
-d '{
"task_group_name": "Reports",
"function_name": "send_report",
"cron_expression": "0 8 * * *",
"high_priority": true,
"name": "critical_daily_report"
}'
Custom Task Names¶
If you don't provide a name, one is auto-generated from the function name plus a hash of the kwargs and cron expression. Providing explicit names makes tasks easier to manage and monitor.
Persistence and Restart Behavior¶
Dynamic task definitions are stored in a Redis Hash. On application startup, the TaskManager automatically:
- Reads all persisted definitions from Redis
- Validates that the target task group and registered function still exist
- Recreates the in-memory task objects
If a registered function has been removed from the code since the task was created, the task is silently skipped with a warning log. You can clean up orphaned definitions by deleting them via the API or directly from Redis.
Programmatic API (Python)¶
In addition to the REST API, you can create and remove dynamic tasks directly from Python code. This is useful when you want to create tasks from your own endpoints, CLI commands, startup logic, or event handlers.
Creating Tasks from Code¶
Use task_group.add_dynamic_task() to create a task at runtime:
# Code above omitted 👆
@app.post("/custom/create-report-task")
async def create_custom_report_task(recipient: str, cron: str = "0 8 * * MON"):
# Create a dynamic task from application code (e.g., from an endpoint, a CLI, or startup logic)
task = reports_group.add_dynamic_task(
function_name="send_report",
cron_expression=cron,
kwargs={"recipient": recipient, "report_type": "weekly"},
description=f"Weekly report for {recipient}",
tags=["reports", "weekly"],
)
return {"created": task.name}
@app.delete("/custom/delete-report-task")
# Code below omitted 👇
👀 Full file preview
import logging
from fastapi import APIRouter, FastAPI
from fastapi_task_manager import Config, TaskGroup, TaskManager
logger = logging.getLogger(__name__)
app = FastAPI()
router = APIRouter()
# Create the task manager and a task group
task_manager = TaskManager(
app=app,
config=Config(redis_host="localhost"),
)
reports_group = TaskGroup(name="Reports", tags=["reports"])
task_manager.add_task_group(reports_group)
# Register functions available for dynamic task creation
@reports_group.register_function()
async def send_report(recipient: str = "[email protected]", report_type: str = "daily"):
# Logic to generate and send a report
logger.info("Sending %s report to %s", report_type, recipient)
# Static tasks auto-register their function by default
@reports_group.add_task("0 9 * * *", name="daily_cleanup")
async def daily_cleanup():
# This function is also available in the registry as "daily_cleanup"
logger.info("Running daily cleanup")
# --- Creating dynamic tasks programmatically ---
@app.post("/custom/create-report-task")
async def create_custom_report_task(recipient: str, cron: str = "0 8 * * MON"):
# Create a dynamic task from application code (e.g., from an endpoint, a CLI, or startup logic)
task = reports_group.add_dynamic_task(
function_name="send_report",
cron_expression=cron,
kwargs={"recipient": recipient, "report_type": "weekly"},
description=f"Weekly report for {recipient}",
tags=["reports", "weekly"],
)
return {"created": task.name}
@app.delete("/custom/delete-report-task")
async def delete_custom_report_task(task_name: str):
# Remove a dynamic task programmatically
removed = reports_group.remove_dynamic_task(task_name)
return {"removed": removed.name}
# Include the management router
manager_router = task_manager.get_manager_router()
router.include_router(manager_router, prefix="/task-manager", tags=["task-manager"])
app.include_router(router)
The method accepts the same parameters available in the REST API:
| Parameter | Type | Required | Description |
|---|---|---|---|
function_name |
str |
Yes | Name of a registered function |
cron_expression |
str |
Yes | Cron schedule expression |
kwargs |
dict |
No | Arguments passed to the function |
name |
str |
No | Custom task name (auto-generated if omitted) |
description |
str |
No | Human-readable description |
high_priority |
bool |
No | Use the high-priority stream |
tags |
list[str] |
No | Tags for filtering |
retry_backoff |
float |
No | Initial retry delay in seconds |
retry_backoff_max |
float |
No | Maximum retry delay in seconds |
The method returns the created Task object and raises RuntimeError if the function is not registered or the task name is already taken.
Removing Tasks from Code¶
Use task_group.remove_dynamic_task() to remove a dynamic task:
# Code above omitted 👆
removed = reports_group.remove_dynamic_task(task_name)
return {"removed": removed.name}
# Code below omitted 👇
👀 Full file preview
import logging
from fastapi import APIRouter, FastAPI
from fastapi_task_manager import Config, TaskGroup, TaskManager
logger = logging.getLogger(__name__)
app = FastAPI()
router = APIRouter()
# Create the task manager and a task group
task_manager = TaskManager(
app=app,
config=Config(redis_host="localhost"),
)
reports_group = TaskGroup(name="Reports", tags=["reports"])
task_manager.add_task_group(reports_group)
# Register functions available for dynamic task creation
@reports_group.register_function()
async def send_report(recipient: str = "[email protected]", report_type: str = "daily"):
# Logic to generate and send a report
logger.info("Sending %s report to %s", report_type, recipient)
# Static tasks auto-register their function by default
@reports_group.add_task("0 9 * * *", name="daily_cleanup")
async def daily_cleanup():
# This function is also available in the registry as "daily_cleanup"
logger.info("Running daily cleanup")
# --- Creating dynamic tasks programmatically ---
@app.post("/custom/create-report-task")
async def create_custom_report_task(recipient: str, cron: str = "0 8 * * MON"):
# Create a dynamic task from application code (e.g., from an endpoint, a CLI, or startup logic)
task = reports_group.add_dynamic_task(
function_name="send_report",
cron_expression=cron,
kwargs={"recipient": recipient, "report_type": "weekly"},
description=f"Weekly report for {recipient}",
tags=["reports", "weekly"],
)
return {"created": task.name}
@app.delete("/custom/delete-report-task")
async def delete_custom_report_task(task_name: str):
# Remove a dynamic task programmatically
removed = reports_group.remove_dynamic_task(task_name)
return {"removed": removed.name}
# Include the management router
manager_router = task_manager.get_manager_router()
router.include_router(manager_router, prefix="/task-manager", tags=["task-manager"])
app.include_router(router)
Only dynamic tasks can be removed — static tasks (defined with @add_task()) raise a RuntimeError.
No automatic Redis persistence
When using the Python API directly, the task is added only in-memory. It will not survive application restarts unless you also persist it to Redis (as the REST API does automatically). If you need persistence, use the REST API (POST /tasks) or replicate the Redis Hash logic from task_router_services.create_dynamic_task().
Complete Python API Example¶
import logging
from fastapi import APIRouter, FastAPI
from fastapi_task_manager import Config, TaskGroup, TaskManager
logger = logging.getLogger(__name__)
app = FastAPI()
router = APIRouter()
# Create the task manager and a task group
task_manager = TaskManager(
app=app,
config=Config(redis_host="localhost"),
)
reports_group = TaskGroup(name="Reports", tags=["reports"])
task_manager.add_task_group(reports_group)
# Register functions available for dynamic task creation
@reports_group.register_function()
async def send_report(recipient: str = "[email protected]", report_type: str = "daily"):
# Logic to generate and send a report
logger.info("Sending %s report to %s", report_type, recipient)
# Static tasks auto-register their function by default
@reports_group.add_task("0 9 * * *", name="daily_cleanup")
async def daily_cleanup():
# This function is also available in the registry as "daily_cleanup"
logger.info("Running daily cleanup")
# --- Creating dynamic tasks programmatically ---
@app.post("/custom/create-report-task")
async def create_custom_report_task(recipient: str, cron: str = "0 8 * * MON"):
# Create a dynamic task from application code (e.g., from an endpoint, a CLI, or startup logic)
task = reports_group.add_dynamic_task(
function_name="send_report",
cron_expression=cron,
kwargs={"recipient": recipient, "report_type": "weekly"},
description=f"Weekly report for {recipient}",
tags=["reports", "weekly"],
)
return {"created": task.name}
@app.delete("/custom/delete-report-task")
async def delete_custom_report_task(task_name: str):
# Remove a dynamic task programmatically
removed = reports_group.remove_dynamic_task(task_name)
return {"removed": removed.name}
# Include the management router
manager_router = task_manager.get_manager_router()
router.include_router(manager_router, prefix="/task-manager", tags=["task-manager"])
app.include_router(router)
Complete REST API Example¶
import logging
from fastapi import APIRouter, FastAPI
from fastapi_task_manager import Config, TaskGroup, TaskManager
logger = logging.getLogger(__name__)
app = FastAPI()
router = APIRouter()
# Create the task manager and a task group
task_manager = TaskManager(
app=app,
config=Config(redis_host="localhost"),
)
reports_group = TaskGroup(name="Reports", tags=["reports"])
task_manager.add_task_group(reports_group)
# Register functions available for dynamic task creation
@reports_group.register_function()
async def send_report(recipient: str = "[email protected]", report_type: str = "daily"):
# Logic to generate and send a report
logger.info("Sending %s report to %s", report_type, recipient)
@reports_group.register_function(name="generate_export")
async def export_data(file_format: str = "csv"):
# Logic to export data
logger.info("Exporting data as %s", file_format)
# Include the management router to expose task CRUD endpoints
manager_router = task_manager.get_manager_router()
router.include_router(
manager_router,
prefix="/task-manager",
tags=["task-manager"],
)
app.include_router(router)