Real-Time Customer Data Platform Views on Materialize

Let's demonstrate the unique features of Materialize by building the core functionality of a customer data platform.

Andy Hattemer, GTM

Businesses use customer data platforms (CDPs) to collect customer-centric data into a single cohesive place where it can be served and acted upon to improve the product, customer experience, marketing, sales and more.

Existing CDPs mostly fall into one of two categories:

|                      | CDP as a Service                             | CDP on a Warehouse                       |
|----------------------|----------------------------------------------|------------------------------------------|
| Examples             | Segment, mParticle                           | Rudderstack, Snowplow + Hightouch        |
| Data/Logic Model     | Customer-centric schema, defined by provider | Anything in the warehouse, defined in SQL |
| Data Latency         | 5 seconds - 1 minute                         | 10 min - 1 day                           |
| Typically Owned By   | Marketing Ops Team                           | Data Team                                |
| Pricing Model        | Based on user counts and feature usage       | Based on data volumes                    |

Each has trade-offs that vary in significance from company to company, but one thing is clear: There is no real-time CDP that can be owned and managed by the data team.

That's where Materialize comes in.

Materialize empowers the data team to manage CDP data views with sub-second latency — all using standard SQL and dbt workflows.

The data team is a better owner of the business logic behind the CDP: They're already working with the same data to produce analytical reporting, they've built an expertise on the various nuances and quirks within their business's data, and they are already updating SQL logic when things change.

As for the benefits of real-time, the marginal improvements of ratcheting down CDP latency are obvious: Fewer discrepancies between data and reality mean less wasted Sales effort, fewer mistargeted ads and emails, and less time between important business events and the alerts and actions that should follow from them.

Less obvious are powerful new capabilities created when you think of it less as "real-time" and more as "a view of your customers that is always up-to-date".

New Capabilities:

  1. CDP as an event-driven workflow enrichment API - Imagine your security team wants to optimize an important process like fraud or abuse detection and offboarding. Lack of context is a key driver of big, high-impact failures, both in terms of false positives (flagging a good customer) and false negatives (missing a big fraudster). They can query the CDP from their automation workflows to pull in a wealth of contextual information about a customer with up-to-the-second accuracy.
  2. CDP as a real-time feature store - Both CDPs and Feature Stores have a user dimension table at their heart. Expanding your real-time CDP to serve as a real-time feature store is a matter of adding columns (features).
  3. Push actions from the CDP - Materialize is an event-driven database: The incoming events trigger computation, and updates can be pushed to downstream systems. SQL can be written to filter for a specific condition to be met, and, when reached, an event can be pushed to trigger action somewhere else.
  4. Push actions based on absence of activity - A particularly useful subset of the above... Write SQL to filter on lack of activity like "Items in cart, no pageviews for 30 minutes". This type of feature requires inefficient polling in most systems, but is a straightforward SQL filter in Materialize.

    For Example: Alcohol delivery service Drizly used SQL filters on absence of activity to efficiently trigger user-facing cart abandonment notifications from Materialize. Read the full story here.
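As a rough sketch of an absence-of-activity filter (the carts and pageviews relations, their columns, and the millisecond-epoch timestamps here are all hypothetical), the "items in cart, no pageviews for 30 minutes" condition could be expressed with temporal filters on mz_now():

```sql
-- Hypothetical inputs: carts(user_id, item_count, updated_at),
-- pageviews(user_id, received_at); timestamps in ms since epoch.

-- Users with any pageview in the last 30 minutes (temporal filter).
CREATE VIEW recently_active_users AS
  SELECT DISTINCT user_id
  FROM pageviews
  WHERE mz_now() < received_at + (30 * 60 * 1000);

-- Non-empty carts whose owner has gone quiet.
CREATE VIEW abandoned_carts AS
  SELECT carts.user_id
  FROM carts
  WHERE carts.item_count > 0
    AND carts.user_id NOT IN (SELECT user_id FROM recently_active_users);
```

As events age past the 30-minute window, rows drop out of recently_active_users and appear in abandoned_carts automatically, with no polling loop required.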

The Plan: A Real-Time CDP on Materialize

In the rest of this article, we'll use Materialize as the warehouse component in the "CDP on a Warehouse" model: We'll join and aggregate data from product analytics and our primary application database into a single unified and up-to-date view with one row per customer, and we'll serve that to downstream tools via the built-in Postgres wire-compatibility.

Here's a quick diagram of the architecture:

Data Sources:

  1. Application DB: We're using the PostgreSQL Source to get access to a live replica of our Users table (and any other relevant tables).
  2. Analytics: In our example, Segment is handling the collection of user-centric product analytics events, but it could be easily swapped for Snowplow, Rudderstack, etc...

Transformations:

We're building a single materialized view cdp_users with many columns. You could call this a users dimension table. The data grain is users: there is one row per user.

Outputs / Read APIs

  1. SQL Query Interface: Applications can connect via PostgreSQL wire protocol and query the view. This can be extended with a script into an internal HTTP API.
  2. Data Tools: By presenting as Postgres, other data tools like Cube, Hightouch, and Hex can be used to read from Materialize.

1. Connect the Input Streams

To start, we'll need Segment and Postgres streaming data into Materialize:

  1. Use the Segment to Materialize guide to get analytics events flowing.
  2. Use the Postgres CDC guide to connect Materialize to your primary DB via a replication slot.

Once the above prerequisites are complete, we should have a materialized view of analytics data called segment_events_json and a users view matching the schema of the users table from the upstream Postgres DB.

2. Write the SQL

With our source data now streaming in, it's time to transform it into a single view with one row per user. We'll use intermediate views to keep our logic clean, but everything will eventually flow into a single real-time CDP view called cdp_users.

Identity Resolution

A core component of a CDP is identity resolution: People interact with your product from multiple devices, and you need a way of tying them all to the same correct user ID. Typically, this happens via an anonymousId set per device, plus an identify event that links the anonymousId to a userId when the user logs in to your product.

To facilitate identity resolution, create a one-to-many "identities" join table:

CREATE VIEW stg_analytics_user_identities AS
  SELECT
    data->>'anonymousId' as anonymousId,
    data->>'userId' as userId
  FROM segment_events_json
  WHERE
    data->>'type' = 'identify'
    AND data ? 'anonymousId'
    AND data ? 'userId'
  GROUP BY 1, 2;

Note: We're using the Postgres JSON operators (->> to extract a key as text, ? to check for a key's presence) on the analytics event, which is still a single column of jsonb type.

The userId is a shared ID also used in the primary DB. We can join through this table to aggregate analytics data by user.

Preparing Analytics Data

Let's do a few types of transformations on the feed of product analytics events to get a feel for the capabilities of Materialize.

Here's a view that aggregates the last 30 days of pageviews by user, using the join table we created above:

CREATE VIEW stg_user_analytics_last_30 AS
  SELECT
    users.uuid,
    COUNT(*) as pageview_count_last_30_days,
    ...
  FROM users
  JOIN stg_analytics_user_identities ON users.uuid = stg_analytics_user_identities.userId
  JOIN segment_events_json ON
    data->>'anonymousId' = stg_analytics_user_identities.anonymousId
    AND data->>'type' = 'page'
    -- Filter to last 30 days (timestamps in milliseconds)
    AND MZ_NOW() < (data->>'receivedAt')::BIGINT + (30 * 86400000)
  GROUP BY 1;

The only special thing we're doing here is the last join condition using MZ_NOW(). This gives us a way to specify a sliding window of pageviews to evaluate, which is important because it allows us to limit the resources used maintaining the view. Without some kind of temporal filter, the resources necessary to maintain this kind of view would continue to grow over time.

As a second example, here is a view that pulls in the last five pageviews by user:

CREATE VIEW stg_user_analytics_recent_pageviews AS
  SELECT * FROM
    (SELECT DISTINCT users.uuid as uuid FROM users) grp,
    LATERAL (
        SELECT data->'properties' as pageview FROM segment_events_json
        WHERE
          data->>'type' = 'page'
          AND data->>'userId' = grp.uuid
        ORDER BY data->>'receivedAt' DESC LIMIT 5
    ) recent;

The LATERAL join syntax above is part of a Top-K by group pattern, getting us the five most recent pageviews per user in a way that is more performant than subqueries, self-joins, or window functions.

The resulting view has up to five rows per user. If we want just a single row per user, we can use the jsonb_agg function to collect all pageviews into a JSON array.
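As a sketch of that collapse (the view name stg_user_recent_pageviews_agg is hypothetical), aggregating the Top-K view down to one row per user looks like:

```sql
-- One row per user, with the five most recent pageviews
-- collected into a single JSON array column.
CREATE VIEW stg_user_recent_pageviews_agg AS
  SELECT
    uuid,
    jsonb_agg(pageview) as recent_pageviews
  FROM stg_user_analytics_recent_pageviews
  GROUP BY uuid;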

Joining with Users

Finally, let's build the cdp_users view where we bring everything together:

CREATE VIEW cdp_users AS
  SELECT
    users.*,
    stg_user_analytics_last_30.pageview_count_last_30_days,
    agg.recent_pageviews
  FROM users
  LEFT JOIN stg_user_analytics_last_30 ON
    stg_user_analytics_last_30.uuid = users.uuid
  LEFT JOIN (
    SELECT uuid, jsonb_agg(pageview) as recent_pageviews
    FROM stg_user_analytics_recent_pageviews
    GROUP BY uuid
  ) agg ON agg.uuid = users.uuid;

If we were to SELECT from this view at this point, we'd get an answer, but performance would be very poor because we aren't proactively maintaining the results in an index. Materialize would create a temporary dataflow, calculate the answer, then throw the whole thing away.

To make reads performant, let's create an INDEX:

CREATE INDEX cdp_users_uuid_idx ON cdp_users (uuid);

This command takes the entire hierarchy (or DAG) of views, converts them to dataflows, processes all existing data through the dataflows to compute the results, and continues to incrementally compute the results in memory as new analytics events and updates to the users table stream in.

At this point, running SELECT * FROM cdp_users WHERE uuid = 'ABC123'; returns up-to-date information in a fraction of a second.

In a traditional data warehouse, you would encounter two hard blockers to doing the same:

  1. Data Latencies - Source data can only be batch loaded in at 5 minute intervals at best, and even that is difficult. After loading data, batch SQL transformations need to run, adding to the latency.
  2. Query Performance - Lookup queries on traditional OLAP databases are costly and slow. On Materialize they are cheap and fast because results are maintained in memory. No computation is done on lookup queries.

3. Connect Downstream Tools

We mentioned a couple of ways of reading from our CDP view at the beginning; let's explore those now.

Postgres Drivers

Anything that speaks Postgres wire-protocol should, given the right credentials, be able to query our view.

Here is a Node.js example:

const { Client } = require('pg');

const client = new Client({
  user: process.env.MATERIALIZE_USERNAME,
  password: process.env.MATERIALIZE_PASSWORD,
  host: process.env.MATERIALIZE_HOST,
  port: 6875,
  database: 'materialize',
  ssl: true
});

async function main() {
  await client.connect();
  const res = await client.query("SELECT * FROM cdp_users WHERE uuid = 'ABC123'");
  console.log(res.rows);
  await client.end();
}

main();

From here, we can extend this script into an internal HTTP API that can be used to handle lookup queries from other services. We can also stream updates out of Materialize using the SUBSCRIBE primitive in place of SELECT.
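As a minimal sketch of the SUBSCRIBE approach, from an interactive psql session you can wrap the subscription in COPY to stream changes to stdout as they happen:

```sql
-- Streams every change to cdp_users indefinitely.
-- Each row carries the timestamp of the change and a diff
-- (+1 for an insertion, -1 for a deletion; an update appears as both).
COPY (SUBSCRIBE cdp_users) TO STDOUT;
```

A service consuming this feed can react to each diff as it arrives instead of polling the view on an interval.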

Other Tools

Thanks to the Postgres wire-protocol compatibility, we could also connect data SaaS tools and frameworks that integrate with Postgres.

Where to go from here

We now have a view that joins multiple sources of customer data together in real time and serves it to downstream applications and integrations. The individual ideas here are nothing revolutionary: CDP managed in SQL and real-time CDP. The interesting and new capability is the combination of the two: CDP managed in SQL AND operating in real-time.

To take this idea a step further and learn more about Materialize primitives, we could:

  1. Manage the entire project in dbt using the dbt-Materialize adapter.
  2. Create internal or user-facing alerts and subscribe to notifications in real-time.
  3. Extend the view to create a real-time feature store in Materialize.

If you're interested in getting access to Materialize to prototype your own customer data platform use cases, register for access and we'll be in touch. Or, join us in the community to discuss your ideas and get input from Materialize experts.
