
discussion: append-only sink #7045

Closed
st1page opened this issue Dec 23, 2022 · 8 comments

Comments

st1page (Contributor) commented Dec 23, 2022

In #7035, our system derives hidden columns as the primary key for the user, even though they are not declared by the user.

In Flink, there are three kinds of sink encodings that express the changes of a table: Append-only, Retract, and Upsert (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/concepts/dynamic_tables/#table-to-stream-conversion). The sink we have implemented is exactly the upsert sink: we derive a primary key for the stream and operate on the external table with that key. Our system actually converts the append-only stream to an upsert stream implicitly by adding the row_id column, knowing that all operations are inserts and the keys will never conflict.
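For illustration, a minimal sketch of that implicit conversion (the hidden column name _row_id follows RisingWave's convention; the rewritten DDL is conceptual, not actual syntax):

-- A table created without a declared primary key:
CREATE TABLE t (v INT);

-- is conceptually maintained as if it were:
-- CREATE TABLE t (_row_id BIGINT, v INT, PRIMARY KEY (_row_id));
-- Every INSERT is assigned a fresh _row_id, so the resulting "upserts"
-- never overwrite one another.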

So we need to introduce an append-only sink.

Unresolved question: how do users control whether the sink is append-only or upsert?

In Flink, users can easily define the primary key of the external dynamic table (sink): https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/

The JDBC sink operates in upsert mode for exchanging UPDATE/DELETE messages with the external system if a primary key is defined on the DDL; otherwise, it operates in append mode and doesn't support consuming UPDATE/DELETE messages.
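For reference, a hedged sketch of such a Flink DDL (table and column names are made up): declaring a PRIMARY KEY on the sink table is what switches the JDBC connector into upsert mode.

CREATE TABLE jdbc_sink (
  user_id BIGINT,
  cnt BIGINT,
  -- With the next line, the connector runs in upsert mode;
  -- without it, the connector runs in append mode.
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'users_cnt'
);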

But in our system, the primary key of the materialized view or sink is inferred internally, which is a black box for users.

E.g. (I did not experiment with Flink SQL, but I think it works like this) in Flink we can easily create an append-only sink for aggregation results, which is not easy to express in our system:

CREATE TABLE events (page_id INT);

CREATE TABLE PVresult (
  page_id INT,
  cnt BIGINT
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'users_cnt'
);

-- continuously write aggregation results into the JDBC table from "events"
INSERT INTO PVresult
SELECT page_id, count(*) AS cnt FROM events GROUP BY page_id;

INSERT INTO events VALUES (1), (2);

SELECT * FROM PVresult;
page_id | cnt
---------------------
1       | 1
2       | 1

INSERT INTO events VALUES (1), (4);

SELECT * FROM PVresult;
page_id | cnt
---------------------
1       | 1
1       | 2
2       | 1
4       | 1

Because the user does not define a primary key on the result table, all historical results appear in it, and the behavior is semantically consistent with a plain INSERT INTO ... SELECT ... statement.

But RisingWave uses the CREATE MATERIALIZED VIEW statement, which means it only maintains the newest result, so it naturally produces an upsert stream.
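For comparison, a hedged sketch of the equivalent RisingWave definition (reusing the names from the example above): the materialized view keeps exactly one row per page_id, so its change stream is an upsert stream.

CREATE MATERIALIZED VIEW pv_result AS
SELECT page_id, count(*) AS cnt
FROM events
GROUP BY page_id;

-- After the inserts above, pv_result holds only the latest counts:
-- page_id | cnt
-- 1       | 2
-- 2       | 1
-- 4       | 1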

github-actions bot added this to the release-0.1.16 milestone Dec 23, 2022
liurenjie1024 removed this from the release-0.1.16 milestone Dec 23, 2022
fuyufjh (Member) commented Dec 27, 2022

It looks like the problem is not only about "append-only sink", but rather about the syntax to define a Sink. In other words: would Flink's syntax be better?

fuyufjh (Member) commented Dec 28, 2022

@tabVersion is working on a PRD describing the user-facing procedure.

fuyufjh (Member) commented Dec 29, 2022

I am considering the pros & cons of our current approach i.e. the CREATE SINK syntax. It seems that some known problems can be solved in this way:

Problem: should the sink include hidden columns? #7035

My answer is definitely "no". Hidden columns, as the name implies, are only used by RisingWave internally and should not be exposed to users in any form, including materialized views and sinks.

Problem: How do users define the primary key?

This can be discussed in 3 cases:

  1. For a sink without a PK, e.g. Parquet files in S3
    • Great. Nothing to worry about.
  2. For an updatable sink with a predefined PK, e.g. a MySQL table
    • Get the primary key definition from the external system. For example, for JDBC connectors, the primary keys can be retrieved via getPrimaryKeys() in the DatabaseMetaData API.
    • Note that even if the PK is unknown, RisingWave can still write data normally because it knows all the fields. For example, a JDBC connector can issue a DELETE ... WHERE (c1 = ... AND c2 = ... AND ...) that matches on all the columns rather than just the PK columns (see the sketch after this list). It may be inefficient, but it does work technically.
  3. For an updatable sink whose PK is defined by us, e.g. Redis
    • We can derive the PK columns and sink results accordingly. In most cases, e.g. GROUP BY queries or a series of JOINs with dimension tables, the PK derivation rules are quite easy to understand and we can just write a document to explain them.
    • In this case, we MUST provide a way to let users specify the order of the PK columns. ORDER BY sounds like a good choice, as suggested by @st1page.
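To make the fallback in Case 2 concrete, a hedged sketch (reusing the users_cnt table from the example above; an illustration of the idea, not RisingWave's actual generated SQL):

-- With a known PK, removing the old row needs only the key columns:
DELETE FROM users_cnt WHERE page_id = 1;

-- Without a known PK, the connector can still remove the exact old row by
-- matching every column; correct, but unable to use the primary-key index:
DELETE FROM users_cnt WHERE page_id = 1 AND cnt = 1;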

Problem: how do users control whether the sink is append-only or upsert? (this issue)

I think this problem can be divided into 2 parts.

First, whether a SQL query is append-only or upsert is not controlled by users. Instead, it's a property of the query itself and can only be derived by our optimizer. By the way, EMIT ON WINDOW CLOSE, as a clause of the SQL query, can make a query append-only by deferring updates until windows close; see the sketch below.
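A hedged sketch of that clause (assuming an events table with a watermark-annotated event_time column; the names are made up):

CREATE MATERIALIZED VIEW hourly_pv AS
SELECT window_start, count(*) AS cnt
FROM TUMBLE(events, event_time, INTERVAL '1 hour')
GROUP BY window_start
-- Each window's row is emitted exactly once, after the watermark passes
-- the window's end, so the output stream is append-only.
EMIT ON WINDOW CLOSE;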

Second, whether a Sink connector is append-only or upsert is also not controlled by users. It is an intrinsic property of the Sink e.g. Kafka/Pulsar is append-only, while Redis/JDBC is updatable, etc.

When creating a Sink with a query, what we should do is check whether they are compatible: if the query is append-only, either an append-only or an updatable sink works; but for updatable queries, the sink connector must be updatable as well.

A special case is the JDBC connector: as mentioned in the issue description, Flink's JDBC connector decides its behavior according to whether a PK exists in the user's definition. For us, see Case 2 in the previous section: we can just ask the external system for the PKs.

Summary

It looks like our current syntax can still work, so how about keeping it as it is for now?

neverchanje (Contributor) commented Jan 3, 2023

I personally feel that EMIT ON WINDOW CLOSE is difficult for people to understand, even for SQL experts. When they simply want to write queries to create a materialized view, why do they need to control whether the data is append-only or not? The only reason they would want EMIT ON WINDOW CLOSE is to output the data to an external sink that requires append-only semantics. As a result, my initial proposal for the syntax was (no idea if it's implementable; see the sketch after this list):

  • Stay as it is now for creating a materialized view. No EMIT ON WINDOW CLOSE.
  • When facing an append-only sink, such as Delta Lake, it can only be created via CREATE APPEND_ONLY SINK.
  • When it comes to an updatable sink, e.g. Postgres, either an APPEND_ONLY SINK or an UPDATABLE SINK can be created.
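A hedged sketch of how that proposal might read (hypothetical syntax, not implemented as written; the sink and view names are made up):

-- Append-only sink: the only allowed form for connectors like Delta Lake.
CREATE APPEND_ONLY SINK s1 FROM my_append_only_mv
WITH ('connector' = 'deltalake');

-- Updatable sink: permitted for connectors like Postgres.
CREATE UPDATABLE SINK s2 FROM my_mv
WITH ('connector' = 'jdbc');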

For now, I think the only reason for me to accept EMIT ON WINDOW CLOSE is if a streaming query with it and without it compiles to entirely different DAGs. If this is the case, I would like to see if there's any workaround for it.

fuyufjh (Member) commented Jan 3, 2023

> I personally feel that EMIT ON WINDOW CLOSE is difficult for people to understand […] If this is the case, I would like to see if there's any workaround for it. (quoting @neverchanje's comment above)

Yes, that's also my concern. From either our perspective or the user's, a query with EMIT ON WINDOW CLOSE behaves differently from the same query without it. I would like to make this change of behavior more explicit.

fuyufjh (Member) commented Jan 3, 2023

@tabVersion is working on a PRD describing the user-facing procedure.

Link to the doc: https://www.notion.so/risingwave-labs/Sink-User-Behavior-Guideline-be17dc1d7c89415388bb7bb43effcbe9

github-actions bot commented Mar 5, 2023

This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.

tabVersion (Contributor) commented:

As we encountered several issues where the derived internal primary key does not match the downstream's primary key, leading to panics when creating sinks, we are adjusting the sink's position. We are now removing these constraints in the sink module.

  • When data is passed to the sink module, it should no longer be seen as maintained by RisingWave, which means we don't enforce constraints on it. The sink is only responsible for writing the given data and ops to the downstream.
    • For JDBC-like downstreams (which have a schema), we treat the PK from the downstream as the source of truth. Users are responsible for writing the correct query.
    • For Redis-like downstreams (which do not have a schema), we require users to provide the key's and value's encoding formats; see the sketch below.
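A hedged sketch of what that could look like for a Redis-like downstream (the parameter names and the FORMAT ... ENCODE clause are assumptions for illustration, not confirmed syntax):

CREATE SINK redis_sink FROM pv_result
WITH (
  'connector' = 'redis',
  'redis.url' = 'redis://127.0.0.1:6379/',  -- hypothetical parameter name
  'primary_key' = 'page_id'                 -- columns used to build the Redis key
) FORMAT PLAIN ENCODE JSON;                 -- user-chosen value encoding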

cc @fuyufjh @st1page @wenym1
