[hornetq-commits] JBoss hornetq SVN: r10137 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Jan 24 20:09:51 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-01-24 20:09:51 -0500 (Mon, 24 Jan 2011)
New Revision: 10137

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
Log:
HORNETQ-628 - fix for paging counters

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-01-24 20:40:55 UTC (rev 10136)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-01-25 01:09:51 UTC (rev 10137)
@@ -102,8 +102,6 @@
    private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
    
    private final PageSubscriptionCounter counter;
-   
-   private final AtomicLong deliveredCount = new AtomicLong(0);
 
    // We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
    private final ConcurrentLinkedQueue<PagePosition> redeliveries = new ConcurrentLinkedQueue<PagePosition>();
@@ -178,7 +176,7 @@
    
    public long getMessageCount()
    {
-      return counter.getValue() - deliveredCount.get();
+      return counter.getValue();
    }
 
    public PageSubscriptionCounter getCounter()
@@ -969,7 +967,6 @@
             for (PagePosition confirmed : positions)
             {
                cursor.processACK(confirmed);
-               cursor.deliveredCount.decrementAndGet();
             }
 
          }
@@ -1206,7 +1203,6 @@
       {
          if (!isredelivery)
          {
-            deliveredCount.incrementAndGet();
             PageSubscriptionImpl.this.getPageInfo(position).remove(position);
          }
       }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-01-24 20:40:55 UTC (rev 10136)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-01-25 01:09:51 UTC (rev 10137)
@@ -662,11 +662,11 @@
       {
          if (pageSubscription != null)
          {
-            return messageReferences.size() + getScheduledCount() + getDeliveringCount() + pageSubscription.getMessageCount();
+            return messageReferences.size() + getScheduledCount()  + pageSubscription.getMessageCount();
          }
          else
          {
-            return messageReferences.size() + getScheduledCount() + getDeliveringCount();
+            return messageReferences.size() + getScheduledCount();
          }
       }
    }

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java	2011-01-24 20:40:55 UTC (rev 10136)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java	2011-01-25 01:09:51 UTC (rev 10137)
@@ -15,6 +15,7 @@
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.SimpleString;
@@ -28,14 +29,20 @@
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.Bindings;
+import org.hornetq.core.postoffice.impl.LocalQueueBinding;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerSession;
 import org.hornetq.core.server.impl.QueueImpl;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.tests.util.ServiceTestBase;
 
 /**
- * A PagingOrderTest
+ * A PagingOrderTest.
+ * 
+ * PagingTest already has a lot of tests, so this newer class was created to specialize in ordering and counters.
  *
  * @author clebertsuconic
  *
@@ -177,12 +184,10 @@
          {
             ClientMessage message = consumer.receive(5000);
             assertNotNull(message);
-            System.out.println("msg = " + message.getIntProperty("id"));
             assertEquals(i, message.getIntProperty("id").intValue());
 
             if (i < 100)
             {
-               System.out.println("Acking " + i);
                // Do not consume the last one so we could restart
                message.acknowledge();
             }
@@ -211,7 +216,6 @@
          {
             ClientMessage message = consumer.receive(5000);
             assertNotNull(message);
-            System.out.println("msg = " + message.getIntProperty("id"));
             assertEquals(i, message.getIntProperty("id").intValue());
             message.acknowledge();
          }
@@ -235,6 +239,305 @@
 
    }
 
+   public void testPageCounter() throws Throwable
+   {
+      boolean persistentMessages = true;
+      // Purpose: checks Queue.getMessageCount() on two queues bound to one address, once after q1 is drained by a concurrent consumer and once more after a server restart.
+      Configuration config = createDefaultConfig();
+
+      config.setJournalSyncNonTransactional(false);
+
+      HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 500;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+
+         locator.setClientFailureCheckPeriod(1000);
+         locator.setConnectionTTL(2000);
+         locator.setReconnectAttempts(0);
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+         locator.setConsumerWindowSize(1024 * 1024);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+         // The three 'false' flags are presumably autoCommitSends/autoCommitAcks/preAcknowledge (TODO confirm); sends are committed explicitly below.
+         ClientSession session = sf.createSession(false, false, false);
+         // Two queues are created on the same address: q1 (named ADDRESS) gets a consumer, q2 ("inactive") is never consumed in this test.
+         Queue q1 = server.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+         Queue q2 = server.createQueue(ADDRESS, new SimpleString("inactive"), null, true, false);
+         // NOTE(review): the producer targets PagingTest.ADDRESS while the queues use ADDRESS - confirm both resolve to the same value.
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         byte[] body = new byte[messageSize];
+
+         ByteBuffer bb = ByteBuffer.wrap(body);
+         // Fill the payload through the ByteBuffer that wraps 'body'.
+         for (int j = 1; j <= messageSize; j++)
+         {
+            bb.put(getSamplebyte(j));
+         }
+         // Any Throwable raised inside the consumer thread is reported to the main thread through this counter.
+         final AtomicInteger errors = new AtomicInteger(0);
+         // Consumer for the ADDRESS queue: expects ids 0..numberOfMessages-1 in order and acknowledges each message.
+         Thread t1 = new Thread()
+         {
+            public void run()
+            {
+               try
+               {
+                  ServerLocator sl = createInVMNonHALocator();
+                  ClientSessionFactory sf = sl.createSessionFactory();
+                  ClientSession sess = sf.createSession(true, true, 0);
+                  sess.start();
+                  ClientConsumer cons = sess.createConsumer(ADDRESS);
+                  for (int i = 0; i < numberOfMessages; i++)
+                  {
+                     ClientMessage msg = cons.receive(5000);
+                     assertNotNull(msg);
+                     assertEquals(i, msg.getIntProperty("id").intValue());
+                     msg.acknowledge();
+                  }
+                  // Nothing may be left for this consumer once the expected messages were received.
+                  assertNull(cons.receiveImmediate());
+                  sess.close();
+                  sl.close();
+               }
+               catch (Throwable e)
+               {
+                  e.printStackTrace();
+                  errors.incrementAndGet();
+               }
+
+            }
+         };
+         // The consumer is started before anything is sent, so it runs concurrently with the send loop below.
+         t1.start();
+         // Send every message, committing whenever i is a multiple of 20 (including i == 0).
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage message = session.createMessage(persistentMessages);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            message.putIntProperty(new SimpleString("id"), i);
+
+            producer.send(message);
+            if (i % 20 == 0)
+            {
+               session.commit();
+            }
+         }
+         // Commit whatever was sent after the last in-loop commit.
+         session.commit();
+
+         t1.join();
+
+         assertEquals(0, errors.get());
+         // q2 was never consumed, so it must report every message; q1 is expected to be empty after its consumer acknowledged everything.
+         assertEquals(numberOfMessages, q2.getMessageCount());
+         assertEquals(0, q1.getMessageCount());
+         
+         // Close the client side and restart the server; the same counts are asserted again afterwards.
+         session.close();
+         sf.close();
+         locator.close();
+         
+         server.stop();
+         
+         
+         server.start();
+         // Look the queues up again by name through the post office bindings of the restarted server.
+         Bindings bindings = server.getPostOffice().getBindingsForAddress(ADDRESS);
+         
+         q1 = null;
+         q2 = null;
+         
+         for (Binding bind : bindings.getBindings())
+         {
+            if (bind instanceof LocalQueueBinding)
+            {
+               LocalQueueBinding qb = (LocalQueueBinding)bind;
+               if (qb.getQueue().getName().equals(ADDRESS))
+               {
+                  q1 = qb.getQueue();
+               }
+               
+               if (qb.getQueue().getName().equals(new SimpleString("inactive")))
+               {
+                  q2 = qb.getQueue();
+               }
+            }
+         }
+         
+         assertNotNull(q1);
+         
+         assertNotNull(q2);
+         
+         // The counts after the restart must match the counts asserted before it.
+         assertEquals(numberOfMessages, q2.getMessageCount());
+         assertEquals(0, q1.getMessageCount());
+         
+         
+         
+         
+
+      }
+      catch (Throwable e)
+      {
+         e.printStackTrace();
+         throw e;
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
+   public void testPageCounter2() throws Throwable
+   {
+      boolean persistentMessages = true;
+      // Purpose: after a consumer acknowledges only the first 10 messages, q1 must report numberOfMessages - 10 and q2 must still report all of them.
+      Configuration config = createDefaultConfig();
+
+      config.setJournalSyncNonTransactional(false);
+
+      HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 500;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+
+         locator.setClientFailureCheckPeriod(1000);
+         locator.setConnectionTTL(2000);
+         locator.setReconnectAttempts(0);
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+         locator.setConsumerWindowSize(1024 * 1024);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+         // The three 'false' flags are presumably autoCommitSends/autoCommitAcks/preAcknowledge (TODO confirm); sends are committed explicitly below.
+         ClientSession session = sf.createSession(false, false, false);
+         // Two queues are created on the same address: q1 (named ADDRESS) gets a consumer, q2 ("inactive") is never consumed in this test.
+         Queue q1 = server.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+         Queue q2 = server.createQueue(ADDRESS, new SimpleString("inactive"), null, true, false);
+         // NOTE(review): the producer targets PagingTest.ADDRESS while the queues use ADDRESS - confirm both resolve to the same value.
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         byte[] body = new byte[messageSize];
+
+         ByteBuffer bb = ByteBuffer.wrap(body);
+         // Fill the payload through the ByteBuffer that wraps 'body'.
+         for (int j = 1; j <= messageSize; j++)
+         {
+            bb.put(getSamplebyte(j));
+         }
+         // Any Throwable raised inside the consumer thread is reported to the main thread through this counter.
+         final AtomicInteger errors = new AtomicInteger(0);
+         // Consumer for the ADDRESS queue: receives and acknowledges only the first 10 messages (ids 0..9), then closes its session.
+         Thread t1 = new Thread()
+         {
+            public void run()
+            {
+               try
+               {
+                  ServerLocator sl = createInVMNonHALocator();
+                  ClientSessionFactory sf = sl.createSessionFactory();
+                  ClientSession sess = sf.createSession(true, true, 0);
+                  sess.start();
+                  ClientConsumer cons = sess.createConsumer(ADDRESS);
+                  for (int i = 0; i < 10; i++)
+                  {
+                     ClientMessage msg = cons.receive(5000);
+                     assertNotNull(msg);
+                     assertEquals(i, msg.getIntProperty("id").intValue());
+                     msg.acknowledge();
+                  }
+                  sess.close();
+                  sl.close();
+               }
+               catch (Throwable e)
+               {
+                  e.printStackTrace();
+                  errors.incrementAndGet();
+               }
+
+            }
+         };
+         // All messages are sent and committed before the consumer thread is started further down.
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage message = session.createMessage(persistentMessages);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            message.putIntProperty(new SimpleString("id"), i);
+
+            producer.send(message);
+            if (i % 20 == 0)
+            {
+               session.commit();
+            }
+         }
+         // Commit whatever was sent after the last in-loop commit.
+         session.commit();
+         // Run the consumer to completion now that every message has been committed.
+         t1.start();
+         t1.join();
+
+         assertEquals(0, errors.get());
+         // q2 keeps all messages; q1 is expected to be short by exactly the 10 acknowledged ones.
+         assertEquals(numberOfMessages, q2.getMessageCount());
+         assertEquals(numberOfMessages - 10, q1.getMessageCount());
+         // NOTE(review): session, sf and locator are never closed in this method; only the server is stopped in the finally block.
+      }
+      catch (Throwable e)
+      {
+         e.printStackTrace();
+         throw e;
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
    public void testOrderOverRollback() throws Throwable
    {
       boolean persistentMessages = true;
@@ -302,8 +605,6 @@
 
          session.close();
 
-         System.out.println("number of refs " + queue.getNumberOfReferences());
-
          session = sf.createSession(false, false, 0);
 
          session.start();
@@ -314,7 +615,6 @@
          {
             ClientMessage message = consumer.receive(5000);
             assertNotNull(message);
-            System.out.println("msg = " + message.getIntProperty("id"));
             assertEquals(i, message.getIntProperty("id").intValue());
             message.acknowledge();
          }
@@ -333,7 +633,6 @@
          {
             ClientMessage message = consumer.receive(5000);
             assertNotNull(message);
-            System.out.println("msg = " + message.getIntProperty("id"));
             assertEquals(i, message.getIntProperty("id").intValue());
             message.acknowledge();
          }



More information about the hornetq-commits mailing list