[jboss-cvs] JBoss Messaging SVN: r7473 - branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl.

jboss-cvs-commits at lists.jboss.org
Thu Jun 25 17:42:56 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-06-25 17:42:56 -0400 (Thu, 25 Jun 2009)
New Revision: 7473

Modified:
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
Log:
changes: use compactingLock directly instead of cached Lock fields, extract writeDeleteRecordTransactional, add a NullEncoding null object, implement transactional deletes and prepares in the Compactor, and tighten compacting sanity checks

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-25 18:39:32 UTC (rev 7472)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-25 21:42:56 UTC (rev 7473)
@@ -194,7 +194,7 @@
 
    // Compacting may replace this structure
    private volatile ConcurrentMap<Long, JournalTransaction> pendingTransactions = new ConcurrentHashMap<Long, JournalTransaction>();
-   
+
    // This will be filled only while the Compactor is running
    private volatile Compactor compactor;
 
@@ -202,13 +202,9 @@
 
    private final Semaphore lock = new Semaphore(1);
 
+   /** We don't lock the journal while compacting; however, we need to lock it while taking and updating snapshots */
    private final ReadWriteLock compactingLock = new ReentrantReadWriteLock();
 
-   /** We don't lock the journal while compacting, however we need to lock it while taking and updating snapshots */
-   private final Lock readLockCompact = compactingLock.readLock();
-
-   private final Lock writeLockCompact = compactingLock.writeLock();
-
    private volatile JournalFile currentFile;
 
    private volatile int state;
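
(For context on the hunk above: the two cached Lock fields are gone, and callers
now take compactingLock.readLock()/writeLock() directly. The pattern in use is
the standard ReentrantReadWriteLock idiom -- a minimal standalone sketch, with
illustrative names that are not part of JournalImpl:

   import java.util.concurrent.locks.ReadWriteLock;
   import java.util.concurrent.locks.ReentrantReadWriteLock;

   class CompactingGuard
   {
      // Many appenders may hold the read lock at once; the compactor takes
      // the write lock only for the short window where it swaps snapshots.
      private final ReadWriteLock compactingLock = new ReentrantReadWriteLock();

      void append(Runnable work)
      {
         compactingLock.readLock().lock();
         try
         {
            work.run();
         }
         finally
         {
            compactingLock.readLock().unlock();
         }
      }

      void swapSnapshots(Runnable swap)
      {
         compactingLock.writeLock().lock();
         try
         {
            swap.run();
         }
         finally
         {
            compactingLock.writeLock().unlock();
         }
      }
   }

Every append* method in the hunks below follows the read-lock shape of append() here.)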
@@ -285,7 +281,7 @@
 
       IOCallback callback = null;
 
-      readLockCompact.lock();
+      compactingLock.readLock().lock();
 
       try
       {
@@ -311,7 +307,7 @@
       }
       finally
       {
-         readLockCompact.unlock();
+         compactingLock.readLock().unlock();
       }
 
       if (callback != null)
@@ -334,7 +330,7 @@
 
       IOCallback callback = null;
 
-      readLockCompact.lock();
+      compactingLock.readLock().lock();
 
       try
       {
@@ -374,7 +370,7 @@
       }
       finally
       {
-         readLockCompact.unlock();
+         compactingLock.readLock().unlock();
       }
 
       if (callback != null)
@@ -390,7 +386,7 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      readLockCompact.lock();
+      compactingLock.readLock().lock();
 
       IOCallback callback = null;
 
@@ -429,7 +425,7 @@
       }
       finally
       {
-         readLockCompact.unlock();
+         compactingLock.readLock().unlock();
       }
 
       if (callback != null)
@@ -454,7 +450,7 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      readLockCompact.lock();
+      compactingLock.readLock().lock();
 
       try
       {
@@ -481,7 +477,7 @@
       }
       finally
       {
-         readLockCompact.unlock();
+         compactingLock.readLock().unlock();
       }
    }
 
@@ -503,7 +499,7 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      readLockCompact.lock();
+      compactingLock.readLock().lock();
 
       try
       {
@@ -537,7 +533,7 @@
       }
       finally
       {
-         readLockCompact.unlock();
+         compactingLock.readLock().unlock();
       }
    }
 
@@ -553,24 +549,15 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      readLockCompact.lock();
+      compactingLock.readLock().lock();
 
       try
       {
-         int size = SIZE_DELETE_RECORD_TX + (record != null ? record.getEncodeSize() : 0);
+         int size = SIZE_DELETE_RECORD_TX + record.getEncodeSize();
 
          ChannelBuffer bb = newBuffer(size);
 
-         bb.writeByte(DELETE_RECORD_TX);
-         bb.writeInt(-1); // skip ID part
-         bb.writeLong(txID);
-         bb.writeLong(id);
-         bb.writeInt(record != null ? record.getEncodeSize() : 0);
-         if (record != null)
-         {
-            record.encode(bb);
-         }
-         bb.writeInt(size);
+         writeDeleteRecordTransactional(-1, txID, id, record, size, bb);
 
          JournalTransaction tx = getTransactionInfo(txID);
 
@@ -588,51 +575,13 @@
       }
       finally
       {
-         readLockCompact.unlock();
+         compactingLock.readLock().unlock();
       }
    }
 
    public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
    {
-      if (state != STATE_LOADED)
-      {
-         throw new IllegalStateException("Journal must be loaded first");
-      }
-
-      readLockCompact.lock();
-
-      try
-      {
-         int size = SIZE_DELETE_RECORD_TX;
-
-         ChannelBuffer bb = newBuffer(size);
-
-         bb.writeByte(DELETE_RECORD_TX);
-         bb.writeInt(-1); // skip ID part
-         bb.writeLong(txID);
-         bb.writeLong(id);
-         bb.writeInt(0);
-         bb.writeInt(size);
-
-         JournalTransaction tx = getTransactionInfo(txID);
-
-         lock.acquire();
-         try
-         {
-            JournalFile usedFile = appendRecord(bb, false, false, tx, null);
-
-            tx.addNegative(usedFile, id);
-         }
-         finally
-         {
-            lock.release();
-         }
-      }
-      finally
-      {
-         readLockCompact.unlock();
-      }
-
+      appendDeleteRecordTransactional(txID, id, NullEncoding.instance);
    }
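
(The two-argument overload above now delegates to the record-taking version,
passing the NullEncoding singleton added at the end of this revision. A minimal
sketch of the null-object idiom, with a simplified stand-in for EncodingSupport;
names and the header size are illustrative assumptions, not the real constants:

   interface Encoding
   {
      int getEncodeSize();
   }

   final class NullEncoding implements Encoding
   {
      static final NullEncoding instance = new NullEncoding();

      public int getEncodeSize()
      {
         return 0; // encodes nothing, so computed sizes stay exact
      }
   }

   class DeleteAppender
   {
      static final int SIZE_DELETE_RECORD_TX = 29; // assumed fixed header size, for illustration only

      void appendDelete(long txID, long id)
      {
         // One code path, no null checks downstream of this point
         appendDelete(txID, id, NullEncoding.instance);
      }

      void appendDelete(long txID, long id, Encoding record)
      {
         int size = SIZE_DELETE_RECORD_TX + record.getEncodeSize();
         // ... allocate a buffer of 'size' and write the record ...
      }
   }

Because NullEncoding.getEncodeSize() is 0, the delegating call produces exactly
the bytes the removed hand-rolled implementation wrote.)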
 
    /** 
@@ -662,7 +611,7 @@
          tx.syncPreviousFiles();
       }
 
-      readLockCompact.lock();
+      compactingLock.readLock().lock();
 
       try
       {
@@ -687,7 +636,7 @@
       }
       finally
       {
-         readLockCompact.unlock();
+         compactingLock.readLock().unlock();
       }
 
      // We should wait for this outside of the lock, to increase throughput
@@ -720,7 +669,7 @@
 
       JournalTransaction tx = pendingTransactions.remove(txID);
 
-      readLockCompact.lock();
+      compactingLock.readLock().lock();
 
       try
       {
@@ -749,7 +698,7 @@
       }
       finally
       {
-         readLockCompact.unlock();
+         compactingLock.readLock().unlock();
       }
 
       if (sync)
@@ -766,7 +715,7 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      readLockCompact.lock();
+      compactingLock.readLock().lock();
 
       JournalTransaction tx = null;
 
@@ -803,7 +752,7 @@
       }
       finally
       {
-         readLockCompact.unlock();
+         compactingLock.readLock().unlock();
       }
 
      // We should wait for this outside of the lock, to increase throughput
@@ -868,7 +817,7 @@
       ConcurrentMap<Long, JournalRecord> recordsSnapshot = null;
 
       ArrayList<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>(dataFiles.size());
-      
+
       Map<Long, JournalTransaction> pendingTransactions;
 
       boolean previousReclaimValue = autoReclaim;
@@ -879,7 +828,7 @@
          // First, we replace the records by a new one.
          // We need to guarantee that the journal is frozen for this short time
          // We don't freeze the journal as we compact, only for the short time where we replace records
-         writeLockCompact.lock();
+         compactingLock.writeLock().lock();
          try
          {
             autoReclaim = false;
@@ -902,16 +851,13 @@
                   break;
                }
             }
-            
-            
 
             this.compactor = new Compactor(recordsSnapshot, pendingTransactions, dataFilesToProcess.get(0).getFileID());
 
-
          }
          finally
          {
-            writeLockCompact.unlock();
+            compactingLock.writeLock().unlock();
          }
 
       }
@@ -925,9 +871,9 @@
          readJournalFile(file, compactor);
       }
 
-      compactor.flushBuffer();
+      compactor.flush();
 
-      writeLockCompact.lock();
+      compactingLock.writeLock().lock();
       try
       {
          // Restore relationshipMap
@@ -936,7 +882,7 @@
       }
       finally
       {
-         writeLockCompact.unlock();
+         compactingLock.writeLock().unlock();
       }
 
    }
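
(The compact() flow above freezes the journal exactly twice: one write-locked
window to snapshot the records and pending transactions and install the
Compactor, and a second one to restore the relationship maps after the data
files have been rewritten. In between, appends proceed under the read lock.
A compilable skeleton of that shape, with illustrative names:

   import java.util.concurrent.locks.Lock;
   import java.util.concurrent.locks.ReentrantReadWriteLock;

   class CompactFlow
   {
      private final ReentrantReadWriteLock compactingLock = new ReentrantReadWriteLock();

      void compact(Runnable installCompactor, Runnable rewriteFiles, Runnable restoreMaps)
      {
         runLocked(compactingLock.writeLock(), installCompactor); // freeze #1
         rewriteFiles.run(); // journal stays live while old files are re-read
         runLocked(compactingLock.writeLock(), restoreMaps);      // freeze #2
      }

      private static void runLocked(Lock lock, Runnable work)
      {
         lock.lock();
         try
         {
            work.run();
         }
         finally
         {
            lock.unlock();
         }
      }
   }
)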
@@ -956,14 +902,16 @@
       int nextOrderingID;
 
       final Map<Long, JournalRecord> recordsSnapshot;
+
       final Map<Long, JournalTransaction> pendingTransactions;
-      
+
       final Map<Long, JournalRecord> newRecords = new HashMap<Long, JournalRecord>();
+
       final Map<Long, JournalTransaction> newTransactions = new HashMap<Long, JournalTransaction>();
-      
-      
 
-      public Compactor(Map<Long, JournalRecord> recordsSnapshot, Map<Long, JournalTransaction> pendingTransactions, int firstFileID)
+      public Compactor(Map<Long, JournalRecord> recordsSnapshot,
+                       Map<Long, JournalTransaction> pendingTransactions,
+                       int firstFileID)
       {
          this.recordsSnapshot = recordsSnapshot;
          this.nextOrderingID = firstFileID;
@@ -985,7 +933,7 @@
          }
       }
 
-      public void flushBuffer() throws Exception
+      public void flush() throws Exception
       {
          if (bufferWrite != null)
          {
@@ -1002,7 +950,7 @@
        */
       private void openFile() throws Exception
       {
-         flushBuffer();
+         flush();
 
          bufferWrite = fileFactory.newBuffer(fileSize);
          channelWrapper = ChannelBuffers.wrappedBuffer(bufferWrite);
@@ -1032,25 +980,23 @@
                            new ByteArrayEncoding(info.data),
                            size,
                            channelWrapper);
-            
+
             newRecords.put(info.id, new JournalRecord(currentFile));
          }
       }
 
       public void addRecordTX(long transactionID, RecordInfo info) throws Exception
       {
-         JournalTransaction pending = pendingTransactions.get(transactionID);
-         
-         if (pending != null)
+         if (pendingTransactions.get(transactionID) != null)
          {
             JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
-            
+
             int size = SIZE_ADD_RECORD_TX + info.data.length;
-            
+
             checkSize(size);
-            
+
             newTransaction.addPositive(currentFile, info.id);
-            
+
             writeAddRecordTX(fileID,
                              transactionID,
                              info.id,
@@ -1059,10 +1005,8 @@
                              size,
                              channelWrapper);
          }
-         else
-         if (recordsSnapshot.get(info.id) != null)
+         else if (recordsSnapshot.get(info.id) != null)
          {
-            System.out.println("AddRecordTX for a committed record, just converting it as a regular record");
            // AddRecordTX for a committed record; just convert it to a regular record
             // The record is already confirmed. There is no need to keep the transaction information during compacting
             addRecord(info);
@@ -1071,28 +1015,47 @@
 
       public void commitRecord(long transactionID, int numberOfRecords) throws Exception
       {
-         // Even though this shouldn't happen, I'm processing the commit as it was legal (instead of throwing it away)
          JournalTransaction pendingTx = pendingTransactions.get(transactionID);
-         
+
          if (pendingTx != null)
          {
-            log.warn("A commit record for a pending transaction is being read on compactor. This shouldn't happen");
-            
-            JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
-            checkSize(SIZE_COMMIT_RECORD);
-            writeTransaction(fileID, COMMIT_RECORD, transactionID, newTransaction, null, SIZE_COMMIT_RECORD, pendingTx.getCounter(currentFile), channelWrapper);
-            newTransaction.commit(currentFile);
+            // Sanity check: this should never happen
+            throw new IllegalStateException("Inconsistency during compacting: CommitRecord ID = " + transactionID +
+                                            " for an already committed transaction");
          }
       }
 
       public void deleteRecord(long recordID) throws Exception
       {
-         // nothing to be done here
+         // Nothing to be done here: if this is a delete, the record is already gone
+
+         if (records.get(recordID) != null)
+         {
+            // Sanity check: this should never happen
+            throw new IllegalStateException("Inconsistency during compacting: delete record read for a record that still exists");
+         }
+
       }
 
       public void deleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
       {
-         // nothing to be done here
+         if (pendingTransactions.get(transactionID) != null)
+         {
+            JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
+
+            int size = SIZE_DELETE_RECORD_TX + recordInfo.data.length;
+            
+            checkSize(size);
+            
+            writeDeleteRecordTransactional(fileID,
+                                           transactionID,
+                                           recordInfo.id,
+                                           new ByteArrayEncoding(recordInfo.data),
+                                           size,
+                                           channelWrapper);
+            
+            newTransaction.addNegative(currentFile, recordInfo.id);
+         }
       }
 
       public void markAsDataFile(JournalFile file)
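
(The handlers above, and the prepareRecord change just below, all apply one
rule: an operation read from the old files is rewritten into the compacted
files only if its transaction was still pending when the snapshot was taken.
Transactional deletes of completed transactions are dropped, and commit or
rollback records for transactions the snapshot still lists as pending are
treated as corruption. A minimal sketch of that dispatch, with illustrative
types:

   import java.util.Map;

   class CompactorRules
   {
      static boolean shouldRewriteDeleteTX(Map<Long, ?> pendingTransactions, long txID)
      {
         // Rewrite only if the transaction is still pending
         return pendingTransactions.containsKey(txID);
      }

      static void checkCommit(Map<Long, ?> pendingTransactions, long txID)
      {
         if (pendingTransactions.containsKey(txID))
         {
            // A commit for a transaction the snapshot still considers
            // pending means the journal is internally inconsistent
            throw new IllegalStateException("Inconsistency during compacting: CommitRecord ID = " + txID);
         }
      }
   }
)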
@@ -1104,12 +1067,35 @@
       {
          if (pendingTransactions.get(transactionID) != null)
          {
-            
+
+            JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
+
+            int size = SIZE_COMPLETE_TRANSACTION_RECORD + extraData.length + SIZE_INT;
+
+            checkSize(size);
+
+            writeTransaction(fileID,
+                             PREPARE_RECORD,
+                             transactionID,
+                             newTransaction,
+                             new ByteArrayEncoding(extraData),
+                             size,
+                             newTransaction.getCounter(currentFile),
+                             channelWrapper);
+
+            newTransaction.prepare(currentFile);
+
          }
       }
 
       public void rollbackRecord(long transactionID) throws Exception
       {
+         if (pendingTransactions.get(transactionID) != null)
+         {
+            // Sanity check: this should never happen
+            throw new IllegalStateException("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
+                                            " for an already rolled back transaction");
+         }
       }
 
       public void updateRecord(RecordInfo info) throws Exception
@@ -1127,7 +1113,6 @@
             System.out.println("UpdateTX " + info.id + " to be out on compacted file");
          }
       }
-      
 
       /**
        * @param transactionID
@@ -1143,7 +1128,6 @@
          }
          return newTransaction;
       }
-      
 
    }
 
@@ -2019,7 +2003,8 @@
             wholeFileBuffer.get(record);
          }
 
-         // Case this is a transaction, this will contain the number of pendingTransactions on a transaction, at the currentFile
+         // If this is a transaction, this will contain the number of pendingTransactions in the transaction, at the
+         // currentFile
          int transactionCheckNumberOfRecords = 0;
 
          if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
@@ -2298,6 +2283,32 @@
    /**
     * @param txID
     * @param id
+    * @param fileID
+    * @param record
+    * @param size
+    * @param bb
+    */
+   private void writeDeleteRecordTransactional(final int fileID,
+                                               final long txID,
+                                               final long id,
+                                               final EncodingSupport record,
+                                               int size,
+                                               ChannelBuffer bb)
+   {
+      bb.writeByte(DELETE_RECORD_TX);
+      bb.writeInt(fileID);
+      bb.writeLong(txID);
+      bb.writeLong(id);
+      bb.writeInt(record != null ? record.getEncodeSize() : 0);
+      if (record != null)
+      {
+         record.encode(bb);
+      }
+      bb.writeInt(size);
+   }
+
+   /**
+    * @param txID
+    * @param id
     * @param recordType
     * @param record
     * @param recordLength
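
(writeDeleteRecordTransactional, added in the hunk above, centralizes the
on-disk layout of a transactional delete. Field sizes below are inferred from
the write calls; SIZE_DELETE_RECORD_TX itself is defined elsewhere in
JournalImpl:

   // Record layout written by writeDeleteRecordTransactional:
   //   byte  type (DELETE_RECORD_TX)             1
   //   int   fileID (-1 from the append path)    4
   //   long  txID                                8
   //   long  id                                  8
   //   int   body length (0 when record == null) 4
   //         body (record.encode(bb))            variable
   //   int   size (total record length)          4
   //
   // which is why size = SIZE_DELETE_RECORD_TX + record.getEncodeSize(),
   // with SIZE_DELETE_RECORD_TX presumably covering the 29 fixed bytes.
)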
@@ -2727,6 +2738,8 @@
    private void closeFile(final JournalFile file)
    {
       fileFactory.deactivate(file.getFile());
+      dataFiles.add(file);
+
       filesExecutor.execute(new Runnable()
       {
          public void run()
@@ -2739,7 +2752,6 @@
             {
                log.warn(e.getMessage(), e);
             }
-            dataFiles.add(file);
          }
       });
    }
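
(Moving dataFiles.add(file) out of the Runnable means a freshly closed file is
registered synchronously, before the asynchronous close runs, so the reclaimer
and compactor can never observe a window where the file is in neither place.
A compilable sketch of the ordering, with illustrative types:

   import java.util.Queue;
   import java.util.concurrent.ConcurrentLinkedQueue;
   import java.util.concurrent.Executor;
   import java.util.concurrent.Executors;

   class CloseOrdering
   {
      private final Queue<String> dataFiles = new ConcurrentLinkedQueue<String>();

      private final Executor filesExecutor = Executors.newSingleThreadExecutor();

      void closeFile(final String file)
      {
         dataFiles.add(file); // visible to scanners immediately, as in r7473

         filesExecutor.execute(new Runnable()
         {
            public void run()
            {
               // the actual close happens later, off the append path
            }
         });
      }
   }
)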
@@ -3172,6 +3184,26 @@
       }
    }
 
+   private static class NullEncoding implements EncodingSupport
+   {
+
+      static final NullEncoding instance = new NullEncoding();
+
+      public void decode(MessagingBuffer buffer)
+      {
+      }
+
+      public void encode(MessagingBuffer buffer)
+      {
+      }
+
+      public int getEncodeSize()
+      {
+         return 0;
+      }
+
+   }
+
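+
+   // NullEncoding is the null object used by the delete delegation earlier in
+   // this revision: it decodes and encodes nothing and reports size 0, so
+   // SIZE_DELETE_RECORD_TX + record.getEncodeSize() collapses to the plain
+   // header size and record.encode(bb) contributes no body bytes -- byte for
+   // byte what the removed two-argument implementation wrote.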
    private static class ByteArrayEncoding implements EncodingSupport
    {
 

More information about the jboss-cvs-commits mailing list