Skip to content

Instantly share code, notes, and snippets.

@DeoLeung
Created August 13, 2025 02:10
Show Gist options
  • Select an option

  • Save DeoLeung/5f3f48acc64bf35509133044e5872520 to your computer and use it in GitHub Desktop.

Select an option

Save DeoLeung/5f3f48acc64bf35509133044e5872520 to your computer and use it in GitHub Desktop.
LiveKit STT that supports switching between push-to-talk and turn detection
"""
This is the basic structure that supports switching between PTT (push-to-talk) and TD (turn detection) in a LiveKit agent.
I may later submit a full example and/or open-source my STT/TTS implementation using Volcengine, if...if I have time :)
"""
# Override the agent's stt_node so the session can switch between
# push-to-talk (on_demand) and turn detection while it is running.
async def stt_node(self, audio, model_settings):
    """Yield STT events, reopening the STT stream when ``on_demand`` changes.

    One STT stream is opened per pass of the outer loop. Audio frames are
    forwarded to it by a background task and the stream's events are relayed
    through a queue. When the forwarder notices that the STT's ``on_demand``
    option differs from the value captured when the stream was opened, the
    current stream is torn down and a new one is opened.

    Args:
        audio: Async iterable of audio frames to feed to the STT stream.
        model_settings: Part of the node signature; not used here.

    Yields:
        Every event produced by the currently open STT stream.
    """
    activity = self._get_activity_or_raise()
    assert activity.stt is not None, (
        'stt_node called but no STT node is available'
    )
    conn_options = activity.session.conn_options.stt_conn_options

    while True:
        async with activity.stt.stream(conn_options=conn_options) as stream:
            # NOTICE: your stt module should implement `on_demand` features,
            # which means `sleep` until audio frames come in
            on_demand = activity.stt._opts.on_demand
            main_logger.info(
                'stt_node stream called, current on_demand: %s',
                on_demand,
            )
            # Set by the forwarder once the mode no longer matches the one
            # this stream was opened with.
            reconnect = asyncio.Event()
            # Channel carrying stream events to the generator body; ``None``
            # marks "stop reading" (end of stream or reconnect request).
            event_queue = asyncio.Queue()

            @utils.log_exceptions(logger=main_logger)
            async def _forward_input(on_demand: bool) -> None:
                """Push audio frames into the stream according to the mode."""
                # Whether the previous frame was forwarded under a held button.
                button_pushed = False
                async for frame in audio:
                    if on_demand:
                        if stream.button_pushed:
                            stream.push_frame(frame)
                            button_pushed = True
                        elif button_pushed:
                            # The button was just released: send a flush
                            # sentinel once so the STT sees the change.
                            stream._input_ch.send_nowait(
                                stream._FlushSentinel()
                            )
                            button_pushed = False
                        # Otherwise the button is idle and the frame is dropped.
                    else:
                        stream.push_frame(frame)

                    # The mode is only re-checked when a frame arrives.
                    if on_demand != activity.stt._opts.on_demand:
                        main_logger.info(
                            'switching STT on_demand from %s to %s',
                            on_demand,
                            activity.stt._opts.on_demand,
                        )
                        reconnect.set()
                        break

            @utils.log_exceptions(logger=main_logger)
            async def _stream_events():
                """Relay every stream event into the queue, then a marker."""
                async for event in stream:
                    await event_queue.put(event)
                await event_queue.put(None)  # End of stream marker

            @utils.log_exceptions(logger=main_logger)
            async def _reconnect_watcher():
                """Wake the consumer loop when a reconnect is requested."""
                await reconnect.wait()
                await event_queue.put(None)  # Signal reconnection

            forward_task = asyncio.create_task(
                _forward_input(on_demand=on_demand)
            )
            stream_task = asyncio.create_task(_stream_events())
            reconnect_task = asyncio.create_task(_reconnect_watcher())
            try:
                # Drain the channel and hand each event to the caller.
                while True:
                    event = await event_queue.get()
                    if event is None:
                        # End of stream or reconnect: stop reading this stream.
                        break
                    yield event
            finally:
                # Runs on normal exit and when the generator is closed, so
                # no helper task outlives its stream.
                await utils.aio.cancel_and_wait(
                    forward_task, stream_task, reconnect_task
                )

        # The stream ended without a mode switch, so the node is finished.
        if not reconnect.is_set():
            break
# Register these RPCs so a client can drive push-to-talk on the STT streams.
@ctx.room.local_participant.register_rpc_method('stt_button_push')
async def stt_button_push(data: rtc.RpcInvocationData) -> str:
    """RPC handler: signal a button push to every stream of ``stt_v3``.

    Args:
        data: RPC invocation payload; not read by this handler.

    Returns:
        ``'True'`` if at least one stream accepted the push, else ``'False'``
        (also when there are no streams).
    """
    # Build the full list first so every stream is notified (no
    # short-circuit); the reply then no longer depends on which stream
    # happens to be iterated last.
    results = [stream.button_push() for stream in stt_v3._streams]
    pushed = any(results)
    main_logger.info(
        'stt_button_push called, current on_demand: %s, pushed: %s',
        stt_v3._opts.on_demand,
        pushed,
    )
    return str(pushed)
@ctx.room.local_participant.register_rpc_method('stt_audio_submit')
async def stt_audio_submit(data: rtc.RpcInvocationData) -> str:
    """RPC handler: submit the held audio on every stream of ``stt_v3``.

    Args:
        data: RPC invocation payload; not read by this handler.

    Returns:
        ``'True'`` if at least one stream accepted the submit, else
        ``'False'`` (also when there are no streams).
    """
    # Build the full list first so every stream is notified (no
    # short-circuit); the reply then no longer depends on which stream
    # happens to be iterated last.
    results = [stream.audio_submit() for stream in stt_v3._streams]
    submitted = any(results)
    main_logger.info(
        'stt_audio_submit called, current on_demand: %s, submitted: %s',
        stt_v3._opts.on_demand,
        submitted,
    )
    return str(submitted)
# Subclass your STT module's stream to implement the push-to-talk events.
# NOTICE: you still need to implement the main logic in _run to fully use these events
class SpeechStream(stt.SpeechStream):
    """STT stream sketch exposing push-to-talk controls as asyncio events.

    The methods below only set and clear events; consuming them is left to
    the stream's own processing logic (see the NOTICE above the class).
    """

    def __init__(self, *args, **kwargs) -> None:
        """Create the push-to-talk events.

        The rest of the constructor is elided in this sketch (``...``).
        NOTE(review): presumably the elided part calls ``super().__init__``
        and sets ``self._opts``, which the methods below read; confirm
        against the real implementation.
        """
        ...
        self._reconnect_event = asyncio.Event()
        # Set while the talk button is held in on-demand mode.
        self._button_pushed = asyncio.Event()
        # Set when a held utterance is submitted.
        self._audio_submitted = asyncio.Event()
        # Set when a held utterance is cancelled.
        self._audio_cancelled = asyncio.Event()
        logger.info('stt:stream init')

    @property
    def button_pushed(self) -> bool:
        """Whether audio should be forwarded right now.

        Always ``True`` outside on-demand mode; otherwise ``True`` only while
        the button event is set.
        """
        if not self._opts.on_demand:
            return True
        return self._button_pushed.is_set()

    def button_push(self) -> bool:
        """Mark the button as held; return ``False`` outside on-demand mode."""
        if not self._opts.on_demand:
            return False
        self._button_pushed.set()
        logger.info('stt:start push to talk')
        return True

    def audio_submit(self) -> bool:
        """Submit the held audio and release the button.

        Returns:
            ``True`` if on-demand mode is active and the button was held,
            otherwise ``False`` (no state is changed).
        """
        if not (self._opts.on_demand and self._button_pushed.is_set()):
            return False
        self._audio_submitted.set()
        logger.info('stt:audio submitted')
        self._button_pushed.clear()
        return True

    def audio_cancel(self) -> bool:
        """Cancel the held audio and release the button.

        Returns:
            ``True`` if on-demand mode is active and the button was held,
            otherwise ``False`` (no state is changed).
        """
        if not (self._opts.on_demand and self._button_pushed.is_set()):
            return False
        self._audio_cancelled.set()
        logger.info('stt:audio cancelled')
        self._button_pushed.clear()
        return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment