diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index 70efa7597b..da68b6457c 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -5,6 +5,7 @@ import ( "database/sql" "fmt" "log/slog" + "slices" "strings" "time" @@ -581,8 +582,8 @@ func (h *FlowRequestHandler) ListMirrorLogs( ctx context.Context, req *protos.ListMirrorLogsRequest, ) (*protos.ListMirrorLogsResponse, error) { - whereExprs := make([]string, 0, 2) - whereArgs := make([]interface{}, 0, 2) + whereExprs := make([]string, 0, 3) + whereArgs := make([]any, 0, 4) if req.FlowJobName != "" { whereArgs = append(whereArgs, req.FlowJobName) whereExprs = append(whereExprs, "position($1 in flow_name) > 0") @@ -593,23 +594,47 @@ func (h *FlowRequestHandler) ListMirrorLogs( whereExprs = append(whereExprs, fmt.Sprintf("error_type = $%d", len(whereArgs))) } + // count query doesn't want paging + countWhereArgs := slices.Clone(whereArgs) + var countWhereClause string + if len(whereExprs) != 0 { + countWhereClause = " WHERE " + strings.Join(whereExprs, " AND ") + } + + sortOrderBy := "desc" + if req.BeforeId != 0 && req.AfterId != 0 { + if req.BeforeId != -1 { + whereArgs = append(whereArgs, req.BeforeId) + whereExprs = append(whereExprs, fmt.Sprintf("id < $%d", len(whereArgs))) + } else if req.AfterId != -1 { + whereArgs = append(whereArgs, req.AfterId) + whereExprs = append(whereExprs, fmt.Sprintf("id > $%d", len(whereArgs))) + sortOrderBy = "" + } + } + var whereClause string if len(whereExprs) != 0 { whereClause = " WHERE " + strings.Join(whereExprs, " AND ") } - skip := (req.Page - 1) * req.NumPerPage - rows, err := h.pool.Query(ctx, fmt.Sprintf(`select flow_name, error_message, error_type, error_timestamp - from peerdb_stats.flow_errors %s - order by error_timestamp desc - limit %d offset %d`, whereClause, req.NumPerPage, skip), whereArgs...) + // page is deprecated + var offsetClause string + if req.Page != 0 { + offsetClause = fmt.Sprintf(" offset %d", (req.Page-1)*req.NumPerPage) + } + + rows, err := h.pool.Query(ctx, fmt.Sprintf(`select id, flow_name, error_message, error_type, error_timestamp + from peerdb_stats.flow_errors%s + order by id %s + limit %d%s`, whereClause, sortOrderBy, req.NumPerPage, offsetClause), whereArgs...) if err != nil { return nil, err } mirrorErrors, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.MirrorLog, error) { var log protos.MirrorLog var errorTimestamp time.Time - if err := rows.Scan(&log.FlowName, &log.ErrorMessage, &log.ErrorType, &errorTimestamp); err != nil { + if err := rows.Scan(&log.Id, &log.FlowName, &log.ErrorMessage, &log.ErrorType, &errorTimestamp); err != nil { return nil, err } log.ErrorTimestamp = float64(errorTimestamp.UnixMilli()) @@ -618,14 +643,37 @@ func (h *FlowRequestHandler) ListMirrorLogs( if err != nil { return nil, err } + if sortOrderBy == "" { + slices.Reverse(mirrorErrors) + } var total int32 - if err := h.pool.QueryRow(ctx, "select count(*) from peerdb_stats.flow_errors"+whereClause, whereArgs...).Scan(&total); err != nil { + var rowsBehind int32 + if len(mirrorErrors) > 0 { + firstId := mirrorErrors[0].Id + countWhereArgs = append(countWhereArgs, firstId) + if err := h.pool.QueryRow( + ctx, + fmt.Sprintf("select count(*), count(*) filter (where id > $%d) from peerdb_stats.flow_errors%s", + len(countWhereArgs), countWhereClause), + countWhereArgs..., + ).Scan(&total, &rowsBehind); err != nil { + return nil, err + } + } else if err := h.pool.QueryRow( + ctx, "select count(*) from peerdb_stats.flow_errors"+countWhereClause, countWhereArgs..., + ).Scan(&total); err != nil { return nil, err } + page := req.Page + if page == 0 { + page = rowsBehind/req.NumPerPage + 1 + } + return &protos.ListMirrorLogsResponse{ Errors: mirrorErrors, Total: total, + Page: page, }, nil } diff --git a/protos/route.proto b/protos/route.proto index 9b85da6f47..a729f88ac4 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -354,16 +354,20 @@ message MirrorLog { string error_message = 2; string error_type = 3; double error_timestamp = 4; + int32 id = 5; } message ListMirrorLogsRequest { string flow_job_name = 1; string level = 2; int32 page = 3; int32 num_per_page = 4; + int32 before_id = 5; + int32 after_id = 6; } message ListMirrorLogsResponse { repeated MirrorLog errors = 1; int32 total = 2; + int32 page = 3; } message ValidateCDCMirrorResponse{ diff --git a/ui/app/mirror-logs/table.tsx b/ui/app/mirror-logs/table.tsx index fc9206a362..4d14c80826 100644 --- a/ui/app/mirror-logs/table.tsx +++ b/ui/app/mirror-logs/table.tsx @@ -1,14 +1,8 @@ 'use client'; import LogsTable from '@/components/LogsTable'; -import { - ListMirrorLogsRequest, - ListMirrorLogsResponse, - ListMirrorNamesResponse, - MirrorLog, -} from '@/grpc_generated/route'; +import { ListMirrorNamesResponse } from '@/grpc_generated/route'; import { ProgressCircle } from '@/lib/ProgressCircle'; -import { useEffect, useState } from 'react'; import ReactSelect from 'react-select'; import 'react-toastify/dist/ReactToastify.css'; import useSWR from 'swr'; @@ -16,7 +10,6 @@ import { useLocalStorage } from 'usehooks-ts'; import { fetcher } from '../utils/swr'; export default function LogsView() { - const [logs, setLogs] = useState([]); const [mirrorName, setMirrorName] = useLocalStorage( 'peerdbMirrorNameFilterForLogs', '' @@ -25,45 +18,9 @@ export default function LogsView() { 'peerdbLogTypeFilterForLogs', 'all' ); - const [currentPage, setCurrentPage] = useState(1); - const [totalPages, setTotalPages] = useState(1); const { data: mirrors }: { data: ListMirrorNamesResponse; error: any } = useSWR('/api/v1/mirrors/names', fetcher); - useEffect(() => { - setCurrentPage(1); - }, [mirrorName]); - - useEffect(() => { - const req: ListMirrorLogsRequest = { - level: logLevel, - flowJobName: mirrorName, - page: currentPage, - numPerPage: 15, - }; - - const fetchData = async () => { - try { - const response = await fetch('/api/v1/mirrors/logs', { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - cache: 'no-store', - body: JSON.stringify(req), - }); - const data: ListMirrorLogsResponse = await response.json(); - const numPages = Math.ceil(data.total / req.numPerPage); - setLogs(data.errors); - setTotalPages(numPages); - } catch (error) { - console.error('Error fetching mirror logs:', error); - } - }; - - fetchData(); - }, [currentPage, mirrorName, logLevel]); - if (!mirrors) { return ; } @@ -107,12 +64,7 @@ export default function LogsView() { /> - + ); } diff --git a/ui/app/mirrors/errors/[mirrorName]/page.tsx b/ui/app/mirrors/errors/[mirrorName]/page.tsx index 42c36c336d..af8acfb66d 100644 --- a/ui/app/mirrors/errors/[mirrorName]/page.tsx +++ b/ui/app/mirrors/errors/[mirrorName]/page.tsx @@ -1,56 +1,13 @@ 'use client'; import LogsTable from '@/components/LogsTable'; -import { - ListMirrorLogsRequest, - ListMirrorLogsResponse, - MirrorLog, -} from '@/grpc_generated/route'; import { Label } from '@/lib/Label'; import { useParams } from 'next/navigation'; -import { useEffect, useState } from 'react'; import { ToastContainer } from 'react-toastify'; import 'react-toastify/dist/ReactToastify.css'; export default function MirrorError() { const params = useParams<{ mirrorName: string }>(); - const [mirrorErrors, setMirrorErrors] = useState([]); - const [currentPage, setCurrentPage] = useState(1); - const [totalPages, setTotalPages] = useState(1); - - useEffect(() => { - setCurrentPage(1); - }, [params.mirrorName]); - - useEffect(() => { - const req: ListMirrorLogsRequest = { - flowJobName: params.mirrorName, - page: currentPage, - numPerPage: 10, - level: 'all', - }; - - const fetchData = async () => { - try { - const response = await fetch('/api/v1/mirrors/logs', { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - cache: 'no-store', - body: JSON.stringify(req), - }); - const data: ListMirrorLogsResponse = await response.json(); - const numPages = Math.ceil(data.total / req.numPerPage); - setMirrorErrors(data.errors); - setTotalPages(numPages); - } catch (error) { - console.error('Error fetching mirror errors:', error); - } - }; - - fetchData(); - }, [currentPage, params.mirrorName]); return ( <> @@ -72,10 +29,9 @@ export default function MirrorError() { diff --git a/ui/components/LogsTable.tsx b/ui/components/LogsTable.tsx index 7d3486158a..c340044b44 100644 --- a/ui/components/LogsTable.tsx +++ b/ui/components/LogsTable.tsx @@ -1,9 +1,14 @@ import TimeLabel from '@/components/TimeComponent'; -import { MirrorLog } from '@/grpc_generated/route'; +import { + ListMirrorLogsRequest, + ListMirrorLogsResponse, + MirrorLog, +} from '@/grpc_generated/route'; import { Button } from '@/lib/Button'; import { Icon } from '@/lib/Icon'; import { Label } from '@/lib/Label'; import { Table, TableCell, TableRow } from '@/lib/Table'; +import { useCallback, useEffect, useState } from 'react'; import 'react-toastify/dist/ReactToastify.css'; const colorForErrorType = (errorType: string) => { @@ -25,26 +30,63 @@ const extractFromCloneName = (mirrorOrCloneName: string) => { }; export default function LogsTable({ - logs, - currentPage, - totalPages, - setCurrentPage, + numPerPage, + mirrorName, + logLevel, }: { - logs: MirrorLog[]; - currentPage: number; - totalPages: number; - setCurrentPage: (page: number) => void; + numPerPage: number; + mirrorName: string; + logLevel: string; }) { - const handleNextPage = () => { - if (currentPage < totalPages) { - setCurrentPage(currentPage + 1); + const [logs, setLogs] = useState([]); + const [currentPage, setCurrentPage] = useState(1); + const [totalPages, setTotalPages] = useState(1); + const [[beforeId, afterId], setBeforeAfterId] = useState([-1, -1]); + const nextPage = useCallback(() => { + if (logs.length === 0) { + setBeforeAfterId([-1, -1]); } - }; - const handlePrevPage = () => { - if (currentPage > 1) { - setCurrentPage(currentPage - 1); + setBeforeAfterId([logs[logs.length - 1].id, -1]); + }, [logs]); + const prevPage = useCallback(() => { + if (logs.length === 0 || currentPage < 3) { + setBeforeAfterId([-1, -1]); } - }; + setBeforeAfterId([-1, logs[0].id]); + }, [logs, currentPage]); + + useEffect(() => { + const fetchData = async () => { + const req: ListMirrorLogsRequest = { + level: logLevel, + flowJobName: mirrorName, + beforeId, + afterId, + numPerPage, + page: 0, // deprecated + }; + + try { + const response = await fetch('/api/v1/mirrors/logs', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + cache: 'no-store', + body: JSON.stringify(req), + }); + const data: ListMirrorLogsResponse = await response.json(); + const numPages = Math.ceil(data.total / req.numPerPage); + setLogs(data.errors); + setTotalPages(numPages); + setCurrentPage(data.page); + } catch (error) { + console.error('Error fetching mirror logs:', error); + } + }; + + fetchData(); + }, [mirrorName, logLevel, numPerPage, afterId, beforeId]); return ( - - @@ -82,7 +124,7 @@ export default function LogsTable({ }} > {logs.map((log, idx) => ( - +