[jboss-cvs] JBoss Messaging SVN: r5768 - in trunk: src/main/org/jboss/messaging/core/management/impl and 8 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jan 30 14:46:02 EST 2009
Author: timfox
Date: 2009-01-30 14:46:02 -0500 (Fri, 30 Jan 2009)
New Revision: 5768
Added:
trunk/src/main/org/jboss/messaging/core/postoffice/QueueBinding.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java
trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/RemoteQueueBinding.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
trunk/src/schemas/jbm-queues.xsd
trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
Log:
more clustering
Modified: trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java 2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java 2009-01-30 19:46:02 UTC (rev 5768)
@@ -166,6 +166,6 @@
TabularData getConnectors() throws Exception;
- void sendQueueInfoToQueue(String queueName) throws Exception;
+ void sendQueueInfoToQueue(String queueName, String address) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2009-01-30 19:46:02 UTC (rev 5768)
@@ -544,9 +544,9 @@
return TransportConfigurationInfo.toTabularData(connectorConfigurations);
}
- public void sendQueueInfoToQueue(final String queueName) throws Exception
+ public void sendQueueInfoToQueue(final String queueName, final String address) throws Exception
{
- postOffice.sendQueueInfoToQueue(new SimpleString(queueName));
+ postOffice.sendQueueInfoToQueue(new SimpleString(queueName), new SimpleString(address));
}
// NotificationEmitter implementation ----------------------------
Modified: trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java 2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java 2009-01-30 19:46:02 UTC (rev 5768)
@@ -239,9 +239,9 @@
return localControl.getConnectors();
}
- public void sendQueueInfoToQueue(final String queueName) throws Exception
+ public void sendQueueInfoToQueue(final String queueName, final String address) throws Exception
{
- localControl.sendQueueInfoToQueue(queueName);
+ localControl.sendQueueInfoToQueue(queueName, address);
}
public boolean addAddress(String address) throws Exception
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2009-01-30 19:46:02 UTC (rev 5768)
@@ -81,5 +81,5 @@
DuplicateIDCache getDuplicateIDCache(SimpleString address);
- void sendQueueInfoToQueue(SimpleString queueName) throws Exception;
+ void sendQueueInfoToQueue(SimpleString queueName, SimpleString address) throws Exception;
}
Added: trunk/src/main/org/jboss/messaging/core/postoffice/QueueBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/QueueBinding.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/QueueBinding.java 2009-01-30 19:46:02 UTC (rev 5768)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.postoffice;
+
+/**
+ * A QueueBinding
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 30 Jan 2009 11:04:37
+ *
+ *
+ */
+public interface QueueBinding extends Binding
+{
+   /**
+    * Reports the number of consumers this binding currently accounts for.
+    *
+    * @return the consumer count as tracked by the implementing binding
+    */
+   int consumerCount();
+}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-01-30 19:46:02 UTC (rev 5768)
@@ -63,7 +63,7 @@
private final List<Binding> exclusiveBindings = new CopyOnWriteArrayList<Binding>();
public Collection<Binding> getBindings()
- {
+ {
return bindingsMap.values();
}
@@ -93,8 +93,8 @@
bindings.add(binding);
}
-
- bindingsMap.put(binding.getID(), binding);
+
+ bindingsMap.put(binding.getID(), binding);
}
public void removeBinding(final Binding binding)
@@ -120,7 +120,7 @@
}
}
- bindingsMap.remove(binding.getID());
+ bindingsMap.remove(binding.getID());
}
private void routeFromCluster(final ServerMessage message, final Transaction tx) throws Exception
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java 2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java 2009-01-30 19:46:02 UTC (rev 5768)
@@ -26,7 +26,8 @@
import java.util.List;
import org.jboss.messaging.core.filter.Filter;
-import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.postoffice.QueueBinding;
import org.jboss.messaging.core.server.Bindable;
import org.jboss.messaging.core.server.Consumer;
import org.jboss.messaging.core.server.Queue;
@@ -42,8 +43,10 @@
*
*
*/
-public class LocalQueueBinding implements Binding
+public class LocalQueueBinding implements QueueBinding
{
+ private static final Logger log = Logger.getLogger(LocalQueueBinding.class);
+
private final SimpleString address;
private final Queue queue;
@@ -142,12 +145,15 @@
public void willRoute(final ServerMessage message)
{
}
-
-
-
+
public boolean isQueueBinding()
{
return true;
}
+
+   /**
+    * @return the consumer count reported by the underlying queue
+    */
+   public int consumerCount()
+   {
+      final int count = queue.getConsumerCount();
+
+      return count;
+   }
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-01-30 19:46:02 UTC (rev 5768)
@@ -228,7 +228,7 @@
public void onNotification(final Notification notification)
- {
+ {
synchronized (notificationLock)
{
NotificationType type = notification.getType();
@@ -238,7 +238,7 @@
TypedProperties props = notification.getProperties();
SimpleString queueName = (SimpleString)props.getProperty(ManagementHelper.HDR_QUEUE_NAME);
-
+
SimpleString address = (SimpleString)props.getProperty(ManagementHelper.HDR_ADDRESS);
Integer transientID = (Integer)props.getProperty(ManagementHelper.HDR_BINDING_ID);
@@ -359,7 +359,7 @@
// and post office is activated but queue remains unactivated after failover so delivery never occurs
// even though failover is complete
public synchronized void addBinding(final Binding binding) throws Exception
- {
+ {
binding.setID(generateTransientID());
addBindingInMemory(binding);
@@ -572,7 +572,7 @@
- public void sendQueueInfoToQueue(final SimpleString queueName) throws Exception
+ public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception
{
//We send direct to the queue so we can send it to the same queue that is bound to the notifications address - this is crucial for ensuring
//that queue infos and notifications are received in a contiguous consistent stream
@@ -600,43 +600,46 @@
for (QueueInfo info: queueInfos.values())
{
- message = createQueueInfoMessage(NotificationType.BINDING_ADDED, queueName);
-
- message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
- message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
- message.putIntProperty(ManagementHelper.HDR_BINDING_ID, info.getID());
-
- queue.preroute(message, null);
- queue.route(message, null);
-
- int consumersWithFilters = info.getFilterStrings() != null ? info.getFilterStrings().size() : 0;
-
- for (int i = 0; i < info.getNumberOfConsumers() - consumersWithFilters; i++)
+ if (info.getAddress().startsWith(address))
{
- message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
+ message = createQueueInfoMessage(NotificationType.BINDING_ADDED, queueName);
- message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
+ message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
+ message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
+ message.putIntProperty(ManagementHelper.HDR_BINDING_ID, info.getID());
queue.preroute(message, null);
queue.route(message, null);
- }
-
- if (info.getFilterStrings() != null)
- {
- for (SimpleString filterString: info.getFilterStrings())
+
+ int consumersWithFilters = info.getFilterStrings() != null ? info.getFilterStrings().size() : 0;
+
+ for (int i = 0; i < info.getNumberOfConsumers() - consumersWithFilters; i++)
{
message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
- message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
- message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
+ message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
queue.preroute(message, null);
queue.route(message, null);
}
- }
+
+ if (info.getFilterStrings() != null)
+ {
+ for (SimpleString filterString: info.getFilterStrings())
+ {
+ message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
+
+ message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
+ message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
+
+
+ queue.preroute(message, null);
+ queue.route(message, null);
+ }
+ }
+ }
}
}
-
}
// Private -----------------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/RemoteQueueBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/RemoteQueueBinding.java 2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/RemoteQueueBinding.java 2009-01-30 19:46:02 UTC (rev 5768)
@@ -20,10 +20,9 @@
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-
package org.jboss.messaging.core.server.cluster;
-import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.postoffice.QueueBinding;
import org.jboss.messaging.util.SimpleString;
/**
@@ -35,10 +34,9 @@
*
*
*/
-public interface RemoteQueueBinding extends Binding
+public interface RemoteQueueBinding extends QueueBinding
{
void addConsumer(SimpleString filterString) throws Exception;
-
- void removeConsumer(SimpleString filterString) throws Exception;
+ void removeConsumer(SimpleString filterString) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-01-30 19:46:02 UTC (rev 5768)
@@ -293,10 +293,11 @@
"'" +
NotificationType.CONSUMER_CLOSED +
"') AND " +
+ "("+ ManagementHelper.HDR_ADDRESS + " IS NULL OR " +
ManagementHelper.HDR_ADDRESS +
" LIKE '" +
queueDataAddress +
- "%'");
+ "%')");
session.createQueue(DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS, notifQueueName, filter, false, true);
@@ -311,7 +312,8 @@
ManagementHelper.putOperationInvocation(message,
ManagementServiceImpl.getMessagingServerObjectName(),
"sendQueueInfoToQueue",
- notifQueueName.toString());
+ notifQueueName.toString(),
+ queueDataAddress);
ClientProducer prod = session.createProducer(ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS);
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-01-30 19:46:02 UTC (rev 5768)
@@ -193,8 +193,6 @@
public synchronized void stop() throws Exception
{
- log.info("Stoping cluster connection");
-
if (!started)
{
return;
@@ -205,11 +203,8 @@
discoveryGroup.unregisterListener(this);
}
- log.info("Three are " + records.size() + " records");
-
for (MessageFlowRecord record : records.values())
{
- log.info("stopping record");
record.close();
}
@@ -317,7 +312,6 @@
record.setBridge(bridge);
- log.info("added record");
records.put(connectorPair, record);
bridge.start();
@@ -344,8 +338,6 @@
{
StringBuilder str = new StringBuilder(replaceWildcardChars(config.getFactoryClassName()));
- log.info("config is " + config);
-
if (config.getParams() != null)
{
if (!config.getParams().isEmpty())
@@ -393,9 +385,7 @@
public void close() throws Exception
{
- log.info("stopping bridge");
bridge.stop();
- log.info("stopped bridge");
for (RemoteQueueBinding binding : bindings.values())
{
@@ -424,8 +414,6 @@
firstReset = true;
- log.info("did reset");
-
return;
}
@@ -436,18 +424,16 @@
NotificationType type = NotificationType.valueOf(message.getProperty(ManagementHelper.HDR_NOTIFICATION_TYPE)
.toString());
-
- log.info("Got notification message " + type);
-
+
+
if (type == NotificationType.BINDING_ADDED)
- {
- log.info("queue created");
+ {
SimpleString uniqueName = UUIDGenerator.getInstance().generateSimpleStringUUID();
SimpleString queueAddress = (SimpleString)message.getProperty(ManagementHelper.HDR_ADDRESS);
SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
-
+
SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
Integer queueID = (Integer)message.getProperty(ManagementHelper.HDR_BINDING_ID);
@@ -468,7 +454,6 @@
}
else if (type == NotificationType.BINDING_REMOVED)
{
- log.info("queue destroyed");
SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
RemoteQueueBinding binding = bindings.remove(queueName);
@@ -477,7 +462,6 @@
}
else if (type == NotificationType.CONSUMER_CREATED)
{
- log.info("consumer created");
SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
@@ -488,7 +472,6 @@
}
else if (type == NotificationType.CONSUMER_CLOSED)
{
- log.info("consumer closed");
SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-01-30 19:46:02 UTC (rev 5768)
@@ -198,6 +198,8 @@
public void willRoute(final ServerMessage message)
{
+ log.info("routing to remote queue binding");
+
//We add a header with the name of the queue, holding a list of the transient ids of the queues to route to
//TODO - this can be optimised
@@ -282,5 +284,10 @@
consumerCount--;
}
+
+   /**
+    * @return the current value of this binding's consumer counter, read while holding the instance monitor
+    */
+   public synchronized int consumerCount()
+   {
+      final int current = consumerCount;
+
+      return current;
+   }
}
Modified: trunk/src/schemas/jbm-queues.xsd
===================================================================
--- trunk/src/schemas/jbm-queues.xsd 2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/src/schemas/jbm-queues.xsd 2009-01-30 19:46:02 UTC (rev 5768)
@@ -85,5 +85,8 @@
<xsd:attribute name="durable" type="xsd:boolean"
use="optional">
</xsd:attribute>
+ <xsd:attribute name="insert-duplicate-detection-header" type="xsd:boolean"
+ use="optional">
+ </xsd:attribute>
</xsd:complexType>
</xsd:schema>
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java 2009-01-30 19:46:02 UTC (rev 5768)
@@ -0,0 +1,503 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import static org.jboss.messaging.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
+import org.jboss.messaging.core.config.cluster.ClusterConnectionConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.postoffice.Bindings;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.postoffice.QueueBinding;
+import org.jboss.messaging.core.postoffice.impl.LocalQueueBinding;
+import org.jboss.messaging.core.server.Messaging;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.cluster.RemoteQueueBinding;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.util.Pair;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A ClusterTestBase
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 30 Jan 2009 11:29:43
+ *
+ *
+ */
+public class ClusterTestBase extends ServiceTestBase
+{
+ private static final Logger log = Logger.getLogger(ClusterTestBase.class);
+
+ private static final long WAIT_TIMEOUT = 10000;
+
+   /**
+    * Runs the inherited set-up and then invokes {@code clearData()}.
+    */
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      clearData();
+   }
+
+   /**
+    * Delegates straight to the inherited tear-down; kept as an explicit hook for subclasses.
+    */
+   @Override
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+ // Private -------------------------------------------------------------------------------------------------------
+
+ private static final int MAX_CONSUMERS = 100;
+
+ private ClientConsumer[] consumers = new ClientConsumer[MAX_CONSUMERS];
+
+ private static final SimpleString COUNT_PROP = new SimpleString("count_prop");
+
+ private static final SimpleString FILTER_PROP = new SimpleString("animal");
+
+ private static final int MAX_SERVERS = 10;
+
+ private MessagingService[] services = new MessagingService[MAX_SERVERS];
+
+ private ClientSessionFactory[] sfs = new ClientSessionFactory[MAX_SERVERS];
+
+   /**
+    * Polls the post office of the given node until the bindings for {@code address} match the
+    * expected totals, or fails once {@code WAIT_TIMEOUT} has elapsed.
+    *
+    * @param node index of the server whose post office is inspected
+    * @param address address whose bindings are counted
+    * @param count expected number of matching queue bindings
+    * @param consumerCount expected sum of {@code consumerCount()} over the matching bindings
+    * @param local {@code true} to count {@link LocalQueueBinding}s, {@code false} to count
+    *           {@link RemoteQueueBinding}s
+    * @throws IllegalArgumentException if no service is registered at {@code node}
+    * @throws IllegalStateException if the expected totals are not observed before the timeout
+    */
+   protected void waitForBindings(int node,
+                                  final String address,
+                                  final int count,
+                                  final int consumerCount,
+                                  final boolean local) throws Exception
+   {
+      final MessagingService target = this.services[node];
+
+      if (target == null)
+      {
+         throw new IllegalArgumentException("No service at " + node);
+      }
+
+      final PostOffice postOffice = target.getServer().getPostOffice();
+
+      final long start = System.currentTimeMillis();
+
+      // The state is always checked at least once, even if the timeout were zero.
+      for (;;)
+      {
+         int matchedBindings = 0;
+
+         int matchedConsumers = 0;
+
+         Bindings current = postOffice.getBindingsForAddress(new SimpleString(address));
+
+         for (Binding candidate : current.getBindings())
+         {
+            boolean wanted = local ? candidate instanceof LocalQueueBinding
+                                   : candidate instanceof RemoteQueueBinding;
+
+            if (!wanted)
+            {
+               continue;
+            }
+
+            matchedBindings++;
+
+            matchedConsumers += ((QueueBinding)candidate).consumerCount();
+         }
+
+         if (matchedBindings == count && matchedConsumers == consumerCount)
+         {
+            log.info("Waited " + (System.currentTimeMillis() - start));
+            return;
+         }
+
+         Thread.sleep(10);
+
+         if (System.currentTimeMillis() - start >= WAIT_TIMEOUT)
+         {
+            break;
+         }
+      }
+
+      throw new IllegalStateException("Timed out waiting for bindings");
+   }
+
+   /**
+    * Creates a queue on the given node using a short-lived session.
+    *
+    * @param node index of the session factory to use
+    * @param address address the queue is bound to
+    * @param queueName name of the queue to create
+    * @param filterVal if non-null, the queue is created with the filter
+    *           {@code FILTER_PROP='<filterVal>'}; if null, no filter is used
+    * @param durable whether the queue is durable
+    * @throws IllegalArgumentException if no session factory is registered at {@code node}
+    */
+   protected void createQueue(int node, String address, String queueName, String filterVal, boolean durable) throws Exception
+   {
+      ClientSessionFactory sf = this.sfs[node];
+
+      if (sf == null)
+      {
+         throw new IllegalArgumentException("No sf at " + node);
+      }
+
+      ClientSession session = sf.createSession(false, true, true);
+
+      // Close the session even when queue creation fails, so a failing test does not leak it.
+      try
+      {
+         String filterString = null;
+
+         if (filterVal != null)
+         {
+            filterString = FILTER_PROP.toString() + "='" + filterVal + "'";
+         }
+
+         session.createQueue(address, queueName, filterString, durable, false);
+      }
+      finally
+      {
+         session.close();
+      }
+   }
+
+   /**
+    * Deletes a queue on the given node using a short-lived session.
+    *
+    * @param node index of the session factory to use
+    * @param queueName name of the queue to delete
+    * @throws IllegalArgumentException if no session factory is registered at {@code node}
+    */
+   protected void deleteQueue(int node, String queueName) throws Exception
+   {
+      ClientSessionFactory sf = this.sfs[node];
+
+      if (sf == null)
+      {
+         throw new IllegalArgumentException("No sf at " + node);
+      }
+
+      ClientSession session = sf.createSession(false, true, true);
+
+      // Close the session even when the delete fails, so a failing test does not leak it.
+      try
+      {
+         session.deleteQueue(queueName);
+      }
+      finally
+      {
+         session.close();
+      }
+   }
+
+   /**
+    * Creates a consumer on the given node and stores it in slot {@code consumerID}.
+    * The session backing the consumer is started and deliberately left open, since the
+    * consumer is used by later verification calls.
+    *
+    * @param consumerID slot in the consumers array to fill; must currently be empty
+    * @param node index of the session factory to use
+    * @param queueName queue to consume from
+    * @param filterVal if non-null, the consumer is created with the filter
+    *           {@code FILTER_PROP='<filterVal>'}; if null, no filter is used
+    * @throws IllegalArgumentException if the slot is already occupied or no session factory
+    *            is registered at {@code node}
+    */
+   protected void addConsumer(int consumerID, int node, String queueName, String filterVal) throws Exception
+   {
+      if (consumers[consumerID] != null)
+      {
+         // Report the occupied slot, not the node: the guard is on consumerID.
+         throw new IllegalArgumentException("Already a consumer at " + consumerID);
+      }
+
+      ClientSessionFactory sf = this.sfs[node];
+
+      if (sf == null)
+      {
+         throw new IllegalArgumentException("No sf at " + node);
+      }
+
+      ClientSession session = sf.createSession(false, true, true);
+
+      String filterString = null;
+
+      if (filterVal != null)
+      {
+         filterString = FILTER_PROP.toString() + "='" + filterVal + "'";
+      }
+
+      ClientConsumer consumer = session.createConsumer(queueName, filterString);
+
+      session.start();
+
+      consumers[consumerID] = consumer;
+   }
+
+   /**
+    * Closes the consumer stored in slot {@code consumerID} and frees the slot so it can be
+    * reused by a later {@code addConsumer} call.
+    *
+    * @param consumerID slot of the consumer to close
+    * @throws IllegalArgumentException if the slot is empty
+    */
+   protected void removeConsumer(int consumerID) throws Exception
+   {
+      ClientConsumer consumer = consumers[consumerID];
+
+      if (consumer == null)
+      {
+         throw new IllegalArgumentException("No consumer at " + consumerID);
+      }
+
+      consumer.close();
+
+      // Free the slot; otherwise it can never be reused and a second removal would close
+      // the same consumer again instead of reporting the empty slot.
+      consumers[consumerID] = null;
+   }
+
+   /**
+    * Sends {@code numMessages} messages to {@code address} from the given node using a
+    * short-lived session. Each message carries {@code COUNT_PROP} set to its zero-based index.
+    *
+    * @param node index of the session factory to use
+    * @param address address to send to
+    * @param numMessages number of messages to send
+    * @param durable whether the messages are durable
+    * @param filterVal if non-null, set on every message as the {@code FILTER_PROP} property
+    * @throws IllegalArgumentException if no session factory is registered at {@code node}
+    */
+   protected void send(int node, String address, int numMessages, boolean durable, String filterVal) throws Exception
+   {
+      ClientSessionFactory sf = this.sfs[node];
+
+      if (sf == null)
+      {
+         throw new IllegalArgumentException("No sf at " + node);
+      }
+
+      ClientSession session = sf.createSession(false, true, true);
+
+      // Close the session even when a send fails, so a failing test does not leak it.
+      try
+      {
+         ClientProducer producer = session.createProducer(address);
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = session.createClientMessage(durable);
+
+            if (filterVal != null)
+            {
+               message.putStringProperty(FILTER_PROP, new SimpleString(filterVal));
+            }
+
+            message.putIntProperty(COUNT_PROP, i);
+
+            producer.send(message);
+         }
+      }
+      finally
+      {
+         session.close();
+      }
+   }
+
+   /**
+    * Asserts that every listed consumer receives {@code numMessages} messages whose
+    * {@code COUNT_PROP} values run 0, 1, 2, ... in order. Each message is fetched with
+    * {@code receive(500)}.
+    *
+    * @param numMessages number of messages each consumer must receive
+    * @param consumerIDs slots of the consumers to check
+    * @throws IllegalArgumentException if any listed slot is empty
+    */
+   protected void verifyReceiveAll(int numMessages, int... consumerIDs) throws Exception
+   {
+      for (int id : consumerIDs)
+      {
+         ClientConsumer current = consumers[id];
+
+         if (current == null)
+         {
+            throw new IllegalArgumentException("No consumer at " + id);
+         }
+
+         int expected = 0;
+
+         while (expected < numMessages)
+         {
+            ClientMessage received = current.receive(500);
+
+            assertNotNull(received);
+
+            assertEquals(expected, received.getProperty(COUNT_PROP));
+
+            expected++;
+         }
+      }
+   }
+
+   /**
+    * Asserts that {@code numMessages} messages were distributed round-robin over the listed
+    * consumers: message {@code i} (by {@code COUNT_PROP}) must arrive at consumer
+    * {@code consumerIDs[i % consumerIDs.length]}. Each message is fetched with
+    * {@code receive(500)}.
+    *
+    * @param numMessages total number of messages expected across all consumers
+    * @param consumerIDs slots of the consumers, in the expected delivery order
+    * @throws IllegalArgumentException if a consumer slot that is due a message is empty
+    */
+   protected void verifyReceiveRoundRobin(int numMessages, int... consumerIDs) throws Exception
+   {
+      int count = 0;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         // count is the position in the rotation; i is the message index. The slot must be
+         // looked up (and reported) via count, since i can exceed consumerIDs.length.
+         int consumerID = consumerIDs[count];
+
+         ClientConsumer consumer = consumers[consumerID];
+
+         if (consumer == null)
+         {
+            throw new IllegalArgumentException("No consumer at " + consumerID);
+         }
+
+         ClientMessage message = consumer.receive(500);
+
+         assertNotNull(message);
+
+         assertEquals(i, message.getProperty(COUNT_PROP));
+
+         count++;
+
+         if (count == consumerIDs.length)
+         {
+            count = 0;
+         }
+      }
+   }
+
+   /**
+    * Asserts that none of the listed consumers has a message available, checking each one
+    * with {@code receive(200)}.
+    *
+    * @param consumerIDs slots of the consumers to check
+    * @throws IllegalArgumentException if any listed slot is empty
+    */
+   protected void verifyNotReceive(int... consumerIDs) throws Exception
+   {
+      for (int id : consumerIDs)
+      {
+         ClientConsumer current = consumers[id];
+
+         if (current == null)
+         {
+            throw new IllegalArgumentException("No consumer at " + id);
+         }
+
+         ClientMessage unexpected = current.receive(200);
+
+         assertNull(unexpected);
+      }
+   }
+
+   /**
+    * Creates a client session factory targeting the given node and stores it in the
+    * session-factory slot for that node.
+    *
+    * @param node index of the server to connect to; its slot must currently be empty
+    * @param netty {@code true} to use the Netty connector factory, {@code false} for in-VM
+    * @throws IllegalArgumentException if a session factory is already registered at {@code node}
+    */
+   protected void setupSessionFactory(int node, boolean netty)
+   {
+      if (sfs[node] != null)
+      {
+         // This guard is on the session-factory slot, so say so (the text used to say "service").
+         throw new IllegalArgumentException("Already a session factory at " + node);
+      }
+
+      Map<String, Object> params = generateParams(node, netty);
+
+      TransportConfiguration serverTotc;
+
+      if (netty)
+      {
+         serverTotc = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params);
+      }
+      else
+      {
+         serverTotc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
+      }
+
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(serverTotc);
+
+      sfs[node] = sf;
+   }
+
+   /**
+    * Builds a clustered, security-disabled server configuration for the given node and
+    * registers the resulting (not yet started) service in that node's slot.
+    *
+    * @param node index of the server; its slot must currently be empty
+    * @param fileStorage {@code true} to create a regular messaging service, {@code false}
+    *           for a null-storage one
+    * @param netty {@code true} to add a Netty acceptor alongside the in-VM acceptor
+    * @throws IllegalArgumentException if a service is already registered at {@code node}
+    */
+   protected void setupServer(int node, boolean fileStorage, boolean netty)
+   {
+      if (services[node] != null)
+      {
+         throw new IllegalArgumentException("Already a service at node " + node);
+      }
+
+      Configuration conf = new ConfigurationImpl();
+
+      conf.setSecurityEnabled(false);
+      conf.setBindingsDirectory(getBindingsDir(node));
+      conf.setJournalMinFiles(2);
+      conf.setJournalDirectory(getJournalDir(node));
+      conf.setJournalFileSize(100 * 1024);
+      conf.setPagingDirectory(getPageDir(node));
+      conf.setLargeMessagesDirectory(getLargeMessagesDir(node));
+      conf.setClustered(true);
+
+      // Replace whatever acceptors the default configuration carries with our own.
+      conf.getAcceptorConfigurations().clear();
+
+      Map<String, Object> acceptorParams = generateParams(node, netty);
+
+      conf.getAcceptorConfigurations().add(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, acceptorParams));
+
+      if (netty)
+      {
+         conf.getAcceptorConfigurations().add(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, acceptorParams));
+      }
+
+      MessagingService created = fileStorage ? Messaging.newMessagingService(conf)
+                                             : Messaging.newNullStorageMessagingService(conf);
+
+      services[node] = created;
+   }
+
+   /**
+    * Builds the transport parameter map for a node: always the in-VM server id, plus a
+    * per-node Netty port (default port offset by the node index) when Netty is requested.
+    *
+    * @param node index of the server
+    * @param netty whether to include the Netty port entry
+    * @return a new mutable parameter map
+    */
+   private Map<String, Object> generateParams(int node, boolean netty)
+   {
+      final Map<String, Object> result = new HashMap<String, Object>();
+
+      result.put(SERVER_ID_PROP_NAME, node);
+
+      if (!netty)
+      {
+         return result;
+      }
+
+      result.put(org.jboss.messaging.integration.transports.netty.TransportConstants.PORT_PROP_NAME,
+                 org.jboss.messaging.integration.transports.netty.TransportConstants.DEFAULT_PORT + node);
+
+      return result;
+   }
+
+   /**
+    * Removes the service registered for the given node so the slot can be set up again.
+    *
+    * @param node index of the server slot to clear
+    * @throws IllegalArgumentException if no service is registered at {@code node}
+    */
+   protected void clearServer(int node)
+   {
+      // The guard must fire when the slot is EMPTY; it was inverted, which made clearing an
+      // existing service impossible and clearing a missing one a silent no-op.
+      if (services[node] == null)
+      {
+         throw new IllegalArgumentException("No service at node " + node);
+      }
+
+      services[node] = null;
+   }
+
+   /**
+    * Adds a cluster connection from {@code nodeFrom} to {@code nodeTo} to the configuration
+    * of the source server. A connector pointing at the target node is registered on the
+    * source server's configuration, and a cluster connection configuration using that
+    * connector is appended to its cluster configurations.
+    *
+    * @param name name of the cluster connection
+    * @param nodeFrom index of the server whose configuration is modified
+    * @param nodeTo index of the server the connector points at
+    * @param address address passed to the cluster connection configuration
+    * @param forwardWhenNoConsumers passed through to the cluster connection configuration
+    * @param netty {@code true} to use the Netty connector factory, {@code false} for in-VM
+    * @throws IllegalArgumentException if no service is registered at {@code nodeFrom}
+    */
+   protected void setupClusterConnection(String name,
+                                         int nodeFrom,
+                                         int nodeTo,
+                                         String address,
+                                         boolean forwardWhenNoConsumers,
+                                         boolean netty)
+   {
+      MessagingService serviceFrom = services[nodeFrom];
+
+      // Fail with the same explicit error as the other helpers instead of a bare NPE below.
+      if (serviceFrom == null)
+      {
+         throw new IllegalArgumentException("No service at node " + nodeFrom);
+      }
+
+      Map<String, TransportConfiguration> connectors = serviceFrom.getServer()
+                                                                  .getConfiguration()
+                                                                  .getConnectorConfigurations();
+
+      Map<String, Object> params = generateParams(nodeTo, netty);
+
+      TransportConfiguration serverTotc;
+
+      if (netty)
+      {
+         serverTotc = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params);
+      }
+      else
+      {
+         serverTotc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
+      }
+
+      connectors.put(serverTotc.getName(), serverTotc);
+
+      serviceFrom.getServer().getConfiguration().setConnectorConfigurations(connectors);
+
+      // Only the first element of the pair is set; the second connector name is left null.
+      Pair<String, String> connectorPair = new Pair<String, String>(serverTotc.getName(), null);
+
+      List<Pair<String, String>> pairs = new ArrayList<Pair<String, String>>();
+      pairs.add(connectorPair);
+
+      BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(null,
+                                                                        null,
+                                                                        null,
+                                                                        null,
+                                                                        1,
+                                                                        -1,
+                                                                        null,
+                                                                        10,
+                                                                        1d,
+                                                                        -1,
+                                                                        -1,
+                                                                        false,
+                                                                        connectorPair);
+
+      ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
+                                                                                      address,
+                                                                                      bridgeConfiguration,
+                                                                                      true,
+                                                                                      forwardWhenNoConsumers,
+                                                                                      pairs);
+      List<ClusterConnectionConfiguration> clusterConfs = serviceFrom.getServer()
+                                                                     .getConfiguration()
+                                                                     .getClusterConfigurations();
+
+      clusterConfs.add(clusterConf);
+
+      serviceFrom.getServer().getConfiguration().setClusterConfigurations(clusterConfs);
+   }
+
+   /**
+    * Starts the services registered for the given nodes, in the order listed.
+    *
+    * @param nodes indices of the servers to start
+    */
+   protected void startServers(int... nodes) throws Exception
+   {
+      for (int node : nodes)
+      {
+         services[node].start();
+      }
+   }
+
+   /**
+    * Stops the services registered for the given nodes, in the order listed.
+    *
+    * @param nodes indices of the servers to stop
+    */
+   protected void stopServers(int... nodes) throws Exception
+   {
+      for (int node : nodes)
+      {
+         services[node].stop();
+      }
+   }
+
+}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2009-01-30 19:46:02 UTC (rev 5768)
@@ -0,0 +1,354 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+/**
+ * Tests message distribution between two nodes joined by a single cluster connection
+ * ("cluster1", address "queues") that is configured in one direction only.
+ * <p>
+ * All tests send through node 0 and then check which consumers, attached to either node,
+ * receive the messages.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 30 Jan 2009 18:03:28
+ */
+public class OnewayTwoNodeClusterTest extends ClusterTestBase
+{
+   /**
+    * Configures servers 0 and 1 and the "cluster1" cluster connection for address "queues".
+    * The servers are started by the individual tests so each test can choose the start order.
+    */
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      setupServer(0, false, false);
+      setupServer(1, false, false);
+
+      // NOTE(review): the node arguments are presumably (from = 0, to = 1) - confirm in ClusterTestBase
+      setupClusterConnection("cluster1", 0, 1, "queues", false, false);
+   }
+
+   /**
+    * Stops both servers, then runs the base class tear down.
+    */
+   @Override
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         stopServers(0, 1);
+      }
+      finally
+      {
+         // Run the base class clean-up even when stopping a server throws
+         super.tearDown();
+      }
+   }
+
+   /**
+    * Starts server 1 before server 0; messages sent via node 0 must reach consumer 0 on node 1.
+    */
+   public void testStartTargetServerBeforeSourceServer() throws Exception
+   {
+      startServers(1, 0);
+
+      setupSessionFactory(0, false);
+      setupSessionFactory(1, false);
+
+      createQueue(1, "queues.testaddress", "queue0", null, false);
+      addConsumer(0, 1, "queue0", null);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, false);
+
+      send(0, "queues.testaddress", 10, false, null);
+      verifyReceiveAll(10, 0);
+      verifyNotReceive(0);
+   }
+
+   /**
+    * Starts server 0 before server 1; messages sent via node 0 must reach consumer 0 on node 1.
+    */
+   public void testStartSourceServerBeforeTargetServer() throws Exception
+   {
+      startServers(0, 1);
+
+      setupSessionFactory(0, false);
+      setupSessionFactory(1, false);
+
+      createQueue(1, "queues.testaddress", "queue0", null, false);
+      addConsumer(0, 1, "queue0", null);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, false);
+
+      send(0, "queues.testaddress", 10, false, null);
+      verifyReceiveAll(10, 0);
+      verifyNotReceive(0);
+   }
+
+   /**
+    * With the queue only on node 0, consumer 0 gets all 10 messages; a consumer added to the
+    * same queue afterwards (consumer 1) must receive nothing.
+    */
+   public void testBasicLocalReceive() throws Exception
+   {
+      startServers(1, 0);
+
+      setupSessionFactory(0, false);
+      setupSessionFactory(1, false);
+
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      addConsumer(0, 0, "queue0", null);
+
+      send(0, "queues.testaddress", 10, false, null);
+      verifyReceiveAll(10, 0);
+      verifyNotReceive(0);
+
+      addConsumer(1, 0, "queue0", null);
+      verifyNotReceive(1);
+   }
+
+   /**
+    * The same queue name exists on both nodes, so the 10 messages are expected to alternate
+    * between consumer 0 (node 0) and consumer 1 (node 1).
+    */
+   public void testBasicRoundRobin() throws Exception
+   {
+      startServers(1, 0);
+
+      setupSessionFactory(0, false);
+      setupSessionFactory(1, false);
+
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+
+      createQueue(1, "queues.testaddress", "queue0", null, false);
+
+      addConsumer(0, 0, "queue0", null);
+
+      addConsumer(1, 1, "queue0", null);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, true);
+      waitForBindings(0, "queues.testaddress", 1, 1, false);
+
+      send(0, "queues.testaddress", 10, false, null);
+      verifyReceiveRoundRobin(10, 0, 1);
+      verifyNotReceive(0, 1);
+   }
+
+   /**
+    * Three queue names each exist on both nodes; every pair of consumers (one per node) is
+    * expected to share the 10 messages round robin.
+    */
+   public void testRoundRobinMultipleQueues() throws Exception
+   {
+      startServers(1, 0);
+
+      setupSessionFactory(0, false);
+      setupSessionFactory(1, false);
+
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      createQueue(1, "queues.testaddress", "queue0", null, false);
+
+      createQueue(0, "queues.testaddress", "queue1", null, false);
+      createQueue(1, "queues.testaddress", "queue1", null, false);
+
+      createQueue(0, "queues.testaddress", "queue2", null, false);
+      createQueue(1, "queues.testaddress", "queue2", null, false);
+
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 1, "queue0", null);
+
+      addConsumer(2, 0, "queue1", null);
+      addConsumer(3, 1, "queue1", null);
+
+      addConsumer(4, 0, "queue2", null);
+      addConsumer(5, 1, "queue2", null);
+
+      waitForBindings(0, "queues.testaddress", 3, 3, true);
+      waitForBindings(0, "queues.testaddress", 3, 3, false);
+
+      send(0, "queues.testaddress", 10, false, null);
+
+      verifyReceiveRoundRobin(10, 0, 1);
+
+      verifyReceiveRoundRobin(10, 2, 3);
+
+      verifyReceiveRoundRobin(10, 4, 5);
+
+      verifyNotReceive(0, 1, 2, 3, 4, 5);
+   }
+
+   /**
+    * Ten distinctly named queues, five per node, share one address; every consumer is expected
+    * to receive all 10 messages.
+    */
+   public void testMultipleNonLoadBalancedQueues() throws Exception
+   {
+      startServers(1, 0);
+
+      setupSessionFactory(0, false);
+      setupSessionFactory(1, false);
+
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      createQueue(0, "queues.testaddress", "queue1", null, false);
+      createQueue(0, "queues.testaddress", "queue2", null, false);
+      createQueue(0, "queues.testaddress", "queue3", null, false);
+      createQueue(0, "queues.testaddress", "queue4", null, false);
+
+      createQueue(1, "queues.testaddress", "queue5", null, false);
+      createQueue(1, "queues.testaddress", "queue6", null, false);
+      createQueue(1, "queues.testaddress", "queue7", null, false);
+      createQueue(1, "queues.testaddress", "queue8", null, false);
+      createQueue(1, "queues.testaddress", "queue9", null, false);
+
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 0, "queue1", null);
+      addConsumer(2, 0, "queue2", null);
+      addConsumer(3, 0, "queue3", null);
+      addConsumer(4, 0, "queue4", null);
+
+      addConsumer(5, 1, "queue5", null);
+      addConsumer(6, 1, "queue6", null);
+      addConsumer(7, 1, "queue7", null);
+      addConsumer(8, 1, "queue8", null);
+      addConsumer(9, 1, "queue9", null);
+
+      waitForBindings(0, "queues.testaddress", 5, 5, true);
+      waitForBindings(0, "queues.testaddress", 5, 5, false);
+
+      send(0, "queues.testaddress", 10, false, null);
+
+      verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+      verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+   }
+
+   /**
+    * Combines queues that exist on one node only (queue0 - queue9, every consumer gets all
+    * messages) with queues present on both nodes (queue10 - queue12, round robin per pair).
+    */
+   public void testMixtureLoadBalancedAndNonLoadBalancedQueues() throws Exception
+   {
+      startServers(1, 0);
+
+      setupSessionFactory(0, false);
+      setupSessionFactory(1, false);
+
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      createQueue(0, "queues.testaddress", "queue1", null, false);
+      createQueue(0, "queues.testaddress", "queue2", null, false);
+      createQueue(0, "queues.testaddress", "queue3", null, false);
+      createQueue(0, "queues.testaddress", "queue4", null, false);
+
+      createQueue(1, "queues.testaddress", "queue5", null, false);
+      createQueue(1, "queues.testaddress", "queue6", null, false);
+      createQueue(1, "queues.testaddress", "queue7", null, false);
+      createQueue(1, "queues.testaddress", "queue8", null, false);
+      createQueue(1, "queues.testaddress", "queue9", null, false);
+
+      createQueue(0, "queues.testaddress", "queue10", null, false);
+      createQueue(1, "queues.testaddress", "queue10", null, false);
+
+      createQueue(0, "queues.testaddress", "queue11", null, false);
+      createQueue(1, "queues.testaddress", "queue11", null, false);
+
+      createQueue(0, "queues.testaddress", "queue12", null, false);
+      createQueue(1, "queues.testaddress", "queue12", null, false);
+
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 0, "queue1", null);
+      addConsumer(2, 0, "queue2", null);
+      addConsumer(3, 0, "queue3", null);
+      addConsumer(4, 0, "queue4", null);
+
+      addConsumer(5, 1, "queue5", null);
+      addConsumer(6, 1, "queue6", null);
+      addConsumer(7, 1, "queue7", null);
+      addConsumer(8, 1, "queue8", null);
+      addConsumer(9, 1, "queue9", null);
+
+      addConsumer(10, 0, "queue10", null);
+      addConsumer(11, 1, "queue10", null);
+
+      addConsumer(12, 0, "queue11", null);
+      addConsumer(13, 1, "queue11", null);
+
+      addConsumer(14, 0, "queue12", null);
+      addConsumer(15, 1, "queue12", null);
+
+      waitForBindings(0, "queues.testaddress", 8, 8, true);
+      waitForBindings(0, "queues.testaddress", 8, 8, false);
+
+      send(0, "queues.testaddress", 10, false, null);
+
+      verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+      verifyReceiveRoundRobin(10, 10, 11);
+      verifyReceiveRoundRobin(10, 12, 13);
+      verifyReceiveRoundRobin(10, 14, 15);
+
+      verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
+   }
+
+   /**
+    * Messages sent to "queues.testaddress" must not reach consumers of queues bound to
+    * "queues.testaddress2".
+    */
+   public void testNotRouteToNonMatchingAddress() throws Exception
+   {
+      startServers(1, 0);
+
+      setupSessionFactory(0, false);
+      setupSessionFactory(1, false);
+
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      createQueue(1, "queues.testaddress", "queue1", null, false);
+
+      createQueue(0, "queues.testaddress2", "queue2", null, false);
+      createQueue(1, "queues.testaddress2", "queue2", null, false);
+      createQueue(0, "queues.testaddress2", "queue3", null, false);
+      createQueue(1, "queues.testaddress2", "queue4", null, false);
+
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 1, "queue1", null);
+      addConsumer(2, 0, "queue2", null);
+      addConsumer(3, 1, "queue2", null);
+      addConsumer(4, 0, "queue3", null);
+      addConsumer(5, 1, "queue4", null);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, true);
+      waitForBindings(0, "queues.testaddress", 1, 1, false);
+
+      waitForBindings(0, "queues.testaddress2", 2, 2, true);
+      waitForBindings(0, "queues.testaddress2", 2, 2, false);
+
+      send(0, "queues.testaddress", 10, false, null);
+
+      verifyReceiveAll(10, 0, 1);
+
+      verifyNotReceive(2, 3, 4, 5);
+   }
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java 2009-01-30 17:20:35 UTC (rev 5767)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java 2009-01-30 19:46:02 UTC (rev 5768)
@@ -48,6 +48,12 @@
*/
public class FakePostOffice implements PostOffice
{
+ public void sendQueueInfoToQueue(SimpleString queueName, SimpleString address) throws Exception
+ {
+ // No-op stub: the body is empty, so this fake PostOffice ignores both arguments (TODO: implement if a test needs it)
+
+ }
+
private ConcurrentHashMap<SimpleString, Binding> bindings = new ConcurrentHashMap<SimpleString, Binding>();
private ConcurrentHashSet<SimpleString> addresses = new ConcurrentHashSet<SimpleString>();
More information about the jboss-cvs-commits
mailing list