I need help using Azure Event Hubs in the following scenario. I think consumer groups might be the right option here, but I was not able to find a concrete example online.
Here is a rough description of the problem and the proposed solution using Event Hubs (I am not sure this is the optimal solution; I would appreciate your feedback).
I have multiple event sources that generate a lot of event data (telemetry from sensors). This data needs to be saved to our database, and some analysis, such as a running average and min/max, should be performed in parallel.
The sender can only send data to a single endpoint, but the event hub should make this data available to both data handlers.
I am thinking about using two consumer groups: the first will be a cluster of worker role instances that take care of saving the data to our key-value store, and the second will be an analysis engine (likely Azure Stream Analytics).
Firstly, how do I set up the consumer groups, and is there something I need to do on the sender/receiver side so that copies of events appear in all consumer groups?
I read many examples online, but they either use client.GetDefaultConsumerGroup() and/or have all partitions processed by multiple instances of the same worker role.
In my scenario, when an event is triggered, it needs to be processed by two different worker roles in parallel (one that saves the data and a second that does some analysis).
Thank You!
1 Answer
TL;DR: Your plan looks reasonable; just create two Consumer Groups with different names using CreateConsumerGroupIfNotExists.
Consumer Groups are primarily a concept, so exactly how they work depends on how your subscribers are implemented. As you know, conceptually they are a group of subscribers working together so that each group receives all the messages and, under ideal circumstances (which won't happen), probably consumes each message once. This means that each Consumer Group will "have all partitions processed by multiple instances of the same worker role." You want this.
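To make "each group receives all the messages" concrete, here is a minimal in-memory C# model. It is hypothetical and does not use the Azure SDK: the hub is a set of append-only partition lists, and a consumer group is nothing more than its own read offset per partition. The names (`partitions`, `offsets`, `Send`, `ReadNew`, `"storage"`, `"analysis"`) are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of an event hub: NOT the Azure SDK.
// Each partition is an append-only list of events.
const int partitionCount = 4;
var partitions = Enumerable.Range(0, partitionCount)
    .Select(_ => new List<string>())
    .ToArray();

// A consumer group is modelled as its own read offset per partition;
// groups never share or modify each other's offsets.
var offsets = new Dictionary<string, int[]>
{
    ["storage"] = new int[partitionCount],   // e.g. worker roles saving to the key-value store
    ["analysis"] = new int[partitionCount],  // e.g. the analytics consumer
};

// The sender writes each event once, to a single endpoint; the partition is
// picked from a hash of the partition key (the sensor id here).
void Send(string sensorId, string payload)
{
    int p = (int)((uint)sensorId.GetHashCode() % partitionCount);
    partitions[p].Add($"{sensorId}:{payload}");
}

// Pull everything this group has not read yet, then advance only this group's offsets.
List<string> ReadNew(string group)
{
    var result = new List<string>();
    int[] groupOffsets = offsets[group];
    for (int p = 0; p < partitionCount; p++)
    {
        result.AddRange(partitions[p].Skip(groupOffsets[p]));
        groupOffsets[p] = partitions[p].Count;
    }
    return result;
}

Send("sensor-1", "20.5");
Send("sensor-2", "19.0");
Send("sensor-3", "21.2");

var stored = ReadNew("storage");
var analysed = ReadNew("analysis");
Console.WriteLine($"storage saw {stored.Count}, analysis saw {analysed.Count}");
```

In this model the sender never addresses a consumer group at all; each group simply keeps its own position in every partition, which is why nothing special is needed on the sending side for both groups to see every event.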
This can be implemented in different ways. Microsoft provides two ways to consume messages from Event Hubs directly, plus the option to use things like Stream Analytics, which are probably built on top of the two direct ways. The first is the Event Hub Receiver; the second, higher-level one is the Event Processor Host.
I have not used the Event Hub Receiver directly, so this particular comment is based on the theory of how these sorts of systems work and on speculation from the documentation. While receivers are created from EventHubConsumerGroups, this serves little purpose, because these receivers do not coordinate with one another. If you use them, you will need to (and can!) do all the coordination and committing of offsets yourself, which has advantages in some scenarios, such as writing the offset to a transactional DB in the same transaction as the computed aggregates. With these low-level receivers, having different logical consumer groups use the same Azure consumer group probably shouldn't be particularly problematic (that is how it ought to work, not practical advice), but you should use different names in case it does matter or you later switch to EventProcessorHosts.
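The "write the offset in the same transaction as the aggregates" idea can be sketched without Azure at all. This is a hypothetical in-memory C# model, not the EventHubReceiver API: one partition is a list of (offset, value) pairs, and a single tuple stands in for the database row holding the running count, sum, min, max and last committed offset. Replacing that tuple in one assignment plays the role of the single transaction, so a restarted receiver resumes after the committed offset without double counting.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch (not the Azure SDK): a low-level receiver that stores its
// offset together with its aggregates, so both change in one atomic step.
// One partition's events, with increasing offsets.
var partition = new List<(long Offset, double Value)>
{
    (0, 20.5), (1, 19.0), (2, 23.5), (3, 18.0),
};

// Stand-in for a single database row; Offset = -1 means "nothing processed yet".
(long Offset, int Count, double Sum, double Min, double Max) committed =
    (-1, 0, 0.0, double.MaxValue, double.MinValue);

// Process at most maxBatch events after the committed offset, then commit once.
void ProcessBatch(int maxBatch)
{
    var next = committed; // work on a copy; the "row" is untouched until commit
    int taken = 0;
    foreach (var (offset, value) in partition)
    {
        if (offset <= next.Offset) continue; // already reflected in the aggregates
        if (taken == maxBatch) break;
        next = (offset, next.Count + 1, next.Sum + value,
                Math.Min(next.Min, value), Math.Max(next.Max, value));
        taken++;
    }
    committed = next; // the single atomic "commit" of aggregates + offset
}

// Treat everything except `committed` as lost between these two calls.
ProcessBatch(2);  // first run handles offsets 0 and 1, then "crashes"
ProcessBatch(10); // a restarted receiver resumes after offset 1
double average = committed.Sum / committed.Count;
Console.WriteLine($"n={committed.Count} avg={average} min={committed.Min} max={committed.Max}");
```

In a real deployment the tuple would presumably be a row updated in one database transaction, and its offset would be handed back to the receiver as the starting offset on restart; this sketch does not exercise either of those pieces.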
Now onto more useful information: EventProcessorHosts are probably built on top of EventHubReceivers. They are a higher-level abstraction, with support for multiple machines working together as one logical consumer group. Below is a lightly edited snippet from my code that creates an EventProcessorHost, with comments left in explaining some of the choices.
```csharp
using System;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

// We need an identifier for the lease. It must be unique across concurrently
// running instances of the program. There are three main options for this. The
// first is a static value from a config file. The second is the machine's NETBIOS
// name, i.e. System.Environment.MachineName. The third is a random value unique per
// run, which we have chosen here; if our VMs have very weak randomness, bad things may happen.
string hostName = Guid.NewGuid().ToString();

// It's not clear if we want this here long term or if we prefer that the Consumer
// Groups be created out of band. Nor are there necessarily good tools to discover
// existing consumer groups.
NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(eventHubConnectionString);
EventHubDescription ehd = namespaceManager.GetEventHub(eventHubPath);
namespaceManager.CreateConsumerGroupIfNotExists(ehd.Path, consumerGroupName);

host = new EventProcessorHost(hostName, eventHubPath, consumerGroupName,
    eventHubConnectionString, storageConnectionString, leaseContainerName);

// Call something like this when you want it to start
await host.RegisterEventProcessorFactoryAsync(factory);
```

You'll notice that I told Azure to create the Consumer Group if it doesn't exist; if you skip that and it doesn't exist, you'll get a lovely error message. I honestly don't know what purpose the Azure-side consumer group serves, because it doesn't include the storage connection string, which needs to be the same across instances for the EventProcessorHost's coordination (and presumably commits) to work properly.
Here is a picture from Azure Storage Explorer of the leases, and presumably offsets, from a Consumer Group I was experimenting with in November. Note that I have both a testhub and a testhub-testcg container only because I named them manually. If they were in the same container, the blobs would be named like "$Default/0" vs "testcg/0".
As you can see, there is one blob per partition. My assumption is that these blobs are used for two things: the first is Blob leases for distributing partitions among instances (see here); the second is storing the committed offsets within each partition.
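The lease half of that assumption can be illustrated with a toy model. The C# sketch below is hypothetical and much simpler than whatever EventProcessorHost really does (no lease expiry, renewal or stealing): a dictionary stands in for the lease blobs, keyed like "$Default/0", and each instance takes free partitions until it holds its fair share. The helper names `TryAcquire` and `Balance` are invented for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model of lease-based partition distribution, NOT the real
// EventProcessorHost algorithm: no lease expiry, renewal or stealing.
// Keys mimic the "consumerGroup/partitionId" blob names, e.g. "$Default/0".
const int partitionCount = 4;
var leases = new Dictionary<string, string>(); // lease key -> owning host

// Acquire a lease only if nobody holds it (stand-in for an exclusive blob lease).
bool TryAcquire(string key, string host)
{
    if (leases.ContainsKey(key)) return false;
    leases[key] = host;
    return true;
}

// Each host grabs free partitions until it holds its fair share of this group's partitions.
void Balance(string consumerGroup, string host, int hostCount)
{
    int fairShare = (partitionCount + hostCount - 1) / hostCount; // ceiling division
    int owned = leases.Count(kv => kv.Key.StartsWith(consumerGroup + "/") && kv.Value == host);
    for (int p = 0; p < partitionCount && owned < fairShare; p++)
    {
        if (TryAcquire($"{consumerGroup}/{p}", host)) owned++;
    }
}

// Two instances of the storage worker share one consumer group...
Balance("storage", "worker-A", hostCount: 2);
Balance("storage", "worker-B", hostCount: 2);
// ...while a single analysis instance, in its own group, gets every partition.
Balance("analysis", "analytics-1", hostCount: 1);

foreach (var kv in leases.OrderBy(kv => kv.Key, StringComparer.Ordinal))
    Console.WriteLine($"{kv.Key} -> {kv.Value}");
```

The property worth noticing is that lease keys are scoped by consumer group, so the storage workers and the analysis engine never compete for the same lease, while instances within one group split the partitions between them.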
Rather than data being pushed to the Consumer Groups, the consuming instances ask the storage system for data at some offset in a partition. EventProcessorHosts are a nice high-level way of having a logical consumer group in which each partition is read by only one consumer at a time, and in which the progress the logical consumer group has made in each partition is not forgotten.
Remember that throughput is metered per partition, so if you're maxing out ingress, only two logical consumers can keep fully up to speed. As such, you'll want to make sure you have enough partitions and throughput units that you can:
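The "only two logical consumers" figure follows from egress being capped at twice ingress. Here is a hedged back-of-the-envelope calculation in C#, assuming the Standard-tier limits of 1 MB/s ingress and 2 MB/s egress per throughput unit (verify these against the current quotas for your tier); `FullSpeedReaders` and `RequiredThroughputUnits` are hypothetical helper names.

```csharp
using System;

// Back-of-the-envelope capacity check. ASSUMPTION: per-throughput-unit limits of
// 1 MB/s ingress and 2 MB/s egress (Event Hubs Standard tier); check the current
// quotas for your tier before relying on these numbers.
const double ingressPerTuMBps = 1.0;
const double egressPerTuMBps = 2.0;

// How many consumer groups can each read the whole stream as fast as it arrives?
int FullSpeedReaders(double ingressMBps, int throughputUnits) =>
    (int)Math.Floor(egressPerTuMBps * throughputUnits / ingressMBps);

// Smallest number of throughput units covering both the incoming stream
// and every consumer group reading all of it.
int RequiredThroughputUnits(double ingressMBps, int consumerGroups) =>
    (int)Math.Max(
        Math.Ceiling(ingressMBps / ingressPerTuMBps),
        Math.Ceiling(ingressMBps * consumerGroups / egressPerTuMBps));

// Maxed-out ingress on one throughput unit: only two groups keep up.
Console.WriteLine(FullSpeedReaders(ingressMBps: 1.0, throughputUnits: 1));
// Storage workers, analysis, plus a hypothetical third group at 1 MB/s.
Console.WriteLine(RequiredThroughputUnits(ingressMBps: 1.0, consumerGroups: 3));
```

With the assumed limits, one maxed-out throughput unit supports exactly two full-speed consumer groups, and a third group pushes the requirement to two throughput units.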
In conclusion: consumer groups are what you need. The examples you read that use a specific consumer group are good: within each logical consumer group, use the same name for the Azure Consumer Group, and give different logical consumer groups different names.
I haven't yet used Azure Stream Analytics, but at least during the preview release you are limited to the default consumer group. So don't use the default consumer group for anything else, and if you need two separate Azure Stream Analytics jobs, you may need to do something nasty. But it's easy to configure!