Building a real-time HTTP traffic stream with Apache Kafka
Data Science Engineer
10th Mar 2020
There are many reasons to record and analyze the traffic flowing in and out of your APIs. This data enables you to build audit logs or send alerts about anomalous activity, such as denial-of-service (DoS) attacks. More generally, you can also monitor the health and usage of your API and gain a deeper understanding of customer behavior.
This article focuses on building a real-time pipeline for streaming HTTP traffic to Apache Kafka. By the end, we'll have built an example server application with Node.js, started Apache Kafka locally, and recorded data to Kafka from our server.
Kafka (short for Apache Kafka) is a high-performance distributed streaming platform. It's often used to centralize log management and to decouple data sources from data sinks. Kafka is a good choice for streaming data because it can ingest data from various sources at very high volumes. It's also well suited to real-time use cases, such as sending alerts about DoS attacks. Finally, Kafka has various connectors for sending data to other services for further analysis, such as Amazon S3, Azure Blob Storage, Elasticsearch, or HDFS.
All of the code and instructions for this tutorial can be found in the meeshkan-express-kafka-demo GitHub repository.
We'll create a RESTful server with Express and record traffic logs in the HTTP Types format. HTTP Types is a human-readable JSON format for HTTP exchanges, with an example exchange looking as follows:
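In outline, an exchange pairs one request with its response. The TypeScript sketch below models a simplified, hypothetical subset of that structure to show the idea; the interface names and the choice of fields are ours, so consult the HTTP Types specification for the actual schema.

```typescript
// Simplified, hypothetical subset of an HTTP Types exchange.
// The field selection is illustrative; see the HTTP Types spec for the real schema.
interface SketchRequest {
  method: string;
  host: string;
  path: string; // request path, e.g. "/users/1"
  headers: Record<string, string>;
  body?: string;
}

interface SketchResponse {
  statusCode: number;
  headers: Record<string, string>;
  body?: string;
}

interface SketchExchange {
  request: SketchRequest;
  response: SketchResponse;
}

// An illustrative exchange for fetching a single user (made-up values).
const exampleExchange: SketchExchange = {
  request: {
    method: "get",
    host: "localhost",
    path: "/users/1",
    headers: { accept: "application/json" },
  },
  response: {
    statusCode: 200,
    headers: { "content-type": "application/json" },
    body: JSON.stringify({ id: "1", name: "Alice" }),
  },
};

// Without indentation, JSON.stringify emits a single line,
// which suits storing one exchange per Kafka message.
const serialized = JSON.stringify(exampleExchange);
console.log(serialized);
```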
To log HTTP traffic from Express to Kafka, we'll need:
- Middleware that converts Express requests and responses into HTTP Types objects. The @meeshkanml/express-middleware package handles this.
- A transport that sends the HTTP Types objects to Kafka. This is provided by http-types-kafka.
We'll see how to put these together below.
Our server is defined in src/index.ts. The entry point to the program is the main() function, defined as follows:
Here, we're first creating a Kafka producer by defining the Kafka topic to write to and the list of brokers (consisting only of
http-types-kafka is a wrapper around kafkajs and
KafkaConfig is defined in
kafkaExchangeTransport is a function that takes an HttpExchange object and returns a promise.
In our case, this promise is defined as:
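As a rough illustration of what such a transport does (not the code from the demo repository), the sketch below serializes each exchange to JSON and hands it to a producer. To stay self-contained, it uses a hypothetical MessageProducer interface with an in-memory implementation instead of the kafkajs or http-types-kafka APIs; the names makeKafkaTransport and InMemoryProducer are likewise our own.

```typescript
// Hypothetical minimal producer interface standing in for a real Kafka client.
interface MessageProducer {
  send(topic: string, value: string): Promise<void>;
}

// A transport maps an exchange to a promise, as described above.
// The exchange type is generic so the sketch does not depend on http-types.
type ExchangeTransport<E> = (exchange: E) => Promise<void>;

// Builds a transport that writes each exchange as one JSON message to `topic`.
function makeKafkaTransport<E>(producer: MessageProducer, topic: string): ExchangeTransport<E> {
  return async (exchange: E): Promise<void> => {
    await producer.send(topic, JSON.stringify(exchange));
  };
}

// In-memory producer for local experiments, used instead of a real broker.
class InMemoryProducer implements MessageProducer {
  readonly messages: Array<{ topic: string; value: string }> = [];

  async send(topic: string, value: string): Promise<void> {
    this.messages.push({ topic, value });
  }
}

const producer = new InMemoryProducer();
const transport = makeKafkaTransport<{ path: string }>(producer, "http_recordings");
```

Keeping the producer behind a small interface like this makes the transport easy to test without a running broker.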
app is defined in the buildApp function, which also lives in src/index.ts and looks like:
Here, we're using the express.json() middleware to parse request bodies as JSON. The Express middleware for logging API traffic is created with httpTypesExpressMiddleware, imported from the @meeshkanml/express-middleware package. It takes a list of transports as an argument, so we could also send our logs to other destinations, such as a local file.
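Since the middleware accepts a list of transports, one recorded exchange can be delivered to several destinations. The sketch below shows one possible fan-out strategy; sendToAll, Transport, and DeliveryResult are hypothetical names for this illustration, not part of @meeshkanml/express-middleware. Each transport's failure is caught individually, so an unreachable broker would not prevent delivery to the other destinations.

```typescript
// A transport maps an exchange to a promise.
type Transport<E> = (exchange: E) => Promise<void>;

interface DeliveryResult {
  index: number; // position of the transport in the list
  ok: boolean;
  error?: string;
}

// Sends one exchange to every transport concurrently and reports per-transport outcomes.
async function sendToAll<E>(exchange: E, transports: Array<Transport<E>>): Promise<DeliveryResult[]> {
  return Promise.all(
    transports.map((transport, index) =>
      // Promise.resolve().then(...) also turns synchronous throws into rejections.
      Promise.resolve()
        .then(() => transport(exchange))
        .then(
          (): DeliveryResult => ({ index, ok: true }),
          (err: unknown): DeliveryResult => ({ index, ok: false, error: String(err) }),
        ),
    ),
  );
}

// Example destinations: an in-memory stand-in for a log file and a transport that always fails.
const fileLog: string[] = [];
const fileTransport: Transport<object> = async (exchange) => {
  fileLog.push(JSON.stringify(exchange));
};
const brokenTransport: Transport<object> = async () => {
  throw new Error("broker unavailable");
};
```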
The actual user-facing API of our server is defined in usersRouter and mounted on the /users route. The function creating the Express router takes an instance of UserStore to access the list of users. For demonstration purposes, we define a synchronous in-memory user store as follows:
The store keeps an in-memory dictionary that maps user IDs to User objects. It also exposes methods for getting and creating users, including createUser.
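A minimal sketch of such a store, assuming a hypothetical User with id, name, and email fields (the demo's actual fields may differ) and sequential string IDs generated by a counter:

```typescript
// Hypothetical user shape; the demo repository's fields may differ.
interface User {
  id: string;
  name: string;
  email: string;
}

// Input for creating a user: everything except the generated ID.
type UserInput = Omit<User, "id">;

// Synchronous in-memory store mapping user IDs to User objects.
class InMemoryUserStore {
  private readonly users = new Map<string, User>();
  private nextId = 1;

  // Returns undefined when no user has the given ID.
  getUserById(id: string): User | undefined {
    return this.users.get(id);
  }

  // Assigns the next sequential ID, stores the user, and returns it.
  createUser(input: UserInput): User {
    const user: User = { ...input, id: String(this.nextId++) };
    this.users.set(user.id, user);
    return user;
  }
}

const store = new InMemoryUserStore();
const alice = store.createUser({ name: "Alice", email: "alice@example.com" });
console.log(alice.id); // prints 1
console.log(store.getUserById("2")); // prints undefined
```

A Map is used instead of a plain object so that arbitrary string IDs cannot collide with inherited object properties such as "constructor".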
User requests are handled by our server as follows:
The router exposes POST / and GET /:userId routes for creating and fetching users, respectively. Remember that the router is mounted at /users, so at the top level these routes translate to POST /users and GET /users/:userId.
To create a new user, the handler first validates the user input. It then delegates user creation to userStore.createUser and sends the created User object back to the client as JSON.
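The validation step could be sketched as a TypeScript type guard over the parsed JSON body. This assumes a hypothetical body with name and email fields and a deliberately simple rule (both must be non-empty strings); the demo's actual validation may differ.

```typescript
// Hypothetical shape of a create-user request body.
interface CreateUserBody {
  name: string;
  email: string;
}

// Type guard: true only for objects whose name and email are non-empty strings.
function isCreateUserBody(body: unknown): body is CreateUserBody {
  if (typeof body !== "object" || body === null) {
    return false;
  }
  const candidate = body as Record<string, unknown>;
  const userName = candidate.name;
  const email = candidate.email;
  return (
    typeof userName === "string" && userName.length > 0 &&
    typeof email === "string" && email.length > 0
  );
}

console.log(isCreateUserBody({ name: "Alice", email: "alice@example.com" })); // prints true
console.log(isCreateUserBody({ name: "Alice" })); // prints false
console.log(isCreateUserBody("Alice")); // prints false
```

In a handler, a false result would typically lead to a 400 Bad Request response before the store is ever called.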
Fetching a user works similarly. The handler checks that the user ID given in the route is a string and then fetches the user with userStore.getUserById. The store returns undefined if the user is not found, which the handler converts into a response with status code 404.
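The fetch logic can also be isolated in a small pure function that a route handler calls, which makes the 404 path easy to test without starting a server. This is a hypothetical sketch: lookupUser, UserLookup, and the { status, body } result are our names, and answering a non-string ID with 400 is an assumption rather than something the text specifies.

```typescript
interface StoredUser {
  id: string;
  name: string;
}

// Anything that can look a user up by ID, e.g. an in-memory store.
interface UserLookup {
  getUserById(id: string): StoredUser | undefined;
}

interface HandlerResult {
  status: number;
  body: unknown;
}

// Mirrors the GET /users/:userId flow: check the ID type, look the user up, 404 if missing.
function lookupUser(store: UserLookup, userId: unknown): HandlerResult {
  if (typeof userId !== "string") {
    return { status: 400, body: { error: "userId must be a string" } }; // assumed status code
  }
  const user = store.getUserById(userId);
  if (user === undefined) {
    return { status: 404, body: { error: "User not found" } };
  }
  return { status: 200, body: user };
}

// A Map-backed lookup used for the example.
const userMap = new Map<string, StoredUser>([["1", { id: "1", name: "Alice" }]]);
const lookup: UserLookup = { getUserById: (id) => userMap.get(id) };

console.log(lookupUser(lookup, "1").status); // prints 200
console.log(lookupUser(lookup, "42").status); // prints 404
```

An Express handler could then forward the result with res.status(status).json(body).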
Before starting our server, we need to start Kafka.
If you prefer to install Kafka on your own machine, you can follow the instructions in the Kafka Quick Start. Alternatively, you can use Docker: our demo repository includes a Docker Compose file, docker-compose.yml. This file starts a single instance of ZooKeeper, a centralized service for maintaining configuration information, and a single instance of Kafka. The Docker Compose file was copied from the kafka-stack-docker-compose repository with small modifications.
With Docker Compose, we can start the Kafka cluster from the command line by running:
The -d flag starts the Kafka cluster in the background. Data stored in Kafka is persisted in the local kafka-data/ directory, so it is not lost when the containers are stopped. The Kafka broker listens on port 9092, which Docker also publishes to the host.
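Because Docker publishes the broker port, clients running on the host would typically connect to localhost:9092. A small helper for reading a broker list from configuration, falling back to that address, might look like the sketch below. The comma-separated format and the idea of feeding it an environment variable such as KAFKA_BROKERS are assumptions for illustration, not part of the demo.

```typescript
// Broker address published by the local Docker Compose setup.
const DEFAULT_BROKERS: readonly string[] = ["localhost:9092"];

// Parses a comma-separated broker list such as "host1:9092,host2:9092".
// Missing or empty input falls back to the local default.
function parseBrokers(raw: string | undefined): string[] {
  const brokers = (raw ?? "")
    .split(",")
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0);
  // Return a fresh copy so callers cannot mutate the shared default.
  return brokers.length > 0 ? brokers : [...DEFAULT_BROKERS];
}

console.log(parseBrokers(undefined)); // prints [ 'localhost:9092' ]
console.log(parseBrokers(" broker1:9092, broker2:9092 ")); // prints [ 'broker1:9092', 'broker2:9092' ]
```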
Now we need to create a Kafka topic for our recordings. Run one of the following commands to create a topic named http_recordings, depending on whether or not you have the Kafka tools installed:
The latter command executes the kafka-topics command inside the kafka1 container started by Docker Compose.
To see messages arriving in Kafka, start a console consumer to consume the
Now we're ready to start our server and make some calls! You can start the server with:
Let's now make some calls to
Our Kafka console consumer should print the HTTP exchanges line by line, showing that the traffic is being recorded:
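Because each exchange arrives as a single JSON line, downstream analysis can start from plain line parsing. In the spirit of the alerting use case mentioned at the start, the hypothetical sketch below counts requests per method and path and flags any key above a fixed threshold. It assumes each exchange has request.method and request.path fields, the sample lines are made up, and a fixed threshold is only a stand-in for real anomaly detection.

```typescript
// Only the fields this sketch needs; real exchanges carry more.
interface RecordedExchange {
  request: { method: string; path: string };
}

// Counts requests per "METHOD path" key from newline-delimited JSON exchanges.
function countRequests(ndjson: string): Map<string, number> {
  const counts = new Map<string, number>();
  for (const line of ndjson.split("\n")) {
    if (line.trim() === "") continue; // skip blank lines
    const exchange = JSON.parse(line) as RecordedExchange;
    const key = `${exchange.request.method.toUpperCase()} ${exchange.request.path}`;
    counts.set(key, (counts.get(key) ?? 0) + 1);
  }
  return counts;
}

// Returns keys whose request count exceeds the threshold.
function flagHeavyHitters(counts: Map<string, number>, threshold: number): string[] {
  const flagged: string[] = [];
  counts.forEach((count, key) => {
    if (count > threshold) flagged.push(key);
  });
  return flagged;
}

// Made-up sample lines in the assumed shape.
const sample = [
  '{"request":{"method":"get","path":"/users/1"}}',
  '{"request":{"method":"get","path":"/users/1"}}',
  '{"request":{"method":"post","path":"/users"}}',
].join("\n");

const counts = countRequests(sample);
console.log(counts.get("GET /users/1")); // prints 2
console.log(flagHeavyHitters(counts, 1)); // prints [ 'GET /users/1' ]
```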
To show a potential use case for our HTTP recordings, we'll use them to create an OpenAPI specification with the meeshkan Python tool. The OpenAPI specification then acts as a contract, specifying the API endpoints and the data they consume or produce. It can be used for documentation or testing.
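To make the contract idea concrete, a test could check that every recorded request hits a documented path. The hypothetical sketch below covers the matching step by converting OpenAPI path templates such as /users/{userId} into regular expressions. It only handles simple {param} segments and ignores HTTP methods, parameter schemas, and server base paths, which a real validator would also need to check.

```typescript
// Escapes regex metacharacters in a literal piece of a path.
function escapeRegExp(text: string): string {
  return text.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
}

// Converts an OpenAPI path template like "/users/{userId}" into an anchored RegExp.
// Each {param} matches one non-empty path segment (no slashes).
function templateToRegExp(template: string): RegExp {
  const pattern = template
    .split(/(\{[^}]+\})/)
    .map((part) => (/^\{[^}]+\}$/.test(part) ? "[^/]+" : escapeRegExp(part)))
    .join("");
  return new RegExp(`^${pattern}$`);
}

// Returns the first documented template matching the concrete path, or undefined.
function findTemplate(path: string, templates: string[]): string | undefined {
  const queryStart = path.indexOf("?");
  const withoutQuery = queryStart === -1 ? path : path.slice(0, queryStart);
  return templates.find((template) => templateToRegExp(template).test(withoutQuery));
}

const documentedPaths = ["/users", "/users/{userId}"];
console.log(findTemplate("/users/42", documentedPaths)); // prints /users/{userId}
console.log(findTemplate("/orders/1", documentedPaths)); // prints undefined
```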
To get started, install meeshkan from PyPI:
To create an OpenAPI specification in the my_spec/ directory, run the following command:
meeshkan updates the OpenAPI specification in memory whenever new data arrives in the http_recordings topic. Stop meeshkan with Ctrl+C, and the specification is written to the my_spec directory as an openapi.json file like the following:
Finally, we can close down our Kafka cluster:
To summarize, we created an Express server running in Node.js and added middleware that logs all HTTP exchanges to Apache Kafka. We also saw how to use meeshkan to create an OpenAPI specification of our server.
If you haven't tried it yourself yet, you can follow the steps of this article in our GitHub repository.
Thank you for reading!