Remote Platform API & Entities
The Remote Platform API provides a programmatic interface for interacting with the core components and executions on the remote platform. This documentation outlines the key entities and the client-side API for managing projects, tasks, runs, actions, and secrets, along with capabilities for monitoring and logging.
Connecting to the Platform
Interacting with the remote platform begins by establishing a connection and authenticating. The ClientSet class serves as the primary entry point for accessing various platform services.
Client Initialization
To connect to the remote platform, initialize a ClientSet instance. This client aggregates service stubs for different functionalities like task management, run management, and secret management.
You can initialize the client using an endpoint, an API key, or by leveraging environment variables:
- By Endpoint: Connects to a specified platform endpoint.

    from flyte.remote._client.controlplane import ClientSet
    client = await ClientSet.for_endpoint("grpc.myplatform.com:443", insecure=False)

- By API Key: Authenticates using a provided API key.

    from flyte.remote._client.controlplane import ClientSet
    client = await ClientSet.for_api_key("your-api-key-string")

- From Environment: (Not yet implemented, but indicated by ClientSet.from_env)
Once initialized, the ClientSet provides access to specific service clients, such as client.task_service, client.run_service, and client.secrets_service.
Authentication Mechanisms
The platform supports various authentication flows, managed by Authenticator subclasses. These authenticators handle the process of acquiring and refreshing access tokens, often integrating with the system keyring for secure credential storage.
- Proof Key for Code Exchange (PKCE) Flow (PKCEAuthenticator): This flow is suitable for user-interactive applications. It typically opens a web browser for the user to log in, then exchanges an authorization code for an access token. The PKCEAuthenticator manages the entire process, including code verifier/challenge generation and token exchange.
- Device Code Flow (DeviceCodeAuthenticator): Designed for headless or input-constrained devices. It provides a user code and verification URI for the user to complete authentication on a separate device. The client then polls for the access token.
- Client Credentials Flow (ClientCredentialsAuthenticator): Used for machine-to-machine authentication, where a client ID and client secret are used to obtain an access token directly.
- External Command Authenticator (AsyncCommandAuthenticator): Allows an external command to be executed to retrieve an access token. This is useful for integrating with custom identity providers or existing credential management tools.
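The code verifier/challenge generation mentioned for the PKCE flow can be sketched with the standard library alone. This is a minimal illustration of the S256 method described in RFC 7636, not the PKCEAuthenticator implementation; the function names make_code_verifier and make_code_challenge are hypothetical.

```python
import base64
import hashlib
import secrets


def make_code_verifier(n_bytes: int = 64) -> str:
    """Return a random URL-safe verifier (RFC 7636 allows 43 to 128 characters)."""
    return secrets.token_urlsafe(n_bytes)


def make_code_challenge(verifier: str) -> str:
    """Derive the S256 challenge: base64url(sha256(verifier)) without '=' padding."""
    digest = hashlib.sha256(verifier.encode("ascii")).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")


# The client keeps the verifier private and sends only the challenge with the
# authorization request; the verifier is revealed later, at token exchange.
verifier = make_code_verifier()
challenge = make_code_challenge(verifier)
print(len(verifier), len(challenge))
```

The authorization server recomputes the challenge from the verifier during token exchange, which is what ties the exchanged code to the client that started the flow.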
Authentication details, such as token endpoints and client IDs, are managed by ClientConfigStore implementations. The RemoteClientConfigStore can fetch this configuration directly from the platform's authentication metadata service.
gRPC Interceptors: Authentication and default metadata injection for gRPC calls are handled transparently by client-side interceptors (AuthUnaryUnaryInterceptor, AuthUnaryStreamInterceptor, AuthStreamUnaryInterceptor, AuthStreamStreamInterceptor). These interceptors automatically add the necessary authentication headers to outgoing requests and handle token refreshing when an UNAUTHENTICATED status is received from the server. This ensures that developers do not need to manually manage token lifecycle for each gRPC call.
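The refresh-and-retry behavior described above can be illustrated without gRPC. The sketch below is an assumption-laden stand-in, not the interceptor code: Unauthenticated, TokenSource, fake_rpc, and call_with_auth are all hypothetical names, and the "server" is a local function that rejects an expired token.

```python
import asyncio


class Unauthenticated(Exception):
    """Stand-in for a gRPC UNAUTHENTICATED status (hypothetical)."""


class TokenSource:
    """Hypothetical token holder; refresh() simulates acquiring a new token."""

    def __init__(self) -> None:
        self.token = "expired"
        self.refreshes = 0

    async def refresh(self) -> None:
        self.refreshes += 1
        self.token = f"fresh-{self.refreshes}"


async def fake_rpc(metadata: dict) -> str:
    """Simulated server call that accepts only the first refreshed token."""
    if metadata.get("authorization") != "Bearer fresh-1":
        raise Unauthenticated()
    return "ok"


async def call_with_auth(rpc, tokens: TokenSource) -> str:
    """Attach the auth header; on UNAUTHENTICATED, refresh once and retry."""
    try:
        return await rpc({"authorization": f"Bearer {tokens.token}"})
    except Unauthenticated:
        await tokens.refresh()
        return await rpc({"authorization": f"Bearer {tokens.token}"})


tokens = TokenSource()
result = asyncio.run(call_with_auth(fake_rpc, tokens))
print(result, tokens.refreshes)
```

Retrying only once keeps a genuinely invalid credential from causing an endless refresh loop; a second failure propagates to the caller.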
Managing Platform Resources
The remote API exposes several core entities that represent the building blocks of your data and execution workflows.
Projects
A Project represents a logical grouping of resources on the platform.
- Retrieving Project Details: Use Project.get to fetch a specific project by its name.

    from flyte.remote._project import Project
    project = await Project.get("my-project-name")
    print(f"Project Name: {project.name}, State: {project.state}")

- Listing Projects: Iterate through all available projects using Project.listall.

    from flyte.remote._project import Project
    async for p in Project.listall():
        print(f"Found project: {p.name}")
Tasks
A Task represents a deployable unit of computation.
- Listing Tasks: Retrieve a list of tasks, optionally filtered by name, environment, project, or domain.

    from flyte.remote._task import Task
    async for t in Task.listall(by_task_name="my_data_processing_task", limit=10):
        print(f"Task: {t.name}, Version: {t.version}")

- Retrieving Task Details: The Task.get method returns a LazyEntity, which defers fetching the full TaskDetails until it is accessed or invoked. This supports auto-versioning:
  - version="specific-version": Fetches a task by an exact version.
  - auto_version="latest": Retrieves the most recently created version of the task.
  - auto_version="current": (Only within a task context) Derives the version from the currently executing task's context, useful for deploying environments with consistent versions.

    from flyte.remote._task import Task
    # Get the latest version of a task
    lazy_task = Task.get(name="my_task", auto_version="latest")
    task_details = await lazy_task.fetch()  # Explicitly fetch details
    print(f"Task Interface: {task_details.interface}")
    # Get a specific version
    task_v1 = Task.get(name="my_task", version="v1.0.0")

- Invoking Remote Tasks: When operating within a task context, a TaskDetails object can be invoked directly, which submits the task to the platform for execution. Currently, only keyword arguments are supported for invocation.

    # Example within a task context (conceptual)
    # from flyte import task
    # from flyte.remote._task import Task
    #
    # @task
    # async def my_workflow_task():
    #     remote_task = Task.get(name="another_task", auto_version="latest")
    #     result = await remote_task(input_param="value")
    #     print(f"Remote task output: {result}")

- Overriding Task Properties: You can temporarily override certain task properties such as short_name, resources, retries, timeout, env_vars, and secrets using the override method on a TaskDetails object. This creates a modified instance without altering the original task definition on the platform.

    from flyte.remote._task import Task
    from flyte import Resources, SecretRequest
    lazy_task = Task.get(name="my_task", auto_version="latest")
    task_details = await lazy_task.fetch()
    # Override resources and add a secret
    overridden_task = task_details.override(
        resources=Resources(cpu="1", mem="500Mi"),
        secrets=SecretRequest(key="my-api-key")
    )
    print(f"Overridden task resources: {overridden_task.resources}")
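The deferred-fetch behavior attributed to LazyEntity can be pictured with a small stand-in. The sketch below is only an illustration of the pattern under stated assumptions: LazyValue and load_details are hypothetical names, and the "remote call" is a local coroutine that counts how often it runs.

```python
import asyncio
from typing import Awaitable, Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class LazyValue(Generic[T]):
    """Hypothetical lazy handle: remembers how to fetch, fetches at most once."""

    def __init__(self, name: str, getter: Callable[[], Awaitable[T]]) -> None:
        self.name = name
        self._getter = getter
        self._value: Optional[T] = None
        self._fetched = False

    async def fetch(self) -> T:
        # Only the first call reaches the getter; later calls reuse the result.
        if not self._fetched:
            self._value = await self._getter()
            self._fetched = True
        return self._value


calls = {"n": 0}


async def load_details() -> dict:
    """Simulated remote lookup that records how many times it was invoked."""
    calls["n"] += 1
    return {"name": "my_task", "version": "v1.0.0"}


async def main() -> dict:
    lazy = LazyValue("my_task", load_details)  # no lookup has happened yet
    first = await lazy.fetch()
    second = await lazy.fetch()  # served from the cached result
    assert first is second
    return first


details = asyncio.run(main())
print(details["version"], calls["n"])
```

Creating the handle is cheap and synchronous, which is why Task.get in the examples above is not awaited while fetch() is.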
Runs and Actions
A Run represents an execution instance of a task or workflow. An Action is a specific step or component within a Run.
- Understanding Runs and Actions: A Run encapsulates the overall execution, while Action objects provide granular details about individual operations within that run. RunDetails and ActionDetails offer comprehensive metadata and status information.
- Listing Runs and Actions: Retrieve lists of runs or actions, optionally filtered and sorted.

    from flyte.remote._run import Run
    async for r in Run.listall(limit=5):
        print(f"Run: {r.name}, Phase: {r.phase}")

    from flyte.remote._action import Action
    async for a in Action.listall(for_run_name="my-run-name"):
        print(f"Action: {a.name}, Phase: {a.phase}")

- Retrieving Run and Action Details: Fetch specific run or action details by name.

    from flyte.remote._run import Run
    run = await Run.get("my-specific-run")
    print(f"Run URL: {run.url}")

    from flyte.remote._action import Action
    action = await Action.get(run_name="my-specific-run", name="my-action-name")
    print(f"Action Runtime: {action.runtime}")

- Monitoring Execution:
  - wait(quiet=False, wait_for="terminal"): Blocks until the run or action reaches a terminal state (succeeded, failed, aborted). It provides a rich progress panel in the console. wait_for="running" can be used to wait until the action starts running.
  - watch(cache_data_on_done=False): Provides an asynchronous generator that yields ActionDetails updates as the execution progresses. This is useful for building custom monitoring UIs or logic.

    # Wait for a run to complete
    await run.wait()
    # Watch action updates
    async for ad in action.watch():
        print(f"Action {ad.name} is in phase: {ad.phase}")
        if ad.done():
            break

- Accessing Inputs and Outputs: Once an action is in a terminal state, its inputs and outputs can be retrieved. ActionInputs and ActionOutputs wrap the raw protocol buffer data with more accessible Python types.

    inputs = await action.inputs()
    print(f"Action Inputs: {inputs}")
    outputs = await action.outputs()
    print(f"Action Outputs: {outputs}")

- Aborting Runs: Terminate an ongoing run using the abort method.

    await run.abort()
    print(f"Run {run.name} aborted.")
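The watch-until-done loop can be tried locally against a simulated update stream. In this sketch FakeActionDetails, fake_watch, and the uppercase phase names are hypothetical stand-ins chosen for illustration; only the shape of the loop (async iteration, then break on done()) mirrors the usage described above.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, List

# Hypothetical phase names for the terminal states listed above.
TERMINAL_PHASES = {"SUCCEEDED", "FAILED", "ABORTED"}


@dataclass
class FakeActionDetails:
    """Stand-in for an update object exposing name, phase, and done()."""

    name: str
    phase: str

    def done(self) -> bool:
        return self.phase in TERMINAL_PHASES


async def fake_watch(name: str) -> AsyncIterator[FakeActionDetails]:
    """Simulated update stream: yields one update per phase change."""
    for phase in ("QUEUED", "RUNNING", "SUCCEEDED"):
        await asyncio.sleep(0)  # yield control, as a network wait would
        yield FakeActionDetails(name, phase)


async def wait_until_done(name: str) -> List[str]:
    """Collect phases until a terminal one is observed, then stop iterating."""
    seen: List[str] = []
    async for details in fake_watch(name):
        seen.append(details.phase)
        if details.done():
            break
    return seen


phases = asyncio.run(wait_until_done("my-action-name"))
print(phases)
```

Breaking out of the loop as soon as done() is true avoids waiting on a stream that may never yield again after the terminal update.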
Secrets
The platform provides two distinct Secret concepts:
- Local Secret Definition (Secret, core class): This class, defined in the core codebase, specifies how sensitive information is injected into tasks. Secrets can be mounted as environment variables or files.
  - key: The name of the secret in the secret store.
  - group: An optional organizational identifier.
  - as_env_var: The name of the environment variable the secret will be mounted as. If not specified, it defaults to KEY or GROUP_KEY (uppercase, hyphens replaced with underscores).
  - mount: Specifies a file path for mounting. (TODO: currently only /etc/flyte/secrets is supported.)

    from flyte import task, Secret
    import os
    @task(secrets=[Secret(key="my-api-key", as_env_var="API_KEY")])
    async def my_task_with_secret():
        api_key = os.environ.get("API_KEY")
        print(f"API Key (first 5 chars): {api_key[:5]}...")

- Remote Secret Management (Secret, flyte.remote._secret.Secret): This class provides an API for performing CRUD (Create, Read, Update, Delete) operations on secrets stored directly on the remote platform.
  - Creating Secrets: Store a new secret with a given name and value.

      from flyte.remote._secret import Secret
      await Secret.create(name="my-remote-secret", value="super-secret-value")

  - Retrieving Secrets: Fetch metadata about a secret by its name. Note that the actual secret value is not directly exposed for security reasons.

      remote_secret = await Secret.get(name="my-remote-secret")
      print(f"Remote Secret Name: {remote_secret.name}, Type: {remote_secret.type}")

  - Listing Secrets: Iterate through all secrets accessible within the current project and domain.

      async for s in Secret.listall():
          print(f"Found remote secret: {s.name}")

  - Deleting Secrets: Remove a secret from the remote store.

      await Secret.delete(name="my-remote-secret")
      print("Remote secret 'my-remote-secret' deleted.")
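The default naming rule for as_env_var (KEY or GROUP_KEY, uppercase, hyphens replaced with underscores) is easy to check in isolation. The helper below is a hypothetical re-statement of that rule for illustration, not the library's own function, and the group name "billing" is made up.

```python
from typing import Optional


def default_env_var(key: str, group: Optional[str] = None) -> str:
    """Build KEY or GROUP_KEY, uppercased, with hyphens turned into underscores."""
    parts = [group, key] if group else [key]
    return "_".join(parts).replace("-", "_").upper()


print(default_env_var("my-api-key"))                   # MY_API_KEY
print(default_env_var("my-api-key", group="billing"))  # BILLING_MY_API_KEY
```

Setting as_env_var explicitly, as in the task example above, sidesteps this derivation entirely.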
Monitoring and Logging
The platform supports streaming execution logs in real time and browsing them in an interactive viewer.
Viewing Execution Logs
The Logs utility provides methods for streaming and viewing logs associated with Actions.
- Tailing Logs: Stream log lines in real-time.

    from flyte.remote._logs import Logs
    from flyte.core.identifier import ActionIdentifier, RunIdentifier
    action_id = ActionIdentifier(
        run=RunIdentifier(org="my-org", project="my-project", domain="development", name="my-run-name"),
        name="my-action-name"
    )
    async for line in Logs.tail(action_id=action_id):
        print(line.message, end="")

- Interactive Log Viewer: The create_viewer method launches an interactive log viewer, especially useful in environments like Jupyter notebooks. It supports options for maximum lines, timestamps, raw output, and filtering system logs.

    # Launch an interactive log viewer for an action
    await Logs.create_viewer(
        action_id=action_id,
        attempt=1,
        max_lines=50,
        show_ts=True,
        filter_system=True,
        panel=True  # Display in a rich panel
    )

  The AsyncLogViewer class powers this interactive experience, managing the display and scrolling of log lines.
Utility and Best Practices
Asynchronous Operations
Many methods in the remote API are asynchronous (async def). This design allows for efficient handling of network I/O and long-running operations without blocking the main thread. When using these methods, ensure your code is within an async context and uses await. Synchronous wrappers are often provided via the @syncify decorator for convenience in non-async contexts.
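To make the async/sync distinction concrete, here is a minimal sketch of a synchronous wrapper around a coroutine function. It is an assumption about the general idea only: run_sync and get_project_name are hypothetical names, and the actual @syncify decorator may work differently (for example, by using a background event loop).

```python
import asyncio
import functools


def run_sync(fn):
    """Hypothetical wrapper: runs the wrapped coroutine function to completion."""

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        # asyncio.run starts a fresh event loop, so this wrapper must not be
        # called from code that is already running inside an event loop.
        return asyncio.run(fn(*args, **kwargs))

    return wrapper


async def get_project_name(name: str) -> str:
    """Simulated async API call."""
    await asyncio.sleep(0)  # placeholder for network I/O
    return name.upper()


get_project_name_sync = run_sync(get_project_name)

print(get_project_name_sync("my-project-name"))
```

In a script, the snippets in this document that use a bare await would similarly need to live inside an async function that is started with asyncio.run.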
Error Handling
The API uses standard Python exceptions for error conditions. Specific authentication errors, such as AuthenticationError, AccessTokenNotFoundError, and AuthenticationPending, are raised for issues during the authentication process. gRPC-specific errors (grpc.aio.AioRpcError) may also be raised for communication failures. Implement appropriate try...except blocks to handle these gracefully.
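One way to structure such handling is to retry while authentication is still pending and to surface a hard error once attempts run out. The sketch below defines local stand-in exception classes so that it runs on its own; they only borrow the names mentioned above, and make_login and login_with_retry are hypothetical helpers.

```python
import asyncio


class AuthenticationError(Exception):
    """Local stand-in; the real class is provided by the client library."""


class AuthenticationPending(Exception):
    """Local stand-in: the user has not finished logging in yet."""


def make_login(ready_after: int):
    """Build a simulated login coroutine that succeeds on the Nth attempt."""
    calls = {"n": 0}

    async def login() -> str:
        calls["n"] += 1
        if calls["n"] < ready_after:
            raise AuthenticationPending("user has not completed login")
        return "access-token"

    return login


async def login_with_retry(login, max_attempts: int = 5, delay: float = 0.0) -> str:
    """Retry while authentication is pending; fail once attempts are exhausted."""
    for _ in range(max_attempts):
        try:
            return await login()
        except AuthenticationPending:
            await asyncio.sleep(delay)
    raise AuthenticationError(f"login still pending after {max_attempts} attempts")


token = asyncio.run(login_with_retry(make_login(ready_after=3)))
print(token)
```

Communication failures raised as grpc.aio.AioRpcError would typically be caught in a separate except clause, since they call for different recovery than a pending login.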
Serialization (ToJSONMixin)
Many remote entity classes inherit from ToJSONMixin, providing convenient methods to convert objects into JSON-serializable dictionaries (to_dict()) or JSON strings (to_json()). This is useful for debugging, logging, or integrating with other systems that consume JSON.
from flyte.remote._project import Project
project = await Project.get("my-project-name")
print(project.to_json())
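The idea behind such a mixin can be sketched with dataclasses. This is an illustrative assumption, not the ToJSONMixin source (which may wrap protocol buffer messages instead); JSONMixin and ProjectSummary are hypothetical names, and the state value is made up.

```python
import json
from dataclasses import asdict, dataclass


class JSONMixin:
    """Hypothetical mixin offering to_dict()/to_json() for dataclass entities."""

    def to_dict(self) -> dict:
        # asdict recursively converts dataclass fields into plain Python types.
        return asdict(self)

    def to_json(self) -> str:
        return json.dumps(self.to_dict(), sort_keys=True)


@dataclass
class ProjectSummary(JSONMixin):
    name: str
    state: str


summary = ProjectSummary(name="my-project-name", state="ACTIVE")
print(summary.to_json())
```

Keeping to_json() as a thin layer over to_dict() means any entity that can describe itself as a dictionary gets JSON output for free.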