[jboss-cvs] JBoss Messaging SVN: r2890 - in trunk: src/main/org/jboss/messaging/core/impl/postoffice and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jul 13 09:17:55 EDT 2007


Author: timfox
Date: 2007-07-13 09:17:55 -0400 (Fri, 13 Jul 2007)
New Revision: 2890

Modified:
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java
Log:
Few more fixes


Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-07-13 07:18:19 UTC (rev 2889)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-07-13 13:17:55 UTC (rev 2890)
@@ -171,6 +171,8 @@
    
    private LinkedQueue toDeliver = new LinkedQueue();
    
+   private boolean waitingToClose = false;
+   
    // Constructors ---------------------------------------------------------------------------------
 
    ServerSessionEndpoint(String sessionID, ServerConnectionEndpoint connectionEndpoint) throws Exception
@@ -894,7 +896,10 @@
 	   			{
 	   		   	synchronized (deliveryLock)
 	   		   	{
-	   		   		deliveryLock.notifyAll();
+	   		   		if (waitingToClose)
+	   		   		{
+	   		   			deliveryLock.notifyAll();
+	   		   		}
 	   		   	}   					   			
 	   			}
 	   		}
@@ -923,93 +928,189 @@
    	   	
    	boolean delivered = false;
    	
-   	//Note there will only be contention on this if two or more responses come back at the same time - which is unlikely
-   	//Is it even possible? If the responses come back on the same JGroups channel - surely this can't happen - maybe
-   	//we can remove the lock?
-   	//Anyway there is little overhead if the lock is not contended
-   	synchronized (myLock)
+   	//I have commented this out since we should be able to guarantee responses come back in order if we use
+   	//a QueuedExecutor on the other node to send the response
+   	
+//   	//Note there will only be contention on this if two or more responses come back at the same time - which is unlikely
+//   	//TODO - This can occur since replicates are sent to the other node, and the responses are sent back using a pool which
+//   	//means earlier responses can be received after later ones -hence we need to cope with this
+//   	//However - if we used a queued executor on the other node to send back responses we could remove all this locking!!
+//   	synchronized (myLock)
+//   	{
+//   		long toWait = DELIVERY_WAIT_TIMEOUT;
+//   		
+//   		while (toWait > 0)
+//      	{
+//      		DeliveryRecord dr = (DeliveryRecord)toDeliver.peek();
+//      		      	      		
+//      		if (dr == null)
+//      		{
+//      			if (trace) { log.trace("No more deliveries in list"); }
+//      			
+//      			break;
+//      		}
+//      		
+//      		if (trace) { log.trace("Peeked delivery record: " + dr.deliveryID); }
+//      		
+//      		boolean wait = false;
+//      		
+//      		//Needs to be synchronized to prevent delivery occurring twice e.g. if this occurs at same time as collectDeliveries
+//      		synchronized (dr)
+//      		{	   		
+//   	   		boolean performDelivery = false;
+//   	   		
+//   	   		if (dr.waitingForResponse)
+//   	   		{
+//   	   			if (dr == rec)
+//   	   			{
+//   	   				if (trace) { log.trace("Found our delivery"); }
+//   	   				
+//   	   				performDelivery = true;
+//   	   			}
+//   	   			else
+//   	   			{
+//   	   				if (!delivered)
+//   	   				{
+//	   	   				//We have to wait for another response to arrive first
+//	   	   				
+//	   	   				if (trace) { log.trace("Not ours - need to wait"); }
+//	   	   				
+//	   	   				wait = true;
+//   	   				}
+//   	   				else
+//   	   				{
+//   	   					//We have delivered ours and possibly any non replicated deliveries too   	   					   	   					
+//   	   	   	   	
+//   	   	   	   	myLock.notify();
+//   	   	   	   	
+//   	   					break;
+//   	   				}
+//   	   			}
+//   	   		}
+//   	   		else
+//   	   		{
+//   	   			//Non replicated delivery
+//   	   			
+//   	   			if (trace) { log.trace("Non replicated delivery"); }
+//   	   			
+//   	   			performDelivery = true;
+//   	   		}
+//   	   		
+//   	   		if (performDelivery)
+//   	   		{
+//   	   			toDeliver.take();
+//   	   			
+//   	   			performDelivery(dr.del.getReference(), dr.deliveryID, dr.getConsumer()); 
+//   	   			
+//   	   			delivered = true;
+//   	   	   	
+//   	   	   	dr.waitingForResponse = false;
+//   	   	   	
+//   	   	   	delivered = true;
+//   	   		}
+//      		}
+//      		
+//      		if (wait)
+//      		{
+//   				long start = System.currentTimeMillis();
+//   				
+//      			try
+//      			{
+//      				if (trace) { log.trace("Waiting"); }
+//      				
+//      				//We need to wait since responses have come back out of order
+//      				myLock.wait(toWait);
+//      				
+//      				if (trace) { log.trace("Woke up"); }
+//      			}
+//      			catch (InterruptedException e)
+//      			{      				
+//      			}
+//      			toWait -= (System.currentTimeMillis() - start);
+//      		}      		
+//      	}
+//   		if (toWait <= 0)
+//   		{
+//   			throw new IllegalStateException("Timed out waiting for previous response to arrive");
+//   		}
+//   	}   	   	
+   	
+		while (true)
    	{
-   		long toWait = DELIVERY_WAIT_TIMEOUT;
+   		DeliveryRecord dr = (DeliveryRecord)toDeliver.peek();
+   		      	      		
+   		if (dr == null)
+   		{
+   			if (trace) { log.trace("No more deliveries in list"); }
+   			
+   			break;
+   		}
    		
-   		while (toWait > 0)
-      	{
-      		DeliveryRecord dr = (DeliveryRecord)toDeliver.peek();
-      		
-      		if (dr == null)
-      		{
-      			//Response came back after deliveries collected? - Do nothing
-      			break;
-      		}
-      		
-      		boolean wait = false;
-      		
-      		//Needs to be synchronized to prevent delivery occurring twice e.g. if this occurs at same time as collectDeliveries
-      		synchronized (dr)
-      		{	   		
-   	   		boolean performDelivery = false;
-   	   		
-   	   		if (dr.waitingForResponse)
-   	   		{
-   	   			if (dr == rec)
-   	   			{
-   	   				performDelivery = true;
-   	   			}
-   	   			else
-   	   			{
+   		if (trace) { log.trace("Peeked delivery record: " + dr.deliveryID); }
+   		
+   		//Needs to be synchronized to prevent delivery occurring twice e.g. if this occurs at same time as collectDeliveries
+   		synchronized (dr)
+   		{	   		
+	   		boolean performDelivery = false;
+	   		
+	   		if (dr.waitingForResponse)
+	   		{
+	   			if (dr == rec)
+	   			{
+	   				if (trace) { log.trace("Found our delivery"); }
+	   				
+	   				performDelivery = true;
+	   			}
+	   			else
+	   			{
+	   				if (!delivered)
+	   				{
    	   				//We have to wait for another response to arrive first
-   	   				wait = true;
-   	   			}
-   	   		}
-   	   		else
-   	   		{
-   	   			//Non replicated delivery
-   	   			performDelivery = true;
-   	   		}
-   	   		
-   	   		if (performDelivery)
-   	   		{
-   	   			toDeliver.take();
-   	   			
-   	   			performDelivery(dr.del.getReference(), dr.deliveryID, dr.getConsumer()); 
-   	   			
-   	   			delivered = true;
-   	   	   	
-   	   	   	dr.waitingForResponse = false;
-   	   	   	
-   	   	   	myLock.notify();
-   	   	   	
-   	   	   	break;
-   	   		}
-      		}
-      		
-      		if (wait)
-      		{
-   				long start = System.currentTimeMillis();
-   				
-      			try
-      			{
-      				//We need to wait since responses have come back out of order
-      				myLock.wait(toWait);
-      			}
-      			catch (InterruptedException e)
-      			{      				
-      			}
-      			toWait -= (System.currentTimeMillis() - start);
-      		}      		
-      	}
-   		if (toWait <= 0)
-   		{
-   			throw new IllegalStateException("Timed out waiting for previous response to arrive");
-   		}
-   	}   	   	
+   	   				
+   	   				throw new IllegalStateException("Reponses have come back our of order");
+	   				}
+	   				else
+	   				{
+	   					//We have delivered ours and possibly any non replicated deliveries too   	   					   	   					
+	   	   	   	
+	   					break;
+	   				}
+	   			}
+	   		}
+	   		else
+	   		{
+	   			//Non replicated delivery
+	   			
+	   			if (trace) { log.trace("Non replicated delivery"); }
+	   			
+	   			performDelivery = true;
+	   		}
+	   		
+	   		if (performDelivery)
+	   		{
+	   			toDeliver.take();
+	   			
+	   			performDelivery(dr.del.getReference(), dr.deliveryID, dr.getConsumer()); 
+	   			
+	   			delivered = true;
+	   	   	
+	   	   	dr.waitingForResponse = false;
+	   	   	
+	   	   	delivered = true;
+	   		}
+   		}	
+   	}   	  	      	
    	
    	if (delivered)
    	{
 	   	synchronized (deliveryLock)
 	   	{
-	   		deliveryLock.notifyAll();
+	   		if (waitingToClose)
+	   		{
+	   			deliveryLock.notifyAll();
+	   		}
 	   	}
-   	}
+   	}   	   	   	   
    }
 
    // Package protected ----------------------------------------------------------------------------
@@ -1203,6 +1304,7 @@
 	   		
 	   		if (wait)
 	   		{
+	   			waitingToClose = true;
 	   			try
 	   			{
 	   				deliveryLock.wait(toWait);
@@ -1222,7 +1324,8 @@
    			while (toDeliver.poll(0) != null) {}
    			
    			log.warn("Timed out waiting for response to arrive");
-   		}   		   		
+   		}   		
+   		waitingToClose = false;
    	}
    }
    
@@ -1236,6 +1339,8 @@
    	 
    	 deliveryId = deliveryIdSequence.increment();   	 
    	 
+   	 if (trace) { log.trace("Delivery id is now " + deliveryId); }
+   	 
    	 //TODO can't we combine flags isRetainDeliveries and isReplicating - surely they're mutually exclusive?
        if (consumer.isRetainDeliveries())
        {      	 
@@ -1254,6 +1359,8 @@
        	{
        		//This basically just releases the memory reference
        		
+       		if (trace) { log.trace("Acknowledging delivery now"); }
+       		
        		delivery.acknowledge(null);
        	}
        	catch (Throwable t)
@@ -1275,6 +1382,8 @@
        {
       	 if (!toDeliver.isEmpty())
       	 {
+      		 if (trace) { log.trace("Message is unreliable and there are refs in the toDeliver so adding to list"); }
+      		 
       		 //We need to add to the list to prevent non persistent messages overtaking persistent messages from the same
       		 //producer in flight (since np don't need to be replicated)
       		 toDeliver.put(rec);
@@ -1290,6 +1399,8 @@
       	 }
       	 else
       	 {
+      		 if (trace) { log.trace("Message is unreliable, but no deliveries in list so performing delivery now"); }
+      		 
       		 // Actually do the delivery now
          	 performDelivery(delivery.getReference(), deliveryId, consumer); 
       	 }

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-07-13 07:18:19 UTC (rev 2889)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-07-13 13:17:55 UTC (rev 2890)
@@ -83,6 +83,7 @@
 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
 import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock;
 
@@ -214,7 +215,8 @@
    //use it
    private ServerPeer serverPeer;
    
-   private PooledExecutor replyExecutor;
+   //Note this MUST be a queued executor to ensure replicate responses arrive back in order
+   private QueuedExecutor replyExecutor;
    
    private volatile int failoverNodeID = -1;
    
@@ -1579,7 +1581,8 @@
          leftSet = new ConcurrentHashSet();
       }
       
-      replyExecutor = new PooledExecutor(new LinkedQueue(),  10);
+      //NOTE, MUST be a QueuedExecutor so we ensure that responses arrive back in order
+      replyExecutor = new QueuedExecutor(new LinkedQueue());
    }
    
    private void deInit()
@@ -3010,14 +3013,19 @@
 
 		public void afterCommit(boolean onePhase) throws Exception
 		{
-			if (nodeID == null)
-			{
-				multicastRequest(request);
-			}
-			else
-			{
-				unicastRequest(request, nodeID.intValue());
-			}
+//			if (nodeID == null)
+//			{
+//				multicastRequest(request);
+//			}
+//			else
+//			{
+//				unicastRequest(request, nodeID.intValue());
+//			}
+			
+			//For now we always multicast otherwise there is the possibility that messages sent unicast arrive in a different order
+			//to messages sent multicast
+			//We might be able to fix this using anycast
+			multicastRequest(request);
 		}
 
 		public void afterPrepare() throws Exception

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java	2007-07-13 07:18:19 UTC (rev 2889)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java	2007-07-13 13:17:55 UTC (rev 2890)
@@ -327,11 +327,13 @@
          
          do
          {
-            tm = (TextMessage)cons2.receive(1000);
+            tm = (TextMessage)cons2.receive(5000);
             
             if (tm != null)
             {                     
 	            msgs.add(tm.getText());
+	            
+	            log.info("Got message " + tm.getText());
             }
          }           
          while (tm != null);




More information about the jboss-cvs-commits mailing list