[jboss-svn-commits] JBL Code SVN: r14348 - labs/jbossesb/trunk/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Sat Aug 18 11:20:13 EDT 2007


Author: tfennelly
Date: 2007-08-18 11:20:13 -0400 (Sat, 18 Aug 2007)
New Revision: 14348

Modified:
   labs/jbossesb/trunk/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db/DBMessageStoreImpl.java
Log:
Rolled back - build was broken.

Modified: labs/jbossesb/trunk/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db/DBMessageStoreImpl.java
===================================================================
--- labs/jbossesb/trunk/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db/DBMessageStoreImpl.java	2007-08-18 12:40:38 UTC (rev 14347)
+++ labs/jbossesb/trunk/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db/DBMessageStoreImpl.java	2007-08-18 15:20:13 UTC (rev 14348)
@@ -21,10 +21,6 @@
 
 package org.jboss.internal.soa.esb.persistence.format.db;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.net.URI;
 import java.sql.Connection;
@@ -41,7 +37,6 @@
 import org.jboss.soa.esb.client.ServiceInvoker;
 import org.jboss.soa.esb.listeners.message.MessageDeliverException;
 import org.jboss.soa.esb.message.Message;
-import org.jboss.soa.esb.message.format.MessageType;
 import org.jboss.soa.esb.message.urigen.MessageURIGenerator;
 import org.jboss.soa.esb.persistence.manager.ConnectionManager;
 import org.jboss.soa.esb.persistence.manager.ConnectionManagerException;
@@ -49,636 +44,421 @@
 import org.jboss.soa.esb.services.persistence.MessageStore;
 import org.jboss.soa.esb.services.persistence.MessageStoreException;
 import org.jboss.soa.esb.services.persistence.RedeliverStore;
-import org.jboss.soa.esb.services.registry.RegistryException;
 import org.jboss.soa.esb.util.Util;
+import org.jboss.util.Base64;
 
