Redis 7.x 系列【14】数据类型之流(Stream)

有道无术,术尚可求,有术无道,止于术。

本系列Redis 版本 7.2.5

源码地址:https://gitee.com/pearl-organization/study-redis-demo

文章目录

    • 1. 概述
    • 2. 常用命令
      • 2.1 XADD
      • 2.2 XRANGE
      • 2.3 XREVRANGE
      • 2.4 XDEL
      • 2.5 XLEN
      • 2.6 XREAD
      • 2.7 XGROUP CREATE
      • 2.8 XACK
      • 2.9 XPENDING
    • 3. 应用场景

1. 概述

消息队列:是指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递,生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由服务端给其推送消息。

Redis 也支持消息队列功能,在 5.0 版本之前,基于以下两种方式实现:

  • Pub/Sub
  • List

Pub/Sub 发布订阅模式,消息的发送者不会将消息直接发送给特定的接收者,而是通过消息通道广播出去,让订阅该消息主题的订阅者消费到:

在这里插入图片描述
Pub/Sub 中的消息无法持久化,如果出现网络断开、宕机等,消息就会被丢弃。而且也没有 Ack 机制来保证数据的可靠性,假设一个消费者都没有,那消息就直接被丢弃了。

Redis List 也可以实现消息队列,按照插入顺序排序,可以添加一个元素到列表的头部(左边)或者尾部(右边)。 将需要延后处理的任务结构体序列化成字符串塞进 Redis 的列表,另一个线程从这个列表中轮询数据进行处理:
在这里插入图片描述
Redis List 同样存在诸多问题,比如,不支持多消费者模式,不支持延时消息,不支持优先级,不支持消息确认机制等等。

Redis Stream5.0 版本中引入的一种新的数据结构,用于实现简单但功能强大的消息传递模式。以时间序列的方式存储消息,每个消息都有一个唯一的 ID ,并且可以被多个消费者订阅和消费。是 Redis 实现消息队列的另外一种模式,支持消息的持久化、支持自动生成全局唯一 1D、支持 Ack 确认消息模式、支持消费组模式等,旨在让消息队列更加的稳定和可靠。

其结构图如下:
在这里插入图片描述
各部分解释:

  • Message Content:消息内容
  • Consumer group:消费组,通过 XGROUP CREATE 命令创建,同一个消费组可以有多个消费者
  • Last_delivered_id:游标,每个消费组会有个游标 Last_delivered_id,任意一个消费者读取了消息都会使游标往前移动。
  • Consumer:消费者,消费组中的消费者
  • Pending_ ids:消费者会有一个状态变量,用于记录被当前消费已读取但未 ack 的消息 Id ,如果客户端没有 ack ,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack 它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为 PEL(Pending Entries List),记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符),它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理

2. 常用命令

Stream 相关所有命令:

命名描述
XACK确认消费者已经成功处理从 Stream 中获取的消息
XADD添加消息到队列末尾
XAUTOCLAIM转移符合指定条件的待处理流条目的所有权
XCLAIM改变待处理消息的所有权
XDEL删除消息
XGROUP CREATE为存储在 key 的流创建一个新的消费者组
XGROUP CREATECONSUMER要在存储在key的流的消费者组中创建一个消费者
XGROUP DELCONSUMER消费者组中删除一个消费者
XGROUP DESTROY删除一个已存在的消费者组
XGROUP SETID为消费者组设置最后传递的ID
XINFO CONSUMERS返回消费者组中的消费者列表
XINFO GROUPS返回消费者组列表
XINFO STREAM存储在的key流的相关信息
XLEN获取 Stream 中的消息长度
XPENDING通过消费者组从流中获取数据但不确认这些数据,会产生待处理条目
XRANGE获取消息列表(可以指定范围)
XREAD获取消息(阻塞/非阻塞),返回大于指定 ID 的消息
XREADGROUPXREAD命令的一个特殊版本,支持消费者组
XREVRANGEXRANGE 相比区别在于反向获取,ID从大到小
XSETID内部命令。它用于主节点来复制流的最后传递的ID
XTRIM限制 Stream 的长度,如果已经超长会进行截取

2.1 XADD

XADD 命令用于向 Stream(流)数据结构末尾添加消息。

语法格式:

XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]

参数说明:

  • key:指定要添加消息的 Stream 的名称。
  • [NOMKSTREAM]:可选参数,用于指定当流不存在时是否报错。默认情况下,如果指定的流不存在,XADD命令会创建。如果使用NOMKSTREAM选项,则流不存在时命令会失败。
  • [MAXLEN|MINID [=|~] threshold [LIMIT count]]:这组选项用于控制流的最大长度或最小消息 ID
    • MAXLEN maxlen:限制 Stream 的最大长度。当长度达到maxlen时,旧的消息会被自动删除。
    • MINID minid:指定最旧的消息ID。当插入新消息时,如果已经存在比minid更旧的消息,则会将这些消息删除。
    • [=|~]:操作符,=表示精确匹配,~表示小于等于(对于MINID而言)或大于等于(对于MAXLEN而言)。
    • [LIMIT count]:当使用MAXLEN~时,指定需要保留的消息数量的最小值。
  • *|ID:消息的ID。使用*表示自动生成一个唯一的ID。如果不使用*,则需要提供一个有效的消息ID,它必须大于流中所有已存在的消息的ID
  • field value [field value ...]:消息的字段和值。可以指定一个或多个字段及其对应的值。

示例,插入消息:

localhost:0>XADD mystream * msg_1 100 msg_2 38
"1719279960591-0"

示例, 插入消息,并限制长度不超过 1000 条:

localhost:0>XADD mystream MAXLEN 1000 * msg_3 100 msg_4 38
"1719279971749-0"

查看控制台:

在这里插入图片描述

2.2 XRANGE

XRANGE 命令用于获取指定范围内的消息。

命令格式:

XRANGE key start end [COUNT count]

参数说明:

  • key:指定 Streamkey
  • start:指定要检索的消息范围的起始 ID 。可以使用特殊值-来表示最小值。
  • end:指定要检索的消息范围的结束 ID 。可以使用特殊值+来表示最大值。
  • [COUNT count]:可选参数,用于限制返回的消息数量。

注意事项:

  • Stream 的消息 ID 由两部分组成:一个时间戳和一个序列号。时间戳表示消息被添加到 Stream 的时间,而序列号则用于在同一时间戳内区分不同的消息。
  • XRANGE 命令返回的消息是按照它们在 Stream 中的顺序排列的,即按照消息 ID 的顺序。
  • 如果在检索消息时使用了 COUNT 参数,但指定的范围内的消息数量少于 COUNT 指定的数量,那么只会返回范围内的所有消息。

示例,检索所有消息:

localhost:0>XRANGE mystream - +
 1)    1)   "1719279960591-0"
  2)      1)    "msg_1"
   2)    "100"
   3)    "msg_2"
   4)    "38"
 2)    1)   "1719279971749-0"
  2)      1)    "msg_3"
   2)    "100"
   3)    "msg_4"
   4)    "38"

示例,检索特定范围内的消息:

localhost:0>XRANGE mystream  1719279960591-0 1719279960600-0
 1)    1)   "1719279960591-0"
  2)      1)    "msg_1"
   2)    "100"
   3)    "msg_2"
   4)    "38"

示例,限制返回的消息数量:

localhost:0>XRANGE mystream - + COUNT 1
 1)    1)   "1719279960591-0"
  2)      1)    "msg_1"
   2)    "100"
   3)    "msg_2"
   4) 

2.3 XREVRANGE

XREVRANGE 命令与 XRANGE 命令类似,但它是按照消息 ID 的递减顺序(用于反向)获取指定范围内的消息。

