Created
May 3, 2021 11:14
-
-
Save trevormunoz/8d4f5f1942392bd91c626cbb6b7decdd to your computer and use it in GitHub Desktop.
Example usage of umd-mith/veefor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #%% | |
| from dataclasses import asdict, fields | |
| from datetime import datetime | |
| import pytz | |
| import json | |
| import re | |
| from collections import Counter | |
| import ulid | |
| from pydantic import ValidationError | |
| import lakeland_db_migrate_v4 as lv4 | |
| import lakeland_db_migrate_v4.source_mappings as sm | |
# %%
# Expected number of records of each kind in the v3 Airtable exports. These
# figures back the sanity assertions made after loading and after migration.
MAGIC_NUMBERS = dict(
    accessions=51,
    files=10325,
    items=4428,
    entities=1134,
    subjects=94,
    relationships=24,
)
expected_counts = Counter(MAGIC_NUMBERS)
# %%
def load_all():
    """Validate every v3 Airtable export and index the records by Airtable id.

    Each JSON export is validated against its column mapping with
    ``lv4.validate_inputs``. The result maps each record's ``airtable_idno`` to
    the validated source record. If an id occurs more than once, the record
    loaded later (exports are processed in the order listed) wins.
    """
    export_specs = [
        ("Accessions.json", sm.accessions_source_column_mappings),
        ("Files.json", sm.files_source_column_mappings),
        ("Entities.json", sm.entities_source_column_mappings),
        ("Subjects.json", sm.subjects_source_column_mappings),
        ("Relationships.json", sm.relationships_source_column_mappings),
        ("Items.json", sm.items_source_column_mappings),
    ]
    return {
        record.airtable_idno: record
        for export_file, column_mapping in export_specs
        for record in lv4.validate_inputs(export_file, column_mapping)
    }
# %%
airtable_datadict = load_all()
# %%
# Crude check to make sure nothing obvious got screwed up in loading the data:
# every source record type must be present exactly as often as expected.
checker = Counter(type(v).__name__ for v in airtable_datadict.values())
# Source record class name -> key in expected_counts. All six exports are
# checked, including entities.
_EXPECTED_SOURCE_TYPES = {
    "AccessionSourceRecord": "accessions",
    "FileSourceRecord": "files",
    "ItemSourceRecord": "items",
    "EntitySourceRecord": "entities",
    "EntityRelationshipSourceRecord": "relationships",
    "SubjectSourceRecord": "subjects",
}
for _type_name, _count_key in _EXPECTED_SOURCE_TYPES.items():
    assert checker[_type_name] == expected_counts[_count_key], "{}: loaded {}, expected {}".format(
        _type_name, checker[_type_name], expected_counts[_count_key]
    )
# =========================== CREATE V4 RECORDS ========================================
# %%
def handle_accessions(instance):
    """Convert a v3 accession source record into a v4 DonationGroupingRecord.

    Returns ``(v4_record, sidecar_data)``. ``sidecar_data`` keeps the non-empty
    source fields that are not listed in ``consumed``. The v4 ``idno`` is a
    freshly minted ULID, and donor email/phone are set to empty strings. The
    raw Airtable file ids are kept in ``v3_files_array`` for later resolution.

    NOTE(review): the source ``idno`` is marked consumed but never copied onto
    the v4 record, so it is not preserved anywhere. Confirm this is intended.
    """
    consumed = (
        "airtable_idno",
        "airtable_created_time",
        "title",
        "description",
        "donor_name",
        "donation_date",
        "legacy_idno_umd",
        "idno",
        "file_array",
    )
    sidecar_data = filter_extra_fields(instance, consumed)
    v4_record = lv4.DonationGroupingRecord(
        idno=ulid.new().str,
        v3_airtable_idno=instance.airtable_idno,
        v3_airtable_created_time=instance.airtable_created_time,
        title=instance.title,
        description=instance.description,
        donor_name=instance.donor_name,
        donor_email="",
        donor_phone="",
        donation_date=instance.donation_date,
        legacy_idno=instance.legacy_idno_umd,
        v3_files_array=instance.file_array,
    )
    return v4_record, sidecar_data
# %%
def handle_files(instance):
    """Convert a v3 file source record into a v4 FileRecord.

    Returns ``(v4_record, sidecar_data)``. Only the Airtable id and creation
    time count as consumed, so every other non-empty source field (including
    ``linked_accession``) is carried in ``sidecar_data``. ``donation_grouping_id``
    is set to the raw v3 ``linked_accession`` value (an Airtable reference, not
    yet a v4 id).
    """
    consumed = ("airtable_idno", "airtable_created_time")
    sidecar_data = filter_extra_fields(instance, consumed)
    v4_record = lv4.FileRecord(
        idno=ulid.new().str,
        v3_airtable_idno=instance.airtable_idno,
        v3_airtable_created_time=instance.airtable_created_time,
        donation_grouping_id=instance.linked_accession,
    )
    return v4_record, sidecar_data
# %%
def handle_subjects(instance):
    """Convert a v3 subject source record into a v4 SubjectRecord.

    Returns ``(v4_record, sidecar_data)``. The source ``category`` becomes the
    v4 ``subject_type``. Unconsumed non-empty fields (such as linked item ids)
    stay in ``sidecar_data``.
    """
    consumed = ("airtable_idno", "airtable_created_time", "name", "category")
    sidecar_data = filter_extra_fields(instance, consumed)
    v4_record = lv4.SubjectRecord(
        idno=ulid.new().str,
        v3_airtable_idno=instance.airtable_idno,
        v3_airtable_created_time=instance.airtable_created_time,
        name=instance.name,
        subject_type=instance.category,
    )
    return v4_record, sidecar_data
# %%
def handle_entities(instance):
    """Convert a v3 entity source record into a v4 EntityRecord.

    Returns ``(v4_record, sidecar_data)``. ``category`` maps to ``entity_type``
    and ``lchp_source_code`` maps to ``legacy_idno_lchp``.

    NOTE(review): the source ``idno`` is consumed but not copied to the v4
    record (a new ULID is minted instead). Confirm the old id is not needed.
    """
    consumed = (
        "airtable_idno",
        "airtable_created_time",
        "name",
        "alt_name",
        "bio_hist",
        "category",
        "idno",
        "lchp_source_code",
    )
    sidecar_data = filter_extra_fields(instance, consumed)
    v4_record = lv4.EntityRecord(
        idno=ulid.new().str,
        v3_airtable_idno=instance.airtable_idno,
        v3_airtable_created_time=instance.airtable_created_time,
        name=instance.name,
        alt_name=instance.alt_name,
        bio_hist=instance.bio_hist,
        entity_type=instance.category,
        legacy_idno_lchp=instance.lchp_source_code,
    )
    return v4_record, sidecar_data
# %%
def handle_relationships(instance):
    """Convert a v3 entity-relationship source record into a v4 record.

    Returns ``(v4_record, sidecar_data)``. ``entity_1``/``entity_2`` become the
    subject/object entities, still holding raw Airtable ids at this stage.
    ``relation_type`` becomes the relationship predicate.
    """
    consumed = (
        "airtable_idno",
        "airtable_created_time",
        "entity_1",
        "entity_2",
        "name",
        "relation_type",
    )
    sidecar_data = filter_extra_fields(instance, consumed)
    v4_record = lv4.EntityRelationshipRecord(
        idno=ulid.new().str,
        v3_airtable_idno=instance.airtable_idno,
        v3_airtable_created_time=instance.airtable_created_time,
        name=instance.name,
        subject_entity=instance.entity_1,
        object_entity=instance.entity_2,
        relationship_predicate=instance.relation_type,
    )
    return v4_record, sidecar_data
# %%
def filter_extra_fields(instance, consumed_keys):
    """Return the dataclass fields of ``instance`` that were not consumed.

    Fields named in ``consumed_keys`` are skipped. Of the rest, values equal to
    ``[]``, ``""`` or ``False`` are dropped (note that ``0`` compares equal to
    ``False`` and is dropped too). Field order follows the dataclass definition.
    """
    empty_markers = ([], "", False)
    return {
        key: value
        for key, value in asdict(instance).items()
        if key not in consumed_keys and value not in empty_markers
    }
# %%
def handle_item_types(instance, matcher):
    """Flatten an item's type and category fields into one list of type labels.

    ``matcher`` is an ordered pair ``(type_attr, category_attr)`` of attribute
    names on ``instance``. Each attribute may hold a single string, a list of
    strings, or ``None``. Values are collected type first, then category, and
    empty/falsy entries are dropped.
    """
    type_attr, category_attr = matcher
    flattened_types = []
    for attr_name in (type_attr, category_attr):
        value = getattr(instance, attr_name)
        if value is None:
            # Nothing recorded for this field.
            continue
        if isinstance(value, str):
            flattened_types.append(value)
        else:
            # Multi-valued fields contribute every element. The old
            # ``flattened_types + ic`` discarded its result, so list-valued
            # categories were silently lost.
            flattened_types.extend(value)
    return [label for label in flattened_types if label]
# %%
def handle_collections(collection_name):
    """Map a v3 collection name to the value stored on the v4 item.

    Names beginning with "Lakeland - African Americans in College Park" are
    replaced by an empty string. Every other name passes through unchanged.
    """
    blanked_prefix = "Lakeland - African Americans in College Park"
    return "" if collection_name.startswith(blanked_prefix) else collection_name
# %%
def handle_item(instance: lv4.sources.ItemSourceRecord) -> tuple:
    """Migrate a v3 item source record to a v4 ``ItemRecord``.

    Returns ``(v4_item_record, sidecar_data)``. ``sidecar_data`` holds the
    non-empty source fields that were not mapped onto the v4 record.

    Raises:
        pydantic.ValidationError: if ``lv4.ItemRecord`` rejects the mapped
            values. The error and the computed item types are printed first
            to help with debugging.
    """
    keys_to_init = {
        "airtable_idno",
        "airtable_created_time",
        "legacy_idno_umd",
        "title",
        "description",
        "created_date",
    }
    # Ordered (type attribute, category attribute) pair. This must be a tuple:
    # unpacking a set of strings gives an order that depends on hash
    # randomization and can change between runs.
    types_cats = ("obj_type", "category")
    collection_keys = {"collection"}
    v4_type = handle_item_types(instance, types_cats)
    all_consumed_keys = [*keys_to_init, *types_cats, *collection_keys]
    sidecar_data = filter_extra_fields(instance, all_consumed_keys)
    # NOTE(review): legacy_idno_umd is consumed but not copied onto the item
    # (accessions map it to legacy_idno). Confirm the omission is intended.
    try:
        v4_item_record = lv4.ItemRecord(
            idno=ulid.new().str,
            v3_airtable_idno=instance.airtable_idno,
            v3_airtable_created_time=instance.airtable_created_time,
            title=instance.title,
            description=instance.description,
            v3_created_date=instance.created_date,
            item_type=v4_type,
            collection=handle_collections(instance.collection),
        )
    except ValidationError as err:
        print(err)
        print(v4_type)
        # Re-raise: falling through would hit an unbound v4_item_record and
        # surface as an UnboundLocalError that hides the real problem.
        raise
    return (v4_item_record, sidecar_data)
# %%
# v3 Airtable id -> (v4 record, sidecar dict of unmapped source fields).
V4_RECORDS = {}


