[hornetq-commits] JBoss hornetq SVN: r11637 - in trunk: hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal and 8 other directories.

do-not-reply at jboss.org
Wed Nov 2 12:18:23 EDT 2011


Author: borges
Date: 2011-11-02 12:18:22 -0400 (Wed, 02 Nov 2011)
New Revision: 11637

Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/NodeManager.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
Log:
HORNETQ-720 Send live's nodeID to the backup at the end of synchronization.

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java	2011-11-02 16:17:48 UTC (rev 11636)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java	2011-11-02 16:18:22 UTC (rev 11637)
@@ -22,6 +22,7 @@
 
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.Journal;
 import org.hornetq.core.journal.JournalLoadInformation;
@@ -42,6 +43,7 @@
 import org.hornetq.core.server.RouteContextList;
 import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.core.server.group.impl.GroupBinding;
 import org.hornetq.core.transaction.ResourceManager;
 import org.hornetq.core.transaction.Transaction;
@@ -70,9 +72,9 @@
 
    /** Set the context back to the thread */
    void setContext(OperationContext context);
-   
+
    /**
-    * 
+    *
     * @param ioCriticalError is the server being stopped due to an IO critical error
     */
    void stop(boolean ioCriticalError) throws Exception;
@@ -109,7 +111,7 @@
 
    /** Confirms that a large message was finished */
    void confirmPendingLargeMessageTX(Transaction transaction, long messageID, long recordID) throws Exception;
-   
+
    /** Confirms that a large message was finished */
    void confirmPendingLargeMessage(long recordID) throws Exception;
 
@@ -157,7 +159,7 @@
     * @param message This is a temporary message that holds the parsed properties.
     *        The remoting layer can't create a ServerMessage directly, then this will be replaced.
     * @return
-    * @throws Exception 
+    * @throws Exception
     */
    LargeServerMessage createLargeMessage(long id, MessageInternal message) throws Exception;
 
@@ -249,9 +251,14 @@
    /**
     * @param replicationManager
     * @param pagingManager
+    * @param nodeID
+    * @param clusterConnection
+    * @param pair
     * @throws Exception
     */
-   void startReplication(ReplicationManager replicationManager, PagingManager pagingManager) throws Exception;
+   void startReplication(ReplicationManager replicationManager, PagingManager pagingManager, String nodeID,
+      ClusterConnection clusterConnection, Pair<TransportConfiguration, TransportConfiguration> pair)
+      throws Exception;
 
    /**
     * Adds message to page if we are paging.

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-11-02 16:17:48 UTC (rev 11636)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-11-02 16:18:22 UTC (rev 11637)
@@ -42,6 +42,7 @@
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.filter.Filter;
@@ -90,6 +91,7 @@
 import org.hornetq.core.server.RouteContextList;
 import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.core.server.group.impl.GroupBinding;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.core.transaction.ResourceManager;
@@ -219,9 +221,11 @@
     private boolean journalLoaded = false;
 
     // Persisted core configuration
-    private final Map<SimpleString, PersistedRoles> mapPersistedRoles = new ConcurrentHashMap<SimpleString, PersistedRoles>();
+   private final Map<SimpleString, PersistedRoles> mapPersistedRoles =
+            new ConcurrentHashMap<SimpleString, PersistedRoles>();
 
-    private final Map<SimpleString, PersistedAddressSetting> mapPersistedAddressSettings = new ConcurrentHashMap<SimpleString, PersistedAddressSetting>();
+   private final Map<SimpleString, PersistedAddressSetting> mapPersistedAddressSettings =
+            new ConcurrentHashMap<SimpleString, PersistedAddressSetting>();
 
    public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory,
                                 final IOCriticalErrorListener criticalErrorListener)
@@ -231,24 +235,24 @@
 
    public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory,
                                 final ReplicationManager replicator, final IOCriticalErrorListener criticalErrorListener)
-    {
-        this.executorFactory = executorFactory;
+   {
+      this.executorFactory = executorFactory;
 
-        executor = executorFactory.getExecutor();
+      executor = executorFactory.getExecutor();
 
-        this.replicator = replicator;
+      this.replicator = replicator;
 
-        if (config.getJournalType() != JournalType.NIO && config.getJournalType() != JournalType.ASYNCIO)
-            {
-                throw new IllegalArgumentException("Only NIO and AsyncIO are supported journals");
-            }
+      if (config.getJournalType() != JournalType.NIO && config.getJournalType() != JournalType.ASYNCIO)
+      {
+         throw new IllegalArgumentException("Only NIO and AsyncIO are supported journals");
+      }
 
-        bindingsDir = config.getBindingsDirectory();
+      bindingsDir = config.getBindingsDirectory();
 
-        if (bindingsDir == null)
-            {
-                throw new NullPointerException("bindings-dir is null");
-            }
+      if (bindingsDir == null)
+      {
+         throw new NullPointerException("bindings-dir is null");
+      }
 
         createBindingsDir = config.isCreateBindingsDir();
 
@@ -306,11 +310,11 @@
                                                          config.getJournalBufferTimeout_NIO(),
                                                          config.isLogJournalWriteRate(),
                                                          criticalErrorListener);
-            }
-        else
-            {
-                throw new IllegalArgumentException("Unsupported journal type " + config.getJournalType());
-            }
+      }
+      else
+      {
+         throw new IllegalArgumentException("Unsupported journal type " + config.getJournalType());
+      }
 
       idGenerator = new BatchingIDGenerator(0, JournalStorageManager.CHECKPOINT_BATCH_SIZE, bindingsJournal);
         Journal localMessage = new JournalImpl(config.getJournalFileSize(),
@@ -323,22 +327,20 @@
                                                config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO()
                                                : config.getJournalMaxIO_NIO());
 
-        if (replicator != null)
-            {
-                messageJournal = new ReplicatedJournal((byte)1, localMessage, replicator);
-            }
-        else
-            {
-                messageJournal = localMessage;
-            }
+      if (replicator != null)
+      {
+         messageJournal = new ReplicatedJournal((byte)1, localMessage, replicator);
+      }
+      else
+      {
+         messageJournal = localMessage;
+      }
 
-        largeMessagesDirectory = config.getLargeMessagesDirectory();
+      largeMessagesDirectory = config.getLargeMessagesDirectory();
+      largeMessagesFactory = new NIOSequentialFileFactory(largeMessagesDirectory, false, criticalErrorListener);
+      perfBlastPages = config.getJournalPerfBlastPages();
+   }
 
-        largeMessagesFactory = new NIOSequentialFileFactory(largeMessagesDirectory, false, criticalErrorListener);
-
-        perfBlastPages = config.getJournalPerfBlastPages();
-    }
-
     public void clearContext()
     {
         OperationContextImpl.clearContext();
@@ -355,7 +357,9 @@
     * @throws HornetQException
     */
    @Override
-   public void startReplication(ReplicationManager replicationManager, PagingManager pagingManager) throws Exception
+   public void startReplication(ReplicationManager replicationManager, PagingManager pagingManager, String nodeID,
+      ClusterConnection clusterConnection, Pair<TransportConfiguration, TransportConfiguration> pair)
+      throws Exception
    {
       if (!started)
       {
@@ -418,7 +422,8 @@
       storageManagerLock.writeLock().lock();
       try
       {
-         replicator.sendSynchronizationDone();
+         replicator.sendSynchronizationDone(nodeID);
+         clusterConnection.nodeAnnounced(System.currentTimeMillis(), nodeID, pair, true);
          // XXX HORNETQ-720 SEND a compare journals message?
       }
       finally

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2011-11-02 16:17:48 UTC (rev 11636)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2011-11-02 16:18:22 UTC (rev 11637)
@@ -24,6 +24,7 @@
 
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.Journal;
 import org.hornetq.core.journal.JournalLoadInformation;
@@ -47,6 +48,7 @@
 import org.hornetq.core.server.RouteContextList;
 import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.core.server.group.impl.GroupBinding;
 import org.hornetq.core.transaction.ResourceManager;
 import org.hornetq.core.transaction.Transaction;
@@ -580,16 +582,10 @@
    {
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.persistence.StorageManager#confirmPendingLargeMessage(long)
-    */
    public void confirmPendingLargeMessage(long recordID) throws Exception
    {
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.persistence.StorageManager#stop(boolean)
-    */
    public void stop(boolean ioCriticalError) throws Exception
    {
    }
@@ -607,7 +603,8 @@
    }
 
    @Override
-   public void startReplication(ReplicationManager replicationManager, PagingManager pagingManager) throws Exception
+   public void startReplication(ReplicationManager replicationManager, PagingManager pagingManager, String nodeID,
+      ClusterConnection clusterConnection, Pair<TransportConfiguration, TransportConfiguration> pair) throws Exception
    {
       // no-op
    }
@@ -621,5 +618,4 @@
    {
       return false;
    }
-
  }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-11-02 16:17:48 UTC (rev 11636)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-11-02 16:18:22 UTC (rev 11637)
@@ -208,14 +208,9 @@
             } else if (packet.getType() == PacketImpl.BACKUP_REGISTRATION)
             {
                BackupRegistrationMessage msg = (BackupRegistrationMessage)packet;
-               if (server.startReplication(rc))
+               if (server.startReplication(rc, acceptorUsed.getClusterConnection(), getPair(msg.getConnector(), true)))
                {
-                  /*
-                   * HORNETQ-720 Instantiate a new server locator to call notifyNodeUp(...)? Or send
-                   * a CLUSTER_TOPOLOGY(_2?) message?
-                   */
-                  acceptorUsed.getClusterConnection().nodeAnnounced(System.currentTimeMillis(), msg.getNodeID(),
-                                                                    getPair(msg.getConnector(), true), true);
+                  // XXX if it fails, the backup should get to know it
                }
             }
          }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2011-11-02 16:17:48 UTC (rev 11636)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2011-11-02 16:18:22 UTC (rev 11637)
@@ -547,7 +547,7 @@
             packet = new BackupRegistrationMessage();
             break;
          }
-         case PacketImpl.REPLICATION_START_STOP_SYNC:
+         case PacketImpl.REPLICATION_START_FINISH_SYNC:
          {
             packet = new ReplicationStartSyncMessage();
             break;

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java	2011-11-02 16:17:48 UTC (rev 11636)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java	2011-11-02 16:18:22 UTC (rev 11637)
@@ -204,7 +204,7 @@
 
    public static final byte BACKUP_REGISTRATION = 115;
 
-   public static final byte REPLICATION_START_STOP_SYNC = 120;
+   public static final byte REPLICATION_START_FINISH_SYNC = 120;
  
    // Static --------------------------------------------------------
 

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java	2011-11-02 16:17:48 UTC (rev 11636)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java	2011-11-02 16:18:22 UTC (rev 11637)
@@ -16,20 +16,24 @@
    private long[] ids;
    private JournalContent journalType;
    private boolean synchronizationIsFinished;
+   private String nodeID;
 
    public ReplicationStartSyncMessage()
    {
-      super(REPLICATION_START_STOP_SYNC);
+      super(REPLICATION_START_FINISH_SYNC);
    }
 
+   public ReplicationStartSyncMessage(String nodeID)
+   {
+      this();
+      synchronizationIsFinished = true;
+      this.nodeID = nodeID;
+   }
+
    public ReplicationStartSyncMessage(JournalFile[] datafiles, JournalContent contentType)
    {
       this();
-      if (datafiles == null && contentType == null)
-      {
-         synchronizationIsFinished = true;
-         return;
-      }
+      synchronizationIsFinished = false;
       ids = new long[datafiles.length];
       for (int i = 0; i < datafiles.length; i++)
       {
@@ -43,7 +47,10 @@
    {
       buffer.writeBoolean(synchronizationIsFinished);
       if (synchronizationIsFinished)
+      {
+         buffer.writeString(nodeID);
          return;
+      }
       buffer.writeByte(journalType.typeByte);
       buffer.writeInt(ids.length);
       for (long id : ids)
@@ -57,7 +64,10 @@
    {
       synchronizationIsFinished = buffer.readBoolean();
       if (synchronizationIsFinished)
+      {
+         nodeID = buffer.readString();
          return;
+      }
       journalType = JournalContent.getType(buffer.readByte());
       int length = buffer.readInt();
       ids = new long[length];
@@ -85,4 +95,9 @@
    {
       return ids;
    }
+
+   public String getNodeID()
+   {
+      return nodeID;
+   }
 }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-11-02 16:17:48 UTC (rev 11636)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-11-02 16:18:22 UTC (rev 11637)
@@ -98,9 +98,11 @@
    /**
     * Informs backup that data synchronization is done.
     * <p>
-    * So if 'live' fails, the (up-to-date) backup now may take over its duties.
+    * So if 'live' fails, the (up-to-date) backup now may take over its duties. To do so, it must
+    * know which is the live's {@code nodeID}.
+    * @param nodeID
     */
-   void sendSynchronizationDone();
+   void sendSynchronizationDone(String nodeID);
 
    /**
     * Sends the whole content of the file to be duplicated.

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-11-02 16:17:48 UTC (rev 11636)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-11-02 16:18:22 UTC (rev 11637)
@@ -88,11 +88,6 @@
 
    private final IOCriticalErrorListener criticalErrorListener;
 
-   private static void trace(final String msg)
-   {
-      ReplicationEndpointImpl.log.trace(msg);
-   }
-
    private final HornetQServerImpl server;
 
    private Channel channel;
@@ -152,10 +147,6 @@
       journals[id] = journal;
    }
 
-   /*
-    * (non-Javadoc)
-    * @see org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
-    */
    public void handlePacket(final Packet packet)
    {
       PacketImpl response = new ReplicationResponseMessage();
@@ -212,7 +203,7 @@
             handleCompareDataMessage((ReplicationCompareDataMessage)packet);
             response = new NullResponseMessage();
          }
-         else if (type == PacketImpl.REPLICATION_START_STOP_SYNC)
+         else if (type == PacketImpl.REPLICATION_START_FINISH_SYNC)
          {
             handleStartReplicationSynchronization((ReplicationStartSyncMessage)packet);
          }
@@ -425,7 +416,7 @@
 
    // Private -------------------------------------------------------
 
-   private void finishSynchronization() throws Exception
+   private void finishSynchronization(String nodeID) throws Exception
    {
       for (JournalContent jc : EnumSet.allOf(JournalContent.class))
       {
@@ -486,7 +477,7 @@
          }
       }
       journalsHolder = null;
-      server.setRemoteBackupUpToDate();
+      server.setRemoteBackupUpToDate(nodeID);
       log.info("Backup server " + server + " is synchronized with live-server.");
       return;
    }
@@ -565,7 +556,7 @@
 
       if (packet.isSynchronizationFinished())
       {
-         finishSynchronization();
+         finishSynchronization(packet.getNodeID());
          return;
       }
 

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-11-02 16:17:48 UTC (rev 11636)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-11-02 16:18:22 UTC (rev 11637)
@@ -480,7 +480,6 @@
 
    private static class NullEncoding implements EncodingSupport
    {
-
       static NullEncoding instance = new NullEncoding();
 
       public void decode(final HornetQBuffer buffer)
@@ -495,7 +494,6 @@
       {
          return 0;
       }
-
    }
 
    @Override
@@ -528,8 +526,8 @@
    @Override
    public void syncPages(SequentialFile file, long id, SimpleString queueName) throws Exception
    {
-   if (enabled)
-      sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
+      if (enabled)
+         sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
    }
 
    /**
@@ -541,10 +539,7 @@
     * @param maxBytesToSend maximum number of bytes to read and send from the file
     * @throws Exception
     */
-   private void sendLargeFile(JournalContent content,
-      SimpleString pageStore,
-      final long id,
-      SequentialFile file,
+   private void sendLargeFile(JournalContent content, SimpleString pageStore, final long id, SequentialFile file,
       long maxBytesToSend) throws Exception
    {
       if (!enabled)
@@ -564,23 +559,23 @@
             int toSend = bytesRead;
             if (bytesRead > 0)
             {
-            if (bytesRead >= maxBytesToSend)
-            {
-               toSend = (int)maxBytesToSend;
-               maxBytesToSend = 0;
+               if (bytesRead >= maxBytesToSend)
+               {
+                  toSend = (int)maxBytesToSend;
+                  maxBytesToSend = 0;
+               }
+               else
+               {
+                  maxBytesToSend = maxBytesToSend - bytesRead;
+               }
+               buffer.limit(toSend);
             }
-            else
-            {
-               maxBytesToSend = maxBytesToSend - bytesRead;
-            }
-            buffer.limit(toSend);
-         }
-         buffer.rewind();
+            buffer.rewind();
 
-         // sending -1 or 0 bytes will close the file at the backup
-         sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, bytesRead, buffer));
-         if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
-            break;
+            // sending -1 or 0 bytes will close the file at the backup
+            sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, bytesRead, buffer));
+            if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
+               break;
          }
       }
       finally
@@ -597,9 +592,9 @@
    }
 
    @Override
-   public void sendSynchronizationDone()
+   public void sendSynchronizationDone(String nodeID)
    {
       if (enabled)
-         sendReplicatePacket(new ReplicationStartSyncMessage(null, null));
+         sendReplicatePacket(new ReplicationStartSyncMessage(nodeID));
    }
 }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java	2011-11-02 16:17:48 UTC (rev 11636)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java	2011-11-02 16:18:22 UTC (rev 11637)
@@ -20,7 +20,9 @@
 
 import javax.management.MBeanServer;
 
+import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.core.config.BridgeConfiguration;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.DivertConfiguration;
@@ -33,6 +35,7 @@
 import org.hornetq.core.replication.ReplicationEndpoint;
 import org.hornetq.core.replication.ReplicationManager;
 import org.hornetq.core.security.Role;
+import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.core.server.cluster.ClusterManager;
 import org.hornetq.core.server.group.GroupingHandler;
 import org.hornetq.core.server.impl.ConnectorsService;
@@ -57,7 +60,7 @@
 public interface HornetQServer extends HornetQComponent
 {
 
-   /** This method was created mainly for testing but it may be used in scenarios where 
+   /** This method was created mainly for testing but it may be used in scenarios where
     *  you need to have more than one Server inside the same VM.
     *  This identity will be exposed on logs what may help you to debug issues on the log traces and debugs.*/
    void setIdentity(String identity);
@@ -65,7 +68,7 @@
    String getIdentity();
 
    String describe();
-   
+
    Configuration getConfiguration();
 
    RemotingService getRemotingService();
@@ -113,7 +116,7 @@
    Set<ServerSession> getSessions();
 
    boolean isStarted();
-   
+
    boolean isStopped();
 
    HierarchicalRepository<Set<Role>> getSecurityRepository();
@@ -182,14 +185,17 @@
    void destroyBridge(String name) throws Exception;
 
    ServerSession getSessionByID(String sessionID);
-   
+
    void threadDump(String reason);
 
    void stop(boolean failoverOnServerShutdown) throws Exception;
 
    /**
     * @param rc
+    * @param pair
+    * @param clusterConnection
     * @return {@code true} if replication started successfully, {@code false} otherwise
     */
-   boolean startReplication(CoreRemotingConnection rc);
+   boolean startReplication(CoreRemotingConnection rc, ClusterConnection clusterConnection,
+      Pair<TransportConfiguration, TransportConfiguration> pair);
 }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/NodeManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/NodeManager.java	2011-11-02 16:17:48 UTC (rev 11636)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/NodeManager.java	2011-11-02 16:18:22 UTC (rev 11637)
@@ -69,6 +69,11 @@
       return uuid;
    }
 
+   public void setNodeID(String nodeID)
+   {
+      this.nodeID = new SimpleString(nodeID);
+   }
+
    public abstract boolean isAwaitingFailback() throws Exception;
 
    public abstract boolean isBackupLive() throws Exception;

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-11-02 16:17:48 UTC (rev 11636)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-11-02 16:18:22 UTC (rev 11637)
@@ -109,6 +109,7 @@
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.QueueFactory;
 import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.core.server.cluster.ClusterManager;
 import org.hornetq.core.server.cluster.Transformer;
 import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
@@ -473,7 +474,7 @@
       stop(failoverOnServerShutdown, false);
    }
 
-   protected void stop(boolean failoverOnServerShutdown, boolean criticalIOError) throws Exception
+   private void stop(boolean failoverOnServerShutdown, boolean criticalIOError) throws Exception
    {
       synchronized (this)
       {
@@ -584,7 +585,7 @@
 
          for (Runnable task : tasks)
          {
-            HornetQServerImpl.log.debug(this + "::Waiting for " + task);
+               HornetQServerImpl.log.info(this + "::Waiting for " + task);
          }
 
          if (memoryManager != null)
@@ -592,9 +593,9 @@
             memoryManager.stop();
          }
 
-         threadPool.shutdown();
+            threadPool.shutdown();
 
-         scheduledPool.shutdown();
+            scheduledPool.shutdown();
 
          try
          {
@@ -1222,7 +1223,7 @@
    // null);
    // }
 
-   protected PagingManager createPagingManager()
+   private PagingManager createPagingManager()
    {
 
       return new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
@@ -1238,7 +1239,7 @@
    /**
     * This method is protected as it may be used as a hook for creating a custom storage manager (on tests for instance)
     */
-   protected StorageManager createStorageManager()
+   private StorageManager createStorageManager()
    {
       if (configuration.isPersistenceEnabled())
       {
@@ -2086,7 +2087,7 @@
 
             final TransportConfiguration config = configuration.getConnectorConfigurations().get(liveConnectorName);
             serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(config);
-            final QuorumManager quorumManager = new QuorumManager(serverLocator, nodeManager.getNodeId().toString());
+            final QuorumManager quorumManager = new QuorumManager(serverLocator);
 
             serverLocator.setReconnectAttempts(-1);
 
@@ -2127,10 +2128,11 @@
             while (true)
             {
                nodeManager.awaitLiveNode();
-               if (!started || quorumManager.isNodeDown())
-               {
-                  break;
-               }
+               break;
+//               if (!started || quorumManager.isNodeDown())
+//               {
+//                  break;
+//               }
             }
 
             serverLocator.close();
@@ -2278,13 +2280,20 @@
 
 
    @Override
-   public boolean startReplication(CoreRemotingConnection rc)
+   public boolean startReplication(CoreRemotingConnection rc, ClusterConnection clusterConnection,
+      Pair<TransportConfiguration, TransportConfiguration> pair)
    {
+      if (replicationManager != null)
+      {
+         return false;
+      }
+
       replicationManager = new ReplicationManagerImpl(rc, executorFactory);
       try
       {
          replicationManager.start();
-         storageManager.startReplication(replicationManager, pagingManager);
+         storageManager.startReplication(replicationManager, pagingManager, getNodeID().toString(), clusterConnection,
+                                         pair);
          return true;
       }
       catch (Exception e)
@@ -2313,8 +2322,9 @@
       return backupUpToDate;
    }
 
-   public void setRemoteBackupUpToDate()
+   public void setRemoteBackupUpToDate(String nodeID)
    {
+      nodeManager.setNodeID(nodeID);
       backupUpToDate = true;
    }
 }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java	2011-11-02 16:17:48 UTC (rev 11636)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java	2011-11-02 16:18:22 UTC (rev 11637)
@@ -32,16 +32,14 @@
 
    // volatile boolean started;
    private final ServerLocator locator;
-   private final String targetServerName;
+   private final String targetServerName = "";
    private final Map<String, Pair<TransportConfiguration, TransportConfiguration>> nodes =
             new ConcurrentHashMap<String, Pair<TransportConfiguration, TransportConfiguration>>();
    private static final long DISCOVERY_TIMEOUT = 3;
 
-   public QuorumManager(ServerLocator serverLocator, String nodeID)
+   public QuorumManager(ServerLocator serverLocator)
    {
       this.locator = serverLocator;
-      this.targetServerName = nodeID;
-
       locator.addClusterTopologyListener(this);
    }
 

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java	2011-11-02 16:17:48 UTC (rev 11636)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java	2011-11-02 16:18:22 UTC (rev 11637)
@@ -140,7 +140,7 @@
             deliver();
          }
 
-         if (packet.getType() == PacketImpl.REPLICATION_START_STOP_SYNC && mustHold)
+         if (packet.getType() == PacketImpl.REPLICATION_START_FINISH_SYNC && mustHold)
          {
             ReplicationStartSyncMessage syncMsg = (ReplicationStartSyncMessage)packet;
             if (syncMsg.isSynchronizationFinished() && !deliver)



More information about the hornetq-commits mailing list