
design: deprecate concept of SUBSOURCE #26881

Conversation

sploiselle
Contributor

Motivation

Proposes a design for MaterializeInc/database-issues#6051

@sploiselle
Contributor Author

@bkirwi sagely asks, "Why TABLE instead of some other noun in MZ?" My answer is just that TABLEs are a well-understood concept, and these tables are vaguely like other tables in MZ (except they can't be written to). Maybe MATERIALIZED VIEW is the best analog (persisted, cannot be written to), but the idea that you'd type CREATE MATERIALIZED VIEW foo FROM SOURCE (referenced_object) to add these new objects feels clunky to me.

@sjwiesman
Contributor

sjwiesman commented May 3, 2024

Fwiw, I like table as an abstraction and hope someday we even allow writing directly to these tables alongside sources.

Wanted to jot down some thoughts from IRL conversations of ways this is useful beyond user understanding. You mention being able to create a second table from a Kafka source to enable schema migrations. This would also unlock a form of truncation for append-only sources. With just the features in this design doc we would rely on Kafka compaction to delete rows, but one could imagine extending the syntax to support specifying the starting offset behavior.

```sql
CREATE TABLE truncated FROM SOURCE kafka_src START OFFSET (0, 10, 100);

CREATE TABLE truncated FROM SOURCE kafka_src START TIMESTAMP -10000;
```

We can extend this logic to webhook sources, assuming the endpoint is tied to the source object. Dropping and recreating webhook sources is difficult today because events can't easily be replayed. Running two webhook sources in parallel is tricky because the upstream system has to be configured to point at both, or a proxy put in place. But with a source writing to two tables, users have the ability to seed the new shard until it contains enough data to satisfy downstream temporal filters and then seamlessly blue/green the consuming views.
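To make the blue/green idea above concrete, here is a sketch of how it might look under the proposed `CREATE TABLE ... FROM SOURCE` syntax. All object names are hypothetical, and the syntax is the one proposed in this design, not anything that exists today:

```sql
-- Hypothetical: the webhook source keeps its single endpoint, while a
-- second table receives the same events in parallel as it backfills.
CREATE TABLE events_v2 FROM SOURCE webhook_src;

-- Once events_v2 holds enough history to satisfy downstream temporal
-- filters, repoint the consuming view and drop the old table at leisure.
CREATE OR REPLACE VIEW recent_events AS
    SELECT * FROM events_v2
    WHERE ts + INTERVAL '30 days' > mz_now();
```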

@sjwiesman
Contributor

sjwiesman commented May 5, 2024

It just occurred to me this could also open the door to solving MaterializeInc/database-issues#6771 for sources

Writing down errors could be a configuration on the table. Or you could have a table that only contains errors.

```sql
CREATE TABLE t FROM kafka_src IGNORE ERRORS;

CREATE TABLE dlq FROM kafka_src ERRORS ONLY;
```

@sploiselle
Contributor Author

Seth's idea seems like it could offer a novel approach to MaterializeInc/database-issues#6771 for sources.

@benesch
Contributor

benesch commented May 5, 2024

It just occurred to me this could also open the door to solving https://github.com/MaterializeInc/database-issues/issues/6771 for sources

Whether we emit or suppress decoding errors seems orthogonal to me to whether we call these things subsources or tables! We've had https://github.com/MaterializeInc/database-issues/issues/4040 for a while, which proposes adding various types of WITH (IGNORE <BLAH> ERRORS = TRUE) options to sources.

@sploiselle
Contributor Author

sploiselle commented May 5, 2024

Whether we emit or suppress decoding errors seems orthogonal to me to whether we call these things subsources or tables!

The design doc included in this PR is not just a discussion of whether ingestions export data to a noun called subsources or tables. It deals substantially in a design that moves progress collection data into "the source itself" and in Kafka sources growing ingestion exports (irrespective of whether they're called subsources or tables).

I think there's a graceful implementation of Seth's idea that requires Kafka sources to have multiple ingestion exports, so it seems germane to mention here.

edit: the idea that I'm supportive of here is having the ability to split Ok and Err into separate collections, which seems like it's greatly simplified if you can have multiple outputs from all sources. That's not currently possible with Kafka, which only gives you one output. However, after leaving this comment, I realize you could hack together the same effect by creating a kind of bizarro derivative subsource on a Kafka source.


Doing both of these lets us remove the concept of subsources from Materialize
entirely. It, by definition, also lets us get rid of the notion of "progress
subsources," which is a concept that users often find confusion.
Contributor

Suggested change
subsources," which is a concept that users often find confusion.
subsources," which is a concept that users often find confusing.


- Users no longer need to use the keyword `SUBSOURCE` to interact with
Materialize in any capacity; and we do this without deprecating any
functionality whatsoever.
Contributor

I think this is implied, but might be worth explicitly stating that we're also going to remove subsource from any system catalog table names or rows. (Since these aren't quite subsource keywords.)

Comment on lines 188 to 190
- Do we want to mark the object type for "read-only" tables as something other
than "table?" Seems like it would be nice to make clear to someone exploring
an environment which tables support inserts vs. which don't.
Contributor

I think it depends on the interface! I'd suggest we use "table" for anything programmatic—i.e., these tables should be in mz_tables, be labeled as objects in mz_objects, that sort of thing. But then the commands for humans (SHOW TABLES, SHOW OBJECTS) probably want to very prominently display read-only vs. not. And the console of course can use a prominent badge or tooltip or whatever makes sense.

Contributor

+1

Comment on lines 165 to 168
- ID, persist shard becomes regular `TABLE` no longer associated
with the source whatsoever. Users can continue depending on this, but they
should drop it after the release at their leisure
- Durable statement becomes `CREATE TABLE curr_source_name ()`
Contributor

Do we have any users depending on these sources today? If not ... maybe we can just rip it out?

This seems like a piece of the migration we could do incrementally. E.g., just ship a change that prevents users from depending on sources directly.

Contributor Author

I don't know if any users depend on them today but this seems like the simplest and most fool-proof approach. I'm not sure what exactly you mean by preventing users from depending on sources directly, though––can you elaborate?

Comment on lines 185 to 187
- Terraform and DBT: Marta, Seth, and Bobby say that we can simply roll forward
with a breaking change and teach users what the new version ought to do. Is
that right?
Contributor

I don't follow this, but it doesn't seem quite right to me! That or I'm not understanding correctly. We need to make sure that after shipping the breaking change to Materialize, running terraform apply against an existing project with no changes does not cause sources to be recreated, and similarly with the dbt adapter. In the worst case we can write a migration guide detailing changes that must be applied to the code in Terraform and dbt projects, but that's a pretty disruptive thing to force on users, since they won't have any control over when the breaking change to Materialize goes out.

Contributor

I don't think the dbt workflow is going to be as clean as we'd hope, but... I captured my current understanding of how this comes together in this gist. I haven't actually tried to use model versioning, so this is all off the top of my head. Roping in @jtcohen6 for a sanity check! 🫠

- `TEXT COLUMNS`, `IGNORE COLUMNS` move from "current source" to `CREATE
TABLE`
- Progress collection
- ID, persist hard becomes `SOURCE`
Contributor

Suggested change
- ID, persist hard becomes `SOURCE`
- ID, persist shard becomes `SOURCE`

Comment on lines 154 to 160
- Current source
- ID, persist shard becomes read-only `TABLE`
- Durable statement becomes `CREATE TABLE curr_source_name FROM SOURCE
curr_progress_name ( topic_name )`
- Progress collection
- ID, persist shard becomes `SOURCE`
- Durable statement becomes `CREATE SOURCE curr_progress_name FROM...`
Contributor

This seems clean from the perspective of downstream views, but will be quite disruptive to the Terraform and dbt adapters. Left a comment along these lines below. We should see if there's a way to have the Terraform and dbt adapters automatically adapt to the new setup.

@benesch
Contributor

benesch commented May 6, 2024

This is really exciting, Sean! Thanks for writing this up.

@benesch
Contributor

benesch commented May 6, 2024

The idea that I'm supportive of here is having the ability to split Ok and Err into separate collections, which seems like it's greatly simplified if you can have multiple outputs from all sources. That's not currently possible with Kafka, which only gives you one output. However, after leaving this comment, I realize you could hack together the same effect by creating a kind of bizarro derivative subsource on a Kafka source.

That's just a performance optimization though, right? persist_source already splits the shard into Ok and Err streams, right?

I might be misunderstanding, but I really feel like we already have all the pieces that we need to support https://github.com/MaterializeInc/database-issues/issues/6771, and we don't need to expand the scope of this migration by considering it in the design!

For example, one proposal for MaterializeInc/database-issues#6771 is to add a special $errors syntax, so that e.g. foo$errors returns the errors stream. AFAICT, that design would be totally orthogonal to the subsources-as-tables design!
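For concreteness, the `$errors` idea from that issue might look like the following. This is an illustrative sketch only; the syntax is a proposal in the linked issue, not something that exists:

```sql
-- Hypothetical $errors syntax: the errs stream of any collection becomes
-- queryable alongside the oks stream, with no new catalog objects.
SELECT * FROM foo;          -- oks stream (definite errors surface as query errors)
SELECT * FROM foo$errors;   -- errs stream as ordinary data
```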

@sploiselle
Contributor Author

That's just a performance optimization though, right? persist_source already splits the shard into Ok and Err streams, right?

Yeah; maybe the opportunity for a semantic optimization moreso than performance. My point is that if we had multiple outputs from a Kafka source, we could easily split Ok and Err into separate collections. This would give users the option to never encounter definite errors when querying a topic/table/ingestion export, while still tracking what all of those errors were elsewhere (maybe a user cares about this for monitoring).

However, it sounds like you have a clear and distinct idea for MaterializeInc/database-issues#6771 that is not this. Not a big deal, just wanted to defend that the common thread Seth and I saw was not orthogonal to this design.

Comment on lines 97 to 100
`kafka_src` is again the progress relation for the source. The data is in a
table named `my_t`. You would never be able to specify more than one table in
the original `CREATE SOURCE` statement because you would have to two outputs
from the same source with the same schema, which we should disallow.
Contributor

Suggested change
`kafka_src` is again the progress relation for the source. The data is in a
table named `my_t`. You would never be able to specify more than one table in
the original `CREATE SOURCE` statement because you would have to two outputs
from the same source with the same schema, which we should disallow.
`kafka_src` is again the progress relation for the source. The data is in a
table named `my_t`. You would never be able to specify more than one table in
the original `CREATE SOURCE` statement because you would have two outputs
from the same source with the same schema, which we should disallow.



```sql
CREATE SOURCE kafka_src
FROM KAFKA CONNECTION kafka_conn (TOPIC 't')
FOR ALL TABLES
```

Contributor

From a chat with @benesch: does this imply that there would be a FOR NO TABLES (or similar) option that only creates the source, but suppresses automatic table creation? This would be useful so we can decouple source management and table creation in dbt.

Contributor

Also curious if that feature already exists today as FOR TABLES ().

Contributor Author

Today, you'd get errors for not having created any objects in CREATE SOURCE. However, we could introduce support for FOR TABLES (). I don't know exactly what would happen w/r/t frontier management w/ PG and MySQL sources, but it's something we could figure out.
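Assuming `FOR TABLES ()` were supported, decoupled source and table management might look like this sketch (hypothetical syntax and names, per the discussion above):

```sql
-- Create the source only; no tables are created automatically.
CREATE SOURCE pg_src
FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'mz_publication')
FOR TABLES ();

-- Later, e.g. from dbt, add tables one at a time.
CREATE TABLE t1 FROM SOURCE pg_src (public.t1);
```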

@benesch
Contributor

benesch commented May 8, 2024

That's just a performance optimization though, right? persist_source already splits the shard into Ok and Err streams, right?

Yeah; maybe the opportunity for a semantic optimization moreso than performance. My point is that if we had multiple outputs from a Kafka source, we could easily split Ok and Err into separate collections. This would give users the option to never encounter definite errors when querying a topic/table/ingestion export, while still tracking what all of those errors were elsewhere (maybe a user cares about this for monitoring).

However, it sounds like you have a clear and distinct idea for MaterializeInc/database-issues#6771 that is not this. Not a big deal, just wanted to defend that the common thread Seth and I saw was not orthogonal to this design.

Ah, yeah, sorry, we're just thinking about this differently! I wouldn't want to do something special for subsource-tables, because views and matviews have exactly the same underlying oks and errs collections.

Here's how it shakes out in my head:

         source             view       matview   
        /       \           /   \      /   \  
      table_a   table_b    oks errs   oks errs
      /   \      /  \    
     oks errs  oks errs  

Vs what I think you and Seth were describing for multi-output sources:

            source ----------------+
           /   |    \               \
          /    |     \               \
         /     |      \               \
        /      |       \               \
       /       |        \               \
table_a_oks table_a_errs  table_b_oks  table_b_errs

Which makes total sense when considering multi-output sources in isolation! But I don't immediately see how we'd generalize this design to views or matviews.

@sploiselle
Contributor Author

@benesch Yeah, totally! I was not suggesting that this is the way but just that it was something we could do. Agreed that the lack of generalization here isn't as ideal as what you've envisioned.

@sploiselle sploiselle force-pushed the deprecate-subsource-design-doc branch 2 times, most recently from 3960b2d to 8dc1d55 Compare May 9, 2024 01:16
Comment on lines 48 to 49
normalizing `TEXT COLUMN` references. Purification should be limited to only
do work that _must_ be performed asynchronously. (Note that _should_ is
Contributor

Small note: purification is also (correctly, IMO) used for work that should only be performed once, when the user types the SQL into Materialize, vs on every boot, when the SQL is loaded from the catalog. That sort of work is not always asynchronous.

Contributor Author

Ah, sure––that's a good call. The issue I have is moreso where the code lives vs. when it is called. Will clarify.

Contributor Author

What if we split up purification into two stages to make this more explicit? async, and then normalization (or whatever other one-off work exists).

The annoyance here is that there is one-off work which we currently do in purification which very closely resembles other work we have to do post-purification which depends on knowing the catalog state when we rejoin the main coord thread (e.g. we have to know all of a source's subsources and cannot assume that the set available to us in purification is the set that exists when we rejoin the main coord thread).

I guess the reason that I'm harping on this, which I haven't clearly articulated, is that there is a world in which creating a source and altering a source can use the exact same code paths w/r/t managing the subsources that it offers. I'll amend the design doc to split these things out more neatly and clearly.

Contributor Author

Here's what I've had in the back of my mind. In case it moves, it's in the section evocatively titled "Mind Palace."

Contributor

What if we split up purification into two stages to make this more explicit? async, and then normalization (or whatever other one-off work exists).

That makes sense to me!

@sploiselle sploiselle force-pushed the deprecate-subsource-design-doc branch from 8dc1d55 to 5010b54 Compare May 9, 2024 13:09
#### TEXT + IGNORE COLUMNS

These options can remain on `CREATE SOURCE` statements, and the `ADD SUBSOURCE`
variants of them will move to instead be `CREATE TABLE` variants. We could also
Contributor

Can you describe what this would look like for CREATE TABLE? Would it just be

```sql
CREATE TABLE my_new_table
FROM SOURCE postgres_src (upstream_schema.upstream_table)
WITH (
    TEXT COLUMNS (upstream_table.column_1),
    IGNORE COLUMNS (upstream_table.column_2)
)
```

?

Contributor Author

Yes. They will be the same as the ADD SUBSOURCE options but as options on CREATE TABLE. This is what I meant by move.


## The Problems

- To complete #20208, we have to know the schema of the Kafka topic. This
Contributor

Why? I didn't see in the above doc why this is necessarily different from kafka sources today, where we also need to know their schema.

Contributor Author

CreateSubsourceStatement requires knowing the schema of the object.

Contributor

Sure but isn't that also true today? I'm perhaps misunderstanding your phrasing here -- how does that constraint block the implementation of #20208?

Contributor Author

maybe this phrasing is clearer?

we have to know the schema of the Kafka topic when creating the subsource/table that it will write to.

- Subsource planning (generating `CreateSubsourceStatement`) currently happens
during purification before planning the `SOURCE`.
- Purification's current structure also means that we have to implement some
bits in purification (for new sources) and some in sequencing (when
Contributor

so the issue here is that we have to do similar things in both purification and in sequencing based on whether we are creating a new source vs adding a subsource/table to an existing source?

Contributor Author

Yes

to separate "planning" and purification's async concerns, e.g.
`PurificationError` wraps some other error that expresses the semantics of
handling PostgreSQL `TEXT COLUMNS`.

Contributor

what about the 'genericizing' of subsource planning? Is that part of the success criteria or just a side-effect of this work?

Contributor Author

To generate the CREATE SUBSOURCE statements after planning the source, I intend to genericize subsource planning.

3. Restructure purification such that we track the names of the sources we've
been requested to create, as well as the "upstream" objects to which they
refer––i.e. instead of generating `Vec<CreateSubsourceStatement>` we should
generate `BTreeMap<SubsourceName, UpstreamObjectName>`.
Contributor

this would make things so much clearer!

`UpstreamObjectName` in the code is likely to just be `UnresolvedItemName`.

5. After planning the primary source, plan all subsources by generating
`CreateSourceStatement`s using the name, reference mapping; we'll get the
Contributor

Suggested change
`CreateSourceStatement`s using the name, reference mapping; we'll get the
`CreateSubsourceStatement`s using the name, reference mapping; we'll get the

?

Contributor Author

Yes, ty!


## Open questions

What have I overlooked that I will receive blocking feedback on?
Contributor

😏

Comment on lines +81 to +93
To support this, we can extend the `SourceConnection` trait further to
describe the kind of transformations we will allow to each type of source.
For PG and MySQL this is simple to envision as something like:

```rust
fn merge_external_data<T, O>(
    &mut self, metadata: Vec<T>, options: Vec<O>
) -> Result<(), Err>
```

Then, being able to add subsources to a source will just require implementing a
merge algorithm for the new metadata and options.
Contributor

I like this idea a lot - will definitely simplify the work in adding new source types going forward and understanding how they work

Contributor Author

Mentioned elsewhere, but I added a "Mind Palace" section suggesting that we might be able to just write the initial impls of a source in terms of this, and then every new source supports ALTER SOURCE...ADD SUBSOURCE by default.

Contributor

yes that context was helpful, thanks!


This will let us ask for the given schema of a subsource after the source
itself is planned.
Contributor

How do we validate the options like TEXT COLUMNS on the top-level source plan before we've requested the schema from the relevant subsources? Or will we start allowing invalid references in those options in the top-level source statement and throw an error when we plan all the subsources?

Contributor Author

TEXT COLUMNS and IGNORE COLUMNS only need to validate that they refer to valid columns in the source's "subsource catalog." That should just be done as part of normalizing them––i.e. when we figure out which object they refer to, we should write down that fully qualified name, and if we can't find it, we should error.

Contributor Author

idk if this makes things any clearer, but I added a section around some of the unification I'm hoping to achieve between creating and modifying sources. This might just end up being a conceptual, rather than concrete, framework for me. idk how gnarly the typing will get.

@sploiselle sploiselle force-pushed the deprecate-subsource-design-doc branch from 5010b54 to 8dca808 Compare May 9, 2024 14:57
the only distinction being that new sources start from the empty state, and
modified sources start with whatever state they had when they were modified.

I am not firmly committed to this idea, but the gist of it seems sound. If we
Contributor

I also agree the gist seems sound but it could get tricky in the implementation! I would vote for a first pass time-boxed attempt at this to see if you can get it to work

@rjobanp (Contributor) left a comment:

responses / updates LGTM on the subsource planning doc!

https://github.com/MaterializeInc/materialize/issues/24843, and also additional
work to support multi-output sources in Kafka https://github.com/MaterializeInc/materialize/issues/26994.

#### System tables
Contributor

This will have a pretty significant impact in the console, so it'd be good to get @RobinClowers' 👀. More broadly, could you expand on how this affects existing core system catalog tables like mz_sources, mz_source_status_history, or mz_source_statistics?

Contributor

Thanks for looping me in Marta, I have the same questions about status and statistics, but the change outlined here is pretty straightforward for the console.

@guswynn (Contributor) left a comment:

Signing off from my end!

One side thing I want to note is that I plan at some point to split `mz_source_statistics` into `mz_source_statistics` and `mz_subsource_statistics`, as sources and subsources produce different sets of statistics. I suppose after this change, the latter would be something like `mz_ingested_table_statistics`?

@sploiselle
Contributor Author

At @bosconi's request, I've moved these to Notion.

@sploiselle sploiselle closed this May 21, 2024
@sploiselle sploiselle deleted the deprecate-subsource-design-doc branch May 21, 2024 15:17

7 participants