[jboss-svn-commits] JBL Code SVN: r14346 - labs/jbossesb/trunk/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Sat Aug 18 05:42:52 EDT 2007


Author: mark.little at jboss.com
Date: 2007-08-18 05:42:52 -0400 (Sat, 18 Aug 2007)
New Revision: 14346

Modified:
   labs/jbossesb/trunk/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db/DBMessageStoreImpl.java
Log:
http://jira.jboss.com/jira/browse/JBESB-804

Modified: labs/jbossesb/trunk/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db/DBMessageStoreImpl.java
===================================================================
--- labs/jbossesb/trunk/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db/DBMessageStoreImpl.java	2007-08-18 09:40:11 UTC (rev 14345)
+++ labs/jbossesb/trunk/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db/DBMessageStoreImpl.java	2007-08-18 09:42:52 UTC (rev 14346)
@@ -21,6 +21,10 @@
 
 package org.jboss.internal.soa.esb.persistence.format.db;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.net.URI;
 import java.sql.Connection;
@@ -37,6 +41,7 @@
 import org.jboss.soa.esb.client.ServiceInvoker;
 import org.jboss.soa.esb.listeners.message.MessageDeliverException;
 import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageType;
 import org.jboss.soa.esb.message.urigen.MessageURIGenerator;
 import org.jboss.soa.esb.persistence.manager.ConnectionManager;
 import org.jboss.soa.esb.persistence.manager.ConnectionManagerException;
@@ -44,421 +49,636 @@
 import org.jboss.soa.esb.services.persistence.MessageStore;
 import org.jboss.soa.esb.services.persistence.MessageStoreException;
 import org.jboss.soa.esb.services.persistence.RedeliverStore;
+import org.jboss.soa.esb.services.registry.RegistryException;
 import org.jboss.soa.esb.util.Util;
-import org.jboss.util.Base64;
 
+/**
+ * This implementation only deals with the Serializable and XML message formats.
+ * 
+ * TODO: add extensibility to marshal/unmarshal of Message implementations
+ * through plugins. Was in original design, but does not seem to have made it
+ * into this initial implementation. However, Util serialize/deserialize are
+ * tying us to these types anyway, despite the extensibility of the existing
+ * Message plugin approach.
+ */
+
 public class DBMessageStoreImpl implements RedeliverStore
 {
-	private Logger logger = Logger.getLogger(this.getClass());
+    private Logger logger = Logger.getLogger(this.getClass());
 
-	protected ConnectionManager mgr = null;
-    
+    protected ConnectionManager mgr = null;
+
     private Integer maxRedeliverCount = 10;
-	
-	protected MessageURIGenerator uriGenerator = new DefaultMessageURIGenerator();
 
-	public DBMessageStoreImpl() throws ConnectionManagerException
+    protected MessageURIGenerator uriGenerator = new DefaultMessageURIGenerator();
+
+    public DBMessageStoreImpl() throws ConnectionManagerException
+    {
+	mgr = ConnectionManagerFactory.getConnectionManager();
+    }
+
+    /*
+         * (non-Javadoc)
+         * 
+         * @see org.jboss.soa.esb.services.persistence.MessageStore#getMessageURIGenerator()
+         */
+    public MessageURIGenerator getMessageURIGenerator ()
+    {
+	return uriGenerator;
+    }
+
+    /**
+         * add's a
+         * 
+         * @Message to the database persistence store will set the 'delivered'
+         *          flag to TRUE by default - assuming that the
+         * @Message has been delivered
+         */
+    public synchronized URI addMessage (Message message, String classification)
+	    throws MessageStoreException
+    {
+	URI uid = null;
+	Connection conn = null;
+	try
 	{
-			mgr = ConnectionManagerFactory.getConnectionManager();
+	    conn = mgr.getConnection();
+	    uid = uriGenerator.generateMessageURI(message);
+	    insert(uid, message, classification, "TRUE", conn);
 	}
-
-	/* (non-Javadoc)
-	 * @see org.jboss.soa.esb.services.persistence.MessageStore#getMessageURIGenerator()
-	 */
-	public MessageURIGenerator getMessageURIGenerator() {
-		return uriGenerator;
+	catch (MessageStoreException ex)
+	{
+	    throw ex;
 	}
-
-	/**
-	 * add's a @Message to the database persistence store
-	 * will set the 'delivered' flag to TRUE by default - assuming that the @Message has been delivered
-	 */
-	public synchronized URI addMessage (Message message, String classification) throws MessageStoreException
+	catch (Exception e)
 	{
-		// String messageString = null;
-		URI uid = null;
-        Connection conn=null;
-		try{
-			conn = mgr.getConnection();
-			uid = uriGenerator.generateMessageURI(message);
-            insert(uid, message, classification, "TRUE", conn);
-		}
-		catch (Exception e)
-		{
-			logger.error(e);
-			throw new MessageStoreException(e);
-		} 
-		finally
-		{
-			release(conn);
-		}
-
-		return uid;
+	    logger.error(e);
+	    
+	    e.printStackTrace();
+	    
+	    throw new MessageStoreException(e);
 	}
+	finally
+	{
+	    release(conn);
+	}
 
-	/**
-	 * return a @Message based on the passed in key in the form of a JBoss ESB @URI
-	 * format for URI: "urn:jboss/esb/message/UID#" + UUID.randomUUID()" - see the method in this class @addMessage
-	 */
-	public synchronized Message getMessage (URI uid)
-			throws MessageStoreException
+	return uid;
+    }
+
+    /**
+         * return a
+         * 
+         * @Message based on the passed in key in the form of a JBoss ESB
+         * @URI format for URI: "urn:jboss/esb/message/UID#" +
+         *      UUID.randomUUID()" - see the method in this class
+         * @addMessage
+         */
+    public synchronized Message getMessage (URI uid)
+	    throws MessageStoreException
+    {
+	Message message = null;
+	Connection conn = null;
+	try
 	{
-		Message message = null;
-        Connection conn=null;
-		try {
-			conn = mgr.getConnection();
-			message =  select(uid, conn);
-		} catch (Exception e) {
-			throw new MessageStoreException(e);
-		} finally {
-			release(conn);
-		}
-		return message;
+	    conn = mgr.getConnection();
+	    message = select(uid, conn);
 	}
-    
+	catch (MessageStoreException ex)
+	{
+	    throw ex;
+	}
+	catch (Exception e)
+	{
+	    e.printStackTrace();
+	    
+	    throw new MessageStoreException(e);
+	}
+	finally
+	{
+	    release(conn);
+	}
+	return message;
+    }
+
     /**
-     * return a @Message based on the passed in key in the form of a JBoss ESB @URI
-     * format for URI: "urn:jboss/esb/message/UID#" + UUID.randomUUID()" - see the method in this class @addMessage
-     */
+         * return a
+         * 
+         * @Message based on the passed in key in the form of a JBoss ESB
+         * @URI format for URI: "urn:jboss/esb/message/UID#" +
+         *      UUID.randomUUID()" - see the method in this class
+         * @addMessage
+         */
     public synchronized Message getMessage (URI uid, String classification)
-            throws MessageStoreException
+	    throws MessageStoreException
     {
-        Message message = null;
-        Connection conn=null;
-        try {
-            conn = mgr.getConnection();
-            message =  select(uid, classification, conn);
-        } catch (Exception e) {
-            throw new MessageStoreException(e);
-        } finally {
-            release(conn);
-        }
-        return message;
+	Message message = null;
+	Connection conn = null;
+	try
+	{
+	    conn = mgr.getConnection();
+	    message = select(uid, classification, conn);
+	}
+	catch (MessageStoreException ex)
+	{
+	    throw ex;
+	}
+	catch (Exception e)
+	{
+	    e.printStackTrace();
+	    
+	    throw new MessageStoreException(e);
+	}
+	finally
+	{
+	    release(conn);
+	}
+	return message;
     }
-    
+
     /**
-     * remove a @Message based on the passed in key in the form of a JBoss ESB @URI
-     * format for URI: "urn:jboss/esb/message/UID#" + UUID.randomUUID()" - see the method in this class @removeMessage
-     */
+         * remove a
+         * 
+         * @Message based on the passed in key in the form of a JBoss ESB
+         * @URI format for URI: "urn:jboss/esb/message/UID#" +
+         *      UUID.randomUUID()" - see the method in this class
+         * @removeMessage
+         */
     public synchronized int removeMessage (URI uid, String classification)
-            throws MessageStoreException
+	    throws MessageStoreException
     {
-        int response;
-        Connection conn=null;
-        try {
-            conn = mgr.getConnection();
-            response =  delete(uid, classification, conn);
-        } catch (Exception e) {
-            throw new MessageStoreException(e);
-        } finally {
-            release(conn);
-        }
-        return response;
+	int response;
+	Connection conn = null;
+	try
+	{
+	    conn = mgr.getConnection();
+	    response = delete(uid, classification, conn);
+	}
+	catch (Exception e)
+	{
+	    e.printStackTrace();
+	    
+	    throw new MessageStoreException(e);
+	}
+	finally
+	{
+	    release(conn);
+	}
+	return response;
     }
-	
-	/**
-	 * 
-	 * @param uid - key for message to set undelivered flag on
-	 * @throws MessageStoreException
-	 */
-	public void setUndelivered(URI uid) throws MessageStoreException
+
+    /**
+         * 
+         * @param uid -
+         *                key for message to set undelivered flag on
+         * @throws MessageStoreException
+         */
+    public void setUndelivered (URI uid) throws MessageStoreException
     {
-		String sql = "update message set delivered = 'FALSE' where uuid=?";
-        Connection conn=null;
-		try {
-			conn = mgr.getConnection();
-            PreparedStatement ps = conn.prepareStatement(sql);
-			ps.setString(1, uid.toString());
-			ps.execute();
-        } catch (Exception e) {
-			throw new MessageStoreException(e);
-		} finally {
-			release(conn);
-		}
-		
+	String sql = "update message set delivered = 'FALSE' where uuid=?";
+	Connection conn = null;
+	try
+	{
+	    conn = mgr.getConnection();
+	    PreparedStatement ps = conn.prepareStatement(sql);
+	    ps.setString(1, uid.toString());
+	    ps.execute();
 	}
-	
-	public void setDelivered(URI uid) throws MessageStoreException{
-		String sql = "update message set delivered = 'TRUE' where uuid=?";
-        Connection conn=null;
-		try {
-			conn = mgr.getConnection();
-            PreparedStatement ps = conn.prepareStatement(sql);
-			ps.setString(1, "FALSE");
-			ps.execute();
-		} catch (Exception e) {
-			throw new MessageStoreException(e);
-		} finally {
-			release(conn);
-		}
+	catch (Exception e)
+	{
+	    e.printStackTrace();
+	    
+	    throw new MessageStoreException(e);
 	}
-	
-	/**
-	 * This method can be used to retrieve a collection of all the undelivered (delivered=FALSE) from the message-store
-	 * You should test for 'null' on the return type to see if any messages exist in the collection
-	 * @return Map<URI, Message> - a collection of all the undelivered messages in the message-store
-	 * @throws MessageStoreException
-	 */
-	public Map<URI, Message> getUndeliveredMessages(String classification) throws MessageStoreException {
-		HashMap<URI, Message> messages = new HashMap<URI, Message>();
-		String sql = "select uuid from message where delivered='FALSE'";
-        if (classification!=null) {
-            sql += " and classification='" + classification + "'";
-        }
-        Connection conn=null;
-		try
-		{
-			conn = mgr.getConnection();
-			Statement stmt;
-			ResultSet rs;
-			stmt = conn.createStatement();
-			rs = stmt.executeQuery(sql);
-			
-			while (rs.next()) {
-				URI uid = new URI(rs.getString(1));
-				Message msg = getMessage(uid);
-				messages.put(uid, msg);
-			}
-			rs.close();
-			stmt.close();
+	finally
+	{
+	    release(conn);
+	}
 
-		}
-		catch (Exception e)
-		{
-			throw new MessageStoreException(e);
-		} 
-		finally
-		{
-			release(conn);
-		}
-		logger.info("retrieved " + messages.size() + " undelivered messages");
-		return messages;
-		
+    }
+
+    public void setDelivered (URI uid) throws MessageStoreException
+    {
+	String sql = "update message set delivered = 'TRUE' where uuid=?";
+	Connection conn = null;
+	try
+	{
+	    conn = mgr.getConnection();
+	    PreparedStatement ps = conn.prepareStatement(sql);
+	    ps.setString(1, "FALSE");
+	    ps.execute();
 	}
-    
+	catch (Exception e)
+	{
+	    e.printStackTrace();
+	    
+	    throw new MessageStoreException(e);
+	}
+	finally
+	{
+	    release(conn);
+	}
+    }
+
     /**
-     * This method can be used to retrieve a collection of all from the message-store
-     * You should test for 'null' on the return type to see if any messages exist in the collection
-     * @return Map<URI, Message> - a collection of all the undelivered messages in the message-store
-     * @throws MessageStoreException
-     */
-    public Map<URI, Message> getAllMessages(String classification) throws MessageStoreException {
-        HashMap<URI, Message> messages = new HashMap<URI, Message>();
-        String sql = "select uuid, message from message";
-        if (classification!=null) {
-            sql += " where classification='" + classification + "'";
-        }
-        Connection conn=null;
-        try
-        {
-            conn = mgr.getConnection();
-            Statement stmt;
-            ResultSet rs;
-            stmt = conn.createStatement();
-            rs = stmt.executeQuery(sql);
-            
-            while (rs.next()) {
-                URI uid = new URI(rs.getString(1));
-                Message msg = Util.deserialize((Serializable) Base64.decodeToObject( rs.getString(2)));
-                messages.put(uid, msg);
-            }
-            rs.close();
-            stmt.close();
+         * This method can be used to retrieve a collection of all the
+         * undelivered (delivered=FALSE) from the message-store You should test
+         * for 'null' on the return type to see if any messages exist in the
+         * collection
+         * 
+         * @return Map<URI, Message> - a collection of all the undelivered
+         *         messages in the message-store
+         * @throws MessageStoreException
+         */
+    public Map<URI, Message> getUndeliveredMessages (String classification)
+	    throws MessageStoreException
+    {
+	HashMap<URI, Message> messages = new HashMap<URI, Message>();
+	String sql = "select uuid from message where delivered='FALSE'";
+	if (classification != null)
+	{
+	    sql += " and classification='" + classification + "'";
+	}
+	Connection conn = null;
+	try
+	{
+	    conn = mgr.getConnection();
+	    Statement stmt;
+	    ResultSet rs;
+	    stmt = conn.createStatement();
+	    rs = stmt.executeQuery(sql);
 
-        }
-        catch (Exception e)
-        {
-            throw new MessageStoreException(e);
-        } 
-        finally
-        {
-            release(conn);
-        }
-        logger.debug("retrieved " + messages.size() + " " + classification + " messages");
-        return messages;
-        
+	    while (rs.next())
+	    {
+		URI uid = new URI(rs.getString(1));
+		Message msg = getMessage(uid);
+		messages.put(uid, msg);
+	    }
+	    rs.close();
+	    stmt.close();
+	}
+	catch (MessageStoreException ex)
+	{
+	    throw ex;
+	}
+	catch (Exception e)
+	{
+	    e.printStackTrace();
+	    
+	    throw new MessageStoreException(e);
+	}
+	finally
+	{
+	    release(conn);
+	}
+	logger.info("retrieved " + messages.size() + " undelivered messages");
+	return messages;
+
     }
 
-	private void release (Connection conn)
+    /**
+         * This method can be used to retrieve a collection of all from the
+         * message-store You should test for 'null' on the return type to see if
+         * any messages exist in the collection
+         * 
+         * @return Map<URI, Message> - a collection of all the undelivered
+         *         messages in the message-store
+         * @throws MessageStoreException
+         */
+    public Map<URI, Message> getAllMessages (String classification)
+	    throws MessageStoreException
+    {
+	HashMap<URI, Message> messages = new HashMap<URI, Message>();
+	String sql = "select uuid, type, message from message";
+	if (classification != null)
 	{
+	    sql += " where classification='" + classification + "'";
+	}
+	Connection conn = null;
+	try
+	{
+	    conn = mgr.getConnection();
+	    Statement stmt;
+	    ResultSet rs;
+	    stmt = conn.createStatement();
+	    rs = stmt.executeQuery(sql);
 
-		if (conn != null)
+	    while (rs.next())
+	    {
+		URI uid = new URI(rs.getString(1));
+		Message msg = decode(rs);
+		messages.put(uid, msg);
+	    }
+	    rs.close();
+	    stmt.close();
+	}
+	catch (MessageStoreException ex)
+	{
+	    throw ex;
+	}
+	catch (Exception e)
+	{
+	    e.printStackTrace();
+	    
+	    throw new MessageStoreException(e);
+	}
+	finally
+	{
+	    release(conn);
+	}
+	logger.debug("retrieved " + messages.size() + " " + classification
+		+ " messages");
+	return messages;
+
+    }
+
+    private void release (Connection conn)
+    {
+
+	if (conn != null)
+	{
+	    try
+	    {
+		conn.close();
+	    }
+	    catch (Exception e2)
+	    {
+		logger.warn(e2.getMessage(), e2);
+	    }
+	}
+    }
+
+    /**
+         * 
+         */
+    public boolean redeliver (URI uuid) throws MessageStoreException
+    {
+	boolean isDelivered = false;
+	boolean error = false;
+
+	Connection con = null;
+	try
+	{
+	    con = mgr.getConnection();
+	    con.setAutoCommit(false);
+	    con.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+
+	    Message message = select(uuid, con);
+
+	    if (message != null
+		    && delete(uuid, RedeliverStore.CLASSIFICATION_RDLVR, con) == 1)
+	    {
+		// now any good db should have set a read lock on this record,
+		// until we commit.
+		// if exception is thrown up the delivery count on the message
+		// if exceeds the maxcount then update the classification to
+		// DLQ.
+		Service to = (Service) message.getProperties().getProperty(
+			ServiceInvoker.DELIVER_TO);
+		try
 		{
-			try
+		    ServiceInvoker si = new ServiceInvoker(to.getCategory(), to
+			    .getName());
+		    message.getProperties().setProperty(
+			    RedeliverStore.IS_REDELIVERY, true);
+		    si.deliverAsync(message);
+		    isDelivered = true;
+		}
+		catch (MessageDeliverException e)
+		{
+		    logger.debug(e.getMessage(), e);
+		}
+
+		if (isDelivered)
+		{
+		    // the message is delivered, we're good so remove it
+		    // from the store
+		    delete(uuid, RedeliverStore.CLASSIFICATION_RDLVR, con);
+		}
+		else
+		{
+		    // the message was not delivered
+		    if (message.getProperties().getProperty(DELIVER_COUNT) == null)
+		    {
+			// appearantly it was the first time
+			message.getProperties().setProperty(
+				RedeliverStore.DELIVER_COUNT,
+				Integer.valueOf("1"));
+			insert(uuid, message,
+				MessageStore.CLASSIFICATION_RDLVR, "FALSE", con);
+		    }
+		    else
+		    {
+			Integer redeliverCount = (Integer) message
+				.getProperties().getProperty(DELIVER_COUNT);
+			if (redeliverCount < maxRedeliverCount
+				|| maxRedeliverCount < 0)
 			{
-				conn.close();
+			    // up the count
+			    message.getProperties().setProperty(
+				    RedeliverStore.DELIVER_COUNT,
+				    ++redeliverCount);
+			    insert(uuid, message,
+				    MessageStore.CLASSIFICATION_RDLVR, "FALSE",
+				    con);
 			}
-			catch (Exception e2)
+			else
 			{
-                logger.warn(e2.getMessage(), e2);
+			    // undeliverable, send to the DLQ
+			    insert(uuid, message,
+				    MessageStore.CLASSIFICATION_DLQ, "FALSE",
+				    con);
 			}
+		    }
 		}
+	    }
 	}
-    /**
-     * 
-     */
-    public boolean redeliver(URI uuid) throws MessageStoreException
+	catch (SQLException e)
+	{
+	    if (logger.isDebugEnabled())
+	    {
+		logger.debug("Deadlocks may occur under normal processing");
+		logger.debug(e.getMessage(), e);
+	    }
+	    error = true;
+	}
+	finally
+	{
+	    if (con != null)
+	    {
+		try
+		{
+		    if (!error)
+		    {
+			con.commit();
+		    }
+		    else
+		    {
+			con.rollback();
+		    }
+		}
+		catch (SQLException e)
+		{
+		    logger.error(e);
+		}
+		try
+		{
+		    con.close();
+		}
+		catch (Exception e2)
+		{
+		    logger.error(e2);
+		}
+	    }
+	}
+	return isDelivered;
+    }
+
+    private Message select (URI uid, Connection connection)
+	    throws SQLException, MessageStoreException
     {
-        boolean isDelivered=false;
-        boolean error=false;
+	Message message = null;
+	String selectSql = "select * from message where uuid=?";
+	PreparedStatement selectStmt = connection.prepareStatement(selectSql);
+	selectStmt.setObject(1, uid.toString());
+	ResultSet rs = selectStmt.executeQuery();
 
-        Connection con = null;
-        try
-        {
-            con = mgr.getConnection();
-            con.setAutoCommit(false);
-            con.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
-               
-            Message message=select(uuid, con);
-            
-            if (message!=null && delete(uuid, RedeliverStore.CLASSIFICATION_RDLVR, con)==1) {
-                //now any good db should have set a read lock on this record, until we commit.
-                //if exception is thrown up the delivery count on the message
-                //if exceeds the maxcount then update the classification to DLQ.
-                Service to = (Service) message.getProperties().getProperty(ServiceInvoker.DELIVER_TO);
-                try {
-                    ServiceInvoker si = new ServiceInvoker(to.getCategory(), to.getName());
-                    message.getProperties().setProperty(RedeliverStore.IS_REDELIVERY, true);
-                    si.deliverAsync(message);
-                    isDelivered=true;
-                } catch (MessageDeliverException e) {
-                    logger.debug(e.getMessage(), e);
-                }
-                
-                if (isDelivered) {
-                    //the message is delivered, we're good so remove it from the store
-                    delete(uuid, RedeliverStore.CLASSIFICATION_RDLVR, con);
-                } else {
-                    //the message was not delivered
-                    if (message.getProperties().getProperty(DELIVER_COUNT)==null) {
-                        //appearantly it was the first time
-                        message.getProperties().setProperty(RedeliverStore.DELIVER_COUNT, Integer.valueOf("1"));
-                        insert(uuid, message, MessageStore.CLASSIFICATION_RDLVR, "FALSE", con);
-                    } else {
-                        Integer redeliverCount = (Integer) message.getProperties().getProperty(DELIVER_COUNT);
-                        if (redeliverCount < maxRedeliverCount || maxRedeliverCount < 0) {
-                            //up the count
-                            message.getProperties().setProperty(RedeliverStore.DELIVER_COUNT, ++redeliverCount);
-                            insert(uuid, message, MessageStore.CLASSIFICATION_RDLVR, "FALSE", con);
-                        } else {
-                            //undeliverable, send to the DLQ
-                            insert(uuid, message, MessageStore.CLASSIFICATION_DLQ, "FALSE", con);
-                        }
-                    }
-                }
-            }
-        }
-        catch (SQLException e)
-        {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Deadlocks may occur under normal processing");
-                logger.debug(e.getMessage(), e);
-            }
-            error=true;
-        } 
-        finally
-        {
-            if (con!=null) {
-                try {
-                    if (!error) {
-                        con.commit();
-                    } else {
-                        con.rollback();
-                    }
-                } catch (SQLException e) {
-                    logger.error(e);
-                }
-                try {
-                    con.close();
-                } catch (Exception e2) {
-                    logger.error(e2);
-                }
-            }
-        }
-        return isDelivered; 
+	if (rs.next())
+	    message = decode(rs);
+
+	rs.close();
+	selectStmt.close();
+	return message;
     }
-    
-    
-    private Message select(URI uid, Connection connection) 
-        throws SQLException, MessageStoreException
+
+    private Message select (URI uid, String classification,
+	    Connection connection) throws SQLException, MessageStoreException
     {
-        Message message=null;
-        String selectSql = "select * from message where uuid=?";
-        PreparedStatement selectStmt = connection.prepareStatement(selectSql);
-        selectStmt.setObject(1, uid.toString());
-        ResultSet rs = selectStmt.executeQuery();
-        if (rs.next()) {
-            try {
-                message = Util.deserialize((Serializable) Base64.decodeToObject(rs.getString("message")));
-            } catch (Exception e) {
-                throw new MessageStoreException(e);
-            }
-        }
-        rs.close();
-        selectStmt.close();
-        return message;
+	Message message = null;
+	String selectSql = "select * from message where uuid=? and classification=?";
+	PreparedStatement selectStmt = connection.prepareStatement(selectSql);
+	selectStmt.setObject(1, uid.toString());
+	selectStmt.setObject(2, classification);
+	ResultSet rs = selectStmt.executeQuery();
+
+	if (rs.next())
+	    message = decode(rs);
+
+	rs.close();
+	selectStmt.close();
+	return message;
     }
-    
-    private Message select(URI uid, String classification, Connection connection) 
-    throws SQLException, MessageStoreException
+
+    private int delete (URI uid, String classification, Connection connection)
+	    throws SQLException
     {
-        Message message=null;
-        String selectSql = "select * from message where uuid=? and classification=?";
-        PreparedStatement selectStmt = connection.prepareStatement(selectSql);
-        selectStmt.setObject(1, uid.toString());
-        selectStmt.setObject(2, classification);
-        ResultSet rs = selectStmt.executeQuery();
-        if (rs.next()) {
-            try {
-                message = Util.deserialize((Serializable) Base64.decodeToObject(rs.getString("message")));
-            } catch (Exception e) {
-                throw new MessageStoreException(e);
-            }
-        }
-        rs.close();
-        selectStmt.close();
-        return message;
+	String deleteSql = "delete from message where uuid=? and classification=?";
+	PreparedStatement stmt = connection.prepareStatement(deleteSql);
+	stmt.setObject(1, uid.toString());
+	stmt.setObject(2, classification);
+	int result = stmt.executeUpdate();
+	stmt.close();
+	return result;
     }
-    
-    private int delete(URI uid, String classification, Connection connection)
-        throws SQLException
+
+    private void insert (URI uid, Message message, String classification,
+	    String delivered, Connection conn) throws SQLException,
+	    MessageStoreException
     {
-        String deleteSql = "delete from message where uuid=? and classification=?";
-        PreparedStatement stmt = connection.prepareStatement(deleteSql);
-        stmt.setObject(1, uid.toString());
-        stmt.setObject(2, classification);
-        int result = stmt.executeUpdate();
-        stmt.close();
-        return result;
+	if (message.getType().equals(MessageType.JAVA_SERIALIZED)
+		|| message.getType().equals(MessageType.JBOSS_XML))
+	{
+	    String sql = "insert into message(uuid, type, message, delivered, classification) values(?,?,?,?,?)";
+	    PreparedStatement ps = conn.prepareStatement(sql);
+
+	    ps.setString(1, uid.toString());
+	    ps.setString(2, message.getType().toString());
+
+	    try
+	    {
+		String messageString = null;
+		Serializable serializedForm = Util.serialize(message);
+
+		if (serializedForm instanceof String)
+		    messageString = (String) serializedForm;
+		else
+		{
+		    ByteArrayOutputStream bo = new ByteArrayOutputStream();
+		    ObjectOutputStream os = new ObjectOutputStream(bo);
+
+		    os.writeObject(serializedForm);
+		    os.flush();
+		    os.close();
+
+		    messageString = new String(bo.toByteArray());
+		}
+
+		ps.setString(3, messageString);
+	    }
+	    catch (Exception e)
+	    {
+		e.printStackTrace();
+		
+		throw new MessageStoreException(e);
+	    }
+	    ps.setString(4, "TRUE");
+	    ps.setString(5, classification);
+	    ps.execute();
+	    ps.close();
+	}
+	else
+	{
+	    Thread.currentThread().dumpStack();
+	    
+	    throw new MessageStoreException("Unsupported MessageType: "
+		    + message.getType());
+	}
     }
-    
-    private void insert(URI uid, Message message, String classification, String delivered, Connection conn) 
-        throws SQLException, MessageStoreException
+
+    private final Message decode (ResultSet rs) throws MessageStoreException
     {
-        String sql = "insert into message(uuid, type, message, delivered, classification) values(?,?,?,?,?)";
-        PreparedStatement ps = conn.prepareStatement(sql);
-        
-        ps.setString(1, uid.toString());
-        ps.setString(2, message.getType().toString());
-        try {
-            String messageString = Base64.encodeObject(Util.serialize(message));
-            ps.setString(3, messageString);
-        } catch (Exception e) {
-            throw new MessageStoreException(e);
-        }
-        ps.setString(4, "TRUE");
-        ps.setString(5, classification);
-        ps.execute();
-        ps.close();
+	try
+	{
+	    URI type = new URI(rs.getString("type"));
+	    String data = rs.getString("message");
+
+	    if (type.equals(MessageType.JAVA_SERIALIZED))
+	    {
+		ByteArrayInputStream bs = new ByteArrayInputStream(data.getBytes());
+		ObjectInputStream os = new ObjectInputStream(bs);
+
+		return Util.deserialize((Serializable) os.readObject());
+	    }
+	    else
+	    {
+		if (type.equals(MessageType.JBOSS_XML))
+		{
+		    return Util.deserialize(data);
+		}
+		else
+		    throw new MessageStoreException("Unsupported MessageType: "
+			    + type);
+	    }
+	}
+	catch (MessageStoreException ex)
+	{
+	    throw ex;
+	}
+	catch (Exception ex)
+	{
+	    ex.printStackTrace();
+	    
+	    throw new MessageStoreException(ex);
+	}
     }
-    
 
-    public Integer getMaxRedeliverCount() {
-        return maxRedeliverCount;
+    public Integer getMaxRedeliverCount ()
+    {
+	return maxRedeliverCount;
     }
 
-    public void setMaxRedeliverCount(Integer maxRedeliverCount) {
-        this.maxRedeliverCount = maxRedeliverCount;
+    public void setMaxRedeliverCount (Integer maxRedeliverCount)
+    {
+	this.maxRedeliverCount = maxRedeliverCount;
     }
 
 }




More information about the jboss-svn-commits mailing list