Created
October 31, 2025 11:48
-
-
Save manchicken/8e205bf14e6146a0e32a35b24d4913ff to your computer and use it in GitHub Desktop.
Hunt for malware based on an inventory list as input
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Local Malware Scanner for Inventory Files | |
| Reads inventory.csv and scans files with ClamAV and YARA | |
| before prioritizing what to send to VirusTotal. | |
| """ | |
| import os | |
| import sys | |
| import csv | |
| import subprocess | |
| import hashlib | |
| from pathlib import Path | |
def setup_logging(output_folder):
    """Open the scan log for writing and return its file handle.

    The log is written to ``scan.log`` inside *output_folder*. The folder is
    created first when it does not exist, so this function can safely be the
    first thing that touches the output location.

    Args:
        output_folder: Directory that will contain ``scan.log``.

    Returns:
        A writable text file handle. The caller is responsible for closing it.

    Raises:
        OSError: If the folder cannot be created or the file cannot be opened.
    """
    # Opening a file inside a missing directory raises FileNotFoundError, so
    # make sure the directory is there before creating the log file.
    os.makedirs(output_folder, exist_ok=True)
    log_file = os.path.join(output_folder, "scan.log")
    # Explicit UTF-8 keeps the log independent of the platform default
    # encoding; backslashreplace prevents an undecodable file name in a log
    # message from aborting the write.
    return open(log_file, 'w', encoding='utf-8', errors='backslashreplace')
def log(log_handle, message):
    """Echo *message* to stdout and append it as one line to the log.

    Args:
        log_handle: Open, writable text handle (anything with ``write`` and
            ``flush``).
        message: Text to emit; a trailing newline is added for the log.
    """
    print(message)
    entry = "".join((message, "\n"))
    log_handle.write(entry)
    # Flush after every entry so the log on disk is current even if the
    # process stops part-way through.
    log_handle.flush()
def check_dependencies():
    """Report which external scanners cannot be run.

    Each tool is probed by running it with ``--version``. A tool counts as
    missing when the executable is not found or the probe exits non-zero.

    Returns:
        A list of package names (``'clamav'`` and/or ``'yara'``), in that
        order; empty when both probes succeed.
    """
    # (executable to invoke, package name reported to the caller)
    probes = (
        ('clamscan', 'clamav'),
        ('yara', 'yara'),
    )
    unavailable = []
    for executable, package in probes:
        try:
            subprocess.run([executable, '--version'], capture_output=True, check=True)
        except (subprocess.CalledProcessError, FileNotFoundError):
            unavailable.append(package)
    return unavailable
def scan_with_clamav(file_path):
    """Scan a single file with ``clamscan`` and summarise the outcome.

    Args:
        file_path: Path of the file to scan.

    Returns:
        A dict with at least the key ``'infected'`` (bool):

        * ``{'infected': False}`` when clamscan exits with status 0.
        * ``{'infected': True, 'signature': <text>}`` when clamscan exits
          with status 1. ``signature`` is the stripped output line containing
          ``FOUND``, or ``'INFECTED'`` if no such line was printed.
        * ``{'infected': False, 'error': <text>}`` when clamscan could not be
          run, timed out, or exited with any other status.
    """
    try:
        result = subprocess.run(
            ['clamscan', '--no-summary', file_path],
            capture_output=True,
            text=True,
            timeout=30
        )
    except (OSError, subprocess.SubprocessError, ValueError) as e:
        # Best effort: a missing binary, a timeout or undecodable output must
        # not abort the whole inventory scan, but the caller gets the reason.
        return {'infected': False, 'error': str(e)}

    if result.returncode == 0:
        return {'infected': False}

    if result.returncode == 1:
        # Status 1 means a detection; the matching line carries the name.
        for line in result.stdout.splitlines():
            if 'FOUND' in line:
                return {'infected': True, 'signature': line.strip()}
        # Detection reported but no parsable line: still a detection.
        return {'infected': True, 'signature': 'INFECTED'}

    # Any other status (2 = scanner error, negative = killed by a signal)
    # means the file was NOT successfully scanned; never report it as clean
    # without saying so.
    detail = (result.stderr or result.stdout).strip()
    if not detail:
        detail = f"clamscan exited with status {result.returncode}"
    return {'infected': False, 'error': detail}
def _yara_rule_files(rules_path):
    """Return the rule files designated by *rules_path*, sorted by path.

    A directory is searched recursively for ``.yar`` / ``.yara`` files; any
    other path is returned unchanged as the single rules file.
    """
    if not os.path.isdir(rules_path):
        return [rules_path]
    rule_files = []
    for folder, _subdirs, names in os.walk(rules_path):
        rule_files.extend(
            os.path.join(folder, name)
            for name in names
            if name.lower().endswith(('.yar', '.yara'))
        )
    return sorted(rule_files)


def scan_with_yara(file_path, rules_path):
    """Scan a single file with the ``yara`` command-line tool.

    The yara CLI takes a rules *file*, not a directory, so when *rules_path*
    is a directory each ``.yar`` / ``.yara`` file below it is applied in turn.

    Args:
        file_path: Path of the file to scan.
        rules_path: A YARA rules file, or a directory containing rule files.

    Returns:
        A dict with the key ``'matched'`` (bool). When rules matched it also
        has ``'rules'``, the list of matching rule names in first-seen order
        without duplicates. When any yara invocation could not be run, timed
        out, or exited non-zero (or no rule files were found) it also has
        ``'error'`` describing the problem(s).
    """
    rule_files = _yara_rule_files(rules_path)
    if not rule_files:
        return {
            'matched': False,
            'error': f"no .yar/.yara rule files found in {rules_path}",
        }

    matches = []
    errors = []
    for rule_file in rule_files:
        try:
            result = subprocess.run(
                ['yara', '-r', rule_file, file_path],
                capture_output=True,
                text=True,
                timeout=30
            )
        except (OSError, subprocess.SubprocessError, ValueError) as e:
            # Best effort: one broken rule file or a timeout must not stop
            # the remaining rule files from being applied.
            errors.append(f"{rule_file}: {e}")
            continue

        if result.returncode != 0:
            detail = result.stderr.strip() or f"yara exited with status {result.returncode}"
            errors.append(f"{rule_file}: {detail}")

        # YARA prints one match per line as: rule_name file_path
        for line in result.stdout.splitlines():
            fields = line.split()
            if fields and fields[0] not in matches:
                matches.append(fields[0])

    outcome = {'matched': bool(matches)}
    if matches:
        outcome['rules'] = matches
    if errors:
        outcome['error'] = '; '.join(errors)
    return outcome
def calculate_sha256(file_path):
    """Return the SHA-256 hex digest of a file, or None if it cannot be read.

    The file is read in 64 KiB chunks so its whole content is never held in
    memory at once.

    Args:
        file_path: Path of the file to hash.

    Returns:
        The lowercase hexadecimal digest string, or ``None`` when opening or
        reading the file raises.
    """
    digest = hashlib.sha256()
    try:
        with open(file_path, "rb") as stream:
            while True:
                chunk = stream.read(65536)
                if not chunk:
                    break
                digest.update(chunk)
    except Exception:
        return None
    return digest.hexdigest()
def _write_result_rows(csv_path, fieldnames, rows):
    """Write *rows* (a list of dicts) to *csv_path* with a header line."""
    with open(csv_path, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)


def scan_inventory(inventory_csv, output_folder, yara_rules_path):
    """Scan every file listed in an inventory CSV and triage the results.

    Each inventory row is hashed, scanned with ClamAV and YARA (through the
    module's ``calculate_sha256``, ``scan_with_clamav`` and ``scan_with_yara``
    helpers) and then placed in one of three priority buckets:

    * HIGH   - ClamAV or YARA reported a detection.
    * MEDIUM - no detection, but the row's ``Actual Type`` contains
      ``application/x-`` or (case-insensitively) ``executable``.
    * LOW    - everything else.

    Rows whose file is missing or cannot be hashed are skipped. Progress is
    echoed to stdout and written to ``scan.log``.

    Args:
        inventory_csv: CSV file with a ``File`` column and, optionally,
            ``Type by Extension``, ``Actual Type`` and ``Size`` columns.
        output_folder: Directory that receives ``high_priority_vt.csv``,
            ``medium_priority_vt.csv``, ``no_local_detections.csv`` and
            ``scan.log``. Created if it does not exist.
        yara_rules_path: Rules location handed to ``scan_with_yara``.

    Raises:
        OSError: If the inventory cannot be read or an output file cannot be
            written.
    """
    # The log file lives inside the output folder, so the folder has to exist
    # before logging is set up.
    os.makedirs(output_folder, exist_ok=True)
    log_handle = setup_logging(output_folder)
    # try/finally guarantees the log is closed even if the scan aborts.
    try:
        log(log_handle, f"[*] Starting scan of inventory: {inventory_csv}")
        log(log_handle, f"[*] Output folder: {output_folder}")
        log(log_handle, f"[*] YARA rules path: {yara_rules_path}")

        # Output CSV files
        high_priority_csv = os.path.join(output_folder, "high_priority_vt.csv")
        medium_priority_csv = os.path.join(output_folder, "medium_priority_vt.csv")
        no_detections_csv = os.path.join(output_folder, "no_local_detections.csv")

        # CSV columns for output
        fieldnames = [
            'File',
            'SHA256',
            'Type by Extension',
            'Actual Type',
            'Size',
            'ClamAV Result',
            'YARA Matches',
            'Priority'
        ]

        # Read inventory (newline='' lets the csv module handle line endings,
        # including newlines embedded in quoted fields).
        with open(inventory_csv, 'r', newline='') as f:
            inventory = list(csv.DictReader(f))
        total = len(inventory)
        log(log_handle, f"[*] Found {total} files in inventory")

        # Results lists
        high_priority = []
        medium_priority = []
        no_detections = []
        skipped = 0

        for idx, row in enumerate(inventory, 1):
            # .get(): a row without a File value is skipped, not a KeyError.
            file_path = row.get('File')
            log(log_handle, f"\n[{idx}/{total}] Scanning: {file_path}")

            # Check that the file is named and still exists
            if not file_path or not os.path.exists(file_path):
                log(log_handle, " [!] File not found, skipping")
                skipped += 1
                continue

            file_hash = calculate_sha256(file_path)
            if not file_hash:
                log(log_handle, " [!] Failed to hash file")
                skipped += 1
                continue

            log(log_handle, " [*] Running ClamAV scan...")
            clam_result = scan_with_clamav(file_path)

            log(log_handle, " [*] Running YARA scan...")
            yara_result = scan_with_yara(file_path, yara_rules_path)

            # `or ''` also covers None, which DictReader uses for short rows.
            actual_type = row.get('Actual Type') or ''
            output_row = {
                'File': file_path,
                'SHA256': file_hash,
                'Type by Extension': row.get('Type by Extension', ''),
                'Actual Type': row.get('Actual Type', ''),
                'Size': row.get('Size', ''),
                'ClamAV Result': '',
                'YARA Matches': '',
                'Priority': ''
            }

            is_infected = False

            clam_error = clam_result.get('error')
            if clam_result.get('infected'):
                signature = clam_result.get('signature', 'INFECTED')
                output_row['ClamAV Result'] = signature
                log(log_handle, f" [!!!] ClamAV DETECTED: {signature}")
                is_infected = True
            elif clam_error:
                # A failed scan is not a clean result; record it as such.
                output_row['ClamAV Result'] = f"Scan error: {clam_error}"
            else:
                output_row['ClamAV Result'] = 'No detection'
            if clam_error:
                log(log_handle, f" [!] ClamAV scan error: {clam_error}")

            yara_error = yara_result.get('error')
            if yara_result.get('matched'):
                output_row['YARA Matches'] = ', '.join(yara_result.get('rules', []))
                log(log_handle, f" [!!!] YARA MATCHED: {output_row['YARA Matches']}")
                is_infected = True
            elif yara_error:
                output_row['YARA Matches'] = f"Scan error: {yara_error}"
            else:
                output_row['YARA Matches'] = 'No matches'
            if yara_error:
                log(log_handle, f" [!] YARA scan error: {yara_error}")

            # Categorize
            if is_infected:
                output_row['Priority'] = 'HIGH'
                high_priority.append(output_row)
                log(log_handle, " [!!!] PRIORITY: HIGH - Local tools detected malware, verify with VT")
            elif 'application/x-' in actual_type or 'executable' in actual_type.lower():
                output_row['Priority'] = 'MEDIUM'
                medium_priority.append(output_row)
                log(log_handle, " [!] PRIORITY: MEDIUM - Executable with no local detection, check VT")
            else:
                output_row['Priority'] = 'LOW'
                no_detections.append(output_row)
                log(log_handle, " [i] PRIORITY: LOW - No local detection, VT recommended if quota allows")

        # Write output CSVs
        log(log_handle, "\n[*] Writing results...")
        _write_result_rows(high_priority_csv, fieldnames, high_priority)
        _write_result_rows(medium_priority_csv, fieldnames, medium_priority)
        _write_result_rows(no_detections_csv, fieldnames, no_detections)

        # Summary
        log(log_handle, f"\n{'='*60}")
        log(log_handle, "SCAN COMPLETE")
        log(log_handle, f"{'='*60}")
        # Report only the files that were actually scanned; skipped rows are
        # listed separately.
        log(log_handle, f"Total files scanned: {total - skipped}")
        log(log_handle, f"Skipped (missing or unreadable): {skipped}")
        log(log_handle, f"HIGH priority (local detection): {len(high_priority)}")
        log(log_handle, f"MEDIUM priority (executables, no local detection): {len(medium_priority)}")
        log(log_handle, f"LOW priority (no local detection): {len(no_detections)}")
        log(log_handle, "\nREMINDER: 'No local detection' does NOT mean clean.")
        log(log_handle, "VirusTotal verification is still recommended for all files.")
        log(log_handle, "\nOutput files:")
        log(log_handle, f" - {high_priority_csv}")
        log(log_handle, f" - {medium_priority_csv}")
        log(log_handle, f" - {no_detections_csv}")
        log(log_handle, f" - {os.path.join(output_folder, 'scan.log')}")
        log(log_handle, f"{'='*60}")
    finally:
        log_handle.close()
def main():
    """Command-line entry point.

    Expects exactly three arguments: the inventory CSV, the output folder and
    the YARA rules path. Prints a usage or error message and exits with
    status 1 when the argument count is wrong, when the inventory or rules
    path does not exist, or when a required scanner is not installed;
    otherwise runs the inventory scan.
    """
    if len(sys.argv) != 4:
        usage_lines = (
            "Usage: scan-inventory <inventory.csv> <output_folder> <yara_rules_path>",
            "\nExample:",
            " scan-inventory inventory.csv ./scan_results ./yara-rules/",
            "\nYARA rules path should point to a directory containing .yar files",
        )
        for text in usage_lines:
            print(text)
        sys.exit(1)

    _, inventory_csv, output_folder, yara_rules_path = sys.argv

    # Validate inputs
    if not os.path.exists(inventory_csv):
        print(f"Error: Inventory file not found: {inventory_csv}")
        sys.exit(1)
    if not os.path.exists(yara_rules_path):
        print(f"Error: YARA rules path not found: {yara_rules_path}")
        sys.exit(1)

    # Check dependencies
    missing = check_dependencies()
    if missing:
        # Install commands per tool, printed in this fixed order.
        install_hints = (
            ('clamav', (
                " sudo apt-get install clamav",
                " sudo freshclam # Update signatures",
            )),
            ('yara', (
                " sudo apt-get install yara",
            )),
        )
        print(f"Error: Missing required tools: {', '.join(missing)}")
        print("\nInstall with:")
        for tool, commands in install_hints:
            if tool in missing:
                for command in commands:
                    print(command)
        sys.exit(1)

    scan_inventory(inventory_csv, output_folder, yara_rules_path)


# Run the scanner only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment