[jboss-cvs] JBoss Messaging SVN: r5469 - in trunk: src/main/org/jboss/messaging/core/config and 10 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Dec 7 09:27:40 EST 2008


Author: timfox
Date: 2008-12-07 09:27:39 -0500 (Sun, 07 Dec 2008)
New Revision: 5469

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowReconnectTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java
   trunk/src/main/org/jboss/messaging/core/config/cluster/MessageFlowConfiguration.java
   trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
   trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
   trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlow.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/BasicMessageFlowTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/DiscoveryFlowTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchSizeTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchTimeTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTransformerTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWildcardTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWithFilterTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/StaticFlowTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailureListenerOnFailoverTest.java
Log:
More reconnect stuff


Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -590,8 +590,6 @@
                connection.destroy();
             }
 
-            log.info("done is " + done);
-
             return done;
          }
          else
@@ -682,7 +680,7 @@
    }
 
    private RemotingConnection getConnectionWithRetry(final List<ClientSessionInternal> sessions, final int retries)
-   {
+   {      
       long interval = retryInterval;
 
       int count = 0;
@@ -696,7 +694,9 @@
             // Failed to get backup connection
 
             if (retries != 0)
-            {
+            {              
+               count++;
+               
                if (retries != -1 && count == retries)
                {
                   log.warn("Retried " + retries + " times to reconnect. Now giving up.");
@@ -704,8 +704,6 @@
                   return null;
                }
 
-               count++;
-
                log.warn("Now waiting " + interval + " ms before attempting reconnection.");
 
                try

Modified: trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java	2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -74,8 +74,6 @@
       return params;
    }
    
-   private int hash = -1;
-   
    public int hashCode()
    {
       return factoryClassName.hashCode();

Modified: trunk/src/main/org/jboss/messaging/core/config/cluster/MessageFlowConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/cluster/MessageFlowConfiguration.java	2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/config/cluster/MessageFlowConfiguration.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -58,6 +58,14 @@
 
    private final String transformerClassName;
 
+   private final long retryInterval;
+
+   private final double retryIntervalMultiplier;
+
+   private final int maxRetriesBeforeFailover;
+
+   private final int maxRetriesAfterFailover;
+
    public MessageFlowConfiguration(final String name,
                                    final String address,
                                    final String filterString,
@@ -65,6 +73,10 @@
                                    final int maxBatchSize,
                                    final long maxBatchTime,
                                    final String transformerClassName,
+                                   final long retryInterval,
+                                   final double retryIntervalMultiplier,
+                                   final int maxRetriesBeforeFailover,
+                                   final int maxRetriesAfterFailover,
                                    final List<Pair<String, String>> staticConnectorNamePairs)
    {
       this.name = name;
@@ -74,6 +86,10 @@
       this.maxBatchSize = maxBatchSize;
       this.maxBatchTime = maxBatchTime;
       this.transformerClassName = transformerClassName;
+      this.retryInterval = retryInterval;
+      this.retryIntervalMultiplier = retryIntervalMultiplier;
+      this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
+      this.maxRetriesAfterFailover = maxRetriesAfterFailover;
       this.staticConnectorNamePairs = staticConnectorNamePairs;
       this.discoveryGroupName = null;
    }
@@ -85,6 +101,10 @@
                                    final int maxBatchSize,
                                    final long maxBatchTime,
                                    final String transformerClassName,
+                                   final long retryInterval,
+                                   final double retryIntervalMultiplier,
+                                   final int maxRetriesBeforeFailover,
+                                   final int maxRetriesAfterFailover,
                                    final String discoveryGroupName)
    {
       this.name = name;
@@ -94,6 +114,10 @@
       this.maxBatchSize = maxBatchSize;
       this.maxBatchTime = maxBatchTime;
       this.transformerClassName = transformerClassName;
+      this.retryInterval = retryInterval;
+      this.retryIntervalMultiplier = retryIntervalMultiplier;
+      this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
+      this.maxRetriesAfterFailover = maxRetriesAfterFailover;
       this.staticConnectorNamePairs = null;
       this.discoveryGroupName = discoveryGroupName;
    }
@@ -142,4 +166,29 @@
    {
       return this.discoveryGroupName;
    }
+
+   public List<Pair<String, String>> getStaticConnectorNamePairs()
+   {
+      return staticConnectorNamePairs;
+   }
+
+   public long getRetryInterval()
+   {
+      return retryInterval;
+   }
+
+   public double getRetryIntervalMultiplier()
+   {
+      return retryIntervalMultiplier;
+   }
+
+   public int getMaxRetriesBeforeFailover()
+   {
+      return maxRetriesBeforeFailover;
+   }
+
+   public int getMaxRetriesAfterFailover()
+   {
+      return maxRetriesAfterFailover;
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -22,6 +22,11 @@
 
 package org.jboss.messaging.core.config.impl;
 
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
 import java.io.InputStreamReader;
 import java.io.Reader;
 import java.net.URL;
@@ -30,6 +35,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
 import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
@@ -514,6 +520,14 @@
       String discoveryGroupName = null;
 
       String transformerClassName = null;
+      
+      long retryInterval = DEFAULT_RETRY_INTERVAL;
+      
+      double retryIntervalMultiplier = DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+      
+      int maxRetriesBeforeFailover = DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+      
+      int maxRetriesAfterFailover = DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
 
       NodeList children = bgNode.getChildNodes();
 
@@ -549,6 +563,22 @@
          {
             transformerClassName = child.getTextContent().trim();
          }
+         else if (child.getNodeName().equals("retry-interval"))
+         {
+            retryInterval = XMLUtil.parseLong(child);
+         }
+         else if (child.getNodeName().equals("retry-interval-multiplier"))
+         {
+            retryIntervalMultiplier = XMLUtil.parseDouble(child);
+         }
+         else if (child.getNodeName().equals("max-retries-before-failover"))
+         {
+            maxRetriesBeforeFailover = XMLUtil.parseInt(child);
+         }
+         else if (child.getNodeName().equals("max-retries-after-failover"))
+         {
+            maxRetriesAfterFailover = XMLUtil.parseInt(child);
+         }
          else if (child.getNodeName().equals("connector"))
          {
             String connectorName = child.getAttributes().getNamedItem("connector-name").getNodeValue();
@@ -577,6 +607,10 @@
                                                maxBatchSize,
                                                maxBatchTime,
                                                transformerClassName,
+                                               retryInterval,
+                                               retryIntervalMultiplier,
+                                               maxRetriesBeforeFailover,
+                                               maxRetriesAfterFailover,
                                                staticConnectorNames);
       }
       else
@@ -588,6 +622,10 @@
                                                maxBatchSize,
                                                maxBatchTime,
                                                transformerClassName,
+                                               retryInterval,
+                                               retryIntervalMultiplier,
+                                               maxRetriesBeforeFailover,
+                                               maxRetriesAfterFailover,                                               
                                                discoveryGroupName);
       }
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Channel.java	2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Channel.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -36,8 +36,6 @@
 
    void close();
 
-   void fail();
-
    Channel getReplicatingChannel();
 
    void transferConnection(RemotingConnection newConnection);

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -444,12 +444,7 @@
       // Then call the listeners
       callListeners(me);
 
-      internalClose();
-
-      for (Channel channel : channels.values())
-      {
-         channel.fail();
-      }      
+      internalClose();   
    }
 
    public void destroy()
