Skip to content

Instantly share code, notes, and snippets.

@ormaaj
Created November 16, 2025 11:00
Show Gist options
  • Select an option

  • Save ormaaj/5135d8c40d29e13291e56bd000495996 to your computer and use it in GitHub Desktop.

Select an option

Save ormaaj/5135d8c40d29e13291e56bd000495996 to your computer and use it in GitHub Desktop.
sendfile is not supported for transport <_UnixWritePipeTransport fd=7 idle bufsize=0>
Traceback (most recent call last):
File "/usr/bin/entry/entry_point", line 109, in _run_python
exec(code, exec_scope) # pylint: disable=exec-used
^^^^^^^^^^^^^^^^^^^^^^
File "<string>", line 187, in <module>
File "/usr/local/lib/python3.12/asyncio/runners.py", line 194, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "<string>", line 183, in main
File "<string>", line 163, in writer_with_sendfile
File "/usr/local/lib/python3.12/asyncio/base_events.py", line 1217, in sendfile
raise RuntimeError(
RuntimeError: sendfile is not supported for transport <_UnixWritePipeTransport fd=7 idle bufsize=0>
import asyncio
import os
import random
import struct
r_notify, w_notify = os.pipe()
r_data, w_data = os.pipe()
os.set_blocking(w_notify, False)
os.set_blocking(w_data, False)
class SiblingReaderProtocol(asyncio.Protocol):
def __init__(self):
self.data_transport = None
self.data_reader = None
def connection_made(self, transport):
print("[Reader] Notify pipe connected.")
def data_received(self, notify_bytes):
chunk_size, = struct.unpack('!Q', notify_bytes)
print(f"[Read] Got notification. Expected {chunk_size} bytes.")
asyncio.create_task(self.read_data_chunk(chunk_size))
async def read_data_chunk(self, size):
try:
chunk = await self.data_reader.readexactly(size)
print(f"[Read] read {len(chunk)} bytes.")
except asyncio.IncompleteReadError as e:
print(f"[Read] Incomplete read.")
def connection_lost(self, exc):
print("[Read] Notify pipe closed.")
if self.data_transport:
self.data_transport.close()
async def writer_with_sendfile(notify_pipe_fd, data_pipe_fd):
loop = asyncio.get_running_loop()
notify_transport, _ = await loop.connect_write_pipe(
asyncio.Protocol, os.fdopen(notify_pipe_fd, 'wb')
)
data_transport, _ = await loop.connect_write_pipe(
asyncio.Protocol, os.fdopen(data_pipe_fd, 'wb')
)
for i in range(3):
await asyncio.sleep(1.5)
chunk_size = random.randint(30000, 60000)
data_chunk = os.urandom(chunk_size)
print(f"\n[Write] chunk of {len(data_chunk)} bytes.")
r_temp, w_temp = os.pipe()
os.write(w_temp, data_chunk)
os.close(w_temp)
notify_transport.write(struct.pack('!Q', len(data_chunk)))
print("[Write] Send data...")
source_file = os.fdopen(r_temp, 'rb')
try:
await loop.sendfile(data_transport, source_file, count=len(data_chunk))
print("[Write] sendfile complete.")
finally:
source_file.close()
print("\n[Write] Done!")
notify_transport.close()
data_transport.close()
async def main():
loop = asyncio.get_running_loop()
reader_protocol = SiblingReaderProtocol()
await loop.connect_read_pipe(lambda: reader_protocol, os.fdopen(r_notify, 'rb'))
data_reader = asyncio.StreamReader()
data_reader_protocol = asyncio.StreamReaderProtocol(data_reader)
data_transport, _ = await loop.connect_read_pipe(lambda: data_reader_protocol, os.fdopen(r_data, 'rb'))
reader_protocol.data_transport = data_transport
reader_protocol.data_reader = data_reader
writer_task = asyncio.create_task(writer_with_sendfile(w_notify, w_data))
await writer_task
await asyncio.sleep(0.1)
if __name__ == "__main__":
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment