[jboss-cvs] JBoss Messaging SVN: r7483 - in branches/clebert_temp_expirement: src/main/org/jboss/messaging/utils and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jun 26 18:56:53 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-06-26 18:56:53 -0400 (Fri, 26 Jun 2009)
New Revision: 7483

Added:
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/BlockingDeque.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/Deque.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/LinkedBlockingDeque.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalCompactTest.java
Modified:
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Dealing with concurrent insert while compacting

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-26 17:03:46 UTC (rev 7482)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-26 22:56:53 UTC (rev 7483)
@@ -65,6 +65,7 @@
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.utils.Pair;
 import org.jboss.messaging.utils.VariableLatch;
+import org.jboss.messaging.utils.concurrent.LinkedBlockingDeque;
 
 /**
  * 
@@ -94,19 +95,23 @@
 
    private static final Logger log = Logger.getLogger(JournalImpl.class);
 
-   private static final boolean trace = log.isTraceEnabled();
+   // private static final boolean trace = log.isTraceEnabled();
 
-   // private static final boolean trace = true;
+   private static final boolean LOAD_TRACE = false;
 
+   private static final boolean trace = true;
+
    // This method exists just to make debug easier.
    // I could replace log.trace by log.info temporarily while I was debugging
    // Journal
    private static final void trace(final String message)
    {
-      // System.out.println(message);
-      log.trace(message);
+      System.out.println(message);
+      // log.trace(message);
    }
 
+   private static final String FILE_COMPACT_CONTROL = "journal-rename-control.ctr";
+
    // The sizes of primitive types
 
    private static final int SIZE_LONG = 8;
@@ -182,7 +187,7 @@
 
    public final String fileExtension;
 
-   private final Queue<JournalFile> dataFiles = new ConcurrentLinkedQueue<JournalFile>();
+   private final LinkedBlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
 
    private final Queue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
 
@@ -793,6 +798,8 @@
       return maxID;
    }
 
+   
+   // Note: This method can't be called from the executor, as it will invoke other methods depending on it.
    public void compact() throws Exception
    {
       if (compactor != null)
@@ -811,25 +818,28 @@
       try
       {
 
-         // First, we replace the records by a new one.
          // We need to guarantee that the journal is frozen for this short time
          // We don't freeze the journal as we compact, only for the short time where we replace records
          compactingLock.writeLock().lock();
-         currentFile.clearCounts();
          try
          {
+            // We need to move to the next file, as we need a clear start for negatives and positives counts
+            moveNextFile();
+
             autoReclaim = false;
 
-            recordsSnapshot = records;
-            pendingTransactions = this.pendingTransactions;
+            // Take the snapshots and replace the structures
 
+            recordsSnapshot = JournalImpl.this.records;
+            pendingTransactions = JournalImpl.this.pendingTransactions;
+
+            JournalImpl.this.records = new ConcurrentHashMap<Long, JournalRecord>();
+
             records = new ConcurrentHashMap<Long, JournalRecord>();
 
-            for (JournalFile file : dataFiles)
-            {
-               file.clearCounts();
-               dataFilesToProcess.add(file);
-            }
+            dataFilesToProcess.addAll(dataFiles);
+            
+            dataFiles.clear();
 
             this.compactor = new Compactor(recordsSnapshot, pendingTransactions, dataFilesToProcess.get(0).getFileID());
 
@@ -839,60 +849,106 @@
             compactingLock.writeLock().unlock();
          }
 
-      }
-      finally
-      {
-         autoReclaim = previousReclaimValue;
-      }
+         // Read the files, and use the Compactor class to create the new outputFiles, and the new collections as well
+         JournalFile previousFile = null;
+         for (final JournalFile file : dataFilesToProcess)
+         {
+            if (previousFile != null)
+            {
+               if (file.getFileID() < previousFile.getFileID())
+               {
+                  // Sanity check, this should never happen
+                  throw new IllegalStateException("DataFiles out of order!");
+               }
+            }
+            previousFile = file;
 
-      for (final JournalFile file : dataFilesToProcess)
-      {
-         readJournalFile(file, compactor);
-      }
+            log.info("Compacting file " + file.getFile().getFileName() + ", internalID = " + file.getFileID());
+            readJournalFile(file, compactor);
+         }
 
-      compactor.flush();
+         compactor.flush();
 
-      createRenameFile(dataFilesToProcess, dataFilesToProcess);
+         // pointcut for tests
+         // We need to test concurrent updates on the journal, as the compacting is being performed.
+         // Usually tests will use this to hold the compacting while other structures are being updated.
+         onCompactDone();
 
-      compactingLock.writeLock().lock();
-      try
-      {
-         // Restore relationshipMap
-         // Deal with updates and deletes that happened during the compacting
+         SequentialFile controlFile = createControlFile(dataFilesToProcess, compactor.newDataFiles);
 
+         compactingLock.writeLock().lock();
+         try
+         {
+            for ( Map.Entry<Long, JournalRecord> newRecordEntry: compactor.newRecords.entrySet())
+            {
+               records.put(newRecordEntry.getKey(), newRecordEntry.getValue());
+            }
+
+            for (JournalFile data: compactor.newDataFiles)
+            {
+               dataFiles.addFirst(data);
+            }
+            //dataFiles.add
+            
+            // Restore relationshipMap
+            // Deal with transactions commits that happend during the compacting
+            // Deal with updates and deletes that happened during the compacting
+
+         }
+         finally
+         {
+            compactingLock.writeLock().unlock();
+         }
+
+         renameFiles(dataFilesToProcess, compactor.newDataFiles);
+         deleteControlFile(controlFile);
+
       }
       finally
       {
-         compactingLock.writeLock().unlock();
+         autoReclaim = previousReclaimValue;
       }
 
-      renameFiles(dataFilesToProcess, compactor.newDataFiles);
    }
 
-   protected void renameFiles(List<JournalFile> files, List<JournalFile> newFiles) throws Exception
+   protected void deleteControlFile(SequentialFile controlFile) throws Exception
    {
-      for (JournalFile file : files)
+      controlFile.delete();
+   }
+
+   /** being protected as testcases can override this method */
+   protected void renameFiles(List<JournalFile> oldFiles, List<JournalFile> newFiles) throws Exception
+   {
+      for (JournalFile file : oldFiles)
       {
-         reinitializeFile(file);
+         System.out.println("Reinitializing file " + file);
+         dataFiles.remove(file);
+         freeFiles.add(reinitializeFile(file));
       }
-      
+
       for (JournalFile file : newFiles)
       {
          String newName = file.getFile().getFileName();
-         System.out.println("name = " + newName);
          newName = newName.substring(0, newName.lastIndexOf(".cmp"));
-         
+
+         System.out.println("Renaming file " + newName);
+
          file.getFile().renameTo(newName);
       }
-      
+
    }
 
+   /** This is an interception point for testcases, when the compacted files are written, before replacing the data structures */
+   protected void onCompactDone()
+   {
+   }
+
    /**
     * @throws Exception
     */
-   protected SequentialFile createRenameFile(List<JournalFile> files, List<JournalFile> newFiles) throws Exception
+   protected SequentialFile createControlFile(List<JournalFile> files, List<JournalFile> newFiles) throws Exception
    {
-      SequentialFile tmpRenameFile = fileFactory.createSequentialFile("journal-rename" + ".ren", 1);
+      SequentialFile tmpRenameFile = fileFactory.createSequentialFile(FILE_COMPACT_CONTROL, 1);
 
       tmpRenameFile.open();
 
@@ -1003,12 +1059,11 @@
          sequentialFile = currentFile.getFile();
          sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
          sequentialFile.open(1);
-         fileID = currentFile.getFileID();
-         currentFile = new JournalFileImpl(sequentialFile, fileID, nextOrderingID);
+         fileID = nextOrderingID++;
+         currentFile = new JournalFileImpl(sequentialFile, fileID, fileID);
 
          channelWrapper.writeInt(fileID);
-         channelWrapper.writeInt(currentFile.getOrderingID());
-
+         channelWrapper.writeInt(fileID);
       }
 
       public void addRecord(RecordInfo info) throws Exception
