feat(pulsar source): support lz4 decompression #14787

Closed
ediconss opened this issue Jan 25, 2024 · 10 comments · Fixed by #14833
Labels
type/enhancement Improvements to existing implementation.

@ediconss

ediconss commented Jan 25, 2024

Describe the bug

My Pulsar source table:

CREATE TABLE ods.ods_peach_commodity (
    id int4 PRIMARY KEY,
    commodity_id int8,
    commodity_type_id int8,
    sku_quantity int8,
    status int2,
    create_time timestamp with time zone,
    create_by int8,
    update_time timestamp with time zone,
    update_by int8,
    deleted int2,
    commodity_category_id int8,
    commodity_variety_id int8,
    tenant_id int8
) WITH (
    connector = 'pulsar',
    topic = 'fc-demo/cdc-fms-pgsql/event-peach.wholesale.commodity',
    service.url = 'pulsar://192.168.10.34:6650/',
    scan.startup.mode = 'latest',
    scan.startup.timestamp.millis = '1400'
) FORMAT DEBEZIUM ENCODE JSON;

Pulsar topic message:

{
    "before":{
        "commodity_id":1653,
        "commodity_type_id":1,
        "sku_quantity":1,
        "status":1,
        "create_time":"2023-09-14T08:12:11.089423Z",
        "create_by":1136099,
        "update_time":"2023-09-14T08:12:28.886734Z",
        "update_by":1136099,
        "deleted":99,
        "commodity_category_id":301,
        "commodity_variety_id":939,
        "tenant_id":1,
        "id":523
    },
    "after":{
        "commodity_id":1653,
        "commodity_type_id":1,
        "sku_quantity":1,
        "status":1,
        "create_time":"2023-09-14T08:12:11.089423Z",
        "create_by":1136099,
        "update_time":"2023-09-14T08:12:28.886734Z",
        "update_by":1136099,
        "deleted":99,
        "commodity_category_id":301,
        "commodity_variety_id":939,
        "tenant_id":1,
        "id":523
    },
    "source":{
        "version":"1.7.1.Final",
        "connector":"postgresql",
        "name":"event-peach",
        "ts_ms":1706161200318,
        "snapshot":"false",
        "db":"peach",
        "sequence":"[\"2883327530864\",\"2883327565352\"]",
        "schema":"wholesale",
        "table":"commodity",
        "txId":186061238,
        "lsn":2883327565352,
        "xmin":null
    },
    "op":"u",
    "ts_ms":1706161200596,
    "transaction":null
}
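
For readers unfamiliar with the Debezium envelope shown above, here is a minimal Python sketch of how a consumer might read it: `op` names the operation (`c`, `u`, `d`, or `r` for snapshot reads) and the row image sits in `before` or `after`. The helper name `summarize_change` is hypothetical, and this is only an illustration, not how RisingWave's `FORMAT DEBEZIUM ENCODE JSON` parser is implemented.

```python
import json
from typing import Tuple

# Debezium operation codes: c = create, u = update, d = delete, r = snapshot read.
OP_NAMES = {"c": "insert", "u": "update", "d": "delete", "r": "insert"}


def summarize_change(raw: str) -> Tuple[str, dict]:
    """Return (operation, row image) for one Debezium JSON change event."""
    event = json.loads(raw)
    code = event["op"]
    if code not in OP_NAMES:
        raise ValueError(f"unsupported Debezium op code: {code!r}")
    op = OP_NAMES[code]
    # Deletes carry the row in "before"; the other operations carry it in "after".
    row = event["before"] if op == "delete" else event["after"]
    return op, row
```

Applied to the message above, this would report an update for the row with `id` 523.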

Error message/log

2024-01-25T05:43:52.925475106Z  WARN   rw-streaming pulsar::consumer::engine: rx terminated    
2024-01-25T05:43:52.925655099Z ERROR   rw-streaming pulsar::consumer::engine: Error sending end event to channel - send failed because receiver is gone    
2024-01-25T05:43:52.925675415Z  WARN   rw-streaming pulsar::consumer::engine: rx terminated    
2024-01-25T05:43:53.511098644Z  WARN        rw-main risingwave_common::util::resource_util::runtime: failed to get cpu quota in container, use system value instead error=not a number cgroup_version=V1
2024-01-25T05:43:53.926584802Z  WARN   rw-streaming actor{otel.name="Actor 458" actor_id=458 prev_epoch=5827869508304896 curr_epoch=5827869573840896}:executor{otel.name="Source 1CA00002715 (actor 458)" actor_id=458}: risingwave_stream::executor::source::source_executor: stream source reader error, actor: 458, source: TableId { table_id: 1038 }
2024-01-25T05:43:53.926855353Z  INFO   rw-streaming actor{otel.name="Actor 458" actor_id=458 prev_epoch=5827869508304896 curr_epoch=5827869573840896}:executor{otel.name="Source 1CA00002715 (actor 458)" actor_id=458}: risingwave_stream::executor::source::source_executor: actor 458 apply source split change to [Pulsar(PulsarSplit { topic: Topic { domain: "persistent", tenant: "fc-demo", namespace: "cdc-fms-pgsql", topic: "event-peach.wholesale.commodity", partition_index: None }, start_offset: Timestamp(1400) })]
2024-01-25T05:43:53.948018393Z  INFO   rw-streaming actor{otel.name="Actor 458" actor_id=458 prev_epoch=5827869508304896 curr_epoch=5827869573840896}:executor{otel.name="Source 1CA00002715 (actor 458)" actor_id=458}:build:new:new:connect:connect_inner: pulsar::connection_manager: Connected n°a597aa8a-6337-4aab-bc08-644eb19bce21 to pulsar://192.168.10.34:6650/ in 21ms    
2024-01-25T05:43:53.988121486Z  INFO   rw-streaming actor{otel.name="Actor 458" actor_id=458 prev_epoch=5827869508304896 curr_epoch=5827869573840896}:executor{otel.name="Source 1CA00002715 (actor 458)" actor_id=458}:build:validate:lookup_partitioned_topic:lookup_partitioned_topic:lookup_topic:get_connection:connect:connect_inner: pulsar::connection_manager: Connected n°e4a3f931-8c3f-4e7b-a81f-9c49990958ca to localhost:6650 via proxy pulsar://192.168.10.34:6650/ in 18ms    
2024-01-25T05:43:53.988260959Z  WARN   rw-streaming actor{otel.name="Actor 458" actor_id=458 prev_epoch=5827869508304896 curr_epoch=5827869573840896}:executor{otel.name="Source 1CA00002715 (actor 458)" actor_id=458}:build:new: pulsar::retry_op: Retry #0 -> connecting consumer 35 using connection e4a3f931-8c3f-4e7b-a81f-9c49990958ca to broker pulsar://192.168.10.34:6650/ to topic persistent://fc-demo/cdc-fms-pgsql/event-peach.wholesale.commodity

To Reproduce

No response

Expected behavior

No response

How did you deploy RisingWave?

No response

The version of RisingWave

docker
PostgreSQL 9.5-RisingWave-1.6.0

Additional context

No response

@ediconss ediconss added the type/bug (Something isn't working) label Jan 25, 2024
@github-actions github-actions bot added this to the release-1.7 milestone Jan 25, 2024
@xiangjinwu
Contributor

Could you connect to prometheus (port 9500) and query for user_source_reader_error?

@ediconss
Author

ediconss commented Jan 26, 2024

Could you connect to prometheus (port 9500) and query for user_source_reader_error?

Does Prometheus need to be installed separately? I am currently deploying with Docker.

@xiangjinwu
Contributor

Does Prometheus need to be installed separately? I am currently deploying with Docker.

I thought it was started with docker compose, which includes a prometheus container.

If that is not the case, we can also get the info without prometheus. Depending on your deployment mode:

| mode | host:port |
| --- | --- |
| standalone (single process) | host-or-ip:1250 |
| separate meta, compute, frontend, compactor | host-or-ip-of-compute:1222 |

run curl 'http://host:port/metrics' | grep user_source_reader_error
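
Where curl and grep are not at hand, the same check can be sketched in Python with only the standard library. The helper names `fetch_metrics` and `filter_metric` are hypothetical, and the host and port are placeholders to be filled in from the deployment modes listed above. Unlike a plain grep, this sketch skips the `#` comment lines of the metrics page.

```python
from typing import List
from urllib.request import urlopen


def filter_metric(metrics_text: str, metric_name: str) -> List[str]:
    """Return the sample lines (not '#' comment lines) that mention metric_name."""
    return [
        line
        for line in metrics_text.splitlines()
        if metric_name in line and not line.startswith("#")
    ]


def fetch_metrics(host: str, port: int, timeout: float = 5.0) -> str:
    """Download the raw /metrics page from the given node."""
    with urlopen(f"http://{host}:{port}/metrics", timeout=timeout) as resp:
        return resp.read().decode("utf-8", errors="replace")


# Example call (placeholder address, assuming a standalone deployment):
# print(filter_metric(fetch_metrics("localhost", 1250), "user_source_reader_error"))
```

An empty result means the metric has not been reported by that node.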

@ediconss
Author

ediconss commented Jan 26, 2024

I have now started it with docker compose, but I could not find the user_source_reader_error metric either in Prometheus or via http://host:port/metrics

@xiangjinwu
Contributor

user_source_reader_error only appears when you also see risingwave_stream::executor::source::source_executor: stream source reader error in the log, as you have pasted above.

Alternatively, if you are still experimenting, you can try the docker image tagged nightly-20240125, which includes the error detail in the log in addition to the user_source_reader_error metric. However, nightly images are not well tested and may have other issues compared to releases. Make sure it is only used in a separate environment.

@ediconss
Author

Okay, thanks. I will test it later.

@ediconss
Author

ediconss commented Jan 29, 2024

rw-streaming pulsar::consumer::engine: rx terminated
2024-01-29 10:26:53 2024-01-29T02:26:53.220073881Z ERROR rw-streaming pulsar::consumer::engine: Error sending end event to channel - send failed because receiver is gone
2024-01-29 10:26:53 2024-01-29T02:26:53.220091752Z WARN rw-streaming pulsar::consumer::engine: rx terminated
2024-01-29 10:26:53 2024-01-29T02:26:53.662540901Z WARN rw-main risingwave_common::util::resource_util::runtime: failed to get cpu quota in container, use system value instead error=not a number cgroup_version=V1
2024-01-29 10:26:54 2024-01-29T02:26:54.220704013Z WARN rw-streaming actor{otel.name="Actor 19" actor_id=19 prev_epoch=5849744124280832 curr_epoch=5849744189816832}:executor{otel.name="Source 1300002715 (actor 19)"}: risingwave_stream::executor::source::source_executor: stream source reader error error=Connector error: consumer error: Decompression error: got a LZ4 compressed message but 'lz4' cargo feature is deactivated
2024-01-29 10:26:54
2024-01-29 10:26:54 Backtrace:
2024-01-29 10:26:54 disabled backtrace
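
The useful part of this log is the `error=` field on the `stream source reader error` line, which the 1.6.0 log earlier in the thread does not carry. A minimal Python sketch for pulling that field out of a log line follows; the function name `reader_error_detail` is hypothetical, and the pattern assumes the log layout shown in this thread.

```python
import re
from typing import Optional

# Matches the detail that follows "stream source reader error error=" up to end of line.
_READER_ERROR = re.compile(r"stream source reader error error=(?P<detail>.+)$")


def reader_error_detail(log_line: str) -> Optional[str]:
    """Return the error detail of a source reader log line, or None if it has none."""
    match = _READER_ERROR.search(log_line.rstrip())
    return match.group("detail") if match else None
```

For the line above it would return the text starting at `Connector error:`, while the 1.6.0 line, which has no `error=` field, would yield `None`.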

@ediconss
Author

I am using a Pulsar connection, and the Pulsar messages are lz4-compressed.

@xiangjinwu xiangjinwu changed the title from "bug(pulsar source): use pulsar source consume debezium message, can't Ingest data" to "feat(pulsar source): support lz4 decompression" Jan 29, 2024
@xiangjinwu xiangjinwu added the type/enhancement (Improvements to existing implementation.) label and removed the type/bug (Something isn't working) label Jan 29, 2024
@xiangjinwu
Contributor

Thanks for the info. Then either RisingWave will need to support decompressing lz4 messages from Pulsar, or you can proceed by disabling lz4 compression on the producer side until the feature is available.

@ediconss
Author

Disabling lz4 compression works. Thanks.
