Author: clebert.suconic(a)jboss.com
Date: 2010-08-09 13:03:25 -0400 (Mon, 09 Aug 2010)
New Revision: 9519
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/hornetq/tests/util/RandomUtil.java
Log:
HORNETQ-475 - Reuse of cleaned up files instead of always deleting them
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-09 15:01:23 UTC
(rev 9518)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-09 17:03:25 UTC
(rev 9519)
@@ -2220,7 +2220,7 @@
JournalImpl.log.warn("Could not remove file " + file);
}
- addFreeFile(file);
+ addFreeFile(file, false);
}
}
@@ -2364,10 +2364,33 @@
SequentialFile controlFile = createControlFile(null, null, new Pair<String,
String>(tmpFileName,
cleanedFileName));
- file.getFile().delete();
+
+ SequentialFile returningFile =
fileFactory.createSequentialFile(file.getFile().getFileName(), maxAIO);
+
+ returningFile.renameTo(renameExtensionFile(tmpFileName, ".cmp") +
".tmp");
+
tmpFile.renameTo(cleanedFileName);
+
controlFile.delete();
+ final JournalFile retJournalfile = new JournalFileImpl(returningFile, -1);
+
+ filesExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ addFreeFile(retJournalfile, true);
+ }
+ catch (Throwable e)
+ {
+ log.warn("Error reinitializing file " + file, e);
+ }
+
+ }
+ });
+
}
finally
{
@@ -2750,7 +2773,7 @@
{
try
{
- addFreeFile(file);
+ addFreeFile(file, false);
}
catch (Throwable e)
{
@@ -2768,13 +2791,22 @@
for (JournalFile file : newFiles)
{
- String newName = file.getFile().getFileName();
- newName = newName.substring(0, newName.lastIndexOf(".cmp"));
+ String newName = renameExtensionFile(file.getFile().getFileName(),
".cmp");
file.getFile().renameTo(newName);
}
}
+ /**
+ * Returns {@code name} cut off just before the last occurrence of {@code extension},
+ * e.g. {@code "x-1.ext.cmp"} with {@code ".cmp"} yields {@code "x-1.ext"}.
+ * Despite its name this method renames nothing on disk; it only computes the new
+ * name, and callers pass the result to {@code renameTo} themselves.
+ * NOTE(review): when {@code extension} does not occur in {@code name},
+ * {@code lastIndexOf} returns -1 and {@code substring(0, -1)} throws
+ * {@code StringIndexOutOfBoundsException} - confirm callers always pass a matching name.
+ * @param name the file name to strip
+ * @param extension the marker to cut at (callers in this change pass ".cmp" and ".tmp")
+ * @return the part of {@code name} preceding the last {@code extension}
+ */
+ private String renameExtensionFile(String name, String extension)
+ {
+ name = name.substring(0, name.lastIndexOf(extension));
+ return name;
+ }
+
/** This is an interception point for testcases, when the compacted files are written,
before replacing the data structures */
protected void onCompactDone()
{
@@ -2787,12 +2819,11 @@
* @param file
* @throws Exception
*/
- private void addFreeFile(final JournalFile file) throws Exception
+ private void addFreeFile(final JournalFile file, final boolean renameTmp) throws
Exception
{
if (file.getFile().size() != this.getFileSize())
{
- // This will happen during cleanup
- log.debug("Deleting " + file + ".. as it doesn't have the
standard size", new Exception ("trace"));
+ log.warn("Deleting " + file + ".. as it doesn't have the
configured size", new Exception ("trace"));
file.getFile().delete();
}
else
@@ -2802,6 +2833,11 @@
// Re-initialise it
JournalFile jf = reinitializeFile(file);
+
+ if (renameTmp)
+ {
+ jf.getFile().renameTo(renameExtensionFile(jf.getFile().getFileName(),
".tmp"));
+ }
freeFiles.add(jf);
}
@@ -3109,14 +3145,7 @@
String fileName;
- if (tmpCompact)
- {
- fileName = filePrefix + "-" + fileID + "." + fileExtension +
".cmp";
- }
- else
- {
- fileName = filePrefix + "-" + fileID + "." + fileExtension;
- }
+ fileName = createFileName(tmpCompact, fileID);
if (JournalImpl.trace)
{
@@ -3158,6 +3187,25 @@
return new JournalFileImpl(sequentialFile, fileID);
}
+ /**
+ * Composes a file name of the form {@code <filePrefix>-<fileID>.<fileExtension>},
+ * appending an extra {@code ".cmp"} suffix when {@code tmpCompact} is true.
+ * @param tmpCompact true to append the {@code ".cmp"} suffix to the name
+ * @param fileID the numeric id embedded in the name
+ * @return the composed file name
+ */
+ private String createFileName(final boolean tmpCompact, long fileID)
+ {
+ String fileName;
+ if (tmpCompact)
+ {
+ fileName = filePrefix + "-" + fileID + "." + fileExtension +
".cmp";
+ }
+ else
+ {
+ fileName = filePrefix + "-" + fileID + "." + fileExtension;
+ }
+ return fileName;
+ }
+
private void openFile(final JournalFile file, final boolean multiAIO) throws
Exception
{
if (multiAIO)
Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2010-08-09
15:01:23 UTC (rev 9518)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2010-08-09
17:03:25 UTC (rev 9519)
@@ -292,6 +292,8 @@
}
return;
}
+
+ position.addAndGet(bytes.limit());
if (maxIOSemaphore == null)
{
@@ -336,8 +338,6 @@
*/
private void doInternalWrite(final ByteBuffer bytes, final boolean sync, final
IOAsyncTask callback) throws Exception
{
- position.addAndGet(bytes.limit());
-
channel.write(bytes);
if (sync)
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-08-09
15:01:23 UTC (rev 9518)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-08-09
17:03:25 UTC (rev 9519)
@@ -739,7 +739,7 @@
final int numberOfIntegers = 10;
- final int numberOfMessages = 10;
+ final int numberOfMessages = 500;
try
{
@@ -756,6 +756,8 @@
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
ClientMessage message = null;
+
+
for (int i = 0; i < numberOfMessages; i++)
{
@@ -780,13 +782,13 @@
session.start();
for (int i = 0; i < numberOfMessages; i++)
{
- ClientMessage msg = consumer.receive(500);
+ System.out.println("Received " + i);
+ ClientMessage msg = consumer.receive(5000);
Assert.assertNotNull(msg);
msg.acknowledge();
+ session.commit();
}
- session.commit();
-
session.close();
}
finally
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2010-08-09
15:01:23 UTC (rev 9518)
+++
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2010-08-09
17:03:25 UTC (rev 9519)
@@ -25,6 +25,7 @@
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.tests.unit.core.journal.impl.fakes.SimpleEncoding;
+import org.hornetq.tests.util.RandomUtil;
/**
*
@@ -37,6 +38,17 @@
public abstract class JournalImplTestUnit extends JournalImplTestBase
{
private static final Logger log = Logger.getLogger(JournalImplTestUnit.class);
+ /**
+ * Teardown check: asks {@code fileFactory} for the files matching {@code fileExtension}
+ * and asserts that each one reports a {@code size()} equal to {@code fileSize}.
+ * NOTE(review): this override does not call {@code super.tearDown()} - confirm the
+ * base class has no cleanup of its own that is being skipped.
+ */
+ protected void tearDown() throws Exception
+ {
+ List<String> files = fileFactory.listFiles(fileExtension);
+
+ for (String file : files)
+ {
+ SequentialFile seqFile = fileFactory.createSequentialFile(file, 1);
+ assertEquals(fileSize, seqFile.size());
+ }
+ }
// General tests
// =============
@@ -2390,14 +2402,14 @@
public void testMultipleAddUpdateDeleteDifferentRecordLengths() throws Exception
{
- setup(10, 2048, true);
+ setup(10, 20480, true);
createJournal();
startJournal();
load();
for (int i = 0; i < 100; i++)
{
- byte[] record = generateRecord(10 + (int)(1500 * Math.random()));
+ byte[] record = generateRecord(RandomUtil.randomInterval(1500, 10000));
journal.appendAddRecord(i, (byte)0, record, false);
@@ -2406,7 +2418,7 @@
for (int i = 0; i < 100; i++)
{
- byte[] record = generateRecord(10 + (int)(1024 * Math.random()));
+ byte[] record = generateRecord(10 + RandomUtil.randomInterval(1500, 10000));
journal.appendUpdateRecord(i, (byte)0, record, false);
Modified: trunk/tests/src/org/hornetq/tests/util/RandomUtil.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/RandomUtil.java 2010-08-09 15:01:23 UTC (rev
9518)
+++ trunk/tests/src/org/hornetq/tests/util/RandomUtil.java 2010-08-09 17:03:25 UTC (rev
9519)
@@ -72,6 +72,12 @@
return Math.abs(RandomUtil.randomInt());
}
+ /**
+ * Returns {@code min + randomMax(max - min)}, i.e. {@code min} plus a random offset.
+ * NOTE(review): {@code randomMax} starts with {@code randomPositiveInt() % max}, so
+ * calling this with {@code max == min} would divide by zero; whether {@code max}
+ * itself can be returned depends on the rest of {@code randomMax} - confirm.
+ * @param min the value the random offset is added to (intended lower bound)
+ * @param max the intended upper bound; {@code max - min} is handed to {@code randomMax}
+ * @return {@code min} plus the value produced by {@code randomMax(max - min)}
+ */
+ public static int randomInterval(final int min, final int max)
+ {
+ return min + randomMax(max - min);
+ }
+
public static int randomMax(int max)
{
int value = randomPositiveInt() % max;