Skip to main content

Execution Environments

Execution Environments provide a robust mechanism for defining and managing the runtime context of your code, particularly for tasks. They encapsulate configurations such as compute resources, container images, environment variables, and dependencies, ensuring consistent and reproducible execution across different stages of development and deployment.

Defining a Base Environment

The Environment class serves as the foundational blueprint for an execution context. It allows you to specify core properties that govern where and how your code runs.

When defining an environment, consider these key parameters:

  • name: A unique identifier for the environment. It must be in snake_case or kebab-case format.
  • image: The Docker image to use for the environment. Specify a full image name (e.g., "my-repo/my-image:latest") or use "auto" to leverage a default image.
  • resources: Defines the compute resources (CPU, memory, GPU) allocated to the environment. Use the Resources object to specify these limits and requests.
  • env_vars: A dictionary of environment variables to set within the execution context.
  • secrets: Specifies secrets to inject into the environment, typically for sensitive information like API keys. Use a SecretRequest object.
  • depends_on: A list of other Environment instances that this environment depends on. This hints at deployment order, ensuring dependencies are available when the environment is deployed.
  • pod_template: An optional Kubernetes V1PodTemplate or a string reference to one, allowing fine-grained control over the underlying Kubernetes pod configuration.
  • description: An optional human-readable description of the environment.

Example: Defining a basic environment

import inspect
import re
import weakref
from dataclasses import field, replace
from datetime import timedelta
from typing import Any, Callable, Dict, List, Literal, Optional, Union, cast

# Assume these types are defined elsewhere in the package
class Resources:
    """Compute resources (CPU, memory, optional GPU) allocated to an environment.

    Args:
        cpu: CPU request, Kubernetes-style quantity string (e.g. "500m", "2").
        memory: Memory request, Kubernetes-style quantity string (e.g. "1Gi").
        gpu: Optional GPU count as a string; ``None`` means no GPU. Added so
            examples like ``Resources(cpu="4", memory="16Gi", gpu="1")`` work;
            existing two-argument callers are unaffected.
    """

    def __init__(self, cpu: str = "1", memory: str = "1Gi", gpu: Optional[str] = None):
        self.cpu = cpu
        self.memory = memory
        self.gpu = gpu

class Image:
    """A container image reference, identified by its full name string."""

    def __init__(self, name: str):
        # Store the image reference verbatim (e.g. "my-repo/my-image:latest").
        self.name = name

class SecretRequest:
    """Request for a named secret to be injected into the environment."""

    def __init__(self, key: str):
        # Identifier of the secret to fetch at runtime.
        self.key = key

class Cache:
    """Cache policy object; ``enable`` toggles result caching."""

    def __init__(self, enable: bool = True):
        self.enable = enable

class ReusePolicy:
    """Policy for reusing a Python process across task executions.

    ``concurrency`` is the number of tasks that may share one process.
    """

    def __init__(self, concurrency: int = 1):
        self.concurrency = concurrency

class RetryStrategy:
    """Retry configuration; ``retries`` is the maximum number of attempts after the first."""

    def __init__(self, retries: int = 0):
        self.retries = retries

class Documentation:
    """Free-form documentation text attached to a task."""

    def __init__(self, text: str):
        self.text = text

class TaskTemplate:
    """Container for the full configuration of a single task.

    Every constructor argument is stored verbatim on the instance under the
    same name (func, name, image, resources, cache, retries, timeout,
    reusable, docs, env_vars, secrets, pod_template, parent_env, interface,
    report, short_name, plugin_config, max_inline_io_bytes).
    """

    def __init__(self, func, name, image, resources, cache, retries, timeout,
                 reusable, docs, env_vars, secrets, pod_template, parent_env,
                 interface, report, short_name, plugin_config,
                 max_inline_io_bytes):
        # Capture the arguments at function entry and mirror each one onto
        # the instance; equivalent to eighteen explicit assignments.
        captured = {k: v for k, v in locals().items() if k != "self"}
        self.__dict__.update(captured)

