[jboss-cvs] JBoss Messaging SVN: r4746 - in trunk: src/main/org/jboss/messaging/core/asyncio/impl and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jul 29 15:40:18 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-07-29 15:40:18 -0400 (Tue, 29 Jul 2008)
New Revision: 4746

Modified:
   trunk/src/config/jbm-configuration.xml
   trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
   trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Journal: Fixing reclaiming because of concurrent adds and deletes, few other tweaks

Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml	2008-07-29 15:40:12 UTC (rev 4745)
+++ trunk/src/config/jbm-configuration.xml	2008-07-29 19:40:18 UTC (rev 4746)
@@ -95,13 +95,13 @@
       <!-- 10 MB journal file size -->
       <journal-file-size>10485760</journal-file-size>
 
-      <journal-min-files>10</journal-min-files>
+      <journal-min-files>15</journal-min-files>
 
       <!-- Maximum simultaneous asynchronous writes accepted by the native layer.
       (parameter ignored on NIO)
        You can verify the max AIO on the OS level at /proc/sys/fs/aio_max_nr. (aio-nr will give you the current max-aio being used)
       -->
-      <journal-max-aio>5000</journal-max-aio>
+      <journal-max-aio>10000</journal-max-aio>
 
    </configuration>
 

Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java	2008-07-29 15:40:12 UTC (rev 4745)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java	2008-07-29 19:40:18 UTC (rev 4746)
@@ -290,7 +290,6 @@
 	// Native
 	// ------------------------------------------------------------------------------------------
 	
-	@SuppressWarnings("unchecked")
 	private static native long init(String fileName, int maxIO, Logger logger);
 	
 	private native void write(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage);

Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2008-07-29 15:40:12 UTC (rev 4745)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2008-07-29 19:40:18 UTC (rev 4746)
@@ -39,6 +39,13 @@
     */
    void open() throws Exception;
    
+   /**
+    * For certain operations (like loading) we don't need open the file with full maxIO
+    * @param maxIO
+    * @throws Exception
+    */
+   void open(int maxIO) throws Exception;
+   
    int getAlignment() throws Exception;
    
    int calculateBlockStart(int position) throws Exception;
@@ -59,6 +66,8 @@
    
    void position(int pos) throws Exception;
    
+   int position() throws Exception;
+   
    void close() throws Exception;
    
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-07-29 15:40:12 UTC (rev 4745)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-07-29 19:40:18 UTC (rev 4746)
@@ -166,20 +166,31 @@
       return fileName;
    }
    
-   public synchronized void open() throws Exception
+   public void open() throws Exception
    {
+     open(maxIO);
+   }
+   
+   public synchronized void open(int currentMaxIO) throws Exception
+   {
       opened = true;
       executor = Executors.newSingleThreadExecutor();
       aioFile = newFile();
-      aioFile.open(journalDir + "/" + fileName, maxIO);
+      aioFile.open(journalDir + "/" + fileName, currentMaxIO);
       position.set(0);
       
    }
+   
    public void position(final int pos) throws Exception
    {
       position.set(pos);		
    }
    
+   public int position() throws Exception
+   {
+      return (int)position.get();
+   }
+   
    public int read(final ByteBuffer bytes, final IOCallback callback) throws Exception
    {
       int bytesToRead = bytes.limit();

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java	2008-07-29 15:40:12 UTC (rev 4745)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java	2008-07-29 19:40:18 UTC (rev 4746)
@@ -25,6 +25,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.logging.Logger;
@@ -47,11 +48,11 @@
    
    private int offset;
    
-   private int posCount;
+   private final AtomicInteger posCount = new AtomicInteger(0);
    
    private boolean canReclaim;
    
-   private Map<JournalFile, Integer> negCounts = new ConcurrentHashMap<JournalFile, Integer>();
+   private Map<JournalFile, AtomicInteger> negCounts = new ConcurrentHashMap<JournalFile, AtomicInteger>();
    
    public JournalFileImpl(final SequentialFile file, final int orderingID)
    {
@@ -62,7 +63,7 @@
    
    public int getPosCount()
    {
-      return posCount;
+      return posCount.intValue();
    }
    
    public boolean isCanReclaim()
@@ -77,16 +78,12 @@
    
    public void incNegCount(final JournalFile file)
    {
-      Integer count = negCounts.get(file);
-      
-      int c = count == null ? 1 : count.intValue() + 1;
-      
-      negCounts.put(file, c);
+      getOrCreateNegCount(file).incrementAndGet();
    }
    
    public int getNegCount(final JournalFile file)
    {		
-      Integer count =  negCounts.get(file);
+      AtomicInteger count =  negCounts.get(file);
       
       if (count == null)
       {
@@ -100,12 +97,12 @@
    
    public void incPosCount()
    {
-      posCount++;
+      posCount.incrementAndGet();
    }
    
    public void decPosCount()
    {
-      posCount--;
+      posCount.decrementAndGet();
    }
    
    public void extendOffset(final int delta)
@@ -151,13 +148,27 @@
    {
       StringBuilder builder = new StringBuilder();
       
-      for (Entry<JournalFile, Integer> entry: negCounts.entrySet())
+      for (Entry<JournalFile, AtomicInteger> entry: negCounts.entrySet())
       {
          builder.append(" file = " + entry.getKey() + " negcount value = " + entry.getValue() + "\n");
       }
       
       return builder.toString();
    }
+
    
+   private synchronized AtomicInteger getOrCreateNegCount(final JournalFile file)
+   {
+      AtomicInteger count = negCounts.get(file);
+      
+      if (count == null)
+      {
+         count = new AtomicInteger();
+         negCounts.put(file, count);
+      }
+      
+      return count;
+   }
    
+   
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-07-29 15:40:12 UTC (rev 4745)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-07-29 19:40:18 UTC (rev 4746)
@@ -199,6 +199,13 @@
    
    private static final boolean trace = log.isTraceEnabled();
    
+   // This method exists just to make debug easier.
+   // I could replace log.trace by log.info temporarily while I was debugging Journal 
+   private static final void trace(String message)
+   {
+      log.trace(message);
+   }
+   
    // Constructors --------------------------------------------------
    
    public JournalImpl(final int fileSize, final int minFiles,
@@ -276,9 +283,18 @@
       bb.putInt(size);        
       bb.rewind();
       
-      JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
-      
-      posFilesMap.put(id, new PosFiles(usedFile));
+      try
+      {                 
+         lock.acquire();
+
+         JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
+         
+         posFilesMap.put(id, new PosFiles(usedFile));
+      }
+      finally
+      {
+         lock.release();
+      }
    }
    
    public void appendAddRecord(final long id, final byte recordType, final byte[] record) throws Exception
@@ -302,9 +318,20 @@
       bb.putInt(size);			
       bb.rewind();
       
-      JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
       
-      posFilesMap.put(id, new PosFiles(usedFile));
+      try
+      {                 
+         lock.acquire();
+
+         JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
+         
+         posFilesMap.put(id, new PosFiles(usedFile));
+      }
+      finally
+      {
+         lock.release();
+      }
+      
    }
    
    public void appendUpdateRecord(final long id, final byte recordType, final byte[] record) throws Exception
@@ -334,9 +361,18 @@
       bb.putInt(size);     
       bb.rewind();
       
-      JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
-      
-      posFiles.addUpdateFile(usedFile);
+      try
+      {                 
+         lock.acquire();
+
+         JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
+         
+         posFiles.addUpdateFile(usedFile);
+      }
+      finally
+      {
+         lock.release();
+      }
    }
    
    public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record) throws Exception
@@ -366,9 +402,18 @@
       bb.putInt(size);     
       bb.rewind();
       
-      JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
-      
-      posFiles.addUpdateFile(usedFile);
+      try
+      {                 
+         lock.acquire();
+
+         JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
+         
+         posFiles.addUpdateFile(usedFile);
+      }
+      finally
+      {
+         lock.release();
+      }
    }
    
    public void appendDeleteRecord(long id) throws Exception
