[jboss-cvs] JBoss Messaging SVN: r4883 - in branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core: persistence/impl/journal and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Aug 27 12:36:28 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-08-27 12:36:27 -0400 (Wed, 27 Aug 2008)
New Revision: 4883

Modified:
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
Log:
Small tweaks.. docs... etc

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java	2008-08-27 15:41:32 UTC (rev 4882)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java	2008-08-27 16:36:27 UTC (rev 4883)
@@ -83,22 +83,19 @@
    public void decode(final MessagingBuffer buffer)
    {
       transactionID = buffer.getLong();
-      final long messageID = buffer.getLong();
       message.decode(buffer);
-      message.setMessageID(messageID);
    }
 
    public void encode(final MessagingBuffer buffer)
    {
       buffer.putLong(transactionID);
-      buffer.putLong(message.getMessageID());
       message.encode(buffer);
    }
 
    public int getEncodeSize()
    {
       
-      return 8 + 8 + message.getEncodeSize();
+      return 8 + message.getEncodeSize();
    }
    
    // Package protected ---------------------------------------------

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java	2008-08-27 15:41:32 UTC (rev 4882)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java	2008-08-27 16:36:27 UTC (rev 4883)
@@ -27,7 +27,6 @@
 
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
-import org.jboss.messaging.core.paging.PageMessage;
 import org.jboss.messaging.core.paging.PagingStore;
 import org.jboss.messaging.core.paging.PagingStoreFactory;
 import org.jboss.messaging.util.SimpleString;

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-08-27 15:41:32 UTC (rev 4882)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-08-27 16:36:27 UTC (rev 4883)
@@ -555,7 +555,10 @@
                Page page = depage();
                if (page == null)
                {
-                  listener.clearLastRecord(lastRecord);
+                  if (lastRecord != null)
+                  {
+                     listener.clearLastRecord(lastRecord);
+                  }
                   lastRecord = null;
                   break;
                }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-08-27 15:41:32 UTC (rev 4882)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-08-27 16:36:27 UTC (rev 4883)
@@ -49,6 +49,7 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.paging.LastPageRecord;
 import org.jboss.messaging.core.paging.PageTransactionInfo;
+import org.jboss.messaging.core.paging.Pager;
 import org.jboss.messaging.core.paging.impl.LastPageRecordImpl;
 import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
 import org.jboss.messaging.core.persistence.StorageManager;
@@ -325,7 +326,9 @@
                
                pageTransactionInfo.setRecordID(record.id);
                
-               postOffice.getPager().addTransaction(pageTransactionInfo);
+               Pager pager = postOffice.getPager();
+               
+               pager.addTransaction(pageTransactionInfo);
 
                break;
 			   }
@@ -339,7 +342,9 @@
 			      
 			      recordImpl.decode(buff);
 			      
-			      postOffice.getPager().loadLastPage(recordImpl);
+               Pager pager = postOffice.getPager();
+               
+               pager.loadLastPage(recordImpl);
 			      
 			      break;
 			   }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-08-27 15:41:32 UTC (rev 4882)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-08-27 16:36:27 UTC (rev 4883)
@@ -74,6 +74,15 @@
    
    private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
    
+   private static final boolean isTrace = log.isTraceEnabled();
+   
+   // This is just a debug tool method.
+   // During debugs you could make log.trace as log.info, and change the variable isTrace above
+   private static void trace(String message)
+   {
+      log.trace(message);
+   }
+   
    //private final int nodeID;
    
    private final ConcurrentMap<SimpleString, List<Binding>> mappings = new ConcurrentHashMap<SimpleString, List<Binding>>();
@@ -462,7 +471,7 @@
 
       public void clearLastRecord(LastPageRecord lastRecord) throws Exception
       {
-         System.out.println("Clearing lastRecord information!!!!!!");
+         trace("Clearing lastRecord information " + lastRecord.getLastId());
          storageManager.storeDelete(lastRecord.getRecordId());
       }
       
@@ -507,24 +516,31 @@
             final long transactionIdDuringPaging = msg.getTransactionID();
             if (transactionIdDuringPaging > 0)
             {
-               final PageTransactionInfo pageInfo = transactions.get(transactionIdDuringPaging);
-               if (pageInfo == null)
+               final PageTransactionInfo pageTransactionInfo = transactions.get(transactionIdDuringPaging);
+               
+               // http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging
+               // This is the Step D described on the "Transactions on Paging" section
+               if (pageTransactionInfo == null)
                {
-                  // TODO make it .trace
-                  log.warn("Transaction " + msg.getTransactionID() + " not found, ignoring message " + msg.getMessage().getMessageID(), new Exception("trace"));
+                  if (isTrace)
+                  {
+                     trace("Transaction " + msg.getTransactionID() + " not found, ignoring message " + msg.getMessage().getMessageID());
+                  }
                   continue;
                }
                
                // This is to avoid a race condition where messages are depaged before the commit arrived
-               pageInfo.waitCompletion();
+               pageTransactionInfo.waitCompletion();
 
                /// Update information about transactions
                if (msg.getMessage().isDurable())
                {
-                  pageInfo.decrement();
-                  pageTransactionsToUpdate.add(pageInfo);
+                  pageTransactionInfo.decrement();
+                  pageTransactionsToUpdate.add(pageTransactionInfo);
                }
             }
+            
+            msg.getMessage().setMessageID(storageManager.generateMessageID());
 
             refsToAdd.addAll(PostOfficeImpl.this.route(msg.getMessage()));
             
@@ -538,7 +554,9 @@
          for (PageTransactionInfo pageWithTransaction: pageTransactionsToUpdate)
          {
             if (pageWithTransaction.getNumberOfMessages() == 0)
-            { // no more messages.. delete the PageWithTransactionInfo
+            { 
+               // http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging
+               // numberOfReads==numberOfWrites -> We delete the record
                storageManager.storeDeleteTransactional(depageTransactionID, pageWithTransaction.getRecordID());
                this.transactions.remove(pageWithTransaction.getTransactionID());
             }
@@ -560,7 +578,7 @@
       
       public void loadLastPage(LastPageRecord lastPage) throws Exception
       {
-         System.out.println("LastPage loaded was " + lastPage.getLastId());
+         System.out.println("LastPage loaded was " + lastPage.getLastId() + " recordID = " + lastPage.getRecordId());
          pagingManager.getPageStore(lastPage.getDestination()).setLastRecord(lastPage);
       }
 

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-08-27 15:41:32 UTC (rev 4882)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-08-27 16:36:27 UTC (rev 4883)
@@ -40,7 +40,6 @@
 import org.jboss.messaging.core.filter.impl.FilterImpl;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.paging.Pager;
-import org.jboss.messaging.core.paging.impl.PageMessageImpl;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.FlowController;
@@ -302,8 +301,6 @@
          }
          throw e;         
       }
-
-      msg.setMessageID(storageManager.generateMessageID());
       
       // This allows the no-local consumers to filter out the messages that come
       // from the same connection.
@@ -314,6 +311,10 @@
       {
          if (!pager.page(msg))
          {
+            // We only set the messageID after we are sure the message is not being paged
+            // Paged messages won't have an ID until they are depaged 
+            msg.setMessageID(storageManager.generateMessageID());
+
             List<MessageReference> refs = postOffice.route(msg);
    
             if (msg.getDurableRefCount() != 0)
@@ -1151,10 +1152,6 @@
       {
          pager.messageDone(message);
       }
-      else
-      {
-         System.out.println("Still " + message.getRefCount());
-      }
       
       if (message.isDurable() && queue.isDurable())
       {

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-08-27 15:41:32 UTC (rev 4882)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-08-27 16:36:27 UTC (rev 4883)
@@ -34,7 +34,7 @@
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.paging.Pager;
-import org.jboss.messaging.core.paging.PageTransactionInfo;
+import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.MessageReference;
@@ -43,8 +43,6 @@
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.Transaction;
-import org.jboss.messaging.core.paging.impl.PageMessageImpl;
-import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -257,14 +255,16 @@
          storageManager.commit(id);
       }
 
-      if (pageTransaction != null)
+      for (MessageReference ref : refsToAdd)
       {
-         pageTransaction.complete();
+         ref.getQueue().addLast(ref);
       }
 
-      for (MessageReference ref : refsToAdd)
+      // If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the transaction until all the messages were added to the queue
+      // or else we could deliver the messages out of order
+      if (pageTransaction != null)
       {
-         ref.getQueue().addLast(ref);
+         pageTransaction.complete();
       }
 
       for (MessageReference reference : acknowledgements)
@@ -413,6 +413,10 @@
 
    private void route(final ServerMessage message) throws Exception
    {
+      // We only set the messageID after we are sure the message is not being paged
+      // Paged messages won't have an ID until they are depaged 
+      message.setMessageID(storageManager.generateMessageID());
+
       List<MessageReference> refs = postOffice.route(message);
 
       refsToAdd.addAll(refs);
@@ -445,12 +449,13 @@
       for (ServerMessage message: pagedMessages)
       {
        
+         // http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging
+         // Explained under Transaction On Paging. (This is the item B)
          if (pager.page(message, id))
          {
-            // This could happen if the destination was in page mode when the message was added, and it was changed when effectively adding it
-            
             if (message.isDurable())
             {
+               // We only create pageTransactions if using persistent messages
                pageTransaction.increment();
                pagingPersistent = true;
                pagedDestinationsToSync.add(message.getDestination());




More information about the jboss-cvs-commits mailing list