@@ -1144,7 +1199,6 @@
       {
          if (recordsSnapshot.get(info.id) != null)
          {
-            System.out.println("UpdateRecord on compacting");
             int size = SIZE_UPDATE_RECORD + info.data.length;
 
             checkSize(size);
@@ -1288,7 +1342,7 @@
 
             public void addRecord(RecordInfo info) throws Exception
             {
-               if (trace)
+               if (trace && LOAD_TRACE)
                {
                   trace("AddRecord: " + info);
                }
@@ -1301,7 +1355,7 @@
 
             public void updateRecord(RecordInfo info) throws Exception
             {
-               if (trace)
+               if (trace && LOAD_TRACE)
                {
                   trace("UpdateRecord: " + info);
                }
@@ -1323,7 +1377,7 @@
 
             public void deleteRecord(long recordID) throws Exception
             {
-               if (trace)
+               if (trace && LOAD_TRACE)
                {
                   trace("DeleteRecord: " + recordID);
                }
@@ -1347,7 +1401,7 @@
             public void addRecordTX(long transactionID, RecordInfo info) throws Exception
             {
 
-               if (trace)
+               if (trace && LOAD_TRACE)
                {
                   trace((info.isUpdate ? "updateRecordTX: " : "addRecordTX: ") + info + ", txid = " + transactionID);
                }
@@ -1379,7 +1433,7 @@
 
             public void deleteRecordTX(long transactionID, RecordInfo info) throws Exception
             {
-               if (trace)
+               if (trace && LOAD_TRACE)
                {
                   trace("DeleteRecordTX: " + transactionID + " info = " + info);
                }
@@ -1412,7 +1466,7 @@
 
             public void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
             {
-               if (trace)
+               if (trace && LOAD_TRACE)
                {
                   trace("prepareRecordTX: txid = " + transactionID);
                }
@@ -1457,7 +1511,7 @@
 
             public void commitRecord(long transactionID, int numberOfRecords) throws Exception
             {
-               if (trace)
+               if (trace && LOAD_TRACE)
                {
                   trace("commitRecord: txid = " + transactionID);
                }
@@ -1518,7 +1572,7 @@
 
             public void rollbackRecord(long transactionID) throws Exception
             {
-               if (trace)
+               if (trace && LOAD_TRACE)
                {
                   trace("rollbackRecord: txid = " + transactionID);
                }
@@ -1548,7 +1602,7 @@
 
             public void markAsDataFile(JournalFile file)
             {
-               if (trace)
+               if (trace && LOAD_TRACE)
                {
                   trace("Marking " + file + " as data file");
                }
@@ -1748,38 +1802,50 @@
 
    public void checkAndReclaimFiles() throws Exception
    {
-      checkReclaimStatus();
+      // We can't start compacting while compacting is working
+      compactingLock.readLock().lock();
+      try
+      {
+         checkReclaimStatus();
 
-      for (JournalFile file : dataFiles)
-      {
-         if (file.isCanReclaim())
+         for (JournalFile file : dataFiles)
          {
-            // File can be reclaimed or deleted
-
-            if (trace)
+            if (file.isCanReclaim())
             {
-               trace("Reclaiming file " + file);
-            }
+               // File can be reclaimed or deleted
 
-            dataFiles.remove(file);
+               if (trace)
+               {
+                  trace("Reclaiming file " + file);
+               }
 
-            // FIXME - size() involves a scan!!!
-            if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
-            {
-               // Re-initialise it
+               if (!dataFiles.remove(file))
+               {
+                  log.warn("Could not remove file " + file);
+               }
 
-               JournalFile jf = reinitializeFile(file);
+               // FIXME - size() involves a scan!!!
+               if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
+               {
+                  // Re-initialise it
 
-               freeFiles.add(jf);
-            }
-            else
-            {
-               file.getFile().open(1);
+                  JournalFile jf = reinitializeFile(file);
 
-               file.getFile().delete();
+                  freeFiles.add(jf);
+               }
+               else
+               {
+                  file.getFile().open(1);
+
+                  file.getFile().delete();
+               }
             }
          }
       }
+      finally
+      {
+         compactingLock.readLock().unlock();
+      }
    }
 
    public int getDataFilesCount()
@@ -2551,26 +2617,6 @@
       // Now order them by ordering id - we can't use the file name for ordering
       // since we can re-use dataFiles
 
-      class JournalFileComparator implements Comparator<JournalFile>
-      {
-         public int compare(final JournalFile f1, final JournalFile f2)
-         {
-            int oid1 = f1.getOrderingID();
-            int oid2 = f2.getOrderingID();
-
-            if (oid1 == oid2)
-            {
-               int id1 = f1.getFileID();
-               int id2 = f2.getFileID();
-               return oid1 < id2 ? -1 : id1 == id2 ? 0 : 1;
-            }
-            else
-            {
-               return oid1 < oid2 ? -1 : oid1 == oid2 ? 0 : 1;
-            }
-         }
-      }
-
       Collections.sort(orderedFiles, new JournalFileComparator());
 
       return orderedFiles;
@@ -2751,6 +2797,11 @@
 
       currentFile = enqueueOpenFile();
 
+      if (trace)
+      {
+         trace("moveNextFile: " + currentFile.getFile().getFileName());
+      }
+
       fileFactory.activate(currentFile.getFile());
    }
 
@@ -2997,9 +3048,9 @@
 
          if (updateFiles != null)
          {
-            for (JournalFile jf : updateFiles)
+            for (JournalFile updFile : updateFiles)
             {
-               file.incNegCount(jf);
+               file.incNegCount(updFile);
             }
          }
       }
@@ -3367,6 +3418,26 @@
 
    }
 
+   private static class JournalFileComparator implements Comparator<JournalFile>
+   {
+      public int compare(final JournalFile f1, final JournalFile f2)
+      {
+         int oid1 = f1.getOrderingID();
+         int oid2 = f2.getOrderingID();
+
+         if (oid1 == oid2)
+         {
+            int id1 = f1.getFileID();
+            int id2 = f2.getFileID();
+            return oid1 < id2 ? -1 : id1 == id2 ? 0 : 1;
+         }
+         else
+         {
+            return oid1 < oid2 ? -1 : oid1 == oid2 ? 0 : 1;
+         }
+      }
+   }
+
    private class PerfBlast extends Thread
    {
       private final int pages;

Added: branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/BlockingDeque.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/BlockingDeque.java	                        (rev 0)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/BlockingDeque.java	2009-06-26 22:56:53 UTC (rev 7483)
@@ -0,0 +1,238 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package org.jboss.messaging.utils.concurrent;     // XXX This belongs in java.util!!! XXX
+import java.util.concurrent.*;  // XXX This import goes away        XXX
+import java.util.*;  
+
+/**
+ * A {@link Deque} that additionally supports operations that wait for
+ * the deque to become non-empty when retrieving an element, and wait
+ * for space to become available in the deque when storing an
+ * element. These methods are summarized in the following table:<p>
+ *
+ * <table BORDER CELLPADDING=3 CELLSPACING=1>
+ *  <tr>
+ *    <td></td>
+ *    <td ALIGN=CENTER COLSPAN = 2> <b>First Element (Head)</b></td>
+ *    <td ALIGN=CENTER COLSPAN = 2> <b>Last Element (Tail)</b></td>
+ *  </tr>
+ *  <tr>
+ *    <td></td>
+ *    <td ALIGN=CENTER><em>Block</em></td>
+ *    <td ALIGN=CENTER><em>Time out</em></td>
+ *    <td ALIGN=CENTER><em>Block</em></td>
+ *    <td ALIGN=CENTER><em>Time out</em></td>
+ *  </tr>
+ *  <tr>
+ *    <td><b>Insert</b></td>
+ *    <td>{@link #putFirst putFirst(e)}</td>
+ *    <td>{@link #offerFirst(Object, long, TimeUnit) offerFirst(e, time, unit)}</td>
+ *    <td>{@link #putLast putLast(e)}</td>
+ *    <td>{@link #offerLast(Object, long, TimeUnit) offerLast(e, time, unit)}</td> 
+ *  </tr>
+ *  <tr>
+ *    <td><b>Remove</b></td>
+ *    <td>{@link #takeFirst takeFirst()}</td>
+ *    <td>{@link #pollFirst(long, TimeUnit)  pollFirst(time, unit)}</td>
+ *    <td>{@link #takeLast takeLast()}</td>
+ *    <td>{@link #pollLast(long, TimeUnit) pollLast(time, unit)}</td>
+ *  </tr>
+ * </table>
+ *
+ * <p>Like any {@link BlockingQueue}, a <tt>BlockingDeque</tt> is
+ * thread safe and may (or may not) be capacity-constrained.  A
+ * <tt>BlockingDeque</tt> implementation may be used directly as a
+ * FIFO <tt>BlockingQueue</tt>. The blocking methods inherited from
+ * the <tt>BlockingQueue</tt> interface are precisely equivalent to
+ * <tt>BlockingDeque</tt> methods as indicated in the following table:<p>
+ *
+ * <table BORDER CELLPADDING=3 CELLSPACING=1>
+ *  <tr>
+ *    <td ALIGN=CENTER> <b><tt>BlockingQueue</tt> Method</b></td>
+ *    <td ALIGN=CENTER> <b>Equivalent <tt>BlockingDeque</tt> Method</b></td>
+ *  </tr>
+ *  <tr>
+ *   <tr>
+ *    <td>{@link java.util.concurrent.BlockingQueue#put put(e)}</td>
+ *    <td>{@link #putLast putLast(e)}</td>
+ *   </tr>
+ *   <tr>
+ *    <td>{@link java.util.concurrent.BlockingQueue#take take()}</td>
+ *    <td>{@link #takeFirst takeFirst()}</td>
+ *   </tr>
+ *   <tr>
+ *    <td>{@link java.util.concurrent.BlockingQueue#offer(Object, long, TimeUnit) offer(e, time. unit)}</td>
+ *    <td>{@link #offerLast(Object, long, TimeUnit) offerLast(e, time, unit)}</td>
+ *   </tr>
+ *   <tr>
+ *    <td>{@link java.util.concurrent.BlockingQueue#poll(long, TimeUnit) poll(time, unit)}</td>
+ *    <td>{@link #pollFirst(long, TimeUnit) pollFirst(time, unit)}</td>
+ *   </tr>
+ * </table>
+ *
+ *
+ * <p>This interface is a member of the
+ * <a href="{@docRoot}/../guide/collections/index.html">
+ * Java Collections Framework</a>.
+ *
+ * @since 1.6
+ * @author Doug Lea
+ * @param <E> the type of elements held in this collection
+ */
+public interface BlockingDeque<E> extends Deque<E>, BlockingQueue<E> {
+
+    /**
+     * Adds the specified element as the first element of this deque,
+     * waiting if necessary for space to become available.
+     * @param o the element to add
+     * @throws InterruptedException if interrupted while waiting.
+     * @throws NullPointerException if the specified element is <tt>null</tt>.
+     */
+    void putFirst(E o) throws InterruptedException;
+
+    /**
+     * Adds the specified element as the last element of this deque,
+     * waiting if necessary for space to become available.
+     * @param o the element to add
+     * @throws InterruptedException if interrupted while waiting.
+     * @throws NullPointerException if the specified element is <tt>null</tt>.
+     */
+    void putLast(E o) throws InterruptedException;
+
+    /**
+     * Retrieves and removes the first element of this deque, waiting
+     * if no elements are present on this deque.
+     * @return the head of this deque
+     * @throws InterruptedException if interrupted while waiting.
+     */
+    E takeFirst() throws InterruptedException;
+
+    /**
+     * Retrieves and removes the last element of this deque, waiting
+     * if no elements are present on this deque.
+     * @return the head of this deque
+     * @throws InterruptedException if interrupted while waiting.
+     */
+    E takeLast() throws InterruptedException;
+
+    /**
+     * Inserts the specified element as the first element of this deque,
+     * waiting if necessary up to the specified wait time for space to
+     * become available.
+     * @param o the element to add
+     * @param timeout how long to wait before giving up, in units of
+     * <tt>unit</tt>
+     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
+     * <tt>timeout</tt> parameter
+     * @return <tt>true</tt> if successful, or <tt>false</tt> if
+     * the specified waiting time elapses before space is available.
+     * @throws InterruptedException if interrupted while waiting.
+     * @throws NullPointerException if the specified element is <tt>null</tt>.
+     */
+    boolean offerFirst(E o, long timeout, TimeUnit unit)
+        throws InterruptedException;
+
+    /**
+     * Inserts the specified element as the last element of this deque,
+     * waiting if necessary up to the specified wait time for space to
+     * become available.
+     * @param o the element to add
+     * @param timeout how long to wait before giving up, in units of
+     * <tt>unit</tt>
+     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
+     * <tt>timeout</tt> parameter
+     * @return <tt>true</tt> if successful, or <tt>false</tt> if
+     * the specified waiting time elapses before space is available.
+     * @throws InterruptedException if interrupted while waiting.
+     * @throws NullPointerException if the specified element is <tt>null</tt>.
+     */
+    boolean offerLast(E o, long timeout, TimeUnit unit)
+        throws InterruptedException;
+
+
+    /**
+     * Retrieves and removes the first element of this deque, waiting
+     * if necessary up to the specified wait time if no elements are
+     * present on this deque.
+     * @param timeout how long to wait before giving up, in units of
+     * <tt>unit</tt>
+     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
+     * <tt>timeout</tt> parameter
+     * @return the head of this deque, or <tt>null</tt> if the
+     * specified waiting time elapses before an element is present.
+     * @throws InterruptedException if interrupted while waiting.
+     */
+    E pollFirst(long timeout, TimeUnit unit)
+        throws InterruptedException;
+
+    /**
+     * Retrieves and removes the last element of this deque, waiting
+     * if necessary up to the specified wait time if no elements are
+     * present on this deque.
+     * @param timeout how long to wait before giving up, in units of
+     * <tt>unit</tt>
+     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
+     * <tt>timeout</tt> parameter
+     * @return the head of this deque, or <tt>null</tt> if the
+     * specified waiting time elapses before an element is present.
+     * @throws InterruptedException if interrupted while waiting.
+     */
+    E pollLast(long timeout, TimeUnit unit)
+        throws InterruptedException;
+
+    /**
+     * Adds the specified element as the last element of this deque,
+     * waiting if necessary for space to become available.  This
+     * method is equivalent to to putLast
+     * @param o the element to add
+     * @throws InterruptedException if interrupted while waiting.
+     * @throws NullPointerException if the specified element is <tt>null</tt>.
+     */
+    void put(E o) throws InterruptedException;
+
+    /** 
+     * Inserts the specified element as the lest element of this
+     * deque, if possible.  When using deques that may impose
+     * insertion restrictions (for example capacity bounds), method
+     * <tt>offer</tt> is generally preferable to method {@link
+     * Collection#add}, which can fail to insert an element only by
+     * throwing an exception.  This method is equivalent to to
+     * offerLast
+     *
+     * @param o the element to add.
+     * @return <tt>true</tt> if it was possible to add the element to
+     *         this deque, else <tt>false</tt>
+     * @throws NullPointerException if the specified element is <tt>null</tt>
+     */
+    boolean offer(E o, long timeout, TimeUnit unit)
+        throws InterruptedException;
+
+    /**
+     * Retrieves and removes the first element of this deque, waiting
+     * if no elements are present on this deque.
+     * This method is equivalent to to takeFirst 
+     * @return the head of this deque
+     * @throws InterruptedException if interrupted while waiting.
+     */
+    E take() throws InterruptedException;
+
+    /**
+     * Retrieves and removes the first element of this deque, waiting
+     * if necessary up to the specified wait time if no elements are
+     * present on this deque.  This method is equivalent to to
+     * pollFirst
+     * @param timeout how long to wait before giving up, in units of
+     * <tt>unit</tt>
+     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
+     * <tt>timeout</tt> parameter
+     * @return the head of this deque, or <tt>null</tt> if the
+     * specified waiting time elapses before an element is present.
+     * @throws InterruptedException if interrupted while waiting.
+     */
+    E poll(long timeout, TimeUnit unit)
+        throws InterruptedException;
+}

Added: branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/Deque.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/Deque.java	                        (rev 0)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/Deque.java	2009-06-26 22:56:53 UTC (rev 7483)
@@ -0,0 +1,442 @@
+/*
+ * Written by Doug Lea and Josh Bloch with assistance from members of
+ * JCP JSR-166 Expert Group and released to the public domain, as explained
+ * at http://creativecommons.org/licenses/publicdomain
+ */
+
+package org.jboss.messaging.utils.concurrent;     // XXX This belongs in java.util!!! XXX
+import java.util.*;    // XXX This import goes away        XXX
+
+/**
+ * A linear collection that supports element insertion and removal at
+ * both ends.  The name <i>deque</i> is short for "double ended queue"
+ * and is usually pronounced "deck".  Most <tt>Deque</tt>
+ * implementations place no fixed limits on the number of elements
+ * they may contain, but this interface supports capacity-restricted
+ * deques as well as those with no fixed size limit.
+ *
+ * <p>This interface defines methods to access the elements at both
+ * ends of the deque.  Methods are provided to insert, remove, and
+ * examine the element.  Each of these methods exists in two forms:
+ * one throws an exception if the operation fails, the other returns a
+ * special value (either <tt>null</tt> or <tt>false</tt>, depending on
+ * the operation).  The latter form of the insert operation is
+ * designed specifically for use with capacity-restricted
+ * <tt>Deque</tt> implementations; in most implementations, insert
+ * operations cannot fail.
+ *
+ * <p>The twelve methods described above are are summarized in the 
+ * follwoing table:<p>
+ * 
+ * <table BORDER CELLPADDING=3 CELLSPACING=1>
+ *  <tr>
+ *    <td></td>
+ *    <td ALIGN=CENTER COLSPAN = 2> <b>First Element (Head)</b></td>
+ *    <td ALIGN=CENTER COLSPAN = 2> <b>Last Element (Tail)</b></td>
+ *  </tr>
+ *  <tr>
+ *    <td></td>
+ *    <td ALIGN=CENTER><em>Throws exception</em></td>
+ *    <td ALIGN=CENTER><em>Returns special value</em></td>
+ *    <td ALIGN=CENTER><em>Throws exception</em></td>
+ *    <td ALIGN=CENTER><em>Returns special value</em></td>
+ *  </tr>
+ *  <tr>
+ *    <td><b>Insert</b></td>
+ *    <td>{@link #addFirst addFirst(e)}</td>
+ *    <td>{@link #offerFirst offerFirst(e)}</td>
+ *    <td>{@link #addLast addLast(e)}</td>
+ *    <td>{@link #offerLast offerLast(e)}</td>
+ *  </tr>
+ *  <tr>
+ *    <td><b>Remove</b></td>
+ *    <td>{@link #removeFirst removeFirst()}</td>
+ *    <td>{@link #pollFirst pollFirst()}</td>
+ *    <td>{@link #removeLast removeLast()}</td>
+ *    <td>{@link #pollLast pollLast()}</td>
+ *  </tr>
+ *  <tr>
+ *    <td><b>Examine</b></td>
+ *    <td>{@link #getFirst getFirst()}</td>
+ *    <td>{@link #peekFirst peekFirst()}</td>
+ *    <td>{@link #getLast getLast()}</td>
+ *    <td>{@link #peekLast peekLast()}</td>
+ *  </tr>
+ * </table>
+ *
+ * <p>This interface extends the {@link Queue} interface.  When a deque is
+ * used as a queue, FIFO (First-In-First-Out) behavior results.  Elements are
+ * added to the end of the deque and removed from the beginning.  The methods
+ * inherited from the <tt>Queue</tt> interface are precisely equivalent to
+ * <tt>Deque</tt> methods as indicated in the following table:<p>
+ *
+ * <table BORDER CELLPADDING=3 CELLSPACING=1>
+ *  <tr>
+ *    <td ALIGN=CENTER> <b><tt>Queue</tt> Method</b></td>
+ *    <td ALIGN=CENTER> <b>Equivalent <tt>Deque</tt> Method</b></td>
+ *  </tr>
+ *  <tr>
+ *   <tr>
+ *    <td>{@link java.util.Queue#offer offer(e)}</td>
+ *    <td>{@link #offerLast offerLast(e)}</td>
+ *   </tr>
+ *   <tr>
+ *    <td>{@link java.util.Queue#add add(e)}</td>
+ *    <td>{@link #addLast addLast(e)}</td>
+ *   </tr>
+ *   <tr>
+ *    <td>{@link java.util.Queue#poll poll()}</td>
+ *    <td>{@link #pollFirst pollFirst()}</td>
+ *   </tr>
+ *   <tr>
+ *    <td>{@link java.util.Queue#remove remove()}</td>
+ *    <td>{@link #removeFirst removeFirst()}</td>
+ *   </tr>
+ *   <tr>
+ *    <td>{@link java.util.Queue#peek peek()}</td>
+ *    <td>{@link #peek peekFirst()}</td>
+ *   </tr>
+ *   <tr>
+ *    <td>{@link java.util.Queue#element element()}</td>
+ *    <td>{@link #getFirst getFirst()}</td>
+ *   </tr>
+ * </table>
+ *
+ * <p>Deques can also be used as LIFO (Last-In-First-Out) stacks.  This
+ * interface should be used in preference to the legacy {@link Stack} class.
+ * When a dequeue is used as a stack, elements are pushed and popped from the
+ * beginning of the deque.  Stack methods are precisely equivalent to
+ * <tt>Deque</tt> methods as indicated in the table below:<p>
+ *
+ * <table BORDER CELLPADDING=3 CELLSPACING=1>
+ *  <tr>
+ *    <td ALIGN=CENTER> <b>Stack Method</b></td>
+ *    <td ALIGN=CENTER> <b>Equivalent <tt>Deque</tt> Method</b></td>
+ *  </tr>
+ *  <tr>
+ *   <tr>
+ *    <td>{@link #push push(e)}</td>
+ *    <td>{@link #addFirst addFirst(e)}</td>
+ *   </tr>
+ *   <tr>
+ *    <td>{@link #pop pop()}</td>
+ *    <td>{@link #removeFirst removeFirst()}</td>
+ *   </tr>
+ *   <tr>
+ *    <td>{@link #peek peek()}</td>
+ *    <td>{@link #peekFirst peekFirst()}</td>
+ *   </tr>
+ * </table>
+ *
+ * <p>Note that the {@link #peek peek} method works equally well when
+ * a deque is used as a queue or a stack; in either case, elements are
+ * drawn from the beginning of the deque.
+ *
+ * <p>This inteface provides two methods to to remove interior
+ * elements, {@link #removeFirstOccurrence removeFirstOccurrence} and
+ * {@link #removeLastOccurrence removeLastOccurrence}.  Unlike the
+ * {@link List} interface, this interface does not provide support for
+ * indexed access to elements.
+ *
+ * <p>While <tt>Deque</tt> implementations are not strictly required
+ * to prohibit the insertion of null elements, they are strongly
+ * encouraged to do so.  Users of any <tt>Deque</tt> implementations
+ * that do allow null elements are strongly encouraged <i>not</i> to
+ * take advantage of the ability to insert nulls.  This is so because
+ * <tt>null</tt> is used as a special return value by various methods
+ * to indicated that the deque is empty.
+ * 
+ * <p><tt>Deque</tt> implementations generally do not define
+ * element-based versions of the <tt>equals</tt> and <tt>hashCode</tt>
+ * methods, but instead inherit the identity-based versions from class
+ * <tt>Object</tt>.
+ *
+ * <p>This interface is a member of the <a
+ * href="{@docRoot}/../guide/collections/index.html"> Java Collections
+ * Framework</a>.
+ *
+ * @author Doug Lea
+ * @author Josh Bloch
+ * @since  1.6
+ * @param <E> the type of elements held in this collection
+ */
+public interface Deque<E> extends Queue<E> {
+    /**
+     * Inserts the specified element to the front this deque unless it would
+     * violate capacity restrictions.  When using a capacity-restricted deque,
+     * this method is generally preferable to method <tt>addFirst</tt>, which
+     * can fail to insert an element only by throwing an exception.
+     *
+     * @param e the element to insert
+     * @return <tt>true</tt> if it was possible to insert the element,
+     *     else <tt>false</tt>
+     * @throws NullPointerException if <tt>e</tt> is null and this
+     *     deque does not permit null elements
+     */
+    boolean offerFirst(E e);
+
+    /**
+     * Inserts the specified element to the end of this deque unless it would
+     * violate capacity restrictions.  When using a capacity-restricted deque,
+     * this method is generally preferable to method <tt>addLast</tt> which
+     * can fail to insert an element only by throwing an exception.
+     *
+     * @param e the element to insert
+     * @return <tt>true</tt> if it was possible to insert the element,
+     *     else <tt>false</tt>
+     * @throws NullPointerException if <tt>e</tt> is null and this
+     *     deque does not permit null elements
+     */
+    boolean offerLast(E e);
+
+    /**
+     * Inserts the specified element to the front of this deque unless it
+     * would violate capacity restrictions.
+     *
+     * @param e the element to insert
+     * @throws IllegalStateException if it was not possible to insert
+     *    the element due to capacity restrictions
+     * @throws NullPointerException if <tt>e</tt> is null and this
+     *     deque does not permit null elements
+     */
+    void addFirst(E e);
+
+    /**
+     * Inserts the specified element to the end of this deque unless it would
+     * violate capacity restrictions.
+     *
+     * @param e the element to insert
+     * @throws IllegalStateException if it was not possible to insert
+     *    the element due to capacity restrictions
+     * @throws NullPointerException if <tt>e</tt> is null and this
+     *     deque does not permit null elements
+     */
+    void addLast(E e);
+
+    /**
+     * Retrieves and removes the first element of this deque, or
+     * <tt>null</tt> if this deque is empty.
+     *
+     * @return the first element of this deque, or <tt>null</tt> if
+     *     this deque is empty
+     */
+    E pollFirst();
+
+    /**
+     * Retrieves and removes the last element of this deque, or
+     * <tt>null</tt> if this deque is empty.
+     *
+     * @return the last element of this deque, or <tt>null</tt> if
+     *     this deque is empty
+     */
+    E pollLast();
+
+    /**
+     * Removes and returns the first element of this deque.  This method
+     * differs from the <tt>pollFirst</tt> method only in that it throws an
+     * exception if this deque is empty.
+     *
+     * @return the first element of this deque
+     * @throws NoSuchElementException if this deque is empty
+     */
+    E removeFirst();
+
+    /**
+     * Retrieves and removes the last element of this deque.  This method
+     * differs from the <tt>pollLast</tt> method only in that it throws an
+     * exception if this deque is empty.
+     *
+     * @return the last element of this deque
+     * @throws NoSuchElementException if this deque is empty
+     */
+    E removeLast();
+
+    /**
+     * Retrieves, but does not remove, the first element of this deque,
+     * returning <tt>null</tt> if this deque is empty.
+     *
+     * @return the first element of this deque, or <tt>null</tt> if
+     *     this deque is empty
+     */
+    E peekFirst();
+
+    /**
+     * Retrieves, but does not remove, the last element of this deque,
+     * returning <tt>null</tt> if this deque is empty.
+     *
+     * @return the last element of this deque, or <tt>null</tt> if this deque
+     *     is empty
+     */
+    E peekLast();
+
+    /**
+     * Retrieves, but does not remove, the first element of this
+     * deque.  This method differs from the <tt>peek</tt> method only
+     * in that it throws an exception if this deque is empty.
+     *
+     * @return the first element of this deque
+     * @throws NoSuchElementException if this deque is empty
+     */
+    E getFirst();
+
+    /**
+     * Retrieves, but does not remove, the last element of this
+     * deque.  This method differs from the <tt>peek</tt> method only
+     * in that it throws an exception if this deque is empty.
+     *
+     * @return the last element of this deque
+     * @throws NoSuchElementException if this deque is empty
+     */
+    E getLast();
+
+    /**
+     * Removes the first occurrence of the specified element in this
+     * deque.  If the deque does not contain the element, it is
+     * unchanged.  More formally, removes the first element <tt>e</tt>
+     * such that <tt>(o==null ? e==null : o.equals(e))</tt> (if
+     * such an element exists).
+     *
+     * @param e element to be removed from this deque, if present
+     * @return <tt>true</tt> if the deque contained the specified element
+     * @throws NullPointerException if the specified element is <tt>null</tt>
+     */
+    boolean removeFirstOccurrence(Object e);
+
+    /**
+     * Removes the last occurrence of the specified element in this
+     * deque.  If the deque does not contain the element, it is
+     * unchanged.  More formally, removes the last element <tt>e</tt>
+     * such that <tt>(o==null ? e==null : o.equals(e))</tt> (if
+     * such an element exists).
+     *
+     * @param e element to be removed from this deque, if present
+     * @return <tt>true</tt> if the deque contained the specified element
+     * @throws NullPointerException if the specified element is <tt>null</tt>
+     */
+    boolean removeLastOccurrence(Object e);
+
+
+    // *** Queue methods ***
+
+    /**
+     * Inserts the specified element into the queue represented by this deque
+     * unless it would violate capacity restrictions.  In other words, inserts
+     * the specified element to the end of this deque.  When using a
+     * capacity-restricted deque, this method is generally preferable to the
+     * {@link #add} method, which can fail to insert an element only by
+     * throwing an exception.
+     *
+     * <p>This method is equivalent to {@link #offerLast}.
+     *
+     * @param e the element to insert
+     * @return <tt>true</tt> if it was possible to insert the element,
+     *     else <tt>false</tt>
+     * @throws NullPointerException if <tt>e</tt> is null and this
+     *     deque does not permit null elements
+     */
+    boolean offer(E e);
+
+    /**
+     * Inserts the specified element into the queue represented by this
+     * deque unless it would violate capacity restrictions.  In other words,
+     * inserts the specified element as the last element of this deque. 
+     *
+     * <p>This method is equivalent to {@link #addLast}.
+     *
+     * @param e the element to insert
+     * @return <tt>true</tt> (as per the spec for {@link Collection#add})
+     * @throws IllegalStateException if it was not possible to insert
+     *    the element due to capacity restrictions
+     * @throws NullPointerException if <tt>e</tt> is null and this
+     *     deque does not permit null elements
+     */
+    boolean add(E e);
+
+    /**
+     * Retrieves and removes the head of the queue represented by
+     * this deque, or <tt>null</tt> if this deque is empty.  In other words,
+     * retrieves and removes the first element of this deque, or <tt>null</tt>
+     * if this deque is empty.
+     *
+     * <p>This method is equivalent to {@link #pollFirst()}.
+     *
+     * @return the first element of this deque, or <tt>null</tt> if
+     *     this deque is empty
+     */
+    E poll();
+
+    /**
+     * Retrieves and removes the head of the queue represented by this deque.
+     * This method differs from the <tt>poll</tt> method only in that it
+     * throws an exception if this deque is empty.
+     *
+     * <p>This method is equivalent to {@link #removeFirst()}.
+     *
+     * @return the head of the queue represented by this deque
+     * @throws NoSuchElementException if this deque is empty
+     */
+    E remove();
+
+    /**
+     * Retrieves, but does not remove, the head of the queue represented by
+     * this deque, returning <tt>null</tt> if this deque is empty.
+     *
+     * <p>This method is equivalent to {@link #peekFirst()}
+     *
+     * @return the head of the queue represented by this deque, or
+     *     <tt>null</tt> if this deque is empty
+     */
+    E peek();
+
+    /**
+     * Retrieves, but does not remove, the head of the queue represented by
+     * this deque.  This method differs from the <tt>peek</tt> method only in
+     * that it throws an exception if this deque is empty.
+     *
+     * <p>This method is equivalent to {@link #getFirst()}
+     *
+     * @return the head of the queue represented by this deque
+     * @throws NoSuchElementException if this deque is empty
+     */
+    E element();
+
+
+    // *** Stack methods ***
+
+    /**
+     * Pushes an element onto the stack represented by this deque.  In other
+     * words, inserts the element to the front this deque unless it would
+     * violate capacity restrictions.
+     *
+     * <p>This method is equivalent to {@link #addFirst}.
+     *
+     * @throws IllegalStateException if it was not possible to insert
+     *    the element due to capacity restrictions
+     * @throws NullPointerException if <tt>e</tt> is null and this
+     *     deque does not permit null elements
+     */
+    void push(E e);
+
+    /**
+     * Pops an element from the stack represented by this deque.  In other
+     * words, removes and returns the the first element of this deque.
+     *
+     * <p>This method is equivalent to {@link #removeFirst()}.
+     *
+     * @return the element at the front of this deque (which is the top
+     *     of the stack represented by this deque)
+     * @throws NoSuchElementException if this deque is empty
+     */
+    E pop();
+
+
+    // *** Collection Method ***
+
+    /**
+     * Returns an iterator over the elements in this deque.  The elements
+     * will be ordered from first (head) to last (tail).
+     * 
+     * @return an <tt>Iterator</tt> over the elements in this deque
+     */
+    Iterator<E> iterator();
+}

Added: branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/LinkedBlockingDeque.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/LinkedBlockingDeque.java	                        (rev 0)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/LinkedBlockingDeque.java	2009-06-26 22:56:53 UTC (rev 7483)
@@ -0,0 +1,762 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package org.jboss.messaging.utils.concurrent;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * An optionally-bounded {@linkplain BlockingDeque blocking deque} based on
+ * linked nodes.
+ *
+ * <p> The optional capacity bound constructor argument serves as a
+ * way to prevent excessive expansion. The capacity, if unspecified,
+ * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
+ * dynamically created upon each insertion unless this would bring the
+ * deque above capacity.
+ *
+ * <p>Most operations run in constant time (ignoring time spent
+ * blocking).  Exceptions include {@link #remove(Object) remove},
+ * {@link #removeFirstOccurrence removeFirstOccurrence}, {@link
+ * #removeLastOccurrence removeLastOccurrence}, {@link #contains
+ * contains }, {@link #iterator iterator.remove()}, and the bulk
+ * operations, all of which run in linear time.
+ *
+ * <p>This class and its iterator implement all of the
+ * <em>optional</em> methods of the {@link Collection} and {@link
+ * Iterator} interfaces.  This class is a member of the <a
+ * href="{@docRoot}/../guide/collections/index.html"> Java Collections
+ * Framework</a>.
+ *
+ * @since 1.6
+ * @author  Doug Lea
+ * @param <E> the type of elements held in this collection
+ */
+public class LinkedBlockingDeque<E>
+    extends AbstractQueue<E>
+    implements BlockingDeque<E>,  java.io.Serializable {
+
+    /*
+     * Implemented as a simple doubly-linked list protected by a
+     * single lock and using conditions to manage blocking.
+     */
+
+    private static final long serialVersionUID = -387911632671998426L;
+
+    /** Doubly-linked list node class */
+    static final class Node<E> {
+   E item; 
+        Node<E> prev;
+        Node<E> next;
+        Node(E x, Node<E> p, Node<E> n) {
+            item = x;
+            prev = p;
+            next = n;
+        }
+    }
+
+    /** Pointer to first node */
+    private transient Node<E> first;
+    /** Pointer to last node */
+    private transient Node<E> last;
+    /** Number of items in the deque */
+    private transient int count;
+    /** Maximum number of items in the deque */
+    private final int capacity;
+    /** Main lock guarding all access */
+    private final ReentrantLock lock = new ReentrantLock();
+    /** Condition for waiting takes */
+    private final Condition notEmpty = lock.newCondition();
+    /** Condition for waiting puts */
+    private final Condition notFull = lock.newCondition();
+
+    /**
+     * Creates a <tt>LinkedBlockingDeque</tt> with a capacity of
+     * {@link Integer#MAX_VALUE}.
+     */
+    public LinkedBlockingDeque() {
+        this(Integer.MAX_VALUE);
+    }
+
+    /**
+     * Creates a <tt>LinkedBlockingDeque</tt> with the given (fixed)
+     * capacity.
+     * @param capacity the capacity of this deque
+     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
+     */
+    public LinkedBlockingDeque(int capacity) {
+        if (capacity <= 0) throw new IllegalArgumentException();
+        this.capacity = capacity;
+    }
+
+    /**
+     * Creates a <tt>LinkedBlockingDeque</tt> with a capacity of
+     * {@link Integer#MAX_VALUE}, initially containing the elements of the
+     * given collection,
+     * added in traversal order of the collection's iterator.
+     * @param c the collection of elements to initially contain
+     * @throws NullPointerException if <tt>c</tt> or any element within it
+     * is <tt>null</tt>
+     */
+    public LinkedBlockingDeque(Collection<? extends E> c) {
+        this(Integer.MAX_VALUE);
+        for (E e : c)
+            add(e);
+    }
+
+
+    // Basic linking and unlinking operations, called only while holding lock
+
+    /**
+     * Link e as first element, or return false if full
+     */
+    private boolean linkFirst(E e) {
+        if (count >= capacity)
+            return false;
+        ++count;
+        Node<E> f = first;
+        Node<E> x = new Node<E>(e, null, f);
+        first = x;
+        if (last == null)
+            last = x;
+        else
+            f.prev = x;
+        notEmpty.signal();
+        return true;
+    }
+
+    /**
+     * Link e as last element, or return false if full
+     */
+    private boolean linkLast(E e) {
+        if (count >= capacity)
+            return false;
+        ++count;
+        Node<E> l = last;
+        Node<E> x = new Node<E>(e, l, null);
+        last = x;
+        if (first == null)
+            first = x;
+        else
+            l.next = x;
+        notEmpty.signal();
+        return true;
+    }
+
+    /**
+     * Remove and return first element, or null if empty
+     */
+    private E unlinkFirst() {
+        Node<E> f = first;
+        if (f == null)
+            return null;
+        Node<E> n = f.next;
+        first = n;
+        if (n == null) 
+            last = null;
+        else 
+            n.prev = null;
+        --count;
+        notFull.signal();
+        return f.item;
+    }
+
+    /**
+     * Remove and return last element, or null if empty
+     */
+    private E unlinkLast() {
+        Node<E> l = last;
+        if (l == null)
+            return null;
+        Node<E> p = l.prev;
+        last = p;
+        if (p == null) 
+            first = null;
+        else 
+            p.next = null;
+        --count;
+        notFull.signal();
+        return l.item;
+    }
+
+    /**
+     * Unlink e
+     */
+    private void unlink(Node<E> x) {
+        Node<E> p = x.prev;
+        Node<E> n = x.next;
+        if (p == null) {
+            if (n == null) 
+                first = last = null;
+            else {
+                n.prev = null;
+                first = n;
+            }
+        } else if (n == null) {
+            p.next = null;
+            last = p;
+        } else {
+            p.next = n;
+            n.prev = p;
+        }
+        --count;
+        notFull.signalAll();
+    }
+
+    // Deque methods
+
+    public boolean offerFirst(E o) {
+        if (o == null) throw new NullPointerException();
+        lock.lock();
+        try {
+            return linkFirst(o);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public boolean offerLast(E o) {
+        if (o == null) throw new NullPointerException();
+        lock.lock();
+        try {
+            return linkLast(o);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void addFirst(E e) { 
+        if (!offerFirst(e))
+            throw new IllegalStateException("Deque full");
+    }
+
+    public void addLast(E e) { 
+        if (!offerLast(e))
+            throw new IllegalStateException("Deque full");
+    }
+
+    public E pollFirst() {
+        lock.lock();
+        try {
+            return unlinkFirst();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public E pollLast() {
+        lock.lock();
+        try {
+            return unlinkLast();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public E removeFirst() {
+        E x = pollFirst();
+        if (x == null) throw new NoSuchElementException();
+        return x;
+    }
+
+    public E removeLast() {
+        E x = pollLast();
+        if (x == null) throw new NoSuchElementException();
+        return x;
+    }
+
+    public E peekFirst() {
+        lock.lock();
+        try {
+            return (first == null) ? null : first.item;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public E peekLast() {
+        lock.lock();
+        try {
+            return (last == null) ? null : last.item;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public E getFirst() {
+        E x = peekFirst();
+        if (x == null) throw new NoSuchElementException();
+        return x;
+    }
+
+    public E getLast() {
+        E x = peekLast();
+        if (x == null) throw new NoSuchElementException();
+        return x;
+    }
+
+    // BlockingDeque methods
+
+    public void putFirst(E o) throws InterruptedException {
+        if (o == null) throw new NullPointerException();
+        lock.lock();
+        try {
+            while (!linkFirst(o))
+                notFull.await();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void putLast(E o) throws InterruptedException {
+        if (o == null) throw new NullPointerException();
+        lock.lock();
+        try {
+            while (!linkLast(o))
+                notFull.await();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public E takeFirst() throws InterruptedException {
+        lock.lock();
+        try {
+            E x;
+            while ( (x = unlinkFirst()) == null)
+                notEmpty.await();
+            return x;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public E takeLast() throws InterruptedException {
+        lock.lock();
+        try {
+            E x;
+            while ( (x = unlinkLast()) == null)
+                notEmpty.await();
+            return x;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public boolean offerFirst(E o, long timeout, TimeUnit unit)
+        throws InterruptedException {
+        if (o == null) throw new NullPointerException();
+        lock.lockInterruptibly();
+        try {
+            long nanos = unit.toNanos(timeout);
+            for (;;) {
+                if (linkFirst(o))
+                    return true;
+                if (nanos <= 0)
+                    return false;
+                nanos = notFull.awaitNanos(nanos);
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+        
+    public boolean offerLast(E o, long timeout, TimeUnit unit)
+        throws InterruptedException {
+        if (o == null) throw new NullPointerException();
+        lock.lockInterruptibly();
+        try {
+            long nanos = unit.toNanos(timeout);
+            for (;;) {
+                if (linkLast(o))
+                    return true;
+                if (nanos <= 0)
+                    return false;
+                nanos = notFull.awaitNanos(nanos);
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public E pollFirst(long timeout, TimeUnit unit) 
+        throws InterruptedException {
+        lock.lockInterruptibly();
+        try {
+            long nanos = unit.toNanos(timeout);
+            for (;;) {
+                E x = unlinkFirst();
+                if (x != null)
+                    return x;
+                if (nanos <= 0)
+                    return null;
+                nanos = notEmpty.awaitNanos(nanos);
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public E pollLast(long timeout, TimeUnit unit) 
+        throws InterruptedException {
+        lock.lockInterruptibly();
+        try {
+            long nanos = unit.toNanos(timeout);
+            for (;;) {
+                E x = unlinkLast();
+                if (x != null)
+                    return x;
+                if (nanos <= 0)
+                    return null;
+                nanos = notEmpty.awaitNanos(nanos);
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    // Queue and stack methods
+
+    public boolean offer(E e)       { return offerLast(e); }
+    public boolean add(E e)         { addLast(e); return true; }
+    public void push(E e)           { addFirst(e); }
+    public E poll()                 { return pollFirst(); }
+    public E remove()               { return removeFirst(); }
+    public E pop()                  { return removeFirst(); }
+    public E peek()                 { return peekFirst(); }
+    public E element()              { return getFirst(); }
+    public boolean remove(Object o) { return removeFirstOccurrence(o); }
+
+    // BlockingQueue methods
+
+    public void put(E o) throws InterruptedException  { putLast(o);  }
+    public E take() throws InterruptedException       { return takeFirst(); }
+    public boolean offer(E o, long timeout, TimeUnit unit)
+        throws InterruptedException    { return offerLast(o, timeout, unit); }
+    public E poll(long timeout, TimeUnit unit)
+        throws InterruptedException    { return pollFirst(timeout, unit); }
+
+    /**
+     * Returns the number of elements in this deque.
+     *
+     * @return  the number of elements in this deque.
+     */
+    public int size() {
+        lock.lock();
+        try {
+            return count;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns the number of elements that this deque can ideally (in
+     * the absence of memory or resource constraints) accept without
+     * blocking. This is always equal to the initial capacity of this deque
+     * less the current <tt>size</tt> of this deque.
+     * <p>Note that you <em>cannot</em> always tell if
+     * an attempt to <tt>add</tt> an element will succeed by
+     * inspecting <tt>remainingCapacity</tt> because it may be the
+     * case that a waiting consumer is ready to <tt>take</tt> an
+     * element out of an otherwise full deque.
+     */
+    public int remainingCapacity() {
+        lock.lock();
+        try {
+            return capacity - count;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public boolean contains(Object o) {
+        if (o == null) return false;
+        lock.lock();
+        try {
+            for (Node<E> p = first; p != null; p = p.next) 
+                if (o.equals(p.item))
+                    return true;
+            return false;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public boolean removeFirstOccurrence(Object e) {
+        if (e == null) throw new NullPointerException();
+        lock.lock();
+        try {
+            for (Node<E> p = first; p != null; p = p.next) {
+                if (e.equals(p.item)) {
+                    unlink(p);
+                    return true;
+                }
+            }
+            return false;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public boolean removeLastOccurrence(Object e) {
+        if (e == null) throw new NullPointerException();
+        lock.lock();
+        try {
+            for (Node<E> p = last; p != null; p = p.prev) {
+                if (e.equals(p.item)) {
+                    unlink(p);
+                    return true;
+                }
+            }
+            return false;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Variant of removeFirstOccurrence needed by iterator.remove.
+     * Searches for the node, not its contents.
+     */
+   boolean removeNode(Node<E> e) {
+        lock.lock();
+        try {
+            for (Node<E> p = first; p != null; p = p.next) {
+                if (p == e) {
+                    unlink(p);
+                    return true;
+                }
+            }
+            return false;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public Object[] toArray() {
+        lock.lock();
+        try {
+            Object[] a = new Object[count];
+            int k = 0;
+            for (Node<E> p = first; p != null; p = p.next) 
+                a[k++] = p.item;
+            return a;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public <T> T[] toArray(T[] a) {
+        lock.lock();
+        try {
+            if (a.length < count)
+                a = (T[])java.lang.reflect.Array.newInstance(
+                    a.getClass().getComponentType(),
+                    count
+                    );
+
+            int k = 0;
+            for (Node<E> p = first; p != null; p = p.next) 
+                a[k++] = (T)p.item;
+            if (a.length > k)
+                a[k] = null;
+            return a;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public String toString() {
+        lock.lock();
+        try {
+            return super.toString();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Atomically removes all of the elements from this deque.
+     * The deque will be empty after this call returns.
+     */
+    public void clear() {
+        lock.lock();
+        try {
+            first = last = null;
+            count = 0;
+            notFull.signalAll();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public int drainTo(Collection<? super E> c) {
+        if (c == null)
+            throw new NullPointerException();
+        if (c == this)
+            throw new IllegalArgumentException();
+        lock.lock();
+        try {
+            for (Node<E> p = first; p != null; p = p.next) 
+                c.add(p.item);
+            int n = count;
+            count = 0;
+            first = last = null;
+            notFull.signalAll();
+            return n;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public int drainTo(Collection<? super E> c, int maxElements) {
+        if (c == null)
+            throw new NullPointerException();
+        if (c == this)
+            throw new IllegalArgumentException();
+        lock.lock();
+        try {
+            int n = 0;
+            while (n < maxElements && first != null) {
+                c.add(first.item);
+                first.prev = null;
+                first = first.next;
+                --count;
+                ++n;
+            }
+            if (first == null)
+                last = null;
+            notFull.signalAll();
+            return n;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns an iterator over the elements in this deque in proper sequence.
+     * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
+     * will never throw {@link java.util.ConcurrentModificationException},
+     * and guarantees to traverse elements as they existed upon
+     * construction of the iterator, and may (but is not guaranteed to)
+     * reflect any modifications subsequent to construction.
+     *
+     * @return an iterator over the elements in this deque in proper sequence.
+     */
+    public Iterator<E> iterator() {
+        return new Itr();
+    }
+
+    /**
+     * Iterator for LinkedBlockingDeque
+     */
+    private class Itr implements Iterator<E> {
+        private Node<E> next;
+
+        /**
+         * nextItem holds on to item fields because once we claim that
+         * an element exists in hasNext(), we must return item read
+         * under lock (in advance()) even if it was in the process of
+         * being removed when hasNext() was called.
+         **/
+        private E nextItem;
+
+        /**
+         * Node returned by most recent call to next. Needed by remove.
+         * Reset to null if this element is deleted by a call to remove.
+         */
+        private Node<E> last;
+
+        Itr() {
+            advance();
+        }
+
+        /**
+         * Advance next, or if not yet initialized, set to first node.
+         */
+        private void advance() { 
+            final ReentrantLock lock = LinkedBlockingDeque.this.lock;
+            lock.lock();
+            try {
+                next = (next == null)? first : next.next;
+                nextItem = (next == null)? null : next.item;
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        public boolean hasNext() {
+            return next != null;
+        }
+
+        public E next() {
+            if (next == null)
+                throw new NoSuchElementException();
+            last = next;
+            E x = nextItem;
+            advance();
+            return x;
+        }
+
+        public void remove() {
+            Node<E> n = last;
+            if (n == null)
+                throw new IllegalStateException();
+            last = null;
+            // Note: removeNode rescans looking for this node to make
+            // sure it was not already removed. Otherwwise, trying to
+            // re-remove could corrupt list.
+            removeNode(n);
+        }
+    }
+
+    /**
+     * Save the state to a stream (that is, serialize it).
+     *
+     * @serialData The capacity (int), followed by elements (each an
+     * <tt>Object</tt>) in the proper order, followed by a null
+     * @param s the stream
+     */
+    private void writeObject(java.io.ObjectOutputStream s)
+        throws java.io.IOException {
+        lock.lock();
+        try {
+            // Write out capacity and any hidden stuff
+            s.defaultWriteObject();
+            // Write out all elements in the proper order.
+            for (Node<E> p = first; p != null; p = p.next)
+                s.writeObject(p.item);
+            // Use trailing null as sentinel
+            s.writeObject(null);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Reconstitute this deque instance from a stream (that is,
+     * deserialize it).
+     * @param s the stream
+     */
+    private void readObject(java.io.ObjectInputStream s)
+        throws java.io.IOException, ClassNotFoundException {
+        s.defaultReadObject();
+        count = 0;
+        first = null;
+        last = null;
+        // Read in all elements and place in queue
+        for (;;) {
+            E item = (E)s.readObject();
+            if (item == null)
+                break;
+            add(item);
+        }
+    }
+    
+}

Added: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalCompactTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalCompactTest.java	                        (rev 0)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalCompactTest.java	2009-06-26 22:56:53 UTC (rev 7483)
@@ -0,0 +1,324 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.unit.core.journal.impl;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.JournalImpl;
+import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.utils.IDGenerator;
+import org.jboss.messaging.utils.TimeAndCounterIDGenerator;
+
+/**
+ * 
+ * A JournalImplTestBase
+ * 
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class JournalCompactTest extends JournalImplTestBase
+{
+   private static final Logger log = Logger.getLogger(JournalCompactTest.class);
+
+   protected String journalDir = System.getProperty("user.home") + "/journal-test";
+
+   private static final int NUMBER_OF_RECORDS = 1000;
+
+   IDGenerator idGenerator = new TimeAndCounterIDGenerator();
+
+   // General tests
+   // =============
+
+   public void testCompactwithPendingXACommit() throws Exception
+   {
+   }
+
+   public void testCompactwithPendingXAPrepareAndCommit() throws Exception
+   {
+   }
+
+   public void testCompactwithPendingCommit() throws Exception
+   {
+   }
+
+   public void testCompactwithConcurrentDeletes() throws Exception
+   {
+   }
+
+   public void testCompactWithConcurrentAppend() throws Exception
+   {
+      setup(50, 60 * 1024, true);
+
+      final CountDownLatch latchDone = new CountDownLatch(1);
+      final CountDownLatch latchWait = new CountDownLatch(1);
+      journal = new JournalImpl(fileSize, minFiles, fileFactory, filePrefix, fileExtension, maxAIO)
+      {
+         public void onCompactDone()
+         {
+            latchDone.countDown();
+            System.out.println("Waiting on Compact");
+            try
+            {
+               latchWait.await();
+            }
+            catch (InterruptedException e)
+            {
+               e.printStackTrace();
+            }
+            System.out.println("Done");
+         }
+      };
+      startJournal();
+      load();
+
+      long transactionID = 0;
+
+      for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
+      {
+         add(i);
+         if (i % 10 == 0 && i > 0)
+         {
+            journal.forceMoveNextFile();
+         }
+         update(i);
+      }
+
+      for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
+      {
+
+         addTx(transactionID, i);
+         updateTx(transactionID, i);
+         if (i % 10 == 0)
+         {
+            journal.forceMoveNextFile();
+         }
+         commit(transactionID++);
+         update(i);
+      }
+
+      System.out.println("Number of Files: " + journal.getDataFilesCount());
+
+      for (int i = 0; i < NUMBER_OF_RECORDS; i++)
+      {
+         if (!(i % 10 == 0))
+         {
+            delete(i);
+         }
+      }
+
+      journal.forceMoveNextFile();
+
+      Thread t = new Thread()
+      {
+         public void run()
+         {
+            try
+            {
+               journal.compact();
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+         }
+      };
+
+      t.start();
+
+      latchDone.await();
+
+      int nextID = NUMBER_OF_RECORDS;
+
+      for (int i = 0; i < 100; i++)
+      {
+         add(nextID++);
+         if (i % 10 == 0)
+         {
+            journal.forceMoveNextFile();
+         }
+      }
+
+      latchWait.countDown();
+
+      t.join();
+      
+      
+      for (int i = 0 ; i < 1000; i++)
+      {
+         long id = idGenerator.generateID();
+         add(id);
+         delete(id);
+         
+         if (i % 100 == 0)
+         {
+            journal.forceMoveNextFile();
+         }
+      }
+
+      journal.forceMoveNextFile();
+      
+      delete(0);
+      add(idGenerator.generateID());
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+   }
+
+   public void testCompactwithConcurrentAppendAndUpdate() throws Exception
+   {
+   }
+
+   public void testCompactWithPendingTransactionAndDelete() throws Exception
+   {
+   }
+
+   public void testCompactingWithPendingTransaction() throws Exception
+   {
+
+   }
+
+   public void testSimpleCompacting() throws Exception
+   {
+      setup(50, 60 * 1024, true);
+
+      createJournal();
+      startJournal();
+      load();
+
+      int NUMBER_OF_RECORDS = 1000;
+
+      // add and remove some data to force reclaiming
+      {
+         ArrayList<Long> ids = new ArrayList<Long>();
+         for (int i = 0; i < NUMBER_OF_RECORDS; i++)
+         {
+            long id = idGenerator.generateID();
+            ids.add(id);
+            add(id);
+            if (i > 0 && (i % 100 == 0))
+            {
+               journal.forceMoveNextFile();
+            }
+         }
+
+         for (Long id : ids)
+         {
+            delete(id);
+         }
+
+         journal.forceMoveNextFile();
+
+         journal.checkAndReclaimFiles();
+      }
+
+      long transactionID = 0;
+
+      for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
+      {
+         add(i);
+         if (i % 10 == 0 && i > 0)
+         {
+            journal.forceMoveNextFile();
+         }
+         update(i);
+      }
+
+      for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
+      {
+
+         addTx(transactionID, i);
+         updateTx(transactionID, i);
+         if (i % 10 == 0)
+         {
+            journal.forceMoveNextFile();
+         }
+         commit(transactionID++);
+         update(i);
+      }
+
+      System.out.println("Number of Files: " + journal.getDataFilesCount());
+
+      for (int i = 0; i < NUMBER_OF_RECORDS; i++)
+      {
+         if (!(i % 10 == 0))
+         {
+            delete(i);
+         }
+      }
+
+      journal.forceMoveNextFile();
+
+      System.out.println("Number of Files: " + journal.getDataFilesCount());
+
+      System.out.println("Before compact ****************************");
+      System.out.println(journal.debug());
+      System.out.println("*****************************************");
+
+      journal.compact();
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+   }
+
+   protected int getAlignment()
+   {
+      return 1;
+   }
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      File file = new File(journalDir);
+
+      deleteDirectory(file);
+
+      file.mkdir();
+   }
+
+   protected SequentialFileFactory createFactory()
+   {
+      return new NIOSequentialFileFactory(journalDir);
+   }
+
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.tests.unit.core.journal.impl.JournalImplTestBase#getFileFactory()
+    */
+   @Override
+   protected SequentialFileFactory getFileFactory() throws Exception
+   {
+      return createFactory();
+   }
+
+}

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java	2009-06-26 17:03:46 UTC (rev 7482)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java	2009-06-26 22:56:53 UTC (rev 7483)
@@ -38,7 +38,6 @@
 import org.jboss.messaging.core.journal.TestableJournal;
 import org.jboss.messaging.core.journal.impl.JournalImpl;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.tests.util.RandomUtil;
 import org.jboss.messaging.tests.util.UnitTestCase;
 
 /**

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-06-26 17:03:46 UTC (rev 7482)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-06-26 22:56:53 UTC (rev 7483)
@@ -358,9 +358,8 @@
       {
          if (!open)
          {
-            throw new IllegalStateException("Is closed");
+            close();
          }
-         close();
 
          fileMap.remove(fileName);
       }




More information about the jboss-cvs-commits mailing list