Skip to content

Instantly share code, notes, and snippets.

@fanannan
Created April 1, 2025 07:38
Show Gist options
  • Select an option

  • Save fanannan/ecac93d0973044de680266e9d4fc69fd to your computer and use it in GitHub Desktop.

Select an option

Save fanannan/ecac93d0973044de680266e9d4fc69fd to your computer and use it in GitHub Desktop.
Duplicate a Django model instance
"""
# Django モデルインスタンス複製ツール - ドキュメント
## 使い方
```python
# インポート方法
from your_app.utils import duplicate_instance
# 基本的な使用方法(すべての関連オブジェクトを複製する)
original_instance = YourModel.objects.get(id=1)
new_instance = duplicate_instance(original_instance)
# 関連オブジェクトを複製せず、同じオブジェクトを参照する場合
new_instance = duplicate_instance(original_instance, duplicate_relations=False)
# 特定のフィールドを除外する場合
new_instance = duplicate_instance(original_instance, exclude_fields=['field1', 'field2'])
# トランザクションを使用した安全な複製(推奨)
from django.db import transaction
with transaction.atomic():
new_instance = duplicate_instance(original_instance)
```
## コードの構造
このツールは、Djangoモデルインスタンスを効率的に複製するための一連の機能を提供します。
1. `duplicate_instance`: メイン関数。インスタンスの複製プロセス全体を調整します。
2. `_copy_basic_fields`: 基本的なフィールド値を新しいインスタンスにコピーします。
3. `_process_direct_relations`: ForeignKeyやOneToOneなどの直接関係を処理します(複製モード)。
4. `_reference_direct_relations`: 直接関係を処理しますが、元のオブジェクトを参照します(参照モード)。
5. `_process_many_to_many`: ManyToManyリレーションシップを処理します。
6. `_process_reverse_relations`: 逆関係(他のモデルからこのモデルを指すもの)を処理します。
7. `_duplicate_reverse_one_to_one`: 逆OneToOne関係を複製します。
8. `_duplicate_reverse_foreign_key`: 逆ForeignKey関係を複製します。
9. `_process_generic_foreign_keys`: GenericForeignKeyリレーションシップを処理します。
10. `_process_generic_relations`: GenericRelation(逆ジェネリック関係)を処理します。
## 特徴
- **関連オブジェクトの処理オプション**: 関連オブジェクトを複製するか、単に参照するかを選択できます。
- **循環参照の対応**: 相互参照モデルでも安全に機能します。
- **柔軟な除外**: 特定のフィールドを複製から除外できます。
- **モジュラー設計**: 機能ごとに分割された明確な構造で、メンテナンスが容易です。
- **モデル継承サポート**: 親モデルからの継承フィールドも正しく複製されます。
- **GenericForeignKeyサポート**: contenttypesフレームワークを使用したジェネリック関係にも対応しています。
- **エラーログ機能**: 問題が発生した場合は詳細なログが記録されます。
## 注意事項と制限
1. **カスタムsaveメソッド**: モデルにカスタムsaveメソッドがある場合、複製中に呼び出されます。
2. **制約条件**: 複雑なユニーク制約がある場合、追加の処理が必要になる場合があります。
3. **Djangoシグナル**: 複製プロセス中にシグナル(pre_save、post_saveなど)が発生します。
4. **大規模な取引**: 深くネストされたモデルでは大きなトランザクションが発生する可能性があります。トランザクション管理の使用を推奨します。
5. **ファイル/画像フィールド**: ファイル参照はコピーされますが、実際のファイルは複製されません。必要に応じて手動でファイルの複製処理を追加してください。
6. **データベース特有の機能**: データベース特有のフィールドタイプは追加のサポートが必要な場合があります。
7. **パフォーマンス**: 多数の関連を持つ複雑なモデルの複製はリソースを多く消費する場合があります。
8. **ManyToManyの動作**: M2M関係は常に参照のみが複製され、関連オブジェクト自体は複製されません。
9. **カスタム中間モデル**: ManyToManyフィールドにカスタム中間モデルがある場合、中間モデルのカスタムフィールドは複製されません。
10. **auto_nowフィールド**: `auto_now`および`auto_now_add`が設定されたフィールドは、新しいインスタンスでは現在の時刻が設定されます(元の値は保持されません)。
## 使用例
```python
# ブログ記事を複製する例
original_post = BlogPost.objects.get(id=123)
new_post = duplicate_instance(original_post)
# 記事の内容を更新
new_post.title = "新しいタイトル(コピー)"
new_post.slug = "new-post-copy"
new_post.published = False
new_post.save()
# 製品を複製するが、在庫情報は複製しない例
original_product = Product.objects.get(id=456)
new_product = duplicate_instance(
original_product,
exclude_fields=['stock_level', 'sku', 'barcode']
)
# GenericForeignKeyを使用したコメントシステムの例
original_comment = Comment.objects.get(id=789)
new_comment = duplicate_instance(original_comment)
```
## exclude_fieldsの動作
exclude_fieldsに指定したフィールドは、新しいインスタンスではモデルのデフォルト値が使用されます。
現在のコードの動作を詳しく説明すると:
- exclude_fieldsに含まれるフィールドは、元のインスタンスから新しいインスタンスへコピーされません。
- 新しいインスタンスを作成する際(model_class())、Djangoはモデルに定義されたデフォルト値を自動的に設定します。
したがって、除外されたフィールドは:
- モデル定義でデフォルト値が指定されていれば、そのデフォルト値が使用されます
- デフォルト値が指定されていない場合は、Djangoのデフォルト(文字列ならば空文字列、整数ならNoneなど)が使用されます
- null=TrueのフィールドはNoneになります
## 逆参照
逆参照(reverse relations)の扱いは、duplicate_relationsパラメータに依存します。このコードでは逆参照の処理は非常に重要な部分で、以下のように動作します:
**duplicate_relations=Trueの場合(デフォルト)**:
- 元のインスタンスを参照している他のオブジェクト(逆参照)も複製されます
- 新しく複製された関連オブジェクトは、新しい親インスタンスを指すように更新されます
- これにより、元のオブジェクトグラフ全体が複製されます
**duplicate_relations=Falseの場合**:
- 逆参照オブジェクトは複製されません
- 新しいインスタンスは最初、逆参照されているオブジェクトを持ちません
- 元のインスタンスを参照しているオブジェクトは、引き続き元のインスタンスのみを参照します
## GenericForeignKeyとGenericRelationの処理
このコードは、Djangoの`contenttypes`フレームワークを使用したGenericForeignKeyとGenericRelationも適切に処理します:
- GenericForeignKeyフィールドは基盤となるcontenttype_idとobject_idフィールドを通じて処理されます
- duplicate_relations=Trueの場合、GenericForeignKeyが参照するオブジェクトも複製されます
- GenericRelation(逆参照)は他の逆参照と同様に処理されます
## データの一貫性と安全性
大規模なオブジェクトグラフを複製する場合は、常にトランザクション内で操作を実行することをお勧めします:
```python
from django.db import transaction
with transaction.atomic():
new_instance = duplicate_instance(complex_instance)
```
これにより、複製プロセス中にエラーが発生した場合でもデータベースの一貫性が保たれます。
## エラー処理
このコードは様々なエラーケースに対応するために堅牢に設計されています。ロギングシステムを使用して問題を診断できます:
```python
import logging
logging.basicConfig(level=logging.WARNING)
```
エラーが発生した場合は、詳細なログメッセージが記録されます。
## 高度な使用法
複雑なモデル構造や特殊なニーズに対応するために、必要に応じてこのコードを拡張できます。モジュラー設計になっているため、特定の部分だけを変更することが容易です。
## モデル継承の対応
親モデルから継承したフィールドも、子モデルのフィールドと同様に複製されます。抽象基底クラス、多重テーブル継承、プロキシモデルのすべてのケースで正しく動作します。
"""
from django.db import models
from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation
from django.contrib.contenttypes.models import ContentType
import logging
logger = logging.getLogger(__name__)
def duplicate_instance(instance, exclude_fields=None, duplicate_relations=True, _processed_instances=None):
"""
Create a complete duplicate of a Django model instance with options for handling relations.
Args:
instance: The model instance to duplicate
exclude_fields: List of field names to exclude from duplication
duplicate_relations: If True, related objects will be duplicated
If False, new instance will reference the same related objects
_processed_instances: Internal parameter to prevent infinite recursion
Returns:
The new duplicated instance
"""
if exclude_fields is None:
exclude_fields = []
# Initialize recursion tracking if this is the top-level call
if _processed_instances is None:
_processed_instances = {}
# Guard against None instances
if instance is None:
return None
# Prevent infinite recursion by tracking processed instances
instance_key = f"{instance.__class__.__name__}_{instance.pk}"
if instance_key in _processed_instances:
return _processed_instances[instance_key]
# Create basic copy with regular fields (but don't save yet)
new_instance = _copy_basic_fields(instance, exclude_fields)
# Store reference to prevent infinite recursion in case of circular references
_processed_instances[instance_key] = new_instance
# Process direct relations (where this model points to others)
if duplicate_relations:
_process_direct_relations(instance, new_instance, exclude_fields, _processed_instances)
else:
_reference_direct_relations(instance, new_instance, exclude_fields)
# Process generic foreign keys
_process_generic_foreign_keys(instance, new_instance, exclude_fields, duplicate_relations, _processed_instances)
# Process many-to-many relationships
_process_many_to_many(instance, new_instance, exclude_fields)
# Now save the instance with all direct relations set
new_instance.save()
# Process reverse relations (where other models point to this one)
if duplicate_relations:
_process_reverse_relations(instance, new_instance, exclude_fields, _processed_instances)
_process_generic_relations(instance, new_instance, exclude_fields, _processed_instances)
# Final save to ensure all relations are properly stored
new_instance.save()
return new_instance
def _copy_basic_fields(instance, exclude_fields):
"""Copy basic field values to a new model instance."""
model_class = instance.__class__
new_instance = model_class()
# Get all fields including those from parent models
for field in model_class._meta.get_fields():
# Skip if not a concrete field (abstract or through m2m fields)
if not hasattr(field, 'concrete') or not field.concrete:
continue
# Skip primary key, excluded fields, and relations
if field.primary_key or field.name in exclude_fields or (
field.is_relation and not isinstance(field, models.ForeignKey) and
not isinstance(field, models.OneToOneField)
):
continue
# Skip auto_now and auto_now_add fields (they'll be set on save)
if hasattr(field, 'auto_now') and field.auto_now:
continue
if hasattr(field, 'auto_now_add') and field.auto_now_add:
continue
# Skip relations that will be handled separately
if field.is_relation:
continue
# Copy regular field value
try:
setattr(new_instance, field.name, getattr(instance, field.name))
except Exception as e:
logger.warning(f"Could not copy field {field.name}: {str(e)}")
# Don't save yet - we'll do that after setting relations
return new_instance
def _process_direct_relations(instance, new_instance, exclude_fields, processed_instances):
"""Process ForeignKey and OneToOne fields where this model points to others, duplicating them."""
model_class = instance.__class__
for field in model_class._meta.get_fields():
# Skip if not a concrete field
if not hasattr(field, 'concrete') or not field.concrete:
continue
# Skip non-relations and excluded fields
if not field.is_relation or field.name in exclude_fields:
continue
# Skip M2M and reverse relations
if field.many_to_many or field.one_to_many:
continue
# Skip GenericForeignKey as they're handled separately
if hasattr(field, 'ct_field'):
continue
# Process only direct ForeignKey and OneToOne fields
if isinstance(field, models.ForeignKey) or isinstance(field, models.OneToOneField):
try:
# Safely get related object (may be None for nullable fields)
related_obj = getattr(instance, field.name)
if related_obj is None:
continue
# Create a duplicate of the related object
related_copy = duplicate_instance(
related_obj,
exclude_fields=exclude_fields,
duplicate_relations=True,
_processed_instances=processed_instances
)
setattr(new_instance, field.name, related_copy)
except Exception as e:
logger.warning(f"Error duplicating relation {field.name}: {str(e)}")
def _reference_direct_relations(instance, new_instance, exclude_fields):
"""Process ForeignKey and OneToOne fields where this model points to others, referencing existing objects."""
model_class = instance.__class__
for field in model_class._meta.get_fields():
# Skip if not a concrete field
if not hasattr(field, 'concrete') or not field.concrete:
continue
# Skip non-relations and excluded fields
if not field.is_relation or field.name in exclude_fields:
continue
# Skip M2M and reverse relations
if field.many_to_many or field.one_to_many:
continue
# Skip GenericForeignKey as they're handled separately
if hasattr(field, 'ct_field'):
continue
# Process only direct ForeignKey and OneToOne fields
if isinstance(field, models.ForeignKey) or isinstance(field, models.OneToOneField):
try:
# Safely get related object (may be None for nullable fields)
related_obj = getattr(instance, field.name)
if related_obj is None:
continue
# Reference the same related object
setattr(new_instance, field.name, related_obj)
except Exception as e:
logger.warning(f"Error referencing relation {field.name}: {str(e)}")
def _process_generic_foreign_keys(instance, new_instance, exclude_fields, duplicate_relations, processed_instances):
"""Process GenericForeignKey fields."""
model_class = instance.__class__
# Find all GenericForeignKey fields
for field in model_class._meta.private_fields:
if not isinstance(field, GenericForeignKey):
continue
if field.name in exclude_fields:
continue
try:
# Get content type and object id field names
ct_field = field.ct_field
fk_field = field.fk_field
# These should already be copied as basic fields, but let's ensure
if hasattr(instance, ct_field) and hasattr(instance, fk_field):
ct_value = getattr(instance, ct_field)
fk_value = getattr(instance, fk_field)
if ct_value is not None and fk_value is not None:
# Ensure new_instance is saved and has a primary key before setting relations
if new_instance.pk is None:
new_instance.save()
# Set the same content type
setattr(new_instance, ct_field, ct_value)
if duplicate_relations:
# Get the actual related object
related_obj = getattr(instance, field.name)
if related_obj is not None:
# Duplicate the related object
related_copy = duplicate_instance(
related_obj,
exclude_fields=exclude_fields,
duplicate_relations=True,
_processed_instances=processed_instances
)
# Set the new object ID
setattr(new_instance, fk_field, related_copy.pk)
else:
# Just reference the same object
setattr(new_instance, fk_field, fk_value)
except Exception as e:
logger.warning(f"Error processing GenericForeignKey {field.name}: {str(e)}")
def _process_many_to_many(instance, new_instance, exclude_fields):
"""Process ManyToMany relationships."""
model_class = instance.__class__
for field in model_class._meta.get_fields():
if not field.many_to_many or field.name in exclude_fields:
continue
# Skip reverse relations
if field.auto_created:
continue
try:
# Get the related manager for this m2m field
source_m2m = getattr(instance, field.name)
# We need to save before we can add m2m relations
if new_instance.pk is None:
new_instance.save()
target_m2m = getattr(new_instance, field.name)
# Add the same related objects (not duplicating them)
related_objects = list(source_m2m.all())
if related_objects:
target_m2m.add(*related_objects)
except Exception as e:
logger.warning(f"Error processing M2M field {field.name}: {str(e)}")
def _process_reverse_relations(instance, new_instance, exclude_fields, processed_instances):
"""Process reverse relations (where other models point to this one)."""
model_class = instance.__class__
# Find all related objects that point to our instance
for related_object in [f for f in model_class._meta.get_fields() if f.one_to_many or f.one_to_one and f.auto_created]:
# Skip excluded relations
if related_object.name in exclude_fields:
continue
# Skip GenericRelation as they're handled separately
if isinstance(related_object, GenericRelation):
continue
# Get the accessor name
accessor_name = related_object.get_accessor_name()
# Skip if the accessor doesn't exist
if not hasattr(instance, accessor_name):
continue
# Handle OneToOne reverse relations
if related_object.one_to_one:
_duplicate_reverse_one_to_one(
instance, new_instance, related_object, accessor_name, exclude_fields, processed_instances
)
# Handle ForeignKey reverse relations
elif related_object.one_to_many:
_duplicate_reverse_foreign_key(
instance, new_instance, related_object, accessor_name, exclude_fields, processed_instances
)
def _process_generic_relations(instance, new_instance, exclude_fields, processed_instances):
"""Process GenericRelation fields (objects that reference this one via GenericForeignKey)."""
model_class = instance.__class__
# Find all GenericRelation fields pointing to this model
for field in model_class._meta.get_fields():
if not isinstance(field, GenericRelation) or field.name in exclude_fields:
continue
try:
# Get the accessor name
accessor_name = field.name
# Get related objects
related_manager = getattr(instance, accessor_name)
related_objects = list(related_manager.all())
for related_obj in related_objects:
# Find which GenericForeignKey points to our instance
content_type = ContentType.objects.get_for_model(instance)
# Find the GenericForeignKey in the related model
generic_fk = None
for gfk in related_obj.__class__._meta.private_fields:
if isinstance(gfk, GenericForeignKey):
# Check if this GenericForeignKey points to our instance
if (getattr(related_obj, gfk.ct_field) == content_type and
str(getattr(related_obj, gfk.fk_field)) == str(instance.pk)):
generic_fk = gfk
break
if generic_fk:
# Create a copy of the related object
field_to_exclude = [generic_fk.name] + list(exclude_fields)
related_copy = duplicate_instance(
related_obj,
exclude_fields=field_to_exclude,
duplicate_relations=True,
_processed_instances=processed_instances
)
# Set the content type field (should be the same)
# Set the object id to our new instance
setattr(related_copy, generic_fk.ct_field, content_type)
setattr(related_copy, generic_fk.fk_field, new_instance.pk)
related_copy.save()
except Exception as e:
logger.warning(f"Error processing GenericRelation {field.name}: {str(e)}")
def _duplicate_reverse_one_to_one(instance, new_instance, related_object, accessor_name, exclude_fields, processed_instances):
"""Duplicate a reverse OneToOne relation."""
try:
related_instance = getattr(instance, accessor_name)
if related_instance:
# Create a copy but exclude the field pointing back to prevent infinite recursion
field_to_exclude = list(exclude_fields) + [related_object.field.name]
related_copy = duplicate_instance(
related_instance,
exclude_fields=field_to_exclude,
duplicate_relations=True,
_processed_instances=processed_instances
)
# Update the copy to point to our new instance
setattr(related_copy, related_object.field.name, new_instance)
related_copy.save()
except models.ObjectDoesNotExist:
# No related object exists
pass
except AttributeError as e:
logger.warning(f"AttributeError in reverse OneToOne relation {accessor_name}: {str(e)}")
except Exception as e:
logger.warning(f"Error in reverse OneToOne relation {accessor_name}: {str(e)}")
def _duplicate_reverse_foreign_key(instance, new_instance, related_object, accessor_name, exclude_fields, processed_instances):
"""Duplicate reverse ForeignKey relations."""
try:
related_manager = getattr(instance, accessor_name)
related_instances = list(related_manager.all())
for related_instance in related_instances:
# Create a copy but exclude the field pointing back to prevent infinite recursion
field_to_exclude = list(exclude_fields) + [related_object.field.name]
related_copy = duplicate_instance(
related_instance,
exclude_fields=field_to_exclude,
duplicate_relations=True,
_processed_instances=processed_instances
)
# Update the copy to point to our new instance
setattr(related_copy, related_object.field.name, new_instance)
related_copy.save()
except AttributeError as e:
logger.warning(f"AttributeError in reverse ForeignKey relation {accessor_name}: {str(e)}")
except Exception as e:
logger.warning(f"Error in reverse ForeignKey relation {accessor_name}: {str(e)}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment