Skip to content

Describes the concepts of lambda architecture and the actual deployment process with an example of building a serverless business intelligence systems using Amazon Kinesis, S3, Athena, OpenSearch Service, and QuickSight.

License

Notifications You must be signed in to change notification settings

aws-samples/aws-analytics-immersion-day

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

77 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

AWS Analytics Immersion Day Workshop

The purpose of this lab is to implement a Businesss Intelligence(BI) System using AWS Analytics Services.

You'll learn the concepts of lambda architecture and the actual deployment process through the example of building a serverless business intelligence system using Amazon Kinesis, S3, Athena, OpenSearch Service, and QuickSight.

Through this lab, you will set up a Data Collection -> Store -> Analysis/Processing -> Visualization pipeline.

Table of Contents

Solutions Architecture Overview

aws-analytics-system-architecture

[Top]

Lab setup

Before starting the lab, create and configure EC2, the IAM user you need.

[Top]

Create Kinesis Data Streams to receive input data

aws-analytics-system-build-steps

Select Kinesis from the list of services on the AWS Management Console.

  1. Make sure the Kinesis Data Streams radio button is selected and click Create data stream button.
  2. Enter retail-trans as the Data stream name.
  3. Enter the desired name for Kinesis stream name (e.g. retail-trans).
  4. Choose either the On-demand or Provisioned capacity mode.
    With the On-demand mode, you can then choose Create Kinesis stream to create your data stream.
    With the Provisioned mode, you must then specify the number of shards you need, and then choose Create Kinesis stream.
    If you choose Provisioned mode, enter 1 in Number of open shards under Data stream capacity.
  5. Click the Create data stream button and wait for the status of the created kinesis stream to become active.

[Top]

Create Kinesis Data Firehose to store data in S3

Kinesis Data Firehose will allow collecting data in real-time and batch it to load into a storage location such as Amazon S3, Amazon Redshift or OpenSearch Service.

aws-analytics-system-build-steps

  1. If you are on the Kinesis Data Stream page from the previous step, select Delivery streams from the left sidebar. If you are starting from the Kinesis landing page, select the Kinesis Data Firehose radio button and click the Create delivery stream button.

  2. (Step 1: Name and source) For Delivery stream name enter retail-trans.

  3. Under Choose a source, select the Kinesis Data Stream radio button and choose retail-trans stream that you created earlier from the dropdown list. Click Next. If you do not see your data stream listed, make sure you are in Oregon region and your data stream from previous step is in Active state.

  4. (Step 2: Process records) For Transform source records with AWS Lambda and Convert record format, leave both at Disabled and click Next.

  5. (Step 3: Choose a destination) Select Amazon S3 as Destination and click Create new to create a new S3 bucket. S3 bucket names are globally unique, so choose a bucket name that is unique for you. You can call it aws-analytics-immersion-day-xxxxxxxx where xxxxxxxx is a series of random numbers or characters of your choice. You can use something like your name or your favorite number.

  6. Under S3 Prefix, copy and paste the following text exactly as shown. Enter S3 prefix. For example, type as follows:

    json-data/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
    

    At this point, you may see a message You can't include expressions in the prefix unless you also specify an error prefix. Ignore this, it will go away once you enter the error prefix in the next step.

    Under S3 error prefix, copy and paste the following text exactly as shown.

    error-json/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}
    

    ⚠️ S3 prefix or S3 error prefix pattern must not contain a new line(\n) character. If you have copied the example pattern and pasted it into the S3 prefix or S3 error prefix, it is a good idea to remove the trailing line breaks.

    After entering S3 prefix and 3 error prefix, click Next. (cf. Custom Prefixes for Amazon S3 Objects)

  7. (Step 4: Configure settings) Set buffer size to 1 MB and buffer interval to 60 seconds in S3 buffer conditions. Leave everything else as default.

  8. Under Permissions IAM role, select Create or update IAM role and click the Next button. aws-kinesis-firehose-create_new_iam_role

  9. (Step 5: Review) If there are no errors after checking the information entered in Review, click the Create delivery stream button to complete the Firehose creation.

[Top]

Verify data pipeline operation

In this step, we will generate sample data and verify it is being processed and stored as follows- Kinesis Data Streams -> Kinesis Data Firehose -> S3.

aws-analytics-system-build-steps

  1. Connect SSH to the previously created E2 instance. You can go to the AWS Console and click the Connect button on the instance details page, or SSH from your local machine command line using the key pair you downloaded.
  2. Run gen_kinesis_data.py script on the EC2 instance by entering the following command -
    python3 gen_kinesis_data.py \
      --region-name us-west-2 \
      --service-name kinesis \
      --stream-name retail-trans
    If you would like to know more about the usage of this command, you can type
    python3 gen_kinesis_data.py --help
  3. Verify that data is generated every second. Let it run for a few minutes and terminate the script. You can enter Ctrl+C to end the script execution.
  4. Go to S3 service and open the bucket you created earlier. You can see that the original data has been delivered by Kinesis Data Firehose to S3 and stored in a folder structure by year, month, day, and hour.

[Top]

Analyze data using Athena

Using Amazon Athena, you can create tables based on data stored in S3, query those tables using SQL, and view query results.

First, create a database to query the data.

aws-analytics-system-build-steps

Step 1: Create a database

  1. Go to Athena from the list of services on the AWS Management console.
  2. The first time you visit Athena console, you will be taken to the Get Started page. Click the Get Started button to open the query editor.
  3. If this is your first time using Athena, you need to first set an S3 location to save Athena's query results. Click the set up a query result location in Amazon S3 box. aws-athena-setup-query-results-location-01 In this lab, we will create a new folder in the same S3 bucket you created in [Step-1b] Create Kinesis Data Firehose to store data in S3 section. For example, set your query location as s3://aws-analytics-immersion-day-xxxxxxxx/athena-query-results/ (xxxxxxxx is the unique string you gave to your S3 bucket) aws-athena-setup-query-results-location-02 Unless you are visiting for the first time, Athena Query Editor is oppened.
  4. You can see a query window with sample queries in the Athena Query Editor. You can start typing your SQL query anywhere in this window.
  5. Create a new database called mydatabase. Enter the following statement in the query window and click the Run Query button.
    CREATE DATABASE IF NOT EXISTS mydatabase
    
  6. Confirm that the the dropdown list under Database section on the left panel has updated with a new database called mydatabase. If you do not see it, make sure the Data source is selected to AwsDataCatalog. aws-athena-create-database

Step 2: Create a table

  1. Make sure that mydatabase is selected in Database, and click the + button above the query window to open a new query.

  2. Copy the following query into the query editor window, replace the xxxxxxx in the last line under LOCATION with the string of your S3 bucket, and click the Run Query button to execute the query to create a new table.

    CREATE EXTERNAL TABLE IF NOT EXISTS `mydatabase.retail_trans_json`(
      `invoice` string COMMENT 'Invoice number',
      `stockcode` string COMMENT 'Product (item) code',
      `description` string COMMENT 'Product (item) name',
      `quantity` int COMMENT 'The quantities of each product (item) per transaction',
      `invoicedate` timestamp COMMENT 'Invoice date and time',
      `price` float COMMENT 'Unit price',
      `customer_id` string COMMENT 'Customer number',
      `country` string COMMENT 'Country name')
    PARTITIONED BY (
      `year` int,
      `month` int,
      `day` int,
      `hour` int)
    ROW FORMAT SERDE
      'org.openx.data.jsonserde.JsonSerDe'
    STORED AS INPUTFORMAT
      'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT
      'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
    LOCATION
      's3://aws-analytics-immersion-day-xxxxxxxx/json-data'
    

    If the query is successful, a table named retail_trans_json is created and displayed on the left panel under the Tables section.

    If you get an error, check if (a) you have updated the LOCATION to the correct S3 bucket name, (b) you have mydatabase selected under the Database dropdown, and (c) you have AwsDataCatalog selected as the Data source.

  3. After creating the table, click the + button to create a new query. Run the following query to load the partition data.

    MSCK REPAIR TABLE mydatabase.retail_trans_json
    

    You can list all the partitions in the Athena table in unsorted order by running the following query.

    SHOW PARTITIONS mydatabase.retail_trans_json
    

Step 3: Query Data

  • Click the + button to open a new query tab. Enter the following SQL statement to query 10 transactions from the table and click Run Query.

    SELECT *
    FROM retail_trans_json
    LIMIT 10
    

    The result is returned in the following format: aws_athena_select_all_limit_10

    You can experiment with writing different SQL statements to query, filter, sort the data based on different parameters. You have now learned how Amazon Athena allows querying data in Amazon S3 easily without requiring any database servers.

[Top]

Data visualization with QuickSight

In this section, we will use Amazon QuickSight to visualize the data that was collected by Kinesis, stored in S3, and analyzed using Athena previously.

aws-analytics-system-build-steps

  1. Go to QuickSight Console.
  2. Click the Sign up for QuickSight button to sign up for QuickSight.
  3. Select Standard Edition and click the Continue button.
  4. Specify a QuickSight account name. This name should be unique to you, so use the unique string in the account name similar to how you did for the S3 bucket name earlier. Enter your personal email address under Notification email address.
  5. QuckSight needs access to S3 to be able to read data. Check the Amazon S3 box, and select aws-analytics-immersion-day-xxxxxxxx bucket from the list. Click Finish. aws-quicksight-choose-s3-bucket
  6. After the account is created, click the Go to Amazon QuickSight button. Confirm that you are in US West (Oregon) region. Click on the account name on the top right corner and select US West (Oregon) if it is not already set to Oregon. Click the New Analysis button and click on New dataset on the next screen. aws-quicksight-new_data_sets
  7. Click Athena and enter retail-quicksight in the Data source name in the pop-up window. Click Validate connection to change to Validated, then click the Create data source button. aws-quicksight-athena_data_source
  8. On the Choose your table screen, select Catalog AwsDataCatalog, Database mydatabase and Tables retail_trans_json. Click the Select button. aws-quicksight-athena-choose_your_table
  9. On the Finish dataset creation screen, choose Directly query your data and click the Visualize button. aws-quicksight-finish-dataset-creation
  10. Let's visualize the Quantity and Price by InvoiceDate. Select vertical bar chart from the Visual types box on the bottom left. In the field wells, drag invoicedate from the left panel into X axis, drag price, and quantity into Value. You will see a chart get populated as shown below. aws-quicksight-bar-chart
  11. Let's share the Dashboard we just created with other users. Click on the account name on the top right corner and select Manage QuickSight.
  12. Click the + button on the right side, and enter an email address of the person with whom you want to share the visualization. Click the Invite button and close the popup window.
    aws-quicksight-user-invitation
  13. Users you invite will receive the following Invitation Email. They can click the button to accept invitation. aws-quicksight-user-email
  14. Return to the QuickSight home screen, select your analysis, and click Share> Share analysis from the upper right corner. aws-quicksight-share-analysis
  15. Select BI_user01 and click the Share button. aws-quicksight-share-analysis-users
  16. Users receive the following email: You can check the analysis results by clicking Click to View. aws-quicksight-user-email-click-to-view

[Top]

(Optional) Combine small files stored in S3 into large files using AWS Lambda Function

When real-time incoming data is stored in S3 using Kinesis Data Firehose, files with small data size are created. To improve the query performance of Amazon Athena, it is recommended to combine small files into one large file. To run these tasks periodically, we are going to create an AWS Lambda function function that executes Athena's Create Table As Select (CTAS) query.

aws-analytics-system-build-steps-extra

Step 1: Create a table to store CTAS query results

  1. Access Athena Console and go to the Athena Query Editor.
  2. Select mydatabase from DATABASE and navigate to New Query.
  3. Enter the following CREATE TABLE statement in the query window and select Run Query.
    In this exercise, we will change the json format data of the retal_tran_json table into parquet format and store it in a table called ctas_retail_trans_parquet.
    The data in the ctas_retail_trans_parquet table will be saved in the location s3://aws-analytics-immersion-day-xxxxxxxx/parquet-retail-trans of the S3 bucket created earlier.
    CREATE EXTERNAL TABLE `mydatabase.ctas_retail_trans_parquet`(
      `invoice` string COMMENT 'Invoice number',
      `stockcode` string COMMENT 'Product (item) code',
      `description` string COMMENT 'Product (item) name',
      `quantity` int COMMENT 'The quantities of each product (item) per transaction',
      `invoicedate` timestamp COMMENT 'Invoice date and time',
      `price` float COMMENT 'Unit price',
      `customer_id` string COMMENT 'Customer number',
      `country` string COMMENT 'Country name')
    PARTITIONED BY (
      `year` int,
      `month` int,
      `day` int,
      `hour` int)
    ROW FORMAT SERDE
      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    STORED AS INPUTFORMAT
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
    OUTPUTFORMAT
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    LOCATION
      's3://aws-analytics-immersion-day-xxxxxxxx/parquet-retail-trans'
    TBLPROPERTIES (
      'has_encrypted_data'='false',
      'parquet.compression'='SNAPPY')
    ;
    

Step 2: Create an AWS Lambda Function

  1. Open the AWS Lambda Console.
  2. Select Create a function.
  3. Enter MergeSmallFiles for Function name.
  4. Select Python 3.11 in Runtime.
  5. Select Create a function. aws-athena-ctas-lambda-create-function
  6. Select Add trigger in the Designer tab.
  7. Select CloudWatch Events/EventBridge in Select a trigger of Trigger configuration. Select Create a new rule in Rule and enter the appropriate rule name (eg MergeSmallFilesEvent) in Rule name. Select Schedule expression as the rule type, and enter cron(5 * * * *) for running the task every 5 minutes in the schedule expression. aws-athena-ctas-lambda-add-trigger
  8. In Trigger configuration, click [Add].
  9. Copy and paste the code from the athena_ctas.py file into the code editor of the Function code. Click Deploy.
  10. Click [Add environment variables] to register the following environment variables.
    OLD_DATABASE=<source database>
    OLD_TABLE_NAME=<source table>
    NEW_DATABASE=<destination database>
    NEW_TABLE_NAME=<destination table>
    WORK_GROUP=<athena workgroup>
    OLD_TABLE_LOCATION_PREFIX=<s3 location prefix of source table>
    OUTPUT_PREFIX=<destination s3 prefix>
    STAGING_OUTPUT_PREFIX=<staging s3 prefix used by athena>
    COLUMN_NAMES=<columns of source table excluding partition keys>
    For example, set Environment variables as follows:
    OLD_DATABASE=mydatabase
    OLD_TABLE_NAME=retail_trans_json
    NEW_DATABASE=mydatabase
    NEW_TABLE_NAME=ctas_retail_trans_parquet
    WORK_GROUP=primary
    OLD_TABLE_LOCATION_PREFIX=s3://aws-analytics-immersion-day-xxxxxxxx/json-data
    OUTPUT_PREFIX=s3://aws-analytics-immersion-day-xxxxxxxx/parquet-retail-trans
    STAGING_OUTPUT_PREFIX=s3://aws-analytics-immersion-day-xxxxxxxx/tmp
    COLUMN_NAMES=invoice,stockcode,description,quantity,invoicedate,price,customer_id,country
    
  11. To add the IAM Policy required to execute Athena queries, click View the MergeSmallFiles-role-XXXXXXXX role on the IAM console. in the Execution role and modify the IAM Role. aws-athena-ctas-lambda-execution-iam-role
  12. After clicking the Attach policies button in the Permissions tab of IAM Role, add AmazonAthenaFullAccess and AmazonS3FullAccess in order. aws-athena-ctas-lambda-iam-role-policies
  13. Select Edit in Basic settings. Adjust Memory and Timeout appropriately. In this lab, we set Timout to 5 min.

[Top]

Create Amazon OpenSearch Service for Real-Time Data Analysis

An OpenSearch cluster is created to store and analyze data in real time. An OpenSearch Service domain is synonymous with an OpenSearch cluster. Domains are clusters with the settings, instance types, instance counts, and storage resources that you specify.

aws-analytics-system-build-steps

  1. In the AWS Management Console, choose Amazon OpenSearch Service under Analytics.
  2. Choose Create a new domain.
  3. Provide a name for the domain. The examples in this tutorial use the name retail.
  4. Ignore the Custom endpoint setting.
  5. For the deployment type, choose Production.
  6. For Version, choose the latest version. For more information about the versions, see Supported OpenSearch Versions.
  7. Under Data nodes, change the instance type to t3.small.search and keep the default value of three nodes.
  8. Under Network, choose VPC access (recommended). Choose the appropriate VPC and subnet. Select the es-cluster-sg created in the preparation step as Security Groups.
  9. In the fine-grained access control settings, choose Create master user. Provide a username and password.
  10. For now, ignore the SAML authentication and Amazon Cognito authentication sections.
  11. For Access policy, choose Only use fine-grained access control.
  12. Ignore the rest of the settings and choose Create. New domains typically take 15–30 minutes to initialize, but can take longer depending on the configuration.

[Top]

Ingest real-time data into OpenSearch using AWS Lambda Functions

You can index data into Amazon OpenSearch Service in real time using a Lambda function. In this lab, you will create a Lambda function using the AWS Lambda console.

aws-analytics-system-build-steps

To add a common library to Layers for use by Lambda functions,

  1. Open the AWS Lambda Console.
  2. Enter the Layers menu and select Create layer.
  3. Enter es-lib for the Name.
  4. Select Upload a file from Amazon S3 and enter the s3 link url where the library code is stored or the compressed library code file. For how to create es-lib.zip, refer to Example of creating a Python package to register in AWS Lambda Layer.
  5. Select Python 3.11 from Compatible runtimes.

To create a Lambda function,

  1. Open the AWS Lambda Console.
  2. Select Create a function.
  3. Enter UpsertToES for Function name.
  4. Select Python 3.11 in Runtime.
  5. Select Create a function. aws-lambda-create-function
  6. In the Designer tab. choose Add a layer at Layers.
  7. Select Custome Layers in Choose a Layer section, and choose Name and Version of the previously created layer as Name and Version in Custom layers. aws-lambda-add-layer-to-function
  8. Click Add.
  9. Select UpsertToES in the Designer tab to return to Function code and Configuration.
  10. Copy and paste the code from the upsert_to_es.py file into the code editor of the Function code. Click Deploy
  11. In Environment variables, click Edit.
  12. Click Add environment variables to register the following 4 environment variables.
    ES_HOST=<opensearch service domain>
    ES_INDEX=<opensearch index name>
    ES_TYPE=<opensearch type name>
    REQUIRED_FIELDS=<columns to be used as primary key>
    REGION_NAME=<region-name>
    DATE_TYPE_FIELDS=<columns of which data type is either date or timestamp>
    For example, set Environment variables as follows:
    ES_HOST=vpc-retail-xkl5jpog76d5abzhg4kyfilymq.us-west-1.es.amazonaws.com
    ES_INDEX=retail
    ES_TYPE=trans
    REQUIRED_FIELDS=Invoice,StockCode,Customer_ID
    REGION_NAME=us-west-2
    DATE_TYPE_FIELDS=InvoiceDate
    
  13. Click Save.
  14. In order to execute the lambda function in the VPC and read data from Kinesis Data Streams, you need to add the IAM Policy required for the Execution role required to execute the lamba function. Click View the UpsertToES-role-XXXXXXXX role on the IAM console. to edit the IAM Role. aws-lambda-execution-iam-role
  15. After clicking the Attach policies button in the Permissions tab of IAM Role, add AWSLambdaVPCAccessExecutionRole and AmazonKinesisReadOnlyAccess in order. aws-lambda-iam-role-policies
  16. Add the following policy statements into customer inline policy (e.g., UpsertToESDefaultPolicyXXXXX). The following IAM Policy enables the lambda function to ingest data into the retail index in the opensearch service.
    {
        "Action": [
            "es:DescribeElasticsearchDomain",
            "es:DescribeElasticsearchDomainConfig",
            "es:DescribeElasticsearchDomains",
            "es:ESHttpPost",
            "es:ESHttpPut"
        ],
        "Resource": [
            "arn:aws:es:region:account-id:domain/retail",
            "arn:aws:es:region:account-id:domain/retail/*"
        ],
        "Effect": "Allow"
    },
    {
        "Action": "es:ESHttpGet",
        "Resource": [
            "arn:aws:es:region:account-id:domain/retail",
            "arn:aws:es:region:account-id:domain/retail/_all/_settings",
            "arn:aws:es:region:account-id:domain/retail/_cluster/stats",
            "arn:aws:es:region:account-id:domain/retail/_nodes",
            "arn:aws:es:region:account-id:domain/retail/_nodes/*/stats",
            "arn:aws:es:region:account-id:domain/retail/_nodes/stats",
            "arn:aws:es:region:account-id:domain/retail/_stats",
            "arn:aws:es:region:account-id:domain/retail/retail*/_mapping/trans",
            "arn:aws:es:region:account-id:domain/retail/retail*/_stats"
        ],
        "Effect": "Allow"
    }
    
  17. Click the Edit button in the VPC category to go to the Edit VPC screen. Select Custom VPC for VPC connection. Choose the VPC and subnets where you created the domain for the OpenSearch service, and choose the security groups that are allowed access to the OpenSearch service domain.
  18. Select Edit in Basic settings. Adjust Memory and Timeout appropriately. In this lab, we set Timout to 5 min.
  19. Go back to the Designer tab and select Add trigger.
  20. Select Kinesis from Select a trigger in the Trigger configuration.
  21. Select the Kinesis Data Stream (retail-trans) created earlier in Kinesis stream.
  22. Click Add. aws-lambda-kinesis

Enable the Lambda function to ingest records into Amazon OpenSearch

The lambda function uses the delivery role to sign HTTP (Signature Version 4) requests before sending the data to the Amazon OpenSearch Service endpoint.

You manage Amazon OpenSearch Service fine-grained access control permissions using roles, users, and mappings. This section describes how to create roles and set permissions for the lambda function.

Complete the following steps:

  1. The Amazon OpenSearch cluster is provisioned in a VPC. Hence, the Amazon OpenSearch endpoint and the Kibana endpoint are not available over the internet. In order to access the endpoints, we have to create a ssh tunnel and do local port forwarding.

    • Option 1) Using SSH Tunneling

      1. Setup ssh configuration

        For Winodws, refer to here.
        For Mac/Linux, to access the OpenSearch Cluster, add the ssh tunnel configuration to the ssh config file of the personal local PC as follows.

        # OpenSearch Tunnel
        Host estunnel
          HostName <EC2 Public IP of Bastion Host>
          User ec2-user
          IdentitiesOnly yes
          IdentityFile ~/.ssh/analytics-hol.pem
          LocalForward 9200 <OpenSearch Endpoint>:443
        • EC2 Public IP of Bastion Host uses the public IP of the EC2 instance created in the Lab setup step.
        • ex)
        ~$ ls -1 .ssh/
        analytics-hol.pem
        config
        id_rsa
        ~$ tail .ssh/config
        # OpenSearch Tunnel
        Host estunnel
          HostName 214.132.71.219
          User ubuntu
          IdentitiesOnly yes
          IdentityFile ~/.ssh/analytics-hol.pem
          LocalForward 9200 vpc-retail-qvwlxanar255vswqna37p2l2cy.us-west-2.es.amazonaws.com:443
        ~$
      2. Run ssh -N estunnel in Terminal.

    • Option 2) Connect using the EC2 Instance Connect CLI

      1. Install EC2 Instance Connect CLI
        sudo pip install ec2instanceconnectcli
        
      2. Run
        mssh ec2-user@{bastion-ec2-instance-id} -N -L 9200:{opensearch-endpoint}:443
      • ex)
        $ mssh ec2-user@i-0203f0d6f37ccbe5b -N -L 9200:vpc-retail-qvwlxanar255vswqna37p2l2cy.us-west-2.es.amazonaws.com:443
        
  2. Connect to https://localhost:9200/_dashboards/app/login? in a web browser.

  3. Enter the master user and password that you set up when you created the Amazon OpenSearch Service endpoint. The user and password are stored in the AWS Secrets Manager as a name such as OpenSearchMasterUserSecret1-xxxxxxxxxxxx.

  4. In the Welcome screen, click the toolbar icon to the left side of Home button. Choose Security. ops-dashboards-sidebar-menu-security

  5. Under Security, choose Roles.

  6. Choose Create role.

  7. Name your role; for example, firehose_role.

  8. For cluster permissions, add cluster_composite_ops and cluster_monitor.

  9. Under Index permissions, choose Index Patterns and enter index-name*; for example, retail*.

  10. Under Permissions, add three action groups: crud, create_index, and manage.

  11. Choose Create. ops-create-firehose_role

In the next step, you map the IAM role that the lambda function uses to the role you just created.

  1. Choose the Mapped users tab. ops-role-mappings
  2. Choose Manage mapping and under Backend roles,
  3. For Backend Roles, enter the IAM ARN of the role the lambda function uses: arn:aws:iam::123456789012:role/UpsertToESServiceRole709-xxxxxxxxxxxx. ops-entries-for-firehose_role
  4. Choose Map.

Note: After OpenSearch Role mapping for the lambda function, you would not be supposed to meet a data delivery failure with the lambda function like this:

[ERROR] AuthorizationException: AuthorizationException(403, 'security_exception', 'no permissions for [cluster:monitor/main] and User [name=arn:aws:iam::123456789012:role/UpsertToESServiceRole709-G1RQVRG80CQY, backend_roles=[arn:aws:iam::123456789012:role/UpsertToESServiceRole709-G1RQVRG80CQY], requestedTenant=null]')

[Top]

Data visualization with Kibana

Visualize data collected from Amazon OpenSearch Service using Kibana.

aws-analytics-system-build-steps

  1. The Amazon OpenSearch cluster is provisioned in a VPC. Hence, the Amazon OpenSearch endpoint and the Kibana endpoint are not available over the internet. In order to access the endpoints, we have to create a ssh tunnel and do local port forwarding.

    • Option 1) Using SSH Tunneling

      1. Setup ssh configuration

        For Winodws, refer to here.
        For Mac/Linux, to access the OpenSearch Cluster, add the ssh tunnel configuration to the ssh config file of the personal local PC as follows.

        # OpenSearch Tunnel
        Host estunnel
          HostName <EC2 Public IP of Bastion Host>
          User ec2-user
          IdentitiesOnly yes
          IdentityFile ~/.ssh/analytics-hol.pem
          LocalForward 9200 <OpenSearch Endpoint>:443
        • EC2 Public IP of Bastion Host uses the public IP of the EC2 instance created in the Lab setup step.
        • ex)
        ~$ ls -1 .ssh/
        analytics-hol.pem
        config
        id_rsa
        ~$ tail .ssh/config
        # OpenSearch Tunnel
        Host estunnel
          HostName 214.132.71.219
          User ubuntu
          IdentitiesOnly yes
          IdentityFile ~/.ssh/analytics-hol.pem
          LocalForward 9200 vpc-retail-qvwlxanar255vswqna37p2l2cy.us-west-2.es.amazonaws.com:443
        ~$
      2. Run ssh -N estunnel in Terminal.

    • Option 2) Connect using the EC2 Instance Connect CLI

      1. Install EC2 Instance Connect CLI
        sudo pip install ec2instanceconnectcli
        
      2. Run
        mssh ec2-user@{bastion-ec2-instance-id} -N -L 9200:{opensearch-endpoint}:443
      • ex)
        $ mssh ec2-user@i-0203f0d6f37ccbe5b -N -L 9200:vpc-retail-qvwlxanar255vswqna37p2l2cy.us-west-2.es.amazonaws.com:443
        
  2. Connect to https://localhost:9200/_dashboards/app/login? in a web browser.

  3. Enter the master user and password that you set up when you created the Amazon OpenSearch Service endpoint. The user name and password of the master user are stored in the AWS Secrets Manager as a name such as OpenSearchMasterUserSecret1-xxxxxxxxxxxx.

  4. In the Welcome screen, click the toolbar icon to the left side of Home button. Choose Stack Managerment. ops-dashboards-sidebar-menu

  5. (Management / Create index pattern) In Step 1 of 2: Define index pattern of Create index pattern, enter retail* in Index pattern. ops-create-index-pattern

  6. (Management / Create index pattern) Choose > Next step.

  7. (Management / Create index pattern) Select InvoiceDate for the Time Filter field name in Step 2 of 2: Configure settings of the Create index pattern. ops-create-index-pattern-configure-setting

  8. (Management / Create index pattern) Click Create index pattern.

  9. (Management / Advanced Settings) After selecting Advanced Settings from the left sidebar menu, set Timezone for date formatting to Etc/UTC. Since the log creation time of the test data is based on UTC, Kibana's Timezone is also set to UTC. kibana-02d-management-advanced-setting

  10. (Discover) After completing the creation of Index pattern, select Discover to check the data collected in OpenSearch. kibana-03-discover

  11. (Discover) Let's visualize the Quantity by InvoicdDate. Select invoicdDate from Available fields on the left, and click Visualize at the bottom kibana-04-discover-visualize

  12. (Visualize) After selecting Y-Axis in Metrics on the Data tab, apply Sum for Aggregation, and Quantity for Field as shown below. kibana-05-discover-change-metrics

  13. (Visualize) Click Save in the upper left corner, write down the name of the graph you saved, and then click Confirm Save. kibna-08-visualize-save

  14. (Dashboards) Click Dashboard icon on the left and click the Create new dashboard button. kibana-09-dashboards

  15. (Dashboards) You can see the following Dashboards. kibana-13-complete

