Skip to content

Instantly share code, notes, and snippets.

@gdgellatly
Created August 24, 2024 00:01
Show Gist options
  • Select an option

  • Save gdgellatly/9bc7a899fad679ae907257cf8fbbcf5b to your computer and use it in GitHub Desktop.

Select an option

Save gdgellatly/9bc7a899fad679ae907257cf8fbbcf5b to your computer and use it in GitHub Desktop.
Dropping and Re-adding FKs
# fk_unconstrained.py
# Execute SQL statements with FK constraints temporarily disabled
# Used by the 16.0.4.0.0 migration (needs to be imported)
import pprint
from openupgradelib import openupgrade
MODULE = "some_module"
def _get_foreign_key_constraints(env, table_name, delete_actions):
    """
    Retrieves foreign key constraints for the specified table filtered by
    delete actions.

    A constraint is selected when the table is either its referencing side
    (``conrelid``) or its referenced side (``confrelid``).

    Parameters:
    - env: Odoo environment; only ``env.cr`` is used.
    - table_name: Table name (cast to ``regclass`` by the query).
    - delete_actions: List of delete actions to filter, e.g.
      ``["CASCADE", "SET NULL"]``.

    Returns:
    - Dictionary of constraints with constraint names as keys and details
      (``table_name``, ``foreign_table``, ``columns``, ``delete_action``)
      as values.
    """
    # The column list is aggregated in the order stored in ``conkey``
    # (WITH ORDINALITY + ORDER BY). Without an explicit order string_agg may
    # return the columns of a composite key in any order, which would make a
    # before/after comparison of this function's result unreliable.
    #
    # NOTE(review): PostgreSQL only requires constraint names to be unique
    # per table, so two referencing tables that use the same constraint name
    # would collapse into a single dictionary entry here.
    env.cr.execute(
        """
        SELECT con.conname,
               con.conrelid::regclass AS table_name,
               con.confrelid::regclass AS foreign_table,
               (SELECT string_agg(quote_ident(a.attname), ', '
                                  ORDER BY cols.ord)
                  FROM unnest(con.conkey) WITH ORDINALITY AS cols(colid, ord)
                  JOIN pg_attribute AS a
                    ON a.attnum = cols.colid
                   AND a.attrelid = c.oid) AS columns,
               (CASE WHEN con.confdeltype = 'a' THEN 'NO ACTION'
                     WHEN con.confdeltype = 'r' THEN 'RESTRICT'
                     WHEN con.confdeltype = 'c' THEN 'CASCADE'
                     WHEN con.confdeltype = 'n' THEN 'SET NULL'
                     ELSE 'SET DEFAULT' END) AS delete_action
          FROM pg_constraint con
          JOIN pg_class c ON con.conrelid = c.oid
         WHERE con.contype = 'f'
           AND (con.conrelid = %s::regclass OR con.confrelid = %s::regclass)
           AND (CASE WHEN con.confdeltype = 'a' THEN 'NO ACTION'
                     WHEN con.confdeltype = 'r' THEN 'RESTRICT'
                     WHEN con.confdeltype = 'c' THEN 'CASCADE'
                     WHEN con.confdeltype = 'n' THEN 'SET NULL'
                     ELSE 'SET DEFAULT' END) = ANY(%s);
        """,
        (table_name, table_name, delete_actions),
    )
    constraints = env.cr.fetchall()
    # Row names are prefixed with ``con_`` so they do not shadow the
    # ``table_name`` parameter.
    return {
        conname: {
            "table_name": con_table,
            "foreign_table": con_foreign_table,
            "columns": con_columns,
            "delete_action": con_delete_action,
        }
        for (
            conname,
            con_table,
            con_foreign_table,
            con_columns,
            con_delete_action,
        ) in constraints
    }
def _generate_readd_scripts_for_tables(env, table_names, delete_actions):
    """
    Generates the scripts to re-add foreign key constraints for the specified
    tables based on the delete actions.

    The constraint body is taken from ``pg_get_constraintdef`` so that the
    re-created constraint keeps everything the original had (column pairing
    of composite keys, ON UPDATE action, deferrability, NOT VALID), and the
    table is referenced through ``regclass`` so it is quoted and
    schema-qualified when needed.

    Parameters:
    - env: Odoo environment; only ``env.cr`` is used.
    - table_names: List of table names.
    - delete_actions: List of delete actions to add.

    Returns:
    - List of tuples with table names and re-add statements. A constraint
      that touches several of the given tables is listed only under the
      first of them, so executing all statements never adds the same
      constraint twice.
    """
    readd_scripts = []
    # Statements already emitted for an earlier table. A foreign key between
    # two listed tables matches the query for both of them.
    seen_statements = set()
    for table_name in table_names:
        env.cr.execute(
            """
            SELECT 'ALTER TABLE ' || con.conrelid::regclass::text ||
                   ' ADD CONSTRAINT ' || quote_ident(con.conname) ||
                   ' ' || pg_get_constraintdef(con.oid) || ';'
              FROM pg_constraint con
             WHERE con.contype = 'f'
               AND (con.conrelid = %s::regclass
                    OR con.confrelid = %s::regclass)
               AND (CASE WHEN con.confdeltype = 'a' THEN 'NO ACTION'
                         WHEN con.confdeltype = 'r' THEN 'RESTRICT'
                         WHEN con.confdeltype = 'c' THEN 'CASCADE'
                         WHEN con.confdeltype = 'n' THEN 'SET NULL'
                         ELSE 'SET DEFAULT' END) = ANY(%s);
            """,
            (table_name, table_name, delete_actions),
        )
        statements = []
        for row in env.cr.fetchall():
            statement = row[0]
            if statement in seen_statements:
                continue
            seen_statements.add(statement)
            statements.append(statement)
        readd_scripts.append((table_name, statements))
    return readd_scripts
def _drop_foreign_keys(env, table_name, delete_actions):
    """
    Drops foreign key constraints for the specified table based on delete
    actions.

    Both constraints owned by the table and constraints of other tables that
    reference it are dropped. The owning table of each constraint is
    rendered through ``regclass`` so the generated ``ALTER TABLE`` is
    correctly quoted and schema-qualified when the table is not reachable
    through the search path.

    Parameters:
    - env: Odoo environment; only ``env.cr`` is used.
    - table_name: Table name (cast to ``regclass`` by the query).
    - delete_actions: List of delete actions to drop.
    """
    openupgrade.logged_query(
        env.cr,
        """
        DO $$
        DECLARE
            r RECORD;
        BEGIN
            FOR r IN
                (SELECT con.conname,
                        con.conrelid::regclass::text AS table_ref
                   FROM pg_constraint con
                  WHERE con.contype = 'f'
                    AND (con.conrelid = %s::regclass
                         OR con.confrelid = %s::regclass)
                    AND (CASE WHEN con.confdeltype = 'a' THEN 'NO ACTION'
                              WHEN con.confdeltype = 'r' THEN 'RESTRICT'
                              WHEN con.confdeltype = 'c' THEN 'CASCADE'
                              WHEN con.confdeltype = 'n' THEN 'SET NULL'
                              ELSE 'SET DEFAULT' END) = ANY(%s))
            LOOP
                EXECUTE 'ALTER TABLE ' || r.table_ref ||
                        ' DROP CONSTRAINT ' || quote_ident(r.conname);
            END LOOP;
        END $$;
        """,
        (table_name, table_name, delete_actions),
    )
def _readd_foreign_keys_for_tables(env, readd_scripts):
    """
    Execute every re-add statement contained in the given scripts.

    Parameters:
    - env: Odoo environment; ``env.cr`` is the cursor the statements run on.
    - readd_scripts: List of ``(table_name, statements)`` tuples; only the
      statements are used, in the order given.
    """
    # Flatten the per-table lists lazily, preserving their order.
    pending = (sql for _table, sqls in readd_scripts for sql in sqls)
    for sql in pending:
        openupgrade.logged_query(env.cr, sql)
def run_without_fk_constraints(
    env, statements, table_names, delete_actions=None, module=MODULE, verbose=False
):
    """
    Execute a list of SQL statements without foreign key constraints for
    multiple tables.

    1. Drop constraints for all specified tables.
    2. Execute the provided statements.
    3. Re-add constraints for all specified tables.

    Parameters:
    - env: Odoo environment.
    - statements: List of SQL statements to execute.
    - table_names: List of table names (a single name given as a string is
      treated as a one-element list).
    - delete_actions: List of delete actions to drop; default is all types.
    - module: Module name for logging; defaults to MODULE.
    - verbose: Boolean to enable verbose logging of constraint details.

    Raises:
    - Exception: if constraints remain after dropping, or if the constraints
      found after re-adding differ from those recorded before dropping.

    NOTE(review): nothing here restores the constraints when a statement
    fails; this presumably relies on the surrounding transaction being
    rolled back - confirm against the caller.
    """
    if delete_actions is None:
        delete_actions = ["NO ACTION", "RESTRICT", "CASCADE", "SET NULL", "SET DEFAULT"]
    else:
        # The queries filter with ``= ANY(%s)``, which needs an array
        # parameter; psycopg2 adapts lists (not tuples) to SQL arrays.
        delete_actions = list(delete_actions)
    # table_names is iterated several times below, so materialize it once; a
    # bare string would otherwise be iterated character by character.
    if isinstance(table_names, str):
        table_names = [table_names]
    else:
        table_names = list(table_names)
    joined_actions = ", ".join(delete_actions)

    def log(message, table=False):
        # Single entry point for the openupgrade log lines emitted below.
        openupgrade.message(
            env.cr, module=module, table=table, column=False, message=message
        )

    # Step 1: Get foreign key constraints before dropping
    before_dropping = {}
    for table_name in table_names:
        before_dropping[table_name] = _get_foreign_key_constraints(
            env, table_name, delete_actions
        )
        if verbose:
            log(
                "Before dropping constraints: "
                f"{pprint.pformat(before_dropping[table_name])}",
                table=table_name,
            )

    # Step 2: Generate re-adding script for all tables. This must happen
    # before anything is dropped, as the script is read from the catalog.
    readd_scripts = _generate_readd_scripts_for_tables(env, table_names, delete_actions)

    # Step 3: Drop foreign keys for all specified tables
    log(f"Dropping foreign key constraints with delete actions {joined_actions}...")
    for table_name in table_names:
        _drop_foreign_keys(env, table_name, delete_actions)
        log(
            "Foreign key constraints with delete actions "
            f"{joined_actions} "
            f"dropped for table {table_name}.",
            table=table_name,
        )

    # Check if all constraints are actually dropped
    any_remaining = False
    for table_name in table_names:
        remaining_constraints = _get_foreign_key_constraints(
            env, table_name, delete_actions
        )
        if remaining_constraints:
            any_remaining = True
            log(
                "Remaining constraints with delete actions "
                f"{joined_actions} after dropping "
                f"for table {table_name}: "
                f"{pprint.pformat(remaining_constraints)}",
                table=table_name,
            )
    if any_remaining:
        raise Exception("Foreign key constraints were not completely dropped.")

    # Step 4: Execute the provided statements
    log("Executing unconstrained statements...")
    for statement in statements:
        openupgrade.logged_query(env.cr, statement)

    # Step 5: Re-add foreign keys for all specified tables
    _readd_foreign_keys_for_tables(env, readd_scripts)

    # Step 6: Get foreign key constraints after re-adding
    after_readding = {}
    for table_name in table_names:
        after_readding[table_name] = _get_foreign_key_constraints(
            env, table_name, delete_actions
        )
        if verbose:
            log(
                "After re-adding constraints: "
                f"{pprint.pformat(after_readding[table_name])}",
                table=table_name,
            )

    # Step 7: Compare before and after constraints. Only a real difference
    # counts as a mismatch; verbose mode must not turn a successful run into
    # a failure (both snapshots were already logged above when verbose).
    any_differences = False
    for table_name in table_names:
        before = before_dropping[table_name]
        after = after_readding[table_name]
        if after != before:
            any_differences = True
            log(
                f"Constraints mismatch for table {table_name}:\n"
                f"Before re-adding: {pprint.pformat(before)}\n"
                f"After re-adding: {pprint.pformat(after)}",
                table=table_name,
            )
    if any_differences:
        raise Exception("Constraints mismatch after re-adding for one or more tables.")
    log("Constraints identical after re-adding for all tables.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment