Execution Environments
Execution Environments provide a robust mechanism for defining and managing the runtime context of your code, particularly for tasks. They encapsulate configurations such as compute resources, container images, environment variables, and dependencies, ensuring consistent and reproducible execution across different stages of development and deployment.
Defining a Base Environment
The Environment class serves as the foundational blueprint for an execution context. It allows you to specify core properties that govern where and how your code runs.
When defining an environment, consider these key parameters:
- name: A unique identifier for the environment. It must be in snake_case or kebab-case format.
- image: The Docker image to use for the environment. Specify a full image name (e.g., "my-repo/my-image:latest") or use "auto" to leverage a default image.
- resources: Defines the compute resources (CPU, memory, GPU) allocated to the environment. Use the Resources object to specify these limits and requests.
- env_vars: A dictionary of environment variables to set within the execution context.
- secrets: Specifies secrets to inject into the environment, typically for sensitive information like API keys. Use a SecretRequest object.
- depends_on: A list of other Environment instances that this environment depends on. This hints at deployment order, ensuring dependencies are available when the environment is deployed.
- pod_template: An optional Kubernetes V1PodTemplate, or a string reference to one, allowing fine-grained control over the underlying Kubernetes pod configuration.
- description: An optional human-readable description of the environment.
Example: Defining a basic environment
import inspect
import weakref
from dataclasses import field, replace
from datetime import timedelta
from typing import Any, Callable, Dict, List, Literal, Optional, ParamSpec, TypeVar, Union, cast
# Assume these types are defined elsewhere in the package
class Resources:
    """Compute resources allocated to an environment.

    Args:
        cpu: CPU request/limit (e.g. "500m", "4"). Defaults to "1".
        memory: Memory request/limit (e.g. "512Mi", "16Gi"). Defaults to "1Gi".
        gpu: Optional GPU count/spec (e.g. "1"); None means no GPU. Added so
            examples such as Resources(cpu="4", memory="16Gi", gpu="1") work,
            matching the documentation's claim that Resources covers GPU.
    """
    def __init__(self, cpu: str = "1", memory: str = "1Gi", gpu: Optional[str] = None):
        self.cpu = cpu
        self.memory = memory
        self.gpu = gpu
class Image:
    """Lightweight reference to a container image by full name."""
    def __init__(self, name: str):
        # e.g. "my-repo/my-image:latest"
        self.name = name
class SecretRequest:
    """Request for a named secret to be injected at runtime."""
    def __init__(self, key: str):
        # Lookup key of the secret (e.g. "my-api-key").
        self.key = key
class Cache:
    """Cache policy object for task results."""
    def __init__(self, enable: bool = True):
        # True -> results may be cached and reused; False -> always recompute.
        self.enable = enable
class ReusePolicy:
    """Policy controlling Python-process reuse across task executions."""
    def __init__(self, concurrency: int = 1):
        # Maximum number of tasks sharing one reused process.
        self.concurrency = concurrency
class RetryStrategy:
    """Retry configuration for a task."""
    def __init__(self, retries: int = 0):
        # Number of retry attempts after the first failure.
        self.retries = retries
class Documentation:
    """Free-form documentation attached to a task."""
    def __init__(self, text: str):
        self.text = text
class TaskTemplate:
    """Container bundling every attribute that defines a runnable task:
    the wrapped callable, its identity, the execution context inherited
    from the parent environment, and per-task behavior overrides."""
    def __init__(self, func, name, image, resources, cache, retries, timeout, reusable, docs, env_vars, secrets, pod_template, parent_env, interface, report, short_name, plugin_config, max_inline_io_bytes):
        # Identity and callable.
        self.func = func
        self.name = name
        self.short_name = short_name
        self.interface = interface
        # Execution context (typically inherited from the parent environment).
        self.image = image
        self.resources = resources
        self.env_vars = env_vars
        self.secrets = secrets
        self.pod_template = pod_template
        self.parent_env = parent_env
        self.plugin_config = plugin_config
        self.reusable = reusable
        # Per-task behavior.
        self.cache = cache
        self.retries = retries
        self.timeout = timeout
        self.docs = docs
        self.report = report
        self.max_inline_io_bytes = max_inline_io_bytes
