Author: clebert.suconic(a)jboss.com
Date: 2010-09-21 18:25:29 -0400 (Tue, 21 Sep 2010)
New Revision: 9709
Modified:
trunk/src/main/org/hornetq/core/paging/PagingStore.java
trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/HornetQServer.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
HORNETQ-523 - Ordering issue with TX and paging
Modified: trunk/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingStore.java 2010-09-21 14:18:07 UTC (rev
9708)
+++ trunk/src/main/org/hornetq/core/paging/PagingStore.java 2010-09-21 22:25:29 UTC (rev
9709)
@@ -13,6 +13,8 @@
package org.hornetq.core.paging;
+import java.util.List;
+
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.ServerMessage;
@@ -49,7 +51,7 @@
void sync() throws Exception;
- boolean page(ServerMessage message, long transactionId) throws Exception;
+ boolean page(List<ServerMessage> messages, long transactionId) throws
Exception;
boolean page(ServerMessage message) throws Exception;
Modified: trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java 2010-09-21 14:18:07 UTC (rev
9708)
+++ trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java 2010-09-21 22:25:29 UTC (rev
9709)
@@ -236,6 +236,11 @@
{
return size.intValue();
}
+
+ public String toString()
+ {
+ return "PageImpl::pageID=" + this.pageId + ", file=" +
this.file;
+ }
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2010-09-21 14:18:07
UTC (rev 9708)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2010-09-21 22:25:29
UTC (rev 9709)
@@ -235,11 +235,16 @@
// Private -------------------------------------------------------
- private PagingStore newStore(final SimpleString address) throws Exception
+ protected PagingStore newStore(final SimpleString address) throws Exception
{
return pagingStoreFactory.newStore(address,
addressSettingsRepository.getMatch(address.toString()));
}
+
+ protected PagingStoreFactory getStoreFactory()
+ {
+ return pagingStoreFactory;
+ }
// Inner classes -------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2010-09-21
14:18:07 UTC (rev 9708)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2010-09-21
22:25:29 UTC (rev 9709)
@@ -220,6 +220,26 @@
{
return new NIOSequentialFileFactory(directory + File.separatorChar + directoryName,
false);
}
+
+ protected PagingManager getPagingManager()
+ {
+ return pagingManager;
+ }
+
+ protected StorageManager getStorageManager()
+ {
+ return storageManager;
+ }
+
+ protected PostOffice getPostOffice()
+ {
+ return postOffice;
+ }
+
+ protected ExecutorFactory getExecutorFactory()
+ {
+ return executorFactory;
+ }
// Private -------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-09-21 14:18:07
UTC (rev 9708)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-09-21 22:25:29
UTC (rev 9709)
@@ -14,6 +14,7 @@
package org.hornetq.core.paging.impl;
import java.text.DecimalFormat;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -110,7 +111,7 @@
private volatile Page currentPage;
private final ReentrantLock writeLock = new ReentrantLock();
-
+
/** duplicate cache used at this address */
private final DuplicateIDCache duplicateCache;
@@ -186,7 +187,7 @@
this.storeFactory = storeFactory;
this.syncNonTransactional = syncNonTransactional;
-
+
// Post office could be null on the backup node
if (postOffice == null)
{
@@ -196,7 +197,7 @@
{
this.duplicateCache = postOffice.getDuplicateIDCache(storeName);
}
-
+
}
// Public --------------------------------------------------------
@@ -263,7 +264,7 @@
return storeName;
}
- public boolean page(final ServerMessage message, final long transactionID) throws
Exception
+ public boolean page(final List<ServerMessage> message, final long transactionID)
throws Exception
{
// The sync on transactions is done on commit only
return page(message, transactionID, false);
@@ -273,7 +274,7 @@
{
// If non Durable, there is no need to sync as there is no requirement for
persistence for those messages in case
// of crash
- return page(message, -1, syncNonTransactional && message.isDurable());
+ return page(Arrays.asList(message), -1, syncNonTransactional &&
message.isDurable());
}
public void sync() throws Exception
@@ -541,7 +542,6 @@
writeLock.lock();
currentPageLock.writeLock().lock(); // Make sure no checks are done on currentPage
while we are depaging
-
try
{
if (!running)
@@ -597,6 +597,7 @@
{
returnPage = createPage(firstPageId++);
}
+
return returnPage;
}
}
@@ -619,10 +620,14 @@
* @return
* @throws Exception
*/
- private boolean readPage() throws Exception
+ protected boolean readPage() throws Exception
{
Page page = depage();
+ // It's important that only depage() happens while locked;
+ // otherwise we would be holding a lock for a long time.
+ // The reading (IO part) should happen outside of any locks.
+
if (page == null)
{
return false;
@@ -630,8 +635,8 @@
page.open();
- List<PagedMessage> messages = null;
-
+ List<PagedMessage> messages = null;
+
try
{
messages = page.read();
@@ -688,25 +693,25 @@
class OurRunnable implements Runnable
{
boolean ran;
-
+
final Runnable runnable;
-
+
OurRunnable(final Runnable runnable)
{
this.runnable = runnable;
}
-
+
public synchronized void run()
{
if (!ran)
{
runnable.run();
-
+
ran = true;
}
}
}
-
+
public void executeRunnableWhenMemoryAvailable(final Runnable runnable)
{
if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && maxSize
!= -1)
@@ -714,23 +719,23 @@
if (sizeInBytes.get() > maxSize)
{
OurRunnable ourRunnable = new OurRunnable(runnable);
-
+
onMemoryFreedRunnables.add(ourRunnable);
-
- //We check again to avoid a race condition where the size can come down just
after the element
- //has been added, but the check to execute was done before the element was
added
- //NOTE! We do not fix this race by locking the whole thing, doing this check
provides
- //MUCH better performance in a highly concurrent environment
+
+ // We check again to avoid a race condition where the size can come down just
after the element
+ // has been added, but the check to execute was done before the element was
added
+ // NOTE! We do not fix this race by locking the whole thing, doing this check
provides
+ // MUCH better performance in a highly concurrent environment
if (sizeInBytes.get() <= maxSize)
{
- //run it now
+ // run it now
ourRunnable.run();
}
return;
}
}
-
+
runnable.run();
}
@@ -797,9 +802,7 @@
}
- private boolean page(final ServerMessage message,
- final long transactionID,
- final boolean sync) throws Exception
+ protected boolean page(final List<ServerMessage> messages, final long
transactionID, final boolean sync) throws Exception
{
if (!running)
{
@@ -857,60 +860,63 @@
return false;
}
- PagedMessage pagedMessage;
-
- if (!message.isDurable())
+ for (ServerMessage message : messages)
{
- // The address should never be transient when paging (even for non-persistent
messages when paging)
- // This will force everything to be persisted
- message.bodyChanged();
- }
+ PagedMessage pagedMessage;
- if (transactionID != -1)
- {
- pagedMessage = new PagedMessageImpl(message, transactionID);
- }
- else
- {
- pagedMessage = new PagedMessageImpl(message);
- }
+ if (!message.isDurable())
+ {
+ // The address should never be transient when paging (even for
non-persistent messages when paging)
+ // This will force everything to be persisted
+ message.bodyChanged();
+ }
- int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
-
- if (currentPageSize.addAndGet(bytesToWrite) > pageSize &&
currentPage.getNumberOfMessages() > 0)
- {
- // Make sure nothing is currently validating or using currentPage
- currentPageLock.writeLock().lock();
- try
+ if (transactionID != -1)
{
- openNewPage();
-
- // openNewPage will set currentPageSize to zero, we need to set it again
- currentPageSize.addAndGet(bytesToWrite);
+ pagedMessage = new PagedMessageImpl(message, transactionID);
}
- finally
+ else
{
- currentPageLock.writeLock().unlock();
+ pagedMessage = new PagedMessageImpl(message);
}
- }
- currentPageLock.readLock().lock();
+ int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
- try
- {
- currentPage.write(pagedMessage);
+ if (currentPageSize.addAndGet(bytesToWrite) > pageSize &&
currentPage.getNumberOfMessages() > 0)
+ {
+ // Make sure nothing is currently validating or using currentPage
+ currentPageLock.writeLock().lock();
+ try
+ {
+ openNewPage();
- if (sync)
+ // openNewPage will set currentPageSize to zero, we need to set it
again
+ currentPageSize.addAndGet(bytesToWrite);
+ }
+ finally
+ {
+ currentPageLock.writeLock().unlock();
+ }
+ }
+
+ currentPageLock.readLock().lock();
+
+ try
{
- currentPage.sync();
+ currentPage.write(pagedMessage);
+
+ if (sync)
+ {
+ currentPage.sync();
+ }
}
+ finally
+ {
+ currentPageLock.readLock().unlock();
+ }
+ }
- return true;
- }
- finally
- {
- currentPageLock.readLock().unlock();
- }
+ return true;
}
finally
{
@@ -940,9 +946,9 @@
// Depage has to be done atomically, in case of failure it should be
// back to where it was
-
+
byte[] duplicateIdForPage = generateDuplicateID(pageId);
-
+
Transaction depageTransaction = new TransactionImpl(storageManager);
// DuplicateCache could be null during replication
@@ -950,7 +956,8 @@
{
if (duplicateCache.contains(duplicateIdForPage))
{
- log.warn("Page " + pageId + " had been processed already but
the file wasn't removed as a crash happened. Ignoring this page");
+ log.warn("Page " + pageId +
+ " had been processed already but the file wasn't removed as
a crash happened. Ignoring this page");
return true;
}
@@ -1058,7 +1065,7 @@
{
// This will set the journal transaction to commit;
depageTransaction.setContainsPersistent();
-
+
entry.getKey().storeUpdate(storageManager, this.pagingManager,
depageTransaction, entry.getValue().intValue());
}
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-09-21
14:18:07 UTC (rev 9708)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-09-21
22:25:29 UTC (rev 9709)
@@ -15,16 +15,15 @@
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
@@ -476,10 +475,10 @@
if (addressManager.getBindingsForRoutingAddress(binding.getAddress()) == null)
{
pagingManager.deletePageStore(binding.getAddress());
-
+
managementService.unregisterAddress(binding.getAddress());
}
-
+
if (binding.getType() == BindingType.LOCAL_QUEUE)
{
managementService.unregisterQueue(uniqueName, binding.getAddress());
@@ -502,7 +501,7 @@
}
binding.close();
-
+
return binding;
}
@@ -537,7 +536,7 @@
{
route(message, new RoutingContextImpl(tx), direct);
}
-
+
public void route(final ServerMessage message, final RoutingContext context, final
boolean direct) throws Exception
{
// Sanity check
@@ -547,7 +546,7 @@
}
SimpleString address = message.getAddress();
-
+
setPagingStore(message);
Object duplicateID =
message.getObjectProperty(Message.HDR_DUPLICATE_DETECTION_ID);
@@ -614,17 +613,18 @@
else
{
Transaction tx = context.getTransaction();
-
+
boolean depage = tx.getProperty(TransactionPropertyIndexes.IS_DEPAGE) != null;
-
- // if the TX paged at least one message on a give address, all the other
addresses should also go towards paging cache now
+
+ // if the TX paged at least one message on a given address, all the other
addresses should also go towards
+ // paging cache now
boolean alreadyPaging = false;
-
+
if (tx.isPaging())
{
- alreadyPaging = getPageOperation(tx).isPaging(message.getAddress());
+ alreadyPaging = getPageOperation(tx).isPaging(message.getAddress());
}
-
+
if (!depage && message.storeIsPaging() || alreadyPaging)
{
tx.setPaging(true);
@@ -633,7 +633,7 @@
{
tx.commit();
}
-
+
return;
}
}
@@ -849,7 +849,7 @@
private void setPagingStore(final ServerMessage message) throws Exception
{
PagingStore store = pagingManager.getPageStore(message.getAddress());
-
+
message.setPagingStore(store);
}
@@ -1113,21 +1113,26 @@
private class PageMessageOperation implements TransactionOperation
{
- private final List<ServerMessage> messagesToPage = new
ArrayList<ServerMessage>();
-
- private final HashSet<SimpleString> addressesPaging = new
HashSet<SimpleString>();
-
+ private final HashMap<SimpleString, Pair<PagingStore,
List<ServerMessage>>> pagingData = new HashMap<SimpleString,
Pair<PagingStore, List<ServerMessage>>>();
+
private Transaction subTX = null;
-
+
void addMessageToPage(final ServerMessage message)
{
- messagesToPage.add(message);
- addressesPaging.add(message.getAddress());
+ Pair<PagingStore, List<ServerMessage>> pagePair =
pagingData.get(message.getAddress());
+ if (pagePair == null)
+ {
+ pagePair = new Pair<PagingStore,
List<ServerMessage>>(message.getPagingStore(),
+ new
ArrayList<ServerMessage>());
+ pagingData.put(message.getAddress(), pagePair);
+ }
+
+ pagePair.b.add(message);
}
-
+
boolean isPaging(final SimpleString address)
{
- return addressesPaging.contains(address);
+ return pagingData.get(address) != null;
}
public void afterCommit(final Transaction tx)
@@ -1142,7 +1147,7 @@
{
pageTransaction.commit();
}
-
+
if (subTX != null)
{
subTX.afterCommit();
@@ -1178,18 +1183,18 @@
{
pageMessages(tx);
}
-
+
if (subTX != null)
{
subTX.beforeCommit();
}
-
+
}
public void beforePrepare(final Transaction tx) throws Exception
{
pageMessages(tx);
-
+
if (subTX != null)
{
subTX.beforePrepare();
@@ -1206,7 +1211,7 @@
private void pageMessages(final Transaction tx) throws Exception
{
- if (!messagesToPage.isEmpty())
+ if (!pagingData.isEmpty())
{
PageTransactionInfo pageTransaction =
(PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
@@ -1223,21 +1228,33 @@
boolean pagingPersistent = false;
- Set<PagingStore> pagingStoresToSync = new
HashSet<PagingStore>();
+ ArrayList<ServerMessage> nonPagedMessages = null;
- for (ServerMessage message : messagesToPage)
+ for (Pair<PagingStore, List<ServerMessage>> pair :
pagingData.values())
{
- if (message.page(tx.getID()))
+
+ if (!pair.a.page(pair.b, tx.getID()))
{
- if (message.isDurable())
+ if (nonPagedMessages == null)
{
- // We only create pageTransactions if using persistent messages
+ nonPagedMessages = new ArrayList<ServerMessage>();
+ }
+ nonPagedMessages.addAll(pair.b);
+ }
+
+ for (ServerMessage msg : pair.b)
+ {
+ if (msg.isDurable())
+ {
pageTransaction.increment();
pagingPersistent = true;
- pagingStoresToSync.add(message.getPagingStore());
}
}
- else
+ }
+
+ if (nonPagedMessages != null)
+ {
+ for (ServerMessage message : nonPagedMessages)
{
// This could happen when the PageStore left the pageState
// we create a copy of the transaction so that messages are routed with
the same tx ID.
@@ -1246,9 +1263,9 @@
{
subTX = tx.copy();
}
-
+
route(message, subTX, false);
-
+
if (subTX.isContainsPersistent())
{
// The route wouldn't be able to update the persistent flag on
the main TX
@@ -1261,16 +1278,12 @@
if (pagingPersistent)
{
tx.setContainsPersistent();
-
- if (!pagingStoresToSync.isEmpty())
+ for (Pair<PagingStore, List<ServerMessage>> pair :
pagingData.values())
{
- for (PagingStore store : pagingStoresToSync)
- {
- store.sync();
- }
+ pair.a.sync();
+ }
- pageTransaction.store(storageManager, pagingManager, tx);
- }
+ pageTransaction.store(storageManager, pagingManager, tx);
}
}
}
Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-09-21 14:18:07 UTC (rev
9708)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-09-21 22:25:29 UTC (rev
9709)
@@ -23,6 +23,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.management.impl.HornetQServerControlImpl;
+import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.protocol.core.Channel;
@@ -58,6 +59,8 @@
RemotingService getRemotingService();
StorageManager getStorageManager();
+
+ PagingManager getPagingManager();
ManagementService getManagementService();
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-09-21 14:18:07
UTC (rev 9708)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-09-21 22:25:29
UTC (rev 9709)
@@ -502,6 +502,11 @@
{
return mbeanServer;
}
+
+ public PagingManager getPagingManager()
+ {
+ return pagingManager;
+ }
public RemotingService getRemotingService()
{
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2010-09-21 14:18:07
UTC (rev 9708)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2010-09-21 22:25:29
UTC (rev 9709)
@@ -14,6 +14,7 @@
package org.hornetq.core.server.impl;
import java.io.InputStream;
+import java.util.Arrays;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
@@ -269,7 +270,7 @@
{
if (pagingStore != null)
{
- return pagingStore.page(this, transactionID);
+ return pagingStore.page(Arrays.asList((ServerMessage)this), transactionID);
}
else
{
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-09-21
14:18:07 UTC (rev 9708)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-09-21
22:25:29 UTC (rev 9709)
@@ -13,10 +13,17 @@
package org.hornetq.tests.integration.client;
+import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
@@ -33,14 +40,30 @@
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.DivertConfiguration;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.PagingStoreFactory;
+import org.hornetq.core.paging.impl.PageImpl;
+import org.hornetq.core.paging.impl.PagingManagerImpl;
+import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
+import org.hornetq.core.paging.impl.PagingStoreImpl;
import org.hornetq.core.paging.impl.TestSupportPageStore;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.spi.core.security.HornetQSecurityManager;
+import org.hornetq.spi.core.security.HornetQSecurityManagerImpl;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.ExecutorFactory;
/**
* A PagingTest
@@ -129,17 +152,17 @@
server.start();
- final int numberOfIntegers = 256;
+ final int messageSize = 1024;
final int numberOfMessages = 30000;
- final byte[] body = new byte[numberOfIntegers * 4];
+ final byte[] body = new byte[messageSize];
ByteBuffer bb = ByteBuffer.wrap(body);
- for (int j = 1; j <= numberOfIntegers; j++)
+ for (int j = 1; j <= messageSize; j++)
{
- bb.putInt(j);
+ bb.put(getSamplebyte(j));
}
try
@@ -244,7 +267,7 @@
}
consumer.close();
-
+
session.close();
}
catch (Throwable e)
@@ -266,11 +289,20 @@
{
threads[i].join();
}
-
+
assertEquals(0, errors.get());
+ for (int i = 0 ; i < 20 &&
server.getPostOffice().getPagingManager().getTransactions().size() != 0; i++)
+ {
+ if (server.getPostOffice().getPagingManager().getTransactions().size() != 0)
+ {
+ // The delete may be asynchronous, so give it some time in case it eventually
happens asynchronously
+ Thread.sleep(500);
+ }
+ }
+
assertEquals(0,
server.getPostOffice().getPagingManager().getTransactions().size());
-
+
}
finally
{
@@ -740,11 +772,11 @@
message.putIntProperty(new SimpleString("id"), i);
producerTransacted.send(message);
-
+
if (i % 2 == 0)
{
System.out.println("Sending 20 msgs to make it page");
- for (int j = 0 ; j < 20; j++)
+ for (int j = 0; j < 20; j++)
{
ClientMessage msgSend = sessionNonTX.createMessage(true);
msgSend.getBodyBuffer().writeBytes(new byte[10 * 1024]);
@@ -756,7 +788,7 @@
{
System.out.println("Consuming 20 msgs to make it page");
ClientConsumer consumer =
sessionNonTX.createConsumer(PagingTest.ADDRESS);
- for (int j = 0 ; j < 20; j++)
+ for (int j = 0; j < 20; j++)
{
ClientMessage msgReceived = consumer.receive(10000);
assertNotNull(msgReceived);
@@ -765,7 +797,7 @@
consumer.close();
}
}
-
+
ClientConsumer consumerNonTX = sessionNonTX.createConsumer(PagingTest.ADDRESS);
while (true)
{
@@ -777,7 +809,6 @@
msgReceived.acknowledge();
}
consumerNonTX.close();
-
ClientConsumer consumer = sessionNonTX.createConsumer(PagingTest.ADDRESS);
@@ -798,7 +829,7 @@
// System.out.println(messageID);
Assert.assertNotNull(messageID);
Assert.assertEquals("message received out of order", i,
messageID.intValue());
-
+
System.out.println("MessageID = " + messageID);
message.acknowledge();
@@ -823,7 +854,6 @@
}
-
public void testDepageDuringTransaction4() throws Exception
{
clearData();
@@ -835,93 +865,88 @@
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
-
+
server.getConfiguration().setJournalSyncNonTransactional(false);
server.getConfiguration().setJournalSyncTransactional(false);
server.start();
final AtomicInteger errors = new AtomicInteger(0);
-
+
final int messageSize = 1024; // 1k
final int numberOfMessages = 10000;
try
{
final ClientSessionFactory sf = createInVMFactory();
-
sf.setBlockOnNonDurableSend(true);
sf.setBlockOnDurableSend(true);
sf.setBlockOnAcknowledge(false);
final byte[] body = new byte[messageSize];
-
-
+
Thread producerThread = new Thread()
{
- public void run()
- {
- ClientSession sessionProducer = null;
- try
- {
- sessionProducer = sf.createSession(false, false);
- ClientProducer producer = sessionProducer.createProducer(ADDRESS);
-
- for (int i = 0 ; i < numberOfMessages; i++)
- {
- ClientMessage msg = sessionProducer.createMessage(true);
- msg.getBodyBuffer().writeBytes(body);
- msg.putIntProperty("count", i);
- producer.send(msg);
-
- if (i % 50 == 0 && i != 0)
- {
- sessionProducer.commit();
- //Thread.sleep(500);
- }
- }
-
- sessionProducer.commit();
-
- System.out.println("Producer gone");
-
-
-
- }
- catch (Throwable e)
- {
- e.printStackTrace(); // >> junit report
- errors.incrementAndGet();
- }
- finally
- {
- try
- {
- if (sessionProducer != null)
- {
- sessionProducer.close();
- }
- }
- catch (Throwable e)
- {
- e.printStackTrace();
- errors.incrementAndGet();
- }
- }
- }
+ public void run()
+ {
+ ClientSession sessionProducer = null;
+ try
+ {
+ sessionProducer = sf.createSession(false, false);
+ ClientProducer producer = sessionProducer.createProducer(ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = sessionProducer.createMessage(true);
+ msg.getBodyBuffer().writeBytes(body);
+ msg.putIntProperty("count", i);
+ producer.send(msg);
+
+ if (i % 50 == 0 && i != 0)
+ {
+ sessionProducer.commit();
+ // Thread.sleep(500);
+ }
+ }
+
+ sessionProducer.commit();
+
+ System.out.println("Producer gone");
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace(); // >> junit report
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ try
+ {
+ if (sessionProducer != null)
+ {
+ sessionProducer.close();
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ }
};
-
+
ClientSession session = sf.createSession(true, true, 0);
session.start();
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
-
+
producerThread.start();
-
+
ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
-
-
- for (int i = 0 ; i < numberOfMessages; i++)
+
+ for (int i = 0; i < numberOfMessages; i++)
{
ClientMessage msg = consumer.receive(500000);
assertNotNull(msg);
@@ -929,15 +954,15 @@
msg.acknowledge();
if (i > 0 && i % 10 == 0)
{
- //session.commit();
+ // session.commit();
}
}
- //session.commit();
-
+ // session.commit();
+
session.close();
-
+
producerThread.join();
-
+
assertEquals(0, errors.get());
}
finally
@@ -953,6 +978,361 @@
}
+ // This test will force a depage thread as soon as the first message hits the page
+ public void testDepageOnTX5() throws Exception
+ {
+ clearData();
+
+ final Configuration config = createDefaultConfig();
+ HornetQSecurityManager securityManager = new HornetQSecurityManagerImpl();
+
+ final Executor executor = Executors.newSingleThreadExecutor();
+
+ final AtomicInteger countDepage = new AtomicInteger(0);
+ class HackPagingStore extends PagingStoreImpl
+ {
+ HackPagingStore(final SimpleString address,
+ final PagingManager pagingManager,
+ final StorageManager storageManager,
+ final PostOffice postOffice,
+ final SequentialFileFactory fileFactory,
+ final PagingStoreFactory storeFactory,
+ final SimpleString storeName,
+ final AddressSettings addressSettings,
+ final Executor executor,
+ final boolean syncNonTransactional)
+ {
+ super(address,
+ pagingManager,
+ storageManager,
+ postOffice,
+ fileFactory,
+ storeFactory,
+ storeName,
+ addressSettings,
+ executor,
+ syncNonTransactional);
+ }
+
+ protected boolean page(final List<ServerMessage> messages, final long
transactionID, final boolean sync) throws Exception
+ {
+ boolean paged = super.page(messages, transactionID, sync);
+
+ if (paged)
+ {
+
+ if (countDepage.incrementAndGet() == 1)
+ {
+ countDepage.set(0);
+
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ while (isStarted() && readPage());
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+ }
+
+ return paged;
+ }
+
+ public boolean startDepaging()
+ {
+ // do nothing, we are hacking depage right in between paging
+ return false;
+ }
+
+ };
+
+ class HackStoreFactory extends PagingStoreFactoryNIO
+ {
+ HackStoreFactory(final String directory,
+ final ExecutorFactory executorFactory,
+ final boolean syncNonTransactional)
+ {
+ super(directory, executorFactory, syncNonTransactional);
+ }
+
+ public synchronized PagingStore newStore(final SimpleString address, final
AddressSettings settings) throws Exception
+ {
+
+ return new HackPagingStore(address,
+ getPagingManager(),
+ getStorageManager(),
+ getPostOffice(),
+ null,
+ this,
+ address,
+ settings,
+ getExecutorFactory().getExecutor(),
+ syncNonTransactional);
+ }
+
+ };
+
+ HornetQServer server = new HornetQServerImpl(config,
ManagementFactory.getPlatformMBeanServer(), securityManager)
+
+ {
+ protected PagingManager createPagingManager()
+ {
+ return new PagingManagerImpl(new
HackStoreFactory(config.getPagingDirectory(),
+ getExecutorFactory(),
+
config.isJournalSyncNonTransactional()),
+ getStorageManager(),
+ getAddressSettingsRepository());
+ }
+ };
+
+ AddressSettings defaultSetting = new AddressSettings();
+ defaultSetting.setPageSizeBytes(PAGE_SIZE);
+ defaultSetting.setMaxSizeBytes(PAGE_MAX);
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+ server.start();
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ final int messageSize = 1024; // 1k
+ final int numberOfMessages = 2000;
+
+ try
+ {
+ final ClientSessionFactory sf = createInVMFactory();
+
+ sf.setBlockOnNonDurableSend(true);
+ sf.setBlockOnDurableSend(true);
+ sf.setBlockOnAcknowledge(false);
+
+ final byte[] body = new byte[messageSize];
+
+ Thread producerThread = new Thread()
+ {
+ public void run()
+ {
+ ClientSession sessionProducer = null;
+ try
+ {
+ sessionProducer = sf.createSession(false, false);
+ ClientProducer producer = sessionProducer.createProducer(ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = sessionProducer.createMessage(true);
+ msg.getBodyBuffer().writeBytes(body);
+ msg.putIntProperty("count", i);
+ producer.send(msg);
+
+ if (i % 500 == 0 && i != 0)
+ {
+ sessionProducer.commit();
+ // Thread.sleep(500);
+ }
+ }
+
+ sessionProducer.commit();
+
+ System.out.println("Producer gone");
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace(); // >> junit report
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ try
+ {
+ if (sessionProducer != null)
+ {
+ sessionProducer.close();
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ }
+ };
+
+ ClientSession session = sf.createSession(true, true, 0);
+ session.start();
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ producerThread.start();
+
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = consumer.receive(500000);
+ assertNotNull(msg);
+ assertEquals(i, msg.getIntProperty("count").intValue());
+ msg.acknowledge();
+ if (i > 0 && i % 10 == 0)
+ {
+ // session.commit();
+ }
+ }
+ // session.commit();
+
+ session.close();
+
+ producerThread.join();
+
+ assertEquals(0, errors.get());
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+ public void testOrderingNonTX() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ final HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_SIZE * 2,
+ new HashMap<String,
AddressSettings>());
+
+ server.getConfiguration().setJournalSyncNonTransactional(false);
+ server.getConfiguration().setJournalSyncTransactional(false);
+
+ server.start();
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ final int messageSize = 1024; // 1k
+ final int numberOfMessages = 2000;
+
+ try
+ {
+ final ClientSessionFactory sf = createInVMFactory();
+
+ sf.setBlockOnNonDurableSend(false);
+ sf.setBlockOnDurableSend(true);
+ sf.setBlockOnAcknowledge(false);
+
+ final CountDownLatch ready = new CountDownLatch(1);
+
+ final byte[] body = new byte[messageSize];
+
+ Thread producerThread = new Thread()
+ {
+ public void run()
+ {
+ ClientSession sessionProducer = null;
+ try
+ {
+ sessionProducer = sf.createSession(true, true);
+ ClientProducer producer = sessionProducer.createProducer(ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = sessionProducer.createMessage(true);
+ msg.getBodyBuffer().writeBytes(body);
+ msg.putIntProperty("count", i);
+ producer.send(msg);
+
+ if (i == 1000)
+ {
+ // The session is not TX, but we do this just to perform a round
trip to the server
+ // and make sure there are no pending messages
+ sessionProducer.commit();
+
+
assertTrue(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+ ready.countDown();
+ }
+ }
+
+ sessionProducer.commit();
+
+ System.out.println("Producer gone");
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace(); // >> junit report
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ try
+ {
+ if (sessionProducer != null)
+ {
+ sessionProducer.close();
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ }
+ };
+
+ ClientSession session = sf.createSession(true, true, 0);
+ session.start();
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ producerThread.start();
+
+ assertTrue(ready.await(10, TimeUnit.SECONDS));
+
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = consumer.receive(500000);
+ assertNotNull(msg);
+ assertEquals(i, msg.getIntProperty("count").intValue());
+ msg.acknowledge();
+ }
+
+ session.close();
+
+ producerThread.join();
+
+ assertEquals(0, errors.get());
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testPageOnSchedulingNoRestart() throws Exception
{
internalTestPageOnScheduling(false);