Author: clebert.suconic(a)jboss.com
Date: 2009-11-18 15:44:12 -0500 (Wed, 18 Nov 2009)
New Revision: 8315
Added:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOAsyncTask.java
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationContext.java
branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/OperationContextImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOCompletion.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/SequentialFile.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/DummyCallback.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TransactionCallback.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
fixes
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationContext.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationContext.java 2009-11-18
19:46:20 UTC (rev 8314)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationContext.java 2009-11-18
20:44:12 UTC (rev 8315)
@@ -13,6 +13,7 @@
package org.hornetq.core.completion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.IOCompletion;
@@ -26,12 +27,10 @@
*/
public interface OperationContext extends IOCompletion
{
- /** To be called by the replication manager, when new replication is added to the
queue */
- void linedUp();
boolean hasData();
- void executeOnCompletion(IOCompletion runnable);
+ void executeOnCompletion(IOAsyncTask runnable);
/** To be called when there are no more operations pending */
void complete();
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/OperationContextImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/OperationContextImpl.java 2009-11-18
19:46:20 UTC (rev 8314)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/OperationContextImpl.java 2009-11-18
20:44:12 UTC (rev 8315)
@@ -17,7 +17,7 @@
import java.util.List;
import org.hornetq.core.completion.OperationContext;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
/**
* A ReplicationToken
@@ -42,7 +42,7 @@
return token;
}
- private List<IOCompletion> tasks;
+ private List<IOAsyncTask> tasks;
private int linedup = 0;
@@ -72,7 +72,7 @@
}
/** You may have several actions to be done after a replication operation is
completed. */
- public void executeOnCompletion(IOCompletion completion)
+ public void executeOnCompletion(IOAsyncTask completion)
{
if (complete)
{
@@ -84,7 +84,7 @@
{
// No need to use Concurrent, we only add from a single thread.
// We don't add any more Runnables after it is complete
- tasks = new LinkedList<IOCompletion>();
+ tasks = new LinkedList<IOAsyncTask>();
}
tasks.add(completion);
@@ -116,7 +116,7 @@
{
if (tasks != null)
{
- for (IOCompletion run : tasks)
+ for (IOAsyncTask run : tasks)
{
run.done();
}
@@ -145,7 +145,7 @@
{
if (tasks != null)
{
- for (IOCompletion run : tasks)
+ for (IOAsyncTask run : tasks)
{
run.onError(errorCode, errorMessage);
}
Copied: branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOAsyncTask.java (from
rev 8314, branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOCompletion.java)
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOAsyncTask.java
(rev 0)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOAsyncTask.java 2009-11-18
20:44:12 UTC (rev 8315)
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal;
+
+import org.hornetq.core.asyncio.AIOCallback;
+
+/**
+ *
+ * This class is just a direct extension of AIOCallback.
+ * Just to avoid the direct dependency of org.hornetq.core.asynciio.AIOCallback from the
journal.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
+ *
+ */
+public interface IOAsyncTask extends AIOCallback
+{
+}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOCompletion.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOCompletion.java 2009-11-18
19:46:20 UTC (rev 8314)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOCompletion.java 2009-11-18
20:44:12 UTC (rev 8315)
@@ -13,16 +13,14 @@
package org.hornetq.core.journal;
-import org.hornetq.core.asyncio.AIOCallback;
-
/**
- *
- * This class is just a direct extension of AIOCallback.
- * Just to avoid the direct dependency of org.hornetq.core.asynciio.AIOCallback from the
journal.
- *
- * @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
+ * A IOCompletion
*
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
*/
-public interface IOCompletion extends AIOCallback
+public interface IOCompletion extends IOAsyncTask
{
+ void linedUp();
}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/SequentialFile.java 2009-11-18
19:46:20 UTC (rev 8314)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/SequentialFile.java 2009-11-18
20:44:12 UTC (rev 8315)
@@ -56,17 +56,17 @@
void delete() throws Exception;
- void write(HornetQBuffer bytes, boolean sync, IOCompletion callback) throws
Exception;
+ void write(HornetQBuffer bytes, boolean sync, IOAsyncTask callback) throws Exception;
void write(HornetQBuffer bytes, boolean sync) throws Exception;
/** Write directly to the file without using any buffer */
- void writeDirect(ByteBuffer bytes, boolean sync, IOCompletion callback);
+ void writeDirect(ByteBuffer bytes, boolean sync, IOAsyncTask callback);
/** Write directly to the file without using any buffer */
void writeDirect(ByteBuffer bytes, boolean sync) throws Exception;
- int read(ByteBuffer bytes, IOCompletion callback) throws Exception;
+ int read(ByteBuffer bytes, IOAsyncTask callback) throws Exception;
int read(ByteBuffer bytes) throws Exception;
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-18
19:46:20 UTC (rev 8314)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-18
20:44:12 UTC (rev 8315)
@@ -25,7 +25,7 @@
import org.hornetq.core.asyncio.AsynchronousFile;
import org.hornetq.core.asyncio.BufferCallback;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.logging.Logger;
@@ -214,7 +214,7 @@
aioFile.setBufferCallback(callback);
}
- public int read(final ByteBuffer bytes, final IOCompletion callback) throws Exception
+ public int read(final ByteBuffer bytes, final IOAsyncTask callback) throws Exception
{
int bytesToRead = bytes.limit();
@@ -298,7 +298,7 @@
*
* @param sync Not used on AIO
* */
- public void writeDirect(final ByteBuffer bytes, final boolean sync, IOCompletion
callback)
+ public void writeDirect(final ByteBuffer bytes, final boolean sync, IOAsyncTask
callback)
{
final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-18
19:46:20 UTC (rev 8314)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-18
20:44:12 UTC (rev 8315)
@@ -18,7 +18,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
@@ -159,7 +159,7 @@
}
- public void write(final HornetQBuffer bytes, final boolean sync, final IOCompletion
callback) throws Exception
+ public void write(final HornetQBuffer bytes, final boolean sync, final IOAsyncTask
callback) throws Exception
{
if (timedBuffer != null)
{
@@ -203,18 +203,18 @@
// Inner classes -------------------------------------------------
- protected static class DelegateCallback implements IOCompletion
+ protected static class DelegateCallback implements IOAsyncTask
{
- final List<IOCompletion> delegates;
+ final List<IOAsyncTask> delegates;
- DelegateCallback(final List<IOCompletion> delegates)
+ DelegateCallback(final List<IOAsyncTask> delegates)
{
this.delegates = delegates;
}
public void done()
{
- for (IOCompletion callback : delegates)
+ for (IOAsyncTask callback : delegates)
{
try
{
@@ -229,7 +229,7 @@
public void onError(final int errorCode, final String errorMessage)
{
- for (IOCompletion callback : delegates)
+ for (IOAsyncTask callback : delegates)
{
try
{
@@ -249,7 +249,7 @@
protected class LocalBufferObserver implements TimedBufferObserver
{
- public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final
List<IOCompletion> callbacks)
+ public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final
List<IOAsyncTask> callbacks)
{
buffer.flip();
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/DummyCallback.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/DummyCallback.java 2009-11-18
19:46:20 UTC (rev 8314)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/DummyCallback.java 2009-11-18
20:44:12 UTC (rev 8315)
@@ -46,5 +46,12 @@
public void waitCompletion() throws Exception
{
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.IOCompletion#linedUp()
+ */
+ public void linedUp()
+ {
+ }
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-18
19:46:20 UTC (rev 8314)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-18
20:44:12 UTC (rev 8315)
@@ -44,6 +44,7 @@
import org.hornetq.core.buffers.ChannelBuffer;
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.LoaderCallback;
@@ -869,6 +870,11 @@
{
throw new IllegalStateException("Journal must be loaded first");
}
+
+ if (callback != null)
+ {
+ callback.linedUp();
+ }
compactingLock.readLock().lock();
@@ -927,6 +933,11 @@
{
throw new IllegalStateException("Journal must be loaded first");
}
+
+ if (callback != null)
+ {
+ callback.linedUp();
+ }
compactingLock.readLock().lock();
@@ -996,6 +1007,11 @@
{
throw new IllegalStateException("Journal must be loaded first");
}
+
+ if (callback != null)
+ {
+ callback.linedUp();
+ }
compactingLock.readLock().lock();
@@ -1228,13 +1244,18 @@
* @param transactionData - extra user data for the prepare
* @throws Exception
*/
- public void appendPrepareRecord(final long txID, final EncodingSupport
transactionData, final boolean sync, IOCompletion completion) throws Exception
+ public void appendPrepareRecord(final long txID, final EncodingSupport
transactionData, final boolean sync, IOCompletion callback) throws Exception
{
if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
}
+ if (callback != null)
+ {
+ callback.linedUp();
+ }
+
compactingLock.readLock().lock();
JournalTransaction tx = getTransactionInfo(txID);
@@ -1250,7 +1271,7 @@
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, true, sync, tx, completion);
+ JournalFile usedFile = appendRecord(bb, true, sync, tx, callback);
tx.prepare(usedFile);
}
@@ -1307,6 +1328,11 @@
throw new IllegalStateException("Journal must be loaded first");
}
+ if (callback != null)
+ {
+ callback.linedUp();
+ }
+
compactingLock.readLock().lock();
JournalTransaction tx = transactions.remove(txID);
@@ -1362,13 +1388,18 @@
}
- public void appendRollbackRecord(final long txID, final boolean sync, final
IOCompletion completion) throws Exception
+ public void appendRollbackRecord(final long txID, final boolean sync, final
IOCompletion callback) throws Exception
{
if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
}
+ if (callback != null)
+ {
+ callback.linedUp();
+ }
+
compactingLock.readLock().lock();
JournalTransaction tx = null;
@@ -1389,7 +1420,7 @@
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, false, sync, tx, completion);
+ JournalFile usedFile = appendRecord(bb, false, sync, tx, callback);
tx.rollback(usedFile);
}
@@ -2883,7 +2914,7 @@
final boolean completeTransaction,
final boolean sync,
final JournalTransaction tx,
- final IOCompletion parameterCallback) throws
Exception
+ final IOAsyncTask parameterCallback) throws
Exception
{
try
{
@@ -2892,7 +2923,7 @@
throw new IllegalStateException("The journal is not loaded " +
state);
}
- final IOCompletion callback;
+ final IOAsyncTask callback;
int size = bb.capacity();
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-18
19:46:20 UTC (rev 8314)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-18
20:44:12 UTC (rev 8315)
@@ -19,7 +19,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
@@ -133,7 +133,7 @@
return read(bytes, null);
}
- public int read(final ByteBuffer bytes, final IOCompletion callback) throws Exception
+ public int read(final ByteBuffer bytes, final IOAsyncTask callback) throws Exception
{
try
{
@@ -197,7 +197,7 @@
return new NIOSequentialFile(factory, getFile());
}
- public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCompletion
callback)
+ public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOAsyncTask
callback)
{
if (callback == null)
{
@@ -226,7 +226,7 @@
* @throws IOException
* @throws Exception
*/
- private void internalWrite(final ByteBuffer bytes, final boolean sync, final
IOCompletion callback) throws Exception
+ private void internalWrite(final ByteBuffer bytes, final boolean sync, final
IOAsyncTask callback) throws Exception
{
position.addAndGet(bytes.limit());
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java 2009-11-18
19:46:20 UTC (rev 8314)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java 2009-11-18
20:44:12 UTC (rev 8315)
@@ -67,4 +67,11 @@
{
return latch.await(timeout, TimeUnit.MILLISECONDS);
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.IOCompletion#linedUp()
+ */
+ public void linedUp()
+ {
+ }
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-18
19:46:20 UTC (rev 8314)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-18
20:44:12 UTC (rev 8315)
@@ -22,7 +22,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.utils.VariableLatch;
@@ -56,7 +56,7 @@
private int bufferLimit = 0;
- private List<IOCompletion> callbacks;
+ private List<IOAsyncTask> callbacks;
private final Lock lock = new ReentrantReadWriteLock().writeLock();
@@ -106,7 +106,7 @@
buffer.clear();
bufferLimit = 0;
- callbacks = new ArrayList<IOCompletion>();
+ callbacks = new ArrayList<IOAsyncTask>();
this.flushOnSync = flushOnSync;
latchTimer.up();
this.timeout = timeout;
@@ -225,7 +225,7 @@
}
}
- public synchronized void addBytes(final byte[] bytes, final boolean sync, final
IOCompletion callback)
+ public synchronized void addBytes(final byte[] bytes, final boolean sync, final
IOAsyncTask callback)
{
if (buffer.writerIndex() == 0)
{
@@ -280,7 +280,7 @@
bufferObserver.flushBuffer(directBuffer, pendingSync, callbacks);
- callbacks = new ArrayList<IOCompletion>();
+ callbacks = new ArrayList<IOAsyncTask>();
active = false;
pendingSync = false;
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java 2009-11-18
19:46:20 UTC (rev 8314)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java 2009-11-18
20:44:12 UTC (rev 8315)
@@ -17,7 +17,7 @@
import java.nio.ByteBuffer;
import java.util.List;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
/**
* A TimedBufferObserver
@@ -39,7 +39,7 @@
// Public --------------------------------------------------------
- public void flushBuffer(ByteBuffer buffer, boolean syncRequested,
List<IOCompletion> callbacks);
+ public void flushBuffer(ByteBuffer buffer, boolean syncRequested,
List<IOAsyncTask> callbacks);
/** Return the number of remaining bytes that still fit on the observer (file) */
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TransactionCallback.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TransactionCallback.java 2009-11-18
19:46:20 UTC (rev 8314)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TransactionCallback.java 2009-11-18
20:44:12 UTC (rev 8315)
@@ -14,7 +14,7 @@
package org.hornetq.core.journal.impl;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.utils.VariableLatch;
/**
@@ -24,7 +24,7 @@
*
*
*/
-public class TransactionCallback implements IOCompletion
+public class TransactionCallback implements IOAsyncTask
{
private final VariableLatch countLatch = new VariableLatch();
@@ -36,7 +36,7 @@
private volatile int done = 0;
- private volatile IOCompletion delegateCompletion;
+ private volatile IOAsyncTask delegateCompletion;
public void countUp()
{
@@ -81,7 +81,7 @@
/**
* @return the delegateCompletion
*/
- public IOCompletion getDelegateCompletion()
+ public IOAsyncTask getDelegateCompletion()
{
return delegateCompletion;
}
@@ -89,7 +89,7 @@
/**
* @param delegateCompletion the delegateCompletion to set
*/
- public void setDelegateCompletion(IOCompletion delegateCompletion)
+ public void setDelegateCompletion(IOAsyncTask delegateCompletion)
{
this.delegateCompletion = delegateCompletion;
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-18
19:46:20 UTC (rev 8314)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-18
20:44:12 UTC (rev 8315)
@@ -18,7 +18,7 @@
import javax.transaction.xa.Xid;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
@@ -57,7 +57,7 @@
boolean isReplicated();
- void afterCompleteOperations(IOCompletion run);
+ void afterCompleteOperations(IOAsyncTask run);
/** Block until the replication is done.
* @throws Exception */
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-18
19:46:20 UTC (rev 8314)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-18
20:44:12 UTC (rev 8315)
@@ -26,9 +26,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
import javax.transaction.xa.Xid;
@@ -38,6 +36,7 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
@@ -363,7 +362,7 @@
// TODO: shouldn't those page methods be on the PageManager? ^^^^
- public void afterCompleteOperations(IOCompletion run)
+ public void afterCompleteOperations(IOAsyncTask run)
{
OperationContextImpl.getContext().executeOnCompletion(run);
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-18
19:46:20 UTC (rev 8314)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-18
20:44:12 UTC (rev 8315)
@@ -20,7 +20,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
@@ -344,7 +344,7 @@
/* (non-Javadoc)
* @see
org.hornetq.core.persistence.StorageManager#afterCompleteOperations(org.hornetq.core.journal.IOCompletion)
*/
- public void afterCompleteOperations(IOCompletion run)
+ public void afterCompleteOperations(IOAsyncTask run)
{
run.done();
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-18
19:46:20 UTC (rev 8314)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-18
20:44:12 UTC (rev 8315)
@@ -28,7 +28,7 @@
import org.hornetq.core.client.management.impl.ManagementHelper;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.ManagementService;
import org.hornetq.core.management.Notification;
@@ -925,7 +925,7 @@
}
else
{
- storageManager.afterCompleteOperations(new IOCompletion()
+ storageManager.afterCompleteOperations(new IOAsyncTask()
{
public void onError(int errorCode, String errorMessage)
{
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2009-11-18
19:46:20 UTC (rev 8314)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2009-11-18
20:44:12 UTC (rev 8315)
@@ -16,7 +16,7 @@
import java.util.concurrent.Executor;
import org.hornetq.core.filter.Filter;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
@@ -145,7 +145,7 @@
tx.commit();
- storageManager.afterCompleteOperations(new IOCompletion()
+ storageManager.afterCompleteOperations(new IOAsyncTask()
{
public void onError(int errorCode, String errorMessage)
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-18
19:46:20 UTC (rev 8314)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-18
20:44:12 UTC (rev 8315)
@@ -34,7 +34,7 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.ManagementService;
import org.hornetq.core.management.Notification;
@@ -1719,7 +1719,7 @@
final boolean flush,
final boolean closeChannel)
{
- storageManager.afterCompleteOperations(new IOCompletion()
+ storageManager.afterCompleteOperations(new IOAsyncTask()
{
public void onError(int errorCode, String errorMessage)
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-18
19:46:20 UTC (rev 8314)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-18
20:44:12 UTC (rev 8315)
@@ -19,7 +19,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
@@ -220,7 +220,7 @@
// We use the Callback even for non persistence
// If we are using non-persistence with replication, the replication manager
will have
// to execute this runnable in the correct order
- storageManager.afterCompleteOperations(new IOCompletion()
+ storageManager.afterCompleteOperations(new IOAsyncTask()
{
public void onError(int errorCode, String errorMessage)
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
---
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-18
19:46:20 UTC (rev 8314)
+++
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-18
20:44:12 UTC (rev 8315)
@@ -38,6 +38,7 @@
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
@@ -376,7 +377,7 @@
}
final CountDownLatch latch = new CountDownLatch(1);
- OperationContextImpl.getContext().executeOnCompletion(new IOCompletion()
+ OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
{
public void onError(int errorCode, String errorMessage)
@@ -408,7 +409,7 @@
private void blockOnReplication(ReplicationManagerImpl manager) throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
- OperationContextImpl.getContext().executeOnCompletion(new IOCompletion()
+ OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
{
public void onError(int errorCode, String errorMessage)
@@ -467,7 +468,7 @@
replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
final CountDownLatch latch = new CountDownLatch(1);
- OperationContextImpl.getContext().executeOnCompletion(new IOCompletion()
+ OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
{
public void onError(int errorCode, String errorMessage)
@@ -529,7 +530,7 @@
replicatedJournal.appendPrepareRecord(i, new FakeData(), false);
}
- OperationContextImpl.getContext().executeOnCompletion(new IOCompletion()
+ OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
{
public void onError(int errorCode, String errorMessage)
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
===================================================================
---
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java 2009-11-18
19:46:20 UTC (rev 8314)
+++
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java 2009-11-18
20:44:12 UTC (rev 8315)
@@ -18,7 +18,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.impl.TimedBuffer;
import org.hornetq.core.journal.impl.TimedBufferObserver;
import org.hornetq.tests.util.UnitTestCase;
@@ -42,7 +42,7 @@
// Public --------------------------------------------------------
- IOCompletion dummyCallback = new IOCompletion()
+ IOAsyncTask dummyCallback = new IOAsyncTask()
{
public void done()
@@ -64,7 +64,7 @@
final AtomicInteger flushTimes = new AtomicInteger(0);
class TestObserver implements TimedBufferObserver
{
- public void flushBuffer(final ByteBuffer buffer, final boolean sync, final
List<IOCompletion> callbacks)
+ public void flushBuffer(final ByteBuffer buffer, final boolean sync, final
List<IOAsyncTask> callbacks)
{
buffers.add(buffer);
flushTimes.incrementAndGet();
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
---
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-18
19:46:20 UTC (rev 8314)
+++
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-18
20:44:12 UTC (rev 8315)
@@ -20,7 +20,7 @@
import java.util.concurrent.ConcurrentHashMap;
import org.hornetq.core.asyncio.BufferCallback;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.TimedBuffer;
@@ -241,11 +241,11 @@
final ByteBuffer bytes;
- final IOCompletion callback;
+ final IOAsyncTask callback;
volatile boolean sendError;
- CallbackRunnable(final FakeSequentialFile file, final ByteBuffer bytes, final
IOCompletion callback)
+ CallbackRunnable(final FakeSequentialFile file, final ByteBuffer bytes, final
IOAsyncTask callback)
{
this.file = file;
this.bytes = bytes;
@@ -399,7 +399,7 @@
return read(bytes, null);
}
- public int read(final ByteBuffer bytes, final IOCompletion callback) throws
Exception
+ public int read(final ByteBuffer bytes, final IOAsyncTask callback) throws
Exception
{
if (!open)
{
@@ -439,7 +439,7 @@
return data.position();
}
- public synchronized void writeDirect(final ByteBuffer bytes, final boolean sync,
final IOCompletion callback)
+ public synchronized void writeDirect(final ByteBuffer bytes, final boolean sync,
final IOAsyncTask callback)
{
if (!open)
{
@@ -605,7 +605,7 @@
/* (non-Javadoc)
* @see
org.hornetq.core.journal.SequentialFile#write(org.hornetq.core.remoting.spi.HornetQBuffer,
boolean, org.hornetq.core.journal.IOCallback)
*/
- public void write(HornetQBuffer bytes, boolean sync, IOCompletion callback) throws
Exception
+ public void write(HornetQBuffer bytes, boolean sync, IOAsyncTask callback) throws
Exception
{
writeDirect(ByteBuffer.wrap(bytes.array()), sync, callback);
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-18
19:46:20 UTC (rev 8314)
+++
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-18
20:44:12 UTC (rev 8315)
@@ -29,7 +29,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
@@ -1236,7 +1236,7 @@
/* (non-Javadoc)
* @see
org.hornetq.core.persistence.StorageManager#afterCompleteOperations(org.hornetq.core.journal.IOCompletion)
*/
- public void afterCompleteOperations(IOCompletion run)
+ public void afterCompleteOperations(IOAsyncTask run)
{
// TODO Auto-generated method stub