@@ -1045,8 +1040,6 @@
 
       public Packet sendBlocking(final Packet packet) throws MessagingException
       {
-         // System.identityHashCode(this.connection) + " " + packet.getType());
-
          if (closed)
          {
             throw new MessagingException(MessagingException.NOT_CONNECTED, "Connection is destroyed");
@@ -1203,7 +1196,7 @@
       }
 
       public void replicateComplete()
-      {
+      {      
          if (!connection.active)
          {
             // We're on backup so send back a replication response
@@ -1317,10 +1310,6 @@
          closed = true;
       }
 
-      public void fail()
-      {
-      }
-
       public Channel getReplicatingChannel()
       {
          return replicatingChannel;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java	2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -103,7 +103,7 @@
       
       connectionTTL = config.getConnectionTTLOverride();
 
-      backup = config.isBackup();
+      backup = config.isBackup();            
    }
 
    // RemotingService implementation -------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -26,6 +26,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
 import org.jboss.messaging.core.security.JBMSecurityManager;
 import org.jboss.messaging.core.security.Role;
+import org.jboss.messaging.core.server.cluster.ClusterManager;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.ResourceManager;
@@ -110,4 +111,6 @@
    ResourceManager getResourceManager();
 
    List<ServerSession> getSessions(String connectionID);
+   
+   ClusterManager getClusterManager();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java	2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -20,12 +20,10 @@
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  */
 
-
 package org.jboss.messaging.core.server.cluster;
 
-import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
-import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
-import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
+import java.util.Map;
+
 import org.jboss.messaging.core.server.MessagingComponent;
 
 /**
@@ -35,8 +33,8 @@
  * 
  * Created 18 Nov 2008 09:23:26
  *
- *
  */
 public interface ClusterManager extends MessagingComponent
 {
+   Map<String, MessageFlow> getMessageFlows();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlow.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlow.java	2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/MessageFlow.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -23,6 +23,8 @@
 
 package org.jboss.messaging.core.server.cluster;
 
+import java.util.Set;
+
 import org.jboss.messaging.core.server.MessagingComponent;
 
 /**
@@ -35,7 +37,6 @@
  *
  */
 public interface MessageFlow extends MessagingComponent
-{
-   
-
+{   
+   Set<Forwarder> getForwarders();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -84,7 +84,7 @@
    private final Configuration configuration;
 
    private volatile boolean started;
-
+      
    public ClusterManagerImpl(final ExecutorFactory executorFactory,
                              final StorageManager storageManager,
                              final PostOffice postOffice,
@@ -165,6 +165,11 @@
    {
       return started;
    }
+   
+   public Map<String, MessageFlow> getMessageFlows()
+   {
+      return new HashMap<String, MessageFlow>(messageFlows);
+   }
 
    private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration config) throws Exception
    {
@@ -359,6 +364,10 @@
                                     queueSettingsRepository,
                                     scheduledExecutor,
                                     transformer,
+                                    config.getRetryInterval(),
+                                    config.getRetryIntervalMultiplier(),
+                                    config.getMaxRetriesBeforeFailover(),
+                                    config.getMaxRetriesAfterFailover(),
                                     conns);
       }
       else
@@ -388,6 +397,10 @@
                                     queueSettingsRepository,
                                     scheduledExecutor,
                                     transformer,
+                                    config.getRetryInterval(),
+                                    config.getRetryIntervalMultiplier(),
+                                    config.getMaxRetriesBeforeFailover(),
+                                    config.getMaxRetriesAfterFailover(),
                                     group);
       }
 

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java	2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -32,10 +32,14 @@
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.ClientSessionFactory;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
 import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.server.HandleStatus;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Queue;
@@ -58,7 +62,7 @@
  *
  *
  */
-public class ForwarderImpl implements Forwarder
+public class ForwarderImpl implements Forwarder, FailureListener
 {
    // Constants -----------------------------------------------------
 
@@ -81,25 +85,25 @@
    private java.util.Queue<MessageReference> refs = new LinkedList<MessageReference>();
 
    private Transaction tx;
-   
+
    private long lastReceivedTime = -1;
-   
+
    private final StorageManager storageManager;
 
    private final PostOffice postOffice;
 
    private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
-     
+
    private final Transformer transformer;
 
    private final ClientSessionFactory csf;
-   
+
    private ClientSession session;
 
    private ClientProducer producer;
-      
+
    private volatile boolean started;
-   
+
    private final ScheduledFuture<?> future;
 
    // Static --------------------------------------------------------
@@ -109,15 +113,19 @@
    // Public --------------------------------------------------------
 
    public ForwarderImpl(final Queue queue,
-                        final Pair<TransportConfiguration,TransportConfiguration> connectorPair,
+                        final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                         final Executor executor,
                         final int maxBatchSize,
                         final long maxBatchTime,
                         final StorageManager storageManager,
                         final PostOffice postOffice,
-                        final HierarchicalRepository<QueueSettings> queueSettingsRepository,  
+                        final HierarchicalRepository<QueueSettings> queueSettingsRepository,
                         final ScheduledExecutorService scheduledExecutor,
-                        final Transformer transformer) throws Exception
+                        final Transformer transformer,
+                        final long retryInterval,
+                        final double retryIntervalMultiplier,
+                        final int maxRetriesBeforeFailover,
+                        final int maxRetriesAfterFailover)
    {
       this.queue = queue;
 
@@ -132,36 +140,42 @@
       this.postOffice = postOffice;
 
       this.queueSettingsRepository = queueSettingsRepository;
-      
+
       this.transformer = transformer;
-      
-      this.csf = new ClientSessionFactoryImpl(connectorPair.a, connectorPair.b);  
-      
+
+      this.csf = new ClientSessionFactoryImpl(connectorPair.a,
+                                              connectorPair.b,
+                                              retryInterval,
+                                              retryIntervalMultiplier,
+                                              maxRetriesBeforeFailover,
+                                              maxRetriesAfterFailover);
+
       if (maxBatchTime != -1)
       {
-         future = scheduledExecutor.scheduleAtFixedRate(new BatchTimeout(), maxBatchTime, maxBatchTime, TimeUnit.MILLISECONDS);
+         future = scheduledExecutor.scheduleAtFixedRate(new BatchTimeout(),
+                                                        maxBatchTime,
+                                                        maxBatchTime,
+                                                        TimeUnit.MILLISECONDS);
       }
       else
       {
          future = null;
       }
    }
-   
+
    public synchronized void start() throws Exception
    {
       if (started)
       {
          return;
       }
-      
+
       createTx();
-      
-      session = csf.createSession(false, false, false);
 
-      producer = session.createProducer(null);
+      createObjects();
 
-      queue.addConsumer(this);       
-      
+      queue.addConsumer(this);
+
       started = true;
    }
 
@@ -170,7 +184,7 @@
       started = false;
 
       queue.removeConsumer(this);
-      
+
       if (future != null)
       {
          future.cancel(false);
@@ -188,19 +202,25 @@
       {
          log.warn("Timed out waiting for batch to be sent");
       }
-      
+
       session.close();
-      
+
       started = false;
    }
-   
+
    public boolean isStarted()
    {
       return started;
    }
+   
+   //For testing only
+   public RemotingConnection getForwardingConnection()
+   {
+      return ((ClientSessionImpl)session).getConnection();
+   }
 
    // Consumer implementation ---------------------------------------
-   
+
    public HandleStatus handle(final MessageReference reference) throws Exception
    {
       if (busy)
@@ -216,7 +236,7 @@
          }
 
          refs.add(reference);
-         
+
          if (maxBatchTime != -1)
          {
             lastReceivedTime = System.currentTimeMillis();
@@ -235,75 +255,114 @@
       }
    }
 
+   // FailureListener implementation --------------------------------
+
+   public synchronized boolean connectionFailed(final MessagingException me)
+   {
+      //By the time this is called
+      synchronized (this)
+      {      
+         try
+         {
+            session.close();
+   
+            createObjects();
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to reconnect", e);
+         }
+   
+         return true;
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------
 
+   private void createObjects() throws Exception
+   {
+      try
+      {
+         session = csf.createSession(false, false, false);
+      }
+      catch (MessagingException me)
+      {
+         log.warn("Unable to connect. Message flow is now disabled.");
+         
+         stop();
+         
+         return;
+      }
+
+      session.addFailureListener(this);
+
+      producer = session.createProducer(null);      
+   }
+
    private synchronized void timeoutBatch()
    {
       if (!started)
       {
          return;
       }
-      
+
       if (lastReceivedTime != -1 && count > 0)
       {
          long now = System.currentTimeMillis();
-         
+
          if (now - lastReceivedTime >= maxBatchTime)
          {
             sendBatch();
          }
-      }      
+      }
    }
-   
-   private void sendBatch()
+
+   private synchronized void sendBatch()
    {
       try
       {
-         synchronized (this)
+         if (count == 0)
          {
-            if (count == 0)
+            return;
+         }
+         
+         // TODO - duplicate detection on sendee and if batch size = 1 then don't need tx
+
+         while (true)
+         {
+            MessageReference ref = refs.poll();
+
+            if (ref == null)
             {
-               return;
+               break;
             }
-            
-            // TODO - duplicate detection on sendee and if batch size = 1 then don't need tx
 
-            while (true)
-            {
-               MessageReference ref = refs.poll();
+            tx.addAcknowledgement(ref);
 
-               if (ref == null)
-               {
-                  break;
-               }
+            ServerMessage message = ref.getMessage();
 
-               tx.addAcknowledgement(ref);
-
-               ServerMessage message = ref.getMessage();
-               
-               if (transformer != null)
-               {
-                  message = transformer.transform(message);
-               }
-
-               producer.send(message.getDestination(), message);
+            if (transformer != null)
+            {
+               message = transformer.transform(message);
             }
 
-            session.commit();
+            producer.send(message.getDestination(), message);
+         }
 
-            tx.commit();
+         session.commit();
 
-            createTx();
+         tx.commit();
 
-            busy = false;
+         createTx();
 
-            count = 0;
-         }
+         busy = false;
 
+         count = 0;
+         
          queue.deliverAsync(executor);
       }
       catch (Exception e)
@@ -335,7 +394,7 @@
          sendBatch();
       }
    }
-   
+
    private class BatchTimeout implements Runnable
    {
       public void run()

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java	2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -92,6 +92,14 @@
 
    private volatile boolean started;
 
+   private final long retryInterval;
+
+   private final double retryIntervalMultiplier;
+
+   private final int maxRetriesBeforeFailover;
+
+   private final int maxRetriesAfterFailover;
+
    /*
     * Constructor using static list of connectors
     */
@@ -107,7 +115,11 @@
                           final HierarchicalRepository<QueueSettings> queueSettingsRepository,
                           final ScheduledExecutorService scheduledExecutor,
                           final Transformer transformer,
-                          final List<Pair<TransportConfiguration,TransportConfiguration>> connectors) throws Exception
+                          final long retryInterval,
+                          final double retryIntervalMultiplier,
+                          final int maxRetriesBeforeFailover,
+                          final int maxRetriesAfterFailover,
+                          final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
    {
       this.name = name;
 
@@ -135,6 +147,14 @@
 
       this.scheduledExecutor = scheduledExecutor;
 
+      this.retryInterval = retryInterval;
+
+      this.retryIntervalMultiplier = retryIntervalMultiplier;
+
+      this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
+
+      this.maxRetriesAfterFailover = maxRetriesAfterFailover;
+
       this.updateConnectors(connectors);
    }
 
@@ -153,6 +173,10 @@
                           final HierarchicalRepository<QueueSettings> queueSettingsRepository,
                           final ScheduledExecutorService scheduledExecutor,
                           final Transformer transformer,
+                          final long retryInterval,
+                          final double retryIntervalMultiplier,
+                          final int maxRetriesBeforeFailover,
+                          final int maxRetriesAfterFailover,
                           final DiscoveryGroup discoveryGroup) throws Exception
    {
       this.name = name;
@@ -180,6 +204,14 @@
       this.transformer = transformer;
 
       this.discoveryGroup = discoveryGroup;
+
+      this.retryInterval = retryInterval;
+
+      this.retryIntervalMultiplier = retryIntervalMultiplier;
+
+      this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
+
+      this.maxRetriesAfterFailover = maxRetriesAfterFailover;
    }
 
    public synchronized void start() throws Exception
@@ -223,6 +255,12 @@
    {
       return started;
    }
+   
+   //For testing only
+   public Set<Forwarder> getForwarders()
+   {
+      return new HashSet<Forwarder>(forwarders.values());
+   }
 
    // DiscoveryListener implementation ------------------------------------------------------------------
 
@@ -246,11 +284,12 @@
 
       connectorSet.addAll(connectors);
 
-      Iterator<Map.Entry<Pair<TransportConfiguration,TransportConfiguration>, Forwarder>> iter = forwarders.entrySet().iterator();
+      Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Forwarder>> iter = forwarders.entrySet()
+                                                                                                            .iterator();
 
       while (iter.hasNext())
       {
-         Map.Entry<Pair<TransportConfiguration,TransportConfiguration>, Forwarder> entry = iter.next();
+         Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Forwarder> entry = iter.next();
 
          if (!connectorSet.contains(entry.getKey()))
          {
@@ -262,7 +301,7 @@
          }
       }
 
-      for (Pair<TransportConfiguration,TransportConfiguration> connectorPair : connectors)
+      for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectors)
       {
          if (!forwarders.containsKey(connectorPair))
          {
@@ -290,7 +329,11 @@
                                                     postOffice,
                                                     queueSettingsRepository,
                                                     scheduledExecutor,
-                                                    transformer);
+                                                    transformer,
+                                                    retryInterval,
+                                                    retryIntervalMultiplier,
+                                                    maxRetriesBeforeFailover,
+                                                    maxRetriesAfterFailover);
 
             forwarders.put(connectorPair, forwarder);
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -452,6 +452,11 @@
    {
       return started;
    }
+   
+   public ClusterManager getClusterManager()
+   {
+      return clusterManager;
+   }
 
    private synchronized void checkActivate(final RemotingConnection connection)
    {

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -65,7 +65,7 @@
       if (packet.getType() == PacketImpl.CREATESESSION && channel1.getReplicatingChannel() != null)
       {
          CreateSessionMessage msg = (CreateSessionMessage)packet;
-
+         
          Packet replPacket = new ReplicateCreateSessionMessage(msg.getName(), msg.getSessionChannelID(),
                                                                msg.getVersion(), msg.getUsername(),
                                                                msg.getPassword(), msg.getMinLargeMessageSize(), 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/BasicMessageFlowTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/BasicMessageFlowTest.java	2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/BasicMessageFlowTest.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -22,6 +22,11 @@
 
 package org.jboss.messaging.tests.integration.cluster.distribution;
 
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -98,6 +103,10 @@
                                                                         1,
                                                                         -1,
                                                                         null,
+                                                                        DEFAULT_RETRY_INTERVAL,
+                                                                        DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+                                                                        DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+                                                                        DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
                                                                         connectorNames);
       MessageFlowConfiguration ofconfig2 = new MessageFlowConfiguration("flow1",
                                                                         address1.toString(),
@@ -106,6 +115,10 @@
                                                                         1,
                                                                         -1,
                                                                         null,
+                                                                        DEFAULT_RETRY_INTERVAL,
+                                                                        DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+                                                                        DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+                                                                        DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
                                                                         connectorNames);
 
       Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
@@ -193,6 +206,10 @@
                                                                         1,
                                                                         -1,
                                                                         null,
+                                                                        DEFAULT_RETRY_INTERVAL,
+                                                                        DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+                                                                        DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+                                                                        DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
                                                                         connectorNames);
 
       Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
@@ -267,6 +284,10 @@
                                                                         1,
                                                                         -1,
                                                                         null,
+                                                                        DEFAULT_RETRY_INTERVAL,
+                                                                        DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+                                                                        DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+                                                                        DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
                                                                         connectorNames);
 
       Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/DiscoveryFlowTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/DiscoveryFlowTest.java	2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/DiscoveryFlowTest.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -22,6 +22,11 @@
 
 package org.jboss.messaging.tests.integration.cluster.distribution;
 
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -192,6 +197,10 @@
                                                                        1,
                                                                        -1,
                                                                        null,
+                                                                       DEFAULT_RETRY_INTERVAL,
+                                                                       DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+                                                                       DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+                                                                       DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
                                                                        discoveryGroupName);
       Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
       ofconfigs.add(ofconfig);

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchSizeTest.java	2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchSizeTest.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -22,6 +22,11 @@
 
 package org.jboss.messaging.tests.integration.cluster.distribution;
 
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -97,6 +102,10 @@
                                                                        batchSize,
                                                                        -1,
                                                                        null,
+                                                                       DEFAULT_RETRY_INTERVAL,
+                                                                       DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+                                                                       DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+                                                                       DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
                                                                        connectorNames);
       Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
       ofconfigs.add(ofconfig);

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchTimeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchTimeTest.java	2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowBatchTimeTest.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -22,6 +22,11 @@
 
 package org.jboss.messaging.tests.integration.cluster.distribution;
 
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -98,6 +103,10 @@
                                                                        batchSize,
                                                                        batchTime,
                                                                        null,
+                                                                       DEFAULT_RETRY_INTERVAL,
+                                                                       DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+                                                                       DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+                                                                       DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
                                                                        connectorNames);
       Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
       ofconfigs.add(ofconfig);

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowReconnectTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowReconnectTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowReconnectTest.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -0,0 +1,332 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.cluster.Forwarder;
+import org.jboss.messaging.core.server.cluster.MessageFlow;
+import org.jboss.messaging.core.server.cluster.impl.ForwarderImpl;
+import org.jboss.messaging.util.Pair;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * 
+ * A MessageFlowReconnectTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 7 Dec 2008 11:48:30
+ *
+ *
+ */
+public class MessageFlowReconnectTest extends MessageFlowTestBase
+{
+   private static final Logger log = Logger.getLogger(MessageFlowReconnectTest.class);
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testAutomaticReconnectBeforeFailover() throws Exception
+   {
+      Map<String, Object> service0Params = new HashMap<String, Object>();
+      MessagingService service0 = createMessagingService(0, service0Params);
+
+      Map<String, Object> service1Params = new HashMap<String, Object>();
+      MessagingService service1 = createMessagingService(1, service1Params);
+      service1.start();
+
+      Map<String, Object> service2Params = new HashMap<String, Object>();
+      MessagingService service2 = createMessagingService(2, service2Params);
+      service2.start();
+
+      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service0Params,
+                                                                    "server0tc");
+
+      Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+
+      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service1Params,
+                                                                    "server1tc");
+
+      TransportConfiguration server2tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service2Params,
+                                                                    "server2tc");
+
+      connectors.put(server1tc.getName(), server1tc);
+      
+      connectors.put(server2tc.getName(), server2tc);
+      
+      service0.getServer().getConfiguration().setConnectorConfigurations(connectors);
+
+      List<Pair<String, String>> connectorNames = new ArrayList<Pair<String, String>>();
+      connectorNames.add(new Pair<String, String>(server1tc.getName(), server2tc.getName()));
+
+      final SimpleString address1 = new SimpleString("testaddress");
+
+      final long retryInterval = 50;
+      final double retryIntervalMultiplier = 1d;
+      final int retriesBeforeFailover = 3;
+      final int maxRetriesAfterFailover = -1;
+      
+      final String flowName = "flow1";
+      
+      MessageFlowConfiguration ofconfig1 = new MessageFlowConfiguration(flowName,
+                                                                        address1.toString(),
+                                                                        null,
+                                                                        true,
+                                                                        1,
+                                                                        -1,
+                                                                        null,
+                                                                        retryInterval,
+                                                                        retryIntervalMultiplier,
+                                                                        retriesBeforeFailover,
+                                                                        maxRetriesAfterFailover,
+                                                                        connectorNames);
+
+      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+      ofconfigs.add(ofconfig1);
+
+      service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+
+      service0.start();
+
+      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+      ClientSession session0 = csf0.createSession(false, true, true);
+
+      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+      ClientSession session1 = csf1.createSession(false, true, true);
+
+      session0.createQueue(address1, address1, null, false, false, false);
+      session1.createQueue(address1, address1, null, false, false, false);
+      ClientProducer prod0 = session0.createProducer(address1);
+
+      ClientConsumer cons1 = session1.createConsumer(address1);
+
+      session1.start();
+
+      ClientMessage message = session0.createClientMessage(false);
+      SimpleString propKey = new SimpleString("propkey");
+      SimpleString propVal = new SimpleString("propval");
+      message.putStringProperty(propKey, propVal);
+      message.getBody().flip();
+      
+      //Now we will simulate a failure of the message flow connection between server1 and server2
+      //And prevent reconnection for a few tries, then it will reconnect without failing over
+      MessageFlow flow = service0.getServer().getClusterManager().getMessageFlows().get(flowName);
+      Forwarder forwarder = flow.getForwarders().iterator().next();
+      RemotingConnection forwardingConnection = ((ForwarderImpl)forwarder).getForwardingConnection();
+      InVMConnector.failOnCreateConnection = true;
+      InVMConnector.numberOfFailures = retriesBeforeFailover - 1;
+      forwardingConnection.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+      prod0.send(message);
+
+      ClientMessage r1 = cons1.receive(1000);
+      assertNotNull(r1);
+      assertEquals(propVal, r1.getProperty(propKey));
+
+      session0.close();
+      session1.close();
+
+      service0.stop();
+      service1.stop();
+      service2.stop();
+
+      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service2.getServer().getRemotingService().getConnections().size());
+   }
+   
+   public void testAutomaticReconnectTryThenFailover() throws Exception
+   {
+      Map<String, Object> service0Params = new HashMap<String, Object>();
+      MessagingService service0 = createMessagingService(0, service0Params);
+
+      Map<String, Object> service1Params = new HashMap<String, Object>();
+      MessagingService service1 = createMessagingService(1, service1Params);
+            
+      Map<String, Object> service2Params = new HashMap<String, Object>();
+      MessagingService service2 = createMessagingService(2, service2Params, true);
+      
+      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service0Params,
+                                                                    "server0tc");
+
+      Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+
+      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service1Params,
+                                                                    "server1tc");
+
+      TransportConfiguration server2tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service2Params,
+                                                                    "server2tc");
+      
+      connectors.put(server1tc.getName(), server1tc);
+      
+      connectors.put(server2tc.getName(), server2tc);
+      
+      service1.getServer().getConfiguration().setConnectorConfigurations(connectors);
+      
+      service1.getServer().getConfiguration().setBackupConnectorName(server2tc.getName());
+      
+      service2.getServer().getConfiguration().setBackup(true);
+      
+      service1.start();
+      
+      service2.start();
+      
+      log.info("Started service1 and service2");
+                 
+      service0.getServer().getConfiguration().setConnectorConfigurations(connectors);
+
+      List<Pair<String, String>> connectorNames = new ArrayList<Pair<String, String>>();
+      connectorNames.add(new Pair<String, String>(server1tc.getName(), server2tc.getName()));
+
+      final SimpleString address1 = new SimpleString("testaddress");
+
+      final long retryInterval = 50;
+      final double retryIntervalMultiplier = 1d;
+      final int retriesBeforeFailover = 3;
+      final int maxRetriesAfterFailover = -1;
+      
+      final String flowName = "flow1";
+      
+      MessageFlowConfiguration ofconfig1 = new MessageFlowConfiguration(flowName,
+                                                                        address1.toString(),
+                                                                        null,
+                                                                        true,
+                                                                        1,
+                                                                        -1,
+                                                                        null,
+                                                                        retryInterval,
+                                                                        retryIntervalMultiplier,
+                                                                        retriesBeforeFailover,
+                                                                        maxRetriesAfterFailover,
+                                                                        connectorNames);
+
+      Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+      ofconfigs.add(ofconfig1);
+
+      service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+
+      service0.start();
+      
+      log.info("started service0");
+
+      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+      ClientSession session0 = csf0.createSession(false, true, true);
+
+      ClientSessionFactory csf2 = new ClientSessionFactoryImpl(server2tc);
+      ClientSession session2 = csf2.createSession(false, true, true);
+
+      session0.createQueue(address1, address1, null, false, false, false);
+      session2.createQueue(address1, address1, null, false, false, false);
+      ClientProducer prod0 = session0.createProducer(address1);
+
+      ClientConsumer cons1 = session2.createConsumer(address1);
+
+      session2.start();
+
+      ClientMessage message = session0.createClientMessage(false);
+      SimpleString propKey = new SimpleString("propkey");
+      SimpleString propVal = new SimpleString("propval");
+      message.putStringProperty(propKey, propVal);
+      message.getBody().flip();
+      
+      //Now we will simulate a failure of the message flow connection between server1 and server2
+      //And prevent reconnection for a few tries, then it will failover
+      MessageFlow flow = service0.getServer().getClusterManager().getMessageFlows().get(flowName);
+      Forwarder forwarder = flow.getForwarders().iterator().next();
+      RemotingConnection forwardingConnection = ((ForwarderImpl)forwarder).getForwardingConnection();
+      InVMConnector.failOnCreateConnection = true;
+      InVMConnector.numberOfFailures = retriesBeforeFailover;
+      forwardingConnection.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+      prod0.send(message);
+
+      ClientMessage r1 = cons1.receive(2000);
+      assertNotNull(r1);
+      assertEquals(propVal, r1.getProperty(propKey));
+
+      session0.close();
+      session2.close();
+
+      service0.stop();
+      service1.stop();
+      service2.stop();
+
+      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+      assertEquals(0, service2.getServer().getRemotingService().getConnections().size());
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      InVMConnector.resetFailures();
+      
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTestBase.java	2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTestBase.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -20,12 +20,13 @@
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  */
 
-
 package org.jboss.messaging.tests.integration.cluster.distribution;
 
 import java.util.HashMap;
 import java.util.Map;
 
+import junit.framework.TestCase;
+
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
@@ -33,8 +34,6 @@
 import org.jboss.messaging.core.server.MessagingService;
 import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
 
-import junit.framework.TestCase;
-
 /**
  * A MessageFlowTestBase
  *
@@ -46,7 +45,7 @@
  */
 public abstract class MessageFlowTestBase extends TestCase
 {
-   protected MessagingService createMessagingService(final int id, Map<String, Object> params)
+   protected MessagingService createMessagingService(final int id, final Map<String, Object> params)
    {
       Configuration serviceConf = new ConfigurationImpl();
       serviceConf.setClustered(true);
@@ -59,6 +58,20 @@
       return service;
    }
    
+   protected MessagingService createMessagingService(final int id, final Map<String, Object> params, final boolean backup)
+   {
+      Configuration serviceConf = new ConfigurationImpl();
+      serviceConf.setClustered(true);
+      serviceConf.setSecurityEnabled(false);     
+      serviceConf.setBackup(backup);
+      params.put(TransportConstants.SERVER_ID_PROP_NAME, id);
+      serviceConf.getAcceptorConfigurations()
+                  .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+                                                  params));
+      MessagingService service = MessagingServiceImpl.newNullStorageMessagingServer(serviceConf);
+      return service;
+   }
+   
    protected MessagingService createMessagingService(final int id)
    {
       return this.createMessagingService(id, new HashMap<String, Object>());

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTransformerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTransformerTest.java	2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTransformerTest.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -22,6 +22,11 @@
 
 package org.jboss.messaging.tests.integration.cluster.distribution;
 
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -95,6 +100,10 @@
                                                                        1,
                                                                        -1,
                                                                        "org.jboss.messaging.tests.integration.cluster.distribution.SimpleTransformer",
+                                                                       DEFAULT_RETRY_INTERVAL,
+                                                                       DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+                                                                       DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+                                                                       DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
                                                                        connectorNames);
       Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
       ofconfigs.add(ofconfig);

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWildcardTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWildcardTest.java	2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWildcardTest.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -22,6 +22,11 @@
 
 package org.jboss.messaging.tests.integration.cluster.distribution;
 
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -103,6 +108,10 @@
                                                                        1,
                                                                        -1,
                                                                        null,
+                                                                       DEFAULT_RETRY_INTERVAL,
+                                                                       DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+                                                                       DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+                                                                       DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
                                                                        connectorNames);
       Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
       ofconfigs.add(ofconfig);

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWithFilterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWithFilterTest.java	2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowWithFilterTest.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -22,6 +22,11 @@
 
 package org.jboss.messaging.tests.integration.cluster.distribution;
 
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -97,6 +102,10 @@
                                                                        1,
                                                                        -1,
                                                                        null,
+                                                                       DEFAULT_RETRY_INTERVAL,
+                                                                       DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+                                                                       DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+                                                                       DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
                                                                        connectorNames);
       Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
       ofconfigs.add(ofconfig);

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/StaticFlowTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/StaticFlowTest.java	2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/StaticFlowTest.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -22,6 +22,11 @@
 
 package org.jboss.messaging.tests.integration.cluster.distribution;
 
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -36,8 +41,6 @@
 import org.jboss.messaging.core.client.ClientSessionFactory;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
 import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
-import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
 import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
@@ -129,6 +132,10 @@
                                                                        1,
                                                                        -1,
                                                                        null,
+                                                                       DEFAULT_RETRY_INTERVAL,
+                                                                       DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+                                                                       DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+                                                                       DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
                                                                        connectorNames);
       Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
       ofconfigs.add(ofconfig);
