[jboss-cvs] JBoss Messaging SVN: r5661 - in trunk/src/main/org/jboss/messaging/core: persistence/impl/journal and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jan 19 15:15:40 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-01-19 15:15:40 -0500 (Mon, 19 Jan 2009)
New Revision: 5661

Modified:
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
Log:
Fix on XATest & Paging

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2009-01-19 17:23:09 UTC (rev 5660)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2009-01-19 20:15:40 UTC (rev 5661)
@@ -247,6 +247,8 @@
       pagingStoreFactory.setPagingManager(this);
 
       pagingStoreFactory.setStorageManager(storageManager);
+      
+      reloadStores();
 
       started = true;
    }

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-19 17:23:09 UTC (rev 5660)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-19 20:15:40 UTC (rev 5661)
@@ -51,6 +51,7 @@
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
+import org.jboss.messaging.util.Future;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -125,10 +126,7 @@
    // variable isTrace above
    private static void trace(final String message)
    {
-      if (isTrace)
-      {
-         log.trace(message);
-      }
+      log.trace(message);
    }
 
    // Constructors --------------------------------------------------
@@ -497,25 +495,23 @@
    {
       if (running)
       {
-         writeLock.lock();
-         currentPageLock.writeLock().lock();
+         running = false;
 
-         try
-         {
-            running = false;
+         Future future = new Future();
 
-            // FIXME -!! using a volatile to control execution is ugly.
-            // The runnable could remain running after paging store is stopped
+         executor.execute(future);
 
-            if (currentPage != null)
-            {
-               currentPage.close();
-            }
+         boolean ok = future.await(10000);
+
+         if (!ok)
+         {
+            log.warn("Timed out waiting for depage executor to stop");
          }
-         finally
+
+         if (currentPage != null)
          {
-            writeLock.unlock();
-            currentPageLock.writeLock().unlock();
+            currentPage.close();
+            currentPage = null;
          }
       }
    }
@@ -657,6 +653,11 @@
 
       try
       {
+         if (!running)
+         {
+            return null;
+         }
+
          if (numberOfPages == 0)
          {
             return null;
@@ -791,7 +792,7 @@
          message = pagedMessage.getMessage(storageManager);
 
          final long transactionIdDuringPaging = pagedMessage.getTransactionID();
-
+         
          if (transactionIdDuringPaging >= 0)
          {
             final PageTransactionInfo pageTransactionInfo = pagingManager.getTransaction(transactionIdDuringPaging);
@@ -804,7 +805,7 @@
                log.warn("Transaction " + pagedMessage.getTransactionID() +
                         " used during paging not found, ignoring message " +
                         message);
-               
+
                continue;
             }
 
@@ -847,7 +848,10 @@
 
       depageTransaction.commit();
 
-      trace("Depage committed");
+      if (isTrace)
+      {
+         trace("Depage committed, running = " + running);
+      }
    }
 
    /**

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-01-19 17:23:09 UTC (rev 5660)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-01-19 20:15:40 UTC (rev 5661)
@@ -30,6 +30,7 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -75,7 +76,9 @@
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.core.transaction.TransactionOperation;
 import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
+import org.jboss.messaging.core.transaction.Transaction.State;
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
 import org.jboss.messaging.util.IDGenerator;
 import org.jboss.messaging.util.JBMThreadFactory;
@@ -770,6 +773,8 @@
                   tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo);
 
                   pagingManager.addTransaction(pageTransactionInfo);
+                  
+                  tx.addOperation(new FinishPageMessageOperation());
 
                   break;
                }
@@ -1427,5 +1432,55 @@
          return SimpleString.sizeofString(address) + SimpleString.sizeofString(duplID);
       }
    }
+   
+   private class FinishPageMessageOperation implements TransactionOperation
+   {
 
+      public void afterCommit(final Transaction tx) throws Exception
+      {
+         // If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
+         // transaction until all the messages were added to the queue
+         // or else we could deliver the messages out of order
+         
+         PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+         
+         if (pageTransaction != null)
+         {
+            pageTransaction.commit();
+         }
+      }
+
+      public void afterPrepare(final Transaction tx) throws Exception
+      {
+      }
+
+      public void afterRollback(final Transaction tx) throws Exception
+      {
+         PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+
+         if (tx.getState() == State.PREPARED && pageTransaction != null)
+         {
+            pageTransaction.rollback();
+         }
+      }
+
+      public void beforeCommit(final Transaction tx) throws Exception
+      {
+      }
+
+      public void beforePrepare(final Transaction tx) throws Exception
+      {
+      }
+
+      public void beforeRollback(final Transaction tx) throws Exception
+      {
+      }
+
+      private void pageMessages(final Transaction tx) throws Exception
+      {
+      }
+
+   }
+   
+
 }

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-01-19 17:23:09 UTC (rev 5660)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-01-19 20:15:40 UTC (rev 5661)
@@ -153,8 +153,6 @@
       if (pagingManager != null)
       {
          pagingManager.setPostOffice(this);
-
-         pagingManager.start();
       }
 
       // Injecting the postoffice (itself) on queueFactory for paging-control
@@ -181,7 +179,6 @@
          messageExpiryExecutor.shutdown();
       }
 
-      pagingManager.stop();
 
       addressManager.clear();
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-01-19 17:23:09 UTC (rev 5660)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-01-19 20:15:40 UTC (rev 5661)
@@ -234,8 +234,6 @@
 
       pagingManager = createPagingManager();
 
-      pagingManager.start();
-
       resourceManager = new ResourceManagerImpl((int)configuration.getTransactionTimeout() / 1000,
                                                 configuration.getTransactionTimeoutScanPeriod());
       postOffice = new PostOfficeImpl(storageManager,
@@ -256,6 +254,8 @@
       securityStore.setSecurityManager(securityManager);
 
       postOffice.start();
+      
+      pagingManager.start();
 
       List<QueueBindingInfo> queueBindingInfos = new ArrayList<QueueBindingInfo>();
       List<SimpleString> destinations = new ArrayList<SimpleString>();
@@ -295,8 +295,6 @@
 
       Map<SimpleString, List<Pair<SimpleString, Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<SimpleString, Long>>>();
 
-      
-      pagingManager.reloadStores();
 
       storageManager.loadMessageJournal(postOffice,
                                         storageManager,
@@ -422,6 +420,8 @@
          // Ignore
       }
 
+      pagingManager.stop();
+      pagingManager = null;
       securityStore = null;
       resourceManager.stop();
       resourceManager = null;




More information about the jboss-cvs-commits mailing list