Get started with Redis Streams by creating an end-to-end solution for analyzing Twitter data

Credit: Jérôme Prax, https://unsplash.com/photos/U_m-mPOZzMI

Redis Streams is a new Redis data structure that allows you to manage data channels between producers and consumers. In two previous articles here, I covered the basics and benefits of Redis Streams and how to use Redis Streams consumer groups to scale out data stream consumers. In this article, I will demonstrate how to develop a data stream processing application using Redis Streams.

To bring Redis Streams to life, let’s build a real-world solution for identifying the top influencers on Twitter. This application, which I’ll call TopSocialForce, will gather social messages from Twitter, store them in Redis Streams, and analyze them in real time. TopSocialForce will discover the top social forces by tagging any Twitter handle with more than 10,000 followers as an “influencer” and maintaining a running catalog.

TopSocialForce will collect Twitter stream data through a third-party service called PubNub. (GNIP is another option.) The tweets follow the JSON format described by Twitter, so every tweet contains the Twitter handle and its follower count, which we’ll use for influencer classification.

Figure 1. How TopSocialForce works. (Credit: Redis Labs)

With TopSocialForce, we have a few challenges to address. Tweets arrive at unpredictable times and at a rate of thousands per second. When an influencer tweets a message, we have to use that opportunity to tag the influencer, because we cannot predict when that account will tweet again. Therefore, we cannot afford to skip any tweet. TopSocialForce must not only be resilient to connection loss and software failures, but also have the appropriate database and data structures to save and process all tweets.
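The classification rule described above (more than 10,000 followers) can be sketched in a few lines of Java. This is a minimal illustration, not code from the TopSocialForce package: the class name InfluencerRule, the tag method, and the in-memory map standing in for the running catalog are all assumptions made for the example.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of the TopSocialForce package.
final class InfluencerRule {
    // Threshold taken from the article: strictly more than 10,000 followers.
    static final long FOLLOWER_THRESHOLD = 10_000;

    static boolean isInfluencer(long followerCount) {
        return followerCount > FOLLOWER_THRESHOLD;
    }

    // Adds the handle to the running catalog only if it qualifies.
    // Returns true when the handle was tagged as an influencer.
    static boolean tag(Map<String, Long> catalog, String handle, long followerCount) {
        if (!isInfluencer(followerCount)) {
            return false;
        }
        catalog.put(handle, followerCount);
        return true;
    }

    public static void main(String[] args) {
        // An in-memory map stands in for the catalog (an assumption of this sketch).
        Map<String, Long> catalog = new TreeMap<>();
        tag(catalog, "alice", 25_000);
        tag(catalog, "bob", 10_000); // exactly 10,000 is not "more than 10,000"
        System.out.println(catalog); // prints {alice=25000}
    }
}
```

In the real application the follower count would come from the tweet's JSON payload and the catalog would live in Redis; the sketch only pins down the boundary condition of the rule.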
Redis Streams application components

A Redis database with the Redis Streams data structure allows us to collect, store, and drive the processing of Twitter feeds while meeting these requirements. In this example, we’ll build TopSocialForce on the Java platform using a Redis client library called Lettuce. Redis client libraries are available for other programming languages too, so you can select whichever language platform and library best suit your needs. Because TopSocialForce receives the Twitter stream from PubNub, we’ll also need the Java library for PubNub.

Specifically, the components of our stack include:

Database: Redis 5.0. The new Redis Streams data structure is available in Redis 5.0.
Language platform: Java 1.8. We selected Java for its popularity, but you could write your application in many other programming languages that support Redis Streams. Check the full list of Redis clients for more information.
Sample Twitter feed: PubNub. PubNub offers a streaming API for Twitter data, with a free version that delivers a limited set of tweets for developers. Because we developed TopSocialForce as a proof of concept, we used PubNub’s free service.
Redis library: Lettuce 5.1. Lettuce is a popular Java client library for Redis. Lettuce supports Redis Streams starting with Lettuce 5.1. At the time of writing, Lettuce 5.1 was a beta release. (Note: Redisson is another Java library that supports Redis Streams.)

Redis Streams solution: A Twitter ingest stream and a Twitter influencer classifier

TopSocialForce consists of two primary processes: a Twitter ingest stream and a Twitter influencer classifier. Both processes use Redis Streams as the underlying stream database to store the Twitter feed.

Ensuring the solution is resilient to connection loss

Traditional messaging models such as publish/subscribe require that publishers and subscribers be running and connected to a common channel at all times.
However, Redis Streams does not have that limitation. Instead, it provides a persistent data store for the streaming data. As shown in Figure 2 and Figure 3 below, our Redis Stream decouples the ingest stream from the influencer classifier.

Figure 2. The data ingest stream continues collecting data even when consumers are disconnected. (Credit: Redis Labs)

Figure 3. The consumer must wait for new data to arrive when the ingest stream is disconnected. (Credit: Redis Labs)

Ensuring data won’t be lost in transit

Redis supports both persistence and replication to ensure that your data will not be lost. With persistence enabled, Redis saves data to disk in case it needs to be recovered. With replication, you can also deploy a replica server that stores a copy of the data, so the data at rest is durable. However, when a consumer consumes the data, the data typically moves out of Redis and into the consumer’s process. If the data is lost during that transition, it may be lost forever.

Redis Streams mitigates this problem with the help of explicit “ack” commands in consumer groups. I covered consumer groups in detail in a previous article here. In our TopSocialForce application, the influencer classifier process has a consumer that belongs to a consumer group called InfluencerConsumerGroup. The lifecycle of the data in transit is as follows:

1. Our influencer classifier calls the XREADGROUP command on the Redis Stream.
2. In response to the call, Redis does two things:
   a. Copies the latest data object to the pending list of objects for that consumer.
   b. Delivers the object to the consumer.
3. The consumer (influencer classifier) processes the data and sends XACK, which prompts Redis to remove the data from the pending list.

Figure 4. The Redis Streams data lifecycle ensures data safety during transit. (Credit: Redis Labs)

If the consumer loses the data in transit or while processing, the data remains in the pending list and hence is not lost.
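To make that lifecycle concrete, here is a toy in-memory model of the deliver-then-acknowledge bookkeeping. It does not talk to Redis and is not taken from the TopSocialForce code: the class PendingListModel and its methods are hypothetical names that only mimic what XREADGROUP and XACK do to a consumer's pending list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of the pending-list lifecycle. This is NOT Redis and not
// the article's code; every name here is hypothetical.
final class PendingListModel {
    private final Deque<String[]> undelivered = new ArrayDeque<>(); // {id, payload}
    private final Map<String, String> pending = new LinkedHashMap<>();

    // Producer side: append an entry to the stream.
    void add(String id, String payload) {
        undelivered.add(new String[] {id, payload});
    }

    // Models reading new data with XREADGROUP: the entry is recorded as
    // pending for the consumer and then delivered. Returns null if empty.
    String[] readNew() {
        String[] entry = undelivered.poll();
        if (entry != null) {
            pending.put(entry[0], entry[1]);
        }
        return entry;
    }

    // Models XACK: removes the entry from the pending list.
    // Returns true only if the entry was actually pending.
    boolean ack(String id) {
        return pending.remove(id) != null;
    }

    // IDs that were delivered but never acknowledged.
    List<String> pendingIds() {
        return new ArrayList<>(pending.keySet());
    }

    public static void main(String[] args) {
        PendingListModel stream = new PendingListModel();
        stream.add("1-0", "tweet A");
        stream.add("2-0", "tweet B");

        stream.readNew();   // tweet A delivered...
        stream.ack("1-0");  // ...and acknowledged after processing
        stream.readNew();   // tweet B delivered, but the consumer "crashes" before XACK

        System.out.println(stream.pendingIds()); // prints [2-0]
    }
}
```

The unacknowledged entry stays in the pending list, which is the property the article relies on for recovery after a consumer failure.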
The influencer classifier process can always retrieve data from its pending list by calling XREADGROUP with an explicit ID (such as 0) instead of the special > ID.

Influencer catalog data structures

TopSocialForce uses two built-in Redis data structures, a Sorted Set and a Hash, to store our influencer data. In the Sorted Set, we store the Twitter handles as members, with each handle’s follower count as its score (see Figure 5). We use a separate Hash for each influencer to store their account details. Each Hash is stored under the key influencer:[twitter handle]. Figure 6 shows a sample Hash data structure.

Figure 5. Redis’s Sorted Set data structure is used to store the influencers. (Credit: Redis Labs)

Figure 6. Redis’s Hash data structure is used to store influencer details. (Credit: Redis Labs)

Redis Streams application design: Class hierarchies and relationships

Because we used Java as our language platform, we leveraged Java’s object-oriented programming features to make TopSocialForce easily extensible. The UML class diagram in Figure 7 below gives a general idea of our class structure, and you can find the complete package on GitHub. In the following section, I’ll explain at a high level how the programs are organized and what they do. Feel free to read the documentation inside the programs themselves for further details.

Common components

LettuceConnection.java: This is a utility class that manages our connection to the Redis database via the Lettuce library. Other classes in the package use LettuceConnection objects to connect to Redis.
InitializeConsumerGroup.java: In this class, we create the Redis Streams data structure and a new consumer group. Doing so initializes the database to receive new data from the producer and pass it on to the consumers. This program only needs to be run once to initialize the data structures.

Ingest stream (producer)

IngestStream.java: This is a parent class with the generic set of properties and methods required for any kind of ingest stream program.
TwitterIngestStream.java: This class extends IngestStream, inheriting all of its protected and public properties and methods. In addition to the generic features, TwitterIngestStream adds features that are specific to Twitter’s data stream.

Influencer classifier (consumer)

InfluencerCollectorMain.java: This is the main Java class that starts the process.
MessageProcessor.java: This is a Java interface that defines the processMessage() method.
StreamConsumer.java: This is a generic class that reads new data as it arrives in our Redis Stream. StreamConsumer extends Thread and therefore runs as a separate thread. A StreamConsumer object passes new data on to the registered MessageProcessor object.
InfluencerMessageProcessor.java: This class implements the MessageProcessor interface. In our program, we pass an InfluencerMessageProcessor object to StreamConsumer as a MessageProcessor. InfluencerMessageProcessor then stores our influencer data in Redis using the Sorted Set and Hash data structures. You can implement your own version of MessageProcessor and pass it to StreamConsumer.

Figure 7. The Java classes of the TopSocialForce application displayed as a UML class diagram. (Credit: Redis Labs)

Running and testing the Redis Streams application

Our documentation on GitHub provides details on how to compile and run these programs. Before you get started, make sure you have the right connection parameters set in LettuceConnection.java. In a nutshell, just follow these steps:

InitializeConsumerGroup: This program initializes a Redis stream and its consumer groups. You must run this before you collect and process Twitter data, and it will terminate after initializing the data structures.
TwitterIngestStream: Before you compile TwitterIngestStream.java, make sure you have updated your PubNub connection credentials. When you compile and run this program, it connects to PubNub and starts receiving the Twitter data stream.
The program then stores the data using the Redis Streams data structure we initialized in the previous step. TwitterIngestStream reads the Twitter data continuously and does not terminate on its own.
InfluencerCollectorMain: This class starts the StreamConsumer as a separate thread. StreamConsumer reads the new data in your Redis stream and passes it to the MessageProcessor object. The StreamConsumer thread makes a blocking call to Redis Streams. Therefore, it does not terminate on its own; when the stream has no new data, it waits for more to arrive.

When you execute TwitterIngestStream and InfluencerCollectorMain, make sure to test for the following:

TwitterIngestStream and InfluencerCollector are running.
Instances of InfluencerCollector objects are consuming the data.
Failure scenario: TwitterIngestStream is down.
Failure scenario: InfluencerCollector is down.

Verifying the data in your Redis streams

There’s more than one way to verify your data. Here I’ll show a few sample commands using the redis-cli interface.

Data in Redis Streams

Is the data growing? Run the following command a few times to see if the count is changing.

    XLEN twitterstream

How can I know more about the data in Redis streams? The command below prints basic information such as the stream’s length, last generated ID, first entry, and last entry.

    XINFO STREAM twitterstream

Can I get a list of the consumer groups attached to my stream? The following command lists all of the groups associated with the stream.

    XINFO GROUPS twitterstream

Are the consumers in a consumer group idle or running? This command lists all of the consumers within the consumer group influencerclassifiers, their idle time, and the number of pending items to process.

    XINFO CONSUMERS twitterstream influencerclassifiers

Influencers catalog

How many influencers have I collected so far?

    ZCARD influencers

What about the details of an influencer, say ABCD?
    HGETALL influencer:ABCD

Next steps with Redis Streams

My three-part series on Redis Streams started with an introduction to using Redis Streams for different use cases. In the second article, I walked through the details of the Redis Streams data lifecycle, from both the producer and the consumer points of view. In this final piece, I showed how you can apply the concepts we covered earlier to develop an end-to-end application using Redis Streams.

As you scale up an application like TopSocialForce, you’ll appreciate how Redis Streams remains sleek, simple, and solid no matter how many data producers or consumers you’re managing. I sincerely hope this article series helped spark some ideas for how you could benefit from the new Redis Streams data structure.

Redis Streams is available in Redis beginning with Redis 5.0. It is also available in Redis Enterprise, which offers high availability and durability with in-memory replication.

Roshan Kumar is a senior product manager at Redis Labs. He has extensive experience in software development and technology marketing. Roshan has worked at Hewlett-Packard and many successful Silicon Valley startups including ZillionTV, Salorix, Alopa, and ActiveVideo. As an enthusiastic programmer, he designed and developed mindzeal.com, an online platform hosting computer programming courses for young students. Roshan holds a bachelor’s degree in computer science and an MBA from Santa Clara University.

New Tech Forum provides a venue to explore and discuss emerging enterprise technology in unprecedented depth and breadth. The selection is subjective, based on our pick of the technologies we believe to be important and of greatest interest to InfoWorld readers. InfoWorld does not accept marketing collateral for publication and reserves the right to edit all contributed content. Send all inquiries to newtechforum@infoworld.com.