[jboss-svn-commits] JBL Code SVN: r22715 - labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Fri Sep 12 07:26:38 EDT 2008


Author: mark.little at jboss.com
Date: 2008-09-12 07:26:38 -0400 (Fri, 12 Sep 2008)
New Revision: 22715

Modified:
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java
Log:
https://jira.jboss.org/jira/browse/JBESB-1941

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java	2008-09-12 10:55:20 UTC (rev 22714)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java	2008-09-12 11:26:38 UTC (rev 22715)
@@ -153,21 +153,45 @@
             }
             else
             {
-                synchronized (messageQueue) {
-
-                    if (!addMessageToQueue(message)) {
+                Object addedObject;
+                
+                if (passByValue) {
+                    try {
+                        addedObject = MessageSerializer.serialize(message);
+                    } catch (IOException ex) {
+                        logger.warn("Could not serialize message to pass by value.", ex);
                         return false;
                     }
+                } else {
+                    addedObject = message;
+                }
+                
+                synchronized (messageQueue) {
 
+                    messageQueue.add(addedObject);
+
                     // Notify 1 waiting pickup thread of the delivery...
                     messageQueue.notify();
 
                     if (deliveryTimeout > 0) {
-                        try {
-                            // Wait on notification from the pickup thread...
-                            messageQueue.wait(deliveryTimeout);
-                        } catch (InterruptedException e) {
-                            logger.warn("Timeout expired while waiting on message pickup on InVM queue '" + epr.getAddr().getAddress() + "'.", e);
+                        long start = System.currentTimeMillis();
+
+                        // Continue to wait until the "addedObject" has been removed from the queue,
+                        // or the delivery timeout expires...
+                        while(messageQueue.contains(addedObject)) {
+                            try {
+                                // Wait on notification from the pickup thread....
+                                messageQueue.wait(5); // Yes, it's a "magic" number, but we don't need to configure it or make a one-off constant!!!
+                            } catch (InterruptedException e) {
+                                logger.warn("Waiting delivery thread interupted while waiting on message pickup on InVM queue '" + epr.getAddr().getAddress() + "'.  Exiting pickup wait state.");
+                                break;
+                            }
+
+                            // If the delivery timeout has expired...
+                            if(System.currentTimeMillis() > start + deliveryTimeout) {
+                                logger.warn("Timeout expired while waiting on message pickup on InVM queue '" + epr.getAddr().getAddress() + "'.");
+                                break;
+                            }
                         }
                     }
                 }
@@ -183,19 +207,23 @@
         }
     }
 
-    private boolean addMessageToQueue(Message message) {
+    private Object addMessageToQueue(Message message) {
+        Object addedObject;
+
         if (passByValue) {
             try {
-                messageQueue.add(MessageSerializer.serialize(message));
+                addedObject = MessageSerializer.serialize(message);
+                messageQueue.add(addedObject);
             } catch (IOException ex) {
                 logger.warn("Could not serialize message to pass by value.", ex);
                 return false;
             }
         } else {
+            addedObject = message;
             messageQueue.add(message);
         }
 
-        return true;
+        return addedObject;
     }
 
     /**
@@ -319,7 +347,9 @@
     {
         synchronized (messageQueue)
         {
-            if (!addMessageToQueue(message)) {
+            Object addedObject = addMessageToQueue(message);
+
+            if (addedObject == null) {
                 return false;
             }
 
@@ -345,7 +375,7 @@
     public boolean doRedeliver (Message message)
     {
         synchronized (messageQueue) {
-                messageQueue.add(message);
+            messageQueue.add(message);
         }
         
         return true;




More information about the jboss-svn-commits mailing list