[jboss-cvs] JBoss Messaging SVN: r5575 - in trunk/src/main/org/jboss/messaging/core: persistence/impl/journal and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Jan 3 08:02:58 EST 2009
Author: timfox
Date: 2009-01-03 08:02:57 -0500 (Sat, 03 Jan 2009)
New Revision: 5575
Modified:
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
trunk/src/main/org/jboss/messaging/core/transaction/TransactionPropertyIndexes.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
Log:
Paging, routing, tx refactoring part 5
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-03 12:01:08 UTC (rev 5574)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-03 13:02:57 UTC (rev 5575)
@@ -736,7 +736,7 @@
Transaction depageTransaction = new TransactionImpl(storageManager, postOffice);
- depageTransaction.setContainsPersistent(true);
+ depageTransaction.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE, Boolean.valueOf(true));
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-01-03 12:01:08 UTC (rev 5574)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-01-03 13:02:57 UTC (rev 5575)
@@ -78,6 +78,7 @@
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.ResourceManager;
import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
import org.jboss.messaging.util.IDGenerator;
import org.jboss.messaging.util.JBMThreadFactory;
@@ -912,7 +913,7 @@
pageTransactionInfo.markIncomplete();
- tx.setPageTransaction(pageTransactionInfo);
+ tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo);
pagingManager.addTransaction(pageTransactionInfo);
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java 2009-01-03 12:01:08 UTC (rev 5574)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java 2009-01-03 13:02:57 UTC (rev 5575)
@@ -31,6 +31,7 @@
import org.jboss.messaging.core.postoffice.DuplicateIDCache;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.TransactionOperation;
+import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
import org.jboss.messaging.util.ConcurrentHashSet;
import org.jboss.messaging.util.Pair;
import org.jboss.messaging.util.SimpleString;
@@ -180,7 +181,7 @@
{
storageManager.storeDuplicateIDTransactional(tx.getID(), address, duplID, recordID);
- tx.setContainsPersistent(true);
+ tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
}
// For a tx, it's important that the entry is not added to the cache until commit (or prepare)
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2009-01-03 12:01:08 UTC (rev 5574)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2009-01-03 13:02:57 UTC (rev 5575)
@@ -328,8 +328,8 @@
queue.getPersistenceID(),
message.getMessageID());
}
-
- tx.setContainsPersistent(true);
+
+ tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
}
}
@@ -353,7 +353,7 @@
{
if (message.isDurable() && queue.isDurable())
{
- tx.setContainsPersistent(true);
+ tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
}
tx.addOperation(new AcknowledgeOperation(storageManager, postOffice, queueSettingsRepository));
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-03 12:01:08 UTC (rev 5574)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-03 13:02:57 UTC (rev 5575)
@@ -303,8 +303,8 @@
tx.addOperation(new AddMessageOperation(ref, first));
if (durableRef)
- {
- tx.setContainsPersistent(true);
+ {
+ tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2009-01-03 12:01:08 UTC (rev 5574)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2009-01-03 13:02:57 UTC (rev 5575)
@@ -22,18 +22,11 @@
package org.jboss.messaging.core.transaction;
-import java.util.List;
-import java.util.Set;
-
import javax.transaction.xa.Xid;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.paging.PageTransactionInfo;
-import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.settings.HierarchicalRepository;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.util.SimpleString;
/**
* A JBoss Messaging internal transaction
@@ -51,8 +44,6 @@
void addPagingMessage(ServerMessage message);
- void setContainsPersistent(boolean containsPersistent);
-
int getOperationsCount();
long getID();
@@ -67,11 +58,9 @@
void setState(State state);
- boolean isContainsPersistent();
-
void markAsRollbackOnly(MessagingException messagingException);
- void setPageTransaction(PageTransactionInfo pageTransaction);
+ //void setPageTransaction(PageTransactionInfo pageTransaction);
long getCreateTime();
Modified: trunk/src/main/org/jboss/messaging/core/transaction/TransactionPropertyIndexes.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/TransactionPropertyIndexes.java 2009-01-03 12:01:08 UTC (rev 5574)
+++ trunk/src/main/org/jboss/messaging/core/transaction/TransactionPropertyIndexes.java 2009-01-03 13:02:57 UTC (rev 5575)
@@ -41,4 +41,8 @@
public static final int DESTINATIONS_IN_PAGE_MODE = 2;
public static final int IS_DEPAGE = 3;
+
+ public static final int CONTAINS_PERSISTENT = 4;
+
+ public static final int PAGE_TRANSACTION = 5;
}
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2009-01-03 12:01:08 UTC (rev 5574)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2009-01-03 13:02:57 UTC (rev 5575)
@@ -15,7 +15,6 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import javax.transaction.xa.Xid;
@@ -26,10 +25,10 @@
import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.TransactionOperation;
+import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
import org.jboss.messaging.util.SimpleString;
/**
@@ -59,7 +58,7 @@
// FIXME: As part of https://jira.jboss.org/jira/browse/JBMESSAGING-1313
private final List<ServerMessage> pagedMessages = new ArrayList<ServerMessage>();
- private volatile PageTransactionInfo pageTransaction;
+ //private volatile PageTransactionInfo pageTransaction;
private final Xid xid;
@@ -67,8 +66,6 @@
private volatile State state = State.ACTIVE;
- private volatile boolean containsPersistent;
-
private MessagingException messagingException;
private final Object timeoutLock = new Object();
@@ -157,15 +154,6 @@
return createTime;
}
- public void addAcknowledgement(final MessageReference acknowledgement) throws Exception
- {
- if (state != State.ACTIVE)
- {
- throw new IllegalStateException("Transaction is in invalid state " + state);
- }
-
- }
-
public void prepare() throws Exception
{
synchronized (timeoutLock)
@@ -254,7 +242,7 @@
pageMessages();
}
- if (containsPersistent || xid != null)
+ if ((getProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT) != null) || xid != null)
{
storageManager.commit(id);
}
@@ -262,6 +250,9 @@
// If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
// transaction until all the messages were added to the queue
// or else we could deliver the messages out of order
+
+ PageTransactionInfo pageTransaction = (PageTransactionInfo)getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+
if (pageTransaction != null)
{
pageTransaction.commit();
@@ -355,11 +346,6 @@
return xid;
}
- public boolean isContainsPersistent()
- {
- return containsPersistent;
- }
-
public void markAsRollbackOnly(final MessagingException messagingException)
{
state = State.ROLLBACK_ONLY;
@@ -367,11 +353,6 @@
this.messagingException = messagingException;
}
- public void setContainsPersistent(final boolean containsPersistent)
- {
- this.containsPersistent = containsPersistent;
- }
-
public void addOperation(final TransactionOperation operation)
{
checkCreateOperations();
@@ -386,10 +367,10 @@
operations.remove(operation);
}
- public void setPageTransaction(PageTransactionInfo pageTransaction)
- {
- this.pageTransaction = pageTransaction;
- }
+// public void setPageTransaction(PageTransactionInfo pageTransaction)
+// {
+// this.pageTransaction = pageTransaction;
+// }
public void addPagingMessage(final ServerMessage message)
{
@@ -425,10 +406,12 @@
private void doRollback() throws Exception
{
- if (containsPersistent || xid != null)
+ if ((getProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT) != null) || xid != null)
{
storageManager.rollback(id);
}
+
+ PageTransactionInfo pageTransaction = (PageTransactionInfo)getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
if (state == State.PREPARED && pageTransaction != null)
{
@@ -450,9 +433,14 @@
{
if (!pagedMessages.isEmpty())
{
+ PageTransactionInfo pageTransaction = (PageTransactionInfo)getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+
if (pageTransaction == null)
{
pageTransaction = new PageTransactionInfoImpl(id);
+
+ putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransaction);
+
// To avoid a race condition where depage happens before the transaction is completed, we need to inform the
// pager about this transaction is being processed
pagingManager.addTransaction(pageTransaction);
@@ -490,7 +478,8 @@
if (pagingPersistent)
{
- containsPersistent = true;
+ putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
+
if (!pagedDestinationsToSync.isEmpty())
{
pagingManager.sync(pagedDestinationsToSync);
More information about the jboss-cvs-commits
mailing list