def migrate(input_pair):
    """Convert one ``(airtable_id, source_record)`` pair and store the result.

    Every handler whose source type matches ``source_record`` is applied, in
    the order listed below. Each result is stored in ``V4_RECORDS`` under the
    v4 record's ``v3_airtable_idno``. Unknown record types are ignored.
    """
    _, record_instance = input_pair
    dispatch = (
        (lv4.sources.AccessionSourceRecord, handle_accessions),
        (lv4.sources.FileSourceRecord, handle_files),
        (lv4.sources.SubjectSourceRecord, handle_subjects),
        (lv4.sources.EntitySourceRecord, handle_entities),
        (lv4.sources.EntityRelationshipSourceRecord, handle_relationships),
        (lv4.sources.ItemSourceRecord, handle_item),
    )
    for source_type, handler in dispatch:
        if not isinstance(record_instance, source_type):
            continue
        outcome = handler(record_instance)
        V4_RECORDS[outcome[0].v3_airtable_idno] = outcome
# %%
for airtable_pair in airtable_datadict.items():
    migrate(airtable_pair)
# %%
# Every loaded source record should have produced exactly one v4 record.
assert len(V4_RECORDS) == sum(expected_counts.values())
len(V4_RECORDS)  # echoed as the cell's output when run interactively
# =========================== RESOLVE RECORD LINKAGES ========================================
# %%
# Shape of an Airtable record id: "rec" followed by 14 alphanumerics.
# NOTE(review): not referenced by the resolution pass below.
airtable_id_regex = re.compile(r"rec[a-zA-Z0-9]{14}")
# %%
# v4 idno -> (resolved v4 record, sidecar with consumed link fields removed).
RESOLVED_V4_RECORDS = {}
# NOTE(review): never populated in this cell; kept in case other cells use it.
sidecar_key_collector = []

# (sidecar key, ItemRecord list attribute) pairs resolved for every item.
# Order matters: linked_entities receives people first, then places/orgs.
_ITEM_LINK_TARGETS = (
    ("linked_people", "linked_entities"),
    ("linked_entity_source", "linked_entities_as_sources"),
    ("linked_places_orgs", "linked_entities"),
    ("linked_subjects", "linked_subjects"),
    ("linked_entity_interviewees", "linked_entities_as_interviewees"),
    ("linked_entity_interviewers", "linked_entities_as_interviewers"),
)


def _v4_id(airtable_id):
    """Return the v4 ``idno`` minted for the v3 record with ``airtable_id``.

    Raises KeyError when no v4 record was created for that Airtable id.
    """
    return V4_RECORDS[airtable_id][0].idno


def _as_id_list(value, label):
    """Normalize a linked-record value (one id string or a list of ids) to a list.

    ``None`` means "no links". Any other type is a data error.
    """
    if value is None:
        return []
    if isinstance(value, str):
        return [value]
    if isinstance(value, (list, tuple)):
        return list(value)
    raise RuntimeError("Got a {} value of invalid type: {}".format(label, type(value).__name__))


def _resolve_links(rec, sidecar, key):
    """Best-effort resolution of ``sidecar[key]`` to a list of v4 ids.

    An absent key means the record has no links of that kind (empty values are
    dropped when the sidecar is built). Ids with no migrated v4 record are
    reported and skipped one at a time, so a single bad id no longer discards
    the record's other links of the same kind.
    """
    resolved = []
    for airtable_id in _as_id_list(sidecar.get(key), key):
        try:
            resolved.append(_v4_id(airtable_id))
        except KeyError:
            print("Unresolved {} link {} on {}".format(key, airtable_id, rec.v3_airtable_idno))
    return resolved


def _without(sidecar, used_keys):
    """Return a copy of ``sidecar`` minus the keys consumed during resolution."""
    return {k: v for k, v in sidecar.items() if k not in used_keys}


