[jboss-cvs] JBoss Messaging SVN: r2890 - in trunk: src/main/org/jboss/messaging/core/impl/postoffice and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jul 13 09:17:55 EDT 2007
Author: timfox
Date: 2007-07-13 09:17:55 -0400 (Fri, 13 Jul 2007)
New Revision: 2890
Modified:
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java
Log:
Few more fixes
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-07-13 07:18:19 UTC (rev 2889)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-07-13 13:17:55 UTC (rev 2890)
@@ -171,6 +171,8 @@
private LinkedQueue toDeliver = new LinkedQueue();
+ private boolean waitingToClose = false;
+
// Constructors ---------------------------------------------------------------------------------
ServerSessionEndpoint(String sessionID, ServerConnectionEndpoint connectionEndpoint) throws Exception
@@ -894,7 +896,10 @@
{
synchronized (deliveryLock)
{
- deliveryLock.notifyAll();
+ if (waitingToClose)
+ {
+ deliveryLock.notifyAll();
+ }
}
}
}
@@ -923,93 +928,189 @@
boolean delivered = false;
- //Note there will only be contention on this if two or more responses come back at the same time - which is unlikely
- //Is it even possible? If the responses come back on the same JGroups channel - surely this can't happen - maybe
- //we can remove the lock?
- //Anyway there is little overhead if the lock is not contended
- synchronized (myLock)
+ //I have commented this out since we should be able guarantee responses come back in order if we use
+ //a QueuedExecutor on the other node to send the response
+
+// //Note there will only be contention on this if two or more responses come back at the same time - which is unlikely
+// //TODO - This can occur since replicates are sent to the other node, and the responses are sent back using a pool which
+// //means earlier responses can be received after later ones -hence we need to cope with this
+// //However - if we used a queued executor on the other node to send back responses we could remove all this locking!!
+// synchronized (myLock)
+// {
+// long toWait = DELIVERY_WAIT_TIMEOUT;
+//
+// while (toWait > 0)
+// {
+// DeliveryRecord dr = (DeliveryRecord)toDeliver.peek();
+//
+// if (dr == null)
+// {
+// if (trace) { log.trace("No more deliveries in list"); }
+//
+// break;
+// }
+//
+// if (trace) { log.trace("Peeked delivery record: " + dr.deliveryID); }
+//
+// boolean wait = false;
+//
+// //Needs to be synchronized to prevent delivery occurring twice e.g. if this occurs at same time as collectDeliveries
+// synchronized (dr)
+// {
+// boolean performDelivery = false;
+//
+// if (dr.waitingForResponse)
+// {
+// if (dr == rec)
+// {
+// if (trace) { log.trace("Found our delivery"); }
+//
+// performDelivery = true;
+// }
+// else
+// {
+// if (!delivered)
+// {
+// //We have to wait for another response to arrive first
+//
+// if (trace) { log.trace("Not ours - need to wait"); }
+//
+// wait = true;
+// }
+// else
+// {
+// //We have delivered ours and possibly any non replicated deliveries too
+//
+// myLock.notify();
+//
+// break;
+// }
+// }
+// }
+// else
+// {
+// //Non replicated delivery
+//
+// if (trace) { log.trace("Non replicated delivery"); }
+//
+// performDelivery = true;
+// }
+//
+// if (performDelivery)
+// {
+// toDeliver.take();
+//
+// performDelivery(dr.del.getReference(), dr.deliveryID, dr.getConsumer());
+//
+// delivered = true;
+//
+// dr.waitingForResponse = false;
+//
+// delivered = true;
+// }
+// }
+//
+// if (wait)
+// {
+// long start = System.currentTimeMillis();
+//
+// try
+// {
+// if (trace) { log.trace("Waiting"); }
+//
+// //We need to wait since responses have come back out of order
+// myLock.wait(toWait);
+//
+// if (trace) { log.trace("Woke up"); }
+// }
+// catch (InterruptedException e)
+// {
+// }
+// toWait -= (System.currentTimeMillis() - start);
+// }
+// }
+// if (toWait <= 0)
+// {
+// throw new IllegalStateException("Timed out waiting for previous response to arrive");
+// }
+// }
+
+ while (true)
{
- long toWait = DELIVERY_WAIT_TIMEOUT;
+ DeliveryRecord dr = (DeliveryRecord)toDeliver.peek();
+
+ if (dr == null)
+ {
+ if (trace) { log.trace("No more deliveries in list"); }
+
+ break;
+ }
- while (toWait > 0)
- {
- DeliveryRecord dr = (DeliveryRecord)toDeliver.peek();
-
- if (dr == null)
- {
- //Response came back after deliveries collected? - Do nothing
- break;
- }
-
- boolean wait = false;
-
- //Needs to be synchronized to prevent delivery occurring twice e.g. if this occurs at same time as collectDeliveries
- synchronized (dr)
- {
- boolean performDelivery = false;
-
- if (dr.waitingForResponse)
- {
- if (dr == rec)
- {
- performDelivery = true;
- }
- else
- {
+ if (trace) { log.trace("Peeked delivery record: " + dr.deliveryID); }
+
+ //Needs to be synchronized to prevent delivery occurring twice e.g. if this occurs at same time as collectDeliveries
+ synchronized (dr)
+ {
+ boolean performDelivery = false;
+
+ if (dr.waitingForResponse)
+ {
+ if (dr == rec)
+ {
+ if (trace) { log.trace("Found our delivery"); }
+
+ performDelivery = true;
+ }
+ else
+ {
+ if (!delivered)
+ {
//We have to wait for another response to arrive first
- wait = true;
- }
- }
- else
- {
- //Non replicated delivery
- performDelivery = true;
- }
-
- if (performDelivery)
- {
- toDeliver.take();
-
- performDelivery(dr.del.getReference(), dr.deliveryID, dr.getConsumer());
-
- delivered = true;
-
- dr.waitingForResponse = false;
-
- myLock.notify();
-
- break;
- }
- }
-
- if (wait)
- {
- long start = System.currentTimeMillis();
-
- try
- {
- //We need to wait since responses have come back out of order
- myLock.wait(toWait);
- }
- catch (InterruptedException e)
- {
- }
- toWait -= (System.currentTimeMillis() - start);
- }
- }
- if (toWait <= 0)
- {
- throw new IllegalStateException("Timed out waiting for previous response to arrive");
- }
- }
+
+ throw new IllegalStateException("Reponses have come back our of order");
+ }
+ else
+ {
+ //We have delivered ours and possibly any non replicated deliveries too
+
+ break;
+ }
+ }
+ }
+ else
+ {
+ //Non replicated delivery
+
+ if (trace) { log.trace("Non replicated delivery"); }
+
+ performDelivery = true;
+ }
+
+ if (performDelivery)
+ {
+ toDeliver.take();
+
+ performDelivery(dr.del.getReference(), dr.deliveryID, dr.getConsumer());
+
+ delivered = true;
+
+ dr.waitingForResponse = false;
+
+ delivered = true;
+ }
+ }
+ }
if (delivered)
{
synchronized (deliveryLock)
{
- deliveryLock.notifyAll();
+ if (waitingToClose)
+ {
+ deliveryLock.notifyAll();
+ }
}
- }
+ }
}
// Package protected ----------------------------------------------------------------------------
@@ -1203,6 +1304,7 @@
if (wait)
{
+ waitingToClose = true;
try
{
deliveryLock.wait(toWait);
@@ -1222,7 +1324,8 @@
while (toDeliver.poll(0) != null) {}
log.warn("Timed out waiting for response to arrive");
- }
+ }
+ waitingToClose = false;
}
}
@@ -1236,6 +1339,8 @@
deliveryId = deliveryIdSequence.increment();
+ if (trace) { log.trace("Delivery id is now " + deliveryId); }
+
//TODO can't we combine flags isRetainDeliveries and isReplicating - surely they're mutually exclusive?
if (consumer.isRetainDeliveries())
{
@@ -1254,6 +1359,8 @@
{
//This basically just releases the memory reference
+ if (trace) { log.trace("Acknowledging delivery now"); }
+
delivery.acknowledge(null);
}
catch (Throwable t)
@@ -1275,6 +1382,8 @@
{
if (!toDeliver.isEmpty())
{
+ if (trace) { log.trace("Message is unreliable and there are refs in the toDeliver so adding to list"); }
+
//We need to add to the list to prevent non persistent messages overtaking persistent messages from the same
//producer in flight (since np don't need to be replicated)
toDeliver.put(rec);
@@ -1290,6 +1399,8 @@
}
else
{
+ if (trace) { log.trace("Message is unreliable, but no deliveries in list so performing delivery now"); }
+
// Actually do the delivery now
performDelivery(delivery.getReference(), deliveryId, consumer);
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-07-13 07:18:19 UTC (rev 2889)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-07-13 13:17:55 UTC (rev 2890)
@@ -83,6 +83,7 @@
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock;
@@ -214,7 +215,8 @@
//use it
private ServerPeer serverPeer;
- private PooledExecutor replyExecutor;
+ //Note this MUST be a queued executor to ensure replicate repsonses arrive back in order
+ private QueuedExecutor replyExecutor;
private volatile int failoverNodeID = -1;
@@ -1579,7 +1581,8 @@
leftSet = new ConcurrentHashSet();
}
- replyExecutor = new PooledExecutor(new LinkedQueue(), 10);
+ //NOTE, MUST be a QueuedExecutor so we ensure that responses arrive back in order
+ replyExecutor = new QueuedExecutor(new LinkedQueue());
}
private void deInit()
@@ -3010,14 +3013,19 @@
public void afterCommit(boolean onePhase) throws Exception
{
- if (nodeID == null)
- {
- multicastRequest(request);
- }
- else
- {
- unicastRequest(request, nodeID.intValue());
- }
+// if (nodeID == null)
+// {
+// multicastRequest(request);
+// }
+// else
+// {
+// unicastRequest(request, nodeID.intValue());
+// }
+
+ //For now we always multicast otherwise there is the possibility that messages send unicast arrive in a different order
+ //to messages send multicast
+ //We might be able to fix this using anycast
+ multicastRequest(request);
}
public void afterPrepare() throws Exception
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java 2007-07-13 07:18:19 UTC (rev 2889)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java 2007-07-13 13:17:55 UTC (rev 2890)
@@ -327,11 +327,13 @@
do
{
- tm = (TextMessage)cons2.receive(1000);
+ tm = (TextMessage)cons2.receive(5000);
if (tm != null)
{
msgs.add(tm.getText());
+
+ log.info("Got message " + tm.getText());
}
}
while (tm != null);
More information about the jboss-cvs-commits
mailing list