Skip to content

Commit

Permalink
delta improvements, introducing since option (#75)
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia authored Oct 8, 2024
1 parent 3e0a8c4 commit 38dd908
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,12 @@ private void sendSubscribeSynchronized(String channel, Protocol.SubscribeRequest
if (Client.this.getState() != ClientState.CONNECTED) {
return;
}
this.handleSubscribeReply(channel, reply);
try {
this.handleSubscribeReply(channel, reply);
} catch (Exception e) {
// Should never happen.
e.printStackTrace();
}
this.futures.remove(cmd.getId());
}).orTimeout(this.opts.getTimeout(), TimeUnit.MILLISECONDS).exceptionally(e -> {
this.executor.submit(() -> {
Expand Down Expand Up @@ -602,7 +607,7 @@ public void removeSubscription(Subscription sub) {
}
}

private void handleSubscribeReply(String channel, Protocol.Reply reply) {
private void handleSubscribeReply(String channel, Protocol.Reply reply) throws Exception {
Subscription sub = this.getSub(channel);
if (sub != null) {
Protocol.SubscribeResult result;
Expand Down Expand Up @@ -868,25 +873,11 @@ private void processReply(Protocol.Reply reply) throws Exception {
}
}

private void handlePub(String channel, Protocol.Publication pub) throws Exception {
void handlePub(String channel, Protocol.Publication pub) throws Exception {
ClientInfo info = ClientInfo.fromProtocolClientInfo(pub.getInfo());
Subscription sub = this.getSub(channel);
if (sub != null) {
PublicationEvent event = new PublicationEvent();
byte[] pubData = pub.getData().toByteArray();
byte[] prevData = sub.getPrevData();
if (prevData != null && pub.getDelta()) {
pubData = Fossil.applyDelta(prevData, pubData);
}
sub.setPrevData(pubData);
event.setData(pubData);
event.setInfo(info);
event.setOffset(pub.getOffset());
event.setTags(pub.getTagsMap());
if (pub.getOffset() > 0) {
sub.setOffset(pub.getOffset());
}
sub.getListener().onPublication(sub, event);
sub.handlePublication(pub);
} else {
ServerSubscription serverSub = this.getServerSub(channel);
if (serverSub != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ public class Subscription {
private int resubscribeAttempts = 0;
private String token;
private com.google.protobuf.ByteString data;
private String delta;
private boolean deltaNegotiated;
private byte[] prevData;

Expand All @@ -42,8 +41,12 @@ public class Subscription {
this.data = com.google.protobuf.ByteString.copyFrom(opts.getData());
}
this.prevData = null;
this.delta = "";
this.deltaNegotiated = false;
if (opts.getSince() != null) {
this.offset = opts.getSince().getOffset();
this.epoch = opts.getSince().getEpoch();
this.recover = true;
}
}

Subscription(final Client client, final String channel, final SubscriptionEventListener listener) {
Expand All @@ -70,7 +73,7 @@ long getOffset() {
return offset;
}

void setOffset(long offset) {
private void setOffset(long offset) {
this.offset = offset;
}

Expand Down Expand Up @@ -172,7 +175,26 @@ void moveToUnsubscribed(boolean sendUnsubscribe, int code, String reason) {
this._unsubscribe(sendUnsubscribe, code, reason);
}

void moveToSubscribed(Protocol.SubscribeResult result) {
void handlePublication(Protocol.Publication pub) throws Exception {
ClientInfo info = ClientInfo.fromProtocolClientInfo(pub.getInfo());
PublicationEvent event = new PublicationEvent();
byte[] pubData = pub.getData().toByteArray();
byte[] prevData = this.getPrevData();
if (prevData != null && pub.getDelta()) {
pubData = Fossil.applyDelta(prevData, pubData);
}
this.setPrevData(pubData);
event.setData(pubData);
event.setInfo(info);
event.setOffset(pub.getOffset());
event.setTags(pub.getTagsMap());
if (pub.getOffset() > 0) {
this.setOffset(pub.getOffset());
}
this.listener.onPublication(this, event);
}

void moveToSubscribed(Protocol.SubscribeResult result) throws Exception {
this.setState(SubscriptionState.SUBSCRIBED);
if (result.getRecoverable()) {
this.recover = true;
Expand All @@ -189,12 +211,7 @@ void moveToSubscribed(Protocol.SubscribeResult result) {

if (result.getPublicationsCount() > 0) {
for (Protocol.Publication publication : result.getPublicationsList()) {
PublicationEvent publicationEvent = new PublicationEvent();
publicationEvent.setData(publication.getData().toByteArray());
publicationEvent.setOffset(publication.getOffset());
publicationEvent.setInfo(ClientInfo.fromProtocolClientInfo(publication.getInfo()));
this.listener.onPublication(this, publicationEvent);
this.setOffset(publication.getOffset());
this.client.handlePub(this.channel, publication);
}
} else {
this.setOffset(result.getOffset());
Expand Down Expand Up @@ -468,11 +485,11 @@ private void presenceStatsSynchronized(ResultCallback<PresenceStatsResult> cb) {
}
}

public byte[] getPrevData() {
private byte[] getPrevData() {
return prevData;
}

public void setPrevData(byte[] prevData) {
private void setPrevData(byte[] prevData) {
this.prevData = prevData;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,22 @@ public String getDelta() {
return delta;
}

// setDelta allows using delta compression for subscription. The delta compression
// must be also enabled on server side. The only value at this point is "fossil".
// See https://centrifugal.dev/docs/server/delta_compression.
public void setDelta(String delta) {
this.delta = delta;
}

private String delta = "";

public void setSince(StreamPosition streamPosition) {
this.since = streamPosition;
}

public StreamPosition getSince() {
return since;
}

private StreamPosition since;
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void onLeave(Subscription sub, LeaveEvent event) {
Subscription sub;
SubscriptionOptions subOpts = new SubscriptionOptions();
// You can set `delta` to `"fossil"` for using delta compression via
// `subOpts.setDelta("fossil")`;
// subOpts.setDelta("fossil");
try {
sub = client.newSubscription("chat:index", subOpts, subListener);
} catch (DuplicateSubscriptionException e) {
Expand Down

0 comments on commit 38dd908

Please sign in to comment.