How to Connect Segment to Materialize

Connect a real-time stream of user-centric events from Segment to Materialize, using Kafka as the intermediary.

Segment is a customer data platform that operates as a sort of ingestion and routing middleware for real-time user-centric events.

Materialize is a streaming database that lets you take the same SQL queries you run on traditional databases and turn them into incrementally updated materialized views (i.e., standing SQL queries that update themselves in real time).

When integrated, Segment and Materialize enable data teams to build real-time customer data APIs, segmentation, automation, and alerting.

This guide details how to send a live feed of events from Segment to Materialize.

Architecture

Currently, the best option is to use the Segment Webhooks destination, which allows us to point the real-time stream of Segment events at any HTTP endpoint.

That presents a challenge: Materialize doesn't currently have a Webhook (HTTP) Source. One may be added in the future (there is a tracking issue for it), but for now we need a connector service that accepts incoming HTTP webhooks and maps the payloads to either Kafka or PostgreSQL, the two sources that currently have production-level support on Materialize.

Note: Materialize is Postgres wire-compatible, but we can't use the Segment Postgres destination because it is a Warehouse Destination: Instead of streaming updates, Warehouse destinations sync data in bulk at regular 6-24 hour intervals.

Upstash Kafka

Several intermediate services can help map webhooks to Materialize. The easiest option we have found is Upstash Kafka. Upstash offers a cloud-based managed Kafka product with a generous free tier, reasonable pay-per-message pricing, and, most importantly, a simple webhook endpoint for every topic.

Segment to Materialize Architecture Diagram

Alternative: AWS API Gateway + Lambda Function

If your org is unable to use Upstash, or you're already using another Kafka provider, you can instead route Segment events into your Kafka cluster through an AWS Lambda serverless function that is publicly addressable via an API Gateway configuration.

For reference on configuring API Gateway and Lambda to produce messages to Kafka, see Creating a serverless Apache Kafka publisher using AWS Lambda. The only additional configuration step is to add the public HTTP endpoint from your API Gateway as a webhook destination in Segment.

Configure Kafka

For the rest of this example, we'll use Upstash Kafka as the adapter between Segment and Materialize. It serves as the message buffer: it takes in the JSON payloads of all Segment events via an HTTP webhook, places them on a single-partition topic with seven-day retention, and allows them to be consumed via a Materialize Kafka Source.

Note: In the latest version of Materialize, Kafka source records are immediately saved in storage, so seven-day message retention in Kafka is not required, but it's a useful redundancy in the event of unforeseen issues.

Create a free Upstash account, then create a new Kafka Cluster and Topic. (For more guidance, see Upstash Docs)

Save the Webhook URL for later. (Just the URL, not the entire curl statement.)

Upstash Kafka Webhook URL

Save the Endpoint, Username and Password as well.

Configure Segment to Write to Kafka

Back in the Segment admin panel, create a new destination and select Webhooks as the destination type:

Add the Upstash Webhook URL in the Destination Settings page in Segment:

Confirm that events are flowing by going back to the cluster in the Upstash console, navigating to the Topics tab, and clicking on the topic name. The topic should show more than zero messages.

Configure Materialize to Consume from Kafka

Now that we are streaming events from Segment to Upstash, we need to create a streaming input SOURCE in Materialize that consumes messages from Kafka:

Current Materialize Version
CREATE SECRET upstash_kafka_password AS '<UPSTASH_KAFKA_BROKER_PASSWORD>';

CREATE CONNECTION upstash_kafka_connection TO KAFKA (
    BROKER '<UPSTASH_KAFKA_BROKER_URL>',
    SASL MECHANISMS = 'SCRAM-SHA-256',
    SASL USERNAME = '<UPSTASH_KAFKA_BROKER_USERNAME>',
    SASL PASSWORD = SECRET upstash_kafka_password
);

CREATE SOURCE upstash_segment
  FROM KAFKA CONNECTION upstash_kafka_connection (TOPIC '<TOPIC_NAME>')
  FORMAT BYTES
  WITH (SIZE = '3xsmall');

In the latest versions of Materialize, we create separate reusable SECRET and CONNECTION objects that are referenced in the SOURCE. Additionally, upon creation of a SOURCE, Materialize immediately begins consuming events and saving them in storage.
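
As a quick sanity check, something like the following can confirm that the source exists and is ingesting messages. This is a minimal sketch that assumes the current-version source created above under the name upstash_segment; the count should grow as Segment delivers events.

```sql
-- List sources to confirm upstash_segment was created.
SHOW SOURCES;

-- Count the messages ingested so far; rerun to watch the number grow.
SELECT count(*) AS messages_ingested
FROM upstash_segment;
```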

FORMAT BYTES in the SQL above indicates Materialize is consuming the Segment events as raw bytes. The final step is to convert and parse them as JSON.

If you're using the Materialize Binary, you'll need this syntax instead:

Materialize Binary
CREATE SOURCE upstash_segment
  FROM KAFKA BROKER '<UPSTASH_KAFKA_BROKER_URL>' TOPIC '<TOPIC_NAME>'
  WITH (
      security_protocol = 'SASL_SSL',
      sasl_mechanisms = 'SCRAM-SHA-256',
      sasl_username = '<UPSTASH_KAFKA_BROKER_USERNAME>',
      sasl_password = '<UPSTASH_KAFKA_BROKER_PASSWORD>'
  )
FORMAT BYTES;

Convert to JSON

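CREATE MATERIALIZED VIEW segment_events_json AS
  SELECT
    -- Parse the decoded text into jsonb once, so later views can reuse it.
    CAST(data AS jsonb) AS data
  FROM (
    -- The FORMAT BYTES source exposes each message in a column named data.
    SELECT convert_from(data, 'utf8') AS data
    FROM upstash_segment
  ) AS decoded;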

A materialized view is efficient here because, in current versions, materialized views write their output down to storage. The dataflow above is stateless: messages are individually pulled into the compute layer, converted to JSON, and written back down to storage. This means conversion only happens once, and all later SQL can reference pre-parsed JSON.

Keep in mind that the materialized view segment_events_json is still a single view of all types of Segment events (pageviews, identifies, tracks, etc.), and it has a single column, data, of type jsonb containing the metadata of each event. You'll need to do further transformation to begin generating value from it.

Tip: Materialize implements the same JSON referencing features as Postgres; see the jsonb docs for examples.
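
As one possible next step, the sketch below pulls a few top-level fields out of track events with the -> and ->> operators. The field names (type, event, userId, anonymousId, messageId, timestamp, properties) follow Segment's common event fields, but they are assumptions about your payloads, and the view name segment_track_events is hypothetical. Inspect a few rows of segment_events_json before relying on them.

```sql
-- Hypothetical view: flatten Segment "track" events into named columns.
CREATE VIEW segment_track_events AS
  SELECT
    data->>'messageId'   AS message_id,
    data->>'userId'      AS user_id,
    data->>'anonymousId' AS anonymous_id,
    data->>'event'       AS event_name,
    -- Kept as text here; cast it once you have confirmed the format.
    data->>'timestamp'   AS event_timestamp,
    -- -> keeps the nested object as jsonb for later drilling.
    data->'properties'   AS properties
  FROM segment_events_json
  WHERE data->>'type' = 'track';
```

Downstream views can then filter or aggregate on event_name or user_id without touching the raw JSON again.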

Where to go from here

You now have access to a real-time feed of Segment events in Materialize. With it, you can:

  1. Build Customer Data Platform features
  2. Monitor data for errors, anomalies, or any other condition
  3. Create a push feed of notifications
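
For instance, a simple monitoring building block could be a materialized view that keeps a running count of events per Segment event type. This is only a sketch: the view name segment_event_type_counts is hypothetical, and it assumes each payload carries a top-level type field.

```sql
-- Hypothetical view: incrementally maintained count of events per type.
CREATE MATERIALIZED VIEW segment_event_type_counts AS
  SELECT
    data->>'type' AS event_type,
    count(*)      AS event_count
  FROM segment_events_json
  GROUP BY data->>'type';

-- Read the current counts; results reflect events as they arrive.
SELECT * FROM segment_event_type_counts ORDER BY event_count DESC;
```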

If you have any questions or issues, join us in the Materialize community where other users and Materialize engineers can help!

© 2022 Materialize, Inc. Terms of Service