Created
April 1, 2025 07:38
-
-
Save fanannan/ecac93d0973044de680266e9d4fc69fd to your computer and use it in GitHub Desktop.
Duplicate a Django model instance
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| # Django モデルインスタンス複製ツール - ドキュメント | |
| ## 使い方 | |
| ```python | |
| # インポート方法 | |
| from your_app.utils import duplicate_instance | |
| # 基本的な使用方法(すべての関連オブジェクトを複製する) | |
| original_instance = YourModel.objects.get(id=1) | |
| new_instance = duplicate_instance(original_instance) | |
| # 関連オブジェクトを複製せず、同じオブジェクトを参照する場合 | |
| new_instance = duplicate_instance(original_instance, duplicate_relations=False) | |
| # 特定のフィールドを除外する場合 | |
| new_instance = duplicate_instance(original_instance, exclude_fields=['field1', 'field2']) | |
| # トランザクションを使用した安全な複製(推奨) | |
| from django.db import transaction | |
| with transaction.atomic(): | |
| new_instance = duplicate_instance(original_instance) | |
| ``` | |
| ## コードの構造 | |
| このツールは、Djangoモデルインスタンスを効率的に複製するための一連の機能を提供します。 | |
| 1. `duplicate_instance`: メイン関数。インスタンスの複製プロセス全体を調整します。 | |
| 2. `_copy_basic_fields`: 基本的なフィールド値を新しいインスタンスにコピーします。 | |
| 3. `_process_direct_relations`: ForeignKeyやOneToOneなどの直接関係を処理します(複製モード)。 | |
| 4. `_reference_direct_relations`: 直接関係を処理しますが、元のオブジェクトを参照します(参照モード)。 | |
| 5. `_process_many_to_many`: ManyToManyリレーションシップを処理します。 | |
| 6. `_process_reverse_relations`: 逆関係(他のモデルからこのモデルを指すもの)を処理します。 | |
| 7. `_duplicate_reverse_one_to_one`: 逆OneToOne関係を複製します。 | |
| 8. `_duplicate_reverse_foreign_key`: 逆ForeignKey関係を複製します。 | |
| 9. `_process_generic_foreign_keys`: GenericForeignKeyリレーションシップを処理します。 | |
| 10. `_process_generic_relations`: GenericRelation(逆ジェネリック関係)を処理します。 | |
| ## 特徴 | |
| - **関連オブジェクトの処理オプション**: 関連オブジェクトを複製するか、単に参照するかを選択できます。 | |
| - **循環参照の対応**: 相互参照モデルでも安全に機能します。 | |
| - **柔軟な除外**: 特定のフィールドを複製から除外できます。 | |
| - **モジュラー設計**: 機能ごとに分割された明確な構造で、メンテナンスが容易です。 | |
| - **モデル継承サポート**: 親モデルからの継承フィールドも正しく複製されます。 | |
| - **GenericForeignKeyサポート**: contenttypesフレームワークを使用したジェネリック関係にも対応しています。 | |
| - **エラーログ機能**: 問題が発生した場合は詳細なログが記録されます。 | |
| ## 注意事項と制限 | |
| 1. **カスタムsaveメソッド**: モデルにカスタムsaveメソッドがある場合、複製中に呼び出されます。 | |
| 2. **制約条件**: 複雑なユニーク制約がある場合、追加の処理が必要になる場合があります。 | |
| 3. **Djangoシグナル**: 複製プロセス中にシグナル(pre_save、post_saveなど)が発生します。 | |
| 4. **大規模な取引**: 深くネストされたモデルでは大きなトランザクションが発生する可能性があります。トランザクション管理の使用を推奨します。 | |
| 5. **ファイル/画像フィールド**: ファイル参照はコピーされますが、実際のファイルは複製されません。必要に応じて手動でファイルの複製処理を追加してください。 | |
| 6. **データベース特有の機能**: データベース特有のフィールドタイプは追加のサポートが必要な場合があります。 | |
| 7. **パフォーマンス**: 多数の関連を持つ複雑なモデルの複製はリソースを多く消費する場合があります。 | |
| 8. **ManyToManyの動作**: M2M関係は常に参照のみが複製され、関連オブジェクト自体は複製されません。 | |
| 9. **カスタム中間モデル**: ManyToManyフィールドにカスタム中間モデルがある場合、中間モデルのカスタムフィールドは複製されません。 | |
| 10. **auto_nowフィールド**: `auto_now`および`auto_now_add`が設定されたフィールドは、新しいインスタンスでは現在の時刻が設定されます(元の値は保持されません)。 | |
| ## 使用例 | |
| ```python | |
| # ブログ記事を複製する例 | |
| original_post = BlogPost.objects.get(id=123) | |
| new_post = duplicate_instance(original_post) | |
| # 記事の内容を更新 | |
| new_post.title = "新しいタイトル(コピー)" | |
| new_post.slug = "new-post-copy" | |
| new_post.published = False | |
| new_post.save() | |
| # 製品を複製するが、在庫情報は複製しない例 | |
| original_product = Product.objects.get(id=456) | |
| new_product = duplicate_instance( | |
| original_product, | |
| exclude_fields=['stock_level', 'sku', 'barcode'] | |
| ) | |
| # GenericForeignKeyを使用したコメントシステムの例 | |
| original_comment = Comment.objects.get(id=789) | |
| new_comment = duplicate_instance(original_comment) | |
| ``` | |
| ## exclude_fieldsの動作 | |
| exclude_fieldsに指定したフィールドは、新しいインスタンスではモデルのデフォルト値が使用されます。 | |
| 現在のコードの動作を詳しく説明すると: | |
| - exclude_fieldsに含まれるフィールドは、元のインスタンスから新しいインスタンスへコピーされません。 | |
| - 新しいインスタンスを作成する際(model_class())、Djangoはモデルに定義されたデフォルト値を自動的に設定します。 | |
| したがって、除外されたフィールドは: | |
| - モデル定義でデフォルト値が指定されていれば、そのデフォルト値が使用されます | |
| - デフォルト値が指定されていない場合は、Djangoのデフォルト(文字列ならば空文字列、整数ならNoneなど)が使用されます | |
| - null=TrueのフィールドはNoneになります | |
| ## 逆参照 | |
| 逆参照(reverse relations)の扱いは、duplicate_relationsパラメータに依存します。このコードでは逆参照の処理は非常に重要な部分で、以下のように動作します: | |
| **duplicate_relations=Trueの場合(デフォルト)**: | |
| - 元のインスタンスを参照している他のオブジェクト(逆参照)も複製されます | |
| - 新しく複製された関連オブジェクトは、新しい親インスタンスを指すように更新されます | |
| - これにより、元のオブジェクトグラフ全体が複製されます | |
| **duplicate_relations=Falseの場合**: | |
| - 逆参照オブジェクトは複製されません | |
| - 新しいインスタンスは最初、逆参照されているオブジェクトを持ちません | |
| - 元のインスタンスを参照しているオブジェクトは、引き続き元のインスタンスのみを参照します | |
| ## GenericForeignKeyとGenericRelationの処理 | |
| このコードは、Djangoの`contenttypes`フレームワークを使用したGenericForeignKeyとGenericRelationも適切に処理します: | |
| - GenericForeignKeyフィールドは基盤となるcontenttype_idとobject_idフィールドを通じて処理されます | |
| - duplicate_relations=Trueの場合、GenericForeignKeyが参照するオブジェクトも複製されます | |
| - GenericRelation(逆参照)は他の逆参照と同様に処理されます | |
| ## データの一貫性と安全性 | |
| 大規模なオブジェクトグラフを複製する場合は、常にトランザクション内で操作を実行することをお勧めします: | |
| ```python | |
| from django.db import transaction | |
| with transaction.atomic(): | |
| new_instance = duplicate_instance(complex_instance) | |
| ``` | |
| これにより、複製プロセス中にエラーが発生した場合でもデータベースの一貫性が保たれます。 | |
| ## エラー処理 | |
| このコードは様々なエラーケースに対応するために堅牢に設計されています。ロギングシステムを使用して問題を診断できます: | |
| ```python | |
| import logging | |
| logging.basicConfig(level=logging.WARNING) | |
| ``` | |
| エラーが発生した場合は、詳細なログメッセージが記録されます。 | |
| ## 高度な使用法 | |
| 複雑なモデル構造や特殊なニーズに対応するために、必要に応じてこのコードを拡張できます。モジュラー設計になっているため、特定の部分だけを変更することが容易です。 | |
| ## モデル継承の対応 | |
| 親モデルから継承したフィールドも、子モデルのフィールドと同様に複製されます。抽象基底クラス、多重テーブル継承、プロキシモデルのすべてのケースで正しく動作します。 | |
| """ | |
| from django.db import models | |
| from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation | |
| from django.contrib.contenttypes.models import ContentType | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| def duplicate_instance(instance, exclude_fields=None, duplicate_relations=True, _processed_instances=None): | |
| """ | |
| Create a complete duplicate of a Django model instance with options for handling relations. | |
| Args: | |
| instance: The model instance to duplicate | |
| exclude_fields: List of field names to exclude from duplication | |
| duplicate_relations: If True, related objects will be duplicated | |
| If False, new instance will reference the same related objects | |
| _processed_instances: Internal parameter to prevent infinite recursion | |
| Returns: | |
| The new duplicated instance | |
| """ | |
| if exclude_fields is None: | |
| exclude_fields = [] | |
| # Initialize recursion tracking if this is the top-level call | |
| if _processed_instances is None: | |
| _processed_instances = {} | |
| # Guard against None instances | |
| if instance is None: | |
| return None | |
| # Prevent infinite recursion by tracking processed instances | |
| instance_key = f"{instance.__class__.__name__}_{instance.pk}" | |
| if instance_key in _processed_instances: | |
| return _processed_instances[instance_key] | |
| # Create basic copy with regular fields (but don't save yet) | |
| new_instance = _copy_basic_fields(instance, exclude_fields) | |
| # Store reference to prevent infinite recursion in case of circular references | |
| _processed_instances[instance_key] = new_instance | |
| # Process direct relations (where this model points to others) | |
| if duplicate_relations: | |
| _process_direct_relations(instance, new_instance, exclude_fields, _processed_instances) | |
| else: | |
| _reference_direct_relations(instance, new_instance, exclude_fields) | |
| # Process generic foreign keys | |
| _process_generic_foreign_keys(instance, new_instance, exclude_fields, duplicate_relations, _processed_instances) | |
| # Process many-to-many relationships | |
| _process_many_to_many(instance, new_instance, exclude_fields) | |
| # Now save the instance with all direct relations set | |
| new_instance.save() | |
| # Process reverse relations (where other models point to this one) | |
| if duplicate_relations: | |
| _process_reverse_relations(instance, new_instance, exclude_fields, _processed_instances) | |
| _process_generic_relations(instance, new_instance, exclude_fields, _processed_instances) | |
| # Final save to ensure all relations are properly stored | |
| new_instance.save() | |
| return new_instance | |
| def _copy_basic_fields(instance, exclude_fields): | |
| """Copy basic field values to a new model instance.""" | |
| model_class = instance.__class__ | |
| new_instance = model_class() | |
| # Get all fields including those from parent models | |
| for field in model_class._meta.get_fields(): | |
| # Skip if not a concrete field (abstract or through m2m fields) | |
| if not hasattr(field, 'concrete') or not field.concrete: | |
| continue | |
| # Skip primary key, excluded fields, and relations | |
| if field.primary_key or field.name in exclude_fields or ( | |
| field.is_relation and not isinstance(field, models.ForeignKey) and | |
| not isinstance(field, models.OneToOneField) | |
| ): | |
| continue | |
| # Skip auto_now and auto_now_add fields (they'll be set on save) | |
| if hasattr(field, 'auto_now') and field.auto_now: | |
| continue | |
| if hasattr(field, 'auto_now_add') and field.auto_now_add: | |
| continue | |
| # Skip relations that will be handled separately | |
| if field.is_relation: | |
| continue | |
| # Copy regular field value | |
| try: | |
| setattr(new_instance, field.name, getattr(instance, field.name)) | |
| except Exception as e: | |
| logger.warning(f"Could not copy field {field.name}: {str(e)}") | |
| # Don't save yet - we'll do that after setting relations | |
| return new_instance | |
| def _process_direct_relations(instance, new_instance, exclude_fields, processed_instances): | |
| """Process ForeignKey and OneToOne fields where this model points to others, duplicating them.""" | |
| model_class = instance.__class__ | |
| for field in model_class._meta.get_fields(): | |
| # Skip if not a concrete field | |
| if not hasattr(field, 'concrete') or not field.concrete: | |
| continue | |
| # Skip non-relations and excluded fields | |
| if not field.is_relation or field.name in exclude_fields: | |
| continue | |
| # Skip M2M and reverse relations | |
| if field.many_to_many or field.one_to_many: | |
| continue | |
| # Skip GenericForeignKey as they're handled separately | |
| if hasattr(field, 'ct_field'): | |
| continue | |
| # Process only direct ForeignKey and OneToOne fields | |
| if isinstance(field, models.ForeignKey) or isinstance(field, models.OneToOneField): | |
| try: | |
| # Safely get related object (may be None for nullable fields) | |
| related_obj = getattr(instance, field.name) | |
| if related_obj is None: | |
| continue | |
| # Create a duplicate of the related object | |
| related_copy = duplicate_instance( | |
| related_obj, | |
| exclude_fields=exclude_fields, | |
| duplicate_relations=True, | |
| _processed_instances=processed_instances | |
| ) | |
| setattr(new_instance, field.name, related_copy) | |
| except Exception as e: | |
| logger.warning(f"Error duplicating relation {field.name}: {str(e)}") | |
| def _reference_direct_relations(instance, new_instance, exclude_fields): | |
| """Process ForeignKey and OneToOne fields where this model points to others, referencing existing objects.""" | |
| model_class = instance.__class__ | |
| for field in model_class._meta.get_fields(): | |
| # Skip if not a concrete field | |
| if not hasattr(field, 'concrete') or not field.concrete: | |
| continue | |
| # Skip non-relations and excluded fields | |
| if not field.is_relation or field.name in exclude_fields: | |
| continue | |
| # Skip M2M and reverse relations | |
| if field.many_to_many or field.one_to_many: | |
| continue | |
| # Skip GenericForeignKey as they're handled separately | |
| if hasattr(field, 'ct_field'): | |
| continue | |
| # Process only direct ForeignKey and OneToOne fields | |
| if isinstance(field, models.ForeignKey) or isinstance(field, models.OneToOneField): | |
| try: | |
| # Safely get related object (may be None for nullable fields) | |
| related_obj = getattr(instance, field.name) | |
| if related_obj is None: | |
| continue | |
| # Reference the same related object | |
| setattr(new_instance, field.name, related_obj) | |
| except Exception as e: | |
| logger.warning(f"Error referencing relation {field.name}: {str(e)}") | |
| def _process_generic_foreign_keys(instance, new_instance, exclude_fields, duplicate_relations, processed_instances): | |
| """Process GenericForeignKey fields.""" | |
| model_class = instance.__class__ | |
| # Find all GenericForeignKey fields | |
| for field in model_class._meta.private_fields: | |
| if not isinstance(field, GenericForeignKey): | |
| continue | |
| if field.name in exclude_fields: | |
| continue | |
| try: | |
| # Get content type and object id field names | |
| ct_field = field.ct_field | |
| fk_field = field.fk_field | |
| # These should already be copied as basic fields, but let's ensure | |
| if hasattr(instance, ct_field) and hasattr(instance, fk_field): | |
| ct_value = getattr(instance, ct_field) | |
| fk_value = getattr(instance, fk_field) | |
| if ct_value is not None and fk_value is not None: | |
| # Ensure new_instance is saved and has a primary key before setting relations | |
| if new_instance.pk is None: | |
| new_instance.save() | |
| # Set the same content type | |
| setattr(new_instance, ct_field, ct_value) | |
| if duplicate_relations: | |
| # Get the actual related object | |
| related_obj = getattr(instance, field.name) | |
| if related_obj is not None: | |
| # Duplicate the related object | |
| related_copy = duplicate_instance( | |
| related_obj, | |
| exclude_fields=exclude_fields, | |
| duplicate_relations=True, | |
| _processed_instances=processed_instances | |
| ) | |
| # Set the new object ID | |
| setattr(new_instance, fk_field, related_copy.pk) | |
| else: | |
| # Just reference the same object | |
| setattr(new_instance, fk_field, fk_value) | |
| except Exception as e: | |
| logger.warning(f"Error processing GenericForeignKey {field.name}: {str(e)}") | |
| def _process_many_to_many(instance, new_instance, exclude_fields): | |
| """Process ManyToMany relationships.""" | |
| model_class = instance.__class__ | |
| for field in model_class._meta.get_fields(): | |
| if not field.many_to_many or field.name in exclude_fields: | |
| continue | |
| # Skip reverse relations | |
| if field.auto_created: | |
| continue | |
| try: | |
| # Get the related manager for this m2m field | |
| source_m2m = getattr(instance, field.name) | |
| # We need to save before we can add m2m relations | |
| if new_instance.pk is None: | |
| new_instance.save() | |
| target_m2m = getattr(new_instance, field.name) | |
| # Add the same related objects (not duplicating them) | |
| related_objects = list(source_m2m.all()) | |
| if related_objects: | |
| target_m2m.add(*related_objects) | |
| except Exception as e: | |
| logger.warning(f"Error processing M2M field {field.name}: {str(e)}") | |
| def _process_reverse_relations(instance, new_instance, exclude_fields, processed_instances): | |
| """Process reverse relations (where other models point to this one).""" | |
| model_class = instance.__class__ | |
| # Find all related objects that point to our instance | |
| for related_object in [f for f in model_class._meta.get_fields() if f.one_to_many or f.one_to_one and f.auto_created]: | |
| # Skip excluded relations | |
| if related_object.name in exclude_fields: | |
| continue | |
| # Skip GenericRelation as they're handled separately | |
| if isinstance(related_object, GenericRelation): | |
| continue | |
| # Get the accessor name | |
| accessor_name = related_object.get_accessor_name() | |
| # Skip if the accessor doesn't exist | |
| if not hasattr(instance, accessor_name): | |
| continue | |
| # Handle OneToOne reverse relations | |
| if related_object.one_to_one: | |
| _duplicate_reverse_one_to_one( | |
| instance, new_instance, related_object, accessor_name, exclude_fields, processed_instances | |
| ) | |
| # Handle ForeignKey reverse relations | |
| elif related_object.one_to_many: | |
| _duplicate_reverse_foreign_key( | |
| instance, new_instance, related_object, accessor_name, exclude_fields, processed_instances | |
| ) | |
| def _process_generic_relations(instance, new_instance, exclude_fields, processed_instances): | |
| """Process GenericRelation fields (objects that reference this one via GenericForeignKey).""" | |
| model_class = instance.__class__ | |
| # Find all GenericRelation fields pointing to this model | |
| for field in model_class._meta.get_fields(): | |
| if not isinstance(field, GenericRelation) or field.name in exclude_fields: | |
| continue | |
| try: | |
| # Get the accessor name | |
| accessor_name = field.name | |
| # Get related objects | |
| related_manager = getattr(instance, accessor_name) | |
| related_objects = list(related_manager.all()) | |
| for related_obj in related_objects: | |
| # Find which GenericForeignKey points to our instance | |
| content_type = ContentType.objects.get_for_model(instance) | |
| # Find the GenericForeignKey in the related model | |
| generic_fk = None | |
| for gfk in related_obj.__class__._meta.private_fields: | |
| if isinstance(gfk, GenericForeignKey): | |
| # Check if this GenericForeignKey points to our instance | |
| if (getattr(related_obj, gfk.ct_field) == content_type and | |
| str(getattr(related_obj, gfk.fk_field)) == str(instance.pk)): | |
| generic_fk = gfk | |
| break | |
| if generic_fk: | |
| # Create a copy of the related object | |
| field_to_exclude = [generic_fk.name] + list(exclude_fields) | |
| related_copy = duplicate_instance( | |
| related_obj, | |
| exclude_fields=field_to_exclude, | |
| duplicate_relations=True, | |
| _processed_instances=processed_instances | |
| ) | |
| # Set the content type field (should be the same) | |
| # Set the object id to our new instance | |
| setattr(related_copy, generic_fk.ct_field, content_type) | |
| setattr(related_copy, generic_fk.fk_field, new_instance.pk) | |
| related_copy.save() | |
| except Exception as e: | |
| logger.warning(f"Error processing GenericRelation {field.name}: {str(e)}") | |
| def _duplicate_reverse_one_to_one(instance, new_instance, related_object, accessor_name, exclude_fields, processed_instances): | |
| """Duplicate a reverse OneToOne relation.""" | |
| try: | |
| related_instance = getattr(instance, accessor_name) | |
| if related_instance: | |
| # Create a copy but exclude the field pointing back to prevent infinite recursion | |
| field_to_exclude = list(exclude_fields) + [related_object.field.name] | |
| related_copy = duplicate_instance( | |
| related_instance, | |
| exclude_fields=field_to_exclude, | |
| duplicate_relations=True, | |
| _processed_instances=processed_instances | |
| ) | |
| # Update the copy to point to our new instance | |
| setattr(related_copy, related_object.field.name, new_instance) | |
| related_copy.save() | |
| except models.ObjectDoesNotExist: | |
| # No related object exists | |
| pass | |
| except AttributeError as e: | |
| logger.warning(f"AttributeError in reverse OneToOne relation {accessor_name}: {str(e)}") | |
| except Exception as e: | |
| logger.warning(f"Error in reverse OneToOne relation {accessor_name}: {str(e)}") | |
| def _duplicate_reverse_foreign_key(instance, new_instance, related_object, accessor_name, exclude_fields, processed_instances): | |
| """Duplicate reverse ForeignKey relations.""" | |
| try: | |
| related_manager = getattr(instance, accessor_name) | |
| related_instances = list(related_manager.all()) | |
| for related_instance in related_instances: | |
| # Create a copy but exclude the field pointing back to prevent infinite recursion | |
| field_to_exclude = list(exclude_fields) + [related_object.field.name] | |
| related_copy = duplicate_instance( | |
| related_instance, | |
| exclude_fields=field_to_exclude, | |
| duplicate_relations=True, | |
| _processed_instances=processed_instances | |
| ) | |
| # Update the copy to point to our new instance | |
| setattr(related_copy, related_object.field.name, new_instance) | |
| related_copy.save() | |
| except AttributeError as e: | |
| logger.warning(f"AttributeError in reverse ForeignKey relation {accessor_name}: {str(e)}") | |
| except Exception as e: | |
| logger.warning(f"Error in reverse ForeignKey relation {accessor_name}: {str(e)}") | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment