[jboss-cvs] JBoss Messaging SVN: r7504 - in branches/clebert_temp_expirement: src/main/org/jboss/messaging/core/journal/impl and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jun 30 18:30:19 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-06-30 18:30:18 -0400 (Tue, 30 Jun 2009)
New Revision: 7504

Added:
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalReaderCallbackAbstract.java
Modified:
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/SequentialFile.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Checking for control file on journal startup

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2009-06-30 17:42:50 UTC (rev 7503)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2009-06-30 22:30:18 UTC (rev 7504)
@@ -42,6 +42,8 @@
    void open() throws Exception;
    
    boolean isOpen();
+   
+   boolean exists();
 
    /**
     * For certain operations (like loading) we don't need open the file with full maxIO

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2009-06-30 17:42:50 UTC (rev 7503)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2009-06-30 22:30:18 UTC (rev 7504)
@@ -116,6 +116,11 @@
 
       return aioFile.getBlockSize();
    }
+   
+   public boolean exists()
+   {
+      return file.exists();
+   }
 
    public int calculateBlockStart(final int position) throws Exception
    {

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java	2009-06-30 17:42:50 UTC (rev 7503)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java	2009-06-30 22:30:18 UTC (rev 7504)
@@ -38,6 +38,7 @@
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.journal.impl.JournalImpl.JournalRecord;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.utils.ConcurrentHashSet;
 import org.jboss.messaging.utils.DataConstants;
 
@@ -51,6 +52,8 @@
 public class JournalCompactor implements JournalReaderCallback
 {
 
+   private static final String FILE_COMPACT_CONTROL = "journal-rename-control.ctr";
+
    private static final Logger log = Logger.getLogger(JournalCompactor.class);
 
    private final JournalImpl journal;
@@ -63,7 +66,7 @@
 
    private int fileID;
 
-   private ChannelBuffer channelWrapper;
+   private ChannelBuffer writingChannel;
 
    private int nextOrderingID;
 
@@ -83,6 +86,135 @@
     *  we cache those updates during compacting. As soon as we are done we take the right account. */
    private final LinkedList<CompactCommand> pendingCommands = new LinkedList<CompactCommand>();
 
+   /**
+    * @param tmpRenameFile
+    * @param files
+    * @param newFiles
+    */
+   public static SequentialFile writeControlFile(final SequentialFileFactory fileFactory,
+                                                 final List<JournalFile> files,
+                                                 final List<JournalFile> newFiles) throws Exception
+   {
+
+      System.out.println("****************************** controlFiles (deleted)");
+
+      for (JournalFile file : files)
+      {
+         System.out.println("To be deleted:" + file.getFile().getFileName());
+      }
+
+      System.out.println("****************************** controlFiles (renamed)");
+
+      for (JournalFile file : newFiles)
+      {
+         System.out.println("To be renamed:" + file.getFile().getFileName());
+      }
+
+      SequentialFile controlFile = fileFactory.createSequentialFile(FILE_COMPACT_CONTROL, 1);
+
+      try
+      {
+         controlFile.open(1);
+
+         ChannelBuffer renameBuffer = ChannelBuffers.dynamicBuffer(1);
+
+         renameBuffer.writeInt(-1);
+         renameBuffer.writeInt(-1);
+
+         MessagingBuffer filesToRename = ChannelBuffers.dynamicBuffer(1);
+
+         // DataFiles first
+
+         filesToRename.writeInt(files.size());
+
+         for (JournalFile file : files)
+         {
+            filesToRename.writeUTF(file.getFile().getFileName());
+         }
+
+         filesToRename.writeInt(newFiles.size());
+
+         for (JournalFile file : newFiles)
+         {
+            filesToRename.writeUTF(file.getFile().getFileName());
+         }
+
+         JournalImpl.writeAddRecord(-1,
+                                    1,
+                                    (byte)0,
+                                    new JournalImpl.ByteArrayEncoding(filesToRename.array()),
+                                    JournalImpl.SIZE_ADD_RECORD + filesToRename.array().length,
+                                    renameBuffer);
+
+         ByteBuffer writeBuffer = fileFactory.newBuffer(renameBuffer.writerIndex());
+
+         writeBuffer.put(renameBuffer.array(), 0, renameBuffer.writerIndex());
+
+         writeBuffer.rewind();
+
+         controlFile.write(writeBuffer, true);
+
+         return controlFile;
+      }
+      finally
+      {
+         controlFile.close();
+      }
+   }
+
+   public static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
+                                                final List<String> dataFiles,
+                                                final List<String> newFiles) throws Exception
+   {
+      SequentialFile controlFile = fileFactory.createSequentialFile(FILE_COMPACT_CONTROL, 1);
+
+      if (controlFile.exists())
+      {
+         JournalFile file = new JournalFileImpl(controlFile, -1, -1);
+
+         final ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
+
+         JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallbackAbstract()
+         {
+            @Override
+            public void onReadAddRecord(final RecordInfo info) throws Exception
+            {
+               records.add(info);
+            }
+         });
+
+         if (records.size() == 0)
+         {
+            return null;
+         }
+         else
+         {
+            ChannelBuffer input = ChannelBuffers.wrappedBuffer(records.get(0).data);
+
+            int numberDataFiles = input.readInt();
+
+            for (int i = 0; i < numberDataFiles; i++)
+            {
+               dataFiles.add(input.readUTF());
+            }
+
+            int numberNewFiles = input.readInt();
+
+            for (int i = 0; i < numberNewFiles; i++)
+            {
+               newFiles.add(input.readUTF());
+            }
+
+         }
+
+         return controlFile;
+      }
+      else
+      {
+         return null;
+      }
+   }
+
    public List<JournalFile> getNewDataFiles()
    {
       return newDataFiles;
@@ -136,14 +268,20 @@
 
       /** If a delete comes for these records, while the compactor still working, we need to be able to take them into account for later deletes
        *  instead of throwing exceptions about non existent records */
-      for (long id : ids)
+      if (ids != null)
       {
-         recordsSnapshot.add(id);
+         for (long id : ids)
+         {
+            recordsSnapshot.add(id);
+         }
       }
 
-      for (long id : ids2)
+      if (ids2 != null)
       {
-         recordsSnapshot.add(id);
+         for (long id : ids2)
+         {
+            recordsSnapshot.add(id);
+         }
       }
    }
 
@@ -180,13 +318,13 @@
 
    private void checkSize(final int size) throws Exception
    {
-      if (channelWrapper == null)
+      if (writingChannel == null)
       {
          openFile();
       }
       else
       {
-         if (channelWrapper.writerIndex() + size > channelWrapper.capacity())
+         if (writingChannel.writerIndex() + size > writingChannel.capacity())
          {
             openFile();
          }
@@ -196,15 +334,15 @@
    /** Write pending output into file */
    public void flush() throws Exception
    {
-      if (channelWrapper != null)
+      if (writingChannel != null)
       {
          sequentialFile.position(0);
-         sequentialFile.write(channelWrapper.toByteBuffer(), true);
+         sequentialFile.write(writingChannel.toByteBuffer(), true);
          sequentialFile.close();
          newDataFiles.add(currentFile);
       }
 
-      channelWrapper = null;
+      writingChannel = null;
    }
 
    /**
@@ -238,12 +376,12 @@
 
          checkSize(size);
 
-         journal.writeAddRecord(fileID,
-                                info.id,
-                                info.getUserRecordType(),
-                                new JournalImpl.ByteArrayEncoding(info.data),
-                                size,
-                                channelWrapper);
+         JournalImpl.writeAddRecord(fileID,
+                                    info.id,
+                                    info.getUserRecordType(),
+                                    new JournalImpl.ByteArrayEncoding(info.data),
+                                    size,
+                                    writingChannel);
 
          newRecords.put(info.id, new JournalRecord(currentFile));
       }
@@ -261,13 +399,13 @@
 
          newTransaction.addPositive(currentFile, info.id);
 
-         journal.writeAddRecordTX(fileID,
-                                  transactionID,
-                                  info.id,
-                                  info.getUserRecordType(),
-                                  new JournalImpl.ByteArrayEncoding(info.data),
-                                  size,
-                                  channelWrapper);
+         JournalImpl.writeAddRecordTX(fileID,
+                                      transactionID,
+                                      info.id,
+                                      info.getUserRecordType(),
+                                      new JournalImpl.ByteArrayEncoding(info.data),
+                                      size,
+                                      writingChannel);
       }
       else
       {
@@ -306,12 +444,12 @@
 
          checkSize(size);
 
-         journal.writeDeleteRecordTransactional(fileID,
-                                                transactionID,
-                                                info.id,
-                                                new JournalImpl.ByteArrayEncoding(info.data),
-                                                size,
-                                                channelWrapper);
+         JournalImpl.writeDeleteRecordTransactional(fileID,
+                                                    transactionID,
+                                                    info.id,
+                                                    new JournalImpl.ByteArrayEncoding(info.data),
+                                                    size,
+                                                    writingChannel);
 
          newTransaction.addNegative(currentFile, info.id);
       }
@@ -334,14 +472,14 @@
 
          checkSize(size);
 
-         journal.writeTransaction(fileID,
-                                  JournalImpl.PREPARE_RECORD,
-                                  transactionID,
-                                  newTransaction,
-                                  new JournalImpl.ByteArrayEncoding(extraData),
-                                  size,
-                                  newTransaction.getCounter(currentFile),
-                                  channelWrapper);
+         JournalImpl.writeTransaction(fileID,
+                                      JournalImpl.PREPARE_RECORD,
+                                      transactionID,
+                                      newTransaction,
+                                      new JournalImpl.ByteArrayEncoding(extraData),
+                                      size,
+                                      newTransaction.getCounter(currentFile),
+                                      writingChannel);
 
          newTransaction.prepare(currentFile);
 
@@ -372,16 +510,18 @@
          {
             log.warn("Couldn't find addRecord information for record " + info.id + " during compacting");
          }
+         else
+         {
+            newRecord.addUpdateFile(currentFile);
+         }
 
-         journal.writeUpdateRecord(fileID,
-                                   info.id,
-                                   info.userRecordType,
-                                   new JournalImpl.ByteArrayEncoding(info.data),
-                                   size,
-                                   channelWrapper);
+         JournalImpl.writeUpdateRecord(fileID,
+                                       info.id,
+                                       info.userRecordType,
+                                       new JournalImpl.ByteArrayEncoding(info.data),
+                                       size,
+                                       writingChannel);
 
-         newRecord.addUpdateFile(currentFile);
-
       }
    }
 
@@ -395,13 +535,13 @@
 
          checkSize(size);
 
-         journal.writeUpdateRecordTX(fileID,
-                                     transactionID,
-                                     info.id,
-                                     info.userRecordType,
-                                     new JournalImpl.ByteArrayEncoding(info.data),
-                                     size,
-                                     channelWrapper);
+         JournalImpl.writeUpdateRecordTX(fileID,
+                                         transactionID,
+                                         info.id,
+                                         info.userRecordType,
+                                         new JournalImpl.ByteArrayEncoding(info.data),
+                                         size,
+                                         writingChannel);
 
          newTransaction.addPositive(currentFile, info.id);
       }
@@ -434,7 +574,7 @@
       flush();
 
       ByteBuffer bufferWrite = fileFactory.newBuffer(journal.getFileSize());
-      channelWrapper = ChannelBuffers.wrappedBuffer(bufferWrite);
+      writingChannel = ChannelBuffers.wrappedBuffer(bufferWrite);
 
       currentFile = journal.getFile(false, false, false);
       sequentialFile = currentFile.getFile();
@@ -443,8 +583,8 @@
       fileID = nextOrderingID++;
       currentFile = new JournalFileImpl(sequentialFile, fileID, fileID);
 
-      channelWrapper.writeInt(fileID);
-      channelWrapper.writeInt(fileID);
+      writingChannel.writeInt(fileID);
+      writingChannel.writeInt(fileID);
    }
 
    private static abstract class CompactCommand
@@ -473,7 +613,7 @@
       }
    }
 
-   private class PendingTransaction
+   private static class PendingTransaction
    {
       long pendingIDs[];
 

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-30 17:42:50 UTC (rev 7503)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-30 22:30:18 UTC (rev 7504)
@@ -107,8 +107,6 @@
       // log.trace(message);
    }
 
-   private static final String FILE_COMPACT_CONTROL = "journal-rename-control.ctr";
-
    // The sizes of primitive types
 
    public static final int MIN_FILE_SIZE = 1024;
@@ -117,25 +115,37 @@
 
    public static final int BASIC_SIZE = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_INT;
 
-   public static final int SIZE_ADD_RECORD = BASIC_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT /* + record.length */;
+   public static final int SIZE_ADD_RECORD = BASIC_SIZE + DataConstants.SIZE_LONG +
+                                             DataConstants.SIZE_BYTE +
+                                             DataConstants.SIZE_INT /* + record.length */;
 
    // Record markers - they must be all unique
 
    public static final byte ADD_RECORD = 11;
 
-   public static final byte SIZE_UPDATE_RECORD = BASIC_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT /* + record.length */;
+   public static final byte SIZE_UPDATE_RECORD = BASIC_SIZE + DataConstants.SIZE_LONG +
+                                                 DataConstants.SIZE_BYTE +
+                                                 DataConstants.SIZE_INT /* + record.length */;
 
    public static final byte UPDATE_RECORD = 12;
 
-   public static final int SIZE_ADD_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT /* + record.length */;
+   public static final int SIZE_ADD_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG +
+                                                DataConstants.SIZE_BYTE +
+                                                DataConstants.SIZE_LONG +
+                                                DataConstants.SIZE_INT /* + record.length */;
 
    public static final byte ADD_RECORD_TX = 13;
 
-   public static final int SIZE_UPDATE_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT /* + record.length */;
+   public static final int SIZE_UPDATE_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG +
+                                                   DataConstants.SIZE_BYTE +
+                                                   DataConstants.SIZE_LONG +
+                                                   DataConstants.SIZE_INT /* + record.length */;
 
    public static final byte UPDATE_RECORD_TX = 14;
 
-   public static final int SIZE_DELETE_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + DataConstants.SIZE_INT /* + record.length */;
+   public static final int SIZE_DELETE_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG +
+                                                   DataConstants.SIZE_LONG +
+                                                   DataConstants.SIZE_INT /* + record.length */;
 
    public static final byte DELETE_RECORD_TX = 15;
 
@@ -143,7 +153,8 @@
 
    public static final byte DELETE_RECORD = 16;
 
-   public static final int SIZE_COMPLETE_TRANSACTION_RECORD = BASIC_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
+   public static final int SIZE_COMPLETE_TRANSACTION_RECORD = BASIC_SIZE + DataConstants.SIZE_LONG +
+                                                              DataConstants.SIZE_INT;
 
    public static final int SIZE_PREPARE_RECORD = SIZE_COMPLETE_TRANSACTION_RECORD + DataConstants.SIZE_INT;
 
@@ -258,14 +269,14 @@
 
       this.maxAIO = maxAIO;
    }
-   
+
    // Public methods (used by package members) (those are not part of the JournalImpl interface)
-   
+
    public Map<Long, JournalRecord> getRecords()
    {
       return records;
    }
-   
+
    public JournalFile getCurrentFile()
    {
       return currentFile;
@@ -839,7 +850,6 @@
          throw new IllegalStateException("There is pending compacting operation");
       }
 
-
       ArrayList<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>(dataFiles.size());
 
       boolean previousReclaimValue = autoReclaim;
@@ -859,12 +869,15 @@
             autoReclaim = false;
 
             // Take the snapshots and replace the structures
- 
+
             dataFilesToProcess.addAll(dataFiles);
 
             dataFiles.clear();
 
-            this.compactor = new JournalCompactor(fileFactory, this, this.records.keySet(), dataFilesToProcess.get(0).getFileID());
+            this.compactor = new JournalCompactor(fileFactory,
+                                                  this,
+                                                  this.records.keySet(),
+                                                  dataFilesToProcess.get(0).getFileID());
 
             for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
             {
@@ -872,7 +885,8 @@
                entry.getValue().setCompacting();
             }
 
-            // We will calculate the new records during compacting, what will take the position the records will take after compacting
+            // We will calculate the new records during compacting, what will take the position the records will take
+            // after compacting
             this.records.clear();
          }
          finally
