[hornetq-commits] JBoss hornetq SVN: r8315 - in branches/ClebertTemporary: src/main/org/hornetq/core/completion/impl and 13 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Nov 18 15:44:13 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-18 15:44:12 -0500 (Wed, 18 Nov 2009)
New Revision: 8315

Added:
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOAsyncTask.java
Modified:
   branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationContext.java
   branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/OperationContextImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOCompletion.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/SequentialFile.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/DummyCallback.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TransactionCallback.java
   branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
   branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
   branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
   branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
   branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
   branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
   branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
fixes

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationContext.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationContext.java	2009-11-18 19:46:20 UTC (rev 8314)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationContext.java	2009-11-18 20:44:12 UTC (rev 8315)
@@ -13,6 +13,7 @@
 
 package org.hornetq.core.completion;
 
+import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.IOCompletion;
 
 
@@ -26,12 +27,10 @@
  */
 public interface OperationContext extends IOCompletion
 {
-   /** To be called by the replication manager, when new replication is added to the queue */
-   void linedUp();
    
    boolean hasData();
 
-   void executeOnCompletion(IOCompletion runnable);
+   void executeOnCompletion(IOAsyncTask runnable);
    
    /** To be called when there are no more operations pending */
    void complete();

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/OperationContextImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/OperationContextImpl.java	2009-11-18 19:46:20 UTC (rev 8314)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/OperationContextImpl.java	2009-11-18 20:44:12 UTC (rev 8315)
@@ -17,7 +17,7 @@
 import java.util.List;
 
 import org.hornetq.core.completion.OperationContext;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
 
 /**
  * A ReplicationToken
@@ -42,7 +42,7 @@
       return token;
    }
 
-   private List<IOCompletion> tasks;
+   private List<IOAsyncTask> tasks;
    
    private int linedup = 0;
 
@@ -72,7 +72,7 @@
    }
 
    /** You may have several actions to be done after a replication operation is completed. */
-   public void executeOnCompletion(IOCompletion completion)
+   public void executeOnCompletion(IOAsyncTask completion)
    {
       if (complete)
       {
@@ -84,7 +84,7 @@
       {
          // No need to use Concurrent, we only add from a single thread.
          // We don't add any more Runnables after it is complete
-         tasks = new LinkedList<IOCompletion>();
+         tasks = new LinkedList<IOAsyncTask>();
       }
 
       tasks.add(completion);
@@ -116,7 +116,7 @@
    {
       if (tasks != null)
       {
-         for (IOCompletion run : tasks)
+         for (IOAsyncTask run : tasks)
          {
             run.done();
          }
@@ -145,7 +145,7 @@
    {
       if (tasks != null)
       {
-         for (IOCompletion run : tasks)
+         for (IOAsyncTask run : tasks)
          {
             run.onError(errorCode, errorMessage);
          }

Copied: branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOAsyncTask.java (from rev 8314, branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOCompletion.java)
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOAsyncTask.java	                        (rev 0)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOAsyncTask.java	2009-11-18 20:44:12 UTC (rev 8315)
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal;
+
+import org.hornetq.core.asyncio.AIOCallback;
+
+/**
+ * 
+ * This interface is a direct extension of AIOCallback.
+ * It exists only so that the journal does not depend directly on org.hornetq.core.asyncio.AIOCallback.
+ * 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public interface IOAsyncTask extends AIOCallback
+{
+}

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOCompletion.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOCompletion.java	2009-11-18 19:46:20 UTC (rev 8314)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOCompletion.java	2009-11-18 20:44:12 UTC (rev 8315)
@@ -13,16 +13,14 @@
 
 package org.hornetq.core.journal;
 
-import org.hornetq.core.asyncio.AIOCallback;
-
 /**
- * 
- * This class is just a direct extension of AIOCallback.
- * Just to avoid the direct dependency of org.hornetq.core.asynciio.AIOCallback from the journal.
- * 
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ * An IOCompletion
  *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
  */
-public interface IOCompletion extends AIOCallback
+public interface IOCompletion extends IOAsyncTask
 {
+   void linedUp();
 }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/SequentialFile.java	2009-11-18 19:46:20 UTC (rev 8314)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/SequentialFile.java	2009-11-18 20:44:12 UTC (rev 8315)
@@ -56,17 +56,17 @@
 
    void delete() throws Exception;
 
-   void write(HornetQBuffer bytes, boolean sync, IOCompletion callback) throws Exception;
+   void write(HornetQBuffer bytes, boolean sync, IOAsyncTask callback) throws Exception;
 
    void write(HornetQBuffer bytes, boolean sync) throws Exception;
 
    /** Write directly to the file without using any buffer */
-   void writeDirect(ByteBuffer bytes, boolean sync, IOCompletion callback);
+   void writeDirect(ByteBuffer bytes, boolean sync, IOAsyncTask callback);
 
    /** Write directly to the file without using any buffer */
    void writeDirect(ByteBuffer bytes, boolean sync) throws Exception;
 
-   int read(ByteBuffer bytes, IOCompletion callback) throws Exception;
+   int read(ByteBuffer bytes, IOAsyncTask callback) throws Exception;
 
    int read(ByteBuffer bytes) throws Exception;
 

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2009-11-18 19:46:20 UTC (rev 8314)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2009-11-18 20:44:12 UTC (rev 8315)
@@ -25,7 +25,7 @@
 import org.hornetq.core.asyncio.AsynchronousFile;
 import org.hornetq.core.asyncio.BufferCallback;
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.logging.Logger;
@@ -214,7 +214,7 @@
       aioFile.setBufferCallback(callback);
    }
 
-   public int read(final ByteBuffer bytes, final IOCompletion callback) throws Exception
+   public int read(final ByteBuffer bytes, final IOAsyncTask callback) throws Exception
    {
       int bytesToRead = bytes.limit();
 
@@ -298,7 +298,7 @@
     * 
     * @param sync Not used on AIO
     *  */
-   public void writeDirect(final ByteBuffer bytes, final boolean sync, IOCompletion callback)
+   public void writeDirect(final ByteBuffer bytes, final boolean sync, IOAsyncTask callback)
    {
       final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
 

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2009-11-18 19:46:20 UTC (rev 8314)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2009-11-18 20:44:12 UTC (rev 8315)
@@ -18,7 +18,7 @@
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.logging.Logger;
@@ -159,7 +159,7 @@
 
    }
 
-   public void write(final HornetQBuffer bytes, final boolean sync, final IOCompletion callback) throws Exception
+   public void write(final HornetQBuffer bytes, final boolean sync, final IOAsyncTask callback) throws Exception
    {
       if (timedBuffer != null)
       {
@@ -203,18 +203,18 @@
 
    // Inner classes -------------------------------------------------
 
-   protected static class DelegateCallback implements IOCompletion
+   protected static class DelegateCallback implements IOAsyncTask
    {
-      final List<IOCompletion> delegates;
+      final List<IOAsyncTask> delegates;
 
-      DelegateCallback(final List<IOCompletion> delegates)
+      DelegateCallback(final List<IOAsyncTask> delegates)
       {
          this.delegates = delegates;
       }
 
       public void done()
       {
-         for (IOCompletion callback : delegates)
+         for (IOAsyncTask callback : delegates)
          {
             try
             {
@@ -229,7 +229,7 @@
 
       public void onError(final int errorCode, final String errorMessage)
       {
-         for (IOCompletion callback : delegates)
+         for (IOAsyncTask callback : delegates)
          {
             try
             {
@@ -249,7 +249,7 @@
 
    protected class LocalBufferObserver implements TimedBufferObserver
    {
-      public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOCompletion> callbacks)
+      public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOAsyncTask> callbacks)
       {
          buffer.flip();
 

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/DummyCallback.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/DummyCallback.java	2009-11-18 19:46:20 UTC (rev 8314)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/DummyCallback.java	2009-11-18 20:44:12 UTC (rev 8315)
@@ -46,5 +46,12 @@
    public void waitCompletion() throws Exception
    {
    }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.IOCompletion#linedUp()
+    */
+   public void linedUp()
+   {
+   }
 }
 

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-18 19:46:20 UTC (rev 8314)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-18 20:44:12 UTC (rev 8315)
@@ -44,6 +44,7 @@
 import org.hornetq.core.buffers.ChannelBuffer;
 import org.hornetq.core.buffers.ChannelBuffers;
 import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.IOCompletion;
 import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.journal.LoaderCallback;
@@ -869,6 +870,11 @@
       {
          throw new IllegalStateException("Journal must be loaded first");
       }
+      
+      if (callback != null)
+      {
+         callback.linedUp();
+      }
 
       compactingLock.readLock().lock();
 
@@ -927,6 +933,11 @@
       {
          throw new IllegalStateException("Journal must be loaded first");
       }
+      
+      if (callback != null)
+      {
+         callback.linedUp();
+      }
 
       compactingLock.readLock().lock();
 
@@ -996,6 +1007,11 @@
       {
          throw new IllegalStateException("Journal must be loaded first");
       }
+      
+      if (callback != null)
+      {
+         callback.linedUp();
+      }
 
       compactingLock.readLock().lock();
 
@@ -1228,13 +1244,18 @@
     * @param transactionData - extra user data for the prepare
     * @throws Exception
     */
-   public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync, IOCompletion completion) throws Exception
+   public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync, IOCompletion callback) throws Exception
    {
       if (state != STATE_LOADED)
       {
          throw new IllegalStateException("Journal must be loaded first");
       }
 
+      if (callback != null)
+      {
+         callback.linedUp();
+      }
+      
       compactingLock.readLock().lock();
 
       JournalTransaction tx = getTransactionInfo(txID);
@@ -1250,7 +1271,7 @@
          lockAppend.lock();
          try
          {
-            JournalFile usedFile = appendRecord(bb, true, sync, tx, completion);
+            JournalFile usedFile = appendRecord(bb, true, sync, tx, callback);
 
             tx.prepare(usedFile);
          }
@@ -1307,6 +1328,11 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
+      if (callback != null)
+      {
+         callback.linedUp();
+      }
+      
       compactingLock.readLock().lock();
 
       JournalTransaction tx = transactions.remove(txID);
@@ -1362,13 +1388,18 @@
 
    }
    
-   public void appendRollbackRecord(final long txID, final boolean sync, final IOCompletion completion) throws Exception
+   public void appendRollbackRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception
    {
       if (state != STATE_LOADED)
       {
          throw new IllegalStateException("Journal must be loaded first");
       }
 
+      if (callback != null)
+      {
+         callback.linedUp();
+      }
+      
       compactingLock.readLock().lock();
 
       JournalTransaction tx = null;
@@ -1389,7 +1420,7 @@
          lockAppend.lock();
          try
          {
-            JournalFile usedFile = appendRecord(bb, false, sync, tx, completion);
+            JournalFile usedFile = appendRecord(bb, false, sync, tx, callback);
 
             tx.rollback(usedFile);
          }
@@ -2883,7 +2914,7 @@
                                     final boolean completeTransaction,
                                     final boolean sync,
                                     final JournalTransaction tx,
-                                    final IOCompletion parameterCallback) throws Exception
+                                    final IOAsyncTask parameterCallback) throws Exception
    {
       try
       {
@@ -2892,7 +2923,7 @@
             throw new IllegalStateException("The journal is not loaded " + state);
          }
          
-         final IOCompletion callback;
+         final IOAsyncTask callback;
 
          int size = bb.capacity();
 

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2009-11-18 19:46:20 UTC (rev 8314)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2009-11-18 20:44:12 UTC (rev 8315)
@@ -19,7 +19,7 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.logging.Logger;
@@ -133,7 +133,7 @@
       return read(bytes, null);
    }
 
-   public int read(final ByteBuffer bytes, final IOCompletion callback) throws Exception
+   public int read(final ByteBuffer bytes, final IOAsyncTask callback) throws Exception
    {
       try
       {
@@ -197,7 +197,7 @@
       return new NIOSequentialFile(factory, getFile());
    }
 
-   public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCompletion callback)
+   public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback)
    {
       if (callback == null)
       {
@@ -226,7 +226,7 @@
     * @throws IOException
     * @throws Exception
     */
-   private void internalWrite(final ByteBuffer bytes, final boolean sync, final IOCompletion callback) throws Exception
+   private void internalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws Exception
    {
       position.addAndGet(bytes.limit());
 

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java	2009-11-18 19:46:20 UTC (rev 8314)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java	2009-11-18 20:44:12 UTC (rev 8315)
@@ -67,4 +67,11 @@
    {
       return latch.await(timeout, TimeUnit.MILLISECONDS);
    }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.IOCompletion#linedUp()
+    */
+   public void linedUp()
+   {
+   }
 }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2009-11-18 19:46:20 UTC (rev 8314)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2009-11-18 20:44:12 UTC (rev 8315)
@@ -22,7 +22,7 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
 import org.hornetq.utils.VariableLatch;
@@ -56,7 +56,7 @@
 
    private int bufferLimit = 0;
 
-   private List<IOCompletion> callbacks;
+   private List<IOAsyncTask> callbacks;
 
    private final Lock lock = new ReentrantReadWriteLock().writeLock();
 
@@ -106,7 +106,7 @@
       buffer.clear();
       bufferLimit = 0;
 
-      callbacks = new ArrayList<IOCompletion>();
+      callbacks = new ArrayList<IOAsyncTask>();
       this.flushOnSync = flushOnSync;
       latchTimer.up();
       this.timeout = timeout;
@@ -225,7 +225,7 @@
       }
    }
 
-   public synchronized void addBytes(final byte[] bytes, final boolean sync, final IOCompletion callback)
+   public synchronized void addBytes(final byte[] bytes, final boolean sync, final IOAsyncTask callback)
    {
       if (buffer.writerIndex() == 0)
       {
@@ -280,7 +280,7 @@
 
          bufferObserver.flushBuffer(directBuffer, pendingSync, callbacks);
 
-         callbacks = new ArrayList<IOCompletion>();
+         callbacks = new ArrayList<IOAsyncTask>();
 
          active = false;
          pendingSync = false;

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java	2009-11-18 19:46:20 UTC (rev 8314)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java	2009-11-18 20:44:12 UTC (rev 8315)
@@ -17,7 +17,7 @@
 import java.nio.ByteBuffer;
 import java.util.List;
 
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
 
 /**
  * A TimedBufferObserver
@@ -39,7 +39,7 @@
 
    // Public --------------------------------------------------------
    
-   public void flushBuffer(ByteBuffer buffer, boolean syncRequested, List<IOCompletion> callbacks);
+   public void flushBuffer(ByteBuffer buffer, boolean syncRequested, List<IOAsyncTask> callbacks);
    
    
    /** Return the number of remaining bytes that still fit on the observer (file) */

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TransactionCallback.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TransactionCallback.java	2009-11-18 19:46:20 UTC (rev 8314)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TransactionCallback.java	2009-11-18 20:44:12 UTC (rev 8315)
@@ -14,7 +14,7 @@
 
 package org.hornetq.core.journal.impl;
 
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.utils.VariableLatch;
 
 /**
@@ -24,7 +24,7 @@
  *
  *
  */
-public class TransactionCallback implements IOCompletion
+public class TransactionCallback implements IOAsyncTask
 {
    private final VariableLatch countLatch = new VariableLatch();
 
@@ -36,7 +36,7 @@
    
    private volatile int done = 0;
    
-   private volatile IOCompletion delegateCompletion;
+   private volatile IOAsyncTask delegateCompletion;
 
    public void countUp()
    {
@@ -81,7 +81,7 @@
    /**
     * @return the delegateCompletion
     */
-   public IOCompletion getDelegateCompletion()
+   public IOAsyncTask getDelegateCompletion()
    {
       return delegateCompletion;
    }
@@ -89,7 +89,7 @@
    /**
     * @param delegateCompletion the delegateCompletion to set
     */
-   public void setDelegateCompletion(IOCompletion delegateCompletion)
+   public void setDelegateCompletion(IOAsyncTask delegateCompletion)
    {
       this.delegateCompletion = delegateCompletion;
    }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java	2009-11-18 19:46:20 UTC (rev 8314)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java	2009-11-18 20:44:12 UTC (rev 8315)
@@ -18,7 +18,7 @@
 
 import javax.transaction.xa.Xid;
 
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagedMessage;
@@ -57,7 +57,7 @@
 
    boolean isReplicated();
 
-   void afterCompleteOperations(IOCompletion run);
+   void afterCompleteOperations(IOAsyncTask run);
    
    /** Block until the replication is done. 
     * @throws Exception */

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-18 19:46:20 UTC (rev 8314)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-18 20:44:12 UTC (rev 8315)
@@ -26,9 +26,7 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
 
 import javax.transaction.xa.Xid;
 
@@ -38,6 +36,7 @@
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.IOCompletion;
 import org.hornetq.core.journal.Journal;
 import org.hornetq.core.journal.JournalLoadInformation;
@@ -363,7 +362,7 @@
 
    // TODO: shouldn't those page methods be on the PageManager? ^^^^
 
-   public void afterCompleteOperations(IOCompletion run)
+   public void afterCompleteOperations(IOAsyncTask run)
    {
       OperationContextImpl.getContext().executeOnCompletion(run);
    }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-11-18 19:46:20 UTC (rev 8314)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-11-18 20:44:12 UTC (rev 8315)
@@ -20,7 +20,7 @@
 import javax.transaction.xa.Xid;
 
 import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagedMessage;
@@ -344,7 +344,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#afterCompleteOperations(org.hornetq.core.journal.IOCompletion)
     */
-   public void afterCompleteOperations(IOCompletion run)
+   public void afterCompleteOperations(IOAsyncTask run)
    {
       run.done();
    }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-18 19:46:20 UTC (rev 8314)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-18 20:44:12 UTC (rev 8315)
@@ -28,7 +28,7 @@
 import org.hornetq.core.client.management.impl.ManagementHelper;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.filter.Filter;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.management.ManagementService;
 import org.hornetq.core.management.Notification;
@@ -925,7 +925,7 @@
       }
       else
       {
-         storageManager.afterCompleteOperations(new IOCompletion()
+         storageManager.afterCompleteOperations(new IOAsyncTask()
          {
             public void onError(int errorCode, String errorMessage)
             {

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java	2009-11-18 19:46:20 UTC (rev 8314)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java	2009-11-18 20:44:12 UTC (rev 8315)
@@ -16,7 +16,7 @@
 import java.util.concurrent.Executor;
 
 import org.hornetq.core.filter.Filter;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.PostOffice;
@@ -145,7 +145,7 @@
 
       tx.commit();
 
-      storageManager.afterCompleteOperations(new IOCompletion()
+      storageManager.afterCompleteOperations(new IOAsyncTask()
       {
          
          public void onError(int errorCode, String errorMessage)

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-18 19:46:20 UTC (rev 8314)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-18 20:44:12 UTC (rev 8315)
@@ -34,7 +34,7 @@
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.filter.impl.FilterImpl;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.management.ManagementService;
 import org.hornetq.core.management.Notification;
@@ -1719,7 +1719,7 @@
                              final boolean flush,
                              final boolean closeChannel)
    {
-      storageManager.afterCompleteOperations(new IOCompletion()
+      storageManager.afterCompleteOperations(new IOAsyncTask()
       {
          
          public void onError(int errorCode, String errorMessage)

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2009-11-18 19:46:20 UTC (rev 8314)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2009-11-18 20:44:12 UTC (rev 8315)
@@ -19,7 +19,7 @@
 import javax.transaction.xa.Xid;
 
 import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.PostOffice;
@@ -220,7 +220,7 @@
          // We use the Callback even for non persistence
          // If we are using non-persistence with replication, the replication manager will have
          // to execute this runnable in the correct order
-         storageManager.afterCompleteOperations(new IOCompletion()
+         storageManager.afterCompleteOperations(new IOAsyncTask()
          {
 
             public void onError(int errorCode, String errorMessage)

Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-18 19:46:20 UTC (rev 8314)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-18 20:44:12 UTC (rev 8315)
@@ -38,6 +38,7 @@
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.IOCompletion;
 import org.hornetq.core.journal.Journal;
 import org.hornetq.core.journal.JournalLoadInformation;
@@ -376,7 +377,7 @@
          }
 
          final CountDownLatch latch = new CountDownLatch(1);
-         OperationContextImpl.getContext().executeOnCompletion(new IOCompletion()
+         OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
          {
 
             public void onError(int errorCode, String errorMessage)
@@ -408,7 +409,7 @@
    private void blockOnReplication(ReplicationManagerImpl manager) throws Exception
    {
       final CountDownLatch latch = new CountDownLatch(1);
-      OperationContextImpl.getContext().executeOnCompletion(new IOCompletion()
+      OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
       {
 
          public void onError(int errorCode, String errorMessage)
@@ -467,7 +468,7 @@
          replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
 
          final CountDownLatch latch = new CountDownLatch(1);
-         OperationContextImpl.getContext().executeOnCompletion(new IOCompletion()
+         OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
          {
 
             public void onError(int errorCode, String errorMessage)
@@ -529,7 +530,7 @@
                replicatedJournal.appendPrepareRecord(i, new FakeData(), false);
             }
 
-            OperationContextImpl.getContext().executeOnCompletion(new IOCompletion()
+            OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
             {
 
                public void onError(int errorCode, String errorMessage)

Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java	2009-11-18 19:46:20 UTC (rev 8314)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java	2009-11-18 20:44:12 UTC (rev 8315)
@@ -18,7 +18,7 @@
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.impl.TimedBuffer;
 import org.hornetq.core.journal.impl.TimedBufferObserver;
 import org.hornetq.tests.util.UnitTestCase;
@@ -42,7 +42,7 @@
 
    // Public --------------------------------------------------------
 
-   IOCompletion dummyCallback = new IOCompletion()
+   IOAsyncTask dummyCallback = new IOAsyncTask()
    {
 
       public void done()
@@ -64,7 +64,7 @@
       final AtomicInteger flushTimes = new AtomicInteger(0);
       class TestObserver implements TimedBufferObserver
       {
-         public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCompletion> callbacks)
+         public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOAsyncTask> callbacks)
          {
             buffers.add(buffer);
             flushTimes.incrementAndGet();

Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-11-18 19:46:20 UTC (rev 8314)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-11-18 20:44:12 UTC (rev 8315)
@@ -20,7 +20,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.hornetq.core.asyncio.BufferCallback;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.impl.TimedBuffer;
@@ -241,11 +241,11 @@
 
       final ByteBuffer bytes;
 
-      final IOCompletion callback;
+      final IOAsyncTask callback;
 
       volatile boolean sendError;
 
-      CallbackRunnable(final FakeSequentialFile file, final ByteBuffer bytes, final IOCompletion callback)
+      CallbackRunnable(final FakeSequentialFile file, final ByteBuffer bytes, final IOAsyncTask callback)
       {
          this.file = file;
          this.bytes = bytes;
@@ -399,7 +399,7 @@
          return read(bytes, null);
       }
 
-      public int read(final ByteBuffer bytes, final IOCompletion callback) throws Exception
+      public int read(final ByteBuffer bytes, final IOAsyncTask callback) throws Exception
       {
          if (!open)
          {
@@ -439,7 +439,7 @@
          return data.position();
       }
 
-      public synchronized void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCompletion callback)
+      public synchronized void writeDirect(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback)
       {
          if (!open)
          {
@@ -605,7 +605,7 @@
       /* (non-Javadoc)
        * @see org.hornetq.core.journal.SequentialFile#write(org.hornetq.core.remoting.spi.HornetQBuffer, boolean, org.hornetq.core.journal.IOCallback)
        */
-      public void write(HornetQBuffer bytes, boolean sync, IOCompletion callback) throws Exception
+      public void write(HornetQBuffer bytes, boolean sync, IOAsyncTask callback) throws Exception
       {
          writeDirect(ByteBuffer.wrap(bytes.array()), sync, callback);
 

Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-11-18 19:46:20 UTC (rev 8314)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-11-18 20:44:12 UTC (rev 8315)
@@ -29,7 +29,7 @@
 import javax.transaction.xa.Xid;
 
 import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
@@ -1236,7 +1236,7 @@
       /* (non-Javadoc)
        * @see org.hornetq.core.persistence.StorageManager#afterCompleteOperations(org.hornetq.core.journal.IOCompletion)
        */
-      public void afterCompleteOperations(IOCompletion run)
+      public void afterCompleteOperations(IOAsyncTask run)
       {
          // TODO Auto-generated method stub
          



More information about the hornetq-commits mailing list