[jboss-cvs] JBoss Messaging SVN: r3225 - in trunk: src/main/org/jboss/jms/server and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Oct 20 12:13:52 EDT 2007
Author: timfox
Date: 2007-10-20 12:13:52 -0400 (Sat, 20 Oct 2007)
New Revision: 3225
Added:
trunk/src/main/org/jboss/messaging/util/Throttle.java
Modified:
trunk/docs/userguide/en/modules/c_configuration.xml
trunk/src/main/org/jboss/jms/server/ServerPeer.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java
Log:
Added throttle
Modified: trunk/docs/userguide/en/modules/c_configuration.xml
===================================================================
--- trunk/docs/userguide/en/modules/c_configuration.xml 2007-10-20 12:43:01 UTC (rev 3224)
+++ trunk/docs/userguide/en/modules/c_configuration.xml 2007-10-20 16:13:52 UTC (rev 3225)
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<chapter id="c_configuration">
- <title>JBoss Messaging Clustering Configuration</title>
+ <title>JBoss Messaging Clustering Notes</title>
<para>JBoss Messaging clustering should work out of the box in most cases
with no configuration changes. It is however crucial that every node is
assigned a unique server id, as specified in the installation guide.</para>
@@ -31,16 +31,23 @@
attribute in the server peer to true. By default this is false. The
side-effect of setting this to true is that messages cannot be distributed
as freely around the cluster</para>
- <para>When pulling reliable messages from one node to another,
- JBoss Messaging can use client acnowledgement or an XA transaction. The
- default is client acknowledgement. Using XA transactions
- is a fairly heavyweight operation but ensures absolute once and only once delivery.</para>
- <para>
- If the call to send a persistent message to a persistent destination returns successfully with no exception,
- then you can be sure that the message was persisted.
- However if the call doesn't return successfully e.g. if an exception is thrown, then you *can't be sure the message wasn't persisted*. Since the failure might have occurred after persisting the message but before writing the response to the caller.
-This is a common attribute of any RPC type call: You can't tell by the call not returning that the call didn't actually succeed. Whether it's a web services call, an HTTP get request, an ejb invocation the same applies.
-The trick is to code your application so your operations are *idempotent* - i.e. they can be repeated without getting the system into an inconsistent state.
-With a message system you can do this on the application level, by checking for duplicate messages, and discarding them if they arrive. Duplicate checking is a very powerful technique that can remove the need for XA transactions in many cases.
- </para>
+ <para>When pulling reliable messages from one node to another, JBoss
+ Messaging can use client acnowledgement or an XA transaction. The default
+ is client acknowledgement. Using XA transactions is a fairly heavyweight
+ operation but ensures absolute once and only once delivery.</para>
+ <para>If the call to send a persistent message to a persistent destination
+ returns successfully with no exception, then you can be sure that the
+ message was persisted. However if the call doesn't return successfully e.g.
+ if an exception is thrown, then you *can't be sure the message wasn't
+ persisted*. Since the failure might have occurred after persisting the
+ message but before writing the response to the caller. This is a common
+ attribute of any RPC type call: You can't tell by the call not returning
+ that the call didn't actually succeed. Whether it's a web services call, an
+ HTTP get request, an ejb invocation the same applies. The trick is to code
+ your application so your operations are *idempotent* - i.e. they can be
+ repeated without getting the system into an inconsistent state. With a
+ message system you can do this on the application level, by checking for
+ duplicate messages, and discarding them if they arrive. Duplicate checking
+ is a very powerful technique that can remove the need for XA transactions
+ in many cases.</para>
</chapter>
\ No newline at end of file
Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java 2007-10-20 12:43:01 UTC (rev 3224)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java 2007-10-20 16:13:52 UTC (rev 3225)
@@ -1431,7 +1431,7 @@
// TODO - if I find a way not using UnifiedClassLoader3 directly, then get rid of
// <path refid="jboss.jmx.classpath"/> from jms/build.xml dependentmodule.classpath
//
-
+
String destType = isQueue ? "Queue" : "Topic";
String className = "org.jboss.jms.server.destination." + destType + "Service";
@@ -1458,6 +1458,9 @@
String jndiName, boolean params, int fullSize,
int pageSize, int downCacheSize) throws Exception
{
+ log.trace("Deploying destination" + destinationMBeanConfig + " jndiName: " + jndiName +
+ "fullSize: " + fullSize + " pageSize: " + pageSize + " downCacheSize: " + downCacheSize);
+
MBeanServer mbeanServer = getServer();
Element element = Util.stringToElement(destinationMBeanConfig);
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-10-20 12:43:01 UTC (rev 3224)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-10-20 16:13:52 UTC (rev 3225)
@@ -77,6 +77,7 @@
import org.jboss.messaging.core.impl.tx.TxCallback;
import org.jboss.messaging.util.ConcurrentHashSet;
import org.jboss.messaging.util.StreamUtils;
+import org.jboss.messaging.util.Throttle;
import org.jgroups.Address;
import org.jgroups.View;
@@ -223,10 +224,12 @@
//We keep use a semaphore to limit the number of concurrent replication requests to avoid
//overwhelming JGroups
- private Semaphore replicateSemaphore;
+ //private Semaphore replicateSemaphore;
- private int maxConcurrentReplications;
+ //private int maxConcurrentReplications;
+ private Throttle throttle = new Throttle(5000, 5);
+
// Constructors ---------------------------------------------------------------------------------
/*
@@ -308,9 +311,9 @@
nbSupport = new NotificationBroadcasterSupport();
- this.maxConcurrentReplications = maxConcurrentReplications;
+ // this.maxConcurrentReplications = maxConcurrentReplications;
- replicateSemaphore = new Semaphore(maxConcurrentReplications, true);
+ //replicateSemaphore = new Semaphore(maxConcurrentReplications, true);
}
// MessagingComponent overrides -----------------------------------------------------------------
@@ -625,7 +628,7 @@
if (reply)
{
- replicateSemaphore.acquire();
+ //replicateSemaphore.acquire();
}
try
@@ -653,6 +656,8 @@
if (address != null)
{
+ throttle.ping();
+
groupMember.unicastData(request, address);
}
}
@@ -660,7 +665,7 @@
{
if (reply)
{
- replicateSemaphore.release();
+ // replicateSemaphore.release();
}
throw e;
@@ -678,6 +683,8 @@
if (address != null)
{
+ throttle.ping();
+
groupMember.unicastData(request, address);
}
}
@@ -1218,7 +1225,9 @@
if (binding == null)
{
- throw new IllegalStateException("Cannot find queue with name " + queueName +" has it been deployed?");
+ //This is ok - the queue might not have been deployed yet - when the queue is deployed it
+ //will request for deliveries to be sent to it in a batch
+ return;
}
Queue queue = binding.queue;
@@ -1297,7 +1306,7 @@
//TODO - this does not belong here
ServerSessionEndpoint session = serverPeer.getSession(sessionID);
- replicateSemaphore.release();
+ // replicateSemaphore.release();
if (session == null)
{
@@ -2811,13 +2820,12 @@
{
Map deliveries = new HashMap();
- //FIXME - this is ugly
+ //TODO - this is ugly
//Find a better way of getting the sessions
- //We shouldn't know abou the server peer
+ //We shouldn't know about the server peer
if (serverPeer != null)
- {
-
+ {
Collection sessions = serverPeer.getSessions();
Iterator iter2 = sessions.iterator();
@@ -2828,7 +2836,9 @@
session.deliverAnyWaitingDeliveries(null);
- session.collectDeliveries(deliveries, firstNode, null);
+ session.collectDeliveries(deliveries, firstNode, null);
+
+ // releaseAndReplaceSemaphore();
}
if (!firstNode)
@@ -2851,16 +2861,21 @@
}
//Now we replace the semaphore since some of the acks may not come back from the old failover node
- if (replicateSemaphore != null)
- {
- Semaphore oldSem = replicateSemaphore;
-
- replicateSemaphore = new Semaphore(maxConcurrentReplications);
-
- oldSem.release(maxConcurrentReplications);
- }
+ // releaseAndReplaceSemaphore();
}
+// private void releaseAndReplaceSemaphore()
+// {
+// if (replicateSemaphore != null)
+// {
+// Semaphore oldSem = replicateSemaphore;
+//
+// replicateSemaphore = new Semaphore(maxConcurrentReplications);
+//
+// oldSem.release(maxConcurrentReplications);
+// }
+// }
+//
/**
* This method fails over all the queues from node <failedNodeId> onto this node. It is triggered
@@ -3052,6 +3067,8 @@
{
gotSome = true;
}
+
+ // releaseAndReplaceSemaphore();
}
if (gotSome)
Added: trunk/src/main/org/jboss/messaging/util/Throttle.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/Throttle.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/util/Throttle.java 2007-10-20 16:13:52 UTC (rev 3225)
@@ -0,0 +1,70 @@
+package org.jboss.messaging.util;
+
+import org.jboss.logging.Logger;
+
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+
+/**
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: $</tt>20 Oct 2007
+ *
+ * $Id: $
+ *
+ */
+public class Throttle
+{
+ protected Logger log = Logger.getLogger(Throttle.class);
+
+ private long minSleep;
+
+ private long every;
+
+ public Throttle(double rate, long minSleep)
+ {
+ this.minSleep = minSleep;
+
+ every = (long)(rate * minSleep) / 1000;
+
+ log.info("every: " + every);
+ }
+
+ private int count;
+
+ public synchronized void ping()
+ {
+ count++;
+
+ if (count == every)
+ {
+ try
+ {
+ Thread.sleep(minSleep);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ count = 0;
+ }
+ }
+}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java 2007-10-20 12:43:01 UTC (rev 3224)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java 2007-10-20 16:13:52 UTC (rev 3225)
@@ -350,8 +350,12 @@
log.info("starting server 1");
ServerManagement.start(1, "all", false);
+ log.info("server 1 started");
+ log.info("*** TRYING TO DEPLOY QUEUE");
ServerManagement.deployQueue("testDistributedQueue", 1);
+ log.info("DEPLOYED QUEUE");
ServerManagement.deployTopic("testDistributedTopic", 1);
+ log.info("Deployed destinations");
Thread.sleep(5000);
@@ -362,8 +366,10 @@
log.info("Starting server 2");
ServerManagement.start(2, "all", false);
+ log.info("server 2 started");
ServerManagement.deployQueue("testDistributedQueue", 2);
ServerManagement.deployTopic("testDistributedTopic", 2);
+ log.info("Deployed destinations");
Thread.sleep(5000);
@@ -374,9 +380,12 @@
log.info("Starting server 1");
ServerManagement.start(1, "all", false);
+ log.info("server 1 started");
ServerManagement.deployQueue("testDistributedQueue", 1);
ServerManagement.deployTopic("testDistributedTopic", 1);
+ log.info("Deployed destinations");
+
Thread.sleep(5000);
log.info("Killing server 2");
More information about the jboss-cvs-commits
mailing list