Skip to content

Instantly share code, notes, and snippets.

@manchicken
Created October 31, 2025 11:48
Show Gist options
  • Select an option

  • Save manchicken/8e205bf14e6146a0e32a35b24d4913ff to your computer and use it in GitHub Desktop.

Select an option

Save manchicken/8e205bf14e6146a0e32a35b24d4913ff to your computer and use it in GitHub Desktop.
Hunt for malware based on an inventory list as input
#!/usr/bin/env python3
"""
Local Malware Scanner for Inventory Files
Reads inventory.csv and scans files with ClamAV and YARA
before prioritizing what to send to VirusTotal.
"""
import os
import sys
import csv
import subprocess
import hashlib
from pathlib import Path
def setup_logging(output_folder):
    """Open the scan log for writing and return the open file handle.

    The log is written to ``scan.log`` inside *output_folder*. The folder is
    created first if it does not exist yet, so this function is safe to call
    before anything else has prepared the output directory.

    Args:
        output_folder: Directory that will hold ``scan.log``.

    Returns:
        A text-mode file handle opened for writing (an existing log is
        truncated). The caller is responsible for closing it.

    Raises:
        OSError: If the folder cannot be created or the log cannot be opened.
    """
    # Callers may invoke this before the output folder exists; without this
    # the open() below fails with FileNotFoundError on a fresh folder.
    os.makedirs(output_folder, exist_ok=True)
    log_file = os.path.join(output_folder, "scan.log")
    # Pin the encoding so non-ASCII file names in log messages do not depend
    # on the locale the scanner happens to run under.
    return open(log_file, 'w', encoding='utf-8')
def log(log_handle, message):
    """Echo *message* to stdout and append it as one line to *log_handle*.

    The handle is flushed after every message so the log on disk stays
    current while a long scan is still running.
    """
    print(message)
    line = message + "\n"
    log_handle.write(line)
    log_handle.flush()
def check_dependencies():
    """Return the names of required external tools that are not usable.

    Each tool is probed by running it with ``--version``. A tool counts as
    missing when the executable cannot be found or the probe exits with a
    non-zero status.

    Returns:
        A list containing ``'clamav'`` and/or ``'yara'``; empty when both
        tools responded.
    """
    # (executable to run, name reported to the caller)
    probes = (
        ('clamscan', 'clamav'),
        ('yara', 'yara'),
    )
    unavailable = []
    for executable, package in probes:
        try:
            subprocess.run([executable, '--version'], capture_output=True, check=True)
        except (subprocess.CalledProcessError, FileNotFoundError):
            unavailable.append(package)
    return unavailable
def scan_with_clamav(file_path):
    """Scan a single file with ClamAV's ``clamscan`` command.

    Args:
        file_path: Path of the file to scan.

    Returns:
        A dict that always contains an ``'infected'`` boolean, plus:

        * ``'signature'`` when infected: the ``FOUND`` line printed by
          clamscan, or the generic ``'INFECTED'`` if no such line was printed.
        * ``'error'`` when the scan could not be completed (clamscan could
          not be started, exceeded the 30 second timeout, or exited with an
          error status). ``'infected'`` is ``False`` in that case, which
          means "unknown", not "clean".
    """
    try:
        result = subprocess.run(
            ['clamscan', '--no-summary', file_path],
            capture_output=True,
            text=True,
            timeout=30,
        )
    except (OSError, ValueError, subprocess.SubprocessError) as e:
        # Missing binary, timeout, undecodable output, bad path, ...
        return {'infected': False, 'error': str(e)}

    # clamscan exit status: 0 = no virus found, 1 = virus found,
    # anything else = an error occurred (so the verdict is unknown).
    if result.returncode == 0:
        return {'infected': False}

    if result.returncode == 1:
        for line in result.stdout.splitlines():
            if 'FOUND' in line:
                return {'infected': True, 'signature': line.strip()}
        # The exit status is authoritative even if the output was not in
        # the expected "<path>: <name> FOUND" form.
        return {'infected': True, 'signature': 'INFECTED'}

    message = f"clamscan exited with status {result.returncode}"
    detail = (result.stderr or result.stdout).strip()
    if detail:
        message += f": {detail}"
    return {'infected': False, 'error': message}
def scan_with_yara(file_path, rules_path):
    """Scan a single file with the ``yara`` command-line tool.

    Args:
        file_path: Path of the file to scan.
        rules_path: Either a single YARA rules file, or a directory. For a
            directory, every ``.yar`` / ``.yara`` file directly inside it
            (sub-directories are not searched) is passed to yara as a rules
            file.

    Returns:
        A dict that always contains a ``'matched'`` boolean, plus:

        * ``'rules'`` when matched: the list of matching rule names.
        * ``'error'`` when yara could not be run, exceeded the 30 second
          timeout, exited with a non-zero status, or no rule files were
          found. Without ``'rules'`` this means "unknown", not "no match".
    """
    try:
        if os.path.isdir(rules_path):
            # yara expects rules *files*; its -r flag only recurses into the
            # scan target, so a rules directory has to be expanded here.
            rule_files = sorted(
                os.path.join(rules_path, name)
                for name in os.listdir(rules_path)
                if name.lower().endswith(('.yar', '.yara'))
            )
            if not rule_files:
                return {
                    'matched': False,
                    'error': f"No .yar/.yara rule files found in {rules_path}",
                }
        else:
            rule_files = [rules_path]

        result = subprocess.run(
            ['yara', '-r', *rule_files, file_path],
            capture_output=True,
            text=True,
            timeout=30,
        )
    except (OSError, ValueError, subprocess.SubprocessError) as e:
        # Missing binary, timeout, undecodable output, unreadable dir, ...
        return {'matched': False, 'error': str(e)}

    # YARA outputs matches as: rule_name file_path
    matches = [
        line.split()[0]
        for line in result.stdout.splitlines()
        if line.strip()
    ]

    outcome = {'matched': bool(matches)}
    if matches:
        outcome['rules'] = matches
    if result.returncode != 0:
        # Surface failures (e.g. rule compilation errors) instead of letting
        # them pass as "no matches"; any matches already printed are kept.
        message = f"yara exited with status {result.returncode}"
        detail = result.stderr.strip()
        if detail:
            message += f": {detail}"
        outcome['error'] = message
    return outcome
def calculate_sha256(file_path):
    """Return the hex SHA-256 digest of *file_path*, or None on any failure.

    The file is read in 64 KiB chunks so large files are never loaded into
    memory in one piece.
    """
    digest = hashlib.sha256()
    try:
        with open(file_path, "rb") as stream:
            while True:
                chunk = stream.read(65536)
                if not chunk:
                    break
                digest.update(chunk)
        return digest.hexdigest()
    except Exception:
        return None
def scan_inventory(inventory_csv, output_folder, yara_rules_path):
    """Scan every file listed in an inventory CSV and triage the results.

    Each inventory row names a file in its ``File`` column. Every file that
    still exists and can be hashed is scanned with ClamAV and YARA, then
    placed in one of three output CSVs inside *output_folder*:

    * ``high_priority_vt.csv``   - ClamAV or YARA reported a detection
    * ``medium_priority_vt.csv`` - no detection, but 'Actual Type' looks
      like an executable
    * ``no_local_detections.csv`` - everything else

    Progress and a final summary go to stdout and to ``scan.log``.

    Args:
        inventory_csv: Path to the inventory CSV (needs a ``File`` column;
            ``Type by Extension``, ``Actual Type`` and ``Size`` are copied
            through when present).
        output_folder: Directory for the log and result CSVs; created if
            it does not exist.
        yara_rules_path: Rules location handed to ``scan_with_yara``.

    Raises:
        OSError: If the inventory cannot be read or outputs cannot be written.
    """
    # The folder has to exist before the log file inside it can be opened.
    os.makedirs(output_folder, exist_ok=True)
    log_handle = setup_logging(output_folder)
    try:
        log(log_handle, f"[*] Starting scan of inventory: {inventory_csv}")
        log(log_handle, f"[*] Output folder: {output_folder}")
        log(log_handle, f"[*] YARA rules path: {yara_rules_path}")

        # Output CSV files
        high_priority_csv = os.path.join(output_folder, "high_priority_vt.csv")
        medium_priority_csv = os.path.join(output_folder, "medium_priority_vt.csv")
        no_detections_csv = os.path.join(output_folder, "no_local_detections.csv")

        # CSV columns for output
        fieldnames = [
            'File',
            'SHA256',
            'Type by Extension',
            'Actual Type',
            'Size',
            'ClamAV Result',
            'YARA Matches',
            'Priority',
        ]

        # Read inventory (newline='' is what the csv module expects)
        with open(inventory_csv, 'r', newline='') as f:
            inventory = list(csv.DictReader(f))
        log(log_handle, f"[*] Found {len(inventory)} files in inventory")

        # Result rows grouped by the priority assigned to them
        results = {'HIGH': [], 'MEDIUM': [], 'LOW': []}
        skipped = 0

        for idx, row in enumerate(inventory, 1):
            # .get(): a missing or empty 'File' cell is skipped, not fatal
            file_path = row.get('File')
            log(log_handle, f"\n[{idx}/{len(inventory)}] Scanning: {file_path}")

            # Check if file still exists
            if not file_path or not os.path.exists(file_path):
                log(log_handle, f" [!] File not found, skipping")
                skipped += 1
                continue

            file_hash = calculate_sha256(file_path)
            if not file_hash:
                log(log_handle, f" [!] Failed to hash file")
                skipped += 1
                continue

            output_row = _scan_inventory_row(
                log_handle, row, file_path, file_hash, yara_rules_path
            )
            results[output_row['Priority']].append(output_row)

        # Write output CSVs
        log(log_handle, f"\n[*] Writing results...")
        _write_result_csv(high_priority_csv, fieldnames, results['HIGH'])
        _write_result_csv(medium_priority_csv, fieldnames, results['MEDIUM'])
        _write_result_csv(no_detections_csv, fieldnames, results['LOW'])

        # Summary. Skipped rows are reported separately so the "scanned"
        # figure only counts files that were actually examined.
        scanned = len(inventory) - skipped
        log(log_handle, f"\n{'='*60}")
        log(log_handle, f"SCAN COMPLETE")
        log(log_handle, f"{'='*60}")
        log(log_handle, f"Total files scanned: {scanned}")
        log(log_handle, f"Skipped (missing or unreadable): {skipped}")
        log(log_handle, f"HIGH priority (local detection): {len(results['HIGH'])}")
        log(log_handle, f"MEDIUM priority (executables, no local detection): {len(results['MEDIUM'])}")
        log(log_handle, f"LOW priority (no local detection): {len(results['LOW'])}")
        log(log_handle, f"\nREMINDER: 'No local detection' does NOT mean clean.")
        log(log_handle, f"VirusTotal verification is still recommended for all files.")
        log(log_handle, f"\nOutput files:")
        log(log_handle, f" - {high_priority_csv}")
        log(log_handle, f" - {medium_priority_csv}")
        log(log_handle, f" - {no_detections_csv}")
        log(log_handle, f" - {os.path.join(output_folder, 'scan.log')}")
        log(log_handle, f"{'='*60}")
    finally:
        # Close the log even when the scan aborts part-way through.
        log_handle.close()


def _scan_inventory_row(log_handle, row, file_path, file_hash, yara_rules_path):
    """Run both scanners on one file and return its completed output row."""
    log(log_handle, f" [*] Running ClamAV scan...")
    clam_result = scan_with_clamav(file_path)
    log(log_handle, f" [*] Running YARA scan...")
    yara_result = scan_with_yara(file_path, yara_rules_path)

    # csv.DictReader fills cells missing from a short row with None, so
    # normalise to '' before doing string operations on the value.
    actual_type = row.get('Actual Type') or ''
    output_row = {
        'File': file_path,
        'SHA256': file_hash,
        'Type by Extension': row.get('Type by Extension') or '',
        'Actual Type': actual_type,
        'Size': row.get('Size') or '',
        'ClamAV Result': '',
        'YARA Matches': '',
        'Priority': '',
    }

    is_infected = False

    if clam_result.get('infected'):
        output_row['ClamAV Result'] = clam_result.get('signature', 'INFECTED')
        log(log_handle, f" [!!!] ClamAV DETECTED: {clam_result.get('signature')}")
        is_infected = True
    elif clam_result.get('error'):
        # A failed scan is not a clean result; record it as such.
        output_row['ClamAV Result'] = f"Scan error: {clam_result['error']}"
        log(log_handle, f" [!] ClamAV scan failed: {clam_result['error']}")
    else:
        output_row['ClamAV Result'] = 'No detection'

    if yara_result.get('matched'):
        output_row['YARA Matches'] = ', '.join(yara_result.get('rules', []))
        log(log_handle, f" [!!!] YARA MATCHED: {output_row['YARA Matches']}")
        is_infected = True
    elif yara_result.get('error'):
        output_row['YARA Matches'] = f"Scan error: {yara_result['error']}"
        log(log_handle, f" [!] YARA scan failed: {yara_result['error']}")
    else:
        output_row['YARA Matches'] = 'No matches'

    # Categorize
    if is_infected:
        output_row['Priority'] = 'HIGH'
        log(log_handle, f" [!!!] PRIORITY: HIGH - Local tools detected malware, verify with VT")
    elif 'application/x-' in actual_type or 'executable' in actual_type.lower():
        output_row['Priority'] = 'MEDIUM'
        log(log_handle, f" [!] PRIORITY: MEDIUM - Executable with no local detection, check VT")
    else:
        output_row['Priority'] = 'LOW'
        log(log_handle, f" [i] PRIORITY: LOW - No local detection, VT recommended if quota allows")
    return output_row


def _write_result_csv(csv_path, fieldnames, rows):
    """Write *rows* to *csv_path* with a header line."""
    with open(csv_path, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
def main():
    """Command-line entry point.

    Expects exactly three arguments (inventory CSV, output folder, YARA
    rules path). Exits with status 1 after printing a message when the
    arguments are wrong, an input path does not exist, or a required
    external tool is missing; otherwise runs the scan.
    """
    if len(sys.argv) != 4:
        usage_lines = (
            "Usage: scan-inventory <inventory.csv> <output_folder> <yara_rules_path>",
            "\nExample:",
            " scan-inventory inventory.csv ./scan_results ./yara-rules/",
            "\nYARA rules path should point to a directory containing .yar files",
        )
        for usage_line in usage_lines:
            print(usage_line)
        sys.exit(1)

    inventory_csv, output_folder, yara_rules_path = sys.argv[1:]

    # Validate inputs (the output folder is created later, so it is not checked)
    required_paths = (
        ("Inventory file", inventory_csv),
        ("YARA rules path", yara_rules_path),
    )
    for label, candidate in required_paths:
        if not os.path.exists(candidate):
            print(f"Error: {label} not found: {candidate}")
            sys.exit(1)

    # Check dependencies
    missing = check_dependencies()
    if missing:
        print(f"Error: Missing required tools: {', '.join(missing)}")
        print("\nInstall with:")
        install_hints = {
            'clamav': (
                " sudo apt-get install clamav",
                " sudo freshclam # Update signatures",
            ),
            'yara': (
                " sudo apt-get install yara",
            ),
        }
        for tool, hints in install_hints.items():
            if tool in missing:
                for hint in hints:
                    print(hint)
        sys.exit(1)

    scan_inventory(inventory_csv, output_folder, yara_rules_path)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment