[hornetq-commits] JBoss hornetq SVN: r10048 - in trunk: src/main/org/hornetq/core/paging/cursor/impl and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Dec 16 18:20:10 EST 2010
Author: clebert.suconic at jboss.com
Date: 2010-12-16 18:20:10 -0500 (Thu, 16 Dec 2010)
New Revision: 10048
Modified:
trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
Log:
HORNETQ-574 more pagecounters stuff
Modified: trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java 2010-12-16 22:29:27 UTC (rev 10047)
+++ trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java 2010-12-16 23:20:10 UTC (rev 10048)
@@ -40,7 +40,6 @@
/**
*
- * This method is also used by Journal.loadMessageJournal
* @param id
* @param variance
*/
Modified: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2010-12-16 22:29:27 UTC (rev 10047)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2010-12-16 23:20:10 UTC (rev 10048)
@@ -50,7 +50,7 @@
// the journal record id that is holding the current value
private long recordID = -1;
- private final boolean persistent;
+ private boolean persistent;
private final StorageManager storage;
@@ -100,19 +100,20 @@
*/
public void increment(Transaction tx, int add) throws Exception
{
- tx.setContainsPersistent();
- if (!persistent)
+ if (persistent)
{
- replayIncrement(tx, -1, add);
+ tx.setContainsPersistent();
+ long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
+ replayIncrement(tx, id, add);
}
else
{
- long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
-
- replayIncrement(tx, id, add);
+ replayIncrement(tx, -1, add);
}
+
+
}
/**
@@ -199,6 +200,12 @@
incrementRecords.add(id);
}
}
+
+ /** used for testing only */
+ public void setPersistent(final boolean persistent)
+ {
+ this.persistent = persistent;
+ }
/** This method should always be called from a single threaded executor */
protected void cleanup()
Modified: trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java 2010-12-16 22:29:27 UTC (rev 10047)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java 2010-12-16 23:20:10 UTC (rev 10048)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
+import org.hornetq.core.paging.cursor.impl.PageSubscriptionCounterImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
@@ -145,6 +146,69 @@
}
}
+
+ public void testCleanupCounterNonPersistent() throws Exception
+ {
+ ClientSessionFactory sf = sl.createSessionFactory();
+ ClientSession session = sf.createSession();
+
+ try
+ {
+ Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
+
+ PageSubscriptionCounter counter = locateCounter(queue);
+
+ ((PageSubscriptionCounterImpl)counter).setPersistent(false);
+
+ StorageManager storage = server.getStorageManager();
+
+ Transaction tx = new TransactionImpl(server.getStorageManager());
+
+ for (int i = 0 ; i < 2100; i++)
+ {
+
+ counter.increment(tx, 1);
+
+ if (i % 200 == 0)
+ {
+ tx.commit();
+
+ storage.waitOnOperations();
+
+ assertEquals(i + 1, counter.getValue());
+
+ tx = new TransactionImpl(server.getStorageManager());
+ }
+ }
+
+ tx.commit();
+
+ storage.waitOnOperations();
+
+ assertEquals(2100, counter.getValue());
+
+ server.stop();
+
+ server = newHornetQServer();
+
+ server.start();
+
+ queue = server.locateQueue(new SimpleString("A1"));
+
+ assertNotNull(queue);
+
+ counter = locateCounter(queue);
+
+ assertEquals(0, counter.getValue());
+
+ }
+ finally
+ {
+ sf.close();
+ session.close();
+ }
+ }
+
public void testRestartCounter() throws Exception
{
Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
More information about the hornetq-commits
mailing list