[hornetq-commits] JBoss hornetq SVN: r10815 - in branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq: core/client/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Jun 16 01:25:27 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-06-16 01:25:26 -0400 (Thu, 16 Jun 2011)
New Revision: 10815

Modified:
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
Log:
Some cleanup on bridges

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/api/core/client/ClientSessionFactory.java	2011-06-16 05:23:43 UTC (rev 10814)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/api/core/client/ClientSessionFactory.java	2011-06-16 05:25:26 UTC (rev 10815)
@@ -133,10 +133,16 @@
                                int ackBatchSize) throws HornetQException;
 
    void close();
+
+   /**
+    * As opposed to close(), this only calls cleanup on every created session and its child objects.
+    */
+   void cleanup();
    
    ServerLocator getServerLocator();
    
    CoreRemotingConnection getConnection();
 
     boolean isClosed();
+
 }

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-06-16 05:23:43 UTC (rev 10814)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-06-16 05:25:26 UTC (rev 10815)
@@ -129,7 +129,7 @@
 
    private final long maxRetryInterval;
 
-   private final int reconnectAttempts;
+   private int reconnectAttempts;
 
    private final Set<SessionFailureListener> listeners = new ConcurrentHashSet<SessionFailureListener>();
 
@@ -450,7 +450,45 @@
 
       closed = true;
    }
+   
+   public void cleanup()
+   {
+      if (closed)
+      {
+         return;
+      }
 
+      // we need to stop the factory from connecting if it is in the middle of trying to failover before we get the lock
+      causeExit();
+      synchronized (createSessionLock)
+      {
+         synchronized (failoverLock)
+         {
+            HashSet<ClientSessionInternal> sessionsToClose;
+            synchronized (sessions)
+            {
+               sessionsToClose = new HashSet<ClientSessionInternal>(sessions);
+            }
+            // work on a copied set, since the sessions set may change while each session is being cleaned up
+            for (ClientSessionInternal session : sessionsToClose)
+            {
+               try
+               {
+                  session.cleanUp(false);
+               }
+               catch (Exception e)
+               {
+                  log.warn("Unable to close session", e);
+               }
+            }
+
+            checkCloseConnection();
+         }
+      }
+
+      closed = true;
+   }
+
     public boolean isClosed()
     {
         return closed;
@@ -1447,4 +1485,13 @@
          cancelled = true;
       }
    }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.client.impl.ClientSessionFactoryInternal#setReconnectAttempts(int)
+    */
+   public void setReconnectAttempts(int attempts)
+   {
+      this.reconnectAttempts = attempts;
+   }
+
 }

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java	2011-06-16 05:23:43 UTC (rev 10814)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java	2011-06-16 05:25:26 UTC (rev 10815)
@@ -46,4 +46,6 @@
    void setBackupConnector(TransportConfiguration live, TransportConfiguration backUp);
 
    Object getBackupConnector();
+
+   void setReconnectAttempts(int i);
 }

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-06-16 05:23:43 UTC (rev 10814)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-06-16 05:25:26 UTC (rev 10815)
@@ -1457,7 +1457,7 @@
          {
             try
             {
-               factory.connect(reconnectAttempts, failoverOnInitialConnection);
+               factory.connect(initialConnectAttempts, failoverOnInitialConnection);
             }
             catch (HornetQException e)
             {

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-06-16 05:23:43 UTC (rev 10814)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-06-16 05:25:26 UTC (rev 10815)
@@ -18,6 +18,8 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Message;
@@ -28,6 +30,7 @@
 import org.hornetq.api.core.client.SendAcknowledgementHandler;
 import org.hornetq.api.core.client.SessionFailureListener;
 import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.filter.Filter;
@@ -53,6 +56,7 @@
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author Clebert Suconic
  *
  * Created 12 Nov 2008 11:37:35
  *
@@ -64,13 +68,13 @@
    // Constants -----------------------------------------------------
 
    private static final Logger log = Logger.getLogger(BridgeImpl.class);
-   
+
    private static final boolean isTrace = log.isTraceEnabled();
 
    // Attributes ----------------------------------------------------
-   
+
    private static final SimpleString JMS_QUEUE_ADDRESS_PREFIX = new SimpleString("jms.queue.");
-   
+
    private static final SimpleString JMS_TOPIC_ADDRESS_PREFIX = new SimpleString("jms.topic.");
 
    protected final ServerLocatorInternal serverLocator;
@@ -83,6 +87,11 @@
 
    protected final Executor executor;
 
+   protected final ScheduledExecutorService scheduledExecutor;
+
+   /** Used when there's a scheduled reconnection */
+   protected ScheduledFuture<?> futureScheduledReconnection;
+
    private final Filter filter;
 
    private final SimpleString forwardingAddress;
@@ -102,7 +111,7 @@
    private final boolean useDuplicateDetection;
 
    private volatile boolean active;
-   
+
    private volatile boolean stopping;
 
    private final String user;
@@ -111,6 +120,18 @@
 
    private boolean activated;
 
+   private final int reconnectAttempts;
+
+   private int reconnectAttemptsInUse;
+
+   private final long retryInterval;
+
+   private final double retryMultiplier;
+
+   private final long maxRetryInterval;
+
+   private int retryCount = 0;
+
    private NotificationService notificationService;
 
    // Static --------------------------------------------------------
@@ -120,6 +141,10 @@
    // Public --------------------------------------------------------
 
    public BridgeImpl(final ServerLocatorInternal serverLocator,
+                     final int reconnectAttempts,
+                     final long retryInterval,
+                     final double retryMultiplier,
+                     final long maxRetryInterval,
                      final UUID nodeUUID,
                      final SimpleString name,
                      final Queue queue,
@@ -134,6 +159,17 @@
                      final boolean activated,
                      final StorageManager storageManager) throws Exception
    {
+
+      this.reconnectAttempts = reconnectAttempts;
+      
+      this.reconnectAttemptsInUse = -1;
+
+      this.retryInterval = retryInterval;
+
+      this.retryMultiplier = retryMultiplier;
+
+      this.maxRetryInterval = maxRetryInterval;
+
       this.serverLocator = serverLocator;
 
       this.nodeUUID = nodeUUID;
@@ -144,6 +180,8 @@
 
       this.executor = executor;
 
+      this.scheduledExecutor = scheduledExecutor;
+
       filter = FilterImpl.createFilter(filterString);
 
       this.forwardingAddress = forwardingAddress;
@@ -187,7 +225,7 @@
       }
    }
 
-   private void cancelRefs() throws Exception
+   private void cancelRefs()
    {
       MessageReference ref;
 
@@ -201,25 +239,32 @@
          }
          list.addFirst(ref);
       }
-      
+
       if (isTrace && list.isEmpty())
       {
-         log.trace("didn't have any references to cancel on bridge "  + this);
+         log.trace("didn't have any references to cancel on bridge " + this);
       }
 
-      Queue queue = null;
-      
+      Queue refqueue = null;
+
       long timeBase = System.currentTimeMillis();
 
       for (MessageReference ref2 : list)
       {
-         queue = ref2.getQueue();
+         refqueue = ref2.getQueue();
 
-         queue.cancel(ref2, timeBase);
+         try
+         {
+            refqueue.cancel(ref2, timeBase);
+         }
+         catch (Exception e)
+         {
+            // There isn't much we can do besides log an error
+            log.error("Couldn't cancel reference " + ref2, e);
+         }
       }
+   }
 
-   }
-   
    public void flushExecutor()
    {
       // Wait for any create objects runnable to complete
@@ -235,18 +280,22 @@
       }
    }
 
