[jboss-cvs] JBoss Messaging SVN: r5575 - in trunk/src/main/org/jboss/messaging/core: persistence/impl/journal and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Jan 3 08:02:58 EST 2009


Author: timfox
Date: 2009-01-03 08:02:57 -0500 (Sat, 03 Jan 2009)
New Revision: 5575

Modified:
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
   trunk/src/main/org/jboss/messaging/core/transaction/TransactionPropertyIndexes.java
   trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
Log:
Paging, routing, tx refactoring part 5


Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-03 12:01:08 UTC (rev 5574)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-03 13:02:57 UTC (rev 5575)
@@ -736,7 +736,7 @@
       
       Transaction depageTransaction = new TransactionImpl(storageManager, postOffice);
       
-      depageTransaction.setContainsPersistent(true);
+      depageTransaction.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
       
       depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE, Boolean.valueOf(true));
 

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-01-03 12:01:08 UTC (rev 5574)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-01-03 13:02:57 UTC (rev 5575)
@@ -78,6 +78,7 @@
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
 import org.jboss.messaging.util.IDGenerator;
 import org.jboss.messaging.util.JBMThreadFactory;
@@ -912,7 +913,7 @@
 
                   pageTransactionInfo.markIncomplete();
 
-                  tx.setPageTransaction(pageTransactionInfo);
+                  tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo);                  
                   
                   pagingManager.addTransaction(pageTransactionInfo);
 

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java	2009-01-03 12:01:08 UTC (rev 5574)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java	2009-01-03 13:02:57 UTC (rev 5575)
@@ -31,6 +31,7 @@
 import org.jboss.messaging.core.postoffice.DuplicateIDCache;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.TransactionOperation;
+import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
 import org.jboss.messaging.util.ConcurrentHashSet;
 import org.jboss.messaging.util.Pair;
 import org.jboss.messaging.util.SimpleString;
@@ -180,7 +181,7 @@
       {
          storageManager.storeDuplicateIDTransactional(tx.getID(), address, duplID, recordID);
 
-         tx.setContainsPersistent(true);
+         tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
       }
 
       // For a tx, it's important that the entry is not added to the cache until commit (or prepare)

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java	2009-01-03 12:01:08 UTC (rev 5574)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java	2009-01-03 13:02:57 UTC (rev 5575)
@@ -328,8 +328,8 @@
                                                             queue.getPersistenceID(),
                                                             message.getMessageID());
             }
-
-            tx.setContainsPersistent(true);
+            
+            tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
          }
       }
 
@@ -353,7 +353,7 @@
    {
       if (message.isDurable() && queue.isDurable())
       {
-         tx.setContainsPersistent(true);
+         tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
       }
 
       tx.addOperation(new AcknowledgeOperation(storageManager, postOffice, queueSettingsRepository));

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-03 12:01:08 UTC (rev 5574)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-03 13:02:57 UTC (rev 5575)
@@ -303,8 +303,8 @@
             tx.addOperation(new AddMessageOperation(ref, first));
 
             if (durableRef)
-            {
-               tx.setContainsPersistent(true);
+            {               
+               tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
             }
          }
       }

Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java	2009-01-03 12:01:08 UTC (rev 5574)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java	2009-01-03 13:02:57 UTC (rev 5575)
@@ -22,18 +22,11 @@
 
 package org.jboss.messaging.core.transaction;
 
-import java.util.List;
-import java.util.Set;
-
 import javax.transaction.xa.Xid;
 
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.paging.PageTransactionInfo;
-import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.settings.HierarchicalRepository;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.util.SimpleString;
 
 /**
  * A JBoss Messaging internal transaction
@@ -51,8 +44,6 @@
 
    void addPagingMessage(ServerMessage message);
 
-   void setContainsPersistent(boolean containsPersistent);
-
    int getOperationsCount();
 
    long getID();
@@ -67,11 +58,9 @@
    
    void setState(State state);
 
-   boolean isContainsPersistent();
-
    void markAsRollbackOnly(MessagingException messagingException);
 
-   void setPageTransaction(PageTransactionInfo pageTransaction);
+   //void setPageTransaction(PageTransactionInfo pageTransaction);
 
    long getCreateTime();
 

Modified: trunk/src/main/org/jboss/messaging/core/transaction/TransactionPropertyIndexes.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/TransactionPropertyIndexes.java	2009-01-03 12:01:08 UTC (rev 5574)
+++ trunk/src/main/org/jboss/messaging/core/transaction/TransactionPropertyIndexes.java	2009-01-03 13:02:57 UTC (rev 5575)
@@ -41,4 +41,8 @@
    public static final int DESTINATIONS_IN_PAGE_MODE = 2;
    
    public static final int IS_DEPAGE = 3;
+   
+   public static final int CONTAINS_PERSISTENT = 4;
+   
+   public static final int PAGE_TRANSACTION = 5;
 }

Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2009-01-03 12:01:08 UTC (rev 5574)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2009-01-03 13:02:57 UTC (rev 5575)
@@ -15,7 +15,6 @@
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
 import javax.transaction.xa.Xid;
 
@@ -26,10 +25,10 @@
 import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.TransactionOperation;
+import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -59,7 +58,7 @@
    // FIXME: As part of https://jira.jboss.org/jira/browse/JBMESSAGING-1313
    private final List<ServerMessage> pagedMessages = new ArrayList<ServerMessage>();
 
-   private volatile PageTransactionInfo pageTransaction;
+   //private volatile PageTransactionInfo pageTransaction;
 
    private final Xid xid;
 
@@ -67,8 +66,6 @@
 
    private volatile State state = State.ACTIVE;
 
-   private volatile boolean containsPersistent;
-
    private MessagingException messagingException;
 
    private final Object timeoutLock = new Object();
@@ -157,15 +154,6 @@
       return createTime;
    }
 
-   public void addAcknowledgement(final MessageReference acknowledgement) throws Exception
-   {
-      if (state != State.ACTIVE)
-      {
-         throw new IllegalStateException("Transaction is in invalid state " + state);
-      }
-
-   }
-
    public void prepare() throws Exception
    {
       synchronized (timeoutLock)
@@ -254,7 +242,7 @@
             pageMessages();
          }
 
-         if (containsPersistent || xid != null)
+         if ((getProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT) != null) || xid != null)
          {
             storageManager.commit(id);
          }
@@ -262,6 +250,9 @@
          // If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
          // transaction until all the messages were added to the queue
          // or else we could deliver the messages out of order
+         
+         PageTransactionInfo pageTransaction = (PageTransactionInfo)getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+         
          if (pageTransaction != null)
          {
             pageTransaction.commit();
@@ -355,11 +346,6 @@
       return xid;
    }
 
-   public boolean isContainsPersistent()
-   {
-      return containsPersistent;
-   }
-
    public void markAsRollbackOnly(final MessagingException messagingException)
    {
       state = State.ROLLBACK_ONLY;
@@ -367,11 +353,6 @@
       this.messagingException = messagingException;
    }
 
-   public void setContainsPersistent(final boolean containsPersistent)
-   {
-      this.containsPersistent = containsPersistent;
-   }
-
    public void addOperation(final TransactionOperation operation)
    {
       checkCreateOperations();
@@ -386,10 +367,10 @@
       operations.remove(operation);
    }
    
-   public void setPageTransaction(PageTransactionInfo pageTransaction)
-   {
-      this.pageTransaction = pageTransaction;      
-   }
+//   public void setPageTransaction(PageTransactionInfo pageTransaction)
+//   {
+//      this.pageTransaction = pageTransaction;      
+//   }
    
    public void addPagingMessage(final ServerMessage message)
    {
@@ -425,10 +406,12 @@
 
    private void doRollback() throws Exception
    {
-      if (containsPersistent || xid != null)
+      if ((getProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT) != null) || xid != null)
       {
          storageManager.rollback(id);
       }
+      
+      PageTransactionInfo pageTransaction = (PageTransactionInfo)getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
 
       if (state == State.PREPARED && pageTransaction != null)
       {
@@ -450,9 +433,14 @@
    {
       if (!pagedMessages.isEmpty())
       {
+         PageTransactionInfo pageTransaction = (PageTransactionInfo)getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+         
          if (pageTransaction == null)
          {
             pageTransaction = new PageTransactionInfoImpl(id);
+            
+            putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransaction);
+            
             // To avoid a race condition where depage happens before the transaction is completed, we need to inform the
             // pager about this transaction is being processed
             pagingManager.addTransaction(pageTransaction);
@@ -490,7 +478,8 @@
 
          if (pagingPersistent)
          {
-            containsPersistent = true;
+            putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
+            
             if (!pagedDestinationsToSync.isEmpty())
             {
                pagingManager.sync(pagedDestinationsToSync);




More information about the jboss-cvs-commits mailing list