[jboss-cvs] JBoss Messaging SVN: r4678 - trunk/src/main/org/jboss/messaging/core/journal/impl.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jul 11 21:19:25 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-07-11 21:19:25 -0400 (Fri, 11 Jul 2008)
New Revision: 4678

Modified:
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
Log:
This file had inconsistent indentation (a mix of spaces and tabs), so I used my IDE's formatting tool to fix it.
(No code changes in this commit; formatting only.)

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-07-12 01:14:15 UTC (rev 4677)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-07-12 01:19:25 UTC (rev 4678)
@@ -70,109 +70,109 @@
  */
 public class JournalImpl implements TestableJournal
 {
-	private static final Logger log = Logger.getLogger(JournalImpl.class);
-	
-	private static final boolean trace = log.isTraceEnabled();
-	
-	private static final int STATE_STOPPED = 0;
-	
-	private static final int STATE_STARTED = 1;
-	
-	private static final int STATE_LOADED = 2;
-	
-	// The sizes of primitive types
-	
-	private static final int SIZE_LONG = 8;
-	
-	private static final int SIZE_INT = 4;
-	
-	private static final int SIZE_BYTE = 1;
-	
-	public static final int MIN_FILE_SIZE = 1024;
-	
-	public static final int MIN_TASK_PERIOD = 1000;
-	
-	//Record markers - they must be all unique
-	
-	public static final int SIZE_HEADER = 8;
-	
-	public static final int SIZE_ADD_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE + SIZE_INT + SIZE_BYTE; // + record.length
-	
-	public static final byte ADD_RECORD = 11;
-	
-	public static final byte SIZE_UPDATE_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE + SIZE_INT + SIZE_BYTE; // + record.length;
-	
-	public static final byte UPDATE_RECORD = 12;
-	
-	public static final int SIZE_DELETE_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
-	
-	public static final byte DELETE_RECORD = 13;
-	
-	public static final byte ADD_RECORD_TX = 14;
-	
-	public static final int SIZE_ADD_RECORD_TX = SIZE_BYTE + SIZE_LONG + SIZE_BYTE + SIZE_LONG + SIZE_INT + SIZE_BYTE; // Add the size of Bytes on this
-	
-	public static final int  SIZE_UPDATE_RECORD_TX = SIZE_BYTE + SIZE_LONG + SIZE_BYTE + SIZE_LONG + SIZE_INT + SIZE_BYTE;  // Add the size of Bytes on this
-	
-	public static final byte UPDATE_RECORD_TX = 15;
-	
-	public static final int  SIZE_DELETE_RECORD_TX = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_BYTE;
-	
-	public static final byte DELETE_RECORD_TX = 16;
-	
-	public static final int SIZE_PREPARE_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
-	
-	public static final byte PREPARE_RECORD = 17;
-	
-	
-	public static final byte SIZE_COMMIT_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
-	
-	public static final byte COMMIT_RECORD = 18;
-	
-	public static final byte SIZE_ROLLBACK_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
-	
-	public static final byte ROLLBACK_RECORD = 19;
-	
-	public static final byte DONE = 20;
-	
-	public static final byte FILL_CHARACTER = 74; // Letter 'J' 
-	
-	
-	// used for Asynchronous IO only (ignored on NIO).
-	private final int maxAIO;
-	
+   private static final Logger log = Logger.getLogger(JournalImpl.class);
+   
+   private static final boolean trace = log.isTraceEnabled();
+   
+   private static final int STATE_STOPPED = 0;
+   
+   private static final int STATE_STARTED = 1;
+   
+   private static final int STATE_LOADED = 2;
+   
+   // The sizes of primitive types
+   
+   private static final int SIZE_LONG = 8;
+   
+   private static final int SIZE_INT = 4;
+   
+   private static final int SIZE_BYTE = 1;
+   
+   public static final int MIN_FILE_SIZE = 1024;
+   
+   public static final int MIN_TASK_PERIOD = 1000;
+   
+   //Record markers - they must be all unique
+   
+   public static final int SIZE_HEADER = 8;
+   
+   public static final int SIZE_ADD_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE + SIZE_INT + SIZE_BYTE; // + record.length
+   
+   public static final byte ADD_RECORD = 11;
+   
+   public static final byte SIZE_UPDATE_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE + SIZE_INT + SIZE_BYTE; // + record.length;
+   
+   public static final byte UPDATE_RECORD = 12;
+   
+   public static final int SIZE_DELETE_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
+   
+   public static final byte DELETE_RECORD = 13;
+   
+   public static final byte ADD_RECORD_TX = 14;
+   
+   public static final int SIZE_ADD_RECORD_TX = SIZE_BYTE + SIZE_LONG + SIZE_BYTE + SIZE_LONG + SIZE_INT + SIZE_BYTE; // Add the size of Bytes on this
+   
+   public static final int  SIZE_UPDATE_RECORD_TX = SIZE_BYTE + SIZE_LONG + SIZE_BYTE + SIZE_LONG + SIZE_INT + SIZE_BYTE;  // Add the size of Bytes on this
+   
+   public static final byte UPDATE_RECORD_TX = 15;
+   
+   public static final int  SIZE_DELETE_RECORD_TX = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_BYTE;
+   
+   public static final byte DELETE_RECORD_TX = 16;
+   
+   public static final int SIZE_PREPARE_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
+   
+   public static final byte PREPARE_RECORD = 17;
+   
+   
+   public static final byte SIZE_COMMIT_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
+   
+   public static final byte COMMIT_RECORD = 18;
+   
+   public static final byte SIZE_ROLLBACK_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
+   
+   public static final byte ROLLBACK_RECORD = 19;
+   
+   public static final byte DONE = 20;
+   
+   public static final byte FILL_CHARACTER = 74; // Letter 'J' 
+   
+   
    // used for Asynchronous IO only (ignored on NIO).
-	private final long aioTimeout; // in ms
-	
-	private final int fileSize;
-	
-	private final int minFiles;
-	
-	private final boolean syncTransactional;
-	
-	private final boolean syncNonTransactional;
-	
-	private final SequentialFileFactory fileFactory;
-	
-	private final long taskPeriod;
-	
-	public final String filePrefix;
-	
-	public final String fileExtension;
-	
-	
-	private final Queue<JournalFile> dataFiles = new ConcurrentLinkedQueue<JournalFile>();
-	
-	private final Queue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
-	
-	private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
-	
-	private final Map<Long, PosFiles> posFilesMap = new ConcurrentHashMap<Long, PosFiles>();
-	
-	private final Map<Long, TransactionNegPos> transactionInfos = new ConcurrentHashMap<Long, TransactionNegPos>();
-
-	private final ConcurrentMap<Long, TransactionCallback> transactionCallbacks = new ConcurrentHashMap<Long, TransactionCallback>();
-	
+   private final int maxAIO;
+   
+   // used for Asynchronous IO only (ignored on NIO).
+   private final long aioTimeout; // in ms
+   
+   private final int fileSize;
+   
+   private final int minFiles;
+   
+   private final boolean syncTransactional;
+   
+   private final boolean syncNonTransactional;
+   
+   private final SequentialFileFactory fileFactory;
+   
+   private final long taskPeriod;
+   
+   public final String filePrefix;
+   
+   public final String fileExtension;
+   
+   
+   private final Queue<JournalFile> dataFiles = new ConcurrentLinkedQueue<JournalFile>();
+   
+   private final Queue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
+   
+   private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
+   
+   private final Map<Long, PosFiles> posFilesMap = new ConcurrentHashMap<Long, PosFiles>();
+   
+   private final Map<Long, TransactionNegPos> transactionInfos = new ConcurrentHashMap<Long, TransactionNegPos>();
+   
+   private final ConcurrentMap<Long, TransactionCallback> transactionCallbacks = new ConcurrentHashMap<Long, TransactionCallback>();
+   
    private ExecutorService closingExecutor = null;
    
    /** 
@@ -181,92 +181,92 @@
     * */
    private ExecutorService openExecutor = null;
    
-	/*
+   /*
     * We use a semaphore rather than synchronized since it performs better when
     * contended
     */
-	
-	//TODO - improve concurrency by allowing concurrent accesses if doesn't change current file
+   
+   //TODO - improve concurrency by allowing concurrent accesses if doesn't change current file
    // this locks access to currentFile
-	private final Semaphore lock = new Semaphore(1, true);
-	
-	private volatile JournalFile currentFile ;
-	
-	private volatile int state;
-	
-	private volatile long lastOrderingID;
-	
-	private final Timer timer = new Timer(true);
-	
-	private TimerTask reclaimerTask;
-	
-	private final AtomicLong transactionIDSequence = new AtomicLong(0);
-	
-	private Reclaimer reclaimer = new Reclaimer();
-			
-	public JournalImpl(final int fileSize, final int minFiles,
-			             final boolean syncTransactional, final boolean syncNonTransactional,
-			             final SequentialFileFactory fileFactory, final long taskPeriod,
-			             final String filePrefix, final String fileExtension, final int maxAIO, final long aioTimeout)
-	{
-		if (fileSize < MIN_FILE_SIZE)
-		{
-			throw new IllegalArgumentException("File size cannot be less than " + MIN_FILE_SIZE + " bytes");
-		}
-		if (minFiles < 2)
-		{
-			throw new IllegalArgumentException("minFiles cannot be less than 2");
-		}
-		if (fileFactory == null)
-		{
-			throw new NullPointerException("fileFactory is null");
-		}
-		if (taskPeriod < MIN_TASK_PERIOD)
-		{
-			throw new IllegalArgumentException("taskPeriod cannot be less than " + MIN_TASK_PERIOD);
-		}
-		if (filePrefix == null)
-		{
-			throw new NullPointerException("filePrefix is null");
-		}
-		if (fileExtension == null)
-		{
-			throw new NullPointerException("fileExtension is null");
-		}
-		if (maxAIO <= 0)
-		{
-		   throw new IllegalStateException("maxAIO should aways be a positive number");
-		}
-		if (aioTimeout < 1)
-		{
-		   throw new IllegalStateException("aio-timeout cannot be less than 1 second");
-		}
-		
-		this.fileSize = fileSize;
-		
-		this.minFiles = minFiles;
-		
-		this.syncTransactional = syncTransactional;
-		
-		this.syncNonTransactional = syncNonTransactional;
-		
-		this.fileFactory = fileFactory;
-		
-		this.taskPeriod = taskPeriod;
-		
-		this.filePrefix = filePrefix;
-		
-		this.fileExtension = fileExtension;
-		
-		this.maxAIO = maxAIO;
-		
-		this.aioTimeout = aioTimeout;
-	}
-	
-	// Journal implementation ----------------------------------------------------------------
-
-	public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record) throws Exception
+   private final Semaphore lock = new Semaphore(1, true);
+   
+   private volatile JournalFile currentFile ;
+   
+   private volatile int state;
+   
+   private volatile long lastOrderingID;
+   
+   private final Timer timer = new Timer(true);
+   
+   private TimerTask reclaimerTask;
+   
+   private final AtomicLong transactionIDSequence = new AtomicLong(0);
+   
+   private Reclaimer reclaimer = new Reclaimer();
+   
+   public JournalImpl(final int fileSize, final int minFiles,
+         final boolean syncTransactional, final boolean syncNonTransactional,
+         final SequentialFileFactory fileFactory, final long taskPeriod,
+         final String filePrefix, final String fileExtension, final int maxAIO, final long aioTimeout)
    {
+      if (fileSize < MIN_FILE_SIZE)
+      {
+         throw new IllegalArgumentException("File size cannot be less than " + MIN_FILE_SIZE + " bytes");
+      }
+      if (minFiles < 2)
+      {
+         throw new IllegalArgumentException("minFiles cannot be less than 2");
+      }
+      if (fileFactory == null)
+      {
+         throw new NullPointerException("fileFactory is null");
+      }
+      if (taskPeriod < MIN_TASK_PERIOD)
+      {
+         throw new IllegalArgumentException("taskPeriod cannot be less than " + MIN_TASK_PERIOD);
+      }
+      if (filePrefix == null)
+      {
+         throw new NullPointerException("filePrefix is null");
+      }
+      if (fileExtension == null)
+      {
+         throw new NullPointerException("fileExtension is null");
+      }
+      if (maxAIO <= 0)
+      {
+         throw new IllegalStateException("maxAIO should aways be a positive number");
+      }
+      if (aioTimeout < 1)
+      {
+         throw new IllegalStateException("aio-timeout cannot be less than 1 second");
+      }
+      
+      this.fileSize = fileSize;
+      
+      this.minFiles = minFiles;
+      
+      this.syncTransactional = syncTransactional;
+      
+      this.syncNonTransactional = syncNonTransactional;
+      
+      this.fileFactory = fileFactory;
+      
+      this.taskPeriod = taskPeriod;
+      
+      this.filePrefix = filePrefix;
+      
+      this.fileExtension = fileExtension;
+      
+      this.maxAIO = maxAIO;
+      
+      this.aioTimeout = aioTimeout;
+   }
+   
+   // Journal implementation ----------------------------------------------------------------
+   
+   public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record) throws Exception
+   {
       if (state != STATE_LOADED)
       {
          throw new IllegalStateException("Journal must be loaded first");
@@ -287,34 +287,34 @@
       bb.rewind();
       
       JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
-     
+      
       posFilesMap.put(id, new PosFiles(usedFile));
    }
-	
-	public void appendAddRecord(final long id, final byte recordType, final byte[] record) throws Exception
-	{
-		if (state != STATE_LOADED)
-		{
-			throw new IllegalStateException("Journal must be loaded first");
-		}
-		
-		int size = SIZE_ADD_RECORD + record.length;
-		
-		ByteBuffer bb = fileFactory.newBuffer(size);
-		
-		bb.put(ADD_RECORD);		
-		bb.putLong(id);
-		bb.put(recordType);
-		bb.putInt(record.length);		
-		bb.put(record);		
-		bb.put(DONE);			
-		bb.rewind();
-		
+   
+   public void appendAddRecord(final long id, final byte recordType, final byte[] record) throws Exception
+   {
+      if (state != STATE_LOADED)
+      {
+         throw new IllegalStateException("Journal must be loaded first");
+      }
+      
+      int size = SIZE_ADD_RECORD + record.length;
+      
+      ByteBuffer bb = fileFactory.newBuffer(size);
+      
+      bb.put(ADD_RECORD);		
+      bb.putLong(id);
+      bb.put(recordType);
+      bb.putInt(record.length);		
+      bb.put(record);		
+      bb.put(DONE);			
+      bb.rewind();
+      
       JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
-		
-		posFilesMap.put(id, new PosFiles(usedFile));
-	}
-	
+      
+      posFilesMap.put(id, new PosFiles(usedFile));
+   }
+   
    public void appendUpdateRecord(final long id, final byte recordType, final byte[] record) throws Exception
    {
       if (state != STATE_LOADED)
@@ -340,9 +340,9 @@
       bb.put(record);      
       bb.put(DONE);     
       bb.rewind();
-         
+      
       JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
-            
+      
       posFiles.addUpdateFile(usedFile);
    }
    
@@ -371,44 +371,44 @@
       record.encode(bb);
       bb.putByte(DONE);     
       bb.rewind();
-         
+      
       JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
-            
+      
       posFiles.addUpdateFile(usedFile);
    }
    
-	public void appendDeleteRecord(long id) throws Exception
-	{
-		if (state != STATE_LOADED)
-		{
-			throw new IllegalStateException("Journal must be loaded first");
-		}
-		
-		PosFiles posFiles = posFilesMap.remove(id);
-		
-		if (posFiles == null)
-		{
-			throw new IllegalStateException("Cannot find add info " + id);
-		}
-		
-		int size = SIZE_DELETE_RECORD;
-		
-		ByteBuffer bb = fileFactory.newBuffer(size); 
-		
-		bb.put(DELETE_RECORD);     
-		bb.putLong(id);      
-		bb.put(DONE);     
-		bb.rewind();
-		
+   public void appendDeleteRecord(long id) throws Exception
+   {
+      if (state != STATE_LOADED)
+      {
+         throw new IllegalStateException("Journal must be loaded first");
+      }
+      
+      PosFiles posFiles = posFilesMap.remove(id);
+      
+      if (posFiles == null)
+      {
+         throw new IllegalStateException("Cannot find add info " + id);
+      }
+      
+      int size = SIZE_DELETE_RECORD;
+      
+      ByteBuffer bb = fileFactory.newBuffer(size); 
+      
+      bb.put(DELETE_RECORD);     
+      bb.putLong(id);      
+      bb.put(DONE);     
+      bb.rewind();
+      
       JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
       posFiles.addDelete(usedFile);
-	}     
-	
-	public long getTransactionID()
-	{
-		return transactionIDSequence.getAndIncrement();
-	}
-	
+   }     
+   
+   public long getTransactionID()
+   {
+      return transactionIDSequence.getAndIncrement();
+   }
+   
    public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, 
                                             final EncodingSupport record) throws Exception
    {
@@ -433,7 +433,7 @@
       bb.rewind();
       
       JournalFile usedFile;
-
+      
       usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
       
       TransactionNegPos tx = getTransactionInfo(txID);
@@ -441,35 +441,35 @@
       tx.addPos(usedFile, id);
    }
    
-	public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
+   public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
    {
-	   if (state != STATE_LOADED)
-	   {
-	      throw new IllegalStateException("Journal must be loaded first");
-	   }
-	   
-	   int size = SIZE_ADD_RECORD_TX + record.length;
-	   
-	   ByteBuffer bb = fileFactory.newBuffer(size); 
-	   
-	   bb.put(ADD_RECORD_TX);
-	   bb.putLong(txID);
+      if (state != STATE_LOADED)
+      {
+         throw new IllegalStateException("Journal must be loaded first");
+      }
+      
+      int size = SIZE_ADD_RECORD_TX + record.length;
+      
+      ByteBuffer bb = fileFactory.newBuffer(size); 
+      
+      bb.put(ADD_RECORD_TX);
+      bb.putLong(txID);
       bb.put(recordType);
-	   bb.putLong(id);
-	   bb.putInt(record.length);
-	   bb.put(record);
-	   bb.put(DONE);     
-	   bb.rewind();
-	   
-	   JournalFile usedFile;
-	   
+      bb.putLong(id);
+      bb.putInt(record.length);
+      bb.put(record);
+      bb.put(DONE);     
+      bb.rewind();
+      
+      JournalFile usedFile;
+      
       usedFile = appendRecord(bb, false, getTransactionCallback(txID));
-	   
-	   TransactionNegPos tx = getTransactionInfo(txID);
-	   
-	   tx.addPos(usedFile, id);
+      
+      TransactionNegPos tx = getTransactionInfo(txID);
+      
+      tx.addPos(usedFile, id);
    }
-	
+   
    public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, final byte[] record) throws Exception
    {
       if (state != STATE_LOADED)
@@ -529,130 +529,130 @@
       tx.addPos(usedFile, id);
    }
    
