Caching Task Results

Caching task results significantly improves workflow efficiency by reusing outputs from previously executed tasks with identical inputs. This avoids redundant computations, reduces execution time, and optimizes resource utilization, especially for computationally intensive or frequently invoked tasks.

Configuring Task Caching

The Cache configuration object controls how task results are cached. You define caching behavior, versioning strategies, and concurrency handling through its parameters.

The Cache object is initialized with the following parameters:

  • behavior: Determines the caching strategy.
    • "auto": Enables caching. The system automatically generates a cache version based on defined policies.
    • "override": Enables caching using a manually specified version_override.
    • "disable": Disables caching for the task.
  • version_override: An optional string that explicitly sets the cache version. When provided, this version takes precedence over any policy-generated versions. This is useful for forcing a cache invalidation or ensuring a specific version is used across deployments.
  • serialize: A boolean flag. When True, if multiple concurrent executions of the same task (with identical inputs) are initiated, only one instance executes. The other instances wait and then reuse the result from the executing instance once it completes. This prevents redundant computations for concurrent identical requests.
  • ignored_inputs: A tuple of input names (strings) that should be excluded when generating the cache version hash. This is useful when certain inputs do not affect the task's output and should not trigger a cache miss.
  • salt: An arbitrary string used to "salt" the hash generation process. Adding a salt ensures that even if the underlying code or inputs are identical, a different hash is produced if the salt changes. This provides an additional layer of control for cache invalidation.
  • policies: A list of CachePolicy objects that define how the cache version is generated when behavior is "auto" and version_override is not set. If no policies are explicitly provided, a default set of policies is used.

Example: Basic Cache Configuration

from flytekit.core.cache import Cache # Assuming Cache is imported from here

# Enable caching with default policies
cache_config_auto = Cache(behavior="auto")

# Enable caching with a specific version
cache_config_override = Cache(behavior="override", version_override="my-custom-version-123")

# Disable caching
cache_config_disable = Cache(behavior="disable")

# Enable caching and serialize concurrent executions
cache_config_serialize = Cache(behavior="auto", serialize=True)

# Enable caching, ignoring a specific input
cache_config_ignored_input = Cache(behavior="auto", ignored_inputs=("timestamp_input",))
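
These configuration objects are attached to the task definition itself. Below is a minimal sketch of that attachment, assuming the task decorator accepts a cache argument; the exact decorator and parameter name may differ in your SDK version, so treat this as illustrative rather than authoritative.

from flytekit import task  # Assumption: the standard task decorator accepts a cache argument

# Hypothetical usage: attach a cache configuration when defining a task.
@task(cache=Cache(behavior="auto", serialize=True))
def expensive_transform(x: int) -> int:
    # With caching enabled, re-running with the same x reuses the stored result.
    return x * x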

Cache Versioning

Caching relies on generating a unique version identifier for each task. If the task is executed again with the same version and identical (non-ignored) inputs, its cached results are reused instead of re-running the task.

Version Generation Logic

When caching is enabled (behavior is "auto" or "override"):

  1. If version_override is provided, that string is used directly as the cache version.
  2. If version_override is not provided, the system attempts to generate a version using a CodeBundle if available. This is typically for pre-compiled or pre-packaged tasks where the version is already computed.
  3. Otherwise, the system iterates through the configured policies to generate a composite hash: each policy contributes a part of the overall version string, which is then SHA-256 hashed to produce the final cache version, as sketched below.
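
Conceptually, the composite hashing step can be pictured as follows. This is an illustrative sketch of the idea, not the library's actual implementation; the generate_cache_version function and its arguments are hypothetical.

import hashlib

# Illustrative only: each policy contributes a version part, and the
# concatenation of the parts is SHA-256 hashed into the final cache version.
def generate_cache_version(policies, salt, params) -> str:
    parts = [policy.get_version(salt, params) for policy in policies]
    return hashlib.sha256("".join(parts).encode("utf-8")).hexdigest()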

Cache Policies

Cache policies define the rules for generating a version string. A CachePolicy is a protocol that requires a get_version method, which receives a salt and a VersionParameters object providing context such as the task's function, container image, or code bundle information.
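
In outline, the protocol looks roughly like the sketch below, based on the description above; check the installed SDK for the exact attribute and method signatures.

from typing import Any, Callable, Optional, Protocol

# Sketch of the CachePolicy protocol and the context object it receives.
# Only func, image, and code_bundle are assumed to exist on VersionParameters.
class VersionParameters(Protocol):
    func: Optional[Callable[..., Any]]
    image: Optional[Any]
    code_bundle: Optional[Any]

class CachePolicy(Protocol):
    def get_version(self, salt: str, params: VersionParameters) -> str:
        ...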

FunctionBodyPolicy

The FunctionBodyPolicy is a common default policy. It generates a version by hashing the Abstract Syntax Tree (AST) of the task's Python function body, combined with the provided salt, so any change to the function's logic or structure invalidates the cache.
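
The idea can be illustrated with a simplified stand-alone sketch; the real policy may normalize the AST or combine additional metadata, so this is not the library's exact implementation.

import ast
import hashlib
import inspect
import textwrap

# Simplified illustration: hash the parsed AST of the function's source
# together with the salt, so any structural change yields a new version.
def version_from_function_body(func, salt: str) -> str:
    source = textwrap.dedent(inspect.getsource(func))
    tree_dump = ast.dump(ast.parse(source))
    return hashlib.sha256(f"{tree_dump}{salt}".encode("utf-8")).hexdigest()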

Custom Cache Policies

Developers can implement custom CachePolicy objects to define bespoke versioning logic. For example, a policy could hash specific external data dependencies, or even a timestamp to force periodic invalidation.

import hashlib
from flytekit.core.cache import CachePolicy, VersionParameters # Assuming these are imported

class MyCustomFunctionHashPolicy(CachePolicy):
    def get_version(self, salt: str, params: VersionParameters) -> str:
        # This example policy hashes the function's name and the salt.
        # In a real scenario, you might hash external data identifiers or other metadata.
        if params.func:
            data_to_hash = f"{params.func.__name__}-{salt}".encode("utf-8")
            return hashlib.sha256(data_to_hash).hexdigest()
        return ""

# Using a custom policy
# You can combine custom policies with default ones or use only custom ones.
cache_config_custom_policy = Cache(
    behavior="auto",
    policies=[MyCustomFunctionHashPolicy()]
)
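
As mentioned above, a policy can also fold a timestamp into the version to force periodic invalidation. The following sketch, which reuses the imports and classes from the previous example, buckets the version by UTC date so cached results are reused for at most a day; the DailyInvalidationPolicy name is illustrative.

from datetime import datetime, timezone

class DailyInvalidationPolicy(CachePolicy):
    # Illustrative policy: include the current UTC date in the version so the
    # cache is invalidated once per day even if the code does not change.
    def get_version(self, salt: str, params: VersionParameters) -> str:
        today = datetime.now(timezone.utc).date().isoformat()
        return hashlib.sha256(f"{today}-{salt}".encode("utf-8")).hexdigest()

# Each policy contributes to the composite version, so combining them means a
# change in either the function name or the date produces a cache miss.
cache_config_daily = Cache(
    behavior="auto",
    policies=[MyCustomFunctionHashPolicy(), DailyInvalidationPolicy()]
)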

Note: The VersionParameters object currently provides func, image, and code_bundle. For policies that need to inspect task inputs for versioning, the underlying system would need to provide access to those inputs during version generation. The ignored_inputs parameter on the Cache object handles input-based exclusion at a higher level.
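
Since VersionParameters exposes the container image, a policy can also tie the cache version to the image reference so that rebuilding or switching the image invalidates cached results. The sketch below reuses the earlier imports; converting params.image to a string is an assumption about its type.

class ImageAwarePolicy(CachePolicy):
    # Sketch: hash the string form of the container image reference, if present,
    # so switching or rebuilding the image produces a new cache version.
    def get_version(self, salt: str, params: VersionParameters) -> str:
        image_ref = str(params.image) if params.image else "no-image"
        return hashlib.sha256(f"{image_ref}-{salt}".encode("utf-8")).hexdigest()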

Best Practices and Considerations

  • When to use version_override: Use version_override when you need absolute control over cache invalidation, for example bumping the version during development iterations to force a re-run, or when deploying a new version of a model that should always recompute. Be cautious: with a manual override, the cache is only invalidated when you change the override string, so forgetting to bump it after a code change can lead to stale results.
  • Impact of ignored_inputs: Carefully select inputs to ignore. Only exclude inputs that genuinely do not affect the task's output. Incorrectly ignoring an input can lead to incorrect cached results being reused.
  • Serialization for Concurrency: The serialize=True option is crucial for tasks that are expensive to run and frequently invoked with identical inputs concurrently. It prevents resource contention and ensures consistent results by funneling all identical requests through a single execution.
  • Performance: Caching significantly boosts performance by avoiding re-execution. However, the version generation process itself (especially complex custom policies) adds a small overhead. This overhead is typically negligible compared to the savings from avoiding task execution.
  • Debugging Cache Misses: If a task is unexpectedly re-executing, verify the behavior setting, check for changes in version_override, and inspect the policies to understand how the version hash is being generated. Changes in function body, dependencies, or even the salt can trigger a cache miss.
  • Immutability of Cached Data: Ensure that tasks produce deterministic outputs for given inputs. If a task's output can vary even with identical inputs (e.g., due to external non-deterministic factors), caching might lead to inconsistent results. In such cases, consider disabling caching or using a version_override that changes frequently.