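"""Manage Discovery Engine (Vertex AI Search) identity mapping stores.

A command-line tool that creates, lists, and deletes identity mapping
stores, and imports, lists, and purges identity mappings from CSV files.

Example invocations (script, project, and store names here are
hypothetical placeholders, not taken from the gist):

  python identity_mapping_stores.py createstore --project my-project --store mystore
  python identity_mapping_stores.py importmappings --project my-project \
      --store mystore --csvfilename mappings.csv
  python identity_mapping_stores.py listmappings --project my-project --store mystore
"""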
import argparse
import csv
import logging
import socket
import sys
import time
from itertools import batched  # stdlib in Python 3.12+
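# itertools.batched is only available on Python 3.12+. On older
# interpreters, a minimal fallback could look like the commented sketch
# below (an assumption for portability, not part of the original gist):
#
#   from itertools import islice
#
#   def batched(iterable, n):
#       it = iter(iterable)
#       while chunk := tuple(islice(it, n)):
#           yield chunk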
import google.auth
import google.auth.transport.requests
from googleapiclient.discovery import build
def get_service():
  """Builds an authorized Discovery Engine API client."""
  scopes = ['https://www.googleapis.com/auth/cloud-platform']
  # The first call discovers the default project; the second pins it as
  # the quota project on the credentials.
  credentials, project = google.auth.default(scopes=scopes)
  credentials, _ = google.auth.default(scopes=scopes, quota_project_id=project)
  credentials.refresh(google.auth.transport.requests.Request())
  socket.setdefaulttimeout(10)
  svc = build('discoveryengine',
              'v1',
              credentials=credentials)
  # Suppress noisy retry warnings from the auth transport.
  logger_to_suppress = logging.getLogger('google_auth_httplib2')
  logger_to_suppress.setLevel(logging.ERROR)
  return svc
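# get_service() relies on Application Default Credentials (ADC). If none
# are configured locally, running:
#
#   gcloud auth application-default login
#
# sets them up for interactive use.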
def watch_operation(service, operation):
  """Polls a long-running operation until it completes."""
  while True:
    req = service.projects().locations().identityMappingStores().operations().get(
        name=operation)
    resp = req.execute(num_retries=5)
    if resp.get('done'):
      return resp
    print('  operation is not done. sleeping 10 seconds before checking again...')
    print(resp)
    time.sleep(10)
def call_api(service, function, **kwargs):
  """Calls a single API method with retries."""
  parameters = kwargs.copy()
  if function:
    method = getattr(service, function)(**parameters)
  else:
    method = service
  return method.execute(num_retries=5)
def call_pages(service, function, items='items', page_message=None, **kwargs):
  """Calls a paginated API method and returns all results as one list."""
  pageToken = None
  all_pages = []
  while True:
    this_page = call_api(service, function, pageToken=pageToken, **kwargs)
    if not this_page:
      this_page = {items: []}
    if page_message:
      print(page_message)
    all_pages += this_page.get(items, [])
    pageToken = this_page.get('nextPageToken')
    if not pageToken:
      return all_pages
def read_csv_to_dict(filename):
  """Reads a mappings CSV into a list of dicts, skipping empty cells."""
  data = []
  try:
    with open(filename, 'r', newline='', encoding='utf-8') as csvfile:
      reader = csv.reader(csvfile)
      header = next(reader)
      expected_header = ["groupId", "userId", "externalIdentity"]
      if header != expected_header:
        print(f"Warning: Expected header {expected_header}, but found {header}.", file=sys.stderr)
      for row in reader:
        row_dict = {}
        for key, value in zip(header, row):
          # Skip empty cells so they are omitted from the API payload
          if value:
            row_dict[key] = value
        data.append(row_dict)
    return data
  except FileNotFoundError:
    print(f"ERROR: The file at '{filename}' was not found.", file=sys.stderr)
    sys.exit(1)
  except Exception as e:
    print(f"ERROR: An unexpected error occurred: {e}", file=sys.stderr)
    sys.exit(1)
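# For reference, an input CSV matching the expected header might look like
# this (the identity values below are hypothetical examples, not from the
# gist):
#
#   groupId,userId,externalIdentity
#   group-eng,,external-group-123
#   ,user-alice,external-user-456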
def create_store(args):
  """Creates a new identity mapping store."""
  service = get_service()
  parent = f'projects/{args.project}/locations/global'
  identityMappingStoreId = args.store
  name = f'{parent}/identityMappingStores/{identityMappingStoreId}'
  body = {'name': name}
  resp = call_api(service.projects().locations().identityMappingStores(),
                  'create',
                  identityMappingStoreId=identityMappingStoreId,
                  parent=parent,
                  body=body)
  name = resp.get('name', 'Unknown')
  print(f'  created mapping store {name}')
def list_stores(args):
  """Lists all identity mapping stores in the project."""
  service = get_service()
  parent = f'projects/{args.project}/locations/global'
  resp = call_api(service.projects().locations().identityMappingStores(),
                  'list',
                  parent=parent)
  stores = resp.get('identityMappingStores', [])
  print(f'Stores: ({len(stores)})')
  for store in stores:
    print(f'  {store.get("name")}')
def delete_store(args):
  """Deletes a specified identity mapping store."""
  service = get_service()
  parent = f'projects/{args.project}/locations/global'
  identityMappingStoreId = args.store
  name = f'{parent}/identityMappingStores/{identityMappingStoreId}'
  resp = call_api(service.projects().locations().identityMappingStores(),
                  'delete',
                  name=name)
  if not resp.get('done'):
    operation = resp.get('name')
    resp = watch_operation(service, operation)
  print(f'  Deleted mapping store {name}')
def import_mappings(args):
  """Imports mappings from a CSV file into a store."""
  service = get_service()
  parent = f'projects/{args.project}/locations/global'
  identityMappingStore = f'{parent}/identityMappingStores/{args.store}'
  mappings = read_csv_to_dict(args.csvfilename)
  attempted_imports = 0
  imported_mappings = 0
  # Import in chunks of 5,000 entries per request
  mapping_chunks = list(batched(mappings, 5000))
  for mapping_chunk in mapping_chunks:
    print(f'Importing {len(mapping_chunk)} mappings...')
    body = {
        "inlineSource": {
            "identityMappingEntries": mapping_chunk
        }
    }
    resp = call_api(service.projects().locations().identityMappingStores(),
                    'importIdentityMappings',
                    identityMappingStore=identityMappingStore,
                    body=body)
    if not resp.get('done'):
      operation = resp.get('name')
      resp = watch_operation(service, operation)
    metadata = resp.get('metadata', {})
    successCount = int(metadata.get('successCount', 0))
    totalCount = int(metadata.get('totalCount', 0))
    imported_mappings += successCount
    attempted_imports += totalCount
  print(f'Successfully imported {imported_mappings:,} of {attempted_imports:,} attempted mappings.')
def list_mappings(args):
  """Lists all mappings in a specified store as CSV on stdout."""
  service = get_service()
  parent = f'projects/{args.project}/locations/global'
  identityMappingStore = f'{parent}/identityMappingStores/{args.store}'
  pageSize = 10000
  items = 'identityMappingEntries'
  svc = service.projects().locations().identityMappingStores()
  function = 'listIdentityMappings'
  mappings = call_pages(service=svc,
                        function=function,
                        items=items,
                        page_message='got a page',
                        pageSize=pageSize,
                        identityMappingStore=identityMappingStore)
  # Collect the union of keys across all mappings, sorted so the CSV
  # column order is deterministic
  headers = set()
  for mapping in mappings:
    headers.update(mapping.keys())
  writer = csv.DictWriter(sys.stdout, fieldnames=sorted(headers))
  writer.writeheader()
  writer.writerows(mappings)
def purge_mappings(args):
  """Purges the mappings listed in a CSV file from a specified store."""
  # Note: args.filter is parsed by the CLI but not used by this function.
  service = get_service()
  parent = f'projects/{args.project}/locations/global'
  identityMappingStore = f'{parent}/identityMappingStores/{args.store}'
  mappings = read_csv_to_dict(args.csvfilename)
  mapping_chunks = list(batched(mappings, 10000))
  for mapping_chunk in mapping_chunks:
    print(f'purging {len(mapping_chunk)} mappings...')
    body = {
        "inlineSource": {
            "identityMappingEntries": mapping_chunk
        },
        # Assumes the purge request honors a 'force' flag; without it the
        # API reports what would be purged rather than deleting it
        "force": args.force
    }
    resp = call_api(service.projects().locations().identityMappingStores(),
                    'purgeIdentityMappings',
                    identityMappingStore=identityMappingStore,
                    body=body)
    if not resp.get('done'):
      operation = resp.get('name')
      resp = watch_operation(service, operation)
    metadata = resp.get('metadata', {})
    successCount = int(metadata.get('successCount', 0))
    totalCount = int(metadata.get('totalCount', 0))
    print(f'  Successfully purged {successCount:,} of {totalCount:,} mappings...')
def main():
  parser = argparse.ArgumentParser(description="A simple identity mapping store management tool.")
  subparsers = parser.add_subparsers(dest='action', required=True, help='Available actions')
  # Create Store sub-parser
  create_parser = subparsers.add_parser('createstore', help='Creates a new identity mapping store.')
  create_parser.add_argument('--store', help='The name of the store to create (e.g., mystore).')
  create_parser.add_argument('--project', help='GCP project of the store to create.')
  create_parser.set_defaults(func=create_store)
  # List Stores sub-parser
  list_stores_parser = subparsers.add_parser('liststores', help='Lists all available stores.')
  list_stores_parser.add_argument('--project', help='GCP project of the stores.')
  list_stores_parser.set_defaults(func=list_stores)
  # Delete Store sub-parser
  delete_parser = subparsers.add_parser('deletestore', help='Deletes a specified identity mapping store.')
  delete_parser.add_argument('--store', help='The name of the store to delete.')
  delete_parser.add_argument('--project', help='GCP project of the stores.')
  delete_parser.set_defaults(func=delete_store)
  # Import Mappings sub-parser
  import_parser = subparsers.add_parser('importmappings', help='Imports mappings from a CSV file into a store.')
  import_parser.add_argument('--store', help='The name of the store to import into.')
  import_parser.add_argument('--project', help='GCP project of the stores.')
  import_parser.add_argument('--csvfilename', help='The CSV file containing the mappings.')
  import_parser.set_defaults(func=import_mappings)
  # List Mappings sub-parser
  list_mappings_parser = subparsers.add_parser('listmappings', help='Lists all mappings in a specified store.')
  list_mappings_parser.add_argument('--store', help='The name of the store to list mappings from.')
  list_mappings_parser.add_argument('--project', help='GCP project of the stores.')
  list_mappings_parser.set_defaults(func=list_mappings)
  # Purge Mappings sub-parser
  purge_parser = subparsers.add_parser('purgemappings', help='Removes the mappings listed in a CSV file from a specified store.')
  purge_parser.add_argument('--store', help='The name of the store to purge.')
  purge_parser.add_argument('--project', help='GCP project of the stores.')
  purge_parser.add_argument('--filter', help='Filter determining which mappings are purged (currently unused).')
  purge_parser.add_argument('--csvfilename', help='The CSV file containing the mappings.')
  purge_parser.add_argument('--force', action='store_true', help='Actually make the purge changes.')
  purge_parser.set_defaults(func=purge_mappings)
  args = parser.parse_args()
  args.func(args)
if __name__ == "__main__":
  try:
    main()
  except KeyboardInterrupt:
    sys.exit(1)