Author: clebert.suconic(a)jboss.com
Date: 2011-03-27 00:55:20 -0400 (Sun, 27 Mar 2011)
New Revision: 10378
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOBufferedJournalCompactTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6198 - fixing compactor tests
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-03-27
00:43:12 UTC (rev 10377)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-03-27
04:55:20 UTC (rev 10378)
@@ -59,8 +59,6 @@
private final BlockingDeque<JournalFile> dataFiles = new
LinkedBlockingDeque<JournalFile>();
- private final BlockingQueue<JournalFile> pendingCloseFiles = new
LinkedBlockingDeque<JournalFile>();
-
private final ConcurrentLinkedQueue<JournalFile> freeFiles = new
ConcurrentLinkedQueue<JournalFile>();
private final BlockingQueue<JournalFile> openedFiles = new
LinkedBlockingQueue<JournalFile>();
@@ -80,8 +78,6 @@
private final int userVersion;
private Executor openFilesExecutor;
-
- private Executor closeFilesExecutor;
// Static --------------------------------------------------------
@@ -106,18 +102,15 @@
// Public --------------------------------------------------------
- public void setExecutor(final Executor fileExecutor, final Executor closeExecutor)
+ public void setExecutor(final Executor fileExecutor)
{
this.openFilesExecutor = fileExecutor;
- this.closeFilesExecutor = closeExecutor;
}
- public void clear()
+ public void clear() throws Exception
{
dataFiles.clear();
- drainClosedFiles();
-
freeFiles.clear();
for (JournalFile file : openedFiles)
@@ -269,8 +262,19 @@
*/
public synchronized void addFreeFile(final JournalFile file, final boolean renameTmp)
throws Exception
{
- if (file.getFile().size() != fileSize)
+ long calculatedSize = 0;
+ try
{
+ calculatedSize = file.getFile().size();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ System.out.println("Can't get file size on " + file);
+ System.exit(-1);
+ }
+ if (calculatedSize != fileSize)
+ {
JournalFilesRepository.log.warn("Deleting " + file + ".. as it
doesn't have the configured size");
file.getFile().delete();
}
@@ -317,23 +321,6 @@
return openedFiles.size();
}
- public void drainClosedFiles()
- {
- JournalFile file;
- try
- {
- while ((file = pendingCloseFiles.poll()) != null)
- {
- file.getFile().close();
- }
- }
- catch (Exception e)
- {
- JournalFilesRepository.log.warn(e.getMessage(), e);
- }
-
- }
-
/**
* <p>This method will instantly return the opened file, and schedule opening
and reclaiming.</p>
* <p>In case there are no cached opened files, this method will block until the
file was opened,
@@ -406,31 +393,11 @@
openedFiles.offer(nextOpenedFile);
}
- public void closeFile(final JournalFile file)
+ public void closeFile(final JournalFile file) throws Exception
{
fileFactory.deactivateBuffer();
- pendingCloseFiles.add(file);
+ file.getFile().close();
dataFiles.add(file);
-
- Runnable run = new Runnable()
- {
- public void run()
- {
- drainClosedFiles();
- }
- };
-
- // We can't close files while the compactor is running
- // as we may be closing files that are being read by the compactor
- if (closeFilesExecutor == null)
- {
- run.run();
- }
- else
- {
- closeFilesExecutor.execute(run);
- }
-
}
/**
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2011-03-27
00:43:12 UTC (rev 10377)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2011-03-27
04:55:20 UTC (rev 10378)
@@ -1655,8 +1655,6 @@
// We need to move to the next file, as we need a clear start for negatives
and positives counts
moveNextFile(false);
- filesRepository.drainClosedFiles();
-
// Take the snapshots and replace the structures
dataFilesToProcess.addAll(filesRepository.getDataFiles());
@@ -2544,7 +2542,7 @@
}
});
- filesRepository.setExecutor(filesExecutor, compactorExecutor);
+ filesRepository.setExecutor(filesExecutor);
fileFactory.start();
@@ -2576,7 +2574,7 @@
filesExecutor.shutdown();
- filesRepository.setExecutor(null, null);
+ filesRepository.setExecutor(null);
if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
{
@@ -2590,8 +2588,6 @@
currentFile.getFile().close();
}
- filesRepository.drainClosedFiles();
-
filesRepository.clear();
fileFactory.stop();
@@ -2947,14 +2943,6 @@
callback = null;
}
- if (sync && !compactorRunning.get())
- {
- // In an edge case the transaction could still have pending data from
previous files.
- // This shouldn't cause any blocking issues, as this is here to guarantee
we cover all possibilities
- // on guaranteeing the data is on the disk
- tx.syncPreviousFiles(fileFactory.isSupportsCallbacks(), currentFile);
- }
-
// We need to add the number of records on currentFile if prepare or commit
if (completeTransaction)
{
@@ -2983,7 +2971,7 @@
}
// You need to guarantee lock.acquire() before calling this method
- private void moveNextFile(final boolean scheduleReclaim) throws InterruptedException
+ private void moveNextFile(final boolean scheduleReclaim) throws Exception
{
filesRepository.closeFile(currentFile);
@@ -3017,7 +3005,6 @@
{
try
{
- filesRepository.drainClosedFiles();
if (!checkReclaimStatus())
{
checkCompact();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalTransaction.java 2011-03-27
00:43:12 UTC (rev 10377)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalTransaction.java 2011-03-27
04:55:20 UTC (rev 10378)
@@ -193,38 +193,6 @@
data.setNumberOfRecords(getCounter(currentFile));
}
- /** 99.99 % of the times previous files will be already synced, since they are
scheduled to be closed.
- * Because of that, this operation should be almost very fast.*/
- public void syncPreviousFiles(final boolean callbacks, final JournalFile currentFile)
throws Exception
- {
- if (callbacks)
- {
- if (callbackList != null)
- {
- for (Map.Entry<JournalFile, TransactionCallback> entry :
callbackList.entrySet())
- {
- if (entry.getKey() != currentFile)
- {
- entry.getValue().waitCompletion();
- }
- }
- }
- }
- else
- {
- if (pendingFiles != null)
- {
- for (JournalFile file : pendingFiles)
- {
- if (file != currentFile)
- {
- file.getFile().waitForClose();
- }
- }
- }
- }
- }
-
public TransactionCallback getCallback(final JournalFile file) throws Exception
{
if (callbackList == null)
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOBufferedJournalCompactTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOBufferedJournalCompactTest.java 2011-03-27
00:43:12 UTC (rev 10377)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOBufferedJournalCompactTest.java 2011-03-27
04:55:20 UTC (rev 10378)
@@ -39,6 +39,18 @@
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
+
+ public void testLoop() throws Exception
+ {
+ int i = 0 ;
+ while (true)
+ {
+ System.out.println("#test " + (i++));
+ testOnRollback();
+ tearDown();
+ setUp();
+ }
+ }
// Protected -----------------------------------------------------
@Override