Dify: Weaviate 1.19 to 1.27+ Migration Guide (community-edited, simplified)
#!/usr/bin/env python3
"""
Migration script to fix Weaviate schema incompatibility between 1.19.0 and 1.27.0+

This script:
- Identifies collections with old schema (no vectorConfig)
- Creates new collections with proper vectorConfig including "default" named vector
- Migrates data using cursor-based pagination (efficient for large datasets)
- Uses batch operations for fast inserts
- Preserves all object properties and vectors

Note:
- This is a community-edited version of the draft of the script presented by the Dify Team.
- This script is not officially supported by the Dify Team.
- The original source for this script can be found at https://github.com/langgenius/dify/issues/27291#issuecomment-3501003678.
- The changes made in this script are:
  - Retrieve Weaviate connection info from environment variables to make this script run in the Worker container.
  - Switch to cursor-based pagination in "replace_old_collection", since the migration could fail with large collections.
  - Fix an issue where both the old and new collections remained without being deleted after migrating an empty collection.
"""
import os
import weaviate
from weaviate.classes.config import Configure, VectorDistances
import sys
import time
from typing import List, Dict, Any

# Configuration
WEAVIATE_ENDPOINT = os.getenv("WEAVIATE_ENDPOINT", "http://weaviate:8080")
WEAVIATE_GRPC_ENDPOINT = os.getenv("WEAVIATE_GRPC_ENDPOINT", "grpc://weaviate:50051")
WEAVIATE_API_KEY = os.getenv("WEAVIATE_API_KEY", "WVF5YThaHlkYwhGUSmCRgsX3tD5ngdN8pkih")
BATCH_SIZE = 1000

WEAVIATE_HOST = WEAVIATE_ENDPOINT.split("//")[-1].split(":")[0]
WEAVIATE_PORT = int(WEAVIATE_ENDPOINT.split(":")[-1])
WEAVIATE_GRPC_PORT = int(WEAVIATE_GRPC_ENDPOINT.split(":")[-1])


def identify_old_collections(client: weaviate.WeaviateClient) -> List[str]:
    """Identify collections that need migration (those without vectorConfig)"""
    collections_to_migrate = []
    all_collections = client.collections.list_all()
    print(f"Found {len(all_collections)} total collections")
    for collection_name in all_collections.keys():
        # Only check Vector_index collections (Dify knowledge bases)
        if not collection_name.startswith("Vector_index_"):
            continue
        collection = client.collections.get(collection_name)
        config = collection.config.get()
        # Check if this collection has the old schema
        if config.vector_config is None:
            collections_to_migrate.append(collection_name)
            print(f" - {collection_name}: OLD SCHEMA (needs migration)")
        else:
            print(f" - {collection_name}: NEW SCHEMA (skip)")
    return collections_to_migrate


def get_collection_schema(client: weaviate.WeaviateClient, collection_name: str) -> Dict[str, Any]:
    """Get the full schema of a collection via REST API"""
    import requests
    response = requests.get(
        f"http://{WEAVIATE_HOST}:{WEAVIATE_PORT}/v1/schema/{collection_name}",
        headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}
    )
    if response.status_code == 200:
        return response.json()
    else:
        raise Exception(f"Failed to get schema: {response.text}")


def create_new_collection(client: weaviate.WeaviateClient, old_name: str, schema: Dict[str, Any]) -> str:
    """Create a new collection with updated schema using REST API"""
    import requests
    # Generate new collection name
    new_name = f"{old_name}_migrated"
    print(f"Creating new collection: {new_name}")
    # Build new schema with proper vectorConfig
    # Note: When using vectorConfig (named vectors), we don't set class-level vectorizer
    new_schema = {
        "class": new_name,
        # This is the key: define vectorConfig with "default" named vector
        # Do NOT set class-level vectorizer when using vectorConfig
        "vectorConfig": {
            "default": {
                "vectorizer": {
                    "none": {}
                },
                "vectorIndexType": "hnsw",
                "vectorIndexConfig": {
                    "distance": "cosine",
                    "ef": -1,
                    "efConstruction": 128,
                    "maxConnections": 32
                }
            }
        },
        "properties": []
    }
    # Copy properties from old schema
    if "properties" in schema:
        new_schema["properties"] = schema["properties"]
    # Create collection via REST API
    response = requests.post(
        f"{WEAVIATE_ENDPOINT}/v1/schema",
        json=new_schema,
        headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}
    )
    if response.status_code not in [200, 201]:
        raise Exception(f"Failed to create collection: {response.text}")
    print(f" Created new collection: {new_name}")
    return new_name


def migrate_collection_data(
    client: weaviate.WeaviateClient,
    old_collection_name: str,
    new_collection_name: str
) -> int:
    """Migrate data from old collection to new collection using cursor-based pagination"""
    old_collection = client.collections.get(old_collection_name)
    new_collection = client.collections.get(new_collection_name)
    total_migrated = 0
    cursor = None
    print(f"Migrating data from {old_collection_name} to {new_collection_name}")
    while True:
        # Fetch batch of objects using cursor-based pagination
        if cursor is None:
            # First batch
            response = old_collection.query.fetch_objects(
                limit=BATCH_SIZE,
                include_vector=True
            )
        else:
            # Subsequent batches using cursor
            response = old_collection.query.fetch_objects(
                limit=BATCH_SIZE,
                include_vector=True,
                after=cursor
            )
        objects = response.objects
        if not objects:
            break
        # Use batch insert for efficiency
        with new_collection.batch.dynamic() as batch:
            for obj in objects:
                # Prepare properties
                properties = obj.properties
                # Add object with vector
                batch.add_object(
                    properties=properties,
                    vector=obj.vector["default"] if isinstance(obj.vector, dict) else obj.vector,
                    uuid=obj.uuid
                )
        total_migrated += len(objects)
        print(f" Migrated {total_migrated} objects...")
        # Update cursor for next iteration
        if len(objects) < BATCH_SIZE:
            # Last batch
            break
        else:
            # Get the last object's UUID for cursor
            cursor = objects[-1].uuid
    print(f" Total migrated: {total_migrated} objects")
    return total_migrated


def verify_migration(
    client: weaviate.WeaviateClient,
    old_collection_name: str,
    new_collection_name: str
):
    """Verify that the migration was successful"""
    old_collection = client.collections.get(old_collection_name)
    new_collection = client.collections.get(new_collection_name)
    # Get aggregation for accurate counts
    old_agg = old_collection.aggregate.over_all(total_count=True)
    new_agg = new_collection.aggregate.over_all(total_count=True)
    old_count = old_agg.total_count
    new_count = new_agg.total_count
    print(f"\nVerification:")
    print(f" Old collection ({old_collection_name}): {old_count} objects")
    print(f" New collection ({new_collection_name}): {new_count} objects")
    if old_count == new_count:
        print(f" Status: SUCCESS - Counts match!")
        return True
    else:
        print(f" Status: WARNING - Counts don't match!")
        return False


def replace_old_collection(
    client: weaviate.WeaviateClient,
    old_collection_name: str,
    new_collection_name: str
):
    """Replace old collection with migrated one by recreating with original name"""
    import requests
    print(f"\nReplacing old collection with migrated data...")
    # Step 1: Delete old collection
    print(f" Step 1: Deleting old collection...")
    response = requests.delete(
        f"{WEAVIATE_ENDPOINT}/v1/schema/{old_collection_name}",
        headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}
    )
    if response.status_code != 200:
        print(f" Warning: Could not delete old collection: {response.text}")
    else:
        print(f" Deleted")
    # Step 2: Get schema from migrated collection
    print(f" Step 2: Getting schema from migrated collection...")
    schema_response = requests.get(
        f"{WEAVIATE_ENDPOINT}/v1/schema/{new_collection_name}",
        headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}
    )
    schema = schema_response.json()
    schema["class"] = old_collection_name
    # Step 3: Create collection with original name and new schema
    print(f" Step 3: Creating collection with original name...")
    create_response = requests.post(
        f"{WEAVIATE_ENDPOINT}/v1/schema",
        json=schema,
        headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}
    )
    if create_response.status_code not in [200, 201]:
        raise Exception(f"Failed to create collection: {create_response.text}")
    print(f" Created")
    # Step 4: Copy data to collection with original name using cursor-based pagination
    print(f" Step 4: Copying data to original collection name...")
    migrated_collection = client.collections.get(new_collection_name)
    new_collection = client.collections.get(old_collection_name)
    total_copied = 0
    cursor = None
    while True:
        # Fetch batch of objects using cursor-based pagination
        if cursor is None:
            # First batch
            response = migrated_collection.query.fetch_objects(
                include_vector=True,
                limit=BATCH_SIZE
            )
        else:
            # Subsequent batches using cursor
            response = migrated_collection.query.fetch_objects(
                include_vector=True,
                limit=BATCH_SIZE,
                after=cursor
            )
        objects = response.objects
        if not objects:
            break
        # Use batch insert for efficiency
        with new_collection.batch.dynamic() as batch:
            for obj in objects:
                batch.add_object(
                    properties=obj.properties,
                    vector=obj.vector,
                    uuid=obj.uuid
                )
        total_copied += len(objects)
        print(f" Copied {total_copied} objects...")
        # Update cursor for next iteration
        if len(objects) < BATCH_SIZE:
            break
        else:
            cursor = objects[-1].uuid
    print(f" Total copied: {total_copied} objects")
    # Step 5: Delete the temporary migrated collection
    print(f" Step 5: Cleaning up temporary migrated collection...")
    response = requests.delete(
        f"{WEAVIATE_ENDPOINT}/v1/schema/{new_collection_name}",
        headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}
    )
    if response.status_code == 200:
        print(f" Cleaned up")
    print(f"\n SUCCESS! {old_collection_name} now has the new schema with {total_copied} objects")
    return True


def migrate_all_collections():
    """Main migration function"""
    print("=" * 80)
    print("Weaviate Collection Migration Script")
    print("Migrating from Weaviate 1.19.0 schema to 1.27.0+ schema")
    print("=" * 80)
    print()
    client = weaviate.connect_to_local(
        host=WEAVIATE_HOST,
        port=WEAVIATE_PORT,
        grpc_port=WEAVIATE_GRPC_PORT,
        auth_credentials=weaviate.auth.AuthApiKey(WEAVIATE_API_KEY)
    )
    try:
        # Step 1: Identify collections that need migration
        print("Step 1: Identifying collections that need migration...")
        collections_to_migrate = identify_old_collections(client)
        if not collections_to_migrate:
            print("\nNo collections need migration. All collections are up to date!")
            return
        print(f"\nFound {len(collections_to_migrate)} collections to migrate:")
        for col in collections_to_migrate:
            print(f" - {col}")
        # Confirm before proceeding
        print("\nThis script will:")
        print("1. Create new collections with updated schema")
        print("2. Copy all data using efficient batch operations")
        print("3. Verify the migration")
        print("4. Optionally rename collections to activate the new ones")
        print()
        # Step 2: Migrate each collection
        for collection_name in collections_to_migrate:
            print("\n" + "=" * 80)
            print(f"Migrating: {collection_name}")
            print("=" * 80)
            try:
                # Get old schema
                schema = get_collection_schema(client, collection_name)
                # Create new collection
                new_collection_name = create_new_collection(client, collection_name, schema)
                # Migrate data
                migrated_count = migrate_collection_data(client, collection_name, new_collection_name)
                # Verify migration
                success = verify_migration(client, collection_name, new_collection_name)
                if success:
                    print(f"\nMigration successful for {collection_name}!")
                    print(f"New collection: {new_collection_name}")
                    # Automatically replace old collection with migrated one
                    try:
                        replace_old_collection(client, collection_name, new_collection_name)
                    except Exception as e:
                        print(f"\nWarning: Could not automatically replace collection: {e}")
                        print(f"\nTo activate manually:")
                        print(f"1. Delete the old collection: {collection_name}")
                        print(f"2. Rename {new_collection_name} to {collection_name}")
            except Exception as e:
                print(f"\nError migrating {collection_name}: {e}")
                print(f"Skipping this collection and continuing...")
                continue
        print("\n" + "=" * 80)
        print("Migration Complete!")
        print("=" * 80)
        print("\nSummary:")
        print(f" Collections migrated: {len(collections_to_migrate)}")
    finally:
        client.close()


if __name__ == "__main__":
    try:
        migrate_all_collections()
    except KeyboardInterrupt:
        print("\n\nMigration interrupted by user.")
        sys.exit(1)
    except Exception as e:
        print(f"\n\nFatal error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)

Weaviate 1.19 to 1.27+ Migration Guide for Dify

  • ⚠️ This guide is not officially supported by the Dify Team.
  • ⚠️ This is a community-edited, simplified version of the draft of the official migration guide presented by the Dify Team.
  • ⚠️ The original source for this guide and script can be found at langgenius/dify#27291.

Complete guide to safely migrate Dify knowledge bases from Weaviate 1.19 to 1.27/1.33.


Outline

This guide covers the following two cases.
Case A is the safer starting point, but the same procedure also applies to Case B:

  • Case A
    • You are currently running Dify 1.9.1 or earlier, which ships with Weaviate 1.19.
    • All knowledge is functioning properly.
  • Case B
    • You have already upgraded to Weaviate 1.27+ and are running Dify 1.9.2 or later.
    • The knowledge bases created with the previous version are corrupted, and you have no backup to revert to the earlier version.

The procedure in this guide is as follows:

  1. Take a complete backup of your current Dify environment.
  2. If your Dify version is 1.9.1 or earlier, upgrade Dify.
  3. Inside the weaviate container, fix the directory structure of the LSM data.
  4. Inside the worker container, run the migration script.
  5. Perform cleanup.

Migration Procedure

Note:
This procedure cannot be rolled back by any means other than restoring a backup; attempting to undo it in any other way may make things worse.
We strongly recommend taking a full backup first (Step 1), in preparation for a possible restore.


Step 1: Backup Your Environment

Stop your Dify services:

cd /path/to/dify/docker
docker compose down

Then make a full copy or archive of your entire docker directory (for example, /path/to/dify/docker) as a safety measure.
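
For example, assuming the installation lives under /path/to/dify (adjust the path and the archive name to your environment; sudo may be needed to read files created by the containers), a dated archive of the whole docker directory is sufficient:

cd /path/to/dify
sudo tar czf dify-docker-backup-$(date +%Y%m%d).tar.gz docker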

If you encounter issues later, you can restore this backup to revert to the original state.


Step 2: Upgrade to Weaviate 1.27+ (Only for Case A)

This step is only for Case A - users currently on Dify 1.9.1 or earlier with Weaviate 1.19.
If you are already running Weaviate 1.27+ (Case B), you can skip this step.

Follow the upgrade guide to move to the latest (or a specific) Dify version that uses Weaviate 1.27+.


Step 3: Fix Orphaned LSM Data

If your Dify services are stopped, start them and wait until they are fully up.

cd /path/to/dify/docker
docker compose up -d

Ensure your Weaviate container is using image version 1.27.0 or higher.

cd /path/to/dify/docker
docker compose ps weaviate  # The "IMAGE" column should show "semitechnologies/weaviate:1.27.0" or higher

Enter the shell of your weaviate container:

cd /path/to/dify/docker
docker compose exec -it weaviate /bin/sh

Then run the following commands inside the container to fix LSM data:

cd /var/lib/weaviate
for dir in vector_index_*_node_*_lsm; do
  [ -d "$dir" ] || continue
  
  # Extract index ID and shard ID
  index_id=$(echo "$dir" | sed -n 's/vector_index_\([^_]*_[^_]*_[^_]*_[^_]*_[^_]*\)_node_.*/\1/p')
  shard_id=$(echo "$dir" | sed -n 's/.*_node_\([^_]*\)_lsm/\1/p')
  
  # Create target directory and copy
  mkdir -p "vector_index_${index_id}_node/$shard_id/lsm"
  cp -a "$dir/"* "vector_index_${index_id}_node/$shard_id/lsm/"
  
  echo "✓ Copied $dir"
done
exit
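
If you want to confirm the result before restarting, an optional check is to list the new per-shard lsm directories that the loop above created (the exact listing depends on how many knowledge bases you have; if nothing is listed, there were no old-layout directories to copy):

cd /path/to/dify/docker
docker compose exec -it weaviate /bin/sh -c 'ls -d /var/lib/weaviate/vector_index_*_node/*/lsm'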

Then restart the weaviate container to ensure the changes are recognized:

cd /path/to/dify/docker
docker compose restart weaviate

Step 4: Migrate Schema

Place the migrate_weaviate_collections_ce.py script in your /path/to/dify/docker/volumes/app/storage/ directory, then enter the shell of your worker container:

cp /path/to/migrate_weaviate_collections_ce.py /path/to/dify/docker/volumes/app/storage/
cd /path/to/dify/docker
docker compose exec -it worker /bin/bash

Then run the following commands inside the container to execute the migration script:

uv run /app/api/storage/migrate_weaviate_collections_ce.py
exit

Restart Dify services:

docker compose down
docker compose up -d

Verify in Dify UI:

  1. Go to your Dify console
  2. Open your knowledge bases
  3. Try "Retrieval Testing"
  4. Confirm that retrieval works without errors.
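
You can also double-check the schema from the command line by re-running the migration script before removing it in Step 5; when every collection already has the new vectorConfig, the script finds nothing to migrate and reports "No collections need migration. All collections are up to date!" without touching any data:

cd /path/to/dify/docker
docker compose exec -it worker uv run /app/api/storage/migrate_weaviate_collections_ce.py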

Step 5: Cleanup (Optional)

After successful migration, you can delete orphaned files to free up space.
Enter the shell of your weaviate container:

cd /path/to/dify/docker
docker compose exec -it weaviate /bin/sh

Then run the following commands inside the container to delete orphaned files:

cd /var/lib/weaviate
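# Optional: preview what the next command will delete.
# The new per-shard layout (vector_index_*_node/<shard>/lsm) created in Step 3
# does not match this pattern and is left untouched.
ls -d vector_index_*_node_*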
rm -rf vector_index_*_node_*
exit

Also, you can delete the migration script from your storage volume:

rm /path/to/dify/docker/volumes/app/storage/migrate_weaviate_collections_ce.py

Files Needed

  • migrate_weaviate_collections_ce.py (the migration script at the top of this gist)

Credits

  • Original migration approach: Dify team
  • LSM recovery method: Chinese Dify community user
  • Combined solution: Community effort
kurokobo commented Dec 7, 2025

@tomy-kyu
Thanks, I tried to reproduce your issue, but I couldn't.

  1. Deploy a whole new Dify 1.8.1 instance, install the OpenAI plugin, and configure the OpenAI API key.
  2. Create new Knowledge with the High Quality method using the OpenAI embedding model.
  3. Ensure that the Retrieval Testing works.
  4. Create a new workflow app that includes a Knowledge Retrieval node using the Knowledge created above.
  5. Ensure that the new app can retrieve the Knowledge and that the retrieval count increases.
  6. Follow my guide to upgrade Dify to 1.9.2 and Weaviate to 1.27.0.

Now I can confirm that both the Retrieval Testing and the workflow app work, and the retrieval count increases as expected.
Adding docs to the existing knowledge also works.

tomy-kyu commented Dec 7, 2025

@kurokobo

Thank you for your assistance with the verification process.
I've now understood that under normal circumstances, your system should function correctly.

My environment does indeed have some differences:

  • Weaviate was originally configured with the latest version 1.28 during initial setup.
  • We customized several processing steps to use gse as the tokenizer.
  • Our knowledge configuration always utilized parent-child chunks.

What occurred in my environment may be attributable to these factors.
Given the practical challenges of completely restoring and reimplementing all these changes at this time, I'm considering continuing to operate the tenant on version 1.10.1.fix1 for the moment.

Fortunately, we have two tenants (production and development), and the target is the development tenant. I expect the development tenant will continue to serve as a testing environment for versions 1.10 and above.

The production tenant remains on version 1.8.1. I'll freeze this version for ongoing operation for the time being, then consider setting up a new production tenant as needed and performing an environment migration.

I apologize for not being able to provide more useful information.
Best wishes for your future endeavors.
