Last active
June 28, 2024 04:19
-
-
Save tmck-code/3f393a22e2639526b9f44db1c7d7742f to your computer and use it in GitHub Desktop.
Buildkite Pipeline summary
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| from __future__ import annotations | |
| import argparse | |
| import os, csv, json, sys | |
| from dataclasses import dataclass, field | |
| from datetime import datetime, tzinfo, timedelta, timezone | |
| import signal | |
| import time | |
| from typing import List, Dict | |
| import dateutil.relativedelta | |
| import dateutil.parser | |
| from pybuildkite.buildkite import Buildkite, BuildState | |
| from pybuildkite.builds import Builds | |
| import requests | |
| from tabulate import tabulate | |
# Maps each value accepted by the --state command line flag to the
# pybuildkite BuildState that is sent to the API.
STATES = dict(
    failed=BuildState.FAILED,
    finished=BuildState.FINISHED,
    running=BuildState.RUNNING,
    canceled=BuildState.CANCELED,
)
@dataclass
class Timeout:
    """Context manager that raises ``TimeoutError`` when its body runs too long.

    Built on ``signal.alarm``: it needs a platform that provides ``SIGALRM``,
    has to be used from the main thread, and counts in whole seconds.

    Attributes:
        limit: seconds the body may run before ``TimeoutError`` is raised.
    """
    limit: int
    # SIGALRM handler that was active before __enter__; put back by __exit__
    # so using this class does not permanently hijack the process-wide handler.
    _previous_handler: object = field(default=None, init=False, repr=False, compare=False)

    def _timeout_handler(self, signum, frame):
        """Signal handler: turn the alarm into a ``TimeoutError``."""
        raise TimeoutError(f'took longer than {self.limit}')

    def __enter__(self):
        """Install the handler, arm the alarm and return this instance."""
        self._previous_handler = signal.signal(signal.SIGALRM, self._timeout_handler)
        signal.alarm(self.limit)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Disarm the alarm and restore the previous handler.

        Exceptions raised inside the body are never suppressed.
        """
        signal.alarm(0)
        # signal.signal() reports None when the old handler was not installed
        # from Python; there is nothing that can be restored in that case.
        if self._previous_handler is not None:
            signal.signal(signal.SIGALRM, self._previous_handler)
            self._previous_handler = None
@dataclass
class BKPipelineHelper:
    """Small wrapper around a pybuildkite client for listing a pipeline's builds.

    Attributes:
        access_token: Buildkite API token given to the client.
        org_name: buildkite organization name used for every request.
        pipeline_name: pipeline slug used for every request.
        client: the pybuildkite client, created in ``__post_init__``.
    """
    access_token: str
    org_name: str
    pipeline_name: str = ''
    client: Buildkite = field(init=False)

    def __post_init__(self):
        """Create the API client and authenticate it with ``access_token``."""
        self.client = Buildkite()
        self.client.set_access_token(self.access_token)

    def _request_page(
        self,
        args: list | None = None,
        kwargs: dict | None = None,
        retries: int = 5,
        timeout: int = 30,
    ):
        """Fetch one page of builds, retrying when a request times out.

        Args:
            args: positional arguments forwarded to ``list_all_for_pipeline``.
            kwargs: keyword arguments forwarded to ``list_all_for_pipeline``.
            retries: maximum number of attempts.
            timeout: seconds allowed per attempt.

        Returns:
            Whatever ``list_all_for_pipeline`` returns for the first attempt
            that completes in time.

        Raises:
            Exception: when every attempt timed out; the last timeout error is
                attached as the cause. Errors other than timeouts are not
                retried and propagate immediately.
        """
        # None instead of []/{} defaults: a mutable default is shared by all calls.
        args = [] if args is None else args
        kwargs = {} if kwargs is None else kwargs
        last_error = None
        for attempt in range(retries):
            try:
                with Timeout(timeout):
                    return self.client.builds().list_all_for_pipeline(*args, **kwargs)
            except (TimeoutError, requests.exceptions.ReadTimeout) as e:
                last_error = e
                print(f'exceeded timeout of {timeout} seconds, retrying {attempt+1}/{retries}', file=sys.stderr)
                time.sleep(1)
        # Only reached when no attempt returned.
        print(f'failed to get builds for {self.pipeline_name} after {retries} retries', file=sys.stderr)
        raise Exception('failed to get builds') from last_error

    def search_builds(
        self,
        metadata: dict | None = None,
        states: list | None = None,
        limit: int = 100,
    ) -> BKBuildCollection:
        """Search the pipeline's builds by metadata key/value pairs and state.

        Pages through the API until ``limit`` builds were collected or the
        response reports no further page.

        Args:
            metadata: metadata key/value pairs sent as the ``meta_data`` filter.
            states: build states sent as the ``states`` filter.
            limit: maximum number of builds to collect.

        Returns:
            A ``BKBuildCollection`` holding at most ``limit`` builds.
        """
        metadata = {} if metadata is None else metadata
        states = [] if states is None else states
        print(f'\n- searching "{self.pipeline_name}" for builds with metadata: {metadata}, states: {states}', file=sys.stderr)
        builds = []
        next_page = 1
        while next_page and len(builds) < limit:
            response = self._request_page(
                args=[self.org_name, self.pipeline_name],
                kwargs={
                    'meta_data': metadata,
                    'states': states,
                    'page': next_page,
                    'with_pagination': True,
                },
            )
            # Progress trace: page number and how many builds it contained.
            print(next_page, len(response.body), file=sys.stderr)
            for b in response.body:
                builds.append(b)
                if len(builds) >= limit:
                    break
            next_page = response.next_page
        return BKBuildCollection(builds)
def format_duration(created_at, finished_at):
    """Format the time between two timestamps as ``DD:HH:MM:SS``.

    The arguments may be passed in either order: the result is always the
    non-negative gap between them. Days are total days, so durations longer
    than a month are not truncated.

    Args:
        created_at: ISO-8601 timestamp string, or a ``datetime``.
        finished_at: ISO-8601 timestamp string, or a ``datetime``.

    Returns:
        ``days:hours:minutes:seconds``, each part zero-padded to at least two
        digits; any sub-second remainder is dropped.

    Raises:
        TypeError: if one timestamp is timezone-aware and the other is naive.
    """
    def _as_datetime(value):
        # Strings go through the ISO parser; datetimes are used as they are.
        return value if isinstance(value, datetime) else dateutil.parser.isoparse(value)

    elapsed = abs(_as_datetime(finished_at) - _as_datetime(created_at))
    hours, remainder = divmod(elapsed.seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return ':'.join(f'{part:02d}' for part in (elapsed.days, hours, minutes, seconds))
def pretty_format_state(state: str):
    """Wrap a build state in ANSI escape codes for terminal output.

    ``passed``, ``failed`` and ``canceled`` get a style prefix and a reset
    suffix; every other value is returned unchanged.
    """
    reset = '\033[0;0m'
    styles = (
        ('passed', '\033[0;32m'),     # green
        ('failed', '\033[0;31m'),     # red
        ('canceled', '\033[0;0;3m'),  # italic
    )
    for name, prefix in styles:
        if state == name:
            return f'{prefix}{state}{reset}'
    return state
@dataclass
class BKBuildCollection:
    """A list of build dicts plus helpers to filter and render them.

    Attributes:
        builds: the build dicts, in the order they were collected.
    """
    builds: List[Dict]

    def jobs_with_states(self, job_states):
        """Yield ``(build id, job)`` for every job whose state is in ``job_states``.

        Jobs without a ``state`` key are compared as ``None``.
        """
        for build in self.builds:
            for job in build['jobs']:
                if job.get('state') in job_states:
                    yield build['id'], job

    def __iter__(self):
        """Allow ``for build in build_collection:``."""
        yield from self.builds

    def as_json(self):
        """Return all builds as an indented JSON string."""
        return json.dumps(self.builds, indent=2)

    def as_csv(self, delim='\t', meta_data=None):
        """Render the builds as delimiter-separated text with a header line.

        Args:
            delim: separator placed between columns.
            meta_data: names of extra columns appended after the fixed ones.
                Each name is looked up in the build's ``env`` first and then
                in its ``meta_data``, so a combined list of env and metadata
                keys works; names found in neither give an empty cell.

        Returns:
            One header line plus one line per build joined with newlines, or
            ``''`` when there are no builds. Values are not quoted or escaped,
            and the ``state`` column carries the same ANSI codes as
            ``as_table``.
        """
        keys = list(meta_data or [])
        lines = []
        for build, row in zip(self.builds, self.as_table()):
            build_env = build.get('env') or {}
            build_meta = build.get('meta_data') or {}
            for key in keys:
                # Previously the keys were only ever looked up in env, which
                # left every metadata column blank.
                row[key] = build_env[key] if key in build_env else build_meta.get(key, '')
            if not lines:
                lines.append(delim.join(map(str, row)))
            lines.append(delim.join(map(str, row.values())))
        return '\n'.join(lines)

    def as_table(self, env=None, meta_data=None):
        """Build one display row (a dict) per build.

        Args:
            env: names of build ``env`` entries to add as columns.
            meta_data: names of build ``meta_data`` entries to add as columns.

        Returns:
            A list of dicts with the keys ``state``, ``created_at``,
            ``finished_at``, ``duration``, ``commit``, ``web_url`` followed by
            the requested env and metadata columns (``''`` when absent).
        """
        env_keys = list(env or [])
        meta_keys = list(meta_data or [])
        timestamp_format = '%Y%m%d %H:%M:%S%z'
        rows = []
        for b in self.builds:
            # A build without a finish time is measured up to the current time.
            finished_at = b['finished_at'] or datetime.now().astimezone().isoformat()
            build_env = b.get('env') or {}
            build_meta = b.get('meta_data') or {}
            rows.append({
                'state': pretty_format_state(b['state']),
                'created_at': dateutil.parser.isoparse(b['created_at']).astimezone().strftime(timestamp_format),
                'finished_at': dateutil.parser.isoparse(finished_at).astimezone().strftime(timestamp_format),
                # NOTE: the finish time is deliberately passed first even
                # though format_duration names its first parameter
                # created_at; this order produces a positive duration.
                'duration': format_duration(finished_at, b['created_at']),
                'commit': b['commit'][:8],
                'web_url': b['web_url'],
                **{k: build_env.get(k, '') for k in env_keys},
                **{k: build_meta.get(k, '') for k in meta_keys},
            })
        return rows
def create_new_builds(builds, args):
    """Interactively create a new build for each listed build.

    Every new build targets branch ``master`` at commit ``HEAD`` and reuses
    the message and env of the listed build. Each one is created only when
    the user answers ``y`` at its prompt.

    Args:
        builds: iterable of build dicts providing ``message`` and ``env``.
        args: parsed command line options; ``org`` and ``pipeline`` are read.
    """
    print("retry listed builds? this will create a new build using master HEAD, with the same ENV vars")
    # Waits for Enter only; whatever is typed here is ignored.
    input()
    for index, build in enumerate(builds):
        request = dict(
            organization=args['org'],
            pipeline=args['pipeline'],
            commit='HEAD',
            branch='master',
            message=build['message'],
            env=build['env'],
        )
        print(index, request)
        if input() != 'y':
            continue
        # NOTE: `helper` is the module-level object assigned in the
        # __main__ section of this script; it is not a parameter.
        created = helper.client.builds().create_build(**request)
        print(created['web_url'], "\n")
if __name__ == '__main__':
    # Command line entry point: search a pipeline's builds, print them in the
    # requested format and optionally offer to re-create them.
    #
    # NOTE: this stays at module scope on purpose. create_new_builds() reads
    # the global `helper` assigned below, so moving this code into a function
    # would break it.
    parser = argparse.ArgumentParser(description='Buildkite helper!')
    parser.add_argument('--org', help='buildkite organization name', required=True)
    parser.add_argument('--pipeline', help='pipeline slug', required=True)
    parser.add_argument('--state', help='pipeline final state', required=True, choices=list(STATES.keys()))
    parser.add_argument('--limit', help='the max number of jobs to fetch', type=int, default=100)
    # String defaults are run through `type`, so these become {} / [] / [].
    parser.add_argument('--metadata', help='metadata to search', type=json.loads, default='{}')
    parser.add_argument('--metadata-keys', help='metadata to display', type=json.loads, default='[]')
    parser.add_argument('--env-keys', help='ENVs to display', type=json.loads, default='[]')
    # `choices` rejects an unknown format before any request is made, rather
    # than reporting it after the whole search has already run.
    parser.add_argument('--format', help='output format', type=str, default='table',
                        choices=['table', 'csv', 'json', 'silent'])
    parser.add_argument('--create-new-builds', help='create new builds for all listed builds', action='store_true', default=False)
    args = vars(parser.parse_args())
    print(args, file=sys.stderr)

    access_token = os.environ.get('BUILDKITE_ACCESS_TOKEN')
    if access_token is None:
        # Exit with a usage error rather than a bare KeyError traceback.
        parser.error('the BUILDKITE_ACCESS_TOKEN environment variable must be set')

    helper = BKPipelineHelper(
        access_token=access_token,
        org_name=args['org'],
        pipeline_name=args['pipeline'],
    )
    builds = helper.search_builds(
        states=[STATES[args['state']]],
        limit=args['limit'],
        metadata=args['metadata'],
    )

    # 'silent' intentionally prints nothing.
    if args['format'] == 'table':
        print(tabulate(builds.as_table(args['env_keys'], args['metadata_keys']), headers='keys'))
    elif args['format'] == 'csv':
        print(builds.as_csv('\t', args['env_keys'] + args['metadata_keys']))
    elif args['format'] == 'json':
        print(builds.as_json())

    if args['create_new_builds']:
        create_new_builds(builds, args)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment