
Some tables doesn't sync for PG CDC v2.1.0-rt.8 #19681

Closed
cyliu0 opened this issue Dec 4, 2024 · 13 comments · Fixed by #19805 · May be fixed by #19714
Labels: priority/critical, type/bug (Something isn't working)

Comments

cyliu0 (Collaborator) commented Dec 4, 2024

Describe the bug

https://buildkite.com/risingwave-test/ch-benchmark-pg-cdc-shared-source/builds/124#019391f2-6d26-4d75-bcc4-ab39abc277cb/8586-8587

https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&from=1733320258820&to=1733323410627&var-datasource=cdtasocg64074c&var-namespace=ch-benchmark-pg-cdc-shared-source-6ffck&var-instance=benchmark-risingwave&var-pod=All&var-component=All&var-table=All

{
  "consistent": false,
  "table-compare-results": [
    {
      "consistent": false,
      "table-checksums": [
        {
          "url": "[REDACTED]://[REDACTED]:[REDACTED]@benchmark-[REDACTED]ql:5432/[REDACTED]",
          "table-name": "customer",
          "table-checksum": -5419689675690500630,
          "table-rows": 4200000
        },
        {
          "url": "[REDACTED]://root:@benchmark-risingwave-frontend:4567/dev",
          "table-name": "customer",
          "table-checksum": 0,
          "table-rows": 0
        }
      ]
    },
    {
      "consistent": false,
      "table-checksums": [
        {
          "url": "[REDACTED]://[REDACTED]:[REDACTED]@benchmark-[REDACTED]ql:5432/[REDACTED]",
          "table-name": "district",
          "table-checksum": -1426567335868077560,
          "table-rows": 1400
        },
        {
          "url": "[REDACTED]://root:@benchmark-risingwave-frontend:4567/dev",
          "table-name": "district",
          "table-checksum": 0,
          "table-rows": 0
        }
      ]
    },
    {
      "consistent": false,
      "table-checksums": [
        {
          "url": "[REDACTED]://[REDACTED]:[REDACTED]@benchmark-[REDACTED]ql:5432/[REDACTED]",
          "table-name": "history",
          "table-checksum": 7137233099402833883,
          "table-rows": 4234095
        },
        {
          "url": "[REDACTED]://root:@benchmark-risingwave-frontend:4567/dev",
          "table-name": "history",
          "table-checksum": 0,
          "table-rows": 0
        }
      ]
    },
    {
      "consistent": true,
      "table-checksums": [
        {
          "url": "[REDACTED]://[REDACTED]:[REDACTED]@benchmark-[REDACTED]ql:5432/[REDACTED]",
          "table-name": "item",
          "table-checksum": -4944022614500989017,
          "table-rows": 100000
        },
        {
          "url": "[REDACTED]://root:@benchmark-risingwave-frontend:4567/dev",
          "table-name": "item",
          "table-checksum": -4944022614500989017,
          "table-rows": 100000
        }
      ]
    },
    {
      "consistent": false,
      "table-checksums": [
        {
          "url": "[REDACTED]://[REDACTED]:[REDACTED]@benchmark-[REDACTED]ql:5432/[REDACTED]",
          "table-name": "new_order",
          "table-checksum": -4257897487631633246,
          "table-rows": 1216426
        },
        {
          "url": "[REDACTED]://root:@benchmark-risingwave-frontend:4567/dev",
          "table-name": "new_order",
          "table-checksum": 0,
          "table-rows": 0
        }
      ]
    },
    {
      "consistent": true,
      "table-checksums": [
        {
          "url": "[REDACTED]://[REDACTED]:[REDACTED]@benchmark-[REDACTED]ql:5432/[REDACTED]",
          "table-name": "order_line",
          "table-checksum": 3083996299600455118,
          "table-rows": 42352780
        },
        {
          "url": "[REDACTED]://root:@benchmark-risingwave-frontend:4567/dev",
          "table-name": "order_line",
          "table-checksum": 3083996299600455118,
          "table-rows": 42352780
        }
      ]
    },
    {
      "consistent": true,
      "table-checksums": [
        {
          "url": "[REDACTED]://[REDACTED]:[REDACTED]@benchmark-[REDACTED]ql:5432/[REDACTED]",
          "table-name": "stock",
          "table-checksum": 123499618116277114,
          "table-rows": 14000000
        },
        {
          "url": "[REDACTED]://root:@benchmark-risingwave-frontend:4567/dev",
          "table-name": "stock",
          "table-checksum": 123499618116277114,
          "table-rows": 14000000
        }
      ]
    },
    {
      "consistent": false,
      "table-checksums": [
        {
          "url": "[REDACTED]://[REDACTED]:[REDACTED]@benchmark-[REDACTED]ql:5432/[REDACTED]",
          "table-name": "warehouse",
          "table-checksum": 4547342962216856301,
          "table-rows": 140
        },
        {
          "url": "[REDACTED]://root:@benchmark-risingwave-frontend:4567/dev",
          "table-name": "warehouse",
          "table-checksum": 0,
          "table-rows": 0
        }
      ]
    },
    {
      "consistent": true,
      "table-checksums": [
        {
          "url": "[REDACTED]://[REDACTED]:[REDACTED]@benchmark-[REDACTED]ql:5432/[REDACTED]",
          "table-name": "nation",
          "table-checksum": 4140424216966173119,
          "table-rows": 25
        },
        {
          "url": "[REDACTED]://root:@benchmark-risingwave-frontend:4567/dev",
          "table-name": "nation",
          "table-checksum": 4140424216966173119,
          "table-rows": 25
        }
      ]
    },
    {
      "consistent": true,
      "table-checksums": [
        {
          "url": "[REDACTED]://[REDACTED]:[REDACTED]@benchmark-[REDACTED]ql:5432/[REDACTED]",
          "table-name": "region",
          "table-checksum": 5923708037939308992,
          "table-rows": 5
        },
        {
          "url": "[REDACTED]://root:@benchmark-risingwave-frontend:4567/dev",
          "table-name": "region",
          "table-checksum": 5923708037939308992,
          "table-rows": 5
        }
      ]
    },
    {
      "consistent": true,
      "table-checksums": [
        {
          "url": "[REDACTED]://[REDACTED]:[REDACTED]@benchmark-[REDACTED]ql:5432/[REDACTED]",
          "table-name": "supplier",
          "table-checksum": -1496706393295477220,
          "table-rows": 10000
        },
        {
          "url": "[REDACTED]://root:@benchmark-risingwave-frontend:4567/dev",
          "table-name": "supplier",
          "table-checksum": -1496706393295477220,
          "table-rows": 10000
        }
      ]
    },
    {
      "consistent": true,
      "table-checksums": [
        {
          "url": "[REDACTED]://[REDACTED]:[REDACTED]@benchmark-[REDACTED]ql:5432/[REDACTED]",
          "table-name": "revenue1",
          "table-checksum": 7037181084742428403,
          "table-rows": 10000
        },
        {
          "url": "[REDACTED]://root:@benchmark-risingwave-frontend:4567/dev",
          "table-name": "revenue1",
          "table-checksum": 7037181084742428403,
          "table-rows": 10000
        }
      ]
    },
    {
      "consistent": true,
      "table-checksums": [
        {
          "url": "[REDACTED]://[REDACTED]:[REDACTED]@benchmark-[REDACTED]ql:5432/[REDACTED]",
          "table-name": "ch_benchmark_q1",
          "table-checksum": 582473082238383368,
          "table-rows": 15
        },
        {
          "url": "[REDACTED]://root:@benchmark-risingwave-frontend:4567/dev",
          "table-name": "ch_benchmark_q1",
          "table-checksum": 582473082238383368,
          "table-rows": 15
        }
      ]
    },
    {
      "consistent": true,
      "table-checksums": [
        {
          "url": "[REDACTED]://[REDACTED]:[REDACTED]@benchmark-[REDACTED]ql:5432/[REDACTED]",
          "table-name": "ch_benchmark_q2",
          "table-checksum": 4862096427301453156,
          "table-rows": 1880
        },
        {
          "url": "[REDACTED]://root:@benchmark-risingwave-frontend:4567/dev",
          "table-name": "ch_benchmark_q2",
          "table-checksum": 4862096427301453156,
          "table-rows": 1880
        }
      ]
    }
  ]
}

Error message/log

No response

To Reproduce

No response

Expected behavior

No response

How did you deploy RisingWave?

No response

The version of RisingWave

v2.1.0-rt.8

Additional context

No response

@cyliu0 cyliu0 added the type/bug Something isn't working label Dec 4, 2024
@github-actions github-actions bot added this to the release-2.2 milestone Dec 4, 2024
@cyliu0 cyliu0 changed the title Some table doesn't sync for PG CDC v2.1.0-rt.8 Some tables doesn't sync for PG CDC v2.1.0-rt.8 Dec 4, 2024
StrikeW (Contributor) commented Dec 9, 2024

The problem is that #19467 breaks a prerequisite for CDC tables. Previously, we required that the PG publication already exists once CREATE SOURCE returns success. But in #19467 we yield the first barrier before the source stream is created, so the publication may not have been created even though CREATE SOURCE returned success.

In the meta log we can see that the customer table was not added to the publication: when the pipeline tried to create table customer, it found that the publication did not exist.

StrikeW (Contributor) commented Dec 9, 2024

After #19467, the source data stream may not have been created even though the source streaming job has been created, since the initial barrier is yielded downstream before the data stream is successfully created.

Does the correctness of shared source rely on the source data stream being created by the time CREATE SOURCE returns success? cc @xxchan

xxchan (Member) commented Dec 9, 2024

Does the correctness of shared source rely on the source data stream being created by the time CREATE SOURCE returns success?

There's no such assumption.

xxchan (Member) commented Dec 10, 2024

Wait, I just thought of a problem: something like #19443 may occur.

We have a seek_to_latest in a newly created SourceExecutor. Since reader creation can be delayed, it's theoretically possible for events to happen in this order:

  1. CREATE SOURCE; the source reader is being created (at offset 0).
  2. CREATE MV; the backfill reader finishes creation first (at offset 0).
  3. Produce 5 rows to Kafka.
  4. (Problem here!) The source seeks to latest and begins from offset 5, so the data from offset 0 to offset 5 is lost.

In other words, the assumption "the source data stream must be created when CREATE SOURCE returns success" is needed (roughly speaking).

More precisely, events must not happen in the order above. (This can only happen when CREATE SOURCE, CREATE MV, and the produce run concurrently.)

(From the logs in https://buildkite.com/risingwavelabs/pull-request/builds/64080#0193b138-957e-4fa3-9572-51f9cd0be778, this seems to be exactly the problem.)

xxchan (Member) commented Dec 10, 2024

We may just avoid retrying for newly created sources.

I'm thinking this could also fix the CDC issue (both problems boil down to some work that needs to be done on creation). Then we wouldn't need #19714 either. @StrikeW WDYT?

StrikeW (Contributor) commented Dec 11, 2024

So forwarding the initial barrier before the source data stream is created can lead to a race condition for MV backfilling. And maybe there are more scenarios we don't know about?

We may just avoid retrying for newly created sources.

I'm thinking this could also fix the CDC issue (both problems boil down to some work that needs to be done on creation). Then we wouldn't need #19714 either. @StrikeW WDYT?

I thought about that before, but when the external system cannot be reached, the cluster will be stuck in a crash loop, since the source data stream cannot be built, and #17807 still remains.

StrikeW (Contributor) commented Dec 11, 2024

The semantics of the initial barrier is to notify the downstream executor that the upstream executor is initialized. We concurrently poll the barrier stream and the future that builds the source stream, to avoid blocking the stream. But when we forward the initial barrier before finishing the creation of the source reader, the source executor has not actually finished its initialization.

We may just avoid retrying for newly created sources.

I'm thinking this could also fix the CDC issue (both problems boil down to some work that needs to be done on creation). Then we wouldn't need #19714 either. @StrikeW WDYT?

Now I think it is risky to break this assumption for the source executor, which is special. We may need another mechanism to resolve #17807 for the source specifically.

edit: precisely speaking, it is not the forwarding of the initial barrier but of the first barrier after the initial barrier in the code; the problem is similar though.

hzxa21 (Collaborator) commented Dec 11, 2024

precisely speaking, it is not the forwarding of the initial barrier but of the first barrier after the initial barrier in the code; the problem is similar though.

Exactly. IIUC, the downstream actor treats the first barrier as the notification that the upstream source reader has been successfully created, which means the upstream must block barriers until source reader creation succeeds. How about attaching a special command to a barrier for such notification, without relying on the barrier being the first one?

        // Poll the upstream to get the first barrier.
        let barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = barrier.epoch;
        let owned_splits = barrier
            .initial_split_assignment(self.actor_ctx.id)
            .unwrap_or(&[])
            .to_vec();
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

becomes

        // Init some states on first barrier
        let barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = barrier.epoch;
        let owned_splits = barrier
            .initial_split_assignment(self.actor_ctx.id)
            .unwrap_or(&[])
            .to_vec();
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);
        
        // Poll upstream barriers until the source reader finishes creation
        while let Some(message) = input.next().instrument_await("wait_source_reader_created").await {
            let barrier = message
                .into_barrier()
                .expect("the message must be a barrier");
            // Check before yielding, since yielding moves the barrier.
            let created = source_reader_created(&barrier);
            yield Message::Barrier(barrier);
            if created {
                break;
            }
        }

StrikeW (Contributor) commented Dec 11, 2024

How about having a special command attached to the barrier for such notification without relying on the barrier being the first barrier?

Do you mean that when the source executor finishes creating the source reader, it should notify meta to push this special command through the streaming pipeline, so that downstream executors know the source reader is ready?

hzxa21 (Collaborator) commented Dec 11, 2024

In other words, the assumption "the source data stream must be created when CREATE SOURCE returns success" is needed (roughly speaking).

@xxchan How do we ensure that right now, given that we start the work immediately without waiting for the checkpoint of the first barrier to complete? If the actor starts doing something immediately after seeing the first barrier, without waiting for the checkpoint, isn't it possible that an upstream actor in another parallelism is still creating the source reader?

xxchan (Member) commented Dec 12, 2024

@hzxa21 I don't fully get it. The previous implementation is that meta will wait for the first barrier to be collected. (And the SourceExecutor will first do the work and then yield the barrier.) Do you mean that's not enough and we should wait for the checkpoint?

hzxa21 (Collaborator) commented Dec 17, 2024

Discussed offline with @xxchan and @StrikeW. There are several tricky assumptions here and there, so let me note them down.

Assumptions

  1. When the source backfill executor receives its 1st barrier, we need to ensure that the upstream source reader has been successfully created.
  2. On CREATE TABLE ... FROM <cdc_source> for PG, we need to ensure that the PG publication is successfully created before meta sends out the 1st barrier for table creation; otherwise, validation on the meta side will fail. Since the PG publication is created along with the CDC source reader, this assumption means that the source executor needs to finish source reader creation before yielding the first barrier.

History of Changes

  1. Before feat(stream): wait committed epoch in state table init_epoch #19223, the source executor would finish source reader creation before yielding the 1st barrier it receives. Assumptions 1 and 2 both hold.
  2. After feat(stream): wait committed epoch in state table init_epoch #19223, the source executor yields its 1st barrier first, then finishes source reader creation, then yields the 2nd barrier.
    • Assumption 1 holds because the source backfill executor is created when the downstream MV of the source is created. The 1st barrier the source backfill executor receives must be at least the 2nd barrier of the upstream source, so the source reader must have been created.
    • Assumption 2 is broken because CREATE SOURCE succeeds once the 1st barrier the source executor receives completes. If CREATE TABLE ... FROM <cdc_source> happens very soon after CREATE SOURCE, the 2nd barrier the source executor receives may still be in flight.
  3. After fix(source): revert create source reader with retry #19754, the source executor yields barriers concurrently during source reader creation, so the 2nd barrier may be yielded before source reader creation completes. Neither assumption 1 nor assumption 2 holds.

hzxa21 (Collaborator) commented Dec 17, 2024

The cleanest way to fix this issue is to wait until the 2nd barrier completes before we claim CREATE SOURCE to be successful, similar to what we do for MV creation (the 1st barrier triggers creation, and later barriers report the progress as completed).
