[jboss-svn-commits] JBL Code SVN: r21658 - labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Aug 21 08:12:00 EDT 2008
Author: tfennelly
Date: 2008-08-21 08:11:59 -0400 (Thu, 21 Aug 2008)
New Revision: 21658
Modified:
labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java
Log:
https://jira.jboss.org/jira/browse/JBESB-1944
Changed delivery so that the delivering thread returns to a wait state while the delivered message is still in the queue.
Modified: labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java 2008-08-21 10:45:19 UTC (rev 21657)
+++ labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java 2008-08-21 12:11:59 UTC (rev 21658)
@@ -155,7 +155,9 @@
{
synchronized (messageQueue) {
- if (!addMessageToQueue(message)) {
+ Object addedObject = addMessageToQueue(message);
+
+ if (addedObject == null) {
return false;
}
@@ -163,11 +165,24 @@
messageQueue.notify();
if (deliveryTimeout > 0) {
- try {
- // Wait on notification from the pickup thread...
- messageQueue.wait(deliveryTimeout);
- } catch (InterruptedException e) {
- logger.warn("Timeout expired while waiting on message pickup on InVM queue '" + epr.getAddr().getAddress() + "'.", e);
+ long start = System.currentTimeMillis();
+
+ // Continue to wait until the "addedObject" has been removed from the queue,
+ // or the delivery timeout expires...
+ while(messageQueue.contains(addedObject)) {
+ try {
+ // Wait on notification from the pickup thread....
+ messageQueue.wait(5); // Yes, it's a "magic" number, but we don't need to configure it or make a one-off constant!!!
+ } catch (InterruptedException e) {
+ logger.warn("Waiting delivery thread interupted while waiting on message pickup on InVM queue '" + epr.getAddr().getAddress() + "'. Exiting pickup wait state.");
+ break;
+ }
+
+ // If the delivery timeout has expired...
+ if(System.currentTimeMillis() > start + deliveryTimeout) {
+ logger.warn("Timeout expired while waiting on message pickup on InVM queue '" + epr.getAddr().getAddress() + "'.");
+ break;
+ }
}
}
}
@@ -183,19 +198,23 @@
}
}
- private boolean addMessageToQueue(Message message) {
+ private Object addMessageToQueue(Message message) {
+ Object addedObject;
+
if (passByValue) {
try {
- messageQueue.add(MessageSerializer.serialize(message));
+ addedObject = MessageSerializer.serialize(message);
+ messageQueue.add(addedObject);
} catch (IOException ex) {
logger.warn("Could not serialize message to pass by value.", ex);
return false;
}
} else {
+ addedObject = message;
messageQueue.add(message);
}
- return true;
+ return addedObject;
}
/**
@@ -319,7 +338,9 @@
{
synchronized (messageQueue)
{
- if (!addMessageToQueue(message)) {
+ Object addedObject = addMessageToQueue(message);
+
+ if (addedObject == null) {
return false;
}
@@ -345,7 +366,7 @@
public boolean doRedeliver (Message message)
{
synchronized (messageQueue) {
- messageQueue.add(message);
+ messageQueue.add(message);
}
return true;
More information about the jboss-svn-commits
mailing list