-	public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
-	{
-		if (state != STATE_LOADED)
-		{
-			throw new IllegalStateException("Journal must be loaded first");
-		}
-	
-		int size = SIZE_DELETE_RECORD_TX;
-		
-		ByteBuffer bb = fileFactory.newBuffer(size); 
-		
-		bb.put(DELETE_RECORD_TX);     
-		bb.putLong(txID);    
-		bb.putLong(id);      
-		bb.put(DONE);        
-		bb.rewind();
-		
+   public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
+   {
+      if (state != STATE_LOADED)
+      {
+         throw new IllegalStateException("Journal must be loaded first");
+      }
+      
+      int size = SIZE_DELETE_RECORD_TX;
+      
+      ByteBuffer bb = fileFactory.newBuffer(size); 
+      
+      bb.put(DELETE_RECORD_TX);     
+      bb.putLong(txID);    
+      bb.putLong(id);      
+      bb.put(DONE);        
+      bb.rewind();
+      
       JournalFile usedFile;
       
       usedFile = appendRecord(bb, false, getTransactionCallback(txID));
-
+      
       TransactionNegPos tx = getTransactionInfo(txID);
-		
-		tx.addNeg(usedFile, id);      
-	}  
-	
-	public void appendPrepareRecord(final long txID) throws Exception
-	{
-		if (state != STATE_LOADED)
-		{
-			throw new IllegalStateException("Journal must be loaded first");
-		}
-		
-		TransactionNegPos tx = transactionInfos.get(txID);
-		
-		if (tx == null)
-		{
-			throw new IllegalStateException("Cannot find tx with id " + txID);
-		}
-		
-		int size = SIZE_PREPARE_RECORD;
-		
-		ByteBuffer bb = fileFactory.newBuffer(size); 
-		
-		bb.put(PREPARE_RECORD);    
-		bb.putLong(txID);
-		bb.put(DONE);           
-		bb.rewind();
-							
-		JournalFile usedFile;
       
+      tx.addNeg(usedFile, id);      
+   }  
+   
+   public void appendPrepareRecord(final long txID) throws Exception
+   {
+      if (state != STATE_LOADED)
+      {
+         throw new IllegalStateException("Journal must be loaded first");
+      }
+      
+      TransactionNegPos tx = transactionInfos.get(txID);
+      
+      if (tx == null)
+      {
+         throw new IllegalStateException("Cannot find tx with id " + txID);
+      }
+      
+      int size = SIZE_PREPARE_RECORD;
+      
+      ByteBuffer bb = fileFactory.newBuffer(size); 
+      
+      bb.put(PREPARE_RECORD);    
+      bb.putLong(txID);
+      bb.put(DONE);           
+      bb.rewind();
+      
+      JournalFile usedFile;
+      
       usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
       
-		tx.prepare(usedFile);
-	}
-	
-	public void appendCommitRecord(final long txID) throws Exception
-	{
-		if (state != STATE_LOADED)
-		{
-			throw new IllegalStateException("Journal must be loaded first");
-		}		
+      tx.prepare(usedFile);
+   }
+   
+   public void appendCommitRecord(final long txID) throws Exception
+   {
+      if (state != STATE_LOADED)
+      {
+         throw new IllegalStateException("Journal must be loaded first");
+      }		
       
-		TransactionNegPos tx = transactionInfos.remove(txID);
-		
-		if (tx == null)
-		{
-			throw new IllegalStateException("Cannot find tx with id " + txID);
-		}
-		
-		int size = SIZE_COMMIT_RECORD;
-		
-		ByteBuffer bb = fileFactory.newBuffer(size); 
-		
-		bb.put(COMMIT_RECORD);     
-		bb.putLong(txID);    
-		bb.put(DONE);           
-		bb.rewind();
-		
-		JournalFile usedFile;
+      TransactionNegPos tx = transactionInfos.remove(txID);
       
+      if (tx == null)
+      {
+         throw new IllegalStateException("Cannot find tx with id " + txID);
+      }
+      
+      int size = SIZE_COMMIT_RECORD;
+      
+      ByteBuffer bb = fileFactory.newBuffer(size); 
+      
+      bb.put(COMMIT_RECORD);     
+      bb.putLong(txID);    
+      bb.put(DONE);           
+      bb.rewind();
+      
+      JournalFile usedFile;
+      
       usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
-		
-		transactionCallbacks.remove(txID);
-		
-		tx.commit(usedFile);
-		
-	}
-	
-	public void appendRollbackRecord(final long txID) throws Exception
-	{
-		if (state != STATE_LOADED)
-		{
-			throw new IllegalStateException("Journal must be loaded first");
-		}
-		
-		TransactionNegPos tx = transactionInfos.remove(txID);
-		
-		if (tx == null)
-		{
-			throw new IllegalStateException("Cannot find tx with id " + txID);
-		}
-		
-		int size = SIZE_ROLLBACK_RECORD;
-		
-		ByteBuffer bb = fileFactory.newBuffer(size); 
-		
-		bb.put(ROLLBACK_RECORD);      
-		bb.putLong(txID);
-		bb.put(DONE);        
-		bb.rewind();
-		
-		JournalFile usedFile;
-
-		usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));      
-		
-		transactionCallbacks.remove(txID);
-				
-		tx.rollback(usedFile);
-	}
-	
+      
+      transactionCallbacks.remove(txID);
+      
+      tx.commit(usedFile);
+      
+   }
+   
+   public void appendRollbackRecord(final long txID) throws Exception
+   {
+      if (state != STATE_LOADED)
+      {
+         throw new IllegalStateException("Journal must be loaded first");
+      }
+      
+      TransactionNegPos tx = transactionInfos.remove(txID);
+      
+      if (tx == null)
+      {
+         throw new IllegalStateException("Cannot find tx with id " + txID);
+      }
+      
+      int size = SIZE_ROLLBACK_RECORD;
+      
+      ByteBuffer bb = fileFactory.newBuffer(size); 
+      
+      bb.put(ROLLBACK_RECORD);      
+      bb.putLong(txID);
+      bb.put(DONE);        
+      bb.rewind();
+      
+      JournalFile usedFile;
+      
+      usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));      
+      
+      transactionCallbacks.remove(txID);
+      
+      tx.rollback(usedFile);
+   }
+   
    public synchronized long load(final List<RecordInfo> committedRecords,
          final List<PreparedTransactionInfo> preparedTransactions) throws Exception
-   {
+         {
       if (state != STATE_STARTED)
       {
          throw new IllegalStateException("Journal must be in started state");
@@ -709,7 +709,7 @@
       for (JournalFile file: orderedFiles)
       {  
          file.getFile().open();
-            
+         
          ByteBuffer bb = fileFactory.newBuffer(fileSize);
          
          int bytesRead = file.getFile().read(bb);
@@ -732,7 +732,7 @@
             int pos = bb.position();
             
             byte recordType = bb.get();
-                
+            
             switch(recordType)
             {
                case ADD_RECORD:
@@ -742,7 +742,7 @@
                   maxMessageID = Math.max(maxMessageID, id);
                   
                   byte userRecordType = bb.get();
-                   
+                  
                   int size = bb.getInt();                
                   byte[] record = new byte[size];                 
                   bb.get(record);
@@ -765,7 +765,7 @@
                case UPDATE_RECORD:                 
                {
                   long id = bb.getLong();    
-
+                  
                   maxMessageID = Math.max(maxMessageID, id);
                   
                   byte userRecordType = bb.get();
@@ -925,7 +925,7 @@
                   maxTransactionID = Math.max(maxTransactionID, txID);                 
                   long id = bb.getLong(); 
                   maxMessageID = Math.max(maxMessageID, id);
-
+                  
                   byte end = bb.get();
                   
                   if (end != DONE)
@@ -963,7 +963,7 @@
                case PREPARE_RECORD:
                {
                   long txID = bb.getLong();           
-
+                  
                   maxTransactionID = Math.max(maxTransactionID, txID);                 
                   byte end = bb.get();
                   
@@ -1034,7 +1034,7 @@
                case ROLLBACK_RECORD:
                {
                   long txID = bb.getLong();     
-   
+                  
                   maxTransactionID = Math.max(maxTransactionID, txID);                 
                   byte end = bb.get();
                   
@@ -1095,7 +1095,7 @@
          }
          
          file.getFile().close();          
-
+         
          if (hasData)
          {        
             dataFiles.add(file);
@@ -1192,22 +1192,22 @@
       state = STATE_LOADED;
       
       return maxMessageID;
+         }
+   
+   public int getAlignment() throws Exception
+   {
+      return this.fileFactory.getAlignment();
    }
-
-	public int getAlignment() throws Exception
-	{
-		return this.fileFactory.getAlignment();
-	}
-	
-	public synchronized void checkReclaimStatus() throws Exception
-	{
-		JournalFile[] files = new JournalFile[dataFiles.size()];
-		
-		reclaimer.scan(dataFiles.toArray(files));		
-	}
-
-	public String debug() throws Exception
+   
+   public synchronized void checkReclaimStatus() throws Exception
    {
+      JournalFile[] files = new JournalFile[dataFiles.size()];
+      
+      reclaimer.scan(dataFiles.toArray(files));		
+   }
+   
+   public String debug() throws Exception
+   {
       this.checkReclaimStatus();
       
       StringBuilder builder = new StringBuilder();
@@ -1235,12 +1235,12 @@
       }
       
       builder.append("#Opened Files:" + this.openedFiles.size());
-            
+      
       return builder.toString();
    }
    
    // TestableJournal implementation --------------------------------------------------------------
-	
+   
    /** Method for use on testcases.
     *  It will call waitComplete on every transaction, so any assertions on the file system will be correct after this */
    public void debugWait() throws Exception
@@ -1254,7 +1254,7 @@
       {
          // Send something to the closingExecutor, just to make sure we went until its end
          final CountDownLatch latch = new CountDownLatch(1);
-
+         
          this.closingExecutor.execute(new Runnable()
          {
             public void run()
@@ -1265,12 +1265,12 @@
          
          latch.await();
       }
-
+      
       if (openExecutor != null && !openExecutor.isShutdown())
       {
          // Send something to the closingExecutor, just to make sure we went until its end
          final CountDownLatch latch = new CountDownLatch(1);
-
+         
          this.openExecutor.execute(new Runnable()
          {
             public void run()
@@ -1281,93 +1281,93 @@
          
          latch.await();
       }
+      
+   }
    
-   }
-
-	public synchronized void checkAndReclaimFiles() throws Exception
-	{
-		checkReclaimStatus();
-		
-		for (JournalFile file: dataFiles)
-		{           
-			if (file.isCanReclaim())
-			{
-				//File can be reclaimed or deleted
-				
+   public synchronized void checkAndReclaimFiles() throws Exception
+   {
+      checkReclaimStatus();
+      
+      for (JournalFile file: dataFiles)
+      {           
+         if (file.isCanReclaim())
+         {
+            //File can be reclaimed or deleted
+            
             if (trace) log.trace("Reclaiming file " + file);
             log.info("Reclaiming file " + file); // remove this
-				
-				dataFiles.remove(file);
-				
-				//FIXME - size() involves a scan!!!
-				if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
-				{
-					//Re-initialise it
-					
-					long newOrderingID = generateOrderingID();
-					
-					SequentialFile sf = file.getFile();
-					
+            
+            dataFiles.remove(file);
+            
+            //FIXME - size() involves a scan!!!
+            if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
+            {
+               //Re-initialise it
+               
+               long newOrderingID = generateOrderingID();
+               
+               SequentialFile sf = file.getFile();
+               
                log.info("Adding " + sf + "to freeFiles");  // remove this
-
+               
                sf.open();
-					
-					ByteBuffer bb = fileFactory.newBuffer(SIZE_LONG); 
-					
-					bb.putLong(newOrderingID);
-					
-					//Note we MUST re-fill it - otherwise we won't be able to detect corrupt records
-					
-					//TODO - if we can avoid this somehow would be good, since filling the file is a heavyweight
-					//operation and can impact other IO operations on the disk
-					sf.fill(0, fileSize, FILL_CHARACTER);
-					
-					int bytesWritten = sf.write(bb, true);
-					
-					JournalFile jf = new JournalFileImpl(sf, newOrderingID);
-					
-					sf.position(bytesWritten);
-					
-					jf.setOffset(bytesWritten);
-					
-					sf.close();
-					
-					freeFiles.add(jf);  
-				}
-				else
-				{
+               
+               ByteBuffer bb = fileFactory.newBuffer(SIZE_LONG); 
+               
+               bb.putLong(newOrderingID);
+               
+               //Note we MUST re-fill it - otherwise we won't be able to detect corrupt records
+               
+               //TODO - if we can avoid this somehow would be good, since filling the file is a heavyweight
+               //operation and can impact other IO operations on the disk
+               sf.fill(0, fileSize, FILL_CHARACTER);
+               
+               int bytesWritten = sf.write(bb, true);
+               
+               JournalFile jf = new JournalFileImpl(sf, newOrderingID);
+               
+               sf.position(bytesWritten);
+               
+               jf.setOffset(bytesWritten);
+               
+               sf.close();
+               
+               freeFiles.add(jf);  
+            }
+            else
+            {
                log.info("Deleting " + file.getFile());  // remove this
-
-					file.getFile().open();
-					
-					file.getFile().delete();
-				}
-			}
-		}
-	}
-	
-	public int getDataFilesCount()
-	{
-		return dataFiles.size();
-	}
-	
-	public int getFreeFilesCount()
-	{
-		return freeFiles.size();
-	}
-	
-	public int getOpenedFilesCount()
-	{
-	   return openedFiles.size();
-	}
-	
-	public int getIDMapSize()
-	{
-		return posFilesMap.size();
-	}
-	
-	public int getFileSize()
+               
+               file.getFile().open();
+               
+               file.getFile().delete();
+            }
+         }
+      }
+   }
+   
+   public int getDataFilesCount()
    {
+      return dataFiles.size();
+   }
+   
+   public int getFreeFilesCount()
+   {
+      return freeFiles.size();
+   }
+   
+   public int getOpenedFilesCount()
+   {
+      return openedFiles.size();
+   }
+   
+   public int getIDMapSize()
+   {
+      return posFilesMap.size();
+   }
+   
+   public int getFileSize()
+   {
       return fileSize;
    }
    
@@ -1427,234 +1427,234 @@
       
       debugWait();
    }
-
-	// MessagingComponent implementation ---------------------------------------------------
-	
-	public synchronized boolean isStarted()
-	{
-	   return state != STATE_STOPPED;
-	}
-	
-	public synchronized void start()
-	{
-		if (state != STATE_STOPPED)
-		{
-			throw new IllegalStateException("Journal is not stopped");
-		}
-		
-		this.openExecutor =  Executors.newSingleThreadExecutor();
-		this.closingExecutor = Executors.newSingleThreadExecutor();
-		
-		state = STATE_STARTED;
-	}
-	
-	public synchronized void stop() throws Exception
-	{
-		if (state == STATE_STOPPED)
-		{
-			throw new IllegalStateException("Journal is already stopped");
-		}
-		
-		stopReclaimer();
-		
-		closingExecutor.shutdown();
-		if (!closingExecutor.awaitTermination(aioTimeout, TimeUnit.MILLISECONDS))
-		{
-		   throw new IllegalStateException("Time out waiting for closing executor to finish");
-		}
-		
-		if (currentFile != null)
-		{
-			currentFile.getFile().close();
-		}
-
-		openExecutor.shutdown();
+   
+   // MessagingComponent implementation ---------------------------------------------------
+   
+   public synchronized boolean isStarted()
+   {
+      return state != STATE_STOPPED;
+   }
+   
+   public synchronized void start()
+   {
+      if (state != STATE_STOPPED)
+      {
+         throw new IllegalStateException("Journal is not stopped");
+      }
+      
+      this.openExecutor =  Executors.newSingleThreadExecutor();
+      this.closingExecutor = Executors.newSingleThreadExecutor();
+      
+      state = STATE_STARTED;
+   }
+   
+   public synchronized void stop() throws Exception
+   {
+      if (state == STATE_STOPPED)
+      {
+         throw new IllegalStateException("Journal is already stopped");
+      }
+      
+      stopReclaimer();
+      
+      closingExecutor.shutdown();
+      if (!closingExecutor.awaitTermination(aioTimeout, TimeUnit.MILLISECONDS))
+      {
+         throw new IllegalStateException("Time out waiting for closing executor to finish");
+      }
+      
+      if (currentFile != null)
+      {
+         currentFile.getFile().close();
+      }
+      
+      openExecutor.shutdown();
       if (!openExecutor.awaitTermination(aioTimeout, TimeUnit.MILLISECONDS))
       {
          throw new IllegalStateException("Time out waiting for open executor to finish");
       }
       
-		for (JournalFile file: openedFiles)
-		{
-			file.getFile().close();
-		}
-		
-		currentFile = null;
-		
-		dataFiles.clear();
-		
-		freeFiles.clear();
-		
-		openedFiles.clear();
-		
-		state = STATE_STOPPED;
-	}
-	
-	public void startReclaimer()
-	{
-		if (state == STATE_STOPPED)
-		{
-			throw new IllegalStateException("Journal is stopped");
-		}
-		
-		reclaimerTask = new ReclaimerTask();
-		
-		timer.schedule(reclaimerTask, taskPeriod, taskPeriod);
-	}
-	
-	public void stopReclaimer()
-	{
-		if (state == STATE_STOPPED)
-		{
-			throw new IllegalStateException("Journal is already stopped");
-		}
-		
-		if (reclaimerTask != null)
-		{
-			reclaimerTask.cancel();
-		}
-	}
-	
-	// Public -----------------------------------------------------------------------------
-	
-	// Private -----------------------------------------------------------------------------
-
-	private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final TransactionCallback callback) throws Exception
-	{
-		lock.acquire();
-		
-		int size = bb.capacity();
-		
-		try
-		{                 
-			checkFile(size);
-			if (callback != null)
-			{
-			   currentFile.getFile().write(bb, callback);
-			   if (sync)
-			   {
-			      callback.waitCompletion(aioTimeout);
-			   }
-			}
-			else
-			{
-	         currentFile.getFile().write(bb, sync);       
-			}
-			currentFile.extendOffset(size);
-			return currentFile;
-		}
-		finally
-		{
-			lock.release();
-		}
-	}
-	
-	private void repairFrom(final int pos, final JournalFile file) throws Exception
-	{
-		log.warn("Corruption has been detected in file: " + file.getFile().getFileName() +
-				" in the record that starts at position " + pos + ". " + 
-		"The most likely cause is that a crash occurred in the previous run. The corrupt record will be discarded.");
-		
-		file.getFile().fill(pos, fileSize - pos, FILL_CHARACTER);
-		
-		file.getFile().position(pos);
-	}
-	
-	private JournalFile createFile(boolean keepOpened) throws Exception
-	{
-		long orderingID = generateOrderingID();
-		
-		String fileName = filePrefix + "-" + orderingID + "." + fileExtension;
-		
-		if (trace) log.trace("Creating file " + fileName);
-		
-		SequentialFile sequentialFile = fileFactory.createSequentialFile(fileName, maxAIO, aioTimeout);
-		
-		sequentialFile.open();
-		
-		sequentialFile.fill(0, fileSize, FILL_CHARACTER);
-		
-		ByteBuffer bb = fileFactory.newBuffer(SIZE_LONG); 
-		
-		bb.putLong(orderingID);
-		
-		bb.rewind();
-		
-		int bytesWritten = sequentialFile.write(bb, true);
-		
-		JournalFile info = new JournalFileImpl(sequentialFile, orderingID);
-		
-		info.extendOffset(bytesWritten);
-		
-		if (!keepOpened)
+      for (JournalFile file: openedFiles)
       {
+         file.getFile().close();
+      }
+      
+      currentFile = null;
+      
+      dataFiles.clear();
+      
+      freeFiles.clear();
+      
+      openedFiles.clear();
+      
+      state = STATE_STOPPED;
+   }
+   
+   public void startReclaimer()
+   {
+      if (state == STATE_STOPPED)
+      {
+         throw new IllegalStateException("Journal is stopped");
+      }
+      
+      reclaimerTask = new ReclaimerTask();
+      
+      timer.schedule(reclaimerTask, taskPeriod, taskPeriod);
+   }
+   
+   public void stopReclaimer()
+   {
+      if (state == STATE_STOPPED)
+      {
+         throw new IllegalStateException("Journal is already stopped");
+      }
+      
+      if (reclaimerTask != null)
+      {
+         reclaimerTask.cancel();
+      }
+   }
+   
+   // Public -----------------------------------------------------------------------------
+   
+   // Private -----------------------------------------------------------------------------
+   
+   private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final TransactionCallback callback) throws Exception
+   {
+      lock.acquire();
+      
+      int size = bb.capacity();
+      
+      try
+      {                 
+         checkFile(size);
+         if (callback != null)
+         {
+            currentFile.getFile().write(bb, callback);
+            if (sync)
+            {
+               callback.waitCompletion(aioTimeout);
+            }
+         }
+         else
+         {
+            currentFile.getFile().write(bb, sync);       
+         }
+         currentFile.extendOffset(size);
+         return currentFile;
+      }
+      finally
+      {
+         lock.release();
+      }
+   }
+   
+   private void repairFrom(final int pos, final JournalFile file) throws Exception
+   {
+      log.warn("Corruption has been detected in file: " + file.getFile().getFileName() +
+            " in the record that starts at position " + pos + ". " + 
+      "The most likely cause is that a crash occurred in the previous run. The corrupt record will be discarded.");
+      
+      file.getFile().fill(pos, fileSize - pos, FILL_CHARACTER);
+      
+      file.getFile().position(pos);
+   }
+   
+   private JournalFile createFile(boolean keepOpened) throws Exception
+   {
+      long orderingID = generateOrderingID();
+      
+      String fileName = filePrefix + "-" + orderingID + "." + fileExtension;
+      
+      if (trace) log.trace("Creating file " + fileName);
+      
+      SequentialFile sequentialFile = fileFactory.createSequentialFile(fileName, maxAIO, aioTimeout);
+      
+      sequentialFile.open();
+      
+      sequentialFile.fill(0, fileSize, FILL_CHARACTER);
+      
+      ByteBuffer bb = fileFactory.newBuffer(SIZE_LONG); 
+      
+      bb.putLong(orderingID);
+      
+      bb.rewind();
+      
+      int bytesWritten = sequentialFile.write(bb, true);
+      
+      JournalFile info = new JournalFileImpl(sequentialFile, orderingID);
+      
+      info.extendOffset(bytesWritten);
+      
+      if (!keepOpened)
+      {
          sequentialFile.close();
       }
-		
-		return info;
-	}
-	
-	private void openFile(JournalFile file) throws Exception
-	{
-	   file.getFile().open();
-	   file.getFile().position(file.getFile().calculateBlockStart(SIZE_LONG));
-	}
-	
-	private long generateOrderingID()
-	{
-		long orderingID = System.currentTimeMillis();
-		
-		while (orderingID == lastOrderingID)
-		{
-			//Ensure it's unique
-			try
-			{           
-				Thread.sleep(1);
-			}
-			catch (InterruptedException ignore)
-			{           
-			}
-			orderingID = System.currentTimeMillis();
-		}
-		lastOrderingID = orderingID;  
-		
-		return orderingID;
-	}
-
-	// You need to guarantee lock.acquire() over currentFile before calling this method
-	private void checkFile(final int size) throws Exception
-	{		
-		if (size % currentFile.getFile().getAlignment() != 0)
-		{
-			throw new IllegalStateException("You can't write blocks in a size different than " + currentFile.getFile().getAlignment());
-		}
-		
-		//We take into account the first timestamp long
-		if (size > fileSize - currentFile.getFile().calculateBlockStart(SIZE_HEADER))
-		{
-			throw new IllegalArgumentException("Record is too large to store " + size);
-		}
-		
-		if (currentFile == null || fileSize - currentFile.getOffset() < size)
-		{
-		   moveNextFile();
-
-		}     
-	}
-	
-	// You need to guarantee lock.acquire() before calling this method
+      
+      return info;
+   }
+   
+   private void openFile(JournalFile file) throws Exception
+   {
+      file.getFile().open();
+      file.getFile().position(file.getFile().calculateBlockStart(SIZE_LONG));
+   }
+   
+   private long generateOrderingID()
+   {
+      long orderingID = System.currentTimeMillis();
+      
+      while (orderingID == lastOrderingID)
+      {
+         //Ensure it's unique
+         try
+         {           
+            Thread.sleep(1);
+         }
+         catch (InterruptedException ignore)
+         {           
+         }
+         orderingID = System.currentTimeMillis();
+      }
+      lastOrderingID = orderingID;  
+      
+      return orderingID;
+   }
+   
+   // You need to guarantee lock.acquire() over currentFile before calling this method
+   private void checkFile(final int size) throws Exception
+   {		
+      if (size % currentFile.getFile().getAlignment() != 0)
+      {
+         throw new IllegalStateException("You can't write blocks in a size different than " + currentFile.getFile().getAlignment());
+      }
+      
+      //We take into account the first timestamp long
+      if (size > fileSize - currentFile.getFile().calculateBlockStart(SIZE_HEADER))
+      {
+         throw new IllegalArgumentException("Record is too large to store " + size);
+      }
+      
+      if (currentFile == null || fileSize - currentFile.getOffset() < size)
+      {
+         moveNextFile();
+         
+      }     
+   }
+   
+   // You need to guarantee lock.acquire() before calling this method
    private void moveNextFile() throws InterruptedException
    {
       closeFile(currentFile);
-
+      
       currentFile = enqueueOpenFile();
    }
-	
+   
    // You need to guarantee lock.acquire() before calling this method
-	private JournalFile enqueueOpenFile() throws InterruptedException
-	{
-	   if (trace) log.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
-	   openExecutor.execute(new Runnable()
+   private JournalFile enqueueOpenFile() throws InterruptedException
+   {
+      if (trace) log.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
+      openExecutor.execute(new Runnable()
       {
          public void run()
          {
@@ -1668,18 +1668,18 @@
             }
          }
       });
-	   
-	   JournalFile nextFile = openedFiles.poll(aioTimeout, TimeUnit.SECONDS);
-	   
-	   if (nextFile == null)
-	   {
+      
+      JournalFile nextFile = openedFiles.poll(aioTimeout, TimeUnit.SECONDS);
+      
+      if (nextFile == null)
+      {
          throw new IllegalStateException("Timed out waiting for an opened file");
-	   }
-	   
-	   return nextFile;
-	}
-	
-	
+      }
+      
+      return nextFile;
+   }
+   
+   
    /** 
     * 
     * Open a file and place it into the openedFiles queue
@@ -1694,7 +1694,7 @@
       catch (NoSuchElementException ignored)
       {
       }
-
+      
       if (nextOpenedFile == null)
       {
          nextOpenedFile = createFile(true);
@@ -1703,42 +1703,42 @@
       {
          openFile(nextOpenedFile);
       }
-
+      
       openedFiles.offer(nextOpenedFile);
    }
    
-	
-	private void closeFile(final JournalFile file)
-	{
-	   this.closingExecutor.execute(new Runnable() { public void run()
-	   {
-	      try
-	      {
-	         file.getFile().close();
-	      }
-	      catch (Exception e)
-	      {
-	         log.warn(e.getMessage(), e);
-	      }
-	      dataFiles.add(file);
-	   }
-	   });
-	}
-	
-	private TransactionNegPos getTransactionInfo(final long txID)
-	{
-		TransactionNegPos tx = transactionInfos.get(txID);
-		
-		if (tx == null)
-		{
-			tx = new TransactionNegPos();
-			
-			transactionInfos.put(txID, tx);
-		}
-		
-		return tx;
-	}
-	
+   
+   private void closeFile(final JournalFile file)
+   {
+      this.closingExecutor.execute(new Runnable() { public void run()
+      {
+         try
+         {
+            file.getFile().close();
+         }
+         catch (Exception e)
+         {
+            log.warn(e.getMessage(), e);
+         }
+         dataFiles.add(file);
+      }
+      });
+   }
+   
+   private TransactionNegPos getTransactionInfo(final long txID)
+   {
+      TransactionNegPos tx = transactionInfos.get(txID);
+      
+      if (tx == null)
+      {
+         tx = new TransactionNegPos();
+         
+         transactionInfos.put(txID, tx);
+      }
+      
+      return tx;
+   }
+   
    private TransactionCallback getTransactionCallback(final long transactionId)
    {
       if (fileFactory.isSupportsCallbacks() && syncTransactional)
@@ -1750,7 +1750,7 @@
             callback = new TransactionCallback();
             transactionCallbacks.put(transactionId, callback);
          }
-
+         
          callback.countUp();
          return callback;
       }
@@ -1760,8 +1760,8 @@
       }
    }
    
-	// Inner classes ---------------------------------------------------------------------------
-
+   // Inner classes ---------------------------------------------------------------------------
+   
    private static class TransactionCallback implements IOCallback
    {      
       private final VariableLatch countLatch = new VariableLatch();
@@ -1774,7 +1774,7 @@
       {
          countLatch.up();
       }
-
+      
       public void done()
       {
          countLatch.down();
@@ -1789,7 +1789,7 @@
             throw new IllegalStateException("Error on Transaction: " + errorCode + " - " + errorMessage);
          }
       }
-
+      
       public void onError(final int errorCode, final String errorMessage)
       {
          this.errorMessage = errorMessage;
@@ -1798,193 +1798,193 @@
       }
       
    }
-	
-	private class ReclaimerTask extends TimerTask
-	{
-		public synchronized boolean cancel()
-		{
-			timer.cancel();
-			
-			return super.cancel();
-		}
-		
-		public synchronized void run()
-		{
-			try
-			{
-				checkAndReclaimFiles();    
-			}
-			catch (Exception e)
-			{
-				log.error("Failure in running ReclaimerTask", e);
-				
-				cancel();
-			}
-		}     
-	}  
-	
-	private static class PosFiles
-	{
-		private final JournalFile addFile;
-		
-		private List<JournalFile> updateFiles;
-		
-		PosFiles(final JournalFile addFile)
-		{
-			this.addFile = addFile;
-			
-			addFile.incPosCount();
-		}
-		
-		void addUpdateFile(final JournalFile updateFile)
-		{
-			if (updateFiles == null)
-			{
-				updateFiles = new ArrayList<JournalFile>();
-			}
-			
-			updateFiles.add(updateFile);
-			
-			updateFile.incPosCount();
-		}
-		
-		void addDelete(final JournalFile file)
-		{
-			file.incNegCount(addFile);
-			
-			if (updateFiles != null)
-			{
-				for (JournalFile jf: updateFiles)
-				{
-					file.incNegCount(jf);
-				}
-			}
-		}
-	}
-	
-	private class TransactionNegPos
-	{
-		private List<Pair<JournalFile, Long>> pos;
-		
-		private List<Pair<JournalFile, Long>> neg;
-		
-		private Set<JournalFile> transactionPos;
-		
-		void addTXPosCount(final JournalFile file)
-		{
-			if (transactionPos == null)
-			{
-				transactionPos = new HashSet<JournalFile>();
-			}
-			
-			if (!transactionPos.contains(file))
-			{
-				transactionPos.add(file);
-				
-				//We add a pos for the transaction itself in the file - this prevents any transactional operations
-				//being deleted before a commit or rollback is written
-				file.incPosCount();
-			}  
-		}
-		
-		void addPos(final JournalFile file, final long id)
-		{     
-			addTXPosCount(file);          
-			
-			if (pos == null)
-			{
-				pos = new ArrayList<Pair<JournalFile, Long>>();
-			}
-			
-			pos.add(new Pair<JournalFile, Long>(file, id));
-		}
-		
-		void addNeg(final JournalFile file, final long id)
-		{        
-			addTXPosCount(file);    
-			
-			if (neg == null)
-			{
-				neg = new ArrayList<Pair<JournalFile, Long>>();
-			}
-			
-			neg.add(new Pair<JournalFile, Long>(file, id));       
-		}
-		
-		void commit(final JournalFile file)
-		{        
-			if (pos != null)
-			{
-				for (Pair<JournalFile, Long> p: pos)
-				{
-					PosFiles posFiles = posFilesMap.get(p.b);
-					
-					if (posFiles == null)
-					{
-						posFiles = new PosFiles(p.a);
-						
-						posFilesMap.put(p.b, posFiles);
-					}
-					else
-					{              
-						posFiles.addUpdateFile(p.a);
-					}
-				}
-			}
-			
-			if (neg != null)
-			{
-				for (Pair<JournalFile, Long> n: neg)
-				{
-					PosFiles posFiles = posFilesMap.remove(n.b);
-					
-					if (posFiles != null)
-					{
-						//throw new IllegalStateException("Cannot find add info " + n.b);
-						posFiles.addDelete(n.a);
-					}
-					
-				}
-			}
-			
-			//Now add negs for the pos we added in each file in which there were transactional operations
-			
-			for (JournalFile jf: transactionPos)
-			{
-				file.incNegCount(jf);
-			}        
-		}
-		
-		void rollback(JournalFile file)
-		{     
-			//Now add negs for the pos we added in each file in which there were transactional operations
-			//Note that we do this on rollback as we do on commit, since we need to ensure the file containing
-			//the rollback record doesn't get deleted before the files with the transactional operations are deleted
-			//Otherwise we may run into problems especially with XA where we are just left with a prepare when the tx
-			//has actually been rolled back
-			
-			for (JournalFile jf: transactionPos)
-			{
-				file.incNegCount(jf);
-			}
-		}
-		
-		void prepare(JournalFile file)
-		{
-			//We don't want the prepare record getting deleted before time
-			
-			addTXPosCount(file);
-		}
-		
-		void forget()
-		{
-			//The transaction was not committed or rolled back in the file, so we reverse any pos counts we added
-			
-			for (JournalFile jf: transactionPos)
-			{
-				jf.decPosCount();
-			}
-		}
-	}
-
+   
+   private class ReclaimerTask extends TimerTask
+   {
+      public synchronized boolean cancel()
+      {
+         timer.cancel();
+         
+         return super.cancel();
+      }
+      
+      public synchronized void run()
+      {
+         try
+         {
+            checkAndReclaimFiles();    
+         }
+         catch (Exception e)
+         {
+            log.error("Failure in running ReclaimerTask", e);
+            
+            cancel();
+         }
+      }     
+   }  
+   
+   private static class PosFiles
+   {
+      private final JournalFile addFile;
+      
+      private List<JournalFile> updateFiles;
+      
+      PosFiles(final JournalFile addFile)
+      {
+         this.addFile = addFile;
+         
+         addFile.incPosCount();
+      }
+      
+      void addUpdateFile(final JournalFile updateFile)
+      {
+         if (updateFiles == null)
+         {
+            updateFiles = new ArrayList<JournalFile>();
+         }
+         
+         updateFiles.add(updateFile);
+         
+         updateFile.incPosCount();
+      }
+      
+      void addDelete(final JournalFile file)
+      {
+         file.incNegCount(addFile);
+         
+         if (updateFiles != null)
+         {
+            for (JournalFile jf: updateFiles)
+            {
+               file.incNegCount(jf);
+            }
+         }
+      }
+   }
+   
+   private class TransactionNegPos
+   {
+      private List<Pair<JournalFile, Long>> pos;
+      
+      private List<Pair<JournalFile, Long>> neg;
+      
+      private Set<JournalFile> transactionPos;
+      
+      void addTXPosCount(final JournalFile file)
+      {
+         if (transactionPos == null)
+         {
+            transactionPos = new HashSet<JournalFile>();
+         }
+         
+         if (!transactionPos.contains(file))
+         {
+            transactionPos.add(file);
+            
+            //We add a pos for the transaction itself in the file - this prevents any transactional operations
+            //being deleted before a commit or rollback is written
+            file.incPosCount();
+         }  
+      }
+      
+      void addPos(final JournalFile file, final long id)
+      {     
+         addTXPosCount(file);          
+         
+         if (pos == null)
+         {
+            pos = new ArrayList<Pair<JournalFile, Long>>();
+         }
+         
+         pos.add(new Pair<JournalFile, Long>(file, id));
+      }
+      
+      void addNeg(final JournalFile file, final long id)
+      {        
+         addTXPosCount(file);    
+         
+         if (neg == null)
+         {
+            neg = new ArrayList<Pair<JournalFile, Long>>();
+         }
+         
+         neg.add(new Pair<JournalFile, Long>(file, id));       
+      }
+      
+      void commit(final JournalFile file)
+      {        
+         if (pos != null)
+         {
+            for (Pair<JournalFile, Long> p: pos)
+            {
+               PosFiles posFiles = posFilesMap.get(p.b);
+               
+               if (posFiles == null)
+               {
+                  posFiles = new PosFiles(p.a);
+                  
+                  posFilesMap.put(p.b, posFiles);
+               }
+               else
+               {              
+                  posFiles.addUpdateFile(p.a);
+               }
+            }
+         }
+         
+         if (neg != null)
+         {
+            for (Pair<JournalFile, Long> n: neg)
+            {
+               PosFiles posFiles = posFilesMap.remove(n.b);
+               
+               if (posFiles != null)
+               {
+                  //throw new IllegalStateException("Cannot find add info " + n.b);
+                  posFiles.addDelete(n.a);
+               }
+               
+            }
+         }
+         
+         //Now add negs for the pos we added in each file in which there were transactional operations
+         
+         for (JournalFile jf: transactionPos)
+         {
+            file.incNegCount(jf);
+         }        
+      }
+      
+      void rollback(JournalFile file)
+      {     
+         //Now add negs for the pos we added in each file in which there were transactional operations
+         //Note that we do this on rollback as we do on commit, since we need to ensure the file containing
+         //the rollback record doesn't get deleted before the files with the transactional operations are deleted
+         //Otherwise we may run into problems especially with XA where we are just left with a prepare when the tx
+         //has actually been rolled back
+         
+         for (JournalFile jf: transactionPos)
+         {
+            file.incNegCount(jf);
+         }
+      }
+      
+      void prepare(JournalFile file)
+      {
+         //We don't want the prepare record getting deleted before time
+         
+         addTXPosCount(file);
+      }
+      
+      void forget()
+      {
+         //The transaction was not committed or rolled back in the file, so we reverse any pos counts we added
+         
+         for (JournalFile jf: transactionPos)
+         {
+            jf.decPosCount();
+         }
+      }
+   }
+   
 }




More information about the jboss-cvs-commits mailing list