Azure EventHubs is a cloud-scale telemetry ingestion service for websites, apps, and devices. Because of its tremendous event-handling capacity, it is also used in IoT architectures, where we are talking about handling millions of events per second.
Because EventHubs use a partitioned architecture behind the scenes, they scale to receive events from hundreds of sources. Ingestion is only one side of the story. EventHubs also give consumers the capabilities they need to read events efficiently and at scale.
Consuming events from EventHubs
You can consume events from EventHubs using one of the following techniques:
1. EventHubReceiver receives events from a specific partition through a specific consumer group, but it does not provide much control over managing the receiving end of the EventHub.
2. EventProcessorHost provides a more efficient way of receiving events from EventHubs, with checkpointing and partition lease management. It is thread-safe and provides a safe, multi-process runtime environment for event processor implementations.
This blog post focuses on checkpointing and on setting up the InitialOffsetProvider.
Events arrive in EventHubs at the partition level. EventProcessorHost is the .NET client for EventHubs that is used extensively to consume events.
The following question is very common among developers who are starting to use EventHubs:
“I have set the message retention to 1 day for my EventHub, but I am still seeing messages older than one day when I retrieve them.”
One important concept to understand is “Message Retention” days. It is often confused with “Time to Live (TTL)” in Service Bus queues, topics, or subscriptions.
Message Retention – ensures that events are available for the specified period, i.e., it is a minimum guarantee that the events will be available for consumption. It does not mandate that the events will be deleted after this time.
Time to Live (TTL) – defines the lifetime of messages, i.e., messages are lost or removed from storage after this period, regardless of whether they have been read.
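To make the difference concrete, here is a minimal Python model. It does not use the EventHubs or Service Bus API. TTL is treated as a hard expiry. Retention is treated as a lower bound on availability, because the service removes expired data on its own schedule. The function names and the `cleanup_delay` parameter are hypothetical and exist only for illustration.

```python
from datetime import datetime, timedelta

def visible_with_ttl(enqueued: datetime, now: datetime, ttl: timedelta) -> bool:
    """TTL: once a message outlives its time-to-live it is gone, read or not."""
    return now - enqueued < ttl

def visible_with_retention(enqueued: datetime, now: datetime,
                           retention: timedelta, cleanup_delay: timedelta) -> bool:
    """Retention: events are guaranteed for at least `retention`.

    The hypothetical `cleanup_delay` models the service deleting expired
    data on its own schedule, so events can outlive the retention period."""
    return now - enqueued < retention + cleanup_delay

start = datetime(2024, 1, 1)
later = start + timedelta(days=1, hours=6)  # 30 hours after the event was enqueued

print(visible_with_ttl(start, later, timedelta(days=1)))               # False
print(visible_with_retention(start, later, timedelta(days=1),
                             cleanup_delay=timedelta(hours=12)))       # True
```

The same 30-hour-old event is gone under a 1-day TTL but can still be read under a 1-day retention setting. This matches the question quoted above.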
Events in EventHub partitions are not cleared as soon as the message retention period elapses, nor can they be cleared programmatically. So it is quite likely that events older than the retention period will be retrieved. This can be controlled by checkpointing and managing offsets properly.
This opens up multiple scenarios for consuming events. Let us discuss a few with code examples.
A new EventHub sb-test-ns01-eh01 with 4 partitions is created for this purpose.
Configure the EventHub shared access policies for publishing events to the EventHub and consuming events from it.
A new Storage Account sbtestns01eh01storage is created. This is used for maintaining checkpoint and partition lease details while consuming events from the EventHub.
Two .NET client applications are developed: one for publishing events and the other for consuming them.
Event Publisher Application
Reference the following packages and their dependencies in the application:
This application uses EventHubClient to publish events.
Event Consumer Application
Reference the following packages and their dependencies in the application:
The above EventProcessorHost uses EventHubConsumerGroup.DefaultGroupName to consume events.
Event Processor Implementation
Scenario 1: Clean Event Hub
The Event Publisher and Processor applications are connected to the EventHub. No events are processed because the partitions are clean and no events have been published yet.
Once you start the publisher and the processor, you’ll notice that events start being published and processed.
Scenario 2: Event Hub with existing data
When an EventProcessorHost connects to an EventHub that already has events in its partitions, it starts processing all the older events that are still retained in the EventHub.
Even though no events are being published, the EventProcessor starts processing all the old, already-read events from the EventHub partitions. Also, note that the offsets are empty when the leases are allocated.
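This behavior can be modeled in a few lines of Python. The sketch is conceptual and does not use the EventProcessorHost API. Each partition is a list of retained events, and list indices stand in for real EventHub offsets. A missing checkpoint, which corresponds to an empty offset, means reading starts at the beginning of the retained stream.

```python
from typing import Dict, List, Optional

def events_to_process(partition: List[str], checkpoint: Optional[int]) -> List[str]:
    """Return the events a consumer would read from one partition.

    `checkpoint` is the index of the next event to read, or None when no
    checkpoint exists. With no checkpoint, every retained event is replayed."""
    start = 0 if checkpoint is None else checkpoint
    return partition[start:]

# Hypothetical retained data: every event here was already read in an earlier run.
partitions: Dict[str, List[str]] = {"0": ["e1", "e2"], "1": ["e3"]}
checkpoints: Dict[str, Optional[int]] = {"0": None, "1": None}  # empty offsets

replayed = {pid: events_to_process(evts, checkpoints[pid])
            for pid, evts in partitions.items()}
print(replayed)  # {'0': ['e1', 'e2'], '1': ['e3']}
```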
Scenario 3: Consume any unread data
In many cases, it is unnecessary to process events that have already been read, and it is more practical to process only unread events. Unread events arise in two cases. The first is new events that are yet to arrive. The second is events that arrive while the EventProcessor is idle because of a pause, not running because of an error, or intentionally shut down. During that time, the EventHub may still be receiving events if there are active publishers.
In such cases, the EventProcessor has to resume with the events received after the ones it processed previously. This is where checkpointing plays a useful role.
Events arrive at the EventProcessor in batches. It is suggested to checkpoint after processing each batch.
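The following Python sketch shows the checkpoint-per-batch pattern and how it lets a restarted consumer resume. It is a toy model and does not use the .NET `PartitionContext` API. The shared `store` dict stands in for the blob storage that holds checkpoints, and the stored value is the index of the next event to read.

```python
from typing import Dict, List

class PartitionConsumer:
    """Toy consumer for one partition that checkpoints after each batch."""

    def __init__(self, store: Dict[str, int], partition_id: str, batch_size: int = 2):
        self.store = store              # survives restarts, like blob storage
        self.partition_id = partition_id
        self.batch_size = batch_size
        self.processed: List[str] = []

    def run(self, partition: List[str], max_batches: int) -> None:
        # Resume from the stored checkpoint, or from the start if there is none.
        position = self.store.get(self.partition_id, 0)
        for _ in range(max_batches):
            batch = partition[position:position + self.batch_size]
            if not batch:
                break
            self.processed.extend(batch)               # "process" the batch
            position += len(batch)
            self.store[self.partition_id] = position   # checkpoint after the batch

store: Dict[str, int] = {}
events = ["e1", "e2", "e3", "e4", "e5"]

first = PartitionConsumer(store, "0")
first.run(events, max_batches=1)        # processes e1, e2, then "shuts down"

events.append("e6")                     # a publisher keeps sending meanwhile

second = PartitionConsumer(store, "0")  # restart: resumes from the checkpoint
second.run(events, max_batches=10)
print(first.processed)   # ['e1', 'e2']
print(second.processed)  # ['e3', 'e4', 'e5', 'e6']
```

The restarted consumer skips the events that were already read. It still picks up both the events left unread at shutdown and the event that arrived while it was down.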
The checkpoint, along with the lease details for each partition of the consumer group, is stored in a container in the storage account configured on the EventProcessorHost. This is the main reason EventProcessorHost expects you to supply Azure Blob storage details. If no lease container name is specified, the EventHub path is used. Each file in the container is a blob that stores JSON text.
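Here is a rough picture of that storage layout as a Python sketch: one container per lease name and one JSON blob per partition. The nested dict stands in for Blob storage. The blob naming scheme (`<consumer group>/<partition id>`) and the JSON field names (`PartitionId`, `Owner`, `Offset`) are assumptions for illustration, so inspect the blobs in your own storage account for the exact schema. `"$Default"` is used as the default consumer group name.

```python
import json
from typing import Dict, Optional

def lease_container_name(event_hub_path: str, lease_container: Optional[str] = None) -> str:
    # When no lease container name is supplied, the EventHub path is used.
    return lease_container or event_hub_path

def write_checkpoint(storage: Dict[str, Dict[str, str]], container: str,
                     consumer_group: str, partition_id: str,
                     owner: str, offset: str) -> None:
    """Store one partition's lease owner and checkpoint offset as JSON text."""
    blobs = storage.setdefault(container, {})
    blob_name = f"{consumer_group}/{partition_id}"  # hypothetical naming scheme
    blobs[blob_name] = json.dumps({"PartitionId": partition_id,
                                   "Owner": owner, "Offset": offset})

storage: Dict[str, Dict[str, str]] = {}
container = lease_container_name("sb-test-ns01-eh01")
write_checkpoint(storage, container, "$Default", "0", owner="host-1", offset="1024")
print(storage)
```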
When the EventProcessor connects to the EventHub and the leases are allocated, it loads the checkpoint offset from these container files and initializes its offset from that value. Processing then resumes from the checkpoint.
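The lookup step on lease allocation can be sketched as follows, again with a dict of blob name to JSON text standing in for the lease container. The blob name and the `Offset` field are the same illustrative assumptions as before, not a documented schema. A missing blob yields no offset, which means reading starts from the beginning of the retained stream.

```python
import json
from typing import Dict, Optional

def starting_offset(blobs: Dict[str, str], blob_name: str) -> Optional[str]:
    """Return the checkpointed offset for a partition, or None if there is none.

    None means no checkpoint: reading starts from the beginning of the
    retained stream, which is also what happens if the blob was deleted."""
    text = blobs.get(blob_name)
    if text is None:
        return None
    return json.loads(text).get("Offset")

blobs = {"$Default/0": json.dumps({"PartitionId": "0", "Offset": "1024"})}
print(starting_offset(blobs, "$Default/0"))  # 1024
print(starting_offset(blobs, "$Default/1"))  # None
```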
The following shows the EventProcessorHost awaiting new events after the checkpoint offset. It has not processed the old, already-read events that are still present in the EventHub.
The following shows the EventProcessorHost processing the events that arrived after the checkpoint offset while it was not running or idle. Again, it has not processed the old, already-read events still present in the EventHub. The newly arriving events and the events that arrived while the processor was offline can be told apart by the timestamps in the two sections.
Note: If these container files are missing or deleted, the checkpoint data is lost and the EventProcessor will process all the events available in the EventHub.
For highly scalable consumer throughput, multiple EventProcessorHosts can be used.
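The following Python sketch is a simplified picture of how partitions spread across several hosts, using a round-robin assignment. The real EventProcessorHost balances leases dynamically, with hosts acquiring and taking over leases at runtime. Treat this only as an illustration of the end state, in which each partition has exactly one owner at a time. `assign_partitions` is a hypothetical helper.

```python
from typing import Dict, List

def assign_partitions(partition_ids: List[str], hosts: List[str]) -> Dict[str, List[str]]:
    """Spread partitions evenly across hosts; each partition gets one owner."""
    assignment: Dict[str, List[str]] = {host: [] for host in hosts}
    for i, pid in enumerate(partition_ids):
        assignment[hosts[i % len(hosts)]].append(pid)
    return assignment

partitions = ["0", "1", "2", "3"]  # the sample EventHub has 4 partitions
print(assign_partitions(partitions, ["host-a", "host-b"]))
# {'host-a': ['0', '2'], 'host-b': ['1', '3']}
```

With more hosts than partitions, the extra hosts own nothing. This is why the partition count caps consumer parallelism within a consumer group.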
Checkpointing can also follow various patterns, such as checkpointing at the end of each batch or at regular intervals.
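As one example of the interval pattern, the Python sketch below checkpoints after every N events or after T seconds since the last checkpoint, whichever comes first. The class and its parameters are hypothetical. Timestamps are passed in explicitly so the policy is easy to test.

```python
class CheckpointPolicy:
    """Checkpoint after `every_n` events or `every_seconds` seconds since
    the last checkpoint, whichever comes first."""

    def __init__(self, every_n: int, every_seconds: float):
        self.every_n = every_n
        self.every_seconds = every_seconds
        self.pending = 0                 # events processed since last checkpoint
        self.last_checkpoint_at = 0.0

    def record(self, now: float) -> bool:
        """Record one processed event; return True if a checkpoint is due."""
        self.pending += 1
        if (self.pending >= self.every_n
                or now - self.last_checkpoint_at >= self.every_seconds):
            self.pending = 0
            self.last_checkpoint_at = now
            return True
        return False

policy = CheckpointPolicy(every_n=3, every_seconds=10.0)
decisions = [policy.record(t) for t in [1.0, 2.0, 3.0, 4.0, 15.0]]
print(decisions)  # [False, False, True, False, True]
```

The time check runs only when an event arrives, so an idle partition never checkpoints on the timer alone. A real implementation might also checkpoint when a partition is closed. Checkpointing less often reduces storage writes, but more events are re-processed after a restart.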
Scenario 4: Consume only new data
This is an interesting scenario where you simply want to ignore all the old (existing) events and consume only the fresh ones. This is very common during development, for example when you have introduced a new message type and want to ignore the previous ones. It is also useful whenever looking into historical data makes no sense.
When an EventProcessorHost initializes an EventProcessor, the InitialOffsetProvider can be set in the EventProcessorOptions to start processing events from the specified offset.
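Conceptually, the InitialOffsetProvider is a function that takes a partition ID and returns a starting point, such as an offset or a point in time like the current UTC time. The Python sketch below models that idea with enqueued timestamps. `make_initial_offset_provider` and `events_from` are hypothetical helpers and are not part of any SDK.

```python
from datetime import datetime, timedelta
from typing import Callable, List, Tuple

Event = Tuple[datetime, str]  # (enqueued time, body)

def make_initial_offset_provider(start: datetime) -> Callable[[str], datetime]:
    """Return a provider that gives every partition the same start time."""
    return lambda partition_id: start

def events_from(partition: List[Event], start: datetime) -> List[str]:
    """Bodies of events enqueued at or after `start`."""
    return [body for enqueued, body in partition if enqueued >= start]

t0 = datetime(2024, 1, 1, 12, 0)  # stands in for "now"
partition_0: List[Event] = [(t0 - timedelta(hours=2), "old"),
                            (t0 + timedelta(seconds=5), "new")]
provider = make_initial_offset_provider(t0)
print(events_from(partition_0, provider("0")))  # ['new']
```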
Note: The initial offset takes effect only once for each lease container. Once a checkpoint has been written, it overrides the initial-offset setting. This also means the checkpoint offset overrides the initial offset even if the initial offset is further ahead. So, depending on the required scenario, checkpointing and the initial offset must be handled accordingly.
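The precedence rule in this note can be stated as a one-line function. This is a sketch that uses integers in place of real EventHub offsets, and `resolve_start` is a hypothetical name.

```python
from typing import Optional

def resolve_start(checkpoint_offset: Optional[int], initial_offset: int) -> int:
    """An existing checkpoint always wins, even when the initial offset
    points further ahead; the initial offset applies only without one."""
    return checkpoint_offset if checkpoint_offset is not None else initial_offset

print(resolve_start(None, 500))  # 500: first run, the initial offset applies
print(resolve_start(120, 500))   # 120: the checkpoint overrides a higher initial offset
```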
Given this scenario and the available EventHub capabilities, the following changes are made to initialization and checkpointing.
a. Assign a new lease container name at each EventProcessorHost initialization. A new container is then created every time the host starts, and it holds no checkpoint data. Hold on: if there is no checkpoint data, won't the EventProcessor process old, already-read events? It would, but the following configuration makes sure only new events are processed.
b. Set the InitialOffsetProvider to read events in all partitions from now, as mentioned below.
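The two changes can be modeled together in Python. Each start gets a unique lease container name, so no earlier checkpoint is found, and each start uses the current UTC time as its initial offset. The naming scheme (EventHub path plus a GUID) and `startup_config` are hypothetical. Blob container names must be lowercase letters, digits, and hyphens of at most 63 characters, so the lowercase GUID hex fits.

```python
import uuid
from datetime import datetime, timezone

def new_lease_container_name(event_hub_path: str) -> str:
    """A fresh, unique lease container name for each host start."""
    return f"{event_hub_path}-{uuid.uuid4().hex}"

def startup_config(event_hub_path: str) -> dict:
    """Per-start settings: a new lease container plus a start time of 'now'."""
    return {"lease_container": new_lease_container_name(event_hub_path),
            "initial_offset": datetime.now(timezone.utc)}

store: dict = {}  # container name -> checkpoints; stands in for Blob storage
first = startup_config("sb-test-ns01-eh01")
store[first["lease_container"]] = {"0": 42}   # checkpoint written during run 1

second = startup_config("sb-test-ns01-eh01")
print(second["lease_container"] in store)     # False: run 2 sees no checkpoint
```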
Together, these changes ensure that the EventProcessorHost has no checkpoint offset that could override the InitialOffsetProvider value. Events are therefore always processed based on the initial offset option.
Note: The offset will not be assigned, as there are no values in the storage container. The EventProcessor still processes only the events that arrive from the time it starts listening.
Also, every time the EventProcessorHost is initialized, a new storage container is created to store the partition lease management data. These containers can be cleared or deleted later as needed.
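Old per-run containers can be cleaned up with a selection step like the Python sketch below. It keeps the active container and returns the other containers that share the naming prefix. The prefix convention and the helper are hypothetical. With the Python `azure-storage-blob` v12 package, the actual listing and deletion would use `BlobServiceClient.list_containers(name_starts_with=...)` and `BlobServiceClient.delete_container(...)`, which are worth checking against the SDK documentation. If several hosts run at the same time, keep every container that is still in use.

```python
from typing import List

def containers_to_delete(all_containers: List[str], prefix: str, active: str) -> List[str]:
    """Pick old per-run lease containers for deletion, keeping the active one.
    Only names starting with `prefix` are considered."""
    return sorted(name for name in all_containers
                  if name.startswith(prefix) and name != active)

existing = ["sb-test-ns01-eh01-aaa", "sb-test-ns01-eh01-bbb", "other-data"]
print(containers_to_delete(existing, "sb-test-ns01-eh01-", active="sb-test-ns01-eh01-bbb"))
# ['sb-test-ns01-eh01-aaa']
```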
Though this blog post is a long one, I hope it helps you understand the behavior of EventHub consumers and how to process events efficiently.