[jboss-cvs] JBoss Messaging SVN: r7474 - in branches/clebert_temp_expirement: tests/src/org/jboss/messaging/tests/unit/core/journal/impl and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jun 25 19:21:12 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-06-25 19:21:12 -0400 (Thu, 25 Jun 2009)
New Revision: 7474

Modified:
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
Log:
fix on Bytebuffer + changes

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-25 21:42:56 UTC (rev 7473)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-25 23:21:12 UTC (rev 7474)
@@ -64,6 +64,7 @@
 import org.jboss.messaging.core.journal.TestableJournal;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.tests.util.UnitTestCase;
 import org.jboss.messaging.utils.Pair;
 import org.jboss.messaging.utils.VariableLatch;
 
@@ -346,13 +347,7 @@
 
          ChannelBuffer bb = newBuffer(size);
 
-         bb.writeByte(UPDATE_RECORD);
-         bb.writeInt(-1); // skip ID part
-         bb.writeLong(id);
-         bb.writeInt(record.getEncodeSize());
-         bb.writeByte(recordType);
-         record.encode(bb);
-         bb.writeInt(size);
+         writeUpdateRecord(-1, id, recordType, record, size, bb);
 
          callback = getSyncCallback(sync);
 
@@ -508,14 +503,7 @@
 
          ChannelBuffer bb = newBuffer(size);
 
-         bb.writeByte(UPDATE_RECORD_TX);
-         bb.writeInt(-1); // skip ID part
-         bb.writeLong(txID);
-         bb.writeLong(id);
-         bb.writeInt(record.getEncodeSize());
-         bb.writeByte(recordType);
-         record.encode(bb);
-         bb.writeInt(size);
+         writeUpdateRecordTX(-1, txID, id, recordType, record, size, bb);
 
          JournalTransaction tx = getTransactionInfo(txID);
 
@@ -889,17 +877,19 @@
 
    class Compactor implements JournalReader
    {
-      JournalFile currentOutputFile;
+      
+      
+      JournalFile currentFile;
 
       SequentialFile sequentialFile;
 
       int fileID;
 
-      ByteBuffer bufferWrite;
-
       ChannelBuffer channelWrapper;
 
       int nextOrderingID;
+      
+      final List<JournalFile> newDataFiles = new ArrayList<JournalFile>();
 
       final Map<Long, JournalRecord> recordsSnapshot;
 
@@ -920,13 +910,13 @@
 
       private void checkSize(int size) throws Exception
       {
-         if (bufferWrite == null)
+         if (channelWrapper == null)
          {
             openFile();
          }
          else
          {
-            if (bufferWrite.position() + size > bufferWrite.limit())
+            if (channelWrapper.writerIndex() + size > channelWrapper.capacity())
             {
                openFile();
             }
@@ -935,14 +925,15 @@
 
       public void flush() throws Exception
       {
-         if (bufferWrite != null)
+         if (channelWrapper != null)
          {
             sequentialFile.position(0);
-            sequentialFile.write(bufferWrite, true);
+            sequentialFile.write(channelWrapper, true);
             sequentialFile.close();
+            newDataFiles.add(currentFile);
          }
 
-         bufferWrite = null;
+         channelWrapper = null;
       }
 
       /**
@@ -952,18 +943,28 @@
       {
          flush();
 
-         bufferWrite = fileFactory.newBuffer(fileSize);
+         ByteBuffer bufferWrite = fileFactory.newBuffer(fileSize);
          channelWrapper = ChannelBuffers.wrappedBuffer(bufferWrite);
 
-         currentOutputFile = getFile(false, false);
-         sequentialFile = currentOutputFile.getFile();
+         currentFile = getFile(false, false);
+         sequentialFile = currentFile.getFile();
          sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
          sequentialFile.open(1);
-         currentOutputFile = new JournalFileImpl(sequentialFile, nextOrderingID, nextOrderingID);
-         fileID = nextOrderingID;
+         currentFile = new JournalFileImpl(sequentialFile, nextOrderingID, nextOrderingID);
+         fileID = nextOrderingID++;
          System.out.println("Next OrderingID = " + nextOrderingID);
-         bufferWrite.putInt(nextOrderingID);
-         bufferWrite.putInt(nextOrderingID++);
+
+         
+         channelWrapper.writeInt(fileID);
+         channelWrapper.writeInt(fileID);
+         
+         
+         
+         for (int i = 0 ; i < 1000; i++)
+         {
+            channelWrapper.writeByte(UnitTestCase.getSamplebyte(i));
+         }
+         
       }
 
       public void addRecord(RecordInfo info) throws Exception
@@ -1005,10 +1006,9 @@
                              size,
                              channelWrapper);
          }
-         else if (recordsSnapshot.get(info.id) != null)
+         else
          {
-            // AddRecordTX for a committed record, just converting it as a regular record
-            // The record is already confirmed. There is no need to keep the transaction information during compacting
+            // Try it as a regular record; addRecord will validate whether this is a live record or not
             addRecord(info);
          }
       }
@@ -1027,8 +1027,6 @@
 
       public void deleteRecord(long recordID) throws Exception
       {
-         // nothing to be done here, if it is a delete, the record is already gone.. so.. no worries
-
          if (records.get(recordID) != null)
          {
             // Sanity check, it should never happen
@@ -1037,24 +1035,24 @@
 
       }
 
-      public void deleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+      public void deleteRecordTX(long transactionID, RecordInfo info) throws Exception
       {
          if (pendingTransactions.get(transactionID) != null)
          {
             JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
 
-            int size = SIZE_DELETE_RECORD_TX + recordInfo.data.length;
-            
+            int size = SIZE_DELETE_RECORD_TX + info.data.length;
+
             checkSize(size);
-            
+
             writeDeleteRecordTransactional(fileID,
                                            transactionID,
-                                           recordInfo.id,
-                                           new ByteArrayEncoding(recordInfo.data),
+                                           info.id,
+                                           new ByteArrayEncoding(info.data),
                                            size,
                                            channelWrapper);
-            
-            newTransaction.addNegative(currentFile, recordInfo.id);
+
+            newTransaction.addNegative(currentFile, info.id);
          }
       }
 
@@ -1102,16 +1100,58 @@
       {
          if (recordsSnapshot.get(info.id) != null)
          {
-            System.out.println("Update " + info.id + " to be out on compacted file");
+            System.out.println("UpdateRecord on compacting");
+            int size = SIZE_UPDATE_RECORD + info.data.length;
+
+            checkSize(size);
+
+            JournalRecord newRecord = newRecords.get(info.id);
+            
+            if (newRecord == null)
+            {
+               log.warn("Couldn't find addRecord information for record " + info.id + " during compacting");
+            }
+            
+
+            writeUpdateRecord(fileID,
+                              info.id,
+                              info.userRecordType,
+                              new ByteArrayEncoding(info.data),
+                              size,
+                              channelWrapper);
+            
+            newRecord.addUpdateFile(currentFile);
+            
          }
       }
 
       public void updateRecordTX(long transactionID, RecordInfo info) throws Exception
       {
-         if (recordsSnapshot.get(info.id) != null)
+
+         if (pendingTransactions.get(transactionID) != null)
          {
-            System.out.println("UpdateTX " + info.id + " to be out on compacted file");
+            JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
+            
+            int size = SIZE_UPDATE_RECORD_TX + info.data.length;
+
+            checkSize(size);
+
+            writeUpdateRecordTX(fileID,
+                                transactionID,
+                                info.id,
+                                info.userRecordType,
+                                new ByteArrayEncoding(info.data),
+                                size,
+                                channelWrapper);
+            
+            
+            newTransaction.addPositive(currentFile, info.id);
          }
+         else
+         {
+            
+            updateRecord(info);
+         }
       }
 
       /**
@@ -2258,12 +2298,61 @@
    }
 
    /**
+    * @param txID
     * @param id
     * @param recordType
     * @param record
     * @param size
     * @param bb
     */
+   private void writeUpdateRecordTX(final int fileID,
+                                    final long txID,
+                                    final long id,
+                                    final byte recordType,
+                                    final EncodingSupport record,
+                                    int size,
+                                    ChannelBuffer bb)
+   {
+      bb.writeByte(UPDATE_RECORD_TX);
+      bb.writeInt(fileID);
+      bb.writeLong(txID);
+      bb.writeLong(id);
+      bb.writeInt(record.getEncodeSize());
+      bb.writeByte(recordType);
+      record.encode(bb);
+      bb.writeInt(size);
+   }
+
+   /**
+    * @param id
+    * @param recordType
+    * @param record
+    * @param size
+    * @param bb
+    */
+   private void writeUpdateRecord(final int fileId,
+                                  final long id,
+                                  final byte recordType,
+                                  final EncodingSupport record,
+                                  int size,
+                                  ChannelBuffer bb)
+   {
+      bb.writeByte(UPDATE_RECORD);
+      bb.writeInt(fileId); // file ID (callers pass -1 to skip the ID part)
+      bb.writeLong(id);
+      bb.writeInt(record.getEncodeSize());
+      bb.writeByte(recordType);
+      record.encode(bb);
+      bb.writeInt(size);
+   }
+
+   /**
+    * @param id
+    * @param recordType
+    * @param record
+    * @param size
+    * @param bb
+    */
    private void writeAddRecord(final int fileId,
                                final long id,
                                final byte recordType,

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-06-25 21:42:56 UTC (rev 7473)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-06-25 23:21:12 UTC (rev 7474)
@@ -2353,7 +2353,6 @@
       EncodingSupport xid = new SimpleEncoding(10, (byte)'p');
 
       prepare(1, xid);
-      
 
       stopJournal();
       createJournal();
@@ -3078,8 +3077,7 @@
 
       assertEquals(0, journal.getDataFilesCount());
    }
-   
-   
+
    public void testCompactwithPendingPrepare() throws Exception
    {
    }
@@ -3091,14 +3089,14 @@
    public void testCompactwithConcurrentAppend() throws Exception
    {
    }
-      
+
    public void testCompactWithPendingTransactionAndDelete() throws Exception
    {
    }
-   
+
    public void testCompactingWithPendingTransaction() throws Exception
    {
-      
+
    }
 
    public void testSimpleCompacting() throws Exception
@@ -3108,12 +3106,12 @@
       createJournal();
       startJournal();
       load();
-      
+
       int NUMBER_OF_RECORDS = 100;
 
       long transactionID = 0;
 
-      for (int i = 0; i < NUMBER_OF_RECORDS/2; i++)
+      for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
       {
          add(i);
          if (i % 10 == 0 && i > 0)
@@ -3123,7 +3121,7 @@
          update(i);
       }
 
-      for (int i = NUMBER_OF_RECORDS/2; i < NUMBER_OF_RECORDS; i++)
+      for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
       {
 
          addTx(transactionID, i);
@@ -3156,77 +3154,74 @@
 
       journal.compact();
 
-      for (String fileName: fileFactory.listFiles("cmp"))
+      for (String fileName : fileFactory.listFiles("cmp"))
       {
          System.out.println("File = " + fileName);
-         
+
          SequentialFile readFile = fileFactory.createSequentialFile(fileName, 1);
 
-         ;
          ((JournalImpl)journal).readJournalFile(new JournalFileImpl(readFile, 1, 1), new JournalReader()
          {
             public void addRecord(RecordInfo info) throws Exception
             {
-               System.out.println("recordID = " + info.id);
+               System.out.println("AddrecordID = " + info.id);
             }
 
             public void addRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
             {
-               System.out.println("RecordIDTX = " + transactionID + ", recordID=" + recordInfo.id);
+               System.out.println("UpdRecordTX = " + transactionID + ", recordID=" + recordInfo.id);
             }
 
             public void commitRecord(long transactionID, int numberOfRecords) throws Exception
             {
                // TODO Auto-generated method stub
-               
+
             }
 
             public void deleteRecord(long recordID) throws Exception
             {
                // TODO Auto-generated method stub
-               
+
             }
 
             public void deleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
             {
                // TODO Auto-generated method stub
-               
+
             }
 
             public void markAsDataFile(JournalFile file)
             {
                // TODO Auto-generated method stub
-               
+
             }
 
             public void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
             {
                // TODO Auto-generated method stub
-               
+
             }
 
             public void rollbackRecord(long transactionID) throws Exception
             {
                // TODO Auto-generated method stub
-               
+
             }
 
             public void updateRecord(RecordInfo recordInfo) throws Exception
             {
-               // TODO Auto-generated method stub
-               
+               System.out.println("UpdRecordID : " + recordInfo.id);
+
             }
 
             public void updateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
             {
-               // TODO Auto-generated method stub
-               
+               System.out.println("UpdRecordID : " + recordInfo.id);
+
             }
-            
+
          });
       }
-      
-      
 
    }
 




More information about the jboss-cvs-commits mailing list