Author: clebert.suconic(a)jboss.com
Date: 2011-03-11 16:33:07 -0500 (Fri, 11 Mar 2011)
New Revision: 10322
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
Log:
Adding new test
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2011-03-11 21:32:40 UTC (rev 10321)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2011-03-11 21:33:07 UTC (rev 10322)
@@ -1610,6 +1610,7 @@
if (dataFilesToProcess.size() == 0)
{
+ trace("Finishing compacting, nothing to process");
return;
}
@@ -1740,7 +1741,7 @@
if (JournalImpl.trace)
{
- JournalImpl.log.debug("Finished compacting on journal");
+ trace("Finished compacting on journal");
}
if (JournalImpl.TRACE_RECORDS)
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2011-03-11 21:32:40 UTC (rev 10321)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2011-03-11 21:33:07 UTC (rev 10322)
@@ -17,13 +17,21 @@
import java.io.FilenameFilter;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import junit.framework.Assert;
import org.hornetq.api.core.Pair;
-import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFile;
@@ -36,9 +44,13 @@
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase;
import org.hornetq.tests.unit.core.journal.impl.fakes.SimpleEncoding;
import org.hornetq.utils.IDGenerator;
+import org.hornetq.utils.OrderedExecutorFactory;
import org.hornetq.utils.SimpleIDGenerator;
/**
@@ -238,41 +250,41 @@
setup(2, 60 * 1024, false);
createJournal();
-
+
startJournal();
-
+
load();
startCompact();
-
+
addTx(1, 2);
-
+
prepare(1, new SimpleEncoding(10, (byte)0));
-
+
finishCompact();
-
+
stopJournal();
-
+
createJournal();
-
+
startJournal();
-
+
loadAndCheck();
-
+
startCompact();
-
+
commit(1);
-
+
finishCompact();
-
+
journal.compact();
-
+
stopJournal();
-
+
createJournal();
startJournal();
-
+
loadAndCheck();
}
@@ -281,37 +293,37 @@
setup(2, 60 * 1024, false);
createJournal();
-
+
startJournal();
-
+
load();
addTx(1, 2);
-
+
prepare(1, new SimpleEncoding(10, (byte)0));
-
+
stopJournal();
-
+
createJournal();
-
+
startJournal();
-
+
loadAndCheck();
-
+
startCompact();
-
+
commit(1);
-
+
finishCompact();
-
+
journal.compact();
-
+
stopJournal();
-
+
createJournal();
startJournal();
-
+
loadAndCheck();
}
@@ -320,29 +332,29 @@
setup(2, 60 * 1024, false);
createJournal();
-
+
startJournal();
-
+
load();
addTx(1, 2, 3);
-
+
prepare(1, new SimpleEncoding(10, (byte)0));
-
+
startCompact();
-
+
commit(1);
-
+
finishCompact();
-
+
journal.compact();
-
+
stopJournal();
-
+
createJournal();
startJournal();
-
+
loadAndCheck();
}
@@ -1446,7 +1458,7 @@
}
long tx1 = idGenerator.generateID();
-
+
journal.forceMoveNextFile();
ArrayList<Long> listToDelete = new ArrayList<Long>();
@@ -1469,7 +1481,7 @@
startCompact();
System.out.println("Committing TX " + tx1);
rollback(tx0);
- for (int i = 0 ; i < 10; i++)
+ for (int i = 0; i < 10; i++)
{
addTx(tx1, ids[i]);
}
@@ -1506,7 +1518,7 @@
}
long tx1 = idGenerator.generateID();
-
+
journal.forceMoveNextFile();
ArrayList<Long> listToDelete = new ArrayList<Long>();
@@ -1529,7 +1541,7 @@
startCompact();
System.out.println("Committing TX " + tx1);
rollback(tx0);
- for (int i = 0 ; i < 10; i++)
+ for (int i = 0; i < 10; i++)
{
addTx(tx1, ids[i]);
}
@@ -1564,11 +1576,11 @@
ids[i] = idGenerator.generateID();
addTx(tx0, ids[i]);
}
-
+
commit(tx0);
startCompact();
- for (int i = 0 ; i < 10; i++)
+ for (int i = 0; i < 10; i++)
{
delete(ids[i]);
}
@@ -1591,22 +1603,22 @@
long tx0 = idGenerator.generateID();
add(idGenerator.generateID());
- long ids[] = new long[]{idGenerator.generateID(), idGenerator.generateID()};
+ long ids[] = new long[] { idGenerator.generateID(), idGenerator.generateID() };
addTx(tx0, ids[0]);
addTx(tx0, ids[1]);
-
+
journal.forceMoveNextFile();
-
+
commit(tx0);
-
+
journal.forceMoveNextFile();
-
+
delete(ids[0]);
delete(ids[1]);
-
+
journal.forceMoveNextFile();
-
+
journal.compact();
stopJournal();
@@ -1700,6 +1712,161 @@
}
+ /**
+  * Stresses the message journal with concurrent transactional stores, asynchronous deletes
+  * and repeated compacting while journal syncing is switched off in the configuration.
+  * <p>
+  * A producer thread repeatedly stores 100 messages under one transaction plus one
+  * non-transactional message that is never deleted, then commits. When the operation
+  * context of that batch completes, the 100 transactional messages are deleted on a
+  * separate executor. A second thread calls compact() and checkReclaimStatus() on the
+  * message journal every 500 ms. After 10 seconds both threads are stopped and both
+  * executors are drained.
+  * <p>
+  * The test fails if an executor does not terminate in time or if any background thread,
+  * delete task or I/O callback reported an error.
+  *
+  * @throws Exception if the storage manager cannot be started, loaded or stopped, or if the
+  *                   test thread is interrupted while waiting
+  */
+ public void testStressDeletesNoSync() throws Exception
+ {
+    Configuration config = createBasicConfig();
+    config.setJournalFileSize(100 * 1024);
+    System.out.println(config.getJournalDirectory());
+    config.setJournalSyncNonTransactional(false);
+    config.setJournalSyncTransactional(false);
+    // NOTE(review): zero thresholds presumably keep the journal from deciding on its own
+    // when to compact, leaving that to the compactor thread below -- confirm.
+    config.setJournalCompactMinFiles(0);
+    config.setJournalCompactPercentage(0);
+
+    // Incremented by every background failure; asserted to be zero at the end of the test.
+    final AtomicInteger errors = new AtomicInteger(0);
+
+    final AtomicBoolean running = new AtomicBoolean(true);
+
+    // Single source for both transaction ids and message ids.
+    final AtomicLong seqGenerator = new AtomicLong(1);
+
+    final ExecutorService executor = Executors.newCachedThreadPool();
+
+    OrderedExecutorFactory factory = new OrderedExecutorFactory(executor);
+
+    final ExecutorService deleteExecutor = Executors.newCachedThreadPool();
+
+    final JournalStorageManager storage = new JournalStorageManager(config, factory);
+
+    // Only ever touched by the producer thread.
+    // NOTE(review): the ids are collected but not verified anywhere in this test.
+    final LinkedList<Long> survivingMsgs = new LinkedList<Long>();
+
+    Runnable producerRunnable = new Runnable()
+    {
+       public void run()
+       {
+          try
+          {
+             while (running.get())
+             {
+                final long[] values = new long[100];
+                long tx = seqGenerator.incrementAndGet();
+
+                OperationContextImpl ctx = new OperationContextImpl(executor);
+                storage.setContext(ctx);
+
+                for (int i = 0; i < 100; i++)
+                {
+                   long id = seqGenerator.incrementAndGet();
+                   values[i] = id;
+
+                   ServerMessageImpl message = new ServerMessageImpl(id, 100);
+
+                   message.getBodyBuffer().writeBytes(new byte[1024]);
+
+                   storage.storeMessageTransactional(tx, message);
+                }
+
+                ServerMessageImpl survivor = new ServerMessageImpl(seqGenerator.incrementAndGet(), 100);
+
+                survivingMsgs.add(survivor.getMessageID());
+
+                // This one will stay here forever
+                storage.storeMessage(survivor);
+
+                storage.commit(tx);
+
+                ctx.executeOnCompletion(new IOAsyncTask()
+                {
+                   public void onError(int errorCode, String errorMessage)
+                   {
+                      // An I/O failure must fail the test instead of being dropped.
+                      System.err.println("I/O error " + errorCode + ": " + errorMessage);
+                      errors.incrementAndGet();
+                   }
+
+                   public void done()
+                   {
+                      // Delete the batch on a different executor than the one running
+                      // this callback.
+                      deleteExecutor.execute(new Runnable()
+                      {
+                         public void run()
+                         {
+                            try
+                            {
+                               for (long messageID : values)
+                               {
+                                  storage.deleteMessage(messageID);
+                               }
+                            }
+                            catch (Exception e)
+                            {
+                               e.printStackTrace();
+                               errors.incrementAndGet();
+                            }
+                         }
+                      });
+                   }
+                });
+             }
+          }
+          catch (Throwable e)
+          {
+             e.printStackTrace();
+             errors.incrementAndGet();
+          }
+       }
+    };
+
+    Runnable compressRunnable = new Runnable()
+    {
+       public void run()
+       {
+          try
+          {
+             while (running.get())
+             {
+                Thread.sleep(500);
+                System.out.println("Compacting");
+                ((JournalImpl)storage.getMessageJournal()).compact();
+                ((JournalImpl)storage.getMessageJournal()).checkReclaimStatus();
+             }
+          }
+          catch (Throwable e)
+          {
+             e.printStackTrace();
+             errors.incrementAndGet();
+          }
+       }
+    };
+
+    storage.start();
+
+    try
+    {
+       storage.loadInternalOnly();
+
+       ((JournalImpl)storage.getMessageJournal()).setAutoReclaim(false);
+
+       Thread producerThread = new Thread(producerRunnable);
+       producerThread.start();
+
+       Thread compactorThread = new Thread(compressRunnable);
+       compactorThread.start();
+
+       Thread.sleep(10000);
+
+       running.set(false);
+
+       producerThread.join();
+
+       compactorThread.join();
+
+       // Drain the completion callbacks first: they are what submits work to
+       // deleteExecutor, so that executor has to stay open until they have all run.
+       executor.shutdown();
+
+       assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
+
+       deleteExecutor.shutdown();
+
+       assertTrue(deleteExecutor.awaitTermination(30, TimeUnit.SECONDS));
+
+       Assert.assertEquals("Errors reported by background threads", 0, errors.get());
+    }
+    finally
+    {
+       // Stop the workers and release the storage manager even when the test fails.
+       running.set(false);
+
+       storage.stop();
+    }
+ }
+
@Override
protected void setUp() throws Exception
{