[jboss-cvs] JBoss Messaging SVN: r5576 - in trunk: src/main/org/jboss/messaging/core/persistence/impl/journal and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Jan 3 10:04:37 EST 2009
Author: timfox
Date: 2009-01-03 10:04:37 -0500 (Sat, 03 Jan 2009)
New Revision: 5576
Modified:
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
trunk/src/main/org/jboss/messaging/core/transaction/TransactionPropertyIndexes.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java
Log:
transaction, routing, paging refactoring part 6
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-03 13:02:57 UTC (rev 5575)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-03 15:04:37 UTC (rev 5576)
@@ -734,7 +734,7 @@
// Depage has to be done atomically, in case of failure it should be
// back to where it was
- Transaction depageTransaction = new TransactionImpl(storageManager, postOffice);
+ Transaction depageTransaction = new TransactionImpl(storageManager);
depageTransaction.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-01-03 13:02:57 UTC (rev 5575)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-01-03 15:04:37 UTC (rev 5576)
@@ -850,7 +850,7 @@
Xid xid = encodingXid.xid;
- Transaction tx = new TransactionImpl(preparedTransaction.id, xid, this, postOffice);
+ Transaction tx = new TransactionImpl(preparedTransaction.id, xid, this);
List<MessageReference> referencesToAck = new ArrayList<MessageReference>();
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java 2009-01-03 13:02:57 UTC (rev 5575)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java 2009-01-03 15:04:37 UTC (rev 5576)
@@ -391,7 +391,7 @@
private void createTx()
{
- tx = new TransactionImpl(storageManager, postOffice);
+ tx = new TransactionImpl(storageManager);
}
// Inner classes -------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2009-01-03 13:02:57 UTC (rev 5575)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2009-01-03 15:04:37 UTC (rev 5576)
@@ -208,7 +208,7 @@
log.warn("Message has exceeded max delivery attempts. No Dead Letter Address configured for queue " + queue.getName() +
" so dropping it");
- Transaction tx = new TransactionImpl(storageManager, postOffice);
+ Transaction tx = new TransactionImpl(storageManager);
acknowledge(tx, storageManager, postOffice, queueSettingsRepository);
@@ -239,7 +239,7 @@
{
log.warn("Message has expired. No expiry queue configured for queue " + queue.getName() + " so dropping it");
- Transaction tx = new TransactionImpl(storageManager, postOffice);
+ Transaction tx = new TransactionImpl(storageManager);
acknowledge(tx, storageManager, postOffice, queueSettingsRepository);
@@ -380,7 +380,7 @@
final HierarchicalRepository<QueueSettings> queueSettingsRepository,
final boolean expiry) throws Exception
{
- Transaction tx = new TransactionImpl(storageManager, postOffice);
+ Transaction tx = new TransactionImpl(storageManager);
// FIXME: JBMESSAGING-1468
ServerMessage copyMessage = makeCopy(expiry, storageManager);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-03 13:02:57 UTC (rev 5575)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-03 15:04:37 UTC (rev 5576)
@@ -29,8 +29,10 @@
import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.core.paging.PageTransactionInfo;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.DuplicateIDCache;
import org.jboss.messaging.core.postoffice.PostOffice;
@@ -186,7 +188,7 @@
{
//We need to store the duplicate id atomically with the message storage, so we need to create a tx for this
- tx = new TransactionImpl(storageManager, postOffice);
+ tx = new TransactionImpl(storageManager);
startedTx = true;
}
@@ -259,8 +261,19 @@
if (!depage && !message.isReload() && (pagingAddresses.contains(destination) || pagingManager.isPaging(destination)))
{
pagingAddresses.add(destination);
-
- tx.addPagingMessage(message);
+
+ List<ServerMessage> messages = (List<ServerMessage>)tx.getProperty(TransactionPropertyIndexes.PAGED_MESSAGES);
+
+ if (messages == null)
+ {
+ messages = new ArrayList<ServerMessage>();
+
+ tx.putProperty(TransactionPropertyIndexes.PAGED_MESSAGES, messages);
+
+ tx.addOperation(new PageMessageOperation());
+ }
+
+ messages.add(message);
}
else
{
@@ -314,57 +327,8 @@
tx.commit();
}
}
-
- private class AddMessageOperation implements TransactionOperation
- {
- private final MessageReference ref;
-
- private final boolean first;
-
- AddMessageOperation(final MessageReference ref, final boolean first)
- {
- this.ref = ref;
-
- this.first = first;
- }
-
- public void afterCommit(final Transaction tx) throws Exception
- {
- addLast(ref);
- }
-
- public void afterPrepare(final Transaction tx) throws Exception
- {
- }
-
- public void afterRollback(final Transaction tx) throws Exception
- {
- }
-
- public void beforeCommit(final Transaction tx) throws Exception
- {
- }
-
- public void beforePrepare(final Transaction tx) throws Exception
- {
- }
-
- public void beforeRollback(final Transaction tx) throws Exception
- {
- ServerMessage msg = ref.getMessage();
-
- PagingStore store = pagingManager.getPageStore(msg.getDestination());
-
- store.addSize(-ref.getMemoryEstimate());
-
- if (first)
- {
- store.addSize(-msg.getMemoryEstimate());
- }
- }
-
- }
-
+
+
// Queue implementation ----------------------------------------------------------------------------------------
public boolean isClustered()
@@ -626,7 +590,7 @@
{
int count = 0;
- Transaction tx = new TransactionImpl(storageManager, postOffice);
+ Transaction tx = new TransactionImpl(storageManager);
Iterator<MessageReference> iter = messageReferences.iterator();
@@ -665,7 +629,7 @@
{
boolean deleted = false;
- Transaction tx = new TransactionImpl(storageManager, postOffice);
+ Transaction tx = new TransactionImpl(storageManager);
Iterator<MessageReference> iter = messageReferences.iterator();
@@ -713,7 +677,7 @@
final PostOffice postOffice,
final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
{
- Transaction tx = new TransactionImpl(storageManager, postOffice);
+ Transaction tx = new TransactionImpl(storageManager);
int count = 0;
Iterator<MessageReference> iter = messageReferences.iterator();
@@ -800,7 +764,7 @@
final PostOffice postOffice,
final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
{
- Transaction tx = new TransactionImpl(storageManager, postOffice);
+ Transaction tx = new TransactionImpl(storageManager);
int count = 0;
Iterator<MessageReference> iter = messageReferences.iterator();
@@ -1160,5 +1124,154 @@
}
}
}
+
+ //TODO - this can be further optimised to have one PageMessageOperation per message, NOT one which uses a shared list
+ private class PageMessageOperation implements TransactionOperation
+ {
+ public void afterCommit(final Transaction tx) throws Exception
+ {
+ }
+ public void afterPrepare(final Transaction tx) throws Exception
+ {
+ }
+
+ public void afterRollback(final Transaction tx) throws Exception
+ {
+ }
+
+ public void beforeCommit(final Transaction tx) throws Exception
+ {
+ if (tx.getState() != Transaction.State.PREPARED)
+ {
+ pageMessages(tx);
+ }
+ }
+
+ public void beforePrepare(final Transaction tx) throws Exception
+ {
+ pageMessages(tx);
+ }
+
+ public void beforeRollback(final Transaction tx) throws Exception
+ {
+ }
+
+ private void pageMessages(final Transaction tx) throws Exception
+ {
+ List<ServerMessage> messages = (List<ServerMessage>)tx.getProperty(TransactionPropertyIndexes.PAGED_MESSAGES);
+
+ if (messages != null && !messages.isEmpty())
+ {
+ PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+
+ if (pageTransaction == null)
+ {
+ pageTransaction = new PageTransactionInfoImpl(tx.getID());
+
+ tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransaction);
+
+ // To avoid a race condition where depage happens before the transaction is completed, we need to inform the
+ // pager about this transaction is being processed
+ pagingManager.addTransaction(pageTransaction);
+ }
+
+ boolean pagingPersistent = false;
+
+ HashSet<SimpleString> pagedDestinationsToSync = new HashSet<SimpleString>();
+
+ // We only need to add the dupl id header once per transaction
+ boolean first = true;
+ for (ServerMessage message : messages)
+ {
+ // http://wiki.jboss.org/wiki/JBossMessaging2Paging
+ // Explained under Transaction On Paging. (This is the item B)
+ if (pagingManager.page(message, tx.getID(), first))
+ {
+ if (message.isDurable())
+ {
+ // We only create pageTransactions if using persistent messages
+ pageTransaction.increment();
+ pagingPersistent = true;
+ pagedDestinationsToSync.add(message.getDestination());
+ }
+ }
+ else
+ {
+ // This could happen when the PageStore left the pageState
+
+ //TODO is this correct - don't we lose transactionality here???
+ postOffice.route(message, null);
+ }
+ first = false;
+ }
+
+ if (pagingPersistent)
+ {
+ tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
+
+ if (!pagedDestinationsToSync.isEmpty())
+ {
+ pagingManager.sync(pagedDestinationsToSync);
+ storageManager.storePageTransaction(tx.getID(), pageTransaction);
+ }
+ }
+
+ messages.clear();
+ }
+ }
+
+ }
+
+ private class AddMessageOperation implements TransactionOperation
+ {
+ private final MessageReference ref;
+
+ private final boolean first;
+
+ AddMessageOperation(final MessageReference ref, final boolean first)
+ {
+ this.ref = ref;
+
+ this.first = first;
+ }
+
+ public void afterCommit(final Transaction tx) throws Exception
+ {
+ addLast(ref);
+ }
+
+ public void afterPrepare(final Transaction tx) throws Exception
+ {
+ }
+
+ public void afterRollback(final Transaction tx) throws Exception
+ {
+ }
+
+ public void beforeCommit(final Transaction tx) throws Exception
+ {
+ }
+
+ public void beforePrepare(final Transaction tx) throws Exception
+ {
+ }
+
+ public void beforeRollback(final Transaction tx) throws Exception
+ {
+ ServerMessage msg = ref.getMessage();
+
+ PagingStore store = pagingManager.getPageStore(msg.getDestination());
+
+ store.addSize(-ref.getMemoryEstimate());
+
+ if (first)
+ {
+ store.addSize(-msg.getMemoryEstimate());
+ }
+ }
+
+ }
+
+
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-03 13:02:57 UTC (rev 5575)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-03 15:04:37 UTC (rev 5576)
@@ -249,7 +249,7 @@
closed = true;
- Transaction tx = new TransactionImpl(storageManager, postOffice);
+ Transaction tx = new TransactionImpl(storageManager);
while (iter.hasNext())
{
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-03 13:02:57 UTC (rev 5575)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-03 15:04:37 UTC (rev 5576)
@@ -218,7 +218,7 @@
if (!xa)
{
- tx = new TransactionImpl(storageManager, postOffice);
+ tx = new TransactionImpl(storageManager);
}
this.channel = channel;
@@ -1649,7 +1649,7 @@
}
finally
{
- tx = new TransactionImpl(storageManager, postOffice);
+ tx = new TransactionImpl(storageManager);
}
channel.confirm(packet);
@@ -2555,12 +2555,12 @@
{
// Might be null if XA
- tx = new TransactionImpl(storageManager, postOffice);
+ tx = new TransactionImpl(storageManager);
}
doRollback(tx);
- tx = new TransactionImpl(storageManager, postOffice);
+ tx = new TransactionImpl(storageManager);
}
private void send(final ServerMessage msg) throws Exception
Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2009-01-03 13:02:57 UTC (rev 5575)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2009-01-03 15:04:37 UTC (rev 5576)
@@ -25,8 +25,6 @@
import javax.transaction.xa.Xid;
import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.paging.PageTransactionInfo;
-import org.jboss.messaging.core.server.ServerMessage;
/**
* A JBoss Messaging internal transaction
@@ -42,8 +40,6 @@
void rollback() throws Exception;
- void addPagingMessage(ServerMessage message);
-
int getOperationsCount();
long getID();
@@ -60,8 +56,6 @@
void markAsRollbackOnly(MessagingException messagingException);
- //void setPageTransaction(PageTransactionInfo pageTransaction);
-
long getCreateTime();
void addOperation(TransactionOperation sync);
Modified: trunk/src/main/org/jboss/messaging/core/transaction/TransactionPropertyIndexes.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/TransactionPropertyIndexes.java 2009-01-03 13:02:57 UTC (rev 5575)
+++ trunk/src/main/org/jboss/messaging/core/transaction/TransactionPropertyIndexes.java 2009-01-03 15:04:37 UTC (rev 5576)
@@ -45,4 +45,6 @@
public static final int CONTAINS_PERSISTENT = 4;
public static final int PAGE_TRANSACTION = 5;
+
+ public static final int PAGED_MESSAGES = 6;
}
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2009-01-03 13:02:57 UTC (rev 5575)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2009-01-03 15:04:37 UTC (rev 5576)
@@ -13,7 +13,6 @@
package org.jboss.messaging.core.transaction.impl;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import javax.transaction.xa.Xid;
@@ -21,15 +20,11 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.paging.PageTransactionInfo;
-import org.jboss.messaging.core.paging.PagingManager;
-import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.TransactionOperation;
import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
-import org.jboss.messaging.util.SimpleString;
/**
* A TransactionImpl
@@ -51,15 +46,6 @@
private final StorageManager storageManager;
- private final PostOffice postOffice;
-
- private final PagingManager pagingManager;
-
- // FIXME: As part of https://jira.jboss.org/jira/browse/JBMESSAGING-1313
- private final List<ServerMessage> pagedMessages = new ArrayList<ServerMessage>();
-
- //private volatile PageTransactionInfo pageTransaction;
-
private final Xid xid;
private final long id;
@@ -72,24 +58,10 @@
private final long createTime;
- //For a transaction used for depaging, we never want to immediately page the refs again
- //private final boolean depage;
-
- public TransactionImpl(final StorageManager storageManager, final PostOffice postOffice)
+ public TransactionImpl(final StorageManager storageManager)
{
this.storageManager = storageManager;
- this.postOffice = postOffice;
-
- if (postOffice == null)
- {
- pagingManager = null;
- }
- else
- {
- pagingManager = postOffice.getPagingManager();
- }
-
xid = null;
id = storageManager.generateUniqueID();
@@ -101,17 +73,6 @@
{
this.storageManager = storageManager;
- this.postOffice = postOffice;
-
- if (postOffice == null)
- {
- pagingManager = null;
- }
- else
- {
- pagingManager = postOffice.getPagingManager();
- }
-
this.xid = xid;
id = storageManager.generateUniqueID();
@@ -119,25 +80,14 @@
createTime = System.currentTimeMillis();
}
- public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, final PostOffice postOffice)
+ public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager)
{
this.storageManager = storageManager;
- this.postOffice = postOffice;
-
this.xid = xid;
this.id = id;
- if (postOffice == null)
- {
- pagingManager = null;
- }
- else
- {
- pagingManager = postOffice.getPagingManager();
- }
-
createTime = System.currentTimeMillis();
}
@@ -181,8 +131,6 @@
}
}
- pageMessages();
-
storageManager.prepare(id, xid);
state = State.PREPARED;
@@ -237,11 +185,6 @@
}
}
- if (state != State.PREPARED)
- {
- pageMessages();
- }
-
if ((getProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT) != null) || xid != null)
{
storageManager.commit(id);
@@ -258,8 +201,6 @@
pageTransaction.commit();
}
- clear();
-
state = State.COMMITTED;
if (operations != null)
@@ -367,16 +308,6 @@
operations.remove(operation);
}
-// public void setPageTransaction(PageTransactionInfo pageTransaction)
-// {
-// this.pageTransaction = pageTransaction;
-// }
-
- public void addPagingMessage(final ServerMessage message)
- {
- this.pagedMessages.add(message);
- }
-
public int getOperationsCount()
{
return operations.size();
@@ -417,8 +348,6 @@
{
pageTransaction.rollback();
}
-
- clear();
}
private void checkCreateOperations()
@@ -429,68 +358,4 @@
}
}
- private void pageMessages() throws Exception
- {
- if (!pagedMessages.isEmpty())
- {
- PageTransactionInfo pageTransaction = (PageTransactionInfo)getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
-
- if (pageTransaction == null)
- {
- pageTransaction = new PageTransactionInfoImpl(id);
-
- putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransaction);
-
- // To avoid a race condition where depage happens before the transaction is completed, we need to inform the
- // pager about this transaction is being processed
- pagingManager.addTransaction(pageTransaction);
- }
-
- boolean pagingPersistent = false;
-
- HashSet<SimpleString> pagedDestinationsToSync = new HashSet<SimpleString>();
-
- // We only need to add the dupl id header once per transaction
- boolean first = true;
- for (ServerMessage message : pagedMessages)
- {
- // http://wiki.jboss.org/wiki/JBossMessaging2Paging
- // Explained under Transaction On Paging. (This is the item B)
- if (pagingManager.page(message, id, first))
- {
- if (message.isDurable())
- {
- // We only create pageTransactions if using persistent messages
- pageTransaction.increment();
- pagingPersistent = true;
- pagedDestinationsToSync.add(message.getDestination());
- }
- }
- else
- {
- // This could happen when the PageStore left the pageState
-
- //TODO is this correct - don't we lose transactionality here???
- postOffice.route(message, null);
- }
- first = false;
- }
-
- if (pagingPersistent)
- {
- putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
-
- if (!pagedDestinationsToSync.isEmpty())
- {
- pagingManager.sync(pagedDestinationsToSync);
- storageManager.storePageTransaction(id, pageTransaction);
- }
- }
- }
- }
-
- private void clear()
- {
- pagedMessages.clear();
- }
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java 2009-01-03 13:02:57 UTC (rev 5575)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java 2009-01-03 15:04:37 UTC (rev 5576)
@@ -669,7 +669,7 @@
EasyMock.replay(sm, po);
- Transaction tx = new TransactionImpl(sm, po);
+ Transaction tx = new TransactionImpl(sm);
EasyMock.verify(sm, po);
More information about the jboss-cvs-commits mailing list