[jboss-cvs] JBoss Messaging SVN: r7539 - in trunk: tests/src/org/jboss/messaging/tests/integration/client and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jul 7 19:25:08 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-07-07 19:25:08 -0400 (Tue, 07 Jul 2009)
New Revision: 7539
Added:
trunk/tests/src/org/jboss/messaging/tests/stress/journal/LargeJournalStressTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/CompactingTest.java
Log:
JBMESSAGING-1678 - Fixing OME on loading a large number of files
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-07-07 23:20:21 UTC (rev 7538)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-07-07 23:25:08 UTC (rev 7539)
@@ -1344,6 +1344,8 @@
{
final Set<Long> recordsToDelete = new HashSet<Long>();
final List<RecordInfo> records = new ArrayList<RecordInfo>();
+
+ final int DELETE_FLUSH = 20000;
long maxID = load(new LoaderCallback()
{
@@ -1365,6 +1367,24 @@
public void deleteRecord(final long id)
{
recordsToDelete.add(id);
+
+ // Clean up when the list is too large, or it won't be possible to load large sets of files
+ // Done as part of JBMESSAGING-1678
+ if (recordsToDelete.size() == DELETE_FLUSH)
+ {
+ Iterator<RecordInfo> iter = records.iterator();
+ while (iter.hasNext())
+ {
+ RecordInfo record = iter.next();
+
+ if (recordsToDelete.contains(record.id))
+ {
+ iter.remove();
+ }
+ }
+
+ recordsToDelete.clear();
+ }
}
});
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/CompactingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/CompactingTest.java 2009-07-07 23:20:21 UTC (rev 7538)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/CompactingTest.java 2009-07-07 23:25:08 UTC (rev 7539)
@@ -22,7 +22,6 @@
package org.jboss.messaging.tests.integration.client;
-import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@@ -35,8 +34,6 @@
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.journal.SequentialFile;
-import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
import org.jboss.messaging.core.server.JournalType;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.tests.util.ServiceTestBase;
Added: trunk/tests/src/org/jboss/messaging/tests/stress/journal/LargeJournalStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/journal/LargeJournalStressTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/journal/LargeJournalStressTest.java 2009-07-07 23:25:08 UTC (rev 7539)
@@ -0,0 +1,338 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.stress.journal;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.buffers.ChannelBuffers;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.server.JournalType;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+
+/**
+ * A LargeJournalStressTest
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class LargeJournalStressTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private static final String AD1 = "ad1";
+
+ private static final String AD2 = "ad2";
+
+ private static final String Q1 = "q1";
+
+ private static final String Q2 = "q2";
+
+ private MessagingServer server;
+
+ private ClientSessionFactory sf;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testMultiProducerAndCompactAIO() throws Throwable
+ {
+ internalTestMultiProducer(JournalType.ASYNCIO);
+ }
+
+ public void testMultiProducerAndCompactNIO() throws Throwable
+ {
+ internalTestMultiProducer(JournalType.NIO);
+ }
+
+ public void internalTestMultiProducer(JournalType journalType) throws Throwable
+ {
+
+ setupServer(journalType);
+
+ final AtomicInteger numberOfMessages = new AtomicInteger(0);
+ final int SLOW_INTERVAL = 25000;
+ final int NUMBER_OF_FAST_MESSAGES = SLOW_INTERVAL * 50;
+
+ final CountDownLatch latchReady = new CountDownLatch(2);
+ final CountDownLatch latchStart = new CountDownLatch(1);
+
+ class FastProducer extends Thread
+ {
+ Throwable e;
+
+ FastProducer()
+ {
+ super("Fast-Thread");
+ }
+
+ public void run()
+ {
+ ClientSession session = null;
+ ClientSession sessionSlow = null;
+ latchReady.countDown();
+ try
+ {
+ latchStart.await();
+ session = sf.createSession(true, true);
+ sessionSlow = sf.createSession(false, false);
+ ClientProducer prod = session.createProducer(AD2);
+ ClientProducer slowProd = sessionSlow.createProducer(AD1);
+ for (int i = 0; i < NUMBER_OF_FAST_MESSAGES; i++)
+ {
+ if (i % SLOW_INTERVAL == 0)
+ {
+ System.out.println("Sending slow message, msgs = " + i + " slowMessages = " + numberOfMessages.get());
+
+ if (numberOfMessages.incrementAndGet() % 5 == 0)
+ {
+ sessionSlow.commit();
+ }
+ slowProd.send(session.createClientMessage(true));
+ }
+ ClientMessage msg = session.createClientMessage(true);
+ msg.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
+ prod.send(msg);
+ }
+ sessionSlow.commit();
+ }
+ catch (Throwable e)
+ {
+ this.e = e;
+ }
+ finally
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Throwable e)
+ {
+ this.e = e;
+ }
+ try
+ {
+ sessionSlow.close();
+ }
+ catch (Throwable e)
+ {
+ this.e = e;
+ }
+ }
+ }
+ }
+
+ class FastConsumer extends Thread
+ {
+ Throwable e;
+
+ FastConsumer()
+ {
+ super("Fast-Consumer");
+ }
+
+ public void run()
+ {
+ ClientSession session = null;
+ latchReady.countDown();
+ try
+ {
+ latchStart.await();
+ session = sf.createSession(true, true);
+ session.start();
+ ClientConsumer cons = session.createConsumer(Q2);
+ for (int i = 0; i < NUMBER_OF_FAST_MESSAGES; i++)
+ {
+ ClientMessage msg = cons.receive(60 * 1000);
+ msg.acknowledge();
+ }
+
+ assertNull(cons.receiveImmediate());
+ }
+ catch (Throwable e)
+ {
+ this.e = e;
+ }
+ finally
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Throwable e)
+ {
+ this.e = e;
+ }
+ }
+ }
+ }
+
+ FastConsumer f1 = new FastConsumer();
+ f1.start();
+
+ FastProducer p1 = new FastProducer();
+ p1.start();
+
+ latchReady.await();
+ latchStart.countDown();
+
+ p1.join();
+
+ if (p1.e != null)
+ {
+ throw p1.e;
+ }
+
+ f1.join();
+
+ if (f1.e != null)
+ {
+ throw f1.e;
+ }
+
+ sf.close();
+
+ server.stop();
+
+ setupServer(journalType);
+
+ ClientSession sess = sf.createSession(true, true);
+
+ ClientConsumer cons = sess.createConsumer(Q1);
+
+ sess.start();
+
+ for (int i = 0; i < numberOfMessages.intValue(); i++)
+ {
+ ClientMessage msg = cons.receive(10000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ }
+
+ assertNull(cons.receiveImmediate());
+
+ cons.close();
+
+ cons = sess.createConsumer(Q2);
+
+ assertNull(cons.receive(100));
+
+ sess.close();
+
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ clearData();
+ }
+
+ /**
+ * @throws Exception
+ * @throws MessagingException
+ */
+ private void setupServer(JournalType journalType) throws Exception, MessagingException
+ {
+ Configuration config = createDefaultConfig();
+ config.setJournalFileSize(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE);
+
+ config.setJournalType(journalType);
+
+ config.setJournalCompactMinFiles(0);
+ config.setJournalCompactPercentage(50);
+
+ server = createServer(true, config);
+
+ server.start();
+
+ sf = createInVMFactory();
+
+ ClientSession sess = sf.createSession();
+
+ try
+ {
+ sess.createQueue(AD1, Q1, true);
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ try
+ {
+ sess.createQueue(AD2, Q2, true);
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ sess.close();
+
+ sf = createInVMFactory();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ if (sf != null)
+ {
+ sf.close();
+ }
+
+ if (server != null)
+ {
+ server.stop();
+ }
+
+ // We don't super.tearDown here because in case of failure, the data may be useful for debug
+ // so, we only clear data on setup.
+ // super.tearDown();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
More information about the jboss-cvs-commits
mailing list