[hornetq-commits] JBoss hornetq SVN: r9499 - in trunk: src/main/org/hornetq/core/persistence/impl/journal and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Aug 3 14:31:57 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-08-03 14:31:56 -0400 (Tue, 03 Aug 2010)
New Revision: 9499

Added:
   trunk/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java
Modified:
   trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
   trunk/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java
   trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
Log:
https://jira.jboss.org/browse/HORNETQ-468 - adding check on journal startup to delete unreferenced messages

Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2010-08-03 13:49:15 UTC (rev 9498)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2010-08-03 18:31:56 UTC (rev 9499)
@@ -135,7 +135,7 @@
     * @see org.hornetq.core.journal.SequentialFileFactory#releaseBuffer(java.nio.ByteBuffer)
     */
    @Override
-   public void releaseBuffer(final ByteBuffer buffer)
+   public synchronized void releaseBuffer(final ByteBuffer buffer)
    {
       AsynchronousFileImpl.destroyBuffer(buffer);
    }

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-08-03 13:49:15 UTC (rev 9498)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-08-03 18:31:56 UTC (rev 9499)
@@ -17,9 +17,11 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
@@ -772,6 +774,9 @@
       List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
 
       Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>();
+      
+      // used to identify messages that are not referenced
+      Set<Long> referencedMessages = new HashSet<Long>();
 
       JournalLoadInformation info = messageJournal.load(records,
                                                         preparedTransactions,
@@ -834,6 +839,8 @@
                {
                   throw new IllegalStateException("Cannot find message " + record.id);
                }
+               
+               referencedMessages.add(messageID);
 
                queueMessages.put(messageID, new AddMessageRecord(message));
 
@@ -1003,6 +1010,15 @@
             msg.decrementDelayDeletionCount();
          }
       }
+      
+      for (ServerMessage msg : messages.values())
+      {
+         if (!referencedMessages.contains(msg.getMessageID()))
+         {
+            log.info("Deleting unreferenced message id=" + msg.getMessageID() + " from the journal");
+            deleteMessage(msg.getMessageID());
+         }
+      }
 
       if (perfBlastPages != -1)
       {

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-08-03 13:49:15 UTC (rev 9498)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-08-03 18:31:56 UTC (rev 9499)
@@ -1433,6 +1433,10 @@
 
             // also note then when this happens as part of a trasaction its the tx commt of the ack that is important
             // not this
+            
+            // Also note that this delete shouldn't sync to disk, or else we would build up the executor's queue,
+            // as we can't delete each message with sync=true while adding messages transactionally.
+            // There is a startup check that removes unreferenced messages in case these deletes fail.
             try
             {
                storageManager.deleteMessage(message.getMessageID());

Added: trunk/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java	2010-08-03 18:31:56 UTC (rev 9499)
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.persistence;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.arjuna.ats.internal.arjuna.template.HashList;
+
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.persistence.GroupingInfo;
+import org.hornetq.core.persistence.QueueBindingInfo;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * Verifies that messages stored in the journal without a reference are deleted when the message journal is reloaded.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class DeleteMessagesOnStartupTest extends StorageManagerTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   volatile boolean deleteMessages = false;
+
+   ArrayList<Long> deletedMessage = new ArrayList<Long>();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testDeleteMessagesOnStartup() throws Exception
+   {
+      createStorage();
+
+      ServerMessage msg = new ServerMessageImpl(1, 100);
+
+      journal.storeMessage(msg);
+
+      journal.storeMessage(new ServerMessageImpl(2, 100));
+
+      journal.storeMessage(new ServerMessageImpl(3, 100));
+      
+      journal.storeReference(1, 1, true);
+
+      journal.stop();
+
+      journal.start();
+
+      Map<Long, Queue> queues = new HashMap<Long, Queue>();
+
+      journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null);
+
+      journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
+      
+      assertEquals(2, deletedMessage.size());
+      
+      assertEquals(new Long(2), deletedMessage.get(0));
+      
+      assertEquals(new Long(3), deletedMessage.get(1));
+   }
+
+   protected JournalStorageManager createJournalStorageManager(Configuration configuration)
+   {
+      return new JournalStorageManager(configuration, execFactory)
+      {
+         public void deleteMessage(final long messageID) throws Exception
+         {
+            System.out.println("message : " + messageID);
+            deletedMessage.add(messageID);
+            super.deleteMessage(messageID);
+         }
+
+      };
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java	2010-08-03 13:49:15 UTC (rev 9498)
+++ trunk/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java	2010-08-03 18:31:56 UTC (rev 9499)
@@ -24,7 +24,6 @@
 import org.hornetq.core.persistence.GroupingInfo;
 import org.hornetq.core.persistence.QueueBindingInfo;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
-import org.hornetq.core.server.JournalType;
 import org.hornetq.core.server.Queue;
 import org.hornetq.jms.persistence.JMSStorageManager;
 import org.hornetq.jms.persistence.impl.journal.JMSJournalStorageManagerImpl;
@@ -121,7 +120,7 @@
    {
       Configuration configuration = createDefaultConfig();
 
-      journal = new JournalStorageManager(configuration, execFactory);
+      journal = createJournalStorageManager(configuration);
 
       journal.start();
 
@@ -133,6 +132,14 @@
    }
 
    /**
+    * @param configuration the configuration passed to the JournalStorageManager that is created
+    */
+   protected JournalStorageManager createJournalStorageManager(Configuration configuration)
+   {
+      return new JournalStorageManager(configuration, execFactory);
+   }
+
+   /**
     * @return
     * @throws Exception
     */

Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java	2010-08-03 13:49:15 UTC (rev 9498)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java	2010-08-03 18:31:56 UTC (rev 9499)
@@ -23,6 +23,7 @@
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.MessageReferenceImpl;
 import org.hornetq.core.transaction.Transaction;
 
 public class FakePostOffice implements PostOffice
@@ -162,8 +163,7 @@
 
    public MessageReference reroute(final ServerMessage message, final Queue queue, final Transaction tx) throws Exception
    {
-      // TODO Auto-generated method stub
-      return null;
+      return new MessageReferenceImpl();
    }
 
    public void route(final ServerMessage message, final Transaction tx) throws Exception



More information about the hornetq-commits mailing list