- 적용 방법
PLSQL – Trigger :
1개의 테이블의 Insert, Delete, Update Action을 감시하다가 Action이
발생하면 PLSQL – Package를 호출
PLSQL – Package :
XML생성 모듈과 Queue삽입 모듈로 구성 변경전 또는 후의 데이터를 받
아 XML파일로 구성한후 Queue에 삽입.
JAVA - Source :
Queue에 접속하여 데이터를 삽입하는 기능을 지님.
PLSQL Package에서 Wrapping하여 사용.
- 환경 설정
[D/B 환경 변수 설정]
C:\Oracle\admin\자신의 SID\pfile\init.ora 파일을 열어서
java_pool_size = 20000000 # for java stored procedure
compatible = 8.1.0 # for java stored procedure
job_queue_processes = 4 # for AQ
job_queue_interval = 10 # for AQ
open_links = 4 # for AQ
AQ_TM_PROCESSES = 1 # for AQ
job_queue_keep_connections = false # for AQ
[D/B User 권한 설정]
~ : user 명
~~ : Password
* AQ 관련
grant connect, resource, aq_administrator_role to ~ identified by ~~;
grant execute on sys.dbms_aqadm to ~;
grant execute on sys.dbms_aq to ~;
grant execute on sys.dbms_aqin to ~;
* JAVA 관련
grant execute on sys.dbms_java to ~;
execute dbms_java.grant_permission('~','java.net.SocketPermission','*', 'connect,resolve');
-- D/B내부의 java program이 Queue에 접속하기위해서 권한부여
-- Syntax :
grant_permission(grantee, permission_type, permission_name, permission_action)
각 파라미터의 값은 SYS.JAVA$POLICY$ 테이블에서 확인 가능
[JAVA CLASSPATH]
D/B 서버의 CLASSPATH에 다음을 추가한다.
JDK1.1 의 경우
· $ORACLE_HOME/rdbms/jlib/aqapi11.jar
· $ORACLE_HOME/rdbms/jlib/classes111.zip
· $ORACLE_HOME/rdbms/jlib/nls_charset11.zip
JDK1.2 의 경우
· $ORACLE_HOME/rdbms/jlib/aqapi.jar
· $ORACLE_HOME/rdbms/jlib/classes12.zip
· $ORACLE_HOME/rdbms/jlib/nls_charset12.zip
[ 참고 ] 병래의 시스템 Path
classpath : .;c:\Oracle\Ora81\RDBMS\jlib\aqapi.jar;C:\bea\wlserver6.0sp1\lib\weblogic.jar
path : C:\Oracle\Ora81\bin;C:\Program Files\Oracle\jre\1.1.7\bin;%SystemRoot%\system32;%SystemRoot%;%SystemRoot%\System32\Wbem;C:\Oracle\Ora81\orb\bin;c:\jdk1.3\bin
- JAVA Code Load/Drop 방법
사용 프로그램
$ORACLE_HOME/bin/loadjava
$ORACLE_HOME/bin/dropjava
[Load JAVA source]
prompt> loadjava -r -v -user aqjms/aqjms 자바클래스명 또는 자바소스명(확장자포함)
[Drop JAVA source]
prompt> dropjava -user aqjms/aqjms 자바클래스명 또는 자바소스명(확장자포함)
* 작업 후 결과는 user_objects 테이블에서 확인할 수 있다. 클래스를 load한 경우 Object_Type이 'JAVA CLASS'인 객체 하나만 생성되고, 소스 코드를 load한 경우 'JAVA SOURCE'와 'JAVA CLASS'가 함께 생성된다. Load가 올바르게 종료되면 Status가 'VALID'가 된다.
오류 발생시 user_errors에 오류 메시지가 기록된다.
- 소스 코드
PACKAGE IWMQ
AS
  -- One name/value pair; EnQ renders each pair as <name>value</name>.
  TYPE ioElement IS RECORD (
    name  VARCHAR2(100),
    value VARCHAR2(1000)
  );

  -- Integer-indexed list of name/value pairs handed to EnQ.
  TYPE ioArray IS TABLE OF ioElement INDEX BY BINARY_INTEGER;

  -- Builds an XML document from p01 and passes it to InsertQueue.
  PROCEDURE EnQ(p01 ioArray);

  -- Call specification for the Java method CarSend.send (see the package
  -- body); returns the status string produced by that method.
  FUNCTION InsertQueue(TmpStr1 varchar2, TmpStr2 LONG, TmpStr3 LONG) return varchar2;
END IWMQ;
PACKAGE BODY IWMQ
AS
  -- Replaces the XML markup characters & < > in a text value with their
  -- entities so that the generated document stays well-formed.
  -- CHR(38) is '&'; it is spelled this way so that SQL*Plus substitution
  -- (SET DEFINE ON) does not interfere when the package is compiled.
  FUNCTION EscapeXml(p_text VARCHAR2) RETURN VARCHAR2 IS
  BEGIN
    RETURN REPLACE(
             REPLACE(
               REPLACE(p_text, CHR(38), CHR(38)||'amp;'),
               '<', CHR(38)||'lt;'),
             '>', CHR(38)||'gt;');
  END EscapeXml;

  -- Builds an XML document (<XML_TO_QUEUE> root, one child element per
  -- entry of p01, CR/LF after every line), sends it through InsertQueue
  -- and records the returned status text in table XMLTEST02.
  --
  -- p01 : name/value pairs; every existing index is visited in ascending
  --       order, so the array need not be dense or start at 0.
  PROCEDURE EnQ(p01 ioArray) is
    idx    BINARY_INTEGER;
    TmpStr LONG;
    result varchar2(4000);
  begin
    TmpStr := '<?xml version="1.0"?>'||chr(13)||chr(10);
    TmpStr := TmpStr||'<XML_TO_QUEUE>'||chr(13)||chr(10);

    -- FIRST/NEXT instead of 0..COUNT-1: a missing index no longer raises
    -- NO_DATA_FOUND, and an empty array simply yields no child elements.
    idx := p01.FIRST;
    while idx is not null loop
      TmpStr := TmpStr
                ||'<'||p01(idx).name||'>'
                ||EscapeXml(p01(idx).value)
                ||'</'||p01(idx).name||'>'
                ||chr(13)||chr(10);
      idx := p01.NEXT(idx);
    end loop;

    TmpStr := TmpStr || '</XML_TO_QUEUE>';

    -- The former "TmpRaw := TmpStr" assignment was removed: the LONG RAW
    -- copy was never used, and assigning character data to a RAW variable
    -- makes PL/SQL attempt an implicit hex-to-raw conversion.
    result := InsertQueue('text', 'TEST MESSAGE : ', TmpStr);
    if result is null then
      result := 'null';
    end if;
    insert into XMLTEST02 values(result);
  end EnQ;

  -- Java call specification: delegates to the static method CarSend.send.
  FUNCTION InsertQueue(TmpStr1 varchar2, TmpStr2 LONG, TmpStr3 LONG) return varchar2
  as language java
  name 'CarSend.send(java.lang.String, java.lang.String, java.lang.String) return java.lang.String';
END IWMQ;
TRIGGER IWMQ_TRIGGER
after INSERT OR DELETE OR UPDATE on XMLTEST01
for each row
declare
  -- Row-level trigger: packs the table name, the DML action and the six
  -- column values of the affected XMLTEST01 row into an IWMQ.ioArray and
  -- hands it to IWMQ.EnQ.
  payload   IWMQ.ioArray;
  slot      integer := 1;
  -- Column names of XMLTEST01 in column order; they become element names.
  cursor col_names is
    select cname from col where tname = 'XMLTEST01' order by colno;
BEGIN
  payload(0).name  := 'TABLE';
  payload(0).value := 'XMLTEST01';

  payload(1).name := 'DBACTION';
  if inserting then
    payload(1).value := 'INSERT';
  elsif updating then
    payload(1).value := 'UPDATE';
  elsif deleting then
    payload(1).value := 'DELETE';
  end if;

  -- Slots 2.. receive the column names, one per column.
  for c in col_names loop
    slot := slot + 1;
    payload(slot).name := c.cname;
  end loop;

  -- DELETE reports the removed row (:old); INSERT and UPDATE report :new.
  if deleting then
    payload(2).value := ltrim(rtrim(:old.col1));
    payload(3).value := ltrim(rtrim(:old.col2));
    payload(4).value := ltrim(rtrim(:old.col3));
    payload(5).value := ltrim(rtrim(:old.col4));
    payload(6).value := ltrim(rtrim(:old.col5));
    payload(7).value := ltrim(rtrim(:old.col6));
  else
    payload(2).value := ltrim(rtrim(:new.col1));
    payload(3).value := ltrim(rtrim(:new.col2));
    payload(4).value := ltrim(rtrim(:new.col3));
    payload(5).value := ltrim(rtrim(:new.col4));
    payload(6).value := ltrim(rtrim(:new.col5));
    payload(7).value := ltrim(rtrim(:new.col6));
  end if;

  IWMQ.EnQ( payload );
END;
import oracle.jms.AQjmsSession;
import javax.jms.*; public class CarSend extends QueueConnector{ //java CarSend jdbc:oracle:thin:@211.192.211.57:1521:cnapse aqjms aqjms textq text data1 data2 public static String send(String messageType,String maker, String model) throws JMSException{ String ret; try{ connect("jdbc:oracle:thin:@211.192.211.57:1521:cnapse", "aqjms", "aqjms"); Queue queue = ((AQjmsSession)queueSess).getQueue("aqjms", "textq"); QueueSender queueSender = queueSess.createSender(queue); queueSender.setPriority(-1); if (messageType.equals("text")){ TextMessage msg = queueSess.createTextMessage(); msg.setText(convertChar(maker+model)); queueSender.send(queue,msg); } ret = "Succeed !"; } catch(Exception e){ ret = e.toString(); } disconnect(); return ret; } public static void usage(){ System.out.println("java CarSend url user password queueName maker model"); } public static void main(String args[]) throws JMSException{ String messgeType = args[0]; String maker = args[1]; String model = args[2]; CarSend cs = new CarSend(); System.out.println(cs.send(messgeType, maker, model)); } public static String convertChar(String str){ String result = null; try{ String aa = System.getProperty("file.encoding"); System.out.println(aa); result = new String(str.getBytes("8859_1"), "KSC5601"); System.out.println("6"); } catch(java.io.UnsupportedEncodingException e){ e.printStackTrace(); } return result; } } |
import java.sql.*;
import javax.jms.*; import oracle.jms.AQjmsFactory; public class QueueConnector{ protected static QueueConnection queueConn; protected static QueueSession queueSess; public final static void connect(String url, String user, String password) throws JMSException{ QueueConnectionFactory queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(url, null); queueConn = queueConnectionFactory.createQueueConnection(user,password); queueSess = queueConn.createQueueSession(true, Session.CLIENT_ACKNOWLEDGE); queueConn.start(); } public final static void disconnect() throws JMSException{ if (queueConn != null){ try{ queueSess.commit(); queueSess.close(); queueConn.close(); } catch(Exception e){ } } } } |
* Weblogic의 MQ를 사용하는 방법은 데이터 베이스로 그 소스를 로드하였을 때 오류가 발생.
오라클의 AQ로 구현 하였습니다.
Weblogic의 MQ의 경우도 계속 시도는 하겠지만 성공여부는 아직 불투명한 상태 입니다.
참고로 말씀드리자면 weblogic.jar파일의 InitialContext.class를 데이터베이스에 로드하면 잘못된
형태의 클래스라는 메시지가 발생합니다.
그리고 오라클에서 EJB 빈 클래스를 호출하면 Context 에러가 납니다. 오라클의 내장 JVM이 jdk1.1.6 수준만 지원하기 때문에, EJB 빈 클래스를 오라클 내부로 로드할 때 컴파일 단계에서 에러가 나는 것으로 보입니다. jdk1.1.6은 EJB2.0을 지원하지 못하기 때문입니다.
[ 병래 문서 정리 ]
우선 PL/SQL 사용법을 간략하게 보겠습니다..
- Type 생성
create type security as object ( name varchar2(80), price number, shares number);
- Queue Table 생성
execute dbms_aqadm.create_queue_table ( queue_table => 'securities', queue_payload_type => 'security', multiple_consumers => TRUE, compatible => '8.1');
- Queue 생성
execute dbms_aqadm.create_queue ( queue_name => 'stock_ticker_queue', queue_table => 'securities');
execute dbms_aqadm.create_queue ( queue_name => 'stock_orders_queue', queue_table => 'securities');
- 큐 개시
execute dbms_aqadm.start_queue ( queue_name => 'stock_ticker_queue');
execute dbms_aqadm.start_queue ( queue_name => 'stock_orders_queue');
- Subscriber 등록
-- Registers the agent 'client1' as a subscriber of stock_ticker_queue.
-- The rule restricts delivery to messages whose payload attribute "name"
-- is 'xyz' or 'abc'.
DECLARE
subscriber sys.aq$_agent;
BEGIN
-- Agent with a name only; the other two constructor arguments are NULL.
subscriber := sys.aq$_agent('client1', NULL, NULL);
DBMS_AQADM.ADD_SUBSCRIBER(
queue_name => 'stock_ticker_queue',
subscriber => subscriber,
rule => 'tab.user_data.name IN (''xyz'', ''abc'')');
END;
- 조회
select * from aq$securities;
select queue_table from user_queue_tables;
select name from user_queues;
- Queue의 현재 상태 확인
select name, enqueue_enabled, dequeue_enabled from user_queues;
- 사용자 큐의 타임아웃, 메시지 개수 조사(모니터링)
select q.name, v.waiting, v.ready, v.expired
from v$aq v, user_queues q
where v.qid = q.qid;
- 큐 삭제
select name from user_queues;
execute dbms_aqadm.stop_queue(Queue_name => '큐이름');
execute dbms_aqadm.drop_queue(Queue_name => '큐이름');
- 큐 테이블 삭제
select * from tab;
삭제
execute dbms_aqadm.drop_queue_table( queue_table => '큐테이블이름');
강제삭제
execute dbms_aqadm.drop_queue_table( queue_table => 'SECURITIES', force => true);
- User 확인
select username, profile from dba_users;
[ 실전 예제 ]
단일 소비자 큐
1. queue 타입 작성
create type car_t as object(
name varchar2(40), color varchar2(20))
/
2. 큐 작성과 개시
-- Recreates and starts two queues with payload type car_t:
--   carq  on queue table carqtab  (single consumer)
--   carq2 on queue table carqtab2 (multiple consumers)
begin
  -- Each drop runs in its own guarded block. Previously both drops shared one
  -- block, so an error on carqtab (for example because it did not exist yet)
  -- skipped the drop of carqtab2.
  begin
    DBMS_AQADM.DROP_QUEUE_TABLE(QUEUE_TABLE=>'carqtab', FORCE=> TRUE);
  EXCEPTION
    WHEN OTHERS THEN
      NULL;  -- best effort: ignore a failed drop
  END;

  begin
    DBMS_AQADM.DROP_QUEUE_TABLE(QUEUE_TABLE=>'carqtab2', FORCE=> TRUE);
  EXCEPTION
    WHEN OTHERS THEN
      NULL;  -- best effort: ignore a failed drop
  END;

  -- Single-consumer queue table, queue, and start
  DBMS_AQADM.CREATE_QUEUE_TABLE
    (QUEUE_TABLE => 'carqtab',
     MULTIPLE_CONSUMERS => false,
     QUEUE_PAYLOAD_TYPE => 'car_t');
  DBMS_AQADM.CREATE_QUEUE
    (QUEUE_TABLE => 'carqtab',
     QUEUE_NAME => 'carq');
  DBMS_AQADM.START_QUEUE
    (QUEUE_NAME => 'carq');

  -- Multi-consumer queue table, queue, and start
  DBMS_AQADM.CREATE_QUEUE_TABLE
    (QUEUE_TABLE => 'carqtab2',
     MULTIPLE_CONSUMERS => true,
     QUEUE_PAYLOAD_TYPE => 'car_t');
  DBMS_AQADM.CREATE_QUEUE
    (QUEUE_TABLE => 'carqtab2',
     QUEUE_NAME => 'carq2');
  DBMS_AQADM.START_QUEUE
    (QUEUE_NAME => 'carq2');
END;
/
3. Enqueue 엔큐
SET SERVEROUTPUT ON
-- Enqueues one car_t message ('CR-V', 'GOLD') on queue carq with default
-- enqueue options and message properties, prints the message id that
-- DBMS_AQ.ENQUEUE returned, and commits.
DECLARE
data car_t;
enqopt DBMS_AQ.ENQUEUE_OPTIONS_T;
msgprop DBMS_AQ.MESSAGE_PROPERTIES_T;
msgid RAW(16);
BEGIN
data := car_t('CR-V','GOLD');
DBMS_AQ.ENQUEUE(QUEUE_NAME => 'carq',
ENQUEUE_OPTIONS => enqopt,
MESSAGE_PROPERTIES => msgprop,
PAYLOAD => data,
MSGID => msgid);
DBMS_OUTPUT.PUT_LINE('message id: ' || msgid);
COMMIT;
END;
4. 데이타가 enqueue 됐는지 확인
select msg_id, msg_state, user_data from aq$carqtab;
5. Dequeue 데큐
SET SERVEROUTPUT ON
-- Dequeues one car_t message from queue carq with default dequeue options,
-- prints its message id, name and color, and commits.
DECLARE
data car_t;
deqopt DBMS_AQ.DEQUEUE_OPTIONS_T;
msgprop DBMS_AQ.MESSAGE_PROPERTIES_T;
msgid RAW(16);
BEGIN
DBMS_AQ.DEQUEUE(QUEUE_NAME => 'carq',
DEQUEUE_OPTIONS => deqopt,
MESSAGE_PROPERTIES => msgprop,
PAYLOAD => data,
MSGID => msgid);
DBMS_OUTPUT.PUT_LINE('message id: ' || msgid);
DBMS_OUTPUT.PUT_LINE(' name: ' || data.name);
DBMS_OUTPUT.PUT_LINE(' color: ' || data.color);
COMMIT;
END;
6. 메시지가 dequeue되면 레코드가 사라진다.(5번 수행후..)
select msg_id, msg_state, user_data from aq$carqtab;
복수 소비자 큐
에러가 너무 많이 남… 못해 봄
메시지 Grouping
큐 작성
-- Recreates queue table carqtab3 with transactional message grouping and
-- payload type car_t, then creates and starts queue carq3 on it.
BEGIN
BEGIN
-- Drop the queue table if it already exists
DBMS_AQADM.DROP_QUEUE_TABLE
(QUEUE_TABLE => 'carqtab3', FORCE => TRUE);
EXCEPTION
WHEN OTHERS THEN
NULL;
END;
-- Create the queue table
DBMS_AQADM.CREATE_QUEUE_TABLE
(QUEUE_TABLE => 'carqtab3',
MESSAGE_GROUPING => DBMS_AQADM.TRANSACTIONAL,
QUEUE_PAYLOAD_TYPE => 'car_t');
-- Create the queue
DBMS_AQADM.CREATE_QUEUE
(QUEUE_TABLE => 'carqtab3',
QUEUE_NAME => 'carq3');
-- Start the queue
DBMS_AQADM.START_QUEUE
(QUEUE_NAME => 'carq3');
END;
/
[ grpcreate.sql ]
Enqueue 엔큐
-- Enqueues nine car_t messages on carq3 in three transactions of three
-- messages each (one COMMIT per car name).
DECLARE
-- Local helper procedure that enqueues one car_t(name, color) message
PROCEDURE ENQ(name VARCHAR2, color VARCHAR2) IS
data car_t;
enqopt DBMS_AQ.ENQUEUE_OPTIONS_T;
msgprop DBMS_AQ.MESSAGE_PROPERTIES_T;
msgid RAW(16);
BEGIN
data := car_t(name, color);
-- Enqueue
DBMS_AQ.ENQUEUE(QUEUE_NAME => 'carq3',
ENQUEUE_OPTIONS => enqopt,
MESSAGE_PROPERTIES => msgprop,
PAYLOAD => data,
MSGID => msgid);
END;
BEGIN
ENQ('Vitz', 'WHITE');
ENQ('Vitz', 'BLACK');
ENQ('Vitz', 'BLUE');
COMMIT;
ENQ('LEGNUM', 'WHITE');
ENQ('LEGNUM', 'BLACK');
ENQ('LEGNUM', 'BLUE');
COMMIT;
ENQ('IMPREZA', 'WHITE');
ENQ('IMPREZA', 'BLACK');
ENQ('IMPREZA', 'BLUE');
COMMIT;
END;
/
[ grpenq.sql ]
엔큐 확인
select enq_txn_id, user_data from aq$carqtab3;
Dequeue 데큐
SET SERVEROUTPUT ON
-- Drains queue carq3 one transaction group at a time, printing the name and
-- color of every message, and commits once the queue is empty.
DECLARE
  data    car_t;
  deqopt  DBMS_AQ.DEQUEUE_OPTIONS_T;
  msgprop DBMS_AQ.MESSAGE_PROPERTIES_T;
  msgid   RAW(16);
  -- Raised when no more messages are available
  no_messages EXCEPTION;
  pragma exception_init(no_messages, -25228);
  -- Raised when the end of the current message group has been reached
  end_of_group EXCEPTION;
  pragma exception_init(end_of_group, -25235);
BEGIN
  -- Do not wait for a message to arrive when the queue is empty
  deqopt.WAIT := DBMS_AQ.NO_WAIT;
  -- Start reading at the first message
  deqopt.NAVIGATION := DBMS_AQ.FIRST_MESSAGE;
  LOOP
    BEGIN
      -- Dequeue
      DBMS_AQ.DEQUEUE(QUEUE_NAME => 'carq3',
                      DEQUEUE_OPTIONS => deqopt,
                      MESSAGE_PROPERTIES => msgprop,
                      PAYLOAD => data,
                      MSGID => msgid);
      -- Show the content
      DBMS_OUTPUT.PUT_LINE('Name: ' || data.name ||
                           ', Color: ' || data.color);
      -- Continue with the next message of the same transaction
      deqopt.NAVIGATION := DBMS_AQ.NEXT_MESSAGE;
    EXCEPTION
      -- End of the current message group
      WHEN end_of_group THEN
        -- The previous text was garbled machine translation; it now states
        -- that this transaction has no more messages.
        DBMS_OUTPUT.PUT_LINE
          ('이 트랜잭션의 메시지는 더 이상 없습니다.');
        -- Move to the first message of the next transaction
        deqopt.NAVIGATION := DBMS_AQ.NEXT_TRANSACTION;
    END;
  END LOOP;
EXCEPTION
  -- No messages left in the queue
  WHEN no_messages THEN
    DBMS_OUTPUT.PUT_LINE
      ('메시지가 더 이상 없습니다.');
    -- Commit the dequeues explicitly, as the other dequeue examples in this
    -- document do.
    COMMIT;
END;
/
[ grpdeq.sql ]
2장
사용자 생성(user)
DROP USER &1 CASCADE;
CREATE USER &1 IDENTIFIED BY &2
DEFAULT TABLESPACE users
TEMPORARY TABLESPACE temp;
GRANT connect, resource TO &1;
GRANT aq_administrator_role TO &1;
[ create_user.sql ]
sql> @create_user aqtest1 aqtest1
sql> @create_user aqtest2 aqtest2
sql> @create_user aqtest3 aqtest3
데이터 베이스 링크 작성
sql > DROP PUBLIC DATABASE LINK remotedb;
sql > create public database link remotedb
connect to aqtest3
identified by aqtest3
using '&1';
[ create_db_link.sql ]
sql > @create_db_link ora901
Type 작성
DROP TYPE car_t;
CREATE TYPE car_t AS OBJECT (
name VARCHAR2(40),
color VARCHAR2(20)
)
/
[ create_type.sql ]
큐(queue)의 작성
스키마(schema) 이름 | 큐 테이블명 | 큐 이름 | 비고 |
aqtest1 | fromqtab | fromq | 전파 원본(source) 큐 |
aqtest1 | toqtab | toq | 패턴(pattern) 1 용 전파 대상(destination) 큐 |
aqtest2 | toqtab | toq | 패턴(pattern) 2 용 전파 대상(destination) 큐 |
aqtest3 | toqtab | toq | 패턴(pattern) 3 용 전파 대상(destination) 큐 |
-- create_queue.sql: &1 = queue table name, &2 = queue name.
-- Recreates the multi-consumer queue table &1 (payload car_t), then creates
-- and starts queue &2 on it.
BEGIN
BEGIN
-- Drop the queue table if it already exists
DBMS_AQADM.DROP_QUEUE_TABLE
(QUEUE_TABLE => '&1', FORCE => TRUE);
EXCEPTION
WHEN OTHERS THEN
NULL;
END;
-- Create the queue table
DBMS_AQADM.CREATE_QUEUE_TABLE
(QUEUE_TABLE => '&1',
MULTIPLE_CONSUMERS => TRUE,
QUEUE_PAYLOAD_TYPE => 'car_t');
-- Create the queue
DBMS_AQADM.CREATE_QUEUE
(QUEUE_TABLE => '&1',
QUEUE_NAME => '&2');
-- Start the queue
DBMS_AQADM.START_QUEUE
(QUEUE_NAME => '&2');
END;
/
[ create_queue.sql ]
sql> @create_queue fromqtab fromq
sql> @create_queue toqtab toq
sql> connect aqtest2/aqtest2
sql> @create_queue toqtab toq
sql> connect aqtest3/aqtest3
sql> @create_queue toqtab toq
큐에 권한 부여
SET VERIFY OFF
-- grant_enqueue.sql: &1 = queue name, &2 = grantee.
BEGIN
-- Grant the enqueue privilege on queue &1 to user &2
DBMS_AQADM.GRANT_QUEUE_PRIVILEGE
(PRIVILEGE => 'enqueue',
QUEUE_NAME => '&1',
GRANTEE => '&2');
END;
/
[ grant_enqueue.sql ]
sql> @grant_enqueue toq aqtest1
Subscriber의 추가
패턴(pattern) | Subscriber 이름 |
패턴(pattern) 1 | self |
패턴(pattern) 2 | friend |
패턴(pattern) 3 | remote |
패턴(pattern) | Subscriber 이름 | 어드레스(address) |
패턴(pattern) 1 | self | (aqtest1.)toq |
패턴(pattern) 2 | friend | aqtest2.toq |
패턴(pattern) 3 | remote | aqtest3.toq@remotedb |
SQL> connect aqtest1/aqtest1
Connected.
SQL> @add_subscriber toq self "" ""
PL/SQL procedure successfully completed.
SQL> connect aqtest2/aqtest2
Connected.
SQL> @add_subscriber toq friend "" ""
PL/SQL procedure successfully completed.
SQL> connect aqtest3/aqtest3
Connected.
SQL> @add_subscriber toq remote "" ""
PL/SQL procedure successfully completed.
다음은 리모트 subscribe를 등록
SQL> connect aqtest1/aqtest1
Connected.
SQL> @add_subscriber fromq self toq ""
PL/SQL procedure successfully completed.
SQL> @add_subscriber fromq friend aqtest2.toq ""
PL/SQL procedure successfully completed.
SQL> @add_subscriber fromq remote aqtest3.toq@remotedb ""
PL/SQL procedure successfully completed.
subscribe의 개시
-- schedule.sql: &1 = source queue name, &2 = destination (the usage examples
-- in this document pass "" for the local database or a database link name).
BEGIN
-- Start the propagation schedule
DBMS_AQADM.SCHEDULE_PROPAGATION
(QUEUE_NAME => '&1',
DESTINATION => '&2');
END;
/
[ schedule.sql ]
local database의 queue에 schedule
sql> @schedule fromq ""
PL/SQL procedure successfully completed.
remote database의 queue에 schedule
SQL> @schedule fromq remotedb
PL/SQL procedure successfully completed.
메시지의 Enqueue
SET SERVEROUTPUT ON
SET VERIFY OFF
-- publish.sql: &1 = queue name, &2 = car name, &3 = color, &4 = priority.
-- Enqueues one car_t message with the given priority, prints the message id
-- and the values that were sent, and commits.
-- The script is laid out one statement per line: when it was collapsed onto a
-- single line, the first "--" comment swallowed everything after it.
DECLARE
  data    car_t;
  enqopt  DBMS_AQ.ENQUEUE_OPTIONS_T;
  msgprop DBMS_AQ.MESSAGE_PROPERTIES_T;
  msgid   RAW(16);
BEGIN
  data := car_t('&2', '&3');
  -- Set the message priority
  msgprop.PRIORITY := &4;
  -- Publish (enqueue)
  DBMS_AQ.ENQUEUE(QUEUE_NAME => '&1',
                  ENQUEUE_OPTIONS => enqopt,
                  MESSAGE_PROPERTIES => msgprop,
                  PAYLOAD => data,
                  MSGID => msgid);
  DBMS_OUTPUT.PUT_LINE('message id: ' || msgid);
  DBMS_OUTPUT.PUT_LINE(' name: ' || '&2');
  DBMS_OUTPUT.PUT_LINE(' color: ' || '&3');
  COMMIT;
END;
/
[ publish.sql ]
실행 :
SQL> @publish fromq Mini WHITE 1
message id: 230AC008E2364EFF8E5908E9C830AACE
name: Mini
color: WHITE
PL/SQL procedure successfully completed.
큐의 확인
aq$< QueueTable 명 > 를 보고, 엔큐된지 아닌지 확인한다. msg_state의 값이 READY이면 아직 준비상태이고, PROCESSED이면 전파되어진 상태이다.
SQL> select msg_id, user_data, msg_state from aq$fromqtab;
MSG_ID
--------------------------------
USER_DATA(NAME, COLOR)
----------------------------------------------------------
MSG_STATE
---------
230AC008E2364EFF8E5908E9C830AACE
CAR_T('Mini', 'WHITE')
READY
메시지의 Dequeue
SET SERVEROUTPUT ON
SET VERIFY OFF
-- subscribe.sql: &1 = queue name, &2 = subscriber (consumer) name.
-- Dequeues one car_t message for the given consumer, prints its message id,
-- name and color, and commits.
-- The script is laid out one statement per line: when it was collapsed onto a
-- single line, the first "--" comment swallowed everything after it.
DECLARE
  data    car_t;
  deqopt  DBMS_AQ.DEQUEUE_OPTIONS_T;
  msgprop DBMS_AQ.MESSAGE_PROPERTIES_T;
  msgid   RAW(16);
BEGIN
  -- Set the subscriber name
  deqopt.CONSUMER_NAME := '&2';
  -- Subscribe (dequeue)
  DBMS_AQ.DEQUEUE(QUEUE_NAME => '&1',
                  DEQUEUE_OPTIONS => deqopt,
                  MESSAGE_PROPERTIES => msgprop,
                  PAYLOAD => data,
                  MSGID => msgid);
  DBMS_OUTPUT.PUT_LINE('message id: ' || msgid);
  DBMS_OUTPUT.PUT_LINE(' name: ' || data.name);
  DBMS_OUTPUT.PUT_LINE(' color: ' || data.color);
  COMMIT;
END;
/
[ subscribe.sql ]
확인 :
SQL> connect aqtest1/aqtest1
Connected.
SQL> @subscribe toq self
message id: 83281BFD9C67491DAA8C041FE06A2BFB
name: Mini
color: WHITE
PL/SQL procedure successfully completed.
SQL> connect aqtest2/aqtest2
Connected.
SQL> @subscribe toq friend
message id: 63C35F4EFD7947158F900F688ED8166D
name: Mini
color: WHITE
PL/SQL procedure successfully completed.
전파된 조건의 설정
패턴(pattern) | subscriber 이름 | 어드레스(address) | 룰(rule) |
4 | friend | aqtest2.toq | tab.user_data.name='LANCIA' |
5 | remote | aqtest3.toq@remotedb | priority<2 |
SQL> @publish fromq Cherokee RED 1
message id: E3CBEEF07A3E4DCABC78CBD10B7F71BF
name: Cherokee
color: RED
SQL> select msg_id, user_data from aq$fromqtab;
MSG_ID
--------------------------------
USER_DATA(NAME, COLOR)
-----------------------------------------------
DCA17B1D43F74F399F3C857594ED0975
CAR_T('Mini', 'WHITE')
E3CBEEF07A3E4DCABC78CBD10B7F71BF
CAR_T('Cherokee', 'RED')
SQL> connect aqtest1/aqtest1
Connected.
SQL> @subscribe toq self
message id: 97FD3960DE0E46F2ADB2C630A51EA9F7
name: Cherokee
color: RED
PL/SQL procedure successfully completed.
[ Oracle AQ & JMS ]
-- create_user.sql: &1 = user name, &2 = password.
-- Recreates the user and grants the roles needed for AQ administration.
-- ("gran t" in the earlier text was a broken GRANT keyword.)
drop user &1 cascade;
create user &1 identified by &2
  default tablespace users
  temporary tablespace temp;
grant connect, resource to &1;
grant aq_administrator_role to &1;
[ create_user.sql ]
위의 create_user.sql을 c:\oracle\ora81\bin 디렉토리 밑에 저장하고
sqlplus > @create_user aqjms aqjms
import javax.jms.*;
import oracle.jms.*; import oracle.AQ.AQQueueTable; import oracle.AQ.AQException; import oracle.AQ.AQQueueTableProperty; public class CreateQueue extends QueueConnector{ public void createQueue(String url, String user, String password, String queueTableName, String queueName, String messgeType) throws JMSException{ try{ connect(url, user, password); AQjmsSession sess = (AQjmsSession)queueSess; AQQueueTableProperty queueTableProp; if (messgeType.equals("text")){ queueTableProp = new AQQueueTableProperty("SYS.AQ$_JMS_TEXT_MESSAGE"); } else { queueTableProp = new AQQueueTableProperty("SYS.AQ$_JMS_OBJECT_MESSAGE"); } try{ AQQueueTable oldTable = sess.getQueueTable(user, queueTableName); oldTable.drop(true); } catch(AQException e){ } catch(JMSException e){ } AQQueueTable queueTable = sess.createQueueTable(user, queueTableName, queueTableProp); AQjmsDestinationProperty destProp = new AQjmsDestinationProperty(); Queue queue = sess.createQueue(queueTable, queueName, destProp); ((AQjmsDestination)queue).start(queueSess, true, true); } catch(Exception e){ e.printStackTrace(); } disconnect(); } public static void usage(){ System.out.println("java CreateQueue url user password queueTableName" + " queueName MessageType(text or object)"); } public static void main(String args[]) throws Exception{ if (args.length < 6){ usage(); System.exit(1); } String url = args[0]; String user = args[1]; String password = args[2]; String queueTableName = args[3]; String queueName = args[4]; String messageType = args[5]; CreateQueue queue = new CreateQueue(); queue.createQueue(url, user, password, queueTableName, queueName, messageType); } } |
[ CreateQueue.java ]
- Queue 생성
c:\ > java CreateQueue jdbc:oracle:thin:@localhost:1521:ossi aqjms aqjms textqtab textq text
c:\ > java CreateQueue jdbc:oracle:thin:@localhost:1521:ossi aqjms aqjms objqtab objq object
- Queue 생성 확인
sqlplus > connect aqjms/aqjms
sqlplus > select name, enqueue_enabled, dequeue_enabled from user_queues;
import oracle.jms.AQjmsSession;
import javax.jms.*; public class CarSend extends QueueConnector{ public void send(String url, String user, String password, String queueName, String messageType, String maker, String model, boolean setPriority, int priority, String toProperty) throws JMSException{ try{ connect(url, user, password); Queue queue = ((AQjmsSession)queueSess).getQueue(user, queueName); QueueSender queueSender = queueSess.createSender(queue); queueSender.setPriority(priority); if (messageType.equals("text")){ TextMessage msg = queueSess.createTextMessage(); msg.setText(convertChar(maker+model)); queueSender.send(queue,msg); System.out.println("-- 송신된 메시지 ID 입니다 --"); System.out.println("Message ID : " +msg.getJMSMessageID()); } else{ Car theCar = new Car(maker, model); ObjectMessage msg = queueSess.createObjectMessage(); msg.setObject(theCar); queueSender.send(queue, msg); System.out.println("-- 송신된 메시지 ID 입니다 --"); System.out.println("Message ID : " +msg.getJMSMessageID()); } } catch(Exception e){ e.printStackTrace(); } disconnect(); } public static void usage(){ System.out.println("java CarSend url user password queueName maker model"); } public static void main(String args[]) throws JMSException{ if (args.length < 7){ usage(); System.exit(1); } String url = args[0]; String user = args[1]; String password = args[2]; String queueName = args[3]; String messgeType = args[4]; String maker = args[5]; String model = args[6]; boolean setPriority = false; int priority = -1; if ( args.length > 7){ setPriority = true; priority = Integer.parseInt(args[7]); } String toProperty = ""; if (args.length > 8){ toProperty = args[8]; } CarSend cs = new CarSend(); cs.send(url, user, password, queueName, messgeType, maker, model, setPriority, priority, toProperty); } public String convertChar(String str){ String result = null; try{ String aa = System.getProperty("file.encoding"); System.out.println(aa); result = new String(str.getBytes("8859_1"), "KSC5601"); System.out.println("6"); } 
catch(java.io.UnsupportedEncodingException e){ e.printStackTrace(); } return result; } } |
[ CarSend.java ]
- 큐에 메시지 송신
java CarSend jdbc:oracle:thin:@localhost:1521:ossi aqjms aqjms textq text Ossi C-napse
import oracle.jms.AQjmsSession;
import javax.jms.*; public class CarReceive extends QueueConnector{ public void receive(String url, String user, String password, String queueName, String messageType, String rule) throws JMSException{ try{ connect(url, user,password); Queue queue = ((AQjmsSession)queueSess).getQueue(user, queueName); QueueReceiver receiver = queueSess.createReceiver(queue, rule); if (messageType.equals("text")){ receiveTextMsg(receiver); } else if(messageType.equals("object")){ receiveObjectMsg(receiver); } } catch(Exception e){ System.out.println("Error : " +e); e.printStackTrace(); } disconnect(); } public static void receiveTextMsg(QueueReceiver receiver) throws JMSException{ while(true){ TextMessage msg = (TextMessage)receiver.receiveNoWait(); if (msg == null){ break; } else{ System.out.println("MessgeID : " +msg.getJMSMessageID()); System.out.println("Messgae : " +msg.getText()); } } } public static void receiveObjectMsg(QueueReceiver receiver) throws JMSException{ Car car; String maker; String color; ObjectMessage objMsg = null; while(true){ objMsg = (ObjectMessage)receiver.receiveNoWait(); if (objMsg == null){ break; } else { car = (Car)objMsg.getObject(); maker = car.getMaker(); color = car.getModel(); System.out.println("-- 수신 머라고??"); System.out.println("Message ID :" + objMsg.getJMSMessageID()); System.out.println("Message : " +maker+ " : " +color); } } } public static void usage(){ System.out.println("사용법이 틀렸음"); } public static void main(String args[]) throws JMSException{ if (args.length < 5){ usage(); System.exit(1); } String url = args[0]; String user = args[1]; String password = args[2]; String queueName = args[3]; String messageType = args[4]; String rule = null; if (args.length > 5) rule = args[5]; CarReceive cr = new CarReceive(); cr.receive(url, user, password, queueName, messageType, rule); } } |
- 큐의 메시지를 수신
java CarReceive jdbc:oracle:thin:@localhost:1521:cnapse aqjms aqjms textq text
Written by 오병래.