
Why buffer subprocess output rather than write directly to stdout/stderr? #52

Open
jonahgeorge opened this issue Aug 22, 2023 · 2 comments

Comments

@jonahgeorge

It appears that the buffering of the subprocess output interferes with shell redirection. Note the lack of output in both of these cases:

```shell
single-beat python test/long_waiting_process.py | less
single-beat python test/long_waiting_process.py > log
```

I'm curious what the rationale is for buffering this output rather than sending it directly to sys.stdout / sys.stderr, as in the patch below:

```diff
diff --git a/singlebeat/beat.py b/singlebeat/beat.py
index b07c7de..82c1658 100644
--- a/singlebeat/beat.py
+++ b/singlebeat/beat.py
@@ -194,12 +194,6 @@ class Process(object):
         if self.state == State.PAUSED:
             self.state = State.WAITING
 
-    def stdout_read_cb(self, data):
-        sys.stdout.write(data)
-
-    def stderr_read_cb(self, data):
-        sys.stderr.write(data)
-
     async def timer_cb_paused(self):
         pass
 
@@ -324,27 +318,13 @@ class Process(object):
             await self.timer_cb()
             await asyncio.sleep(config.HEARTBEAT_INTERVAL)
 
-    async def _read_stream(self, stream, cb):
-        decoder = codecs.getincrementaldecoder('utf-8')(errors='strict')
-
-        while True:
-            line = await stream.read(100)
-            if line:
-                cb(decoder.decode(line))
-            else:
-                break
-
     async def spawn_process(self):
-        cmd = self.args
-        env = os.environ
-
         self.state = State.RUNNING
+
         try:
             self.sprocess = await asyncio.create_subprocess_exec(
-                *cmd,
-                env=env,
-                stdout=asyncio.subprocess.PIPE,
-                stderr=asyncio.subprocess.PIPE
+                *self.args,
+                env=os.environ
             )
         except FileNotFoundError:
             """
@@ -353,13 +333,9 @@ class Process(object):
             """
             logger.exception("file not found")
             return self.child_exit_cb(1)
+
         try:
-            await asyncio.wait(
-                [
-                    asyncio.create_task(self._read_stream(self.sprocess.stdout, self.forward_stdout)),
-                    asyncio.create_task(self._read_stream(self.sprocess.stderr, self.forward_stderr)),
-                ]
-            )
+            await self.sprocess.wait()
             self.child_exit_cb(self.sprocess.returncode)
         except SystemExit as e:
             os._exit(e.code)
@@ -488,12 +464,6 @@ class Process(object):
         async for msg in self.async_redis.listen():
             self.pubsub_callback(msg)
 
-    def forward_stdout(self, buf):
-        self.stdout_read_cb(buf)
-
-    def forward_stderr(self, buf):
-        self.stderr_read_cb(buf)
-


async def run_process():
    process = Process(sys.argv[1:])
```
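For reference, the spawn path proposed in the diff boils down to something like the following standalone sketch (not the actual single-beat code): without `stdout=`/`stderr=` pipes, the child inherits the parent's file descriptors, so shell redirection of the parent applies to the child directly.

```python
import asyncio
import os
import sys

async def spawn_inherit(args):
    # No PIPE arguments: the child inherits this process's stdout/stderr,
    # so the OS handles output forwarding and `| less` / `> log` on the
    # parent see the child's writes directly.
    proc = await asyncio.create_subprocess_exec(*args, env=os.environ)
    # No _read_stream tasks needed; just wait for the child to exit.
    await proc.wait()
    return proc.returncode

rc = asyncio.run(spawn_inherit([sys.executable, "-c", "print('hello')"]))
print("exit code:", rc)
```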
@jonahgeorge (Author)

Actually, it appears this PR fixes it as well:
#28

@ybrs (Owner) commented Aug 30, 2023

Hi @jonahgeorge

Regarding #28, I don't remember exactly why it was never closed, but looking at it again [1], calling flush all the time doesn't feel right to me. In that case one can set the PYTHONUNBUFFERED environment variable instead; practically it should have the same effect, so we don't need to call flush. This was also discussed in #22.
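To illustrate the PYTHONUNBUFFERED alternative: setting it in a Python child's environment disables CPython's block buffering of stdout, which is practically equivalent to flushing after every write. A minimal sketch (only relevant for Python children, of course):

```python
import os
import subprocess
import sys

# With stdout going to a pipe, CPython normally block-buffers writes, so
# output can lag until the buffer fills or the process exits. Setting
# PYTHONUNBUFFERED=1 in the child's environment disables that buffering.
env = {**os.environ, "PYTHONUNBUFFERED": "1"}
child = subprocess.run(
    [sys.executable, "-c", "import sys; sys.stdout.write('tick\\n')"],
    env=env,
    capture_output=True,
    text=True,
)
print(child.stdout, end="")
```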

As for the diff you added here: as I understand it, you simply want to remove the pipes, so that create_subprocess_exec passes the file descriptors through to loop.subprocess_exec, which already handles the I/O redirection anyway. (Please correct me if I'm missing something.)

I don't have many arguments against this approach: it removes some code and delegates the work to the framework. Maybe we can expose limit= as an environment variable, in case someone needs to fine-tune the buffer.
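The limit= idea could look something like this sketch; SINGLE_BEAT_BUFFER_LIMIT is a hypothetical variable name, and asyncio.create_subprocess_exec forwards limit= to the StreamReader it creates for piped streams (64 KiB by default):

```python
import asyncio
import os
import sys

# Hypothetical tunable: read the stream buffer limit from the environment,
# falling back to asyncio's default of 64 KiB.
BUFFER_LIMIT = int(os.environ.get("SINGLE_BEAT_BUFFER_LIMIT", 2 ** 16))

async def spawn(args):
    proc = await asyncio.create_subprocess_exec(
        *args,
        limit=BUFFER_LIMIT,  # applied to the PIPE's StreamReader
        stdout=asyncio.subprocess.PIPE,
    )
    out, _ = await proc.communicate()
    return out

data = asyncio.run(spawn([sys.executable, "-c", "print('ok')"]))
print(data.decode().strip())
```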

When I first wrote this I used pyuv, then moved it to tornado, then to asyncio. I honestly don't remember whether there was a specific reason to keep the pipes and handle the forwarding ourselves; it might simply be that I wanted to keep it on par with tornado.

Could you put it into a pull request so we can merge it and try?

[1] https://github.com/ybrs/single-beat/pull/28/files#diff-fe06202dfa089ced8f935d1530b76acf71876400c17783f10a5ecf8c7ac3e67aR181
