-
Notifications
You must be signed in to change notification settings - Fork 83
/
q02_minimum_cost_supplier.py
145 lines (119 loc) · 4.82 KB
/
q02_minimum_cost_supplier.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
TPC-H Problem Statement Query 2:
The Minimum Cost Supplier Query finds, in a given region, for each part of a certain type and size,
the supplier who can supply it at minimum cost. If several suppliers in that region offer the
desired part type and size at the same (minimum) cost, the query lists the parts from suppliers with
the 100 highest account balances. For each supplier, the query lists the supplier's account balance,
name and nation; the part's number and manufacturer; the supplier's address, phone number and
comment information.
The above problem statement text is copyrighted by the Transaction Processing Performance Council
as part of their TPC Benchmark H Specification revision 2.18.0.
"""
import datafusion
from datafusion import SessionContext, col, lit, functions as F
from util import get_data_path
# Parameters describing the part we're looking for. Values selected here differ from the spec
# in order to run unit tests on a small data set.
# Name of the region to keep (compared against r_name).
REGION_OF_INTEREST = "EUROPE"
# Text that has to occur in p_type.
TYPE_OF_INTEREST = "BRASS"
# Exact value required of p_size.
SIZE_OF_INTEREST = 15
# Read every table the query touches, keeping only the columns used further down.
ctx = SessionContext()


def _read_columns(file_name, *columns):
    """Read the parquet file located via ``get_data_path`` and select ``columns`` from it."""
    return ctx.read_parquet(get_data_path(file_name)).select(*columns)


df_part = _read_columns("part.parquet", "p_partkey", "p_mfgr", "p_type", "p_size")
df_supplier = _read_columns(
    "supplier.parquet",
    "s_acctbal",
    "s_name",
    "s_address",
    "s_phone",
    "s_comment",
    "s_nationkey",
    "s_suppkey",
)
df_partsupp = _read_columns(
    "partsupp.parquet", "ps_partkey", "ps_suppkey", "ps_supplycost"
)
df_nation = _read_columns("nation.parquet", "n_nationkey", "n_regionkey", "n_name")
df_region = _read_columns("region.parquet", "r_regionkey", "r_name")
# Keep only the parts of interest. `strpos` yields the position at which the search text occurs
# inside p_type, or 0 when it does not occur at all, so a positive result means the type matches.
has_type = F.strpos(col("p_type"), lit(TYPE_OF_INTEREST)) > lit(0)
has_size = col("p_size") == lit(SIZE_OF_INTEREST)
df_part = df_part.filter(has_type).filter(has_size)
# Restrict the regions to the one named by REGION_OF_INTEREST.
in_region = col("r_name") == lit(REGION_OF_INTEREST)
df_region = df_region.filter(in_region)
# Suppliers are tied to a nation and nations are tied to a region, so two inner joins narrow
# the suppliers down to those located in the region of interest.
df_nation = df_nation.join(
    df_region, how="inner", left_on=["n_regionkey"], right_on=["r_regionkey"]
)
df_supplier = df_supplier.join(
    df_nation, how="inner", left_on=["s_nationkey"], right_on=["n_nationkey"]
)
# With the potential suppliers known, an inner join limits our part supplies table down to the
# offers made by those suppliers. The join against the specific parts that match the request
# happens further down.
df = df_partsupp.join(
    df_supplier, how="inner", left_on=["ps_suppkey"], right_on=["s_suppkey"]
)
# Find the cheapest offer for each part. One of several ways to do this is a window function:
# compute the minimum supply cost over all rows sharing a ps_partkey, store it in a new
# column, and then drop every row whose own cost is not that minimum.
# The default window frame as of 5/6/2024 is from unbounded preceding to the current row.
# We want the whole partition to be evaluated, so the frame is specified explicitly with
# both bounds left as None.
window_frame = datafusion.WindowFrame("rows", None, None)
min_cost_per_part = F.window(
    "min",
    [col("ps_supplycost")],
    window_frame=window_frame,
    partition_by=[col("ps_partkey")],
)
is_cheapest = col("min_cost") == col("ps_supplycost")
df = df.with_column("min_cost", min_cost_per_part).filter(is_cheapest)
# From the problem statement, these are the values we wish to output, in this order.
output_columns = [
    "s_acctbal",
    "s_name",
    "n_name",
    "p_partkey",
    "p_mfgr",
    "s_address",
    "s_phone",
    "s_comment",
]
# The inner join keeps only the offers for the parts that passed the part filter.
df = df.join(
    df_part, how="inner", left_on=["ps_partkey"], right_on=["p_partkey"]
).select(*output_columns)
# Order by account balance, highest first, then by nation name, supplier name and part key,
# and keep the first 100 rows.
sort_keys = [
    col("s_acctbal").sort(ascending=False),
    col("n_name").sort(),
    col("s_name").sort(),
    col("p_partkey").sort(),
]
df = df.sort(*sort_keys).limit(100)
# Print the result
df.show()