[hornetq-commits] JBoss hornetq SVN: r8018 - in branches/Replication_Clebert: src/main/org/hornetq/core/persistence/impl/journal and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Sep 30 22:50:16 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-09-30 22:50:16 -0400 (Wed, 30 Sep 2009)
New Revision: 8018

Modified:
   branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java
   branches/Replication_Clebert/src/main/org/hornetq/core/journal/TransactionFailureCallback.java
   branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationEndpoint.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java
   branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
   branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
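Replication work in progress: add a journalID to ReplicationAddMessage, attach a Channel to ReplicationEndpoint, append replicated add-records to the backup's journals, and track pending ReplicationTokens until the backup responds.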

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java	2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java	2009-10-01 02:50:16 UTC (rev 8018)
@@ -75,9 +75,6 @@
 
    // Load
    
-   /** This method could be promoted to {@link Journal} interface when we decide to use the loadManager 
-    *  instead of load(List,List)
-    */
    long load(LoaderCallback reloadManager) throws Exception;
 
 

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/journal/TransactionFailureCallback.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/journal/TransactionFailureCallback.java	2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/journal/TransactionFailureCallback.java	2009-10-01 02:50:16 UTC (rev 8018)
@@ -16,7 +16,7 @@
 import java.util.List;
 
 /**
- * A TransactionFailureCallback
+ * A Callback to receive information about bad transactions for extra cleanup required for broken transactions such as large messages.
  *
  * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  *

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-01 02:50:16 UTC (rev 8018)
@@ -163,13 +163,7 @@
 
       journalDir = config.getJournalDirectory();
 
-      if (journalDir == null)
-      {
-         throw new NullPointerException("journal-dir is null");
-      }
 
-      createJournalDir = config.isCreateJournalDir();
-
       SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
 
       bindingsJournal = new JournalImpl(1024 * 1024,
@@ -181,6 +175,13 @@
                                         "bindings",
                                         1);
 
+      if (journalDir == null)
+      {
+         throw new NullPointerException("journal-dir is null");
+      }
+
+      createJournalDir = config.isCreateJournalDir();
+
       syncNonTransactional = config.isJournalSyncNonTransactional();
 
       syncTransactional = config.isJournalSyncTransactional();
@@ -738,7 +739,7 @@
          messageJournal.perfBlast(perfBlastPages);
       }
    }
-
+   
    /**
     * @param messages
     * @param buff
@@ -1141,7 +1142,7 @@
    // This should be accessed from this package only
    void deleteFile(final SequentialFile file)
    {
-      executor.execute(new Runnable()
+      Runnable deleteAction = new Runnable()
       {
          public void run()
          {
@@ -1155,7 +1156,16 @@
             }
          }
 
-      });
+      };
+      
+      if (executor == null)
+      {
+         deleteAction.run();
+      }
+      else
+      {
+         executor.execute(deleteAction);
+      }
    }
 
    /**

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java	2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java	2009-10-01 02:50:16 UTC (rev 8018)
@@ -141,7 +141,7 @@
    
    // Replication
    
-   public static final byte REPLICATION_APPEND = 77;
+   public static final byte REPLICATION_APPEND = 80;
 
    // Static --------------------------------------------------------
 

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java	2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java	2009-10-01 02:50:16 UTC (rev 8018)
@@ -33,6 +33,9 @@
 
    private long id;
 
+   /** 0 - Bindings, 1 - MessagesJournal */
+   private byte journalID;
+
    private byte recordType;
 
    private EncodingSupport encodingData;
@@ -42,27 +45,26 @@
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
-   
+
    public ReplicationAddMessage()
    {
       super(REPLICATION_APPEND);
    }
 
-   public ReplicationAddMessage(long id, byte recordType, EncodingSupport encodingData)
+   public ReplicationAddMessage(byte journalID, long id, byte recordType, EncodingSupport encodingData)
    {
       this();
+      this.journalID = journalID;
       this.id = id;
       this.recordType = recordType;
       this.encodingData = encodingData;
    }
 
    // Public --------------------------------------------------------
-   
-   
-   
+
    public int getRequiredBufferSize()
    {
-      return BASIC_PACKET_SIZE +
+      return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE +
              DataConstants.SIZE_LONG +
              DataConstants.SIZE_BYTE +
              DataConstants.SIZE_INT +
@@ -73,6 +75,7 @@
    @Override
    public void encodeBody(final HornetQBuffer buffer)
    {
+      buffer.writeByte(journalID);
       buffer.writeLong(id);
       buffer.writeByte(recordType);
       buffer.writeInt(encodingData.getEncodeSize());
@@ -82,6 +85,7 @@
    @Override
    public void decodeBody(final HornetQBuffer buffer)
    {
+      journalID = buffer.readByte();
       id = buffer.readLong();
       recordType = buffer.readByte();
       int size = buffer.readInt();
@@ -89,7 +93,38 @@
       buffer.readBytes(recordData);
    }
 
+   /**
+    * @return the id
+    */
+   public long getId()
+   {
+      return id;
+   }
 
+   /**
+    * @return the journalID
+    */
+   public byte getJournalID()
+   {
+      return journalID;
+   }
+
+   /**
+    * @return the recordType
+    */
+   public byte getRecordType()
+   {
+      return recordType;
+   }
+
+   /**
+    * @return the recordData
+    */
+   public byte[] getRecordData()
+   {
+      return recordData;
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationEndpoint.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationEndpoint.java	2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationEndpoint.java	2009-10-01 02:50:16 UTC (rev 8018)
@@ -13,6 +13,7 @@
 
 package org.hornetq.core.replication;
 
+import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.ChannelHandler;
 import org.hornetq.core.server.HornetQComponent;
 
@@ -26,4 +27,8 @@
 public interface ReplicationEndpoint extends ChannelHandler, HornetQComponent
 {
 
+   void setChannel(Channel channel);
+
+   Channel getChannel();
+
 }

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-10-01 02:50:16 UTC (rev 8018)
@@ -24,5 +24,5 @@
  */
 public interface ReplicationManager  extends HornetQComponent
 {
-   void appendAddRecord(long id, byte recordType, EncodingSupport record);
+   void appendAddRecord(byte journalID, long id, byte recordType, EncodingSupport record);
 }

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-10-01 02:50:16 UTC (rev 8018)
@@ -13,11 +13,16 @@
 
 package org.hornetq.core.replication.impl;
 
-import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationPacket;
+import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
 import org.hornetq.core.replication.ReplicationEndpoint;
-import org.hornetq.core.replication.ReplicationManager;
 import org.hornetq.core.server.HornetQServer;
 
 /**
@@ -32,10 +37,20 @@
 
    // Constants -----------------------------------------------------
 
+   private static final Logger log = Logger.getLogger(ReplicationEndpointImpl.class);
+
    // Attributes ----------------------------------------------------
 
    private final HornetQServer server;
 
+   private Channel channel;
+
+   private Journal bindingsJournal;
+
+   private Journal messagingJournal;
+   
+   private JournalStorageManager storage;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -45,12 +60,26 @@
    }
 
    // Public --------------------------------------------------------
-   /* (non-Javadoc)
+   /* 
+    * (non-Javadoc)
     * @see org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
     */
    public void handlePacket(Packet packet)
    {
-      System.out.println("packet = " + packet);
+      try
+      {
+         if (packet.getType() == PacketImpl.REPLICATION_APPEND)
+         {
+            System.out.println("Replicated");
+            handleAppendAddRecord(packet);
+         }
+      }
+      catch (Exception e)
+      {
+         // TODO: what to do when the IO fails on the backup side? should we shutdown the backup?
+         log.warn(e.getMessage(), e);
+      }
+      channel.send(new NullResponseMessage());
    }
 
    /* (non-Javadoc)
@@ -66,6 +95,17 @@
     */
    public void start() throws Exception
    {
+      Configuration config = server.getConfiguration();
+      
+      // TODO: this needs an executor. Assign the field (not a local) so that stop() can reach the storage manager.
+      this.storage = new JournalStorageManager(config, null);
+      storage.start();
+      
+      this.bindingsJournal = storage.getBindingsJournal();
+      this.messagingJournal = storage.getMessageJournal();
+      
+      // We only need to load internal structures on the backup...
+      storage.loadInternalOnly();
    }
 
    /* (non-Javadoc)
@@ -73,14 +113,52 @@
     */
    public void stop() throws Exception
    {
+      storage.stop();
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationEndpoint#getChannel()
+    */
+   public Channel getChannel()
+   {
+      return channel;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationEndpoint#setChannel(org.hornetq.core.remoting.Channel)
+    */
+   public void setChannel(Channel channel)
+   {
+      this.channel = channel;
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------
 
+   /**
+    * @param packet
+    * @throws Exception
+    */
+   private void handleAppendAddRecord(Packet packet) throws Exception
+   {
+      ReplicationAddMessage addMessage = (ReplicationAddMessage)packet;
+      Journal journalToUse;
+
+      if (addMessage.getJournalID() == (byte)0)
+      {
+         journalToUse = bindingsJournal;
+      }
+      else
+      {
+         journalToUse = messagingJournal;
+      }
+
+      journalToUse.appendAddRecord(addMessage.getId(), addMessage.getRecordType(), addMessage.getRecordData(), false);
+   }
+
    // Inner classes -------------------------------------------------
 
 }

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-10-01 02:50:16 UTC (rev 8018)
@@ -13,11 +13,19 @@
 
 package org.hornetq.core.replication.impl;
 
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+
 import org.hornetq.core.client.impl.ConnectionManager;
 import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.Channel;
+import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.Packet;
 import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
 import org.hornetq.core.replication.ReplicationManager;
 import org.hornetq.core.replication.ReplicationToken;
@@ -33,12 +41,15 @@
 {
 
    // Constants -----------------------------------------------------
+   private static final Logger log = Logger.getLogger(ReplicationManagerImpl.class);
 
    // Attributes ----------------------------------------------------
 
    // TODO: where should this be configured?
    private static final int WINDOW_SIZE = 100 * 1024;
 
+   private final ResponseHandler responseHandler = new ResponseHandler();
+
    private final ConnectionManager connectionManager;
 
    private RemotingConnection connection;
@@ -47,6 +58,16 @@
 
    private boolean started;
 
+   private boolean playedResponsesOnFailure;
+
+   private final Object replicationLock = new Object();
+
+   private final Executor executor;
+
+   private final ThreadLocal<ReplicationToken> repliToken = new ThreadLocal<ReplicationToken>();
+
+   private final Queue<ReplicationToken> pendingTokens = new ConcurrentLinkedQueue<ReplicationToken>();
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -54,10 +75,11 @@
    /**
     * @param replicationConnectionManager
     */
-   public ReplicationManagerImpl(ConnectionManager connectionManager)
+   public ReplicationManagerImpl(final ConnectionManager connectionManager, final Executor executor)
    {
       super();
       this.connectionManager = connectionManager;
+      this.executor = executor;
    }
 
    // Public --------------------------------------------------------
@@ -65,11 +87,10 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#replicate(byte[], org.hornetq.core.replication.ReplicationToken)
     */
-   
-   
-   public void appendAddRecord(long id, byte recordType, EncodingSupport encodingData)
+
+   public void appendAddRecord(final byte journalID, final long id, final byte recordType, final EncodingSupport encodingData)
    {
-      replicatingChannel.send(new ReplicationAddMessage(id, recordType, encodingData));
+      sendReplicatePacket(new ReplicationAddMessage(journalID, id, recordType, encodingData));
    }
 
    /* (non-Javadoc)
@@ -85,22 +106,22 @@
     */
    public synchronized void start() throws Exception
    {
-      this.started = true;
-
       connection = connectionManager.getConnection(1);
 
       long channelID = connection.generateChannelID();
 
       Channel mainChannel = connection.getChannel(1, -1, false);
 
-      Channel tempChannel = connection.getChannel(channelID, WINDOW_SIZE, false);
+      this.replicatingChannel = connection.getChannel(channelID, WINDOW_SIZE, false);
 
+      this.replicatingChannel.setHandler(this.responseHandler);
+      
       CreateReplicationSessionMessage replicationStartPackage = new CreateReplicationSessionMessage(channelID,
                                                                                                     WINDOW_SIZE);
 
       mainChannel.sendBlocking(replicationStartPackage);
 
-      this.replicatingChannel = tempChannel;
+      this.started = true;
    }
 
    /* (non-Javadoc)
@@ -114,7 +135,7 @@
       }
 
       this.started = false;
-      
+
       if (connection != null)
       {
          connection.destroy();
@@ -123,6 +144,63 @@
       connection = null;
    }
 
+   public ReplicationToken getReplicationToken()
+   {
+      ReplicationToken token = repliToken.get();
+      if (token == null)
+      {
+         token = new ReplicationTokenImpl(executor);
+         repliToken.set(token);
+      }
+      return token;
+   }
+
+   private void sendReplicatePacket(final Packet packet)
+   {
+      boolean runItNow = false;
+
+      ReplicationToken repliToken = getReplicationToken();
+      repliToken.linedUp();
+
+      synchronized (replicationLock)
+      {
+         if (playedResponsesOnFailure)
+         {
+            // Already replicating channel failed, so just play the action now
+
+            runItNow = true;
+         }
+         else
+         {
+            pendingTokens.add(repliToken);
+
+            // TODO: Should I use connect.write directly here?
+            replicatingChannel.send(packet);
+         }
+      }
+
+      // Execute outside lock
+
+      if (runItNow)
+      {
+         repliToken.replicated();
+      }
+   }
+
+   private void replicated()
+   {
+      ReplicationToken tokenPolled = pendingTokens.poll();
+      if (tokenPolled == null)
+      {
+         // Tokens are queued in sendReplicatePacket before the packet is sent, so a response without a pending token indicates a bug
+         log.warn("Missing replication token on the stack. There is a bug on the ReplicationManager since this was not supposed to happen");
+      }
+      else
+      {
+         tokenPolled.replicated();
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -131,4 +209,20 @@
 
    // Inner classes -------------------------------------------------
 
+   protected class ResponseHandler implements ChannelHandler
+   {
+      /* (non-Javadoc)
+       * @see org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
+       */
+      public void handlePacket(Packet packet)
+      {
+         System.out.println("HandlePacket on client");
+         if (packet.getType() == PacketImpl.NULL_RESPONSE)
+         {
+            replicated();
+         }
+      }
+
+   }
+
 }

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java	2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java	2009-10-01 02:50:16 UTC (rev 8018)
@@ -46,6 +46,7 @@
    public synchronized void linedUp()
    {
       pendings++;
+      System.out.println("pendings (lined up) = " + pendings);
    }
 
    /** To be called by the replication manager, when data is confirmed on the channel */
@@ -62,11 +63,13 @@
             tasks.clear();
          }
       }
+      System.out.println("pendings (replicated) = " + pendings);
    }
    
    /** You may have several actions to be done after a replication operation is completed. */
    public synchronized void addFutureCompletion(Runnable runnable)
    {
+      System.out.println("pendings = " + pendings);
       if (pendings == 0)
       {
          executor.execute(runnable);

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java	2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java	2009-10-01 02:50:16 UTC (rev 8018)
@@ -24,6 +24,7 @@
 import org.hornetq.core.management.impl.HornetQServerControlImpl;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
@@ -71,7 +72,7 @@
 
    ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastReceivedCommandID) throws Exception;
    
-   ReplicationEndpoint createReplicationEndpoint() throws HornetQException;
+   ReplicationEndpoint createReplicationEndpoint(Channel channel) throws Exception;
 
    CreateSessionResponseMessage createSession(String name,
                                               long channelID,                                              

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java	2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java	2009-10-01 02:50:16 UTC (rev 8018)
@@ -196,7 +196,7 @@
       try
       {
          Channel channel = connection.getChannel(request.getSessionChannelID(), request.getWindowSize(), false);
-         ReplicationEndpoint endpoint = server.createReplicationEndpoint();
+         ReplicationEndpoint endpoint = server.createReplicationEndpoint(channel);
          channel.setHandler(endpoint);
          response = new NullResponseMessage();
 

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-10-01 02:50:16 UTC (rev 8018)
@@ -200,7 +200,7 @@
    
    private ReplicationManager replicationManager;
    
-   private ReplicationEndpoint replicationEndpoint = new ReplicationEndpointImpl(this);
+   private ReplicationEndpoint replicationEndpoint;
 
    private final Set<ActivateCallback> activateCallbacks = new HashSet<ActivateCallback>();
 
@@ -593,7 +593,7 @@
       return new CreateSessionResponseMessage(true, version.getIncrementingVersion());
    }
    
-   public synchronized ReplicationEndpoint createReplicationEndpoint() throws HornetQException
+   public synchronized ReplicationEndpoint createReplicationEndpoint(final Channel channel) throws Exception
    {
       if (!configuration.isBackup())
       {
@@ -603,7 +603,11 @@
       if (replicationEndpoint == null)
       {
          replicationEndpoint = new ReplicationEndpointImpl(this);
+         replicationEndpoint.start();
       }
+      
+      replicationEndpoint.setChannel(channel);
+      
       return replicationEndpoint;
    }
 
@@ -712,7 +716,7 @@
                                                           scheduledPool,
                                                           null);
             
-            this.replicationManager = new ReplicationManagerImpl(replicatingConnectionManager);
+            this.replicationManager = new ReplicationManagerImpl(replicatingConnectionManager, this.executorFactory.getExecutor());
             replicationManager.start();
          }
       }
@@ -1113,6 +1117,8 @@
             }
          }, 0, dumpInfoInterval, TimeUnit.MILLISECONDS);
       }
+      
+      startReplication();
 
       initialised = true;
    }

Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-01 02:50:16 UTC (rev 8018)
@@ -18,16 +18,15 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 
-import javax.management.MBeanServer;
-
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
 import org.hornetq.core.client.impl.ConnectionManager;
 import org.hornetq.core.client.impl.ConnectionManagerImpl;
@@ -36,39 +35,19 @@
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.management.ManagementService;
-import org.hornetq.core.management.impl.HornetQServerControlImpl;
-import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.ChannelHandler;
 import org.hornetq.core.remoting.Interceptor;
 import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
-import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
-import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.core.replication.ReplicationEndpoint;
-import org.hornetq.core.replication.impl.ReplicationEndpointImpl;
 import org.hornetq.core.replication.impl.ReplicationManagerImpl;
-import org.hornetq.core.security.HornetQSecurityManager;
-import org.hornetq.core.security.Role;
-import org.hornetq.core.server.ActivateCallback;
 import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.QueueFactory;
-import org.hornetq.core.server.ServerSession;
-import org.hornetq.core.server.cluster.ClusterManager;
 import org.hornetq.core.server.impl.HornetQServerImpl;
-import org.hornetq.core.settings.HierarchicalRepository;
-import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.core.transaction.ResourceManager;
-import org.hornetq.core.version.Version;
 import org.hornetq.tests.util.ServiceTestBase;
 import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.HornetQThreadFactory;
-import org.hornetq.utils.SimpleString;
 
 /**
  * A ReplicationTest
@@ -111,7 +90,7 @@
 
       try
       {
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
          manager.start();
          manager.stop();
       }
@@ -134,7 +113,7 @@
 
       try
       {
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
          try
          {
             manager.start();
@@ -165,10 +144,20 @@
 
       try
       {
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
          manager.start();
-         manager.appendAddRecord(1, (byte)1, new DataImplement());
-         Thread.sleep(1000);
+         manager.appendAddRecord((byte)0, 1, (byte)1, new DataImplement());
+         final CountDownLatch latch = new CountDownLatch(1);
+         manager.getReplicationToken().addFutureCompletion(new Runnable()
+         {
+
+            public void run()
+            {
+               latch.countDown();
+            }
+
+         });
+         assertTrue(latch.await(1, TimeUnit.SECONDS));
          manager.stop();
       }
       finally
@@ -176,7 +165,7 @@
          server.stop();
       }
    }
-   
+
    class DataImplement implements EncodingSupport
    {
 
@@ -196,7 +185,7 @@
       {
          return 5;
       }
-      
+
    }
 
    // Package protected ---------------------------------------------
@@ -279,313 +268,4 @@
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------
-
-   static class FakeServer implements HornetQServer
-   {
-
-      public Queue createQueue(SimpleString address,
-                               SimpleString queueName,
-                               SimpleString filter,
-                               boolean durable,
-                               boolean temporary) throws Exception
-      {
-         return null;
-      }
-
-      public CreateSessionResponseMessage createSession(String name,
-                                                        long channelID,
-                                                        String username,
-                                                        String password,
-                                                        int minLargeMessageSize,
-                                                        int incrementingVersion,
-                                                        RemotingConnection remotingConnection,
-                                                        boolean autoCommitSends,
-                                                        boolean autoCommitAcks,
-                                                        boolean preAcknowledge,
-                                                        boolean xa,
-                                                        int producerWindowSize) throws Exception
-      {
-
-         return null;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#deployQueue(org.hornetq.utils.SimpleString, org.hornetq.utils.SimpleString, org.hornetq.utils.SimpleString, boolean, boolean)
-       */
-      public Queue deployQueue(SimpleString address,
-                               SimpleString queueName,
-                               SimpleString filterString,
-                               boolean durable,
-                               boolean temporary) throws Exception
-      {
-
-         return null;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#destroyQueue(org.hornetq.utils.SimpleString, org.hornetq.core.server.ServerSession)
-       */
-      public void destroyQueue(SimpleString queueName, ServerSession session) throws Exception
-      {
-
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#getAddressSettingsRepository()
-       */
-      public HierarchicalRepository<AddressSettings> getAddressSettingsRepository()
-      {
-
-         return null;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#getClusterManager()
-       */
-      public ClusterManager getClusterManager()
-      {
-
-         return null;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#getConfiguration()
-       */
-      public Configuration getConfiguration()
-      {
-
-         return null;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#getConnectionCount()
-       */
-      public int getConnectionCount()
-      {
-
-         return 0;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#getExecutorFactory()
-       */
-      public ExecutorFactory getExecutorFactory()
-      {
-
-         return null;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#getHornetQServerControl()
-       */
-      public HornetQServerControlImpl getHornetQServerControl()
-      {
-
-         return null;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#getMBeanServer()
-       */
-      public MBeanServer getMBeanServer()
-      {
-
-         return null;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#getManagementService()
-       */
-      public ManagementService getManagementService()
-      {
-
-         return null;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#getNodeID()
-       */
-      public SimpleString getNodeID()
-      {
-
-         return null;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#getPostOffice()
-       */
-      public PostOffice getPostOffice()
-      {
-
-         return null;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#getQueueFactory()
-       */
-      public QueueFactory getQueueFactory()
-      {
-
-         return null;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#getRemotingService()
-       */
-      public RemotingService getRemotingService()
-      {
-
-         return null;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#getResourceManager()
-       */
-      public ResourceManager getResourceManager()
-      {
-
-         return null;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#getSecurityManager()
-       */
-      public HornetQSecurityManager getSecurityManager()
-      {
-
-         return null;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#getSecurityRepository()
-       */
-      public HierarchicalRepository<Set<Role>> getSecurityRepository()
-      {
-
-         return null;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#getSession(java.lang.String)
-       */
-      public ServerSession getSession(String name)
-      {
-
-         return null;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#getSessions()
-       */
-      public Set<ServerSession> getSessions()
-      {
-
-         return null;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#getSessions(java.lang.String)
-       */
-      public List<ServerSession> getSessions(String connectionID)
-      {
-
-         return null;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#getStorageManager()
-       */
-      public StorageManager getStorageManager()
-      {
-
-         return null;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#getVersion()
-       */
-      public Version getVersion()
-      {
-
-         return null;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#isInitialised()
-       */
-      public boolean isInitialised()
-      {
-
-         return false;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#isStarted()
-       */
-      public boolean isStarted()
-      {
-
-         return false;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#reattachSession(org.hornetq.core.remoting.RemotingConnection, java.lang.String, int)
-       */
-      public ReattachSessionResponseMessage reattachSession(RemotingConnection connection,
-                                                            String name,
-                                                            int lastReceivedCommandID) throws Exception
-      {
-
-         return null;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#registerActivateCallback(org.hornetq.core.server.ActivateCallback)
-       */
-      public void registerActivateCallback(ActivateCallback callback)
-      {
-
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#removeSession(java.lang.String)
-       */
-      public void removeSession(String name) throws Exception
-      {
-
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#unregisterActivateCallback(org.hornetq.core.server.ActivateCallback)
-       */
-      public void unregisterActivateCallback(ActivateCallback callback)
-      {
-
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQComponent#start()
-       */
-      public void start() throws Exception
-      {
-
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQComponent#stop()
-       */
-      public void stop() throws Exception
-      {
-
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.server.HornetQServer#createReplicationEndpoint()
-       */
-      public ReplicationEndpoint createReplicationEndpoint()
-      {
-         return new ReplicationEndpointImpl(this);
-      }
-
-   }
 }



More information about the hornetq-commits mailing list