[jboss-cvs] JBoss Messaging SVN: r7528 - in trunk: tests/src/org/jboss/messaging/tests/integration/client and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jul 6 19:33:05 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-07-06 19:33:05 -0400 (Mon, 06 Jul 2009)
New Revision: 7528
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/client/CompactingTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java
Log:
Fixes on compactors
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-07-06 20:00:55 UTC (rev 7527)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-07-06 23:33:05 UTC (rev 7528)
@@ -499,17 +499,17 @@
try
{
- wholeFileBuffer = fileFactory.newBuffer((int)file.getFile().size());
+ final int filesize = (int)file.getFile().size();
+ wholeFileBuffer = fileFactory.newBuffer((int)filesize);
+
final int journalFileSize = file.getFile().read(wholeFileBuffer);
- if (journalFileSize != file.getFile().size())
+ if (journalFileSize != filesize)
{
throw new RuntimeException("Invalid read! The system couldn't read the entire file into memory");
}
- wholeFileBuffer.position(0);
-
// First long is the ordering timestamp, we just jump its position
wholeFileBuffer.position(SIZE_HEADER);
@@ -1397,12 +1397,11 @@
compactingLock.writeLock().lock();
try
{
+ autoReclaim = false;
// We need to move to the next file, as we need a clear start for negatives and positives counts
- moveNextFile();
+ moveNextFile(true);
- autoReclaim = false;
-
// Take the snapshots and replace the structures
dataFilesToProcess.addAll(dataFiles);
@@ -1417,6 +1416,11 @@
dataFiles.clear();
+ if (dataFilesToProcess.size() == 0)
+ {
+ return;
+ }
+
compactor = new JournalCompactor(fileFactory, this, records.keySet(), dataFilesToProcess.get(0).getFileID());
for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
@@ -1438,19 +1442,8 @@
// Read the files, and use the JournalCompactor class to create the new outputFiles, and the new collections as
// well
- JournalFile previousFile = null;
for (final JournalFile file : dataFilesToProcess)
{
- if (previousFile != null)
- {
- if (file.getFileID() < previousFile.getFileID())
- {
- // Sanity check, this should never happen
- throw new IllegalStateException("DataFiles out of order!");
- }
- }
- previousFile = file;
-
log.info("Compacting file " + file.getFile().getFileName() + ", internalID = " + file.getFileID());
readJournalFile(fileFactory, file, compactor);
}
@@ -1493,6 +1486,8 @@
dataFiles.addFirst(fileToAdd);
}
+ trace("There are " + dataFiles.size() + " datafiles Now");
+
// Replay pending commands (including updates, deletes and commits)
localCompactor.replayPendingCommands();
@@ -2001,21 +1996,7 @@
log.warn("Could not remove file " + file);
}
- // FIXME - size() involves a scan!!!
- if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
- {
- // Re-initialise it
-
- JournalFile jf = reinitializeFile(file);
-
- freeFiles.add(jf);
- }
- else
- {
- file.getFile().open(1);
-
- file.getFile().delete();
- }
+ addFreeFile(file);
}
}
}
@@ -2046,7 +2027,7 @@
long compactMargin = (long)(totalBytes * compactPercentage);
- if (totalLiveSize < compactMargin && compactor == null && dataFiles.length > compactMinFiles)
+ if (totalLiveSize < compactMargin && compactorWait.getCount() == 0 && dataFiles.length > compactMinFiles)
{
log.info("Compacting being started, numberOfDataFiles = " + dataFiles.length +
@@ -2055,6 +2036,8 @@
", margin to start compacting = " +
compactMargin);
+ compactorWait.waitCompletion();
+
compactorWait.up();
// We can't use the executor for the compacting... or we would lock files opening and creation (besides other
@@ -2148,6 +2131,8 @@
* It will call waitComplete on every transaction, so any assertions on the file system will be correct after this */
public void debugWait() throws Exception
{
+ compactorWait.waitCompletion();
+
fileFactory.testFlush();
for (JournalTransaction tx : transactions.values())
@@ -2230,7 +2215,7 @@
lock.acquire();
try
{
- moveNextFile();
+ moveNextFile(true);
debugWait();
}
finally
@@ -2279,6 +2264,11 @@
try
{
+ while (!compactorWait.waitCompletion(60000))
+ {
+ log.warn("Waiting the compactor to finish its operations");
+ }
+
filesExecutor.shutdown();
if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
@@ -2286,14 +2276,9 @@
log.warn("Couldn't stop journal executor after 60 seconds");
}
- while (!compactorWait.waitCompletion(60000))
- {
- log.warn("Waiting the compactor to finish its operations");
- }
-
fileFactory.stop();
- if (currentFile != null)
+ if (currentFile != null && currentFile.getFile().isOpen())
{
currentFile.getFile().close();
}
@@ -2316,13 +2301,6 @@
finally
{
lock.release();
- try
- {
- compactingLock.writeLock().unlock();
- }
- catch (Throwable ignored)
- {
- }
}
}
@@ -2342,8 +2320,7 @@
{
for (JournalFile file : oldFiles)
{
- dataFiles.remove(file);
- freeFiles.add(reinitializeFile(file));
+ addFreeFile(file);
}
for (JournalFile file : newFiles)
@@ -2371,6 +2348,29 @@
// Private
// -----------------------------------------------------------------------------
+ /**
+ * @param file
+ * @throws Exception
+ */
+ private void addFreeFile(JournalFile file) throws Exception
+ {
+ // FIXME - size() involves a scan!!!
+ if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
+ {
+ // Re-initialise it
+
+ JournalFile jf = reinitializeFile(file);
+
+ freeFiles.add(jf);
+ }
+ else
+ {
+ file.getFile().open(1);
+
+ file.getFile().delete();
+ }
+ }
+
private void checkReclaimStatus() throws Exception
{
reclaimer.scan(getDataFiles());
@@ -2558,7 +2558,7 @@
if (!currentFile.getFile().fits(size))
{
currentFile.getFile().unlockBuffer();
- moveNextFile();
+ moveNextFile(false);
currentFile.getFile().lockBuffer();
// The same check needs to be done at the new file also
@@ -2699,12 +2699,11 @@
}
// You need to guarantee lock.acquire() before calling this method
- private void moveNextFile() throws InterruptedException
+ private void moveNextFile(boolean synchronous) throws InterruptedException
{
- // Asynchronously close the file
- closeFile(currentFile);
+ closeFile(currentFile, synchronous);
- currentFile = enqueueOpenFile();
+ currentFile = enqueueOpenFile(synchronous);
if (trace)
{
@@ -2719,14 +2718,14 @@
* <p>In case there are no cached opened files, this method will block until the file was opened,
* what would happen only if the system is under heavy load by another system (like a backup system, or a DB sharing the same box as JBM).</p>
* */
- private JournalFile enqueueOpenFile() throws InterruptedException
+ private JournalFile enqueueOpenFile(boolean synchronous) throws InterruptedException
{
if (trace)
{
trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
}
- filesExecutor.execute(new Runnable()
+ Runnable run = new Runnable()
{
public void run()
{
@@ -2739,10 +2738,19 @@
log.error(e.getMessage(), e);
}
}
- });
+ };
- if (autoReclaim)
+ if (synchronous)
{
+ run.run();
+ }
+ else
+ {
+ filesExecutor.execute(run);
+ }
+
+ if (autoReclaim && !synchronous)
+ {
filesExecutor.execute(new Runnable()
{
public void run()
@@ -2827,12 +2835,12 @@
return nextOpenedFile;
}
- private void closeFile(final JournalFile file)
+ private void closeFile(final JournalFile file, boolean synchronous)
{
fileFactory.deactivate(file.getFile());
pendingCloseFiles.add(file);
- filesExecutor.execute(new Runnable()
+ Runnable run = new Runnable()
{
public void run()
{
@@ -2855,7 +2863,17 @@
compactingLock.readLock().unlock();
}
}
- });
+ };
+
+ if (synchronous)
+ {
+ run.run();
+ }
+ else
+ {
+ filesExecutor.execute(run);
+ }
+
}
private JournalTransaction getTransactionInfo(final long txID)
Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/CompactingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/CompactingTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/CompactingTest.java 2009-07-06 23:33:05 UTC (rev 7528)
@@ -0,0 +1,326 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.client;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.buffers.ChannelBuffers;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.server.JournalType;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+
+/**
+ * A CompactingTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class CompactingTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private static final String AD1 = "ad1";
+
+ private static final String AD2 = "ad2";
+
+ private static final String Q1 = "q1";
+
+ private static final String Q2 = "q2";
+
+ private MessagingServer server;
+
+ private ClientSessionFactory sf;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testMultiProducerAndCompactNIO() throws Throwable
+ {
+ internalTestMultiProducer(JournalType.NIO);
+ }
+
+ public void testMultiProducerAndCompactAIO() throws Throwable
+ {
+ internalTestMultiProducer(JournalType.ASYNCIO);
+ }
+
+ public void internalTestMultiProducer(JournalType journalType) throws Throwable
+ {
+
+ setupServer(journalType);
+
+ final AtomicInteger numberOfMessages = new AtomicInteger(0);
+ final int NUMBER_OF_FAST_MESSAGES = 100000;
+ final int SLOW_INTERVAL = 100;
+
+ final CountDownLatch latchReady = new CountDownLatch(2);
+ final CountDownLatch latchStart = new CountDownLatch(1);
+
+ class FastProducer extends Thread
+ {
+ Throwable e;
+
+ FastProducer()
+ {
+ super("Fast-Thread");
+ }
+
+ public void run()
+ {
+ ClientSession session = null;
+ ClientSession sessionSlow = null;
+ latchReady.countDown();
+ try
+ {
+ latchStart.await();
+ session = sf.createSession(true, true);
+ sessionSlow = sf.createSession(false, false);
+ ClientProducer prod = session.createProducer(AD2);
+ ClientProducer slowProd = sessionSlow.createProducer(AD1);
+ for (int i = 0; i < NUMBER_OF_FAST_MESSAGES; i++)
+ {
+ if (i % SLOW_INTERVAL == 0)
+ {
+ if (numberOfMessages.incrementAndGet() % 5 == 0)
+ {
+ sessionSlow.commit();
+ }
+ slowProd.send(session.createClientMessage(true));
+ }
+ ClientMessage msg = session.createClientMessage(true);
+ msg.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
+ prod.send(msg);
+ }
+ sessionSlow.commit();
+ }
+ catch (Throwable e)
+ {
+ this.e = e;
+ }
+ finally
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Throwable e)
+ {
+ this.e = e;
+ }
+ try
+ {
+ sessionSlow.close();
+ }
+ catch (Throwable e)
+ {
+ this.e = e;
+ }
+ }
+ }
+ }
+
+ class FastConsumer extends Thread
+ {
+ Throwable e;
+
+ FastConsumer()
+ {
+ super("Fast-Consumer");
+ }
+
+ public void run()
+ {
+ ClientSession session = null;
+ latchReady.countDown();
+ try
+ {
+ latchStart.await();
+ session = sf.createSession(true, true);
+ session.start();
+ ClientConsumer cons = session.createConsumer(Q2);
+ for (int i = 0; i < NUMBER_OF_FAST_MESSAGES; i++)
+ {
+ ClientMessage msg = cons.receive(60 * 1000);
+ msg.acknowledge();
+ }
+
+ assertNull(cons.receiveImmediate());
+ }
+ catch (Throwable e)
+ {
+ this.e = e;
+ }
+ finally
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Throwable e)
+ {
+ this.e = e;
+ }
+ }
+ }
+ }
+
+ FastConsumer f1 = new FastConsumer();
+ f1.start();
+
+ FastProducer p1 = new FastProducer();
+ p1.start();
+
+ latchReady.await();
+ latchStart.countDown();
+
+ p1.join();
+
+ if (p1.e != null)
+ {
+ throw p1.e;
+ }
+
+ f1.join();
+
+ if (f1.e != null)
+ {
+ throw f1.e;
+ }
+
+ sf.close();
+
+ server.stop();
+
+ setupServer(journalType);
+
+ ClientSession sess = sf.createSession(true, true);
+
+ ClientConsumer cons = sess.createConsumer(Q1);
+
+ sess.start();
+
+ for (int i = 0; i < numberOfMessages.intValue(); i++)
+ {
+ ClientMessage msg = cons.receive(10000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ }
+
+ assertNull(cons.receiveImmediate());
+
+ cons.close();
+
+ cons = sess.createConsumer(Q2);
+
+ assertNull(cons.receive(100));
+
+ sess.close();
+
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ clearData();
+ }
+
+ /**
+ * @throws Exception
+ * @throws MessagingException
+ */
+ private void setupServer(JournalType journalType) throws Exception, MessagingException
+ {
+ Configuration config = createDefaultConfig();
+ config.setJournalFileSize(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE);
+
+ config.setJournalType(journalType);
+
+ config.setJournalCompactMinFiles(ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_MIN_FILES);
+ config.setJournalCompactPercentage(ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE);
+
+ server = createServer(true, config);
+
+ server.start();
+
+ sf = createInVMFactory();
+
+ ClientSession sess = sf.createSession();
+
+ try
+ {
+ sess.createQueue(AD1, Q1, true);
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ try
+ {
+ sess.createQueue(AD2, Q2, true);
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ sess.close();
+
+ sf = createInVMFactory();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ sf.close();
+
+ server.stop();
+
+ // super.tearDown();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java 2009-07-06 20:00:55 UTC (rev 7527)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java 2009-07-06 23:33:05 UTC (rev 7528)
@@ -301,8 +301,6 @@
}
}
- System.out.println("Number of Files: " + journal.getDataFilesCount());
-
if (regularAdd)
{
for (int i = 0; i < NUMBER_OF_RECORDS; i++)
@@ -555,8 +553,6 @@
update(i);
}
- System.out.println("Number of Files: " + journal.getDataFilesCount());
-
for (int i = 0; i < NUMBER_OF_RECORDS; i++)
{
if (!(i % 10 == 0))
@@ -696,15 +692,8 @@
journal.forceMoveNextFile();
}
- System.out.println("DataFiles = " + journal.getDataFilesCount());
-
JournalFile files[] = journal.getDataFiles();
- for (JournalFile file : files)
- {
- System.out.println("Size: " + file.getLiveSize());
- }
-
stopJournal();
createJournal();
startJournal();
@@ -712,17 +701,10 @@
journal.forceMoveNextFile();
- System.out.println("DataFiles = " + journal.getDataFilesCount());
-
JournalFile files2[] = journal.getDataFiles();
assertEquals(files.length, files2.length);
- for (JournalFile file : files2)
- {
- System.out.println("Size: " + file.getLiveSize());
- }
-
for (int i = 0; i < files.length; i++)
{
assertEquals(expectedSizes.get(i).intValue(), files[i].getLiveSize());
@@ -742,11 +724,6 @@
for (JournalFile file : files3)
{
- System.out.println("Size: " + file.getLiveSize());
- }
-
- for (JournalFile file : files3)
- {
assertEquals(0, file.getLiveSize());
}
More information about the jboss-cvs-commits
mailing list