[jboss-cvs] JBoss Messaging SVN: r3205 - in trunk/src/main/org/jboss: messaging/core/contract and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Oct 19 09:22:08 EDT 2007


Author: timfox
Date: 2007-10-19 09:22:08 -0400 (Fri, 19 Oct 2007)
New Revision: 3205

Modified:
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/contract/PostOffice.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
tweaks


Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-10-19 12:42:17 UTC (rev 3204)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-10-19 13:22:08 UTC (rev 3205)
@@ -179,9 +179,9 @@
    
    private Object waitLock = new Object();
    
-   //debug
-   private SynchronizedInt toDeliverCount = new SynchronizedInt(0);
    
+   private boolean dosync = true;
+   
    // Constructors ---------------------------------------------------------------------------------
 
    ServerSessionEndpoint(String sessionID, ServerConnectionEndpoint connectionEndpoint) throws Exception
@@ -621,7 +621,7 @@
                
                if (supportsFailover)
                {
-               	postOffice.sendReplicateDeliveryMessage(queueName, id, del.getReference().getMessage().getMessageID(), deliveryId, false, true);
+               	postOffice.sendReplicateDeliveryMessage(queueName, id, del.getReference().getMessage().getMessageID(), deliveryId, false, true, true);
                }
             }
          }
@@ -911,8 +911,6 @@
    		while (iter.hasNext())
    		{
    			toDeliver.put(iter.next());
-   			
-   			this.toDeliverCount.increment();
    		}
    	}
    	
@@ -1063,8 +1061,6 @@
 	   		{
 	   			toDeliver.take();
 	   			
-	   			this.toDeliverCount.decrement();
-	   			
 	   			performDelivery(dr.del.getReference(), dr.deliveryID, dr.getConsumer()); 
 	   			
 	   			delivered = true;
@@ -1300,7 +1296,6 @@
    			//Clear toDeliver
    			while (toDeliver.poll(0) != null)
    			{
-   				this.toDeliverCount.decrement();
    			}
    			
    			log.warn("Timed out waiting for response to arrive");
@@ -1370,16 +1365,12 @@
       		 //producer in flight (since np don't need to be replicated)
       		 toDeliver.put(rec);
       		 
-      		 this.toDeliverCount.increment();
-      		 
       		 //Race check (there's a small chance the message in the queue got removed between the empty check
       		 //and the put so we do another check:
       		 if (toDeliver.peek() == rec)
       		 {
       			 toDeliver.take();
       			 
-      			 this.toDeliverCount.decrement();
-      			 
       			 performDelivery(delivery.getReference(), deliveryId, consumer);
       		 }
       	 }
@@ -1403,11 +1394,11 @@
          	 
          	 toDeliver.put(rec);
          	 
-         	 this.toDeliverCount.increment();
-         	 
          	 postOffice.sendReplicateDeliveryMessage(consumer.getQueueName(), id,
                                                      delivery.getReference().getMessage().getMessageID(),
-                                                     deliveryId, true, false);
+                                                     deliveryId, true, false, dosync);
+         	 
+         	 performDelivery(delivery.getReference(), deliveryId, consumer);
       	 }
       	 else
       	 {
@@ -1420,10 +1411,7 @@
       		 // Actually do the delivery now - we are only node in the cluster
          	 performDelivery(delivery.getReference(), deliveryId, consumer); 	  
       	 }
-       }
-       
-       //log.info("del count " + this.toDeliverCount.get());
-
+       }       
    }
    
    void performDelivery(MessageReference ref, long deliveryID, ServerConsumerEndpoint consumer)

Modified: trunk/src/main/org/jboss/messaging/core/contract/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/contract/PostOffice.java	2007-10-19 12:42:17 UTC (rev 3204)
+++ trunk/src/main/org/jboss/messaging/core/contract/PostOffice.java	2007-10-19 13:22:08 UTC (rev 3205)
@@ -143,7 +143,7 @@
    
    //FIXME - these do not belong here - only here temporarily until we implement generic Handler/Message abstraction
    
-   void sendReplicateDeliveryMessage(String queueName, String sessionID, long messageID, long deliveryID, boolean reply, boolean sync)
+   void sendReplicateDeliveryMessage(String queueName, String sessionID, long messageID, long deliveryID, boolean reply, boolean sync, boolean bodge)
    	throws Exception;
 
 	void sendReplicateAckMessage(String queueName, long messageID) throws Exception;

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2007-10-19 12:42:17 UTC (rev 3204)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2007-10-19 13:22:08 UTC (rev 3205)
@@ -28,6 +28,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Vector;
 
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.ChannelFactory;
@@ -44,10 +45,6 @@
 import org.jgroups.util.Rsp;
 import org.jgroups.util.RspList;
 
-import EDU.oswego.cs.dl.util.concurrent.Executor;
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
 /**
  * 
  * This class handles the interface with JGroups
@@ -266,6 +263,37 @@
    	}
    }
    
+   public void unicastControl(ClusterRequest request, Address address, boolean sync) throws Exception
+   {
+   	if (startedState == STARTED)
+   	{   		
+	   	if (trace) { log.trace(this + " multicasting " + request + " to control channel, sync=" + sync); }
+	
+	   	Message message = new Message(address, null, writeRequest(request));
+
+	   	Vector v = new Vector();
+	   	v.add(address);
+	   	
+	   	RspList rspList =
+	   		dispatcher.castMessage(v, message, sync ? GroupRequest.GET_ALL: GroupRequest.GET_NONE, castTimeout);	
+	   	
+	   	if (sync)
+	   	{			   	
+		   	Iterator iter = rspList.values().iterator();
+		   	
+		   	while (iter.hasNext())
+		   	{
+		   		Rsp rsp = (Rsp)iter.next();
+		   		
+		   		if (!rsp.wasReceived())
+		   		{
+		   			throw new IllegalStateException(this + " response not received from " + rsp.getSender() + " - there may be others");
+		   		}
+		   	}		
+	   	}
+   	}
+   }
+   
    public void multicastData(ClusterRequest request) throws Exception
    {
    	if (startedState == STARTED)

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-10-19 12:42:17 UTC (rev 3204)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-10-19 13:22:08 UTC (rev 3205)
@@ -612,7 +612,7 @@
    //TODO - these don't belong here
        
    public void sendReplicateDeliveryMessage(String queueName, String sessionID, long messageID, long deliveryID,
-   		                                   boolean reply, boolean sync)
+   		                                   boolean reply, boolean sync, boolean bodgesync)
    	throws Exception
    {
    	//We use a semaphore to limit the number of outstanding replicates we can send without getting a response
@@ -631,7 +631,7 @@
 	   	   	   		   
 	   	Address replyAddress = null;
 	   	
-	   	if (reply)
+	   	if (reply && !bodgesync)
 	   	{
 	   		//TODO optimise this
 	   		
@@ -648,8 +648,15 @@
 		   Address address = getFailoverNodeDataChannelAddress();
 		   	
 		   if (address != null)
-		   {	   
-		   	groupMember.unicastData(request, address);
+		   {	  
+		   	if (bodgesync)
+		   	{
+		   		groupMember.unicastControl(request, address, true);
+		   	}
+		   	else
+		   	{
+		   		groupMember.unicastData(request, address);
+		   	}
 		   }
    	}
    	catch (Exception e)




More information about the jboss-cvs-commits mailing list