[hornetq-commits] JBoss hornetq SVN: r9499 - in trunk: src/main/org/hornetq/core/persistence/impl/journal and 3 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Aug 3 14:31:57 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-08-03 14:31:56 -0400 (Tue, 03 Aug 2010)
New Revision: 9499
Added:
trunk/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java
Modified:
trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
Log:
https://jira.jboss.org/browse/HORNETQ-468 - adding check on journal startup to delete unreferenced messaged
Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2010-08-03 13:49:15 UTC (rev 9498)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2010-08-03 18:31:56 UTC (rev 9499)
@@ -135,7 +135,7 @@
* @see org.hornetq.core.journal.SequentialFileFactory#releaseBuffer(java.nio.ByteBuffer)
*/
@Override
- public void releaseBuffer(final ByteBuffer buffer)
+ public synchronized void releaseBuffer(final ByteBuffer buffer)
{
AsynchronousFileImpl.destroyBuffer(buffer);
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-03 13:49:15 UTC (rev 9498)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-03 18:31:56 UTC (rev 9499)
@@ -17,9 +17,11 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@@ -772,6 +774,9 @@
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>();
+
+ // used to identify messages that are not referenced
+ Set<Long> referencedMessages = new HashSet<Long>();
JournalLoadInformation info = messageJournal.load(records,
preparedTransactions,
@@ -834,6 +839,8 @@
{
throw new IllegalStateException("Cannot find message " + record.id);
}
+
+ referencedMessages.add(messageID);
queueMessages.put(messageID, new AddMessageRecord(message));
@@ -1003,6 +1010,15 @@
msg.decrementDelayDeletionCount();
}
}
+
+ for (ServerMessage msg : messages.values())
+ {
+ if (!referencedMessages.contains(msg.getMessageID()))
+ {
+ log.info("Deleting unreferenced message id=" + msg.getMessageID() + " from the journal");
+ deleteMessage(msg.getMessageID());
+ }
+ }
if (perfBlastPages != -1)
{
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-08-03 13:49:15 UTC (rev 9498)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-08-03 18:31:56 UTC (rev 9499)
@@ -1433,6 +1433,10 @@
// also note then when this happens as part of a trasaction its the tx commt of the ack that is important
// not this
+
+ // Also note that this delete shouldn't sync to disk, or else we would build up the executor's queue
+ // as we can't delete each messaging with sync=true while adding messages transactionally.
+ // There is a startup check to remove non referenced messages case these deletes fail
try
{
storageManager.deleteMessage(message.getMessageID());
Added: trunk/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java 2010-08-03 18:31:56 UTC (rev 9499)
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.persistence;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.arjuna.ats.internal.arjuna.template.HashList;
+
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.persistence.GroupingInfo;
+import org.hornetq.core.persistence.QueueBindingInfo;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A DeleteMessagesOnStartupTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class DeleteMessagesOnStartupTest extends StorageManagerTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ volatile boolean deleteMessages = false;
+
+ ArrayList<Long> deletedMessage = new ArrayList<Long>();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testDeleteMessagesOnStartup() throws Exception
+ {
+ createStorage();
+
+ ServerMessage msg = new ServerMessageImpl(1, 100);
+
+ journal.storeMessage(msg);
+
+ journal.storeMessage(new ServerMessageImpl(2, 100));
+
+ journal.storeMessage(new ServerMessageImpl(3, 100));
+
+ journal.storeReference(1, 1, true);
+
+ journal.stop();
+
+ journal.start();
+
+ Map<Long, Queue> queues = new HashMap<Long, Queue>();
+
+ journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null);
+
+ journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
+
+ assertEquals(2, deletedMessage.size());
+
+ assertEquals(new Long(2), deletedMessage.get(0));
+
+ assertEquals(new Long(3), deletedMessage.get(1));
+ }
+
+ protected JournalStorageManager createJournalStorageManager(Configuration configuration)
+ {
+ return new JournalStorageManager(configuration, execFactory)
+ {
+ public void deleteMessage(final long messageID) throws Exception
+ {
+ System.out.println("message : " + messageID);
+ deletedMessage.add(messageID);
+ super.deleteMessage(messageID);
+ }
+
+ };
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java 2010-08-03 13:49:15 UTC (rev 9498)
+++ trunk/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java 2010-08-03 18:31:56 UTC (rev 9499)
@@ -24,7 +24,6 @@
import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
-import org.hornetq.core.server.JournalType;
import org.hornetq.core.server.Queue;
import org.hornetq.jms.persistence.JMSStorageManager;
import org.hornetq.jms.persistence.impl.journal.JMSJournalStorageManagerImpl;
@@ -121,7 +120,7 @@
{
Configuration configuration = createDefaultConfig();
- journal = new JournalStorageManager(configuration, execFactory);
+ journal = createJournalStorageManager(configuration);
journal.start();
@@ -133,6 +132,14 @@
}
/**
+ * @param configuration
+ */
+ protected JournalStorageManager createJournalStorageManager(Configuration configuration)
+ {
+ return new JournalStorageManager(configuration, execFactory);
+ }
+
+ /**
* @return
* @throws Exception
*/
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2010-08-03 13:49:15 UTC (rev 9498)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2010-08-03 18:31:56 UTC (rev 9499)
@@ -23,6 +23,7 @@
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.MessageReferenceImpl;
import org.hornetq.core.transaction.Transaction;
public class FakePostOffice implements PostOffice
@@ -162,8 +163,7 @@
public MessageReference reroute(final ServerMessage message, final Queue queue, final Transaction tx) throws Exception
{
- // TODO Auto-generated method stub
- return null;
+ return new MessageReferenceImpl();
}
public void route(final ServerMessage message, final Transaction tx) throws Exception
More information about the hornetq-commits
mailing list