[hornetq-commits] JBoss hornetq SVN: r9288 - in trunk: tests/src/org/hornetq/tests/integration/journal and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Jun 3 02:00:02 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-06-03 02:00:01 -0400 (Thu, 03 Jun 2010)
New Revision: 9288
Added:
trunk/tests/src/org/hornetq/tests/stress/journal/JournalRestartStressTest.java
Modified:
trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-399 - fixing file size and adding tests
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-06-01 14:55:59 UTC (rev 9287)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-06-03 06:00:01 UTC (rev 9288)
@@ -181,6 +181,10 @@
{
sequentialFile.position(0);
SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
+
+ // Set the writer index to the buffer capacity so that the file is written with its full size
+ writingChannel.writerIndex(writingChannel.capacity());
+
sequentialFile.writeDirect(writingChannel.toByteBuffer(), true, completion);
completion.waitCompletion();
sequentialFile.close();
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-06-01 14:55:59 UTC (rev 9287)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-06-03 06:00:01 UTC (rev 9288)
@@ -14,6 +14,7 @@
package org.hornetq.tests.integration.journal;
import java.io.File;
+import java.io.FilenameFilter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -204,7 +205,7 @@
performNonTransactionalDelete = false;
}
- setup(50, 60 * 1024, false);
+ setup(2, 60 * 1024, false);
ArrayList<Long> liveIDs = new ArrayList<Long>();
@@ -765,6 +766,30 @@
file.mkdir();
}
+
+ /**
+  * Checks that every file in the test directory whose name starts with
+  * {@code filePrefix} and ends with {@code fileExtension} is exactly
+  * {@code fileSize} bytes long, then delegates to the parent tear-down.
+  *
+  * @throws Exception if the parent tear-down fails
+  */
+ @Override
+ protected void tearDown() throws Exception
+ {
+    try
+    {
+       File testDir = new File(getTestDir());
+
+       File[] files = testDir.listFiles(new FilenameFilter()
+       {
+          public boolean accept(File dir, String name)
+          {
+             return name.startsWith(filePrefix) && name.endsWith(fileExtension);
+          }
+       });
+
+       // listFiles() returns null when the path is not a directory or cannot be read;
+       // skip the size check in that case instead of failing with a NullPointerException
+       if (files != null)
+       {
+          for (File file : files)
+          {
+             assertEquals("File " + file + " doesn't have the expected number of bytes", fileSize, file.length());
+
+             System.out.println("File " + file);
+          }
+       }
+    }
+    finally
+    {
+       // Always run the parent clean-up, even when a size assertion above fails
+       super.tearDown();
+    }
+ }
/* (non-Javadoc)
* @see org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase#getFileFactory()
Added: trunk/tests/src/org/hornetq/tests/stress/journal/JournalRestartStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalRestartStressTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalRestartStressTest.java 2010-06-03 06:00:01 UTC (rev 9288)
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.stress.journal;
+
+import java.util.ArrayList;
+import java.util.Random;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * Stress test that repeatedly restarts a server while its journal is being
+ * updated, compacted and cleaned up, to make sure the journal survives
+ * multiple restarts of the server.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public class JournalRestartStressTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Runs ten start/stop cycles against the same server. Each cycle drains
+    * whatever can be received immediately from "slow-queue" and then calls
+    * {@link #produceMessages(ClientSessionFactory, int)} with 30000 messages.
+    *
+    * @throws Throwable if any cycle fails
+    */
+   public void testLoad() throws Throwable
+   {
+      HornetQServer server2 = createServer(true, false);
+
+      server2.getConfiguration().setJournalFileSize(10 * 1024 * 1024);
+      server2.getConfiguration().setJournalMinFiles(10);
+      server2.getConfiguration().setJournalCompactMinFiles(3);
+      server2.getConfiguration().setJournalCompactPercentage(50);
+
+      for (int i = 0; i < 10; i++)
+      {
+         server2.start();
+
+         try
+         {
+            ClientSessionFactory sf = createFactory(false);
+            sf.setMinLargeMessageSize(1024 * 1024);
+            sf.setBlockOnDurableSend(false);
+
+            ClientSession session = sf.createSession(true, true);
+
+            try
+            {
+               try
+               {
+                  session.createQueue("slow-queue", "slow-queue");
+               }
+               catch (Exception ignored)
+               {
+                  // Best effort: presumably the queue already exists from an earlier cycle
+               }
+
+               session.start();
+               ClientConsumer consumer = session.createConsumer("slow-queue");
+
+               // Drain what is still available on the slow queue; log only real receptions
+               ClientMessage msg;
+               while ((msg = consumer.receiveImmediate()) != null)
+               {
+                  System.out.println("Received message from previous");
+                  msg.acknowledge();
+               }
+
+               produceMessages(sf, 30000);
+            }
+            finally
+            {
+               session.close();
+            }
+         }
+         finally
+         {
+            // Stop the server even when the cycle fails, so it is not left running
+            server2.stop();
+         }
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   /**
+    * Sends {@code NMSGS} messages with random bodies to "Queue" while a
+    * background thread receives and acknowledges them. Each message whose
+    * index is a multiple of 5000 is also sent to "slow-queue", which this
+    * method does not consume.
+    *
+    * @param sf factory used to create the sending and the receiving session
+    * @param NMSGS number of messages to send and to expect on the receiving side
+    * @throws HornetQException declared for the client calls made here
+    * @throws InterruptedException if interrupted while waiting for the receiver thread
+    * @throws Throwable the first error recorded by the receiver thread, if any
+    */
+   private void produceMessages(final ClientSessionFactory sf, final int NMSGS) throws HornetQException,
+                                                                                InterruptedException,
+                                                                                Throwable
+   {
+      // Maximum time, as passed to consumer.receive(), to wait for each message
+      final int TIMEOUT = 5000;
+
+      System.out.println("sending " + NMSGS + " messages");
+
+      final ClientSession sessionSend = sf.createSession(true, true);
+
+      try
+      {
+         ClientProducer prod2 = sessionSend.createProducer("slow-queue");
+
+         try
+         {
+            sessionSend.createQueue("Queue", "Queue", true);
+         }
+         catch (Exception ignored)
+         {
+            // Best effort: presumably the queue already exists from an earlier cycle
+         }
+
+         final ClientSession sessionReceive = sf.createSession(true, true);
+
+         try
+         {
+            sessionReceive.start();
+
+            // Written by the receiver thread, read by this thread only after join()
+            final ArrayList<Throwable> errors = new ArrayList<Throwable>();
+
+            Thread tReceive = new Thread()
+            {
+               public void run()
+               {
+                  try
+                  {
+                     ClientConsumer consumer = sessionReceive.createConsumer("Queue");
+
+                     for (int i = 0; i < NMSGS; i++)
+                     {
+                        if (i % 500 == 0)
+                        {
+                           double percent = (double)i / (double)NMSGS;
+                           System.out.println("msgs " + i + " of " + NMSGS + ", " + (int)(percent * 100) + "%");
+                           Thread.sleep(100);
+                        }
+
+                        ClientMessage msg = consumer.receive(TIMEOUT);
+                        if (msg == null)
+                        {
+                           errors.add(new Exception("Didn't receive msgs"));
+                           break;
+                        }
+                        msg.acknowledge();
+                     }
+                  }
+                  catch (Throwable e)
+                  {
+                     // Recorded here (Errors included) and rethrown by the calling thread
+                     errors.add(e);
+                  }
+               }
+            };
+
+            tReceive.start();
+
+            try
+            {
+               ClientProducer prod = sessionSend.createProducer("Queue");
+
+               Random random = new Random();
+
+               for (int i = 0; i < NMSGS; i++)
+               {
+                  ClientMessage msg = sessionSend.createMessage(true);
+
+                  // NOTE(review): 10024 may be a typo for 10 * 1024; kept as in the original
+                  int size = RandomUtil.randomPositiveInt() % 10024;
+
+                  if (size == 0)
+                  {
+                     size = 10 * 1024;
+                  }
+
+                  byte[] buffer = new byte[size];
+
+                  random.nextBytes(buffer);
+
+                  msg.getBodyBuffer().writeBytes(buffer);
+
+                  prod.send(msg);
+
+                  if (i % 5000 == 0)
+                  {
+                     prod2.send(msg);
+                     System.out.println("Sending slow message");
+                  }
+               }
+            }
+            finally
+            {
+               // Never leave the receiver thread running behind a failed send
+               tReceive.join();
+            }
+
+            if (!errors.isEmpty())
+            {
+               throw errors.get(0);
+            }
+         }
+         finally
+         {
+            sessionReceive.close();
+         }
+      }
+      finally
+      {
+         sessionSend.close();
+      }
+   }
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
More information about the hornetq-commits
mailing list