[Top]

Recap and Review

⚠️ At the end of this lab, you should delete the resources you used to avoid incurring additional charges for the AWS account you used.

Through this lab, we have built a Business Intelligent System with Lambda Architecture such that consists of real-time data processing and batch data processing layers.

[Top]

Resources

[Top]

Reference

AWS Developer Guide By Services

  • Amazon Simple Storage Service (Amazon S3)
  • Amazon Athena
  • Amazon OpenSearch Service
  • AWS Lambda
  • Amazon Kinesis Data Firehose
  • Amazon Kinesis Data Streams
  • Amazon QuickSight
  • AWS Lambda Layers
    • Example of creating a python package to register with AWS Lambda layer: elasticsearch

      ⚠️ You should create the python package on Amazon Linux, otherwise create it using a simulated Lambda environment with Docker.

      [ec2-user@ip-172-31-6-207 ~] $ python3 -m venv es-lib
      [ec2-user@ip-172-31-6-207 ~] $ cd es-lib
      [ec2-user@ip-172-31-6-207 ~] $ source bin/activate
      (es-lib) $ mkdir -p python_modules
      (es-lib) $ pip install opensearch-py==2.0.1 requests==2.31.0 requests-aws4auth==1.1.2 -t python_modules
      (es-lib) $ mv python_modules python
      (es-lib) $ zip -r es-lib.zip python/
      (es-lib) $ aws s3 mb s3://my-bucket-for-lambda-layer-packages
      (es-lib) $ aws s3 cp es-lib.zip s3://my-bucket-for-lambda-layer-packages/var/
      (es-lib) $ deactivate
      
    • How to create a Lambda layer using a simulated Lambda environment with Docker

      $ cat <<EOF > requirements.txt
      > opensearch-py==2.0.1
      > requests==2.31.0
      > requests-aws4auth==1.1.2
      > EOF
      $ docker run -v "$PWD":/var/task "public.ecr.aws/sam/build-python3.11" /bin/sh -c "pip install -r requirements.txt -t python/lib/python3.11/site-packages/; exit"
      $ zip -r es-lib.zip python > /dev/null
      $ aws s3 mb s3://my-bucket-for-lambda-layer-packages
      $ aws s3 cp es-lib.zip s3://my-bucket-for-lambda-layer-packages/var/
      

SSH Tunnel for Kibana Instructions with PuTTy on Windows

[Top]

Further readings

