[jboss-cvs] JBoss Messaging SVN: r5578 - in trunk: src/main/org/jboss/messaging/core/config/impl and 15 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jan 5 08:08:07 EST 2009


Author: timfox
Date: 2009-01-05 08:08:07 -0500 (Mon, 05 Jan 2009)
New Revision: 5578

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowRestartTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
   trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
   trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMAcceptor.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnector.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMRegistry.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/mock/MockConnector.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
Log:
Message flow restart + various tweaks and fixes


Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -13,6 +13,7 @@
 package org.jboss.messaging.core.client.impl;
 
 import java.io.File;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.LinkedList;
@@ -694,7 +695,12 @@
       {
          if (!directory.exists())
          {
-            directory.mkdirs();
+            boolean ok = directory.mkdirs();
+            
+            if (!ok)
+            {
+               throw new IOException("Failed to create directory " + directory.getCanonicalPath());
+            }
          }
 
          ClientFileMessageImpl message = new ClientFileMessageImpl();

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -324,7 +324,7 @@
    private TransportConfiguration parseTransportConfiguration(final Node node)
    {
       Node nameNode = node.getAttributes().getNamedItem("name");
-
+      
       String name = nameNode != null ? nameNode.getNodeValue() : null;
 
       NodeList children = node.getChildNodes();

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -29,6 +29,7 @@
 import java.util.List;
 
 import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
 
 /**
  * 
@@ -40,6 +41,8 @@
  */
 public abstract class AbstractSequentialFactory implements SequentialFileFactory
 {
+   private static final Logger log = Logger.getLogger(AbstractSequentialFactory.class);
+   
    protected final String journalDir;
 
    public AbstractSequentialFactory(final String journalDir)
@@ -51,9 +54,14 @@
     * Create the directory if it doesn't exist yet
     */
    public void createDirs() throws Exception
-   {
+   {      
       File file = new File(journalDir);
-      file.mkdirs();
+      boolean ok = file.mkdirs();
+//FIXME - uncomment when https://jira.jboss.org/jira/browse/JBMESSAGING-1477 is complete      
+//      if (!ok)
+//      {
+//         throw new IOException("Failed to create directory " + journalDir);
+//      }
    }
    
 

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -26,6 +26,7 @@
 
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
 
 /**
  * 
@@ -37,6 +38,8 @@
  */
 public class NIOSequentialFileFactory extends AbstractSequentialFactory implements SequentialFileFactory
 {
+   private static final Logger log = Logger.getLogger(NIOSequentialFileFactory.class);
+   
    public NIOSequentialFileFactory(final String journalDir)
    {
       super(journalDir);

Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -69,7 +69,7 @@
     * @return
     * @throws Exception 
     */
-   PagingStore createPageStore(SimpleString destination) throws Exception;
+   PagingStore createPageStore(SimpleString destination, boolean createDir) throws Exception;
 
    /** To return the PageStore associated with the address */
    PagingStore getPageStore(SimpleString address) throws Exception;

Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -38,7 +38,7 @@
  */
 public interface PagingStoreFactory
 {
-   PagingStore newStore(SimpleString destinationName, QueueSettings queueSettings);
+   PagingStore newStore(SimpleString destinationName, QueueSettings queueSettings, boolean createDir);
 
    Executor getGlobalDepagerExecutor();
 

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -135,7 +135,7 @@
 
       for (SimpleString dest : destinations)
       {
-         createPageStore(dest);
+         createPageStore(dest, false);
       }
    }
 
@@ -143,13 +143,13 @@
     * @param destination
     * @return
     */
-   public synchronized PagingStore createPageStore(final SimpleString storeName) throws Exception
+   public synchronized PagingStore createPageStore(final SimpleString storeName, final boolean createDir) throws Exception
    {
       PagingStore store = stores.get(storeName);
 
       if (store == null)
       {
-         store = newStore(storeName);
+         store = newStore(storeName, createDir);
 
          PagingStore oldStore = stores.putIfAbsent(storeName, store);
 
@@ -172,7 +172,7 @@
 
       if (store == null)
       {
-         store = createPageStore(storeName);
+         store = createPageStore(storeName, true);
       }
 
       return store;
@@ -334,9 +334,9 @@
 
    // Private -------------------------------------------------------
 
-   private PagingStore newStore(final SimpleString destinationName)
+   private PagingStore newStore(final SimpleString destinationName, final boolean createDir)
    {
-      return pagingStoreFactory.newStore(destinationName, queueSettingsRepository.getMatch(destinationName.toString()));
+      return pagingStoreFactory.newStore(destinationName, queueSettingsRepository.getMatch(destinationName.toString()), createDir);
    }
 
    // Inner classes -------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -106,17 +106,18 @@
       parentExecutor.awaitTermination(30, TimeUnit.SECONDS);
    }
 
-   public PagingStore newStore(final SimpleString destinationName, final QueueSettings settings)
+   public PagingStore newStore(final SimpleString destinationName, final QueueSettings settings, final boolean createDir)
    {      
       final String destinationDirectory = directory + "/" + Base64.encodeBytes(destinationName.getData(), Base64.URL_SAFE);
-
+      
       return new PagingStoreImpl(pagingManager,
                                  storageManager,
                                  postOffice,
                                  newFileFactory(destinationDirectory),
                                  destinationName,
                                  settings,
-                                 executorFactory.getExecutor());
+                                 executorFactory.getExecutor(),
+                                 createDir);
    }
 
    public void setPagingManager(final PagingManager pagingManager)
@@ -146,8 +147,7 @@
 
       }
       else
-      {
-         
+      {         
          ArrayList<SimpleString> filesReturn = new ArrayList<SimpleString>(files.length);
          
          for (File file: files)

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -111,6 +111,8 @@
    private final ReadWriteLock currentPageLock = new ReentrantReadWriteLock();
 
    private volatile boolean running = false;
+   
+   private final boolean createDir;
 
    // Static --------------------------------------------------------
 
@@ -135,7 +137,8 @@
                           final SequentialFileFactory fileFactory,
                           final SimpleString storeName,
                           final QueueSettings queueSettings,
-                          final Executor executor)
+                          final Executor executor,
+                          final boolean createDir)
    {
       if (pagingManager == null)
       {
@@ -166,6 +169,8 @@
       this.executor = executor;
 
       this.pagingManager = pagingManager;
+      
+      this.createDir = createDir;
    }
 
    // Public --------------------------------------------------------
@@ -531,7 +536,10 @@
          {
             currentPageLock.writeLock().lock();
 
-            fileFactory.createDirs();
+            if (createDir)
+            {               
+               fileFactory.createDirs();
+            }
 
             firstPageId = Integer.MAX_VALUE;
             currentPageId = 0;

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -548,6 +548,8 @@
          }
       }
 
+      pagingManager.reloadStores();
+      
       Map<SimpleString, List<Pair<SimpleString, Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<SimpleString, Long>>>();
 
       storageManager.loadMessageJournal(this, storageManager, queueSettingsRepository, queues, resourceManager, duplicateIDMap);
@@ -565,8 +567,7 @@
       }
 
       // This is necessary as if the server was previously stopped while a depage was being executed,
-      // it needs to resume the depage process on those destinations
-      pagingManager.reloadStores();
+      // it needs to resume the depage process on those destinations      
       pagingManager.startGlobalDepage();
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -138,7 +138,7 @@
    }
 
    private synchronized Address addAndUpdateAddressMap(SimpleString address)
-   {
+   {     
       Address add = addresses.get(address);
       if (add == null)
       {
@@ -191,7 +191,7 @@
    }
 
    private List<SimpleString> getAddresses(final Address address)
-   {
+   {      
       List<SimpleString> addresses = new ArrayList<SimpleString>();
       SimpleString[] parts = address.getAddressParts();
 
@@ -199,18 +199,20 @@
       addresses.add(SINGLE_WORD_SIMPLESTRING);
       addresses.add(ANY_WORDS_SIMPLESTRING);
       if (address.getAddressParts().length > 1)
-      {
+      {         
          addresses = addPart(addresses, address, 1);
+               
       }
       addresses.remove(address.getAddress());
+                 
       return addresses;
    }
 
    private List<SimpleString> addPart(final List<SimpleString> addresses, final Address address, final int pos)
-   {
+   {      
       List<SimpleString> newAddresses = new ArrayList<SimpleString>();
       for (SimpleString add : addresses)
-      {
+      {         
          newAddresses.add(add.concat(DELIM).concat(SINGLE_WORD));
          newAddresses.add(add.concat(DELIM).concat(ANY_WORDS));
          newAddresses.add(add.concat(DELIM).concat(address.getAddressParts()[pos]));

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -210,6 +210,10 @@
       {
          acceptor.stop();
       }
+      
+      acceptors.clear();
+      
+      connections.clear();
 
       started = false;
    }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMAcceptor.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMAcceptor.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -116,7 +116,7 @@
          throw new IllegalStateException("Acceptor is not started");
       }
       
-      new InVMConnection(connectionID, remoteHandler, new Listener(connector));               
+      new InVMConnection(id, connectionID, remoteHandler, new Listener(connector));               
    }
    
    public void disconnect(final String connectionID)

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -53,19 +53,23 @@
    private final String id;
 
    private boolean closed;
+   
+   private final int serverID;
 
    private static final ExecutorFactory factory =
       new OrderedExecutorFactory(Executors.newCachedThreadPool(new JBMThreadFactory("JBM-InVM-Transport-Threads")));
 
    private final Executor executor;
 
-   public InVMConnection(final BufferHandler handler, final ConnectionLifeCycleListener listener)
-   {
-      this(UUIDGenerator.getInstance().generateSimpleStringUUID().toString(), handler, listener);
+   public InVMConnection(final int serverID, final BufferHandler handler, final ConnectionLifeCycleListener listener)
+   {      
+      this(serverID, UUIDGenerator.getInstance().generateSimpleStringUUID().toString(), handler, listener);
    }
 
-   public InVMConnection(final String id, final BufferHandler handler, final ConnectionLifeCycleListener listener)
+   public InVMConnection(final int serverID, final String id, final BufferHandler handler, final ConnectionLifeCycleListener listener)
    {
+      this.serverID = serverID;
+      
       this.handler = handler;
 
       this.listener = listener;
@@ -134,6 +138,6 @@
 
    public String getRemoteAddress()
    {
-      return "invm";
+      return "invm:" + serverID;
    }
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnector.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnector.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -62,7 +62,7 @@
       }
    }
 
-   private final int id;
+   protected final int id;
    
    private final BufferHandler handler;
    
@@ -156,11 +156,10 @@
       }
    }
 
-
    // This may be an injection point for mocks on tests
    protected Connection internalCreateConnection(final BufferHandler handler, final ConnectionLifeCycleListener listener)
    {
-      return new InVMConnection(handler, listener);
+      return new InVMConnection(id, handler, listener);
    }
       
    private class Listener implements ConnectionLifeCycleListener

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMRegistry.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMRegistry.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMRegistry.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -35,20 +35,19 @@
 public class InVMRegistry
 {
    private static final Logger log = Logger.getLogger(InVMRegistry.class);
-   
+
    public static final InVMRegistry instance = new InVMRegistry();
-   
-   private ConcurrentMap<Integer, InVMAcceptor> acceptors =
-      new ConcurrentHashMap<Integer, InVMAcceptor>();
-   
+
+   private ConcurrentMap<Integer, InVMAcceptor> acceptors = new ConcurrentHashMap<Integer, InVMAcceptor>();
+
    public void registerAcceptor(final int id, final InVMAcceptor acceptor)
-   {      
+   {     
       if (acceptors.putIfAbsent(id, acceptor) != null)
       {
          throw new IllegalArgumentException("Acceptor with id " + id + " already registered");
       }
    }
-   
+
    public void unregisterAcceptor(final int id)
    {      
       if (acceptors.remove(id) == null)
@@ -56,17 +55,17 @@
          throw new IllegalArgumentException("Acceptor with id " + id + " not registered");
       }
    }
-    
+
    public InVMAcceptor getAcceptor(final int id)
    {
       return acceptors.get(id);
    }
-   
+
    public void clear()
    {
       this.acceptors.clear();
    }
-   
+
    public int size()
    {
       return this.acceptors.size();

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -166,23 +166,26 @@
    }
 
    public synchronized void start() throws Exception
-   {
+   {      
       if (started)
       {
          return;
       }
+      
+      queue.addConsumer(this);
 
       createTx();
 
-      createObjects();
-
-      queue.addConsumer(this);
-
-      started = true;
+      if (createObjects())
+      {         
+         started = true;
+         
+         queue.deliverAsync(executor);
+      }
    }
 
    public synchronized void stop() throws Exception
-   {
+   {      
       started = false;
 
       queue.removeConsumer(this);
@@ -239,7 +242,7 @@
          {
             return HandleStatus.BUSY;
          }
-         
+          
          reference.getQueue().referenceHandled();
          
          refs.add(reference);
@@ -290,7 +293,7 @@
 
    // Private -------------------------------------------------------
 
-   private void createObjects() throws Exception
+   private boolean createObjects() throws Exception
    {
       try
       {
@@ -302,12 +305,14 @@
          
          stop();
          
-         return;
+         return false;
       }
 
       session.addFailureListener(this);
 
       producer = session.createProducer(null);      
+      
+      return true;
    }
 
    private synchronized void timeoutBatch()
@@ -359,13 +364,13 @@
           
             SimpleString forwardingDestination = (SimpleString)message.getProperty(MessageImpl.HDR_ORIGIN_QUEUE);
 
-            producer.send(forwardingDestination, message);
-         }
+            producer.send(forwardingDestination, message);                       
+         }                 
 
          session.commit();
 
          tx.commit();
-
+         
          createTx();
 
          busy = false;

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -39,7 +39,9 @@
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.BindingType;
+import org.jboss.messaging.core.postoffice.Bindings;
 import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.Link;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.cluster.Forwarder;
 import org.jboss.messaging.core.server.cluster.MessageFlow;
@@ -101,7 +103,7 @@
    private final int maxRetriesBeforeFailover;
 
    private final int maxRetriesAfterFailover;
-   
+
    private final boolean useDuplicateDetection;
 
    /*
@@ -159,7 +161,7 @@
       this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
 
       this.maxRetriesAfterFailover = maxRetriesAfterFailover;
-      
+
       this.useDuplicateDetection = useDuplicateDetection;
 
       this.updateConnectors(connectors);
@@ -220,7 +222,7 @@
       this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
 
       this.maxRetriesAfterFailover = maxRetriesAfterFailover;
-      
+
       this.useDuplicateDetection = useDuplicateDetection;
    }
 
@@ -265,15 +267,15 @@
    {
       return started;
    }
-   
+
    // MessageFlow implementation ------------------------------------
-   
+
    public SimpleString getName()
    {
       return name;
    }
-   
-   //For testing only
+
+   // For testing only
    public Set<Forwarder> getForwarders()
    {
       return new HashSet<Forwarder>(forwarders.values());
@@ -322,31 +324,66 @@
       {
          if (!forwarders.containsKey(connectorPair))
          {
-            SimpleString queueName = new SimpleString("outflow." + name +
-                                                      "." +
-                                                      UUIDGenerator.getInstance().generateSimpleStringUUID());
             
-            SimpleString linkName = new SimpleString("link." + queueName.toString());
-
-            Binding binding = postOffice.getBinding(queueName);
-
-            // TODO need to delete store and forward queues that are no longer in the config
-            // and also allow ability to change filterstring etc. while keeping the same name
-            if (binding == null)
+            SimpleString linkName = new SimpleString("link." + name + "." +
+                                                     generateConnectorString(connectorPair.a) + "-" +
+                                                     (connectorPair.b == null ? "null" : generateConnectorString(connectorPair.b)));
+            
+            Queue queue = null;
+            
+            Bindings bindings = postOffice.getBindingsForAddress(address);
+              
+            for (Binding binding: bindings.getBindings())
             {
+               if (binding.getType() == BindingType.LINK)
+               {
+                  Link link = (Link)binding.getBindable();
+                  
+                  if (link.getName().equals(linkName))
+                  {
+                     //Found the link
+                     
+                     SimpleString queueName = link.getLinkAddress();
+                     
+                     Binding queueBinding = postOffice.getBinding(queueName);
+                     
+                     if (queueBinding == null)
+                     {
+                        throw new IllegalStateException("Cannot find queue with name " + queueName);
+                     }
+                     
+                     queue = (Queue)queueBinding.getBindable();
+                  }
+               }
+            }
+            
+            if (queue == null)
+            {               
+               SimpleString queueName = new SimpleString("outflow." + name +
+                                                         "." +
+                                                         UUIDGenerator.getInstance().generateSimpleStringUUID());
+               
                Filter filter = filterString == null ? null : new FilterImpl(filterString);
 
-               //Create the queue
+               // Create the queue
+
+               Binding binding = postOffice.addQueueBinding(queueName, queueName, filter, true, false, exclusive);
                
-               binding = postOffice.addQueueBinding(queueName, queueName, filter, true, false, exclusive);
-               
-               //Create the link
-               
-               postOffice.addLinkBinding(linkName, address, filter, true, false, exclusive, queueName, useDuplicateDetection);
+               queue = (Queue)binding.getBindable();
+
+               // Create the link
+
+               postOffice.addLinkBinding(linkName,
+                                         address,
+                                         filter,
+                                         true,
+                                         false,
+                                         exclusive,
+                                         queueName,
+                                         useDuplicateDetection);
             }
-            
-            Queue queue = (Queue)binding.getBindable();
 
+    
             Forwarder forwarder = new ForwarderImpl(queue,
                                                     connectorPair,
                                                     executorFactory.getExecutor(),
@@ -370,5 +407,39 @@
          }
       }
    }
+   
+   private String replaceWildcardChars(final String str)
+   {
+      return str.replace('.', '-');
+   }
+   
+   private SimpleString generateConnectorString(final TransportConfiguration config) throws Exception
+   {      
+      StringBuilder str = new StringBuilder(replaceWildcardChars(config.getFactoryClassName()));
+      
+      if (!config.getParams().isEmpty())
+      {
+         str.append("?");
+      }
+      
+      boolean first = true;
+      for (Map.Entry<String, Object> entry: config.getParams().entrySet())
+      {
+         if (!first)
+         {
+            str.append("&");
+         }
+         String encodedKey = replaceWildcardChars(entry.getKey());
+         
+         String val = entry.getValue().toString();
+         String encodedVal = replaceWildcardChars(val);
+         
+         str.append(encodedKey).append('=').append(encodedVal);
+         
+         first = false;
+      }
 
+      return new SimpleString(str.toString());
+   }
+
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -107,9 +107,9 @@
 
    private PostOffice postOffice;
 
-   private final ExecutorService asyncDeliveryPool = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-async-session-delivery-threads"));
+   private ExecutorService asyncDeliveryPool;
 
-   private final ExecutorFactory executorFactory = new OrderedExecutorFactory(asyncDeliveryPool);
+   private ExecutorFactory executorFactory;
 
    private HierarchicalRepository<Set<Role>> securityRepository;
 
@@ -156,7 +156,11 @@
       {
          return;
       }
+      
+      asyncDeliveryPool = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-async-session-delivery-threads"));
 
+      executorFactory = new OrderedExecutorFactory(asyncDeliveryPool);
+
       /*
        * The following components are pluggable on the messaging server: Configuration, StorageManager, RemotingService,
        * SecurityManager and ManagementRegistration They must already be injected by the time the messaging server

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -159,7 +159,7 @@
 
       DuplicateIDCache cache = null;
 
-      if (duplicateID != null)
+      if (!message.isReload() && duplicateID != null)
       {
          cache = postOffice.getDuplicateIDCache(message.getDestination());
 
@@ -352,7 +352,7 @@
    }
 
    public void addLast(final MessageReference ref)
-   {      
+   {          
       add(ref, false);
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -32,7 +32,6 @@
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
  * 
- * TODO - this should be refactored to use transaction operations for adding, acking paging stuff etc
  */
 public class TransactionImpl implements Transaction
 {

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -30,11 +30,11 @@
 import junit.framework.AssertionFailedError;
 
 import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientFileMessage;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientProducer;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.ClientFileMessage;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
 import org.jboss.messaging.core.client.impl.ClientSessionImpl;
 import org.jboss.messaging.core.config.Configuration;
@@ -42,7 +42,6 @@
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.message.Message;
-import org.jboss.messaging.core.message.impl.MessageImpl;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
 import org.jboss.messaging.core.remoting.impl.RemotingServiceImpl;

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/mock/MockConnector.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/mock/MockConnector.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/mock/MockConnector.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -74,7 +74,7 @@
    @Override
    protected Connection internalCreateConnection(final BufferHandler handler, final ConnectionLifeCycleListener listener)
    {
-      return new MockConnection(handler, listener);
+      return new MockConnection(id, handler, listener);
    }
 
    // Private -------------------------------------------------------
@@ -93,9 +93,9 @@
        * @param handler
        * @param listener
        */
-      public MockConnection(final BufferHandler handler, final ConnectionLifeCycleListener listener)
+      public MockConnection(final int serverID, final BufferHandler handler, final ConnectionLifeCycleListener listener)
       {
-         super(handler, listener);
+         super(serverID, handler, listener);
       }
 
       @Override

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowRestartTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowRestartTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowRestartTest.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -0,0 +1,631 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_USE_DUPLICATE_DETECTION;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.util.Pair;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * 
+ * A MessageFlowRestartTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 24 Nov 2008 14:26:45
+ *
+ *
+ */
+public class MessageFlowRestartTest extends ServiceTestBase
+{
+   private static final Logger log = Logger.getLogger(MessageFlowRestartTest.class);
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testRestartOutflow() throws Exception
+   {
+      Map<String, Object> service0Params = new HashMap<String, Object>();
+      MessagingService service0 = createClusteredServiceWithParams(true, service0Params);
+
+      Map<String, Object> service1Params = new HashMap<String, Object>();
+      service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+      MessagingService service1 = createClusteredServiceWithParams(true, service1Params);
+      
+      //We don't start server 1 at this point
+      
+      Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+
+      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service1Params,
+                                                                    "connector1");
+      connectors.put(server1tc.getName(), server1tc);
+
+      service0.getServer().getConfiguration().setConnectorConfigurations(connectors);
+
+      List<Pair<String, String>> connectorNames = new ArrayList<Pair<String, String>>();
+      connectorNames.add(new Pair<String, String>(server1tc.getName(), null));
+     
+      final SimpleString testAddress = new SimpleString("testaddress");
+
+      MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1",
+                                                                       testAddress.toString(),
+                                                                       null,
+                                                                       false,
+                                                                       1,
+                                                                       -1,
+                                                                       null,
+                                                                       DEFAULT_RETRY_INTERVAL,
+                                                                       DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+                                                                       0,
+                                                                       0,
+                                                                       DEFAULT_USE_DUPLICATE_DETECTION,
+                                                                       connectorNames);
+      
+      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+      ofconfigs.add(ofconfig);
+      service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+
+      log.info("starting service0");
+      service0.start();
+      
+      log.info("started service");
+      
+      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service0Params);
+
+      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+      ClientSession session0 = csf0.createSession(false, true, true);
+      
+      session0.createQueue(testAddress, testAddress, null, false, false);
+            
+      ClientProducer prod0 = session0.createProducer(testAddress);
+
+      ClientConsumer cons0 = session0.createConsumer(testAddress);
+      
+      session0.start();
+
+      final int numMessages = 10;
+
+      final SimpleString propKey = new SimpleString("testkey");
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session0.createClientMessage(true);
+         message.putIntProperty(propKey, i);
+         message.getBody().flip();
+
+         prod0.send(message);
+      }
+      
+      log.info("sent messages");
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage rmessage0 = cons0.receive(1000);
+         assertNotNull(rmessage0);
+         assertEquals(i, rmessage0.getProperty(propKey));
+      }
+          
+      // At this point the messages should be in the store and forward queue for server 1
+      
+      // Now shutdown server 0 and start servers 1 and 0
+             
+      service0.stop();
+      
+      service1.start();
+      
+      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+      ClientSession session1 = csf1.createSession(false, true, true);
+      
+      session1.createQueue(testAddress, testAddress, null, false, false);
+      
+      service0.start();
+         
+      csf0 = new ClientSessionFactoryImpl(server0tc);
+      session0 = csf0.createSession(false, true, true);
+      
+      session0.createQueue(testAddress, testAddress, null, false, false);
+            
+      cons0 = session0.createConsumer(testAddress);
+      
+      session0.start();
+                               
+      ClientConsumer cons1 = session1.createConsumer(testAddress);
+      
+      session1.start();
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage rmessage0 = cons1.receive(1000);
+         assertNotNull(rmessage0);
+         assertEquals(i, rmessage0.getProperty(propKey));
+      }
+      
+      ClientMessage rmessage = cons0.receive(1000);
+      
+      assertNull(rmessage);
+      
+      service0.stop();      
+      service1.stop();
+      
+      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+   }
+//
+//   public void testStaticListRoundRobin() throws Exception
+//   {
+//      Map<String, Object> service0Params = new HashMap<String, Object>();
+//      MessagingService service0 = createMessagingService(0, service0Params);
+//
+//      Map<String, Object> service1Params = new HashMap<String, Object>();
+//      MessagingService service1 = createMessagingService(1, service1Params);
+//      service1.start();
+//
+//      Map<String, Object> service2Params = new HashMap<String, Object>();
+//      MessagingService service2 = createMessagingService(2, service2Params);
+//      service2.start();
+//
+//      Map<String, Object> service3Params = new HashMap<String, Object>();
+//      MessagingService service3 = createMessagingService(3, service3Params);
+//      service3.start();
+//
+//      Map<String, Object> service4Params = new HashMap<String, Object>();
+//      MessagingService service4 = createMessagingService(4, service4Params);
+//      service4.start();
+//
+//      Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+//
+//      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+//                                                                    service1Params,
+//                                                                    "connector1");
+//      connectors.put(server1tc.getName(), server1tc);
+//
+//      TransportConfiguration server2tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+//                                                                    service2Params,
+//                                                                    "connector2");
+//      connectors.put(server2tc.getName(), server2tc);
+//
+//      TransportConfiguration server3tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+//                                                                    service3Params,
+//                                                                    "connector3");
+//      connectors.put(server3tc.getName(), server3tc);
+//
+//      TransportConfiguration server4tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+//                                                                    service4Params,
+//                                                                    "connector4");
+//      connectors.put(server4tc.getName(), server4tc);
+//
+//      service0.getServer().getConfiguration().setConnectorConfigurations(connectors);
+//
+//      List<Pair<String, String>> connectorNames = new ArrayList<Pair<String, String>>();
+//      connectorNames.add(new Pair<String, String>(server1tc.getName(), null));
+//      connectorNames.add(new Pair<String, String>(server2tc.getName(), null));
+//      connectorNames.add(new Pair<String, String>(server3tc.getName(), null));
+//      connectorNames.add(new Pair<String, String>(server4tc.getName(), null));
+//
+//      final SimpleString testAddress = new SimpleString("testaddress");
+//
+//      MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1",
+//                                                                       testAddress.toString(),
+//                                                                       null,
+//                                                                       true,
+//                                                                       1,
+//                                                                       -1,
+//                                                                       null,
+//                                                                       DEFAULT_RETRY_INTERVAL,
+//                                                                       DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+//                                                                       DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+//                                                                       DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
+//                                                                       DEFAULT_USE_DUPLICATE_DETECTION,
+//                                                                       connectorNames);
+//      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+//      ofconfigs.add(ofconfig);
+//      service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+//
+//      service0.start();
+//
+//      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+//                                                                    service0Params);
+//
+//      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+//      ClientSession session0 = csf0.createSession(false, true, true);
+//
+//      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+//      ClientSession session1 = csf1.createSession(false, true, true);
+//
+//      ClientSessionFactory csf2 = new ClientSessionFactoryImpl(server2tc);
+//      ClientSession session2 = csf2.createSession(false, true, true);
+//
+//      ClientSessionFactory csf3 = new ClientSessionFactoryImpl(server3tc);
+//      ClientSession session3 = csf3.createSession(false, true, true);
+//
+//      ClientSessionFactory csf4 = new ClientSessionFactoryImpl(server4tc);
+//      ClientSession session4 = csf4.createSession(false, true, true);
+//
+//      session0.createQueue(testAddress, testAddress, null, false, false);
+//      session1.createQueue(testAddress, testAddress, null, false, false);
+//      session2.createQueue(testAddress, testAddress, null, false, false);
+//      session3.createQueue(testAddress, testAddress, null, false, false);
+//      session4.createQueue(testAddress, testAddress, null, false, false);
+//
+//      ClientProducer prod0 = session0.createProducer(testAddress);
+//
+//      ClientConsumer cons0 = session0.createConsumer(testAddress);
+//      ClientConsumer cons1 = session1.createConsumer(testAddress);
+//      ClientConsumer cons2 = session2.createConsumer(testAddress);
+//      ClientConsumer cons3 = session3.createConsumer(testAddress);
+//      ClientConsumer cons4 = session4.createConsumer(testAddress);
+//
+//      session0.start();
+//
+//      session1.start();
+//      session2.start();
+//      session3.start();
+//      session4.start();
+//
+//      final int numMessages = 10;
+//
+//      final SimpleString propKey = new SimpleString("testkey");
+//
+//      for (int i = 0; i < numMessages; i++)
+//      {
+//         ClientMessage message = session0.createClientMessage(false);
+//         message.putIntProperty(propKey, i);
+//         message.getBody().flip();
+//
+//         prod0.send(message);
+//      }
+//
+//      // Refs should be round-robin'd in the same order the connectors are specified in the outflow
+//      // With the local consumer being last since it was created last
+//
+//      ArrayList<ClientConsumer> consumers = new ArrayList<ClientConsumer>();
+//
+//      consumers.add(cons1);
+//      consumers.add(cons2);
+//      consumers.add(cons3);
+//      consumers.add(cons4);
+//      consumers.add(cons0);
+//
+//      int count = 0;
+//      for (int i = 0; i < numMessages; i++)
+//      {
+//         ClientConsumer consumer = consumers.get(count);
+//
+//         count++;
+//         if (count == consumers.size())
+//         {
+//            count = 0;
+//         }
+//
+//         ClientMessage msg = consumer.receive(1000);
+//
+//         assertNotNull(msg);
+//
+//         assertEquals(i, msg.getProperty(propKey));
+//
+//         msg.acknowledge();
+//      }
+//
+//      session0.close();
+//      session1.close();
+//      session2.close();
+//      session3.close();
+//      session4.close();
+//
+//      service0.stop();
+//      service1.stop();
+//      service2.stop();
+//      service3.stop();
+//      service4.stop();
+//
+//      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+//      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+//      assertEquals(0, service2.getServer().getRemotingService().getConnections().size());
+//      assertEquals(0, service3.getServer().getRemotingService().getConnections().size());
+//      assertEquals(0, service4.getServer().getRemotingService().getConnections().size());
+//   }
+//
+//
+//   public void testMultipleFlows() throws Exception
+//   {
+//      Map<String, Object> service0Params = new HashMap<String, Object>();
+//      MessagingService service0 = createMessagingService(0, service0Params);
+//
+//      Map<String, Object> service1Params = new HashMap<String, Object>();
+//      MessagingService service1 = createMessagingService(1, service1Params);
+//      service1.start();
+//
+//      Map<String, Object> service2Params = new HashMap<String, Object>();
+//      MessagingService service2 = createMessagingService(2, service2Params);
+//      service2.start();
+//
+//      Map<String, Object> service3Params = new HashMap<String, Object>();
+//      MessagingService service3 = createMessagingService(3, service3Params);
+//      service3.start();
+//
+//      Map<String, Object> service4Params = new HashMap<String, Object>();
+//      MessagingService service4 = createMessagingService(4, service4Params);
+//      service4.start();
+//
+//      Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+//
+//      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+//                                                                    service1Params,
+//                                                                    "connector1");
+//      connectors.put(server1tc.getName(), server1tc);
+//
+//      TransportConfiguration server2tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+//                                                                    service2Params,
+//                                                                    "connector2");
+//      connectors.put(server2tc.getName(), server2tc);
+//
+//      TransportConfiguration server3tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+//                                                                    service3Params,
+//                                                                    "connector3");
+//      connectors.put(server3tc.getName(), server3tc);
+//
+//      TransportConfiguration server4tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+//                                                                    service4Params,
+//                                                                    "connector4");
+//      connectors.put(server4tc.getName(), server4tc);
+//
+//      service0.getServer().getConfiguration().setConnectorConfigurations(connectors);
+//
+//      List<Pair<String, String>> connectorNames1 = new ArrayList<Pair<String, String>>();
+//      connectorNames1.add(new Pair<String, String>(server1tc.getName(), null));
+//      
+//      List<Pair<String, String>> connectorNames2 = new ArrayList<Pair<String, String>>();
+//      connectorNames2.add(new Pair<String, String>(server2tc.getName(), null));
+//      
+//      List<Pair<String, String>> connectorNames3 = new ArrayList<Pair<String, String>>();
+//      connectorNames3.add(new Pair<String, String>(server3tc.getName(), null));
+//      
+//      List<Pair<String, String>> connectorNames4 = new ArrayList<Pair<String, String>>();
+//      connectorNames4.add(new Pair<String, String>(server4tc.getName(), null));
+//      
+//      final SimpleString testAddress = new SimpleString("testaddress");
+//
+//      MessageFlowConfiguration ofconfig1 = new MessageFlowConfiguration("flow1",
+//                                                                        testAddress.toString(),
+//                                                                        "beatle='john'",
+//                                                                        false,
+//                                                                        1,
+//                                                                        -1,
+//                                                                        null,
+//                                                                        DEFAULT_RETRY_INTERVAL,
+//                                                                        DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+//                                                                        DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+//                                                                        DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
+//                                                                        DEFAULT_USE_DUPLICATE_DETECTION,
+//                                                                        connectorNames1);
+//      MessageFlowConfiguration ofconfig2 = new MessageFlowConfiguration("flow2",
+//                                                                        testAddress.toString(),
+//                                                                        "beatle='paul'",
+//                                                                        false,
+//                                                                        1,
+//                                                                        -1,
+//                                                                        null,
+//                                                                        DEFAULT_RETRY_INTERVAL,
+//                                                                        DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+//                                                                        DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+//                                                                        DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
+//                                                                        DEFAULT_USE_DUPLICATE_DETECTION,
+//                                                                        connectorNames2);
+//      MessageFlowConfiguration ofconfig3 = new MessageFlowConfiguration("flow3",
+//                                                                        testAddress.toString(),
+//                                                                        "beatle='george'",
+//                                                                        false,
+//                                                                        1,
+//                                                                        -1,
+//                                                                        null,
+//                                                                        DEFAULT_RETRY_INTERVAL,
+//                                                                        DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+//                                                                        DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+//                                                                        DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
+//                                                                        DEFAULT_USE_DUPLICATE_DETECTION,
+//                                                                        connectorNames3);
+//      MessageFlowConfiguration ofconfig4 = new MessageFlowConfiguration("flow4",
+//                                                                        testAddress.toString(),
+//                                                                        "beatle='ringo'",
+//                                                                        false,
+//                                                                        1,
+//                                                                        -1,
+//                                                                        null,
+//                                                                        DEFAULT_RETRY_INTERVAL,
+//                                                                        DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+//                                                                        DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+//                                                                        DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
+//                                                                        DEFAULT_USE_DUPLICATE_DETECTION,
+//                                                                        connectorNames4);
+//
+//      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+//      ofconfigs.add(ofconfig1);
+//      ofconfigs.add(ofconfig2);
+//      ofconfigs.add(ofconfig3);
+//      ofconfigs.add(ofconfig4);
+//      service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+//
+//      service0.start();
+//
+//      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+//                                                                    service0Params);
+//
+//      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+//      ClientSession session0 = csf0.createSession(false, true, true);
+//
+//      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+//      ClientSession session1 = csf1.createSession(false, true, true);
+//
+//      ClientSessionFactory csf2 = new ClientSessionFactoryImpl(server2tc);
+//      ClientSession session2 = csf2.createSession(false, true, true);
+//
+//      ClientSessionFactory csf3 = new ClientSessionFactoryImpl(server3tc);
+//      ClientSession session3 = csf3.createSession(false, true, true);
+//
+//      ClientSessionFactory csf4 = new ClientSessionFactoryImpl(server4tc);
+//      ClientSession session4 = csf4.createSession(false, true, true);
+//
+//      session0.createQueue(testAddress, testAddress, null, false, false);
+//      session1.createQueue(testAddress, testAddress, null, false, false);
+//      session2.createQueue(testAddress, testAddress, null, false, false);
+//      session3.createQueue(testAddress, testAddress, null, false, false);
+//      session4.createQueue(testAddress, testAddress, null, false, false);
+//
+//      ClientProducer prod0 = session0.createProducer(testAddress);
+//
+//      ClientConsumer cons1 = session1.createConsumer(testAddress);
+//      ClientConsumer cons2 = session2.createConsumer(testAddress);
+//      ClientConsumer cons3 = session3.createConsumer(testAddress);
+//      ClientConsumer cons4 = session4.createConsumer(testAddress);
+//
+//      session1.start();
+//      session2.start();
+//      session3.start();
+//      session4.start();
+//
+//      SimpleString propKey = new SimpleString("beatle");
+//
+//      ClientMessage messageJohn = session0.createClientMessage(false);
+//      messageJohn.putStringProperty(propKey, new SimpleString("john"));
+//      messageJohn.getBody().flip();
+//
+//      ClientMessage messagePaul = session0.createClientMessage(false);
+//      messagePaul.putStringProperty(propKey, new SimpleString("paul"));
+//      messagePaul.getBody().flip();
+//
+//      ClientMessage messageGeorge = session0.createClientMessage(false);
+//      messageGeorge.putStringProperty(propKey, new SimpleString("george"));
+//      messageGeorge.getBody().flip();
+//
+//      ClientMessage messageRingo = session0.createClientMessage(false);
+//      messageRingo.putStringProperty(propKey, new SimpleString("ringo"));
+//      messageRingo.getBody().flip();
+//
+//      ClientMessage messageOsama = session0.createClientMessage(false);
+//      messageOsama.putStringProperty(propKey, new SimpleString("osama"));
+//      messageOsama.getBody().flip();
+//
+//      prod0.send(messageJohn);
+//      prod0.send(messagePaul);
+//      prod0.send(messageGeorge);
+//      prod0.send(messageRingo);
+//      prod0.send(messageOsama);
+//
+//      ClientMessage r1 = cons1.receive(1000);
+//      assertNotNull(r1);
+//      assertEquals(new SimpleString("john"), r1.getProperty(propKey));
+//      r1 = cons1.receiveImmediate();
+//      assertNull(r1);
+//
+//      ClientMessage r2 = cons2.receive(1000);
+//      assertNotNull(r2);
+//      assertEquals(new SimpleString("paul"), r2.getProperty(propKey));
+//      r2 = cons2.receiveImmediate();
+//      assertNull(r2);
+//
+//      ClientMessage r3 = cons3.receive(1000);
+//      assertNotNull(r3);
+//      assertEquals(new SimpleString("george"), r3.getProperty(propKey));
+//      r3 = cons3.receiveImmediate();
+//      assertNull(r3);
+//
+//      ClientMessage r4 = cons4.receive(1000);
+//      assertNotNull(r4);
+//      assertEquals(new SimpleString("ringo"), r4.getProperty(propKey));
+//      r4 = cons4.receiveImmediate();
+//      assertNull(r4);
+//
+//      session0.close();
+//      session1.close();
+//      session2.close();
+//      session3.close();
+//      session4.close();
+//
+//      service0.stop();
+//      service1.stop();
+//      service2.stop();
+//      service3.stop();
+//      service4.stop();
+//
+//      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+//      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+//      assertEquals(0, service2.getServer().getRemotingService().getConnections().size());
+//      assertEquals(0, service3.getServer().getRemotingService().getConnections().size());
+//      assertEquals(0, service4.getServer().getRemotingService().getConnections().size());
+//   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.clearData();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
+
+

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTestBase.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTestBase.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -45,6 +45,22 @@
  */
 public abstract class MessageFlowTestBase extends TestCase
 {
+   
+   protected MessagingService createMessagingServiceNIO(final int id, final Map<String, Object> params)
+   {
+      Configuration serviceConf = new ConfigurationImpl();
+      serviceConf.setClustered(true);
+      serviceConf.setSecurityEnabled(false); 
+      serviceConf.setJournalMinFiles(2);
+      serviceConf.setJournalFileSize(100 * 1024);
+      params.put(TransportConstants.SERVER_ID_PROP_NAME, id);
+      serviceConf.getAcceptorConfigurations()
+                  .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+                                                  params));
+      MessagingService service = MessagingServiceImpl.newMessagingService(serviceConf);
+      return service;
+   }
+   
    protected MessagingService createMessagingService(final int id, final Map<String, Object> params)
    {
       Configuration serviceConf = new ConfigurationImpl();

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -74,7 +74,7 @@
 
       managerImpl.start();
 
-      TestSupportPageStore store = (TestSupportPageStore)managerImpl.createPageStore(new SimpleString("simple-test"));
+      TestSupportPageStore store = (TestSupportPageStore)managerImpl.createPageStore(new SimpleString("simple-test"), true);
 
       ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(10));
 
@@ -123,7 +123,7 @@
                                                             false);
       managerImpl.start();
 
-      managerImpl.createPageStore(new SimpleString("simple-test"));
+      managerImpl.createPageStore(new SimpleString("simple-test"), true);
 
       ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(100));
 

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -67,7 +67,7 @@
                                                   factory,
                                                   destinationTestName,
                                                   new QueueSettings(),
-                                                  executor);
+                                                  executor, true);
 
       storeImpl.start();
 
@@ -90,7 +90,7 @@
                                                   factory,
                                                   destinationTestName,
                                                   new QueueSettings(),
-                                                  executor);
+                                                  executor, true);
 
       storeImpl.start();
 
@@ -123,7 +123,7 @@
                                       factory,
                                       destinationTestName,
                                       new QueueSettings(),
-                                      executor);
+                                      executor, true);
 
       storeImpl.start();
 
@@ -141,7 +141,7 @@
                                                   factory,
                                                   destinationTestName,
                                                   new QueueSettings(),
-                                                  executor);
+                                                  executor, true);
 
       storeImpl.start();
 
@@ -202,7 +202,7 @@
                                                            factory,
                                                            destinationTestName,
                                                            new QueueSettings(),
-                                                           executor);
+                                                           executor, true);
 
       storeImpl.start();
 

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -113,7 +113,7 @@
                                                                  factory,
                                                                  new SimpleString("test"),
                                                                  settings,
-                                                                 executor);
+                                                                 executor, true);
 
       storeImpl.start();
 
@@ -268,7 +268,7 @@
                                                             factory,
                                                             new SimpleString("test"),
                                                             settings,
-                                                            executor);
+                                                            executor, true);
       storeImpl2.start();
 
       int numberOfPages = storeImpl2.getNumberOfPages();

Modified: trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	2009-01-05 13:08:07 UTC (rev 5578)
@@ -135,7 +135,12 @@
    {
       return createService(realFiles, configuration, new HashMap<String, QueueSettings>());
    }
-
+   
+   protected MessagingService createClusteredServiceWithParams(final boolean realFiles, final Map<String, Object> params)
+   {
+      return createService(realFiles, createClusteredDefaultConfig(params, INVM_ACCEPTOR_FACTORY), new HashMap<String, QueueSettings>());
+   }
+   
    protected Configuration createDefaultConfig()
    {
       return createDefaultConfig(false);
@@ -145,16 +150,24 @@
    {
       if (netty)
       {
-         return createDefaultConfig(INVM_ACCEPTOR_FACTORY, NETTY_ACCEPTOR_FACTORY);
+         return createDefaultConfig(new HashMap<String, Object>(), INVM_ACCEPTOR_FACTORY, NETTY_ACCEPTOR_FACTORY);
       }
       else
       {
-         return createDefaultConfig(INVM_ACCEPTOR_FACTORY);
-      }
+         return createDefaultConfig(new HashMap<String, Object>(), INVM_ACCEPTOR_FACTORY);
+      }      
+   }
+   
+   protected Configuration createClusteredDefaultConfig(final Map<String, Object> params, final String... acceptors)
+   {
+      Configuration config = createDefaultConfig(params, acceptors);
       
+      config.setClustered(true);
+      
+      return config;
    }
 
-   protected Configuration createDefaultConfig(final String... acceptors)
+   protected Configuration createDefaultConfig(final Map<String, Object> params, final String... acceptors)
    {
       Configuration configuration = new ConfigurationImpl();
       configuration.setSecurityEnabled(false);
@@ -169,7 +182,7 @@
 
       for (String acceptor : acceptors)
       {
-         TransportConfiguration transportConfig = new TransportConfiguration(acceptor);
+         TransportConfiguration transportConfig = new TransportConfiguration(acceptor, params);
          configuration.getAcceptorConfigurations().add(transportConfig);
       }
 




More information about the jboss-cvs-commits mailing list