Gist: konstantin-kornienko/167a1d5a3b11ae7aa9fd314ae5424e85 · Created February 10, 2022
JumpCloud instance assignments automation
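The script below walks the full JumpCloud systems inventory, deletes managed systems that have not contacted JumpCloud for more than 6 hours, and assigns the remaining managed systems (matched by hostname masks) to system groups according to per-environment rules, caching group lookups on disk to keep API calls down.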
#!/usr/bin/env python3
#
# Install jcapiv2 and diskcache to use this
# pip install git+https://github.com/TheJumpCloud/jcapi-python.git#subdirectory=jcapiv2 diskcache
#
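#
# Example invocation (flags are defined in parse_args() below; the file name is
# illustrative, API keys are configured in the `automations` list at the bottom):
#   ./jc_automation.py --test --debug   # dry run: log what would change, delete/assign nothing
#   ./jc_automation.py                  # apply deletions and group assignments
#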
import re
import time
import requests
import jcapiv2
import logging
import argparse
from datetime import datetime, timezone
from diskcache import Cache

# Systems that have not contacted JumpCloud for longer than this are deleted
DELETE_SYSTEM_OLDER_THAN_SECONDS = 6 * 60 * 60
JSON_CT = 'application/json'
JC_RECORDS_LIMIT = 100

logger = None
args = None


class Stats():
    def __init__(self) -> None:
        self.deleted = 0
        self.up_to_date = 0
        self.assigned = 0

    def __str__(self) -> str:
        return f'deleted: {self.deleted}\tassigned: {self.assigned}\tup to date: {self.up_to_date}'


class Automation():
    CACHE_DIR = '/tmp/jc_cache'
    CACHE_GROUPS_SEC = 8 * 60 * 60

    def __init__(self, name: str, api_key: str, managed_hosts_masks: list, assign_rules: dict) -> None:
        self.system_groups = None
        self.api = {}
        self.name = name
        self.api_key = api_key
        self.managed_systems_regex = [re.compile('.*\\b' + x + '\\b') for x in managed_hosts_masks]
        self.assign_rules = assign_rules
        self.stats = Stats()
        self.cache = None

    def cache_get(self, key: str):
        if self.cache is None:
            self.cache = Cache(self.CACHE_DIR)
        return self.cache.get(f'{self.name}_{key}')

    def cache_set(self, key: str, value):
        if self.cache is None:
            self.cache = Cache(self.CACHE_DIR)
        return self.cache.set(f'{self.name}_{key}', value, expire=self.CACHE_GROUPS_SEC)

    def get_all_system_groups(self):
        cache_key = 'system_groups'
        self.system_groups = self.cache_get(cache_key)
        if self.system_groups is not None:
            logger.info(f'Got info about {len(self.system_groups)} system groups from cache')
        else:
            self.system_groups = {}
            response = self.api['groups'].groups_list(JSON_CT, JSON_CT, limit=JC_RECORDS_LIMIT, filter=['type:eq:system_group'])
            for group in response:
                self.system_groups[group.name] = group.id
            if self.system_groups:
                self.cache_set(cache_key, self.system_groups)
            logger.info(f'Got info about {len(self.system_groups)} system groups')

    def get_system_member_groups(self, system_id):
        cache_key = f'system_member_groups_{system_id}'
        system_member_groups = self.cache_get(cache_key)
        if system_member_groups is None:
            system_member_groups = self.api['systems'].graph_system_member_of(system_id, JSON_CT, JSON_CT, filter=['type:eq:system_group'], limit=JC_RECORDS_LIMIT)
            if system_member_groups:
                self.cache_set(cache_key, system_member_groups)
        return system_member_groups

    def delete_system(self, host: dict):
        if not args.test:
            result = requests.delete(
                f'https://console.jumpcloud.com/api/systems/{host["_id"]}',
                headers={
                    'x-api-key': self.api['systems'].api_client.configuration.api_key['x-api-key'],
                    'Accept': JSON_CT,
                }
            )
            result.raise_for_status()
        self.stats.deleted += 1
        # logger.debug(f'  delete response: {result.json()}')

    def assign_system(self, host: dict):
        host_name = host['hostname']
        group_name = next((
            group_name for regex, group_name in self.assign_rules.items() if re.match(regex, host_name)
        ), None)
        if group_name is None:
            raise RuntimeError(f'Cannot determine group to assign host {host_name} to')
        group_id = self.system_groups.get(group_name, None)
        if not group_id:
            raise RuntimeError(f'Cannot get group id for group {group_name}')
        # add host to system group
        body = jcapiv2.SystemGroupMembersReq(op='add', type='system', id=host['_id'])
        if not args.test:
            self.api['graph'].graph_system_group_members_post(group_id, JSON_CT, JSON_CT, body=body)
        logger.warning(f'Host {host_name} added to group {group_name}')
        self.stats.assigned += 1

    def process_host(self, host: dict):
        host_name = host['hostname']
        host_id = host['_id']
        # logger.debug(f'Processing host {host_name} with id {host_id}')
        managed_host = next((
            True for regex in self.managed_systems_regex if regex.match(host_name)
        ), False)
        if not managed_host:
            # logger.debug(f'  skipping, {host_name} is not managed')
            return
        last_contact = host['lastContact']
        if not last_contact:
            logger.debug(f'host {host_name} has never contacted JumpCloud yet, skipping')
            return
        last_seen_seconds = (
            datetime.now(timezone.utc)
            - datetime.fromisoformat(last_contact.replace("Z", "+00:00"))
        ).total_seconds()
        if last_seen_seconds > DELETE_SYSTEM_OLDER_THAN_SECONDS:
            self.delete_system(host)
            logger.warning(f'Deleted host {host_name}, last seen {last_seen_seconds/60/60/24:.2f} days ago')
            return
        groups = self.get_system_member_groups(host_id)
        if groups:
            logger.info(f'host {host_name} is already assigned to at least one group')
            self.stats.up_to_date += 1
            return
        logger.debug(f'host {host_name} is not assigned to any group, fixing')
        self.assign_system(host)

    def process(self):
        configuration = jcapiv2.Configuration()
        configuration.api_key['x-api-key'] = self.api_key
        client = jcapiv2.ApiClient(configuration)
        # create instances of the API classes
        logger.info('Connecting to JC')
        self.api['systems'] = jcapiv2.SystemsApi(client)
        self.api['groups'] = jcapiv2.GroupsApi(client)
        self.api['graph'] = jcapiv2.GraphApi(client)
        self.get_all_system_groups()
        # Fetch all hosts page by page first, so deletions do not shift the skip-based pagination
        hosts = []
        while True:
            response = requests.get(
                f'https://console.jumpcloud.com/api/systems?limit={JC_RECORDS_LIMIT}&skip={len(hosts)}&fields=lastContact&fields=hostname&sort=hostname',
                headers={
                    'x-api-key': configuration.api_key['x-api-key'],
                    'Accept': JSON_CT,
                }
            )
            response.raise_for_status()
            response_hosts = response.json()['results']
            results_count = len(response_hosts)
            logger.debug(f'Got {results_count} hosts')
            if results_count == 0:
                break
            hosts += response_hosts
        # Process hosts
        for host in hosts:
            self.process_host(host)
        logger.info(f'Processed {len(hosts)} hosts')
        client.__del__()  # otherwise it won't be called and the main thread will hang
        self.cache.close()


def parse_args():
    global args
    parser = argparse.ArgumentParser()
    parser.add_argument('--debug', help='Enable debug logging', action='store_true', required=False)
    parser.add_argument('--test', help='Test mode: log actions without deleting or assigning anything', action='store_true', required=False)
    args = parser.parse_args()


def init_logging():
    global logger
    logger = logging.getLogger('jc')
    # Honour --debug, default to INFO otherwise
    logger.setLevel(logging.DEBUG if args.debug else logging.INFO)
    log_handler = logging.StreamHandler()
    log_handler.setFormatter(logging.Formatter('%(asctime)s [%(name)s] [%(levelname)s] %(message)s'))
    logging.getLogger().addHandler(log_handler)


if __name__ == "__main__":
    parse_args()
    init_logging()
    automations = [
        Automation(
            name='NonProd',
            api_key='...',
            managed_hosts_masks=[
                'grafana',
                'kibana',
                'logstash',
                'es-(master|data)',
                'vm',
            ],
            assign_rules={
                '^nonprod-': 'Tools-NonProd',
            }
        )
    ]
    start_time = time.time()
    for automation in automations:
        automation.process()
    logger.info(f'Finished in {time.time() - start_time:.2f} seconds')
    for automation in automations:
        logger.info(f'{automation.name}: {automation.stats}')