[hornetq-commits] JBoss hornetq SVN: r9603 - branches/Branch_2_1/src/main/org/hornetq/core/journal/impl.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Aug 27 11:56:15 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-08-27 11:56:15 -0400 (Fri, 27 Aug 2010)
New Revision: 9603
Added:
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
Removed:
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java
Modified:
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
Log:
Renaming FilesRepository class & cleanup
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-08-27 05:34:10 UTC (rev 9602)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-08-27 15:56:15 UTC (rev 9603)
@@ -55,7 +55,7 @@
protected SequentialFile sequentialFile;
- protected final FilesRepository filesRepository;
+ protected final JournalFilesRepository filesRepository;
protected long nextOrderingID;
@@ -71,7 +71,7 @@
protected AbstractJournalUpdateTask(final SequentialFileFactory fileFactory,
final JournalImpl journal,
- final FilesRepository filesRepository,
+ final JournalFilesRepository filesRepository,
final Set<Long> recordsSnapshot,
final long nextOrderingID)
{
Deleted: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java 2010-08-27 05:34:10 UTC (rev 9602)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java 2010-08-27 15:56:15 UTC (rev 9603)
@@ -1,609 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.journal.impl;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.hornetq.core.journal.SequentialFile;
-import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.logging.Logger;
-
-/**
- * This is a helper class for the Journal, which will control access to dataFiles, openedFiles and freeFiles
- * Guaranteeing that they will be delivered in order to the Journal
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class FilesRepository
-{
-
- private static final Logger log = Logger.getLogger(FilesRepository.class);
-
- private static final boolean trace = FilesRepository.log.isTraceEnabled();
-
- // This method exists just to make debug easier.
- // I could replace log.trace by log.info temporarily while I was debugging
- // Journal
- private static final void trace(final String message)
- {
- FilesRepository.log.trace(message);
- }
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private final SequentialFileFactory fileFactory;
-
- private final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
-
- private final BlockingQueue<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
-
- private final ConcurrentLinkedQueue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
-
- private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
-
- private final AtomicLong nextFileID = new AtomicLong(0);
-
- private final int maxAIO;
-
- private final int minFiles;
-
- private final int fileSize;
-
- private final String filePrefix;
-
- private final String fileExtension;
-
- private final int userVersion;
-
- private Executor filesExecutor;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public FilesRepository(final SequentialFileFactory fileFactory,
- final String filePrefix,
- final String fileExtension,
- final int userVersion,
- final int maxAIO,
- final int fileSize,
- final int minFiles)
- {
- this.fileFactory = fileFactory;
- this.maxAIO = maxAIO;
- this.filePrefix = filePrefix;
- this.fileExtension = fileExtension;
- this.minFiles = minFiles;
- this.fileSize = fileSize;
- this.userVersion = userVersion;
- }
-
- // Public --------------------------------------------------------
-
- public void setExecutor(final Executor executor)
- {
- filesExecutor = executor;
- }
-
- public void clear()
- {
- dataFiles.clear();
-
- drainClosedFiles();
-
- freeFiles.clear();
-
- for (JournalFile file : openedFiles)
- {
- try
- {
- file.getFile().close();
- }
- catch (Exception e)
- {
- FilesRepository.log.warn(e.getMessage(), e);
- }
- }
- openedFiles.clear();
- }
-
- public int getMaxAIO()
- {
- return maxAIO;
- }
-
- public String getFileExtension()
- {
- return fileExtension;
- }
-
- public String getFilePrefix()
- {
- return filePrefix;
- }
-
- public void calculateNextfileID(final List<JournalFile> files)
- {
-
- for (JournalFile file : files)
- {
- long fileID = file.getFileID();
- if (nextFileID.get() < fileID)
- {
- nextFileID.set(fileID);
- }
-
- long fileNameID = getFileNameID(file.getFile().getFileName());
-
- // The compactor could create a fileName but use a previously assigned ID.
- // Because of that we need to take both parts into account
- if (nextFileID.get() < fileNameID)
- {
- nextFileID.set(fileNameID);
- }
- }
-
- }
-
- public void ensureMinFiles() throws Exception
- {
- // FIXME - size() involves a scan
- int filesToCreate = minFiles - (dataFiles.size() + freeFiles.size());
-
- if (filesToCreate > 0)
- {
- for (int i = 0; i < filesToCreate; i++)
- {
- // Keeping all files opened can be very costly (mainly on AIO)
- freeFiles.add(createFile(false, false, true, false));
- }
- }
-
- }
-
- public void openFile(final JournalFile file, final boolean multiAIO) throws Exception
- {
- if (multiAIO)
- {
- file.getFile().open();
- }
- else
- {
- file.getFile().open(1, false);
- }
-
- file.getFile().position(file.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER));
- }
-
- // Data File Operations ==========================================
-
- public JournalFile[] getDataFilesArray()
- {
- return dataFiles.toArray(new JournalFile[dataFiles.size()]);
- }
-
- public JournalFile pollLastDataFile()
- {
- /*java.util.Iterator<JournalFile> iter = dataFiles.iterator();
-
- JournalFile currentFile = null;
- while (iter.hasNext())
- {
- currentFile = iter.next();
-
- if (!iter.hasNext())
- {
- iter.remove();
- }
- }
-
- return currentFile; */
-
- return dataFiles.pollLast();
- }
-
- public void removeDataFile(final JournalFile file)
- {
- if (!dataFiles.remove(file))
- {
- FilesRepository.log.warn("Could not remove file " + file + " from the list of data files");
- }
- }
-
- public int getDataFilesCount()
- {
- return dataFiles.size();
- }
-
- public Collection<JournalFile> getDataFiles()
- {
- return dataFiles;
- }
-
- public void clearDataFiles()
- {
- dataFiles.clear();
- }
-
- public void addDataFileOnTop(final JournalFile file)
- {
- dataFiles.addFirst(file);
- }
-
- public void addDataFileOnBottom(final JournalFile file)
- {
- dataFiles.add(file);
- }
-
- // Free File Operations ==========================================
-
- public int getFreeFilesCount()
- {
- return freeFiles.size();
- }
-
- /**
- * Add directly to the freeFiles structure without reinitializing the file.
- * used on load() only
- */
- public void addFreeFileNoInit(final JournalFile file)
- {
- freeFiles.add(file);
- }
-
- /**
- * @param file
- * @throws Exception
- */
- public synchronized void addFreeFile(final JournalFile file, final boolean renameTmp) throws Exception
- {
- if (file.getFile().size() != fileSize)
- {
- FilesRepository.log.warn("Deleting " + file + ".. as it doesn't have the configured size",
- new Exception("trace"));
- file.getFile().delete();
- }
- else
- // FIXME - size() involves a scan!!!
- if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
- {
- // Re-initialise it
-
- if (FilesRepository.trace)
- {
- FilesRepository.trace("Adding free file " + file);
- }
-
- JournalFile jf = reinitializeFile(file);
-
- if (renameTmp)
- {
- jf.getFile().renameTo(JournalImpl.renameExtensionFile(jf.getFile().getFileName(), ".tmp"));
- }
-
- freeFiles.add(jf);
- }
- else
- {
- file.getFile().delete();
- }
- }
-
- public Collection<JournalFile> getFreeFiles()
- {
- return freeFiles;
- }
-
- public JournalFile getFreeFile()
- {
- return freeFiles.remove();
- }
-
- // Opened files operations =======================================
-
- public int getOpenedFilesCount()
- {
- return openedFiles.size();
- }
-
- public void drainClosedFiles()
- {
- JournalFile file;
- try
- {
- while ((file = pendingCloseFiles.poll()) != null)
- {
- file.getFile().close();
- }
- }
- catch (Exception e)
- {
- FilesRepository.log.warn(e.getMessage(), e);
- }
-
- }
-
- /**
- * <p>This method will instantly return the opened file, and schedule opening and reclaiming.</p>
- * <p>In case there are no cached opened files, this method will block until the file was opened,
- * what would happen only if the system is under heavy load by another system (like a backup system, or a DB sharing the same box as HornetQ).</p>
- * */
- public JournalFile openFile() throws InterruptedException
- {
- if (FilesRepository.trace)
- {
- FilesRepository.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
- }
-
- Runnable run = new Runnable()
- {
- public void run()
- {
- try
- {
- pushOpenedFile();
- }
- catch (Exception e)
- {
- FilesRepository.log.error(e.getMessage(), e);
- }
- }
- };
-
- if (filesExecutor == null)
- {
- run.run();
- }
- else
- {
- filesExecutor.execute(run);
- }
-
- JournalFile nextFile = null;
-
- while (nextFile == null)
- {
- nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
- if (nextFile == null)
- {
- FilesRepository.log.warn("Couldn't open a file in 60 Seconds",
- new Exception("Warning: Couldn't open a file in 60 Seconds"));
- }
- }
-
- if (FilesRepository.trace)
- {
- FilesRepository.trace("Returning file " + nextFile);
- }
-
- return nextFile;
- }
-
- /**
- *
- * Open a file and place it into the openedFiles queue
- * */
- public void pushOpenedFile() throws Exception
- {
- JournalFile nextOpenedFile = takeFile(true, true, true, false);
-
- if (FilesRepository.trace)
- {
- FilesRepository.trace("pushing openFile " + nextOpenedFile);
- }
-
- openedFiles.offer(nextOpenedFile);
- }
-
- public void closeFile(final JournalFile file)
- {
- fileFactory.deactivateBuffer();
- pendingCloseFiles.add(file);
- dataFiles.add(file);
-
- Runnable run = new Runnable()
- {
- public void run()
- {
- drainClosedFiles();
- }
- };
-
- if (filesExecutor == null)
- {
- run.run();
- }
- else
- {
- filesExecutor.execute(run);
- }
-
- }
-
- /**
- * This will get a File from freeFile without initializing it
- * @return
- * @throws Exception
- */
- public JournalFile takeFile(final boolean keepOpened,
- final boolean multiAIO,
- final boolean initFile,
- final boolean tmpCompactExtension) throws Exception
- {
- JournalFile nextOpenedFile = null;
-
- nextOpenedFile = freeFiles.poll();
-
- if (nextOpenedFile == null)
- {
- nextOpenedFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
- }
- else
- {
- if (tmpCompactExtension)
- {
- SequentialFile sequentialFile = nextOpenedFile.getFile();
- sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
- }
-
- if (keepOpened)
- {
- openFile(nextOpenedFile, multiAIO);
- }
- }
- return nextOpenedFile;
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- /**
- * This method will create a new file on the file system, pre-fill it with FILL_CHARACTER
- * @param keepOpened
- * @return
- * @throws Exception
- */
- private JournalFile createFile(final boolean keepOpened,
- final boolean multiAIO,
- final boolean init,
- final boolean tmpCompact) throws Exception
- {
- long fileID = generateFileID();
-
- String fileName;
-
- fileName = createFileName(tmpCompact, fileID);
-
- if (FilesRepository.trace)
- {
- FilesRepository.trace("Creating file " + fileName);
- }
-
- String tmpFileName = fileName + ".tmp";
-
- SequentialFile sequentialFile = fileFactory.createSequentialFile(tmpFileName, maxAIO);
-
- sequentialFile.open(1, false);
-
- if (init)
- {
- sequentialFile.fill(0, fileSize, JournalImpl.FILL_CHARACTER);
-
- JournalImpl.initFileHeader(fileFactory, sequentialFile, userVersion, fileID);
- }
-
- long position = sequentialFile.position();
-
- sequentialFile.close();
-
- if (FilesRepository.trace)
- {
- FilesRepository.trace("Renaming file " + tmpFileName + " as " + fileName);
- }
-
- sequentialFile.renameTo(fileName);
-
- if (keepOpened)
- {
- if (multiAIO)
- {
- sequentialFile.open();
- }
- else
- {
- sequentialFile.open(1, false);
- }
- sequentialFile.position(position);
- }
-
- return new JournalFileImpl(sequentialFile, fileID, JournalImpl.FORMAT_VERSION);
- }
-
- /**
- * @param tmpCompact
- * @param fileID
- * @return
- */
- private String createFileName(final boolean tmpCompact, final long fileID)
- {
- String fileName;
- if (tmpCompact)
- {
- fileName = filePrefix + "-" + fileID + "." + fileExtension + ".cmp";
- }
- else
- {
- fileName = filePrefix + "-" + fileID + "." + fileExtension;
- }
- return fileName;
- }
-
- private long generateFileID()
- {
- return nextFileID.incrementAndGet();
- }
-
- /** Get the ID part of the name */
- private long getFileNameID(final String fileName)
- {
- try
- {
- return Long.parseLong(fileName.substring(filePrefix.length() + 1, fileName.indexOf('.')));
- }
- catch (Throwable e)
- {
- FilesRepository.log.warn("Impossible to get the ID part of the file name " + fileName, e);
- return 0;
- }
- }
-
- // Discard the old JournalFile and set it with a new ID
- private JournalFile reinitializeFile(final JournalFile file) throws Exception
- {
- long newFileID = generateFileID();
-
- SequentialFile sf = file.getFile();
-
- sf.open(1, false);
-
- int position = JournalImpl.initFileHeader(fileFactory, sf, userVersion, newFileID);
-
- JournalFile jf = new JournalFileImpl(sf, newFileID, JournalImpl.FORMAT_VERSION);
-
- sf.position(position);
-
- sf.close();
-
- return jf;
- }
-
- // Inner classes -------------------------------------------------
-
-}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-08-27 05:34:10 UTC (rev 9602)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-08-27 15:56:15 UTC (rev 9603)
@@ -144,7 +144,7 @@
public JournalCompactor(final SequentialFileFactory fileFactory,
final JournalImpl journal,
- final FilesRepository filesRepository,
+ final JournalFilesRepository filesRepository,
final Set<Long> recordsSnapshot,
final long firstFileID)
{
Copied: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java (from rev 9602, branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/FilesRepository.java)
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java (rev 0)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java 2010-08-27 15:56:15 UTC (rev 9603)
@@ -0,0 +1,609 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.logging.Logger;
+
+/**
+ * This is a helper class for the Journal, which will control access to dataFiles, openedFiles and freeFiles
+ * Guaranteeing that they will be delivered in order to the Journal
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalFilesRepository
+{
+
+ private static final Logger log = Logger.getLogger(JournalFilesRepository.class);
+
+ private static final boolean trace = JournalFilesRepository.log.isTraceEnabled();
+
+ // This method exists just to make debug easier.
+ // I could replace log.trace by log.info temporarily while I was debugging
+ // Journal
+ private static final void trace(final String message)
+ {
+ JournalFilesRepository.log.trace(message);
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final SequentialFileFactory fileFactory;
+
+ private final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
+
+ private final BlockingQueue<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
+
+ private final ConcurrentLinkedQueue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
+
+ private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
+
+ private final AtomicLong nextFileID = new AtomicLong(0);
+
+ private final int maxAIO;
+
+ private final int minFiles;
+
+ private final int fileSize;
+
+ private final String filePrefix;
+
+ private final String fileExtension;
+
+ private final int userVersion;
+
+ private Executor filesExecutor;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public JournalFilesRepository(final SequentialFileFactory fileFactory,
+ final String filePrefix,
+ final String fileExtension,
+ final int userVersion,
+ final int maxAIO,
+ final int fileSize,
+ final int minFiles)
+ {
+ this.fileFactory = fileFactory;
+ this.maxAIO = maxAIO;
+ this.filePrefix = filePrefix;
+ this.fileExtension = fileExtension;
+ this.minFiles = minFiles;
+ this.fileSize = fileSize;
+ this.userVersion = userVersion;
+ }
+
+ // Public --------------------------------------------------------
+
+ public void setExecutor(final Executor executor)
+ {
+ filesExecutor = executor;
+ }
+
+ public void clear()
+ {
+ dataFiles.clear();
+
+ drainClosedFiles();
+
+ freeFiles.clear();
+
+ for (JournalFile file : openedFiles)
+ {
+ try
+ {
+ file.getFile().close();
+ }
+ catch (Exception e)
+ {
+ JournalFilesRepository.log.warn(e.getMessage(), e);
+ }
+ }
+ openedFiles.clear();
+ }
+
+ public int getMaxAIO()
+ {
+ return maxAIO;
+ }
+
+ public String getFileExtension()
+ {
+ return fileExtension;
+ }
+
+ public String getFilePrefix()
+ {
+ return filePrefix;
+ }
+
+ public void calculateNextfileID(final List<JournalFile> files)
+ {
+
+ for (JournalFile file : files)
+ {
+ long fileID = file.getFileID();
+ if (nextFileID.get() < fileID)
+ {
+ nextFileID.set(fileID);
+ }
+
+ long fileNameID = getFileNameID(file.getFile().getFileName());
+
+ // The compactor could create a fileName but use a previously assigned ID.
+ // Because of that we need to take both parts into account
+ if (nextFileID.get() < fileNameID)
+ {
+ nextFileID.set(fileNameID);
+ }
+ }
+
+ }
+
+ public void ensureMinFiles() throws Exception
+ {
+ // FIXME - size() involves a scan
+ int filesToCreate = minFiles - (dataFiles.size() + freeFiles.size());
+
+ if (filesToCreate > 0)
+ {
+ for (int i = 0; i < filesToCreate; i++)
+ {
+ // Keeping all files opened can be very costly (mainly on AIO)
+ freeFiles.add(createFile(false, false, true, false));
+ }
+ }
+
+ }
+
+ public void openFile(final JournalFile file, final boolean multiAIO) throws Exception
+ {
+ if (multiAIO)
+ {
+ file.getFile().open();
+ }
+ else
+ {
+ file.getFile().open(1, false);
+ }
+
+ file.getFile().position(file.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER));
+ }
+
+ // Data File Operations ==========================================
+
+ public JournalFile[] getDataFilesArray()
+ {
+ return dataFiles.toArray(new JournalFile[dataFiles.size()]);
+ }
+
+ public JournalFile pollLastDataFile()
+ {
+ /*java.util.Iterator<JournalFile> iter = dataFiles.iterator();
+
+ JournalFile currentFile = null;
+ while (iter.hasNext())
+ {
+ currentFile = iter.next();
+
+ if (!iter.hasNext())
+ {
+ iter.remove();
+ }
+ }
+
+ return currentFile; */
+
+ return dataFiles.pollLast();
+ }
+
+ public void removeDataFile(final JournalFile file)
+ {
+ if (!dataFiles.remove(file))
+ {
+ JournalFilesRepository.log.warn("Could not remove file " + file + " from the list of data files");
+ }
+ }
+
+ public int getDataFilesCount()
+ {
+ return dataFiles.size();
+ }
+
+ public Collection<JournalFile> getDataFiles()
+ {
+ return dataFiles;
+ }
+
+ public void clearDataFiles()
+ {
+ dataFiles.clear();
+ }
+
+ public void addDataFileOnTop(final JournalFile file)
+ {
+ dataFiles.addFirst(file);
+ }
+
+ public void addDataFileOnBottom(final JournalFile file)
+ {
+ dataFiles.add(file);
+ }
+
+ // Free File Operations ==========================================
+
+ public int getFreeFilesCount()
+ {
+ return freeFiles.size();
+ }
+
+ /**
+ * Add directly to the freeFiles structure without reinitializing the file.
+ * used on load() only
+ */
+ public void addFreeFileNoInit(final JournalFile file)
+ {
+ freeFiles.add(file);
+ }
+
+ /**
+ * Recycles or deletes a file that is no longer needed. The file is deleted when its size differs from fileSize,
+ * or when the free, data and opened files (counting this one) already reach minFiles; otherwise its header is
+ * rewritten with a new file ID by reinitializeFile and it is added to freeFiles.
+ * @param renameTmp when true, the recycled file is renamed via JournalImpl.renameExtensionFile(name, ".tmp")
+ * @throws Exception propagated from the underlying file operations
+ */
+ public synchronized void addFreeFile(final JournalFile file, final boolean renameTmp) throws Exception
+ {
+ if (file.getFile().size() != fileSize)
+ {
+ JournalFilesRepository.log.warn("Deleting " + file + ".. as it doesn't have the configured size",
+ new Exception("trace"));
+ file.getFile().delete();
+ }
+ else
+ // FIXME - size() involves a scan!!!
+ if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
+ {
+ // Re-initialise it
+ if (JournalFilesRepository.trace)
+ {
+ JournalFilesRepository.trace("Adding free file " + file);
+ }
+
+ JournalFile jf = reinitializeFile(file);
+ if (renameTmp)
+ {
+ jf.getFile().renameTo(JournalImpl.renameExtensionFile(jf.getFile().getFileName(), ".tmp"));
+ }
+ freeFiles.add(jf);
+ }
+ else
+ {
+ file.getFile().delete();
+ }
+ }
+
+ public Collection<JournalFile> getFreeFiles()
+ {
+ return freeFiles;
+ }
+
+ public JournalFile getFreeFile()
+ {
+ return freeFiles.remove();
+ }
+
+ // Opened files operations =======================================
+
+ public int getOpenedFilesCount()
+ {
+ return openedFiles.size();
+ }
+
+ /** Closes every file queued in pendingCloseFiles; a failed close is logged and the drain carries on. */
+ public void drainClosedFiles()
+ {
+    JournalFile file;
+    while ((file = pendingCloseFiles.poll()) != null)
+    {
+       try
+       {
+          file.getFile().close();
+       }
+       catch (Exception e)
+       {
+          JournalFilesRepository.log.warn(e.getMessage(), e);
+       }
+    }
+ }
+
+ /**
+ * Schedules the opening of one more journal file and returns the next file available in openedFiles.
+ * <p>The open request runs on filesExecutor, or inline when no executor was set. The caller then blocks
+ * until a file is available, logging a warning with the accumulated wait after every poll timeout.</p>
+ * @return the next opened file, never null
+ * @throws InterruptedException if the calling thread is interrupted while waiting
+ */
+ public JournalFile openFile() throws InterruptedException
+ {
+    if (JournalFilesRepository.trace)
+    {
+       JournalFilesRepository.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
+    }
+    Runnable run = new Runnable()
+    {
+       public void run()
+       {
+          try
+          {
+             pushOpenedFile();
+          }
+          catch (Exception e)
+          {
+             JournalFilesRepository.log.error(e.getMessage(), e);
+          }
+       }
+    };
+    if (filesExecutor == null)
+    {
+       run.run();
+    }
+    else
+    {
+       filesExecutor.execute(run);
+    }
+    // Poll in short slices so the warning reports the time really spent waiting, not a fixed figure
+    final int pollSeconds = 5;
+    int waitedSeconds = 0;
+    JournalFile nextFile = null;
+    while (nextFile == null)
+    {
+       nextFile = openedFiles.poll(pollSeconds, TimeUnit.SECONDS);
+       if (nextFile == null)
+       {
+          waitedSeconds += pollSeconds;
+          String message = "Couldn't open a file in " + waitedSeconds + " Seconds";
+          JournalFilesRepository.log.warn(message, new Exception("Warning: " + message));
+       }
+    }
+    if (JournalFilesRepository.trace)
+    {
+       JournalFilesRepository.trace("Returning file " + nextFile);
+    }
+    return nextFile;
+ }
+
+ /**
+ *
+ * Open a file and place it into the openedFiles queue
+ * */
+ public void pushOpenedFile() throws Exception
+ {
+ JournalFile nextOpenedFile = takeFile(true, true, true, false);
+
+ if (JournalFilesRepository.trace)
+ {
+ JournalFilesRepository.trace("pushing openFile " + nextOpenedFile);
+ }
+
+ openedFiles.offer(nextOpenedFile);
+ }
+
+ public void closeFile(final JournalFile file)
+ {
+ fileFactory.deactivateBuffer();
+ pendingCloseFiles.add(file);
+ dataFiles.add(file);
+
+ Runnable run = new Runnable()
+ {
+ public void run()
+ {
+ drainClosedFiles();
+ }
+ };
+
+ if (filesExecutor == null)
+ {
+ run.run();
+ }
+ else
+ {
+ filesExecutor.execute(run);
+ }
+
+ }
+
+ /**
+ * Takes the next file from freeFiles, creating a new one through createFile when none is available.
+ * A reused file is renamed with a ".cmp" suffix when tmpCompactExtension is set and opened when
+ * keepOpened is set; initFile is only passed on to createFile, so a reused file is not re-filled here.
+ * @return the reused or newly created file
+ * @throws Exception propagated from creating, renaming or opening the file
+ */
+ public JournalFile takeFile(final boolean keepOpened,
+ final boolean multiAIO,
+ final boolean initFile,
+ final boolean tmpCompactExtension) throws Exception
+ {
+ JournalFile nextOpenedFile = null;
+ nextOpenedFile = freeFiles.poll();
+
+ if (nextOpenedFile == null)
+ {
+ nextOpenedFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
+ }
+ else
+ {
+ if (tmpCompactExtension)
+ {
+ SequentialFile sequentialFile = nextOpenedFile.getFile();
+ sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
+ }
+ if (keepOpened)
+ {
+ openFile(nextOpenedFile, multiAIO);
+ }
+ }
+ return nextOpenedFile;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ /**
+ * This method will create a new file on the file system, pre-fill it with FILL_CHARACTER
+ * @param keepOpened
+ * @return
+ * @throws Exception
+ */
+ private JournalFile createFile(final boolean keepOpened,
+ final boolean multiAIO,
+ final boolean init,
+ final boolean tmpCompact) throws Exception
+ {
+ long fileID = generateFileID();
+
+ String fileName;
+
+ fileName = createFileName(tmpCompact, fileID);
+
+ if (JournalFilesRepository.trace)
+ {
+ JournalFilesRepository.trace("Creating file " + fileName);
+ }
+
+ String tmpFileName = fileName + ".tmp";
+
+ SequentialFile sequentialFile = fileFactory.createSequentialFile(tmpFileName, maxAIO);
+
+ sequentialFile.open(1, false);
+
+ if (init)
+ {
+ sequentialFile.fill(0, fileSize, JournalImpl.FILL_CHARACTER);
+
+ JournalImpl.initFileHeader(fileFactory, sequentialFile, userVersion, fileID);
+ }
+
+ long position = sequentialFile.position();
+
+ sequentialFile.close();
+
+ if (JournalFilesRepository.trace)
+ {
+ JournalFilesRepository.trace("Renaming file " + tmpFileName + " as " + fileName);
+ }
+
+ sequentialFile.renameTo(fileName);
+
+ if (keepOpened)
+ {
+ if (multiAIO)
+ {
+ sequentialFile.open();
+ }
+ else
+ {
+ sequentialFile.open(1, false);
+ }
+ sequentialFile.position(position);
+ }
+
+ return new JournalFileImpl(sequentialFile, fileID, JournalImpl.FORMAT_VERSION);
+ }
+
+ /**
+ * @param tmpCompact
+ * @param fileID
+ * @return
+ */
+ private String createFileName(final boolean tmpCompact, final long fileID)
+ {
+ String fileName;
+ if (tmpCompact)
+ {
+ fileName = filePrefix + "-" + fileID + "." + fileExtension + ".cmp";
+ }
+ else
+ {
+ fileName = filePrefix + "-" + fileID + "." + fileExtension;
+ }
+ return fileName;
+ }
+
+ private long generateFileID()
+ {
+ return nextFileID.incrementAndGet();
+ }
+
+ /** Parses the ID from "prefix-ID.ext" (the first '.' after the prefix ends the ID); warns and returns 0 on failure. */
+ private long getFileNameID(final String fileName)
+ {
+    try
+    {
+       return Long.parseLong(fileName.substring(filePrefix.length() + 1, fileName.indexOf('.', filePrefix.length() + 1)));
+    }
+    catch (RuntimeException e)
+    {
+       JournalFilesRepository.log.warn("Impossible to get the ID part of the file name " + fileName, e);
+       return 0;
+    }
+ }
+
+ // Discard the old JournalFile and set it with a new ID
+ private JournalFile reinitializeFile(final JournalFile file) throws Exception
+ {
+ long newFileID = generateFileID();
+
+ SequentialFile sf = file.getFile();
+
+ sf.open(1, false);
+
+ int position = JournalImpl.initFileHeader(fileFactory, sf, userVersion, newFileID);
+
+ JournalFile jf = new JournalFileImpl(sf, newFileID, JournalImpl.FORMAT_VERSION);
+
+ sf.position(position);
+
+ sf.close();
+
+ return jf;
+ }
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-27 05:34:10 UTC (rev 9602)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-27 15:56:15 UTC (rev 9603)
@@ -63,7 +63,6 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.DataConstants;
-
/**
*
* <p>A circular log implementation.</p
@@ -86,15 +85,15 @@
private static final int STATE_LOADED = 2;
public static final int FORMAT_VERSION = 2;
-
- private static final int COMPATIBLE_VERSIONS[] = new int[] {1};
+ private static final int COMPATIBLE_VERSIONS[] = new int[] { 1 };
+
// Static --------------------------------------------------------
private static final Logger log = Logger.getLogger(JournalImpl.class);
- private static final boolean trace = log.isTraceEnabled();
-
+ private static final boolean trace = JournalImpl.log.isTraceEnabled();
+
// This is useful at debug time...
// if you set it to true, all the appends, deletes, rollbacks, commits, etc.. are sent to System.out
private static final boolean TRACE_RECORDS = false;
@@ -104,9 +103,9 @@
// Journal
private static final void trace(final String message)
{
- log.trace(message);
+ JournalImpl.log.trace(message);
}
-
+
private static final void traceRecord(final String message)
{
System.out.println(message);
@@ -183,9 +182,8 @@
private final SequentialFileFactory fileFactory;
+ private final JournalFilesRepository filesRepository;
- private final FilesRepository filesRepository;
-
// Compacting may replace this structure
private final ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<Long, JournalRecord>();
@@ -291,9 +289,15 @@
this.minFiles = minFiles;
this.fileFactory = fileFactory;
-
- filesRepository = new FilesRepository(fileFactory, filePrefix, fileExtension, userVersion, maxAIO, fileSize, minFiles);
+ filesRepository = new JournalFilesRepository(fileFactory,
+ filePrefix,
+ fileExtension,
+ userVersion,
+ maxAIO,
+ fileSize,
+ minFiles);
+
this.userVersion = userVersion;
}
@@ -389,7 +393,7 @@
try
{
-
+
JournalFileImpl jrnFile = readFileHeader(file);
orderedFiles.add(jrnFile);
@@ -469,20 +473,20 @@
}
short compactCount = 0;
-
+
if (file.getJournalVersion() >= 2)
{
if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_BYTE))
{
reader.markAsDataFile(file);
-
+
wholeFileBuffer.position(pos + 1);
continue;
}
-
+
compactCount = wholeFileBuffer.get();
}
-
+
long transactionID = 0;
if (JournalImpl.isTransaction(recordType))
@@ -678,19 +682,31 @@
case ADD_RECORD_TX:
{
- reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false, compactCount));
+ reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID,
+ userRecordType,
+ record,
+ false,
+ compactCount));
break;
}
case UPDATE_RECORD_TX:
{
- reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true, compactCount));
+ reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID,
+ userRecordType,
+ record,
+ true,
+ compactCount));
break;
}
case DELETE_RECORD_TX:
{
- reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true, compactCount));
+ reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID,
+ (byte)0,
+ record,
+ true,
+ compactCount));
break;
}
@@ -744,7 +760,7 @@
}
catch (Throwable e)
{
- log.warn(e.getMessage(), e);
+ JournalImpl.log.warn(e.getMessage(), e);
throw new Exception(e.getMessage(), e);
}
finally
@@ -820,7 +836,10 @@
{
JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
- if (TRACE_RECORDS) traceRecord("appendAddRecord::id=" + id + ", usedFile = " + usedFile);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendAddRecord::id=" + id + ", usedFile = " + usedFile);
+ }
records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
}
@@ -898,7 +917,10 @@
{
JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
- if (TRACE_RECORDS) traceRecord("appendUpdateRecord::id=" + id + ", usedFile = " + usedFile);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendUpdateRecord::id=" + id + ", usedFile = " + usedFile);
+ }
// record== null here could only mean there is a compactor, and computing the delete should be done after
// compacting is done
@@ -976,7 +998,10 @@
{
JournalFile usedFile = appendRecord(deleteRecord, false, sync, null, callback);
- if (TRACE_RECORDS) traceRecord("appendDeleteRecord::id=" + id + ", usedFile = " + usedFile);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendDeleteRecord::id=" + id + ", usedFile = " + usedFile);
+ }
// record== null here could only mean there is a compactor, and computing the delete should be done after
// compacting is done
@@ -1030,7 +1055,14 @@
{
JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
- if (TRACE_RECORDS) traceRecord("appendAddRecordTransactional:txID=" + txID + ",id=" + id + ", usedFile = " + usedFile);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendAddRecordTransactional:txID=" + txID +
+ ",id=" +
+ id +
+ ", usedFile = " +
+ usedFile);
+ }
tx.addPositive(usedFile, id, addRecord.getEncodeSize());
}
@@ -1076,7 +1108,14 @@
{
JournalFile usedFile = appendRecord(updateRecordTX, false, false, tx, null);
- if (TRACE_RECORDS) traceRecord("appendUpdateRecordTransactional::txID=" + txID + ",id="+id + ", usedFile = " + usedFile);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendUpdateRecordTransactional::txID=" + txID +
+ ",id=" +
+ id +
+ ", usedFile = " +
+ usedFile);
+ }
tx.addPositive(usedFile, id, updateRecordTX.getEncodeSize());
}
@@ -1116,7 +1155,14 @@
{
JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null);
- if (TRACE_RECORDS) traceRecord("appendDeleteRecordTransactional::txID=" + txID + ", id=" + id + ", usedFile = " + usedFile);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendDeleteRecordTransactional::txID=" + txID +
+ ", id=" +
+ id +
+ ", usedFile = " +
+ usedFile);
+ }
tx.addNegative(usedFile, id);
}
@@ -1206,9 +1252,12 @@
try
{
JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback);
-
- if (TRACE_RECORDS) traceRecord("appendPrepareRecord::txID=" + txID + ", usedFile = " + usedFile);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendPrepareRecord::txID=" + txID + ", usedFile = " + usedFile);
+ }
+
tx.prepare(usedFile);
}
finally
@@ -1226,7 +1275,7 @@
public void appendCommitRecord(final long txID, final boolean sync) throws Exception
{
SyncIOCompletion syncCompletion = getSyncCallback(sync);
-
+
appendCommitRecord(txID, sync, syncCompletion);
if (syncCompletion != null)
@@ -1281,9 +1330,12 @@
try
{
JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
-
- if (TRACE_RECORDS) traceRecord("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile);
+ }
+
tx.commit(usedFile);
}
finally
@@ -1413,9 +1465,9 @@
private void checkDeleteSize()
{
// HORNETQ-482 - Flush deletes only if memory is critical
- if (recordsToDelete.size() > DELETE_FLUSH && (runtime.freeMemory() < (runtime.maxMemory() * 0.2)))
+ if (recordsToDelete.size() > DELETE_FLUSH && runtime.freeMemory() < runtime.maxMemory() * 0.2)
{
- log.debug("Flushing deletes during loading, deleteCount = " + recordsToDelete.size());
+ JournalImpl.log.debug("Flushing deletes during loading, deleteCount = " + recordsToDelete.size());
// Clean up when the list is too large, or it won't be possible to load large sets of files
// Done as part of JBMESSAGING-1678
Iterator<RecordInfo> iter = records.iterator();
@@ -1431,7 +1483,7 @@
recordsToDelete.clear();
- log.debug("flush delete done");
+ JournalImpl.log.debug("flush delete done");
}
}
@@ -1500,13 +1552,16 @@
try
{
- if (trace)
+ if (JournalImpl.trace)
{
JournalImpl.trace("Starting compacting operation on journal");
}
-
- if (TRACE_RECORDS) traceRecord("Starting compacting operation on journal");
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("Starting compacting operation on journal");
+ }
+
onCompactStart();
// We need to guarantee that the journal is frozen for this short time
@@ -1527,7 +1582,7 @@
moveNextFile(false);
filesRepository.drainClosedFiles();
-
+
// Take the snapshots and replace the structures
dataFilesToProcess.addAll(filesRepository.getDataFiles());
@@ -1539,7 +1594,11 @@
return;
}
- compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keySet(), dataFilesToProcess.get(0).getFileID());
+ compactor = new JournalCompactor(fileFactory,
+ this,
+ filesRepository,
+ records.keySet(),
+ dataFilesToProcess.get(0).getFileID());
for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
{
@@ -1571,7 +1630,7 @@
}
catch (Throwable e)
{
- log.warn("Error on reading compacting for " + file);
+ JournalImpl.log.warn("Error on reading compacting for " + file);
throw new Exception("Error on reading compacting for " + file, e);
}
}
@@ -1616,7 +1675,7 @@
filesRepository.addDataFileOnTop(fileToAdd);
}
- if (trace)
+ if (JournalImpl.trace)
{
JournalImpl.trace("There are " + filesRepository.getDataFilesCount() + " datafiles Now");
}
@@ -1647,7 +1706,7 @@
}
else
{
- log.warn("Couldn't find tx=" + newTransaction.getId() + " to merge after compacting");
+ JournalImpl.log.warn("Couldn't find tx=" + newTransaction.getId() + " to merge after compacting");
}
}
}
@@ -1660,13 +1719,16 @@
renameFiles(dataFilesToProcess, newDatafiles);
deleteControlFile(controlFile);
- if (trace)
+ if (JournalImpl.trace)
{
JournalImpl.log.debug("Finished compacting on journal");
}
-
- if (TRACE_RECORDS) traceRecord("Finished compacting on journal");
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("Finished compacting on journal");
+ }
+
}
finally
{
@@ -1794,7 +1856,8 @@
// have been deleted
// just leaving some updates in this file
- posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1); // +1 = compact count
+ posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1); // +1 = compact
+ // count
}
}
@@ -1844,7 +1907,8 @@
transactions.put(transactionID, tnp);
}
- tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1); // +1 = compact count
+ tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1); // +1 = compact
+ // count
}
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception
@@ -2021,7 +2085,7 @@
filesRepository.ensureMinFiles();
// The current file is the last one that has data
-
+
currentFile = filesRepository.pollLastDataFile();
if (currentFile != null)
@@ -2118,7 +2182,7 @@
{
JournalImpl.trace("Reclaiming file " + file);
}
-
+
filesRepository.removeDataFile(file);
filesRepository.addFreeFile(file, false);
@@ -2147,9 +2211,9 @@
long totalBytes = (long)dataFiles.length * (long)fileSize;
long compactMargin = (long)(totalBytes * compactPercentage);
-
- boolean needCompact = (totalLiveSize < compactMargin && dataFiles.length > compactMinFiles);
+ boolean needCompact = totalLiveSize < compactMargin && dataFiles.length > compactMinFiles;
+
return needCompact;
}
@@ -2392,7 +2456,7 @@
filesExecutor = Executors.newSingleThreadExecutor(new ThreadFactory()
{
- public Thread newThread(Runnable r)
+ public Thread newThread(final Runnable r)
{
return new Thread(r, "JournalImpl::FilesExecutor");
}
@@ -2401,12 +2465,12 @@
compactorExecutor = Executors.newSingleThreadExecutor(new ThreadFactory()
{
- public Thread newThread(Runnable r)
+ public Thread newThread(final Runnable r)
{
return new Thread(r, "JournalImpl::CompactorExecutor");
}
});
-
+
filesRepository.setExecutor(filesExecutor);
fileFactory.start();
@@ -2523,7 +2587,7 @@
}
catch (Throwable e)
{
- log.warn("Error reinitializing file " + file, e);
+ JournalImpl.log.warn("Error reinitializing file " + file, e);
}
}
done.countDown();
@@ -2537,7 +2601,7 @@
for (JournalFile file : newFiles)
{
- String newName = renameExtensionFile(file.getFile().getFileName(), ".cmp");
+ String newName = JournalImpl.renameExtensionFile(file.getFile().getFileName(), ".cmp");
file.getFile().renameTo(newName);
}
@@ -2547,7 +2611,7 @@
* @param name
* @return
*/
- protected static String renameExtensionFile(String name, String extension)
+ protected static String renameExtensionFile(String name, final String extension)
{
name = name.substring(0, name.lastIndexOf(extension));
return name;
@@ -2663,29 +2727,30 @@
* @return
* @throws Exception
*/
- private JournalFileImpl readFileHeader(SequentialFile file) throws Exception
+ private JournalFileImpl readFileHeader(final SequentialFile file) throws Exception
{
ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
file.read(bb);
int journalVersion = bb.getInt();
-
- if (journalVersion != FORMAT_VERSION)
+
+ if (journalVersion != JournalImpl.FORMAT_VERSION)
{
boolean isCompatible = false;
-
- for (int v : COMPATIBLE_VERSIONS)
+
+ for (int v : JournalImpl.COMPATIBLE_VERSIONS)
{
if (v == journalVersion)
{
isCompatible = true;
}
}
-
+
if (!isCompatible)
{
- throw new HornetQException(HornetQException.IO_ERROR, "Journal files version mismatch. You should export the data from the previous version and import it as explained on the user's manual");
+ throw new HornetQException(HornetQException.IO_ERROR,
+ "Journal files version mismatch. You should export the data from the previous version and import it as explained on the user's manual");
}
}
@@ -2701,7 +2766,7 @@
fileFactory.releaseBuffer(bb);
bb = null;
-
+
return new JournalFileImpl(file, fileID, journalVersion);
}
@@ -2720,7 +2785,7 @@
HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bb);
- writeHeader(buffer, userVersion, fileID);
+ JournalImpl.writeHeader(buffer, userVersion, fileID);
bb.rewind();
@@ -2738,9 +2803,9 @@
* @param userVersion
* @param fileID
*/
- public static void writeHeader(HornetQBuffer buffer, final int userVersion, final long fileID)
+ public static void writeHeader(final HornetQBuffer buffer, final int userVersion, final long fileID)
{
- buffer.writeInt(FORMAT_VERSION);
+ buffer.writeInt(JournalImpl.FORMAT_VERSION);
buffer.writeInt(userVersion);
@@ -2788,7 +2853,7 @@
{
throw new NullPointerException("Current file = null");
}
-
+
if (tx != null)
{
// The callback of a transaction has to be taken inside the lock,
@@ -2850,7 +2915,7 @@
filesRepository.closeFile(currentFile);
currentFile = filesRepository.openFile();
-
+
if (scheduleReclaim)
{
scheduleReclaim();
@@ -2894,7 +2959,6 @@
}
}
-
private JournalTransaction getTransactionInfo(final long txID)
{
JournalTransaction tx = transactions.get(txID);
More information about the hornetq-commits
mailing list