[jboss-svn-commits] JBL Code SVN: r14348 - labs/jbossesb/trunk/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Sat Aug 18 11:20:13 EDT 2007
Author: tfennelly
Date: 2007-08-18 11:20:13 -0400 (Sat, 18 Aug 2007)
New Revision: 14348
Modified:
labs/jbossesb/trunk/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db/DBMessageStoreImpl.java
Log:
Rolled back - build was broken.
Modified: labs/jbossesb/trunk/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db/DBMessageStoreImpl.java
===================================================================
--- labs/jbossesb/trunk/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db/DBMessageStoreImpl.java 2007-08-18 12:40:38 UTC (rev 14347)
+++ labs/jbossesb/trunk/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db/DBMessageStoreImpl.java 2007-08-18 15:20:13 UTC (rev 14348)
@@ -21,10 +21,6 @@
package org.jboss.internal.soa.esb.persistence.format.db;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.URI;
import java.sql.Connection;
@@ -41,7 +37,6 @@
import org.jboss.soa.esb.client.ServiceInvoker;
import org.jboss.soa.esb.listeners.message.MessageDeliverException;
import org.jboss.soa.esb.message.Message;
-import org.jboss.soa.esb.message.format.MessageType;
import org.jboss.soa.esb.message.urigen.MessageURIGenerator;
import org.jboss.soa.esb.persistence.manager.ConnectionManager;
import org.jboss.soa.esb.persistence.manager.ConnectionManagerException;
@@ -49,636 +44,421 @@
import org.jboss.soa.esb.services.persistence.MessageStore;
import org.jboss.soa.esb.services.persistence.MessageStoreException;
import org.jboss.soa.esb.services.persistence.RedeliverStore;
-import org.jboss.soa.esb.services.registry.RegistryException;
import org.jboss.soa.esb.util.Util;
+import org.jboss.util.Base64;
-/**
- * This implementation only deals with the Serializable and XML message formats.
- *
- * TODO: add extensibility to marshal/unmarshal of Message implementations
- * through plugins. Was in original design, but does not seem to have made it
- * into this initial implementation. However, Util serialize/deserialize are
- * tying us to these types anyway, despite the extensibility of the existing
- * Message plugin approach.
- */
-
public class DBMessageStoreImpl implements RedeliverStore
{
- private Logger logger = Logger.getLogger(this.getClass());
+ private Logger logger = Logger.getLogger(this.getClass());
- protected ConnectionManager mgr = null;
-
+ protected ConnectionManager mgr = null;
+
private Integer maxRedeliverCount = 10;
+
+ protected MessageURIGenerator uriGenerator = new DefaultMessageURIGenerator();
- protected MessageURIGenerator uriGenerator = new DefaultMessageURIGenerator();
-
- public DBMessageStoreImpl() throws ConnectionManagerException
- {
- mgr = ConnectionManagerFactory.getConnectionManager();
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.jboss.soa.esb.services.persistence.MessageStore#getMessageURIGenerator()
- */
- public MessageURIGenerator getMessageURIGenerator ()
- {
- return uriGenerator;
- }
-
- /**
- * add's a
- *
- * @Message to the database persistence store will set the 'delivered'
- * flag to TRUE by default - assuming that the
- * @Message has been delivered
- */
- public synchronized URI addMessage (Message message, String classification)
- throws MessageStoreException
- {
- URI uid = null;
- Connection conn = null;
- try
+ public DBMessageStoreImpl() throws ConnectionManagerException
{
- conn = mgr.getConnection();
- uid = uriGenerator.generateMessageURI(message);
- insert(uid, message, classification, "TRUE", conn);
+ mgr = ConnectionManagerFactory.getConnectionManager();
}
- catch (MessageStoreException ex)
- {
- throw ex;
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.services.persistence.MessageStore#getMessageURIGenerator()
+ */
+ public MessageURIGenerator getMessageURIGenerator() {
+ return uriGenerator;
}
- catch (Exception e)
+
+ /**
+ * Adds a Message to the database persistence store.
+ * The 'delivered' flag is set to TRUE by default, on the assumption that the Message has been delivered.
+ */
+ public synchronized URI addMessage (Message message, String classification) throws MessageStoreException
{
- logger.error(e);
-
- e.printStackTrace();
-
- throw new MessageStoreException(e);
+ // String messageString = null;
+ URI uid = null;
+ Connection conn=null;
+ try{
+ conn = mgr.getConnection();
+ uid = uriGenerator.generateMessageURI(message);
+ insert(uid, message, classification, "TRUE", conn);
+ }
+ catch (Exception e)
+ {
+ logger.error(e);
+ throw new MessageStoreException(e);
+ }
+ finally
+ {
+ release(conn);
+ }
+
+ return uid;
}
- finally
- {
- release(conn);
- }
- return uid;
- }
-
- /**
- * return a
- *
- * @Message based on the passed in key in the form of a JBoss ESB
- * @URI format for URI: "urn:jboss/esb/message/UID#" +
- * UUID.randomUUID()" - see the method in this class
- * @addMessage
- */
- public synchronized Message getMessage (URI uid)
- throws MessageStoreException
- {
- Message message = null;
- Connection conn = null;
- try
+ /**
+ * Returns the Message stored under the given key, which is a JBoss ESB URI.
+ * Format for the URI: "urn:jboss/esb/message/UID#" + UUID.randomUUID() - see the addMessage method in this class.
+ */
+ public synchronized Message getMessage (URI uid)
+ throws MessageStoreException
{
- conn = mgr.getConnection();
- message = select(uid, conn);
+ Message message = null;
+ Connection conn=null;
+ try {
+ conn = mgr.getConnection();
+ message = select(uid, conn);
+ } catch (Exception e) {
+ throw new MessageStoreException(e);
+ } finally {
+ release(conn);
+ }
+ return message;
}
- catch (MessageStoreException ex)
- {
- throw ex;
- }
- catch (Exception e)
- {
- e.printStackTrace();
-
- throw new MessageStoreException(e);
- }
- finally
- {
- release(conn);
- }
- return message;
- }
-
+
/**
- * return a
- *
- * @Message based on the passed in key in the form of a JBoss ESB
- * @URI format for URI: "urn:jboss/esb/message/UID#" +
- * UUID.randomUUID()" - see the method in this class
- * @addMessage
- */
+ * Returns the Message stored under the given key (a JBoss ESB URI) and classification.
+ * Format for the URI: "urn:jboss/esb/message/UID#" + UUID.randomUUID() - see the addMessage method in this class.
+ */
public synchronized Message getMessage (URI uid, String classification)
- throws MessageStoreException
+ throws MessageStoreException
{
- Message message = null;
- Connection conn = null;
- try
- {
- conn = mgr.getConnection();
- message = select(uid, classification, conn);
- }
- catch (MessageStoreException ex)
- {
- throw ex;
- }
- catch (Exception e)
- {
- e.printStackTrace();
-
- throw new MessageStoreException(e);
- }
- finally
- {
- release(conn);
- }
- return message;
+ Message message = null;
+ Connection conn=null;
+ try {
+ conn = mgr.getConnection();
+ message = select(uid, classification, conn);
+ } catch (Exception e) {
+ throw new MessageStoreException(e);
+ } finally {
+ release(conn);
+ }
+ return message;
}
-
+
/**
- * remove a
- *
- * @Message based on the passed in key in the form of a JBoss ESB
- * @URI format for URI: "urn:jboss/esb/message/UID#" +
- * UUID.randomUUID()" - see the method in this class
- * @removeMessage
- */
+ * Removes the Message stored under the given key (a JBoss ESB URI) and classification.
+ * Format for the URI: "urn:jboss/esb/message/UID#" + UUID.randomUUID() - see the addMessage method in this class.
+ */
public synchronized int removeMessage (URI uid, String classification)
- throws MessageStoreException
+ throws MessageStoreException
{
- int response;
- Connection conn = null;
- try
- {
- conn = mgr.getConnection();
- response = delete(uid, classification, conn);
- }
- catch (Exception e)
- {
- e.printStackTrace();
-
- throw new MessageStoreException(e);
- }
- finally
- {
- release(conn);
- }
- return response;
+ int response;
+ Connection conn=null;
+ try {
+ conn = mgr.getConnection();
+ response = delete(uid, classification, conn);
+ } catch (Exception e) {
+ throw new MessageStoreException(e);
+ } finally {
+ release(conn);
+ }
+ return response;
}
-
- /**
- *
- * @param uid -
- * key for message to set undelivered flag on
- * @throws MessageStoreException
- */
- public void setUndelivered (URI uid) throws MessageStoreException
+
+ /**
+ *
+ * @param uid - key for message to set undelivered flag on
+ * @throws MessageStoreException
+ */
+ public void setUndelivered(URI uid) throws MessageStoreException
{
- String sql = "update message set delivered = 'FALSE' where uuid=?";
- Connection conn = null;
- try
- {
- conn = mgr.getConnection();
- PreparedStatement ps = conn.prepareStatement(sql);
- ps.setString(1, uid.toString());
- ps.execute();
+ String sql = "update message set delivered = 'FALSE' where uuid=?";
+ Connection conn=null;
+ try {
+ conn = mgr.getConnection();
+ PreparedStatement ps = conn.prepareStatement(sql);
+ ps.setString(1, uid.toString());
+ ps.execute();
+ } catch (Exception e) {
+ throw new MessageStoreException(e);
+ } finally {
+ release(conn);
+ }
+
}
- catch (Exception e)
- {
- e.printStackTrace();
-
- throw new MessageStoreException(e);
+
+ public void setDelivered(URI uid) throws MessageStoreException{
+ String sql = "update message set delivered = 'TRUE' where uuid=?";
+ Connection conn=null;
+ try {
+ conn = mgr.getConnection();
+ PreparedStatement ps = conn.prepareStatement(sql);
+ ps.setString(1, "FALSE");
+ ps.execute();
+ } catch (Exception e) {
+ throw new MessageStoreException(e);
+ } finally {
+ release(conn);
+ }
}
- finally
- {
- release(conn);
- }
+
+ /**
+ * Retrieves all the undelivered (delivered=FALSE) messages from the message-store, restricted to the given classification when it is not null.
+ * The returned map is never null; it is empty when no matching messages exist.
+ * @return Map<URI, Message> - a collection of all the undelivered messages in the message-store
+ * @throws MessageStoreException
+ */
+ public Map<URI, Message> getUndeliveredMessages(String classification) throws MessageStoreException {
+ HashMap<URI, Message> messages = new HashMap<URI, Message>();
+ String sql = "select uuid from message where delivered='FALSE'";
+ if (classification!=null) {
+ sql += " and classification='" + classification + "'";
+ }
+ Connection conn=null;
+ try
+ {
+ conn = mgr.getConnection();
+ Statement stmt;
+ ResultSet rs;
+ stmt = conn.createStatement();
+ rs = stmt.executeQuery(sql);
+
+ while (rs.next()) {
+ URI uid = new URI(rs.getString(1));
+ Message msg = getMessage(uid);
+ messages.put(uid, msg);
+ }
+ rs.close();
+ stmt.close();
- }
-
- public void setDelivered (URI uid) throws MessageStoreException
- {
- String sql = "update message set delivered = 'TRUE' where uuid=?";
- Connection conn = null;
- try
- {
- conn = mgr.getConnection();
- PreparedStatement ps = conn.prepareStatement(sql);
- ps.setString(1, "FALSE");
- ps.execute();
+ }
+ catch (Exception e)
+ {
+ throw new MessageStoreException(e);
+ }
+ finally
+ {
+ release(conn);
+ }
+ logger.info("retrieved " + messages.size() + " undelivered messages");
+ return messages;
+
}
- catch (Exception e)
- {
- e.printStackTrace();
-
- throw new MessageStoreException(e);
- }
- finally
- {
- release(conn);
- }
- }
-
+
/**
- * This method can be used to retrieve a collection of all the
- * undelivered (delivered=FALSE) from the message-store You should test
- * for 'null' on the return type to see if any messages exist in the
- * collection
- *
- * @return Map<URI, Message> - a collection of all the undelivered
- * messages in the message-store
- * @throws MessageStoreException
- */
- public Map<URI, Message> getUndeliveredMessages (String classification)
- throws MessageStoreException
- {
- HashMap<URI, Message> messages = new HashMap<URI, Message>();
- String sql = "select uuid from message where delivered='FALSE'";
- if (classification != null)
- {
- sql += " and classification='" + classification + "'";
- }
- Connection conn = null;
- try
- {
- conn = mgr.getConnection();
- Statement stmt;
- ResultSet rs;
- stmt = conn.createStatement();
- rs = stmt.executeQuery(sql);
+ * Retrieves all the messages from the message-store, restricted to the given classification when it is not null.
+ * The returned map is never null; it is empty when no matching messages exist.
+ * @return Map<URI, Message> - a collection of all the messages in the message-store that match the classification
+ * @throws MessageStoreException
+ */
+ public Map<URI, Message> getAllMessages(String classification) throws MessageStoreException {
+ HashMap<URI, Message> messages = new HashMap<URI, Message>();
+ String sql = "select uuid, message from message";
+ if (classification!=null) {
+ sql += " where classification='" + classification + "'";
+ }
+ Connection conn=null;
+ try
+ {
+ conn = mgr.getConnection();
+ Statement stmt;
+ ResultSet rs;
+ stmt = conn.createStatement();
+ rs = stmt.executeQuery(sql);
+
+ while (rs.next()) {
+ URI uid = new URI(rs.getString(1));
+ Message msg = Util.deserialize((Serializable) Base64.decodeToObject( rs.getString(2)));
+ messages.put(uid, msg);
+ }
+ rs.close();
+ stmt.close();
- while (rs.next())
- {
- URI uid = new URI(rs.getString(1));
- Message msg = getMessage(uid);
- messages.put(uid, msg);
- }
- rs.close();
- stmt.close();
- }
- catch (MessageStoreException ex)
- {
- throw ex;
- }
- catch (Exception e)
- {
- e.printStackTrace();
-
- throw new MessageStoreException(e);
- }
- finally
- {
- release(conn);
- }
- logger.info("retrieved " + messages.size() + " undelivered messages");
- return messages;
-
+ }
+ catch (Exception e)
+ {
+ throw new MessageStoreException(e);
+ }
+ finally
+ {
+ release(conn);
+ }
+ logger.debug("retrieved " + messages.size() + " " + classification + " messages");
+ return messages;
+
}
- /**
- * This method can be used to retrieve a collection of all from the
- * message-store You should test for 'null' on the return type to see if
- * any messages exist in the collection
- *
- * @return Map<URI, Message> - a collection of all the undelivered
- * messages in the message-store
- * @throws MessageStoreException
- */
- public Map<URI, Message> getAllMessages (String classification)
- throws MessageStoreException
- {
- HashMap<URI, Message> messages = new HashMap<URI, Message>();
- String sql = "select uuid, type, message from message";
- if (classification != null)
+ private void release (Connection conn)
{
- sql += " where classification='" + classification + "'";
- }
- Connection conn = null;
- try
- {
- conn = mgr.getConnection();
- Statement stmt;
- ResultSet rs;
- stmt = conn.createStatement();
- rs = stmt.executeQuery(sql);
- while (rs.next())
- {
- URI uid = new URI(rs.getString(1));
- Message msg = decode(rs);
- messages.put(uid, msg);
- }
- rs.close();
- stmt.close();
- }
- catch (MessageStoreException ex)
- {
- throw ex;
- }
- catch (Exception e)
- {
- e.printStackTrace();
-
- throw new MessageStoreException(e);
- }
- finally
- {
- release(conn);
- }
- logger.debug("retrieved " + messages.size() + " " + classification
- + " messages");
- return messages;
-
- }
-
- private void release (Connection conn)
- {
-
- if (conn != null)
- {
- try
- {
- conn.close();
- }
- catch (Exception e2)
- {
- logger.warn(e2.getMessage(), e2);
- }
- }
- }
-
- /**
- *
- */
- public boolean redeliver (URI uuid) throws MessageStoreException
- {
- boolean isDelivered = false;
- boolean error = false;
-
- Connection con = null;
- try
- {
- con = mgr.getConnection();
- con.setAutoCommit(false);
- con.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
-
- Message message = select(uuid, con);
-
- if (message != null
- && delete(uuid, RedeliverStore.CLASSIFICATION_RDLVR, con) == 1)
- {
- // now any good db should have set a read lock on this record,
- // until we commit.
- // if exception is thrown up the delivery count on the message
- // if exceeds the maxcount then update the classification to
- // DLQ.
- Service to = (Service) message.getProperties().getProperty(
- ServiceInvoker.DELIVER_TO);
- try
+ if (conn != null)
{
- ServiceInvoker si = new ServiceInvoker(to.getCategory(), to
- .getName());
- message.getProperties().setProperty(
- RedeliverStore.IS_REDELIVERY, true);
- si.deliverAsync(message);
- isDelivered = true;
- }
- catch (MessageDeliverException e)
- {
- logger.debug(e.getMessage(), e);
- }
-
- if (isDelivered)
- {
- // the message is delivered, we're good so remove it
- // from the store
- delete(uuid, RedeliverStore.CLASSIFICATION_RDLVR, con);
- }
- else
- {
- // the message was not delivered
- if (message.getProperties().getProperty(DELIVER_COUNT) == null)
- {
- // appearantly it was the first time
- message.getProperties().setProperty(
- RedeliverStore.DELIVER_COUNT,
- Integer.valueOf("1"));
- insert(uuid, message,
- MessageStore.CLASSIFICATION_RDLVR, "FALSE", con);
- }
- else
- {
- Integer redeliverCount = (Integer) message
- .getProperties().getProperty(DELIVER_COUNT);
- if (redeliverCount < maxRedeliverCount
- || maxRedeliverCount < 0)
+ try
{
- // up the count
- message.getProperties().setProperty(
- RedeliverStore.DELIVER_COUNT,
- ++redeliverCount);
- insert(uuid, message,
- MessageStore.CLASSIFICATION_RDLVR, "FALSE",
- con);
+ conn.close();
}
- else
+ catch (Exception e2)
{
- // undeliverable, send to the DLQ
- insert(uuid, message,
- MessageStore.CLASSIFICATION_DLQ, "FALSE",
- con);
+ logger.warn(e2.getMessage(), e2);
}
- }
}
- }
}
- catch (SQLException e)
- {
- if (logger.isDebugEnabled())
- {
- logger.debug("Deadlocks may occur under normal processing");
- logger.debug(e.getMessage(), e);
- }
- error = true;
- }
- finally
- {
- if (con != null)
- {
- try
- {
- if (!error)
- {
- con.commit();
- }
- else
- {
- con.rollback();
- }
- }
- catch (SQLException e)
- {
- logger.error(e);
- }
- try
- {
- con.close();
- }
- catch (Exception e2)
- {
- logger.error(e2);
- }
- }
- }
- return isDelivered;
- }
-
- private Message select (URI uid, Connection connection)
- throws SQLException, MessageStoreException
+ /**
+ *
+ */
+ public boolean redeliver(URI uuid) throws MessageStoreException
{
- Message message = null;
- String selectSql = "select * from message where uuid=?";
- PreparedStatement selectStmt = connection.prepareStatement(selectSql);
- selectStmt.setObject(1, uid.toString());
- ResultSet rs = selectStmt.executeQuery();
+ boolean isDelivered=false;
+ boolean error=false;
- if (rs.next())
- message = decode(rs);
-
- rs.close();
- selectStmt.close();
- return message;
+ Connection con = null;
+ try
+ {
+ con = mgr.getConnection();
+ con.setAutoCommit(false);
+ con.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+
+ Message message=select(uuid, con);
+
+ if (message!=null && delete(uuid, RedeliverStore.CLASSIFICATION_RDLVR, con)==1) {
+ //now any good db should have set a read lock on this record, until we commit.
+ //if a MessageDeliverException is thrown, up the delivery count on the message;
+ //if the count has reached the max count then change the classification to DLQ.
+ Service to = (Service) message.getProperties().getProperty(ServiceInvoker.DELIVER_TO);
+ try {
+ ServiceInvoker si = new ServiceInvoker(to.getCategory(), to.getName());
+ message.getProperties().setProperty(RedeliverStore.IS_REDELIVERY, true);
+ si.deliverAsync(message);
+ isDelivered=true;
+ } catch (MessageDeliverException e) {
+ logger.debug(e.getMessage(), e);
+ }
+
+ if (isDelivered) {
+ //the message is delivered, we're good so remove it from the store
+ delete(uuid, RedeliverStore.CLASSIFICATION_RDLVR, con);
+ } else {
+ //the message was not delivered
+ if (message.getProperties().getProperty(DELIVER_COUNT)==null) {
+ //apparently it was the first time
+ message.getProperties().setProperty(RedeliverStore.DELIVER_COUNT, Integer.valueOf("1"));
+ insert(uuid, message, MessageStore.CLASSIFICATION_RDLVR, "FALSE", con);
+ } else {
+ Integer redeliverCount = (Integer) message.getProperties().getProperty(DELIVER_COUNT);
+ if (redeliverCount < maxRedeliverCount || maxRedeliverCount < 0) {
+ //up the count
+ message.getProperties().setProperty(RedeliverStore.DELIVER_COUNT, ++redeliverCount);
+ insert(uuid, message, MessageStore.CLASSIFICATION_RDLVR, "FALSE", con);
+ } else {
+ //undeliverable, send to the DLQ
+ insert(uuid, message, MessageStore.CLASSIFICATION_DLQ, "FALSE", con);
+ }
+ }
+ }
+ }
+ }
+ catch (SQLException e)
+ {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Deadlocks may occur under normal processing");
+ logger.debug(e.getMessage(), e);
+ }
+ error=true;
+ }
+ finally
+ {
+ if (con!=null) {
+ try {
+ if (!error) {
+ con.commit();
+ } else {
+ con.rollback();
+ }
+ } catch (SQLException e) {
+ logger.error(e);
+ }
+ try {
+ con.close();
+ } catch (Exception e2) {
+ logger.error(e2);
+ }
+ }
+ }
+ return isDelivered;
}
-
- private Message select (URI uid, String classification,
- Connection connection) throws SQLException, MessageStoreException
+
+
+ private Message select(URI uid, Connection connection)
+ throws SQLException, MessageStoreException
{
- Message message = null;
- String selectSql = "select * from message where uuid=? and classification=?";
- PreparedStatement selectStmt = connection.prepareStatement(selectSql);
- selectStmt.setObject(1, uid.toString());
- selectStmt.setObject(2, classification);
- ResultSet rs = selectStmt.executeQuery();
-
- if (rs.next())
- message = decode(rs);
-
- rs.close();
- selectStmt.close();
- return message;
+ Message message=null;
+ String selectSql = "select * from message where uuid=?";
+ PreparedStatement selectStmt = connection.prepareStatement(selectSql);
+ selectStmt.setObject(1, uid.toString());
+ ResultSet rs = selectStmt.executeQuery();
+ if (rs.next()) {
+ try {
+ message = Util.deserialize((Serializable) Base64.decodeToObject(rs.getString("message")));
+ } catch (Exception e) {
+ throw new MessageStoreException(e);
+ }
+ }
+ rs.close();
+ selectStmt.close();
+ return message;
}
-
- private int delete (URI uid, String classification, Connection connection)
- throws SQLException
+
+ private Message select(URI uid, String classification, Connection connection)
+ throws SQLException, MessageStoreException
{
- String deleteSql = "delete from message where uuid=? and classification=?";
- PreparedStatement stmt = connection.prepareStatement(deleteSql);
- stmt.setObject(1, uid.toString());
- stmt.setObject(2, classification);
- int result = stmt.executeUpdate();
- stmt.close();
- return result;
+ Message message=null;
+ String selectSql = "select * from message where uuid=? and classification=?";
+ PreparedStatement selectStmt = connection.prepareStatement(selectSql);
+ selectStmt.setObject(1, uid.toString());
+ selectStmt.setObject(2, classification);
+ ResultSet rs = selectStmt.executeQuery();
+ if (rs.next()) {
+ try {
+ message = Util.deserialize((Serializable) Base64.decodeToObject(rs.getString("message")));
+ } catch (Exception e) {
+ throw new MessageStoreException(e);
+ }
+ }
+ rs.close();
+ selectStmt.close();
+ return message;
}
-
- private void insert (URI uid, Message message, String classification,
- String delivered, Connection conn) throws SQLException,
- MessageStoreException
+
+ private int delete(URI uid, String classification, Connection connection)
+ throws SQLException
{
- if (message.getType().equals(MessageType.JAVA_SERIALIZED)
- || message.getType().equals(MessageType.JBOSS_XML))
- {
- String sql = "insert into message(uuid, type, message, delivered, classification) values(?,?,?,?,?)";
- PreparedStatement ps = conn.prepareStatement(sql);
-
- ps.setString(1, uid.toString());
- ps.setString(2, message.getType().toString());
-
- try
- {
- String messageString = null;
- Serializable serializedForm = Util.serialize(message);
-
- if (serializedForm instanceof String)
- messageString = (String) serializedForm;
- else
- {
- ByteArrayOutputStream bo = new ByteArrayOutputStream();
- ObjectOutputStream os = new ObjectOutputStream(bo);
-
- os.writeObject(serializedForm);
- os.flush();
- os.close();
-
- messageString = new String(bo.toByteArray());
- }
-
- ps.setString(3, messageString);
- }
- catch (Exception e)
- {
- e.printStackTrace();
-
- throw new MessageStoreException(e);
- }
- ps.setString(4, "TRUE");
- ps.setString(5, classification);
- ps.execute();
- ps.close();
- }
- else
- {
- Thread.currentThread().dumpStack();
-
- throw new MessageStoreException("Unsupported MessageType: "
- + message.getType());
- }
+ String deleteSql = "delete from message where uuid=? and classification=?";
+ PreparedStatement stmt = connection.prepareStatement(deleteSql);
+ stmt.setObject(1, uid.toString());
+ stmt.setObject(2, classification);
+ int result = stmt.executeUpdate();
+ stmt.close();
+ return result;
}
-
- private final Message decode (ResultSet rs) throws MessageStoreException
+
+ private void insert(URI uid, Message message, String classification, String delivered, Connection conn)
+ throws SQLException, MessageStoreException
{
- try
- {
- URI type = new URI(rs.getString("type"));
- String data = rs.getString("message");
-
- if (type.equals(MessageType.JAVA_SERIALIZED))
- {
- ByteArrayInputStream bs = new ByteArrayInputStream(data.getBytes());
- ObjectInputStream os = new ObjectInputStream(bs);
-
- return Util.deserialize((Serializable) os.readObject());
- }
- else
- {
- if (type.equals(MessageType.JBOSS_XML))
- {
- return Util.deserialize(data);
- }
- else
- throw new MessageStoreException("Unsupported MessageType: "
- + type);
- }
- }
- catch (MessageStoreException ex)
- {
- throw ex;
- }
- catch (Exception ex)
- {
- ex.printStackTrace();
-
- throw new MessageStoreException(ex);
- }
+ String sql = "insert into message(uuid, type, message, delivered, classification) values(?,?,?,?,?)";
+ PreparedStatement ps = conn.prepareStatement(sql);
+
+ ps.setString(1, uid.toString());
+ ps.setString(2, message.getType().toString());
+ try {
+ String messageString = Base64.encodeObject(Util.serialize(message));
+ ps.setString(3, messageString);
+ } catch (Exception e) {
+ throw new MessageStoreException(e);
+ }
+ ps.setString(4, "TRUE");
+ ps.setString(5, classification);
+ ps.execute();
+ ps.close();
}
+
- public Integer getMaxRedeliverCount ()
- {
- return maxRedeliverCount;
+ public Integer getMaxRedeliverCount() {
+ return maxRedeliverCount;
}
- public void setMaxRedeliverCount (Integer maxRedeliverCount)
- {
- this.maxRedeliverCount = maxRedeliverCount;
+ public void setMaxRedeliverCount(Integer maxRedeliverCount) {
+ this.maxRedeliverCount = maxRedeliverCount;
}
}
More information about the jboss-svn-commits
mailing list