[hornetq-commits] JBoss hornetq SVN: r11939 - in branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710: src/main/org/hornetq/core/paging/cursor/impl and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Dec 23 09:14:45 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-12-23 09:14:45 -0500 (Fri, 23 Dec 2011)
New Revision: 11939

Modified:
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Backporting JBPAPP-7817

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2011-12-23 14:05:26 UTC (rev 11938)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2011-12-23 14:14:45 UTC (rev 11939)
@@ -58,7 +58,7 @@
 
    void scheduleCleanupCheck();
 
-   void cleanupEntries() throws Exception;
+   void cleanupEntries(boolean completeDelete) throws Exception;
 
    void disableAutoCleanup();
 

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java	2011-12-23 14:05:26 UTC (rev 11938)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java	2011-12-23 14:14:45 UTC (rev 11939)
@@ -44,5 +44,8 @@
     * @param variance
     */
    void addInc(long id, int variance);
+   
+   // used when deleting the counter
+   void delete() throws Exception;
 
 }
\ No newline at end of file

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java	2011-12-23 14:05:26 UTC (rev 11938)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java	2011-12-23 14:14:45 UTC (rev 11939)
@@ -175,6 +175,25 @@
       }
 
    }
+   /** Deletes this counter's stored records: each id in incrementRecords, plus the counter record when recordID >= 0. */
+   public void delete() throws Exception
+   {
+      synchronized (this)
+      {
+         long tx = storage.generateUniqueID();
+         for (Long record : incrementRecords)
+         {
+            storage.deleteIncrementRecord(tx, record.longValue());
+         }
+         // NOTE(review): a negative recordID presumably means no counter record was ever stored - confirm
+         if (recordID >= 0)
+         {
+            storage.deletePageCounter(tx, this.recordID);
+         }
+         // every delete above was issued under the same tx id; commit it
+         storage.commit(tx);
+      }
+   }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageSubscriptionCounter#loadInc(long, int)

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-12-23 14:05:26 UTC (rev 11938)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-12-23 14:14:45 UTC (rev 11939)
@@ -201,7 +201,7 @@
             {
                try
                {
-                  cleanupEntries();
+                  cleanupEntries(false);
                }
                catch (Exception e)
                {
@@ -215,8 +215,12 @@
    /** 
     * It will cleanup all the records for completed pages
     * */
-   public void cleanupEntries() throws Exception
+   public void cleanupEntries(final boolean completeDelete) throws Exception
    {
+      if (completeDelete)
+      {
+         counter.delete();
+      }
       Transaction tx = new TransactionImpl(store);
 
       boolean persist = false;
@@ -292,7 +296,10 @@
                      }
                   }
 
-                  cursorProvider.scheduleCleanup();
+                  if (!completeDelete) 
+                  {
+                     cursorProvider.scheduleCleanup();
+                  }
                }
             });
          }

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-12-23 14:05:26 UTC (rev 11938)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-12-23 14:14:45 UTC (rev 11939)
@@ -1084,7 +1084,8 @@
                }
                else
                {
-                  log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+                  log.debug("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+                  messageJournal.appendDeleteRecord(record.id, false);
                }
 
                break;
@@ -1103,7 +1104,8 @@
                }
                else
                {
-                  log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+                  log.debug("Can't find queue " + encoding.queueID + " while reloading PAGE_CURSOR_COUNTER_VALUE");
+                  messageJournal.appendDeleteRecord(record.id, false);
                }
 
                break;
@@ -1123,7 +1125,8 @@
                }
                else
                {
-                  log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+                  log.debug("Can't find queue " + encoding.queueID + " while reloading PAGE_CURSOR_COUNTER_INC");
+                  messageJournal.appendDeleteRecord(record.id, false);
                }
 
                break;

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-12-23 14:05:26 UTC (rev 11938)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-12-23 14:14:45 UTC (rev 11939)
@@ -1088,11 +1088,6 @@
       }
 
       Queue queue = (Queue)binding.getBindable();
-      
-      if (queue.getPageSubscription() != null)
-      {
-         queue.getPageSubscription().close();
-      }
 
       if (queue.getConsumerCount() != 0)
       {
@@ -1116,14 +1111,27 @@
          }
       }
 
+      postOffice.removeBinding(queueName);
+
       queue.deleteAllReferences();
 
       if (queue.isDurable())
       {
          storageManager.deleteQueueBinding(queue.getID());
       }
+      
 
-      postOffice.removeBinding(queueName);
+      if (queue.getPageSubscription() != null)
+      {
+         queue.getPageSubscription().close();
+      }
+      
+      PageSubscription subs = queue.getPageSubscription();
+      
+      if (subs != null)
+      {
+         subs.cleanupEntries(true);
+      }
    }
 
    public synchronized void registerActivateCallback(final ActivateCallback callback)

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-12-23 14:05:26 UTC (rev 11938)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-12-23 14:14:45 UTC (rev 11939)
@@ -24,6 +24,7 @@
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.transaction.xa.XAResource;
@@ -121,6 +122,8 @@
    {
       locator.close();
 
+      locator = null;
+
       super.tearDown();
    }
 
@@ -644,8 +647,7 @@
                                PagingTest.PAGE_MAX,
                                new HashMap<String, AddressSettings>());
          server.start();
-         
-         
+
          locator = createInVMNonHALocator();
          locator.setBlockOnNonDurableSend(true);
          locator.setBlockOnDurableSend(true);
@@ -659,7 +661,7 @@
          session = sf.createSession(false, false, false);
 
          producer = session.createProducer(PagingTest.ADDRESS);
-         
+
          for (int i = 0; i < numberOfMessages; i++)
          {
             message = session.createMessage(true);
@@ -676,9 +678,9 @@
                session.commit();
             }
          }
-         
+
          session.commit();
-         
+
          server.stop();
 
          server = createServer(true,
@@ -693,7 +695,7 @@
 
          queue = server.locateQueue(ADDRESS);
 
-        // assertEquals(numberOfMessages, queue.getMessageCount());
+         // assertEquals(numberOfMessages, queue.getMessageCount());
 
          xids = new LinkedList<Xid>();
 
@@ -725,7 +727,6 @@
 
          sessionConsumer.close();
 
-
       }
       finally
       {
@@ -1038,50 +1039,56 @@
 
       server.start();
 
-      ServerLocator locator = createInVMNonHALocator();
+      try
+      {
 
-      locator.setBlockOnNonDurableSend(true);
-      locator.setBlockOnDurableSend(true);
-      locator.setBlockOnAcknowledge(true);
+         ServerLocator locator = createInVMNonHALocator();
 
-      ClientSessionFactory csf = locator.createSessionFactory();
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
 
-      ClientSession session = csf.createSession();
+         ClientSessionFactory csf = locator.createSessionFactory();
 
-      session.start();
+         ClientSession session = csf.createSession();
 
-      for (int i = 1; i <= 2; i++)
-      {
-         ClientConsumer cons = session.createConsumer("q" + i);
+         session.start();
 
-         for (int j = 3; j < 6; j++)
+         for (int i = 1; i <= 2; i++)
          {
-            ClientMessage msg = cons.receive(5000);
+            ClientConsumer cons = session.createConsumer("q" + i);
 
-            assertNotNull(msg);
+            for (int j = 3; j < 6; j++)
+            {
+               ClientMessage msg = cons.receive(5000);
 
-            assertEquals("str-" + j, msg.getStringProperty("id"));
+               assertNotNull(msg);
 
-            msg.acknowledge();
+               assertEquals("str-" + j, msg.getStringProperty("id"));
+
+               msg.acknowledge();
+            }
+
+            session.commit();
+            assertNull(cons.receive(500));
+
          }
 
-         session.commit();
-         assertNull(cons.receive(500));
+         session.close();
 
-      }
+         long timeout = System.currentTimeMillis() + 5000;
 
-      session.close();
+         while (System.currentTimeMillis() < timeout && server.getPagingManager().getPageStore(ADDRESS).isPaging())
+         {
+            Thread.sleep(100);
+         }
 
-      long timeout = System.currentTimeMillis() + 5000;
-
-      while (System.currentTimeMillis() < timeout && server.getPagingManager().getPageStore(ADDRESS).isPaging())
+         locator.close();
+      }
+      finally
       {
-         Thread.sleep(100);
+         server.stop();
       }
-
-      locator.close();
-
-      server.stop();
    }
 
    public void testTwoQueuesOneNoRouting() throws Exception
@@ -1275,6 +1282,40 @@
          bb.put(getSamplebyte(j));
       }
 
+      final AtomicBoolean running = new AtomicBoolean(true);
+      // Helper thread: polls a queue's counters every 10 ms until 'running' is cleared or the thread is interrupted.
+      class TCount extends Thread
+      {
+         Queue queue;
+         // 'queue' is the queue whose getMessagesAdded()/getMessageCount() are called in run()
+         TCount(Queue queue)
+         {
+            this.queue = queue;
+         }
+         // return values are discarded; NOTE(review): presumably this exercises the counters concurrently with the test - confirm
+         public void run()
+         {
+            try
+            {
+               while (running.get())
+               {
+                  // log.info("Message count = " + queue.getMessageCount() + " on queue " + queue.getName());
+                  queue.getMessagesAdded();
+                  queue.getMessageCount();
+                  // log.info("Message added = " + queue.getMessagesAdded() + " on queue " + queue.getName());
+                  Thread.sleep(10);
+               }
+            }
+            catch (InterruptedException e)
+            {
+               log.info("Thread interrupted");
+            }
+         }
+      };
+
+      TCount tcount1 = null;
+      TCount tcount2 = null;
+
       try
       {
          {
@@ -1309,6 +1350,7 @@
             {
                if (i % 500 == 0)
                {
+                  log.info("Sent " + i + " messages");
                   session.commit();
                }
                message = session.createMessage(true);
@@ -1339,6 +1381,23 @@
                                new HashMap<String, AddressSettings>());
          server.start();
 
+         Queue queue1 = server.locateQueue(PagingTest.ADDRESS.concat("-1"));
+
+         Queue queue2 = server.locateQueue(PagingTest.ADDRESS.concat("-2"));
+
+         assertNotNull(queue1);
+
+         assertNotNull(queue2);
+
+         assertNotSame(queue1, queue2);
+
+         tcount1 = new TCount(queue1);
+
+         tcount2 = new TCount(queue2);
+
+         tcount1.start();
+         tcount2.start();
+
          ServerLocator locator = createInVMNonHALocator();
          final ClientSessionFactory sf2 = locator.createSessionFactory();
 
@@ -1375,8 +1434,14 @@
 
                         Assert.assertNotNull(message2);
 
-                        if (i % 1000 == 0)
+                        if (i % 100 == 0)
+                        {
+                           if (i % 5000 == 0)
+                           {
+                              log.info(addressToSubscribe + " consumed " + i + " messages");
+                           }
                            session.commit();
+                        }
 
                         try
                         {
@@ -1437,6 +1502,20 @@
       }
       finally
       {
+         running.set(false);
+
+         if (tcount1 != null)
+         {
+            tcount1.interrupt();
+            tcount1.join();
+         }
+
+         if (tcount2 != null)
+         {
+            tcount2.interrupt();
+            tcount2.join();
+         }
+
          try
          {
             server.stop();
@@ -2434,7 +2513,7 @@
 
          producerThread.start();
 
-         assertTrue(ready.await(10, TimeUnit.SECONDS));
+         assertTrue(ready.await(100, TimeUnit.SECONDS));
 
          ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
 
@@ -4100,6 +4179,146 @@
       }
    }
 
+   public void testTwoQueuesConsumeOneRestart() throws Exception
+   {
+      boolean persistentMessages = true;
+      // Scenario: create queues "=1" and "=2" on the same address, send 1000 messages, delete "=1" unconsumed, drain "=2", then restart the server.
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      config.setJournalSyncNonTransactional(false);
+
+      HornetQServer server = createServer(true,
+                                          config,
+                                          PagingTest.PAGE_SIZE,
+                                          PagingTest.PAGE_MAX,
+                                          new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 1000;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+
+         locator.setClientFailureCheckPeriod(120000);
+         locator.setConnectionTTL(5000000);
+         locator.setCallTimeout(120000);
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(false, false, false);
+
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), null, true);
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), null, true);
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         ClientMessage message = null;
+
+         byte[] body = new byte[messageSize];
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            message = session.createMessage(persistentMessages);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            message.putIntProperty("propTest", i % 2 == 0 ? 1 : 2);
+
+            producer.send(message);
+            if (i % 1000 == 0)
+            {
+               session.commit();
+            }
+         }
+
+         session.commit();
+
+         session.start();
+
+         // ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=1"));
+         //
+         // for (int i = 0; i < numberOfMessages; i++)
+         // {
+         // message = consumer.receive(500000);
+         // assertNotNull(message);
+         // message.acknowledge();
+         //
+         // // assertEquals(msg, message.getIntProperty("propTest").intValue());
+         //
+         // System.out.println("i = " + i + " msg = " + message.getIntProperty("propTest"));
+         // }
+         //
+         // session.commit();
+
+         // consumer.close();
+         // delete "=1" without consuming anything from it (the consuming code above is commented out)
+         session.deleteQueue(PagingTest.ADDRESS.concat("=1"));
+         // server.stop();
+         // server.start();
+
+         sf = locator.createSessionFactory();
+
+         session = sf.createSession(false, false, false);
+
+         session.start();
+
+         ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=2"));
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            message = consumer.receive(500000);
+            assertNotNull(message);
+            message.acknowledge();
+
+            // assertEquals(msg, message.getIntProperty("propTest").intValue());
+
+            System.out.println("i = " + i + " msg = " + message.getIntProperty("propTest"));
+         }
+
+         session.commit();
+
+         assertNull(consumer.receiveImmediate());
+
+         consumer.close();
+
+         // NOTE(review): the original note here said this is asynchronous and needs a wait, but no wait is performed before the assert
+         assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+         // the server is restarted twice below; NOTE(review): presumably to check startup after the queue deletion - confirm
+         server.stop();
+
+         server.start();
+
+         server.stop();
+         server.start();
+
+         sf.close();
+
+         locator.close();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
    public void testDLAOnLargeMessageAndPaging() throws Exception
    {
       clearData();
@@ -4316,7 +4535,7 @@
 
          pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
 
-         pgStoreAddress.getCursorProvier().getSubscription(server.locateQueue(ADDRESS).getID()).cleanupEntries();
+         pgStoreAddress.getCursorProvier().getSubscription(server.locateQueue(ADDRESS).getID()).cleanupEntries(false);
 
          pgStoreAddress.getCursorProvier().cleanup();
 
@@ -4455,7 +4674,7 @@
          for (int i = 0; i < 500; i++)
          {
             log.info("Received message " + i);
-            message = cons.receive(5000);
+            message = cons.receive(10000);
             assertNotNull(message);
             message.acknowledge();
 



More information about the hornetq-commits mailing list