@Quidge
Created July 23, 2025 18:32
Potential refactor of run_apply_migrations()

Currently, our run_apply_migrations() runs like so:

def run_apply_migrations(schemas: list[str], dry_run: bool = False):

	migration_failures: list[str] = []
	
	a_lot_of_argument_validation()

	# the core loop
	for customer_schema_name in schemas:
		
		with transaction.begin() as trans:
			
			try:
				migrate_schema(customer_schema_name)
			except MigrationError:
				migration_failures.append(f"couldn't migrate {customer_schema_name}")
				continue

			if dry_run:
				trans.rollback()
			else:
				trans.commit()

So, if dry_run is passed, we loop through each schema and try the migration to see if there are any constraint/index/db errors. Then we roll back. This allows us to test if the migration would work.
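
To make the rollback-based dry run concrete, here is a minimal sketch using Python's built-in sqlite3 module. The choice of SQLite, the `users` table, and the `try_migration` helper are all assumptions for illustration, not part of our migration tooling. It shows a migration that would hit a constraint error being caught, and a valid one leaving no trace after rollback.

```python
import sqlite3


def try_migration(conn, statements, dry_run):
    """Hypothetical helper: run statements in one transaction.

    Returns an error message on failure, otherwise None.
    Rolls back on failure or when dry_run is True.
    """
    conn.execute("BEGIN")
    try:
        for stmt in statements:
            conn.execute(stmt)
    except sqlite3.Error as exc:
        conn.execute("ROLLBACK")
        return str(exc)
    conn.execute("ROLLBACK" if dry_run else "COMMIT")
    return None


# isolation_level=None puts the connection in autocommit mode,
# so the explicit BEGIN/ROLLBACK/COMMIT above control the transaction.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")
conn.execute("INSERT INTO users (email) VALUES ('a@example.com'), ('a@example.com')")

# Would fail for real: the existing rows contain duplicate emails.
bad = ["CREATE UNIQUE INDEX users_email_uq ON users (email)"]
# Would succeed for real.
good = ["ALTER TABLE users ADD COLUMN name TEXT"]

bad_error = try_migration(conn, bad, dry_run=True)
good_error = try_migration(conn, good, dry_run=True)

# The dry run rolled back, so the new column is not there.
columns = [row[1] for row in conn.execute("PRAGMA table_info(users)")]
```

This only works when the database supports transactional DDL, as SQLite does; whether that holds for every statement in a given migration is something to verify against the database actually in use.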

We want to change the implementation so that in dry_run=False mode (i.e. the normal migration mode), we first run the whole thing in dry_run=True mode and then, if there aren't any failures, re-run the whole thing in dry_run=False mode.

The idea is to determine, before we run a migration for real, whether any customer schema would have a problem. Once we know that none would, we migrate every customer schema for real.

After the refactor, that might look like:

def run_apply_migrations(schemas: list[str], dry_run: bool = False):

	migration_failures: list[str] = []
	
	a_lot_of_argument_validation()

	# the core loop
	for customer_schema_name in schemas:
		
		with transaction.begin() as trans:
			
			try:
				migrate_schema(customer_schema_name)
			except MigrationError:
				migration_failures.append(f"couldn't migrate {customer_schema_name}")
				continue

			trans.rollback()

	# check only after every schema has been tried, so all failures get reported
	if migration_failures:
		exit_and_report_migration_failures()

	if not dry_run:
		
		for customer_schema_name in schemas:
		
			with transaction.begin() as trans:
				migrate_schema(customer_schema_name)
				
				trans.commit()
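
For anyone who wants to poke at the control flow without a database, below is a runnable sketch of the same two-pass idea. `FakeDB` and its `migrate` method are hypothetical in-memory stand-ins for our real transaction and `migrate_schema` code, and returning the failure list (instead of exiting) is a simplification for the example.

```python
class MigrationError(Exception):
    """Raised when a schema cannot be migrated."""


class FakeDB:
    """Hypothetical in-memory stand-in for the database and transaction API."""

    def __init__(self, schemas, broken=()):
        self.versions = {name: 1 for name in schemas}
        self.broken = set(broken)

    def migrate(self, schema, commit):
        # Simulates "apply the migration, then commit or roll back".
        if schema in self.broken:
            raise MigrationError(schema)
        if commit:
            self.versions[schema] += 1


def run_apply_migrations(db, schemas, dry_run=False):
    """Return failure messages; migrate for real only if there are none."""
    # Pass 1: always rehearse every schema without committing.
    failures = []
    for schema in schemas:
        try:
            db.migrate(schema, commit=False)
        except MigrationError:
            failures.append(f"couldn't migrate {schema}")
    if failures or dry_run:
        return failures
    # Pass 2: every rehearsal passed, so apply for real.
    for schema in schemas:
        db.migrate(schema, commit=True)
    return failures


db = FakeDB(["acme", "globex"], broken=["globex"])
failed = run_apply_migrations(db, ["acme", "globex"])
after_failed_run = dict(db.versions)  # nothing migrated, not even "acme"

db.broken.clear()
ok = run_apply_migrations(db, ["acme", "globex"])
after_ok_run = dict(db.versions)  # both schemas migrated
```

One thing the sketch makes visible: a single failing schema in the first pass blocks the real migration for every schema, which is the behavior we are after. The second pass can presumably still fail if data changes between passes, so it may be worth deciding how that case should be reported.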