[jboss-cvs] JBoss Messaging SVN: r7500 - branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jun 29 23:37:06 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-06-29 23:37:06 -0400 (Mon, 29 Jun 2009)
New Revision: 7500

Modified:
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/DummyCallback.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java
Log:
tweaks

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2009-06-30 01:12:59 UTC (rev 7499)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2009-06-30 03:37:06 UTC (rev 7500)
@@ -346,7 +346,7 @@
       }
       else
       {
-         write(bytes, false, DummyCallback.instance);
+         write(bytes, false, DummyCallback.getInstance());
       }
    }
    
@@ -376,7 +376,7 @@
       }
       else
       {
-         write(bytes, false, DummyCallback.instance);
+         write(bytes, false, DummyCallback.getInstance());
       }
    }
 

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/DummyCallback.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/DummyCallback.java	2009-06-30 01:12:59 UTC (rev 7499)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/DummyCallback.java	2009-06-30 03:37:06 UTC (rev 7500)
@@ -35,7 +35,7 @@
  */
 public  class DummyCallback implements IOCallback
 {
-   static DummyCallback instance = new DummyCallback();
+   private static DummyCallback instance = new DummyCallback();
    
    private static final Logger log = Logger.getLogger(SimpleWaitIOCallback.class);
    

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java	2009-06-30 01:12:59 UTC (rev 7499)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java	2009-06-30 03:37:06 UTC (rev 7500)
@@ -29,6 +29,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.jboss.messaging.core.buffers.ChannelBuffer;
 import org.jboss.messaging.core.buffers.ChannelBuffers;
@@ -52,34 +53,46 @@
 
    private static final Logger log = Logger.getLogger(JournalCompactor.class);
 
-   final JournalImpl journal;
+   private final JournalImpl journal;
 
-   final SequentialFileFactory fileFactory;
+   private final SequentialFileFactory fileFactory;
 
-   JournalFile currentFile;
+   private JournalFile currentFile;
 
-   SequentialFile sequentialFile;
+   private SequentialFile sequentialFile;
 
-   int fileID;
+   private int fileID;
 
-   ChannelBuffer channelWrapper;
+   private ChannelBuffer channelWrapper;
 
-   int nextOrderingID;
+   private int nextOrderingID;
 
-   final List<JournalFile> newDataFiles = new ArrayList<JournalFile>();
+   private final List<JournalFile> newDataFiles = new ArrayList<JournalFile>();
 
-   final Set<Long> recordsSnapshot = new ConcurrentHashSet<Long>();;
+   private final Set<Long> recordsSnapshot = new ConcurrentHashSet<Long>();;
 
    // Snapshot of transactions that were pending when the compactor started
-   final Set<Long> pendingTransactions = new ConcurrentHashSet<Long>();
+   private final Map<Long, PendingTransaction> pendingTransactions = new ConcurrentHashMap<Long, PendingTransaction>();
 
-   final Map<Long, JournalRecord> newRecords = new HashMap<Long, JournalRecord>();
+   private final Map<Long, JournalRecord> newRecords = new HashMap<Long, JournalRecord>();
 
-   final Map<Long, JournalTransaction> newTransactions = new HashMap<Long, JournalTransaction>();
+   private final Map<Long, JournalTransaction> newTransactions = new HashMap<Long, JournalTransaction>();
 
-   /** Commands that happened during compacting */
-   final LinkedList<CompactCommand> pendingCommands = new LinkedList<CompactCommand>();
+   /** Commands that happened during compacting
+    *  We can't take any counts during compactation, as we won't know in what files the records are taking place, so
+    *  we cache those updates during compacting. As soon as we are done we take the right account. */
+   private final LinkedList<CompactCommand> pendingCommands = new LinkedList<CompactCommand>();
 
+   public List<JournalFile> getNewDataFiles()
+   {
+      return newDataFiles;
+   }
+   
+   public Map<Long, JournalRecord> getNewRecords()
+   {
+      return newRecords;
+   }
+   
    public JournalCompactor(final SequentialFileFactory fileFactory,
                            final JournalImpl journal,
                            Set<Long> recordsSnapshot,
@@ -92,9 +105,9 @@
    }
 
    /** This methods informs the Compactor about the existence of a pending (non committed) transaction */
-   public void addPendingTransaction(long transactionID)
+   public void addPendingTransaction(long transactionID, long ids[])
    {
-      pendingTransactions.add(transactionID);
+      pendingTransactions.put(transactionID, new PendingTransaction(transactionID, ids));
    }
 
    /**
@@ -192,7 +205,7 @@
 
    public void addRecordTX(long transactionID, RecordInfo info) throws Exception
    {
-      if (pendingTransactions.contains(transactionID))
+      if (pendingTransactions.get(transactionID) != null)
       {
          JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
 
@@ -219,7 +232,7 @@
 
    public void commitRecord(long transactionID, int numberOfRecords) throws Exception
    {
-      if (pendingTransactions.contains(transactionID))
+      if (pendingTransactions.get(transactionID) != null)
       {
          // Sanity check, this should never happen
          throw new IllegalStateException("Inconsistency during compacting: CommitRecord ID = " + transactionID +
@@ -239,7 +252,7 @@
 
    public void deleteRecordTX(long transactionID, RecordInfo info) throws Exception
    {
-      if (pendingTransactions.contains(transactionID))
+      if (pendingTransactions.get(transactionID) != null)
       {
          JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
 
@@ -266,7 +279,7 @@
 
    public void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
    {
-      if (pendingTransactions.contains(transactionID))
+      if (pendingTransactions.get(transactionID) != null)
       {
 
          JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
@@ -291,7 +304,7 @@
 
    public void rollbackRecord(long transactionID) throws Exception
    {
-      if (pendingTransactions.contains(transactionID))
+      if (pendingTransactions.get(transactionID) != null)
       {
          // Sanity check, this should never happen
          throw new IllegalStateException("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
@@ -328,7 +341,7 @@
 
    public void updateRecordTX(long transactionID, RecordInfo info) throws Exception
    {
-      if (pendingTransactions.contains(transactionID))
+      if (pendingTransactions.get(transactionID) != null)
       {
          JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
 
@@ -361,7 +374,7 @@
       JournalTransaction newTransaction = newTransactions.get(transactionID);
       if (newTransaction == null)
       {
-         newTransaction = new JournalTransaction(journal);
+         newTransaction = new JournalTransaction(transactionID, journal);
          newTransactions.put(transactionID, newTransaction);
       }
       return newTransaction;
@@ -413,6 +426,20 @@
          deleteRecord.delete(usedFile);
       }
    }
+   
+   private class PendingTransaction
+   {
+      long transactionID;
+      long pendingIDs[];
+      
+      
+      PendingTransaction(long transactionID, long ids[])
+      {
+         this.transactionID = transactionID;
+         this.pendingIDs = ids;
+      }
+      
+   }
 
    private class UpdateCompactCommand extends CompactCommand
    {
@@ -435,4 +462,14 @@
          updateRecord.addUpdateFile(usedFile);
       }
    }
+
+   /**
+    * @param id
+    * @param journalTransaction
+    */
+   public void addCommandCommit(long id, JournalTransaction journalTransaction)
+   {
+      // TODO Auto-generated method stub
+      
+   }
 }

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-30 01:12:59 UTC (rev 7499)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-30 03:37:06 UTC (rev 7500)
@@ -61,9 +61,7 @@
 import org.jboss.messaging.core.journal.TestableJournal;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.utils.ConcurrentHashSet;
 import org.jboss.messaging.utils.DataConstants;
