PolarDB-O数据库提供消息排队和消息处理。用户定义的消息存储在队列中;队列的集合存储在队列表中。DBMS_AQADM 包中的存储过程创建并管理消息队列和队列表。使用 DBMS_AQ 包可添加消息到队列或者从队列中删除消息,或者注册或注销 PL/SQL 回调过程。

PolarDB-O还通过如下SQL命令为DBMS_AQ包提供扩展(不兼容)功能:

  • ALTER QUEUE
  • ALTER QUEUE TABLE
  • CREATE QUEUE
  • CREATE QUEUE TABLE
  • DROP QUEUE
  • DROP QUEUE TABLE

DBMS_AQADM 包提供允许您创建和管理队列和队列表的存储过程。

函数/存储过程 返回类型 说明
ALTER_QUEUE n/a 修改现有的队列。
ALTER_QUEUE_TABLE n/a 修改现有的队列表。
CREATE_QUEUE n/a 创建队列。
CREATE_QUEUE_TABLE n/a 创建队列表。
DROP_QUEUE n/a 删除现有队列。
DROP_QUEUE_TABLE n/a 删除现有队列表。
PURGE_QUEUE_TABLE n/a 从队列表中删除一个或多个消息。
START_QUEUE n/a 使队列对于入队和出队过程可用。
STOP_QUEUE n/a 使队列对于入队和出队过程不可用。

PolarDB-O的DBMS_AQADM实施与 Oracle 的版本相比是部分实施。仅支持上表中列出的函数和存储过程。

PolarDB-O支持使用下面列出的参数:

常量 说明 用于参数
DBMS_AQADM.TRANSACTIONAL(1) 此常量已定义,但是,如果使用会返回错误。 message_grouping
DBMS_AQADM.NONE(0) 用于为队列表指定消息分组。 message_grouping
DBMS_AQADM.NORMAL_QUEUE(0) 与 create_queue 一起使用来指定 queue_type。 queue_type
DBMS_AQADM.EXCEPTION_QUEUE (1) 与 create_queue 一起使用来指定queue_type。 queue_type
DBMS_AQADM.INFINITE(-1) 与 create_queue 一起使用来指定 retention_time。 retention_time
DBMS_AQADM.PERSISTENT (0) 此消息应存储在表中。 enqueue_options_t.delivery_mode
DBMS_AQADM.BUFFERED (1) 此常量已定义,但是,如果使用会返回错误。 enqueue_options_t.delivery_mode
DBMS_AQADM.PERSISTENT_OR_BUFFERED (2) 此常量已定义,但是,如果使用会返回错误。 enqueue_options_t.delivery_mode

ALTER_QUEUE

使用 ALTER_QUEUE 存储过程修改现有队列。特征为:

ALTER_QUEUE(
  max_retries IN NUMBER DEFAULT NULL,
  retry_delay IN NUMBER DEFAULT 0
  retention_time IN NUMBER DEFAULT 0,
  auto_commit IN BOOLEAN DEFAULT TRUE)
  comment IN VARCHAR2 DEFAULT NULL,

参数

参数 描述
queue_name 新队列的名称。
max_retries max_retries 指定使用 dequeue 语句删除消息的最大尝试次数。max_retries 的值随着每个 ROLLBACK 语句增加。当失败的尝试次数达到 max_retries 指定的值之后,消息移动到异常队列。指定 0 以指示不允许重试。
retry_delay retry_delay 指定在 ROLLBACK 之后计划重新处理消息等待的秒数。指定 0 可指示应立即重试消息(默认值)。
retention_time retention_time 指定消息在出队之后进行存储所经过的时间长度(以秒为单位)。您还可以指定 0(默认值)以指示消息在出队之后不应保留,或者指定 INFINITE 以永久保留消息。
auto_commit 为了实现兼容性而支持此参数,但被忽略。
comment comment 指定与队列关联的注释。

示例

以下命令更改名为 work_order 的队列,将 retry_delay 参数设置为 5 秒:

EXEC DBMS_AQADM.ALTER_QUEUE(queue_name => 'work_order', retry_delay => 5);

ALTER_QUEUE_TABLE

使用 ALTER_QUEUE_TABLE 存储过程修改现有队列表。特征为:

ALTER_QUEUE_TABLE (
  queue_table IN VARCHAR2,
  comment IN VARCHAR2 DEFAULT NULL,
  primary_instance IN BINARY_INTEGER DEFAULT 0,
  secondary_instance IN BINARY_INTEGER DEFAULT 0,

参数

参数 描述
queue_table 队列表的名称(可能是 schema 限定的)。
comment 使用 comment 参数可以提供有关队列表的注释。
primary_instance 为了实现兼容性而支持 primary_instance,但被忽略。
secondary_instance 为了实现兼容性而支持 secondary_instance,但被忽略。

示例

以下命令修改名为 work_order_table 的队列表:

EXEC DBMS_AQADM.ALTER_QUEUE_TABLE
      (queue_table => 'work_order_table', comment => 'This queue table contains work orders for the shipping department.');

队列表名为 work_order_table;该命令添加注释到队列表的定义。

CREATE_QUEUE

使用 CREATE_QUEUE 存储过程在现有队列表中创建队列。特征为:

CREATE_QUEUE(
  queue_name IN VARCHAR2
  queue_table IN VARCHAR2,
  queue_type IN BINARY_INTEGER DEFAULT NORMAL_QUEUE,
  max_retries IN NUMBER DEFAULT 5,
  retry_delay IN NUMBER DEFAULT 0
  retention_time IN NUMBER DEFAULT 0,
  dependency_tracking IN BOOLEAN DEFAULT FALSE,
  comment IN VARCHAR2 DEFAULT NULL,
  auto_commit IN BOOLEAN DEFAULT TRUE)

参数

参数 描述
queue_name 新队列的名称。
queue_table 新队列所在的表的名称。
queue_type 新队列的类型。queue_type 的有效值为:
  • DBMS_AQADM.NORMAL_QUEUE:此值指定普通队列(默认值)。
  • DBMS_AQADM.EXCEPTION_QUEUE:此值指定新队列是异常队列。异常队列仅支持出队操作。
max_retries max_retries 指定使用 dequeue 语句删除消息的最大尝试次数。max_retries 的值随着每个 ROLLBACK 语句增加。当失败的尝试次数达到 max_retries 指定的值之后,消息移动到异常队列。系统表的默认值为 0;用户创建的表的默认值为 5。
retry_delay retry_delay 指定在 ROLLBACK 之后计划重新处理消息等待的秒数。指定 0 可指示应立即重试消息(默认值)。
retention_time retention_time 指定消息在出队之后进行存储所经过的时间长度(以秒为单位)。您还可以指定 0(默认值)以指示消息在出队之后不应保留,或者指定 INFINITE 以永久保留消息。
dependency_tracking 为了实现兼容性而支持此参数,但被忽略。
comment comment 指定与队列关联的注释。
auto_commit 为了实现兼容性而支持此参数,但被忽略。

示例

以下匿名块在 work_order_table 表中创建名为 work_order 的队列:

BEGIN
DBMS_AQADM.CREATE_QUEUE ( queue_name => 'work_order', queue_table => 'work_order_table', comment => 'This queue contains pending work orders.');
END;

CREATE_QUEUE_TABLE

使用 CREATE_QUEUE_TABLE 存储过程创建队列表:特征为:

CREATE_QUEUE_TABLE (
  queue_table IN VARCHAR2,
  queue_payload_type IN VARCHAR2,
  storage_clause IN VARCHAR2 DEFAULT NULL,
  sort_list IN VARCHAR2 DEFAULT NULL,
  multiple_consumers IN BOOLEAN DEFAULT FALSE,
  message_grouping IN BINARY_INTEGER DEFAULT NONE,
  comment IN VARCHAR2 DEFAULT NULL,
  auto_commit IN BOOLEAN DEFAULT TRUE,
  primary_instance IN BINARY_INTEGER DEFAULT 0,
  secondary_instance IN BINARY_INTEGER DEFAULT 0,
  compatible IN VARCHAR2 DEFAULT NULL,
  secure IN BOOLEAN DEFAULT FALSE)

参数

参数 描述
queue_table 队列表的名称(可能是 schema 限定的)。
queue_payload_type 将存储在队列表中的数据的用户定义类型。请注意,要指定 RAW 数据类型,您必须创建标识 RAW 类型的用户定义类型。
storage_clause 使用 storage_clause 参数可指定队列表的属性。请注意,只强制实施 TABLESPACE 选项;为了实现兼容性而支持其他所有选项,但被忽略。可使用 TABLESPACE 子句指定将在其中创建表的表空间的名称。
  • storage_clause 可以是以下一项或多项:

    TABLESPACE tablespace_name、PCTFREE integer、PCTUSED integer、INITRANS integer、MAXTRANS integer 或 STORAGE storage_option。

  • storage_option 可以为以下一项或多项:

    MINEXTENTS integer、MAXEXTENTS integer、PCTINCREASE integer、INITIAL size_clause、NEXT、FREELISTS integer、OPTIMAL size_clause、BUFFER_POOL {KEEP|RECYCLE|DEFAULT}。

sort_list sort_list 选项控制队列的出队顺序;指定将用于对队列进行排序(升序)的列名称。当前接受的值为 enq_time 和 priority 的以下组合:
  • enq_time, priority
  • priority, enq_time
  • priority
  • enq_time
multiple_consumers 如果指定,multiple_consumers 必须为 FALSE。
message_grouping 如果指定,message_grouping 必须为 NONE。
comment 使用 comment 参数可以提供有关队列表的注释。
auto_commit 为了实现兼容性而支持 auto_commit,但被忽略。
primary_instance 为了实现兼容性而支持 primary_instance,但被忽略。
secondary_instance 为了实现兼容性而支持 secondary_instance,但被忽略。
compatible 为了实现兼容性而支持 compatible,但被忽略。
secure secure is accepted for compatibility, but is ignored.

示例

以下匿名块首先创建类型 (work_order),该类型具有保存名称 (VARCHAR2) 的属性,以及项目说明 (a TEXT)。然后,块使用该类型创建队列表:

BEGIN

CREATE TYPE work_order AS (name VARCHAR2, project TEXT, completed BOOLEAN);

EXEC DBMS_AQADM.CREATE_QUEUE_TABLE
      (queue_table => 'work_order_table',
       queue_payload_type => 'work_order',
       comment => 'Work order message queue table');
END;

队列表名为 work_order_table,包含类型为 work_order 的有效负载。注释说明这是 Work order message queue table。

DROP_QUEUE

使用 DROP_QUEUE 存储过程可以删除队列。特征为:

DROP_QUEUE(
  queue_name  IN VARCHAR2,
  auto_commit IN BOOLEAN DEFAULT TRUE)

参数

参数 描述
queue_name 要删除的队列的名称。
auto_commit 为了实现兼容性而支持 auto_commit,但被忽略。

示例

以下匿名块删除名为 work_order 的队列:

BEGIN
DBMS_AQADM.DROP_QUEUE(queue_name => 'work_order');
END;

DROP_QUEUE_TABLE

使用 DROP_QUEUE_TABLE 存储过程可以删除队列表。特征为:

DROP_QUEUE_TABLE(
  queue_table IN VARCHAR2,
  force IN BOOLEAN default FALSE,
  auto_commit IN BOOLEAN default TRUE)

参数

参数 描述
queue_table 队列表的名称(可能是 schema 限定的)。
force force 关键字决定 DROP_QUEUE_TABLE 命令在删除包含项目的表时的行为:
  • 如果目标表包含项目且 force 为 FALSE,则该命令将失败,并且服务器将发出错误。
  • 如果目标表包含项目且 force 为 TRUE,则该命令将删除表以及任何从属对象。
auto_commit 为了实现兼容性而支持 auto_commit,但被忽略。

示例

以下匿名块删除名为 work_order_table 的表:

BEGIN
   DBMS_AQADM.DROP_QUEUE_TABLE ('work_order_table', force => TRUE);
END;

PURGE_QUEUE_TABLE

使用 PURGE_QUEUE_TABLE 存储过程可以从队列表中删除消息。特征为:

PURGE_QUEUE_TABLE(
  queue_table IN VARCHAR2,
  purge_condition IN VARCHAR2,
  purge_options IN aq$_purge_options_t)

参数

参数 描述
queue_table queue_table 指定从中删除消息的队列表的名称。
purge_condition 使用 purge_condition 可以指定服务器在决定要清除哪些消息时将评估的条件(SQL WHERE 子句)。
purge_options purge_options 是类型为 aq$_purge_options_t 的对象。aq$_purge_options_t 对象包含内容请参见表 1
表 1. aq$_purge_options_t
属性 类型 说明
Block Boolean 如果应当在表中所有队列上保有排他锁,则指定 TRUE。默认值为 FALSE。
delivery_mode INTEGER delivery_mode 指定将清除的消息类型。唯一可接受的值为 DBMS_AQ.PERSISTENT。

示例

以下匿名块从 work_order_table 中删除 completed 列值为 YES 的任何消息:

DECLARE
   purge_options dbms_aqadm.aq$_purge_options_t;
BEGIN
   dbms_aqadm.purge_queue_table('work_order_table', 'completed = YES', purge_options);
  END;

START_QUEUE

使用 START_QUEUE 存储过程使队列可用于排队和取消排队。特征为:

START_QUEUE(
  queue_name IN VARCHAR2,
  enqueue IN BOOLEAN DEFAULT TRUE,
  dequeue IN BOOLEAN DEFAULT TRUE)

参数

参数 描述
queue_name queue_name 指定要启动的队列的名称。
enqueue 指定 TRUE 以启用排队(默认值),或指定 FALSE 以保持当前设置不变。
dequeue 指定 TRUE 以启用取消排队(默认值),或指定 FALSE 以保持当前设置不变。

示例

以下匿名块使名为 work_order 的队列可用于排队:

BEGIN
DBMS_AQADM.START_QUEUE
(queue_name => 'work_order);
END;

STOP_QUEUE

使用 STOP_QUEUE 存储过程在指定队列中禁用排队或取消排队。特征为:

STOP_QUEUE(
  queue_name IN VARCHAR2,
  enqueue IN BOOLEAN DEFAULT TRUE,
  dequeue IN BOOLEAN DEFAULT TRUE,
  wait IN BOOLEAN DEFAULT TRUE)

参数

参数 描述
queue_name queue_name 指定要停止的队列的名称。
enqueue 指定 TRUE 以禁用排队(默认值),或指定 FALSE 以保持当前设置不变。
dequeue 指定 TRUE 禁用出队(默认值),或指定 FALSE 保持当前设置不变。
wait 指定TRUE以指示服务器等待任何未完成的事务完成,然后再应用指定的更改。在等待停止队列时,在指定队列中不允许任何事务排队或取消排队。指定FALSE以立即停止队列。

示例

以下匿名块在名为 work_order 的队列中禁用排队和取消排队:

BEGIN
DBMS_AQADM.STOP_QUEUE(queue_name =>'work_order', enqueue=>TRUE, dequeue=>TRUE, wait=>TRUE);
END;

排队和取消排队将在任何未完成的事务完成之后停止。