class AsyncFunctionTaskTemplate(TaskTemplate):
    """Task template for function-backed (sync or async) tasks.

    Supports runtime subscription (``AsyncFunctionTaskTemplate[P, R]``), as
    performed inside ``TaskEnvironment.task``. The class is not a real
    ``typing.Generic``, so without this hook the subscript raised TypeError
    at runtime; the parameters are type-level only, so we just return the
    class unchanged.
    """
    def __class_getitem__(cls, _params):
        # Accept and discard the type parameters.
        return cls
class NativeInterface:
    """Stub interface factory used by the task decorator."""
    @staticmethod
    def from_callable(func):
        # A real implementation would introspect the callable's signature;
        # this stub returns a fixed marker value.
        return "interface_from_callable"
# Mock external types and functions for demonstration
V1PodTemplate = object
Secret = object
CacheRequest = Union[str, Cache]
FunctionTypes = object
# P must be a ParamSpec and R a TypeVar: the original `P = object` made
# annotations like Callable[P, R] raise TypeError when evaluated at
# function-definition time.
P = ParamSpec("P")
R = TypeVar("R")
MAX_INLINE_IO_BYTES = 1024 * 1024  # 1MB cap for inline task IO payloads
def is_snake_or_kebab_with_numbers(name: str) -> bool:
    """Validate environment name format (simplified stub: accepts anything)."""
    return True  # Simplified for example
_ENVIRONMENT_REGISTRY = []  # Mock registry of every constructed Environment
class Environment:
    """Blueprint for an execution context: container image, compute
    resources, environment variables, secrets, dependencies, and optional
    pod-level overrides.

    Instances validate themselves and self-register in the module-level
    ``_ENVIRONMENT_REGISTRY`` on construction (see ``__post_init__``).
    """
    # Class-level annotations document the public fields; every instance
    # assigns its own values in __init__.
    name: str
    depends_on: List["Environment"] = field(default_factory=list)
    pod_template: Optional[Union[str, "V1PodTemplate"]] = None
    description: Optional[str] = None
    secrets: Optional[SecretRequest] = None
    env_vars: Optional[Dict[str, str]] = None
    resources: Optional[Resources] = None
    image: Union[str, Image, Literal["auto"]] = "auto"
    def __init__(self, name: str, image: Union[str, Image, Literal["auto"]] = "auto", resources: Optional[Resources] = None, env_vars: Optional[Dict[str, str]] = None, secrets: Optional[SecretRequest] = None, depends_on: Optional[List["Environment"]] = None, pod_template: Optional[Union[str, "V1PodTemplate"]] = None, description: Optional[str] = None):
        self.name = name
        self.image = image
        self.resources = resources
        self.env_vars = env_vars
        self.secrets = secrets
        # Never share a mutable default between instances.
        self.depends_on = depends_on if depends_on is not None else []
        self.pod_template = pod_template
        self.description = description
        # Polymorphic: subclasses extend validation by overriding __post_init__.
        self.__post_init__()
    def __post_init__(self):
        """Validate field types/format and register this environment.

        Raises:
            ValueError: if the name is not snake_case/kebab-case.
            TypeError: if any field has an unexpected type.
        """
        if not is_snake_or_kebab_with_numbers(self.name):
            raise ValueError(f"Environment name '{self.name}' must be in snake_case or kebab-case format.")
        if not isinstance(self.image, (Image, str)):
            raise TypeError(f"Expected image to be of type str or Image, got {type(self.image)}")
        if self.secrets and not isinstance(self.secrets, (str, Secret, List)):
            raise TypeError(f"Expected secrets to be of type SecretRequest, got {type(self.secrets)}")
        for dep in self.depends_on:
            if not isinstance(dep, Environment):
                raise TypeError(f"Expected depends_on to be of type List[Environment], got {type(dep)}")
        if self.resources is not None and not isinstance(self.resources, Resources):
            raise TypeError(f"Expected resources to be of type Resources, got {type(self.resources)}")
        if self.env_vars is not None and not isinstance(self.env_vars, dict):
            raise TypeError(f"Expected env_vars to be of type Dict[str, str], got {type(self.env_vars)}")
        _ENVIRONMENT_REGISTRY.append(self)
    def add_dependency(self, *env: "Environment") -> None:
        """Add each given environment as a dependency, skipping duplicates.

        Raises:
            TypeError: if any argument is not an Environment.
            ValueError: if an argument is this environment itself.
        """
        for e in env:
            if not isinstance(e, Environment):
                raise TypeError(f"Expected Environment, got {type(e)}")
            if e.name == self.name:
                raise ValueError("Cannot add self as a dependency")
            # Bug fix: append only the current non-duplicate element. The
            # original extended depends_on with the *entire* argument tuple on
            # every loop iteration, inserting duplicate entries.
            if e not in self.depends_on:
                self.depends_on.append(e)
    def clone_with(
        self,
        name: str,
        image: Optional[Union[str, Image, Literal["auto"]]] = None,
        resources: Optional[Resources] = None,
        env_vars: Optional[Dict[str, str]] = None,
        secrets: Optional[SecretRequest] = None,
        depends_on: Optional[List["Environment"]] = None,
        **kwargs: Any,
    ) -> "Environment":
        """Not implemented on the base class; see TaskEnvironment.clone_with."""
        raise NotImplementedError
    def _get_kwargs(self) -> Dict[str, Any]:
        """Collect this environment's constructor kwargs, omitting unset
        optional fields so subclass defaults still apply on clone."""
        kwargs: Dict[str, Any] = {
            "depends_on": self.depends_on,
            "image": self.image,
        }
        if self.resources is not None:
            kwargs["resources"] = self.resources
        if self.secrets is not None:
            kwargs["secrets"] = self.secrets
        if self.env_vars is not None:
            kwargs["env_vars"] = self.env_vars
        if self.pod_template is not None:
            kwargs["pod_template"] = self.pod_template
        if self.description is not None:
            kwargs["description"] = self.description
        return kwargs