-
    public void stop() throws Exception
    {
-	  if (log.isDebugEnabled())
-	  {
-	     log.debug("Bridge " + this.name + " being stopped");
-	  }
+      if (log.isDebugEnabled())
+      {
+         log.debug("Bridge " + this.name + " being stopped");
+      }
+
+      stopping = true;
       
-      stopping = true;
+      if (futureScheduledReconnection != null)
+      {
+         futureScheduledReconnection.cancel(true);
+      }
 
       executor.execute(new StopRunnable());
-      
+
       if (notificationService != null)
       {
          TypedProperties props = new TypedProperties();
@@ -265,10 +314,10 @@
 
    public void pause() throws Exception
    {
-	  if (log.isDebugEnabled())
-	  {
-	     log.debug("Bridge " + this.name + " being paused");
-	  }
+      if (log.isDebugEnabled())
+      {
+         log.debug("Bridge " + this.name + " being paused");
+      }
 
       executor.execute(new PauseRunnable());
 
@@ -288,13 +337,13 @@
       }
    }
 
-    public void resume() throws Exception
-    {
-        queue.addConsumer(BridgeImpl.this);
-        queue.deliverAsync();
-    }
+   public void resume() throws Exception
+   {
+      queue.addConsumer(BridgeImpl.this);
+      queue.deliverAsync();
+   }
 
-    public boolean isStarted()
+   public boolean isStarted()
    {
       return started;
    }
@@ -303,7 +352,7 @@
    {
       activated = true;
 
-      executor.execute(new CreateObjectsRunnable());
+      executor.execute(new ConnectRunnable());
    }
 
    public SimpleString getName()
@@ -377,7 +426,7 @@
       {
          // We keep our own DuplicateID for the Bridge, so bouncing back and forths will work fine
          byte[] bytes = getDuplicateBytes(nodeUUID, message.getMessageID());
-   
+
          message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes);
       }
 
@@ -396,13 +445,13 @@
    public static byte[] getDuplicateBytes(final UUID nodeUUID, final long messageID)
    {
       byte[] bytes = new byte[24];
-  
+
       ByteBuffer bb = ByteBuffer.wrap(bytes);
-  
+
       bb.put(nodeUUID.asBytes());
-  
+
       bb.putLong(messageID);
-      
+
       return bytes;
    }
 
@@ -421,16 +470,16 @@
             return HandleStatus.BUSY;
          }
 
-		   if (isTrace)
-		   {
-		      log.trace("Bridge " + name + " is handling reference=" + ref); 
-		   }
+         if (isTrace)
+         {
+            log.trace("Bridge " + name + " is handling reference=" + ref);
+         }
          ref.handled();
 
          ServerMessage message = ref.getMessage();
 
          refs.add(ref);
-         
+
          message = beforeForward(message);
 
          SimpleString dest;
@@ -444,10 +493,10 @@
             // Preserve the original address
             dest = message.getAddress();
          }
-         //if we failover during send then there is a chance that the
-         //that this will throw a disconnect, we need to remove the message
-         //from the acks so it will get resent, duplicate detection will cope
-         //with any messages resent
+         // if we failover during send then there is a chance that the
+         // that this will throw a disconnect, we need to remove the message
+         // from the acks so it will get resent, duplicate detection will cope
+         // with any messages resent
          try
          {
             producer.send(dest, message);
@@ -467,14 +516,34 @@
 
    // FailureListener implementation --------------------------------
 
-   public void connectionFailed(final HornetQException me, boolean failedOver)
+   public final void connectionFailed(final HornetQException me, boolean failedOver)
    {
       log.warn(name + "::Connection failed with failedOver=" + failedOver, me);
       if (isTrace)
       {
-         log.trace("Calling BridgeImpl::connectionFailed(HOrnetQException me=" + me + ", boolean failedOver=" + failedOver);
+         log.trace("Calling BridgeImpl::connectionFailed(HOrnetQException me=" + me +
+                   ", boolean failedOver=" +
+                   failedOver);
       }
+      try
+      {
+         csf.cleanup();
+      }
+      catch (Throwable dontCare)
+      {
+      }
+
+      try
+      {
+         session.cleanUp(false);
+      }
+      catch (Throwable dontCare)
+      {
+      }
+
       fail(false);
+
+      scheduleRetryConnect();
    }
 
    public void beforeReconnect(final HornetQException exception)
@@ -482,8 +551,6 @@
       log.warn(name + "::Connection failed before reconnect ", exception);
       fail(true);
    }
-   
-   
 
    // Package protected ---------------------------------------------
 
@@ -497,8 +564,8 @@
    @Override
    public String toString()
    {
-      return this.getClass().getName() +
-             " [name=" + name +
+      return this.getClass().getName() + " [name=" +
+             name +
              ", nodeUUID=" +
              nodeUUID +
              ", queue=" +
@@ -516,212 +583,199 @@
              "]";
    }
 
