Author: clebert.suconic(a)jboss.com
Date: 2009-11-21 15:37:23 -0500 (Sat, 21 Nov 2009)
New Revision: 8363
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/asyncio/AsynchronousFile.java
branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/Journal.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/OrderTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
sync on files
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/asyncio/AsynchronousFile.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/asyncio/AsynchronousFile.java 2009-11-21
17:13:50 UTC (rev 8362)
+++
branches/ClebertCallback/src/main/org/hornetq/core/asyncio/AsynchronousFile.java 2009-11-21
20:37:23 UTC (rev 8363)
@@ -41,13 +41,20 @@
* */
long size() throws HornetQException;
+ /** Some operations may need to be done only after persitency is done.
+ * for instance, when a messaging system needs to guarantee ordering over
non-persistent data,
+ * it needs to make sure it will only deliver the message after all the data is
persisted.
+ * The sync won't perform any disk operation however it will wait for all the
current pending operations
+ * on this file to be finished */
+ void syncCallback(AIOCallback aioCallback);
+
/** Any error will be reported on the callback interface */
void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback
aioCallback);
void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback
aioCallback) throws HornetQException;
void fill(long position, int blocks, long size, byte fillChar) throws
HornetQException;
-
+
void setBufferCallback(BufferCallback callback);
int getBlockSize();
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-21
17:13:50 UTC (rev 8362)
+++
branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-21
20:37:23 UTC (rev 8363)
@@ -295,9 +295,9 @@
public void run()
{
writeSemaphore.acquireUninterruptibly();
+
+ final long sequence = nextWritingSequence.getAndIncrement();
- long sequence = nextWritingSequence.getAndIncrement();
-
try
{
write(handler, sequence, position, size, directByteBuffer,
aioCallback);
@@ -321,7 +321,7 @@
{
writeSemaphore.acquireUninterruptibly();
- long sequence = nextWritingSequence.getAndIncrement();
+ final long sequence = nextWritingSequence.getAndIncrement();
try
{
@@ -438,16 +438,42 @@
}
}
- // Private
---------------------------------------------------------------------------
+ // Callback methods
------------------------------------------------------------------
+ public void syncCallback(final AIOCallback callback)
+ {
+ pendingWrites.up();
+
+ writeExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ callbackLock.lock();
+
+ try
+ {
+ final long sequence = nextWritingSequence.getAndIncrement();
+
+ // This will execute the callback immediately if nothing is pending,
+ // or it will place it to the queue waiting for a response
+ executeCallback(callback, sequence);
+
+ }
+ finally
+ {
+ callbackLock.unlock();
+ }
+ }
+ });
+
+ }
+
/** */
@SuppressWarnings("unused")
private void callbackDone(final AIOCallback callback, final long sequence, final
ByteBuffer buffer)
{
writeSemaphore.release();
- pendingWrites.down();
-
callbackLock.lock();
try
@@ -456,20 +482,11 @@
if (sequence == -1)
{
callback.done();
+ pendingWrites.down();
}
else
{
- if (sequence == nextReadSequence)
- {
- nextReadSequence++;
- callback.done();
- flushCallbacks();
- }
- else
- {
- // System.out.println("Buffering callback");
- pendingCallbacks.add(new CallbackHolder(sequence, callback));
- }
+ executeCallback(callback, sequence);
}
// The buffer is not sent on callback for read operations
@@ -484,6 +501,26 @@
}
}
+ /**
+ * @param callback
+ * @param sequence
+ */
+ private void executeCallback(final AIOCallback callback, final long sequence)
+ {
+ if (sequence == nextReadSequence)
+ {
+ nextReadSequence++;
+ callback.done();
+ pendingWrites.down();
+ flushCallbacks();
+ }
+ else
+ {
+ // System.out.println("Buffering callback");
+ pendingCallbacks.add(new CallbackHolder(sequence, callback));
+ }
+ }
+
private void flushCallbacks()
{
while (!pendingCallbacks.isEmpty() && pendingCallbacks.peek().sequence ==
nextReadSequence)
@@ -498,6 +535,7 @@
{
holder.callback.done();
}
+ pendingWrites.down();
nextReadSequence++;
}
}
@@ -550,6 +588,8 @@
}
}
+ // Private
---------------------------------------------------------------------------
+
private void pollEvents()
{
if (!opened)
Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/Journal.java 2009-11-21
17:13:50 UTC (rev 8362)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/Journal.java 2009-11-21
20:37:23 UTC (rev 8363)
@@ -112,5 +112,7 @@
void perfBlast(int pages) throws Exception;
+ void sync(IOCompletion callback);
+
}
Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java 2009-11-21
17:13:50 UTC (rev 8362)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java 2009-11-21
20:37:23 UTC (rev 8363)
@@ -80,6 +80,9 @@
void sync() throws Exception;
+ /** This method will make sure the parameter callback will be invoked after all
pending sync operations are done */
+ void syncCallback(IOAsyncTask callback);
+
long size() throws Exception;
void renameTo(String newFileName) throws Exception;
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-21
17:13:50 UTC (rev 8362)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-21
20:37:23 UTC (rev 8363)
@@ -304,4 +304,14 @@
throw new IllegalStateException("File not opened");
}
}
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.journal.impl.AbstractSequentialFile#syncCallbackDirect(org.hornetq.core.journal.IOAsyncTask)
+ */
+ @Override
+ protected void syncCallbackDirect(IOAsyncTask callback)
+ {
+ aioFile.syncCallback(callback);
+ }
+
}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-21
17:13:50 UTC (rev 8362)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-21
20:37:23 UTC (rev 8363)
@@ -16,7 +16,6 @@
import java.io.File;
import java.nio.ByteBuffer;
import java.util.List;
-import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.core.journal.IOAsyncTask;
@@ -190,11 +189,30 @@
write(bytes, false, DummyCallback.getInstance());
}
}
+
+ /**
+ * invoke the callback after all pending operations are complete.
+ */
+ public void syncCallback(IOAsyncTask callback)
+ {
+ if (timedBuffer != null)
+ {
+ timedBuffer.syncCallback(callback);
+ }
+ else
+ {
+ syncCallbackDirect(callback);
+ }
+ }
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
+ protected abstract void syncCallbackDirect(IOAsyncTask callback);
+
protected File getFile()
{
return file;
@@ -252,15 +270,22 @@
{
public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final
List<IOAsyncTask> callbacks)
{
- buffer.flip();
-
- if (buffer.limit() == 0)
+ if (buffer == null)
{
- factory.releaseBuffer(buffer);
+ syncCallbackDirect(new DelegateCallback(callbacks));
}
else
{
- writeDirect(buffer, requestedSync, new DelegateCallback(callbacks));
+ buffer.flip();
+
+ if (buffer.limit() == 0)
+ {
+ factory.releaseBuffer(buffer);
+ }
+ else
+ {
+ writeDirect(buffer, requestedSync, new DelegateCallback(callbacks));
+ }
}
}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-21
17:13:50 UTC (rev 8362)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-21
20:37:23 UTC (rev 8363)
@@ -864,7 +864,31 @@
callback.waitCompletion();
}
}
+
+ public void sync(IOCompletion callback)
+ {
+ callback.lineUp();
+ compactingLock.readLock().lock();
+
+ try
+ {
+ lockAppend.lock();
+ try
+ {
+ currentFile.getFile().syncCallback(callback);
+ }
+ finally
+ {
+ lockAppend.unlock();
+ }
+ }
+ finally
+ {
+ compactingLock.readLock().unlock();
+ }
+ }
+
public void appendAddRecord(final long id, final byte recordType, final
EncodingSupport record, final boolean sync, final IOCompletion callback) throws Exception
{
if (LOAD_TRACE)
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-21
17:13:50 UTC (rev 8362)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-21
20:37:23 UTC (rev 8363)
@@ -220,6 +220,18 @@
internalWrite(bytes, sync, null);
}
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.journal.impl.AbstractSequentialFile#syncCallbackDirect(org.hornetq.core.journal.IOAsyncTask)
+ */
+ @Override
+ protected void syncCallbackDirect(IOAsyncTask callback)
+ {
+ // Nothing to be done on NIO.
+ // Timed buffer took care of everything
+ callback.done();
+ }
+
/**
* @param bytes
* @param sync
@@ -243,4 +255,5 @@
callback.done();
}
}
+
}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-21
17:13:50 UTC (rev 8362)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-21
20:37:23 UTC (rev 8363)
@@ -225,15 +225,30 @@
return true;
}
}
+
- public synchronized void addBytes(final byte[] bytes, final boolean sync, final
IOAsyncTask callback)
+ /**
+ * This method will make sure this callback will be executed after all the pending
callbacks
+ */
+ public synchronized void syncCallback(IOAsyncTask callback)
{
- if (buffer.writerIndex() == 0)
+ resumeTimerIfNeeded();
+
+ callbacks.add(callback);
+
+ pendingSync = true;
+
+ if (flushOnSync)
{
- // Resume latch
- latchTimer.down();
+ flush();
}
+ }
+
+ public synchronized void addBytes(final byte[] bytes, final boolean sync, final
IOAsyncTask callback)
+ {
+ resumeTimerIfNeeded();
+
buffer.writeBytes(bytes);
callbacks.add(callback);
@@ -259,35 +274,57 @@
}
}
+ private void resumeTimerIfNeeded()
+ {
+ if (buffer.writerIndex() == 0 && callbacks.size() == 0)
+ {
+ // Resume latch
+ latchTimer.down();
+ }
+ }
+
public synchronized void flush()
{
- if (buffer.writerIndex() > 0)
+ if (buffer.writerIndex() > 0 || callbacks.size() > 0)
{
+ // Stop latch
latchTimer.up();
-
- int pos = buffer.writerIndex();
-
- if (logRates)
+
+ if (buffer.writerIndex() == 0 && callbacks.size() > 0)
{
- bytesFlushed += pos;
+ // This is to perform a sync callback.
+ // When we get to here, means we have sync callbacks waiting with no buffer
+ // on this case we need to call sync on the file to make sure no other
callbacks are pending
+ bufferObserver.flushBuffer(null, pendingSync, callbacks);
+
+ callbacks = new LinkedList<IOAsyncTask>();
}
-
- ByteBuffer directBuffer = bufferObserver.newBuffer(bufferSize, pos);
-
- // Putting a byteArray on a native buffer is much faster, since it will do in a
single native call.
- // Using directBuffer.put(buffer) would make several append calls for each byte
-
- directBuffer.put(buffer.array(), 0, pos);
-
- bufferObserver.flushBuffer(directBuffer, pendingSync, callbacks);
-
- callbacks = new LinkedList<IOAsyncTask>();
-
- active = false;
- pendingSync = false;
-
- buffer.clear();
- bufferLimit = 0;
+ else
+ {
+ int pos = buffer.writerIndex();
+
+ if (logRates)
+ {
+ bytesFlushed += pos;
+ }
+
+ ByteBuffer directBuffer = bufferObserver.newBuffer(bufferSize, pos);
+
+ // Putting a byteArray on a native buffer is much faster, since it will do in
a single native call.
+ // Using directBuffer.put(buffer) would make several append calls for each
byte
+
+ directBuffer.put(buffer.array(), 0, pos);
+
+ bufferObserver.flushBuffer(directBuffer, pendingSync, callbacks);
+
+ callbacks = new LinkedList<IOAsyncTask>();
+
+ active = false;
+ pendingSync = false;
+
+ buffer.clear();
+ bufferLimit = 0;
+ }
}
}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-21
17:13:50 UTC (rev 8362)
+++
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-21
20:37:23 UTC (rev 8363)
@@ -506,10 +506,7 @@
public void sync()
{
- if (replicator != null)
- {
- replicator.sync();
- }
+ messageJournal.sync(OperationContextImpl.getContext());
}
// Transactional operations
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-21
17:13:50 UTC (rev 8362)
+++
branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-21
20:37:23 UTC (rev 8363)
@@ -919,10 +919,7 @@
}
else
{
- if (storageManager.isReplicated())
- {
- storageManager.sync();
- }
+ storageManager.sync();
}
message.incrementRefCount(reference);
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-21
17:13:50 UTC (rev 8362)
+++
branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-21
20:37:23 UTC (rev 8363)
@@ -86,6 +86,6 @@
*/
void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException;
- void sync();
+ void sync(OperationContext ctx);
}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-11-21
17:13:50 UTC (rev 8362)
+++
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-11-21
20:37:23 UTC (rev 8363)
@@ -25,6 +25,7 @@
import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.journal.impl.JournalImpl.ByteArrayEncoding;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.replication.ReplicationManager;
@@ -433,6 +434,15 @@
localJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#sync()
+ */
+ public void sync(IOCompletion ctx)
+ {
+ replicationManager.sync((OperationContext)ctx);
+ localJournal.sync(ctx);
+ }
+
/**
* @param committedRecords
* @param preparedTransactions
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-21
17:13:50 UTC (rev 8362)
+++
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-21
20:37:23 UTC (rev 8363)
@@ -439,9 +439,29 @@
replicatingChannel.sendBlocking(new ReplicationCompareDataMessage(journalInfo));
}
- public void sync()
+
+ public void sync(OperationContext context)
{
- sync(OperationContextImpl.getContext());
+ boolean executeNow = false;
+ synchronized (replicationLock)
+ {
+ context.replicationLineUp();
+ if (pendingTokens.isEmpty())
+ {
+ // this means the list is empty and we should process it now
+ executeNow = true;
+ }
+ else
+ {
+ // adding the sync to be executed in order
+ // as soon as the reponses are back from the backup
+ this.pendingTokens.add(new SyncOperation(context));
+ }
+ }
+ if (executeNow)
+ {
+ context.replicationDone();
+ }
}
// Package protected ---------------------------------------------
@@ -490,31 +510,6 @@
ctx.replicationDone();
}
}
-
- private void sync(OperationContext context)
- {
- boolean executeNow = false;
- synchronized (replicationLock)
- {
- context.replicationLineUp();
- if (pendingTokens.isEmpty())
- {
- // this means the list is empty and we should process it now
- executeNow = true;
- }
- else
- {
- // adding the sync to be executed in order
- // as soon as the reponses are back from the backup
- this.pendingTokens.add(new SyncOperation(context));
- }
- }
- if (executeNow)
- {
- context.replicationDone();
- }
- }
-
public OperationContext getContext()
{
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/OrderTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/OrderTest.java 2009-11-21
17:13:50 UTC (rev 8362)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/OrderTest.java 2009-11-21
20:37:23 UTC (rev 8363)
@@ -64,63 +64,48 @@
// Public --------------------------------------------------------
- public void testLoop() throws Exception
- {
- for (int i = 0 ; i < 50; i ++)
- {
- testSimpleOrder();
- tearDown();
- setUp();
- }
- }
-
public void testSimpleOrder() throws Exception
{
ClientSessionFactory sf = createNettyFactory();
- sf.setBlockOnNonPersistentSend(true);
- sf.setBlockOnPersistentSend(true);
+ sf.setBlockOnNonPersistentSend(false);
+ sf.setBlockOnPersistentSend(false);
sf.setBlockOnAcknowledge(true);
-
ClientSession session = sf.createSession(true, true, 0);
-
+
try
{
session.createQueue("queue", "queue", true);
ClientProducer prod = session.createProducer("queue");
-
for (int i = 0; i < 100; i++)
{
- ClientMessage msg = session.createClientMessage(i == 0);
+ ClientMessage msg = session.createClientMessage(i % 2 == 0);
msg.setBody(session.createBuffer(new byte[1024]));
msg.putIntProperty("id", i);
prod.send(msg);
}
session.close();
-
+
boolean started = false;
for (int start = 0; start < 3; start++)
{
-
-
- if (start == 20)
+
+ if (start == 2)
{
started = true;
server.stop();
server.start();
}
-
+
session = sf.createSession(true, true);
-
+
session.start();
-
-// fail(session);
-
+
ClientConsumer cons = session.createConsumer("queue");
for (int i = 0; i < 100; i++)
@@ -144,7 +129,7 @@
assertEquals(i, msg.getIntProperty("id").intValue());
}
}
-
+
session.close();
}
@@ -156,11 +141,10 @@
}
}
-
-
+
private void fail(ClientSession session) throws InterruptedException
{
-
+
final CountDownLatch latch = new CountDownLatch(1);
class MyListener implements SessionFailureListener
@@ -178,8 +162,6 @@
MyListener listener = new MyListener();
session.addFailureListener(listener);
-
-
RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
// Simulate failure on connection
@@ -190,12 +172,10 @@
boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
assertTrue(ok);
-
+
session.removeFailureListener(listener);
}
-
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java 2009-11-21
17:13:50 UTC (rev 8362)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java 2009-11-21
20:37:23 UTC (rev 8363)
@@ -52,12 +52,12 @@
// Public --------------------------------------------------------
- public void _test() throws Exception
+ public void test() throws Exception
{
for (int i = 0; i < 100; i++)
{
System.out.println("<<<<<< " + i + "
>>>>>>>");
- testTxMixedPersistentAndNonPersistentMessagesOrderWithReplicatedBackup();
+ testMixedPersistentAndNonPersistentMessagesOrderWithReplicatedBackup();
tearDown();
setUp();
}
@@ -80,7 +80,7 @@
ClientSessionFactory csf = new
ClientSessionFactoryImpl(getConnectorTransportConfiguration(true));
csf.setBlockOnNonPersistentSend(false);
- csf.setBlockOnPersistentSend(true);
+ csf.setBlockOnPersistentSend(false);
ClientSession session = null;
if (transactional)
{
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-21
17:13:50 UTC (rev 8362)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-21
20:37:23 UTC (rev 8363)
@@ -531,7 +531,7 @@
}
else
{
- manager.sync();
+ manager.sync(OperationContextImpl.getContext());
}
OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
@@ -961,5 +961,12 @@
{
}
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.journal.Journal#sync(org.hornetq.core.journal.IOCompletion)
+ */
+ public void sync(IOCompletion callback)
+ {
+ }
+
}
}
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java 2009-11-21
17:13:50 UTC (rev 8362)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java 2009-11-21
20:37:23 UTC (rev 8363)
@@ -85,25 +85,34 @@
protected static class CountDownCallback implements AIOCallback
{
- private final CountDownLatch latch;
+ private final CountDownLatch latchDone;
+ private final CountDownLatch waitCallback;
+
private final List<Integer> outputList;
private final int order;
private final AtomicInteger errors;
- public CountDownCallback(final CountDownLatch latch, final AtomicInteger errors,
final List<Integer> outputList, final int order)
+ public CountDownCallback(final CountDownLatch latch, final AtomicInteger errors,
final List<Integer> outputList, final int order, final CountDownLatch waitCallback)
{
- this.latch = latch;
+ this.latchDone = latch;
this.outputList = outputList;
this.order = order;
this.errors = errors;
+
+ this.waitCallback = waitCallback;
}
+ public CountDownCallback(final CountDownLatch latch, final AtomicInteger errors,
final List<Integer> outputList, final int order)
+ {
+ this(latch, errors, outputList, order, null);
+ }
+
volatile boolean doneCalled = false;
volatile int errorCalled = 0;
@@ -112,15 +121,26 @@
public void done()
{
+ if (waitCallback != null)
+ {
+ try
+ {
+ waitCallback.await();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); // -> junit report
+ }
+ }
if (outputList != null)
{
outputList.add(order);
}
doneCalled = true;
timesDoneCalled.incrementAndGet();
- if (latch != null)
+ if (latchDone != null)
{
- latch.countDown();
+ latchDone.countDown();
}
}
@@ -135,11 +155,11 @@
{
errors.incrementAndGet();
}
- if (latch != null)
+ if (latchDone != null)
{
// even thought an error happened, we need to inform the latch,
// or the test won't finish
- latch.countDown();
+ latchDone.countDown();
}
}
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java 2009-11-21
17:13:50 UTC (rev 8362)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java 2009-11-21
20:37:23 UTC (rev 8363)
@@ -22,6 +22,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.TestSuite;
@@ -420,6 +421,60 @@
}
}
+ public void testOrderOnSynCallback() throws Exception
+ {
+ boolean closed = false;
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor,
pollerExecutor);
+ ByteBuffer buffer = null;
+ try
+ {
+ final int NUMBER_LINES = 100;
+ final int SIZE = 512;
+
+ controller.open(FILE_NAME, 100);
+
+ controller.fill(0, 1, NUMBER_LINES * SIZE, (byte)'j');
+
+ CountDownLatch latch = new CountDownLatch(NUMBER_LINES * 2);
+
+ buffer = AsynchronousFileImpl.newBuffer(SIZE);
+ buffer.rewind();
+ for (int j = 0; j < SIZE; j++)
+ {
+ buffer.put((byte)(j % Byte.MAX_VALUE));
+ }
+
+ ArrayList<Integer> result = new ArrayList<Integer>();
+
+ for (int i = 0; i < NUMBER_LINES * 2; i++)
+ {
+ CountDownCallback aio = new CountDownCallback(latch, null, result, i);
+ if (i % 2 == 0)
+ {
+ controller.write(i * SIZE, SIZE, buffer, aio);
+ }
+ else
+ {
+ controller.syncCallback(aio);
+ }
+ }
+
+ controller.close();
+ closed = true;
+
+ // We are not waiting the latch, as close should already hold for any writes
+ CountDownCallback.checkResults(NUMBER_LINES * 2, result);
+ }
+ finally
+ {
+ AsynchronousFileImpl.destroyBuffer(buffer);
+ if (!closed)
+ {
+ controller.close();
+ }
+ }
+ }
+
public void testBufferCallbackAwaysSameBuffer() throws Exception
{
boolean closed = false;
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java 2009-11-21
17:13:50 UTC (rev 8362)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java 2009-11-21
20:37:23 UTC (rev 8363)
@@ -17,11 +17,16 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.tests.util.UnitTestCase;
/**
@@ -39,9 +44,9 @@
protected void setUp() throws Exception
{
super.setUp();
-
+
factory = createFactory();
-
+
factory.start();
}
@@ -49,13 +54,13 @@
protected void tearDown() throws Exception
{
assertEquals(0, AsynchronousFileImpl.getTotalMaxIO());
-
+
factory.stop();
-
+
factory = null;
-
+
forceGC();
-
+
super.tearDown();
}
@@ -172,7 +177,139 @@
sf2.close();
}
-
+
+ public void testOrder() throws Exception
+ {
+ SequentialFile sf = factory.createSequentialFile("order-test.hq", 100);
+
+ sf.open();
+
+ factory.activateBuffer(sf);
+
+ final int records = 5000;
+
+ sf.fill(0, records * 1024, (byte)0);
+
+
+ final ArrayList<Integer> result = new ArrayList<Integer>();
+
+ final CountDownLatch latch = new CountDownLatch(records);
+
+ HornetQBuffer buffer = ChannelBuffers.wrappedBuffer(new byte[512]);
+
+ for (int i = 0 ; i < records; i++)
+ {
+ final int toadd = i;
+ IOAsyncTask callback = new IOAsyncTask()
+ {
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+
+ public void done()
+ {
+ result.add(toadd);
+
+ latch.countDown();
+ }
+
+ };
+
+ if (i % 2 == 0)
+ {
+ sf.disableAutoFlush();
+ sf.fits(512);
+ sf.write(buffer, false, callback);
+ sf.enableAutoFlush();
+ }
+ else
+ {
+ sf.syncCallback(callback);
+ }
+ }
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+ assertEquals(records, result.size());
+
+ int i = 0;
+
+ for (Integer r : result)
+ {
+ assertEquals(i++, r.intValue());
+ }
+
+
+ factory.deactivateBuffer();
+
+ sf.close();
+ }
+
+ public void testOrder2() throws Exception
+ {
+ SequentialFile sf = factory.createSequentialFile("order-test.hq", 100);
+
+ sf.open();
+
+ factory.activateBuffer(sf);
+
+ final int records = 1000;
+
+ sf.fill(0, records * 1024, (byte)0);
+
+
+ final ArrayList<Integer> result = new ArrayList<Integer>();
+
+ final CountDownLatch latch = new CountDownLatch(records);
+
+ HornetQBuffer buffer = ChannelBuffers.wrappedBuffer(new byte[512]);
+
+ for (int i = 0 ; i < records; i++)
+ {
+ final int toadd = i;
+ IOAsyncTask callback = new IOAsyncTask()
+ {
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+
+ public void done()
+ {
+ result.add(toadd);
+
+ latch.countDown();
+ }
+
+ };
+
+ if (i == 10)
+ {
+ sf.write(buffer, false, callback);
+ }
+ else
+ {
+ sf.syncCallback(callback);
+ }
+ }
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+ assertEquals(records, result.size());
+
+ int i = 0;
+
+ for (Integer r : result)
+ {
+ assertEquals(i++, r.intValue());
+ }
+
+ factory.deactivateBuffer();
+
+ sf.close();
+ }
+
public void testRename() throws Exception
{
SequentialFile sf = factory.createSequentialFile("test1.hq", 1);
@@ -184,7 +321,7 @@
assertEquals(1, fileNames.size());
assertTrue(fileNames.contains("test1.hq"));
-
+
sf.renameTo("test1.cmp");
fileNames = factory.listFiles("cmp");
@@ -204,7 +341,7 @@
assertEquals(0, fileNames.size());
}
-
+
public void testWriteandRead() throws Exception
{
SequentialFile sf = factory.createSequentialFile("write.hq", 1);
@@ -222,7 +359,7 @@
String s3 = "echidna";
byte[] bytes3 = s3.getBytes("UTF-8");
ByteBuffer bb3 = factory.wrapBuffer(bytes3);
-
+
long initialPos = sf.position();
sf.writeDirect(bb1, true);
long bytesWritten = sf.position() - initialPos;
@@ -305,7 +442,6 @@
sf.writeDirect(bb2, true);
bytesWritten = sf.position() - initialPos;
-
assertEquals(bb2.limit(), bytesWritten);
initialPos = sf.position();
@@ -382,9 +518,9 @@
try
{
-
+
bb1 = factory.wrapBuffer(bytes1);
-
+
sf.writeDirect(bb1, true);
fail("Should throw exception");
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-21
17:13:50 UTC (rev 8362)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-21
20:37:23 UTC (rev 8363)
@@ -634,10 +634,17 @@
*/
public void setTimedBuffer(TimedBuffer buffer)
{
- // TODO Auto-generated method stub
}
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.journal.SequentialFile#syncCallback(org.hornetq.core.journal.IOAsyncTask)
+ */
+ public void syncCallback(IOAsyncTask callback)
+ {
+ callback.done();
+ }
+
}
/* (non-Javadoc)