|
#!/usr/bin/env python3 |
|
""" |
|
nest_json_keys.py - v2 |
|
|
|
Converts flat dot-separated JSON keys into nested dictionaries. |
|
Example: {"foo.bar.baz": "value"} -> {"foo": {"bar": {"baz": "value"}}} |
|
""" |
|
|
|
import os |
|
import json |
|
import argparse |
|
from glob import glob |
|
from typing import Any, Dict, List, Set, Tuple, Optional |
|
|
|
|
|
class ConflictError(Exception):
    """Signals that one key path would have to hold both a plain value and a nested dictionary."""
|
|
|
|
|
def set_nested_value(
    target: Dict[str, Any], key_parts: List[str], value: Any, full_key: str
) -> None:
    """
    Store ``value`` in ``target`` at the nested location named by ``key_parts``.

    Intermediate dictionaries are created as needed. If the destination
    already holds a dictionary and ``value`` is a dictionary too, the entries
    of ``value`` are merged into the existing one (recursively, under the same
    conflict rules) rather than replacing it, so children that were nested
    earlier are never silently dropped and the outcome does not depend on the
    order in which overlapping keys arrive.

    Args:
        target: The dictionary to modify in place
        key_parts: Non-empty list of key parts forming the path
            (e.g., ['foo', 'bar', 'baz'])
        value: The value to set
        full_key: The original full key (for error messages)

    Raises:
        ConflictError: If a path element needs to be both a dict and a value
    """
    current = target

    # Walk (and create) every level above the final key.
    for depth, part in enumerate(key_parts[:-1], start=1):
        if part not in current:
            current[part] = {}
        elif not isinstance(current[part], dict):
            # A plain value already sits where a container is required.
            partial_key = ".".join(key_parts[:depth])
            raise ConflictError(
                f"Conflict at '{partial_key}': cannot nest '{full_key}' "
                f"because '{partial_key}' is already a non-dict value"
            )
        current = current[part]

    final_key = key_parts[-1]
    existing = current.get(final_key)

    if isinstance(existing, dict):
        if not isinstance(value, dict):
            # A plain value would wipe out the children nested here earlier.
            raise ConflictError(
                f"Conflict at '{full_key}': cannot set non-dict value "
                f"because '{full_key}' already has nested children"
            )
        # Both sides are dicts: merge child by child instead of replacing, so
        # previously nested children survive. Child keys are used verbatim
        # (not split on dots) because they come from an already-nested dict.
        for child_key, child_value in value.items():
            set_nested_value(
                existing, [child_key], child_value, f"{full_key}.{child_key}"
            )
        return

    current[final_key] = value
|
|
|
|
|
def nest_dict(
    flat_dict: Dict[str, Any], prefix: Optional[str] = None
) -> Dict[str, Any]:
    """
    Convert a flat dictionary with dot-separated keys into a nested dictionary.

    Every key is inserted through ``set_nested_value`` so that the same
    conflict checks apply to all of them, whatever order they appear in:

    * without a prefix, each key is split on dots and nested;
    * with a prefix, only the key equal to the prefix and keys starting with
      ``prefix + "."`` are split and nested; every other key is stored
      unchanged as a single top-level entry (dots included).

    Empty keys are ignored. All conflicts are collected so that one exception
    reports every problem found in the input.

    Args:
        flat_dict: Dictionary with potentially dot-separated keys
        prefix: If provided, only nest keys that start with this prefix

    Returns:
        Nested dictionary

    Raises:
        ConflictError: If there are conflicting key paths; the message holds
            one line per conflict
    """
    result: Dict[str, Any] = {}
    errors: List[str] = []

    # Hoisted: the marker that identifies keys living under the prefix.
    nested_marker = None if prefix is None else prefix + "."

    for key, value in flat_dict.items():
        # Skip empty keys
        if not key:
            continue

        if prefix is None or key == prefix or key.startswith(nested_marker):
            # "prefix.sub.key" split on dots is exactly the prefix path
            # followed by the sub-key path, so one call on the full key both
            # creates the prefix containers and places the value.
            key_parts = key.split(".")
        else:
            # Outside the prefix: keep the key verbatim as one path element.
            key_parts = [key]

        try:
            set_nested_value(result, key_parts, value, key)
        except ConflictError as e:
            errors.append(str(e))

    if errors:
        raise ConflictError("\n".join(errors))

    return result
|
|
|
|
|
def has_dots_in_keys(data: Dict[str, Any], prefix: Optional[str] = None) -> bool:
    """
    Report whether the dictionary holds at least one key that would be nested.

    Args:
        data: Dictionary whose keys are inspected
        prefix: When given, a key only counts if it equals the prefix or
            starts with the prefix followed by a dot

    Returns:
        True as soon as one non-empty, eligible key containing a dot is found;
        False otherwise
    """
    if prefix is None:
        # Unrestricted mode: any non-empty dotted key qualifies.
        return any("." in name for name in data if name)

    nested_marker = prefix + "."
    return any(
        (name == prefix or name.startswith(nested_marker)) and "." in name
        for name in data
        if name
    )
