Skip to content

Commit

Permalink
Merge pull request #23 from cam-inc/develop
Browse files Browse the repository at this point in the history
main merge
  • Loading branch information
KenFujimoto12 authored May 11, 2022
2 parents e422286 + f4092d5 commit d608b76
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 0 deletions.
15 changes: 15 additions & 0 deletions application/export_change_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ type (
exportToKinesisStream(ctx context.Context, cs primitive.M) error
exportToFile(ctx context.Context, cs primitive.M) error
saveResumeToken(ctx context.Context, rt string) error
err() error
}

ChangeStreamsExporterImpl struct {
Expand Down Expand Up @@ -260,13 +261,19 @@ func (c *changeStreamsExporterClientImpl) exportToPubsub(ctx context.Context, cs
func (c *changeStreamsExporterClientImpl) exportToKinesisStream(ctx context.Context, cs primitive.M) error {
return c.kinesisStream.ExportToKinesisStream(ctx, cs)
}

func (c *changeStreamsExporterClientImpl) exportToFile(ctx context.Context, cs primitive.M) error {
return c.fileExporter.Export(ctx, cs)
}

func (c *changeStreamsExporterClientImpl) saveResumeToken(ctx context.Context, rt string) error {
return c.resumeToken.SaveResumeToken(ctx, rt)
}

func (c *changeStreamsExporterClientImpl) err() error {
return c.cs.Err()
}

func (c *ChangeStreamsExporterImpl) exportChangeStreams(ctx context.Context) error {
defer c.exporter.close(ctx)

Expand All @@ -277,6 +284,7 @@ func (c *ChangeStreamsExporterImpl) exportChangeStreams(ctx context.Context) err
expDstList := strings.Split(expDst, ",")

for c.exporter.next(ctx) {

csMap, err := c.exporter.decode()
if err != nil {
return err
Expand Down Expand Up @@ -327,5 +335,12 @@ func (c *ChangeStreamsExporterImpl) exportChangeStreams(ctx context.Context) err
return err
}
}

if err := c.exporter.err(); err != nil {
return errors.InternalServerError.Wrap("Could not get the next event for change stream.", err)
}

c.log.Info("Acquisition of change streams was interrupted.")

return nil
}
4 changes: 4 additions & 0 deletions application/export_change_streams_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,7 @@ func (m *mockChangeStreamsExporterClientImpl) saveResumeToken(_ context.Context,
m.csCursorFlag = false
return nil
}

func (m *mockChangeStreamsExporterClientImpl) err() error {
return nil
}

0 comments on commit d608b76

Please sign in to comment.