Skip to content

Commit

Permalink
Fix Eventhub Partition Key and UI Graph (#1083)
Browse files Browse the repository at this point in the history
We now hash the partition key column value obtained from the destination
table name in create mirror for PG->EH.
This is to reduce the number of Eventhub batches we create. Noticed that
not doing so makes the mirror extremely slow
Also fixes some code of the UI Graph component
  • Loading branch information
Amogh-Bharadwaj authored Jan 15, 2024
1 parent 1b906e7 commit 8c5bb09
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 19 deletions.
11 changes: 9 additions & 2 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,13 @@ func (c *EventHubConnector) processBatch(
return 0, err
}

ehConfig, ok := c.hubManager.peerConfig.Get(destination.PeerName)
if !ok {
c.logger.Error("failed to get eventhub config", slog.Any("error", err))
return 0, err
}

numPartitions := ehConfig.PartitionCount
// Scoped eventhub is of the form peer_name.eventhub_name.partition_column
// partition_column is the column in the table that is used to determine
// the partition key for the eventhub.
Expand All @@ -186,7 +193,7 @@ func (c *EventHubConnector) processBatch(
} else {
partitionKey = fmt.Sprintf("%v", partitionValue)
}

partitionKey = utils.HashedPartitionKey(partitionKey, numPartitions)
destination.SetPartitionValue(partitionKey)
err = batchPerTopic.AddEvent(ctx, destination, json, false)
if err != nil {
Expand All @@ -196,7 +203,7 @@ func (c *EventHubConnector) processBatch(

curNumRecords := numRecords.Load()
if curNumRecords%1000 == 0 {
c.logger.Error("processBatch", slog.Int("number of records processed for sending", int(curNumRecords)))
c.logger.Info("processBatch", slog.Int("number of records processed for sending", int(curNumRecords)))
}

case <-ticker.C:
Expand Down
18 changes: 18 additions & 0 deletions flow/connectors/utils/partition_hash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package utils

import (
"fmt"
"hash/fnv"
)

func hashString(s string) uint32 {
h := fnv.New32a()
h.Write([]byte(s))
return h.Sum32()
}

func HashedPartitionKey(s string, numPartitions uint32) string {
hashValue := hashString(s)
partition := hashValue % numPartitions
return fmt.Sprintf("%d", partition)
}
4 changes: 3 additions & 1 deletion ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ function aggregateCountsByInterval(
} else if (interval === '15min') {
N = 15;
}
const date = roundUpToNearestNMinutes(timestamp, N);

const currTs = new Date(timestamp);
const date = roundUpToNearestNMinutes(currTs, N);
const formattedTimestamp = moment(date).format(timeUnit);

if (!aggregatedCounts[formattedTimestamp]) {
Expand Down
29 changes: 13 additions & 16 deletions ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
'use client';
import { SyncStatusRow } from '@/app/dto/MirrorsDTO';
import { formatGraphLabel, timeOptions } from '@/app/utils/graph';
import { timeOptions } from '@/app/utils/graph';
import { Label } from '@/lib/Label';
import { BarChart } from '@tremor/react';
import { useEffect, useState } from 'react';
import { useMemo, useState } from 'react';
import ReactSelect from 'react-select';
import aggregateCountsByInterval from './aggregatedCountsByInterval';

function CdcGraph({ syncs }: { syncs: SyncStatusRow[] }) {
let [aggregateType, setAggregateType] = useState('hour');
const initialCount: [string, number][] = [];
let [counts, setCounts] = useState(initialCount);

useEffect(() => {
let rows = syncs.map((sync) => ({
const graphValues = useMemo(() => {
const rows = syncs.map((sync) => ({
timestamp: sync.startTime,
count: sync.numRows,
}));

let counts = aggregateCountsByInterval(rows, aggregateType);
counts = counts.slice(0, 29);
counts = counts.reverse();
setCounts(counts);
}, [aggregateType, syncs]);
let timedRowCounts = aggregateCountsByInterval(rows, aggregateType);
timedRowCounts = timedRowCounts.slice(0, 29);
timedRowCounts = timedRowCounts.reverse();
return timedRowCounts.map((count) => ({
name: count[0],
'Rows synced at a point in time': count[1],
}));
}, [syncs, aggregateType]);

return (
<div>
Expand All @@ -40,10 +40,7 @@ function CdcGraph({ syncs }: { syncs: SyncStatusRow[] }) {
</div>
<BarChart
className='mt-3'
data={counts.map((count) => ({
name: formatGraphLabel(new Date(count[0]), aggregateType),
'Rows synced at a point in time': count[1],
}))}
data={graphValues}
index='name'
categories={['Rows synced at a point in time']}
/>
Expand Down

0 comments on commit 8c5bb09

Please sign in to comment.