-/**
- * This implementation only deals with the Serializable and XML message formats.
- * 
- * TODO: add extensibility to marshal/unmarshal of Message implementations
- * through plugins. Was in original design, but does not seem to have made it
- * into this initial implementation. However, Util serialize/deserialize are
- * tying us to these types anyway, despite the extensibility of the existing
- * Message plugin approach.
- */
-
 public class DBMessageStoreImpl implements RedeliverStore
 {
-    private Logger logger = Logger.getLogger(this.getClass());
+	private Logger logger = Logger.getLogger(this.getClass());
 
-    protected ConnectionManager mgr = null;
-
+	protected ConnectionManager mgr = null;
+    
     private Integer maxRedeliverCount = 10;
+	
+	protected MessageURIGenerator uriGenerator = new DefaultMessageURIGenerator();
 
-    protected MessageURIGenerator uriGenerator = new DefaultMessageURIGenerator();
-
-    public DBMessageStoreImpl() throws ConnectionManagerException
-    {
-	mgr = ConnectionManagerFactory.getConnectionManager();
-    }
-
-    /*
-         * (non-Javadoc)
-         * 
-         * @see org.jboss.soa.esb.services.persistence.MessageStore#getMessageURIGenerator()
-         */
-    public MessageURIGenerator getMessageURIGenerator ()
-    {
-	return uriGenerator;
-    }
-
-    /**
-         * add's a
-         * 
-         * @Message to the database persistence store will set the 'delivered'
-         *          flag to TRUE by default - assuming that the
-         * @Message has been delivered
-         */
-    public synchronized URI addMessage (Message message, String classification)
-	    throws MessageStoreException
-    {
-	URI uid = null;
-	Connection conn = null;
-	try
+	public DBMessageStoreImpl() throws ConnectionManagerException
 	{
-	    conn = mgr.getConnection();
-	    uid = uriGenerator.generateMessageURI(message);
-	    insert(uid, message, classification, "TRUE", conn);
+			mgr = ConnectionManagerFactory.getConnectionManager();
 	}
-	catch (MessageStoreException ex)
-	{
-	    throw ex;
+
+	/* (non-Javadoc)
+	 * @see org.jboss.soa.esb.services.persistence.MessageStore#getMessageURIGenerator()
+	 */
+	public MessageURIGenerator getMessageURIGenerator() {
+		return uriGenerator;
 	}
-	catch (Exception e)
+
+	/**
+	 * add's a @Message to the database persistence store
+	 * will set the 'delivered' flag to TRUE by default - assuming that the @Message has been delivered
+	 */
+	public synchronized URI addMessage (Message message, String classification) throws MessageStoreException
 	{
-	    logger.error(e);
-	    
-	    e.printStackTrace();
-	    
-	    throw new MessageStoreException(e);
+		// String messageString = null;
+		URI uid = null;
+        Connection conn=null;
+		try{
+			conn = mgr.getConnection();
+			uid = uriGenerator.generateMessageURI(message);
+            insert(uid, message, classification, "TRUE", conn);
+		}
+		catch (Exception e)
+		{
+			logger.error(e);
+			throw new MessageStoreException(e);
+		} 
+		finally
+		{
+			release(conn);
+		}
+
+		return uid;
 	}
-	finally
-	{
-	    release(conn);
-	}
 
-	return uid;
-    }
-
-    /**
-         * return a
-         * 
-         * @Message based on the passed in key in the form of a JBoss ESB
-         * @URI format for URI: "urn:jboss/esb/message/UID#" +
-         *      UUID.randomUUID()" - see the method in this class
-         * @addMessage
-         */
-    public synchronized Message getMessage (URI uid)
-	    throws MessageStoreException
-    {
-	Message message = null;
-	Connection conn = null;
-	try
+	/**
+	 * return a @Message based on the passed in key in the form of a JBoss ESB @URI
+	 * format for URI: "urn:jboss/esb/message/UID#" + UUID.randomUUID()" - see the method in this class @addMessage
+	 */
+	public synchronized Message getMessage (URI uid)
+			throws MessageStoreException
 	{
-	    conn = mgr.getConnection();
-	    message = select(uid, conn);
+		Message message = null;
+        Connection conn=null;
+		try {
+			conn = mgr.getConnection();
+			message =  select(uid, conn);
+		} catch (Exception e) {
+			throw new MessageStoreException(e);
+		} finally {
+			release(conn);
+		}
+		return message;
 	}
-	catch (MessageStoreException ex)
-	{
-	    throw ex;
-	}
-	catch (Exception e)
-	{
-	    e.printStackTrace();
-	    
-	    throw new MessageStoreException(e);
-	}
-	finally
-	{
-	    release(conn);
-	}
-	return message;
-    }
-
+    
     /**
-         * return a
-         * 
-         * @Message based on the passed in key in the form of a JBoss ESB
-         * @URI format for URI: "urn:jboss/esb/message/UID#" +
-         *      UUID.randomUUID()" - see the method in this class
-         * @addMessage
-         */
+     * return a @Message based on the passed in key in the form of a JBoss ESB @URI
+     * format for URI: "urn:jboss/esb/message/UID#" + UUID.randomUUID()" - see the method in this class @addMessage
+     */
     public synchronized Message getMessage (URI uid, String classification)
-	    throws MessageStoreException
+            throws MessageStoreException
     {
-	Message message = null;
-	Connection conn = null;
-	try
-	{
-	    conn = mgr.getConnection();
-	    message = select(uid, classification, conn);
-	}
-	catch (MessageStoreException ex)
-	{
-	    throw ex;
-	}
-	catch (Exception e)
-	{
-	    e.printStackTrace();
-	    
-	    throw new MessageStoreException(e);
-	}
-	finally
-	{
-	    release(conn);
-	}
-	return message;
+        Message message = null;
+        Connection conn=null;
+        try {
+            conn = mgr.getConnection();
+            message =  select(uid, classification, conn);
+        } catch (Exception e) {
+            throw new MessageStoreException(e);
+        } finally {
+            release(conn);
+        }
+        return message;
     }
-
+    
     /**
-         * remove a
-         * 
-         * @Message based on the passed in key in the form of a JBoss ESB
-         * @URI format for URI: "urn:jboss/esb/message/UID#" +
-         *      UUID.randomUUID()" - see the method in this class
-         * @removeMessage
-         */
+     * remove a @Message based on the passed in key in the form of a JBoss ESB @URI
+     * format for URI: "urn:jboss/esb/message/UID#" + UUID.randomUUID()" - see the method in this class @removeMessage
+     */
     public synchronized int removeMessage (URI uid, String classification)
-	    throws MessageStoreException
+            throws MessageStoreException
     {
-	int response;
-	Connection conn = null;
-	try
-	{
-	    conn = mgr.getConnection();
-	    response = delete(uid, classification, conn);
-	}
-	catch (Exception e)
-	{
-	    e.printStackTrace();
-	    
-	    throw new MessageStoreException(e);
-	}
-	finally
-	{
-	    release(conn);
-	}
-	return response;
+        int response;
+        Connection conn=null;
+        try {
+            conn = mgr.getConnection();
+            response =  delete(uid, classification, conn);
+        } catch (Exception e) {
+            throw new MessageStoreException(e);
+        } finally {
+            release(conn);
+        }
+        return response;
     }
-
-    /**
-         * 
-         * @param uid -
-         *                key for message to set undelivered flag on
-         * @throws MessageStoreException
-         */
-    public void setUndelivered (URI uid) throws MessageStoreException
+	
+	/**
+	 * 
+	 * @param uid - key for message to set undelivered flag on
+	 * @throws MessageStoreException
+	 */
+	public void setUndelivered(URI uid) throws MessageStoreException
     {
-	String sql = "update message set delivered = 'FALSE' where uuid=?";
-	Connection conn = null;
-	try
-	{
-	    conn = mgr.getConnection();
-	    PreparedStatement ps = conn.prepareStatement(sql);
-	    ps.setString(1, uid.toString());
-	    ps.execute();
+		String sql = "update message set delivered = 'FALSE' where uuid=?";
+        Connection conn=null;
+		try {
+			conn = mgr.getConnection();
+            PreparedStatement ps = conn.prepareStatement(sql);
+			ps.setString(1, uid.toString());
+			ps.execute();
+        } catch (Exception e) {
+			throw new MessageStoreException(e);
+		} finally {
+			release(conn);
+		}
+		
 	}
-	catch (Exception e)
-	{
-	    e.printStackTrace();
-	    
-	    throw new MessageStoreException(e);
+	
+	public void setDelivered(URI uid) throws MessageStoreException{
+		String sql = "update message set delivered = 'TRUE' where uuid=?";
+        Connection conn=null;
+		try {
+			conn = mgr.getConnection();
+            PreparedStatement ps = conn.prepareStatement(sql);
+			ps.setString(1, "FALSE");
+			ps.execute();
+		} catch (Exception e) {
+			throw new MessageStoreException(e);
+		} finally {
+			release(conn);
+		}
 	}
-	finally
-	{
-	    release(conn);
-	}
+	
+	/**
+	 * This method can be used to retrieve a collection of all the undelivered (delivered=FALSE) from the message-store
+	 * You should test for 'null' on the return type to see if any messages exist in the collection
+	 * @return Map<URI, Message> - a collection of all the undelivered messages in the message-store
+	 * @throws MessageStoreException
+	 */
+	public Map<URI, Message> getUndeliveredMessages(String classification) throws MessageStoreException {
+		HashMap<URI, Message> messages = new HashMap<URI, Message>();
+		String sql = "select uuid from message where delivered='FALSE'";
+        if (classification!=null) {
+            sql += " and classification='" + classification + "'";
+        }
+        Connection conn=null;
+		try
+		{
+			conn = mgr.getConnection();
+			Statement stmt;
+			ResultSet rs;
+			stmt = conn.createStatement();
+			rs = stmt.executeQuery(sql);
+			
+			while (rs.next()) {
+				URI uid = new URI(rs.getString(1));
+				Message msg = getMessage(uid);
+				messages.put(uid, msg);
+			}
+			rs.close();
+			stmt.close();
 
-    }
-
-    public void setDelivered (URI uid) throws MessageStoreException
-    {
-	String sql = "update message set delivered = 'TRUE' where uuid=?";
-	Connection conn = null;
-	try
-	{
-	    conn = mgr.getConnection();
-	    PreparedStatement ps = conn.prepareStatement(sql);
-	    ps.setString(1, "FALSE");
-	    ps.execute();
+		}
+		catch (Exception e)
+		{
+			throw new MessageStoreException(e);
+		} 
+		finally
+		{
+			release(conn);
+		}
+		logger.info("retrieved " + messages.size() + " undelivered messages");
+		return messages;
+		
 	}
-	catch (Exception e)
-	{
-	    e.printStackTrace();
-	    
-	    throw new MessageStoreException(e);
-	}
-	finally
-	{
-	    release(conn);
-	}
-    }
-
+    
     /**
-         * This method can be used to retrieve a collection of all the
-         * undelivered (delivered=FALSE) from the message-store You should test
-         * for 'null' on the return type to see if any messages exist in the
-         * collection
-         * 
-         * @return Map<URI, Message> - a collection of all the undelivered
-         *         messages in the message-store
-         * @throws MessageStoreException
-         */
-    public Map<URI, Message> getUndeliveredMessages (String classification)
-	    throws MessageStoreException
-    {
-	HashMap<URI, Message> messages = new HashMap<URI, Message>();
-	String sql = "select uuid from message where delivered='FALSE'";
-	if (classification != null)
-	{
-	    sql += " and classification='" + classification + "'";
-	}
-	Connection conn = null;
-	try
-	{
-	    conn = mgr.getConnection();
-	    Statement stmt;
-	    ResultSet rs;
-	    stmt = conn.createStatement();
-	    rs = stmt.executeQuery(sql);
+     * This method can be used to retrieve a collection of all from the message-store
+     * You should test for 'null' on the return type to see if any messages exist in the collection
+     * @return Map<URI, Message> - a collection of all the undelivered messages in the message-store
+     * @throws MessageStoreException
+     */
+    public Map<URI, Message> getAllMessages(String classification) throws MessageStoreException {
+        HashMap<URI, Message> messages = new HashMap<URI, Message>();
+        String sql = "select uuid, message from message";
+        if (classification!=null) {
+            sql += " where classification='" + classification + "'";
+        }
+        Connection conn=null;
+        try
+        {
+            conn = mgr.getConnection();
+            Statement stmt;
+            ResultSet rs;
+            stmt = conn.createStatement();
+            rs = stmt.executeQuery(sql);
+            
+            while (rs.next()) {
+                URI uid = new URI(rs.getString(1));
+                Message msg = Util.deserialize((Serializable) Base64.decodeToObject( rs.getString(2)));
+                messages.put(uid, msg);
+            }
+            rs.close();
+            stmt.close();
 
-	    while (rs.next())
-	    {
-		URI uid = new URI(rs.getString(1));
-		Message msg = getMessage(uid);
-		messages.put(uid, msg);
-	    }
-	    rs.close();
-	    stmt.close();
-	}
-	catch (MessageStoreException ex)
-	{
-	    throw ex;
-	}
-	catch (Exception e)
-	{
-	    e.printStackTrace();
-	    
-	    throw new MessageStoreException(e);
-	}
-	finally
-	{
-	    release(conn);
-	}
-	logger.info("retrieved " + messages.size() + " undelivered messages");
-	return messages;
-
+        }
+        catch (Exception e)
+        {
+            throw new MessageStoreException(e);
+        } 
+        finally
+        {
+            release(conn);
+        }
+        logger.debug("retrieved " + messages.size() + " " + classification + " messages");
+        return messages;
+        
     }
 
-    /**
-         * This method can be used to retrieve a collection of all from the
-         * message-store You should test for 'null' on the return type to see if
-         * any messages exist in the collection
-         * 
-         * @return Map<URI, Message> - a collection of all the undelivered
-         *         messages in the message-store
-         * @throws MessageStoreException
-         */
-    public Map<URI, Message> getAllMessages (String classification)
-	    throws MessageStoreException
-    {
-	HashMap<URI, Message> messages = new HashMap<URI, Message>();
-	String sql = "select uuid, type, message from message";
-	if (classification != null)
+	private void release (Connection conn)
 	{
-	    sql += " where classification='" + classification + "'";
-	}
-	Connection conn = null;
-	try
-	{
-	    conn = mgr.getConnection();
-	    Statement stmt;
-	    ResultSet rs;
-	    stmt = conn.createStatement();
-	    rs = stmt.executeQuery(sql);
 
-	    while (rs.next())
-	    {
-		URI uid = new URI(rs.getString(1));
-		Message msg = decode(rs);
-		messages.put(uid, msg);
-	    }
-	    rs.close();
-	    stmt.close();
-	}
-	catch (MessageStoreException ex)
-	{
-	    throw ex;
-	}
-	catch (Exception e)
-	{
-	    e.printStackTrace();
-	    
-	    throw new MessageStoreException(e);
-	}
-	finally
-	{
-	    release(conn);
-	}
-	logger.debug("retrieved " + messages.size() + " " + classification
-		+ " messages");
-	return messages;
-
-    }
-
-    private void release (Connection conn)
-    {
-
-	if (conn != null)
-	{
-	    try
-	    {
-		conn.close();
-	    }
-	    catch (Exception e2)
-	    {
-		logger.warn(e2.getMessage(), e2);
-	    }
-	}
-    }
-
-    /**
-         * 
-         */
-    public boolean redeliver (URI uuid) throws MessageStoreException
-    {
-	boolean isDelivered = false;
-	boolean error = false;
-
-	Connection con = null;
-	try
-	{
-	    con = mgr.getConnection();
-	    con.setAutoCommit(false);
-	    con.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
-
-	    Message message = select(uuid, con);
-
-	    if (message != null
-		    && delete(uuid, RedeliverStore.CLASSIFICATION_RDLVR, con) == 1)
-	    {
-		// now any good db should have set a read lock on this record,
-		// until we commit.
-		// if exception is thrown up the delivery count on the message
-		// if exceeds the maxcount then update the classification to
-		// DLQ.
-		Service to = (Service) message.getProperties().getProperty(
-			ServiceInvoker.DELIVER_TO);
-		try
+		if (conn != null)
 		{
-		    ServiceInvoker si = new ServiceInvoker(to.getCategory(), to
-			    .getName());
-		    message.getProperties().setProperty(
-			    RedeliverStore.IS_REDELIVERY, true);
-		    si.deliverAsync(message);
-		    isDelivered = true;
-		}
-		catch (MessageDeliverException e)
-		{
-		    logger.debug(e.getMessage(), e);
-		}
-
-		if (isDelivered)
-		{
-		    // the message is delivered, we're good so remove it
-		    // from the store
-		    delete(uuid, RedeliverStore.CLASSIFICATION_RDLVR, con);
-		}
-		else
-		{
-		    // the message was not delivered
-		    if (message.getProperties().getProperty(DELIVER_COUNT) == null)
-		    {
-			// appearantly it was the first time
-			message.getProperties().setProperty(
-				RedeliverStore.DELIVER_COUNT,
-				Integer.valueOf("1"));
-			insert(uuid, message,
-				MessageStore.CLASSIFICATION_RDLVR, "FALSE", con);
-		    }
-		    else
-		    {
-			Integer redeliverCount = (Integer) message
-				.getProperties().getProperty(DELIVER_COUNT);
-			if (redeliverCount < maxRedeliverCount
-				|| maxRedeliverCount < 0)
+			try
 			{
-			    // up the count
-			    message.getProperties().setProperty(
-				    RedeliverStore.DELIVER_COUNT,
-				    ++redeliverCount);
-			    insert(uuid, message,
-				    MessageStore.CLASSIFICATION_RDLVR, "FALSE",
-				    con);
+				conn.close();
 			}
-			else
+			catch (Exception e2)
 			{
-			    // undeliverable, send to the DLQ
-			    insert(uuid, message,
-				    MessageStore.CLASSIFICATION_DLQ, "FALSE",
-				    con);
+                logger.warn(e2.getMessage(), e2);
 			}
-		    }
 		}
-	    }
 	}
-	catch (SQLException e)
-	{
-	    if (logger.isDebugEnabled())
-	    {
-		logger.debug("Deadlocks may occur under normal processing");
-		logger.debug(e.getMessage(), e);
-	    }
-	    error = true;
-	}
-	finally
-	{
-	    if (con != null)
-	    {
-		try
-		{
-		    if (!error)
-		    {
-			con.commit();
-		    }
-		    else
-		    {
-			con.rollback();
-		    }
-		}
-		catch (SQLException e)
-		{
-		    logger.error(e);
-		}
-		try
-		{
-		    con.close();
-		}
-		catch (Exception e2)
-		{
-		    logger.error(e2);
-		}
-	    }
-	}
-	return isDelivered;
-    }
-
-    private Message select (URI uid, Connection connection)
-	    throws SQLException, MessageStoreException
+    /**
+     * 
+     */
+    public boolean redeliver(URI uuid) throws MessageStoreException
     {
-	Message message = null;
-	String selectSql = "select * from message where uuid=?";
-	PreparedStatement selectStmt = connection.prepareStatement(selectSql);
-	selectStmt.setObject(1, uid.toString());
-	ResultSet rs = selectStmt.executeQuery();
+        boolean isDelivered=false;
+        boolean error=false;
 
-	if (rs.next())
-	    message = decode(rs);
-
-	rs.close();
-	selectStmt.close();
-	return message;
+        Connection con = null;
+        try
+        {
+            con = mgr.getConnection();
+            con.setAutoCommit(false);
+            con.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+               
+            Message message=select(uuid, con);
+            
+            if (message!=null && delete(uuid, RedeliverStore.CLASSIFICATION_RDLVR, con)==1) {
+                //now any good db should have set a read lock on this record, until we commit.
+                //if exception is thrown up the delivery count on the message
+                //if exceeds the maxcount then update the classification to DLQ.
+                Service to = (Service) message.getProperties().getProperty(ServiceInvoker.DELIVER_TO);
+                try {
+                    ServiceInvoker si = new ServiceInvoker(to.getCategory(), to.getName());
+                    message.getProperties().setProperty(RedeliverStore.IS_REDELIVERY, true);
+                    si.deliverAsync(message);
+                    isDelivered=true;
+                } catch (MessageDeliverException e) {
+                    logger.debug(e.getMessage(), e);
+                }
+                
+                if (isDelivered) {
+                    //the message is delivered, we're good so remove it from the store
+                    delete(uuid, RedeliverStore.CLASSIFICATION_RDLVR, con);
+                } else {
+                    //the message was not delivered
+                    if (message.getProperties().getProperty(DELIVER_COUNT)==null) {
+                        //appearantly it was the first time
+                        message.getProperties().setProperty(RedeliverStore.DELIVER_COUNT, Integer.valueOf("1"));
+                        insert(uuid, message, MessageStore.CLASSIFICATION_RDLVR, "FALSE", con);
+                    } else {
+                        Integer redeliverCount = (Integer) message.getProperties().getProperty(DELIVER_COUNT);
+                        if (redeliverCount < maxRedeliverCount || maxRedeliverCount < 0) {
+                            //up the count
+                            message.getProperties().setProperty(RedeliverStore.DELIVER_COUNT, ++redeliverCount);
+                            insert(uuid, message, MessageStore.CLASSIFICATION_RDLVR, "FALSE", con);
+                        } else {
+                            //undeliverable, send to the DLQ
+                            insert(uuid, message, MessageStore.CLASSIFICATION_DLQ, "FALSE", con);
+                        }
+                    }
+                }
+            }
+        }
+        catch (SQLException e)
+        {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Deadlocks may occur under normal processing");
+                logger.debug(e.getMessage(), e);
+            }
+            error=true;
+        } 
+        finally
+        {
+            if (con!=null) {
+                try {
+                    if (!error) {
+                        con.commit();
+                    } else {
+                        con.rollback();
+                    }
+                } catch (SQLException e) {
+                    logger.error(e);
+                }
+                try {
+                    con.close();
+                } catch (Exception e2) {
+                    logger.error(e2);
+                }
+            }
+        }
+        return isDelivered; 
     }
-
-    private Message select (URI uid, String classification,
-	    Connection connection) throws SQLException, MessageStoreException
+    
+    
+    private Message select(URI uid, Connection connection) 
+        throws SQLException, MessageStoreException
     {
-	Message message = null;
-	String selectSql = "select * from message where uuid=? and classification=?";
-	PreparedStatement selectStmt = connection.prepareStatement(selectSql);
-	selectStmt.setObject(1, uid.toString());
-	selectStmt.setObject(2, classification);
-	ResultSet rs = selectStmt.executeQuery();
-
-	if (rs.next())
-	    message = decode(rs);
-
-	rs.close();
-	selectStmt.close();
-	return message;
+        Message message=null;
+        String selectSql = "select * from message where uuid=?";
+        PreparedStatement selectStmt = connection.prepareStatement(selectSql);
+        selectStmt.setObject(1, uid.toString());
+        ResultSet rs = selectStmt.executeQuery();
+        if (rs.next()) {
+            try {
+                message = Util.deserialize((Serializable) Base64.decodeToObject(rs.getString("message")));
+            } catch (Exception e) {
+                throw new MessageStoreException(e);
+            }
+        }
+        rs.close();
+        selectStmt.close();
+        return message;
     }
-
-    private int delete (URI uid, String classification, Connection connection)
-	    throws SQLException
+    
+    private Message select(URI uid, String classification, Connection connection) 
+    throws SQLException, MessageStoreException
     {
-	String deleteSql = "delete from message where uuid=? and classification=?";
-	PreparedStatement stmt = connection.prepareStatement(deleteSql);
-	stmt.setObject(1, uid.toString());
-	stmt.setObject(2, classification);
-	int result = stmt.executeUpdate();
-	stmt.close();
-	return result;
+        Message message=null;
+        String selectSql = "select * from message where uuid=? and classification=?";
+        PreparedStatement selectStmt = connection.prepareStatement(selectSql);
+        selectStmt.setObject(1, uid.toString());
+        selectStmt.setObject(2, classification);
+        ResultSet rs = selectStmt.executeQuery();
+        if (rs.next()) {
+            try {
+                message = Util.deserialize((Serializable) Base64.decodeToObject(rs.getString("message")));
+            } catch (Exception e) {
+                throw new MessageStoreException(e);
+            }
+        }
+        rs.close();
+        selectStmt.close();
+        return message;
     }
-
-    private void insert (URI uid, Message message, String classification,
-	    String delivered, Connection conn) throws SQLException,
-	    MessageStoreException
+    
+    private int delete(URI uid, String classification, Connection connection)
+        throws SQLException
     {
-	if (message.getType().equals(MessageType.JAVA_SERIALIZED)
-		|| message.getType().equals(MessageType.JBOSS_XML))
-	{
-	    String sql = "insert into message(uuid, type, message, delivered, classification) values(?,?,?,?,?)";
-	    PreparedStatement ps = conn.prepareStatement(sql);
-
-	    ps.setString(1, uid.toString());
-	    ps.setString(2, message.getType().toString());
-
-	    try
-	    {
-		String messageString = null;
-		Serializable serializedForm = Util.serialize(message);
-
-		if (serializedForm instanceof String)
-		    messageString = (String) serializedForm;
-		else
-		{
-		    ByteArrayOutputStream bo = new ByteArrayOutputStream();
-		    ObjectOutputStream os = new ObjectOutputStream(bo);
-
-		    os.writeObject(serializedForm);
-		    os.flush();
-		    os.close();
-
-		    messageString = new String(bo.toByteArray());
-		}
-
-		ps.setString(3, messageString);
-	    }
-	    catch (Exception e)
-	    {
-		e.printStackTrace();
-		
-		throw new MessageStoreException(e);
-	    }
-	    ps.setString(4, "TRUE");
-	    ps.setString(5, classification);
-	    ps.execute();
-	    ps.close();
-	}
-	else
-	{
-	    Thread.currentThread().dumpStack();
-	    
-	    throw new MessageStoreException("Unsupported MessageType: "
-		    + message.getType());
-	}
+        String deleteSql = "delete from message where uuid=? and classification=?";
+        PreparedStatement stmt = connection.prepareStatement(deleteSql);
+        stmt.setObject(1, uid.toString());
+        stmt.setObject(2, classification);
+        int result = stmt.executeUpdate();
+        stmt.close();
+        return result;
     }
-
-    private final Message decode (ResultSet rs) throws MessageStoreException
+    
+    private void insert(URI uid, Message message, String classification, String delivered, Connection conn) 
+        throws SQLException, MessageStoreException
     {
-	try
-	{
-	    URI type = new URI(rs.getString("type"));
-	    String data = rs.getString("message");
-
-	    if (type.equals(MessageType.JAVA_SERIALIZED))
-	    {
-		ByteArrayInputStream bs = new ByteArrayInputStream(data.getBytes());
-		ObjectInputStream os = new ObjectInputStream(bs);
-
-		return Util.deserialize((Serializable) os.readObject());
-	    }
-	    else
-	    {
-		if (type.equals(MessageType.JBOSS_XML))
-		{
-		    return Util.deserialize(data);
-		}
-		else
-		    throw new MessageStoreException("Unsupported MessageType: "
-			    + type);
-	    }
-	}
-	catch (MessageStoreException ex)
-	{
-	    throw ex;
-	}
-	catch (Exception ex)
-	{
-	    ex.printStackTrace();
-	    
-	    throw new MessageStoreException(ex);
-	}
+        String sql = "insert into message(uuid, type, message, delivered, classification) values(?,?,?,?,?)";
+        PreparedStatement ps = conn.prepareStatement(sql);
+        
+        ps.setString(1, uid.toString());
+        ps.setString(2, message.getType().toString());
+        try {
+            String messageString = Base64.encodeObject(Util.serialize(message));
+            ps.setString(3, messageString);
+        } catch (Exception e) {
+            throw new MessageStoreException(e);
+        }
+        ps.setString(4, "TRUE");
+        ps.setString(5, classification);
+        ps.execute();
+        ps.close();
     }
+    
 
-    public Integer getMaxRedeliverCount ()
-    {
-	return maxRedeliverCount;
+    public Integer getMaxRedeliverCount() {
+        return maxRedeliverCount;
     }
 
-    public void setMaxRedeliverCount (Integer maxRedeliverCount)
-    {
-	this.maxRedeliverCount = maxRedeliverCount;
+    public void setMaxRedeliverCount(Integer maxRedeliverCount) {
+        this.maxRedeliverCount = maxRedeliverCount;
     }
 
 }




More information about the jboss-svn-commits mailing list