Last active
March 10, 2017 17:15
-
-
Save zemogle/770fd9805e4811bf4e019e311fd84752 to your computer and use it in GitHub Desktop.
Timelapse creation from LCO data
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| *.pyc | |
| secrets.py |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import settings | |
| import os | |
| import click | |
| token = settings.ARCHIVE_TOKEN | |
| headers = {'Authorization': 'Token {}'.format(token)} | |
| def find_reqnums(tracknum): | |
| def find_frames_object(request): | |
| ''' | |
| user_reqs: Full User Request dict, or list of dictionaries, containing individual observation requests | |
| header: provide auth token from the request API so we don't need to get it twice | |
| ''' | |
| frames = [] | |
| frame_urls = [] | |
| archive_headers = get_headers('A') | |
| url = '{}frames/?RLEVEL=&REQNUM={}'.format(settings.ARCHIVE_URL, last_update, supernova.name) | |
| response = requests.get(url, headers=archive_headers).json() | |
| frames = response['results'] | |
| logger.debug("Found {} frames".format(len(frames))) | |
| if not response: | |
| # No frames for this object since last update | |
| return None | |
| for frame in frames: | |
| logger.debug("Looking for frame {}".format(frame['id'])) | |
| thumbnail_url = "{}{}/?width=1000&height=1000&label={}".format(settings.THUMBNAIL_URL, frame['id'], date_obs.strftime("%d %b %Y %H:%M")) | |
| try: | |
| resp = requests.get(thumbnail_url, headers=archive_headers) | |
| frame_urls.append({'id':str(frame['id']), 'url':resp.json()['url'],'date_obs':date_obs}) | |
| except ValueError: | |
| logger.debug("Failed to get thumbnail URL for %s - %s" % (frame_id, resp.status_code)) | |
| logger.debug("Total frames=%s" % (len(frames))) | |
| return frame_urls, last_update | |
| def find_frames(reqnum): | |
| ''' | |
| user_reqs: Full User Request dict, or list of dictionaries, containing individual observation requests | |
| header: provide auth token from the request API so we don't need to get it twice | |
| ''' | |
| frames = [] | |
| logger.debug("User request: %s" % reqnum) | |
| headers = get_headers('A') | |
| for req in user_reqs: | |
| url = '{}frames/?REQNUM={}'.format(settings.ARCHIVE_URL, req) | |
| resp = requests.get(url, headers=headers).json() | |
| if resp.get('detail',''): | |
| logger.error("Connection problem: {}".format(resp['detail'])) | |
| continue | |
| if resp['count'] > 0: | |
| frames += [f['id'] for f in resp['results']] | |
| logger.debug('Frames %s' % len(frames)) | |
| return frames | |
| def get_thumbnails(frames): | |
| headers = get_headers(mode='A') | |
| frame_urls = [] | |
| for frame_id in frames: | |
| thumbnail_url = "{}{}/?width=1000&height=1000".format(settings.THUMBNAIL_URL, frame_id['id']) | |
| try: | |
| resp = requests.get(thumbnail_url, headers=headers) | |
| frame_urls.append({'id':str(frame_id), 'url':resp.json()['url']}) | |
| except ValueError: | |
| logger.debug("Failed to get thumbnail URL for %s - %s" % (frame_id, resp.content)) | |
| logger.debug("Total frames=%s calibrated=%s" % (len(frames), len(frame_urls))) | |
| return frame_urls | |
| def download_frames(supernova_name, frames, download_dir): | |
| current_files = glob.glob(download_dir+"*.jpg") | |
| for frame in frames: | |
| frame_date = frame['date_obs'].strftime("%Y%m%d%H%M%S") | |
| file_name = '%s_%s.jpg' % (supernova_name, frame_date) | |
| full_filename = os.path.join(download_dir, file_name) | |
| if full_filename in current_files: | |
| logger.debug("Frame {} already present".format(file_name)) | |
| continue | |
| with open(full_filename, "wb") as f: | |
| logger.debug("Downloading %s" % file_name) | |
| response = requests.get(frame['url'], stream=True) | |
| logger.debug(frame['url']) | |
| if response.status_code != 200: | |
| logger.debug('Failed to download: %s' % response.status_code) | |
| return False | |
| total_length = response.headers.get('content-length') | |
| if total_length is None: | |
| f.write(response.content) | |
| else: | |
| for data in response.iter_content(): | |
| f.write(data) | |
| f.close() | |
| return True | |
| def make_timelapse(request): | |
| logger.debug('Making timelapse for %s' % reqnum) | |
| path = "%s_*.jpg" % os.path.join(settings.MEDIA_ROOT,reqnum) | |
| files = glob.glob(path) | |
| if len(files) > 0: | |
| outfile = '%s%s.mp4' % (settings.MEDIA_ROOT) | |
| video_options = "ffmpeg -framerate 10 -pattern_type glob -i '{}' -vf 'scale=2*iw:-1, crop=iw/2:ih/2' -s 696x520 -vcodec libx264 -pix_fmt yuv420p {} -y".format(path, outfile) | |
| subprocess.call(video_options, shell=True) | |
| return len(files) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ARCHIVE_API_URL = 'https://archive-api.lco.global/frames/' |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment