Skip to content

Dynamic Tasks

Dynamic tasks allow you to create and delete scheduled tasks at runtime via the REST API, without redeploying your application. Task definitions are persisted in Redis and automatically restored on startup.


How It Works

  1. Register functions in your task group using the @register_function() decorator
  2. Create tasks via POST /tasks specifying a registered function, a cron expression, and optional parameters
  3. Tasks are persisted in Redis and survive application restarts
  4. Delete tasks via DELETE /tasks when they are no longer needed

Step 1: Register Functions

Before you can create dynamic tasks, you need to register the functions that will be available. Use the @task_group.register_function() decorator:

Static tasks auto-register

Functions decorated with @task_group.add_task() are automatically registered in the function registry (unless you pass register=False). This means you can also create dynamic tasks from the same functions used by static tasks.

import logging

from fastapi import APIRouter, FastAPI

from fastapi_task_manager import Config, TaskGroup, TaskManager

logger = logging.getLogger(__name__)

app = FastAPI()
router = APIRouter()

# Create the task manager and a task group
task_manager = TaskManager(
    app=app,
    config=Config(redis_host="localhost"),
)
reports_group = TaskGroup(name="Reports", tags=["reports"])
task_manager.add_task_group(reports_group)


# Register functions available for dynamic task creation
@reports_group.register_function()
async def send_report(recipient: str = "admin@example.com", report_type: str = "daily"):
    """Example registered function: log that a report is being sent.

    Both parameters have defaults, so a dynamic task can override either one
    through its ``kwargs``.
    """
    # Logic to generate and send a report
    logger.info("Sending %s report to %s", report_type, recipient)


@reports_group.register_function(name="generate_export")
async def export_data(file_format: str = "csv"):
    # Logic to export data
    logger.info("Exporting data as %s", file_format)

# Code below omitted 👇
👀 Full file preview
import logging

from fastapi import APIRouter, FastAPI

from fastapi_task_manager import Config, TaskGroup, TaskManager

logger = logging.getLogger(__name__)

app = FastAPI()
router = APIRouter()

# Create the task manager and a task group
task_manager = TaskManager(
    app=app,
    config=Config(redis_host="localhost"),
)
reports_group = TaskGroup(name="Reports", tags=["reports"])
task_manager.add_task_group(reports_group)


# Register functions available for dynamic task creation
@reports_group.register_function()
async def send_report(recipient: str = "admin@example.com", report_type: str = "daily"):
    """Example registered function: log that a report is being sent.

    Both parameters have defaults, so a dynamic task can override either one
    through its ``kwargs``.
    """
    # Logic to generate and send a report
    logger.info("Sending %s report to %s", report_type, recipient)


@reports_group.register_function(name="generate_export")
async def export_data(file_format: str = "csv"):
    # Logic to export data
    logger.info("Exporting data as %s", file_format)


# Include the management router to expose task CRUD endpoints
manager_router = task_manager.get_manager_router()
router.include_router(
    manager_router,
    prefix="/task-manager",
    tags=["task-manager"],
)

app.include_router(router)

Step 2: Include the Management Router

Make sure the management router is included in your FastAPI app so the dynamic task API endpoints are available:

# Code above omitted 👆

# Include the management router to expose task CRUD endpoints
manager_router = task_manager.get_manager_router()
router.include_router(
    manager_router,
    prefix="/task-manager",
    tags=["task-manager"],
)

app.include_router(router)
👀 Full file preview
import logging

from fastapi import APIRouter, FastAPI

from fastapi_task_manager import Config, TaskGroup, TaskManager

logger = logging.getLogger(__name__)

app = FastAPI()
router = APIRouter()

# Create the task manager and a task group
task_manager = TaskManager(
    app=app,
    config=Config(redis_host="localhost"),
)
reports_group = TaskGroup(name="Reports", tags=["reports"])
task_manager.add_task_group(reports_group)


# Register functions available for dynamic task creation
@reports_group.register_function()
async def send_report(recipient: str = "admin@example.com", report_type: str = "daily"):
    """Example registered function: log that a report is being sent.

    Both parameters have defaults, so a dynamic task can override either one
    through its ``kwargs``.
    """
    # Logic to generate and send a report
    logger.info("Sending %s report to %s", report_type, recipient)


