Created
May 7, 2017 22:11
-
-
Save Duroktar/b1385aa0951e13d3f041e49745f53993 to your computer and use it in GitHub Desktop.
Simple file watcher I wrote for future transcription to golang
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| import os | |
| import sys | |
| import subprocess | |
| import time | |
| class Watcher: | |
| _process = None | |
| _history = {} | |
| def __init__(self, command, *targets): | |
| self.command = command | |
| self.targets = targets | |
| def start(self): | |
| self.spawn_process() | |
| def initialize(self): | |
| for source in self.targets: | |
| last_modified = os.path.getmtime(source) | |
| self._history[source] = last_modified | |
| def update(self): | |
| for source in self.targets: | |
| if os.path.getmtime(source) != self._history[source]: | |
| print("-" * 25) | |
| print("<%s> modified. Reloading." % source) | |
| self.spawn_process() | |
| def spawn_process(self): | |
| if self._process is not None and self._process.poll(): | |
| self.kill_process() | |
| kwargs = {"shell": True, "start_new_session": True} | |
| self.initialize() | |
| print("Spawning process") | |
| print("-" * 25) | |
| self._process = subprocess.Popen([self.command], **kwargs) | |
| def kill_process(self): | |
| if self._process is None: | |
| return | |
| print("-" * 25) | |
| print("Killing process") | |
| self._process.terminate() | |
| self._process.wait() | |
| self._process = None | |
| def main_loop(): | |
| watcher = Watcher(sys.argv[1], *sys.argv[2:]) | |
| watcher.start() | |
| try: | |
| while 1: | |
| try: | |
| watcher.update() | |
| except Exception as e: | |
| print(e) | |
| return 1 | |
| else: | |
| time.sleep(1) | |
| except KeyboardInterrupt: | |
| watcher.kill_process() | |
| print("Shutting down goreloader.") | |
| return 0 | |
| def check_args(): | |
| if len(sys.argv) < 3: | |
| if len(sys.argv) < 2: | |
| print("No target file!") | |
| print("No command given!") | |
| sys.exit(1) | |
| fail = False | |
| for each in sys.argv[2:]: | |
| if not os.path.exists(each): | |
| fail = True | |
| print("Target <%s> doesn't exist!" % each) | |
| if fail: | |
| sys.exit(1) | |
| if __name__ == '__main__': | |
| check_args() | |
| try: | |
| sys.exit(main_loop()) | |
| except Exception as e: | |
| print("\ngoreloader halted abruptly. Check " | |
| "tasks for hanging threads..\n") | |
| print(e) | |
| sys.exit(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment