[hornetq-commits] JBoss hornetq SVN: r10893 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core: protocol/core/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Jun 28 11:21:14 EDT 2011


Author: borges
Date: 2011-06-28 11:21:14 -0400 (Tue, 28 Jun 2011)
New Revision: 10893

Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 Trigger replication from 'live'
(which fails as current code requires it to be started from the backup).

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-06-28 15:19:24 UTC (rev 10892)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-06-28 15:21:14 UTC (rev 10893)
@@ -18,8 +18,19 @@
 import java.net.InetAddress;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 
 import org.hornetq.api.core.DiscoveryGroupConfiguration;
 import org.hornetq.api.core.HornetQException;
@@ -57,13 +68,13 @@
 
    private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
 
-   private Set<ClientSessionFactory> factories = new HashSet<ClientSessionFactory>();
+   private final Set<ClientSessionFactory> factories = new HashSet<ClientSessionFactory>();
 
    private TransportConfiguration[] initialConnectors;
 
-   private DiscoveryGroupConfiguration discoveryGroupConfiguration;
+   private final DiscoveryGroupConfiguration discoveryGroupConfiguration;
 
-   private StaticConnector staticConnector = new StaticConnector();
+   private final StaticConnector staticConnector = new StaticConnector();
 
    private Topology topology = new Topology();
 
@@ -158,14 +169,14 @@
    private boolean backup;
 
    private final Exception e = new Exception();
-   
+
    // To be called when there are ServerLocator being finalized.
    // To be used on test assertions
    public static Runnable finalizeCallback = null;
-   
+
    public static synchronized void clearThreadPools()
    {
-      
+
       if (globalThreadPool != null)
       {
          globalThreadPool.shutdown();
@@ -184,7 +195,7 @@
             globalThreadPool = null;
          }
       }
-      
+
       if (globalScheduledThreadPool != null)
       {
          globalScheduledThreadPool.shutdown();
@@ -1318,8 +1329,9 @@
       }
    }
 
-   class StaticConnector implements Serializable
+   final class StaticConnector implements Serializable
    {
+      private static final long serialVersionUID = 959566606042156036L;
       private List<Connector> connectors;
 
       public ClientSessionFactory connect() throws HornetQException
@@ -1344,14 +1356,14 @@
 
          try
          {
-            
+
             List<Future<ClientSessionFactory>> futuresList = new ArrayList<Future<ClientSessionFactory>>();
-            
+
             for (Connector conn : connectors)
             {
                futuresList.add(threadPool.submit(conn));
             }
-            
+
             for (int i = 0, futuresSize = futuresList.size(); i < futuresSize; i++)
             {
                Future<ClientSessionFactory> future = futuresList.get(i);
@@ -1359,7 +1371,9 @@
                {
                   csf = future.get();
                   if (csf != null)
+                  {
                      break;
+                  }
                }
                catch (Exception e)
                {
@@ -1415,6 +1429,7 @@
          }
       }
 
+      @Override
       public void finalize() throws Throwable
       {
          if (!closed && finalizeCheck)
@@ -1423,7 +1438,7 @@
                      System.identityHashCode(this));
 
             log.warn("The ServerLocator you didn't close was created here:", e);
-            
+
             if (ServerLocatorImpl.finalizeCallback != null)
             {
                ServerLocatorImpl.finalizeCallback.run();
@@ -1437,7 +1452,7 @@
 
       class Connector implements Callable<ClientSessionFactory>
       {
-         private TransportConfiguration initialConnector;
+         private final TransportConfiguration initialConnector;
 
          private volatile ClientSessionFactoryInternal factory;
 

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-06-28 15:19:24 UTC (rev 10892)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-06-28 15:21:14 UTC (rev 10893)
@@ -54,6 +54,9 @@
 
    private final List<Interceptor> interceptors;
 
+   private final Map<String, ServerSessionPacketHandler> sessionHandlers =
+            new ConcurrentHashMap<String, ServerSessionPacketHandler>();
+
    public CoreProtocolManager(final HornetQServer server, final List<Interceptor> interceptors)
    {
       this.server = server;
@@ -153,8 +156,18 @@
             else if (packet.getType() == PacketImpl.HA_BACKUP_REGISTRATION)
             {
                HaBackupRegistrationMessage msg = (HaBackupRegistrationMessage)packet;
-               System.out.println("HA_BACKUP_REGISTRATION: " + msg);
+               System.out.println("HA_BACKUP_REGISTRATION: " + msg + " connector=" + msg.getConnector());
                System.out.println("HA_BR: " + server.getIdentity() + ", toString=" + server);
+               try
+               {
+                  server.addHaBackup(msg.getConnector());
+               }
+               catch (Exception e)
+               {
+                  // XXX This is not what we want
+                  e.printStackTrace();
+                  throw new RuntimeException(e);
+               }
             }
          }
       });
@@ -164,8 +177,6 @@
       return entry;
    }
 
-   private final Map<String, ServerSessionPacketHandler> sessionHandlers = new ConcurrentHashMap<String, ServerSessionPacketHandler>();
-
    public ServerSessionPacketHandler getSessionHandler(final String sessionName)
    {
       return sessionHandlers.get(sessionName);

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java	2011-06-28 15:19:24 UTC (rev 10892)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java	2011-06-28 15:21:14 UTC (rev 10893)
@@ -20,6 +20,7 @@
 import javax.management.MBeanServer;
 
 import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.core.config.BridgeConfiguration;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.DivertConfiguration;
@@ -49,23 +50,23 @@
  * This interface defines the internal interface of the HornetQ Server exposed to other components of the server. The
  * external management interface of the HornetQ Server is defined by the HornetQServerManagement interface This
  * interface is never exposed outside the HornetQ server, e.g. by JMX or other means
- * 
+ *
  * @author <a href="tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>
  */
 public interface HornetQServer extends HornetQComponent
 {
-   
+
    void setIdentity(String identity);
-   
+
    String getIdentity();
-   
+
    Configuration getConfiguration();
 
    RemotingService getRemotingService();
 
    StorageManager getStorageManager();
-   
+
    PagingManager getPagingManager();
 
    ManagementService getManagementService();
@@ -75,12 +76,12 @@
    MBeanServer getMBeanServer();
 
    Version getVersion();
-   
+
    NodeManager getNodeManager();
 
    /**
     * Returns the resource to manage this HornetQ server.
-    * 
+    *
     * Using this control will throw IllegalStateException if the
     * server is not properly started.
     */
@@ -90,7 +91,7 @@
 
    void unregisterActivateCallback(ActivateCallback callback);
 
-   /** The journal at the backup server has to be equivalent as the journal used on the live node. 
+   /** The journal at the backup server has to be equivalent as the journal used on the live node.
     *  Or else the backup node is out of sync. */
    ReplicationEndpoint connectToReplicationEndpoint(Channel channel) throws Exception;
 
@@ -143,31 +144,31 @@
                      SimpleString filterString,
                      boolean durable,
                      boolean temporary) throws Exception;
-   
+
    Queue locateQueue(SimpleString queueName) throws Exception;
 
    void destroyQueue(SimpleString queueName, ServerSession session) throws Exception;
 
    ScheduledExecutorService getScheduledPool();
-   
+
    ExecutorFactory getExecutorFactory();
 
    void setGroupingHandler(GroupingHandler groupingHandler);
 
    GroupingHandler getGroupingHandler();
-   
+
    ReplicationEndpoint getReplicationEndpoint();
-   
+
    ReplicationManager getReplicationManager();
 
-   boolean checkActivate() throws Exception;      
-   
+   boolean checkActivate() throws Exception;
+
    void deployDivert(DivertConfiguration config) throws Exception;
 
    void destroyDivert(SimpleString name) throws Exception;
 
    ConnectorsService getConnectorsService();
-   
+
    void deployBridge(BridgeConfiguration config) throws Exception;
 
    void destroyBridge(String name) throws Exception;
@@ -175,4 +176,10 @@
    ServerSession getSessionByID(String sessionID);
 
    void stop(boolean failoverOnServerShutdown) throws Exception;
+
+   /**
+    * @param connector
+    * @throws Exception
+    */
+   void addHaBackup(TransportConfiguration connector) throws Exception;
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-06-28 15:19:24 UTC (rev 10892)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-06-28 15:21:14 UTC (rev 10893)
@@ -41,8 +41,10 @@
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.config.BridgeConfiguration;
 import org.hornetq.core.config.Configuration;
@@ -85,6 +87,7 @@
 import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
 import org.hornetq.core.replication.ReplicationEndpoint;
 import org.hornetq.core.replication.ReplicationManager;
+import org.hornetq.core.replication.impl.ReplicationManagerImpl;
 import org.hornetq.core.security.CheckType;
 import org.hornetq.core.security.Role;
 import org.hornetq.core.security.SecurityStore;
@@ -119,6 +122,7 @@
 import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.spi.core.protocol.SessionCallback;
 import org.hornetq.spi.core.security.HornetQSecurityManager;
+import org.hornetq.utils.ConcurrentHashSet;
 import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.HornetQThreadFactory;
 import org.hornetq.utils.OrderedExecutorFactory;
@@ -209,6 +213,8 @@
 
    private final Map<String, ServerSession> sessions = new ConcurrentHashMap<String, ServerSession>();
 
+   private final Set<String> sharedNothingBackups = new ConcurrentHashSet<String>();
+
    private final Object initialiseLock = new Object();
 
    private boolean initialised;
@@ -576,6 +582,8 @@
 
    private Activation activation;
 
+   private ServerLocator serverLocator;
+
    public synchronized void start() throws Exception
    {
       initialiseLogging();
@@ -1247,31 +1255,23 @@
    // Private
    // --------------------------------------------------------------------------------------
 
-   // private boolean startReplication() throws Exception
-   // {
-   // // get list of backup names!
-   //
-   // if (!configuration.isSharedStore() && backupConnectorName != null)
-   // {
-   // TransportConfiguration backupConnector =
-   // configuration.getConnectorConfigurations().get(backupConnectorName);
-   //
-   // if (backupConnector == null)
-   // {
-   // HornetQServerImpl.log.warn("connector with name '" + backupConnectorName +
-   // "' is not defined in the configuration.");
-   // return false;
-   // }
-   // replicationFailoverManager = createBackupConnectionFailoverManager(backupConnector,
-   // threadPool, scheduledPool);
-   //
-   // replicationManager = new ReplicationManagerImpl(replicationFailoverManager, executorFactory);
-   // replicationManager.start();
-   // }
-   //
-   // return true;
-   // }
+   private boolean startReplication(TransportConfiguration connector) throws Exception
+   {
+      assert !configuration.isSharedStore();
+      if (configuration.isSharedStore() || connector == null)
+      {
+         return true;
+      }
 
+      serverLocator = HornetQClient.createServerLocatorWithHA(connector);
+      ClientSessionFactoryInternal replicationFailoverManager =
+               (ClientSessionFactoryInternal)serverLocator.createSessionFactory(connector);
+      replicationManager = new ReplicationManagerImpl(replicationFailoverManager, executorFactory);
+      replicationManager.start();
+
+      return true;
+   }
+
    private void callActivateCallbacks()
    {
       for (ActivateCallback callback : activateCallbacks)
@@ -1942,6 +1942,12 @@
       return "HornetQServerImpl::" + (identity == null ? "" : (identity + ", ")) + (nodeManager != null ? ("serverUUID=" + nodeManager.getUUID()) : "");
    }
 
-   // Inner classes
-   // --------------------------------------------------------------------------------
+   @Override
+   public void addHaBackup(TransportConfiguration connector) throws Exception
+   {
+      log.info(connector + " " + connector.getFactoryClassName() + " " + connector.getParams() + " " +
+               replicationManager);
+      startReplication(connector);
+      // throw new UnsupportedOperationException("unimplemented");
+   }
 }



More information about the hornetq-commits mailing list