[hornetq-commits] JBoss hornetq SVN: r11189 - branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Aug 11 13:09:12 EDT 2011


Author: borges
Date: 2011-08-11 13:09:12 -0400 (Thu, 11 Aug 2011)
New Revision: 11189

Modified:
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
Log:
HORNETQ-720 Add number of operations to COMMIT/PREPARE journal entries

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java	2011-08-11 17:08:43 UTC (rev 11188)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java	2011-08-11 17:09:12 UTC (rev 11189)
@@ -1,6 +1,9 @@
 package org.hornetq.core.journal.impl;
 
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.hornetq.api.core.HornetQException;
@@ -29,6 +32,8 @@
    private final ReentrantLock lockAppend = new ReentrantLock();
    // private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
 
+   private final ConcurrentMap<Long, AtomicInteger> transactions = new ConcurrentHashMap<Long, AtomicInteger>();
+
    private final JournalFile currentFile;
 
    /**
@@ -62,16 +67,6 @@
 
    // ------------------------
 
-//   private void readLockJournal()
-//   {
-//      journalLock.readLock().lock();
-//   }
-//
-//   private void readUnlockJournal()
-//   {
-//      journalLock.readLock().unlock();
-//   }
-
    @Override
    public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync, IOCompletion callback)
             throws Exception
@@ -84,10 +79,10 @@
    /**
     * Write the record to the current file.
     */
-   private void writeRecord(JournalInternalRecord encoder, boolean sync, IOCompletion callback) throws Exception
+   private void writeRecord(JournalInternalRecord encoder, final boolean sync, final IOCompletion callback)
+            throws Exception
    {
 
-
       lockAppend.lock();
       try
       {
@@ -130,6 +125,7 @@
    public void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record)
             throws Exception
    {
+      count(txID);
       JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
       writeRecord(addRecord, false, null);
    }
@@ -147,6 +143,7 @@
    public void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record)
             throws Exception
    {
+      count(txID);
       JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record);
       writeRecord(updateRecordTX, false, null);
    }
@@ -156,7 +153,13 @@
             throws Exception
    {
       JournalInternalRecord commitRecord = new JournalCompleteRecordTX(true, txID, null);
-      writeRecord(commitRecord, sync, callback);
+      AtomicInteger value = transactions.remove(Long.valueOf(txID));
+      if (value != null)
+      {
+         commitRecord.setNumberOfRecords(value.get());
+      }
+
+      writeRecord(commitRecord, true, callback);
    }
 
    @Override
@@ -164,9 +167,27 @@
             throws Exception
    {
       JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(false, txID, transactionData);
+      AtomicInteger value = transactions.get(Long.valueOf(txID));
+      if (value != null)
+      {
+         prepareRecord.setNumberOfRecords(value.get());
+      }
       writeRecord(prepareRecord, sync, callback);
    }
 
+   private int count(long txID) throws HornetQException
+   {
+      AtomicInteger defaultValue = new AtomicInteger(1);
+      AtomicInteger count = transactions.putIfAbsent(Long.valueOf(txID), defaultValue);
+      if (count != null)
+      {
+         return count.incrementAndGet();
+      }
+      return defaultValue.get();
+   }
+
+   // UNSUPPORTED STUFF
+
    @Override
    public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception
    {



More information about the hornetq-commits mailing list