[hornetq-commits] JBoss hornetq SVN: r10910 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Jul 1 09:31:53 EDT 2011


Author: borges
Date: 2011-07-01 09:31:52 -0400 (Fri, 01 Jul 2011)
New Revision: 10910

Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
HORNETQ-720 Still with non-functional replication

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-07-01 13:30:25 UTC (rev 10909)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-07-01 13:31:52 UTC (rev 10910)
@@ -53,7 +53,6 @@
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.TransactionFailureCallback;
 import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
-import org.hornetq.core.journal.impl.ExportJournal;
 import org.hornetq.core.journal.impl.JournalFile;
 import org.hornetq.core.journal.impl.JournalImpl;
 import org.hornetq.core.journal.impl.JournalReaderCallback;
@@ -98,13 +97,12 @@
 import org.hornetq.utils.DataConstants;
 import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.HornetQThreadFactory;
-import org.hornetq.utils.UUID;
 import org.hornetq.utils.XidCodecSupport;
 
 /**
- * 
+ *
  * A JournalStorageManager
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
@@ -155,15 +153,13 @@
 
    public static final byte PAGE_CURSOR_COUNTER_INC = 41;
 
-   private UUID persistentID;
-
    private final BatchingIDGenerator idGenerator;
 
-   private final ReplicationManager replicator;
+   private ReplicationManager replicator;
 
-   private final Journal messageJournal;
+   private Journal messageJournal;
 
-   private final Journal bindingsJournal;
+   private Journal bindingsJournal;
 
    private final SequentialFileFactory largeMessagesFactory;
 
@@ -330,6 +326,17 @@
       return replicator != null;
    }
 
+   public void setReplicator(ReplicationManager replicationManager)
+   {
+      assert replicationManager != null;
+      replicator = replicationManager;
+      Journal localMessageJournal = messageJournal;
+      Journal localBindingsJournal = bindingsJournal;
+      bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal, replicator);
+      messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
+      // XXX HORNETQ-720 obviously missing here is the synchronization step.
+   }
+
    public void waitOnOperations() throws Exception
    {
       if (!started)
@@ -340,9 +347,6 @@
       waitOnOperations(0);
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.persistence.StorageManager#blockOnReplication()
-    */
    public boolean waitOnOperations(final long timeout) throws Exception
    {
       if (!started)
@@ -756,11 +760,11 @@
          ref.setPersistedCount(ref.getDeliveryCount());
          DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(),
                                                                                   ref.getDeliveryCount());
-   
+
          messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
                                            JournalStorageManager.UPDATE_DELIVERY_COUNT,
                                            updateInfo,
-   
+
                                            syncNonTransactional,
                                            getContext(syncNonTransactional));
       }
@@ -1137,19 +1141,19 @@
 
             continue;
          }
-         
+
          // Redistribution could install a Redistributor while we are still loading records, what will be an issue with prepared ACKs
          // We make sure te Queue is paused before we reroute values.
          queue.pause();
 
          Collection<AddMessageRecord> valueRecords = queueRecords.values();
