Last active
January 27, 2022 05:59
-
-
Save bart02/99dac65fcb9feaa95594d965e7c72110 to your computer and use it in GitHub Desktop.
Innopolis Open Robotics iUAV 2021
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # -*- coding: utf-8 -*- | |
| from __future__ import print_function | |
| import math | |
| # Clover imports | |
| import rospy | |
| from clover import srv | |
| from std_srvs.srv import Trigger | |
| # Computer vision imports | |
| import cv2 | |
| from cv_bridge import CvBridge | |
| from sensor_msgs.msg import Image, CameraInfo | |
| from image_geometry import PinholeCameraModel | |
| from pyzbar import pyzbar | |
| # ROS Transforms imports | |
| from geometry_msgs.msg import TransformStamped | |
| from tf2_ros import TransformBroadcaster | |
| rospy.init_node('flight') # Инициализация ноды ROS | |
| # Прокси к сервисам ROS (полет + светодиодная лента) | |
| get_telemetry = rospy.ServiceProxy('get_telemetry', srv.GetTelemetry) | |
| navigate = rospy.ServiceProxy('navigate', srv.Navigate) | |
| set_effect = rospy.ServiceProxy('led/set_effect', srv.SetLEDEffect) | |
| land = rospy.ServiceProxy('land', Trigger) | |
| bridge = CvBridge() # Мост для изображений между ROS и OpenCV | |
| # Информация о камере | |
| cam_geometry = PinholeCameraModel() | |
| cam_geometry.fromCameraInfo(rospy.wait_for_message('main_camera/camera_info', CameraInfo)) | |
| def navigate_wait(x=0.0, y=0.0, z=0.0, yaw=float('nan'), speed=5, frame_id='', auto_arm=False, tolerance=0.2): | |
| """Функция из документации Клевера для полета с ожиданием""" | |
| navigate(x=x, y=y, z=z, yaw=yaw, speed=speed, frame_id=frame_id, auto_arm=auto_arm) | |
| while not rospy.is_shutdown(): | |
| telem = get_telemetry(frame_id='navigate_target') | |
| if math.sqrt(telem.x ** 2 + telem.y ** 2 + telem.z ** 2) < tolerance: | |
| break | |
| rospy.sleep(0.2) | |
| def wait_qr(): | |
| """Считывание QR-кода на текущем месте с ожиданием""" | |
| qr_code = [''] # list для доступа внутри вложенной функции | |
| def callback(data): | |
| cv_image = bridge.imgmsg_to_cv2(data, 'bgr8') | |
| barcodes = pyzbar.decode(cv_image) | |
| for barcode in barcodes: | |
| qr_code[0] = barcode.data.encode("utf-8") | |
| qr_sub = rospy.Subscriber('main_camera/image_raw', Image, callback) # Подписчик на топик с изображением | |
| while not qr_code[0]: # Ожидание появления данных в переменной (от callback(data) функции, в парал. потоке) | |
| rospy.sleep(0.1) | |
| qr_sub.unregister() # Отписка от топика | |
| return qr_code[0] | |
| def check_color(): | |
| """Возвращает цвет, пикселей которого больше всего в текущем кадре камеры""" | |
| cv_image = bridge.imgmsg_to_cv2(rospy.wait_for_message('main_camera/image_raw', Image), 'bgr8') | |
| count = {'blue': cv2.countNonZero(cv2.inRange(cv_image, (200, 0, 0), (255, 10, 10))), | |
| 'green': cv2.countNonZero(cv2.inRange(cv_image, (0, 200, 0), (10, 255, 10))), | |
| 'yellow': cv2.countNonZero(cv2.inRange(cv_image, (0, 100, 100), (10, 255, 255)))} | |
| # print(c) | |
| return max(count, key=count.get) | |
| def land_on_dronepoint(): | |
| """Реализация посадки на дронпоинт с неизвестным местоположением""" | |
| def image_callback(data): | |
| def send_tarnsform(vector): | |
| """Создание новой СК относительно дронпоинта""" | |
| br = TransformBroadcaster() | |
| t = TransformStamped() | |
| t.header.stamp = rospy.Time.now() | |
| t.header.frame_id = cam_geometry.tfFrame() | |
| t.child_frame_id = 'landing_place' | |
| t.transform.translation.x = vector[0] | |
| t.transform.translation.y = vector[1] | |
| t.transform.translation.z = vector[2] | |
| t.transform.rotation.x = 0 | |
| t.transform.rotation.y = 0 | |
| t.transform.rotation.z = 0 | |
| t.transform.rotation.w = 1 | |
| br.sendTransform(t) | |
| cv_image = bridge.imgmsg_to_cv2(data, 'bgr8') | |
| red = cv2.inRange(cv_image, (0, 0, 200), (10, 10, 255)) | |
| moments = cv2.moments(red, 1) | |
| dM01 = moments['m01'] | |
| dM10 = moments['m10'] | |
| dArea = moments['m00'] | |
| if dArea > 100: | |
| x = int(dM10 / dArea) | |
| y = int(dM01 / dArea) | |
| send_tarnsform(cam_geometry.projectPixelTo3dRay((x, y))) | |
| # cv2.imshow('red', red) | |
| # cv2.waitKey(1) | |
| # Подписываемся на изображение с камеры, внутри callback происходит создание новой СК и летим в ноль отн. нее | |
| image_sub = rospy.Subscriber('main_camera/image_raw', Image, image_callback) | |
| rospy.sleep(1) | |
| navigate(x=0, y=0, z=-0.8, yaw=float('nan'), speed=0.5, frame_id='landing_place') | |
| rospy.sleep(20) | |
| image_sub.unregister() | |
| def led_on(color_str=''): | |
| """Зажигает ленту соотв. цветом""" | |
| if color_str == 'blue': | |
| set_effect(r=0, g=0, b=255) | |
| elif color_str == 'green': | |
| set_effect(r=0, g=255, b=0) | |
| elif color_str == 'yellow': | |
| set_effect(r=255, g=255, b=0) | |
| elif color_str == 'red': | |
| set_effect(r=255, g=0, b=0) | |
| else: | |
| set_effect(r=0, g=0, b=0) | |
| def land_here(color_str): | |
| """Посадка на текущем месте с зажженной светодиодной лентой и послед. взлетом""" | |
| led_on(color_str) | |
| land() | |
| rospy.sleep(10) | |
| led_on() | |
| navigate_wait(0, 0, 2, frame_id='body', auto_arm=True) | |
| dronepoints = [(8, 2), (3, 6), (1, 2)] # Известные координаты трех дронпоинтов | |
| colors = ['' for _ in range(len(dronepoints))] # Массив для хранения цветов дронпоинтов | |
| led_on() # Отключаем ленту | |
| # Взлет | |
| navigate_wait(0, 0, 2, frame_id='body', auto_arm=True) | |
| navigate_wait(0, 0, 1.5, frame_id='aruco_map') | |
| # Считывание и парсинг QR-кода | |
| qr_str = wait_qr() | |
| print(qr_str) | |
| qr = qr_str.split(',') | |
| cur = 0 # Счетчик посещенных дронпоинтов | |
| # Первый цикл. Проходимся по всем дронпоинтам, если цвет совпал, садимся. | |
| for (i, dp) in enumerate(dronepoints): | |
| navigate_wait(dp[0], dp[1], 2, frame_id='aruco_map') | |
| colors[i] = check_color() | |
| # print(colors[i]) | |
| if colors[i] == qr[cur]: | |
| print("D{}_Delivery".format(i + 1)) | |
| land_here(colors[i]) | |
| qr[cur] = None | |
| cur += 1 | |
| # Второй цикл. Проходимся по оставшимся дронпоинтам. | |
| for left in qr: | |
| if left is not None and left != 'red': | |
| ind = colors.index(left) | |
| dp = dronepoints[ind] | |
| navigate_wait(dp[0], dp[1], 2, frame_id='aruco_map') | |
| print("D{}_Delivery".format(ind + 1)) | |
| land_here(left) | |
| navigate_wait(7, 7, 3, frame_id='aruco_map') # Полет в точку начала поиска 4-го дронпоинта | |
| # Посадка на 4-м дронпоинте | |
| led_on('red') | |
| print("D4_Delivery") | |
| land_on_dronepoint() | |
| led_on() | |
| # Полет в точку старта и посадка | |
| navigate_wait(0, 0, 1, frame_id='body', auto_arm=True) | |
| navigate_wait(0, 0, 2, frame_id='aruco_map') | |
| land() | |
| # Автоматический отчет | |
| print("QR:", qr_str) | |
| for i, c in enumerate(colors): | |
| print("D{} = {}".format(i + 1, c)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # -*- coding: utf-8 -*- | |
| from __future__ import print_function | |
| import math | |
| # Clover imports | |
| import rospy | |
| from clover import srv | |
| from std_srvs.srv import Trigger | |
| # Computer vision imports | |
| import cv2 | |
| from cv_bridge import CvBridge | |
| from sensor_msgs.msg import Image, CameraInfo | |
| from image_geometry import PinholeCameraModel | |
| from pyzbar import pyzbar | |
| import face_recognition as fr | |
| # ROS Transforms imports | |
| from geometry_msgs.msg import TransformStamped | |
| from tf2_ros import TransformBroadcaster | |
| # Словарь с подготовленными энкодингами лиц из файлов | |
| faces = {"einstein": fr.face_encodings(fr.load_image_file("einstein.png"))[0], | |
| "jack": fr.face_encodings(fr.load_image_file("jack.png"))[0], | |
| "sherlock": fr.face_encodings(fr.load_image_file("sherlock.png"))[0], | |
| "silverhand": fr.face_encodings(fr.load_image_file("silverhand.png"))[0]} | |
| report = {} # Словарь для хранения отчета | |
| rospy.init_node('flight') # Инициализация ноды ROS | |
| # Прокси к сервисам ROS (полет + светодиодная лента) | |
| get_telemetry = rospy.ServiceProxy('get_telemetry', srv.GetTelemetry) | |
| navigate = rospy.ServiceProxy('navigate', srv.Navigate) | |
| set_effect = rospy.ServiceProxy('led/set_effect', srv.SetLEDEffect) | |
| land = rospy.ServiceProxy('land', Trigger) | |
| bridge = CvBridge() # Мост для изображений между ROS и OpenCV | |
| # Информация о камере | |
| cam_geometry = PinholeCameraModel() | |
| cam_geometry.fromCameraInfo(rospy.wait_for_message('main_camera/camera_info', CameraInfo)) | |
| def navigate_wait(x=0, y=0, z=0, yaw=float('nan'), speed=5, frame_id='aruco_map', auto_arm=False, tolerance=0.2): | |
| """Функция из документации Клевера для полета с ожиданием""" | |
| navigate(x=x, y=y, z=z, yaw=yaw, speed=speed, frame_id=frame_id, auto_arm=auto_arm) | |
| while not rospy.is_shutdown(): | |
| telem = get_telemetry(frame_id='navigate_target') | |
| if math.sqrt(telem.x ** 2 + telem.y ** 2 + telem.z ** 2) < tolerance: | |
| break | |
| rospy.sleep(0.2) | |
| def wait_qr(): | |
| """Считывание QR-кода на текущем месте с ожиданием""" | |
| qr_code = [''] # list для доступа внутри вложенной функции | |
| def callback(data): | |
| cv_image = bridge.imgmsg_to_cv2(data, 'bgr8') | |
| barcodes = pyzbar.decode(cv_image) | |
| for barcode in barcodes: | |
| qr_code[0] = barcode.data.encode("utf-8") | |
| qr_sub = rospy.Subscriber('main_camera/image_raw', Image, callback) # Подписчик на топик с изображением | |
| while not qr_code[0]: # Ожидание появления данных в переменной (от callback(data) функции, в парал. потоке) | |
| rospy.sleep(0.1) | |
| qr_sub.unregister() # Отписка от топика | |
| return qr_code[0] | |
| def go_to_dronepoint(color_to): | |
| """Реализация поиска дронпоинта с неизвестным местоположением""" | |
| def image_callback(data): | |
| """Преобразует центральный пиксель цветного квадрата в луч в пространстве и создает новую СК""" | |
| def send_tarnsform(vector): | |
| br = TransformBroadcaster() | |
| t = TransformStamped() | |
| t.header.stamp = rospy.Time.now() | |
| t.header.frame_id = cam_geometry.tfFrame() | |
| t.child_frame_id = 'landing_place' | |
| t.transform.translation.x = vector[0] | |
| t.transform.translation.y = vector[1] | |
| t.transform.translation.z = 0 | |
| t.transform.rotation.x = 0 | |
| t.transform.rotation.y = 0 | |
| t.transform.rotation.z = 0 | |
| t.transform.rotation.w = 1 | |
| br.sendTransform(t) | |
| cv_image = bridge.imgmsg_to_cv2(data, 'bgr8') | |
| if color_to == 'red': | |
| red = cv2.inRange(cv_image, (0, 0, 200), (10, 10, 255)) | |
| elif color_to == 'yellow': | |
| red = cv2.inRange(cv_image, (0, 100, 100), (10, 255, 255)) | |
| elif color_to == 'blue': | |
| red = cv2.inRange(cv_image, (200, 0, 0), (255, 10, 10)) | |
| moments = cv2.moments(red, 1) | |
| dM01 = moments['m01'] | |
| dM10 = moments['m10'] | |
| dArea = moments['m00'] | |
| if dArea > 100: | |
| x = int(dM10 / dArea) | |
| y = int(dM01 / dArea) | |
| send_tarnsform(cam_geometry.projectPixelTo3dRay((x, y))) | |
| # cv2.imshow('red', red) | |
| # cv2.waitKey(1) | |
| # Подписываемся на изображение с камеры, внутри callback происходит создание новой СК и летим в ноль отн. нее | |
| image_sub = rospy.Subscriber('main_camera/image_raw', Image, image_callback) | |
| rospy.sleep(1) | |
| navigate_wait(x=0, y=0, z=0, yaw=float('nan'), speed=1, frame_id='landing_place', tolerance=0.1) | |
| rospy.sleep(5) | |
| image_sub.unregister() | |
| def led_on(color_str=''): | |
| """Зажигает ленту соотв. цветом""" | |
| if color_str == 'blue': | |
| set_effect(r=0, g=0, b=255) | |
| elif color_str == 'green': | |
| set_effect(r=0, g=255, b=0) | |
| elif color_str == 'yellow': | |
| set_effect(r=255, g=255, b=0) | |
| elif color_str == 'red': | |
| set_effect(r=255, g=0, b=0) | |
| else: | |
| set_effect(r=0, g=0, b=0) | |
| def land_here(color_str, face): | |
| """Посадка на текущем месте с зажженной светодиодной лентой, сохранением данных для отчета и послед. взлетом""" | |
| global report | |
| telem = get_telemetry('aruco_map') | |
| led_on(color_str) | |
| land() | |
| if telem.y > 5 and telem.x < 5: | |
| report["D1"] = "{}, delivered to {}".format(color_str, face) | |
| print("D1 delivered to", face) | |
| if telem.y > 5 and telem.x > 5: | |
| report["D2"] = "{}, delivered to {}".format(color_str, face) | |
| print("D2 delivered to", face) | |
| if telem.y < 5 and telem.x > 5: | |
| report["D3"] = "{}, delivered to {}".format(color_str, face) | |
| print("D3 delivered to", face) | |
| rospy.sleep(10) | |
| led_on() | |
| navigate_wait(0, 0, 2, frame_id='body', auto_arm=True) | |
| def check_face(): | |
| """Подлетает к лицу (+1м) и возвращает имя (из словаря faces)""" | |
| telem = get_telemetry('aruco_map') | |
| navigate_wait(telem.x + 1, telem.y, telem.z) | |
| navigate_wait(telem.x + 1, telem.y, 1) | |
| cv_image = bridge.imgmsg_to_cv2(rospy.wait_for_message('main_camera/image_raw', Image), 'bgr8') | |
| face_encodings = fr.face_encodings(cv_image, fr.face_locations(cv_image))[0] | |
| results = fr.compare_faces(faces.values(), face_encodings) | |
| navigate_wait(telem.x + 1, telem.y, telem.z) | |
| navigate_wait(telem.x, telem.y, telem.z) | |
| return faces.keys()[results.index(True)] | |
| led_on() # Отключаем ленту | |
| # Взлет | |
| navigate_wait(0, 0, 2, frame_id='body', auto_arm=True) | |
| navigate_wait(0, 0, 1, frame_id='aruco_map') | |
| # Считывание и парсинг QR-кода | |
| qr_str = wait_qr() | |
| print(qr_str) | |
| qr = qr_str.split(',') | |
| # Проходим по каждому цвету в QR | |
| for color in qr: | |
| navigate_wait(5, 5, 4, frame_id='aruco_map') # Летим на центр поля, высота достаточна, чтобы видеть все дронпоинты | |
| go_to_dronepoint(color) # Летим к дронпоинту этого цвета | |
| rospy.sleep(2) | |
| # print("on_point") | |
| telem = get_telemetry('aruco_map') | |
| navigate_wait(telem.x, telem.y, 2) # Занимеаем высоту 2м | |
| face = check_face() # Сохраняем имя получателя | |
| land_here(color, face) # Посадка + сохранения данных | |
| # Полет в точку старта | |
| navigate_wait(0, 0, 1, frame_id='aruco_map') | |
| land() | |
| # Автоматичсекий отчет | |
| print() | |
| print("QR:", qr_str) | |
| print("D1 =", report["D1"]) | |
| print("D2 =", report["D2"]) | |
| print("D3 =", report["D3"]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment