Author: clebert.suconic(a)jboss.com
Date: 2009-11-23 00:45:35 -0500 (Mon, 23 Nov 2009)
New Revision: 8375
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
treating exceptions on the OperationContext
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-23
04:57:31 UTC (rev 8374)
+++
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-23
05:45:35 UTC (rev 8375)
@@ -111,6 +111,12 @@
/** You may have several actions to be done after a replication operation is
completed. */
public void executeOnCompletion(final IOAsyncTask completion)
{
+ if (errorCode != -1)
+ {
+ completion.onError(errorCode, errorMessage);
+ return;
+ }
+
boolean executeNow = false;
synchronized (this)
@@ -175,10 +181,8 @@
while (iter.hasNext())
{
TaskHolder holder = iter.next();
- if (!holder.executed && stored >= holder.storeLined &&
replicated >= holder.replicationLined)
+ if (stored >= holder.storeLined && replicated >=
holder.replicationLined)
{
- holder.executed = true;
-
if (executor != null)
{
// If set, we use an executor to avoid the server being single
threaded
@@ -224,15 +228,6 @@
*/
public void complete()
{
- // TODO: test and fix exceptions on the Context
- if (tasks != null && errorMessage != null)
- {
- for (TaskHolder run : tasks)
- {
- run.task.onError(errorCode, errorMessage);
- }
- }
-
// We hold errors until the complete is set, or the callbacks will never get
informed
errorCode = -1;
errorMessage = null;
@@ -241,10 +236,21 @@
/* (non-Javadoc)
* @see org.hornetq.core.asyncio.AIOCallback#onError(int, java.lang.String)
*/
- public void onError(int errorCode, String errorMessage)
+ public synchronized void onError(int errorCode, String errorMessage)
{
this.errorCode = errorCode;
this.errorMessage = errorMessage;
+
+ if (tasks != null)
+ {
+ Iterator<TaskHolder> iter = tasks.iterator();
+ while (iter.hasNext())
+ {
+ TaskHolder holder = iter.next();
+ holder.task.onError(errorCode, errorMessage);
+ iter.remove();
+ }
+ }
}
class TaskHolder
@@ -253,8 +259,6 @@
int replicationLined;
- boolean executed;
-
IOAsyncTask task;
TaskHolder(IOAsyncTask task)
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-23
04:57:31 UTC (rev 8374)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-23
05:45:35 UTC (rev 8375)
@@ -26,6 +26,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.client.ClientSessionFactory;
@@ -54,6 +55,7 @@
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.remoting.Interceptor;
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
@@ -412,7 +414,96 @@
server.stop();
}
}
+
+ public void testExceptionSettingActionBefore() throws Exception
+ {
+ OperationContext ctx = OperationContextImpl.getContext(factory);
+
+ ctx.lineUp();
+
+ String msg = "I'm an exception";
+
+ ctx.onError(5, msg);
+
+ final AtomicInteger lastError = new AtomicInteger(0);
+
+ final List<String> msgsResult = new ArrayList<String>();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ ctx.executeOnCompletion(new IOAsyncTask()
+ {
+ public void onError(int errorCode, String errorMessage)
+ {
+ lastError.set(errorCode);
+ msgsResult.add(errorMessage);
+ latch.countDown();
+ }
+
+ public void done()
+ {
+ }
+ });
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+ assertEquals(5, lastError.get());
+
+ assertEquals(1, msgsResult.size());
+
+ assertEquals(msg, msgsResult.get(0));
+
+ final CountDownLatch latch2 = new CountDownLatch(1);
+
+ // Adding the Task after the exception should still throw an exception
+ ctx.executeOnCompletion(new IOAsyncTask()
+ {
+ public void onError(int errorCode, String errorMessage)
+ {
+ lastError.set(errorCode);
+ msgsResult.add(errorMessage);
+ latch2.countDown();
+ }
+
+ public void done()
+ {
+ }
+ });
+
+ assertTrue(latch2.await(5, TimeUnit.SECONDS));
+
+ assertEquals(2, msgsResult.size());
+ assertEquals(msg, msgsResult.get(0));
+
+ assertEquals(msg, msgsResult.get(1));
+
+ // Clearing any exception from the Context, so we can use the context again
+ ctx.complete();
+
+
+ final CountDownLatch latch3 = new CountDownLatch(1);
+
+ ctx.executeOnCompletion(new IOAsyncTask()
+ {
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+
+ public void done()
+ {
+ latch3.countDown();
+ }
+ });
+
+
+ assertTrue(latch2.await(5, TimeUnit.SECONDS));
+
+
+
+
+ }
+
/**
* @return
*/