当前位置 : 主页 > 编程语言 > java >

Redis数据类型8-9:GEO、Stream

来源:互联网 收集:自由互联 发布时间:2023-12-28
GEO GEO 是用来存储并操作地理位置信息的数据类型。 随着移动互联网时代到来, LBS 服务 (Location-Based Service) 愈发潮流,例如附近的建筑等。 GEO 就是为了解决 Redis 对位置信息的存储需求

GEO

GEO 是用来存储并操作地理位置信息的数据类型。

随着移动互联网时代到来,LBS服务 (Location-Based Service) 愈发潮流,例如附近的建筑等。

GEO 就是为了解决 Redis 对位置信息的存储需求而诞生的。

内部实现

GEO 内部使用 Sorted Set 集合类型,使用 [GeoHash](Geohash - Wikipedia) 编码方法实现了经纬度到 Sorted Set 中元素权重分数的转换。

将经纬度保存到 Sorted Set 中,利用 Sorted Set 提供的“按权重进行有序范围查找”的特性,实现 LBS 服务中频繁使用的“搜索附近”的需求。

# 向locations添加
GEOADD locations 116.034579 39.903628 114
# 从locations获取10个5km内目标
GEORADIUS locations 116.054579 39.040650 5 km ASC COUNT 10

Stream

Stream 是 Redis 专为 消息队列 设计的数据类型。

在 Stream 之前,基于原生 Redis 消息队列的实现方式都有着或多或少的缺陷:

  • 发布订阅模式,不能持久化也就无法可靠的保存消息,并且对于离线重连的客户端不能读取历史消息的缺陷;
  • List 实现消息队列的方式不能重复消费,一个消息消费完就会被删除,而且生产者需要自行实现全局唯一 ID。

基于此,Stream 用于完美地实现消息队列,它支持消息的持久化、支持自动生成全局唯一 ID、支持ack确认消息的模式、支持消费组模式。

常用命令

  • XADD:插入消息,保证有序,可以自动生成全局唯一 ID;
  • XLEN :查询消息长度;
  • XREAD:用于读取消息,可以按 ID 读取数据;
  • XDEL : 根据消息 ID 删除消息;
  • DEL :删除整个 Stream;
  • XRANGE :读取区间消息
  • XREADGROUP:按消费组形式读取消息;
  • XPENDING 和 XACK:
    • XPENDING 命令可以用来查询每个消费组内所有消费者「已读取、但尚未确认」的消息;
    • XACK 命令用于向消息队列确认消息处理已完成;

应用场景: 消息队列

普通消息队列

生产者向队列发布消息:

# * 代表由Redis自动生成一个全局唯一ID
> XADD mymq * name xiaolin
"1654254953808-0"
# 返回为生成的唯一ID,由"-"符号分为两部分
# 前一部分为数据插入时,以毫秒为单位计算的当前服务器时间
# 后一部分为当前毫秒的消息序号,从0开始编号

消费者读取消息:

# 从 ID 号为 1654254953807-0 的消息开始,读取后续的所有消息(示例中一共 1 条)
> XREAD STREAMS mymq 1654254953807-0
1) 1) "mymq"
   2) 1) 1) "1654254953808-0"
         2) 1) "name"
            2) "xiaolin"      
# 阻塞读取,表示10000毫秒阻塞读取最新消息($)
> XREAD BLOCK 10000 STREAMS mymq $
(nil)
(10.00s)

消费组

相较于 List,Stream 有自己新的概念:消费组。创建组之后,使用 XREADGROUP 命令让组内的消费者读取消息。

# 命令最后的参数“>”,表示从第一条尚未被消费的消息开始读取。
> XREADGROUP GROUP group1 consumer1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1654254953808-0"
         2) 1) "name"
            2) "xiaolin"

消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了,即同一个消费组里的消费者不能消费同一条消息。

但是,**不同消费组的消费者可以消费同一条消息(但是有前提条件,创建消息组的时候,不同消费组指定了相同位置开始读取消息)**。

使用消费组的目的是让组内的多个消费者共同分担读取消息。

因此,通常会让每个消费者读取部分消息,从而实现消息读取的负载均衡。

消费者宕机重启

Streams 使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 “消息已经处理完成”。

如果消费者没有成功处理消息,它就不会给 Streams 发送 XACK 命令,消息仍然会留存。此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。

消息丢失问题

消息队列分为三个大部分生产者、消费者和队列中间件。

逐一分析:

  • 生产者不会丢失消息,只要能正常收到 ack 确认响应,就表示发送成功,对异常进行重发就可以避免消息丢失。
  • 消费者也不会丢失消息,前文提到,Redis 在使用 Pending List 进行消费确认,所以消费者也不会丢失消息。
  • 队列中间件,在语句场景下是 Redis 会 丢失消息,主要在以下场景:
    • AOF持久化时异步写盘,宕机时会丢失消息。
    • 主从复制也是异步操作,也可能丢失消息。
上一篇:102. 二叉树的层序遍历
下一篇:没有了
网友评论