Skip to content

Instantly share code, notes, and snippets.

@jay0lee
Last active November 4, 2025 12:35
Show Gist options
  • Select an option

  • Save jay0lee/6add1e4d5970f8e9c202161ab801c668 to your computer and use it in GitHub Desktop.

Select an option

Save jay0lee/6add1e4d5970f8e9c202161ab801c668 to your computer and use it in GitHub Desktop.
import argparse
import csv
import os
import json
import logging
import socket
import sys
import time
from itertools import batched
import google.auth
import google.auth.transport.requests
from googleapiclient.discovery import build
def get_service():
    """Build an authenticated Discovery Engine v1 API client.

    Uses Application Default Credentials with the cloud-platform scope,
    pins the discovered project as the quota project, refreshes the
    credentials once, and quiets noisy auth-transport logging.

    Returns:
        googleapiclient.discovery.Resource for the discoveryengine v1 API.
    """
    scopes = ['https://www.googleapis.com/auth/cloud-platform']
    # First call discovers the default project; second call re-resolves the
    # credentials with that project set as the quota project.
    credentials, project = google.auth.default(scopes=scopes)
    credentials, _ = google.auth.default(scopes=scopes, quota_project_id=project)
    credentials.refresh(google.auth.transport.requests.Request())
    # Avoid indefinite hangs on stuck HTTP connections.
    socket.setdefaulttimeout(10)
    # BUG FIX: the refreshed credentials were previously never handed to
    # build(), which silently re-resolved ADC itself and ignored the
    # quota-project setup above. Pass them explicitly.
    svc = build('discoveryengine', 'v1', credentials=credentials)
    # google_auth_httplib2 logs transient retry noise; suppress below ERROR.
    logging.getLogger('google_auth_httplib2').setLevel(logging.ERROR)
    return svc
def watch_operation(service, operation):
    """Poll a long-running identityMappingStores operation until it completes.

    Args:
        service: Discovery Engine API client resource.
        operation: Fully-qualified operation name to poll.

    Returns:
        The final operation response dict (one whose 'done' field is truthy).
    """
    ops = service.projects().locations().identityMappingStores().operations()
    while True:
        resp = ops.get(name=operation).execute(num_retries=5)
        if resp.get('done'):
            return resp
        print(' operation is not done. sleeping 10 seconds before checking again...')
        print(resp)
        time.sleep(10)
def call_api(service, function, **kwargs):
    """Execute one API call with automatic retries.

    Args:
        service: An API resource object — or an already-prepared request
            when `function` is falsy.
        function: Name of the method to look up on `service`, or None/'' to
            execute `service` itself.
        **kwargs: Parameters forwarded to the method.

    Returns:
        The decoded API response.
    """
    request = getattr(service, function)(**kwargs) if function else service
    return request.execute(num_retries=5)
def call_pages(service, function, items='items', page_message=None, **kwargs):
    """Collect results across all pages of a paginated list call.

    Args:
        service: API resource to call.
        function: Method name that supports pageToken pagination.
        items: Response key holding the per-page list of results.
        page_message: Optional message printed after each fetched page.
        **kwargs: Extra parameters forwarded to every page request.

    Returns:
        A list of all items accumulated across pages.
    """
    # Removed an unused `total_items` accumulator from the original.
    page_token = None
    all_items = []
    while True:
        # An empty/None response is treated as a page with no items.
        page = call_api(service, function, pageToken=page_token, **kwargs) or {}
        if page_message:
            print(page_message)
        all_items += page.get(items, [])
        page_token = page.get('nextPageToken')
        if not page_token:
            return all_items
def read_csv_to_dict(filename):
    """Read a mappings CSV into a list of dicts, omitting empty cells.

    The expected header is ["groupId", "userId", "externalIdentity"]; a
    different header only produces a warning on stderr, and rows are still
    keyed by whatever header was found.

    Args:
        filename: Path to the CSV file.

    Returns:
        List of per-row dicts; empty-string cells are left out entirely.

    Exits:
        With status 1 if the file is missing or any other error occurs.
    """
    data = []
    try:
        with open(filename, 'r', newline='', encoding='utf-8') as csvfile:
            reader = csv.reader(csvfile)
            header = next(reader)
            expected_header = ["groupId", "userId", "externalIdentity"]
            if header != expected_header:
                print(f"Warning: Expected header {expected_header}, but found {header}.", file=sys.stderr)
            for row in reader:
                # Drop empty cells so absent values are omitted keys rather
                # than empty strings in the API payload.
                data.append({key: value for key, value in zip(header, row) if value})
        return data
    except FileNotFoundError:
        # BUG FIX: the message previously printed the literal text
        # '(unknown)' instead of the offending filename.
        print(f"ERROR: The file at '{filename}' was not found.", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"ERROR: An unexpected error occurred: {e}", file=sys.stderr)
        sys.exit(1)
def create_store(args):
    """Create a new identity mapping store in the given project."""
    service = get_service()
    parent = f'projects/{args.project}/locations/global'
    store_id = args.store
    body = {'name': f'{parent}/identityMappingStores/{store_id}'}
    resp = call_api(service.projects().locations().identityMappingStores(),
                    'create',
                    identityMappingStoreId=store_id,
                    parent=parent,
                    body=body)
    print(f' created mapping store {resp.get("name", "Unknown")}')
def list_stores(args):
    """List all identity mapping stores in the project."""
    service = get_service()
    parent = f'projects/{args.project}/locations/global'
    resp = call_api(service.projects().locations().identityMappingStores(),
                    'list',
                    parent=parent)
    # BUG FIX: default to [] — when no stores exist the key is absent and
    # the original crashed on len(None).
    stores = resp.get('identityMappingStores', [])
    print(f'Stores: ({len(stores)})')
    for store in stores:
        print(f' {store.get("name")}')
def delete_store(args):
    """Delete an identity mapping store, waiting for the operation to finish."""
    service = get_service()
    parent = f'projects/{args.project}/locations/global'
    name = f'{parent}/identityMappingStores/{args.store}'
    resp = call_api(service.projects().locations().identityMappingStores(),
                    'delete',
                    name=name)
    # Deletion is a long-running operation; block until it reports done.
    if not resp.get('done'):
        resp = watch_operation(service, resp.get('name'))
    print(f' Deleted mapping store {name}')
def import_mappings(args):
    """Import identity mappings from a CSV file into a store, in batches.

    Reads args.csvfilename via read_csv_to_dict, uploads the entries in
    chunks of 5000 through importIdentityMappings, waits for each
    long-running operation, and prints a final success summary.
    """
    service = get_service()
    parent = f'projects/{args.project}/locations/global'
    identity_mapping_store = f'{parent}/identityMappingStores/{args.store}'
    mappings = read_csv_to_dict(args.csvfilename)
    attempted_imports = 0
    imported_mappings = 0
    # Chunk size of 5000 kept from the original — presumably the inline
    # import request limit; TODO confirm against current API quotas.
    # (Also removed an unused `total_mappings` local and a needless
    # list(...) materialization of the batched iterator.)
    for chunk in batched(mappings, 5000):
        print(f'Importing {len(chunk)} mappings...')
        body = {
            "inlineSource": {
                "identityMappingEntries": list(chunk)
            }
        }
        resp = call_api(service.projects().locations().identityMappingStores(),
                        'importIdentityMappings',
                        identityMappingStore=identity_mapping_store,
                        body=body)
        if not resp.get('done'):
            resp = watch_operation(service, resp.get('name'))
        metadata = resp.get('metadata', {})
        imported_mappings += int(metadata.get('successCount', 0))
        attempted_imports += int(metadata.get('totalCount', 0))
    # Typo fix in the summary message: "Succesfully" -> "Successfully".
    print(f'Successfully imported {imported_mappings:,} of {attempted_imports:,} attempted mappings.')