for rec, sidecar in V4_RECORDS.values():
    if isinstance(rec, lv4.DonationGroupingRecord):
        # Strict: an accession pointing at an unmigrated file raises KeyError.
        # Resolved file ids go into the sidecar. The record itself is untouched.
        # A single-id string now yields its own file instead of a NameError or a
        # list left over from a previous accession.
        v4_files = [_v4_id(fid) for fid in _as_id_list(rec.v3_files_array, "v3_files_array")]
        RESOLVED_V4_RECORDS[rec.idno] = (rec, {**sidecar, "v4_files": v4_files})
    if isinstance(rec, lv4.FileRecord):
        rec.donation_grouping_id = _v4_id(sidecar["linked_accession"])
        try:
            rec.item_id = _v4_id(sidecar["part_of_item"])
        except KeyError:
            # No parent item recorded (or it was not migrated): leave unset.
            pass
        RESOLVED_V4_RECORDS[rec.idno] = (
            rec,
            _without(sidecar, {"linked_accession", "part_of_item"}),
        )
    if isinstance(rec, lv4.SubjectRecord):
        # Strict on unknown item ids. A subject with no items has no
        # "linked_items_array" key at all, which now yields an empty list.
        rec.linked_items = [
            _v4_id(item_id)
            for item_id in _as_id_list(sidecar.get("linked_items_array"), "linked_items_array")
        ]
        RESOLVED_V4_RECORDS[rec.idno] = (rec, _without(sidecar, {"linked_items_array"}))
    if isinstance(rec, lv4.EntityRecord):
        RESOLVED_V4_RECORDS[rec.idno] = (rec, sidecar)
    if isinstance(rec, lv4.EntityRelationshipRecord):
        # Best-effort: an unresolved endpoint is reported and keeps its v3 id.
        try:
            rec.subject_entity = _v4_id(rec.subject_entity)
        except KeyError:
            print("Errored on {}".format(rec.v3_airtable_idno))
        try:
            rec.object_entity = _v4_id(rec.object_entity)
        except KeyError:
            print("Errored on {}".format(rec.v3_airtable_idno))
        RESOLVED_V4_RECORDS[rec.idno] = (rec, sidecar)
    if isinstance(rec, lv4.ItemRecord):
        for source_key, target_attr in _ITEM_LINK_TARGETS:
            getattr(rec, target_attr).extend(_resolve_links(rec, sidecar, source_key))
        used_keys = {source_key for source_key, _ in _ITEM_LINK_TARGETS}
        RESOLVED_V4_RECORDS[rec.idno] = (rec, _without(sidecar, used_keys))
# %%
# Resolution must not lose or duplicate any record.
assert len(V4_RECORDS) == len(RESOLVED_V4_RECORDS)
items = [
    record
    for record, _sidecar in RESOLVED_V4_RECORDS.values()
    if isinstance(record, lv4.ItemRecord)
]
# Entity link lists counted for the "no linkages" report. Subject links are
# deliberately not part of this count.
_ENTITY_LINK_FIELDS = (
    "linked_entities",
    "linked_entities_as_interviewers",
    "linked_entities_as_interviewees",
    "linked_entities_as_sources",
)
items_with_no_links = [
    item
    for item in items
    if sum(len(getattr(item, link_field)) for link_field in _ENTITY_LINK_FIELDS) == 0
]
print("{} items had no linkages".format(len(items_with_no_links)))
# =========================== PERSIST V4 RECORDS ========================================
# %%
from pathlib import Path

TZ = pytz.timezone("US/Eastern")
# v4 record class name -> value written to each output row's "table_key".
table_keys = {
    "DonationGroupingRecord": "Donation Groups",
    "FileRecord": "Files",
    "EntityRecord": "Entities",
    "SubjectRecord": "Subjects",
    "EntityRelationshipRecord": "Relationships",
    "ItemRecord": "Items",
}
to_json = []
for rrec, rsidecar in RESOLVED_V4_RECORDS.values():
    # Stamped per record, so timestamps advance slightly across the export.
    record_time = datetime.now(TZ)
    to_json.append(
        {
            **asdict(rrec),
            "table_key": table_keys[type(rrec).__name__],
            "migrated_at": record_time.isoformat(),
            "extra_fields": rsidecar,
        }
    )

# Serialize before touching the file. Previously open("w") truncated the output
# first, so a serialization error left an empty file behind.
payload = json.dumps(to_json)
output_path = Path("outputs") / "migrated_records.json"
# Create outputs/ on a fresh checkout instead of failing with FileNotFoundError.
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(payload, encoding="utf-8")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment