[jboss-cvs] JBoss Messaging SVN: r7499 - in branches/clebert_temp_expirement: tests/src/org/jboss/messaging/tests/integration/journal and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jun 29 21:13:00 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-06-29 21:12:59 -0400 (Mon, 29 Jun 2009)
New Revision: 7499

Added:
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java
Removed:
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java
Modified:
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/AIOJournalCompactTest.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
Log:
changes

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java	2009-06-29 23:47:11 UTC (rev 7498)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java	2009-06-30 01:12:59 UTC (rev 7499)
@@ -39,7 +39,6 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.utils.ConcurrentHashSet;
 import org.jboss.messaging.utils.DataConstants;
-import org.jboss.messaging.utils.Pair;
 
 /**
  * A JournalCompactor
@@ -69,7 +68,7 @@
 
    final List<JournalFile> newDataFiles = new ArrayList<JournalFile>();
 
-   final Map<Long, JournalRecord> recordsSnapshot;
+   final Set<Long> recordsSnapshot = new ConcurrentHashSet<Long>();;
 
    // Snapshot of transactions that were pending when the compactor started
    final Set<Long> pendingTransactions = new ConcurrentHashSet<Long>();
@@ -83,15 +82,16 @@
 
    public JournalCompactor(final SequentialFileFactory fileFactory,
                            final JournalImpl journal,
-                           Map<Long, JournalRecord> recordsSnapshot,
+                           Set<Long> recordsSnapshot,
                            int firstFileID)
    {
       this.fileFactory = fileFactory;
       this.journal = journal;
-      this.recordsSnapshot = recordsSnapshot;
+      this.recordsSnapshot.addAll(recordsSnapshot);
       this.nextOrderingID = firstFileID;
    }
 
+   /** This methods informs the Compactor about the existence of a pending (non committed) transaction */
    public void addPendingTransaction(long transactionID)
    {
       pendingTransactions.add(transactionID);
@@ -117,7 +117,7 @@
 
    public boolean lookupRecord(long id)
    {
-      return recordsSnapshot.get(id) != null;
+      return recordsSnapshot.contains(id);
    }
 
    private void checkSize(int size) throws Exception
@@ -173,7 +173,7 @@
 
    public void addRecord(RecordInfo info) throws Exception
    {
-      if (recordsSnapshot.get(info.id) != null)
+      if (recordsSnapshot.contains(info.id))
       {
          int size = JournalImpl.SIZE_ADD_RECORD + info.data.length;
 
@@ -301,7 +301,7 @@
 
    public void updateRecord(RecordInfo info) throws Exception
    {
-      if (recordsSnapshot.get(info.id) != null)
+      if (recordsSnapshot.contains(info.id))
       {
          int size = JournalImpl.SIZE_UPDATE_RECORD + info.data.length;
 
@@ -388,12 +388,12 @@
       channelWrapper.writeInt(fileID);
    }
 
-   static abstract class CompactCommand
+   private static abstract class CompactCommand
    {
       abstract void execute() throws Exception;
    }
 
-   class DeleteCompactCommand extends CompactCommand
+   private class DeleteCompactCommand extends CompactCommand
    {
       long id;
 
@@ -414,7 +414,7 @@
       }
    }
 
-   class UpdateCompactCommand extends CompactCommand
+   private class UpdateCompactCommand extends CompactCommand
    {
       long id;
 
@@ -427,6 +427,7 @@
          this.usedFile = usedFile;
       }
 
+      
       @Override
       void execute() throws Exception
       {

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-29 23:47:11 UTC (rev 7498)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-30 01:12:59 UTC (rev 7499)
@@ -61,6 +61,7 @@
 import org.jboss.messaging.core.journal.TestableJournal;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.utils.ConcurrentHashSet;
 import org.jboss.messaging.utils.DataConstants;
 import org.jboss.messaging.utils.Pair;
 import org.jboss.messaging.utils.concurrent.LinkedBlockingDeque;
@@ -186,10 +187,10 @@
    private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
 
    // Compacting may replace this structure
-   private volatile ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<Long, JournalRecord>();
+   private final ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<Long, JournalRecord>();
 
    // Compacting may replace this structure
-   private volatile ConcurrentMap<Long, JournalTransaction> pendingTransactions = new ConcurrentHashMap<Long, JournalTransaction>();
+   private final ConcurrentMap<Long, JournalTransaction> transactions = new ConcurrentHashMap<Long, JournalTransaction>();
 
    // This will be set only while the JournalCompactor is being executed
    private volatile JournalCompactor compactor;
@@ -688,7 +689,7 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      JournalTransaction tx = pendingTransactions.remove(txID);
+      JournalTransaction tx = transactions.remove(txID);
 
       compactingLock.readLock().lock();
 
@@ -742,7 +743,7 @@
 
       try
       {
-         tx = pendingTransactions.remove(txID);
+         tx = transactions.remove(txID);
 
          if (tx == null)
          {
@@ -836,12 +837,9 @@
          throw new IllegalStateException("There is pending compacting operation");
       }
 
-      ConcurrentMap<Long, JournalRecord> recordsSnapshot = null;
 
       ArrayList<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>(dataFiles.size());
 
-      Map<Long, JournalTransaction> pendingTransactions;
-
       boolean previousReclaimValue = autoReclaim;
 
       try
@@ -852,32 +850,29 @@
          compactingLock.writeLock().lock();
          try
          {
+
             // We need to move to the next file, as we need a clear start for negatives and positives counts
             moveNextFile();
 
             autoReclaim = false;
 
             // Take the snapshots and replace the structures
-
-            recordsSnapshot = JournalImpl.this.records;
-            pendingTransactions = JournalImpl.this.pendingTransactions;
-            pendingTransactions.putAll(this.pendingTransactions);
-
-            this.records = new ConcurrentHashMap<Long, JournalRecord>();
-
+ 
             dataFilesToProcess.addAll(dataFiles);
 
             dataFiles.clear();
 
-            this.compactor = new JournalCompactor(fileFactory, this, recordsSnapshot, dataFilesToProcess.get(0).getFileID());
+            this.compactor = new JournalCompactor(fileFactory, this, this.records.keySet(), dataFilesToProcess.get(0).getFileID());
 
-            for (Map.Entry<Long, JournalTransaction> entry : pendingTransactions.entrySet())
+            for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
             {
                System.out.println("TransactionID = " + entry.getKey());
                entry.getValue().setCompacting();
                compactor.addPendingTransaction(entry.getKey());
             }
 
+            // We will calculate the new records during compacting, what will take the position the records will take after compacting
+            this.records.clear();
          }
          finally
          {
@@ -930,13 +925,11 @@
                dataFiles.addFirst(newDatafiles.get(i));
             }
 
-            // Replay pending commands
+            // Replay pending commands (including updates, deletes and commits)
             
             compactor.replayPendingCommands();
             
-            // Restore relationshipMap
             // Deal with transactions commits that happend during the compacting
-            // Deal with updates and deletes that happened during the compacting
 
             this.compactor = null;
 
@@ -1111,7 +1104,7 @@
          throw new IllegalStateException("Journal must be in started state");
       }
 
-      final Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
+      final Map<Long, TransactionHolder> loadTransactions = new LinkedHashMap<Long, TransactionHolder>();
 
       final List<JournalFile> orderedFiles = orderFiles();
 
@@ -1196,24 +1189,24 @@
 
                hasData.set(true);
 
-               TransactionHolder tx = transactions.get(transactionID);
+               TransactionHolder tx = loadTransactions.get(transactionID);
 
                if (tx == null)
                {
                   tx = new TransactionHolder(transactionID);
 
-                  transactions.put(transactionID, tx);
+                  loadTransactions.put(transactionID, tx);
                }
 
                tx.recordInfos.add(info);
 
-               JournalTransaction tnp = pendingTransactions.get(transactionID);
+               JournalTransaction tnp = transactions.get(transactionID);
 
                if (tnp == null)
                {
                   tnp = new JournalTransaction(JournalImpl.this);
 
-                  pendingTransactions.put(transactionID, tnp);
+                  transactions.put(transactionID, tnp);
                }
 
                tnp.addPositive(file, info.id);
@@ -1228,24 +1221,24 @@
 
                hasData.set(true);
 
-               TransactionHolder tx = transactions.get(transactionID);
+               TransactionHolder tx = loadTransactions.get(transactionID);
 
                if (tx == null)
                {
                   tx = new TransactionHolder(transactionID);
 
-                  transactions.put(transactionID, tx);
+                  loadTransactions.put(transactionID, tx);
                }
 
                tx.recordsToDelete.add(info);
 
-               JournalTransaction tnp = pendingTransactions.get(transactionID);
+               JournalTransaction tnp = transactions.get(transactionID);
 
                if (tnp == null)
                {
                   tnp = new JournalTransaction(JournalImpl.this);
 
-                  pendingTransactions.put(transactionID, tnp);
+                  transactions.put(transactionID, tnp);
                }
 
                tnp.addNegative(file, info.id);
@@ -1261,27 +1254,27 @@
 
                hasData.set(true);
 
-               TransactionHolder tx = transactions.get(transactionID);
+               TransactionHolder tx = loadTransactions.get(transactionID);
 
                if (tx == null)
                {
                   // The user could choose to prepare empty transactions
                   tx = new TransactionHolder(transactionID);
 
-                  transactions.put(transactionID, tx);
+                  loadTransactions.put(transactionID, tx);
                }
 
                tx.prepared = true;
 
                tx.extraData = extraData;
 
-               JournalTransaction journalTransaction = pendingTransactions.get(transactionID);
+               JournalTransaction journalTransaction = transactions.get(transactionID);
 
                if (journalTransaction == null)
                {
                   journalTransaction = new JournalTransaction(JournalImpl.this);
 
-                  pendingTransactions.put(transactionID, journalTransaction);
+                  transactions.put(transactionID, journalTransaction);
                }
 
                boolean healthy = checkTransactionHealth(file, journalTransaction, orderedFiles, numberOfRecords);
@@ -1304,7 +1297,7 @@
                   trace("commitRecord: txid = " + transactionID);
                }
 
-               TransactionHolder tx = transactions.remove(transactionID);
+               TransactionHolder tx = loadTransactions.remove(transactionID);
 
                // The commit could be alone on its own journal-file and the
                // whole transaction body was reclaimed but not the
@@ -1315,7 +1308,7 @@
                // ignore this
                if (tx != null)
                {
-                  JournalTransaction journalTransaction = pendingTransactions.remove(transactionID);
+                  JournalTransaction journalTransaction = transactions.remove(transactionID);
 
                   if (journalTransaction == null)
                   {
@@ -1365,7 +1358,7 @@
                   trace("rollbackRecord: txid = " + transactionID);
                }
 
-               TransactionHolder tx = transactions.remove(transactionID);
+               TransactionHolder tx = loadTransactions.remove(transactionID);
 
                // The rollback could be alone on its own journal-file and the
                // whole transaction body was reclaimed but the commit-record
@@ -1373,7 +1366,7 @@
                // point
                if (tx != null)
                {
-                  JournalTransaction tnp = pendingTransactions.remove(transactionID);
+                  JournalTransaction tnp = transactions.remove(transactionID);
 
                   if (tnp == null)
                   {
@@ -1457,13 +1450,13 @@
 
       pushOpenedFile();
 
-      for (TransactionHolder transaction : transactions.values())
+      for (TransactionHolder transaction : loadTransactions.values())
       {
          if (!transaction.prepared || transaction.invalid)
          {
             log.warn("Uncommitted transaction with id " + transaction.transactionID + " found and discarded");
 
-            JournalTransaction transactionInfo = pendingTransactions.get(transaction.transactionID);
+            JournalTransaction transactionInfo = transactions.get(transaction.transactionID);
 
             if (transactionInfo == null)
             {
@@ -1474,7 +1467,7 @@
             transactionInfo.forget();
 
             // Remove the transactionInfo
-            pendingTransactions.remove(transaction.transactionID);
+            transactions.remove(transaction.transactionID);
          }
          else
          {
@@ -1564,7 +1557,7 @@
    {
       fileFactory.testFlush();
 
-      for (JournalTransaction tx : pendingTransactions.values())
+      for (JournalTransaction tx : transactions.values())
       {
          tx.waitCallbacks();
       }
@@ -2685,7 +2678,7 @@
     * @return
     * @throws Exception
     */
-   public JournalFile getFile(boolean keepOpened, boolean multiAIO) throws Exception
+   JournalFile getFile(boolean keepOpened, boolean multiAIO) throws Exception
    {
       JournalFile nextOpenedFile = null;
       try
@@ -2733,13 +2726,13 @@
 
    private JournalTransaction getTransactionInfo(final long txID)
    {
-      JournalTransaction tx = pendingTransactions.get(txID);
+      JournalTransaction tx = transactions.get(txID);
 
       if (tx == null)
       {
          tx = new JournalTransaction(this);
 
-         JournalTransaction trans = pendingTransactions.putIfAbsent(txID, tx);
+         JournalTransaction trans = transactions.putIfAbsent(txID, tx);
 
          if (trans != null)
          {

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java	2009-06-29 23:47:11 UTC (rev 7498)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java	2009-06-30 01:12:59 UTC (rev 7499)
@@ -56,6 +56,8 @@
    private Set<JournalFile> pendingFiles;
 
    private TransactionCallback currentCallback;
+   
+   private boolean compacting = false;
 
    private Map<JournalFile, TransactionCallback> callbackList;
 
@@ -80,6 +82,8 @@
 
    public void setCompacting()
    {
+      compacting = true;
+      
       // / Compacting is recreating all the previous files and everything
       // / so we just clear the list of previous files, previous pos and previous adds
       // / The transaction may be working at the top from now
@@ -214,6 +218,7 @@
    public void commit(final JournalFile file)
    {
       JournalCompactor compactor = journal.getCompactor();
+      
       if (pos != null)
       {
          for (Pair<JournalFile, Long> trUpdate : pos)

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/AIOJournalCompactTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/AIOJournalCompactTest.java	2009-06-29 23:47:11 UTC (rev 7498)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/AIOJournalCompactTest.java	2009-06-30 01:12:59 UTC (rev 7499)
@@ -36,7 +36,7 @@
  *
  *
  */
-public class AIOJournalCompactTest extends JournalCompactTest
+public class AIOJournalCompactTest extends NIOJournalCompactTest
 {
 
    // Constants -----------------------------------------------------

Deleted: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java	2009-06-29 23:47:11 UTC (rev 7498)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java	2009-06-30 01:12:59 UTC (rev 7499)
@@ -1,443 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
- * Middleware LLC, and individual contributors by the @authors tag. See the
- * copyright.txt in the distribution for a full listing of individual
- * contributors.
- * 
- * This is free software; you can redistribute it and/or modify it under the
- * terms of the GNU Lesser General Public License as published by the Free
- * Software Foundation; either version 2.1 of the License, or (at your option)
- * any later version.
- * 
- * This software is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details.
- * 
- * You should have received a copy of the GNU Lesser General Public License
- * along with this software; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.journal;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.concurrent.CountDownLatch;
-
-import org.jboss.messaging.core.journal.SequentialFileFactory;
-import org.jboss.messaging.core.journal.impl.JournalImpl;
-import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.tests.unit.core.journal.impl.JournalImplTestBase;
-import org.jboss.messaging.utils.IDGenerator;
-import org.jboss.messaging.utils.TimeAndCounterIDGenerator;
-
-/**
- * 
- * A JournalImplTestBase
- * 
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
- */
-public class JournalCompactTest extends JournalImplTestBase
-{
-   private static final Logger log = Logger.getLogger(JournalCompactTest.class);
-
-   protected String journalDir = System.getProperty("user.home") + "/journal-test";
-
-   private static final int NUMBER_OF_RECORDS = 1000;
-
-   IDGenerator idGenerator = new TimeAndCounterIDGenerator();
-
-   // General tests
-   // =============
-
-   public void testCrashRenamingFiles() throws Exception
-   {
-   }
-
-   public void testCompactwithPendingXACommit() throws Exception
-   {
-   }
-
-   public void testCompactwithPendingXAPrepareAndCommit() throws Exception
-   {
-   }
-
-   public void testCompactwithPendingCommit() throws Exception
-   {
-      InternalCompactTest(false, false, false, false, true);
-   }
-
-   public void testCompactwithConcurrentUpdateAndDeletes() throws Exception
-   {
-      InternalCompactTest(true, false, true, true, false);
-   }
-
-   public void testCompactwithConcurrentDeletes() throws Exception
-   {
-      InternalCompactTest(true, false, false, true, false);
-   }
-
-   public void testCompactwithConcurrentUpdates() throws Exception
-   {
-      InternalCompactTest(true, false, true, false, false);
-   }
-
-   public void testCompactWithConcurrentAppend() throws Exception
-   {
-      InternalCompactTest(true, true, false, false, false);
-   }
-
-   private void InternalCompactTest(final boolean regularAdd,
-                                    final boolean performAppend,
-                                    final boolean performUpdate,
-                                    final boolean performDelete,
-                                    final boolean pendingTransactions) throws Exception
-   {
-      setup(50, 60 * 1024, true);
-
-      ArrayList<Long> liveIDs = new ArrayList<Long>();
-
-      ArrayList<Long> listPendingTransactions = new ArrayList<Long>();
-
-      final CountDownLatch latchDone = new CountDownLatch(1);
-      final CountDownLatch latchWait = new CountDownLatch(1);
-      journal = new JournalImpl(fileSize, minFiles, fileFactory, filePrefix, fileExtension, maxAIO)
-      {
-         @Override
-         public void onCompactDone()
-         {
-            latchDone.countDown();
-            System.out.println("Waiting on Compact");
-            try
-            {
-               latchWait.await();
-            }
-            catch (InterruptedException e)
-            {
-               e.printStackTrace();
-            }
-            System.out.println("Done");
-         }
-      };
-      startJournal();
-      load();
-
-      long transactionID = 0;
-
-      if (regularAdd)
-      {
-
-         for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
-         {
-            add(i);
-            if (i % 10 == 0 && i > 0)
-            {
-               journal.forceMoveNextFile();
-            }
-            update(i);
-         }
-
-         for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
-         {
-
-            addTx(transactionID, i);
-            updateTx(transactionID, i);
-            if (i % 10 == 0)
-            {
-               journal.forceMoveNextFile();
-            }
-            commit(transactionID++);
-            update(i);
-         }
-      }
-
-      if (pendingTransactions)
-      {
-         for (long i = 0; i < 100; i++)
-         {
-            addTx(transactionID, idGenerator.generateID());
-            updateTx(transactionID, idGenerator.generateID());
-            listPendingTransactions.add(transactionID++);
-         }
-      }
-
-      System.out.println("Number of Files: " + journal.getDataFilesCount());
-
-      if (regularAdd)
-      {
-         for (int i = 0; i < NUMBER_OF_RECORDS; i++)
-         {
-            if (!(i % 10 == 0))
-            {
-               delete(i);
-            }
-            else
-            {
-               liveIDs.add((long)i);
-            }
-         }
-      }
-
-      journal.forceMoveNextFile();
-
-      Thread t = new Thread()
-      {
-         @Override
-         public void run()
-         {
-            try
-            {
-               journal.compact();
-            }
-            catch (Exception e)
-            {
-               e.printStackTrace();
-            }
-         }
-      };
-
-      t.start();
-
-      latchDone.await();
-
-      int nextID = NUMBER_OF_RECORDS;
-
-      if (performAppend)
-      {
-         for (int i = 0; i < 50; i++)
-         {
-            add(nextID++);
-            if (i % 10 == 0)
-            {
-               journal.forceMoveNextFile();
-            }
-         }
-
-         for (int i = 0; i < 50; i++)
-         {
-            // A Total new transaction (that was created after the compact started) to add new record while compacting is still working
-            addTx(transactionID, nextID++);
-            commit(transactionID++);
-            if (i % 10 == 0)
-            {
-               journal.forceMoveNextFile();
-            }
-         }
-      }
-
-      if (performUpdate)
-      {
-         int count = 0;
-         for (Long liveID : liveIDs)
-         {
-            if (count++ % 2 == 0)
-            {
-               update(liveID);
-            }
-            else
-            {
-               // A Total new transaction (that was created after the compact started) to update a record that is being compacted
-               updateTx(transactionID, liveID);
-               commit(transactionID++);
-            }
-         }
-      }
-
-      if (performDelete)
-      {
-         int count = 0;
-         for (long liveID : liveIDs)
-         {
-            if (count++ % 2 == 0)
-            {
-               delete(liveID);
-            }
-            else
-            {
-               // A Total new transaction (that was created after the compact started) to delete a record that is being compacted
-               deleteTx(transactionID, liveID);
-               commit(transactionID++);
-            }
-
-         }
-      }
-
-      if (pendingTransactions)
-      {
-         for (long tx : listPendingTransactions)
-         {
-            if (tx % 2 == 0)
-            {
-               commit(tx);
-            }
-            else
-            {
-               rollback(tx);
-            }
-         }
-      }
-
-      /** Some independent adds and updates */
-      for (int i = 0; i < 1000; i++)
-      {
-         long id = idGenerator.generateID();
-         add(id);
-         delete(id);
-
-         if (i % 100 == 0)
-         {
-            journal.forceMoveNextFile();
-         }
-      }
-
-      journal.forceMoveNextFile();
-
-      latchWait.countDown();
-
-      t.join();
-
-      add(idGenerator.generateID());
-
-      // journal.compact();
-
-      stopJournal();
-      createJournal();
-      startJournal();
-      loadAndCheck();
-
-   }
-
-   public void testCompactwithConcurrentAppendAndUpdate() throws Exception
-   {
-   }
-
-   public void testCompactWithPendingTransactionAndDelete() throws Exception
-   {
-   }
-
-   public void testCompactingWithPendingTransaction() throws Exception
-   {
-
-   }
-
-   public void testSimpleCompacting() throws Exception
-   {
-      setup(50, 60 * 1024, true);
-
-      createJournal();
-      startJournal();
-      load();
-
-      int NUMBER_OF_RECORDS = 1000;
-
-      // add and remove some data to force reclaiming
-      {
-         ArrayList<Long> ids = new ArrayList<Long>();
-         for (int i = 0; i < NUMBER_OF_RECORDS; i++)
-         {
-            long id = idGenerator.generateID();
-            ids.add(id);
-            add(id);
-            if (i > 0 && i % 100 == 0)
-            {
-               journal.forceMoveNextFile();
-            }
-         }
-
-         for (Long id : ids)
-         {
-            delete(id);
-         }
-
-         journal.forceMoveNextFile();
-
-         journal.checkAndReclaimFiles();
-      }
-
-      long transactionID = 0;
-
-      for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
-      {
-         add(i);
-         if (i % 10 == 0 && i > 0)
-         {
-            journal.forceMoveNextFile();
-         }
-         update(i);
-      }
-
-      for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
-      {
-
-         addTx(transactionID, i);
-         updateTx(transactionID, i);
-         if (i % 10 == 0)
-         {
-            journal.forceMoveNextFile();
-         }
-         commit(transactionID++);
-         update(i);
-      }
-
-      System.out.println("Number of Files: " + journal.getDataFilesCount());
-
-      for (int i = 0; i < NUMBER_OF_RECORDS; i++)
-      {
-         if (!(i % 10 == 0))
-         {
-            delete(i);
-         }
-      }
-
-      journal.forceMoveNextFile();
-
-      System.out.println("Number of Files: " + journal.getDataFilesCount());
-
-      System.out.println("Before compact ****************************");
-      System.out.println(journal.debug());
-      System.out.println("*****************************************");
-
-      journal.compact();
-
-      add(idGenerator.generateID());
-
-      journal.compact();
-
-      stopJournal();
-      createJournal();
-      startJournal();
-      loadAndCheck();
-
-   }
-
-   protected int getAlignment()
-   {
-      return 1;
-   }
-
-   @Override
-   protected void setUp() throws Exception
-   {
-      super.setUp();
-
-      File file = new File(journalDir);
-
-      deleteDirectory(file);
-
-      file.mkdir();
-   }
-
-   /* (non-Javadoc)
-    * @see org.jboss.messaging.tests.unit.core.journal.impl.JournalImplTestBase#getFileFactory()
-    */
-   @Override
-   protected SequentialFileFactory getFileFactory() throws Exception
-   {
-      return new NIOSequentialFileFactory(journalDir);
-   }
-
-}

Copied: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java (from rev 7498, branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java)
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java	                        (rev 0)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java	2009-06-30 01:12:59 UTC (rev 7499)
@@ -0,0 +1,448 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.journal;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.JournalImpl;
+import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.tests.unit.core.journal.impl.JournalImplTestBase;
+import org.jboss.messaging.utils.IDGenerator;
+import org.jboss.messaging.utils.TimeAndCounterIDGenerator;
+
+/**
+ * 
+ * A JournalImplTestBase
+ * 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class NIOJournalCompactTest extends JournalImplTestBase
+{
+   private static final Logger log = Logger.getLogger(NIOJournalCompactTest.class);
+
+   protected String journalDir = System.getProperty("user.home") + "/journal-test";
+
+   private static final int NUMBER_OF_RECORDS = 1000;
+
+   IDGenerator idGenerator = new TimeAndCounterIDGenerator();
+
+   // General tests
+   // =============
+
+   public void testCrashRenamingFiles() throws Exception
+   {
+   }
+
+   public void testCompactwithPendingXACommit() throws Exception
+   {
+   }
+
+   public void testCompactwithPendingXAPrepareAndCommit() throws Exception
+   {
+   }
+
+   public void testCompactwithPendingCommit() throws Exception
+   {
+      InternalCompactTest(false, false, false, false, true);
+   }
+
+   public void testCompactwithPendingCommitFollowedByDelete() throws Exception
+   {
+   }
+   
+   
+   public void testCompactwithConcurrentUpdateAndDeletes() throws Exception
+   {
+      InternalCompactTest(true, false, true, true, false);
+   }
+
+   public void testCompactwithConcurrentDeletes() throws Exception
+   {
+      InternalCompactTest(true, false, false, true, false);
+   }
+
+   public void testCompactwithConcurrentUpdates() throws Exception
+   {
+      InternalCompactTest(true, false, true, false, false);
+   }
+
+   public void testCompactWithConcurrentAppend() throws Exception
+   {
+      InternalCompactTest(true, true, false, false, false);
+   }
+
+   private void InternalCompactTest(final boolean regularAdd,
+                                    final boolean performAppend,
+                                    final boolean performUpdate,
+                                    final boolean performDelete,
+                                    final boolean pendingTransactions) throws Exception
+   {
+      setup(50, 60 * 1024, true);
+
+      ArrayList<Long> liveIDs = new ArrayList<Long>();
+
+      ArrayList<Long> listPendingTransactions = new ArrayList<Long>();
+
+      final CountDownLatch latchDone = new CountDownLatch(1);
+      final CountDownLatch latchWait = new CountDownLatch(1);
+      journal = new JournalImpl(fileSize, minFiles, fileFactory, filePrefix, fileExtension, maxAIO)
+      {
+         @Override
+         public void onCompactDone()
+         {
+            latchDone.countDown();
+            System.out.println("Waiting on Compact");
+            try
+            {
+               latchWait.await();
+            }
+            catch (InterruptedException e)
+            {
+               e.printStackTrace();
+            }
+            System.out.println("Done");
+         }
+      };
+      startJournal();
+      load();
+
+      long transactionID = 0;
+
+      if (regularAdd)
+      {
+
+         for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
+         {
+            add(i);
+            if (i % 10 == 0 && i > 0)
+            {
+               journal.forceMoveNextFile();
+            }
+            update(i);
+         }
+
+         for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
+         {
+
+            addTx(transactionID, i);
+            updateTx(transactionID, i);
+            if (i % 10 == 0)
+            {
+               journal.forceMoveNextFile();
+            }
+            commit(transactionID++);
+            update(i);
+         }
+      }
+
+      if (pendingTransactions)
+      {
+         for (long i = 0; i < 100; i++)
+         {
+            addTx(transactionID, idGenerator.generateID());
+            updateTx(transactionID, idGenerator.generateID());
+            listPendingTransactions.add(transactionID++);
+         }
+      }
+
+      System.out.println("Number of Files: " + journal.getDataFilesCount());
+
+      if (regularAdd)
+      {
+         for (int i = 0; i < NUMBER_OF_RECORDS; i++)
+         {
+            if (!(i % 10 == 0))
+            {
+               delete(i);
+            }
+            else
+            {
+               liveIDs.add((long)i);
+            }
+         }
+      }
+
+      journal.forceMoveNextFile();
+
+      Thread t = new Thread()
+      {
+         @Override
+         public void run()
+         {
+            try
+            {
+               journal.compact();
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+         }
+      };
+
+      t.start();
+
+      latchDone.await();
+
+      int nextID = NUMBER_OF_RECORDS;
+
+      if (performAppend)
+      {
+         for (int i = 0; i < 50; i++)
+         {
+            add(nextID++);
+            if (i % 10 == 0)
+            {
+               journal.forceMoveNextFile();
+            }
+         }
+
+         for (int i = 0; i < 50; i++)
+         {
+            // A Total new transaction (that was created after the compact started) to add new record while compacting is still working
+            addTx(transactionID, nextID++);
+            commit(transactionID++);
+            if (i % 10 == 0)
+            {
+               journal.forceMoveNextFile();
+            }
+         }
+      }
+
+      if (performUpdate)
+      {
+         int count = 0;
+         for (Long liveID : liveIDs)
+         {
+            if (count++ % 2 == 0)
+            {
+               update(liveID);
+            }
+            else
+            {
+               // A Total new transaction (that was created after the compact started) to update a record that is being compacted
+               updateTx(transactionID, liveID);
+               commit(transactionID++);
+            }
+         }
+      }
+
+      if (performDelete)
+      {
+         int count = 0;
+         for (long liveID : liveIDs)
+         {
+            if (count++ % 2 == 0)
+            {
+               delete(liveID);
+            }
+            else
+            {
+               // A Total new transaction (that was created after the compact started) to delete a record that is being compacted
+               deleteTx(transactionID, liveID);
+               commit(transactionID++);
+            }
+
+         }
+      }
+
+      if (pendingTransactions)
+      {
+         for (long tx : listPendingTransactions)
+         {
+            if (tx % 2 == 0)
+            {
+               commit(tx);
+            }
+            else
+            {
+               rollback(tx);
+            }
+         }
+      }
+
+      /** Some independent adds and updates */
+      for (int i = 0; i < 1000; i++)
+      {
+         long id = idGenerator.generateID();
+         add(id);
+         delete(id);
+
+         if (i % 100 == 0)
+         {
+            journal.forceMoveNextFile();
+         }
+      }
+
+      journal.forceMoveNextFile();
+
+      latchWait.countDown();
+
+      t.join();
+
+      add(idGenerator.generateID());
+
+      // journal.compact();
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+   }
+
+   public void testCompactwithConcurrentAppendAndUpdate() throws Exception
+   {
+   }
+
+   public void testCompactWithPendingTransactionAndDelete() throws Exception
+   {
+   }
+
+   public void testCompactingWithPendingTransaction() throws Exception
+   {
+
+   }
+
+   public void testSimpleCompacting() throws Exception
+   {
+      setup(50, 60 * 1024, true);
+
+      createJournal();
+      startJournal();
+      load();
+
+      int NUMBER_OF_RECORDS = 1000;
+
+      // add and remove some data to force reclaiming
+      {
+         ArrayList<Long> ids = new ArrayList<Long>();
+         for (int i = 0; i < NUMBER_OF_RECORDS; i++)
+         {
+            long id = idGenerator.generateID();
+            ids.add(id);
+            add(id);
+            if (i > 0 && i % 100 == 0)
+            {
+               journal.forceMoveNextFile();
+            }
+         }
+
+         for (Long id : ids)
+         {
+            delete(id);
+         }
+
+         journal.forceMoveNextFile();
+
+         journal.checkAndReclaimFiles();
+      }
+
+      long transactionID = 0;
+
+      for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
+      {
+         add(i);
+         if (i % 10 == 0 && i > 0)
+         {
+            journal.forceMoveNextFile();
+         }
+         update(i);
+      }
+
+      for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
+      {
+
+         addTx(transactionID, i);
+         updateTx(transactionID, i);
+         if (i % 10 == 0)
+         {
+            journal.forceMoveNextFile();
+         }
+         commit(transactionID++);
+         update(i);
+      }
+
+      System.out.println("Number of Files: " + journal.getDataFilesCount());
+
+      for (int i = 0; i < NUMBER_OF_RECORDS; i++)
+      {
+         if (!(i % 10 == 0))
+         {
+            delete(i);
+         }
+      }
+
+      journal.forceMoveNextFile();
+
+      System.out.println("Number of Files: " + journal.getDataFilesCount());
+
+      System.out.println("Before compact ****************************");
+      System.out.println(journal.debug());
+      System.out.println("*****************************************");
+
+      journal.compact();
+
+      add(idGenerator.generateID());
+
+      journal.compact();
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+   }
+
+   protected int getAlignment()
+   {
+      return 1;
+   }
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      File file = new File(journalDir);
+
+      deleteDirectory(file);
+
+      file.mkdir();
+   }
+
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.tests.unit.core.journal.impl.JournalImplTestBase#getFileFactory()
+    */
+   @Override
+   protected SequentialFileFactory getFileFactory() throws Exception
+   {
+      return new NIOSequentialFileFactory(journalDir);
+   }
+
+}

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-06-29 23:47:11 UTC (rev 7498)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-06-30 01:12:59 UTC (rev 7499)
@@ -22,22 +22,11 @@
 
 package org.jboss.messaging.tests.unit.core.journal.impl;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Executor;
 
-import org.jboss.messaging.core.asyncio.BufferCallback;
-import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
 import org.jboss.messaging.core.journal.EncodingSupport;
 import org.jboss.messaging.core.journal.RecordInfo;
-import org.jboss.messaging.core.journal.SequentialFile;
-import org.jboss.messaging.core.journal.SequentialFileFactory;
-import org.jboss.messaging.core.journal.impl.AIOSequentialFile;
-import org.jboss.messaging.core.journal.impl.JournalFile;
-import org.jboss.messaging.core.journal.impl.JournalFileImpl;
 import org.jboss.messaging.core.journal.impl.JournalImpl;
-import org.jboss.messaging.core.journal.impl.JournalReaderCallback;
-import org.jboss.messaging.core.journal.impl.NIOSequentialFile;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.tests.unit.core.journal.impl.fakes.SimpleEncoding;
 
@@ -3099,142 +3088,6 @@
 
    }
 
-   public void testSimpleCompacting() throws Exception
-   {
-      String tmp = "file.jbm.cmp";
-      
-      System.out.println("index = " + tmp.lastIndexOf(".cmp"));
-      
-      System.out.println("new name = " + tmp.substring(0, tmp.lastIndexOf(".cmp")));
-      setup(2, 60 * 1024, true);
-
-      createJournal();
-      startJournal();
-      load();
-
-      int NUMBER_OF_RECORDS = 100;
-
-      long transactionID = 0;
-
-      for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
-      {
-         add(i);
-         if (i % 10 == 0 && i > 0)
-         {
-            journal.forceMoveNextFile();
-         }
-         update(i);
-      }
-
-      for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
-      {
-
-         addTx(transactionID, i);
-         updateTx(transactionID, i);
-         if (i % 10 == 0)
-         {
-            journal.forceMoveNextFile();
-         }
-         commit(transactionID++);
-         update(i);
-      }
-
-      System.out.println("Number of Files: " + journal.getDataFilesCount());
-
-      for (int i = 0; i < NUMBER_OF_RECORDS; i++)
-      {
-         if (!(i % 10 == 0))
-         {
-            delete(i);
-         }
-      }
-
-      journal.forceMoveNextFile();
-      
-      System.out.println("Number of Files: " + journal.getDataFilesCount());
-
-      System.out.println("Before compact ****************************");
-      System.out.println(journal.debug());
-      System.out.println("*****************************************");
-
-      journal.compact();
-
-      stopJournal();
-      createJournal();
-      startJournal();
-      loadAndCheck();
-
-      for (String fileName : fileFactory.listFiles("cmp"))
-      {
-         System.out.println("File = " + fileName);
-
-         SequentialFile readFile = fileFactory.createSequentialFile(fileName, 1);
-
-         ((JournalImpl)journal).readJournalFile(new JournalFileImpl(readFile, 13, 1), new JournalReaderCallback()
-         {
-            public void addRecord(RecordInfo info) throws Exception
-            {
-               System.out.println("AddrecordID = " + info.id);
-            }
-
-            public void addRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
-            {
-               System.out.println("UpdRecordTX = " + transactionID + ", recordID=" + recordInfo.id);
-            }
-
-            public void commitRecord(long transactionID, int numberOfRecords) throws Exception
-            {
-               // TODO Auto-generated method stub
-
-            }
-
-            public void deleteRecord(long recordID) throws Exception
-            {
-               // TODO Auto-generated method stub
-
-            }
-
-            public void deleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
-            {
-               // TODO Auto-generated method stub
-
-            }
-
-            public void markAsDataFile(JournalFile file)
-            {
-               // TODO Auto-generated method stub
-
-            }
-
-            public void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
-            {
-               // TODO Auto-generated method stub
-
-            }
-
-            public void rollbackRecord(long transactionID) throws Exception
-            {
-               // TODO Auto-generated method stub
-
-            }
-
-            public void updateRecord(RecordInfo recordInfo) throws Exception
-            {
-               System.out.println("UpdRecordID : " + recordInfo.id);
-
-            }
-
-            public void updateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
-            {
-               System.out.println("UpdRecordID : " + recordInfo.id);
-
-            }
-
-         });
-      }
-
-   }
-
    protected abstract int getAlignment();
 
 }




More information about the jboss-cvs-commits mailing list