Amazon S3
Amazon Athena
Amazon Elasticsearch Service
AWS Lambda
Amazon Kinesis Data Firehose
Amazon Kinesis Data Streams
Amazon Kinesis Data Analytics
Amazon QuickSight
Etc
Securely Connect Bastion Hosts
  • Securing your bastion hosts with Amazon EC2 Instance Connect

    $ # (1) Create a new ssh key.
    $ ssh-keygen -t rsa -f my_rsa_key
    
    $ # (2) Push your SSH public key to the instance.
    $ aws ec2-instance-connect send-ssh-public-key \
      --instance-id $BASTION_INSTANCE \
      --availability-zone $DEPLOY_AZ \
      --instance-os-user ec2-user \
      --ssh-public-key file:///path/to/my_rsa_key.pub
    
    $ # (3) Connect to the instance using your private key.
    $ ssh -i /path/to/my_rsa_key ec2-user@$BASTION_DNS_NAME
    
  • Connect using the EC2 Instance Connect CLI

     $ sudo pip install ec2instanceconnectcli
     $ mssh ec2-user@i-001234a4bf70dec41EXAMPLE # ec2-instance-id
     

[Top]

Deployment by AWS CDK

⚠️ At the end of this lab, you should delete the resources you used to avoid incurring additional charges for the AWS account you used.

Introducing how to deploy using the AWS CDK.

Prerequisites

  1. Install AWS CDK Toolkit.

    npm install -g aws-cdk
  2. Verify that cdk is installed properly by running the following command:

    cdk --version
    

    ex)

    $ cdk --version
    2.41.0 (build 56ba2ab)
Useful commands
  • cdk ls list all stacks in the app
  • cdk synth emits the synthesized CloudFormation template
  • cdk deploy deploy this stack to your default AWS account/region
  • cdk diff compare deployed stack with current state
  • cdk docs open CDK documentation

[Top]

Deployment

When deployed as CDK, 1(a), 1(b), 1(c), 1(f), 2(b), 2(a) in the architecture diagram below are automatically created.

aws-analytics-system-build-steps-extra

  1. Refer to Getting Started With the AWS CDK to install cdk. Create an IAM User to be used when running cdk and register it in ~/.aws/config. (cf. Creating an IAM User)
    For example, after creating an IAM User called cdk_user, add it to ~/.aws/config as shown below.

    $ cat ~/.aws/config
    [profile cdk_user]
    aws_access_key_id=AKIAIOSFODNN7EXAMPLE
    aws_secret_access_key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
    region=us-west-2
  2. Create a Python package to register in the Lambda Layer and store it in the s3 bucket. For example, create an s3 bucket named lambda-layer-resources so that you can save the elasticsearch package to register in the Lambda Layer as follows.

    $ aws s3 ls s3://lambda-layer-resources/var/
    2019-10-25 08:38:50          0
    2019-10-25 08:40:28    1294387 es-lib.zip
  3. After downloading the source code from git, enter the s3 bucket name where the package to be registered in the lambda layer is stored in an environment variable called S3_BUCKET_LAMBDA_LAYER_LIB. After setting, deploy using the cdk deploy command.

    $ git clone https://github.com/aws-samples/aws-analytics-immersion-day.git
    $ cd aws-analytics-immersion-day
    $ python3 -m venv .env
    $ source .env/bin/activate
    (.env) $ pip install -r requirements.txt
    (.env) $ export CDK_DEFAULT_ACCOUNT=$(aws sts get-caller-identity --query Account --output text)
    (.env) $ export CDK_DEFAULT_REGION=us-west-2
    (.env) $ cdk bootstrap aws://${CDK_DEFAULT_ACCOUNT}/${CDK_DEFAULT_REGION}
    (.env) $ export S3_BUCKET_LAMBDA_LAYER_LIB=lambda-layer-resources
    (.env) $ cdk --profile cdk_user deploy --require-approval never --all

    cdk bootstrap ... command is executed only once for the first time to deploy CDK toolkit stack, and for subsequent deployments, you only need to execute cdk deploy command without distributing CDK toolkit stack.

    (.env) $ export CDK_DEFAULT_ACCOUNT=$(aws sts get-caller-identity --query Account --output text)
    (.env) $ export CDK_DEFAULT_REGION=us-west-2
    (.env) $ export S3_BUCKET_LAMBDA_LAYER_LIB=lambda-layer-resources
    (.env) $ cdk --profile cdk_user deploy --require-approval never --all
  4. Enable the Lambda function to ingest records into Amazon OpenSearch.

Clean Up

To delete the deployed application, execute the cdk destroy command as follows.

(.env) $ cdk --profile cdk_user destroy --force --all

[Top]

Security

See CONTRIBUTING for more information.

License

This library is licensed under the MIT-0 License. See the LICENSE file.

About

Describes the concepts of lambda architecture and the actual deployment process with an example of building a serverless business intelligence systems using Amazon Kinesis, S3, Athena, OpenSearch Service, and QuickSight.

Topics

Resources

License

Code of conduct

Security policy

Stars

Watchers

Forks

Packages

No packages published