Created
October 27, 2024 08:14
-
-
Save abdalrohman/e86b5474dcfc9a2e3b74ea488dc90bbb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import concurrent.futures | |
| import ctypes | |
| import logging | |
| import os | |
| import queue | |
| import shutil | |
| import subprocess | |
| import sys | |
| import threading | |
| from datetime import datetime | |
| from typing import List, Set | |
| import pywintypes | |
| import win32con | |
| import win32file | |
| def is_admin(): | |
| try: | |
| return ctypes.windll.shell32.IsUserAnAdmin() | |
| except: # noqa: E722 | |
| return False | |
| def setup_logging(): | |
| """Configure logging with timestamp""" | |
| log_dir = os.path.join(os.environ["USERPROFILE"], "Documents", "CleanupLogs") | |
| os.makedirs(log_dir, exist_ok=True) | |
| log_file = os.path.join(log_dir, f'cleanup_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log') | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format="%(asctime)s - %(levelname)s - %(message)s", | |
| handlers=[logging.FileHandler(log_file), logging.StreamHandler()], | |
| ) | |
| return logging.getLogger(__name__) | |
| def get_size_format(bytes): | |
| """Convert bytes to human readable format""" | |
| for unit in ["B", "KB", "MB", "GB", "TB"]: | |
| if bytes < 1024.0: | |
| return f"{bytes:.2f} {unit}" | |
| bytes /= 1024.0 | |
| def safe_remove(path, logger): | |
| """Safely remove a file or directory""" | |
| try: | |
| if os.path.isfile(path): | |
| os.remove(path) | |
| elif os.path.isdir(path): | |
| shutil.rmtree(path) | |
| logger.info(f"Successfully removed: {path}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error removing {path}: {str(e)}") | |
| return False | |
| def clean_temp_files(logger): | |
| """Clean temporary files from various Windows locations""" | |
| temp_locations = [ | |
| os.environ.get("TEMP"), | |
| os.environ.get("TMP"), | |
| os.path.join(os.environ.get("WINDIR"), "Temp"), | |
| os.path.join(os.environ.get("WINDIR"), "Prefetch"), | |
| os.path.join(os.environ.get("LOCALAPPDATA"), "Temp"), | |
| ] | |
| total_cleaned = 0 | |
| for location in temp_locations: | |
| if not location or not os.path.exists(location): | |
| continue | |
| logger.info(f"Cleaning {location}") | |
| for root, dirs, files in os.walk(location, topdown=False): | |
| for name in files: | |
| try: | |
| file_path = os.path.join(root, name) | |
| size = os.path.getsize(file_path) | |
| if safe_remove(file_path, logger): | |
| total_cleaned += size | |
| except Exception as e: | |
| logger.error(f"Error processing {name}: {str(e)}") | |
| for name in dirs: | |
| try: | |
| dir_path = os.path.join(root, name) | |
| if os.path.exists(dir_path): | |
| size = sum( | |
| os.path.getsize(os.path.join(dirpath, filename)) | |
| for dirpath, dirnames, filenames in os.walk(dir_path) | |
| for filename in filenames | |
| ) | |
| if safe_remove(dir_path, logger): | |
| total_cleaned += size | |
| except Exception as e: | |
| logger.error(f"Error processing directory {name}: {str(e)}") | |
| logger.info(f"Total space cleaned from temp files: {get_size_format(total_cleaned)}") | |
| return total_cleaned | |
| def clean_recycle_bin(logger): | |
| """Empty the Recycle Bin""" | |
| try: | |
| subprocess.run( | |
| ["powershell.exe", "-Command", "Clear-RecycleBin", "-Force", "-ErrorAction", "SilentlyContinue"], | |
| capture_output=True, | |
| ) | |
| logger.info("Recycle Bin cleaned successfully") | |
| except Exception as e: | |
| logger.error(f"Error cleaning Recycle Bin: {str(e)}") | |
| class FastVHDXFinder: | |
| def __init__(self, logger): | |
| self.logger = logger | |
| self.vhdx_files = queue.Queue() | |
| self.known_paths = set() | |
| self.search_complete = threading.Event() | |
| def _is_excluded_path(self, path: str) -> bool: | |
| """Check if path should be excluded from search""" | |
| excluded = {"windows.old", "$recycle.bin", "system volume information", "recovery", "config.msi"} | |
| return any(x in path.lower() for x in excluded) | |
| def _get_common_vhdx_locations(self) -> Set[str]: | |
| """Get list of common VHDX locations""" | |
| locations = set() | |
| # Common Docker WSL locations | |
| appdata_local = os.environ.get("LOCALAPPDATA", "") | |
| if appdata_local: | |
| locations.add(os.path.join(appdata_local, "Docker", "wsl")) | |
| locations.add(os.path.join(appdata_local, "Packages")) | |
| # Hyper-V default locations | |
| public = os.environ.get("PUBLIC", "") | |
| if public: | |
| locations.add(os.path.join(public, "Documents", "Hyper-V")) | |
| # WSL default location | |
| locations.add(os.path.expandvars("%LOCALAPPDATA%\\Packages\\")) | |
| return locations | |
| def _windows_search(self): | |
| """Use Windows Search indexing to find VHDX files""" | |
| try: | |
| cmd = 'powershell.exe -Command "Get-ChildItem -Path $env:SystemDrive\\ -Recurse -ErrorAction SilentlyContinue -Filter *.vhdx | Select-Object FullName"' | |
| process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) | |
| for line in process.stdout: | |
| if "FullName" not in line and line.strip(): # Skip header | |
| vhdx_path = line.strip() | |
| if vhdx_path and not self._is_excluded_path(vhdx_path): | |
| self.vhdx_files.put(vhdx_path) | |
| self.known_paths.add(vhdx_path.lower()) | |
| except Exception as e: | |
| self.logger.error(f"Error in Windows Search: {str(e)}") | |
| def _search_directory(self, directory: str): | |
| """Search a specific directory for VHDX files""" | |
| try: | |
| for entry in os.scandir(directory): | |
| try: | |
| if entry.is_file() and entry.name.lower().endswith(".vhdx"): | |
| if entry.path.lower() not in self.known_paths: | |
| self.vhdx_files.put(entry.path) | |
| self.known_paths.add(entry.path.lower()) | |
| elif entry.is_dir() and not self._is_excluded_path(entry.path): | |
| self._search_directory(entry.path) | |
| except (PermissionError, OSError): | |
| continue | |
| except (PermissionError, OSError): | |
| pass | |
| def _targeted_search(self): | |
| """Perform targeted search in common VHDX locations""" | |
| common_locations = self._get_common_vhdx_locations() | |
| for location in common_locations: | |
| if os.path.exists(location): | |
| self._search_directory(location) | |
| def find_vhdx_files(self) -> List[str]: | |
| """Main method to find VHDX files using multiple search methods""" | |
| # Start Windows Search in a separate thread | |
| search_thread = threading.Thread(target=self._windows_search) | |
| search_thread.start() | |
| # Perform targeted search in common locations | |
| self._targeted_search() | |
| # Wait for Windows Search to complete | |
| search_thread.join(timeout=30) # 30 second timeout | |
| # Collect all found VHDX files | |
| vhdx_files = [] | |
| while not self.vhdx_files.empty(): | |
| vhdx_files.append(self.vhdx_files.get()) | |
| return vhdx_files | |
| def optimize_vhdx(vhdx_path: str, logger) -> bool: | |
| """Optimize a single VHDX file""" | |
| try: | |
| initial_size = os.path.getsize(vhdx_path) | |
| logger.info(f"Optimizing VHDX: {vhdx_path} (Size: {get_size_format(initial_size)})") | |
| # Try to get exclusive access to the file | |
| try: | |
| handle = win32file.CreateFile( | |
| vhdx_path, | |
| win32con.GENERIC_READ, | |
| 0, # Exclusive access | |
| None, | |
| win32con.OPEN_EXISTING, | |
| win32con.FILE_ATTRIBUTE_NORMAL, | |
| None, | |
| ) | |
| win32file.CloseHandle(handle) | |
| except pywintypes.error: | |
| logger.info(f"VHDX file is in use: {vhdx_path}") | |
| # If it's a Docker VHDX, try to stop Docker | |
| if "docker" in vhdx_path.lower(): | |
| logger.info("Stopping Docker services...") | |
| subprocess.run(["powershell.exe", "-Command", "Stop-Service docker"], capture_output=True) | |
| subprocess.run(["wsl", "--shutdown"], capture_output=True) | |
| # Optimize the VHDX | |
| result = subprocess.run( | |
| ["powershell.exe", "-Command", f'Optimize-VHD -Path "{vhdx_path}" -Mode Full'], | |
| capture_output=True, | |
| text=True, | |
| ) | |
| if result.returncode == 0: | |
| final_size = os.path.getsize(vhdx_path) | |
| saved = initial_size - final_size | |
| logger.info(f"Successfully optimized {vhdx_path}") | |
| logger.info(f"Space saved: {get_size_format(saved)}") | |
| return True | |
| else: | |
| logger.error(f"Error optimizing {vhdx_path}: {result.stderr}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"Error processing VHDX {vhdx_path}: {str(e)}") | |
| return False | |
| def find_and_optimize_vhdx(logger): | |
| """Find and optimize all VHDX files using the improved search""" | |
| finder = FastVHDXFinder(logger) | |
| vhdx_files = finder.find_vhdx_files() | |
| logger.info(f"Found {len(vhdx_files)} VHDX files") | |
| # Optimize files in parallel | |
| with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: | |
| futures = {executor.submit(optimize_vhdx, vhdx, logger): vhdx for vhdx in vhdx_files} | |
| for future in concurrent.futures.as_completed(futures): | |
| vhdx = futures[future] | |
| try: | |
| future.result() | |
| except Exception as e: | |
| logger.error(f"Error optimizing {vhdx}: {str(e)}") | |
| # def find_and_optimize_vhdx(logger): | |
| # """Find and optimize all VHDX files on the system""" | |
| # vhdx_files = [] | |
| # # Search all drives for VHDX files | |
| # for drive in [f"{chr(x)}:" for x in range(65, 91) if os.path.exists(f"{chr(x)}:")]: | |
| # logger.info(f"Searching for VHDX files on drive {drive}") | |
| # try: | |
| # for root, _, files in os.walk(drive): | |
| # for file in files: | |
| # if file.lower().endswith(".vhdx"): | |
| # vhdx_path = os.path.join(root, file) | |
| # vhdx_files.append(vhdx_path) | |
| # except Exception as e: | |
| # logger.error(f"Error searching drive {drive}: {str(e)}") | |
| # # Optimize each VHDX file found | |
| # for vhdx_path in vhdx_files: | |
| # try: | |
| # initial_size = os.path.getsize(vhdx_path) | |
| # logger.info(f"Found VHDX: {vhdx_path} (Size: {get_size_format(initial_size)})") | |
| # # Check if file is in use | |
| # if "docker" in vhdx_path.lower(): | |
| # logger.info("Docker VHDX detected - ensuring Docker is stopped...") | |
| # subprocess.run(["powershell.exe", "-Command", "Stop-Service docker"], capture_output=True) | |
| # subprocess.run(["wsl", "--shutdown"], capture_output=True) | |
| # # Optimize the VHDX using Optimize-VHD | |
| # logger.info(f"Optimizing {vhdx_path}") | |
| # result = subprocess.run( | |
| # ["powershell.exe", "-Command", f'Optimize-VHD -Path "{vhdx_path}" -Mode Full'], | |
| # capture_output=True, | |
| # text=True, | |
| # ) | |
| # if result.returncode == 0: | |
| # final_size = os.path.getsize(vhdx_path) | |
| # saved = initial_size - final_size | |
| # logger.info(f"Successfully optimized {vhdx_path}") | |
| # logger.info(f"Space saved: {get_size_format(saved)}") | |
| # else: | |
| # logger.error(f"Error optimizing {vhdx_path}: {result.stderr}") | |
| # except Exception as e: | |
| # logger.error(f"Error processing VHDX {vhdx_path}: {str(e)}") | |
| def run_disk_cleanup(logger): | |
| """Run Windows Disk Cleanup utility with all options selected""" | |
| try: | |
| # Run cleanmgr.exe with system files included | |
| logger.info("Running Windows Disk Cleanup utility...") | |
| subprocess.run(["cleanmgr.exe", "/sagerun:1"], capture_output=True) | |
| logger.info("Disk Cleanup completed") | |
| except Exception as e: | |
| logger.error(f"Error running Disk Cleanup: {str(e)}") | |
| def main(): | |
| # Check for administrator privileges | |
| logger = setup_logging() | |
| logger.info("Starting Windows cleanup process...") | |
| if not is_admin(): | |
| logger.error("This script requires administrator privileges. Please run as administrator.") | |
| ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, " ".join(sys.argv), None, 1) | |
| return | |
| # Get initial disk space | |
| total, used, free = shutil.disk_usage("/") | |
| initial_free = free | |
| logger.info(f"Initial free space: {get_size_format(free)}") | |
| # Run cleanup operations | |
| clean_temp_files(logger) | |
| clean_recycle_bin(logger) | |
| run_disk_cleanup(logger) | |
| find_and_optimize_vhdx(logger) | |
| # Calculate space saved | |
| _, _, final_free = shutil.disk_usage("/") | |
| space_saved = final_free - initial_free | |
| logger.info(f"Total space saved: {get_size_format(space_saved)}") | |
| logger.info("Cleanup process completed!") | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment