Skip to content

Instantly share code, notes, and snippets.

@gaborgsomogyi
Created November 29, 2025 21:31
Show Gist options
  • Select an option

  • Save gaborgsomogyi/e62c8d7ed5f15dc4a8cb1d51ec0dd1e0 to your computer and use it in GitHub Desktop.

Select an option

Save gaborgsomogyi/e62c8d7ed5f15dc4a8cb1d51ec0dd1e0 to your computer and use it in GitHub Desktop.

Tasks

  • Tokens must be tracked per service, and only tokens that have actually changed must be sent to the task managers
  • Support for initializing tokens dynamically from user code must be added
  package org.apache.flink.api.common.security;

  /**
   * Manager for obtaining delegation tokens dynamically at runtime.
   * This allows user code to request tokens for services not known at cluster startup.
   *
   * <p>Both methods return a {@link DelegationTokenHandle} describing the obtained tokens and
   * signal failure by throwing.
   */
  @PublicEvolving
  public interface DynamicDelegationTokenManager {

      /**
       * Request delegation tokens for a service.
       *
       * @param serviceName Name identifying the service (must match provider's serviceName())
       * @param serviceConfig Configuration needed to obtain tokens
       * @return Handle to obtained tokens
       * @throws Exception if token acquisition fails
       */
      DelegationTokenHandle requestTokens(
          String serviceName,
          Map<String, String> serviceConfig
      ) throws Exception;

      /**
       * Request delegation tokens for a service, bounded by a timeout.
       *
       * @param serviceName Name identifying the service (must match provider's serviceName())
       * @param serviceConfig Configuration needed to obtain tokens
       * @param timeoutMs Timeout for the request; presumably in milliseconds, judging by the
       *     parameter name (TODO confirm)
       * @return Handle to obtained tokens
       * @throws Exception if token acquisition fails. NOTE(review): how an elapsed timeout is
       *     reported (exception type, or otherwise) is not specified here; confirm and document.
       */
      DelegationTokenHandle requestTokens(
          String serviceName,
          Map<String, String> serviceConfig,
          long timeoutMs
      ) throws Exception;
  }

  /**
   * Opaque handle to delegation tokens.
   *
   * <p>Exposes only metadata about the tokens (identifier, validity, expiration); no accessor
   * for the token material itself is declared here.
   */
  @PublicEvolving
  public interface DelegationTokenHandle {

      /**
       * @return Identifier of the tokens behind this handle.
       *     NOTE(review): format and uniqueness scope are not specified; confirm.
       */
      String getTokenId();

      /**
       * @return {@code true} if the handle is considered valid.
       *     NOTE(review): the exact criteria (e.g. not yet expired) are not specified; confirm.
       */
      boolean isValid();

      /**
       * @return Expiration time of the tokens; presumably milliseconds since the Unix epoch
       *     (TODO confirm).
       */
      long getExpirationTime();
  }

  ---
  How User Code Uses It

  1. Access via RuntimeContext

  // Addition to the RuntimeContext interface; only the new method is shown here.
  public interface RuntimeContext {

      /**
       * Get the dynamic delegation token manager for requesting tokens at runtime.
       *
       * <p>The result is nullable, so callers must check for {@code null} before use.
       *
       * @return Dynamic token manager, or null if not available
       */
      @Nullable
      @Experimental
      DynamicDelegationTokenManager getDynamicDelegationTokenManager();
  }

  // Iceberg Connector Usage
  public class FlinkCatalogFactory implements CatalogFactory {

      /**
       * Creates the catalog, first making a best-effort attempt to obtain delegation tokens
       * for the Hive Metastore through the dynamic delegation token manager.
       *
       * <p>Failure to obtain tokens never fails catalog creation; it is only logged.
       *
       * @param name Catalog name
       * @param properties Catalog properties; {@code "uri"} and {@code "kerberos.principal"}
       *     are read for the token request
       * @param readableConfig Flink configuration (not used for the token request)
       * @return The created catalog
       */
      @Override
      public Catalog createCatalog(
              String name,
              Map<String, String> properties,
              ReadableConfig readableConfig) {

          String hmsUri = properties.get("uri");

          // Try to obtain delegation tokens dynamically
          requestHmsDelegationTokens(hmsUri, properties);

          // Create catalog - tokens are already injected into Hadoop config by Flink!
          return new HiveCatalog(...);
      }

      /**
       * Requests delegation tokens for the Hive Metastore if a dynamic token manager is available.
       *
       * @param hmsUri Metastore URI taken from the catalog properties; may be {@code null}
       * @param properties Catalog properties used to look up the Kerberos principal
       */
      private void requestHmsDelegationTokens(String hmsUri, Map<String, String> properties) {
          RuntimeContext runtimeContext = getRuntimeContext();
          if (runtimeContext == null) {
              return;
          }

          DynamicDelegationTokenManager tokenManager =
              runtimeContext.getDynamicDelegationTokenManager();
          if (tokenManager == null) {
              LOG.debug("DynamicDelegationTokenManager not available, " +
                       "tokens must be configured statically");
              return;
          }

          try {
              // Request tokens for Iceberg service
              Map<String, String> serviceConfig = new HashMap<>();
              if (hmsUri != null) {
                  // Leave the key out entirely rather than handing the provider a null value.
                  serviceConfig.put("hive.metastore.uri", hmsUri);
              }
              serviceConfig.put("hive.metastore.kerberos.principal",
                               properties.getOrDefault("kerberos.principal", "hive/_HOST@REALM"));

              // This triggers token acquisition!
              DelegationTokenHandle tokenHandle = tokenManager.requestTokens(
                  "iceberg",  // serviceName - must match IcebergDelegationTokenProvider.serviceName()
                  serviceConfig
              );

              LOG.info("Obtained delegation tokens for HMS: {}, valid until: {}",
                       hmsUri,
                       new Date(tokenHandle.getExpirationTime()));

          } catch (Exception e) {
              if (e instanceof InterruptedException) {
                  // The broad catch would otherwise swallow the interruption; restore the flag
                  // so code further up the stack can still observe it.
                  Thread.currentThread().interrupt();
              }
              LOG.warn("Failed to obtain delegation tokens for HMS: {}, will try without",
                       hmsUri, e);
          }
      }
  }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment