Skip to content

Commit d4d5428

Browse files
committed
add send_bytes
1 parent ab25c30 commit d4d5428

File tree

3 files changed

+66
-1
lines changed

3 files changed

+66
-1
lines changed

livekit-protocol/protocol

Submodule protocol updated 79 files

livekit/src/room/data_stream/outgoing.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,54 @@ impl OutgoingStreamManager {
386386
Ok(info)
387387
}
388388

389+
/// Send bytes to participants in the room.
390+
///
391+
/// This method sends an in-memory blob of bytes to participants in the room
392+
/// as a byte stream. It opens a stream using the provided options, writes the
393+
/// entire buffer, and closes the stream before returning.
394+
///
395+
/// The `total_length` in the header is set from the provided data and is not
396+
/// overridable by `options.total_length`.
397+
pub async fn send_bytes(
398+
&self,
399+
data: impl AsRef<[u8]>,
400+
options: StreamByteOptions,
401+
) -> StreamResult<ByteStreamInfo> {
402+
let bytes = data.as_ref();
403+
404+
let byte_header = proto::data_stream::ByteHeader {
405+
name: options.name.unwrap_or_default(),
406+
};
407+
let header = proto::data_stream::Header {
408+
stream_id: options.id.unwrap_or_else(|| create_random_uuid()),
409+
timestamp: Utc::now().timestamp_millis(),
410+
topic: options.topic,
411+
mime_type: options.mime_type.unwrap_or_else(|| BYTE_MIME_TYPE.to_owned()),
412+
total_length: Some(bytes.len() as u64), // not overridable
413+
encryption_type: proto::encryption::Type::None.into(),
414+
attributes: options.attributes,
415+
content_header: Some(proto::data_stream::header::ContentHeader::ByteHeader(
416+
byte_header.clone(),
417+
)),
418+
};
419+
420+
let open_options = RawStreamOpenOptions {
421+
header: header.clone(),
422+
destination_identities: options.destination_identities,
423+
packet_tx: self.packet_tx.clone(),
424+
};
425+
let writer = ByteStreamWriter {
426+
info: Arc::new(ByteStreamInfo::from_headers(header, byte_header)),
427+
stream: Arc::new(Mutex::new(RawStream::open(open_options).await?)),
428+
};
429+
430+
let info = (*writer.info).clone();
431+
writer.write(bytes).await?;
432+
writer.close().await?;
433+
434+
Ok(info)
435+
}
436+
389437
pub async fn send_file(
390438
&self,
391439
path: impl AsRef<Path>,

livekit/src/room/participant/local_participant.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -945,6 +945,23 @@ impl LocalParticipant {
945945
self.session().unwrap().outgoing_stream_manager.send_file(path, options).await
946946
}
947947

948+
/// Send an in-memory blob of bytes to participants in the room.
949+
///
950+
/// This method sends a provided byte slice as a byte stream.
951+
///
952+
/// # Arguments
953+
///
954+
/// * `data` - The bytes to send.
955+
/// * `options` - Configuration options for the byte stream, including topic and
956+
/// destination participants.
957+
pub async fn send_bytes(
958+
&self,
959+
data: impl AsRef<[u8]>,
960+
options: StreamByteOptions,
961+
) -> StreamResult<ByteStreamInfo> {
962+
self.session().unwrap().outgoing_stream_manager.send_bytes(data, options).await
963+
}
964+
948965
/// Stream text incrementally to participants in the room.
949966
///
950967
/// This method allows sending text data in chunks as it becomes available.

0 commit comments

Comments
 (0)