命令格式:

XREVRANGE key end start [COUNT count]

示例,检索最后两个时间序列的消息:


localhost:0>XREVRANGE mystream + - COUNT 2
 1)    1)   "1719279971749-0"
  2)      1)    "msg_3"
   2)    "100"
   3)    "msg_4"
   4)    "38"

 2)    1)   "1719279960591-0"
  2)      1)    "msg_1"
   2)    "100"
   3)    "msg_2"
   4)    "38"

2.4 XDEL

XDEL 命令用于从 Stream 中删除指定的消息。返回一个整数,表示被成功删除的消息数量。

命令格式:

XDEL key ID [ID ...]

参数说明:

  • key:指定 Streamkey
  • ID:一个或多个要删除的消息的 ID

注意事项:

  • 在使用 XDEL 命令时,你需要确保提供的消息 ID 是存在的,否则命令将不会删除任何消息,并返回0。
  • 可以通过一次 XDEL 命令删除多个消息,只需在命令中提供多个消息 ID 即可。
  • XDEL 命令不会改变 Stream 的其他消息的顺序或 ID

示例,删除消息:

localhost:0>XDEL mystream 1719280747405-0
"1"

2.5 XLEN

XLEN 命令用于获取指定 Stream 中包含的消息数量,即流的长度。如果 Stream 不存在或为空,则返回 0

命令格式:

XLEN key

示例:

localhost:0>XLEN mystream
"1"

2.6 XREAD

XREAD 命令是用于从 Stream 独立消费消息,支持阻塞等待新消息的到来。返回一个数组,其中每个元素都是一个包含 Stream key 和消息列表的数组。消息列表是一个包含消息 ID 和消息数据的数组。

命令格式:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

参数说明:

  • COUNT count:指定一次读取的最大消息数量。如果未指定,则默认为 1
  • BLOCK milliseconds:用于指定阻塞的时间(以毫秒为单位)。如果指定了此参数,并且 Stream 中没有可消费的消息,客户端将在指定的时间内阻塞等待。如果设置为 0 ,则表示非阻塞模式,即如果没有消息可消费,则立即返回。
  • STREAMS key [key ...]:指定要从中读取消息的 Streamkey 。可以指定一个或多个。
  • ID [ID ...]:对于每个指定的 key ,可以提供一个或多个消息 ID 。这些 ID 用于指定从哪个位置开始读取消息。如果某个 key 后面没有指定 ID ,则默认为从该 Stream 的最新消息开始读取。

示例,非阻塞模式读取最新消息:

XREAD COUNT 1 STREAMS mystream $

示例,阻塞模式,读取最新消息并等待新消息:

XREAD COUNT 1 BLOCK 10000 STREAMS mystream $

2.7 XGROUP CREATE

XGROUP CREATE 命令用于在已存在的流(stream)上创建一个新的消费者组(consumer group)。消费者组允许多个消费者(consumer)协作消费同一个流中的数据,并且每个消费者都可以从自己的位置开始读取流。

命令格式:

XGROUP CREATE <key> <groupname> <id> [MKSTREAM] [MKTABLE] [CREATECONSUMER <consumername>]

参数说明:

  • <key>:流的名称。
  • <groupname>:消费者组的名称。
  • <id>:消费者组初始的最后一个条目 ID ,即消费者组开始读取的起始点。可以使用$表示流的最新条目,或者使用0表示流的起始点,或者使用任何其他有效的 ID
  • [MKSTREAM]:可选参数,如果流不存在,则创建它。
  • [MKTABLE]:在 Redis 6.2 及更高版本中引入的可选参数,用于创建与流关联的二级索引表(secondary index table)。这通常用于支持基于特定字段的查询。
  • [CREATECONSUMER <consumername>]:在 Redis 6.2 及更高版本中引入的可选参数,用于在创建消费者组时同时创建一个消费者。

示例,创建一个新的消费者组,从流的最新条目开始读取:

localhost:0>XGROUP CREATE mystream mygroup $ MKSTREAM
"OK"

2.8 XACK

XACK 命令用于确消费者已经成功处理了一个或多个消息。这些消息通常是从流(Stream)中读取的,并存储在消费者组的待处理条目列表(Pending Entry ListPEL)中。通过发送 XACK 命令,消费者通知 Redis 服务器它已经完成了一个或多个消息的处理,从而将这些消息从 PEL 中移除。

命令格式:

XACK <key> <groupname> <consumername> <ID> [<ID> ...]

参数说明:

  • <key>:流的名称。
  • <groupname>:消费者组的名称。
  • <consumername>:消费者的名称。
  • <ID>:要确认的消息的ID,可以指定一个或多个。

示例,假设消费者已经读取了一些消息,并决定它们已经被成功处理。现在,消费者想要确认这些消息:

XACK mystream mygroup myconsumer 1526569900000-0 1526569900002-0

在这个例子中,消费者确认了两个消息,它们的 ID 分别是 1526569900000-01526569900002-0

一旦消息被确认,它们将从该消费者组的 PEL 中移除,表示这些消息已经被成功处理。注意,即使消息被确认并从 PEL 中移除,它们仍然保留在流中,并且可以被其他消费者组或消费者读取。

如果消费者在处理消息时失败,或者需要稍后重试,它可以选择不发送 XACK 命令,这样消息将保持在 PEL 中,直到消费者准备好确认它们或它们因超时而被自动从 PEL 中移除(取决于消费者组的配置)。

2.9 XPENDING

XPENDING 命令用于查询消费者组中未确认消息的详细信息。允许你查看哪些消息正在等待被处理,以及哪些消费者拥有这些消息。

命令格式:

XPENDING <key> <groupname> [start end count] [consumername]

参数说明:

  • <key>:流的名称。
  • <groupname>:消费者组的名称。
  • [start end count]:这三个参数是可选的,用于限制查询结果的范围。
  • start:查询的开始消息ID
  • end:查询的结束消息ID
  • count:要返回的消息数量。
  • [consumername]:可选参数,指定要查询的消费者的名称。如果不提供此参数,将返回消费者组中的所有未确认消息。

XPENDING 命令返回一个数组,其中包含以下信息:

  • 总未确认消息数:整数,表示在指定范围内未确认的消息总数。
  • 最小消息ID:字符串,表示在指定范围内未确认消息的最小ID
  • 最大消息ID:字符串,表示在指定范围内未确认消息的最大ID
  • 每个消费者的未确认消息:一个数组,其中每个元素都是一个包含消费者名称和该消费者拥有的未确认消息数的数组。

注意事项:

  • XPENDING 是一个只读命令,它不会修改任何数据。
  • 如果提供了 consumername 参数,则只返回该消费者的未确认消息信息。
  • 如果提供了 [start end count] 参数,则只返回指定范围内的未确认消息信息。
  • 通过 XPENDING 命令,你可以轻松地监控消费者组中的未确认消息,从而确保消息得到及时处理,并在必要时进行故障排除。

示例:

XPENDING mystream mygroup
2) "1526569900000-0"  # 最小消息ID  
3) "1526569900002-0"  # 最大消息ID  
4) 1) 1) "myconsumer" # 消费者名称  
     2) (integer) 2   # 该消费者拥有的未确认消息数

3. 应用场景

Redis Stream 主要用于消息队列,所以可以用来解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。但是更推荐使用专业的消息队列,比如RabbitMQRocketMQ等,对于简单的应用场景,如果能满足需求,也可以使用Redis Stream

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/764797.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

240702_昇思学习打卡-Day14-基于MindNLP+MusicGen生成自己的个性化音乐

240702_昇思学习打卡-Day14-基于MindNLPMusicGen生成自己的个性化音乐 前面一致做的都是图像的&#xff0c;可视化比较强&#xff0c;可以有比较多的图片帮助理解&#xff0c;但今天这个是关于音频的生成&#xff0c;基本只有干巴巴的代码&#xff0c;我尽量描述清楚些。相关研…

Python语言接入关键词搜索商品api疑点解析

接入关键词搜索商品API通常需要以下步骤&#xff1a; 了解API文档&#xff1a;首先&#xff0c;你需要阅读API的文档&#xff0c;了解API的基本功能、请求方式&#xff08;GET、POST等&#xff09;、请求参数、返回数据格式等信息。 安装必要的库&#xff1a;根据API的要求&am…

文件销毁是一件非常重要的事情分享一下我是如何安全、环保地处理

如何安全有效地销毁文件&#xff1a;一份详尽指南 在信息爆炸的时代&#xff0c;文件的生成、存储与处理已成为日常生活和工作中不可或缺的一部分。然而&#xff0c;随着数据量的激增&#xff0c;如何妥善管理并最终安全销毁不再需要的文件&#xff0c;成为了一个日益重要的议…

ListBox自动滚动并限制显示条数

1、实现功能 限制ListBox显示的最大条数&#xff1b; ListBox自动滚动&#xff0c;显示最新行&#xff1b; 2、C#代码 using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.IO; using Syst…

JSP实现简单的登录和注册

JSP实现登录和注册&#xff08;Map集合模拟数据库&#xff09; 1、login.jsp2、 loginSelect.jsp3、register.jsp4、 RegisterSelect.jsp5、 index.jsp 1、login.jsp login.jsp中username和password在LoginSelect.jsp验证是否一致使用session.setAttribute("login_msg&quo…

职场小白必备待办工具有哪些 适合新手的待办app

初入职场的小白们&#xff0c;常常会遇到各种挑战。从最初的迷茫&#xff0c;到对工作的逐步熟悉&#xff0c;每一步都需要时间和精力的投入。尤其是当面对繁多的工作任务时&#xff0c;如何快速有效地完成它们&#xff0c;成为了许多职场新人需要面对的问题。 在这个快节奏的…

nginx.conf的配置文件

nginx.conf 1.全局模块 worker_processes 1 工作进程数&#xff0c;设置成服务器内核数的2倍&#xff08;一般不超过8个&#xff0c;超过8个会降低性能4个 1-2个&#xff09; 处理进程的过程必然涉及配置文件和展示页面&#xff0c;也就是涉及打开文件的数量。 linux默认打…

Ceyear®VSA 信号分析软件

CeyearVSA 信号分析软件 CeyearVSA 矢量信号分析软件 CeyearVSA 矢量信号分析软件将信号分析体验和测试应用于桌面&#xff0c;帮助排查问题并优化设计。 CeyearVSA 矢量信号分析软件结合仪表支持在线解调分析&#xff0c;也可支持信号导入离线分析&#xff1b;软件具有多种…

Python实现万花筒效果:创造炫目的动态图案

文章目录 引言准备工作前置条件 代码实现与解析导入必要的库初始化Pygame定义绘制万花筒图案的函数主循环 完整代码 引言 万花筒效果通过反射和旋转图案创造出美丽的对称图案。在这篇博客中&#xff0c;我们将使用Python来实现一个动态的万花筒效果。通过利用Pygame库&#xf…

mac有什么解压软件 mac怎么解压7z软件 苹果电脑好用的压缩软件有哪些

众所周知&#xff0c;macOS集成解压缩软件归档实用工具&#xff0c;可直接解压zip格式的压缩包。但对于其他比较常见的格式&#xff0c;诸如RAR、7z、TAR等&#xff0c;则无能为力&#xff0c;不过&#xff0c;我们可以选择大量第三方解压缩软件&#xff0c;帮助我们更好地完成…

数据库取出来的日期格式是数组格式,序列化日期格式

序列化前&#xff0c;如图所示&#xff1a; 解决方式&#xff0c;序列化日期&#xff08;localdatetime&#xff09;格式 步骤一、添加序列化类 package com.abliner.test.common.configure;import com.alibaba.fastjson.serializer.JSONSerializer; import com.alibaba.fas…

虚拟纪念展馆建设的重大意义:重新定义纪念活动的未来

一、什么是虚拟纪念展馆&#xff1f; 虚拟纪念展馆是一种利用3D、VR等技术在线展示历史事件、人物或文化遗产的数字化空间。这些展馆通过虚拟现实、增强现实和3D建模等技术手段&#xff0c;创建出身临其境的体验&#xff0c;使参观者可以在互联网上以互动方式探索和学习。 二、…

最快33天录用!一投就中的医学4区SCI,几乎不退稿~

【SciencePub学术】今天小编给大家推荐2本生物医学领域的SCI&#xff0c;此期刊为我处目前合作的重点期刊&#xff01;影响因子0-3.0之间&#xff0c;最重要的是审稿周期较短&#xff0c;对急投的学者较为友好&#xff01; 医学医药类SCI 01 / 期刊概况 【期刊简介】IF&…

2025年U.S.News世界大学排名前200榜单

近日&#xff0c;U.S. News公布了2025全球最佳院校排名&#xff0c;作为公认的四大世界高校排行榜&#xff0c;该排名主要围绕着学术声誉、学术成果等&#xff0c;因此备受访问学者、联合培养博士生及博士后申请者们青睐&#xff0c;知识人网小编特作介绍并发布排名前200的榜单…

Springboot整合Redis以及业务工具类示例

docker安装Redis参考我另一篇博客Docker安装Redis及持久化 一、Get-Started 依赖 <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-redis --> <dependency><groupId>org.springframework.boot</groupId>…

浏览器自动填充登录用户名和密码,如何清除

文章目录 刷新网页的时候浏览器会自动填充用户名和密码刷新之后效果图解决方案完整的login.vue代码核心代码原理(添加 readonly 和监听 focus 事件) 刷新网页的时候浏览器会自动填充用户名和密码 刷新之后效果图 解决方案 完整的login.vue代码 <template><div class…

MATLAB将两个折线图画在一个图里

界面如图 输入行数和列数&#xff0c;点击开始填入数据&#xff0c;其中第一列为x值&#xff0c;后面几列&#xff0c;每一列都是y坐标值&#xff0c;填好后点击画在同一张图里即可。点击置零就把所有数变成0&#xff0c;另外也可以选择节点样式。 .mlapp格式的文件如下 夸克…

助力消费品牌,打造3D立体互动购物体验!

随着3D、VR、AR等技术的飞跃发展&#xff0c;传统网络购物的界限被彻底打破&#xff0c;消费者不再局限于文字、图片或视频的二维体验&#xff0c;而是能够沉浸于3D构建的立体世界中&#xff0c;享受3D看房、虚拟逛街的全新购物模式。 51建模网作为国内3D互动展示领域的佼佼者…

如果这时你还不清理C盘,那只能眼睁睁看着电脑越来越卡 直到系统崩溃

如果这时候你还不清理C盘&#xff0c;那只能眼睁睁看着电脑越来越卡 直到系统崩溃。很多人就是想偷懒&#xff0c;当然这是人的天性&#xff0c;明明知道自己的C盘空间就那么大&#xff0c;一天天看着C盘空间越来越小&#xff0c;还不去清理C盘。 这样的人有两种&#xff0c;一…

Hadoop3:Yarn常用Shell命令

一、查看任务 1、查看所有任务 yarn application -list2、根据状态查看任务 语法 yarn application -list -appStates &#xff08;所有状态&#xff1a;ALL、NEW、NEW_SAVING、SUBMITTED、ACCEPTED、RUNNING、FINISHED、FAILED、KILLED&#xff09;例如 yarn application…