diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index da68b6457c..ffd6eba459 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -175,11 +175,19 @@ func (h *FlowRequestHandler) cdcFlowStatus( return nil, err } - cdcBatchesResponse, err := h.GetCDCBatches(ctx, &protos.GetCDCBatchesRequest{ - FlowJobName: req.FlowJobName, - Limit: 0, - }) - if err != nil { + var cdcBatches []*protos.CDCBatch + if !req.ExcludeBatches { + cdcBatchesResponse, err := h.GetCDCBatches(ctx, &protos.GetCDCBatchesRequest{FlowJobName: req.FlowJobName}) + if err != nil { + return nil, err + } + cdcBatches = cdcBatchesResponse.CdcBatches + } + + var rowsSynced int64 + if err := h.pool.QueryRow(ctx, + "select coalesce(sum(rows_in_batch), 0) from peerdb_stats.cdc_batches where flow_name=$1", req.FlowJobName, + ).Scan(&rowsSynced); err != nil { return nil, err } @@ -190,10 +198,43 @@ func (h *FlowRequestHandler) cdcFlowStatus( SnapshotStatus: &protos.SnapshotStatus{ Clones: initialLoadResponse.TableSummaries, }, - CdcBatches: cdcBatchesResponse.CdcBatches, + CdcBatches: cdcBatches, + RowsSynced: rowsSynced, }, nil } +func (h *FlowRequestHandler) CDCGraph(ctx context.Context, req *protos.GraphRequest) (*protos.GraphResponse, error) { + truncField := "minute" + switch req.AggregateType { + case "1hour": + truncField = "hour" + case "1day": + truncField = "day" + case "1month": + truncField = "month" + } + rows, err := h.pool.Query(ctx, `select tm, coalesce(sum(rows_in_batch), 0) + from generate_series(date_trunc($2, now() - $1::INTERVAL * 30), now(), $1::INTERVAL) tm + left join peerdb_stats.cdc_batches on start_time >= tm and start_time < tm + $1::INTERVAL + group by 1 order by 1`, req.AggregateType, truncField) + if err != nil { + return nil, err + } + data, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.GraphResponseItem, error) { + var t time.Time + var r int64 + if err := row.Scan(&t, &r); err != nil { + return nil, err + } + return &protos.GraphResponseItem{Time: float64(t.UnixMilli()), Rows: float64(r)}, nil + }) + if err != nil { + return nil, err + } + + return &protos.GraphResponse{Data: data}, nil +} + func (h *FlowRequestHandler) InitialLoadSummary( ctx context.Context, req *protos.InitialLoadSummaryRequest, @@ -455,18 +496,39 @@ func (h *FlowRequestHandler) getMirrorCreatedAt(ctx context.Context, flowJobName } func (h *FlowRequestHandler) GetCDCBatches(ctx context.Context, req *protos.GetCDCBatchesRequest) (*protos.GetCDCBatchesResponse, error) { - mirrorName := req.FlowJobName - limit := req.Limit + return h.CDCBatches(ctx, req) +} + +func (h *FlowRequestHandler) CDCBatches(ctx context.Context, req *protos.GetCDCBatchesRequest) (*protos.GetCDCBatchesResponse, error) { limitClause := "" - if limit > 0 { - limitClause = fmt.Sprintf(" LIMIT %d", limit) + if req.Limit > 0 { + limitClause = fmt.Sprintf(" LIMIT %d", req.Limit) } - q := `SELECT DISTINCT ON(batch_id) batch_id,start_time,end_time,rows_in_batch,batch_start_lsn,batch_end_lsn FROM peerdb_stats.cdc_batches - WHERE flow_name=$1 AND start_time IS NOT NULL ORDER BY batch_id DESC, start_time DESC` + limitClause - rows, err := h.pool.Query(ctx, q, mirrorName) + + whereExpr := "" + queryArgs := append(make([]any, 0, 2), req.FlowJobName) + + sortOrderBy := "desc" + if req.BeforeId != 0 || req.AfterId != 0 { + if req.BeforeId != -1 { + queryArgs = append(queryArgs, req.BeforeId) + whereExpr = fmt.Sprintf(" AND batch_id < $%d", len(queryArgs)) + } else if req.AfterId != -1 { + queryArgs = append(queryArgs, 
req.AfterId) + whereExpr = fmt.Sprintf(" AND batch_id > $%d", len(queryArgs)) + sortOrderBy = "asc" + } + } + + q := fmt.Sprintf(`SELECT DISTINCT ON(batch_id) + batch_id,start_time,end_time,rows_in_batch,batch_start_lsn,batch_end_lsn + FROM peerdb_stats.cdc_batches + WHERE flow_name=$1 AND start_time IS NOT NULL%s + ORDER BY batch_id %s%s`, whereExpr, sortOrderBy, limitClause) + rows, err := h.pool.Query(ctx, q, queryArgs...) if err != nil { - slog.Error(fmt.Sprintf("unable to query cdc batches - %s: %s", mirrorName, err.Error())) - return nil, fmt.Errorf("unable to query cdc batches - %s: %w", mirrorName, err) + slog.Error(fmt.Sprintf("unable to query cdc batches - %s: %s", req.FlowJobName, err.Error())) + return nil, fmt.Errorf("unable to query cdc batches - %s: %w", req.FlowJobName, err) } batches, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.CDCBatch, error) { @@ -477,8 +539,8 @@ func (h *FlowRequestHandler) GetCDCBatches(ctx context.Context, req *protos.GetC var startLSN pgtype.Numeric var endLSN pgtype.Numeric if err := rows.Scan(&batchID, &startTime, &endTime, &numRows, &startLSN, &endLSN); err != nil { - slog.Error(fmt.Sprintf("unable to scan cdc batches - %s: %s", mirrorName, err.Error())) - return nil, fmt.Errorf("unable to scan cdc batches - %s: %w", mirrorName, err) + slog.Error(fmt.Sprintf("unable to scan cdc batches - %s: %s", req.FlowJobName, err.Error())) + return nil, fmt.Errorf("unable to scan cdc batches - %s: %w", req.FlowJobName, err) } var batch protos.CDCBatch @@ -511,9 +573,35 @@ func (h *FlowRequestHandler) GetCDCBatches(ctx context.Context, req *protos.GetC if batches == nil { batches = []*protos.CDCBatch{} } + if req.Ascending != (sortOrderBy == "asc") { + slices.Reverse(batches) + } + + var total int32 + var rowsBehind int32 + if len(batches) > 0 { + op := '>' + if req.Ascending { + op = '<' + } + firstId := batches[0].BatchId + if err := h.pool.QueryRow(ctx, fmt.Sprintf(`select count(distinct batch_id), count(distinct batch_id) filter (where batch_id%c$2) + from peerdb_stats.cdc_batches where flow_name=$1 and start_time is not null`, op), req.FlowJobName, firstId, + ).Scan(&total, &rowsBehind); err != nil { + return nil, err + } + } else if err := h.pool.QueryRow( + ctx, + "select count(distinct batch_id) from peerdb_stats.cdc_batches where flow_name=$1 and start_time is not null", + req.FlowJobName, + ).Scan(&total); err != nil { + return nil, err + } return &protos.GetCDCBatchesResponse{ CdcBatches: batches, + Total: total, + Page: rowsBehind/int32(req.Limit) + 1, }, nil } @@ -602,7 +690,7 @@ func (h *FlowRequestHandler) ListMirrorLogs( } sortOrderBy := "desc" - if req.BeforeId != 0 && req.AfterId != 0 { + if req.BeforeId != 0 || req.AfterId != 0 { if req.BeforeId != -1 { whereArgs = append(whereArgs, req.BeforeId) whereExprs = append(whereExprs, fmt.Sprintf("id < $%d", len(whereArgs))) diff --git a/protos/route.proto b/protos/route.proto index a729f88ac4..0265f221ee 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -145,6 +145,7 @@ message CreatePeerResponse { message MirrorStatusRequest { string flow_job_name = 1; bool include_flow_info = 2; + bool exclude_batches = 3; } message PartitionStatus { @@ -320,6 +321,7 @@ message CDCMirrorStatus { repeated CDCBatch cdc_batches = 3; peerdb_peers.DBType source_type = 4; peerdb_peers.DBType destination_type = 5; + int64 rows_synced = 6; } message MirrorStatusResponse { @@ -343,10 +345,29 @@ message InitialLoadSummaryResponse { message GetCDCBatchesRequest { string flow_job_name = 1; 
uint32 limit = 2; + bool ascending = 3; + int64 before_id = 4; + int64 after_id = 5; } message GetCDCBatchesResponse { repeated CDCBatch cdc_batches = 1; + int32 total = 2; + int32 page = 3; +} + +message GraphRequest { + string flow_job_name = 1; + string aggregate_type = 2; // TODO name? +} + +message GraphResponseItem { + double time = 1; + double rows = 2; +} + +message GraphResponse { + repeated GraphResponseItem data = 1; } message MirrorLog { @@ -545,11 +566,19 @@ service FlowService { } rpc GetCDCBatches(GetCDCBatchesRequest) returns (GetCDCBatchesResponse) { - option (google.api.http) = { get: "/v1/mirrors/cdc/batches/{flow_job_name}"}; + option (google.api.http) = { get: "/v1/mirrors/cdc/batches/{flow_job_name}" }; + } + + rpc CDCBatches(GetCDCBatchesRequest) returns (GetCDCBatchesResponse) { + option (google.api.http) = { post: "/v1/mirrors/cdc/batches", body: "*" }; + } + + rpc CDCGraph(GraphRequest) returns (GraphResponse) { + option (google.api.http) = { post: "/v1/mirrors/cdc/graph", body: "*" }; } rpc InitialLoadSummary(InitialLoadSummaryRequest) returns (InitialLoadSummaryResponse) { - option (google.api.http) = { get: "/v1/mirrors/cdc/initial_load/{parent_mirror_name}"}; + option (google.api.http) = { get: "/v1/mirrors/cdc/initial_load/{parent_mirror_name}" }; } rpc GetPeerInfo(PeerInfoRequest) returns (PeerInfoResponse) { diff --git a/ui/app/mirrors/[mirrorId]/aggregatedCountsByInterval.ts b/ui/app/mirrors/[mirrorId]/aggregatedCountsByInterval.ts index 42b74e6198..92b2f6cb98 100644 --- a/ui/app/mirrors/[mirrorId]/aggregatedCountsByInterval.ts +++ b/ui/app/mirrors/[mirrorId]/aggregatedCountsByInterval.ts @@ -7,7 +7,7 @@ type timestampType = { count: number; }; -function aggregateCountsByInterval( +export default function aggregateCountsByInterval( timestamps: timestampType[], interval: TimeAggregateTypes ): [string, number][] { @@ -83,5 +83,3 @@ function aggregateCountsByInterval( return resultArray; } - -export default aggregateCountsByInterval; diff --git a/ui/app/mirrors/[mirrorId]/cdc.tsx b/ui/app/mirrors/[mirrorId]/cdc.tsx index e404749b01..34556379b1 100644 --- a/ui/app/mirrors/[mirrorId]/cdc.tsx +++ b/ui/app/mirrors/[mirrorId]/cdc.tsx @@ -1,5 +1,5 @@ 'use client'; -import { CDCBatch, MirrorStatusResponse } from '@/grpc_generated/route'; +import { MirrorStatusResponse } from '@/grpc_generated/route'; import { Label } from '@/lib/Label'; import { ProgressCircle } from '@/lib/ProgressCircle'; import { Tab, TabGroup, TabList, TabPanel, TabPanels } from '@tremor/react'; @@ -10,14 +10,9 @@ import { SnapshotStatusTable } from './snapshot'; type CDCMirrorStatusProps = { status: MirrorStatusResponse; - rows: CDCBatch[]; syncStatusChild?: React.ReactNode; }; -export function CDCMirror({ - status, - rows, - syncStatusChild, -}: CDCMirrorStatusProps) { +export function CDCMirror({ status, syncStatusChild }: CDCMirrorStatusProps) { const LocalStorageTabKey = 'cdctab'; const [selectedTab, setSelectedTab] = useLocalStorage(LocalStorageTabKey, 0); const [mounted, setMounted] = useState(false); @@ -60,7 +55,6 @@ export function CDCMirror({ (); - let rowsSynced = syncs.reduce((acc, sync) => { - if (sync.endTime !== null) { - return acc + Number(sync.numRows); - } - return acc; - }, 0); +export default function CdcDetails({ + createdAt, + mirrorConfig, + mirrorStatus, +}: props) { + const [syncInterval, setSyncInterval] = useState(); const tablesSynced = mirrorConfig.config?.tableMappings; useEffect(() => { - getCurrentIdleTimeout(mirrorConfig.config?.flowJobName ?? 
'').then( - (res) => { - getSyncInterval(res); - } + getCurrentIdleTimeout(mirrorConfig.config?.flowJobName ?? '').then((res) => + setSyncInterval(res) ); }, [mirrorConfig.config?.flowJobName]); return ( @@ -82,8 +77,8 @@ function CdcDetails({ syncs, createdAt, mirrorConfig, mirrorStatus }: props) {
@@ -95,8 +90,8 @@ function CdcDetails({ syncs, createdAt, mirrorConfig, mirrorStatus }: props) {
@@ -129,7 +124,9 @@ function CdcDetails({ syncs, createdAt, mirrorConfig, mirrorStatus }: props) {
- +
@@ -151,8 +148,7 @@ const SyncIntervalLabel: React.FC<{ syncInterval?: number }> = ({ if (!syncInterval) { return ; - } - if (syncInterval >= 3600) { + } else if (syncInterval >= 3600) { const hours = Math.floor(syncInterval / 3600); formattedInterval = `${hours} hour${hours !== 1 ? 's' : ''}`; } else if (syncInterval >= 60) { @@ -164,5 +160,3 @@ const SyncIntervalLabel: React.FC<{ syncInterval?: number }> = ({ return ; }; - -export default CdcDetails; diff --git a/ui/app/mirrors/[mirrorId]/cdcGraph.tsx b/ui/app/mirrors/[mirrorId]/cdcGraph.tsx index 02e2c0d26a..e022101daa 100644 --- a/ui/app/mirrors/[mirrorId]/cdcGraph.tsx +++ b/ui/app/mirrors/[mirrorId]/cdcGraph.tsx @@ -5,35 +5,44 @@ import { TimeAggregateTypes, timeOptions, } from '@/app/utils/graph'; -import { CDCBatch } from '@/grpc_generated/route'; import { Label } from '@/lib/Label'; import { BarChart } from '@tremor/react'; -import { useMemo, useState } from 'react'; +import { useEffect, useState } from 'react'; import ReactSelect from 'react-select'; -import aggregateCountsByInterval from './aggregatedCountsByInterval'; -type CdcGraphProps = { - syncs: CDCBatch[]; -}; +type CdcGraphProps = { mirrorName: string }; -function CdcGraph({ syncs }: CdcGraphProps) { - let [aggregateType, setAggregateType] = useState( +export default function CdcGraph({ mirrorName }: CdcGraphProps) { + const [aggregateType, setAggregateType] = useState( TimeAggregateTypes.HOUR ); + const [graphValues, setGraphValues] = useState< + { name: string; 'Rows synced at a point in time': number }[] + >([]); - const graphValues = useMemo(() => { - const rows = syncs.map((sync) => ({ - timestamp: sync.endTime, - count: sync.numRows, - })); - let timedRowCounts = aggregateCountsByInterval(rows, aggregateType); - timedRowCounts = timedRowCounts.slice(0, 29); - timedRowCounts = timedRowCounts.reverse(); - return timedRowCounts.map((count) => ({ - name: formatGraphLabel(new Date(count[0]), aggregateType), - 'Rows synced at a point in time': Number(count[1]), - })); - }, [syncs, aggregateType]); + useEffect(() => { + const fetchData = async () => { + const req: any = { + flowJobName: mirrorName, + aggregateType, + }; + + const res = await fetch('/api/v1/mirrors/cdc/graph', { + method: 'POST', + cache: 'no-store', + body: JSON.stringify(req), + }); + const data: { data: { time: number; rows: number }[] } = await res.json(); + setGraphValues( + data.data.map(({ time, rows }) => ({ + name: formatGraphLabel(new Date(time), aggregateType), + 'Rows synced at a point in time': Number(rows), + })) + ); + }; + + fetchData(); + }, [mirrorName, aggregateType]); return (
@@ -59,5 +68,3 @@ function CdcGraph({ syncs }: CdcGraphProps) {
); } - -export default CdcGraph; diff --git a/ui/app/mirrors/[mirrorId]/handlers.ts b/ui/app/mirrors/[mirrorId]/handlers.ts index 7e68f26a53..0a3f46a4e2 100644 --- a/ui/app/mirrors/[mirrorId]/handlers.ts +++ b/ui/app/mirrors/[mirrorId]/handlers.ts @@ -12,6 +12,7 @@ export const getMirrorState = async ( body: JSON.stringify({ flow_job_name, include_flow_info: true, + exclude_batches: true, }), }); if (!res.ok) throw res.json(); diff --git a/ui/app/mirrors/[mirrorId]/page.tsx b/ui/app/mirrors/[mirrorId]/page.tsx index 5516e0986a..33832a66cd 100644 --- a/ui/app/mirrors/[mirrorId]/page.tsx +++ b/ui/app/mirrors/[mirrorId]/page.tsx @@ -51,12 +51,7 @@ export default function ViewMirror({ params: { mirrorId } }: EditMirrorProps) { let actionsDropdown = null; if (mirrorState?.cdcStatus) { - syncStatusChild = ( - - ); + syncStatusChild = ; const dbType = dBTypeFromJSON(mirrorState.cdcStatus.destinationType); @@ -93,11 +88,7 @@ export default function ViewMirror({ params: { mirrorId } }: EditMirrorProps) {
{mirrorId}
{actionsDropdown} - + ); } else if (mirrorState?.qrepStatus) { diff --git a/ui/app/mirrors/[mirrorId]/qrepGraph.tsx b/ui/app/mirrors/[mirrorId]/qrepGraph.tsx index 84eb958935..7bc1dbff5d 100644 --- a/ui/app/mirrors/[mirrorId]/qrepGraph.tsx +++ b/ui/app/mirrors/[mirrorId]/qrepGraph.tsx @@ -17,22 +17,20 @@ type QRepGraphProps = { }; function QrepGraph({ syncs }: QRepGraphProps) { - let [aggregateType, setAggregateType] = useState( + const [aggregateType, setAggregateType] = useState( TimeAggregateTypes.HOUR ); const initialCount: [string, number][] = []; - let [counts, setCounts] = useState(initialCount); + const [counts, setCounts] = useState(initialCount); useEffect(() => { - let rows = syncs.map((sync) => ({ + const rows = syncs.map((sync) => ({ timestamp: sync.startTime!, count: Number(sync.rowsInPartition) ?? 0, })); - let counts = aggregateCountsByInterval(rows, aggregateType); - counts = counts.slice(0, 29); - counts = counts.reverse(); - setCounts(counts); + const counts = aggregateCountsByInterval(rows, aggregateType); + setCounts(counts.slice(0, 29).reverse()); }, [aggregateType, syncs]); return ( diff --git a/ui/app/mirrors/[mirrorId]/syncStatus.tsx b/ui/app/mirrors/[mirrorId]/syncStatus.tsx index 0c2d2ba49d..fb45bfc6e7 100644 --- a/ui/app/mirrors/[mirrorId]/syncStatus.tsx +++ b/ui/app/mirrors/[mirrorId]/syncStatus.tsx @@ -1,6 +1,6 @@ 'use client'; import { fetcher } from '@/app/utils/swr'; -import { CDCBatch, CDCTableTotalCountsResponse } from '@/grpc_generated/route'; +import { CDCTableTotalCountsResponse } from '@/grpc_generated/route'; import useSWR from 'swr'; import CdcGraph from './cdcGraph'; import RowsDisplay from './rowsDisplay'; @@ -9,10 +9,9 @@ import TableStats from './tableStats'; type SyncStatusProps = { flowJobName: string; - rows: CDCBatch[]; }; -export default function SyncStatus({ flowJobName, rows }: SyncStatusProps) { +export default function SyncStatus({ flowJobName }: SyncStatusProps) { const { data: tableStats, error, @@ -31,9 +30,9 @@ export default function SyncStatus({ flowJobName, rows }: SyncStatusProps) {
-          <CdcGraph syncs={rows} />
+          <CdcGraph mirrorName={flowJobName} />
-        <SyncStatusTable rows={rows} />
+        <SyncStatusTable mirrorName={flowJobName} />
) diff --git a/ui/app/mirrors/[mirrorId]/syncStatusTable.tsx b/ui/app/mirrors/[mirrorId]/syncStatusTable.tsx index 6453408948..493822fcf3 100644 --- a/ui/app/mirrors/[mirrorId]/syncStatusTable.tsx +++ b/ui/app/mirrors/[mirrorId]/syncStatusTable.tsx @@ -1,22 +1,21 @@ 'use client'; -import SelectTheme from '@/app/styles/select'; import TimeLabel from '@/components/TimeComponent'; -import { CDCBatch } from '@/grpc_generated/route'; +import { + CDCBatch, + GetCDCBatchesRequest, + GetCDCBatchesResponse, +} from '@/grpc_generated/route'; import { Button } from '@/lib/Button'; import { Icon } from '@/lib/Icon'; import { Label } from '@/lib/Label'; import { ProgressCircle } from '@/lib/ProgressCircle'; -import { SearchField } from '@/lib/SearchField'; import { Table, TableCell, TableRow } from '@/lib/Table'; import moment from 'moment'; -import { useMemo, useState } from 'react'; -import ReactSelect from 'react-select'; +import { useCallback, useEffect, useState } from 'react'; import { RowDataFormatter } from './rowsDisplay'; -type SyncStatusTableProps = { - rows: CDCBatch[]; -}; +type SyncStatusTableProps = { mirrorName: string }; function TimeWithDurationOrRunning({ startTime, @@ -46,63 +45,54 @@ function TimeWithDurationOrRunning({ } const ROWS_PER_PAGE = 5; -const sortOptions = [ - { value: 'batchId', label: 'Batch ID' }, - { value: 'startTime', label: 'Start Time' }, - { value: 'endTime', label: 'End Time' }, - { value: 'numRows', label: 'Rows Synced' }, -]; - -export const SyncStatusTable = ({ rows }: SyncStatusTableProps) => { +export const SyncStatusTable = ({ mirrorName }: SyncStatusTableProps) => { + const [totalPages, setTotalPages] = useState(1); const [currentPage, setCurrentPage] = useState(1); - const [sortField, setSortField] = useState< - 'startTime' | 'endTime' | 'numRows' | 'batchId' - >('batchId'); - - const [sortDir, setSortDir] = useState<'asc' | 'dsc'>('dsc'); - const totalPages = Math.ceil(rows.length / ROWS_PER_PAGE); - const [searchQuery, setSearchQuery] = useState(NaN); - const displayedRows = useMemo(() => { - const searchRows = rows.filter((row) => row.batchId == searchQuery); - const shownRows = searchRows.length > 0 ? searchRows : rows; - shownRows.sort((a, b) => { - let aValue: any = a[sortField]; - let bValue: any = b[sortField]; - if (aValue === undefined || bValue === undefined) { - return 0; - } - if (sortField === 'batchId') { - aValue = BigInt(aValue); - bValue = BigInt(bValue); - } + const [ascending, setAscending] = useState(false); + const [[beforeId, afterId], setBeforeAfterId] = useState([-1, -1]); + const [batches, setBatches] = useState([]); - if (aValue < bValue) { - return sortDir === 'dsc' ? 1 : -1; - } else if (aValue > bValue) { - return sortDir === 'dsc' ? -1 : 1; - } else { - return 0; - } - }); + useEffect(() => { + const fetchData = async () => { + const req: GetCDCBatchesRequest = { + flowJobName: mirrorName, + limit: ROWS_PER_PAGE, + beforeId: beforeId, + afterId: afterId, + ascending, + }; + const res = await fetch('/api/v1/mirrors/cdc/batches', { + method: 'POST', + cache: 'no-store', + body: JSON.stringify(req), + }); + const data: GetCDCBatchesResponse = await res.json(); + setBatches(data.cdcBatches ?? []); + setCurrentPage(data.page); + setTotalPages(Math.ceil(data.total / req.limit)); + }; - const startRow = (currentPage - 1) * ROWS_PER_PAGE; - const endRow = startRow + ROWS_PER_PAGE; - return shownRows.length > ROWS_PER_PAGE - ? 
shownRows.slice(startRow, endRow) - : shownRows; - }, [searchQuery, currentPage, rows, sortField, sortDir]); + fetchData(); + }, [mirrorName, beforeId, afterId, ascending]); - const handlePrevPage = () => { - if (currentPage > 1) { - setCurrentPage(currentPage - 1); + const nextPage = useCallback(() => { + if (batches.length === 0) { + setBeforeAfterId([-1, ascending ? 0 : -1]); + } else if (ascending) { + setBeforeAfterId([-1, batches[batches.length - 1].batchId]); + } else { + setBeforeAfterId([batches[batches.length - 1].batchId, -1]); } - }; - - const handleNextPage = () => { - if (currentPage < totalPages) { - setCurrentPage(currentPage + 1); + }, [batches, ascending]); + const prevPage = useCallback(() => { + if (batches.length === 0 || currentPage < 3) { + setBeforeAfterId([-1, ascending ? 0 : -1]); + } else if (ascending) { + setBeforeAfterId([batches[0].batchId, -1]); + } else { + setBeforeAfterId([-1, batches[0].batchId]); } - }; + }, [batches, ascending, currentPage]); return ( { toolbar={{ left: (
- - @@ -123,53 +113,30 @@ export const SyncStatusTable = ({ rows }: SyncStatusTableProps) => { > -
- opt.value === sortField) - ?.label, - }} - onChange={(val, _) => { - const sortVal = - (val?.value as - | 'startTime' - | 'endTime' - | 'numRows' - | 'batchId') ?? 'batchId'; - setSortField(sortVal); - }} - defaultValue={{ value: 'batchId', label: 'Batch ID' }} - theme={SelectTheme} - /> -
), - right: ( - ) => - setSearchQuery(+e.target.value) - } - /> - ), }} header={ @@ -185,7 +152,7 @@ export const SyncStatusTable = ({ rows }: SyncStatusTableProps) => { } > - {displayedRows.map((row) => ( + {batches.map((row) => ( diff --git a/ui/app/peers/[peerName]/lagGraph.tsx b/ui/app/peers/[peerName]/lagGraph.tsx index 7107b7d809..87b90fa8c8 100644 --- a/ui/app/peers/[peerName]/lagGraph.tsx +++ b/ui/app/peers/[peerName]/lagGraph.tsx @@ -22,7 +22,7 @@ function parseLSN(lsn: string): number { if (!lsn) return 0; const [lsn1, lsn2] = lsn.split('/'); return Number( - (BigInt(parseInt(lsn1)) << BigInt(32)) | BigInt(parseInt(lsn2)) + (BigInt(parseInt(lsn1, 16)) << BigInt(32)) | BigInt(parseInt(lsn2, 16)) ); } @@ -135,7 +135,7 @@ export default function LagGraph({ peerName }: LagGraphProps) { /> setShowLsn((val) => !val)} /> )} diff --git a/ui/app/settings/page.tsx b/ui/app/settings/page.tsx index e5bb5ad15b..7ebb1b4cd0 100644 --- a/ui/app/settings/page.tsx +++ b/ui/app/settings/page.tsx @@ -123,9 +123,6 @@ const DynamicSettingItem = ({ const updatedSetting = { ...setting, value: newValue }; await fetch('/api/v1/dynamic_settings', { method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, body: JSON.stringify(updatedSetting), }); setEditMode(false); diff --git a/ui/components/LogsTable.tsx b/ui/components/LogsTable.tsx index c340044b44..cc09deeb7e 100644 --- a/ui/components/LogsTable.tsx +++ b/ui/components/LogsTable.tsx @@ -69,9 +69,6 @@ export default function LogsTable({ try { const response = await fetch('/api/v1/mirrors/logs', { method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, cache: 'no-store', body: JSON.stringify(req), });
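
Below is a minimal TypeScript sketch of how a client could drive the new cursor-style pagination exposed by POST /v1/mirrors/cdc/batches. The helpers fetchBatchPage and firstTwoPages are illustrative and not part of this patch; the request/response field names come from the generated GetCDCBatchesRequest/GetCDCBatchesResponse types, and the -1 "no cursor" sentinel for beforeId/afterId mirrors the convention in syncStatusTable.tsx.

```ts
import {
  CDCBatch,
  GetCDCBatchesRequest,
  GetCDCBatchesResponse,
} from '@/grpc_generated/route';

// Sketch: fetch one page of CDC batches through the new POST route.
// beforeId / afterId use -1 to mean "no cursor on this side".
async function fetchBatchPage(
  mirrorName: string,
  limit: number,
  beforeId = -1,
  afterId = -1,
  ascending = false
): Promise<GetCDCBatchesResponse> {
  const req: GetCDCBatchesRequest = {
    flowJobName: mirrorName,
    limit,
    beforeId,
    afterId,
    ascending,
  };
  const res = await fetch('/api/v1/mirrors/cdc/batches', {
    method: 'POST',
    cache: 'no-store',
    body: JSON.stringify(req),
  });
  return res.json();
}

// Walk from the newest page to the next older one (descending order):
// when beforeId is set, the server returns batches with batch_id < beforeId.
async function firstTwoPages(mirrorName: string) {
  const first = await fetchBatchPage(mirrorName, 5);
  const batches: CDCBatch[] = first.cdcBatches ?? [];
  const oldestShown = batches[batches.length - 1];
  const second = oldestShown
    ? await fetchBatchPage(mirrorName, 5, oldestShown.batchId, -1)
    : first;
  return { pages: [first, second], totalPages: Math.ceil(first.total / 5) };
}
```

The page field in the response is computed server-side from how many batch_ids lie behind the cursor, so a client only needs to carry the boundary batchId of the page it is currently showing rather than an absolute offset.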