[hornetq-commits] JBoss hornetq SVN: r8570 - in trunk: src/main/org/hornetq/core/persistence/impl/journal and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Dec 4 15:34:46 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-12-04 15:34:45 -0500 (Fri, 04 Dec 2009)
New Revision: 8570

Modified:
   trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
   trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
   trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
   trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java
Log:
A few tweaks on the journal and OperationContext

Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java	2009-12-04 20:25:10 UTC (rev 8569)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java	2009-12-04 20:34:45 UTC (rev 8570)
@@ -187,7 +187,9 @@
       if (writingChannel != null)
       {
          sequentialFile.position(0);
-         sequentialFile.writeDirect(writingChannel.toByteBuffer(), true);
+         SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
+         sequentialFile.writeDirect(writingChannel.toByteBuffer(), true, completion);
+         completion.waitCompletion();
          sequentialFile.close();
          newDataFiles.add(currentFile);
       }
@@ -224,7 +226,7 @@
       writingChannel.writeInt(fileID);
    }
 
-   protected void addToRecordsSnaptsho(long id)
+   protected void addToRecordsSnaptshot(long id)
    {
       recordsSnapshot.add(id);
    }

Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java	2009-12-04 20:25:10 UTC (rev 8569)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java	2009-12-04 20:34:45 UTC (rev 8570)
@@ -141,7 +141,8 @@
    {
       if (timedBuffer != null)
       {
-         timedBuffer.flush();
+         // When moving to a new file, we need to make sure any pending buffered data is flushed first
+         timedBuffer.flush(true);
          timedBuffer.setObserver(null);
       }
    }

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java	2009-12-04 20:25:10 UTC (rev 8569)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java	2009-12-04 20:34:45 UTC (rev 8570)
@@ -176,7 +176,7 @@
       {
          for (long id : ids)
          {
-            addToRecordsSnaptsho(id);
+            addToRecordsSnaptshot(id);
          }
       }
 
@@ -184,7 +184,7 @@
       {
          for (long id : ids2)
          {
-            addToRecordsSnaptsho(id);
+            addToRecordsSnaptshot(id);
          }
       }
    }

Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2009-12-04 20:25:10 UTC (rev 8569)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2009-12-04 20:34:45 UTC (rev 8570)
@@ -272,9 +272,18 @@
 
    public void flush()
    {
+      flush(false);
+   }
+
+   /** 
+    * force means the Journal is moving to a new file. Any pending write needs to be done immediately
+    * or data could be lost
+    * */
+   public void flush(final boolean force)
+   {
       synchronized (this)
       {
-         if (!delayFlush && buffer.writerIndex() > 0)
+         if ((force || !delayFlush) && buffer.writerIndex() > 0)
          {
             int pos = buffer.writerIndex();
 

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2009-12-04 20:25:10 UTC (rev 8569)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2009-12-04 20:34:45 UTC (rev 8570)
@@ -223,9 +223,6 @@
     */
    public void complete()
    {
-      // We hold errors until the complete is set, or the callbacks will never get informed
-      errorCode = -1;
-      errorMessage = null;
    }
 
    /* (non-Javadoc)

Modified: trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java	2009-12-04 20:25:10 UTC (rev 8569)
+++ trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java	2009-12-04 20:34:45 UTC (rev 8570)
@@ -19,6 +19,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.tests.util.UnitTestCase;
 
@@ -42,7 +43,7 @@
 
    // Public --------------------------------------------------------
 
-   public void testCaptureException() throws Exception
+   public void testCaptureExceptionOnExecutor() throws Exception
    {
       ExecutorService executor = Executors.newSingleThreadExecutor();
       executor.shutdown();
@@ -92,6 +93,80 @@
       assertEquals(1, numberOfFailures.get());
    }
    
+   public void testCaptureExceptionOnFailure() throws Exception
+   {
+      ExecutorService executor = Executors.newSingleThreadExecutor();
+      
+      final CountDownLatch latch = new CountDownLatch(1);
+      
+      final OperationContextImpl context = new OperationContextImpl(executor)
+      {
+         public void complete()
+         {
+            super.complete();
+            latch.countDown();
+         }
+
+      };
+      
+      context.storeLineUp();
+      
+      final AtomicInteger failures = new AtomicInteger(0);
+      
+      Thread t = new Thread()
+      {
+         public void run()
+         {
+            try
+            {
+               context.waitCompletion(5000);
+            }
+            catch (Throwable e)
+            {
+               e.printStackTrace();
+               failures.incrementAndGet();
+            }
+         }
+      };
+      
+      t.start();
+
+      // Need to wait for complete() to be called first or the test would be invalid.
+      // We use a latch instead of forcing a sleep here
+      assertTrue(latch.await(5, TimeUnit.SECONDS));
+      
+      context.onError(1, "Poop happens!");
+      
+      t.join();
+      
+      assertEquals(1, failures.get());
+      
+      
+      failures.set(0);
+      
+      final AtomicInteger operations = new AtomicInteger(0); 
+      
+      // We should be up to date with lineUps and executions. this should now just finish processing
+      context.executeOnCompletion(new IOAsyncTask()
+      {
+
+         public void done()
+         {
+            operations.incrementAndGet();
+         }
+
+         public void onError(int errorCode, String errorMessage)
+         {
+            failures.incrementAndGet();
+         }
+         
+      });
+      
+      
+      assertEquals(1, failures.get());
+      assertEquals(0, operations.get());
+   }
+   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------



More information about the hornetq-commits mailing list