Skip to content

Commit

Permalink
Merge branch 'main' into yiming/non-async-complete-barrier
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Mar 5, 2024
2 parents 66f916e + a835506 commit e882dd1
Show file tree
Hide file tree
Showing 102 changed files with 2,159 additions and 754 deletions.
159 changes: 88 additions & 71 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ aws-sdk-s3 = { version = "1", default-features = false, features = [
"rt-tokio",
"rustls",
] }
aws-sdk-ec2 = { version = "1", default-features = false, features = [
# To bump the version of aws-sdk-ec2, check the README of https://github.com/risingwavelabs/rw-aws-sdk-ec2
aws-sdk-ec2 = { package = "rw-aws-sdk-ec2", version = "1", default-features = false, features = [
"rt-tokio",
"rustls",
] }
Expand Down
18 changes: 9 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,16 @@ RisingWave is a Postgres-compatible streaming database engineered to provide the

![RisingWave](https://github.com/risingwavelabs/risingwave-docs/blob/main/docs/images/new_archi_grey.png)

## Try it out in 5 minutes
Docker pull:
```
docker run -it --pull=always -p 4566:4566 -p 5691:5691 risingwavelabs/risingwave:latest playground
```
Now connect to RisingWave using `psql`:
```
psql -h localhost -p 4566 -d dev -U root
## Try it out in 60 seconds

Install RisingWave:
```shell
curl https://risingwave.com/sh | sh
```
Don’t have Docker? Learn how to install RisingWave on Mac, Ubuntu, and other environments at [Quick Start](https://docs.risingwave.com/docs/current/get-started/).

Then follow the prompts to start and connect to RisingWave.

To learn about other installation options such as Docker, see [Quick Start](https://docs.risingwave.com/docs/current/get-started/).

## Production deployments

Expand Down
4 changes: 4 additions & 0 deletions ci/scripts/check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ sccache --zero-stats
echo "--- Run clippy check (release)"
cargo clippy --release --all-targets --features "rw-static-link,enable_test_epoch_in_release" --locked -- -D warnings

echo "--- Run cargo check on building the release binary (release)"
cargo check -p risingwave_cmd_all --features "rw-static-link" --profile release
cargo check -p risingwave_cmd --bin risectl --features "rw-static-link" --profile release

echo "--- Show sccache stats"
sccache --show-stats
sccache --zero-stats
Expand Down
5 changes: 5 additions & 0 deletions ci/scripts/integration-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ fi
echo "--- install postgresql"
sudo yum install -y postgresql15

echo "--- install poetry"
curl -sSL https://install.python-poetry.org | POETRY_VERSION=1.8.0 python3 -



echo "--- download rwctest-key"
aws secretsmanager get-secret-value --secret-id "gcp-buildkite-rwctest-key" --region us-east-2 --query "SecretString" --output text >gcp-rwctest.json

Expand Down
2 changes: 1 addition & 1 deletion ci/scripts/release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ fi

echo "--- Build risingwave release binary"
cargo build -p risingwave_cmd_all --features "rw-static-link" --profile release
cargo build --bin risectl --features "rw-static-link" --profile release
cargo build -p risingwave_cmd --bin risectl --features "rw-static-link" --profile release
cd target/release && chmod +x risingwave risectl

echo "--- Upload nightly binary to s3"
Expand Down
13 changes: 7 additions & 6 deletions dashboard/lib/api/metric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
import { Metrics, MetricsSample } from "../../components/metrics"
import api from "./api"

export const INTERVAL = 5000
export interface BackPressuresMetrics {
outputBufferBlockingDuration: Metrics[]
}

// Get back pressure from meta node -> prometheus
export async function getActorBackPressures() {
// Get back pressure from Prometheus
export async function fetchPrometheusBackPressure() {
const res: BackPressuresMetrics = await api.get(
"/metrics/fragment/prometheus_back_pressures"
)
Expand Down Expand Up @@ -114,7 +113,8 @@ function convertToBackPressureMetrics(

export function calculateBPRate(
backPressureNew: BackPressureInfo[],
backPressureOld: BackPressureInfo[]
backPressureOld: BackPressureInfo[],
intervalMs: number
): BackPressuresMetrics {
let mapNew = convertToMapAndAgg(backPressureNew)
let mapOld = convertToMapAndAgg(backPressureOld)
Expand All @@ -124,7 +124,8 @@ export function calculateBPRate(
result.set(
key,
// The *100 in end of the formular is to convert the BP rate to the value used in web UI drawing
((value - (mapOld.get(key) || 0)) / ((INTERVAL / 1000) * 1000000000)) *
((value - (mapOld.get(key) || 0)) /
((intervalMs / 1000) * 1000000000)) *
100
)
} else {
Expand All @@ -149,7 +150,7 @@ export const BackPressureInfo = {
}

// Get back pressure from meta node -> compute node
export async function getBackPressureWithoutPrometheus() {
export async function fetchEmbeddedBackPressure() {
const response = await api.get("/metrics/fragment/embedded_back_pressures")
let backPressureInfos: BackPressureInfo[] = response.backPressureInfos.map(
BackPressureInfo.fromJSON
Expand Down
173 changes: 88 additions & 85 deletions dashboard/pages/fragment_graph.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import {
} from "@chakra-ui/react"
import * as d3 from "d3"
import { dagStratify } from "d3-dag"
import _, { sortBy } from "lodash"
import _ from "lodash"
import Head from "next/head"
import { parseAsInteger, useQueryState } from "nuqs"
import { Fragment, useCallback, useEffect, useMemo, useState } from "react"
Expand All @@ -40,11 +40,9 @@ import useErrorToast from "../hook/useErrorToast"
import useFetch from "../lib/api/fetch"
import {
BackPressureInfo,
BackPressuresMetrics,
INTERVAL,
calculateBPRate,
getActorBackPressures,
getBackPressureWithoutPrometheus,
fetchEmbeddedBackPressure,
fetchPrometheusBackPressure,
} from "../lib/api/metric"
import { getFragments, getStreamingJobs } from "../lib/api/streaming"
import { FragmentBox } from "../lib/layout"
Expand All @@ -55,6 +53,9 @@ interface DispatcherNode {
[actorId: number]: Dispatcher[]
}

// Refresh interval (ms) for back pressure stats
const INTERVAL = 5000

/** Associated data of each plan node in the fragment graph, including the dispatchers. */
export interface PlanNodeDatum {
name: string
Expand Down Expand Up @@ -181,21 +182,21 @@ const backPressureDataSources: BackPressureDataSource[] = [
"Prometheus",
]

// The state of the embedded back pressure metrics.
// The metrics from previous fetch are stored here to calculate the rate.
interface EmbeddedBackPressureInfo {
previous: BackPressureInfo[]
current: BackPressureInfo[]
}

export default function Streaming() {
const { response: relationList } = useFetch(getStreamingJobs)
const { response: fragmentList } = useFetch(getFragments)

const [relationId, setRelationId] = useQueryState("id", parseAsInteger)
const [selectedFragmentId, setSelectedFragmentId] = useState<number>()
// used to get the data source
const [backPressureDataSource, setBackPressureDataSource] =
useState<BackPressureDataSource>("Embedded")

const { response: actorBackPressures } = useFetch(
getActorBackPressures,
INTERVAL,
backPressureDataSource === "Prometheus"
)
const toast = useErrorToast()

const fragmentDependencyCallback = useCallback(() => {
if (fragmentList) {
Expand All @@ -211,7 +212,6 @@ export default function Streaming() {
}
}
}
return undefined
}, [fragmentList, relationId])

useEffect(() => {
Expand All @@ -222,44 +222,8 @@ export default function Streaming() {
}
}
}
return () => {}
}, [relationId, relationList, setRelationId])

// get back pressure rate without prometheus
// TODO(bugen): extract the following logic to a hook and unify the interface
// with Prometheus data source.
const [backPressuresMetricsWithoutPromtheus, setBackPressuresMetrics] =
useState<BackPressuresMetrics>()
const [previousBP, setPreviousBP] = useState<BackPressureInfo[]>([])
const [currentBP, setCurrentBP] = useState<BackPressureInfo[]>([])
const toast = useErrorToast()

useEffect(() => {
if (backPressureDataSource === "Embedded") {
const interval = setInterval(() => {
const fetchNewBP = async () => {
const newBP = await getBackPressureWithoutPrometheus()
setPreviousBP(currentBP)
setCurrentBP(newBP)
}

fetchNewBP().catch(console.error)
}, INTERVAL)
return () => clearInterval(interval)
}
}, [currentBP, backPressureDataSource])

useEffect(() => {
if (currentBP !== null && previousBP !== null) {
const metrics = calculateBPRate(currentBP, previousBP)
metrics.outputBufferBlockingDuration = sortBy(
metrics.outputBufferBlockingDuration,
(m) => (m.metric.fragmentId, m.metric.downstreamFragmentId)
)
setBackPressuresMetrics(metrics)
}
}, [currentBP, previousBP])

const fragmentDependency = fragmentDependencyCallback()?.fragmentDep
const fragmentDependencyDag = fragmentDependencyCallback()?.fragmentDepDag
const fragments = fragmentDependencyCallback()?.fragments
Expand Down Expand Up @@ -323,49 +287,86 @@ export default function Streaming() {
toast(new Error(`Actor ${searchActorIdInt} not found`))
}

const [backPressureDataSource, setBackPressureDataSource] =
useState<BackPressureDataSource>("Embedded")

// Periodically fetch Prometheus back-pressure from Meta node
const { response: promethusMetrics } = useFetch(
fetchPrometheusBackPressure,
INTERVAL,
backPressureDataSource === "Prometheus"
)

// Periodically fetch embedded back-pressure from Meta node
// Didn't call `useFetch()` because the `setState` way is special.
const [embeddedBackPressureInfo, setEmbeddedBackPressureInfo] =
useState<EmbeddedBackPressureInfo>()
useEffect(() => {
if (backPressureDataSource === "Embedded") {
const interval = setInterval(() => {
fetchEmbeddedBackPressure().then(
(newBP) => {
console.log(newBP)
setEmbeddedBackPressureInfo((prev) =>
prev
? {
previous: prev.current,
current: newBP,
}
: {
previous: newBP, // Use current value to show zero rate, but it's fine
current: newBP,
}
)
},
(e) => {
console.error(e)
toast(e, "error")
}
)
}, INTERVAL)
return () => {
clearInterval(interval)
}
}
}, [backPressureDataSource])

const backPressures = useMemo(() => {
if (actorBackPressures || backPressuresMetricsWithoutPromtheus) {
if (promethusMetrics || embeddedBackPressureInfo) {
let map = new Map()

if (
backPressureDataSource === "Embedded" &&
backPressuresMetricsWithoutPromtheus
) {
for (const m of backPressuresMetricsWithoutPromtheus.outputBufferBlockingDuration) {
if (backPressureDataSource === "Embedded" && embeddedBackPressureInfo) {
const metrics = calculateBPRate(
embeddedBackPressureInfo.current,
embeddedBackPressureInfo.previous,
INTERVAL
)
for (const m of metrics.outputBufferBlockingDuration) {
map.set(
`${m.metric.fragmentId}_${m.metric.downstreamFragmentId}`,
m.sample[0].value
)
}
} else if (
backPressureDataSource === "Prometheus" &&
actorBackPressures
) {
if (actorBackPressures) {
for (const m of actorBackPressures.outputBufferBlockingDuration) {
if (m.sample.length > 0) {
// Note: We issue an instant query to Prometheus to get the most recent value.
// So there should be only one sample here.
//
// Due to https://github.com/risingwavelabs/risingwave/issues/15280, it's still
// possible that an old version of meta service returns a range-query result.
// So we take the one with the latest timestamp here.
const value = _(m.sample).maxBy((s) => s.timestamp)!.value * 100
map.set(
`${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`,
value
)
}
} else if (backPressureDataSource === "Prometheus" && promethusMetrics) {
for (const m of promethusMetrics.outputBufferBlockingDuration) {
if (m.sample.length > 0) {
// Note: We issue an instant query to Prometheus to get the most recent value.
// So there should be only one sample here.
//
// Due to https://github.com/risingwavelabs/risingwave/issues/15280, it's still
// possible that an old version of meta service returns a range-query result.
// So we take the one with the latest timestamp here.
const value = _(m.sample).maxBy((s) => s.timestamp)!.value * 100
map.set(
`${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`,
value
)
}
}
}
return map
}
}, [
backPressureDataSource,
actorBackPressures,
backPressuresMetricsWithoutPromtheus,
])
}, [backPressureDataSource, promethusMetrics, embeddedBackPressureInfo])

const retVal = (
<Flex p={3} height="calc(100vh - 20px)" flexDirection="column">
Expand Down Expand Up @@ -444,12 +445,14 @@ export default function Streaming() {
event.target.value as BackPressureDataSource
)
}
defaultValue="Embedded"
>
{backPressureDataSources.map((algo) => (
<option value={algo} key={algo}>
{algo}
</option>
))}
<option value="Embedded" key="Embedded">
Embedded (5 secs)
</option>
<option value="Prometheus" key="Prometheus">
Prometheus (1 min)
</option>
</Select>
</FormControl>
<Flex height="full" width="full" flexDirection="column">
Expand Down
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

Loading

0 comments on commit e882dd1

Please sign in to comment.