[jboss-cvs] JBoss Messaging SVN: r3191 - in trunk: src/etc/xmdesc and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Oct 16 08:37:16 EDT 2007
Author: timfox
Date: 2007-10-16 08:37:16 -0400 (Tue, 16 Oct 2007)
New Revision: 3191
Modified:
trunk/src/etc/server/default/deploy/db2-persistence-service.xml
trunk/src/etc/server/default/deploy/mssql-persistence-service.xml
trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
trunk/src/etc/server/default/deploy/mysqlcluster-persistence-service.xml
trunk/src/etc/server/default/deploy/oracle-persistence-service.xml
trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml
trunk/src/etc/server/default/deploy/sybase-persistence-service.xml
trunk/src/etc/xmdesc/MessagingPostOffice-xmbean.xml
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
trunk/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java
trunk/tests/bin/runtest
trunk/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java
trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-1112
Modified: trunk/src/etc/server/default/deploy/db2-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/db2-persistence-service.xml 2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/src/etc/server/default/deploy/db2-persistence-service.xml 2007-10-16 12:37:16 UTC (rev 3191)
@@ -138,6 +138,11 @@
<attribute name="CastTimeout">5000</attribute>
+ <!-- Max number of concurrent replications -->
+
+ <attribute name="MaxConcurrentReplications">100</attribute>
+
+
<!-- Enable this when the JGroups multiplexer comes of age
<attribute name="ChannelFactoryName">jgroups.mux:name=Multiplexer</attribute>
<attribute name="ControlChannelName">udp-sync</attribute>
Modified: trunk/src/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mssql-persistence-service.xml 2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/src/etc/server/default/deploy/mssql-persistence-service.xml 2007-10-16 12:37:16 UTC (rev 3191)
@@ -141,6 +141,10 @@
<attribute name="CastTimeout">5000</attribute>
+ <!-- Max number of concurrent replications -->
+
+ <attribute name="MaxConcurrentReplications">100</attribute>
+
<!-- Enable this when the JGroups multiplexer comes of age
<attribute name="ChannelFactoryName">jgroups.mux:name=Multiplexer</attribute>
<attribute name="ControlChannelName">udp-sync</attribute>
Modified: trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mysql-persistence-service.xml 2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/src/etc/server/default/deploy/mysql-persistence-service.xml 2007-10-16 12:37:16 UTC (rev 3191)
@@ -141,6 +141,10 @@
<attribute name="CastTimeout">50000</attribute>
+ <!-- Max number of concurrent replications -->
+
+ <attribute name="MaxConcurrentReplications">100</attribute>
+
<!-- Enable this when the JGroups multiplexer comes of age
<attribute name="ChannelFactoryName">jgroups.mux:name=Multiplexer</attribute>
<attribute name="ControlChannelName">udp-sync</attribute>
Modified: trunk/src/etc/server/default/deploy/mysqlcluster-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mysqlcluster-persistence-service.xml 2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/src/etc/server/default/deploy/mysqlcluster-persistence-service.xml 2007-10-16 12:37:16 UTC (rev 3191)
@@ -140,6 +140,10 @@
<!-- Max time to wait for a synchronous call to node members using the MessageDispatcher -->
<attribute name="CastTimeout">50000</attribute>
+
+ <!-- Max number of concurrent replications -->
+
+ <attribute name="MaxConcurrentReplications">100</attribute>
<!-- Enable this when the JGroups multiplexer comes of age
<attribute name="ChannelFactoryName">jgroups.mux:name=Multiplexer</attribute>
Modified: trunk/src/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/oracle-persistence-service.xml 2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/src/etc/server/default/deploy/oracle-persistence-service.xml 2007-10-16 12:37:16 UTC (rev 3191)
@@ -141,6 +141,10 @@
<attribute name="CastTimeout">5000</attribute>
+ <!-- Max number of concurrent replications -->
+
+ <attribute name="MaxConcurrentReplications">100</attribute>
+
<!-- Enable this when the JGroups multiplexer comes of age
<attribute name="ChannelFactoryName">jgroups.mux:name=Multiplexer</attribute>
<attribute name="ControlChannelName">udp-sync</attribute>
Modified: trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml 2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml 2007-10-16 12:37:16 UTC (rev 3191)
@@ -141,6 +141,10 @@
<attribute name="CastTimeout">5000</attribute>
+ <!-- Max number of concurrent replications -->
+
+ <attribute name="MaxConcurrentReplications">100</attribute>
+
<!-- Enable this when the JGroups multiplexer comes of age
<attribute name="ChannelFactoryName">jgroups.mux:name=Multiplexer</attribute>
<attribute name="ControlChannelName">udp-sync</attribute>
Modified: trunk/src/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/sybase-persistence-service.xml 2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/src/etc/server/default/deploy/sybase-persistence-service.xml 2007-10-16 12:37:16 UTC (rev 3191)
@@ -146,6 +146,10 @@
<attribute name="CastTimeout">5000</attribute>
+ <!-- Max number of concurrent replications -->
+
+ <attribute name="MaxConcurrentReplications">100</attribute>
+
<!-- Enable this when the JGroups multiplexer comes of age
<attribute name="ChannelFactoryName">jgroups.mux:name=Multiplexer</attribute>
<attribute name="ControlChannelName">udp-sync</attribute>
Modified: trunk/src/etc/xmdesc/MessagingPostOffice-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/MessagingPostOffice-xmbean.xml 2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/src/etc/xmdesc/MessagingPostOffice-xmbean.xml 2007-10-16 12:37:16 UTC (rev 3191)
@@ -119,6 +119,12 @@
<type>java.lang.String</type>
</attribute>
+ <attribute access="read-write" getMethod="getMaxConcurrentReplications" setMethod="setMaxConcurrentReplications">
+ <description>The maximum number of concurrent replications</description>
+ <name>MaxConcurrentReplications</name>
+ <type>int</type>
+ </attribute>
+
<attribute access="read-only" getMethod="getNodeIDView">
<description>Get the set of nodes in the cluster</description>
<name>NodeIDView</name>
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-10-16 12:37:16 UTC (rev 3191)
@@ -987,8 +987,6 @@
return gotSome;
}
- private Object myLock = new Object();
-
public void replicateDeliveryResponseReceived(long deliveryID) throws Exception
{
//We look up the delivery in the list and actually perform the delivery
@@ -1002,115 +1000,8 @@
throw new java.lang.IllegalStateException("Cannot find delivery with id " + deliveryID);
}
- boolean delivered = false;
+ boolean delivered = false;
- //I have commented this out since we should be able guarantee responses come back in order if we use
- //a QueuedExecutor on the other node to send the response
-
-// //Note there will only be contention on this if two or more responses come back at the same time - which is unlikely
-// //TODO - This can occur since replicates are sent to the other node, and the responses are sent back using a pool which
-// //means earlier responses can be received after later ones -hence we need to cope with this
-// //However - if we used a queued executor on the other node to send back responses we could remove all this locking!!
-// synchronized (myLock)
-// {
-// long toWait = DELIVERY_WAIT_TIMEOUT;
-//
-// while (toWait > 0)
-// {
-// DeliveryRecord dr = (DeliveryRecord)toDeliver.peek();
-//
-// if (dr == null)
-// {
-// if (trace) { log.trace("No more deliveries in list"); }
-//
-// break;
-// }
-//
-// if (trace) { log.trace("Peeked delivery record: " + dr.deliveryID); }
-//
-// boolean wait = false;
-//
-// //Needs to be synchronized to prevent delivery occurring twice e.g. if this occurs at same time as collectDeliveries
-// synchronized (dr)
-// {
-// boolean performDelivery = false;
-//
-// if (dr.waitingForResponse)
-// {
-// if (dr == rec)
-// {
-// if (trace) { log.trace("Found our delivery"); }
-//
-// performDelivery = true;
-// }
-// else
-// {
-// if (!delivered)
-// {
-// //We have to wait for another response to arrive first
-//
-// if (trace) { log.trace("Not ours - need to wait"); }
-//
-// wait = true;
-// }
-// else
-// {
-// //We have delivered ours and possibly any non replicated deliveries too
-//
-// myLock.notify();
-//
-// break;
-// }
-// }
-// }
-// else
-// {
-// //Non replicated delivery
-//
-// if (trace) { log.trace("Non replicated delivery"); }
-//
-// performDelivery = true;
-// }
-//
-// if (performDelivery)
-// {
-// toDeliver.take();
-//
-// performDelivery(dr.del.getReference(), dr.deliveryID, dr.getConsumer());
-//
-// delivered = true;
-//
-// dr.waitingForResponse = false;
-//
-// delivered = true;
-// }
-// }
-//
-// if (wait)
-// {
-// long start = System.currentTimeMillis();
-//
-// try
-// {
-// if (trace) { log.trace("Waiting"); }
-//
-// //We need to wait since responses have come back out of order
-// myLock.wait(toWait);
-//
-// if (trace) { log.trace("Woke up"); }
-// }
-// catch (InterruptedException e)
-// {
-// }
-// toWait -= (System.currentTimeMillis() - start);
-// }
-// }
-// if (toWait <= 0)
-// {
-// throw new IllegalStateException("Timed out waiting for previous response to arrive");
-// }
-// }
-
while (true)
{
DeliveryRecord dr = (DeliveryRecord)toDeliver.peek();
@@ -1143,7 +1034,7 @@
{
//We have to wait for another response to arrive first
- throw new IllegalStateException("Reponses have come back our of order");
+ throw new IllegalStateException("Responses have come back our of order");
}
else
{
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-10-16 12:37:16 UTC (rev 3191)
@@ -22,7 +22,6 @@
package org.jboss.messaging.core.impl.postoffice;
import java.io.Serializable;
-import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Types;
@@ -38,6 +37,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
+import java.util.concurrent.Semaphore;
import javax.management.ListenerNotFoundException;
import javax.management.MBeanNotificationInfo;
@@ -53,6 +53,7 @@
import org.jboss.jms.server.endpoint.ServerSessionEndpoint;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Binding;
+import org.jboss.messaging.core.contract.ChannelFactory;
import org.jboss.messaging.core.contract.ClusterNotification;
import org.jboss.messaging.core.contract.ClusterNotifier;
import org.jboss.messaging.core.contract.Condition;
@@ -60,7 +61,6 @@
import org.jboss.messaging.core.contract.Delivery;
import org.jboss.messaging.core.contract.Filter;
import org.jboss.messaging.core.contract.FilterFactory;
-import org.jboss.messaging.core.contract.ChannelFactory;
import org.jboss.messaging.core.contract.Message;
import org.jboss.messaging.core.contract.MessageReference;
import org.jboss.messaging.core.contract.MessageStore;
@@ -221,6 +221,10 @@
private volatile boolean firstNode;
+ //We keep use a semaphore to limit the number of concurrent replication requests to avoid
+ //overwhelming JGroups
+ private Semaphore replicateSemaphore;
+
// Constructors ---------------------------------------------------------------------------------
/*
@@ -287,7 +291,8 @@
String groupName,
ChannelFactory jChannelFactory,
long stateTimeout, long castTimeout,
- boolean supportsFailover)
+ boolean supportsFailover,
+ int maxConcurrentReplications)
throws Exception
{
this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms, pm, tr,
@@ -300,6 +305,8 @@
this.supportsFailover = supportsFailover;
nbSupport = new NotificationBroadcasterSupport();
+
+ replicateSemaphore = new Semaphore(maxConcurrentReplications, true);
}
// MessagingComponent overrides -----------------------------------------------------------------
@@ -603,36 +610,51 @@
}
//TODO - these don't belong here
-
+
public void sendReplicateDeliveryMessage(String queueName, String sessionID, long messageID, long deliveryID,
boolean reply, boolean sync)
throws Exception
{
- //There is no need to lock this while failover node change is occuring since the receiving node is tolerant to duplicate
- //adds or acks
-
- Address replyAddress = null;
+ //We use a semaphore to limit the number of outstanding replicates we can send without getting a response
+ //This is to prevent overwhelming JGroups
+ //See http://jira.jboss.com/jira/browse/JBMESSAGING-1112
- if (reply)
+ replicateSemaphore.acquire();
+
+ try
+ {
+ //There is no need to lock this while failover node change is occuring since the receiving node is tolerant to duplicate
+ //adds or acks
+
+ Address replyAddress = null;
+
+ if (reply)
+ {
+ //TODO optimise this
+
+ PostOfficeAddressInfo info = (PostOfficeAddressInfo)nodeIDAddressMap.get(new Integer(thisNodeID));
+
+ replyAddress = info.getDataChannelAddress();
+ }
+
+ ClusterRequest request = new ReplicateDeliveryMessage(thisNodeID, queueName, sessionID, messageID, deliveryID, replyAddress);
+
+ if (trace) { log.trace(this + " sending replicate delivery message " + queueName + " " + sessionID + " " + messageID); }
+
+ //TODO could be optimised too
+ Address address = getFailoverNodeDataChannelAddress();
+
+ if (address != null)
+ {
+ groupMember.unicastData(request, address);
+ }
+ }
+ catch (Exception e)
{
- //TODO optimise this
+ replicateSemaphore.release();
- PostOfficeAddressInfo info = (PostOfficeAddressInfo)nodeIDAddressMap.get(new Integer(thisNodeID));
-
- replyAddress = info.getDataChannelAddress();
+ throw e;
}
-
- ClusterRequest request = new ReplicateDeliveryMessage(thisNodeID, queueName, sessionID, messageID, deliveryID, replyAddress);
-
- if (trace) { log.trace(this + " sending replicate delivery message " + queueName + " " + sessionID + " " + messageID); }
-
- //TODO could be optimised too
- Address address = getFailoverNodeDataChannelAddress();
-
- if (address != null)
- {
- groupMember.unicastData(request, address);
- }
}
public void sendReplicateAckMessage(String queueName, long messageID) throws Exception
@@ -1257,19 +1279,22 @@
queue.removeFromRecoveryArea(nodeID, messageID);
}
+
public void handleReplicateDeliveryAck(String sessionID, long deliveryID) throws Exception
{
if (trace) { log.trace(this + " handleReplicateDeliveryAck " + sessionID + " " + deliveryID); }
- //TOD - this does not belong here
+ //TODO - this does not belong here
ServerSessionEndpoint session = serverPeer.getSession(sessionID);
+ replicateSemaphore.release();
+
if (session == null)
{
log.warn("Cannot find session " + sessionID);
return;
- }
+ }
session.replicateDeliveryResponseReceived(deliveryID);
}
Modified: trunk/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java 2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java 2007-10-16 12:37:16 UTC (rev 3191)
@@ -95,6 +95,8 @@
private String groupName;
private boolean clustered;
+
+ private int maxConcurrentReplications = 100;
private MessagingPostOffice postOffice;
@@ -305,6 +307,16 @@
this.clustered = clustered;
}
+ public int getMaxConcurrentReplications()
+ {
+ return maxConcurrentReplications;
+ }
+
+ public void setMaxConcurrentReplications(int number)
+ {
+ this.maxConcurrentReplications = number;
+ }
+
public String listBindings()
{
return postOffice.printBindingInformation();
@@ -399,7 +411,8 @@
groupName,
jChannelFactory,
stateTimeout, castTimeout,
- serverPeer.isSupportsFailover());
+ serverPeer.isSupportsFailover(),
+ maxConcurrentReplications);
}
else
{
Modified: trunk/tests/bin/runtest
===================================================================
--- trunk/tests/bin/runtest 2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/tests/bin/runtest 2007-10-16 12:37:16 UTC (rev 3191)
@@ -174,7 +174,7 @@
fi
fi
-"$JAVA_HOME/bin/java" $JAVA_OPTS -Djgroups.bind_addr=localhost -cp "$CLASSPATH" \
+"$JAVA_HOME/bin/java" -Xmx1024M $JAVA_OPTS -Djgroups.bind_addr=localhost -cp "$CLASSPATH" \
org.jboss.test.messaging.tools.junit.SelectiveTestRunner $TARGET_CLASS $TARGET_TEST
#
Modified: trunk/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java 2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java 2007-10-16 12:37:16 UTC (rev 3191)
@@ -96,7 +96,7 @@
sc.getPostOfficeSQLProperties(), true, nodeID,
"Clustered", ms, pm, tr, ff, cf, idm, cn,
groupName, jChannelFactory,
- stateTimeout, castTimeout, true);
+ stateTimeout, castTimeout, true, 100);
postOffice.start();
Modified: trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2007-10-15 13:03:24 UTC (rev 3190)
+++ trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2007-10-16 12:37:16 UTC (rev 3191)
@@ -369,7 +369,7 @@
{
int index = Integer.parseInt(remoteDebugIndex);
- sb.append("-Xdebug -Xnoagent -Djava.compiler=NONE ").
+ sb.append("-Xmx1024M -Xdebug -Xnoagent -Djava.compiler=NONE ").
append("-Xrunjdwp:transport=dt_shmem,server=n,suspend=n,address=rmiserver_").
append(index).append(' ');
}
@@ -398,13 +398,9 @@
sb.append("-Dtest.bind.address=").append(bindAddress).append(' ');
- String jgroupsBindAddr = System.getProperty(org.jgroups.Global.BIND_ADDR);
+ //Use test.bind.address for the jgroups.bind_addr
- //If not found default to localhost
- if (jgroupsBindAddr == null)
- {
- jgroupsBindAddr = "localhost";
- }
+ String jgroupsBindAddr = bindAddress;
sb.append("-D").append(org.jgroups.Global.BIND_ADDR).append("=")
.append(jgroupsBindAddr).append(' ');
More information about the jboss-cvs-commits
mailing list