Author: clebert.suconic(a)jboss.com
Date: 2009-11-19 20:00:17 -0500 (Thu, 19 Nov 2009)
New Revision: 8326
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
Log:
Fixing tests
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-19 19:53:49 UTC (rev 8325)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-20 01:00:17 UTC (rev 8326)
@@ -2964,7 +2964,10 @@
{
// Set the delegated callback as a parameter
TransactionCallback txcallback = tx.getCallback(currentFile);
- txcallback.setDelegateCompletion(parameterCallback);
+ if (parameterCallback != null)
+ {
+ txcallback.setDelegateCompletion(parameterCallback);
+ }
callback = txcallback;
}
else
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-19 19:53:49 UTC (rev 8325)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-20 01:00:17 UTC (rev 8326)
@@ -15,6 +15,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
@@ -280,7 +281,7 @@
bufferObserver.flushBuffer(directBuffer, pendingSync, callbacks);
- callbacks = new ArrayList<IOAsyncTask>();
+ callbacks = new LinkedList<IOAsyncTask>();
active = false;
pendingSync = false;
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-19 19:53:49 UTC (rev 8325)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-20 01:00:17 UTC (rev 8326)
@@ -627,6 +627,8 @@
{
acknowledge(ref);
}
+
+ storageManager.completeOperations();
}
public void setExpiryAddress(final SimpleString expiryAddress)
@@ -945,6 +947,7 @@
try
{
storageManager.updateDeliveryCount(reference);
+ storageManager.completeOperations();
}
catch (Exception e)
{
@@ -974,6 +977,7 @@
try
{
sendToDeadLetterAddress(reference);
+ storageManager.completeOperations();
}
catch (Exception e)
{
@@ -1005,7 +1009,8 @@
{
try
{
- sendToDeadLetterAddress(reference);
+ storageManager.updateScheduledDeliveryTime(reference);
+ storageManager.completeOperations();
}
catch (Exception e)
{
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-19 19:53:49 UTC (rev 8325)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-20 01:00:17 UTC (rev 8326)
@@ -147,14 +147,38 @@
storageManager.prepare(id, xid);
state = State.PREPARED;
+ // We use the Callback even for non persistence
+ // If we are using non-persistence with replication, the replication manager will have
+ // to execute this runnable in the correct order
+ storageManager.afterCompleteOperations(new IOAsyncTask()
+ {
- if (operations != null)
- {
- for (TransactionOperation operation : operations)
+ public void onError(int errorCode, String errorMessage)
{
- operation.afterPrepare(this);
+ log.warn("IO Error completing the transaction, code = " + errorCode + ", message = " + errorMessage);
}
- }
+
+ public void done()
+ {
+ if (operations != null)
+ {
+ System.out.println("Prepare was executed fine");
+ for (TransactionOperation operation : operations)
+ {
+ try
+ {
+ operation.afterPrepare(TransactionImpl.this);
+ }
+ catch (Exception e)
+ {
+ // https://jira.jboss.org/jira/browse/HORNETQ-188
+ // After commit shouldn't throw an exception
+ log.warn(e.getMessage(), e);
+ }
+ }
+ }
+ }
+ });
}
}
@@ -186,7 +210,11 @@
{
if (state == State.ACTIVE)
{
- prepare();
+ // Why do we need a prepare record on the onePhase optimization?
+ // Why we can't just go straight to commit, if we are doing one phase anyway?
+ state = State.PREPARED;
+// System.out.println("Adding Prepare");
+// prepare();
}
}
if (state != State.PREPARED)
@@ -212,6 +240,7 @@
if (containsPersistent || (xid != null && state == State.PREPARED))
{
+ System.out.println("Adding commit");
storageManager.commit(id);
state = State.COMMITTED;
@@ -230,6 +259,7 @@
public void done()
{
+ System.out.println("Commit was executed fine");
if (operations != null)
{
for (TransactionOperation operation : operations)
@@ -283,13 +313,38 @@
state = State.ROLLEDBACK;
- if (operations != null)
+ // We use the Callback even for non persistence
+ // If we are using non-persistence with replication, the replication manager will have
+ // to execute this runnable in the correct order
+ storageManager.afterCompleteOperations(new IOAsyncTask()
{
- for (TransactionOperation operation : operations)
+
+ public void onError(int errorCode, String errorMessage)
{
- operation.afterRollback(this);
+ log.warn("IO Error completing the transaction, code = " + errorCode + ", message = " + errorMessage);
}
- }
+
+ public void done()
+ {
+ if (operations != null)
+ {
+ System.out.println("Rollback was executed fine");
+ for (TransactionOperation operation : operations)
+ {
+ try
+ {
+ operation.afterRollback(TransactionImpl.this);
+ }
+ catch (Exception e)
+ {
+ // https://jira.jboss.org/jira/browse/HORNETQ-188
+ // After commit shouldn't throw an exception
+ log.warn(e.getMessage(), e);
+ }
+ }
+ }
+ }
+ });
}
}