Skip to content

Instantly share code, notes, and snippets.

@Duroktar
Created May 7, 2017 22:11
Show Gist options
  • Select an option

  • Save Duroktar/b1385aa0951e13d3f041e49745f53993 to your computer and use it in GitHub Desktop.

Select an option

Save Duroktar/b1385aa0951e13d3f041e49745f53993 to your computer and use it in GitHub Desktop.
Simple file watcher I wrote for future transcription to golang
#!/usr/bin/env python
import os
import sys
import subprocess
import time
class Watcher:
_process = None
_history = {}
def __init__(self, command, *targets):
self.command = command
self.targets = targets
def start(self):
self.spawn_process()
def initialize(self):
for source in self.targets:
last_modified = os.path.getmtime(source)
self._history[source] = last_modified
def update(self):
for source in self.targets:
if os.path.getmtime(source) != self._history[source]:
print("-" * 25)
print("<%s> modified. Reloading." % source)
self.spawn_process()
def spawn_process(self):
if self._process is not None and self._process.poll():
self.kill_process()
kwargs = {"shell": True, "start_new_session": True}
self.initialize()
print("Spawning process")
print("-" * 25)
self._process = subprocess.Popen([self.command], **kwargs)
def kill_process(self):
if self._process is None:
return
print("-" * 25)
print("Killing process")
self._process.terminate()
self._process.wait()
self._process = None
def main_loop():
watcher = Watcher(sys.argv[1], *sys.argv[2:])
watcher.start()
try:
while 1:
try:
watcher.update()
except Exception as e:
print(e)
return 1
else:
time.sleep(1)
except KeyboardInterrupt:
watcher.kill_process()
print("Shutting down goreloader.")
return 0
def check_args():
if len(sys.argv) < 3:
if len(sys.argv) < 2:
print("No target file!")
print("No command given!")
sys.exit(1)
fail = False
for each in sys.argv[2:]:
if not os.path.exists(each):
fail = True
print("Target <%s> doesn't exist!" % each)
if fail:
sys.exit(1)
if __name__ == '__main__':
check_args()
try:
sys.exit(main_loop())
except Exception as e:
print("\ngoreloader halted abruptly. Check "
"tasks for hanging threads..\n")
print(e)
sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment