Created
May 5, 2025 11:14
-
-
Save adlvdl/80dd2fc51d6360c7c25228d74f6d36e2 to your computer and use it in GitHub Desktop.
Results of Gemini prompt asking for a Python program to obtain MMPs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Import necessary libraries | |
| import pandas as pd | |
| from rdkit import Chem | |
| from rdkit.Chem import rdMMPA | |
| from collections import defaultdict | |
| import logging | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| # Helper function to safely convert SMILES to Mol and count heavy atoms | |
| def count_heavy_atoms(smiles_string): | |
| """Calculates the number of heavy atoms in a SMILES string.""" | |
| mol = Chem.MolFromSmiles(smiles_string, sanitize=False) # Don't sanitize fragments like [*]H | |
| if mol is None: | |
| # Handle cases like [*]H which might fail standard sanitization | |
| # Try removing dummy atoms before counting if parsing failed | |
| try: | |
| # Replace dummy atoms [*] with H temporarily for counting, | |
| # but this might not be universally robust for complex fragments. | |
| # A safer approach is to handle None return. | |
| mol_no_dummy = Chem.MolFromSmiles(smiles_string.replace('[*]', ''), sanitize=True) | |
| if mol_no_dummy: | |
| return mol_no_dummy.GetNumHeavyAtoms() | |
| else: | |
| # Specific check for [*]H fragment | |
| if smiles_string == '[*]H': | |
| return 0 # Hydrogen itself is not a heavy atom | |
| else: | |
| logging.warning(f"Could not parse fragment SMILES for heavy atom count: {smiles_string}") | |
| return 0 # Treat unparsable fragments as having 0 heavy atoms for comparison | |
| except: | |
| logging.warning(f"Exception processing fragment SMILES for heavy atom count: {smiles_string}") | |
| return 0 | |
| else: | |
| # RDKit's GetNumHeavyAtoms correctly ignores dummy atoms like [*] | |
| return mol.GetNumHeavyAtoms() | |
| def find_best_mmp_pairs(df, id_col='INCHIKEY', smiles_col='SMILES', max_pairs=1000000, max_cuts=1): | |
| """ | |
| Finds Matched Molecular Pairs (MMPs) from a DataFrame. | |
| Args: | |
| df (pd.DataFrame): DataFrame with molecule identifiers and SMILES. | |
| id_col (str): Name of the column containing unique molecule identifiers. | |
| smiles_col (str): Name of the column containing SMILES strings. | |
| max_pairs (int): Maximum number of MMPs to return (passed to FindMMPAX). | |
| max_cuts (int): Maximum number of cuts to identify MMPs (typically 1 for standard MMPs). | |
| Returns: | |
| pd.DataFrame: DataFrame containing the best MMP for each molecule pair. | |
| Columns: 'ID1', 'ID2', 'Core', 'Fragment1', 'Fragment2'. | |
| """ | |
| logging.info("Starting MMP analysis...") | |
| # --- 1. Prepare Molecule Objects --- | |
| mols = [] | |
| mol_ids = [] | |
| valid_indices = [] # Keep track of original df indices for valid molecules | |
| for i, row in df.iterrows(): | |
| smiles = row[smiles_col] | |
| mol_id = row[id_col] | |
| if pd.isna(smiles) or pd.isna(mol_id): | |
| logging.warning(f"Skipping row {i} due to missing SMILES or ID.") | |
| continue | |
| try: | |
| mol = Chem.MolFromSmiles(str(smiles)) | |
| if mol: | |
| # Store the original identifier with the molecule object | |
| mol.SetProp("_Name", str(mol_id)) | |
| mols.append(mol) | |
| mol_ids.append(str(mol_id)) | |
| valid_indices.append(i) | |
| else: | |
| logging.warning(f"Could not parse SMILES: {smiles} (ID: {mol_id})") | |
| except Exception as e: | |
| logging.error(f"Error processing SMILES: {smiles} (ID: {mol_id}): {e}") | |
| if not mols: | |
| logging.error("No valid molecules could be processed.") | |
| return pd.DataFrame(columns=['ID1', 'ID2', 'Core', 'Fragment1', 'Fragment2']) | |
| logging.info(f"Successfully parsed {len(mols)} out of {len(df)} input structures.") | |
| # --- 2. Find MMPs using RDKit --- | |
| # rdMMPA.FindMMPAX takes a list of Mol objects | |
| # It returns tuples: (mol_idx1, mol_idx2, (core_smiles, frag1_smiles, frag2_smiles)) | |
| logging.info(f"Running rdMMPA.FindMMPAX (max_pairs={max_pairs}, max_cuts={max_cuts})...") | |
| try: | |
| mmpa_results = rdMMPA.FindMMPAX(mols, maxPairs=max_pairs, maxCuts=max_cuts) | |
| # Note: FindMMPAX might be memory intensive for large datasets. | |
| # Consider rdMMPA.FragmentMol followed by indexing if memory becomes an issue, | |
| # though FindMMPAX is generally more direct for finding pairs. | |
| except Exception as e: | |
| logging.error(f"Error during rdMMPA.FindMMPAX execution: {e}") | |
| return pd.DataFrame(columns=['ID1', 'ID2', 'Core', 'Fragment1', 'Fragment2']) | |
| logging.info("Finished rdMMPA.FindMMPAX. Processing results...") | |
| # --- 3. Filter for the best MMP per molecule pair --- | |
| # Store the best MMP found so far for each unique pair of molecule IDs | |
| # Key: tuple(sorted(id1, id2)), Value: dict with best mmp info | |
| best_mmp_for_pair = defaultdict(lambda: {'core_heavy_atoms': -1, 'total_frag_heavy_atoms': float('inf'), 'mmp_data': None}) | |
| count = 0 | |
| for idx1, idx2, (core_smiles, frag1_smiles, frag2_smiles) in mmpa_results: | |
| count += 1 | |
| if count % 10000 == 0: | |
| logging.info(f"Processed {count} potential MMPs from FindMMPAX...") | |
| id1 = mol_ids[idx1] | |
| id2 = mol_ids[idx2] | |
| # Ensure consistent ordering of IDs for the key | |
| pair_key = tuple(sorted((id1, id2))) | |
| # Calculate size metrics for filtering | |
| current_core_heavy_atoms = count_heavy_atoms(core_smiles) | |
| current_frag1_heavy_atoms = count_heavy_atoms(frag1_smiles) | |
| current_frag2_heavy_atoms = count_heavy_atoms(frag2_smiles) | |
| current_total_frag_heavy_atoms = current_frag1_heavy_atoms + current_frag2_heavy_atoms | |
| # Get the current best MMP recorded for this pair | |
| best_so_far = best_mmp_for_pair[pair_key] | |
| # Determine if the current MMP is better than the best found so far | |
| is_better = False | |
| if current_core_heavy_atoms > best_so_far['core_heavy_atoms']: | |
| # Larger core is always better | |
| is_better = True | |
| elif current_core_heavy_atoms == best_so_far['core_heavy_atoms']: | |
| # If cores are same size, prefer smaller total fragment size | |
| if current_total_frag_heavy_atoms < best_so_far['total_frag_heavy_atoms']: | |
| is_better = True | |
| elif current_total_frag_heavy_atoms == best_so_far['total_frag_heavy_atoms']: | |
| # Tie-breaking (optional): Could prefer shorter SMILES strings, etc. | |
| # For now, just keep the first one encountered with the same metrics. | |
| pass # Keep the existing one unless strictly better | |
| # Update the best MMP if the current one is better | |
| if is_better: | |
| # Ensure consistent ordering in the final output as well | |
| final_id1, final_id2 = pair_key | |
| final_frag1, final_frag2 = (frag1_smiles, frag2_smiles) if id1 == final_id1 else (frag2_smiles, frag1_smiles) | |
| best_mmp_for_pair[pair_key] = { | |
| 'core_heavy_atoms': current_core_heavy_atoms, | |
| 'total_frag_heavy_atoms': current_total_frag_heavy_atoms, | |
| 'mmp_data': { | |
| 'ID1': final_id1, | |
| 'ID2': final_id2, | |
| 'Core': core_smiles, | |
| 'Fragment1': final_frag1, # Fragment associated with ID1 | |
| 'Fragment2': final_frag2 # Fragment associated with ID2 | |
| } | |
| } | |
| logging.info(f"Finished processing {count} potential MMPs. Found {len(best_mmp_for_pair)} unique best pairs.") | |
| # --- 4. Format Output --- | |
| output_data = [mmp_info['mmp_data'] for mmp_info in best_mmp_for_pair.values() if mmp_info['mmp_data']] | |
| if not output_data: | |
| logging.warning("No MMPs found matching the criteria.") | |
| return pd.DataFrame(columns=['ID1', 'ID2', 'Core', 'Fragment1', 'Fragment2']) | |
| result_df = pd.DataFrame(output_data) | |
| logging.info(f"Returning DataFrame with {len(result_df)} MMPs.") | |
| return result_df | |
| # --- Example Usage --- | |
| if __name__ == "__main__": | |
| # Create a sample Pandas DataFrame | |
| data = { | |
| 'INCHIKEY': ['ID_A', 'ID_B', 'ID_C', 'ID_D', 'ID_E', 'ID_F', 'ID_G'], | |
| 'SMILES': [ | |
| 'CCO', # Ethanol (ID_A) | |
| 'CCC', # Propane (ID_B) - Should not pair with Ethanol easily by single cut | |
| 'CCOCC', # Diethyl ether (ID_C) | |
| 'CCCO', # Propanol (ID_D) - MMP with Ethanol (A) | |
| 'CCCN', # Propylamine (ID_E) - MMP with Propanol (D) | |
| 'CC(C)O', # Isopropanol (ID_F) - MMP with Propanol (D) | |
| 'CCOc1ccccc1', # Phenoxyethanol (ID_G) - MMP with Ethanol (A) | |
| 'INVALID_SMILES', # Invalid SMILES (should be skipped) | |
| 'CC(=O)O', # Acetic Acid (should not pair with others easily) | |
| 'CC(=O)N', # Acetamide (MMP with Acetic Acid) | |
| ] | |
| } | |
| # Add more InChIKeys (ensure uniqueness for the example) | |
| data['INCHIKEY'].extend(['ID_H', 'ID_I', 'ID_J']) # Match length of SMILES | |
| input_df = pd.DataFrame(data) | |
| print("--- Input DataFrame ---") | |
| print(input_df) | |
| print("\n") | |
| # Find MMPs | |
| mmp_df = find_best_mmp_pairs(input_df, id_col='INCHIKEY', smiles_col='SMILES') | |
| print("--- Output MMP DataFrame ---") | |
| print(mmp_df) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment