This project demonstrates the use of AWS, Apache NiFi, and Snowflake to implement real-time data ingestion and to manage Slowly Changing Dimension (SCD) Type 1 and Type 2 updates for customer data. The pipeline provides a scalable solution for tracking both current and historical data in a data warehouse.
- Setup Instructions
- Conclusion
## Setup Instructions

- Install Docker Compose
- Create a user for Docker on the EC2 instance
- Install the required pip packages
- Copy the Docker Compose file from your local machine to EC2
- Start the Docker containers
- Run `docker-compose up` on the EC2 instance. The services are exposed on the following ports:
- Jupyter Lab: 4888 (Docker maps the container's internal port to this port on the EC2 host)
- Zookeeper: 2181
- NiFi: 2080
In the EC2 Security Group, add inbound rules to allow traffic on these ports.
- Access Jupyter Notebook (port 4888) to generate fake data using the Faker library.
- Create a `Customer.csv` file containing the generated data.
- Save the file to `/opt/workspace/nifi/fake-dataset` in the Docker NiFi container.
- Access NiFi (port 2080).
- Create three processors in NiFi:
- ListFile: monitors the directory and detects new files.
- FetchFile: retrieves the files that ListFile detects.
- PutS3Object: uploads the files to an AWS S3 bucket.
- In AWS IAM, create a user with S3 full access.
- Copy the Access Key and Secret Key into the NiFi PutS3Object processor configuration.
- Set the Bucket name and Object Key that the processor writes to.
- Create a new database and three tables in Snowflake (see the DDL sketch after this list):
- Customer Table: Stores current customer data (SCD Type 1).
- Customer History: Maintains all historical updates (SCD Type 2).
- Customer Raw Data: Staging table for initial data loading.
- Create a Stream to track changes in the Customer table.
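A minimal DDL sketch of this step is shown below. The database name and customer columns are illustrative assumptions; match them to the fields Faker writes into `Customer.csv`. Only the three-table-plus-stream layout comes from the steps above.

```sql
-- Illustrative names and columns; align them with your Customer.csv.
CREATE DATABASE IF NOT EXISTS SCD_DEMO;
USE DATABASE SCD_DEMO;

-- SCD Type 1 target: always holds the latest version of each customer.
CREATE OR REPLACE TABLE CUSTOMER (
    CUSTOMER_ID      NUMBER,
    FIRST_NAME       VARCHAR,
    LAST_NAME        VARCHAR,
    EMAIL            VARCHAR,
    STREET           VARCHAR,
    CITY             VARCHAR,
    STATE            VARCHAR,
    COUNTRY          VARCHAR,
    UPDATE_TIMESTAMP TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- SCD Type 2 target: one row per version, bounded by START_TIME/END_TIME.
CREATE OR REPLACE TABLE CUSTOMER_HISTORY (
    CUSTOMER_ID NUMBER,
    FIRST_NAME  VARCHAR,
    LAST_NAME   VARCHAR,
    EMAIL       VARCHAR,
    STREET      VARCHAR,
    CITY        VARCHAR,
    STATE       VARCHAR,
    COUNTRY     VARCHAR,
    START_TIME  TIMESTAMP_NTZ,
    END_TIME    TIMESTAMP_NTZ,
    IS_CURRENT  BOOLEAN
);

-- Staging table that Snowpipe loads from S3.
CREATE OR REPLACE TABLE CUSTOMER_RAW (
    CUSTOMER_ID NUMBER,
    FIRST_NAME  VARCHAR,
    LAST_NAME   VARCHAR,
    EMAIL       VARCHAR,
    STREET      VARCHAR,
    CITY        VARCHAR,
    STATE       VARCHAR,
    COUNTRY     VARCHAR
);

-- Stream that captures row-level changes on CUSTOMER for the SCD2 step.
CREATE OR REPLACE STREAM CUSTOMER_TABLE_CHANGES ON TABLE CUSTOMER;
```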
- Create an external stage in Snowflake linked to the S3 bucket.
- Define a File Format for incoming data.
- Configure a Pipe to load data from S3 into the `CUSTOMER_RAW` table in Snowflake.
- Link Snowpipe notifications to S3 via the pipe's SQS notification channel (both steps are sketched below).
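The statements below sketch one way to wire this up. The file format options, bucket URL, and credentials are placeholders rather than values from this project; a storage integration is the more secure alternative to inline keys.

```sql
-- File format matching a headered CSV export.
CREATE OR REPLACE FILE FORMAT CSV_FORMAT
    TYPE = 'CSV'
    FIELD_DELIMITER = ','
    SKIP_HEADER = 1
    FIELD_OPTIONALLY_ENCLOSED_BY = '"';

-- External stage pointing at the bucket NiFi writes to.
-- Replace the URL and keys with your own values.
CREATE OR REPLACE STAGE CUSTOMER_EXT_STAGE
    URL = 's3://your-scd-demo-bucket/'
    CREDENTIALS = (AWS_KEY_ID = '<access_key>' AWS_SECRET_KEY = '<secret_key>')
    FILE_FORMAT = (FORMAT_NAME = 'CSV_FORMAT');

-- AUTO_INGEST = TRUE makes the pipe listen for S3 event notifications.
CREATE OR REPLACE PIPE CUSTOMER_S3_PIPE
    AUTO_INGEST = TRUE
AS
COPY INTO CUSTOMER_RAW
FROM @CUSTOMER_EXT_STAGE
FILE_FORMAT = (FORMAT_NAME = 'CSV_FORMAT');

-- SHOW PIPES reveals the notification_channel (an SQS ARN) to paste into
-- the S3 bucket's event notification configuration.
SHOW PIPES;
```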
- Use a Merge Statement to update records in the Customer table (see the sketch after this list):
- Match records in `CUSTOMER_RAW` with `CUSTOMER`.
- If a matching `customer_id` exists, update the record.
- If there is no match, insert a new record.
- Create a stored procedure to handle the merge.
- Schedule the procedure to run every minute using a task.
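Below is a sketch of the merge wrapped in a procedure and scheduled by a task, reusing the illustrative columns from the DDL above. The warehouse name `COMPUTE_WH` and the truncate-after-merge step are assumptions, not requirements from the steps above.

```sql
-- Wrap the SCD1 merge so a task can run it unattended.
CREATE OR REPLACE PROCEDURE PDR_SCD1_MERGE()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
    -- Assumes CUSTOMER_RAW holds at most one row per CUSTOMER_ID per run
    -- (it is truncated after each merge below).
    MERGE INTO CUSTOMER c
    USING CUSTOMER_RAW cr ON c.CUSTOMER_ID = cr.CUSTOMER_ID
    WHEN MATCHED THEN UPDATE SET
        c.FIRST_NAME = cr.FIRST_NAME, c.LAST_NAME = cr.LAST_NAME,
        c.EMAIL = cr.EMAIL, c.STREET = cr.STREET, c.CITY = cr.CITY,
        c.STATE = cr.STATE, c.COUNTRY = cr.COUNTRY,
        c.UPDATE_TIMESTAMP = CURRENT_TIMESTAMP()
    WHEN NOT MATCHED THEN INSERT
        (CUSTOMER_ID, FIRST_NAME, LAST_NAME, EMAIL, STREET, CITY, STATE, COUNTRY)
    VALUES (cr.CUSTOMER_ID, cr.FIRST_NAME, cr.LAST_NAME, cr.EMAIL,
            cr.STREET, cr.CITY, cr.STATE, cr.COUNTRY);
    -- Clear the staging table so the next run only sees new arrivals.
    TRUNCATE TABLE CUSTOMER_RAW;
    RETURN 'SCD1 merge complete';
END;
$$;

-- Run the procedure every minute (COMPUTE_WH is an assumed warehouse name).
CREATE OR REPLACE TASK TSK_SCD1_MERGE
    WAREHOUSE = COMPUTE_WH
    SCHEDULE  = '1 MINUTE'
AS
    CALL PDR_SCD1_MERGE();

-- Tasks are created suspended; resume to start the schedule.
ALTER TASK TSK_SCD1_MERGE RESUME;
```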
- Use the Stream created earlier on the Customer table to monitor changes.
- For INSERT actions, add a new record to the `CUSTOMER_HISTORY` table.
- For UPDATE actions, close the previous version of the record and insert a new row.
- For DELETE actions, mark the end timestamp on the existing record.
- Set up tasks to automate these actions on a regular schedule; a sketch of the apply step follows.
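The statements below sketch one possible apply step. They rely on two documented stream behaviors: an UPDATE on the tracked table surfaces in the stream as a DELETE row plus an INSERT row, and every statement in a single transaction reads the same stream offset, which only advances at commit. Names come from the illustrative DDL above.

```sql
BEGIN;

-- Close the current history version for every customer whose CUSTOMER row
-- was updated or deleted (updates appear as DELETE + INSERT pairs).
UPDATE CUSTOMER_HISTORY ch
   SET END_TIME = CURRENT_TIMESTAMP(),
       IS_CURRENT = FALSE
  FROM (SELECT DISTINCT CUSTOMER_ID
          FROM CUSTOMER_TABLE_CHANGES
         WHERE METADATA$ACTION = 'DELETE') d
 WHERE ch.CUSTOMER_ID = d.CUSTOMER_ID
   AND ch.IS_CURRENT;

-- Open a new version for every inserted row (plain inserts and the new
-- image of each update).
INSERT INTO CUSTOMER_HISTORY
    (CUSTOMER_ID, FIRST_NAME, LAST_NAME, EMAIL, STREET, CITY, STATE,
     COUNTRY, START_TIME, END_TIME, IS_CURRENT)
SELECT CUSTOMER_ID, FIRST_NAME, LAST_NAME, EMAIL, STREET, CITY, STATE,
       COUNTRY, UPDATE_TIMESTAMP, NULL, TRUE
  FROM CUSTOMER_TABLE_CHANGES
 WHERE METADATA$ACTION = 'INSERT';

COMMIT;
```

In practice this pair of statements can live in a second stored procedure driven by a scheduled task, gated with `WHEN SYSTEM$STREAM_HAS_DATA('CUSTOMER_TABLE_CHANGES')` so the task only runs when the stream has rows to process.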
## Conclusion

By combining AWS, Apache NiFi, and Snowflake, this pipeline ingests customer data in near real time while maintaining both a current view (SCD Type 1) and a full change history (SCD Type 2), providing a scalable foundation for current and historical tracking in a data warehouse.
