JBoss hornetq SVN: r9500 - trunk/tests/src/org/hornetq/tests/integration/persistence.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-03 14:39:46 -0400 (Tue, 03 Aug 2010)
New Revision: 9500
Modified:
trunk/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-468 - improving the test. Just a tweak
Modified: trunk/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java 2010-08-03 18:31:56 UTC (rev 9499)
+++ trunk/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java 2010-08-03 18:39:46 UTC (rev 9500)
@@ -61,9 +61,10 @@
journal.storeMessage(msg);
- journal.storeMessage(new ServerMessageImpl(2, 100));
-
- journal.storeMessage(new ServerMessageImpl(3, 100));
+ for (int i = 2; i < 100; i++)
+ {
+ journal.storeMessage(new ServerMessageImpl(i, 100));
+ }
journal.storeReference(1, 1, true);
@@ -77,11 +78,12 @@
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
- assertEquals(2, deletedMessage.size());
+ assertEquals(98, deletedMessage.size());
- assertEquals(new Long(2), deletedMessage.get(0));
-
- assertEquals(new Long(3), deletedMessage.get(1));
+ for (Long messageID : deletedMessage)
+ {
+ assertTrue("messageID = " + messageID, messageID.longValue() >= 2 && messageID <= 99);
+ }
}
protected JournalStorageManager createJournalStorageManager(Configuration configuration)
14 years, 5 months
JBoss hornetq SVN: r9499 - in trunk: src/main/org/hornetq/core/persistence/impl/journal and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-03 14:31:56 -0400 (Tue, 03 Aug 2010)
New Revision: 9499
Added:
trunk/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java
Modified:
trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
Log:
https://jira.jboss.org/browse/HORNETQ-468 - adding check on journal startup to delete unreferenced messages
Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2010-08-03 13:49:15 UTC (rev 9498)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2010-08-03 18:31:56 UTC (rev 9499)
@@ -135,7 +135,7 @@
* @see org.hornetq.core.journal.SequentialFileFactory#releaseBuffer(java.nio.ByteBuffer)
*/
@Override
- public void releaseBuffer(final ByteBuffer buffer)
+ public synchronized void releaseBuffer(final ByteBuffer buffer)
{
AsynchronousFileImpl.destroyBuffer(buffer);
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-03 13:49:15 UTC (rev 9498)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-03 18:31:56 UTC (rev 9499)
@@ -17,9 +17,11 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@@ -772,6 +774,9 @@
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>();
+
+ // used to identify messages that are not referenced
+ Set<Long> referencedMessages = new HashSet<Long>();
JournalLoadInformation info = messageJournal.load(records,
preparedTransactions,
@@ -834,6 +839,8 @@
{
throw new IllegalStateException("Cannot find message " + record.id);
}
+
+ referencedMessages.add(messageID);
queueMessages.put(messageID, new AddMessageRecord(message));
@@ -1003,6 +1010,15 @@
msg.decrementDelayDeletionCount();
}
}
+
+ for (ServerMessage msg : messages.values())
+ {
+ if (!referencedMessages.contains(msg.getMessageID()))
+ {
+ log.info("Deleting unreferenced message id=" + msg.getMessageID() + " from the journal");
+ deleteMessage(msg.getMessageID());
+ }
+ }
if (perfBlastPages != -1)
{
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-08-03 13:49:15 UTC (rev 9498)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-08-03 18:31:56 UTC (rev 9499)
@@ -1433,6 +1433,10 @@
// also note then when this happens as part of a trasaction its the tx commt of the ack that is important
// not this
+
+ // Also note that this delete shouldn't sync to disk, or else we would build up the executor's queue
+ // as we can't delete each message with sync=true while adding messages transactionally.
+ // There is a startup check to remove non-referenced messages in case these deletes fail
try
{
storageManager.deleteMessage(message.getMessageID());
Added: trunk/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java 2010-08-03 18:31:56 UTC (rev 9499)
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.persistence;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.arjuna.ats.internal.arjuna.template.HashList;
+
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.persistence.GroupingInfo;
+import org.hornetq.core.persistence.QueueBindingInfo;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A DeleteMessagesOnStartupTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class DeleteMessagesOnStartupTest extends StorageManagerTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ volatile boolean deleteMessages = false;
+
+ ArrayList<Long> deletedMessage = new ArrayList<Long>();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testDeleteMessagesOnStartup() throws Exception
+ {
+ createStorage();
+
+ ServerMessage msg = new ServerMessageImpl(1, 100);
+
+ journal.storeMessage(msg);
+
+ journal.storeMessage(new ServerMessageImpl(2, 100));
+
+ journal.storeMessage(new ServerMessageImpl(3, 100));
+
+ journal.storeReference(1, 1, true);
+
+ journal.stop();
+
+ journal.start();
+
+ Map<Long, Queue> queues = new HashMap<Long, Queue>();
+
+ journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null);
+
+ journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
+
+ assertEquals(2, deletedMessage.size());
+
+ assertEquals(new Long(2), deletedMessage.get(0));
+
+ assertEquals(new Long(3), deletedMessage.get(1));
+ }
+
+ protected JournalStorageManager createJournalStorageManager(Configuration configuration)
+ {
+ return new JournalStorageManager(configuration, execFactory)
+ {
+ public void deleteMessage(final long messageID) throws Exception
+ {
+ System.out.println("message : " + messageID);
+ deletedMessage.add(messageID);
+ super.deleteMessage(messageID);
+ }
+
+ };
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java 2010-08-03 13:49:15 UTC (rev 9498)
+++ trunk/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java 2010-08-03 18:31:56 UTC (rev 9499)
@@ -24,7 +24,6 @@
import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
-import org.hornetq.core.server.JournalType;
import org.hornetq.core.server.Queue;
import org.hornetq.jms.persistence.JMSStorageManager;
import org.hornetq.jms.persistence.impl.journal.JMSJournalStorageManagerImpl;
@@ -121,7 +120,7 @@
{
Configuration configuration = createDefaultConfig();
- journal = new JournalStorageManager(configuration, execFactory);
+ journal = createJournalStorageManager(configuration);
journal.start();
@@ -133,6 +132,14 @@
}
/**
+ * @param configuration
+ */
+ protected JournalStorageManager createJournalStorageManager(Configuration configuration)
+ {
+ return new JournalStorageManager(configuration, execFactory);
+ }
+
+ /**
* @return
* @throws Exception
*/
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2010-08-03 13:49:15 UTC (rev 9498)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2010-08-03 18:31:56 UTC (rev 9499)
@@ -23,6 +23,7 @@
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.MessageReferenceImpl;
import org.hornetq.core.transaction.Transaction;
public class FakePostOffice implements PostOffice
@@ -162,8 +163,7 @@
public MessageReference reroute(final ServerMessage message, final Queue queue, final Transaction tx) throws Exception
{
- // TODO Auto-generated method stub
- return null;
+ return new MessageReferenceImpl();
}
public void route(final ServerMessage message, final Transaction tx) throws Exception
14 years, 5 months
JBoss hornetq SVN: r9498 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-03 09:49:15 -0400 (Tue, 03 Aug 2010)
New Revision: 9498
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/DiscoveryEntry.java
Log:
add DiscoveryEntry#toString() method
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/DiscoveryEntry.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/DiscoveryEntry.java 2010-08-03 13:45:12 UTC (rev 9497)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/DiscoveryEntry.java 2010-08-03 13:49:15 UTC (rev 9498)
@@ -50,4 +50,10 @@
{
return lastUpdate;
}
+
+ @Override
+ public String toString()
+ {
+ return "DiscoveryEntry[nodeID=" + nodeID + ", connector=" + connector + ", lastUpdate=" + lastUpdate + "]";
+ }
}
14 years, 5 months
JBoss hornetq SVN: r9497 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-03 09:45:12 -0400 (Tue, 03 Aug 2010)
New Revision: 9497
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
Log:
HA refactoring
* make sure we expire the connectors even when the packet is coming from the own node
(otherwise the last node would never expire the 2nd to last node and callListeners would not be triggered)
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2010-08-03 13:36:54 UTC (rev 9496)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2010-08-03 13:45:12 UTC (rev 9497)
@@ -314,6 +314,11 @@
if (nodeID.equals(originatingNodeID))
{
+ if (checkExpiration())
+ {
+ callListeners();
+ }
+
// Ignore traffic from own node
continue;
}
@@ -340,23 +345,7 @@
}
}
- long now = System.currentTimeMillis();
-
- Iterator<Map.Entry<String, DiscoveryEntry>> iter = connectors.entrySet().iterator();
-
- // Weed out any expired connectors
-
- while (iter.hasNext())
- {
- Map.Entry<String, DiscoveryEntry> entry = iter.next();
-
- if (entry.getValue().getLastUpdate() + timeout <= now)
- {
- iter.remove();
-
- changed = true;
- }
- }
+ changed = changed || checkExpiration();
}
if (changed)
@@ -409,4 +398,28 @@
}
}
+ private boolean checkExpiration()
+ {
+ boolean changed = false;
+ long now = System.currentTimeMillis();
+
+ Iterator<Map.Entry<String, DiscoveryEntry>> iter = connectors.entrySet().iterator();
+
+ // Weed out any expired connectors
+
+ while (iter.hasNext())
+ {
+ Map.Entry<String, DiscoveryEntry> entry = iter.next();
+
+ if (entry.getValue().getLastUpdate() + timeout <= now)
+ {
+ System.out.println("remove " + entry);
+ iter.remove();
+
+ changed = true;
+ }
+ }
+
+ return changed;
+ }
}
14 years, 5 months
JBoss hornetq SVN: r9496 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-03 09:36:54 -0400 (Tue, 03 Aug 2010)
New Revision: 9496
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HA refactoring
* make sure the journal directory exists before trying to create files in it
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-08-03 03:23:01 UTC (rev 9495)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-08-03 13:36:54 UTC (rev 9496)
@@ -297,22 +297,8 @@
{
try
{
- File journalDir = new File(configuration.getJournalDirectory());
+ checkJournalDirectory();
- if (!journalDir.exists())
- {
- if (configuration.isCreateJournalDir())
- {
- journalDir.mkdirs();
- }
- else
- {
- throw new IllegalArgumentException("Directory " + journalDir +
- " does not exist and will not create it");
- }
- }
-
-
// We now load the node id file, creating it, if it doesn't exist yet
File nodeIDFile = new File(configuration.getJournalDirectory(), "node.id");
@@ -394,21 +380,8 @@
{
log.info("Waiting to obtain live lock");
- File journalDir = new File(configuration.getJournalDirectory());
+ checkJournalDirectory();
- if (!journalDir.exists())
- {
- if (configuration.isCreateJournalDir())
- {
- journalDir.mkdirs();
- }
- else
- {
- throw new IllegalArgumentException("Directory " + journalDir +
- " does not exist and will not create it");
- }
- }
-
liveLock = createLockFile("live.lock", configuration.getJournalDirectory());
liveLock.lock();
@@ -536,6 +509,8 @@
{
try
{
+ checkJournalDirectory();
+
backupLock = createLockFile("backup.lock", configuration.getJournalDirectory());
log.info("Waiting to become backup node");
@@ -1582,11 +1557,14 @@
// We do this at the end - we don't want things like MDBs or other connections connecting to a backup server until
// it is activated
+
+ remotingService.start();
+
+ System.out.println("remoting service is started");
clusterManager.start();
initialised = true;
- remotingService.start();
}
/**
@@ -1893,7 +1871,28 @@
});
}
+
+ /**
+ * Check if journal directory exists or create it (if configured to do so)
+ */
+ private void checkJournalDirectory()
+ {
+ File journalDir = new File(configuration.getJournalDirectory());
+ if (!journalDir.exists())
+ {
+ if (configuration.isCreateJournalDir())
+ {
+ journalDir.mkdirs();
+ }
+ else
+ {
+ throw new IllegalArgumentException("Directory " + journalDir +
+ " does not exist and will not be created");
+ }
+ }
+ }
+
// Inner classes
// --------------------------------------------------------------------------------
}
14 years, 5 months
JBoss hornetq SVN: r9495 - trunk/src/main/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-02 23:23:01 -0400 (Mon, 02 Aug 2010)
New Revision: 9495
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
https://jira.jboss.org/browse/HORNETQ-468 - removing sync on deleteMessage
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-03 02:42:47 UTC (rev 9494)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-03 03:23:01 UTC (rev 9495)
@@ -511,7 +511,11 @@
public void deleteMessage(final long messageID) throws Exception
{
- messageJournal.appendDeleteRecord(messageID, syncNonTransactional, getContext(syncNonTransactional));
+ // Messages are deleted on postACK, one after another.
+ // If these deletes are synchronized, we would build up messages on the Executor
+ // increasing chances of losing deletes.
+ // The StorageManager should verify messages without references
+ messageJournal.appendDeleteRecord(messageID, false, getContext(false));
}
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
14 years, 5 months
JBoss hornetq SVN: r9494 - trunk/src/main/org/hornetq/core/paging/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-02 22:42:47 -0400 (Mon, 02 Aug 2010)
New Revision: 9494
Modified:
trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java
Log:
https://jira.jboss.org/browse/HORNETQ-467 - fixing leakage caused by the JDK on NIO
Modified: trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java 2010-07-30 18:36:17 UTC (rev 9493)
+++ trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java 2010-08-03 02:42:47 UTC (rev 9494)
@@ -13,6 +13,7 @@
package org.hornetq.core.paging.impl;
+import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -94,7 +95,9 @@
{
ArrayList<PagedMessage> messages = new ArrayList<PagedMessage>();
- ByteBuffer buffer2 = fileFactory.newBuffer((int)file.size());
+ // Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467
+ ByteBuffer buffer2 = ByteBuffer.allocateDirect((int)file.size());
+
file.position(0);
file.read(buffer2);
14 years, 5 months