[hornetq-commits] JBoss hornetq SVN: r8302 - in branches/ClebertTemporary: src/main/org/hornetq/core/completion and 12 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Nov 17 16:40:57 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-17 16:40:56 -0500 (Tue, 17 Nov 2009)
New Revision: 8302

Added:
   branches/ClebertTemporary/src/main/org/hornetq/core/completion/
   branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java
   branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/
   branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java
Removed:
   branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java
   branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java
Modified:
   branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
   branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java
   branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
   branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
   branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
   branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
another iteration

Copied: branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java (from rev 8301, branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java)
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java	                        (rev 0)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java	2009-11-17 21:40:56 UTC (rev 8302)
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.completion;
+
+
+/**
+ * This represents a set of operations done as part of replication. 
+ * When the entire set is done a group of Runnables can be executed.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface CompletionContext
+{
+   /** To be called by the replication manager, when new replication is added to the queue */
+   void linedUp();
+   
+   boolean hasData();
+
+   /** To be called by the replication manager, when data is confirmed on the channel */
+   void replicated();
+   
+   void afterCompletion(Runnable runnable);
+   
+   /** To be called when there are no more operations pending */
+   void complete();
+   
+   /** Flush all pending callbacks on the Context */
+   void flush();
+   
+   /** Replication may need some extra controls to guarantee ordering
+    *  when nothing is persisted through the contexts 
+    * @return true if this context has been flagged as empty
+    */
+   boolean isEmpty();
+   
+   void setEmpty(boolean empty);
+
+}

