# Background Jobs

Background work in the Python service is handled by TaskIQ with a Redis broker. TaskIQ workers run as a separate process alongside the FastAPI app.
## Simple tasks

For one-off background work, use the `@broker.task` decorator:

```python
from app.broker import broker

@broker.task
async def send_welcome_email(user_id: str) -> None:
    # ... send the email
    pass
```

Dispatch from application code:

```python
await send_welcome_email.kiq(user_id="abc123")
```

`.kiq()` enqueues the task and returns immediately; the worker picks it up asynchronously.
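The examples above import `broker` from `app.broker`, which is not shown here. With TaskIQ's Redis transport, the broker half of that module might be as small as the following sketch (the module path, environment variable, and default URL are assumptions; the project-specific `cron` helper is omitted):

```python
# app/broker.py -- hypothetical wiring; adjust the URL/env var to your deployment.
import os

from taskiq_redis import ListQueueBroker

# A Redis-backed broker: .kiq() pushes task messages onto a Redis list,
# and worker processes pop and execute them.
broker = ListQueueBroker(url=os.getenv("REDIS_URL", "redis://localhost:6379/0"))
```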
## Cron tasks

For scheduled recurring work, use the `@cron` decorator with distributed locking:

```python
from app.broker import broker, cron

@cron(broker, "0 */6 * * *", lock_ttl=300)
async def sync_external_data(
    db_session: AsyncSession = TaskiqDepends(get_db_session),
    logger: Logger = TaskiqDepends(get_logger),
) -> None:
    # Runs every 6 hours.
    # lock_ttl=300 means the lock expires after 5 minutes.
    # If a previous run is still holding the lock, this invocation is skipped.
    pass
```
Key points about cron tasks:

- Distributed locking prevents multiple workers from running the same cron job simultaneously. Always set `lock_ttl` to a reasonable upper bound for the task duration.
- If a task exceeds its `lock_ttl`, the lock expires and another worker may start a new run. Size `lock_ttl` accordingly.
- Cron expressions use the standard 5-field format (`minute hour day month weekday`).
## Run tracking

Scheduled cron jobs are tracked in the `cron_job_runs` table. Each execution records:
- Task name
- Start and end timestamps
- Status (success / failure)
- Error details (if failed)
This provides observability into scheduled work without needing external monitoring.
## Dependency injection

TaskIQ uses `TaskiqDepends` to inject dependencies into task functions. It is similar to FastAPI's `Depends`, but for the worker context:

```python
from taskiq import TaskiqDepends

@broker.task
async def process_upload(
    file_id: str,
    db_session: AsyncSession = TaskiqDepends(get_db_session),
    logger: Logger = TaskiqDepends(get_logger),
    redis: Redis = TaskiqDepends(get_redis),
    event_publisher: EventPublisher = TaskiqDepends(get_event_publisher),
) -> None:
    ctx = Context(db=db_session, logger=logger)
    # ... process the file
```
Available dependencies:

| Dependency | Provider | Description |
|---|---|---|
| `AsyncSession` | `get_db_session` | Database session (auto-committed/rolled-back) |
| `Logger` | `get_logger` | Structured logger |
| `Redis` | `get_redis` | Redis client |
| `EventPublisher` | `get_event_publisher` | Publish domain events |
## Best practices
- Tasks must be idempotent. Redis delivery is at-least-once, so a task may run more than once. Design accordingly (use upserts, check for existing results before creating).
- Keep task arguments serializable. Pass IDs and simple values, not ORM objects. The task function should fetch fresh data from the database.
- Set reasonable timeouts. Long-running tasks should periodically check for cancellation or use `lock_ttl` to bound execution.
- Log generously. Background tasks lack request context. Use the injected logger to make debugging possible.
- Fail loudly. Let exceptions propagate so they are captured in run tracking. Do not silently swallow errors.
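The idempotency point can be sketched with a toy task that checks for an existing result before doing work. The store and function names here are illustrative, not the project's API; in practice the check would be a database upsert or a `SELECT` before `INSERT`:

```python
# Toy in-memory result store standing in for a database table.
results: dict[str, str] = {}


def process_upload(file_id: str) -> str:
    """Idempotent task body: safe under at-least-once delivery."""
    # Check for an existing result before doing work, so a redelivered
    # message does not duplicate side effects.
    if file_id in results:
        return results[file_id]
    outcome = f"processed:{file_id}"  # stand-in for the real work
    results[file_id] = outcome       # upsert-style write: rerunning is harmless
    return outcome


first = process_upload("abc123")
second = process_upload("abc123")  # simulated at-least-once redelivery
assert first == second
assert len(results) == 1  # the work happened effectively once
```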