[jboss-svn-commits] JBL Code SVN: r22715 - labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Fri Sep 12 07:26:38 EDT 2008
Author: mark.little at jboss.com
Date: 2008-09-12 07:26:38 -0400 (Fri, 12 Sep 2008)
New Revision: 22715
Modified:
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java
Log:
https://jira.jboss.org/jira/browse/JBESB-1941
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java 2008-09-12 10:55:20 UTC (rev 22714)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java 2008-09-12 11:26:38 UTC (rev 22715)
@@ -153,21 +153,45 @@
}
else
{
- synchronized (messageQueue) {
-
- if (!addMessageToQueue(message)) {
+ Object addedObject;
+
+ if (passByValue) {
+ try {
+ addedObject = MessageSerializer.serialize(message);
+ } catch (IOException ex) {
+ logger.warn("Could not serialize message to pass by value.", ex);
return false;
}
+ } else {
+ addedObject = message;
+ }
+
+ synchronized (messageQueue) {
+ messageQueue.add(addedObject);
+
// Notify 1 waiting pickup thread of the delivery...
messageQueue.notify();
if (deliveryTimeout > 0) {
- try {
- // Wait on notification from the pickup thread...
- messageQueue.wait(deliveryTimeout);
- } catch (InterruptedException e) {
- logger.warn("Timeout expired while waiting on message pickup on InVM queue '" + epr.getAddr().getAddress() + "'.", e);
+ long start = System.currentTimeMillis();
+
+ // Continue to wait until the "addedObject" has been removed from the queue,
+ // or the delivery timeout expires...
+ while(messageQueue.contains(addedObject)) {
+ try {
+ // Wait on notification from the pickup thread....
+ messageQueue.wait(5); // Yes, it's a "magic" number, but we don't need to configure it or make a one-off constant!!!
+ } catch (InterruptedException e) {
+ logger.warn("Waiting delivery thread interupted while waiting on message pickup on InVM queue '" + epr.getAddr().getAddress() + "'. Exiting pickup wait state.");
+ break;
+ }
+
+ // If the delivery timeout has expired...
+ if(System.currentTimeMillis() > start + deliveryTimeout) {
+ logger.warn("Timeout expired while waiting on message pickup on InVM queue '" + epr.getAddr().getAddress() + "'.");
+ break;
+ }
}
}
}
@@ -183,19 +207,23 @@
}
}
- private boolean addMessageToQueue(Message message) {
+ private Object addMessageToQueue(Message message) {
+ Object addedObject;
+
if (passByValue) {
try {
- messageQueue.add(MessageSerializer.serialize(message));
+ addedObject = MessageSerializer.serialize(message);
+ messageQueue.add(addedObject);
} catch (IOException ex) {
logger.warn("Could not serialize message to pass by value.", ex);
return false;
}
} else {
+ addedObject = message;
messageQueue.add(message);
}
- return true;
+ return addedObject;
}
/**
@@ -319,7 +347,9 @@
{
synchronized (messageQueue)
{
- if (!addMessageToQueue(message)) {
+ Object addedObject = addMessageToQueue(message);
+
+ if (addedObject == null) {
return false;
}
@@ -345,7 +375,7 @@
public boolean doRedeliver (Message message)
{
synchronized (messageQueue) {
- messageQueue.add(message);
+ messageQueue.add(message);
}
return true;
More information about the jboss-svn-commits
mailing list