[hornetq-commits] JBoss hornetq SVN: r9599 - in branches/Branch_2_1: src/main/org/hornetq/core/journal/impl and 3 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Aug 26 17:11:43 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-08-26 17:11:42 -0400 (Thu, 26 Aug 2010)
New Revision: 9599
Added:
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java
Modified:
branches/Branch_2_1/src/main/org/hornetq/core/journal/TestableJournal.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
branches/Branch_2_1/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-485 Improvements on compacting and file management on the journal
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/TestableJournal.java 2010-08-26 17:48:38 UTC (rev 9598)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/TestableJournal.java 2010-08-26 21:11:42 UTC (rev 9599)
@@ -49,8 +49,6 @@
void forceMoveNextFile() throws Exception;
- void forceMoveNextFile(boolean synchronous) throws Exception;
-
void setAutoReclaim(boolean autoReclaim);
boolean isAutoReclaim();
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-08-26 17:48:38 UTC (rev 9598)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-08-26 21:11:42 UTC (rev 9599)
@@ -54,6 +54,8 @@
protected JournalFile currentFile;
protected SequentialFile sequentialFile;
+
+ protected final FilesRepository filesRepository;
protected long nextOrderingID;
@@ -69,11 +71,13 @@
protected AbstractJournalUpdateTask(final SequentialFileFactory fileFactory,
final JournalImpl journal,
+ final FilesRepository filesRepository,
final Set<Long> recordsSnapshot,
final long nextOrderingID)
{
super();
this.journal = journal;
+ this.filesRepository = filesRepository;
this.fileFactory = fileFactory;
this.nextOrderingID = nextOrderingID;
this.recordsSnapshot.addAll(recordsSnapshot);
@@ -212,7 +216,7 @@
writingChannel = HornetQBuffers.wrappedBuffer(bufferWrite);
- currentFile = journal.getFile(false, false, false, true);
+ currentFile = filesRepository.takeFile(false, false, false, true);
sequentialFile = currentFile.getFile();
Added: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java (rev 0)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java 2010-08-26 21:11:42 UTC (rev 9599)
@@ -0,0 +1,602 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.logging.Logger;
+
+/**
+ * This is a helper class for the Journal, which controls access to dataFiles, openedFiles and freeFiles,
+ * guaranteeing that they will be delivered in order to the Journal.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class FilesRepository
+{
+
+ private static final Logger log = Logger.getLogger(JournalImpl.class);
+
+ private static final boolean trace = false;
+
+ // This method exists just to make debug easier.
+ // I could replace log.trace by log.info temporarily while I was debugging
+ // Journal
+ private static final void trace(final String message)
+ {
+ log.trace(message);
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final SequentialFileFactory fileFactory;
+
+ private final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
+
+ private final BlockingQueue<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
+
+ private final ConcurrentLinkedQueue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
+
+ private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
+
+ private final AtomicLong nextFileID = new AtomicLong(0);
+
+ private final int maxAIO;
+
+ private final int minFiles;
+
+ private final int fileSize;
+
+ private final String filePrefix;
+
+ private final String fileExtension;
+
+ private final int userVersion;
+
+ private Executor filesExecutor;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public FilesRepository(final SequentialFileFactory fileFactory,
+ final String filePrefix,
+ final String fileExtension,
+ final int userVersion,
+ final int maxAIO,
+ final int fileSize,
+ final int minFiles)
+ {
+ this.fileFactory = fileFactory;
+ this.maxAIO = maxAIO;
+ this.filePrefix = filePrefix;
+ this.fileExtension = fileExtension;
+ this.minFiles = minFiles;
+ this.fileSize = fileSize;
+ this.userVersion = userVersion;
+ }
+
+ // Public --------------------------------------------------------
+
+
+ public void setExecutor(Executor executor)
+ {
+ this.filesExecutor = executor;
+ }
+
+ public void clear()
+ {
+ dataFiles.clear();
+
+ pendingCloseFiles.clear();
+
+ freeFiles.clear();
+
+ openedFiles.clear();
+ }
+
+ public int getMaxAIO()
+ {
+ return maxAIO;
+ }
+
+ public String getFileExtension()
+ {
+ return fileExtension;
+ }
+
+ public String getFilePrefix()
+ {
+ return filePrefix;
+ }
+
+ public void calculateNextfileID(List<JournalFile> files)
+ {
+
+ for (JournalFile file : files)
+ {
+ long fileID = file.getFileID();
+ if (nextFileID.get() < fileID)
+ {
+ nextFileID.set(fileID);
+ }
+
+ long fileNameID = getFileNameID(file.getFile().getFileName());
+
+ // The compactor could create a fileName but use a previously assigned ID.
+ // Because of that we need to take both parts into account
+ if (nextFileID.get() < fileNameID)
+ {
+ nextFileID.set(fileNameID);
+ }
+ }
+
+ }
+
+ public void ensureMinFiles() throws Exception
+ {
+ // FIXME - size() involves a scan
+ int filesToCreate = minFiles - (dataFiles.size() + freeFiles.size());
+
+ if (filesToCreate > 0)
+ {
+ for (int i = 0; i < filesToCreate; i++)
+ {
+ // Keeping all files opened can be very costly (mainly on AIO)
+ freeFiles.add(createFile(false, false, true, false));
+ }
+ }
+
+ }
+
+ public void openFile(final JournalFile file, final boolean multiAIO) throws Exception
+ {
+ if (multiAIO)
+ {
+ file.getFile().open();
+ }
+ else
+ {
+ file.getFile().open(1, false);
+ }
+
+ file.getFile().position(file.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER));
+ }
+
+ // Data File Operations ==========================================
+
+ public JournalFile[] getDataFilesArray()
+ {
+ return (JournalFile[])dataFiles.toArray(new JournalFile[dataFiles.size()]);
+ }
+
+ public JournalFile pollLastDataFile()
+ {
+ /*java.util.Iterator<JournalFile> iter = dataFiles.iterator();
+
+ JournalFile currentFile = null;
+ while (iter.hasNext())
+ {
+ currentFile = iter.next();
+
+ if (!iter.hasNext())
+ {
+ iter.remove();
+ }
+ }
+
+ return currentFile; */
+
+ return dataFiles.pollLast();
+ }
+
+ public void removeDataFile(JournalFile file)
+ {
+ if (!dataFiles.remove(file))
+ {
+ log.warn("Could not remove file " + file + " from the list of data files");
+ }
+ }
+
+ public int getDataFilesCount()
+ {
+ return dataFiles.size();
+ }
+
+ public Collection<JournalFile> getDataFiles()
+ {
+ return dataFiles;
+ }
+
+ public void clearDataFiles()
+ {
+ dataFiles.clear();
+ }
+
+ public void addDataFileOnTop(JournalFile file)
+ {
+ dataFiles.addFirst(file);
+ }
+
+ public void addDataFileOnBottom(JournalFile file)
+ {
+ dataFiles.add(file);
+ }
+
+ // Free File Operations ==========================================
+
+ public int getFreeFilesCount()
+ {
+ return freeFiles.size();
+ }
+
+ /**
+ * Add directly to the freeFiles structure without reinitializing the file.
+ * used on load() only
+ */
+ public void addFreeFileNoInit(JournalFile file)
+ {
+ freeFiles.add(file);
+ }
+
+ /**
+ * @param file
+ * @throws Exception
+ */
+ public synchronized void addFreeFile(final JournalFile file, final boolean renameTmp) throws Exception
+ {
+ if (file.getFile().size() != fileSize)
+ {
+ log.warn("Deleting " + file + ".. as it doesn't have the configured size", new Exception("trace"));
+ file.getFile().delete();
+ }
+ else
+ // FIXME - size() involves a scan!!!
+ if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
+ {
+ // Re-initialise it
+
+ if (trace)
+ {
+ trace("Adding free file " + file);
+ }
+
+ JournalFile jf = reinitializeFile(file);
+
+ if (renameTmp)
+ {
+ jf.getFile().renameTo(JournalImpl.renameExtensionFile(jf.getFile().getFileName(), ".tmp"));
+ }
+
+ freeFiles.add(jf);
+ }
+ else
+ {
+ file.getFile().delete();
+ }
+ }
+
+ public Collection<JournalFile> getFreeFiles()
+ {
+ return freeFiles;
+ }
+
+ public JournalFile getFreeFile()
+ {
+ return freeFiles.remove();
+ }
+
+ // Opened files operations =======================================
+
+ public int getOpenedFilesCount()
+ {
+ return openedFiles.size();
+ }
+
+   public void drainClosedFiles()
+   {
+      // Close every pending file independently: a failure on one file is logged and must not leave the others unclosed
+      JournalFile file;
+      while ((file = pendingCloseFiles.poll()) != null)
+      {
+         try
+         {
+            file.getFile().close();
+         }
+         catch (Exception e)
+         {
+            log.warn(e.getMessage(), e);
+         }
+      }
+   }
+
+   /**
+    * <p>Schedules the opening of one more file (on the files executor, or inline when none is set) and returns the next file from openedFiles.</p>
+    * <p>If there is no cached opened file, this method blocks until one is pushed, logging a warning on every 5 second timeout,
+    * which should happen only if the system is under heavy load by another system (like a backup system, or a DB sharing the same box as HornetQ).</p>
+    * */
+   public JournalFile openFile() throws InterruptedException
+   {
+      if (trace)
+      {
+         trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
+      }
+
+      Runnable run = new Runnable()
+      {
+         public void run()
+         {
+            try
+            {
+               pushOpenedFile();
+            }
+            catch (Exception e)
+            {
+               log.error(e.getMessage(), e);
+            }
+         }
+      };
+
+      if (filesExecutor == null)
+      {
+         run.run();
+      }
+      else
+      {
+         filesExecutor.execute(run);
+      }
+
+      JournalFile nextFile = null;
+      // Keep polling until a file shows up; the warning text must match the poll timeout used below
+      while (nextFile == null)
+      {
+         nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
+         if (nextFile == null)
+         {
+            log.warn("Couldn't open a file in 5 Seconds", new Exception("Warning: Couldn't open a file in 5 Seconds"));
+         }
+      }
+
+      if (trace)
+      {
+         trace("Returning file " + nextFile);
+      }
+
+      return nextFile;
+   }
+
+ /**
+ *
+ * Open a file and place it into the openedFiles queue
+ * */
+ public void pushOpenedFile() throws Exception
+ {
+ JournalFile nextOpenedFile = takeFile(true, true, true, false);
+
+ if (trace)
+ {
+ trace("pushing openFile " + nextOpenedFile);
+ }
+
+ openedFiles.offer(nextOpenedFile);
+ }
+
+
+ public void closeFile(final JournalFile file)
+ {
+ fileFactory.deactivateBuffer();
+ pendingCloseFiles.add(file);
+ dataFiles.add(file);
+
+ Runnable run = new Runnable()
+ {
+ public void run()
+ {
+ drainClosedFiles();
+ }
+ };
+
+ if (filesExecutor == null)
+ {
+ run.run();
+ }
+ else
+ {
+ filesExecutor.execute(run);
+ }
+
+ }
+
+
+ /**
+ * Takes the next file from freeFiles (renaming it with a ".cmp" suffix if tmpCompactExtension, opening it if keepOpened), or creates a new file when none is free.
+ * @return the reused free file (its contents are not re-initialized here), or a newly created one
+ * @throws Exception propagated from creating, renaming or opening the file
+ */
+ public JournalFile takeFile(final boolean keepOpened,
+ final boolean multiAIO,
+ final boolean initFile,
+ final boolean tmpCompactExtension) throws Exception
+ {
+ JournalFile nextOpenedFile = null;
+
+ nextOpenedFile = freeFiles.poll();
+
+ if (nextOpenedFile == null)
+ {
+ nextOpenedFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
+ }
+ else
+ {
+ if (tmpCompactExtension)
+ {
+ SequentialFile sequentialFile = nextOpenedFile.getFile();
+ sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
+ }
+
+ if (keepOpened)
+ {
+ openFile(nextOpenedFile, multiAIO);
+ }
+ }
+ return nextOpenedFile;
+ }
+
+
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ /**
+ * This method will create a new file on the file system, pre-fill it with FILL_CHARACTER
+ * @param keepOpened
+ * @return
+ * @throws Exception
+ */
+ private JournalFile createFile(final boolean keepOpened,
+ final boolean multiAIO,
+ final boolean init,
+ final boolean tmpCompact) throws Exception
+ {
+ long fileID = generateFileID();
+
+ String fileName;
+
+ fileName = createFileName(tmpCompact, fileID);
+
+ if (trace)
+ {
+ trace("Creating file " + fileName);
+ }
+
+ String tmpFileName = fileName + ".tmp";
+
+ SequentialFile sequentialFile = fileFactory.createSequentialFile(tmpFileName, maxAIO);
+
+ sequentialFile.open(1, false);
+
+ if (init)
+ {
+ sequentialFile.fill(0, fileSize, JournalImpl.FILL_CHARACTER);
+
+ JournalImpl.initFileHeader(this.fileFactory, sequentialFile, userVersion, fileID);
+ }
+
+ long position = sequentialFile.position();
+
+ sequentialFile.close();
+
+ if (trace)
+ {
+ trace("Renaming file " + tmpFileName + " as " + fileName);
+ }
+
+ sequentialFile.renameTo(fileName);
+
+ if (keepOpened)
+ {
+ if (multiAIO)
+ {
+ sequentialFile.open();
+ }
+ else
+ {
+ sequentialFile.open(1, false);
+ }
+ sequentialFile.position(position);
+ }
+
+ return new JournalFileImpl(sequentialFile, fileID, JournalImpl.FORMAT_VERSION);
+ }
+
+ /**
+ * @param tmpCompact
+ * @param fileID
+ * @return
+ */
+ private String createFileName(final boolean tmpCompact, long fileID)
+ {
+ String fileName;
+ if (tmpCompact)
+ {
+ fileName = filePrefix + "-" + fileID + "." + fileExtension + ".cmp";
+ }
+ else
+ {
+ fileName = filePrefix + "-" + fileID + "." + fileExtension;
+ }
+ return fileName;
+ }
+
+ private long generateFileID()
+ {
+ return nextFileID.incrementAndGet();
+ }
+
+   /** Get the ID part of the name: the number between "filePrefix-" and the first '.' after it (the prefix itself may contain dots); 0 if unparseable */
+   private long getFileNameID(final String fileName)
+   {
+      try
+      {
+         return Long.parseLong(fileName.substring(filePrefix.length() + 1, fileName.indexOf('.', filePrefix.length() + 1)));
+      }
+      catch (Throwable e)
+      {
+         log.warn("Impossible to get the ID part of the file name " + fileName, e);
+         return 0;
+      }
+   }
+
+ // Discard the old JournalFile and set it with a new ID
+ private JournalFile reinitializeFile(final JournalFile file) throws Exception
+ {
+ long newFileID = generateFileID();
+
+ SequentialFile sf = file.getFile();
+
+ sf.open(1, false);
+
+ int position = JournalImpl.initFileHeader(this.fileFactory, sf, userVersion, newFileID);
+
+ JournalFile jf = new JournalFileImpl(sf, newFileID, JournalImpl.FORMAT_VERSION);
+
+ sf.position(position);
+
+ sf.close();
+
+ return jf;
+ }
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-08-26 17:48:38 UTC (rev 9598)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-08-26 21:11:42 UTC (rev 9599)
@@ -144,10 +144,11 @@
public JournalCompactor(final SequentialFileFactory fileFactory,
final JournalImpl journal,
+ final FilesRepository filesRepository,
final Set<Long> recordsSnapshot,
final long firstFileID)
{
- super(fileFactory, journal, recordsSnapshot, firstFileID);
+ super(fileFactory, journal, filesRepository, recordsSnapshot, firstFileID);
}
/** This methods informs the Compactor about the existence of a pending (non committed) transaction */
@@ -531,7 +532,14 @@
void execute() throws Exception
{
JournalRecord deleteRecord = journal.getRecords().remove(id);
- deleteRecord.delete(usedFile);
+ if (deleteRecord == null)
+ {
+ log.warn("Can't find record " + id + " during compact replay");
+ }
+ else
+ {
+ deleteRecord.delete(usedFile);
+ }
}
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-26 17:48:38 UTC (rev 9598)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-26 21:11:42 UTC (rev 9599)
@@ -24,16 +24,11 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -99,6 +94,10 @@
private static final Logger log = Logger.getLogger(JournalImpl.class);
private static final boolean trace = log.isTraceEnabled();
+
+ // This is useful at debug time...
+ // if you set it to true, all the appends, deletes, rollbacks, commits, etc.. are sent to System.out
+ private static final boolean TRACE_RECORDS = false;
// This method exists just to make debug easier.
// I could replace log.trace by log.info temporarily while I was debugging
@@ -107,6 +106,11 @@
{
log.trace(message);
}
+
+ private static final void traceRecord(final String message)
+ {
+ System.out.println(message);
+ }
// The sizes of primitive types
@@ -167,12 +171,8 @@
private volatile boolean autoReclaim = true;
- private final AtomicLong nextFileID = new AtomicLong(0);
-
private final int userVersion;
- private final int maxAIO;
-
private final int fileSize;
private final int minFiles;
@@ -183,18 +183,9 @@
private final SequentialFileFactory fileFactory;
- public final String filePrefix;
- public final String fileExtension;
-
- private final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
-
- private final BlockingQueue<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
-
- private final ConcurrentLinkedQueue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
-
- private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
-
+ private final FilesRepository filesRepository;
+
// Compacting may replace this structure
private final ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<Long, JournalRecord>();
@@ -300,13 +291,9 @@
this.minFiles = minFiles;
this.fileFactory = fileFactory;
+
+ filesRepository = new FilesRepository(fileFactory, filePrefix, fileExtension, userVersion, maxAIO, fileSize, minFiles);
- this.filePrefix = filePrefix;
-
- this.fileExtension = fileExtension;
-
- this.maxAIO = maxAIO;
-
this.userVersion = userVersion;
}
@@ -390,13 +377,13 @@
* It won't be part of the interface as the tools should be specific to the implementation */
public List<JournalFile> orderFiles() throws Exception
{
- List<String> fileNames = fileFactory.listFiles(fileExtension);
+ List<String> fileNames = fileFactory.listFiles(filesRepository.getFileExtension());
List<JournalFile> orderedFiles = new ArrayList<JournalFile>(fileNames.size());
for (String fileName : fileNames)
{
- SequentialFile file = fileFactory.createSequentialFile(fileName, maxAIO);
+ SequentialFile file = fileFactory.createSequentialFile(fileName, filesRepository.getMaxAIO());
file.open(1, false);
@@ -421,29 +408,6 @@
return orderedFiles;
}
- private void calculateNextfileID(List<JournalFile> files)
- {
-
- for (JournalFile file : files)
- {
- long fileID = file.getFileID();
- if (nextFileID.get() < fileID)
- {
- nextFileID.set(fileID);
- }
-
- long fileNameID = getFileNameID(file.getFile().getFileName());
-
- // The compactor could create a fileName but use a previously assigned ID.
- // Because of that we need to take both parts into account
- if (nextFileID.get() < fileNameID)
- {
- nextFileID.set(fileNameID);
- }
- }
-
- }
-
/** this method is used internally only however tools may use it to maintenance. */
public static int readJournalFile(final SequentialFileFactory fileFactory,
final JournalFile file,
@@ -856,6 +820,8 @@
{
JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
+ if (TRACE_RECORDS) traceRecord("appendAddRecord::id=" + id + ", usedFile = " + usedFile);
+
records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
}
finally
@@ -932,6 +898,8 @@
{
JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
+ if (TRACE_RECORDS) traceRecord("appendUpdateRecord::id=" + id + ", usedFile = " + usedFile);
+
// record== null here could only mean there is a compactor, and computing the delete should be done after
// compacting is done
if (jrnRecord == null)
@@ -1008,6 +976,8 @@
{
JournalFile usedFile = appendRecord(deleteRecord, false, sync, null, callback);
+ if (TRACE_RECORDS) traceRecord("appendDeleteRecord::id=" + id + ", usedFile = " + usedFile);
+
// record== null here could only mean there is a compactor, and computing the delete should be done after
// compacting is done
if (record == null)
@@ -1060,6 +1030,8 @@
{
JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
+ if (TRACE_RECORDS) traceRecord("appendAddRecordTransactional:txID=" + txID + ",id=" + id + ", usedFile = " + usedFile);
+
tx.addPositive(usedFile, id, addRecord.getEncodeSize());
}
finally
@@ -1104,6 +1076,8 @@
{
JournalFile usedFile = appendRecord(updateRecordTX, false, false, tx, null);
+ if (TRACE_RECORDS) traceRecord("appendUpdateRecordTransactional::txID=" + txID + ",id="+id + ", usedFile = " + usedFile);
+
tx.addPositive(usedFile, id, updateRecordTX.getEncodeSize());
}
finally
@@ -1142,6 +1116,8 @@
{
JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null);
+ if (TRACE_RECORDS) traceRecord("appendDeleteRecordTransactional::txID=" + txID + ", id=" + id + ", usedFile = " + usedFile);
+
tx.addNegative(usedFile, id);
}
finally
@@ -1230,6 +1206,8 @@
try
{
JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback);
+
+ if (TRACE_RECORDS) traceRecord("appendPrepareRecord::txID=" + txID + ", usedFile = " + usedFile);
tx.prepare(usedFile);
}
@@ -1248,7 +1226,7 @@
public void appendCommitRecord(final long txID, final boolean sync) throws Exception
{
SyncIOCompletion syncCompletion = getSyncCallback(sync);
-
+
appendCommitRecord(txID, sync, syncCompletion);
if (syncCompletion != null)
@@ -1303,6 +1281,8 @@
try
{
JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
+
+ if (TRACE_RECORDS) traceRecord("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile);
tx.commit(usedFile);
}
@@ -1514,7 +1494,7 @@
throw new IllegalStateException("There is pending compacting operation");
}
- ArrayList<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>(dataFiles.size());
+ ArrayList<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>(filesRepository.getDataFilesCount());
boolean previousReclaimValue = autoReclaim;
@@ -1524,7 +1504,8 @@
{
JournalImpl.trace("Starting compacting operation on journal");
}
- JournalImpl.log.debug("Starting compacting operation on journal");
+
+ if (TRACE_RECORDS) traceRecord("Starting compacting operation on journal");
onCompactStart();
@@ -1543,22 +1524,22 @@
setAutoReclaim(false);
// We need to move to the next file, as we need a clear start for negatives and positives counts
- moveNextFile(true);
+ moveNextFile(false);
+ filesRepository.drainClosedFiles();
+
// Take the snapshots and replace the structures
- dataFilesToProcess.addAll(dataFiles);
+ dataFilesToProcess.addAll(filesRepository.getDataFiles());
- dataFiles.clear();
+ filesRepository.clearDataFiles();
- drainClosedFiles();
-
if (dataFilesToProcess.size() == 0)
{
return;
}
- compactor = new JournalCompactor(fileFactory, this, records.keySet(), dataFilesToProcess.get(0).getFileID());
+ compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keySet(), dataFilesToProcess.get(0).getFileID());
for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
{
@@ -1632,12 +1613,12 @@
{
JournalImpl.trace("Adding file " + fileToAdd + " back as datafile");
}
- dataFiles.addFirst(fileToAdd);
+ filesRepository.addDataFileOnTop(fileToAdd);
}
if (trace)
{
- JournalImpl.trace("There are " + dataFiles.size() + " datafiles Now");
+ JournalImpl.trace("There are " + filesRepository.getDataFilesCount() + " datafiles Now");
}
// Replay pending commands (including updates, deletes and commits)
@@ -1683,6 +1664,8 @@
{
JournalImpl.log.debug("Finished compacting on journal");
}
+
+ if (TRACE_RECORDS) traceRecord("Finished compacting on journal");
}
finally
@@ -1753,21 +1736,15 @@
records.clear();
- dataFiles.clear();
+ filesRepository.clear();
- pendingCloseFiles.clear();
-
- freeFiles.clear();
-
- openedFiles.clear();
-
transactions.clear();
final Map<Long, TransactionHolder> loadTransactions = new LinkedHashMap<Long, TransactionHolder>();
final List<JournalFile> orderedFiles = orderFiles();
- calculateNextfileID(orderedFiles);
+ filesRepository.calculateNextfileID(orderedFiles);
int lastDataPos = JournalImpl.SIZE_HEADER;
@@ -2030,43 +2007,23 @@
if (hasData.get())
{
lastDataPos = resultLastPost;
- dataFiles.add(file);
+ filesRepository.addDataFileOnBottom(file);
}
else
{
// Empty dataFiles with no data
- freeFiles.add(file);
+ filesRepository.addFreeFileNoInit(file);
}
}
// Create any more files we need
- // FIXME - size() involves a scan
- int filesToCreate = minFiles - (dataFiles.size() + freeFiles.size());
+ filesRepository.ensureMinFiles();
- if (filesToCreate > 0)
- {
- for (int i = 0; i < filesToCreate; i++)
- {
- // Keeping all files opened can be very costly (mainly on AIO)
- freeFiles.add(createFile(false, false, true, false));
- }
- }
+ // The current file is the last one that has data
+
+ currentFile = filesRepository.pollLastDataFile();
- // The current file is the last one
-
- Iterator<JournalFile> iter = dataFiles.iterator();
-
- while (iter.hasNext())
- {
- currentFile = iter.next();
-
- if (!iter.hasNext())
- {
- iter.remove();
- }
- }
-
if (currentFile != null)
{
currentFile.getFile().open();
@@ -2075,14 +2032,14 @@
}
else
{
- currentFile = freeFiles.remove();
+ currentFile = filesRepository.getFreeFile();
- openFile(currentFile, true);
+ filesRepository.openFile(currentFile, true);
}
fileFactory.activateBuffer(currentFile.getFile());
- pushOpenedFile();
+ filesRepository.pushOpenedFile();
for (TransactionHolder transaction : loadTransactions.values())
{
@@ -2152,7 +2109,7 @@
{
reclaimer.scan(getDataFiles());
- for (JournalFile file : dataFiles)
+ for (JournalFile file : filesRepository.getDataFiles())
{
if (file.isCanReclaim())
{
@@ -2161,13 +2118,10 @@
{
JournalImpl.trace("Reclaiming file " + file);
}
+
+ filesRepository.removeDataFile(file);
- if (!dataFiles.remove(file))
- {
- JournalImpl.log.warn("Could not remove file " + file);
- }
-
- addFreeFile(file, false);
+ filesRepository.addFreeFile(file, false);
}
}
}
@@ -2179,9 +2133,6 @@
return false;
}
-
- int deleteme = 0;
-
private boolean needsCompact() throws Exception
{
JournalFile[] dataFiles = getDataFiles();
@@ -2197,7 +2148,7 @@
long compactMargin = (long)(totalBytes * compactPercentage);
- boolean needCompact = (totalLiveSize < compactMargin && !compactorRunning.get() && dataFiles.length > compactMinFiles);
+ boolean needCompact = (totalLiveSize < compactMargin && dataFiles.length > compactMinFiles);
return needCompact;
@@ -2218,33 +2169,38 @@
if (needsCompact())
{
- if (!compactorRunning.compareAndSet(false, true))
- {
- return;
- }
+ scheduleCompact();
+ }
+ }
- // We can't use the executor for the compacting... or we would dead lock because of file open and creation
- // operations (that will use the executor)
- compactorExecutor.execute(new Runnable()
+ private void scheduleCompact()
+ {
+ if (!compactorRunning.compareAndSet(false, true))
+ {
+ return;
+ }
+
+ // We can't use the executor for the compacting... or we would dead lock because of file open and creation
+ // operations (that will use the executor)
+ compactorExecutor.execute(new Runnable()
+ {
+ public void run()
{
- public void run()
- {
- try
- {
- JournalImpl.this.compact();
- }
- catch (Throwable e)
- {
- JournalImpl.log.error(e.getMessage(), e);
- }
- finally
- {
- compactorRunning.set(false);
- }
+ try
+ {
+ JournalImpl.this.compact();
}
- });
- }
+ catch (Throwable e)
+ {
+ JournalImpl.log.error(e.getMessage(), e);
+ }
+ finally
+ {
+ compactorRunning.set(false);
+ }
+ }
+ });
}
// TestableJournal implementation
@@ -2266,7 +2222,7 @@
StringBuilder builder = new StringBuilder();
- for (JournalFile file : dataFiles)
+ for (JournalFile file : filesRepository.getDataFiles())
{
builder.append("DataFile:" + file +
" posCounter = " +
@@ -2283,7 +2239,7 @@
}
}
- for (JournalFile file : freeFiles)
+ for (JournalFile file : filesRepository.getFreeFiles())
{
builder.append("FreeFile:" + file + "\n");
}
@@ -2302,8 +2258,6 @@
builder.append("CurrentFile: No current file at this point!");
}
- builder.append("#Opened Files:" + openedFiles.size());
-
return builder.toString();
}
@@ -2339,22 +2293,22 @@
public int getDataFilesCount()
{
- return dataFiles.size();
+ return filesRepository.getDataFilesCount();
}
public JournalFile[] getDataFiles()
{
- return (JournalFile[])dataFiles.toArray(new JournalFile[dataFiles.size()]);
+ return filesRepository.getDataFilesArray();
}
public int getFreeFilesCount()
{
- return freeFiles.size();
+ return filesRepository.getFreeFilesCount();
}
public int getOpenedFilesCount()
{
- return openedFiles.size();
+ return filesRepository.getOpenedFilesCount();
}
public int getIDMapSize()
@@ -2374,17 +2328,17 @@
public String getFilePrefix()
{
- return filePrefix;
+ return filesRepository.getFilePrefix();
}
public String getFileExtension()
{
- return fileExtension;
+ return filesRepository.getFileExtension();
}
public int getMaxAIO()
{
- return maxAIO;
+ return filesRepository.getMaxAIO();
}
public int getUserVersion()
@@ -2395,20 +2349,14 @@
// In some tests we need to force the journal to move to a next file
public void forceMoveNextFile() throws Exception
{
- forceMoveNextFile(true);
- }
-
- // In some tests we need to force the journal to move to a next file
- public void forceMoveNextFile(final boolean synchronous) throws Exception
- {
compactingLock.readLock().lock();
try
{
lockAppend.lock();
try
{
- moveNextFile(synchronous);
- if (autoReclaim && synchronous)
+ moveNextFile(false);
+ if (autoReclaim)
{
checkReclaimStatus();
}
@@ -2462,6 +2410,8 @@
return new Thread(r, "JournalImpl::CompactorExecutor");
}
});
+
+ filesRepository.setExecutor(filesExecutor);
fileFactory.start();
@@ -2493,6 +2443,8 @@
filesExecutor.shutdown();
+ filesRepository.setExecutor(null);
+
if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
{
JournalImpl.log.warn("Couldn't stop journal executor after 60 seconds");
@@ -2505,20 +2457,13 @@
currentFile.getFile().close();
}
- for (JournalFile file : openedFiles)
- {
- file.getFile().close();
- }
+ filesRepository.drainClosedFiles();
fileFactory.stop();
currentFile = null;
- dataFiles.clear();
-
- freeFiles.clear();
-
- openedFiles.clear();
+ filesRepository.clear();
}
finally
{
@@ -2578,7 +2523,7 @@
{
try
{
- addFreeFile(file, false);
+ filesRepository.addFreeFile(file, false);
}
catch (Throwable e)
{
@@ -2606,7 +2551,7 @@
* @param name
* @return
*/
- private String renameExtensionFile(String name, String extension)
+ protected static String renameExtensionFile(String name, String extension)
{
name = name.substring(0, name.lastIndexOf(extension));
return name;
@@ -2632,63 +2577,6 @@
// -----------------------------------------------------------------------------
/**
- * @param file
- * @throws Exception
- */
- private void addFreeFile(final JournalFile file, final boolean renameTmp) throws Exception
- {
- if (file.getFile().size() != this.getFileSize())
- {
- log.warn("Deleting " + file + ".. as it doesn't have the configured size", new Exception("trace"));
- file.getFile().delete();
- }
- else
- // FIXME - size() involves a scan!!!
- if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
- {
- // Re-initialise it
-
- if (trace)
- {
- trace("Adding free file " + file);
- }
-
- JournalFile jf = reinitializeFile(file);
-
- if (renameTmp)
- {
- jf.getFile().renameTo(renameExtensionFile(jf.getFile().getFileName(), ".tmp"));
- }
-
- freeFiles.add(jf);
- }
- else
- {
- file.getFile().delete();
- }
- }
-
- // Discard the old JournalFile and set it with a new ID
- private JournalFile reinitializeFile(final JournalFile file) throws Exception
- {
- long newFileID = generateFileID();
-
- SequentialFile sf = file.getFile();
-
- sf.open(1, false);
-
- int position = initFileHeader(this.fileFactory, sf, userVersion, newFileID);
-
- JournalFile jf = new JournalFileImpl(sf, newFileID, FORMAT_VERSION);
-
- sf.position(position);
-
- sf.close();
-
- return jf;
- }
-
- /**
* <p> Check for holes on the transaction (a commit written but with an incomplete transaction) </p>
* <p>This method will validate if the transaction (PREPARE/COMMIT) is complete as stated on the COMMIT-RECORD.</p>
*
@@ -2890,7 +2778,7 @@
if (!currentFile.getFile().fits(size))
{
- moveNextFile(false);
+ moveNextFile(true);
// The same check needs to be done at the new file also
if (!currentFile.getFile().fits(size))
@@ -2904,7 +2792,7 @@
{
throw new NullPointerException("Current file = null");
}
-
+
if (tx != null)
{
// The callback of a transaction has to be taken inside the lock,
@@ -2960,196 +2848,26 @@
return currentFile;
}
- /** Get the ID part of the name */
- private long getFileNameID(final String fileName)
+ // You need to guarantee lock.acquire() before calling this method
+ private void moveNextFile(final boolean scheduleReclaim) throws InterruptedException
{
- try
- {
- return Long.parseLong(fileName.substring(filePrefix.length() + 1, fileName.indexOf('.')));
- }
- catch (Throwable e)
- {
- JournalImpl.log.warn("Impossible to get the ID part of the file name " + fileName, e);
- return 0;
- }
- }
+ filesRepository.closeFile(currentFile);
- /**
- * This method will create a new file on the file system, pre-fill it with FILL_CHARACTER
- * @param keepOpened
- * @return
- * @throws Exception
- */
- private JournalFile createFile(final boolean keepOpened,
- final boolean multiAIO,
- final boolean init,
- final boolean tmpCompact) throws Exception
- {
- long fileID = generateFileID();
-
- String fileName;
-
- fileName = createFileName(tmpCompact, fileID);
-
- if (JournalImpl.trace)
+ currentFile = filesRepository.openFile();
+
+ if (scheduleReclaim)
{
- JournalImpl.trace("Creating file " + fileName);
+ scheduleReclaim();
}
- String tmpFileName = fileName + ".tmp";
-
- SequentialFile sequentialFile = fileFactory.createSequentialFile(tmpFileName, maxAIO);
-
- sequentialFile.open(1, false);
-
- if (init)
- {
- sequentialFile.fill(0, fileSize, JournalImpl.FILL_CHARACTER);
-
- initFileHeader(this.fileFactory, sequentialFile, userVersion, fileID);
- }
-
- long position = sequentialFile.position();
-
- sequentialFile.close();
-
if (JournalImpl.trace)
{
- JournalImpl.trace("Renaming file " + tmpFileName + " as " + fileName);
+ JournalImpl.trace("moveNextFile: " + currentFile);
}
- sequentialFile.renameTo(fileName);
-
- if (keepOpened)
- {
- if (multiAIO)
- {
- sequentialFile.open();
- }
- else
- {
- sequentialFile.open(1, false);
- }
- sequentialFile.position(position);
- }
-
- return new JournalFileImpl(sequentialFile, fileID, FORMAT_VERSION);
- }
-
- /**
- * @param tmpCompact
- * @param fileID
- * @return
- */
- private String createFileName(final boolean tmpCompact, long fileID)
- {
- String fileName;
- if (tmpCompact)
- {
- fileName = filePrefix + "-" + fileID + "." + fileExtension + ".cmp";
- }
- else
- {
- fileName = filePrefix + "-" + fileID + "." + fileExtension;
- }
- return fileName;
- }
-
- private void openFile(final JournalFile file, final boolean multiAIO) throws Exception
- {
- if (multiAIO)
- {
- file.getFile().open();
- }
- else
- {
- file.getFile().open(1, false);
- }
-
- file.getFile().position(file.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER));
- }
-
- private long generateFileID()
- {
- return nextFileID.incrementAndGet();
- }
-
- // You need to guarantee lock.acquire() before calling this method
- private void moveNextFile(final boolean synchronous) throws InterruptedException
- {
- closeFile(currentFile, synchronous);
-
- currentFile = enqueueOpenFile(synchronous);
-
- if (JournalImpl.trace)
- {
- JournalImpl.trace("moveNextFile: " + currentFile + " sync: " + synchronous);
- }
-
fileFactory.activateBuffer(currentFile.getFile());
}
- /**
- * <p>This method will instantly return the opened file, and schedule opening and reclaiming.</p>
- * <p>In case there are no cached opened files, this method will block until the file was opened,
- * what would happen only if the system is under heavy load by another system (like a backup system, or a DB sharing the same box as HornetQ).</p>
- * */
- private JournalFile enqueueOpenFile(final boolean synchronous) throws InterruptedException
- {
- if (JournalImpl.trace)
- {
- JournalImpl.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
- }
-
- Runnable run = new Runnable()
- {
- public void run()
- {
- try
- {
- pushOpenedFile();
- }
- catch (Exception e)
- {
- JournalImpl.log.error(e.getMessage(), e);
- }
- }
- };
-
- if (synchronous)
- {
- run.run();
- }
- else
- {
- filesExecutor.execute(run);
- }
-
- if (!synchronous)
- {
- scheduleReclaim();
- }
-
- JournalFile nextFile = null;
-
- while (nextFile == null)
- {
- nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
- if (nextFile == null)
- {
- JournalImpl.log.warn("Couldn't open a file in 60 Seconds",
- new Exception("Warning: Couldn't open a file in 60 Seconds"));
- }
- }
-
- if (trace)
- {
- JournalImpl.trace("Returning file " + nextFile);
- }
-
- return nextFile;
- }
-
private void scheduleReclaim()
{
if (state != JournalImpl.STATE_LOADED)
@@ -3165,7 +2883,7 @@
{
try
{
- drainClosedFiles();
+ filesRepository.drainClosedFiles();
if (!checkReclaimStatus())
{
checkCompact();
@@ -3180,97 +2898,7 @@
}
}
- /**
- *
- * Open a file and place it into the openedFiles queue
- * */
- private void pushOpenedFile() throws Exception
- {
- JournalFile nextOpenedFile = getFile(true, true, true, false);
- if (trace)
- {
- JournalImpl.trace("pushing openFile " + nextOpenedFile);
- }
-
- openedFiles.offer(nextOpenedFile);
- }
-
- /**
- * @return
- * @throws Exception
- */
- JournalFile getFile(final boolean keepOpened,
- final boolean multiAIO,
- final boolean initFile,
- final boolean tmpCompactExtension) throws Exception
- {
- JournalFile nextOpenedFile = null;
-
- nextOpenedFile = freeFiles.poll();
-
- if (nextOpenedFile == null)
- {
- nextOpenedFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
- }
- else
- {
- if (tmpCompactExtension)
- {
- SequentialFile sequentialFile = nextOpenedFile.getFile();
- sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
- }
-
- if (keepOpened)
- {
- openFile(nextOpenedFile, multiAIO);
- }
- }
- return nextOpenedFile;
- }
-
- private void closeFile(final JournalFile file, final boolean synchronous)
- {
- fileFactory.deactivateBuffer();
- pendingCloseFiles.add(file);
- dataFiles.add(file);
-
- Runnable run = new Runnable()
- {
- public void run()
- {
- drainClosedFiles();
- }
- };
-
- if (synchronous)
- {
- run.run();
- }
- else
- {
- filesExecutor.execute(run);
- }
-
- }
-
- private void drainClosedFiles()
- {
- JournalFile file;
- try
- {
- while ((file = pendingCloseFiles.poll()) != null)
- {
- file.getFile().close();
- }
- }
- catch (Exception e)
- {
- JournalImpl.log.warn(e.getMessage(), e);
- }
-
- }
-
private JournalTransaction getTransactionInfo(final long txID)
{
JournalTransaction tx = transactions.get(txID);
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java 2010-08-26 17:48:38 UTC (rev 9598)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java 2010-08-26 21:11:42 UTC (rev 9599)
@@ -40,7 +40,7 @@
{
private final boolean isCommit;
- private final long txID;
+ public final long txID;
private final EncodingSupport transactionData;
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java 2010-08-26 17:48:38 UTC (rev 9598)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java 2010-08-26 21:11:42 UTC (rev 9599)
@@ -25,7 +25,7 @@
*/
public class JournalRollbackRecordTX extends JournalInternalRecord
{
- private final long txID;
+ public final long txID;
public JournalRollbackRecordTX(final long txID)
{
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-08-26 17:48:38 UTC (rev 9598)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-08-26 21:11:42 UTC (rev 9599)
@@ -28,6 +28,7 @@
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.AbstractJournalUpdateTask;
+import org.hornetq.core.journal.impl.ExportJournal;
import org.hornetq.core.journal.impl.JournalCompactor;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.journal.impl.JournalFileImpl;
@@ -38,7 +39,6 @@
import org.hornetq.tests.unit.core.journal.impl.fakes.SimpleEncoding;
import org.hornetq.utils.IDGenerator;
import org.hornetq.utils.SimpleIDGenerator;
-import org.hornetq.utils.TimeAndCounterIDGenerator;
/**
*
@@ -53,7 +53,7 @@
private static final int NUMBER_OF_RECORDS = 1000;
- IDGenerator idGenerator = new TimeAndCounterIDGenerator();
+ IDGenerator idGenerator = new SimpleIDGenerator(100000);
// General tests
// =============
@@ -277,13 +277,13 @@
journal.forceMoveNextFile();
addTx(1, 5, 6, 7, 8);
-
+
commit(1);
-
+
journal.forceMoveNextFile();
-
+
journal.compact();
-
+
add(10);
stopJournal();
@@ -806,7 +806,7 @@
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
createJournal();
-
+
startJournal();
load();
@@ -931,7 +931,7 @@
}
startCompact();
-
+
// Delete part of the live records while cleanup still working
for (int i = 1; i < 5; i++)
{
@@ -939,7 +939,7 @@
}
finishCompact();
-
+
// Delete part of the live records after cleanup is done
for (int i = 5; i < 10; i++)
{
@@ -963,7 +963,7 @@
setup(2, 60 * 1024, false);
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
-
+
createJournal();
startJournal();
@@ -978,7 +978,7 @@
addTx(appendTX, appendOne);
startCompact();
-
+
addTx(appendTX, appendTwo);
commit(appendTX);
@@ -1161,7 +1161,7 @@
}
}
-
+
public void testCompactFirstFileWithPendingCommits() throws Exception
{
setup(2, 60 * 1024, true);
@@ -1175,10 +1175,165 @@
{
addTx(tx, idGenerator.generateID());
}
-
+
journal.forceMoveNextFile();
+
+ ArrayList<Long> listToDelete = new ArrayList<Long>();
+ for (int i = 0; i < 10; i++)
+ {
+ if (i == 5)
+ {
+ commit(tx);
+ }
+ long id = idGenerator.generateID();
+ listToDelete.add(id);
+ add(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ for (Long id : listToDelete)
+ {
+ delete(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ // This operation used to be journal.cleanup(journal.getDataFiles()[0]); when cleanup was still in place
+ journal.compact();
+
+ journal.checkReclaimStatus();
+
+ journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testCompactFirstFileWithPendingCommits3() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ long tx = idGenerator.generateID();
+ for (int i = 0; i < 10; i++)
+ {
+ addTx(tx, idGenerator.generateID());
+ }
+
+ journal.forceMoveNextFile();
+
+ ArrayList<Long> listToDelete = new ArrayList<Long>();
+ for (int i = 0; i < 10; i++)
+ {
+ long id = idGenerator.generateID();
+ listToDelete.add(id);
+ add(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ for (Long id : listToDelete)
+ {
+ delete(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ ExportJournal.exportJournal(getTestDir(), filePrefix, fileExtension, 2, this.fileSize, "/tmp/out1.dmp");
+
+ ExportJournal.exportJournal(getTestDir(), filePrefix, fileExtension, 2, this.fileSize, "/tmp/out2.dmp");
+
+ rollback(tx);
+
+ ExportJournal.exportJournal(getTestDir(), filePrefix, fileExtension, 2, this.fileSize, "/tmp/out3.dmp");
+
+ journal.forceMoveNextFile();
+ journal.checkReclaimStatus();
+
+ ExportJournal.exportJournal(getTestDir(), filePrefix, fileExtension, 2, this.fileSize, "/tmp/out4.dmp");
+
+ journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testCompactFirstFileWithPendingCommits2() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ long tx = idGenerator.generateID();
+ for (int i = 0; i < 10; i++)
+ {
+ addTx(tx, idGenerator.generateID());
+ }
+
+ journal.forceMoveNextFile();
+
+ ArrayList<Long> listToDelete = new ArrayList<Long>();
+ for (int i = 0; i < 10; i++)
+ {
+ long id = idGenerator.generateID();
+ listToDelete.add(id);
+ add(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ for (Long id : listToDelete)
+ {
+ delete(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ startCompact();
+ System.out.println("Committing TX " + tx);
commit(tx);
+ finishCompact();
+
+ journal.checkReclaimStatus();
+
+ journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testCompactFirstFileWithPendingCommits4() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ long ids[] = new long[10];
+
+ long tx0 = idGenerator.generateID();
+ for (int i = 0; i < 10; i++)
+ {
+ ids[i] = idGenerator.generateID();
+ addTx(tx0, ids[i]);
+ }
+
+ long tx1 = idGenerator.generateID();
+ journal.forceMoveNextFile();
ArrayList<Long> listToDelete = new ArrayList<Long>();
for (int i = 0; i < 10; i++)
@@ -1187,21 +1342,157 @@
listToDelete.add(id);
add(id);
}
-
+
journal.forceMoveNextFile();
for (Long id : listToDelete)
{
delete(id);
}
+
+ journal.forceMoveNextFile();
+
+ startCompact();
+ System.out.println("Committing TX " + tx1);
+ rollback(tx0);
+ for (int i = 0 ; i < 10; i++)
+ {
+ addTx(tx1, ids[i]);
+ }
+
+ journal.forceMoveNextFile();
+ commit(tx1);
+ finishCompact();
+
+ journal.checkReclaimStatus();
+
+ journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testCompactFirstFileWithPendingCommits5() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ long ids[] = new long[10];
+
+ long tx0 = idGenerator.generateID();
+ for (int i = 0; i < 10; i++)
+ {
+ ids[i] = idGenerator.generateID();
+ addTx(tx0, ids[i]);
+ }
+
+ long tx1 = idGenerator.generateID();
journal.forceMoveNextFile();
-
- // This operation used to be journal.cleanup(journal.getDataFiles()[0]); when cleanup was still in place
+
+ ArrayList<Long> listToDelete = new ArrayList<Long>();
+ for (int i = 0; i < 10; i++)
+ {
+ long id = idGenerator.generateID();
+ listToDelete.add(id);
+ add(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ for (Long id : listToDelete)
+ {
+ delete(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ startCompact();
+ System.out.println("Committing TX " + tx1);
+ rollback(tx0);
+ for (int i = 0 ; i < 10; i++)
+ {
+ addTx(tx1, ids[i]);
+ }
+
+ journal.forceMoveNextFile();
+ commit(tx1);
+ finishCompact();
+
+ journal.checkReclaimStatus();
+
journal.compact();
- journal.checkReclaimStatus();
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testCompactFirstFileWithPendingCommits6() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ long ids[] = new long[10];
+
+ long tx0 = idGenerator.generateID();
+ for (int i = 0; i < 10; i++)
+ {
+ ids[i] = idGenerator.generateID();
+ addTx(tx0, ids[i]);
+ }
+ commit(tx0);
+
+ startCompact();
+ for (int i = 0 ; i < 10; i++)
+ {
+ delete(ids[i]);
+ }
+ finishCompact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testCompactFirstFileWithPendingCommits7() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ long tx0 = idGenerator.generateID();
+ add(idGenerator.generateID());
+
+ long ids[] = new long[]{idGenerator.generateID(), idGenerator.generateID()};
+
+ addTx(tx0, ids[0]);
+ addTx(tx0, ids[1]);
+
+ journal.forceMoveNextFile();
+
+ commit(tx0);
+
+ journal.forceMoveNextFile();
+
+ delete(ids[0]);
+ delete(ids[1]);
+
+ journal.forceMoveNextFile();
+
journal.compact();
stopJournal();
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-26 17:48:38 UTC (rev 9598)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-26 21:11:42 UTC (rev 9599)
@@ -119,7 +119,7 @@
}
else
{
- factory = new NIOSequentialFileFactory(dir.getPath());
+ factory = new NIOSequentialFileFactory(dir.getPath(), true);
maxAIO = ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_NIO;
}
@@ -134,9 +134,6 @@
{
protected void onCompactLock() throws Exception
{
- // System.out.println("OnCompactLock");
- journal.forceMoveNextFile(false);
- // System.out.println("OnCompactLock done");
}
protected void onCompactStart() throws Exception
@@ -152,7 +149,7 @@
{
long id = idGen.generateID();
journal.appendAddRecord(id, (byte)0, new byte[] { 1, 2, 3 }, false);
- journal.forceMoveNextFile(false);
+ journal.forceMoveNextFile();
journal.appendDeleteRecord(id, id == 20);
}
// System.out.println("OnCompactStart leave");
More information about the hornetq-commits
mailing list