@@ -395,8 +440,18 @@
       bb.putInt(size);     
       bb.rewind();
       
-      JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
-      posFiles.addDelete(usedFile);
+      try
+      {                 
+         lock.acquire();
+         
+         JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
+         
+         posFiles.addDelete(usedFile);
+      }
+      finally
+      {
+         lock.release();
+      }
    }     
    
    public long getTransactionID()
@@ -428,11 +483,21 @@
       bb.putInt(size);     
       bb.rewind();
       
-      JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
-      
-      JournalTransaction tx = getTransactionInfo(txID);
-      
-      tx.addPositive(usedFile, id);
+      try
+      {                 
+         lock.acquire();
+         
+         JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
+         
+         JournalTransaction tx = getTransactionInfo(txID);
+         
+         tx.addPositive(usedFile, id);
+         
+      }
+      finally
+      {
+         lock.release();
+      }
    }
    
    public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
@@ -456,11 +521,21 @@
       bb.putInt(size);
       bb.rewind();
       
-      JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
-      
-      JournalTransaction tx = getTransactionInfo(txID);
-      
-      tx.addPositive(usedFile, id);
+      try
+      {                 
+         lock.acquire();
+
+         JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
+         
+         JournalTransaction tx = getTransactionInfo(txID);
+         
+         tx.addPositive(usedFile, id);
+
+      }
+      finally
+      {
+         lock.release();
+      }
    }
    
    public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, final byte[] record) throws Exception
@@ -484,11 +559,21 @@
       bb.putInt(size);     
       bb.rewind();
       
-      JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
-      
-      JournalTransaction tx = getTransactionInfo(txID);
-      
-      tx.addPositive(usedFile, id);
+      try
+      {                 
+         lock.acquire();
+
+         JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
+         
+         JournalTransaction tx = getTransactionInfo(txID);
+         
+         tx.addPositive(usedFile, id);
+
+      }
+      finally
+      {
+         lock.release();
+      }
    }
    
    public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, EncodingSupport record) throws Exception
@@ -513,11 +598,21 @@
       bb.putInt(size);     
       bb.rewind();
       
-      JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
-      
-      JournalTransaction tx = getTransactionInfo(txID);
-      
-      tx.addPositive(usedFile, id);
+      try
+      {                 
+         lock.acquire();
+
+         JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
+         
+         JournalTransaction tx = getTransactionInfo(txID);
+         
+         tx.addPositive(usedFile, id);
+
+      }
+      finally
+      {
+         lock.release();
+      }
    }
    
    public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
@@ -538,11 +633,21 @@
       bb.putInt(size);     
       bb.rewind();
       
-      JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
-      
-      JournalTransaction tx = getTransactionInfo(txID);
-      
-      tx.addNegative(usedFile, id);      
+      try
+      {                 
+         lock.acquire();
+
+         JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
+         
+         JournalTransaction tx = getTransactionInfo(txID);
+         
+         tx.addNegative(usedFile, id);      
+
+      }
+      finally
+      {
+         lock.release();
+      }
    }  
    
    public void appendPrepareRecord(final long txID) throws Exception
@@ -559,9 +664,22 @@
          throw new IllegalStateException("Cannot find tx with id " + txID);
       }
       
-      JournalFile usedFile = writeTransaction(PREPARE_RECORD, txID, tx);
+      ByteBuffer bb = writeTransaction(PREPARE_RECORD, txID, tx);
       
-      tx.prepare(usedFile);
+      
+      try
+      {                 
+         lock.acquire();
+
+         JournalFile usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
+         
+         tx.prepare(usedFile);
+
+      }
+      finally
+      {
+         lock.release();
+      }
    }
 
    public void appendCommitRecord(final long txID) throws Exception
