[jboss-cvs] JBoss Messaging SVN: r7460 - in branches/clebert_temp_expirement: tests/src/org/jboss/messaging/tests/unit/core/journal/impl and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jun 25 00:07:44 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-06-25 00:07:44 -0400 (Thu, 25 Jun 2009)
New Revision: 7460

Added:
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalReader.java
Modified:
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
Log:
changes

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2009-06-24 21:37:13 UTC (rev 7459)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2009-06-25 04:07:44 UTC (rev 7460)
@@ -262,7 +262,10 @@
     */
    public void renameTo(String newFileName) throws Exception
    {
-      close();
+      if (isOpen())
+      {
+         close();
+      }
       File newFile = new File(directory + "/" + newFileName);
       file.renameTo(newFile);
       file = newFile;

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java	2009-06-24 21:37:13 UTC (rev 7459)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java	2009-06-25 04:07:44 UTC (rev 7460)
@@ -56,7 +56,7 @@
    boolean isCanReclaim();
 
    long getOffset();
-
+   
    int getFileID();
    
    int getOrderingID();

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-24 21:37:13 UTC (rev 7459)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-25 04:07:44 UTC (rev 7460)
@@ -286,20 +286,11 @@
 
       try
       {
+         int size = SIZE_ADD_RECORD + record.getEncodeSize();
 
-         int recordLength = record.getEncodeSize();
-
-         int size = SIZE_ADD_RECORD + recordLength;
-
          ChannelBuffer bb = newBuffer(size);
 
-         bb.writeByte(ADD_RECORD);
-         bb.writeInt(-1); // skip ID part
-         bb.writeLong(id);
-         bb.writeInt(recordLength);
-         bb.writeByte(recordType);
-         record.encode(bb);
-         bb.writeInt(size);
+         writeAddRecord(id, -1, recordType, record, size, bb); // fileID will be filled later
 
          callback = getSyncCallback(sync);
 
@@ -682,8 +673,11 @@
       try
       {
 
-         ChannelBuffer bb = writeTransaction(PREPARE_RECORD, txID, tx, transactionData);
+         int size = SIZE_COMPLETE_TRANSACTION_RECORD + transactionData.getEncodeSize() + SIZE_INT;
+         ChannelBuffer bb = newBuffer(size);
 
+         writeTransaction(PREPARE_RECORD, txID, tx, transactionData, size, bb);
+
          lock.acquire();
          try
          {
@@ -742,8 +736,10 @@
             throw new IllegalStateException("Cannot find tx with id " + txID);
          }
 
-         ChannelBuffer bb = writeTransaction(COMMIT_RECORD, txID, tx, null);
+         ChannelBuffer bb = newBuffer(SIZE_COMPLETE_TRANSACTION_RECORD);
 
+         writeTransaction(COMMIT_RECORD, txID, tx, null, SIZE_COMPLETE_TRANSACTION_RECORD, bb);
+
          lock.acquire();
          try
          {
@@ -924,8 +920,10 @@
       for (final JournalFile file : dataFilesToProcess)
       {
          readJournalFile(file, compactor);
-
       }
+      
+      
+      compactor.flushBuffer();
 
       writeLockCompact.lock();
       try
@@ -947,7 +945,11 @@
 
       SequentialFile sequentialFile;
 
+      int fileID;
+      
       ByteBuffer bufferWrite;
+      
+      ChannelBuffer channelWrapper;
 
       int nextOrderingID;
 
@@ -963,21 +965,18 @@
       {
          if (bufferWrite == null)
          {
-            flushFile();
+            openFile();
          }
          else
          {
             if (bufferWrite.position() + size > bufferWrite.limit())
             {
-               flushFile();
+               openFile();
             }
          }
       }
 
-      /**
-       * @throws Exception
-       */
-      private void flushFile() throws Exception
+      public void flushBuffer() throws Exception
       {
          if (bufferWrite != null)
          {
@@ -986,10 +985,27 @@
             sequentialFile.close();
          }
 
+         bufferWrite = null;
+      }
+
+      /**
+       * @throws Exception
+       */
+      private void openFile() throws Exception
+      {
+         flushBuffer();
+
          bufferWrite = fileFactory.newBuffer(fileSize);
-         currentOutputFile = openFile(false);
+         channelWrapper = ChannelBuffers.wrappedBuffer(bufferWrite);
+
+         currentOutputFile = getFile(false, false);
          sequentialFile = currentOutputFile.getFile();
-         bufferWrite.putInt(currentOutputFile.getFileID());
+         sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
+         sequentialFile.open(1);
+         currentOutputFile = new JournalFileImpl(sequentialFile, nextOrderingID, nextOrderingID);
+         fileID = nextOrderingID;
+         System.out.println("Next OrderingID = " + nextOrderingID);
+         bufferWrite.putInt(nextOrderingID);
          bufferWrite.putInt(nextOrderingID++);
       }
 
@@ -997,6 +1013,13 @@
       {
          if (recordsSnapshot.get(info.id) != null)
          {
+            int size = SIZE_ADD_RECORD + info.data.length;
+            
+            checkSize(size);
+            
+            writeAddRecord(info.id, fileID, info.getUserRecordType(), new ByteArrayEncoding(info.data), size, channelWrapper);
+
+            
             System.out.println("Record " + info.id + " to be out on compacted file");
          }
       }
@@ -1318,10 +1341,7 @@
                      throw new IllegalStateException("Cannot find tx " + transactionID);
                   }
 
-                  boolean healthy = checkTransactionHealth(file,
-                                                           journalTransaction,
-                                                           orderedFiles,
-                                                           numberOfRecords);
+                  boolean healthy = checkTransactionHealth(file, journalTransaction, orderedFiles, numberOfRecords);
 
                   if (healthy)
                   {
@@ -1800,7 +1820,7 @@
       return jf;
    }
 
-   private int readJournalFile(JournalFile file, JournalReader reader) throws Exception
+   public int readJournalFile(JournalFile file, JournalReader reader) throws Exception
    {
       ByteBuffer wholeFileBuffer = fileFactory.newBuffer(fileSize);
 
@@ -1925,7 +1945,7 @@
 
             wholeFileBuffer.get(record);
          }
-         
+
          // Case this is a transaction, this will contain the number of records on a transaction, at the currentFile
          int transactionCheckNumberOfRecords = 0;
 
@@ -2152,16 +2172,13 @@
     * @return
     * @throws Exception
     */
-   private ChannelBuffer writeTransaction(final byte recordType,
-                                          final long txID,
-                                          final JournalTransaction tx,
-                                          final EncodingSupport transactionData) throws Exception
+   private void writeTransaction(final byte recordType,
+                                 final long txID,
+                                 final JournalTransaction tx,
+                                 final EncodingSupport transactionData,
+                                 final int size,
+                                 final ChannelBuffer bb) throws Exception
    {
-      int size = SIZE_COMPLETE_TRANSACTION_RECORD + (transactionData != null ? transactionData.getEncodeSize() + SIZE_INT
-                                                                            : 0);
-
-      ChannelBuffer bb = newBuffer(size);
-
       bb.writeByte(recordType);
       bb.writeInt(-1); // skip ID part
       bb.writeLong(txID);
@@ -2178,8 +2195,29 @@
       }
 
       bb.writeInt(size);
+   }
 
-      return bb;
+   /**
+    * @param id
+    * @param recordType
+    * @param record
+    * @param size
+    * @param bb
+    */
+   private void writeAddRecord(final long id,
+                               final int fileId,
+                               final byte recordType,
+                               final EncodingSupport record,
+                               int size,
+                               ChannelBuffer bb)
+   {
+      bb.writeByte(ADD_RECORD);
+      bb.writeInt(fileId);
+      bb.writeLong(id);
+      bb.writeInt(record.getEncodeSize());
+      bb.writeByte(recordType);
+      record.encode(bb);
+      bb.writeInt(size);
    }
 
    private boolean isTransaction(final byte recordType)
@@ -2371,7 +2409,8 @@
             if (sync)
             {
                // 99 % of the times this will be already synced, as previous files should be closed already.
-               // This is to have 100% guarantee the transaction will be persisted and no loss of information would happen
+               // This is to have a 100% guarantee that the transaction will be persisted and that no information
+               // is lost
                tx.syncPreviousFiles();
             }
 
@@ -2382,7 +2421,6 @@
             }
          }
 
-         
          // Adding fileID
          bb.writerIndex(SIZE_BYTE);
 
@@ -2550,7 +2588,7 @@
     * */
    private void pushOpenedFile() throws Exception
    {
-      JournalFile nextOpenedFile = openFile(true);
+      JournalFile nextOpenedFile = getFile(true, true);
 
       openedFiles.offer(nextOpenedFile);
    }
@@ -2559,7 +2597,7 @@
     * @return
     * @throws Exception
     */
-   private JournalFile openFile(boolean multiAIO) throws Exception
+   private JournalFile getFile(boolean keepOpened, boolean multiAIO) throws Exception
    {
       JournalFile nextOpenedFile = null;
       try
@@ -2572,11 +2610,14 @@
 
       if (nextOpenedFile == null)
       {
-         nextOpenedFile = createFile(true, multiAIO);
+         nextOpenedFile = createFile(keepOpened, multiAIO);
       }
       else
       {
-         openFile(nextOpenedFile, multiAIO);
+         if (keepOpened)
+         {
+            openFile(nextOpenedFile, multiAIO);
+         }
       }
       return nextOpenedFile;
    }
@@ -2765,9 +2806,9 @@
       private TransactionCallback currentCallback;
 
       private Map<JournalFile, TransactionCallback> callbackList;
-      
+
       private JournalFile lastFile = null;
-      
+
       private final AtomicInteger counter = new AtomicInteger();
 
       private AtomicInteger internalgetCounter(final JournalFile file)
@@ -2779,7 +2820,7 @@
          }
          return counter;
       }
-      
+
       public int getCounter(final JournalFile file)
       {
          return internalgetCounter(file).intValue();
@@ -2789,7 +2830,7 @@
       {
          internalgetCounter(file).incrementAndGet();
       }
-      
+
       /**
        * @param currentFile
        * @param bb
@@ -3110,61 +3151,5 @@
       }
    }
 
-   private static interface JournalReader
-   {
-      void addRecord(RecordInfo info) throws Exception;
 
-      /**
-       * @param recordInfo
-       * @throws Exception 
-       */
-      void updateRecord(RecordInfo recordInfo) throws Exception;
-
-      /**
-       * @param recordID
-       */
-      void deleteRecord(long recordID) throws Exception;
-
-      /**
-       * @param transactionID
-       * @param recordInfo
-       * @throws Exception 
-       */
-      void addRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
-
-      /**
-       * @param transactionID
-       * @param recordInfo
-       * @throws Exception 
-       */
-      void updateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
-
-      /**
-       * @param transactionID
-       * @param recordInfo
-       */
-      void deleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
-
-      /**
-       * @param transactionID
-       * @param extraData
-       * @param summaryData
-       */
-      void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception;
-
-      /**
-       * @param transactionID
-       * @param summaryData
-       */
-      void commitRecord(long transactionID, int numberOfRecords) throws Exception;
-
-      /**
-       * @param transactionID
-       */
-      void rollbackRecord(long transactionID) throws Exception;
-
-      public void markAsDataFile(JournalFile file);
-
-   }
-
 }

Added: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalReader.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalReader.java	                        (rev 0)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalReader.java	2009-06-25 04:07:44 UTC (rev 7460)
@@ -0,0 +1,91 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.journal.impl;
+
+import org.jboss.messaging.core.journal.RecordInfo;
+
+/**
+ * A JournalReader is the reader argument of JournalImpl.readJournalFile(JournalFile, JournalReader).
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface JournalReader
+{
+   void addRecord(RecordInfo info) throws Exception;
+
+   /**
+    * @param recordInfo
+    * @throws Exception 
+    */
+   void updateRecord(RecordInfo recordInfo) throws Exception;
+
+   /**
+    * @param recordID
+    */
+   void deleteRecord(long recordID) throws Exception;
+
+   /**
+    * @param transactionID
+    * @param recordInfo
+    * @throws Exception 
+    */
+   void addRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
+
+   /**
+    * @param transactionID
+    * @param recordInfo
+    * @throws Exception 
+    */
+   void updateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
+
+   /**
+    * @param transactionID
+    * @param recordInfo
+    */
+   void deleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
+
+   /**
+    * @param transactionID
+    * @param extraData
+    * @param numberOfRecords
+    */
+   void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception;
+
+   /**
+    * @param transactionID
+    * @param numberOfRecords
+    */
+   void commitRecord(long transactionID, int numberOfRecords) throws Exception;
+
+   /**
+    * @param transactionID
+    */
+   void rollbackRecord(long transactionID) throws Exception;
+
+   public void markAsDataFile(JournalFile file);
+
+
+}

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java	2009-06-24 21:37:13 UTC (rev 7459)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java	2009-06-25 04:07:44 UTC (rev 7460)
@@ -499,7 +499,8 @@
       byte[] record = new byte[length];
       for (int i = 0; i < length; i++)
       {
-         record[i] = RandomUtil.randomByte();
+         //record[i] = RandomUtil.randomByte();
+         record[i] = getSamplebyte(i);
       }
       return record;
    }

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-06-24 21:37:13 UTC (rev 7459)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-06-25 04:07:44 UTC (rev 7460)
@@ -3076,10 +3076,12 @@
       createJournal();
       startJournal();
       load();
+      
+      int NUMBER_OF_RECRODS = 100;
 
       long transactionID = 0;
 
-      for (int i = 0; i < 500; i++)
+      for (int i = 0; i < NUMBER_OF_RECRODS/2; i++)
       {
          add(i);
          if (i % 10 == 0 && i > 0)
@@ -3089,7 +3091,7 @@
          update(i);
       }
 
-      for (int i = 500; i < 1000; i++)
+      for (int i = NUMBER_OF_RECRODS/2; i < NUMBER_OF_RECRODS; i++)
       {
 
          addTx(transactionID, i);
@@ -3104,7 +3106,7 @@
 
       System.out.println("Number of Files: " + journal.getDataFilesCount());
 
-      for (int i = 0; i < 1000; i++)
+      for (int i = 0; i < NUMBER_OF_RECRODS; i++)
       {
          if (!(i % 10 == 0))
          {




More information about the jboss-cvs-commits mailing list