[hornetq-commits] JBoss hornetq SVN: r10046 - in trunk: src/main/org/hornetq/core/persistence/impl/journal and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Dec 16 17:14:19 EST 2010
Author: clebert.suconic at jboss.com
Date: 2010-12-16 17:14:18 -0500 (Thu, 16 Dec 2010)
New Revision: 10046
Modified:
trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
Log:
HORNETQ-574 more pagecounters stuff
Modified: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2010-12-16 19:47:43 UTC (rev 10045)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2010-12-16 22:14:18 UTC (rev 10046)
@@ -169,13 +169,16 @@
*/
public void processReload()
{
- for (Pair<Long, Integer> incElement : loadList)
+ if (loadList != null)
{
- value.addAndGet(incElement.b);
- incrementRecords.add(incElement.a);
+ for (Pair<Long, Integer> incElement : loadList)
+ {
+ value.addAndGet(incElement.b);
+ incrementRecords.add(incElement.a);
+ }
+ loadList.clear();
+ loadList = null;
}
- loadList.clear();
- loadList = null;
}
/* (non-Javadoc)
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-12-16 19:47:43 UTC (rev 10045)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-12-16 22:14:18 UTC (rev 10046)
@@ -1064,7 +1064,7 @@
if (sub != null)
{
- sub.getCounter().loadValue(record.id, encoding.value);
+ sub.getCounter().loadInc(record.id, encoding.value);
}
else
{
@@ -1127,6 +1127,11 @@
}
loadPreparedTransactions(postOffice, pagingManager, resourceManager, queues, queueInfos, preparedTransactions, duplicateIDMap, pageSubscriptions);
+
+ for (PageSubscription sub: pageSubscriptions.values())
+ {
+ sub.getCounter().processReload();
+ }
for (LargeServerMessage msg : largeMessages)
{
Modified: trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java 2010-12-16 19:47:43 UTC (rev 10045)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java 2010-12-16 22:14:18 UTC (rev 10046)
@@ -13,14 +13,14 @@
package org.hornetq.tests.integration.paging;
+import javax.transaction.xa.Xid;
+
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
-import org.hornetq.core.paging.cursor.impl.PageSubscriptionCounterImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
@@ -85,6 +85,66 @@
}
}
+ public void testCleanupCounter() throws Exception
+ {
+ ClientSessionFactory sf = sl.createSessionFactory();
+ ClientSession session = sf.createSession();
+
+ try
+ {
+ Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
+
+ PageSubscriptionCounter counter = locateCounter(queue);
+
+ StorageManager storage = server.getStorageManager();
+
+ Transaction tx = new TransactionImpl(server.getStorageManager());
+
+ for (int i = 0 ; i < 2100; i++)
+ {
+
+ counter.increment(tx, 1);
+
+ if (i % 200 == 0)
+ {
+ tx.commit();
+
+ storage.waitOnOperations();
+
+ assertEquals(i + 1, counter.getValue());
+
+ tx = new TransactionImpl(server.getStorageManager());
+ }
+ }
+
+ tx.commit();
+
+ storage.waitOnOperations();
+
+ assertEquals(2100, counter.getValue());
+
+ server.stop();
+
+ server = newHornetQServer();
+
+ server.start();
+
+ queue = server.locateQueue(new SimpleString("A1"));
+
+ assertNotNull(queue);
+
+ counter = locateCounter(queue);
+
+ assertEquals(2100, counter.getValue());
+
+ }
+ finally
+ {
+ sf.close();
+ session.close();
+ }
+ }
+
public void testRestartCounter() throws Exception
{
Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
@@ -112,13 +172,13 @@
server = newHornetQServer();
server.start();
-
+
queue = server.locateQueue(new SimpleString("A1"));
-
+
assertNotNull(queue);
-
+
counter = locateCounter(queue);
-
+
assertEquals(1, counter.getValue());
}
@@ -141,34 +201,52 @@
public void testPrepareCounter() throws Exception
{
- ClientSessionFactory sf = sl.createSessionFactory();
- ClientSession session = sf.createSession();
+ Xid xid = newXID();
- try
- {
- Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
+ Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
- PageSubscriptionCounter counter = locateCounter(queue);
+ PageSubscriptionCounter counter = locateCounter(queue);
- StorageManager storage = server.getStorageManager();
+ StorageManager storage = server.getStorageManager();
- Transaction tx = new TransactionImpl(server.getStorageManager());
+ Transaction tx = new TransactionImpl(xid, server.getStorageManager(), 300);
+ for (int i = 0 ; i < 2000; i++)
+ {
counter.increment(tx, 1);
+ }
- assertEquals(0, counter.getValue());
+ assertEquals(0, counter.getValue());
- tx.commit();
+ tx.prepare();
- storage.waitOnOperations();
+ storage.waitOnOperations();
- assertEquals(1, counter.getValue());
- }
- finally
- {
- sf.close();
- session.close();
- }
+ assertEquals(0, counter.getValue());
+
+ server.stop();
+
+ server = newHornetQServer();
+
+ server.start();
+
+ queue = server.locateQueue(new SimpleString("A1"));
+
+ assertNotNull(queue);
+
+ counter = locateCounter(queue);
+
+ tx = server.getResourceManager().removeTransaction(xid);
+
+ assertNotNull(tx);
+
+ assertEquals(0, counter.getValue());
+
+ tx.commit(false);
+
+ assertEquals(2000, counter.getValue());
+
+
}
// Package protected ---------------------------------------------
More information about the hornetq-commits
mailing list