[jboss-svn-commits] JBL Code SVN: r15712 - in labs/jbossesb/workspace/dbevenius/redeliver/product: services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db and 4 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed Oct 10 03:39:45 EDT 2007


Author: beve
Date: 2007-10-10 03:39:44 -0400 (Wed, 10 Oct 2007)
New Revision: 15712

Added:
   labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/test/java/org/jboss/soa/esb/schedule/
   labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/test/java/org/jboss/soa/esb/schedule/RedeliverEventMessageComposerUnitTest.java
Modified:
   labs/jbossesb/workspace/dbevenius/redeliver/product/rosetta/src/org/jboss/soa/esb/services/persistence/RedeliverStore.java
   labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db/DBMessageStoreImpl.java
   labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/soa/esb/actions/MessagePersister.java
   labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/soa/esb/actions/MessageRedeliverer.java
   labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/soa/esb/schedule/RedeliverEventMessageComposer.java
Log:
Changes for SJ custom redelivery.


Modified: labs/jbossesb/workspace/dbevenius/redeliver/product/rosetta/src/org/jboss/soa/esb/services/persistence/RedeliverStore.java
===================================================================
--- labs/jbossesb/workspace/dbevenius/redeliver/product/rosetta/src/org/jboss/soa/esb/services/persistence/RedeliverStore.java	2007-10-10 07:03:21 UTC (rev 15711)
+++ labs/jbossesb/workspace/dbevenius/redeliver/product/rosetta/src/org/jboss/soa/esb/services/persistence/RedeliverStore.java	2007-10-10 07:39:44 UTC (rev 15712)
@@ -24,4 +24,5 @@
      * @throws MessageStoreException
      */
     public boolean redeliver(URI uid) throws MessageStoreException;
+    public boolean redeliver(URI uid,String classification) throws MessageStoreException;
 }

Modified: labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db/DBMessageStoreImpl.java
===================================================================
--- labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db/DBMessageStoreImpl.java	2007-10-10 07:03:21 UTC (rev 15711)
+++ labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/internal/soa/esb/persistence/format/db/DBMessageStoreImpl.java	2007-10-10 07:39:44 UTC (rev 15712)
@@ -299,24 +299,34 @@
 			}
 		}
 	}
+	
+	/**
+	 * 
+	 */
+    public boolean redeliver(URI uuid) throws MessageStoreException
+    {
+    	return redeliver(uuid, RedeliverStore.CLASSIFICATION_RDLVR);
+    }
+    
     /**
      * 
      */
-    public boolean redeliver(URI uuid) throws MessageStoreException
+    public boolean redeliver(URI uuid,String classification) throws MessageStoreException
     {
         boolean isDelivered=false;
         boolean error=false;
-
         Connection con = null;
         try
         {
             con = mgr.getConnection();
             con.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
             con.setAutoCommit(false);
+//            con.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
+               
+            Message message=select(uuid, classification, con);
             
-            Message message=select(uuid, con);
-            
-            if (message!=null && delete(uuid, RedeliverStore.CLASSIFICATION_RDLVR, con)==1) {
+            int result = delete(uuid, classification, con);
+            if (message!=null && result == 1) {
                 //now any good db should have set a read lock on this record, until we commit.
                 //if exception is thrown up the delivery count on the message
                 //if exceeds the maxcount then update the classification to DLQ.
@@ -332,19 +342,19 @@
                 
                 if (isDelivered) {
                     //the message is delivered, we're good so remove it from the store
-                    delete(uuid, RedeliverStore.CLASSIFICATION_RDLVR, con);
+                    int secondDelete = delete(uuid, classification , con);
                 } else {
                     //the message was not delivered
                     if (message.getProperties().getProperty(DELIVER_COUNT)==null) {
                         //appearantly it was the first time
                         message.getProperties().setProperty(RedeliverStore.DELIVER_COUNT, Integer.valueOf("1"));
-                        insert(uuid, message, MessageStore.CLASSIFICATION_RDLVR, "FALSE", con);
+                        insert(uuid, message, classification, "FALSE", con);
                     } else {
                         Integer redeliverCount = (Integer) message.getProperties().getProperty(DELIVER_COUNT);
                         if (redeliverCount < maxRedeliverCount || maxRedeliverCount < 0) {
                             //up the count
                             message.getProperties().setProperty(RedeliverStore.DELIVER_COUNT, ++redeliverCount);
-                            insert(uuid, message, MessageStore.CLASSIFICATION_RDLVR, "FALSE", con);
+                            insert(uuid, message, CLASSIFICATION_RDLVR, "FALSE", con);
                         } else {
                             //undeliverable, send to the DLQ
                             insert(uuid, message, MessageStore.CLASSIFICATION_DLQ, "FALSE", con);
@@ -376,6 +386,7 @@
                 try {
                     con.close();
                 } catch (Exception e2) {
+                	e2.printStackTrace();
                     logger.error(e2);
                 }
             }
@@ -389,18 +400,27 @@
     {
         Message message=null;
         String selectSql = "select * from "+tableName+" where uuid=?";
-        PreparedStatement selectStmt = connection.prepareStatement(selectSql);
-        selectStmt.setObject(1, uid.toString());
-        ResultSet rs = selectStmt.executeQuery();
-        if (rs.next()) {
-            try {
-                message = Util.deserialize((Serializable) Base64.decodeToObject(rs.getString("message")));
-            } catch (Exception e) {
-                throw new MessageStoreException(e);
-            }
+        PreparedStatement selectStmt = null;
+        ResultSet rs = null;
+        try
+        {
+        	
+	        selectStmt = connection.prepareStatement(selectSql);
+	        selectStmt.setObject(1, uid.toString());
+	        rs = selectStmt.executeQuery();
+	        if (rs.next()) {
+	            try {
+	                message = Util.deserialize((Serializable) Base64.decodeToObject(rs.getString("message")));
+	            } catch (Exception e) {
+	                throw new MessageStoreException(e);
+	            }
+	        }
         }
-        rs.close();
-        selectStmt.close();
+        finally
+        {
+	        if ( rs != null ) rs.close();
+	        if ( selectStmt != null ) selectStmt.close();
+        }
         return message;
     }
     
@@ -409,19 +429,27 @@
     {
         Message message=null;
         String selectSql = "select * from "+tableName+" where uuid=? and classification=?";
-        PreparedStatement selectStmt = connection.prepareStatement(selectSql);
-        selectStmt.setObject(1, uid.toString());
-        selectStmt.setObject(2, classification);
-        ResultSet rs = selectStmt.executeQuery();
-        if (rs.next()) {
-            try {
-                message = Util.deserialize((Serializable) Base64.decodeToObject(rs.getString("message")));
-            } catch (Exception e) {
-                throw new MessageStoreException(e);
-            }
+        PreparedStatement selectStmt = null;
+        ResultSet rs = null;
+        try
+        {
+	        selectStmt = connection.prepareStatement(selectSql);
+	        selectStmt.setObject(1, uid.toString());
+	        selectStmt.setObject(2, classification);
+	        rs = selectStmt.executeQuery();
+	        if (rs.next()) {
+	            try {
+	                message = Util.deserialize((Serializable) Base64.decodeToObject(rs.getString("message")));
+	            } catch (Exception e) {
+	                throw new MessageStoreException(e);
+	            }
+	        }
         }
-        rs.close();
-        selectStmt.close();
+        finally
+        {
+	        if ( rs != null ) rs.close();
+	        if ( selectStmt != null) selectStmt.close();
+        }
         return message;
     }
     
@@ -429,11 +457,19 @@
         throws SQLException
     {
         String deleteSql = "delete from "+tableName+" where uuid=? and classification=?";
-        PreparedStatement stmt = connection.prepareStatement(deleteSql);
-        stmt.setObject(1, uid.toString());
-        stmt.setObject(2, classification);
-        int result = stmt.executeUpdate();
-        stmt.close();
+        PreparedStatement stmt = null;
+        int result = 0;
+        try
+        {
+	        stmt = connection.prepareStatement(deleteSql);
+	        stmt.setObject(1, uid.toString());
+	        stmt.setObject(2, classification);
+	        result = stmt.executeUpdate();
+        }
+        finally
+        {
+	        if ( stmt != null ) stmt.close();
+        }
         return result;
     }
     
@@ -441,20 +477,27 @@
         throws SQLException, MessageStoreException
     {
         String sql = "insert into "+tableName+"(uuid, type, message, delivered, classification) values(?,?,?,?,?)";
-        PreparedStatement ps = conn.prepareStatement(sql);
+        PreparedStatement ps = null;
+        try
+        {
+	        ps = conn.prepareStatement(sql);
         
-        ps.setString(1, uid.toString());
-        ps.setString(2, message.getType().toString());
-        try {
-            String messageString = Base64.encodeObject(Util.serialize(message));
-            ps.setString(3, messageString);
-        } catch (Exception e) {
-            throw new MessageStoreException(e);
+	        ps.setString(1, uid.toString());
+	        ps.setString(2, message.getType().toString());
+	        try {
+	            String messageString = Base64.encodeObject(Util.serialize(message));
+	            ps.setString(3, messageString);
+	        } catch (Exception e) {
+	            throw new MessageStoreException(e);
+	        }
+	        ps.setString(4, "TRUE");
+	        ps.setString(5, classification);
+	        ps.execute();
         }
-        ps.setString(4, "TRUE");
-        ps.setString(5, classification);
-        ps.execute();
-        ps.close();
+        finally
+        {
+	        if ( ps != null ) ps.close();
+        }
     }
     
 

Modified: labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/soa/esb/actions/MessagePersister.java
===================================================================
--- labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/soa/esb/actions/MessagePersister.java	2007-10-10 07:03:21 UTC (rev 15711)
+++ labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/soa/esb/actions/MessagePersister.java	2007-10-10 07:39:44 UTC (rev 15712)
@@ -42,16 +42,19 @@
 {
     public final static String MESSAGE_STORE_CLASS = "message-store-class";
     public final static String CLASSIFICATION_ATTR = "classification";
+    public final static String IGNORE_MSG_CLASSIFICATION_ATTR = "ignore-message-classification";
     
     
     protected ConfigTree config;
     protected MessageStore messageStore;
     protected String classification;
 	private Logger log = Logger.getLogger(this.getClass());
+	private boolean ignoreMsgClassification;
 
 	public MessagePersister(ConfigTree config) throws ConfigurationException
 	{
         this.config = config;
+        log.debug(config);
 	}
 	/** 
      * Persists the message to the MessageStore
@@ -61,7 +64,7 @@
         String classification = this.classification;
         try {
             //the message can override the classification
-            if (message.getProperties().getProperty(MessageStore.CLASSIFICATION)!=null) {
+            if (ignoreMsgClassification != true && message.getProperties().getProperty(MessageStore.CLASSIFICATION)!=null) {
                 classification = String.valueOf(message.getProperties().getProperty(MessageStore.CLASSIFICATION));
                 message.getProperties().remove(MessageStore.CLASSIFICATION);
             }
@@ -82,9 +85,10 @@
         if (classificationValue!=null) {
             classification = classificationValue;
         }
+        ignoreMsgClassification = Boolean.parseBoolean(config.getAttribute( IGNORE_MSG_CLASSIFICATION_ATTR, "false") );
         if (log.isDebugEnabled()) {
             log.debug("MessagePersister started with classification=" + classification 
-                + " and message-store-class=" + messageStore);
+                + " and message-store-class=" + messageStore + ", " + IGNORE_MSG_CLASSIFICATION_ATTR + "=" + ignoreMsgClassification);
         }
         messageStore = MessageStoreFactory.getInstance().getMessageStore(messageStoreClass);
     }
@@ -133,10 +137,11 @@
         if (message.getProperties().getProperty(MessageStore.MESSAGE_URI)!=null) {
             URI uid = (URI) message.getProperties().getProperty(MessageStore.MESSAGE_URI);
             try {
-                if (message.getProperties().getProperty(MessageStore.CLASSIFICATION)!=null) {
+                if (ignoreMsgClassification != true && message.getProperties().getProperty(MessageStore.CLASSIFICATION)!=null) {
                     classification = String.valueOf(message.getProperties().getProperty(MessageStore.CLASSIFICATION));
                 }
                 //the message can override the classification
+                log.debug("Will remove uid: " + uid + ", classification: " + classification);
                 messageStore.removeMessage(uid, classification);
             } catch (MessageStoreException mse) {
                 log.error("Could obtain messages.", mse);

Modified: labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/soa/esb/actions/MessageRedeliverer.java
===================================================================
--- labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/soa/esb/actions/MessageRedeliverer.java	2007-10-10 07:03:21 UTC (rev 15711)
+++ labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/soa/esb/actions/MessageRedeliverer.java	2007-10-10 07:39:44 UTC (rev 15712)
@@ -63,15 +63,18 @@
     
     public Message process(Message message) throws ActionProcessingException
     {
-        classification = MessageStore.CLASSIFICATION_RDLVR;
+        String classification = MessageStore.CLASSIFICATION_RDLVR;
         try {
             //the message can override the classification
             if (message.getProperties().getProperty(MessageStore.CLASSIFICATION)!=null) {
                 classification = String.valueOf(message.getProperties().getProperty(MessageStore.CLASSIFICATION));
             }
             Map<URI, Message> messageMap = messageStore.getAllMessages(classification);
+            int counter = 0;
             for (URI uid : messageMap.keySet()) {
-                redeliverStore.redeliver(uid);
+            	counter++;
+            	log.debug( counter  + "Going to redeliver uid : "+ uid );
+                redeliverStore.redeliver(uid,classification);
             }
         } catch (MessageStoreException mse) {
             log.error("Could not obtain messages.", mse);

Modified: labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/soa/esb/schedule/RedeliverEventMessageComposer.java
===================================================================
--- labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/soa/esb/schedule/RedeliverEventMessageComposer.java	2007-10-10 07:03:21 UTC (rev 15711)
+++ labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/main/java/org/jboss/soa/esb/schedule/RedeliverEventMessageComposer.java	2007-10-10 07:39:44 UTC (rev 15712)
@@ -16,6 +16,7 @@
  */
 public class RedeliverEventMessageComposer implements ScheduledEventMessageComposer
 {
+	private String classification = MessageStore.CLASSIFICATION_RDLVR;
 
     /* (non-Javadoc)
      * @see org.jboss.soa.esb.listeners.ScheduledEventMessageComposer#composeMessage()
@@ -23,7 +24,7 @@
     public Message composeMessage() throws SchedulingException 
     {
         Message message = org.jboss.soa.esb.message.format.MessageFactory.getInstance().getMessage();
-        message.getProperties().setProperty(MessageStore.CLASSIFICATION, MessageStore.CLASSIFICATION_RDLVR);
+        message.getProperties().setProperty(MessageStore.CLASSIFICATION, classification);
         return message;
     }
 
@@ -31,15 +32,13 @@
      * @see org.jboss.soa.esb.listeners.ScheduledEventMessageComposer#initialize(org.jboss.soa.esb.helpers.ConfigTree)
      */
     public void initialize(ConfigTree config) throws ConfigurationException {
-        // TODO Auto-generated method stub
-        
+       classification = config.getAttribute( MessageStore.CLASSIFICATION, MessageStore.CLASSIFICATION_RDLVR );
     }
 
     /* (non-Javadoc)
      * @see org.jboss.soa.esb.listeners.ScheduledEventMessageComposer#onProcessingComplete(org.jboss.soa.esb.message.Message)
      */
     public Message onProcessingComplete(Message message) throws SchedulingException {
-        // TODO Auto-generated method stub
         return null;
     }
 
@@ -47,8 +46,11 @@
      * @see org.jboss.soa.esb.listeners.ScheduledEventMessageComposer#uninitialize()
      */
     public void uninitialize() {
-        // TODO Auto-generated method stub
-        
     }
 
+	public String getClassification()
+	{
+		return classification;
+	}
+
 }

Added: labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/test/java/org/jboss/soa/esb/schedule/RedeliverEventMessageComposerUnitTest.java
===================================================================
--- labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/test/java/org/jboss/soa/esb/schedule/RedeliverEventMessageComposerUnitTest.java	                        (rev 0)
+++ labs/jbossesb/workspace/dbevenius/redeliver/product/services/jbossesb/src/test/java/org/jboss/soa/esb/schedule/RedeliverEventMessageComposerUnitTest.java	2007-10-10 07:39:44 UTC (rev 15712)
@@ -0,0 +1,80 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2006, JBoss Inc., and
+ * individual contributors as indicated by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+package org.jboss.soa.esb.schedule;
+
+import static org.jboss.soa.esb.services.persistence.MessageStore.CLASSIFICATION;
+import static org.jboss.soa.esb.services.persistence.MessageStore.CLASSIFICATION_RDLVR;
+import static org.junit.Assert.assertEquals;
+import junit.framework.JUnit4TestAdapter;
+
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.message.Message;
+import org.junit.Test;
+
+/**
+ * 
+ * @author <a href="mailto:daniel.bevenius at gmail.com">Daniel Bevenius</a>				
+ *
+ */
+public class RedeliverEventMessageComposerUnitTest
+{
+	private RedeliverEventMessageComposer composer = new RedeliverEventMessageComposer();
+	private final String customClassification = "test-classification";
+	
+	@Test
+	public void classification_parameter() throws ConfigurationException
+	{
+		composer.initialize( createConfigTree( customClassification ) );
+		
+		assertEquals( customClassification, composer.getClassification() );
+	}
+	
+	@Test
+	public void composeMessage_default_classification() throws ConfigurationException, SchedulingException
+	{
+		composer.initialize( new ConfigTree("empty") );
+		Message message = composer.composeMessage();
+		assertEquals( CLASSIFICATION_RDLVR, message.getProperties().getProperty( CLASSIFICATION ) );
+	}
+
+	@Test
+	public void composeMessage_custom_classification() throws ConfigurationException, SchedulingException
+	{
+		composer.initialize( createConfigTree( customClassification ) );
+		Message message = composer.composeMessage();
+		assertEquals( customClassification, message.getProperties().getProperty( CLASSIFICATION ) );
+	}
+	
+	private ConfigTree createConfigTree( final String customClassification )
+	{
+		ConfigTree config = new ConfigTree("junit");
+		config.setAttribute( CLASSIFICATION,  customClassification );
+		return config;
+	}
+	
+	public static junit.framework.Test suite()
+	{
+		return new JUnit4TestAdapter(RedeliverEventMessageComposerUnitTest.class);
+	}
+
+}




More information about the jboss-svn-commits mailing list