added zlib reading and upgraded crc library to move to OTP > 23 #11

Open
wants to merge 1 commit into
base: main
2 changes: 1 addition & 1 deletion rebar.config
@@ -1,7 +1,7 @@
{erl_opts, [debug_info]}.
{deps, [
{gpb, "4.10.6"},
-{crc32cer, "0.1.3"},
+{crc32cer, "0.1.10"},
{erlwater, "0.1.0"}
]}.

16 changes: 15 additions & 1 deletion src/pulserl_io.erl
@@ -110,9 +110,23 @@ decode_metadata(HeadersAndPayload) ->
HeadersAndPayload
end,
<<Metadata:MetadataSize/binary, Payload/binary>> = MetadataPayload,
-{pulsar_api:decode_msg(Metadata, 'MessageMetadata'), Payload}
+MetaData = pulsar_api:decode_msg(Metadata, 'MessageMetadata'),
+uncompress(MetaData, Payload)
end.
+
+uncompress(#'MessageMetadata'{compression = 'ZLIB'} = MetaData, Payload) ->
+Z = zlib:open(),
+ok = zlib:inflateInit(Z),
+[UnzippedPayload] = zlib:inflate(Z, Payload),
+zlib:close(Z),
Comment on lines +118 to +121
Collaborator

Hey @RobKamstra!
This looks great ✨.

I think de/compression is a good feature to have, especially since a zlib implementation already ships with Erlang/OTP.
I have just one suggestion: can we move the decompression logic up to the connection or consumer process? That way we can keep the zlib stream open and reuse it, instead of opening and closing one for every inflation.

I traced the usage of pulserl_io:decode_metadata(HeadersAndPayload), where you do the inflation to https://github.com/skulup/pulserl/blob/bc6ac2fd9a52b24483d8bad75719bac4e9442d3e/src/pulserl_consumer.erl#L377

which is a handler callback to
https://github.com/skulup/pulserl/blob/bc6ac2fd9a52b24483d8bad75719bac4e9442d3e/src/pulserl_conn.erl#L349

From this standpoint, it seems we have two places to do this: the connection process or the consumer process.

A zlib stream per consumer process seems like the better option, as we may not want to block the connection process from doing I/O.

What do you think?
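
For illustration only (not code from this PR), here is a minimal sketch of the stream-reuse idea, assuming the consumer process opens one zlib stream and keeps it in its own state. The module name `zlib_reuse_sketch` and its functions are hypothetical; only the `zlib` calls come from OTP. It also joins the output with `iolist_to_binary/1`, because `zlib:inflate/2` returns an iolist that may hold more than one chunk for larger payloads.

```erlang
-module(zlib_reuse_sketch).
-export([new/0, inflate/2, close/1]).

%% Open a single inflate stream. Assumption: this is called from the
%% process that will use the stream (e.g. the consumer), because a zlib
%% stream belongs to the process that opened it.
new() ->
    Z = zlib:open(),
    ok = zlib:inflateInit(Z),
    Z.

%% Inflate one complete zlib-compressed payload, then reset the stream
%% so the same handle can be used for the next message.
inflate(Z, Compressed) ->
    try
        %% inflate/2 returns an iolist; flatten it into one binary.
        iolist_to_binary(zlib:inflate(Z, Compressed))
    after
        %% Reset instead of close/open to avoid reallocating the stream.
        ok = zlib:inflateReset(Z)
    end.

%% Release the stream, e.g. from the owning process's terminate callback.
close(Z) ->
    zlib:close(Z).
```

In a consumer, `new/0` would be called once at start-up, `inflate/2` for each payload whose metadata says `'ZLIB'`, and `close/1` on shutdown.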

+{MetaData, UnzippedPayload};
+
+uncompress(#'MessageMetadata'{compression = 'NONE'} = MetaData, Payload) ->
+{MetaData, Payload};
+
+uncompress(_MetaData, _Payload) ->
+{error, unsupported_compression}.

verify_checksum(HeadersAndPayload) ->
case has_checksum(HeadersAndPayload) of
true ->