Skip to content

Instantly share code, notes, and snippets.

@aswinkumar1999
Created May 20, 2023 06:41
Show Gist options
  • Select an option

  • Save aswinkumar1999/5e08b3e012671151ae220b980142e5fe to your computer and use it in GitHub Desktop.

Select an option

Save aswinkumar1999/5e08b3e012671151ae220b980142e5fe to your computer and use it in GitHub Desktop.
Tracks flights over IIT Madras and sends a notification to a mobile phone.
# Track flights over IIT Madras using the OpenSky API
# and send a notification via IFTTT
# Import required libraries
from bs4 import BeautifulSoup # For HTML parsing
from selenium import webdriver # For web scraping
import requests # For making HTTP requests
from selenium.webdriver.common.keys import Keys # For keyboard actions in Selenium
from opensky_api import OpenSkyApi # For accessing the OpenSky Network API
import re # For regular expressions
import wget # For downloading files
import os # For operating system related tasks
import time # For adding delays
from ifttt_webhook import IftttWebhook # For making webhook requests to IFTTT
# Webhooks API
# The key is read from the IFTTT_KEY environment variable when it is set, so
# the secret does not have to be stored in this file. The literal below is
# only the fallback used when the variable is absent.
IFTTT_KEY = os.environ.get('IFTTT_KEY', '###############################')  # IFTTT API key
ifttt = IftttWebhook(IFTTT_KEY)  # Initialize IFTTT webhook
api = OpenSkyApi()  # Initialize OpenSky API
# Patterns used to scrape the flight page. Raw strings keep the regex escapes
# from being interpreted as (invalid) Python string escapes.
_TARGETING_RE = re.compile(
    r"addService\(googletag.pubads\(\)\)(.*).setTargeting\('userclass', ''\)")
_TITLE_RE = re.compile(r"title(.*)/title")


def _parse_flight_page(page_text):
    """Extract flight details from the text of a flight page.

    Args:
        page_text: The page markup as a single string.

    Returns:
        A dict with one entry per ``.setTargeting('key', 'value')`` call found
        in the page, plus ``'flight_number'`` and ``'airline_name'`` taken
        from the page title.

    Raises:
        ValueError: If the targeting call chain or the title is not found.
    """
    air_dict = {}

    targeting = _TARGETING_RE.search(page_text)
    if targeting is None:
        raise ValueError("setTargeting block not found in flight page")
    # The capture is a chain of ".setTargeting('key', 'value')" calls. The text
    # before the first call is dropped; each remaining piece looks like
    # "'key', 'value')".
    for call in targeting.group(1).split(".setTargeting(")[1:]:
        # Split only on the first ", " so values containing commas stay whole.
        parts = call[:-1].split(", ", 1)
        if len(parts) != 2:
            continue  # Not a two-argument call; nothing to record
        # [1:-1] strips the surrounding quote characters.
        air_dict[parts[0][1:-1]] = parts[1][1:-1]

    title = _TITLE_RE.search(page_text)
    if title is None:
        raise ValueError("title not found in flight page")
    # NOTE(review): the fixed offsets presumably trim the markup before the
    # flight number and the site suffix after the airline name - confirm
    # against the current page title format.
    words = title.group(1)[4:-46].split(" ")
    air_dict['flight_number'] = words[0]
    # Titles with fewer than three words carry no airline name.
    air_dict['airline_name'] = words[2] if len(words) > 2 else "Private Jet"
    return air_dict


