From 9868cd94eb4877949dbd525cdc2eaff440e1e81e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 29 Oct 2024 20:25:41 +0000 Subject: [PATCH 1/2] improve logs pagination using LIMIT/OFFSET is inaccurate when new logs come it while logs being viewed this makes a minor improvement by instead being based on records before/after an id from previous page, which is how I'd like to also add pagination to sync batches in mirror status --- flow/cmd/mirror_status.go | 53 +++++++++++--- protos/route.proto | 7 +- ui/app/mirror-logs/table.tsx | 52 +------------ ui/app/mirrors/errors/[mirrorName]/page.tsx | 50 +------------ ui/components/LogsTable.tsx | 81 ++++++++++++++++----- 5 files changed, 115 insertions(+), 128 deletions(-) diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index 70efa7597b..ee95aa9c12 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -5,6 +5,7 @@ import ( "database/sql" "fmt" "log/slog" + "slices" "strings" "time" @@ -581,8 +582,8 @@ func (h *FlowRequestHandler) ListMirrorLogs( ctx context.Context, req *protos.ListMirrorLogsRequest, ) (*protos.ListMirrorLogsResponse, error) { - whereExprs := make([]string, 0, 2) - whereArgs := make([]interface{}, 0, 2) + whereExprs := make([]string, 0, 3) + whereArgs := make([]any, 0, 4) if req.FlowJobName != "" { whereArgs = append(whereArgs, req.FlowJobName) whereExprs = append(whereExprs, "position($1 in flow_name) > 0") @@ -593,23 +594,39 @@ func (h *FlowRequestHandler) ListMirrorLogs( whereExprs = append(whereExprs, fmt.Sprintf("error_type = $%d", len(whereArgs))) } + // count query doesn't want paging + countWhereArgs := slices.Clone(whereArgs) + var countWhereClause string + if len(whereExprs) != 0 { + countWhereClause = " WHERE " + strings.Join(whereExprs, " AND ") + } + + sortOrderBy := "desc" + if req.BeforeId != -1 { + whereArgs = append(whereArgs, req.BeforeId) + whereExprs = append(whereExprs, fmt.Sprintf("id < $%d", len(whereArgs))) + } else if req.AfterId != -1 { + whereArgs = append(whereArgs, req.AfterId) + whereExprs = append(whereExprs, fmt.Sprintf("id > $%d", len(whereArgs))) + sortOrderBy = "" + } + var whereClause string if len(whereExprs) != 0 { whereClause = " WHERE " + strings.Join(whereExprs, " AND ") } - skip := (req.Page - 1) * req.NumPerPage - rows, err := h.pool.Query(ctx, fmt.Sprintf(`select flow_name, error_message, error_type, error_timestamp - from peerdb_stats.flow_errors %s - order by error_timestamp desc - limit %d offset %d`, whereClause, req.NumPerPage, skip), whereArgs...) + rows, err := h.pool.Query(ctx, fmt.Sprintf(`select id, flow_name, error_message, error_type, error_timestamp + from peerdb_stats.flow_errors%s + order by id %s + limit %d`, whereClause, sortOrderBy, req.NumPerPage), whereArgs...) if err != nil { return nil, err } mirrorErrors, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.MirrorLog, error) { var log protos.MirrorLog var errorTimestamp time.Time - if err := rows.Scan(&log.FlowName, &log.ErrorMessage, &log.ErrorType, &errorTimestamp); err != nil { + if err := rows.Scan(&log.Id, &log.FlowName, &log.ErrorMessage, &log.ErrorType, &errorTimestamp); err != nil { return nil, err } log.ErrorTimestamp = float64(errorTimestamp.UnixMilli()) @@ -618,14 +635,32 @@ func (h *FlowRequestHandler) ListMirrorLogs( if err != nil { return nil, err } + if sortOrderBy == "" { + slices.Reverse(mirrorErrors) + } var total int32 - if err := h.pool.QueryRow(ctx, "select count(*) from peerdb_stats.flow_errors"+whereClause, whereArgs...).Scan(&total); err != nil { + var rowsBehind int32 + if len(mirrorErrors) > 0 { + firstId := mirrorErrors[0].Id + countWhereArgs = append(countWhereArgs, firstId) + if err := h.pool.QueryRow( + ctx, + fmt.Sprintf("select count(*), count(*) filter (where id > $%d) from peerdb_stats.flow_errors%s", + len(countWhereArgs), countWhereClause), + countWhereArgs..., + ).Scan(&total, &rowsBehind); err != nil { + return nil, err + } + } else if err := h.pool.QueryRow( + ctx, "select count(*) from peerdb_stats.flow_errors"+countWhereClause, countWhereArgs..., + ).Scan(&total); err != nil { return nil, err } return &protos.ListMirrorLogsResponse{ Errors: mirrorErrors, Total: total, + Page: rowsBehind/req.NumPerPage + 1, }, nil } diff --git a/protos/route.proto b/protos/route.proto index 9b85da6f47..89887141bd 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -354,16 +354,19 @@ message MirrorLog { string error_message = 2; string error_type = 3; double error_timestamp = 4; + int32 id = 5; } message ListMirrorLogsRequest { string flow_job_name = 1; string level = 2; - int32 page = 3; - int32 num_per_page = 4; + int32 num_per_page = 3; + int32 before_id = 4; + int32 after_id = 5; } message ListMirrorLogsResponse { repeated MirrorLog errors = 1; int32 total = 2; + int32 page = 3; } message ValidateCDCMirrorResponse{ diff --git a/ui/app/mirror-logs/table.tsx b/ui/app/mirror-logs/table.tsx index fc9206a362..4d14c80826 100644 --- a/ui/app/mirror-logs/table.tsx +++ b/ui/app/mirror-logs/table.tsx @@ -1,14 +1,8 @@ 'use client'; import LogsTable from '@/components/LogsTable'; -import { - ListMirrorLogsRequest, - ListMirrorLogsResponse, - ListMirrorNamesResponse, - MirrorLog, -} from '@/grpc_generated/route'; +import { ListMirrorNamesResponse } from '@/grpc_generated/route'; import { ProgressCircle } from '@/lib/ProgressCircle'; -import { useEffect, useState } from 'react'; import ReactSelect from 'react-select'; import 'react-toastify/dist/ReactToastify.css'; import useSWR from 'swr'; @@ -16,7 +10,6 @@ import { useLocalStorage } from 'usehooks-ts'; import { fetcher } from '../utils/swr'; export default function LogsView() { - const [logs, setLogs] = useState([]); const [mirrorName, setMirrorName] = useLocalStorage( 'peerdbMirrorNameFilterForLogs', '' @@ -25,45 +18,9 @@ export default function LogsView() { 'peerdbLogTypeFilterForLogs', 'all' ); - const [currentPage, setCurrentPage] = useState(1); - const [totalPages, setTotalPages] = useState(1); const { data: mirrors }: { data: ListMirrorNamesResponse; error: any } = useSWR('/api/v1/mirrors/names', fetcher); - useEffect(() => { - setCurrentPage(1); - }, [mirrorName]); - - useEffect(() => { - const req: ListMirrorLogsRequest = { - level: logLevel, - flowJobName: mirrorName, - page: currentPage, - numPerPage: 15, - }; - - const fetchData = async () => { - try { - const response = await fetch('/api/v1/mirrors/logs', { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - cache: 'no-store', - body: JSON.stringify(req), - }); - const data: ListMirrorLogsResponse = await response.json(); - const numPages = Math.ceil(data.total / req.numPerPage); - setLogs(data.errors); - setTotalPages(numPages); - } catch (error) { - console.error('Error fetching mirror logs:', error); - } - }; - - fetchData(); - }, [currentPage, mirrorName, logLevel]); - if (!mirrors) { return ; } @@ -107,12 +64,7 @@ export default function LogsView() { /> - + ); } diff --git a/ui/app/mirrors/errors/[mirrorName]/page.tsx b/ui/app/mirrors/errors/[mirrorName]/page.tsx index 42c36c336d..af8acfb66d 100644 --- a/ui/app/mirrors/errors/[mirrorName]/page.tsx +++ b/ui/app/mirrors/errors/[mirrorName]/page.tsx @@ -1,56 +1,13 @@ 'use client'; import LogsTable from '@/components/LogsTable'; -import { - ListMirrorLogsRequest, - ListMirrorLogsResponse, - MirrorLog, -} from '@/grpc_generated/route'; import { Label } from '@/lib/Label'; import { useParams } from 'next/navigation'; -import { useEffect, useState } from 'react'; import { ToastContainer } from 'react-toastify'; import 'react-toastify/dist/ReactToastify.css'; export default function MirrorError() { const params = useParams<{ mirrorName: string }>(); - const [mirrorErrors, setMirrorErrors] = useState([]); - const [currentPage, setCurrentPage] = useState(1); - const [totalPages, setTotalPages] = useState(1); - - useEffect(() => { - setCurrentPage(1); - }, [params.mirrorName]); - - useEffect(() => { - const req: ListMirrorLogsRequest = { - flowJobName: params.mirrorName, - page: currentPage, - numPerPage: 10, - level: 'all', - }; - - const fetchData = async () => { - try { - const response = await fetch('/api/v1/mirrors/logs', { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - cache: 'no-store', - body: JSON.stringify(req), - }); - const data: ListMirrorLogsResponse = await response.json(); - const numPages = Math.ceil(data.total / req.numPerPage); - setMirrorErrors(data.errors); - setTotalPages(numPages); - } catch (error) { - console.error('Error fetching mirror errors:', error); - } - }; - - fetchData(); - }, [currentPage, params.mirrorName]); return ( <> @@ -72,10 +29,9 @@ export default function MirrorError() { diff --git a/ui/components/LogsTable.tsx b/ui/components/LogsTable.tsx index 7d3486158a..e933ceea78 100644 --- a/ui/components/LogsTable.tsx +++ b/ui/components/LogsTable.tsx @@ -1,9 +1,14 @@ import TimeLabel from '@/components/TimeComponent'; -import { MirrorLog } from '@/grpc_generated/route'; +import { + ListMirrorLogsRequest, + ListMirrorLogsResponse, + MirrorLog, +} from '@/grpc_generated/route'; import { Button } from '@/lib/Button'; import { Icon } from '@/lib/Icon'; import { Label } from '@/lib/Label'; import { Table, TableCell, TableRow } from '@/lib/Table'; +import { useCallback, useEffect, useState } from 'react'; import 'react-toastify/dist/ReactToastify.css'; const colorForErrorType = (errorType: string) => { @@ -25,26 +30,62 @@ const extractFromCloneName = (mirrorOrCloneName: string) => { }; export default function LogsTable({ - logs, - currentPage, - totalPages, - setCurrentPage, + numPerPage, + mirrorName, + logLevel, }: { - logs: MirrorLog[]; - currentPage: number; - totalPages: number; - setCurrentPage: (page: number) => void; + numPerPage: number; + mirrorName: string; + logLevel: string; }) { - const handleNextPage = () => { - if (currentPage < totalPages) { - setCurrentPage(currentPage + 1); + const [logs, setLogs] = useState([]); + const [currentPage, setCurrentPage] = useState(1); + const [totalPages, setTotalPages] = useState(1); + const [[beforeId, afterId], setBeforeAfterId] = useState([-1, -1]); + const nextPage = useCallback(() => { + if (logs.length === 0) { + setBeforeAfterId([-1, -1]); } - }; - const handlePrevPage = () => { - if (currentPage > 1) { - setCurrentPage(currentPage - 1); + setBeforeAfterId([logs[logs.length - 1].id, -1]); + }, [logs]); + const prevPage = useCallback(() => { + if (logs.length === 0 || currentPage < 3) { + setBeforeAfterId([-1, -1]); } - }; + setBeforeAfterId([-1, logs[0].id]); + }, [logs, currentPage]); + + useEffect(() => { + const fetchData = async () => { + const req: ListMirrorLogsRequest = { + level: logLevel, + flowJobName: mirrorName, + beforeId, + afterId, + numPerPage, + }; + + try { + const response = await fetch('/api/v1/mirrors/logs', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + cache: 'no-store', + body: JSON.stringify(req), + }); + const data: ListMirrorLogsResponse = await response.json(); + const numPages = Math.ceil(data.total / req.numPerPage); + setLogs(data.errors); + setTotalPages(numPages); + setCurrentPage(data.page); + } catch (error) { + console.error('Error fetching mirror logs:', error); + } + }; + + fetchData(); + }, [mirrorName, logLevel, numPerPage, afterId, beforeId]); return ( - - @@ -82,7 +123,7 @@ export default function LogsTable({ }} > {logs.map((log, idx) => ( - + Date: Wed, 30 Oct 2024 03:22:51 +0000 Subject: [PATCH 2/2] backwards compatibility --- flow/cmd/mirror_status.go | 31 ++++++++++++++++++++++--------- protos/route.proto | 7 ++++--- ui/components/LogsTable.tsx | 1 + 3 files changed, 27 insertions(+), 12 deletions(-) diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index ee95aa9c12..da68b6457c 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -602,13 +602,15 @@ func (h *FlowRequestHandler) ListMirrorLogs( } sortOrderBy := "desc" - if req.BeforeId != -1 { - whereArgs = append(whereArgs, req.BeforeId) - whereExprs = append(whereExprs, fmt.Sprintf("id < $%d", len(whereArgs))) - } else if req.AfterId != -1 { - whereArgs = append(whereArgs, req.AfterId) - whereExprs = append(whereExprs, fmt.Sprintf("id > $%d", len(whereArgs))) - sortOrderBy = "" + if req.BeforeId != 0 && req.AfterId != 0 { + if req.BeforeId != -1 { + whereArgs = append(whereArgs, req.BeforeId) + whereExprs = append(whereExprs, fmt.Sprintf("id < $%d", len(whereArgs))) + } else if req.AfterId != -1 { + whereArgs = append(whereArgs, req.AfterId) + whereExprs = append(whereExprs, fmt.Sprintf("id > $%d", len(whereArgs))) + sortOrderBy = "" + } } var whereClause string @@ -616,10 +618,16 @@ func (h *FlowRequestHandler) ListMirrorLogs( whereClause = " WHERE " + strings.Join(whereExprs, " AND ") } + // page is deprecated + var offsetClause string + if req.Page != 0 { + offsetClause = fmt.Sprintf(" offset %d", (req.Page-1)*req.NumPerPage) + } + rows, err := h.pool.Query(ctx, fmt.Sprintf(`select id, flow_name, error_message, error_type, error_timestamp from peerdb_stats.flow_errors%s order by id %s - limit %d`, whereClause, sortOrderBy, req.NumPerPage), whereArgs...) + limit %d%s`, whereClause, sortOrderBy, req.NumPerPage, offsetClause), whereArgs...) if err != nil { return nil, err } @@ -658,9 +666,14 @@ func (h *FlowRequestHandler) ListMirrorLogs( return nil, err } + page := req.Page + if page == 0 { + page = rowsBehind/req.NumPerPage + 1 + } + return &protos.ListMirrorLogsResponse{ Errors: mirrorErrors, Total: total, - Page: rowsBehind/req.NumPerPage + 1, + Page: page, }, nil } diff --git a/protos/route.proto b/protos/route.proto index 89887141bd..a729f88ac4 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -359,9 +359,10 @@ message MirrorLog { message ListMirrorLogsRequest { string flow_job_name = 1; string level = 2; - int32 num_per_page = 3; - int32 before_id = 4; - int32 after_id = 5; + int32 page = 3; + int32 num_per_page = 4; + int32 before_id = 5; + int32 after_id = 6; } message ListMirrorLogsResponse { repeated MirrorLog errors = 1; diff --git a/ui/components/LogsTable.tsx b/ui/components/LogsTable.tsx index e933ceea78..c340044b44 100644 --- a/ui/components/LogsTable.tsx +++ b/ui/components/LogsTable.tsx @@ -63,6 +63,7 @@ export default function LogsTable({ beforeId, afterId, numPerPage, + page: 0, // deprecated }; try {