[hornetq-commits] JBoss hornetq SVN: r9300 - in trunk: tests/src/org/hornetq/tests/integration/journal and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Sat Jun 5 03:02:37 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-06-05 03:02:37 -0400 (Sat, 05 Jun 2010)
New Revision: 9300
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/JournalRestartStressTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-407 improving journal shutdown on compacting and avoid a rare test failure on JournalRestartStressTest
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-06-03 22:15:48 UTC (rev 9299)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-06-05 07:02:37 UTC (rev 9300)
@@ -2277,6 +2277,11 @@
// compacting is disabled
return;
}
+
+ if (state != JournalImpl.STATE_LOADED)
+ {
+ return;
+ }
JournalFile[] dataFiles = getDataFiles();
@@ -2536,6 +2541,16 @@
try
{
+
+ state = JournalImpl.STATE_STOPPED;
+
+ compactorExecutor.shutdown();
+
+ if (!compactorExecutor.awaitTermination(120, TimeUnit.SECONDS))
+ {
+ JournalImpl.log.warn("Couldn't stop compactor executor after 120 seconds");
+ }
+
filesExecutor.shutdown();
if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
@@ -2564,8 +2579,6 @@
freeFiles.clear();
openedFiles.clear();
-
- state = JournalImpl.STATE_STOPPED;
}
finally
{
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-06-03 22:15:48 UTC (rev 9299)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-06-05 07:02:37 UTC (rev 9300)
@@ -23,6 +23,8 @@
import junit.framework.Assert;
import org.hornetq.api.core.Pair;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.AbstractJournalUpdateTask;
@@ -181,7 +183,58 @@
{
internalCompactTest(false, false, true, true, false, false, false, false, false, false, true, true, true);
}
+
+ public void testCompactFirstFileReclaimed() throws Exception
+ {
+ setup(2, 60 * 1024, false);
+
+ final byte recordType = (byte)0;
+
+ journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO);
+
+ journal.start();
+
+ journal.loadInternalOnly();
+
+ journal.appendAddRecord(1, recordType, "test".getBytes(), true);
+
+ journal.forceMoveNextFile();
+
+
+ journal.appendUpdateRecord(1, recordType, "update".getBytes(), true);
+
+ journal.appendDeleteRecord(1, true);
+
+ journal.appendAddRecord(2, recordType, "finalRecord".getBytes(), true);
+
+
+ for (int i = 10 ; i < 100; i++)
+ {
+ journal.appendAddRecord(i, recordType, ("tst" + i).getBytes(), true);
+ journal.forceMoveNextFile();
+ journal.appendUpdateRecord(i, recordType, ("uptst" + i).getBytes(), true);
+ journal.appendDeleteRecord(i, true);
+ }
+
+ journal.compact();
+
+ journal.stop();
+
+ List<RecordInfo> records = new ArrayList<RecordInfo>();
+
+ List<PreparedTransactionInfo> preparedRecords = new ArrayList<PreparedTransactionInfo>();
+
+ journal.start();
+
+ journal.load(records, preparedRecords, null);
+
+ assertEquals(1, records.size());
+
+
+
+ }
+
private void internalCompactTest(final boolean preXA, // prepare before compact
final boolean postXA, // prepare after compact
final boolean regularAdd,
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/JournalRestartStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalRestartStressTest.java 2010-06-03 22:15:48 UTC (rev 9299)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalRestartStressTest.java 2010-06-05 07:02:37 UTC (rev 9300)
@@ -57,18 +57,16 @@
server2.getConfiguration().setJournalCompactMinFiles(3);
server2.getConfiguration().setJournalCompactPercentage(50);
-
- for (int i = 0 ; i < 10; i++)
+ for (int i = 0; i < 10; i++)
{
server2.start();
-
+
ClientSessionFactory sf = createFactory(false);
sf.setMinLargeMessageSize(1024 * 1024);
sf.setBlockOnDurableSend(false);
-
ClientSession session = sf.createSession(true, true);
-
+
try
{
session.createQueue("slow-queue", "slow-queue");
@@ -79,8 +77,7 @@
session.start();
ClientConsumer consumer = session.createConsumer("slow-queue");
-
-
+
while (true)
{
System.out.println("Received message from previous");
@@ -91,15 +88,16 @@
}
msg.acknowledge();
}
-
-
-
+
+ session.close();
+
produceMessages(sf, 30000);
-
+
server2.stop();
}
}
+
// Package protected ---------------------------------------------
/**
@@ -110,19 +108,17 @@
* @throws Throwable
*/
private void produceMessages(final ClientSessionFactory sf, final int NMSGS) throws HornetQException,
- InterruptedException,
- Throwable
+ InterruptedException,
+ Throwable
{
-
+
final int TIMEOUT = 5000;
-
+
System.out.println("sending " + NMSGS + " messages");
+ final ClientSession sessionSend = sf.createSession(true, true);
- final ClientSession sessionSend = sf.createSession(true, true);
-
ClientProducer prod2 = sessionSend.createProducer("slow-queue");
-
try
{
@@ -139,6 +135,7 @@
Thread tReceive = new Thread()
{
+ @Override
public void run()
{
try
@@ -149,8 +146,8 @@
{
if (i % 500 == 0)
{
- double percent = (double)i / (double) NMSGS;
- System.out.println("msgs " + i + " of " + NMSGS + ", " + (int)(percent * 100) + "%");
+ double percent = (double)i / (double)NMSGS;
+ System.out.println("msgs " + i + " of " + NMSGS + ", " + (int)(percent * 100) + "%");
Thread.sleep(100);
}
@@ -179,11 +176,14 @@
for (int i = 0; i < NMSGS; i++)
{
ClientMessage msg = sessionSend.createMessage(true);
-
+
int size = RandomUtil.randomPositiveInt() % 10024;
- if (size == 0) size = 10 * 1024;
-
+ if (size == 0)
+ {
+ size = 10 * 1024;
+ }
+
byte[] buffer = new byte[size];
random.nextBytes(buffer);
@@ -191,7 +191,7 @@
msg.getBodyBuffer().writeBytes(buffer);
prod.send(msg);
-
+
if (i % 5000 == 0)
{
prod2.send(msg);
@@ -203,6 +203,7 @@
sessionReceive.close();
sessionSend.close();
+ sf.close();
for (Throwable e : errors)
{
More information about the hornetq-commits
mailing list