Skip to content

Commit

Permalink
Merge branch 'main' into xzhseh/sql-udf-binding-update
Browse files Browse the repository at this point in the history
  • Loading branch information
xzhseh authored Jan 10, 2024
2 parents 173fa27 + cf9a2db commit 46d949e
Show file tree
Hide file tree
Showing 76 changed files with 2,463 additions and 1,005 deletions.
9 changes: 4 additions & 5 deletions ci/scripts/run-backfill-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ test_replication_with_column_pruning() {
echo "--- Kill cluster"
cargo make kill
cargo make wait-processes-exit
wait
}

# Test sink backfill recovery
Expand All @@ -177,16 +178,11 @@ test_sink_backfill_recovery() {

# Check progress
sqllogictest -p 4566 -d dev 'e2e_test/backfill/sink/create_sink.slt'
sqllogictest -p 4566 -d dev 'e2e_test/background_ddl/common/validate_one_job.slt'

# Restart
restart_cluster
sleep 3

# FIXME(kwannoel): Sink's backfill progress is not recovered yet.
# Check progress
# sqllogictest -p 4566 -d dev 'e2e_test/background_ddl/common/validate_one_job.slt'

# Sink back into rw
run_sql "CREATE TABLE table_kafka (v1 int primary key)
WITH (
Expand All @@ -199,6 +195,9 @@ test_sink_backfill_recovery() {

# Verify data matches upstream table.
sqllogictest -p 4566 -d dev 'e2e_test/backfill/sink/validate_sink.slt'
cargo make kill
cargo make wait-processes-exit
wait
}

main() {
Expand Down
4 changes: 2 additions & 2 deletions ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ steps:

- label: "S3 source on OpenDAL fs engine"
key: "s3-source-test-for-opendal-fs-engine"
command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s run.csv"
command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s run.py"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-s3-source-tests"
Expand All @@ -531,7 +531,7 @@ steps:
# TODO(Kexiang): Enable this test after we have a GCS_SOURCE_TEST_CONF.
# - label: "GCS source on OpenDAL fs engine"
# key: "s3-source-test-for-opendal-fs-engine"
# command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s gcs.csv"
# command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s gcs_source.py"
# if: |
# !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
# || build.pull_request.labels includes "ci/run-s3-source-tests"
Expand Down
12 changes: 5 additions & 7 deletions dashboard/components/BackPressureTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@ import { sortBy } from "lodash"
import Head from "next/head"
import { Fragment, useEffect, useState } from "react"
import useErrorToast from "../hook/useErrorToast"
import { getActorBackPressures } from "../pages/api/metric"
import {
BackPressuresMetrics,
getActorBackPressures,
} from "../pages/api/metric"
import RateBar from "./RateBar"
import { Metrics } from "./metrics"

interface BackPressuresMetrics {
outputBufferBlockingDuration: Metrics[]
}

export default function BackPressureTable({
selectedFragmentIds,
Expand All @@ -50,7 +48,7 @@ export default function BackPressureTable({
async function doFetch() {
while (true) {
try {
let metrics: BackPressuresMetrics = await getActorBackPressures()
let metrics = await getActorBackPressures()
metrics.outputBufferBlockingDuration = sortBy(
metrics.outputBufferBlockingDuration,
(m) => (m.metric.fragment_id, m.metric.downstream_fragment_id)
Expand Down
143 changes: 117 additions & 26 deletions dashboard/components/FragmentGraph.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,20 @@ import {
theme,
useDisclosure,
} from "@chakra-ui/react"
import { tinycolor } from "@ctrl/tinycolor"
import loadable from "@loadable/component"
import * as d3 from "d3"
import { cloneDeep } from "lodash"
import { Fragment, useCallback, useEffect, useRef, useState } from "react"
import {
Edge,
FragmentBox,
FragmentBoxPosition,
Position,
generateBoxEdges,
layout,
} from "../lib/layout"
import { PlanNodeDatum } from "../pages/fragment_graph"
import BackPressureTable from "./BackPressureTable"

const ReactJson = loadable(() => import("react-json-view"))

Expand Down Expand Up @@ -86,19 +87,21 @@ function boundBox<Datum>(
const nodeRadius = 12
const nodeMarginX = nodeRadius * 6
const nodeMarginY = nodeRadius * 4
const fragmentMarginX = nodeRadius
const fragmentMarginY = nodeRadius
const fragmentDistanceX = nodeRadius * 5
const fragmentDistanceY = nodeRadius * 5
const fragmentMarginX = nodeRadius * 2
const fragmentMarginY = nodeRadius * 2
const fragmentDistanceX = nodeRadius * 2
const fragmentDistanceY = nodeRadius * 2

export default function FragmentGraph({
planNodeDependencies,
fragmentDependency,
selectedFragmentId,
backPressures,
}: {
planNodeDependencies: Map<string, d3.HierarchyNode<PlanNodeDatum>>
fragmentDependency: FragmentBox[]
selectedFragmentId: string | undefined
selectedFragmentId?: string
backPressures?: Map<string, number>
}) {
const svgRef = useRef<SVGSVGElement>(null)

Expand Down Expand Up @@ -288,6 +291,7 @@ export default function FragmentGraph({
const applyStreamNode = (g: StreamNodeSelection) => {
g.attr("transform", (d) => `translate(${d.x},${d.y})`)

// Node circle
let circle = g.select<SVGCircleElement>("circle")
if (circle.empty()) {
circle = g.append("circle")
Expand All @@ -299,6 +303,7 @@ export default function FragmentGraph({
.style("cursor", "pointer")
.on("click", (_d, i) => openPlanNodeDetail(i.data))

// Node name under the circle
let text = g.select<SVGTextElement>("text")
if (text.empty()) {
text = g.append("text")
Expand Down Expand Up @@ -330,13 +335,8 @@ export default function FragmentGraph({
streamNodeSelection.call(applyStreamNode)
}

const createFragment = (sel: Enter<FragmentSelection>) => {
const gSel = sel
.append("g")
.attr("class", "fragment")
.call(applyFragment)
return gSel
}
const createFragment = (sel: Enter<FragmentSelection>) =>
sel.append("g").attr("class", "fragment").call(applyFragment)

const fragmentSelection = svgSelection
.select<SVGGElement>(".fragments")
Expand All @@ -351,7 +351,7 @@ export default function FragmentGraph({
// Fragment Edges
const edgeSelection = svgSelection
.select<SVGGElement>(".fragment-edges")
.selectAll<SVGPathElement, null>(".fragment-edge")
.selectAll<SVGGElement, null>(".fragment-edge")
.data(fragmentEdgeLayout)
type EdgeSelection = typeof edgeSelection

Expand All @@ -363,20 +363,69 @@ export default function FragmentGraph({
.x(({ x }) => x)
.y(({ y }) => y)

const applyEdge = (sel: EdgeSelection) =>
sel
const applyEdge = (gSel: EdgeSelection) => {
// Edge line
let path = gSel.select<SVGPathElement>("path")
if (path.empty()) {
path = gSel.append("path")
}

const isEdgeSelected = (d: Edge) =>
isSelected(d.source) || isSelected(d.target)

const color = (d: Edge) => {
if (backPressures) {
let value = backPressures.get(`${d.target}_${d.source}`)
if (value) {
return backPressureColor(value)
}
}

return isEdgeSelected(d)
? theme.colors.blue["500"]
: theme.colors.gray["300"]
}

const width = (d: Edge) => {
if (backPressures) {
let value = backPressures.get(`${d.target}_${d.source}`)
if (value) {
return backPressureWidth(value)
}
}

return isEdgeSelected(d) ? 4 : 2
}

path
.attr("d", ({ points }) => line(points))
.attr("fill", "none")
.attr("stroke-width", (d) =>
isSelected(d.source) || isSelected(d.target) ? 2 : 1
)
.attr("stroke", (d) =>
isSelected(d.source) || isSelected(d.target)
? theme.colors.blue["500"]
: theme.colors.gray["300"]
)
.attr("stroke-width", width)
.attr("stroke", color)

// Tooltip for back pressure rate
let title = gSel.select<SVGTitleElement>("title")
if (title.empty()) {
title = gSel.append<SVGTitleElement>("title")
}

const text = (d: Edge) => {
if (backPressures) {
let value = backPressures.get(`${d.target}_${d.source}`)
if (value) {
return `${value.toFixed(2)}%`
}
}

return ""
}

title.text(text)

return gSel
}
const createEdge = (sel: Enter<EdgeSelection>) =>
sel.append("path").attr("class", "fragment-edge").call(applyEdge)
sel.append("g").attr("class", "fragment-edge").call(applyEdge)

edgeSelection.enter().call(createEdge)
edgeSelection.call(applyEdge)
Expand All @@ -385,6 +434,7 @@ export default function FragmentGraph({
}, [
fragmentLayout,
fragmentEdgeLayout,
backPressures,
selectedFragmentId,
openPlanNodeDetail,
])
Expand Down Expand Up @@ -423,7 +473,48 @@ export default function FragmentGraph({
<g className="fragment-edges" />
<g className="fragments" />
</svg>
<BackPressureTable selectedFragmentIds={includedFragmentIds} />
{/* <BackPressureTable selectedFragmentIds={includedFragmentIds} /> */}
</Fragment>
)
}

/**
* The color for the edge with given back pressure value.
*
* @param value The back pressure rate, between 0 and 100.
*/
function backPressureColor(value: number) {
const colorRange = [
theme.colors.green["100"],
theme.colors.green["300"],
theme.colors.yellow["400"],
theme.colors.orange["500"],
theme.colors.red["700"],
].map((c) => tinycolor(c))

value = Math.max(value, 0)
value = Math.min(value, 100)

const step = colorRange.length - 1
const pos = (value / 100) * step
const floor = Math.floor(pos)
const ceil = Math.ceil(pos)

const color = tinycolor(colorRange[floor])
.mix(tinycolor(colorRange[ceil]), (pos - floor) * 100)
.toHexString()

return color
}

/**
* The width for the edge with given back pressure value.
*
* @param value The back pressure rate, between 0 and 100.
*/
function backPressureWidth(value: number) {
value = Math.max(value, 0)
value = Math.min(value, 100)

return 30 * (value / 100) + 2
}
2 changes: 0 additions & 2 deletions dashboard/hook/useErrorToast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ export default function useErrorToast() {
duration: 5000,
isClosable: true,
})

console.error(e)
},
[toast]
)
Expand Down
29 changes: 24 additions & 5 deletions dashboard/lib/layout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ export interface FragmentBox {
width: number
height: number
parentIds: string[]
externalParentIds: string[]
fragment?: TableFragments_Fragment
}

Expand Down Expand Up @@ -414,14 +415,11 @@ export function layoutPoint(
nodeRadius: number
): FragmentPointPosition[] {
const fragmentBoxes: Array<FragmentBox> = []
for (let { id, name, order, parentIds, ...others } of fragments) {
for (let { ...others } of fragments) {
fragmentBoxes.push({
id,
name,
parentIds,
width: nodeRadius * 2,
height: nodeRadius * 2,
order,
externalParentIds: [], // we don't care about external parent for point layout
...others,
})
}
Expand Down Expand Up @@ -498,6 +496,27 @@ export function generateBoxEdges(layoutMap: FragmentBoxPosition[]): Edge[] {
target: parentId,
})
}

// Simply draw a horizontal line here.
// Typically, external parent is only applicable to `StreamScan` fragment,
// and there'll be only one external parent due to `UpstreamShard` distribution
// and plan node sharing. So there's no overlapping issue.
for (const externalParentId of fragment.externalParentIds) {
links.push({
points: [
{
x: fragment.x,
y: fragment.y + fragment.height / 2,
},
{
x: fragment.x + 100,
y: fragment.y + fragment.height / 2,
},
],
source: fragment.id,
target: externalParentId,
})
}
}
return links
}
8 changes: 8 additions & 0 deletions dashboard/pages/api/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ class Api {
try {
const res = await fetch(url)
const data = await res.json()

// Throw error if response is not ok.
// See `DashboardError::into_response`.
if (!res.ok) {
throw `${res.status} ${res.statusText}${
data.error ? ": " + data.error : ""
}`
}
return data
} catch (e) {
console.error(e)
Expand Down
Loading

0 comments on commit 46d949e

Please sign in to comment.