Copied: branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java (from rev 8301, branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java)
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java	                        (rev 0)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java	2009-11-17 21:40:56 UTC (rev 8302)
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.completion.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hornetq.core.completion.CompletionContext;
+
+/**
+ * An implementation of {@link CompletionContext}; {@link #getContext()} hands out one instance per thread.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * TODO: Maybe I should move this to persistence.journal. I need to check a few dependencies first.
+ *
+ */
+public class CompletionContextImpl implements CompletionContext
+{
+   private static final ThreadLocal<CompletionContext> tlReplicationContext = new ThreadLocal<CompletionContext>();
+
+   public static CompletionContext getContext()
+   {
+      CompletionContext token = tlReplicationContext.get();
+      if (token == null)
+      {
+         token = new CompletionContextImpl();
+         tlReplicationContext.set(token);
+      }
+      return token;
+   }
+
+   private List<Runnable> tasks;
+
+   private int linedup = 0;
+
+   private int replicated = 0;
+
+   private boolean empty = false;
+
+   private volatile boolean complete = false;
+
+   /**
+    * Creates a context with no lined-up operations and no pending tasks.
+    */
+   public CompletionContextImpl()
+   {
+      super();
+   }
+
+   /** To be called by the replication manager, when new replication is added to the queue */
+   public void linedUp()
+   {
+      linedup++;
+   }
+
+   public boolean hasData()
+   {
+      return linedup > 0;
+   }
+
+   /** You may have several actions to be done after a replication operation is completed. */
+   public void afterCompletion(Runnable runnable)
+   {
+      if (complete)
+      {
+         // Sanity check, this shouldn't happen
+         throw new IllegalStateException("The Replication Context is complete, and no more tasks are accepted");
+      }
+
+      if (tasks == null)
+      {
+         // No need to use Concurrent, we only add from a single thread.
+         // We don't add any more Runnables after it is complete
+         tasks = new ArrayList<Runnable>();
+      }
+
+      tasks.add(runnable);
+   }
+
+   /** To be called by the replication manager, when data is confirmed on the channel */
+   public synchronized void replicated()
+   {
+      if (++replicated == linedup && complete)
+      {
+         flush();
+      }
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.completion.CompletionContext#complete()
+    */
+   public synchronized void complete()
+   {
+      tlReplicationContext.set(null);
+      complete = true;
+      if (replicated == linedup && complete)
+      {
+         flush();
+      }
+   }
+
+   public synchronized void flush()
+   {
+      if (tasks != null)
+      {
+         for (Runnable run : tasks)
+         {
+            run.run();
+         }
+         tasks.clear();
+      }
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.completion.CompletionContext#isEmpty()
+    */
+   public boolean isEmpty()
+   {
+      return empty;
+   }
+
+   public void setEmpty(final boolean sync)
+   {
+      this.empty = sync;
+   }
+
+}

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java	2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java	2009-11-17 21:40:56 UTC (rev 8302)
@@ -56,7 +56,7 @@
 
    boolean isReplicated();
 
-   void afterReplicated(Runnable run);
+   void afterCompletion(Runnable run);
    
    /** Block until the replication is done. 
     * @throws Exception */

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-17 21:40:56 UTC (rev 8302)
@@ -33,6 +33,7 @@
 import javax.transaction.xa.Xid;
 
 import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.completion.impl.CompletionContextImpl;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.filter.Filter;
@@ -310,7 +311,7 @@
    public void waitOnReplication(final long timeout) throws Exception
    {
       final CountDownLatch latch = new CountDownLatch(1);
-      afterReplicated(new Runnable()
+      afterCompletion(new Runnable()
       {
          public void run()
          {
@@ -363,13 +364,9 @@
 
    // TODO: shouldn't those page methods be on the PageManager? ^^^^
 
-   public void afterReplicated(Runnable run)
+   public void afterCompletion(Runnable run)
    {
-      if (replicator == null)
-      {
-         throw new IllegalStateException("StorageManager is not replicated");
-      }
-      replicator.afterReplicated(run);
+      CompletionContextImpl.getContext().afterCompletion(run);
    }
 
    public UUID getPersistentID()

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-11-17 21:40:56 UTC (rev 8302)
@@ -284,7 +284,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#afterReplicated(java.lang.Runnable)
     */
-   public void afterReplicated(Runnable run)
+   public void afterCompletion(Runnable run)
    {
       run.run();
    }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-17 21:40:56 UTC (rev 8302)
@@ -926,7 +926,7 @@
       {
          if (storageManager.isReplicated())
          {
-            storageManager.afterReplicated(new Runnable()
+            storageManager.afterCompletion(new Runnable()
             {
                public void run()
                {

Deleted: branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java	2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java	2009-11-17 21:40:56 UTC (rev 8302)
@@ -1,47 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.replication;
-
-
-/**
- * This represents a set of operations done as part of replication. 
- * When the entire set is done a group of Runnables can be executed.
- *
- * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public interface ReplicationContext
-{
-   /** To be called by the replication manager, when new replication is added to the queue */
-   void linedUp();
-   
-   boolean hasReplication();
-
-   /** To be called by the replication manager, when data is confirmed on the channel */
-   void replicated();
-   
-   void addReplicationAction(Runnable runnable);
-   
-   /** To be called when there are no more operations pending */
-   void complete();
-   
-   /** Flush all pending callbacks on the Context */
-   void flush();
-   
-   boolean isSync();
-   
-   void setSync(boolean sync);
-
-}

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-11-17 21:40:56 UTC (rev 8302)
@@ -15,6 +15,7 @@
 
 import java.util.Set;
 
+import org.hornetq.core.completion.CompletionContext;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.journal.JournalLoadInformation;
@@ -49,13 +50,10 @@
 
    void appendRollbackRecord(byte journalID, long txID) throws Exception;
    
-   /** Add an action to be executed after the pending replications */
-   void afterReplicated(Runnable runnable);
-   
    void closeContext();
    
    /** A list of tokens that are still waiting for replications to be completed */
-   Set<ReplicationContext> getActiveTokens();
+   Set<CompletionContext> getActiveTokens();
 
    /**
     * @param storeName

Deleted: branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java	2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java	2009-11-17 21:40:56 UTC (rev 8302)
@@ -1,126 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.replication.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.hornetq.core.replication.ReplicationContext;
-
-/**
- * A ReplicationToken
- *
- * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class ReplicationContextImpl implements ReplicationContext
-{
-   private List<Runnable> tasks;
-
-   private int linedup = 0;
-
-   private int replicated = 0;
-
-   private boolean sync = false;
-
-   private volatile boolean complete = false;
-
-   /**
-    * @param executor
-    */
-   public ReplicationContextImpl()
-   {
-      super();
-   }
-
-   /** To be called by the replication manager, when new replication is added to the queue */
-   public void linedUp()
-   {
-      linedup++;
-   }
-
-   public boolean hasReplication()
-   {
-      return linedup > 0;
-   }
-
-   /** You may have several actions to be done after a replication operation is completed. */
-   public void addReplicationAction(Runnable runnable)
-   {
-      if (complete)
-      {
-         // Sanity check, this shouldn't happen
-         throw new IllegalStateException("The Replication Context is complete, and no more tasks are accepted");
-      }
-
-      if (tasks == null)
-      {
-         // No need to use Concurrent, we only add from a single thread.
-         // We don't add any more Runnables after it is complete
-         tasks = new ArrayList<Runnable>();
-      }
-
-      tasks.add(runnable);
-   }
-
-   /** To be called by the replication manager, when data is confirmed on the channel */
-   public synchronized void replicated()
-   {
-      // roundtrip packets won't have lined up packets
-      if (++replicated == linedup && complete)
-      {
-         flush();
-      }
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.replication.ReplicationToken#complete()
-    */
-   public synchronized void complete()
-   {
-      complete = true;
-      if (replicated == linedup && complete)
-      {
-         flush();
-      }
-   }
-
-   public synchronized void flush()
-   {
-      if (tasks != null)
-      {
-         for (Runnable run : tasks)
-         {
-            run.run();
-         }
-         tasks.clear();
-      }
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.replication.ReplicationContext#isRoundtrip()
-    */
-   public boolean isSync()
-   {
-      return sync;
-   }
-
-   public void setSync(final boolean sync)
-   {
-      this.sync = sync;
-   }
-
-}

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-11-17 21:40:56 UTC (rev 8302)
@@ -13,14 +13,17 @@
 
 package org.hornetq.core.replication.impl;
 
-import java.util.ArrayList;
 import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.hornetq.core.client.SessionFailureListener;
 import org.hornetq.core.client.impl.FailoverManager;
+import org.hornetq.core.completion.CompletionContext;
+import org.hornetq.core.completion.impl.CompletionContextImpl;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.journal.JournalLoadInformation;
@@ -45,9 +48,7 @@
 import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.core.replication.ReplicationContext;
 import org.hornetq.core.replication.ReplicationManager;
-import org.hornetq.utils.ConcurrentHashSet;
 import org.hornetq.utils.SimpleString;
 
 /**
@@ -81,10 +82,8 @@
 
    private final Object replicationLock = new Object();
 
-   private final ThreadLocal<ReplicationContext> tlReplicationContext = new ThreadLocal<ReplicationContext>();
+   private final Queue<CompletionContext> pendingTokens = new ConcurrentLinkedQueue<CompletionContext>();
 
-   private final Queue<ReplicationContext> pendingTokens = new ConcurrentLinkedQueue<ReplicationContext>();
-
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -360,17 +359,17 @@
    {
       enabled = false;
       
-      LinkedHashSet<ReplicationContext> activeContexts = new LinkedHashSet<ReplicationContext>();
+      LinkedHashSet<CompletionContext> activeContexts = new LinkedHashSet<CompletionContext>();
       
       // The same context will be replicated on the pending tokens...
       // as the multiple operations will be replicated on the same context
       while (!pendingTokens.isEmpty())
       {
-         ReplicationContext ctx = pendingTokens.poll();
+         CompletionContext ctx = pendingTokens.poll();
          activeContexts.add(ctx);
       }
 
-      for (ReplicationContext ctx : activeContexts)
+      for (CompletionContext ctx : activeContexts)
       {
          ctx.complete();
          ctx.flush();
@@ -393,38 +392,17 @@
       started = false;
    }
 
-   public ReplicationContext getContext()
-   {
-      ReplicationContext token = tlReplicationContext.get();
-      if (token == null)
-      {
-         token = new ReplicationContextImpl();
-         tlReplicationContext.set(token);
-      }
-      return token;
-   }
-
    /* (non-Javadoc)
-    * @see org.hornetq.core.replication.ReplicationManager#addReplicationAction(java.lang.Runnable)
-    */
-   public void afterReplicated(final Runnable runnable)
-   {
-      getContext().addReplicationAction(runnable);
-   }
-
-   /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#completeToken()
     */
    public void closeContext()
    {
-      final ReplicationContext token = tlReplicationContext.get();
+      final CompletionContext token = getContext();
 
       if (token != null)
       {
-         // Disassociate thread local
-         tlReplicationContext.set(null);
          // Remove from pending tokens as soon as this is complete
-         if (!token.hasReplication())
+         if (!token.hasData())
          {
             sync(token);
          }
@@ -432,18 +410,19 @@
       }
    }
 
+
    /* method for testcases only
     * @see org.hornetq.core.replication.ReplicationManager#getPendingTokens()
     */
-   public Set<ReplicationContext> getActiveTokens()
+   public Set<CompletionContext> getActiveTokens()
    {
       
-      LinkedHashSet<ReplicationContext> activeContexts = new LinkedHashSet<ReplicationContext>();
+      LinkedHashSet<CompletionContext> activeContexts = new LinkedHashSet<CompletionContext>();
       
       // The same context will be replicated on the pending tokens...
       // as the multiple operations will be replicated on the same context
       
-      for (ReplicationContext ctx : pendingTokens)
+      for (CompletionContext ctx : pendingTokens)
       {
          activeContexts.add(ctx);
       }
@@ -456,7 +435,7 @@
    {
       boolean runItNow = false;
 
-      ReplicationContext repliToken = getContext();
+      CompletionContext repliToken = getContext();
       repliToken.linedUp();
 
       synchronized (replicationLock)
@@ -493,9 +472,9 @@
 
    private void replicated()
    {
-      ArrayList<ReplicationContext> tokensToExecute = getTokens();
+      List<CompletionContext> tokensToExecute = getTokens();
 
-      for (ReplicationContext ctx : tokensToExecute)
+      for (CompletionContext ctx : tokensToExecute)
       {
          ctx.replicated();
       }
@@ -507,13 +486,13 @@
 
    // Private -------------------------------------------------------
 
-   private void sync(ReplicationContext context)
+   private void sync(CompletionContext context)
    {
       boolean executeNow = false;
       synchronized (replicationLock)
       {
          context.linedUp();
-         context.setSync(true);
+         context.setEmpty(true);
          if (pendingTokens.isEmpty())
          {
             // this means the list is empty and we should process it now
@@ -532,6 +511,11 @@
       }
    }
 
+   
+   public CompletionContext getContext()
+   {
+      return CompletionContextImpl.getContext();
+   }
 
    /**
     * This method will first get all the sync tokens (that won't go to the backup node)
@@ -539,11 +523,11 @@
     * At last, if the list is empty, it will verify if there are any future tokens that are sync tokens, to avoid a case where no more replication is done due to inactivity.
     * @return
     */
-   private ArrayList<ReplicationContext> getTokens()
+   private List<CompletionContext> getTokens()
    {
-      ArrayList<ReplicationContext> retList = new ArrayList<ReplicationContext>(1);
+      List<CompletionContext> retList = new LinkedList<CompletionContext>();
 
-      ReplicationContext tokenPolled = null;
+      CompletionContext tokenPolled = null;
 
       // First will get all the non replicated tokens up to the first one that is not replicated
       do
@@ -558,16 +542,16 @@
          retList.add(tokenPolled);
 
       }
-      while (tokenPolled.isSync());
+      while (tokenPolled.isEmpty());
 
       // This is to avoid a situation where we won't have more replicated packets
-      // all the packets will need to be processed
+      // We need to make sure we process any pending sync packet up to the next non empty packet
       synchronized (replicationLock)
       {
-         while (!pendingTokens.isEmpty() && pendingTokens.peek().isSync())
+         while (!pendingTokens.isEmpty() && pendingTokens.peek().isEmpty())
          {
             tokenPolled = pendingTokens.poll();
-            if (!tokenPolled.isSync())
+            if (!tokenPolled.isEmpty())
             {
                throw new IllegalStateException("Replicatoin context is not a roundtrip token as expected");
             }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java	2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java	2009-11-17 21:40:56 UTC (rev 8302)
@@ -146,7 +146,7 @@
 
       if (storageManager.isReplicated())
       {
-         storageManager.afterReplicated(new Runnable()
+         storageManager.afterCompletion(new Runnable()
          {
             public void run()
             {

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-17 21:40:56 UTC (rev 8302)
@@ -1720,7 +1720,7 @@
    {
       if (storageManager.isReplicated())
       {
-         storageManager.afterReplicated(new Runnable()
+         storageManager.afterCompletion(new Runnable()
          {
             public void run()
             {

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2009-11-17 21:40:56 UTC (rev 8302)
@@ -244,7 +244,7 @@
             {
                if (storageManager.isReplicated())
                {
-                  storageManager.afterReplicated(execAfterCommit);
+                  storageManager.afterCompletion(execAfterCommit);
                }
                else
                {

Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-17 21:40:56 UTC (rev 8302)
@@ -32,6 +32,7 @@
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
 import org.hornetq.core.client.impl.FailoverManager;
 import org.hornetq.core.client.impl.FailoverManagerImpl;
+import org.hornetq.core.completion.impl.CompletionContextImpl;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.TransportConfiguration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -272,20 +273,8 @@
          replicatedJournal.appendPrepareRecord(3, new FakeData(), false);
          replicatedJournal.appendRollbackRecord(3, false);
 
-         assertEquals(1, manager.getActiveTokens().size());
-
          blockOnReplication(manager);
 
-         for (int i = 0; i < 100; i++)
-         {
-            // This is asynchronous. Have to wait completion
-            if (manager.getActiveTokens().size() == 0)
-            {
-               break;
-            }
-            Thread.sleep(1);
-         }
-
          assertEquals(0, manager.getActiveTokens().size());
 
          ServerMessage msg = new ServerMessageImpl();
@@ -386,7 +375,7 @@
          }
 
          final CountDownLatch latch = new CountDownLatch(1);
-         manager.afterReplicated(new Runnable()
+         CompletionContextImpl.getContext().afterCompletion(new Runnable()
          {
             public void run()
             {
@@ -413,7 +402,7 @@
    private void blockOnReplication(ReplicationManagerImpl manager) throws Exception
    {
       final CountDownLatch latch = new CountDownLatch(1);
-      manager.afterReplicated(new Runnable()
+      CompletionContextImpl.getContext().afterCompletion(new Runnable()
       {
 
          public void run()
@@ -469,7 +458,7 @@
          replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
 
          final CountDownLatch latch = new CountDownLatch(1);
-         manager.afterReplicated(new Runnable()
+         CompletionContextImpl.getContext().afterCompletion(new Runnable()
          {
 
             public void run()
@@ -541,7 +530,7 @@
             }
 
 
-            manager.afterReplicated(new Runnable()
+            CompletionContextImpl.getContext().afterCompletion(new Runnable()
             {
 
                public void run()

Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-11-17 21:40:56 UTC (rev 8302)
@@ -1155,7 +1155,7 @@
       /* (non-Javadoc)
        * @see org.hornetq.core.persistence.StorageManager#afterReplicated(java.lang.Runnable)
        */
-      public void afterReplicated(Runnable run)
+      public void afterCompletion(Runnable run)
       {
 
       }



More information about the hornetq-commits mailing list