[jboss-cvs] JBoss Messaging SVN: r5661 - in trunk/src/main/org/jboss/messaging/core: persistence/impl/journal and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jan 19 15:15:40 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-01-19 15:15:40 -0500 (Mon, 19 Jan 2009)
New Revision: 5661
Modified:
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
Log:
Fix on XATest & Paging
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2009-01-19 17:23:09 UTC (rev 5660)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2009-01-19 20:15:40 UTC (rev 5661)
@@ -247,6 +247,8 @@
pagingStoreFactory.setPagingManager(this);
pagingStoreFactory.setStorageManager(storageManager);
+
+ reloadStores();
started = true;
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-19 17:23:09 UTC (rev 5660)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-19 20:15:40 UTC (rev 5661)
@@ -51,6 +51,7 @@
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
+import org.jboss.messaging.util.Future;
import org.jboss.messaging.util.SimpleString;
/**
@@ -125,10 +126,7 @@
// variable isTrace above
private static void trace(final String message)
{
- if (isTrace)
- {
- log.trace(message);
- }
+ log.trace(message);
}
// Constructors --------------------------------------------------
@@ -497,25 +495,23 @@
{
if (running)
{
- writeLock.lock();
- currentPageLock.writeLock().lock();
+ running = false;
- try
- {
- running = false;
+ Future future = new Future();
- // FIXME -!! using a volatile to control execution is ugly.
- // The runnable could remain running after paging store is stopped
+ executor.execute(future);
- if (currentPage != null)
- {
- currentPage.close();
- }
+ boolean ok = future.await(10000);
+
+ if (!ok)
+ {
+ log.warn("Timed out waiting for depage executor to stop");
}
- finally
+
+ if (currentPage != null)
{
- writeLock.unlock();
- currentPageLock.writeLock().unlock();
+ currentPage.close();
+ currentPage = null;
}
}
}
@@ -657,6 +653,11 @@
try
{
+ if (!running)
+ {
+ return null;
+ }
+
if (numberOfPages == 0)
{
return null;
@@ -791,7 +792,7 @@
message = pagedMessage.getMessage(storageManager);
final long transactionIdDuringPaging = pagedMessage.getTransactionID();
-
+
if (transactionIdDuringPaging >= 0)
{
final PageTransactionInfo pageTransactionInfo = pagingManager.getTransaction(transactionIdDuringPaging);
@@ -804,7 +805,7 @@
log.warn("Transaction " + pagedMessage.getTransactionID() +
" used during paging not found, ignoring message " +
message);
-
+
continue;
}
@@ -847,7 +848,10 @@
depageTransaction.commit();
- trace("Depage committed");
+ if (isTrace)
+ {
+ trace("Depage committed, running = " + running);
+ }
}
/**
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-01-19 17:23:09 UTC (rev 5660)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-01-19 20:15:40 UTC (rev 5661)
@@ -30,6 +30,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -75,7 +76,9 @@
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.ResourceManager;
import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.core.transaction.TransactionOperation;
import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
+import org.jboss.messaging.core.transaction.Transaction.State;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
import org.jboss.messaging.util.IDGenerator;
import org.jboss.messaging.util.JBMThreadFactory;
@@ -770,6 +773,8 @@
tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo);
pagingManager.addTransaction(pageTransactionInfo);
+
+ tx.addOperation(new FinishPageMessageOperation());
break;
}
@@ -1427,5 +1432,55 @@
return SimpleString.sizeofString(address) + SimpleString.sizeofString(duplID);
}
}
+
+ private class FinishPageMessageOperation implements TransactionOperation
+ {
+ public void afterCommit(final Transaction tx) throws Exception
+ {
+ // If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
+ // transaction until all the messages were added to the queue
+ // or else we could deliver the messages out of order
+
+ PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+
+ if (pageTransaction != null)
+ {
+ pageTransaction.commit();
+ }
+ }
+
+ public void afterPrepare(final Transaction tx) throws Exception
+ {
+ }
+
+ public void afterRollback(final Transaction tx) throws Exception
+ {
+ PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+
+ if (tx.getState() == State.PREPARED && pageTransaction != null)
+ {
+ pageTransaction.rollback();
+ }
+ }
+
+ public void beforeCommit(final Transaction tx) throws Exception
+ {
+ }
+
+ public void beforePrepare(final Transaction tx) throws Exception
+ {
+ }
+
+ public void beforeRollback(final Transaction tx) throws Exception
+ {
+ }
+
+ private void pageMessages(final Transaction tx) throws Exception
+ {
+ }
+
+ }
+
+
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-01-19 17:23:09 UTC (rev 5660)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-01-19 20:15:40 UTC (rev 5661)
@@ -153,8 +153,6 @@
if (pagingManager != null)
{
pagingManager.setPostOffice(this);
-
- pagingManager.start();
}
// Injecting the postoffice (itself) on queueFactory for paging-control
@@ -181,7 +179,6 @@
messageExpiryExecutor.shutdown();
}
- pagingManager.stop();
addressManager.clear();
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-01-19 17:23:09 UTC (rev 5660)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-01-19 20:15:40 UTC (rev 5661)
@@ -234,8 +234,6 @@
pagingManager = createPagingManager();
- pagingManager.start();
-
resourceManager = new ResourceManagerImpl((int)configuration.getTransactionTimeout() / 1000,
configuration.getTransactionTimeoutScanPeriod());
postOffice = new PostOfficeImpl(storageManager,
@@ -256,6 +254,8 @@
securityStore.setSecurityManager(securityManager);
postOffice.start();
+
+ pagingManager.start();
List<QueueBindingInfo> queueBindingInfos = new ArrayList<QueueBindingInfo>();
List<SimpleString> destinations = new ArrayList<SimpleString>();
@@ -295,8 +295,6 @@
Map<SimpleString, List<Pair<SimpleString, Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<SimpleString, Long>>>();
-
- pagingManager.reloadStores();
storageManager.loadMessageJournal(postOffice,
storageManager,
@@ -422,6 +420,8 @@
// Ignore
}
+ pagingManager.stop();
+ pagingManager = null;
securityStore = null;
resourceManager.stop();
resourceManager = null;
More information about the jboss-cvs-commits
mailing list