class AsyncFunctionTaskTemplate(TaskTemplate):
    """Task template wrapping a Python (async) function.

    ``TaskEnvironment.task`` subscripts this class at runtime
    (``AsyncFunctionTaskTemplate[P, R]``); since the class is not a
    ``typing.Generic``, that expression would raise ``TypeError``. The
    ``__class_getitem__`` below accepts any subscription and returns the
    class itself, mirroring how runtime-only generics behave.
    """

    def __class_getitem__(cls, item):
        # Ignore the subscript arguments; they are purely annotative here.
        return cls

class NativeInterface:
    """Stand-in for the real interface-extraction machinery."""

    @staticmethod
    def from_callable(func):
        # A genuine implementation would inspect ``func``'s signature;
        # this mock returns a fixed marker value regardless of input.
        return "interface_from_callable"

# Mock external types and functions for demonstration
V1PodTemplate = object  # placeholder for kubernetes.client.V1PodTemplate
Secret = object  # placeholder for the real Secret type
CacheRequest = Union[str, Cache]  # either "enable"/"disable" or a Cache object
FunctionTypes = object  # placeholder for the union of accepted callables
P = object  # placeholder for a ParamSpec used in generic annotations
R = object  # placeholder for a return-type TypeVar
MAX_INLINE_IO_BYTES = 1024 * 1024 # 1MB

def is_snake_or_kebab_with_numbers(name: str) -> bool:
    """Return True if *name* is valid snake_case or kebab-case.

    Accepts lower-case alphanumeric words, starting with a letter, separated
    by ``_`` or ``-`` (e.g. ``my-base-env``, ``data_source_2``). The original
    stub always returned True; this implements the naming rule the
    surrounding documentation states for environment names.
    """
    return re.fullmatch(r"[a-z][a-z0-9]*([_-][a-z0-9]+)*", name or "") is not None

_ENVIRONMENT_REGISTRY = [] # Mock registry

class Environment:
    """Base blueprint for an execution context.

    Captures the runtime configuration (image, resources, environment
    variables, secrets, pod template) and logical dependencies on other
    environments. Every instance is validated and recorded in the
    module-level ``_ENVIRONMENT_REGISTRY`` by ``__post_init__``.

    Args:
        name: Unique identifier; must be snake_case or kebab-case.
        image: Docker image name, an ``Image`` object, or ``"auto"``.
        resources: Optional compute resources for the environment.
        env_vars: Optional environment variables set at runtime.
        secrets: Optional secret request(s) injected at runtime.
        depends_on: Environments that should be deployed before this one.
        pod_template: Optional pod template (object or string reference).
        description: Optional human-readable description.
    """

    def __init__(
        self,
        name: str,
        image: Union[str, Image, Literal["auto"]] = "auto",
        resources: Optional[Resources] = None,
        env_vars: Optional[Dict[str, str]] = None,
        secrets: Optional[SecretRequest] = None,
        depends_on: Optional[List["Environment"]] = None,
        pod_template: Optional[Union[str, "V1PodTemplate"]] = None,
        description: Optional[str] = None,
    ):
        self.name = name
        self.image = image
        self.resources = resources
        self.env_vars = env_vars
        self.secrets = secrets
        # Copy into a fresh list so instances never alias a caller's list.
        self.depends_on = list(depends_on) if depends_on is not None else []
        self.pod_template = pod_template
        self.description = description
        self.__post_init__()

    def __post_init__(self):
        """Validate field types, then register this instance globally.

        Raises:
            ValueError: If ``name`` is not snake_case/kebab-case.
            TypeError: If a field has an unexpected type.
        """
        if not is_snake_or_kebab_with_numbers(self.name):
            raise ValueError(f"Environment name '{self.name}' must be in snake_case or kebab-case format.")
        if not isinstance(self.image, (Image, str)):
            raise TypeError(f"Expected image to be of type str or Image, got {type(self.image)}")
        # The original test used (str, Secret, List) with ``Secret = object``,
        # so it could never fail; check against the types the message names.
        if self.secrets and not isinstance(self.secrets, (str, SecretRequest, list)):
            raise TypeError(f"Expected secrets to be of type SecretRequest, got {type(self.secrets)}")
        for dep in self.depends_on:
            if not isinstance(dep, Environment):
                raise TypeError(f"Expected depends_on to be of type List[Environment], got {type(dep)}")
        if self.resources is not None and not isinstance(self.resources, Resources):
            raise TypeError(f"Expected resources to be of type Resources, got {type(self.resources)}")
        if self.env_vars is not None and not isinstance(self.env_vars, dict):
            raise TypeError(f"Expected env_vars to be of type Dict[str, str], got {type(self.env_vars)}")
        _ENVIRONMENT_REGISTRY.append(self)

    def add_dependency(self, *env: "Environment"):
        """Append each environment in *env* as a dependency, skipping duplicates.

        Raises:
            TypeError: If any argument is not an ``Environment``.
            ValueError: If any argument shares this environment's name.
        """
        for e in env:
            if not isinstance(e, Environment):
                raise TypeError(f"Expected Environment, got {type(e)}")
            if e.name == self.name:
                raise ValueError("Cannot add self as a dependency")
            if e in self.depends_on:
                continue
            # Append only the current element. The original did
            # ``self.depends_on.extend(env)`` here, re-adding the entire
            # argument tuple on each iteration and creating duplicates
            # (e.g. add(a) then add(a, b) yielded [a, a, b]).
            self.depends_on.append(e)

    def clone_with(
        self,
        name: str,
        image: Optional[Union[str, Image, Literal["auto"]]] = None,
        resources: Optional[Resources] = None,
        env_vars: Optional[Dict[str, str]] = None,
        secrets: Optional[SecretRequest] = None,
        depends_on: Optional[List["Environment"]] = None,
        **kwargs: Any,
    ) -> "Environment":
        """Create a modified copy; intentionally unimplemented on the base class."""
        raise NotImplementedError

    def _get_kwargs(self) -> Dict[str, Any]:
        """Return constructor kwargs reproducing this instance (omitting unset optionals)."""
        kwargs: Dict[str, Any] = {
            "depends_on": self.depends_on,
            "image": self.image,
        }
        if self.resources is not None:
            kwargs["resources"] = self.resources
        if self.secrets is not None:
            kwargs["secrets"] = self.secrets
        if self.env_vars is not None:
            kwargs["env_vars"] = self.env_vars
        if self.pod_template is not None:
            kwargs["pod_template"] = self.pod_template
        if self.description is not None:
            kwargs["description"] = self.description
        return kwargs

from typing import cast, Literal, Any, Callable

class TaskEnvironment(Environment):
    """Environment specialised for defining and executing tasks.

    Extends ``Environment`` with a default cache policy, an optional process
    reuse policy, an optional plugin configuration, and a registry of the
    tasks declared against it.

    Args:
        cache: Default cache policy for tasks ("disable"/"enable" or a Cache).
        reusable: Optional ``ReusePolicy``; mutually exclusive with
            ``plugin_config``.
        plugin_config: Optional configuration object for a custom task plugin.
    """

    def __init__(
        self,
        name: str,
        image: Union[str, Image, Literal["auto"]] = "auto",
        resources: Optional[Resources] = None,
        env_vars: Optional[Dict[str, str]] = None,
        secrets: Optional[SecretRequest] = None,
        depends_on: Optional[List["Environment"]] = None,
        pod_template: Optional[Union[str, "V1PodTemplate"]] = None,
        description: Optional[str] = None,
        cache: CacheRequest = "disable",
        reusable: ReusePolicy | None = None,
        plugin_config: Optional[Any] = None,
    ):
        # Set subclass state BEFORE calling the base constructor:
        # Environment.__init__ ends with self.__post_init__(), which
        # dispatches to the override below and reads these attributes.
        self.cache = cache
        self.reusable = reusable
        self.plugin_config = plugin_config
        # Real dict per instance. The original declared
        # ``_tasks = field(default_factory=dict)`` at class level without
        # @dataclass, leaving a dataclasses.Field object that is not a dict.
        self._tasks: Dict[str, TaskTemplate] = {}
        super().__init__(name, image, resources, env_vars, secrets, depends_on, pod_template, description)
        # NOTE: the original called self.__post_init__() again here, running
        # the subclass validation twice; the single call via super() suffices.

    def __post_init__(self) -> None:
        """Validate base fields plus cache/reuse configuration."""
        # Run the base-class validation and registry append. The original
        # override shadowed Environment.__post_init__ without calling it, so
        # task environments were never validated or registered.
        super().__post_init__()
        if self.reusable is not None and self.plugin_config is not None:
            raise ValueError("Cannot set plugin_config when environment is reusable.")
        if self.reusable and not isinstance(self.reusable, ReusePolicy):
            raise TypeError(f"Expected reusable to be of type ReusePolicy, got {type(self.reusable)}")
        if self.cache and not isinstance(self.cache, (str, Cache)):
            raise TypeError(f"Expected cache to be of type str or Cache, got {type(self.cache)}")

    def clone_with(
        self,
        name: str,
        image: Optional[Union[str, Image, Literal["auto"]]] = None,
        resources: Optional[Resources] = None,
        env_vars: Optional[Dict[str, str]] = None,
        secrets: Optional[SecretRequest] = None,
        depends_on: Optional[List["Environment"]] = None,
        **kwargs: Any,
    ) -> "TaskEnvironment":
        """Create a copy of this environment with selected overrides.

        ``cache`` and ``reusable`` may be passed via keyword arguments; any
        other keyword raises ``TypeError``. Fields not overridden are
        inherited from this instance.
        """
        cache = kwargs.pop("cache", None)
        reusable_set = "reusable" in kwargs
        reusable = kwargs.pop("reusable", None)
        if kwargs:
            raise TypeError(f"Unexpected keyword arguments: {list(kwargs.keys())}")

        new_kwargs = self._get_kwargs()
        new_kwargs["name"] = name
        if image is not None:
            new_kwargs["image"] = image
        if resources is not None:
            new_kwargs["resources"] = resources
        if env_vars is not None:
            new_kwargs["env_vars"] = env_vars
        if secrets is not None:
            new_kwargs["secrets"] = secrets
        if depends_on is not None:
            new_kwargs["depends_on"] = depends_on
        # TaskEnvironment-specific fields: inherit from self unless overridden.
        new_kwargs["cache"] = cache if cache is not None else self.cache
        new_kwargs["reusable"] = reusable if reusable_set else self.reusable
        if self.plugin_config is not None:
            new_kwargs["plugin_config"] = self.plugin_config
        # The original used dataclasses.replace(self, ...), which raises
        # TypeError on non-dataclass instances; build via the constructor.
        return type(self)(**new_kwargs)

    def task(
        self,
        _func=None,
        *,
        short_name: Optional[str] = None,
        cache: "CacheRequest | None" = None,
        retries: Union[int, RetryStrategy] = 0,
        timeout: Union[timedelta, int] = 0,
        docs: Optional[Documentation] = None,
        pod_template: Optional[Union[str, "V1PodTemplate"]] = None,
        report: bool = False,
        max_inline_io_bytes: int = MAX_INLINE_IO_BYTES,
    ) -> "Union[AsyncFunctionTaskTemplate, Callable[P, R]]":
        """Decorator that registers a function as a task of this environment.

        Supports both bare ``@env.task`` and parameterised ``@env.task(...)``
        usage. Task-level arguments (cache, retries, timeout, docs,
        pod_template, report, max_inline_io_bytes) override the environment
        defaults. The return annotation is quoted: the mocked ``P``/``R`` are
        plain objects, so evaluating ``Callable[P, R]`` eagerly would raise.
        """

        def decorator(func):
            short = short_name or func.__name__
            task_name = f"{self.name}.{func.__name__}"

            # Reuse with concurrency > 1 requires an async function: multiple
            # synchronous tasks cannot share one Python process concurrently.
            if not inspect.iscoroutinefunction(func) and self.reusable is not None:
                if self.reusable.concurrency > 1:
                    raise ValueError(
                        "Reusable environments with concurrency greater than 1 are only supported for async tasks. "
                        "Please use an async function or set concurrency to 1."
                    )

            # Simplified plugin-registry lookup. The original evaluated
            # AsyncFunctionTaskTemplate[P, R] at runtime, which raises
            # TypeError on a non-generic class; use the plain class.
            task_template_class = AsyncFunctionTaskTemplate
            tmpl = task_template_class(
                func=func,
                name=task_name,
                image=self.image,
                resources=self.resources,
                cache=cache or self.cache,
                retries=retries,
                timeout=timeout,
                reusable=self.reusable,
                docs=docs,
                env_vars=self.env_vars,
                secrets=self.secrets,
                pod_template=pod_template or self.pod_template,
                parent_env=weakref.ref(self),
                interface=NativeInterface.from_callable(func),
                report=report,
                short_name=short,
                plugin_config=self.plugin_config,
                max_inline_io_bytes=max_inline_io_bytes,
            )
            self._tasks[task_name] = tmpl
            return tmpl

        if _func is None:
            return cast(AsyncFunctionTaskTemplate, decorator)
        return cast(AsyncFunctionTaskTemplate, decorator(_func))

    @property
    def tasks(self) -> Dict[str, TaskTemplate]:
        """Mapping of fully-qualified task name to its template."""
        return self._tasks

    def add_task(self, task: TaskTemplate) -> TaskTemplate:
        """Register an externally-built task template with this environment.

        Raises:
            ValueError: If a task with the same name is already registered.
        """
        if task.name in self._tasks:
            raise ValueError(f"Task {task.name} already exists in the environment. Task names should be unique.")
        self._tasks[task.name] = task
        task.parent_env = weakref.ref(self)
        return task

# Example usage: a general-purpose base environment with modest resources.
my_base_env = Environment(
name="my-base-env",
image="ubuntu:22.04",
resources=Resources(cpu="500m", memory="512Mi"),
env_vars={"DEBUG": "true"},
description="A base environment for general utilities."
)

You can manage dependencies between environments using add_dependency:

# Declare that the application environment depends on the database one;
# deployment tooling uses this to order roll-outs.
db_env = Environment(name="database-env", image="postgres:14")
app_env = Environment(name="application-env", image="my-app:latest")
app_env.add_dependency(db_env)

The clone_with method is intentionally left unimplemented on the base Environment class (it raises NotImplementedError); subclasses such as TaskEnvironment provide a concrete implementation.

Task-Specific Environments

The TaskEnvironment class extends Environment by adding capabilities specifically tailored for defining and executing tasks. It inherits all parameters from Environment and introduces:

  • cache: The default cache policy for tasks within this environment. Can be "disable" or a Cache object.
  • reusable: A ReusePolicy object that dictates if and how Python processes can be reused across multiple task executions. This can significantly optimize performance for certain workloads.
  • plugin_config: An optional configuration object for custom task plugins, allowing integration with specialized execution backends.

Example: Defining a task environment

from dataclasses import dataclass

@dataclass
class MyPluginConfig:
    """Example configuration payload for a custom task plugin."""

    param: str = "default"

# Example usage. NOTE: ``reusable`` and ``plugin_config`` are mutually
# exclusive — TaskEnvironment.__post_init__ raises "Cannot set plugin_config
# when environment is reusable." — so the plugin configuration is omitted
# from this reusable environment (see the custom-plugin example further down
# for plugin_config usage).
my_task_env = TaskEnvironment(
name="data-processing-env",
image="my-data-image:1.0",
resources=Resources(cpu="1", memory="2Gi"),
cache="enable", # Enable caching by default for tasks in this environment
reusable=ReusePolicy(concurrency=4), # Allow reusing processes with 4 concurrent tasks
description="Environment for data processing tasks with caching and reuse."
)

Associating Tasks with an Environment

The primary way to use TaskEnvironment is through its task decorator, which transforms a Python function into an executable task within that environment.

import asyncio

@my_task_env.task
async def preprocess_data(input_path: str) -> str:
    """Preprocess raw data found at *input_path* and return the output path."""
    print(f"Preprocessing data from {input_path} in {my_task_env.name}")
    await asyncio.sleep(1) # Simulate async work
    result_path = f"{input_path}_processed"
    return result_path

@my_task_env.task(cache="disable", retries=3, timeout=timedelta(minutes=5))
async def analyze_results(processed_data_path: str):
    """Analyze the processed data, overriding the environment's cache policy.

    Defined ``async`` because my_task_env is reusable with concurrency > 1:
    the task decorator raises ValueError for synchronous functions in that
    case, so the original ``def`` version failed at decoration time.
    """
    print(f"Analyzing results from {processed_data_path} in {my_task_env.name}")
    return "analysis_report.txt"

# Accessing tasks defined in the environment
# Task keys are fully qualified as "<environment name>.<function name>".
print(f"Tasks in {my_task_env.name}: {list(my_task_env.tasks.keys())}")
# Output: Tasks in data-processing-env: ['data-processing-env.preprocess_data', 'data-processing-env.analyze_results']

The task decorator allows for task-specific overrides of environment parameters, such as cache, retries, timeout, docs, pod_template, report, and max_inline_io_bytes. These task-level parameters take precedence over the environment's default settings.

Advanced Configuration

Resource Allocation

Define Resources to specify CPU and memory requests and limits for your environment. This ensures your tasks receive adequate compute power and prevents resource exhaustion.

# NOTE(review): the Resources mock above only accepts cpu/memory; a ``gpu``
# parameter must exist on Resources for this example to run — confirm.
high_resource_env = TaskEnvironment(
name="heavy-compute-env",
image="my-ml-image:gpu",
resources=Resources(cpu="4", memory="16Gi", gpu="1"), # Assuming GPU is supported by Resources
description="Environment for GPU-accelerated machine learning tasks."
)

Environment Variables and Secrets

Use env_vars for general configuration and secrets for sensitive data. Secrets are injected securely at runtime.

# Example usage: plain configuration travels via env_vars, sensitive values
# via a SecretRequest that is resolved at runtime.
secure_env = TaskEnvironment(
name="secure-api-env",
image="my-api-image:latest",
env_vars={"API_VERSION": "v2"},
secrets=SecretRequest(key="my-api-key"), # Assumes SecretRequest handles fetching the secret
description="Environment for API services with environment variables and secrets."
)

Environment Dependencies

The depends_on parameter allows you to declare a logical dependency between environments. When an environment with dependencies is deployed, the system ensures its dependent environments are also deployed. This is crucial for orchestrating complex applications.

# Example usage: depends_on passed at construction time (equivalent to
# calling add_dependency afterwards).
data_source_env = Environment(name="data-source", image="my-data-source:latest")
etl_env = TaskEnvironment(
name="etl-pipeline",
image="my-etl-image:latest",
depends_on=[data_source_env],
description="ETL environment that depends on a data source."
)

Caching

TaskEnvironment and individual tasks support caching. When cache is enabled, the results of a task are stored and reused if the inputs and task definition remain the same, significantly speeding up repeated executions.

# Environment with caching enabled by default; individual tasks may still
# opt out via @cached_env.task(cache="disable").
cached_env = TaskEnvironment(name="cached-tasks", image="python:3.9", cache="enable")

@cached_env.task
def expensive_calculation(x: int, y: int) -> int:
    """Add two integers; results are cached under the environment's policy."""
    total = x + y
    return total

# Task that explicitly disables caching, overriding the environment's default
@cached_env.task(cache="disable")
def non_cached_operation(data: str) -> str:
    """Uppercase *data*; never cached because the task disables caching."""
    return data.upper()

Reusable Environments

The reusable policy in TaskEnvironment allows the underlying Python process to be kept alive and reused for multiple task executions. This reduces startup overhead and is particularly beneficial for tasks with high execution frequency or long initialization times.

Important Considerations for Reusable Environments:

  • Asynchronous Tasks: Reusable environments with a concurrency greater than 1 are only supported for async functions. For synchronous tasks, concurrency must be 1.
  • pod_template Restriction: You cannot set a pod_template on a task if its environment is marked as reusable.
  • plugin_config Restriction: You cannot set plugin_config when an environment is reusable.
# Example usage: a reusable environment; only async tasks may run with
# concurrency > 1 in a shared process.
async_reusable_env = TaskEnvironment(
name="async-reusable-env",
image="python:3.10-slim",
reusable=ReusePolicy(concurrency=8), # Allows up to 8 concurrent async tasks in the same process
description="Environment for highly concurrent async tasks."
)

@async_reusable_env.task
async def process_item(item_id: str) -> str:
    """Simulate brief asynchronous work and report the item as processed."""
    await asyncio.sleep(0.1) # Simulate quick async processing
    return f"Processed {item_id}"

Custom Task Plugins

The plugin_config parameter allows you to integrate custom task types by providing a configuration object. The system uses this configuration to find and instantiate the appropriate task plugin.

# Assuming a custom plugin is registered for MyPluginConfig
# Note: this environment is NOT reusable, so plugin_config is permitted.
custom_plugin_env = TaskEnvironment(
name="custom-task-env",
image="my-custom-plugin-image:latest",
plugin_config=MyPluginConfig(param="special_setting"),
description="Environment for tasks using a custom plugin."
)

@custom_plugin_env.task
def run_custom_job():
    """Example task executed through the environment's custom plugin config."""
    print("Running a job with custom plugin configuration.")

Managing Environments

Cloning Environments

The TaskEnvironment class provides a clone_with method to create a new environment instance based on an existing one, allowing you to override specific parameters. This is useful for creating variations of an environment without redefining everything.

# Example usage: derive a production environment from a development one,
# overriding only the image, resources, and cache policy.
dev_env = TaskEnvironment(name="dev-env", image="my-app:dev", resources=Resources(cpu="500m"))
prod_env = dev_env.clone_with(
name="prod-env",
image="my-app:prod",
resources=Resources(cpu="2", memory="4Gi"),
cache="enable"
)

print(f"Dev Env Image: {dev_env.image}, Prod Env Image: {prod_env.image}")
print(f"Dev Env CPU: {dev_env.resources.cpu}, Prod Env CPU: {prod_env.resources.cpu}")

Adding Tasks Dynamically

While the task decorator is the primary way to associate functions, you can also programmatically add TaskTemplate instances to an environment using add_task. This is useful for scenarios where tasks are generated or loaded dynamically.

dynamic_env = TaskEnvironment(name="dynamic-env", image="python:3.9")

def my_dynamic_func():
    """Body for a task registered programmatically via add_task."""
    print("This is a dynamically added task.")

# Create a TaskTemplate instance (simplified for example)
# All task metadata is supplied explicitly here instead of being derived by
# the @task decorator; parent_env is set by add_task below.
dynamic_task_template = AsyncFunctionTaskTemplate(
func=my_dynamic_func,
name="dynamic-env.my_dynamic_task",
image=dynamic_env.image,
resources=dynamic_env.resources,
cache="disable", retries=0, timeout=0, reusable=None, docs=None, env_vars=None, secrets=None, pod_template=None, parent_env=None, interface=None, report=False, short_name="my_dynamic_task", plugin_config=None, max_inline_io_bytes=MAX_INLINE_IO_BYTES
)

dynamic_env.add_task(dynamic_task_template)
print(f"Tasks in dynamic_env: {list(dynamic_env.tasks.keys())}")

Best Practices and Considerations

  • Granularity: Define environments at a logical level. A single environment might encompass all tasks for a specific service or a set of related data processing steps. Avoid creating an environment for every single task unless its requirements are truly unique.
  • Naming Conventions: Adhere to snake_case or kebab-case for environment names for consistency and compatibility.
  • Image Management: Use versioned Docker images (e.g., my-image:1.0.0) to ensure reproducibility. Avoid latest in production environments.
  • Resource Allocation: Start with reasonable resource requests and monitor usage. Over-provisioning wastes resources, while under-provisioning can lead to performance issues or task failures.
  • Security: Always use secrets for sensitive information. Avoid hardcoding credentials or passing them via plain env_vars.
  • Reusable Environments: Leverage reusable environments for performance-critical async tasks, but be mindful of the restrictions on pod_template and plugin_config. Ensure your async tasks are designed to be stateless or manage state carefully across invocations within a reused process.
  • Dependencies: Clearly define depends_on relationships to ensure correct deployment order and operational stability.