Author: clebert.suconic(a)jboss.com
Date: 2009-12-04 15:34:45 -0500 (Fri, 04 Dec 2009)
New Revision: 8570
Modified:
trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java
Log:
A few tweaks on the journal and OperationContext
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2009-12-04 20:25:10 UTC (rev 8569)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2009-12-04 20:34:45 UTC (rev 8570)
@@ -187,7 +187,9 @@
if (writingChannel != null)
{
sequentialFile.position(0);
- sequentialFile.writeDirect(writingChannel.toByteBuffer(), true);
+ SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
+ sequentialFile.writeDirect(writingChannel.toByteBuffer(), true, completion);
+ completion.waitCompletion();
sequentialFile.close();
newDataFiles.add(currentFile);
}
@@ -224,7 +226,7 @@
writingChannel.writeInt(fileID);
}
- protected void addToRecordsSnaptsho(long id)
+ protected void addToRecordsSnaptshot(long id)
{
recordsSnapshot.add(id);
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java 2009-12-04 20:25:10 UTC (rev 8569)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java 2009-12-04 20:34:45 UTC (rev 8570)
@@ -141,7 +141,8 @@
{
if (timedBuffer != null)
{
- timedBuffer.flush();
+ // When moving to a new file, we need to make sure any pending buffer will be transfered to the buffer
+ timedBuffer.flush(true);
timedBuffer.setObserver(null);
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2009-12-04 20:25:10 UTC (rev 8569)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2009-12-04 20:34:45 UTC (rev 8570)
@@ -176,7 +176,7 @@
{
for (long id : ids)
{
- addToRecordsSnaptsho(id);
+ addToRecordsSnaptshot(id);
}
}
@@ -184,7 +184,7 @@
{
for (long id : ids2)
{
- addToRecordsSnaptsho(id);
+ addToRecordsSnaptshot(id);
}
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-04 20:25:10 UTC (rev 8569)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-04 20:34:45 UTC (rev 8570)
@@ -272,9 +272,18 @@
public void flush()
{
+ flush(false);
+ }
+
+ /**
+ * force means the Journal is moving to a new file. Any pending write need to be done immediately
+ * or data could be lost
+ * */
+ public void flush(final boolean force)
+ {
synchronized (this)
{
- if (!delayFlush && buffer.writerIndex() > 0)
+ if ((force || !delayFlush) && buffer.writerIndex() > 0)
{
int pos = buffer.writerIndex();
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-12-04 20:25:10 UTC (rev 8569)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-12-04 20:34:45 UTC (rev 8570)
@@ -223,9 +223,6 @@
*/
public void complete()
{
- // We hold errors until the complete is set, or the callbacks will never get informed
- errorCode = -1;
- errorMessage = null;
}
/* (non-Javadoc)
Modified: trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java 2009-12-04 20:25:10 UTC (rev 8569)
+++ trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java 2009-12-04 20:34:45 UTC (rev 8570)
@@ -19,6 +19,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.tests.util.UnitTestCase;
@@ -42,7 +43,7 @@
// Public --------------------------------------------------------
- public void testCaptureException() throws Exception
+ public void testCaptureExceptionOnExecutor() throws Exception
{
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.shutdown();
@@ -92,6 +93,80 @@
assertEquals(1, numberOfFailures.get());
}
+ public void testCaptureExceptionOnFailure() throws Exception
+ {
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final OperationContextImpl context = new OperationContextImpl(executor)
+ {
+ public void complete()
+ {
+ super.complete();
+ latch.countDown();
+ }
+
+ };
+
+ context.storeLineUp();
+
+ final AtomicInteger failures = new AtomicInteger(0);
+
+ Thread t = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ context.waitCompletion(5000);
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ failures.incrementAndGet();
+ }
+ }
+ };
+
+ t.start();
+
+ // Need to wait complete to be called first or the test would be invalid.
+ // We use a latch instead of forcing a sleep here
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+ context.onError(1, "Poop happens!");
+
+ t.join();
+
+ assertEquals(1, failures.get());
+
+
+ failures.set(0);
+
+ final AtomicInteger operations = new AtomicInteger(0);
+
+ // We should be up to date with lineUps and executions. this should now just finish processing
+ context.executeOnCompletion(new IOAsyncTask()
+ {
+
+ public void done()
+ {
+ operations.incrementAndGet();
+ }
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ failures.incrementAndGet();
+ }
+
+ });
+
+
+ assertEquals(1, failures.get());
+ assertEquals(0, operations.get());
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Show replies by date