Skip to content

Instantly share code, notes, and snippets.

@ProgramRipper
Created January 5, 2023 13:49
Show Gist options
  • Select an option

  • Save ProgramRipper/fdf6bbfb0c62db6c0b55fb1c6dbabdc8 to your computer and use it in GitHub Desktop.

Select an option

Save ProgramRipper/fdf6bbfb0c62db6c0b55fb1c6dbabdc8 to your computer and use it in GitHub Desktop.
import asyncio
from broadcastlv.command import DanmuMsg
from broadcastlv.connection import Connection
from broadcastlv.event import Auth, Heartbeat, NeedData
async def heartbeat(conn: Connection, writer: asyncio.StreamWriter):
while True:
await asyncio.sleep(30)
writer.write(conn.send(Heartbeat(b"")))
await writer.drain()
async def listen(roomid: int):
reader, writer = await asyncio.open_connection(
"broadcastlv.chat.bilibili.com", 2243
)
conn = Connection()
writer.write(conn.send(Auth(roomid, protover=3)))
await writer.drain()
print(f"Connected to {roomid}")
task = asyncio.create_task(heartbeat(conn, writer))
while True:
while isinstance(event := conn.next_event(), NeedData):
conn.receive_data(await reader.read(event.size))
print(event)
async def main():
await listen(114514)
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment