Last active
November 18, 2025 15:47
-
-
Save ddelange/6517e3267fb74eeee804e3b1490b1c1d to your computer and use it in GitHub Desktop.
Handling the live output stream of a command
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import logging | |
| from collections import deque | |
| from concurrent.futures import ThreadPoolExecutor | |
| from functools import partial | |
| from subprocess import PIPE, CalledProcessError, CompletedProcess, Popen | |
def stream_command(
    args,
    *,
    stdout_handler=logging.info,
    stderr_handler=logging.error,  # for a version that redirects stderr to stdout (without multithreading), see https://stackoverflow.com/a/76626021/5511061
    check=True,
    text=True,
    **kwargs,
):
    """Mimic subprocess.run, while processing the command output in real time.

    ref https://stackoverflow.com/a/76634163/5511061

    The child's stdout and stderr are each read line by line in their own
    thread, and every line is passed to the matching handler as it arrives.

    Args:
        args: Command to run, passed to ``Popen`` unchanged.
        stdout_handler: Callable invoked with each stdout line, without its
            trailing newline.
        stderr_handler: Callable invoked with each stderr line, without its
            trailing newline.
        check: When true, a non-zero return code raises ``CalledProcessError``.
        text: Forwarded to ``Popen``. Handlers receive whatever type the pipes
            yield (``str`` in text mode, otherwise ``bytes``).
        **kwargs: Forwarded to ``Popen``. ``stdout`` and ``stderr`` are set
            here, so they must not be passed.

    Returns:
        A ``CompletedProcess`` carrying the args and return code. The output
        is not captured on it; it only goes to the handlers.

    Raises:
        CalledProcessError: If ``check`` is true and the return code is non-zero.
        Exception: Whatever a handler raised. The child is killed in that case.
    """

    def pump(stream, handler):
        """Feed every line of ``stream`` to ``handler``, minus its newline."""
        try:
            for line in stream:
                # Strip only a newline that is really there: the last line of
                # the output may be unterminated, and a blind line[:-1] would
                # eat its final character.
                newline = "\n" if isinstance(line, str) else b"\n"
                handler(line.removesuffix(newline))
        except BaseException:
            # This pipe is no longer being drained. Kill the child so it cannot
            # block forever on a full pipe, which would also keep the sibling
            # reader (and the executor shutdown) waiting for EOF.
            process.kill()
            raise

    with (
        Popen(args, text=text, stdout=PIPE, stderr=PIPE, **kwargs) as process,
        ThreadPoolExecutor(2) as pool,  # two threads to handle the (live) streams separately
    ):
        readers = [
            pool.submit(pump, process.stdout, stdout_handler),
            pool.submit(pump, process.stderr, stderr_handler),
        ]
    # Leaving the ``with`` block joined both reader threads and waited for the
    # child, so the futures are done and the return code is available.
    for reader in readers:
        reader.result()  # re-raise a handler error instead of dropping it
    retcode = process.poll()
    if check and retcode:
        raise CalledProcessError(retcode, process.args)
    return CompletedProcess(process.args, retcode)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import logging | |
| from subprocess import PIPE, STDOUT, CalledProcessError, CompletedProcess, Popen | |
def stream_command(
    args,
    *,
    stdout_handler=logging.info,  # for a separate stderr_handler (using multithreading), see https://stackoverflow.com/a/76634163/5511061
    check=True,
    text=True,
    **kwargs,
):
    """Mimic subprocess.run, while processing the command output in real time.

    ref https://stackoverflow.com/a/76626021/5511061

    The child's stderr is redirected into its stdout, and the merged stream is
    read line by line, passing every line to ``stdout_handler`` as it arrives.

    Args:
        args: Command to run, passed to ``Popen`` unchanged.
        stdout_handler: Callable invoked with each output line, without its
            trailing newline.
        check: When true, a non-zero return code raises ``CalledProcessError``.
        text: Forwarded to ``Popen``. The handler receives whatever type the
            pipe yields (``str`` in text mode, otherwise ``bytes``).
        **kwargs: Forwarded to ``Popen``. ``stdout`` and ``stderr`` are set
            here, so they must not be passed.

    Returns:
        A ``CompletedProcess`` carrying the args and return code. The output
        is not captured on it; it only goes to the handler.

    Raises:
        CalledProcessError: If ``check`` is true and the return code is non-zero.
    """
    with Popen(args, text=text, stdout=PIPE, stderr=STDOUT, **kwargs) as process:
        for line in process.stdout:
            # Strip only a newline that is really there: the last line of the
            # output may be unterminated, and a blind line[:-1] would eat its
            # final character.
            newline = "\n" if isinstance(line, str) else b"\n"
            stdout_handler(line[:-1] if line.endswith(newline) else line)
    # Leaving the ``with`` block waited for the child, so poll() now just
    # reports the final return code.
    retcode = process.poll()
    if check and retcode:
        raise CalledProcessError(retcode, process.args)
    return CompletedProcess(process.args, retcode)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Usage examples for stream_command; the comment under each call shows the
# expected output.

# The root logger defaults to WARNING, which would drop the records emitted by
# the default stdout_handler (logging.info). Enable INFO so the output below
# actually appears.
logging.basicConfig(level=logging.INFO)

stream_command(["echo", "test"])
# INFO:root:test

# Output shown for the variant with a separate stderr_handler (logging.error);
# the variant that merges stderr into stdout hands this line to stdout_handler.
stream_command("cat ./nonexist", shell=True, check=False)
# ERROR:root:cat: ./nonexist: No such file or directory

stream_command(["echo", "test"], stdout_handler=print)
# test

stdout_lines = []


def handler(line):
    """Print, log and collect a single output line."""
    print(line)
    logging.info(line)
    stdout_lines.append(line)


stream_command(["echo", "test"], stdout_handler=handler)
# test
# INFO:root:test
print(stdout_lines)
# ['test']

# stream to log file
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s:%(levelname)-7s %(filename)20s:%(lineno)-4d %(name)s:%(message)s",
    filename="./capture.log",
    filemode="w",
    encoding="utf-8",
    # The root logger already has a handler from the logging calls above;
    # without force=True this reconfiguration would be silently ignored.
    force=True,
)
logging.info("test from python")
stream_command(["echo", "test from subprocess"])
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
reference answers: