文档

延时消息

更新时间:
一键部署

如果您希望消息被投递后延迟一段时间被消费者消费,您可以使用云消息队列 RabbitMQ 版的延时消息。云消息队列 RabbitMQ 版原生支持延时消息,使用方式比开源RabbitMQ更简单。

什么是延时消息

延时消息是指在指定时间段之后才被消费者消费的消息。

应用场景

延时消息适用于以下场景:

  • 对消息生产和消费有时间窗口要求的场景。例如,在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。

  • 通过消息触发延时任务的场景。例如,在指定时间段之后向用户发送提醒消息。

延时时间设置规则

  • 延时时间的值必须为非负整数。单位为毫秒。

  • 若延时时间超过延时消息最大延时时间,则当作普通消息处理,即消息发送后被立即投递给消费者消费。最大延时时间根据实例类型不同有所差异,具体数值,请参见集群限制

  • 若您为延时消息设置了消息存活时间,则延时消息的实际存活时间=min{消息级存活时间, Queue级别存活时间}+延时时间。消息存活时间的具体信息,请参见消息存活时间

方案对比

云消息队列 RabbitMQ 版和开源RabbitMQ支持的延时消息实现方案对比如下:

项目

开源RabbitMQ

云消息队列 RabbitMQ 版

死信Exchange+Queue的消息存活时间

支持

支持

死信Exchange+消息的消息存活时间

支持

支持

开源延时消息插件方案

支持

支持

原生延时消息方案(推荐)

不支持

支持

原生延时消息方案

云消息队列 RabbitMQ 版通过对消息设置delay来实现延时效果。云消息队列 RabbitMQ 版原生延时消息的流转过程如下:

  1. 生产者向Exchange发布设置了delay属性的消息。

  2. Exchange将消息路由至Queue。

  3. 在设置的delay时间到期后,消费者才能从Queue消费消息。

原生延时消息最佳实践

  • 生产者客户端

    云消息队列 RabbitMQ 版原生延时消息的使用方式非常简单。您只需要在生产者客户端发布消息时,通过delay为消息设置一个延时时间。

    发布延时消息的Java示例代码如下:

    Map<String, Object> headers = new HashMap<>();
    headers.put("delay", "5000");//表示消息延时5000毫秒。
    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).headers(headers).build();

    更多语言示例代码,请参见AMQP Demos

  • 消费者客户端

    为保证延时消息时效性,建议您在消费消息时使用push模式的basic.consume方法,而不要使用pull模式的basic.get方法。因为云消息队列 RabbitMQ 版的消息是分布式存储的,如果您使用pull模式的basic.get方法获取消息,并不能保证正好从存储的节点获取消息。

开源延时消息插件方案

为了减少与开源RabbitMQ的差别,云消息队列 RabbitMQ 版也基于原生的延时消息支持使用开源插件式的方式来使用延时消息,并免去插件的安装。具体使用流程如下:

  1. 声明x-delayed-message类型的Exchange,并填写该Exchange的扩展参数x-delayed-type以指定Exchange的路由类型。示例如下:

    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-delayed-type", "direct");
    channel.exchangeDeclare("ExchangeName", "x-delayed-message", true, false, args);

    参数说明如下:

    参数

    说明

    x-delayed-type

    Exchange的类型,指定路由规则。取值说明如下:

    • direct

    • fanout

    • topic

    • headers

    • x-jms-queue

    • x-jms-topic

    ExchangeName

    Exchange的名称。

    说明

    请确保声明的Exchange已存在。具体步骤,请参见Exchange管理

    x-delayed-message

    指定Exchange类型,以支持投递延时消息。

  2. 发送延时消息。在消息的Header属性中增加一个键为x-delay,值为毫秒数的键值对,并且指定发送的目标Exchange为上一步已声明的Exchange。示例如下:

    byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
    Map<String, Object> headers = new HashMap<String, Object>();
    headers.put("x-delay", 5000);//表示消息延时5000毫秒。
    AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
    channel.basicPublish("ExchangeName", "", props.build(), messageBodyBytes);

常见问题

为什么实际的延时时间大于设置的延时时间?

因为客户端使用了pull模式的basic.get方法消费消息。云消息队列 RabbitMQ 版的消息是集群存储的,使用pull模式的basic.get方法路由到一台Broker时,可能无法及时拉取存储在其他Broker上的消息。

  • 本页导读 (1)
文档反馈