[jboss-cvs] JBoss Messaging SVN: r6309 - trunk/src/main/org/jboss/messaging/core/paging/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Apr 3 17:10:42 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-04-03 17:10:42 -0400 (Fri, 03 Apr 2009)
New Revision: 6309
Modified:
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
Log:
No code changes on this commit.. just auto-format and removing some old comments
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-04-03 16:43:27 UTC (rev 6308)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-04-03 21:10:42 UTC (rev 6309)
@@ -512,7 +512,7 @@
if (!ok)
{
- log.warn("Timed out waiting for depage executor on destination " + this.storeName + " to stop");
+ log.warn("Timed out waiting for depage executor on destination " + storeName + " to stop");
}
if (currentPage != null)
@@ -770,7 +770,7 @@
if (fileFactory == null)
{
- fileFactory = storeFactory.newFileFactory(this.getStoreName());
+ fileFactory = storeFactory.newFileFactory(getStoreName());
}
SequentialFile file = fileFactory.createSequentialFile(fileName, 1000);
@@ -817,114 +817,103 @@
Transaction depageTransaction = new TransactionImpl(storageManager);
-// SendLock sendLock = postOffice.getAddressLock(destination);
-//
-// sendLock.beforeSend();
-//
-// try
-// {
- depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE, Boolean.valueOf(true));
+ depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE, Boolean.valueOf(true));
- HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
+ HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
- for (PagedMessage pagedMessage : pagedMessages)
- {
- ServerMessage message = null;
+ for (PagedMessage pagedMessage : pagedMessages)
+ {
+ ServerMessage message = null;
- message = pagedMessage.getMessage(storageManager);
+ message = pagedMessage.getMessage(storageManager);
- final long transactionIdDuringPaging = pagedMessage.getTransactionID();
+ final long transactionIdDuringPaging = pagedMessage.getTransactionID();
- if (transactionIdDuringPaging >= 0)
+ if (transactionIdDuringPaging >= 0)
+ {
+ final PageTransactionInfo pageTransactionInfo = pagingManager.getTransaction(transactionIdDuringPaging);
+
+ // http://wiki.jboss.org/wiki/JBossMessaging2Paging
+ // This is the Step D described on the "Transactions on Paging"
+ // section
+ if (pageTransactionInfo == null)
{
- final PageTransactionInfo pageTransactionInfo = pagingManager.getTransaction(transactionIdDuringPaging);
+ log.warn("Transaction " + pagedMessage.getTransactionID() +
+ " used during paging not found, ignoring message " +
+ message);
- // http://wiki.jboss.org/wiki/JBossMessaging2Paging
- // This is the Step D described on the "Transactions on Paging"
- // section
- if (pageTransactionInfo == null)
- {
- log.warn("Transaction " + pagedMessage.getTransactionID() +
- " used during paging not found, ignoring message " +
- message);
+ continue;
+ }
- continue;
- }
+ // This is to avoid a race condition where messages are depaged
+ // before the commit arrived
- // This is to avoid a race condition where messages are depaged
- // before the commit arrived
-
- while (running && !pageTransactionInfo.waitCompletion(500))
+ while (running && !pageTransactionInfo.waitCompletion(500))
+ {
+ // This is just to give us a chance to interrupt the process..
+ // if we start a shutdown in the middle of transactions, the commit/rollback may never come, delaying
+ // the shutdown of the server
+ if (isTrace)
{
- // This is just to give us a chance to interrupt the process..
- // if we start a shutdown in the middle of transactions, the commit/rollback may never come, delaying
- // the shutdown of the server
- if (isTrace)
- {
- trace("Waiting pageTransaction to complete");
- }
+ trace("Waiting pageTransaction to complete");
}
+ }
- if (!running)
- {
- break;
- }
+ if (!running)
+ {
+ break;
+ }
- if (!pageTransactionInfo.isCommit())
+ if (!pageTransactionInfo.isCommit())
+ {
+ if (isTrace)
{
- if (isTrace)
- {
- trace("Rollback was called after prepare, ignoring message " + message);
- }
- continue;
+ trace("Rollback was called after prepare, ignoring message " + message);
}
-
- // Update information about transactions
- if (message.isDurable())
- {
- pageTransactionInfo.decrement();
- pageTransactionsToUpdate.add(pageTransactionInfo);
- }
+ continue;
}
- postOffice.route(message, depageTransaction);
+ // Update information about transactions
+ if (message.isDurable())
+ {
+ pageTransactionInfo.decrement();
+ pageTransactionsToUpdate.add(pageTransactionInfo);
+ }
}
- if (!running)
+ postOffice.route(message, depageTransaction);
+ }
+
+ if (!running)
+ {
+ depageTransaction.rollback();
+ return false;
+ }
+
+ for (PageTransactionInfo pageWithTransaction : pageTransactionsToUpdate)
+ {
+ // This will set the journal transaction to commit;
+ depageTransaction.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
+
+ if (pageWithTransaction.getNumberOfMessages() == 0)
{
- depageTransaction.rollback();
- return false;
+ // http://wiki.jboss.org/wiki/JBossMessaging2Paging
+ // numberOfReads==numberOfWrites -> We delete the record
+ storageManager.deletePageTransactional(depageTransaction.getID(), pageWithTransaction.getRecordID());
+ pagingManager.removeTransaction(pageWithTransaction.getTransactionID());
}
-
- for (PageTransactionInfo pageWithTransaction : pageTransactionsToUpdate)
+ else
{
- // This will set the journal transaction to commit;
- depageTransaction.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
-
- if (pageWithTransaction.getNumberOfMessages() == 0)
- {
- // http://wiki.jboss.org/wiki/JBossMessaging2Paging
- // numberOfReads==numberOfWrites -> We delete the record
- storageManager.deletePageTransactional(depageTransaction.getID(), pageWithTransaction.getRecordID());
- pagingManager.removeTransaction(pageWithTransaction.getTransactionID());
- }
- else
- {
- storageManager.storePageTransaction(depageTransaction.getID(), pageWithTransaction);
- }
+ storageManager.storePageTransaction(depageTransaction.getID(), pageWithTransaction);
}
+ }
- depageTransaction.commit();
+ depageTransaction.commit();
- if (isTrace)
- {
- trace("Depage committed, running = " + running);
- }
-// }
-// finally
-// {
-// sendLock.afterSend();
-// }
+ if (isTrace)
+ {
+ trace("Depage committed, running = " + running);
+ }
return true;
}
@@ -1027,7 +1016,9 @@
// To be used on isDropMessagesWhenFull
private boolean isDrop()
{
- return (getMaxSizeBytes() > 0 && getAddressSize() > getMaxSizeBytes()) || (pagingManager.getMaxGlobalSize() > 0 && pagingManager.getGlobalSize() > pagingManager.getMaxGlobalSize());
+ return getMaxSizeBytes() > 0 && getAddressSize() > getMaxSizeBytes() ||
+ pagingManager.getMaxGlobalSize() > 0 &&
+ pagingManager.getGlobalSize() > pagingManager.getMaxGlobalSize();
}
// Inner classes -------------------------------------------------
More information about the jboss-cvs-commits
mailing list