DBMS_AQ provides an API to the enqueue and dequeue operations in the Oracle Advanced Queuing (AQ) facility. Oracle8i enhances AQ in a number of ways, many reflected in changes in the DBMS_AQ and DBMS_AQADM packages.
TIP: A working knowledge of the Oracle Advanced Queuing facility and the DBMS_AQ and DBMS_AQADM packages is assumed for this section. If you need to learn more, you might want to check out Chapter 5 of Oracle Built-in Packages.
Oracle has changed the security model for Oracle AQ in Oracle8i. You can now set security at the system and queue level (discussed in the next section). These features are only available, however, for AQ 8.1-style queues.
To create queues in Oracle 8.1 that can make use of the new security features, you must set the compatible parameter in DBMS_AQADM.CREATE_QUEUE_TABLE to `8.1' or above. Here is the new, expanded header:
PROCEDURE DBMS_AQADM.CREATE_QUEUE_TABLE ( queue_table IN VARCHAR2, queue_payload_type IN VARCHAR2, storage_clause IN VARCHAR2 DEFAULT NULL, sort_list IN VARCHAR2 DEFAULT NULL, multiple_consumers IN BOOLEAN DEFAULT FALSE, message_grouping IN BINARY_INTEGER DEFAULT NONE, comment IN VARCHAR2 DEFAULT NULL, auto_commit IN BOOLEAN DEFAULT TRUE, primary_instance IN BINARY_INTEGER DEFAULT 0, secondary_instance IN BINARY_INTEGER DEFAULT 0, compatible IN VARCHAR2 DEFAULT NULL);
The first eight parameters are the same as in Oracle 8.0. The final three parameters have the meanings shown here:
The primary owner of the queue table. This instance performs the queue monitor scheduling and propagation for the queues in the queue table. The default is 0, which means scheduling and propagation will be performed in any available instance.
If you want to define a queue table with 8.1 compatibility, you will need to make a call like this:
BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE ( 'workflow', 'workflow_ot', compatible => '8.1'); END;
I have used named notation to skip over all the intervening parameters (thereby accepting their default values) and set the compatibility level.
If you want to use the AQ 8.1 security features on a queue that was defined originally in an 8.0 database, you must convert the queue table to 8.1 compatibility by executing DBMS_AQADM.MIGRATE_QUEUE_TABLE on the queue table. Here is the header for this procedure:
PROCEDURE DBMS_AQADM.MIGRATE_QUEUE_TABLE( queue_table IN VARCHAR2, compatible IN VARCHAR2)
where queue_table is the name of the queue table to be migrated, and compatible indicates the direction of the migration, as shown in the following table.
Downgrade an 8.1 queue table to be 8.0 compatible.
Upgrade an 8.0 queue table to be 8.1 compatible.
Back in Oracle 8.0, administrators granted access to AQ operations by assigning roles that provided execution privileges on the AQ procedures. There was no security at the database object level, which meant that in Oracle 8.0 a user with the AQ_USER_ROLE could enqueue and dequeue to any queue in the system. This is obviously inadequate, and in Oracle8i, AQ offers a much more granular approach to security. An owner of an 8.1-compatible queue can now grant or revoke queue-level privileges on the queue (described in the " section). DBAs can grant or revoke new AQ system-level privileges to any database user. DBAs can also make any database user an AQ administrator.
The grant and revoke operations for AQ are not performed through the GRANT and REVOKE DDL statements. Instead, the DBMS_AQADM package provides a set of procedures. Some of these procedures were present in Oracle 8.0, but with limited capabilities.
To set a system-level privilege, call the following procedure:
PROCEDURE DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE ( privilege IN VARCHAR2, grantee IN VARCHAR2, admin_option IN BOOLEAN := FALSE);
where privilege is the AQ system privilege to grant, grantee is the user or role (including PUBLIC) to which the privilege is granted, and admin_option controls whether the grantee is allowed to use this procedure to grant the system privilege to other users or roles. The options for privilege are shown here:
TIP: Immediately after database installation, only SYS and SYSTEM have the privileges to run this program successfully. If you do not want to manage AQ from either of these schemas, you will want to grant MANAGE_ANY with the admin_option parameter set to TRUE to another schema (such as AQADMIN) and then work from there for future AQ administrative activities.
PROCEDURE DBMS_AQADM.REVOKE_SYSTEM_PRIVILEGE ( privilege IN VARCHAR2, grantee IN VARCHAR2);
Let's take a look at the steps a DBA will commonly take to set up a schema as an AQ administrator. First, create the user (I'll call it "WFADM" for "workflow administration") and grant the roles needed to function in the database and work as an AQ administrator:
CREATE USER WFADM IDENTIFIED BY WFADM; GRANT CONNECT, RESOURCE, aq_administrator_role TO WFADM;
Next, make sure that this schema can execute both of the AQ packages:
GRANT EXECUTE ON dbms_aq TO WFADM; GRANT EXECUTE ON dbms_aqadm TO WFADM;
BEGIN DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE ( 'ENQUEUE_ANY', 'WFADM', FALSE); DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE ( 'DEQUEUE_ANY', 'WFADM', FALSE); END; /
To set a queue-level privilege, call the following procedure:
PROCEDURE DBMS_AQADM.GRANT_QUEUE_PRIVILEGE ( privilege IN VARCHAR2, queue_name IN VARCHAR2, grantee IN VARCHAR2, admin_option IN BOOLEAN := FALSE);
where privilege is the AQ system privilege to grant, queue_name is the name of the queue on which the grant is to be made, grantee is the user or role (including PUBLIC) to which the privilege is granted, and admin_option controls whether the grantee is allowed to use this procedure to grant the privilege to other users or roles.
The options for privilege are shown here:
PROCEDURE DBMS_AQADM.REVOKE_QUEUE_PRIVILEGE ( privilege IN VARCHAR2, queue_name IN VARCHAR2, grantee IN VARCHAR2);
Queue-level grants are crucial when you want to set up individual schemas to be able to only enqueue or only dequeue for specific queues. Suppose, for example, that I am constructing a system to support universal health care in the United States. I want doctors to be able to enqueue a record of services performed, but I don't want them to be able to dequeue that information. I want my administrators to be able to dequeue that information, but not to enqueue it. I might execute a block like this:
BEGIN DBMS_AQADM.GRANT_QUEUE_PRIVILEGE ( 'DEQUEUE', 'FEELGOOD_service_record_q', 'Doctor_Role', FALSE); DBMS_AQADM.GRANT_QUEUE_PRIVILEGE( 'ENQUEUE', 'FEELGOOD_service_record_q', 'Admin_Role', FALSE); END; /
Oracle AQ adds various features in Oracle 8.1 that allow you to develop an application based on a publish/subscribe model. This model allows different components of the application to communicate with each other in a very flexible way, based on an important principle: publisher application components interact with subscriber application components only through messages and message content.
This means that publisher applications don't have to know about, or ever have to manage, recipient information. They "publish" their messages by putting information in queues. They don't worry about who is going to receive it, when, and how. The subscriber applications receive messages through the dequeue operation, based solely on the content of those messages. The identity of the publisher does not play a role in determining the enqueue operation, and the identities and number of subscriber applications can be modified without affecting the messages.
This basic approach was available in Oracle 8.0, when the AQ facility was first introduced. Oracle8i significantly improves the ability to implement a publish/subscribe model with the following features:
The following sections focus on the new PL/SQL features -- rule-based subscribers and the new LISTEN procedure.
Oracle AQ has always let you define subscriber lists and add subscribers with the DBMS_AQADM.ADD_SUBSCRIBER procedure. The 8.1 implementation adds another parameter to this procedure, allowing you to associate a rule with a subscriber. Here is the new procedure header:
PROCEDURE DBMS_AQADM.ADD_SUBSCRIBER ( queue_name IN VARCHAR2, subscriber IN sys.aq$_agent, rule IN VARCHAR2 DEFAULT NULL);
where queue_name is the name of the queue in which the subscriber is interested, subscriber is an object instance of type SYS.AQ$_AGENT that identifies the agent to be added to the subscription list, and rule is a string that contains a conditional expression.
The rule must be a string that is evaluated dynamically to a Boolean value: TRUE, FALSE, or NULL. The string may contain references to message properties (fields in the DBMS_AQ.MESSAGE_PROPERTIES_T record type), to attributes of the queue's payload (object payloads only, not RAW) and to PL/SQL functions (either built-in or your own).
Let's go over some rules and then look at some examples. The rule must conform to the following guidelines:
The only message properties currently supported are PRIORITY and CORRID (correlation identifier).
If you wish to reference attributes of the object payload in a queue, you must prefix each attribute with a qualifier of TAB.USER_DATA (a hardcoded string).
The maximum length of the rule parameter is 4000 characters.
Any PL/SQL functions you reference in the rule must be callable from within the WHERE clause of a SQL statement.
If you need to surround a literal with single quotes inside the rule, then you must use two single quotes in sequence.
The following examples should clarify these rules and their application:
Add a subscriber to the War Criminals Prosecution queue who is interested only in dequeuing messages of top priority:
DECLARE most_urgent SYS.AQ$_AGENT := SYS.AQ$_AGENT ('ChiefProsecutor', 'Cases_queue'); BEGIN DBMS_AQADM.ADD_SUBSCRIBER ( 'Cases_queue', most_urgent, 'PRIORITY = 1'); END;
The War Crimes Tribunal has just hired a junior prosecutor to handle cases involving atrocities in Latin America. The following block of code ensures that this agent will only handle low-priority cases from that region. Notice the use of multiple single quotes to ensure that the literal for region name is passed through properly:
DECLARE back_burner SYS.AQ$_AGENT := SYS.AQ$_AGENT ('JuniorProsecutor', 'Cases_queue'); BEGIN DBMS_AQADM.ADD_SUBSCRIBER ( 'Cases_queue', back_burner, 'PRIORITY > 3 AND CORRID = ''LATIN AMERICA'''); END;_
Big changes in the year 2015! Even as the fast food chains come to dominate the delivery of food to the world's population, the percentage of humans living in hunger increases. A worldwide protest movement rises up -- and the three biggest chains are purchased by the United Nations. Now all that technology and food delivery capability will be used directly to make sure that no one in the world starves. But wait -- we need to set up subscribers to receive orders for food in their specific regions. So I set up an object type to be used as the queue payload:
CREATE TYPE food_order_t AS OBJECT ( country VARCHAR2(100), region VARCHAR2(100), child_population NUMBER, adult_population NUMBER );
DECLARE lotsa_kids SYS.AQ$_AGENT := SYS.AQ$_AGENT ('FoodManager', 'Food_distribution_queue'); BEGIN DBMS_AQADM.ADD_SUBSCRIBER ( 'Food_distribution_queue', lotsa_kids, 'TAB.USER_DATA.country = ''USER'' AND TAB.USER_DATA.region = ''MIDWEST'' AND TAB.USER_DATA.child_population > 10000'); END;_
PROCEDURE DBMS_AQ.LISTEN ( agent_list IN AQ$_AGENT_LIST_T, wait IN BINARY_INTEGER DEFAULT DBMS_AQ.FOREVER, agent OUT SYS.AQ$_AGENT);
where agent_list is an index-by table defined in DBMS_AQ as follows:
TYPE DBMS_AQ.AQ$_AGENT_LIST_T IS TABLE of AQ$_AGENT INDEXED BY BINARY_INTEGER;
wait is the number of seconds the LISTEN procedure will wait or block as it waits for a message (the default is forever), and agent is the value returned by the procedure: an object.
The DBMS_AQ.LISTEN procedure is very similar to DBMS_ALERT.WAITANY. You can call the procedure to monitor one or more queues, which are identified by the address field of the agent object (only local queues are supported as addresses). When you call DBMS_AQ.LISTEN, your session will be blocked until a message is available in one of the queues, or until the wait time expires.
If a message is available for consumption on one of the queues indicated by the agents in agent_list, then that address will be returned in the agent OUT parameter. A successful completion of this call to DBMS_AQ.LISTEN does not, however, dequeue the message. Once you retrieve the agent, you must obtain the queue name and then issue an explicit DBMS_AQ.DEQUEUE call against that queue to get the payload.
If there are no messages found when the wait time expires in a call to DBMS_AQ.LISTEN, then the following error is raised:
ORA-25254: time-out in LISTEN while waiting for a message
Let's look at an example. All the members of my family love ice cream and we each have our own favorite flavor. We have installed Oracle8i on our local ice cream truck, along with a cellular modem. Whenever Sally, who drives the truck, is coming into our neighborhood, she will queue up messages indicating the flavors available that day. Every hot summer afternoon, we issue a call to DBMS_AQ.LISTEN and wait to hear who will have first (and hopefully not only) dibs on the ice cream. So let's walk through the code needed to accomplish this task (all to be found in the aqlisten.sql file on the companion disk).
Here is the payload for my queue:
/* Filename on companion disk: aqlisten.sql */ CREATE TYPE ice_cream_t IS OBJECT ( flavor VARCHAR2(30), calories INTEGER); /
Then I create a package to hold the queue-related data structures and also initialize my queue. The package specification contains a named constant for the queue name and the list of subscribers I will use in my call to DBMS_AQ.LISTEN:
CREATE OR REPLACE PACKAGE aqlisten IS qname CONSTANT CHAR(15) := 'ice_cream_queue'; tell_us DBMS_AQ.AQ$_AGENT_LIST_T; END; /
The package body defines a procedure that I use to define subscribers for my queue and also build the listen list:
PROCEDURE subscribe_me ( name IN VARCHAR2, flavor IN VARCHAR2) IS tell_me SYS.AQ$_AGENT := SYS.AQ$_AGENT (name, qname, NULL); BEGIN DBMS_AQADM.ADD_SUBSCRIBER ( qname, tell_me, 'TAB.USER_DATA.flavor = ''' || flavor || ''''); tell_us (NVL(tell_us.LAST,0)+1) := tell_me; END;
Notice that each subscriber has a rule associated with it: his or her favorite flavor of ice cream. The assignment to the tell_us index-by table always adds to the end of the table.
There is nothing else in the package body but the initialization section. This section contains code that will be executed for each session the first time that session references any element in the package. In the case of aqlisten, there are only two ways to reference it: use aqlisten.qname or the aqlisten.tell_us table. The first step in the initialization is to clean out any old versions of my queue table and queue:
DBMS_AQADM.STOP_QUEUE (qname, TRUE, TRUE, FALSE); DBMS_AQADM.DROP_QUEUE (qname); DBMS_AQADM.DROP_QUEUE_TABLE ('ice_cream_qtable');
Then I can create the elements anew, making sure that they are defined as I need them for the example:
DBMS_AQADM.CREATE_QUEUE_TABLE ( queue_table => 'ice_cream_qtable', queue_payload_type => 'ice_cream_t', multiple_consumers => TRUE, compatible => '8.1'); DBMS_AQADM.CREATE_QUEUE (qname, 'ice_cream_qtable'); DBMS_AQADM.START_QUEUE (qname);
Notice that I specify the ice cream queue table as an 8.1 queue table able to support multiple consumers. Great! Now I can define my subscribers and their favorite flavors and confirm the number of subscribers in my list:
subscribe_me ('Steven', 'ROCKY ROAD'); subscribe_me ('Veva', 'BUTTER PECAN'); subscribe_me ('Chris', 'VANILLA'); subscribe_me ('Eli', 'MINT CHOCOLATE CHIP'); DBMS_OUTPUT.PUT_LINE (tell_us.COUNT || ' subscribers in tell_us.');
Now I am ready to try it out. I create a procedure that allows me to enqueue a particular flavor and then listen for the corresponding agent. I then use DBMS_OUTPUT to confirm the agent by name and queue:
CREATE OR REPLACE PROCEDURE tasty_treat_time ( flavor IN VARCHAR2) IS tell_me SYS.AQ$_AGENT; queueOpts DBMS_AQ.ENQUEUE_OPTIONS_T; msgProps DBMS_AQ.MESSAGE_PROPERTIES_T; mmmmmm ice_cream_t; msgid RAW(16); BEGIN queueopts.visibility := DBMS_AQ.IMMEDIATE; /* Populate the object. */ mmmmmm := ice_cream_t (flavor, 10); DBMS_AQ.ENQUEUE ( aqlisten.qname, queueOpts, msgProps, mmmmmm, msgid); DBMS_AQ.LISTEN (aqlisten.tell_us, 0, tell_me); DBMS_OUTPUT.PUT_LINE ( 'Message for ' || tell_me.name || ' in queue ' || tell_me.address); END; /
When I run this script I see the following output:
BEGIN tasty_treat_time ('MINT CHOCOLATE CHIP'); tasty_treat_time ('VANILLA'); tasty_treat_time ('STRAWBERRY'); EXCEPTION WHEN OTHERS THEN DBMS_OUTPUT.PUT_LINE (SQLERRM); END; / Message for ELI in queue SCOTT.ICE_CREAM_QUEUE Message for CHRIS in queue SCOTT.ICE_CREAM_QUEUE ORA-24033: no recipients for message
Copyright (c) 2000 O'Reilly & Associates. All rights reserved.