JBoss hornetq SVN: r9463 - in trunk/src/main/org/hornetq: jms/client and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-07-23 04:13:58 -0400 (Fri, 23 Jul 2010)
New Revision: 9463
Modified:
trunk/src/main/org/hornetq/api/jms/management/JMSManagementHelper.java
trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
trunk/src/main/org/hornetq/jms/client/HornetQMessageProducer.java
trunk/src/main/org/hornetq/jms/client/HornetQSession.java
Log:
https://jira.jboss.org/browse/HORNETQ-434
Modified: trunk/src/main/org/hornetq/api/jms/management/JMSManagementHelper.java
===================================================================
--- trunk/src/main/org/hornetq/api/jms/management/JMSManagementHelper.java 2010-07-23 08:13:54 UTC (rev 9462)
+++ trunk/src/main/org/hornetq/api/jms/management/JMSManagementHelper.java 2010-07-23 08:13:58 UTC (rev 9463)
@@ -40,7 +40,7 @@
{
if (jmsMessage instanceof HornetQMessage == false)
{
- throw new IllegalArgumentException("Cannot send a non JBoss message as a management message " + jmsMessage.getClass()
+ throw new IllegalArgumentException("Cannot send a non HornetQ message as a management message " + jmsMessage.getClass()
.getName());
}
Modified: trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMessage.java 2010-07-23 08:13:54 UTC (rev 9462)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMessage.java 2010-07-23 08:13:58 UTC (rev 9463)
@@ -442,7 +442,7 @@
{
if (dest instanceof HornetQDestination == false)
{
- throw new InvalidDestinationException("Not a JBoss destination " + dest);
+ throw new InvalidDestinationException("Not a HornetQ destination " + dest);
}
HornetQDestination jbd = (HornetQDestination)dest;
Modified: trunk/src/main/org/hornetq/jms/client/HornetQMessageProducer.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMessageProducer.java 2010-07-23 08:13:54 UTC (rev 9462)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMessageProducer.java 2010-07-23 08:13:58 UTC (rev 9463)
@@ -216,7 +216,7 @@
if (destination != null && !(destination instanceof HornetQDestination))
{
- throw new InvalidDestinationException("Not a JBoss Destination:" + destination);
+ throw new InvalidDestinationException("Not a HornetQ Destination:" + destination);
}
message.setJMSDeliveryMode(defaultDeliveryMode);
@@ -236,7 +236,7 @@
if (destination != null && !(destination instanceof HornetQDestination))
{
- throw new InvalidDestinationException("Not a JBoss Destination:" + destination);
+ throw new InvalidDestinationException("Not a HornetQ Destination:" + destination);
}
message.setJMSDeliveryMode(deliveryMode);
Modified: trunk/src/main/org/hornetq/jms/client/HornetQSession.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQSession.java 2010-07-23 08:13:54 UTC (rev 9462)
+++ trunk/src/main/org/hornetq/jms/client/HornetQSession.java 2010-07-23 08:13:58 UTC (rev 9463)
@@ -313,7 +313,7 @@
{
if (destination != null && !(destination instanceof HornetQDestination))
{
- throw new InvalidDestinationException("Not a JBoss Destination:" + destination);
+ throw new InvalidDestinationException("Not a HornetQ Destination:" + destination);
}
try
13 years, 10 months
JBoss hornetq SVN: r9462 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-07-23 04:13:54 -0400 (Fri, 23 Jul 2010)
New Revision: 9462
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
removed invalid character
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-07-23 08:11:27 UTC (rev 9461)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-07-23 08:13:54 UTC (rev 9462)
@@ -736,7 +736,7 @@
started = true;
- HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " [" + nodeID + "]�started");
+ HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " [" + nodeID + "] started");
if (configuration.isBackup())
{
@@ -909,7 +909,7 @@
backupActivationThread.join();
}
- HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " [" + nodeID + "]�stopped");
+ HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " [" + nodeID + "]�stopped");
Logger.reset();
}
13 years, 10 months
JBoss hornetq SVN: r9461 - trunk/src/main/org/hornetq/core/remoting/impl/netty.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-07-23 04:11:27 -0400 (Fri, 23 Jul 2010)
New Revision: 9461
Modified:
trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
Log:
fixed batching
Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2010-07-23 04:51:06 UTC (rev 9460)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2010-07-23 08:11:27 UTC (rev 9461)
@@ -162,72 +162,74 @@
public void write(HornetQBuffer buffer, final boolean flush, final boolean batched)
{
- if (writeLock.compareAndSet(false, true))
+ while (!writeLock.compareAndSet(false, true))
{
- try
+ Thread.yield();
+ }
+
+ try
+ {
+ if (batchBuffer == null && batchingEnabled && batched && !flush)
{
- if (batchBuffer == null && batchingEnabled && batched && !flush)
+ // Lazily create batch buffer
+
+ batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+ }
+
+ if (batchBuffer != null)
+ {
+ batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
+
+ if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush)
{
- // Lazily create batch buffer
+ // If the batch buffer is full or it's flush param or not batched then flush the buffer
- batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+ buffer = batchBuffer;
}
+ else
+ {
+ return;
+ }
- if (batchBuffer != null)
+ if (!batched || flush)
{
- batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
+ batchBuffer = null;
+ }
+ else
+ {
+ // Create a new buffer
- if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush)
- {
- // If the batch buffer is full or it's flush param or not batched then flush the buffer
-
- buffer = batchBuffer;
- }
- else
- {
- return;
- }
-
- if (!batched || flush)
- {
- batchBuffer = null;
- }
- else
- {
- // Create a new buffer
-
- batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
- }
+ batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
}
+ }
- ChannelFuture future = channel.write(buffer.channelBuffer());
+ ChannelFuture future = channel.write(buffer.channelBuffer());
- if (flush)
+ if (flush)
+ {
+ while (true)
{
- while (true)
+ try
{
- try
- {
- boolean ok = future.await(10000);
+ boolean ok = future.await(10000);
- if (!ok)
- {
- NettyConnection.log.warn("Timed out waiting for packet to be flushed");
- }
-
- break;
- }
- catch (InterruptedException ignore)
+ if (!ok)
{
+ NettyConnection.log.warn("Timed out waiting for packet to be flushed");
}
+
+ break;
}
+ catch (InterruptedException ignore)
+ {
+ }
}
}
- finally
- {
- writeLock.set(false);
- }
}
+ finally
+ {
+ writeLock.set(false);
+ }
}
public String getRemoteAddress()
13 years, 10 months
JBoss hornetq SVN: r9460 - in trunk: src/main/org/hornetq/core/journal/impl and 5 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-23 00:51:06 -0400 (Fri, 23 Jul 2010)
New Revision: 9460
Added:
trunk/src/main/org/hornetq/utils/ReusableLatch.java
trunk/tests/src/org/hornetq/tests/timing/util/ReusableLatchTest.java
trunk/tests/src/org/hornetq/tests/unit/util/ReusableLatchTest.java
Removed:
trunk/src/main/org/hornetq/utils/VariableLatch.java
trunk/tests/src/org/hornetq/tests/timing/util/VariableLatchTest.java
trunk/tests/src/org/hornetq/tests/unit/util/VariableLatchTest.java
Modified:
trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
trunk/src/main/org/hornetq/core/journal/impl/TransactionCallback.java
trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java
Log:
Renaming VariableLatch as ReusableLatch (wanted to do this for a long time already)
Modified: trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2010-07-22 22:39:51 UTC (rev 9459)
+++ trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2010-07-23 04:51:06 UTC (rev 9460)
@@ -29,7 +29,7 @@
import org.hornetq.core.asyncio.AsynchronousFile;
import org.hornetq.core.asyncio.BufferCallback;
import org.hornetq.core.logging.Logger;
-import org.hornetq.utils.VariableLatch;
+import org.hornetq.utils.ReusableLatch;
/**
*
@@ -146,7 +146,7 @@
**/
private final Lock callbackLock = new ReentrantLock();
- private final VariableLatch pollerLatch = new VariableLatch();
+ private final ReusableLatch pollerLatch = new ReusableLatch();
private volatile Runnable poller;
@@ -154,7 +154,7 @@
private final Lock writeLock = new ReentrantReadWriteLock().writeLock();
- private final VariableLatch pendingWrites = new VariableLatch();
+ private final ReusableLatch pendingWrites = new ReusableLatch();
private Semaphore maxIOSemaphore;
@@ -242,7 +242,7 @@
try
{
- while (!pendingWrites.waitCompletion(60000))
+ while (!pendingWrites.await(60000))
{
AsynchronousFileImpl.log.warn("Couldn't get lock after 60 seconds on closing AsynchronousFileImpl::" + fileName);
}
@@ -299,7 +299,7 @@
startPoller();
}
- pendingWrites.up();
+ pendingWrites.countUp();
if (writeExecutor != null)
{
@@ -362,7 +362,7 @@
{
startPoller();
}
- pendingWrites.up();
+ pendingWrites.countUp();
maxIOSemaphore.acquireUninterruptibly();
try
{
@@ -372,14 +372,14 @@
{
// Release only if an exception happened
maxIOSemaphore.release();
- pendingWrites.down();
+ pendingWrites.countDown();
throw e;
}
catch (RuntimeException e)
{
// Release only if an exception happened
maxIOSemaphore.release();
- pendingWrites.down();
+ pendingWrites.countDown();
throw e;
}
}
@@ -457,7 +457,7 @@
{
maxIOSemaphore.release();
- pendingWrites.down();
+ pendingWrites.countDown();
callbackLock.lock();
@@ -524,7 +524,7 @@
maxIOSemaphore.release();
- pendingWrites.down();
+ pendingWrites.countDown();
callbackLock.lock();
@@ -578,7 +578,7 @@
if (poller == null)
{
- pollerLatch.up();
+ pollerLatch.countUp();
poller = new PollerRunnable();
try
{
@@ -613,7 +613,7 @@
AsynchronousFileImpl.stopPoller(handler);
// We need to make sure we won't call close until Poller is
// completely done, or we might get beautiful GPFs
- pollerLatch.waitCompletion();
+ pollerLatch.await();
}
// Native ----------------------------------------------------------------------------
@@ -729,7 +729,7 @@
// Case the poller thread is interrupted, this will allow us to
// restart the thread when required
poller = null;
- pollerLatch.down();
+ pollerLatch.countDown();
}
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/TransactionCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TransactionCallback.java 2010-07-22 22:39:51 UTC (rev 9459)
+++ trunk/src/main/org/hornetq/core/journal/impl/TransactionCallback.java 2010-07-23 04:51:06 UTC (rev 9460)
@@ -14,7 +14,7 @@
package org.hornetq.core.journal.impl;
import org.hornetq.core.journal.IOAsyncTask;
-import org.hornetq.utils.VariableLatch;
+import org.hornetq.utils.ReusableLatch;
/**
* A TransactionCallback
@@ -25,7 +25,7 @@
*/
public class TransactionCallback implements IOAsyncTask
{
- private final VariableLatch countLatch = new VariableLatch();
+ private final ReusableLatch countLatch = new ReusableLatch();
private volatile String errorMessage = null;
@@ -40,12 +40,12 @@
public void countUp()
{
up++;
- countLatch.up();
+ countLatch.countUp();
}
public void done()
{
- countLatch.down();
+ countLatch.countDown();
if (++done == up && delegateCompletion != null)
{
final IOAsyncTask delegateToCall = delegateCompletion;
@@ -58,7 +58,7 @@
public void waitCompletion() throws InterruptedException
{
- countLatch.waitCompletion();
+ countLatch.await();
if (errorMessage != null)
{
@@ -72,7 +72,7 @@
this.errorCode = errorCode;
- countLatch.down();
+ countLatch.countDown();
if (delegateCompletion != null)
{
Copied: trunk/src/main/org/hornetq/utils/ReusableLatch.java (from rev 9459, trunk/src/main/org/hornetq/utils/VariableLatch.java)
===================================================================
--- trunk/src/main/org/hornetq/utils/ReusableLatch.java (rev 0)
+++ trunk/src/main/org/hornetq/utils/ReusableLatch.java 2010-07-23 04:51:06 UTC (rev 9460)
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+/**
+ *
+ * <p>This class will use the framework provided by AbstractQueuedSynchronizer.</p>
+ * <p>AbstractQueuedSynchronizer is the framework for any sort of concurrent synchronization, such as Semaphores, events, etc, based on AtomicIntegers.</p>
+ *
+ * <p>This class works just like CountDownLatch, with the difference you can also increase the counter</p>
+ *
+ * <p>It could be used for sync points when one process is feeding the latch while another will wait when everything is done. (e.g. waiting for IO completions to finish)</p>
+ *
+ * <p>On HornetQ we have the requirement of incrementing and decrementing a counter until the user fires a ready event (commit). At that point we just act as a regular countDown.</p>
+ *
+ * <p>Note: This latch is reusable. Once it reaches zero, you can call countUp again, and reuse it on further waits.</p>
+ *
+ * <p>For example: prepareTransaction will wait for the current completions, and further adds will be called on the latch. Later on when commit is called you can reuse the same latch.</p>
+ *
+ * @author Clebert Suconic
+ * */
+public class ReusableLatch
+{
+ /**
+ * Look at the doc and examples provided by AbstractQueuedSynchronizer for more information
+ * @see AbstractQueuedSynchronizer*/
+ @SuppressWarnings("serial")
+ private static class CountSync extends AbstractQueuedSynchronizer
+ {
+ public CountSync(int count)
+ {
+ setState(count);
+ }
+
+ public int getCount()
+ {
+ return getState();
+ }
+
+ @Override
+ public int tryAcquireShared(final int numberOfAqcquires)
+ {
+ return getState() == 0 ? 1 : -1;
+ }
+
+ public void add()
+ {
+ for (;;)
+ {
+ int actualState = getState();
+ int newState = actualState + 1;
+ if (compareAndSetState(actualState, newState))
+ {
+ return;
+ }
+ }
+ }
+
+ @Override
+ public boolean tryReleaseShared(final int numberOfReleases)
+ {
+ for (;;)
+ {
+ int actualState = getState();
+ if (actualState == 0)
+ {
+ return true;
+ }
+
+ int newState = actualState - numberOfReleases;
+
+ if (compareAndSetState(actualState, newState))
+ {
+ return newState == 0;
+ }
+ }
+ }
+ }
+
+ private final CountSync control;
+
+ public ReusableLatch()
+ {
+ this(0);
+ }
+
+ public ReusableLatch(final int count)
+ {
+ control = new CountSync(count);
+ }
+
+ public int getCount()
+ {
+ return control.getCount();
+ }
+
+ public void countUp()
+ {
+ control.add();
+ }
+
+ public void countDown()
+ {
+ control.releaseShared(1);
+ }
+
+ public void await() throws InterruptedException
+ {
+ control.acquireSharedInterruptibly(1);
+ }
+
+ public boolean await(final long milliseconds) throws InterruptedException
+ {
+ return control.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(milliseconds));
+ }
+
+ public boolean await(final long timeWait, TimeUnit timeUnit) throws InterruptedException
+ {
+ return control.tryAcquireSharedNanos(1, timeUnit.toNanos(timeWait));
+ }
+}
Deleted: trunk/src/main/org/hornetq/utils/VariableLatch.java
===================================================================
--- trunk/src/main/org/hornetq/utils/VariableLatch.java 2010-07-22 22:39:51 UTC (rev 9459)
+++ trunk/src/main/org/hornetq/utils/VariableLatch.java 2010-07-23 04:51:06 UTC (rev 9460)
@@ -1,118 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.utils;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.AbstractQueuedSynchronizer;
-
-/**
- *
- * <p>This class will use the framework provided to by AbstractQueuedSynchronizer.</p>
- * <p>AbstractQueuedSynchronizer is the framework for any sort of concurrent synchronization, such as Semaphores, events, etc, based on AtomicIntegers.</p>
- *
- * <p>The idea is, instead of providing each user specific Latch/Synchronization, java.util.concurrent provides the framework for reuses, based on an AtomicInteger (getState())</p>
- *
- * <p>On HornetQ we have the requirement of increment and decrement a counter until the user fires a ready event (commit). At that point we just act as a regular countDown.</p>
- *
- * <p>Note: This latch is reusable. Once it reaches zero, you can call up again, and reuse it on further waits.</p>
- *
- * <p>For example: prepareTransaction will wait for the current completions, and further adds will be called on the latch. Later on when commit is called you can reuse the same latch.</p>
- *
- * @author Clebert Suconic
- * */
-public class VariableLatch
-{
- /**
- * Look at the doc and examples provided by AbstractQueuedSynchronizer for more information
- * @see AbstractQueuedSynchronizer*/
- @SuppressWarnings("serial")
- private static class CountSync extends AbstractQueuedSynchronizer
- {
- public CountSync()
- {
- setState(0);
- }
-
- public int getCount()
- {
- return getState();
- }
-
- @Override
- public int tryAcquireShared(final int numberOfAqcquires)
- {
- return getState() == 0 ? 1 : -1;
- }
-
- public void add()
- {
- for (;;)
- {
- int actualState = getState();
- int newState = actualState + 1;
- if (compareAndSetState(actualState, newState))
- {
- return;
- }
- }
- }
-
- @Override
- public boolean tryReleaseShared(final int numberOfReleases)
- {
- for (;;)
- {
- int actualState = getState();
- if (actualState == 0)
- {
- return true;
- }
-
- int newState = actualState - numberOfReleases;
-
- if (compareAndSetState(actualState, newState))
- {
- return newState == 0;
- }
- }
- }
- }
-
- private final CountSync control = new CountSync();
-
- public int getCount()
- {
- return control.getCount();
- }
-
- public void up()
- {
- control.add();
- }
-
- public void down()
- {
- control.releaseShared(1);
- }
-
- public void waitCompletion() throws InterruptedException
- {
- control.acquireSharedInterruptibly(1);
- }
-
- public boolean waitCompletion(final long milliseconds) throws InterruptedException
- {
- return control.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(milliseconds));
- }
-}
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-07-22 22:39:51 UTC (rev 9459)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-07-23 04:51:06 UTC (rev 9460)
@@ -39,7 +39,7 @@
import org.hornetq.utils.IDGenerator;
import org.hornetq.utils.SimpleIDGenerator;
import org.hornetq.utils.TimeAndCounterIDGenerator;
-import org.hornetq.utils.VariableLatch;
+import org.hornetq.utils.ReusableLatch;
/**
*
@@ -571,10 +571,10 @@
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
- final VariableLatch reusableLatchDone = new VariableLatch();
- reusableLatchDone.up();
- final VariableLatch reusableLatchWait = new VariableLatch();
- reusableLatchWait.up();
+ final ReusableLatch reusableLatchDone = new ReusableLatch();
+ reusableLatchDone.countUp();
+ final ReusableLatch reusableLatchWait = new ReusableLatch();
+ reusableLatchWait.countUp();
journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
{
@@ -582,11 +582,11 @@
@Override
public void onCompactDone()
{
- reusableLatchDone.down();
+ reusableLatchDone.countDown();
System.out.println("Waiting on Compact");
try
{
- reusableLatchWait.waitCompletion();
+ reusableLatchWait.await();
}
catch (InterruptedException e)
{
@@ -631,7 +631,7 @@
tCompact.start();
- reusableLatchDone.waitCompletion();
+ reusableLatchDone.await();
addTx(appendTX, addedRecord);
@@ -643,7 +643,7 @@
delete(addedRecord);
- reusableLatchWait.down();
+ reusableLatchWait.countDown();
tCompact.join();
@@ -672,10 +672,10 @@
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
- final VariableLatch reusableLatchDone = new VariableLatch();
- reusableLatchDone.up();
- final VariableLatch reusableLatchWait = new VariableLatch();
- reusableLatchWait.up();
+ final ReusableLatch reusableLatchDone = new ReusableLatch();
+ reusableLatchDone.countUp();
+ final ReusableLatch reusableLatchWait = new ReusableLatch();
+ reusableLatchWait.countUp();
journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
{
@@ -683,11 +683,11 @@
@Override
public void onCompactDone()
{
- reusableLatchDone.down();
+ reusableLatchDone.countDown();
System.out.println("Waiting on Compact");
try
{
- reusableLatchWait.waitCompletion();
+ reusableLatchWait.await();
}
catch (InterruptedException e)
{
@@ -732,7 +732,7 @@
tCompact.start();
- reusableLatchDone.waitCompletion();
+ reusableLatchDone.await();
addTx(appendTX, addedRecord);
commit(appendTX);
@@ -745,7 +745,7 @@
commit(deleteTXID);
- reusableLatchWait.down();
+ reusableLatchWait.countDown();
tCompact.join();
@@ -768,10 +768,10 @@
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
- final VariableLatch reusableLatchDone = new VariableLatch();
- reusableLatchDone.up();
- final VariableLatch reusableLatchWait = new VariableLatch();
- reusableLatchWait.up();
+ final ReusableLatch reusableLatchDone = new ReusableLatch();
+ reusableLatchDone.countUp();
+ final ReusableLatch reusableLatchWait = new ReusableLatch();
+ reusableLatchWait.countUp();
journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
{
@@ -779,11 +779,11 @@
@Override
public void onCompactDone()
{
- reusableLatchDone.down();
+ reusableLatchDone.countDown();
System.out.println("Waiting on Compact");
try
{
- reusableLatchWait.waitCompletion();
+ reusableLatchWait.await();
}
catch (InterruptedException e)
{
@@ -829,13 +829,13 @@
tCompact.start();
- reusableLatchDone.waitCompletion();
+ reusableLatchDone.await();
addTx(consumerTX, addedRecord);
commit(consumerTX);
delete(addedRecord);
- reusableLatchWait.down();
+ reusableLatchWait.countDown();
tCompact.join();
@@ -857,10 +857,10 @@
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
- final VariableLatch reusableLatchDone = new VariableLatch();
- reusableLatchDone.up();
- final VariableLatch reusableLatchWait = new VariableLatch();
- reusableLatchWait.up();
+ final ReusableLatch reusableLatchDone = new ReusableLatch();
+ reusableLatchDone.countUp();
+ final ReusableLatch reusableLatchWait = new ReusableLatch();
+ reusableLatchWait.countUp();
journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
{
@@ -868,11 +868,11 @@
@Override
public void onCompactDone()
{
- reusableLatchDone.down();
+ reusableLatchDone.countDown();
System.out.println("Waiting on Compact");
try
{
- reusableLatchWait.waitCompletion();
+ reusableLatchWait.await();
}
catch (InterruptedException e)
{
@@ -915,7 +915,7 @@
tCompact.start();
- reusableLatchDone.waitCompletion();
+ reusableLatchDone.await();
addTx(consumerTX, firstID);
@@ -929,7 +929,7 @@
delete(addedRecord);
- reusableLatchWait.down();
+ reusableLatchWait.countDown();
tCompact.join();
@@ -959,10 +959,10 @@
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
- final VariableLatch reusableLatchDone = new VariableLatch();
- reusableLatchDone.up();
- final VariableLatch reusableLatchWait = new VariableLatch();
- reusableLatchWait.up();
+ final ReusableLatch reusableLatchDone = new ReusableLatch();
+ reusableLatchDone.countUp();
+ final ReusableLatch reusableLatchWait = new ReusableLatch();
+ reusableLatchWait.countUp();
journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
{
@@ -970,11 +970,11 @@
@Override
public void onCompactDone()
{
- reusableLatchDone.down();
+ reusableLatchDone.countDown();
System.out.println("Waiting on Compact");
try
{
- reusableLatchWait.waitCompletion();
+ reusableLatchWait.await();
}
catch (InterruptedException e)
{
@@ -1016,7 +1016,7 @@
tCompact.start();
- reusableLatchDone.waitCompletion();
+ reusableLatchDone.await();
addTx(appendTX, appendTwo);
@@ -1028,7 +1028,7 @@
commit(updateTX);
//delete(appendTwo);
- reusableLatchWait.down();
+ reusableLatchWait.countDown();
tCompact.join();
journal.compact();
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java 2010-07-22 22:39:51 UTC (rev 9459)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java 2010-07-23 04:51:06 UTC (rev 9460)
@@ -21,7 +21,7 @@
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase;
import org.hornetq.utils.SimpleIDGenerator;
-import org.hornetq.utils.VariableLatch;
+import org.hornetq.utils.ReusableLatch;
/**
* A NIORandomCompactTest
@@ -37,9 +37,9 @@
// Attributes ----------------------------------------------------
- private VariableLatch startedCompactingLatch = null;
+ private ReusableLatch startedCompactingLatch = null;
- private VariableLatch releaseCompactingLatch = null;
+ private ReusableLatch releaseCompactingLatch = null;
private Thread tCompact = null;
@@ -55,9 +55,9 @@
tCompact = null;
- startedCompactingLatch = new VariableLatch();
+ startedCompactingLatch = new ReusableLatch();
- releaseCompactingLatch = new VariableLatch();
+ releaseCompactingLatch = new ReusableLatch();
File file = new File(getTestDir());
@@ -107,10 +107,10 @@
@Override
public void onCompactDone()
{
- startedCompactingLatch.down();
+ startedCompactingLatch.countDown();
try
{
- releaseCompactingLatch.waitCompletion();
+ releaseCompactingLatch.await();
}
catch (InterruptedException e)
{
@@ -283,7 +283,7 @@
*/
private void joinCompact() throws InterruptedException
{
- releaseCompactingLatch.down();
+ releaseCompactingLatch.countDown();
tCompact.join();
@@ -315,7 +315,7 @@
tCompact.start();
- startedCompactingLatch.waitCompletion();
+ startedCompactingLatch.await();
}
/* (non-Javadoc)
Copied: trunk/tests/src/org/hornetq/tests/timing/util/ReusableLatchTest.java (from rev 9459, trunk/tests/src/org/hornetq/tests/timing/util/VariableLatchTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/util/ReusableLatchTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/timing/util/ReusableLatchTest.java 2010-07-23 04:51:06 UTC (rev 9460)
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.timing.util;
+
+import junit.framework.Assert;
+
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.ReusableLatch;
+
+/**
+ * @author <a href="csuconic(a)redhat.com">Clebert Suconic</a>
+ */
+public class ReusableLatchTest extends UnitTestCase
+{
+ public void testTimeout() throws Exception
+ {
+ ReusableLatch latch = new ReusableLatch();
+
+ latch.countUp();
+
+ long start = System.currentTimeMillis();
+ Assert.assertFalse(latch.await(1000));
+ long end = System.currentTimeMillis();
+
+ Assert.assertTrue("Timeout didn't work correctly", end - start >= 1000 && end - start < 2000);
+ }
+}
Deleted: trunk/tests/src/org/hornetq/tests/timing/util/VariableLatchTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/util/VariableLatchTest.java 2010-07-22 22:39:51 UTC (rev 9459)
+++ trunk/tests/src/org/hornetq/tests/timing/util/VariableLatchTest.java 2010-07-23 04:51:06 UTC (rev 9460)
@@ -1,37 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.tests.timing.util;
-
-import junit.framework.Assert;
-
-import org.hornetq.tests.util.UnitTestCase;
-import org.hornetq.utils.VariableLatch;
-
-/**
- * @author <a href="csuconic(a)redhat.com">Clebert Suconic</a>
- */
-public class VariableLatchTest extends UnitTestCase
-{
- public void testTimeout() throws Exception
- {
- VariableLatch latch = new VariableLatch();
-
- latch.up();
-
- long start = System.currentTimeMillis();
- Assert.assertFalse(latch.waitCompletion(1000));
- long end = System.currentTimeMillis();
-
- Assert.assertTrue("Timeout didn't work correctly", end - start >= 1000 && end - start < 2000);
- }
-}
Copied: trunk/tests/src/org/hornetq/tests/unit/util/ReusableLatchTest.java (from rev 9459, trunk/tests/src/org/hornetq/tests/unit/util/VariableLatchTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/util/ReusableLatchTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/unit/util/ReusableLatchTest.java 2010-07-23 04:51:06 UTC (rev 9460)
@@ -0,0 +1,312 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.util;
+
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.Assert;
+
+import org.hornetq.core.logging.Logger;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.ReusableLatch;
+
+/**
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
+ *
+ */
+public class ReusableLatchTest extends UnitTestCase
+{
+ private static final Logger log = Logger.getLogger(ReusableLatchTest.class);
+
+ public void testLatchOnSingleThread() throws Exception
+ {
+ ReusableLatch latch = new ReusableLatch();
+
+ for (int i = 1; i <= 100; i++)
+ {
+ latch.countUp();
+ Assert.assertEquals(i, latch.getCount());
+ }
+
+ for (int i = 100; i > 0; i--)
+ {
+ Assert.assertEquals(i, latch.getCount());
+ latch.countDown();
+ Assert.assertEquals(i - 1, latch.getCount());
+ }
+
+ latch.await();
+ }
+
+ /**
+ *
+ * This test will open numberOfThreads threads, and add numberOfAdds on the
+ * ReusableLatch. After those add threads are finished, the latch count should
+ * be numberOfThreads * numberOfAdds (plus the one count held up front). Then it
+ * will open numberOfThreads threads again, releasing numberOfAdds on the
+ * ReusableLatch. After those release threads are finished and the held count is
+ * released, the latch count should be 0 and all the waiting threads should be finished also.
+ *
+ * @throws Exception
+ */
+ public void testLatchOnMultiThread() throws Exception
+ {
+ final ReusableLatch latch = new ReusableLatch();
+
+ latch.countUp(); // We hold at least one, so ThreadWaits won't go away
+
+ final int numberOfThreads = 100;
+ final int numberOfAdds = 100;
+
+ class ThreadWait extends Thread
+ {
+ private volatile boolean waiting = true;
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ if (!latch.await(5000))
+ {
+ ReusableLatchTest.log.error("Latch timed out");
+ }
+ }
+ catch (Exception e)
+ {
+ ReusableLatchTest.log.error(e);
+ }
+ waiting = false;
+ }
+ }
+
+ class ThreadAdd extends Thread
+ {
+ private final CountDownLatch latchReady;
+
+ private final CountDownLatch latchStart;
+
+ ThreadAdd(final CountDownLatch latchReady, final CountDownLatch latchStart)
+ {
+ this.latchReady = latchReady;
+ this.latchStart = latchStart;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ latchReady.countDown();
+ // Everybody should start at the same time, to worsen concurrency
+ // effects
+ latchStart.await();
+ for (int i = 0; i < numberOfAdds; i++)
+ {
+ latch.countUp();
+ }
+ }
+ catch (Exception e)
+ {
+ ReusableLatchTest.log.error(e.getMessage(), e);
+ }
+ }
+ }
+
+ CountDownLatch latchReady = new CountDownLatch(numberOfThreads);
+ CountDownLatch latchStart = new CountDownLatch(1);
+
+ ThreadAdd[] threadAdds = new ThreadAdd[numberOfThreads];
+ ThreadWait waits[] = new ThreadWait[numberOfThreads];
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ threadAdds[i] = new ThreadAdd(latchReady, latchStart);
+ threadAdds[i].start();
+ waits[i] = new ThreadWait();
+ waits[i].start();
+ }
+
+ latchReady.await();
+ latchStart.countDown();
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ threadAdds[i].join();
+ }
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ Assert.assertTrue(waits[i].waiting);
+ }
+
+ Assert.assertEquals(numberOfThreads * numberOfAdds + 1, latch.getCount());
+
+ class ThreadDown extends Thread
+ {
+ private final CountDownLatch latchReady;
+
+ private final CountDownLatch latchStart;
+
+ ThreadDown(final CountDownLatch latchReady, final CountDownLatch latchStart)
+ {
+ this.latchReady = latchReady;
+ this.latchStart = latchStart;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ latchReady.countDown();
+ // Everybody should start at the same time, to worsen concurrency
+ // effects
+ latchStart.await();
+ for (int i = 0; i < numberOfAdds; i++)
+ {
+ latch.countDown();
+ }
+ }
+ catch (Exception e)
+ {
+ ReusableLatchTest.log.error(e.getMessage(), e);
+ }
+ }
+ }
+
+ latchReady = new CountDownLatch(numberOfThreads);
+ latchStart = new CountDownLatch(1);
+
+ ThreadDown down[] = new ThreadDown[numberOfThreads];
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ down[i] = new ThreadDown(latchReady, latchStart);
+ down[i].start();
+ }
+
+ latchReady.await();
+ latchStart.countDown();
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ down[i].join();
+ }
+
+ Assert.assertEquals(1, latch.getCount());
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ Assert.assertTrue(waits[i].waiting);
+ }
+
+ latch.countDown();
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ waits[i].join();
+ }
+
+ Assert.assertEquals(0, latch.getCount());
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ Assert.assertFalse(waits[i].waiting);
+ }
+ }
+
+ public void testReuseLatch() throws Exception
+ {
+ final ReusableLatch latch = new ReusableLatch(5);
+ for (int i = 0 ; i < 5; i++)
+ {
+ latch.countDown();
+ }
+
+ latch.countUp();
+
+ class ThreadWait extends Thread
+ {
+ private volatile boolean waiting = false;
+
+ private volatile Exception e;
+
+ private final CountDownLatch readyLatch = new CountDownLatch(1);
+
+ @Override
+ public void run()
+ {
+ waiting = true;
+ readyLatch.countDown();
+ try
+ {
+ if (!latch.await(1000))
+ {
+ ReusableLatchTest.log.error("Latch timed out!", new Exception("trace"));
+ }
+ }
+ catch (Exception e)
+ {
+ ReusableLatchTest.log.error(e);
+ this.e = e;
+ }
+ waiting = false;
+ }
+ }
+
+ ThreadWait t = new ThreadWait();
+ t.start();
+
+ t.readyLatch.await();
+
+ Assert.assertEquals(true, t.waiting);
+
+ latch.countDown();
+
+ t.join();
+
+ Assert.assertEquals(false, t.waiting);
+
+ Assert.assertNull(t.e);
+
+ latch.countUp();
+
+ t = new ThreadWait();
+ t.start();
+
+ t.readyLatch.await();
+
+ Assert.assertEquals(true, t.waiting);
+
+ latch.countDown();
+
+ t.join();
+
+ Assert.assertEquals(false, t.waiting);
+
+ Assert.assertNull(t.e);
+
+ Assert.assertTrue(latch.await(1000));
+
+ Assert.assertEquals(0, latch.getCount());
+
+ latch.countDown();
+
+ Assert.assertEquals(0, latch.getCount());
+
+ }
+
+}
Deleted: trunk/tests/src/org/hornetq/tests/unit/util/VariableLatchTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/util/VariableLatchTest.java 2010-07-22 22:39:51 UTC (rev 9459)
+++ trunk/tests/src/org/hornetq/tests/unit/util/VariableLatchTest.java 2010-07-23 04:51:06 UTC (rev 9460)
@@ -1,307 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.unit.util;
-
-import java.util.concurrent.CountDownLatch;
-
-import junit.framework.Assert;
-
-import org.hornetq.core.logging.Logger;
-import org.hornetq.tests.util.UnitTestCase;
-import org.hornetq.utils.VariableLatch;
-
-/**
- *
- * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
- *
- */
-public class VariableLatchTest extends UnitTestCase
-{
- private static final Logger log = Logger.getLogger(VariableLatchTest.class);
-
- public void testLatchOnSingleThread() throws Exception
- {
- VariableLatch latch = new VariableLatch();
-
- for (int i = 1; i <= 100; i++)
- {
- latch.up();
- Assert.assertEquals(i, latch.getCount());
- }
-
- for (int i = 100; i > 0; i--)
- {
- Assert.assertEquals(i, latch.getCount());
- latch.down();
- Assert.assertEquals(i - 1, latch.getCount());
- }
-
- latch.waitCompletion();
- }
-
- /**
- *
- * This test will open numberOfThreads threads, and add numberOfAdds on the
- * VariableLatch After those addthreads are finished, the latch count should
- * be numberOfThreads * numberOfAdds Then it will open numberOfThreads
- * threads again releasing numberOfAdds on the VariableLatch After those
- * releaseThreads are finished, the latch count should be 0 And all the
- * waiting threads should be finished also
- *
- * @throws Exception
- */
- public void testLatchOnMultiThread() throws Exception
- {
- final VariableLatch latch = new VariableLatch();
-
- latch.up(); // We hold at least one, so ThreadWaits won't go away
-
- final int numberOfThreads = 100;
- final int numberOfAdds = 100;
-
- class ThreadWait extends Thread
- {
- private volatile boolean waiting = true;
-
- @Override
- public void run()
- {
- try
- {
- if (!latch.waitCompletion(5000))
- {
- VariableLatchTest.log.error("Latch timed out");
- }
- }
- catch (Exception e)
- {
- VariableLatchTest.log.error(e);
- }
- waiting = false;
- }
- }
-
- class ThreadAdd extends Thread
- {
- private final CountDownLatch latchReady;
-
- private final CountDownLatch latchStart;
-
- ThreadAdd(final CountDownLatch latchReady, final CountDownLatch latchStart)
- {
- this.latchReady = latchReady;
- this.latchStart = latchStart;
- }
-
- @Override
- public void run()
- {
- try
- {
- latchReady.countDown();
- // Everybody should start at the same time, to worse concurrency
- // effects
- latchStart.await();
- for (int i = 0; i < numberOfAdds; i++)
- {
- latch.up();
- }
- }
- catch (Exception e)
- {
- VariableLatchTest.log.error(e.getMessage(), e);
- }
- }
- }
-
- CountDownLatch latchReady = new CountDownLatch(numberOfThreads);
- CountDownLatch latchStart = new CountDownLatch(1);
-
- ThreadAdd[] threadAdds = new ThreadAdd[numberOfThreads];
- ThreadWait waits[] = new ThreadWait[numberOfThreads];
-
- for (int i = 0; i < numberOfThreads; i++)
- {
- threadAdds[i] = new ThreadAdd(latchReady, latchStart);
- threadAdds[i].start();
- waits[i] = new ThreadWait();
- waits[i].start();
- }
-
- latchReady.await();
- latchStart.countDown();
-
- for (int i = 0; i < numberOfThreads; i++)
- {
- threadAdds[i].join();
- }
-
- for (int i = 0; i < numberOfThreads; i++)
- {
- Assert.assertTrue(waits[i].waiting);
- }
-
- Assert.assertEquals(numberOfThreads * numberOfAdds + 1, latch.getCount());
-
- class ThreadDown extends Thread
- {
- private final CountDownLatch latchReady;
-
- private final CountDownLatch latchStart;
-
- ThreadDown(final CountDownLatch latchReady, final CountDownLatch latchStart)
- {
- this.latchReady = latchReady;
- this.latchStart = latchStart;
- }
-
- @Override
- public void run()
- {
- try
- {
- latchReady.countDown();
- // Everybody should start at the same time, to worse concurrency
- // effects
- latchStart.await();
- for (int i = 0; i < numberOfAdds; i++)
- {
- latch.down();
- }
- }
- catch (Exception e)
- {
- VariableLatchTest.log.error(e.getMessage(), e);
- }
- }
- }
-
- latchReady = new CountDownLatch(numberOfThreads);
- latchStart = new CountDownLatch(1);
-
- ThreadDown down[] = new ThreadDown[numberOfThreads];
-
- for (int i = 0; i < numberOfThreads; i++)
- {
- down[i] = new ThreadDown(latchReady, latchStart);
- down[i].start();
- }
-
- latchReady.await();
- latchStart.countDown();
-
- for (int i = 0; i < numberOfThreads; i++)
- {
- down[i].join();
- }
-
- Assert.assertEquals(1, latch.getCount());
-
- for (int i = 0; i < numberOfThreads; i++)
- {
- Assert.assertTrue(waits[i].waiting);
- }
-
- latch.down();
-
- for (int i = 0; i < numberOfThreads; i++)
- {
- waits[i].join();
- }
-
- Assert.assertEquals(0, latch.getCount());
-
- for (int i = 0; i < numberOfThreads; i++)
- {
- Assert.assertFalse(waits[i].waiting);
- }
- }
-
- public void testReuseLatch() throws Exception
- {
- final VariableLatch latch = new VariableLatch();
- latch.up();
-
- class ThreadWait extends Thread
- {
- private volatile boolean waiting = false;
-
- private volatile Exception e;
-
- private final CountDownLatch readyLatch = new CountDownLatch(1);
-
- @Override
- public void run()
- {
- waiting = true;
- readyLatch.countDown();
- try
- {
- if (!latch.waitCompletion(1000))
- {
- VariableLatchTest.log.error("Latch timed out!", new Exception("trace"));
- }
- }
- catch (Exception e)
- {
- VariableLatchTest.log.error(e);
- this.e = e;
- }
- waiting = false;
- }
- }
-
- ThreadWait t = new ThreadWait();
- t.start();
-
- t.readyLatch.await();
-
- Assert.assertEquals(true, t.waiting);
-
- latch.down();
-
- t.join();
-
- Assert.assertEquals(false, t.waiting);
-
- Assert.assertNull(t.e);
-
- latch.up();
-
- t = new ThreadWait();
- t.start();
-
- t.readyLatch.await();
-
- Assert.assertEquals(true, t.waiting);
-
- latch.down();
-
- t.join();
-
- Assert.assertEquals(false, t.waiting);
-
- Assert.assertNull(t.e);
-
- Assert.assertTrue(latch.waitCompletion(1000));
-
- Assert.assertEquals(0, latch.getCount());
-
- latch.down();
-
- Assert.assertEquals(0, latch.getCount());
-
- }
-
-}
13 years, 10 months
JBoss hornetq SVN: r9459 - trunk/tests/src/org/hornetq/tests/integration/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-22 18:39:51 -0400 (Thu, 22 Jul 2010)
New Revision: 9459
Modified:
trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
Log:
Adding a new test on compacting
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-07-22 21:07:01 UTC (rev 9458)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-07-22 22:39:51 UTC (rev 9459)
@@ -951,6 +951,96 @@
}
+ public void testCompactAddAndUpdateFollowedByADelete5() throws Exception
+ {
+
+ setup(2, 60 * 1024, false);
+
+
+ SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
+
+ final VariableLatch reusableLatchDone = new VariableLatch();
+ reusableLatchDone.up();
+ final VariableLatch reusableLatchWait = new VariableLatch();
+ reusableLatchWait.up();
+
+ journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
+ {
+
+ @Override
+ public void onCompactDone()
+ {
+ reusableLatchDone.down();
+ System.out.println("Waiting on Compact");
+ try
+ {
+ reusableLatchWait.waitCompletion();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ System.out.println("Done");
+ }
+ };
+
+ journal.setAutoReclaim(false);
+
+ startJournal();
+ load();
+
+ Thread tCompact = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ journal.compact();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+
+ long appendTX = idGen.generateID();
+ long appendOne = idGen.generateID();
+ long appendTwo = idGen.generateID();
+
+ long updateTX = idGen.generateID();
+
+ addTx(appendTX, appendOne);
+
+
+ tCompact.start();
+ reusableLatchDone.waitCompletion();
+
+ addTx(appendTX, appendTwo);
+
+ commit(appendTX);
+
+ updateTx(updateTX, appendOne);
+ updateTx(updateTX, appendTwo);
+
+ commit(updateTX);
+ //delete(appendTwo);
+
+ reusableLatchWait.down();
+ tCompact.join();
+
+ journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ }
+
+
public void testSimpleCompacting() throws Exception
{
setup(2, 60 * 1024, false);
13 years, 10 months
JBoss hornetq SVN: r9458 - trunk/src/main/org/hornetq/core/paging/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-22 17:07:01 -0400 (Thu, 22 Jul 2010)
New Revision: 9458
Modified:
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
Log:
A little tweak to release internal memory as soon as we're done with reading the messages
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-07-22 13:18:50 UTC (rev 9457)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-07-22 21:07:01 UTC (rev 9458)
@@ -616,7 +616,22 @@
page.open();
- List<PagedMessage> messages = page.read();
+ List<PagedMessage> messages = null;
+
+ try
+ {
+ messages = page.read();
+ }
+ finally
+ {
+ try
+ {
+ page.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
if (onDepage(page.getPageId(), storeName, messages))
{
13 years, 10 months
JBoss hornetq SVN: r9457 - in trunk/src/main/org/hornetq: core/paging and 3 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-07-22 09:18:50 -0400 (Thu, 22 Jul 2010)
New Revision: 9457
Modified:
trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
trunk/src/main/org/hornetq/core/paging/PagingStore.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
trunk/src/main/org/hornetq/utils/OrderedExecutorFactory.java
Log:
a few tweaks
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-07-22 13:04:29 UTC (rev 9456)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-07-22 13:18:50 UTC (rev 9457)
@@ -36,7 +36,6 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.SessionFailureListener;
-import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.ChannelHandler;
Modified: trunk/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingStore.java 2010-07-22 13:04:29 UTC (rev 9456)
+++ trunk/src/main/org/hornetq/core/paging/PagingStore.java 2010-07-22 13:18:50 UTC (rev 9457)
@@ -15,7 +15,6 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.server.HornetQComponent;
-import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-07-22 13:04:29 UTC (rev 9456)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-07-22 13:18:50 UTC (rev 9457)
@@ -838,6 +838,7 @@
private void setPagingStore(final ServerMessage message) throws Exception
{
PagingStore store = pagingManager.getPageStore(message.getAddress());
+
message.setPagingStore(store);
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2010-07-22 13:04:29 UTC (rev 9456)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2010-07-22 13:18:50 UTC (rev 9457)
@@ -13,6 +13,8 @@
package org.hornetq.core.remoting.impl.netty;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
@@ -49,27 +51,30 @@
private final ConnectionLifeCycleListener listener;
private final boolean batchingEnabled;
-
+
private final boolean directDeliver;
-
- private HornetQBuffer batchBuffer;
-
- private final Object writeLock = new Object();
+ private volatile HornetQBuffer batchBuffer;
+
+ private final AtomicBoolean writeLock = new AtomicBoolean(false);
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public NettyConnection(final Channel channel, final ConnectionLifeCycleListener listener, boolean batchingEnabled, boolean directDeliver)
+ public NettyConnection(final Channel channel,
+ final ConnectionLifeCycleListener listener,
+ boolean batchingEnabled,
+ boolean directDeliver)
{
this.channel = channel;
this.listener = listener;
this.batchingEnabled = batchingEnabled;
-
+
this.directDeliver = directDeliver;
-
+
listener.connectionCreated(this, ProtocolType.CORE);
}
@@ -127,19 +132,26 @@
// This is called periodically to flush the batch buffer
public void checkFlushBatchBuffer()
{
- synchronized (writeLock)
+ if (!batchingEnabled)
{
- if (!batchingEnabled)
- {
- return;
- }
+ return;
+ }
- if (batchBuffer != null && batchBuffer.readable())
+ if (writeLock.compareAndSet(false, true))
+ {
+ try
{
- channel.write(batchBuffer.channelBuffer());
+ if (batchBuffer != null && batchBuffer.readable())
+ {
+ channel.write(batchBuffer.channelBuffer());
- batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+ batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+ }
}
+ finally
+ {
+ writeLock.set(false);
+ }
}
}
@@ -150,64 +162,71 @@
public void write(HornetQBuffer buffer, final boolean flush, final boolean batched)
{
- synchronized (writeLock)
+ if (writeLock.compareAndSet(false, true))
{
- if (batchBuffer == null && batchingEnabled && batched && !flush)
+ try
{
- // Lazily create batch buffer
-
- batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
- }
-
- if (batchBuffer != null)
- {
- batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
-
- if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush)
+ if (batchBuffer == null && batchingEnabled && batched && !flush)
{
- // If the batch buffer is full or it's flush param or not batched then flush the buffer
+ // Lazily create batch buffer
- buffer = batchBuffer;
+ batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
}
- else
- {
- return;
- }
- if (!batched || flush)
+ if (batchBuffer != null)
{
- batchBuffer = null;
- }
- else
- {
- // Create a new buffer
+ batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
- batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+ if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush)
+ {
+ // If the batch buffer is full or it's flush param or not batched then flush the buffer
+
+ buffer = batchBuffer;
+ }
+ else
+ {
+ return;
+ }
+
+ if (!batched || flush)
+ {
+ batchBuffer = null;
+ }
+ else
+ {
+ // Create a new buffer
+
+ batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
+ }
}
- }
- ChannelFuture future = channel.write(buffer.channelBuffer());
+ ChannelFuture future = channel.write(buffer.channelBuffer());
- if (flush)
- {
- while (true)
+ if (flush)
{
- try
+ while (true)
{
- boolean ok = future.await(10000);
+ try
+ {
+ boolean ok = future.await(10000);
- if (!ok)
+ if (!ok)
+ {
+ NettyConnection.log.warn("Timed out waiting for packet to be flushed");
+ }
+
+ break;
+ }
+ catch (InterruptedException ignore)
{
- NettyConnection.log.warn("Timed out waiting for packet to be flushed");
}
-
- break;
}
- catch (InterruptedException ignore)
- {
- }
}
}
+ finally
+ {
+ writeLock.set(false);
+ }
}
}
@@ -215,7 +234,7 @@
{
return channel.getRemoteAddress().toString();
}
-
+
public boolean isDirectDeliver()
{
return directDeliver;
Modified: trunk/src/main/org/hornetq/utils/OrderedExecutorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/utils/OrderedExecutorFactory.java 2010-07-22 13:04:29 UTC (rev 9456)
+++ trunk/src/main/org/hornetq/utils/OrderedExecutorFactory.java 2010-07-22 13:18:50 UTC (rev 9457)
@@ -16,6 +16,8 @@
import java.util.LinkedList;
import java.util.concurrent.Executor;
+import org.hornetq.core.logging.Logger;
+
/**
* A factory for producing executors that run all tasks in order, which delegate to a single common executor instance.
*
@@ -27,6 +29,8 @@
*/
public final class OrderedExecutorFactory implements ExecutorFactory
{
+ private static final Logger log = Logger.getLogger(OrderedExecutorFactory.class);
+
private final Executor parent;
/**
@@ -97,7 +101,7 @@
}
catch (Throwable t)
{
- // eat it!
+ log.error("Caught unexpected Throwable", t);
}
}
}
13 years, 10 months
JBoss hornetq SVN: r9456 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-07-22 09:04:29 -0400 (Thu, 22 Jul 2010)
New Revision: 9456
Added:
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerFilterTest.java
Modified:
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
https://jira.jboss.org/browse/HORNETQ-444
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-07-22 13:01:25 UTC (rev 9455)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-07-22 13:04:29 UTC (rev 9456)
@@ -56,6 +56,8 @@
/**
* Implementation of a Queue
+ *
+ * Completely non blocking between adding to queue and delivering to consumers.
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="ataylor(a)redhat.com">Andy Taylor</a>
@@ -69,6 +71,8 @@
public static final int REDISTRIBUTOR_BATCH_SIZE = 100;
public static final int NUM_PRIORITIES = 10;
+
+ public static final int MAX_DELIVERIES_IN_LOOP = 1000;
private final long id;
@@ -121,6 +125,8 @@
private int pos;
private final Executor executor;
+
+ private volatile int consumerWithFilterCount;
private static class ConsumerHolder
{
@@ -228,7 +234,7 @@
{
return filter;
}
-
+
public void addLast(final MessageReference ref)
{
addLast(ref, false);
@@ -265,6 +271,11 @@
{
cancelRedistributor();
+ if (consumer.getFilter() != null)
+ {
+ consumerWithFilterCount++;
+ }
+
consumerList.add(new ConsumerHolder(consumer));
consumerSet.add(consumer);
@@ -307,6 +318,11 @@
{
groups.remove(gid);
}
+
+ if (consumer.getFilter() != null)
+ {
+ consumerWithFilterCount--;
+ }
}
public synchronized void addRedistributor(final long delay)
@@ -1088,17 +1104,19 @@
{
return;
}
-
+
int busyCount = 0;
int nullRefCount = 0;
+ int noMatchCount = 0;
+
int size = consumerList.size();
int startPos = pos;
// Deliver at most 1000 messages in one go, to prevent tying this thread up for too long
- int loop = Math.min(messageReferences.size(), 1000);
+ int loop = Math.min(messageReferences.size(), MAX_DELIVERIES_IN_LOOP);
for (int i = 0; i < loop; i++)
{
@@ -1120,6 +1138,11 @@
if (ref == null)
{
nullRefCount++;
+
+ if (holder.iter != null)
+ {
+ noMatchCount++;
+ }
}
else
{
@@ -1207,18 +1230,17 @@
break;
}
- nullRefCount = busyCount = 0;
+ nullRefCount = busyCount = noMatchCount = 0;
}
}
-
- if (messageReferences.size() > 0 && busyCount != size)
+
+ if (messageReferences.size() > 0 && busyCount != size && noMatchCount != size)
{
// More messages to deliver so need to prompt another runner - note we don't
// prompt another one if all consumers are busy
executor.execute(deliverRunner);
}
-
}
/*
@@ -1230,14 +1252,14 @@
{
return false;
}
-
+
if (checkExpired(ref))
{
return true;
}
-
+
int startPos = pos;
-
+
int size = consumerList.size();
while (true)
@@ -1261,7 +1283,7 @@
consumer = groupConsumer;
}
}
-
+
pos++;
if (pos == size)
@@ -1272,12 +1294,12 @@
HandleStatus status = handle(ref, consumer);
if (status == HandleStatus.HANDLED)
- {
+ {
if (groupID != null && groupConsumer == null)
{
groups.put(groupID, consumer);
}
-
+
return true;
}
@@ -1344,9 +1366,11 @@
* unnecessarily queued up
* During delivery toDeliver is decremented before the message is delivered, therefore if it's delivering the last
* message, then we cannot have a situation where this delivery is not prompted and message remains stranded in the
- * queue
+ * queue.
+ * The exception to this is if we have consumers with filters - these will maintain an iterator, so we need to prompt delivery every time
+ * in this case, since there may be many non matching messages already in the queue
*/
- if (refs == 1)
+ if (consumerWithFilterCount > 0 || refs == 1)
{
deliverAsync();
}
Added: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerFilterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerFilterTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerFilterTest.java 2010-07-22 13:04:29 UTC (rev 9456)
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ *
+ * A ConsumerFilterTest
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class ConsumerFilterTest extends ServiceTestBase
+{
+ private static final Logger log = Logger.getLogger(ConsumerFilterTest.class);
+
+ private HornetQServer server;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ server = createServer(false);
+
+ server.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+
+ server = null;
+
+ super.tearDown();
+ }
+
+ public void testNonMatchingMessagesFollowedByMatchingMessages() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession session = sf.createSession();
+
+ session.start();
+
+ session.createQueue("foo", "foo");
+
+ ClientProducer producer = session.createProducer("foo");
+
+ ClientConsumer consumer = session.createConsumer("foo", "animal='giraffe'");
+
+ ClientMessage message = session.createMessage(false);
+
+ message.putStringProperty("animal", "hippo");
+
+ producer.send(message);
+
+ assertNull(consumer.receive(500));
+
+ message = session.createMessage(false);
+
+ message.putStringProperty("animal", "giraffe");
+
+ log.info("sending second msg");
+
+ producer.send(message);
+
+ ClientMessage received = consumer.receive(500);
+
+ assertNotNull(received);
+
+ assertEquals("giraffe", received.getStringProperty("animal"));
+
+ assertNull(consumer.receive(500));
+
+ session.close();
+ }
+
+ public void testNonMatchingMessagesFollowedByMatchingMessagesMany() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession session = sf.createSession();
+
+ session.start();
+
+ session.createQueue("foo", "foo");
+
+ ClientProducer producer = session.createProducer("foo");
+
+ ClientConsumer consumer = session.createConsumer("foo", "animal='giraffe'");
+
+ for (int i = 0; i < QueueImpl.MAX_DELIVERIES_IN_LOOP * 2; i++)
+ {
+
+ ClientMessage message = session.createMessage(false);
+
+ message.putStringProperty("animal", "hippo");
+
+ producer.send(message);
+ }
+
+ assertNull(consumer.receive(500));
+
+ for (int i = 0; i < QueueImpl.MAX_DELIVERIES_IN_LOOP * 2; i++)
+ {
+ ClientMessage message = session.createMessage(false);
+
+ message.putStringProperty("animal", "giraffe");
+
+ producer.send(message);
+ }
+
+ for (int i = 0; i < QueueImpl.MAX_DELIVERIES_IN_LOOP * 2; i++)
+ {
+ ClientMessage received = consumer.receive(500);
+
+ assertNotNull(received);
+
+ assertEquals("giraffe", received.getStringProperty("animal"));
+ }
+
+ assertNull(consumer.receive(500));
+
+ session.close();
+ }
+}
13 years, 10 months
JBoss hornetq SVN: r9455 - in branches/2_2_0_HA_Improvements/src/main/org/hornetq: core/client/impl and 4 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-07-22 09:01:25 -0400 (Thu, 22 Jul 2010)
New Revision: 9455
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HA refactoring
* fix receiving list of initial connectors when using static connectors
* clean up ServerLocator interface and move topology-related methods to ServerLocatorInternal
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java 2010-07-22 12:59:20 UTC (rev 9454)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java 2010-07-22 13:01:25 UTC (rev 9455)
@@ -15,7 +15,6 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
@@ -656,14 +655,6 @@
* Closes this factory and release all its resources
*/
void close();
-
- void registerTopologyListener(ClusterTopologyListener listener);
-
- void unregisterTopologyListener(ClusterTopologyListener listener);
-
- void notifyNodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
-
- void notifyNodeDown(String nodeID);
boolean isHA();
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-07-22 12:59:20 UTC (rev 9454)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-07-22 13:01:25 UTC (rev 9455)
@@ -1160,7 +1160,7 @@
}
else
{
- serverLocator.notifyNodeUP(topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
+ serverLocator.notifyNodeUp(topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
}
}
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-07-22 12:59:20 UTC (rev 9454)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-07-22 13:01:25 UTC (rev 9455)
@@ -297,16 +297,6 @@
discoveryGroup.start();
}
-
- if (initialConnectors != null)
- {
- System.out.println(">>>>>>>> Static initial connectors = " + Arrays.asList(initialConnectors));
- for (int i = 0; i < initialConnectors.length; i++)
- {
- // FIXME and now what do I do?
- TransportConfiguration connector = initialConnectors[i];
- }
- }
readOnly = true;
}
@@ -432,6 +422,36 @@
initialise();
}
+ public void connect()
+ {
+ if (initialConnectors != null)
+ {
+ for (TransportConfiguration connector : initialConnectors)
+ {
+ ClientSessionFactory sf = null;
+ do
+ {
+ try
+ {
+ sf = createSessionFactory(connector);
+ }
+ catch (HornetQException e)
+ {
+ if (e.getCode() == HornetQException.NOT_CONNECTED)
+ {
+ continue;
+ }
+ }
+ catch (Exception e)
+ {
+ break;
+ }
+ }
+ while (sf == null);
+ }
+ }
+ }
+
public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
{
if (closed)
@@ -1069,7 +1089,7 @@
}
}
- public synchronized void notifyNodeUP(final String nodeID,
+ public synchronized void notifyNodeUp(final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
{
@@ -1125,7 +1145,7 @@
{
this.initialConnectors[count++] = entry.getConnector();
- notifyNodeUP(entry.getNodeID(), new Pair<TransportConfiguration, TransportConfiguration>(entry.getConnector(), null), true);
+ notifyNodeUp(entry.getNodeID(), new Pair<TransportConfiguration, TransportConfiguration>(entry.getConnector(), null), true);
}
System.out.println(">>>>>>>> Discovered initial connectors= " + Arrays.asList(initialConnectors));
@@ -1146,12 +1166,12 @@
}
}
- public void registerTopologyListener(final ClusterTopologyListener listener)
+ public void addClusterTopologyListener(final ClusterTopologyListener listener)
{
topologyListeners.add(listener);
}
- public void unregisterTopologyListener(final ClusterTopologyListener listener)
+ public void removeClusterTopologyListener(final ClusterTopologyListener listener)
{
topologyListeners.remove(listener);
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-07-22 12:59:20 UTC (rev 9454)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-07-22 13:01:25 UTC (rev 9455)
@@ -13,8 +13,10 @@
package org.hornetq.core.client.impl;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.ServerLocator;
/**
@@ -34,4 +36,13 @@
void setNodeID(String nodeID);
+ void connect();
+
+ void addClusterTopologyListener(ClusterTopologyListener listener);
+
+ void removeClusterTopologyListener(ClusterTopologyListener listener);
+
+ void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
+
+ void notifyNodeDown(String nodeID);
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-07-22 12:59:20 UTC (rev 9454)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-07-22 13:01:25 UTC (rev 9455)
@@ -30,7 +30,6 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
import org.hornetq.core.remoting.CloseListener;
@@ -123,13 +122,13 @@
final boolean isCC = msg.isClusterConnection();
- server.getClusterManager().registerTopologyListener(listener, isCC);
+ server.getClusterManager().addClusterTopologyListener(listener, isCC);
rc.addCloseListener(new CloseListener()
{
public void connectionClosed()
{
- server.getClusterManager().unregisterTopologyListener(listener, isCC);
+ server.getClusterManager().removeClusterTopologyListener(listener, isCC);
}
});
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2010-07-22 12:59:20 UTC (rev 9454)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2010-07-22 13:01:25 UTC (rev 9455)
@@ -53,4 +53,7 @@
Pair<TransportConfiguration, TransportConfiguration>[] getTopology();
TransportConfiguration getConnector();
+
+ // for debug
+ String description();
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-07-22 12:59:20 UTC (rev 9454)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-07-22 13:01:25 UTC (rev 9455)
@@ -16,9 +16,7 @@
import java.util.Map;
import java.util.Set;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.core.server.HornetQComponent;
@@ -40,13 +38,9 @@
Set<BroadcastGroup> getBroadcastGroups();
- void notifyNodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
+ void addClusterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
- void notifyNodeDown(String nodeID);
-
- void registerTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
+ void removeClusterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
- void unregisterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
-
void activate();
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-07-22 12:59:20 UTC (rev 9454)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-07-22 13:01:25 UTC (rev 9455)
@@ -485,7 +485,7 @@
do
{
- BridgeImpl.log.info("Connecting bridge " + name + " to its destination");
+ BridgeImpl.log.info("Connecting bridge " + name + " to its destination [" + nodeUUID.toString() + "]");
try
{
@@ -510,7 +510,7 @@
queue.addConsumer(BridgeImpl.this);
queue.deliverAsync();
- BridgeImpl.log.info("Bridge " + name + " is connected to its destination");
+ BridgeImpl.log.info("Bridge " + name + " is connected [" + nodeUUID + "-> " + name +"]");
return true;
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-07-22 12:59:20 UTC (rev 9454)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-07-22 13:01:25 UTC (rev 9455)
@@ -164,9 +164,18 @@
return;
}
- serverLocator.registerTopologyListener(this);
+ serverLocator.addClusterTopologyListener(this);
serverLocator.start();
+ // FIXME Ugly ugly code to connect to other nodes and form the cluster... :(
+ server.getExecutorFactory().getExecutor().execute(new Runnable()
+ {
+ public void run()
+ {
+ serverLocator.connect();
+ }
+ });
+
started = true;
if (managementService != null)
@@ -187,7 +196,7 @@
return;
}
- serverLocator.unregisterTopologyListener(this);
+ serverLocator.removeClusterTopologyListener(this);
for (MessageFlowRecord record : records.values())
{
@@ -334,7 +343,6 @@
final Queue queue,
final boolean start) throws Exception
{
- System.out.println("ClusterConnectionImpl.createNewRecord() " + connector);
MessageFlowRecordImpl record = new MessageFlowRecordImpl(queue);
Bridge bridge = new ClusterConnectionBridge(serverLocator,
@@ -801,4 +809,18 @@
{
return records;
}
+
+ public String description()
+ {
+ String out = name + " connected to\n";
+ for (Entry<String, MessageFlowRecord> messageFlow : records.entrySet())
+ {
+ String nodeID = messageFlow.getKey();
+ Bridge bridge = messageFlow.getValue().getBridge();
+
+ out += "\t" + nodeID + " -- " + bridge.isStarted() + "\n";
+ }
+
+ return out;
+ }
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-07-22 12:59:20 UTC (rev 9454)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-07-22 13:01:25 UTC (rev 9455)
@@ -91,6 +91,13 @@
private final boolean clustered;
+ // FIXME why do we distinguish between client listeners and cluster connection listeners?
+ // They are both notified at the same time...
+ private Set<ClusterTopologyListener> clientListeners = new ConcurrentHashSet<ClusterTopologyListener>();
+ private Set<ClusterTopologyListener> clusterConnectionListeners = new ConcurrentHashSet<ClusterTopologyListener>();
+
+ private Map<String, Pair<TransportConfiguration, TransportConfiguration>> topology = new HashMap<String, Pair<TransportConfiguration,TransportConfiguration>>();
+
public ClusterManagerImpl(final ExecutorFactory executorFactory,
final HornetQServer server,
final PostOffice postOffice,
@@ -221,13 +228,7 @@
return clusterConnections.get(name.toString());
}
- private Set<ClusterTopologyListener> clientListeners = new ConcurrentHashSet<ClusterTopologyListener>();
-
- private Set<ClusterTopologyListener> clusterConnectionListeners = new ConcurrentHashSet<ClusterTopologyListener>();
-
- private Map<String, Pair<TransportConfiguration, TransportConfiguration>> topology = new HashMap<String, Pair<TransportConfiguration,TransportConfiguration>>();
-
- public synchronized void registerTopologyListener(final ClusterTopologyListener listener,
+ public synchronized void addClusterTopologyListener(final ClusterTopologyListener listener,
final boolean clusterConnection)
{
if (clusterConnection)
@@ -248,7 +249,7 @@
}
}
- public synchronized void unregisterTopologyListener(final ClusterTopologyListener listener,
+ public synchronized void removeClusterTopologyListener(final ClusterTopologyListener listener,
final boolean clusterConnection)
{
if (clusterConnection)
@@ -325,28 +326,6 @@
}
- public synchronized void notifyNodeDown(final String nodeID)
- {
- topology.remove(nodeID);
-
- for (ClusterTopologyListener listener : clientListeners)
- {
- listener.nodeDown(nodeID);
- }
- }
-
- public synchronized void notifyNodeUP(final String nodeID,
- final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- final boolean last)
- {
- topology.put(nodeID, connectorPair);
-
- for (ClusterTopologyListener listener : clientListeners)
- {
- listener.nodeUP(nodeID, connectorPair, false);
- }
- }
-
private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration config) throws Exception
{
if (broadcastGroups.containsKey(config.getName()))
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-07-22 12:59:20 UTC (rev 9454)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-07-22 13:01:25 UTC (rev 9455)
@@ -146,7 +146,7 @@
private volatile SimpleString nodeID;
private volatile UUID uuid;
-
+
private final Version version;
private final HornetQSecurityManager securityManager;
@@ -736,7 +736,7 @@
started = true;
- HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " started");
+ HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " [" + nodeID + "] started");
if (configuration.isBackup())
{
@@ -909,7 +909,7 @@
backupActivationThread.join();
}
- HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " stopped");
+ HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " [" + nodeID + "] stopped");
Logger.reset();
}
@@ -1586,12 +1586,11 @@
// when the cluster manager is started, it will form a cluster -> other nodes will then create bridges
// to connect to this server. If the remoting service is not started before, the connection will fail
// and the cluster will not be formed...
+ initialised = true;
+
remotingService.start();
clusterManager.start();
-
- initialised = true;
-
}
/**
13 years, 10 months
JBoss hornetq SVN: r9454 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-07-22 08:59:20 -0400 (Thu, 22 Jul 2010)
New Revision: 9454
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
HA refactoring
* fix clustered tests configuration
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-07-22 04:54:11 UTC (rev 9453)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-07-22 12:59:20 UTC (rev 9454)
@@ -14,6 +14,7 @@
package org.hornetq.tests.integration.cluster.distribution;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -40,11 +41,15 @@
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.QueueBinding;
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.core.server.cluster.ClusterConnection;
+import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.cluster.RemoteQueueBinding;
import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
@@ -737,6 +742,26 @@
}
}
}
+
+ protected String clusterDescription(HornetQServer server)
+ {
+ String br = "-------------------------\n";
+ String out = br;
+ out += "HornetQ server " + server.getNodeID() + "\n";
+ ClusterManager clusterManager = server.getClusterManager();
+ if (clusterManager == null)
+ {
+ out += "N/A";
+ }
+ else
+ {
+ for (ClusterConnection cc : clusterManager.getClusterConnections())
+ {
+ out += cc.description() + "\n";
+ }
+ }
+ return out + br;
+ }
protected void verifyReceiveAll(final boolean ack, final int numMessages, final int... consumerIDs) throws Exception
{
@@ -1377,7 +1402,35 @@
return params;
}
+
+ protected static TransportConfiguration createTransportConfiguration(boolean netty, boolean acceptor, Map<String, Object> params)
+ {
+ String className;
+ if (netty)
+ {
+ if (acceptor)
+ {
+ className = NettyAcceptorFactory.class.getName();
+ }
+ else
+ {
+ className = NettyConnectorFactory.class.getName();
+ }
+ } else
+ {
+ if (acceptor)
+ {
+ className = InVMAcceptorFactory.class.getName();
+ }
+ else
+ {
+ className = InVMConnectorFactory.class.getName();
+ }
+ }
+ return new TransportConfiguration(className, params);
+ }
+
protected void clearServer(final int... nodes)
{
for (int i = 0; i < nodes.length; i++)
@@ -1413,27 +1466,13 @@
{
throw new IllegalStateException("No server at node " + nodeFrom);
}
+
+ TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
+ serverFrom.getConfiguration().getConnectorConfigurations().put(name, connectorFrom);
- // Map<String, TransportConfiguration> connectors = serviceFrom
- // .getConfiguration()
- // .getConnectorConfigurations();
-
- Map<String, Object> params = generateParams(nodeTo, netty);
-
- TransportConfiguration serverTotc;
-
- if (netty)
- {
- serverTotc = new TransportConfiguration(ServiceTestBase.NETTY_CONNECTOR_FACTORY, params);
- }
- else
- {
- serverTotc = new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY, params);
- }
-
+ TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(nodeTo, netty));
serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc);
-
List<String> pairs = new ArrayList<String>();
pairs.add(serverTotc.getName());
@@ -1449,6 +1488,7 @@
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
+
protected void setupClusterConnection(final String name,
final String address,
final boolean forwardWhenNoConsumers,
@@ -1464,35 +1504,20 @@
throw new IllegalStateException("No server at node " + nodeFrom);
}
- Map<String, TransportConfiguration> connectors = serverFrom.getConfiguration().getConnectorConfigurations();
-
+ TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
+ serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom);
+
List<String> pairs = new ArrayList<String>();
- TransportConfiguration configuration = serverFrom.getConfiguration().getAcceptorConfigurations().iterator().next();
- String connectorName = configuration.getName();
- connectors.put(connectorName, configuration);
-
for (int element : nodesTo)
{
- Map<String, Object> params = generateParams(element, netty);
-
- TransportConfiguration serverTotc;
- if (netty)
- {
- serverTotc = new TransportConfiguration(ServiceTestBase.NETTY_CONNECTOR_FACTORY, params);
- }
- else
- {
- serverTotc = new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY, params);
- }
-
- connectors.put(serverTotc.getName(), serverTotc);
-
+ TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty));
+ serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc);
pairs.add(serverTotc.getName());
}
ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
address,
- connectorName,
+ connectorFrom.getName(),
250,
true,
forwardWhenNoConsumers,
@@ -1588,6 +1613,9 @@
throw new IllegalStateException("No server at node " + node);
}
+ TransportConfiguration connectorConfig = createTransportConfiguration(netty, false, generateParams(node, netty));
+ server.getConfiguration().getConnectorConfigurations().put(name, connectorConfig);
+
ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
address,
name,
13 years, 10 months