[jboss-cvs] JBoss Messaging SVN: r5662 - in trunk/src/main/org/jboss/messaging/core/paging: impl and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jan 19 16:47:13 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-01-19 16:47:13 -0500 (Mon, 19 Jan 2009)
New Revision: 5662

Modified:
   trunk/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
Log:
Interrupting pageTransaction.wait when the pageStore is stopped to avoid wait timeouts

Modified: trunk/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java	2009-01-19 20:15:40 UTC (rev 5661)
+++ trunk/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java	2009-01-19 21:47:13 UTC (rev 5662)
@@ -33,8 +33,12 @@
  */
 public interface PageTransactionInfo extends EncodingSupport
 {
-   boolean waitCompletion() throws Exception;
+   boolean waitCompletion(int timeoutMilliSeconds) throws Exception;
 
+   boolean isCommit();
+   
+   boolean isRollback();
+   
    void commit();
 
    void rollback();

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java	2009-01-19 20:15:40 UTC (rev 5661)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java	2009-01-19 21:47:13 UTC (rev 5662)
@@ -26,6 +26,7 @@
 import static org.jboss.messaging.util.DataConstants.SIZE_LONG;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.jboss.messaging.core.paging.PageTransactionInfo;
@@ -48,7 +49,8 @@
 
    private CountDownLatch countDownCompleted;
 
-   private volatile boolean complete;
+   private volatile boolean committed;
+   private volatile boolean rolledback;
 
    private final AtomicInteger numberOfMessages = new AtomicInteger(0);
 
@@ -113,7 +115,7 @@
       numberOfMessages.set(buffer.getInt());
       countDownCompleted = null; // if it is being read, probably it was
       // committed
-      complete = true; // Unless it is a incomplete prepare, which is marked by
+      committed = true; // Unless it is a incomplete prepare, which is marked by
       // markIncomplete
    }
 
@@ -130,36 +132,41 @@
 
    public void commit()
    {
-      complete = true;
+      committed = true;
       /** 
        * this is to avoid a race condition where the transaction still being committed while another thread is depaging messages
        */
       countDownCompleted.countDown();
    }
-
-   /** 
-    * this is to avoid a race condition where the transaction still being committed while another thread is depaging messages
-    */
-   public boolean waitCompletion() throws InterruptedException
+   
+   
+   public boolean waitCompletion(int timeoutMilliseconds) throws InterruptedException
    {
-      if (countDownCompleted != null)
-      {
-         countDownCompleted.await();
-      }
-
-      return complete;
+      return countDownCompleted.await(timeoutMilliseconds, TimeUnit.MILLISECONDS);
    }
-
+   
+   
+   public boolean isCommit()
+   {
+      return committed;
+   }
+   
+   public boolean isRollback()
+   {
+      return rolledback;
+   }
+   
    public void rollback()
    {
-      complete = false;
-
+      rolledback = true;
+      committed = false;
       countDownCompleted.countDown();
    }
 
    public void markIncomplete()
    {
-      complete = false;
+      committed = false;
+      rolledback = false;
       
       countDownCompleted = new CountDownLatch(1);
    }

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-19 20:15:40 UTC (rev 5661)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-19 21:47:13 UTC (rev 5662)
@@ -761,7 +761,7 @@
     * If persistent messages are also used, it will update eventual PageTransactions
     */
 
-   private void onDepage(final int pageId, final SimpleString destination, final List<PagedMessage> pagedMessages) throws Exception
+   private boolean onDepage(final int pageId, final SimpleString destination, final List<PagedMessage> pagedMessages) throws Exception
    {
       if (isTrace)
       {
@@ -771,7 +771,7 @@
       if (pagedMessages.size() == 0)
       {
          // nothing to be done on this case.
-         return;
+         return true;
       }
 
       // Depage has to be done atomically, in case of failure it should be
@@ -779,12 +779,10 @@
 
       Transaction depageTransaction = new TransactionImpl(storageManager);
 
-      depageTransaction.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
-
       depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE, Boolean.valueOf(true));
 
       HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
-
+      
       for (PagedMessage pagedMessage : pagedMessages)
       {
          ServerMessage message = null;
@@ -811,10 +809,26 @@
 
             // This is to avoid a race condition where messages are depaged
             // before the commit arrived
-            if (!pageTransactionInfo.waitCompletion())
+            
+            while (running && !pageTransactionInfo.waitCompletion(500))
             {
+               // This is just to give us a chance to interrupt the process..
+               // if we start a shutdown in the middle of transactions, the commit/rollback may never come, delaying the shutdown of the server
                if (isTrace)
                {
+                  trace("Waiting pageTransaction to complete");
+               }
+            }
+            
+            if (!running)
+            {
+               break;
+            }
+            
+            if (!pageTransactionInfo.isCommit())
+            {
+               if (isTrace)
+               {
                   trace("Rollback was called after prepare, ignoring message " + message);
                }
                continue;
@@ -830,9 +844,18 @@
 
          postOffice.route(message, depageTransaction);
       }
+      
+      if (!running)
+      {
+         depageTransaction.rollback();
+         return false;
+      }
 
       for (PageTransactionInfo pageWithTransaction : pageTransactionsToUpdate)
       {
+         // This will set the journal transaction to commit;
+         depageTransaction.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
+
          if (pageWithTransaction.getNumberOfMessages() == 0)
          {
             // http://wiki.jboss.org/wiki/JBossMessaging2Paging
@@ -852,6 +875,8 @@
       {
          trace("Depage committed, running = " + running);
       }
+      
+      return true;
    }
 
    /**
@@ -967,9 +992,10 @@
 
       List<PagedMessage> messages = page.read();
 
-      onDepage(page.getPageId(), storeName, messages);
-
-      page.delete();
+      if (onDepage(page.getPageId(), storeName, messages))
+      {
+         page.delete();
+      }
    }
 
    // Inner classes -------------------------------------------------
@@ -996,11 +1022,15 @@
 
                // Note: clearDepage is an atomic operation, it needs to be done even if readPage was not executed
                // because the page was full
-               if (!clearDepage())
+               if (running && !clearDepage())
                {
                   followingExecutor.execute(this);
                }
             }
+            else
+            {
+               System.out.println("Not running, giving up");
+            }
          }
          catch (Exception e)
          {




More information about the jboss-cvs-commits mailing list