Skip to content

Instantly share code, notes, and snippets.

@abdalrohman
Created October 27, 2024 08:14
Show Gist options
  • Select an option

  • Save abdalrohman/e86b5474dcfc9a2e3b74ea488dc90bbb to your computer and use it in GitHub Desktop.

Select an option

Save abdalrohman/e86b5474dcfc9a2e3b74ea488dc90bbb to your computer and use it in GitHub Desktop.
import concurrent.futures
import ctypes
import logging
import os
import queue
import shutil
import subprocess
import sys
import threading
from datetime import datetime
from typing import List, Set
import pywintypes
import win32con
import win32file
def is_admin():
try:
return ctypes.windll.shell32.IsUserAnAdmin()
except: # noqa: E722
return False
def setup_logging():
"""Configure logging with timestamp"""
log_dir = os.path.join(os.environ["USERPROFILE"], "Documents", "CleanupLogs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, f'cleanup_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log')
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.FileHandler(log_file), logging.StreamHandler()],
)
return logging.getLogger(__name__)
def get_size_format(bytes):
"""Convert bytes to human readable format"""
for unit in ["B", "KB", "MB", "GB", "TB"]:
if bytes < 1024.0:
return f"{bytes:.2f} {unit}"
bytes /= 1024.0
def safe_remove(path, logger):
"""Safely remove a file or directory"""
try:
if os.path.isfile(path):
os.remove(path)
elif os.path.isdir(path):
shutil.rmtree(path)
logger.info(f"Successfully removed: {path}")
return True
except Exception as e:
logger.error(f"Error removing {path}: {str(e)}")
return False
def clean_temp_files(logger):
"""Clean temporary files from various Windows locations"""
temp_locations = [
os.environ.get("TEMP"),
os.environ.get("TMP"),
os.path.join(os.environ.get("WINDIR"), "Temp"),
os.path.join(os.environ.get("WINDIR"), "Prefetch"),
os.path.join(os.environ.get("LOCALAPPDATA"), "Temp"),
]
total_cleaned = 0
for location in temp_locations:
if not location or not os.path.exists(location):
continue
logger.info(f"Cleaning {location}")
for root, dirs, files in os.walk(location, topdown=False):
for name in files:
try:
file_path = os.path.join(root, name)
size = os.path.getsize(file_path)
if safe_remove(file_path, logger):
total_cleaned += size
except Exception as e:
logger.error(f"Error processing {name}: {str(e)}")
for name in dirs:
try:
dir_path = os.path.join(root, name)
if os.path.exists(dir_path):
size = sum(
os.path.getsize(os.path.join(dirpath, filename))
for dirpath, dirnames, filenames in os.walk(dir_path)
for filename in filenames
)
if safe_remove(dir_path, logger):
total_cleaned += size
except Exception as e:
logger.error(f"Error processing directory {name}: {str(e)}")
logger.info(f"Total space cleaned from temp files: {get_size_format(total_cleaned)}")
return total_cleaned
def clean_recycle_bin(logger):
"""Empty the Recycle Bin"""
try:
subprocess.run(
["powershell.exe", "-Command", "Clear-RecycleBin", "-Force", "-ErrorAction", "SilentlyContinue"],
capture_output=True,
)
logger.info("Recycle Bin cleaned successfully")
except Exception as e:
logger.error(f"Error cleaning Recycle Bin: {str(e)}")
class FastVHDXFinder:
def __init__(self, logger):
self.logger = logger
self.vhdx_files = queue.Queue()
self.known_paths = set()
self.search_complete = threading.Event()
def _is_excluded_path(self, path: str) -> bool:
"""Check if path should be excluded from search"""
excluded = {"windows.old", "$recycle.bin", "system volume information", "recovery", "config.msi"}
return any(x in path.lower() for x in excluded)
def _get_common_vhdx_locations(self) -> Set[str]:
"""Get list of common VHDX locations"""
locations = set()
# Common Docker WSL locations
appdata_local = os.environ.get("LOCALAPPDATA", "")
if appdata_local:
locations.add(os.path.join(appdata_local, "Docker", "wsl"))
locations.add(os.path.join(appdata_local, "Packages"))
# Hyper-V default locations
public = os.environ.get("PUBLIC", "")
if public:
locations.add(os.path.join(public, "Documents", "Hyper-V"))
# WSL default location
locations.add(os.path.expandvars("%LOCALAPPDATA%\\Packages\\"))
return locations
def _windows_search(self):
"""Use Windows Search indexing to find VHDX files"""
try:
cmd = 'powershell.exe -Command "Get-ChildItem -Path $env:SystemDrive\\ -Recurse -ErrorAction SilentlyContinue -Filter *.vhdx | Select-Object FullName"'
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
for line in process.stdout:
if "FullName" not in line and line.strip(): # Skip header
vhdx_path = line.strip()
if vhdx_path and not self._is_excluded_path(vhdx_path):
self.vhdx_files.put(vhdx_path)
self.known_paths.add(vhdx_path.lower())
except Exception as e:
self.logger.error(f"Error in Windows Search: {str(e)}")
def _search_directory(self, directory: str):
"""Search a specific directory for VHDX files"""
try:
for entry in os.scandir(directory):
try:
if entry.is_file() and entry.name.lower().endswith(".vhdx"):
if entry.path.lower() not in self.known_paths:
self.vhdx_files.put(entry.path)
self.known_paths.add(entry.path.lower())
elif entry.is_dir() and not self._is_excluded_path(entry.path):
self._search_directory(entry.path)
except (PermissionError, OSError):
continue
except (PermissionError, OSError):
pass
def _targeted_search(self):
"""Perform targeted search in common VHDX locations"""
common_locations = self._get_common_vhdx_locations()
for location in common_locations:
if os.path.exists(location):
self._search_directory(location)
def find_vhdx_files(self) -> List[str]:
"""Main method to find VHDX files using multiple search methods"""
# Start Windows Search in a separate thread
search_thread = threading.Thread(target=self._windows_search)
search_thread.start()
# Perform targeted search in common locations
self._targeted_search()
# Wait for Windows Search to complete
search_thread.join(timeout=30) # 30 second timeout
# Collect all found VHDX files
vhdx_files = []
while not self.vhdx_files.empty():
vhdx_files.append(self.vhdx_files.get())
return vhdx_files
def optimize_vhdx(vhdx_path: str, logger) -> bool:
"""Optimize a single VHDX file"""
try:
initial_size = os.path.getsize(vhdx_path)
logger.info(f"Optimizing VHDX: {vhdx_path} (Size: {get_size_format(initial_size)})")
# Try to get exclusive access to the file
try:
handle = win32file.CreateFile(
vhdx_path,
win32con.GENERIC_READ,
0, # Exclusive access
None,
win32con.OPEN_EXISTING,
win32con.FILE_ATTRIBUTE_NORMAL,
None,
)
win32file.CloseHandle(handle)
except pywintypes.error:
logger.info(f"VHDX file is in use: {vhdx_path}")
# If it's a Docker VHDX, try to stop Docker
if "docker" in vhdx_path.lower():
logger.info("Stopping Docker services...")
subprocess.run(["powershell.exe", "-Command", "Stop-Service docker"], capture_output=True)
subprocess.run(["wsl", "--shutdown"], capture_output=True)
# Optimize the VHDX
result = subprocess.run(
["powershell.exe", "-Command", f'Optimize-VHD -Path "{vhdx_path}" -Mode Full'],
capture_output=True,
text=True,
)
if result.returncode == 0:
final_size = os.path.getsize(vhdx_path)
saved = initial_size - final_size
logger.info(f"Successfully optimized {vhdx_path}")
logger.info(f"Space saved: {get_size_format(saved)}")
return True
else:
logger.error(f"Error optimizing {vhdx_path}: {result.stderr}")
return False
except Exception as e:
logger.error(f"Error processing VHDX {vhdx_path}: {str(e)}")
return False
def find_and_optimize_vhdx(logger):
"""Find and optimize all VHDX files using the improved search"""
finder = FastVHDXFinder(logger)
vhdx_files = finder.find_vhdx_files()
logger.info(f"Found {len(vhdx_files)} VHDX files")
# Optimize files in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = {executor.submit(optimize_vhdx, vhdx, logger): vhdx for vhdx in vhdx_files}
for future in concurrent.futures.as_completed(futures):
vhdx = futures[future]
try:
future.result()
except Exception as e:
logger.error(f"Error optimizing {vhdx}: {str(e)}")
# def find_and_optimize_vhdx(logger):
# """Find and optimize all VHDX files on the system"""
# vhdx_files = []
# # Search all drives for VHDX files
# for drive in [f"{chr(x)}:" for x in range(65, 91) if os.path.exists(f"{chr(x)}:")]:
# logger.info(f"Searching for VHDX files on drive {drive}")
# try:
# for root, _, files in os.walk(drive):
# for file in files:
# if file.lower().endswith(".vhdx"):
# vhdx_path = os.path.join(root, file)
# vhdx_files.append(vhdx_path)
# except Exception as e:
# logger.error(f"Error searching drive {drive}: {str(e)}")
# # Optimize each VHDX file found
# for vhdx_path in vhdx_files:
# try:
# initial_size = os.path.getsize(vhdx_path)
# logger.info(f"Found VHDX: {vhdx_path} (Size: {get_size_format(initial_size)})")
# # Check if file is in use
# if "docker" in vhdx_path.lower():
# logger.info("Docker VHDX detected - ensuring Docker is stopped...")
# subprocess.run(["powershell.exe", "-Command", "Stop-Service docker"], capture_output=True)
# subprocess.run(["wsl", "--shutdown"], capture_output=True)
# # Optimize the VHDX using Optimize-VHD
# logger.info(f"Optimizing {vhdx_path}")
# result = subprocess.run(
# ["powershell.exe", "-Command", f'Optimize-VHD -Path "{vhdx_path}" -Mode Full'],
# capture_output=True,
# text=True,
# )
# if result.returncode == 0:
# final_size = os.path.getsize(vhdx_path)
# saved = initial_size - final_size
# logger.info(f"Successfully optimized {vhdx_path}")
# logger.info(f"Space saved: {get_size_format(saved)}")
# else:
# logger.error(f"Error optimizing {vhdx_path}: {result.stderr}")
# except Exception as e:
# logger.error(f"Error processing VHDX {vhdx_path}: {str(e)}")
def run_disk_cleanup(logger):
"""Run Windows Disk Cleanup utility with all options selected"""
try:
# Run cleanmgr.exe with system files included
logger.info("Running Windows Disk Cleanup utility...")
subprocess.run(["cleanmgr.exe", "/sagerun:1"], capture_output=True)
logger.info("Disk Cleanup completed")
except Exception as e:
logger.error(f"Error running Disk Cleanup: {str(e)}")
def main():
# Check for administrator privileges
logger = setup_logging()
logger.info("Starting Windows cleanup process...")
if not is_admin():
logger.error("This script requires administrator privileges. Please run as administrator.")
ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, " ".join(sys.argv), None, 1)
return
# Get initial disk space
total, used, free = shutil.disk_usage("/")
initial_free = free
logger.info(f"Initial free space: {get_size_format(free)}")
# Run cleanup operations
clean_temp_files(logger)
clean_recycle_bin(logger)
run_disk_cleanup(logger)
find_and_optimize_vhdx(logger)
# Calculate space saved
_, _, final_free = shutil.disk_usage("/")
space_saved = final_free - initial_free
logger.info(f"Total space saved: {get_size_format(space_saved)}")
logger.info("Cleanup process completed!")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment