Task Handlers¶
Create and configure scheduled tasks using the @scheduler decorator with Asyncz triggers.
What You'll Learn¶
- Using the @scheduler decorator
- Available triggers (cron, interval, date)
- Task configuration options
- Advanced scheduling patterns
Quick Start¶
from ravyn.contrib.schedulers.asyncz.decorator import scheduler
@scheduler(name="daily_cleanup", trigger="cron", hour=2, minute=0)
async def cleanup_old_data():
"""Runs every day at 2:00 AM."""
await delete_old_records()
print("Cleanup completed")
Info
Install scheduler support: pip install ravyn[schedulers]
The @scheduler Decorator¶
The @scheduler decorator marks a function as a scheduled task.
Basic Usage¶
from ravyn.contrib.schedulers.asyncz.decorator import scheduler
@scheduler(name="my_task", trigger="interval", hours=1)
async def my_scheduled_task():
"""Runs every hour."""
print("Task executed!")
Decorator Parameters¶
| Parameter | Type | Description | Default |
|---|---|---|---|
name |
str | Task name | None |
trigger |
str/object | Trigger type or instance | None |
id |
str | Explicit task ID | None |
misfire_grace_time |
int | Seconds after scheduled time task can still run | undefined |
coalesce |
bool | Run once if multiple executions missed | undefined |
max_instances |
int | Max concurrent instances | undefined |
store |
str | Task store alias | None |
executor |
str | Executor alias | None |
is_enabled |
bool | Enable/disable task | True |
Triggers¶
CronTrigger - Schedule Like Cron¶
Run tasks at specific times using cron-style scheduling:
from ravyn.contrib.schedulers.asyncz.decorator import scheduler
# Every day at 9:00 AM
@scheduler(name="morning_report", trigger="cron", hour=9, minute=0)
async def generate_morning_report():
await create_daily_report()
# Every Monday at 10:00 AM
@scheduler(
name="weekly_summary",
trigger="cron",
day_of_week="mon",
hour=10,
minute=0
)
async def weekly_summary():
await send_weekly_email()
# First day of month at midnight
@scheduler(name="monthly_billing", trigger="cron", day=1, hour=0, minute=0)
async def process_monthly_billing():
await generate_invoices()
Cron Parameters:
| Parameter | Type | Description | Range |
|---|---|---|---|
year |
int/str | 4-digit year | - |
month |
int/str | Month | 1-12 |
day |
int/str | Day of month | 1-31 |
week |
int/str | ISO week | 1-53 |
day_of_week |
int/str | Weekday | 0-6 or mon-sun |
hour |
int/str | Hour | 0-23 |
minute |
int/str | Minute | 0-59 |
second |
int/str | Second | 0-59 |
IntervalTrigger - Fixed Intervals¶
Run tasks at regular intervals:
# Every 30 minutes
@scheduler(name="sync_data", trigger="interval", minutes=30)
async def sync_with_api():
await fetch_latest_data()
# Every 2 hours
@scheduler(name="backup", trigger="interval", hours=2)
async def backup_database():
await create_backup()
# Every day
@scheduler(name="cleanup", trigger="interval", days=1)
async def daily_cleanup():
await remove_old_files()
# Complex interval
@scheduler(
name="complex_task",
trigger="interval",
weeks=1,
days=2,
hours=3,
minutes=30
)
async def complex_interval():
# Runs every 1 week, 2 days, 3 hours, and 30 minutes
pass
Interval Parameters:
| Parameter | Type | Description |
|---|---|---|
weeks |
int | Number of weeks |
days |
int | Number of days |
hours |
int | Number of hours |
minutes |
int | Number of minutes |
seconds |
int | Number of seconds |
start_date |
datetime/str | Start time |
end_date |
datetime/str | End time |
DateTrigger - One-Time Execution¶
Run a task once at a specific date/time:
from datetime import datetime
# Run once at specific time
@scheduler(
name="campaign_launch",
trigger="date",
run_date=datetime(2026, 12, 25, 9, 0)
)
async def launch_campaign():
await activate_holiday_campaign()
# Run once (immediately if no date specified)
@scheduler(name="one_time_task", trigger="date")
async def run_once():
await initialize_system()
Advanced Triggers¶
OrTrigger - Multiple Schedules¶
Run when ANY trigger fires:
from asyncz.triggers import CronTrigger, IntervalTrigger, OrTrigger
# Run at 9 AM OR every 3 hours
trigger = OrTrigger([
CronTrigger(hour=9, minute=0),
IntervalTrigger(hours=3)
])
@scheduler(name="flexible_task", trigger=trigger)
async def flexible_execution():
await process_data()
AndTrigger - Combined Conditions¶
Run when ALL triggers agree:
from asyncz.triggers import CronTrigger, AndTrigger
# Run on weekdays AND between 9-5
trigger = AndTrigger([
CronTrigger(day_of_week="mon-fri"),
CronTrigger(hour="9-17")
])
@scheduler(name="business_hours_task", trigger=trigger)
async def during_business_hours():
await send_notifications()
Task Configuration¶
Preventing Overlaps¶
# Only 1 instance at a time
@scheduler(
name="heavy_task",
trigger="interval",
minutes=5,
max_instances=1
)
async def heavy_processing():
await process_large_dataset()
Handling Missed Runs¶
# If missed, run only once (not multiple times)
@scheduler(
name="sync_task",
trigger="interval",
minutes=10,
coalesce=True
)
async def sync_data():
await sync_with_external_api()
Grace Period¶
# Allow 60 seconds after scheduled time
@scheduler(
name="time_sensitive",
trigger="cron",
hour=9,
minute=0,
misfire_grace_time=60
)
async def time_sensitive_task():
await send_morning_email()
Complete Example¶
from ravyn import Ravyn
from ravyn.contrib.schedulers.asyncz.config import AsynczConfig
from ravyn.contrib.schedulers.asyncz.decorator import scheduler
# Define tasks
@scheduler(name="hourly_sync", trigger="interval", hours=1)
async def sync_data():
"""Sync data every hour."""
print("Syncing data...")
await fetch_from_api()
@scheduler(name="daily_report", trigger="cron", hour=8, minute=0)
async def generate_report():
"""Generate report at 8 AM daily."""
print("Generating report...")
await create_daily_report()
@scheduler(
name="cleanup",
trigger="cron",
hour=2,
minute=0,
max_instances=1
)
async def cleanup_old_data():
"""Cleanup at 2 AM, prevent overlaps."""
print("Cleaning up...")
await delete_old_records()
# Configure app
app = Ravyn(
scheduler_config=AsynczConfig(),
enable_scheduler=True,
scheduler_tasks={
"hourly_sync": "tasks.sync_data",
"daily_report": "tasks.generate_report",
"cleanup": "tasks.cleanup_old_data"
}
)
Best Practices¶
1. Use Descriptive Names¶
# Good - clear purpose
@scheduler(name="user_cleanup_daily", trigger="cron", hour=2)
async def cleanup_inactive_users():
pass
2. Prevent Overlaps for Long Tasks¶
# Good - prevent concurrent runs
@scheduler(
name="heavy_processing",
trigger="interval",
hours=1,
max_instances=1
)
async def process_large_files():
pass
3. Handle Errors Gracefully¶
# Good - error handling
@scheduler(name="api_sync", trigger="interval", minutes=15)
async def sync_with_api():
try:
await fetch_data()
except Exception as e:
logger.error(f"Sync failed: {e}")
# Maybe send alert
Stores and Executors¶
Configure custom stores and executors:
from ravyn import Ravyn
from ravyn.contrib.schedulers.asyncz.config import AsynczConfig
app = Ravyn(
scheduler_config=AsynczConfig(),
enable_scheduler=True,
scheduler_configurations={
"stores": {
"default": {"type": "memory"},
"redis": {"type": "redis", "host": "localhost"}
},
"executors": {
"default": {"type": "asyncio"},
"threadpool": {"type": "threadpool", "max_workers": 20}
}
}
)
# Use custom store/executor
@scheduler(
name="redis_task",
trigger="interval",
hours=1,
store="redis",
executor="threadpool"
)
async def task_with_custom_config():
pass
Learn More¶
- Asyncz Documentation - Complete trigger reference
- Scheduler Setup - Configure the scheduler
- SchedulerConfig - Configuration options
Next Steps¶
- Scheduler Configuration - Enable and configure
- Background Tasks - One-off tasks
- Lifespan Events - Application lifecycle