当前位置 : 主页 > 编程语言 > java >

Hazelcast Support(支持)

来源:互联网 收集:自由互联 发布时间:2022-12-23
Spring 集成提供通道适配器和其他实用程序组件,以与内存数据网格Hazelcast进行交互。 您需要将此依赖项包含在项目中: dependency groupIdorg.springframework.integration/groupId artifactIdspring-integr

Hazelcast Support(支持)_Source

Spring 集成提供通道适配器和其他实用程序组件,以与内存数据网格 Hazelcast 进行交互。

您需要将此依赖项包含在项目中:

<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-hazelcast</artifactId> <version>6.0.0</version></dependency>

Hazelcast 组件的 XML 命名空间和架构位置定义包括:

xmlns:int-hazelcast="http://www.springframework.org/schema/integration/hazelcast"xsi:schemaLocation="http://www.springframework.org/schema/integration/hazelcast https://www.springframework.org/schema/integration/hazelcast/spring-integration-hazelcast.xsd"

Hazelcast 事件驱动的入站通道适配器

Hazelcast提供分布式数据结构,例如:

  • ​​com.hazelcast.map.IMap​​
  • ​​com.hazelcast.multimap.MultiMap​​
  • ​​com.hazelcast.collection.IList​​
  • ​​com.hazelcast.collection.ISet​​
  • ​​com.hazelcast.collection.IQueue​​
  • ​​com.hazelcast.topic.ITopic​​
  • ​​com.hazelcast.replicatedmap.ReplicatedMap​​

它还提供事件侦听器,以便侦听对这些数据结构所做的修改。

  • ​​com.hazelcast.core.EntryListener<K, V>​​
  • ​​com.hazelcast.collection.ItemListener​​
  • ​​com.hazelcast.topic.MessageListener​​

Hazelcast 事件驱动的入站通道适配器侦听相关的缓存事件,并将事件消息发送到定义的通道。 它支持XML和JavaConfig驱动的配置。

XML 配置:

<int-hazelcast:inbound-channel-adapter channel="mapChannel" cache="map" cache-events="UPDATED, REMOVED" cache-listening-policy="SINGLE" />

Hazelcast 事件驱动的入站通道适配器需要以下属性:

  • ​​channel​​:指定消息发送到的通道;
  • ​​cache​​:指定侦听的分布式对象引用。 这是一个强制性属性;
  • ​​cache-events​​:指定侦听的缓存事件。 它是一个可选属性,其默认值为 。 其支持的值如下:ADDED
  • 和 支持的缓存事件类型:、、、 和IMapMultiMapADDEDREMOVEDUPDATEDEVICTEDEVICT_ALLCLEAR_ALL;
  • 支持的缓存事件类型:、、、ReplicatedMapADDEDREMOVEDUPDATEDEVICTED;
  • 支持的缓存事件类型 和 : , 。 没有 的缓存事件类型。IListISetIQueueADDEDREMOVEDITopic
  • ​​cache-listening-policy​​:将缓存侦听策略指定为 或 。 它是一个可选属性,其默认值为 。 侦听具有相同缓存事件属性的同一缓存对象的每个 Hazelcast 入站通道适配器都可以接收单个事件消息或所有事件消息。 如果是,则侦听具有相同缓存事件属性的同一缓存对象的所有 Hazelcast 入站通道适配器都将收到所有事件消息。 如果是,他们将收到唯一的事件消息。SINGLEALLSINGLEALLSINGLE

一些配置示例:

分布式地图

<int:channel id="mapChannel"/><int-hazelcast:inbound-channel-adapter channel="mapChannel" cache="map" cache-events="UPDATED, REMOVED" /><bean id="map" factory-bean="instance" factory-method="getMap"> <constructor-arg value="map"/></bean><bean id="instance" class="com.hazelcast.core.Hazelcast" factory-method="newHazelcastInstance"> <constructor-arg> <bean class="com.hazelcast.config.Config" /> </constructor-arg></bean>

分布式多地图

<int-hazelcast:inbound-channel-adapter channel="multiMapChannel" cache="multiMap" cache-events="ADDED, REMOVED, CLEAR_ALL" /><bean id="multiMap" factory-bean="instance" factory-method="getMultiMap"> <constructor-arg value="multiMap"/></bean>

分布式列表

<int-hazelcast:inbound-channel-adapter channel="listChannel" cache="list" cache-events="ADDED, REMOVED" cache-listening-policy="ALL" /><bean id="list" factory-bean="instance" factory-method="getList"> <constructor-arg value="list"/></bean>

分布式集

<int-hazelcast:inbound-channel-adapter channel="setChannel" cache="set" /><bean id="set" factory-bean="instance" factory-method="getSet"> <constructor-arg value="set"/></bean>

分布式队列

<int-hazelcast:inbound-channel-adapter channel="queueChannel" cache="queue" cache-events="REMOVED" cache-listening-policy="ALL" /><bean id="queue" factory-bean="instance" factory-method="getQueue"> <constructor-arg value="queue"/></bean>

分布式主题

<int-hazelcast:inbound-channel-adapter channel="topicChannel" cache="topic" /><bean id="topic" factory-bean="instance" factory-method="getTopic"> <constructor-arg value="topic"/></bean>

复制的地图

<int-hazelcast:inbound-channel-adapter channel="replicatedMapChannel" cache="replicatedMap" cache-events="ADDED, UPDATED, REMOVED" cache-listening-policy="SINGLE" /><bean id="replicatedMap" factory-bean="instance" factory-method="getReplicatedMap"> <constructor-arg value="replicatedMap"/></bean>

Java 配置示例:

以下示例显示了一个配置。 相同的配置可用于其他分布式数据结构(、、、、 和 ):​​DistributedMap​​​​IMap​​​​MultiMap​​​​ReplicatedMap​​​​IList​​​​ISet​​​​IQueue​​​​ITopic​​

@Beanpublic PollableChannel distributedMapChannel() { return new QueueChannel();}@Beanpublic IMap<Integer, String> distributedMap() { return hazelcastInstance().getMap("Distributed_Map");}@Beanpublic HazelcastInstance hazelcastInstance() { return Hazelcast.newHazelcastInstance();}@Beanpublic HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer() { final HazelcastEventDrivenMessageProducer producer = new HazelcastEventDrivenMessageProducer(distributedMap()); producer.setOutputChannel(distributedMapChannel()); producer.setCacheEventTypes("ADDED,REMOVED,UPDATED,CLEAR_ALL"); producer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE); return producer;}

榛子连续查询入站通道适配器

Hazelcast 连续查询允许侦听对特定地图条目执行的修改。 Hazelcast 连续查询入站通道适配器是一个事件驱动的通道适配器,它根据定义的谓词侦听相关的分布式映射事件。

@Beanpublic PollableChannel cqDistributedMapChannel() { return new QueueChannel();}@Beanpublic IMap<Integer, String> cqDistributedMap() { return hazelcastInstance().getMap("CQ_Distributed_Map");}@Beanpublic HazelcastInstance hazelcastInstance() { return Hazelcast.newHazelcastInstance();}@Beanpublic HazelcastContinuousQueryMessageProducer hazelcastContinuousQueryMessageProducer() { final HazelcastContinuousQueryMessageProducer producer = new HazelcastContinuousQueryMessageProducer(cqDistributedMap(), "surname=TestSurname"); producer.setOutputChannel(cqDistributedMapChannel()); producer.setCacheEventTypes("UPDATED"); producer.setIncludeValue(false); return producer;}

它支持六个属性,如下所示:

  • ​​channel​​:指定消息发送到的通道;
  • ​​cache​​:指定侦听的分布式映射引用。 命令的;
  • ​​cache-events​​:指定侦听的缓存事件。 作为其默认值的可选属性。 支持的值为 、、 和ADDEDADDEDREMOVEDUPDATEDEVICTEDEVICT_ALLCLEAR_ALL;
  • ​​predicate​​:指定用于侦听对特定映射条目执行的修改的谓词。 命令的;
  • ​​include-value​​:指定在连续查询结果中包含值和旧值。 可选为默认值;true
  • ​​cache-listening-policy​​:将缓存侦听策略指定为 或 。 可选,默认值为 。 每个侦听具有相同缓存事件属性的相同缓存对象的 Hazelcast CQ 入站通道适配器都可以接收单个事件消息或所有事件消息。 如果是,则侦听具有相同缓存事件属性的同一缓存对象的所有 Hazelcast CQ 入站通道适配器都将收到所有事件消息。 如果是,他们将收到唯一的事件消息。SINGLEALLSINGLEALLSINGLE

