Ecosystem

Breezing through IoT with Materialize and AWS Managed Kafka

Craig Breslawski


A plethora of streaming data is produced every day, from weather readings to social media analytics. With the field of IoT (Internet of Things) maturing rapidly, almost every device is turning smart: sensing, capturing, and transmitting information that needs to be processed in real time to derive quick business insights.

The critical question is: how should data engineers deal with massive streaming data sets from IoT devices? And more importantly, what tools should they use to build their data pipelines?

Stream-processing technologies and streaming event stores such as Apache Kafka® are used by more than 60% of Fortune 100 companies. Kafka is designed to be distributed, horizontally scalable, and well suited for real-time streaming applications that deal with massive amounts of data. Amazon Managed Streaming for Apache Kafka (MSK) is a fully managed service that makes it easy for developers to build and run Kafka-based streaming applications on AWS.

Materialize is a streaming database built for real-time enterprise data applications that is both powerful and easy to use. IoT data streams from MSK can be ingested and processed in Materialize at scale, using SQL to define incrementally updated, in-memory materialized views and to query them.

It is helpful to pair MSK with technologies like Materialize, especially in scenarios that require real-time data ingestion and analytics.

Read along to see how these technologies work together.

The Scenario: Real-time weather reporting

Imagine you work for a weather station with different sensors that collect data about weather and environmental conditions (like temperature measurements). To warn citizens about potential risks, you need to report metrics like the average temperature in real time, as the data arrives.

This blog post will demonstrate a sample weather IoT application in which Materialize consumes IoT temperature measurements from MSK, incrementally updates a materialized view, and serves results from memory, allowing for low-latency queries using SQL. Let’s get started…

First, some prerequisites

AWS Setup – MSK Cluster and Client Machine

MSK is a paid AWS service that requires an AWS account. Once you have an account, you can set up an MSK cluster using the instructions here. For the Kafka client machine, this blog uses an Ubuntu 20.04 image with the t2.micro instance type.

Python version > 3.7

We will be using a Python-compatible Kafka client library to generate and push our data to the topic. This library requires a Python version above 3.7.
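If the repo uses the kafka-python package (an assumption here; check the repo’s requirements for the exact dependency), it can be installed with pip:

pip install kafka-python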

PostgreSQL

Materialize is wire-compatible with PostgreSQL, so you can use tools like the ‘psql’ client that ships with Postgres to connect to a Materialize instance. You can download Postgres here.

Materialize

You will need Materialize installed and running on your system. To download Materialize, click here.

Exploring the IoT weather application

The code used to generate our data and upload it to the Kafka producer can be found here. Follow the README.md file present in the repo to install and set up the codebase for execution.

The file “weather_gen.py” contains the code that generates the IoT weather data. The script calls an external weather API service, which requires two parameters: an API key and a valid city name.

Alongside “weather_gen.py”, the repo contains a ‘config.json’ file from which the script reads its settings. The input fields in the JSON configuration file are (an example file follows the list):

api_key: The key used to authenticate with the weather API service.

topic: The Kafka topic that the generated data will be published to.

broker: The broker endpoint the producer connects to. You should update this value to match your MSK cluster. The broker DNS can be found under the “Properties” tab inside “Broker Details”; you can use any one of the endpoint values.

cities: The comma-separated list of cities to track weather for. City names must be valid for the weather API to work. This example uses the ten most populated cities in the United States. On each call to the weather API, “weather_gen.py” picks one of these cities at random.

With the help of the weather API, we get real-time data for any city listed in the configuration file.
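Putting those fields together, a config.json might look like the following. The structure is illustrative; the broker endpoint and topic name are placeholders that must match your MSK cluster and the source we create later in Materialize:

{
    "api_key": "<your-weather-api-key>",
    "topic": "weather_iot",
    "broker": "<your-msk-broker-endpoint>:9092",
    "cities": "New York,Los Angeles,Chicago,Houston,Phoenix,Philadelphia,San Antonio,San Diego,Dallas,San Jose"
}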

Constructing the app schema and running the producer app

Each message on our Kafka topic is a UTF-8 encoded JSON object containing the measurements for one city.
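Reconstructed from the fields that the Materialize view defined later parses (the values shown are purely illustrative), a single record looks roughly like this:

{
    "index": 42,
    "date": "2022-03-15",
    "city": "Chicago",
    "latitude": 41.85,
    "longitude": -87.65,
    "temp_celcius": 7.2,
    "condition": "Partly cloudy",
    "wind_kmph": 14.4,
    "wind_degree": 230,
    "pressure": 1016.0
}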

To start producing data, execute the following command:

python3 weather_gen.py
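Under the hood, the producer loop looks roughly like the sketch below. This is a minimal illustration, not the repo’s actual code: it assumes the kafka-python client and a comma-separated cities string, and it stubs out the weather API call with a fetch_weather helper that you would replace with a real HTTP request to your weather service.

# Minimal sketch of a weather producer loop, in the spirit of weather_gen.py.
# Assumes kafka-python and a plaintext MSK listener; TLS listeners need extra
# security settings on the KafkaProducer.
import json
import random
import time
from datetime import date

from kafka import KafkaProducer

with open("config.json") as f:
    config = json.load(f)

producer = KafkaProducer(
    bootstrap_servers=config["broker"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

cities = [c.strip() for c in config["cities"].split(",")]

def fetch_weather(city, api_key):
    # Stand-in for the repo's weather API call. It returns the fields the
    # Materialize view expects; swap in a real request to your weather API.
    return {
        "city": city,
        "latitude": 0.0,
        "longitude": 0.0,
        "temp_celcius": round(random.uniform(-10, 40), 1),
        "condition": "Clear",
        "wind_kmph": round(random.uniform(0, 50), 1),
        "wind_degree": random.randint(0, 359),
        "pressure": round(random.uniform(980, 1040), 1),
    }

index = 0
while True:
    record = {"index": index, "date": date.today().isoformat()}
    record.update(fetch_weather(random.choice(cities), config["api_key"]))
    producer.send(config["topic"], record)  # one JSON message per reading
    index += 1
    time.sleep(1)  # pace the stream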

Defining Materialized Views in Materialize

With our producer application in place, we are continuously generating data and sending it to the topic. We can now ingest this data into Materialize and create materialized views that extract useful information from the incoming weather readings.

Creating a Source in Materialize

Materialize can ingest data from different external systems, in this case, a Kafka stream.

First, use the psql client to connect to the Materialize instance. In this case, it’s running on localhost on port 6875.

psql -U materialize -h localhost -p 6875 materialize

Now, to create the source run the following command:

CREATE SOURCE weather_iot_data
FROM KAFKA BROKER 'YOUR KAFKA BROKER DNS'
TOPIC 'topic_name_in_config_json'
FORMAT BYTES;

Creating a View

Next, create a view that converts the raw bytes received from our Kafka source into typed columns. To achieve this, first convert the byte payload to JSON, then cast each field to its respective column type.

The SQL statement to create the view is:

CREATE VIEW iot_json_view AS
SELECT
    (data->>'index')::integer AS index,
    (data->>'date')::date AS date,
    (data->>'city')::varchar AS city,
    (data->>'latitude')::decimal AS latitude,
    (data->>'longitude')::decimal AS longitude,
    (data->>'temp_celcius')::decimal AS temp_celcius,
    (data->>'condition')::varchar AS condition,
    (data->>'wind_kmph')::decimal AS wind_kmph,
    (data->>'wind_degree')::decimal AS wind_degree,
    (data->>'pressure')::decimal AS pressure
FROM (SELECT CONVERT_FROM(data, 'utf8')::jsonb AS data FROM weather_iot_data);

Let’s do averages

Let’s build a materialized view that will help us calculate the average temperature of all the cities that have been recorded.

CREATE MATERIALIZED VIEW get_avg_temperature AS
SELECT city, avg(temp_celcius)::numeric(10, 2) AS average_temp
FROM iot_json_view
GROUP BY city;

Now, let’s query this view:

SELECT * FROM get_avg_temperature;

Tailing the results

Materialize provides fast SQL queries on continually updated data. To see this in action, you can use TAIL, which streams changes to a view as they happen, and copy the results to standard output (stdout).

COPY (TAIL get_avg_temperature) TO stdout;

By running this statement, you can see how changes in the input data are reflected in Materialize: whenever a new temperature value arrives for a given city, Materialize updates the computed result. To exit, hit Ctrl+C.
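If you want to consume these updates from an application rather than from psql, a small sketch using the psycopg2 driver (an assumption; any PostgreSQL-compatible driver should work) might look like this, reusing the same local connection details:

# Stream updates from the materialized view into Python via TAIL.
import psycopg2

conn = psycopg2.connect("postgresql://materialize@localhost:6875/materialize")
with conn.cursor() as cur:
    cur.execute("DECLARE c CURSOR FOR TAIL get_avg_temperature")
    while True:
        cur.execute("FETCH ALL c")  # returns the next batch of changes
        for row in cur:
            # Each row carries a timestamp, a diff (+1 insert / -1 retraction),
            # and the view's columns (city, average_temp).
            print(row)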

Conclusion

Materialize makes it easy for developers to build real-time streaming data applications. Hopefully, the explanation and code examples help demonstrate how MSK and Materialize go well together. Despite our focus on streaming IoT, Materialize is ideal for powering other apps, such as reactive web and mobile apps, data segmentation applications, and operational data applications, where capturing and analyzing streaming data fast is important.

If you have questions or are interested in connecting with others using Materialize, join the community in Slack. We look forward to engaging with you, hearing your feedback, and learning about the new use cases you’re building out with Materialize.