Buildkite Pipeline summary
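A small CLI for summarising Buildkite pipeline builds: it searches an organization's pipeline for builds matching a final state and optional metadata, prints the results as a table, CSV or JSON, and can optionally re-create the listed builds against master HEAD with the same environment variables. Example invocations are sketched in comments at the end of the script.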
#!/usr/bin/env python3
from __future__ import annotations

import argparse
import json
import os
import signal
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List

import dateutil.parser
import dateutil.relativedelta
import requests
from pybuildkite.buildkite import Buildkite, BuildState
from tabulate import tabulate
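
# maps the CLI --state choices onto the corresponding pybuildkite BuildState values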
STATES = {
    'failed':   BuildState.FAILED,
    'finished': BuildState.FINISHED,
    'running':  BuildState.RUNNING,
    'canceled': BuildState.CANCELED,
}

@dataclass
class Timeout:
    limit: int

    def _timeout_handler(self, signum, frame):
        raise TimeoutError(f'took longer than {self.limit}')

    def __enter__(self):
        signal.signal(signal.SIGALRM, self._timeout_handler)
        signal.alarm(self.limit)

    def __exit__(self, exc_type, exc_val, exc_tb):
        signal.alarm(0)
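
# Illustrative use of the Timeout context manager (signal.SIGALRM only works on the
# main thread of Unix-like systems); `slow_call()` is a placeholder for any blocking call:
#   with Timeout(30):
#       slow_call()   # raises TimeoutError if it takes longer than 30 seconds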

@dataclass
class BKPipelineHelper:
    access_token: str
    org_name: str
    pipeline_name: str = ''
    client: Buildkite = field(init=False)

    def __post_init__(self):
        self.client = Buildkite()
        self.client.set_access_token(self.access_token)

    def _request_page(self, args: list=[], kwargs: dict={}, retries: int=5, timeout: int=30):
        for i in range(retries):
            try:
                with Timeout(timeout):
                    return self.client.builds().list_all_for_pipeline(*args, **kwargs)
            except (TimeoutError, requests.exceptions.ReadTimeout) as e:
                print(f'exceeded timeout of {timeout} seconds, retrying {i+1}/{retries}', file=sys.stderr)
                time.sleep(1)
        else:
            print(f'failed to get builds for {self.pipeline_name} after {retries} retries', file=sys.stderr)
            raise Exception('failed to get builds')
    def search_builds(self, metadata: dict = {}, states: list = [], limit: int = 100) -> BKBuildCollection:
        'Searches pipeline builds matching given metadata key/value pairs'
        print(f'\n- searching "{self.pipeline_name}" for builds with metadata: {metadata}, states: {states}', file=sys.stderr)
        builds = []
        next_page = 1
        while next_page and len(builds) < limit:
            response = self._request_page(
                args=[self.org_name, self.pipeline_name],
                kwargs={
                    'meta_data': metadata,
                    'states': states,
                    'page': next_page,
                    'with_pagination': True,
                },
            )
            print(next_page, len(response.body), file=sys.stderr)
            for b in response.body:
                builds.append(b)
                if len(builds) >= limit:
                    break
            next_page = response.next_page
        return BKBuildCollection(builds)
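
# Illustrative use of the helper (token, org and pipeline slugs are placeholders):
#   helper = BKPipelineHelper(access_token='xxx', org_name='my-org', pipeline_name='my-pipeline')
#   failed = helper.search_builds(metadata={'release': '1.2.3'}, states=[BuildState.FAILED], limit=50)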

def format_duration(finished_at, created_at):
    'Returns the elapsed time between two ISO timestamps as DD:HH:MM:SS (later timestamp first)'
    delta = dateutil.relativedelta.relativedelta(
        dateutil.parser.isoparse(finished_at),
        dateutil.parser.isoparse(created_at),
    )
    keys = ['days', 'hours', 'minutes', 'seconds']
    return ':'.join([f'{getattr(delta, k):>02d}' for k in keys])
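
# illustrative example: format_duration('2024-06-28T04:10:30Z', '2024-06-28T04:00:00Z') -> '00:00:10:30'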

def pretty_format_state(state: str):
    'Wraps the state in ANSI escapes: green for passed, red for failed, italic for canceled'
    if state == 'passed':
        return f'\033[0;32m{state}\033[0;0m'
    elif state == 'failed':
        return f'\033[0;31m{state}\033[0;0m'
    elif state == 'canceled':
        return f'\033[0;0;3m{state}\033[0;0m'
    return state

@dataclass
class BKBuildCollection:
    builds: List[Dict]

    def jobs_with_states(self, job_states):
        for build in self.builds:
            for job in build['jobs']:
                if job.get('state', None) in job_states:
                    yield build['id'], job

    def __iter__(self):
        'Allows iteration e.g. for build in build_collection:'
        yield from self.builds

    def as_json(self):
        return json.dumps(self.builds, indent=2)
    def as_csv(self, delim='\t', env=[], meta_data=[]):
        rows = []
        for i, row in enumerate(self.as_table(env, meta_data)):
            if i == 0:
                rows.append(delim.join(map(str, row.keys())))
            rows.append(delim.join(map(str, row.values())))
        return '\n'.join(rows)
    def as_table(self, env=[], meta_data=[]):
        rows = []
        for b in self.builds:
            finished_at = b['finished_at'] or datetime.now().astimezone().isoformat()
            rows.append({
                # 'pipeline': b['pipeline']['slug'],
                'state': pretty_format_state(b['state']),
                'created_at': dateutil.parser.isoparse(b['created_at']).astimezone().strftime('%Y%m%d %H:%M:%S%z'),
                'finished_at': dateutil.parser.isoparse(finished_at).astimezone().strftime('%Y%m%d %H:%M:%S%z'),
                'duration': format_duration(finished_at, b['created_at']),
                'commit': b['commit'][:8],
                'web_url': b['web_url'],
                **{k: b['env'].get(k, '') for k in env},
                **{k: b['meta_data'].get(k, '') for k in meta_data},
            })
        return rows
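
# Illustrative output shapes: as_table() returns one dict per build (state, timestamps, duration,
# commit, web_url, plus any requested env/meta_data keys); as_csv() renders the same rows as
# delimiter-separated text with a header row; as_json() dumps the raw build payloads.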

def create_new_builds(helper: BKPipelineHelper, builds, args):
    'Creates a new build (master HEAD, same env vars) for each given build, prompting before each one'
    print("retry listed builds? this will create a new build using master HEAD, with the same ENV vars")
    input()
    for i, b in enumerate(builds):
        kwargs = {
            'organization': args['org'],
            'pipeline': args['pipeline'],
            'commit': 'HEAD',
            'branch': 'master',
            'message': b['message'],
            'env': b['env'],
        }
        print(i, kwargs)
        rerun = input()
        if rerun == 'y':
            response = helper.client.builds().create_build(**kwargs)
            print(response['web_url'], "\n")

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Buildkite helper!')
    parser.add_argument('--org', help='buildkite organization name', required=True)
    parser.add_argument('--pipeline', help='pipeline slug', required=True)
    parser.add_argument('--state', help='pipeline final state', required=True, choices=list(STATES.keys()))
    parser.add_argument('--limit', help='the max number of builds to fetch', type=int, default=100)
    parser.add_argument('--metadata', help='metadata to search', type=json.loads, default='{}')
    parser.add_argument('--metadata-keys', help='metadata to display', type=json.loads, default='[]')
    parser.add_argument('--env-keys', help='ENVs to display', type=json.loads, default='[]')
    parser.add_argument('--format', help='output format', type=str, default='table')
    parser.add_argument('--create-new-builds', help='create new builds for all listed builds', action='store_true', default=False)

    args = parser.parse_args().__dict__
    print(args, file=sys.stderr)

    helper = BKPipelineHelper(
        access_token=os.environ['BUILDKITE_ACCESS_TOKEN'],
        org_name=args['org'],
        pipeline_name=args.get('pipeline', ''),
    )
    builds = helper.search_builds(
        states=[STATES[args['state']]],
        limit=args['limit'],
        metadata=args['metadata'],
    )
    if args['format'] == 'table':
        print(tabulate(builds.as_table(args['env_keys'], args['metadata_keys']), headers='keys'))
    elif args['format'] == 'csv':
        print(builds.as_csv('\t', args['env_keys'], args['metadata_keys']))
    elif args['format'] == 'json':
        print(builds.as_json())
    elif args['format'] == 'silent':
        pass
    else:
        print(f'invalid format: {args["format"]}', file=sys.stderr)

    if args['create_new_builds']:
        create_new_builds(helper, builds, args)
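
# Example invocations (token, org and pipeline slugs are placeholders, and the filename is
# whatever this gist was saved as):
#   BUILDKITE_ACCESS_TOKEN=xxx ./bk_summary.py --org my-org --pipeline my-pipeline --state failed
#   BUILDKITE_ACCESS_TOKEN=xxx ./bk_summary.py --org my-org --pipeline my-pipeline --state finished \
#     --metadata '{"release": "1.2.3"}' --metadata-keys '["release"]' --format csv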