from typing import cast, Literal, Any, Callable
class TaskEnvironment(Environment):
    """Environment specialized for defining tasks. Adds a default cache
    policy, an optional process-reuse policy, and an optional plugin
    configuration (mutually exclusive with reuse)."""
    cache: CacheRequest = "disable"
    reusable: ReusePolicy | None = None
    plugin_config: Optional[Any] = None
    _tasks: Dict[str, TaskTemplate]
    def __init__(self, name: str, image: Union[str, Image, Literal["auto"]] = "auto", resources: Optional[Resources] = None, env_vars: Optional[Dict[str, str]] = None, secrets: Optional[SecretRequest] = None, depends_on: Optional[List["Environment"]] = None, pod_template: Optional[Union[str, "V1PodTemplate"]] = None, description: Optional[str] = None, cache: CacheRequest = "disable", reusable: ReusePolicy | None = None, plugin_config: Optional[Any] = None):
        # Set subclass state *before* super().__init__: the base initializer
        # invokes self.__post_init__() polymorphically, so this class's
        # validation runs during it and needs these attributes to exist.
        # (The original set them afterwards and then called __post_init__ a
        # second time, so construction raised AttributeError.)
        self.cache = cache
        self.reusable = reusable
        self.plugin_config = plugin_config
        # Per-instance registry; the original left a dataclasses.field()
        # object at class level, which is not a dict and broke item assignment.
        self._tasks = {}
        super().__init__(name, image, resources, env_vars, secrets, depends_on, pod_template, description)
    def __post_init__(self) -> None:
        # Delegate to the base validation (name format, field types, registry
        # registration) — the original override shadowed it without calling it.
        super().__post_init__()
        if self.reusable is not None and self.plugin_config is not None:
            raise ValueError("Cannot set plugin_config when environment is reusable.")
        if self.reusable and not isinstance(self.reusable, ReusePolicy):
            raise TypeError(f"Expected reusable to be of type ReusePolicy, got {type(self.reusable)}")
        if self.cache and not isinstance(self.cache, (str, Cache)):
            raise TypeError(f"Expected cache to be of type str or Cache, got {type(self.cache)}")
    def clone_with(
        self,
        name: str,
        image: Optional[Union[str, Image, Literal["auto"]]] = None,
        resources: Optional[Resources] = None,
        env_vars: Optional[Dict[str, str]] = None,
        secrets: Optional[SecretRequest] = None,
        depends_on: Optional[List["Environment"]] = None,
        **kwargs: Any,
    ) -> "TaskEnvironment":
        """Create a new TaskEnvironment based on this one, overriding only the
        given parameters. `cache` and `reusable` may be passed via **kwargs;
        unspecified fields (including cache/reusable/plugin_config) are
        inherited from this instance.

        Raises:
            TypeError: on unrecognized keyword arguments.
        """
        cache = kwargs.pop("cache", None)
        # Track presence explicitly so an explicit reusable=None override
        # is distinguishable from "not provided".
        reusable_set = "reusable" in kwargs
        reusable = kwargs.pop("reusable", None)
        if kwargs:
            raise TypeError(f"Unexpected keyword arguments: {list(kwargs.keys())}")
        base_kwargs = self._get_kwargs()
        base_kwargs["name"] = name
        if image is not None:
            base_kwargs["image"] = image
        if resources is not None:
            base_kwargs["resources"] = resources
        if env_vars is not None:
            base_kwargs["env_vars"] = env_vars
        if secrets is not None:
            base_kwargs["secrets"] = secrets
        if depends_on is not None:
            base_kwargs["depends_on"] = depends_on
        # Preserve this instance's task-specific settings when not overridden
        # (the original silently reset them to class defaults).
        base_kwargs["cache"] = cache if cache is not None else self.cache
        base_kwargs["reusable"] = reusable if reusable_set else self.reusable
        if self.plugin_config is not None:
            base_kwargs["plugin_config"] = self.plugin_config
        # Construct a fresh instance: dataclasses.replace() (used before)
        # raises TypeError because these classes are not dataclasses.
        return type(self)(**base_kwargs)
    def task(
        self,
        _func=None,
        *,
        short_name: Optional[str] = None,
        cache: CacheRequest | None = None,
        retries: Union[int, RetryStrategy] = 0,
        timeout: Union[timedelta, int] = 0,
        docs: Optional[Documentation] = None,
        pod_template: Optional[Union[str, "V1PodTemplate"]] = None,
        report: bool = False,
        max_inline_io_bytes: int = MAX_INLINE_IO_BYTES,
    ) -> "Union[AsyncFunctionTaskTemplate, Callable[P, R]]":
        """Decorator that registers a function as a task of this environment.

        Usable bare (@env.task) or with overrides (@env.task(cache=...)).
        Task-level parameters take precedence over the environment defaults.

        Raises:
            ValueError: for a sync function in a reusable environment with
                concurrency > 1, or a task-level pod_template on a reusable
                environment.
        """
        def decorator(func: FunctionTypes) -> "AsyncFunctionTaskTemplate":
            short = short_name or func.__name__
            # Task names are namespaced by the owning environment.
            task_name = self.name + "." + func.__name__
            if self.reusable is not None:
                # Documented restriction: per-task pod templates are not
                # allowed when the environment reuses processes.
                if pod_template is not None:
                    raise ValueError("Cannot set pod_template on a task when its environment is reusable.")
                if not inspect.iscoroutinefunction(func) and self.reusable.concurrency > 1:
                    raise ValueError(
                        "Reusable environments with concurrency greater than 1 are only supported for async tasks. "
                        "Please use an async function or set concurrency to 1."
                    )
            # Simplified plugin registry lookup. Plain class reference: the
            # original subscripted the class at runtime, which raises
            # TypeError on a non-generic class.
            task_template_class: type = AsyncFunctionTaskTemplate
            tmpl = task_template_class(
                func=func,
                name=task_name,
                image=self.image,
                resources=self.resources,
                cache=cache or self.cache,
                retries=retries,
                timeout=timeout,
                reusable=self.reusable,
                docs=docs,
                env_vars=self.env_vars,
                secrets=self.secrets,
                pod_template=pod_template or self.pod_template,
                # weakref avoids a reference cycle keeping the env alive.
                parent_env=weakref.ref(self),
                interface=NativeInterface.from_callable(func),
                report=report,
                short_name=short,
                plugin_config=self.plugin_config,
                max_inline_io_bytes=max_inline_io_bytes,
            )
            self._tasks[task_name] = tmpl
            return tmpl
        if _func is None:
            # Called with arguments: @env.task(...) — return the decorator.
            return cast(AsyncFunctionTaskTemplate, decorator)
        # Called bare: @env.task — decorate immediately.
        return cast(AsyncFunctionTaskTemplate, decorator(_func))
    @property
    def tasks(self) -> Dict[str, TaskTemplate]:
        """Mapping of fully-qualified task name -> TaskTemplate."""
        return self._tasks
    def add_task(self, task: TaskTemplate) -> TaskTemplate:
        """Programmatically register an existing TaskTemplate.

        Raises:
            ValueError: if a task with the same name is already registered.
        """
        if task.name in self._tasks:
            raise ValueError(f"Task {task.name} already exists in the environment. Task names should be unique.")
        self._tasks[task.name] = task
        # Rebind ownership to this environment (weakref avoids a cycle).
        task.parent_env = weakref.ref(self)
        return task
# Example: a minimal base environment with explicit resources and env vars.
my_base_env = Environment(
    name="my-base-env",  # must be snake_case or kebab-case
    image="ubuntu:22.04",
    resources=Resources(cpu="500m", memory="512Mi"),
    env_vars={"DEBUG": "true"},
    description="A base environment for general utilities."
)
You can manage dependencies between environments using add_dependency:
# Declare that the application environment depends on the database
# environment; add_dependency validates its arguments and skips duplicates.
db_env = Environment(name="database-env", image="postgres:14")
app_env = Environment(name="application-env", image="my-app:latest")
app_env.add_dependency(db_env)
The clone_with method is not implemented for the base Environment class.
Task-Specific Environments
The TaskEnvironment class extends Environment by adding capabilities specifically tailored for defining and executing tasks. It inherits all parameters from Environment and introduces:
- cache: The default cache policy for tasks within this environment. Can be "disable" or a Cache object.
- reusable: A ReusePolicy object that dictates if and how Python processes can be reused across multiple task executions. This can significantly optimize performance for certain workloads.
- plugin_config: An optional configuration object for custom task plugins, allowing integration with specialized execution backends. Note that plugin_config cannot be combined with reusable.
Example: Defining a task environment
from dataclasses import dataclass
@dataclass
class MyPluginConfig:
    """Example configuration object consumed by a custom task plugin."""
    param: str = "default"
