阿里云首页 金融分布式架构 SOFAStack

收发延时消息

本文提供使用 TCP 协议下的 Java SDK 收发延时消息的示例代码供您参考。

前提条件

您已完成以下操作:

背景信息

延时消息用于指定消息发送到消息队列的服务端后,延时一段时间才被投递到客户端进行消费(例如 3 秒后才被消费),适用于解决一些消息生产和消费有时间窗口要求的场景,或者通过消息触发延迟任务的场景,类似于延迟队列。

延时消息的概念介绍及使用过程中的注意事项,请参见 消息类型 > 定时和延时消息

说明

对于新手用户,建议在正式收发消息前,阅读 Demo 工程来了解搭建消息队列工程的具体步骤。

发送延时消息

具体的示例代码,请以 消息队列代码库 为准。

发送延时消息的示例代码如下。

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import com.alipay.sofa.sofamq.client.PropertyKeyConst;
import io.openmessaging.api.Message;
import io.openmessaging.api.MessagingAccessPoint;
import io.openmessaging.api.OMS;
import io.openmessaging.api.OMSBuiltinKeys;
import io.openmessaging.api.Producer;
import io.openmessaging.api.SendResult;

public class DelayProducerTest {
    public static void main(String... args) {
        Properties credentials = new Properties();
        // AccessKeyId 阿里云身份验证,在阿里云服务器管理控制台创建
        credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "$accessKey");
        // AccessKeySecret 阿里云身份验证,在阿里云服务器管理控制台创建
        credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "xxxxx");
        // 设置 TCP 接入域名,进入控制台的概览页面查看接入点配置
        MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint")
                .withCredentials(credentials).build();
        Properties properties = new Properties();
        // 设置用户实例,进入控制台的概览页面查看接入点配置
        properties.setProperty(PropertyKeyConst.INSTANCE_ID, "$instanceId");
        properties.setProperty(PropertyKeyConst.GROUP_ID, "YOUR_GROUP");
        Producer producer = accessPoint.createProducer(properties);
        producer.start();
        Message message = new Message("$topic", "YOUR_TAG", "hello world".getBytes());
        // 延时消息,单位毫秒(ms),在指定延迟时间(当前时间之后)进行投递,例如消息在 5 秒后投递
        message.setStartDeliverTime(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(5));
        SendResult sendResult = producer.send(message);
        System.out.println(sendResult);
    }
}

订阅延时消息

延时消息的订阅与普通消息订阅一致,详情请参见 订阅消息