Author: clebert.suconic(a)jboss.com
Date: 2011-05-28 12:34:41 -0400 (Sat, 28 May 2011)
New Revision: 10752
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/utils/PriorityLinkedListImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/list/impl/PriorityLinkedListTestBase.java
Log:
https://issues.jboss.org/browse/HORNETQ-705 - fixing queue iterations with multiple
priorities
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java 2011-05-28
05:06:01 UTC (rev 10751)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java 2011-05-28
16:34:41 UTC (rev 10752)
@@ -44,8 +44,6 @@
void activate();
- void setQueue(Queue queue);
-
void setNotificationService(NotificationService notificationService);
RemotingConnection getForwardingConnection();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-05-28
05:06:01 UTC (rev 10751)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-05-28
16:34:41 UTC (rev 10752)
@@ -64,6 +64,8 @@
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(BridgeImpl.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
// Attributes ----------------------------------------------------
@@ -77,7 +79,7 @@
private final SimpleString name;
- private Queue queue;
+ private final Queue queue;
protected final Executor executor;
@@ -222,6 +224,8 @@
}
}
+ log.info("Bridge " + this.name + " being stopped");
+
stopping = true;
executor.execute(new StopRunnable());
@@ -266,11 +270,6 @@
return queue;
}
- public void setQueue(final Queue queue)
- {
- this.queue = queue;
- }
-
public Filter getFilter()
{
return filter;
@@ -367,20 +366,25 @@
{
return HandleStatus.NO_MATCH;
}
-
+
synchronized (this)
{
if (!active)
{
+ log.debug(name + "::Ignoring reference on bridge as it is set to iniactive ref=" + ref);
return HandleStatus.BUSY;
}
+ if (isTrace)
+ {
+ log.trace("Bridge " + name + " is handling reference=" +
ref);
+ }
ref.handled();
ServerMessage message = ref.getMessage();
refs.add(ref);
-
+
message = beforeForward(message);
SimpleString dest;
@@ -419,11 +423,13 @@
public void connectionFailed(final HornetQException me, boolean failedOver)
{
+ log.warn(name + "::Connection failed with failedOver=" + failedOver,
me);
fail(false);
}
public void beforeReconnect(final HornetQException exception)
{
+ log.warn(name + "::Connection failed before reconnect ", exception);
fail(true);
}
@@ -454,8 +460,11 @@
// we want to cancel all unacked refs so they get resent
// duplicate detection will ensure no dups are routed on the other side
+ log.debug(name + "::BridgeImpl::fail being called, beforeReconnect=" +
beforeReconnect);
+
if (session.getConnection().isDestroyed())
{
+ log.debug(name + "::Connection is destroyed, active = false now");
active = false;
}
@@ -467,7 +476,7 @@
{
synchronized (this)
{
- active = false;
+ log.debug(name + "::Connection is destroyed, active = false now");
}
cancelRefs();
@@ -476,6 +485,7 @@
{
afterConnect();
+ log.debug(name + "::After reconnect, setting active=true now");
active = true;
if (queue != null)
@@ -650,6 +660,8 @@
{
return;
}
+
+ log.debug("Closing Session for bridge " +
BridgeImpl.this.name);
if (session != null)
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-05-28
05:06:01 UTC (rev 10751)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-05-28
16:34:41 UTC (rev 10752)
@@ -16,8 +16,12 @@
import static org.hornetq.api.core.management.NotificationType.CONSUMER_CLOSED;
import static org.hornetq.api.core.management.NotificationType.CONSUMER_CREATED;
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
@@ -29,7 +33,6 @@
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ServerLocatorInternal;
-import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
@@ -272,6 +275,8 @@
{
serverLocator.removeClusterTopologyListener(this);
}
+
+ log.debug("Cluster connection being stopped for node" + nodeUUID);
synchronized (this)
{
@@ -357,6 +362,8 @@
serverLocator.setBackup(server.getConfiguration().isBackup());
serverLocator.setInitialConnectAttempts(-1);
serverLocator.setConfirmationWindowSize(0);
+ serverLocator.setBlockOnDurableSend(false);
+ serverLocator.setBlockOnNonDurableSend(false);
if(retryInterval > 0)
{
@@ -388,6 +395,7 @@
public synchronized void nodeDown(final String nodeID)
{
+ log.debug("node " + nodeID + " being considered down on cluster connection for nodeID=" + nodeUUID);
if (nodeID.equals(nodeUUID.toString()))
{
return;
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-05-28
05:06:01 UTC (rev 10751)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-05-28
16:34:41 UTC (rev 10752)
@@ -22,7 +22,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
@@ -93,7 +92,7 @@
// regular client listeners to be notified of cluster topology changes.
// they correspond to regular clients using a HA ServerLocator
private Set<ClusterTopologyListener> clientListeners = new
ConcurrentHashSet<ClusterTopologyListener>();
-
+
// cluster connections listeners to be notified of cluster topology changes
// they correspond to cluster connections on *other nodes connected to this one*
private Set<ClusterTopologyListener> clusterConnectionListeners = new
ConcurrentHashSet<ClusterTopologyListener>();
@@ -103,6 +102,7 @@
private volatile ServerLocatorInternal backupServerLocator;
private final List<ServerLocatorInternal> clusterLocators = new
ArrayList<ServerLocatorInternal>();
+
private Executor executor;
public ClusterManagerImpl(final ExecutorFactory executorFactory,
@@ -173,11 +173,10 @@
{
announceNode();
}
-
+
started = true;
}
-
public synchronized void stop() throws Exception
{
if (!started)
@@ -200,7 +199,7 @@
clusterConnection.stop();
managementService.unregisterCluster(clusterConnection.getName().toString());
}
-
+
clusterConnectionListeners.clear();
clientListeners.clear();
clusterConnections.clear();
@@ -216,7 +215,7 @@
bridges.clear();
- if(backupServerLocator != null)
+ if (backupServerLocator != null)
{
backupServerLocator.close();
backupServerLocator = null;
@@ -238,7 +237,7 @@
}
boolean removed = topology.removeMember(nodeID);
-
+
if (removed)
{
@@ -262,23 +261,23 @@
TopologyMember member = new TopologyMember(connectorPair);
boolean updated = topology.addMember(nodeID, member);
- if(!updated)
+ if (!updated)
{
return;
}
-
+
for (ClusterTopologyListener listener : clientListeners)
{
listener.nodeUP(nodeID, member.getConnector(), last);
}
-
for (ClusterTopologyListener listener : clusterConnectionListeners)
{
listener.nodeUP(nodeID, member.getConnector(), last);
}
- //if this is a node being announced we are hearing it direct from the nodes CM so need to inform our cluster connections.
+ // if this is a node being announced we are hearing it direct from the nodes CM so need to inform our cluster
+ // connections.
if (nodeAnnounce)
{
for (ClusterConnection clusterConnection : clusterConnections.values())
@@ -287,7 +286,7 @@
}
}
}
-
+
public boolean isStarted()
{
return started;
@@ -313,8 +312,7 @@
return clusterConnections.get(name.toString());
}
- public void addClusterTopologyListener(final ClusterTopologyListener listener,
- final boolean clusterConnection)
+ public void addClusterTopologyListener(final ClusterTopologyListener listener, final
boolean clusterConnection)
{
synchronized (this)
{
@@ -333,7 +331,7 @@
}
public synchronized void removeClusterTopologyListener(final ClusterTopologyListener
listener,
- final boolean clusterConnection)
+ final boolean
clusterConnection)
{
if (clusterConnection)
{
@@ -349,9 +347,9 @@
{
return topology;
}
-
+
// backup node becomes live
- public synchronized void activate()
+ public synchronized void activate()
{
if (backup)
{
@@ -360,7 +358,7 @@
String nodeID = server.getNodeID().toString();
TopologyMember member = topology.getMember(nodeID);
- //we swap the topology backup now = live
+ // we swap the topology backup now = live
if (member != null)
{
member.getConnector().a = member.getConnector().b;
@@ -368,9 +366,9 @@
member.getConnector().b = null;
}
- if(backupServerLocator != null)
+ if (backupServerLocator != null)
{
- //todo we could use the topology of this to preempt it arriving from the cc
+ // todo we could use the topology of this to preempt it arriving from the cc
try
{
backupServerLocator.close();
@@ -434,7 +432,7 @@
public void announceBackup() throws Exception
{
List<ClusterConnectionConfiguration> configs =
this.configuration.getClusterConfigurations();
- if(!configs.isEmpty())
+ if (!configs.isEmpty())
{
ClusterConnectionConfiguration config = configs.get(0);
@@ -442,8 +440,7 @@
if (connector == null)
{
- log.warn("No connecor with name '" + config.getConnectorName()
+
- "'. backup cannot be announced.");
+ log.warn("No connecor with name '" + config.getConnectorName()
+ "'. backup cannot be announced.");
return;
}
announceBackup(config, connector);
@@ -469,11 +466,13 @@
{
if (backup)
{
- member = new TopologyMember(new Pair<TransportConfiguration,
TransportConfiguration>(null, cc.getConnector()));
+ member = new TopologyMember(new Pair<TransportConfiguration,
TransportConfiguration>(null,
+
cc.getConnector()));
}
else
{
- member = new TopologyMember(new Pair<TransportConfiguration,
TransportConfiguration>(cc.getConnector(), null));
+ member = new TopologyMember(new Pair<TransportConfiguration,
TransportConfiguration>(cc.getConnector(),
+
null));
}
topology.addMember(nodeID, member);
@@ -482,11 +481,11 @@
{
if (backup)
{
- // pair.b = cc.getConnector();
+ // pair.b = cc.getConnector();
}
else
{
- // pair.a = cc.getConnector();
+ // pair.a = cc.getConnector();
}
}
@@ -496,7 +495,7 @@
{
listener.nodeUP(nodeID, member.getConnector(), false);
}
-
+
for (ClusterTopologyListener listener : clusterConnectionListeners)
{
listener.nodeUP(nodeID, member.getConnector(), false);
@@ -685,6 +684,8 @@
serverLocator.setRetryIntervalMultiplier(config.getRetryIntervalMultiplier());
serverLocator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod());
serverLocator.setInitialConnectAttempts(config.getReconnectAttempts());
+ serverLocator.setBlockOnDurableSend(false);
+ serverLocator.setBlockOnNonDurableSend(false);
clusterLocators.add(serverLocator);
Bridge bridge = new BridgeImpl(serverLocator,
nodeUUID,
@@ -720,7 +721,7 @@
managementService.unregisterBridge(name);
}
}
-
+
private synchronized void deployClusterConnection(final ClusterConnectionConfiguration
config) throws Exception
{
if (config.getName() == null)
@@ -746,10 +747,10 @@
return;
}
-
- if(clusterConnections.containsKey(config.getName()))
+ if (clusterConnections.containsKey(config.getName()))
{
- log.warn("Cluster Configuration '" + config.getConnectorName() +
"' already exists. The cluster connection will not be deployed.", new
Exception ("trace"));
+ log.warn("Cluster Configuration '" + config.getConnectorName() +
+ "' already exists. The cluster connection will not be deployed.", new Exception("trace"));
return;
}
@@ -788,7 +789,8 @@
}
else
{
- TransportConfiguration[] tcConfigs = config.getStaticConnectors() != null?
connectorNameListToArray(config.getStaticConnectors()):null;
+ TransportConfiguration[] tcConfigs = config.getStaticConnectors() != null ?
connectorNameListToArray(config.getStaticConnectors())
+ :
null;
clusterConnection = new ClusterConnectionImpl(tcConfigs,
connector,
@@ -816,8 +818,8 @@
clusterConnections.put(config.getName(), clusterConnection);
clusterConnection.start();
-
- if(backup)
+
+ if (backup)
{
announceBackup(config, connector);
}
@@ -860,13 +862,15 @@
ClientSessionFactory backupSessionFactory =
backupServerLocator.connect();
if (backupSessionFactory != null)
{
- backupSessionFactory.getConnection().getChannel(0, -1).send(new
NodeAnnounceMessage(nodeUUID.toString(), true, connector));
+ backupSessionFactory.getConnection()
+ .getChannel(0, -1)
+ .send(new NodeAnnounceMessage(nodeUUID.toString(),
true, connector));
log.info("backup announced");
}
}
catch (Exception e)
{
- log.warn("Unable to announce backup", e);
+ log.warn("Unable to announce backup", e);
}
}
});
@@ -892,7 +896,8 @@
}
return transformer;
}
- //for testing
+
+ // for testing
public void clear()
{
bridges.clear();
@@ -904,7 +909,7 @@
}
catch (Exception e)
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ e.printStackTrace();
}
}
clusterConnections.clear();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-05-28
05:06:01 UTC (rev 10751)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-05-28
16:34:41 UTC (rev 10752)
@@ -393,7 +393,7 @@
{
return;
}
-
+
queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
concurrentQueue.add(ref);
@@ -2055,7 +2055,7 @@
}
catch (Exception e)
{
- QueueImpl.log.warn("Unable to decrement reference counting", e);
+ QueueImpl.log.warn("Unable to decrement reference counting", e);
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/utils/PriorityLinkedListImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/utils/PriorityLinkedListImpl.java 2011-05-28
05:06:01 UTC (rev 10751)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/utils/PriorityLinkedListImpl.java 2011-05-28
16:34:41 UTC (rev 10752)
@@ -43,6 +43,8 @@
private int highestPriority = -1;
+ private int lastPriority = -1;
+
public PriorityLinkedListImpl(final int priorities)
{
this.priorities = priorities;
@@ -54,14 +56,25 @@
levels[i] = new LinkedListImpl<T>();
}
}
-
- private void checkHighest(int priority)
+
+ private void checkHighest(final int priority)
{
+ if (lastPriority != priority || priority > highestPriority)
+ {
+ lastPriority = priority;
+ if (lastReset == Integer.MAX_VALUE)
+ {
+ lastReset = 0;
+ }
+ else
+ {
+ lastReset++;
+ }
+ }
+
if (priority > highestPriority)
{
highestPriority = priority;
-
- lastReset++;
}
}
@@ -150,19 +163,20 @@
{
private int index;
- private LinkedListIterator<T>[] cachedIters = new
LinkedListIterator[levels.length];
+ private final LinkedListIterator<T>[] cachedIters = new
LinkedListIterator[levels.length];
private LinkedListIterator<T> lastIter;
-
+
private int resetCount = lastReset;
-
+
volatile boolean closed = false;
PriorityLinkedListIterator()
{
index = levels.length - 1;
}
-
+
+ @Override
protected void finalize()
{
close();
@@ -184,7 +198,7 @@
{
closed = true;
lastIter = null;
-
+
for (LinkedListIterator<T> iter : cachedIters)
{
if (iter != null)
@@ -194,13 +208,13 @@
}
}
}
-
+
private void checkReset()
{
- if (lastReset > resetCount)
+ if (lastReset != resetCount)
{
index = highestPriority;
-
+
resetCount = lastReset;
}
}
@@ -208,7 +222,7 @@
public boolean hasNext()
{
checkReset();
-
+
while (index >= 0)
{
lastIter = cachedIters[index];
@@ -255,10 +269,17 @@
}
lastIter.remove();
-
- if (index == highestPriority && levels[index].size() == 0)
+
+ // This next statement would be the equivalent of:
+ // if (index == highestPriority && levels[index].size() == 0)
+ // However we have to keep checking all the previous levels
+ // otherwise we would cache a max that will not exist
+ // what would make us eventually having hasNext() returning false
+ // as a bug
+ // Part of the fix for HORNETQ-705
+ for (int i = index; i >= 0 && levels[index].size() == 0; i--)
{
- highestPriority--;
+ highestPriority = i;
}
size--;
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-05-28
05:06:01 UTC (rev 10751)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-05-28
16:34:41 UTC (rev 10752)
@@ -17,6 +17,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
@@ -42,6 +45,7 @@
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.cluster.impl.BridgeImpl;
import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.LinkedListIterator;
@@ -947,25 +951,257 @@
try
{
- server0.stop();
+ server0.stop();
}
- catch(Exception ignored)
+ catch (Exception ignored)
{
-
+
}
try
{
- server1.stop();
+ server1.stop();
}
- catch(Exception ignored)
+ catch (Exception ignored)
{
-
+
}
}
}
+ public void testSawtoothLoad() throws Exception
+ {
+ Map<String, Object> server0Params = new HashMap<String, Object>();
+ HornetQServer server0 = createClusteredServerWithParams(isNetty(), 0, true,
server0Params);
+ server0.getConfiguration().setThreadPoolMaxSize(10);
+
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ addTargetParameters(server1Params);
+ HornetQServer server1 = createClusteredServerWithParams(isNetty(), 1, true,
server1Params);
+ server1.getConfiguration().setThreadPoolMaxSize(10);
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String,
TransportConfiguration>();
+ final TransportConfiguration server0tc = new TransportConfiguration(getConnector(),
server0Params);
+ final TransportConfiguration server1tc = new TransportConfiguration(getConnector(),
server1Params);
+ connectors.put(server1tc.getName(), server1tc);
+
+ server0.getConfiguration().setConnectorConfigurations(connectors);
+
+ ArrayList<String> staticConnectors = new ArrayList<String>();
+ staticConnectors.add(server1tc.getName());
+
+ BridgeConfiguration bridgeConfiguration = new
BridgeConfiguration("bridge1",
+ queueName0,
+ forwardAddress,
+ null,
+ null,
+ 1000,
+ 1d,
+ -1,
+ false,
+ 0,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+
staticConnectors,
+ false,
+
ConfigurationImpl.DEFAULT_CLUSTER_USER,
+
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
+
+ List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress,
queueName0, null, true);
+ List<CoreQueueConfiguration> queueConfigs0 = new
ArrayList<CoreQueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress,
queueName1, null, true);
+ List<CoreQueueConfiguration> queueConfigs1 = new
ArrayList<CoreQueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+ try
+ {
+ server1.start();
+ server0.start();
+
+ final int numMessages = 10000;
+
+ final int totalrepeats = 10;
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ // We shouldn't have more than 10K messages pending
+ final Semaphore semop = new Semaphore(10000);
+
+ class ConsumerThread extends Thread
+ {
+ public void run()
+ {
+ try
+ {
+ ServerLocator locator =
HornetQClient.createServerLocatorWithoutHA(server1tc);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false);
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(queueName1);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer.receive(5000);
+
+ Assert.assertNotNull(message);
+
+ message.acknowledge();
+ semop.release();
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.close();
+ sf.close();
+ locator.close();
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ };
+
+ class ProducerThread extends Thread
+ {
+ final int nmsg;
+ ProducerThread(int nmsg)
+ {
+ this.nmsg = nmsg;
+ }
+ public void run()
+ {
+ ServerLocator locator =
HornetQClient.createServerLocatorWithoutHA(server0tc);
+
+ locator.setBlockOnDurableSend(false);
+ locator.setBlockOnNonDurableSend(false);
+
+ ClientSessionFactory sf = null;
+
+ ClientSession session = null;
+
+ ClientProducer producer = null;
+
+ try
+ {
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, true, true);
+
+ producer = session.createProducer(new SimpleString(testAddress));
+
+ for (int i = 0; i < nmsg; i++)
+ {
+ assertEquals(0, errors.get());
+ ClientMessage message = session.createMessage(true);
+
+ message.putIntProperty("seq", i);
+
+
+ if (i % 100 == 0)
+ {
+ message.setPriority((byte)(RandomUtil.randomPositiveInt() % 9));
+ }
+ else
+ {
+ message.setPriority((byte)5);
+ }
+
+ message.getBodyBuffer().writeBytes(new byte[50]);
+
+ producer.send(message);
+ assertTrue(semop.tryAcquire(1, 10, TimeUnit.SECONDS));
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace(System.out);
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ try
+ {
+ session.close();
+ sf.close();
+ locator.close();
+ }
+ catch (Exception ignored)
+ {
+ errors.incrementAndGet();
+ }
+ }
+ }
+ }
+
+ for (int repeat = 0 ; repeat < totalrepeats; repeat++)
+ {
+ System.out.println("Repeat " + repeat);
+ ArrayList<Thread> threads = new ArrayList<Thread>();
+
+ threads.add(new ConsumerThread());
+ threads.add(new ProducerThread(numMessages / 2));
+ threads.add(new ProducerThread(numMessages / 2));
+
+ for (Thread t : threads)
+ {
+ t.start();
+ }
+
+ for (Thread t : threads)
+ {
+ t.join();
+ }
+
+ assertEquals(0, errors.get());
+ }
+ }
+ finally
+ {
+ try
+ {
+ server0.stop();
+ }
+ catch (Exception ignored)
+ {
+
+ }
+
+ try
+ {
+ server1.stop();
+ }
+ catch (Exception ignored)
+ {
+
+ }
+ }
+
+ }
+
public void testBridgeWithPaging() throws Exception
{
HornetQServer server0 = null;
@@ -1142,11 +1378,11 @@
ArrayList<String> staticConnectors = new ArrayList<String>();
staticConnectors.add(server1tc.getName());
BridgeConfiguration bridgeConfiguration = new
BridgeConfiguration("bridge1", queueName0, null, // pass a null
-
// forwarding
-
// address to
-
// use messages'
-
// original
-
// address
+ // forwarding
+ // address to
+ // use
messages'
+ // original
+ // address
null,
null,
1000,
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-05-28
05:06:01 UTC (rev 10751)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-05-28
16:34:41 UTC (rev 10752)
@@ -513,7 +513,7 @@
throw new IllegalArgumentException("No sf at " + node);
}
- ClientSession session = sf.createSession(false, true, true);
+ ClientSession session = sf.createSession(false, false, false);
try
{
@@ -531,7 +531,14 @@
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
producer.send(message);
+
+ if (i % 100 == 0)
+ {
+ session.commit();
+ }
}
+
+ session.commit();
}
finally
{
@@ -1328,6 +1335,7 @@
configuration.setJournalFileSize(100 * 1024);
configuration.setJournalType(getDefaultJournalType());
configuration.setSharedStore(sharedStorage);
+ configuration.setThreadPoolMaxSize(10);
if (sharedStorage)
{
// Shared storage will share the node between the backup and live node
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2011-05-28
05:06:01 UTC (rev 10751)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2011-05-28
16:34:41 UTC (rev 10752)
@@ -210,6 +210,56 @@
verifyNotReceive(0, 1, 2, 3, 4);
}
+
+ public void testBasicRoundRobinManyMessages() throws Exception
+ {
+ setupCluster();
+
+ startServers();
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+ createQueue(2, "queues.testaddress", "queue0", null, false);
+ createQueue(3, "queues.testaddress", "queue0", null, false);
+ createQueue(4, "queues.testaddress", "queue0", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+ addConsumer(3, 3, "queue0", null);
+ addConsumer(4, 4, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+ waitForBindings(3, "queues.testaddress", 1, 1, true);
+ waitForBindings(4, "queues.testaddress", 1, 1, true);
+
+ System.out.println(clusterDescription(servers[0]));
+ System.out.println(clusterDescription(servers[1]));
+ System.out.println(clusterDescription(servers[2]));
+ System.out.println(clusterDescription(servers[3]));
+ System.out.println(clusterDescription(servers[4]));
+
+ waitForBindings(0, "queues.testaddress", 4, 4, false);
+ waitForBindings(1, "queues.testaddress", 4, 4, false);
+ waitForBindings(2, "queues.testaddress", 4, 4, false);
+ waitForBindings(3, "queues.testaddress", 4, 4, false);
+ waitForBindings(4, "queues.testaddress", 4, 4, false);
+
+ send(0, "queues.testaddress", 1000, true, null);
+
+ verifyReceiveRoundRobinInSomeOrder(1000, 0, 1, 2, 3, 4);
+
+ verifyNotReceive(0, 1, 2, 3, 4);
+ }
+
public void testRoundRobinMultipleQueues() throws Exception
{
SymmetricClusterTest.log.info("starting");
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/list/impl/PriorityLinkedListTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/list/impl/PriorityLinkedListTestBase.java 2011-05-28
05:06:01 UTC (rev 10751)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/list/impl/PriorityLinkedListTestBase.java 2011-05-28
16:34:41 UTC (rev 10752)
@@ -16,6 +16,7 @@
import junit.framework.Assert;
import junit.framework.TestCase;
+import org.hornetq.tests.util.RandomUtil;
import org.hornetq.utils.LinkedListIterator;
import org.hornetq.utils.PriorityLinkedListImpl;
@@ -77,9 +78,9 @@
protected Wibble y;
protected Wibble z;
-
+
private PriorityLinkedListImpl<Wibble> list;
-
+
protected abstract PriorityLinkedListImpl<Wibble> getList();
public void setUp() throws Exception
@@ -133,7 +134,7 @@
Wibble w = list.poll();
Assert.assertEquals(a, w);
Assert.assertTrue(list.isEmpty());
-
+
assertEquals(0, list.size());
}
@@ -144,7 +145,7 @@
list.addHead(c, 0);
list.addHead(d, 0);
list.addHead(e, 0);
-
+
assertEquals(5, list.size());
Assert.assertEquals(e, list.poll());
@@ -153,7 +154,7 @@
Assert.assertEquals(b, list.poll());
Assert.assertEquals(a, list.poll());
Assert.assertNull(list.poll());
-
+
assertEquals(0, list.size());
}
@@ -172,11 +173,11 @@
Assert.assertEquals(d, list.poll());
Assert.assertEquals(e, list.poll());
Assert.assertNull(list.poll());
-
+
assertEquals(0, list.size());
}
-
+
public void testAddLastAndFirst() throws Exception
{
list.addTail(a, 0);
@@ -189,7 +190,7 @@
list.addTail(h, 0);
list.addTail(i, 0);
list.addTail(j, 0);
-
+
list.addHead(k, 0);
list.addHead(l, 0);
list.addHead(m, 0);
@@ -200,7 +201,7 @@
list.addHead(r, 0);
list.addHead(s, 0);
list.addHead(t, 0);
-
+
assertEquals(t, list.poll());
assertEquals(s, list.poll());
assertEquals(r, list.poll());
@@ -211,7 +212,7 @@
assertEquals(m, list.poll());
assertEquals(l, list.poll());
assertEquals(k, list.poll());
-
+
assertEquals(a, list.poll());
assertEquals(b, list.poll());
assertEquals(c, list.poll());
@@ -223,7 +224,7 @@
assertEquals(i, list.poll());
assertEquals(j, list.poll());
}
-
+
public void testAddLastAndFirstWithIterator() throws Exception
{
list.addTail(a, 0);
@@ -236,7 +237,7 @@
list.addTail(h, 0);
list.addTail(i, 0);
list.addTail(j, 0);
-
+
list.addHead(k, 0);
list.addHead(l, 0);
list.addHead(m, 0);
@@ -247,9 +248,9 @@
list.addHead(r, 0);
list.addHead(s, 0);
list.addHead(t, 0);
-
+
LinkedListIterator<Wibble> iter = list.iterator();
-
+
assertTrue(iter.hasNext());
assertEquals(t, iter.next());
assertTrue(iter.hasNext());
@@ -270,7 +271,7 @@
assertEquals(l, iter.next());
assertTrue(iter.hasNext());
assertEquals(k, iter.next());
-
+
assertTrue(iter.hasNext());
assertEquals(a, iter.next());
assertTrue(iter.hasNext());
@@ -438,7 +439,7 @@
Assert.assertEquals(a, list.poll());
Assert.assertNull(list.poll());
-
+
assertEquals(0, list.size());
}
@@ -567,8 +568,7 @@
w = (Wibble)iter.next();
Assert.assertEquals("z", w.s);
assertFalse(iter.hasNext());
-
-
+
iter = list.iterator();
assertTrue(iter.hasNext());
w = (Wibble)iter.next();
@@ -617,7 +617,7 @@
iter.remove();
Assert.assertEquals(23, list.size());
-
+
assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("i", w.s);
@@ -671,7 +671,6 @@
Assert.assertEquals("z", w.s);
iter.remove();
-
iter = list.iterator();
assertTrue(iter.hasNext());
w = (Wibble)iter.next();
@@ -739,78 +738,76 @@
assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("y", w.s);
-
+
assertFalse(iter.hasNext());
assertFalse(iter.hasNext());
- //Test the elements added after iter created are seen
-
+ // Test the elements added after iter created are seen
+
list.addTail(a, 4);
list.addTail(b, 4);
-
+
assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("a", w.s);
-
+
assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("b", w.s);
-
- assertFalse(iter.hasNext());
-
+
+ assertFalse(iter.hasNext());
+
list.addTail(c, 4);
list.addTail(d, 4);
-
+
assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("c", w.s);
-
+
assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("d", w.s);
-
+
assertFalse(iter.hasNext());
-
}
-
+
public void testIteratorPicksUpHigherPriorities()
{
list.addTail(a, 4);
list.addTail(b, 4);
list.addTail(c, 4);
-
+
LinkedListIterator<Wibble> iter = list.iterator();
-
+
assertTrue(iter.hasNext());
assertEquals(a, iter.next());
-
+
assertTrue(iter.hasNext());
assertEquals(b, iter.next());
-
+
list.addTail(d, 5);
list.addTail(e, 5);
-
+
assertTrue(iter.hasNext());
assertEquals(d, iter.next());
-
+
assertTrue(iter.hasNext());
assertEquals(e, iter.next());
-
+
assertTrue(iter.hasNext());
assertEquals(c, iter.next());
-
+
list.addTail(f, 1);
list.addTail(g, 9);
-
+
assertTrue(iter.hasNext());
assertEquals(g, iter.next());
-
+
assertTrue(iter.hasNext());
assertEquals(f, iter.next());
}
-
public void testClear()
{
list.addTail(a, 0);
@@ -829,6 +826,59 @@
Assert.assertNull(list.poll());
}
+ public void testMixupIterator()
+ {
+ list.addTail(c, 5);
+ list.addTail(a, 4);
+ list.addTail(b, 4);
+
+ LinkedListIterator<Wibble> iter = list.iterator();
+
+ assertTrue(iter.hasNext());
+ assertEquals(c, iter.next());
+ assertTrue(iter.hasNext());
+ assertEquals(a, iter.next());
+ assertTrue(iter.hasNext());
+ assertEquals(b, iter.next());
+ list.addTail(d, 5);
+ assertTrue(iter.hasNext());
+ assertEquals(d, iter.next());
+ }
+
+ public void testMixupIterator2()
+ {
+ list.addTail(c, 5);
+
+ list.addTail(k, 0);
+
+ list.addTail(a, 2);
+ list.addTail(b, 2);
+
+ LinkedListIterator<Wibble> iter = list.iterator();
+
+ assertTrue(iter.hasNext());
+ assertEquals(c, iter.next());
+ iter.remove();
+
+ assertTrue(iter.hasNext());
+ assertEquals(a, iter.next());
+ iter.remove();
+
+ assertTrue(iter.hasNext());
+ assertEquals(b, iter.next());
+ iter.remove();
+
+ assertTrue(iter.hasNext());
+ assertEquals(k, iter.next());
+ iter.remove();
+
+ list.addTail(d, 2);
+
+ assertTrue(iter.hasNext());
+ assertEquals(d, iter.next());
+ iter.remove();
+ }
+
class Wibble
{
String s;