@@ -880,7 +894,8 @@
             compactingLock.writeLock().unlock();
          }
 
-         // Read the files, and use the JournalCompactor class to create the new outputFiles, and the new collections as well
+         // Read the files, and use the JournalCompactor class to create the new outputFiles, and the new collections as
+         // well
          JournalFile previousFile = null;
          for (final JournalFile file : dataFilesToProcess)
          {
@@ -895,7 +910,7 @@
             previousFile = file;
 
             log.info("Compacting file " + file.getFile().getFileName() + ", internalID = " + file.getFileID());
-            readJournalFile(file, compactor);
+            readJournalFile(fileFactory, file, compactor);
          }
 
          compactor.flush();
@@ -910,7 +925,7 @@
          List<JournalFile> newDatafiles = null;
 
          JournalCompactor localCompactor = compactor;
-         
+
          compactingLock.writeLock().lock();
          try
          {
@@ -935,31 +950,31 @@
                }
                dataFiles.addFirst(fileToAdd);
             }
-            
 
             // Replay pending commands (including updates, deletes and commits)
-            
+
             localCompactor.replayPendingCommands();
 
             // Merge transactions back after compacting
-            
+
             for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values())
             {
-               if (trace) 
+               if (trace)
                {
-                  trace("Merging pending transaction " +  newTransaction + " after compacting to the journal");
+                  trace("Merging pending transaction " + newTransaction + " after compacting to the journal");
                }
                JournalTransaction liveTransaction = this.transactions.get(newTransaction.getId());
                if (liveTransaction == null)
                {
-                  log.warn("Inconsistency: Can't merge transaction " + newTransaction.getId() + " back into JournalTransactions");
+                  log.warn("Inconsistency: Can't merge transaction " + newTransaction.getId() +
+                           " back into JournalTransactions");
                }
                else
                {
                   liveTransaction.merge(newTransaction);
                }
             }
-            
+
             for (Map.Entry<Long, JournalRecord> record : records.entrySet())
             {
                trace("We have " + record.getKey() + " on the list now");
@@ -971,6 +986,7 @@
             compactingLock.writeLock().unlock();
          }
 
+         // At this point the journal is unlocked. We keep renaming files while the journal is already operational
          renameFiles(dataFilesToProcess, newDatafiles);
          deleteControlFile(controlFile);
 
@@ -1032,44 +1048,10 @@
     */
    protected SequentialFile createControlFile(List<JournalFile> files, List<JournalFile> newFiles) throws Exception
    {
-      SequentialFile tmpRenameFile = fileFactory.createSequentialFile(FILE_COMPACT_CONTROL, 1);
-
-      tmpRenameFile.open();
-
-      ChannelBuffer renameBuffer = ChannelBuffers.dynamicBuffer(1);
-
-      renameBuffer.writeInt(-1);
-      renameBuffer.writeInt(-1);
-
-      MessagingBuffer filesToRename = ChannelBuffers.dynamicBuffer(1);
-
-      // DataFiles first
-
-      filesToRename.writeInt(files.size());
-
-      for (JournalFile file : files)
-      {
-         filesToRename.writeUTF(file.getFile().getFileName());
-         filesToRename.writeInt(file.getFileID()); // The File ID that needs to be renamed
-      }
-
-      filesToRename.writeInt(newFiles.size());
-
-      for (JournalFile file : newFiles)
-      {
-         filesToRename.writeUTF(file.getFile().getFileName());
-      }
-
-      writeAddRecord(-1, 1, (byte)0, new ByteArrayEncoding(filesToRename.array()), SIZE_ADD_RECORD, renameBuffer);
-
-      tmpRenameFile.write(renameBuffer, true);
-
-      tmpRenameFile.close();
-
-      return tmpRenameFile;
+      return JournalCompactor.writeControlFile(fileFactory, files, newFiles);
    }
 
-   private boolean isInvalidSize(int bufferPos, int size)
+   private static boolean isInvalidSize(int fileSize, int bufferPos, int size)
    {
       if (size < 0)
       {
@@ -1125,6 +1107,8 @@
          throw new IllegalStateException("Journal must be in started state");
       }
 
+      checkControlFile();
+
       final Map<Long, TransactionHolder> loadTransactions = new LinkedHashMap<Long, TransactionHolder>();
 
       final List<JournalFile> orderedFiles = orderFiles();
@@ -1139,7 +1123,7 @@
 
          final AtomicBoolean hasData = new AtomicBoolean(false);
 
-         int resultLastPost = readJournalFile(file, new JournalReaderCallback()
+         int resultLastPost = readJournalFile(fileFactory, file, new JournalReaderCallback()
          {
 
             public void onReadAddRecord(RecordInfo info) throws Exception
@@ -1509,6 +1493,62 @@
       return maxID;
    }
 
+   /**
+    * @return
+    * @throws Exception
+    */
+   private void checkControlFile() throws Exception
+   {
+      ArrayList<String> dataFiles = new ArrayList<String>();
+      ArrayList<String> newFiles = new ArrayList<String>();
+
+      SequentialFile controlFile = JournalCompactor.readControlFile(fileFactory, dataFiles, newFiles);
+      if (controlFile != null)
+      {
+         log.info("Journal Compactor was interrupted during renaming phase, renaming files");
+
+         for (String dataFile : dataFiles)
+         {
+            SequentialFile file = fileFactory.createSequentialFile(dataFile, 1);
+            log.info("Removing old compacted file" + file.getFileName());
+            if (file.exists())
+            {
+               file.delete();
+            }
+         }
+
+         for (String newFile : newFiles)
+         {
+            SequentialFile file = fileFactory.createSequentialFile(newFile, 1);
+            log.info("Renaming file " + file.getFileName() + " as an part of the data files");
+            if (file.exists())
+            {
+               final String originalName = file.getFileName();
+               final String newName = originalName.substring(0, originalName.lastIndexOf(".cmp"));
+               file.renameTo(newName);
+            }
+         }
+         
+         controlFile.delete();
+      }
+      
+      
+      List<String> leftFiles = fileFactory.listFiles(this.getFileExtension() + ".cmp");
+      
+      if (leftFiles.size() > 0)
+      {
+         log.warn("Compacted files were left unnatended on journal directory, deleting invalid files now");
+         
+         for (String fileToDelete : leftFiles)
+         {
+            SequentialFile file = fileFactory.createSequentialFile(fileToDelete, 1);
+            file.delete();
+         }
+      }
+      
+      return;
+   }
+
    public int getAlignment() throws Exception
    {
       return fileFactory.getAlignment();
@@ -1827,7 +1867,7 @@
       return jf;
    }
 
-   public int readJournalFile(JournalFile file, JournalReaderCallback reader) throws Exception
+   public static int readJournalFile(SequentialFileFactory fileFactory, JournalFile file, JournalReaderCallback reader) throws Exception
    {
 
       file.getFile().open(1);
@@ -1837,9 +1877,9 @@
 
          wholeFileBuffer = fileFactory.newBuffer((int)file.getFile().size());
 
-         int bytesRead = file.getFile().read(wholeFileBuffer);
+         final int journalFileSize = file.getFile().read(wholeFileBuffer);
 
-         if (bytesRead != file.getFile().size())
+         if (journalFileSize != file.getFile().size())
          {
             throw new RuntimeException("Invalid read! The system couldn't read the entire file into memory");
          }
@@ -1865,7 +1905,7 @@
                continue;
             }
 
-            if (isInvalidSize(wholeFileBuffer.position(), DataConstants.SIZE_INT))
+            if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT))
             {
                reader.markAsDataFile(file);
 
@@ -1882,7 +1922,7 @@
 
             if (isTransaction(recordType))
             {
-               if (isInvalidSize(wholeFileBuffer.position(), DataConstants.SIZE_LONG))
+               if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_LONG))
                {
                   wholeFileBuffer.position(pos + 1);
                   reader.markAsDataFile(file);
@@ -1897,7 +1937,7 @@
             // If prepare or commit
             if (!isCompleteTransaction(recordType))
             {
-               if (isInvalidSize(wholeFileBuffer.position(), DataConstants.SIZE_LONG))
+               if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_LONG))
                {
                   wholeFileBuffer.position(pos + 1);
                   reader.markAsDataFile(file);
@@ -1923,7 +1963,7 @@
 
             if (isContainsBody(recordType))
             {
-               if (isInvalidSize(wholeFileBuffer.position(), DataConstants.SIZE_INT))
+               if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT))
                {
                   wholeFileBuffer.position(pos + 1);
                   reader.markAsDataFile(file);
@@ -1932,7 +1972,7 @@
 
                variableSize = wholeFileBuffer.getInt();
 
-               if (isInvalidSize(wholeFileBuffer.position(), variableSize))
+               if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), variableSize))
                {
                   wholeFileBuffer.position(pos + 1);
                   continue;
@@ -1969,7 +2009,7 @@
             // VI - this is completing V, We will validate the size at the end
             // of the record,
             // But we avoid buffer overflows by damaged data
-            if (isInvalidSize(pos, recordSize + variableSize + preparedTransactionExtraDataSize))
+            if (isInvalidSize(journalFileSize, pos, recordSize + variableSize + preparedTransactionExtraDataSize))
             {
                // Avoid a buffer overflow caused by damaged data... continue
                // scanning for more pendingTransactions...
@@ -1995,7 +2035,10 @@
 
             int oldPos = wholeFileBuffer.position();
 
-            wholeFileBuffer.position(pos + variableSize + recordSize + preparedTransactionExtraDataSize - DataConstants.SIZE_INT);
+            wholeFileBuffer.position(pos + variableSize +
+                                     recordSize +
+                                     preparedTransactionExtraDataSize -
+                                     DataConstants.SIZE_INT);
 
             int checkSize = wholeFileBuffer.getInt();
 
@@ -2182,14 +2225,14 @@
     * @return
     * @throws Exception
     */
-   void writeTransaction(final int fileID,
-                                 final byte recordType,
-                                 final long txID,
-                                 final JournalTransaction tx,
-                                 final EncodingSupport transactionData,
-                                 final int size,
-                                 final int numberOfRecords,
-                                 final ChannelBuffer bb) throws Exception
+   public static void writeTransaction(final int fileID,
+                                       final byte recordType,
+                                       final long txID,
+                                       final JournalTransaction tx,
+                                       final EncodingSupport transactionData,
+                                       final int size,
+                                       final int numberOfRecords,
+                                       final ChannelBuffer bb) throws Exception
    {
       bb.writeByte(recordType);
       bb.writeInt(fileID); // skip ID part
@@ -2217,13 +2260,13 @@
     * @param size
     * @param bb
     */
-   void writeUpdateRecordTX(final int fileID,
-                                    final long txID,
-                                    final long id,
-                                    final byte recordType,
-                                    final EncodingSupport record,
-                                    int size,
-                                    ChannelBuffer bb)
+   public static void writeUpdateRecordTX(final int fileID,
+                                          final long txID,
+                                          final long id,
+                                          final byte recordType,
+                                          final EncodingSupport record,
+                                          int size,
+                                          ChannelBuffer bb)
    {
       bb.writeByte(UPDATE_RECORD_TX);
       bb.writeInt(fileID);
@@ -2242,12 +2285,12 @@
     * @param size
     * @param bb
     */
-   void writeUpdateRecord(final int fileId,
-                                  final long id,
-                                  final byte recordType,
-                                  final EncodingSupport record,
-                                  int size,
-                                  ChannelBuffer bb)
+   public static void writeUpdateRecord(final int fileId,
+                                        final long id,
+                                        final byte recordType,
+                                        final EncodingSupport record,
+                                        int size,
+                                        ChannelBuffer bb)
    {
       bb.writeByte(UPDATE_RECORD);
       bb.writeInt(fileId);
@@ -2265,12 +2308,12 @@
     * @param size
     * @param bb
     */
-   void writeAddRecord(final int fileId,
-                               final long id,
-                               final byte recordType,
-                               final EncodingSupport record,
-                               int size,
-                               ChannelBuffer bb)
+   public static void writeAddRecord(final int fileId,
+                                     final long id,
+                                     final byte recordType,
+                                     final EncodingSupport record,
+                                     int size,
+                                     ChannelBuffer bb)
    {
       bb.writeByte(ADD_RECORD);
       bb.writeInt(fileId);
@@ -2288,12 +2331,12 @@
     * @param size
     * @param bb
     */
-   void writeDeleteRecordTransactional(final int fileID,
-                                               final long txID,
-                                               final long id,
-                                               final EncodingSupport record,
-                                               int size,
-                                               ChannelBuffer bb)
+   public static void writeDeleteRecordTransactional(final int fileID,
+                                                     final long txID,
+                                                     final long id,
+                                                     final EncodingSupport record,
+                                                     int size,
+                                                     ChannelBuffer bb)
    {
       bb.writeByte(DELETE_RECORD_TX);
       bb.writeInt(fileID);
@@ -2316,13 +2359,13 @@
     * @param size
     * @param bb
     */
-   void writeAddRecordTX(final int fileID,
-                                 final long txID,
-                                 final long id,
-                                 final byte recordType,
-                                 final EncodingSupport record,
-                                 int size,
-                                 ChannelBuffer bb)
+   public static void writeAddRecordTX(final int fileID,
+                                       final long txID,
+                                       final long id,
+                                       final byte recordType,
+                                       final EncodingSupport record,
+                                       int size,
+                                       ChannelBuffer bb)
    {
       bb.writeByte(ADD_RECORD_TX);
       bb.writeInt(fileID);
@@ -2334,24 +2377,24 @@
       bb.writeInt(size);
    }
 
-   private boolean isTransaction(final byte recordType)
+   private static boolean isTransaction(final byte recordType)
    {
       return recordType == ADD_RECORD_TX || recordType == UPDATE_RECORD_TX ||
              recordType == DELETE_RECORD_TX ||
              isCompleteTransaction(recordType);
    }
 
-   private boolean isCompleteTransaction(final byte recordType)
+   private static boolean isCompleteTransaction(final byte recordType)
    {
       return recordType == COMMIT_RECORD || recordType == PREPARE_RECORD || recordType == ROLLBACK_RECORD;
    }
 
-   private boolean isContainsBody(final byte recordType)
+   private static boolean isContainsBody(final byte recordType)
    {
       return recordType >= ADD_RECORD && recordType <= DELETE_RECORD_TX;
    }
 
-   private int getRecordSize(final byte recordType)
+   private static int getRecordSize(final byte recordType)
    {
       // The record size (without the variable portion)
       int recordSize = 0;
@@ -2572,14 +2615,14 @@
       if (fill)
       {
          sequentialFile.fill(0, fileSize, FILL_CHARACTER);
-   
+
          ByteBuffer bb = fileFactory.newBuffer(SIZE_HEADER);
-   
+
          bb.putInt(fileID);
          bb.putInt(fileID);
-   
+
          bb.rewind();
-   
+
          sequentialFile.write(bb, true);
       }
 

Added: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalReaderCallbackAbstract.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalReaderCallbackAbstract.java	                        (rev 0)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalReaderCallbackAbstract.java	2009-06-30 22:30:18 UTC (rev 7504)
@@ -0,0 +1,78 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.journal.impl;
+
+import org.jboss.messaging.core.journal.RecordInfo;
+
+/**
+ * A JournalReaderCallbackAbstract
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalReaderCallbackAbstract implements JournalReaderCallback
+{
+
+   public void markAsDataFile(JournalFile file)
+   {
+   }
+
+   public void onReadAddRecord(RecordInfo info) throws Exception
+   {
+   }
+
+   public void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+   {
+   }
+
+   public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
+   {
+   }
+
+   public void onReadDeleteRecord(long recordID) throws Exception
+   {
+   }
+
+   public void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+   {
+   }
+
+   public void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
+   {
+   }
+
+   public void onReadRollbackRecord(long transactionID) throws Exception
+   {
+   }
+
+   public void onReadUpdateRecord(RecordInfo recordInfo) throws Exception
+   {
+   }
+
+   public void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+   {
+   }
+
+}

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2009-06-30 17:42:50 UTC (rev 7503)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2009-06-30 22:30:18 UTC (rev 7504)
@@ -46,7 +46,7 @@
    private static final Logger log = Logger.getLogger(NIOSequentialFile.class);
 
    private File file;
-   
+
    private long fileSize = 0;
 
    private final String directory;
@@ -54,7 +54,7 @@
    private FileChannel channel;
 
    private RandomAccessFile rfile;
-   
+
    private final AtomicLong position = new AtomicLong(0);
 
    public NIOSequentialFile(final String directory, final String fileName)
@@ -63,11 +63,16 @@
       file = new File(directory + "/" + fileName);
    }
 
+   public boolean exists()
+   {
+      return file.exists();
+   }
+
    public int getAlignment()
    {
       return 1;
    }
-   
+
    public void flush()
    {
    }
@@ -76,7 +81,7 @@
    {
       return position;
    }
-   
+
    public boolean fits(final int size)
    {
       return this.position.get() + size <= fileSize;
@@ -97,7 +102,7 @@
       rfile = new RandomAccessFile(file, "rw");
 
       channel = rfile.getChannel();
-      
+
       fileSize = channel.size();
    }
 
@@ -124,10 +129,10 @@
       channel.force(false);
 
       channel.position(0);
-      
+
       fileSize = channel.size();
    }
-   
+
    public synchronized void waitForClose() throws Exception
    {
       while (isOpen())
@@ -151,7 +156,7 @@
       channel = null;
 
       rfile = null;
-      
+
       notifyAll();
    }
 
@@ -194,7 +199,6 @@
 
    }
 
-   
    public void write(final MessagingBuffer bytes, final boolean sync) throws Exception
    {
       write(ByteBuffer.wrap(bytes.array()), sync);
@@ -212,7 +216,7 @@
       channel.write(bytes);
 
       if (sync)
-      {         
+      {
          sync();
       }
    }
@@ -222,9 +226,9 @@
       try
       {
          position.addAndGet(bytes.limit());
-         
+
          channel.write(bytes);
-         
+
          if (sync)
          {
             sync();

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java	2009-06-30 17:42:50 UTC (rev 7503)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java	2009-06-30 22:30:18 UTC (rev 7504)
@@ -24,9 +24,15 @@
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
+import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.JournalCompactor;
+import org.jboss.messaging.core.journal.impl.JournalFile;
+import org.jboss.messaging.core.journal.impl.JournalFileImpl;
 import org.jboss.messaging.core.journal.impl.JournalImpl;
 import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
 import org.jboss.messaging.core.logging.Logger;
@@ -55,10 +61,55 @@
    // General tests
    // =============
 
+   public void testControlFile() throws Exception
+   {
+      ArrayList<JournalFile> dataFiles = new ArrayList<JournalFile>();
+
+      for (int i = 0; i < 5; i++)
+      {
+         SequentialFile file = fileFactory.createSequentialFile("file-" + i + ".tst", 1);
+         dataFiles.add(new JournalFileImpl(file, 0, 0));
+      }
+
+      ArrayList<JournalFile> newFiles = new ArrayList<JournalFile>();
+
+      for (int i = 0; i < 3; i++)
+      {
+         SequentialFile file = fileFactory.createSequentialFile("file-" + i + ".tst.new", 1);
+         newFiles.add(new JournalFileImpl(file, 0, 0));
+      }
+
+      JournalCompactor.writeControlFile(fileFactory, dataFiles, newFiles);
+
+      ArrayList<String> strDataFiles = new ArrayList<String>();
+
+      ArrayList<String> strNewFiles = new ArrayList<String>();
+
+      assertNotNull(JournalCompactor.readControlFile(fileFactory, strDataFiles, strNewFiles));
+
+      assertEquals(dataFiles.size(), strDataFiles.size());
+      assertEquals(newFiles.size(), strNewFiles.size());
+
+      Iterator<String> iterDataFiles = strDataFiles.iterator();
+      for (JournalFile file : dataFiles)
+      {
+         assertEquals(file.getFile().getFileName(), iterDataFiles.next());
+      }
+
+      assertFalse(iterDataFiles.hasNext());
+
+      Iterator<String> iterNewFiles = strNewFiles.iterator();
+      for (JournalFile file : newFiles)
+      {
+         assertEquals(file.getFile().getFileName(), iterNewFiles.next());
+      }
+   }
+
    public void testCrashRenamingFiles() throws Exception
    {
+      internalCompactTest(true, false, false, false, false, false, false, false, true, false, false);
    }
-   
+
    public void testCrashDuringCompacting() throws Exception
    {
    }
@@ -73,43 +124,43 @@
 
    public void testCompactwithPendingCommit() throws Exception
    {
-      internalCompactTest(false, false, false, false, false, true, false, false);
+      internalCompactTest(false, false, false, false, false, true, false, false, true, true, true);
    }
 
    public void testCompactwithDelayedCommit() throws Exception
    {
-      internalCompactTest(false, false, false, false, false, true, false, true);
+      internalCompactTest(false, false, false, false, false, true, false, true, true, true, true);
    }
 
    public void testCompactwithPendingCommitFollowedByDelete() throws Exception
    {
-      internalCompactTest(false, false, false, false, false, true, true, false);
+      internalCompactTest(false, false, false, false, false, true, true, false, true, true, true);
    }
 
    public void testCompactwithConcurrentUpdateAndDeletes() throws Exception
    {
-      internalCompactTest(true, false, true, true, false, false, false, false);
+      internalCompactTest(true, false, true, true, false, false, false, false, true, true, true);
       tearDown();
       setUp();
-      internalCompactTest(true, false, true, false, true, false, false, false);
+      internalCompactTest(true, false, true, false, true, false, false, false, true, true, true);
    }
 
    public void testCompactwithConcurrentDeletes() throws Exception
    {
-      internalCompactTest(true, false, false, true, false, false, false, false);
+      internalCompactTest(true, false, false, true, false, false, false, false, true, true, true);
       tearDown();
       setUp();
-      internalCompactTest(true, false, false, false, true, false, false, false);
+      internalCompactTest(true, false, false, false, true, false, false, false, true, true, true);
    }
 
    public void testCompactwithConcurrentUpdates() throws Exception
    {
-      internalCompactTest(true, false, true, false, false, false, false, false);
+      internalCompactTest(true, false, true, false, false, false, false, false, true, true, true);
    }
 
    public void testCompactWithConcurrentAppend() throws Exception
    {
-      internalCompactTest(true, true, false, false, false, false, false, false);
+      internalCompactTest(true, true, false, false, false, false, false, false, true, true, true);
    }
 
    private void internalCompactTest(final boolean regularAdd,
@@ -119,7 +170,10 @@
                                     boolean performNonTransactionalDelete,
                                     final boolean pendingTransactions,
                                     final boolean deleteTransactRecords,
-                                    final boolean delayCommit) throws Exception
+                                    final boolean delayCommit,
+                                    final boolean createControlFile,
+                                    final boolean deleteControlFile,
+                                    final boolean renameFilesAfterCompacting) throws Exception
    {
       if (performNonTransactionalDelete)
       {
@@ -129,7 +183,7 @@
       {
          performNonTransactionalDelete = false;
       }
-      
+
       setup(50, 60 * 1024, true);
 
       ArrayList<Long> liveIDs = new ArrayList<Long>();
@@ -140,7 +194,39 @@
       final CountDownLatch latchWait = new CountDownLatch(1);
       journal = new JournalImpl(fileSize, minFiles, fileFactory, filePrefix, fileExtension, maxAIO)
       {
+
          @Override
+         protected SequentialFile createControlFile(List<JournalFile> files, List<JournalFile> newFiles) throws Exception
+         {
+            if (createControlFile)
+            {
+               return super.createControlFile(files, newFiles);
+            }
+            else
+            {
+               throw new IllegalStateException("Error creating control file");
+            }
+         }
+
+         @Override
+         protected void deleteControlFile(SequentialFile controlFile) throws Exception
+         {
+            if (deleteControlFile)
+            {
+               super.deleteControlFile(controlFile);
+            }
+         }
+
+         @Override
+         protected void renameFiles(List<JournalFile> oldFiles, List<JournalFile> newFiles) throws Exception
+         {
+            if (renameFilesAfterCompacting)
+            {
+               super.renameFiles(oldFiles, newFiles);
+            }
+         }
+
+         @Override
          public void onCompactDone()
          {
             latchDone.countDown();
@@ -335,7 +421,7 @@
       }
 
       /** Some independent adds and updates */
-       for (int i = 0; i < 1000; i++)
+      for (int i = 0; i < 1000; i++)
       {
          long id = idGenerator.generateID();
          add(id);
@@ -345,7 +431,7 @@
          {
             journal.forceMoveNextFile();
          }
-       }
+      }
       journal.forceMoveNextFile();
 
       latchWait.countDown();
@@ -374,7 +460,10 @@
 
       add(idGenerator.generateID());
 
-      journal.compact();
+      if (createControlFile && deleteControlFile && renameFilesAfterCompacting)
+      {
+         journal.compact();
+      }
 
       stopJournal();
       createJournal();
@@ -398,7 +487,7 @@
 
    public void testSimpleCompacting() throws Exception
    {
-      setup(50, 60 * 1024, true);
+      setup(2, 60 * 1024, true);
 
       createJournal();
       startJournal();

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-06-30 17:42:50 UTC (rev 7503)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-06-30 22:30:18 UTC (rev 7504)
@@ -340,10 +340,10 @@
          {
             data.position(0);
          }
-         
+
          this.notifyAll();
       }
-      
+
       public synchronized void waitForClose() throws Exception
       {
          while (open)
@@ -352,8 +352,6 @@
          }
       }
 
-
-
       public void delete() throws Exception
       {
          if (!open)
@@ -613,7 +611,7 @@
       public void write(MessagingBuffer bytes, boolean sync, IOCallback callback) throws Exception
       {
          write(ByteBuffer.wrap(bytes.array()), sync, callback);
-         
+
       }
 
       /* (non-Javadoc)
@@ -624,6 +622,14 @@
          write(ByteBuffer.wrap(bytes.array()), sync);
       }
 
+      /* (non-Javadoc)
+       * @see org.jboss.messaging.core.journal.SequentialFile#exists()
+       */
+      public boolean exists()
+      {
+         return fileMap.get(fileName) != null;
+      }
+
    }
 
    /* (non-Javadoc)




More information about the jboss-cvs-commits mailing list