|
import os |
|
import time |
|
import json |
|
import logging |
|
|
|
from typing import Any |
|
|
|
import boto3 |
|
import requests |
|
|
|
os.environ['AWS_DEFAULT_PROFILE'] = 'jason' |
|
os.environ['AWS_DEFAULT_REGION'] = 'eu-west-1' |
|
|
|
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'info') |
|
|
|
logging.basicConfig( |
|
level=logging.INFO, |
|
format='%(name)s - %(levelname)s - %(message)s' |
|
) |
|
|
|
logger = logging.getLogger(__name__) |
|
logger.setLevel(LOG_LEVEL.upper()) |
|
|
|
|
|
class ConfigFetchBase: |
|
""" |
|
Base class for configuration fetching with refresh functionality. |
|
""" |
|
|
|
def __init__(self, application, environment, profile, refresh_interval: int = 60) -> None: |
|
""" |
|
Initialize the ConfigFetchBase instance. |
|
|
|
:param application: The application name |
|
:param environment: The environment (e.g., dev, prod) |
|
:param profile: The configuration profile |
|
:param refresh_interval: Time in seconds between config refreshes (default: 60) |
|
""" |
|
|
|
self._application = application |
|
self._environment = environment |
|
self._profile = profile |
|
|
|
self._refresh_refresh_interval = refresh_interval |
|
self._next_poll_interval_seconds = refresh_interval |
|
|
|
self._last_fetch_time = -1 # Initialize to -1 to ensure first fetch |
|
self._last_cached_response = {} # Cache for the last fetched configuration |
|
|
|
def can_refresh(self): |
|
""" |
|
Check if it's time to refresh the configuration. |
|
|
|
:return: True if refresh is needed, False otherwise |
|
""" |
|
if self._last_fetch_time < 0: |
|
return True # Always refresh if it's the first time |
|
|
|
current_time = int(time.time()) |
|
return (current_time - self._last_fetch_time) > self._next_poll_interval_seconds |
|
|
|
def mark_last_fetch_time(self, last_fetch_time: int = -1): |
|
""" |
|
Update the last fetch time. |
|
|
|
:param last_fetch_time: The timestamp to set (default: -1, which uses current time) |
|
""" |
|
self._last_fetch_time = self._last_fetch_time if last_fetch_time != -1 else int(time.time()) |
|
|
|
def fetch_config(self): |
|
""" |
|
Fetch the configuration. This method should be implemented by subclasses. |
|
|
|
:raises NotImplementedError: If not implemented in a subclass |
|
""" |
|
raise NotImplementedError(f'Must implement fetch_config for class {self.__class__.__name__}') |
|
|
|
|
|
|
|
class ConfigFetchDirect(ConfigFetchBase): |
|
""" |
|
Class for fetching configuration directly from AWS AppConfig. |
|
""" |
|
|
|
def __init__(self, application, environment, profile, refresh_interval: int = 60) -> None: |
|
""" |
|
Initialize the ConfigFetchDirect instance. |
|
|
|
:param application: The application name |
|
:param environment: The environment (e.g., dev, prod) |
|
:param profile: The configuration profile |
|
:param refresh_interval: Time in seconds between config refreshes (default: 60) |
|
""" |
|
super().__init__(application, environment, profile, refresh_interval) |
|
|
|
self._client = boto3.client('appconfigdata') |
|
|
|
# Start a configuration session with AWS AppConfig |
|
resp = self._client.start_configuration_session( |
|
ApplicationIdentifier=application, |
|
EnvironmentIdentifier=environment, |
|
ConfigurationProfileIdentifier=profile, |
|
RequiredMinimumPollIntervalInSeconds=refresh_interval |
|
) |
|
|
|
self._configuration_token = resp['InitialConfigurationToken'] |
|
|
|
def fetch_config(self) -> dict: |
|
""" |
|
Fetch the latest configuration from AWS AppConfig. |
|
|
|
:return: The fetched configuration as a dictionary |
|
""" |
|
|
|
if not self.can_refresh(): |
|
return self._last_cached_response |
|
|
|
resp = self._client.get_latest_configuration( |
|
ConfigurationToken=self._configuration_token |
|
) |
|
|
|
self.mark_last_fetch_time() |
|
|
|
# Update token and poll interval for next fetch |
|
self._configuration_token = resp['NextPollConfigurationToken'] |
|
self._next_poll_interval_seconds = resp['NextPollIntervalInSeconds'] |
|
|
|
# Parse and cache the configuration |
|
configuration = json.loads(resp['Configuration'].read()) |
|
self._last_cached_response = configuration |
|
|
|
return configuration |
|
|
|
|
|
class ConfigFetchAgent(ConfigFetchBase): |
|
""" |
|
Class for fetching configuration from a local AppConfig Agent. |
|
""" |
|
|
|
def __init__(self, application, environment, profile, refresh_interval: int = 60, base_url: str= '') -> None: |
|
""" |
|
Initialize the ConfigFetchAgent instance. |
|
|
|
:param application: The application name |
|
:param environment: The environment (e.g., dev, prod) |
|
:param profile: The configuration profile |
|
:param refresh_interval: Time in seconds between config refreshes (default: 60) |
|
:param base_url: The base URL of the AppConfig Agent (default: 'http://localhost:2772') |
|
""" |
|
self._base_url = base_url if base_url != '' else 'http://localhost:2772' |
|
super().__init__(application, environment, profile, refresh_interval) |
|
|
|
def fetch_config(self) -> dict: |
|
""" |
|
Fetch the latest configuration from the AppConfig Agent. |
|
|
|
:return: The fetched configuration as a dictionary |
|
""" |
|
if not self.can_refresh(): |
|
return self._last_cached_response |
|
|
|
# Make a GET request to the AppConfig Agent |
|
resp = requests.get(f'{self._base_url}/applications/{self._application}/environments/{self._environment}/configurations/{self._profile}') |
|
|
|
self.mark_last_fetch_time() |
|
|
|
# Parse and cache the configuration |
|
configuration = resp.json() |
|
self._last_cached_response = configuration |
|
|
|
return configuration |
|
|
|
|
|
class DynamicConfig(dict): |
|
""" |
|
A dictionary-like class that automatically updates its configuration. |
|
""" |
|
|
|
def __init__(self, application, environment, profile, fetch_type='direct', refresh_interval: int = 60, **kwargs): |
|
""" |
|
Initialize the DynamicConfig instance. |
|
|
|
:param application: The application name |
|
:param environment: The environment (e.g., dev, prod) |
|
:param profile: The configuration profile |
|
:param fetch_type: The type of fetcher to use ('direct' or 'agent', default: 'direct') |
|
:param refresh_interval: Time in seconds between config refreshes (default: 60) |
|
""" |
|
self._type = fetch_type |
|
self._config = {} |
|
|
|
# Choose the appropriate fetcher based on fetch_type |
|
if fetch_type == 'agent': |
|
self._fetcher = ConfigFetchAgent(application, environment, profile, refresh_interval=refresh_interval, **kwargs) |
|
else: |
|
self._fetcher = ConfigFetchDirect(application, environment, profile, refresh_interval=refresh_interval, **kwargs) |
|
|
|
self._config = self._fetcher.fetch_config() |
|
|
|
def __getitem__(self, key: Any) -> Any: |
|
""" |
|
Get an item from the configuration, refreshing it first. |
|
|
|
:param key: The key to look up in the configuration |
|
:return: The value associated with the key |
|
""" |
|
self._config = self._fetcher.fetch_config() |
|
return self._config[key] |
|
|
|
def __repr__(self) -> str: |
|
""" |
|
Return a string representation of the configuration. |
|
|
|
:return: A string representation of the configuration |
|
""" |
|
return self._config.__repr__() |
|
|
|
def __str__(self) -> str: |
|
""" |
|
Return a string version of the configuration. |
|
|
|
:return: A string version of the configuration |
|
""" |
|
return self._config.__str__() |