-import org.jboss.messaging.utils.Pair;
 import org.jboss.messaging.utils.concurrent.LinkedBlockingDeque;
 
 /**
@@ -866,9 +864,8 @@
 
             for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
             {
-               System.out.println("TransactionID = " + entry.getKey());
                entry.getValue().setCompacting();
-               compactor.addPendingTransaction(entry.getKey());
+               compactor.addPendingTransaction(entry.getKey(), entry.getValue().getPositiveArray());
             }
 
             // We will calculate the new records during compacting, what will take the position the records will take after compacting
@@ -904,17 +901,17 @@
          // Usually tests will use this to hold the compacting while other structures are being updated.
          onCompactDone();
 
-         SequentialFile controlFile = createControlFile(dataFilesToProcess, compactor.newDataFiles);
+         SequentialFile controlFile = createControlFile(dataFilesToProcess, compactor.getNewDataFiles());
 
          List<JournalFile> newDatafiles = null;
 
          compactingLock.writeLock().lock();
          try
          {
-            newDatafiles = compactor.newDataFiles;
+            newDatafiles = compactor.getNewDataFiles();
 
             // Restore newRecords created during compacting
-            for (Map.Entry<Long, JournalRecord> newRecordEntry : compactor.newRecords.entrySet())
+            for (Map.Entry<Long, JournalRecord> newRecordEntry : compactor.getNewRecords().entrySet())
             {
                records.put(newRecordEntry.getKey(), newRecordEntry.getValue());
             }
@@ -929,7 +926,7 @@
             
             compactor.replayPendingCommands();
             
-            // Deal with transactions commits that happend during the compacting
+            // Deal with transactions commits that happened during the compacting
 
             this.compactor = null;
 
@@ -956,17 +953,6 @@
             {
             }
 
-            if (compactor.sequentialFile != null)
-            {
-               try
-               {
-                  compactor.sequentialFile = null;
-               }
-               catch (Throwable ignored)
-               {
-               }
-            }
-
             compactor = null;
          }
          autoReclaim = previousReclaimValue;
@@ -1204,7 +1190,7 @@
 
                if (tnp == null)
                {
-                  tnp = new JournalTransaction(JournalImpl.this);
+                  tnp = new JournalTransaction(info.id, JournalImpl.this);
 
                   transactions.put(transactionID, tnp);
                }
@@ -1236,7 +1222,7 @@
 
                if (tnp == null)
                {
-                  tnp = new JournalTransaction(JournalImpl.this);
+                  tnp = new JournalTransaction(transactionID, JournalImpl.this);
 
                   transactions.put(transactionID, tnp);
                }
@@ -1272,7 +1258,7 @@
 
                if (journalTransaction == null)
                {
-                  journalTransaction = new JournalTransaction(JournalImpl.this);
+                  journalTransaction = new JournalTransaction(transactionID, JournalImpl.this);
 
                   transactions.put(transactionID, journalTransaction);
                }
@@ -2730,7 +2716,7 @@
 
       if (tx == null)
       {
-         tx = new JournalTransaction(this);
+         tx = new JournalTransaction(txID, this);
 
          JournalTransaction trans = transactions.putIfAbsent(txID, tx);
 

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java	2009-06-30 01:12:59 UTC (rev 7499)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java	2009-06-30 03:37:06 UTC (rev 7500)
@@ -51,12 +51,14 @@
 
    private List<Pair<JournalFile, Long>> neg;
 
+   private long id;
+   
    // All the files this transaction is touching on.
    // We can't have those files being reclaimed if there is a pending transaction
    private Set<JournalFile> pendingFiles;
 
    private TransactionCallback currentCallback;
-   
+
    private boolean compacting = false;
 
    private Map<JournalFile, TransactionCallback> callbackList;
@@ -65,8 +67,9 @@
 
    private final AtomicInteger counter = new AtomicInteger();
 
-   public JournalTransaction(final JournalImpl journal)
+   public JournalTransaction(final long id, final JournalImpl journal)
    {
+      this.id = id;
       this.journal = journal;
    }
 
@@ -80,10 +83,74 @@
       internalgetCounter(file).incrementAndGet();
    }
 
+   public long[] getPositiveArray()
+   {
+      if (pos == null)
+      {
+         return new long[0];
+      }
+      else
+      {
+         int i = 0;
+         long[] ids = new long[pos.size()];
+         for (Pair<JournalFile, Long> el : pos)
+         {
+            ids[i++] = el.b;
+         }
+         return ids;
+      }
+   }
+
    public void setCompacting()
    {
       compacting = true;
-      
+
+      // Everything is cleared on the transaction...
+      // since we are compacting, everything is at the compactor's level
+      clear();
+   }
+
+   /** This is used to merge transactions from compacting */
+   public void merge(JournalTransaction other)
+   {
+      if (other.pos != null)
+      {
+         if (pos == null)
+         {
+            pos = new ArrayList<Pair<JournalFile, Long>>();
+         }
+
+         pos.addAll(other.pos);
+      }
+
+      if (other.neg != null)
+      {
+         if (neg == null)
+         {
+            neg = new ArrayList<Pair<JournalFile, Long>>();
+         }
+
+         neg.addAll(other.neg);
+      }
+
+      if (other.pendingFiles != null)
+      {
+         if (pendingFiles == null)
+         {
+            pendingFiles = new HashSet<JournalFile>();
+         }
+
+         pendingFiles.addAll(other.pendingFiles);
+      }
+
+      this.compacting = false;
+   }
+
+   /**
+    * 
+    */
+   public void clear()
+   {
       // / Compacting is recreating all the previous files and everything
       // / so we just clear the list of previous files, previous pos and previous adds
       // / The transaction may be working at the top from now
@@ -213,64 +280,72 @@
    }
 
    /** 
-    * The caller of this method needs to guarantee lock.acquire. (unless this is being called from load what is a single thread process).
+    * The caller of this method needs to guarantee lock.acquire at the journal. (unless this is being called from load what is a single thread process).
     * */
    public void commit(final JournalFile file)
    {
       JournalCompactor compactor = journal.getCompactor();
-      
-      if (pos != null)
+
+      if (compacting)
       {
-         for (Pair<JournalFile, Long> trUpdate : pos)
+         compactor.addCommandCommit(id, this);
+      }
+      else
+      {
+
+         if (pos != null)
          {
-            JournalImpl.JournalRecord posFiles = journal.getRecords().get(trUpdate.b);
-
-            if (posFiles == null)
+            for (Pair<JournalFile, Long> trUpdate : pos)
             {
-               posFiles = new JournalImpl.JournalRecord(trUpdate.a);
+               JournalImpl.JournalRecord posFiles = journal.getRecords().get(trUpdate.b);
 
-               journal.getRecords().put(trUpdate.b, posFiles);
+               if (posFiles == null)
+               {
+                  posFiles = new JournalImpl.JournalRecord(trUpdate.a);
+
+                  journal.getRecords().put(trUpdate.b, posFiles);
+               }
+               else if (compactor != null && compactor.lookupRecord(trUpdate.b))
+               {
+                  // This is a case where the transaction was opened after compacting was started,
+                  // but the commit arrived while compacting was working
+                  // We need to cache the counter update, so compacting will take the correct files when it is done
+                  compactor.addCommandUpdate(trUpdate.b, trUpdate.a);
+               }
+               else
+               {
+                  posFiles.addUpdateFile(trUpdate.a);
+               }
             }
-            else if (compactor != null && compactor.lookupRecord(trUpdate.b))
-            {
-               // This is a case where the transaction was opened after compacting was started,
-               // but the commit arrived while compacting was working
-               // We need to cache the counter update, so compacting will take the correct files when it is done
-               compactor.addCommandUpdate(trUpdate.b, trUpdate.a);
-            }
-            else
-            {
-               posFiles.addUpdateFile(trUpdate.a);
-            }
          }
-      }
 
-      if (neg != null)
-      {
-         for (Pair<JournalFile, Long> trDelete : neg)
+         if (neg != null)
          {
-            JournalImpl.JournalRecord posFiles = journal.getRecords().remove(trDelete.b);
+            for (Pair<JournalFile, Long> trDelete : neg)
+            {
+               JournalImpl.JournalRecord posFiles = journal.getRecords().remove(trDelete.b);
 
-            if (posFiles != null)
-            {
-               posFiles.delete(trDelete.a);
+               if (posFiles != null)
+               {
+                  posFiles.delete(trDelete.a);
+               }
+               else if (compactor != null && compactor.lookupRecord(trDelete.b))
+               {
+                  // This is a case where the transaction was opened after compacting was started,
+                  // but the commit arrived while compacting was working
+                  // We need to cache the counter update, so compacting will take the correct files when it is done
+                  compactor.addCommandDelete(trDelete.b, trDelete.a);
+               }
             }
-            else if (compactor != null && compactor.lookupRecord(trDelete.b))
-            {
-               // This is a case where the transaction was opened after compacting was started,
-               // but the commit arrived while compacting was working
-               // We need to cache the counter update, so compacting will take the correct files when it is done
-               compactor.addCommandDelete(trDelete.b, trDelete.a);
-            }
          }
-      }
 
-      // Now add negs for the pos we added in each file in which there were
-      // transactional operations
+         // Now add negs for the pos we added in each file in which there were
+         // transactional operations
 
-      for (JournalFile jf : pendingFiles)
-      {
-         file.incNegCount(jf);
+         for (JournalFile jf : pendingFiles)
+         {
+            file.incNegCount(jf);
+         }
       }
    }
 




More information about the jboss-cvs-commits mailing list