Last active
August 4, 2025 00:02
-
-
Save Wolfterro/dfb791b56262ef779c78698e494182ca to your computer and use it in GitHub Desktop.
A simple media server using Python's http.server library with Accept-Ranges support, specially useful for video content with seek support.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import http.server | |
| import socketserver | |
| import os | |
| import re | |
| import urllib | |
| import sys | |
| import argparse | |
| RANGE_BYTES_RE = re.compile(r'bytes=(\d*)-(\d*)?\Z') | |
| class ThreadingHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer): | |
| pass | |
| class RangeRequestNoCacheHTTPRequestHandler(http.server.SimpleHTTPRequestHandler): | |
| def __init__(self, *args, **kwargs): | |
| arguments = add_args() | |
| directory = arguments.dir | |
| super().__init__(*args, directory=directory, **kwargs) | |
| # overriding | |
| def send_head(self): | |
| if 'Range' not in self.headers: | |
| self.range = None | |
| return super().send_head() | |
| try: | |
| self.range = self._parse_range_bytes(self.headers['Range']) | |
| except ValueError as e: | |
| self.send_error(416, 'Requested Range Not Satisfiable') | |
| print(str(e)) | |
| return None | |
| start, end = self.range | |
| path = self.translate_path(self.path) | |
| if os.path.isdir(path): | |
| parts = urllib.parse.urlsplit(self.path) | |
| print(parts) | |
| if not parts.path.endswith('/'): | |
| self.send_response(301) | |
| new_parts = (parts[0], parts[1], parts[2] + '/', | |
| parts[3], parts[4]) | |
| new_url = urllib.parse.urlunsplit(new_parts) | |
| self.send_header("Location", new_url) | |
| self.end_headers() | |
| return None | |
| for index in "index.html", "index.htm": | |
| index = os.path.join(path, index) | |
| if os.path.exists(index): | |
| path = index | |
| break | |
| try: | |
| f = open(path, 'rb') | |
| except IOError: | |
| self.send_error(404, 'Not Found') | |
| return None | |
| self.send_response(206) | |
| ctype = self.guess_type(path) | |
| self.send_header('Content-type', ctype) | |
| self.send_header('Accept-Ranges', 'bytes') | |
| fs = os.fstat(f.fileno()) | |
| file_len = fs[6] | |
| if start is not None and start >= file_len: | |
| self.send_error(416, 'Requested Range Not Satisfiable') | |
| return None | |
| if end is None or end > file_len: | |
| end = file_len | |
| self.send_header('Content-Range', 'bytes %s-%s/%s' % (start, end - 1, file_len)) | |
| self.send_header('Content-Length', str(end - start)) | |
| self.send_header('Last-Modified', self.date_time_string(fs.st_mtime)) | |
| self.end_headers() | |
| return f | |
| def _parse_range_bytes(self, range_bytes): | |
| if range_bytes == '': | |
| return None, None | |
| m = RANGE_BYTES_RE.match(range_bytes) | |
| if not m: | |
| raise ValueError('Invalid byte range %s' % range_bytes) | |
| if m.group(1) == '': | |
| start = None | |
| else: | |
| start = int(m.group(1)) | |
| if m.group(2) == '': | |
| end = None | |
| else: | |
| end = int(m.group(2)) + 1 | |
| return start, end | |
| # overriding | |
| def end_headers(self): | |
| # Code to disable the browser cache | |
| self.send_header('Cache-Control', 'max-age=0') | |
| self.send_header('Expires', '0') | |
| super().end_headers() | |
| # overriding | |
| def copyfile(self, source, outputfile): | |
| try: | |
| if not self.range: | |
| return super().copyfile(source, outputfile) | |
| start, end = self.range | |
| self._copy_range(source, outputfile, start, end) | |
| except BrokenPipeError: | |
| # When you seek a video on your browser | |
| # The browser interrupts the response reception of the video file | |
| # Because this error will occur | |
| # Ignore this | |
| pass | |
| def _copy_range(self, infile, outfile, start, end): | |
| bufsize = 16 * 1024 | |
| if start is not None: | |
| infile.seek(start) | |
| while True: | |
| size = bufsize | |
| if end is not None: | |
| left = end - infile.tell() | |
| if left < size: | |
| size = left | |
| buf = infile.read(size) | |
| if not buf: | |
| break | |
| outfile.write(buf) | |
| def add_args(): | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument('-p', '--port', help='Seleciona a porta que o servidor irá rodar', required=False, type=int, default=8000) | |
| parser.add_argument('-d', '--dir', help='Seleciona o diretório que o servidor irá servir', required=True, type=str) | |
| return parser.parse_args() | |
| def main(args): | |
| print(">> Rodando servidor na porta {}...".format(args.port)) | |
| http_server = ThreadingHTTPServer(('', args.port), RangeRequestNoCacheHTTPRequestHandler) | |
| http_server.serve_forever() | |
| if __name__ == '__main__': | |
| try: | |
| args = add_args() | |
| main(args) | |
| except KeyboardInterrupt: | |
| print("\n>> Saindo...") | |
| sys.exit(0) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment