Skip to content

Commit

Permalink
RMPMP-242: Updated dltOptions logs on error
Browse files Browse the repository at this point in the history
  • Loading branch information
stewartboyd119 committed Sep 3, 2024
1 parent ecb642e commit b2fdacd
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions workoption.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (d dltOption) apply(w *Work) {
// establish a writer to the DLT early, so when the time comes the write is fast
writer, err := w.kafkaProvider.Writer(ctx, d.dltConfig)
if err != nil {
w.logger.Errorw(ctx, "Failed to get writer for dlt", "error", err, "offset", message.Offset, "partition", message.Partition, "topic", message.Topic)
w.logger.Errorw(ctx, "Failed to get writer for dlt", "error", err, "offset", message.Offset, "partition", message.Partition, "source_topic", message.Topic, "dlt_topic", d.dltConfig.Topic)
return
}

Expand All @@ -136,7 +136,7 @@ func (d dltOption) apply(w *Work) {
}

if _, err := writer.WriteRaw(ctx, &message.Key, message.value); err != nil {
w.logger.Errorw(ctx, "Failed to forward to DLT", "error", err, "offset", message.Offset, "partition", message.Partition, "topic", message.Topic)
w.logger.Errorw(ctx, "Failed to forward to DLT", "error", err, "offset", message.Offset, "partition", message.Partition, "source_topic", message.Topic, "dlt_topic", d.dltConfig.Topic)
}
}
w.onDones = append(w.onDones, f)
Expand Down

0 comments on commit b2fdacd

Please sign in to comment.