@kind-abhasin
Created September 18, 2025 19:15
Redis Data Analysis
"""
Script to populate Redis with sample user reactions data and scan the results.
"""
import random
from datetime import datetime, timedelta

import redis

def populate_user_reactions(client):
    """Populate Redis with sample user reactions data"""
    print("🔄 Populating Redis with sample user reactions data...")

    # Configuration
    num_users = 100
    reactions_per_user = 20

    # Generate realistic data ranges
    user_id_start = 1000
    post_id_start = 10000
    post_id_range = 10000  # Posts 10000-19999

    # Time range: last 30 days
    now = datetime.now()
    thirty_days_ago = now - timedelta(days=30)

    total_operations = 0
    for user_offset in range(num_users):
        user_id = user_id_start + user_offset
        key = f"user:reactions:{user_id}"

        # Generate random reactions for this user
        reactions = {}
        for _ in range(reactions_per_user):
            # Random post ID
            post_id = post_id_start + random.randint(0, post_id_range - 1)

            # Random timestamp in last 30 days (as microseconds for precision)
            random_time = thirty_days_ago + timedelta(
                seconds=random.randint(0, int((now - thirty_days_ago).total_seconds()))
            )
            timestamp_usec = int(random_time.timestamp() * 1_000_000)

            # Add to reactions (post_id as member, timestamp as score)
            reactions[str(post_id)] = timestamp_usec
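            # Note: if the same post_id is drawn twice for a user, the dict
            # keeps only the newest timestamp, so a set can occasionally hold
            # slightly fewer than reactions_per_user members.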
        # Bulk add all reactions for this user using zadd
        client.zadd(key, reactions)
        total_operations += 1

        # Progress indicator
        if user_offset % 20 == 0:
            print(f" Created {user_offset + 1}/{num_users} user reaction sets...")

    print(f"✅ Created {num_users} user reaction keys with {reactions_per_user} reactions each")
    print(f" Total reactions: {num_users * reactions_per_user:,}")
    print(f" Total operations: {total_operations}")
    return num_users, reactions_per_user

def analyze_memory_usage(client):
    """Analyze memory usage of user reactions data"""
    print("\n💾 Memory Usage Analysis")

    # Find user reaction keys
    cursor = 0
    user_keys = []
    while True:
        cursor, keys_batch = client.scan(cursor, match="user:reactions:*", count=1000)
        user_keys.extend(keys_batch)
        if cursor == 0:
            break
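    # Note: redis-py's scan_iter() wraps this cursor loop, so the above is
    # roughly equivalent to:
    #   user_keys = list(client.scan_iter(match="user:reactions:*", count=1000))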
    if not user_keys:
        print("No user reaction keys found")
        return

    print(f"Analyzing {len(user_keys)} user reaction keys...")

    # Sample keys for memory analysis
    sample_size = min(10, len(user_keys))
    sample_keys = random.sample(user_keys, sample_size)

    total_memory_bytes = 0
    for key in sample_keys:
        try:
            # Get memory usage for this specific key
            memory_bytes = client.memory_usage(key)
            total_memory_bytes += memory_bytes

            # Get key details
            reactions_count = client.zcard(key)
            print(f" {key}: {memory_bytes:,} bytes ({reactions_count} reactions)")
        except Exception as e:
            print(f" {key}: Error getting memory usage - {e}")
    if sample_size > 0:
        avg_memory_per_key = total_memory_bytes / sample_size
        estimated_total_memory = avg_memory_per_key * len(user_keys)

        print("\n📊 Memory Analysis Summary:")
        print(f" Sample size: {sample_size} keys")
        print(
            f" Average memory per key: {avg_memory_per_key:,.0f} bytes ({avg_memory_per_key / 1024:.1f} KB)"
        )
        print(
            f" Estimated total memory for all {len(user_keys)} keys: {estimated_total_memory:,.0f} bytes ({estimated_total_memory / 1024 / 1024:.2f} MB)"
        )
        print(
            f" Memory per reaction: {avg_memory_per_key / 20:.1f} bytes (assuming 20 reactions/key)"
        )

        # Breakdown of memory components
        estimated_key_name_size = 20  # "user:reactions:1000" ≈ 20 bytes
        estimated_zset_overhead = 60  # Redis sorted set overhead
        estimated_data_per_reaction = 13  # post_id (5) + timestamp (8) bytes
        estimated_total_data = 20 * estimated_data_per_reaction
        estimated_total = estimated_key_name_size + estimated_zset_overhead + estimated_total_data

        print("\n🔍 Memory Breakdown Estimate:")
        print(f" Key name ('user:reactions:XXXX'): ~{estimated_key_name_size} bytes")
        print(f" Sorted set overhead: ~{estimated_zset_overhead} bytes")
        print(
            f" Data (20 reactions × {estimated_data_per_reaction} bytes): ~{estimated_total_data} bytes"
        )
        print(f" Estimated total: ~{estimated_total} bytes")
        print(f" Actual average: {avg_memory_per_key:.0f} bytes")
        print(
            f" Difference: {abs(avg_memory_per_key - estimated_total):.0f} bytes (Redis overhead/fragmentation)"
        )

def scan_sample_data(client):
    """Scan and display sample of the created data"""
    print("\n🔍 Scanning sample user reactions data...")

    # Find user reaction keys
    cursor = 0
    user_keys = []
    while True:
        cursor, keys_batch = client.scan(cursor, match="user:reactions:*", count=1000)
        user_keys.extend(keys_batch)
        if cursor == 0:
            break

    print(f"Found {len(user_keys)} user reaction keys")

    # Sample a few keys and show their data
    sample_keys = random.sample(user_keys, min(5, len(user_keys)))
    for key in sample_keys:
        # Get total count
        total_reactions = client.zcard(key)

        # Get latest 3 reactions (highest scores = most recent)
        latest_reactions = client.zrevrange(key, 0, 2, withscores=True)

        # Get oldest 3 reactions (lowest scores = oldest)
        oldest_reactions = client.zrange(key, 0, 2, withscores=True)

        print(f"\n📊 Key: {key}")
        print(f" Total reactions: {total_reactions}")
        print(" Latest reactions:")
        for post_id, timestamp_usec in latest_reactions:
            dt = datetime.fromtimestamp(timestamp_usec / 1_000_000)
            print(f" Post {post_id} at {dt.strftime('%Y-%m-%d %H:%M:%S')}")
        print(" Oldest reactions:")
        for post_id, timestamp_usec in oldest_reactions:
            dt = datetime.fromtimestamp(timestamp_usec / 1_000_000)
            print(f" Post {post_id} at {dt.strftime('%Y-%m-%d %H:%M:%S')}")
    # Add memory analysis
    analyze_memory_usage(client)

def main() -> None:
    print("Connecting to local Redis instance...")
    try:
        # Simple Redis connection
        client = redis.Redis(host='localhost', port=6379, decode_responses=True)
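        # decode_responses=True makes redis-py return str instead of bytes,
        # so scanned key names and sorted-set members print cleanly below.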
        # Test connection
        pong = client.ping()
        print(f"✅ Connected successfully! Ping response: {pong}")

        # Ask user what to do
        print("\nOptions:")
        print("1. Populate Redis with sample user reactions data")
        print("2. Scan existing data")
        print("3. Clear all data and repopulate")
        choice = input("\nEnter choice (1/2/3) or press Enter for option 1: ").strip()

        if choice == "3":
            print("\n🗑️ Clearing existing data...")
            client.flushdb()
            print("✅ Cleared all data")
            choice = "1"

        if choice == "2":
            scan_sample_data(client)
        else:
            # Default: populate data
            # Get memory before
            info_before = client.info()
            memory_before = info_before.get('used_memory', 0)

            num_users, reactions_per_user = populate_user_reactions(client)

            # Get memory after
            info_after = client.info()
            memory_after = info_after.get('used_memory', 0)
            memory_used_for_data = memory_after - memory_before

            # Show sample of what was created
            scan_sample_data(client)

            # Show memory impact
            print("\n💾 Memory Impact of Population:")
            print(
                f" Memory before: {memory_before:,} bytes ({memory_before / 1024 / 1024:.2f} MB)"
            )
            print(f" Memory after: {memory_after:,} bytes ({memory_after / 1024 / 1024:.2f} MB)")
            print(
                f" Memory used for data: {memory_used_for_data:,} bytes ({memory_used_for_data / 1024 / 1024:.2f} MB)"
            )

        # Show Redis info
        info = client.info()
        print("\n📈 Redis Info:")
        print(f" Total keys: {info.get('db0', {}).get('keys', 0) if 'db0' in info else 0}")
        print(f" Redis version: {info.get('redis_version', 'unknown')}")
        print(f" Memory used: {info.get('used_memory_human', 'unknown')}")
        print(f" Peak memory: {info.get('used_memory_peak_human', 'unknown')}")
        print(f" Memory fragmentation ratio: {info.get('mem_fragmentation_ratio', 'unknown')}")

    except redis.ConnectionError:
        print("❌ Could not connect to Redis at localhost:6379")
        print("Try starting Redis with: nd docker up -d redis")
    except Exception as e:
        print(f"❌ Error: {e}")


if __name__ == "__main__":
    main()
@kind-abhasin (Author)

It doesn't have any Django dependencies, so you can just run it directly :)

python script.py 
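
(The script's only third-party dependency is the Redis client library itself, so if it's not already installed: pip install redis)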