-   private void fail(final boolean beforeReconnect)
+   protected void fail(final boolean permanently)
    {
-      // This will get called even after the bridge reconnects - in this case
-      // we want to cancel all unacked refs so they get resent
-      // duplicate detection will ensure no dups are routed on the other side
+      log.debug(name + "::BridgeImpl::fail being called, permanently=" + permanently);
 
-      log.debug(name + "::BridgeImpl::fail being called, beforeReconnect=" + beforeReconnect);
+      if (queue != null)
+      {
+         try
+         {
+            queue.removeConsumer(this);
+         }
+         catch (Exception dontcare)
+         {
+            log.debug(dontcare);
+         }
+      }
       
-      if (session.getConnection().isDestroyed())
+      cancelRefs();
+      if (queue != null)
       {
-         log.debug(name + "::Connection is destroyed, active = false now");
-         active = false;
+         queue.deliverAsync();
       }
-
-
-         if (!session.getConnection().isDestroyed())
-         {
-            if (beforeReconnect)
-            {
-               try {
-            	  log.debug(name + "::Connection is destroyed, active = false now");
-
-                  cancelRefs();
-               }
-               catch (Exception e)
-               {
-                   BridgeImpl.log.error("Failed to cancel refs", e);
-               }
-            }
-            else
-            {
-               try
-               {
-                  afterConnect();
-
-                  log.debug(name + "::After reconnect, setting active=true now");
-                  active = true;
-
-                  if (queue != null)
-                  {
-                     queue.deliverAsync();
-                  }
-               }
-               catch (Exception e)
-               {
-                  BridgeImpl.log.error("Failed to call after connect", e);
-               }
-            }
-         }
    }
 
    /* Hook for doing extra stuff after connection */
    protected void afterConnect() throws Exception
    {
-      //NOOP
+      retryCount = 0;
+      reconnectAttemptsInUse = reconnectAttempts;
    }
 
    /* Hook for creating session factory */
-   protected ClientSessionFactory createSessionFactory() throws Exception
+   protected ClientSessionFactoryInternal createSessionFactory() throws Exception
    {
-      return serverLocator.createSessionFactory();
+      ClientSessionFactoryInternal csf = (ClientSessionFactoryInternal)serverLocator.createSessionFactory();
+      // csf.setReconnectAttempts(0);
+      //csf.setInitialReconnectAttempts(1);
+      return csf;
    }
 
    /* This is called only when the bridge is activated */
-   protected synchronized boolean createObjects()
+   protected void connect()
    {
-      if (!started)
-      {
-         return false;
-      }
+      BridgeImpl.log.info("Connecting bridge " + name + " to its destination [" + nodeUUID.toString() + "], csf=" + this.csf);
 
-      boolean retry = false;
-      int retryCount = 0;
+      retryCount++;
 
-      do
+      try
       {
-         BridgeImpl.log.info("Connecting bridge " + name + " to its destination [" + nodeUUID.toString() + "]");
+         if (csf == null || csf.isClosed())
+         {
+            csf = createSessionFactory();
+            // Session is pre-acknowledged
+            session = (ClientSessionInternal)csf.createSession(user, password, false, true, true, true, 1);
+            try
+            {
+               session.addMetaData("Session-for-bridge", name.toString());
+               session.addMetaData("nodeUUID", nodeUUID.toString());
+            }
+            catch (Throwable dontCare)
+            {
+               // addMetaData here is just for debug purposes
+            }
+         }
 
-         try
+         if (forwardingAddress != null)
          {
-            if (csf == null || csf.isClosed())
+            BindingQuery query = null;
+
+            try
             {
-                csf = createSessionFactory();
-                // Session is pre-acknowledge
-                session = (ClientSessionInternal)csf.createSession(user, password, false, true, true, true, 1);
-                try
-                {
-                   session.addMetaData("Session-for-bridge", name.toString());
-                   session.addMetaData("nodeUUID", nodeUUID.toString());
-                }
-                catch (Throwable dontCare)
-                {
-                   // addMetaData here is just for debug purposes
-                }
+               query = session.bindingQuery(forwardingAddress);
             }
+            catch (Throwable e)
+            {
+               log.warn("Error on querying binding on bridge " + this.name + ". Retrying in 100 milliseconds", e);
+               // This was an issue during startup, we will not count this retry
+               retryCount--;
 
-            if (forwardingAddress != null)
+               scheduleRetryConnectFixedTimeout(100);
+               return;
+            }
+
+            if (forwardingAddress.startsWith(BridgeImpl.JMS_QUEUE_ADDRESS_PREFIX) || forwardingAddress.startsWith(BridgeImpl.JMS_TOPIC_ADDRESS_PREFIX))
             {
-               BindingQuery query = null;
-               
-               try
+               if (!query.isExists())
                {
-                  query = session.bindingQuery(forwardingAddress);
+                  log.warn("Address " + forwardingAddress +
+                           " doesn't have any bindings yet, retry #(" +
+                           retryCount +
+                           ")");
+                  scheduleRetryConnect();
+                  return;
                }
-               catch (Throwable e)
+            }
+            else
+            {
+               if (!query.isExists())
                {
-                  log.warn("Error on querying binding. Retrying", e);
-                  retry = true;
-                  Thread.sleep(100);
-                  continue;
+                  log.info("Bridge " + this.getName() +
+                           " connected to fowardingAddress=" +
+                           this.getForwardingAddress() +
+                           ". " +
+                           getForwardingAddress() +
+                           " doesn't have any bindings what means messages will be ignored until a binding is created.");
                }
-   
-               if (forwardingAddress.startsWith(BridgeImpl.JMS_QUEUE_ADDRESS_PREFIX) || forwardingAddress.startsWith(BridgeImpl.JMS_TOPIC_ADDRESS_PREFIX))
-               {
-                  if (!query.isExists())
-                  {
-                     retryCount ++;
-                     if (serverLocator.getReconnectAttempts() > 0)
-                     {
-                        if (retryCount > serverLocator.getReconnectAttempts())
-                        {
-                           log.warn("Retried " + forwardingAddress + " up to the configured reconnectAttempts(" + serverLocator.getReconnectAttempts() + "). Giving up now. The bridge " + this.getName() + " will not be activated");
-                           return false;
-                        }
-                     }
-   
-                     log.warn("Address " + forwardingAddress + " doesn't have any bindings yet, retry #(" + retryCount + ")");
-                     Thread.sleep(serverLocator.getRetryInterval());
-                     retry = true;
-                     csf.close();
-                     session.close();
-                     continue;
-                  }
-               }
-               else
-               {
-                  if (!query.isExists())
-                  {
-                     log.info("Bridge " + this.getName() + " connected to fowardingAddress=" + this.getForwardingAddress() + ". " + getForwardingAddress() + " doesn't have any bindings what means messages will be ignored until a binding is created.");
-                  }
-               }
             }
+         }
 
-            if (session == null)
-            {
-               // This can happen if the bridge is shutdown
-               return false;
-            }
+         producer = session.createProducer();
+         session.addFailureListener(BridgeImpl.this);
+         session.setSendAcknowledgementHandler(BridgeImpl.this);
 
-            producer = session.createProducer();
-            session.addFailureListener(BridgeImpl.this);
-            session.setSendAcknowledgementHandler(BridgeImpl.this);
+         afterConnect();
 
-            afterConnect();
+         active = true;
 
-            active = true;
+         queue.addConsumer(BridgeImpl.this);
+         queue.deliverAsync();
 
-            queue.addConsumer(BridgeImpl.this);
-            queue.deliverAsync();
+         BridgeImpl.log.info("Bridge " + name + " is connected [" + nodeUUID + "-> " + name + "]");
 
-            BridgeImpl.log.info("Bridge " + name + " is connected [" + nodeUUID + "-> " +  name +"]");
+         return;
+      }
+      catch (HornetQException e)
+      {
+         // the session was created while its server was starting, retry it:
+         if (e.getCode() == HornetQException.SESSION_CREATION_REJECTED)
+         {
+            BridgeImpl.log.warn("Server is starting, retry to create the session for bridge " + name);
 
-            return true;
+            // We are not going to count this one as a retry
+            retryCount--;
+            scheduleRetryConnectFixedTimeout(100);
+            return;
          }
-         catch (HornetQException e)
+         else
          {
-            if (csf != null)
-            {
-               csf.close();
-            }
+            BridgeImpl.log.warn("Bridge " + name + " is unable to connect to destination. Retrying", e);
+         }
+      }
+      catch (Exception e)
+      {
+         BridgeImpl.log.warn("Bridge " + name + " is unable to connect to destination. It will be disabled.", e);
+      }
 
-            // the session was created while its server was starting, retry it:
-            if (e.getCode() == HornetQException.SESSION_CREATION_REJECTED)
-            {
-               BridgeImpl.log.warn("Server is starting, retry to create the session for bridge " + name);
+      scheduleRetryConnect();
 
-               // Sleep a little to prevent spinning too much
-               try
-               {
-                  Thread.sleep(10);
-               }
-               catch (InterruptedException ignore)
-               {
-               }
+   }
 
-               retry = true;
+   protected void scheduleRetryConnect()
+   {
+      if (reconnectAttemptsInUse >= 0 && retryCount > reconnectAttempts)
+      {
+         log.warn("Bridge " + this.name +
+                  " achieved " +
+                  retryCount +
+                  " maxattempts=" +
+                  reconnectAttempts +
+                  " it will stop retrying to reconnect");
+         fail(true);
+         return;
+      }
 
-               continue;
-            }
-            else
-            {
-               BridgeImpl.log.warn("Bridge " + name + " is unable to connect to destination. It will be disabled.", e);
+      long timeout = (long)(this.retryCount * this.retryMultiplier * this.retryMultiplier);
+      if (timeout == 0)
+      {
+         timeout = this.retryInterval;
+      }
+      if (timeout > maxRetryInterval)
+      {
+         timeout = maxRetryInterval;
+      }
 
-               return false;
-            }
+      scheduleRetryConnectFixedTimeout(timeout);
+   }
+
+   protected void scheduleRetryConnectFixedTimeout(final long milliseconds)
+   {
+      if (csf != null)
+      {
+         try
+         {
+            csf.cleanup();
          }
-         catch (Exception e)
+         catch (Throwable ignored)
          {
-            BridgeImpl.log.warn("Bridge " + name + " is unable to connect to destination. It will be disabled.", e);
-
-            return false;
          }
       }
-      while (retry && !stopping);
 
-      return false;
+      csf = null;
+      session = null;
+
+      futureScheduledReconnection = scheduledExecutor.schedule(new FutureConnectRunnable(), milliseconds, TimeUnit.MILLISECONDS);
    }
 
    // Inner classes -------------------------------------------------
@@ -732,14 +786,6 @@
       {
          try
          {
-            // We need to close the session outside of the lock,
-            // so any pending operation will be canceled right away
-            
-            // TODO: Why closing the CSF will make a few clustering and failover tests to 
-            //       either deadlock or take forever on waiting 
-            //       locks
-            csf.close();
-            csf = null;
             if (session != null)
             {
                log.debug("Cleaning up session " + session);
@@ -747,6 +793,11 @@
                session.removeFailureListener(BridgeImpl.this);
             }
 
+            if (csf != null)
+            {
+               csf.close();
+            }
+
             synchronized (BridgeImpl.this)
             {
                log.debug("Closing Session for bridge " + BridgeImpl.this.name);
@@ -759,13 +810,6 @@
 
             queue.removeConsumer(BridgeImpl.this);
 
-            cancelRefs();
-
-            if (queue != null)
-            {
-               queue.deliverAsync();
-            }
-
             log.info("stopped bridge " + name);
          }
          catch (Exception e)
@@ -783,23 +827,15 @@
          {
             synchronized (BridgeImpl.this)
             {
-               log.debug("Closing Session for bridge " + BridgeImpl.this.name);
-
                started = false;
 
                active = false;
-
             }
 
             queue.removeConsumer(BridgeImpl.this);
 
-            cancelRefs();
+            internalCancelReferences();
 
-            if (queue != null)
-            {
-               queue.deliverAsync();
-            }
-
             log.info("paused bridge " + name);
          }
          catch (Exception e)
@@ -807,18 +843,33 @@
             BridgeImpl.log.error("Failed to pause bridge", e);
          }
       }
+
    }
 
-   private class CreateObjectsRunnable implements Runnable
+   private void internalCancelReferences()
    {
-      public synchronized void run()
+      cancelRefs();
+
+      if (queue != null)
       {
-         if (!createObjects())
-         {
-            active = false;
+         queue.deliverAsync();
+      }
+   }
 
-            started = false;
-         }
+   // Fired by the scheduled executor; the actual connection attempt is still run on the main executor
+   private class FutureConnectRunnable implements Runnable
+   {
+      public void run()
+      {
+         executor.execute(new ConnectRunnable());
       }
    }
+
+   private class ConnectRunnable implements Runnable
+   {
+      public synchronized void run()
+      {
+         connect();
+      }
+   }
 }

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-06-16 05:23:43 UTC (rev 10814)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-06-16 05:25:26 UTC (rev 10815)
@@ -44,6 +44,7 @@
  * A ClusterConnectionBridge
  *
  * @author tim
+ * @author Clebert Suconic
  *
  *
  */
@@ -68,6 +69,10 @@
    private final String targetNodeID;
 
    public ClusterConnectionBridge(final ServerLocatorInternal serverLocator,
+                                  final int reconnectAttempts,
+                                  final long retryInterval,
+                                  final double retryMultiplier,
+                                  final long maxRetryInterval,
                                   final UUID nodeUUID,
                                   final String targetNodeID,
                                   final SimpleString name,
@@ -88,6 +93,10 @@
                                   final TransportConfiguration connector) throws Exception
    {
       super(serverLocator,
+            reconnectAttempts,
+            retryInterval,
+            retryMultiplier,
+            maxRetryInterval,
             nodeUUID,
             name,
             queue,
@@ -231,34 +240,22 @@
       super.stop();
    }
    
+   /**
+    * Runs the base bridge failure handling and, when the failure is permanent,
+    * also tells the server locator that the target cluster node is down.
+    */
    @Override
-   protected ClientSessionFactory createSessionFactory() throws Exception
+   protected void fail(final boolean permanently)
    {
-      //We create the session factory using the specified connector
+      super.fail(permanently);
       
-      return serverLocator.createSessionFactory(connector);      
-   }
-   
-   @Override
-   public void connectionFailed(HornetQException me, boolean failedOver)
-   {
-	  if (isTrace)
-	  {
-	     log.trace("Connection Failed on ClusterConnectionBridge, failedOver = " + failedOver + ", sessionClosed = " + session.isClosed(), new Exception ("trace"));
-	  }
-
-      if (!failedOver && !session.isClosed())
+      if (permanently)
       {
-         try
-         {
-            session.cleanUp(true);
-         }
-         catch (Exception e)
-         {
-            log.warn("Unable to clean up the session after a connection failure", e);
-         }
+         if (log.isDebugEnabled())
+         {
+            log.debug("cluster node for bridge " + this.getName() + " is permanently down");
+         }
          serverLocator.notifyNodeDown(targetNodeID);
       }
-      super.connectionFailed(me, failedOver);
    }
 }

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-06-16 05:23:43 UTC (rev 10814)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-06-16 05:25:26 UTC (rev 10815)
@@ -397,12 +397,17 @@
       {
          serverLocator.setNodeID(nodeUUID.toString());
 
-         serverLocator.setReconnectAttempts(reconnectAttempts);
+         serverLocator.setReconnectAttempts(0);
 
          serverLocator.setClusterConnection(true);
          serverLocator.setClusterTransportConfiguration(connector);
          serverLocator.setBackup(server.getConfiguration().isBackup());
          serverLocator.setInitialConnectAttempts(-1);
+//         serverLocator.setInitialConnectAttempts(1);
+         
+         serverLocator.setClientFailureCheckPeriod(clientFailureCheckPeriod);
+         serverLocator.setConnectionTTL(connectionTTL);
+
          if (serverLocator.getConfirmationWindowSize() < 0)
          {
         	// We can't have confirmationSize = -1 on the cluster Bridge
@@ -466,7 +471,7 @@
             {
                log.trace("Closing clustering record " + record);
             }
-            record.pause();
+            record.close();
          }
          catch (Exception e)
          {
@@ -481,6 +486,10 @@
                                    final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                                    final boolean last)
    {
+      if (log.isDebugEnabled())
+      {
+         log.debug("node " + nodeID + " connectionPair = " + connectorPair + " is up");
+      }
       // discard notifications about ourselves unless its from our backup
 
       if (nodeID.equals(nodeUUID.toString()))
@@ -677,24 +686,28 @@
    protected Bridge createBridge(MessageFlowRecordImpl record) throws Exception
    {
       ClusterConnectionBridge bridge = new ClusterConnectionBridge(serverLocator,
-                                                  nodeUUID,
-                                                  record.getNodeID(),
-                                                  record.getQueueName(),
-                                                  record.getQueue(),
-                                                  executorFactory.getExecutor(),
-                                                  null,
-                                                  null,
-                                                  scheduledExecutor,
-                                                  null,
-                                                  useDuplicateDetection,
-                                                  clusterUser,
-                                                  clusterPassword,
-                                                  !backup,
-                                                  server.getStorageManager(),
-                                                  managementService.getManagementAddress(),
-                                                  managementService.getManagementNotificationAddress(),
-                                                  record,
-                                                  record.getConnector());
+                                                                   reconnectAttempts,
+                                                                   retryInterval,
+                                                                   retryIntervalMultiplier,
+                                                                   maxRetryInterval,
+                                                                   nodeUUID,
+                                                                   record.getNodeID(),
+                                                                   record.getQueueName(),
+                                                                   record.getQueue(),
+                                                                   executorFactory.getExecutor(),
+                                                                   null,
+                                                                   null,
+                                                                   scheduledExecutor,
+                                                                   null,
+                                                                   useDuplicateDetection,
+                                                                   clusterUser,
+                                                                   clusterPassword,
+                                                                   !backup,
+                                                                   server.getStorageManager(),
+                                                                   managementService.getManagementAddress(),
+                                                                   managementService.getManagementNotificationAddress(),
+                                                                   record,
+                                                                   record.getConnector());
 
        return bridge;
    }

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-06-16 05:23:43 UTC (rev 10814)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-06-16 05:25:26 UTC (rev 10815)
@@ -687,12 +687,14 @@
       }
 
       serverLocator.setConfirmationWindowSize(config.getConfirmationWindowSize());
-      serverLocator.setReconnectAttempts(config.getReconnectAttempts());
-      serverLocator.setRetryInterval(config.getRetryInterval());
-      serverLocator.setRetryIntervalMultiplier(config.getRetryIntervalMultiplier());
-      serverLocator.setMaxRetryInterval(config.getMaxRetryInterval());
+      
+      // We are going to manually retry on the bridge in case of failure
+      serverLocator.setReconnectAttempts(0);
+      serverLocator.setInitialConnectAttempts(1);
+
+      
+      
       serverLocator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod());
-      serverLocator.setInitialConnectAttempts(config.getReconnectAttempts());
       serverLocator.setBlockOnDurableSend(!config.isUseDuplicateDetection());
       serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection());
       if (!config.isUseDuplicateDetection())
@@ -702,6 +704,10 @@
       }
       clusterLocators.add(serverLocator);
       Bridge bridge = new BridgeImpl(serverLocator,
+                                     config.getReconnectAttempts(),
+                                     config.getRetryInterval(),
+                                     config.getRetryIntervalMultiplier(),
+                                     config.getMaxRetryInterval(),
                                      nodeUUID,
                                      new SimpleString(config.getName()),
                                      queue,

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java	2011-06-16 05:23:43 UTC (rev 10814)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java	2011-06-16 05:25:26 UTC (rev 10815)
@@ -641,7 +641,7 @@
       {
          log.trace("Sending Notification = "  + notification + 
                    ", notificationEnabled=" + notificationsEnabled + 
-                   " messagingServerControl=" + messagingServerControl, new Exception ("trace"));
+                   " messagingServerControl=" + messagingServerControl);
       }
       if (messagingServerControl != null && notificationsEnabled)
       {



More information about the hornetq-commits mailing list