Temporal join may lead to scheduling failure. #14843

Closed
shanicky opened this issue Jan 29, 2024 · 4 comments · Fixed by #14848

@shanicky
Contributor

shanicky commented Jan 29, 2024

Describe the bug

No response

Error message/log

No response

To Reproduce

-- default parallelism = 10;
create table t1(v1 int primary key);
create table t2(v1 int primary key);

-- t1 and t2 both start with parallelism 10; lower t2 to 3
alter table t2 set parallelism = 3;

create table t(v1 int, v2 int) append only;

dev=> create materialized view m as select a.v1 from (select a.v1, a.v2 from t a) a left join t1 for system_time as of proctime() as kt1 on a.v1 = kt1.v1 left join t2 for system_time as of proctime() as kt2 on a.v1 = kt2.v1  ;
ERROR:  Failed to run the query

Caused by these errors (recent errors listed first):
  1: gRPC request to meta service failed: Internal error
  2: Failed to schedule: {Failed(GlobalId(32)), Failed(GlobalId(17)), Failed(GlobalId(31)), Failed(GlobalId(19)), Failed(GlobalId(29))}


dev=> explain create materialized view m as select a.v1 from (select a.v1, a.v2 from t a) a left join t1 for system_time as of proctime() as kt1 on a.v1 = kt1.v1 left join t2 for system_time as of proctime() as kt2 on a.v1 = kt2.v1  ;
 StreamMaterialize { columns: [v1, t._row_id(hidden), t2.v1(hidden)], stream_key: [t._row_id, v1], pk_columns: [t._row_id, v1], pk_conflict: NoCheck }
 └─StreamExchange { dist: HashShard(t.v1, t._row_id) }
   └─StreamTemporalJoin { type: LeftOuter, predicate: t.v1 = t2.v1 }
     ├─StreamTemporalJoin { type: LeftOuter, predicate: t.v1 = t1.v1 }
     │ ├─StreamExchange { dist: HashShard(t.v1) }
     │ │ └─StreamTableScan { table: t, columns: [v1, _row_id] }
     │ └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(t1.v1) }
     │   └─StreamTableScan { table: t1, columns: [v1] }
     └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(t2.v1) }
       └─StreamTableScan { table: t2, columns: [v1] }
(10 rows)

Expected behavior

No response

How did you deploy RisingWave?

No response

The version of RisingWave

No response

Additional context

No response

@shanicky shanicky added the type/bug Something isn't working label Jan 29, 2024
@github-actions github-actions bot added this to the release-1.7 milestone Jan 29, 2024
@st1page
Contributor

st1page commented Jan 29, 2024

I think this is an optimizer bug: the optimizer cannot assume the two tables have the same distribution, so an exchange is needed between the two joins.

   └─StreamTemporalJoin { type: LeftOuter, predicate: t.v1 = t2.v1 }
     ├─StreamTemporalJoin { type: LeftOuter, predicate: t.v1 = t1.v1 }
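
To illustrate the point above, here is a minimal Python sketch of why the plan cannot be scheduled. It is a toy model, not RisingWave's scheduler: `NoShuffleInput` and `schedule_fragment` are hypothetical names, and the sketch assumes that a fragment reading its inputs through no_shuffle exchanges must run at the same parallelism as every one of those inputs.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class NoShuffleInput:
    """Hypothetical model of an upstream table read through a no_shuffle exchange."""
    table: str
    parallelism: int


def schedule_fragment(inputs: Iterable[NoShuffleInput], default_parallelism: int = 10) -> int:
    """Return the parallelism the fragment must run at.

    Assumption: a no_shuffle edge maps downstream actors 1:1 onto upstream
    actors, so all no_shuffle inputs of one fragment must agree.
    """
    inputs = list(inputs)
    distinct = {i.parallelism for i in inputs}
    if not distinct:
        # No no_shuffle constraint: fall back to the default parallelism.
        return default_parallelism
    if len(distinct) > 1:
        detail = ", ".join(f"{i.table}={i.parallelism}" for i in inputs)
        raise ValueError(f"cannot schedule fragment: conflicting parallelism ({detail})")
    return distinct.pop()


# Both temporal joins sit in one fragment, so t1 and t2 constrain it together.
try:
    schedule_fragment([NoShuffleInput("t1", 10), NoShuffleInput("t2", 3)])
except ValueError as err:
    print(err)
```

In this model the two temporal joins share a fragment because there is no exchange between them, so the fragment would have to run at parallelism 10 and 3 at once.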

@st1page
Contributor

st1page commented Jan 29, 2024

The regular join is fine because it does not need to obey the outer side's distribution, but the temporal join does. @chenzl25, can you take a look when you are free? The user who found the issue has worked around it by copying the column in the inner table.

dev=> explain create materialized view m as select a.v1 from (select a.v1, a.v2 from t a) a left join t1 as kt1 on a.v1 = kt1.v1 left join t2  as kt2 on a.v1 = kt2.v1  ;
                                                                      QUERY PLAN                                                                       
-------------------------------------------------------------------------------------------------------------------------------------------------------
 StreamMaterialize { columns: [v1, t._row_id(hidden), t2.v1(hidden)], stream_key: [t._row_id, v1], pk_columns: [t._row_id, v1], pk_conflict: NoCheck }
 └─StreamExchange { dist: HashShard(t.v1, t._row_id) }
   └─StreamHashJoin { type: LeftOuter, predicate: t.v1 = t2.v1 }
     ├─StreamHashJoin { type: LeftOuter, predicate: t.v1 = t1.v1 }
     │ ├─StreamExchange { dist: HashShard(t.v1) }
     │ │ └─StreamTableScan { table: t, columns: [v1, _row_id] }
     │ └─StreamExchange { dist: HashShard(t1.v1) }
     │   └─StreamTableScan { table: t1, columns: [v1] }
     └─StreamExchange { dist: HashShard(t2.v1) }
       └─StreamTableScan { table: t2, columns: [v1] }

@st1page
Contributor

st1page commented Jan 29, 2024

Maybe we can derive the temporal join's distribution as UpstreamHashShard in the optimizer?

@chenzl25
Contributor

chenzl25 commented Jan 29, 2024

I believe enforcing a shuffle regardless of the distribution of the temporal join LHS is a way to address this issue. By doing so, we can still benefit from the temporal join distribution while eliminating the need for a shuffle in the regular join.
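
The proposal above can be sketched as a plan rewrite. This is a hedged toy model in Python, not the optimizer's actual code: the `Node` type and the functions `enforce_lhs_shuffle` and `no_shuffle_tables_per_fragment` are hypothetical, and the sketch assumes that fragments are cut at ordinary exchanges while no_shuffle exchanges tie a table to the fragment that reads it.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Node:
    """Hypothetical plan node; kind is one of
    "TemporalJoin", "Exchange", "NoShuffleExchange", "TableScan"."""
    kind: str
    children: List["Node"] = field(default_factory=list)
    table: str = ""


def enforce_lhs_shuffle(node: Node) -> Node:
    """Return a copy of the plan with an Exchange on every temporal join's left input."""
    children = [enforce_lhs_shuffle(c) for c in node.children]
    if node.kind == "TemporalJoin":
        lhs, rhs = children
        if lhs.kind != "Exchange":
            lhs = Node("Exchange", [lhs])
        children = [lhs, rhs]
    return Node(node.kind, children, node.table)


def no_shuffle_tables_per_fragment(root: Node) -> List[List[str]]:
    """Cut the plan at Exchange nodes and list the no_shuffle tables of each fragment."""
    fragments: List[List[str]] = [[]]

    def walk(node: Node, current: List[str]) -> None:
        if node.kind == "Exchange":
            below: List[str] = []  # an Exchange starts a new fragment beneath it
            fragments.append(below)
            for child in node.children:
                walk(child, below)
        elif node.kind == "NoShuffleExchange":
            current.append(node.children[0].table)
        else:
            for child in node.children:
                walk(child, current)

    walk(root, fragments[0])
    return fragments


# The plan from this issue: two temporal joins with no exchange between them.
inner = Node("TemporalJoin", [
    Node("Exchange", [Node("TableScan", table="t")]),
    Node("NoShuffleExchange", [Node("TableScan", table="t1")]),
])
plan = Node("TemporalJoin", [
    inner,
    Node("NoShuffleExchange", [Node("TableScan", table="t2")]),
])

print(no_shuffle_tables_per_fragment(plan))
print(no_shuffle_tables_per_fragment(enforce_lhs_shuffle(plan)))
```

Before the rewrite, t1 and t2 land in the same fragment in this model. After it, each temporal join sits in its own fragment and follows only its own inner table.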
