Created
August 13, 2025 02:10
-
-
Save DeoLeung/5f3f48acc64bf35509133044e5872520 to your computer and use it in GitHub Desktop.
livekit stt supports switch between push-to-talk and turn detection
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| This is the basic structure that supports switching between push-to-talk (PTT) and turn detection (TD) in a LiveKit agent. | |
| I may later submit a full example and/or open-source my STT/TTS implementation using Volcengine, if... if I have time :) | |
| """ | |
| # override the agent's stt_node | |
async def stt_node(self, audio, model_settings):
    """Yield STT events, reopening the stream when the on_demand mode flips.

    Audio frames from ``audio`` are forwarded to an STT stream. In on_demand
    (push-to-talk) mode frames are forwarded only while
    ``stream.button_pushed`` is true, and a flush is sent once when the
    button goes from pushed to released. In the other mode every frame is
    forwarded. Whenever ``activity.stt._opts.on_demand`` differs from the
    value captured when the stream was opened, the current stream is torn
    down and a new one is opened.

    Args:
        audio: Async iterable of audio frames.
        model_settings: Not used by this implementation.

    Yields:
        Events produced by the STT stream.

    Raises:
        Exception: Whatever the STT stream raised while being iterated;
            it is re-raised here instead of leaving the generator waiting
            forever on the internal queue.
    """
    activity = self._get_activity_or_raise()
    assert activity.stt is not None, (
        'stt_node called but no STT node is available'
    )

    conn_options = activity.session.conn_options.stt_conn_options

    while True:
        async with activity.stt.stream(conn_options=conn_options) as stream:
            # NOTICE: your stt module should implement `on_demand` features,
            # which means `sleep` until audio frames come in
            on_demand = activity.stt._opts.on_demand
            main_logger.info(
                'stt_node stream called, current on_demand: %s',
                on_demand,
            )

            reconnect = asyncio.Event()
            # Channel between the producer tasks and this generator; a
            # ``None`` item tells the consumer loop below to stop.
            event_queue = asyncio.Queue()

            @utils.log_exceptions(logger=main_logger)
            async def _forward_input(on_demand: bool) -> None:
                # Tracks whether the previous frame was forwarded with the
                # button pushed, so the release edge is flushed only once.
                button_pushed = False
                async for frame in audio:
                    if on_demand:
                        if stream.button_pushed:
                            stream.push_frame(frame)
                            button_pushed = True
                        elif button_pushed:
                            # Button was just released: flush to tell the
                            # STT that the pushed segment has ended.
                            stream.flush()
                            button_pushed = False
                    else:
                        stream.push_frame(frame)

                    if on_demand != activity.stt._opts.on_demand:
                        main_logger.info(
                            'switching STT on_demand from %s to %s',
                            on_demand,
                            activity.stt._opts.on_demand,
                        )
                        reconnect.set()
                        break

            @utils.log_exceptions(logger=main_logger)
            async def _stream_events() -> None:
                try:
                    async for event in stream:
                        await event_queue.put(event)
                finally:
                    # Always wake the consumer, even when the stream fails
                    # or this task is cancelled; the queue is unbounded so
                    # put_nowait cannot fail or block.
                    event_queue.put_nowait(None)  # End of stream marker

            @utils.log_exceptions(logger=main_logger)
            async def _reconnect_watcher() -> None:
                await reconnect.wait()
                await event_queue.put(None)  # Signal reconnection

            forward_task = asyncio.create_task(
                _forward_input(on_demand=on_demand)
            )
            stream_task = asyncio.create_task(_stream_events())
            reconnect_task = asyncio.create_task(_reconnect_watcher())

            try:
                while True:
                    event = await event_queue.get()
                    if event is None:
                        # End of stream, stream failure, or reconnect.
                        break
                    yield event
            finally:
                await utils.aio.cancel_and_wait(
                    forward_task, stream_task, reconnect_task
                )

            # All tasks are finished here. If the stream reader ended with
            # an error (rather than normally or by cancellation), surface it
            # to the caller instead of silently ending the generator.
            if not stream_task.cancelled():
                stream_error = stream_task.exception()
                if stream_error is not None:
                    raise stream_error

        if not reconnect.is_set():
            break
| # register these RPC methods so the client can signal a push-to-talk button press and an audio submission | |
@ctx.room.local_participant.register_rpc_method('stt_button_push')
async def stt_button_push(data: rtc.RpcInvocationData) -> str:
    """RPC handler: signal a push-to-talk button press to every STT stream.

    Args:
        data: RPC invocation data; not read by this handler.

    Returns:
        ``'True'`` if at least one stream accepted the push, else ``'False'``
        (including when there are no streams).
    """
    # Build the full list first so every stream is signalled; the result
    # then reflects all streams rather than only the last one iterated.
    results = [stream.button_push() for stream in stt_v3._streams]
    pushed = any(results)
    main_logger.info(
        'stt_button_push called, current on_demand: %s, pushed: %s',
        stt_v3._opts.on_demand,
        pushed,
    )
    return str(pushed)
@ctx.room.local_participant.register_rpc_method('stt_audio_submit')
async def stt_audio_submit(data: rtc.RpcInvocationData) -> str:
    """RPC handler: ask every STT stream to submit the audio captured so far.

    Args:
        data: RPC invocation data; not read by this handler.

    Returns:
        ``'True'`` if at least one stream accepted the submission, else
        ``'False'`` (including when there are no streams).
    """
    # Build the full list first so every stream is signalled; the result
    # then reflects all streams rather than only the last one iterated.
    results = [stream.audio_submit() for stream in stt_v3._streams]
    submitted = any(results)
    main_logger.info(
        'stt_audio_submit called, current on_demand: %s, submitted: %s',
        stt_v3._opts.on_demand,
        submitted,
    )
    return str(submitted)
| # subclass your stt module to implement the push events | |
| # NOTICE: you still need to implement the main logic in _run to fully use these events | |
class SpeechStream(stt.SpeechStream):
    """Speech stream sketch exposing push-to-talk signals as asyncio events.

    The methods below only set and clear events; the code that reacts to
    them (the stream's ``_run`` logic) is not part of this snippet.

    NOTE(review): every method reads ``self._opts.on_demand``; this class
    does not assign ``self._opts`` itself, so the base stream or your own
    additional init code is assumed to provide it -- confirm.
    """

    def __init__(self, *args, **kwargs) -> None:
        # Forward construction arguments unchanged to the base stream; add
        # your own stream-specific setup here as needed.
        super().__init__(*args, **kwargs)
        self._reconnect_event = asyncio.Event()
        self._button_pushed = asyncio.Event()
        self._audio_submitted = asyncio.Event()
        self._audio_cancelled = asyncio.Event()
        logger.info('stt:stream init')

    @property
    def button_pushed(self) -> bool:
        """Whether audio should currently be forwarded to the STT.

        Always true when on_demand is off; otherwise true only while the
        push-to-talk button event is set.
        """
        return not self._opts.on_demand or self._button_pushed.is_set()

    def button_push(self) -> bool:
        """Mark the push-to-talk button as pushed.

        Returns:
            True if on_demand is enabled and the event was set, else False.
        """
        if not self._opts.on_demand:
            return False
        self._button_pushed.set()
        logger.info('stt:start push to talk')
        return True

    def audio_submit(self) -> bool:
        """Signal that the pushed audio should be submitted.

        Returns:
            True if on_demand is enabled and the button was pushed (the
            submitted event is set and the pushed event cleared), else False.
        """
        if not (self._opts.on_demand and self._button_pushed.is_set()):
            return False
        self._audio_submitted.set()
        logger.info('stt:audio submitted')
        self._button_pushed.clear()
        return True

    def audio_cancel(self) -> bool:
        """Signal that the pushed audio should be discarded.

        Returns:
            True if on_demand is enabled and the button was pushed (the
            cancelled event is set and the pushed event cleared), else False.
        """
        if not (self._opts.on_demand and self._button_pushed.is_set()):
            return False
        self._audio_cancelled.set()
        logger.info('stt:audio cancelled')
        self._button_pushed.clear()
        return True
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment