[jboss-cvs] JBoss Messaging SVN: r4883 - in branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core: persistence/impl/journal and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Aug 27 12:36:28 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-08-27 12:36:27 -0400 (Wed, 27 Aug 2008)
New Revision: 4883
Modified:
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
Log:
Small tweaks.. docs... etc
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java 2008-08-27 15:41:32 UTC (rev 4882)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java 2008-08-27 16:36:27 UTC (rev 4883)
@@ -83,22 +83,19 @@
public void decode(final MessagingBuffer buffer)
{
transactionID = buffer.getLong();
- final long messageID = buffer.getLong();
message.decode(buffer);
- message.setMessageID(messageID);
}
public void encode(final MessagingBuffer buffer)
{
buffer.putLong(transactionID);
- buffer.putLong(message.getMessageID());
message.encode(buffer);
}
public int getEncodeSize()
{
- return 8 + 8 + message.getEncodeSize();
+ return 8 + message.getEncodeSize();
}
// Package protected ---------------------------------------------
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java 2008-08-27 15:41:32 UTC (rev 4882)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java 2008-08-27 16:36:27 UTC (rev 4883)
@@ -27,7 +27,6 @@
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
-import org.jboss.messaging.core.paging.PageMessage;
import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.paging.PagingStoreFactory;
import org.jboss.messaging.util.SimpleString;
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-08-27 15:41:32 UTC (rev 4882)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-08-27 16:36:27 UTC (rev 4883)
@@ -555,7 +555,10 @@
Page page = depage();
if (page == null)
{
- listener.clearLastRecord(lastRecord);
+ if (lastRecord != null)
+ {
+ listener.clearLastRecord(lastRecord);
+ }
lastRecord = null;
break;
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-08-27 15:41:32 UTC (rev 4882)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-08-27 16:36:27 UTC (rev 4883)
@@ -49,6 +49,7 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.paging.LastPageRecord;
import org.jboss.messaging.core.paging.PageTransactionInfo;
+import org.jboss.messaging.core.paging.Pager;
import org.jboss.messaging.core.paging.impl.LastPageRecordImpl;
import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
import org.jboss.messaging.core.persistence.StorageManager;
@@ -325,7 +326,9 @@
pageTransactionInfo.setRecordID(record.id);
- postOffice.getPager().addTransaction(pageTransactionInfo);
+ Pager pager = postOffice.getPager();
+
+ pager.addTransaction(pageTransactionInfo);
break;
}
@@ -339,7 +342,9 @@
recordImpl.decode(buff);
- postOffice.getPager().loadLastPage(recordImpl);
+ Pager pager = postOffice.getPager();
+
+ pager.loadLastPage(recordImpl);
break;
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-08-27 15:41:32 UTC (rev 4882)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-08-27 16:36:27 UTC (rev 4883)
@@ -74,6 +74,15 @@
private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
+ private static final boolean isTrace = log.isTraceEnabled();
+
+ // This is just a debug tool method.
+ // While debugging, you can change log.trace to log.info here and change the isTrace variable above
+ private static void trace(String message)
+ {
+ log.trace(message);
+ }
+
//private final int nodeID;
private final ConcurrentMap<SimpleString, List<Binding>> mappings = new ConcurrentHashMap<SimpleString, List<Binding>>();
@@ -462,7 +471,7 @@
public void clearLastRecord(LastPageRecord lastRecord) throws Exception
{
- System.out.println("Clearing lastRecord information!!!!!!");
+ trace("Clearing lastRecord information " + lastRecord.getLastId());
storageManager.storeDelete(lastRecord.getRecordId());
}
@@ -507,24 +516,31 @@
final long transactionIdDuringPaging = msg.getTransactionID();
if (transactionIdDuringPaging > 0)
{
- final PageTransactionInfo pageInfo = transactions.get(transactionIdDuringPaging);
- if (pageInfo == null)
+ final PageTransactionInfo pageTransactionInfo = transactions.get(transactionIdDuringPaging);
+
+ // http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging
+ // This is Step D described in the "Transactions on Paging" section
+ if (pageTransactionInfo == null)
{
- // TODO make it .trace
- log.warn("Transaction " + msg.getTransactionID() + " not found, ignoring message " + msg.getMessage().getMessageID(), new Exception("trace"));
+ if (isTrace)
+ {
+ trace("Transaction " + msg.getTransactionID() + " not found, ignoring message " + msg.getMessage().getMessageID());
+ }
continue;
}
// This is to avoid a race condition where messages are depaged before the commit arrived
- pageInfo.waitCompletion();
+ pageTransactionInfo.waitCompletion();
/// Update information about transactions
if (msg.getMessage().isDurable())
{
- pageInfo.decrement();
- pageTransactionsToUpdate.add(pageInfo);
+ pageTransactionInfo.decrement();
+ pageTransactionsToUpdate.add(pageTransactionInfo);
}
}
+
+ msg.getMessage().setMessageID(storageManager.generateMessageID());
refsToAdd.addAll(PostOfficeImpl.this.route(msg.getMessage()));
@@ -538,7 +554,9 @@
for (PageTransactionInfo pageWithTransaction: pageTransactionsToUpdate)
{
if (pageWithTransaction.getNumberOfMessages() == 0)
- { // no more messages.. delete the PageWithTransactionInfo
+ {
+ // http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging
+ // numberOfReads==numberOfWrites -> We delete the record
storageManager.storeDeleteTransactional(depageTransactionID, pageWithTransaction.getRecordID());
this.transactions.remove(pageWithTransaction.getTransactionID());
}
@@ -560,7 +578,7 @@
public void loadLastPage(LastPageRecord lastPage) throws Exception
{
- System.out.println("LastPage loaded was " + lastPage.getLastId());
+ System.out.println("LastPage loaded was " + lastPage.getLastId() + " recordID = " + lastPage.getRecordId());
pagingManager.getPageStore(lastPage.getDestination()).setLastRecord(lastPage);
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-08-27 15:41:32 UTC (rev 4882)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-08-27 16:36:27 UTC (rev 4883)
@@ -40,7 +40,6 @@
import org.jboss.messaging.core.filter.impl.FilterImpl;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.paging.Pager;
-import org.jboss.messaging.core.paging.impl.PageMessageImpl;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.FlowController;
@@ -302,8 +301,6 @@
}
throw e;
}
-
- msg.setMessageID(storageManager.generateMessageID());
// This allows the no-local consumers to filter out the messages that come
// from the same connection.
@@ -314,6 +311,10 @@
{
if (!pager.page(msg))
{
+ // We only set the messageID after we are sure the message is not being paged
+ // Paged messages won't have an ID until they are depaged
+ msg.setMessageID(storageManager.generateMessageID());
+
List<MessageReference> refs = postOffice.route(msg);
if (msg.getDurableRefCount() != 0)
@@ -1151,10 +1152,6 @@
{
pager.messageDone(message);
}
- else
- {
- System.out.println("Still " + message.getRefCount());
- }
if (message.isDurable() && queue.isDurable())
{
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-08-27 15:41:32 UTC (rev 4882)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-08-27 16:36:27 UTC (rev 4883)
@@ -34,7 +34,7 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.paging.Pager;
-import org.jboss.messaging.core.paging.PageTransactionInfo;
+import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.MessageReference;
@@ -43,8 +43,6 @@
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.Transaction;
-import org.jboss.messaging.core.paging.impl.PageMessageImpl;
-import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
import org.jboss.messaging.util.SimpleString;
/**
@@ -257,14 +255,16 @@
storageManager.commit(id);
}
- if (pageTransaction != null)
+ for (MessageReference ref : refsToAdd)
{
- pageTransaction.complete();
+ ref.getQueue().addLast(ref);
}
- for (MessageReference ref : refsToAdd)
+ // If part of the transaction goes to the queue and part goes to paging, we can't let depaging start for the transaction until all the messages have been added to the queue,
+ // or else we could deliver the messages out of order.
+ if (pageTransaction != null)
{
- ref.getQueue().addLast(ref);
+ pageTransaction.complete();
}
for (MessageReference reference : acknowledgements)
@@ -413,6 +413,10 @@
private void route(final ServerMessage message) throws Exception
{
+ // We only set the messageID after we are sure the message is not being paged
+ // Paged messages won't have an ID until they are depaged
+ message.setMessageID(storageManager.generateMessageID());
+
List<MessageReference> refs = postOffice.route(message);
refsToAdd.addAll(refs);
@@ -445,12 +449,13 @@
for (ServerMessage message: pagedMessages)
{
+ // http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging
+ // Explained in the "Transactions on Paging" section (this is item B).
if (pager.page(message, id))
{
- // This could happen if the destination was in page mode when the message was added, and it was changed when effectively adding it
-
if (message.isDurable())
{
+ // We only create pageTransactions if using persistent messages
pageTransaction.increment();
pagingPersistent = true;
pagedDestinationsToSync.add(message.getDestination());
More information about the jboss-cvs-commits
mailing list