Task Resilience (Retries & Timeouts)
Building robust applications requires handling transient failures and preventing tasks from consuming excessive resources. Task resilience, through retries and timeouts, provides mechanisms to achieve this by automatically re-attempting failed operations and enforcing execution limits.
Retries
Retries enable tasks to automatically re-execute upon failure, mitigating issues caused by temporary network glitches, resource unavailability, or other transient errors. This mechanism enhances the reliability of your tasks without requiring manual intervention.
Retries are configured using the RetryStrategy class or by providing a simple integer count.
Simple Retry Count
The simplest way to configure retries is by specifying an integer value directly. This value represents the maximum number of times a task will be re-attempted after its initial failure.
```python
from your_task_framework import task

@task(retries=5)
def my_reliable_task():
    """
    This task will retry up to 5 times if it fails.
    """
    # Simulate a transient failure
    import random
    if random.random() < 0.7:
        raise ValueError("Simulated transient error")
    print("Task succeeded!")
```
Advanced Retry Strategy
For more granular control over retry behavior, use the RetryStrategy class. This class allows you to define not only the number of retries but also the backoff mechanism between attempts.
The RetryStrategy class accepts the following parameters:
- `count` (int): The total number of retries to attempt. This is the primary control for how many times a task will be re-executed.
- `backoff` (float | timedelta | None): Specifies the maximum delay between retry attempts. When an exponential backoff factor is applied, the delay will not exceed this value. If provided as a number, it is interpreted as seconds.
- `backoff_factor` (int | float | None): An exponential factor applied to the delay between retries. For example, a `backoff_factor` of 2 will double the delay after each subsequent retry attempt, up to the `backoff` limit.
Example: Exponential Backoff
Implementing an exponential backoff strategy is crucial for preventing retry storms that can overwhelm a failing service. This approach increases the delay between retries, giving the system more time to recover.
```python
from your_task_framework import task
from your_task_framework.resilience import RetryStrategy
from datetime import timedelta

@task(retries=RetryStrategy(count=5, backoff=timedelta(minutes=1), backoff_factor=2))
def my_network_task():
    """
    This task will retry up to 5 times.
    The delay between retries will increase exponentially (factor of 2)
    but will not exceed 1 minute.
    """
    import requests
    try:
        response = requests.get("http://flaky-service.example.com/data")
        response.raise_for_status()
        print("Data fetched successfully!")
    except requests.exceptions.RequestException as e:
        print(f"Request failed: {e}. Retrying...")
        raise  # Re-raise to trigger retry
```
Best Practices for Retries:
- Idempotency: Ensure tasks are idempotent, meaning they can be executed multiple times without causing unintended side effects.
- Exponential Backoff: Always prefer exponential backoff with jitter (a randomized delay) to prevent thundering herd problems. The `backoff` and `backoff_factor` parameters facilitate this.
- Limit Retries: Set a reasonable `count` to avoid indefinite retries that can mask persistent issues or consume excessive resources.
- Monitor Retries: Implement monitoring to track retry counts and identify tasks that consistently fail and retry, indicating a deeper problem.
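The capped, jittered backoff recommended above can be sketched in plain Python. The helper below is purely illustrative (it is not part of the framework's API) and assumes the semantics described for `backoff` and `backoff_factor`: the delay grows exponentially and is clamped at the cap.

```python
import random

def backoff_delay(attempt, base=1.0, factor=2, cap=60.0, jitter=True):
    """Illustrative delay (in seconds) before a retry attempt (0-indexed).

    The delay grows exponentially by `factor`, is capped at `cap`, and is
    optionally randomized ("full jitter") so that many clients retrying at
    once do not all hit the failing service at the same instant.
    """
    delay = min(cap, base * (factor ** attempt))
    return random.uniform(0, delay) if jitter else delay

# Without jitter the schedule is deterministic: 1, 2, 4, ... up to the cap.
print([backoff_delay(a, jitter=False) for a in range(7)])
# → [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
```

With jitter enabled, each delay is drawn uniformly from [0, capped delay], which spreads retries out in time at the cost of occasionally retrying sooner than the deterministic schedule would.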
Timeouts
Timeouts prevent tasks from running indefinitely or remaining in a queued state for too long, ensuring efficient resource utilization and predictable task execution. They define the maximum allowable duration for different stages of a task's lifecycle.
Timeouts are configured using the Timeout class.
The Timeout class accepts the following parameters:
- `max_runtime` (timedelta | int): The maximum duration a task is allowed to execute in a single attempt. If the task exceeds this time, it is terminated. If provided as an int, it is interpreted as seconds.
- `max_queued_time` (timedelta | int | None): The maximum duration a task can remain in the queue before it starts execution. If the task is not picked up by a worker within this time, it is considered timed out and will not execute. If provided as an int, it is interpreted as seconds.
Example Usage:
```python
from your_task_framework import task, env
from your_task_framework.resilience import Timeout
from datetime import timedelta
import asyncio

# Define a Timeout instance
short_timeout = Timeout(max_runtime=timedelta(seconds=30), max_queued_time=timedelta(minutes=5))

@env.task(timeout=short_timeout)
async def long_running_computation():
    """
    This task will be terminated if it runs for more than 30 seconds
    in any single attempt. It will also be discarded if it stays in the
    queue for more than 5 minutes without starting.
    """
    print("Starting long computation...")
    try:
        await asyncio.sleep(45)  # Simulate a task that takes 45 seconds
        print("Computation finished.")
    except asyncio.CancelledError:
        print("Task cancelled due to max_runtime timeout.")
        raise  # Re-raise to propagate cancellation

@env.task(timeout=Timeout(max_runtime=timedelta(hours=1)))
async def batch_processing_task():
    """
    This task has a generous runtime limit of 1 hour, but no specific
    queued time limit.
    """
    print("Starting batch processing...")
    await asyncio.sleep(3000)  # Simulate a long-running batch job
    print("Batch processing complete.")
```
Understanding max_runtime vs. max_queued_time:
- `max_runtime`: This timeout applies to the active execution phase of a task. If a task is configured with retries, `max_runtime` applies to each individual attempt. For example, if `max_runtime` is 30 seconds and `retries` is 3, the task could run for up to 4 * 30 = 120 seconds in total: the initial attempt plus three retries, each bounded at 30 seconds.
- `max_queued_time`: This timeout applies to the period before a task begins its first execution attempt. It ensures that tasks do not languish in the queue indefinitely, especially during periods of high load or worker unavailability. Once a task starts executing, `max_queued_time` is no longer relevant for that specific task instance.
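This worst-case arithmetic can be captured in a small helper. The function below is a hypothetical illustration, not a framework API, and assumes (per the retry configuration described earlier) that `retries` counts re-attempts after the initial failure:

```python
from datetime import timedelta

def worst_case_active_time(retries: int, max_runtime: timedelta) -> timedelta:
    """Worst-case total execution time: the initial attempt plus all
    retries, each individually bounded by max_runtime."""
    return (retries + 1) * max_runtime

# 3 retries at 30 seconds per attempt → 4 attempts → up to 2 minutes total.
print(worst_case_active_time(3, timedelta(seconds=30)))
# → 0:02:00
```

Note that this bound excludes queue time and any backoff delays between attempts, both of which add to wall-clock latency.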
Performance Considerations:
- `max_runtime`: Setting an appropriate `max_runtime` is crucial for preventing runaway tasks that consume excessive CPU or memory. Too short a runtime might prematurely terminate legitimate long-running operations, while too long a runtime can delay detection of issues.
- `max_queued_time`: A long `max_queued_time` can lead to tasks consuming queue resources for extended periods, potentially impacting the processing of newer, more urgent tasks. Conversely, a very short `max_queued_time` might cause tasks to be discarded before workers have a chance to pick them up during peak load.
Best Practices for Timeouts:
- Granular Control: Apply specific timeouts based on the expected duration and criticality of each task.
- Monitor Queue Times: Track the actual time tasks spend in the queue to fine-tune `max_queued_time` and identify potential bottlenecks in your worker infrastructure.
- Graceful Shutdown: Design tasks to handle `asyncio.CancelledError` or similar termination signals gracefully, allowing them to clean up resources before being forcefully stopped by a `max_runtime` timeout.
- Combine with Retries: Use timeouts in conjunction with retries. A `max_runtime` timeout can trigger a retry if the task fails to complete within its allotted time, allowing a fresh attempt.
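How a per-attempt timeout and retries compose can be illustrated with a standalone asyncio sketch. This is not the framework's internal scheduler; `run_with_resilience` and `flaky` are hypothetical names, and the sketch assumes each attempt is bounded individually and that a timed-out attempt simply counts as a retryable failure:

```python
import asyncio

async def run_with_resilience(coro_factory, retries=3, max_runtime=1.0):
    """Run coro_factory() with a per-attempt timeout; retry on failure.

    Mirrors the semantics described above: max_runtime bounds each
    individual attempt, and retries counts re-attempts after the first.
    """
    for attempt in range(retries + 1):
        try:
            return await asyncio.wait_for(coro_factory(), timeout=max_runtime)
        except Exception as e:  # wait_for raises TimeoutError on expiry
            if attempt == retries:
                raise  # Retry budget exhausted; propagate the failure
            print(f"Attempt {attempt + 1} failed ({e!r}); retrying...")

async def flaky(state={"calls": 0}):
    # Mutable default used as a simple call counter for the demo:
    # the first two attempts hang past the timeout, the third succeeds.
    state["calls"] += 1
    if state["calls"] < 3:
        await asyncio.sleep(10)  # Exceeds max_runtime and gets cancelled
    return "done"

print(asyncio.run(run_with_resilience(flaky, retries=3, max_runtime=0.05)))
# Prints two retry messages, then "done" on the third attempt.
```

Because `asyncio.wait_for` cancels the inner coroutine before raising `TimeoutError`, the task body still gets a chance to observe `CancelledError` and clean up, matching the graceful-shutdown practice above.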