@voluntas
Created December 30, 2025 06:57
For Python

Relay server

from moqt import Relay

relay = Relay(
    cert_file="server.crt",
    key_file="server.key",
)

def on_publish(namespace: bytes, track_name: bytes, track_alias: int):
    print(f"PUBLISH: {namespace}/{track_name}")

def on_subscribe(namespace: bytes, track_name: bytes, track_alias: int):
    print(f"SUBSCRIBE: {namespace}/{track_name}")

relay.set_on_publish(on_publish)
relay.set_on_subscribe(on_subscribe)
relay.start(port=4433)

# QUIC: moqt://127.0.0.1:4433/moqt
# WebTransport: https://127.0.0.1:4433/moqt
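
The two URL comments above suggest that the relay serves the same path over raw QUIC (`moqt://`) and over WebTransport (`https://`). As a minimal sketch, assuming the scheme alone selects the transport, a client-side helper could classify an endpoint URL as follows. `Endpoint` and `describe_endpoint` are hypothetical names used for illustration and are not part of `moqt`.

```python
from dataclasses import dataclass
from urllib.parse import urlsplit

# Hypothetical mapping, inferred only from the two URL comments above.
_TRANSPORTS = {"moqt": "quic", "https": "webtransport"}


@dataclass(frozen=True)
class Endpoint:
    transport: str
    host: str
    port: int
    path: str


def describe_endpoint(url: str) -> Endpoint:
    """Split a relay URL and pick the transport from its scheme."""
    parts = urlsplit(url)
    if parts.scheme not in _TRANSPORTS:
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    if parts.hostname is None or parts.port is None:
        raise ValueError("URL must include an explicit host and port")
    return Endpoint(_TRANSPORTS[parts.scheme], parts.hostname, parts.port, parts.path)
```

Requiring an explicit port is a choice made for this sketch; the source does not say whether a default port exists.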

Client (Publisher)

from moqt import Client

with Client(verify=False) as client:
    with client.connect("moqt://127.0.0.1:4433/moqt") as session:
        publisher = session.publish(
            namespace=[b"live"],
            track_name=b"video",
        )

        publisher.send_object(group_id=0, object_id=0, payload=b"Hello, MOQT!")
        publisher.publish_done()
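
The publisher example sends a single object with `group_id=0` and `object_id=0`. For a media track, a common MOQT convention is to open a new group at each keyframe and number objects from zero within the group, so that a subscriber can join at a group boundary. The sketch below covers only that numbering. `ObjectNumberer` is a hypothetical helper, not part of `moqt`, and the keyframe flag is assumed to come from your encoder.

```python
from typing import Optional, Tuple


class ObjectNumberer:
    """Assign (group_id, object_id) pairs: one group per keyframe."""

    def __init__(self) -> None:
        self._group_id: Optional[int] = None
        self._object_id = 0

    def next_ids(self, is_keyframe: bool) -> Tuple[int, int]:
        if self._group_id is None:
            # The first frame always opens group 0, keyframe or not.
            self._group_id = 0
            self._object_id = 0
        elif is_keyframe:
            # A keyframe starts a new group and resets the object counter.
            self._group_id += 1
            self._object_id = 0
        else:
            # Any other frame is the next object in the current group.
            self._object_id += 1
        return self._group_id, self._object_id
```

Each returned pair could then be passed to `publisher.send_object(group_id=..., object_id=..., payload=...)` as in the example above.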

Client (Subscriber)

from moqt import Client

with Client(verify=False) as client:
    with client.connect("moqt://127.0.0.1:4433/moqt") as session:
        def on_object(group_id: int, object_id: int, payload: bytes):
            print(f"Received: {payload}")

        subscriber = session.subscribe(
            namespace=[b"live"],
            track_name=b"video",
            on_object=on_object,
        )

        # Wait for incoming data

        subscriber.unsubscribe()
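
The subscriber example leaves the step of waiting for data open. One possible approach, assuming the library invokes `on_object` from a background thread (the source does not say), is to have the callback push into a queue and block the main thread on it. `ObjectCollector` is a hypothetical helper, not part of `moqt`.

```python
import queue
from typing import List, Tuple

ReceivedObject = Tuple[int, int, bytes]


class ObjectCollector:
    """Thread-safe sink whose on_object method can serve as a callback."""

    def __init__(self) -> None:
        self._queue: "queue.Queue[ReceivedObject]" = queue.Queue()

    def on_object(self, group_id: int, object_id: int, payload: bytes) -> None:
        # Same parameters as the on_object callback in the example above.
        self._queue.put((group_id, object_id, payload))

    def wait_for(self, count: int, timeout: float) -> List[ReceivedObject]:
        """Block until `count` objects have arrived.

        Raises queue.Empty if any single wait exceeds `timeout` seconds.
        """
        return [self._queue.get(timeout=timeout) for _ in range(count)]
```

With this helper, the example would pass `on_object=collector.on_object` to `session.subscribe(...)` and call `collector.wait_for(1, timeout=5.0)` before `subscriber.unsubscribe()`.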

For browsers

High-level API (recommended)

A simple API for publishing and receiving media with MediaStream. It uses the WebCodecs API internally and handles encoding and decoding automatically.

Publisher

import { createMediaPublisher } from "moqt-js"

const stream = await navigator.mediaDevices.getUserMedia({ audio: true, video: true })

const publisher = await createMediaPublisher(
  "https://example.com/moqt",
  {
    namespace: ["live"],
    audio: {
      trackName: "audio",
      codec: "opus",
      bitrate: 128_000,
    },
    video: {
      trackName: "video",
      codec: "h264",
      width: 1280,
      height: 720,
      bitrate: 2_000_000,
      framerate: 30,
    },
  },
  {
    onStateChange: (state) => console.log("state:", state),
    onError: (e) => console.error(e),
  }
)

await publisher.start(stream)

// Stop publishing
await publisher.close()

Subscriber

import { createMediaSubscriber } from "moqt-js"

const subscriber = await createMediaSubscriber(
  "https://example.com/moqt",
  {
    namespace: ["live"],
    audio: { trackName: "audio" },
    video: { trackName: "video" },
  },
  {
    onStateChange: (state) => console.log("state:", state),
    onError: (e) => console.error(e),
  }
)

await subscriber.start()

// Attach the MediaStream to a video element
videoElement.srcObject = subscriber.mediaStream

// Stop receiving
await subscriber.close()

Low-level API

Use this API when you need fine-grained, protocol-level control. You must implement the WebCodecs API and MediaStream API handling yourself.

Publisher

import { connect } from "moqt-js"

const session = await connect(
  "https://example.com/moqt",
  {
    close: () => console.log("disconnected"),
    error: (e) => console.error(e),
  }
)

const publisher = await session.publish(
  ["live"],
  "video",
  { error: (e) => console.error(e) }
)

publisher.sendObject({ groupId: 0, objectId: 0, payload: new Uint8Array([1, 2, 3]) })
await publisher.done()

Subscriber

import { connect } from "moqt-js"

const session = await connect(
  "https://example.com/moqt",
  {
    close: () => console.log("disconnected"),
    error: (e) => console.error(e),
  }
)

const subscriber = await session.subscribe(
  ["live"],
  "video",
  {
    object: (obj) => console.log(obj),
    end: () => console.log("track ended"),
    error: (e) => console.error(e),
  }
)

// Wait for incoming data

await subscriber.unsubscribe()