JBoss hornetq SVN: r9600 - branches/Branch_2_1/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-27 00:20:03 -0400 (Fri, 27 Aug 2010)
New Revision: 9600
Modified:
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
Log:
https://jira.jboss.org/browse/HORNETQ-485 Improvements on compacting and file management on the journal
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java 2010-08-26 21:11:42 UTC (rev 9599)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java 2010-08-27 04:20:03 UTC (rev 9600)
@@ -114,10 +114,21 @@
{
dataFiles.clear();
- pendingCloseFiles.clear();
+ drainClosedFiles();
freeFiles.clear();
+ for (JournalFile file : openedFiles)
+ {
+ try
+ {
+ file.getFile().close();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
openedFiles.clear();
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-26 21:11:42 UTC (rev 9599)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-27 04:20:03 UTC (rev 9600)
@@ -2459,11 +2459,11 @@
filesRepository.drainClosedFiles();
+ filesRepository.clear();
+
fileFactory.stop();
currentFile = null;
-
- filesRepository.clear();
}
finally
{
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2010-08-26 21:11:42 UTC (rev 9599)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2010-08-27 04:20:03 UTC (rev 9600)
@@ -140,7 +140,7 @@
@Override
public synchronized void close() throws Exception
{
- super.close();
+ super.close();
if (maxIOSemaphore != null)
{
14 years, 4 months
JBoss hornetq SVN: r9599 - in branches/Branch_2_1: src/main/org/hornetq/core/journal/impl and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-26 17:11:42 -0400 (Thu, 26 Aug 2010)
New Revision: 9599
Added:
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java
Modified:
branches/Branch_2_1/src/main/org/hornetq/core/journal/TestableJournal.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
branches/Branch_2_1/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-485 Improvements on compacting and file management on the journal
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/TestableJournal.java 2010-08-26 17:48:38 UTC (rev 9598)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/TestableJournal.java 2010-08-26 21:11:42 UTC (rev 9599)
@@ -49,8 +49,6 @@
void forceMoveNextFile() throws Exception;
- void forceMoveNextFile(boolean synchronous) throws Exception;
-
void setAutoReclaim(boolean autoReclaim);
boolean isAutoReclaim();
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-08-26 17:48:38 UTC (rev 9598)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-08-26 21:11:42 UTC (rev 9599)
@@ -54,6 +54,8 @@
protected JournalFile currentFile;
protected SequentialFile sequentialFile;
+
+ protected final FilesRepository filesRepository;
protected long nextOrderingID;
@@ -69,11 +71,13 @@
protected AbstractJournalUpdateTask(final SequentialFileFactory fileFactory,
final JournalImpl journal,
+ final FilesRepository filesRepository,
final Set<Long> recordsSnapshot,
final long nextOrderingID)
{
super();
this.journal = journal;
+ this.filesRepository = filesRepository;
this.fileFactory = fileFactory;
this.nextOrderingID = nextOrderingID;
this.recordsSnapshot.addAll(recordsSnapshot);
@@ -212,7 +216,7 @@
writingChannel = HornetQBuffers.wrappedBuffer(bufferWrite);
- currentFile = journal.getFile(false, false, false, true);
+ currentFile = filesRepository.takeFile(false, false, false, true);
sequentialFile = currentFile.getFile();
Added: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java (rev 0)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java 2010-08-26 21:11:42 UTC (rev 9599)
@@ -0,0 +1,602 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.logging.Logger;
+
+/**
+ * This is a helper class for the Journal that controls access to dataFiles, openedFiles and freeFiles,
+ * guaranteeing that they are delivered in order to the Journal.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class FilesRepository
+{
+
+ private static final Logger log = Logger.getLogger(JournalImpl.class);
+
+ private static final boolean trace = false;
+
+ // This method exists just to make debug easier.
+ // I could replace log.trace by log.info temporarily while I was debugging
+ // Journal
+ private static final void trace(final String message)
+ {
+ log.trace(message);
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final SequentialFileFactory fileFactory;
+
+ private final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
+
+ private final BlockingQueue<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
+
+ private final ConcurrentLinkedQueue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
+
+ private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
+
+ private final AtomicLong nextFileID = new AtomicLong(0);
+
+ private final int maxAIO;
+
+ private final int minFiles;
+
+ private final int fileSize;
+
+ private final String filePrefix;
+
+ private final String fileExtension;
+
+ private final int userVersion;
+
+ private Executor filesExecutor;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public FilesRepository(final SequentialFileFactory fileFactory,
+ final String filePrefix,
+ final String fileExtension,
+ final int userVersion,
+ final int maxAIO,
+ final int fileSize,
+ final int minFiles)
+ {
+ this.fileFactory = fileFactory;
+ this.maxAIO = maxAIO;
+ this.filePrefix = filePrefix;
+ this.fileExtension = fileExtension;
+ this.minFiles = minFiles;
+ this.fileSize = fileSize;
+ this.userVersion = userVersion;
+ }
+
+ // Public --------------------------------------------------------
+
+
+ public void setExecutor(Executor executor)
+ {
+ this.filesExecutor = executor;
+ }
+
+ public void clear()
+ {
+ dataFiles.clear();
+
+ pendingCloseFiles.clear();
+
+ freeFiles.clear();
+
+ openedFiles.clear();
+ }
+
+ public int getMaxAIO()
+ {
+ return maxAIO;
+ }
+
+ public String getFileExtension()
+ {
+ return fileExtension;
+ }
+
+ public String getFilePrefix()
+ {
+ return filePrefix;
+ }
+
+ public void calculateNextfileID(List<JournalFile> files)
+ {
+
+ for (JournalFile file : files)
+ {
+ long fileID = file.getFileID();
+ if (nextFileID.get() < fileID)
+ {
+ nextFileID.set(fileID);
+ }
+
+ long fileNameID = getFileNameID(file.getFile().getFileName());
+
+ // The compactor could create a fileName but use a previously assigned ID.
+ // Because of that we need to take both parts into account
+ if (nextFileID.get() < fileNameID)
+ {
+ nextFileID.set(fileNameID);
+ }
+ }
+
+ }
+
+ public void ensureMinFiles() throws Exception
+ {
+ // FIXME - size() involves a scan
+ int filesToCreate = minFiles - (dataFiles.size() + freeFiles.size());
+
+ if (filesToCreate > 0)
+ {
+ for (int i = 0; i < filesToCreate; i++)
+ {
+ // Keeping all files opened can be very costly (mainly on AIO)
+ freeFiles.add(createFile(false, false, true, false));
+ }
+ }
+
+ }
+
+ public void openFile(final JournalFile file, final boolean multiAIO) throws Exception
+ {
+ if (multiAIO)
+ {
+ file.getFile().open();
+ }
+ else
+ {
+ file.getFile().open(1, false);
+ }
+
+ file.getFile().position(file.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER));
+ }
+
+ // Data File Operations ==========================================
+
+ public JournalFile[] getDataFilesArray()
+ {
+ return (JournalFile[])dataFiles.toArray(new JournalFile[dataFiles.size()]);
+ }
+
+ public JournalFile pollLastDataFile()
+ {
+ /*java.util.Iterator<JournalFile> iter = dataFiles.iterator();
+
+ JournalFile currentFile = null;
+ while (iter.hasNext())
+ {
+ currentFile = iter.next();
+
+ if (!iter.hasNext())
+ {
+ iter.remove();
+ }
+ }
+
+ return currentFile; */
+
+ return dataFiles.pollLast();
+ }
+
+ public void removeDataFile(JournalFile file)
+ {
+ if (!dataFiles.remove(file))
+ {
+ log.warn("Could not remove file " + file + " from the list of data files");
+ }
+ }
+
+ public int getDataFilesCount()
+ {
+ return dataFiles.size();
+ }
+
+ public Collection<JournalFile> getDataFiles()
+ {
+ return dataFiles;
+ }
+
+ public void clearDataFiles()
+ {
+ dataFiles.clear();
+ }
+
+ public void addDataFileOnTop(JournalFile file)
+ {
+ dataFiles.addFirst(file);
+ }
+
+ public void addDataFileOnBottom(JournalFile file)
+ {
+ dataFiles.add(file);
+ }
+
+ // Free File Operations ==========================================
+
+ public int getFreeFilesCount()
+ {
+ return freeFiles.size();
+ }
+
+ /**
+ * Add directly to the freeFiles structure without reinitializing the file.
+ * used on load() only
+ */
+ public void addFreeFileNoInit(JournalFile file)
+ {
+ freeFiles.add(file);
+ }
+
+ /**
+ * @param file
+ * @throws Exception
+ */
+ public synchronized void addFreeFile(final JournalFile file, final boolean renameTmp) throws Exception
+ {
+ if (file.getFile().size() != fileSize)
+ {
+ log.warn("Deleting " + file + ".. as it doesn't have the configured size", new Exception("trace"));
+ file.getFile().delete();
+ }
+ else
+ // FIXME - size() involves a scan!!!
+ if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
+ {
+ // Re-initialise it
+
+ if (trace)
+ {
+ trace("Adding free file " + file);
+ }
+
+ JournalFile jf = reinitializeFile(file);
+
+ if (renameTmp)
+ {
+ jf.getFile().renameTo(JournalImpl.renameExtensionFile(jf.getFile().getFileName(), ".tmp"));
+ }
+
+ freeFiles.add(jf);
+ }
+ else
+ {
+ file.getFile().delete();
+ }
+ }
+
+ public Collection<JournalFile> getFreeFiles()
+ {
+ return freeFiles;
+ }
+
+ public JournalFile getFreeFile()
+ {
+ return freeFiles.remove();
+ }
+
+ // Opened files operations =======================================
+
+ public int getOpenedFilesCount()
+ {
+ return openedFiles.size();
+ }
+
+   /** Closes every file queued in pendingCloseFiles; a failed close is logged and the drain carries on with the remaining files. */
+   public void drainClosedFiles()
+   {
+      JournalFile file;
+      while ((file = pendingCloseFiles.poll()) != null)
+      {
+         try
+         {
+            file.getFile().close();
+         }
+         catch (Exception e)
+         {
+            log.warn(e.getMessage(), e);
+         }
+      }
+   }
+
+   /**
+    * <p>Returns the next pre-opened file and schedules the opening of a replacement (on filesExecutor when set, otherwise inline).</p>
+    * <p>If no opened file is cached, this blocks until one arrives (expected only when the box is under heavy load, e.g. from a backup or a DB sharing it), warning after every 5 second poll that times out.</p>
+    * @throws InterruptedException if the calling thread is interrupted while waiting on openedFiles
+    */
+   public JournalFile openFile() throws InterruptedException
+   {
+      if (trace)
+      {
+         trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
+      }
+      // Replenish the openedFiles queue; a failure is only logged because it may happen on the executor thread
+      Runnable run = new Runnable()
+      {
+         public void run()
+         {
+            try
+            {
+               pushOpenedFile();
+            }
+            catch (Exception e)
+            {
+               log.error(e.getMessage(), e);
+            }
+         }
+      };
+
+      if (filesExecutor == null)
+      {
+         run.run();
+      }
+      else
+      {
+         filesExecutor.execute(run);
+      }
+      // Keep polling until a file shows up; each timed-out poll only produces a warning
+      JournalFile nextFile = null;
+
+      while (nextFile == null)
+      {
+         nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
+         if (nextFile == null)
+         {
+            log.warn("Couldn't open a file in 5 Seconds", new Exception("Warning: Couldn't open a file in 5 Seconds"));
+         }
+      }
+
+      if (trace)
+      {
+         trace("Returning file " + nextFile);
+      }
+
+      return nextFile;
+   }
+
+ /**
+ *
+ * Open a file and place it into the openedFiles queue
+ * */
+ public void pushOpenedFile() throws Exception
+ {
+ JournalFile nextOpenedFile = takeFile(true, true, true, false);
+
+ if (trace)
+ {
+ trace("pushing openFile " + nextOpenedFile);
+ }
+
+ openedFiles.offer(nextOpenedFile);
+ }
+
+
+ public void closeFile(final JournalFile file)
+ {
+ fileFactory.deactivateBuffer();
+ pendingCloseFiles.add(file);
+ dataFiles.add(file);
+
+ Runnable run = new Runnable()
+ {
+ public void run()
+ {
+ drainClosedFiles();
+ }
+ };
+
+ if (filesExecutor == null)
+ {
+ run.run();
+ }
+ else
+ {
+ filesExecutor.execute(run);
+ }
+
+ }
+
+
+ /**
+ * Takes the next file from freeFiles, creating a new one when none is available; a reused file is not re-initialised here.
+ * @return the file taken, carrying a ".cmp" suffix when tmpCompactExtension is set and opened when keepOpened is set
+ * @throws Exception propagated from creating, renaming or opening the file
+ */
+ public JournalFile takeFile(final boolean keepOpened,
+ final boolean multiAIO,
+ final boolean initFile,
+ final boolean tmpCompactExtension) throws Exception
+ {
+ JournalFile nextOpenedFile = null;
+
+ nextOpenedFile = freeFiles.poll();
+
+ if (nextOpenedFile == null)
+ {
+ nextOpenedFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
+ }
+ else
+ {
+ if (tmpCompactExtension)
+ {
+ SequentialFile sequentialFile = nextOpenedFile.getFile();
+ sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
+ }
+
+ if (keepOpened)
+ {
+ openFile(nextOpenedFile, multiAIO);
+ }
+ }
+ return nextOpenedFile;
+ }
+
+
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ /**
+ * This method will create a new file on the file system, pre-fill it with FILL_CHARACTER
+ * @param keepOpened
+ * @return
+ * @throws Exception
+ */
+ private JournalFile createFile(final boolean keepOpened,
+ final boolean multiAIO,
+ final boolean init,
+ final boolean tmpCompact) throws Exception
+ {
+ long fileID = generateFileID();
+
+ String fileName;
+
+ fileName = createFileName(tmpCompact, fileID);
+
+ if (trace)
+ {
+ trace("Creating file " + fileName);
+ }
+
+ String tmpFileName = fileName + ".tmp";
+
+ SequentialFile sequentialFile = fileFactory.createSequentialFile(tmpFileName, maxAIO);
+
+ sequentialFile.open(1, false);
+
+ if (init)
+ {
+ sequentialFile.fill(0, fileSize, JournalImpl.FILL_CHARACTER);
+
+ JournalImpl.initFileHeader(this.fileFactory, sequentialFile, userVersion, fileID);
+ }
+
+ long position = sequentialFile.position();
+
+ sequentialFile.close();
+
+ if (trace)
+ {
+ trace("Renaming file " + tmpFileName + " as " + fileName);
+ }
+
+ sequentialFile.renameTo(fileName);
+
+ if (keepOpened)
+ {
+ if (multiAIO)
+ {
+ sequentialFile.open();
+ }
+ else
+ {
+ sequentialFile.open(1, false);
+ }
+ sequentialFile.position(position);
+ }
+
+ return new JournalFileImpl(sequentialFile, fileID, JournalImpl.FORMAT_VERSION);
+ }
+
+ /**
+ * @param tmpCompact
+ * @param fileID
+ * @return
+ */
+ private String createFileName(final boolean tmpCompact, long fileID)
+ {
+ String fileName;
+ if (tmpCompact)
+ {
+ fileName = filePrefix + "-" + fileID + "." + fileExtension + ".cmp";
+ }
+ else
+ {
+ fileName = filePrefix + "-" + fileID + "." + fileExtension;
+ }
+ return fileName;
+ }
+
+ private long generateFileID()
+ {
+ return nextFileID.incrementAndGet();
+ }
+
+   /** Get the ID part of the name (the text between the prefix separator and the first '.'); returns 0, after a warning, when it can't be parsed */
+   private long getFileNameID(final String fileName)
+   {
+      try
+      {
+         return Long.parseLong(fileName.substring(filePrefix.length() + 1, fileName.indexOf('.')));
+      }
+      catch (RuntimeException e) // malformed names only (NumberFormat / IndexOutOfBounds); Errors such as OutOfMemoryError must propagate
+      {
+         log.warn("Impossible to get the ID part of the file name " + fileName, e);
+         return 0;
+      }
+   }
+
+ // Discard the old JournalFile and set it with a new ID
+ private JournalFile reinitializeFile(final JournalFile file) throws Exception
+ {
+ long newFileID = generateFileID();
+
+ SequentialFile sf = file.getFile();
+
+ sf.open(1, false);
+
+ int position = JournalImpl.initFileHeader(this.fileFactory, sf, userVersion, newFileID);
+
+ JournalFile jf = new JournalFileImpl(sf, newFileID, JournalImpl.FORMAT_VERSION);
+
+ sf.position(position);
+
+ sf.close();
+
+ return jf;
+ }
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-08-26 17:48:38 UTC (rev 9598)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-08-26 21:11:42 UTC (rev 9599)
@@ -144,10 +144,11 @@
public JournalCompactor(final SequentialFileFactory fileFactory,
final JournalImpl journal,
+ final FilesRepository filesRepository,
final Set<Long> recordsSnapshot,
final long firstFileID)
{
- super(fileFactory, journal, recordsSnapshot, firstFileID);
+ super(fileFactory, journal, filesRepository, recordsSnapshot, firstFileID);
}
/** This methods informs the Compactor about the existence of a pending (non committed) transaction */
@@ -531,7 +532,14 @@
void execute() throws Exception
{
JournalRecord deleteRecord = journal.getRecords().remove(id);
- deleteRecord.delete(usedFile);
+ if (deleteRecord == null)
+ {
+ log.warn("Can't find record " + id + " during compact replay");
+ }
+ else
+ {
+ deleteRecord.delete(usedFile);
+ }
}
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-26 17:48:38 UTC (rev 9598)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-26 21:11:42 UTC (rev 9599)
@@ -24,16 +24,11 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -99,6 +94,10 @@
private static final Logger log = Logger.getLogger(JournalImpl.class);
private static final boolean trace = log.isTraceEnabled();
+
+ // This is useful at debug time...
+ // if you set it to true, all the appends, deletes, rollbacks, commits, etc.. are sent to System.out
+ private static final boolean TRACE_RECORDS = false;
// This method exists just to make debug easier.
// I could replace log.trace by log.info temporarily while I was debugging
@@ -107,6 +106,11 @@
{
log.trace(message);
}
+
+ private static final void traceRecord(final String message)
+ {
+ System.out.println(message);
+ }
// The sizes of primitive types
@@ -167,12 +171,8 @@
private volatile boolean autoReclaim = true;
- private final AtomicLong nextFileID = new AtomicLong(0);
-
private final int userVersion;
- private final int maxAIO;
-
private final int fileSize;
private final int minFiles;
@@ -183,18 +183,9 @@
private final SequentialFileFactory fileFactory;
- public final String filePrefix;
- public final String fileExtension;
-
- private final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
-
- private final BlockingQueue<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
-
- private final ConcurrentLinkedQueue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
-
- private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
-
+ private final FilesRepository filesRepository;
+
// Compacting may replace this structure
private final ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<Long, JournalRecord>();
@@ -300,13 +291,9 @@
this.minFiles = minFiles;
this.fileFactory = fileFactory;
+
+ filesRepository = new FilesRepository(fileFactory, filePrefix, fileExtension, userVersion, maxAIO, fileSize, minFiles);
- this.filePrefix = filePrefix;
-
- this.fileExtension = fileExtension;
-
- this.maxAIO = maxAIO;
-
this.userVersion = userVersion;
}
@@ -390,13 +377,13 @@
* It won't be part of the interface as the tools should be specific to the implementation */
public List<JournalFile> orderFiles() throws Exception
{
- List<String> fileNames = fileFactory.listFiles(fileExtension);
+ List<String> fileNames = fileFactory.listFiles(filesRepository.getFileExtension());
List<JournalFile> orderedFiles = new ArrayList<JournalFile>(fileNames.size());
for (String fileName : fileNames)
{
- SequentialFile file = fileFactory.createSequentialFile(fileName, maxAIO);
+ SequentialFile file = fileFactory.createSequentialFile(fileName, filesRepository.getMaxAIO());
file.open(1, false);
@@ -421,29 +408,6 @@
return orderedFiles;
}
- private void calculateNextfileID(List<JournalFile> files)
- {
-
- for (JournalFile file : files)
- {
- long fileID = file.getFileID();
- if (nextFileID.get() < fileID)
- {
- nextFileID.set(fileID);
- }
-
- long fileNameID = getFileNameID(file.getFile().getFileName());
-
- // The compactor could create a fileName but use a previously assigned ID.
- // Because of that we need to take both parts into account
- if (nextFileID.get() < fileNameID)
- {
- nextFileID.set(fileNameID);
- }
- }
-
- }
-
/** this method is used internally only however tools may use it to maintenance. */
public static int readJournalFile(final SequentialFileFactory fileFactory,
final JournalFile file,
@@ -856,6 +820,8 @@
{
JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
+ if (TRACE_RECORDS) traceRecord("appendAddRecord::id=" + id + ", usedFile = " + usedFile);
+
records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
}
finally
@@ -932,6 +898,8 @@
{
JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
+ if (TRACE_RECORDS) traceRecord("appendUpdateRecord::id=" + id + ", usedFile = " + usedFile);
+
// record== null here could only mean there is a compactor, and computing the delete should be done after
// compacting is done
if (jrnRecord == null)
@@ -1008,6 +976,8 @@
{
JournalFile usedFile = appendRecord(deleteRecord, false, sync, null, callback);
+ if (TRACE_RECORDS) traceRecord("appendDeleteRecord::id=" + id + ", usedFile = " + usedFile);
+
// record== null here could only mean there is a compactor, and computing the delete should be done after
// compacting is done
if (record == null)
@@ -1060,6 +1030,8 @@
{
JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
+ if (TRACE_RECORDS) traceRecord("appendAddRecordTransactional:txID=" + txID + ",id=" + id + ", usedFile = " + usedFile);
+
tx.addPositive(usedFile, id, addRecord.getEncodeSize());
}
finally
@@ -1104,6 +1076,8 @@
{
JournalFile usedFile = appendRecord(updateRecordTX, false, false, tx, null);
+ if (TRACE_RECORDS) traceRecord("appendUpdateRecordTransactional::txID=" + txID + ",id="+id + ", usedFile = " + usedFile);
+
tx.addPositive(usedFile, id, updateRecordTX.getEncodeSize());
}
finally
@@ -1142,6 +1116,8 @@
{
JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null);
+ if (TRACE_RECORDS) traceRecord("appendDeleteRecordTransactional::txID=" + txID + ", id=" + id + ", usedFile = " + usedFile);
+
tx.addNegative(usedFile, id);
}
finally
@@ -1230,6 +1206,8 @@
try
{
JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback);
+
+ if (TRACE_RECORDS) traceRecord("appendPrepareRecord::txID=" + txID + ", usedFile = " + usedFile);
tx.prepare(usedFile);
}
@@ -1248,7 +1226,7 @@
public void appendCommitRecord(final long txID, final boolean sync) throws Exception
{
SyncIOCompletion syncCompletion = getSyncCallback(sync);
-
+
appendCommitRecord(txID, sync, syncCompletion);
if (syncCompletion != null)
@@ -1303,6 +1281,8 @@
try
{
JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
+
+ if (TRACE_RECORDS) traceRecord("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile);
tx.commit(usedFile);
}
@@ -1514,7 +1494,7 @@
throw new IllegalStateException("There is pending compacting operation");
}
- ArrayList<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>(dataFiles.size());
+ ArrayList<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>(filesRepository.getDataFilesCount());
boolean previousReclaimValue = autoReclaim;
@@ -1524,7 +1504,8 @@
{
JournalImpl.trace("Starting compacting operation on journal");
}
- JournalImpl.log.debug("Starting compacting operation on journal");
+
+ if (TRACE_RECORDS) traceRecord("Starting compacting operation on journal");
onCompactStart();
@@ -1543,22 +1524,22 @@
setAutoReclaim(false);
// We need to move to the next file, as we need a clear start for negatives and positives counts
- moveNextFile(true);
+ moveNextFile(false);
+ filesRepository.drainClosedFiles();
+
// Take the snapshots and replace the structures
- dataFilesToProcess.addAll(dataFiles);
+ dataFilesToProcess.addAll(filesRepository.getDataFiles());
- dataFiles.clear();
+ filesRepository.clearDataFiles();
- drainClosedFiles();
-
if (dataFilesToProcess.size() == 0)
{
return;
}
- compactor = new JournalCompactor(fileFactory, this, records.keySet(), dataFilesToProcess.get(0).getFileID());
+ compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keySet(), dataFilesToProcess.get(0).getFileID());
for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
{
@@ -1632,12 +1613,12 @@
{
JournalImpl.trace("Adding file " + fileToAdd + " back as datafile");
}
- dataFiles.addFirst(fileToAdd);
+ filesRepository.addDataFileOnTop(fileToAdd);
}
if (trace)
{
- JournalImpl.trace("There are " + dataFiles.size() + " datafiles Now");
+ JournalImpl.trace("There are " + filesRepository.getDataFilesCount() + " datafiles Now");
}
// Replay pending commands (including updates, deletes and commits)
@@ -1683,6 +1664,8 @@
{
JournalImpl.log.debug("Finished compacting on journal");
}
+
+ if (TRACE_RECORDS) traceRecord("Finished compacting on journal");
}
finally
@@ -1753,21 +1736,15 @@
records.clear();
- dataFiles.clear();
+ filesRepository.clear();
- pendingCloseFiles.clear();
-
- freeFiles.clear();
-
- openedFiles.clear();
-
transactions.clear();
final Map<Long, TransactionHolder> loadTransactions = new LinkedHashMap<Long, TransactionHolder>();
final List<JournalFile> orderedFiles = orderFiles();
- calculateNextfileID(orderedFiles);
+ filesRepository.calculateNextfileID(orderedFiles);
int lastDataPos = JournalImpl.SIZE_HEADER;
@@ -2030,43 +2007,23 @@
if (hasData.get())
{
lastDataPos = resultLastPost;
- dataFiles.add(file);
+ filesRepository.addDataFileOnBottom(file);
}
else
{
// Empty dataFiles with no data
- freeFiles.add(file);
+ filesRepository.addFreeFileNoInit(file);
}
}
// Create any more files we need
- // FIXME - size() involves a scan
- int filesToCreate = minFiles - (dataFiles.size() + freeFiles.size());
+ filesRepository.ensureMinFiles();
- if (filesToCreate > 0)
- {
- for (int i = 0; i < filesToCreate; i++)
- {
- // Keeping all files opened can be very costly (mainly on AIO)
- freeFiles.add(createFile(false, false, true, false));
- }
- }
+ // The current file is the last one that has data
+
+ currentFile = filesRepository.pollLastDataFile();
- // The current file is the last one
-
- Iterator<JournalFile> iter = dataFiles.iterator();
-
- while (iter.hasNext())
- {
- currentFile = iter.next();
-
- if (!iter.hasNext())
- {
- iter.remove();
- }
- }
-
if (currentFile != null)
{
currentFile.getFile().open();
@@ -2075,14 +2032,14 @@
}
else
{
- currentFile = freeFiles.remove();
+ currentFile = filesRepository.getFreeFile();
- openFile(currentFile, true);
+ filesRepository.openFile(currentFile, true);
}
fileFactory.activateBuffer(currentFile.getFile());
- pushOpenedFile();
+ filesRepository.pushOpenedFile();
for (TransactionHolder transaction : loadTransactions.values())
{
@@ -2152,7 +2109,7 @@
{
reclaimer.scan(getDataFiles());
- for (JournalFile file : dataFiles)
+ for (JournalFile file : filesRepository.getDataFiles())
{
if (file.isCanReclaim())
{
@@ -2161,13 +2118,10 @@
{
JournalImpl.trace("Reclaiming file " + file);
}
+
+ filesRepository.removeDataFile(file);
- if (!dataFiles.remove(file))
- {
- JournalImpl.log.warn("Could not remove file " + file);
- }
-
- addFreeFile(file, false);
+ filesRepository.addFreeFile(file, false);
}
}
}
@@ -2179,9 +2133,6 @@
return false;
}
-
- int deleteme = 0;
-
private boolean needsCompact() throws Exception
{
JournalFile[] dataFiles = getDataFiles();
@@ -2197,7 +2148,7 @@
long compactMargin = (long)(totalBytes * compactPercentage);
- boolean needCompact = (totalLiveSize < compactMargin && !compactorRunning.get() && dataFiles.length > compactMinFiles);
+ boolean needCompact = (totalLiveSize < compactMargin && dataFiles.length > compactMinFiles);
return needCompact;
@@ -2218,33 +2169,38 @@
if (needsCompact())
{
- if (!compactorRunning.compareAndSet(false, true))
- {
- return;
- }
+ scheduleCompact();
+ }
+ }
- // We can't use the executor for the compacting... or we would dead lock because of file open and creation
- // operations (that will use the executor)
- compactorExecutor.execute(new Runnable()
+ private void scheduleCompact()
+ {
+ if (!compactorRunning.compareAndSet(false, true))
+ {
+ return;
+ }
+
+ // We can't use the executor for the compacting... or we would dead lock because of file open and creation
+ // operations (that will use the executor)
+ compactorExecutor.execute(new Runnable()
+ {
+ public void run()
{
- public void run()
- {
- try
- {
- JournalImpl.this.compact();
- }
- catch (Throwable e)
- {
- JournalImpl.log.error(e.getMessage(), e);
- }
- finally
- {
- compactorRunning.set(false);
- }
+ try
+ {
+ JournalImpl.this.compact();
}
- });
- }
+ catch (Throwable e)
+ {
+ JournalImpl.log.error(e.getMessage(), e);
+ }
+ finally
+ {
+ compactorRunning.set(false);
+ }
+ }
+ });
}
// TestableJournal implementation
@@ -2266,7 +2222,7 @@
StringBuilder builder = new StringBuilder();
- for (JournalFile file : dataFiles)
+ for (JournalFile file : filesRepository.getDataFiles())
{
builder.append("DataFile:" + file +
" posCounter = " +
@@ -2283,7 +2239,7 @@
}
}
- for (JournalFile file : freeFiles)
+ for (JournalFile file : filesRepository.getFreeFiles())
{
builder.append("FreeFile:" + file + "\n");
}
@@ -2302,8 +2258,6 @@
builder.append("CurrentFile: No current file at this point!");
}
- builder.append("#Opened Files:" + openedFiles.size());
-
return builder.toString();
}
@@ -2339,22 +2293,22 @@
public int getDataFilesCount()
{
- return dataFiles.size();
+ return filesRepository.getDataFilesCount();
}
public JournalFile[] getDataFiles()
{
- return (JournalFile[])dataFiles.toArray(new JournalFile[dataFiles.size()]);
+ return filesRepository.getDataFilesArray();
}
public int getFreeFilesCount()
{
- return freeFiles.size();
+ return filesRepository.getFreeFilesCount();
}
public int getOpenedFilesCount()
{
- return openedFiles.size();
+ return filesRepository.getOpenedFilesCount();
}
public int getIDMapSize()
@@ -2374,17 +2328,17 @@
public String getFilePrefix()
{
- return filePrefix;
+ return filesRepository.getFilePrefix();
}
public String getFileExtension()
{
- return fileExtension;
+ return filesRepository.getFileExtension();
}
public int getMaxAIO()
{
- return maxAIO;
+ return filesRepository.getMaxAIO();
}
public int getUserVersion()
@@ -2395,20 +2349,14 @@
// In some tests we need to force the journal to move to a next file
public void forceMoveNextFile() throws Exception
{
- forceMoveNextFile(true);
- }
-
- // In some tests we need to force the journal to move to a next file
- public void forceMoveNextFile(final boolean synchronous) throws Exception
- {
compactingLock.readLock().lock();
try
{
lockAppend.lock();
try
{
- moveNextFile(synchronous);
- if (autoReclaim && synchronous)
+ moveNextFile(false);
+ if (autoReclaim)
{
checkReclaimStatus();
}
@@ -2462,6 +2410,8 @@
return new Thread(r, "JournalImpl::CompactorExecutor");
}
});
+
+ filesRepository.setExecutor(filesExecutor);
fileFactory.start();
@@ -2493,6 +2443,8 @@
filesExecutor.shutdown();
+ filesRepository.setExecutor(null);
+
if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
{
JournalImpl.log.warn("Couldn't stop journal executor after 60 seconds");
@@ -2505,20 +2457,13 @@
currentFile.getFile().close();
}
- for (JournalFile file : openedFiles)
- {
- file.getFile().close();
- }
+ filesRepository.drainClosedFiles();
fileFactory.stop();
currentFile = null;
- dataFiles.clear();
-
- freeFiles.clear();
-
- openedFiles.clear();
+ filesRepository.clear();
}
finally
{
@@ -2578,7 +2523,7 @@
{
try
{
- addFreeFile(file, false);
+ filesRepository.addFreeFile(file, false);
}
catch (Throwable e)
{
@@ -2606,7 +2551,7 @@
* @param name
* @return
*/
- private String renameExtensionFile(String name, String extension)
+ protected static String renameExtensionFile(String name, String extension)
{
name = name.substring(0, name.lastIndexOf(extension));
return name;
@@ -2632,63 +2577,6 @@
// -----------------------------------------------------------------------------
/**
- * @param file
- * @throws Exception
- */
- private void addFreeFile(final JournalFile file, final boolean renameTmp) throws Exception
- {
- if (file.getFile().size() != this.getFileSize())
- {
- log.warn("Deleting " + file + ".. as it doesn't have the configured size", new Exception("trace"));
- file.getFile().delete();
- }
- else
- // FIXME - size() involves a scan!!!
- if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
- {
- // Re-initialise it
-
- if (trace)
- {
- trace("Adding free file " + file);
- }
-
- JournalFile jf = reinitializeFile(file);
-
- if (renameTmp)
- {
- jf.getFile().renameTo(renameExtensionFile(jf.getFile().getFileName(), ".tmp"));
- }
-
- freeFiles.add(jf);
- }
- else
- {
- file.getFile().delete();
- }
- }
-
- // Discard the old JournalFile and set it with a new ID
- private JournalFile reinitializeFile(final JournalFile file) throws Exception
- {
- long newFileID = generateFileID();
-
- SequentialFile sf = file.getFile();
-
- sf.open(1, false);
-
- int position = initFileHeader(this.fileFactory, sf, userVersion, newFileID);
-
- JournalFile jf = new JournalFileImpl(sf, newFileID, FORMAT_VERSION);
-
- sf.position(position);
-
- sf.close();
-
- return jf;
- }
-
- /**
* <p> Check for holes on the transaction (a commit written but with an incomplete transaction) </p>
* <p>This method will validate if the transaction (PREPARE/COMMIT) is complete as stated on the COMMIT-RECORD.</p>
*
@@ -2890,7 +2778,7 @@
if (!currentFile.getFile().fits(size))
{
- moveNextFile(false);
+ moveNextFile(true);
// The same check needs to be done at the new file also
if (!currentFile.getFile().fits(size))
@@ -2904,7 +2792,7 @@
{
throw new NullPointerException("Current file = null");
}
-
+
if (tx != null)
{
// The callback of a transaction has to be taken inside the lock,
@@ -2960,196 +2848,26 @@
return currentFile;
}
- /** Get the ID part of the name */
- private long getFileNameID(final String fileName)
+ // You need to guarantee lock.acquire() before calling this method
+ private void moveNextFile(final boolean scheduleReclaim) throws InterruptedException
{
- try
- {
- return Long.parseLong(fileName.substring(filePrefix.length() + 1, fileName.indexOf('.')));
- }
- catch (Throwable e)
- {
- JournalImpl.log.warn("Impossible to get the ID part of the file name " + fileName, e);
- return 0;
- }
- }
+ filesRepository.closeFile(currentFile);
- /**
- * This method will create a new file on the file system, pre-fill it with FILL_CHARACTER
- * @param keepOpened
- * @return
- * @throws Exception
- */
- private JournalFile createFile(final boolean keepOpened,
- final boolean multiAIO,
- final boolean init,
- final boolean tmpCompact) throws Exception
- {
- long fileID = generateFileID();
-
- String fileName;
-
- fileName = createFileName(tmpCompact, fileID);
-
- if (JournalImpl.trace)
+ currentFile = filesRepository.openFile();
+
+ if (scheduleReclaim)
{
- JournalImpl.trace("Creating file " + fileName);
+ scheduleReclaim();
}
- String tmpFileName = fileName + ".tmp";
-
- SequentialFile sequentialFile = fileFactory.createSequentialFile(tmpFileName, maxAIO);
-
- sequentialFile.open(1, false);
-
- if (init)
- {
- sequentialFile.fill(0, fileSize, JournalImpl.FILL_CHARACTER);
-
- initFileHeader(this.fileFactory, sequentialFile, userVersion, fileID);
- }
-
- long position = sequentialFile.position();
-
- sequentialFile.close();
-
if (JournalImpl.trace)
{
- JournalImpl.trace("Renaming file " + tmpFileName + " as " + fileName);
+ JournalImpl.trace("moveNextFile: " + currentFile);
}
- sequentialFile.renameTo(fileName);
-
- if (keepOpened)
- {
- if (multiAIO)
- {
- sequentialFile.open();
- }
- else
- {
- sequentialFile.open(1, false);
- }
- sequentialFile.position(position);
- }
-
- return new JournalFileImpl(sequentialFile, fileID, FORMAT_VERSION);
- }
-
- /**
- * @param tmpCompact
- * @param fileID
- * @return
- */
- private String createFileName(final boolean tmpCompact, long fileID)
- {
- String fileName;
- if (tmpCompact)
- {
- fileName = filePrefix + "-" + fileID + "." + fileExtension + ".cmp";
- }
- else
- {
- fileName = filePrefix + "-" + fileID + "." + fileExtension;
- }
- return fileName;
- }
-
- private void openFile(final JournalFile file, final boolean multiAIO) throws Exception
- {
- if (multiAIO)
- {
- file.getFile().open();
- }
- else
- {
- file.getFile().open(1, false);
- }
-
- file.getFile().position(file.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER));
- }
-
- private long generateFileID()
- {
- return nextFileID.incrementAndGet();
- }
-
- // You need to guarantee lock.acquire() before calling this method
- private void moveNextFile(final boolean synchronous) throws InterruptedException
- {
- closeFile(currentFile, synchronous);
-
- currentFile = enqueueOpenFile(synchronous);
-
- if (JournalImpl.trace)
- {
- JournalImpl.trace("moveNextFile: " + currentFile + " sync: " + synchronous);
- }
-
fileFactory.activateBuffer(currentFile.getFile());
}
- /**
- * <p>This method will instantly return the opened file, and schedule opening and reclaiming.</p>
- * <p>In case there are no cached opened files, this method will block until the file was opened,
- * what would happen only if the system is under heavy load by another system (like a backup system, or a DB sharing the same box as HornetQ).</p>
- * */
- private JournalFile enqueueOpenFile(final boolean synchronous) throws InterruptedException
- {
- if (JournalImpl.trace)
- {
- JournalImpl.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
- }
-
- Runnable run = new Runnable()
- {
- public void run()
- {
- try
- {
- pushOpenedFile();
- }
- catch (Exception e)
- {
- JournalImpl.log.error(e.getMessage(), e);
- }
- }
- };
-
- if (synchronous)
- {
- run.run();
- }
- else
- {
- filesExecutor.execute(run);
- }
-
- if (!synchronous)
- {
- scheduleReclaim();
- }
-
- JournalFile nextFile = null;
-
- while (nextFile == null)
- {
- nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
- if (nextFile == null)
- {
- JournalImpl.log.warn("Couldn't open a file in 60 Seconds",
- new Exception("Warning: Couldn't open a file in 60 Seconds"));
- }
- }
-
- if (trace)
- {
- JournalImpl.trace("Returning file " + nextFile);
- }
-
- return nextFile;
- }
-
private void scheduleReclaim()
{
if (state != JournalImpl.STATE_LOADED)
@@ -3165,7 +2883,7 @@
{
try
{
- drainClosedFiles();
+ filesRepository.drainClosedFiles();
if (!checkReclaimStatus())
{
checkCompact();
@@ -3180,97 +2898,7 @@
}
}
- /**
- *
- * Open a file and place it into the openedFiles queue
- * */
- private void pushOpenedFile() throws Exception
- {
- JournalFile nextOpenedFile = getFile(true, true, true, false);
- if (trace)
- {
- JournalImpl.trace("pushing openFile " + nextOpenedFile);
- }
-
- openedFiles.offer(nextOpenedFile);
- }
-
- /**
- * @return
- * @throws Exception
- */
- JournalFile getFile(final boolean keepOpened,
- final boolean multiAIO,
- final boolean initFile,
- final boolean tmpCompactExtension) throws Exception
- {
- JournalFile nextOpenedFile = null;
-
- nextOpenedFile = freeFiles.poll();
-
- if (nextOpenedFile == null)
- {
- nextOpenedFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
- }
- else
- {
- if (tmpCompactExtension)
- {
- SequentialFile sequentialFile = nextOpenedFile.getFile();
- sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
- }
-
- if (keepOpened)
- {
- openFile(nextOpenedFile, multiAIO);
- }
- }
- return nextOpenedFile;
- }
-
- private void closeFile(final JournalFile file, final boolean synchronous)
- {
- fileFactory.deactivateBuffer();
- pendingCloseFiles.add(file);
- dataFiles.add(file);
-
- Runnable run = new Runnable()
- {
- public void run()
- {
- drainClosedFiles();
- }
- };
-
- if (synchronous)
- {
- run.run();
- }
- else
- {
- filesExecutor.execute(run);
- }
-
- }
-
- private void drainClosedFiles()
- {
- JournalFile file;
- try
- {
- while ((file = pendingCloseFiles.poll()) != null)
- {
- file.getFile().close();
- }
- }
- catch (Exception e)
- {
- JournalImpl.log.warn(e.getMessage(), e);
- }
-
- }
-
private JournalTransaction getTransactionInfo(final long txID)
{
JournalTransaction tx = transactions.get(txID);
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java 2010-08-26 17:48:38 UTC (rev 9598)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java 2010-08-26 21:11:42 UTC (rev 9599)
@@ -40,7 +40,7 @@
{
private final boolean isCommit;
- private final long txID;
+ public final long txID;
private final EncodingSupport transactionData;
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java 2010-08-26 17:48:38 UTC (rev 9598)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java 2010-08-26 21:11:42 UTC (rev 9599)
@@ -25,7 +25,7 @@
*/
public class JournalRollbackRecordTX extends JournalInternalRecord
{
- private final long txID;
+ public final long txID;
public JournalRollbackRecordTX(final long txID)
{
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-08-26 17:48:38 UTC (rev 9598)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-08-26 21:11:42 UTC (rev 9599)
@@ -28,6 +28,7 @@
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.AbstractJournalUpdateTask;
+import org.hornetq.core.journal.impl.ExportJournal;
import org.hornetq.core.journal.impl.JournalCompactor;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.journal.impl.JournalFileImpl;
@@ -38,7 +39,6 @@
import org.hornetq.tests.unit.core.journal.impl.fakes.SimpleEncoding;
import org.hornetq.utils.IDGenerator;
import org.hornetq.utils.SimpleIDGenerator;
-import org.hornetq.utils.TimeAndCounterIDGenerator;
/**
*
@@ -53,7 +53,7 @@
private static final int NUMBER_OF_RECORDS = 1000;
- IDGenerator idGenerator = new TimeAndCounterIDGenerator();
+ IDGenerator idGenerator = new SimpleIDGenerator(100000);
// General tests
// =============
@@ -277,13 +277,13 @@
journal.forceMoveNextFile();
addTx(1, 5, 6, 7, 8);
-
+
commit(1);
-
+
journal.forceMoveNextFile();
-
+
journal.compact();
-
+
add(10);
stopJournal();
@@ -806,7 +806,7 @@
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
createJournal();
-
+
startJournal();
load();
@@ -931,7 +931,7 @@
}
startCompact();
-
+
// Delete part of the live records while cleanup still working
for (int i = 1; i < 5; i++)
{
@@ -939,7 +939,7 @@
}
finishCompact();
-
+
// Delete part of the live records after cleanup is done
for (int i = 5; i < 10; i++)
{
@@ -963,7 +963,7 @@
setup(2, 60 * 1024, false);
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
-
+
createJournal();
startJournal();
@@ -978,7 +978,7 @@
addTx(appendTX, appendOne);
startCompact();
-
+
addTx(appendTX, appendTwo);
commit(appendTX);
@@ -1161,7 +1161,7 @@
}
}
-
+
public void testCompactFirstFileWithPendingCommits() throws Exception
{
setup(2, 60 * 1024, true);
@@ -1175,10 +1175,165 @@
{
addTx(tx, idGenerator.generateID());
}
-
+
journal.forceMoveNextFile();
+
+ ArrayList<Long> listToDelete = new ArrayList<Long>();
+ for (int i = 0; i < 10; i++)
+ {
+ if (i == 5)
+ {
+ commit(tx);
+ }
+ long id = idGenerator.generateID();
+ listToDelete.add(id);
+ add(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ for (Long id : listToDelete)
+ {
+ delete(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ // This operation used to be journal.cleanup(journal.getDataFiles()[0]); when cleanup was still in place
+ journal.compact();
+
+ journal.checkReclaimStatus();
+
+ journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testCompactFirstFileWithPendingCommits3() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ long tx = idGenerator.generateID();
+ for (int i = 0; i < 10; i++)
+ {
+ addTx(tx, idGenerator.generateID());
+ }
+
+ journal.forceMoveNextFile();
+
+ ArrayList<Long> listToDelete = new ArrayList<Long>();
+ for (int i = 0; i < 10; i++)
+ {
+ long id = idGenerator.generateID();
+ listToDelete.add(id);
+ add(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ for (Long id : listToDelete)
+ {
+ delete(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ ExportJournal.exportJournal(getTestDir(), filePrefix, fileExtension, 2, this.fileSize, "/tmp/out1.dmp");
+
+ ExportJournal.exportJournal(getTestDir(), filePrefix, fileExtension, 2, this.fileSize, "/tmp/out2.dmp");
+
+ rollback(tx);
+
+ ExportJournal.exportJournal(getTestDir(), filePrefix, fileExtension, 2, this.fileSize, "/tmp/out3.dmp");
+
+ journal.forceMoveNextFile();
+ journal.checkReclaimStatus();
+
+ ExportJournal.exportJournal(getTestDir(), filePrefix, fileExtension, 2, this.fileSize, "/tmp/out4.dmp");
+
+ journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testCompactFirstFileWithPendingCommits2() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ long tx = idGenerator.generateID();
+ for (int i = 0; i < 10; i++)
+ {
+ addTx(tx, idGenerator.generateID());
+ }
+
+ journal.forceMoveNextFile();
+
+ ArrayList<Long> listToDelete = new ArrayList<Long>();
+ for (int i = 0; i < 10; i++)
+ {
+ long id = idGenerator.generateID();
+ listToDelete.add(id);
+ add(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ for (Long id : listToDelete)
+ {
+ delete(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ startCompact();
+ System.out.println("Committing TX " + tx);
commit(tx);
+ finishCompact();
+
+ journal.checkReclaimStatus();
+
+ journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testCompactFirstFileWithPendingCommits4() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ long ids[] = new long[10];
+
+ long tx0 = idGenerator.generateID();
+ for (int i = 0; i < 10; i++)
+ {
+ ids[i] = idGenerator.generateID();
+ addTx(tx0, ids[i]);
+ }
+
+ long tx1 = idGenerator.generateID();
+ journal.forceMoveNextFile();
ArrayList<Long> listToDelete = new ArrayList<Long>();
for (int i = 0; i < 10; i++)
@@ -1187,21 +1342,157 @@
listToDelete.add(id);
add(id);
}
-
+
journal.forceMoveNextFile();
for (Long id : listToDelete)
{
delete(id);
}
+
+ journal.forceMoveNextFile();
+
+ startCompact();
+ System.out.println("Committing TX " + tx1);
+ rollback(tx0);
+ for (int i = 0 ; i < 10; i++)
+ {
+ addTx(tx1, ids[i]);
+ }
+
+ journal.forceMoveNextFile();
+ commit(tx1);
+ finishCompact();
+
+ journal.checkReclaimStatus();
+
+ journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testCompactFirstFileWithPendingCommits5() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ long ids[] = new long[10];
+
+ long tx0 = idGenerator.generateID();
+ for (int i = 0; i < 10; i++)
+ {
+ ids[i] = idGenerator.generateID();
+ addTx(tx0, ids[i]);
+ }
+
+ long tx1 = idGenerator.generateID();
journal.forceMoveNextFile();
-
- // This operation used to be journal.cleanup(journal.getDataFiles()[0]); when cleanup was still in place
+
+ ArrayList<Long> listToDelete = new ArrayList<Long>();
+ for (int i = 0; i < 10; i++)
+ {
+ long id = idGenerator.generateID();
+ listToDelete.add(id);
+ add(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ for (Long id : listToDelete)
+ {
+ delete(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ startCompact();
+ System.out.println("Committing TX " + tx1);
+ rollback(tx0);
+ for (int i = 0 ; i < 10; i++)
+ {
+ addTx(tx1, ids[i]);
+ }
+
+ journal.forceMoveNextFile();
+ commit(tx1);
+ finishCompact();
+
+ journal.checkReclaimStatus();
+
journal.compact();
- journal.checkReclaimStatus();
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testCompactFirstFileWithPendingCommits6() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ long ids[] = new long[10];
+
+ long tx0 = idGenerator.generateID();
+ for (int i = 0; i < 10; i++)
+ {
+ ids[i] = idGenerator.generateID();
+ addTx(tx0, ids[i]);
+ }
+ commit(tx0);
+
+ startCompact();
+ for (int i = 0 ; i < 10; i++)
+ {
+ delete(ids[i]);
+ }
+ finishCompact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testCompactFirstFileWithPendingCommits7() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ long tx0 = idGenerator.generateID();
+ add(idGenerator.generateID());
+
+ long ids[] = new long[]{idGenerator.generateID(), idGenerator.generateID()};
+
+ addTx(tx0, ids[0]);
+ addTx(tx0, ids[1]);
+
+ journal.forceMoveNextFile();
+
+ commit(tx0);
+
+ journal.forceMoveNextFile();
+
+ delete(ids[0]);
+ delete(ids[1]);
+
+ journal.forceMoveNextFile();
+
journal.compact();
stopJournal();
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-26 17:48:38 UTC (rev 9598)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-26 21:11:42 UTC (rev 9599)
@@ -119,7 +119,7 @@
}
else
{
- factory = new NIOSequentialFileFactory(dir.getPath());
+ factory = new NIOSequentialFileFactory(dir.getPath(), true);
maxAIO = ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_NIO;
}
@@ -134,9 +134,6 @@
{
protected void onCompactLock() throws Exception
{
- // System.out.println("OnCompactLock");
- journal.forceMoveNextFile(false);
- // System.out.println("OnCompactLock done");
}
protected void onCompactStart() throws Exception
@@ -152,7 +149,7 @@
{
long id = idGen.generateID();
journal.appendAddRecord(id, (byte)0, new byte[] { 1, 2, 3 }, false);
- journal.forceMoveNextFile(false);
+ journal.forceMoveNextFile();
journal.appendDeleteRecord(id, id == 20);
}
// System.out.println("OnCompactStart leave");
14 years, 4 months
JBoss hornetq SVN: r9598 - in trunk/src/main/org/hornetq: ra/inflow and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-08-26 13:48:38 -0400 (Thu, 26 Aug 2010)
New Revision: 9598
Modified:
trunk/src/main/org/hornetq/jms/client/HornetQSession.java
trunk/src/main/org/hornetq/jms/client/JMSMessageListenerWrapper.java
trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
Log:
https://jira.jboss.org/browse/HORNETQ-495
Modified: trunk/src/main/org/hornetq/jms/client/HornetQSession.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQSession.java 2010-08-25 15:34:03 UTC (rev 9597)
+++ trunk/src/main/org/hornetq/jms/client/HornetQSession.java 2010-08-26 17:48:38 UTC (rev 9598)
@@ -213,6 +213,11 @@
return ackMode;
}
+
+ public boolean isXA()
+ {
+ return xa;
+ }
public void commit() throws JMSException
{
Modified: trunk/src/main/org/hornetq/jms/client/JMSMessageListenerWrapper.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/JMSMessageListenerWrapper.java 2010-08-25 15:34:03 UTC (rev 9597)
+++ trunk/src/main/org/hornetq/jms/client/JMSMessageListenerWrapper.java 2010-08-26 17:48:38 UTC (rev 9598)
@@ -42,9 +42,9 @@
private final boolean transactedOrClientAck;
protected JMSMessageListenerWrapper(final HornetQSession session,
- final ClientConsumer consumer,
- final MessageListener listener,
- final int ackMode)
+ final ClientConsumer consumer,
+ final MessageListener listener,
+ final int ackMode)
{
this.session = session;
@@ -52,7 +52,7 @@
this.listener = listener;
- transactedOrClientAck = ackMode == Session.SESSION_TRANSACTED || ackMode == Session.CLIENT_ACKNOWLEDGE;
+ transactedOrClientAck = (ackMode == Session.SESSION_TRANSACTED || ackMode == Session.CLIENT_ACKNOWLEDGE) || session.isXA();
}
/**
Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2010-08-25 15:34:03 UTC (rev 9597)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2010-08-26 17:48:38 UTC (rev 9598)
@@ -68,11 +68,16 @@
private boolean useLocalTx;
+ private boolean transacted;
+
private final int sessionNr;
private final TransactionManager tm;
- public HornetQMessageHandler(final HornetQActivation activation, final TransactionManager tm, final ClientSession session, final int sessionNr)
+ public HornetQMessageHandler(final HornetQActivation activation,
+ final TransactionManager tm,
+ final ClientSession session,
+ final int sessionNr)
{
this.activation = activation;
this.session = session;
@@ -96,14 +101,16 @@
{
String subscriptionName = spec.getSubscriptionName();
String clientID = spec.getClientID();
-
+
// Durable sub
if (clientID == null)
{
- throw new InvalidClientIDException("Cannot create durable subscription for " + subscriptionName + " - client ID has not been set");
+ throw new InvalidClientIDException("Cannot create durable subscription for " + subscriptionName +
+ " - client ID has not been set");
}
- SimpleString queueName = new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(clientID, subscriptionName));
+ SimpleString queueName = new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(clientID,
+ subscriptionName));
QueueQuery subResponse = session.queueQuery(queueName);
@@ -123,9 +130,10 @@
SimpleString oldFilterString = subResponse.getFilterString();
boolean selectorChanged = selector == null && oldFilterString != null ||
- oldFilterString == null && selector != null ||
- (oldFilterString != null && selector != null &&
- !oldFilterString.toString().equals(selector));
+ oldFilterString == null &&
+ selector != null ||
+ (oldFilterString != null && selector != null && !oldFilterString.toString()
+ .equals(selector));
SimpleString oldTopicName = subResponse.getAddress();
@@ -155,7 +163,7 @@
}
else
{
- queueName = activation.getTopicTemporaryQueue();
+ queueName = activation.getTopicTemporaryQueue();
}
}
else
@@ -168,6 +176,7 @@
// Create the endpoint, if we are transacted pass the sesion so it is enlisted, unless using Local TX
MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();
useLocalTx = !activation.isDeliveryTransacted() && activation.getActivationSpec().isUseLocalTx();
+ transacted = activation.isDeliveryTransacted();
if (activation.isDeliveryTransacted() && !activation.getActivationSpec().isUseLocalTx())
{
endpoint = endpointFactory.createEndpoint(session);
@@ -201,7 +210,7 @@
{
HornetQMessageHandler.log.debug("Error releasing endpoint " + endpoint, t);
}
-
+
try
{
consumer.close();
@@ -246,15 +255,28 @@
try
{
- if(activation.getActivationSpec().getTransactionTimeout() > 0 && tm != null)
+ if (activation.getActivationSpec().getTransactionTimeout() > 0 && tm != null)
{
tm.setTransactionTimeout(activation.getActivationSpec().getTransactionTimeout());
}
endpoint.beforeDelivery(HornetQActivation.ONMESSAGE);
beforeDelivery = true;
msg.doBeforeReceive();
+
+ //In the transacted case the message must be acked *before* onMessage is called
+
+ if (transacted)
+ {
+ message.acknowledge();
+ }
+
((MessageListener)endpoint).onMessage(msg);
- message.acknowledge();
+
+ if (!transacted)
+ {
+ message.acknowledge();
+ }
+
try
{
endpoint.afterDelivery();
14 years, 4 months
JBoss hornetq SVN: r9597 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-25 11:34:03 -0400 (Wed, 25 Aug 2010)
New Revision: 9597
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
fix test setup
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-08-25 09:06:02 UTC (rev 9596)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-08-25 15:34:03 UTC (rev 9597)
@@ -1161,15 +1161,14 @@
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(serverTotc);
- ClientSessionFactory sf = locator.createSessionFactory();
+ locator.setFailoverOnServerShutdown(false);
+ locator.setRetryInterval(100);
+ locator.setRetryIntervalMultiplier(1d);
+ locator.setReconnectAttempts(-1);
+ locator.setBlockOnNonDurableSend(blocking);
+ locator.setBlockOnDurableSend(blocking);
- sf.getServerLocator().setFailoverOnServerShutdown(false);
- sf.getServerLocator().setRetryInterval(100);
- sf.getServerLocator().setRetryIntervalMultiplier(1d);
- sf.getServerLocator().setReconnectAttempts(-1);
- sf.getServerLocator().setBlockOnNonDurableSend(blocking);
- sf.getServerLocator().setBlockOnDurableSend(blocking);
-
+ ClientSessionFactory sf = locator.createSessionFactory();
sfs[node] = sf;
}
14 years, 4 months
JBoss hornetq SVN: r9596 - in branches/2_2_0_HA_Improvements/src/main/org/hornetq/core: server/cluster/impl and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-08-25 05:06:02 -0400 (Wed, 25 Aug 2010)
New Revision: 9596
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
fixed topology size
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-25 08:39:01 UTC (rev 9595)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-25 09:06:02 UTC (rev 9596)
@@ -1131,7 +1131,7 @@
{
updateArraysAndPairs();
- if (topology.size() == 1 && topology.getMember(this.nodeID) != null)
+ if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null)
{
receivedTopology = false;
}
@@ -1185,7 +1185,7 @@
private void updateArraysAndPairs()
{
topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[]) Array.newInstance(Pair.class,
- topology.size());
+ topology.members());
int count = 0;
for (TopologyMember pair : topology.getMembers())
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java 2010-08-25 08:39:01 UTC (rev 9595)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java 2010-08-25 09:06:02 UTC (rev 9596)
@@ -116,7 +116,7 @@
return topology.values();
}
- public int size()
+ public int nodes()
{
return nodes;
}
@@ -137,4 +137,9 @@
topology.clear();
nodes = 0;
}
+
+ public int members()
+ {
+ return topology.size();
+ }
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-25 08:39:01 UTC (rev 9595)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-25 09:06:02 UTC (rev 9596)
@@ -254,7 +254,7 @@
listener.nodeUP(nodeID, connectorPair, last, distance);
}
- if (distance < topology.size())
+ if (distance < topology.nodes())
{
for (ClusterTopologyListener listener : clusterConnectionListeners)
{
14 years, 4 months
JBoss hornetq SVN: r9595 - in branches/2_2_0_HA_Improvements: src/main/org/hornetq/api/core and 7 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-08-25 04:39:01 -0400 (Wed, 25 Aug 2010)
New Revision: 9595
Modified:
branches/2_2_0_HA_Improvements/hornetq.ipr
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/TransportConfiguration.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/FakeLockHornetQServer.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
fixes for shared store backup
Modified: branches/2_2_0_HA_Improvements/hornetq.ipr
===================================================================
--- branches/2_2_0_HA_Improvements/hornetq.ipr 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/hornetq.ipr 2010-08-25 08:39:01 UTC (rev 9595)
@@ -342,11 +342,8 @@
<option name="SKIP_IMPORT_STATEMENTS" value="false" />
</component>
<component name="EclipseCompilerSettings">
- <option name="DEBUGGING_INFO" value="true" />
<option name="GENERATE_NO_WARNINGS" value="true" />
<option name="DEPRECATION" value="false" />
- <option name="ADDITIONAL_OPTIONS_STRING" value="" />
- <option name="MAXIMUM_HEAP_SIZE" value="128" />
</component>
<component name="EclipseEmbeddedCompilerSettings">
<option name="DEBUGGING_INFO" value="true" />
@@ -423,14 +420,6 @@
<option name="LOCALE" />
<option name="OPEN_IN_BROWSER" value="true" />
</component>
- <component name="JikesSettings">
- <option name="JIKES_PATH" value="" />
- <option name="DEBUGGING_INFO" value="true" />
- <option name="DEPRECATION" value="true" />
- <option name="GENERATE_NO_WARNINGS" value="false" />
- <option name="IS_EMACS_ERRORS_MODE" value="true" />
- <option name="ADDITIONAL_OPTIONS_STRING" value="" />
- </component>
<component name="LogConsolePreferences">
<option name="FILTER_ERRORS" value="false" />
<option name="FILTER_WARNINGS" value="false" />
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/TransportConfiguration.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/TransportConfiguration.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/TransportConfiguration.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -167,13 +167,13 @@
if (factoryClassName.equals(kother.factoryClassName))
{
- if (params == null)
+ if (params == null || params.isEmpty())
{
- return kother.params == null;
+ return kother.params == null || kother.params.isEmpty();
}
else
{
- if (kother.params == null)
+ if (kother.params == null || kother.params.isEmpty())
{
return false;
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -657,4 +657,8 @@
void close();
boolean isHA();
+
+ void addClusterTopologyListener(ClusterTopologyListener listener);
+
+ void removeClusterTopologyListener(ClusterTopologyListener listener);
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -39,13 +39,9 @@
String getNodeID();
void connect();
-
- void addClusterTopologyListener(ClusterTopologyListener listener);
-
- void removeClusterTopologyListener(ClusterTopologyListener listener);
-
+
void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance);
-
+
void notifyNodeDown(String nodeID);
void setClusterConnection(boolean clusterConnection);
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -38,16 +38,57 @@
*/
private Map<String, TopologyMember> topology = new HashMap<String, TopologyMember>();
+ int nodes = 0;
+
public synchronized boolean addMember(String nodeId, TopologyMember member)
{
- boolean replaced = topology.containsKey(nodeId);
- topology.put(nodeId, member);
+ boolean replaced = false;
+ TopologyMember currentMember = topology.get(nodeId);
+ if(currentMember == null)
+ {
+ topology.put(nodeId, member);
+ replaced = true;
+ if(member.getConnector().a != null)
+ {
+ nodes++;
+ }
+ if(member.getConnector().b != null)
+ {
+ nodes++;
+ }
+ }
+ else
+ {
+ if(currentMember.getConnector().a == null && member.getConnector().a != null)
+ {
+ currentMember.getConnector().a = member.getConnector().a;
+ replaced = true;
+ nodes++;
+ }
+ if(currentMember.getConnector().b == null && member.getConnector().b != null)
+ {
+ currentMember.getConnector().b = member.getConnector().b;
+ replaced = true;
+ nodes++;
+ }
+ }
return replaced;
}
public synchronized boolean removeMember(String nodeId)
{
TopologyMember member = topology.remove(nodeId);
+ if(member != null)
+ {
+ if(member.getConnector().a != null)
+ {
+ nodes--;
+ }
+ if(member.getConnector().b != null)
+ {
+ nodes--;
+ }
+ }
return (member != null);
}
@@ -77,7 +118,7 @@
public int size()
{
- return topology.size();
+ return nodes;
}
public String describe()
@@ -94,5 +135,6 @@
public void clear()
{
topology.clear();
+ nodes = 0;
}
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -113,7 +113,15 @@
buffer.writeString(nodeID);
if (!exit)
{
- pair.a.encode(buffer);
+ if (pair.a != null)
+ {
+ buffer.writeBoolean(true);
+ pair.a.encode(buffer);
+ }
+ else
+ {
+ buffer.writeBoolean(false);
+ }
if (pair.b != null)
{
buffer.writeBoolean(true);
@@ -134,9 +142,18 @@
exit = buffer.readBoolean();
nodeID = buffer.readString();
if (!exit)
- {
- TransportConfiguration a = new TransportConfiguration();
- a.decode(buffer);
+ {
+ boolean hasLive = buffer.readBoolean();
+ TransportConfiguration a;
+ if(hasLive)
+ {
+ a = new TransportConfiguration();
+ a.decode(buffer);
+ }
+ else
+ {
+ a = null;
+ }
boolean hasBackup = buffer.readBoolean();
TransportConfiguration b;
if (hasBackup)
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -244,26 +244,22 @@
boolean last,
int distance)
{
- if (nodeID.equals(nodeUUID.toString()))
- {
- return;
- }
-
boolean updated = topology.addMember(nodeID, new TopologyMember(connectorPair, distance));
-
- if (distance >= topology.size() || updated)
+ if(!updated)
{
return;
}
-
for (ClusterTopologyListener listener : clientListeners)
{
listener.nodeUP(nodeID, connectorPair, last, distance);
}
- for (ClusterTopologyListener listener : clusterConnectionListeners)
+ if (distance < topology.size())
{
- listener.nodeUP(nodeID, connectorPair, last, distance);
+ for (ClusterTopologyListener listener : clusterConnectionListeners)
+ {
+ listener.nodeUP(nodeID, connectorPair, last, distance);
+ }
}
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -13,9 +13,12 @@
package org.hornetq.core.server.cluster.impl;
+import java.io.File;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
import java.util.WeakHashMap;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -40,10 +43,9 @@
private final String directory;
- private static Map<String, Lock> locks = new WeakHashMap<String, Lock>();
-
- private Lock lock;
-
+ private final static Map<String, Semaphore> locks = new WeakHashMap<String, Semaphore>();
+
+ private Semaphore semaphore;
/**
* @param fileName
* @param directory
@@ -56,15 +58,32 @@
synchronized (locks)
{
- String key = directory + fileName;
+ String key = directory + "/" + fileName;
- lock = locks.get(key);
+ semaphore = locks.get(key);
- if (lock == null)
+ if (semaphore == null)
{
- lock = new ReentrantLock(true);
+ semaphore = new Semaphore(1, true);
- locks.put(key, lock);
+ locks.put(key, semaphore);
+
+ File f = new File(directory, fileName);
+
+ try
+ {
+ f.createNewFile();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ throw new IllegalStateException(e);
+ }
+
+ if(!f.exists())
+ {
+ throw new IllegalStateException("unable to create " + directory + fileName);
+ }
}
}
}
@@ -81,13 +100,38 @@
public void lock() throws IOException
{
- lock.lock();
+ try
+ {
+ semaphore.acquire();
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException(e);
+ }
}
public boolean unlock() throws IOException
{
- lock.unlock();
+ semaphore.release();
return true;
}
+
+ public static void unlock(final String fileName, final String directory)
+ {
+ String key = directory + "/" + fileName;
+
+ Semaphore semaphore = locks.get(key);
+
+ semaphore.release();
+ }
+
+ public static void clearLocks()
+ {
+ for (Semaphore semaphore : locks.values())
+ {
+ semaphore.drainPermits();
+ }
+ locks.clear();
+ }
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -557,7 +557,6 @@
initialisePart1();
//TODO TODO at this point the clustermanager needs to announce it's presence so the cluster can know about the backup
-
// We now look for the live.lock file - if it doesn't exist it means the live isn't started yet, so we wait
// for that
@@ -574,17 +573,20 @@
liveLock = createLockFile("live.lock", configuration.getJournalDirectory());
+
+ clusterManager.start();
+
log.info("Live server is up - waiting for failover");
liveLock.lock();
-
+ //todo check if we need this or not
// We need to test if the file exists again, since the live might have shutdown
- if (!liveLockFile.exists())
- {
- liveLock.unlock();
+ // if (!liveLockFile.exists())
+ // {
+ // liveLock.unlock();
- continue;
- }
+ // continue;
+ // }
log.info("Obtained live lock");
@@ -600,13 +602,13 @@
initialisePart2();
- log.info("Server is now live");
+ log.info("Back Up Server is now live");
backupLock.unlock();
}
catch (InterruptedException e)
{
- // This can occur when closing if the thread is blocked - it's ok
+ System.out.println("HornetQServerImpl$SharedStoreBackupActivation.run");
}
catch (Exception e)
{
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -24,11 +24,7 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientConsumer;
-import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClientProducer;
-import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.SessionFailureListener;
+import org.hornetq.api.core.client.*;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.client.impl.DelegatingSession;
@@ -171,9 +167,9 @@
for (int i = 0; i < numIts; i++)
{
AsynchronousFailoverTest.log.info("Iteration " + i);
+ ServerLocator locator = getServerLocator();
+ sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
- sf = getSessionFactory();
-
sf.getServerLocator().setBlockOnNonDurableSend(true);
sf.getServerLocator().setBlockOnDurableSend(true);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -26,16 +26,13 @@
import junit.framework.Assert;
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Interceptor;
-import org.hornetq.api.core.Message;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.*;
import org.hornetq.api.core.client.*;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
+import org.hornetq.core.server.cluster.impl.FakeLockFile;
import org.hornetq.core.transaction.impl.XidImpl;
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -84,11 +81,17 @@
public void testNonTransacted() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ClientSessionFactoryInternal sf;
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ ServerLocator locator = getServerLocator();
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sf.createSession(true, true);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -146,6 +149,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -153,11 +158,15 @@
public void testConsumeTransacted() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sf.createSession(false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -242,6 +251,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -251,11 +262,15 @@
* and the servers should be able to connect without any problems. */
public void testRestartServers() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sf.createSession(true, true);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -279,17 +294,14 @@
session.close();
+ server1Service.stop();
server0Service.stop();
- server1Service.stop();
-
+ FakeLockFile.clearLocks();
server1Service.start();
server0Service.start();
- sf = getSessionFactory();
+ sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
-
session = sf.createSession(true, true);
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
@@ -311,6 +323,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -319,12 +333,16 @@
// https://jira.jboss.org/jira/browse/HORNETQ-285
public void testFailoverOnInitialConnection() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
- sf.getServerLocator().setFailoverOnInitialConnection(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnInitialConnection(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
// Stop live server
this.server0Service.stop();
@@ -367,6 +385,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -377,28 +397,32 @@
* @param latch
* @throws InterruptedException
*/
- private void fail(final ClientSession session, final CountDownLatch latch) throws InterruptedException
+ private void fail(final ClientSession session, final CountDownLatch latch) throws Exception
{
- RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
+ //RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
// Simulate failure on connection
- conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-
+ //conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+ server0Service.stop();
// Wait to be informed of failure
- boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
+ boolean ok = latch.await(10000, TimeUnit.MILLISECONDS);
Assert.assertTrue(ok);
}
public void testTransactedMessagesSentSoRollback() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sf.createSession(false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -456,6 +480,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -467,11 +493,15 @@
*/
public void testTransactedMessagesSentSoRollbackAndContinueWork() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sf.createSession(false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -541,6 +571,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -548,11 +580,15 @@
public void testTransactedMessagesNotSentSoNoRollback() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sf.createSession(false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -622,6 +658,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -629,11 +667,15 @@
public void testTransactedMessagesWithConsumerStartedBeforeFailover() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sf.createSession(false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -711,6 +753,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -718,11 +762,15 @@
public void testTransactedMessagesConsumedSoRollback() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session1 = sf.createSession(false, false);
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -794,6 +842,8 @@
session2.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -801,11 +851,15 @@
public void testTransactedMessagesNotConsumedSoNoRollback() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session1 = sf.createSession(false, false);
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -889,6 +943,8 @@
session2.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -896,11 +952,15 @@
public void testXAMessagesSentSoRollbackOnEnd() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sf.createSession(true, false, false);
Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
@@ -959,6 +1019,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -966,11 +1028,15 @@
public void testXAMessagesSentSoRollbackOnPrepare() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sf.createSession(true, false, false);
Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
@@ -1031,6 +1097,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -1039,11 +1107,15 @@
// This might happen if 1PC optimisation kicks in
public void testXAMessagesSentSoRollbackOnCommit() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sf.createSession(true, false, false);
Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
@@ -1106,6 +1178,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -1113,11 +1187,15 @@
public void testXAMessagesNotSentSoNoRollbackOnCommit() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sf.createSession(true, false, false);
Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
@@ -1195,6 +1273,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -1202,11 +1282,15 @@
public void testXAMessagesConsumedSoRollbackOnEnd() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session1 = sf.createSession(false, false);
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -1280,6 +1364,8 @@
session2.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -1287,11 +1373,15 @@
public void testXAMessagesConsumedSoRollbackOnPrepare() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session1 = sf.createSession(false, false);
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -1376,6 +1466,8 @@
session2.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -1384,11 +1476,14 @@
// 1PC optimisation
public void testXAMessagesConsumedSoRollbackOnCommit() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
ClientSession session1 = sf.createSession(false, false);
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -1466,6 +1561,8 @@
session2.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -1473,8 +1570,10 @@
public void testCreateNewFactoryAfterFailover() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sendAndConsume(sf, true);
final CountDownLatch latch = new CountDownLatch(1);
@@ -1502,13 +1601,14 @@
session.close();
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
session = sendAndConsume(sf, false);
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -1516,11 +1616,15 @@
public void testFailoverMultipleSessionsWithConsumers() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
final int numSessions = 5;
final int numConsumersPerSession = 5;
@@ -1620,6 +1724,8 @@
sendSession.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -1630,11 +1736,14 @@
*/
public void testFailWithBrowser() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
ClientSession session = sf.createSession(true, true);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -1703,6 +1812,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -1710,11 +1821,15 @@
public void testFailThenReceiveMoreMessagesAfterFailover() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
ClientSession session = sf.createSession(true, true);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -1785,6 +1900,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -1792,11 +1909,12 @@
public void testFailThenReceiveMoreMessagesAfterFailover2() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
- sf.getServerLocator().setBlockOnAcknowledge(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
ClientSession session = sf.createSession(true, true, 0);
@@ -1878,6 +1996,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -1905,11 +2025,14 @@
private void testSimpleSendAfterFailover(final boolean durable, final boolean temporary) throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
- sf.getServerLocator().setBlockOnAcknowledge(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
ClientSession session = sf.createSession(true, true, 0);
@@ -1970,6 +2093,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -1977,8 +2102,10 @@
public void testForceBlockingReturn() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
+ ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+
// Add an interceptor to delay the send method so we can get time to cause failover before it returns
server0Service.getRemotingService().addInterceptor(new DelayInterceptor());
@@ -1986,6 +2113,8 @@
sf.getServerLocator().setBlockOnNonDurableSend(true);
sf.getServerLocator().setBlockOnDurableSend(true);
sf.getServerLocator().setBlockOnAcknowledge(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
final ClientSession session = sf.createSession(true, true, 0);
@@ -2043,6 +2172,8 @@
session.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -2050,12 +2181,16 @@
public void testCommitOccurredUnblockedAndResendNoDuplicates() throws Exception
{
- final ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
- sf.getServerLocator().setBlockOnAcknowledge(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ locator.setBlockOnAcknowledge(true);
+ final ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
final ClientSession session = sf.createSession(false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -2203,6 +2338,8 @@
session2.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -2210,11 +2347,14 @@
public void testCommitDidNotOccurUnblockedAndResend() throws Exception
{
- ClientSessionFactoryInternal sf = getSessionFactory();
+ ServerLocator locator = getServerLocator();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
- sf.getServerLocator().setBlockOnAcknowledge(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
final ClientSession session = sf.createSession(false, false);
@@ -2340,6 +2480,8 @@
session2.close();
+ sf.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -2452,4 +2594,6 @@
}
// Inner classes -------------------------------------------------
+
+
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -13,24 +13,33 @@
package org.hornetq.tests.integration.cluster.failover;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.remoting.impl.invm.InVMConnector;
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.server.ActivateCallback;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.cluster.impl.FakeLockFile;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
@@ -80,6 +89,7 @@
{
super.setUp();
clearData();
+ FakeLockFile.clearLocks();
createConfigs();
if (server1Service != null)
@@ -101,6 +111,16 @@
config1.setSecurityEnabled(false);
config1.setSharedStore(true);
config1.setBackup(true);
+ config1.setClustered(true);
+ TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
+ TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
+ List<String> staticConnectors = new ArrayList<String>();
+ staticConnectors.add(liveConnector.getName());
+ ClusterConnectionConfiguration ccc1 = new ClusterConnectionConfiguration("cluster1", "jms", backupConnector.getName(), -1, false, false, 1, 1,
+ staticConnectors);
+ config1.getClusterConfigurations().add(ccc1);
+ config1.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
+ config1.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
server1Service = createFakeLockServer(true, config1);
server1Service.registerActivateCallback(new ActivateCallback()
@@ -129,6 +149,12 @@
config0.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
config0.setSecurityEnabled(false);
config0.setSharedStore(true);
+ config0.setClustered(true);
+ List<String> pairs = null;
+ ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1", "jms", liveConnector.getName(), -1, false, false, 1, 1,
+ pairs);
+ config0.getClusterConfigurations().add(ccc0);
+ config0.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
server0Service = createFakeLockServer(true, config0);
}
@@ -179,6 +205,20 @@
super.tearDown();
}
+ protected ClientSessionFactoryInternal createSessionFactoryAndWaitForTopology(ServerLocator locator, int topologyMembers)
+ throws Exception
+ {
+ ClientSessionFactoryInternal sf;
+ CountDownLatch countDownLatch = new CountDownLatch(topologyMembers);
+
+ locator.addClusterTopologyListener(new LatchClusterTopologyListener(countDownLatch));
+
+ sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+
+ assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
+ return sf;
+ }
+
protected TransportConfiguration getInVMConnectorTransportConfiguration(final boolean live)
{
if (live)
@@ -251,14 +291,45 @@
protected abstract TransportConfiguration getConnectorTransportConfiguration(final boolean live);
- protected ClientSessionFactoryInternal getSessionFactory() throws Exception
+ protected ServerLocatorInternal getServerLocator() throws Exception
{
- ServerLocator locator = HornetQClient.createServerLocatorWithHA(getConnectorTransportConfiguration(true), getConnectorTransportConfiguration(false));
- return (ClientSessionFactoryInternal) locator.createSessionFactory();
+ ServerLocator locator = HornetQClient.createServerLocatorWithHA(getConnectorTransportConfiguration(true));
+ return (ServerLocatorInternal) locator;
}
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
+ class LatchClusterTopologyListener implements ClusterTopologyListener
+ {
+ final CountDownLatch latch;
+ int liveNodes = 0;
+ int backUpNodes = 0;
+ List<String> liveNode = new ArrayList<String>();
+ List<String> backupNode = new ArrayList<String>();
+ public LatchClusterTopologyListener(CountDownLatch latch)
+ {
+ this.latch = latch;
+ }
+
+ public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
+ {
+ if(connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
+ {
+ liveNode.add(connectorPair.a.getName());
+ latch.countDown();
+ }
+ if(connectorPair.b != null && !backupNode.contains(connectorPair.b.getName()))
+ {
+ backupNode.add(connectorPair.b.getName());
+ latch.countDown();
+ }
+ }
+
+ public void nodeDown(String nodeID)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ }
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -17,7 +17,9 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.logging.Logger;
import org.hornetq.tests.util.UnitTestCase;
@@ -84,11 +86,11 @@
}
- protected ClientSessionFactoryInternal getSessionFactory() throws Exception
+ protected ServerLocatorInternal getServerLocator() throws Exception
{
- ClientSessionFactoryInternal sf = super.getSessionFactory();
- sf.getServerLocator().setMinLargeMessageSize(LARGE_MESSAGE_SIZE);
- return sf;
+ ServerLocator locator = super.getServerLocator();
+ locator.setMinLargeMessageSize(LARGE_MESSAGE_SIZE);
+ return (ServerLocatorInternal) locator;
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -22,11 +22,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientConsumer;
-import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClientProducer;
-import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.SessionFailureListener;
+import org.hornetq.api.core.client.*;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.config.Configuration;
@@ -79,11 +75,16 @@
public void internalTestPage(final boolean transacted, final boolean failBeforeConsume) throws Exception
{
- ClientSessionFactoryInternal factory = getSessionFactory();
- factory.getServerLocator().setBlockOnDurableSend(true);
- factory.getServerLocator().setBlockOnAcknowledge(true);
- ClientSession session = factory.createSession(!transacted, !transacted, 0);
+ ServerLocator locator = getServerLocator();
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+
+ //waitForTopology(locator, 1, 1);
+
+ ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+ ClientSession session = sf.createSession(!transacted, !transacted, 0);
+
try
{
@@ -170,7 +171,7 @@
session.close();
- session = factory.createSession(true, true, 0);
+ session = sf.createSession(true, true, 0);
cons = session.createConsumer(PagingFailoverTest.ADDRESS);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/FakeLockHornetQServer.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/FakeLockHornetQServer.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/FakeLockHornetQServer.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -10,6 +10,7 @@
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
+
package org.hornetq.tests.util;
import org.hornetq.core.config.Configuration;
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/UnitTestCase.java 2010-08-25 07:20:38 UTC (rev 9594)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/UnitTestCase.java 2010-08-25 08:39:01 UTC (rev 9595)
@@ -65,6 +65,7 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.cluster.impl.FakeLockFile;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.transaction.impl.XidImpl;
import org.hornetq.jms.client.HornetQTextMessage;
14 years, 4 months
JBoss hornetq SVN: r9594 - trunk/docs/user-manual/zh.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-08-25 03:20:38 -0400 (Wed, 25 Aug 2010)
New Revision: 9594
Modified:
trunk/docs/user-manual/zh/appserver-integration.xml
trunk/docs/user-manual/zh/persistence.xml
Log:
doc sync
Modified: trunk/docs/user-manual/zh/appserver-integration.xml
===================================================================
--- trunk/docs/user-manual/zh/appserver-integration.xml 2010-08-24 16:21:41 UTC (rev 9593)
+++ trunk/docs/user-manual/zh/appserver-integration.xml 2010-08-25 07:20:38 UTC (rev 9594)
@@ -627,6 +627,17 @@
<entry>Integer</entry>
<entry>线程池的大小</entry>
</row>
+ <row>
+ <entry>SetupAttempts</entry>
+ <entry>Integer</entry>
+ <entry>尝试建立JMS连接的次数(默认值是10。-1表示无限次进行尝试)。有时MDB在部署时相关的JMS资源还没有准备好,这时通过多次的
+ 尝试直到JMS资源连接上为止。只适用于内部(Inbound)连接的情况。</entry>
+ </row>
+ <row>
+ <entry>SetupInterval</entry>
+ <entry>Long</entry>
+ <entry>每两次相邻尝试之间的时间间隔,以毫秒为单位(默认值为2000毫秒)。只适用于内部(Inbound)连接的情况。</entry>
+ </row>
</tbody>
</tgroup>
</informaltable>
Modified: trunk/docs/user-manual/zh/persistence.xml
===================================================================
--- trunk/docs/user-manual/zh/persistence.xml 2010-08-24 16:21:41 UTC (rev 9593)
+++ trunk/docs/user-manual/zh/persistence.xml 2010-08-25 07:20:38 UTC (rev 9594)
@@ -56,23 +56,32 @@
</listitem>
</itemizedlist>
<para>标准的HornetQ核心服务器使用了两种日志:</para>
- <itemizedlist>
+ <itemizedlist id="persistence.journallist">
<listitem>
<para>绑定日志</para>
<para>这个日志用来保存与绑定有关的数据。其中包括在HornetQ上部署的队列及其属性,还有ID序列计数器。 </para>
<para>绑定日志是一个NIO型日志。与消息日志相比它的吞吐量是比较低的。</para>
+ <para>这种日志文件的名字采用<literal>hornetq-bindings</literal>作为前缀。每个文件都有
+ <literal>bindings</literal>这样的扩展。文件大小是<literal
+ >1048576</literal>,它的位置在bindings文件夹下。</para>
</listitem>
<listitem>
<para>JMS日志</para>
<para>这个日志保存所有JMS相关的数据,包括JMS队列,话题及连接工厂,以及它们的JNDI绑定信息。</para>
<para>通过管理接口创建的JMS资源将被保存在这个日志中。但是通过配置文件配置的资源则不保存。只有使用JMS时JMS的日志
才被创建。</para>
+ <para>这种日志文件的名字采用<literal>hornetq-jms</literal>作为前缀。每个文件都有
+ <literal>jms</literal>这样的扩展。文件大小是<literal
+ >1048576</literal>,它的位置在bindings文件夹下。</para>
</listitem>
<listitem>
<para>消息日志</para>
<para>这个日志用来存贮所有消息相关的数据,包括消息本身和重复ID缓存。</para>
<para>默认情况下HornetQ总是优先使用AIO型日志。如果AIO型日志不可用(比如在非Linux平台上运行,或系统内核版本不同)
它将自动使用NIO型日志。</para>
+ <para>这种日志文件的名字采用<literal>hornetq-data</literal>作为前缀。每个文件都有
+ <literal>hq</literal>作为扩展名。默认的文件大小是 <literal
+ >10485760</literal> (可配置)。文件保存在journal文件夹下。</para>
</listitem>
</itemizedlist>
<para>对于超大消息,HornetQ将它们保存在消息日志之外的地方。详见<xref linkend="large-messages"/>。</para>
@@ -220,4 +229,35 @@
参数设为<literal>false</literal>即可。 </para>
<para>注意如果你将该参数设为 false来关闭持久化,就意味着所有的绑定数据、消息数据、超大消息数据、重复ID缓冲以及转移(paging)数据都将不会被持久。</para>
</section>
+ <section id="persistence.importexport">
+ <title>导入/导出日志数据</title>
+ <para>有时你需要使用导入/导出工具来查看日志文件的记录。这个导入/导出工具类在hornetq-core.jar文件中。
+ 使用以下命令可以将日志文件导出为文本文件:</para>
+ <para><literal>java -cp hornetq-core.jar org.hornetq.core.journal.impl.ExportJournal
+ <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize>
+ <FileOutput></literal></para>
+ <para>要将日志文件导入,使用下面的命令(注意你需要netty.jar):</para>
+ <para><literal>java -cp hornetq-core.jar:netty.jar org.hornetq.core.journal.impl.ImportJournal
+ <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize>
+ <FileInput></literal></para>
+ <itemizedlist>
+ <listitem>
+ <para>JournalDirectory:文件的位置,如./hornetq/data/journal</para>
+ </listitem>
+ <listitem>
+ <para>JournalPrefix: 日志文件的前缀。<link linkend="persistence.journallist">这里</link>有关于前缀的详细描述。</para>
+ </listitem>
+ <listitem>
+ <para>FileExtension: 文件的扩展名。详细讨论参见<link linkend="persistence.journallist">这里</link>。
+ </para>
+ </listitem>
+ <listitem>
+ <para>FileSize:日志文件的大小。详细讨论参见<link linkend="persistence.journallist">这里</link>。</para>
+ </listitem>
+ <listitem>
+ <para>FileOutput:输出的文本文件名。</para>
+ </listitem>
+ </itemizedlist>
+ </section>
+
</chapter>
14 years, 5 months
JBoss hornetq SVN: r9592 - branches/Branch_2_1/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-24 10:33:40 -0400 (Tue, 24 Aug 2010)
New Revision: 9592
Modified:
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
Log:
typo
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-24 14:21:23 UTC (rev 9591)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-24 14:33:40 UTC (rev 9592)
@@ -1667,7 +1667,6 @@
else
{
log.warn("Couldn't find tx=" + newTransaction.getId() + " to merge after compacting");
- System.exit(-1);
}
}
}
14 years, 5 months
JBoss hornetq SVN: r9591 - in branches/Branch_2_1: src/main/org/hornetq/core/journal and 5 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-24 10:21:23 -0400 (Tue, 24 Aug 2010)
New Revision: 9591
Added:
branches/Branch_2_1/merge-activity.txt
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/CompactJournal.java
branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/OldFormatTest.java
Modified:
branches/Branch_2_1/src/main/org/hornetq/core/journal/RecordInfo.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/TestableJournal.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ExportJournal.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ImportJournal.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFile.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/Reclaimer.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalInternalRecord.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
branches/Branch_2_1/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java
Log:
merge journal fixes from trunk
Added: branches/Branch_2_1/merge-activity.txt
===================================================================
--- branches/Branch_2_1/merge-activity.txt (rev 0)
+++ branches/Branch_2_1/merge-activity.txt 2010-08-24 14:21:23 UTC (rev 9591)
@@ -0,0 +1,9 @@
+Detailed list of merges that happened at this branch.
+
+
+- Date - author - Description
+
+- 24-aug-2010 - clebert - Branch created from https://svn.jboss.org/repos/hornetq/tags/HornetQ_2_1_2_Final/
+
+- 24-aug-2010 - clebert - merge from trunk -r9588:9590
+ There was also a manual copy of JournalImpl.java on this merge, since there was a minor change before that needed to be applied
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/RecordInfo.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/RecordInfo.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/RecordInfo.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -23,7 +23,7 @@
*/
public class RecordInfo
{
- public RecordInfo(final long id, final byte userRecordType, final byte[] data, final boolean isUpdate)
+ public RecordInfo(final long id, final byte userRecordType, final byte[] data, final boolean isUpdate, final short compactCount)
{
this.id = id;
@@ -32,8 +32,15 @@
this.data = data;
this.isUpdate = isUpdate;
+
+ this.compactCount = compactCount;
}
+ /** How many times this record was compacted (up to 7 times)
+ After the record has reached 7 times, it will always be 7
+ As we only store up to 0x7 binary, as part of the recordID (binary 111) */
+ public final short compactCount;
+
public final long id;
public final byte userRecordType;
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/TestableJournal.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/TestableJournal.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -57,8 +57,6 @@
void compact() throws Exception;
- void cleanUp(final JournalFile file) throws Exception;
-
JournalFile getCurrentFile();
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -218,7 +218,7 @@
sequentialFile.open(1, false);
- currentFile = new JournalFileImpl(sequentialFile, nextOrderingID++);
+ currentFile = new JournalFileImpl(sequentialFile, nextOrderingID++, JournalImpl.FORMAT_VERSION);
JournalImpl.writeHeader(writingChannel, journal.getUserVersion(), currentFile.getFileID());
}
Added: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/CompactJournal.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/CompactJournal.java (rev 0)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/CompactJournal.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.List;
+
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.utils.Base64;
+
+/**
+ * This is an undocumented class, that will open a journal and force compacting on it.
+ * It may be used under special cases, but it shouldn't be needed under regular circumstances as the system should detect
+ * the need for compacting.
+ *
+ * The regular use is to configure min-compact parameters.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class CompactJournal
+{
+
+ public static void main(String arg[])
+ {
+ if (arg.length != 4)
+ {
+ System.err.println("Use: java -cp hornetq-core.jar org.hornetq.core.journal.impl.CompactJournal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize>");
+ return;
+ }
+
+ try
+ {
+ compactJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]));
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
+ }
+
+ public static void compactJournal(String directory,
+ String journalPrefix,
+ String journalSuffix,
+ int minFiles,
+ int fileSize) throws Exception
+ {
+ NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory);
+
+ JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
+
+ journal.start();
+
+ journal.loadInternalOnly();
+
+ journal.compact();
+
+ journal.stop();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ExportJournal.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ExportJournal.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ExportJournal.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -183,6 +183,8 @@
recordInfo.data.length +
",isUpdate@" +
recordInfo.isUpdate +
+ ",compactCount@" +
+ recordInfo.compactCount +
",data@" +
encode(recordInfo.data);
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ImportJournal.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -279,11 +279,11 @@
protected static RecordInfo parseRecord(Properties properties) throws Exception
{
- int id = parseInt("id", properties);
+ long id = parseLong("id", properties);
byte userRecordType = parseByte("userRecordType", properties);
boolean isUpdate = parseBoolean("isUpdate", properties);
byte[] data = parseEncoding("data", properties);
- return new RecordInfo(id, userRecordType, data, isUpdate);
+ return new RecordInfo(id, userRecordType, data, isUpdate, (short)0);
}
private static byte[] parseEncoding(String name, Properties properties) throws Exception
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCleaner.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCleaner.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -1,212 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.journal.impl;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.hornetq.core.journal.RecordInfo;
-import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
-import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
-import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
-import org.hornetq.core.journal.impl.dataformat.JournalCompleteRecordTX;
-import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecord;
-import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecordTX;
-import org.hornetq.core.journal.impl.dataformat.JournalRollbackRecordTX;
-
-/**
- * A JournalCleaner
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class JournalCleaner extends AbstractJournalUpdateTask
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private final HashMap<Long, AtomicInteger> transactionCounter = new HashMap<Long, AtomicInteger>();
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
- /**
- * @param fileFactory
- * @param journal
- * @param nextOrderingID
- */
- protected JournalCleaner(final SequentialFileFactory fileFactory,
- final JournalImpl journal,
- final Set<Long> recordsSnapshot,
- final long nextOrderingID) throws Exception
- {
- super(fileFactory, journal, recordsSnapshot, nextOrderingID);
- openFile();
- }
-
- // Public --------------------------------------------------------
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#markAsDataFile(org.hornetq.core.journal.impl.JournalFile)
- */
- public void markAsDataFile(final JournalFile file)
- {
- // nothing to be done here
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadAddRecord(org.hornetq.core.journal.RecordInfo)
- */
- public void onReadAddRecord(final RecordInfo info) throws Exception
- {
- if (lookupRecord(info.id))
- {
- writeEncoder(new JournalAddRecord(true, info.id, info.getUserRecordType(), new ByteArrayEncoding(info.data)));
- }
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadAddRecordTX(long, org.hornetq.core.journal.RecordInfo)
- */
- public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
- {
- if (lookupRecord(recordInfo.id))
- {
- incrementTransactionCounter(transactionID);
-
- writeEncoder(new JournalAddRecordTX(true,
- transactionID,
- recordInfo.id,
- recordInfo.getUserRecordType(),
- new ByteArrayEncoding(recordInfo.data)));
- }
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadCommitRecord(long, int)
- */
- public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
- {
- int txcounter = getTransactionCounter(transactionID);
-
- writeEncoder(new JournalCompleteRecordTX(true, transactionID, null), txcounter);
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadDeleteRecord(long)
- */
- public void onReadDeleteRecord(final long recordID) throws Exception
- {
- writeEncoder(new JournalDeleteRecord(recordID));
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadDeleteRecordTX(long, org.hornetq.core.journal.RecordInfo)
- */
- public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
- {
- incrementTransactionCounter(transactionID);
-
- writeEncoder(new JournalDeleteRecordTX(transactionID, recordInfo.id, new ByteArrayEncoding(recordInfo.data)));
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadPrepareRecord(long, byte[], int)
- */
- public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
- {
- int txcounter = getTransactionCounter(transactionID);
-
- writeEncoder(new JournalCompleteRecordTX(false, transactionID, new ByteArrayEncoding(extraData)), txcounter);
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadRollbackRecord(long)
- */
- public void onReadRollbackRecord(final long transactionID) throws Exception
- {
- writeEncoder(new JournalRollbackRecordTX(transactionID));
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadUpdateRecord(org.hornetq.core.journal.RecordInfo)
- */
- public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception
- {
- if (lookupRecord(recordInfo.id))
- {
- writeEncoder(new JournalAddRecord(false,
- recordInfo.id,
- recordInfo.userRecordType,
- new ByteArrayEncoding(recordInfo.data)));
- }
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadUpdateRecordTX(long, org.hornetq.core.journal.RecordInfo)
- */
- public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
- {
- if (lookupRecord(recordInfo.id))
- {
- incrementTransactionCounter(transactionID);
-
- writeEncoder(new JournalAddRecordTX(false,
- transactionID,
- recordInfo.id,
- recordInfo.userRecordType,
- new ByteArrayEncoding(recordInfo.data)));
- }
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- protected int incrementTransactionCounter(final long transactionID)
- {
- AtomicInteger counter = transactionCounter.get(transactionID);
- if (counter == null)
- {
- counter = new AtomicInteger(0);
- transactionCounter.put(transactionID, counter);
- }
-
- return counter.incrementAndGet();
- }
-
- protected int getTransactionCounter(final long transactionID)
- {
- AtomicInteger counter = transactionCounter.get(transactionID);
- if (counter == null)
- {
- return 0;
- }
- else
- {
- return counter.intValue();
- }
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -47,6 +47,11 @@
{
private static final Logger log = Logger.getLogger(JournalCompactor.class);
+
+ // We try to separate old record from new ones when doing the compacting
+ // this is a split line
+ // We will force a moveNextFiles when the compactCount is below COMPACT_SPLIT_LINE
+ private final short COMPACT_SPLIT_LINE = 2;
// Snapshot of transactions that were pending when the compactor started
private final Map<Long, PendingTransaction> pendingTransactions = new ConcurrentHashMap<Long, PendingTransaction>();
@@ -69,7 +74,7 @@
if (controlFile.exists())
{
- JournalFile file = new JournalFileImpl(controlFile, 0);
+ JournalFile file = new JournalFileImpl(controlFile, 0, JournalImpl.FORMAT_VERSION);
final ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
@@ -209,19 +214,65 @@
private void checkSize(final int size) throws Exception
{
+ checkSize(size, -1);
+ }
+
+ private void checkSize(final int size, final int compactCount) throws Exception
+ {
if (getWritingChannel() == null)
{
- openFile();
+ if (!checkCompact(compactCount))
+ {
+ // will need to open a file either way
+ openFile();
+ }
}
else
{
+ if (compactCount >= 0)
+ {
+ if (checkCompact(compactCount))
+ {
+ // The file was already moved on this case, no need to check for the size.
+ // otherwise we will also need to check for the size
+ return;
+ }
+ }
+
if (getWritingChannel().writerIndex() + size > getWritingChannel().capacity())
{
openFile();
}
}
}
+
+ int currentCount;
+ // This means we will need to split when the compactCount is below the watermark
+ boolean willNeedToSplit = false;
+ boolean splitted = false;
+ private boolean checkCompact(final int compactCount) throws Exception
+ {
+ if (compactCount >= COMPACT_SPLIT_LINE && !splitted)
+ {
+ willNeedToSplit = true;
+ }
+
+ if (willNeedToSplit && compactCount < COMPACT_SPLIT_LINE)
+ {
+ willNeedToSplit = false;
+ splitted = false;
+ openFile();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+
+
/**
* Replay pending counts that happened during compacting
*/
@@ -252,9 +303,10 @@
info.id,
info.getUserRecordType(),
new ByteArrayEncoding(info.data));
+ addRecord.setCompactCount((short)(info.compactCount + 1));
+
+ checkSize(addRecord.getEncodeSize(), info.compactCount);
- checkSize(addRecord.getEncodeSize());
-
writeEncoder(addRecord);
newRecords.put(info.id, new JournalRecord(currentFile, addRecord.getEncodeSize()));
@@ -273,7 +325,9 @@
info.getUserRecordType(),
new ByteArrayEncoding(info.data));
- checkSize(record.getEncodeSize());
+ record.setCompactCount((short)(info.compactCount + 1));
+
+ checkSize(record.getEncodeSize(), info.compactCount);
newTransaction.addPositive(currentFile, info.id, record.getEncodeSize());
@@ -395,7 +449,9 @@
info.userRecordType,
new ByteArrayEncoding(info.data));
- checkSize(updateRecord.getEncodeSize());
+ updateRecord.setCompactCount((short)(info.compactCount + 1));
+
+ checkSize(updateRecord.getEncodeSize(), info.compactCount);
JournalRecord newRecord = newRecords.get(info.id);
@@ -425,7 +481,9 @@
info.userRecordType,
new ByteArrayEncoding(info.data));
- checkSize(updateRecordTX.getEncodeSize());
+ updateRecordTX.setCompactCount((short)(info.compactCount + 1));
+
+ checkSize(updateRecordTX.getEncodeSize(), info.compactCount);
writeEncoder(updateRecordTX);
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFile.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFile.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFile.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -53,10 +53,6 @@
boolean isCanReclaim();
- void setNeedCleanup(boolean needCleanup);
-
- boolean isNeedCleanup();
-
long getOffset();
/** This is a field to identify that records on this file actually belong to the current file.
@@ -64,6 +60,8 @@
int getRecordID();
long getFileID();
+
+ int getJournalVersion();
SequentialFile getFile();
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -36,7 +36,7 @@
private final SequentialFile file;
private final long fileID;
-
+
private final int recordID;
private long offset;
@@ -47,19 +47,20 @@
private boolean canReclaim;
- private boolean needCleanup;
-
private AtomicInteger totalNegativeToOthers = new AtomicInteger(0);
-
+ private final int version;
+
private final Map<JournalFile, AtomicInteger> negCounts = new ConcurrentHashMap<JournalFile, AtomicInteger>();
- public JournalFileImpl(final SequentialFile file, final long fileID)
+ public JournalFileImpl(final SequentialFile file, final long fileID, final int version)
{
this.file = file;
this.fileID = fileID;
-
+
+ this.version = version;
+
this.recordID = (int)(fileID & (long)Integer.MAX_VALUE);
}
@@ -81,16 +82,6 @@
return canReclaim;
}
- public boolean isNeedCleanup()
- {
- return needCleanup;
- }
-
- public void setNeedCleanup(final boolean needCleanup)
- {
- this.needCleanup = needCleanup;
- }
-
public void setCanReclaim(final boolean canReclaim)
{
this.canReclaim = canReclaim;
@@ -119,6 +110,11 @@
}
}
+ public int getJournalVersion()
+ {
+ return version;
+ }
+
public boolean resetNegCount(final JournalFile file)
{
return negCounts.remove(file) != null;
@@ -148,7 +144,7 @@
{
return fileID;
}
-
+
public int getRecordID()
{
return recordID;
@@ -227,12 +223,10 @@
{
return liveBytes.get();
}
-
+
public int getTotalNegativeToOthers()
{
return totalNegativeToOthers.get();
}
-
-
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -67,6 +68,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.DataConstants;
+
/**
*
* <p>A circular log implementation.</p
@@ -88,7 +90,9 @@
private static final int STATE_LOADED = 2;
- private static final int FORMAT_VERSION = 1;
+ public static final int FORMAT_VERSION = 2;
+
+ private static final int COMPATIBLE_VERSIONS[] = new int[] {1};
// Static --------------------------------------------------------
@@ -183,9 +187,9 @@
public final String fileExtension;
- private final LinkedBlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
+ private final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
- private final LinkedBlockingDeque<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
+ private final BlockingQueue<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
private final ConcurrentLinkedQueue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
@@ -398,9 +402,10 @@
try
{
- long fileID = readFileHeader(file);
+
+ JournalFileImpl jrnFile = readFileHeader(file);
- orderedFiles.add(new JournalFileImpl(file, fileID));
+ orderedFiles.add(jrnFile);
}
finally
{
@@ -499,6 +504,21 @@
continue;
}
+ short compactCount = 0;
+
+ if (file.getJournalVersion() >= 2)
+ {
+ if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_BYTE))
+ {
+ reader.markAsDataFile(file);
+
+ wholeFileBuffer.position(pos + 1);
+ continue;
+ }
+
+ compactCount = wholeFileBuffer.get();
+ }
+
long transactionID = 0;
if (JournalImpl.isTransaction(recordType))
@@ -602,7 +622,7 @@
variableSize = 0;
}
- int recordSize = JournalImpl.getRecordSize(recordType);
+ int recordSize = JournalImpl.getRecordSize(recordType, file.getJournalVersion());
// VI - this is completing V, We will validate the size at the end
// of the record,
@@ -676,13 +696,13 @@
{
case ADD_RECORD:
{
- reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false));
+ reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false, compactCount));
break;
}
case UPDATE_RECORD:
{
- reader.onReadUpdateRecord(new RecordInfo(recordID, userRecordType, record, true));
+ reader.onReadUpdateRecord(new RecordInfo(recordID, userRecordType, record, true, compactCount));
break;
}
@@ -694,19 +714,19 @@
case ADD_RECORD_TX:
{
- reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false));
+ reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false, compactCount));
break;
}
case UPDATE_RECORD_TX:
{
- reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true));
+ reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true, compactCount));
break;
}
case DELETE_RECORD_TX:
{
- reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true));
+ reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true, compactCount));
break;
}
@@ -1647,6 +1667,7 @@
else
{
log.warn("Couldn't find tx=" + newTransaction.getId() + " to merge after compacting");
+ System.exit(-1);
}
}
}
@@ -1693,6 +1714,7 @@
* <tr><td><b>Field Name</b></td><td><b>Size</b></td></tr>
* <tr><td>RecordType</td><td>Byte (1)</td></tr>
* <tr><td>FileID</td><td>Integer (4 bytes)</td></tr>
+ * <tr><td>Compactor Counter</td><td>1 byte</td></tr>
* <tr><td>TransactionID <i>(if record is transactional)</i></td><td>Long (8 bytes)</td></tr>
* <tr><td>RecordID</td><td>Long (8 bytes)</td></tr>
* <tr><td>BodySize(Add, update and delete)</td><td>Integer (4 bytes)</td></tr>
@@ -1708,6 +1730,7 @@
* <tr><td><b>Field Name</b></td><td><b>Size</b></td></tr>
* <tr><td>RecordType</td><td>Byte (1)</td></tr>
* <tr><td>FileID</td><td>Integer (4 bytes)</td></tr>
+ * <tr><td>Compactor Counter</td><td>1 byte</td></tr>
* <tr><td>TransactionID <i>(if record is transactional)</i></td><td>Long (8 bytes)</td></tr>
* <tr><td>ExtraDataLength (Prepares only)</td><td>Integer (4 bytes)</td></tr>
* <tr><td>Number Of Files (N)</td><td>Integer (4 bytes)</td></tr>
@@ -1776,7 +1799,7 @@
loadManager.addRecord(info);
- records.put(info.id, new JournalRecord(file, info.data.length + JournalImpl.SIZE_ADD_RECORD));
+ records.put(info.id, new JournalRecord(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1));
}
public void onReadUpdateRecord(final RecordInfo info) throws Exception
@@ -1795,7 +1818,7 @@
// have been deleted
// just leaving some updates in this file
- posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD);
+ posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1); // +1 = compact count
}
}
@@ -1845,7 +1868,7 @@
transactions.put(transactionID, tnp);
}
- tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX);
+ tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1); // +1 = compact count
}
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception
@@ -2135,7 +2158,6 @@
if (file.isCanReclaim())
{
// File can be reclaimed or deleted
-
if (JournalImpl.trace)
{
JournalImpl.trace("Reclaiming file " + file);
@@ -2149,67 +2171,6 @@
addFreeFile(file, false);
}
}
-
- int nCleanup = 0;
- for (JournalFile file : dataFiles)
- {
- if (file.isNeedCleanup())
- {
- nCleanup++;
- }
- }
-
- if (compactMinFiles > 0)
- {
- if (nCleanup > 0 && needsCompact())
- {
- for (JournalFile file : dataFiles)
- {
- if (file.isNeedCleanup())
- {
- final JournalFile cleanupFile = file;
-
- if (compactorRunning.compareAndSet(false, true))
- {
- // The cleanup should happen rarely.
- // but when it happens it needs to use a different thread,
- // or opening new files or any other executor's usage will be blocked while the cleanUp is being
- // processed.
-
- compactorExecutor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- cleanUp(cleanupFile);
- }
- catch (Throwable e)
- {
- JournalImpl.log.warn(e.getMessage(), e);
- }
- finally
- {
- compactorRunning.set(false);
- if (autoReclaim)
- {
- scheduleReclaim();
- }
- }
- }
- });
- }
- return true;
- }
- else
- {
- // We only cleanup the first files
- // if a middle file needs cleanup it will be done through compacting
- break;
- }
- }
- }
- }
}
finally
{
@@ -2219,132 +2180,9 @@
return false;
}
- // This method is public for tests
- public synchronized void cleanUp(final JournalFile file) throws Exception
- {
- if (state != JournalImpl.STATE_LOADED)
- {
- return;
- }
+
+ int deleteme = 0;
- try
- {
- JournalCleaner cleaner = null;
- ArrayList<JournalFile> dependencies = new ArrayList<JournalFile>();
- compactingLock.writeLock().lock();
-
- try
- {
-
- if (JournalImpl.trace)
- {
- JournalImpl.trace("Cleaning up file " + file);
- }
- JournalImpl.log.debug("Cleaning up file " + file);
-
- if (file.getPosCount() == 0)
- {
- // nothing to be done
- return;
- }
-
- // We don't want this file to be reclaimed during the cleanup
- file.incPosCount();
-
- // The file will have all the deleted records removed, so all the NegCount towards the file being cleaned up
- // could be reset
- for (JournalFile jrnFile : dataFiles)
- {
- if (jrnFile.resetNegCount(file))
- {
- dependencies.add(jrnFile);
- jrnFile.incPosCount(); // this file can't be reclaimed while cleanup is being done
- }
- }
-
- currentFile.resetNegCount(file);
- currentFile.incPosCount();
- dependencies.add(currentFile);
-
- cleaner = new JournalCleaner(fileFactory, this, records.keySet(), file.getFileID());
- }
- finally
- {
- compactingLock.writeLock().unlock();
- }
-
- compactingLock.readLock().lock();
-
- try
- {
- JournalImpl.readJournalFile(fileFactory, file, cleaner);
- }
- catch (Throwable e)
- {
- log.warn("Error reading cleanup on " + file, e);
- throw new Exception("Error reading cleanup on " + file, e);
- }
-
- cleaner.flush();
-
- // pointcut for tests
- // We need to test concurrent updates on the journal, as the compacting is being performed.
- // Usually tests will use this to hold the compacting while other structures are being updated.
- onCompactDone();
-
- for (JournalFile jrnfile : dependencies)
- {
- jrnfile.decPosCount();
- }
- file.decPosCount();
-
- SequentialFile tmpFile = cleaner.currentFile.getFile();
- String tmpFileName = tmpFile.getFileName();
- String cleanedFileName = file.getFile().getFileName();
-
- SequentialFile controlFile = createControlFile(null, null, new Pair<String, String>(tmpFileName,
- cleanedFileName));
-
- SequentialFile returningFile = fileFactory.createSequentialFile(file.getFile().getFileName(), maxAIO);
-
- returningFile.renameTo(renameExtensionFile(tmpFileName, ".cmp") + ".tmp");
-
- tmpFile.renameTo(cleanedFileName);
-
- controlFile.delete();
-
- final JournalFile retJournalfile = new JournalFileImpl(returningFile, -1);
-
- if (trace)
- {
- trace("Adding free file back from cleanup" + retJournalfile);
- }
-
- filesExecutor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- addFreeFile(retJournalfile, true);
- }
- catch (Throwable e)
- {
- log.warn("Error reinitializing file " + file, e);
- }
-
- }
- });
-
- }
- finally
- {
- compactingLock.readLock().unlock();
- JournalImpl.log.debug("Clean up on file " + file + " done");
- }
-
- }
-
private boolean needsCompact() throws Exception
{
JournalFile[] dataFiles = getDataFiles();
@@ -2359,8 +2197,10 @@
long totalBytes = (long)dataFiles.length * (long)fileSize;
long compactMargin = (long)(totalBytes * compactPercentage);
+
+ boolean needCompact = (totalLiveSize < compactMargin && !compactorRunning.get() && dataFiles.length > compactMinFiles);
- return (totalLiveSize < compactMargin && !compactorRunning.get() && dataFiles.length > compactMinFiles);
+ return needCompact;
}
@@ -2840,7 +2680,7 @@
int position = initFileHeader(this.fileFactory, sf, userVersion, newFileID);
- JournalFile jf = new JournalFileImpl(sf, newFileID);
+ JournalFile jf = new JournalFileImpl(sf, newFileID, FORMAT_VERSION);
sf.position(position);
@@ -2886,7 +2726,7 @@
return recordType >= JournalImpl.ADD_RECORD && recordType <= JournalImpl.DELETE_RECORD_TX;
}
- private static int getRecordSize(final byte recordType)
+ private static int getRecordSize(final byte recordType, final int journalVersion)
{
// The record size (without the variable portion)
int recordSize = 0;
@@ -2925,7 +2765,14 @@
throw new IllegalStateException("Record other than expected");
}
- return recordSize;
+ if (journalVersion >= 2)
+ {
+ return recordSize + 1;
+ }
+ else
+ {
+ return recordSize;
+ }
}
/**
@@ -2933,17 +2780,30 @@
* @return
* @throws Exception
*/
- private long readFileHeader(SequentialFile file) throws Exception
+ private JournalFileImpl readFileHeader(SequentialFile file) throws Exception
{
ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
file.read(bb);
int journalVersion = bb.getInt();
-
+
if (journalVersion != FORMAT_VERSION)
{
- throw new HornetQException(HornetQException.IO_ERROR, "Journal files version mismatch");
+ boolean isCompatible = false;
+
+ for (int v : COMPATIBLE_VERSIONS)
+ {
+ if (v == journalVersion)
+ {
+ isCompatible = true;
+ }
+ }
+
+ if (!isCompatible)
+ {
+ throw new HornetQException(HornetQException.IO_ERROR, "Journal files version mismatch. You should export the data from the previous version and import it as explained on the user's manual");
+ }
}
int readUserVersion = bb.getInt();
@@ -2958,7 +2818,8 @@
fileFactory.releaseBuffer(bb);
bb = null;
- return fileID;
+
+ return new JournalFileImpl(file, fileID, journalVersion);
}
/**
@@ -3173,7 +3034,7 @@
sequentialFile.position(position);
}
- return new JournalFileImpl(sequentialFile, fileID);
+ return new JournalFileImpl(sequentialFile, fileID, FORMAT_VERSION);
}
/**
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/Reclaimer.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/Reclaimer.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/Reclaimer.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -53,8 +53,6 @@
JournalFile currentFile = files[i];
- currentFile.setNeedCleanup(false);
-
int posCount = currentFile.getPosCount();
int totNeg = 0;
@@ -101,18 +99,7 @@
{
Reclaimer.trace(currentFile + " Can't be reclaimed because " + file + " has negative values");
}
- file.setNeedCleanup(true);
-
- if (file.getTotalNegativeToOthers() == 0)
- {
- file.setNeedCleanup(true);
- }
- else
- {
- // This file can't be cleared as the file has negatives to other files as well
- file.setNeedCleanup(false);
- }
-
+
currentFile.setCanReclaim(false);
break;
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -66,6 +66,8 @@
}
buffer.writeInt(fileID);
+
+ buffer.writeByte(compactCount);
buffer.writeLong(id);
@@ -81,6 +83,6 @@
@Override
public int getEncodeSize()
{
- return JournalImpl.SIZE_ADD_RECORD + record.getEncodeSize();
+ return JournalImpl.SIZE_ADD_RECORD + record.getEncodeSize() + 1;
}
}
\ No newline at end of file
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -75,6 +75,8 @@
}
buffer.writeInt(fileID);
+
+ buffer.writeByte(compactCount);
buffer.writeLong(txID);
@@ -92,6 +94,6 @@
@Override
public int getEncodeSize()
{
- return JournalImpl.SIZE_ADD_RECORD_TX + record.getEncodeSize();
+ return JournalImpl.SIZE_ADD_RECORD_TX + record.getEncodeSize() + 1;
}
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -70,6 +70,8 @@
}
buffer.writeInt(fileID);
+
+ buffer.writeByte(compactCount);
buffer.writeLong(txID);
@@ -105,11 +107,11 @@
{
if (isCommit)
{
- return JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD;
+ return JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD + 1;
}
else
{
- return JournalImpl.SIZE_PREPARE_RECORD + (transactionData != null ? transactionData.getEncodeSize() : 0);
+ return JournalImpl.SIZE_PREPARE_RECORD + (transactionData != null ? transactionData.getEncodeSize() : 0) + 1;
}
}
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -44,6 +44,8 @@
buffer.writeByte(JournalImpl.DELETE_RECORD);
buffer.writeInt(fileID);
+
+ buffer.writeByte(compactCount);
buffer.writeLong(id);
@@ -53,6 +55,6 @@
@Override
public int getEncodeSize()
{
- return JournalImpl.SIZE_DELETE_RECORD;
+ return JournalImpl.SIZE_DELETE_RECORD + 1;
}
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -55,6 +55,8 @@
buffer.writeByte(JournalImpl.DELETE_RECORD_TX);
buffer.writeInt(fileID);
+
+ buffer.writeByte(compactCount);
buffer.writeLong(txID);
@@ -73,6 +75,6 @@
@Override
public int getEncodeSize()
{
- return JournalImpl.SIZE_DELETE_RECORD_TX + (record != null ? record.getEncodeSize() : 0);
+ return JournalImpl.SIZE_DELETE_RECORD_TX + (record != null ? record.getEncodeSize() : 0) + 1;
}
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalInternalRecord.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalInternalRecord.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalInternalRecord.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -27,6 +27,8 @@
{
protected int fileID;
+
+ protected byte compactCount;
public int getFileID()
{
@@ -50,6 +52,23 @@
{
return 0;
}
+ // Returns the compact count; the value is held in a byte field and widened to short here.
+ public short getCompactCount()
+ {
+ return compactCount;
+ }
+ // Stores the compact count in a single byte, saturating at Byte.MAX_VALUE for larger inputs.
+ public void setCompactCount(final short compactCount)
+ {
+ if (compactCount > Byte.MAX_VALUE)
+ {
+ this.compactCount = Byte.MAX_VALUE;
+ }
+ else
+ {
+ this.compactCount = (byte)compactCount; // NOTE(review): no lower-bound clamp; a value below Byte.MIN_VALUE would wrap in this cast
+ }
+ }
public abstract int getEncodeSize();
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -39,14 +39,15 @@
{
buffer.writeByte(JournalImpl.ROLLBACK_RECORD);
buffer.writeInt(fileID);
+ buffer.writeByte(compactCount);
buffer.writeLong(txID);
- buffer.writeInt(JournalImpl.SIZE_ROLLBACK_RECORD);
+ buffer.writeInt(JournalImpl.SIZE_ROLLBACK_RECORD + 1);
}
@Override
public int getEncodeSize()
{
- return JournalImpl.SIZE_ROLLBACK_RECORD;
+ return JournalImpl.SIZE_ROLLBACK_RECORD + 1;
}
}
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -39,7 +39,6 @@
import org.hornetq.utils.IDGenerator;
import org.hornetq.utils.SimpleIDGenerator;
import org.hornetq.utils.TimeAndCounterIDGenerator;
-import org.hornetq.utils.ReusableLatch;
/**
*
@@ -66,7 +65,7 @@
for (int i = 0; i < 5; i++)
{
SequentialFile file = fileFactory.createSequentialFile("file-" + i + ".tst", 1);
- dataFiles.add(new JournalFileImpl(file, 0));
+ dataFiles.add(new JournalFileImpl(file, 0, JournalImpl.FORMAT_VERSION));
}
ArrayList<JournalFile> newFiles = new ArrayList<JournalFile>();
@@ -74,7 +73,7 @@
for (int i = 0; i < 3; i++)
{
SequentialFile file = fileFactory.createSequentialFile("file-" + i + ".tst.new", 1);
- newFiles.add(new JournalFileImpl(file, 0));
+ newFiles.add(new JournalFileImpl(file, 0, JournalImpl.FORMAT_VERSION));
}
ArrayList<Pair<String, String>> renames = new ArrayList<Pair<String, String>>();
@@ -806,33 +805,8 @@
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
- final ReusableLatch reusableLatchDone = new ReusableLatch();
- reusableLatchDone.countUp();
- final ReusableLatch reusableLatchWait = new ReusableLatch();
- reusableLatchWait.countUp();
-
- journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
- {
-
- @Override
- public void onCompactDone()
- {
- reusableLatchDone.countDown();
- System.out.println("Waiting on Compact");
- try
- {
- reusableLatchWait.await();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- System.out.println("Done");
- }
- };
-
- journal.setAutoReclaim(false);
-
+ createJournal();
+
startJournal();
load();
@@ -844,26 +818,8 @@
long addedRecord = idGen.generateID();
- Thread tCompact = new Thread()
- {
- @Override
- public void run()
- {
- try
- {
- journal.compact();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- };
+ startCompact();
- tCompact.start();
-
- reusableLatchDone.await();
-
addTx(consumerTX, firstID);
addTx(appendTX, addedRecord);
@@ -876,10 +832,8 @@
delete(addedRecord);
- reusableLatchWait.countDown();
+ finishCompact();
- tCompact.join();
-
journal.forceMoveNextFile();
long newRecord = idGen.generateID();
@@ -959,52 +913,11 @@
setup(2, 60 * 1024, false);
- final ReusableLatch reusableLatchDone = new ReusableLatch();
- reusableLatchDone.countUp();
- final ReusableLatch reusableLatchWait = new ReusableLatch();
- reusableLatchWait.countUp();
+ createJournal();
- journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
- {
-
- @Override
- public void onCompactDone()
- {
- reusableLatchDone.countDown();
- System.out.println("Waiting on Compact");
- try
- {
- reusableLatchWait.await();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- System.out.println("Done");
- }
- };
-
- journal.setAutoReclaim(false);
-
startJournal();
load();
- Thread tCompact = new Thread()
- {
- @Override
- public void run()
- {
- try
- {
- journal.cleanUp(journal.getDataFiles()[0]);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- };
-
for (int i = 0; i < 100; i++)
{
add(i);
@@ -1017,20 +930,16 @@
delete(i);
}
- tCompact.start();
-
- reusableLatchDone.await();
-
+ startCompact();
+
// Delete part of the live records while cleanup still working
for (int i = 1; i < 5; i++)
{
delete(i);
}
- reusableLatchWait.countDown();
-
- tCompact.join();
-
+ finishCompact();
+
// Delete part of the live records after cleanup is done
for (int i = 5; i < 10; i++)
{
@@ -1054,53 +963,12 @@
setup(2, 60 * 1024, false);
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
+
+ createJournal();
- final ReusableLatch reusableLatchDone = new ReusableLatch();
- reusableLatchDone.countUp();
- final ReusableLatch reusableLatchWait = new ReusableLatch();
- reusableLatchWait.countUp();
-
- journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
- {
-
- @Override
- public void onCompactDone()
- {
- reusableLatchDone.countDown();
- System.out.println("Waiting on Compact");
- try
- {
- reusableLatchWait.await();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- System.out.println("Done");
- }
- };
-
- journal.setAutoReclaim(false);
-
startJournal();
load();
- Thread tCompact = new Thread()
- {
- @Override
- public void run()
- {
- try
- {
- journal.compact();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- };
-
long appendTX = idGen.generateID();
long appendOne = idGen.generateID();
long appendTwo = idGen.generateID();
@@ -1109,9 +977,8 @@
addTx(appendTX, appendOne);
- tCompact.start();
- reusableLatchDone.await();
-
+ startCompact();
+
addTx(appendTX, appendTwo);
commit(appendTX);
@@ -1122,8 +989,7 @@
commit(updateTX);
// delete(appendTwo);
- reusableLatchWait.countDown();
- tCompact.join();
+ finishCompact();
journal.compact();
@@ -1239,13 +1105,13 @@
long id = idGenerator.generateID();
listToDelete.add(id);
- expectedSizes.add(recordLength + JournalImpl.SIZE_ADD_RECORD);
+ expectedSizes.add(recordLength + JournalImpl.SIZE_ADD_RECORD + 1);
add(id);
journal.forceMoveNextFile();
update(id);
- expectedSizes.add(recordLength + JournalImpl.SIZE_ADD_RECORD);
+ expectedSizes.add(recordLength + JournalImpl.SIZE_ADD_RECORD + 1);
journal.forceMoveNextFile();
}
@@ -1295,7 +1161,55 @@
}
}
+ // Compacts twice when a transaction's commit was written after its adds, past a forced file move, then restarts.
+ public void testCompactFirstFileWithPendingCommits() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ // Ten transactional adds are appended under a single transaction id ...
+ long tx = idGenerator.generateID();
+ for (int i = 0; i < 10; i++)
+ {
+ addTx(tx, idGenerator.generateID());
+ }
+ // ... and the commit is appended only after forceMoveNextFile()
+ journal.forceMoveNextFile();
+ commit(tx);
+
+ // Ten non-transactional adds, which are deleted again after another forceMoveNextFile()
+ ArrayList<Long> listToDelete = new ArrayList<Long>();
+ for (int i = 0; i < 10; i++)
+ {
+ long id = idGenerator.generateID();
+ listToDelete.add(id);
+ add(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ for (Long id : listToDelete)
+ {
+ delete(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ // This operation used to be journal.cleanup(journal.getDataFiles()[0]); when cleanup was still in place
+ journal.compact();
+
+ journal.checkReclaimStatus();
+
+ journal.compact();
+ // Restart the journal and run loadAndCheck() again on the compacted files
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
public void testLiveSizeTransactional() throws Exception
{
setup(2, 60 * 1024, true);
Added: branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/OldFormatTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/OldFormatTest.java (rev 0)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/OldFormatTest.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.journal;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * Checks that a journal file hand-written in the version 1 record format can be
+ * loaded and compacted by the current journal implementation.
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class OldFormatTest extends JournalImplTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Writes records by hand in the version 1 format, then loads and compacts them with the current journal code
+ public void testFormatOne() throws Exception
+ {
+ setup(2, 100 * 1024, true);
+
+ SequentialFile file = fileFactory.createSequentialFile("hq-1.hq", 1);
+
+ ByteBuffer buffer = ByteBuffer.allocateDirect(100 * 1024);
+ // File header first, then an add/update pair for each id 0..9, all written with file id 1
+ initHeader(buffer, 1);
+
+ byte[] record = new byte[1];
+
+ for (long i = 0 ; i < 10; i++)
+ {
+ add(buffer, 1, i, record);
+
+ update(buffer, 1, i, record);
+ }
+
+ file.open(1, false);
+ // rewind() puts the position back to 0 so the buffer is handed to writeDirect from its start
+ buffer.rewind();
+
+ file.writeDirect(buffer, true);
+
+ file.close();
+ // Load the hand-written file, compact it, then restart the journal and check again
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ startCompact();
+ finishCompact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+ // Appends one ADD_RECORD by hand (file id is followed directly by the record id) and tracks it in records
+ private void add(ByteBuffer buffer, int fileID, long id, byte[] record)
+ {
+ int pos = buffer.position();
+
+ buffer.put(JournalImpl.ADD_RECORD);
+
+ buffer.putInt(fileID);
+
+ buffer.putLong(id);
+
+ buffer.putInt(record.length);
+
+ buffer.put((byte)0);
+
+ buffer.put(record);
+
+ buffer.putInt(buffer.position() - pos + DataConstants.SIZE_INT); // bytes written since pos, plus the size of this trailing int
+
+ records.add(new RecordInfo(id, (byte)0, record, false, (short)0));
+ }
+ // Same byte layout as add(), but tagged UPDATE_RECORD and tracked as an update
+ private void update(ByteBuffer buffer, int fileID, long id, byte[] record)
+ {
+ int pos = buffer.position();
+
+ buffer.put(JournalImpl.UPDATE_RECORD);
+
+ buffer.putInt(fileID);
+
+ buffer.putLong(id);
+
+ buffer.putInt(record.length);
+
+ buffer.put((byte)0);
+
+ buffer.put(record);
+
+ buffer.putInt(buffer.position() - pos + DataConstants.SIZE_INT); // bytes written since pos, plus the size of this trailing int
+
+ records.add(new RecordInfo(id, (byte)0, record, true, (short)0));
+
+ }
+
+ /**
+ * Writes the file header into buffer: the ints 1 and 0, then fileID as a long (the 1 is presumably the format version - confirm).
+ */
+ private void initHeader(ByteBuffer buffer, int fileID)
+ {
+ buffer.putInt(1);
+
+ buffer.putInt(0);
+
+ buffer.putLong(fileID);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase#getFileFactory()
+ */
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ return new NIOSequentialFileFactory(getTestDir());
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+ // Deletes and recreates the test directory before each test
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ File file = new File(getTestDir());
+
+ deleteDirectory(file);
+
+ file.mkdir();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -15,15 +15,19 @@
import java.io.File;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -41,7 +45,6 @@
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.OrderedExecutorFactory;
import org.hornetq.utils.SimpleIDGenerator;
-import org.hornetq.utils.concurrent.LinkedBlockingDeque;
/**
* A SoakJournal
@@ -56,6 +59,8 @@
public static SimpleIDGenerator idGen = new SimpleIDGenerator(1);
private static final int MAX_WRITES = 20000;
+
+ private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
// We want to maximize the difference between appends and deles, or we could get out of memory
public Semaphore maxRecords;
@@ -82,6 +87,12 @@
Executor testExecutor;
+ protected long getTotalTimeMilliseconds()
+ {
+ return TimeUnit.MINUTES.toMillis(2); // two minutes, expressed in milliseconds
+ }
+
+
@Override
public void setUp() throws Exception
{
@@ -114,7 +125,7 @@
journal = new JournalImpl(50 * 1024,
20,
- 15,
+ 50,
ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE,
factory,
"hornetq-data",
@@ -181,11 +192,6 @@
threadPool.shutdown();
}
- protected long getTotalTimeMilliseconds()
- {
- return TimeUnit.MINUTES.toMillis(10);
- }
-
public void testAppend() throws Exception
{
@@ -225,6 +231,12 @@
", liveRecords = " +
(numberOfRecords.get() - numberOfDeletes.get()));
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+ rwLock.writeLock().lock();
+ System.out.println("Restarting server");
+ journal.stop();
+ journal.start();
+ reloadJournal();
+ rwLock.writeLock().unlock();
}
running = false;
@@ -255,12 +267,36 @@
latchExecutorDone.await();
- assertEquals(0, errors.get());
-
journal.stop();
journal.start();
+ reloadJournal();
+
+ Collection<Long> records = journal.getRecords().keySet();
+
+ System.out.println("Deleting everything!");
+ for (Long delInfo : records)
+ {
+ journal.appendDeleteRecord(delInfo, false);
+ }
+
+ journal.forceMoveNextFile();
+
+ Thread.sleep(5000);
+
+ assertEquals(0, journal.getDataFilesCount());
+
+ journal.stop();
+ }
+
+ /**
+ * @throws Exception
+ */
+ private void reloadJournal() throws Exception
+ {
+ assertEquals(0, errors.get());
+
ArrayList<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
ArrayList<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
journal.load(committedRecords, preparedTransactions, new TransactionFailureCallback()
@@ -285,8 +321,6 @@
}
assertEquals(numberOfRecords.get() - numberOfDeletes.get(), appends);
-
- journal.stop();
}
private byte[] generateRecord()
@@ -313,9 +347,10 @@
@Override
public void run()
{
+ rwLock.readLock().lock();
+
try
{
-
while (running)
{
final int txSize = RandomUtil.randomMax(100);
@@ -358,6 +393,14 @@
}
}
});
+
+ rwLock.readLock().unlock();
+
+ Thread.yield();
+
+ rwLock.readLock().lock();
+
+
}
}
catch (Exception e)
@@ -366,6 +409,10 @@
running = false;
errors.incrementAndGet();
}
+ finally
+ {
+ rwLock.readLock().unlock();
+ }
}
}
@@ -384,6 +431,9 @@
@Override
public void run()
{
+
+ rwLock.readLock().lock();
+
try
{
int txSize = RandomUtil.randomMax(100);
@@ -391,17 +441,30 @@
long ids[] = new long[txSize];
long txID = JournalCleanupCompactStressTest.idGen.generateID();
-
+
while (running)
{
- long id = queue.poll(60, TimeUnit.MINUTES);
- ids[txCount] = id;
- journal.appendUpdateRecordTransactional(txID, id, (byte)0, generateRecord());
- if (++txCount == txSize)
+ Long id = queue.poll(10, TimeUnit.SECONDS);
+ if (id != null)
{
- journal.appendCommitRecord(txID, true, ctx);
- ctx.executeOnCompletion(new DeleteTask(ids));
+ ids[txCount++] = id;
+ journal.appendUpdateRecordTransactional(txID, id, (byte)0, generateRecord());
+ }
+ if (txCount == txSize || id == null)
+ {
+ if (txCount > 0)
+ {
+ journal.appendCommitRecord(txID, true, ctx);
+ ctx.executeOnCompletion(new DeleteTask(ids));
+ }
+
+ rwLock.readLock().unlock();
+
+ Thread.yield();
+
+ rwLock.readLock().lock();
+
txCount = 0;
txSize = RandomUtil.randomMax(100);
txID = JournalCleanupCompactStressTest.idGen.generateID();
@@ -420,6 +483,10 @@
running = false;
errors.incrementAndGet();
}
+ finally
+ {
+ rwLock.readLock().unlock();
+ }
}
}
@@ -434,14 +501,18 @@
public void done()
{
+ rwLock.readLock().lock();
numberOfUpdates.addAndGet(ids.length);
try
{
for (long id : ids)
{
- journal.appendDeleteRecord(id, false);
- maxRecords.release();
- numberOfDeletes.incrementAndGet();
+ if (id != 0)
+ {
+ journal.appendDeleteRecord(id, false);
+ maxRecords.release();
+ numberOfDeletes.incrementAndGet();
+ }
}
}
catch (Exception e)
@@ -451,6 +522,10 @@
running = false;
errors.incrementAndGet();
}
+ finally
+ {
+ rwLock.readLock().unlock();
+ }
}
public void onError(final int errorCode, final String errorMessage)
@@ -473,6 +548,7 @@
@Override
public void run()
{
+ rwLock.readLock().lock();
try
{
while (running)
@@ -481,18 +557,22 @@
// Append
for (int i = 0; running & i < ids.length; i++)
{
- // System.out.println("append slow");
+ System.out.println("append slow");
ids[i] = JournalCleanupCompactStressTest.idGen.generateID();
maxRecords.acquire();
journal.appendAddRecord(ids[i], (byte)1, generateRecord(), true);
numberOfRecords.incrementAndGet();
+ rwLock.readLock().unlock();
+
Thread.sleep(TimeUnit.SECONDS.toMillis(50));
+
+ rwLock.readLock().lock();
}
// Delete
for (int i = 0; running & i < ids.length; i++)
{
- // System.out.println("Deleting");
+ System.out.println("Deleting");
maxRecords.release();
journal.appendDeleteRecord(ids[i], false);
numberOfDeletes.incrementAndGet();
@@ -504,6 +584,10 @@
e.printStackTrace();
System.exit(-1);
}
+ finally
+ {
+ rwLock.readLock().unlock();
+ }
}
}
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -542,7 +542,7 @@
journalImpl.appendAddRecordTransactional(1l,
2l,
(byte)3,
- new SimpleEncoding(1900 - JournalImpl.SIZE_ADD_RECORD_TX, (byte)4));
+ new SimpleEncoding(1900 - JournalImpl.SIZE_ADD_RECORD_TX - 1, (byte)4));
journalImpl.appendCommitRecord(1l, false);
@@ -587,11 +587,11 @@
// jumping RecordType, FileId, TransactionID, RecordID, VariableSize,
// RecordType, RecordBody (that we know it is 1 )
- buffer.position(1 + 4 + 8 + 8 + 4 + 1 + 1);
+ buffer.position(1 + 4 + 8 + 8 + 4 + 1 + 1 + 1);
int posCheckSize = buffer.position();
- Assert.assertEquals(JournalImpl.SIZE_ADD_RECORD_TX + 1, buffer.getInt());
+ Assert.assertEquals(JournalImpl.SIZE_ADD_RECORD_TX + 2, buffer.getInt());
buffer.position(posCheckSize);
@@ -652,11 +652,11 @@
// jumping RecordType, FileId, TransactionID, RecordID, VariableSize,
// RecordType, RecordBody (that we know it is 1 )
- buffer.position(1 + 4 + 8 + 8 + 4 + 1 + 1);
+ buffer.position(1 + 4 + 8 + 8 + 4 + 1 + 1 + 1);
int posCheckSize = buffer.position();
- Assert.assertEquals(JournalImpl.SIZE_ADD_RECORD_TX + 1, buffer.getInt());
+ Assert.assertEquals(JournalImpl.SIZE_ADD_RECORD_TX + 2, buffer.getInt());
buffer.position(posCheckSize);
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -333,7 +333,7 @@
journal.appendAddRecord(element, (byte)0, record, sync);
- records.add(new RecordInfo(element, (byte)0, record, false));
+ records.add(new RecordInfo(element, (byte)0, record, false, (short)0));
}
journal.debugWait();
@@ -349,7 +349,7 @@
journal.appendUpdateRecord(element, (byte)0, updateRecord, sync);
- records.add(new RecordInfo(element, (byte)0, updateRecord, true));
+ records.add(new RecordInfo(element, (byte)0, updateRecord, true, (short)0));
}
journal.debugWait();
@@ -377,13 +377,13 @@
{
// SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length +
// SIZE_BYTE
- byte[] record = generateRecord(recordLength - JournalImpl.SIZE_ADD_RECORD_TX);
+ byte[] record = generateRecord(recordLength - (JournalImpl.SIZE_ADD_RECORD_TX + 1));
beforeJournalOperation();
journal.appendAddRecordTransactional(txID, element, (byte)0, record);
- tx.records.add(new RecordInfo(element, (byte)0, record, false));
+ tx.records.add(new RecordInfo(element, (byte)0, record, false, (short)0));
}
@@ -396,13 +396,13 @@
for (long element : arguments)
{
- byte[] updateRecord = generateRecord(recordLength - JournalImpl.SIZE_ADD_RECORD_TX);
+ byte[] updateRecord = generateRecord(recordLength - (JournalImpl.SIZE_ADD_RECORD_TX + 1));
beforeJournalOperation();
journal.appendUpdateRecordTransactional(txID, element, (byte)0, updateRecord);
- tx.records.add(new RecordInfo(element, (byte)0, updateRecord, true));
+ tx.records.add(new RecordInfo(element, (byte)0, updateRecord, true, (short)0));
}
journal.debugWait();
}
@@ -417,7 +417,7 @@
journal.appendDeleteRecordTransactional(txID, element);
- tx.deletes.add(new RecordInfo(element, (byte)0, null, true));
+ tx.deletes.add(new RecordInfo(element, (byte)0, null, true, (short)0));
}
journal.debugWait();
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -492,7 +492,8 @@
private int calculateRecordsPerFile(final int fileSize, final int alignment, int recordSize)
{
recordSize = calculateRecordSize(recordSize, alignment);
- return fileSize / recordSize;
+ int headerSize = calculateRecordSize(JournalImpl.SIZE_HEADER, alignment);
+ return (fileSize - headerSize) / recordSize;
}
/**
@@ -666,7 +667,9 @@
int addRecordsPerFile = calculateRecordsPerFile(10 * 1024,
journal.getAlignment(),
- JournalImpl.SIZE_ADD_RECORD + recordLength);
+ JournalImpl.SIZE_ADD_RECORD + 1 + recordLength);
+
+ System.out.println(JournalImpl.SIZE_ADD_RECORD + 1 + recordLength);
// Fills exactly 10 files
int initialNumberOfAddRecords = addRecordsPerFile * 10;
@@ -693,29 +696,11 @@
// Now delete half of them
- int deleteRecordsPerFile = calculateRecordsPerFile(10 * 1024,
- journal.getAlignment(),
- JournalImpl.SIZE_DELETE_RECORD);
-
for (int i = 0; i < initialNumberOfAddRecords / 2; i++)
{
delete(i);
}
- int numberOfFiles = calculateNumberOfFiles(10 * 1024,
- journal.getAlignment(),
- initialNumberOfAddRecords,
- JournalImpl.SIZE_ADD_RECORD + recordLength,
- initialNumberOfAddRecords / 2,
- JournalImpl.SIZE_DELETE_RECORD);
-
- if (initialNumberOfAddRecords / 2 % deleteRecordsPerFile == 0)
- {
- // The file is already full, next add would fix it
- numberOfFiles--;
- }
-
- Assert.assertEquals(numberOfFiles, journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(initialNumberOfAddRecords / 2, journal.getIDMapSize());
@@ -726,16 +711,6 @@
add(initialNumberOfAddRecords + i);
}
- numberOfFiles = calculateNumberOfFiles(10 * 1024,
- journal.getAlignment(),
- initialNumberOfAddRecords,
- JournalImpl.SIZE_ADD_RECORD + recordLength,
- initialNumberOfAddRecords / 2,
- JournalImpl.SIZE_DELETE_RECORD,
- 10,
- JournalImpl.SIZE_ADD_RECORD + recordLength);
-
- Assert.assertEquals(numberOfFiles, journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(initialNumberOfAddRecords / 2 + 10, journal.getIDMapSize());
@@ -786,7 +761,7 @@
public void testReclaimAddUpdateDeleteDifferentFiles1() throws Exception
{
// Make sure there is one record per file
- setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + recordLength,
+ setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + 1 + recordLength,
getAlignment()), true);
createJournal();
startJournal();
@@ -825,7 +800,7 @@
public void testReclaimAddUpdateDeleteDifferentFiles2() throws Exception
{
// Make sure there is one record per file
- setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + recordLength,
+ setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + 1 + recordLength,
getAlignment()), true);
createJournal();
@@ -1130,7 +1105,7 @@
// Make sure we move on to the next file
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2); // in file 2
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 2); // in file 2
journal.debugWait();
@@ -1161,7 +1136,7 @@
// Make sure we move on to the next file
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 3); // in file 4
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 3); // in file 4
List<String> files5 = fileFactory.listFiles(fileExtension);
@@ -1491,7 +1466,7 @@
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2);
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 2);
// Move on to another file
@@ -1544,7 +1519,7 @@
// Make sure we move on to the next file
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2); // in file 1
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 2); // in file 1
List<String> files2 = fileFactory.listFiles(fileExtension);
@@ -1565,7 +1540,7 @@
2,
recordLength,
1,
- JournalImpl.SIZE_COMMIT_RECORD) + 2, files3.size());
+ JournalImpl.SIZE_COMMIT_RECORD + 1) + 2, files3.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(calculateNumberOfFiles(fileSize,
@@ -1586,9 +1561,9 @@
2,
recordLength,
1,
- JournalImpl.SIZE_COMMIT_RECORD,
+ JournalImpl.SIZE_COMMIT_RECORD + 1,
1,
- JournalImpl.SIZE_DELETE_RECORD) + 2, files4.size());
+ JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(calculateNumberOfFiles(fileSize,
@@ -1596,9 +1571,9 @@
2,
recordLength,
1,
- JournalImpl.SIZE_COMMIT_RECORD,
+ JournalImpl.SIZE_COMMIT_RECORD + 1,
1,
- JournalImpl.SIZE_DELETE_RECORD), journal.getDataFilesCount());
+ JournalImpl.SIZE_DELETE_RECORD + 1), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
@@ -1608,9 +1583,6 @@
List<String> files5 = fileFactory.listFiles(fileExtension);
- Assert.assertEquals(4, files5.size());
-
- Assert.assertEquals(2, journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(2, journal.getIDMapSize());
@@ -1619,9 +1591,6 @@
List<String> files6 = fileFactory.listFiles(fileExtension);
- Assert.assertEquals(4, files6.size());
-
- Assert.assertEquals(2, journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(2, journal.getIDMapSize());
@@ -1633,11 +1602,6 @@
startJournal();
loadAndCheck();
- List<String> files7 = fileFactory.listFiles(fileExtension);
-
- Assert.assertEquals(4, files7.size());
-
- Assert.assertEquals(2, journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(2, journal.getIDMapSize());
@@ -1665,7 +1629,7 @@
// Make sure we move on to the next file
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2); // in file 1
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 2); // in file 1
List<String> files2 = fileFactory.listFiles(fileExtension);
@@ -1678,14 +1642,12 @@
rollback(1); // in file 1
- List<String> files3 = fileFactory.listFiles(fileExtension);
-
Assert.assertEquals(calculateNumberOfFiles(fileSize,
journal.getAlignment(),
2,
recordLength,
1,
- JournalImpl.SIZE_ROLLBACK_RECORD) + 2, files3.size());
+ JournalImpl.SIZE_ROLLBACK_RECORD + 1), journal.getDataFilesCount());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(calculateNumberOfFiles(fileSize,
@@ -1693,7 +1655,7 @@
2,
recordLength,
1,
- JournalImpl.SIZE_ROLLBACK_RECORD), journal.getDataFilesCount());
+ JournalImpl.SIZE_ROLLBACK_RECORD + 1), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
@@ -1706,9 +1668,9 @@
2,
recordLength,
1,
- JournalImpl.SIZE_ROLLBACK_RECORD,
+ JournalImpl.SIZE_ROLLBACK_RECORD + 1,
1,
- JournalImpl.SIZE_DELETE_RECORD) + 2, files4.size());
+ JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(calculateNumberOfFiles(fileSize,
@@ -1716,43 +1678,19 @@
2,
recordLength,
1,
- JournalImpl.SIZE_ROLLBACK_RECORD,
+ JournalImpl.SIZE_ROLLBACK_RECORD + 1,
1,
- JournalImpl.SIZE_DELETE_RECORD), journal.getDataFilesCount());
+ JournalImpl.SIZE_DELETE_RECORD + 1), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
// Move on to another file
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 3); // in file 2
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 3); // in file 2
// (current
// file)
- List<String> files5 = fileFactory.listFiles(fileExtension);
-
- Assert.assertEquals(calculateNumberOfFiles(fileSize,
- journal.getAlignment(),
- 2,
- recordLength,
- 1,
- JournalImpl.SIZE_ROLLBACK_RECORD,
- 1,
- JournalImpl.SIZE_DELETE_RECORD,
- 1,
- recordLength) + 2, files5.size());
-
Assert.assertEquals(1, journal.getOpenedFilesCount());
-
- Assert.assertEquals(calculateNumberOfFiles(fileSize,
- journal.getAlignment(),
- 2,
- recordLength,
- 1,
- JournalImpl.SIZE_ROLLBACK_RECORD,
- 1,
- JournalImpl.SIZE_DELETE_RECORD,
- 1,
- recordLength), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
@@ -1859,76 +1797,23 @@
EncodingSupport xid = new SimpleEncoding(10, (byte)0);
prepare(1, xid); // in file 1
- List<String> files3 = fileFactory.listFiles(fileExtension);
-
- Assert.assertEquals(3, files3.size());
-
- Assert.assertEquals(calculateNumberOfFiles(fileSize,
- journal.getAlignment(),
- 2,
- recordLength,
- 1,
- JournalImpl.SIZE_PREPARE_RECORD), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
Assert.assertEquals(1, journal.getOpenedFilesCount());
delete(2); // in file 1
- List<String> files4 = fileFactory.listFiles(fileExtension);
-
- Assert.assertEquals(calculateNumberOfFiles(fileSize,
- journal.getAlignment(),
- 2,
- recordLength,
- 1,
- JournalImpl.SIZE_PREPARE_RECORD,
- 1,
- JournalImpl.SIZE_DELETE_RECORD) + 2, files4.size());
-
Assert.assertEquals(1, journal.getOpenedFilesCount());
- Assert.assertEquals(calculateNumberOfFiles(fileSize,
- journal.getAlignment(),
- 2,
- recordLength,
- 1,
- JournalImpl.SIZE_PREPARE_RECORD,
- 1,
- JournalImpl.SIZE_DELETE_RECORD), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
// Move on to another file
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 3); // in file 2
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD -1, 3); // in file 2
- List<String> files5 = fileFactory.listFiles(fileExtension);
-
- Assert.assertEquals(calculateNumberOfFiles(fileSize,
- journal.getAlignment(),
- 2,
- recordLength,
- 1,
- JournalImpl.SIZE_PREPARE_RECORD,
- 1,
- JournalImpl.SIZE_DELETE_RECORD,
- 1,
- recordLength) + 2, files5.size());
-
Assert.assertEquals(1, journal.getOpenedFilesCount());
- Assert.assertEquals(calculateNumberOfFiles(fileSize,
- journal.getAlignment(),
- 2,
- recordLength,
- 1,
- JournalImpl.SIZE_PREPARE_RECORD,
- 1,
- JournalImpl.SIZE_DELETE_RECORD,
- 1,
- recordLength), journal.getDataFilesCount());
-
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
@@ -1943,7 +1828,7 @@
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 4); // in file 3
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD -1, 4); // in file 3
List<String> files7 = fileFactory.listFiles(fileExtension);
@@ -2415,7 +2300,7 @@
journal.appendAddRecord(i, (byte)0, record, false);
- records.add(new RecordInfo(i, (byte)0, record, false));
+ records.add(new RecordInfo(i, (byte)0, record, false, (short)0));
}
for (int i = 0; i < 100; i++)
@@ -2424,7 +2309,7 @@
journal.appendUpdateRecord(i, (byte)0, record, false);
- records.add(new RecordInfo(i, (byte)0, record, true));
+ records.add(new RecordInfo(i, (byte)0, record, true, (short)0));
}
for (int i = 0; i < 100; i++)
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -22,6 +22,7 @@
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.journal.impl.Reclaimer;
import org.hornetq.core.logging.Logger;
import org.hornetq.tests.util.UnitTestCase;
@@ -357,23 +358,6 @@
assertCantDelete(2);
}
- public void testCleanup() throws Exception
- {
- setup(3);
- setupPosNeg(0, 11, 0, 0, 0);
- setupPosNeg(1, 1, 10, 0, 0);
- setupPosNeg(2, 1, 0, 1, 0);
-
- reclaimer.scan(files);
-
- debugFiles();
-
- assertCantDelete(0);
- Assert.assertTrue(files[0].isNeedCleanup());
- assertCantDelete(1);
- assertCantDelete(2);
- }
-
public void testThreeFiles10() throws Exception
{
setup(3);
@@ -741,9 +725,7 @@
"]=" +
files[i].getPosCount() +
", canDelete = " +
- files[i].isCanReclaim() +
- ", cleanup = " +
- files[i].isNeedCleanup());
+ files[i].isCanReclaim());
for (int j = 0; j <= i; j++)
{
System.out.println("..." + files[i].getNegCount(files[j]));
@@ -1002,5 +984,31 @@
{
return totalDep;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalFile#getJournalVersion()
+ */
+ public int getJournalVersion()
+ {
+ return JournalImpl.FORMAT_VERSION;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalFile#getTotNeg()
+ */
+ public int getTotNeg()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalFile#setTotNeg(int)
+ */
+ public void setTotNeg(int totNeg)
+ {
+ // TODO Auto-generated method stub
+
+ }
}
}
14 years, 5 months