-         
+
          long currentTime = System.currentTimeMillis();
 
          for (AddMessageRecord record : valueRecords)
          {
             long scheduledDeliveryTime = record.scheduledDeliveryTime;
-            
+
             if (scheduledDeliveryTime != 0 && scheduledDeliveryTime <= currentTime)
             {
                scheduledDeliveryTime = 0;
@@ -1224,7 +1228,7 @@
       {
          messageJournal.perfBlast(perfBlastPages);
       }
-      
+
       for (Queue queue : queues.values())
       {
          queue.resume();
@@ -1442,8 +1446,8 @@
 
       return bindingsInfo;
    }
-   
 
+
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#lineUpContext()
     */
@@ -2990,6 +2994,7 @@
          this.refEncoding = refEncoding;
       }
 
+      @Override
       public String toString()
       {
          return "AddRef;" + refEncoding;
@@ -3006,6 +3011,7 @@
          this.refEncoding = refEncoding;
       }
 
+      @Override
       public String toString()
       {
          return "ACK;" + refEncoding;
@@ -3022,6 +3028,7 @@
 
       Message msg;
 
+      @Override
       public String toString()
       {
          StringBuffer buffer = new StringBuffer();

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-07-01 13:30:25 UTC (rev 10909)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-07-01 13:31:52 UTC (rev 10910)
@@ -29,6 +29,7 @@
 import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
+import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
 import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
@@ -75,7 +76,7 @@
                                                                                                              : null,
                                                                                                              server.getNodeID());
 
-      Channel channel1 = rc.getChannel(1, -1);
+      Channel channel1 = rc.getChannel(CHANNEL_ID.SESSION.id, -1);
 
       ChannelHandler handler = new HornetQPacketHandler(this, server, channel1, rc);
 
@@ -90,7 +91,7 @@
 
       final ConnectionEntry entry = new ConnectionEntry(rc, System.currentTimeMillis(), ttl);
 
-      final Channel channel0 = rc.getChannel(0, -1);
+      final Channel channel0 = rc.getChannel(CHANNEL_ID.PING.id, -1);
 
       channel0.setHandler(new ChannelHandler()
       {
@@ -157,10 +158,14 @@
             {
                HaBackupRegistrationMessage msg = (HaBackupRegistrationMessage)packet;
                System.out.println("HA_BACKUP_REGISTRATION: " + msg + " connector=" + msg.getConnector());
+               long channelID = msg.getChannelID();
+               Channel channelX = rc.getChannel(CHANNEL_ID.SESSION.id, -1);
+               Channel replicationChannel = rc.getChannel(CHANNEL_ID.REPLICATION.id, -1);
+               System.out.println("msg channelID: " + channelID);
                System.out.println("HA_BR: " + server.getIdentity() + ", toString=" + server);
                try
                {
-                  server.addHaBackup(msg.getConnector());
+                  server.addHaBackup(channelX, replicationChannel);
                }
                catch (Exception e)
                {

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java	2011-07-01 13:30:25 UTC (rev 10909)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java	2011-07-01 13:31:52 UTC (rev 10910)
@@ -25,10 +25,12 @@
 import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
+import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateReplicationSessionMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReattachSessionMessage;
@@ -113,6 +115,25 @@
 
             break;
          }
+         case PacketImpl.HA_BACKUP_REGISTRATION:
+         {
+            HaBackupRegistrationMessage msg = (HaBackupRegistrationMessage)packet;
+            System.out.println("HA_BACKUP_REGISTRATION: " + msg + " connector=" + msg.getConnector());
+            long channelID = msg.getChannelID();
+            Channel channelX = connection.getChannel(CHANNEL_ID.SESSION.id, -1);
+            Channel replicationChannel = connection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
+            System.out.println("HA_BR: " + server.getIdentity() + ", toString=" + server);
+            try
+            {
+               server.addHaBackup(channelX, replicationChannel);
+            }
+            catch (Exception e)
+            {
+               // XXX This is not what we want
+               e.printStackTrace();
+               throw new RuntimeException(e);
+            }
+         }
          default:
          {
             HornetQPacketHandler.log.error("Invalid packet " + packet);

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-07-01 13:30:25 UTC (rev 10909)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-07-01 13:31:52 UTC (rev 10910)
@@ -21,12 +21,13 @@
 import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.persistence.OperationContext;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.server.HornetQComponent;
 
 /**
+ * Used by the {@link JournalStorageManager} to update the replicated journal.
+ * 
  * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
  */
 public interface ReplicationManager extends HornetQComponent
 {
@@ -79,7 +80,7 @@
 
    /**
     * @param journalInfo
-    * @throws HornetQException 
+    * @throws HornetQException
     */
    void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException;
 

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java	2011-07-01 13:30:25 UTC (rev 10909)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java	2011-07-01 13:31:52 UTC (rev 10910)
@@ -30,7 +30,7 @@
 import org.hornetq.core.replication.ReplicationManager;
 
 /**
- * Used by the {@link JournalStorageManager} to replicate journal calls. 
+ * Used by the {@link JournalStorageManager} to replicate journal calls.
  *
  * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  *
@@ -59,10 +59,11 @@
 
    private final byte journalID;
 
-   public ReplicatedJournal(final byte journaID, final Journal localJournal, final ReplicationManager replicationManager)
+   public ReplicatedJournal(final byte journalID, final Journal localJournal,
+                            final ReplicationManager replicationManager)
    {
       super();
-      journalID = journaID;
+      this.journalID = journalID;
       this.localJournal = localJournal;
       this.replicationManager = replicationManager;
    }
@@ -201,10 +202,10 @@
       }
       replicationManager.appendCommitRecord(journalID, txID, lineUpContext);
       localJournal.appendCommitRecord(txID, sync, callback, lineUpContext);
-      
+
    }
 
-   
+
    /**
     * @param id
     * @param sync

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-07-01 13:30:25 UTC (rev 10909)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-07-01 13:31:52 UTC (rev 10910)
@@ -33,6 +33,7 @@
 import org.hornetq.core.protocol.core.ChannelHandler;
 import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
 import org.hornetq.core.protocol.core.impl.PacketImpl;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateReplicationSessionMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddMessage;
@@ -52,7 +53,7 @@
 
 /**
  * A ReplicationManagerImpl
- * 
+ *
  * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  */
 public class ReplicationManagerImpl implements ReplicationManager
@@ -65,12 +66,11 @@
 
    private final ResponseHandler responseHandler = new ResponseHandler();
 
-   private final ClientSessionFactoryInternal sessionFactory;
+//   private final ClientSessionFactoryInternal sessionFactory;
+//   private CoreRemotingConnection replicatingConnection;
 
-   private CoreRemotingConnection replicatingConnection;
+   private final Channel replicatingChannel;
 
-   private Channel replicatingChannel;
-
    private boolean started;
 
    private volatile boolean enabled;
@@ -79,10 +79,12 @@
 
    private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<OperationContext>();
 
-   private final ExecutorFactory executorFactory;
+   private ExecutorFactory executorFactory;
 
    private SessionFailureListener failureListener;
 
+   private final Channel systemChannel;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -90,15 +92,25 @@
    public ReplicationManagerImpl(final ClientSessionFactoryInternal sessionFactory, final ExecutorFactory executorFactory)
    {
       super();
-      this.sessionFactory = sessionFactory;
       this.executorFactory = executorFactory;
+
+      CoreRemotingConnection conn = sessionFactory.getConnection();
+      systemChannel = conn.getChannel(CHANNEL_ID.SESSION.id, -1);
+      replicatingChannel = conn.getChannel(CHANNEL_ID.REPLICATION.id, -1);
    }
 
    // Public --------------------------------------------------------
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.replication.ReplicationManager#replicate(byte[], org.hornetq.core.replication.ReplicationToken)
+   /**
+    * @param systemChannel
+    * @param replicatingChannel
     */
+   public ReplicationManagerImpl(Channel systemChannel, Channel replicatingChannel)
+   {
+      super();
+      this.systemChannel = systemChannel;
+      this.replicatingChannel = replicatingChannel;
+   }
 
    public void appendAddRecord(final byte journalID, final long id, final byte recordType, final EncodingSupport record)
    {
@@ -302,26 +314,21 @@
          throw new IllegalStateException("ReplicationManager is already started");
       }
 
-      replicatingConnection = sessionFactory.getConnection();
+//      replicatingConnection = sessionFactory.getConnection();
+//
+//      if (replicatingConnection == null)
+//      {
+//         ReplicationManagerImpl.log.warn("Backup server MUST be started before live server. Initialisation will not proceed.");
+//         throw new HornetQException(HornetQException.ILLEGAL_STATE,
+//                                    "Backup server MUST be started before live server. Initialisation will not proceed.");
+//      }
 
-      if (replicatingConnection == null)
-      {
-         ReplicationManagerImpl.log.warn("Backup server MUST be started before live server. Initialisation will not proceed.");
-         throw new HornetQException(HornetQException.ILLEGAL_STATE,
-                                    "Backup server MUST be started before live server. Initialisation will not proceed.");
-      }
-
-      long channelID = replicatingConnection.generateChannelID();
-
-      Channel mainChannel = replicatingConnection.getChannel(1, -1);
-
-      replicatingChannel = replicatingConnection.getChannel(channelID, -1);
-
       replicatingChannel.setHandler(responseHandler);
 
-      CreateReplicationSessionMessage replicationStartPackage = new CreateReplicationSessionMessage(channelID);
+      CreateReplicationSessionMessage replicationStartPackage =
+               new CreateReplicationSessionMessage(replicatingChannel.getID());
 
-      mainChannel.sendBlocking(replicationStartPackage);
+      systemChannel.sendBlocking(replicationStartPackage);
 
       failureListener = new SessionFailureListener()
       {
@@ -351,7 +358,7 @@
          {
          }
       };
-      sessionFactory.addFailureListener(failureListener);
+      // sessionFactory.addFailureListener(failureListener);
 
       started = true;
 
@@ -390,15 +397,15 @@
          replicatingChannel.close();
       }
 
-      sessionFactory.causeExit();
-      sessionFactory.removeFailureListener(failureListener);
-      if (replicatingConnection != null)
-      {
-         replicatingConnection.destroy();
-      }
+//      sessionFactory.causeExit();
+//      sessionFactory.removeFailureListener(failureListener);
+//      if (replicatingConnection != null)
+//      {
+//         replicatingConnection.destroy();
+//      }
+//
+//      replicatingConnection = null;
 
-      replicatingConnection = null;
-
       started = false;
    }
 

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java	2011-07-01 13:30:25 UTC (rev 10909)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java	2011-07-01 13:31:52 UTC (rev 10910)
@@ -20,7 +20,6 @@
 import javax.management.MBeanServer;
 
 import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.core.config.BridgeConfiguration;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.DivertConfiguration;
@@ -177,9 +176,5 @@
 
    void stop(boolean failoverOnServerShutdown) throws Exception;
 
-   /**
-    * @param connector
-    * @throws Exception
-    */
-   void addHaBackup(TransportConfiguration connector) throws Exception;
+   void addHaBackup(Channel channelX, Channel replicationChannel) throws Exception;
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-07-01 13:30:25 UTC (rev 10909)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-07-01 13:31:52 UTC (rev 10910)
@@ -44,7 +44,6 @@
 import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.config.BridgeConfiguration;
 import org.hornetq.core.config.Configuration;
@@ -82,6 +81,7 @@
 import org.hornetq.core.postoffice.impl.LocalQueueBinding;
 import org.hornetq.core.postoffice.impl.PostOfficeImpl;
 import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
 import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
@@ -123,7 +123,6 @@
 import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.spi.core.protocol.SessionCallback;
 import org.hornetq.spi.core.security.HornetQSecurityManager;
-import org.hornetq.utils.ConcurrentHashSet;
 import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.HornetQThreadFactory;
 import org.hornetq.utils.OrderedExecutorFactory;
@@ -214,8 +213,6 @@
 
    private final Map<String, ServerSession> sessions = new ConcurrentHashMap<String, ServerSession>();
 
-   private final Set<String> sharedNothingBackups = new ConcurrentHashSet<String>();
-
    private final Object initialiseLock = new Object();
 
    private boolean initialised;
@@ -517,7 +514,7 @@
       }
    }
 
-   private class SharedNothingBackupActivation implements Activation
+   private final class SharedNothingBackupActivation implements Activation
    {
       public void run()
       {
@@ -550,11 +547,10 @@
             }
             log.info("announce backup to live-server (id=" + liveConnectorName + ")");
             liveServerSessionFactory.getConnection()
-                                    .getChannel(0, -1)
+                                    .getChannel(CHANNEL_ID.SESSION.id, -1)
                                     .send(new HaBackupRegistrationMessage(getNodeID().toString(), config));
             log.info("backup registered");
 
-
             started = true;
 
             log.info("HornetQ Backup Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() +
@@ -639,7 +635,8 @@
          }
          else
          {
-            // Replicated
+            assert replicationEndpoint == null;
+            replicationEndpoint = new ReplicationEndpointImpl(this);
             activation = new SharedNothingBackupActivation();
          }
 
@@ -993,14 +990,12 @@
 
    public synchronized ReplicationEndpoint connectToReplicationEndpoint(final Channel channel) throws Exception
    {
-      if (configuration.isBackup())
+      if (!configuration.isBackup())
       {
-         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Connected server is a backup server");
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Connected server is not a backup server " +
+                  getIdentity());
       }
 
-      assert replicationEndpoint == null;
-      replicationEndpoint = new ReplicationEndpointImpl(this);
-
       if (replicationEndpoint == null)
          System.err.println("endpoint is null!");
 
@@ -1262,23 +1257,23 @@
    // Private
    // --------------------------------------------------------------------------------------
 
-   private boolean startReplication(TransportConfiguration connector) throws Exception
-   {
-      assert !configuration.isSharedStore();
-      if (configuration.isSharedStore() || connector == null)
-      {
-         return true;
-      }
+//   private boolean startReplication(TransportConfiguration connector) throws Exception
+//   {
+//      assert !configuration.isSharedStore();
+//      if (configuration.isSharedStore() || connector == null)
+//      {
+//         return true;
+//      }
+//
+//      serverLocator = HornetQClient.createServerLocatorWithHA(connector);
+//      ClientSessionFactoryInternal replicationFailoverManager =
+//               (ClientSessionFactoryInternal)serverLocator.createSessionFactory(connector);
+//      replicationManager = new ReplicationManagerImpl(replicationFailoverManager, executorFactory);
+//      replicationManager.start();
+//
+//      return true;
+//   }
 
-      serverLocator = HornetQClient.createServerLocatorWithHA(connector);
-      ClientSessionFactoryInternal replicationFailoverManager =
-               (ClientSessionFactoryInternal)serverLocator.createSessionFactory(connector);
-      replicationManager = new ReplicationManagerImpl(replicationFailoverManager, executorFactory);
-      replicationManager.start();
-
-      return true;
-   }
-
    private void callActivateCallbacks()
    {
       for (ActivateCallback callback : activateCallbacks)
@@ -1950,11 +1945,23 @@
    }
 
    @Override
-   public void addHaBackup(TransportConfiguration connector) throws Exception
+   public void addHaBackup(Channel systemChannel, Channel replicatingChannel) throws Exception
    {
-      log.info(connector + " " + connector.getFactoryClassName() + " " + connector.getParams() + " " +
-               replicationManager);
-      startReplication(connector);
-      // throw new UnsupportedOperationException("unimplemented");
+      if (!(storageManager instanceof JournalStorageManager))
+         return;
+      JournalStorageManager journalStorageManager = (JournalStorageManager)storageManager;
+
+      System.out.println(HornetQServerImpl.class.getName() + " " + this.getIdentity() +
+               ": create a ReplicationManagerImpl. Using ChannelID=" + systemChannel);
+      // XXX not sure this is the right call to use
+//      final ServerLocatorInternal serverLocator =
+//               (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(connector);
+//      ClientSessionFactoryInternal sessionFactory = (ClientSessionFactoryInternal)serverLocator.createSessionFactory();
+      replicationManager = new ReplicationManagerImpl(systemChannel, replicatingChannel);
+      System.out.println("rep.start()");
+      replicationManager.start();
+
+      System.out.println("add RepMan to JournalStorageManager...");
+      journalStorageManager.setReplicator(replicationManager);
    }
 }

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2011-07-01 13:30:25 UTC (rev 10909)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2011-07-01 13:31:52 UTC (rev 10910)
@@ -169,8 +169,7 @@
       config1.setLargeMessagesDirectory(config1.getLargeMessagesDirectory() + "_backup");
       config1.getAcceptorConfigurations().clear();
       config1.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
-      TransportConfiguration tc = getConnectorTransportConfiguration(true);
-      config1.getConnectorConfigurations().put(LIVE_NODE_NAME, tc);
+      config1.getConnectorConfigurations().put(LIVE_NODE_NAME, getConnectorTransportConfiguration(true));
 
       //liveConfig.setBackupConnectorName("toBackup");
       config1.setSecurityEnabled(false);



More information about the hornetq-commits mailing list