[jboss-cvs] JBoss Messaging SVN: r5416 - in trunk: src/main/org/jboss/messaging/core/server/cluster/impl and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Nov 21 08:59:08 EST 2008


Author: timfox
Date: 2008-11-21 08:59:07 -0500 (Fri, 21 Nov 2008)
New Revision: 5416

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/BasicMessageFlowTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchSizeTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWildcardTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWithFilterTest.java
Removed:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SimpleOutflowTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverPreCommitMessageTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/queue/DeadLetterAddressTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java
Log:
More tests mainly


Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-11-21 10:32:48 UTC (rev 5415)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-11-21 13:59:07 UTC (rev 5416)
@@ -218,7 +218,7 @@
                                           final boolean fanout) throws Exception
    {
       Binding binding = createBinding(address, queueName, filter, durable, temporary, fanout);
-
+      
       addBindingInMemory(binding);
 
       if (durable)
@@ -317,7 +317,7 @@
                      //Can be determined from the number of consumers on the queue
                      long routings = binding.getRoutings();
                      
-                     if (routings <= lowestRoutings || lowestRoutings == -1)
+                     if (routings < lowestRoutings || lowestRoutings == -1)
                      {                        
                         lowestRoutings = routings;
                         

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2008-11-21 10:32:48 UTC (rev 5415)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2008-11-21 13:59:07 UTC (rev 5416)
@@ -33,6 +33,7 @@
 import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
 import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
 import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
+import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.cluster.BroadcastGroup;
@@ -56,6 +57,8 @@
  */
 public class ClusterManagerImpl implements ClusterManager
 {
+   private static final Logger log = Logger.getLogger(ClusterManagerImpl.class);
+
    private final Map<String, BroadcastGroup> broadcastGroups = new HashMap<String, BroadcastGroup>();
 
    private final Map<String, DiscoveryGroup> discoveryGroups = new HashMap<String, DiscoveryGroup>();
@@ -97,18 +100,18 @@
       {
          return;
       }
-      
-      for (BroadcastGroup group: broadcastGroups.values())
+
+      for (BroadcastGroup group : broadcastGroups.values())
       {
          group.start();
       }
-      
-      for (DiscoveryGroup group: discoveryGroups.values())
+
+      for (DiscoveryGroup group : discoveryGroups.values())
       {
          group.start();
       }
-      
-      for (MessageFlow flow: this.messageFlows.values())
+
+      for (MessageFlow flow : this.messageFlows.values())
       {
          flow.start();
       }
@@ -122,18 +125,18 @@
       {
          return;
       }
-      
-      for (BroadcastGroup group: broadcastGroups.values())
+
+      for (BroadcastGroup group : broadcastGroups.values())
       {
          group.stop();
       }
-      
-      for (DiscoveryGroup group: discoveryGroups.values())
+
+      for (DiscoveryGroup group : discoveryGroups.values())
       {
          group.stop();
       }
-      
-      for (MessageFlow flow: this.messageFlows.values())
+
+      for (MessageFlow flow : this.messageFlows.values())
       {
          flow.stop();
       }
@@ -150,8 +153,10 @@
    {
       if (broadcastGroups.containsKey(config.getName()))
       {
-         throw new IllegalArgumentException("There is already a broadcast-group with name " + config.getName() +
-                                            " deployed");
+         log.warn("There is already a broadcast-group with name " + config.getName() +
+                  " deployed. This one will not be deployed.");
+         
+         return;
       }
 
       InetAddress localBindAddress = InetAddress.getByName(config.getLocalBindAddress());
@@ -179,8 +184,10 @@
    {
       if (discoveryGroups.containsKey(config.getName()))
       {
-         throw new IllegalArgumentException("There is already a discovery-group with name " + config.getName() +
-                                            " deployed");
+         log.warn("There is already a discovery-group with name " + config.getName() +
+                  " deployed. This one will not be deployed.");
+
+         return;
       }
 
       InetAddress groupAddress = InetAddress.getByName(config.getGroupAddress());
@@ -194,11 +201,41 @@
 
    public synchronized void deployMessageFlow(final MessageFlowConfiguration config) throws Exception
    {
+      if (config.getName() == null)
+      {
+         log.warn("Must specify a unique name for each message flow. This one will not be deployed.");
+
+         return;
+      }
+      
+      if (config.getAddress() == null)
+      {
+         log.warn("Must specify an address each message flow. This one will not be deployed.");
+
+         return;
+      }
+      
       if (messageFlows.containsKey(config.getName()))
       {
-         throw new IllegalArgumentException("There is already a message-flow with name " + config.getName() +
-                                            " deployed");
+         log.warn("There is already a message-flow with name " + config.getName() +
+                  " deployed. This one will not be deployed.");
+
+         return;
       }
+      
+      if (config.getMaxBatchTime() == 0 || config.getMaxBatchTime() < -1)
+      {
+         log.warn("Invalid value for max-batch-time. Valid values are -1 or > 0");
+         
+         return;
+      }
+      
+      if (config.getMaxBatchSize() < 1)
+      {
+         log.warn("Invalid value for max-batch-size. Valid values are > 0");
+         
+         return;
+      }
 
       Transformer transformer = null;
 
@@ -235,6 +272,7 @@
                                     storageManager,
                                     postOffice,
                                     queueSettingsRepository,
+                                    scheduledExecutor,
                                     transformer,
                                     config.getConnectors());
       }
@@ -246,8 +284,10 @@
 
          if (group == null)
          {
-            throw new IllegalArgumentException("There is no discovery-group with name " + config.getDiscoveryGroupName() +
-                                               " deployed");
+            log.warn("There is no discovery-group with name " + config.getDiscoveryGroupName() +
+                     " deployed. This one will not be deployed.");
+
+            return;
          }
 
          flow = new MessageFlowImpl(new SimpleString(config.getName()),
@@ -261,6 +301,7 @@
                                     storageManager,
                                     postOffice,
                                     queueSettingsRepository,
+                                    scheduledExecutor,
                                     transformer,
                                     group);
       }

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java	2008-11-21 10:32:48 UTC (rev 5415)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java	2008-11-21 13:59:07 UTC (rev 5416)
@@ -24,6 +24,9 @@
 
 import java.util.LinkedList;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 import org.jboss.messaging.core.client.ClientProducer;
 import org.jboss.messaging.core.client.ClientSession;
@@ -77,13 +80,15 @@
    private java.util.Queue<MessageReference> refs = new LinkedList<MessageReference>();
 
    private Transaction tx;
-
+   
+   private long lastReceivedTime = -1;
+   
    private final StorageManager storageManager;
 
    private final PostOffice postOffice;
 
    private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
-   
+     
    private final Transformer transformer;
 
    private final ClientSessionFactory csf;
@@ -93,6 +98,8 @@
    private ClientProducer producer;
       
    private volatile boolean started;
+   
+   private final ScheduledFuture<?> future;
 
    // Static --------------------------------------------------------
 
@@ -107,7 +114,8 @@
                         final long maxBatchTime,
                         final StorageManager storageManager,
                         final PostOffice postOffice,
-                        final HierarchicalRepository<QueueSettings> queueSettingsRepository,                        
+                        final HierarchicalRepository<QueueSettings> queueSettingsRepository,  
+                        final ScheduledExecutorService scheduledExecutor,
                         final Transformer transformer) throws Exception
    {
       this.queue = queue;
@@ -126,7 +134,16 @@
       
       this.transformer = transformer;
       
-      this.csf = new ClientSessionFactoryImpl(connectorConfig);      
+      this.csf = new ClientSessionFactoryImpl(connectorConfig);  
+      
+      if (maxBatchTime != -1)
+      {
+         future = scheduledExecutor.scheduleAtFixedRate(new BatchTimeout(), maxBatchTime, maxBatchTime, TimeUnit.MILLISECONDS);
+      }
+      else
+      {
+         future = null;
+      }
    }
    
    public synchronized void start() throws Exception
@@ -152,6 +169,11 @@
       started = false;
 
       queue.removeConsumer(this);
+      
+      if (future != null)
+      {
+         future.cancel(false);
+      }
 
       // Wait until all batches are complete
 
@@ -177,7 +199,7 @@
    }
 
    // Consumer implementation ---------------------------------------
-
+   
    public HandleStatus handle(final MessageReference reference) throws Exception
    {
       if (busy)
@@ -193,6 +215,11 @@
          }
 
          refs.add(reference);
+         
+         if (maxBatchTime != -1)
+         {
+            lastReceivedTime = System.currentTimeMillis();
+         }
 
          count++;
 
@@ -213,12 +240,35 @@
 
    // Private -------------------------------------------------------
 
+   private synchronized void timeoutBatch()
+   {
+      if (!started)
+      {
+         return;
+      }
+      
+      if (lastReceivedTime != -1 && count > 0)
+      {
+         long now = System.currentTimeMillis();
+         
+         if (now - lastReceivedTime >= maxBatchTime)
+         {
+            sendBatch();
+         }
+      }      
+   }
+   
    private void sendBatch()
    {
       try
       {
          synchronized (this)
          {
+            if (count == 0)
+            {
+               return;
+            }
+            
             // TODO - duplicate detection on sendee and if batch size = 1 then don't need tx
 
             while (true)
@@ -284,5 +334,13 @@
          sendBatch();
       }
    }
+   
+   private class BatchTimeout implements Runnable
+   {
+      public void run()
+      {
+         timeoutBatch();
+      }
+   }
 
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java	2008-11-21 10:32:48 UTC (rev 5415)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java	2008-11-21 13:59:07 UTC (rev 5416)
@@ -28,6 +28,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.filter.Filter;
@@ -85,6 +86,8 @@
    private Map<TransportConfiguration, Forwarder> forwarders = new HashMap<TransportConfiguration, Forwarder>();
 
    private final DiscoveryGroup discoveryGroup;
+   
+   private final ScheduledExecutorService scheduledExecutor;
 
    private volatile boolean started;
 
@@ -101,6 +104,7 @@
                           final StorageManager storageManager,
                           final PostOffice postOffice,
                           final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+                          final ScheduledExecutorService scheduledExecutor,
                           final Transformer transformer,
                           final List<TransportConfiguration> connectors) throws Exception
    {
@@ -127,6 +131,8 @@
       this.transformer = transformer;
 
       this.discoveryGroup = null;
+      
+      this.scheduledExecutor = scheduledExecutor;
 
       this.updateConnectors(connectors);
    }
@@ -144,6 +150,7 @@
                           final StorageManager storageManager,
                           final PostOffice postOffice,
                           final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+                          final ScheduledExecutorService scheduledExecutor,
                           final Transformer transformer,
                           final DiscoveryGroup discoveryGroup) throws Exception
    {
@@ -166,6 +173,8 @@
       this.postOffice = postOffice;
 
       this.queueSettingsRepository = queueSettingsRepository;
+      
+      this.scheduledExecutor = scheduledExecutor;
 
       this.transformer = transformer;
 
@@ -278,6 +287,7 @@
                                                     storageManager,
                                                     postOffice,
                                                     queueSettingsRepository,
+                                                    scheduledExecutor,
                                                     transformer);
 
             forwarders.put(connector, forwarder);

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java	2008-11-21 10:32:48 UTC (rev 5415)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java	2008-11-21 13:59:07 UTC (rev 5416)
@@ -133,7 +133,7 @@
       {
          storageManager.updateDeliveryCount(this);
       }
-      
+
       QueueSettings queueSettings = queueSettingsRepository.getMatch(queue.getName().toString());
       int maxDeliveries = queueSettings.getMaxDeliveryAttempts();
 
@@ -147,11 +147,11 @@
       else
       {
          long redeliveryDelay = queueSettings.getRedeliveryDelay();
-         
+
          if (redeliveryDelay > 0)
          {
             scheduledDeliveryTime = System.currentTimeMillis() + redeliveryDelay;
-            
+
             storageManager.updateScheduledDeliveryTime(this);
          }
          queue.referenceCancelled();
@@ -161,16 +161,19 @@
    }
 
    public void sendToDeadLetterAddress(final StorageManager persistenceManager,
-                         final PostOffice postOffice,
-                         final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
+                                       final PostOffice postOffice,
+                                       final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
    {
-      SimpleString deadLetterAddress = queueSettingsRepository.getMatch(queue.getName().toString()).getDeadLetterAddress();
+      SimpleString deadLetterAddress = queueSettingsRepository.getMatch(queue.getName().toString())
+                                                              .getDeadLetterAddress();
       if (deadLetterAddress != null)
       {
          List<Binding> bindingList = postOffice.getBindingsForAddress(deadLetterAddress);
-         if(bindingList == null || bindingList.size() == 0)
+         
+         if (bindingList.isEmpty())
          {
-             log.warn("Message has exceeded max delivery attempts. No bindings for Dead Letter Address " + deadLetterAddress + " so dropping it");
+            log.warn("Message has exceeded max delivery attempts. No bindings for Dead Letter Address " + deadLetterAddress +
+                     " so dropping it");
          }
          else
          {
@@ -179,8 +182,9 @@
       }
       else
       {
-         log.warn("Message has exceeded max delivery attempts. No Dead Letter Address configured for queue " + queue.getName() + " so dropping it");
-         
+         log.warn("Message has exceeded max delivery attempts. No Dead Letter Address configured for queue " + queue.getName() +
+                  " so dropping it");
+
          Transaction tx = new TransactionImpl(persistenceManager, postOffice);
          tx.addAcknowledgement(this);
          tx.commit();
@@ -196,9 +200,10 @@
       if (expiryAddress != null)
       {
          List<Binding> bindingList = postOffice.getBindingsForAddress(expiryAddress);
-         if(bindingList == null || bindingList.size() == 0)
+         
+         if (bindingList.isEmpty())
          {
-             log.warn("Message has expired. No bindings for Expiry Address " + expiryAddress + " so dropping it");
+            log.warn("Message has expired. No bindings for Expiry Address " + expiryAddress + " so dropping it");
          }
          else
          {
@@ -244,7 +249,7 @@
       Transaction tx = new TransactionImpl(persistenceManager, postOffice);
 
       ServerMessage copyMessage = makeCopy(expiry, persistenceManager);
-      
+
       copyMessage.setDestination(otherBinding.getAddress());
 
       tx.addMessage(copyMessage);
@@ -254,7 +259,7 @@
       tx.commit();
    }
 
-    private void move(final SimpleString address,
+   private void move(final SimpleString address,
                      final StorageManager persistenceManager,
                      final PostOffice postOffice,
                      final boolean expiry) throws Exception

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/BasicMessageFlowTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/BasicMessageFlowTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/BasicMessageFlowTest.java	2008-11-21 13:59:07 UTC (rev 5416)
@@ -0,0 +1,786 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * 
+ * A BasicMessageFlowTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 4 Nov 2008 16:54:50
+ *
+ *
+ */
+public class BasicMessageFlowTest extends MessageFlowTestBase
+{
+   private static final Logger log = Logger.getLogger(BasicMessageFlowTest.class);
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testStaticListOutflow() throws Exception
+   {     
+      Map<String, Object> service0Params = new HashMap<String, Object>();      
+      MessagingService service0 = createMessagingService(0, service0Params);
+      
+      Map<String, Object> service1Params = new HashMap<String, Object>();      
+      MessagingService service1 = createMessagingService(1, service1Params);      
+      service1.start();
+      
+      Map<String, Object> service2Params = new HashMap<String, Object>();      
+      MessagingService service2 = createMessagingService(2, service2Params);      
+      service2.start();
+      
+      Map<String, Object> service3Params = new HashMap<String, Object>();      
+      MessagingService service3 = createMessagingService(3, service3Params);      
+      service3.start();
+      
+      Map<String, Object> service4Params = new HashMap<String, Object>();      
+      MessagingService service4 = createMessagingService(4, service4Params);      
+      service4.start();
+
+      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+      
+      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service1Params);
+      connectors.add(server1tc);
+      
+      TransportConfiguration server2tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service2Params);
+      connectors.add(server2tc);
+      
+      TransportConfiguration server3tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service3Params);
+      connectors.add(server3tc);
+      
+      TransportConfiguration server4tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service4Params);
+      connectors.add(server4tc);
+
+      final SimpleString testAddress = new SimpleString("testaddress");
+
+      MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1",
+                                                                       testAddress.toString(),
+                                                                       null,
+                                                                       true,
+                                                                       1,
+                                                                       -1,
+                                                                       null,
+                                                                       connectors);
+      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+      ofconfigs.add(ofconfig);
+      service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+
+      service0.start();
+
+      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service0Params);
+
+      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+      ClientSession session0 = csf0.createSession(false, true, true);
+
+      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+      ClientSession session1 = csf1.createSession(false, true, true);
+      
+      ClientSessionFactory csf2 = new ClientSessionFactoryImpl(server2tc);
+      ClientSession session2 = csf2.createSession(false, true, true);
+      
+      ClientSessionFactory csf3 = new ClientSessionFactoryImpl(server3tc);
+      ClientSession session3 = csf3.createSession(false, true, true);
+      
+      ClientSessionFactory csf4 = new ClientSessionFactoryImpl(server4tc);
+      ClientSession session4 = csf4.createSession(false, true, true);
+
+      session0.createQueue(testAddress, testAddress, null, false, false, true);
+      session1.createQueue(testAddress, testAddress, null, false, false, true);
+      session2.createQueue(testAddress, testAddress, null, false, false, true);
+      session3.createQueue(testAddress, testAddress, null, false, false, true);
+      session4.createQueue(testAddress, testAddress, null, false, false, true);
+
+      ClientProducer prod0 = session0.createProducer(testAddress);
+
+      ClientConsumer cons0 = session0.createConsumer(testAddress);
+      ClientConsumer cons1 = session1.createConsumer(testAddress);
+      ClientConsumer cons2 = session2.createConsumer(testAddress);
+      ClientConsumer cons3 = session3.createConsumer(testAddress);
+      ClientConsumer cons4 = session4.createConsumer(testAddress);      
+
+      session0.start();
+
+      session1.start();
+      session2.start();
+      session3.start();
+      session4.start();
+
+      final int numMessages = 100;
+
+      final SimpleString propKey = new SimpleString("testkey");
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session0.createClientMessage(false);
+         message.putIntProperty(propKey, i);
+         message.getBody().flip();
+
+         prod0.send(message);
+      }
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage rmessage0 = cons0.receive(1000);
+         assertNotNull(rmessage0);
+         assertEquals(i, rmessage0.getProperty(propKey));
+
+         ClientMessage rmessage1 = cons1.receive(1000);
+         assertNotNull(rmessage1);
+         assertEquals(i, rmessage1.getProperty(propKey));
+         
+         ClientMessage rmessage2 = cons2.receive(1000);
+         assertNotNull(rmessage2);
+         assertEquals(i, rmessage2.getProperty(propKey));
+         
+         ClientMessage rmessage3 = cons3.receive(1000);
+         assertNotNull(rmessage3);
+         assertEquals(i, rmessage3.getProperty(propKey));
+         
+         ClientMessage rmessage4 = cons4.receive(1000);
+         assertNotNull(rmessage4);
+         assertEquals(i, rmessage4.getProperty(propKey));
+      }
+      
+      session0.close();
+      session1.close();
+      session2.close();
+      session3.close();
+      session4.close();
+      
+      service0.stop();      
+      service1.stop();
+      service2.stop();
+      service3.stop();
+      service4.stop();
+      
+      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service2.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service3.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service4.getServer().getRemotingService().getConnections().size());
+   }
+
+   public void testStaticListRoundRobin() throws Exception
+   { // Sends ten messages from node 0 through one flow with four static connectors and checks the round-robin delivery order.
+      Map<String, Object> service0Params = new HashMap<String, Object>();      
+      MessagingService service0 = createMessagingService(0, service0Params);
+      // Nodes 1-4 are created and started right away; node 0 is started further down.
+      Map<String, Object> service1Params = new HashMap<String, Object>();      
+      MessagingService service1 = createMessagingService(1, service1Params);      
+      service1.start();
+      
+      Map<String, Object> service2Params = new HashMap<String, Object>();      
+      MessagingService service2 = createMessagingService(2, service2Params);      
+      service2.start();
+      
+      Map<String, Object> service3Params = new HashMap<String, Object>();      
+      MessagingService service3 = createMessagingService(3, service3Params);      
+      service3.start();
+      
+      Map<String, Object> service4Params = new HashMap<String, Object>();      
+      MessagingService service4 = createMessagingService(4, service4Params);      
+      service4.start();
+      // One connector per target node, added to the list in the order 1, 2, 3, 4.
+      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+      
+      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service1Params);
+      connectors.add(server1tc);
+      
+      TransportConfiguration server2tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service2Params);
+      connectors.add(server2tc);
+      
+      TransportConfiguration server3tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service3Params);
+      connectors.add(server3tc);
+      
+      TransportConfiguration server4tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service4Params);
+      connectors.add(server4tc);
+
+      final SimpleString testAddress = new SimpleString("testaddress");
+      // A single flow ("outflow1") on the test address that is given all four connectors.
+      MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1",
+                                                                       testAddress.toString(),
+                                                                       null,
+                                                                       false,
+                                                                       1,
+                                                                       -1,
+                                                                       null,
+                                                                       connectors);
+      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+      ofconfigs.add(ofconfig);
+      service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+      // Node 0 is started only after its flow configuration has been set.
+      service0.start();
+
+      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service0Params);
+      // One client session factory and session per node.
+      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+      ClientSession session0 = csf0.createSession(false, true, true);
+
+      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+      ClientSession session1 = csf1.createSession(false, true, true);
+      
+      ClientSessionFactory csf2 = new ClientSessionFactoryImpl(server2tc);
+      ClientSession session2 = csf2.createSession(false, true, true);
+      
+      ClientSessionFactory csf3 = new ClientSessionFactoryImpl(server3tc);
+      ClientSession session3 = csf3.createSession(false, true, true);
+      
+      ClientSessionFactory csf4 = new ClientSessionFactoryImpl(server4tc);
+      ClientSession session4 = csf4.createSession(false, true, true);
+      // Create a queue on every node, passing the test address for both SimpleString arguments.
+      session0.createQueue(testAddress, testAddress, null, false, false, false);
+      session1.createQueue(testAddress, testAddress, null, false, false, false);
+      session2.createQueue(testAddress, testAddress, null, false, false, false);
+      session3.createQueue(testAddress, testAddress, null, false, false, false);
+      session4.createQueue(testAddress, testAddress, null, false, false, false);
+
+      ClientProducer prod0 = session0.createProducer(testAddress);
+      // One consumer per node; only node 0 gets a producer.
+      ClientConsumer cons0 = session0.createConsumer(testAddress);
+      ClientConsumer cons1 = session1.createConsumer(testAddress);
+      ClientConsumer cons2 = session2.createConsumer(testAddress);
+      ClientConsumer cons3 = session3.createConsumer(testAddress);
+      ClientConsumer cons4 = session4.createConsumer(testAddress);      
+
+      session0.start();
+
+      session1.start();
+      session2.start();
+      session3.start();
+      session4.start();
+
+      final int numMessages = 10;
+
+      final SimpleString propKey = new SimpleString("testkey");
+      // Each message carries its send index in an int property so the receive order can be asserted.
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session0.createClientMessage(false);
+         message.putIntProperty(propKey, i);
+         message.getBody().flip();
+
+         prod0.send(message);
+      }
+      
+      //Refs should be round-robin'd in the same order the connectors are specified in the outflow
+      //With the local consumer being last since it was created last
+      // NOTE(review): cons0 is created before cons1-cons4 above; "created last" is presumably relative to the flow's own forwarding on node 0 - confirm.
+      ArrayList<ClientConsumer> consumers = new ArrayList<ClientConsumer>();
+      // Expected delivery cycle: consumers on nodes 1-4 in connector order, then the consumer on node 0.
+      consumers.add(cons1);
+      consumers.add(cons2);
+      consumers.add(cons3);
+      consumers.add(cons4);
+      consumers.add(cons0);
+      
+      int count = 0;
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientConsumer consumer = consumers.get(count);
+         // Advance cyclically through the expected consumer list.
+         count++;
+         if (count == consumers.size())
+         {
+            count = 0;
+         }
+         // Message i must be delivered to the consumer selected for this step of the cycle.
+         ClientMessage msg = consumer.receive(1000);
+         
+         assertNotNull(msg);
+         
+         assertEquals(i, msg.getProperty(propKey));
+         
+         msg.acknowledge();
+      }
+      // Tear down: close every session, then stop every service.
+      session0.close();
+      session1.close();
+      session2.close();
+      session3.close();
+      session4.close();
+
+      service0.stop();      
+      service1.stop();
+      service2.stop();
+      service3.stop();
+      service4.stop();
+      // After stopping, no service may report any remaining remoting connections.
+      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service2.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service3.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service4.getServer().getRemotingService().getConnections().size());
+   }
+   
+   public void testMultipleFlows() throws Exception
+   { // Deploys four flows from node 0, one per target node, and checks that each node receives only its matching message.
+      Map<String, Object> service0Params = new HashMap<String, Object>();      
+      MessagingService service0 = createMessagingService(0, service0Params);
+      // Nodes 1-4 are created and started right away; node 0 is started further down.
+      Map<String, Object> service1Params = new HashMap<String, Object>();      
+      MessagingService service1 = createMessagingService(1, service1Params);      
+      service1.start();
+      
+      Map<String, Object> service2Params = new HashMap<String, Object>();      
+      MessagingService service2 = createMessagingService(2, service2Params);      
+      service2.start();
+      
+      Map<String, Object> service3Params = new HashMap<String, Object>();      
+      MessagingService service3 = createMessagingService(3, service3Params);      
+      service3.start();
+      
+      Map<String, Object> service4Params = new HashMap<String, Object>();      
+      MessagingService service4 = createMessagingService(4, service4Params);      
+      service4.start();
+      // Each flow gets its own connector list containing exactly one target node.
+      List<TransportConfiguration> connectors1 = new ArrayList<TransportConfiguration>();      
+      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service1Params);
+      connectors1.add(server1tc);
+      
+      List<TransportConfiguration> connectors2 = new ArrayList<TransportConfiguration>();
+      TransportConfiguration server2tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service2Params);
+      connectors2.add(server2tc);
+      
+      List<TransportConfiguration> connectors3 = new ArrayList<TransportConfiguration>();
+      TransportConfiguration server3tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service3Params);
+      connectors3.add(server3tc);
+      
+      List<TransportConfiguration> connectors4 = new ArrayList<TransportConfiguration>();
+      TransportConfiguration server4tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service4Params);
+      connectors4.add(server4tc);
+
+      final SimpleString testAddress = new SimpleString("testaddress");
+      // Four flows on the same address; only the name, the third argument (beatle='...') and the connector list differ.
+      MessageFlowConfiguration ofconfig1 = new MessageFlowConfiguration("flow1",
+                                                                       testAddress.toString(),
+                                                                       "beatle='john'",
+                                                                       false,
+                                                                       1,
+                                                                       -1,
+                                                                       null,
+                                                                       connectors1);
+      MessageFlowConfiguration ofconfig2 = new MessageFlowConfiguration("flow2",
+                                                                        testAddress.toString(),
+                                                                        "beatle='paul'",
+                                                                        false,
+                                                                        1,
+                                                                        -1,
+                                                                        null,
+                                                                        connectors2);
+      MessageFlowConfiguration ofconfig3 = new MessageFlowConfiguration("flow3",
+                                                                        testAddress.toString(),
+                                                                        "beatle='george'",
+                                                                        false,
+                                                                        1,
+                                                                        -1,
+                                                                        null,
+                                                                        connectors3);
+      MessageFlowConfiguration ofconfig4 = new MessageFlowConfiguration("flow4",
+                                                                        testAddress.toString(),
+                                                                        "beatle='ringo'",
+                                                                        false,
+                                                                        1,
+                                                                        -1,
+                                                                        null,
+                                                                        connectors4);
+                 
+      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+      ofconfigs.add(ofconfig1);
+      ofconfigs.add(ofconfig2);
+      ofconfigs.add(ofconfig3);
+      ofconfigs.add(ofconfig4);
+      service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+      // Node 0 is started only after its flow configurations have been set.
+      service0.start();
+
+      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service0Params);
+
+      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+      ClientSession session0 = csf0.createSession(false, true, true);
+
+      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+      ClientSession session1 = csf1.createSession(false, true, true);
+      
+      ClientSessionFactory csf2 = new ClientSessionFactoryImpl(server2tc);
+      ClientSession session2 = csf2.createSession(false, true, true);
+      
+      ClientSessionFactory csf3 = new ClientSessionFactoryImpl(server3tc);
+      ClientSession session3 = csf3.createSession(false, true, true);
+      
+      ClientSessionFactory csf4 = new ClientSessionFactoryImpl(server4tc);
+      ClientSession session4 = csf4.createSession(false, true, true);
+
+      session0.createQueue(testAddress, testAddress, null, false, false, false);
+      session1.createQueue(testAddress, testAddress, null, false, false, false);
+      session2.createQueue(testAddress, testAddress, null, false, false, false);
+      session3.createQueue(testAddress, testAddress, null, false, false, false);
+      session4.createQueue(testAddress, testAddress, null, false, false, false);
+
+      ClientProducer prod0 = session0.createProducer(testAddress);
+      // No consumer is created on node 0 in this test; only nodes 1-4 consume.
+      ClientConsumer cons1 = session1.createConsumer(testAddress);
+      ClientConsumer cons2 = session2.createConsumer(testAddress);
+      ClientConsumer cons3 = session3.createConsumer(testAddress);
+      ClientConsumer cons4 = session4.createConsumer(testAddress);      
+
+      session1.start();
+      session2.start();
+      session3.start();
+      session4.start();
+      
+      SimpleString propKey = new SimpleString("beatle");
+      // One message per configured beatle value, plus one ("osama") that matches none of the four flow strings.
+      ClientMessage messageJohn = session0.createClientMessage(false);
+      messageJohn.putStringProperty(propKey, new SimpleString("john"));
+      messageJohn.getBody().flip();
+      
+      ClientMessage messagePaul = session0.createClientMessage(false);
+      messagePaul.putStringProperty(propKey, new SimpleString("paul"));
+      messagePaul.getBody().flip();
+      
+      ClientMessage messageGeorge = session0.createClientMessage(false);
+      messageGeorge.putStringProperty(propKey, new SimpleString("george"));
+      messageGeorge.getBody().flip();
+      
+      ClientMessage messageRingo = session0.createClientMessage(false);
+      messageRingo.putStringProperty(propKey, new SimpleString("ringo"));
+      messageRingo.getBody().flip();
+      
+      ClientMessage messageOsama = session0.createClientMessage(false);
+      messageOsama.putStringProperty(propKey, new SimpleString("osama"));
+      messageOsama.getBody().flip();
+
+      prod0.send(messageJohn);
+      prod0.send(messagePaul);
+      prod0.send(messageGeorge);
+      prod0.send(messageRingo);
+      prod0.send(messageOsama); 
+      // Each remote consumer must receive exactly its own message and then nothing more.
+      ClientMessage r1 = cons1.receive(1000);
+      assertNotNull(r1);
+      assertEquals(new SimpleString("john"), r1.getProperty(propKey));
+      r1 = cons1.receiveImmediate();
+      assertNull(r1);
+      
+      ClientMessage r2 = cons2.receive(1000);
+      assertNotNull(r2);
+      assertEquals(new SimpleString("paul"), r2.getProperty(propKey));
+      r2 = cons2.receiveImmediate();
+      assertNull(r2);
+      
+      ClientMessage r3 = cons3.receive(1000);
+      assertNotNull(r3);
+      assertEquals(new SimpleString("george"), r3.getProperty(propKey));
+      r3 = cons3.receiveImmediate();
+      assertNull(r3);
+      
+      ClientMessage r4 = cons4.receive(1000);
+      assertNotNull(r4);
+      assertEquals(new SimpleString("ringo"), r4.getProperty(propKey));
+      r4 = cons4.receiveImmediate();
+      assertNull(r4);
+      // NOTE(review): the "osama" message is never consumed or asserted on; it presumably remains on node 0 - confirm.
+      session0.close();
+      session1.close();
+      session2.close();
+      session3.close();
+      session4.close();
+      
+      service0.stop();      
+      service1.stop();
+      service2.stop();
+      service3.stop();
+      service4.stop();
+      // After stopping, no service may report any remaining remoting connections.
+      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service2.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service3.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service4.getServer().getRemotingService().getConnections().size());
+   }
+   
+   public void testMessageFlowsSameName() throws Exception
+   { // Configures two flows that share the name "flow1" and checks that only one of the two sent messages reaches node 1.
+      Map<String, Object> service0Params = new HashMap<String, Object>();      
+      MessagingService service0 = createMessagingService(0, service0Params);
+      // Node 1 is started right away; node 0 is started after its flow configurations are set.
+      Map<String, Object> service1Params = new HashMap<String, Object>();      
+      MessagingService service1 = createMessagingService(1, service1Params);      
+      service1.start();
+
+      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service0Params);
+      
+      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service1Params);
+      connectors.add(server1tc);
+      
+      final SimpleString address1 = new SimpleString("testaddress");
+      // Both configurations use the name "flow1" and the same connectors; only the third argument differs (car='saab' vs car='bmw').
+      MessageFlowConfiguration ofconfig1 = new MessageFlowConfiguration("flow1", address1.toString(), "car='saab'", true, 1, -1, null, connectors);
+      MessageFlowConfiguration ofconfig2 = new MessageFlowConfiguration("flow1", address1.toString(),"car='bmw'", true, 1, -1, null, connectors);
+                  
+      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+      ofconfigs.add(ofconfig1);
+      ofconfigs.add(ofconfig2);
+      
+      service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+   
+      //Only one of the flows should be deployed
+      service0.start();
+      
+      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+      ClientSession session0 = csf0.createSession(false, true, true);
+
+      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+      ClientSession session1 = csf1.createSession(false, true, true);
+      
+      session0.createQueue(address1, address1, null, false, false, false);
+      session1.createQueue(address1, address1, null, false, false, false);
+      ClientProducer prod0 = session0.createProducer(address1);
+
+      ClientConsumer cons1 = session1.createConsumer(address1);
+     
+      session1.start();
+      
+      SimpleString propKey = new SimpleString("car");
+      // One message for each of the two car values used in the flow configurations.
+      ClientMessage messageSaab = session0.createClientMessage(false);
+      messageSaab.putStringProperty(propKey, new SimpleString("saab"));
+      messageSaab.getBody().flip();
+      
+      ClientMessage messageBMW = session0.createClientMessage(false);
+      messageBMW.putStringProperty(propKey, new SimpleString("bmw"));
+      messageBMW.getBody().flip();
+      
+      prod0.send(messageSaab);
+      prod0.send(messageBMW);
+      // Exactly one message must arrive on node 1; either value is accepted because the test does not fix which flow is kept.
+      ClientMessage r1 = cons1.receive(1000);
+      assertNotNull(r1);
+      
+      SimpleString val = (SimpleString)r1.getProperty(propKey);
+      assertTrue(val.equals(new SimpleString("saab")) || val.equals(new SimpleString("bmw")));
+      r1 = cons1.receiveImmediate();
+      assertNull(r1);
+      
+      session0.close();
+      session1.close();
+            
+      service0.stop();      
+      service1.stop();
+      // After stopping, neither service may report any remaining remoting connections.
+      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+   }
+   
+   public void testMessageNullName() throws Exception
+   { // Configures a flow whose first (name) argument is null and checks that no message reaches node 1.
+      Map<String, Object> service0Params = new HashMap<String, Object>();      
+      MessagingService service0 = createMessagingService(0, service0Params);
+      // Node 1 is started right away; node 0 is started after its flow configuration is set.
+      Map<String, Object> service1Params = new HashMap<String, Object>();      
+      MessagingService service1 = createMessagingService(1, service1Params);      
+      service1.start();
+
+      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service0Params);
+      
+      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service1Params);
+      connectors.add(server1tc);
+      
+      final SimpleString address1 = new SimpleString("testaddress");
+      // The first constructor argument (the flow name in the other tests) is null here.
+      MessageFlowConfiguration ofconfig1 = new MessageFlowConfiguration(null, address1.toString(), null, true, 1, -1, null, connectors);
+                    
+      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+      ofconfigs.add(ofconfig1);
+   
+      service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+   
+      service0.start();
+      
+      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+      ClientSession session0 = csf0.createSession(false, true, true);
+
+      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+      ClientSession session1 = csf1.createSession(false, true, true);
+      
+      session0.createQueue(address1, address1, null, false, false, false);
+      session1.createQueue(address1, address1, null, false, false, false);
+      ClientProducer prod0 = session0.createProducer(address1);
+
+      ClientConsumer cons1 = session1.createConsumer(address1);
+     
+      session1.start();
+      // NOTE(review): propKey below is declared but never used in this test.
+      SimpleString propKey = new SimpleString("car");
+      
+      ClientMessage message = session0.createClientMessage(false);
+      message.getBody().flip();
+      
+      prod0.send(message);
+      // No message is expected to arrive at node 1's consumer.
+      ClientMessage r1 = cons1.receive(1000);
+      assertNull(r1);
+     
+      session0.close();
+      session1.close();
+            
+      service0.stop();      
+      service1.stop();
+      // After stopping, neither service may report any remaining remoting connections.
+      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+   }
+   
+   public void testMessageNullAdress() throws Exception
+   { // Configures a flow whose second (address) argument is null and checks that no message reaches node 1.
+      Map<String, Object> service0Params = new HashMap<String, Object>();      
+      MessagingService service0 = createMessagingService(0, service0Params);
+      // NOTE(review): "Adress" in the method name is a typo for "Address"; the name is left unchanged here.
+      Map<String, Object> service1Params = new HashMap<String, Object>();      
+      MessagingService service1 = createMessagingService(1, service1Params);      
+      service1.start();
+
+      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service0Params);
+      
+      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service1Params);
+      connectors.add(server1tc);
+      
+      final SimpleString address1 = new SimpleString("testaddress");
+      // The second constructor argument (the address string in the other tests) is null here.
+      MessageFlowConfiguration ofconfig1 = new MessageFlowConfiguration("blah", null, null, true, 1, -1, null, connectors);
+                    
+      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+      ofconfigs.add(ofconfig1);
+   
+      service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+   
+      service0.start();
+      
+      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+      ClientSession session0 = csf0.createSession(false, true, true);
+
+      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+      ClientSession session1 = csf1.createSession(false, true, true);
+      
+      session0.createQueue(address1, address1, null, false, false, false);
+      session1.createQueue(address1, address1, null, false, false, false);
+      ClientProducer prod0 = session0.createProducer(address1);
+
+      ClientConsumer cons1 = session1.createConsumer(address1);
+     
+      session1.start();
+      // NOTE(review): propKey below is declared but never used in this test.
+      SimpleString propKey = new SimpleString("car");
+      
+      ClientMessage message = session0.createClientMessage(false);
+      message.getBody().flip();
+      
+      prod0.send(message);
+      // No message is expected to arrive at node 1's consumer.
+      ClientMessage r1 = cons1.receive(1000);
+      assertNull(r1);
+     
+      session0.close();
+      session1.close();
+            
+      service0.stop();      
+      service1.stop();
+      // After stopping, neither service may report any remaining remoting connections.
+      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+   }
+   
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   { // Empty override: no per-test setup is performed and super.setUp() is not called.
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   { // Checks that InVMRegistry.instance is empty after the test; super.tearDown() is not called.
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   // Private -------------------------------------------------------
+   
+   // Inner classes -------------------------------------------------
+}

Copied: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchSizeTest.java (from rev 5412, trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchSizeTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchSizeTest.java	2008-11-21 13:59:07 UTC (rev 5416)
@@ -0,0 +1,203 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * 
+ * A MessageFlowBatchSizeTest: checks that a message flow configured with a batch size
+ * delivers nothing to the remote node until a full batch of messages has been sent.
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 15 Nov 2008 09:06:55
+ *
+ *
+ */
+public class MessageFlowBatchSizeTest extends MessageFlowTestBase
+{
+   private static final Logger log = Logger.getLogger(MessageFlowBatchSizeTest.class);
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testBatchSize() throws Exception
+   { // Sends batchSize - 1 messages, checks node 1 has nothing, sends one more, then checks node 1 receives the whole batch.
+      Map<String, Object> service0Params = new HashMap<String, Object>();      
+      MessagingService service0 = createMessagingService(0, service0Params);
+      // Node 1 is started right away; node 0 is started after its flow configuration is set.
+      Map<String, Object> service1Params = new HashMap<String, Object>();      
+      MessagingService service1 = createMessagingService(1, service1Params);      
+      service1.start();
+
+      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service1Params);
+      connectors.add(server1tc);
+      
+      final SimpleString address1 = new SimpleString("testaddress");
+      // batchSize is passed as the fifth MessageFlowConfiguration argument below.
+      final int batchSize = 10;
+ 
+      MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1", address1.toString(), null, true, batchSize, -1, null, connectors);
+      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+      ofconfigs.add(ofconfig);
+      service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+
+      service0.start();
+      
+      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service0Params);
+      
+      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+      
+      ClientSession session0 = csf0.createSession(false, true, true);
+      
+      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+      
+      ClientSession session1 = csf1.createSession(false, true, true);
+      
+      session0.createQueue(address1, address1, null, false, false, true);
+
+      session1.createQueue(address1, address1, null, false, false, true);  
+      
+      ClientProducer prod0_1 = session0.createProducer(address1);
+      // One consumer on the sending node (node 0) and one on the remote node (node 1).
+      ClientConsumer cons0_1 = session0.createConsumer(address1);
+      
+      ClientConsumer cons1_1 = session1.createConsumer(address1);
+
+      session0.start();
+      
+      session1.start();
+      
+      final SimpleString propKey = new SimpleString("testkey");
+      // Run the batch scenario ten times in a row.
+      for (int j = 0; j < 10; j++)
+      {
+         // Send one message fewer than a full batch.
+         for (int i = 0; i < batchSize - 1; i++)
+         {      
+            ClientMessage message = session0.createClientMessage(false);
+            message.putIntProperty(propKey, i);        
+            message.getBody().flip();
+                 
+            prod0_1.send(message);
+         }
+         // The consumer on node 0 must receive each of them, in send order.
+         for (int i = 0; i < batchSize - 1; i++)
+         {
+            ClientMessage rmessage1 = cons0_1.receive(1000);
+            
+            assertNotNull(rmessage1);
+            
+            assertEquals(i, rmessage1.getProperty(propKey));         
+         }
+         // Nothing may have reached node 1 yet, since fewer than batchSize messages were sent.
+         ClientMessage rmessage1 = cons1_1.receive(250);
+         
+         assertNull(rmessage1);
+         // Sending one more message brings the count up to batchSize.
+         ClientMessage message = session0.createClientMessage(false);
+         message.putIntProperty(propKey, batchSize - 1);        
+         message.getBody().flip();
+              
+         prod0_1.send(message);
+         
+         rmessage1 = cons0_1.receive(1000);
+         
+         assertNotNull(rmessage1);
+         
+         assertEquals(batchSize - 1, rmessage1.getProperty(propKey));  
+         // Node 1 must now receive all batchSize messages, in send order.
+         for (int i = 0; i < batchSize; i++)
+         {
+            rmessage1 = cons1_1.receive(1000);
+            
+            assertNotNull(rmessage1);
+            
+            assertEquals(i, rmessage1.getProperty(propKey));         
+         }
+      }
+            
+      session0.close();
+      
+      session1.close();
+      
+      service0.stop();      
+      service1.stop();
+      // After stopping, neither service may report any remaining remoting connections.
+      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   { // Empty override: no per-test setup is performed and super.setUp() is not called.
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   { // Checks that InVMRegistry.instance is empty after the test; super.tearDown() is not called.
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
+
+

Copied: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWildcardTest.java (from rev 5412, trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWildcardTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWildcardTest.java	2008-11-21 13:59:07 UTC (rev 5416)
@@ -0,0 +1,282 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A MessageFlowWildcardTest: uses two messaging services reached through in-VM connectors and
+ * checks that a message flow on service 0 with the wildcard address "cheese.#" delivers to
+ * service 1 only the messages sent to "cheese." addresses; all messages stay consumable on service 0.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 15 Nov 2008 08:19:07
+ *
+ */
+public class MessageFlowWildcardTest extends MessageFlowTestBase
+{
+   private static final Logger log = Logger.getLogger(MessageFlowWildcardTest.class);
+   // NOTE(review): log is not referenced anywhere else in this class.
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   /** Sends to four addresses on service 0 and expects only the two "cheese." ones to also arrive on service 1. */
+   public void testWithWildcard() throws Exception
+   {
+      Map<String, Object> service0Params = new HashMap<String, Object>();      
+      MessagingService service0 = createMessagingService(0, service0Params);
+      // Service 1 is the target of the flow; it is started before service 0 is configured and started
+      Map<String, Object> service1Params = new HashMap<String, Object>();      
+      MessagingService service1 = createMessagingService(1, service1Params);      
+      service1.start();
+      // Connector to service 1, used both by the message flow and by the test's own client session
+      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service1Params);
+      connectors.add(server1tc);
+      
+      final SimpleString address1 = new SimpleString("cheese.stilton");
+      
+      final SimpleString address2 = new SimpleString("cheese.wensleydale");
+      
+      final SimpleString address3 = new SimpleString("wine.shiraz");
+      
+      final SimpleString address4 = new SimpleString("wine.cabernet");
+      
+      final SimpleString match1 = new SimpleString("cheese.#");
+      
+      // Flow "outflow1" on service 0: wildcard match as address, service 1 connector as target (NOTE(review): confirm positional arg meanings)
+      MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1", match1.toString(), null, true, 1, -1, null, connectors);
+      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+      ofconfigs.add(ofconfig);
+      service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+     
+      service0.start();
+      
+      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service0Params);
+      // One client session per service
+      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+      
+      ClientSession session0 = csf0.createSession(false, true, true);
+      
+      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+      
+      ClientSession session1 = csf1.createSession(false, true, true);
+      // Create four queues on each service, one per address
+      session0.createQueue(address1, address1, null, false, false, true);
+      session0.createQueue(address2, address2, null, false, false, true);
+      session0.createQueue(address3, address3, null, false, false, true);
+      session0.createQueue(address4, address4, null, false, false, true);     
+      
+      session1.createQueue(address1, address1, null, false, false, true);
+      session1.createQueue(address2, address2, null, false, false, true);
+      session1.createQueue(address3, address3, null, false, false, true);
+      session1.createQueue(address4, address4, null, false, false, true);     
+      // Producers exist only on service 0; consumers are created on both services
+      ClientProducer prod0_1 = session0.createProducer(address1);
+      ClientProducer prod0_2 = session0.createProducer(address2);
+      ClientProducer prod0_3 = session0.createProducer(address3);
+      ClientProducer prod0_4 = session0.createProducer(address4);      
+      
+      ClientConsumer cons0_1 = session0.createConsumer(address1);
+      ClientConsumer cons0_2 = session0.createConsumer(address2);
+      ClientConsumer cons0_3 = session0.createConsumer(address3);
+      ClientConsumer cons0_4 = session0.createConsumer(address4);
+      
+      ClientConsumer cons1_1 = session1.createConsumer(address1);
+      ClientConsumer cons1_2 = session1.createConsumer(address2);
+      ClientConsumer cons1_3 = session1.createConsumer(address3);
+      ClientConsumer cons1_4 = session1.createConsumer(address4);
+      
+      session0.start();
+      
+      session1.start();
+      
+      final int numMessages = 100;
+      
+      final SimpleString propKey = new SimpleString("testkey");
+      // Send numMessages to each of the four addresses on service 0, tagging each message with its index
+      for (int i = 0; i < numMessages; i++)
+      {      
+         ClientMessage message = session0.createClientMessage(false);
+         message.putIntProperty(propKey, i);
+         message.getBody().flip();
+              
+         prod0_1.send(message);
+      }
+      for (int i = 0; i < numMessages; i++)
+      {      
+         ClientMessage message = session0.createClientMessage(false);
+         message.putIntProperty(propKey, i);
+         message.getBody().flip();
+              
+         prod0_2.send(message);
+      }
+      for (int i = 0; i < numMessages; i++)
+      {      
+         ClientMessage message = session0.createClientMessage(false);
+         message.putIntProperty(propKey, i);
+         message.getBody().flip();
+              
+         prod0_3.send(message);
+      }
+      for (int i = 0; i < numMessages; i++)
+      {      
+         ClientMessage message = session0.createClientMessage(false);
+         message.putIntProperty(propKey, i);
+         message.getBody().flip();
+              
+         prod0_4.send(message);
+      }
+      // All four consumers on service 0 must receive their messages in send order
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage rmessage1 = cons0_1.receive(1000);
+         
+         assertNotNull(rmessage1);
+         
+         assertEquals(i, rmessage1.getProperty(propKey));
+         
+         ClientMessage rmessage2 = cons0_2.receive(1000);
+         
+         assertNotNull(rmessage2);
+         
+         assertEquals(i, rmessage2.getProperty(propKey));
+         
+         ClientMessage rmessage3 = cons0_3.receive(1000);
+         
+         assertNotNull(rmessage3);
+         
+         assertEquals(i, rmessage3.getProperty(propKey));  
+         
+         ClientMessage rmessage4 = cons0_4.receive(1000);
+         
+         assertNotNull(rmessage4);
+         
+         assertEquals(i, rmessage4.getProperty(propKey));  
+      }
+      // ...and none of them may have anything beyond those numMessages
+      ClientMessage rmessage1 = cons0_1.receiveImmediate();
+      
+      assertNull(rmessage1);
+      
+      ClientMessage rmessage2 = cons0_2.receiveImmediate();
+      
+      assertNull(rmessage2);
+            
+      ClientMessage rmessage3 = cons0_3.receiveImmediate();
+      
+      assertNull(rmessage3);
+      
+      ClientMessage rmessage4 = cons0_4.receiveImmediate();
+      
+      assertNull(rmessage4);
+      // On service 1 the two "cheese." consumers must receive the same messages, in order
+      for (int i = 0; i < numMessages; i++)
+      {
+         rmessage1 = cons1_1.receive(1000);
+         
+         assertNotNull(rmessage1);
+         
+         assertEquals(i, rmessage1.getProperty(propKey));
+         
+         rmessage2 = cons1_2.receive(1000);
+         
+         assertNotNull(rmessage2);
+         
+         assertEquals(i, rmessage2.getProperty(propKey));         
+      }
+      // No extra "cheese." messages on service 1, and nothing at all on its two "wine." consumers
+      rmessage1 = cons1_1.receiveImmediate();
+      
+      assertNull(rmessage1);
+      
+      rmessage2 = cons1_2.receiveImmediate();
+      
+      assertNull(rmessage2);
+            
+      rmessage3 = cons1_3.receiveImmediate();
+      
+      assertNull(rmessage3);
+      
+      rmessage4 = cons1_4.receiveImmediate();
+      
+      assertNull(rmessage4);
+      // Cleanup. NOTE(review): not in a finally block, so a failed assertion above skips it and neither service is stopped
+      session0.close();
+      
+      session1.close();
+      
+      service0.stop();      
+      service1.stop();
+      // After stopping, neither service may report any remaining remoting connections
+      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+   // Intentionally empty: no per-test setup is done here and super.setUp() is not called.
+   @Override
+   protected void setUp() throws Exception
+   {
+   }
+   // Asserts that InVMRegistry.instance is empty; super.tearDown() is not called.
+   @Override
+   protected void tearDown() throws Exception
+   {      
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
+

Copied: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWithFilterTest.java (from rev 5412, trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWithFilterTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWithFilterTest.java	2008-11-21 13:59:07 UTC (rev 5416)
@@ -0,0 +1,206 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A MessageFlowWithFilterTest: uses two messaging services reached through in-VM connectors and
+ * checks that a message flow on service 0 configured with the filter "selectorkey='ORANGES'"
+ * delivers to service 1 only the messages whose selectorkey is ORANGES; all stay consumable on service 0.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 15 Nov 2008 08:58:49
+ *
+ */
+public class MessageFlowWithFilterTest extends MessageFlowTestBase
+{
+   private static final Logger log = Logger.getLogger(MessageFlowWithFilterTest.class);
+   // NOTE(review): log is not referenced anywhere else in this class.
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   // NOTE(review): the name looks copied from the wildcard test; this method uses a plain address plus a filter, no wildcard.
+   public void testWithWildcard() throws Exception
+   {
+      Map<String, Object> service0Params = new HashMap<String, Object>();      
+      MessagingService service0 = createMessagingService(0, service0Params);
+      // Service 1 is the target of the flow; it is started before service 0 is configured and started
+      Map<String, Object> service1Params = new HashMap<String, Object>();      
+      MessagingService service1 = createMessagingService(1, service1Params);      
+      service1.start();
+      // Connector to service 1, used both by the message flow and by the test's own client session
+      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service1Params);
+      connectors.add(server1tc);
+      
+      final SimpleString address1 = new SimpleString("testaddress");
+      // Filter expression handed to the flow; the assertions below expect only matching messages on service 1
+      final String filter = "selectorkey='ORANGES'";
+      // Flow "outflow1" on service 0: address1 plus the filter, service 1 connector as target (NOTE(review): confirm positional arg meanings)
+      MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1", address1.toString(), filter, true, 1, -1, null, connectors);
+      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+      ofconfigs.add(ofconfig);
+      service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+   
+      service0.start();
+      
+      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service0Params);
+      // One client session per service
+      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+      
+      ClientSession session0 = csf0.createSession(false, true, true);
+      
+      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+      
+      ClientSession session1 = csf1.createSession(false, true, true);
+      // Same queue on both services; the producer is on service 0 only, consumers are on both
+      session0.createQueue(address1, address1, null, false, false, true);
+
+      session1.createQueue(address1, address1, null, false, false, true);  
+      
+      ClientProducer prod0_1 = session0.createProducer(address1);
+         
+      ClientConsumer cons0_1 = session0.createConsumer(address1);
+      
+      ClientConsumer cons1_1 = session1.createConsumer(address1);
+
+      session0.start();
+      
+      session1.start();
+      
+      final int numMessages = 100;
+      
+      final SimpleString propKey = new SimpleString("testkey");
+      
+      final SimpleString propKey2 = new SimpleString("selectorkey");
+      // First batch carries selectorkey=ORANGES (matches the filter), second batch APPLES (does not)
+      for (int i = 0; i < numMessages; i++)
+      {      
+         ClientMessage message = session0.createClientMessage(false);
+         message.putIntProperty(propKey, i);
+         message.putStringProperty(propKey2, new SimpleString("ORANGES"));
+         message.getBody().flip();
+              
+         prod0_1.send(message);
+      }
+      for (int i = 0; i < numMessages; i++)
+      {      
+         ClientMessage message = session0.createClientMessage(false);
+         message.putIntProperty(propKey, i);
+         message.putStringProperty(propKey2, new SimpleString("APPLES"));
+         message.getBody().flip();
+              
+         prod0_1.send(message);
+      }
+      // The first numMessages (the ORANGES batch) must arrive, in order, on both service 0 and service 1
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage rmessage1 = cons0_1.receive(1000);
+         
+         assertNotNull(rmessage1);
+         
+         assertEquals(i, rmessage1.getProperty(propKey));
+         
+         ClientMessage rmessage2 = cons1_1.receive(1000);
+         
+         assertNotNull(rmessage2);
+         
+         assertEquals(i, rmessage2.getProperty(propKey));         
+      }
+      // The next numMessages (the APPLES batch) are read, in order, from service 0 only
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage rmessage1 = cons0_1.receive(1000);
+         
+         assertNotNull(rmessage1);
+         
+         assertEquals(i, rmessage1.getProperty(propKey));                      
+      }
+      // Nothing further on either consumer, i.e. no APPLES message reached service 1
+      ClientMessage rmessage1 = cons0_1.receiveImmediate();
+      
+      assertNull(rmessage1);
+      
+      ClientMessage rmessage2 = cons1_1.receiveImmediate();
+      
+      assertNull(rmessage2);
+      // Cleanup. NOTE(review): not in a finally block, so a failed assertion above skips it and neither service is stopped
+      session0.close();
+      
+      session1.close();
+      
+      service0.stop();      
+      service1.stop();
+      // After stopping, neither service may report any remaining remoting connections
+      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+   // Intentionally empty: no per-test setup is done here and super.setUp() is not called.
+   @Override
+   protected void setUp() throws Exception
+   {
+   }
+   // Asserts that InVMRegistry.instance is empty; super.tearDown() is not called.
+   @Override
+   protected void tearDown() throws Exception
+   {
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
+

Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java	2008-11-21 10:32:48 UTC (rev 5415)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java	2008-11-21 13:59:07 UTC (rev 5416)
@@ -1,225 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
- * Middleware LLC, and individual contributors by the @authors tag. See the
- * copyright.txt in the distribution for a full listing of individual
- * contributors.
- * 
- * This is free software; you can redistribute it and/or modify it under the
- * terms of the GNU Lesser General Public License as published by the Free
- * Software Foundation; either version 2.1 of the License, or (at your option)
- * any later version.
- * 
- * This software is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details.
- * 
- * You should have received a copy of the GNU Lesser General Public License
- * along with this software; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.cluster.distribution;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import junit.framework.TestCase;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
-import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * 
- * A OutflowBatchSizeTest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 15 Nov 2008 09:06:55
- *
- *
- */
-public class OutflowBatchSizeTest extends TestCase
-{
-   private static final Logger log = Logger.getLogger(OutflowBatchSizeTest.class);
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private MessagingService service0;
-
-   private MessagingService service1;
-   
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   public void testBatchSize() throws Exception
-   {
-      Configuration service0Conf = new ConfigurationImpl();
-      service0Conf.setClustered(true);
-      service0Conf.setSecurityEnabled(false);
-      Map<String, Object> service0Params = new HashMap<String, Object>();
-      service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
-      service0Conf.getAcceptorConfigurations()
-                  .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
-                                                  service0Params));
-
-      Configuration service1Conf = new ConfigurationImpl();
-      service1Conf.setClustered(true);
-      service1Conf.setSecurityEnabled(false);
-      Map<String, Object> service1Params = new HashMap<String, Object>();
-      service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-
-      service1Conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
-                                                                              service1Params));
-      service1 = MessagingServiceImpl.newNullStorageMessagingServer(service1Conf);
-      service1.start();
-
-      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
-      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-                                                                    service1Params);
-      connectors.add(server1tc);
-      
-      final SimpleString address1 = new SimpleString("testaddress");
-                       
-      final int batchSize = 10;
- 
-      MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1", address1.toString(), null, true, batchSize, 0, null, connectors);
-      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
-      ofconfigs.add(ofconfig);
-      service0Conf.setMessageFlowConfigurations(ofconfigs);
-
-      service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
-      service0.start();
-      
-      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-                                                                    service0Params);
-      
-      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
-      
-      ClientSession session0 = csf0.createSession(false, true, true);
-      
-      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
-      
-      ClientSession session1 = csf1.createSession(false, true, true);
-      
-      session0.createQueue(address1, address1, null, false, false, true);
-
-      session1.createQueue(address1, address1, null, false, false, true);  
-      
-      ClientProducer prod0_1 = session0.createProducer(address1);
-         
-      ClientConsumer cons0_1 = session0.createConsumer(address1);
-      
-      ClientConsumer cons1_1 = session1.createConsumer(address1);
-
-      session0.start();
-      
-      session1.start();
-      
-      final SimpleString propKey = new SimpleString("testkey");
-      
-      for (int j = 0; j < 10; j++)
-      {
-          
-         for (int i = 0; i < batchSize - 1; i++)
-         {      
-            ClientMessage message = session0.createClientMessage(false);
-            message.putIntProperty(propKey, i);        
-            message.getBody().flip();
-                 
-            prod0_1.send(message);
-         }
-              
-         for (int i = 0; i < batchSize - 1; i++)
-         {
-            ClientMessage rmessage1 = cons0_1.receive(1000);
-            
-            assertNotNull(rmessage1);
-            
-            assertEquals(i, rmessage1.getProperty(propKey));         
-         }
-         
-         ClientMessage rmessage1 = cons1_1.receive(250);
-         
-         assertNull(rmessage1);
-         
-         ClientMessage message = session0.createClientMessage(false);
-         message.putIntProperty(propKey, batchSize - 1);        
-         message.getBody().flip();
-              
-         prod0_1.send(message);
-         
-         rmessage1 = cons0_1.receive(1000);
-         
-         assertNotNull(rmessage1);
-         
-         assertEquals(batchSize - 1, rmessage1.getProperty(propKey));  
-         
-         for (int i = 0; i < batchSize; i++)
-         {
-            rmessage1 = cons1_1.receive(1000);
-            
-            assertNotNull(rmessage1);
-            
-            assertEquals(i, rmessage1.getProperty(propKey));         
-         }
-      }
-            
-      session0.close();
-      
-      session1.close();
-   }
-   
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   @Override
-   protected void setUp() throws Exception
-   {
-   }
-
-   @Override
-   protected void tearDown() throws Exception
-   {
-      service0.stop();
-      
-      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
-
-      service1.stop();
-
-      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
-
-      assertEquals(0, InVMRegistry.instance.size());
-   }
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}
-
-

Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java	2008-11-21 10:32:48 UTC (rev 5415)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java	2008-11-21 13:59:07 UTC (rev 5416)
@@ -1,232 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
- * Middleware LLC, and individual contributors by the @authors tag. See the
- * copyright.txt in the distribution for a full listing of individual
- * contributors.
- * 
- * This is free software; you can redistribute it and/or modify it under the
- * terms of the GNU Lesser General Public License as published by the Free
- * Software Foundation; either version 2.1 of the License, or (at your option)
- * any later version.
- * 
- * This software is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details.
- * 
- * You should have received a copy of the GNU Lesser General Public License
- * along with this software; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.cluster.distribution;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import junit.framework.TestCase;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
-import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * 
- * A OutflowWithFilterTest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 15 Nov 2008 08:58:49
- *
- *
- */
-public class OutflowWithFilterTest extends TestCase
-{
-   private static final Logger log = Logger.getLogger(OutflowWithFilterTest.class);
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private MessagingService service0;
-
-   private MessagingService service1;
-   
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   public void testWithWildcard() throws Exception
-   {
-      Configuration service0Conf = new ConfigurationImpl();
-      service0Conf.setClustered(true);
-      service0Conf.setSecurityEnabled(false);
-      Map<String, Object> service0Params = new HashMap<String, Object>();
-      service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
-      service0Conf.getAcceptorConfigurations()
-                  .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
-                                                  service0Params));
-
-      Configuration service1Conf = new ConfigurationImpl();
-      service1Conf.setClustered(true);
-      service1Conf.setSecurityEnabled(false);
-      Map<String, Object> service1Params = new HashMap<String, Object>();
-      service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-
-      service1Conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
-                                                                              service1Params));
-      service1 = MessagingServiceImpl.newNullStorageMessagingServer(service1Conf);
-      service1.start();
-
-      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
-      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-                                                                    service1Params);
-      connectors.add(server1tc);
-      
-      final SimpleString address1 = new SimpleString("testaddress");
-                 
-      final String filter = "selectorkey='ORANGES'";
-      
-      MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1", address1.toString(), filter, true, 1, 0, null, connectors);
-      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
-      ofconfigs.add(ofconfig);
-      service0Conf.setMessageFlowConfigurations(ofconfigs);
-
-      service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
-      service0.start();
-      
-      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-                                                                    service0Params);
-      
-      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
-      
-      ClientSession session0 = csf0.createSession(false, true, true);
-      
-      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
-      
-      ClientSession session1 = csf1.createSession(false, true, true);
-      
-      session0.createQueue(address1, address1, null, false, false, true);
-
-      session1.createQueue(address1, address1, null, false, false, true);  
-      
-      ClientProducer prod0_1 = session0.createProducer(address1);
-         
-      ClientConsumer cons0_1 = session0.createConsumer(address1);
-      
-      ClientConsumer cons1_1 = session1.createConsumer(address1);
-
-      session0.start();
-      
-      session1.start();
-      
-      final int numMessages = 100;
-      
-      final SimpleString propKey = new SimpleString("testkey");
-      
-      final SimpleString propKey2 = new SimpleString("selectorkey");
-      
-      for (int i = 0; i < numMessages; i++)
-      {      
-         ClientMessage message = session0.createClientMessage(false);
-         message.putIntProperty(propKey, i);
-         message.putStringProperty(propKey2, new SimpleString("ORANGES"));
-         message.getBody().flip();
-              
-         prod0_1.send(message);
-      }
-      for (int i = 0; i < numMessages; i++)
-      {      
-         ClientMessage message = session0.createClientMessage(false);
-         message.putIntProperty(propKey, i);
-         message.putStringProperty(propKey2, new SimpleString("APPLES"));
-         message.getBody().flip();
-              
-         prod0_1.send(message);
-      }
-   
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage rmessage1 = cons0_1.receive(1000);
-         
-         assertNotNull(rmessage1);
-         
-         assertEquals(i, rmessage1.getProperty(propKey));
-         
-         ClientMessage rmessage2 = cons1_1.receive(1000);
-         
-         assertNotNull(rmessage2);
-         
-         assertEquals(i, rmessage2.getProperty(propKey));         
-      }
-      
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage rmessage1 = cons0_1.receive(1000);
-         
-         assertNotNull(rmessage1);
-         
-         assertEquals(i, rmessage1.getProperty(propKey));                      
-      }
-      
-      ClientMessage rmessage1 = cons0_1.receiveImmediate();
-      
-      assertNull(rmessage1);
-      
-      ClientMessage rmessage2 = cons1_1.receiveImmediate();
-      
-      assertNull(rmessage2);
-            
-      session0.close();
-      
-      session1.close();
-   }
-   
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   @Override
-   protected void setUp() throws Exception
-   {
-   }
-
-   @Override
-   protected void tearDown() throws Exception
-   {
-      service0.stop();
-      
-      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
-
-      service1.stop();
-
-      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
-
-      assertEquals(0, InVMRegistry.instance.size());
-   }
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}
-

Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java	2008-11-21 10:32:48 UTC (rev 5415)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java	2008-11-21 13:59:07 UTC (rev 5416)
@@ -1,308 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
- * Middleware LLC, and individual contributors by the @authors tag. See the
- * copyright.txt in the distribution for a full listing of individual
- * contributors.
- * 
- * This is free software; you can redistribute it and/or modify it under the
- * terms of the GNU Lesser General Public License as published by the Free
- * Software Foundation; either version 2.1 of the License, or (at your option)
- * any later version.
- * 
- * This software is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details.
- * 
- * You should have received a copy of the GNU Lesser General Public License
- * along with this software; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.cluster.distribution;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import junit.framework.TestCase;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
-import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * 
- * A OutflowWithWildcardTest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 15 Nov 2008 08:19:07
- *
- *
- */
-public class OutflowWithWildcardTest extends TestCase
-{
-   private static final Logger log = Logger.getLogger(OutflowWithWildcardTest.class);
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private MessagingService service0;
-
-   private MessagingService service1;
-   
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   public void testWithWildcard() throws Exception
-   {
-      Configuration service0Conf = new ConfigurationImpl();
-      service0Conf.setClustered(true);
-      service0Conf.setSecurityEnabled(false);
-      Map<String, Object> service0Params = new HashMap<String, Object>();
-      service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
-      service0Conf.getAcceptorConfigurations()
-                  .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
-                                                  service0Params));
-
-      Configuration service1Conf = new ConfigurationImpl();
-      service1Conf.setClustered(true);
-      service1Conf.setSecurityEnabled(false);
-      Map<String, Object> service1Params = new HashMap<String, Object>();
-      service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-
-      service1Conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
-                                                                              service1Params));
-      service1 = MessagingServiceImpl.newNullStorageMessagingServer(service1Conf);
-      service1.start();
-
-      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
-      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-                                                                    service1Params);
-      connectors.add(server1tc);
-      
-      final SimpleString address1 = new SimpleString("cheese.stilton");
-      
-      final SimpleString address2 = new SimpleString("cheese.wensleydale");
-      
-      final SimpleString address3 = new SimpleString("wine.shiraz");
-      
-      final SimpleString address4 = new SimpleString("wine.cabernet");
-      
-      final SimpleString match1 = new SimpleString("cheese.#");
-      
-            
-      MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1", match1.toString(), null, true, 1, 0, null, connectors);
-      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
-      ofconfigs.add(ofconfig);
-      service0Conf.setMessageFlowConfigurations(ofconfigs);
-
-      service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
-      service0.start();
-      
-      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-                                                                    service0Params);
-      
-      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
-      
-      ClientSession session0 = csf0.createSession(false, true, true);
-      
-      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
-      
-      ClientSession session1 = csf1.createSession(false, true, true);
-      
-      session0.createQueue(address1, address1, null, false, false, true);
-      session0.createQueue(address2, address2, null, false, false, true);
-      session0.createQueue(address3, address3, null, false, false, true);
-      session0.createQueue(address4, address4, null, false, false, true);     
-      
-      session1.createQueue(address1, address1, null, false, false, true);
-      session1.createQueue(address2, address2, null, false, false, true);
-      session1.createQueue(address3, address3, null, false, false, true);
-      session1.createQueue(address4, address4, null, false, false, true);     
-      
-      ClientProducer prod0_1 = session0.createProducer(address1);
-      ClientProducer prod0_2 = session0.createProducer(address2);
-      ClientProducer prod0_3 = session0.createProducer(address3);
-      ClientProducer prod0_4 = session0.createProducer(address4);      
-      
-      ClientConsumer cons0_1 = session0.createConsumer(address1);
-      ClientConsumer cons0_2 = session0.createConsumer(address2);
-      ClientConsumer cons0_3 = session0.createConsumer(address3);
-      ClientConsumer cons0_4 = session0.createConsumer(address4);
-      
-      ClientConsumer cons1_1 = session1.createConsumer(address1);
-      ClientConsumer cons1_2 = session1.createConsumer(address2);
-      ClientConsumer cons1_3 = session1.createConsumer(address3);
-      ClientConsumer cons1_4 = session1.createConsumer(address4);
-      
-      session0.start();
-      
-      session1.start();
-      
-      final int numMessages = 100;
-      
-      final SimpleString propKey = new SimpleString("testkey");
-      
-      for (int i = 0; i < numMessages; i++)
-      {      
-         ClientMessage message = session0.createClientMessage(false);
-         message.putIntProperty(propKey, i);
-         message.getBody().flip();
-              
-         prod0_1.send(message);
-      }
-      for (int i = 0; i < numMessages; i++)
-      {      
-         ClientMessage message = session0.createClientMessage(false);
-         message.putIntProperty(propKey, i);
-         message.getBody().flip();
-              
-         prod0_2.send(message);
-      }
-      for (int i = 0; i < numMessages; i++)
-      {      
-         ClientMessage message = session0.createClientMessage(false);
-         message.putIntProperty(propKey, i);
-         message.getBody().flip();
-              
-         prod0_3.send(message);
-      }
-      for (int i = 0; i < numMessages; i++)
-      {      
-         ClientMessage message = session0.createClientMessage(false);
-         message.putIntProperty(propKey, i);
-         message.getBody().flip();
-              
-         prod0_4.send(message);
-      }
-      
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage rmessage1 = cons0_1.receive(1000);
-         
-         assertNotNull(rmessage1);
-         
-         assertEquals(i, rmessage1.getProperty(propKey));
-         
-         ClientMessage rmessage2 = cons0_2.receive(1000);
-         
-         assertNotNull(rmessage2);
-         
-         assertEquals(i, rmessage2.getProperty(propKey));
-         
-         ClientMessage rmessage3 = cons0_3.receive(1000);
-         
-         assertNotNull(rmessage3);
-         
-         assertEquals(i, rmessage3.getProperty(propKey));  
-         
-         ClientMessage rmessage4 = cons0_4.receive(1000);
-         
-         assertNotNull(rmessage4);
-         
-         assertEquals(i, rmessage4.getProperty(propKey));  
-      }
-      
-      ClientMessage rmessage1 = cons0_1.receiveImmediate();
-      
-      assertNull(rmessage1);
-      
-      ClientMessage rmessage2 = cons0_2.receiveImmediate();
-      
-      assertNull(rmessage2);
-            
-      ClientMessage rmessage3 = cons0_3.receiveImmediate();
-      
-      assertNull(rmessage3);
-      
-      ClientMessage rmessage4 = cons0_4.receiveImmediate();
-      
-      assertNull(rmessage4);
-      
-      for (int i = 0; i < numMessages; i++)
-      {
-         rmessage1 = cons1_1.receive(1000);
-         
-         assertNotNull(rmessage1);
-         
-         assertEquals(i, rmessage1.getProperty(propKey));
-         
-         rmessage2 = cons1_2.receive(1000);
-         
-         assertNotNull(rmessage2);
-         
-         assertEquals(i, rmessage2.getProperty(propKey));         
-      }
-      
-      rmessage1 = cons1_1.receiveImmediate();
-      
-      assertNull(rmessage1);
-      
-      rmessage2 = cons1_2.receiveImmediate();
-      
-      assertNull(rmessage2);
-            
-      rmessage3 = cons1_3.receiveImmediate();
-      
-      assertNull(rmessage3);
-      
-      rmessage4 = cons1_4.receiveImmediate();
-      
-      assertNull(rmessage4);
-      
-      session0.close();
-      
-      session1.close();
-   }
-   
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   @Override
-   protected void setUp() throws Exception
-   {
-   }
-
-   @Override
-   protected void tearDown() throws Exception
-   {
-      service0.stop();
-      
-      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
-
-      service1.stop();
-
-      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
-
-      assertEquals(0, InVMRegistry.instance.size());
-   }
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}
-

Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SimpleOutflowTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SimpleOutflowTest.java	2008-11-21 10:32:48 UTC (rev 5415)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SimpleOutflowTest.java	2008-11-21 13:59:07 UTC (rev 5416)
@@ -1,309 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
- * Middleware LLC, and individual contributors by the @authors tag. See the
- * copyright.txt in the distribution for a full listing of individual
- * contributors.
- * 
- * This is free software; you can redistribute it and/or modify it under the
- * terms of the GNU Lesser General Public License as published by the Free
- * Software Foundation; either version 2.1 of the License, or (at your option)
- * any later version.
- * 
- * This software is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details.
- * 
- * You should have received a copy of the GNU Lesser General Public License
- * along with this software; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.cluster.distribution;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import junit.framework.TestCase;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
-import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * 
- * A ActivationTimeoutTest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 4 Nov 2008 16:54:50
- *
- *
- */
-public class SimpleOutflowTest extends TestCase
-{
-   private static final Logger log = Logger.getLogger(SimpleOutflowTest.class);
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private MessagingService service0;
-
-   private MessagingService service1;
-   
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   public void testSimpleOutflowFanout() throws Exception
-   {
-      Configuration service0Conf = new ConfigurationImpl();
-      service0Conf.setClustered(true);
-      service0Conf.setSecurityEnabled(false);
-      Map<String, Object> service0Params = new HashMap<String, Object>();
-      service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
-      service0Conf.getAcceptorConfigurations()
-                  .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
-                                                  service0Params));
-
-      Configuration service1Conf = new ConfigurationImpl();
-      service1Conf.setClustered(true);
-      service1Conf.setSecurityEnabled(false);
-      Map<String, Object> service1Params = new HashMap<String, Object>();
-      service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-
-      service1Conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
-                                                                              service1Params));
-      service1 = MessagingServiceImpl.newNullStorageMessagingServer(service1Conf);
-      service1.start();
-
-      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
-      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-                                                                    service1Params);
-      connectors.add(server1tc);
-      
-      final SimpleString testAddress = new SimpleString("testaddress");
-
-      MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1", testAddress.toString(), null, true, 1, 0, null, connectors);
-      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
-      ofconfigs.add(ofconfig);
-      service0Conf.setMessageFlowConfigurations(ofconfigs);
-
-      service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
-      service0.start();
-      
-      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-                                                                    service0Params);
-      
-      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
-      
-      ClientSession session0 = csf0.createSession(false, true, true);
-      
-      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
-      
-      ClientSession session1 = csf1.createSession(false, true, true);
-      
-      session0.createQueue(testAddress, testAddress, null, false, false, true);
-      
-      session1.createQueue(testAddress, testAddress, null, false, false, true);
-      
-      ClientProducer prod0 = session0.createProducer(testAddress);
-      
-      ClientConsumer cons0 = session0.createConsumer(testAddress);
-      
-      ClientConsumer cons1 = session1.createConsumer(testAddress);
-      
-      session0.start();
-      
-      session1.start();
-      
-      final int numMessages = 100;
-      
-      final SimpleString propKey = new SimpleString("testkey");
-      
-      for (int i = 0; i < numMessages; i++)
-      {      
-         ClientMessage message = session0.createClientMessage(false);
-         message.putIntProperty(propKey, i);
-         message.getBody().flip();
-              
-         prod0.send(message);
-      }
-      
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage rmessage0 = cons0.receive(1000);
-         
-         assertNotNull(rmessage0);
-         
-         assertEquals(i, rmessage0.getProperty(propKey));
-         
-         ClientMessage rmessage1 = cons1.receive(1000);
-         
-         assertNotNull(rmessage1);
-         
-         assertEquals(i, rmessage1.getProperty(propKey));  
-      }
-   }
-   
-   public void testSimpleOutflowRoundRobin() throws Exception
-   {
-      Configuration service0Conf = new ConfigurationImpl();
-      service0Conf.setClustered(true);
-      service0Conf.setSecurityEnabled(false);
-      Map<String, Object> service0Params = new HashMap<String, Object>();
-      service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
-      service0Conf.getAcceptorConfigurations()
-                  .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
-                                                  service0Params));
-
-      Configuration service1Conf = new ConfigurationImpl();
-      service1Conf.setClustered(true);
-      service1Conf.setSecurityEnabled(false);
-      Map<String, Object> service1Params = new HashMap<String, Object>();
-      service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-
-      service1Conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
-                                                                              service1Params));
-      service1 = MessagingServiceImpl.newNullStorageMessagingServer(service1Conf);
-      service1.start();
-
-      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
-      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-                                                                    service1Params);
-      connectors.add(server1tc);
-      
-      final SimpleString testAddress = new SimpleString("testaddress");
-
-      MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1", testAddress.toString(), null, false, 1, 0, null, connectors);
-      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
-      ofconfigs.add(ofconfig);
-      service0Conf.setMessageFlowConfigurations(ofconfigs);
-
-      service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
-      service0.start();
-      
-      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-                                                                    service0Params);
-      
-      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
-      
-      ClientSession session0 = csf0.createSession(false, true, true);
-      
-      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
-      
-      ClientSession session1 = csf1.createSession(false, true, true);
-      
-      session0.createQueue(testAddress, testAddress, null, false, false, false);
-      
-      session1.createQueue(testAddress, testAddress, null, false, false, false);
-      
-      ClientProducer prod0 = session0.createProducer(testAddress);
-      
-      ClientConsumer cons0 = session0.createConsumer(testAddress);
-      
-      ClientConsumer cons1 = session1.createConsumer(testAddress);
-      
-      session0.start();
-      
-      session1.start();
-      
-      final int numMessages = 100;
-      
-      final SimpleString propKey = new SimpleString("testkey");
-      
-      for (int i = 0; i < numMessages; i++)
-      {      
-         ClientMessage message = session0.createClientMessage(false);
-         message.putIntProperty(propKey, i);
-         message.getBody().flip();
-              
-         prod0.send(message);
-      }
-      
-      ClientMessage msg = cons0.receive(1000);
-      
-      boolean toggle = msg != null;
-      
-      int i;
-      if (toggle)
-      {
-         assertEquals(0, msg.getProperty(propKey));
-         
-         i = 1;
-      }
-      else
-      {
-         i = 0;
-      }
-      
-      for (; i < numMessages; i++)
-      {
-         if (!toggle)
-         {
-            ClientMessage rmessage0 = cons0.receive(1000);
-            
-            assertNotNull(rmessage0);
-            
-            assertEquals(i, rmessage0.getProperty(propKey));
-         }
-         else
-         {
-            ClientMessage rmessage1 = cons1.receive(1000);
-            
-            assertNotNull(rmessage1);
-            
-            assertEquals(i, rmessage1.getProperty(propKey));  
-         }
-         
-         toggle = !toggle;
-      }
-   }
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   @Override
-   protected void setUp() throws Exception
-   {
-   }
-
-   @Override
-   protected void tearDown() throws Exception
-   {
-      service0.stop();
-      
-      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
-
-      service1.stop();
-
-      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
-
-      assertEquals(0, InVMRegistry.instance.size());
-   }
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverPreCommitMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverPreCommitMessageTest.java	2008-11-21 10:32:48 UTC (rev 5415)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverPreCommitMessageTest.java	2008-11-21 13:59:07 UTC (rev 5416)
@@ -70,10 +70,6 @@
 
    // Public --------------------------------------------------------
 
-   /*
-    * Set messages to expire very soon, send a load of them, so at some of them get expired when they reach the client
-    * After failover make sure all are received ok
-    */
    public void testPreCommitFailoverTest() throws Exception
    {
       ClientSessionFactoryInternal sf1 = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
@@ -134,10 +130,6 @@
          {
             message.acknowledge();
 
-            //We sleep a little to make sure messages aren't consumed too quickly and some
-            //will expire before reaching consumer
-            Thread.sleep(1);
-
             count++;
          }
          else

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/queue/DeadLetterAddressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/queue/DeadLetterAddressTest.java	2008-11-21 10:32:48 UTC (rev 5415)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/queue/DeadLetterAddressTest.java	2008-11-21 13:59:07 UTC (rev 5416)
@@ -21,29 +21,29 @@
  */
 package org.jboss.messaging.tests.integration.queue;
 
-import org.jboss.messaging.tests.util.UnitTestCase;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.impl.XidImpl;
-import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.tests.util.UnitTestCase;
 import org.jboss.messaging.util.SimpleString;
-import org.jboss.messaging.jms.client.JBossMessage;
 
-import javax.transaction.xa.Xid;
-import javax.transaction.xa.XAResource;
-import java.util.Map;
-import java.util.HashMap;
-
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  */

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java	2008-11-21 10:32:48 UTC (rev 5415)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java	2008-11-21 13:59:07 UTC (rev 5416)
@@ -21,30 +21,23 @@
  */
 package org.jboss.messaging.tests.integration.queue;
 
-import org.jboss.messaging.tests.util.UnitTestCase;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientProducer;
+import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ACTUAL_EXPIRY_TIME;
+
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.ClientSessionFactory;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.transaction.impl.XidImpl;
-import org.jboss.messaging.core.message.impl.MessageImpl;
-import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ACTUAL_EXPIRY_TIME;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.tests.util.UnitTestCase;
 import org.jboss.messaging.util.SimpleString;
 
-import javax.transaction.xa.Xid;
-import javax.transaction.xa.XAResource;
-import java.util.Map;
-import java.util.HashMap;
-
-
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  */
@@ -136,7 +129,7 @@
       clientConsumer.close();
    }
 
-    public void testHeadersSet() throws Exception
+   public void testHeadersSet() throws Exception
    {
       final int NUM_MESSAGES = 5;
       SimpleString ea = new SimpleString("DLA");
@@ -151,7 +144,7 @@
       ClientSession sendSession = sessionFactory.createSession(false, true, true);
       ClientProducer producer = sendSession.createProducer(qName);
 
-         long expiration = System.currentTimeMillis();
+      long expiration = System.currentTimeMillis();
       for (int i = 0; i < NUM_MESSAGES; i++)
       {
          ClientMessage tm = createTextMessage("Message:" + i, clientSession);
@@ -163,7 +156,7 @@
       clientSession.start();
       ClientMessage m = clientConsumer.receive(1000);
       assertNull(m);
-      //All the messages should now be in the EQ
+      // All the messages should now be in the EQ
 
       ClientConsumer cc3 = clientSession.createConsumer(eq);
 
@@ -177,7 +170,7 @@
          assertEquals("Message:" + i, text);
 
          // Check the headers
-         Long actualExpiryTime = (Long) tm.getProperty(HDR_ACTUAL_EXPIRY_TIME);
+         Long actualExpiryTime = (Long)tm.getProperty(HDR_ACTUAL_EXPIRY_TIME);
          assertTrue(actualExpiryTime >= expiration);
       }
 
@@ -191,9 +184,9 @@
       TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
       configuration.getAcceptorConfigurations().add(transportConfig);
       messagingService = MessagingServiceImpl.newNullStorageMessagingServer(configuration);
-      //start the server
+      // start the server
       messagingService.start();
-      //then we create a client as normal
+      // then we create a client as normal
       ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
       clientSession = sessionFactory.createSession(true, true, false);
    }
@@ -228,4 +221,3 @@
    }
 
 }
-




More information about the jboss-cvs-commits mailing list