@@ -571,20 +689,35 @@
          throw new IllegalStateException("Journal must be loaded first");
       }		
       
-      JournalTransaction tx = transactionInfos.get(txID);
+      JournalTransaction tx = transactionInfos.remove(txID);
       
       if (tx == null)
       {
          throw new IllegalStateException("Cannot find tx with id " + txID);
       }
       
-      JournalFile usedFile = writeTransaction(COMMIT_RECORD, txID, tx);
+      ByteBuffer bb = writeTransaction(COMMIT_RECORD, txID, tx);
       
-      transactionInfos.remove(txID);
-      transactionCallbacks.remove(txID);
       
-      tx.commit(usedFile);
+      try
+      {                 
+         lock.acquire();
+
+         JournalFile usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
+         
+         transactionCallbacks.remove(txID);
+         
+         tx.commit(usedFile);
+
+      }
+      finally
+      {
+         lock.release();
+      }
       
+      
+      
+      
    }
    
    public void appendRollbackRecord(final long txID) throws Exception
@@ -611,11 +744,21 @@
       bb.putInt(size);        
       bb.rewind();
       
-      JournalFile usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));      
-      
-      transactionCallbacks.remove(txID);
-      
-      tx.rollback(usedFile);
+      try
+      {                 
+         lock.acquire();
+
+         JournalFile usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));      
+         
+         transactionCallbacks.remove(txID);
+         
+         tx.rollback(usedFile);
+
+      }
+      finally
+      {
+         lock.release();
+      }
    }
    
    public synchronized long load(final List<RecordInfo> committedRecords,
@@ -676,7 +819,7 @@
       
       List<JournalFile> orderedFiles = orderFiles();
       
-      int lastDataPos = -1;
+      int lastDataPos = SIZE_HEADER;
       
       long maxTransactionID = -1;
       
@@ -684,7 +827,7 @@
       
       for (JournalFile file: orderedFiles)
       {  
-         file.getFile().open();
+         file.getFile().open(1);
          
          ByteBuffer bb = fileFactory.newBuffer(fileSize);
          
@@ -1224,7 +1367,7 @@
          {
             //File can be reclaimed or deleted
             
-            if (trace) log.trace("Reclaiming file " + file);
+            if (trace) trace("Reclaiming file " + file);
               
             dataFiles.remove(file);
             
@@ -1239,7 +1382,7 @@
             }
             else
             {
-               file.getFile().open();
+               file.getFile().open(1);
                
                file.getFile().delete();
             }
@@ -1390,7 +1533,7 @@
       
       SequentialFile sf = file.getFile();
       
-      sf.open();
+      sf.open(1);
       
       ByteBuffer bb = fileFactory.newBuffer(SIZE_INT); 
       
@@ -1460,7 +1603,7 @@
    }
 
    /** a method that shares the logic of writing a complete transaction between COMMIT and PREPARE */
-   private JournalFile writeTransaction(final byte recordType, final long txID, final JournalTransaction tx) throws Exception
+   private ByteBuffer writeTransaction(final byte recordType, final long txID, final JournalTransaction tx) throws Exception
    {
       int size = SIZE_COMPLETE_TRANSACTION_RECORD + tx.getElementsSummary().size() * SIZE_INT * 2;
       
@@ -1481,8 +1624,7 @@
       bb.putInt(size);           
       bb.rewind();
       
-      JournalFile usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
-      return usedFile;
+      return bb;
    }
    
    private boolean isTransaction(final byte recordType)
@@ -1552,7 +1694,7 @@
       {
          SequentialFile file = fileFactory.createSequentialFile(fileName, maxAIO);
          
-         file.open();
+         file.open(1);
          
          ByteBuffer bb = fileFactory.newBuffer(SIZE_INT);
          
@@ -1587,41 +1729,35 @@
       return orderedFiles;
    }
    
+   /** 
+    * You need to call lock.acquire before calling this method
+    * */
    private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final TransactionCallback callback) throws Exception
    {
-      lock.acquire();
       
       int size = bb.capacity();
-      
-      try
-      {                 
-         checkFile(size);
-         bb.position(SIZE_BYTE);
-         if (currentFile == null)
+      checkFile(size);
+      bb.position(SIZE_BYTE);
+      if (currentFile == null)
+      {
+         throw new Exception ("Current file = null");
+      }
+      bb.putInt(currentFile.getOrderingID());
+      bb.rewind();
+      if (callback != null)
+      {
+         currentFile.getFile().write(bb, callback);
+         if (sync)
          {
-            throw new Exception ("Current file = null");
+            callback.waitCompletion();
          }
-         bb.putInt(currentFile.getOrderingID());
-         bb.rewind();
-         if (callback != null)
-         {
-            currentFile.getFile().write(bb, callback);
-            if (sync)
-            {
-               callback.waitCompletion();
-            }
-         }
-         else
-         {
-            currentFile.getFile().write(bb, sync);       
-         }
-         currentFile.extendOffset(size);
-         return currentFile;
       }
-      finally
+      else
       {
-         lock.release();
+         currentFile.getFile().write(bb, sync);       
       }
+      currentFile.extendOffset(size);
+      return currentFile;
    }
    
    private JournalFile createFile(boolean keepOpened) throws Exception
@@ -1724,7 +1860,7 @@
             {
                try
                {
-                     checkAndReclaimFiles();
+                   checkAndReclaimFiles();
                }
                catch (Exception e)
                {

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-07-29 15:40:12 UTC (rev 4745)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-07-29 19:40:18 UTC (rev 4746)
@@ -74,9 +74,9 @@
    {
       return fileName;
    }
-   
-   public void open() throws Exception
-   {     
+
+   public synchronized void open() throws Exception
+   {
       file = new File(journalDir + "/" + fileName);
       
       rfile = new RandomAccessFile(file, "rw");
@@ -84,6 +84,11 @@
       channel = rfile.getChannel();    
    }
    
+   public void open(int currentMaxIO) throws Exception
+   {
+      open();
+   }
+   
    public void fill(final int position, final int size, final byte fillCharacter) throws Exception
    {
       ByteBuffer bb = ByteBuffer.allocateDirect(size);
@@ -190,4 +195,9 @@
       channel.position(pos);
    }
    
+   public int position() throws Exception
+   {
+      return (int) channel.position();
+   }
+   
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java	2008-07-29 15:40:12 UTC (rev 4745)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java	2008-07-29 19:40:18 UTC (rev 4746)
@@ -51,6 +51,13 @@
 {
    private static final Logger log = Logger.getLogger(Reclaimer.class);
    
+   private static boolean trace = log.isTraceEnabled();
+   
+   private static void trace(String message)
+   {
+      log.trace(message);
+   }
+   
    public void scan(final JournalFile[] files)
    {
       for (int i = 0; i < files.length; i++)
@@ -62,9 +69,22 @@
          int posCount = currentFile.getPosCount();
          
          int totNeg = 0;
-         
+
+         if (trace)
+         {
+            trace("posCount on " + currentFile + " = " + posCount);
+         }
+
          for (int j = i; j < files.length; j++)
          {
+            if (trace)
+            {
+               if (files[j].getNegCount(currentFile) != 0)
+               {
+                  trace("Negative from " + files[j] + " = " + files[j].getNegCount(currentFile));
+               }
+            }
+            
             totNeg += files[j].getNegCount(currentFile);
          }
          
@@ -88,6 +108,11 @@
                   }
                   else
                   {
+                     if (trace)
+                     {
+                        trace(currentFile + " Can't be reclaimed because " + file + " has negative values");
+                     }
+                     
                      currentFile.setCanReclaim(false);
                      
                      break;

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java	2008-07-29 15:40:12 UTC (rev 4745)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java	2008-07-29 19:40:18 UTC (rev 4746)
@@ -25,6 +25,11 @@
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.jboss.messaging.core.journal.EncodingSupport;
 import org.jboss.messaging.core.journal.PreparedTransactionInfo;
@@ -33,7 +38,6 @@
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.journal.impl.JournalImpl;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.MessagingBuffer;
 import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
 import org.jboss.messaging.tests.unit.core.journal.impl.fakes.SimpleEncoding;
 import org.jboss.messaging.tests.util.UnitTestCase;
@@ -862,7 +866,7 @@
       
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendAddRecordTransactional(1, 1, (byte) 1, new SimpleEncoding(50,(byte) 1));
+         journalImpl.appendAddRecordTransactional(1, i, (byte) 1, new SimpleEncoding(50,(byte) 1));
          journalImpl.forceMoveNextFile();
       }
       
@@ -997,6 +1001,96 @@
    }
    
    
+   public void testReclaimingAfterConcurrentAddsAndDeletes() throws Exception
+   {
+      final int JOURNAL_SIZE = 10 * 1024;
+      
+      setupJournal(JOURNAL_SIZE, 1);
+      
+      assertEquals(0, records.size());
+      assertEquals(0, transactions.size());
+      
+      final CountDownLatch latchReady = new CountDownLatch(2);
+      final CountDownLatch latchStart = new CountDownLatch(1);
+      final AtomicInteger finishedOK = new AtomicInteger(0);
+      final BlockingQueue<Integer> queueDelete = new LinkedBlockingQueue<Integer>();
+      
+      final int NUMBER_OF_ELEMENTS = 500;
+      
+      
+      Thread t1 = new Thread()
+      {
+         public void run()
+         {
+            try
+            {
+               latchReady.countDown();
+               latchStart.await();
+               for (int i = 0; i < NUMBER_OF_ELEMENTS; i++)
+               {
+                  journalImpl.appendAddRecordTransactional((long)i, i, (byte) 1, new SimpleEncoding(50,(byte) 1));
+                  journalImpl.appendCommitRecord((long)i);
+                  queueDelete.offer(i);
+               }
+               finishedOK.incrementAndGet();
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+         }
+      };
+      
+      Thread t2 = new Thread()
+      {
+         public void run()
+         {
+            try
+            {
+               latchReady.countDown();
+               latchStart.await();
+               for (int i = 0; i < NUMBER_OF_ELEMENTS; i++)
+               {
+                  Integer toDelete = queueDelete.poll(10, TimeUnit.SECONDS);
+                  if (toDelete == null)
+                  {
+                     break;
+                  }
+                  journalImpl.appendDeleteRecord(toDelete);
+               }
+               finishedOK.incrementAndGet();
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+         }
+      };
+
+      t1.start();
+      t2.start();
+      
+      latchReady.await();
+      latchStart.countDown();
+      
+      t1.join();
+      t2.join();
+      
+      assertEquals(2, finishedOK.intValue());
+
+      journalImpl.forceMoveNextFile();
+      
+      journalImpl.checkAndReclaimFiles();
+      
+      assertEquals(0, journalImpl.getDataFilesCount());
+
+      assertEquals(2, factory.listFiles("tt").size());
+
+      
+   }
+   
+   
+   
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2008-07-29 15:40:12 UTC (rev 4745)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2008-07-29 19:40:18 UTC (rev 4746)
@@ -302,9 +302,14 @@
       {
          return fileName;
       }
-      
+
       public void open() throws Exception
       {
+        open(0);
+      }
+      
+      public synchronized void open(int currentMaxIO) throws Exception
+      {
          open = true;
       }
 
@@ -363,6 +368,11 @@
          
          data.position(pos);
       }
+      
+      public int position() throws Exception
+      {
+         return data.position();
+      }
 
       public int write(final ByteBuffer bytes, final IOCallback callback) throws Exception
       {




More information about the jboss-cvs-commits mailing list