[jboss-cvs] JBoss Messaging SVN: r5662 - in trunk/src/main/org/jboss/messaging/core/paging: impl and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jan 19 16:47:13 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-01-19 16:47:13 -0500 (Mon, 19 Jan 2009)
New Revision: 5662
Modified:
trunk/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
Log:
Interrupting pageTransaction.wait when the pageStore is stopped to avoid wait timeouts
Modified: trunk/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java 2009-01-19 20:15:40 UTC (rev 5661)
+++ trunk/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java 2009-01-19 21:47:13 UTC (rev 5662)
@@ -33,8 +33,12 @@
*/
public interface PageTransactionInfo extends EncodingSupport
{
- boolean waitCompletion() throws Exception;
+ boolean waitCompletion(int timeoutMilliSeconds) throws Exception;
+ boolean isCommit();
+
+ boolean isRollback();
+
void commit();
void rollback();
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java 2009-01-19 20:15:40 UTC (rev 5661)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java 2009-01-19 21:47:13 UTC (rev 5662)
@@ -26,6 +26,7 @@
import static org.jboss.messaging.util.DataConstants.SIZE_LONG;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.messaging.core.paging.PageTransactionInfo;
@@ -48,7 +49,8 @@
private CountDownLatch countDownCompleted;
- private volatile boolean complete;
+ private volatile boolean committed;
+ private volatile boolean rolledback;
private final AtomicInteger numberOfMessages = new AtomicInteger(0);
@@ -113,7 +115,7 @@
numberOfMessages.set(buffer.getInt());
countDownCompleted = null; // if it is being read, probably it was
// committed
- complete = true; // Unless it is a incomplete prepare, which is marked by
+ committed = true; // Unless it is a incomplete prepare, which is marked by
// markIncomplete
}
@@ -130,36 +132,41 @@
public void commit()
{
- complete = true;
+ committed = true;
/**
* this is to avoid a race condition where the transaction is still being committed while another thread is depaging messages
*/
countDownCompleted.countDown();
}
-
- /**
- * this is to avoid a race condition where the transaction still being committed while another thread is depaging messages
- */
- public boolean waitCompletion() throws InterruptedException
+
+
+ public boolean waitCompletion(int timeoutMilliseconds) throws InterruptedException
{
- if (countDownCompleted != null)
- {
- countDownCompleted.await();
- }
-
- return complete;
+ return countDownCompleted.await(timeoutMilliseconds, TimeUnit.MILLISECONDS);
}
-
+
+
+ public boolean isCommit()
+ {
+ return committed;
+ }
+
+ public boolean isRollback()
+ {
+ return rolledback;
+ }
+
public void rollback()
{
- complete = false;
-
+ rolledback = true;
+ committed = false;
countDownCompleted.countDown();
}
public void markIncomplete()
{
- complete = false;
+ committed = false;
+ rolledback = false;
countDownCompleted = new CountDownLatch(1);
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-19 20:15:40 UTC (rev 5661)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-19 21:47:13 UTC (rev 5662)
@@ -761,7 +761,7 @@
* If persistent messages are also used, it will update eventual PageTransactions
*/
- private void onDepage(final int pageId, final SimpleString destination, final List<PagedMessage> pagedMessages) throws Exception
+ private boolean onDepage(final int pageId, final SimpleString destination, final List<PagedMessage> pagedMessages) throws Exception
{
if (isTrace)
{
@@ -771,7 +771,7 @@
if (pagedMessages.size() == 0)
{
// nothing to be done on this case.
- return;
+ return true;
}
// Depage has to be done atomically, in case of failure it should be
@@ -779,12 +779,10 @@
Transaction depageTransaction = new TransactionImpl(storageManager);
- depageTransaction.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
-
depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE, Boolean.valueOf(true));
HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
-
+
for (PagedMessage pagedMessage : pagedMessages)
{
ServerMessage message = null;
@@ -811,10 +809,26 @@
// This is to avoid a race condition where messages are depaged
// before the commit arrived
- if (!pageTransactionInfo.waitCompletion())
+
+ while (running && !pageTransactionInfo.waitCompletion(500))
{
+ // This is just to give us a chance to interrupt the process.
+ // If we start a shutdown in the middle of transactions, the commit/rollback may never come, delaying the shutdown of the server
if (isTrace)
{
+ trace("Waiting pageTransaction to complete");
+ }
+ }
+
+ if (!running)
+ {
+ break;
+ }
+
+ if (!pageTransactionInfo.isCommit())
+ {
+ if (isTrace)
+ {
trace("Rollback was called after prepare, ignoring message " + message);
}
continue;
@@ -830,9 +844,18 @@
postOffice.route(message, depageTransaction);
}
+
+ if (!running)
+ {
+ depageTransaction.rollback();
+ return false;
+ }
for (PageTransactionInfo pageWithTransaction : pageTransactionsToUpdate)
{
+ // This will set the journal transaction to commit;
+ depageTransaction.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
+
if (pageWithTransaction.getNumberOfMessages() == 0)
{
// http://wiki.jboss.org/wiki/JBossMessaging2Paging
@@ -852,6 +875,8 @@
{
trace("Depage committed, running = " + running);
}
+
+ return true;
}
/**
@@ -967,9 +992,10 @@
List<PagedMessage> messages = page.read();
- onDepage(page.getPageId(), storeName, messages);
-
- page.delete();
+ if (onDepage(page.getPageId(), storeName, messages))
+ {
+ page.delete();
+ }
}
// Inner classes -------------------------------------------------
@@ -996,11 +1022,15 @@
// Note: clearDepage is an atomic operation, it needs to be done even if readPage was not executed
// because the page was full
- if (!clearDepage())
+ if (running && !clearDepage())
{
followingExecutor.execute(this);
}
}
+ else
+ {
+ System.out.println("Not running, giving up");
+ }
}
catch (Exception e)
{
More information about the jboss-cvs-commits
mailing list