To better serve Wise business and customer needs, the PipelineWise codebase needs to shrink. We have made the difficult decision that, going forward, many components of PipelineWise will be removed or incorporated into the main repo. The last version before this decision is v0.64.1.
We thank everyone in the open-source community who, over the past 6 years, has helped make PipelineWise a robust product for heterogeneous replication of many terabytes of data daily.
This is a Singer tap that reads data from files located inside a given S3 bucket and produces JSON-formatted data following the Singer spec.
This is a PipelineWise compatible tap connector.
The recommended way to run this tap is from PipelineWise. When running it from PipelineWise, you don't need to configure this tap with JSON files, and most things are automated. Please check the related documentation at Tap S3 CSV.
If you want to run this Singer tap independently, please read further.
First, make sure Python 3 is installed on your system or follow these installation instructions for Mac or Ubuntu.
It's recommended to use a virtualenv:
python3 -m venv venv
. venv/bin/activate
pip install pipelinewise-tap-s3-csv
or
python3 -m venv venv
. venv/bin/activate
pip install --upgrade pip
pip install .
Here is an example of a basic config that uses the default profile-based authentication:
```json
{
  "start_date": "2000-01-01T00:00:00Z",
  "bucket": "tradesignals-crawler",
  "tables": [{
    "search_prefix": "feeds",
    "search_pattern": ".csv",
    "table_name": "my_table",
    "key_properties": ["id"],
    "delimiter": ","
  }]
}
```
Profile-based authentication is used by default, with the default profile. To use another profile, set the aws_profile parameter in config.json or set the AWS_PROFILE environment variable.
For non-profile-based authentication, set the aws_access_key_id, aws_secret_access_key and, optionally, aws_session_token parameters in config.json. Alternatively, you can define them outside of config.json by setting the AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_SESSION_TOKEN environment variables.
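The fallback order described above can be sketched in a few lines. This is a minimal illustration, not the tap's actual code: the function name resolve_auth and the returned dictionary shape are hypothetical, and the rule that explicit keys take precedence over a profile is an assumption.

```python
import os


def resolve_auth(config, env=None):
    """Pick an authentication mode from config values and environment variables.

    Hypothetical helper for illustration only.
    """
    env = os.environ if env is None else env

    # Values in config.json win; environment variables are the fallback.
    key_id = config.get("aws_access_key_id") or env.get("AWS_ACCESS_KEY_ID")
    secret = config.get("aws_secret_access_key") or env.get("AWS_SECRET_ACCESS_KEY")
    token = config.get("aws_session_token") or env.get("AWS_SESSION_TOKEN")

    # Assumption: explicit keys take precedence over any profile setting.
    if key_id and secret:
        auth = {
            "mode": "keys",
            "aws_access_key_id": key_id,
            "aws_secret_access_key": secret,
        }
        if token:  # the session token is optional
            auth["aws_session_token"] = token
        return auth

    # Otherwise fall back to profile-based authentication.
    profile = config.get("aws_profile") or env.get("AWS_PROFILE") or "default"
    return {"mode": "profile", "profile_name": profile}


print(resolve_auth({"aws_profile": "analytics"}, env={}))
```

Passing env explicitly keeps the sketch easy to test without touching the real process environment.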
A bit of a run down on each of the properties:
- aws_profile: AWS profile name for profile-based authentication. If not provided, the AWS_PROFILE environment variable will be used.
- aws_access_key_id: AWS access key ID for non-profile-based authentication. If not provided, the AWS_ACCESS_KEY_ID environment variable will be used.
- aws_secret_access_key: AWS secret access key for non-profile-based authentication. If not provided, the AWS_SECRET_ACCESS_KEY environment variable will be used.
- aws_session_token: AWS session token for non-profile-based authentication. If not provided, the AWS_SESSION_TOKEN environment variable will be used.
- aws_endpoint_url: (Optional) The complete URL to use for the constructed client. Normally, botocore will automatically construct the appropriate URL to use when communicating with a service. You can specify a complete URL (including the "http/https" scheme) to override this behavior. For example: https://nyc3.digitaloceanspaces.com
- start_date: This is the datetime that the tap will use to look for newly updated or created files, based on the modified timestamp of the file.
- bucket: The name of the bucket to search for files under.
- tables: JSON object that the tap will use to search for files, and emit records as "tables" from those files.
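As a rough illustration of how start_date could act as a cutoff on each file's modified timestamp, here is a minimal sketch. The helper names and the shape of the file listing are hypothetical, and the strictly-later comparison is an assumption rather than confirmed tap behavior.

```python
from datetime import datetime, timezone


def parse_start_date(value):
    """Parse a start_date such as "2000-01-01T00:00:00Z" into an aware UTC datetime."""
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)


def files_modified_since(files, start_date):
    """Return keys of files whose modified timestamp is later than start_date.

    Assumption: the comparison is strict (a file modified exactly at
    start_date is skipped).
    """
    cutoff = parse_start_date(start_date)
    return [f["key"] for f in files if f["last_modified"] > cutoff]


# Hypothetical bucket listing, for illustration only.
listing = [
    {"key": "feeds/old.csv", "last_modified": datetime(1999, 12, 31, tzinfo=timezone.utc)},
    {"key": "feeds/new.csv", "last_modified": datetime(2021, 6, 1, tzinfo=timezone.utc)},
]
print(files_modified_since(listing, "2000-01-01T00:00:00Z"))
```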
The tables field consists of one or more objects that describe how to find files and emit records. A more detailed (and unescaped) example is shown below:
[
  {
    "search_prefix": "exports",
    "search_pattern": "my_table\\/.*\\.csv",
    "table_name": "my_table",
    "key_properties": ["id"],
    "date_overrides": ["created_at"],
    "delimiter": ","
  },
  ...
]
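To see what the doubled backslashes in the example above turn into, the following sketch decodes a table entry from JSON and applies it to a few object keys. The function matching_keys and the sample keys are hypothetical, and applying the pattern with re.search to the full key is an assumption about how matching works.

```python
import json
import re

# Raw string, so the text matches what would be written in config.json.
raw_table = r'''
{
  "search_prefix": "exports",
  "search_pattern": "my_table\\/.*\\.csv"
}
'''

table = json.loads(raw_table)

# After JSON decoding, each doubled backslash is a single regex escape.
print(table["search_pattern"])


def matching_keys(keys, table):
    """Return keys under search_prefix that match search_pattern.

    Assumption: the pattern is searched anywhere in the full object key.
    """
    pattern = re.compile(table["search_pattern"])
    prefix = table.get("search_prefix") or ""
    return [k for k in keys if k.startswith(prefix) and pattern.search(k)]


# Hypothetical object keys, for illustration only.
keys = [
    "exports/my_table/2020-01.csv",
    "exports/my_table/notes.txt",
    "exports/other_table/2020-01.csv",
    "archive/my_table/2020-01.csv",
]
print(matching_keys(keys, table))
```

Without the escaped dot, a pattern such as .csv would also match keys like exports/my_table/xcsv, because an unescaped dot matches any character.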
- search_prefix: This is a prefix to apply after the bucket, but before the file search pattern, to allow you to find files in "directories" below the bucket.
- search_pattern: This is an escaped regular expression that the tap will use to find files in the bucket + prefix. Because the regular expression is stored inside a JSON string, any backslashes in the RegEx need to be double-escaped.
- table_name: This value is a string of your choosing, and will be used to name the stream under which records from the matching files are emitted.
- key_properties: These are the "primary keys" of the CSV files, to be used by the target for deduplication and primary key definitions downstream in the destination.
- date_overrides: Specifies field names in the files that are supposed to be parsed as a datetime. The tap doesn't attempt to automatically determine if a field is a datetime, so this will make it explicit in the discovered schema.
- delimiter: This allows you to specify a custom delimiter, such as \t or |, if that applies to your files.
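The effect of delimiter and date_overrides can be sketched as follows. This is a simplified illustration, not the tap's discovery logic: the function sketch_schema is hypothetical, every non-overridden column is treated as a nullable string (an assumption made to keep the example short), and the date-time format marker follows common JSON Schema usage.

```python
import csv
import io


def sketch_schema(sample_text, table):
    """Build a simplified schema from the header row of a delimited file.

    Hypothetical helper: columns listed in date_overrides are marked as
    date-time, all other columns are treated as nullable strings.
    """
    reader = csv.reader(io.StringIO(sample_text), delimiter=table.get("delimiter", ","))
    header = next(reader)
    overrides = set(table.get("date_overrides", []))

    properties = {}
    for column in header:
        if column in overrides:
            properties[column] = {"type": ["null", "string"], "format": "date-time"}
        else:
            properties[column] = {"type": ["null", "string"]}
    return {"type": "object", "properties": properties}


# Pipe-delimited sample content, for illustration only.
sample = "id|name|created_at\n1|Ada|2021-06-01T12:00:00Z\n"
table = {"delimiter": "|", "date_overrides": ["created_at"]}
print(sketch_schema(sample, table))
```

With the default comma delimiter, the same sample would be read as a single column named id|name|created_at, which is why the delimiter has to be set for such files.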
A sample configuration is available in config.sample.json.
- Install python test dependencies in a virtual env and run nose unit and integration tests
make venv
- To run unit tests:
make unit_tests
- To run integration tests:
Integration tests require a valid S3 bucket, with credentials passed as environment variables. This project uses a Minio server for that purpose.
First, start a Minio server Docker container:
mkdir -p ./minio/data/awesome_bucket
UID=$(id -u) GID=$(id -g) docker-compose up -d
Run integration tests:
make integration_tests
- Install python dependencies and run python linter
make venv pylint