Author: clebert.suconic(a)jboss.com
Date: 2009-09-09 23:14:38 -0400 (Wed, 09 Sep 2009)
New Revision: 7948
Added:
trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java
Modified:
trunk/src/main/org/hornetq/core/asyncio/impl/TimedBuffer.java
trunk/src/main/org/hornetq/core/journal/SequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTestBase.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageTestBase.java
trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
HORNETQ-124 - Fixing races on compacting
Modified: trunk/src/main/org/hornetq/core/asyncio/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/asyncio/impl/TimedBuffer.java 2009-09-09 16:37:43 UTC
(rev 7947)
+++ trunk/src/main/org/hornetq/core/asyncio/impl/TimedBuffer.java 2009-09-10 03:14:38 UTC
(rev 7948)
@@ -179,12 +179,12 @@
this.bufferObserver = observer;
}
- public void lock()
+ public void disableAutoFlush()
{
lock.lock();
}
- public void unlock()
+ public void enableAutoFlush()
{
lock.unlock();
}
Modified: trunk/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/SequentialFile.java 2009-09-09 16:37:43 UTC
(rev 7947)
+++ trunk/src/main/org/hornetq/core/journal/SequentialFile.java 2009-09-10 03:14:38 UTC
(rev 7948)
@@ -81,8 +81,8 @@
void renameTo(String newFileName) throws Exception;
- void lockBuffer();
+ void disableAutoFlush();
- void unlockBuffer();
+ void enableAutoFlush();
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-09-09
16:37:43 UTC (rev 7947)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-09-10
03:14:38 UTC (rev 7948)
@@ -127,14 +127,14 @@
return timedBuffer.checkSize(size);
}
- public void lockBuffer()
+ public void disableAutoFlush()
{
- timedBuffer.lock();
+ timedBuffer.disableAutoFlush();
}
- public void unlockBuffer()
+ public void enableAutoFlush()
{
- timedBuffer.unlock();
+ timedBuffer.enableAutoFlush();
}
public synchronized void close() throws Exception
Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-09-09
16:37:43 UTC (rev 7947)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-09-10
03:14:38 UTC (rev 7948)
@@ -91,14 +91,14 @@
public void activate(SequentialFile file)
{
final AIOSequentialFile sequentialFile = (AIOSequentialFile)file;
- timedBuffer.lock();
+ timedBuffer.disableAutoFlush();
try
{
sequentialFile.setTimedBuffer(timedBuffer);
}
finally
{
- timedBuffer.unlock();
+ timedBuffer.enableAutoFlush();
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2009-09-09 16:37:43
UTC (rev 7947)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2009-09-10 03:14:38
UTC (rev 7948)
@@ -548,9 +548,8 @@
ByteBuffer bufferWrite = fileFactory.newBuffer(journal.getFileSize());
writingChannel = ChannelBuffers.wrappedBuffer(bufferWrite);
- currentFile = journal.getFile(false, false, false);
+ currentFile = journal.getFile(false, false, false, true);
sequentialFile = currentFile.getFile();
- sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
sequentialFile.open(1);
fileID = nextOrderingID++;
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-09-09 16:37:43 UTC
(rev 7947)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-09-10 03:14:38 UTC
(rev 7948)
@@ -202,7 +202,8 @@
private ExecutorService filesExecutor = null;
- private final Semaphore lock = new Semaphore(1);
+ // Lock used during the append of records
+ private final Semaphore lockAppend = new Semaphore(1);
/** We don't lock the journal while compacting, however we need to lock it while
taking and updating snapshots */
private final ReadWriteLock compactingLock = new ReentrantReadWriteLock();
@@ -835,7 +836,7 @@
callback = getSyncCallback(sync);
- lock.acquire();
+ lockAppend.acquire();
try
{
JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
@@ -844,7 +845,7 @@
}
finally
{
- lock.release();
+ lockAppend.release();
}
}
finally
@@ -895,7 +896,7 @@
callback = getSyncCallback(sync);
- lock.acquire();
+ lockAppend.acquire();
try
{
JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
@@ -913,7 +914,7 @@
}
finally
{
- lock.release();
+ lockAppend.release();
}
}
finally
@@ -962,7 +963,7 @@
callback = getSyncCallback(sync);
- lock.acquire();
+ lockAppend.acquire();
try
{
JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
@@ -981,7 +982,7 @@
}
finally
{
- lock.release();
+ lockAppend.release();
}
}
finally
@@ -1024,7 +1025,7 @@
JournalTransaction tx = getTransactionInfo(txID);
- lock.acquire();
+ lockAppend.acquire();
try
{
JournalFile usedFile = appendRecord(bb, false, false, tx, null);
@@ -1033,7 +1034,7 @@
}
finally
{
- lock.release();
+ lockAppend.release();
}
}
finally
@@ -1073,7 +1074,7 @@
JournalTransaction tx = getTransactionInfo(txID);
- lock.acquire();
+ lockAppend.acquire();
try
{
JournalFile usedFile = appendRecord(bb, false, false, tx, null);
@@ -1082,7 +1083,7 @@
}
finally
{
- lock.release();
+ lockAppend.release();
}
}
finally
@@ -1115,7 +1116,7 @@
JournalTransaction tx = getTransactionInfo(txID);
- lock.acquire();
+ lockAppend.acquire();
try
{
JournalFile usedFile = appendRecord(bb, false, false, tx, null);
@@ -1124,7 +1125,7 @@
}
finally
{
- lock.release();
+ lockAppend.release();
}
}
finally
@@ -1158,6 +1159,8 @@
throw new IllegalStateException("Journal must be loaded first");
}
+ compactingLock.readLock().lock();
+
JournalTransaction tx = getTransactionInfo(txID);
if (sync)
@@ -1165,8 +1168,6 @@
tx.syncPreviousFiles(fileFactory.isSupportsCallbacks(), currentFile);
}
- compactingLock.readLock().lock();
-
try
{
@@ -1175,7 +1176,7 @@
writeTransaction(-1, PREPARE_RECORD, txID, tx, transactionData, size, -1, bb);
- lock.acquire();
+ lockAppend.acquire();
try
{
JournalFile usedFile = appendRecord(bb, true, sync, tx, null);
@@ -1184,7 +1185,7 @@
}
finally
{
- lock.release();
+ lockAppend.release();
}
}
@@ -1221,10 +1222,10 @@
throw new IllegalStateException("Journal must be loaded first");
}
+ compactingLock.readLock().lock();
+
JournalTransaction tx = transactions.remove(txID);
- compactingLock.readLock().lock();
-
try
{
@@ -1237,7 +1238,7 @@
writeTransaction(-1, COMMIT_RECORD, txID, tx, null,
SIZE_COMPLETE_TRANSACTION_RECORD, -1, bb);
- lock.acquire();
+ lockAppend.acquire();
try
{
JournalFile usedFile = appendRecord(bb, true, sync, tx, null);
@@ -1246,7 +1247,7 @@
}
finally
{
- lock.release();
+ lockAppend.release();
}
}
@@ -1291,7 +1292,7 @@
bb.writeLong(txID);
bb.writeInt(size);
- lock.acquire();
+ lockAppend.acquire();
try
{
JournalFile usedFile = appendRecord(bb, false, sync, tx, null);
@@ -1300,7 +1301,7 @@
}
finally
{
- lock.release();
+ lockAppend.release();
}
}
@@ -1406,6 +1407,8 @@
try
{
+ log.info("Starting compacting operation on journal");
+
// We need to guarantee that the journal is frozen for this short time
// We don't freeze the journal as we compact, only for the short time where
we replace records
compactingLock.writeLock().lock();
@@ -1459,10 +1462,13 @@
Collections.sort(dataFilesToProcess, new JournalFileComparator());
+ // This is where most of the work is done, taking most of the time of the
compacting routine.
+ // Notice there are no locks while this is being done.
+
// Read the files, and use the JournalCompactor class to create the new
outputFiles, and the new collections as
// well
for (final JournalFile file : dataFilesToProcess)
- {
+ {
readJournalFile(fileFactory, file, compactor);
}
@@ -1518,7 +1524,7 @@
{
if (trace)
{
- trace("Merging pending transaction " + newTransaction +
" after compacting to the journal");
+ trace("Merging pending transaction " + newTransaction +
" after compacting the journal");
}
JournalTransaction liveTransaction =
transactions.get(newTransaction.getId());
if (liveTransaction == null)
@@ -1541,6 +1547,8 @@
renameFiles(dataFilesToProcess, newDatafiles);
deleteControlFile(controlFile);
+ log.info("Finished compacting on journal");
+
}
finally
{
@@ -1929,7 +1937,7 @@
for (int i = 0; i < filesToCreate; i++)
{
// Keeping all files opened can be very costly (mainly on AIO)
- freeFiles.add(createFile(false, false, true));
+ freeFiles.add(createFile(false, false, true, false));
}
}
@@ -2234,19 +2242,27 @@
// In some tests we need to force the journal to move to a next file
public void forceMoveNextFile() throws Exception
{
- lock.acquire();
+ compactingLock.readLock().lock();
try
{
- moveNextFile(true);
- if (autoReclaim)
+ lockAppend.acquire();
+ try
{
- checkAndReclaimFiles();
+ moveNextFile(true);
+ if (autoReclaim)
+ {
+ checkAndReclaimFiles();
+ }
+ debugWait();
}
- debugWait();
+ finally
+ {
+ lockAppend.release();
+ }
}
finally
{
- lock.release();
+ compactingLock.readLock().unlock();
}
}
@@ -2286,7 +2302,7 @@
throw new IllegalStateException("Journal is already stopped");
}
- lock.acquire();
+ lockAppend.acquire();
try
{
@@ -2321,7 +2337,7 @@
}
finally
{
- lock.release();
+ lockAppend.release();
}
}
@@ -2501,9 +2517,6 @@
return recordSize;
}
- /**
- * This method requires bufferControl disabled, or the reads are going to be invalid
- * */
private List<JournalFile> orderFiles() throws Exception
{
@@ -2522,7 +2535,7 @@
file.read(bb);
int fileID = bb.getInt();
-
+
fileFactory.releaseBuffer(bb);
bb = null;
@@ -2532,6 +2545,16 @@
nextFileID.set(fileID);
}
+ int fileNameID = getFileNameID(fileName);
+
+ // The compactor could create a fileName but use a previously assigned ID.
+ // Because of that we need to take both parts into account
+ if (nextFileID.get() < fileNameID)
+ {
+ nextFileID.set(fileNameID);
+ }
+
+
orderedFiles.add(new JournalFileImpl(file, fileID));
file.close();
@@ -2571,14 +2594,14 @@
throw new IllegalArgumentException("Record is too large to store "
+ size);
}
- // The buffer on the file can't be flushed or the currentFile could be
affected
- currentFile.getFile().lockBuffer();
+ // Disable auto flush on the timer. The Timer shouldn't flush anything
+ currentFile.getFile().disableAutoFlush();
if (!currentFile.getFile().fits(size))
{
- currentFile.getFile().unlockBuffer();
+ currentFile.getFile().enableAutoFlush();
moveNextFile(false);
- currentFile.getFile().lockBuffer();
+ currentFile.getFile().disableAutoFlush();
// The same check needs to be done at the new file also
if (!currentFile.getFile().fits(size))
@@ -2642,10 +2665,24 @@
}
finally
{
- currentFile.getFile().unlockBuffer();
+ currentFile.getFile().enableAutoFlush();
}
}
+
+ /** Get the ID part of the name */
+ private int getFileNameID(String fileName)
+ {
+ try
+ {
+ return Integer.parseInt(fileName.substring(filePrefix.length()+1,
fileName.indexOf('.')));
+ }
+ catch (Throwable e)
+ {
+ log.warn("Impossible to get the ID part of the file name " + fileName,
e);
+ return 0;
+ }
+ }
/**
* This method will create a new file on the file system, pre-fill it with
FILL_CHARACTER
@@ -2653,12 +2690,24 @@
* @return
* @throws Exception
*/
- private JournalFile createFile(final boolean keepOpened, final boolean multiAIO, final
boolean fill) throws Exception
+ private JournalFile createFile(final boolean keepOpened,
+ final boolean multiAIO,
+ final boolean fill,
+ final boolean tmpCompact) throws Exception
{
int fileID = generateFileID();
- String fileName = filePrefix + "-" + fileID + "." +
fileExtension;
+ String fileName;
+ if (tmpCompact)
+ {
+ fileName = filePrefix + "-" + fileID + "." + fileExtension +
".cmp";
+ }
+ else
+ {
+ fileName = filePrefix + "-" + fileID + "." + fileExtension;
+ }
+
if (trace)
{
trace("Creating file " + fileName);
@@ -2820,7 +2869,7 @@
* */
private void pushOpenedFile() throws Exception
{
- JournalFile nextOpenedFile = getFile(true, true, true);
+ JournalFile nextOpenedFile = getFile(true, true, true, false);
openedFiles.offer(nextOpenedFile);
}
@@ -2829,12 +2878,20 @@
* @return
* @throws Exception
*/
- JournalFile getFile(final boolean keepOpened, final boolean multiAIO, final boolean
fill) throws Exception
+ JournalFile getFile(final boolean keepOpened,
+ final boolean multiAIO,
+ final boolean fill,
+ final boolean tmpCompactExtension) throws Exception
{
JournalFile nextOpenedFile = null;
try
{
nextOpenedFile = freeFiles.remove();
+ if (tmpCompactExtension)
+ {
+ SequentialFile sequentialFile = nextOpenedFile.getFile();
+ sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
+ }
}
catch (NoSuchElementException ignored)
{
@@ -2842,7 +2899,7 @@
if (nextOpenedFile == null)
{
- nextOpenedFile = createFile(keepOpened, multiAIO, fill);
+ nextOpenedFile = createFile(keepOpened, multiAIO, fill, tmpCompactExtension);
}
else
{
@@ -2951,7 +3008,7 @@
{
for (String dataFile : dataFiles)
{
- SequentialFile file = fileFactory.createSequentialFile(dataFile, 1);
+ SequentialFile file = fileFactory.createSequentialFile(dataFile, 1);
if (file.exists())
{
file.delete();
@@ -2960,7 +3017,7 @@
for (String newFile : newFiles)
{
- SequentialFile file = fileFactory.createSequentialFile(newFile, 1);
+ SequentialFile file = fileFactory.createSequentialFile(newFile, 1);
if (file.exists())
{
final String originalName = file.getFileName();
@@ -3179,7 +3236,7 @@
{
try
{
- lock.acquire();
+ lockAppend.acquire();
HornetQBuffer bb = newBuffer(128 * 1024);
@@ -3188,7 +3245,7 @@
appendRecord(bb, false, false, null, null);
}
- lock.release();
+ lockAppend.release();
}
catch (Exception e)
{
Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-09-09
16:37:43 UTC (rev 7947)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-09-10
03:14:38 UTC (rev 7948)
@@ -292,14 +292,14 @@
/* (non-Javadoc)
* @see org.hornetq.core.journal.SequentialFile#lockBuffer()
*/
- public void lockBuffer()
+ public void disableAutoFlush()
{
}
/* (non-Javadoc)
* @see org.hornetq.core.journal.SequentialFile#unlockBuffer()
*/
- public void unlockBuffer()
+ public void enableAutoFlush()
{
}
Modified:
trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTestBase.java
===================================================================
---
trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTestBase.java 2009-09-09
16:37:43 UTC (rev 7947)
+++
trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTestBase.java 2009-09-10
03:14:38 UTC (rev 7948)
@@ -47,7 +47,7 @@
*
* $Id: MessageImplTestBase.java 2883 2007-07-12 23:36:16Z timfox $
*/
-public class MessageHeaderTestBase extends HornetQServerTestCase
+public abstract class MessageHeaderTestBase extends HornetQServerTestCase
{
// Constants -----------------------------------------------------
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageTestBase.java
===================================================================
---
trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageTestBase.java 2009-09-09
16:37:43 UTC (rev 7947)
+++
trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageTestBase.java 2009-09-10
03:14:38 UTC (rev 7948)
@@ -29,7 +29,7 @@
*
* $Id$
*/
-public class MessageTestBase extends HornetQServerTestCase
+public abstract class MessageTestBase extends HornetQServerTestCase
{
// Constants -----------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java 2009-09-09
16:37:43 UTC (rev 7947)
+++ trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java 2009-09-10
03:14:38 UTC (rev 7948)
@@ -47,10 +47,16 @@
private static final String AD2 = "ad2";
+ private static final String AD3 = "ad3";
+
private static final String Q1 = "q1";
private static final String Q2 = "q2";
+ private static final String Q3 = "q3";
+
+ private static final int TOT_AD3 = 5000;
+
private HornetQServer server;
private ClientSessionFactory sf;
@@ -76,6 +82,36 @@
setupServer(journalType);
+ ClientSession session = sf.createSession(false, false);
+
+ try
+ {
+ ClientProducer producer = session.createProducer(AD3);
+
+ byte[] buffer = new byte[10 * 1024];
+
+ ClientMessage msg = session.createClientMessage(true);
+ msg.setBody(ChannelBuffers.wrappedBuffer(buffer));
+ for (int i = 0; i < TOT_AD3; i++)
+ {
+ producer.send(msg);
+ if (i % 100 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+ }
+ finally
+ {
+ session.close();
+ }
+
+ server.stop();
+
+ setupServer(journalType);
+
final AtomicInteger numberOfMessages = new AtomicInteger(0);
final int NUMBER_OF_FAST_MESSAGES = 100000;
final int SLOW_INTERVAL = 100;
@@ -224,7 +260,7 @@
try
{
-
+
sess = sf.createSession(true, true);
ClientConsumer cons = sess.createConsumer(Q1);
@@ -246,6 +282,19 @@
assertNull(cons.receive(100));
+ cons.close();
+
+ cons = sess.createConsumer(Q3);
+
+ for (int i = 0; i < TOT_AD3; i++)
+ {
+ ClientMessage msg = cons.receive(60000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ }
+
+ assertNull(cons.receiveImmediate());
+
}
finally
{
@@ -305,6 +354,14 @@
{
}
+ try
+ {
+ sess.createQueue(AD3, Q3, true);
+ }
+ catch (Exception ignored)
+ {
+ }
+
sess.close();
sf = createInVMFactory();
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java 2009-09-09
16:37:43 UTC (rev 7947)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java 2009-09-10
03:14:38 UTC (rev 7948)
@@ -13,9 +13,30 @@
package org.hornetq.tests.integration.jms.client;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_THREAD_POOL_MAX_SIZE;
+import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_USE_GLOBAL_POOLS;
+
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.List;
+import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.MapMessage;
import javax.jms.Message;
@@ -26,7 +47,11 @@
import javax.jms.Session;
import javax.jms.TextMessage;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.jms.client.HornetQMessage;
import org.hornetq.tests.util.JMSTestBase;
+import org.hornetq.utils.Pair;
/**
* Receive Messages and resend them, like the bridge would do
@@ -62,6 +87,11 @@
for (int i = 0; i < 10; i++)
{
+ BytesMessage bm = sess.createBytesMessage();
+ bm.setObjectProperty(HornetQMessage.JMS_HORNETQ_INPUT_STREAM,
+ createFakeLargeStream(2 *
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE));
+ msgs.add(bm);
+
MapMessage mm = sess.createMapMessage();
mm.setBoolean("boolean", true);
mm.setByte("byte", (byte)3);
@@ -125,8 +155,17 @@
sess.commit();
- if (copiedMessage instanceof MapMessage)
+ if (copiedMessage instanceof BytesMessage)
{
+ BytesMessage copiedBytes = (BytesMessage)copiedMessage;
+
+ for (int i = 0; i < copiedBytes.getBodyLength(); i++)
+ {
+ assertEquals(getSamplebyte(i), copiedBytes.readByte());
+ }
+ }
+ else if (copiedMessage instanceof MapMessage)
+ {
MapMessage copiedMap = (MapMessage)copiedMessage;
MapMessage originalMap = (MapMessage)originalMessage;
assertEquals(originalMap.getString("str"),
copiedMap.getString("str"));
@@ -209,7 +248,46 @@
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
+ protected void createCF(List<Pair<TransportConfiguration,
TransportConfiguration>> connectorConfigs,
+ List<String> jndiBindings) throws Exception
+ {
+ int retryInterval = 1000;
+ double retryIntervalMultiplier = 1.0;
+ int reconnectAttempts = -1;
+ boolean failoverOnServerShutdown = true;
+ int callTimeout = 30000;
+
jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest",
+ connectorConfigs,
+ null,
+ DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ DEFAULT_CONNECTION_TTL,
+ callTimeout,
+ DEFAULT_MAX_CONNECTIONS,
+ true,
+ DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ DEFAULT_CONSUMER_WINDOW_SIZE,
+ DEFAULT_CONSUMER_MAX_RATE,
+ DEFAULT_PRODUCER_WINDOW_SIZE,
+ DEFAULT_PRODUCER_MAX_RATE,
+ DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+ DEFAULT_BLOCK_ON_PERSISTENT_SEND,
+ DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+ DEFAULT_AUTO_GROUP,
+ DEFAULT_PRE_ACKNOWLEDGE,
+
DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
+ DEFAULT_ACK_BATCH_SIZE,
+ DEFAULT_ACK_BATCH_SIZE,
+ DEFAULT_USE_GLOBAL_POOLS,
+ DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+ DEFAULT_THREAD_POOL_MAX_SIZE,
+ retryInterval,
+ retryIntervalMultiplier,
+ reconnectAttempts,
+ failoverOnServerShutdown,
+ jndiBindings);
+ }
+
@Override
protected void setUp() throws Exception
{
Added:
trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java 2009-09-10
03:14:38 UTC (rev 7948)
@@ -0,0 +1,493 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.journal;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * A MultiThreadConsumerStressTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class MultiThreadCompactorTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ final SimpleString ADDRESS = new SimpleString("SomeAddress");
+
+ final SimpleString QUEUE = new SimpleString("SomeQueue");
+
+ private HornetQServer server;
+
+ private ClientSessionFactory sf;
+
+ protected int getNumberOfIterations()
+ {
+ return 3;
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ stopServer();
+ super.tearDown();
+ }
+
+ public void testMultiThreadCompact() throws Throwable
+ {
+ setupServer(JournalType.ASYNCIO);
+ for (int i = 0; i < getNumberOfIterations(); i++)
+ {
+ System.out.println("######################################");
+ System.out.println("test # " + i);
+ internalTestProduceAndConsume();
+ stopServer();
+
+ AIOSequentialFileFactory factory = new
AIOSequentialFileFactory(getJournalDir());
+ JournalImpl journal = new
JournalImpl(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE,
+ 2,
+ 0,
+ 0,
+ factory,
+ "hornetq-data",
+ "hq",
+ 100);
+ List<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
+ List<PreparedTransactionInfo> preparedTransactions = new
ArrayList<PreparedTransactionInfo>();
+ journal.start();
+ journal.load(committedRecords, preparedTransactions);
+
+ assertEquals(0, committedRecords.size());
+ assertEquals(0, preparedTransactions.size());
+
+ System.out.println("DataFiles = " + journal.getDataFilesCount());
+
+ if (i % 2 == 0 && i > 0)
+ {
+ System.out.println("DataFiles = " + journal.getDataFilesCount());
+ journal.forceMoveNextFile();
+ assertEquals(0, journal.getDataFilesCount());
+ }
+
+ journal.stop();
+ journal = null;
+
+ setupServer(JournalType.ASYNCIO);
+ }
+ }
+
+ public void internalTestProduceAndConsume() throws Throwable
+ {
+
+ addBogusData(100, "LAZY-QUEUE");
+
+ System.out.println(getTemporaryDir());
+ boolean transactionalOnConsume = true;
+ boolean transactionalOnProduce = true;
+ int numberOfConsumers = 30;
+ // this test assumes numberOfConsumers == numberOfProducers
+ int numberOfProducers = numberOfConsumers;
+ int produceMessage = 5000;
+ int commitIntervalProduce = 100;
+ int consumeMessage = (int)(produceMessage * 0.9);
+ int commitIntervalConsume = 100;
+
+ System.out.println("ConsumeMessages = " + consumeMessage + "
produceMessage = " + produceMessage);
+
+ // Number of messages expected to be received after restart
+ int numberOfMessagesExpected = (produceMessage - consumeMessage) *
numberOfConsumers;
+
+ CountDownLatch latchReady = new CountDownLatch(numberOfConsumers +
numberOfProducers);
+
+ CountDownLatch latchStart = new CountDownLatch(1);
+
+ ArrayList<BaseThread> threads = new ArrayList<BaseThread>();
+
+ ProducerThread[] prod = new ProducerThread[numberOfProducers];
+ for (int i = 0; i < numberOfProducers; i++)
+ {
+ prod[i] = new ProducerThread(i,
+ latchReady,
+ latchStart,
+ transactionalOnConsume,
+ produceMessage,
+ commitIntervalProduce);
+ prod[i].start();
+ threads.add(prod[i]);
+ }
+
+ ConsumerThread[] cons = new ConsumerThread[numberOfConsumers];
+
+ for (int i = 0; i < numberOfConsumers; i++)
+ {
+ cons[i] = new ConsumerThread(i,
+ latchReady,
+ latchStart,
+ transactionalOnProduce,
+ consumeMessage,
+ commitIntervalConsume);
+ cons[i].start();
+ threads.add(cons[i]);
+ }
+
+ latchReady.await();
+ latchStart.countDown();
+
+ for (BaseThread t : threads)
+ {
+ t.join();
+ if (t.e != null)
+ {
+ throw t.e;
+ }
+ }
+
+ server.stop();
+
+ setupServer(JournalType.ASYNCIO);
+
+ drainQueue(numberOfMessagesExpected, QUEUE);
+ drainQueue(100, new SimpleString("LAZY-QUEUE"));
+
+ server.stop();
+
+ setupServer(JournalType.ASYNCIO);
+ drainQueue(0, QUEUE);
+ drainQueue(0, new SimpleString("LAZY-QUEUE"));
+
+ }
+
+ /**
+ * @param numberOfMessagesExpected
+ * @param queue
+ * @throws HornetQException
+ */
+ private void drainQueue(int numberOfMessagesExpected, SimpleString queue) throws
HornetQException
+ {
+ ClientSession sess = sf.createSession(true, true);
+
+ ClientConsumer consumer = sess.createConsumer(queue);
+
+ sess.start();
+
+ for (int i = 0; i < numberOfMessagesExpected; i++)
+ {
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+
+ if (i % 100 == 0)
+ {
+ System.out.println("Received #" + i + " on thread after
start");
+ }
+ msg.acknowledge();
+ }
+
+ assertNull(consumer.receiveImmediate());
+
+ sess.close();
+ }
+
+ /**
+ * @throws HornetQException
+ */
+ private void addBogusData(int nmessages, String queue) throws HornetQException
+ {
+ ClientSession session = sf.createSession(false, false);
+ try
+ {
+ session.createQueue(queue, queue, true);
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ ClientProducer prod = session.createProducer(queue);
+ for (int i = 0; i < nmessages; i++)
+ {
+ ClientMessage msg = session.createClientMessage(true);
+ msg.getBody().writeBytes(new byte[1024]);
+ prod.send(msg);
+ }
+ session.commit();
+
+ session.start();
+
+ ClientConsumer cons = session.createConsumer(queue);
+ assertNotNull(cons.receive(1000));
+ session.rollback();
+ session.close();
+ }
+
+ protected void stopServer() throws Exception
+ {
+ try
+ {
+ if (server != null && server.isStarted())
+ {
+ server.stop();
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace(System.out); // System.out => junit reports
+ }
+
+ sf = null;
+ }
+
+ /**
+ * Lazily creates the server (first call only) with the given journal type
+ * and the default compaction thresholds, starts it, and ensures the test
+ * queue exists. A Netty factory is used just for the queue-creation
+ * session, then replaced by an in-VM factory for the test threads.
+ *
+ * NOTE(review): the Netty factory reference is overwritten without being
+ * closed — possible resource leak; confirm intent.
+ * NOTE(review): `throws Exception, HornetQException` — the second type is
+ * redundant, it is already covered by Exception.
+ *
+ * @param journalType journal implementation (NIO/AIO) under test
+ */
+ private void setupServer(JournalType journalType) throws Exception, HornetQException
+ {
+ if (server == null)
+ {
+ Configuration config = createDefaultConfig(true);
+ config.setJournalFileSize(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE);
+
+ config.setJournalType(journalType);
+ config.setJMXManagementEnabled(false);
+
+ // NOTE(review): journal file size is set twice with the same value
+ // (see a few lines above) — one of the calls is redundant.
+ config.setJournalFileSize(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE);
+ config.setJournalMinFiles(ConfigurationImpl.DEFAULT_JOURNAL_MIN_FILES);
+
+
config.setJournalCompactMinFiles(ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_MIN_FILES);
+
config.setJournalCompactPercentage(ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE);
+
+ // config.setJournalCompactMinFiles(0);
+ // config.setJournalCompactPercentage(0);
+
+ server = createServer(true, config);
+ }
+
+ server.start();
+
+ sf = createNettyFactory();
+
+ ClientSession sess = sf.createSession();
+
+ try
+ {
+ sess.createQueue(ADDRESS, QUEUE, true);
+ }
+ catch (Exception ignored)
+ {
+ // Queue already exists after a restart; ignore.
+ }
+
+ sess.close();
+
+ sf = createInVMFactory();
+ }
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ /**
+ * Base class for the producer/consumer worker threads. Holds the shared
+ * ready/start latches (each worker counts down {@code latchReady} then
+ * awaits {@code latchStart} so all workers begin simultaneously), the
+ * transactionality flag, the message count and commit interval, and a
+ * failure slot {@code e} that the main test thread inspects after join().
+ */
+ class BaseThread extends Thread
+ {
+ // First failure seen by this worker; read by the test after join().
+ Throwable e;
+
+ // Counted down by the worker once it is about to await the start signal.
+ final CountDownLatch latchReady;
+
+ // Released by the test to start all workers at the same time.
+ final CountDownLatch latchStart;
+
+ final int numberOfMessages;
+
+ // Commit every this-many operations when running transactionally.
+ final int commitInterval;
+
+ final boolean transactional;
+
+ BaseThread(String name,
+ CountDownLatch latchReady,
+ CountDownLatch latchStart,
+ boolean transactional,
+ int numberOfMessages,
+ int commitInterval)
+ {
+ super(name);
+ this.transactional = transactional;
+ this.latchReady = latchReady;
+ this.latchStart = latchStart;
+ this.commitInterval = commitInterval;
+ this.numberOfMessages = numberOfMessages;
+ }
+
+ }
+
+ /**
+ * Worker that sends {@code numberOfMessages} persistent 1KB messages to
+ * ADDRESS, committing every {@code commitInterval} sends when running
+ * transactionally. Any failure is stored in {@code e} for the main test
+ * thread to inspect after join().
+ */
+ class ProducerThread extends BaseThread
+ {
+ ProducerThread(int id,
+ CountDownLatch latchReady,
+ CountDownLatch latchStart,
+ boolean transactional,
+ int numberOfMessages,
+ int commitInterval)
+ {
+ super("ClientProducer:" + id, latchReady, latchStart, transactional,
numberOfMessages, commitInterval);
+ }
+
+ public void run()
+ {
+ ClientSession session = null;
+ latchReady.countDown();
+ try
+ {
+ latchStart.await();
+ // Non-transactional => auto-commit sends and acks.
+ session = sf.createSession(!transactional, !transactional);
+ ClientProducer prod = session.createProducer(ADDRESS);
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ if (transactional)
+ {
+ // NOTE(review): at i == 0 this commits an empty transaction;
+ // harmless but presumably unintended.
+ if (i % commitInterval == 0)
+ {
+ session.commit();
+ }
+ }
+ if (i % 100 == 0)
+ {
+ // System.out.println(Thread.currentThread().getName() + "::sent
#" + i);
+ }
+ ClientMessage msg = session.createClientMessage(true);
+ msg.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
+ prod.send(msg);
+ }
+
+ // Flush the final partial batch.
+ if (transactional)
+ {
+ session.commit();
+ }
+
+ System.out.println("Thread " + Thread.currentThread().getName() +
+ " sent " +
+ numberOfMessages +
+ " messages");
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ this.e = e;
+ }
+ finally
+ {
+ try
+ {
+ // NOTE(review): if createSession() or await() threw, session is
+ // still null here and close() raises an NPE (caught below) that
+ // obscures the root cause. A null-check would be safer.
+ session.close();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ /**
+ * Worker that receives and acknowledges {@code numberOfMessages} messages
+ * from QUEUE, committing every {@code commitInterval} acks when running
+ * transactionally. Failures are stored in {@code e} for the main test
+ * thread to inspect after join().
+ */
+ class ConsumerThread extends BaseThread
+ {
+ ConsumerThread(int id,
+ CountDownLatch latchReady,
+ CountDownLatch latchStart,
+ boolean transactional,
+ int numberOfMessages,
+ int commitInterval)
+ {
+ super("ClientConsumer:" + id, latchReady, latchStart, transactional,
numberOfMessages, commitInterval);
+ }
+
+ public void run()
+ {
+ ClientSession session = null;
+ latchReady.countDown();
+ try
+ {
+ latchStart.await();
+ session = sf.createSession(!transactional, !transactional);
+ session.start();
+ ClientConsumer cons = session.createConsumer(QUEUE);
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ // NOTE(review): receive() returns null on timeout (60s); msg is
+ // not null-checked, so a timeout surfaces as an NPE rather than
+ // a clear assertion failure.
+ ClientMessage msg = cons.receive(60 * 1000);
+ msg.acknowledge();
+ // NOTE(review): unlike ProducerThread, this commits even when
+ // non-transactional (auto-commit session); also the i == 0
+ // commit commits an empty transaction.
+ if (i % commitInterval == 0)
+ {
+ session.commit();
+ }
+ if (i % 100 == 0)
+ {
+ // System.out.println(Thread.currentThread().getName() +
"::received #" + i);
+ }
+ }
+
+ System.out.println("Thread " + Thread.currentThread().getName() +
+ " received " +
+ numberOfMessages +
+ " messages");
+
+ session.commit();
+ }
+ catch (Throwable e)
+ {
+ this.e = e;
+ }
+ finally
+ {
+ try
+ {
+ // NOTE(review): same null-session NPE risk as ProducerThread if
+ // setup failed before session was assigned.
+ session.close();
+ }
+ catch (Throwable e)
+ {
+ this.e = e;
+ }
+ }
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-09-09
16:37:43 UTC (rev 7947)
+++
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-09-10
03:14:38 UTC (rev 7948)
@@ -585,14 +585,14 @@
/* (non-Javadoc)
* @see org.hornetq.core.journal.SequentialFile#lockBuffer()
*/
- public void lockBuffer()
+ public void disableAutoFlush()
{
}
/* (non-Javadoc)
* @see org.hornetq.core.journal.SequentialFile#unlockBuffer()
*/
- public void unlockBuffer()
+ public void enableAutoFlush()
{
}