From b36b558b6de20b398bfc4dfeacb8868d6c743793 Mon Sep 17 00:00:00 2001
From: Keyna Rafael <95432445+keyn4@users.noreply.github.com>
Date: Thu, 1 Aug 2024 14:04:04 -0400
Subject: [PATCH] cleaning duplicates (#4)

---
Review notes (not part of the commit message):
- PipedriveStream.ids is a set, so the per-row "already seen" test does not
  rescan every previously collected id.
- Rows that are not dicts, or that carry no "id", are passed on to
  stream.process_row() unchanged instead of raising before it runs.

 tap_pipedrive/stream.py |  3 +++
 tap_pipedrive/tap.py    | 10 ++++++++++
 2 files changed, 13 insertions(+)

diff --git a/tap_pipedrive/stream.py b/tap_pipedrive/stream.py
index bb433f3..200f16e 100644
--- a/tap_pipedrive/stream.py
+++ b/tap_pipedrive/stream.py
@@ -7,6 +7,9 @@
 
 
 class PipedriveStream(object):
+    def __init__(self):
+        self.ids = set()  # ids of rows already seen on this stream instance
+
     tap = None
     endpoint = ''
     key_properties = []
diff --git a/tap_pipedrive/tap.py b/tap_pipedrive/tap.py
index 1fb8595..d18dc50 100644
--- a/tap_pipedrive/tap.py
+++ b/tap_pipedrive/tap.py
@@ -278,7 +278,17 @@ def do_paginate(self, stream, stream_metadata):
             # records with metrics
             with singer.metrics.record_counter(stream.schema) as counter:
                 with singer.Transformer(singer.NO_INTEGER_DATETIME_PARSING) as optimus_prime:
+                    stream_name = stream.get_name()
                     for row in self.iterate_response(response):
+                        # Skip rows whose id was already seen for this stream (HGI-6285).
+                        # Rows lacking an "id" cannot be deduplicated, so they pass through.
+                        row_id = row.get("id") if isinstance(row, dict) else None
+                        if row_id is not None:
+                            if row_id in stream.ids:
+                                logger.info(f"id '{row_id}' was previously fetched and processed for {stream_name}, skipping duplicate value...")
+                                continue
+                            stream.ids.add(row_id)
+
                         row = stream.process_row(row)
                         if not row: # in case of a non-empty response with an empty element
                             continue