[hornetq-commits] JBoss hornetq SVN: r10046 - in trunk: src/main/org/hornetq/core/persistence/impl/journal and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Dec 16 17:14:19 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-12-16 17:14:18 -0500 (Thu, 16 Dec 2010)
New Revision: 10046

Modified:
   trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
Log:
HORNETQ-574 more pagecounters stuff

Modified: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java	2010-12-16 19:47:43 UTC (rev 10045)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java	2010-12-16 22:14:18 UTC (rev 10046)
@@ -169,13 +169,16 @@
     */
    public void processReload()
    {
-      for (Pair<Long, Integer> incElement : loadList)
+      if (loadList != null)
       {
-         value.addAndGet(incElement.b);
-         incrementRecords.add(incElement.a);
+         for (Pair<Long, Integer> incElement : loadList)
+         {
+            value.addAndGet(incElement.b);
+            incrementRecords.add(incElement.a);
+         }
+         loadList.clear();
+         loadList = null;
       }
-      loadList.clear();
-      loadList = null;
    }
 
    /* (non-Javadoc)

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-12-16 19:47:43 UTC (rev 10045)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-12-16 22:14:18 UTC (rev 10046)
@@ -1064,7 +1064,7 @@
 
                if (sub != null)
                {
-                  sub.getCounter().loadValue(record.id, encoding.value);
+                  sub.getCounter().loadInc(record.id, encoding.value);
                }
                else
                {
@@ -1127,6 +1127,11 @@
       }
 
       loadPreparedTransactions(postOffice, pagingManager, resourceManager, queues, queueInfos, preparedTransactions, duplicateIDMap, pageSubscriptions);
+      
+      for (PageSubscription sub: pageSubscriptions.values())
+      {
+         sub.getCounter().processReload();
+      }
 
       for (LargeServerMessage msg : largeMessages)
       {

Modified: trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java	2010-12-16 19:47:43 UTC (rev 10045)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java	2010-12-16 22:14:18 UTC (rev 10046)
@@ -13,14 +13,14 @@
 
 package org.hornetq.tests.integration.paging;
 
+import javax.transaction.xa.Xid;
+
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
-import org.hornetq.core.paging.cursor.impl.PageSubscriptionCounterImpl;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
@@ -85,6 +85,66 @@
       }
    }
 
+   public void testCleanupCounter() throws Exception
+   {
+      ClientSessionFactory sf = sl.createSessionFactory();
+      ClientSession session = sf.createSession();
+
+      try
+      {
+         Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
+
+         PageSubscriptionCounter counter = locateCounter(queue);
+
+         StorageManager storage = server.getStorageManager();
+
+         Transaction tx = new TransactionImpl(server.getStorageManager());
+
+         for (int i = 0 ; i < 2100; i++)
+         {
+
+            counter.increment(tx, 1);
+   
+            if (i % 200 == 0)
+            {
+               tx.commit();
+      
+               storage.waitOnOperations();
+
+               assertEquals(i + 1, counter.getValue());
+               
+               tx = new TransactionImpl(server.getStorageManager());
+            }
+         }
+
+         tx.commit();
+         
+         storage.waitOnOperations();
+         
+         assertEquals(2100, counter.getValue());
+         
+         server.stop();
+
+         server = newHornetQServer();
+
+         server.start();
+
+         queue = server.locateQueue(new SimpleString("A1"));
+
+         assertNotNull(queue);
+
+         counter = locateCounter(queue);
+
+         assertEquals(2100, counter.getValue());
+
+      }
+      finally
+      {
+         sf.close();
+         session.close();
+      }
+   }
+
    public void testRestartCounter() throws Exception
    {
       Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
@@ -112,13 +172,13 @@
       server = newHornetQServer();
 
       server.start();
-      
+
       queue = server.locateQueue(new SimpleString("A1"));
-      
+
       assertNotNull(queue);
-      
+
       counter = locateCounter(queue);
-      
+
       assertEquals(1, counter.getValue());
 
    }
@@ -141,34 +201,52 @@
 
    public void testPrepareCounter() throws Exception
    {
-      ClientSessionFactory sf = sl.createSessionFactory();
-      ClientSession session = sf.createSession();
+      Xid xid = newXID();
 
-      try
-      {
-         Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
+      Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
 
-         PageSubscriptionCounter counter = locateCounter(queue);
+      PageSubscriptionCounter counter = locateCounter(queue);
 
-         StorageManager storage = server.getStorageManager();
+      StorageManager storage = server.getStorageManager();
 
-         Transaction tx = new TransactionImpl(server.getStorageManager());
+      Transaction tx = new TransactionImpl(xid, server.getStorageManager(), 300);
 
+      for (int i = 0 ; i < 2000; i++)
+      {
          counter.increment(tx, 1);
+      }
 
-         assertEquals(0, counter.getValue());
+      assertEquals(0, counter.getValue());
 
-         tx.commit();
+      tx.prepare();
 
-         storage.waitOnOperations();
+      storage.waitOnOperations();
 
-         assertEquals(1, counter.getValue());
-      }
-      finally
-      {
-         sf.close();
-         session.close();
-      }
+      assertEquals(0, counter.getValue());
+      
+      server.stop();
+      
+      server = newHornetQServer();
+      
+      server.start();
+      
+      queue = server.locateQueue(new SimpleString("A1"));
+      
+      assertNotNull(queue);
+      
+      counter = locateCounter(queue);
+      
+      tx = server.getResourceManager().removeTransaction(xid);
+      
+      assertNotNull(tx);
+      
+      assertEquals(0, counter.getValue());
+      
+      tx.commit(false);
+      
+      assertEquals(2000, counter.getValue());
+      
+      
    }
 
    // Package protected ---------------------------------------------



More information about the hornetq-commits mailing list