[jboss-cvs] JBoss Messaging SVN: r3191 - in trunk: src/etc/xmdesc and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Oct 16 08:37:16 EDT 2007


Author: timfox
Date: 2007-10-16 08:37:16 -0400 (Tue, 16 Oct 2007)
New Revision: 3191

Modified:
   trunk/src/etc/server/default/deploy/db2-persistence-service.xml
   trunk/src/etc/server/default/deploy/mssql-persistence-service.xml
   trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
   trunk/src/etc/server/default/deploy/mysqlcluster-persistence-service.xml
   trunk/src/etc/server/default/deploy/oracle-persistence-service.xml
   trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml
   trunk/src/etc/server/default/deploy/sybase-persistence-service.xml
   trunk/src/etc/xmdesc/MessagingPostOffice-xmbean.xml
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   trunk/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java
   trunk/tests/bin/runtest
   trunk/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java
   trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-1112


Modified: trunk/src/etc/server/default/deploy/db2-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/db2-persistence-service.xml	2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/src/etc/server/default/deploy/db2-persistence-service.xml	2007-10-16 12:37:16 UTC (rev 3191)
@@ -138,6 +138,11 @@
       
       <attribute name="CastTimeout">5000</attribute>
       
+      <!-- Max number of concurrent replications -->
+      
+      <attribute name="MaxConcurrentReplications">100</attribute>
+      
+      
       <!-- Enable this when the JGroups multiplexer comes of age 
       <attribute name="ChannelFactoryName">jgroups.mux:name=Multiplexer</attribute>
       <attribute name="ControlChannelName">udp-sync</attribute>

Modified: trunk/src/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mssql-persistence-service.xml	2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/src/etc/server/default/deploy/mssql-persistence-service.xml	2007-10-16 12:37:16 UTC (rev 3191)
@@ -141,6 +141,10 @@
       
       <attribute name="CastTimeout">5000</attribute>
       
+      <!-- Max number of concurrent replications -->
+      
+      <attribute name="MaxConcurrentReplications">100</attribute>            
+      
       <!-- Enable this when the JGroups multiplexer comes of age
       <attribute name="ChannelFactoryName">jgroups.mux:name=Multiplexer</attribute>      
       <attribute name="ControlChannelName">udp-sync</attribute>

Modified: trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mysql-persistence-service.xml	2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/src/etc/server/default/deploy/mysql-persistence-service.xml	2007-10-16 12:37:16 UTC (rev 3191)
@@ -141,6 +141,10 @@
                   
       <attribute name="CastTimeout">50000</attribute>
       
+      <!-- Max number of concurrent replications -->
+      
+      <attribute name="MaxConcurrentReplications">100</attribute>
+      
       <!-- Enable this when the JGroups multiplexer comes of age
       <attribute name="ChannelFactoryName">jgroups.mux:name=Multiplexer</attribute>      
       <attribute name="ControlChannelName">udp-sync</attribute>

Modified: trunk/src/etc/server/default/deploy/mysqlcluster-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mysqlcluster-persistence-service.xml	2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/src/etc/server/default/deploy/mysqlcluster-persistence-service.xml	2007-10-16 12:37:16 UTC (rev 3191)
@@ -140,6 +140,10 @@
       <!-- Max time to wait for a synchronous call to node members using the MessageDispatcher -->
 
       <attribute name="CastTimeout">50000</attribute>
+      
+      <!-- Max number of concurrent replications -->
+      
+      <attribute name="MaxConcurrentReplications">100</attribute>            
 
       <!-- Enable this when the JGroups multiplexer comes of age
       <attribute name="ChannelFactoryName">jgroups.mux:name=Multiplexer</attribute>

Modified: trunk/src/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/oracle-persistence-service.xml	2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/src/etc/server/default/deploy/oracle-persistence-service.xml	2007-10-16 12:37:16 UTC (rev 3191)
@@ -141,6 +141,10 @@
                   
       <attribute name="CastTimeout">5000</attribute>
       
+      <!-- Max number of concurrent replications -->
+      
+      <attribute name="MaxConcurrentReplications">100</attribute>            
+      
       <!-- Enable this when the JGroups multiplexer comes of age
       <attribute name="ChannelFactoryName">jgroups.mux:name=Multiplexer</attribute>      
       <attribute name="ControlChannelName">udp-sync</attribute>

Modified: trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml	2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml	2007-10-16 12:37:16 UTC (rev 3191)
@@ -141,6 +141,10 @@
       
       <attribute name="CastTimeout">5000</attribute>
       
+      <!-- Max number of concurrent replications -->
+      
+      <attribute name="MaxConcurrentReplications">100</attribute>
+                  
       <!-- Enable this when the JGroups multiplexer comes of age
       <attribute name="ChannelFactoryName">jgroups.mux:name=Multiplexer</attribute>      
       <attribute name="ControlChannelName">udp-sync</attribute>

Modified: trunk/src/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/sybase-persistence-service.xml	2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/src/etc/server/default/deploy/sybase-persistence-service.xml	2007-10-16 12:37:16 UTC (rev 3191)
@@ -146,6 +146,10 @@
                   
       <attribute name="CastTimeout">5000</attribute>
       
+      <!-- Max number of concurrent replications -->
+      
+      <attribute name="MaxConcurrentReplications">100</attribute>
+                  
       <!-- Enable this when the JGroups multiplexer comes of age
       <attribute name="ChannelFactoryName">jgroups.mux:name=Multiplexer</attribute>      
       <attribute name="ControlChannelName">udp-sync</attribute>

Modified: trunk/src/etc/xmdesc/MessagingPostOffice-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/MessagingPostOffice-xmbean.xml	2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/src/etc/xmdesc/MessagingPostOffice-xmbean.xml	2007-10-16 12:37:16 UTC (rev 3191)
@@ -119,6 +119,12 @@
       <type>java.lang.String</type>
    </attribute>
    
+   <attribute access="read-write" getMethod="getMaxConcurrentReplications" setMethod="setMaxConcurrentReplications">
+      <description>The maximum number of concurrent replications</description>
+      <name>MaxConcurrentReplications</name>
+      <type>int</type>
+   </attribute>
+   
    <attribute access="read-only" getMethod="getNodeIDView">
       <description>Get the set of nodes in the cluster</description>
       <name>NodeIDView</name>

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-10-16 12:37:16 UTC (rev 3191)
@@ -987,8 +987,6 @@
    	return gotSome;
    }
       
-   private Object myLock = new Object();
-   
    public void replicateDeliveryResponseReceived(long deliveryID) throws Exception
    {
    	//We look up the delivery in the list and actually perform the delivery
@@ -1002,115 +1000,8 @@
    		throw new java.lang.IllegalStateException("Cannot find delivery with id " + deliveryID);
    	}
    	   	
-   	boolean delivered = false;
+   	boolean delivered = false;   	
    	
-   	//I have commented this out since we should be able guarantee responses come back in order if we use
-   	//a QueuedExecutor on the other node to send the response
-   	
-//   	//Note there will only be contention on this if two or more responses come back at the same time - which is unlikely
-//   	//TODO - This can occur since replicates are sent to the other node, and the responses are sent back using a pool which
-//   	//means earlier responses can be received after later ones -hence we need to cope with this
-//   	//However - if we used a queued executor on the other node to send back responses we could remove all this locking!!
-//   	synchronized (myLock)
-//   	{
-//   		long toWait = DELIVERY_WAIT_TIMEOUT;
-//   		
-//   		while (toWait > 0)
-//      	{
-//      		DeliveryRecord dr = (DeliveryRecord)toDeliver.peek();
-//      		      	      		
-//      		if (dr == null)
-//      		{
-//      			if (trace) { log.trace("No more deliveries in list"); }
-//      			
-//      			break;
-//      		}
-//      		
-//      		if (trace) { log.trace("Peeked delivery record: " + dr.deliveryID); }
-//      		
-//      		boolean wait = false;
-//      		
-//      		//Needs to be synchronized to prevent delivery occurring twice e.g. if this occurs at same time as collectDeliveries
-//      		synchronized (dr)
-//      		{	   		
-//   	   		boolean performDelivery = false;
-//   	   		
-//   	   		if (dr.waitingForResponse)
-//   	   		{
-//   	   			if (dr == rec)
-//   	   			{
-//   	   				if (trace) { log.trace("Found our delivery"); }
-//   	   				
-//   	   				performDelivery = true;
-//   	   			}
-//   	   			else
-//   	   			{
-//   	   				if (!delivered)
-//   	   				{
-//	   	   				//We have to wait for another response to arrive first
-//	   	   				
-//	   	   				if (trace) { log.trace("Not ours - need to wait"); }
-//	   	   				
-//	   	   				wait = true;
-//   	   				}
-//   	   				else
-//   	   				{
-//   	   					//We have delivered ours and possibly any non replicated deliveries too   	   					   	   					
-//   	   	   	   	
-//   	   	   	   	myLock.notify();
-//   	   	   	   	
-//   	   					break;
-//   	   				}
-//   	   			}
-//   	   		}
-//   	   		else
-//   	   		{
-//   	   			//Non replicated delivery
-//   	   			
-//   	   			if (trace) { log.trace("Non replicated delivery"); }
-//   	   			
-//   	   			performDelivery = true;
-//   	   		}
-//   	   		
-//   	   		if (performDelivery)
-//   	   		{
-//   	   			toDeliver.take();
-//   	   			
-//   	   			performDelivery(dr.del.getReference(), dr.deliveryID, dr.getConsumer()); 
-//   	   			
-//   	   			delivered = true;
-//   	   	   	
-//   	   	   	dr.waitingForResponse = false;
-//   	   	   	
-//   	   	   	delivered = true;
-//   	   		}
-//      		}
-//      		
-//      		if (wait)
-//      		{
-//   				long start = System.currentTimeMillis();
-//   				
-//      			try
-//      			{
-//      				if (trace) { log.trace("Waiting"); }
-//      				
-//      				//We need to wait since responses have come back out of order
-//      				myLock.wait(toWait);
-//      				
-//      				if (trace) { log.trace("Woke up"); }
-//      			}
-//      			catch (InterruptedException e)
-//      			{      				
-//      			}
-//      			toWait -= (System.currentTimeMillis() - start);
-//      		}      		
-//      	}
-//   		if (toWait <= 0)
-//   		{
-//   			throw new IllegalStateException("Timed out waiting for previous response to arrive");
-//   		}
-//   	}   	   	
-   	
 		while (true)
    	{
    		DeliveryRecord dr = (DeliveryRecord)toDeliver.peek();
@@ -1143,7 +1034,7 @@
 	   				{
    	   				//We have to wait for another response to arrive first
    	   				
-   	   				throw new IllegalStateException("Reponses have come back our of order");
+   	   				throw new IllegalStateException("Responses have come back our of order");
 	   				}
 	   				else
 	   				{

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-10-16 12:37:16 UTC (rev 3191)
@@ -22,7 +22,6 @@
 package org.jboss.messaging.core.impl.postoffice;
 
 import java.io.Serializable;
-import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.Types;
@@ -38,6 +37,7 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.Vector;
+import java.util.concurrent.Semaphore;
 
 import javax.management.ListenerNotFoundException;
 import javax.management.MBeanNotificationInfo;
@@ -53,6 +53,7 @@
 import org.jboss.jms.server.endpoint.ServerSessionEndpoint;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.Binding;
+import org.jboss.messaging.core.contract.ChannelFactory;
 import org.jboss.messaging.core.contract.ClusterNotification;
 import org.jboss.messaging.core.contract.ClusterNotifier;
 import org.jboss.messaging.core.contract.Condition;
@@ -60,7 +61,6 @@
 import org.jboss.messaging.core.contract.Delivery;
 import org.jboss.messaging.core.contract.Filter;
 import org.jboss.messaging.core.contract.FilterFactory;
-import org.jboss.messaging.core.contract.ChannelFactory;
 import org.jboss.messaging.core.contract.Message;
 import org.jboss.messaging.core.contract.MessageReference;
 import org.jboss.messaging.core.contract.MessageStore;
@@ -221,6 +221,10 @@
    
    private volatile boolean firstNode;
    
+   //We keep use a semaphore to limit the number of concurrent replication requests to avoid
+   //overwhelming JGroups
+   private Semaphore replicateSemaphore;
+      
    // Constructors ---------------------------------------------------------------------------------
 
    /*
@@ -287,7 +291,8 @@
                               String groupName,
                               ChannelFactory jChannelFactory,
                               long stateTimeout, long castTimeout,
-                              boolean supportsFailover)
+                              boolean supportsFailover,
+                              int maxConcurrentReplications)
       throws Exception
    {
    	this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms, pm, tr,
@@ -300,6 +305,8 @@
       this.supportsFailover = supportsFailover;
       
       nbSupport = new NotificationBroadcasterSupport();
+      
+      replicateSemaphore = new Semaphore(maxConcurrentReplications, true);
    }
       
    // MessagingComponent overrides -----------------------------------------------------------------
@@ -603,36 +610,51 @@
    }
    
    //TODO - these don't belong here
-    
+       
    public void sendReplicateDeliveryMessage(String queueName, String sessionID, long messageID, long deliveryID,
    		                                   boolean reply, boolean sync)
    	throws Exception
    {
-   	//There is no need to lock this while failover node change is occuring since the receiving node is tolerant to duplicate
-		//adds or acks
-   	   	   		   
-   	Address replyAddress = null;
+   	//We use a semaphore to limit the number of outstanding replicates we can send without getting a response
+   	//This is to prevent overwhelming JGroups
+   	//See http://jira.jboss.com/jira/browse/JBMESSAGING-1112
    	
-   	if (reply)
+   	replicateSemaphore.acquire();
+   	
+   	try
+   	{	   	   	
+	   	//There is no need to lock this while failover node change is occuring since the receiving node is tolerant to duplicate
+			//adds or acks
+	   	   	   		   
+	   	Address replyAddress = null;
+	   	
+	   	if (reply)
+	   	{
+	   		//TODO optimise this
+	   		
+	   		PostOfficeAddressInfo info = (PostOfficeAddressInfo)nodeIDAddressMap.get(new Integer(thisNodeID));
+	   		
+	   		replyAddress = info.getDataChannelAddress();
+	   	}
+	   	
+	   	ClusterRequest request = new ReplicateDeliveryMessage(thisNodeID, queueName, sessionID, messageID, deliveryID, replyAddress);
+	   	
+	   	if (trace) { log.trace(this + " sending replicate delivery message " + queueName + " " + sessionID + " " + messageID); }
+				   
+	   	//TODO could be optimised too
+		   Address address = getFailoverNodeDataChannelAddress();
+		   	
+		   if (address != null)
+		   {	   
+		   	groupMember.unicastData(request, address);
+		   }
+   	}
+   	catch (Exception e)
    	{
-   		//TODO optimise this
+   		replicateSemaphore.release();
    		
-   		PostOfficeAddressInfo info = (PostOfficeAddressInfo)nodeIDAddressMap.get(new Integer(thisNodeID));
-   		
-   		replyAddress = info.getDataChannelAddress();
+   		throw e;
    	}
-   	
-   	ClusterRequest request = new ReplicateDeliveryMessage(thisNodeID, queueName, sessionID, messageID, deliveryID, replyAddress);
-   	
-   	if (trace) { log.trace(this + " sending replicate delivery message " + queueName + " " + sessionID + " " + messageID); }
-			   
-   	//TODO could be optimised too
-	   Address address = getFailoverNodeDataChannelAddress();
-	   	
-	   if (address != null)
-	   {	   
-	   	groupMember.unicastData(request, address);
-	   }
    }
 
 	public void sendReplicateAckMessage(String queueName, long messageID) throws Exception
@@ -1257,19 +1279,22 @@
    	queue.removeFromRecoveryArea(nodeID, messageID);  
    }
    
+   
    public void handleReplicateDeliveryAck(String sessionID, long deliveryID) throws Exception
    {
    	if (trace) { log.trace(this + " handleReplicateDeliveryAck " + sessionID + " " + deliveryID); }
    	  	
-   	//TOD - this does not belong here
+   	//TODO - this does not belong here
    	ServerSessionEndpoint session = serverPeer.getSession(sessionID);
    	
+   	replicateSemaphore.release();
+   	
    	if (session == null)
    	{
    		log.warn("Cannot find session " + sessionID);
    		
    		return;
-   	}
+   	}   	   	
    	
    	session.replicateDeliveryResponseReceived(deliveryID);
    }

Modified: trunk/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java	2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java	2007-10-16 12:37:16 UTC (rev 3191)
@@ -95,6 +95,8 @@
    private String groupName;
    
    private boolean clustered;
+   
+   private int maxConcurrentReplications = 100;
 
    private MessagingPostOffice postOffice;
 
@@ -305,6 +307,16 @@
        this.clustered = clustered;
    }
    
+   public int getMaxConcurrentReplications()
+   {
+   	return maxConcurrentReplications;
+   }
+   
+   public void setMaxConcurrentReplications(int number)
+   {
+   	this.maxConcurrentReplications = number;
+   }
+   
    public String listBindings()
    {
       return postOffice.printBindingInformation();
@@ -399,7 +411,8 @@
 	                                               groupName,
 	                                               jChannelFactory,
 	                                               stateTimeout, castTimeout,
-                                                  serverPeer.isSupportsFailover());
+                                                  serverPeer.isSupportsFailover(),
+                                                  maxConcurrentReplications);
          }
          else
          {

Modified: trunk/tests/bin/runtest
===================================================================
--- trunk/tests/bin/runtest	2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/tests/bin/runtest	2007-10-16 12:37:16 UTC (rev 3191)
@@ -174,7 +174,7 @@
    fi
 fi
 
-"$JAVA_HOME/bin/java" $JAVA_OPTS -Djgroups.bind_addr=localhost -cp "$CLASSPATH" \
+"$JAVA_HOME/bin/java" -Xmx1024M $JAVA_OPTS -Djgroups.bind_addr=localhost -cp "$CLASSPATH" \
 org.jboss.test.messaging.tools.junit.SelectiveTestRunner $TARGET_CLASS $TARGET_TEST
 
 #

Modified: trunk/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java	2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java	2007-10-16 12:37:16 UTC (rev 3191)
@@ -96,7 +96,7 @@
                                  sc.getPostOfficeSQLProperties(), true, nodeID,
                                  "Clustered", ms, pm, tr, ff, cf, idm, cn,
                                  groupName, jChannelFactory,
-                                 stateTimeout, castTimeout, true);
+                                 stateTimeout, castTimeout, true, 100);
       
       postOffice.start();
 

Modified: trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2007-10-16 12:37:16 UTC (rev 3191)
@@ -369,7 +369,7 @@
       {
          int index = Integer.parseInt(remoteDebugIndex);
 
-         sb.append("-Xdebug -Xnoagent -Djava.compiler=NONE ").
+         sb.append("-Xmx1024M -Xdebug -Xnoagent -Djava.compiler=NONE ").
             append("-Xrunjdwp:transport=dt_shmem,server=n,suspend=n,address=rmiserver_").
             append(index).append(' ');
       }
@@ -398,13 +398,9 @@
 
       sb.append("-Dtest.bind.address=").append(bindAddress).append(' ');
 
-      String jgroupsBindAddr = System.getProperty(org.jgroups.Global.BIND_ADDR);
+      //Use test.bind.address for the jgroups.bind_addr
       
-      //If not found default to localhost
-      if (jgroupsBindAddr == null)
-      {
-      	jgroupsBindAddr = "localhost";
-      }
+      String jgroupsBindAddr = bindAddress;
       
       sb.append("-D").append(org.jgroups.Global.BIND_ADDR).append("=")
          .append(jgroupsBindAddr).append(' ');      




More information about the jboss-cvs-commits mailing list