@reports_group.register_function(name="generate_export")
async def export_data(file_format: str = "csv"):
    # Logic to export data
    logger.info("Exporting data as %s", file_format)


# Include the management router to expose task CRUD endpoints
manager_router = task_manager.get_manager_router()
router.include_router(
    manager_router,
    prefix="/task-manager",
    tags=["task-manager"],
)

app.include_router(router)

Step 3: Create Tasks via API

Use the POST /tasks endpoint to create a new dynamic task:

curl -X POST http://localhost:8000/task-manager/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "task_group_name": "Reports",
    "function_name": "send_report",
    "cron_expression": "0 9 * * MON",
    "kwargs": {"recipient": "team@example.com", "report_type": "weekly"},
    "name": "weekly_team_report",
    "description": "Send weekly report to team every Monday at 9am",
    "tags": ["reports", "weekly"]
  }'

The response will confirm the task creation:

{
  "task_group_name": "Reports",
  "task_name": "weekly_team_report",
  "function_name": "send_report",
  "cron_expression": "0 9 * * MON",
  "kwargs": {"recipient": "team@example.com", "report_type": "weekly"},
  "dynamic": true
}

Step 4: List Available Functions

Use the GET /functions endpoint to see which functions are available for dynamic task creation:

curl http://localhost:8000/task-manager/functions
{
  "functions": [
    {"task_group_name": "Reports", "function_name": "send_report"},
    {"task_group_name": "Reports", "function_name": "generate_export"}
  ],
  "count": 2
}

You can filter by task group:

curl "http://localhost:8000/task-manager/functions?task_group_name=Reports"

Step 5: Delete Dynamic Tasks

Use the DELETE /tasks endpoint to remove a dynamic task:

curl -X DELETE "http://localhost:8000/task-manager/tasks?task_group_name=Reports&task_name=weekly_team_report"

This will:

  • Remove the task from the in-memory task list
  • Clean up all associated Redis keys (next_run, statistics, retry state, heartbeat)
  • Remove the persisted definition from Redis

Static tasks cannot be deleted

Only dynamic tasks (created via POST /tasks) can be deleted through the API. Attempting to delete a static task defined with @task_group.add_task() will return a 400 error.


Advanced Options

Per-Task Retry Backoff

You can override the global retry backoff settings for individual dynamic tasks:

curl -X POST http://localhost:8000/task-manager/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "task_group_name": "Reports",
    "function_name": "send_report",
    "cron_expression": "0 */6 * * *",
    "kwargs": {"recipient": "vip@example.com"},
    "name": "vip_report",
    "retry_backoff": 5.0,
    "retry_backoff_max": 300.0
  }'

High Priority Tasks

Set high_priority: true to give a task priority in scheduling:

curl -X POST http://localhost:8000/task-manager/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "task_group_name": "Reports",
    "function_name": "send_report",
    "cron_expression": "0 8 * * *",
    "high_priority": true,
    "name": "critical_daily_report"
  }'

Custom Task Names

If you don't provide a name, one is auto-generated from the function name plus a hash of the kwargs and cron expression. Providing explicit names makes tasks easier to manage and monitor.


Persistence and Restart Behavior

Dynamic task definitions are stored in a Redis Hash. On application startup, the TaskManager automatically:

  1. Reads all persisted definitions from Redis
  2. Validates that the target task group and registered function still exist
  3. Recreates the in-memory task objects

If a registered function has been removed from the code since the task was created, the task is skipped and a warning is logged. You can clean up orphaned definitions by deleting them via the API or directly from Redis.


Programmatic API (Python)

In addition to the REST API, you can create and remove dynamic tasks directly from Python code. This is useful when you want to create tasks from your own endpoints, CLI commands, startup logic, or event handlers.

Creating Tasks from Code

Use task_group.add_dynamic_task() to create a task at runtime:

# Code above omitted 👆

@app.post("/custom/create-report-task")
async def create_custom_report_task(recipient: str, cron: str = "0 8 * * MON"):
    # Create a dynamic task from application code (e.g., from an endpoint, a CLI, or startup logic)
    task = reports_group.add_dynamic_task(
        function_name="send_report",
        cron_expression=cron,
        kwargs={"recipient": recipient, "report_type": "weekly"},
        description=f"Weekly report for {recipient}",
        tags=["reports", "weekly"],
    )
    return {"created": task.name}


@app.delete("/custom/delete-report-task")

