Client Configuration and Authentication
Client Configuration and Authentication
Client applications interact with various backend services through a unified client interface. This interface manages connections and provides access to specific service clients, while authentication mechanisms ensure secure access.
ClientSet: Service Interaction and Initialization
The ClientSet class serves as the primary entry point for client applications to interact with backend services. It encapsulates the gRPC channel management and provides access to various service stubs.
A ClientSet instance is initialized with a gRPC channel and an endpoint. It then creates stubs for core services like AdminService, TaskService, RunService, DataProxyService, RunLogsService, and SecretService.
Initialization Methods:
-
ClientSet.for_endpoint(endpoint: str, *, insecure: bool = False, **kwargs) -> ClientSetThis class method creates aClientSetinstance by establishing a gRPC channel to the specifiedendpoint. It is suitable for scenarios where the endpoint is known directly, and authentication might be handled separately or is not required (e.g., insecure connections for local development).from flyte.remote._client.client import ClientSet
import asyncio
async def get_client_for_endpoint():
# Example: Connecting to a local or known endpoint
client_set = await ClientSet.for_endpoint("localhost:8080", insecure=True)
# Use client_set.task_service, client_set.run_service, etc.
print(f"Connected to endpoint: {client_set.endpoint}")
await client_set.close()
# asyncio.run(get_client_for_endpoint()) -
ClientSet.for_api_key(api_key: str, *, insecure: bool = False, **kwargs) -> ClientSetThis method facilitates connection using an API key. It decodes the API key to extract the endpoint and then establishes a gRPC channel. This is useful for programmatic access where an API key is the primary authentication mechanism.from flyte.remote._client.client import ClientSet
import asyncio
async def get_client_for_api_key(my_api_key: str):
# Example: Connecting using an API key
client_set = await ClientSet.for_api_key(my_api_key)
print(f"Connected using API key to endpoint: {client_set.endpoint}")
await client_set.close()
# Replace "YOUR_API_KEY" with an actual API key
# asyncio.run(get_client_for_api_key("YOUR_API_KEY"))
Service Access:
Once initialized, ClientSet provides properties to access specific gRPC service stubs:
client_set.metadata_serviceclient_set.project_domain_serviceclient_set.task_serviceclient_set.run_serviceclient_set.dataproxy_serviceclient_set.logs_serviceclient_set.secrets_service
These properties return the respective service stubs, allowing direct interaction with the backend APIs.
Resource Management:
The close(grace: float | None = None) method allows for graceful shutdown of the underlying gRPC channel, releasing resources.
ClientConfig: Defining Authentication Parameters
The ClientConfig class is a Pydantic model that defines the necessary configuration parameters for various authentication flows, particularly those based on OAuth 2.0 and OpenID Connect (OIDC).
Key Fields:
token_endpoint: The OAuth 2.0 token endpoint URL.authorization_endpoint: The OAuth 2.0 authorization endpoint URL.redirect_uri: The URI where the authorization server redirects the user after granting authorization.client_id: The client identifier issued by the authorization server.device_authorization_endpoint: (Optional) The endpoint for the OAuth 2.0 Device Authorization Grant flow.scopes: (Optional) A list of scopes to request during authentication (e.g.,["offline_access", "openid"]).header_key: The HTTP header key used for sending the access token (defaults to "authorization").audience: (Optional) The audience for which the access token is intended.
Configuration Overrides:
The with_override(other: ClientConfig) -> ClientConfig method allows merging two ClientConfig instances. Values from the other instance take precedence, enabling flexible configuration management where default settings can be overridden by more specific ones.
Credentials and KeyringStore: Token Management
Credentials:
The Credentials class stores authentication tokens and related information:
access_token: The primary token used for API requests.refresh_token: (Optional) A token used to obtain new access tokens without re-authenticating the user.expires_in: (Optional) The lifetime in seconds of the access token.for_endpoint: The endpoint URL associated with these credentials.id: A unique identifier for the credentials, computed as an MD5 hash of theaccess_token. This ID is used internally by authenticators to track token freshness.
KeyringStore:
The KeyringStore utility provides a secure and persistent way to manage Credentials using the system keyring. This prevents users from needing to re-authenticate frequently.
-
KeyringStore.store(credentials: Credentials) -> CredentialsStores theaccess_tokenandrefresh_token(if present) in the system keyring. Thefor_endpointvalue is used as the service name for keyring entries. This method logs errors if the keyring is unavailable but does not raise exceptions, allowing the application to continue without persistent caching. -
KeyringStore.retrieve(for_endpoint: str) -> typing.Optional[Credentials]Retrieves storedaccess_tokenandrefresh_tokenfor a givenfor_endpointfrom the system keyring. It returns aCredentialsobject if tokens are found, otherwiseNone. -
KeyringStore.delete(for_endpoint: str)Removes all stored credentials associated with the specifiedfor_endpointfrom the system keyring.
Best Practice: Using KeyringStore is recommended for client applications to enhance user experience by securely caching authentication tokens.
Authenticator: The Foundation of Client Authentication
The Authenticator is an abstract base class that provides the core framework for managing authentication state, resolving configuration, and refreshing tokens. All specific authentication flows inherit from this class.
Initialization:
An Authenticator is initialized with:
endpoint: The target service endpoint.cfg_store: An optionalClientConfigStoreto fetch remote client configurations.client_config: An optional localClientConfigto override remote settings.credentials: Optional initialCredentials. If not provided,KeyringStore.retrieveattempts to load existing credentials.- HTTP session parameters (
http_session,http_proxy_url,verify,ca_cert_path, etc.).
Key Responsibilities:
- Configuration Resolution (
_resolve_config): Merges localClientConfigwith remote configuration fetched fromcfg_store. This process is thread-safe and coroutine-safe, ensuring the configuration is resolved only once. - Credential Management: Provides
get_credentials()to retrieve the current tokens and_set_credentials()to update them. - gRPC Metadata Generation (
get_grpc_call_auth_metadata): Formats the access token into gRPC-compatible metadata (e.g.,("authorization", "Bearer <access_token>")) for secure gRPC calls. It also includes acreds_idfor tracking. - Thread-Safe Credential Refresh (
refresh_credentials): This critical method handles token expiry. It implements a double-check locking mechanism to ensure that only one thread or coroutine attempts to refresh credentials at a time, preventing redundant network calls. If a refresh is needed, it calls the abstract_do_refresh_credentialsmethod implemented by subclasses. Upon successful refresh, it stores the new credentials inKeyringStore. If refresh fails, it deletes the old credentials fromKeyringStoreand re-raises the exception.
Subclass Implementation (_do_refresh_credentials):
Subclasses must implement the _do_refresh_credentials() abstract method. This method contains the specific logic for obtaining or refreshing an access token according to the chosen authentication flow. It should return a Credentials object.
PKCEAuthenticator: Interactive Browser-Based Authentication
The PKCEAuthenticator implements the OAuth 2.0 Proof Key for Code Exchange (PKCE) flow, which is suitable for public clients (like desktop or mobile apps) that cannot securely store a client secret. This flow typically involves opening a browser window for user interaction.
Use Case: User-facing applications where interactive login via a web browser is acceptable.
Flow:
- Generates a
code_verifierandcode_challenge. - Attempts to refresh existing credentials using a refresh token.
- If refresh fails or no credentials exist, it initiates the PKCE flow:
- Constructs an authorization URL with the
code_challenge. - Opens the URL in a browser, prompting the user to log in.
- After successful login, the authorization server redirects to a
redirect_uriwith an authorization code. - Exchanges the authorization code and
code_verifierfor anaccess_tokenandrefresh_tokenat the token endpoint.
- Constructs an authorization URL with the
- Stores the new
CredentialsinKeyringStore.
Configuration: Requires client_id, redirect_uri, authorization_endpoint, token_endpoint, scopes, and audience to be defined in the ClientConfig. For persistent sessions, scopes should include "offline_access" or similar to obtain a refresh token.
from flyte.remote._client.auth.pkce import PKCEAuthenticator
from flyte.remote._client.auth.client_config import ClientConfig
import asyncio
async def authenticate_with_pkce():
# Example ClientConfig (these values would typically come from a backend configuration)
client_cfg = ClientConfig(
token_endpoint="https://your-auth-server.com/oauth/token",
authorization_endpoint="https://your-auth-server.com/authorize",
redirect_uri="http://localhost:8080/callback",
client_id="YOUR_CLIENT_ID",
scopes=["openid", "profile", "email", "offline_access"],
audience="https://your-api-audience.com"
)
# Initialize PKCEAuthenticator
authenticator = PKCEAuthenticator(
endpoint="your-flyte-endpoint.com",
client_config=client_cfg,
# cfg_store=... (if remote config is used)
)
# Force a refresh to initiate the flow or use existing tokens
await authenticator.refresh_credentials()
creds = authenticator.get_credentials()
if creds:
print(f"PKCE Authentication successful. Access Token ID: {creds.id}")
else:
print("PKCE Authentication failed.")
# asyncio.run(authenticate_with_pkce())
DeviceCodeAuthenticator: Headless Device Authentication
The DeviceCodeAuthenticator implements the OAuth 2.0 Device Authorization Grant flow, designed for input-constrained devices or command-line applications where a web browser is not readily available for direct interaction.
Use Case: CLI tools, IoT devices, or other headless environments.
Flow:
- Attempts to refresh existing credentials using a refresh token.
- If refresh fails or no credentials exist, it initiates the device code flow:
- Requests a device code from the
device_authorization_endpoint. - Displays a user code and a verification URI to the user.
- Prompts the user to visit the URI on a separate device (e.g., their computer or phone) and enter the user code.
- Polls the token endpoint until the user completes authentication or the device code expires.
- Requests a device code from the
- Upon successful authentication, it obtains an
access_tokenandrefresh_token. - Stores the new
CredentialsinKeyringStore.
Configuration: Requires device_authorization_endpoint, token_endpoint, client_id, scopes, and audience to be defined in the ClientConfig. The device_authorization_endpoint must be configured on the backend.
from flyte.remote._client.auth.device_code import DeviceCodeAuthenticator
from flyte.remote._client.auth.client_config import ClientConfig
import asyncio
async def authenticate_with_device_code():
# Example ClientConfig
client_cfg = ClientConfig(
token_endpoint="https://your-auth-server.com/oauth/token",
device_authorization_endpoint="https://your-auth-server.com/oauth/device/code",
client_id="YOUR_CLIENT_ID_FOR_DEVICE_FLOW",
scopes=["openid", "profile", "offline_access"],
audience="https://your-api-audience.com"
)
# Initialize DeviceCodeAuthenticator
authenticator = DeviceCodeAuthenticator(
endpoint="your-flyte-endpoint.com",
client_config=client_cfg,
)
# Force a refresh to initiate the flow or use existing tokens
await authenticator.refresh_credentials()
creds = authenticator.get_credentials()
if creds:
print(f"Device Code Authentication successful. Access Token ID: {creds.id}")
else:
print("Device Code Authentication failed.")
# asyncio.run(authenticate_with_device_code())
ClientCredentialsAuthenticator: Machine-to-Machine Authentication
The ClientCredentialsAuthenticator implements the OAuth 2.0 Client Credentials Grant flow, designed for machine-to-machine authentication where a client (e.g., a service, daemon, or another application) needs to access resources without user involvement.
Use Case: Service accounts, background services, or applications authenticating directly with a client ID and secret.
Flow:
- Uses the provided
client_idandclient_credentials_secretto construct a basic authorization header. - Sends a request to the
token_endpointwith the authorization header and grant typeclient_credentials. - Receives an
access_token(and optionallyrefresh_tokenandexpires_in). - Stores the new
CredentialsinKeyringStore.
Configuration: Requires client_id and client_credentials_secret to be passed directly during initialization. The token_endpoint, scopes, and audience are typically defined in the ClientConfig.
Security Consideration: The client_credentials_secret is a sensitive credential and should be handled securely, for example, by loading it from environment variables or a secret management system, rather than hardcoding it.
from flyte.remote._client.auth.client_credentials import ClientCredentialsAuthenticator
from flyte.remote._client.auth.client_config import ClientConfig
import asyncio
import os
async def authenticate_with_client_credentials():
# Load client ID and secret securely (e.g., from environment variables)
client_id = os.getenv("FLYTE_CLIENT_ID", "YOUR_CLIENT_ID")
client_secret = os.getenv("FLYTE_CLIENT_SECRET", "YOUR_CLIENT_SECRET")
# Example ClientConfig
client_cfg = ClientConfig(
token_endpoint="https://your-auth-server.com/oauth/token",
client_id=client_id, # Note: client_id is also passed to the authenticator directly
scopes=["api_scope_1", "api_scope_2"],
audience="https://your-api-audience.com"
)
# Initialize ClientCredentialsAuthenticator
authenticator = ClientCredentialsAuthenticator(
endpoint="your-flyte-endpoint.com",
client_id=client_id,
client_credentials_secret=client_secret,
client_config=client_cfg,
)
# Force a refresh to obtain new tokens
await authenticator.refresh_credentials()
creds = authenticator.get_credentials()
if creds:
print(f"Client Credentials Authentication successful. Access Token ID: {creds.id}")
else:
print("Client Credentials Authentication failed.")
# asyncio.run(authenticate_with_client_credentials())
AsyncCommandAuthenticator: Custom Token Retrieval
The AsyncCommandAuthenticator provides a flexible mechanism to obtain an access token by executing an external command. This is useful for integrating with custom identity providers, existing authentication tools, or complex token generation processes.
Use Case: When standard OAuth 2.0 flows are not sufficient, and an external script or tool can provide the access token.
Flow:
- Executes the specified
commandas a subprocess. - Captures the standard output (stdout) of the command.
- Treats the stdout content (stripped of whitespace) as the
access_token. - Creates
Credentialswith the obtained token. - Stores the new
CredentialsinKeyringStore.
Configuration: Requires a command (a list of strings representing the command and its arguments) during initialization.
Considerations:
- The command must output a valid access token to stdout.
- Error handling is included: if the command returns a non-zero exit code or raises an exception, an
AuthenticationErroris raised with debugging information. - The command is executed asynchronously using
asyncio.create_subprocess_exec.
from flyte.remote._client.auth.command import AsyncCommandAuthenticator
import asyncio
import os
async def authenticate_with_command():
# Example: A hypothetical command that outputs an access token
# In a real scenario, this might be 'gcloud auth print-access-token' or a custom script.
# For demonstration, we'll use a simple echo command.
# Ensure the command is executable and outputs the token to stdout.
command = ["echo", "my_custom_access_token_from_command"]
# Initialize AsyncCommandAuthenticator
authenticator = AsyncCommandAuthenticator(
endpoint="your-flyte-endpoint.com",
command=command,
)
# Force a refresh to execute the command and obtain tokens
await authenticator.refresh_credentials()
creds = authenticator.get_credentials()
if creds:
print(f"Command Authentication successful. Access Token: {creds.access_token}")
else:
print("Command Authentication failed.")
# asyncio.run(authenticate_with_command())