[jboss-cvs] JBoss Messaging SVN: r3225 - in trunk: src/main/org/jboss/jms/server and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Oct 20 12:13:52 EDT 2007


Author: timfox
Date: 2007-10-20 12:13:52 -0400 (Sat, 20 Oct 2007)
New Revision: 3225

Added:
   trunk/src/main/org/jboss/messaging/util/Throttle.java
Modified:
   trunk/docs/userguide/en/modules/c_configuration.xml
   trunk/src/main/org/jboss/jms/server/ServerPeer.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java
Log:
Added throttle


Modified: trunk/docs/userguide/en/modules/c_configuration.xml
===================================================================
--- trunk/docs/userguide/en/modules/c_configuration.xml	2007-10-20 12:43:01 UTC (rev 3224)
+++ trunk/docs/userguide/en/modules/c_configuration.xml	2007-10-20 16:13:52 UTC (rev 3225)
@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <chapter id="c_configuration">
-   <title>JBoss Messaging Clustering Configuration</title>
+   <title>JBoss Messaging Clustering Notes</title>
    <para>JBoss Messaging clustering should work out of the box in most cases
    with no configuration changes. It is however crucial that every node is
    assigned a unique server id, as specified in the installation guide.</para>
@@ -31,16 +31,23 @@
    attribute in the server peer to true. By default this is false. The
    side-effect of setting this to true is that messages cannot be distributed
    as freely around the cluster</para>
-   <para>When pulling reliable messages from one node to another,
-   JBoss Messaging can use client acnowledgement or an XA transaction. The
-   default is client acknowledgement. Using XA transactions
-   is a fairly heavyweight operation but ensures absolute once and only once delivery.</para>
-   <para>
-   If the call to send a persistent message to a persistent destination returns successfully with no exception,
-   then you can be sure that the message was persisted.
-   However if the call doesn't return successfully e.g. if an exception is thrown, then you *can't be sure the message wasn't persisted*. Since the       failure might have occurred after persisting the message but before writing the response to the caller.
-This is a common attribute of any RPC type call: You can't tell by the call not returning that the call didn't actually succeed. Whether it's a web services call, an HTTP get request, an ejb invocation the same applies.
-The trick is to code your application so your operations are *idempotent* - i.e. they can be repeated without getting the system into an inconsistent state.
-With a message system you can do this on the application level, by checking for duplicate messages, and discarding them if they arrive. Duplicate checking is a very powerful technique that can remove the need for XA transactions in many cases.
-   </para>
+   <para>When pulling reliable messages from one node to another, JBoss
+   Messaging can use client acknowledgement or an XA transaction. The default
+   is client acknowledgement. Using XA transactions is a fairly heavyweight
+   operation but ensures absolute once and only once delivery.</para>
+   <para>If the call to send a persistent message to a persistent destination
+   returns successfully with no exception, then you can be sure that the
+   message was persisted. However if the call doesn't return successfully e.g.
+   if an exception is thrown, then you *can't be sure the message wasn't
+   persisted*, since the failure might have occurred after persisting the
+   message but before writing the response to the caller. This is a common
+   attribute of any RPC type call: You can't tell by the call not returning
+   that the call didn't actually succeed. Whether it's a web services call, an
+   HTTP GET request or an EJB invocation, the same applies. The trick is to code
+   your application so your operations are *idempotent* - i.e. they can be
+   repeated without getting the system into an inconsistent state. With a
+   message system you can do this on the application level, by checking for
+   duplicate messages, and discarding them if they arrive. Duplicate checking
+   is a very powerful technique that can remove the need for XA transactions
+   in many cases.</para>
 </chapter>
\ No newline at end of file

Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java	2007-10-20 12:43:01 UTC (rev 3224)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java	2007-10-20 16:13:52 UTC (rev 3225)
@@ -1431,7 +1431,7 @@
       //    TODO - if I find a way not using UnifiedClassLoader3 directly, then get rid of
       //    <path refid="jboss.jmx.classpath"/> from jms/build.xml dependentmodule.classpath
       //    
-      
+   	
       String destType = isQueue ? "Queue" : "Topic";
       String className = "org.jboss.jms.server.destination." + destType + "Service";
       
@@ -1458,6 +1458,9 @@
                                             String jndiName, boolean params, int fullSize,
                                             int pageSize, int downCacheSize) throws Exception
    {
+   	log.trace("Deploying destination" + destinationMBeanConfig + " jndiName: " + jndiName +
+			       "fullSize: " + fullSize + " pageSize: " + pageSize + " downCacheSize: " + downCacheSize);
+     	
       MBeanServer mbeanServer = getServer();
 
       Element element = Util.stringToElement(destinationMBeanConfig);

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-10-20 12:43:01 UTC (rev 3224)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-10-20 16:13:52 UTC (rev 3225)
@@ -77,6 +77,7 @@
 import org.jboss.messaging.core.impl.tx.TxCallback;
 import org.jboss.messaging.util.ConcurrentHashSet;
 import org.jboss.messaging.util.StreamUtils;
+import org.jboss.messaging.util.Throttle;
 import org.jgroups.Address;
 import org.jgroups.View;
 
@@ -223,10 +224,12 @@
    
    //We keep use a semaphore to limit the number of concurrent replication requests to avoid
    //overwhelming JGroups
-   private Semaphore replicateSemaphore;
+   //private Semaphore replicateSemaphore;
       
-   private int maxConcurrentReplications;
+   //private int maxConcurrentReplications;
    
+   private Throttle throttle = new Throttle(5000, 5);
+   
    // Constructors ---------------------------------------------------------------------------------
 
    /*
@@ -308,9 +311,9 @@
       
       nbSupport = new NotificationBroadcasterSupport();
       
-      this.maxConcurrentReplications = maxConcurrentReplications;
+    //  this.maxConcurrentReplications = maxConcurrentReplications;
       
-      replicateSemaphore = new Semaphore(maxConcurrentReplications, true);
+      //replicateSemaphore = new Semaphore(maxConcurrentReplications, true);
    }
       
    // MessagingComponent overrides -----------------------------------------------------------------
@@ -625,7 +628,7 @@
    	
    	if (reply)
    	{
-   		replicateSemaphore.acquire();
+   		//replicateSemaphore.acquire();
    	}
    	
    	try
@@ -653,6 +656,8 @@
 		   	
 		   if (address != null)
 		   {	   
+		   	throttle.ping();
+		   	
 		   	groupMember.unicastData(request, address);
 		   }
    	}
@@ -660,7 +665,7 @@
    	{
    		if (reply)
    		{
-   			replicateSemaphore.release();
+   		//	replicateSemaphore.release();
    		}
    		
    		throw e;
@@ -678,6 +683,8 @@
 	   	
 	   if (address != null)
 	   {	   
+	   	throttle.ping();
+	   	
 	   	groupMember.unicastData(request, address);
 	   }
 	}
@@ -1218,7 +1225,9 @@
    	
    	if (binding == null)
    	{
-   		throw new IllegalStateException("Cannot find queue with name " + queueName +" has it been deployed?");
+   		//This is ok - the queue might not have been deployed yet - when the queue is deployed it
+   		//will request for deliveries to be sent to it in a batch
+   		return;
    	}
    	
    	Queue queue = binding.queue;
@@ -1297,7 +1306,7 @@
    	//TODO - this does not belong here
    	ServerSessionEndpoint session = serverPeer.getSession(sessionID);
    	
-   	replicateSemaphore.release();
+   //	replicateSemaphore.release();
    	
    	if (session == null)
    	{
@@ -2811,13 +2820,12 @@
 	   	{
 	   		Map deliveries = new HashMap();
 	   		
-				//FIXME - this is ugly
+				//TODO - this is ugly
 				//Find a better way of getting the sessions
-	   		//We shouldn't know abou the server peer
+	   		//We shouldn't know about the server peer
 	   		
 	   		if (serverPeer != null)
-	   		{
-					
+	   		{					
 					Collection sessions = serverPeer.getSessions();
 					
 					Iterator iter2 = sessions.iterator();
@@ -2828,7 +2836,9 @@
 						
 						session.deliverAnyWaitingDeliveries(null);
 						
-						session.collectDeliveries(deliveries, firstNode, null);				
+						session.collectDeliveries(deliveries, firstNode, null);
+						
+					//	releaseAndReplaceSemaphore();						
 					}   				  
 					
 					if (!firstNode)
@@ -2851,16 +2861,21 @@
    	}
    	
    	//Now we replace the semaphore since some of the acks may not come back from the old failover node
-   	if (replicateSemaphore != null)
-   	{
-   		Semaphore oldSem = replicateSemaphore;
-
-   		replicateSemaphore = new Semaphore(maxConcurrentReplications);
-   		
-   		oldSem.release(maxConcurrentReplications);   		
-   	}
+  // 	releaseAndReplaceSemaphore();
    }
    
+//   private void releaseAndReplaceSemaphore()
+//   {
+//   	if (replicateSemaphore != null)
+//   	{
+//   		Semaphore oldSem = replicateSemaphore;
+//
+//   		replicateSemaphore = new Semaphore(maxConcurrentReplications);
+//   		
+//   		oldSem.release(maxConcurrentReplications);   		
+//   	}
+//   }
+//   
 
    /**
     * This method fails over all the queues from node <failedNodeId> onto this node. It is triggered
@@ -3052,6 +3067,8 @@
       				{
       					gotSome = true;
       				}
+      				
+      			//	releaseAndReplaceSemaphore();
       			}   				  
       			
       			if (gotSome)

Added: trunk/src/main/org/jboss/messaging/util/Throttle.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/Throttle.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/util/Throttle.java	2007-10-20 16:13:52 UTC (rev 3225)
@@ -0,0 +1,70 @@
+package org.jboss.messaging.util;
+
+import org.jboss.logging.Logger;
+
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+
+/**
+ * Crude rate limiter: after every <code>rate * minSleep / 1000</code> calls to ping() (at least one)
+ * the caller sleeps for <code>minSleep</code> ms, capping callers at roughly <code>rate</code> pings/s.
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: $</tt>20 Oct 2007
+ *
+ * $Id: $
+ */
+public class Throttle
+{
+   protected Logger log = Logger.getLogger(Throttle.class);
+	
+   private long minSleep;   //length of each pause, in milliseconds
+   
+   private long every;      //number of pings between two pauses, always >= 1
+	
+	/** @param rate target number of pings per second; @param minSleep length of each pause in ms */
+	public Throttle(double rate, long minSleep)
+	{
+		this.minSleep = minSleep;
+		//Clamp to 1: a low rate would otherwise truncate to 0 and ping() would never sleep at all
+		every = Math.max(1, (long)(rate * minSleep) / 1000);
+		log.info("every: " + every);
+	}
+	
+	private int count;
+	
+	/** Counts one call; every <code>every</code>-th call sleeps <code>minSleep</code> ms while holding the lock. */
+	public synchronized void ping()
+	{
+		if (++count >= every)
+		{
+			count = 0;
+			try
+			{
+				Thread.sleep(minSleep);
+			}
+			catch (InterruptedException e)
+			{
+				Thread.currentThread().interrupt();   //don't swallow - keep the interrupt visible to the caller
+			}
+		}
+	}
+}

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java	2007-10-20 12:43:01 UTC (rev 3224)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java	2007-10-20 16:13:52 UTC (rev 3225)
@@ -350,8 +350,12 @@
             
             log.info("starting server 1");
             ServerManagement.start(1, "all", false);
+            log.info("server 1 started");
+            log.info("*** TRYING TO DEPLOY QUEUE");
             ServerManagement.deployQueue("testDistributedQueue", 1);
+            log.info("DEPLOYED QUEUE");
             ServerManagement.deployTopic("testDistributedTopic", 1);
+            log.info("Deployed destinations");
             
             Thread.sleep(5000);
             
@@ -362,8 +366,10 @@
             
             log.info("Starting server 2");
             ServerManagement.start(2, "all", false);
+            log.info("server 2 started");
             ServerManagement.deployQueue("testDistributedQueue", 2);
             ServerManagement.deployTopic("testDistributedTopic", 2);
+            log.info("Deployed destinations");            
             
             Thread.sleep(5000);
             
@@ -374,9 +380,12 @@
             
             log.info("Starting server 1");
             ServerManagement.start(1, "all", false);
+            log.info("server 1 started");
             ServerManagement.deployQueue("testDistributedQueue", 1);
             ServerManagement.deployTopic("testDistributedTopic", 1);
+            log.info("Deployed destinations");            
             
+            
             Thread.sleep(5000);
             
             log.info("Killing server 2");




More information about the jboss-cvs-commits mailing list