Last active
August 6, 2025 05:03
-
-
Save gaurish/35a1b6f49d8c430a7f3f84cb1f2285aa to your computer and use it in GitHub Desktop.
Improved WinRM utilities with better error handling and syntax fixes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Improved WinRM utilities with better error handling and syntax fixes | |
| """ | |
| import winrm | |
| import base64 | |
| import json | |
| from typing import Dict, List, Optional, Tuple, Any | |
| import time | |
| import re | |
class WinRMUtilsV2:
    """Convenience wrapper around a pywinrm ``Session``.

    Every helper builds a PowerShell (or cmd) snippet, runs it on the remote
    host and converts the result into plain Python values.  Helpers never
    raise on remote failures; they report them through their return value
    (``-1`` status, ``None``, ``False`` or an empty list).

    All caller-supplied values that end up inside a PowerShell string literal
    are escaped with :meth:`_ps_quote`, so paths such as ``C:\\Users\\O'Brien``
    no longer break (or inject code into) the generated script.
    """

    # Characters PowerShell's tokenizer accepts as a single quote.  Each one
    # terminates a single-quoted literal, so each one must be doubled.
    _PS_SINGLE_QUOTES = "'\u2018\u2019\u201a\u201b"

    def __init__(self, host='localhost', port=5985, username='User01', password='6.12rgr84U'):
        """Store the connection settings and create the WinRM session.

        Args:
            host: Remote host name or IP address.
            port: WinRM HTTP port.
            username: Account used for basic authentication.
            password: Password for ``username``.

        NOTE(security): the default credentials are kept only so existing
        callers keep working; real deployments should always pass explicit
        credentials instead of relying on secrets embedded in source code.
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self._create_session()

    def _create_session(self):
        """Create the pywinrm session from the stored connection settings."""
        # NOTE(security): plain HTTP with basic auth sends the credentials
        # effectively in clear text; prefer HTTPS/NTLM where available.
        self.session = winrm.Session(
            f'http://{self.host}:{self.port}/wsman',
            auth=(self.username, self.password),
            transport='basic'
        )

    @classmethod
    def _ps_quote(cls, value: Any) -> str:
        """Escape ``value`` for use inside a single-quoted PowerShell string.

        Inside ``'...'`` PowerShell performs no expansion; the only special
        characters are the quote characters themselves, which are escaped by
        doubling them.
        """
        text = str(value)
        for quote in cls._PS_SINGLE_QUOTES:
            text = text.replace(quote, quote * 2)
        return text

    @staticmethod
    def _parse_json_list(stdout: str, code: int) -> List[Any]:
        """Turn ``ConvertTo-Json`` output into a list.

        Returns an empty list when the command failed, printed nothing,
        printed ``null`` or printed something that is not valid JSON.  A
        single JSON object (PowerShell unwraps one-element arrays) is wrapped
        in a list.
        """
        if code != 0 or not stdout.strip():
            return []
        try:
            parsed = json.loads(stdout)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return []
        if parsed is None:
            return []
        return parsed if isinstance(parsed, list) else [parsed]

    def _safe_decode(self, data: bytes, encoding='utf-8') -> str:
        """Decode ``data`` to text without ever raising.

        Undecodable bytes are dropped.  Empty/``None`` input yields ``""``;
        input that is already ``str`` is returned unchanged; anything that
        cannot be decoded falls back to ``str(data)``.
        """
        if not data:
            return ""
        if isinstance(data, str):
            return data
        try:
            return data.decode(encoding, errors='ignore')
        except Exception:
            return str(data)

    def run_cmd(self, command: str, timeout: int = 30) -> Tuple[str, str, int]:
        """Run a cmd.exe command on the remote host.

        Args:
            command: Command line to execute.
            timeout: Accepted for backward compatibility only.  pywinrm's
                ``Session.run_cmd(command, args=())`` has no per-call
                timeout; timeouts are configured when the session is
                created.

        Returns:
            ``(stdout, stderr, status_code)``.  If the call itself fails the
            result is ``("", <error message>, -1)``.
        """
        try:
            # Forwarding ``timeout=`` here raised TypeError on every call
            # (swallowed below), so the command was never actually executed.
            result = self.session.run_cmd(command)
            return (
                self._safe_decode(result.std_out),
                self._safe_decode(result.std_err),
                result.status_code
            )
        except Exception as e:
            return "", str(e), -1

    def run_ps(self, script: str, timeout: int = 30) -> Tuple[str, str, int]:
        """Run a PowerShell script on the remote host.

        Args:
            script: PowerShell source; carriage returns are stripped first.
            timeout: Accepted for backward compatibility only (see
                :meth:`run_cmd`).

        Returns:
            ``(stdout, stderr, status_code)``.  If the call itself fails the
            result is ``("", <error message>, -1)``.
        """
        try:
            script = script.replace('\r', '')
            result = self.session.run_ps(script)
            return (
                self._safe_decode(result.std_out),
                self._safe_decode(result.std_err),
                result.status_code
            )
        except Exception as e:
            return "", str(e), -1

    def write_file(self, path: str, content: str) -> bool:
        """Write ``content`` to the remote file ``path`` via ``Set-Content``.

        The text travels base64-encoded so that its characters never need
        escaping inside the script.

        Returns:
            ``True`` when the remote script exits with status 0.
        """
        try:
            encoded = base64.b64encode(content.encode('utf-8')).decode('ascii')
            target = self._ps_quote(path)
            ps_script = f'''
            $encodedContent = '{encoded}'
            $bytes = [System.Convert]::FromBase64String($encodedContent)
            $text = [System.Text.Encoding]::UTF8.GetString($bytes)
            Set-Content -Path '{target}' -Value $text -Encoding UTF8
            '''
            _, _, code = self.run_ps(ps_script)
            return code == 0
        except Exception as e:
            print(f"Error writing file: {e}")
            return False

    def read_file(self, path: str, encoding='UTF8') -> Optional[str]:
        """Read the remote text file ``path``.

        Args:
            path: Remote file path.
            encoding: Value passed to ``Get-Content -Encoding``.

        Returns:
            The script's stdout when it exits with status 0, else ``None``.
        """
        target = self._ps_quote(path)
        enc = self._ps_quote(encoding)
        # The "not found" message is single-quoted too, so a path containing
        # ``$``, a backtick or ``"`` can no longer corrupt the script.
        ps_script = f'''
        if (Test-Path '{target}') {{
            try {{
                Get-Content '{target}' -Encoding '{enc}' -Raw
            }} catch {{
                Write-Error "Failed to read file: $_"
            }}
        }} else {{
            Write-Error 'File not found: {target}'
        }}
        '''
        stdout, _, code = self.run_ps(ps_script)
        if code == 0:
            return stdout
        return None

    def file_exists(self, path: str) -> bool:
        """Return ``True`` when ``Test-Path`` reports that ``path`` exists."""
        ps_script = f"Test-Path '{self._ps_quote(path)}'"
        stdout, _, _ = self.run_ps(ps_script)
        # Compare the trimmed output instead of a substring test so stray
        # text containing "True" cannot produce a false positive.
        return stdout.strip().lower() == 'true'

    def search_registry(self, key_path: str, pattern: str) -> List[Dict[str, Any]]:
        """Recursively search a registry key for matching value names/data.

        Args:
            key_path: PowerShell registry path, e.g. ``HKLM:\\SOFTWARE``.
            pattern: Regular expression matched (``-match``) against every
                property name and value.

        Returns:
            A list of dicts with ``KeyPath``, ``Name``, ``Value`` and
            ``Type`` keys; empty on failure or when nothing matches.
        """
        key = self._ps_quote(key_path)
        regex = self._ps_quote(pattern)
        # key_path is embedded as a single-quoted literal so that ``$`` and
        # backticks in it are taken verbatim rather than expanded.
        ps_script = f'''
        $results = @()
        try {{
            Get-ChildItem '{key}' -Recurse -ErrorAction SilentlyContinue | ForEach-Object {{
                $key = $_
                Get-ItemProperty $key.PSPath -ErrorAction SilentlyContinue | ForEach-Object {{
                    $props = $_
                    $props.PSObject.Properties | Where-Object {{
                        $_.Name -match '{regex}' -or $_.Value -match '{regex}'
                    }} | ForEach-Object {{
                        $results += @{{
                            KeyPath = $key.PSPath
                            Name = $_.Name
                            Value = $_.Value
                            Type = $_.TypeNameOfValue
                        }}
                    }}
                }}
            }}
            $results | ConvertTo-Json -Depth 3
        }} catch {{
            Write-Error "Registry search failed: $_"
        }}
        '''
        stdout, _, code = self.run_ps(ps_script)
        return self._parse_json_list(stdout, code)

    def find_files(self, path: str, pattern: str, recursive: bool = True) -> List[str]:
        """List files under ``path`` whose name matches the ``-Filter`` pattern.

        Returns:
            Full paths of the matching files, one per entry; empty on
            failure or when nothing matches.
        """
        recurse = '-Recurse' if recursive else ''
        root = self._ps_quote(path)
        name_filter = self._ps_quote(pattern)
        ps_script = f'''
        try {{
            Get-ChildItem -Path '{root}' -Filter '{name_filter}' {recurse} -File -ErrorAction SilentlyContinue |
                Select-Object -ExpandProperty FullName
        }} catch {{
            @()
        }}
        '''
        stdout, _, code = self.run_ps(ps_script)
        if code != 0:
            return []
        stripped = (line.strip() for line in stdout.splitlines())
        return [line for line in stripped if line]

    def get_file_content_base64(self, path: str) -> Optional[str]:
        """Return the remote file's bytes as a base64 string.

        Useful for binary files.  Returns ``None`` when the remote script
        exits with a non-zero status.
        """
        target = self._ps_quote(path)
        ps_script = f'''
        if (Test-Path '{target}') {{
            try {{
                $bytes = [System.IO.File]::ReadAllBytes('{target}')
                [System.Convert]::ToBase64String($bytes)
            }} catch {{
                Write-Error "Failed to read file: $_"
            }}
        }} else {{
            Write-Error "File not found"
        }}
        '''
        stdout, _, code = self.run_ps(ps_script)
        if code == 0:
            return stdout.strip()
        return None

    def search_files_content(self, path: str, content_pattern: str, file_pattern: str = "*") -> List[Dict[str, str]]:
        """Search files under ``path`` for content matching a regex.

        Args:
            path: Directory searched recursively.
            content_pattern: Regular expression matched against each file's
                raw content.
            file_pattern: ``-Filter`` pattern selecting the files to scan.

        Returns:
            A list of dicts with ``Path`` and ``Match`` keys; empty on
            failure or when nothing matches.
        """
        root = self._ps_quote(path)
        name_filter = self._ps_quote(file_pattern)
        regex = self._ps_quote(content_pattern)
        ps_script = f'''
        $results = @()
        Get-ChildItem -Path '{root}' -Filter '{name_filter}' -Recurse -File -ErrorAction SilentlyContinue | ForEach-Object {{
            $file = $_
            try {{
                $content = Get-Content $file.FullName -Raw -ErrorAction SilentlyContinue
                if ($content -match '{regex}') {{
                    $results += @{{
                        Path = $file.FullName
                        Match = ($content | Select-String '{regex}' | Select-Object -First 1).Line
                    }}
                }}
            }} catch {{}}
        }}
        $results | ConvertTo-Json -Depth 2
        '''
        stdout, _, code = self.run_ps(ps_script)
        return self._parse_json_list(stdout, code)

    def get_process_info(self, name_pattern: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return id, name, path and command line of remote processes.

        Args:
            name_pattern: Optional substring; only processes whose name is
                ``-like '*<name_pattern>*'`` are returned.

        Returns:
            A list of dicts with ``Id``, ``ProcessName``, ``Path`` and
            ``CommandLine`` keys; empty on failure.
        """
        if name_pattern:
            wanted = self._ps_quote(name_pattern)
            filter_clause = f"| Where-Object {{$_.ProcessName -like '*{wanted}*'}}"
        else:
            filter_clause = ""
        ps_script = f'''
        Get-Process {filter_clause} -ErrorAction SilentlyContinue |
            Select-Object Id, ProcessName, Path,
                @{{Name='CommandLine';Expression={{
                    (Get-WmiObject Win32_Process -Filter "ProcessId=$($_.Id)" -ErrorAction SilentlyContinue).CommandLine
                }}}} |
            ConvertTo-Json -Depth 2
        '''
        stdout, _, code = self.run_ps(ps_script)
        return self._parse_json_list(stdout, code)

    def extract_strings_from_file(self, file_path: str, min_length: int = 4) -> List[str]:
        """Extract printable ASCII strings from a remote file.

        Works like the Unix ``strings`` command: runs of bytes in the range
        32..126 that are at least ``min_length`` long, de-duplicated.

        Returns:
            The extracted strings; empty on failure.
        """
        target = self._ps_quote(file_path)
        # Coerce to int so the value can be embedded in the script safely.
        length = int(min_length)
        ps_script = f'''
        try {{
            $bytes = [System.IO.File]::ReadAllBytes('{target}')
            $strings = @()
            $current = ""
            foreach ($byte in $bytes) {{
                if ($byte -ge 32 -and $byte -le 126) {{
                    $current += [char]$byte
                }} else {{
                    if ($current.Length -ge {length}) {{
                        $strings += $current
                    }}
                    $current = ""
                }}
            }}
            if ($current.Length -ge {length}) {{
                $strings += $current
            }}
            $strings | Select-Object -Unique
        }} catch {{
            Write-Error "Failed to extract strings: $_"
        }}
        '''
        stdout, _, code = self.run_ps(ps_script)
        if code != 0:
            return []
        # splitlines() removes the CRLF terminators Windows emits without
        # stripping spaces, which are legitimate parts of extracted strings.
        return [line for line in stdout.splitlines() if line]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment