[jboss-cvs] JBoss Messaging SVN: r5416 - in trunk: src/main/org/jboss/messaging/core/server/cluster/impl and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Nov 21 08:59:08 EST 2008
Author: timfox
Date: 2008-11-21 08:59:07 -0500 (Fri, 21 Nov 2008)
New Revision: 5416
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/BasicMessageFlowTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchSizeTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWildcardTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWithFilterTest.java
Removed:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SimpleOutflowTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverPreCommitMessageTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/queue/DeadLetterAddressTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java
Log:
More tests mainly
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-11-21 10:32:48 UTC (rev 5415)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-11-21 13:59:07 UTC (rev 5416)
@@ -218,7 +218,7 @@
final boolean fanout) throws Exception
{
Binding binding = createBinding(address, queueName, filter, durable, temporary, fanout);
-
+
addBindingInMemory(binding);
if (durable)
@@ -317,7 +317,7 @@
//Can be determined from the number of consumers on the queue
long routings = binding.getRoutings();
- if (routings <= lowestRoutings || lowestRoutings == -1)
+ if (routings < lowestRoutings || lowestRoutings == -1)
{
lowestRoutings = routings;
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2008-11-21 10:32:48 UTC (rev 5415)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2008-11-21 13:59:07 UTC (rev 5416)
@@ -33,6 +33,7 @@
import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.cluster.BroadcastGroup;
@@ -56,6 +57,8 @@
*/
public class ClusterManagerImpl implements ClusterManager
{
+ private static final Logger log = Logger.getLogger(ClusterManagerImpl.class);
+
private final Map<String, BroadcastGroup> broadcastGroups = new HashMap<String, BroadcastGroup>();
private final Map<String, DiscoveryGroup> discoveryGroups = new HashMap<String, DiscoveryGroup>();
@@ -97,18 +100,18 @@
{
return;
}
-
- for (BroadcastGroup group: broadcastGroups.values())
+
+ for (BroadcastGroup group : broadcastGroups.values())
{
group.start();
}
-
- for (DiscoveryGroup group: discoveryGroups.values())
+
+ for (DiscoveryGroup group : discoveryGroups.values())
{
group.start();
}
-
- for (MessageFlow flow: this.messageFlows.values())
+
+ for (MessageFlow flow : this.messageFlows.values())
{
flow.start();
}
@@ -122,18 +125,18 @@
{
return;
}
-
- for (BroadcastGroup group: broadcastGroups.values())
+
+ for (BroadcastGroup group : broadcastGroups.values())
{
group.stop();
}
-
- for (DiscoveryGroup group: discoveryGroups.values())
+
+ for (DiscoveryGroup group : discoveryGroups.values())
{
group.stop();
}
-
- for (MessageFlow flow: this.messageFlows.values())
+
+ for (MessageFlow flow : this.messageFlows.values())
{
flow.stop();
}
@@ -150,8 +153,10 @@
{
if (broadcastGroups.containsKey(config.getName()))
{
- throw new IllegalArgumentException("There is already a broadcast-group with name " + config.getName() +
- " deployed");
+ log.warn("There is already a broadcast-group with name " + config.getName() +
+ " deployed. This one will not be deployed.");
+
+ return;
}
InetAddress localBindAddress = InetAddress.getByName(config.getLocalBindAddress());
@@ -179,8 +184,10 @@
{
if (discoveryGroups.containsKey(config.getName()))
{
- throw new IllegalArgumentException("There is already a discovery-group with name " + config.getName() +
- " deployed");
+ log.warn("There is already a discovery-group with name " + config.getName() +
+ " deployed. This one will not be deployed.");
+
+ return;
}
InetAddress groupAddress = InetAddress.getByName(config.getGroupAddress());
@@ -194,11 +201,41 @@
public synchronized void deployMessageFlow(final MessageFlowConfiguration config) throws Exception
{
+ if (config.getName() == null)
+ {
+ log.warn("Must specify a unique name for each message flow. This one will not be deployed.");
+
+ return;
+ }
+
+ if (config.getAddress() == null)
+ {
+ log.warn("Must specify an address each message flow. This one will not be deployed.");
+
+ return;
+ }
+
if (messageFlows.containsKey(config.getName()))
{
- throw new IllegalArgumentException("There is already a message-flow with name " + config.getName() +
- " deployed");
+ log.warn("There is already a message-flow with name " + config.getName() +
+ " deployed. This one will not be deployed.");
+
+ return;
}
+
+ if (config.getMaxBatchTime() == 0 || config.getMaxBatchTime() < -1)
+ {
+ log.warn("Invalid value for max-batch-time. Valid values are -1 or > 0");
+
+ return;
+ }
+
+ if (config.getMaxBatchSize() < 1)
+ {
+ log.warn("Invalid value for max-batch-size. Valid values are > 0");
+
+ return;
+ }
Transformer transformer = null;
@@ -235,6 +272,7 @@
storageManager,
postOffice,
queueSettingsRepository,
+ scheduledExecutor,
transformer,
config.getConnectors());
}
@@ -246,8 +284,10 @@
if (group == null)
{
- throw new IllegalArgumentException("There is no discovery-group with name " + config.getDiscoveryGroupName() +
- " deployed");
+ log.warn("There is no discovery-group with name " + config.getDiscoveryGroupName() +
+ " deployed. This one will not be deployed.");
+
+ return;
}
flow = new MessageFlowImpl(new SimpleString(config.getName()),
@@ -261,6 +301,7 @@
storageManager,
postOffice,
queueSettingsRepository,
+ scheduledExecutor,
transformer,
group);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java 2008-11-21 10:32:48 UTC (rev 5415)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java 2008-11-21 13:59:07 UTC (rev 5416)
@@ -24,6 +24,9 @@
import java.util.LinkedList;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import org.jboss.messaging.core.client.ClientProducer;
import org.jboss.messaging.core.client.ClientSession;
@@ -77,13 +80,15 @@
private java.util.Queue<MessageReference> refs = new LinkedList<MessageReference>();
private Transaction tx;
-
+
+ private long lastReceivedTime = -1;
+
private final StorageManager storageManager;
private final PostOffice postOffice;
private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
-
+
private final Transformer transformer;
private final ClientSessionFactory csf;
@@ -93,6 +98,8 @@
private ClientProducer producer;
private volatile boolean started;
+
+ private final ScheduledFuture<?> future;
// Static --------------------------------------------------------
@@ -107,7 +114,8 @@
final long maxBatchTime,
final StorageManager storageManager,
final PostOffice postOffice,
- final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+ final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+ final ScheduledExecutorService scheduledExecutor,
final Transformer transformer) throws Exception
{
this.queue = queue;
@@ -126,7 +134,16 @@
this.transformer = transformer;
- this.csf = new ClientSessionFactoryImpl(connectorConfig);
+ this.csf = new ClientSessionFactoryImpl(connectorConfig);
+
+ if (maxBatchTime != -1)
+ {
+ future = scheduledExecutor.scheduleAtFixedRate(new BatchTimeout(), maxBatchTime, maxBatchTime, TimeUnit.MILLISECONDS);
+ }
+ else
+ {
+ future = null;
+ }
}
public synchronized void start() throws Exception
@@ -152,6 +169,11 @@
started = false;
queue.removeConsumer(this);
+
+ if (future != null)
+ {
+ future.cancel(false);
+ }
// Wait until all batches are complete
@@ -177,7 +199,7 @@
}
// Consumer implementation ---------------------------------------
-
+
public HandleStatus handle(final MessageReference reference) throws Exception
{
if (busy)
@@ -193,6 +215,11 @@
}
refs.add(reference);
+
+ if (maxBatchTime != -1)
+ {
+ lastReceivedTime = System.currentTimeMillis();
+ }
count++;
@@ -213,12 +240,35 @@
// Private -------------------------------------------------------
+ private synchronized void timeoutBatch()
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ if (lastReceivedTime != -1 && count > 0)
+ {
+ long now = System.currentTimeMillis();
+
+ if (now - lastReceivedTime >= maxBatchTime)
+ {
+ sendBatch();
+ }
+ }
+ }
+
private void sendBatch()
{
try
{
synchronized (this)
{
+ if (count == 0)
+ {
+ return;
+ }
+
// TODO - duplicate detection on sendee and if batch size = 1 then don't need tx
while (true)
@@ -284,5 +334,13 @@
sendBatch();
}
}
+
+ private class BatchTimeout implements Runnable
+ {
+ public void run()
+ {
+ timeoutBatch();
+ }
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java 2008-11-21 10:32:48 UTC (rev 5415)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java 2008-11-21 13:59:07 UTC (rev 5416)
@@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.filter.Filter;
@@ -85,6 +86,8 @@
private Map<TransportConfiguration, Forwarder> forwarders = new HashMap<TransportConfiguration, Forwarder>();
private final DiscoveryGroup discoveryGroup;
+
+ private final ScheduledExecutorService scheduledExecutor;
private volatile boolean started;
@@ -101,6 +104,7 @@
final StorageManager storageManager,
final PostOffice postOffice,
final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+ final ScheduledExecutorService scheduledExecutor,
final Transformer transformer,
final List<TransportConfiguration> connectors) throws Exception
{
@@ -127,6 +131,8 @@
this.transformer = transformer;
this.discoveryGroup = null;
+
+ this.scheduledExecutor = scheduledExecutor;
this.updateConnectors(connectors);
}
@@ -144,6 +150,7 @@
final StorageManager storageManager,
final PostOffice postOffice,
final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+ final ScheduledExecutorService scheduledExecutor,
final Transformer transformer,
final DiscoveryGroup discoveryGroup) throws Exception
{
@@ -166,6 +173,8 @@
this.postOffice = postOffice;
this.queueSettingsRepository = queueSettingsRepository;
+
+ this.scheduledExecutor = scheduledExecutor;
this.transformer = transformer;
@@ -278,6 +287,7 @@
storageManager,
postOffice,
queueSettingsRepository,
+ scheduledExecutor,
transformer);
forwarders.put(connector, forwarder);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-11-21 10:32:48 UTC (rev 5415)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-11-21 13:59:07 UTC (rev 5416)
@@ -133,7 +133,7 @@
{
storageManager.updateDeliveryCount(this);
}
-
+
QueueSettings queueSettings = queueSettingsRepository.getMatch(queue.getName().toString());
int maxDeliveries = queueSettings.getMaxDeliveryAttempts();
@@ -147,11 +147,11 @@
else
{
long redeliveryDelay = queueSettings.getRedeliveryDelay();
-
+
if (redeliveryDelay > 0)
{
scheduledDeliveryTime = System.currentTimeMillis() + redeliveryDelay;
-
+
storageManager.updateScheduledDeliveryTime(this);
}
queue.referenceCancelled();
@@ -161,16 +161,19 @@
}
public void sendToDeadLetterAddress(final StorageManager persistenceManager,
- final PostOffice postOffice,
- final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
+ final PostOffice postOffice,
+ final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
{
- SimpleString deadLetterAddress = queueSettingsRepository.getMatch(queue.getName().toString()).getDeadLetterAddress();
+ SimpleString deadLetterAddress = queueSettingsRepository.getMatch(queue.getName().toString())
+ .getDeadLetterAddress();
if (deadLetterAddress != null)
{
List<Binding> bindingList = postOffice.getBindingsForAddress(deadLetterAddress);
- if(bindingList == null || bindingList.size() == 0)
+
+ if (bindingList.isEmpty())
{
- log.warn("Message has exceeded max delivery attempts. No bindings for Dead Letter Address " + deadLetterAddress + " so dropping it");
+ log.warn("Message has exceeded max delivery attempts. No bindings for Dead Letter Address " + deadLetterAddress +
+ " so dropping it");
}
else
{
@@ -179,8 +182,9 @@
}
else
{
- log.warn("Message has exceeded max delivery attempts. No Dead Letter Address configured for queue " + queue.getName() + " so dropping it");
-
+ log.warn("Message has exceeded max delivery attempts. No Dead Letter Address configured for queue " + queue.getName() +
+ " so dropping it");
+
Transaction tx = new TransactionImpl(persistenceManager, postOffice);
tx.addAcknowledgement(this);
tx.commit();
@@ -196,9 +200,10 @@
if (expiryAddress != null)
{
List<Binding> bindingList = postOffice.getBindingsForAddress(expiryAddress);
- if(bindingList == null || bindingList.size() == 0)
+
+ if (bindingList.isEmpty())
{
- log.warn("Message has expired. No bindings for Expiry Address " + expiryAddress + " so dropping it");
+ log.warn("Message has expired. No bindings for Expiry Address " + expiryAddress + " so dropping it");
}
else
{
@@ -244,7 +249,7 @@
Transaction tx = new TransactionImpl(persistenceManager, postOffice);
ServerMessage copyMessage = makeCopy(expiry, persistenceManager);
-
+
copyMessage.setDestination(otherBinding.getAddress());
tx.addMessage(copyMessage);
@@ -254,7 +259,7 @@
tx.commit();
}
- private void move(final SimpleString address,
+ private void move(final SimpleString address,
final StorageManager persistenceManager,
final PostOffice postOffice,
final boolean expiry) throws Exception
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/BasicMessageFlowTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/BasicMessageFlowTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/BasicMessageFlowTest.java 2008-11-21 13:59:07 UTC (rev 5416)
@@ -0,0 +1,786 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ *
+ * A BasicMessageFlowTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 4 Nov 2008 16:54:50
+ *
+ *
+ */
+public class BasicMessageFlowTest extends MessageFlowTestBase
+{
+ private static final Logger log = Logger.getLogger(BasicMessageFlowTest.class);
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testStaticListOutflow() throws Exception
+ {
+ Map<String, Object> service0Params = new HashMap<String, Object>();
+ MessagingService service0 = createMessagingService(0, service0Params);
+
+ Map<String, Object> service1Params = new HashMap<String, Object>();
+ MessagingService service1 = createMessagingService(1, service1Params);
+ service1.start();
+
+ Map<String, Object> service2Params = new HashMap<String, Object>();
+ MessagingService service2 = createMessagingService(2, service2Params);
+ service2.start();
+
+ Map<String, Object> service3Params = new HashMap<String, Object>();
+ MessagingService service3 = createMessagingService(3, service3Params);
+ service3.start();
+
+ Map<String, Object> service4Params = new HashMap<String, Object>();
+ MessagingService service4 = createMessagingService(4, service4Params);
+ service4.start();
+
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+
+ TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service1Params);
+ connectors.add(server1tc);
+
+ TransportConfiguration server2tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service2Params);
+ connectors.add(server2tc);
+
+ TransportConfiguration server3tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service3Params);
+ connectors.add(server3tc);
+
+ TransportConfiguration server4tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service4Params);
+ connectors.add(server4tc);
+
+ final SimpleString testAddress = new SimpleString("testaddress");
+
+ MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1",
+ testAddress.toString(),
+ null,
+ true,
+ 1,
+ -1,
+ null,
+ connectors);
+ Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+ ofconfigs.add(ofconfig);
+ service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+
+ service0.start();
+
+ TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service0Params);
+
+ ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+ ClientSession session0 = csf0.createSession(false, true, true);
+
+ ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+ ClientSession session1 = csf1.createSession(false, true, true);
+
+ ClientSessionFactory csf2 = new ClientSessionFactoryImpl(server2tc);
+ ClientSession session2 = csf2.createSession(false, true, true);
+
+ ClientSessionFactory csf3 = new ClientSessionFactoryImpl(server3tc);
+ ClientSession session3 = csf3.createSession(false, true, true);
+
+ ClientSessionFactory csf4 = new ClientSessionFactoryImpl(server4tc);
+ ClientSession session4 = csf4.createSession(false, true, true);
+
+ session0.createQueue(testAddress, testAddress, null, false, false, true);
+ session1.createQueue(testAddress, testAddress, null, false, false, true);
+ session2.createQueue(testAddress, testAddress, null, false, false, true);
+ session3.createQueue(testAddress, testAddress, null, false, false, true);
+ session4.createQueue(testAddress, testAddress, null, false, false, true);
+
+ ClientProducer prod0 = session0.createProducer(testAddress);
+
+ ClientConsumer cons0 = session0.createConsumer(testAddress);
+ ClientConsumer cons1 = session1.createConsumer(testAddress);
+ ClientConsumer cons2 = session2.createConsumer(testAddress);
+ ClientConsumer cons3 = session3.createConsumer(testAddress);
+ ClientConsumer cons4 = session4.createConsumer(testAddress);
+
+ session0.start();
+
+ session1.start();
+ session2.start();
+ session3.start();
+ session4.start();
+
+ final int numMessages = 100;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, i);
+ message.getBody().flip();
+
+ prod0.send(message);
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage rmessage0 = cons0.receive(1000);
+ assertNotNull(rmessage0);
+ assertEquals(i, rmessage0.getProperty(propKey));
+
+ ClientMessage rmessage1 = cons1.receive(1000);
+ assertNotNull(rmessage1);
+ assertEquals(i, rmessage1.getProperty(propKey));
+
+ ClientMessage rmessage2 = cons2.receive(1000);
+ assertNotNull(rmessage2);
+ assertEquals(i, rmessage2.getProperty(propKey));
+
+ ClientMessage rmessage3 = cons3.receive(1000);
+ assertNotNull(rmessage3);
+ assertEquals(i, rmessage3.getProperty(propKey));
+
+ ClientMessage rmessage4 = cons4.receive(1000);
+ assertNotNull(rmessage4);
+ assertEquals(i, rmessage4.getProperty(propKey));
+ }
+
+ session0.close();
+ session1.close();
+ session2.close();
+ session3.close();
+ session4.close();
+
+ service0.stop();
+ service1.stop();
+ service2.stop();
+ service3.stop();
+ service4.stop();
+
+ assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service2.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service3.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service4.getServer().getRemotingService().getConnections().size());
+ }
+
+ public void testStaticListRoundRobin() throws Exception
+ {
+ Map<String, Object> service0Params = new HashMap<String, Object>();
+ MessagingService service0 = createMessagingService(0, service0Params);
+
+ Map<String, Object> service1Params = new HashMap<String, Object>();
+ MessagingService service1 = createMessagingService(1, service1Params);
+ service1.start();
+
+ Map<String, Object> service2Params = new HashMap<String, Object>();
+ MessagingService service2 = createMessagingService(2, service2Params);
+ service2.start();
+
+ Map<String, Object> service3Params = new HashMap<String, Object>();
+ MessagingService service3 = createMessagingService(3, service3Params);
+ service3.start();
+
+ Map<String, Object> service4Params = new HashMap<String, Object>();
+ MessagingService service4 = createMessagingService(4, service4Params);
+ service4.start();
+
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+
+ TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service1Params);
+ connectors.add(server1tc);
+
+ TransportConfiguration server2tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service2Params);
+ connectors.add(server2tc);
+
+ TransportConfiguration server3tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service3Params);
+ connectors.add(server3tc);
+
+ TransportConfiguration server4tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service4Params);
+ connectors.add(server4tc);
+
+ final SimpleString testAddress = new SimpleString("testaddress");
+
+ MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1",
+ testAddress.toString(),
+ null,
+ false,
+ 1,
+ -1,
+ null,
+ connectors);
+ Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+ ofconfigs.add(ofconfig);
+ service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+
+ service0.start();
+
+ TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service0Params);
+
+ ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+ ClientSession session0 = csf0.createSession(false, true, true);
+
+ ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+ ClientSession session1 = csf1.createSession(false, true, true);
+
+ ClientSessionFactory csf2 = new ClientSessionFactoryImpl(server2tc);
+ ClientSession session2 = csf2.createSession(false, true, true);
+
+ ClientSessionFactory csf3 = new ClientSessionFactoryImpl(server3tc);
+ ClientSession session3 = csf3.createSession(false, true, true);
+
+ ClientSessionFactory csf4 = new ClientSessionFactoryImpl(server4tc);
+ ClientSession session4 = csf4.createSession(false, true, true);
+
+ session0.createQueue(testAddress, testAddress, null, false, false, false);
+ session1.createQueue(testAddress, testAddress, null, false, false, false);
+ session2.createQueue(testAddress, testAddress, null, false, false, false);
+ session3.createQueue(testAddress, testAddress, null, false, false, false);
+ session4.createQueue(testAddress, testAddress, null, false, false, false);
+
+ ClientProducer prod0 = session0.createProducer(testAddress);
+
+ ClientConsumer cons0 = session0.createConsumer(testAddress);
+ ClientConsumer cons1 = session1.createConsumer(testAddress);
+ ClientConsumer cons2 = session2.createConsumer(testAddress);
+ ClientConsumer cons3 = session3.createConsumer(testAddress);
+ ClientConsumer cons4 = session4.createConsumer(testAddress);
+
+ session0.start();
+
+ session1.start();
+ session2.start();
+ session3.start();
+ session4.start();
+
+ final int numMessages = 10;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, i);
+ message.getBody().flip();
+
+ prod0.send(message);
+ }
+
+ //Refs should be round-robin'd in the same order the connectors are specified in the outflow
+ //With the local consumer being last since it was created last
+
+ ArrayList<ClientConsumer> consumers = new ArrayList<ClientConsumer>();
+
+ consumers.add(cons1);
+ consumers.add(cons2);
+ consumers.add(cons3);
+ consumers.add(cons4);
+ consumers.add(cons0);
+
+ int count = 0;
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientConsumer consumer = consumers.get(count);
+
+ count++;
+ if (count == consumers.size())
+ {
+ count = 0;
+ }
+
+ ClientMessage msg = consumer.receive(1000);
+
+ assertNotNull(msg);
+
+ assertEquals(i, msg.getProperty(propKey));
+
+ msg.acknowledge();
+ }
+
+ session0.close();
+ session1.close();
+ session2.close();
+ session3.close();
+ session4.close();
+
+ service0.stop();
+ service1.stop();
+ service2.stop();
+ service3.stop();
+ service4.stop();
+
+ assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service2.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service3.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service4.getServer().getRemotingService().getConnections().size());
+ }
+
+ public void testMultipleFlows() throws Exception
+ {
+ Map<String, Object> service0Params = new HashMap<String, Object>();
+ MessagingService service0 = createMessagingService(0, service0Params);
+
+ Map<String, Object> service1Params = new HashMap<String, Object>();
+ MessagingService service1 = createMessagingService(1, service1Params);
+ service1.start();
+
+ Map<String, Object> service2Params = new HashMap<String, Object>();
+ MessagingService service2 = createMessagingService(2, service2Params);
+ service2.start();
+
+ Map<String, Object> service3Params = new HashMap<String, Object>();
+ MessagingService service3 = createMessagingService(3, service3Params);
+ service3.start();
+
+ Map<String, Object> service4Params = new HashMap<String, Object>();
+ MessagingService service4 = createMessagingService(4, service4Params);
+ service4.start();
+
+ List<TransportConfiguration> connectors1 = new ArrayList<TransportConfiguration>();
+ TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service1Params);
+ connectors1.add(server1tc);
+
+ List<TransportConfiguration> connectors2 = new ArrayList<TransportConfiguration>();
+ TransportConfiguration server2tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service2Params);
+ connectors2.add(server2tc);
+
+ List<TransportConfiguration> connectors3 = new ArrayList<TransportConfiguration>();
+ TransportConfiguration server3tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service3Params);
+ connectors3.add(server3tc);
+
+ List<TransportConfiguration> connectors4 = new ArrayList<TransportConfiguration>();
+ TransportConfiguration server4tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service4Params);
+ connectors4.add(server4tc);
+
+ final SimpleString testAddress = new SimpleString("testaddress");
+
+ MessageFlowConfiguration ofconfig1 = new MessageFlowConfiguration("flow1",
+ testAddress.toString(),
+ "beatle='john'",
+ false,
+ 1,
+ -1,
+ null,
+ connectors1);
+ MessageFlowConfiguration ofconfig2 = new MessageFlowConfiguration("flow2",
+ testAddress.toString(),
+ "beatle='paul'",
+ false,
+ 1,
+ -1,
+ null,
+ connectors2);
+ MessageFlowConfiguration ofconfig3 = new MessageFlowConfiguration("flow3",
+ testAddress.toString(),
+ "beatle='george'",
+ false,
+ 1,
+ -1,
+ null,
+ connectors3);
+ MessageFlowConfiguration ofconfig4 = new MessageFlowConfiguration("flow4",
+ testAddress.toString(),
+ "beatle='ringo'",
+ false,
+ 1,
+ -1,
+ null,
+ connectors4);
+
+ Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+ ofconfigs.add(ofconfig1);
+ ofconfigs.add(ofconfig2);
+ ofconfigs.add(ofconfig3);
+ ofconfigs.add(ofconfig4);
+ service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+
+ service0.start();
+
+ TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service0Params);
+
+ ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+ ClientSession session0 = csf0.createSession(false, true, true);
+
+ ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+ ClientSession session1 = csf1.createSession(false, true, true);
+
+ ClientSessionFactory csf2 = new ClientSessionFactoryImpl(server2tc);
+ ClientSession session2 = csf2.createSession(false, true, true);
+
+ ClientSessionFactory csf3 = new ClientSessionFactoryImpl(server3tc);
+ ClientSession session3 = csf3.createSession(false, true, true);
+
+ ClientSessionFactory csf4 = new ClientSessionFactoryImpl(server4tc);
+ ClientSession session4 = csf4.createSession(false, true, true);
+
+ session0.createQueue(testAddress, testAddress, null, false, false, false);
+ session1.createQueue(testAddress, testAddress, null, false, false, false);
+ session2.createQueue(testAddress, testAddress, null, false, false, false);
+ session3.createQueue(testAddress, testAddress, null, false, false, false);
+ session4.createQueue(testAddress, testAddress, null, false, false, false);
+
+ ClientProducer prod0 = session0.createProducer(testAddress);
+
+ ClientConsumer cons1 = session1.createConsumer(testAddress);
+ ClientConsumer cons2 = session2.createConsumer(testAddress);
+ ClientConsumer cons3 = session3.createConsumer(testAddress);
+ ClientConsumer cons4 = session4.createConsumer(testAddress);
+
+ session1.start();
+ session2.start();
+ session3.start();
+ session4.start();
+
+ SimpleString propKey = new SimpleString("beatle");
+
+ ClientMessage messageJohn = session0.createClientMessage(false);
+ messageJohn.putStringProperty(propKey, new SimpleString("john"));
+ messageJohn.getBody().flip();
+
+ ClientMessage messagePaul = session0.createClientMessage(false);
+ messagePaul.putStringProperty(propKey, new SimpleString("paul"));
+ messagePaul.getBody().flip();
+
+ ClientMessage messageGeorge = session0.createClientMessage(false);
+ messageGeorge.putStringProperty(propKey, new SimpleString("george"));
+ messageGeorge.getBody().flip();
+
+ ClientMessage messageRingo = session0.createClientMessage(false);
+ messageRingo.putStringProperty(propKey, new SimpleString("ringo"));
+ messageRingo.getBody().flip();
+
+ ClientMessage messageOsama = session0.createClientMessage(false);
+ messageOsama.putStringProperty(propKey, new SimpleString("osama"));
+ messageOsama.getBody().flip();
+
+ prod0.send(messageJohn);
+ prod0.send(messagePaul);
+ prod0.send(messageGeorge);
+ prod0.send(messageRingo);
+ prod0.send(messageOsama);
+
+ ClientMessage r1 = cons1.receive(1000);
+ assertNotNull(r1);
+ assertEquals(new SimpleString("john"), r1.getProperty(propKey));
+ r1 = cons1.receiveImmediate();
+ assertNull(r1);
+
+ ClientMessage r2 = cons2.receive(1000);
+ assertNotNull(r2);
+ assertEquals(new SimpleString("paul"), r2.getProperty(propKey));
+ r2 = cons2.receiveImmediate();
+ assertNull(r2);
+
+ ClientMessage r3 = cons3.receive(1000);
+ assertNotNull(r3);
+ assertEquals(new SimpleString("george"), r3.getProperty(propKey));
+ r3 = cons3.receiveImmediate();
+ assertNull(r3);
+
+ ClientMessage r4 = cons4.receive(1000);
+ assertNotNull(r4);
+ assertEquals(new SimpleString("ringo"), r4.getProperty(propKey));
+ r4 = cons4.receiveImmediate();
+ assertNull(r4);
+
+ session0.close();
+ session1.close();
+ session2.close();
+ session3.close();
+ session4.close();
+
+ service0.stop();
+ service1.stop();
+ service2.stop();
+ service3.stop();
+ service4.stop();
+
+ assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service2.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service3.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service4.getServer().getRemotingService().getConnections().size());
+ }
+
+ public void testMessageFlowsSameName() throws Exception
+ { // Two flow configurations share one name; the assertions below expect only one message to cross.
+ Map<String, Object> service0Params = new HashMap<String, Object>();
+ MessagingService service0 = createMessagingService(0, service0Params);
+ // Service 1 is started right away; service 0 is started only after its flows are configured.
+ Map<String, Object> service1Params = new HashMap<String, Object>();
+ MessagingService service1 = createMessagingService(1, service1Params);
+ service1.start();
+
+ TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service0Params);
+ // Connector list shared by both flow configurations; its only entry points at service 1.
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service1Params);
+ connectors.add(server1tc);
+
+ final SimpleString address1 = new SimpleString("testaddress");
+ // Both configurations are named "flow1" but carry different filters (saab vs. bmw).
+ MessageFlowConfiguration ofconfig1 = new MessageFlowConfiguration("flow1", address1.toString(), "car='saab'", true, 1, -1, null, connectors);
+ MessageFlowConfiguration ofconfig2 = new MessageFlowConfiguration("flow1", address1.toString(),"car='bmw'", true, 1, -1, null, connectors);
+
+ Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+ ofconfigs.add(ofconfig1);
+ ofconfigs.add(ofconfig2);
+
+ service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+
+ // Only one of the two same-named flows should be deployed
+ service0.start();
+
+ ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+ ClientSession session0 = csf0.createSession(false, true, true);
+
+ ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+ ClientSession session1 = csf1.createSession(false, true, true);
+
+ session0.createQueue(address1, address1, null, false, false, false);
+ session1.createQueue(address1, address1, null, false, false, false);
+ ClientProducer prod0 = session0.createProducer(address1);
+
+ ClientConsumer cons1 = session1.createConsumer(address1);
+
+ session1.start();
+ // Send one message matching each of the two filters from service 0.
+ SimpleString propKey = new SimpleString("car");
+
+ ClientMessage messageSaab = session0.createClientMessage(false);
+ messageSaab.putStringProperty(propKey, new SimpleString("saab"));
+ messageSaab.getBody().flip();
+
+ ClientMessage messageBMW = session0.createClientMessage(false);
+ messageBMW.putStringProperty(propKey, new SimpleString("bmw"));
+ messageBMW.getBody().flip();
+
+ prod0.send(messageSaab);
+ prod0.send(messageBMW);
+ // Exactly one message must arrive on service 1; either car value is accepted.
+ ClientMessage r1 = cons1.receive(1000);
+ assertNotNull(r1);
+
+ SimpleString val = (SimpleString)r1.getProperty(propKey);
+ assertTrue(val.equals(new SimpleString("saab")) || val.equals(new SimpleString("bmw")));
+ r1 = cons1.receiveImmediate();
+ assertNull(r1);
+
+ session0.close();
+ session1.close();
+ // Stop both services, then check that neither still reports a remoting connection.
+ service0.stop();
+ service1.stop();
+
+ assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+ }
+
+ public void testMessageNullName() throws Exception
+ { // A flow configured with a null name: nothing sent on service 0 may reach service 1.
+ Map<String, Object> service0Params = new HashMap<String, Object>();
+ MessagingService service0 = createMessagingService(0, service0Params);
+ // Service 1 is started right away; service 0 is started only after its flow is configured.
+ Map<String, Object> service1Params = new HashMap<String, Object>();
+ MessagingService service1 = createMessagingService(1, service1Params);
+ service1.start();
+
+ TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service0Params);
+ // Connector list for the flow; its only entry points at service 1.
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service1Params);
+ connectors.add(server1tc);
+
+ final SimpleString address1 = new SimpleString("testaddress");
+ // The first constructor argument, where the other tests pass the flow name, is null here.
+ MessageFlowConfiguration ofconfig1 = new MessageFlowConfiguration(null, address1.toString(), null, true, 1, -1, null, connectors);
+
+ Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+ ofconfigs.add(ofconfig1);
+
+ service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+ // NOTE(review): presumably the nameless flow is skipped at start-up - confirm in ClusterManagerImpl.
+ service0.start();
+
+ ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+ ClientSession session0 = csf0.createSession(false, true, true);
+
+ ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+ ClientSession session1 = csf1.createSession(false, true, true);
+
+ session0.createQueue(address1, address1, null, false, false, false);
+ session1.createQueue(address1, address1, null, false, false, false);
+ ClientProducer prod0 = session0.createProducer(address1);
+
+ ClientConsumer cons1 = session1.createConsumer(address1);
+
+ session1.start();
+ // NOTE(review): propKey is declared but never used in this test.
+ SimpleString propKey = new SimpleString("car");
+
+ ClientMessage message = session0.createClientMessage(false);
+ message.getBody().flip();
+
+ prod0.send(message);
+ // Nothing should be forwarded: receive(1000) on service 1 must come back null.
+ ClientMessage r1 = cons1.receive(1000);
+ assertNull(r1);
+
+ session0.close();
+ session1.close();
+ // Stop both services, then check that neither still reports a remoting connection.
+ service0.stop();
+ service1.stop();
+
+ assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+ }
+
+ public void testMessageNullAdress() throws Exception
+ { // A flow configured with a null address: nothing sent on service 0 may reach service 1.
+ Map<String, Object> service0Params = new HashMap<String, Object>();
+ MessagingService service0 = createMessagingService(0, service0Params);
+ // Service 1 is started right away; service 0 is started only after its flow is configured.
+ Map<String, Object> service1Params = new HashMap<String, Object>();
+ MessagingService service1 = createMessagingService(1, service1Params);
+ service1.start();
+
+ TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service0Params);
+ // Connector list for the flow; its only entry points at service 1.
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service1Params);
+ connectors.add(server1tc);
+
+ final SimpleString address1 = new SimpleString("testaddress");
+ // The second constructor argument, where the other tests pass the address, is null here.
+ MessageFlowConfiguration ofconfig1 = new MessageFlowConfiguration("blah", null, null, true, 1, -1, null, connectors);
+
+ Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+ ofconfigs.add(ofconfig1);
+
+ service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+ // NOTE(review): presumably the address-less flow is skipped at start-up - confirm in ClusterManagerImpl.
+ service0.start();
+
+ ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+ ClientSession session0 = csf0.createSession(false, true, true);
+
+ ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+ ClientSession session1 = csf1.createSession(false, true, true);
+
+ session0.createQueue(address1, address1, null, false, false, false);
+ session1.createQueue(address1, address1, null, false, false, false);
+ ClientProducer prod0 = session0.createProducer(address1);
+
+ ClientConsumer cons1 = session1.createConsumer(address1);
+
+ session1.start();
+ // NOTE(review): propKey is declared but never used in this test.
+ SimpleString propKey = new SimpleString("car");
+
+ ClientMessage message = session0.createClientMessage(false);
+ message.getBody().flip();
+
+ prod0.send(message);
+ // Nothing should be forwarded: receive(1000) on service 1 must come back null.
+ ClientMessage r1 = cons1.receive(1000);
+ assertNull(r1);
+
+ session0.close();
+ session1.close();
+ // Stop both services, then check that neither still reports a remoting connection.
+ service0.stop();
+ service1.stop();
+
+ assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ { // Empty override: no shared fixture, each test method creates and starts the services it needs.
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ { // Fails the test if anything is still registered in the in-VM registry after it ran.
+ assertEquals(0, InVMRegistry.instance.size()); // NOTE(review): super.tearDown() is not called - confirm MessageFlowTestBase needs none.
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Copied: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchSizeTest.java (from rev 5412, trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchSizeTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchSizeTest.java 2008-11-21 13:59:07 UTC (rev 5416)
@@ -0,0 +1,203 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A MessageFlowBatchSizeTest
+ *
+ * Checks that a message flow from service 0 to service 1 forwards nothing while
+ * fewer than batchSize messages have been sent, and the whole batch once it is full.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 15 Nov 2008 09:06:55
+ */
+public class MessageFlowBatchSizeTest extends MessageFlowTestBase
+{
+ private static final Logger log = Logger.getLogger(MessageFlowBatchSizeTest.class);
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testBatchSize() throws Exception
+ {
+ Map<String, Object> service0Params = new HashMap<String, Object>();
+ MessagingService service0 = createMessagingService(0, service0Params);
+
+ Map<String, Object> service1Params = new HashMap<String, Object>();
+ MessagingService service1 = createMessagingService(1, service1Params);
+ service1.start();
+ // Connector list for the flow; its only entry points at service 1.
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service1Params);
+ connectors.add(server1tc);
+
+ final SimpleString address1 = new SimpleString("testaddress");
+
+ final int batchSize = 10;
+ // Flow "outflow1" on address1 with a null filter; batchSize is passed as the fifth argument.
+ MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1", address1.toString(), null, true, batchSize, -1, null, connectors);
+ Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+ ofconfigs.add(ofconfig);
+ service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+
+ service0.start();
+
+ TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service0Params);
+
+ ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+
+ ClientSession session0 = csf0.createSession(false, true, true);
+
+ ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+
+ ClientSession session1 = csf1.createSession(false, true, true);
+
+ session0.createQueue(address1, address1, null, false, false, true);
+
+ session1.createQueue(address1, address1, null, false, false, true);
+
+ ClientProducer prod0_1 = session0.createProducer(address1);
+
+ ClientConsumer cons0_1 = session0.createConsumer(address1);
+
+ ClientConsumer cons1_1 = session1.createConsumer(address1);
+
+ session0.start();
+
+ session1.start();
+
+ final SimpleString propKey = new SimpleString("testkey");
+ // Ten rounds of: batchSize - 1 messages (nothing forwarded), then one more (batch forwarded).
+ for (int j = 0; j < 10; j++)
+ {
+ // Send one message fewer than a full batch.
+ for (int i = 0; i < batchSize - 1; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, i);
+ message.getBody().flip();
+
+ prod0_1.send(message);
+ }
+ // The local consumer on service 0 receives every one of them, in send order.
+ for (int i = 0; i < batchSize - 1; i++)
+ {
+ ClientMessage rmessage1 = cons0_1.receive(1000);
+
+ assertNotNull(rmessage1);
+
+ assertEquals(i, rmessage1.getProperty(propKey));
+ }
+ // The batch is still incomplete, so nothing may have reached service 1 yet.
+ ClientMessage rmessage1 = cons1_1.receive(250);
+
+ assertNull(rmessage1);
+ // This message completes the batch.
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, batchSize - 1);
+ message.getBody().flip();
+
+ prod0_1.send(message);
+
+ rmessage1 = cons0_1.receive(1000);
+
+ assertNotNull(rmessage1);
+
+ assertEquals(batchSize - 1, rmessage1.getProperty(propKey));
+ // All batchSize messages must now arrive on service 1, in send order.
+ for (int i = 0; i < batchSize; i++)
+ {
+ rmessage1 = cons1_1.receive(1000);
+
+ assertNotNull(rmessage1);
+
+ assertEquals(i, rmessage1.getProperty(propKey));
+ }
+ }
+
+ session0.close();
+
+ session1.close();
+ // Stop both services, then check that neither still reports a remoting connection.
+ service0.stop();
+ service1.stop();
+
+ assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ { // Empty override: no shared fixture, the test method creates and starts its own services.
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ { // Fails the test if anything is still registered in the in-VM registry after it ran.
+ assertEquals(0, InVMRegistry.instance.size());
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
+
+
Copied: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWildcardTest.java (from rev 5412, trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWildcardTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWildcardTest.java 2008-11-21 13:59:07 UTC (rev 5416)
@@ -0,0 +1,282 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A MessageFlowWildcardTest
+ *
+ * Checks that a message flow configured with the wildcard address "cheese.#" forwards
+ * messages sent to the two cheese addresses and none of those sent to the wine addresses.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 15 Nov 2008 08:19:07
+ */
+public class MessageFlowWildcardTest extends MessageFlowTestBase
+{
+ private static final Logger log = Logger.getLogger(MessageFlowWildcardTest.class);
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testWithWildcard() throws Exception
+ {
+ Map<String, Object> service0Params = new HashMap<String, Object>();
+ MessagingService service0 = createMessagingService(0, service0Params);
+
+ Map<String, Object> service1Params = new HashMap<String, Object>();
+ MessagingService service1 = createMessagingService(1, service1Params);
+ service1.start();
+ // Connector list for the flow; its only entry points at service 1.
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service1Params);
+ connectors.add(server1tc);
+
+ final SimpleString address1 = new SimpleString("cheese.stilton");
+
+ final SimpleString address2 = new SimpleString("cheese.wensleydale");
+
+ final SimpleString address3 = new SimpleString("wine.shiraz");
+
+ final SimpleString address4 = new SimpleString("wine.cabernet");
+ // Wildcard the test expects to cover address1 and address2 but not address3 or address4.
+ final SimpleString match1 = new SimpleString("cheese.#");
+
+ // The flow is configured with the wildcard in the position of the address argument.
+ MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1", match1.toString(), null, true, 1, -1, null, connectors);
+ Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+ ofconfigs.add(ofconfig);
+ service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+
+ service0.start();
+
+ TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service0Params);
+
+ ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+
+ ClientSession session0 = csf0.createSession(false, true, true);
+
+ ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+
+ ClientSession session1 = csf1.createSession(false, true, true);
+ // The same four queues are created on both services.
+ session0.createQueue(address1, address1, null, false, false, true);
+ session0.createQueue(address2, address2, null, false, false, true);
+ session0.createQueue(address3, address3, null, false, false, true);
+ session0.createQueue(address4, address4, null, false, false, true);
+
+ session1.createQueue(address1, address1, null, false, false, true);
+ session1.createQueue(address2, address2, null, false, false, true);
+ session1.createQueue(address3, address3, null, false, false, true);
+ session1.createQueue(address4, address4, null, false, false, true);
+
+ ClientProducer prod0_1 = session0.createProducer(address1);
+ ClientProducer prod0_2 = session0.createProducer(address2);
+ ClientProducer prod0_3 = session0.createProducer(address3);
+ ClientProducer prod0_4 = session0.createProducer(address4);
+
+ ClientConsumer cons0_1 = session0.createConsumer(address1);
+ ClientConsumer cons0_2 = session0.createConsumer(address2);
+ ClientConsumer cons0_3 = session0.createConsumer(address3);
+ ClientConsumer cons0_4 = session0.createConsumer(address4);
+
+ ClientConsumer cons1_1 = session1.createConsumer(address1);
+ ClientConsumer cons1_2 = session1.createConsumer(address2);
+ ClientConsumer cons1_3 = session1.createConsumer(address3);
+ ClientConsumer cons1_4 = session1.createConsumer(address4);
+
+ session0.start();
+
+ session1.start();
+
+ final int numMessages = 100;
+
+ final SimpleString propKey = new SimpleString("testkey");
+ // Send numMessages messages, numbered from 0, to each of the four addresses on service 0.
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, i);
+ message.getBody().flip();
+
+ prod0_1.send(message);
+ }
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, i);
+ message.getBody().flip();
+
+ prod0_2.send(message);
+ }
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, i);
+ message.getBody().flip();
+
+ prod0_3.send(message);
+ }
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, i);
+ message.getBody().flip();
+
+ prod0_4.send(message);
+ }
+ // The local consumers on service 0 receive every message on every address, in send order.
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage rmessage1 = cons0_1.receive(1000);
+
+ assertNotNull(rmessage1);
+
+ assertEquals(i, rmessage1.getProperty(propKey));
+
+ ClientMessage rmessage2 = cons0_2.receive(1000);
+
+ assertNotNull(rmessage2);
+
+ assertEquals(i, rmessage2.getProperty(propKey));
+
+ ClientMessage rmessage3 = cons0_3.receive(1000);
+
+ assertNotNull(rmessage3);
+
+ assertEquals(i, rmessage3.getProperty(propKey));
+
+ ClientMessage rmessage4 = cons0_4.receive(1000);
+
+ assertNotNull(rmessage4);
+
+ assertEquals(i, rmessage4.getProperty(propKey));
+ }
+ // No further messages may be waiting on any of the service 0 consumers.
+ ClientMessage rmessage1 = cons0_1.receiveImmediate();
+
+ assertNull(rmessage1);
+
+ ClientMessage rmessage2 = cons0_2.receiveImmediate();
+
+ assertNull(rmessage2);
+
+ ClientMessage rmessage3 = cons0_3.receiveImmediate();
+
+ assertNull(rmessage3);
+
+ ClientMessage rmessage4 = cons0_4.receiveImmediate();
+
+ assertNull(rmessage4);
+ // On service 1 the two cheese addresses must have received all numMessages messages.
+ for (int i = 0; i < numMessages; i++)
+ {
+ rmessage1 = cons1_1.receive(1000);
+
+ assertNotNull(rmessage1);
+
+ assertEquals(i, rmessage1.getProperty(propKey));
+
+ rmessage2 = cons1_2.receive(1000);
+
+ assertNotNull(rmessage2);
+
+ assertEquals(i, rmessage2.getProperty(propKey));
+ }
+ // No extras on the cheese consumers, and nothing at all on the wine consumers of service 1.
+ rmessage1 = cons1_1.receiveImmediate();
+
+ assertNull(rmessage1);
+
+ rmessage2 = cons1_2.receiveImmediate();
+
+ assertNull(rmessage2);
+
+ rmessage3 = cons1_3.receiveImmediate();
+
+ assertNull(rmessage3);
+
+ rmessage4 = cons1_4.receiveImmediate();
+
+ assertNull(rmessage4);
+
+ session0.close();
+
+ session1.close();
+ // Stop both services, then check that neither still reports a remoting connection.
+ service0.stop();
+ service1.stop();
+
+ assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ { // Empty override: no shared fixture, the test method creates and starts its own services.
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ { // Fails the test if anything is still registered in the in-VM registry after it ran.
+ assertEquals(0, InVMRegistry.instance.size());
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
+
Copied: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWithFilterTest.java (from rev 5412, trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWithFilterTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWithFilterTest.java 2008-11-21 13:59:07 UTC (rev 5416)
@@ -0,0 +1,206 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A MessageFlowWithFilterTest: starts two messaging services and checks that a message
+ * flow configured with a filter only delivers matching messages to the second service.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 15 Nov 2008 08:58:49
+ *
+ *
+ */
+public class MessageFlowWithFilterTest extends MessageFlowTestBase
+{
+ private static final Logger log = Logger.getLogger(MessageFlowWithFilterTest.class);
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+ /** Checks that only filter-matching messages reach service 1. NOTE(review): name says "Wildcard" but a filter is tested - rename? */
+ public void testWithWildcard() throws Exception
+ {
+ Map<String, Object> service0Params = new HashMap<String, Object>();
+ MessagingService service0 = createMessagingService(0, service0Params);
+ // Service 1 is created and started before the message flow is configured on service 0.
+ Map<String, Object> service1Params = new HashMap<String, Object>();
+ MessagingService service1 = createMessagingService(1, service1Params);
+ service1.start();
+ // Connector list pointing at service 1; it is handed to the message flow configuration below.
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service1Params);
+ connectors.add(server1tc);
+
+ final SimpleString address1 = new SimpleString("testaddress");
+ // Filter expression given to the flow; the assertions below expect only matching messages on service 1.
+ final String filter = "selectorkey='ORANGES'";
+ // NOTE(review): the meaning of the trailing arguments (true, 1, -1, null) is not visible here - confirm against MessageFlowConfiguration.
+ MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1", address1.toString(), filter, true, 1, -1, null, connectors);
+ Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+ ofconfigs.add(ofconfig);
+ service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+ // Service 0 is started only after the message flow configuration has been set on it.
+ service0.start();
+
+ TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service0Params);
+
+ ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+
+ ClientSession session0 = csf0.createSession(false, true, true);
+
+ ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+
+ ClientSession session1 = csf1.createSession(false, true, true);
+ // A queue with the same name and address is created on each service.
+ session0.createQueue(address1, address1, null, false, false, true);
+
+ session1.createQueue(address1, address1, null, false, false, true);
+ // Messages are produced on service 0 only; each service gets one consumer.
+ ClientProducer prod0_1 = session0.createProducer(address1);
+
+ ClientConsumer cons0_1 = session0.createConsumer(address1);
+
+ ClientConsumer cons1_1 = session1.createConsumer(address1);
+
+ session0.start();
+
+ session1.start();
+
+ final int numMessages = 100;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ final SimpleString propKey2 = new SimpleString("selectorkey");
+ // First numMessages messages carry selectorkey='ORANGES' (matches the filter); the next numMessages carry 'APPLES'.
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, i);
+ message.putStringProperty(propKey2, new SimpleString("ORANGES"));
+ message.getBody().flip();
+
+ prod0_1.send(message);
+ }
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, i);
+ message.putStringProperty(propKey2, new SimpleString("APPLES"));
+ message.getBody().flip();
+
+ prod0_1.send(message);
+ }
+ // Both consumers are expected to receive the first numMessages messages, in send order.
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage rmessage1 = cons0_1.receive(1000);
+
+ assertNotNull(rmessage1);
+
+ assertEquals(i, rmessage1.getProperty(propKey));
+
+ ClientMessage rmessage2 = cons1_1.receive(1000);
+
+ assertNotNull(rmessage2);
+
+ assertEquals(i, rmessage2.getProperty(propKey));
+ }
+ // The second numMessages messages are expected on the service 0 consumer only.
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage rmessage1 = cons0_1.receive(1000);
+
+ assertNotNull(rmessage1);
+
+ assertEquals(i, rmessage1.getProperty(propKey));
+ }
+ // Neither consumer may have anything left; for service 1 this shows the 'APPLES' messages never arrived.
+ ClientMessage rmessage1 = cons0_1.receiveImmediate();
+
+ assertNull(rmessage1);
+
+ ClientMessage rmessage2 = cons1_1.receiveImmediate();
+
+ assertNull(rmessage2);
+
+ session0.close();
+
+ session1.close();
+ // NOTE(review): not in a finally block - if an assertion above fails the services are never stopped, and tearDown's check presumably fails too.
+ service0.stop();
+ service1.stop();
+
+ assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+ // Overridden with an empty body. NOTE(review): super.setUp() is not called - confirm that is intended.
+ @Override
+ protected void setUp() throws Exception
+ {
+ }
+ // After each test, asserts that the in-VM registry is empty.
+ @Override
+ protected void tearDown() throws Exception
+ {
+ assertEquals(0, InVMRegistry.instance.size());
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
+
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java 2008-11-21 10:32:48 UTC (rev 5415)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java 2008-11-21 13:59:07 UTC (rev 5416)
@@ -1,225 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
- * Middleware LLC, and individual contributors by the @authors tag. See the
- * copyright.txt in the distribution for a full listing of individual
- * contributors.
- *
- * This is free software; you can redistribute it and/or modify it under the
- * terms of the GNU Lesser General Public License as published by the Free
- * Software Foundation; either version 2.1 of the License, or (at your option)
- * any later version.
- *
- * This software is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this software; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.cluster.distribution;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import junit.framework.TestCase;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
-import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- *
- * A OutflowBatchSizeTest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 15 Nov 2008 09:06:55
- *
- *
- */
-public class OutflowBatchSizeTest extends TestCase
-{
- private static final Logger log = Logger.getLogger(OutflowBatchSizeTest.class);
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private MessagingService service0;
-
- private MessagingService service1;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testBatchSize() throws Exception
- {
- Configuration service0Conf = new ConfigurationImpl();
- service0Conf.setClustered(true);
- service0Conf.setSecurityEnabled(false);
- Map<String, Object> service0Params = new HashMap<String, Object>();
- service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
- service0Conf.getAcceptorConfigurations()
- .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
- service0Params));
-
- Configuration service1Conf = new ConfigurationImpl();
- service1Conf.setClustered(true);
- service1Conf.setSecurityEnabled(false);
- Map<String, Object> service1Params = new HashMap<String, Object>();
- service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-
- service1Conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
- service1Params));
- service1 = MessagingServiceImpl.newNullStorageMessagingServer(service1Conf);
- service1.start();
-
- List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
- TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
- service1Params);
- connectors.add(server1tc);
-
- final SimpleString address1 = new SimpleString("testaddress");
-
- final int batchSize = 10;
-
- MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1", address1.toString(), null, true, batchSize, 0, null, connectors);
- Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
- ofconfigs.add(ofconfig);
- service0Conf.setMessageFlowConfigurations(ofconfigs);
-
- service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
- service0.start();
-
- TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
- service0Params);
-
- ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
-
- ClientSession session0 = csf0.createSession(false, true, true);
-
- ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
-
- ClientSession session1 = csf1.createSession(false, true, true);
-
- session0.createQueue(address1, address1, null, false, false, true);
-
- session1.createQueue(address1, address1, null, false, false, true);
-
- ClientProducer prod0_1 = session0.createProducer(address1);
-
- ClientConsumer cons0_1 = session0.createConsumer(address1);
-
- ClientConsumer cons1_1 = session1.createConsumer(address1);
-
- session0.start();
-
- session1.start();
-
- final SimpleString propKey = new SimpleString("testkey");
-
- for (int j = 0; j < 10; j++)
- {
-
- for (int i = 0; i < batchSize - 1; i++)
- {
- ClientMessage message = session0.createClientMessage(false);
- message.putIntProperty(propKey, i);
- message.getBody().flip();
-
- prod0_1.send(message);
- }
-
- for (int i = 0; i < batchSize - 1; i++)
- {
- ClientMessage rmessage1 = cons0_1.receive(1000);
-
- assertNotNull(rmessage1);
-
- assertEquals(i, rmessage1.getProperty(propKey));
- }
-
- ClientMessage rmessage1 = cons1_1.receive(250);
-
- assertNull(rmessage1);
-
- ClientMessage message = session0.createClientMessage(false);
- message.putIntProperty(propKey, batchSize - 1);
- message.getBody().flip();
-
- prod0_1.send(message);
-
- rmessage1 = cons0_1.receive(1000);
-
- assertNotNull(rmessage1);
-
- assertEquals(batchSize - 1, rmessage1.getProperty(propKey));
-
- for (int i = 0; i < batchSize; i++)
- {
- rmessage1 = cons1_1.receive(1000);
-
- assertNotNull(rmessage1);
-
- assertEquals(i, rmessage1.getProperty(propKey));
- }
- }
-
- session0.close();
-
- session1.close();
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- @Override
- protected void setUp() throws Exception
- {
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- service0.stop();
-
- assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
-
- service1.stop();
-
- assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
-
- assertEquals(0, InVMRegistry.instance.size());
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
-
-
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java 2008-11-21 10:32:48 UTC (rev 5415)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java 2008-11-21 13:59:07 UTC (rev 5416)
@@ -1,232 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
- * Middleware LLC, and individual contributors by the @authors tag. See the
- * copyright.txt in the distribution for a full listing of individual
- * contributors.
- *
- * This is free software; you can redistribute it and/or modify it under the
- * terms of the GNU Lesser General Public License as published by the Free
- * Software Foundation; either version 2.1 of the License, or (at your option)
- * any later version.
- *
- * This software is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this software; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.cluster.distribution;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import junit.framework.TestCase;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
-import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- *
- * A OutflowWithFilterTest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 15 Nov 2008 08:58:49
- *
- *
- */
-public class OutflowWithFilterTest extends TestCase
-{
- private static final Logger log = Logger.getLogger(OutflowWithFilterTest.class);
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private MessagingService service0;
-
- private MessagingService service1;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testWithWildcard() throws Exception
- {
- Configuration service0Conf = new ConfigurationImpl();
- service0Conf.setClustered(true);
- service0Conf.setSecurityEnabled(false);
- Map<String, Object> service0Params = new HashMap<String, Object>();
- service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
- service0Conf.getAcceptorConfigurations()
- .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
- service0Params));
-
- Configuration service1Conf = new ConfigurationImpl();
- service1Conf.setClustered(true);
- service1Conf.setSecurityEnabled(false);
- Map<String, Object> service1Params = new HashMap<String, Object>();
- service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-
- service1Conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
- service1Params));
- service1 = MessagingServiceImpl.newNullStorageMessagingServer(service1Conf);
- service1.start();
-
- List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
- TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
- service1Params);
- connectors.add(server1tc);
-
- final SimpleString address1 = new SimpleString("testaddress");
-
- final String filter = "selectorkey='ORANGES'";
-
- MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1", address1.toString(), filter, true, 1, 0, null, connectors);
- Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
- ofconfigs.add(ofconfig);
- service0Conf.setMessageFlowConfigurations(ofconfigs);
-
- service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
- service0.start();
-
- TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
- service0Params);
-
- ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
-
- ClientSession session0 = csf0.createSession(false, true, true);
-
- ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
-
- ClientSession session1 = csf1.createSession(false, true, true);
-
- session0.createQueue(address1, address1, null, false, false, true);
-
- session1.createQueue(address1, address1, null, false, false, true);
-
- ClientProducer prod0_1 = session0.createProducer(address1);
-
- ClientConsumer cons0_1 = session0.createConsumer(address1);
-
- ClientConsumer cons1_1 = session1.createConsumer(address1);
-
- session0.start();
-
- session1.start();
-
- final int numMessages = 100;
-
- final SimpleString propKey = new SimpleString("testkey");
-
- final SimpleString propKey2 = new SimpleString("selectorkey");
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session0.createClientMessage(false);
- message.putIntProperty(propKey, i);
- message.putStringProperty(propKey2, new SimpleString("ORANGES"));
- message.getBody().flip();
-
- prod0_1.send(message);
- }
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session0.createClientMessage(false);
- message.putIntProperty(propKey, i);
- message.putStringProperty(propKey2, new SimpleString("APPLES"));
- message.getBody().flip();
-
- prod0_1.send(message);
- }
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage rmessage1 = cons0_1.receive(1000);
-
- assertNotNull(rmessage1);
-
- assertEquals(i, rmessage1.getProperty(propKey));
-
- ClientMessage rmessage2 = cons1_1.receive(1000);
-
- assertNotNull(rmessage2);
-
- assertEquals(i, rmessage2.getProperty(propKey));
- }
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage rmessage1 = cons0_1.receive(1000);
-
- assertNotNull(rmessage1);
-
- assertEquals(i, rmessage1.getProperty(propKey));
- }
-
- ClientMessage rmessage1 = cons0_1.receiveImmediate();
-
- assertNull(rmessage1);
-
- ClientMessage rmessage2 = cons1_1.receiveImmediate();
-
- assertNull(rmessage2);
-
- session0.close();
-
- session1.close();
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- @Override
- protected void setUp() throws Exception
- {
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- service0.stop();
-
- assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
-
- service1.stop();
-
- assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
-
- assertEquals(0, InVMRegistry.instance.size());
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
-
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java 2008-11-21 10:32:48 UTC (rev 5415)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java 2008-11-21 13:59:07 UTC (rev 5416)
@@ -1,308 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
- * Middleware LLC, and individual contributors by the @authors tag. See the
- * copyright.txt in the distribution for a full listing of individual
- * contributors.
- *
- * This is free software; you can redistribute it and/or modify it under the
- * terms of the GNU Lesser General Public License as published by the Free
- * Software Foundation; either version 2.1 of the License, or (at your option)
- * any later version.
- *
- * This software is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this software; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.cluster.distribution;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import junit.framework.TestCase;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
-import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- *
- * A OutflowWithWildcardTest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 15 Nov 2008 08:19:07
- *
- *
- */
-public class OutflowWithWildcardTest extends TestCase
-{
- private static final Logger log = Logger.getLogger(OutflowWithWildcardTest.class);
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private MessagingService service0;
-
- private MessagingService service1;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testWithWildcard() throws Exception
- {
- Configuration service0Conf = new ConfigurationImpl();
- service0Conf.setClustered(true);
- service0Conf.setSecurityEnabled(false);
- Map<String, Object> service0Params = new HashMap<String, Object>();
- service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
- service0Conf.getAcceptorConfigurations()
- .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
- service0Params));
-
- Configuration service1Conf = new ConfigurationImpl();
- service1Conf.setClustered(true);
- service1Conf.setSecurityEnabled(false);
- Map<String, Object> service1Params = new HashMap<String, Object>();
- service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-
- service1Conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
- service1Params));
- service1 = MessagingServiceImpl.newNullStorageMessagingServer(service1Conf);
- service1.start();
-
- List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
- TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
- service1Params);
- connectors.add(server1tc);
-
- final SimpleString address1 = new SimpleString("cheese.stilton");
-
- final SimpleString address2 = new SimpleString("cheese.wensleydale");
-
- final SimpleString address3 = new SimpleString("wine.shiraz");
-
- final SimpleString address4 = new SimpleString("wine.cabernet");
-
- final SimpleString match1 = new SimpleString("cheese.#");
-
-
- MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1", match1.toString(), null, true, 1, 0, null, connectors);
- Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
- ofconfigs.add(ofconfig);
- service0Conf.setMessageFlowConfigurations(ofconfigs);
-
- service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
- service0.start();
-
- TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
- service0Params);
-
- ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
-
- ClientSession session0 = csf0.createSession(false, true, true);
-
- ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
-
- ClientSession session1 = csf1.createSession(false, true, true);
-
- session0.createQueue(address1, address1, null, false, false, true);
- session0.createQueue(address2, address2, null, false, false, true);
- session0.createQueue(address3, address3, null, false, false, true);
- session0.createQueue(address4, address4, null, false, false, true);
-
- session1.createQueue(address1, address1, null, false, false, true);
- session1.createQueue(address2, address2, null, false, false, true);
- session1.createQueue(address3, address3, null, false, false, true);
- session1.createQueue(address4, address4, null, false, false, true);
-
- ClientProducer prod0_1 = session0.createProducer(address1);
- ClientProducer prod0_2 = session0.createProducer(address2);
- ClientProducer prod0_3 = session0.createProducer(address3);
- ClientProducer prod0_4 = session0.createProducer(address4);
-
- ClientConsumer cons0_1 = session0.createConsumer(address1);
- ClientConsumer cons0_2 = session0.createConsumer(address2);
- ClientConsumer cons0_3 = session0.createConsumer(address3);
- ClientConsumer cons0_4 = session0.createConsumer(address4);
-
- ClientConsumer cons1_1 = session1.createConsumer(address1);
- ClientConsumer cons1_2 = session1.createConsumer(address2);
- ClientConsumer cons1_3 = session1.createConsumer(address3);
- ClientConsumer cons1_4 = session1.createConsumer(address4);
-
- session0.start();
-
- session1.start();
-
- final int numMessages = 100;
-
- final SimpleString propKey = new SimpleString("testkey");
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session0.createClientMessage(false);
- message.putIntProperty(propKey, i);
- message.getBody().flip();
-
- prod0_1.send(message);
- }
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session0.createClientMessage(false);
- message.putIntProperty(propKey, i);
- message.getBody().flip();
-
- prod0_2.send(message);
- }
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session0.createClientMessage(false);
- message.putIntProperty(propKey, i);
- message.getBody().flip();
-
- prod0_3.send(message);
- }
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session0.createClientMessage(false);
- message.putIntProperty(propKey, i);
- message.getBody().flip();
-
- prod0_4.send(message);
- }
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage rmessage1 = cons0_1.receive(1000);
-
- assertNotNull(rmessage1);
-
- assertEquals(i, rmessage1.getProperty(propKey));
-
- ClientMessage rmessage2 = cons0_2.receive(1000);
-
- assertNotNull(rmessage2);
-
- assertEquals(i, rmessage2.getProperty(propKey));
-
- ClientMessage rmessage3 = cons0_3.receive(1000);
-
- assertNotNull(rmessage3);
-
- assertEquals(i, rmessage3.getProperty(propKey));
-
- ClientMessage rmessage4 = cons0_4.receive(1000);
-
- assertNotNull(rmessage4);
-
- assertEquals(i, rmessage4.getProperty(propKey));
- }
-
- ClientMessage rmessage1 = cons0_1.receiveImmediate();
-
- assertNull(rmessage1);
-
- ClientMessage rmessage2 = cons0_2.receiveImmediate();
-
- assertNull(rmessage2);
-
- ClientMessage rmessage3 = cons0_3.receiveImmediate();
-
- assertNull(rmessage3);
-
- ClientMessage rmessage4 = cons0_4.receiveImmediate();
-
- assertNull(rmessage4);
-
- for (int i = 0; i < numMessages; i++)
- {
- rmessage1 = cons1_1.receive(1000);
-
- assertNotNull(rmessage1);
-
- assertEquals(i, rmessage1.getProperty(propKey));
-
- rmessage2 = cons1_2.receive(1000);
-
- assertNotNull(rmessage2);
-
- assertEquals(i, rmessage2.getProperty(propKey));
- }
-
- rmessage1 = cons1_1.receiveImmediate();
-
- assertNull(rmessage1);
-
- rmessage2 = cons1_2.receiveImmediate();
-
- assertNull(rmessage2);
-
- rmessage3 = cons1_3.receiveImmediate();
-
- assertNull(rmessage3);
-
- rmessage4 = cons1_4.receiveImmediate();
-
- assertNull(rmessage4);
-
- session0.close();
-
- session1.close();
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- @Override
- protected void setUp() throws Exception
- {
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- service0.stop();
-
- assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
-
- service1.stop();
-
- assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
-
- assertEquals(0, InVMRegistry.instance.size());
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
-
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SimpleOutflowTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SimpleOutflowTest.java 2008-11-21 10:32:48 UTC (rev 5415)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SimpleOutflowTest.java 2008-11-21 13:59:07 UTC (rev 5416)
@@ -1,309 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
- * Middleware LLC, and individual contributors by the @authors tag. See the
- * copyright.txt in the distribution for a full listing of individual
- * contributors.
- *
- * This is free software; you can redistribute it and/or modify it under the
- * terms of the GNU Lesser General Public License as published by the Free
- * Software Foundation; either version 2.1 of the License, or (at your option)
- * any later version.
- *
- * This software is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this software; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.cluster.distribution;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import junit.framework.TestCase;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
-import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- *
- * A ActivationTimeoutTest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 4 Nov 2008 16:54:50
- *
- *
- */
-public class SimpleOutflowTest extends TestCase
-{
- private static final Logger log = Logger.getLogger(SimpleOutflowTest.class);
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private MessagingService service0;
-
- private MessagingService service1;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testSimpleOutflowFanout() throws Exception
- {
- Configuration service0Conf = new ConfigurationImpl();
- service0Conf.setClustered(true);
- service0Conf.setSecurityEnabled(false);
- Map<String, Object> service0Params = new HashMap<String, Object>();
- service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
- service0Conf.getAcceptorConfigurations()
- .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
- service0Params));
-
- Configuration service1Conf = new ConfigurationImpl();
- service1Conf.setClustered(true);
- service1Conf.setSecurityEnabled(false);
- Map<String, Object> service1Params = new HashMap<String, Object>();
- service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-
- service1Conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
- service1Params));
- service1 = MessagingServiceImpl.newNullStorageMessagingServer(service1Conf);
- service1.start();
-
- List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
- TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
- service1Params);
- connectors.add(server1tc);
-
- final SimpleString testAddress = new SimpleString("testaddress");
-
- MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1", testAddress.toString(), null, true, 1, 0, null, connectors);
- Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
- ofconfigs.add(ofconfig);
- service0Conf.setMessageFlowConfigurations(ofconfigs);
-
- service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
- service0.start();
-
- TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
- service0Params);
-
- ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
-
- ClientSession session0 = csf0.createSession(false, true, true);
-
- ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
-
- ClientSession session1 = csf1.createSession(false, true, true);
-
- session0.createQueue(testAddress, testAddress, null, false, false, true);
-
- session1.createQueue(testAddress, testAddress, null, false, false, true);
-
- ClientProducer prod0 = session0.createProducer(testAddress);
-
- ClientConsumer cons0 = session0.createConsumer(testAddress);
-
- ClientConsumer cons1 = session1.createConsumer(testAddress);
-
- session0.start();
-
- session1.start();
-
- final int numMessages = 100;
-
- final SimpleString propKey = new SimpleString("testkey");
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session0.createClientMessage(false);
- message.putIntProperty(propKey, i);
- message.getBody().flip();
-
- prod0.send(message);
- }
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage rmessage0 = cons0.receive(1000);
-
- assertNotNull(rmessage0);
-
- assertEquals(i, rmessage0.getProperty(propKey));
-
- ClientMessage rmessage1 = cons1.receive(1000);
-
- assertNotNull(rmessage1);
-
- assertEquals(i, rmessage1.getProperty(propKey));
- }
- }
-
- public void testSimpleOutflowRoundRobin() throws Exception
- {
- Configuration service0Conf = new ConfigurationImpl();
- service0Conf.setClustered(true);
- service0Conf.setSecurityEnabled(false);
- Map<String, Object> service0Params = new HashMap<String, Object>();
- service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
- service0Conf.getAcceptorConfigurations()
- .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
- service0Params));
-
- Configuration service1Conf = new ConfigurationImpl();
- service1Conf.setClustered(true);
- service1Conf.setSecurityEnabled(false);
- Map<String, Object> service1Params = new HashMap<String, Object>();
- service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-
- service1Conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
- service1Params));
- service1 = MessagingServiceImpl.newNullStorageMessagingServer(service1Conf);
- service1.start();
-
- List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
- TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
- service1Params);
- connectors.add(server1tc);
-
- final SimpleString testAddress = new SimpleString("testaddress");
-
- MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1", testAddress.toString(), null, false, 1, 0, null, connectors);
- Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
- ofconfigs.add(ofconfig);
- service0Conf.setMessageFlowConfigurations(ofconfigs);
-
- service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
- service0.start();
-
- TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
- service0Params);
-
- ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
-
- ClientSession session0 = csf0.createSession(false, true, true);
-
- ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
-
- ClientSession session1 = csf1.createSession(false, true, true);
-
- session0.createQueue(testAddress, testAddress, null, false, false, false);
-
- session1.createQueue(testAddress, testAddress, null, false, false, false);
-
- ClientProducer prod0 = session0.createProducer(testAddress);
-
- ClientConsumer cons0 = session0.createConsumer(testAddress);
-
- ClientConsumer cons1 = session1.createConsumer(testAddress);
-
- session0.start();
-
- session1.start();
-
- final int numMessages = 100;
-
- final SimpleString propKey = new SimpleString("testkey");
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session0.createClientMessage(false);
- message.putIntProperty(propKey, i);
- message.getBody().flip();
-
- prod0.send(message);
- }
-
- ClientMessage msg = cons0.receive(1000);
-
- boolean toggle = msg != null;
-
- int i;
- if (toggle)
- {
- assertEquals(0, msg.getProperty(propKey));
-
- i = 1;
- }
- else
- {
- i = 0;
- }
-
- for (; i < numMessages; i++)
- {
- if (!toggle)
- {
- ClientMessage rmessage0 = cons0.receive(1000);
-
- assertNotNull(rmessage0);
-
- assertEquals(i, rmessage0.getProperty(propKey));
- }
- else
- {
- ClientMessage rmessage1 = cons1.receive(1000);
-
- assertNotNull(rmessage1);
-
- assertEquals(i, rmessage1.getProperty(propKey));
- }
-
- toggle = !toggle;
- }
- }
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- @Override
- protected void setUp() throws Exception
- {
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- service0.stop();
-
- assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
-
- service1.stop();
-
- assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
-
- assertEquals(0, InVMRegistry.instance.size());
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverPreCommitMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverPreCommitMessageTest.java 2008-11-21 10:32:48 UTC (rev 5415)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverPreCommitMessageTest.java 2008-11-21 13:59:07 UTC (rev 5416)
@@ -70,10 +70,6 @@
// Public --------------------------------------------------------
- /*
- * Set messages to expire very soon, send a load of them, so at some of them get expired when they reach the client
- * After failover make sure all are received ok
- */
public void testPreCommitFailoverTest() throws Exception
{
ClientSessionFactoryInternal sf1 = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
@@ -134,10 +130,6 @@
{
message.acknowledge();
- //We sleep a little to make sure messages aren't consumed too quickly and some
- //will expire before reaching consumer
- Thread.sleep(1);
-
count++;
}
else
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/queue/DeadLetterAddressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/queue/DeadLetterAddressTest.java 2008-11-21 10:32:48 UTC (rev 5415)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/queue/DeadLetterAddressTest.java 2008-11-21 13:59:07 UTC (rev 5416)
@@ -21,29 +21,29 @@
*/
package org.jboss.messaging.tests.integration.queue;
-import org.jboss.messaging.tests.util.UnitTestCase;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.impl.XidImpl;
-import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.messaging.util.SimpleString;
-import org.jboss.messaging.jms.client.JBossMessage;
-import javax.transaction.xa.Xid;
-import javax.transaction.xa.XAResource;
-import java.util.Map;
-import java.util.HashMap;
-
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*/
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java 2008-11-21 10:32:48 UTC (rev 5415)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java 2008-11-21 13:59:07 UTC (rev 5416)
@@ -21,30 +21,23 @@
*/
package org.jboss.messaging.tests.integration.queue;
-import org.jboss.messaging.tests.util.UnitTestCase;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientProducer;
+import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ACTUAL_EXPIRY_TIME;
+
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ClientSessionFactory;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.transaction.impl.XidImpl;
-import org.jboss.messaging.core.message.impl.MessageImpl;
-import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ACTUAL_EXPIRY_TIME;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.messaging.util.SimpleString;
-import javax.transaction.xa.Xid;
-import javax.transaction.xa.XAResource;
-import java.util.Map;
-import java.util.HashMap;
-
-
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*/
@@ -136,7 +129,7 @@
clientConsumer.close();
}
- public void testHeadersSet() throws Exception
+ public void testHeadersSet() throws Exception
{
final int NUM_MESSAGES = 5;
SimpleString ea = new SimpleString("DLA");
@@ -151,7 +144,7 @@
ClientSession sendSession = sessionFactory.createSession(false, true, true);
ClientProducer producer = sendSession.createProducer(qName);
- long expiration = System.currentTimeMillis();
+ long expiration = System.currentTimeMillis();
for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage tm = createTextMessage("Message:" + i, clientSession);
@@ -163,7 +156,7 @@
clientSession.start();
ClientMessage m = clientConsumer.receive(1000);
assertNull(m);
- //All the messages should now be in the EQ
+ // All the messages should now be in the EQ
ClientConsumer cc3 = clientSession.createConsumer(eq);
@@ -177,7 +170,7 @@
assertEquals("Message:" + i, text);
// Check the headers
- Long actualExpiryTime = (Long) tm.getProperty(HDR_ACTUAL_EXPIRY_TIME);
+ Long actualExpiryTime = (Long)tm.getProperty(HDR_ACTUAL_EXPIRY_TIME);
assertTrue(actualExpiryTime >= expiration);
}
@@ -191,9 +184,9 @@
TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
configuration.getAcceptorConfigurations().add(transportConfig);
messagingService = MessagingServiceImpl.newNullStorageMessagingServer(configuration);
- //start the server
+ // start the server
messagingService.start();
- //then we create a client as normal
+ // then we create a client as normal
ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
clientSession = sessionFactory.createSession(true, true, false);
}
@@ -228,4 +221,3 @@
}
}
-
More information about the jboss-cvs-commits
mailing list