[hornetq-commits] JBoss hornetq SVN: r8375 - in branches/ClebertCallback: tests/src/org/hornetq/tests/integration/replication and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Nov 23 00:45:35 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-23 00:45:35 -0500 (Mon, 23 Nov 2009)
New Revision: 8375

Modified:
   branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
treating exceptions on the OperationContext

Modified: branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2009-11-23 04:57:31 UTC (rev 8374)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2009-11-23 05:45:35 UTC (rev 8375)
@@ -111,6 +111,12 @@
    /** You may have several actions to be done after a replication operation is completed. */
    public void executeOnCompletion(final IOAsyncTask completion)
    {
+      if (errorCode != -1)
+      {
+         completion.onError(errorCode, errorMessage);
+         return;
+      }
+
       boolean executeNow = false;
 
       synchronized (this)
@@ -175,10 +181,8 @@
          while (iter.hasNext())
          {
             TaskHolder holder = iter.next();
-            if (!holder.executed && stored >= holder.storeLined && replicated >= holder.replicationLined)
+            if (stored >= holder.storeLined && replicated >= holder.replicationLined)
             {
-               holder.executed = true;
-
                if (executor != null)
                {
                   // If set, we use an executor to avoid the server being single threaded
@@ -224,15 +228,6 @@
     */
    public void complete()
    {
-      // TODO: test and fix exceptions on the Context
-      if (tasks != null && errorMessage != null)
-      {
-         for (TaskHolder run : tasks)
-         {
-            run.task.onError(errorCode, errorMessage);
-         }
-      }
-
       // We hold errors until the complete is set, or the callbacks will never get informed
       errorCode = -1;
       errorMessage = null;
@@ -241,10 +236,21 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.asyncio.AIOCallback#onError(int, java.lang.String)
     */
-   public void onError(int errorCode, String errorMessage)
+   public synchronized void onError(int errorCode, String errorMessage)
    {
       this.errorCode = errorCode;
       this.errorMessage = errorMessage;
+
+      if (tasks != null)
+      {
+         Iterator<TaskHolder> iter = tasks.iterator();
+         while (iter.hasNext())
+         {
+            TaskHolder holder = iter.next();
+            holder.task.onError(errorCode, errorMessage);
+            iter.remove();
+         }
+      }
    }
 
    class TaskHolder
@@ -253,8 +259,6 @@
 
       int replicationLined;
 
-      boolean executed;
-
       IOAsyncTask task;
 
       TaskHolder(IOAsyncTask task)

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-23 04:57:31 UTC (rev 8374)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-23 05:45:35 UTC (rev 8375)
@@ -26,6 +26,7 @@
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.hornetq.core.buffers.ChannelBuffers;
 import org.hornetq.core.client.ClientSessionFactory;
@@ -54,6 +55,7 @@
 import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.core.remoting.Interceptor;
 import org.hornetq.core.remoting.Packet;
 import org.hornetq.core.remoting.RemotingConnection;
@@ -412,7 +414,96 @@
          server.stop();
       }
    }
+   
+   public void testExceptionSettingActionBefore() throws Exception
+   {
+      OperationContext ctx = OperationContextImpl.getContext(factory);
+      
+      ctx.lineUp();
+      
+      String msg = "I'm an exception";
+      
+      ctx.onError(5, msg);
+      
+      final AtomicInteger lastError = new AtomicInteger(0);
+      
+      final List<String> msgsResult = new ArrayList<String>();
+      
+      final CountDownLatch latch = new CountDownLatch(1);
+      
+      ctx.executeOnCompletion(new IOAsyncTask()
+      {
+         public void onError(int errorCode, String errorMessage)
+         {
+            lastError.set(errorCode);
+            msgsResult.add(errorMessage);
+            latch.countDown();
+         }
+         
+         public void done()
+         {
+         }
+      });
+      
+      assertTrue(latch.await(5, TimeUnit.SECONDS));
+      
+      assertEquals(5, lastError.get());
+      
+      assertEquals(1, msgsResult.size());
+      
+      assertEquals(msg, msgsResult.get(0));
+      
+      final CountDownLatch latch2 = new CountDownLatch(1);
+      
+      // Adding the Task after the exception should still throw an exception
+      ctx.executeOnCompletion(new IOAsyncTask()
+      {
+         public void onError(int errorCode, String errorMessage)
+         {
+            lastError.set(errorCode);
+            msgsResult.add(errorMessage);
+            latch2.countDown();
+         }
+         
+         public void done()
+         {
+         }
+      });
+      
+      assertTrue(latch2.await(5, TimeUnit.SECONDS));
+      
+      assertEquals(2, msgsResult.size());
 
+      assertEquals(msg, msgsResult.get(0));
+      
+      assertEquals(msg, msgsResult.get(1));
+      
+      // Clearing any exception from the Context, so we can use the context again
+      ctx.complete();
+      
+
+      final CountDownLatch latch3 = new CountDownLatch(1);
+      
+      ctx.executeOnCompletion(new IOAsyncTask()
+      {
+         public void onError(int errorCode, String errorMessage)
+         {
+         }
+         
+         public void done()
+         {
+            latch3.countDown();
+         }
+      });
+      
+      
+      assertTrue(latch2.await(5, TimeUnit.SECONDS));
+      
+      
+      
+      
+   }
+
    /**
     * @return
     */



More information about the hornetq-commits mailing list