Hazelcast 群集监视器入站通道适配器

Hazelcast 群集监视器支持侦听在群集上执行的修改。 Hazelcast 群集监视器入站通道适配器是一个事件驱动的通道适配器,侦听相关的成员资格、分布式对象、迁移、生命周期和客户端事件:

@Beanpublic PollableChannel eventChannel() { return new QueueChannel();}@Beanpublic HazelcastInstance hazelcastInstance() { return Hazelcast.newHazelcastInstance();}@Beanpublic HazelcastClusterMonitorMessageProducer hazelcastClusterMonitorMessageProducer() { HazelcastClusterMonitorMessageProducer producer = new HazelcastClusterMonitorMessageProducer(hazelcastInstance()); producer.setOutputChannel(eventChannel()); producer.setMonitorEventTypes("DISTRIBUTED_OBJECT"); return producer;}

它支持以下三个属性:

  • ​​channel​​:指定消息发送到的通道;
  • ​​hazelcast-instance​​:指定用于侦听群集事件的 Hazelcast 实例引用。 这是一个强制性属性;
  • ​​monitor-types​​:指定侦听的监视器类型。 它是一个可选属性,是默认值。 支持的值为 、、、、。MEMBERSHIPMEMBERSHIPDISTRIBUTED_OBJECTMIGRATIONLIFECYCLECLIENT

Hazelcast 分布式 SQL 入站通道适配器

Hazelcast允许在分布式地图上运行分布式查询。 Hazelcast 分布式 SQL 入站通道适配器是一个轮询入站通道适配器。 它运行定义的分布式 sql 命令,并根据迭代类型返回结果。

@Beanpublic PollableChannel dsDistributedMapChannel() { return new QueueChannel();}@Beanpublic IMap<Integer, String> dsDistributedMap() { return hazelcastInstance().getMap("DS_Distributed_Map");}@Beanpublic HazelcastInstance hazelcastInstance() { return Hazelcast.newHazelcastInstance();}@Bean@InboundChannelAdapter(value = "dsDistributedMapChannel", poller = @Poller(maxMessagesPerPoll = "1"))public HazelcastDistributedSQLMessageSource hazelcastDistributedSQLMessageSource() { final HazelcastDistributedSQLMessageSource messageSource = new HazelcastDistributedSQLMessageSource(dsDistributedMap(), "name='TestName' AND surname='TestSurname'"); messageSource.setIterationType(DistributedSQLIterationType.ENTRY); return messageSource;}

它需要一个轮询器并支持四个属性:

  • ​​channel​​:指定消息发送到的通道。 这是一个强制性属性;
  • ​​cache​​:指定要查询的分布式引用。 它是必需属性;IMap
  • ​​iteration-type​​:指定结果类型。 分布式 SQL 可以在 、 或 上运行。 它是一个可选属性,是默认属性。 支持的值为 ,和EntrySetKeySetLocalKeySetValuesVALUEENTRY, `KEYLOCAL_KEYVALUE;
  • ​​distributed-sql​​:指定 sql 语句的 where 子句。 这是一个必需属性。

榛子出站通道适配器

Hazelcast 出站通道适配器侦听其定义的通道,并将传入消息写入相关的分布式缓存。 它需要 之一 或 用于分布式对象定义。 支持的分布式对象包括:、 和 。​​cache​​​​cache-expression​​​​HazelcastHeaders.CACHE_NAME​​​​IMap​​​​MultiMap​​​​ReplicatedMap​​​​IList​​​​ISet​​​​IQueue​​​​ITopic​​

@Beanpublic MessageChannel distributedMapChannel() { return new DirectChannel();}@Beanpublic IMap<Integer, String> distributedMap() { return hzInstance().getMap("Distributed_Map");}@Beanpublic HazelcastInstance hzInstance() { return Hazelcast.newHazelcastInstance();}@Bean@ServiceActivator(inputChannel = "distributedMapChannel")public HazelcastCacheWritingMessageHandler hazelcastCacheWritingMessageHandler() { HazelcastCacheWritingMessageHandler handler = new HazelcastCacheWritingMessageHandler(); handler.setDistributedObject(distributedMap()); handler.setKeyExpression(new SpelExpressionParser().parseExpression("payload.id")); handler.setExtractPayload(true); return handler;}

它需要以下属性:

  • ​​channel​​:指定消息发送到的通道;
  • ​​cache​​:指定分布式对象引用。 自选;
  • ​​cache-expression​​:通过 Spring 表达式语言 (SpEL) 指定分布式对象。 自选;
  • ​​key-expression​​:通过 Spring 表达式语言 (SpEL) 指定键值对的键。 可选且仅对 和分布式数据结构是必需的。IMapMultiMapReplicatedMap
  • ​​extract-payload​​:指定是发送整条消息还是仅发送有效负载。 默认属性为可选属性。 如果为 true,则只有有效负载将写入分布式对象。 否则,将通过转换消息头和有效负载来写入整个消息。true

通过在标头中设置分布式对象名称,可以通过同一通道将消息写入不同的分布式对象。 如果未定义 或 属性,则必须在请求中设置标头。​​cache​​​​cache-expression​​​​HazelcastHeaders.CACHE_NAME​​​​Message​​

榛子领袖选举

如果需要领导者选举(例如,对于只有一个节点应该接收消息的高可用性消息使用者),可以使用基于 Hazelcast 的:​​LeaderInitiator​​

@Beanpublic HazelcastInstance hazelcastInstance() { return Hazelcast.newHazelcastInstance();}@Beanpublic LeaderInitiator initiator() { return new LeaderInitiator(hazelcastInstance());}

当一个节点被选为领导者时,它将向所有应用程序侦听器发送一个。​​OnGrantedEvent​​

榛子消息存储

对于分布式消息传递状态管理,例如对于持久或跟踪消息组,提供了以下实现:​​QueueChannel​​​​Aggregator​​​​HazelcastMessageStore​​

@Beanpublic HazelcastInstance hazelcastInstance() { return Hazelcast.newHazelcastInstance();}@Beanpublic MessageGroupStore messageStore() { return new HazelcastMessageStore(hazelcastInstance());}

默认情况下,用于将消息和组存储为键/值。 任何自定义都可以提供给 .​​SPRING_INTEGRATION_MESSAGE_STORE​​​​IMap​​​​IMap​​​​HazelcastMessageStore​​

榛子广播元数据存储

可以使用后备 Hazelcast 来实现 。 默认地图是使用可自定义的名称创建的。​​ListenableMetadataStore​​​​IMap​​​​SPRING_INTEGRATION_METADATA_STORE​​

@Beanpublic HazelcastInstance hazelcastInstance() { return Hazelcast.newHazelcastInstance();}@Beanpublic MetadataStore metadataStore() { return new HazelcastMetadataStore(hazelcastInstance());}

该实现允许您注册自己的侦听器类型以通过 侦听事件。​​HazelcastMetadataStore​​​​ListenableMetadataStore​​​​MetadataStoreListener​​​​addListener(MetadataStoreListener callback)​​

榛子锁注册表

可以使用后备 Hazelcast 分布式支持来实现 a:​​LockRegistry​​​​ILock​​

@Beanpublic HazelcastInstance hazelcastInstance() { return Hazelcast.newHazelcastInstance();}@Beanpublic LockRegistry lockRegistry() { return new HazelcastLockRegistry(hazelcastInstance());}

当与共享(例如 存储管理),可用于跨多个应用程序实例提供此功能,以便一次只有一个实例可以操作组。​​MessageGroupStore​​​​Aggregator​​​​HazelcastLockRegistry​​

对于所有分布式操作,必须在 上启用 CP 子系统。​​HazelcastInstance​​

上一篇:没有了
下一篇:没有了
网友评论