[hornetq-commits] JBoss hornetq SVN: r8303 - in branches/ClebertTemporary: src/main/org/hornetq/core/completion and 16 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Nov 17 21:52:31 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-17 21:52:31 -0500 (Tue, 17 Nov 2009)
New Revision: 8303

Added:
   branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationContext.java
   branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationExceptionCallback.java
   branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/OperationContextImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SyncIOCompletion.java
Removed:
   branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java
   branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java
Modified:
   branches/ClebertTemporary/.classpath
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOCompletion.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/Journal.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/DummyCallback.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TransactionCallback.java
   branches/ClebertTemporary/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
   branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java
   branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
   branches/ClebertTemporary/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
   branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
   branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
   branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
   branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Refactoring II - won't commit at this point (some stuff pending and I wanted to save this checkpoint)

Modified: branches/ClebertTemporary/.classpath
===================================================================
--- branches/ClebertTemporary/.classpath	2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/.classpath	2009-11-18 02:52:31 UTC (rev 8303)
@@ -7,7 +7,7 @@
 	<classpathentry kind="src" path="tests/config"/>
 	<classpathentry excluding="**/.svn/**/*" kind="src" path="tests/src">
 		<attributes>
-			<attribute name="org.eclipse.jdt.launching.CLASSPATH_ATTR_LIBRARY_PATH_ENTRY" value="trunk/native/bin"/>
+			<attribute name="org.eclipse.jdt.launching.CLASSPATH_ATTR_LIBRARY_PATH_ENTRY" value="trunk-tmp/native/bin"/>
 		</attributes>
 	</classpathentry>
 	<classpathentry kind="src" path="tests/jms-tests/src"/>

Deleted: branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java	2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -1,51 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.completion;
-
-
-/**
- * This represents a set of operations done as part of replication. 
- * When the entire set is done a group of Runnables can be executed.
- *
- * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public interface CompletionContext
-{
-   /** To be called by the replication manager, when new replication is added to the queue */
-   void linedUp();
-   
-   boolean hasData();
-
-   /** To be called by the replication manager, when data is confirmed on the channel */
-   void replicated();
-   
-   void afterCompletion(Runnable runnable);
-   
-   /** To be called when there are no more operations pending */
-   void complete();
-   
-   /** Flush all pending callbacks on the Context */
-   void flush();
-   
-   /** Replication may need some extra controls to guarantee ordering
-    *  when nothing is persisted through the contexts 
-    * @return The context is empty
-    */
-   boolean isEmpty();
-   
-   void setEmpty(boolean empty);
-
-}

Copied: branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationContext.java (from rev 8302, branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java)
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationContext.java	                        (rev 0)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationContext.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.completion;
+
+import org.hornetq.core.journal.IOCompletion;
+
+
+/**
+ * This represents a set of operations done as part of replication. 
+ * When the entire set is done a group of Runnables can be executed.
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface OperationContext extends IOCompletion
+{
+   /** To be called by the replication manager, when new replication is added to the queue */
+   void linedUp();
+   
+   boolean hasData();
+
+   void executeOnCompletion(IOCompletion runnable);
+   
+   /** To be called when there are no more operations pending */
+   void complete();
+   
+   /** Flush all pending callbacks on the Context */
+   void flush();
+   
+   /** Replication may need some extra controls to guarantee ordering
+    *  when nothing is persisted through the contexts 
+    * @return The context is empty
+    */
+   boolean isEmpty();
+   
+   void setEmpty(boolean empty);
+
+}

Added: branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationExceptionCallback.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationExceptionCallback.java	                        (rev 0)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationExceptionCallback.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.completion;
+
+/**
+ * A OperationExceptionCallback
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface OperationExceptionCallback
+{
+   void onError(int errorCode, String errorMessage);
+}

Deleted: branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java	2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -1,139 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.completion.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.hornetq.core.completion.CompletionContext;
-
-/**
- * A ReplicationToken
- *
- * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- * TODO: Maybe I should move this to persistence.journal. I need to check a few dependencies first.
- *
- */
-public class CompletionContextImpl implements CompletionContext
-{
-   private static final ThreadLocal<CompletionContext> tlReplicationContext = new ThreadLocal<CompletionContext>();
-
-   public static CompletionContext getContext()
-   {
-      CompletionContext token = tlReplicationContext.get();
-      if (token == null)
-      {
-         token = new CompletionContextImpl();
-         tlReplicationContext.set(token);
-      }
-      return token;
-   }
-
-   private List<Runnable> tasks;
-
-   private int linedup = 0;
-
-   private int replicated = 0;
-
-   private boolean empty = false;
-
-   private volatile boolean complete = false;
-
-   /**
-    * @param executor
-    */
-   public CompletionContextImpl()
-   {
-      super();
-   }
-
-   /** To be called by the replication manager, when new replication is added to the queue */
-   public void linedUp()
-   {
-      linedup++;
-   }
-
-   public boolean hasData()
-   {
-      return linedup > 0;
-   }
-
-   /** You may have several actions to be done after a replication operation is completed. */
-   public void afterCompletion(Runnable runnable)
-   {
-      if (complete)
-      {
-         // Sanity check, this shouldn't happen
-         throw new IllegalStateException("The Replication Context is complete, and no more tasks are accepted");
-      }
-
-      if (tasks == null)
-      {
-         // No need to use Concurrent, we only add from a single thread.
-         // We don't add any more Runnables after it is complete
-         tasks = new ArrayList<Runnable>();
-      }
-
-      tasks.add(runnable);
-   }
-
-   /** To be called by the replication manager, when data is confirmed on the channel */
-   public synchronized void replicated()
-   {
-      if (++replicated == linedup && complete)
-      {
-         flush();
-      }
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.replication.ReplicationToken#complete()
-    */
-   public synchronized void complete()
-   {
-      tlReplicationContext.set(null);
-      complete = true;
-      if (replicated == linedup && complete)
-      {
-         flush();
-      }
-   }
-
-   public synchronized void flush()
-   {
-      if (tasks != null)
-      {
-         for (Runnable run : tasks)
-         {
-            run.run();
-         }
-         tasks.clear();
-      }
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.replication.ReplicationContext#isRoundtrip()
-    */
-   public boolean isEmpty()
-   {
-      return empty;
-   }
-
-   public void setEmpty(final boolean sync)
-   {
-      this.empty = sync;
-   }
-
-}

Copied: branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/OperationContextImpl.java (from rev 8302, branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java)
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/OperationContextImpl.java	                        (rev 0)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/OperationContextImpl.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.completion.impl;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.hornetq.core.completion.OperationContext;
+import org.hornetq.core.journal.IOCompletion;
+
+/**
+ * A ReplicationToken
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * TODO: Maybe I should move this to persistence.journal. I need to check a few dependencies first.
+ *
+ */
+public class OperationContextImpl implements OperationContext
+{
+   private static final ThreadLocal<OperationContext> tlContext = new ThreadLocal<OperationContext>();
+
+   public static OperationContext getContext()
+   {
+      OperationContext token = tlContext.get();
+      if (token == null)
+      {
+         token = new OperationContextImpl();
+         tlContext.set(token);
+      }
+      return token;
+   }
+
+   private List<IOCompletion> tasks;
+   
+   private int linedup = 0;
+
+   private int replicated = 0;
+
+   private boolean empty = false;
+
+   private volatile boolean complete = false;
+
+   /**
+    * @param executor
+    */
+   public OperationContextImpl()
+   {
+      super();
+   }
+
+   /** To be called by the replication manager, when new replication is added to the queue */
+   public void linedUp()
+   {
+      linedup++;
+   }
+
+   public boolean hasData()
+   {
+      return linedup > 0;
+   }
+
+   /** You may have several actions to be done after a replication operation is completed. */
+   public void executeOnCompletion(IOCompletion completion)
+   {
+      if (complete)
+      {
+         // Sanity check, this shouldn't happen
+         throw new IllegalStateException("The Replication Context is complete, and no more tasks are accepted");
+      }
+
+      if (tasks == null)
+      {
+         // No need to use Concurrent, we only add from a single thread.
+         // We don't add any more Runnables after it is complete
+         tasks = new LinkedList<IOCompletion>();
+      }
+
+      tasks.add(completion);
+   }
+
+   /** To be called by the replication manager, when data is confirmed on the channel */
+   public synchronized void done()
+   {
+      if (++replicated == linedup && complete)
+      {
+         flush();
+      }
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationToken#complete()
+    */
+   public synchronized void complete()
+   {
+      tlContext.set(null);
+      complete = true;
+      if (replicated == linedup && complete)
+      {
+         flush();
+      }
+   }
+
+   public synchronized void flush()
+   {
+      if (tasks != null)
+      {
+         for (IOCompletion run : tasks)
+         {
+            run.done();
+         }
+         tasks.clear();
+      }
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationContext#isRoundtrip()
+    */
+   public boolean isEmpty()
+   {
+      return empty;
+   }
+
+   public void setEmpty(final boolean sync)
+   {
+      this.empty = sync;
+   }
+
+   
+   /* (non-Javadoc)
+    * @see org.hornetq.core.asyncio.AIOCallback#onError(int, java.lang.String)
+    */
+   public void onError(int errorCode, String errorMessage)
+   {
+      if (tasks != null)
+      {
+         for (IOCompletion run : tasks)
+         {
+            run.onError(errorCode, errorMessage);
+         }
+      }
+   }
+
+}

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOCompletion.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOCompletion.java	2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOCompletion.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -25,5 +25,4 @@
  */
 public interface IOCompletion extends AIOCallback
 {
-   void waitCompletion() throws Exception;
 }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/Journal.java	2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/Journal.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -19,8 +19,10 @@
 
 /**
  * 
- * A Journal
+ * Most methods on the journal provide a blocking version where you select the sync mode and a non blocking mode where you pass a completion callback as a parameter.
  * 
+ * Notice also that even on the callback methods it's possible to pass the sync mode. That will only make sense on the NIO operations.
+ * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
@@ -31,14 +33,24 @@
 
    void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
 
+   void appendAddRecord(long id, byte recordType, byte[] record, boolean sync, IOCompletion completionCallback) throws Exception;
+
    void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
 
+   void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync, IOCompletion completionCallback) throws Exception;
+
    void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
 
+   void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync, IOCompletion completionCallback) throws Exception;
+
    void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
 
+   void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync, IOCompletion completionCallback) throws Exception;
+
    void appendDeleteRecord(long id, boolean sync) throws Exception;
 
+   void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception;
+
    // Transactional operations
 
    void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
@@ -57,6 +69,8 @@
 
    void appendCommitRecord(long txID, boolean sync) throws Exception;
 
+   void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception;
+
    /** 
     * 
     * <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction 
@@ -70,10 +84,16 @@
     */
    void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception;
 
+   void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync, IOCompletion callback) throws Exception;
+
    void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception;
 
+   void appendPrepareRecord(long txID, byte[] transactionData, boolean sync, IOCompletion callback) throws Exception;
+
    void appendRollbackRecord(long txID, boolean sync) throws Exception;
 
+   void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception;
+
    // Load
    
    JournalLoadInformation load(LoaderCallback reloadManager) throws Exception;

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -229,7 +229,7 @@
 
    public int read(final ByteBuffer bytes) throws Exception
    {
-      IOCompletion waitCompletion = SimpleWaitIOCallback.getInstance();
+      SimpleWaitIOCallback waitCompletion = new SimpleWaitIOCallback();
 
       int bytesRead = read(bytes, waitCompletion);
 
@@ -281,7 +281,7 @@
    {
       if (sync)
       {
-         IOCompletion completion = SimpleWaitIOCallback.getInstance();
+         SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
   
          writeDirect(bytes, true, completion);
   

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -178,7 +178,7 @@
    {
       if (sync)
       {
-         IOCompletion completion = SimpleWaitIOCallback.getInstance();
+         SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
 
          write(bytes, true, completion);
 

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/DummyCallback.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/DummyCallback.java	2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/DummyCallback.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -14,7 +14,6 @@
 
 package org.hornetq.core.journal.impl;
 
-import org.hornetq.core.journal.IOCompletion;
 import org.hornetq.core.logging.Logger;
 
 /**
@@ -24,13 +23,13 @@
  *
  *
  */
-public  class DummyCallback implements IOCompletion
+class DummyCallback extends SyncIOCompletion
 {
    private static DummyCallback instance = new DummyCallback();
    
    private static final Logger log = Logger.getLogger(SimpleWaitIOCallback.class);
    
-   public static IOCompletion getInstance()
+   public static DummyCallback getInstance()
    {
       return instance;
    }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -845,15 +845,31 @@
       appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
    }
 
+   public void appendAddRecord(final long id, final byte recordType, final byte[] record, final boolean sync, final IOCompletion callback) throws Exception
+   {
+      appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync, callback);
+   }
+   
    public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
    {
+      SyncIOCompletion callback = getSyncCallback(sync);
+      
+      appendAddRecord(id, recordType, record, sync, callback);
+      
+      // We only wait on explicit callbacks
+      if (callback != null)
+      {
+         callback.waitCompletion();
+      }
+   }
+
+   public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync, final IOCompletion callback) throws Exception
+   {
       if (state != STATE_LOADED)
       {
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      IOCompletion callback = null;
-
       compactingLock.readLock().lock();
 
       try
@@ -864,8 +880,6 @@
 
          writeAddRecord(-1, id, recordType, record, size, bb); // fileID will be filled later
 
-         callback = getSyncCallback(sync);
-
          lockAppend.lock();
          try
          {
@@ -882,11 +896,6 @@
       {
          compactingLock.readLock().unlock();
       }
-
-      if (callback != null)
-      {
-         callback.waitCompletion();
-      }
    }
 
    public void appendUpdateRecord(final long id, final byte recordType, final byte[] record, final boolean sync) throws Exception
@@ -894,15 +903,31 @@
       appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
    }
 
+   public void appendUpdateRecord(final long id, final byte recordType, final byte[] record, final boolean sync, final IOCompletion callback) throws Exception
+   {
+      appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync, callback);
+   }
+
    public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
    {
+      SyncIOCompletion callback = getSyncCallback(sync);
+      
+      appendUpdateRecord(id, recordType, record, sync, callback);
+      
+      // We only wait on explicit callbacks
+      if (callback != null)
+      {
+         callback.waitCompletion();
+      }
+   }
+   
+   public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync, final IOCompletion callback) throws Exception
+   {
       if (state != STATE_LOADED)
       {
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      IOCompletion callback = null;
-
       compactingLock.readLock().lock();
 
       try
@@ -924,8 +949,6 @@
 
          writeUpdateRecord(-1, id, recordType, record, size, bb);
 
-         callback = getSyncCallback(sync);
-
          lockAppend.lock();
          try
          {
@@ -951,14 +974,23 @@
       {
          compactingLock.readLock().unlock();
       }
+   }
 
+
+   public void appendDeleteRecord(final long id, final boolean sync) throws Exception
+   {
+      SyncIOCompletion callback = getSyncCallback(sync);
+      
+      appendDeleteRecord(id, sync, callback);
+      
+      // We only wait on explicit callbacks
       if (callback != null)
       {
          callback.waitCompletion();
       }
    }
-
-   public void appendDeleteRecord(final long id, final boolean sync) throws Exception
+   
+   public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception
    {
       if (state != STATE_LOADED)
       {
@@ -967,8 +999,6 @@
 
       compactingLock.readLock().lock();
 
-      IOCompletion callback = null;
-
       try
       {
 
@@ -988,8 +1018,6 @@
 
          writeDeleteRecord(-1, id, size, bb);
 
-         callback = getSyncCallback(sync);
-
          lockAppend.lock();
          try
          {
@@ -1016,11 +1044,6 @@
       {
          compactingLock.readLock().unlock();
       }
-
-      if (callback != null)
-      {
-         callback.waitCompletion();
-      }
    }
 
    public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
@@ -1165,6 +1188,12 @@
    {
       appendDeleteRecordTransactional(txID, id, NullEncoding.instance);
    }
+   
+   
+   public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync, IOCompletion completion) throws Exception
+   {
+      appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync, completion);
+   }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean)
@@ -1174,6 +1203,18 @@
       appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync);
    }
 
+   public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync) throws Exception
+   {
+      SyncIOCompletion syncCompletion = getSyncCallback(sync);
+      
+      appendPrepareRecord(txID, transactionData, sync, syncCompletion);
+      
+      if (syncCompletion != null)
+      {
+         syncCompletion.waitCompletion();
+      }
+   }
+
    /** 
     * 
     * <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction 
@@ -1187,7 +1228,7 @@
     * @param transactionData - extra user data for the prepare
     * @throws Exception
     */
-   public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync) throws Exception
+   public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync, IOCompletion completion) throws Exception
    {
       if (state != STATE_LOADED)
       {
@@ -1198,11 +1239,6 @@
 
       JournalTransaction tx = getTransactionInfo(txID);
 
-      if (sync)
-      {
-         tx.syncPreviousFiles(fileFactory.isSupportsCallbacks(), currentFile);
-      }
-
       try
       {
 
@@ -1214,7 +1250,7 @@
          lockAppend.lock();
          try
          {
-            JournalFile usedFile = appendRecord(bb, true, sync, tx, null);
+            JournalFile usedFile = appendRecord(bb, true, sync, tx, completion);
 
             tx.prepare(usedFile);
          }
@@ -1228,11 +1264,23 @@
       {
          compactingLock.readLock().unlock();
       }
-
-      // We should wait this outside of the lock, to increase throughput
-      tx.waitCompletion();
    }
+   
+   
+   
+   public void appendCommitRecord(final long txID, final boolean sync) throws Exception
+   {
+      SyncIOCompletion syncCompletion = getSyncCallback(sync);
+      
+      appendCommitRecord(txID, sync, syncCompletion);
+      
+      if (syncCompletion != null)
+      {
+         syncCompletion.waitCompletion();
+      }
+   }
 
+
    /**
     * <p>A transaction record (Commit or Prepare), will hold the number of elements the transaction has on each file.</p>
     * <p>For example, a transaction was spread along 3 journal files with 10 pendingTransactions on each file. 
@@ -1250,7 +1298,9 @@
     *
     * @see JournalImpl#writeTransaction(byte, long, org.hornetq.core.journal.impl.JournalImpl.JournalTransaction, EncodingSupport)
     */
-   public void appendCommitRecord(final long txID, final boolean sync) throws Exception
+   
+
+   public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception
    {
       if (state != STATE_LOADED)
       {
@@ -1282,7 +1332,7 @@
          lockAppend.lock();
          try
          {
-            JournalFile usedFile = appendRecord(bb, true, sync, tx, null);
+            JournalFile usedFile = appendRecord(bb, true, sync, tx, callback);
 
             tx.commit(usedFile);
          }
@@ -1296,15 +1346,23 @@
       {
          compactingLock.readLock().unlock();
       }
+   }
 
-      if (sync)
+   
+   public void appendRollbackRecord(final long txID, final boolean sync) throws Exception
+   {
+      SyncIOCompletion syncCompletion = getSyncCallback(sync);
+      
+      appendRollbackRecord(txID, sync, syncCompletion);
+      
+      if (syncCompletion != null)
       {
-         // We should wait this outside of the lock, to increase throuput
-         tx.waitCompletion();
+         syncCompletion.waitCompletion();
       }
+
    }
-
-   public void appendRollbackRecord(final long txID, final boolean sync) throws Exception
+   
+   public void appendRollbackRecord(final long txID, final boolean sync, final IOCompletion completion) throws Exception
    {
       if (state != STATE_LOADED)
       {
@@ -1331,7 +1389,7 @@
          lockAppend.lock();
          try
          {
-            JournalFile usedFile = appendRecord(bb, false, sync, tx, null);
+            JournalFile usedFile = appendRecord(bb, false, sync, tx, completion);
 
             tx.rollback(usedFile);
          }
@@ -1345,14 +1403,6 @@
       {
          compactingLock.readLock().unlock();
       }
-
-      // We should wait this outside of the lock, to increase throuput
-
-      if (sync)
-      {
-         tx.waitCompletion();
-      }
-
    }
 
    public int getAlignment() throws Exception
@@ -2833,7 +2883,7 @@
                                     final boolean completeTransaction,
                                     final boolean sync,
                                     final JournalTransaction tx,
-                                    IOCompletion callback) throws Exception
+                                    final IOCompletion parameterCallback) throws Exception
    {
       try
       {
@@ -2841,6 +2891,8 @@
          {
             throw new IllegalStateException("The journal is not loaded " + state);
          }
+         
+         final IOCompletion callback;
 
          int size = bb.capacity();
 
@@ -2874,25 +2926,26 @@
 
          if (tx != null)
          {
-            if (callback != null)
-            {
-               // sanity check, it should not happen.
-               throw new IllegalArgumentException("Invalid callback parameter. Use of tx is mutually exclusive with the callback");
-            }
-
             // The callback of a transaction has to be taken inside the lock,
             // when we guarantee the currentFile will not be changed,
             // since we individualize the callback per file
             if (fileFactory.isSupportsCallbacks())
             {
-               callback = tx.getCallback(currentFile);
+               // Set the delegated callback as a parameter
+               TransactionCallback txcallback = tx.getCallback(currentFile);
+               txcallback.setDelegateCompletion(parameterCallback);
+               callback = txcallback;
             }
+            else
+            {
+               callback = null;
+            }
 
             if (sync)
             {
-               // 99 % of the times this will be already synced, as previous files should be closed already.
-               // This is to have 100% guarantee the transaction will be persisted and no loss of information would
-               // happen
+               // In an edge case the transaction could still have pending data from previous files. 
+               // This shouldn't cause any blocking issues, as this is here to guarantee we cover all possibilities
+               // on guaranteeing the data is on the disk
                tx.syncPreviousFiles(fileFactory.isSupportsCallbacks(), currentFile);
             }
 
@@ -2903,6 +2956,10 @@
                tx.fillNumberOfRecords(currentFile, bb);
             }
          }
+         else
+         {
+            callback = parameterCallback;
+         }
 
          // Adding fileID
          bb.writerIndex(DataConstants.SIZE_BYTE);
@@ -3233,13 +3290,13 @@
       return tx;
    }
 
-   private IOCompletion getSyncCallback(final boolean sync)
+   private SyncIOCompletion getSyncCallback(final boolean sync)
    {
       if (fileFactory.isSupportsCallbacks())
       {
          if (sync)
          {
-            return SimpleWaitIOCallback.getInstance();
+            return new SimpleWaitIOCallback();
          }
          else
          {

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java	2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -14,9 +14,9 @@
 package org.hornetq.core.journal.impl;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.journal.IOCompletion;
 import org.hornetq.core.logging.Logger;
 
 /**
@@ -26,7 +26,7 @@
  *
  *
  */
-public class SimpleWaitIOCallback implements IOCompletion
+public class SimpleWaitIOCallback extends SyncIOCompletion
 {
 
    private static final Logger log = Logger.getLogger(SimpleWaitIOCallback.class);
@@ -37,12 +37,6 @@
 
    private volatile int errorCode = 0;
 
-   public static IOCompletion getInstance()
-   {
-      return new SimpleWaitIOCallback();
-   }
-
-
    public void done()
    {
       latch.countDown();
@@ -68,4 +62,9 @@
       }
       return;
    }
+   
+   public boolean waitCompletion(final long timeout) throws Exception
+   {
+      return latch.await(timeout, TimeUnit.MILLISECONDS);
+   }
 }

Added: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SyncIOCompletion.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SyncIOCompletion.java	                        (rev 0)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SyncIOCompletion.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import org.hornetq.core.journal.IOCompletion;
+
+/**
+ * Internal class used to manage explicit syncs on the Journal through callbacks.
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class SyncIOCompletion implements IOCompletion
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   
+   public abstract void waitCompletion() throws Exception;
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TransactionCallback.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TransactionCallback.java	2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TransactionCallback.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -31,15 +31,27 @@
    private volatile String errorMessage = null;
 
    private volatile int errorCode = 0;
+   
+   private volatile int up = 0;
+   
+   private volatile int done = 0;
+   
+   private volatile IOCompletion delegateCompletion;
 
    public void countUp()
    {
+      up++;
       countLatch.up();
    }
 
    public void done()
    {
       countLatch.down();
+      if (++done == up && delegateCompletion != null)
+      {
+         delegateCompletion.done();
+         delegateCompletion = null;
+      }
    }
 
    public void waitCompletion() throws InterruptedException
@@ -59,9 +71,30 @@
       this.errorCode = errorCode;
 
       countLatch.down();
+      
+      if (delegateCompletion != null)
+      {
+         delegateCompletion.onError(errorCode, errorMessage);
+      }
    }
 
    /**
+    * @return the delegateCompletion
+    */
+   public IOCompletion getDelegateCompletion()
+   {
+      return delegateCompletion;
+   }
+
+   /**
+    * @param delegateCompletion the delegateCompletion to set
+    */
+   public void setDelegateCompletion(IOCompletion delegateCompletion)
+   {
+      this.delegateCompletion = delegateCompletion;
+   }
+
+   /**
     * @return the errorMessage
     */
    public String getErrorMessage()

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -997,8 +997,7 @@
 
       depageTransaction.commit();
 
-      // StorageManager does the check: if (replicated) -> do the proper cleanup already
-      storageManager.completeReplication();
+      storageManager.completeOperations();
 
       if (isTrace)
       {

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java	2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -18,6 +18,7 @@
 
 import javax.transaction.xa.Xid;
 
+import org.hornetq.core.journal.IOCompletion;
 import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagedMessage;
@@ -56,13 +57,14 @@
 
    boolean isReplicated();
 
-   void afterCompletion(Runnable run);
+   void afterCompleteOperations(IOCompletion run);
    
    /** Block until the replication is done. 
     * @throws Exception */
-   void waitOnReplication(long timeout) throws Exception;
+   void waitOnOperations(long timeout) throws Exception;
 
-   void completeReplication();
+   /** To close the OperationsContext */
+   void completeOperations();
 
    UUID getPersistentID();
    

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -33,11 +33,12 @@
 import javax.transaction.xa.Xid;
 
 import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.completion.impl.CompletionContextImpl;
+import org.hornetq.core.completion.impl.OperationContextImpl;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOCompletion;
 import org.hornetq.core.journal.Journal;
 import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.journal.PreparedTransactionInfo;
@@ -48,6 +49,7 @@
 import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
 import org.hornetq.core.journal.impl.JournalImpl;
 import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.SimpleWaitIOCallback;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.paging.PageTransactionInfo;
@@ -292,7 +294,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#completeReplication()
     */
-   public void completeReplication()
+   public void completeOperations()
    {
       if (replicator != null)
       {
@@ -308,19 +310,12 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#blockOnReplication()
     */
-   public void waitOnReplication(final long timeout) throws Exception
+   public void waitOnOperations(final long timeout) throws Exception
    {
-      final CountDownLatch latch = new CountDownLatch(1);
-      afterCompletion(new Runnable()
+      SimpleWaitIOCallback waitCallback = new SimpleWaitIOCallback();
+      afterCompleteOperations(waitCallback);
+      if (!waitCallback.waitCompletion(timeout))
       {
-         public void run()
-         {
-            latch.countDown();
-         }
-      });
-      completeReplication();
-      if (!latch.await(timeout, TimeUnit.MILLISECONDS))
-      {
          throw new IllegalStateException("no response received from replication");
       }
    }
@@ -364,9 +359,9 @@
 
    // TODO: shouldn't those page methods be on the PageManager? ^^^^
 
-   public void afterCompletion(Runnable run)
+   public void afterCompleteOperations(IOCompletion run)
    {
-      CompletionContextImpl.getContext().afterCompletion(run);
+      OperationContextImpl.getContext().executeOnCompletion(run);
    }
 
    public UUID getPersistentID()

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -20,6 +20,7 @@
 import javax.transaction.xa.Xid;
 
 import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.journal.IOCompletion;
 import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagedMessage;
@@ -282,14 +283,6 @@
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.persistence.StorageManager#afterReplicated(java.lang.Runnable)
-    */
-   public void afterCompletion(Runnable run)
-   {
-      run.run();
-   }
-
-   /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#isReplicated()
     */
    public boolean isReplicated()
@@ -300,7 +293,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#completeReplication()
     */
-   public void completeReplication()
+   public void completeOperations()
    {
    }
 
@@ -336,7 +329,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#blockOnReplication(long)
     */
-   public void waitOnReplication(long timeout) throws Exception
+   public void waitOnOperations(long timeout) throws Exception
    {
    }
 
@@ -348,4 +341,12 @@
       throw new IllegalStateException("Null Persistence should never be used as replicated");
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#afterCompleteOperations(org.hornetq.core.journal.IOCompletion)
+    */
+   public void afterCompleteOperations(IOCompletion run)
+   {
+      run.done();
+   }
+
 }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -924,20 +924,13 @@
       }
       else
       {
-         if (storageManager.isReplicated())
+         storageManager.afterCompleteOperations(new Runnable()
          {
-            storageManager.afterCompletion(new Runnable()
+            public void run()
             {
-               public void run()
-               {
-                  addReferences(refs);
-               }
-            });
-         }
-         else
-         {
-            addReferences(refs);
-         }
+               addReferences(refs);
+            }
+         });
       }
    }
 

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -15,7 +15,7 @@
 
 import java.util.Set;
 
-import org.hornetq.core.completion.CompletionContext;
+import org.hornetq.core.completion.OperationContext;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.journal.JournalLoadInformation;
@@ -53,7 +53,7 @@
    void closeContext();
    
    /** A list of tokens that are still waiting for replications to be completed */
-   Set<CompletionContext> getActiveTokens();
+   Set<OperationContext> getActiveTokens();
 
    /**
     * @param storeName

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -22,8 +22,8 @@
 
 import org.hornetq.core.client.SessionFailureListener;
 import org.hornetq.core.client.impl.FailoverManager;
-import org.hornetq.core.completion.CompletionContext;
-import org.hornetq.core.completion.impl.CompletionContextImpl;
+import org.hornetq.core.completion.OperationContext;
+import org.hornetq.core.completion.impl.OperationContextImpl;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.journal.JournalLoadInformation;
@@ -82,7 +82,7 @@
 
    private final Object replicationLock = new Object();
 
-   private final Queue<CompletionContext> pendingTokens = new ConcurrentLinkedQueue<CompletionContext>();
+   private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<OperationContext>();
 
    // Static --------------------------------------------------------
 
@@ -359,17 +359,17 @@
    {
       enabled = false;
       
-      LinkedHashSet<CompletionContext> activeContexts = new LinkedHashSet<CompletionContext>();
+      LinkedHashSet<OperationContext> activeContexts = new LinkedHashSet<OperationContext>();
       
       // The same context will be replicated on the pending tokens...
       // as the multiple operations will be replicated on the same context
       while (!pendingTokens.isEmpty())
       {
-         CompletionContext ctx = pendingTokens.poll();
+         OperationContext ctx = pendingTokens.poll();
          activeContexts.add(ctx);
       }
 
-      for (CompletionContext ctx : activeContexts)
+      for (OperationContext ctx : activeContexts)
       {
          ctx.complete();
          ctx.flush();
@@ -397,7 +397,7 @@
     */
    public void closeContext()
    {
-      final CompletionContext token = getContext();
+      final OperationContext token = getContext();
 
       if (token != null)
       {
@@ -414,15 +414,15 @@
    /* method for testcases only
     * @see org.hornetq.core.replication.ReplicationManager#getPendingTokens()
     */
-   public Set<CompletionContext> getActiveTokens()
+   public Set<OperationContext> getActiveTokens()
    {
       
-      LinkedHashSet<CompletionContext> activeContexts = new LinkedHashSet<CompletionContext>();
+      LinkedHashSet<OperationContext> activeContexts = new LinkedHashSet<OperationContext>();
       
       // The same context will be replicated on the pending tokens...
       // as the multiple operations will be replicated on the same context
       
-      for (CompletionContext ctx : pendingTokens)
+      for (OperationContext ctx : pendingTokens)
       {
          activeContexts.add(ctx);
       }
@@ -435,7 +435,7 @@
    {
       boolean runItNow = false;
 
-      CompletionContext repliToken = getContext();
+      OperationContext repliToken = getContext();
       repliToken.linedUp();
 
       synchronized (replicationLock)
@@ -458,7 +458,7 @@
 
       if (runItNow)
       {
-         repliToken.replicated();
+         repliToken.done();
       }
    }
 
@@ -472,11 +472,11 @@
 
    private void replicated()
    {
-      List<CompletionContext> tokensToExecute = getTokens();
+      List<OperationContext> tokensToExecute = getTokens();
 
-      for (CompletionContext ctx : tokensToExecute)
+      for (OperationContext ctx : tokensToExecute)
       {
-         ctx.replicated();
+         ctx.done();
       }
    }
 
@@ -486,7 +486,7 @@
 
    // Private -------------------------------------------------------
 
-   private void sync(CompletionContext context)
+   private void sync(OperationContext context)
    {
       boolean executeNow = false;
       synchronized (replicationLock)
@@ -507,14 +507,14 @@
       }
       if (executeNow)
       {
-         context.replicated();
+         context.done();
       }
    }
 
    
-   public CompletionContext getContext()
+   public OperationContext getContext()
    {
-      return CompletionContextImpl.getContext();
+      return OperationContextImpl.getContext();
    }
 
    /**
@@ -523,11 +523,11 @@
     * At last, if the list is empty, it will verify if there are any future tokens that are sync tokens, to avoid a case where no more replication is done due to inactivity.
     * @return
     */
-   private List<CompletionContext> getTokens()
+   private List<OperationContext> getTokens()
    {
-      List<CompletionContext> retList = new LinkedList<CompletionContext>();
+      List<OperationContext> retList = new LinkedList<OperationContext>();
 
-      CompletionContext tokenPolled = null;
+      OperationContext tokenPolled = null;
 
       // First will get all the non replicated tokens up to the first one that is not replicated
       do

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java	2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -144,21 +144,14 @@
 
       tx.commit();
 
-      if (storageManager.isReplicated())
+      storageManager.afterCompleteOperations(new Runnable()
       {
-         storageManager.afterCompletion(new Runnable()
+         public void run()
          {
-            public void run()
-            {
-               execPrompter();
-            }
-         });
-         storageManager.completeReplication();
-      }
-      else
-      {
-         execPrompter();
-      }
+            execPrompter();
+         }
+      });
+      storageManager.completeOperations();
    }
    
    private void execPrompter()

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java	2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -87,10 +87,7 @@
          }
          newList.add(groupBinding);
          storageManager.addGrouping(groupBinding);
-         if (storageManager.isReplicated())
-         {
-            storageManager.waitOnReplication(timeout);
-         }
+         storageManager.waitOnOperations(timeout);
          return new Response(groupBinding.getGroupId(), groupBinding.getClusterName());
       }
       else

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -1718,23 +1718,16 @@
                              final boolean flush,
                              final boolean closeChannel)
    {
-      if (storageManager.isReplicated())
+      storageManager.afterCompleteOperations(new Runnable()
       {
-         storageManager.afterCompletion(new Runnable()
+         public void run()
          {
-            public void run()
-            {
-               doSendResponse(confirmPacket, response, flush, closeChannel);
-            }
+            doSendResponse(confirmPacket, response, flush, closeChannel);
+         }
 
-         });
-         
-         storageManager.completeReplication();
-      }
-      else
-      {
-         doSendResponse(confirmPacket, response, flush, closeChannel);
-      }
+      });
+      
+      storageManager.completeOperations();
    }
 
    /**

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -98,7 +98,7 @@
    {
       this.containsPersistent = true;
    }
-   
+
    public long getID()
    {
       return id;
@@ -209,53 +209,35 @@
             }
          }
 
-         Runnable execAfterCommit = null;
-
-         if (operations != null)
-         {
-            execAfterCommit = new Runnable()
-            {
-               public void run()
-               {
-                  for (TransactionOperation operation : operations)
-                  {
-                     try
-                     {
-                        operation.afterCommit(TransactionImpl.this);
-                     }
-                     catch (Exception e)
-                     {
-                        // https://jira.jboss.org/jira/browse/HORNETQ-188
-                        // After commit shouldn't throw an exception
-                        log.warn(e.getMessage(), e);
-                     }
-                  }
-               }
-            };
-         }
-
          if (containsPersistent || (xid != null && state == State.PREPARED))
          {
             storageManager.commit(id);
 
             state = State.COMMITTED;
+         }
 
-            if (execAfterCommit != null)
+         // We use the Callback even for non persistence
+         // If we are using non-persistence with replication, the replication manager will have
+         // to execute this runnable in the correct order
+         storageManager.afterCompleteOperations(new Runnable()
+         {
+            public void run()
             {
-               if (storageManager.isReplicated())
+               for (TransactionOperation operation : operations)
                {
-                  storageManager.afterCompletion(execAfterCommit);
+                  try
+                  {
+                     operation.afterCommit(TransactionImpl.this);
+                  }
+                  catch (Exception e)
+                  {
+                     // https://jira.jboss.org/jira/browse/HORNETQ-188
+                     // After commit shouldn't throw an exception
+                     log.warn(e.getMessage(), e);
+                  }
                }
-               else
-               {
-                  execAfterCommit.run();
-               }
             }
-         }
-         else if (execAfterCommit != null)
-         {
-            execAfterCommit.run();
-         }
+         });
       }
    }
 

Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java	2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -92,7 +92,7 @@
       }
       session.createQueue(address, queue, true);
       ClientProducer producer = session.createProducer(address);
-      boolean durable = true;
+      boolean durable = false;
       for (int i = 0; i < NUM; i++)
       {
          ClientMessage msg = session.createClientMessage(durable);

Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -32,7 +32,7 @@
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
 import org.hornetq.core.client.impl.FailoverManager;
 import org.hornetq.core.client.impl.FailoverManagerImpl;
-import org.hornetq.core.completion.impl.CompletionContextImpl;
+import org.hornetq.core.completion.impl.OperationContextImpl;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.TransportConfiguration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -375,7 +375,7 @@
          }
 
          final CountDownLatch latch = new CountDownLatch(1);
-         CompletionContextImpl.getContext().afterCompletion(new Runnable()
+         OperationContextImpl.getContext().executeOnCompletion(new Runnable()
          {
             public void run()
             {
@@ -402,7 +402,7 @@
    private void blockOnReplication(ReplicationManagerImpl manager) throws Exception
    {
       final CountDownLatch latch = new CountDownLatch(1);
-      CompletionContextImpl.getContext().afterCompletion(new Runnable()
+      OperationContextImpl.getContext().executeOnCompletion(new Runnable()
       {
 
          public void run()
@@ -458,7 +458,7 @@
          replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
 
          final CountDownLatch latch = new CountDownLatch(1);
-         CompletionContextImpl.getContext().afterCompletion(new Runnable()
+         OperationContextImpl.getContext().executeOnCompletion(new Runnable()
          {
 
             public void run()
@@ -468,22 +468,10 @@
 
          });
 
-         assertEquals(1, manager.getActiveTokens().size());
-
          manager.closeContext();
 
          assertTrue(latch.await(1, TimeUnit.SECONDS));
 
-         for (int i = 0; i < 100; i++)
-         {
-            // This is asynchronous. Have to wait completion
-            if (manager.getActiveTokens().size() == 0)
-            {
-               break;
-            }
-            Thread.sleep(1);
-         }
-
          assertEquals(0, manager.getActiveTokens().size());
          manager.stop();
       }
@@ -530,7 +518,7 @@
             }
 
 
-            CompletionContextImpl.getContext().afterCompletion(new Runnable()
+            OperationContextImpl.getContext().executeOnCompletion(new Runnable()
             {
 
                public void run()
@@ -552,17 +540,6 @@
             assertEquals(i, executions.get(i).intValue());
          }
          
-         for (int i = 0; i < 100; i++)
-         {
-            // This is asynchronous. Have to wait completion
-            if (manager.getActiveTokens().size() == 0)
-            {
-               break;
-            }
-            Thread.sleep(1);
-         }
-
-
          assertEquals(0, manager.getActiveTokens().size());
          manager.stop();
       }

Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-11-18 02:52:31 UTC (rev 8303)
@@ -1155,7 +1155,7 @@
       /* (non-Javadoc)
        * @see org.hornetq.core.persistence.StorageManager#afterReplicated(java.lang.Runnable)
        */
-      public void afterCompletion(Runnable run)
+      public void afterCompleteOperations(Runnable run)
       {
 
       }
@@ -1163,7 +1163,7 @@
       /* (non-Javadoc)
        * @see org.hornetq.core.persistence.StorageManager#completeReplication()
        */
-      public void completeReplication()
+      public void completeOperations()
       {
 
       }
@@ -1221,7 +1221,7 @@
       /* (non-Javadoc)
        * @see org.hornetq.core.persistence.StorageManager#blockOnReplication(long)
        */
-      public void waitOnReplication(long timeout) throws Exception
+      public void waitOnOperations(long timeout) throws Exception
       {
       }
 



More information about the hornetq-commits mailing list