# NOTE: plugin_config is intentionally omitted here — TaskEnvironment raises
# ValueError when both `reusable` and `plugin_config` are set (the original
# example combined them and could not be constructed).
my_task_env = TaskEnvironment(
    name="data-processing-env",
    image="my-data-image:1.0",
    resources=Resources(cpu="1", memory="2Gi"),
    cache="enable",  # Enable caching by default for tasks in this environment
    reusable=ReusePolicy(concurrency=4),  # Allow reusing processes with 4 concurrent tasks
    description="Environment for data processing tasks with caching and reuse."
)
Associating Tasks with an Environment
The primary way to use TaskEnvironment is through its task decorator, which transforms a Python function into an executable task within that environment.
import asyncio
# Bare decorator form: the task inherits all settings from my_task_env.
@my_task_env.task
async def preprocess_data(input_path: str) -> str:
    """
    Preprocesses raw data from the given input path.
    """
    print(f"Preprocessing data from {input_path} in {my_task_env.name}")
    await asyncio.sleep(1) # Simulate async work
    output_path = f"{input_path}_processed"
    return output_path
# Decorator-with-arguments form: task-level cache/retries/timeout override
# the environment defaults.
@my_task_env.task(cache="disable", retries=3, timeout=timedelta(minutes=5))
async def analyze_results(processed_data_path: str):
    """
    Analyzes the processed data. This task overrides the environment's cache policy.

    Declared async because my_task_env is reusable with concurrency > 1, and
    the task decorator rejects synchronous functions in that case (the
    original synchronous version raised ValueError at definition time).
    """
    print(f"Analyzing results from {processed_data_path} in {my_task_env.name}")
    return "analysis_report.txt"
