Saya berhasil mencapai ini - saya harus menebak-nebak banyak bagian dari Oracle API, dan mengumpulkan petunjuk dari berbagai blog. Bagi siapa pun yang tertarik, berikut cara saya membuatnya berfungsi: 1. Saya membuat objek Oracle di Oracle DB. 2. Dengan objek Oracle ini, saya membuat tabel antrean dengan tipe objek tersebut sebagai payload. 3. Saya sekarang dapat melakukan enqueue pesan bertipe AQMessage dengan payload STRUCT, yang berisi data objek. 4. Dan saya dapat melakukan dequeue dengan konsumen JMS yang memahami tipe payload ADT (berkat artikel di http://blog.javaforge.net/post/30858904340/Oracle-advanced-queuing-spring-custom-types ).
Berikut langkah-langkahnya beserta kode. Buat objek Oracle. Objek dapat memiliki atribut bertipe data primitif seperti VARCHAR, TIMESTAMP, dll., dan juga BLOB, CLOB, dll. Dalam contoh ini saya menjadikan salah satu atribut bertipe BLOB agar kasusnya sedikit lebih rumit.
-- Object type used as the payload type of the AQ queue table
-- (referenced as 'AQ_EVENT_OBJ' when the queue table is created).
create or replace type aq_event_obj as object
(
id varchar2(100), -- identifier; the Java enqueue code stores the correlation id here
payload BLOB -- binary message body
);
commit;
Sekarang buat tabel antrean. Tipe payload tabel tersebut adalah objek Oracle yang baru dibuat.
/**
 * Creates the queue table, creates the queue on top of it and starts the queue.
 *
 * <p>Each step is an anonymous PL/SQL block calling {@code DBMS_AQADM}; the blocks are
 * executed one after another through {@code doUpdateDatabase}, so the first failing
 * step aborts the remaining ones.
 *
 * @param conn connection the PL/SQL blocks are executed on
 * @throws SQLException if any of the steps fails
 */
private void setup(Connection conn) throws SQLException {
    // Bodies of the PL/SQL blocks, in execution order:
    // queue table (payload type AQ_EVENT_OBJ) -> queue -> start queue.
    final String[] plsqlBodies = {
        "DBMS_AQADM.CREATE_QUEUE_TABLE( "
            + " QUEUE_TABLE => 'OBJ_SINGLE_QUEUE_TABLE', "
            + " QUEUE_PAYLOAD_TYPE => 'AQ_EVENT_OBJ', "
            + " COMPATIBLE => '10.0'); ",
        "DBMS_AQADM.CREATE_QUEUE( "
            + " QUEUE_NAME => 'OBJ_SINGLE_QUEUE', "
            + " QUEUE_TABLE => 'OBJ_SINGLE_QUEUE_TABLE'); ",
        " DBMS_AQADM.START_QUEUE('OBJ_SINGLE_QUEUE'); "
    };
    for (int step = 0; step < plsqlBodies.length; step++) {
        doUpdateDatabase(conn, "BEGIN " + plsqlBodies[step] + "END; ");
    }
}
Anda sekarang dapat melakukan enqueue pesan bertipe AQMessage di Java dengan payload berupa instance STRUCT dari objek tersebut:
/**
 * Enqueues a single message whose payload is a STRUCT of the {@code EVENT_OBJECT} type.
 *
 * <p>The STRUCT carries two attributes: {@code ID} (the correlation id) and
 * {@code PAYLOAD} (a BLOB built from {@code payloadData}). The message is enqueued
 * with PERSISTENT delivery mode and ON_COMMIT visibility.
 *
 * <p>NOTE(review): the BLOB returned by {@code OracleAQBLOBUtil.createBlob} is not
 * released anywhere in this method - confirm whether callers need to free it.
 *
 * @param conn          Oracle connection used to build the payload and to enqueue
 * @param correlationId value used both as the message correlation and as the ID attribute
 * @param payloadData   bytes stored in the PAYLOAD attribute
 * @throws Exception if building the payload or the enqueue operation fails
 */
public void enqueueMessage(OracleConnection conn, String correlationId, byte[] payloadData) throws Exception {
    // Message properties: correlation id and the exception queue.
    AQMessageProperties properties = AQFactory.createAQMessageProperties();
    properties.setCorrelation(correlationId);
    properties.setExceptionQueue(EXCEPTION_QUEUE_NAME);

    // The sender is an agent addressed at the target queue.
    AQAgent sender = AQFactory.createAQAgent();
    sender.setName(SENDER_NAME);
    sender.setAddress(QUEUE_NAME);
    properties.setSender(sender);

    // Attribute values for the object type, keyed by attribute name.
    StructDescriptor descriptor = StructDescriptor.createDescriptor(EVENT_OBJECT, conn);
    Map<String, Object> attributes = new HashMap<String, Object>();
    attributes.put("ID", correlationId);
    attributes.put("PAYLOAD", new OracleAQBLOBUtil().createBlob(conn, payloadData));
    STRUCT payload = new STRUCT(descriptor, conn, attributes);

    // The message itself wraps the properties and the STRUCT payload.
    AQMessage message = AQFactory.createAQMessage(properties);
    message.setPayload(payload);

    AQEnqueueOptions enqueueOptions = new AQEnqueueOptions();
    enqueueOptions.setDeliveryMode(AQEnqueueOptions.DeliveryMode.PERSISTENT);
    enqueueOptions.setVisibility(AQEnqueueOptions.VisibilityOption.ON_COMMIT);

    conn.enqueue(QUEUE_NAME, enqueueOptions, message);
}
Atribut BLOB membutuhkan penanganan khusus:
/**
 * Helper for moving byte arrays into and out of Oracle {@code BLOB} values.
 */
public class OracleAQBLOBUtil {

    /** Chunk size used when draining a BLOB stream into memory. */
    private static final int READ_BUFFER_SIZE = 8192;

    /**
     * Creates a session-duration temporary BLOB and fills it with {@code payload}.
     *
     * <p>If writing fails, the temporary BLOB is freed before the exception propagates,
     * so a failed call does not leave a temporary LOB behind. On success the caller
     * owns the returned BLOB.
     *
     * @param conn    connection the temporary BLOB is created on
     * @param payload bytes to store; must not be {@code null}
     * @return the populated temporary BLOB
     * @throws Exception if the BLOB cannot be created or written
     */
    public BLOB createBlob(OracleConnection conn, byte[] payload) throws Exception {
        if (payload == null) {
            // Fail before allocating anything on the database side.
            throw new NullPointerException("payload");
        }
        BLOB blob = BLOB.createTemporary(conn, false, BLOB.DURATION_SESSION);
        boolean populated = false;
        try {
            OutputStream outputStream = blob.setBinaryStream(1L);
            try {
                // Write in chunks of the BLOB's own buffer size, as the original loop did;
                // guard against a non-positive size so the loop always advances.
                int chunkSize = Math.max(1, blob.getBufferSize());
                for (int offset = 0; offset < payload.length; offset += chunkSize) {
                    outputStream.write(payload, offset, Math.min(chunkSize, payload.length - offset));
                }
            }
            finally {
                outputStream.close();
            }
            populated = true;
            return blob;
        }
        finally {
            if (!populated) {
                blob.freeTemporary();
            }
        }
    }

    /**
     * Reads the complete content of {@code blob} into a byte array.
     *
     * <p>The stream obtained from the BLOB is always closed, also when reading fails.
     *
     * @param blob BLOB to read
     * @return all bytes of the BLOB
     * @throws Exception if the BLOB cannot be read
     */
    public byte[] saveOutputStream(BLOB blob) throws Exception {
        InputStream inputStream = blob.getBinaryStream();
        try {
            ByteArrayOutputStream collected = new ByteArrayOutputStream();
            byte[] buffer = new byte[READ_BUFFER_SIZE];
            int bytesRead;
            while ((bytesRead = inputStream.read(buffer)) != -1) {
                collected.write(buffer, 0, bytesRead);
            }
            return collected.toByteArray();
        }
        finally {
            inputStream.close();
        }
    }
}
Untuk konsumen, Anda perlu menyediakan instance ORADataFactory yang memungkinkan konsumen memahami jenis payload (objek kustom Anda).
// The Oracle-specific session type is required for the createReceiver overload used below.
AQjmsSession queueSession = (AQjmsSession) session;
// Look up the queue by name in the naming context.
Queue queue = (Queue) ctx.lookup(queueName);
// Create the receiver, handing it an OracleAQObjORADataFactory instance for the custom payload type.
MessageConsumer receiver = queueSession.createReceiver(queue, new OracleAQObjORADataFactory());
Kode untuk OracleAQObjORADataFactory adalah sebagai berikut:
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.SQLException;
import oracle.jdbc.OracleTypes;
import oracle.jpub.runtime.MutableStruct;
import oracle.sql.BLOB;
import oracle.sql.Datum;
import oracle.sql.ORAData;
import oracle.sql.ORADataFactory;
import oracle.sql.STRUCT;
/**
 * {@code ORAData} / {@code ORADataFactory} implementation for the
 * {@code SYSTEM.AQ_EVENT_OBJ} object type, which has two attributes:
 * index 0 is the id and index 1 is the payload.
 *
 * <p>An instance is both the factory handed to the consumer and the value object
 * the factory produces from a received {@code STRUCT}.
 */
public class OracleAQObjORADataFactory implements ORAData, ORADataFactory {
    public static final String EVENT_OBJECT = "SYSTEM.AQ_EVENT_OBJ";
    public static final int _SQL_TYPECODE = OracleTypes.STRUCT;

    /** Chunk size used when draining a stream into memory. */
    private static final int READ_BUFFER_SIZE = 8192;

    protected MutableStruct _struct;
    // Per-attribute SQL types and nested factories; declared before the singleton
    // below because its constructor reads them.
    protected static int[] _sqlType = { java.sql.Types.VARCHAR, java.sql.Types.VARBINARY };
    protected static ORADataFactory[] _factory = new ORADataFactory[2];
    protected static final OracleAQObjORADataFactory _AqEventObjFactory = new OracleAQObjORADataFactory();

    /**
     * @return the shared factory instance
     */
    public static ORADataFactory getORADataFactory() {
        return _AqEventObjFactory;
    }

    /* constructors */

    /**
     * Creates the backing struct with two empty attributes when {@code init} is true.
     */
    protected void _init_struct(boolean init) {
        if (init) {
            _struct = new MutableStruct(new Object[2], _sqlType, _factory);
        }
    }

    /** Creates an instance with both attributes unset. */
    public OracleAQObjORADataFactory() {
        _init_struct(true);
    }

    /**
     * Creates an instance with both attributes set.
     *
     * @param id      value for the id attribute
     * @param payload value for the payload attribute
     * @throws SQLException if an attribute cannot be set
     */
    public OracleAQObjORADataFactory(String id, byte[] payload) throws SQLException {
        _init_struct(true);
        setId(id);
        setPayload(payload);
    }

    /* ORAData interface */

    /**
     * Converts this object to a {@code Datum} of type {@link #EVENT_OBJECT}.
     */
    public Datum toDatum(Connection c) throws SQLException {
        return _struct.toDatum(c, EVENT_OBJECT);
    }

    /* ORADataFactory interface */

    /**
     * Creates a new instance backed by {@code d}, or returns {@code null} when {@code d} is null.
     */
    public ORAData create(Datum d, int sqlType) throws SQLException {
        return create(null, d, sqlType);
    }

    /**
     * Binds {@code d} to {@code o}, creating a fresh instance when {@code o} is null.
     *
     * @return {@code null} when {@code d} is null, otherwise the bound instance
     */
    protected ORAData create(OracleAQObjORADataFactory o, Datum d, int sqlType) throws SQLException {
        if (d == null) {
            return null;
        }
        if (o == null) {
            o = new OracleAQObjORADataFactory();
        }
        o._struct = new MutableStruct((STRUCT) d, _sqlType, _factory);
        return o;
    }

    /** @return the id attribute (index 0) */
    public String getId() throws SQLException {
        return (String) _struct.getAttribute(0);
    }

    /** Sets the id attribute (index 0). */
    public void setId(String id) throws SQLException {
        _struct.setAttribute(0, id);
    }

    /**
     * Returns the payload attribute (index 1) as bytes.
     *
     * <p>Returns {@code null} when the attribute is unset. If the attribute currently
     * holds a {@code byte[]} (the form {@link #setPayload(byte[])} stores), that array
     * is returned as is. Otherwise the attribute is treated as a {@code BLOB}, read
     * completely, and its stream is closed.
     *
     * @return the payload bytes, or {@code null} when there is no payload
     * @throws SQLException if the attribute cannot be accessed or the BLOB cannot be read
     */
    public byte[] getPayload() throws SQLException {
        Object attribute = _struct.getAttribute(1);
        if (attribute == null) {
            return null;
        }
        if (attribute instanceof byte[]) {
            return (byte[]) attribute;
        }
        InputStream inputStream = ((BLOB) attribute).getBinaryStream();
        try {
            return readFully(inputStream);
        }
        catch (java.io.IOException e) {
            // Report the failure instead of silently handing back null.
            throw new SQLException("Failed to read PAYLOAD attribute of " + EVENT_OBJECT, e);
        }
        finally {
            try {
                inputStream.close();
            }
            catch (java.io.IOException ignored) {
                // Nothing useful can be done if closing a fully consumed stream fails.
            }
        }
    }

    /**
     * Reads {@code body} to its end and returns the bytes.
     *
     * <p>Best-effort: on any failure (including a null {@code body}) the error is
     * printed to {@code System.err} and {@code null} is returned. The stream is not
     * closed; it stays owned by the caller.
     *
     * @param body stream to drain
     * @return the bytes read, or {@code null} on failure
     */
    public byte[] getBytes(InputStream body) {
        try {
            return readFully(body);
        }
        catch (Exception e) {
            System.err.println("Exception: " + e.getMessage());
            e.printStackTrace();
            return null;
        }
    }

    /** Drains {@code in} into a byte array using a fixed-size buffer. */
    private static byte[] readFully(InputStream in) throws java.io.IOException {
        ByteArrayOutputStream collected = new ByteArrayOutputStream();
        byte[] buffer = new byte[READ_BUFFER_SIZE];
        int bytesRead;
        while ((bytesRead = in.read(buffer)) != -1) {
            collected.write(buffer, 0, bytesRead);
        }
        return collected.toByteArray();
    }

    /** Sets the payload attribute (index 1) to the given bytes. */
    public void setPayload(byte[] payload) throws SQLException {
        _struct.setAttribute(1, payload);
    }
}
Anda mungkin menggunakan Camel atau Spring di proyek Anda. Dalam hal ini: 1. Jika Anda menggunakan Camel 2.10.2 atau lebih tinggi, Anda dapat membuat konsumen JMS dengan kontainer pendengar pesan (message listener container) khusus (CAMEL-5676). 2. Jika Anda menggunakan versi sebelumnya, Anda mungkin tidak dapat menggunakan cara endpoint (saya tidak berhasil menemukan caranya), tetapi Anda dapat menggunakan JMS request listener seperti berikut:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring wiring for consuming object-payload messages from an Oracle AQ queue:
  AQ connection factory, credentials adapter, queue lookup, listener and listener container.
-->
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

    <!-- this is just an example, you can also use a datasource as the ctor arg -->
    <!-- NOTE(review): JDBC URL and the credentials below are hard-coded sample values; externalize them outside of a demo. -->
    <bean id="connectionFactoryOracleAQQueue" class="oracle.jms.AQjmsFactory" factory-method="getQueueConnectionFactory">
        <constructor-arg index="0">
            <value>jdbc:oracle:thin:@blrub442:1522:UB23</value>
        </constructor-arg>
        <constructor-arg index="1" type="java.util.Properties">
            <value></value>
        </constructor-arg>
    </bean>

    <!-- Supplies user name and password to the AQ connection factory. -->
    <bean id="oracleQueueCredentials" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
        <property name="targetConnectionFactory">
            <ref bean="connectionFactoryOracleAQQueue" />
        </property>
        <property name="username">
            <value>system</value>
        </property>
        <property name="password">
            <value>oracle</value>
        </property>
    </bean>

    <!-- Definitions for JMS Listener classes that we have created -->
    <bean id="aqMessageListener" class="com.misys.test.JmsRequestListener" />

    <!-- Queue name must match the queue created with DBMS_AQADM.CREATE_QUEUE (OBJ_SINGLE_QUEUE). -->
    <bean id="aqEventQueue" class="com.misys.test.OracleAqQueueFactoryBean">
        <property name="connectionFactory" ref="oracleQueueCredentials" />
        <property name="oracleQueueName" value="OBJ_SINGLE_QUEUE" />
    </bean>

    <!-- Listener container: the custom AQMessageListenerContainer, which extends Spring's DefaultMessageListenerContainer. -->
    <bean id="jmsContainer" class="com.misys.test.AQMessageListenerContainer" scope="singleton">
        <property name="connectionFactory" ref="oracleQueueCredentials" />
        <property name="destination" ref="aqEventQueue" />
        <property name="messageListener" ref="aqMessageListener" />
        <property name="sessionTransacted" value="false" />
    </bean>
</beans>
Kontainer pendengar pesan (message listener container) khusus:
/**
 * Listener container whose consumers are created through the Oracle AQ session API,
 * so that a payload factory for the custom object type can be supplied.
 */
public class AQMessageListenerContainer extends DefaultMessageListenerContainer {

    /**
     * Creates the consumer via {@code AQjmsSession.createConsumer}, passing the
     * container's message selector, the shared {@code OracleAQObjORADataFactory}
     * instance, {@code null} for the fourth argument, and the container's
     * pub-sub no-local flag.
     *
     * @param session     JMS session; cast to {@code AQjmsSession}
     * @param destination destination to consume from
     * @return the consumer created by the AQ session
     * @throws JMSException if the consumer cannot be created
     */
    @Override
    protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
        final AQjmsSession aqSession = (AQjmsSession) session;
        final String selector = getMessageSelector();
        return aqSession.createConsumer(
                destination,
                selector,
                OracleAQObjORADataFactory.getORADataFactory(),
                null,
                isPubSubNoLocal());
    }
}
dan metode onMessage pada request listener:
/**
 * Handles one received message: casts it to {@code AQjmsAdtMessage}, extracts the
 * {@code OracleAQObjORADataFactory} payload and prints its id and its payload
 * decoded as UTF-8.
 *
 * <p>Any failure is logged together with the exception itself, so the stack trace
 * and cause are kept; nothing is rethrown.
 *
 * @param msg the received message
 */
public void onMessage(Message msg) {
    try {
        AQjmsAdtMessage adtMessage = (AQjmsAdtMessage) msg;
        OracleAQObjORADataFactory event = (OracleAQObjORADataFactory) adtMessage.getAdtPayload();
        System.out.println("Datetime: " + event.getId());
        byte[] payload = event.getPayload();
        // A missing payload is printed as "null" instead of failing inside new String(...).
        String payloadText = (payload == null) ? null : new String(payload, Charset.forName("UTF-8"));
        System.out.println("Payload: " + payloadText);
    }
    catch (Exception jmsException) {
        if (logger.isErrorEnabled()) {
            // Pass the exception as well; the message alone can be null and hides the cause.
            logger.error(jmsException.getLocalizedMessage(), jmsException);
        }
    }
}