Created
January 4, 2025 13:56
-
-
Save OminousIndustries/67803e730ecc038eb1a8ac5aa527f838 to your computer and use it in GitHub Desktop.
Rpi AI Detection Notification Script
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import gi | |
| gi.require_version('Gst', '1.0') | |
| from gi.repository import Gst, GLib | |
| import os | |
| import numpy as np | |
| import cv2 | |
| import hailo | |
| import time | |
| import telebot | |
| from hailo_rpi_common import ( | |
| get_caps_from_pad, | |
| get_numpy_from_buffer, | |
| app_callback_class, | |
| ) | |
| from detection_pipeline import GStreamerDetectionApp | |
| # ----------------------------------------------------------------------------------------------- | |
| # User-defined class to be used in the callback function | |
| # ----------------------------------------------------------------------------------------------- | |
| # Inheritance from the app_callback_class | |
class user_app_callback_class(app_callback_class):
    """Callback state object that also sends rate-limited Telegram alerts.

    Extends ``app_callback_class`` with a ``telebot.TeleBot`` client and a
    cooldown so that at most one notification attempt is made per
    ``cooldown_seconds`` seconds.
    """

    def __init__(self):
        super().__init__()
        self.new_variable = 42  # New variable example

        # Telegram configuration.
        # NOTE(review): 'AccessToken' and 'id' look like placeholders --
        # replace them with the real bot token and chat id before running.
        self.bot = telebot.TeleBot('AccessToken')
        self.chat_id = 'id'

        # time.monotonic() reading of the most recent notification attempt.
        # Starts at -inf so the very first detection is never suppressed,
        # regardless of what the monotonic clock currently reads.
        self.last_notification_time = float('-inf')
        # Minimum number of seconds between two notification attempts.
        self.cooldown_seconds = 30

    def new_function(self):  # New function example
        """Return the fixed example prefix string."""
        return "The meaning of life is: "

    def send_notification(self):
        """Send a "Person Detected" Telegram message unless in cooldown.

        Does nothing when fewer than ``cooldown_seconds`` have elapsed since
        the previous attempt. Any exception raised while sending is caught
        and printed; it is never propagated to the caller.
        """
        # Use the monotonic clock: the cooldown is an elapsed-time interval
        # and must not be affected by wall-clock adjustments.
        current_time = time.monotonic()
        if current_time - self.last_notification_time < self.cooldown_seconds:
            return

        # Record the attempt *before* sending. If the timestamp were only
        # updated on success, a failing send would be retried on every
        # subsequent call, i.e. on every detection the caller reports.
        self.last_notification_time = current_time
        try:
            self.bot.send_message(self.chat_id, "🚨 Person Detected!")
        except Exception as e:
            # Best-effort alerting: report the problem and carry on.
            print(f"Error sending Telegram notification: {e}")
        else:
            print("Telegram notification sent!")
| # ----------------------------------------------------------------------------------------------- | |
| # User-defined callback function | |
| # ----------------------------------------------------------------------------------------------- | |
| # This is the callback function that will be called when data is available from the pipeline | |
def app_callback(pad, info, user_data):
    """Handle one buffer from the pipeline: count, report and alert.

    For every buffer this increments the frame counter on ``user_data``,
    collects the "person" detections attached to the buffer, asks
    ``user_data`` to send a notification when at least one person was
    found, optionally annotates the video frame, and prints a summary.

    Args:
        pad: The pad the probe is attached to; used to read the caps.
        info: The probe info object; the buffer is taken from it.
        user_data: The ``user_app_callback_class`` instance holding the
            frame counter, the ``use_frame`` flag and the notifier.

    Returns:
        ``Gst.PadProbeReturn.OK`` in every case.
    """
    # Get the GstBuffer from the probe info and bail out if there is none.
    buffer = info.get_buffer()
    if buffer is None:
        return Gst.PadProbeReturn.OK

    # Using the user_data to count the number of frames.
    user_data.increment()
    report_lines = [f"Frame count: {user_data.get_count()}\n"]

    # Get the caps from the pad.
    frame_format, width, height = get_caps_from_pad(pad)

    # If user_data.use_frame is set, fetch the video frame from the buffer.
    # The frame stays None when any of the caps values is unavailable.
    frame = None
    if (
        user_data.use_frame
        and frame_format is not None
        and width is not None
        and height is not None
    ):
        frame = get_numpy_from_buffer(buffer, frame_format, width, height)

    # Get the detections from the buffer.
    roi = hailo.get_roi_from_buffer(buffer)
    detections = roi.get_objects_typed(hailo.HAILO_DETECTION)

    # Parse the detections; only the "person" label is of interest.
    detection_count = 0
    for detection in detections:
        label = detection.get_label()
        if label != "person":
            continue
        confidence = detection.get_confidence()
        report_lines.append(f"Detection: {label} {confidence:.2f}\n")
        detection_count += 1

    # Ask for a Telegram notification once per frame (not once per person);
    # the notifier applies its own cooldown.
    if detection_count:
        user_data.send_notification()

    # Only annotate when a frame was actually obtained: with use_frame set
    # but caps missing, frame is None and the cv2 calls would fail.
    if user_data.use_frame and frame is not None:
        # Note: using imshow will not work here, as the callback function
        # is not running in the main thread.
        # Let's print the detection count to the frame.
        cv2.putText(
            frame,
            f"Detections: {detection_count}",
            (10, 30),
            cv2.FONT_HERSHEY_SIMPLEX,
            1,
            (0, 255, 0),
            2,
        )
        # Example of how to use the new_variable and new_function from the
        # user_data: print both to the frame.
        cv2.putText(
            frame,
            f"{user_data.new_function()} {user_data.new_variable}",
            (10, 60),
            cv2.FONT_HERSHEY_SIMPLEX,
            1,
            (0, 255, 0),
            2,
        )
        # Convert the frame to BGR before handing it back to user_data.
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        user_data.set_frame(frame)

    print("".join(report_lines))
    return Gst.PadProbeReturn.OK
if __name__ == "__main__":
    # Build the callback state object first, then hand it to the detection
    # app together with the per-buffer callback and start the app.
    callback_state = user_app_callback_class()
    detection_app = GStreamerDetectionApp(app_callback, callback_state)
    detection_app.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment