[hornetq-commits] JBoss hornetq SVN: r8038 - in branches/Replication_Clebert: src/main/org/hornetq/core/replication and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Oct 2 13:15:12 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-10-02 13:15:12 -0400 (Fri, 02 Oct 2009)
New Revision: 8038

Modified:
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationToken.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Changes

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java	2009-10-02 15:59:19 UTC (rev 8037)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java	2009-10-02 17:15:12 UTC (rev 8038)
@@ -46,7 +46,7 @@
 
    public ReplicationPrepareMessage()
    {
-      super(REPLICATION_DELETE_TX);
+      super(REPLICATION_PREPARE);
    }
 
    public ReplicationPrepareMessage(byte journalID, long txId, EncodingSupport encodingData)

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-10-02 15:59:19 UTC (rev 8037)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-10-02 17:15:12 UTC (rev 8038)
@@ -13,6 +13,8 @@
 
 package org.hornetq.core.replication;
 
+import java.util.Set;
+
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.server.HornetQComponent;
 
@@ -42,5 +44,13 @@
    void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData) throws Exception;
 
    void appendRollbackRecord(byte journalID, long txID) throws Exception;
+   
+   /** Add an action to be executed after the pending replications */
+   void addReplicationAction(Runnable runnable);
+   
+   void completeToken();
+   
+   /** A list of tokens that are still waiting for replications to be completed */
+   Set<ReplicationToken> getActiveTokens();
 
 }

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationToken.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationToken.java	2009-10-02 15:59:19 UTC (rev 8037)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationToken.java	2009-10-02 17:15:12 UTC (rev 8038)
@@ -30,7 +30,7 @@
    /** To be called by the replication manager, when data is confirmed on the channel */
    void replicated();
    
-   void addFutureCompletion(Runnable runnable);
+   void addReplicationAction(Runnable runnable);
    
    /** To be called when there are no more operations pending */
    void complete();

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-10-02 15:59:19 UTC (rev 8037)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-10-02 17:15:12 UTC (rev 8038)
@@ -122,7 +122,7 @@
       Configuration config = server.getConfiguration();
 
       // TODO: this needs an executor
-      JournalStorageManager storage = new JournalStorageManager(config, null);
+      storage = new JournalStorageManager(config, null);
       storage.start();
 
       this.bindingsJournal = storage.getBindingsJournal();

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-10-02 15:59:19 UTC (rev 8037)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-10-02 17:15:12 UTC (rev 8038)
@@ -14,12 +14,12 @@
 package org.hornetq.core.replication.impl;
 
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 
 import org.hornetq.core.client.impl.ConnectionManager;
 import org.hornetq.core.journal.EncodingSupport;
-import org.hornetq.core.journal.impl.JournalImpl;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.ChannelHandler;
@@ -36,6 +36,7 @@
 import org.hornetq.core.remoting.spi.HornetQBuffer;
 import org.hornetq.core.replication.ReplicationManager;
 import org.hornetq.core.replication.ReplicationToken;
+import org.hornetq.utils.ConcurrentHashSet;
 
 /**
  * A RepplicationManagerImpl
@@ -75,6 +76,8 @@
 
    private final Queue<ReplicationToken> pendingTokens = new ConcurrentLinkedQueue<ReplicationToken>();
 
+   private final ConcurrentHashSet<ReplicationToken> activeTokens = new ConcurrentHashSet<ReplicationToken>();
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -121,7 +124,6 @@
       sendReplicatePacket(new ReplicationAddTXMessage(journalID, false, txID, id, recordType, record));
    }
 
-   
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#appendUpdateRecordTransactional(byte, long, long, byte, org.hornetq.core.journal.EncodingSupport)
     */
@@ -134,7 +136,6 @@
       sendReplicatePacket(new ReplicationAddTXMessage(journalID, true, txID, id, recordType, record));
    }
 
-   
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#appendCommitRecord(byte, long, boolean)
     */
@@ -232,11 +233,50 @@
       if (token == null)
       {
          token = new ReplicationTokenImpl(executor);
+         activeTokens.add(token);
          repliToken.set(token);
       }
       return token;
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationManager#addReplicationAction(java.lang.Runnable)
+    */
+   public void addReplicationAction(Runnable runnable)
+   {
+      getReplicationToken().addReplicationAction(runnable);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationManager#completeToken()
+    */
+   public void completeToken()
+   {
+      final ReplicationToken token = repliToken.get();
+      if (token != null)
+      {
+         // Disassociate thread local
+         repliToken.set(null);
+         // Remove from pending tokens as soon as this is complete
+         token.addReplicationAction(new Runnable()
+         {
+            public void run()
+            {
+               activeTokens.remove(token);
+            }
+         });
+      }
+   }
+   
+   /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationManager#getPendingTokens()
+    */
+   public Set<ReplicationToken> getActiveTokens()
+   {
+      return activeTokens;
+   }
+
+
    private void sendReplicatePacket(final Packet packet)
    {
       boolean runItNow = false;
@@ -298,7 +338,6 @@
        */
       public void handlePacket(Packet packet)
       {
-         System.out.println("HandlePacket on client");
          if (packet.getType() == PacketImpl.REPLICATION_RESPONSE)
          {
             replicated();
@@ -311,7 +350,7 @@
    {
 
       static NullEncoding instance = new NullEncoding();
-      
+
       public void decode(final HornetQBuffer buffer)
       {
       }
@@ -327,4 +366,5 @@
 
    }
 
+
 }

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java	2009-10-02 15:59:19 UTC (rev 8037)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java	2009-10-02 17:15:12 UTC (rev 8038)
@@ -66,7 +66,7 @@
    }
    
    /** You may have several actions to be done after a replication operation is completed. */
-   public synchronized void addFutureCompletion(Runnable runnable)
+   public synchronized void addReplicationAction(Runnable runnable)
    {
       System.out.println("pendings on addFutureCompletion = " + pendings);
       if (pendings == 0)

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-10-02 15:59:19 UTC (rev 8037)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-10-02 17:15:12 UTC (rev 8038)
@@ -354,6 +354,12 @@
       {
          storageManager.stop();
       }
+      
+      if (replicationEndpoint != null)
+      {
+         replicationEndpoint.stop();
+         replicationEndpoint = null;
+      }
 
       if (securityManager != null)
       {

Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-02 15:59:19 UTC (rev 8037)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-02 17:15:12 UTC (rev 8038)
@@ -146,20 +146,22 @@
       {
          ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
          manager.start();
-         
+
+         manager.appendPrepareRecord((byte)0, 100, new FakeData());
+
          manager.appendAddRecord((byte)0, 1, (byte)1, new FakeData());
          manager.appendUpdateRecord((byte)0, 1, (byte)2, new FakeData());
          manager.appendDeleteRecord((byte)0, 1);
          manager.appendAddRecordTransactional((byte)0, 2, 2, (byte)1, new FakeData());
          manager.appendUpdateRecordTransactional((byte)0, 2, 2, (byte)2, new FakeData());
          manager.appendCommitRecord((byte)0, 2);
-         
-         manager.appendDeleteRecordTransactional((byte)0, 3, 4,new FakeData());
+
+         manager.appendDeleteRecordTransactional((byte)0, 3, 4, new FakeData());
          manager.appendPrepareRecord((byte)0, 3, new FakeData());
          manager.appendRollbackRecord((byte)0, 3);
 
          final CountDownLatch latch = new CountDownLatch(1);
-         manager.getReplicationToken().addFutureCompletion(new Runnable()
+         manager.getReplicationToken().addReplicationAction(new Runnable()
          {
 
             public void run()
@@ -169,6 +171,21 @@
 
          });
          assertTrue(latch.await(1, TimeUnit.SECONDS));
+         assertEquals(1, manager.getActiveTokens().size());
+         
+         manager.completeToken();
+         
+         for (int i = 0 ; i < 100; i++)
+         {
+            // This is asynchronous. Have to wait completion
+            if (manager.getActiveTokens().size() == 0)
+            {
+               break;
+            }
+            Thread.sleep(1);
+         }
+
+         assertEquals(0, manager.getActiveTokens().size());
          manager.stop();
       }
       finally
@@ -265,6 +282,7 @@
 
    protected void tearDown() throws Exception
    {
+
       executor.shutdown();
 
       scheduledExecutor.shutdown();
@@ -274,6 +292,9 @@
       connectionManager = null;
 
       scheduledExecutor = null;
+      
+      super.tearDown();
+
    }
 
    // Private -------------------------------------------------------



More information about the hornetq-commits mailing list