
text input from datastream for v1.0 #1521


Merged (5 commits) on Feb 19, 2025
livekit-agents/livekit/agents/llm/realtime.py (1 addition, 1 deletion)
@@ -18,7 +18,7 @@ class InputSpeechStartedEvent:

 @dataclass
 class InputSpeechStoppedEvent:
-    pass
+    user_transcription_enabled: bool


 @dataclass
livekit-agents/livekit/agents/pipeline/pipeline_agent.py (6 additions, 0 deletions)
@@ -255,6 +255,12 @@ def generate_reply(
             allow_interruptions=allow_interruptions,
         )

+    def interrupt(self) -> None:
+        if self._activity is None:
+            raise ValueError("PipelineAgent isn't running")
+
+        self._activity.interrupt()
+
     def update_task(self, task: AgentTask) -> None:
         self._agent_task = task
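For context, a minimal usage sketch of the new method. This is hedged: `agent` is assumed to be a `PipelineAgent` that has already been started, and `generate_reply(user_input=...)` is used the same way the room_io changes below use it.

```python
# Hypothetical caller: stop whatever the agent is currently saying,
# then drive a fresh reply. interrupt() raises ValueError if the
# agent isn't running yet.
agent.interrupt()
agent.generate_reply(user_input="Actually, let's switch topics.")
```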
livekit-agents/livekit/agents/pipeline/room_io.py (35 additions, 1 deletion)
@@ -17,6 +17,8 @@

 @dataclass(frozen=True)
 class RoomInputOptions:
+    text_enabled: bool = True
+    """Whether to subscribe to text input"""
     audio_enabled: bool = True
     """Whether to subscribe to audio"""
     video_enabled: bool = False
@@ -48,6 +50,7 @@ class RoomOutputOptions:
 DEFAULT_ROOM_INPUT_OPTIONS = RoomInputOptions()
 DEFAULT_ROOM_OUTPUT_OPTIONS = RoomOutputOptions()
 LK_PUBLISH_FOR_ATTR = "lk.publish_for"
+LK_TEXT_INPUT_TOPIC = "lk.room_text_input"
Contributor:
We should be clear about which topics we want to support. If the goal is to have this work out of the box with chat components, then choosing a custom topic here might not be the best choice.

Contributor Author:
I think the question is whether we are going to keep the chat components in the Python/JS SDKs, and whether to support both the original chat mechanism and the datastream, or only the datastream. If so, I can adjust here accordingly. cc @davidzhao

Member:
The chat components are for the client side, but Python/Node agents should agree on the same topic so that it works with the client-side components.

@lukasIO, what do you recommend we use? Is the client-side component listening to both the transcription topic and the chat topic?

Contributor:
Currently, chat components listen only on the chat topic and also send their messages only on the chat topic.

Member:
How would this work with how we are sending transcriptions? Do you suggest also sending transcriptions to the chat topic?

Contributor:
That was my understanding, yes. But maybe I misunderstood or am forgetting something. Why wouldn't you want it on the chat topic?

Member:
Mostly wondering if there are any conflicts between what the agent would want to use as input versus what is being spit out as output.

I.e., if there are two agents in the room, would that cause any cross-talk? Or if the agent is added to a livestream with a chat feature, would it automatically start interpreting random transcripts?

For that reason, it seems it might be a good idea to be explicit about what is being sent to the agent as input.
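For reference, a client reaches the agent's text handler by publishing a text stream on the agreed topic. A minimal sketch, assuming an already-connected `rtc.Room` and the Python SDK's text-stream send API (`send_text` and its exact signature are assumptions here):

```python
from livekit import rtc

async def send_text_to_agent(room: rtc.Room) -> None:
    # The topic must match what the agent registers; this PR uses
    # "lk.room_text_input" (LK_TEXT_INPUT_TOPIC) rather than the
    # chat components' topic discussed above.
    await room.local_participant.send_text(
        "What's the weather like?",
        topic="lk.room_text_input",
    )
```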



 class BaseStreamHandle:
@@ -226,6 +229,7 @@ def __init__(
"""
self._options = options
self._room = room
self._agent: Optional["PipelineAgent"] = None
self._tasks: set[asyncio.Task] = set()

# target participant
@@ -263,6 +267,12 @@ def __init__(
         for participant in self._room.remote_participants.values():
             self._on_participant_connected(participant)

+        # text input from datastream
+        if options.text_enabled:
+            self._room.register_text_stream_handler(
+                LK_TEXT_INPUT_TOPIC, self._on_text_input
+            )

     @property
     def audio(self) -> AsyncIterator[rtc.AudioFrame] | None:
         if not self._audio_handle:
@@ -287,7 +297,9 @@ async def start(self, agent: Optional["PipelineAgent"] = None) -> None:
             # link to the first connected participant if not set
             self.set_participant(participant.identity)

-        if not agent:
+        # TODO(long): should we force the agent to be set or provide a set_agent method?
+        self._agent = agent
+        if not self._agent:
             return

         agent.input.audio = self.audio
@@ -399,6 +411,28 @@ async def _capture_text():
         self._tasks.add(task)
         task.add_done_callback(self._tasks.discard)

+    def _on_text_input(
+        self, reader: rtc.TextStreamReader, participant_identity: str
+    ) -> None:
+        if participant_identity != self._participant_identity:
+            return
+
+        async def _read_text():
+            if not self._agent:
+                return
+
+            text = await reader.read_all()
+            logger.debug(
+                "received text input",
+                extra={"text": text, "participant": self._participant_identity},
+            )
+            self._agent.interrupt()
+            self._agent.generate_reply(user_input=text)
Contributor Author:
We are calling the agent inside RoomInput; is that okay? Should agent.input have a text input, or should we make the agent required for RoomInput?

Member:
Let's make the agent required.

Contributor Author:
OK, I'll do that when merging RoomInput and RoomOutput.


+        task = asyncio.create_task(_read_text())
+        self._tasks.add(task)
+        task.add_done_callback(self._tasks.discard)

     async def aclose(self) -> None:
         self._room.off("participant_connected", self._on_participant_connected)
         self._room.off("participant_disconnected", self._on_participant_disconnected)
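Putting the room_io changes together, a hedged wiring sketch. The constructor keyword arguments and the surrounding setup are illustrative; `start(agent=...)` matches the signature in the diff above.

```python
# With text_enabled=True (the default), RoomInput registers the
# "lk.room_text_input" handler. Each text stream from the linked
# participant then interrupts the agent and generates a new reply.
room_input = RoomInput(
    room=room,
    options=RoomInputOptions(text_enabled=True, audio_enabled=True),
)
await room_input.start(agent=agent)
```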
livekit-agents/livekit/agents/pipeline/task_activity.py (7 additions, 6 deletions)
@@ -319,14 +319,15 @@ def _on_input_speech_started(self, _: llm.InputSpeechStartedEvent) -> None:
         log_event("input_speech_started")
         self.interrupt()  # input_speech_started is also interrupting on the serverside realtime session

-    def _on_input_speech_stopped(self, _: llm.InputSpeechStoppedEvent) -> None:
+    def _on_input_speech_stopped(self, ev: llm.InputSpeechStoppedEvent) -> None:
         log_event("input_speech_stopped")
-        self.on_interim_transcript(
-            stt.SpeechEvent(
-                stt.SpeechEventType.INTERIM_TRANSCRIPT,
-                alternatives=[stt.SpeechData(text="", language="")],
+        if ev.user_transcription_enabled:
+            self.on_interim_transcript(
+                stt.SpeechEvent(
+                    stt.SpeechEventType.INTERIM_TRANSCRIPT,
+                    alternatives=[stt.SpeechData(text="", language="")],
+                )
             )
-        )

     def _on_input_audio_transcription_completed(
         self, ev: llm.InputTranscriptionCompleted
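The intent of this change, restated: only fabricate an empty interim transcript when user transcription is enabled, since otherwise no real transcript events would ever follow it. A standalone sketch using the same types as the diff (the helper itself is hypothetical):

```python
from typing import Optional

from livekit.agents import llm, stt

def placeholder_interim(ev: llm.InputSpeechStoppedEvent) -> Optional[stt.SpeechEvent]:
    # Mirrors the guarded emission in _on_input_speech_stopped: skip the
    # placeholder when no user transcription will arrive.
    if not ev.user_transcription_enabled:
        return None
    return stt.SpeechEvent(
        stt.SpeechEventType.INTERIM_TRANSCRIPT,
        alternatives=[stt.SpeechData(text="", language="")],
    )
```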
@@ -397,7 +397,15 @@ def _handle_input_audio_buffer_speech_started(
     def _handle_input_audio_buffer_speech_stopped(
         self, _: InputAudioBufferSpeechStoppedEvent
     ) -> None:
-        self.emit("input_speech_stopped", llm.InputSpeechStoppedEvent())
+        user_transcription_enabled = (
+            self._realtime_model._opts.input_audio_transcription is not None
+        )
+        self.emit(
+            "input_speech_stopped",
+            llm.InputSpeechStoppedEvent(
+                user_transcription_enabled=user_transcription_enabled
+            ),
+        )

     def _handle_response_created(self, event: ResponseCreatedEvent) -> None:
         assert event.response.id is not None, "response.id is None"
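Finally, the consuming side: the session emits the event through its emitter interface, so a subscriber can branch on the new flag. A sketch only; the `session` object and the `.on(...)` registration are assumptions inferred from the `emit` call above.

```python
from livekit.agents import llm

def on_input_speech_stopped(ev: llm.InputSpeechStoppedEvent) -> None:
    # The flag tells subscribers whether user transcript events will follow.
    if ev.user_transcription_enabled:
        print("speech stopped; awaiting the user transcript")
    else:
        print("speech stopped; no user transcription configured")

session.on("input_speech_stopped", on_input_speech_stopped)
```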