Merge pull request #14 from cam-inc/feature/add_storage_type_for_save_resumetoken

Fixes: resume token architecture
KenFujimoto12 authored Apr 27, 2022
2 parents 96ea7ed + bb181bd commit e422286
Showing 56 changed files with 2,572 additions and 525 deletions.
82 changes: 76 additions & 6 deletions .env.template
@@ -18,22 +18,90 @@ BIGQUERY_TABLE=
KINESIS_STREAM_NAME=
KINESIS_STREAM_REGION=

# Required
## One resume token is saved in the location specified here.
## e.g. PERSISTENT_VOLUME_DIR=/dsk/pvc/
PERSISTENT_VOLUME_DIR=
# Optional
## Set this environment variable if you want to export to Cloud Pub/Sub.
PUBSUB_TOPIC_NAME=

# Required
## Specify the export destination(s).
## e.g. EXPORT_DESTINATION=bigquery
## e.g. EXPORT_DESTINATION=bigquery,pubsub,kinesisStream
## e.g. EXPORT_DESTINATION=bigquery,pubsub,kinesisStream,file
EXPORT_DESTINATION=

# Required
## Specify the time zone this middleware runs in by referring to the following. (e.g. TIME_ZONE=Asia/Tokyo)
## https://cs.opensource.google/go/go/+/master:src/time/zoneinfo_abbrs_windows.go;drc=72ab424bc899735ec3c1e2bd3301897fc11872ba;l=15
TIME_ZONE=

# File Exporter Settings ==================================================================================

# Optional
## Writer to use when "file" is selected as the export destination ("EXPORT_DESTINATION"): "file" or "stdout" (default "stdout").
## e.g. FILE_EXPORTER_WRITER=file
## e.g. FILE_EXPORTER_WRITER=stdout
FILE_EXPORTER_WRITER=

# Optional
## When writing to a file (not stdout), the maximum size of a single file in megabytes.
FILE_EXPORTER_WRITER_MAX_MEGABYTES=
# Optional
## When writing to a file (not stdout), the maximum number of days to retain old files (by default, files are never removed).
FILE_EXPORTER_WRITER_MAX_DAYS=
# Optional
## The maximum number of old rotated files to retain (by default, all old log files are retained).
FILE_EXPORTER_WRITER_MAX_BACKUPS=

# Optional
## Log type when file is selected as the export destination of change streams.
## Default is "changeStream".
FILE_EXPORTER_LOG_TYPE_KEY=

# Optional
## If you select file as the export destination for change streams, the key for the change streams field in the log.
## Default is "cs".
FILE_EXPORTER_CHANGE_STREAM_KEY=

# Optional
## Key of the JSON time field. If unset, the field is omitted.
FILE_EXPORTER_TIME_KEY=

# Optional
## Key of the JSON name field. If unset, the field is omitted.
FILE_EXPORTER_NAME_KEY=

# ==========================================================================================================

# Resume Token Settings ====================================================================================

# Required
## One resume token is saved in the location specified here.
## e.g. RESUME_TOKEN_VOLUME_DIR=/dsk/pvc
RESUME_TOKEN_VOLUME_DIR=

# Optional
## Specify the storage type used to save the resume token (default: file).
## e.g. file, s3, gcs
RESUME_TOKEN_VOLUME_TYPE=

# Optional
## Specify the cloud storage bucket name where the resume token is saved (not required for the file type).
RESUME_TOKEN_VOLUME_BUCKET_NAME=

# Optional
## Specify the file name for the saved resume token (default: {MONGODB_COLLECTION}.dat).
RESUME_TOKEN_FILE_NAME=

# Optional
## Specify the cloud storage bucket region where the resume token is saved (not required for the file type).
RESUME_TOKEN_BUCKET_REGION=

# Optional
## Specify the interval at which the resume token is saved (default: 0 sec).
RESUME_TOKEN_SAVE_INTERVAL_SEC=

# ==========================================================================================================

# Log Settings ============================================================================================
# Optional
## MxT uses the zap library.
## Configure the MxT log settings for this middleware by referring to the following.
@@ -49,4 +117,6 @@ LOG_FORMAT=
LOG_OUTPUT_DIRECTORY=
### Specify log output File.
### e.g. LOG_OUTPUT_FILE=mxt.log
LOG_OUTPUT_FILE=

# ==========================================================================================================
3 changes: 2 additions & 1 deletion Dockerfile
@@ -8,6 +8,7 @@ LABEL org.opencontainers.image.source="https://github.com/cam-inc/MxTransporter"
WORKDIR /go/src

COPY . ./

RUN go mod download

ARG CGO_ENABLED=0
@@ -31,4 +32,4 @@ COPY --from=build /go/bin/main /go/bin/main
COPY --from=build /go/bin/health /go/bin/health
COPY --from=build /usr/share/zoneinfo /usr/share/zoneinfo

ENTRYPOINT ["/go/bin/main"]
12 changes: 12 additions & 0 deletions Dockerfile.local
@@ -0,0 +1,12 @@
FROM golang:latest

WORKDIR /go/src

COPY . ./
RUN go mod download

ARG CGO_ENABLED=0
ARG GOOS=linux
ARG GOARCH=amd64

CMD go run ./cmd/main.go
6 changes: 5 additions & 1 deletion Makefile
@@ -2,4 +2,8 @@

build-image:
@echo "build image..."
docker build -t mxtransporter .
docker build -t mxtransporter -f Dockerfile .

build-image-for-local:
@echo "build image..."
docker build -t mxtransporter -f Dockerfile.local .
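
For reference, a typical invocation of the new local-development target might look like this (the ```docker run``` flags are illustrative and not part of this commit):

```
$ make build-image-for-local
$ docker run --rm --env-file .env mxtransporter
```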
84 changes: 70 additions & 14 deletions README.md
@@ -12,12 +12,12 @@ With MxTransporter, real-time data can be reproduced and retained on the data ut

# Features
- Flexible export destination
It supports data warehouse and streaming service as export destinations after collecting change streams.

It supports data warehouses, streaming services, etc. as destinations for Change Streams after they have been retrieved and formatted.

- Simultaneous multi-export destination

Multiple supported data warehouses and streaming services can be selected at the same time to export formatted change streams information.
You can select multiple data warehouses, streaming services, etc. at the same time as destinations for the formatted Change Streams information.

- Container base

@@ -100,27 +100,47 @@ In this system, resume token is saved in Persistent Volume associated with the c

The resume token of the change streams from just before the container stopped is stored in the persistent volume, so you can refer to it and re-fetch the change streams you missed between the container stopping and a new container starting.

The resume token is stored in the directory where the persistent volume is mounted.
The resume token can be saved to any of the following destinations.

#### Local file
The resume token is stored in the directory where the persistent volume is mounted.<br>
You can choose to save to a local file by setting ```RESUME_TOKEN_VOLUME_TYPE=file```.

```PERSISTENT_VOLUME_DIR``` is an environment variable given to the container.
```RESUME_TOKEN_VOLUME_DIR``` is an environment variable given to the container.

```
{$PERSISTENT_VOLUME_DIR}/{year}/{month}/{day}
{$RESUME_TOKEN_VOLUME_DIR}/{$RESUME_TOKEN_FILE_NAME}.dat
```

The resume token is saved in ```{year}-{month}-{day}.dat```.
The resume token is saved in a file called ```{RESUME_TOKEN_FILE_NAME}.dat```. <br>
```RESUME_TOKEN_FILE_NAME``` is an optional environment variable, so if you don't set it, it will be saved in a file named ```{MONGODB_COLLECTION}.dat```.

```
$ pwd
{$PERSISTENT_VOLUME_DIR}/{year}/{month}/{day}
{$PERSISTENT_VOLUME_DIR}
$ ls
{year}-{month}-{day}.dat
{RESUME_TOKEN_FILE_NAME}.dat
$ cat {year}-{month}-{day}.dat
$ cat {RESUME_TOKEN_FILE_NAME}.dat
T7466SLQD7J49BT7FQ4DYERM6BYGEMVD9ZFTGUFLTPFTVWS35FU4BHUUH57J3BR33UQSJJ8TMTK365V5JMG2WYXF93TYSA6BBW9ZERYX6HRHQWYS
```

#### External storage
It is also possible to save to cloud storage.

You can choose to save to S3 or GCS by setting ```RESUME_TOKEN_VOLUME_TYPE=s3``` or ```RESUME_TOKEN_VOLUME_TYPE=gcs```. <br>
Set ```RESUME_TOKEN_VOLUME_DIR``` as the object key.

Set the following environment variables as needed.

```
RESUME_TOKEN_VOLUME_BUCKET_NAME
RESUME_TOKEN_FILE_NAME
RESUME_TOKEN_BUCKET_REGION
RESUME_TOKEN_SAVE_INTERVAL_SEC
```
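
For example, a hypothetical S3 configuration (all values below are placeholders):

```
RESUME_TOKEN_VOLUME_TYPE=s3
RESUME_TOKEN_VOLUME_DIR=mxt/resume-token
RESUME_TOKEN_VOLUME_BUCKET_NAME=my-mxt-bucket
RESUME_TOKEN_BUCKET_REGION=ap-northeast-1
RESUME_TOKEN_SAVE_INTERVAL_SEC=10
```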

When resuming change streams from a saved resume token, it is designed to pass the token to the ```startAfter``` option of ```Collection.Watch()```.
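
For illustration, a minimal Go sketch of this resume flow (not MxTransporter's actual code), assuming the saved ```.dat``` file contains the token's ```_data``` string and using the official mongo-driver; the connection URI, database, collection, and file path are placeholders:

```
package main

import (
	"context"
	"fmt"
	"os"
	"strings"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// watchFromToken opens a change stream, resuming just after the saved token if one exists.
func watchFromToken(ctx context.Context, coll *mongo.Collection, tokenPath string) (*mongo.ChangeStream, error) {
	opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)

	if raw, err := os.ReadFile(tokenPath); err == nil {
		if token := strings.TrimSpace(string(raw)); token != "" {
			// A resume token is a document of the form {"_data": "<hex string>"}.
			opts.SetStartAfter(bson.M{"_data": token})
		}
	}
	return coll.Watch(ctx, mongo.Pipeline{}, opts)
}

func main() {
	ctx := context.Background()
	client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
	if err != nil {
		panic(err)
	}
	defer client.Disconnect(ctx)

	coll := client.Database("mydb").Collection("mycoll")
	cs, err := watchFromToken(ctx, coll, "/dsk/pvc/mycoll.dat")
	if err != nil {
		panic(err)
	}
	defer cs.Close(ctx)

	for cs.Next(ctx) {
		// cs.ResumeToken() holds the latest token; persisting its "_data"
		// value lets a restarted container pick up from this point.
		fmt.Println(cs.Current)
	}
}
```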

<br>
@@ -131,6 +151,25 @@ MxTransporter export change streams to the following description.
- Google Cloud BigQuery
- Google Cloud Pub/Sub
- Amazon Kinesis Data Streams
- Standard output

Set the environment variables as follows.
```
EXPORT_DESTINATION=bigquery
or
EXPORT_DESTINATION=kinesisStream
or
EXPORT_DESTINATION=pubsub
or
EXPORT_DESTINATION=file
```
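
Multiple destinations can also be combined as a comma-separated list, as shown in ```.env.template```:
```
EXPORT_DESTINATION=bigquery,pubsub,kinesisStream,file
```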


### BigQuery
Create a BigQuery Table with a schema like the one below.
@@ -177,7 +216,10 @@ Table schema
```

### Pub/Sub
No special preparation is required. A Topic named after the MongoDB Database and a Subscription named after the MongoDB Collection from which the change streams originated are created automatically.
Set the following environment variables to specify the topic name to which Change Streams will be exported.
```
PUBSUB_TOPIC_NAME
```

Change streams are sent to that subscription as pipe (|) separated CSV.

@@ -186,6 +228,14 @@ No special preparation is required. If you want to separate the data warehouse t

Change streams are sent to it as pipe (|) separated CSV.

### Standard output
Output goes to standard output or to a file.
This feature is intended for relaying data via a sidecar agent (fluentd, fluentbit, etc.).
Due to the specifications of Docker logging, standard output is chunked at 16K bytes, so use file output if you want to avoid that.
```
{"logType": "{FILE_EXPORTER_LOG_TYPE_KEY}","{FILE_EXPORTER_CHANGE_STREAM_KEY}":{// Change Stream Data //}}
```
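
With the default keys (```changeStream``` and ```cs```), a log line looks like the following (the change stream payload is abbreviated):
```
{"logType":"changeStream","cs":{"_id":{"_data":"..."},"operationType":"insert"}}
```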

<br>

## Format
@@ -210,14 +260,20 @@ It is formatted into a pipe (|) separated CSV and put.
"}|insert|2021-10-01 23:59:59|{"_id":"6893253plm30db298659298h”,”name”:”xxx”}|{“coll”:”xxx”,”db”:”xxx”}|{“_id":"6893253plm30db298659298h"}|null
```

### Standard output
The output is plain JSON. You can change the key of the Change Stream field and add a time field by setting the corresponding environment variable options.
```
{"logType": "{FILE_EXPORTER_LOG_TYPE_KEY}","{FILE_EXPORTER_CHANGE_STREAM_KEY}":{// Change Stream Data //},"{FILE_EXPORTER_TIME_KEY}":"2022-04-20T01:47:39.228Z"}
```

<br>

# Contributors
| [<img src="https://avatars.githubusercontent.com/KenFujimoto12" width="130px;"/><br />Kenshirou](https://github.com/KenFujimoto12) <br /> |
| :---: |
| [<img src="https://avatars.githubusercontent.com/KenFujimoto12" width="130px;"/><br />Kenshirou](https://github.com/KenFujimoto12) <br /> | [<img src="https://avatars.githubusercontent.com/syama666" width="130px;"/><br />Yoshinori Sugiyama](https://github.com/syama666) <br /> |
| :---: | :---: |
<br>


# Copyright

CAM, Inc. All rights reserved.