# Accessing tasks defined in the environment; keys are "<env-name>.<func-name>".
print(f"Tasks in {my_task_env.name}: {list(my_task_env.tasks.keys())}")
# Output: Tasks in data-processing-env: ['data-processing-env.preprocess_data', 'data-processing-env.analyze_results']
The task decorator allows for task-specific overrides of environment parameters, such as cache, retries, timeout, docs, pod_template, report, and max_inline_io_bytes. These task-level parameters take precedence over the environment's default settings.
Advanced Configuration
Resource Allocation
Define Resources to specify CPU and memory requests and limits for your environment. This ensures your tasks receive adequate compute power and prevents resource exhaustion.
# Request GPU capacity alongside CPU/memory for accelerator workloads.
high_resource_env = TaskEnvironment(
    name="heavy-compute-env",
    image="my-ml-image:gpu",
    resources=Resources(cpu="4", memory="16Gi", gpu="1"), # Assuming GPU is supported by Resources
    description="Environment for GPU-accelerated machine learning tasks."
)
Environment Variables and Secrets
Use env_vars for general configuration and secrets for sensitive data. Secrets are injected securely at runtime.
# Plain configuration goes in env_vars; sensitive values go in secrets so
# they are injected at runtime rather than baked into the definition.
secure_env = TaskEnvironment(
    name="secure-api-env",
    image="my-api-image:latest",
    env_vars={"API_VERSION": "v2"},
    secrets=SecretRequest(key="my-api-key"), # Assumes SecretRequest handles fetching the secret
    description="Environment for API services with environment variables and secrets."
)
Environment Dependencies
The depends_on parameter allows you to declare a logical dependency between environments. When an environment with dependencies is deployed, the system ensures its dependent environments are also deployed. This is crucial for orchestrating complex applications.
# depends_on hints at deployment ordering: data_source_env is deployed
# before (or alongside) the ETL environment that needs it.
data_source_env = Environment(name="data-source", image="my-data-source:latest")
etl_env = TaskEnvironment(
    name="etl-pipeline",
    image="my-etl-image:latest",
    depends_on=[data_source_env],
    description="ETL environment that depends on a data source."
)
Caching
TaskEnvironment and individual tasks support caching. When cache is enabled, the results of a task are stored and reused if the inputs and task definition remain the same, significantly speeding up repeated executions.
# Environment with caching enabled by default
cached_env = TaskEnvironment(name="cached-tasks", image="python:3.9", cache="enable")
@cached_env.task
def expensive_calculation(x: int, y: int) -> int:
    # This task's results will be cached
    return x + y
# Task that explicitly disables caching, overriding the environment's default
@cached_env.task(cache="disable")
def non_cached_operation(data: str) -> str:
    # Always recomputed; never served from cache.
    return data.upper()
Reusable Environments
The reusable policy in TaskEnvironment allows the underlying Python process to be kept alive and reused for multiple task executions. This reduces startup overhead and is particularly beneficial for tasks with high execution frequency or long initialization times.
Important Considerations for Reusable Environments:
- Asynchronous Tasks: Reusable environments with a concurrency greater than 1 are only supported for async functions. For synchronous tasks, concurrency must be 1.
- pod_template Restriction: You cannot set a pod_template on a task if its environment is marked as reusable.
- plugin_config Restriction: You cannot set plugin_config when an environment is reusable.
async_reusable_env = TaskEnvironment(
    name="async-reusable-env",
    image="python:3.10-slim",
    reusable=ReusePolicy(concurrency=8), # Allows up to 8 concurrent async tasks in the same process
    description="Environment for highly concurrent async tasks."
)
# Must be async: reusable environments with concurrency > 1 reject sync tasks.
@async_reusable_env.task
async def process_item(item_id: str) -> str:
    await asyncio.sleep(0.1) # Simulate quick async processing
    return f"Processed {item_id}"
