[hornetq-commits] JBoss hornetq SVN: r10886 - in branches/HORNETQ-720_Replication: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Jun 27 10:18:53 EDT 2011


Author: borges
Date: 2011-06-27 10:18:52 -0400 (Mon, 27 Jun 2011)
New Revision: 10886

Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
HORNETQ-720 Add some replication support

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-06-27 14:14:47 UTC (rev 10885)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-06-27 14:18:52 UTC (rev 10886)
@@ -38,8 +38,12 @@
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.config.BridgeConfiguration;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.CoreQueueConfiguration;
@@ -76,6 +80,7 @@
 import org.hornetq.core.postoffice.impl.LocalQueueBinding;
 import org.hornetq.core.postoffice.impl.PostOfficeImpl;
 import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
 import org.hornetq.core.replication.ReplicationEndpoint;
@@ -133,7 +138,7 @@
    // ------------------------------------------------------------------------------------
 
    private static final Logger log = Logger.getLogger(HornetQServerImpl.class);
-   
+
    // JMS Topics (which are outside of the scope of the core API) will require a dumb subscription with a dummy-filter at this current version
    // as a way to keep its existence valid and TCK tests
    // That subscription needs an invalid filter, however paging needs to ignore any subscription with this filter.
@@ -147,7 +152,7 @@
    // Attributes
    // -----------------------------------------------------------------------------------
 
-   
+
    private final Version version;
 
    private final HornetQSecurityManager securityManager;
@@ -165,7 +170,7 @@
    private volatile QueueFactory queueFactory;
 
    private volatile PagingManager pagingManager;
- 
+
    private volatile PostOffice postOffice;
 
    private volatile ExecutorService threadPool;
@@ -187,7 +192,7 @@
    private volatile RemotingService remotingService;
 
    private volatile ManagementService managementService;
-   
+
    private volatile ConnectorsService connectorsService;
 
    private MemoryManager memoryManager;
@@ -217,9 +222,9 @@
    private final Set<ActivateCallback> activateCallbacks = new HashSet<ActivateCallback>();
 
    private volatile GroupingHandler groupingHandler;
-   
+
    private NodeManager nodeManager;
-   
+
    // Used to identify the server on tests... useful on debugging testcases
    private String identity;
 
@@ -352,7 +357,7 @@
             nodeManager.startLiveNode();
 
             initialisePart2();
-            
+
             log.info("Server is now live");
          }
          catch (Exception e)
@@ -392,11 +397,11 @@
             log.info("HornetQ Backup Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "] started, waiting live to fail before it gets active");
 
             nodeManager.awaitLiveNode();
-            
+
             configuration.setBackup(false);
-            
+
             initialisePart2();
-            
+
             clusterManager.activate();
 
             log.info("Backup Server is now live");
@@ -511,11 +516,48 @@
       {
          try
          {
-            // TODO
+            nodeManager.startBackup();
 
+            initialisePart1();
+
+            clusterManager.start();
             // Try-Connect to live server using live-connector-ref
+            String liveConnectorsName = configuration.getLiveConnectorName();
+            if (liveConnectorsName == null)
+            {
+               throw new IllegalArgumentException("Cannot have a replicated backup without configuring its live-server!");
+            }
+            final TransportConfiguration config = configuration.getConnectorConfigurations().get(liveConnectorsName);
+            log.info("config is " + config);
+            final ServerLocatorInternal serverLocator =
+                     (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(config);
 
+            // XXX Need to retry the connection a couple of times
             // sit in loop and try and connect, if server is not live then it will return NOT_LIVE
+            final ClientSessionFactory liveServerSessionFactory = serverLocator.connect();
+
+            if (liveServerSessionFactory != null)
+            {
+               log.debug("announce backup to live-server");
+               liveServerSessionFactory.getConnection()
+                                       .getChannel(0, -1)
+                                       .send(new NodeAnnounceMessage(getNodeID().toString(), true, config));
+               log.info("backup announced");
+            }
+
+            started = true;
+
+            log.info("HornetQ Backup Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() +
+                     "] started, waiting live to fail before it gets active");
+
+            nodeManager.awaitLiveNode();
+
+            // XXX ???
+            configuration.setBackup(false);
+
+            // XXX
+
+            initialisePart2();
          }
          catch (Exception e)
          {
@@ -523,7 +565,7 @@
          }
       }
 
-      public void close(boolean permanently) throws Exception
+      public void close(final boolean permanently) throws Exception
       {
       }
    }
