[hornetq-commits] JBoss hornetq SVN: r10969 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core: replication/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Jul 12 09:31:04 EDT 2011


Author: ataylor
Date: 2011-07-12 09:31:03 -0400 (Tue, 12 Jul 2011)
New Revision: 10969

Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
announce replication in backup thread and make sure to activate the cluster manager

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-07-12 07:40:24 UTC (rev 10968)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-07-12 13:31:03 UTC (rev 10969)
@@ -1217,7 +1217,7 @@
          updateArraysAndPairs();
       }
 
-      if (last)
+      if (last && topologyArray != null)
       {
          receivedTopology = true;
       }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-07-12 07:40:24 UTC (rev 10968)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-07-12 13:31:03 UTC (rev 10969)
@@ -87,7 +87,9 @@
    // Used on tests, to simulate failures on delete pages
    private boolean deletePages = true;
 
-   // Constructors --------------------------------------------------
+   private boolean started;
+
+    // Constructors --------------------------------------------------
    public ReplicationEndpointImpl(final HornetQServer server)
    {
       this.server = server;
@@ -198,7 +200,7 @@
     */
    public boolean isStarted()
    {
-      return true;
+      return started;
    }
 
    /* (non-Javadoc)
@@ -229,6 +231,8 @@
 
       pageManager.start();
 
+       started = true;
+
    }
 
    /* (non-Javadoc)
@@ -236,6 +240,10 @@
     */
    public void stop() throws Exception
    {
+      if(!started)
+      {
+          return;
+      }
       // This could be null if the backup server is being
       // shut down without any live server connecting here
       if (channel != null)
@@ -269,6 +277,8 @@
       largeMessages.clear();
 
       pageManager.stop();
+
+       started = false;
    }
 
    /* (non-Javadoc)

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-07-12 07:40:24 UTC (rev 10968)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-07-12 13:31:03 UTC (rev 10969)
@@ -537,44 +537,65 @@
             initialisePart1();
             clusterManager.start();
 
+
             String liveConnectorName = configuration.getLiveConnectorName();
             if (liveConnectorName == null)
             {
                throw new IllegalArgumentException("Cannot have a replicated backup without configuring its live-server!");
             }
             final TransportConfiguration config = configuration.getConnectorConfigurations().get(liveConnectorName);
-           serverLocator =
+            serverLocator =
                      (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(config);
 
             serverLocator.setReconnectAttempts(-1);
 
-            final ClientSessionFactory liveServerSessionFactory = serverLocator.connect();
-
-            if (liveServerSessionFactory == null)
+            threadPool.execute(new Runnable()
             {
-               // XXX
-               throw new RuntimeException("Need to retry...");
-            }
-            CoreRemotingConnection liveConnection = liveServerSessionFactory.getConnection();
-            Channel pingChannel = liveConnection.getChannel(CHANNEL_ID.PING.id, -1);
-            Channel replicationChannel = liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
+                @Override
+                public void run()
+                {
+                    try
+                    {
+                        final ClientSessionFactory liveServerSessionFactory = serverLocator.connect();
 
-            replicationChannel.setHandler(replicationEndpoint);
-            connectToReplicationEndpoint(replicationChannel);
-            replicationEndpoint.start();
+                        if (liveServerSessionFactory == null)
+                        {
+                           // XXX
+                           throw new RuntimeException("Need to retry...");
+                        }
+                        CoreRemotingConnection liveConnection = liveServerSessionFactory.getConnection();
+                        Channel pingChannel = liveConnection.getChannel(CHANNEL_ID.PING.id, -1);
+                        Channel replicationChannel = liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
 
-            clusterManager.announceReplicatingBackup(pingChannel);
+                        replicationChannel.setHandler(replicationEndpoint);
+                        connectToReplicationEndpoint(replicationChannel);
+                        replicationEndpoint.start();
 
+                        clusterManager.announceReplicatingBackup(pingChannel);
+                    }
+                    catch (Exception e)
+                    {
+                        log.warn("unable to announce backup for replication", e);
+                    }
+                }
+            });
+
+
             log.info("HornetQ Backup Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() +
                      "] started, waiting live to fail before it gets active");
             started = true;
+
+            // Server node (i.e. live node) is not running, now the backup takes over.
+            // We must remember to close stuff we don't need any more.
             nodeManager.awaitLiveNode();
-            // Server node (i.e. Life node) is not running, now the backup takes over.
+            serverLocator.close();
             replicationEndpoint.stop();
             configuration.setBackup(false);
 
             initialisePart2();
 
+            clusterManager.activate();
+
          }
          catch (Exception e)
          {



More information about the hornetq-commits mailing list