이 실습의 목적은 Businesss Intelligence System을 aws의 analytics 서비스를 활용해서 구현해 보는 것 입니다.
이 실습을 통해서 데이터 수집 -> 저장 -> 분석/처리 -> 시각화
단계를 aws의 analytics 서비스를 이용해서
어떻게 구축할 수 있는지 경험할 수 있습니다.
- Solutions Architecture Overview
- 사전 준비 작업
- [Step-1a] 입력 데이터를 수신할 Kinesis Data Streams 생성하기
- [Step-1b] 데이터를 S3에 저장하기 위한 Kinesis Data Firehose 생성하기
- [Step-1c] 데이터 파이프라인 동작 확인 하기
- [Step-1d] Athena를 이용해서 데이터 분석 하기
- [Step-1e] QuickSight를 이용한 데이터 시각화
- (Optional)[Step-1f] AWS Lambda Function을 이용해서 S3에 저장된 작은 파일들을 큰 파일로 합치기
- [Step-2a] 실시간 데이터 분석을 위한 Amazon OpenSearch Service 생성하기
- [Step-2b] AWS Lambda Function을 이용해서 실시간 데이터를 OpenSearch에 수집하기
- [Step-2c] Kibana를 이용한 데이터 시각화
- Recap and Review
- Resources
- Reference
- Deployment by AWS CDK
[Top]
실습을 시작 하기 전에 필요한 IAM User, EC2를 생성하고 및 구성합니다.
[Top]
AWS Management Console에서 Kinesis 서비스를 선택합니다.
- Get Started 버튼을 클릭합니다.
- [Create data stream] 버튼을 클릭합니다.
- Kinesis stream name 에 원하는 이름(예:
retail-trans
)을 입력합니다. - Data stream capacity에서 On-Demand 를 선택합니다.
Provisioned를 선택한 경우, Number of shards 에 원하는 shards 수(예:1
)를 입력합니다. - [Create data stream] 버튼을 클릭 후, 생성된 kinesis stream의 status가 active가 될 때까지 기다립니다.
[Top]
Kinesis Data Firehose를 이용해서 실시간으로 데이터를 S3, Redshift, OpenSearch 등의 목적지에 수집할 수 있습니다. AWS Management Console에서 Kinesis 서비스를 선택합니다.
-
Get Started 버튼을 클릭합니다.
-
Deliver streaming data with Kinesis Firehose delivery streams 메뉴의 [Create delivery stream] 을 클릭하여 새로운 Firehose 전송 스트림 생성을 시작합니다.
-
(Step 1: Name and source) Delivery stream name에 원하는 이름(예:
retail-trans
)를 입력합니다. -
Choose a source 에서
Kinesis Data Stream
를 선택하고, 앞서 생성한 Kinesis Data Stream(예:retail-trans
)을 선택 한 후, Next를 클릭합니다. -
(Step 2: Process records) Transform source records with AWS Lambda / Convert record format 은 둘다 default 옵션
Disabled
를 선택하고 Next를 클릭합니다. -
(Step 3: Choose a destination) Destination은 Amazon S3를 선택하고,
Create new
를 클릭해서 S3 bucket을 생성합니다. S3 bucket 이름은 이번 실습에서는aws-analytics-immersion-day-xxxxxxxx
형식으로xxxxxxxx
는 bucket 이름이 겹치지 않도록 임의의 숫자나 문자를 입력 합니다.S3 prefix를 입력합니다. 예를 들어서 다음과 같이 입력 합니다.
json-data/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
S3 error prefix를 입력합니다. 예를 들어서 다음과 같이 입력 합니다.
error-json/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}
⚠️ S3 prefix 또는 S3 error prefix 패턴에는 줄 바꿈(\n
) 문자가 없어야합니다. 예제 패턴을 복사하여 S3 prefix 또는 S3 error prefix에 붙여 넣었다면, 후행 줄 바꿈을 제거하는 것이 좋습니다.S3 prefix와 3 error prefix 입력을 완료한 후에, Next를 클릭합니다. (참고: Amazon S3 객체에 대한 사용자 지정 접두사)
-
(Step 4: Configure settings) S3 buffer conditions에서 Buffer size는
1MB
, Buffer interval은60
seconds로 설정합니다. -
아래 IAM role에서
Create or update IAM Role ...
을 선택한 후, [Next] 버튼을 클릭합니다. -
(Step 5: Review) Review에서 입력한 정보를 확인한 뒤 틀린 부분이 없다면, [Create delivery stream] 버튼을 클릭하여 Firehose 생성을 완료합니다.
[Top]
샘플 데이터를 이용해서 Kinesis Data Streams -> Kinesis Data Firehose -> S3
로 데이터가 정상적으로 수집되는지 확인합니다.
- 앞서 생성한 E2 인스턴스에 SSH 접속을 합니다.
gen_kinesis_data.py
을 실행합니다.python3 gen_kinesis_data.py \ --region-name us-west-2 \ --service-name kinesis \ --stream-name retail-trans
gen_kinesis_data.py
자세한 사용법은--help
옵션을 이용해서 확인할 수 있습니다.python3 gen_kinesis_data.py --help
- 매 초 데이터가 발생하는 것을 확인합니다. 충분한 데이터 수집을 위해 실행 중인 상태로 다음 단계를 진행합니다.
- 몇 분 뒤 생성한 S3 bucket을 확인해 보면, 생성된 원본 데이터가 Kinesis Data Firehose를 통해 S3에 저장되는 것을 확인할 수 있습니다.
[Top]
Amazon Athena를 이용해서 S3에 저장된 데이터를 기반으로 테이블을 만들고, 테이블을 쿼리한 다음 쿼리 결과를 확인할 수 있습니다. 먼저 데이터를 쿼리하기 위해서 데이터베이스를 생성합니다.
- Athena 콘솔을 엽니다.
- Athena 콘솔을 처음 방문하면 시작하기 페이지로 이동합니다. [Get Started] 를 선택해 쿼리 편집기를 엽니다.
- 처음 방문 하는 경우라면, set up a query result location in Amazon S3 를 클릭해서 Athena의 쿼리 결과를 저장할 s3 위치를 설정합니다.
이번 실습에서는 Kinesis Data Firehose 설정 단계에서 생성한 s3 bucket에 Athena의 쿼리 결과를 저장할 디렉터리를 생성합니다.
예를 들어,
s3://aws-analytics-immersion-day-xxxxxxxx/athena-query-results/
(xxxxxxxx
는 bucket 이름이 겹치지 않도록 입력한 임의의 숫자나 문자열 입니다.) 처음 방문하는 경우가 아니라면, Athena 쿼리 편집기가 열립니다. - Athena 쿼리 편집기에서 예제 쿼리가 있는 쿼리 창을 볼 수 있습니다. 쿼리 창의 아무 곳에나 쿼리를 입력하기 시작합니다.
mydatabase
라는 데이터베이스를 생성하려면 다음 CREATE DATABASE 문을 입력한 다음, [Run Query] 를 선택합니다.CREATE DATABASE IF NOT EXISTS mydatabase
- 카탈로그 디스플레이가 새로 고쳐지고 왼쪽 [Catalog] 대시보드의 [DATABASE] 목록에
mydatabase
가 표시되는지 확인합니다.
- [DATABASE] 에
mydatabase
가 선택되었는지 확인한 후 [New Query] 를 선택합니다. - 쿼리 창에 다음 CREATE TABLE 문을 입력한 후 [Run Query] 를 선택합니다.
테이블
CREATE EXTERNAL TABLE IF NOT EXISTS `mydatabase.retail_trans_json`( `invoice` string COMMENT 'Invoice number', `stockcode` string COMMENT 'Product (item) code', `description` string COMMENT 'Product (item) name', `quantity` int COMMENT 'The quantities of each product (item) per transaction', `invoicedate` timestamp COMMENT 'Invoice date and time', `price` float COMMENT 'Unit price', `customer_id` string COMMENT 'Customer number', `country` string COMMENT 'Country name') PARTITIONED BY ( `year` int, `month` int, `day` int, `hour` int) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat' LOCATION 's3://aws-analytics-immersion-day-xxxxxxxx/json-data'
retail_trans_json
가 생성되고 데이터베이스의 [Catalog] 대시보드에 표시됩니다. - 테이블을 생성한 이후 [New Query] 를 선택하고 다음을 실행해서, 파티션의 데이터를 로드합니다.
참고로, Athena 테이블이 파티션 목록은 다음 쿼리를 이용해서 확인할 수 있습니다.
MSCK REPAIR TABLE mydatabase.retail_trans_json
SHOW PARTITIONS mydatabase.retail_trans_json
- [New Query] 를 선택하고 쿼리 창의 아무 곳에나 다음 문을 입력한 다음 [Run Query] 를 선택합니다.
다음과 같은 형식의 결과가 반환됩니다.
SELECT * FROM retail_trans_json LIMIT 10
[Top]
이번에는 Amazon QuickSight를 통해 데이터 시각화 작업을 합니다.
- QuickSight 콘솔로 이동합니다.
- QuickSight에 가입하기 위해 [Sign up for QuickSight] 버튼을 클릭합니다.
- Standard Edition을 선택한 후 [Continue] 버튼을 클릭합니다.
- QuickSight account name은 임의로 지정(중복될 경우 계정이 생성되지 않습니다) 하고, Notification email address는 개인 Email 주소를 입력합니다.
- QuckSight가 S3에 Access해야 하므로, [Choose S3 buckets] 를 클릭합니다.
- 아래와 같은 창이 뜨면, 데이터가 저장되어 있는
aws-analytics-immersion-day-xxxxxxxx
를 선택한 후 [Finish] 를 클릭합니다. - 계정이 생성된 후 [Go to Amazon QuickSight] 버튼을 클릭합니다.
- 우측 상단에 region이 데이터를 저장하고 있는 S3 bucket의 region과 동일하게 설정한 후, 좌측 상단 [New Analysis] 를 클릭합니다.
- [New Data Set] 버튼을 클릭합니다.
Athena
를 클릭하고 팝업 창의 Data source name에retail-quicksight
를 입력(임의의 값 입력 가능)하고, [Validate connection] 을 클릭 해서Validated
상태로 변경되면, [Create data source] 버튼을 클릭합니다.- Choose your table 화면에서 Database는
mydatabase
(앞서 생성한 Athena 데이터베이스), Tables 에서retail_trans_json
를 선택하고 Select 버튼을 클릭합니다. - Finish data set creation 화면에서 [Visualize] 버튼을 클릭 합니다.
retail_trans_json
테이블 데이터가 QuickSight SPICE 엔진에 로딩 되었는지 확인합니다. InvoicdDate
별Quantity
,Price
를 시각화 해 보겠습니다. 좌측 Fields list에서invoicedate
,price
,quantity
field를 차례대로 선택합니다. Visual types는 세로 막대 그래프를 선택합니다.- 방금 만든 Dashboard를 다른 사용자에게 공유해 보겠습니다. 좌측 상단 유저 아이콘을 클릭하고 [Manage QuickSight] 를 클릭합니다.
- Invite users 버튼을 클릭한 후 임의의 사용자 계정명(BI_user01)을 입력한 후 우측 [+] 버튼을 클릭합니다. Email은 다른 사용자의 Email 주소를 입력하고 Role은 AUTHOR, IAM User는 NO를 선택한 후 Invite 버튼을 클릭합니다.
- 사용자는 다음과 같은 Invitation Email을 받고 Click to accept invitation을 클릭하면 계정 생성 메뉴에서 비밀번호를 변경할 수 있습니다.
- QuickSight 화면으로 돌아가서 우측 상단의 Share > Share analysis 를 클릭합니다.
- BI_user01을 선택한 후 Share 버튼을 클릭합니다.
- 사용자는 다음과 같은 Email을 수신합니다. [Click to View] 를 클릭하여 분석결과를 확인할 수 있습니다.
[Top]
실시간으로 들어오는 데이터를 Kinesis Data Firehose를 이용해서 S3에 저장할 경우, 데이터 사이즈가 작은 파일들이 생성됩니다. Amazon Athena의 쿼리 성능 향상을 위해서 작은 파일들을 하나의 큰 파일로 합쳐주는 것이 좋습니다. 이러한 작업을 주기적으로 실행하기 위해서 Athena의 CTAS(Create Table As Select) 쿼리를 실행하는 AWS Lambda function 함수를 생성하고자 합니다.
- Athena 콘솔에 접속해서 Athena 쿼리 편집기로 이동합니다.
- [DATABASE] 에서 mydatabase를 선택하고, [New Query] 를 선택합니다.
- 쿼리 창에 다음 CREATE TABLE 문을 입력한 후 [Run Query] 를 선택합니다.
이번 실습에서는retal_tran_json
테이블의 json 포맷 데이터를 parquet 포맷으로 변경해서ctas_retail_trans_parquet
이라는 테이블에 저장할 것 입니다.
ctas_retail_trans_parquet
테이블의 데이터는 앞서 생성한 S3 bucket의s3://aws-analytics-immersion-day-xxxxxxxx/parquet-retail-trans
위치에 저장할 것 입니다.CREATE EXTERNAL TABLE `mydatabase.ctas_retail_trans_parquet`( `invoice` string COMMENT 'Invoice number', `stockcode` string COMMENT 'Product (item) code', `description` string COMMENT 'Product (item) name', `quantity` int COMMENT 'The quantities of each product (item) per transaction', `invoicedate` timestamp COMMENT 'Invoice date and time', `price` float COMMENT 'Unit price', `customer_id` string COMMENT 'Customer number', `country` string COMMENT 'Country name') PARTITIONED BY ( `year` int, `month` int, `day` int, `hour` int) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://aws-analytics-immersion-day-xxxxxxxx/parquet-retail-trans' TBLPROPERTIES ( 'has_encrypted_data'='false', 'parquet.compression'='SNAPPY') ;
- AWS Lambda 콘솔 을 엽니다.
- [Create a function] 을 선택합니다.
- Function name(함수 이름)에
MergeSmallFiles
을 입력합니다. - Runtime 에서
Python 3.11
을 선택합니다. - [Create a function] 을 선택합니다.
- Designer 탭에 [Add trigger] 를 선택합니다.
- Trigger configuration 의
Select a trigger
에서 CloudWatch Events/EventBridge 를 선택 합니다. Rule에서Create a new rule
선택하고, Rule name에 적절한 rule name(예:MergeSmallFilesEvent
)을 입력 합니다. Rule type으로Schedule expression
을 선택하고, Schedule expression에 매시각 5분 마다 작업이 실행되도록,cron(5 * * * *)
입력합니다. - Trigger configuration 에서 [Add] 를 클릭합니다.
- Function code의 코드 편집기에
athena_ctas.py
파일의 코드를 복사해서 붙여넣은 후, Deploy 버튼을 클릭합니다. - [Add environment variables] 를 클릭해서 다음 Environment variables을 등록합니다.
예를 들어, 다음과 같이 Environment variables을 설정합니다.
OLD_DATABASE=<source database> OLD_TABLE_NAME=<source table> NEW_DATABASE=<destination database> NEW_TABLE_NAME=<destination table> WORK_GROUP=<athena workgroup> OLD_TABLE_LOCATION_PREFIX=<s3 location prefix of source table> OUTPUT_PREFIX=<destination s3 prefix> STAGING_OUTPUT_PREFIX=<staging s3 prefix used by athena> COLUMN_NAMES=<columns of source table excluding partition keys>
OLD_DATABASE=mydatabase OLD_TABLE_NAME=retail_trans_json NEW_DATABASE=mydatabase NEW_TABLE_NAME=ctas_retail_trans_parquet WORK_GROUP=primary OLD_TABLE_LOCATION_PREFIX=s3://aws-analytics-immersion-day-xxxxxxxx/json-data OUTPUT_PREFIX=s3://aws-analytics-immersion-day-xxxxxxxx/parquet-retail-trans STAGING_OUTPUT_PREFIX=s3://aws-analytics-immersion-day-xxxxxxxx/tmp COLUMN_NAMES=invoice,stockcode,description,quantity,invoicedate,price,customer_id,country
- Athena 쿼리를 수행하는데 필요한 IAM Policy를 추가하기 위해서 Execution role에서
View the MergeSmallFiles-role-XXXXXXXX role on the IAM console.
을 클릭 해서 IAM Role을 수정합니다. - IAM Role의 [Permissions] 탭에서 [Attach policies] 버튼을 클릭 후, AmazonAthenaFullAccess, AmazonS3FullAccess 를 차례로 추가 합니다.
- Basic settings에서 [Edit] 선택합니다. Memory와 Timeout을 알맞게 조정합니다.
이 실습에서는 Timout을
5 min
으로 설정합니다.
[Top]
실시간으로 데이터를 저장하고, 분석하기 위해서 OpenSearch cluster를 생성합니다. Amazon ES 도메인은 OpenSearch 클러스터와 동의어입니다. 도메인은 설정, 인스턴스 유형, 인스턴스 수, 스토리지 리소스를 지정한 설정입니다.
- AWS Management Console에서 Analytics의 OpenSearch 서비스를 선택합니다.
- (Step 1: Choose deployment type) Create a new domain(새 도메인 생성) 을 선택합니다.
- OpenSearch 도메인 생성 페이지에서 Deployment type(배포 유형) 에 대해 Production(프로덕션) 을 선택합니다.
- 버전에서 해당 도메인의 OpenSearch 버전을 선택합니다. 지원되는 최신 버전을 선택하는 것이 좋습니다. 자세한 내용은 지원되는 OpenSearch 버전 단원을 참조하십시오.
- [Next] 를 선택합니다.
- (Step 2: Configure domain) 도메인의 이름을 입력합니다. 이 실습에서는 이후에 다룰
retail
를 예제 도메인 이름으로 사용합니다. - 인스턴스 유형 에서 Amazon ES 도메인의 인스턴스 유형을 선택합니다. 이 실습에서는 테스트 목적에 적합한 소용량의 경제적인 인스턴스 유형
t2.medium.opensearch
를 사용하는 것이 좋습니다. - 인스턴스 수 에 원하는 인스턴스 수를 입력합니다. 이 실습에서는 기본값
3
을 사용합니다. - 스토리지 유형에서 EBS를 선택합니다.
- 지금은 Dedicated master nodes(전용 마스터 노드), Snapshot configuration(스냅샷 구성) 및 Optional OpenSearch cluster settings(선택적 OpenSearch 클러스터 설정) 섹션을 무시할 수 있습니다.
- [Next] 를 선택합니다.
- (Step 3: Configure access and security) Network configuration(네트워크 구성) 의 경우 VPC access 를 선택합니다.
적절한 VPC와 subnet을 선택합니다. Security Groups으로 준비 단계에서 생성한
es-cluster-sg
를 선택합니다. - 지금은 Amazon Cognito Authentication(Amazon Cognito 인증) 과 Fine–grained access control 을 disable 합니다.
- Access policy(액세스 정책) 의 경우 Domain access policy(도메인 액세스 정책) 에서 JSON defined access policy(JSON 정의 액세스 정책) 선택한 다음,
Add or edit the access policy(액세스 정책 추가 또는 편집) 에 다음 템플릿을 이용해서 JSON defined access policy 를 생성해서 입력 합니다.
- JSON defined access policy Template -
<DOMAIN-NAME>
에 (Step 2: Configure domain) 에서 입력한 도메인 이름을 일력합니다.{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "*" }, "Action": [ "es:Describe*", "es:List*", "es:Get*", "es:ESHttp*" ], "Resource": "arn:aws:es:<region-id>:<account-id>:domain/<DOMAIN-NAME>/*" } ] }
- 예) 이번 실습에서는
retail
을 도메인 이름으로 사용했기 때문에, 아래와 같이 JSON defined access policy 를 생성합니다.{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "*" }, "Action": [ "es:Describe*", "es:List*", "es:Get*", "es:ESHttp*" ], "Resource": "arn:aws:es:us-west-2:123456789012:domain/retail/*" } ] }
- JSON defined access policy 생성을 완료하면, 아래와 같은 화면을 볼 수 있을 것입니다.
- JSON defined access policy Template -
- Encryption(암호화) 에서 Require HTTPS for all traffic to the domain 만 허용하고, 다른 항목은 disable 합니다.
- Encryption(암호화) 의 모든 기본값을 유지합니다. [Next] 를 선택합니다.
- Review 페이지에서 도메인 구성을 검토한 다음 확인을 선택합니다.
[Top]
Lambda function을 이용해서 Amazon ES에 데이터를 실시간으로 색인할 수 있습니다. 이번 실습에서는 AWS Lambda 콘솔을 사용하여 Lambda 함수를 생성합니다.
- AWS Lambda 콘솔 을 엽니다.
- Layers 메뉴에 들어가서 [Create layer] 을 선택합니다.
- Name에
es-lib
를 입력합니다. Upload a file from Amazon S3
를 선택하고, 라이브러리 코드가 저장된 s3 link url 또는 압축한 라이브러리 코드 파일을 입력합니다.es-lib.zip
생성 방법은 AWS Lambda Layer에 등록할 Python 패키지 생성 예제 를 참고하세요.Compatible runtimes
에서Python 3.11
을 선택합니다.
- AWS Lambda 콘솔 을 엽니다.
- [Create a function] 을 선택합니다.
- Function name(함수 이름)에
UpsertToES
을 입력합니다. - Runtime 에서
Python 3.11
을 선택합니다. - [Create a function] 을 선택합니다.
- Designer(디자이너) 에서 layers를 선택합니다. Layers에서 Add a layer를 선택합니다.
- Custom layers를 클릭하고 Name과 Version으로 앞서 생성한 layer의 Name과 Version을 선택합니다.
- [Add] 클릭합니다.
- Designer(디자이너) 에서
UpsertToES
을 선택하여 함수의 코드 및 구성으로 돌아갑니다. - Function code의 코드 편집기에
upsert_to_es.py
파일의 코드를 복사해서 붙여넣은 후, Deploy 버튼을 클릭합니다. - Environment variables 에서 [Edit] 를 클릭합니다.
- [Add environment variables] 를 클릭해서 아래 4개의 Environment variables을 등록합니다.
예를 들어, 다음과 같이 Environment variables을 설정합니다.
ES_HOST=<opensearch service domain> ES_INDEX=<opensearch index name> ES_TYPE=<opensearch type name> REQUIRED_FIELDS=<primary key로 사용될 column 목록> REGION_NAME=<region-name> DATE_TYPE_FIELDS=<column 중, date 또는 timestamp 데이터 타입의 column>
ES_HOST=vpc-retail-xkl5jpog76d5abzhg4kyfilymq.us-west-1.es.amazonaws.com ES_INDEX=retail ES_TYPE=trans REQUIRED_FIELDS=Invoice,StockCode,Customer_ID REGION_NAME=us-west-2 DATE_TYPE_FIELDS=InvoiceDate
- [Save] 선택합니다.
- lambda 함수를 VPC 내에서 실행 하고, Kinesis Data Streams에서 데이터를 읽기 위해서,
lamba 함수 실행에 필요한 Execution role에 필요한 IAM Policy를 추가햐야 합니다.
IAM Role 수정을 위해서
View the UpsertToES-role-XXXXXXXX role on the IAM console.
을 클릭 합니다. - IAM Role의 [Permissions] 탭에서 [Attach policies] 버튼을 클릭 후, AWSLambdaVPCAccessExecutionRole, AmazonKinesisReadOnlyAccess 를 차례로 추가 합니다.
- VPC 항목에서 [Edit] 버튼을 클릭해서 Edit VPC 화면으로 이동 한다. VPC connection 에서
Custom VPC
를 선택합니다. OpenSearch service의 도메인을 생성한 VPC와 subnets을 선택하고, OpenSearch service 도메인에 접근이 허용된 security groups을 선택합니다. - Basic settings에서 [Edit] 선택합니다. Memory와 Timeout을 알맞게 조정합니다. 이 실습에서는 Timout을
5 min
으로 설정합니다. - 다시 Designer 탭으로 돌아가서 [Add trigger] 를 선택합니다.
- Trigger configuration 의
Select a trigger
에서 Kinesis 를 선택 합니다. - Kinesis stream 에서 앞서 생성한 Kinesis Data Stream(
retail-trans
)을 선택합니다. - [Add] 를 선택합니다.
Lambda 함수가 Amazon OpenSearch Service에 데이터를 넣기 위해서는 Lambda 함수에 OpenSearch에 접근 권한이 필요합니다.
아래와 같은 순서로 OpenSearch에 데이터를 넣는데 필요한 권한을 Lambda 함수에 부여할 수 있습니다.
-
Amazon OpenSearch Cluster를 VPC의 private subnet에 생성했기 때문에, Amazon OpenSearch endpoint와 OpenSearch Dashboards(a.k.a Kibana) endpoint를 public 인터넷으로 접근할 수 없다. 따라서 OpenSearch 에 접속하기 위해서 ssh tunnel을 생성하고, local port forwarding을 해야 합니다.
-
Option 1) Using SSH Tunneling
-
ssh 설정 변경
Winodws 사용자의 경우, 여기를 참고하세요.
Mac/Linux 사용자의 경우, 다음과 같은 ssh tunnel 설정을 개인 PC에 있는 ssh config 파일에 추가합니다.# OpenSearch Tunnel Host estunnel HostName <EC2 Public IP of Bastion Host> User ec2-user IdentitiesOnly yes IdentityFile ~/.ssh/analytics-hol.pem LocalForward 9200 <OpenSearch Endpoint>:443
- EC2 Public IP of Bastion Host uses the public IP of the EC2 instance created in the Lab setup step.
- ex)
~$ ls -1 .ssh/ analytics-hol.pem config id_rsa ~$ tail .ssh/config # OpenSearch Tunnel Host estunnel HostName 214.132.71.219 User ubuntu IdentitiesOnly yes IdentityFile ~/.ssh/analytics-hol.pem LocalForward 9200 vpc-retail-qvwlxanar255vswqna37p2l2cy.us-west-2.es.amazonaws.com:443 ~$
-
Terminal 에서
ssh -N estunnel
명령어를 실행합니다.
-
-
Option 2) Connect using the EC2 Instance Connect CLI
- EC2 Instance Connect CLI 설치
sudo pip install ec2instanceconnectcli
- 실행
mssh ec2-user@{bastion-ec2-instance-id} -N -L 9200:{opensearch-endpoint}:443
- ex)
$ mssh ec2-user@i-0203f0d6f37ccbe5b -N -L 9200:vpc-retail-qvwlxanar255vswqna37p2l2cy.us-west-2.es.amazonaws.com:443
- EC2 Instance Connect CLI 설치
-
-
Web browser에서
https://localhost:9200/_dashboards/app/login?
으로 접속합니다. -
Amazon OpenSearch Service 생성할 때, 미리 만들었던 사용자 id와 Password를 입력합니다.
-
Welcome screen에서 Home 버튼 왼쪽에 있는 toolbar icon을 클릭한 후에 Security 메뉴를 선택합니다.
-
Security 메뉴 아래에 Roles을 선택합니다.
-
Create role 선택합니다.
-
Role이름을 입력 합니다. (e.g.,
firehose_role
). -
cluster permissions에
cluster_composite_ops
,cluster_monitor
를 추가합니다. -
Index permissions 에서 Index Patterns 선택하고, index-name* (e.g,
retail*
)을 입력합니다. -
Permissions 에
crud
,create_index
,manage
action group을 추가합니다.
다음으로 Lamabda 함수수의 IAM Role과 방금 생성한 OpenSearch Role을 연결합니다.
- Mapped users tab 을 클릭합니다.
- Manage mapping 클릭합니다.
- Backend roles 에 Lambda function이 사용하는 IAM Role의 ARN을 입력합니다.
arn:aws:iam::123456789012:role/UpsertToESServiceRole709-xxxxxxxxxxxx
. - Map 클릭합니다.
Note: Lambda 함수에 OpenSearch Role이 정상적으로 부여되지 않았다면, Lambda 함수를 수행할 때, 다음과 같은 에러가 발생할 수 있습니다:
[ERROR] AuthorizationException: AuthorizationException(403, 'security_exception', 'no permissions for [cluster:monitor/main] and User [name=arn:aws:iam::123456789012:role/UpsertToESServiceRole709-G1RQVRG80CQY, backend_roles=[arn:aws:iam::123456789012:role/UpsertToESServiceRole709-G1RQVRG80CQY], requestedTenant=null]')
[Top]
Amazon OpenSearch Service에서 수집된 데이터를 Kibana를 이용해서 시각화 작업을 합니다.
- Amazon OpenSearch Cluster를 VPC의 private subnet에 생성했기 때문에, Amazon OpenSearch endpoint와 OpenSearch Dashboards(a.k.a Kibana) endpoint를 public 인터넷으로 접근할 수 없다. 따라서 OpenSearch 에 접속하기 위해서 ssh tunnel을 생성하고, local port forwarding을 해야 합니다.
Mac 또는 Linux 사용자의 경우, 아래와 같이 개인 Local PC의 ssh config 파일에 ssh tunnel 설정을 추가 합니다. Windows 사용자의 경우, 여기를 참고한다.# OpenSearch Tunnel Host estunnel HostName <EC2 Public IP of Bastion Host> User ec2-user IdentitiesOnly yes IdentityFile ~/.ssh/analytics-hol.pem LocalForward 9200 <OpenSearch Endpoint>:443
- EC2 Public IP of Bastion Host 은 실습 환경 구성 단계에서 생성한 EC2 인스턴스의 Public IP 를 사용한다.
- 예)
~$ ls -1 .ssh/ analytics-hol.pem config id_rsa ~$ tail .ssh/config # OpenSearch Tunnel Host estunnel HostName 214.132.71.219 User ubuntu IdentitiesOnly yes IdentityFile ~/.ssh/analytics-hol.pem LocalForward 9200 vpc-retail-qvwlxanar255vswqna37p2l2cy.us-west-2.es.amazonaws.com:443 ~$
- Terminal 에서
ssh -N estunnel
를 실행합니다. - Web browser에서
https://localhost:9200/_dashboards/app/login?
으로 접속합니다. - (Home) Add Data to Kibana 에서 [Use OpenSearch data / Connect to your OpenSearch index] 클릭한다.
- (Management / Create index pattern) Create index pattern의 Step 1 of 2: Define index pattern 에서
Index pattern에
retail*
을 입력합니다. - (Management / Create index pattern) [> Next step] 을 선택합니다.
- (Management / Create index pattern) Create index pattern의 Step 2 of 2: Configure settings 에서
Time Filter field name에
InvoiceDate
를 선택합니다. - (Management / Create index pattern) [Create index pattern] 을 클릭합니다.
- (Management / Advanced Settings) 왼쪽 사이드바 메뉴에서 [Advanced Settings] 를 선택한 후, Timezone for date formatting을
Etc/UTC
로 설정합니다. 테스트용 데이터의 로그 생성 시간이UTC
기준이기 때문에 Kibana의 Timezone 역시UTC
로 설정합니다. - (Discover) Index pattern 생성을 완료 후, Discover를 선택해서 OpenSearch 수집된 데이터를 확인합니다.
- (Discover)
InvoicdDate
별Quantity
를 시각화 해 보겠습니다. 좌측의 Available fields에서 invoicdDate를 선택하고, 하단에 있는 Visualize를 클릭합니다. - (Visualize) 아래와 같이 Data 탭의 Metrics에서 Y-Axis를 Aggregation은
Sum
, Field는Quantity
를 선택 후 적용 합니다. - (Visualize) 좌측 상단의 [Save] 를 클릭하고, 저장한 그래프의 이름을 적은 후에 [Confirm Save] 를 클릭합니다.
- (Dashboards) 좌측의 Dashboard 아이콘을 클릭 후, [Create new dashboard] 버튼을 클릭 합니다.
- (Dashboards) 좌측 상단의 [Add] 를 클릭해서, Add Panels 에 이전 단계에서 생성한 그래프를 선택 합니다.
- (Dashboards) 좌측 상단의 [Save] 를 클릭 한 후, Save dashboard에서 Title을 입력한 이후, [Confirm Save] 를 클릭 합니다.
- (Dashboards) 아래와 같은 Dashboard를 확인할 수 있습니다.
[Top]
이 실습을 통해서 데이터 파이프라인을 만들어서 실시간 데이터 처리와 배치 데이터 처리 layer로 구성된 Lambda Architecture 구조의 Business Intelligent System을 구축해 보셨습니다.
[Top]
- slide: AWS Analytics Immersion Day - Build BI System from Scratch
- data source: Online Retail II Data Set
[Top]
- Amazon Simple Storage Service (Amazon S3)
- Amazon Athena
- Amazon OpenSearch Service
- AWS Lambda
- Amazon Kinesis Data Firehose
- Amazon Kinesis Data Streams
- Amazon QuickSight
- AWS Lambda Layers
-
AWS Lambda Layer에 등록할 Python 패키지 생성 예제: opensearch
⚠️ Python 패키지를 생성할 때는 AWS Lambda의 실행환경과 동일한 환경에서 생성해야하므로, Amazon Linux에서 Python 패키지를 생성하는 것을 추천 드립니다.[ec2-user@ip-172-31-6-207 ~] $ python3 -m venv es-lib # virtual environments을 생성함 [ec2-user@ip-172-31-6-207 ~] $ cd es-lib [ec2-user@ip-172-31-6-207 ~] $ source bin/activate (es-lib) $ mkdir -p python_modules # 필요한 패키지를 저장할 디렉터리 생성 (es-lib) $ pip install 'elasticsearch>=7.0.0,<7.11' requests requests-aws4auth -t python_modules # 필요한 패키지를 사용자가 지정한 패키지 디렉터리에 저장함 (es-lib) $ mv python_modules python # 사용자가 지정한 패키지 디렉터리 이름을 python으로 변경함 (python 디렉터리에 패키지를 설치할 경우 에러가 나기 때문에 다른 이름의 디렉터리에 패키지를 설치 후, 디렉터리 이름을 변경함) (es-lib) $ zip -r es-lib.zip python/ # 필요한 패키지가 설치된 디렉터리를 압축함 (es-lib) $ aws s3 mb s3://my-bucket-for-lambda-layer-packages # 압축한 패키지를 업로드할 s3 bucket을 생성함 (es-lib) $ aws s3 cp es-lib.zip s3://my-bucket-for-lambda-layer-packages/var/ # 압축한 패키지를 s3에 업로드 한 후, lambda layer에 패키지를 등록할 때, s3 위치를 등록하면 됨 (es-lib) $ deactivate
-
How to create a Lambda layer using a simulated Lambda environment with Docker
$ cat <<EOF > requirements.txt > elasticsearch>=7.0.0,<7.11 > requests==2.23.0 > requests-aws4auth==0.9 > EOF $ docker run -v "$PWD":/var/task "public.ecr.aws/sam/build-python3.7" /bin/sh -c "pip install -r requirements.txt -t python/lib/python3.7/site-packages/; exit" $ zip -r es-lib.zip python > /dev/null $ aws s3 mb s3://my-bucket-for-lambda-layer-packages $ aws s3 cp es-lib.zip s3://my-bucket-for-lambda-layer-packages/var/
-
- Windows SSH / Tunnel for Kibana Instructions - Amazon Elasticsearch Service
- Use an SSH Tunnel to access Kibana within an AWS VPC with PuTTy on Windows
[Top]
- Top 10 Performance Tuning Tips for Amazon Athena
- Extract, Transform and Load data into S3 data lake using CTAS and INSERT INTO statements in Amazon Athena
- Query Amazon S3 analytics data with Amazon Athena
- Elasticsearch tutorial: a quick start guide
- Run a petabyte scale cluster in Amazon Elasticsearch Service
- Analyze user behavior using Amazon Elasticsearch Service, Amazon Kinesis Data Firehose and Kibana
- Introduction to Messaging for Modern Cloud Architecture
- Understanding the Different Ways to Invoke Lambda Functions
- Amazon Kinesis Data Firehose custom prefixes for Amazon S3 objects
- Amazon Kinesis Firehose Data Transformation with AWS Lambda
- Under the hood: Scaling your Kinesis data streams
- Scale Amazon Kinesis Data Streams with AWS Application Auto Scaling
- 10 visualizations to try in Amazon QuickSight with sample data
- Visualize over 200 years of global climate data using Amazon Athena and Amazon QuickSight
- Advanced analytics with table calculations in Amazon QuickSight
- Optimize downstream data processing with Amazon Kinesis Data Firehose and Amazon EMR running Apache Spark
- Serverless Scaling for Ingesting, Aggregating, and Visualizing Apache Logs with Amazon Kinesis Firehose, AWS Lambda, and Amazon Elasticsearch Service
- Analyze Apache Parquet optimized data using Amazon Kinesis Data Firehose, Amazon Athena, and Amazon Redshift
- Our data lake story: How Woot.com built a serverless data lake on AWS
-
Securing your bastion hosts with Amazon EC2 Instance Connect
$ # (1) Create a new ssh key. $ ssh-keygen -t rsa -f my_rsa_key $ # (2) Push your SSH public key to the instance. $ aws ec2-instance-connect send-ssh-public-key \ --instance-id $BASTION_INSTANCE \ --availability-zone $DEPLOY_AZ \ --instance-os-user ec2-user \ --ssh-public-key file:///path/to/my_rsa_key.pub $ # (3) Connect to the instance using your private key. $ ssh -i /path/to/my_rsa_key ec2-user@$BASTION_DNS_NAME
[Top]
AWS CDK를 이용해서 배포하는 방법을 소개 합니다.
-
AWS CDK Toolkit을 설치합니다.
npm install -g aws-cdk
-
cdk가 정상적으로 설치되었는지, 다음 명령어를 실행해서 확인합니다.
cdk --version
예)
$ cdk --version 2.41.0 (build 56ba2ab)
cdk ls
list all stacks in the appcdk synth
emits the synthesized CloudFormation templatecdk deploy
deploy this stack to your default AWS account/regioncdk diff
compare deployed stack with current statecdk docs
open CDK documentation
[Top]
CDK로 배포할 경우, 아래 아키텍처 그림의 1(a), 1(b), 1(c), 1(f), 2(b), 2(a)
가 자동으로 생성됩니다.
-
Getting Started With the AWS CDK를 참고해서 cdk를 설치하고, cdk를 실행할 때 사용할 IAM User를 생성한 후,
~/.aws/config
에 등록합니다. (사전 준비 작업를 참고해서 IAM User를 생성합니다.) 예를 들어서, cdk_user라는 IAM User를 생성 한 후, 아래와 같이~/.aws/config
에 추가로 등록합니다.$ cat ~/.aws/config [profile cdk_user] aws_access_key_id=AKIAIOSFODNN7EXAMPLE aws_secret_access_key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY region=us-west-2
-
Lambda Layer에 등록할 Python 패키지를 생성해서 s3 bucket에 저장한다. 에를 들어, elasticsearch 패키지를 Lambda Layer에 등록 할 수 있도록
lambda-layer-resources
라는 이름의 s3 bucket을 생성 후, 아래와 같이 저장합니다.$ aws s3 ls s3://lambda-layer-resources/var/ 2019-10-25 08:38:50 0 2019-10-25 08:40:28 1294387 es-lib.zip
-
소스 코드를 git에서 다운로드 받은 후,
S3_BUCKET_LAMBDA_LAYER_LIB
라는 환경 변수에 lambda layer에 등록할 패키지가 저장된 s3 bucket 이름을 설정 한 후,cdk deploy
명령어를 이용해서 배포합니다.$ git clone https://github.com/aws-samples/aws-analytics-immersion-day.git $ cd aws-analytics-immersion-day $ python3 -m venv .env $ source .env/bin/activate (.env) $ pip install -r requirements.txt (.env) $ export CDK_DEFAULT_ACCOUNT=$(aws sts get-caller-identity --query Account --output text) (.env) $ export CDK_DEFAULT_REGION=us-west-2 (.env) $ cdk bootstrap aws://${CDK_DEFAULT_ACCOUNT}/${CDK_DEFAULT_REGION} (.env) $ export S3_BUCKET_LAMBDA_LAYER_LIB=lambda-layer-resources (.env) $ cdk --profile cdk_user deploy --require-approval never --all
✅
cdk bootstrap ...
명령어는 CDK toolkit stack 배포를 위해 최초 한번만 실행 하고, 이후에 배포할 때는 CDK toolkit stack 배포 없이cdk deploy
명령어만 수행하면 됩니다.(.env) $ export CDK_DEFAULT_ACCOUNT=$(aws sts get-caller-identity --query Account --output text) (.env) $ export CDK_DEFAULT_REGION=us-west-2 (.env) $ export S3_BUCKET_LAMBDA_LAYER_LIB=lambda-layer-resources (.env) $ cdk --profile cdk_user deploy --require-approval never --all
-
배포한 애플리케이션을 삭제하려면,
cdk destroy
명령어를 아래와 같이 실행 합니다.(.env) $ cdk --profile cdk_user destroy --force --all
[Top]
See CONTRIBUTING for more information.
This library is licensed under the MIT-0 License. See the LICENSE file.