Created
December 6, 2025 10:56
-
-
Save KaushikShresth07/97a91145a6d4bcaca163fe9e19c604c4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Enhanced HuskyLens Interactive Example | |
| Comprehensive interactive menu for testing all HuskyLens features | |
| """ | |
| import random | |
| import time | |
| import json | |
| import sys | |
| import argparse | |
| from datetime import datetime | |
| from huskylib import (HuskyLensLibrary, HuskyLensError, HuskyLensConnectionError, | |
| HuskyLensCommunicationError, HuskyLensTimeoutError, | |
| HuskyLensValidationError) | |
| algorthimsByteID = { | |
| "ALGORITHM_OBJECT_TRACKING": "0100", | |
| "ALGORITHM_FACE_RECOGNITION": "0000", | |
| "ALGORITHM_OBJECT_RECOGNITION": "0200", | |
| "ALGORITHM_LINE_TRACKING": "0300", | |
| "ALGORITHM_COLOR_RECOGNITION": "0400", | |
| "ALGORITHM_TAG_RECOGNITION": "0500", | |
| "ALGORITHM_OBJECT_CLASSIFICATION": "0600", | |
| "ALGORITHM_QR_CODE_RECOGNTITION": "0700", | |
| "ALGORITHM_BARCODE_RECOGNTITION": "0800", | |
| } | |
| commandList = [ | |
| ('knock()', 'Test connection'), | |
| ('setCustomName(name, id)', 'Set custom name for object ID'), | |
| ('customText(text, x, y)', 'Display custom text on screen'), | |
| ('clearText()', 'Clear custom text'), | |
| ('requestAll()', 'Get all detected objects'), | |
| ('saveModelToSDCard(id)', 'Save model to SD card'), | |
| ('loadModelFromSDCard(id)', 'Load model from SD card'), | |
| ('savePictureToSDCard()', 'Save picture to SD card'), | |
| ('count()', 'Get total object count'), | |
| ('learnedObjCount()', 'Get learned object count'), | |
| ('frameNumber()', 'Get current frame number'), | |
| ('saveScreenshotToSDCard()', 'Save screenshot to SD card'), | |
| ('blocks()', 'Get all blocks (detected objects)'), | |
| ('arrows()', 'Get all arrows (direction indicators)'), | |
| ('learned()', 'Get all learned objects'), | |
| ('learnedBlocks()', 'Get learned blocks'), | |
| ('learnedArrows()', 'Get learned arrows'), | |
| ('getObjectByID(id)', 'Get object by ID'), | |
| ('getBlocksByID(id)', 'Get blocks by ID'), | |
| ('getArrowsByID(id)', 'Get arrows by ID'), | |
| ('algorthim(name)', 'Set algorithm'), | |
| ('learn(id)', 'Learn object with ID'), | |
| ('forget()', 'Forget all learned objects'), | |
| ('get_stats()', 'Get library statistics'), | |
| ] | |
| def print_separator(char="=", width=80): | |
| print(char * width) | |
| def print_header(text): | |
| print_separator("=") | |
| print(f" {text}") | |
| print_separator("=") | |
| def print_menu(): | |
| """Print enhanced menu""" | |
| print_header("HUSKYLENS INTERACTIVE MENU") | |
| print("\n Available Commands:") | |
| print(" " + "β" * 76) | |
| for i, (cmd, desc) in enumerate(commandList): | |
| letter = chr(i + 97) # a, b, c, ... | |
| print(f" {letter:2s}). {cmd:35s} - {desc}") | |
| print("\n Special Commands:") | |
| print(" " + "β" * 76) | |
| print(" menu - Show this menu") | |
| print(" quit - Exit program") | |
| print(" stats - Show library statistics") | |
| print(" help - Show detailed help") | |
| print(" alg - List all algorithms") | |
| print(" " + "β" * 76) | |
| def print_object_nicely(obj, detailed=False): | |
| """Enhanced object printing""" | |
| count = 1 | |
| if isinstance(obj, list): | |
| if len(obj) == 0: | |
| print("\t β No objects detected") | |
| return | |
| for i in obj: | |
| obj_type = "BLOCK" if i.type == "BLOCK" else "ARROW" | |
| learned = "β Learned" if i.learned else "β Not Learned" | |
| if obj_type == "BLOCK": | |
| center = i.center() if hasattr(i, 'center') else (i.x + i.width//2, i.y + i.height//2) | |
| area = i.area() if hasattr(i, 'area') else i.width * i.height | |
| print(f"\t {obj_type} #{count}:") | |
| print(f"\t ID: {i.ID} | Position: ({i.x}, {i.y})") | |
| print(f"\t Size: {i.width}Γ{i.height} | Area: {area} pxΒ²") | |
| print(f"\t Center: {center} | Status: {learned}") | |
| if detailed: | |
| print(f"\t Full: {json.dumps(i.__dict__, indent=10)}") | |
| else: # ARROW | |
| print(f"\t {obj_type} #{count}:") | |
| print(f"\t ID: {i.ID} | Tail: ({i.xTail}, {i.yTail})") | |
| print(f"\t Head: ({i.xHead}, {i.yHead}) | Status: {learned}") | |
| if detailed: | |
| print(f"\t Full: {json.dumps(i.__dict__, indent=10)}") | |
| count += 1 | |
| else: | |
| print(f"\t Single object: {json.dumps(obj.__dict__, indent=8)}") | |
| def print_help(): | |
| """Print detailed help""" | |
| print_header("DETAILED HELP") | |
| print(""" | |
| Connection: | |
| - Make sure HuskyLens is properly connected (I2C or Serial) | |
| - For I2C: Check with 'sudo i2cdetect -y 1' (should show 0x32) | |
| - For Serial: Check with 'ls -l /dev/tty*' | |
| Algorithms: | |
| - Use 'alg' command to see all available algorithms | |
| - Switch algorithms using 't' command (random) or specify algorithm name | |
| Learning: | |
| - Use 'u' to learn an object (point camera at object and press button) | |
| - Use 'v' to forget all learned objects | |
| - Learned objects have ID > 0 | |
| Detection: | |
| - 'l' gets all detected blocks (objects) | |
| - 'm' gets all detected arrows (directions) | |
| - 'n' gets all learned objects | |
| - 'e' gets everything at once | |
| Customization: | |
| - 'b' sets a custom name for an object ID | |
| - 'c' displays custom text on the HuskyLens screen | |
| - 'd' clears the custom text | |
| SD Card: | |
| - Requires SD card to be inserted | |
| - 'f' saves model, 'g' loads model | |
| - 'h' saves picture, 'k' saves screenshot | |
| """) | |
| def print_algorithms(): | |
| """Print all available algorithms""" | |
| print_header("AVAILABLE ALGORITHMS") | |
| for i, (name, code) in enumerate(algorthimsByteID.items(), 1): | |
| print(f" {i:2d}. {name.replace('ALGORITHM_', '')}") | |
| print(f" Code: {code}") | |
| def execute_command(hl, cmd_letter): | |
| """Execute command based on letter with comprehensive error handling""" | |
| v = cmd_letter.lower() | |
| start_time = time.time() | |
| try: | |
| # Validate command letter | |
| if not v or len(v) != 1 or not v.isalpha(): | |
| print(f"[*] ERROR: Invalid command '{cmd_letter}' [*]") | |
| return False | |
| if v == 'a': | |
| print('[*] COMMAND -> knock() [*]') | |
| result = hl.knock() | |
| print(f"[*] RESPONSE: {result} [*]") | |
| elif v == 'b': | |
| name = f"test_{random.randint(1, 100)}" | |
| id_val = random.randint(1, 3) | |
| print(f'[*] COMMAND -> setCustomName("{name}", {id_val}) [*]') | |
| result = hl.setCustomName(name, id_val) | |
| print(f"[*] RESPONSE: {result} [*]") | |
| elif v == 'c': | |
| text = f"HL_{random.randint(1, 100)}" | |
| x = random.randint(5, 300) | |
| y = random.randint(5, 200) | |
| print(f'[*] COMMAND -> customText("{text}", {x}, {y}) [*]') | |
| result = hl.customText(text, x, y) | |
| print(f"[*] RESPONSE: {result} [*]") | |
| elif v == 'd': | |
| print('[*] COMMAND -> clearText() [*]') | |
| result = hl.clearText() | |
| print(f"[*] RESPONSE: {result} [*]") | |
| elif v == 'e': | |
| print('[*] COMMAND -> requestAll() [*]') | |
| print("[*] RESPONSE [*]") | |
| result = hl.requestAll() | |
| print_object_nicely(result, detailed=True) | |
| elif v == 'f': | |
| id_val = random.randint(1, 99) | |
| print(f'[*] COMMAND -> saveModelToSDCard({id_val}) [*]') | |
| result = hl.saveModelToSDCard(id_val) | |
| print(f"[*] RESPONSE: {result} [*]") | |
| elif v == 'g': | |
| id_val = random.randint(1, 99) | |
| print(f'[*] COMMAND -> loadModelFromSDCard({id_val}) [*]') | |
| result = hl.loadModelFromSDCard(id_val) | |
| print(f"[*] RESPONSE: {result} [*]") | |
| elif v == 'h': | |
| print('[*] COMMAND -> savePictureToSDCard() [*]') | |
| result = hl.savePictureToSDCard() | |
| print(f"[*] RESPONSE: {result} [*]") | |
| elif v == 'i': | |
| print('[*] COMMAND -> count() [*]') | |
| result = hl.count() | |
| print(f"[*] RESPONSE: {result} [*]") | |
| elif v == 'j': | |
| print('[*] COMMAND -> learnedObjCount() [*]') | |
| result = hl.learnedObjCount() | |
| print(f"[*] RESPONSE: {result} [*]") | |
| elif v == 'k': | |
| print('[*] COMMAND -> saveScreenshotToSDCard() [*]') | |
| result = hl.saveScreenshotToSDCard() | |
| print(f"[*] RESPONSE: {result} [*]") | |
| elif v == 'l': | |
| print('[*] COMMAND -> blocks() [*]') | |
| print("[*] RESPONSE [*]") | |
| result = hl.blocks() | |
| print_object_nicely(result, detailed=True) | |
| elif v == 'm': | |
| print('[*] COMMAND -> arrows() [*]') | |
| print("[*] RESPONSE [*]") | |
| result = hl.arrows() | |
| print_object_nicely(result, detailed=True) | |
| elif v == 'n': | |
| print('[*] COMMAND -> learned() [*]') | |
| print("[*] RESPONSE [*]") | |
| result = hl.learned() | |
| print_object_nicely(result, detailed=True) | |
| elif v == 'o': | |
| print('[*] COMMAND -> learnedBlocks() [*]') | |
| print("[*] RESPONSE [*]") | |
| result = hl.learnedBlocks() | |
| print_object_nicely(result, detailed=True) | |
| elif v == 'p': | |
| print('[*] COMMAND -> learnedArrows() [*]') | |
| print("[*] RESPONSE [*]") | |
| result = hl.learnedArrows() | |
| print_object_nicely(result, detailed=True) | |
| elif v == 'q': | |
| id_val = 1 | |
| print(f'[*] COMMAND -> getObjectByID({id_val}) [*]') | |
| print("[*] RESPONSE [*]") | |
| result = hl.getObjectByID(id_val) | |
| print_object_nicely(result, detailed=True) | |
| elif v == 'r': | |
| id_val = 1 | |
| print(f'[*] COMMAND -> getBlocksByID({id_val}) [*]') | |
| print("[*] RESPONSE [*]") | |
| result = hl.getBlocksByID(id_val) | |
| print_object_nicely(result, detailed=True) | |
| elif v == 's': | |
| id_val = 1 | |
| print(f'[*] COMMAND -> getArrowsByID({id_val}) [*]') | |
| print("[*] RESPONSE [*]") | |
| result = hl.getArrowsByID(id_val) | |
| print_object_nicely(result, detailed=True) | |
| elif v == 't': | |
| algs = list(algorthimsByteID.keys()) | |
| alg = algs[random.randint(0, len(algs) - 1)] | |
| print(f'[*] COMMAND -> algorthim("{alg}") [*]') | |
| result = hl.algorthim(alg) | |
| print(f"[*] RESPONSE: {result} [*]") | |
| elif v == 'u': | |
| id_val = 1 | |
| print(f'[*] COMMAND -> learn({id_val}) [*]') | |
| print("[*] Note: Point camera at object and press button on HuskyLens [*]") | |
| result = hl.learn(id_val) | |
| print(f"[*] RESPONSE: {result} [*]") | |
| elif v == 'v': | |
| print('[*] COMMAND -> forget() [*]') | |
| result = hl.forget() | |
| print(f"[*] RESPONSE: {result} [*]") | |
| elif v == 'w': | |
| print('[*] COMMAND -> frameNumber() [*]') | |
| result = hl.frameNumber() | |
| print(f"[*] RESPONSE: {result} [*]") | |
| else: | |
| print(f"[*] ERROR: Unknown command '{v}' [*]") | |
| return False | |
| duration = time.time() - start_time | |
| print(f"[*] Duration: {duration:.3f}s [*]") | |
| return True | |
| except HuskyLensValidationError as e: | |
| duration = time.time() - start_time | |
| print(f"[*] ERROR (Validation): {e} [*]") | |
| print(f"[*] π‘ Check input parameters (IDs, coordinates, text length) [*]") | |
| print(f"[*] Duration: {duration:.3f}s [*]") | |
| return False | |
| except HuskyLensConnectionError as e: | |
| duration = time.time() - start_time | |
| print(f"[*] ERROR (Connection): {e} [*]") | |
| print(f"[*] π‘ Check physical connection and device power [*]") | |
| print(f"[*] Duration: {duration:.3f}s [*]") | |
| return False | |
| except HuskyLensCommunicationError as e: | |
| duration = time.time() - start_time | |
| print(f"[*] ERROR (Communication): {e} [*]") | |
| print(f"[*] π‘ Communication error - command will be retried automatically [*]") | |
| print(f"[*] Duration: {duration:.3f}s [*]") | |
| return False | |
| except HuskyLensTimeoutError as e: | |
| duration = time.time() - start_time | |
| print(f"[*] ERROR (Timeout): {e} [*]") | |
| print(f"[*] π‘ Device may be unresponsive - try again or power cycle [*]") | |
| print(f"[*] Duration: {duration:.3f}s [*]") | |
| return False | |
| except KeyboardInterrupt: | |
| print("\n[*] Command interrupted by user [*]") | |
| return False | |
| except HuskyLensError as e: | |
| duration = time.time() - start_time | |
| print(f"[*] ERROR (HuskyLens): {e} [*]") | |
| print(f"[*] Duration: {duration:.3f}s [*]") | |
| return False | |
| except Exception as e: | |
| duration = time.time() - start_time | |
| print(f"[*] ERROR (Unexpected): {e} [*]") | |
| print(f"[*] Error type: {type(e).__name__} [*]") | |
| print(f"[*] Duration: {duration:.3f}s [*]") | |
| import traceback | |
| if hl.debug: | |
| traceback.print_exc() | |
| return False | |
| def main(): | |
| parser = argparse.ArgumentParser(description='Enhanced HuskyLens Interactive Example') | |
| parser.add_argument('--protocol', choices=['I2C', 'SERIAL'], default='I2C', | |
| help='Connection protocol (default: I2C)') | |
| parser.add_argument('--address', type=lambda x: int(x, 0), default=0x32, | |
| help='I2C address in hex (default: 0x32)') | |
| parser.add_argument('--port', default='/dev/ttyUSB1', | |
| help='Serial port (for SERIAL protocol)') | |
| parser.add_argument('--debug', action='store_true', help='Enable debug mode') | |
| parser.add_argument('--verbose', action='store_true', help='Enable verbose output') | |
| args = parser.parse_args() | |
| print_header("HUSKYLENS INTERACTIVE EXAMPLE") | |
| print(f" Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") | |
| print(f" Protocol: {args.protocol}") | |
| if args.protocol == 'I2C': | |
| print(f" I2C Address: 0x{args.address:02X}") | |
| else: | |
| print(f" Serial Port: {args.port}") | |
| print_separator() | |
| # Initialize HuskyLens with enhanced error handling | |
| hl = None | |
| try: | |
| print("\n Initializing HuskyLens...") | |
| if args.protocol == "I2C": | |
| try: | |
| hl = HuskyLensLibrary("I2C", "", address=args.address, | |
| debug=args.debug, verbose=args.verbose, | |
| max_retries=3, timeout=5.0) | |
| except HuskyLensValidationError as e: | |
| print(f" β Validation Error: {e}") | |
| print(" π‘ Check I2C address is in valid range (0x08-0x77)") | |
| return 1 | |
| else: | |
| try: | |
| hl = HuskyLensLibrary("SERIAL", args.port, debug=args.debug, | |
| verbose=args.verbose, max_retries=3, timeout=5.0) | |
| except HuskyLensValidationError as e: | |
| print(f" β Validation Error: {e}") | |
| print(" π‘ Check serial port path and baud rate") | |
| return 1 | |
| print(" β HuskyLens initialized successfully!") | |
| # Perform initial health check | |
| try: | |
| health_ok = hl._health_check() | |
| if health_ok: | |
| print(" β Initial health check: PASSED\n") | |
| else: | |
| print(" β Initial health check: WARNING (continuing anyway)\n") | |
| except Exception as e: | |
| print(f" β Health check failed: {e} (continuing anyway)\n") | |
| except HuskyLensConnectionError as e: | |
| print(f" β Connection Error: {e}") | |
| print("\n π§ Troubleshooting:") | |
| if args.protocol == "I2C": | |
| print(" 1. Check I2C wiring (SDA, SCL, VCC, GND)") | |
| print(" 2. Verify I2C is enabled: sudo raspi-config") | |
| print(" 3. Check detection: sudo i2cdetect -y 1") | |
| print(" 4. Verify I2C address: should show 0x32") | |
| print(" 5. Check permissions: sudo chmod 666 /dev/i2c-1") | |
| else: | |
| print(" 1. Check serial wiring") | |
| print(" 2. Verify port exists: ls -l /dev/tty*") | |
| print(" 3. Check permissions: sudo usermod -a -G dialout $USER") | |
| print(" 4. Verify baud rate matches device") | |
| return 1 | |
| except ImportError as e: | |
| print(f" β Import Error: {e}") | |
| print("\n π§ Install missing dependencies:") | |
| print(" sudo apt install python3-smbus python3-serial python3-pil") | |
| print(" pip3 install --break-system-packages pypng") | |
| return 1 | |
| except Exception as e: | |
| print(f" β Failed to initialize HuskyLens: {e}") | |
| print(f" π‘ Error type: {type(e).__name__}") | |
| print("\n π§ General Troubleshooting:") | |
| print(" 1. Check HuskyLens is powered on") | |
| print(" 2. Verify connection type matches protocol") | |
| print(" 3. Try power cycling the device") | |
| if args.debug: | |
| import traceback | |
| traceback.print_exc() | |
| return 1 | |
| print_menu() | |
| # Main loop | |
| running = True | |
| command_count = 0 | |
| while running: | |
| try: | |
| cmd = input("\n Enter command letter (or 'menu'/'quit'/'help'/'stats'/'alg'): ").strip().lower() | |
| if cmd == "quit" or cmd == "q" or cmd == "exit": | |
| running = False | |
| print("\n Exiting...") | |
| break | |
| elif cmd == "menu" or cmd == "m": | |
| print_menu() | |
| continue | |
| elif cmd == "help" or cmd == "h": | |
| print_help() | |
| continue | |
| elif cmd == "stats" or cmd == "s": | |
| stats = hl.get_stats() | |
| print_header("LIBRARY STATISTICS") | |
| print(f" Protocol: {stats['protocol']}") | |
| print(f" Address: {stats['address']}") | |
| print(f" Commands Sent: {stats['commands_sent']}") | |
| print(f" Errors: {stats['errors']}") | |
| print(f" Uptime: {stats['uptime_seconds']:.2f}s") | |
| print(f" Commands/sec: {stats['commands_per_second']:.2f}") | |
| continue | |
| elif cmd == "alg" or cmd == "a": | |
| print_algorithms() | |
| continue | |
| elif len(cmd) == 1 and cmd.isalpha(): | |
| command_count += 1 | |
| print(f"\n [Command #{command_count}]") | |
| execute_command(hl, cmd) | |
| else: | |
| print(f" β Invalid command: '{cmd}'. Type 'menu' for options or 'quit' to exit.") | |
| except KeyboardInterrupt: | |
| print("\n\n Interrupted by user. Exiting...") | |
| running = False | |
| except EOFError: | |
| print("\n\n EOF detected. Exiting...") | |
| running = False | |
| except Exception as e: | |
| print(f"\n β Unexpected error: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| # Final statistics | |
| if command_count > 0: | |
| stats = hl.get_stats() | |
| print_separator() | |
| print(f"\n Session Summary:") | |
| print(f" Commands executed: {command_count}") | |
| print(f" Total commands sent: {stats['commands_sent']}") | |
| print(f" Total errors: {stats['errors']}") | |
| print(f" Session duration: {stats['uptime_seconds']:.2f}s") | |
| print_separator() | |
| print(f"\n Completed: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") | |
| return 0 | |
| if __name__ == "__main__": | |
| try: | |
| exit_code = main() | |
| sys.exit(exit_code) | |
| except KeyboardInterrupt: | |
| print("\n\n Interrupted by user. Exiting...") | |
| sys.exit(130) | |
| except Exception as e: | |
| print(f"\n\n Unexpected error: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| sys.exit(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment