[jboss-svn-commits] JBL Code SVN: r14346 - labs/jbossesb/trunk/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Sat Aug 18 05:42:52 EDT 2007
Author: mark.little at jboss.com
Date: 2007-08-18 05:42:52 -0400 (Sat, 18 Aug 2007)
New Revision: 14346
Modified:
labs/jbossesb/trunk/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db/DBMessageStoreImpl.java
Log:
http://jira.jboss.com/jira/browse/JBESB-804
Modified: labs/jbossesb/trunk/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db/DBMessageStoreImpl.java
===================================================================
--- labs/jbossesb/trunk/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db/DBMessageStoreImpl.java 2007-08-18 09:40:11 UTC (rev 14345)
+++ labs/jbossesb/trunk/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db/DBMessageStoreImpl.java 2007-08-18 09:42:52 UTC (rev 14346)
@@ -21,6 +21,10 @@
package org.jboss.internal.soa.esb.persistence.format.db;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.URI;
import java.sql.Connection;
@@ -37,6 +41,7 @@
import org.jboss.soa.esb.client.ServiceInvoker;
import org.jboss.soa.esb.listeners.message.MessageDeliverException;
import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageType;
import org.jboss.soa.esb.message.urigen.MessageURIGenerator;
import org.jboss.soa.esb.persistence.manager.ConnectionManager;
import org.jboss.soa.esb.persistence.manager.ConnectionManagerException;
@@ -44,421 +49,636 @@
import org.jboss.soa.esb.services.persistence.MessageStore;
import org.jboss.soa.esb.services.persistence.MessageStoreException;
import org.jboss.soa.esb.services.persistence.RedeliverStore;
+import org.jboss.soa.esb.services.registry.RegistryException;
import org.jboss.soa.esb.util.Util;
-import org.jboss.util.Base64;
+/**
+ * This implementation only deals with the Serializable and XML message formats.
+ *
+ * TODO: add extensibility to marshal/unmarshal of Message implementations
+ * through plugins. Was in original design, but does not seem to have made it
+ * into this initial implementation. However, Util serialize/deserialize are
+ * tying us to these types anyway, despite the extensibility of the existing
+ * Message plugin approach.
+ */
+
public class DBMessageStoreImpl implements RedeliverStore
{
- private Logger logger = Logger.getLogger(this.getClass());
+ private Logger logger = Logger.getLogger(this.getClass());
- protected ConnectionManager mgr = null;
-
+ protected ConnectionManager mgr = null;
+
private Integer maxRedeliverCount = 10;
-
- protected MessageURIGenerator uriGenerator = new DefaultMessageURIGenerator();
- public DBMessageStoreImpl() throws ConnectionManagerException
+ protected MessageURIGenerator uriGenerator = new DefaultMessageURIGenerator();
+
+ public DBMessageStoreImpl() throws ConnectionManagerException
+ {
+ mgr = ConnectionManagerFactory.getConnectionManager();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.jboss.soa.esb.services.persistence.MessageStore#getMessageURIGenerator()
+ */
+ public MessageURIGenerator getMessageURIGenerator ()
+ {
+ return uriGenerator;
+ }
+
+ /**
+ * add's a
+ *
+ * @Message to the database persistence store will set the 'delivered'
+ * flag to TRUE by default - assuming that the
+ * @Message has been delivered
+ */
+ public synchronized URI addMessage (Message message, String classification)
+ throws MessageStoreException
+ {
+ URI uid = null;
+ Connection conn = null;
+ try
{
- mgr = ConnectionManagerFactory.getConnectionManager();
+ conn = mgr.getConnection();
+ uid = uriGenerator.generateMessageURI(message);
+ insert(uid, message, classification, "TRUE", conn);
}
-
- /* (non-Javadoc)
- * @see org.jboss.soa.esb.services.persistence.MessageStore#getMessageURIGenerator()
- */
- public MessageURIGenerator getMessageURIGenerator() {
- return uriGenerator;
+ catch (MessageStoreException ex)
+ {
+ throw ex;
}
-
- /**
- * add's a @Message to the database persistence store
- * will set the 'delivered' flag to TRUE by default - assuming that the @Message has been delivered
- */
- public synchronized URI addMessage (Message message, String classification) throws MessageStoreException
+ catch (Exception e)
{
- // String messageString = null;
- URI uid = null;
- Connection conn=null;
- try{
- conn = mgr.getConnection();
- uid = uriGenerator.generateMessageURI(message);
- insert(uid, message, classification, "TRUE", conn);
- }
- catch (Exception e)
- {
- logger.error(e);
- throw new MessageStoreException(e);
- }
- finally
- {
- release(conn);
- }
-
- return uid;
+ logger.error(e);
+
+ e.printStackTrace();
+
+ throw new MessageStoreException(e);
}
+ finally
+ {
+ release(conn);
+ }
- /**
- * return a @Message based on the passed in key in the form of a JBoss ESB @URI
- * format for URI: "urn:jboss/esb/message/UID#" + UUID.randomUUID()" - see the method in this class @addMessage
- */
- public synchronized Message getMessage (URI uid)
- throws MessageStoreException
+ return uid;
+ }
+
+ /**
+ * return a
+ *
+ * @Message based on the passed in key in the form of a JBoss ESB
+ * @URI format for URI: "urn:jboss/esb/message/UID#" +
+ * UUID.randomUUID()" - see the method in this class
+ * @addMessage
+ */
+ public synchronized Message getMessage (URI uid)
+ throws MessageStoreException
+ {
+ Message message = null;
+ Connection conn = null;
+ try
{
- Message message = null;
- Connection conn=null;
- try {
- conn = mgr.getConnection();
- message = select(uid, conn);
- } catch (Exception e) {
- throw new MessageStoreException(e);
- } finally {
- release(conn);
- }
- return message;
+ conn = mgr.getConnection();
+ message = select(uid, conn);
}
-
+ catch (MessageStoreException ex)
+ {
+ throw ex;
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+
+ throw new MessageStoreException(e);
+ }
+ finally
+ {
+ release(conn);
+ }
+ return message;
+ }
+
/**
- * return a @Message based on the passed in key in the form of a JBoss ESB @URI
- * format for URI: "urn:jboss/esb/message/UID#" + UUID.randomUUID()" - see the method in this class @addMessage
- */
+ * return a
+ *
+ * @Message based on the passed in key in the form of a JBoss ESB
+ * @URI format for URI: "urn:jboss/esb/message/UID#" +
+ * UUID.randomUUID()" - see the method in this class
+ * @addMessage
+ */
public synchronized Message getMessage (URI uid, String classification)
- throws MessageStoreException
+ throws MessageStoreException
{
- Message message = null;
- Connection conn=null;
- try {
- conn = mgr.getConnection();
- message = select(uid, classification, conn);
- } catch (Exception e) {
- throw new MessageStoreException(e);
- } finally {
- release(conn);
- }
- return message;
+ Message message = null;
+ Connection conn = null;
+ try
+ {
+ conn = mgr.getConnection();
+ message = select(uid, classification, conn);
+ }
+ catch (MessageStoreException ex)
+ {
+ throw ex;
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+
+ throw new MessageStoreException(e);
+ }
+ finally
+ {
+ release(conn);
+ }
+ return message;
}
-
+
/**
- * remove a @Message based on the passed in key in the form of a JBoss ESB @URI
- * format for URI: "urn:jboss/esb/message/UID#" + UUID.randomUUID()" - see the method in this class @removeMessage
- */
+ * remove a
+ *
+ * @Message based on the passed in key in the form of a JBoss ESB
+ * @URI format for URI: "urn:jboss/esb/message/UID#" +
+ * UUID.randomUUID()" - see the method in this class
+ * @removeMessage
+ */
public synchronized int removeMessage (URI uid, String classification)
- throws MessageStoreException
+ throws MessageStoreException
{
- int response;
- Connection conn=null;
- try {
- conn = mgr.getConnection();
- response = delete(uid, classification, conn);
- } catch (Exception e) {
- throw new MessageStoreException(e);
- } finally {
- release(conn);
- }
- return response;
+ int response;
+ Connection conn = null;
+ try
+ {
+ conn = mgr.getConnection();
+ response = delete(uid, classification, conn);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+
+ throw new MessageStoreException(e);
+ }
+ finally
+ {
+ release(conn);
+ }
+ return response;
}
-
- /**
- *
- * @param uid - key for message to set undelivered flag on
- * @throws MessageStoreException
- */
- public void setUndelivered(URI uid) throws MessageStoreException
+
+ /**
+ *
+ * @param uid -
+ * key for message to set undelivered flag on
+ * @throws MessageStoreException
+ */
+ public void setUndelivered (URI uid) throws MessageStoreException
{
- String sql = "update message set delivered = 'FALSE' where uuid=?";
- Connection conn=null;
- try {
- conn = mgr.getConnection();
- PreparedStatement ps = conn.prepareStatement(sql);
- ps.setString(1, uid.toString());
- ps.execute();
- } catch (Exception e) {
- throw new MessageStoreException(e);
- } finally {
- release(conn);
- }
-
+ String sql = "update message set delivered = 'FALSE' where uuid=?";
+ Connection conn = null;
+ try
+ {
+ conn = mgr.getConnection();
+ PreparedStatement ps = conn.prepareStatement(sql);
+ ps.setString(1, uid.toString());
+ ps.execute();
}
-
- public void setDelivered(URI uid) throws MessageStoreException{
- String sql = "update message set delivered = 'TRUE' where uuid=?";
- Connection conn=null;
- try {
- conn = mgr.getConnection();
- PreparedStatement ps = conn.prepareStatement(sql);
- ps.setString(1, "FALSE");
- ps.execute();
- } catch (Exception e) {
- throw new MessageStoreException(e);
- } finally {
- release(conn);
- }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+
+ throw new MessageStoreException(e);
}
-
- /**
- * This method can be used to retrieve a collection of all the undelivered (delivered=FALSE) from the message-store
- * You should test for 'null' on the return type to see if any messages exist in the collection
- * @return Map<URI, Message> - a collection of all the undelivered messages in the message-store
- * @throws MessageStoreException
- */
- public Map<URI, Message> getUndeliveredMessages(String classification) throws MessageStoreException {
- HashMap<URI, Message> messages = new HashMap<URI, Message>();
- String sql = "select uuid from message where delivered='FALSE'";
- if (classification!=null) {
- sql += " and classification='" + classification + "'";
- }
- Connection conn=null;
- try
- {
- conn = mgr.getConnection();
- Statement stmt;
- ResultSet rs;
- stmt = conn.createStatement();
- rs = stmt.executeQuery(sql);
-
- while (rs.next()) {
- URI uid = new URI(rs.getString(1));
- Message msg = getMessage(uid);
- messages.put(uid, msg);
- }
- rs.close();
- stmt.close();
+ finally
+ {
+ release(conn);
+ }
- }
- catch (Exception e)
- {
- throw new MessageStoreException(e);
- }
- finally
- {
- release(conn);
- }
- logger.info("retrieved " + messages.size() + " undelivered messages");
- return messages;
-
+ }
+
+ public void setDelivered (URI uid) throws MessageStoreException
+ {
+ String sql = "update message set delivered = 'TRUE' where uuid=?";
+ Connection conn = null;
+ try
+ {
+ conn = mgr.getConnection();
+ PreparedStatement ps = conn.prepareStatement(sql);
+ ps.setString(1, "FALSE");
+ ps.execute();
}
-
+ catch (Exception e)
+ {
+ e.printStackTrace();
+
+ throw new MessageStoreException(e);
+ }
+ finally
+ {
+ release(conn);
+ }
+ }
+
/**
- * This method can be used to retrieve a collection of all from the message-store
- * You should test for 'null' on the return type to see if any messages exist in the collection
- * @return Map<URI, Message> - a collection of all the undelivered messages in the message-store
- * @throws MessageStoreException
- */
- public Map<URI, Message> getAllMessages(String classification) throws MessageStoreException {
- HashMap<URI, Message> messages = new HashMap<URI, Message>();
- String sql = "select uuid, message from message";
- if (classification!=null) {
- sql += " where classification='" + classification + "'";
- }
- Connection conn=null;
- try
- {
- conn = mgr.getConnection();
- Statement stmt;
- ResultSet rs;
- stmt = conn.createStatement();
- rs = stmt.executeQuery(sql);
-
- while (rs.next()) {
- URI uid = new URI(rs.getString(1));
- Message msg = Util.deserialize((Serializable) Base64.decodeToObject( rs.getString(2)));
- messages.put(uid, msg);
- }
- rs.close();
- stmt.close();
+ * This method can be used to retrieve a collection of all the
+ * undelivered (delivered=FALSE) from the message-store You should test
+ * for 'null' on the return type to see if any messages exist in the
+ * collection
+ *
+ * @return Map<URI, Message> - a collection of all the undelivered
+ * messages in the message-store
+ * @throws MessageStoreException
+ */
+ public Map<URI, Message> getUndeliveredMessages (String classification)
+ throws MessageStoreException
+ {
+ HashMap<URI, Message> messages = new HashMap<URI, Message>();
+ String sql = "select uuid from message where delivered='FALSE'";
+ if (classification != null)
+ {
+ sql += " and classification='" + classification + "'";
+ }
+ Connection conn = null;
+ try
+ {
+ conn = mgr.getConnection();
+ Statement stmt;
+ ResultSet rs;
+ stmt = conn.createStatement();
+ rs = stmt.executeQuery(sql);
- }
- catch (Exception e)
- {
- throw new MessageStoreException(e);
- }
- finally
- {
- release(conn);
- }
- logger.debug("retrieved " + messages.size() + " " + classification + " messages");
- return messages;
-
+ while (rs.next())
+ {
+ URI uid = new URI(rs.getString(1));
+ Message msg = getMessage(uid);
+ messages.put(uid, msg);
+ }
+ rs.close();
+ stmt.close();
+ }
+ catch (MessageStoreException ex)
+ {
+ throw ex;
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+
+ throw new MessageStoreException(e);
+ }
+ finally
+ {
+ release(conn);
+ }
+ logger.info("retrieved " + messages.size() + " undelivered messages");
+ return messages;
+
}
- private void release (Connection conn)
+ /**
+ * This method can be used to retrieve a collection of all from the
+ * message-store You should test for 'null' on the return type to see if
+ * any messages exist in the collection
+ *
+ * @return Map<URI, Message> - a collection of all the undelivered
+ * messages in the message-store
+ * @throws MessageStoreException
+ */
+ public Map<URI, Message> getAllMessages (String classification)
+ throws MessageStoreException
+ {
+ HashMap<URI, Message> messages = new HashMap<URI, Message>();
+ String sql = "select uuid, type, message from message";
+ if (classification != null)
{
+ sql += " where classification='" + classification + "'";
+ }
+ Connection conn = null;
+ try
+ {
+ conn = mgr.getConnection();
+ Statement stmt;
+ ResultSet rs;
+ stmt = conn.createStatement();
+ rs = stmt.executeQuery(sql);
- if (conn != null)
+ while (rs.next())
+ {
+ URI uid = new URI(rs.getString(1));
+ Message msg = decode(rs);
+ messages.put(uid, msg);
+ }
+ rs.close();
+ stmt.close();
+ }
+ catch (MessageStoreException ex)
+ {
+ throw ex;
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+
+ throw new MessageStoreException(e);
+ }
+ finally
+ {
+ release(conn);
+ }
+ logger.debug("retrieved " + messages.size() + " " + classification
+ + " messages");
+ return messages;
+
+ }
+
+ private void release (Connection conn)
+ {
+
+ if (conn != null)
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (Exception e2)
+ {
+ logger.warn(e2.getMessage(), e2);
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ public boolean redeliver (URI uuid) throws MessageStoreException
+ {
+ boolean isDelivered = false;
+ boolean error = false;
+
+ Connection con = null;
+ try
+ {
+ con = mgr.getConnection();
+ con.setAutoCommit(false);
+ con.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+
+ Message message = select(uuid, con);
+
+ if (message != null
+ && delete(uuid, RedeliverStore.CLASSIFICATION_RDLVR, con) == 1)
+ {
+ // now any good db should have set a read lock on this record,
+ // until we commit.
+ // if exception is thrown up the delivery count on the message
+ // if exceeds the maxcount then update the classification to
+ // DLQ.
+ Service to = (Service) message.getProperties().getProperty(
+ ServiceInvoker.DELIVER_TO);
+ try
{
- try
+ ServiceInvoker si = new ServiceInvoker(to.getCategory(), to
+ .getName());
+ message.getProperties().setProperty(
+ RedeliverStore.IS_REDELIVERY, true);
+ si.deliverAsync(message);
+ isDelivered = true;
+ }
+ catch (MessageDeliverException e)
+ {
+ logger.debug(e.getMessage(), e);
+ }
+
+ if (isDelivered)
+ {
+ // the message is delivered, we're good so remove it
+ // from the store
+ delete(uuid, RedeliverStore.CLASSIFICATION_RDLVR, con);
+ }
+ else
+ {
+ // the message was not delivered
+ if (message.getProperties().getProperty(DELIVER_COUNT) == null)
+ {
+ // appearantly it was the first time
+ message.getProperties().setProperty(
+ RedeliverStore.DELIVER_COUNT,
+ Integer.valueOf("1"));
+ insert(uuid, message,
+ MessageStore.CLASSIFICATION_RDLVR, "FALSE", con);
+ }
+ else
+ {
+ Integer redeliverCount = (Integer) message
+ .getProperties().getProperty(DELIVER_COUNT);
+ if (redeliverCount < maxRedeliverCount
+ || maxRedeliverCount < 0)
{
- conn.close();
+ // up the count
+ message.getProperties().setProperty(
+ RedeliverStore.DELIVER_COUNT,
+ ++redeliverCount);
+ insert(uuid, message,
+ MessageStore.CLASSIFICATION_RDLVR, "FALSE",
+ con);
}
- catch (Exception e2)
+ else
{
- logger.warn(e2.getMessage(), e2);
+ // undeliverable, send to the DLQ
+ insert(uuid, message,
+ MessageStore.CLASSIFICATION_DLQ, "FALSE",
+ con);
}
+ }
}
+ }
}
- /**
- *
- */
- public boolean redeliver(URI uuid) throws MessageStoreException
+ catch (SQLException e)
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("Deadlocks may occur under normal processing");
+ logger.debug(e.getMessage(), e);
+ }
+ error = true;
+ }
+ finally
+ {
+ if (con != null)
+ {
+ try
+ {
+ if (!error)
+ {
+ con.commit();
+ }
+ else
+ {
+ con.rollback();
+ }
+ }
+ catch (SQLException e)
+ {
+ logger.error(e);
+ }
+ try
+ {
+ con.close();
+ }
+ catch (Exception e2)
+ {
+ logger.error(e2);
+ }
+ }
+ }
+ return isDelivered;
+ }
+
+ private Message select (URI uid, Connection connection)
+ throws SQLException, MessageStoreException
{
- boolean isDelivered=false;
- boolean error=false;
+ Message message = null;
+ String selectSql = "select * from message where uuid=?";
+ PreparedStatement selectStmt = connection.prepareStatement(selectSql);
+ selectStmt.setObject(1, uid.toString());
+ ResultSet rs = selectStmt.executeQuery();
- Connection con = null;
- try
- {
- con = mgr.getConnection();
- con.setAutoCommit(false);
- con.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
-
- Message message=select(uuid, con);
-
- if (message!=null && delete(uuid, RedeliverStore.CLASSIFICATION_RDLVR, con)==1) {
- //now any good db should have set a read lock on this record, until we commit.
- //if exception is thrown up the delivery count on the message
- //if exceeds the maxcount then update the classification to DLQ.
- Service to = (Service) message.getProperties().getProperty(ServiceInvoker.DELIVER_TO);
- try {
- ServiceInvoker si = new ServiceInvoker(to.getCategory(), to.getName());
- message.getProperties().setProperty(RedeliverStore.IS_REDELIVERY, true);
- si.deliverAsync(message);
- isDelivered=true;
- } catch (MessageDeliverException e) {
- logger.debug(e.getMessage(), e);
- }
-
- if (isDelivered) {
- //the message is delivered, we're good so remove it from the store
- delete(uuid, RedeliverStore.CLASSIFICATION_RDLVR, con);
- } else {
- //the message was not delivered
- if (message.getProperties().getProperty(DELIVER_COUNT)==null) {
- //appearantly it was the first time
- message.getProperties().setProperty(RedeliverStore.DELIVER_COUNT, Integer.valueOf("1"));
- insert(uuid, message, MessageStore.CLASSIFICATION_RDLVR, "FALSE", con);
- } else {
- Integer redeliverCount = (Integer) message.getProperties().getProperty(DELIVER_COUNT);
- if (redeliverCount < maxRedeliverCount || maxRedeliverCount < 0) {
- //up the count
- message.getProperties().setProperty(RedeliverStore.DELIVER_COUNT, ++redeliverCount);
- insert(uuid, message, MessageStore.CLASSIFICATION_RDLVR, "FALSE", con);
- } else {
- //undeliverable, send to the DLQ
- insert(uuid, message, MessageStore.CLASSIFICATION_DLQ, "FALSE", con);
- }
- }
- }
- }
- }
- catch (SQLException e)
- {
- if (logger.isDebugEnabled()) {
- logger.debug("Deadlocks may occur under normal processing");
- logger.debug(e.getMessage(), e);
- }
- error=true;
- }
- finally
- {
- if (con!=null) {
- try {
- if (!error) {
- con.commit();
- } else {
- con.rollback();
- }
- } catch (SQLException e) {
- logger.error(e);
- }
- try {
- con.close();
- } catch (Exception e2) {
- logger.error(e2);
- }
- }
- }
- return isDelivered;
+ if (rs.next())
+ message = decode(rs);
+
+ rs.close();
+ selectStmt.close();
+ return message;
}
-
-
- private Message select(URI uid, Connection connection)
- throws SQLException, MessageStoreException
+
+ private Message select (URI uid, String classification,
+ Connection connection) throws SQLException, MessageStoreException
{
- Message message=null;
- String selectSql = "select * from message where uuid=?";
- PreparedStatement selectStmt = connection.prepareStatement(selectSql);
- selectStmt.setObject(1, uid.toString());
- ResultSet rs = selectStmt.executeQuery();
- if (rs.next()) {
- try {
- message = Util.deserialize((Serializable) Base64.decodeToObject(rs.getString("message")));
- } catch (Exception e) {
- throw new MessageStoreException(e);
- }
- }
- rs.close();
- selectStmt.close();
- return message;
+ Message message = null;
+ String selectSql = "select * from message where uuid=? and classification=?";
+ PreparedStatement selectStmt = connection.prepareStatement(selectSql);
+ selectStmt.setObject(1, uid.toString());
+ selectStmt.setObject(2, classification);
+ ResultSet rs = selectStmt.executeQuery();
+
+ if (rs.next())
+ message = decode(rs);
+
+ rs.close();
+ selectStmt.close();
+ return message;
}
-
- private Message select(URI uid, String classification, Connection connection)
- throws SQLException, MessageStoreException
+
+ private int delete (URI uid, String classification, Connection connection)
+ throws SQLException
{
- Message message=null;
- String selectSql = "select * from message where uuid=? and classification=?";
- PreparedStatement selectStmt = connection.prepareStatement(selectSql);
- selectStmt.setObject(1, uid.toString());
- selectStmt.setObject(2, classification);
- ResultSet rs = selectStmt.executeQuery();
- if (rs.next()) {
- try {
- message = Util.deserialize((Serializable) Base64.decodeToObject(rs.getString("message")));
- } catch (Exception e) {
- throw new MessageStoreException(e);
- }
- }
- rs.close();
- selectStmt.close();
- return message;
+ String deleteSql = "delete from message where uuid=? and classification=?";
+ PreparedStatement stmt = connection.prepareStatement(deleteSql);
+ stmt.setObject(1, uid.toString());
+ stmt.setObject(2, classification);
+ int result = stmt.executeUpdate();
+ stmt.close();
+ return result;
}
-
- private int delete(URI uid, String classification, Connection connection)
- throws SQLException
+
+ private void insert (URI uid, Message message, String classification,
+ String delivered, Connection conn) throws SQLException,
+ MessageStoreException
{
- String deleteSql = "delete from message where uuid=? and classification=?";
- PreparedStatement stmt = connection.prepareStatement(deleteSql);
- stmt.setObject(1, uid.toString());
- stmt.setObject(2, classification);
- int result = stmt.executeUpdate();
- stmt.close();
- return result;
+ if (message.getType().equals(MessageType.JAVA_SERIALIZED)
+ || message.getType().equals(MessageType.JBOSS_XML))
+ {
+ String sql = "insert into message(uuid, type, message, delivered, classification) values(?,?,?,?,?)";
+ PreparedStatement ps = conn.prepareStatement(sql);
+
+ ps.setString(1, uid.toString());
+ ps.setString(2, message.getType().toString());
+
+ try
+ {
+ String messageString = null;
+ Serializable serializedForm = Util.serialize(message);
+
+ if (serializedForm instanceof String)
+ messageString = (String) serializedForm;
+ else
+ {
+ ByteArrayOutputStream bo = new ByteArrayOutputStream();
+ ObjectOutputStream os = new ObjectOutputStream(bo);
+
+ os.writeObject(serializedForm);
+ os.flush();
+ os.close();
+
+ messageString = new String(bo.toByteArray());
+ }
+
+ ps.setString(3, messageString);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+
+ throw new MessageStoreException(e);
+ }
+ ps.setString(4, "TRUE");
+ ps.setString(5, classification);
+ ps.execute();
+ ps.close();
+ }
+ else
+ {
+ Thread.currentThread().dumpStack();
+
+ throw new MessageStoreException("Unsupported MessageType: "
+ + message.getType());
+ }
}
-
- private void insert(URI uid, Message message, String classification, String delivered, Connection conn)
- throws SQLException, MessageStoreException
+
+ private final Message decode (ResultSet rs) throws MessageStoreException
{
- String sql = "insert into message(uuid, type, message, delivered, classification) values(?,?,?,?,?)";
- PreparedStatement ps = conn.prepareStatement(sql);
-
- ps.setString(1, uid.toString());
- ps.setString(2, message.getType().toString());
- try {
- String messageString = Base64.encodeObject(Util.serialize(message));
- ps.setString(3, messageString);
- } catch (Exception e) {
- throw new MessageStoreException(e);
- }
- ps.setString(4, "TRUE");
- ps.setString(5, classification);
- ps.execute();
- ps.close();
+ try
+ {
+ URI type = new URI(rs.getString("type"));
+ String data = rs.getString("message");
+
+ if (type.equals(MessageType.JAVA_SERIALIZED))
+ {
+ ByteArrayInputStream bs = new ByteArrayInputStream(data.getBytes());
+ ObjectInputStream os = new ObjectInputStream(bs);
+
+ return Util.deserialize((Serializable) os.readObject());
+ }
+ else
+ {
+ if (type.equals(MessageType.JBOSS_XML))
+ {
+ return Util.deserialize(data);
+ }
+ else
+ throw new MessageStoreException("Unsupported MessageType: "
+ + type);
+ }
+ }
+ catch (MessageStoreException ex)
+ {
+ throw ex;
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+
+ throw new MessageStoreException(ex);
+ }
}
-
- public Integer getMaxRedeliverCount() {
- return maxRedeliverCount;
+ public Integer getMaxRedeliverCount ()
+ {
+ return maxRedeliverCount;
}
- public void setMaxRedeliverCount(Integer maxRedeliverCount) {
- this.maxRedeliverCount = maxRedeliverCount;
+ public void setMaxRedeliverCount (Integer maxRedeliverCount)
+ {
+ this.maxRedeliverCount = maxRedeliverCount;
}
}
More information about the jboss-svn-commits
mailing list