@@ -556,7 +598,7 @@
 
          test.run();
       }
-      
+
       if (!configuration.isBackup())
       {
          if (configuration.isSharedStore() && configuration.isPersistenceEnabled())
@@ -577,10 +619,9 @@
 
          HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "] started");
       }
-
-
-      if (configuration.isBackup())
+      else
       {
+         // server is Backup
          if (configuration.isSharedStore())
          {
             activation = new SharedStoreBackupActivation();
@@ -588,7 +629,6 @@
          else
          {
             // Replicated
-
             activation = new SharedNothingBackupActivation();
          }
 
@@ -635,14 +675,14 @@
             managementService.removeNotificationListener(groupingHandler);
             groupingHandler = null;
          }
-         
+
          if (clusterManager != null)
          {
             clusterManager.stop();
          }
 
       }
-      
+
       // We close all the exception in an attempt to let any pending IO to finish
       // to avoid scenarios where the send or ACK got to disk but the response didn't get to the client
       // It may still be possible to have this scenario on a real failure (without the use of XA)
@@ -729,9 +769,9 @@
          {
             memoryManager.stop();
          }
-         
+
          threadPool.shutdown();
-         
+
          scheduledPool.shutdown();
 
          try
@@ -747,7 +787,7 @@
          }
          threadPool = null;
 
-         
+
          try
          {
             if (!scheduledPool.awaitTermination(10, TimeUnit.SECONDS))
@@ -761,7 +801,7 @@
          }
 
          threadPool = null;
-         
+
          scheduledPool = null;
 
          pagingManager = null;
@@ -805,22 +845,22 @@
    // HornetQServer implementation
    // -----------------------------------------------------------
 
-   
+
    public void setIdentity(String identity)
    {
       this.identity = identity;
    }
-   
+
    public String getIdentity()
    {
       return identity;
    }
-   
+
    public ScheduledExecutorService getScheduledPool()
    {
       return scheduledPool;
    }
-   
+
    public Configuration getConfiguration()
    {
       return configuration;
@@ -830,7 +870,7 @@
    {
       return mbeanServer;
    }
-   
+
    public PagingManager getPagingManager()
    {
       return pagingManager;
@@ -860,7 +900,7 @@
    {
       return securityRepository;
    }
-   
+
    public NodeManager getNodeManager()
    {
       return nodeManager;
@@ -1025,18 +1065,18 @@
    {
       return createQueue(address, queueName, filterString, durable, temporary, false);
    }
-   
+
    public Queue locateQueue(SimpleString queueName) throws Exception
    {
       Binding binding = postOffice.getBinding(queueName);
-      
+
       Bindable queue = binding.getBindable();
-      
+
       if (!(queue instanceof Queue))
       {
          throw new IllegalStateException("locateQueue should only be used to locate queues");
       }
-      
+
       return (Queue) binding.getBindable();
    }
 
@@ -1063,7 +1103,7 @@
       }
 
       Queue queue = (Queue)binding.getBindable();
-      
+
       if (queue.getPageSubscription() != null)
       {
          queue.getPageSubscription().close();
@@ -1177,7 +1217,7 @@
 
    protected PagingManager createPagingManager()
    {
-      
+
       return new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
                                                              configuration.getJournalBufferSize_NIO(),
                                                              scheduledPool,
@@ -1187,8 +1227,8 @@
                                    addressSettingsRepository);
    }
 
-   /** 
-    * This method is protected as it may be used as a hook for creating a custom storage manager (on tests for instance) 
+   /**
+    * This method is protected as it may be used as a hook for creating a custom storage manager (on tests for instance)
     */
    protected StorageManager createStorageManager()
    {
@@ -1392,7 +1432,7 @@
 
          addressSettingsDeployer.start();
       }
-      
+
       deployAddressSettingsFromConfiguration();
 
       storageManager.start();
@@ -1450,7 +1490,7 @@
       // Load the journal and populate queues, transactions and caches in memory
 
       pagingManager.reloadStores();
-      
+
       JournalLoadInformation[] journalInfo = loadJournals();
 
       compareJournals(journalInfo);
@@ -1488,7 +1528,7 @@
       // this needs to be done before clustering is fully activated
       callActivateCallbacks();
 
-      // Deply any pre-defined diverts
+      // Deploy any pre-defined diverts
       deployDiverts();
 
       if (deploymentManager != null)
@@ -1564,11 +1604,11 @@
       for (QueueBindingInfo queueBindingInfo : queueBindingInfos)
       {
          queueBindingInfosMap.put(queueBindingInfo.getId(), queueBindingInfo);
-         
+
          Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
 
          PageSubscription subscription = pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createSubscription(queueBindingInfo.getId(), filter, true);
-         
+
          Queue queue = queueFactory.createQueue(queueBindingInfo.getId(),
                                                 queueBindingInfo.getAddress(),
                                                 queueBindingInfo.getQueueName(),
@@ -1585,8 +1625,8 @@
 
          managementService.registerAddress(queueBindingInfo.getAddress());
          managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager);
-         
-         
+
+
       }
 
       for (GroupingInfo groupingInfo : groupingInfos)
@@ -1672,12 +1712,12 @@
       }
 
       Filter filter = FilterImpl.createFilter(filterString);
-      
+
       long queueID = storageManager.generateUniqueID();
 
       PageSubscription pageSubscription;
-      
-      
+
+
       if (filterString != null && filterString.toString().equals(GENERIC_IGNORED_FILTER))
       {
          pageSubscription = null;
@@ -1771,7 +1811,7 @@
 
       managementService.registerDivert(divert, config);
    }
-   
+
    public void destroyDivert(SimpleString name) throws Exception
    {
       Binding binding = postOffice.getBinding(name);
@@ -1814,7 +1854,7 @@
          managementService.addNotificationListener(groupingHandler);
       }
    }
-   
+
    public void deployBridge(BridgeConfiguration config) throws Exception
    {
       if (clusterManager != null)
@@ -1822,7 +1862,7 @@
          clusterManager.deployBridge(config);
       }
    }
-   
+
    public void destroyBridge(String name) throws Exception
    {
       if (clusterManager != null)
@@ -1875,7 +1915,7 @@
    {
       return sessions.get(sessionName);
    }
-   
+
    /**
     * Check if journal directory exists or create it (if configured to do so)
     */
@@ -1896,7 +1936,7 @@
          }
       }
    }
-   
+
    @Override
    public String toString()
    {

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2011-06-27 14:14:47 UTC (rev 10885)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2011-06-27 14:18:52 UTC (rev 10886)
@@ -56,6 +56,7 @@
    // Constants -----------------------------------------------------
 
    protected static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+   protected static final String LIVE_NODE_NAME = "hqLIVE";
 
    // Attributes ----------------------------------------------------
 
@@ -167,13 +168,15 @@
       config1.setSecurityEnabled(false);
       config1.setSharedStore(false);
       config1.setBackup(true);
+      config1.setLiveConnectorName(LIVE_NODE_NAME);
       backupConfig = config1;
+
       backupServer = createBackupServer();
 
       Configuration config0 = super.createDefaultConfig();
       config0.getAcceptorConfigurations().clear();
       config0.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
-
+      config0.setName(LIVE_NODE_NAME);
       config0.getConnectorConfigurations().put("toBackup", getConnectorTransportConfiguration(false));
       //liveConfig.setBackupConnectorName("toBackup");
       config0.setSecurityEnabled(false);
@@ -181,8 +184,8 @@
       liveConfig = config0;
       liveServer = createLiveServer();
 
+      liveServer.start();
       backupServer.start();
-      liveServer.start();
    }
 
    @Override



More information about the hornetq-commits mailing list