PolarDB提供消息排队和消息处理。用户定义的消息存储在队列中;队列的集合存储在队列表中。DBMS_AQADM包中的存储过程创建并管理消息队列和队列表。使用DBMS_AQ包可添加消息到队列或者从队列中删除消息,或者注册或注销PL/SQL回调过程。

PolarDB还通过如下SQL命令为DBMS_AQ 包提供扩展(不兼容)功能。

  • ALTER QUEUE
  • ALTER QUEUE TABLE
  • CREATE QUEUE
  • CREATE QUEUE TABLE
  • DROP QUEUE
  • DROP QUEUE TABLE

DBMS_AQ 包为您提供让消息入队/出队和管理回调过程的过程。支持的存储过程包括:

函数/存储过程 返回类型 说明
ENQUEUE n/a 发布消息到队列。
DEQUEUE n/a 如果有消息可用或者在消息可用时,从队列检索消息。
REGISTER n/a 注册回调过程。
UNREGISTER n/a 注销回调过程。

PolarDB的DBMS_AQ 实施与 Oracle 的版本相比是部分实施。仅支持上表中列出的那些存储过程。

PolarDB支持使用下面列出的常量:

常量 说明 用于参数
DBMS_AQ.BROWSE (0) 读取消息而不锁定。 dequeue_options_t.dequeue_mode
DBMS_AQ.LOCKED (1) 此常量已定义,如果使用会返回错误。 dequeue_options_t.dequeue_mode
DBMS_AQ.REMOVE (2) 读取之后删除消息;该参数为默认值。 dequeue_options_t.dequeue_mode
DBMS_AQ.REMOVE_NODATA (3) 此常量已定义,如果使用会返回错误。 dequeue_options_t.dequeue_mode
DBMS_AQ.FIRST_MESSAGE (0) 返回与搜索词匹配的第一个可用消息。 dequeue_options_t.navigation
DBMS_AQ.NEXT_MESSAGE (1) 返回与搜索词匹配的下一个可用消息。 dequeue_options_t.navigation
DBMS_AQ.NEXT_TRANSACTION (2) 此常量已定义,如果使用会返回错误。 dequeue_options_t.navigation
DBMS_AQ.FOREVER (0) 如果找不到与搜索词匹配的消息,则持续等待,该参数为默认值。 dequeue_options_t.wait
DBMS_AQ.NO_WAIT (1) 如果找不到与搜索词匹配的消息,则不等待。 dequeue_options_t.wait
DBMS_AQ.ON_COMMIT (0) 出队是当前事务的一部分。 enqueue_options_t.visibility,dequeue_options_t.visibility
DBMS_AQ.IMMEDIATE (1) 此常量已定义,如果使用会返回错误。 enqueue_options_t.visibility,dequeue_options_t.visibility
DBMS_AQ.PERSISTENT (0) 此消息应存储在表中。 enqueue_options_t.delivery_mode
DBMS_AQ.BUFFERED (1) 此常量已定义,如果使用会返回错误。 enqueue_options_t.delivery_mode
DBMS_AQ.READY (0) 指定消息已经准备好进行处理。 message_properties_t.state
DBMS_AQ.WAITING (1) 指定消息正在等待处理。 message_properties_t.state
DBMS_AQ.PROCESSED (2) 指定消息已处理。 message_properties_t.state
DBMS_AQ.EXPIRED (3) 指定消息处于异常队列中。 message_properties_t.state
DBMS_AQ.NO_DELAY (0) 此常量已定义,如果使用会返回错误。 message_properties_t.delay
DBMS_AQ.NEVER (NULL) 此常量已定义,如果使用会返回错误。 message_properties_t.expiration
DBMS_AQ.NAMESPACE_AQ (0) 接受来自 DBMS_AQ 队列的通知。 sys.aq$_reg_info.namespace
DBMS_AQ.NAMESPACE_ANONYMOUS (1) 此常量已定义,如果使用会返回错误。 sys.aq$_reg_info.namespace

ENQUEUE

ENQUEUE 存储过程将一个条目添加到队列。特征为:

ENQUEUE(
  queue_name IN VARCHAR2,
  enqueue_options IN DBMS_AQ.ENQUEUE_OPTIONS_T,
  message_properties IN DBMS_AQ.MESSAGE_PROPERTIES_T,
  payload IN <type_name>,
  msgid OUT RAW)

