Skip to content

Commit

Permalink
polish
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Oct 31, 2024
1 parent d0eafaa commit 44966b0
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 29 deletions.
7 changes: 2 additions & 5 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,11 +542,8 @@ func (h *FlowRequestHandler) CDCBatches(ctx context.Context, req *protos.GetCDCB
var rowsBehind int32
if len(batches) > 0 {
firstId := batches[0].BatchId
if err := h.pool.QueryRow(
ctx,
"select count(distinct batch_id), count(distinct batch_id) filter (where batch_id > $2) from peerdb_stats.cdc_batches where flow_name=$1 and start_time is not null",
req.FlowJobName,
firstId,
if err := h.pool.QueryRow(ctx, `select count(distinct batch_id), count(distinct batch_id) filter (where batch_id > $2)
from peerdb_stats.cdc_batches where flow_name=$1 and start_time is not null`, req.FlowJobName, firstId,
).Scan(&total, &rowsBehind); err != nil {
return nil, err
}
Expand Down
18 changes: 18 additions & 0 deletions protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,20 @@ message GetCDCBatchesResponse {
int32 page = 3;
}

message GraphRequest {
string flow_job_name = 1;
string aggregate_type = 2; // TODO name?
}

message GraphResponseItem {
double time = 1;
double rows = 2;
}

message GraphResponse {
repeated GraphResponseItem data = 1;
}

message MirrorLog {
string flow_name = 1;
string error_message = 2;
Expand Down Expand Up @@ -557,6 +571,10 @@ service FlowService {
option (google.api.http) = { post: "/v1/mirrors/cdc/batches", body: "*" };
}

rpc CDCGraph(GraphRequest) returns (GraphResponse) {
option (google.api.http) = { post: "/v1/mirrors/cdc/graph", body: "*" }
}

rpc InitialLoadSummary(InitialLoadSummaryRequest) returns (InitialLoadSummaryResponse) {
option (google.api.http) = { get: "/v1/mirrors/cdc/initial_load/{parent_mirror_name}" };
}
Expand Down
4 changes: 1 addition & 3 deletions ui/app/mirrors/[mirrorId]/aggregatedCountsByInterval.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ type timestampType = {
count: number;
};

function aggregateCountsByInterval(
export default function aggregateCountsByInterval(
timestamps: timestampType[],
interval: TimeAggregateTypes
): [string, number][] {
Expand Down Expand Up @@ -83,5 +83,3 @@ function aggregateCountsByInterval(

return resultArray;
}

export default aggregateCountsByInterval;
52 changes: 39 additions & 13 deletions ui/app/mirrors/[mirrorId]/cdcGraph.tsx
Original file line number Diff line number Diff line change
@@ -1,31 +1,59 @@
'use client';
import SelectTheme from '@/app/styles/select';
import { TimeAggregateTypes, timeOptions } from '@/app/utils/graph';
import {
formatGraphLabel,
TimeAggregateTypes,
timeOptions,
} from '@/app/utils/graph';
import { Label } from '@/lib/Label';
import { BarChart } from '@tremor/react';
import { useMemo, useState } from 'react';
import { useEffect, useState } from 'react';
import ReactSelect from 'react-select';

type CdcGraphProps = {};
type CdcGraphProps = { mirrorName: string };

function CdcGraph({}: CdcGraphProps) {
let [aggregateType, setAggregateType] = useState<TimeAggregateTypes>(
export default function CdcGraph({ mirrorName }: CdcGraphProps) {
const [aggregateType, setAggregateType] = useState<TimeAggregateTypes>(
TimeAggregateTypes.HOUR
);
const [graphValues, setGraphValues] = useState<
{ name: string; 'Rows synced at a point in time': number }[]
>([]);

const graphValues = useMemo(() => {
return []; /* TODO
useEffect(() => {
const fetchData = async () => {
const req: any = {
flowJobName: mirrorName,
aggregateType,
};

const res = await fetch('/api/v1/mirrors/cdc/graph', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
cache: 'no-store',
body: JSON.stringify(req),
});
const data = await res.json();
setGraphValues(
data.data.map(({ time, rows }) => ({
name: formatGraphLabel(new Date(time), aggregateType),
'Rows synced at a point in time': Number(rows),
}))
);
};

fetchData();
/* TODO
const rows = syncs.map((sync) => ({
timestamp: sync.endTime,
count: sync.numRows,
}));
let timedRowCounts = aggregateCountsByInterval(rows, aggregateType);
timedRowCounts = timedRowCounts.slice(0, 29).reverse();
return timedRowCounts.map((count) => ({
name: formatGraphLabel(new Date(count[0]), aggregateType),
'Rows synced at a point in time': Number(count[1]),
})); */
}, [aggregateType]);
}, [mirrorName, aggregateType]);

return (
<div>
Expand All @@ -51,5 +79,3 @@ function CdcGraph({}: CdcGraphProps) {
</div>
);
}

export default CdcGraph;
12 changes: 5 additions & 7 deletions ui/app/mirrors/[mirrorId]/qrepGraph.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,20 @@ type QRepGraphProps = {
};

function QrepGraph({ syncs }: QRepGraphProps) {
let [aggregateType, setAggregateType] = useState<TimeAggregateTypes>(
const [aggregateType, setAggregateType] = useState<TimeAggregateTypes>(
TimeAggregateTypes.HOUR
);
const initialCount: [string, number][] = [];
let [counts, setCounts] = useState(initialCount);
const [counts, setCounts] = useState(initialCount);

useEffect(() => {
let rows = syncs.map((sync) => ({
const rows = syncs.map((sync) => ({
timestamp: sync.startTime!,
count: Number(sync.rowsInPartition) ?? 0,
}));

let counts = aggregateCountsByInterval(rows, aggregateType);
counts = counts.slice(0, 29);
counts = counts.reverse();
setCounts(counts);
const counts = aggregateCountsByInterval(rows, aggregateType);
setCounts(counts.slice(0, 29).reverse());
}, [aggregateType, syncs]);

return (
Expand Down
2 changes: 1 addition & 1 deletion ui/app/mirrors/[mirrorId]/syncStatusTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ export const SyncStatusTable = ({ mirrorName }: SyncStatusTableProps) => {
beforeId: beforeId,
afterId: afterId,
};
const res = await fetch(`/api/v1/mirrors/cdc/batches`, {
const res = await fetch('/api/v1/mirrors/cdc/batches', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Expand Down

0 comments on commit 44966b0

Please sign in to comment.