- Tokens must be tracked per service; only tokens that have been updated must be sent to the task managers.
- Support for dynamic token initialization from user code must be added.
package org.apache.flink.api.common.security;
/**
* Manager for obtaining delegation tokens dynamically at runtime.
* This allows user code to request tokens for services not known at cluster startup.
*
* <p>Both {@code requestTokens} overloads return a {@link DelegationTokenHandle} and
* declare {@code throws Exception}, so callers have to handle or propagate failures.
*/
@PublicEvolving
public interface DynamicDelegationTokenManager {
/**
* Requests delegation tokens for a service.
*
* @param serviceName Name identifying the service (must match provider's serviceName())
* @param serviceConfig Configuration needed to obtain tokens
* @return Handle to obtained tokens
* @throws Exception if token acquisition fails
*/
DelegationTokenHandle requestTokens(
String serviceName,
Map<String, String> serviceConfig
) throws Exception;
/**
* Requests delegation tokens for a service, with a caller-supplied timeout.
*
* @param serviceName Name identifying the service (must match provider's serviceName())
* @param serviceConfig Configuration needed to obtain tokens
* @param timeoutMs Timeout for the request; presumably milliseconds, judging by the
*     parameter name. TODO confirm the unit and how zero/negative values are treated
* @return Handle to obtained tokens
* @throws Exception if token acquisition fails. NOTE(review): whether an elapsed
*     timeout is reported through this exception is not specified here; confirm
*/
DelegationTokenHandle requestTokens(
String serviceName,
Map<String, String> serviceConfig,
long timeoutMs
) throws Exception;
}
/**
* Opaque handle to delegation tokens.
*
* <p>Instances are returned by {@code DynamicDelegationTokenManager#requestTokens}.
*/
@PublicEvolving
public interface DelegationTokenHandle {
/**
* Returns the identifier of the token(s) behind this handle.
* NOTE(review): format and uniqueness guarantees are not specified here; confirm.
*/
String getTokenId();
/**
* Reports whether the handle is valid.
* NOTE(review): presumably this means "not yet expired"; the exact criteria are not
* specified here, confirm against the implementation.
*/
boolean isValid();
/**
* Returns the expiration time of the token(s).
* The usage example in this document passes the value to {@code new Date(long)},
* i.e. it is treated as milliseconds since the Unix epoch.
*/
long getExpirationTime();
}
---
How User Code Uses It
1. Access via RuntimeContext
// Add to RuntimeContext interface
// Excerpt: only the newly proposed method is shown; the other members of
// RuntimeContext are omitted from this snippet.
public interface RuntimeContext {
/**
* Get the dynamic delegation token manager for requesting tokens at runtime.
*
* <p>The result is nullable, so callers must null-check it before use.
*
* @return Dynamic token manager, or null if not available
*/
@Nullable
@Experimental
DynamicDelegationTokenManager getDynamicDelegationTokenManager();
}
// Iceberg Connector Usage
/**
 * Example Iceberg catalog factory that requests Hive Metastore (HMS) delegation tokens
 * through the {@link DynamicDelegationTokenManager} before creating the catalog.
 *
 * <p>Token acquisition is best-effort: every failure path is only logged, and catalog
 * creation proceeds regardless.
 *
 * <p>NOTE(review): {@code getRuntimeContext()} and {@code LOG} are not defined in this
 * snippet. TODO confirm where the factory obtains them.
 */
public class FlinkCatalogFactory implements CatalogFactory {

    /**
     * Creates the catalog, first trying to obtain HMS delegation tokens dynamically.
     *
     * @param name catalog name
     * @param properties catalog properties; {@code "uri"} is read as the HMS URI and
     *     {@code "kerberos.principal"} as the optional metastore principal
     * @param readableConfig Flink configuration (not used by the token logic)
     * @return the created catalog
     */
    @Override
    public Catalog createCatalog(
            String name,
            Map<String, String> properties,
            ReadableConfig readableConfig) {
        String hmsUri = properties.get("uri");

        // Try to obtain delegation tokens dynamically
        requestHmsDelegationTokens(properties, hmsUri);

        // Create catalog - tokens are already injected into Hadoop config by Flink!
        return new HiveCatalog(...);
    }

    /** Best-effort request for HMS delegation tokens; never throws. */
    private void requestHmsDelegationTokens(Map<String, String> properties, String hmsUri) {
        if (hmsUri == null) {
            // Without a metastore URI there is nothing to request tokens for; avoid
            // sending a null "hive.metastore.uri" to the token provider.
            LOG.debug("No metastore URI configured, skipping dynamic delegation token request");
            return;
        }

        RuntimeContext runtimeContext = getRuntimeContext();
        if (runtimeContext == null) {
            return;
        }

        DynamicDelegationTokenManager tokenManager =
                runtimeContext.getDynamicDelegationTokenManager();
        if (tokenManager == null) {
            LOG.debug("DynamicDelegationTokenManager not available, " +
                    "tokens must be configured statically");
            return;
        }

        try {
            // Request tokens for Iceberg service
            Map<String, String> serviceConfig = new HashMap<>();
            serviceConfig.put("hive.metastore.uri", hmsUri);
            serviceConfig.put("hive.metastore.kerberos.principal",
                    properties.getOrDefault("kerberos.principal", "hive/_HOST@REALM"));

            // This triggers token acquisition!
            DelegationTokenHandle tokenHandle = tokenManager.requestTokens(
                    "iceberg", // serviceName - must match IcebergDelegationTokenProvider.serviceName()
                    serviceConfig
            );
            LOG.info("Obtained delegation tokens for HMS: {}, valid until: {}",
                    hmsUri,
                    new Date(tokenHandle.getExpirationTime()));
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // requestTokens declares "throws Exception"; if the wait was interrupted,
                // restore the flag so the calling thread can still observe it.
                Thread.currentThread().interrupt();
            }
            LOG.warn("Failed to obtain delegation tokens for HMS: {}, will try without",
                    hmsUri, e);
        }
    }
}