def list_mappings(args):
    """Dump all mappings in a store to stdout as CSV.

    Pages through listIdentityMappings and writes a single CSV whose header
    is the union of keys observed across all entries.
    """
    service = get_service()
    parent = f'projects/{args.project}/locations/global'
    identity_mapping_store = f'{parent}/identityMappingStores/{args.store}'
    svc = service.projects().locations().identityMappingStores()
    mappings = call_pages(service=svc,
                          function='listIdentityMappings',
                          items='identityMappingEntries',
                          page_message='got a page',
                          pageSize=10000,
                          identityMappingStore=identity_mapping_store)
    headers = set()
    for mapping in mappings:
        headers.update(mapping.keys())
    # BUG FIX: fieldnames was a raw set, so column order changed between
    # runs; sort for deterministic output.
    writer = csv.DictWriter(sys.stdout, fieldnames=sorted(headers))
    writer.writeheader()
    writer.writerows(mappings)
def purge_mappings(args):
    """Purge the mappings listed in a CSV file from a store, in batches.

    NOTE(review): the parser defines --filter and --force for this command
    but neither flag is consulted here — confirm whether purge should honor
    them before relying on this behavior.
    """
    service = get_service()
    parent = f'projects/{args.project}/locations/global'
    identity_mapping_store = f'{parent}/identityMappingStores/{args.store}'
    mappings = read_csv_to_dict(args.csvfilename)
    # Removed unused `total_mappings`/`purged_mappings` locals and a
    # needless list(...) materialization of the batched iterator.
    for chunk in batched(mappings, 10000):
        print(f'purging {len(chunk)} mappings...')
        body = {
            "inlineSource": {
                "identityMappingEntries": list(chunk)
            }
        }
        req = service.projects().locations().identityMappingStores().purgeIdentityMappings(
            identityMappingStore=identity_mapping_store,
            body=body)
        resp = req.execute(num_retries=5)
        if not resp.get('done'):
            resp = watch_operation(service, resp.get('name'))
        metadata = resp.get('metadata', {})
        success_count = int(metadata.get('successCount', 0))
        total_count = int(metadata.get('totalCount', 0))
        print(f' Successfully purged {success_count:,} of {total_count:,} mappings...')
def main():
    """Parse command-line arguments and dispatch to the selected action."""
    parser = argparse.ArgumentParser(description="A simple key-value store management tool.")
    subparsers = parser.add_subparsers(dest='action', required=True, help='Available actions')

    # createstore
    sub = subparsers.add_parser('createstore', help='Creates a new key-value store.')
    sub.add_argument('--store', help='The name of the store to create (e.g., mystore).')
    sub.add_argument('--project', help='GCP project of the store to create.')
    sub.set_defaults(func=create_store)

    # liststores
    sub = subparsers.add_parser('liststores', help='Lists all available stores.')
    sub.add_argument('--project', help='GCP project of the stores.')
    sub.set_defaults(func=list_stores)

    # deletestore
    sub = subparsers.add_parser('deletestore', help='Deletes a specified key-value store.')
    sub.add_argument('--store', help='The name of the store to delete.')
    sub.add_argument('--project', help='GCP project of the stores.')
    sub.set_defaults(func=delete_store)

    # importmappings
    sub = subparsers.add_parser('importmappings', help='Imports mappings from a JSON file into a store.')
    sub.add_argument('--store', help='The name of the store to import into.')
    sub.add_argument('--project', help='GCP project of the stores.')
    sub.add_argument('--csvfilename', help='The CSV file containing the mappings.')
    sub.set_defaults(func=import_mappings)

    # listmappings
    sub = subparsers.add_parser('listmappings', help='Lists all mappings in a specified store.')
    sub.add_argument('--store', help='The name of the store to list mappings from.')
    sub.add_argument('--project', help='GCP project of the stores.')
    sub.set_defaults(func=list_mappings)

    # purgemappings
    sub = subparsers.add_parser('purgemappings', help='Removes all mappings from a specified store.')
    sub.add_argument('--store', help='The name of the store to purge.')
    sub.add_argument('--project', help='GCP project of the stores.')
    sub.add_argument('--filter', help='filter which determines which mappings are purged.')
    sub.add_argument('--csvfilename', help='The CSV file containing the mappings.')
    sub.add_argument('--force', action='store_true', help='Actually make the purge changes.')
    sub.set_defaults(func=purge_mappings)

    args = parser.parse_args()
    args.func(args)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment