Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync batch pagination #2207

Merged
merged 16 commits into from
Nov 4, 2024
124 changes: 106 additions & 18 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,19 @@ func (h *FlowRequestHandler) cdcFlowStatus(
return nil, err
}

cdcBatchesResponse, err := h.GetCDCBatches(ctx, &protos.GetCDCBatchesRequest{
FlowJobName: req.FlowJobName,
Limit: 0,
})
if err != nil {
var cdcBatches []*protos.CDCBatch
if !req.ExcludeBatches {
cdcBatchesResponse, err := h.GetCDCBatches(ctx, &protos.GetCDCBatchesRequest{FlowJobName: req.FlowJobName})
if err != nil {
return nil, err
}
cdcBatches = cdcBatchesResponse.CdcBatches
}

var rowsSynced int64
if err := h.pool.QueryRow(ctx,
"select coalesce(sum(rows_in_batch), 0) from peerdb_stats.cdc_batches where flow_name=$1", req.FlowJobName,
).Scan(&rowsSynced); err != nil {
return nil, err
}

Expand All @@ -190,10 +198,43 @@ func (h *FlowRequestHandler) cdcFlowStatus(
SnapshotStatus: &protos.SnapshotStatus{
Clones: initialLoadResponse.TableSummaries,
},
CdcBatches: cdcBatchesResponse.CdcBatches,
CdcBatches: cdcBatches,
RowsSynced: rowsSynced,
}, nil
}

func (h *FlowRequestHandler) CDCGraph(ctx context.Context, req *protos.GraphRequest) (*protos.GraphResponse, error) {
truncField := "minute"
switch req.AggregateType {
serprex marked this conversation as resolved.
Show resolved Hide resolved
case "1hour":
heavycrystal marked this conversation as resolved.
Show resolved Hide resolved
truncField = "hour"
case "1day":
truncField = "day"
case "1month":
truncField = "month"
}
rows, err := h.pool.Query(ctx, `select tm, coalesce(sum(rows_in_batch), 0)
from generate_series(date_trunc($2, now() - $1::INTERVAL * 30), now(), $1::INTERVAL) tm
left join peerdb_stats.cdc_batches on start_time >= tm and start_time < tm + $1::INTERVAL
group by 1 order by 1`, req.AggregateType, truncField)
if err != nil {
return nil, err
}
data, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.GraphResponseItem, error) {
var t time.Time
var r int64
if err := row.Scan(&t, &r); err != nil {
return nil, err
}
return &protos.GraphResponseItem{Time: float64(t.UnixMilli()), Rows: float64(r)}, nil
})
if err != nil {
return nil, err
}

return &protos.GraphResponse{Data: data}, nil
}

func (h *FlowRequestHandler) InitialLoadSummary(
ctx context.Context,
req *protos.InitialLoadSummaryRequest,
Expand Down Expand Up @@ -455,18 +496,39 @@ func (h *FlowRequestHandler) getMirrorCreatedAt(ctx context.Context, flowJobName
}

// GetCDCBatches lists the CDC batches of the mirror named in req. It is kept
// as a separate RPC entry point and simply delegates to CDCBatches, which
// implements the query, ordering and pagination.
func (h *FlowRequestHandler) GetCDCBatches(ctx context.Context, req *protos.GetCDCBatchesRequest) (*protos.GetCDCBatchesResponse, error) {
	return h.CDCBatches(ctx, req)
}

func (h *FlowRequestHandler) CDCBatches(ctx context.Context, req *protos.GetCDCBatchesRequest) (*protos.GetCDCBatchesResponse, error) {
limitClause := ""
if limit > 0 {
limitClause = fmt.Sprintf(" LIMIT %d", limit)
if req.Limit > 0 {
limitClause = fmt.Sprintf(" LIMIT %d", req.Limit)
}
q := `SELECT DISTINCT ON(batch_id) batch_id,start_time,end_time,rows_in_batch,batch_start_lsn,batch_end_lsn FROM peerdb_stats.cdc_batches
WHERE flow_name=$1 AND start_time IS NOT NULL ORDER BY batch_id DESC, start_time DESC` + limitClause
rows, err := h.pool.Query(ctx, q, mirrorName)

whereExpr := ""
queryArgs := append(make([]any, 0, 2), req.FlowJobName)

sortOrderBy := "desc"
serprex marked this conversation as resolved.
Show resolved Hide resolved
if req.BeforeId != 0 || req.AfterId != 0 {
if req.BeforeId != -1 {
serprex marked this conversation as resolved.
Show resolved Hide resolved
queryArgs = append(queryArgs, req.BeforeId)
whereExpr = fmt.Sprintf(" AND batch_id < $%d", len(queryArgs))
} else if req.AfterId != -1 {
queryArgs = append(queryArgs, req.AfterId)
whereExpr = fmt.Sprintf(" AND batch_id > $%d", len(queryArgs))
sortOrderBy = "asc"
}
}

q := fmt.Sprintf(`SELECT DISTINCT ON(batch_id)
batch_id,start_time,end_time,rows_in_batch,batch_start_lsn,batch_end_lsn
FROM peerdb_stats.cdc_batches
WHERE flow_name=$1 AND start_time IS NOT NULL%s
ORDER BY batch_id %s%s`, whereExpr, sortOrderBy, limitClause)
rows, err := h.pool.Query(ctx, q, queryArgs...)
if err != nil {
slog.Error(fmt.Sprintf("unable to query cdc batches - %s: %s", mirrorName, err.Error()))
return nil, fmt.Errorf("unable to query cdc batches - %s: %w", mirrorName, err)
slog.Error(fmt.Sprintf("unable to query cdc batches - %s: %s", req.FlowJobName, err.Error()))
return nil, fmt.Errorf("unable to query cdc batches - %s: %w", req.FlowJobName, err)
}

batches, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.CDCBatch, error) {
Expand All @@ -477,8 +539,8 @@ func (h *FlowRequestHandler) GetCDCBatches(ctx context.Context, req *protos.GetC
var startLSN pgtype.Numeric
var endLSN pgtype.Numeric
if err := rows.Scan(&batchID, &startTime, &endTime, &numRows, &startLSN, &endLSN); err != nil {
slog.Error(fmt.Sprintf("unable to scan cdc batches - %s: %s", mirrorName, err.Error()))
return nil, fmt.Errorf("unable to scan cdc batches - %s: %w", mirrorName, err)
slog.Error(fmt.Sprintf("unable to scan cdc batches - %s: %s", req.FlowJobName, err.Error()))
return nil, fmt.Errorf("unable to scan cdc batches - %s: %w", req.FlowJobName, err)
}

var batch protos.CDCBatch
Expand Down Expand Up @@ -511,9 +573,35 @@ func (h *FlowRequestHandler) GetCDCBatches(ctx context.Context, req *protos.GetC
if batches == nil {
batches = []*protos.CDCBatch{}
}
if req.Ascending != (sortOrderBy == "asc") {
slices.Reverse(batches)
}

var total int32
var rowsBehind int32
if len(batches) > 0 {
op := '>'
if req.Ascending {
op = '<'
}
firstId := batches[0].BatchId
if err := h.pool.QueryRow(ctx, fmt.Sprintf(`select count(distinct batch_id), count(distinct batch_id) filter (where batch_id%c$2)
from peerdb_stats.cdc_batches where flow_name=$1 and start_time is not null`, op), req.FlowJobName, firstId,
).Scan(&total, &rowsBehind); err != nil {
return nil, err
}
} else if err := h.pool.QueryRow(
ctx,
"select count(distinct batch_id) from peerdb_stats.cdc_batches where flow_name=$1 and start_time is not null",
req.FlowJobName,
).Scan(&total); err != nil {
return nil, err
}

return &protos.GetCDCBatchesResponse{
CdcBatches: batches,
Total: total,
Page: rowsBehind/int32(req.Limit) + 1,
}, nil
}

Expand Down Expand Up @@ -602,7 +690,7 @@ func (h *FlowRequestHandler) ListMirrorLogs(
}

sortOrderBy := "desc"
if req.BeforeId != 0 && req.AfterId != 0 {
if req.BeforeId != 0 || req.AfterId != 0 {
if req.BeforeId != -1 {
whereArgs = append(whereArgs, req.BeforeId)
whereExprs = append(whereExprs, fmt.Sprintf("id < $%d", len(whereArgs)))
Expand Down
33 changes: 31 additions & 2 deletions protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ message CreatePeerResponse {
// MirrorStatusRequest selects a mirror and controls how much detail the
// status response carries.
message MirrorStatusRequest {
string flow_job_name = 1;
bool include_flow_info = 2;
// When true, the CDC status handler skips loading the per-batch list.
bool exclude_batches = 3;
heavycrystal marked this conversation as resolved.
Show resolved Hide resolved
}

message PartitionStatus {
Expand Down Expand Up @@ -320,6 +321,7 @@ message CDCMirrorStatus {
repeated CDCBatch cdc_batches = 3;
peerdb_peers.DBType source_type = 4;
peerdb_peers.DBType destination_type = 5;
int64 rows_synced = 6;
}

message MirrorStatusResponse {
Expand All @@ -343,10 +345,29 @@ message InitialLoadSummaryResponse {
// GetCDCBatchesRequest asks for an optionally paginated list of CDC batches
// belonging to one mirror.
message GetCDCBatchesRequest {
// Mirror whose batches are listed (matched against flow_name).
string flow_job_name = 1;
// Maximum number of batches to return; 0 means no LIMIT is applied.
uint32 limit = 2;
// Order of the returned list by batch id: ascending when true, descending
// otherwise.
bool ascending = 3;
// Cursor fields. Cursoring is active when either field is non-zero, and -1
// marks a cursor as unset. When cursoring is active, before_id is applied
// (batch_id < before_id) unless it is -1, so a caller paging with after_id
// must send before_id = -1.
int64 before_id = 4;
// Applied only when before_id is -1: returns batches with batch_id > after_id.
int64 after_id = 5;
}

// GetCDCBatchesResponse carries one page of CDC batches plus pagination info.
message GetCDCBatchesResponse {
repeated CDCBatch cdc_batches = 1;
// Number of distinct batch ids recorded for the mirror that have a start_time.
int32 total = 2;
// Page number computed by the handler as
// (number of batches ordered before the first returned batch) / limit + 1.
int32 page = 3;
}

// GraphRequest asks for the rows-synced time series served by CDCGraph.
message GraphRequest {
// Flow job (mirror) name the graph is requested for.
string flow_job_name = 1;
// Bucket width. The handler casts this value to a Postgres INTERVAL and
// recognizes "1hour", "1day" and "1month" for aligning the first bucket;
// any other value aligns to the minute.
string aggregate_type = 2; // TODO name?
}

// GraphResponseItem is one bucket of the rows-synced time series.
message GraphResponseItem {
// Bucket start, as Unix time in milliseconds.
double time = 1;
// Sum of rows_in_batch over the batches that started within the bucket.
double rows = 2;
heavycrystal marked this conversation as resolved.
Show resolved Hide resolved
}

// GraphResponse is the full time series returned by CDCGraph, ordered by
// bucket start time.
message GraphResponse {
repeated GraphResponseItem data = 1;
}

message MirrorLog {
Expand Down Expand Up @@ -545,11 +566,19 @@ service FlowService {
}

rpc GetCDCBatches(GetCDCBatchesRequest) returns (GetCDCBatchesResponse) {
option (google.api.http) = { get: "/v1/mirrors/cdc/batches/{flow_job_name}"};
option (google.api.http) = { get: "/v1/mirrors/cdc/batches/{flow_job_name}" };
}

rpc CDCBatches(GetCDCBatchesRequest) returns (GetCDCBatchesResponse) {
option (google.api.http) = { post: "/v1/mirrors/cdc/batches", body: "*" };
}

rpc CDCGraph(GraphRequest) returns (GraphResponse) {
option (google.api.http) = { post: "/v1/mirrors/cdc/graph", body: "*" };
}

rpc InitialLoadSummary(InitialLoadSummaryRequest) returns (InitialLoadSummaryResponse) {
option (google.api.http) = { get: "/v1/mirrors/cdc/initial_load/{parent_mirror_name}"};
option (google.api.http) = { get: "/v1/mirrors/cdc/initial_load/{parent_mirror_name}" };
}

rpc GetPeerInfo(PeerInfoRequest) returns (PeerInfoResponse) {
Expand Down
4 changes: 1 addition & 3 deletions ui/app/mirrors/[mirrorId]/aggregatedCountsByInterval.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ type timestampType = {
count: number;
};

function aggregateCountsByInterval(
export default function aggregateCountsByInterval(
timestamps: timestampType[],
interval: TimeAggregateTypes
): [string, number][] {
Expand Down Expand Up @@ -83,5 +83,3 @@ function aggregateCountsByInterval(

return resultArray;
}

export default aggregateCountsByInterval;
10 changes: 2 additions & 8 deletions ui/app/mirrors/[mirrorId]/cdc.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
'use client';
import { CDCBatch, MirrorStatusResponse } from '@/grpc_generated/route';
import { MirrorStatusResponse } from '@/grpc_generated/route';
import { Label } from '@/lib/Label';
import { ProgressCircle } from '@/lib/ProgressCircle';
import { Tab, TabGroup, TabList, TabPanel, TabPanels } from '@tremor/react';
Expand All @@ -10,14 +10,9 @@ import { SnapshotStatusTable } from './snapshot';

type CDCMirrorStatusProps = {
status: MirrorStatusResponse;
rows: CDCBatch[];
syncStatusChild?: React.ReactNode;
};
export function CDCMirror({
status,
rows,
syncStatusChild,
}: CDCMirrorStatusProps) {
export function CDCMirror({ status, syncStatusChild }: CDCMirrorStatusProps) {
const LocalStorageTabKey = 'cdctab';
const [selectedTab, setSelectedTab] = useLocalStorage(LocalStorageTabKey, 0);
const [mounted, setMounted] = useState(false);
Expand Down Expand Up @@ -60,7 +55,6 @@ export function CDCMirror({
<TabPanels>
<TabPanel>
<CdcDetails
syncs={rows}
createdAt={status.createdAt}
mirrorConfig={status.cdcStatus!}
mirrorStatus={status.currentFlowState}
Expand Down
40 changes: 17 additions & 23 deletions ui/app/mirrors/[mirrorId]/cdcDetails.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import PeerButton from '@/components/PeerComponent';
import TimeLabel from '@/components/TimeComponent';
import { FlowStatus } from '@/grpc_generated/flow';
import { dBTypeFromJSON } from '@/grpc_generated/peers';
import { CDCBatch, CDCMirrorStatus } from '@/grpc_generated/route';
import { CDCMirrorStatus } from '@/grpc_generated/route';
import { Label } from '@/lib/Label';
import { ProgressCircle } from '@/lib/ProgressCircle';
import Link from 'next/link';
Expand All @@ -16,27 +16,22 @@ import { RowDataFormatter } from './rowsDisplay';
import TablePairs from './tablePairs';

type props = {
syncs: CDCBatch[];
mirrorConfig: CDCMirrorStatus;
createdAt?: Date;
mirrorStatus: FlowStatus;
};
function CdcDetails({ syncs, createdAt, mirrorConfig, mirrorStatus }: props) {
const [syncInterval, getSyncInterval] = useState<number>();

let rowsSynced = syncs.reduce((acc, sync) => {
if (sync.endTime !== null) {
return acc + Number(sync.numRows);
}
return acc;
}, 0);
export default function CdcDetails({
createdAt,
mirrorConfig,
mirrorStatus,
}: props) {
const [syncInterval, setSyncInterval] = useState<number>();

const tablesSynced = mirrorConfig.config?.tableMappings;
useEffect(() => {
getCurrentIdleTimeout(mirrorConfig.config?.flowJobName ?? '').then(
(res) => {
getSyncInterval(res);
}
getCurrentIdleTimeout(mirrorConfig.config?.flowJobName ?? '').then((res) =>
setSyncInterval(res)
);
}, [mirrorConfig.config?.flowJobName]);
return (
Expand Down Expand Up @@ -82,8 +77,8 @@ function CdcDetails({ syncs, createdAt, mirrorConfig, mirrorStatus }: props) {
</div>
<div>
<PeerButton
peerName={mirrorConfig?.config?.sourceName ?? ''}
peerType={dBTypeFromJSON(mirrorConfig?.sourceType)}
peerName={mirrorConfig.config?.sourceName ?? ''}
peerType={dBTypeFromJSON(mirrorConfig.sourceType)}
/>
</div>
</div>
Expand All @@ -95,8 +90,8 @@ function CdcDetails({ syncs, createdAt, mirrorConfig, mirrorStatus }: props) {
</div>
<div>
<PeerButton
peerName={mirrorConfig?.config?.destinationName ?? ''}
peerType={dBTypeFromJSON(mirrorConfig?.destinationType)}
peerName={mirrorConfig.config?.destinationName ?? ''}
peerType={dBTypeFromJSON(mirrorConfig.destinationType)}
/>
</div>
</div>
Expand Down Expand Up @@ -129,7 +124,9 @@ function CdcDetails({ syncs, createdAt, mirrorConfig, mirrorStatus }: props) {
</Label>
</div>
<div>
<Label variant='body'>{RowDataFormatter(rowsSynced)}</Label>
<Label variant='body'>
{RowDataFormatter(mirrorConfig.rowsSynced)}
</Label>
</div>
</div>

Expand All @@ -151,8 +148,7 @@ const SyncIntervalLabel: React.FC<{ syncInterval?: number }> = ({

if (!syncInterval) {
return <ProgressCircle variant='determinate_progress_circle' />;
}
if (syncInterval >= 3600) {
} else if (syncInterval >= 3600) {
serprex marked this conversation as resolved.
Show resolved Hide resolved
const hours = Math.floor(syncInterval / 3600);
formattedInterval = `${hours} hour${hours !== 1 ? 's' : ''}`;
} else if (syncInterval >= 60) {
Expand All @@ -164,5 +160,3 @@ const SyncIntervalLabel: React.FC<{ syncInterval?: number }> = ({

return <Label>{formattedInterval}</Label>;
};

export default CdcDetails;
Loading
Loading