This app demonstrates the real-time incremental computation capabilities of Materialize in the context of an e-commerce website.
An e-commerce business wants to understand:

- Order trends throughout the day, to discern patterns.
- Which items are selling the most.
- Supply and demand, to manage inventory.
- Inventory status on the website, to encourage users to buy.
- Conversion funnel: the effectiveness of the website at converting pageviews into actual purchases.
- Low-stock alerts: generating alerts and automatically placing orders to the warehouse when a specific item is close to running out of stock.
We'll build materialized views that answer most of the questions by providing data in a business intelligence dashboard.
To generate the data we'll simulate users, items, purchases and pageviews on a fictional e-commerce website.
To simplify deployment, the demo is packaged as a series of Docker images glued together via Docker Compose. As a secondary benefit, you can run the demo on Linux, an EC2 instance, or a Mac laptop.
The [docker-compose.yml](docker-compose.yml) file spins up containers with the following names, connections, and roles:
Our load generator (`loadgen`) is a Python script that does two things:

- It seeds MySQL with `item`, `user`, and `purchase` tables, and then begins rapidly adding `purchase` rows that join an item and a user (~10 per second).
- It simultaneously sends JSON-encoded `pageview` events directly to Kafka (~750 per second).
As the database writes occur, Debezium and Kafka stream the changes out of MySQL to Confluent Cloud. Materialize subscribes to this change feed and maintains our materialized views with the incoming data. A materialized view is typically some report whose information we're regularly interested in viewing.
For example, if we wanted real-time statistics of total `pageviews` and orders by item, Materialize could maintain that report as a materialized view. And, in fact, that is exactly what this demo will show.
| M1 Mac Warning |
| --- |
| This demo relies heavily on Docker images from several different sources. We haven't tested it on Apple M1 silicon yet, which is known to have Docker compatibility issues. |
If you're on a Mac laptop, you might want to increase the amount of memory available to Docker Engine.
- From the Docker Desktop menu bar app, select Preferences.
- Go to the Advanced tab.
- Select at least 8 GiB of Memory.
- Click Apply and Restart.
- Sign up for Materialize
- Create a Confluent Cloud account; you can sign up for a free account here.
- Make sure you have Docker and Docker Compose installed.
- Clone this repo and navigate to the directory by running:

  ```bash
  git clone https://github.com/MaterializeInc/demos.git
  cd demos/ecommerce
  ```
- Copy the `.env.example` file to `.env`:

  ```bash
  cp .env.example .env
  ```
- Update the details in the `.env` file:

  ```bash
  # Confluent Cloud Details
  CONFLUENT_BROKER_HOST=
  CONFLUENT_API_KEY=
  CONFLUENT_API_SECRET=
  CONFLUENT_SCHEMA_REGISTRY_URL=
  CONFLUENT_SCHEMA_REGISTRY_API_KEY=
  CONFLUENT_SCHEMA_REGISTRY_API_SECRET=
  ```
- Go to your Confluent Cloud dashboard and create the following topics:

  - `mysql.shop.purchases`
  - `mysql.shop.items`
  - `mysql.shop.users`
  - `pageviews`

  Note: If you don't create the topics, the demo will not work, and you will see the following error in the Debezium logs:

  ```
  Error while fetching metadata with correlation id … : {<topic>=UNKNOWN_TOPIC_OR_PARTITION}
  ```

  Alternatively, you can set the `auto.create.topics.enable` option for your cluster to `true`, as described in the Confluent documentation. This option is disabled by default; once enabled, it allows topics to be created automatically whenever they are referenced by a Kafka producer or consumer.
- Bring up the Docker Compose containers in the background:

  ```bash
  docker compose up -d
  ```

  This may take several minutes to complete the first time you run it. If all goes well, you'll have everything running, each service in its own container, with Debezium configured to ship changes from MySQL into Confluent Cloud.
- Connect to Materialize.

  If you already have `psql` installed on your machine, use the provided connection string to connect. Example:

  ```bash
  psql "postgres://user%40domain.com@materialize_host:6875/materialize"
  ```

  Otherwise, you can find the steps to install and use your CLI of choice under Supported tools.
- In `psql`, start by securely storing your Confluent Cloud credentials for the Kafka cluster and Schema Registry as secrets:

  ```sql
  CREATE SECRET confluent_username AS '<your-username>';
  CREATE SECRET confluent_password AS '<your-password>';
  CREATE SECRET schema_registry_username AS '<your-username>';
  CREATE SECRET schema_registry_password AS '<your-password>';
  ```
- Create the Kafka and Schema Registry connections:

  ```sql
  -- Create Kafka connection
  CREATE CONNECTION confluent_cloud TO KAFKA (
      BROKER '<your_broker>',
      SASL MECHANISMS = 'PLAIN',
      SASL USERNAME = SECRET confluent_username,
      SASL PASSWORD = SECRET confluent_password
  );

  -- Create Registry connection
  CREATE CONNECTION schema_registry TO CONFLUENT SCHEMA REGISTRY (
      URL '<your_schema_registry>',
      USERNAME = SECRET schema_registry_username,
      PASSWORD = SECRET schema_registry_password
  );
  ```
- Now that you have your secrets and connections created, define all of the tables in `mysql.shop` as Kafka sources:

  ```sql
  CREATE SOURCE purchases
  FROM KAFKA CONNECTION confluent_cloud (TOPIC 'mysql.shop.purchases')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry
  ENVELOPE DEBEZIUM
  WITH (SIZE = '3xsmall');

  CREATE SOURCE items
  FROM KAFKA CONNECTION confluent_cloud (TOPIC 'mysql.shop.items')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry
  ENVELOPE DEBEZIUM
  WITH (SIZE = '3xsmall');

  CREATE SOURCE users
  FROM KAFKA CONNECTION confluent_cloud (TOPIC 'mysql.shop.users')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry
  ENVELOPE DEBEZIUM
  WITH (SIZE = '3xsmall');
  ```

  Because these sources are pulling message schema data from the registry, Materialize knows the column types to use for each attribute.
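  To double-check what Materialize inferred, you can inspect a source's columns; the exact columns depend on the upstream MySQL table definitions:

  ```sql
  -- List the column names and types inferred from the Avro schema.
  SHOW COLUMNS FROM purchases;
  ```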
- We'll also want to create a JSON-formatted source for the `pageviews`:

  ```sql
  CREATE SOURCE json_pageviews
  FROM KAFKA CONNECTION confluent_cloud (TOPIC 'pageviews')
  FORMAT BYTES
  WITH (SIZE = '3xsmall');
  ```
  With JSON-formatted messages, we don't know the schema, so the JSON is pulled in as raw bytes and we still need to cast the data into the proper columns and types. We'll show that in the step below.
  Now if you run `SHOW SOURCES;` in the CLI, you should see the four sources we created:

  ```
  materialize=> SHOW SOURCES;
        name
  ----------------
   items
   json_pageviews
   purchases
   users
  (4 rows)

  materialize=>
  ```
- Next, you can set up a cluster (logical compute) with one `xsmall` replica (physical compute):

  ```sql
  CREATE CLUSTER ecommerce_demo REPLICAS (xsmall_replica (SIZE 'xsmall'));
  SET CLUSTER = ecommerce_demo;
  ```
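  As a quick check, you can confirm the cluster exists and is the active one for your session:

  ```sql
  -- List all clusters; ecommerce_demo should appear.
  SHOW CLUSTERS;

  -- Show the cluster the current session is using.
  SHOW CLUSTER;
  ```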
- Next, we will create our first (non-materialized) view:

  ```sql
  CREATE VIEW v_pageviews AS
  SELECT
      (data->'user_id')::int AS user_id,
      -- Extract pageview type and target ID from URL
      regexp_match(data->>'url', '/(products|profiles)/')[1] AS pageview_type,
      (regexp_match(data->>'url', '/(?:products|profiles)/(\d+)')[1])::int AS target_id,
      data->>'channel' AS channel,
      (data->>'received_at')::bigint AS received_at
  FROM (SELECT CONVERT_FROM(data, 'utf8')::jsonb AS data FROM json_pageviews);
  ```
  This view doesn't store the results of the query; it simply provides an alias for the embedded `SELECT` statement and allows us to shape the pageview data into the format we need:
  - We are converting the incoming data from raw bytes to `jsonb`:

    ```sql
    SELECT CONVERT_FROM(data, 'utf8')::jsonb AS data FROM json_pageviews;
    ```

  - We are using Postgres JSON notation (`data->'url'`), type casts (`::string`), and the [regexp_match](https://materialize.com/docs/sql/functions/#string-func) function to extract only the `item_id` from the raw pageview URL:

    ```sql
    (regexp_match((data->'url')::string, '/products/(\d+)')[1])::int AS target_id,
    ```
- Depending on your setup, in some cases you might want to use a materialized view: a view that is persisted in durable storage and incrementally updated as new data arrives. We can use the `v_pageviews` view as a base to create a materialized view summarizing `pageviews` by item and channel:
  ```sql
  CREATE MATERIALIZED VIEW item_pageviews AS
  SELECT
      target_id AS item_id,
      channel,
      COUNT(*) AS pageviews
  FROM v_pageviews
  WHERE pageview_type = 'products'
  GROUP BY item_id, channel;
  ```
  Now if you select from this materialized view, you should see data populating:

  ```sql
  SELECT * FROM item_pageviews ORDER BY pageviews DESC LIMIT 10;
  ```

  If you re-run this statement a few times, you should see the `pageview` counts changing as new data comes in and results get computed on the fly.
- Let's create some more views:

  Purchase Summary:

  ```sql
  CREATE MATERIALIZED VIEW purchase_summary AS
  SELECT
      item_id,
      SUM(purchase_price) AS revenue,
      COUNT(id) AS orders,
      SUM(quantity) AS items_sold
  FROM purchases
  GROUP BY item_id;
  ```
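  As with pageviews, you can peek at these aggregates as they update (ordering by revenue is just one option):

  ```sql
  -- Top items by total revenue so far.
  SELECT * FROM purchase_summary ORDER BY revenue DESC LIMIT 5;
  ```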
  Item Summary (using the purchase summary and pageview summary internally):

  ```sql
  CREATE MATERIALIZED VIEW item_summary AS
  SELECT
      items.name AS item_name,
      items.category AS item_category,
      SUM(purchase_summary.items_sold) AS items_sold,
      SUM(purchase_summary.orders) AS orders,
      SUM(purchase_summary.revenue) AS revenue,
      SUM(item_pageviews.pageviews) AS pageviews,
      SUM(purchase_summary.orders) / SUM(item_pageviews.pageviews)::double AS conversion_rate
  FROM items
  JOIN purchase_summary ON purchase_summary.item_id = items.id
  JOIN item_pageviews ON item_pageviews.item_id = items.id
  GROUP BY item_name, item_category;
  ```
  This view shows some of the JOIN capabilities of Materialize. We're joining our two previous views with `items` to create a summary of purchases, pageviews, and conversion rates.

  Indexes assemble and incrementally maintain a query's results in memory within a cluster, which speeds up query time:

  ```sql
  CREATE INDEX item_summary_idx ON item_summary (item_name);
  ```
  If you select from `item_summary`, you can see the results in real time:

  ```sql
  SELECT * FROM item_summary ORDER BY conversion_rate DESC LIMIT 10;
  ```
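  Because the index is on `item_name`, point lookups on that column are served straight from memory. A minimal sketch (the item name here is hypothetical; substitute one from your data):

  ```sql
  -- Point lookup accelerated by item_summary_idx; 'Fancy Widget' is made up.
  SELECT * FROM item_summary WHERE item_name = 'Fancy Widget';
  ```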
  Remaining Stock: This view joins items with all purchases created after an item's inventory was updated, and creates a column that subtracts the quantity sold from the total `inventory` to get a live in-stock count.

  ```sql
  CREATE MATERIALIZED VIEW remaining_stock AS
  SELECT
      items.id AS item_id,
      MAX(items.inventory) - SUM(purchases.quantity) AS remaining_stock
  FROM items
  JOIN purchases ON purchases.item_id = items.id
      AND purchases.created_at > items.inventory_updated_at
  GROUP BY items.id;
  ```
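  This ties back to the low-stock alert use case from the introduction: sorting ascending surfaces the items closest to selling out.

  ```sql
  -- Items with the least stock remaining.
  SELECT * FROM remaining_stock ORDER BY remaining_stock ASC LIMIT 5;
  ```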
  Trending Items: Here, we are doing a bit of a hack because Materialize doesn't yet support window functions like `RANK`. Instead, we do a self join on `purchase_summary` and count the items with more purchases than the current item to get a basic "trending" rank datapoint.
  ```sql
  CREATE MATERIALIZED VIEW trending_items AS
  SELECT
      p1.item_id,
      (
          SELECT COUNT(*)
          FROM purchase_summary p2
          WHERE p2.items_sold > p1.items_sold
      ) AS trend_rank
  FROM purchase_summary p1;
  ```
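  For reference, the self join computes the same 0-based rank a window function would: ties share a rank, matching the count-of-items-with-strictly-more-sales semantics. On a system that does support window functions, the equivalent would look something like this (for comparison only, not part of the demo):

  ```sql
  -- Equivalent 0-based trend rank using RANK().
  SELECT
      item_id,
      RANK() OVER (ORDER BY items_sold DESC) - 1 AS trend_rank
  FROM purchase_summary;
  ```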
  Lastly, let's bring the trending items and remaining stock views together to create a view that a user-facing application might read from:

  ```sql
  CREATE MATERIALIZED VIEW item_metadata AS
  SELECT
      rs.item_id AS id,
      rs.remaining_stock,
      ti.trend_rank
  FROM remaining_stock rs
  JOIN trending_items ti ON ti.item_id = rs.item_id;
  ```
  Now if you run `SHOW VIEWS;`, you should see all the views we just created:

  ```
  materialize=> SHOW VIEWS;
        name
  ------------------
   item_metadata
   item_pageviews
   item_summary
   purchase_summary
   remaining_stock
   trending_items
  (6 rows)
  ```
- To see the results change in real time, let's use `SUBSCRIBE` instead of vanilla `SELECT`:

  ```sql
  COPY (SUBSCRIBE (SELECT * FROM trending_items)) TO STDOUT;
  ```
That's it! You've created some views that you can visualize in a BI tool like Metabase!
Sinks let you stream data out of Materialize, using either sources or views.
Let's create a view that flags "high-value" users who have spent more than $10k in total.
```sql
CREATE MATERIALIZED VIEW high_value_users AS
SELECT
    users.id,
    users.email,
    SUM(purchase_price * quantity)::int AS lifetime_value,
    COUNT(*) AS purchases
FROM users
JOIN purchases ON purchases.user_id = users.id
GROUP BY 1, 2
HAVING SUM(purchase_price * quantity) > 10000;
```
Then create a sink to stream updates to this view back out to Kafka:
```sql
CREATE SINK high_value_users_sink
FROM high_value_users
INTO KAFKA CONNECTION confluent_cloud (TOPIC 'high-value-users-sink')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry
ENVELOPE DEBEZIUM
WITH (SIZE = '3xsmall');
```
Now if you go to the Confluent Cloud UI and navigate to the `high-value-users-sink` topic, you should see data streaming in.
We can start simple with a materialized view that aggregates all users that have not completed a purchase:
```sql
CREATE MATERIALIZED VIEW incomplete_purchases AS
SELECT
    users.id AS user_id,
    users.email AS email,
    users.is_vip AS is_vip,
    purchases.item_id,
    purchases.status,
    purchases.quantity,
    purchases.purchase_price,
    purchases.created_at,
    purchases.updated_at
FROM users
JOIN purchases ON purchases.user_id = users.id
WHERE purchases.status = 0;
```
Next, create a materialized view that contains all users that are no longer browsing the site. For the demo, we'll use 3 minutes as the idle time; in the real world, you might use a larger value, like 30 minutes.
```sql
CREATE MATERIALIZED VIEW inactive_users_last_3_mins AS
SELECT
    user_id,
    date_trunc('minute', to_timestamp(received_at)) AS visited_at_minute
FROM v_pageviews
WHERE
    mz_now() >= (received_at * 1000 + 180000)::numeric
GROUP BY 1, 2;
```
We can check that it's working by querying the view:

```sql
SELECT * FROM inactive_users_last_3_mins LIMIT 5;
```
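If you'd rather use the 30-minute idle window mentioned above, only the constant in the temporal filter changes (30 minutes = 1,800,000 ms). A sketch, with a hypothetical view name:

```sql
-- Same shape as above, but with a 30-minute idle window.
CREATE MATERIALIZED VIEW inactive_users_last_30_mins AS
SELECT
    user_id,
    date_trunc('minute', to_timestamp(received_at)) AS visited_at_minute
FROM v_pageviews
WHERE
    mz_now() >= (received_at * 1000 + 1800000)::numeric
GROUP BY 1, 2;
```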
Finally, we can create a materialized view that contains all incomplete orders for the inactive users:
```sql
CREATE MATERIALIZED VIEW abandoned_cart AS
SELECT
    incomplete_purchases.user_id,
    incomplete_purchases.email,
    incomplete_purchases.item_id,
    incomplete_purchases.purchase_price,
    incomplete_purchases.status
FROM incomplete_purchases
JOIN inactive_users_last_3_mins ON inactive_users_last_3_mins.user_id = incomplete_purchases.user_id
GROUP BY 1, 2, 3, 4, 5;
```
You can create a Kafka sink, or you can use `SUBSCRIBE` to subscribe to the changes of the `abandoned_cart` view:
```sql
COPY (
    SUBSCRIBE (
        SELECT * FROM abandoned_cart
    )
) TO STDOUT;
```
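If you prefer the sink route, it mirrors the earlier `high_value_users_sink`; a sketch, where the topic name `abandoned-cart-sink` is an assumption:

```sql
-- Hypothetical sink streaming abandoned-cart changes back out to Kafka.
CREATE SINK abandoned_cart_sink
FROM abandoned_cart
INTO KAFKA CONNECTION confluent_cloud (TOPIC 'abandoned-cart-sink')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry
ENVELOPE DEBEZIUM
WITH (SIZE = '3xsmall');
```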
- In a browser, go to localhost:3030 (or <IP_ADDRESS:3030> if running on a VM).
- Click **Let's get started**.
- Complete the first set of fields asking for your email address. This information isn't crucial for anything, but it does have to be filled in.
- On the **Add your data** page, fill in the following information:

  | Field             | Enter...                      |
  | ----------------- | ----------------------------- |
  | Database          | Materialize                   |
  | Name              | shop                          |
  | Host              | Your Materialize host         |
  | Port              | 6875                          |
  | Database name     | materialize                   |
  | Database username | user%40domain.com             |
  | Database password | Your Materialize app password |

- Proceed past the screens until you reach your primary dashboard.
- Click **Ask a question**.
- Click **Native query**.
- From **Select a database**, select **shop**.
- In the query editor, enter:

  ```sql
  SELECT * FROM item_summary ORDER BY conversion_rate DESC;
  ```
- You can save the output and add it to a dashboard. Once you've drafted a dashboard, you can manually set the refresh rate to 1 second by adding `#refresh=1` to the end of the URL, giving you a real-time dashboard of, for example, top-viewed items and top-converting items.
You now have Materialize maintaining real-time materialized views on a changefeed from a database and pageview events from Kafka. You have complex multi-layer views doing JOINs and aggregations in order to distill the raw data into a form that's useful for downstream applications. In Metabase, you have the ability to create dashboards and reports based on this data.
You have some infrastructure running in Docker containers, so don't forget to run `docker compose down` to shut everything down!