Client Configuration and Authentication

Client applications interact with various backend services through a unified client interface. This interface manages connections and provides access to specific service clients, while authentication mechanisms ensure secure access.

ClientSet: Service Interaction and Initialization

The ClientSet class is the primary entry point for client applications that interact with backend services. It encapsulates gRPC channel management and exposes the individual service stubs.

A ClientSet instance is initialized with a gRPC channel and an endpoint. It then creates stubs for core services like AdminService, TaskService, RunService, DataProxyService, RunLogsService, and SecretService.

Initialization Methods:

  • ClientSet.for_endpoint(endpoint: str, *, insecure: bool = False, **kwargs) -> ClientSet This class method creates a ClientSet instance by establishing a gRPC channel to the specified endpoint. It is suitable for scenarios where the endpoint is known directly, and authentication might be handled separately or is not required (e.g., insecure connections for local development).

    from flyte.remote._client.client import ClientSet
    import asyncio

    async def get_client_for_endpoint():
        # Example: Connecting to a local or known endpoint
        client_set = await ClientSet.for_endpoint("localhost:8080", insecure=True)
        # Use client_set.task_service, client_set.run_service, etc.
        print(f"Connected to endpoint: {client_set.endpoint}")
        await client_set.close()

    # asyncio.run(get_client_for_endpoint())

  • ClientSet.for_api_key(api_key: str, *, insecure: bool = False, **kwargs) -> ClientSet This method facilitates connection using an API key. It decodes the API key to extract the endpoint and then establishes a gRPC channel. This is useful for programmatic access where an API key is the primary authentication mechanism.

    from flyte.remote._client.client import ClientSet
    import asyncio

    async def get_client_for_api_key(my_api_key: str):
        # Example: Connecting using an API key
        client_set = await ClientSet.for_api_key(my_api_key)
        print(f"Connected using API key to endpoint: {client_set.endpoint}")
        await client_set.close()

    # Replace "YOUR_API_KEY" with an actual API key
    # asyncio.run(get_client_for_api_key("YOUR_API_KEY"))

Service Access:

Once initialized, ClientSet provides properties to access specific gRPC service stubs:

  • client_set.metadata_service
  • client_set.project_domain_service
  • client_set.task_service
  • client_set.run_service
  • client_set.dataproxy_service
  • client_set.logs_service
  • client_set.secrets_service

These properties return the respective service stubs, allowing direct interaction with the backend APIs.

Resource Management:

The close(grace: float | None = None) method allows for graceful shutdown of the underlying gRPC channel, releasing resources.

ClientConfig: Defining Authentication Parameters

The ClientConfig class is a Pydantic model that defines the necessary configuration parameters for various authentication flows, particularly those based on OAuth 2.0 and OpenID Connect (OIDC).

Key Fields:

  • token_endpoint: The OAuth 2.0 token endpoint URL.
  • authorization_endpoint: The OAuth 2.0 authorization endpoint URL.
  • redirect_uri: The URI where the authorization server redirects the user after granting authorization.
  • client_id: The client identifier issued by the authorization server.
  • device_authorization_endpoint: (Optional) The endpoint for the OAuth 2.0 Device Authorization Grant flow.
  • scopes: (Optional) A list of scopes to request during authentication (e.g., ["offline_access", "openid"]).
  • header_key: The HTTP header key used for sending the access token (defaults to "authorization").
  • audience: (Optional) The audience for which the access token is intended.

Configuration Overrides:

The with_override(other: ClientConfig) -> ClientConfig method allows merging two ClientConfig instances. Values from the other instance take precedence, enabling flexible configuration management where default settings can be overridden by more specific ones.
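
The override semantics can be illustrated with a small stand-in. This is only a sketch: ConfigSketch is a hypothetical dataclass with a subset of the fields listed above, not the real Pydantic ClientConfig, and it assumes that only non-empty values in the other instance replace existing ones. The actual merge rules may differ.

```python
from dataclasses import dataclass, fields, replace
from typing import List, Optional


@dataclass(frozen=True)
class ConfigSketch:
    """Hypothetical stand-in for ClientConfig (not the real model)."""

    token_endpoint: str = ""
    client_id: str = ""
    scopes: Optional[List[str]] = None
    audience: Optional[str] = None

    def with_override(self, other: "ConfigSketch") -> "ConfigSketch":
        # Assumption: empty strings and None in `other` mean "not set",
        # so only non-empty values take precedence over `self`.
        overrides = {
            f.name: getattr(other, f.name)
            for f in fields(other)
            if getattr(other, f.name)
        }
        return replace(self, **overrides)


# Defaults (for example, fetched remotely) merged with a local override.
remote = ConfigSketch(
    token_endpoint="https://auth.example.com/token",
    client_id="remote-id",
    scopes=["openid"],
)
local = ConfigSketch(client_id="local-id")
merged = remote.with_override(local)
print(merged.client_id)  # local-id
```

The local client_id wins, while token_endpoint and scopes are kept from the defaults because the override leaves them unset.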

Credentials and KeyringStore: Token Management

Credentials:

The Credentials class stores authentication tokens and related information:

  • access_token: The primary token used for API requests.
  • refresh_token: (Optional) A token used to obtain new access tokens without re-authenticating the user.
  • expires_in: (Optional) The lifetime in seconds of the access token.
  • for_endpoint: The endpoint URL associated with these credentials.
  • id: A unique identifier for the credentials, computed as an MD5 hash of the access_token. This ID is used internally by authenticators to track token freshness.
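
As a rough illustration of the id field, the sketch below derives an identifier from the access token with MD5. CredentialsSketch is a hypothetical stand-in; the UTF-8 encoding and hex digest format are assumptions, since the text above only states that the ID is an MD5 hash of the access_token.

```python
import hashlib
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class CredentialsSketch:
    """Hypothetical stand-in for Credentials (not the real class)."""

    access_token: str
    for_endpoint: str
    refresh_token: Optional[str] = None
    expires_in: Optional[int] = None
    id: str = field(init=False, default="")

    def __post_init__(self) -> None:
        # The hash is an identifier only, not a security measure.
        # Assumption: UTF-8 bytes in, hex digest out.
        self.id = hashlib.md5(
            self.access_token.encode("utf-8"), usedforsecurity=False
        ).hexdigest()


old = CredentialsSketch(access_token="token-a", for_endpoint="example-endpoint")
new = CredentialsSketch(access_token="token-b", for_endpoint="example-endpoint")
print(old.id != new.id)  # True
```

Because the ID changes whenever the token changes, a caller can compare IDs to tell whether the credentials it used are still the current ones without handling the token itself.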

KeyringStore:

The KeyringStore utility provides a secure and persistent way to manage Credentials using the system keyring. This prevents users from needing to re-authenticate frequently.

  • KeyringStore.store(credentials: Credentials) -> Credentials Stores the access_token and refresh_token (if present) in the system keyring. The for_endpoint value is used as the service name for keyring entries. This method logs errors if the keyring is unavailable but does not raise exceptions, allowing the application to continue without persistent caching.

  • KeyringStore.retrieve(for_endpoint: str) -> typing.Optional[Credentials] Retrieves stored access_token and refresh_token for a given for_endpoint from the system keyring. It returns a Credentials object if tokens are found, otherwise None.

  • KeyringStore.delete(for_endpoint: str) Removes all stored credentials associated with the specified for_endpoint from the system keyring.
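
The store, retrieve, and delete behavior described above might look roughly like the following. This sketch uses a hypothetical in-memory backend so that it runs anywhere; its method names mirror the set_password, get_password, and delete_password functions of the keyring package. The entry names "access_token" and "refresh_token" are assumptions.

```python
import logging
from typing import Dict, Optional, Tuple

logger = logging.getLogger(__name__)


class InMemoryKeyring:
    """Hypothetical backend standing in for the system keyring."""

    def __init__(self) -> None:
        self._items: Dict[Tuple[str, str], str] = {}

    def set_password(self, service: str, key: str, value: str) -> None:
        self._items[(service, key)] = value

    def get_password(self, service: str, key: str) -> Optional[str]:
        return self._items.get((service, key))

    def delete_password(self, service: str, key: str) -> None:
        self._items.pop((service, key), None)


class TokenCacheSketch:
    """Sketch of a token cache keyed by endpoint (service name)."""

    def __init__(self, backend: InMemoryKeyring) -> None:
        self._backend = backend

    def store(self, for_endpoint: str, access_token: str,
              refresh_token: Optional[str] = None) -> None:
        try:
            self._backend.set_password(for_endpoint, "access_token", access_token)
            if refresh_token:
                self._backend.set_password(for_endpoint, "refresh_token", refresh_token)
        except Exception as exc:
            # Log and continue: caching is optional, authentication is not.
            logger.debug("Keyring unavailable, tokens not cached: %s", exc)

    def retrieve(self, for_endpoint: str) -> Optional[Dict[str, Optional[str]]]:
        access = self._backend.get_password(for_endpoint, "access_token")
        if not access:
            return None
        refresh = self._backend.get_password(for_endpoint, "refresh_token")
        return {"access_token": access, "refresh_token": refresh}

    def delete(self, for_endpoint: str) -> None:
        for key in ("access_token", "refresh_token"):
            self._backend.delete_password(for_endpoint, key)
```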

Best Practice: Using KeyringStore is recommended for client applications to enhance user experience by securely caching authentication tokens.

Authenticator: The Foundation of Client Authentication

The Authenticator is an abstract base class that provides the core framework for managing authentication state, resolving configuration, and refreshing tokens. All specific authentication flows inherit from this class.

Initialization:

An Authenticator is initialized with:

  • endpoint: The target service endpoint.
  • cfg_store: An optional ClientConfigStore to fetch remote client configurations.
  • client_config: An optional local ClientConfig to override remote settings.
  • credentials: Optional initial Credentials. If not provided, KeyringStore.retrieve attempts to load existing credentials.
  • HTTP session parameters (http_session, http_proxy_url, verify, ca_cert_path, etc.).

Key Responsibilities:

  • Configuration Resolution (_resolve_config): Merges local ClientConfig with remote configuration fetched from cfg_store. This process is thread-safe and coroutine-safe, ensuring the configuration is resolved only once.
  • Credential Management: Provides get_credentials() to retrieve the current tokens and _set_credentials() to update them.
  • gRPC Metadata Generation (get_grpc_call_auth_metadata): Formats the access token into gRPC-compatible metadata (e.g., ("authorization", "Bearer <access_token>")) for secure gRPC calls. It also includes a creds_id for tracking.
  • Thread-Safe Credential Refresh (refresh_credentials): This critical method handles token expiry. It implements a double-check locking mechanism to ensure that only one thread or coroutine attempts to refresh credentials at a time, preventing redundant network calls. If a refresh is needed, it calls the abstract _do_refresh_credentials method implemented by subclasses. Upon successful refresh, it stores the new credentials in KeyringStore. If refresh fails, it deletes the old credentials from KeyringStore and re-raises the exception.
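
The double-check locking idea behind refresh_credentials can be sketched with an asyncio.Lock. This is a simplified illustration, not the library's implementation: RefresherSketch, its stale_id parameter, and the fake refresh are hypothetical, and the sketch covers coroutines only (the real method is described as thread-safe as well).

```python
import asyncio
from typing import Optional


class RefresherSketch:
    """Hypothetical illustration of double-checked refresh logic."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._creds_id: Optional[str] = None
        self.refresh_calls = 0

    async def _do_refresh(self) -> str:
        # Stand-in for the subclass-specific token request.
        self.refresh_calls += 1
        await asyncio.sleep(0.01)
        return f"creds-{self.refresh_calls}"

    def _is_fresh(self, stale_id: Optional[str]) -> bool:
        return self._creds_id is not None and self._creds_id != stale_id

    async def refresh(self, stale_id: Optional[str]) -> str:
        # First check, without the lock: someone may have refreshed already.
        if self._is_fresh(stale_id):
            return self._creds_id
        async with self._lock:
            # Second check, with the lock: a concurrent caller may have
            # finished refreshing while this one was waiting.
            if self._is_fresh(stale_id):
                return self._creds_id
            self._creds_id = await self._do_refresh()
            return self._creds_id


async def demo() -> int:
    refresher = RefresherSketch()
    # Five callers notice the missing credentials at the same time.
    await asyncio.gather(*(refresher.refresh(None) for _ in range(5)))
    return refresher.refresh_calls
```

Running asyncio.run(demo()) performs a single refresh even though five coroutines requested one, which is the redundancy the second check is meant to prevent.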

Subclass Implementation (_do_refresh_credentials):

Subclasses must implement the _do_refresh_credentials() abstract method. This method contains the specific logic for obtaining or refreshing an access token according to the chosen authentication flow. It should return a Credentials object.

PKCEAuthenticator: Interactive Browser-Based Authentication

The PKCEAuthenticator implements the OAuth 2.0 Proof Key for Code Exchange (PKCE) flow, which is suitable for public clients (like desktop or mobile apps) that cannot securely store a client secret. This flow typically involves opening a browser window for user interaction.

Use Case: User-facing applications where interactive login via a web browser is acceptable.

Flow:

  1. Generates a code_verifier and code_challenge.
  2. Attempts to refresh existing credentials using a refresh token.
  3. If refresh fails or no credentials exist, it initiates the PKCE flow:
    • Constructs an authorization URL with the code_challenge.
    • Opens the URL in a browser, prompting the user to log in.
    • After successful login, the authorization server redirects to a redirect_uri with an authorization code.
    • Exchanges the authorization code and code_verifier for an access_token and refresh_token at the token endpoint.
  4. Stores the new Credentials in KeyringStore.
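
Steps 1 and 3 can be illustrated with the standard library. The sketch below derives an S256 code_challenge from a code_verifier as specified in RFC 7636 and builds an authorization URL. The function names and the state parameter are illustrative assumptions; the query parameters the authenticator actually sends may differ.

```python
import base64
import hashlib
import secrets
from typing import List, Tuple
from urllib.parse import urlencode


def challenge_for(verifier: str) -> str:
    """S256 challenge: base64url(SHA-256(verifier)) without padding."""
    digest = hashlib.sha256(verifier.encode("ascii")).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")


def make_pkce_pair() -> Tuple[str, str]:
    # 64 random bytes yield an 86-character verifier, inside the
    # 43 to 128 character range that RFC 7636 allows.
    verifier = secrets.token_urlsafe(64)
    return verifier, challenge_for(verifier)


def build_authorization_url(authorization_endpoint: str, client_id: str,
                            redirect_uri: str, scopes: List[str],
                            code_challenge: str, state: str) -> str:
    params = {
        "response_type": "code",
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "scope": " ".join(scopes),
        "code_challenge": code_challenge,
        "code_challenge_method": "S256",
        "state": state,  # protects the redirect against CSRF
    }
    return f"{authorization_endpoint}?{urlencode(params)}"


verifier, challenge = make_pkce_pair()
url = build_authorization_url(
    "https://your-auth-server.com/authorize",
    "YOUR_CLIENT_ID",
    "http://localhost:8080/callback",
    ["openid", "offline_access"],
    challenge,
    state=secrets.token_urlsafe(16),
)
```

Only the challenge travels in the browser URL. The verifier stays with the client and is sent later with the authorization code, so an intercepted code cannot be exchanged on its own.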

Configuration: Requires client_id, redirect_uri, authorization_endpoint, token_endpoint, scopes, and audience to be defined in the ClientConfig. For persistent sessions, scopes should include "offline_access" or similar to obtain a refresh token.

from flyte.remote._client.auth.pkce import PKCEAuthenticator
from flyte.remote._client.auth.client_config import ClientConfig
import asyncio

async def authenticate_with_pkce():
    # Example ClientConfig (these values would typically come from a backend configuration)
    client_cfg = ClientConfig(
        token_endpoint="https://your-auth-server.com/oauth/token",
        authorization_endpoint="https://your-auth-server.com/authorize",
        redirect_uri="http://localhost:8080/callback",
        client_id="YOUR_CLIENT_ID",
        scopes=["openid", "profile", "email", "offline_access"],
        audience="https://your-api-audience.com",
    )

    # Initialize PKCEAuthenticator
    authenticator = PKCEAuthenticator(
        endpoint="your-flyte-endpoint.com",
        client_config=client_cfg,
        # cfg_store=... (if remote config is used)
    )

    # Force a refresh to initiate the flow or use existing tokens
    await authenticator.refresh_credentials()
    creds = authenticator.get_credentials()
    if creds:
        print(f"PKCE Authentication successful. Access Token ID: {creds.id}")
    else:
        print("PKCE Authentication failed.")

# asyncio.run(authenticate_with_pkce())

DeviceCodeAuthenticator: Headless Device Authentication

The DeviceCodeAuthenticator implements the OAuth 2.0 Device Authorization Grant flow, designed for input-constrained devices or command-line applications where a web browser is not readily available for direct interaction.

Use Case: CLI tools, IoT devices, or other headless environments.

Flow:

  1. Attempts to refresh existing credentials using a refresh token.
  2. If refresh fails or no credentials exist, it initiates the device code flow:
    • Requests a device code from the device_authorization_endpoint.
    • Displays a user code and a verification URI to the user.
    • Prompts the user to visit the URI on a separate device (e.g., their computer or phone) and enter the user code.
    • Polls the token endpoint until the user completes authentication or the device code expires.
  3. Upon successful authentication, it obtains an access_token and refresh_token.
  4. Stores the new Credentials in KeyringStore.
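
The polling step can be sketched as follows, using the error codes defined in RFC 8628 (authorization_pending, slow_down). To stay runnable without a network, the token endpoint is replaced by a hypothetical poll_once callable; poll_for_token and its parameters are illustrative and not part of the library.

```python
import asyncio
from typing import Awaitable, Callable, Dict


async def poll_for_token(poll_once: Callable[[], Awaitable[Dict[str, str]]],
                         interval: float, expires_in: float,
                         slow_down_step: float = 5.0) -> Dict[str, str]:
    """Poll until tokens arrive, a hard error occurs, or the code expires."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + expires_in
    while loop.time() < deadline:
        response = await poll_once()
        error = response.get("error")
        if error is None:
            return response  # contains access_token (and maybe refresh_token)
        if error == "slow_down":
            interval += slow_down_step  # server asked for less frequent polls
        elif error != "authorization_pending":
            raise RuntimeError(f"device authorization failed: {error}")
        await asyncio.sleep(interval)
    raise TimeoutError("device code expired before the user completed login")


async def demo() -> Dict[str, str]:
    # Fake endpoint: the user "approves" the request on the third poll.
    responses = iter([
        {"error": "authorization_pending"},
        {"error": "authorization_pending"},
        {"access_token": "example-access-token",
         "refresh_token": "example-refresh-token"},
    ])

    async def fake_poll() -> Dict[str, str]:
        return next(responses)

    return await poll_for_token(fake_poll, interval=0.01, expires_in=5.0)
```

In a real client, interval and expires_in would come from the device authorization response rather than being hardcoded.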

Configuration: Requires device_authorization_endpoint, token_endpoint, client_id, scopes, and audience to be defined in the ClientConfig. The device_authorization_endpoint must be configured on the backend.

from flyte.remote._client.auth.device_code import DeviceCodeAuthenticator
from flyte.remote._client.auth.client_config import ClientConfig
import asyncio

async def authenticate_with_device_code():
    # Example ClientConfig
    client_cfg = ClientConfig(
        token_endpoint="https://your-auth-server.com/oauth/token",
        device_authorization_endpoint="https://your-auth-server.com/oauth/device/code",
        client_id="YOUR_CLIENT_ID_FOR_DEVICE_FLOW",
        scopes=["openid", "profile", "offline_access"],
        audience="https://your-api-audience.com",
    )

    # Initialize DeviceCodeAuthenticator
    authenticator = DeviceCodeAuthenticator(
        endpoint="your-flyte-endpoint.com",
        client_config=client_cfg,
    )

    # Force a refresh to initiate the flow or use existing tokens
    await authenticator.refresh_credentials()
    creds = authenticator.get_credentials()
    if creds:
        print(f"Device Code Authentication successful. Access Token ID: {creds.id}")
    else:
        print("Device Code Authentication failed.")

# asyncio.run(authenticate_with_device_code())

ClientCredentialsAuthenticator: Machine-to-Machine Authentication

The ClientCredentialsAuthenticator implements the OAuth 2.0 Client Credentials Grant flow, designed for machine-to-machine authentication where a client (e.g., a service, daemon, or another application) needs to access resources without user involvement.

Use Case: Service accounts, background services, or applications authenticating directly with a client ID and secret.

Flow:

  1. Uses the provided client_id and client_credentials_secret to construct an HTTP Basic Authorization header.
  2. Sends a request to the token_endpoint with the authorization header and grant type client_credentials.
  3. Receives an access_token (and optionally refresh_token and expires_in).
  4. Stores the new Credentials in KeyringStore.
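
Steps 1 and 2 amount to building a Basic Authorization header and a form-encoded request body. The sketch below follows RFC 6749; build_token_request is a hypothetical helper, and the space-delimited scope value and the audience form field are assumptions about what the token endpoint expects.

```python
import base64
from typing import Dict, List, Optional, Tuple
from urllib.parse import quote_plus, urlencode


def build_token_request(client_id: str, client_secret: str,
                        scopes: Optional[List[str]] = None,
                        audience: Optional[str] = None
                        ) -> Tuple[Dict[str, str], str]:
    """Return (headers, body) for a client_credentials token request."""
    # RFC 6749 section 2.3.1: form-encode both parts, join with ":",
    # then base64-encode the pair.
    pair = f"{quote_plus(client_id)}:{quote_plus(client_secret)}"
    basic = base64.b64encode(pair.encode("utf-8")).decode("ascii")
    headers = {
        "Authorization": f"Basic {basic}",
        "Content-Type": "application/x-www-form-urlencoded",
    }
    form = {"grant_type": "client_credentials"}
    if scopes:
        form["scope"] = " ".join(scopes)  # assumption: space-delimited
    if audience:
        form["audience"] = audience  # assumption: provider-specific field
    return headers, urlencode(form)


headers, body = build_token_request("my-client", "example-secret", ["read"])
print(body)  # grant_type=client_credentials&scope=read
```

The resulting headers and body would then be sent as a POST request to the token_endpoint.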

Configuration: Requires client_id and client_credentials_secret to be passed directly during initialization. The token_endpoint, scopes, and audience are typically defined in the ClientConfig.

Security Consideration: The client_credentials_secret is a sensitive credential and should be handled securely, for example, by loading it from environment variables or a secret management system, rather than hardcoding it.

from flyte.remote._client.auth.client_credentials import ClientCredentialsAuthenticator
from flyte.remote._client.auth.client_config import ClientConfig
import asyncio
import os

async def authenticate_with_client_credentials():
    # Load client ID and secret securely (e.g., from environment variables).
    # The fallback strings are placeholders only; never hardcode real secrets.
    client_id = os.getenv("FLYTE_CLIENT_ID", "YOUR_CLIENT_ID")
    client_secret = os.getenv("FLYTE_CLIENT_SECRET", "YOUR_CLIENT_SECRET")

    # Example ClientConfig
    client_cfg = ClientConfig(
        token_endpoint="https://your-auth-server.com/oauth/token",
        client_id=client_id,  # Note: client_id is also passed to the authenticator directly
        scopes=["api_scope_1", "api_scope_2"],
        audience="https://your-api-audience.com",
    )

    # Initialize ClientCredentialsAuthenticator
    authenticator = ClientCredentialsAuthenticator(
        endpoint="your-flyte-endpoint.com",
        client_id=client_id,
        client_credentials_secret=client_secret,
        client_config=client_cfg,
    )

    # Force a refresh to obtain new tokens
    await authenticator.refresh_credentials()
    creds = authenticator.get_credentials()
    if creds:
        print(f"Client Credentials Authentication successful. Access Token ID: {creds.id}")
    else:
        print("Client Credentials Authentication failed.")

# asyncio.run(authenticate_with_client_credentials())

AsyncCommandAuthenticator: Custom Token Retrieval

The AsyncCommandAuthenticator provides a flexible mechanism to obtain an access token by executing an external command. This is useful for integrating with custom identity providers, existing authentication tools, or complex token generation processes.

Use Case: When standard OAuth 2.0 flows are not sufficient, and an external script or tool can provide the access token.

Flow:

  1. Executes the specified command as a subprocess.
  2. Captures the standard output (stdout) of the command.
  3. Treats the stdout content (stripped of whitespace) as the access_token.
  4. Creates Credentials with the obtained token.
  5. Stores the new Credentials in KeyringStore.
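
Steps 1 to 3 can be sketched with asyncio.create_subprocess_exec. This is an illustration under assumptions, not the authenticator's code: token_from_command and CommandAuthError are hypothetical names, and the demo uses the current Python interpreter as a portable stand-in for a real token command.

```python
import asyncio
import sys
from typing import List


class CommandAuthError(RuntimeError):
    """Hypothetical error type used only in this sketch."""


async def token_from_command(command: List[str]) -> str:
    """Run `command` and return its stripped stdout as the token."""
    process = await asyncio.create_subprocess_exec(
        *command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await process.communicate()
    if process.returncode != 0:
        # Include stderr so a failing command can be debugged.
        raise CommandAuthError(
            f"{command!r} exited with code {process.returncode}: "
            f"{stderr.decode().strip()}"
        )
    return stdout.decode().strip()


async def demo() -> str:
    # Stand-in for a real command such as a custom token script.
    return await token_from_command(
        [sys.executable, "-c", "print('example-token')"]
    )
```

Passing the command as a list of arguments, rather than a single shell string, avoids shell interpolation of its contents.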

Configuration: Requires a command (a list of strings representing the command and its arguments) during initialization.

Considerations:

  • The command must output a valid access token to stdout.
  • Error handling is included: if the command returns a non-zero exit code or raises an exception, an AuthenticationError is raised with debugging information.
  • The command is executed asynchronously using asyncio.create_subprocess_exec.

from flyte.remote._client.auth.command import AsyncCommandAuthenticator
import asyncio

async def authenticate_with_command():
    # Example: A hypothetical command that outputs an access token.
    # In a real scenario, this might be 'gcloud auth print-access-token' or a custom script.
    # For demonstration, we use a simple echo command.
    # Ensure the command is executable and outputs the token to stdout.
    command = ["echo", "my_custom_access_token_from_command"]

    # Initialize AsyncCommandAuthenticator
    authenticator = AsyncCommandAuthenticator(
        endpoint="your-flyte-endpoint.com",
        command=command,
    )

    # Force a refresh to execute the command and obtain tokens
    await authenticator.refresh_credentials()
    creds = authenticator.get_credentials()
    if creds:
        # Print the credentials ID rather than the token itself to keep it out of logs
        print(f"Command Authentication successful. Access Token ID: {creds.id}")
    else:
        print("Command Authentication failed.")

# asyncio.run(authenticate_with_command())