@@ -290,6 +297,10 @@
                                                                        1,
                                                                        -1,
                                                                        null,
+                                                                       DEFAULT_RETRY_INTERVAL,
+                                                                       DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+                                                                       DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+                                                                       DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
                                                                        connectorNames);
       Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
       ofconfigs.add(ofconfig);
@@ -466,6 +477,10 @@
                                                                         1,
                                                                         -1,
                                                                         null,
+                                                                        DEFAULT_RETRY_INTERVAL,
+                                                                        DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+                                                                        DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+                                                                        DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
                                                                         connectorNames1);
       MessageFlowConfiguration ofconfig2 = new MessageFlowConfiguration("flow2",
                                                                         testAddress.toString(),
@@ -474,6 +489,10 @@
                                                                         1,
                                                                         -1,
                                                                         null,
+                                                                        DEFAULT_RETRY_INTERVAL,
+                                                                        DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+                                                                        DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+                                                                        DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
                                                                         connectorNames2);
       MessageFlowConfiguration ofconfig3 = new MessageFlowConfiguration("flow3",
                                                                         testAddress.toString(),
@@ -482,6 +501,10 @@
                                                                         1,
                                                                         -1,
                                                                         null,
+                                                                        DEFAULT_RETRY_INTERVAL,
+                                                                        DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+                                                                        DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+                                                                        DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
                                                                         connectorNames3);
       MessageFlowConfiguration ofconfig4 = new MessageFlowConfiguration("flow4",
                                                                         testAddress.toString(),
@@ -490,6 +513,10 @@
                                                                         1,
                                                                         -1,
                                                                         null,
+                                                                        DEFAULT_RETRY_INTERVAL,
+                                                                        DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+                                                                        DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+                                                                        DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
                                                                         connectorNames4);
 
       Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailureListenerOnFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailureListenerOnFailoverTest.java	2008-12-05 17:26:32 UTC (rev 5468)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailureListenerOnFailoverTest.java	2008-12-07 14:27:39 UTC (rev 5469)
@@ -43,7 +43,6 @@
 import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
 import org.jboss.messaging.core.server.MessagingService;
 import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-import org.jboss.messaging.util.SimpleString;
 
 /**
  * 
@@ -65,8 +64,6 @@
 
    // Attributes ----------------------------------------------------
 
-   private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
-
    private MessagingService liveService;
 
    private MessagingService backupService;




More information about the jboss-cvs-commits mailing list