[hornetq-commits] JBoss hornetq SVN: r11938 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/paging/cursor/impl and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Dec 23 09:05:27 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-12-23 09:05:26 -0500 (Fri, 23 Dec 2011)
New Revision: 11938

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
JBPAPP-7817 - removing unreferenced records during deletion of a destination

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2011-12-22 14:40:03 UTC (rev 11937)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2011-12-23 14:05:26 UTC (rev 11938)
@@ -58,7 +58,7 @@
 
    void scheduleCleanupCheck();
 
-   void cleanupEntries() throws Exception;
+   void cleanupEntries(boolean completeDelete) throws Exception;
 
    void disableAutoCleanup();
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java	2011-12-22 14:40:03 UTC (rev 11937)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java	2011-12-23 14:05:26 UTC (rev 11938)
@@ -44,5 +44,8 @@
     * @param variance
     */
    void addInc(long id, int variance);
+   
+   // used when deleting the counter
+   void delete() throws Exception;
 
 }
\ No newline at end of file

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java	2011-12-22 14:40:03 UTC (rev 11937)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java	2011-12-23 14:05:26 UTC (rev 11938)
@@ -175,6 +175,25 @@
       }
 
    }
+   
+   public void delete() throws Exception
+   {
+      synchronized (this)
+      {
+         long tx = storage.generateUniqueID();
+         for (Long record : incrementRecords)
+         {
+            storage.deleteIncrementRecord(tx, record.longValue());
+         }
+         
+         if (recordID >= 0)
+         {
+            storage.deletePageCounter(tx, this.recordID);
+         }
+         
+         storage.commit(tx);
+      }
+   }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageSubscriptionCounter#loadInc(long, int)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-12-22 14:40:03 UTC (rev 11937)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-12-23 14:05:26 UTC (rev 11938)
@@ -201,7 +201,7 @@
             {
                try
                {
-                  cleanupEntries();
+                  cleanupEntries(false);
                }
                catch (Exception e)
                {
@@ -215,8 +215,12 @@
    /** 
     * It will cleanup all the records for completed pages
     * */
-   public void cleanupEntries() throws Exception
+   public void cleanupEntries(final boolean completeDelete) throws Exception
    {
+      if (completeDelete)
+      {
+         counter.delete();
+      }
       Transaction tx = new TransactionImpl(store);
 
       boolean persist = false;
@@ -292,7 +296,10 @@
                      }
                   }
 
-                  cursorProvider.scheduleCleanup();
+                  if (!completeDelete) 
+                  {
+                     cursorProvider.scheduleCleanup();
+                  }
                }
             });
          }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-12-22 14:40:03 UTC (rev 11937)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-12-23 14:05:26 UTC (rev 11938)
@@ -1147,7 +1147,9 @@
                }
                else
                {
-                  log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+                  log.info("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR, deleting record now");
+                  messageJournal.appendDeleteRecord(record.id, false);
+                  
                }
 
                break;
@@ -1166,7 +1168,8 @@
                }
                else
                {
-                  log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+                  log.info("Can't find queue " + encoding.queueID + " while reloading PAGE_CURSOR_COUNTER_VALUE, deleting record now");
+                  messageJournal.appendDeleteRecord(record.id, false);
                }
 
                break;
@@ -1186,7 +1189,8 @@
                }
                else
                {
-                  log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+                  log.info("Can't find queue " + encoding.queueID + " while reloading PAGE_CURSOR_COUNTER_INC, deleting record now");
+                  messageJournal.appendDeleteRecord(record.id, false);
                }
 
                break;

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-12-22 14:40:03 UTC (rev 11937)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-12-23 14:05:26 UTC (rev 11938)
@@ -977,11 +977,6 @@
 
       Queue queue = (Queue)binding.getBindable();
 
-      if (queue.getPageSubscription() != null)
-      {
-         queue.getPageSubscription().close();
-      }
-
       if (queue.getConsumerCount() != 0)
       {
          throw new HornetQException(HornetQException.ILLEGAL_STATE, "Cannot delete queue " + queue.getName() +
@@ -1004,14 +999,27 @@
          }
       }
 
+      postOffice.removeBinding(queueName);
+
       queue.deleteAllReferences();
 
       if (queue.isDurable())
       {
          storageManager.deleteQueueBinding(queue.getID());
       }
+      
 
-      postOffice.removeBinding(queueName);
+      if (queue.getPageSubscription() != null)
+      {
+         queue.getPageSubscription().close();
+      }
+      
+      PageSubscription subs = queue.getPageSubscription();
+      
+      if (subs != null)
+      {
+         subs.cleanupEntries(true);
+      }
    }
 
    public synchronized void registerActivateCallback(final ActivateCallback callback)

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-12-22 14:40:03 UTC (rev 11937)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-12-23 14:05:26 UTC (rev 11938)
@@ -121,7 +121,7 @@
    protected void tearDown() throws Exception
    {
       locator.close();
-      
+
       locator = null;
 
       super.tearDown();
@@ -368,7 +368,6 @@
 
       config.setJournalSyncNonTransactional(false);
 
-      
       HornetQServer server = createServer(true,
                                           config,
                                           PagingTest.PAGE_SIZE,
@@ -429,25 +428,25 @@
             }
          }
          session.commit();
-         
+
          session.start();
-         
+
          ClientConsumer cons = session.createConsumer(ADDRESS);
-         
-         for (int i = 0 ; i < numberOfMessages; i++)
+
+         for (int i = 0; i < numberOfMessages; i++)
          {
             message = cons.receive(5000);
             assertNotNull(message);
             message.acknowledge();
-            
+
             if (i % 10 == 0)
             {
                session.commit();
             }
          }
-         
+
          session.commit();
-         
+
       }
       finally
       {
@@ -4283,6 +4282,146 @@
       }
    }
 
+   public void testTwoQueuesConsumeOneRestart() throws Exception
+   {
+      boolean persistentMessages = true;
+
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      config.setJournalSyncNonTransactional(false);
+
+      HornetQServer server = createServer(true,
+                                          config,
+                                          PagingTest.PAGE_SIZE,
+                                          PagingTest.PAGE_MAX,
+                                          new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 1000;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+
+         locator.setClientFailureCheckPeriod(120000);
+         locator.setConnectionTTL(5000000);
+         locator.setCallTimeout(120000);
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(false, false, false);
+
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), null, true);
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), null, true);
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         ClientMessage message = null;
+
+         byte[] body = new byte[messageSize];
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            message = session.createMessage(persistentMessages);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            message.putIntProperty("propTest", i % 2 == 0 ? 1 : 2);
+
+            producer.send(message);
+            if (i % 1000 == 0)
+            {
+               session.commit();
+            }
+         }
+
+         session.commit();
+
+         session.start();
+
+         // ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=1"));
+         //
+         // for (int i = 0; i < numberOfMessages; i++)
+         // {
+         // message = consumer.receive(500000);
+         // assertNotNull(message);
+         // message.acknowledge();
+         //
+         // // assertEquals(msg, message.getIntProperty("propTest").intValue());
+         //
+         // System.out.println("i = " + i + " msg = " + message.getIntProperty("propTest"));
+         // }
+         //
+         // session.commit();
+
+         // consumer.close();
+
+         session.deleteQueue(PagingTest.ADDRESS.concat("=1"));
+         // server.stop();
+         // server.start();
+
+         sf = locator.createSessionFactory();
+
+         session = sf.createSession(false, false, false);
+
+         session.start();
+
+         ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=2"));
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            message = consumer.receive(500000);
+            assertNotNull(message);
+            message.acknowledge();
+
+            // assertEquals(msg, message.getIntProperty("propTest").intValue());
+
+            System.out.println("i = " + i + " msg = " + message.getIntProperty("propTest"));
+         }
+
+         session.commit();
+
+         assertNull(consumer.receiveImmediate());
+
+         consumer.close();
+
+         // It's async, so we may need to wait a bit for it to happen
+         assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+
+         server.stop();
+
+         server.start();
+
+         server.stop();
+         server.start();
+
+         sf.close();
+
+         locator.close();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
    public void testDLAOnLargeMessageAndPaging() throws Exception
    {
       clearData();
@@ -4499,7 +4638,7 @@
 
          pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
 
-         pgStoreAddress.getCursorProvier().getSubscription(server.locateQueue(ADDRESS).getID()).cleanupEntries();
+         pgStoreAddress.getCursorProvier().getSubscription(server.locateQueue(ADDRESS).getID()).cleanupEntries(false);
 
          pgStoreAddress.getCursorProvier().cleanup();
 



More information about the hornetq-commits mailing list