|
|
|
|
|
def process_file(
    filepath: str, prefix: Optional[str] = None, dry_run: bool = False
) -> bool:
    """
    Load one JSON file, nest its dot-separated keys, and write it back in place.

    Args:
        filepath: Location of the JSON file to rewrite
        prefix: When given, restricts nesting to keys under this prefix
        dry_run: When True, the file is left untouched and only the report
            is printed

    Returns:
        True when the file was nested (or would have been, in a dry run);
        False when it was skipped or an error was reported
    """
    # Load and parse; any failure is reported and counts as "not processed".
    try:
        with open(filepath, "r", encoding="utf-8") as source:
            document = json.load(source)
    except json.JSONDecodeError as exc:
        print(f"[ERROR] Failed to parse {filepath}: {exc}")
        return False
    except Exception as exc:
        print(f"[ERROR] Failed to read {filepath}: {exc}")
        return False

    # Only a JSON object at the root has keys that can be nested.
    if not isinstance(document, dict):
        print(f"[SKIP] {filepath}: root element is not an object")
        return False

    # Bail out early when no eligible key contains a dot.
    if not has_dots_in_keys(document, prefix):
        print(
            f"[SKIP] {filepath}: no keys matching prefix '{prefix}' with dots"
            if prefix
            else f"[SKIP] {filepath}: no dot-separated keys found"
        )
        return False

    # Build the nested structure; conflicts abort this file only.
    try:
        restructured = nest_dict(document, prefix)
    except ConflictError as exc:
        print(f"\n[ERROR] Conflicts in {filepath}:\n{exc}")
        return False

    # Persist the result unless this is a dry run.
    if not dry_run:
        try:
            with open(filepath, "w", encoding="utf-8") as sink:
                json.dump(restructured, sink, ensure_ascii=False, indent=2)
        except Exception as exc:
            print(f"[ERROR] Failed to write {filepath}: {exc}")
            return False

    print(
        f"[OK] Processed {filepath} (nested keys with prefix '{prefix}')"
        if prefix
        else f"[OK] Processed {filepath} (nested all dot-separated keys)"
    )
    return True
|
|
|
|
|
def collect_files(
    path: str, file_patterns: Optional[List[str]] = None
) -> Tuple[Set[str], bool]:
    """
    Resolve the set of files the tool should work on.

    Args:
        path: Directory used as the base for relative patterns and as the
            search location when no patterns are supplied
        file_patterns: Optional glob patterns; absolute patterns are used
            as-is, relative ones are joined onto ``path``

    Returns:
        Tuple of (absolute paths of the regular files found, flag that is
        True when a pattern matched nothing or the directory held no .json
        files)
    """
    found: Set[str] = set()
    failed = False

    if not file_patterns:
        # No explicit patterns: take every .json file directly inside `path`.
        default_glob = os.path.join(path, "*.json")
        hits = [
            os.path.abspath(entry)
            for entry in glob(default_glob)
            if os.path.isfile(entry)
        ]
        if hits:
            found.update(hits)
        else:
            print(f"[ERROR] No .json files found in directory '{path}'")
            failed = True
        return found, failed

    for pattern in file_patterns:
        # Relative patterns are anchored at `path`; absolute ones stand alone.
        search_pattern = (
            pattern if os.path.isabs(pattern) else os.path.join(path, pattern)
        )
        hits = [
            os.path.abspath(entry)
            for entry in glob(search_pattern, recursive=False)
            if os.path.isfile(entry)
        ]
        if hits:
            found.update(hits)
        else:
            # Remember the failure but keep checking the remaining patterns.
            print(f"[ERROR] No files matched pattern '{pattern}'")
            failed = True

    return found, failed
|
|
|
|
|
def main() -> int:
    """
    Main entry point for the script.

    Parses the command line, resolves the files to work on, runs
    ``process_file`` on each of them in sorted order and prints a summary.

    Returns:
        1 when ``--path`` is not a directory or file collection reported an
        error / found nothing; 0 otherwise
    """
    parser = argparse.ArgumentParser(
        description="Nest dot-separated keys in JSON files.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # The first example names the script's directory because that is what
        # --path defaults to below (not the current working directory).
        epilog="""
Examples:
  %(prog)s
      Process all .json files in the script's directory

  %(prog)s --file en.json --prefix menu.items
      Nest only keys starting with 'menu.items' in en.json

  %(prog)s --path ./locales --file "*.locale.json"
      Process all .locale.json files in ./locales directory
""",
    )

    parser.add_argument(
        "--path",
        type=str,
        default=os.path.dirname(os.path.abspath(__file__)) or ".",
        help="Directory containing JSON files (default: script directory)",
    )

    parser.add_argument(
        "--file",
        type=str,
        action="append",
        dest="files",
        help="File pattern to process (can be used multiple times, supports wildcards)",
    )

    parser.add_argument(
        "--prefix",
        type=str,
        default=None,
        help='Only nest keys starting with this prefix (e.g., "menu.items")',
    )

    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be done without making changes",
    )

    args = parser.parse_args()

    # Validate path
    if not os.path.isdir(args.path):
        print(f"[ERROR] Path '{args.path}' is not a valid directory")
        return 1

    # Collect files; collect_files has already printed the reason on failure
    files_to_process, file_error = collect_files(args.path, args.files)

    if file_error or not files_to_process:
        return 1

    # Process files in a stable (sorted) order
    print(f"\nProcessing {len(files_to_process)} file(s)...")

    processed_count = 0
    for filepath in sorted(files_to_process):
        if process_file(filepath, prefix=args.prefix, dry_run=args.dry_run):
            processed_count += 1

    # Summary
    print(
        f"\n{'Would process' if args.dry_run else 'Processed'} {processed_count}/{len(files_to_process)} file(s)"
    )

    return 0
|
|
|
|
|
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status. SystemExit
    # is raised directly because the exit() helper is injected by the site
    # module for interactive sessions and is not guaranteed to exist
    # (e.g. when Python is started with -S).
    raise SystemExit(main())