diff --git a/application/export_change_streams.go b/application/export_change_streams.go index 2db64d9..ccd0b8b 100644 --- a/application/export_change_streams.go +++ b/application/export_change_streams.go @@ -215,6 +215,7 @@ type ( exportToKinesisStream(ctx context.Context, cs primitive.M) error exportToFile(ctx context.Context, cs primitive.M) error saveResumeToken(ctx context.Context, rt string) error + err() error } ChangeStreamsExporterImpl struct { @@ -260,13 +261,19 @@ func (c *changeStreamsExporterClientImpl) exportToPubsub(ctx context.Context, cs func (c *changeStreamsExporterClientImpl) exportToKinesisStream(ctx context.Context, cs primitive.M) error { return c.kinesisStream.ExportToKinesisStream(ctx, cs) } + func (c *changeStreamsExporterClientImpl) exportToFile(ctx context.Context, cs primitive.M) error { return c.fileExporter.Export(ctx, cs) } + func (c *changeStreamsExporterClientImpl) saveResumeToken(ctx context.Context, rt string) error { return c.resumeToken.SaveResumeToken(ctx, rt) } +func (c *changeStreamsExporterClientImpl) err() error { + return c.cs.Err() +} + func (c *ChangeStreamsExporterImpl) exportChangeStreams(ctx context.Context) error { defer c.exporter.close(ctx) @@ -277,6 +284,7 @@ func (c *ChangeStreamsExporterImpl) exportChangeStreams(ctx context.Context) err expDstList := strings.Split(expDst, ",") for c.exporter.next(ctx) { + csMap, err := c.exporter.decode() if err != nil { return err @@ -327,5 +335,12 @@ func (c *ChangeStreamsExporterImpl) exportChangeStreams(ctx context.Context) err return err } } + + if err := c.exporter.err(); err != nil { + return errors.InternalServerError.Wrap("Could not get the next event for change stream.", err) + } + + c.log.Info("Acquisition of change streams was interrupted.") + return nil } diff --git a/application/export_change_streams_mock.go b/application/export_change_streams_mock.go index 75fc64e..aa9b784 100644 --- a/application/export_change_streams_mock.go +++ b/application/export_change_streams_mock.go @@ -116,3 +116,7 @@ func (m *mockChangeStreamsExporterClientImpl) saveResumeToken(_ context.Context, m.csCursorFlag = false return nil } + +func (m *mockChangeStreamsExporterClientImpl) err() error { + return nil +}