[jboss-cvs] JBoss Messaging SVN: r7483 - in branches/clebert_temp_expirement: src/main/org/jboss/messaging/utils and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jun 26 18:56:53 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-06-26 18:56:53 -0400 (Fri, 26 Jun 2009)
New Revision: 7483
Added:
branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/
branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/BlockingDeque.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/Deque.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/LinkedBlockingDeque.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalCompactTest.java
Modified:
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Dealing with concurrent insert while compacting
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-26 17:03:46 UTC (rev 7482)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-26 22:56:53 UTC (rev 7483)
@@ -65,6 +65,7 @@
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.utils.Pair;
import org.jboss.messaging.utils.VariableLatch;
+import org.jboss.messaging.utils.concurrent.LinkedBlockingDeque;
/**
*
@@ -94,19 +95,23 @@
private static final Logger log = Logger.getLogger(JournalImpl.class);
- private static final boolean trace = log.isTraceEnabled();
+ // private static final boolean trace = log.isTraceEnabled();
- // private static final boolean trace = true;
+ private static final boolean LOAD_TRACE = false;
+ private static final boolean trace = true;
+
// This method exists just to make debug easier.
// I could replace log.trace by log.info temporarily while I was debugging
// Journal
private static final void trace(final String message)
{
- // System.out.println(message);
- log.trace(message);
+ System.out.println(message);
+ // log.trace(message);
}
+ private static final String FILE_COMPACT_CONTROL = "journal-rename-control.ctr";
+
// The sizes of primitive types
private static final int SIZE_LONG = 8;
@@ -182,7 +187,7 @@
public final String fileExtension;
- private final Queue<JournalFile> dataFiles = new ConcurrentLinkedQueue<JournalFile>();
+ private final LinkedBlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
private final Queue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
@@ -793,6 +798,8 @@
return maxID;
}
+
+ // Note: This method can't be called from the executor, as it will invoke other methods depending on it.
public void compact() throws Exception
{
if (compactor != null)
@@ -811,25 +818,28 @@
try
{
- // First, we replace the records by a new one.
// We need to guarantee that the journal is frozen for this short time
// We don't freeze the journal as we compact, only for the short time where we replace records
compactingLock.writeLock().lock();
- currentFile.clearCounts();
try
{
+ // We need to move to the next file, as we need a clear start for negatives and positives counts
+ moveNextFile();
+
autoReclaim = false;
- recordsSnapshot = records;
- pendingTransactions = this.pendingTransactions;
+ // Take the snapshots and replace the structures
+ recordsSnapshot = JournalImpl.this.records;
+ pendingTransactions = JournalImpl.this.pendingTransactions;
+
+ JournalImpl.this.records = new ConcurrentHashMap<Long, JournalRecord>();
+
records = new ConcurrentHashMap<Long, JournalRecord>();
- for (JournalFile file : dataFiles)
- {
- file.clearCounts();
- dataFilesToProcess.add(file);
- }
+ dataFilesToProcess.addAll(dataFiles);
+
+ dataFiles.clear();
this.compactor = new Compactor(recordsSnapshot, pendingTransactions, dataFilesToProcess.get(0).getFileID());
@@ -839,60 +849,106 @@
compactingLock.writeLock().unlock();
}
- }
- finally
- {
- autoReclaim = previousReclaimValue;
- }
+ // Read the files, and use the Compactor class to create the new outputFiles, and the new collections as well
+ JournalFile previousFile = null;
+ for (final JournalFile file : dataFilesToProcess)
+ {
+ if (previousFile != null)
+ {
+ if (file.getFileID() < previousFile.getFileID())
+ {
+ // Sanity check, this should never happen
+ throw new IllegalStateException("DataFiles out of order!");
+ }
+ }
+ previousFile = file;
- for (final JournalFile file : dataFilesToProcess)
- {
- readJournalFile(file, compactor);
- }
+ log.info("Compacting file " + file.getFile().getFileName() + ", internalID = " + file.getFileID());
+ readJournalFile(file, compactor);
+ }
- compactor.flush();
+ compactor.flush();
- createRenameFile(dataFilesToProcess, dataFilesToProcess);
+ // pointcut for tests
+ // We need to test concurrent updates on the journal, as the compacting is being performed.
+ // Usually tests will use this to hold the compacting while other structures are being updated.
+ onCompactDone();
- compactingLock.writeLock().lock();
- try
- {
- // Restore relationshipMap
- // Deal with updates and deletes that happened during the compacting
+ SequentialFile controlFile = createControlFile(dataFilesToProcess, compactor.newDataFiles);
+ compactingLock.writeLock().lock();
+ try
+ {
+ for ( Map.Entry<Long, JournalRecord> newRecordEntry: compactor.newRecords.entrySet())
+ {
+ records.put(newRecordEntry.getKey(), newRecordEntry.getValue());
+ }
+
+ for (JournalFile data: compactor.newDataFiles)
+ {
+ dataFiles.addFirst(data);
+ }
+ //dataFiles.add
+
+ // Restore relationshipMap
+ // Deal with transactions commits that happend during the compacting
+ // Deal with updates and deletes that happened during the compacting
+
+ }
+ finally
+ {
+ compactingLock.writeLock().unlock();
+ }
+
+ renameFiles(dataFilesToProcess, compactor.newDataFiles);
+ deleteControlFile(controlFile);
+
}
finally
{
- compactingLock.writeLock().unlock();
+ autoReclaim = previousReclaimValue;
}
- renameFiles(dataFilesToProcess, compactor.newDataFiles);
}
- protected void renameFiles(List<JournalFile> files, List<JournalFile> newFiles) throws Exception
+ protected void deleteControlFile(SequentialFile controlFile) throws Exception
{
- for (JournalFile file : files)
+ controlFile.delete();
+ }
+
+ /** being protected as testcases can override this method */
+ protected void renameFiles(List<JournalFile> oldFiles, List<JournalFile> newFiles) throws Exception
+ {
+ for (JournalFile file : oldFiles)
{
- reinitializeFile(file);
+ System.out.println("Reinitializing file " + file);
+ dataFiles.remove(file);
+ freeFiles.add(reinitializeFile(file));
}
-
+
for (JournalFile file : newFiles)
{
String newName = file.getFile().getFileName();
- System.out.println("name = " + newName);
newName = newName.substring(0, newName.lastIndexOf(".cmp"));
-
+
+ System.out.println("Renaming file " + newName);
+
file.getFile().renameTo(newName);
}
-
+
}
+ /** This is an interception point for testcases, when the compacted files are written, before replacing the data structures */
+ protected void onCompactDone()
+ {
+ }
+
/**
* @throws Exception
*/
- protected SequentialFile createRenameFile(List<JournalFile> files, List<JournalFile> newFiles) throws Exception
+ protected SequentialFile createControlFile(List<JournalFile> files, List<JournalFile> newFiles) throws Exception
{
- SequentialFile tmpRenameFile = fileFactory.createSequentialFile("journal-rename" + ".ren", 1);
+ SequentialFile tmpRenameFile = fileFactory.createSequentialFile(FILE_COMPACT_CONTROL, 1);
tmpRenameFile.open();
@@ -1003,12 +1059,11 @@
sequentialFile = currentFile.getFile();
sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
sequentialFile.open(1);
- fileID = currentFile.getFileID();
- currentFile = new JournalFileImpl(sequentialFile, fileID, nextOrderingID);
+ fileID = nextOrderingID++;
+ currentFile = new JournalFileImpl(sequentialFile, fileID, fileID);
channelWrapper.writeInt(fileID);
- channelWrapper.writeInt(currentFile.getOrderingID());
-
+ channelWrapper.writeInt(fileID);
}
public void addRecord(RecordInfo info) throws Exception
@@ -1144,7 +1199,6 @@
{
if (recordsSnapshot.get(info.id) != null)
{
- System.out.println("UpdateRecord on compacting");
int size = SIZE_UPDATE_RECORD + info.data.length;
checkSize(size);
@@ -1288,7 +1342,7 @@
public void addRecord(RecordInfo info) throws Exception
{
- if (trace)
+ if (trace && LOAD_TRACE)
{
trace("AddRecord: " + info);
}
@@ -1301,7 +1355,7 @@
public void updateRecord(RecordInfo info) throws Exception
{
- if (trace)
+ if (trace && LOAD_TRACE)
{
trace("UpdateRecord: " + info);
}
@@ -1323,7 +1377,7 @@
public void deleteRecord(long recordID) throws Exception
{
- if (trace)
+ if (trace && LOAD_TRACE)
{
trace("DeleteRecord: " + recordID);
}
@@ -1347,7 +1401,7 @@
public void addRecordTX(long transactionID, RecordInfo info) throws Exception
{
- if (trace)
+ if (trace && LOAD_TRACE)
{
trace((info.isUpdate ? "updateRecordTX: " : "addRecordTX: ") + info + ", txid = " + transactionID);
}
@@ -1379,7 +1433,7 @@
public void deleteRecordTX(long transactionID, RecordInfo info) throws Exception
{
- if (trace)
+ if (trace && LOAD_TRACE)
{
trace("DeleteRecordTX: " + transactionID + " info = " + info);
}
@@ -1412,7 +1466,7 @@
public void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
{
- if (trace)
+ if (trace && LOAD_TRACE)
{
trace("prepareRecordTX: txid = " + transactionID);
}
@@ -1457,7 +1511,7 @@
public void commitRecord(long transactionID, int numberOfRecords) throws Exception
{
- if (trace)
+ if (trace && LOAD_TRACE)
{
trace("commitRecord: txid = " + transactionID);
}
@@ -1518,7 +1572,7 @@
public void rollbackRecord(long transactionID) throws Exception
{
- if (trace)
+ if (trace && LOAD_TRACE)
{
trace("rollbackRecord: txid = " + transactionID);
}
@@ -1548,7 +1602,7 @@
public void markAsDataFile(JournalFile file)
{
- if (trace)
+ if (trace && LOAD_TRACE)
{
trace("Marking " + file + " as data file");
}
@@ -1748,38 +1802,50 @@
public void checkAndReclaimFiles() throws Exception
{
- checkReclaimStatus();
+ // We can't start compacting while compacting is working
+ compactingLock.readLock().lock();
+ try
+ {
+ checkReclaimStatus();
- for (JournalFile file : dataFiles)
- {
- if (file.isCanReclaim())
+ for (JournalFile file : dataFiles)
{
- // File can be reclaimed or deleted
-
- if (trace)
+ if (file.isCanReclaim())
{
- trace("Reclaiming file " + file);
- }
+ // File can be reclaimed or deleted
- dataFiles.remove(file);
+ if (trace)
+ {
+ trace("Reclaiming file " + file);
+ }
- // FIXME - size() involves a scan!!!
- if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
- {
- // Re-initialise it
+ if (!dataFiles.remove(file))
+ {
+ log.warn("Could not remove file " + file);
+ }
- JournalFile jf = reinitializeFile(file);
+ // FIXME - size() involves a scan!!!
+ if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
+ {
+ // Re-initialise it
- freeFiles.add(jf);
- }
- else
- {
- file.getFile().open(1);
+ JournalFile jf = reinitializeFile(file);
- file.getFile().delete();
+ freeFiles.add(jf);
+ }
+ else
+ {
+ file.getFile().open(1);
+
+ file.getFile().delete();
+ }
}
}
}
+ finally
+ {
+ compactingLock.readLock().unlock();
+ }
}
public int getDataFilesCount()
@@ -2551,26 +2617,6 @@
// Now order them by ordering id - we can't use the file name for ordering
// since we can re-use dataFiles
- class JournalFileComparator implements Comparator<JournalFile>
- {
- public int compare(final JournalFile f1, final JournalFile f2)
- {
- int oid1 = f1.getOrderingID();
- int oid2 = f2.getOrderingID();
-
- if (oid1 == oid2)
- {
- int id1 = f1.getFileID();
- int id2 = f2.getFileID();
- return oid1 < id2 ? -1 : id1 == id2 ? 0 : 1;
- }
- else
- {
- return oid1 < oid2 ? -1 : oid1 == oid2 ? 0 : 1;
- }
- }
- }
-
Collections.sort(orderedFiles, new JournalFileComparator());
return orderedFiles;
@@ -2751,6 +2797,11 @@
currentFile = enqueueOpenFile();
+ if (trace)
+ {
+ trace("moveNextFile: " + currentFile.getFile().getFileName());
+ }
+
fileFactory.activate(currentFile.getFile());
}
@@ -2997,9 +3048,9 @@
if (updateFiles != null)
{
- for (JournalFile jf : updateFiles)
+ for (JournalFile updFile : updateFiles)
{
- file.incNegCount(jf);
+ file.incNegCount(updFile);
}
}
}
@@ -3367,6 +3418,26 @@
}
+ private static class JournalFileComparator implements Comparator<JournalFile>
+ {
+ public int compare(final JournalFile f1, final JournalFile f2)
+ {
+ int oid1 = f1.getOrderingID();
+ int oid2 = f2.getOrderingID();
+
+ if (oid1 == oid2)
+ {
+ int id1 = f1.getFileID();
+ int id2 = f2.getFileID();
+ return oid1 < id2 ? -1 : id1 == id2 ? 0 : 1;
+ }
+ else
+ {
+ return oid1 < oid2 ? -1 : oid1 == oid2 ? 0 : 1;
+ }
+ }
+ }
+
private class PerfBlast extends Thread
{
private final int pages;
Added: branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/BlockingDeque.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/BlockingDeque.java (rev 0)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/BlockingDeque.java 2009-06-26 22:56:53 UTC (rev 7483)
@@ -0,0 +1,238 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package org.jboss.messaging.utils.concurrent; // XXX This belongs in java.util!!! XXX
+import java.util.concurrent.*; // XXX This import goes away XXX
+import java.util.*;
+
+/**
+ * A {@link Deque} that additionally supports operations that wait for
+ * the deque to become non-empty when retrieving an element, and wait
+ * for space to become available in the deque when storing an
+ * element. These methods are summarized in the following table:<p>
+ *
+ * <table BORDER CELLPADDING=3 CELLSPACING=1>
+ * <tr>
+ * <td></td>
+ * <td ALIGN=CENTER COLSPAN = 2> <b>First Element (Head)</b></td>
+ * <td ALIGN=CENTER COLSPAN = 2> <b>Last Element (Tail)</b></td>
+ * </tr>
+ * <tr>
+ * <td></td>
+ * <td ALIGN=CENTER><em>Block</em></td>
+ * <td ALIGN=CENTER><em>Time out</em></td>
+ * <td ALIGN=CENTER><em>Block</em></td>
+ * <td ALIGN=CENTER><em>Time out</em></td>
+ * </tr>
+ * <tr>
+ * <td><b>Insert</b></td>
+ * <td>{@link #putFirst putFirst(e)}</td>
+ * <td>{@link #offerFirst(Object, long, TimeUnit) offerFirst(e, time, unit)}</td>
+ * <td>{@link #putLast putLast(e)}</td>
+ * <td>{@link #offerLast(Object, long, TimeUnit) offerLast(e, time, unit)}</td>
+ * </tr>
+ * <tr>
+ * <td><b>Remove</b></td>
+ * <td>{@link #takeFirst takeFirst()}</td>
+ * <td>{@link #pollFirst(long, TimeUnit) pollFirst(time, unit)}</td>
+ * <td>{@link #takeLast takeLast()}</td>
+ * <td>{@link #pollLast(long, TimeUnit) pollLast(time, unit)}</td>
+ * </tr>
+ * </table>
+ *
+ * <p>Like any {@link BlockingQueue}, a <tt>BlockingDeque</tt> is
+ * thread safe and may (or may not) be capacity-constrained. A
+ * <tt>BlockingDeque</tt> implementation may be used directly as a
+ * FIFO <tt>BlockingQueue</tt>. The blocking methods inherited from
+ * the <tt>BlockingQueue</tt> interface are precisely equivalent to
+ * <tt>BlockingDeque</tt> methods as indicated in the following table:<p>
+ *
+ * <table BORDER CELLPADDING=3 CELLSPACING=1>
+ * <tr>
+ * <td ALIGN=CENTER> <b><tt>BlockingQueue</tt> Method</b></td>
+ * <td ALIGN=CENTER> <b>Equivalent <tt>BlockingDeque</tt> Method</b></td>
+ * </tr>
+ * <tr>
+ * <tr>
+ * <td>{@link java.util.concurrent.BlockingQueue#put put(e)}</td>
+ * <td>{@link #putLast putLast(e)}</td>
+ * </tr>
+ * <tr>
+ * <td>{@link java.util.concurrent.BlockingQueue#take take()}</td>
+ * <td>{@link #takeFirst takeFirst()}</td>
+ * </tr>
+ * <tr>
+ * <td>{@link java.util.concurrent.BlockingQueue#offer(Object, long, TimeUnit) offer(e, time. unit)}</td>
+ * <td>{@link #offerLast(Object, long, TimeUnit) offerLast(e, time, unit)}</td>
+ * </tr>
+ * <tr>
+ * <td>{@link java.util.concurrent.BlockingQueue#poll(long, TimeUnit) poll(time, unit)}</td>
+ * <td>{@link #pollFirst(long, TimeUnit) pollFirst(time, unit)}</td>
+ * </tr>
+ * </table>
+ *
+ *
+ * <p>This interface is a member of the
+ * <a href="{@docRoot}/../guide/collections/index.html">
+ * Java Collections Framework</a>.
+ *
+ * @since 1.6
+ * @author Doug Lea
+ * @param <E> the type of elements held in this collection
+ */
+public interface BlockingDeque<E> extends Deque<E>, BlockingQueue<E> {
+
+ /**
+ * Adds the specified element as the first element of this deque,
+ * waiting if necessary for space to become available.
+ * @param o the element to add
+ * @throws InterruptedException if interrupted while waiting.
+ * @throws NullPointerException if the specified element is <tt>null</tt>.
+ */
+ void putFirst(E o) throws InterruptedException;
+
+ /**
+ * Adds the specified element as the last element of this deque,
+ * waiting if necessary for space to become available.
+ * @param o the element to add
+ * @throws InterruptedException if interrupted while waiting.
+ * @throws NullPointerException if the specified element is <tt>null</tt>.
+ */
+ void putLast(E o) throws InterruptedException;
+
+ /**
+ * Retrieves and removes the first element of this deque, waiting
+ * if no elements are present on this deque.
+ * @return the head of this deque
+ * @throws InterruptedException if interrupted while waiting.
+ */
+ E takeFirst() throws InterruptedException;
+
+ /**
+ * Retrieves and removes the last element of this deque, waiting
+ * if no elements are present on this deque.
+ * @return the head of this deque
+ * @throws InterruptedException if interrupted while waiting.
+ */
+ E takeLast() throws InterruptedException;
+
+ /**
+ * Inserts the specified element as the first element of this deque,
+ * waiting if necessary up to the specified wait time for space to
+ * become available.
+ * @param o the element to add
+ * @param timeout how long to wait before giving up, in units of
+ * <tt>unit</tt>
+ * @param unit a <tt>TimeUnit</tt> determining how to interpret the
+ * <tt>timeout</tt> parameter
+ * @return <tt>true</tt> if successful, or <tt>false</tt> if
+ * the specified waiting time elapses before space is available.
+ * @throws InterruptedException if interrupted while waiting.
+ * @throws NullPointerException if the specified element is <tt>null</tt>.
+ */
+ boolean offerFirst(E o, long timeout, TimeUnit unit)
+ throws InterruptedException;
+
+ /**
+ * Inserts the specified element as the last element of this deque,
+ * waiting if necessary up to the specified wait time for space to
+ * become available.
+ * @param o the element to add
+ * @param timeout how long to wait before giving up, in units of
+ * <tt>unit</tt>
+ * @param unit a <tt>TimeUnit</tt> determining how to interpret the
+ * <tt>timeout</tt> parameter
+ * @return <tt>true</tt> if successful, or <tt>false</tt> if
+ * the specified waiting time elapses before space is available.
+ * @throws InterruptedException if interrupted while waiting.
+ * @throws NullPointerException if the specified element is <tt>null</tt>.
+ */
+ boolean offerLast(E o, long timeout, TimeUnit unit)
+ throws InterruptedException;
+
+
+ /**
+ * Retrieves and removes the first element of this deque, waiting
+ * if necessary up to the specified wait time if no elements are
+ * present on this deque.
+ * @param timeout how long to wait before giving up, in units of
+ * <tt>unit</tt>
+ * @param unit a <tt>TimeUnit</tt> determining how to interpret the
+ * <tt>timeout</tt> parameter
+ * @return the head of this deque, or <tt>null</tt> if the
+ * specified waiting time elapses before an element is present.
+ * @throws InterruptedException if interrupted while waiting.
+ */
+ E pollFirst(long timeout, TimeUnit unit)
+ throws InterruptedException;
+
+ /**
+ * Retrieves and removes the last element of this deque, waiting
+ * if necessary up to the specified wait time if no elements are
+ * present on this deque.
+ * @param timeout how long to wait before giving up, in units of
+ * <tt>unit</tt>
+ * @param unit a <tt>TimeUnit</tt> determining how to interpret the
+ * <tt>timeout</tt> parameter
+ * @return the head of this deque, or <tt>null</tt> if the
+ * specified waiting time elapses before an element is present.
+ * @throws InterruptedException if interrupted while waiting.
+ */
+ E pollLast(long timeout, TimeUnit unit)
+ throws InterruptedException;
+
+ /**
+ * Adds the specified element as the last element of this deque,
+ * waiting if necessary for space to become available. This
+ * method is equivalent to to putLast
+ * @param o the element to add
+ * @throws InterruptedException if interrupted while waiting.
+ * @throws NullPointerException if the specified element is <tt>null</tt>.
+ */
+ void put(E o) throws InterruptedException;
+
+ /**
+ * Inserts the specified element as the lest element of this
+ * deque, if possible. When using deques that may impose
+ * insertion restrictions (for example capacity bounds), method
+ * <tt>offer</tt> is generally preferable to method {@link
+ * Collection#add}, which can fail to insert an element only by
+ * throwing an exception. This method is equivalent to to
+ * offerLast
+ *
+ * @param o the element to add.
+ * @return <tt>true</tt> if it was possible to add the element to
+ * this deque, else <tt>false</tt>
+ * @throws NullPointerException if the specified element is <tt>null</tt>
+ */
+ boolean offer(E o, long timeout, TimeUnit unit)
+ throws InterruptedException;
+
+ /**
+ * Retrieves and removes the first element of this deque, waiting
+ * if no elements are present on this deque.
+ * This method is equivalent to to takeFirst
+ * @return the head of this deque
+ * @throws InterruptedException if interrupted while waiting.
+ */
+ E take() throws InterruptedException;
+
+ /**
+ * Retrieves and removes the first element of this deque, waiting
+ * if necessary up to the specified wait time if no elements are
+ * present on this deque. This method is equivalent to to
+ * pollFirst
+ * @param timeout how long to wait before giving up, in units of
+ * <tt>unit</tt>
+ * @param unit a <tt>TimeUnit</tt> determining how to interpret the
+ * <tt>timeout</tt> parameter
+ * @return the head of this deque, or <tt>null</tt> if the
+ * specified waiting time elapses before an element is present.
+ * @throws InterruptedException if interrupted while waiting.
+ */
+ E poll(long timeout, TimeUnit unit)
+ throws InterruptedException;
+}
Added: branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/Deque.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/Deque.java (rev 0)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/Deque.java 2009-06-26 22:56:53 UTC (rev 7483)
@@ -0,0 +1,442 @@
+/*
+ * Written by Doug Lea and Josh Bloch with assistance from members of
+ * JCP JSR-166 Expert Group and released to the public domain, as explained
+ * at http://creativecommons.org/licenses/publicdomain
+ */
+
+package org.jboss.messaging.utils.concurrent; // XXX This belongs in java.util!!! XXX
+import java.util.*; // XXX This import goes away XXX
+
+/**
+ * A linear collection that supports element insertion and removal at
+ * both ends. The name <i>deque</i> is short for "double ended queue"
+ * and is usually pronounced "deck". Most <tt>Deque</tt>
+ * implementations place no fixed limits on the number of elements
+ * they may contain, but this interface supports capacity-restricted
+ * deques as well as those with no fixed size limit.
+ *
+ * <p>This interface defines methods to access the elements at both
+ * ends of the deque. Methods are provided to insert, remove, and
+ * examine the element. Each of these methods exists in two forms:
+ * one throws an exception if the operation fails, the other returns a
+ * special value (either <tt>null</tt> or <tt>false</tt>, depending on
+ * the operation). The latter form of the insert operation is
+ * designed specifically for use with capacity-restricted
+ * <tt>Deque</tt> implementations; in most implementations, insert
+ * operations cannot fail.
+ *
+ * <p>The twelve methods described above are are summarized in the
+ * follwoing table:<p>
+ *
+ * <table BORDER CELLPADDING=3 CELLSPACING=1>
+ * <tr>
+ * <td></td>
+ * <td ALIGN=CENTER COLSPAN = 2> <b>First Element (Head)</b></td>
+ * <td ALIGN=CENTER COLSPAN = 2> <b>Last Element (Tail)</b></td>
+ * </tr>
+ * <tr>
+ * <td></td>
+ * <td ALIGN=CENTER><em>Throws exception</em></td>
+ * <td ALIGN=CENTER><em>Returns special value</em></td>
+ * <td ALIGN=CENTER><em>Throws exception</em></td>
+ * <td ALIGN=CENTER><em>Returns special value</em></td>
+ * </tr>
+ * <tr>
+ * <td><b>Insert</b></td>
+ * <td>{@link #addFirst addFirst(e)}</td>
+ * <td>{@link #offerFirst offerFirst(e)}</td>
+ * <td>{@link #addLast addLast(e)}</td>
+ * <td>{@link #offerLast offerLast(e)}</td>
+ * </tr>
+ * <tr>
+ * <td><b>Remove</b></td>
+ * <td>{@link #removeFirst removeFirst()}</td>
+ * <td>{@link #pollFirst pollFirst()}</td>
+ * <td>{@link #removeLast removeLast()}</td>
+ * <td>{@link #pollLast pollLast()}</td>
+ * </tr>
+ * <tr>
+ * <td><b>Examine</b></td>
+ * <td>{@link #getFirst getFirst()}</td>
+ * <td>{@link #peekFirst peekFirst()}</td>
+ * <td>{@link #getLast getLast()}</td>
+ * <td>{@link #peekLast peekLast()}</td>
+ * </tr>
+ * </table>
+ *
+ * <p>This interface extends the {@link Queue} interface. When a deque is
+ * used as a queue, FIFO (First-In-First-Out) behavior results. Elements are
+ * added to the end of the deque and removed from the beginning. The methods
+ * inherited from the <tt>Queue</tt> interface are precisely equivalent to
+ * <tt>Deque</tt> methods as indicated in the following table:<p>
+ *
+ * <table BORDER CELLPADDING=3 CELLSPACING=1>
+ * <tr>
+ * <td ALIGN=CENTER> <b><tt>Queue</tt> Method</b></td>
+ * <td ALIGN=CENTER> <b>Equivalent <tt>Deque</tt> Method</b></td>
+ * </tr>
+ * <tr>
+ * <tr>
+ * <td>{@link java.util.Queue#offer offer(e)}</td>
+ * <td>{@link #offerLast offerLast(e)}</td>
+ * </tr>
+ * <tr>
+ * <td>{@link java.util.Queue#add add(e)}</td>
+ * <td>{@link #addLast addLast(e)}</td>
+ * </tr>
+ * <tr>
+ * <td>{@link java.util.Queue#poll poll()}</td>
+ * <td>{@link #pollFirst pollFirst()}</td>
+ * </tr>
+ * <tr>
+ * <td>{@link java.util.Queue#remove remove()}</td>
+ * <td>{@link #removeFirst removeFirst()}</td>
+ * </tr>
+ * <tr>
+ * <td>{@link java.util.Queue#peek peek()}</td>
+ * <td>{@link #peek peekFirst()}</td>
+ * </tr>
+ * <tr>
+ * <td>{@link java.util.Queue#element element()}</td>
+ * <td>{@link #getFirst getFirst()}</td>
+ * </tr>
+ * </table>
+ *
+ * <p>Deques can also be used as LIFO (Last-In-First-Out) stacks. This
+ * interface should be used in preference to the legacy {@link Stack} class.
+ * When a dequeue is used as a stack, elements are pushed and popped from the
+ * beginning of the deque. Stack methods are precisely equivalent to
+ * <tt>Deque</tt> methods as indicated in the table below:<p>
+ *
+ * <table BORDER CELLPADDING=3 CELLSPACING=1>
+ * <tr>
+ * <td ALIGN=CENTER> <b>Stack Method</b></td>
+ * <td ALIGN=CENTER> <b>Equivalent <tt>Deque</tt> Method</b></td>
+ * </tr>
+ * <tr>
+ * <tr>
+ * <td>{@link #push push(e)}</td>
+ * <td>{@link #addFirst addFirst(e)}</td>
+ * </tr>
+ * <tr>
+ * <td>{@link #pop pop()}</td>
+ * <td>{@link #removeFirst removeFirst()}</td>
+ * </tr>
+ * <tr>
+ * <td>{@link #peek peek()}</td>
+ * <td>{@link #peekFirst peekFirst()}</td>
+ * </tr>
+ * </table>
+ *
+ * <p>Note that the {@link #peek peek} method works equally well when
+ * a deque is used as a queue or a stack; in either case, elements are
+ * drawn from the beginning of the deque.
+ *
+ * <p>This inteface provides two methods to to remove interior
+ * elements, {@link #removeFirstOccurrence removeFirstOccurrence} and
+ * {@link #removeLastOccurrence removeLastOccurrence}. Unlike the
+ * {@link List} interface, this interface does not provide support for
+ * indexed access to elements.
+ *
+ * <p>While <tt>Deque</tt> implementations are not strictly required
+ * to prohibit the insertion of null elements, they are strongly
+ * encouraged to do so. Users of any <tt>Deque</tt> implementations
+ * that do allow null elements are strongly encouraged <i>not</i> to
+ * take advantage of the ability to insert nulls. This is so because
+ * <tt>null</tt> is used as a special return value by various methods
+ * to indicated that the deque is empty.
+ *
+ * <p><tt>Deque</tt> implementations generally do not define
+ * element-based versions of the <tt>equals</tt> and <tt>hashCode</tt>
+ * methods, but instead inherit the identity-based versions from class
+ * <tt>Object</tt>.
+ *
+ * <p>This interface is a member of the <a
+ * href="{@docRoot}/../guide/collections/index.html"> Java Collections
+ * Framework</a>.
+ *
+ * @author Doug Lea
+ * @author Josh Bloch
+ * @since 1.6
+ * @param <E> the type of elements held in this collection
+ */
+public interface Deque<E> extends Queue<E> {
+ /**
+ * Inserts the specified element to the front this deque unless it would
+ * violate capacity restrictions. When using a capacity-restricted deque,
+ * this method is generally preferable to method <tt>addFirst</tt>, which
+ * can fail to insert an element only by throwing an exception.
+ *
+ * @param e the element to insert
+ * @return <tt>true</tt> if it was possible to insert the element,
+ * else <tt>false</tt>
+ * @throws NullPointerException if <tt>e</tt> is null and this
+ * deque does not permit null elements
+ */
+ boolean offerFirst(E e);
+
+ /**
+ * Inserts the specified element to the end of this deque unless it would
+ * violate capacity restrictions. When using a capacity-restricted deque,
+ * this method is generally preferable to method <tt>addLast</tt> which
+ * can fail to insert an element only by throwing an exception.
+ *
+ * @param e the element to insert
+ * @return <tt>true</tt> if it was possible to insert the element,
+ * else <tt>false</tt>
+ * @throws NullPointerException if <tt>e</tt> is null and this
+ * deque does not permit null elements
+ */
+ boolean offerLast(E e);
+
+ /**
+ * Inserts the specified element to the front of this deque unless it
+ * would violate capacity restrictions.
+ *
+ * @param e the element to insert
+ * @throws IllegalStateException if it was not possible to insert
+ * the element due to capacity restrictions
+ * @throws NullPointerException if <tt>e</tt> is null and this
+ * deque does not permit null elements
+ */
+ void addFirst(E e);
+
+ /**
+ * Inserts the specified element to the end of this deque unless it would
+ * violate capacity restrictions.
+ *
+ * @param e the element to insert
+ * @throws IllegalStateException if it was not possible to insert
+ * the element due to capacity restrictions
+ * @throws NullPointerException if <tt>e</tt> is null and this
+ * deque does not permit null elements
+ */
+ void addLast(E e);
+
+ /**
+ * Retrieves and removes the first element of this deque, or
+ * <tt>null</tt> if this deque is empty.
+ *
+ * @return the first element of this deque, or <tt>null</tt> if
+ * this deque is empty
+ */
+ E pollFirst();
+
+ /**
+ * Retrieves and removes the last element of this deque, or
+ * <tt>null</tt> if this deque is empty.
+ *
+ * @return the last element of this deque, or <tt>null</tt> if
+ * this deque is empty
+ */
+ E pollLast();
+
+ /**
+ * Removes and returns the first element of this deque. This method
+ * differs from the <tt>pollFirst</tt> method only in that it throws an
+ * exception if this deque is empty.
+ *
+ * @return the first element of this deque
+ * @throws NoSuchElementException if this deque is empty
+ */
+ E removeFirst();
+
+ /**
+ * Retrieves and removes the last element of this deque. This method
+ * differs from the <tt>pollLast</tt> method only in that it throws an
+ * exception if this deque is empty.
+ *
+ * @return the last element of this deque
+ * @throws NoSuchElementException if this deque is empty
+ */
+ E removeLast();
+
+ /**
+ * Retrieves, but does not remove, the first element of this deque,
+ * returning <tt>null</tt> if this deque is empty.
+ *
+ * @return the first element of this deque, or <tt>null</tt> if
+ * this deque is empty
+ */
+ E peekFirst();
+
+ /**
+ * Retrieves, but does not remove, the last element of this deque,
+ * returning <tt>null</tt> if this deque is empty.
+ *
+ * @return the last element of this deque, or <tt>null</tt> if this deque
+ * is empty
+ */
+ E peekLast();
+
+ /**
+ * Retrieves, but does not remove, the first element of this
+ * deque. This method differs from the <tt>peek</tt> method only
+ * in that it throws an exception if this deque is empty.
+ *
+ * @return the first element of this deque
+ * @throws NoSuchElementException if this deque is empty
+ */
+ E getFirst();
+
+ /**
+ * Retrieves, but does not remove, the last element of this
+ * deque. This method differs from the <tt>peek</tt> method only
+ * in that it throws an exception if this deque is empty.
+ *
+ * @return the last element of this deque
+ * @throws NoSuchElementException if this deque is empty
+ */
+ E getLast();
+
+ /**
+ * Removes the first occurrence of the specified element in this
+ * deque. If the deque does not contain the element, it is
+ * unchanged. More formally, removes the first element <tt>e</tt>
+ * such that <tt>(o==null ? e==null : o.equals(e))</tt> (if
+ * such an element exists).
+ *
+ * @param e element to be removed from this deque, if present
+ * @return <tt>true</tt> if the deque contained the specified element
+ * @throws NullPointerException if the specified element is <tt>null</tt>
+ */
+ boolean removeFirstOccurrence(Object e);
+
+ /**
+ * Removes the last occurrence of the specified element in this
+ * deque. If the deque does not contain the element, it is
+ * unchanged. More formally, removes the last element <tt>e</tt>
+ * such that <tt>(o==null ? e==null : o.equals(e))</tt> (if
+ * such an element exists).
+ *
+ * @param e element to be removed from this deque, if present
+ * @return <tt>true</tt> if the deque contained the specified element
+ * @throws NullPointerException if the specified element is <tt>null</tt>
+ */
+ boolean removeLastOccurrence(Object e);
+
+
+ // *** Queue methods ***
+
+ /**
+ * Inserts the specified element into the queue represented by this deque
+ * unless it would violate capacity restrictions. In other words, inserts
+ * the specified element to the end of this deque. When using a
+ * capacity-restricted deque, this method is generally preferable to the
+ * {@link #add} method, which can fail to insert an element only by
+ * throwing an exception.
+ *
+ * <p>This method is equivalent to {@link #offerLast}.
+ *
+ * @param e the element to insert
+ * @return <tt>true</tt> if it was possible to insert the element,
+ * else <tt>false</tt>
+ * @throws NullPointerException if <tt>e</tt> is null and this
+ * deque does not permit null elements
+ */
+ boolean offer(E e);
+
+ /**
+ * Inserts the specified element into the queue represented by this
+ * deque unless it would violate capacity restrictions. In other words,
+ * inserts the specified element as the last element of this deque.
+ *
+ * <p>This method is equivalent to {@link #addLast}.
+ *
+ * @param e the element to insert
+ * @return <tt>true</tt> (as per the spec for {@link Collection#add})
+ * @throws IllegalStateException if it was not possible to insert
+ * the element due to capacity restrictions
+ * @throws NullPointerException if <tt>e</tt> is null and this
+ * deque does not permit null elements
+ */
+ boolean add(E e);
+
+ /**
+ * Retrieves and removes the head of the queue represented by
+ * this deque, or <tt>null</tt> if this deque is empty. In other words,
+ * retrieves and removes the first element of this deque, or <tt>null</tt>
+ * if this deque is empty.
+ *
+ * <p>This method is equivalent to {@link #pollFirst()}.
+ *
+ * @return the first element of this deque, or <tt>null</tt> if
+ * this deque is empty
+ */
+ E poll();
+
+ /**
+ * Retrieves and removes the head of the queue represented by this deque.
+ * This method differs from the <tt>poll</tt> method only in that it
+ * throws an exception if this deque is empty.
+ *
+ * <p>This method is equivalent to {@link #removeFirst()}.
+ *
+ * @return the head of the queue represented by this deque
+ * @throws NoSuchElementException if this deque is empty
+ */
+ E remove();
+
+ /**
+ * Retrieves, but does not remove, the head of the queue represented by
+ * this deque, returning <tt>null</tt> if this deque is empty.
+ *
+ * <p>This method is equivalent to {@link #peekFirst()}
+ *
+ * @return the head of the queue represented by this deque, or
+ * <tt>null</tt> if this deque is empty
+ */
+ E peek();
+
+ /**
+ * Retrieves, but does not remove, the head of the queue represented by
+ * this deque. This method differs from the <tt>peek</tt> method only in
+ * that it throws an exception if this deque is empty.
+ *
+ * <p>This method is equivalent to {@link #getFirst()}
+ *
+ * @return the head of the queue represented by this deque
+ * @throws NoSuchElementException if this deque is empty
+ */
+ E element();
+
+
+ // *** Stack methods ***
+
+ /**
+ * Pushes an element onto the stack represented by this deque. In other
+ * words, inserts the element to the front this deque unless it would
+ * violate capacity restrictions.
+ *
+ * <p>This method is equivalent to {@link #addFirst}.
+ *
+ * @throws IllegalStateException if it was not possible to insert
+ * the element due to capacity restrictions
+ * @throws NullPointerException if <tt>e</tt> is null and this
+ * deque does not permit null elements
+ */
+ void push(E e);
+
+ /**
+ * Pops an element from the stack represented by this deque. In other
+ * words, removes and returns the the first element of this deque.
+ *
+ * <p>This method is equivalent to {@link #removeFirst()}.
+ *
+ * @return the element at the front of this deque (which is the top
+ * of the stack represented by this deque)
+ * @throws NoSuchElementException if this deque is empty
+ */
+ E pop();
+
+
+ // *** Collection Method ***
+
+ /**
+ * Returns an iterator over the elements in this deque. The elements
+ * will be ordered from first (head) to last (tail).
+ *
+ * @return an <tt>Iterator</tt> over the elements in this deque
+ */
+ Iterator<E> iterator();
+}
Added: branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/LinkedBlockingDeque.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/LinkedBlockingDeque.java (rev 0)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/utils/concurrent/LinkedBlockingDeque.java 2009-06-26 22:56:53 UTC (rev 7483)
@@ -0,0 +1,762 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+
+package org.jboss.messaging.utils.concurrent;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * An optionally-bounded {@linkplain BlockingDeque blocking deque} based on
+ * linked nodes.
+ *
+ * <p> The optional capacity bound constructor argument serves as a
+ * way to prevent excessive expansion. The capacity, if unspecified,
+ * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
+ * dynamically created upon each insertion unless this would bring the
+ * deque above capacity.
+ *
+ * <p>Most operations run in constant time (ignoring time spent
+ * blocking). Exceptions include {@link #remove(Object) remove},
+ * {@link #removeFirstOccurrence removeFirstOccurrence}, {@link
+ * #removeLastOccurrence removeLastOccurrence}, {@link #contains
+ * contains }, {@link #iterator iterator.remove()}, and the bulk
+ * operations, all of which run in linear time.
+ *
+ * <p>This class and its iterator implement all of the
+ * <em>optional</em> methods of the {@link Collection} and {@link
+ * Iterator} interfaces. This class is a member of the <a
+ * href="{@docRoot}/../guide/collections/index.html"> Java Collections
+ * Framework</a>.
+ *
+ * @since 1.6
+ * @author Doug Lea
+ * @param <E> the type of elements held in this collection
+ */
+public class LinkedBlockingDeque<E>
+ extends AbstractQueue<E>
+ implements BlockingDeque<E>, java.io.Serializable {
+
+ /*
+ * Implemented as a simple doubly-linked list protected by a
+ * single lock and using conditions to manage blocking.
+ */
+
+ private static final long serialVersionUID = -387911632671998426L;
+
+ /** Doubly-linked list node class */
+ static final class Node<E> {
+ E item;
+ Node<E> prev;
+ Node<E> next;
+ Node(E x, Node<E> p, Node<E> n) {
+ item = x;
+ prev = p;
+ next = n;
+ }
+ }
+
+ /** Pointer to first node */
+ private transient Node<E> first;
+ /** Pointer to last node */
+ private transient Node<E> last;
+ /** Number of items in the deque */
+ private transient int count;
+ /** Maximum number of items in the deque */
+ private final int capacity;
+ /** Main lock guarding all access */
+ private final ReentrantLock lock = new ReentrantLock();
+ /** Condition for waiting takes */
+ private final Condition notEmpty = lock.newCondition();
+ /** Condition for waiting puts */
+ private final Condition notFull = lock.newCondition();
+
+ /**
+ * Creates a <tt>LinkedBlockingDeque</tt> with a capacity of
+ * {@link Integer#MAX_VALUE}.
+ */
+ public LinkedBlockingDeque() {
+ this(Integer.MAX_VALUE);
+ }
+
+ /**
+ * Creates a <tt>LinkedBlockingDeque</tt> with the given (fixed)
+ * capacity.
+ * @param capacity the capacity of this deque
+ * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
+ */
+ public LinkedBlockingDeque(int capacity) {
+ if (capacity <= 0) throw new IllegalArgumentException();
+ this.capacity = capacity;
+ }
+
+ /**
+ * Creates a <tt>LinkedBlockingDeque</tt> with a capacity of
+ * {@link Integer#MAX_VALUE}, initially containing the elements of the
+ * given collection,
+ * added in traversal order of the collection's iterator.
+ * @param c the collection of elements to initially contain
+ * @throws NullPointerException if <tt>c</tt> or any element within it
+ * is <tt>null</tt>
+ */
+ public LinkedBlockingDeque(Collection<? extends E> c) {
+ this(Integer.MAX_VALUE);
+ for (E e : c)
+ add(e);
+ }
+
+
+ // Basic linking and unlinking operations, called only while holding lock
+
+ /**
+ * Link e as first element, or return false if full
+ */
+ private boolean linkFirst(E e) {
+ if (count >= capacity)
+ return false;
+ ++count;
+ Node<E> f = first;
+ Node<E> x = new Node<E>(e, null, f);
+ first = x;
+ if (last == null)
+ last = x;
+ else
+ f.prev = x;
+ notEmpty.signal();
+ return true;
+ }
+
+ /**
+ * Link e as last element, or return false if full
+ */
+ private boolean linkLast(E e) {
+ if (count >= capacity)
+ return false;
+ ++count;
+ Node<E> l = last;
+ Node<E> x = new Node<E>(e, l, null);
+ last = x;
+ if (first == null)
+ first = x;
+ else
+ l.next = x;
+ notEmpty.signal();
+ return true;
+ }
+
+ /**
+ * Remove and return first element, or null if empty
+ */
+ private E unlinkFirst() {
+ Node<E> f = first;
+ if (f == null)
+ return null;
+ Node<E> n = f.next;
+ first = n;
+ if (n == null)
+ last = null;
+ else
+ n.prev = null;
+ --count;
+ notFull.signal();
+ return f.item;
+ }
+
+ /**
+ * Remove and return last element, or null if empty
+ */
+ private E unlinkLast() {
+ Node<E> l = last;
+ if (l == null)
+ return null;
+ Node<E> p = l.prev;
+ last = p;
+ if (p == null)
+ first = null;
+ else
+ p.next = null;
+ --count;
+ notFull.signal();
+ return l.item;
+ }
+
+ /**
+ * Unlink e
+ */
+ private void unlink(Node<E> x) {
+ Node<E> p = x.prev;
+ Node<E> n = x.next;
+ if (p == null) {
+ if (n == null)
+ first = last = null;
+ else {
+ n.prev = null;
+ first = n;
+ }
+ } else if (n == null) {
+ p.next = null;
+ last = p;
+ } else {
+ p.next = n;
+ n.prev = p;
+ }
+ --count;
+ notFull.signalAll();
+ }
+
+ // Deque methods
+
+ public boolean offerFirst(E o) {
+ if (o == null) throw new NullPointerException();
+ lock.lock();
+ try {
+ return linkFirst(o);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean offerLast(E o) {
+ if (o == null) throw new NullPointerException();
+ lock.lock();
+ try {
+ return linkLast(o);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void addFirst(E e) {
+ if (!offerFirst(e))
+ throw new IllegalStateException("Deque full");
+ }
+
+ public void addLast(E e) {
+ if (!offerLast(e))
+ throw new IllegalStateException("Deque full");
+ }
+
+ public E pollFirst() {
+ lock.lock();
+ try {
+ return unlinkFirst();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public E pollLast() {
+ lock.lock();
+ try {
+ return unlinkLast();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public E removeFirst() {
+ E x = pollFirst();
+ if (x == null) throw new NoSuchElementException();
+ return x;
+ }
+
+ public E removeLast() {
+ E x = pollLast();
+ if (x == null) throw new NoSuchElementException();
+ return x;
+ }
+
+ public E peekFirst() {
+ lock.lock();
+ try {
+ return (first == null) ? null : first.item;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public E peekLast() {
+ lock.lock();
+ try {
+ return (last == null) ? null : last.item;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public E getFirst() {
+ E x = peekFirst();
+ if (x == null) throw new NoSuchElementException();
+ return x;
+ }
+
+ public E getLast() {
+ E x = peekLast();
+ if (x == null) throw new NoSuchElementException();
+ return x;
+ }
+
+ // BlockingDeque methods
+
+ public void putFirst(E o) throws InterruptedException {
+ if (o == null) throw new NullPointerException();
+ lock.lock();
+ try {
+ while (!linkFirst(o))
+ notFull.await();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void putLast(E o) throws InterruptedException {
+ if (o == null) throw new NullPointerException();
+ lock.lock();
+ try {
+ while (!linkLast(o))
+ notFull.await();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public E takeFirst() throws InterruptedException {
+ lock.lock();
+ try {
+ E x;
+ while ( (x = unlinkFirst()) == null)
+ notEmpty.await();
+ return x;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public E takeLast() throws InterruptedException {
+ lock.lock();
+ try {
+ E x;
+ while ( (x = unlinkLast()) == null)
+ notEmpty.await();
+ return x;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean offerFirst(E o, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ if (o == null) throw new NullPointerException();
+ lock.lockInterruptibly();
+ try {
+ long nanos = unit.toNanos(timeout);
+ for (;;) {
+ if (linkFirst(o))
+ return true;
+ if (nanos <= 0)
+ return false;
+ nanos = notFull.awaitNanos(nanos);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean offerLast(E o, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ if (o == null) throw new NullPointerException();
+ lock.lockInterruptibly();
+ try {
+ long nanos = unit.toNanos(timeout);
+ for (;;) {
+ if (linkLast(o))
+ return true;
+ if (nanos <= 0)
+ return false;
+ nanos = notFull.awaitNanos(nanos);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public E pollFirst(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ lock.lockInterruptibly();
+ try {
+ long nanos = unit.toNanos(timeout);
+ for (;;) {
+ E x = unlinkFirst();
+ if (x != null)
+ return x;
+ if (nanos <= 0)
+ return null;
+ nanos = notEmpty.awaitNanos(nanos);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public E pollLast(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ lock.lockInterruptibly();
+ try {
+ long nanos = unit.toNanos(timeout);
+ for (;;) {
+ E x = unlinkLast();
+ if (x != null)
+ return x;
+ if (nanos <= 0)
+ return null;
+ nanos = notEmpty.awaitNanos(nanos);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ // Queue and stack methods
+
+ public boolean offer(E e) { return offerLast(e); }
+ public boolean add(E e) { addLast(e); return true; }
+ public void push(E e) { addFirst(e); }
+ public E poll() { return pollFirst(); }
+ public E remove() { return removeFirst(); }
+ public E pop() { return removeFirst(); }
+ public E peek() { return peekFirst(); }
+ public E element() { return getFirst(); }
+ public boolean remove(Object o) { return removeFirstOccurrence(o); }
+
+ // BlockingQueue methods
+
+ public void put(E o) throws InterruptedException { putLast(o); }
+ public E take() throws InterruptedException { return takeFirst(); }
+ public boolean offer(E o, long timeout, TimeUnit unit)
+ throws InterruptedException { return offerLast(o, timeout, unit); }
+ public E poll(long timeout, TimeUnit unit)
+ throws InterruptedException { return pollFirst(timeout, unit); }
+
+ /**
+ * Returns the number of elements in this deque.
+ *
+ * @return the number of elements in this deque.
+ */
+ public int size() {
+ lock.lock();
+ try {
+ return count;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns the number of elements that this deque can ideally (in
+ * the absence of memory or resource constraints) accept without
+ * blocking. This is always equal to the initial capacity of this deque
+ * less the current <tt>size</tt> of this deque.
+ * <p>Note that you <em>cannot</em> always tell if
+ * an attempt to <tt>add</tt> an element will succeed by
+ * inspecting <tt>remainingCapacity</tt> because it may be the
+ * case that a waiting consumer is ready to <tt>take</tt> an
+ * element out of an otherwise full deque.
+ */
+ public int remainingCapacity() {
+ lock.lock();
+ try {
+ return capacity - count;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean contains(Object o) {
+ if (o == null) return false;
+ lock.lock();
+ try {
+ for (Node<E> p = first; p != null; p = p.next)
+ if (o.equals(p.item))
+ return true;
+ return false;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean removeFirstOccurrence(Object e) {
+ if (e == null) throw new NullPointerException();
+ lock.lock();
+ try {
+ for (Node<E> p = first; p != null; p = p.next) {
+ if (e.equals(p.item)) {
+ unlink(p);
+ return true;
+ }
+ }
+ return false;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean removeLastOccurrence(Object e) {
+ if (e == null) throw new NullPointerException();
+ lock.lock();
+ try {
+ for (Node<E> p = last; p != null; p = p.prev) {
+ if (e.equals(p.item)) {
+ unlink(p);
+ return true;
+ }
+ }
+ return false;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Variant of removeFirstOccurrence needed by iterator.remove.
+ * Searches for the node, not its contents.
+ */
+ boolean removeNode(Node<E> e) {
+ lock.lock();
+ try {
+ for (Node<E> p = first; p != null; p = p.next) {
+ if (p == e) {
+ unlink(p);
+ return true;
+ }
+ }
+ return false;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public Object[] toArray() {
+ lock.lock();
+ try {
+ Object[] a = new Object[count];
+ int k = 0;
+ for (Node<E> p = first; p != null; p = p.next)
+ a[k++] = p.item;
+ return a;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public <T> T[] toArray(T[] a) {
+ lock.lock();
+ try {
+ if (a.length < count)
+ a = (T[])java.lang.reflect.Array.newInstance(
+ a.getClass().getComponentType(),
+ count
+ );
+
+ int k = 0;
+ for (Node<E> p = first; p != null; p = p.next)
+ a[k++] = (T)p.item;
+ if (a.length > k)
+ a[k] = null;
+ return a;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public String toString() {
+ lock.lock();
+ try {
+ return super.toString();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Atomically removes all of the elements from this deque.
+ * The deque will be empty after this call returns.
+ */
+ public void clear() {
+ lock.lock();
+ try {
+ first = last = null;
+ count = 0;
+ notFull.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public int drainTo(Collection<? super E> c) {
+ if (c == null)
+ throw new NullPointerException();
+ if (c == this)
+ throw new IllegalArgumentException();
+ lock.lock();
+ try {
+ for (Node<E> p = first; p != null; p = p.next)
+ c.add(p.item);
+ int n = count;
+ count = 0;
+ first = last = null;
+ notFull.signalAll();
+ return n;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public int drainTo(Collection<? super E> c, int maxElements) {
+ if (c == null)
+ throw new NullPointerException();
+ if (c == this)
+ throw new IllegalArgumentException();
+ lock.lock();
+ try {
+ int n = 0;
+ while (n < maxElements && first != null) {
+ c.add(first.item);
+ first.prev = null;
+ first = first.next;
+ --count;
+ ++n;
+ }
+ if (first == null)
+ last = null;
+ notFull.signalAll();
+ return n;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns an iterator over the elements in this deque in proper sequence.
+ * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
+ * will never throw {@link java.util.ConcurrentModificationException},
+ * and guarantees to traverse elements as they existed upon
+ * construction of the iterator, and may (but is not guaranteed to)
+ * reflect any modifications subsequent to construction.
+ *
+ * @return an iterator over the elements in this deque in proper sequence.
+ */
+ public Iterator<E> iterator() {
+ return new Itr();
+ }
+
+ /**
+ * Iterator for LinkedBlockingDeque
+ */
+ private class Itr implements Iterator<E> {
+ private Node<E> next;
+
+ /**
+ * nextItem holds on to item fields because once we claim that
+ * an element exists in hasNext(), we must return item read
+ * under lock (in advance()) even if it was in the process of
+ * being removed when hasNext() was called.
+ **/
+ private E nextItem;
+
+ /**
+ * Node returned by most recent call to next. Needed by remove.
+ * Reset to null if this element is deleted by a call to remove.
+ */
+ private Node<E> last;
+
+ Itr() {
+ advance();
+ }
+
+ /**
+ * Advance next, or if not yet initialized, set to first node.
+ */
+ private void advance() {
+ final ReentrantLock lock = LinkedBlockingDeque.this.lock;
+ lock.lock();
+ try {
+ next = (next == null)? first : next.next;
+ nextItem = (next == null)? null : next.item;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean hasNext() {
+ return next != null;
+ }
+
+ public E next() {
+ if (next == null)
+ throw new NoSuchElementException();
+ last = next;
+ E x = nextItem;
+ advance();
+ return x;
+ }
+
+ public void remove() {
+ Node<E> n = last;
+ if (n == null)
+ throw new IllegalStateException();
+ last = null;
+ // Note: removeNode rescans looking for this node to make
+ // sure it was not already removed. Otherwwise, trying to
+ // re-remove could corrupt list.
+ removeNode(n);
+ }
+ }
+
+ /**
+ * Save the state to a stream (that is, serialize it).
+ *
+ * @serialData The capacity (int), followed by elements (each an
+ * <tt>Object</tt>) in the proper order, followed by a null
+ * @param s the stream
+ */
+ private void writeObject(java.io.ObjectOutputStream s)
+ throws java.io.IOException {
+ lock.lock();
+ try {
+ // Write out capacity and any hidden stuff
+ s.defaultWriteObject();
+ // Write out all elements in the proper order.
+ for (Node<E> p = first; p != null; p = p.next)
+ s.writeObject(p.item);
+ // Use trailing null as sentinel
+ s.writeObject(null);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Reconstitute this deque instance from a stream (that is,
+ * deserialize it).
+ * @param s the stream
+ */
+ private void readObject(java.io.ObjectInputStream s)
+ throws java.io.IOException, ClassNotFoundException {
+ s.defaultReadObject();
+ count = 0;
+ first = null;
+ last = null;
+ // Read in all elements and place in queue
+ for (;;) {
+ E item = (E)s.readObject();
+ if (item == null)
+ break;
+ add(item);
+ }
+ }
+
+}
Added: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalCompactTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalCompactTest.java (rev 0)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalCompactTest.java 2009-06-26 22:56:53 UTC (rev 7483)
@@ -0,0 +1,324 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.unit.core.journal.impl;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.JournalImpl;
+import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.utils.IDGenerator;
+import org.jboss.messaging.utils.TimeAndCounterIDGenerator;
+
+/**
+ *
+ * A JournalImplTestBase
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class JournalCompactTest extends JournalImplTestBase
+{
+ private static final Logger log = Logger.getLogger(JournalCompactTest.class);
+
+ protected String journalDir = System.getProperty("user.home") + "/journal-test";
+
+ private static final int NUMBER_OF_RECORDS = 1000;
+
+ IDGenerator idGenerator = new TimeAndCounterIDGenerator();
+
+ // General tests
+ // =============
+
+ public void testCompactwithPendingXACommit() throws Exception
+ {
+ }
+
+ public void testCompactwithPendingXAPrepareAndCommit() throws Exception
+ {
+ }
+
+ public void testCompactwithPendingCommit() throws Exception
+ {
+ }
+
+ public void testCompactwithConcurrentDeletes() throws Exception
+ {
+ }
+
+ public void testCompactWithConcurrentAppend() throws Exception
+ {
+ setup(50, 60 * 1024, true);
+
+ final CountDownLatch latchDone = new CountDownLatch(1);
+ final CountDownLatch latchWait = new CountDownLatch(1);
+ journal = new JournalImpl(fileSize, minFiles, fileFactory, filePrefix, fileExtension, maxAIO)
+ {
+ public void onCompactDone()
+ {
+ latchDone.countDown();
+ System.out.println("Waiting on Compact");
+ try
+ {
+ latchWait.await();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ System.out.println("Done");
+ }
+ };
+ startJournal();
+ load();
+
+ long transactionID = 0;
+
+ for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
+ {
+ add(i);
+ if (i % 10 == 0 && i > 0)
+ {
+ journal.forceMoveNextFile();
+ }
+ update(i);
+ }
+
+ for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
+ {
+
+ addTx(transactionID, i);
+ updateTx(transactionID, i);
+ if (i % 10 == 0)
+ {
+ journal.forceMoveNextFile();
+ }
+ commit(transactionID++);
+ update(i);
+ }
+
+ System.out.println("Number of Files: " + journal.getDataFilesCount());
+
+ for (int i = 0; i < NUMBER_OF_RECORDS; i++)
+ {
+ if (!(i % 10 == 0))
+ {
+ delete(i);
+ }
+ }
+
+ journal.forceMoveNextFile();
+
+ Thread t = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ journal.compact();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ t.start();
+
+ latchDone.await();
+
+ int nextID = NUMBER_OF_RECORDS;
+
+ for (int i = 0; i < 100; i++)
+ {
+ add(nextID++);
+ if (i % 10 == 0)
+ {
+ journal.forceMoveNextFile();
+ }
+ }
+
+ latchWait.countDown();
+
+ t.join();
+
+
+ for (int i = 0 ; i < 1000; i++)
+ {
+ long id = idGenerator.generateID();
+ add(id);
+ delete(id);
+
+ if (i % 100 == 0)
+ {
+ journal.forceMoveNextFile();
+ }
+ }
+
+ journal.forceMoveNextFile();
+
+ delete(0);
+ add(idGenerator.generateID());
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ }
+
+ public void testCompactwithConcurrentAppendAndUpdate() throws Exception
+ {
+ }
+
+ public void testCompactWithPendingTransactionAndDelete() throws Exception
+ {
+ }
+
+ public void testCompactingWithPendingTransaction() throws Exception
+ {
+
+ }
+
+ public void testSimpleCompacting() throws Exception
+ {
+ setup(50, 60 * 1024, true);
+
+ createJournal();
+ startJournal();
+ load();
+
+ int NUMBER_OF_RECORDS = 1000;
+
+ // add and remove some data to force reclaiming
+ {
+ ArrayList<Long> ids = new ArrayList<Long>();
+ for (int i = 0; i < NUMBER_OF_RECORDS; i++)
+ {
+ long id = idGenerator.generateID();
+ ids.add(id);
+ add(id);
+ if (i > 0 && (i % 100 == 0))
+ {
+ journal.forceMoveNextFile();
+ }
+ }
+
+ for (Long id : ids)
+ {
+ delete(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ journal.checkAndReclaimFiles();
+ }
+
+ long transactionID = 0;
+
+ for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
+ {
+ add(i);
+ if (i % 10 == 0 && i > 0)
+ {
+ journal.forceMoveNextFile();
+ }
+ update(i);
+ }
+
+ for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
+ {
+
+ addTx(transactionID, i);
+ updateTx(transactionID, i);
+ if (i % 10 == 0)
+ {
+ journal.forceMoveNextFile();
+ }
+ commit(transactionID++);
+ update(i);
+ }
+
+ System.out.println("Number of Files: " + journal.getDataFilesCount());
+
+ for (int i = 0; i < NUMBER_OF_RECORDS; i++)
+ {
+ if (!(i % 10 == 0))
+ {
+ delete(i);
+ }
+ }
+
+ journal.forceMoveNextFile();
+
+ System.out.println("Number of Files: " + journal.getDataFilesCount());
+
+ System.out.println("Before compact ****************************");
+ System.out.println(journal.debug());
+ System.out.println("*****************************************");
+
+ journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ }
+
+ protected int getAlignment()
+ {
+ return 1;
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ File file = new File(journalDir);
+
+ deleteDirectory(file);
+
+ file.mkdir();
+ }
+
+ protected SequentialFileFactory createFactory()
+ {
+ return new NIOSequentialFileFactory(journalDir);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.tests.unit.core.journal.impl.JournalImplTestBase#getFileFactory()
+ */
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ return createFactory();
+ }
+
+}
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-06-26 17:03:46 UTC (rev 7482)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-06-26 22:56:53 UTC (rev 7483)
@@ -38,7 +38,6 @@
import org.jboss.messaging.core.journal.TestableJournal;
import org.jboss.messaging.core.journal.impl.JournalImpl;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.tests.util.RandomUtil;
import org.jboss.messaging.tests.util.UnitTestCase;
/**
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-06-26 17:03:46 UTC (rev 7482)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-06-26 22:56:53 UTC (rev 7483)
@@ -358,9 +358,8 @@
{
if (!open)
{
- throw new IllegalStateException("Is closed");
+ close();
}
- close();
fileMap.remove(fileName);
}
More information about the jboss-cvs-commits
mailing list