Caching Task Results
Caching task results significantly improves workflow efficiency by reusing outputs from previously executed tasks with identical inputs. This avoids redundant computations, reduces execution time, and optimizes resource utilization, especially for computationally intensive or frequently invoked tasks.
Configuring Task Caching
The Cache configuration object controls how task results are cached. You define caching behavior, versioning strategies, and concurrency handling through its parameters.
The Cache object is initialized with the following parameters:
- behavior: Determines the caching strategy.
  - "auto": Enables caching. The system automatically generates a cache version based on defined policies.
  - "override": Enables caching using a manually specified version_override.
  - "disable": Disables caching for the task.
- version_override: An optional string that explicitly sets the cache version. When provided, this version takes precedence over any policy-generated versions. This is useful for forcing a cache invalidation or ensuring a specific version is used across deployments.
- serialize: A boolean flag. When True, if multiple concurrent executions of the same task (with identical inputs) are initiated, only one instance executes. The other instances wait and then reuse the result from the executing instance once it completes. This prevents redundant computations for concurrent identical requests.
- ignored_inputs: A tuple of input names (strings) that should be excluded when generating the cache version hash. This is useful when certain inputs do not affect the task's output and should not trigger a cache miss.
- salt: An arbitrary string used to "salt" the hash generation process. Adding a salt ensures that even if the underlying code or inputs are identical, a different hash is produced if the salt changes. This provides an additional layer of control for cache invalidation.
- policies: A list of CachePolicy objects that define how the cache version is generated when behavior is "auto" and version_override is not set. If no policies are explicitly provided, a default set of policies is used.
Example: Basic Cache Configuration
from flytekit.core.cache import Cache # Assuming Cache is imported from here
# Enable caching with default policies
cache_config_auto = Cache(behavior="auto")
# Enable caching with a specific version
cache_config_override = Cache(behavior="override", version_override="my-custom-version-123")
# Disable caching
cache_config_disable = Cache(behavior="disable")
# Enable caching and serialize concurrent executions
cache_config_serialize = Cache(behavior="auto", serialize=True)
# Enable caching, ignoring a specific input
cache_config_ignored_input = Cache(behavior="auto", ignored_inputs=("timestamp_input",))
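As a rough illustration of what ignored_inputs means for cache lookups, the sketch below builds a key from a task's inputs while skipping the ignored names. The helper input_cache_key and its JSON-based hashing are assumptions made for illustration only; they are not the library's actual implementation.

```python
import hashlib
import json


def input_cache_key(inputs: dict, ignored_inputs: tuple = ()) -> str:
    """Hypothetical helper: hash task inputs, skipping ignored names."""
    relevant = {k: v for k, v in inputs.items() if k not in ignored_inputs}
    # sort_keys makes the serialization independent of argument order.
    payload = json.dumps(relevant, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


run_a = {"dataset": "sales.csv", "timestamp_input": "2024-01-01T00:00:00"}
run_b = {"dataset": "sales.csv", "timestamp_input": "2024-06-30T12:00:00"}

# Without ignoring the timestamp, the two runs get different keys (cache miss).
assert input_cache_key(run_a) != input_cache_key(run_b)
# With the timestamp ignored, both runs map to the same key (cache hit).
assert input_cache_key(run_a, ("timestamp_input",)) == input_cache_key(run_b, ("timestamp_input",))
```

The same mechanism explains the risk: if an ignored input does influence the output, both runs would still share a key and the second would receive the first run's result.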
Cache Versioning
The core of caching relies on generating a unique version identifier for each task execution. If a task is executed again with the same version, its cached results are reused.
Version Generation Logic
When caching is enabled (behavior is "auto" or "override"):
- If version_override is provided, that string is used directly as the cache version.
- If version_override is not provided, the system attempts to generate a version using a CodeBundle if available. This is typically for pre-compiled or pre-packaged tasks where the version is already computed.
- Otherwise, the system iterates through the configured policies to generate a composite hash. Each policy contributes a part to the overall version string, which is then SHA-256 hashed to produce the final cache version.
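The resolution order above can be sketched as a small standalone function. This is a minimal illustration, not the library's code: resolve_cache_version is a hypothetical name, policies are modeled as plain callables that take the salt, and joining their contributions by simple concatenation is an assumption.

```python
import hashlib
from typing import Callable, Optional, Sequence


def resolve_cache_version(
    version_override: Optional[str],
    code_bundle_version: Optional[str],
    policies: Sequence[Callable[[str], str]],
    salt: str = "",
) -> str:
    """Hypothetical sketch of the resolution order described above."""
    # 1. An explicit override always wins.
    if version_override:
        return version_override
    # 2. Otherwise reuse a version precomputed for the code bundle, if any.
    if code_bundle_version:
        return code_bundle_version
    # 3. Otherwise combine each policy's contribution and hash the result.
    combined = "".join(policy(salt) for policy in policies)
    return hashlib.sha256(combined.encode("utf-8")).hexdigest()


policies = [lambda salt: f"body-{salt}", lambda salt: f"image-{salt}"]
print(resolve_cache_version("v1", "bundle-abc", policies))   # v1
print(resolve_cache_version(None, "bundle-abc", policies))   # bundle-abc
print(len(resolve_cache_version(None, None, policies)))      # 64 (hex SHA-256)
```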
Cache Policies
Cache policies define the rules for generating a version string. A CachePolicy is a protocol that requires a get_version method. This method receives a salt and a VersionParameters object, which provides context such as the task's function, container image, or code bundle information.
FunctionBodyPolicy
The FunctionBodyPolicy is a common default policy. It generates a version by hashing the Abstract Syntax Tree (AST) of the task's Python function body, combined with the provided salt. As a result, any change to the function's logic or structure produces a new version and invalidates the cache.
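To make the idea of AST-based versioning concrete, here is a minimal sketch using only the standard library. The helper ast_version is hypothetical, and whether the library normalizes source exactly this way is not stated here; the sketch only shows why hashing a parsed tree reacts to logic changes and to the salt.

```python
import ast
import hashlib


def ast_version(source: str, salt: str = "") -> str:
    """Hypothetical helper: hash the parsed AST of some source plus a salt."""
    tree = ast.parse(source)
    # ast.dump omits comments, spacing, and line numbers by default,
    # so in this sketch only structural changes alter the hash.
    normalized = ast.dump(tree)
    return hashlib.sha256((normalized + salt).encode("utf-8")).hexdigest()


original = "def double(x):\n    return x * 2\n"
reformatted = "def double(x):\n    # multiply by two\n    return x  *  2\n"
changed = "def double(x):\n    return x * 3\n"

assert ast_version(original) == ast_version(reformatted)          # comments/spacing ignored
assert ast_version(original) != ast_version(changed)              # logic change, new version
assert ast_version(original) != ast_version(original, salt="v2")  # salt change, new version
```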
Custom Cache Policies
Developers can implement custom CachePolicy objects to define bespoke versioning logic. For example, a policy could hash specific external data dependencies, or even a timestamp to force periodic invalidation.
import hashlib

from flytekit.core.cache import Cache, CachePolicy, VersionParameters  # Assuming these are imported from here


class MyCustomFunctionHashPolicy(CachePolicy):
    def get_version(self, salt: str, params: VersionParameters) -> str:
        # This example policy hashes the function's name and the salt.
        # In a real scenario, you might hash external data identifiers or other metadata.
        if params.func:
            data_to_hash = f"{params.func.__name__}-{salt}".encode("utf-8")
            return hashlib.sha256(data_to_hash).hexdigest()
        # No function available: contribute nothing to the version.
        return ""


# Using a custom policy
# You can combine custom policies with default ones or use only custom ones.
cache_config_custom_policy = Cache(
    behavior="auto",
    policies=[MyCustomFunctionHashPolicy()],
)
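The timestamp idea mentioned above could look roughly like the following sketch. TimeBucketPolicy, period_seconds, and the injectable clock are hypothetical names introduced for illustration. The class relies on CachePolicy being a protocol with a get_version method, as described above, and types params loosely so the example runs without the library installed.

```python
import hashlib
import time
from typing import Any, Callable


class TimeBucketPolicy:
    """Hypothetical policy: the version changes once per period_seconds."""

    def __init__(self, period_seconds: int, clock: Callable[[], float] = time.time):
        self.period_seconds = period_seconds
        self.clock = clock  # injectable so the behavior can be tested

    def get_version(self, salt: str, params: Any) -> str:
        # All timestamps inside the same window share one bucket index.
        bucket = int(self.clock() // self.period_seconds)
        return hashlib.sha256(f"{bucket}-{salt}".encode("utf-8")).hexdigest()


# Roughly daily invalidation (86,400 seconds per bucket).
daily = TimeBucketPolicy(period_seconds=86_400)
print(len(daily.get_version("my-salt", None)))  # 64 (hex SHA-256)
```

The bucket is computed whenever the system asks the policy for a version, so how often invalidation actually takes effect depends on when that call happens.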
Note: The VersionParameters object currently provides func, image, and code_bundle. For policies that need to inspect task inputs for versioning, the underlying system would need to provide access to those inputs during version generation. The ignored_inputs parameter on the Cache object handles input-based exclusion at a higher level.
Best Practices and Considerations
- When to use version_override: Use version_override when you need absolute control over cache invalidation, such as during development iterations where you frequently change code but want to force a re-run, or when deploying a new version of a model that should always re-compute. Be cautious, as manual overrides can lead to stale caches if not managed carefully.
- Impact of ignored_inputs: Carefully select inputs to ignore. Only exclude inputs that genuinely do not affect the task's output. Incorrectly ignoring an input can lead to incorrect cached results being reused.
- Serialization for Concurrency: The serialize=True option is crucial for tasks that are expensive to run and frequently invoked with identical inputs concurrently. It prevents resource contention and ensures consistent results by funneling all identical requests through a single execution.
- Performance: Caching significantly boosts performance by avoiding re-execution. However, the version generation process itself (especially complex custom policies) adds a small overhead. This overhead is typically negligible compared to the savings from avoiding task execution.
- Debugging Cache Misses: If a task is unexpectedly re-executing, verify the behavior setting, check for changes in version_override, and inspect the policies to understand how the version hash is being generated. Changes in function body, dependencies, or even the salt can trigger a cache miss.
- Immutability of Cached Data: Ensure that tasks produce deterministic outputs for given inputs. If a task's output can vary even with identical inputs (e.g., due to external non-deterministic factors), caching might lead to inconsistent results. In such cases, consider disabling caching or using a version_override that changes frequently.
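The effect of serialize=True can be pictured with an in-process analogue: concurrent callers that share a key wait for a single execution and reuse its result. How the platform actually coordinates concurrent executions is not described here; the SingleFlight class below is a hypothetical, thread-based sketch of the idea only.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict


class _Call:
    def __init__(self) -> None:
        self.done = threading.Event()
        self.result: Any = None


class SingleFlight:
    """Hypothetical in-process analogue of serialize=True."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._calls: Dict[str, _Call] = {}

    def run(self, key: str, fn: Callable[[], Any]) -> Any:
        with self._lock:
            call = self._calls.get(key)
            owner = call is None
            if owner:
                call = _Call()
                self._calls[key] = call
        if owner:
            try:
                call.result = fn()  # only the first caller executes
            finally:
                call.done.set()     # wake waiters (they see None if fn raised)
        else:
            call.done.wait()        # other callers wait for the owner
        return call.result


counter = {"executions": 0}


def expensive() -> int:
    time.sleep(0.1)
    counter["executions"] += 1
    return 42


flight = SingleFlight()
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(lambda _: flight.run("task-v1-inputs", expensive), range(4)))

assert results == [42, 42, 42, 42]
assert counter["executions"] == 1
```

The key stands in for the combination of cache version and inputs. Error handling is deliberately simplified: a production design would propagate the owner's failure to waiters rather than returning None.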