[jboss-cvs] JBoss Messaging SVN: r4731 - in trunk/src/main/org/jboss/messaging/core/journal: impl and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jul 25 13:12:33 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-07-25 13:12:33 -0400 (Fri, 25 Jul 2008)
New Revision: 4731
Modified:
trunk/src/main/org/jboss/messaging/core/journal/Journal.java
trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java
trunk/src/main/org/jboss/messaging/core/journal/RecordInfo.java
trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java
trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java
Log:
Replacing tabs by spaces
Modified: trunk/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2008-07-24 21:57:44 UTC (rev 4730)
+++ trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2008-07-25 17:12:33 UTC (rev 4731)
@@ -36,45 +36,45 @@
*/
public interface Journal extends MessagingComponent
{
- // Non transactional operations
-
+ // Non transactional operations
+
void appendAddRecord(long id, byte recordType, EncodingSupport record) throws Exception;
-
- void appendAddRecord(long id, byte recordType, byte[] record) throws Exception;
-
+
+ void appendAddRecord(long id, byte recordType, byte[] record) throws Exception;
+
void appendUpdateRecord(long id, byte recordType, byte[] record) throws Exception;
void appendUpdateRecord(long id, byte recordType, EncodingSupport record) throws Exception;
- void appendDeleteRecord(long id) throws Exception;
-
- // Transactional operations
-
- long getTransactionID();
-
+ void appendDeleteRecord(long id) throws Exception;
+
+ // Transactional operations
+
+ long getTransactionID();
+
void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception;
- void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
-
+ void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
+
void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception;
- void appendDeleteRecordTransactional(long txID, long id) throws Exception;
-
- void appendCommitRecord(long txID) throws Exception;
-
- void appendPrepareRecord(long txID) throws Exception;
-
- void appendRollbackRecord(long txID) throws Exception;
-
- // Load
-
+ void appendDeleteRecordTransactional(long txID, long id) throws Exception;
+
+ void appendCommitRecord(long txID) throws Exception;
+
+ void appendPrepareRecord(long txID) throws Exception;
+
+ void appendRollbackRecord(long txID) throws Exception;
+
+ // Load
+
long load(List<RecordInfo> committedRecords,
List<PreparedTransactionInfo> preparedTransactions) throws Exception;
-
+
long load(LoadManager reloadManager) throws Exception;
-
- int getAlignment() throws Exception;
-
+
+ int getAlignment() throws Exception;
+
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java 2008-07-24 21:57:44 UTC (rev 4730)
+++ trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java 2008-07-25 17:12:33 UTC (rev 4731)
@@ -37,14 +37,14 @@
*/
public class PreparedTransactionInfo
{
- public final long id;
-
- public final List<RecordInfo> records = new ArrayList<RecordInfo>();
-
- public final Set<Long> recordsToDelete = new HashSet<Long>();
-
- public PreparedTransactionInfo(final long id)
- {
- this.id = id;
- }
+ public final long id;
+
+ public final List<RecordInfo> records = new ArrayList<RecordInfo>();
+
+ public final Set<Long> recordsToDelete = new HashSet<Long>();
+
+ public PreparedTransactionInfo(final long id)
+ {
+ this.id = id;
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/RecordInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/RecordInfo.java 2008-07-24 21:57:44 UTC (rev 4730)
+++ trunk/src/main/org/jboss/messaging/core/journal/RecordInfo.java 2008-07-25 17:12:33 UTC (rev 4731)
@@ -33,40 +33,40 @@
*/
public class RecordInfo
{
- public RecordInfo(final long id, byte userRecordType, final byte[] data, final boolean isUpdate)
- {
- this.id = id;
-
- this.userRecordType = userRecordType;
-
- this.data = data;
-
- this.isUpdate = isUpdate;
- }
-
- public final long id;
-
- public final byte userRecordType;
-
- public final byte[] data;
-
- public boolean isUpdate;
-
- public byte getUserRecordType()
- {
- return userRecordType;
- }
-
- public int hashCode()
- {
- return (int)((id >>> 32) ^ id);
- }
-
- public boolean equals(Object other)
- {
- RecordInfo r = (RecordInfo)other;
-
- return r.id == this.id;
- }
-
+ public RecordInfo(final long id, byte userRecordType, final byte[] data, final boolean isUpdate)
+ {
+ this.id = id;
+
+ this.userRecordType = userRecordType;
+
+ this.data = data;
+
+ this.isUpdate = isUpdate;
+ }
+
+ public final long id;
+
+ public final byte userRecordType;
+
+ public final byte[] data;
+
+ public boolean isUpdate;
+
+ public byte getUserRecordType()
+ {
+ return userRecordType;
+ }
+
+ public int hashCode()
+ {
+ return (int)((id >>> 32) ^ id);
+ }
+
+ public boolean equals(Object other)
+ {
+ RecordInfo r = (RecordInfo)other;
+
+ return r.id == this.id;
+ }
+
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java 2008-07-24 21:57:44 UTC (rev 4730)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java 2008-07-25 17:12:33 UTC (rev 4731)
@@ -35,17 +35,17 @@
*/
public interface SequentialFileFactory
{
- SequentialFile createSequentialFile(String fileName, int maxIO) throws Exception;
-
- List<String> listFiles(String extension) throws Exception;
-
- boolean isSupportsCallbacks();
-
+ SequentialFile createSequentialFile(String fileName, int maxIO) throws Exception;
+
+ List<String> listFiles(String extension) throws Exception;
+
+ boolean isSupportsCallbacks();
+
ByteBuffer newBuffer(int size);
-
+
// Avoid using this method in production as it creates an unecessary copy
ByteBuffer wrapBuffer(byte[] bytes);
int getAlignment();
-
+
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java 2008-07-24 21:57:44 UTC (rev 4730)
+++ trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java 2008-07-25 17:12:33 UTC (rev 4731)
@@ -32,20 +32,20 @@
*/
public interface TestableJournal extends Journal
{
- void checkAndReclaimFiles() throws Exception;
-
- int getDataFilesCount();
-
- int getFreeFilesCount();
-
- int getOpenedFilesCount();
-
- int getIDMapSize();
-
+ void checkAndReclaimFiles() throws Exception;
+
+ int getDataFilesCount();
+
+ int getFreeFilesCount();
+
+ int getOpenedFilesCount();
+
+ int getIDMapSize();
+
String debug() throws Exception;
-
+
void debugWait() throws Exception;
-
+
int getFileSize();
int getMinFiles();
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-07-24 21:57:44 UTC (rev 4730)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-07-25 17:12:33 UTC (rev 4731)
@@ -47,198 +47,198 @@
public class AIOSequentialFile implements SequentialFile
{
private static final Logger log = Logger.getLogger(AIOSequentialFile.class);
-
+
private final String journalDir;
- private final String fileName;
-
- private boolean opened = false;
-
- private final int maxIO;
-
+ private final String fileName;
+
+ private boolean opened = false;
+
+ private final int maxIO;
+
private AsynchronousFile aioFile;
-
- private AtomicLong position = new AtomicLong(0);
-
- // A context switch on AIO would make it to synchronize the disk before switching to the new thread, what would cause
- // serious performance problems. Because of that we make all the writes on AIO using a single thread.
- private ExecutorService executor;
-
- public AIOSequentialFile(final String journalDir, final String fileName, final int maxIO) throws Exception
- {
- this.journalDir = journalDir;
- this.fileName = fileName;
- this.maxIO = maxIO;
- }
-
- public int getAlignment() throws Exception
- {
- checkOpened();
-
- return aioFile.getBlockSize();
- }
-
- public int calculateBlockStart(int position) throws Exception
- {
- int alignment = getAlignment();
-
- int pos = ((position / alignment) + (position % alignment != 0 ? 1 : 0)) * alignment;
-
- return pos;
- }
-
- public synchronized void close() throws Exception
- {
- checkOpened();
- opened = false;
+
+ private AtomicLong position = new AtomicLong(0);
+
+ // A context switch on AIO would make it to synchronize the disk before switching to the new thread, what would cause
+ // serious performance problems. Because of that we make all the writes on AIO using a single thread.
+ private ExecutorService executor;
+
+ public AIOSequentialFile(final String journalDir, final String fileName, final int maxIO) throws Exception
+ {
+ this.journalDir = journalDir;
+ this.fileName = fileName;
+ this.maxIO = maxIO;
+ }
+
+ public int getAlignment() throws Exception
+ {
+ checkOpened();
+
+ return aioFile.getBlockSize();
+ }
+
+ public int calculateBlockStart(int position) throws Exception
+ {
+ int alignment = getAlignment();
+
+ int pos = ((position / alignment) + (position % alignment != 0 ? 1 : 0)) * alignment;
+
+ return pos;
+ }
+
+ public synchronized void close() throws Exception
+ {
+ checkOpened();
+ opened = false;
executor.shutdown();
while (!executor.awaitTermination(60, TimeUnit.SECONDS))
{
log.warn("Executor on file " + this.fileName + " couldn't complete its tasks in 60 seconds.",
- new Exception ("Warning: Executor on file " + this.fileName + " couldn't complete its tasks in 60 seconds.") );
+ new Exception ("Warning: Executor on file " + this.fileName + " couldn't complete its tasks in 60 seconds.") );
}
- aioFile.close();
- aioFile = null;
- }
-
- public void delete() throws Exception
- {
- if (aioFile != null)
- {
- aioFile.close();
- aioFile = null;
- }
-
- File file = new File(journalDir + "/" + fileName);
- file.delete();
- }
-
- public void fill(int position, final int size, final byte fillCharacter) throws Exception
- {
- checkOpened();
-
- int blockSize = aioFile.getBlockSize();
-
+ aioFile.close();
+ aioFile = null;
+ }
+
+ public void delete() throws Exception
+ {
+ if (aioFile != null)
+ {
+ aioFile.close();
+ aioFile = null;
+ }
+
+ File file = new File(journalDir + "/" + fileName);
+ file.delete();
+ }
+
+ public void fill(int position, final int size, final byte fillCharacter) throws Exception
+ {
+ checkOpened();
+
+ int blockSize = aioFile.getBlockSize();
+
if (size % (100*1024*1024) == 0)
{
blockSize = 100*1024*1024;
}
- if (size % (10*1024*1024) == 0)
+ else if (size % (10*1024*1024) == 0)
{
blockSize = 10*1024*1024;
}
- else if (size % (1024*1024) == 0)
- {
- blockSize = 1024*1024;
- }
- else if (size % (10*1024) == 0)
- {
- blockSize = 10*1024;
- }
- else
- {
- blockSize = aioFile.getBlockSize();
- }
-
- int blocks = size / blockSize;
-
- if (size % blockSize != 0)
- {
- blocks++;
- }
-
- if (position % aioFile.getBlockSize() != 0)
- {
- position = ((position / aioFile.getBlockSize()) + 1) * aioFile.getBlockSize();
- }
-
- aioFile.fill((long)position, blocks, blockSize, (byte)fillCharacter);
- }
-
- public String getFileName()
- {
- return fileName;
- }
-
- public synchronized void open() throws Exception
- {
- opened = true;
- executor = Executors.newSingleThreadExecutor();
- aioFile = new AsynchronousFileImpl();
- aioFile.open(journalDir + "/" + fileName, maxIO);
- position.set(0);
-
- }
-
- public void position(final int pos) throws Exception
- {
- position.set(pos);
- }
-
- public int read(final ByteBuffer bytes, final IOCallback callback) throws Exception
- {
- int bytesToRead = bytes.limit();
-
- long positionToRead = position.getAndAdd(bytesToRead);
-
- bytes.rewind();
-
- aioFile.read(positionToRead, bytesToRead, bytes, callback);
-
- return bytesToRead;
- }
-
- public int read(final ByteBuffer bytes) throws Exception
- {
- WaitCompletion waitCompletion = new WaitCompletion();
-
- int bytesRead = read (bytes, waitCompletion);
-
- waitCompletion.waitLatch();
-
- return bytesRead;
- }
-
-
- public int write(final ByteBuffer bytes, final IOCallback callback) throws Exception
- {
- final int bytesToWrite = bytes.limit();
-
- final long positionToWrite = position.getAndAdd(bytesToWrite);
-
- execWrite(bytes, callback, bytesToWrite, positionToWrite);
-
- return bytesToWrite;
- }
-
- public int write(final ByteBuffer bytes, final boolean sync) throws Exception
- {
- if (sync)
- {
- WaitCompletion completion = new WaitCompletion();
-
- int bytesWritten = write(bytes, completion);
-
- completion.waitLatch();
-
- return bytesWritten;
- }
- else
- {
- return write (bytes, DummyCallback.instance);
- }
- }
-
- public String toString()
- {
- return "AIOSequentialFile:" + this.journalDir + "/" + this.fileName;
- }
-
- // Private methods
- // -----------------------------------------------------------------------------------------------------
-
+ else if (size % (1024*1024) == 0)
+ {
+ blockSize = 1024*1024;
+ }
+ else if (size % (10*1024) == 0)
+ {
+ blockSize = 10*1024;
+ }
+ else
+ {
+ blockSize = aioFile.getBlockSize();
+ }
+
+ int blocks = size / blockSize;
+
+ if (size % blockSize != 0)
+ {
+ blocks++;
+ }
+
+ if (position % aioFile.getBlockSize() != 0)
+ {
+ position = ((position / aioFile.getBlockSize()) + 1) * aioFile.getBlockSize();
+ }
+
+ aioFile.fill((long)position, blocks, blockSize, (byte)fillCharacter);
+ }
+
+ public String getFileName()
+ {
+ return fileName;
+ }
+
+ public synchronized void open() throws Exception
+ {
+ opened = true;
+ executor = Executors.newSingleThreadExecutor();
+ aioFile = new AsynchronousFileImpl();
+ aioFile.open(journalDir + "/" + fileName, maxIO);
+ position.set(0);
+
+ }
+
+ public void position(final int pos) throws Exception
+ {
+ position.set(pos);
+ }
+
+ public int read(final ByteBuffer bytes, final IOCallback callback) throws Exception
+ {
+ int bytesToRead = bytes.limit();
+
+ long positionToRead = position.getAndAdd(bytesToRead);
+
+ bytes.rewind();
+
+ aioFile.read(positionToRead, bytesToRead, bytes, callback);
+
+ return bytesToRead;
+ }
+
+ public int read(final ByteBuffer bytes) throws Exception
+ {
+ WaitCompletion waitCompletion = new WaitCompletion();
+
+ int bytesRead = read (bytes, waitCompletion);
+
+ waitCompletion.waitLatch();
+
+ return bytesRead;
+ }
+
+
+ public int write(final ByteBuffer bytes, final IOCallback callback) throws Exception
+ {
+ final int bytesToWrite = bytes.limit();
+
+ final long positionToWrite = position.getAndAdd(bytesToWrite);
+
+ execWrite(bytes, callback, bytesToWrite, positionToWrite);
+
+ return bytesToWrite;
+ }
+
+ public int write(final ByteBuffer bytes, final boolean sync) throws Exception
+ {
+ if (sync)
+ {
+ WaitCompletion completion = new WaitCompletion();
+
+ int bytesWritten = write(bytes, completion);
+
+ completion.waitLatch();
+
+ return bytesWritten;
+ }
+ else
+ {
+ return write (bytes, DummyCallback.instance);
+ }
+ }
+
+ public String toString()
+ {
+ return "AIOSequentialFile:" + this.journalDir + "/" + this.fileName;
+ }
+
+ // Private methods
+ // -----------------------------------------------------------------------------------------------------
+
private void execWrite(final ByteBuffer bytes, final IOCallback callback,
final int bytesToWrite, final long positionToWrite)
{
@@ -260,62 +260,62 @@
}
});
}
-
-
- private void checkOpened() throws Exception
- {
- if (aioFile == null || !opened)
- {
- throw new IllegalStateException ("File not opened");
- }
- }
-
- private static class DummyCallback implements IOCallback
- {
- static DummyCallback instance = new DummyCallback();
-
+
+
+ private void checkOpened() throws Exception
+ {
+ if (aioFile == null || !opened)
+ {
+ throw new IllegalStateException ("File not opened");
+ }
+ }
+
+ private static class DummyCallback implements IOCallback
+ {
+ static DummyCallback instance = new DummyCallback();
+
public void done()
{
}
-
+
public void onError(int errorCode, String errorMessage)
{
log.warn("Error on writing data!" + errorMessage + " code - " + errorCode, new Exception (errorMessage));
}
- }
-
- private static class WaitCompletion implements IOCallback
- {
- private final CountDownLatch latch = new CountDownLatch(1);
-
- private volatile String errorMessage;
-
- private volatile int errorCode = 0;
-
- public void done()
- {
- latch.countDown();
- }
-
- public void onError(final int errorCode, final String errorMessage)
- {
- this.errorCode = errorCode;
-
- this.errorMessage = errorMessage;
-
- log.warn("Error Message " + errorMessage);
-
- latch.countDown();
- }
-
- public void waitLatch() throws Exception
- {
- latch.await();
- if (errorMessage != null)
- {
- throw new MessagingException(errorCode, errorMessage);
- }
- return;
- }
- }
+ }
+
+ private static class WaitCompletion implements IOCallback
+ {
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ private volatile String errorMessage;
+
+ private volatile int errorCode = 0;
+
+ public void done()
+ {
+ latch.countDown();
+ }
+
+ public void onError(final int errorCode, final String errorMessage)
+ {
+ this.errorCode = errorCode;
+
+ this.errorMessage = errorMessage;
+
+ log.warn("Error Message " + errorMessage);
+
+ latch.countDown();
+ }
+
+ public void waitLatch() throws Exception
+ {
+ latch.await();
+ if (errorMessage != null)
+ {
+ throw new MessagingException(errorCode, errorMessage);
+ }
+ return;
+ }
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2008-07-24 21:57:44 UTC (rev 4730)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2008-07-25 17:12:33 UTC (rev 4731)
@@ -36,26 +36,26 @@
*/
public class AIOSequentialFileFactory extends AbstractSequentialFactory
{
- public AIOSequentialFileFactory(final String journalDir)
- {
- super(journalDir);
- }
-
- public SequentialFile createSequentialFile(final String fileName, final int maxIO) throws Exception
- {
- return new AIOSequentialFile(journalDir, fileName, maxIO);
- }
-
+ public AIOSequentialFileFactory(final String journalDir)
+ {
+ super(journalDir);
+ }
+
+ public SequentialFile createSequentialFile(final String fileName, final int maxIO) throws Exception
+ {
+ return new AIOSequentialFile(journalDir, fileName, maxIO);
+ }
+
public boolean isSupportsCallbacks()
{
return true;
}
- public static boolean isSupported()
- {
- return AsynchronousFileImpl.isLoaded();
- }
-
+ public static boolean isSupported()
+ {
+ return AsynchronousFileImpl.isLoaded();
+ }
+
public ByteBuffer newBuffer(int size)
{
if (size % 512 != 0)
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java 2008-07-24 21:57:44 UTC (rev 4730)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java 2008-07-25 17:12:33 UTC (rev 4731)
@@ -40,33 +40,33 @@
*/
public abstract class AbstractSequentialFactory implements SequentialFileFactory
{
- protected final String journalDir;
-
- public AbstractSequentialFactory(final String journalDir)
- {
- this.journalDir = journalDir;
- }
-
- public List<String> listFiles(final String extension) throws Exception
- {
- File dir = new File(journalDir);
-
- FilenameFilter fnf = new FilenameFilter()
- {
- public boolean accept(File file, String name)
- {
- return name.endsWith("." + extension);
- }
- };
-
- String[] fileNames = dir.list(fnf);
-
- if (fileNames == null)
- {
- throw new IOException("Failed to list: " + journalDir);
- }
-
- return Arrays.asList(fileNames);
- }
-
+ protected final String journalDir;
+
+ public AbstractSequentialFactory(final String journalDir)
+ {
+ this.journalDir = journalDir;
+ }
+
+ public List<String> listFiles(final String extension) throws Exception
+ {
+ File dir = new File(journalDir);
+
+ FilenameFilter fnf = new FilenameFilter()
+ {
+ public boolean accept(File file, String name)
+ {
+ return name.endsWith("." + extension);
+ }
+ };
+
+ String[] fileNames = dir.list(fnf);
+
+ if (fileNames == null)
+ {
+ throw new IOException("Failed to list: " + journalDir);
+ }
+
+ return Arrays.asList(fileNames);
+ }
+
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2008-07-24 21:57:44 UTC (rev 4730)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2008-07-25 17:12:33 UTC (rev 4731)
@@ -35,27 +35,27 @@
*/
public interface JournalFile
{
- int getNegCount(JournalFile file);
-
- void incNegCount(JournalFile file);
-
- int getPosCount();
-
- void incPosCount();
-
- void decPosCount();
-
- void setCanReclaim(boolean canDelete);
-
- boolean isCanReclaim();
-
- void extendOffset(final int delta);
-
- int getOffset();
-
- int getOrderingID();
-
- void setOffset(final int offset);
-
- SequentialFile getFile();
+ int getNegCount(JournalFile file);
+
+ void incNegCount(JournalFile file);
+
+ int getPosCount();
+
+ void incPosCount();
+
+ void decPosCount();
+
+ void setCanReclaim(boolean canDelete);
+
+ boolean isCanReclaim();
+
+ void extendOffset(final int delta);
+
+ int getOffset();
+
+ int getOrderingID();
+
+ void setOffset(final int offset);
+
+ SequentialFile getFile();
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java 2008-07-24 21:57:44 UTC (rev 4730)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java 2008-07-25 17:12:33 UTC (rev 4731)
@@ -22,7 +22,6 @@
package org.jboss.messaging.core.journal.impl;
-import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
@@ -40,125 +39,125 @@
*/
public class JournalFileImpl implements JournalFile
{
- private static final Logger log = Logger.getLogger(JournalFileImpl.class);
-
- private final SequentialFile file;
-
- private final int orderingID;
-
- private int offset;
-
- private int posCount;
-
- private boolean canReclaim;
-
- private Map<JournalFile, Integer> negCounts = new ConcurrentHashMap<JournalFile, Integer>();
-
- public JournalFileImpl(final SequentialFile file, final int orderingID)
- {
- this.file = file;
-
- this.orderingID = orderingID;
- }
-
- public int getPosCount()
- {
- return posCount;
- }
-
- public boolean isCanReclaim()
- {
- return canReclaim;
- }
-
- public void setCanReclaim(final boolean canReclaim)
- {
- this.canReclaim = canReclaim;
- }
-
- public void incNegCount(final JournalFile file)
- {
- Integer count = negCounts.get(file);
-
- int c = count == null ? 1 : count.intValue() + 1;
-
- negCounts.put(file, c);
- }
-
- public int getNegCount(final JournalFile file)
- {
- Integer count = negCounts.get(file);
-
- if (count == null)
- {
- return 0;
- }
- else
- {
- return count.intValue();
- }
- }
-
- public void incPosCount()
- {
- posCount++;
- }
-
- public void decPosCount()
- {
- posCount--;
- }
-
- public void extendOffset(final int delta)
- {
- offset += delta;
- }
-
- public int getOffset()
- {
- return offset;
- }
-
- public int getOrderingID()
- {
- return orderingID;
- }
-
- public void setOffset(final int offset)
- {
- this.offset = offset;
- }
-
- public SequentialFile getFile()
- {
- return file;
- }
+ private static final Logger log = Logger.getLogger(JournalFileImpl.class);
- public String toString()
- {
- try
- {
- return "JournalFileImpl: " + file.getFileName();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- return "Error:" + e.toString();
- }
- }
-
- /** Receive debug information about the journal */
- public String debug()
- {
- StringBuilder builder = new StringBuilder();
-
- for (Entry<JournalFile, Integer> entry: negCounts.entrySet())
- {
- builder.append(" file = " + entry.getKey() + " negcount value = " + entry.getValue() + "\n");
- }
-
- return builder.toString();
- }
-
-
+ private final SequentialFile file;
+
+ private final int orderingID;
+
+ private int offset;
+
+ private int posCount;
+
+ private boolean canReclaim;
+
+ private Map<JournalFile, Integer> negCounts = new ConcurrentHashMap<JournalFile, Integer>();
+
+ public JournalFileImpl(final SequentialFile file, final int orderingID)
+ {
+ this.file = file;
+
+ this.orderingID = orderingID;
+ }
+
+ public int getPosCount()
+ {
+ return posCount;
+ }
+
+ public boolean isCanReclaim()
+ {
+ return canReclaim;
+ }
+
+ public void setCanReclaim(final boolean canReclaim)
+ {
+ this.canReclaim = canReclaim;
+ }
+
+ public void incNegCount(final JournalFile file)
+ {
+ Integer count = negCounts.get(file);
+
+ int c = count == null ? 1 : count.intValue() + 1;
+
+ negCounts.put(file, c);
+ }
+
+ public int getNegCount(final JournalFile file)
+ {
+ Integer count = negCounts.get(file);
+
+ if (count == null)
+ {
+ return 0;
+ }
+ else
+ {
+ return count.intValue();
+ }
+ }
+
+ public void incPosCount()
+ {
+ posCount++;
+ }
+
+ public void decPosCount()
+ {
+ posCount--;
+ }
+
+ public void extendOffset(final int delta)
+ {
+ offset += delta;
+ }
+
+ public int getOffset()
+ {
+ return offset;
+ }
+
+ public int getOrderingID()
+ {
+ return orderingID;
+ }
+
+ public void setOffset(final int offset)
+ {
+ this.offset = offset;
+ }
+
+ public SequentialFile getFile()
+ {
+ return file;
+ }
+
+ public String toString()
+ {
+ try
+ {
+ return "JournalFileImpl: " + file.getFileName();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ return "Error:" + e.toString();
+ }
+ }
+
+ /** Receive debug information about the journal */
+ public String debug()
+ {
+ StringBuilder builder = new StringBuilder();
+
+ for (Entry<JournalFile, Integer> entry: negCounts.entrySet())
+ {
+ builder.append(" file = " + entry.getKey() + " negcount value = " + entry.getValue() + "\n");
+ }
+
+ return builder.toString();
+ }
+
+
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-07-24 21:57:44 UTC (rev 4730)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-07-25 17:12:33 UTC (rev 4731)
@@ -41,120 +41,120 @@
*/
public class NIOSequentialFile implements SequentialFile
{
- private static final Logger log = Logger.getLogger(NIOSequentialFile.class);
-
- private String journalDir;
-
- private String fileName;
-
- private File file;
-
- private FileChannel channel;
-
- private RandomAccessFile rfile;
-
- public NIOSequentialFile(final String journalDir, final String fileName)
- {
- this.journalDir = journalDir;
-
- this.fileName = fileName;
- }
-
- public int getAlignment()
- {
- return 1;
- }
-
- public int calculateBlockStart(final int position) throws Exception
- {
- return position;
- }
-
- public String getFileName()
- {
- return fileName;
- }
-
- public void open() throws Exception
- {
- file = new File(journalDir + "/" + fileName);
-
- rfile = new RandomAccessFile(file, "rw");
-
- channel = rfile.getChannel();
- }
-
- public void fill(final int position, final int size, final byte fillCharacter) throws Exception
- {
- ByteBuffer bb = ByteBuffer.allocateDirect(size);
-
- for (int i = 0; i < size; i++)
- {
- bb.put(fillCharacter);
- }
-
- bb.flip();
-
- channel.position(position);
-
- channel.write(bb);
-
- channel.force(false);
-
- channel.position(0);
- }
-
- public void close() throws Exception
- {
- channel.close();
-
- rfile.close();
-
- channel = null;
-
- rfile = null;
-
- file = null;
- }
-
- public void delete() throws Exception
- {
- file.delete();
-
- close();
- }
-
- public int read(final ByteBuffer bytes) throws Exception
- {
- return read(bytes, null);
- }
-
- public int read(final ByteBuffer bytes, final IOCallback callback) throws Exception
- {
- try
- {
- int bytesRead = channel.read(bytes);
- if (callback != null)
- {
- callback.done();
- }
- bytes.flip();
- return bytesRead;
- }
- catch (Exception e)
- {
- if (callback != null)
- {
- callback.onError(-1, e.getLocalizedMessage());
- }
-
- throw e;
- }
-
- }
-
- public int write(final ByteBuffer bytes, final boolean sync) throws Exception
- {
+ private static final Logger log = Logger.getLogger(NIOSequentialFile.class);
+
+ private String journalDir;
+
+ private String fileName;
+
+ private File file;
+
+ private FileChannel channel;
+
+ private RandomAccessFile rfile;
+
+ public NIOSequentialFile(final String journalDir, final String fileName)
+ {
+ this.journalDir = journalDir;
+
+ this.fileName = fileName;
+ }
+
+ public int getAlignment()
+ {
+ return 1;
+ }
+
+ public int calculateBlockStart(final int position) throws Exception
+ {
+ return position;
+ }
+
+ public String getFileName()
+ {
+ return fileName;
+ }
+
+ public void open() throws Exception
+ {
+ file = new File(journalDir + "/" + fileName);
+
+ rfile = new RandomAccessFile(file, "rw");
+
+ channel = rfile.getChannel();
+ }
+
+ public void fill(final int position, final int size, final byte fillCharacter) throws Exception
+ {
+ ByteBuffer bb = ByteBuffer.allocateDirect(size);
+
+ for (int i = 0; i < size; i++)
+ {
+ bb.put(fillCharacter);
+ }
+
+ bb.flip();
+
+ channel.position(position);
+
+ channel.write(bb);
+
+ channel.force(false);
+
+ channel.position(0);
+ }
+
+ public void close() throws Exception
+ {
+ channel.close();
+
+ rfile.close();
+
+ channel = null;
+
+ rfile = null;
+
+ file = null;
+ }
+
+ public void delete() throws Exception
+ {
+ file.delete();
+
+ close();
+ }
+
+ public int read(final ByteBuffer bytes) throws Exception
+ {
+ return read(bytes, null);
+ }
+
+ public int read(final ByteBuffer bytes, final IOCallback callback) throws Exception
+ {
+ try
+ {
+ int bytesRead = channel.read(bytes);
+ if (callback != null)
+ {
+ callback.done();
+ }
+ bytes.flip();
+ return bytesRead;
+ }
+ catch (Exception e)
+ {
+ if (callback != null)
+ {
+ callback.onError(-1, e.getLocalizedMessage());
+ }
+
+ throw e;
+ }
+
+ }
+
+ public int write(final ByteBuffer bytes, final boolean sync) throws Exception
+ {
int bytesRead = channel.write(bytes);
if (sync)
@@ -163,12 +163,12 @@
}
return bytesRead;
- }
-
- public int write(final ByteBuffer bytes, final IOCallback callback) throws Exception
- {
- try
- {
+ }
+
+ public int write(final ByteBuffer bytes, final IOCallback callback) throws Exception
+ {
+ try
+ {
int bytesRead = channel.write(bytes);
if (callback != null)
@@ -177,17 +177,17 @@
}
return bytesRead;
- }
- catch (Exception e)
- {
- callback.onError(-1, e.getMessage());
- throw e;
- }
- }
-
- public void position(final int pos) throws Exception
- {
- channel.position(pos);
- }
-
+ }
+ catch (Exception e)
+ {
+ callback.onError(-1, e.getMessage());
+ throw e;
+ }
+ }
+
+ public void position(final int pos) throws Exception
+ {
+ channel.position(pos);
+ }
+
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java 2008-07-24 21:57:44 UTC (rev 4730)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java 2008-07-25 17:12:33 UTC (rev 4731)
@@ -37,17 +37,17 @@
*/
public class NIOSequentialFileFactory extends AbstractSequentialFactory implements SequentialFileFactory
{
- public NIOSequentialFileFactory(final String journalDir)
- {
- super(journalDir);
- }
-
- // maxIO is ignored on NIO
- public SequentialFile createSequentialFile(final String fileName, final int maxIO)
- {
- return new NIOSequentialFile(journalDir, fileName);
- }
-
+ public NIOSequentialFileFactory(final String journalDir)
+ {
+ super(journalDir);
+ }
+
+ // maxIO is ignored on NIO
+ public SequentialFile createSequentialFile(final String fileName, final int maxIO)
+ {
+ return new NIOSequentialFile(journalDir, fileName);
+ }
+
public boolean isSupportsCallbacks()
{
return false;
@@ -68,5 +68,5 @@
return 1;
}
-
+
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java 2008-07-24 21:57:44 UTC (rev 4730)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java 2008-07-25 17:12:33 UTC (rev 4731)
@@ -49,56 +49,56 @@
*/
public class Reclaimer
{
- private static final Logger log = Logger.getLogger(Reclaimer.class);
-
- public void scan(final JournalFile[] files)
- {
- for (int i = 0; i < files.length; i++)
- {
- //First we evaluate criterion 1)
-
- JournalFile currentFile = files[i];
-
- int posCount = currentFile.getPosCount();
-
- int totNeg = 0;
-
- for (int j = i; j < files.length; j++)
- {
- totNeg += files[j].getNegCount(currentFile);
- }
-
- currentFile.setCanReclaim(true);
-
- if (posCount <= totNeg)
- {
- //Now we evaluate criterion 2)
-
- for (int j = 0; j <= i; j++)
- {
- JournalFile file = files[j];
-
- int negCount = currentFile.getNegCount(file);
-
- if (negCount != 0)
- {
- if (file.isCanReclaim())
- {
- //Ok
- }
- else
- {
- currentFile.setCanReclaim(false);
-
- break;
- }
- }
- }
- }
- else
- {
- currentFile.setCanReclaim(false);
- }
- }
- }
+ private static final Logger log = Logger.getLogger(Reclaimer.class);
+
+ public void scan(final JournalFile[] files)
+ {
+ for (int i = 0; i < files.length; i++)
+ {
+ //First we evaluate criterion 1)
+
+ JournalFile currentFile = files[i];
+
+ int posCount = currentFile.getPosCount();
+
+ int totNeg = 0;
+
+ for (int j = i; j < files.length; j++)
+ {
+ totNeg += files[j].getNegCount(currentFile);
+ }
+
+ currentFile.setCanReclaim(true);
+
+ if (posCount <= totNeg)
+ {
+ //Now we evaluate criterion 2)
+
+ for (int j = 0; j <= i; j++)
+ {
+ JournalFile file = files[j];
+
+ int negCount = currentFile.getNegCount(file);
+
+ if (negCount != 0)
+ {
+ if (file.isCanReclaim())
+ {
+ //Ok
+ }
+ else
+ {
+ currentFile.setCanReclaim(false);
+
+ break;
+ }
+ }
+ }
+ }
+ else
+ {
+ currentFile.setCanReclaim(false);
+ }
+ }
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java 2008-07-24 21:57:44 UTC (rev 4730)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java 2008-07-25 17:12:33 UTC (rev 4731)
@@ -40,19 +40,19 @@
public class TransactionHolder
{
- public TransactionHolder(final long id)
- {
- this.transactionID = id;
- }
-
- public final long transactionID;
-
- public final List<RecordInfo> recordInfos = new ArrayList<RecordInfo>();
-
- public final Set<Long> recordsToDelete = new HashSet<Long>();
-
- public boolean prepared;
-
- public boolean invalid;
-
+ public TransactionHolder(final long id)
+ {
+ this.transactionID = id;
+ }
+
+ public final long transactionID;
+
+ public final List<RecordInfo> recordInfos = new ArrayList<RecordInfo>();
+
+ public final Set<Long> recordsToDelete = new HashSet<Long>();
+
+ public boolean prepared;
+
+ public boolean invalid;
+
}
More information about the jboss-cvs-commits
mailing list