[jboss-svn-commits] JBL Code SVN: r21658 - labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Aug 21 08:12:00 EDT 2008


Author: tfennelly
Date: 2008-08-21 08:11:59 -0400 (Thu, 21 Aug 2008)
New Revision: 21658

Modified:
   labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java
Log:
https://jira.jboss.org/jira/browse/JBESB-1944
Changed delivery to return to a wait state while delivered message is still in the queue.

Modified: labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java	2008-08-21 10:45:19 UTC (rev 21657)
+++ labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java	2008-08-21 12:11:59 UTC (rev 21658)
@@ -155,7 +155,9 @@
             {
                 synchronized (messageQueue) {
 
-                    if (!addMessageToQueue(message)) {
+                    Object addedObject = addMessageToQueue(message);
+
+                    if (addedObject == null) {
                         return false;
                     }
 
@@ -163,11 +165,24 @@
                     messageQueue.notify();
 
                     if (deliveryTimeout > 0) {
-                        try {
-                            // Wait on notification from the pickup thread...
-                            messageQueue.wait(deliveryTimeout);
-                        } catch (InterruptedException e) {
-                            logger.warn("Timeout expired while waiting on message pickup on InVM queue '" + epr.getAddr().getAddress() + "'.", e);
+                        long start = System.currentTimeMillis();
+
+                        // Continue to wait until the "addedObject" has been removed from the queue,
+                        // or the delivery timeout expires...
+                        while(messageQueue.contains(addedObject)) {
+                            try {
+                                // Wait on notification from the pickup thread....
+                                messageQueue.wait(5); // Yes, it's a "magic" number, but we don't need to configure it or make a one-off constant!!!
+                            } catch (InterruptedException e) {
+                                logger.warn("Waiting delivery thread interupted while waiting on message pickup on InVM queue '" + epr.getAddr().getAddress() + "'.  Exiting pickup wait state.");
+                                break;
+                            }
+
+                            // If the delivery timeout has expired...
+                            if(System.currentTimeMillis() > start + deliveryTimeout) {
+                                logger.warn("Timeout expired while waiting on message pickup on InVM queue '" + epr.getAddr().getAddress() + "'.");
+                                break;
+                            }
                         }
                     }
                 }
@@ -183,19 +198,23 @@
         }
     }
 
-    private boolean addMessageToQueue(Message message) {
+    private Object addMessageToQueue(Message message) {
+        Object addedObject;
+
         if (passByValue) {
             try {
-                messageQueue.add(MessageSerializer.serialize(message));
+                addedObject = MessageSerializer.serialize(message);
+                messageQueue.add(addedObject);
             } catch (IOException ex) {
                 logger.warn("Could not serialize message to pass by value.", ex);
                 return false;
             }
         } else {
+            addedObject = message;
             messageQueue.add(message);
         }
 
-        return true;
+        return addedObject;
     }
 
     /**
@@ -319,7 +338,9 @@
     {
         synchronized (messageQueue)
         {
-            if (!addMessageToQueue(message)) {
+            Object addedObject = addMessageToQueue(message);
+
+            if (addedObject == null) {
                 return false;
             }
 
@@ -345,7 +366,7 @@
     public boolean doRedeliver (Message message)
     {
         synchronized (messageQueue) {
-                messageQueue.add(message);
+            messageQueue.add(message);
         }
         
         return true;




More information about the jboss-svn-commits mailing list