[hornetq-commits] JBoss hornetq SVN: r9460 - in trunk: src/main/org/hornetq/core/journal/impl and 5 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Jul 23 00:51:08 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-07-23 00:51:06 -0400 (Fri, 23 Jul 2010)
New Revision: 9460
Added:
trunk/src/main/org/hornetq/utils/ReusableLatch.java
trunk/tests/src/org/hornetq/tests/timing/util/ReusableLatchTest.java
trunk/tests/src/org/hornetq/tests/unit/util/ReusableLatchTest.java
Removed:
trunk/src/main/org/hornetq/utils/VariableLatch.java
trunk/tests/src/org/hornetq/tests/timing/util/VariableLatchTest.java
trunk/tests/src/org/hornetq/tests/unit/util/VariableLatchTest.java
Modified:
trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
trunk/src/main/org/hornetq/core/journal/impl/TransactionCallback.java
trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java
Log:
Renaming VariableLatch as ReusableLatch (wanted to do this for a long time already)
Modified: trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2010-07-22 22:39:51 UTC (rev 9459)
+++ trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2010-07-23 04:51:06 UTC (rev 9460)
@@ -29,7 +29,7 @@
import org.hornetq.core.asyncio.AsynchronousFile;
import org.hornetq.core.asyncio.BufferCallback;
import org.hornetq.core.logging.Logger;
-import org.hornetq.utils.VariableLatch;
+import org.hornetq.utils.ReusableLatch;
/**
*
@@ -146,7 +146,7 @@
**/
private final Lock callbackLock = new ReentrantLock();
- private final VariableLatch pollerLatch = new VariableLatch();
+ private final ReusableLatch pollerLatch = new ReusableLatch();
private volatile Runnable poller;
@@ -154,7 +154,7 @@
private final Lock writeLock = new ReentrantReadWriteLock().writeLock();
- private final VariableLatch pendingWrites = new VariableLatch();
+ private final ReusableLatch pendingWrites = new ReusableLatch();
private Semaphore maxIOSemaphore;
@@ -242,7 +242,7 @@
try
{
- while (!pendingWrites.waitCompletion(60000))
+ while (!pendingWrites.await(60000))
{
AsynchronousFileImpl.log.warn("Couldn't get lock after 60 seconds on closing AsynchronousFileImpl::" + fileName);
}
@@ -299,7 +299,7 @@
startPoller();
}
- pendingWrites.up();
+ pendingWrites.countUp();
if (writeExecutor != null)
{
@@ -362,7 +362,7 @@
{
startPoller();
}
- pendingWrites.up();
+ pendingWrites.countUp();
maxIOSemaphore.acquireUninterruptibly();
try
{
@@ -372,14 +372,14 @@
{
// Release only if an exception happened
maxIOSemaphore.release();
- pendingWrites.down();
+ pendingWrites.countDown();
throw e;
}
catch (RuntimeException e)
{
// Release only if an exception happened
maxIOSemaphore.release();
- pendingWrites.down();
+ pendingWrites.countDown();
throw e;
}
}
@@ -457,7 +457,7 @@
{
maxIOSemaphore.release();
- pendingWrites.down();
+ pendingWrites.countDown();
callbackLock.lock();
@@ -524,7 +524,7 @@
maxIOSemaphore.release();
- pendingWrites.down();
+ pendingWrites.countDown();
callbackLock.lock();
@@ -578,7 +578,7 @@
if (poller == null)
{
- pollerLatch.up();
+ pollerLatch.countUp();
poller = new PollerRunnable();
try
{
@@ -613,7 +613,7 @@
AsynchronousFileImpl.stopPoller(handler);
// We need to make sure we won't call close until Poller is
// completely done, or we might get beautiful GPFs
- pollerLatch.waitCompletion();
+ pollerLatch.await();
}
// Native ----------------------------------------------------------------------------
@@ -729,7 +729,7 @@
// Case the poller thread is interrupted, this will allow us to
// restart the thread when required
poller = null;
- pollerLatch.down();
+ pollerLatch.countDown();
}
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/TransactionCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TransactionCallback.java 2010-07-22 22:39:51 UTC (rev 9459)
+++ trunk/src/main/org/hornetq/core/journal/impl/TransactionCallback.java 2010-07-23 04:51:06 UTC (rev 9460)
@@ -14,7 +14,7 @@
package org.hornetq.core.journal.impl;
import org.hornetq.core.journal.IOAsyncTask;
-import org.hornetq.utils.VariableLatch;
+import org.hornetq.utils.ReusableLatch;
/**
* A TransactionCallback
@@ -25,7 +25,7 @@
*/
public class TransactionCallback implements IOAsyncTask
{
- private final VariableLatch countLatch = new VariableLatch();
+ private final ReusableLatch countLatch = new ReusableLatch();
private volatile String errorMessage = null;
@@ -40,12 +40,12 @@
public void countUp()
{
up++;
- countLatch.up();
+ countLatch.countUp();
}
public void done()
{
- countLatch.down();
+ countLatch.countDown();
if (++done == up && delegateCompletion != null)
{
final IOAsyncTask delegateToCall = delegateCompletion;
@@ -58,7 +58,7 @@
public void waitCompletion() throws InterruptedException
{
- countLatch.waitCompletion();
+ countLatch.await();
if (errorMessage != null)
{
@@ -72,7 +72,7 @@
this.errorCode = errorCode;
- countLatch.down();
+ countLatch.countDown();
if (delegateCompletion != null)
{
Copied: trunk/src/main/org/hornetq/utils/ReusableLatch.java (from rev 9459, trunk/src/main/org/hornetq/utils/VariableLatch.java)
===================================================================
--- trunk/src/main/org/hornetq/utils/ReusableLatch.java (rev 0)
+++ trunk/src/main/org/hornetq/utils/ReusableLatch.java 2010-07-23 04:51:06 UTC (rev 9460)
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+/**
+ *
+ * <p>This class will use the framework provided to by AbstractQueuedSynchronizer.</p>
+ * <p>AbstractQueuedSynchronizer is the framework for any sort of concurrent synchronization, such as Semaphores, events, etc, based on AtomicIntegers.</p>
+ *
+ * <p>This class works just like CountDownLatch, with the difference you can also increase the counter</p>
+ *
+ * <p>It could be used for sync points when one process is feeding the latch while another will wait when everything is done. (e.g. waiting IO completions to finish)</p>
+ *
+ * <p>On HornetQ we have the requirement of increment and decrement a counter until the user fires a ready event (commit). At that point we just act as a regular countDown.</p>
+ *
+ * <p>Note: This latch is reusable. Once it reaches zero, you can call up again, and reuse it on further waits.</p>
+ *
+ * <p>For example: prepareTransaction will wait for the current completions, and further adds will be called on the latch. Later on when commit is called you can reuse the same latch.</p>
+ *
+ * @author Clebert Suconic
+ * */
+public class ReusableLatch
+{
+ /**
+ * Look at the doc and examples provided by AbstractQueuedSynchronizer for more information
+ * @see AbstractQueuedSynchronizer*/
+ @SuppressWarnings("serial")
+ private static class CountSync extends AbstractQueuedSynchronizer
+ {
+ public CountSync(int count)
+ {
+ setState(count);
+ }
+
+ public int getCount()
+ {
+ return getState();
+ }
+
+ @Override
+ public int tryAcquireShared(final int numberOfAqcquires)
+ {
+ return getState() == 0 ? 1 : -1;
+ }
+
+ public void add()
+ {
+ for (;;)
+ {
+ int actualState = getState();
+ int newState = actualState + 1;
+ if (compareAndSetState(actualState, newState))
+ {
+ return;
+ }
+ }
+ }
+
+ @Override
+ public boolean tryReleaseShared(final int numberOfReleases)
+ {
+ for (;;)
+ {
+ int actualState = getState();
+ if (actualState == 0)
+ {
+ return true;
+ }
+
+ int newState = actualState - numberOfReleases;
+
+ if (compareAndSetState(actualState, newState))
+ {
+ return newState == 0;
+ }
+ }
+ }
+ }
+
+ private final CountSync control;
+
+ public ReusableLatch()
+ {
+ this(0);
+ }
+
+ public ReusableLatch(final int count)
+ {
+ control = new CountSync(count);
+ }
+
+ public int getCount()
+ {
+ return control.getCount();
+ }
+
+ public void countUp()
+ {
+ control.add();
+ }
+
+ public void countDown()
+ {
+ control.releaseShared(1);
+ }
+
+ public void await() throws InterruptedException
+ {
+ control.acquireSharedInterruptibly(1);
+ }
+
+ public boolean await(final long milliseconds) throws InterruptedException
+ {
+ return control.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(milliseconds));
+ }
+
+ public boolean await(final long timeWait, TimeUnit timeUnit) throws InterruptedException
+ {
+ return control.tryAcquireSharedNanos(1, timeUnit.toNanos(timeWait));
+ }
+}
Deleted: trunk/src/main/org/hornetq/utils/VariableLatch.java
===================================================================
--- trunk/src/main/org/hornetq/utils/VariableLatch.java 2010-07-22 22:39:51 UTC (rev 9459)
+++ trunk/src/main/org/hornetq/utils/VariableLatch.java 2010-07-23 04:51:06 UTC (rev 9460)
@@ -1,118 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.utils;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.AbstractQueuedSynchronizer;
-
-/**
- *
- * <p>This class will use the framework provided to by AbstractQueuedSynchronizer.</p>
- * <p>AbstractQueuedSynchronizer is the framework for any sort of concurrent synchronization, such as Semaphores, events, etc, based on AtomicIntegers.</p>
- *
- * <p>The idea is, instead of providing each user specific Latch/Synchronization, java.util.concurrent provides the framework for reuses, based on an AtomicInteger (getState())</p>
- *
- * <p>On HornetQ we have the requirement of increment and decrement a counter until the user fires a ready event (commit). At that point we just act as a regular countDown.</p>
- *
- * <p>Note: This latch is reusable. Once it reaches zero, you can call up again, and reuse it on further waits.</p>
- *
- * <p>For example: prepareTransaction will wait for the current completions, and further adds will be called on the latch. Later on when commit is called you can reuse the same latch.</p>
- *
- * @author Clebert Suconic
- * */
-public class VariableLatch
-{
- /**
- * Look at the doc and examples provided by AbstractQueuedSynchronizer for more information
- * @see AbstractQueuedSynchronizer*/
- @SuppressWarnings("serial")
- private static class CountSync extends AbstractQueuedSynchronizer
- {
- public CountSync()
- {
- setState(0);
- }
-
- public int getCount()
- {
- return getState();
- }
-
- @Override
- public int tryAcquireShared(final int numberOfAqcquires)
- {
- return getState() == 0 ? 1 : -1;
- }
-
- public void add()
- {
- for (;;)
- {
- int actualState = getState();
- int newState = actualState + 1;
- if (compareAndSetState(actualState, newState))
- {
- return;
- }
- }
- }
-
- @Override
- public boolean tryReleaseShared(final int numberOfReleases)
- {
- for (;;)
- {
- int actualState = getState();
- if (actualState == 0)
- {
- return true;
- }
-
- int newState = actualState - numberOfReleases;
-
- if (compareAndSetState(actualState, newState))
- {
- return newState == 0;
- }
- }
- }
- }
-
- private final CountSync control = new CountSync();
-
- public int getCount()
- {
- return control.getCount();
- }
-
- public void up()
- {
- control.add();
- }
-
- public void down()
- {
- control.releaseShared(1);
- }
-
- public void waitCompletion() throws InterruptedException
- {
- control.acquireSharedInterruptibly(1);
- }
-
- public boolean waitCompletion(final long milliseconds) throws InterruptedException
- {
- return control.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(milliseconds));
- }
-}
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-07-22 22:39:51 UTC (rev 9459)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-07-23 04:51:06 UTC (rev 9460)
@@ -39,7 +39,7 @@
import org.hornetq.utils.IDGenerator;
import org.hornetq.utils.SimpleIDGenerator;
import org.hornetq.utils.TimeAndCounterIDGenerator;
-import org.hornetq.utils.VariableLatch;
+import org.hornetq.utils.ReusableLatch;
/**
*
@@ -571,10 +571,10 @@
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
- final VariableLatch reusableLatchDone = new VariableLatch();
- reusableLatchDone.up();
- final VariableLatch reusableLatchWait = new VariableLatch();
- reusableLatchWait.up();
+ final ReusableLatch reusableLatchDone = new ReusableLatch();
+ reusableLatchDone.countUp();
+ final ReusableLatch reusableLatchWait = new ReusableLatch();
+ reusableLatchWait.countUp();
journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
{
@@ -582,11 +582,11 @@
@Override
public void onCompactDone()
{
- reusableLatchDone.down();
+ reusableLatchDone.countDown();
System.out.println("Waiting on Compact");
try
{
- reusableLatchWait.waitCompletion();
+ reusableLatchWait.await();
}
catch (InterruptedException e)
{
@@ -631,7 +631,7 @@
tCompact.start();
- reusableLatchDone.waitCompletion();
+ reusableLatchDone.await();
addTx(appendTX, addedRecord);
@@ -643,7 +643,7 @@
delete(addedRecord);
- reusableLatchWait.down();
+ reusableLatchWait.countDown();
tCompact.join();
@@ -672,10 +672,10 @@
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
- final VariableLatch reusableLatchDone = new VariableLatch();
- reusableLatchDone.up();
- final VariableLatch reusableLatchWait = new VariableLatch();
- reusableLatchWait.up();
+ final ReusableLatch reusableLatchDone = new ReusableLatch();
+ reusableLatchDone.countUp();
+ final ReusableLatch reusableLatchWait = new ReusableLatch();
+ reusableLatchWait.countUp();
journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
{
@@ -683,11 +683,11 @@
@Override
public void onCompactDone()
{
- reusableLatchDone.down();
+ reusableLatchDone.countDown();
System.out.println("Waiting on Compact");
try
{
- reusableLatchWait.waitCompletion();
+ reusableLatchWait.await();
}
catch (InterruptedException e)
{
@@ -732,7 +732,7 @@
tCompact.start();
- reusableLatchDone.waitCompletion();
+ reusableLatchDone.await();
addTx(appendTX, addedRecord);
commit(appendTX);
@@ -745,7 +745,7 @@
commit(deleteTXID);
- reusableLatchWait.down();
+ reusableLatchWait.countDown();
tCompact.join();
@@ -768,10 +768,10 @@
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
- final VariableLatch reusableLatchDone = new VariableLatch();
- reusableLatchDone.up();
- final VariableLatch reusableLatchWait = new VariableLatch();
- reusableLatchWait.up();
+ final ReusableLatch reusableLatchDone = new ReusableLatch();
+ reusableLatchDone.countUp();
+ final ReusableLatch reusableLatchWait = new ReusableLatch();
+ reusableLatchWait.countUp();
journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
{
@@ -779,11 +779,11 @@
@Override
public void onCompactDone()
{
- reusableLatchDone.down();
+ reusableLatchDone.countDown();
System.out.println("Waiting on Compact");
try
{
- reusableLatchWait.waitCompletion();
+ reusableLatchWait.await();
}
catch (InterruptedException e)
{
@@ -829,13 +829,13 @@
tCompact.start();
- reusableLatchDone.waitCompletion();
+ reusableLatchDone.await();
addTx(consumerTX, addedRecord);
commit(consumerTX);
delete(addedRecord);
- reusableLatchWait.down();
+ reusableLatchWait.countDown();
tCompact.join();
@@ -857,10 +857,10 @@
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
- final VariableLatch reusableLatchDone = new VariableLatch();
- reusableLatchDone.up();
- final VariableLatch reusableLatchWait = new VariableLatch();
- reusableLatchWait.up();
+ final ReusableLatch reusableLatchDone = new ReusableLatch();
+ reusableLatchDone.countUp();
+ final ReusableLatch reusableLatchWait = new ReusableLatch();
+ reusableLatchWait.countUp();
journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
{
@@ -868,11 +868,11 @@
@Override
public void onCompactDone()
{
- reusableLatchDone.down();
+ reusableLatchDone.countDown();
System.out.println("Waiting on Compact");
try
{
- reusableLatchWait.waitCompletion();
+ reusableLatchWait.await();
}
catch (InterruptedException e)
{
@@ -915,7 +915,7 @@
tCompact.start();
- reusableLatchDone.waitCompletion();
+ reusableLatchDone.await();
addTx(consumerTX, firstID);
@@ -929,7 +929,7 @@
delete(addedRecord);
- reusableLatchWait.down();
+ reusableLatchWait.countDown();
tCompact.join();
@@ -959,10 +959,10 @@
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
- final VariableLatch reusableLatchDone = new VariableLatch();
- reusableLatchDone.up();
- final VariableLatch reusableLatchWait = new VariableLatch();
- reusableLatchWait.up();
+ final ReusableLatch reusableLatchDone = new ReusableLatch();
+ reusableLatchDone.countUp();
+ final ReusableLatch reusableLatchWait = new ReusableLatch();
+ reusableLatchWait.countUp();
journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
{
@@ -970,11 +970,11 @@
@Override
public void onCompactDone()
{
- reusableLatchDone.down();
+ reusableLatchDone.countDown();
System.out.println("Waiting on Compact");
try
{
- reusableLatchWait.waitCompletion();
+ reusableLatchWait.await();
}
catch (InterruptedException e)
{
@@ -1016,7 +1016,7 @@
tCompact.start();
- reusableLatchDone.waitCompletion();
+ reusableLatchDone.await();
addTx(appendTX, appendTwo);
@@ -1028,7 +1028,7 @@
commit(updateTX);
//delete(appendTwo);
- reusableLatchWait.down();
+ reusableLatchWait.countDown();
tCompact.join();
journal.compact();
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java 2010-07-22 22:39:51 UTC (rev 9459)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java 2010-07-23 04:51:06 UTC (rev 9460)
@@ -21,7 +21,7 @@
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase;
import org.hornetq.utils.SimpleIDGenerator;
-import org.hornetq.utils.VariableLatch;
+import org.hornetq.utils.ReusableLatch;
/**
* A NIORandomCompactTest
@@ -37,9 +37,9 @@
// Attributes ----------------------------------------------------
- private VariableLatch startedCompactingLatch = null;
+ private ReusableLatch startedCompactingLatch = null;
- private VariableLatch releaseCompactingLatch = null;
+ private ReusableLatch releaseCompactingLatch = null;
private Thread tCompact = null;
@@ -55,9 +55,9 @@
tCompact = null;
- startedCompactingLatch = new VariableLatch();
+ startedCompactingLatch = new ReusableLatch();
- releaseCompactingLatch = new VariableLatch();
+ releaseCompactingLatch = new ReusableLatch();
File file = new File(getTestDir());
@@ -107,10 +107,10 @@
@Override
public void onCompactDone()
{
- startedCompactingLatch.down();
+ startedCompactingLatch.countDown();
try
{
- releaseCompactingLatch.waitCompletion();
+ releaseCompactingLatch.await();
}
catch (InterruptedException e)
{
@@ -283,7 +283,7 @@
*/
private void joinCompact() throws InterruptedException
{
- releaseCompactingLatch.down();
+ releaseCompactingLatch.countDown();
tCompact.join();
@@ -315,7 +315,7 @@
tCompact.start();
- startedCompactingLatch.waitCompletion();
+ startedCompactingLatch.await();
}
/* (non-Javadoc)
Copied: trunk/tests/src/org/hornetq/tests/timing/util/ReusableLatchTest.java (from rev 9459, trunk/tests/src/org/hornetq/tests/timing/util/VariableLatchTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/util/ReusableLatchTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/timing/util/ReusableLatchTest.java 2010-07-23 04:51:06 UTC (rev 9460)
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.timing.util;
+
+import junit.framework.Assert;
+
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.ReusableLatch;
+
+/**
+ * @author <a href="csuconic at redhat.com">Clebert Suconic</a>
+ */
+public class ReusableLatchTest extends UnitTestCase
+{
+ public void testTimeout() throws Exception
+ {
+ ReusableLatch latch = new ReusableLatch();
+
+ latch.countUp();
+
+ long start = System.currentTimeMillis();
+ Assert.assertFalse(latch.await(1000));
+ long end = System.currentTimeMillis();
+
+ Assert.assertTrue("Timeout didn't work correctly", end - start >= 1000 && end - start < 2000);
+ }
+}
Deleted: trunk/tests/src/org/hornetq/tests/timing/util/VariableLatchTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/util/VariableLatchTest.java 2010-07-22 22:39:51 UTC (rev 9459)
+++ trunk/tests/src/org/hornetq/tests/timing/util/VariableLatchTest.java 2010-07-23 04:51:06 UTC (rev 9460)
@@ -1,37 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.tests.timing.util;
-
-import junit.framework.Assert;
-
-import org.hornetq.tests.util.UnitTestCase;
-import org.hornetq.utils.VariableLatch;
-
-/**
- * @author <a href="csuconic at redhat.com">Clebert Suconic</a>
- */
-public class VariableLatchTest extends UnitTestCase
-{
- public void testTimeout() throws Exception
- {
- VariableLatch latch = new VariableLatch();
-
- latch.up();
-
- long start = System.currentTimeMillis();
- Assert.assertFalse(latch.waitCompletion(1000));
- long end = System.currentTimeMillis();
-
- Assert.assertTrue("Timeout didn't work correctly", end - start >= 1000 && end - start < 2000);
- }
-}
Copied: trunk/tests/src/org/hornetq/tests/unit/util/ReusableLatchTest.java (from rev 9459, trunk/tests/src/org/hornetq/tests/unit/util/VariableLatchTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/util/ReusableLatchTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/unit/util/ReusableLatchTest.java 2010-07-23 04:51:06 UTC (rev 9460)
@@ -0,0 +1,312 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.util;
+
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.Assert;
+
+import org.hornetq.core.logging.Logger;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.ReusableLatch;
+
+/**
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class ReusableLatchTest extends UnitTestCase
+{
+ private static final Logger log = Logger.getLogger(ReusableLatchTest.class);
+
+ public void testLatchOnSingleThread() throws Exception
+ {
+ ReusableLatch latch = new ReusableLatch();
+
+ for (int i = 1; i <= 100; i++)
+ {
+ latch.countUp();
+ Assert.assertEquals(i, latch.getCount());
+ }
+
+ for (int i = 100; i > 0; i--)
+ {
+ Assert.assertEquals(i, latch.getCount());
+ latch.countDown();
+ Assert.assertEquals(i - 1, latch.getCount());
+ }
+
+ latch.await();
+ }
+
+ /**
+ *
+ * This test will open numberOfThreads threads, and add numberOfAdds on the
+ * VariableLatch After those addthreads are finished, the latch count should
+ * be numberOfThreads * numberOfAdds Then it will open numberOfThreads
+ * threads again releasing numberOfAdds on the VariableLatch After those
+ * releaseThreads are finished, the latch count should be 0 And all the
+ * waiting threads should be finished also
+ *
+ * @throws Exception
+ */
+ public void testLatchOnMultiThread() throws Exception
+ {
+ final ReusableLatch latch = new ReusableLatch();
+
+ latch.countUp(); // We hold at least one, so ThreadWaits won't go away
+
+ final int numberOfThreads = 100;
+ final int numberOfAdds = 100;
+
+ class ThreadWait extends Thread
+ {
+ private volatile boolean waiting = true;
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ if (!latch.await(5000))
+ {
+ ReusableLatchTest.log.error("Latch timed out");
+ }
+ }
+ catch (Exception e)
+ {
+ ReusableLatchTest.log.error(e);
+ }
+ waiting = false;
+ }
+ }
+
+ class ThreadAdd extends Thread
+ {
+ private final CountDownLatch latchReady;
+
+ private final CountDownLatch latchStart;
+
+ ThreadAdd(final CountDownLatch latchReady, final CountDownLatch latchStart)
+ {
+ this.latchReady = latchReady;
+ this.latchStart = latchStart;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ latchReady.countDown();
+ // Everybody should start at the same time, to worse concurrency
+ // effects
+ latchStart.await();
+ for (int i = 0; i < numberOfAdds; i++)
+ {
+ latch.countUp();
+ }
+ }
+ catch (Exception e)
+ {
+ ReusableLatchTest.log.error(e.getMessage(), e);
+ }
+ }
+ }
+
+ CountDownLatch latchReady = new CountDownLatch(numberOfThreads);
+ CountDownLatch latchStart = new CountDownLatch(1);
+
+ ThreadAdd[] threadAdds = new ThreadAdd[numberOfThreads];
+ ThreadWait waits[] = new ThreadWait[numberOfThreads];
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ threadAdds[i] = new ThreadAdd(latchReady, latchStart);
+ threadAdds[i].start();
+ waits[i] = new ThreadWait();
+ waits[i].start();
+ }
+
+ latchReady.await();
+ latchStart.countDown();
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ threadAdds[i].join();
+ }
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ Assert.assertTrue(waits[i].waiting);
+ }
+
+ Assert.assertEquals(numberOfThreads * numberOfAdds + 1, latch.getCount());
+
+ class ThreadDown extends Thread
+ {
+ private final CountDownLatch latchReady;
+
+ private final CountDownLatch latchStart;
+
+ ThreadDown(final CountDownLatch latchReady, final CountDownLatch latchStart)
+ {
+ this.latchReady = latchReady;
+ this.latchStart = latchStart;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ latchReady.countDown();
+ // Everybody should start at the same time, to worse concurrency
+ // effects
+ latchStart.await();
+ for (int i = 0; i < numberOfAdds; i++)
+ {
+ latch.countDown();
+ }
+ }
+ catch (Exception e)
+ {
+ ReusableLatchTest.log.error(e.getMessage(), e);
+ }
+ }
+ }
+
+ latchReady = new CountDownLatch(numberOfThreads);
+ latchStart = new CountDownLatch(1);
+
+ ThreadDown down[] = new ThreadDown[numberOfThreads];
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ down[i] = new ThreadDown(latchReady, latchStart);
+ down[i].start();
+ }
+
+ latchReady.await();
+ latchStart.countDown();
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ down[i].join();
+ }
+
+ Assert.assertEquals(1, latch.getCount());
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ Assert.assertTrue(waits[i].waiting);
+ }
+
+ latch.countDown();
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ waits[i].join();
+ }
+
+ Assert.assertEquals(0, latch.getCount());
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ Assert.assertFalse(waits[i].waiting);
+ }
+ }
+
+ public void testReuseLatch() throws Exception
+ {
+ final ReusableLatch latch = new ReusableLatch(5);
+ for (int i = 0 ; i < 5; i++)
+ {
+ latch.countDown();
+ }
+
+ latch.countUp();
+
+ class ThreadWait extends Thread
+ {
+ private volatile boolean waiting = false;
+
+ private volatile Exception e;
+
+ private final CountDownLatch readyLatch = new CountDownLatch(1);
+
+ @Override
+ public void run()
+ {
+ waiting = true;
+ readyLatch.countDown();
+ try
+ {
+ if (!latch.await(1000))
+ {
+ ReusableLatchTest.log.error("Latch timed out!", new Exception("trace"));
+ }
+ }
+ catch (Exception e)
+ {
+ ReusableLatchTest.log.error(e);
+ this.e = e;
+ }
+ waiting = false;
+ }
+ }
+
+ ThreadWait t = new ThreadWait();
+ t.start();
+
+ t.readyLatch.await();
+
+ Assert.assertEquals(true, t.waiting);
+
+ latch.countDown();
+
+ t.join();
+
+ Assert.assertEquals(false, t.waiting);
+
+ Assert.assertNull(t.e);
+
+ latch.countUp();
+
+ t = new ThreadWait();
+ t.start();
+
+ t.readyLatch.await();
+
+ Assert.assertEquals(true, t.waiting);
+
+ latch.countDown();
+
+ t.join();
+
+ Assert.assertEquals(false, t.waiting);
+
+ Assert.assertNull(t.e);
+
+ Assert.assertTrue(latch.await(1000));
+
+ Assert.assertEquals(0, latch.getCount());
+
+ latch.countDown();
+
+ Assert.assertEquals(0, latch.getCount());
+
+ }
+
+}
Deleted: trunk/tests/src/org/hornetq/tests/unit/util/VariableLatchTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/util/VariableLatchTest.java 2010-07-22 22:39:51 UTC (rev 9459)
+++ trunk/tests/src/org/hornetq/tests/unit/util/VariableLatchTest.java 2010-07-23 04:51:06 UTC (rev 9460)
@@ -1,307 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.unit.util;
-
-import java.util.concurrent.CountDownLatch;
-
-import junit.framework.Assert;
-
-import org.hornetq.core.logging.Logger;
-import org.hornetq.tests.util.UnitTestCase;
-import org.hornetq.utils.VariableLatch;
-
-/**
- *
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
- */
-public class VariableLatchTest extends UnitTestCase
-{
- private static final Logger log = Logger.getLogger(VariableLatchTest.class);
-
- public void testLatchOnSingleThread() throws Exception
- {
- VariableLatch latch = new VariableLatch();
-
- for (int i = 1; i <= 100; i++)
- {
- latch.up();
- Assert.assertEquals(i, latch.getCount());
- }
-
- for (int i = 100; i > 0; i--)
- {
- Assert.assertEquals(i, latch.getCount());
- latch.down();
- Assert.assertEquals(i - 1, latch.getCount());
- }
-
- latch.waitCompletion();
- }
-
- /**
- *
- * This test will open numberOfThreads threads, and add numberOfAdds on the
- * VariableLatch After those addthreads are finished, the latch count should
- * be numberOfThreads * numberOfAdds Then it will open numberOfThreads
- * threads again releasing numberOfAdds on the VariableLatch After those
- * releaseThreads are finished, the latch count should be 0 And all the
- * waiting threads should be finished also
- *
- * @throws Exception
- */
- public void testLatchOnMultiThread() throws Exception
- {
- final VariableLatch latch = new VariableLatch();
-
- latch.up(); // We hold at least one, so ThreadWaits won't go away
-
- final int numberOfThreads = 100;
- final int numberOfAdds = 100;
-
- class ThreadWait extends Thread
- {
- private volatile boolean waiting = true;
-
- @Override
- public void run()
- {
- try
- {
- if (!latch.waitCompletion(5000))
- {
- VariableLatchTest.log.error("Latch timed out");
- }
- }
- catch (Exception e)
- {
- VariableLatchTest.log.error(e);
- }
- waiting = false;
- }
- }
-
- class ThreadAdd extends Thread
- {
- private final CountDownLatch latchReady;
-
- private final CountDownLatch latchStart;
-
- ThreadAdd(final CountDownLatch latchReady, final CountDownLatch latchStart)
- {
- this.latchReady = latchReady;
- this.latchStart = latchStart;
- }
-
- @Override
- public void run()
- {
- try
- {
- latchReady.countDown();
- // Everybody should start at the same time, to worse concurrency
- // effects
- latchStart.await();
- for (int i = 0; i < numberOfAdds; i++)
- {
- latch.up();
- }
- }
- catch (Exception e)
- {
- VariableLatchTest.log.error(e.getMessage(), e);
- }
- }
- }
-
- CountDownLatch latchReady = new CountDownLatch(numberOfThreads);
- CountDownLatch latchStart = new CountDownLatch(1);
-
- ThreadAdd[] threadAdds = new ThreadAdd[numberOfThreads];
- ThreadWait waits[] = new ThreadWait[numberOfThreads];
-
- for (int i = 0; i < numberOfThreads; i++)
- {
- threadAdds[i] = new ThreadAdd(latchReady, latchStart);
- threadAdds[i].start();
- waits[i] = new ThreadWait();
- waits[i].start();
- }
-
- latchReady.await();
- latchStart.countDown();
-
- for (int i = 0; i < numberOfThreads; i++)
- {
- threadAdds[i].join();
- }
-
- for (int i = 0; i < numberOfThreads; i++)
- {
- Assert.assertTrue(waits[i].waiting);
- }
-
- Assert.assertEquals(numberOfThreads * numberOfAdds + 1, latch.getCount());
-
- class ThreadDown extends Thread
- {
- private final CountDownLatch latchReady;
-
- private final CountDownLatch latchStart;
-
- ThreadDown(final CountDownLatch latchReady, final CountDownLatch latchStart)
- {
- this.latchReady = latchReady;
- this.latchStart = latchStart;
- }
-
- @Override
- public void run()
- {
- try
- {
- latchReady.countDown();
- // Everybody should start at the same time, to worse concurrency
- // effects
- latchStart.await();
- for (int i = 0; i < numberOfAdds; i++)
- {
- latch.down();
- }
- }
- catch (Exception e)
- {
- VariableLatchTest.log.error(e.getMessage(), e);
- }
- }
- }
-
- latchReady = new CountDownLatch(numberOfThreads);
- latchStart = new CountDownLatch(1);
-
- ThreadDown down[] = new ThreadDown[numberOfThreads];
-
- for (int i = 0; i < numberOfThreads; i++)
- {
- down[i] = new ThreadDown(latchReady, latchStart);
- down[i].start();
- }
-
- latchReady.await();
- latchStart.countDown();
-
- for (int i = 0; i < numberOfThreads; i++)
- {
- down[i].join();
- }
-
- Assert.assertEquals(1, latch.getCount());
-
- for (int i = 0; i < numberOfThreads; i++)
- {
- Assert.assertTrue(waits[i].waiting);
- }
-
- latch.down();
-
- for (int i = 0; i < numberOfThreads; i++)
- {
- waits[i].join();
- }
-
- Assert.assertEquals(0, latch.getCount());
-
- for (int i = 0; i < numberOfThreads; i++)
- {
- Assert.assertFalse(waits[i].waiting);
- }
- }
-
- public void testReuseLatch() throws Exception
- {
- final VariableLatch latch = new VariableLatch();
- latch.up();
-
- class ThreadWait extends Thread
- {
- private volatile boolean waiting = false;
-
- private volatile Exception e;
-
- private final CountDownLatch readyLatch = new CountDownLatch(1);
-
- @Override
- public void run()
- {
- waiting = true;
- readyLatch.countDown();
- try
- {
- if (!latch.waitCompletion(1000))
- {
- VariableLatchTest.log.error("Latch timed out!", new Exception("trace"));
- }
- }
- catch (Exception e)
- {
- VariableLatchTest.log.error(e);
- this.e = e;
- }
- waiting = false;
- }
- }
-
- ThreadWait t = new ThreadWait();
- t.start();
-
- t.readyLatch.await();
-
- Assert.assertEquals(true, t.waiting);
-
- latch.down();
-
- t.join();
-
- Assert.assertEquals(false, t.waiting);
-
- Assert.assertNull(t.e);
-
- latch.up();
-
- t = new ThreadWait();
- t.start();
-
- t.readyLatch.await();
-
- Assert.assertEquals(true, t.waiting);
-
- latch.down();
-
- t.join();
-
- Assert.assertEquals(false, t.waiting);
-
- Assert.assertNull(t.e);
-
- Assert.assertTrue(latch.waitCompletion(1000));
-
- Assert.assertEquals(0, latch.getCount());
-
- latch.down();
-
- Assert.assertEquals(0, latch.getCount());
-
- }
-
-}
More information about the hornetq-commits
mailing list