Building a real-time HTTP traffic stream with Apache Kafka

Kimmo Sääskilahti

Data Science Engineer

10th Mar 2020

There are many reasons to record and analyze the traffic flowing in and out of your APIs. This data enables you to build audit logs or send alerts of anomalous activities, such as denial-of-service (DoS) attacks. More generally, you can also monitor the health and usage of your API and deeply understand customer behavior.

This article focuses on building a real-time pipeline for streaming HTTP traffic to Apache Kafka. By the end, we'll have built an example server application with Node.js, started Apache Kafka locally, and recorded data to Kafka from our server.

Kafka (short for Apache Kafka) is a high-performance distributed streaming platform. It's often used to centralize log management and to decouple data sources from data sinks. Kafka is a good choice for streaming data because it can ingest data from various sources at huge volumes. It's also tailor-made for real-time use cases, such as sending alerts of DoS attacks. Kafka also has various connectors for sending data to other services for further analysis. For example: Amazon S3, Azure Blob Storage, ElasticSearch, or HDFS.

⚠️ Prerequisites:#

✅ Steps:#

  1. Creating a Node.js server
  2. Preparing Kafka
  3. Creating an OpenAPI specification from recordings
  4. Conclusion

All of the code and instructions for this tutorial can be found in the meeshkan-express-kafka-demo GitHub repository.

Creating a Node.js server#

We'll create a RESTful server with Express and record traffic logs in the HTTP Types format. HTTP Types is a human-readable JSON format for HTTP exchanges, with an example exchange looking as follows:

To log HTTP traffic from Express to Kafka, we'll need:

  1. Middleware converting Express requests and responses to HTTP Types objects. The @meeshkanml/express-middleware package handles this.
  2. A transport sending the HTTP Types objects to Kafka. This is provided by http-types-kafka.

We'll see how to put these together below.

Our server is defined in src/index.ts. The entry point to the program is the main() function defined as follows:

Here, we're first creating a Kafka producer by defining the Kafka topic to write to and the list of brokers (consisting only of localhost:9092). http-types-kafka is a wrapper around kafkajs and KafkaConfig is defined in kafkajs. kafkaExchangeTransport is a function taking a HttpExchange object and returning a promise.

In our case, this promise is defined as:

The Express app is defined in the buildApp function. This function is also in the src/index.ts and looks like:

Here, we're using express.json() middleware to parse request bodies as JSON. Express middleware for logging API traffic is created with the httpTypesExpressMiddleware imported from the @meeshkanml/express-middleware package. The object takes a list of transports as an argument, so we could also send our logs to other destinations such as a local file.

The actual user-facing API of our server is mounted on the /users route defined in usersRouter. The function creating the Express router takes an instance of UserStore to access the list of users. For demonstration purposes, we define our synchronous in-memory user store as follows:

The store keeps an in-memory dictionary of users by mapping user IDs to User objects. It also exposes getUserById and createUser methods for getting and creating users.

User requests are handled by our server as follows:

The router exposes POST / and GET /:userId routes for creating and fetching users, respectively. Remember the router is mounted to /users, so the routes translate to POST /users and GET /users/:userId routes at top-level.

The request to create a new user is handled by validating the user input first. Creating a new user is then delegated to userStore.createUser and the created User object is sent back to the user as JSON.

Fetching a user is similar. The user ID given in the route must be a string, after which a user is fetched from userStore.getUserbyId. The store returns undefined if the user is not found, so that's converted to a response with status code 404.

Preparing Kafka#

Before starting our server, we need to start Kafka.

If you prefer to install Kafka on your own machine, you can follow the instructions in Kafka Quick Start. Alternatively, you can use Docker. Our demo repository has a Docker Compose file docker-compose.yml. This file starts a single instance of Zookeeper, a centralized service for maintaining configuration information, and a single instance of Kafka. The Docker Compose file has been copied from the kafka-stack-docker-compose repository with small modifications.

Using Docker Compose, we can use the command line to start the Kafka cluster by running:

The -d flag starts the Kafka cluster in the background. Data stored in Kafka is persisted in the local kafka-data/ directory so that data is not lost after stopping the containers. Kafka broker is listening at port 9092, which is also published by Docker.

Now we need to create a Kafka topic for our recordings. Run one of the following commands to create a topic named http_recordings, depending on whether you have Kafka tools installed or not:

The latter command executes the kafka-topics command inside the kafka1 container started by Docker Compose.

To see messages arriving to Kafka, start a console consumer to consume the http_recordings topic:

Recording calls#

Now we're ready to start our server and make some calls! You can start the server with:

Let's now make some calls to localhost:3000 using curl:

Our Kafka console consumer should print HTTP exchanges line by line, showing we're successfully recording:

Creating an OpenAPI specification from recordings#

To show a potential use case for our HTTP recordings, we'll use the recordings to create an OpenAPI specification. This will be done using the meeshkan Python tool. Our OpenAPI specification will then act as a contract - specifying the API endpoints and what data they consume or produce. It can be used for documentation or testing.

To get started, install meeshkan from PyPI:

To create an OpenAPI specification to the directory my_spec/, run the following command:

meeshkan will update the OpenAPI specification in memory whenever new data arrives in http_recordings topic. Stop meeshkan with Ctrl+C and the specification is written to my_spec directory with an openapi.json as follows:

Finally, we can close down our Kafka cluster:


To summarize, we created an Express server running in Node.js and added a middleware logging all HTTP exchanges to Apache Kafka. We also saw how to use meeshkan to create an OpenAPI specification of our server.

If you haven't tried it yourself yet, you can follow the steps of this article in our GitHub repository.

meeshkan is still under development, so we very much appreciate any feedback. Feel free to comment below or try our tutorial.

Thank you for reading!

Newer postOlder post


ContactPricingAbout usT&CDocs