Skip to content

Instantly share code, notes, and snippets.

@bart02
Last active January 27, 2022 05:59
Show Gist options
  • Select an option

  • Save bart02/99dac65fcb9feaa95594d965e7c72110 to your computer and use it in GitHub Desktop.

Select an option

Save bart02/99dac65fcb9feaa95594d965e7c72110 to your computer and use it in GitHub Desktop.
Innopolis Open Robotics iUAV 2021
# -*- coding: utf-8 -*-
from __future__ import print_function
import math
# Clover imports
import rospy
from clover import srv
from std_srvs.srv import Trigger
# Computer vision imports
import cv2
from cv_bridge import CvBridge
from sensor_msgs.msg import Image, CameraInfo
from image_geometry import PinholeCameraModel
from pyzbar import pyzbar
# ROS Transforms imports
from geometry_msgs.msg import TransformStamped
from tf2_ros import TransformBroadcaster
rospy.init_node('flight') # Инициализация ноды ROS
# Прокси к сервисам ROS (полет + светодиодная лента)
get_telemetry = rospy.ServiceProxy('get_telemetry', srv.GetTelemetry)
navigate = rospy.ServiceProxy('navigate', srv.Navigate)
set_effect = rospy.ServiceProxy('led/set_effect', srv.SetLEDEffect)
land = rospy.ServiceProxy('land', Trigger)
bridge = CvBridge() # Мост для изображений между ROS и OpenCV
# Информация о камере
cam_geometry = PinholeCameraModel()
cam_geometry.fromCameraInfo(rospy.wait_for_message('main_camera/camera_info', CameraInfo))
def navigate_wait(x=0.0, y=0.0, z=0.0, yaw=float('nan'), speed=5, frame_id='', auto_arm=False, tolerance=0.2):
"""Функция из документации Клевера для полета с ожиданием"""
navigate(x=x, y=y, z=z, yaw=yaw, speed=speed, frame_id=frame_id, auto_arm=auto_arm)
while not rospy.is_shutdown():
telem = get_telemetry(frame_id='navigate_target')
if math.sqrt(telem.x ** 2 + telem.y ** 2 + telem.z ** 2) < tolerance:
break
rospy.sleep(0.2)
def wait_qr():
"""Считывание QR-кода на текущем месте с ожиданием"""
qr_code = [''] # list для доступа внутри вложенной функции
def callback(data):
cv_image = bridge.imgmsg_to_cv2(data, 'bgr8')
barcodes = pyzbar.decode(cv_image)
for barcode in barcodes:
qr_code[0] = barcode.data.encode("utf-8")
qr_sub = rospy.Subscriber('main_camera/image_raw', Image, callback) # Подписчик на топик с изображением
while not qr_code[0]: # Ожидание появления данных в переменной (от callback(data) функции, в парал. потоке)
rospy.sleep(0.1)
qr_sub.unregister() # Отписка от топика
return qr_code[0]
def check_color():
"""Возвращает цвет, пикселей которого больше всего в текущем кадре камеры"""
cv_image = bridge.imgmsg_to_cv2(rospy.wait_for_message('main_camera/image_raw', Image), 'bgr8')
count = {'blue': cv2.countNonZero(cv2.inRange(cv_image, (200, 0, 0), (255, 10, 10))),
'green': cv2.countNonZero(cv2.inRange(cv_image, (0, 200, 0), (10, 255, 10))),
'yellow': cv2.countNonZero(cv2.inRange(cv_image, (0, 100, 100), (10, 255, 255)))}
# print(c)
return max(count, key=count.get)
def land_on_dronepoint():
"""Реализация посадки на дронпоинт с неизвестным местоположением"""
def image_callback(data):
def send_tarnsform(vector):
"""Создание новой СК относительно дронпоинта"""
br = TransformBroadcaster()
t = TransformStamped()
t.header.stamp = rospy.Time.now()
t.header.frame_id = cam_geometry.tfFrame()
t.child_frame_id = 'landing_place'
t.transform.translation.x = vector[0]
t.transform.translation.y = vector[1]
t.transform.translation.z = vector[2]
t.transform.rotation.x = 0
t.transform.rotation.y = 0
t.transform.rotation.z = 0
t.transform.rotation.w = 1
br.sendTransform(t)
cv_image = bridge.imgmsg_to_cv2(data, 'bgr8')
red = cv2.inRange(cv_image, (0, 0, 200), (10, 10, 255))
moments = cv2.moments(red, 1)
dM01 = moments['m01']
dM10 = moments['m10']
dArea = moments['m00']
if dArea > 100:
x = int(dM10 / dArea)
y = int(dM01 / dArea)
send_tarnsform(cam_geometry.projectPixelTo3dRay((x, y)))
# cv2.imshow('red', red)
# cv2.waitKey(1)
# Подписываемся на изображение с камеры, внутри callback происходит создание новой СК и летим в ноль отн. нее
image_sub = rospy.Subscriber('main_camera/image_raw', Image, image_callback)
rospy.sleep(1)
navigate(x=0, y=0, z=-0.8, yaw=float('nan'), speed=0.5, frame_id='landing_place')
rospy.sleep(20)
image_sub.unregister()
def led_on(color_str=''):
"""Зажигает ленту соотв. цветом"""
if color_str == 'blue':
set_effect(r=0, g=0, b=255)
elif color_str == 'green':
set_effect(r=0, g=255, b=0)
elif color_str == 'yellow':
set_effect(r=255, g=255, b=0)
elif color_str == 'red':
set_effect(r=255, g=0, b=0)
else:
set_effect(r=0, g=0, b=0)
def land_here(color_str):
"""Посадка на текущем месте с зажженной светодиодной лентой и послед. взлетом"""
led_on(color_str)
land()
rospy.sleep(10)
led_on()
navigate_wait(0, 0, 2, frame_id='body', auto_arm=True)
dronepoints = [(8, 2), (3, 6), (1, 2)] # Известные координаты трех дронпоинтов
colors = ['' for _ in range(len(dronepoints))] # Массив для хранения цветов дронпоинтов
led_on() # Отключаем ленту
# Взлет
navigate_wait(0, 0, 2, frame_id='body', auto_arm=True)
navigate_wait(0, 0, 1.5, frame_id='aruco_map')
# Считывание и парсинг QR-кода
qr_str = wait_qr()
print(qr_str)
qr = qr_str.split(',')
cur = 0 # Счетчик посещенных дронпоинтов
# Первый цикл. Проходимся по всем дронпоинтам, если цвет совпал, садимся.
for (i, dp) in enumerate(dronepoints):
navigate_wait(dp[0], dp[1], 2, frame_id='aruco_map')
colors[i] = check_color()
# print(colors[i])
if colors[i] == qr[cur]:
print("D{}_Delivery".format(i + 1))
land_here(colors[i])
qr[cur] = None
cur += 1
# Второй цикл. Проходимся по оставшимся дронпоинтам.
for left in qr:
if left is not None and left != 'red':
ind = colors.index(left)
dp = dronepoints[ind]
navigate_wait(dp[0], dp[1], 2, frame_id='aruco_map')
print("D{}_Delivery".format(ind + 1))
land_here(left)
navigate_wait(7, 7, 3, frame_id='aruco_map') # Полет в точку начала поиска 4-го дронпоинта
# Посадка на 4-м дронпоинте
led_on('red')
print("D4_Delivery")
land_on_dronepoint()
led_on()
# Полет в точку старта и посадка
navigate_wait(0, 0, 1, frame_id='body', auto_arm=True)
navigate_wait(0, 0, 2, frame_id='aruco_map')
land()
# Автоматический отчет
print("QR:", qr_str)
for i, c in enumerate(colors):
print("D{} = {}".format(i + 1, c))
# -*- coding: utf-8 -*-
from __future__ import print_function
import math
# Clover imports
import rospy
from clover import srv
from std_srvs.srv import Trigger
# Computer vision imports
import cv2
from cv_bridge import CvBridge
from sensor_msgs.msg import Image, CameraInfo
from image_geometry import PinholeCameraModel
from pyzbar import pyzbar
import face_recognition as fr
# ROS Transforms imports
from geometry_msgs.msg import TransformStamped
from tf2_ros import TransformBroadcaster
# Словарь с подготовленными энкодингами лиц из файлов
faces = {"einstein": fr.face_encodings(fr.load_image_file("einstein.png"))[0],
"jack": fr.face_encodings(fr.load_image_file("jack.png"))[0],
"sherlock": fr.face_encodings(fr.load_image_file("sherlock.png"))[0],
"silverhand": fr.face_encodings(fr.load_image_file("silverhand.png"))[0]}
report = {} # Словарь для хранения отчета
rospy.init_node('flight') # Инициализация ноды ROS
# Прокси к сервисам ROS (полет + светодиодная лента)
get_telemetry = rospy.ServiceProxy('get_telemetry', srv.GetTelemetry)
navigate = rospy.ServiceProxy('navigate', srv.Navigate)
set_effect = rospy.ServiceProxy('led/set_effect', srv.SetLEDEffect)
land = rospy.ServiceProxy('land', Trigger)
bridge = CvBridge() # Мост для изображений между ROS и OpenCV
# Информация о камере
cam_geometry = PinholeCameraModel()
cam_geometry.fromCameraInfo(rospy.wait_for_message('main_camera/camera_info', CameraInfo))
def navigate_wait(x=0, y=0, z=0, yaw=float('nan'), speed=5, frame_id='aruco_map', auto_arm=False, tolerance=0.2):
"""Функция из документации Клевера для полета с ожиданием"""
navigate(x=x, y=y, z=z, yaw=yaw, speed=speed, frame_id=frame_id, auto_arm=auto_arm)
while not rospy.is_shutdown():
telem = get_telemetry(frame_id='navigate_target')
if math.sqrt(telem.x ** 2 + telem.y ** 2 + telem.z ** 2) < tolerance:
break
rospy.sleep(0.2)
def wait_qr():
"""Считывание QR-кода на текущем месте с ожиданием"""
qr_code = [''] # list для доступа внутри вложенной функции
def callback(data):
cv_image = bridge.imgmsg_to_cv2(data, 'bgr8')
barcodes = pyzbar.decode(cv_image)
for barcode in barcodes:
qr_code[0] = barcode.data.encode("utf-8")
qr_sub = rospy.Subscriber('main_camera/image_raw', Image, callback) # Подписчик на топик с изображением
while not qr_code[0]: # Ожидание появления данных в переменной (от callback(data) функции, в парал. потоке)
rospy.sleep(0.1)
qr_sub.unregister() # Отписка от топика
return qr_code[0]
def go_to_dronepoint(color_to):
"""Реализация поиска дронпоинта с неизвестным местоположением"""
def image_callback(data):
"""Преобразует центральный пиксель цветного квадрата в луч в пространстве и создает новую СК"""
def send_tarnsform(vector):
br = TransformBroadcaster()
t = TransformStamped()
t.header.stamp = rospy.Time.now()
t.header.frame_id = cam_geometry.tfFrame()
t.child_frame_id = 'landing_place'
t.transform.translation.x = vector[0]
t.transform.translation.y = vector[1]
t.transform.translation.z = 0
t.transform.rotation.x = 0
t.transform.rotation.y = 0
t.transform.rotation.z = 0
t.transform.rotation.w = 1
br.sendTransform(t)
cv_image = bridge.imgmsg_to_cv2(data, 'bgr8')
if color_to == 'red':
red = cv2.inRange(cv_image, (0, 0, 200), (10, 10, 255))
elif color_to == 'yellow':
red = cv2.inRange(cv_image, (0, 100, 100), (10, 255, 255))
elif color_to == 'blue':
red = cv2.inRange(cv_image, (200, 0, 0), (255, 10, 10))
moments = cv2.moments(red, 1)
dM01 = moments['m01']
dM10 = moments['m10']
dArea = moments['m00']
if dArea > 100:
x = int(dM10 / dArea)
y = int(dM01 / dArea)
send_tarnsform(cam_geometry.projectPixelTo3dRay((x, y)))
# cv2.imshow('red', red)
# cv2.waitKey(1)
# Подписываемся на изображение с камеры, внутри callback происходит создание новой СК и летим в ноль отн. нее
image_sub = rospy.Subscriber('main_camera/image_raw', Image, image_callback)
rospy.sleep(1)
navigate_wait(x=0, y=0, z=0, yaw=float('nan'), speed=1, frame_id='landing_place', tolerance=0.1)
rospy.sleep(5)
image_sub.unregister()
def led_on(color_str=''):
"""Зажигает ленту соотв. цветом"""
if color_str == 'blue':
set_effect(r=0, g=0, b=255)
elif color_str == 'green':
set_effect(r=0, g=255, b=0)
elif color_str == 'yellow':
set_effect(r=255, g=255, b=0)
elif color_str == 'red':
set_effect(r=255, g=0, b=0)
else:
set_effect(r=0, g=0, b=0)
def land_here(color_str, face):
"""Посадка на текущем месте с зажженной светодиодной лентой, сохранением данных для отчета и послед. взлетом"""
global report
telem = get_telemetry('aruco_map')
led_on(color_str)
land()
if telem.y > 5 and telem.x < 5:
report["D1"] = "{}, delivered to {}".format(color_str, face)
print("D1 delivered to", face)
if telem.y > 5 and telem.x > 5:
report["D2"] = "{}, delivered to {}".format(color_str, face)
print("D2 delivered to", face)
if telem.y < 5 and telem.x > 5:
report["D3"] = "{}, delivered to {}".format(color_str, face)
print("D3 delivered to", face)
rospy.sleep(10)
led_on()
navigate_wait(0, 0, 2, frame_id='body', auto_arm=True)
def check_face():
"""Подлетает к лицу (+1м) и возвращает имя (из словаря faces)"""
telem = get_telemetry('aruco_map')
navigate_wait(telem.x + 1, telem.y, telem.z)
navigate_wait(telem.x + 1, telem.y, 1)
cv_image = bridge.imgmsg_to_cv2(rospy.wait_for_message('main_camera/image_raw', Image), 'bgr8')
face_encodings = fr.face_encodings(cv_image, fr.face_locations(cv_image))[0]
results = fr.compare_faces(faces.values(), face_encodings)
navigate_wait(telem.x + 1, telem.y, telem.z)
navigate_wait(telem.x, telem.y, telem.z)
return faces.keys()[results.index(True)]
led_on() # Отключаем ленту
# Взлет
navigate_wait(0, 0, 2, frame_id='body', auto_arm=True)
navigate_wait(0, 0, 1, frame_id='aruco_map')
# Считывание и парсинг QR-кода
qr_str = wait_qr()
print(qr_str)
qr = qr_str.split(',')
# Проходим по каждому цвету в QR
for color in qr:
navigate_wait(5, 5, 4, frame_id='aruco_map') # Летим на центр поля, высота достаточна, чтобы видеть все дронпоинты
go_to_dronepoint(color) # Летим к дронпоинту этого цвета
rospy.sleep(2)
# print("on_point")
telem = get_telemetry('aruco_map')
navigate_wait(telem.x, telem.y, 2) # Занимеаем высоту 2м
face = check_face() # Сохраняем имя получателя
land_here(color, face) # Посадка + сохранения данных
# Полет в точку старта
navigate_wait(0, 0, 1, frame_id='aruco_map')
land()
# Автоматичсекий отчет
print()
print("QR:", qr_str)
print("D1 =", report["D1"])
print("D2 =", report["D2"])
print("D3 =", report["D3"])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment