[jboss-cvs] JBoss Messaging SVN: r7526 - in trunk: tests/src/org/jboss/messaging/tests/integration/journal and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jul 6 15:26:35 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-07-06 15:26:35 -0400 (Mon, 06 Jul 2009)
New Revision: 7526
Modified:
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java
Log:
tweaks
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-07-06 15:42:01 UTC (rev 7525)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-07-06 19:26:35 UTC (rev 7526)
@@ -361,12 +361,10 @@
if (timedBuffer != null)
{
// sanity check.. it shouldn't happen
- throw new IllegalStateException("Illegal buffered usage. Can't use ByteBuffer write while buffer SequentialFile");
+ log.warn("Illegal buffered usage. Can't use ByteBuffer write while buffer SequentialFile");
}
- else
- {
- doWrite(bytes, callback);
- }
+
+ doWrite(bytes, callback);
}
public void write(final ByteBuffer bytes, final boolean sync) throws Exception
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-07-06 15:42:01 UTC (rev 7525)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-07-06 19:26:35 UTC (rev 7526)
@@ -195,6 +195,8 @@
private final LinkedBlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
+ private final LinkedBlockingDeque<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
+
private final Queue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
@@ -1405,6 +1407,14 @@
dataFilesToProcess.addAll(dataFiles);
+ for (JournalFile file : pendingCloseFiles)
+ {
+ file.getFile().close();
+ }
+
+ dataFilesToProcess.addAll(pendingCloseFiles);
+ pendingCloseFiles.clear();
+
dataFiles.clear();
compactor = new JournalCompactor(fileFactory, this, records.keySet(), dataFilesToProcess.get(0).getFileID());
@@ -1967,132 +1977,6 @@
return maxID;
}
- protected void deleteControlFile(final SequentialFile controlFile) throws Exception
- {
- controlFile.delete();
- }
-
- /** being protected as testcases can override this method */
- protected void renameFiles(final List<JournalFile> oldFiles, final List<JournalFile> newFiles) throws Exception
- {
- for (JournalFile file : oldFiles)
- {
- dataFiles.remove(file);
- freeFiles.add(reinitializeFile(file));
- }
-
- for (JournalFile file : newFiles)
- {
- String newName = file.getFile().getFileName();
- newName = newName.substring(0, newName.lastIndexOf(".cmp"));
- file.getFile().renameTo(newName);
- }
-
- }
-
- /** This is an interception point for testcases, when the compacted files are written, before replacing the data structures */
- protected void onCompactDone()
- {
- }
-
- /**
- * @throws Exception
- */
- protected SequentialFile createControlFile(final List<JournalFile> files, final List<JournalFile> newFiles) throws Exception
- {
- return JournalCompactor.writeControlFile(fileFactory, files, newFiles);
- }
-
- // TestableJournal implementation
- // --------------------------------------------------------------
-
- public void setAutoReclaim(final boolean autoReclaim)
- {
- this.autoReclaim = autoReclaim;
- }
-
- public boolean isAutoReclaim()
- {
- return autoReclaim;
- }
-
- public String debug() throws Exception
- {
- checkReclaimStatus();
-
- StringBuilder builder = new StringBuilder();
-
- for (JournalFile file : dataFiles)
- {
- builder.append("DataFile:" + file +
- " posCounter = " +
- file.getPosCount() +
- " reclaimStatus = " +
- file.isCanReclaim() +
- " live size = " +
- file.getLiveSize() +
- "\n");
- if (file instanceof JournalFileImpl)
- {
- builder.append(((JournalFileImpl)file).debug());
-
- }
- }
-
- for (JournalFile file : freeFiles)
- {
- builder.append("FreeFile:" + file + "\n");
- }
-
- if (currentFile != null)
- {
- builder.append("CurrentFile:" + currentFile + " posCounter = " + currentFile.getPosCount() + "\n");
-
- if (currentFile instanceof JournalFileImpl)
- {
- builder.append(((JournalFileImpl)currentFile).debug());
- }
- }
- else
- {
- builder.append("CurrentFile: No current file at this point!");
- }
-
- builder.append("#Opened Files:" + openedFiles.size());
-
- return builder.toString();
- }
-
- /** Method for use on testcases.
- * It will call waitComplete on every transaction, so any assertions on the file system will be correct after this */
- public void debugWait() throws Exception
- {
- fileFactory.testFlush();
-
- for (JournalTransaction tx : transactions.values())
- {
- tx.waitCallbacks();
- }
-
- if (filesExecutor != null && !filesExecutor.isShutdown())
- {
- // Send something to the closingExecutor, just to make sure we went
- // until its end
- final CountDownLatch latch = new CountDownLatch(1);
-
- filesExecutor.execute(new Runnable()
- {
- public void run()
- {
- latch.countDown();
- }
- });
-
- latch.await();
- }
-
- }
-
public void checkAndReclaimFiles() throws Exception
{
// We can't start compacting while compacting is working
@@ -2200,6 +2084,96 @@
}
}
+ // TestableJournal implementation
+ // --------------------------------------------------------------
+
+ public void setAutoReclaim(final boolean autoReclaim)
+ {
+ this.autoReclaim = autoReclaim;
+ }
+
+ public boolean isAutoReclaim()
+ {
+ return autoReclaim;
+ }
+
+ public String debug() throws Exception
+ {
+ checkReclaimStatus();
+
+ StringBuilder builder = new StringBuilder();
+
+ for (JournalFile file : dataFiles)
+ {
+ builder.append("DataFile:" + file +
+ " posCounter = " +
+ file.getPosCount() +
+ " reclaimStatus = " +
+ file.isCanReclaim() +
+ " live size = " +
+ file.getLiveSize() +
+ "\n");
+ if (file instanceof JournalFileImpl)
+ {
+ builder.append(((JournalFileImpl)file).debug());
+
+ }
+ }
+
+ for (JournalFile file : freeFiles)
+ {
+ builder.append("FreeFile:" + file + "\n");
+ }
+
+ if (currentFile != null)
+ {
+ builder.append("CurrentFile:" + currentFile + " posCounter = " + currentFile.getPosCount() + "\n");
+
+ if (currentFile instanceof JournalFileImpl)
+ {
+ builder.append(((JournalFileImpl)currentFile).debug());
+ }
+ }
+ else
+ {
+ builder.append("CurrentFile: No current file at this point!");
+ }
+
+ builder.append("#Opened Files:" + openedFiles.size());
+
+ return builder.toString();
+ }
+
+ /** Method for use on testcases.
+ * It will call waitComplete on every transaction, so any assertions on the file system will be correct after this */
+ public void debugWait() throws Exception
+ {
+ fileFactory.testFlush();
+
+ for (JournalTransaction tx : transactions.values())
+ {
+ tx.waitCallbacks();
+ }
+
+ if (filesExecutor != null && !filesExecutor.isShutdown())
+ {
+ // Send something to the closingExecutor, just to make sure we went
+ // until its end
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ filesExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ latch.countDown();
+ }
+ });
+
+ latch.await();
+ }
+
+ }
+
public int getDataFilesCount()
{
return dataFiles.size();
@@ -2355,6 +2329,45 @@
// Public
// -----------------------------------------------------------------------------
+ // Protected
+ // -----------------------------------------------------------------------------
+
+ protected void deleteControlFile(final SequentialFile controlFile) throws Exception
+ {
+ controlFile.delete();
+ }
+
+ /** being protected as testcases can override this method */
+ protected void renameFiles(final List<JournalFile> oldFiles, final List<JournalFile> newFiles) throws Exception
+ {
+ for (JournalFile file : oldFiles)
+ {
+ dataFiles.remove(file);
+ freeFiles.add(reinitializeFile(file));
+ }
+
+ for (JournalFile file : newFiles)
+ {
+ String newName = file.getFile().getFileName();
+ newName = newName.substring(0, newName.lastIndexOf(".cmp"));
+ file.getFile().renameTo(newName);
+ }
+
+ }
+
+ /** This is an interception point for testcases, when the compacted files are written, before replacing the data structures */
+ protected void onCompactDone()
+ {
+ }
+
+ /**
+ * @throws Exception
+ */
+ protected SequentialFile createControlFile(final List<JournalFile> files, final List<JournalFile> newFiles) throws Exception
+ {
+ return JournalCompactor.writeControlFile(fileFactory, files, newFiles);
+ }
+
// Private
// -----------------------------------------------------------------------------
@@ -2817,20 +2830,27 @@
private void closeFile(final JournalFile file)
{
fileFactory.deactivate(file.getFile());
- dataFiles.add(file);
+ pendingCloseFiles.add(file);
filesExecutor.execute(new Runnable()
{
public void run()
{
+ compactingLock.readLock().lock();
try
{
file.getFile().close();
+ dataFiles.add(file);
+ pendingCloseFiles.remove(file);
}
catch (Exception e)
{
log.warn(e.getMessage(), e);
}
+ finally
+ {
+ compactingLock.readLock().unlock();
+ }
}
});
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java 2009-07-06 15:42:01 UTC (rev 7525)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java 2009-07-06 19:26:35 UTC (rev 7526)
@@ -613,15 +613,8 @@
journal.forceMoveNextFile();
}
- System.out.println("DataFiles = " + journal.getDataFilesCount());
-
JournalFile files[] = journal.getDataFiles();
- for (JournalFile file : files)
- {
- System.out.println("Size: " + file.getLiveSize());
- }
-
stopJournal();
createJournal();
startJournal();
@@ -629,17 +622,10 @@
journal.forceMoveNextFile();
- System.out.println("DataFiles = " + journal.getDataFilesCount());
-
JournalFile files2[] = journal.getDataFiles();
assertEquals(files.length, files2.length);
- for (JournalFile file : files2)
- {
- System.out.println("Size: " + file.getLiveSize());
- }
-
for (int i = 0; i < files.length; i++)
{
assertEquals(expectedSizes.get(i).intValue(), files[i].getLiveSize());
@@ -657,11 +643,6 @@
for (JournalFile file : files3)
{
- System.out.println("Size: " + file.getLiveSize());
- }
-
- for (JournalFile file : files3)
- {
assertEquals(0, file.getLiveSize());
}
More information about the jboss-cvs-commits
mailing list