[hornetq-commits] JBoss hornetq SVN: r11359 - in branches/Branch_2_2_EAP_cluster4: src/main/org/hornetq/core/protocol/core/impl and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Sep 16 23:00:36 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-09-16 23:00:35 -0400 (Fri, 16 Sep 2011)
New Revision: 11359

Modified:
   branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
   branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/Topology.java
   branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
   branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterManager.java
   branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
   branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
cluster cleanup

Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-09-17 03:00:35 UTC (rev 11359)
@@ -1111,6 +1111,11 @@
       }
    }
 
+   public String getIdentity()
+   {
+      return identity;
+   }
+   
    public void setIdentity(String identity)
    {
       this.identity = identity;
@@ -1282,8 +1287,8 @@
       {
          log.debug("nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
       }
-
-      if (topology.removeMember(eventTime, nodeID))
+      
+      if (!(isClusterConnection() && nodeID.equals(this.getNodeID())) && topology.removeMember(eventTime, nodeID))
       {
          if (topology.isEmpty())
          {

Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2011-09-17 03:00:35 UTC (rev 11359)
@@ -39,8 +39,12 @@
    
    void setAfterConnectionInternalListener(AfterConnectInternalListener listener);
    
-   /** Used to better identify Cluster Connection Locators on logs while debugging logs */
+   /** Used to better identify Cluster Connection Locators in logs, to facilitate eventual debugging.
+    * 
+    *  This method used to be on the tests interface, but it is now part of the public interface. */
    void setIdentity(String identity);
+   
+   String getIdentity();
 
    void setNodeID(String nodeID);
 

Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/Topology.java	2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/Topology.java	2011-09-17 03:00:35 UTC (rev 11359)
@@ -108,7 +108,7 @@
       {
          if (log.isDebugEnabled())
          {
-            log.info(this + "::Live node " + nodeId + "=" + memberInput);
+            log.debug(this + "::node " + nodeId + "=" + memberInput);
          }
          memberInput.setUniqueEventID(System.currentTimeMillis());
          mapTopology.remove(nodeId);
@@ -212,7 +212,7 @@
                             currentMember +
                             ", memberInput=" +
                             memberInput +
-                            "newMember=" + newMember);
+                            "newMember=" + newMember, new Exception ("trace"));
                }
 
 
@@ -301,7 +301,7 @@
          {
             if (member.getUniqueEventID() > uniqueEventID)
             {
-               log.info("The removeMember was issued before the node " + nodeId + " was started, ignoring call");
+               log.debug("The removeMember was issued before the node " + nodeId + " was started, ignoring call");
                member = null;
             }
             else
@@ -482,22 +482,17 @@
    public synchronized String describe(final String text)
    {
 
-      String desc = text + "\n";
+      String desc = text + "topology on " + this + ":\n";
       for (Entry<String, TopologyMember> entry : new HashMap<String, TopologyMember>(mapTopology).entrySet())
       {
          desc += "\t" + entry.getKey() + " => " + entry.getValue() + "\n";
       }
       desc += "\t" + "nodes=" + nodes() + "\t" + "members=" + members();
-      return desc;
-   }
-
-   public void clear()
-   {
-      if (Topology.log.isDebugEnabled())
+      if (mapTopology.isEmpty())
       {
-         Topology.log.debug(this + "::clear", new Exception("trace"));
+         desc += "\tEmpty";
       }
-      mapTopology.clear();
+      return desc;
    }
 
    public int members()

Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-09-17 03:00:35 UTC (rev 11359)
@@ -178,16 +178,18 @@
                };
                
                final boolean isCC = msg.isClusterConnection();
-               
-               acceptorUsed.getClusterConnection().addClusterTopologyListener(listener, isCC);
-               
-               rc.addCloseListener(new CloseListener()
+               if (acceptorUsed.getClusterConnection() != null)
                {
-                  public void connectionClosed()
+                  acceptorUsed.getClusterConnection().addClusterTopologyListener(listener, isCC);
+                  
+                  rc.addCloseListener(new CloseListener()
                   {
-                     acceptorUsed.getClusterConnection().removeClusterTopologyListener(listener, isCC);
-                  }
-               });
+                     public void connectionClosed()
+                     {
+                        acceptorUsed.getClusterConnection().removeClusterTopologyListener(listener, isCC);
+                     }
+                  });
+               }
             }
             else if (packet.getType() == PacketImpl.NODE_ANNOUNCE)
             {

Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterConnection.java	2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterConnection.java	2011-09-17 03:00:35 UTC (rev 11359)
@@ -62,7 +62,7 @@
    // for debug
    String describe();
 
-   void announceNode();
+   void informTopology();
    
    void announceBackup();
 }

Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterManager.java	2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterManager.java	2011-09-17 03:00:35 UTC (rev 11359)
@@ -49,6 +49,8 @@
    void flushExecutor();
 
    void announceBackup() throws Exception;
+   
+   void deploy() throws Exception;
 
    void deployBridge(BridgeConfiguration config) throws Exception;
 

Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-09-17 03:00:35 UTC (rev 11359)
@@ -449,7 +449,7 @@
    
    public void announceBackup()
    {
-      this.backupServerLocator = clusterConnector.createServerLocator();
+      this.backupServerLocator = clusterConnector.createServerLocator(false);
       
       backupServerLocator.setReconnectAttempts(-1);
       backupServerLocator.setInitialConnectAttempts(-1);
@@ -475,9 +475,9 @@
                                                                     true,
                                                                     connector,
                                                                     null));
+                  backupSessionFactory.close();
                   log.info("backup announced");
                }
-               //backupSessionFactory.close();
             }
             catch (Exception e)
             {
@@ -619,7 +619,7 @@
 
 
 
-      serverLocator = clusterConnector.createServerLocator();
+      serverLocator = clusterConnector.createServerLocator(true);
 
       if (serverLocator != null)
       {
@@ -674,6 +674,7 @@
          log.debug("sending notification: " + notification);
          managementService.sendNotification(notification);
       }
+
    }
 
    public TransportConfiguration getConnector()
@@ -780,7 +781,6 @@
                {
                   log.debug(this + "::Creating record for nodeID=" + nodeID + ", connectorPair=" + connectorPair);
                }
-               log.info(this + "::Creating record for nodeID=" + nodeID + ", connectorPair=" + connectorPair);
 
                // New node - create a new flow record
 
@@ -822,7 +822,7 @@
       }
    }
    
-   public synchronized void announceNode()
+   public synchronized void informTopology()
    {
       String nodeID = server.getNodeID().toString();
       
@@ -1504,7 +1504,8 @@
    @Override
    public String toString()
    {
-      return "ClusterConnectionImpl [nodeUUID=" + nodeUUID +
+      return "ClusterConnectionImpl@" + System.identityHashCode(this)  + 
+             "[nodeUUID=" + nodeUUID +
              ", connector=" +
              connector +
              ", address=" +
@@ -1534,7 +1535,7 @@
 
    interface ClusterConnector
    {
-      ServerLocatorInternal createServerLocator();
+      ServerLocatorInternal createServerLocator(boolean includeTopology);
    }
 
    private class StaticClusterConnector implements ClusterConnector
@@ -1546,7 +1547,7 @@
          this.tcConfigs = tcConfigs;
       }
 
-      public ServerLocatorInternal createServerLocator()
+      public ServerLocatorInternal createServerLocator(boolean includeTopology)
       {
          if (tcConfigs != null && tcConfigs.length > 0)
          {
@@ -1554,7 +1555,9 @@
             {
                log.debug(ClusterConnectionImpl.this + "Creating a serverLocator for " + Arrays.toString(tcConfigs));
             }
-            return new ServerLocatorImpl(topology, true, tcConfigs);
+            ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology ? topology : null, true, tcConfigs);
+            locator.setClusterConnection(true);
+            return locator;
          }
          else
          {
@@ -1582,9 +1585,11 @@
          this.dg = dg;
       }
 
-      public ServerLocatorInternal createServerLocator()
+      public ServerLocatorInternal createServerLocator(boolean includeTopology)
       {
-         return new ServerLocatorImpl(topology, true, dg);
+         ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology ? topology : null, true, dg);
+         return locator;
+
       }
    }
 }

Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-09-17 03:00:35 UTC (rev 11359)
@@ -168,13 +168,8 @@
       return nodeUUID.toString();
    }
 
-   public synchronized void start() throws Exception
+   public synchronized void deploy() throws Exception
    {
-      if (started)
-      {
-         return;
-      }
-
       if (clustered)
       {
          for (BroadcastGroupConfiguration config : configuration.getBroadcastGroupConfigurations())
@@ -186,22 +181,48 @@
          {
             deployClusterConnection(config);
          }
+      }
 
-         for (ClusterConnection conn : clusterConnections.values())
+      for (BridgeConfiguration config : configuration.getBridgeConfigurations())
+      {
+         deployBridge(config);
+      }
+   }
+
+   public synchronized void start() throws Exception
+   {
+      if (started)
+      {
+         return;
+      }
+
+      for (BroadcastGroup group: broadcastGroups.values())
+      {
+         if (!backup)
          {
-            conn.announceNode();
-            if (backup)
-            {
-               conn.announceBackup();
-            }
+            group.start();
          }
       }
+      
+      for (ClusterConnection conn : clusterConnections.values())
+      {
+         conn.start();
+         if (backup)
+         {
+            conn.informTopology();
+            conn.announceBackup();
+         }
+      }
 
-      for (BridgeConfiguration config : configuration.getBridgeConfigurations())
+      for (Bridge bridge : bridges.values())
       {
-         deployBridge(config);
+         if (!backup)
+         {
+            bridge.start();
+         }
       }
 
+
       started = true;
    }
 
@@ -255,7 +276,7 @@
       clusterLocators.clear();
       started = false;
 
-      clusterConnections.clear();
+      clearClusterConnections();
    }
 
    public void flushExecutor()
@@ -487,10 +508,6 @@
 
       managementService.registerBridge(bridge, config);
 
-      if (!backup)
-      {
-         bridge.start();
-      }
    }
 
    public void destroyBridge(final String name) throws Exception
@@ -536,11 +553,18 @@
             e.printStackTrace();
          }
       }
-      clusterConnections.clear();
+      clearClusterConnections();
    }
 
    // Private methods ----------------------------------------------------------------------------------------------------
    
+   
+   private void clearClusterConnections()
+   {
+      clusterConnections.clear();
+      this.defaultClusterConnection = null;
+   }
+   
    private void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception
    {
       if (config.getName() == null)
@@ -673,7 +697,6 @@
       {
          log.debug("ClusterConnection.start at " + clusterConnection, new Exception("trace"));
       }
-      clusterConnection.start();
    }
    
    private Transformer instantiateTransformer(final String transformerClassName)
@@ -748,11 +771,6 @@
       broadcastGroups.put(config.getName(), group);
 
       managementService.registerBroadcastGroup(group, config);
-
-      if (!backup)
-      {
-         group.start();
-      }
    }
 
    private void logWarnNoConnector(final String connectorName, final String bgName)

Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-09-17 03:00:35 UTC (rev 11359)
@@ -227,7 +227,12 @@
    
    // Used to identify the server on tests... useful on debugging testcases
    private String identity;
+   
+   private Thread backupActivationThread;
 
+   private Activation activation;
+
+
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -289,11 +294,6 @@
    // lifecycle methods
    // ----------------------------------------------------------------
 
-   private interface Activation extends Runnable
-   {
-      void close(boolean permanently) throws Exception;
-   }
-
    /*
     * Can be overridden for tests
     */
@@ -309,259 +309,6 @@
       }
    }
 
-   private class NoSharedStoreLiveActivation implements Activation
-   {
-      public void run()
-      {
-         try
-         {
-            initialisePart1();
-
-            initialisePart2();
-
-            if (identity != null)
-            {
-               log.info("Server " + identity + " is now live");
-            }
-            else
-            {
-               log.info("Server is now live");
-            }
-         }
-         catch (Exception e)
-         {
-            log.error("Failure in initialisation", e);
-         }
-      }
-
-      public void close(boolean permanently) throws Exception
-      {
-
-      }
-   }
-
-   private class SharedStoreLiveActivation implements Activation
-   {
-      public void run()
-      {
-         try
-         {
-            log.info("Waiting to obtain live lock");
-
-            checkJournalDirectory();
-
-            initialisePart1();
-
-            if(nodeManager.isBackupLive())
-            {
-               //looks like we've failed over at some point need to inform that we are the backup so when the current live
-               // goes down they failover to us
-               clusterManager.announceBackup();
-               Thread.sleep(configuration.getFailbackDelay());
-            }
-
-            nodeManager.startLiveNode();
-
-            if (stopped)
-            {
-               return;
-            }
-            
-            initialisePart2();
-            
-            log.info("Server is now live");
-         }
-         catch (Exception e)
-         {
-            log.error("Failure in initialisation", e);
-         }
-      }
-
-      public void close(boolean permanently) throws Exception
-      {
-         if(permanently)
-         {
-            nodeManager.crashLiveServer();
-         }
-         else
-         {
-            nodeManager.pauseLiveServer();
-         }
-      }
-   }
-
-
-   private class SharedStoreBackupActivation implements Activation
-   {
-      
-      volatile boolean closed = false;
-      public void run()
-      {
-         try
-         {
-            nodeManager.startBackup();
-
-            initialisePart1();
-
-            clusterManager.start();
-
-            started = true;
-
-            log.info("HornetQ Backup Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "] started, waiting live to fail before it gets active");
-
-            nodeManager.awaitLiveNode();
-            
-            configuration.setBackup(false);
-            
-            if (stopped)
-            {
-               return;
-            }
-            
-            initialisePart2();
-            
-            clusterManager.activate();
-
-            log.info("Backup Server is now live");
-
-            nodeManager.releaseBackup();
-            if(configuration.isAllowAutoFailBack())
-            {
-               class FailbackChecker implements Runnable
-               {
-                  boolean restarting = false;
-                  public void run()
-                  {
-                     try
-                     {
-                        if(!restarting && nodeManager.isAwaitingFailback())
-                        {
-                           log.info("live server wants to restart, restarting server in backup");
-                           restarting = true;
-                           Thread t = new Thread(new Runnable()
-                           {
-                              public void run()
-                              {
-                                 try
-                                 {
-                                    log.debug(HornetQServerImpl.this + "::Stopping live node in favor of failback");
-                                    stop(true);
-                                    // We need to wait some time before we start the backup again
-                                    // otherwise we may eventually start before the live had a chance to get it
-                                    Thread.sleep(configuration.getFailbackDelay());
-                                    configuration.setBackup(true);
-                                    log.debug(HornetQServerImpl.this + "::Starting backup node now after failback");
-                                    start();
-                                 }
-                                 catch (Exception e)
-                                 {
-                                    log.warn("unable to restart server, please kill and restart manually", e);
-                                 }
-                              }
-                           });
-                           t.start();
-                        }
-                     }
-                     catch (Exception e)
-                     {
-                        log.debug(e.getMessage(), e);
-                        //hopefully it will work next call
-                     }
-                  }
-               }
-               scheduledPool.scheduleAtFixedRate(new FailbackChecker(),  1000l, 1000l, TimeUnit.MILLISECONDS);
-            }
-         }
-         catch (InterruptedException e)
-         {
-            //this is ok, we are being stopped
-         }
-         catch (ClosedChannelException e)
-         {
-            //this is ok too, we are being stopped
-         }
-         catch (Exception e)
-         {
-            if(!(e.getCause() instanceof InterruptedException))
-            {
-               log.error("Failure in initialisation", e);
-            }
-         }
-         catch(Throwable e)
-         {
-            log.error("Failure in initialisation", e);
-         }
-      }
-
-      public void close(boolean permanently) throws Exception
-      {
-         if (configuration.isBackup())
-         {
-            long timeout = 30000;
-
-            long start = System.currentTimeMillis();
-
-            while (backupActivationThread.isAlive() && System.currentTimeMillis() - start < timeout)
-            {
-               nodeManager.interrupt();
-
-               backupActivationThread.interrupt();
-               
-               backupActivationThread.join(1000);
-
-            }
-
-            if (System.currentTimeMillis() - start >= timeout)
-            {
-               threadDump("Timed out waiting for backup activation to exit");
-            }
-
-            nodeManager.stopBackup();
-         }
-         else
-         {
-            //if we are now live, behave as live
-            // We need to delete the file too, otherwise the backup will failover when we shutdown or if the backup is
-            // started before the live
-            if(permanently)
-            {
-               nodeManager.crashLiveServer();
-            }
-            else
-            {
-               nodeManager.pauseLiveServer();
-            }
-         }
-      }
-   }
-
-   private class SharedNothingBackupActivation implements Activation
-   {
-      public void run()
-      {
-         try
-         {
-            // TODO
-
-            // Try-Connect to live server using live-connector-ref
-
-            // sit in loop and try and connect, if server is not live then it will return NOT_LIVE
-         }
-         catch (Exception e)
-         {
-            log.error("Failure in initialisation", e);
-         }
-      }
-
-      public void close(boolean permanently) throws Exception
-      {
-      }
-   }
-
-   private Thread backupActivationThread;
-
-   private Activation activation;
-
    public synchronized void start() throws Exception
    {
       stopped = false;
@@ -611,6 +358,7 @@
       }
 
 
+      // The activation on fail-back may change the value of isBackup, for that reason we are not using else here
       if (configuration.isBackup())
       {
          if (configuration.isSharedStore())
@@ -1069,7 +817,6 @@
       return new HashSet<ServerSession>(sessions.values());
    }
 
-   // TODO - should this really be here?? It's only used in tests
    public boolean isInitialised()
    {
       synchronized (initialiseLock)
@@ -1232,9 +979,145 @@
       return connectorsService;
    }
 
-   // Public
-   // ---------------------------------------------------------------------------------------
+    
+   public synchronized boolean checkActivate() throws Exception
+   {
+      if (configuration.isBackup())
+      {
+         // Handle backup server activation
 
+         if (!configuration.isSharedStore())
+         {
+            if (replicationEndpoint == null)
+            {
+               HornetQServerImpl.log.warn("There is no replication endpoint, can't activate this backup server");
+
+               throw new HornetQException(HornetQException.INTERNAL_ERROR, "Can't activate the server");
+            }
+
+            replicationEndpoint.stop();
+         }
+
+         // Complete the startup procedure
+
+         HornetQServerImpl.log.info("Activating backup server");
+
+         configuration.setBackup(false);
+
+         initialisePart2();
+      }
+
+      return true;
+   }
+
+   public void deployDivert(DivertConfiguration config) throws Exception
+   {
+      if (config.getName() == null)
+      {
+         HornetQServerImpl.log.warn("Must specify a name for each divert. This one will not be deployed.");
+
+         return;
+      }
+
+      if (config.getAddress() == null)
+      {
+         HornetQServerImpl.log.warn("Must specify an address for each divert. This one will not be deployed.");
+
+         return;
+      }
+
+      if (config.getForwardingAddress() == null)
+      {
+         HornetQServerImpl.log.warn("Must specify an forwarding address for each divert. This one will not be deployed.");
+
+         return;
+      }
+
+      SimpleString sName = new SimpleString(config.getName());
+
+      if (postOffice.getBinding(sName) != null)
+      {
+         HornetQServerImpl.log.warn("Binding already exists with name " + sName + ", divert will not be deployed");
+
+         return;
+      }
+
+      SimpleString sAddress = new SimpleString(config.getAddress());
+
+      Transformer transformer = instantiateTransformer(config.getTransformerClassName());
+
+      Filter filter = FilterImpl.createFilter(config.getFilterString());
+
+      Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()),
+                                     sName,
+                                     new SimpleString(config.getRoutingName()),
+                                     config.isExclusive(),
+                                     filter,
+                                     transformer,
+                                     postOffice,
+                                     storageManager);
+
+      Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress, divert);
+
+      postOffice.addBinding(binding);
+
+      managementService.registerDivert(divert, config);
+   }
+   
+   public void destroyDivert(SimpleString name) throws Exception
+   {
+      Binding binding = postOffice.getBinding(name);
+      if (binding == null)
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR, "No binding for divert " + name);
+      }
+      if (!(binding instanceof DivertBinding))
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR, "Binding " + name + " is not a divert");
+      }
+
+      postOffice.removeBinding(name);
+   }
+
+
+
+   public void deployBridge(BridgeConfiguration config) throws Exception
+   {
+      if (clusterManager != null)
+      {
+         clusterManager.deployBridge(config);
+      }
+   }
+   
+   public void destroyBridge(String name) throws Exception
+   {
+      if (clusterManager != null)
+      {
+         clusterManager.destroyBridge(name);
+      }
+   }
+   
+   public ServerSession getSessionByID(String sessionName)
+   {
+      return sessions.get(sessionName);
+   }
+   
+   // PUBLIC -------
+   
+   public String toString()
+   {
+      if (identity != null)
+      {
+         return "HornetQServerImpl::" + identity;
+      }
+      else
+      {
+         return "HornetQServerImpl::" + (nodeManager != null ? "serverUUID=" + nodeManager.getUUID() : "");
+      }
+   }
+
+
+
    // Package protected
    // ----------------------------------------------------------------------------
 
@@ -1296,34 +1179,6 @@
    // Private
    // --------------------------------------------------------------------------------------
 
-   // private boolean startReplication() throws Exception
-   // {
-   // String backupConnectorName = configuration.getBackupConnectorName();
-   //
-   // if (!configuration.isSharedStore() && backupConnectorName != null)
-   // {
-   // TransportConfiguration backupConnector = configuration.getConnectorConfigurations().get(backupConnectorName);
-   //
-   // if (backupConnector == null)
-   // {
-   // HornetQServerImpl.log.warn("connector with name '" + backupConnectorName +
-   // "' is not defined in the configuration.");
-   // }
-   // else
-   // {
-   //
-   // replicationFailoverManager = createBackupConnectionFailoverManager(backupConnector,
-   // threadPool,
-   // scheduledPool);
-   //
-   // replicationManager = new ReplicationManagerImpl(replicationFailoverManager, executorFactory);
-   // replicationManager.start();
-   // }
-   // }
-   //
-   // return true;
-   // }
-
    private void callActivateCallbacks()
    {
       for (ActivateCallback callback : activateCallbacks)
@@ -1340,44 +1195,6 @@
       }
    }
 
-   public synchronized boolean checkActivate() throws Exception
-   {
-      if (configuration.isBackup())
-      {
-         // Handle backup server activation
-
-         if (!configuration.isSharedStore())
-         {
-            if (replicationEndpoint == null)
-            {
-               HornetQServerImpl.log.warn("There is no replication endpoint, can't activate this backup server");
-
-               throw new HornetQException(HornetQException.INTERNAL_ERROR, "Can't activate the server");
-            }
-
-            replicationEndpoint.stop();
-         }
-
-         // Complete the startup procedure
-
-         HornetQServerImpl.log.info("Activating backup server");
-
-         configuration.setBackup(false);
-
-         initialisePart2();
-      }
-
-      return true;
-   }
-
-   private class FileActivateRunner implements Runnable
-   {
-      public void run()
-      {
-
-      }
-   }
-
    private void initialiseLogging()
    {
       LogDelegateFactory logDelegateFactory = (LogDelegateFactory)instantiateInstance(configuration.getLogDelegateFactoryClassName());
@@ -1478,8 +1295,11 @@
                                               nodeManager.getUUID(),
                                               configuration.isBackup(),
                                               configuration.isClustered());
+      
 
+      clusterManager.deploy();
 
+
       remotingService = new RemotingServiceImpl(clusterManager, configuration, this, managementService, scheduledPool);
 
       messagingServerControl = managementService.registerServer(postOffice,
@@ -1826,76 +1646,6 @@
       }
    }
 
-   public void deployDivert(DivertConfiguration config) throws Exception
-   {
-      if (config.getName() == null)
-      {
-         HornetQServerImpl.log.warn("Must specify a name for each divert. This one will not be deployed.");
-
-         return;
-      }
-
-      if (config.getAddress() == null)
-      {
-         HornetQServerImpl.log.warn("Must specify an address for each divert. This one will not be deployed.");
-
-         return;
-      }
-
-      if (config.getForwardingAddress() == null)
-      {
-         HornetQServerImpl.log.warn("Must specify an forwarding address for each divert. This one will not be deployed.");
-
-         return;
-      }
-
-      SimpleString sName = new SimpleString(config.getName());
-
-      if (postOffice.getBinding(sName) != null)
-      {
-         HornetQServerImpl.log.warn("Binding already exists with name " + sName + ", divert will not be deployed");
-
-         return;
-      }
-
-      SimpleString sAddress = new SimpleString(config.getAddress());
-
-      Transformer transformer = instantiateTransformer(config.getTransformerClassName());
-
-      Filter filter = FilterImpl.createFilter(config.getFilterString());
-
-      Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()),
-                                     sName,
-                                     new SimpleString(config.getRoutingName()),
-                                     config.isExclusive(),
-                                     filter,
-                                     transformer,
-                                     postOffice,
-                                     storageManager);
-
-      Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress, divert);
-
-      postOffice.addBinding(binding);
-
-      managementService.registerDivert(divert, config);
-   }
-   
-   public void destroyDivert(SimpleString name) throws Exception
-   {
-      Binding binding = postOffice.getBinding(name);
-      if (binding == null)
-      {
-         throw new HornetQException(HornetQException.INTERNAL_ERROR, "No binding for divert " + name);
-      }
-      if (!(binding instanceof DivertBinding))
-      {
-         throw new HornetQException(HornetQException.INTERNAL_ERROR, "Binding " + name + " is not a divert");
-      }
-
-      postOffice.removeBinding(name);
-   }
-
-
    private synchronized void deployGroupingHandlerConfiguration(final GroupingHandlerConfiguration config) throws Exception
    {
       if (config != null)
@@ -1922,22 +1672,6 @@
          managementService.addNotificationListener(groupingHandler);
       }
    }
-   
-   public void deployBridge(BridgeConfiguration config) throws Exception
-   {
-      if (clusterManager != null)
-      {
-         clusterManager.deployBridge(config);
-      }
-   }
-   
-   public void destroyBridge(String name) throws Exception
-   {
-      if (clusterManager != null)
-      {
-         clusterManager.destroyBridge(name);
-      }
-   }
 
    private Transformer instantiateTransformer(final String transformerClassName)
    {
@@ -1979,11 +1713,6 @@
 
    }
 
-   public ServerSession getSessionByID(String sessionName)
-   {
-      return sessions.get(sessionName);
-   }
-   
    /**
     * Check if journal directory exists or create it (if configured to do so)
     */
@@ -2005,18 +1734,284 @@
       }
    }
    
-   public String toString()
+   /**
+    * To be called by backup trying to fail back the server
+    */
+   private void startFailbackChecker()
    {
-      if (identity != null)
+      scheduledPool.scheduleAtFixedRate(new FailbackChecker(),  1000l, 1000l, TimeUnit.MILLISECONDS);
+   }
+
+
+   // Inner classes
+   // --------------------------------------------------------------------------------
+   
+   class FailbackChecker implements Runnable
+   {
+      boolean restarting = false;
+      public void run()
       {
-      	return "HornetQServerImpl::" + identity;
+         try
+         {
+            if(!restarting && nodeManager.isAwaitingFailback())
+            {
+               log.info("live server wants to restart, restarting server in backup");
+               restarting = true;
+               Thread t = new Thread(new Runnable()
+               {
+                  public void run()
+                  {
+                     try
+                     {
+                        log.debug(HornetQServerImpl.this + "::Stopping live node in favor of failback");
+                        stop(true);
+                        // We need to wait some time before we start the backup again
+                        // otherwise we may eventually start before the live had a chance to get it
+                        Thread.sleep(configuration.getFailbackDelay());
+                        configuration.setBackup(true);
+                        log.debug(HornetQServerImpl.this + "::Starting backup node now after failback");
+                        start();
+                     }
+                     catch (Exception e)
+                     {
+                        log.warn("unable to restart server, please kill and restart manually", e);
+                     }
+                  }
+               });
+               t.start();
+            }
+         }
+         catch (Exception e)
+         {
+            log.warn(e.getMessage(), e);
+         }
       }
-      else
+   }
+
+   
+
+   private class SharedStoreLiveActivation implements Activation
+   {
+      public void run()
       {
-      	return "HornetQServerImpl::" + (nodeManager != null ? "serverUUID=" + nodeManager.getUUID() : "");
+         try
+         {
+            log.info("Waiting to obtain live lock");
+
+            checkJournalDirectory();
+            
+            if (log.isDebugEnabled())
+            {
+               log.debug("First part initialization on " + this);
+            }
+
+            initialisePart1();
+
+            if(nodeManager.isBackupLive())
+            {
+               // Looks like we've failed over at some point; we need to announce that we are the backup so that when the
+               // current live goes down they fail over to us
+               if (log.isDebugEnabled())
+               {
+                  log.debug("announcing backup to the former live" + this);
+               }
+
+               clusterManager.announceBackup();
+               Thread.sleep(configuration.getFailbackDelay());
+            }
+
+            nodeManager.startLiveNode();
+
+            if (stopped)
+            {
+               return;
+            }
+            
+            initialisePart2();
+            
+            log.info("Server is now live");
+         }
+         catch (Exception e)
+         {
+            log.error("Failure in initialisation", e);
+         }
       }
+
+      public void close(boolean permanently) throws Exception
+      {
+         if(permanently)
+         {
+            nodeManager.crashLiveServer();
+         }
+         else
+         {
+            nodeManager.pauseLiveServer();
+         }
+      }
    }
 
-   // Inner classes
-   // --------------------------------------------------------------------------------
+
+   private class SharedStoreBackupActivation implements Activation
+   {
+      public void run()
+      {
+         try
+         {
+            nodeManager.startBackup();
+
+            initialisePart1();
+            
+            clusterManager.start();
+
+            started = true;
+
+            log.info("HornetQ Backup Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "] started, waiting live to fail before it gets active");
+
+            nodeManager.awaitLiveNode();
+            
+            configuration.setBackup(false);
+            
+            if (stopped)
+            {
+               return;
+            }
+            
+            initialisePart2();
+            
+            clusterManager.activate();
+
+            log.info("Backup Server is now live");
+
+            nodeManager.releaseBackup();
+            if(configuration.isAllowAutoFailBack())
+            {
+               startFailbackChecker();
+            }
+         }
+         catch (InterruptedException e)
+         {
+            //this is ok, we are being stopped
+         }
+         catch (ClosedChannelException e)
+         {
+            //this is ok too, we are being stopped
+         }
+         catch (Exception e)
+         {
+            if(!(e.getCause() instanceof InterruptedException))
+            {
+               log.error("Failure in initialisation", e);
+            }
+         }
+         catch(Throwable e)
+         {
+            log.error("Failure in initialisation", e);
+         }
+      }
+
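+      /**
+       * Closes this activation: while still a backup, interrupts the activation thread (waiting up to 30s) and stops the backup; otherwise crashes (permanently) or pauses the live server.
+       */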
+      public void close(boolean permanently) throws Exception
+      {
+         if (configuration.isBackup())
+         {
+            long timeout = 30000;
+
+            long start = System.currentTimeMillis();
+
+            while (backupActivationThread.isAlive() && System.currentTimeMillis() - start < timeout)
+            {
+               nodeManager.interrupt();
+
+               backupActivationThread.interrupt();
+               
+               backupActivationThread.join(1000);
+
+            }
+
+            if (System.currentTimeMillis() - start >= timeout)
+            {
+               threadDump("Timed out waiting for backup activation to exit");
+            }
+
+            nodeManager.stopBackup();
+         }
+         else
+         {
+            // If we are now live, behave as live.
+            // We need to delete the file too, otherwise the backup will fail over when we shut down or if the backup is
+            // started before the live
+            if(permanently)
+            {
+               nodeManager.crashLiveServer();
+            }
+            else
+            {
+               nodeManager.pauseLiveServer();
+            }
+         }
+      }
+   }
+   
+   private interface Activation extends Runnable
+   {
+      void close(boolean permanently) throws Exception;
+   }
+
+   private class SharedNothingBackupActivation implements Activation
+   {
+      public void run()
+      {
+         try
+         {
+            // TODO
+
+            // Try-Connect to live server using live-connector-ref
+
+            // sit in loop and try and connect, if server is not live then it will return NOT_LIVE
+         }
+         catch (Exception e)
+         {
+            log.error("Failure in initialisation", e);
+         }
+      }
+
+      public void close(boolean permanently) throws Exception
+      {
+      }
+   }
+
+   private class NoSharedStoreLiveActivation implements Activation
+   {
+      public void run()
+      {
+         try
+         {
+            initialisePart1();
+
+            initialisePart2();
+
+            if (identity != null)
+            {
+               log.info("Server " + identity + " is now live");
+            }
+            else
+            {
+               log.info("Server is now live");
+            }
+         }
+         catch (Exception e)
+         {
+            log.error("Failure in initialisation", e);
+         }
+      }
+
+      public void close(boolean permanently) throws Exception
+      {
+
+      }
+   }
+
+
 }

Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-09-17 03:00:35 UTC (rev 11359)
@@ -1062,7 +1062,7 @@
 
          return;
       }
-
+      
       consumer.receiveCredits(credits);
    }
 

Modified: branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java	2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java	2011-09-17 03:00:35 UTC (rev 11359)
@@ -28,6 +28,7 @@
 import org.hornetq.api.core.client.ClientProducer;
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.api.core.client.SessionFailureListener;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -76,18 +77,28 @@
       locator.setBlockOnDurableSend(true);
       locator.setFailoverOnInitialConnection(true);
       locator.setReconnectAttempts(-1);
+      ((ServerLocatorInternal)locator).setIdentity("testAutoFailback");
+       
       ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
       final CountDownLatch latch = new CountDownLatch(1);
 
       ClientSession session = sendAndConsume(sf, true);
+      
+      System.out.println(locator.getTopology().describe());
 
       MyListener listener = new MyListener(latch);
 
       session.addFailureListener(listener);
+      
+      System.out.println(locator.getTopology().describe());
 
       liveServer.crash();
-
+      
       assertTrue(latch.await(5, TimeUnit.SECONDS));
+      
+      log.info("backup (nowLive) topology = " + backupServer.getServer().getClusterManager().getDefaultConnection().getTopology().describe());
+      
+      log.info("Server Crash!!!");
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
@@ -97,6 +108,11 @@
 
       producer.send(message);
 
+      verifyMessageOnServer(1, 1);
+
+      System.out.println(locator.getTopology().describe());
+      
+
       session.removeFailureListener(listener);
 
       final CountDownLatch latch2 = new CountDownLatch(1);
@@ -107,6 +123,10 @@
 
       log.info("******* starting live server back");
       liveServer.start();
+      
+      Thread.sleep(1000);
+      
+      System.out.println("After failback: " + locator.getTopology().describe());
 
       assertTrue(latch2.await(5, TimeUnit.SECONDS));
 
@@ -118,6 +138,8 @@
 
       session.close();
 
+      verifyMessageOnServer(0, 1);
+
       sf.close();
 
       Assert.assertEquals(0, sf.numSessions());
@@ -125,6 +147,29 @@
       Assert.assertEquals(0, sf.numConnections());
    }
 
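+   /** Receives, acknowledges and commits {@code numberOfMessages} messages from ADDRESS on the in-VM server {@code server}, asserting that each one arrives.
+    * @throws Exception
+    * @throws HornetQException
+    */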
+   private void verifyMessageOnServer(final int server, final int numberOfMessages) throws Exception, HornetQException
+   {
+      ServerLocator backupLocator = createInVMLocator(server);
+      ClientSessionFactory factorybkp = backupLocator.createSessionFactory();
+      ClientSession sessionbkp = factorybkp.createSession(false, false);
+      sessionbkp.start();
+      ClientConsumer consumerbkp = sessionbkp.createConsumer(ADDRESS);
+      for (int i = 0 ; i < numberOfMessages; i++)
+      {
+         ClientMessage msg = consumerbkp.receive(1000);
+         assertNotNull(msg);
+         msg.acknowledge();
+         sessionbkp.commit();
+      }
+      sessionbkp.close();
+      factorybkp.close();
+      backupLocator.close();
+   }
+
    public void testAutoFailbackThenFailover() throws Exception
    {
       locator.setBlockOnNonDurableSend(true);
@@ -253,7 +298,7 @@
 
       if (createQueue)
       {
-         session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, false);
+         session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
       }
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
@@ -288,6 +333,8 @@
       }
 
       ClientMessage message3 = consumer.receiveImmediate();
+      
+      consumer.close();
 
       Assert.assertNull(message3);
 
@@ -315,6 +362,7 @@
 
       public void connectionFailed(final HornetQException me, boolean failedOver)
       {
+         System.out.println("Failed, me");
          latch.countDown();
       }
 

Modified: branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2011-09-17 03:00:35 UTC (rev 11359)
@@ -38,6 +38,7 @@
 import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
 import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
 import org.hornetq.core.remoting.impl.invm.InVMRegistry;
+import org.hornetq.core.remoting.impl.invm.TransportConstants;
 import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
 import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
 import org.hornetq.core.server.HornetQServer;
@@ -530,7 +531,19 @@
       locators.add(locatorWithoutHA);
       return locatorWithoutHA;
    }
+   
+   protected ServerLocator createInVMLocator(final int serverID)
+   {
+      Map<String, Object> server1Params = new HashMap<String, Object>();
 
+      if (serverID != 0)
+      {
+         server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, serverID);
+      }
+
+      return HornetQClient.createServerLocatorWithHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY, server1Params));
+   }
+ 
    protected ClientSessionFactoryImpl createFactory(final String connectorClass) throws Exception
    {
       ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(connectorClass));



More information about the hornetq-commits mailing list