Monitoring and Managing Runs
Runs represent individual executions of tasks within the system. Runs are managed remotely, and the API lets you monitor their status, inspect their details, and control their lifecycle.
Retrieving Runs
The system provides methods to retrieve single runs or lists of runs, allowing developers to programmatically access execution data.
Listing All Runs
To retrieve a collection of runs, use the Run.listall class method. This method returns an asynchronous iterator of Run objects, allowing for efficient processing of potentially large result sets.
import asyncio

async def get_all_runs():
    # Retrieve up to 100 runs, sorted by creation date in ascending order
    async for run in Run.listall(limit=100, sort_by=("created_at", "asc")):
        print(f"Run Name: {run.name}, Phase: {run.phase}")

# To run in a synchronous context:
# for run in Run.listall(limit=100):
#     print(f"Run Name: {run.name}, Phase: {run.phase}")
The listall method supports filtering and sorting to refine the results:
filters: A string to apply custom filtering logic.
sort_by: A tuple (field, order) to specify sorting criteria (e.g., ("created_at", "desc")).
limit: The maximum number of runs to return.
Getting a Specific Run
To retrieve a single run by its name, use the Run.get class method:
import asyncio

async def get_single_run(run_name: str):
    try:
        run = await Run.get(name=run_name)
        print(f"Retrieved Run: {run.name}, Current Phase: {run.phase}")
    except Exception as e:
        print(f"Error retrieving run {run_name}: {e}")

# To run in a synchronous context:
# run = Run.get(name="my-specific-run-name")
Accessing Run Details
The RunDetails class provides a more comprehensive view of a run's configuration and metadata. You can retrieve RunDetails directly or from an existing Run object.
To get RunDetails directly by name:
import asyncio

async def get_run_details_by_name(run_name: str):
    details = await RunDetails.get(name=run_name)
    print(f"Run Details for {details.name}:")
    print(f"  Task Name: {details.task_name}")
    print(f"  Labels: {details.pb2.run_spec.labels}")
    print(f"  Interruptible: {details.pb2.run_spec.interruptible}")

# To run in a synchronous context:
# details = RunDetails.get(name="my-specific-run-name")
To get RunDetails from an existing Run object:
import asyncio

async def get_details_from_run(run_name: str):
    run = await Run.get(name=run_name)
    details = await run.details()
    print(f"Details from Run object for {details.name}:")
    print(f"  Environment Variables: {details.pb2.run_spec.envs}")

# To run in a synchronous context:
# run = Run.get(name="my-specific-run-name")
# details = run.details()
Monitoring Run Status
Checking a run's status shows how far a task execution has progressed and how it ended.
Checking Run Phase
The Run object exposes properties to check its current execution phase:
run.phase: Returns a human-readable string representing the current phase (e.g., "RUNNING", "SUCCEEDED", "FAILED").
run.raw_phase: Returns the underlying protocol buffer enum value for the phase, useful for programmatic comparisons.
import asyncio

# Note: run_definition_pb2 is the generated protocol buffer module that defines
# the Phase enum; import it from wherever your installation exposes it.

async def check_run_phase(run_name: str):
    run = await Run.get(name=run_name)
    print(f"Run '{run.name}' is currently in phase: {run.phase}")
    if run.raw_phase == run_definition_pb2.Phase.RUNNING:
        print("The run is actively executing.")

# To run in a synchronous context:
# run = Run.get(name="my-specific-run-name")
# print(run.phase)
Waiting for Run Completion
The run.wait() method blocks execution until the run reaches a terminal state (succeeded, failed, aborted) or a specified state. It provides a rich progress panel in the console, showing status transitions and elapsed time.
import asyncio

async def wait_for_run_completion(run_name: str):
    run = await Run.get(name=run_name)
    print(f"Waiting for run '{run.name}' to complete...")
    await run.wait()
    print(f"Run '{run.name}' has completed with phase: {run.phase}")

# To run in a synchronous context:
# run = Run.get(name="my-specific-run-name")
# run.wait()
You can also wait for a run to enter the "running" state:
import asyncio

async def wait_for_run_to_start(run_name: str):
    run = await Run.get(name=run_name)
    print(f"Waiting for run '{run.name}' to start running...")
    await run.wait(wait_for="running")
    print(f"Run '{run.name}' is now in phase: {run.phase}")
Streaming Run Updates
For real-time monitoring, the run.watch() method provides an asynchronous generator that yields ActionDetails objects as the run progresses. This allows for custom handling of status changes without blocking.
import asyncio

async def watch_run_progress(run_name: str):
    run = await Run.get(name=run_name)
    print(f"Watching run '{run.name}'...")
    async for action_details in run.watch():
        print(f"  Current Phase: {action_details.phase}, Last Updated: {action_details.pb2.status.updated_at.ToDatetime()}")
        if action_details.done():
            # Use run.name here; run_details is not defined in this function
            print(f"Run '{run.name}' has reached a terminal state.")
            break
Checking if a Run is Done
Both Run and RunDetails objects provide a done() method to quickly check if a run has reached a terminal state (succeeded, failed, aborted).
import asyncio

async def check_if_run_is_done(run_name: str):
    run = await Run.get(name=run_name)
    if run.done():
        print(f"Run '{run.name}' is already done.")
    else:
        print(f"Run '{run.name}' is still active.")
Inspecting Run Details
Beyond basic status, you can inspect the specifics of a run's execution and configuration.
Viewing Logs
The run.show_logs() method displays the logs generated by the run. This is essential for debugging and understanding execution flow.
import asyncio

async def view_run_logs(run_name: str):
    run = await Run.get(name=run_name)
    print(f"Displaying logs for run '{run.name}':")
    await run.show_logs(max_lines=50, show_ts=True)  # Show last 50 lines with timestamps

# To run in a synchronous context:
# run = Run.get(name="my-specific-run-name")
# run.show_logs()
Parameters like attempt, max_lines, show_ts, raw, and filter_system allow for fine-grained control over log retrieval and display.
Accessing Inputs and Outputs
The RunDetails object provides inputs() and outputs() methods. These are designed to retrieve the data passed into and produced by the run.
import asyncio

async def get_run_io(run_name: str):
    details = await RunDetails.get(name=run_name)
    # Note: inputs() and outputs() are placeholders and will be extended
    # to handle actual data retrieval from the run context.
    run_inputs = await details.inputs()
    run_outputs = await details.outputs()
    print(f"Run '{details.name}' Inputs: {run_inputs}")
    print(f"Run '{details.name}' Outputs: {run_outputs}")
Getting the Run UI URL
Each run has a corresponding URL in the web UI, which can be accessed via the run.url property. This is useful for quickly navigating to the visual representation of a run.
import asyncio

async def get_run_ui_url(run_name: str):
    run = await Run.get(name=run_name)
    print(f"View run '{run.name}' in the UI: {run.url}")
Managing Runs
Beyond monitoring, you can also take actions to control the lifecycle of a run.
Aborting a Run
To stop a currently executing run, use the run.abort() method. This sends a signal to terminate the run gracefully.
import asyncio

async def abort_a_run(run_name: str):
    run = await Run.get(name=run_name)
    if not run.done():
        print(f"Attempting to abort run '{run.name}'...")
        await run.abort()
        print(f"Abort request sent for run '{run.name}'.")
        await run.wait()  # Optionally wait for the run to reach a terminal state
        print(f"Run '{run.name}' is now in phase: {run.phase}")
    else:
        print(f"Run '{run.name}' is already in a terminal state ({run.phase}), cannot abort.")

# To run in a synchronous context:
# run = Run.get(name="my-specific-run-name")
# run.abort()