Author: clebert.suconic(a)jboss.com
Date: 2010-09-13 18:45:10 -0400 (Mon, 13 Sep 2010)
New Revision: 9681
Modified:
trunk/src/main/org/hornetq/core/paging/PageTransactionInfo.java
trunk/src/main/org/hornetq/core/paging/PagingManager.java
trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
Log:
Fixing a leak on PageTransaction and adding tests on ordering for paging
Modified: trunk/src/main/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-09-13 22:39:14
UTC (rev 9680)
+++ trunk/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-09-13 22:45:10
UTC (rev 9681)
@@ -40,12 +40,12 @@
long getTransactionID();
- void store(StorageManager storageManager,Transaction tx) throws Exception;
+ void store(StorageManager storageManager, PagingManager pagingManager, Transaction tx)
throws Exception;
- void storeUpdate(StorageManager storageManager, Transaction tx, int depages) throws
Exception;
+ void storeUpdate(StorageManager storageManager, PagingManager pagingManager,
Transaction tx, int depages) throws Exception;
// To be used after the update was stored or reload
- void update(int update, StorageManager storageManager);
+ void update(int update, StorageManager storageManager, PagingManager pagingManager);
void increment();
Modified: trunk/src/main/org/hornetq/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingManager.java 2010-09-13 22:39:14 UTC (rev
9680)
+++ trunk/src/main/org/hornetq/core/paging/PagingManager.java 2010-09-13 22:45:10 UTC (rev
9681)
@@ -13,6 +13,8 @@
package org.hornetq.core.paging;
+import java.util.Map;
+
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.postoffice.PostOffice;
@@ -67,6 +69,8 @@
* @param transactionID
*/
void removeTransaction(long transactionID);
+
+ Map<Long, PageTransactionInfo> getTransactions();
/**
* Reload previously created PagingStores into memory
Modified: trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-09-13
22:39:14 UTC (rev 9680)
+++ trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-09-13
22:45:10 UTC (rev 9681)
@@ -20,6 +20,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PageTransactionInfo;
+import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperation;
@@ -56,6 +57,7 @@
public PageTransactionInfoImpl(final long transactionID)
{
+ this();
this.transactionID = transactionID;
countDownCompleted = new CountDownLatch(1);
}
@@ -81,7 +83,7 @@
return transactionID;
}
- public void update(final int update, final StorageManager storageManager)
+ public void update(final int update, final StorageManager storageManager,
PagingManager pagingManager)
{
int sizeAfterUpdate = numberOfMessages.addAndGet(-update);
if (sizeAfterUpdate == 0 && storageManager != null)
@@ -95,6 +97,11 @@
log.warn("Can't delete page transaction id=" + this.recordID);
}
}
+
+ if (sizeAfterUpdate == 0 && pagingManager != null)
+ {
+ pagingManager.removeTransaction(this.transactionID);
+ }
}
public void increment()
@@ -149,7 +156,7 @@
}
}
- public void store(final StorageManager storageManager, final Transaction tx) throws
Exception
+ public void store(final StorageManager storageManager, PagingManager pagingManager,
final Transaction tx) throws Exception
{
storageManager.storePageTransaction(tx.getID(), this);
}
@@ -157,7 +164,7 @@
/* (non-Javadoc)
* @see
org.hornetq.core.paging.PageTransactionInfo#storeUpdate(org.hornetq.core.persistence.StorageManager,
org.hornetq.core.transaction.Transaction, int)
*/
- public void storeUpdate(final StorageManager storageManager, final Transaction tx,
final int depages) throws Exception
+ public void storeUpdate(final StorageManager storageManager, final PagingManager
pagingManager, final Transaction tx, final int depages) throws Exception
{
storageManager.updatePageTransaction(tx.getID(), this, depages);
@@ -187,7 +194,7 @@
public void afterCommit(Transaction tx)
{
- pgToUpdate.update(depages, storageManager);
+ pgToUpdate.update(depages, storageManager, pagingManager);
}
});
}
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2010-09-13 22:39:14
UTC (rev 9680)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2010-09-13 22:45:10
UTC (rev 9681)
@@ -14,6 +14,7 @@
package org.hornetq.core.paging.impl;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -155,7 +156,16 @@
{
return transactions.get(id);
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.PagingManager#getTransactions()
+ */
+ public Map<Long, PageTransactionInfo> getTransactions()
+ {
+ return transactions;
+ }
+
// HornetQComponent implementation
//
------------------------------------------------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-09-13 22:39:14
UTC (rev 9680)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-09-13 22:45:10
UTC (rev 9681)
@@ -1059,7 +1059,7 @@
// This will set the journal transaction to commit;
depageTransaction.setContainsPersistent();
- entry.getKey().storeUpdate(storageManager, depageTransaction,
entry.getValue().intValue());
+ entry.getKey().storeUpdate(storageManager, this.pagingManager,
depageTransaction, entry.getValue().intValue());
}
depageTransaction.commit();
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-09-13
22:39:14 UTC (rev 9680)
+++
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-09-13
22:45:10 UTC (rev 9681)
@@ -913,7 +913,7 @@
PageTransactionInfo pageTX =
pagingManager.getTransaction(pageUpdate.pageTX);
- pageTX.update(pageUpdate.recods, null);
+ pageTX.update(pageUpdate.recods, null, null);
}
else
{
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-09-13
22:39:14 UTC (rev 9680)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-09-13
22:45:10 UTC (rev 9681)
@@ -1269,7 +1269,7 @@
store.sync();
}
- pageTransaction.store(storageManager, tx);
+ pageTransaction.store(storageManager, pagingManager, tx);
}
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-09-13
22:39:14 UTC (rev 9680)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-09-13
22:45:10 UTC (rev 9681)
@@ -34,6 +34,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.paging.impl.TestSupportPageStore;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
@@ -163,6 +164,10 @@
for (int i = 0; i < numberOfMessages; i++)
{
+ if (i % 500 == 0)
+ {
+ session.commit();
+ }
message = session.createMessage(true);
HornetQBuffer bodyLocal = message.getBodyBuffer();
@@ -240,6 +245,8 @@
}
consumer.close();
+
+ session.close();
}
catch (Throwable e)
{
@@ -260,6 +267,9 @@
{
threads[i].join();
}
+
+ assertEquals(0,
server.getPostOffice().getPagingManager().getTransactions().size());
+
}
finally
{
@@ -812,6 +822,136 @@
}
+
+ public void testDepageDuringTransaction4() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.getConfiguration().setJournalSyncNonTransactional(false);
+ server.getConfiguration().setJournalSyncTransactional(false);
+
+ server.start();
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ final int messageSize = 1024; // 1k
+ final int numberOfMessages = 10000;
+
+ try
+ {
+ final ClientSessionFactory sf = createInVMFactory();
+
+
+ sf.setBlockOnNonDurableSend(true);
+ sf.setBlockOnDurableSend(true);
+ sf.setBlockOnAcknowledge(false);
+
+ final byte[] body = new byte[messageSize];
+
+
+ Thread producerThread = new Thread()
+ {
+ public void run()
+ {
+ ClientSession sessionProducer = null;
+ try
+ {
+ sessionProducer = sf.createSession(false, false);
+ ClientProducer producer = sessionProducer.createProducer(ADDRESS);
+
+ for (int i = 0 ; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = sessionProducer.createMessage(true);
+ msg.getBodyBuffer().writeBytes(body);
+ msg.putIntProperty("count", i);
+ producer.send(msg);
+
+ if (i % 50 == 0 && i != 0)
+ {
+ sessionProducer.commit();
+ //Thread.sleep(500);
+ }
+ }
+
+ sessionProducer.commit();
+
+ System.out.println("Producer gone");
+
+
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace(); // >> junit report
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ try
+ {
+ if (sessionProducer != null)
+ {
+ sessionProducer.close();
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ }
+ };
+
+ ClientSession session = sf.createSession(true, true, 0);
+ session.start();
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ producerThread.start();
+
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+
+
+ for (int i = 0 ; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = consumer.receive(500000);
+ assertNotNull(msg);
+ assertEquals(i, msg.getIntProperty("count").intValue());
+ msg.acknowledge();
+ if (i > 0 && i % 10 == 0)
+ {
+ //session.commit();
+ }
+ }
+ //session.commit();
+
+ session.close();
+
+ producerThread.join();
+
+ assertEquals(0, errors.get());
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testPageOnSchedulingNoRestart() throws Exception
{
internalTestPageOnScheduling(false);
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-09-13
22:39:14 UTC (rev 9680)
+++
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-09-13
22:45:10 UTC (rev 9681)
@@ -688,6 +688,154 @@
Assert.assertEquals(0, storeImpl.getAddressSize());
}
+ public void testOrderOnPaging() throws Throwable
+ {
+ clearData();
+ SequentialFileFactory factory = new NIOSequentialFileFactory(this.getPageDir());
+
+ PagingStoreFactory storeFactory = new FakeStoreFactory(factory);
+
+ final int MAX_SIZE = 1024 * 10;
+
+ AddressSettings settings = new AddressSettings();
+ settings.setPageSizeBytes(MAX_SIZE);
+ settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+
+ final TestSupportPageStore storeImpl = new
PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ createMockManager(),
+
createStorageManagerMock(),
+ createPostOfficeMock(),
+ factory,
+ storeFactory,
+ new
SimpleString("test"),
+ settings,
+ executor,
+ true);
+
+ storeImpl.start();
+
+ Assert.assertEquals(0, storeImpl.getNumberOfPages());
+
+ // Marked the store to be paged
+ storeImpl.startPaging();
+
+ final CountDownLatch producedLatch = new CountDownLatch(1);
+
+ Assert.assertEquals(1, storeImpl.getNumberOfPages());
+
+ final SimpleString destination = new SimpleString("test");
+
+ final long NUMBER_OF_MESSAGES = 100000;
+
+ final List<Throwable> errors = new ArrayList<Throwable>();
+
+ class WriterThread extends Thread
+ {
+
+ public WriterThread()
+ {
+ super("PageWriter");
+ }
+
+ @Override
+ public void run()
+ {
+
+ try
+ {
+ for (long i = 0; i < NUMBER_OF_MESSAGES; i++)
+ {
+ // Each thread will Keep paging until all the messages are depaged.
+ // This is possible because the depage thread is not actually reading
the pages.
+ // Just using the internal API to remove it from the page file system
+ ServerMessage msg = createMessage(i, storeImpl, destination,
createRandomBuffer(i, 1024));
+ msg.putLongProperty("count", i);
+ while (!storeImpl.page(msg))
+ {
+ storeImpl.startPaging();
+ }
+
+ if (i == 0)
+ {
+ producedLatch.countDown();
+ }
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.add(e);
+ }
+ }
+ }
+
+ class ReaderThread extends Thread
+ {
+ public ReaderThread()
+ {
+ super("PageReader");
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+
+ long msgsRead = 0;
+
+ while (msgsRead < NUMBER_OF_MESSAGES)
+ {
+ Page page = storeImpl.depage();
+ if (page != null)
+ {
+ page.open();
+ List<PagedMessage> messages = page.read();
+
+ for (PagedMessage pgmsg : messages)
+ {
+ ServerMessage msg = pgmsg.getMessage(null);
+
+ assertEquals(msgsRead++, msg.getMessageID());
+
+ assertEquals(msg.getMessageID(),
msg.getLongProperty("count").longValue());
+ }
+
+ page.close();
+ page.delete();
+ }
+ else
+ {
+ System.out.println("Depaged!!!!");
+ Thread.sleep(500);
+ }
+ }
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.add(e);
+ }
+ }
+ }
+
+ WriterThread producerThread = new WriterThread();
+ producerThread.start();
+ ReaderThread consumer = new ReaderThread();
+ consumer.start();
+
+ producerThread.join();
+ consumer.join();
+
+ storeImpl.stop();
+
+ for (Throwable e: errors)
+ {
+ throw e;
+ }
+ }
+
/**
* @return
*/
@@ -878,6 +1026,15 @@
return false;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.PagingManager#getTransactions()
+ */
+ public Map<Long, PageTransactionInfo> getTransactions()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
class FakeStorageManager implements StorageManager
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2010-09-13
22:39:14 UTC (rev 9680)
+++
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2010-09-13
22:45:10 UTC (rev 9681)
@@ -17,6 +17,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -35,7 +36,6 @@
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.impl.DuplicateIDCacheImpl;
-import org.hornetq.core.server.JournalType;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.impl.ResourceManagerImpl;
@@ -312,6 +312,14 @@
return false;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.PagingManager#getTransactions()
+ */
+ public Map<Long, PageTransactionInfo> getTransactions()
+ {
+ return null;
+ }
+
}
}