Custom Task Plugins
The plugin_config parameter allows you to integrate custom task types by providing a configuration object. The system uses this configuration to find and instantiate the appropriate task plugin.
# Assuming a custom plugin is registered for MyPluginConfig
custom_plugin_env = TaskEnvironment(
    name="custom-task-env",
    image="my-custom-plugin-image:latest",
    plugin_config=MyPluginConfig(param="special_setting"),  # forwarded to every task in this env
    description="Environment for tasks using a custom plugin."
)
@custom_plugin_env.task
def run_custom_job():
    """Demonstration task executed with the custom plugin configuration."""
    print("Running a job with custom plugin configuration.")
Managing Environments
Cloning Environments
The TaskEnvironment class provides a clone_with method to create a new environment instance based on an existing one, allowing you to override specific parameters. This is useful for creating variations of an environment without redefining everything.
dev_env = TaskEnvironment(name="dev-env", image="my-app:dev", resources=Resources(cpu="500m"))
# Clone with overrides; fields not passed are inherited from dev_env.
prod_env = dev_env.clone_with(
    name="prod-env",
    image="my-app:prod",
    resources=Resources(cpu="2", memory="4Gi"),
    cache="enable"
)
print(f"Dev Env Image: {dev_env.image}, Prod Env Image: {prod_env.image}")
print(f"Dev Env CPU: {dev_env.resources.cpu}, Prod Env CPU: {prod_env.resources.cpu}")
Adding Tasks Dynamically
While the task decorator is the primary way to associate functions, you can also programmatically add TaskTemplate instances to an environment using add_task. This is useful for scenarios where tasks are generated or loaded dynamically.
dynamic_env = TaskEnvironment(name="dynamic-env", image="python:3.9")
def my_dynamic_func():
    """Body of a task registered programmatically rather than via decorator."""
    print("This is a dynamically added task.")
# Create a TaskTemplate instance (simplified for example)
dynamic_task_template = AsyncFunctionTaskTemplate(
    func=my_dynamic_func,
    name="dynamic-env.my_dynamic_task",  # convention: "<env name>.<task name>"
    image=dynamic_env.image,
    resources=dynamic_env.resources,
    cache="disable", retries=0, timeout=0, reusable=None, docs=None, env_vars=None, secrets=None, pod_template=None, parent_env=None, interface=None, report=False, short_name="my_dynamic_task", plugin_config=None, max_inline_io_bytes=MAX_INLINE_IO_BYTES
)
# add_task registers the template and rebinds its parent_env to dynamic_env.
dynamic_env.add_task(dynamic_task_template)
print(f"Tasks in dynamic_env: {list(dynamic_env.tasks.keys())}")
Best Practices and Considerations
- Granularity: Define environments at a logical level. A single environment might encompass all tasks for a specific service or a set of related data processing steps. Avoid creating an environment for every single task unless its requirements are truly unique.
- Naming Conventions: Adhere to snake_case or kebab-case for environment names for consistency and compatibility.
- Image Management: Use versioned Docker images (e.g., my-image:1.0.0) to ensure reproducibility. Avoid latest in production environments.
- Resource Allocation: Start with reasonable resource requests and monitor usage. Over-provisioning wastes resources, while under-provisioning can lead to performance issues or task failures.
- Security: Always use secrets for sensitive information. Avoid hardcoding credentials or passing them via plain env_vars.
- Reusable Environments: Leverage reusable environments for performance-critical async tasks, but be mindful of the restrictions on pod_template and plugin_config. Ensure your async tasks are designed to be stateless or manage state carefully across invocations within a reused process.
- Dependencies: Clearly define depends_on relationships to ensure correct deployment order and operational stability.