def push_notification(call_sign):
    """Fetch the FlightAware page for a callsign and return its flight details.

    Despite its name, this function sends nothing; it only gathers the data
    that the caller puts into the notification. The page is downloaded with
    wget, rendered from the local copy in headless Chrome, and then parsed.

    Args:
        call_sign: Flight callsign, appended to the FlightAware URL.

    Returns:
        A dict of flight details (see ``_parse_flight_page``).

    Raises:
        ValueError: If the expected data is not present in the page.
    """
    import pathlib  # Local import: not part of the file's top-level imports

    # Construct URL for flight details
    url = 'https://uk.flightaware.com/live/flight/' + call_sign
    print(url)
    # Use the filename wget reports instead of assuming a fixed directory and
    # name, so the script works from any working directory.
    downloaded = wget.download(url)  # Download the webpage
    try:
        options = webdriver.ChromeOptions()
        options.add_argument('headless')  # Run Chrome in headless mode
        options.add_argument('window-size=1200x600')
        browser = webdriver.Chrome(options=options)  # Launch Chrome browser
        try:
            browser.get(pathlib.Path(downloaded).resolve().as_uri())  # Load the local copy
            time.sleep(1)  # Wait for page to load
            html = browser.page_source  # Get page source
        finally:
            # quit() ends the whole driver session even when loading failed.
            browser.quit()
        # Name the parser explicitly so the result does not depend on which
        # optional parsers happen to be installed.
        main_page_content = BeautifulSoup(html, "html.parser")
        return _parse_flight_page(str(main_page_content))
    finally:
        # Always remove the downloaded file, including when parsing fails.
        if os.path.exists(downloaded):
            os.remove(downloaded)
# Area to watch (Insti):
#   Min 12.982919, 80.196334
#   Max 13.011269, 80.259084
# Passed to get_states as (min_lat, max_lat, min_lon, max_lon).
BBOX = (12.982919, 13.011269, 80.196334, 80.259084)
POLL_INTERVAL_SECONDS = 20  # Delay between two polls
CALLSIGN_TTL_SECONDS = 600  # Forget a callsign after 10 minutes

callsign_list = []  # Callsigns already handled recently
time_list = []  # time_list[i] is when callsign_list[i] was recorded

while True:
    try:
        states = api.get_states(bbox=BBOX)  # Get states within the bounding box
        # NOTE(review): get_states is documented to return None when the
        # request fails - treat that as "no aircraft" for this poll.
        for s in (states.states if states is not None else []):
            callsign = s.callsign
            # Skip aircraft without a usable callsign; there is no flight
            # page to look up for them.
            if not callsign or not callsign.strip():
                continue
            # Check if callsign already present in list
            if callsign in callsign_list:
                continue
            callsign_list.append(callsign)  # Add callsign to list
            time_list.append(time.time())  # Add timestamp to list
            print("callsign: %r, long: %r, lat: %r" % (callsign, s.longitude, s.latitude), end="\t")
            try:
                air_dict = push_notification(callsign.replace(" ", ""))  # Get flight information
            except Exception as exc:
                # The callsign stays recorded so a page that cannot be fetched
                # or parsed is not requested again on every poll. The other
                # aircraft in this poll are still processed.
                print("flight lookup failed for %r: %r" % (callsign, exc))
                continue
            try:
                # Trigger IFTTT webhook with flight information
                ifttt.trigger('flight_above', value1=air_dict.get('airline_name', " ") + " " + air_dict.get('flight_number', " "),
                              value2=air_dict.get('origin_IATA', " ") + "--->" + air_dict.get('destination_IATA', " "),
                              value3=air_dict.get('aircraft_make', " ") + " " + air_dict.get('aircraft_model', " ")
                              + " " + air_dict.get('engine_type', " "))
            except Exception as exc:
                # Forget the entry just added so the notification is retried
                # on the next poll.
                print("notification failed for %r: %r" % (callsign, exc))
                callsign_list.pop()  # Remove callsign from list
                time_list.pop()  # Remove timestamp from list
    except Exception as exc:
        # Only Exception is caught, so Ctrl-C still stops the script.
        print("poll failed: %r" % (exc,))

    # Clean up the lists. Both are rebuilt together rather than popped by
    # index, because popping ascending indices shifts the remaining entries
    # and removes the wrong ones.
    now = time.time()
    fresh = [(c, t) for c, t in zip(callsign_list, time_list)
             if now - t <= CALLSIGN_TTL_SECONDS]
    callsign_list[:] = [c for c, _ in fresh]
    time_list[:] = [t for _, t in fresh]

    time.sleep(POLL_INTERVAL_SECONDS)  # Delay before the next poll
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment