Skip to content

Commit

Permalink
FBS: Simplify Data[Consumer|Producer] send messages
Browse files Browse the repository at this point in the history
For some reason we were not sending everything as binary in this branch.
  • Loading branch information
jmillan committed Oct 27, 2023
1 parent 19c5f94 commit deff052
Show file tree
Hide file tree
Showing 8 changed files with 1,364 additions and 2,543 deletions.
13 changes: 2 additions & 11 deletions node/src/DataConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -517,23 +517,14 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>

if (typeof message === 'string')
{
const messageOffset = builder.createString(message);

dataOffset = FbsDataConsumer.String.createString(builder, messageOffset);
message = Buffer.from(message);
}
else
{
const messageOffset = FbsDataConsumer.Binary.createValueVector(builder, message);

dataOffset = FbsDataConsumer.Binary.createBinary(builder, messageOffset);
}
dataOffset = FbsDataConsumer.SendRequest.createDataVector(builder, message);

const requestOffset = FbsDataConsumer.SendRequest.createSendRequest(
builder,
ppid,
typeof message === 'string' ?
FbsDataConsumer.Data.String :
FbsDataConsumer.Data.Binary,
dataOffset
);

Expand Down
13 changes: 2 additions & 11 deletions node/src/DataProducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -442,23 +442,14 @@ export class DataProducer<DataProducerAppData extends AppData = AppData>

if (typeof message === 'string')
{
const messageOffset = builder.createString(message);

dataOffset = FbsDataProducer.String.createString(builder, messageOffset);
message = Buffer.from(message);
}
else
{
const messageOffset = FbsDataProducer.Binary.createValueVector(builder, message);

dataOffset = FbsDataProducer.Binary.createBinary(builder, messageOffset);
}
dataOffset = FbsDataProducer.SendNotification.createDataVector(builder, message);

const notificationOffset = FbsDataProducer.SendNotification.createSendNotification(
builder,
ppid,
typeof message === 'string' ?
FbsDataProducer.Data.String :
FbsDataProducer.Data.Binary,
dataOffset,
subchannelsOffset,
requiredSubchannel ?? null
Expand Down
3,813 changes: 1,350 additions & 2,463 deletions rust/src/fbs.rs

Large diffs are not rendered by default.

8 changes: 2 additions & 6 deletions rust/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2878,12 +2878,10 @@ impl Notification for DataProducerSendNotification {
fn into_bytes(self, handler_id: Self::HandlerId) -> Vec<u8> {
let mut builder = Builder::new();

let binary_data = data_producer::Binary::create(&mut builder, self.payload);
let binary = data_producer::Data::create_binary(&mut builder, binary_data);
let data = data_producer::SendNotification::create(
&mut builder,
self.ppid,
binary,
self.payload,
self.subchannels,
self.required_subchannel,
);
Expand Down Expand Up @@ -3179,9 +3177,7 @@ impl Request for DataConsumerSendRequest {
fn into_bytes(self, id: u32, handler_id: Self::HandlerId) -> Vec<u8> {
let mut builder = Builder::new();

let binary_data = data_consumer::Binary::create(&mut builder, self.payload);
let binary = data_consumer::Data::create_binary(&mut builder, binary_data);
let data = data_consumer::SendRequest::create(&mut builder, self.ppid, binary);
let data = data_consumer::SendRequest::create(&mut builder, self.ppid, self.payload);
let request_body = request::Body::create_data_consumer_send_request(&mut builder, data);

let request = request::Request::create(
Expand Down
11 changes: 1 addition & 10 deletions worker/fbs/dataConsumer.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,9 @@ table String {
value: string (required);
}

table Binary {
value: [uint8] (required);
}

union Data {
String,
Binary
}

table SendRequest {
ppid: uint32;
data: Data (required);
data: [uint8] (required);
}

table SetSubchannelsRequest {
Expand Down
15 changes: 1 addition & 14 deletions worker/fbs/dataProducer.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,9 @@ table GetStatsResponse {
buffered_amount: uint32;
}

table String {
value: string (required);
}

table Binary {
value: [uint8] (required);
}

union Data {
String,
Binary
}

table SendNotification {
ppid: uint32;
data: Data (required);
data: [uint8] (required);
subchannels: [uint16];
required_subchannel: uint16 = null;
}
Expand Down
17 changes: 3 additions & 14 deletions worker/src/RTC/DataConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,20 +278,9 @@ namespace RTC
MS_THROW_ERROR("no SCTP association present");
}

const auto* body = request->data->body_as<FBS::DataConsumer::SendRequest>();
const uint8_t* data{ nullptr };
size_t len{ 0 };

if (body->data_type() == FBS::DataConsumer::Data::String)
{
data = body->data_as_String()->value()->Data();
len = body->data_as_String()->value()->size();
}
else
{
data = body->data_as_Binary()->value()->Data();
len = body->data_as_Binary()->value()->size();
}
const auto* body = request->data->body_as<FBS::DataConsumer::SendRequest>();
const uint8_t* data = body->data()->Data();
size_t len = body->data()->size();

const int ppid = body->ppid();

Expand Down
17 changes: 3 additions & 14 deletions worker/src/RTC/DataProducer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,20 +199,9 @@ namespace RTC
{
case Channel::ChannelNotification::Event::DATAPRODUCER_SEND:
{
const auto* body = notification->data->body_as<FBS::DataProducer::SendNotification>();
const uint8_t* data{ nullptr };
size_t len{ 0 };

if (body->data_type() == FBS::DataProducer::Data::String)
{
data = body->data_as_String()->value()->Data();
len = body->data_as_String()->value()->size();
}
else
{
data = body->data_as_Binary()->value()->Data();
len = body->data_as_Binary()->value()->size();
}
const auto* body = notification->data->body_as<FBS::DataProducer::SendNotification>();
const uint8_t* data = body->data()->Data();
size_t len = body->data()->size();

if (len > this->maxMessageSize)
{
Expand Down

0 comments on commit deff052

Please sign in to comment.