# Code below omitted 👇
👀 Full file preview
import logging

from fastapi import APIRouter, FastAPI

from fastapi_task_manager import Config, TaskGroup, TaskManager

logger = logging.getLogger(__name__)

app = FastAPI()
router = APIRouter()

# Create the task manager and a task group
task_manager = TaskManager(
    app=app,
    config=Config(redis_host="localhost"),
)
reports_group = TaskGroup(name="Reports", tags=["reports"])
task_manager.add_task_group(reports_group)


# Register functions available for dynamic task creation
@reports_group.register_function()
async def send_report(recipient: str = "admin@example.com", report_type: str = "daily"):
    """Example registered function: log that a report is being sent.

    Both parameters have defaults, so a dynamic task can override either one
    through its ``kwargs``.
    """
    # Logic to generate and send a report
    logger.info("Sending %s report to %s", report_type, recipient)


# Static tasks auto-register their function by default
@reports_group.add_task("0 9 * * *", name="daily_cleanup")
async def daily_cleanup():
    # This function is also available in the registry as "daily_cleanup"
    logger.info("Running daily cleanup")


# --- Creating dynamic tasks programmatically ---


@app.post("/custom/create-report-task")
async def create_custom_report_task(recipient: str, cron: str = "0 8 * * MON"):
    # Create a dynamic task from application code (e.g., from an endpoint, a CLI, or startup logic)
    task = reports_group.add_dynamic_task(
        function_name="send_report",
        cron_expression=cron,
        kwargs={"recipient": recipient, "report_type": "weekly"},
        description=f"Weekly report for {recipient}",
        tags=["reports", "weekly"],
    )
    return {"created": task.name}


@app.delete("/custom/delete-report-task")
async def delete_custom_report_task(task_name: str):
    # Remove a dynamic task programmatically
    removed = reports_group.remove_dynamic_task(task_name)
    return {"removed": removed.name}


# Include the management router
manager_router = task_manager.get_manager_router()
router.include_router(manager_router, prefix="/task-manager", tags=["task-manager"])
app.include_router(router)

The method accepts the same parameters as the REST API, except task_group_name, which is implied by the task group the method is called on:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| function_name | str | Yes | Name of a registered function |
| cron_expression | str | Yes | Cron schedule expression |
| kwargs | dict | No | Arguments passed to the function |
| name | str | No | Custom task name (auto-generated if omitted) |
| description | str | No | Human-readable description |
| high_priority | bool | No | Use the high-priority stream |
| tags | list[str] | No | Tags for filtering |
| retry_backoff | float | No | Initial retry delay in seconds |
| retry_backoff_max | float | No | Maximum retry delay in seconds |

The method returns the created Task object and raises RuntimeError if the function is not registered or the task name is already taken.

Removing Tasks from Code

Use task_group.remove_dynamic_task() to remove a dynamic task:

# Code above omitted 👆

    removed = reports_group.remove_dynamic_task(task_name)
    return {"removed": removed.name}



# Code below omitted 👇
👀 Full file preview
import logging

from fastapi import APIRouter, FastAPI

from fastapi_task_manager import Config, TaskGroup, TaskManager

logger = logging.getLogger(__name__)

app = FastAPI()
router = APIRouter()

# Create the task manager and a task group
task_manager = TaskManager(
    app=app,
    config=Config(redis_host="localhost"),
)
reports_group = TaskGroup(name="Reports", tags=["reports"])
task_manager.add_task_group(reports_group)


# Register functions available for dynamic task creation
@reports_group.register_function()
async def send_report(recipient: str = "admin@example.com", report_type: str = "daily"):
    """Example registered function: log that a report is being sent.

    Both parameters have defaults, so a dynamic task can override either one
    through its ``kwargs``.
    """
    # Logic to generate and send a report
    logger.info("Sending %s report to %s", report_type, recipient)


# Static tasks auto-register their function by default
@reports_group.add_task("0 9 * * *", name="daily_cleanup")
async def daily_cleanup():
    # This function is also available in the registry as "daily_cleanup"
    logger.info("Running daily cleanup")


# --- Creating dynamic tasks programmatically ---


@app.post("/custom/create-report-task")
async def create_custom_report_task(recipient: str, cron: str = "0 8 * * MON"):
    # Create a dynamic task from application code (e.g., from an endpoint, a CLI, or startup logic)
    task = reports_group.add_dynamic_task(
        function_name="send_report",
        cron_expression=cron,
        kwargs={"recipient": recipient, "report_type": "weekly"},
        description=f"Weekly report for {recipient}",
        tags=["reports", "weekly"],
    )
    return {"created": task.name}


@app.delete("/custom/delete-report-task")
async def delete_custom_report_task(task_name: str):
    # Remove a dynamic task programmatically
    removed = reports_group.remove_dynamic_task(task_name)
    return {"removed": removed.name}


# Include the management router
manager_router = task_manager.get_manager_router()
router.include_router(manager_router, prefix="/task-manager", tags=["task-manager"])
app.include_router(router)

Only dynamic tasks can be removed — static tasks (defined with @add_task()) raise a RuntimeError.

No automatic Redis persistence

When using the Python API directly, the task is added only in memory. It will not survive application restarts unless you also persist it to Redis (as the REST API does automatically). If you need persistence, use the REST API (POST /tasks) or replicate the Redis Hash logic from task_router_services.create_dynamic_task().

Complete Python API Example

import logging

from fastapi import APIRouter, FastAPI

from fastapi_task_manager import Config, TaskGroup, TaskManager

logger = logging.getLogger(__name__)

app = FastAPI()
router = APIRouter()

# Create the task manager and a task group
task_manager = TaskManager(
    app=app,
    config=Config(redis_host="localhost"),
)
reports_group = TaskGroup(name="Reports", tags=["reports"])
task_manager.add_task_group(reports_group)


# Register functions available for dynamic task creation
@reports_group.register_function()
async def send_report(recipient: str = "admin@example.com", report_type: str = "daily"):
    """Example registered function: log that a report is being sent.

    Both parameters have defaults, so a dynamic task can override either one
    through its ``kwargs``.
    """
    # Logic to generate and send a report
    logger.info("Sending %s report to %s", report_type, recipient)


# Static tasks auto-register their function by default
@reports_group.add_task("0 9 * * *", name="daily_cleanup")
async def daily_cleanup():
    # This function is also available in the registry as "daily_cleanup"
    logger.info("Running daily cleanup")


# --- Creating dynamic tasks programmatically ---


@app.post("/custom/create-report-task")
async def create_custom_report_task(recipient: str, cron: str = "0 8 * * MON"):
    # Create a dynamic task from application code (e.g., from an endpoint, a CLI, or startup logic)
    task = reports_group.add_dynamic_task(
        function_name="send_report",
        cron_expression=cron,
        kwargs={"recipient": recipient, "report_type": "weekly"},
        description=f"Weekly report for {recipient}",
        tags=["reports", "weekly"],
    )
    return {"created": task.name}


@app.delete("/custom/delete-report-task")
async def delete_custom_report_task(task_name: str):
    # Remove a dynamic task programmatically
    removed = reports_group.remove_dynamic_task(task_name)
    return {"removed": removed.name}


# Include the management router
manager_router = task_manager.get_manager_router()
router.include_router(manager_router, prefix="/task-manager", tags=["task-manager"])
app.include_router(router)

Complete REST API Example

import logging

from fastapi import APIRouter, FastAPI

from fastapi_task_manager import Config, TaskGroup, TaskManager

logger = logging.getLogger(__name__)

app = FastAPI()
router = APIRouter()

# Create the task manager and a task group
task_manager = TaskManager(
    app=app,
    config=Config(redis_host="localhost"),
)
reports_group = TaskGroup(name="Reports", tags=["reports"])
task_manager.add_task_group(reports_group)


# Register functions available for dynamic task creation
@reports_group.register_function()
async def send_report(recipient: str = "admin@example.com", report_type: str = "daily"):
    """Example registered function: log that a report is being sent.

    Both parameters have defaults, so a dynamic task can override either one
    through its ``kwargs``.
    """
    # Logic to generate and send a report
    logger.info("Sending %s report to %s", report_type, recipient)


@reports_group.register_function(name="generate_export")
async def export_data(file_format: str = "csv"):
    # Logic to export data
    logger.info("Exporting data as %s", file_format)


# Include the management router to expose task CRUD endpoints
manager_router = task_manager.get_manager_router()
router.include_router(
    manager_router,
    prefix="/task-manager",
    tags=["task-manager"],
)

app.include_router(router)