From 8c5bb091ab5b9ca10cb3b16b4fea0ab3d6c1deae Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Mon, 15 Jan 2024 23:59:21 +0530 Subject: [PATCH] Fix Eventhub Partition Key and UI Graph (#1083) We now hash the partition key column value obtained from the destination table name in create mirror for PG->EH. This is to reduce the number of Eventhub batches we create. Noticed that not doing so makes the mirror extremely slow. Also fixes some code of the UI Graph component. --- flow/connectors/eventhub/eventhub.go | 11 +++++-- flow/connectors/utils/partition_hash.go | 18 ++++++++++++ .../[mirrorId]/aggregatedCountsByInterval.ts | 4 ++- ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx | 29 +++++++++---------- 4 files changed, 43 insertions(+), 19 deletions(-) create mode 100644 flow/connectors/utils/partition_hash.go diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 2e711e0cf6..b0b248a488 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -175,6 +175,13 @@ func (c *EventHubConnector) processBatch( return 0, err } + ehConfig, ok := c.hubManager.peerConfig.Get(destination.PeerName) + if !ok { + c.logger.Error("failed to get eventhub config", slog.Any("error", err)) + return 0, err + } + + numPartitions := ehConfig.PartitionCount // Scoped eventhub is of the form peer_name.eventhub_name.partition_column // partition_column is the column in the table that is used to determine // the partition key for the eventhub. 
@@ -186,7 +193,7 @@ func (c *EventHubConnector) processBatch( } else { partitionKey = fmt.Sprintf("%v", partitionValue) } - + partitionKey = utils.HashedPartitionKey(partitionKey, numPartitions) destination.SetPartitionValue(partitionKey) err = batchPerTopic.AddEvent(ctx, destination, json, false) if err != nil { @@ -196,7 +203,7 @@ func (c *EventHubConnector) processBatch( curNumRecords := numRecords.Load() if curNumRecords%1000 == 0 { - c.logger.Error("processBatch", slog.Int("number of records processed for sending", int(curNumRecords))) + c.logger.Info("processBatch", slog.Int("number of records processed for sending", int(curNumRecords))) } case <-ticker.C: diff --git a/flow/connectors/utils/partition_hash.go b/flow/connectors/utils/partition_hash.go new file mode 100644 index 0000000000..14de3ae943 --- /dev/null +++ b/flow/connectors/utils/partition_hash.go @@ -0,0 +1,18 @@ +package utils + +import ( + "fmt" + "hash/fnv" +) + +func hashString(s string) uint32 { + h := fnv.New32a() + h.Write([]byte(s)) + return h.Sum32() +} + +func HashedPartitionKey(s string, numPartitions uint32) string { + hashValue := hashString(s) + partition := hashValue % numPartitions + return fmt.Sprintf("%d", partition) +} diff --git a/ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts b/ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts index c156b380b0..8abda13b25 100644 --- a/ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts +++ b/ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts @@ -44,7 +44,9 @@ function aggregateCountsByInterval( } else if (interval === '15min') { N = 15; } - const date = roundUpToNearestNMinutes(timestamp, N); + + const currTs = new Date(timestamp); + const date = roundUpToNearestNMinutes(currTs, N); const formattedTimestamp = moment(date).format(timeUnit); if (!aggregatedCounts[formattedTimestamp]) { diff --git a/ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx b/ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx index 
a1c315459b..5df84d50d1 100644 --- a/ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx @@ -1,28 +1,28 @@ 'use client'; import { SyncStatusRow } from '@/app/dto/MirrorsDTO'; -import { formatGraphLabel, timeOptions } from '@/app/utils/graph'; +import { timeOptions } from '@/app/utils/graph'; import { Label } from '@/lib/Label'; import { BarChart } from '@tremor/react'; -import { useEffect, useState } from 'react'; +import { useMemo, useState } from 'react'; import ReactSelect from 'react-select'; import aggregateCountsByInterval from './aggregatedCountsByInterval'; function CdcGraph({ syncs }: { syncs: SyncStatusRow[] }) { let [aggregateType, setAggregateType] = useState('hour'); - const initialCount: [string, number][] = []; - let [counts, setCounts] = useState(initialCount); - useEffect(() => { - let rows = syncs.map((sync) => ({ + const graphValues = useMemo(() => { + const rows = syncs.map((sync) => ({ timestamp: sync.startTime, count: sync.numRows, })); - - let counts = aggregateCountsByInterval(rows, aggregateType); - counts = counts.slice(0, 29); - counts = counts.reverse(); - setCounts(counts); - }, [aggregateType, syncs]); + let timedRowCounts = aggregateCountsByInterval(rows, aggregateType); + timedRowCounts = timedRowCounts.slice(0, 29); + timedRowCounts = timedRowCounts.reverse(); + return timedRowCounts.map((count) => ({ + name: count[0], + 'Rows synced at a point in time': count[1], + })); + }, [syncs, aggregateType]); return (
@@ -40,10 +40,7 @@ function CdcGraph({ syncs }: { syncs: SyncStatusRow[] }) {
({ - name: formatGraphLabel(new Date(count[0]), aggregateType), - 'Rows synced at a point in time': count[1], - }))} + data={graphValues} index='name' categories={['Rows synced at a point in time']} />