[hornetq-commits] JBoss hornetq SVN: r10048 - in trunk: src/main/org/hornetq/core/paging/cursor/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Dec 16 18:20:10 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-12-16 18:20:10 -0500 (Thu, 16 Dec 2010)
New Revision: 10048

Modified:
   trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
   trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
   trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
Log:
HORNETQ-574 more pagecounters stuff

Modified: trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java	2010-12-16 22:29:27 UTC (rev 10047)
+++ trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java	2010-12-16 23:20:10 UTC (rev 10048)
@@ -40,7 +40,6 @@
 
    /**
     * 
-    * This method is also used by Journal.loadMessageJournal
     * @param id
     * @param variance
     */

Modified: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java	2010-12-16 22:29:27 UTC (rev 10047)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java	2010-12-16 23:20:10 UTC (rev 10048)
@@ -50,7 +50,7 @@
    // the journal record id that is holding the current value
    private long recordID = -1;
 
-   private final boolean persistent;
+   private boolean persistent;
 
    private final StorageManager storage;
 
@@ -100,19 +100,20 @@
     */
    public void increment(Transaction tx, int add) throws Exception
    {
-      tx.setContainsPersistent();
 
-      if (!persistent)
+      if (persistent)
       {
-         replayIncrement(tx, -1, add);
+         tx.setContainsPersistent();
+         long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
+         replayIncrement(tx, id, add);
       }
       else
       {
-         long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
-
-         replayIncrement(tx, id, add);
+         replayIncrement(tx, -1, add);
       }
 
+
+      
    }
 
    /**
@@ -199,6 +200,12 @@
          incrementRecords.add(id);
       }
    }
+   
+   /** Used for testing only. */
+   public void setPersistent(final boolean persistent)
+   {
+      this.persistent = persistent;
+   }
 
    /** This method should always be called from a single-threaded executor */
    protected void cleanup()

Modified: trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java	2010-12-16 22:29:27 UTC (rev 10047)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java	2010-12-16 23:20:10 UTC (rev 10048)
@@ -21,6 +21,7 @@
 import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
+import org.hornetq.core.paging.cursor.impl.PageSubscriptionCounterImpl;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
@@ -145,6 +146,69 @@
       }
    }
 
+
+   public void testCleanupCounterNonPersistent() throws Exception
+   {
+      ClientSessionFactory sf = sl.createSessionFactory();
+      ClientSession session = sf.createSession();
+
+      try
+      {
+         Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
+
+         PageSubscriptionCounter counter = locateCounter(queue);
+         
+         ((PageSubscriptionCounterImpl)counter).setPersistent(false);
+
+         StorageManager storage = server.getStorageManager();
+
+         Transaction tx = new TransactionImpl(server.getStorageManager());
+
+         for (int i = 0 ; i < 2100; i++)
+         {
+
+            counter.increment(tx, 1);
+   
+            if (i % 200 == 0)
+            {
+               tx.commit();
+      
+               storage.waitOnOperations();
+
+               assertEquals(i + 1, counter.getValue());
+               
+               tx = new TransactionImpl(server.getStorageManager());
+            }
+         }
+
+         tx.commit();
+         
+         storage.waitOnOperations();
+         
+         assertEquals(2100, counter.getValue());
+         
+         server.stop();
+
+         server = newHornetQServer();
+
+         server.start();
+
+         queue = server.locateQueue(new SimpleString("A1"));
+
+         assertNotNull(queue);
+
+         counter = locateCounter(queue);
+
+         assertEquals(0, counter.getValue());
+
+      }
+      finally
+      {
+         sf.close();
+         session.close();
+      }
+   }
+
    public void testRestartCounter() throws Exception
    {
       Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);



More information about the hornetq-commits mailing list