Interacting with Tasks
Tasks are fundamental units of computation. This section describes how to retrieve, inspect, and execute tasks, and how to override their properties.
Retrieving Tasks
You can retrieve tasks either individually by their identifier or by listing multiple tasks based on various filters.
Retrieving a Specific Task
To retrieve a specific task, use the Task.get class method. This method returns a LazyEntity that defers fetching the full task details until they are needed.
from your_package import Task
# Retrieve a task by its name and a specific version
my_task_v1 = Task.get(name="my_data_processing_task", project="my_project", domain="development", version="v1.0.0")
# Retrieve the latest version of a task
latest_task = Task.get(name="my_data_processing_task", project="my_project", domain="development", auto_version="latest")
# Retrieve the current version of a task within a task context
# This is useful for deploying all environments with the same version.
# This call will only succeed when executed inside another task.
current_task = Task.get(name="my_data_processing_task", auto_version="current")
Parameters for Task.get:
- name (str): The name of the task.
- project (str, optional): The project the task belongs to. If None, the current project from the configuration is used.
- domain (str, optional): The domain the task belongs to. If None, the current domain from the configuration is used.
- version (str, optional): The specific version of the task.
- auto_version (Literal["latest", "current"], optional):
  - "latest": Retrieves the most recently created version of the task.
  - "current": Retrieves the version derived from the calling task's context. This option is only valid when Task.get is called from within another task.
Important Considerations:
- Either version or auto_version must be provided.
- Using auto_version="current" outside of a task context will raise a ValueError.
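The two rules above can be expressed as a small validation step. The following is a minimal sketch, not the library's implementation: resolve_version and its context_version parameter are hypothetical names introduced for illustration, the "latest" branch returns a placeholder instead of querying a backend, and giving an explicit version precedence over auto_version is an assumption.

```python
from typing import Literal, Optional

AutoVersion = Literal["latest", "current"]


def resolve_version(
    version: Optional[str] = None,
    auto_version: Optional[AutoVersion] = None,
    context_version: Optional[str] = None,
) -> str:
    """Hypothetical helper mirroring the rules documented for Task.get."""
    if version is None and auto_version is None:
        raise ValueError("Either version or auto_version must be provided.")
    if version is not None:
        # Assumption: an explicit version wins over auto_version.
        return version
    if auto_version == "current":
        # context_version stands in for the calling task's version;
        # None models a call made outside any task context.
        if context_version is None:
            raise ValueError("auto_version='current' requires a task context.")
        return context_version
    if auto_version == "latest":
        # Placeholder: a real client would ask the backend for the newest version.
        return "<latest>"
    raise ValueError(f"Unsupported auto_version: {auto_version!r}")
```

Calling resolve_version() with no arguments, or with auto_version="current" and no context_version, raises ValueError in this sketch, matching the two considerations listed above.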
Listing All Tasks
To retrieve a list of tasks, use the Task.listall class method. It returns an asynchronous iterator of Task objects, which allows efficient retrieval of large numbers of tasks. The method is also syncify-decorated, so it can be called directly from synchronous code; the wrapper runs the asynchronous operation internally.
from your_package import Task
# List all tasks in the current project and domain
for task in Task.listall():
    print(f"Task: {task.name} (Version: {task.version})")
# List tasks by a specific name
for task in Task.listall(by_task_name="my_data_processing_task"):
    print(f"Found task: {task.name} v{task.version}")
# List tasks by environment prefix and sort by creation date descending
for task in Task.listall(by_task_env="staging", sort_by=("created_at", "desc"), limit=5):
    print(f"Staging task: {task.name} v{task.version}")
Parameters for Task.listall:
- by_task_name (str, optional): Filters tasks by an exact name match.
- by_task_env (str, optional): Filters tasks by an environment prefix in their name (e.g., "staging." will match "staging.my_task").
- project (str, optional): The project to filter tasks by. If None, the current project is used.
- domain (str, optional): The domain to filter tasks by. If None, the current domain is used.
- sort_by (Tuple[str, Literal["asc", "desc"]], optional): A tuple specifying the field to sort by (e.g., "created_at") and the sort order ("asc" for ascending, "desc" for descending). Defaults to ("created_at", "asc").
- limit (int): The maximum number of tasks to return. Defaults to 100.
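To illustrate how an asynchronous iterator can be consumed from a plain for loop, here is a minimal sketch of one way to bridge the two worlds. It is not the syncify implementation: fake_listall, iterate_sync, and _next_or_done are hypothetical names, and the task names are made-up sample data.

```python
import asyncio
from typing import AsyncIterator, Iterator, List

_DONE = object()  # sentinel marking the end of the async iterator


async def fake_listall(limit: int = 100) -> AsyncIterator[str]:
    """Hypothetical stand-in for an async listing call; yields task names."""
    names = ["staging.load", "staging.clean", "prod.train"]
    for name in names[:limit]:
        await asyncio.sleep(0)  # simulate a network round trip
        yield name


async def _next_or_done(iterator: AsyncIterator[str]) -> object:
    """Return the next item, or the sentinel once the iterator is exhausted."""
    try:
        return await iterator.__anext__()
    except StopAsyncIteration:
        return _DONE


def iterate_sync(iterator: AsyncIterator[str]) -> Iterator[str]:
    """Drive an async iterator from synchronous code, one item at a time."""
    loop = asyncio.new_event_loop()
    try:
        while True:
            item = loop.run_until_complete(_next_or_done(iterator))
            if item is _DONE:
                break
            yield item
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()


names: List[str] = list(iterate_sync(fake_listall(limit=2)))
print(names)  # ['staging.load', 'staging.clean']
```

Items are pulled one at a time, so a caller that stops early never requests the remaining pages. This sketch creates its own event loop and therefore must not be called from code that is already running inside one.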
Accessing Task Details
When you retrieve a task using Task.get, you receive a LazyEntity object. This object defers the actual network call to fetch TaskDetails until you access one of its properties or attempt to execute it. Once fetched, the LazyEntity behaves like a TaskDetails object, providing comprehensive metadata about the task.
from your_package import Task
my_task_entity = Task.get(name="my_data_processing_task", project="my_project", domain="development", version="v1.0.0")
# Accessing a property triggers the fetch operation
print(f"Task Name: {my_task_entity.name}")
print(f"Task Version: {my_task_entity.version}")
print(f"Task Type: {my_task_entity.task_type}")
# Accessing input arguments
print(f"Required Arguments: {my_task_entity.required_args}")
print(f"Default Input Arguments: {my_task_entity.default_input_args}")
# Inspecting cache policy
cache_policy = my_task_entity.cache
print(f"Cache Behavior: {cache_policy.behavior}")
print(f"Cache Version Override: {cache_policy.version_override}")
# Viewing secrets and resources
print(f"Secrets: {my_task_entity.secrets}")
print(f"Resources: {my_task_entity.resources}")
Key Properties of TaskDetails:
- name (str): The name of the task.
- version (str): The version of the task.
- task_type (str): The underlying type of the task (e.g., "python-task", "container").
- default_input_args (Tuple[str, ...]): A tuple of input argument names that have default values.
- required_args (Tuple[str, ...]): A tuple of input argument names that are required and do not have default values.
- interface: Provides detailed information about the task's inputs and outputs, including their types.
- cache: A Cache object detailing the task's caching policy, including discoverability, version override, and ignored inputs.
- secrets: A list of secret keys configured for the task.
- resources: A tuple containing resource requests and limits (e.g., CPU, memory) for the task's container.
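The deferred-fetch behavior described in this section can be pictured as a proxy that loads its target on first attribute access. The sketch below is an illustration under stated assumptions, not the actual LazyEntity: LazyProxy, FakeTaskDetails, fetch_details, and the fetch_count counter are all hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional, Tuple


@dataclass
class FakeTaskDetails:
    """Hypothetical stand-in for TaskDetails with a few documented fields."""
    name: str
    version: str
    required_args: Tuple[str, ...] = ()


class LazyProxy:
    """Fetches the wrapped object on first attribute access, then caches it."""

    def __init__(self, fetch: Callable[[], Any]) -> None:
        self._fetch = fetch
        self._value: Optional[Any] = None
        self.fetch_count = 0  # counts how often the fetch callable has run

    def __getattr__(self, item: str) -> Any:
        # Python calls __getattr__ only for attributes missing on the proxy itself.
        if self._value is None:
            self._value = self._fetch()
            self.fetch_count += 1
        return getattr(self._value, item)


def fetch_details() -> FakeTaskDetails:
    # A real client would perform a network call here.
    return FakeTaskDetails("my_data_processing_task", "v1.0.0", ("input_data",))


entity = LazyProxy(fetch_details)
print(entity.fetch_count)  # 0: nothing has been fetched yet
print(entity.name)         # first access triggers the fetch
print(entity.version)      # served from the cached object
print(entity.fetch_count)  # 1: the fetch ran exactly once
```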
Executing Tasks
You can execute a task by calling the LazyEntity or TaskDetails object directly, passing the required input arguments as keyword arguments. When executed within a task context, this action submits the task to the controller for execution.
from your_package import Task
my_task_entity = Task.get(name="my_data_processing_task", project="my_project", domain="development", version="v1.0.0")
# Execute the task with keyword arguments.
# The await must run inside an async function; in a task context, this call submits the task for execution.
result = await my_task_entity(input_data="path/to/data.csv", output_prefix="s3://my-bucket/results")
print(f"Task execution initiated, result: {result}")
Important Considerations:
- Task execution currently only supports keyword arguments. Positional arguments will raise an error.
- The behavior of __call__ depends on the execution context. If called within another task, it submits a sub-task. If called outside a task context, it raises an error.
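The keyword-only calling convention can be sketched with a small awaitable callable. This is a hedged illustration rather than the real __call__: KeywordOnlyTask is a hypothetical class, the choice of TypeError is an assumption (the text above only says that an error is raised), and the check for missing required arguments is added for illustration. The context check and the submission to the controller are omitted.

```python
import asyncio
from typing import Any, Dict, Tuple


class KeywordOnlyTask:
    """Hypothetical callable that accepts keyword arguments only."""

    def __init__(self, required_args: Tuple[str, ...]) -> None:
        self.required_args = required_args

    async def __call__(self, *args: Any, **kwargs: Any) -> Dict[str, Any]:
        if args:
            # Assumption: TypeError; the real error type is not documented here.
            raise TypeError("Positional arguments are not supported; use keywords.")
        missing = [name for name in self.required_args if name not in kwargs]
        if missing:
            raise TypeError(f"Missing required arguments: {missing}")
        # A real implementation would submit the task to the controller here.
        return dict(kwargs)


async def main() -> Dict[str, Any]:
    task = KeywordOnlyTask(required_args=("input_data",))
    return await task(input_data="path/to/data.csv")


result = asyncio.run(main())
print(result)  # {'input_data': 'path/to/data.csv'}
```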
Overriding Task Properties
You can override certain properties of a task, such as its short name, resource requirements, retry strategy, timeout, environment variables, and secrets. This allows for dynamic adjustments to task behavior without modifying the original task definition. The override method modifies the local TaskDetails object and returns self, allowing for chaining.
from your_package import Task
from your_package import Resources, RetryStrategy, SecretRequest
my_task_entity = Task.get(name="my_data_processing_task", project="my_project", domain="development", version="v1.0.0")
# Override resources and add environment variables
overridden_task = my_task_entity.override(
resources=Resources(cpu="2", mem="4Gi"),
env_vars={"DEBUG_MODE": "true", "LOG_LEVEL": "INFO"},
retries=RetryStrategy(retries=3),
timeout="5m", # 5 minutes
secrets=SecretRequest(group="my_secret_group", key="api_key")
)
# The overridden_task is the same LazyEntity instance, but its internal TaskDetails
# has been modified.
print(f"Overridden Task Resources: {overridden_task.resources}")
print(f"Overridden Task Retries: {overridden_task.pb2.spec.task_template.metadata.retries.retries}")
# Now execute the task with the overridden properties
# result = await overridden_task(input_data="...", output_prefix="...")
Parameters for override:
- short_name (str, optional): A shorter, more human-readable name for the task.
- resources (Resources, optional): Specifies CPU, memory, and GPU requirements.
- retries (Union[int, RetryStrategy], optional): The number of retries or a detailed retry strategy.
- timeout (TimeoutType, optional): The maximum duration for the task to run. Can be an integer (seconds) or a string (e.g., "5m", "1h").
- env_vars (Dict[str, str], optional): A dictionary of environment variables to set for the task's container.
- secrets (SecretRequest, optional): Specifies secrets to be mounted into the task's container.
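Because timeout accepts both integers and strings, it can help to see how the two forms might map to the same duration. The helper below is a sketch under assumptions: normalize_timeout is a hypothetical name, and the accepted string grammar (a whole number followed by s, m, or h) is inferred from the "5m" and "1h" examples rather than taken from the library.

```python
import re
from datetime import timedelta
from typing import Union

# Assumed unit suffixes, inferred from the "5m" and "1h" examples.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours"}


def normalize_timeout(value: Union[int, str]) -> timedelta:
    """Hypothetical helper converting a timeout value to a timedelta."""
    if isinstance(value, int):
        return timedelta(seconds=value)  # integers are interpreted as seconds
    match = re.fullmatch(r"(\d+)([smh])", value.strip())
    if match is None:
        raise ValueError(f"Unrecognized timeout: {value!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})


print(normalize_timeout("5m") == normalize_timeout(300))  # True
```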
Important Considerations:
- The override method modifies the properties of the TaskDetails object in memory. It does not alter the remote task definition.
- The LazyEntity.override method first fetches the TaskDetails if it hasn't already, then applies the overrides.
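The in-place, chainable behavior of override can be sketched with a small local class. This is an illustration only: LocalTask is hypothetical, it covers just two of the documented parameters, and replacing env_vars rather than merging them is an assumption.

```python
from typing import Dict, Optional


class LocalTask:
    """Hypothetical in-memory task object; no remote definition is touched."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.short_name = name
        self.env_vars: Dict[str, str] = {}

    def override(
        self,
        short_name: Optional[str] = None,
        env_vars: Optional[Dict[str, str]] = None,
    ) -> "LocalTask":
        if short_name is not None:
            self.short_name = short_name
        if env_vars is not None:
            self.env_vars = dict(env_vars)  # assumption: replace, do not merge
        return self  # returning self is what enables chaining


task = LocalTask("my_data_processing_task")
same = task.override(short_name="process").override(env_vars={"LOG_LEVEL": "INFO"})
print(same is task)     # True: override returns the same object
print(task.short_name)  # process
```

Since the object is mutated in place, overrides applied once remain in effect for every later use of the same object.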