参数

  • queue_name

    现有队列的名称(可能是 schema 限定的)。如果您省略 schema 名称,则服务器将使用在 SEARCH_PATH 中指定的 schema。请注意,与 Oracle 不同,不带引号的标识符在存储之前将转换为小写。要包括特殊字符或者使用区分大小写的名称,请在双引号中引起名称。

  • enqueue_options

    enqueue_options 是类型为 enqueue_options_t 的值:

    DBMS_AQ.ENQUEUE_OPTIONS_T IS RECORD(
      visibility BINARY_INTEGER DEFAULT ON_COMMIT,
      relative_msgid RAW(16) DEFAULT NULL,
      sequence_deviation BINARY INTEGER DEFAULT NULL,
      transformation VARCHAR2(61) DEFAULT NULL,
      delivery_mode PLS_INTEGER NOT NULL DEFAULT PERSISTENT);

    目前,enqueue_options_t 唯一支持的参数值为:

    visibility ON_COMMIT
    delivery_mode PERSISTENT
    sequence_deviation NULL
    transformation NULL
    relative_msgid NULL
  • message_properties

    message_properties 是类型为 message_properties_t 的值:

          message_properties_t IS RECORD(
        priority INTEGER,
        delay INTEGER,
        expiration INTEGER,
        correlation CHARACTER VARYING(128) COLLATE pg_catalog.”C”,
        attempts INTEGER,
        recipient_list“AQ$_RECIPIENT_LIST_T”,
        exception_queue CHARACTER VARYING(61) COLLATE pg_catalog.”C”,
        enqueue_time TIMESTAMP WITHOUT TIME ZONE,
          state INTEGER,
         original_msgid BYTEA,
          transaction_group CHARACTER VARYING(30) COLLATE pg_catalog.”C”,
          delivery_mode INTEGER
        DBMS_AQ.PERSISTENT);

    message_properties_t支持的值如下:

    参数 说明
    priority 如果队列表定义包括 sort_list 并引用了 priority,则此参数影响消息出队的顺序。较低的值指示较高的出队优先级。
    delay 指定消息可用于出队之前将经过的秒数,或者NO_DELAY。
    expiration 使用 expiration 参数指定消息过期的秒数。
    correlation 使用 correlation 指定将与条目关联的消息;默认值为 NULL。
    attempts 这是系统维护的值,指定消息出队的尝试次数。
    recipient_list 不支持此参数。
    exception_queue 使用 exception_queue 参数指定异常队列的名称,如果消息过期或者由回退太多次数的事务出队,则消息将移动到该队列。
    enqueue_time enqueue_time 是记录添加到队列的时间;此值由系统提供。
    state 此参数由 DBMS_AQ 维护,状态可以为:
    • DBMS_AQ.READY – 未达到延迟。
    • DBMS_AQ.WAITING – 队列条目已准备好处理。
    • DBMS_AQ.PROCESSED – 队列条目已处理。
    • DBMS_AQ.EXPIRED – 队列条目已移动到异常队列。
    original_msgid 为了实现兼容性而支持此参数,忽略此参数。
    transaction_group 为了实现兼容性而支持此参数,忽略此参数。
    delivery_mode 不支持此参数;指定 DBMS_AQ.PERSISTENT 的值。
  • payload

    使用 payload 参数提供将与队列条目关联的数据。有效负载类型必须与创建对应的队列表时指定的类型匹配(参见DBMS_AQADM.CREATE_QUEUE_TABLE)。

  • msgid

    使用 msgid 参数检索唯一(系统生成的)消息标识符。

示例

以下匿名块调用 DBMS_AQ.ENQUEUE,将消息添加到名为 work_order 的队列:

DECLARE

  enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  message_handle     raw(16);
  payload            work_order;

BEGIN

  payload := work_order('Smith', 'system upgrade');

DBMS_AQ.ENQUEUE(
  queue_name         => 'work_order',
  enqueue_options    => enqueue_options,
  message_properties => message_properties,
  payload            => payload,
  msgid              => message_handle
    );
 END;

DEQUEUE

DEQUEUE 存储过程让消息出队。特征为:

DEQUEUE(
  queue_name IN VARCHAR2,
  dequeue_options IN DBMS_AQ.DEQUEUE_OPTIONS_T,
  message_properties OUT DBMS_AQ.MESSAGE_PROPERTIES_T,
  payload OUT type_name,
  msgid OUT RAW)

参数

  • queue_name

    现有队列的名称(可能是 schema 限定的)。如果您省略 schema 名称,则服务器将使用在 SEARCH_PATH 中指定的 schema。请注意,与 Oracle 不同,不带引号的标识符在存储之前将转换为小写。要包括特殊字符或者使用区分大小写的名称,请在双引号中引起名称。

  • dequeue_options

    dequeue _options 是类型为 dequeue_options_t 的值:

    DEQUEUE_OPTIONS_T IS RECORD(
      consumer_name CHARACTER VARYING(30),
      dequeue_mode INTEGER,
      navigation INTEGER,
      visibility INTEGER,
      wait INTEGER,
      msgid BYTEA,
      correlation CHARACTER VARYING(128),
      deq_condition CHARACTER VARYING(4000),
      transformation CHARACTER VARYING(61),
      delivery_mode INTEGER);

    目前,dequeue_options_t 支持的参数值为:

    参数 说明
    consumer_name 必须为 NULL。
    dequeue_mode 出队操作的锁定行为。必须为:
    • DBMS_AQ.BROWSE – 读取消息而不获取锁定。
    • DBMS_AQ.LOCKED – 获取锁定之后读取消息。
    • DBMS_AQ.REMOVE – 删除消息之前读取消息。
    • DBMS_AQ.REMOVE_NODATA – 读取消息,但不删除消息。
    navigation 标识将检索的消息。必须为:
    • FIRST_MESSAGE – 队列中与搜索词匹配的第一条消息。
    • NEXT_MESSAGE – 与第一个词语匹配的下一条可用消息。
    visibility 必须为 ON_COMMIT – 如果您回退当前事务,出队项目将保持在队列中。
    wait 必须为大于 0 的数字,或者:
    • DBMS_AQ.FOREVER – 无限期等待。
    • DBMS_AQ.NO_WAIT – 不等待。
    msgid 将出队消息的 ID。
    correlation 为了实现兼容性而提供的支持,将被忽略。
    deq_condition 一个 VARCHAR2 表达式,求值为 BOOLEAN 值,指示消息是否应出队。
    transformation 为了实现兼容性而提供的支持,将被忽略。
    delivery_mode 必须为 PERSISTENT;此时不支持缓冲的消息。
  • message_properties

    message_properties 是类型为 message_properties_t 的值:

        message_properties_t IS RECORD(
        priority INTEGER,
        delay INTEGER,
        expiration INTEGER,
        correlation CHARACTER VARYING(128) COLLATE pg_catalog.”C”,
        attempts INTEGER,
        recipient_list“AQ$_RECIPIENT_LIST_T”,
        exception_queue CHARACTER VARYING(61) COLLATE pg_catalog.”C”,
        enqueue_time TIMESTAMP WITHOUT TIME ZONE,
        state INTEGER,
        original_msgid BYTEA,
        transaction_group CHARACTER VARYING(30) COLLATE pg_catalog.”C”,
        delivery_mode INTEGER
      DBMS_AQ.PERSISTENT);

    message_properties_t 支持的值为:

    参数 说明
    priority 如果队列表定义包括 sort_list 并引用了 priority,则此参数影响消息出队的顺序。较低的值指示较高的出队优先级。
    delay 指定消息在可以出队或 NO_DELAY 之前将经过的秒数。
    expiration 使用 expiration 参数指定消息过期的秒数。
    correlation 使用 correlation 指定将与条目关联的消息;默认值为 NULL。
    attempts 这是系统维护的值,指定消息出队的尝试次数。
    recipient_list 不支持此参数。
    exception_queue 使用 exception_queue 参数指定异常队列的名称,如果消息过期或者由回退太多次数的事务出队,则消息将移动到该队列。
    enqueue_time enqueue_time 是记录添加到队列的时间;此值由系统提供。
    state 此参数由 DBMS_AQ 维护,状态可以为:
    • DBMS_AQ.WAITING – 未达到延迟。
    • DBMS_AQ.READY – 队列条目已准备好处理。
    • DBMS_AQ.PROCESSED – 队列条目已处理。
    • DBMS_AQ.EXPIRED – 队列条目已移动到异常队列。
    original_msgid 为了实现兼容性而支持此参数,但被忽略。
    transaction_group 为了实现兼容性而支持此参数,但被忽略。
    delivery_mode 不支持此参数;指定 DBMS_AQ.PERSISTENT 的值。
  • payload

    使用payload参数检索具有出队操作的消息的有效负载。有效负载类型必须与创建队列表时指定的类型匹配。

  • msgid

    使用msgid参数检索唯一消息标识符。

示例

以下匿名块调用DBMS_AQ.DEQUEUE,从队列和有效负载中检索消息:

DECLARE

  dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  message_handle     raw(16);
  payload            work_order;

BEGIN
  dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;

  DBMS_AQ.DEQUEUE(
    queue_name         => 'work_queue',
    dequeue_options    => dequeue_options,
    message_properties => message_properties,
    payload            => payload,
    msgid              => message_handle
  );

  DBMS_OUTPUT.PUT_LINE(
  'The next work order is [' || payload.subject || '].'
  );
END;

有效负载由DBMS_OUTPUT.PUT_LINE显示。

REGISTER

使用 REGISTER 存储过程注册在项目入队或出队时接收通知的电子邮件地址、过程或 URL。特征为:

REGISTER(
  reg_list IN SYS.AQ$_REG_INFO_LIST,
  count IN NUMBER)

参数

  • reg_list

    reg_list 是类型为 AQ$_REG_INFO_LIST 的列表,提供有关您要注册的各个订阅的信息。列表中每个条目的类型都是 AQ$_REG_INFO,可以包含:

    属性 类型 说明
    name VARCHAR2 (128) 订阅的名称(可能是 schema 限定的)。
    namespace NUMERIC 唯一支持的值为 DBMS_AQ.NAMESPACE_AQ (0)
    callback VARCHAR2 (4000) 说明将对通知执行的操作。目前,仅支持调用 PL/SQL 存储过程。调用应采取以下形式:plsql://schema.procedure其中:
    • schema:指定存储过程所在的 schema。
    • procedure:指定将通知的存储过程的名称。
    context RAW (16) 回调过程需要的任何用户定义的值。
  • count

    countreg_list中的条目数。

示例

以下匿名块调用 DBMS_AQ.REGISTER,注册在队列中添加或删除项目时将通知到的存储过程。为在 DECLARE 部分中标识的每个订阅提供一组属性(类型为 sys.aq$_reg_info):

DECLARE
   subscription1 sys.aq$_reg_info;
   subscription2 sys.aq$_reg_info;
   subscription3 sys.aq$_reg_info;
   subscriptionlist sys.aq$_reg_info_list;
BEGIN
   subscription1 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://assign_worker?PR=0',HEXTORAW('FFFF'));
   subscription2 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://add_to_history?PR=1',HEXTORAW('FFFF'));
   subscription3 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://reserve_parts?PR=2',HEXTORAW('FFFF'));

   subscriptionlist := sys.aq$_reg_info_list(subscription1, subscription2, subscription3);
   dbms_aq.register(subscriptionlist, 3);
   commit;
  END;
   /

subscriptionlist的类型为sys.aq$_reg_info_list,包含以前描述的sys.aq$_reg_info对象。列表名称和对象计数传递到dbms_aq.register

UNREGISTER

使用UNREGISTER存储过程关闭与入队和出队相关的通知。特征为:

UNREGISTER(
  reg_list IN SYS.AQ$_REG_INFO_LIST,
  count
IN NUMBER)

参数

  • reg_list

    reg_list是类型为AQ$_REG_INFO_LIST的列表,提供有关您要注册的各个订阅的信息。列表中每个条目的类型都是AQ$_REG_INFO,可以包含:

    属性 类型 说明
    name VARCHAR2 (128) 订阅的名称(可能是 schema 限定的)。
    namespace NUMERIC 唯一支持的值为 DBMS_AQ.NAMESPACE_AQ (0)
    callback VARCHAR2 (4000) 说明将对通知执行的操作。目前,仅支持调用 PL/SQL 存储过程。调用应采取以下形式:plsql://schema.procedure其中:
    • schema:指定存储过程所在的 schema。
    • procedure:指定将通知的存储过程的名称。
    context RAW (16) 该存储过程需要的任何用户定义的值。
  • count

    countreg_list中的条目数。

示例

以下匿名块调用DBMS_AQ.UNREGISTER,禁用在示例中为DBMS_AQ.REGISTER指定的通知:

DECLARE
   subscription1 sys.aq$_reg_info;
   subscription2 sys.aq$_reg_info;
   subscription3 sys.aq$_reg_info;
   subscriptionlist sys.aq$_reg_info_list;
BEGIN
   subscription1 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://assign_worker?PR=0',HEXTORAW('FFFF'));
   subscription2 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://add_to_history?PR=1',HEXTORAW('FFFF'));
   subscription3 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://reserve_parts?PR=2',HEXTORAW('FFFF'));

   subscriptionlist := sys.aq$_reg_info_list(subscription1, subscription2, subscription3);
   dbms_aq.unregister(subscriptionlist, 3);
   commit;
  END;
   /

subscriptionlist的类型为sys.aq$_reg_info_list,包含以前描述的sys.aq$_reg_info对象。列表名称和对象计数传递到dbms_aq.unregister