Businesses use customer data platforms (CDPs) to collect customer-centric data into a single cohesive place where it can be served and acted upon to improve the product, customer experience, marketing, sales and more.
Existing CDPs mostly fall into one of two categories:
| | CDP as a Service | CDP on a Warehouse |
| --- | --- | --- |
| Examples | Segment, mParticle | Rudderstack, Snowplow + Hightouch |
| Data/Logic Model | Customer-centric schema, defined by provider | Anything in the warehouse, defined in SQL |
| Data Latency | 5 seconds - 1 minute | 10 minutes - 1 day |
| Typically Owned By… | Marketing Ops Team | Data Team |
| Pricing Model | Based on user counts and feature usage | Based on data volumes |
Each has trade-offs that vary in significance from company to company, but one thing is clear: There is no real-time CDP that can be owned and managed by the data team.
That’s where Materialize comes in.
Materialize empowers the data team to manage CDP data views with sub-second latency — all using standard SQL and dbt workflows.
The data team is a better owner of the business logic behind the CDP: They’re already working with the same data to produce analytical reporting, they’ve built expertise in the nuances and quirks of their business’s data, and they’re already the ones updating SQL logic when things change.
As for the benefits of real-time, the marginal improvements from ratcheting down CDP latency are obvious: Fewer discrepancies between data and reality mean less wasted Sales effort, fewer mistargeted ads and emails, and less lag between important business events and the alerts and actions they should trigger.
Less obvious are powerful new capabilities created when you think of it less as “real-time” and more as “a view of your customers that is always up-to-date”.
New Capabilities:
- CDP as an event-driven workflow enrichment API - Imagine your security team wants to optimize an important process like fraud or abuse detection and offboarding. Lack of context is a key driver of big, high-impact failures, both false positives (flagging a good customer) and false negatives (missing a big fraudster). The team can query the CDP from their automation workflows to pull in a wealth of contextual information about a customer with up-to-the-second accuracy.
- CDP as a real-time feature store - Both CDPs and feature stores have a user dimension table at their heart. Expanding your real-time CDP to serve as a real-time feature store is a matter of adding columns (features).
- Push actions from the CDP - Materialize is an event-driven database: The incoming events trigger computation, and updates can be pushed to downstream systems. SQL can be written to filter for a specific condition to be met, and, when reached, an event can be pushed to trigger action somewhere else.
- Push actions based on absence of activity - A particularly useful subset of the above: Write SQL that filters on a lack of activity, like “items in cart, no pageviews for 30 minutes”. This type of feature requires inefficient polling in most systems, but it’s a straightforward SQL filter in Materialize (see the sketch just below).
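As a concrete sketch of that last capability: the `carts` and `user_last_pageview` relations below are hypothetical stand-ins, and `last_pageview_ms` is assumed to be epoch milliseconds.

CREATE VIEW abandoned_carts AS
SELECT carts.user_id
FROM carts
JOIN user_last_pageview ON user_last_pageview.user_id = carts.user_id
WHERE carts.item_count > 0
  -- Rows enter this view once 30 minutes pass with no pageview,
  -- and retract again if the user resumes browsing. No polling required.
  AND MZ_NOW() >= user_last_pageview.last_pageview_ms + (30 * 60 * 1000);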
The Plan: A Real-Time CDP on Materialize
In the rest of this article, we’ll use Materialize as the warehouse component in the “CDP on a Warehouse” model: We’ll join and aggregate data from product analytics and our primary application database into a single unified and up-to-date view with one row per customer, and we’ll serve that to downstream tools via the built-in Postgres wire-compatibility.
Data Sources:
- Application DB: We’re using the PostgreSQL Source to get access to a live replica of our Users table (and any other relevant tables).
- Analytics: In our example, Segment is handling the collection of user-centric product analytics events, but it could be easily swapped for Snowplow, Rudderstack, etc…
Transformations:
We’re building a single materialized view, `cdp_users`, with many columns. You could call this a users dimension table. The data grain is users: there is one row per user.
Outputs / Read APIs:
- SQL Query Interface: Applications can connect via PostgreSQL wire protocol and query the view. This can be extended with a script into an internal HTTP API.
- Data Tools: By presenting as Postgres, other data tools like Cube, Hightouch, and Hex can be used to read from Materialize.
1. Connect the Input Streams
To start, we’ll need Segment and Postgres streaming data into Materialize:
- Use the Segment to Materialize guide to get analytics events flowing.
- Use the Postgres CDC guide to connect Materialize to your primary DB via a replication slot.
Once the above prerequisites are complete, we should have a materialized view of analytics data called `segment_events_json` and a `users` view matching the schema of the users table from the upstream Postgres DB.
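For reference, the Postgres side of that setup looks roughly like the sketch below. The hostnames, secret, and publication name are placeholders, and the exact syntax varies by Materialize version; the linked guide is authoritative.

CREATE SECRET pgpass AS '<POSTGRES_PASSWORD>';

-- Reusable connection details for the upstream primary DB
CREATE CONNECTION pg_connection TO POSTGRES (
    HOST '<POSTGRES_HOST>',
    PORT 5432,
    USER 'materialize',
    PASSWORD SECRET pgpass,
    DATABASE '<POSTGRES_DATABASE>'
);

-- Continuously replicate the users table via the publication's replication slot
CREATE SOURCE pg_source
    FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
    FOR TABLES (users);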
2. Write the SQL
With our source data now streaming in, it’s time to transform it into a single view with one row per user. We’ll use intermediate views to keep our logic clean, but everything will eventually flow into a single real-time CDP view called `cdp_users`.
Identity Resolution
A core component of a CDP is identity resolution: People interact with your product from multiple devices, and you need a way of tying them all to the same correct user ID. Typically, this happens via an `anonymousId` set per device, and an `identify` event that links the `anonymousId` to a `userId` when the user logs in to your product.
To facilitate identity resolution, create a one-to-many “identities” join table:
CREATE VIEW stg_analytics_user_identities AS
SELECT
    data->>'anonymousId' as anonymousId,
    data->>'userId' as userId
FROM segment_events_json
WHERE
    data->>'type' = 'identify'
    AND data ? 'anonymousId'
    AND data ? 'userId'
GROUP BY 1, 2;
Note: We’re using Postgres `jsonb` operators to reference keys and check for their presence in the analytics event, which is still a single column of `jsonb` type.
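For example, a quick illustration of those operators against our events relation: `->>` extracts a key’s value as text, and `?` tests for a key’s presence.

SELECT data->>'anonymousId' AS anonymous_id
FROM segment_events_json
WHERE data ? 'anonymousId';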
The `userId` is a shared ID also used in the primary DB, so we can join through this table to aggregate analytics data by user.
Preparing Analytics Data
Let’s do a few types of transformations on the feed of product analytics events to get a feel for the capabilities of Materialize.
Here’s a view that aggregates the last 30 days of pageviews by user, using the join table we created above:
CREATE VIEW stg_user_analytics_last_30 AS
SELECT
    users.uuid,
    COUNT(*) as pageview_count_last_30_days
    -- ...additional aggregate columns...
FROM users
JOIN stg_analytics_user_identities ON
    users.uuid = stg_analytics_user_identities.userId
JOIN segment_events_json ON
    data->>'anonymousId' = stg_analytics_user_identities.anonymousId
    AND data->>'type' = 'page'
    -- Filter to last 30 days (receivedAt is epoch milliseconds)
    AND MZ_NOW() < (data->>'receivedAt')::BIGINT + (30 * 86400000)
GROUP BY 1;
The only special thing we’re doing here is the last join condition, which uses `MZ_NOW()`.
This gives us a way to specify a sliding window of pageviews to evaluate, which is important because it allows us to limit the resources used maintaining the view.
Without some kind of temporal filter, the resources necessary to maintain this kind of view would continue to grow over time.
As a second example, here is a view that pulls in the last five pageviews by user:
CREATE VIEW stg_user_analytics_recent_pageviews AS
SELECT * FROM
    (SELECT DISTINCT users.uuid as uuid FROM users) grp,
    LATERAL (
        SELECT data->'properties' as pageview
        FROM segment_events_json
        WHERE
            data->>'type' = 'page'
            AND data->>'userId' = grp.uuid
        ORDER BY (data->>'receivedAt')::BIGINT DESC LIMIT 5
    ) recent;
The `LATERAL` join syntax above is used as part of a Top-K by group pattern, getting us the five most recent pageviews per user in a way that is more performant than subqueries, self-joins, or window functions.
The resulting view has up to five rows per user. If we want just a single row per user, we can use the `jsonb_agg` function to collect all the pageviews into a JSON array.
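A minimal sketch of that aggregation, which we’ll fold into `cdp_users` below:

-- Collapse up to five rows per user into one row,
-- with the pageviews gathered into a JSON array
SELECT uuid, jsonb_agg(pageview) AS recent_pageviews
FROM stg_user_analytics_recent_pageviews
GROUP BY uuid;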
Joining with Users
Finally, let’s build the `cdp_users` view, where we bring everything together:
CREATE VIEW cdp_users AS
SELECT
    users.*,
    last_30.pageview_count_last_30_days,
    recent.recent_pageviews
FROM users
LEFT JOIN stg_user_analytics_last_30 last_30
    ON last_30.uuid = users.uuid
LEFT JOIN (
    -- Collapse the Top-5 pageview rows into one JSON array per user
    SELECT uuid, jsonb_agg(pageview) AS recent_pageviews
    FROM stg_user_analytics_recent_pageviews
    GROUP BY uuid
) recent ON recent.uuid = users.uuid;
If we were to `SELECT` from this view at this point, we’d get an answer, but performance would be very poor because we aren’t proactively maintaining the results in an index. Materialize would create a temporary dataflow, calculate the answer, then throw the whole thing away.
To make reads performant, let’s create an `INDEX`:
CREATE INDEX cdp_users_uuid_idx ON cdp_users (uuid);
This command takes the entire hierarchy (or DAG) of views, converts them to dataflows, processes all existing data through the dataflows to compute the results, and continues to incrementally compute the results in memory as new analytics events and updates to the users table stream in.
At this point, running `SELECT * FROM cdp_users WHERE uuid = 'ABC123';` returns up-to-date information in a fraction of a second.
In a traditional data warehouse, you would encounter two hard blockers to doing the same:
- Data Latencies - Source data can only be batch loaded at 5-minute intervals at best, and even that is difficult. After loading data, batch SQL transformations need to run, adding to the latency.
- Query Performance - Lookup queries on traditional OLAP databases are costly and slow. On Materialize they are cheap and fast because results are maintained in memory. No computation is done on lookup queries.
3. Connect to Downstream Tools
We mentioned a couple of ways of reading from our CDP view at the beginning; let’s explore those now:
Postgres Drivers
Anything that speaks Postgres wire-protocol should, given the right credentials, be able to query our view.
Here is a Node.js example:
const { Client } = require('pg');

// Credentials are read from environment variables
const client = new Client({
  user: process.env.MATERIALIZE_USERNAME,
  password: process.env.MATERIALIZE_PASSWORD,
  host: process.env.MATERIALIZE_HOST,
  port: 6875,
  database: 'materialize',
  ssl: true
});

async function main() {
  await client.connect();
  const res = await client.query("SELECT * FROM cdp_users WHERE uuid = 'ABC123'");
  console.log(res.rows);
  await client.end();
}

main();
From here, we can extend this script into an internal HTTP API that handles lookup queries from other services. We can also stream updates out of Materialize using the `SUBSCRIBE` primitive in place of `SELECT`.
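For instance, here is a minimal sketch of the `SUBSCRIBE` approach, run from `psql` (the pageview threshold is an arbitrary example value):

-- Stream changes matching the query as they happen; each output row
-- carries a timestamp and a +1/-1 diff indicating insert or delete
COPY (
  SUBSCRIBE (
    SELECT uuid, pageview_count_last_30_days
    FROM cdp_users
    WHERE pageview_count_last_30_days > 100
  )
) TO STDOUT;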
Other Tools
Thanks to the Postgres wire-protocol compatibility, we could also connect data SaaS tools and frameworks that integrate with Postgres.
- We could connect to Materialize from Cube.js and get more advanced authentication, plus REST and GraphQL APIs, out of the box.
- We could connect Hightouch or Hex to Materialize using their built-in Postgres connectors.
Where to go from here
We now have a view that joins multiple sources of customer data together in real time and serves it to downstream applications and integrations. The individual ideas here are nothing revolutionary: a CDP managed in SQL, and a real-time CDP. The interesting new capability is the combination of the two: a CDP managed in SQL AND operating in real-time.
To take this idea a step further and learn more about Materialize primitives, we could:
- Manage the entire project in dbt using the dbt-Materialize adapter.
- Create internal or user-facing alerts and subscribe to notifications in real-time.
- Extend the view to create a real-time feature store in Materialize.
If you’re interested in getting access to Materialize to prototype your own customer data platform use cases, register for access and we’ll be in touch. Or, join us in the community to discuss your ideas and get input from Materialize experts.