JBoss hornetq SVN: r9570 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-08-20 09:45:58 -0400 (Fri, 20 Aug 2010)
New Revision: 9570
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
clear data for failover tests
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-08-20 10:18:05 UTC (rev 9569)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-08-20 13:45:58 UTC (rev 9570)
@@ -79,7 +79,7 @@
protected void setUp() throws Exception
{
super.setUp();
-
+ clearData();
createConfigs();
if (server1Service != null)
@@ -109,6 +109,11 @@
public void preActivate()
{
// To avoid two servers messing up with the same journal at any single point
+
+ }
+
+ public void activated()
+ {
try
{
server0Service.getStorageManager().stop();
@@ -117,10 +122,6 @@
{
}
}
-
- public void activated()
- {
- }
});
Configuration config0 = super.createDefaultConfig();
15 years, 9 months
JBoss hornetq SVN: r9569 - in branches/2_2_0_HA_Improvements/src/main/org/hornetq/core: server/cluster/impl and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-20 06:18:05 -0400 (Fri, 20 Aug 2010)
New Revision: 9569
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
Log:
HA refactoring
* moved code to reconnect from ServerLocator to ClusterConnectionBridge
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-20 08:17:24 UTC (rev 9568)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-20 10:18:05 UTC (rev 9569)
@@ -1125,14 +1125,13 @@
return;
}
- final TopologyMember member = topology.getMember(nodeID);
removed = topology.removeMember(nodeID);
if (!topology.isEmpty())
{
updateArraysAndPairs();
- if (topology.size() == 1 && topology.getMember(nodeID) != null)
+ if (topology.size() == 1 && topology.getMember(this.nodeID) != null)
{
receivedTopology = false;
}
@@ -1146,39 +1145,6 @@
receivedTopology = false;
}
- if (ha && discoveryAddress == null && removed)
- {
- threadPool.execute(new Runnable()
- {
- public void run()
- {
- System.out.println(ServerLocatorImpl.this.nodeID + " will try to connect to " + nodeID);
- ClientSessionFactory sf = null;
- do
- {
- try
- {
- Pair<TransportConfiguration,TransportConfiguration> pair = member.getConnector();
- TransportConfiguration tc = (pair.a != null) ? pair.a : pair.b;
- sf = createSessionFactory(tc);
- }
- catch (HornetQException e)
- {
- if (e.getCode() == HornetQException.NOT_CONNECTED)
- {
- continue;
- }
- }
- catch (Exception e)
- {
- break;
- }
- }
- while (sf == null);
- }
- });
- }
-
if (removed)
{
for (ClusterTopologyListener listener : topologyListeners)
@@ -1245,7 +1211,7 @@
this.initialConnectors[count++] = entry.getConnector();
}
- if (ha && clusterConnection && !receivedTopology)
+ if (ha && clusterConnection && !receivedTopology && initialConnectors.length > 0)
{
// FIXME the node is alone in the cluster. We create a connection to the new node
// to trigger the node notification to form the cluster.
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-08-20 08:17:24 UTC (rev 9568)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-08-20 10:18:05 UTC (rev 9569)
@@ -74,7 +74,7 @@
private Queue queue;
- private final Executor executor;
+ protected final Executor executor;
private final Filter filter;
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2010-08-20 08:17:24 UTC (rev 9568)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2010-08-20 10:18:05 UTC (rev 9569)
@@ -231,15 +231,48 @@
@Override
public void connectionFailed(HornetQException me)
{
- try
+ if (!session.isClosed())
{
- session.cleanUp(false);
+ try
+ {
+ session.cleanUp(false);
+ }
+ catch (Exception e)
+ {
+ log.warn("Unable to clean up the session after a connection failure", e);
+ }
+ serverLocator.notifyNodeDown(targetNodeID);
+ if (serverLocator.getDiscoveryAddress() == null)
+ {
+ executor.execute(new Runnable()
+ {
+
+ public void run()
+ {
+ ClientSessionFactory sf = null;
+ do
+ {
+ try
+ {
+ sf = serverLocator.createSessionFactory(connector);
+ }
+ catch (HornetQException e)
+ {
+ if (e.getCode() == HornetQException.NOT_CONNECTED)
+ {
+ continue;
+ }
+ }
+ catch (Exception e)
+ {
+ break;
+ }
+ }
+ while (sf == null);
+ }
+ });
+ }
}
- catch (Exception e)
- {
- log.warn("Unable to clean up the session after a connection failure", e);
- }
- serverLocator.notifyNodeDown(targetNodeID);
super.connectionFailed(me);
}
15 years, 9 months
JBoss hornetq SVN: r9568 - in branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests: util and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-08-20 04:17:24 -0400 (Fri, 20 Aug 2010)
New Revision: 9568
Added:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/FakeLockHornetQServer.java
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
added fake lock server
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-08-20 08:16:38 UTC (rev 9567)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-08-20 08:17:24 UTC (rev 9568)
@@ -101,7 +101,7 @@
config1.setSecurityEnabled(false);
config1.setSharedStore(true);
config1.setBackup(true);
- server1Service = createServer(true, config1);
+ server1Service = createFakeLockServer(true, config1);
server1Service.registerActivateCallback(new ActivateCallback()
{
@@ -128,7 +128,7 @@
config0.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
config0.setSecurityEnabled(false);
config0.setSharedStore(true);
- server0Service = createServer(true, config0);
+ server0Service = createFakeLockServer(true, config0);
}
Added: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/FakeLockHornetQServer.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/FakeLockHornetQServer.java (rev 0)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/FakeLockHornetQServer.java 2010-08-20 08:17:24 UTC (rev 9568)
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.util;
+
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.server.cluster.LockFile;
+import org.hornetq.core.server.cluster.impl.FakeLockFile;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.spi.core.security.HornetQSecurityManager;
+
+import javax.management.MBeanServer;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * Created Jul 23, 2010
+ */
+public class FakeLockHornetQServer extends HornetQServerImpl
+{
+ public FakeLockHornetQServer()
+ {
+ super(); //To change body of overridden methods use File | Settings | File Templates.
+ }
+
+ public FakeLockHornetQServer(Configuration configuration)
+ {
+ super(configuration); //To change body of overridden methods use File | Settings | File Templates.
+ }
+
+ public FakeLockHornetQServer(Configuration configuration, MBeanServer mbeanServer)
+ {
+ super(configuration, mbeanServer); //To change body of overridden methods use File | Settings | File Templates.
+ }
+
+ public FakeLockHornetQServer(Configuration configuration, HornetQSecurityManager securityManager)
+ {
+ super(configuration, securityManager); //To change body of overridden methods use File | Settings | File Templates.
+ }
+
+ public FakeLockHornetQServer(Configuration configuration, MBeanServer mbeanServer, HornetQSecurityManager securityManager)
+ {
+ super(configuration, mbeanServer, securityManager); //To change body of overridden methods use File | Settings | File Templates.
+ }
+
+ @Override
+ protected LockFile createLockFile(String fileName, String directory)
+ {
+ return new FakeLockFile(fileName, directory);
+ }
+}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-08-20 08:16:38 UTC (rev 9567)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-08-20 08:17:24 UTC (rev 9568)
@@ -42,6 +42,7 @@
import org.hornetq.jms.client.HornetQBytesMessage;
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.spi.core.security.HornetQSecurityManager;
+import org.hornetq.spi.core.security.HornetQSecurityManagerImpl;
import static org.hornetq.tests.util.ServiceTestBase.*;
@@ -161,6 +162,47 @@
return createServer(realFiles, configuration, -1, -1, new HashMap<String, AddressSettings>());
}
+ protected HornetQServer createFakeLockServer(final boolean realFiles)
+ {
+ return createFakeLockServer(realFiles, false);
+ }
+
+ protected HornetQServer createFakeLockServer(final boolean realFiles, final boolean netty)
+ {
+ return createFakeLockServer(realFiles, createDefaultConfig(netty), -1, -1, new HashMap<String, AddressSettings>());
+ }
+
+ protected HornetQServer createFakeLockServer(final boolean realFiles, final Configuration configuration)
+ {
+ return createFakeLockServer(realFiles, configuration, -1, -1, new HashMap<String, AddressSettings>());
+ }
+
+ protected HornetQServer createFakeLockServer(final boolean realFiles,
+ final Configuration configuration,
+ final int pageSize,
+ final int maxAddressSize,
+ final Map<String, AddressSettings> settings)
+ {
+ HornetQServer server;
+ HornetQSecurityManager securityManager = new HornetQSecurityManagerImpl();
+ configuration.setPersistenceEnabled(realFiles);
+ server = new FakeLockHornetQServer(configuration,ManagementFactory.getPlatformMBeanServer(),securityManager);
+
+
+ for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
+ {
+ server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
+ }
+
+ AddressSettings defaultSetting = new AddressSettings();
+ defaultSetting.setPageSizeBytes(pageSize);
+ defaultSetting.setMaxSizeBytes(maxAddressSize);
+
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+ return server;
+ }
+
protected HornetQServer createServer(final boolean realFiles,
final Configuration configuration,
final HornetQSecurityManager securityManager)
15 years, 9 months
JBoss hornetq SVN: r9567 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-08-20 04:16:38 -0400 (Fri, 20 Aug 2010)
New Revision: 9567
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
removed unwanted import
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-19 15:10:14 UTC (rev 9566)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-20 08:16:38 UTC (rev 9567)
@@ -25,7 +25,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import apple.awt.CList;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
15 years, 9 months
JBoss hornetq SVN: r9566 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-19 11:10:14 -0400 (Thu, 19 Aug 2010)
New Revision: 9566
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
HA refactoring
* add thread to reconnect to a node when it is detected as down (static connector case)
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-19 11:42:35 UTC (rev 9565)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-19 15:10:14 UTC (rev 9566)
@@ -1116,35 +1116,67 @@
closed = true;
}
- public void notifyNodeDown(final String nodeID)
+ public synchronized void notifyNodeDown(final String nodeID)
{
boolean removed = false;
- synchronized (this)
+
+ if (!ha)
{
- if (!ha)
+ return;
+ }
+
+ final TopologyMember member = topology.getMember(nodeID);
+ removed = topology.removeMember(nodeID);
+
+ if (!topology.isEmpty())
+ {
+ updateArraysAndPairs();
+
+ if (topology.size() == 1 && topology.getMember(nodeID) != null)
{
- return;
+ receivedTopology = false;
}
+ }
+ else
+ {
+ pairs.clear();
- removed = topology.removeMember(nodeID);
-
- if (!topology.isEmpty())
+ topologyArray = null;
+
+ receivedTopology = false;
+ }
+
+ if (ha && discoveryAddress == null && removed)
+ {
+ threadPool.execute(new Runnable()
{
- updateArraysAndPairs();
-
- if (topology.size() == 1 && topology.getMember(nodeID) != null)
+ public void run()
{
- receivedTopology = false;
+ System.out.println(ServerLocatorImpl.this.nodeID + " will try to connect to " + nodeID);
+ ClientSessionFactory sf = null;
+ do
+ {
+ try
+ {
+ Pair<TransportConfiguration,TransportConfiguration> pair = member.getConnector();
+ TransportConfiguration tc = (pair.a != null) ? pair.a : pair.b;
+ sf = createSessionFactory(tc);
+ }
+ catch (HornetQException e)
+ {
+ if (e.getCode() == HornetQException.NOT_CONNECTED)
+ {
+ continue;
+ }
+ }
+ catch (Exception e)
+ {
+ break;
+ }
+ }
+ while (sf == null);
}
- }
- else
- {
- pairs.clear();
-
- topologyArray = null;
-
- receivedTopology = false;
- }
+ });
}
if (removed)
@@ -1213,7 +1245,7 @@
this.initialConnectors[count++] = entry.getConnector();
}
- if (clusterConnection && !receivedTopology)
+ if (ha && clusterConnection && !receivedTopology)
{
// FIXME the node is alone in the cluster. We create a connection to the new node
// to trigger the node notification to form the cluster.
15 years, 9 months
JBoss hornetq SVN: r9565 - in branches/2_2_0_HA_Improvements: src/main/org/hornetq/core/cluster/impl and 5 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-19 07:42:35 -0400 (Thu, 19 Aug 2010)
New Revision: 9565
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/TopologyMember.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
Removed:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/Topology.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/TopologyMember.java
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
Log:
HA refactoring
* improvements for discovery, cluster formation using node notifications, etc.
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -43,6 +43,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
import org.hornetq.core.remoting.FailureListener;
@@ -1072,6 +1073,11 @@
if (serverLocator.isHA())
{
channel0.send(new SubscribeClusterTopologyUpdatesMessage(serverLocator.isClusterConnection()));
+ if (serverLocator.isClusterConnection())
+ {
+ TransportConfiguration config = serverLocator.getClusterTransportConfiguration();
+ channel0.send(new NodeAnnounceMessage(serverLocator.getNodeID(), serverLocator.isBackup(), config));
+ }
}
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -18,7 +18,6 @@
import java.net.InetAddress;
import java.security.AccessController;
import java.security.PrivilegedAction;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -44,9 +43,6 @@
import org.hornetq.core.cluster.DiscoveryListener;
import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.cluster.impl.Topology;
-import org.hornetq.core.server.cluster.impl.TopologyMember;
-import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.UUIDGenerator;
@@ -166,6 +162,10 @@
private String groupID;
private String nodeID;
+
+ private TransportConfiguration clusterTransportConfiguration;
+
+ private boolean backup;
private static synchronized ExecutorService getGlobalThreadPool()
{
@@ -504,7 +504,7 @@
interceptors);
factories.add(factory);
-
+
return factory;
}
@@ -1008,6 +1008,11 @@
{
this.nodeID = nodeID;
}
+
+ public String getNodeID()
+ {
+ return nodeID;
+ }
public void setClusterConnection(boolean clusterConnection)
{
@@ -1018,7 +1023,27 @@
{
return clusterConnection;
}
+
+ public TransportConfiguration getClusterTransportConfiguration()
+ {
+ return clusterTransportConfiguration;
+ }
+
+ public void setClusterTransportConfiguration(TransportConfiguration tc)
+ {
+ this.clusterTransportConfiguration = tc;
+ }
+ public boolean isBackup()
+ {
+ return backup;
+ }
+
+ public void setBackup(boolean backup)
+ {
+ this.backup = backup;
+ }
+
@Override
protected void finalize() throws Throwable
{
@@ -1091,31 +1116,43 @@
closed = true;
}
- public synchronized void notifyNodeDown(final String nodeID)
+ public void notifyNodeDown(final String nodeID)
{
- if (!ha)
+ boolean removed = false;
+ synchronized (this)
{
- return;
- }
+ if (!ha)
+ {
+ return;
+ }
- topology.removeMember(nodeID);
+ removed = topology.removeMember(nodeID);
+
+ if (!topology.isEmpty())
+ {
+ updateArraysAndPairs();
+
+ if (topology.size() == 1 && topology.getMember(nodeID) != null)
+ {
+ receivedTopology = false;
+ }
+ }
+ else
+ {
+ pairs.clear();
- if (!topology.isEmpty())
- {
- updateArraysAndPairs();
- }
- else
- {
- pairs.clear();
+ topologyArray = null;
- topologyArray = null;
-
- receivedTopology = false;
+ receivedTopology = false;
+ }
}
- for (ClusterTopologyListener listener : topologyListeners)
+ if (removed)
{
- listener.nodeDown(nodeID);
+ for (ClusterTopologyListener listener : topologyListeners)
+ {
+ listener.nodeDown(nodeID);
+ }
}
}
@@ -1144,7 +1181,6 @@
}
// Notify if waiting on getting topology
-
notify();
}
@@ -1175,11 +1211,14 @@
for (DiscoveryEntry entry : newConnectors)
{
this.initialConnectors[count++] = entry.getConnector();
-
- notifyNodeUp(entry.getNodeID(), new Pair<TransportConfiguration, TransportConfiguration>(entry.getConnector(), null), true, 1);
}
-
- System.out.println(">>>>>>>> Discovered initial connectors= " + Arrays.asList(initialConnectors));
+
+ if (clusterConnection && !receivedTopology)
+ {
+ // FIXME the node is alone in the cluster. We create a connection to the new node
+ // to trigger the node notification to form the cluster.
+ connect();
+ }
}
public synchronized void factoryClosed(final ClientSessionFactory factory)
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -35,7 +35,9 @@
TransportConfiguration getBackup( TransportConfiguration live);
void setNodeID(String nodeID);
-
+
+ String getNodeID();
+
void connect();
void addClusterTopologyListener(ClusterTopologyListener listener);
@@ -49,4 +51,13 @@
void setClusterConnection(boolean clusterConnection);
boolean isClusterConnection();
+
+ TransportConfiguration getClusterTransportConfiguration();
+
+ void setClusterTransportConfiguration(TransportConfiguration tc);
+
+ boolean isBackup();
+
+ void setBackup(boolean backup);
+
}
Copied: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java (from rev 9550, branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/Topology.java)
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.client.impl;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.hornetq.api.core.client.ClusterTopologyListener;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * Created Aug 16, 2010
+ */
+public class Topology
+{
+ /*
+ * topology describes the other cluster nodes that this server knows about:
+ *
+ * keys are node IDs
+ * values are a pair of live/backup transport configurations
+ */
+ private Map<String, TopologyMember> topology = new HashMap<String, TopologyMember>();
+
+ public synchronized boolean addMember(String nodeId, TopologyMember member)
+ {
+ boolean replaced = topology.containsKey(nodeId);
+ topology.put(nodeId, member);
+ return replaced;
+ }
+
+ public synchronized boolean removeMember(String nodeId)
+ {
+ TopologyMember member = topology.remove(nodeId);
+ return (member != null);
+ }
+
+ public synchronized void fireListeners(ClusterTopologyListener listener)
+ {
+ int count = 0;
+ for (Map.Entry<String, TopologyMember> entry : topology.entrySet())
+ {
+ listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count == topology.size(), entry.getValue().getDistance());
+ }
+ }
+
+ public TopologyMember getMember(String nodeID)
+ {
+ return topology.get(nodeID);
+ }
+
+ public boolean isEmpty()
+ {
+ return topology.isEmpty();
+ }
+
+ public Collection<TopologyMember> getMembers()
+ {
+ return topology.values();
+ }
+
+ public int size()
+ {
+ return topology.size();
+ }
+
+ public String describe()
+ {
+
+ String desc = "";
+ for (Entry<String, TopologyMember> entry : new HashMap<String, TopologyMember>(topology).entrySet())
+ {
+ desc += "\t" + entry.getKey() + " => " + entry.getValue() + "\n";
+ }
+ return desc;
+ }
+
+ public void clear()
+ {
+ topology.clear();
+ }
+}
Copied: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/TopologyMember.java (from rev 9550, branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/TopologyMember.java)
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/TopologyMember.java (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/TopologyMember.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.client.impl;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * Created Aug 16, 2010
+ */
+public class TopologyMember
+{
+ private final Pair<TransportConfiguration, TransportConfiguration> connector;
+
+ private final int distance;
+
+ public TopologyMember(Pair<TransportConfiguration, TransportConfiguration> connector, int distance)
+ {
+ this.connector = connector;
+ this.distance = distance;
+ }
+
+ public Pair<TransportConfiguration, TransportConfiguration> getConnector()
+ {
+ return connector;
+ }
+
+ public int getDistance()
+ {
+ return distance;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "TopologyMember[distance=" + distance + ", connector=" + connector + "]";
+ }
+}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -413,7 +413,6 @@
if (entry.getValue().getLastUpdate() + timeout <= now)
{
- System.out.println("remove " + entry);
iter.remove();
changed = true;
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -30,6 +30,7 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
import org.hornetq.core.remoting.CloseListener;
@@ -118,11 +119,6 @@
{
channel0.send(new ClusterTopologyChangeMessage(nodeID));
}
-
- public String toString()
- {
- return "ClusterTopologyListener[address=" + connection.getRemoteAddress() + "]";
- };
};
final boolean isCC = msg.isClusterConnection();
@@ -137,6 +133,21 @@
}
});
}
+ else if (packet.getType() == PacketImpl.NODE_ANNOUNCE)
+ {
+ NodeAnnounceMessage msg = (NodeAnnounceMessage)packet;
+
+ Pair<TransportConfiguration, TransportConfiguration> pair;
+ if (msg.isBackup())
+ {
+ pair = new Pair<TransportConfiguration, TransportConfiguration>(null, msg.getConnector());
+ }
+ else
+ {
+ pair = new Pair<TransportConfiguration, TransportConfiguration>(msg.getConnector(), null);
+ }
+ server.getClusterManager().notifyNodeUp(msg.getNodeID(), pair, false, 1);
+ }
}
});
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -21,6 +21,7 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.DISCONNECT;
import static org.hornetq.core.protocol.core.impl.PacketImpl.EXCEPTION;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.NODE_ANNOUNCE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.NULL_RESPONSE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.PACKETS_CONFIRMED;
import static org.hornetq.core.protocol.core.impl.PacketImpl.PING;
@@ -90,6 +91,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
@@ -495,6 +497,11 @@
packet = new ClusterTopologyChangeMessage();
break;
}
+ case NODE_ANNOUNCE:
+ {
+ packet = new NodeAnnounceMessage();
+ break;
+ }
case SUBSCRIBE_TOPOLOGY:
{
packet = new SubscribeClusterTopologyUpdatesMessage();
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -186,6 +186,8 @@
public static final byte CLUSTER_TOPOLOGY = 110;
+ public static final byte NODE_ANNOUNCE = 111;
+
public static final byte SUBSCRIBE_TOPOLOGY = 112;
// Static --------------------------------------------------------
Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class NodeAnnounceMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(NodeAnnounceMessage.class);
+
+ // Attributes ----------------------------------------------------
+
+ private String nodeID;
+
+ private boolean backup;
+
+ private TransportConfiguration connector;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public NodeAnnounceMessage(final String nodeID, final boolean backup, final TransportConfiguration tc)
+ {
+ super(PacketImpl.NODE_ANNOUNCE);
+
+ this.nodeID = nodeID;
+
+ this.backup = backup;
+
+ this.connector = tc;
+ }
+
+ public NodeAnnounceMessage()
+ {
+ super(PacketImpl.NODE_ANNOUNCE);
+ }
+
+ // Public --------------------------------------------------------
+
+
+ public String getNodeID()
+ {
+ return nodeID;
+ }
+
+ public boolean isBackup()
+ {
+ return backup;
+ }
+
+ public TransportConfiguration getConnector()
+ {
+ return connector;
+ }
+
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeString(nodeID);
+ buffer.writeBoolean(backup);
+ connector.encode(buffer);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ this.nodeID = buffer.readString();
+ this.backup = buffer.readBoolean();
+ connector = new TransportConfiguration();
+ connector.decode(buffer);
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -15,7 +15,6 @@
import java.util.Map;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
@@ -51,12 +50,8 @@
void activate();
- Pair<TransportConfiguration, TransportConfiguration>[] getTopology();
-
TransportConfiguration getConnector();
// for debug
String description();
-
- void announce(String nodeID, Pair<TransportConfiguration, TransportConfiguration> pair, boolean b);
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -20,6 +20,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.server.HornetQComponent;
/**
@@ -46,7 +47,9 @@
void activate();
- void notifyClientsNodeDown(String nodeID);
+ void notifyNodeDown(String nodeID);
- void notifyClientsNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean b, int distance);
+ void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup, int distance);
+
+ Topology getTopology();
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -25,7 +25,6 @@
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.api.core.management.ResourceNames;
@@ -238,8 +237,7 @@
}
catch (Exception e)
{
- // TODO Auto-generated catch block
- e.printStackTrace();
+ log.warn("Unable to clean up the session after a connection failure", e);
}
serverLocator.notifyNodeDown(targetNodeID);
super.connectionFailed(me);
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -93,12 +93,12 @@
private final String clusterPassword;
- private Pair<TransportConfiguration, TransportConfiguration>[] topology;
-
private final ServerLocatorInternal serverLocator;
private final TransportConfiguration connector;
+ private final boolean allowsDirectConnectionsOnly;
+
public ClusterConnectionImpl(final ServerLocatorInternal serverLocator,
final TransportConfiguration connector,
final SimpleString name,
@@ -131,6 +131,15 @@
if (this.serverLocator != null)
{
this.serverLocator.setClusterConnection(true);
+ this.serverLocator.setClusterTransportConfiguration(connector);
+ this.serverLocator.setBackup(server.getConfiguration().isBackup());
+
+ // a cluster connection will connect to other nodes only if they are directly connected
+ // through a static list of connectors
+ allowsDirectConnectionsOnly = (serverLocator.getStaticTransportConfigurations() != null);
+ } else
+ {
+ allowsDirectConnectionsOnly = false;
}
this.connector = connector;
@@ -245,11 +254,6 @@
started = false;
}
- public Pair<TransportConfiguration, TransportConfiguration>[] getTopology()
- {
- return topology;
- }
-
public boolean isStarted()
{
return started;
@@ -302,8 +306,6 @@
return;
}
- server.getClusterManager().notifyClientsNodeDown(nodeID);
-
//Remove the flow record for that node
MessageFlowRecord record = records.remove(nodeID);
@@ -313,13 +315,15 @@
try
{
record.reset();
- record.close();
+ //record.close();
}
catch (Exception e)
{
log.error("Failed to close flow record", e);
}
}
+
+ server.getClusterManager().notifyNodeDown(nodeID);
}
public synchronized void nodeUP(final String nodeID,
@@ -327,14 +331,28 @@
final boolean last,
final int distance)
{
- //we only create a bridge it it isnt ourselves and the node is 1hop away
- if (nodeID.equals(nodeUUID.toString()) || distance > 1)
+ // discard notifications about ourselves
+ if (nodeID.equals(nodeUUID.toString()))
{
return;
}
-
- server.getClusterManager().notifyClientsNodeUp(nodeID, connectorPair, false, distance);
-
+
+ // we propagate the node notifications to all cluster topology listeners
+ server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, distance);
+
+ // if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
+ if (allowsDirectConnectionsOnly && distance > 1)
+ {
+ return;
+ }
+
+ // FIXME required to prevent cluster connections w/o discovery group
+ // and empty static connectors to create bridges... ulgy!
+ if (serverLocator == null)
+ {
+ return;
+ }
+
try
{
MessageFlowRecord record = records.get(nodeID);
@@ -377,21 +395,6 @@
}
}
- public void announce(String nodeID, Pair<TransportConfiguration, TransportConfiguration> pair, boolean b)
- {
- TransportConfiguration connector = (backup) ? pair.b : pair.a;
- if (serverLocator!= null && serverLocator.getStaticTransportConfigurations() != null)
- {
- for (TransportConfiguration staticConnector : serverLocator.getStaticTransportConfigurations())
- {
- if (connector.equals(staticConnector))
- {
- nodeUP(nodeID, pair, false, 0);
- }
- }
- }
- }
-
private void createNewRecord(final String nodeID,
final TransportConfiguration connector,
final SimpleString queueName,
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -25,12 +25,16 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import apple.awt.CList;
+
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.client.impl.Topology;
+import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
@@ -194,7 +198,10 @@
managementService.unregisterCluster(clusterConnection.getName().toString());
}
+ clusterConnectionListeners.clear();
+ clientListeners.clear();
clusterConnections.clear();
+ topology.clear();
}
@@ -209,27 +216,56 @@
started = false;
}
- public void notifyClientsNodeDown(String nodeID)
+ public void notifyNodeDown(String nodeID)
{
- topology.removeMember(nodeID);
+ if (nodeID.equals(nodeUUID.toString()))
+ {
+ return;
+ }
- for (ClusterTopologyListener listener : clientListeners)
+ boolean removed = topology.removeMember(nodeID);
+
+ if (removed)
{
- listener.nodeDown(nodeID);
+
+ for (ClusterTopologyListener listener : clientListeners)
+ {
+ listener.nodeDown(nodeID);
+ }
+
+ for (ClusterTopologyListener listener : clusterConnectionListeners)
+ {
+ listener.nodeDown(nodeID);
+ }
}
}
- public void notifyClientsNodeUp(String nodeID,
+ public void notifyNodeUp(String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last,
int distance)
{
- topology.addMember(nodeID, new TopologyMember(connectorPair, distance));
+ if (nodeID.equals(nodeUUID.toString()))
+ {
+ return;
+ }
+
+ boolean updated = topology.addMember(nodeID, new TopologyMember(connectorPair, distance));
+ if (distance >= topology.size() || updated)
+ {
+ return;
+ }
+
for (ClusterTopologyListener listener : clientListeners)
{
listener.nodeUP(nodeID, connectorPair, last, distance);
}
+
+ for (ClusterTopologyListener listener : clusterConnectionListeners)
+ {
+ listener.nodeUP(nodeID, connectorPair, last, distance);
+ }
}
public boolean isStarted()
@@ -260,7 +296,6 @@
public synchronized void addClusterTopologyListener(final ClusterTopologyListener listener,
final boolean clusterConnection)
{
- System.out.println("ClusterManagerImpl.addClusterTopologyListener() on " + nodeUUID + " " + clusterConnection + " " + listener);
if (clusterConnection)
{
this.clusterConnectionListeners.add(listener);
@@ -287,6 +322,11 @@
}
}
+ public Topology getTopology()
+ {
+ return topology;
+ }
+
// backup node becomes live
public synchronized void activate()
{
@@ -609,7 +649,7 @@
serverLocator = null;
}
- ClusterConnection clusterConnection = new ClusterConnectionImpl(serverLocator,
+ ClusterConnectionImpl clusterConnection = new ClusterConnectionImpl(serverLocator,
connector,
new SimpleString(config.getName()),
new SimpleString(config.getAddress()),
Deleted: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/Topology.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/Topology.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/Topology.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -1,76 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.server.cluster.impl;
-
-import org.hornetq.api.core.Pair;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClusterTopologyListener;
-
-import java.lang.reflect.Array;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- * Created Aug 16, 2010
- */
-public class Topology
-{
- /*
- * topology describes the other cluster nodes that this server knows about:
- *
- * keys are node IDs
- * values are a pair of live/backup transport configurations
- */
- private Map<String, TopologyMember> topology = new HashMap<String, TopologyMember>();
-
- public synchronized void addMember(String nodeId, TopologyMember member)
- {
- topology.put(nodeId, member);
- }
-
- public synchronized void removeMember(String nodeId)
- {
- topology.remove(nodeId);
- }
-
- public synchronized void fireListeners(ClusterTopologyListener listener)
- {
- int count = 0;
- for (Map.Entry<String, TopologyMember> entry : topology.entrySet())
- {
- listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count == topology.size(), entry.getValue().getDistance());
- }
- }
-
- public TopologyMember getMember(String nodeID)
- {
- return topology.get(nodeID);
- }
-
- public boolean isEmpty()
- {
- return topology.isEmpty();
- }
-
- public Collection<TopologyMember> getMembers()
- {
- return topology.values();
- }
-
- public int size()
- {
- return topology.size();
- }
-}
Deleted: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/TopologyMember.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/TopologyMember.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/TopologyMember.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -1,42 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.server.cluster.impl;
-
-import org.hornetq.api.core.Pair;
-import org.hornetq.api.core.TransportConfiguration;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- * Created Aug 16, 2010
- */
-public class TopologyMember
-{
- private final Pair<TransportConfiguration, TransportConfiguration> connector;
-
- private final int distance;
- public TopologyMember(Pair<TransportConfiguration, TransportConfiguration> connector, int distance)
- {
- this.connector = connector;
- this.distance = distance;
- }
-
- public Pair<TransportConfiguration, TransportConfiguration> getConnector()
- {
- return connector;
- }
-
- public int getDistance()
- {
- return distance;
- }
-}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -760,6 +760,8 @@
out += cc.description() + "\n";
}
}
+ out += "\n\nfull topology:";
+ out += clusterManager.getTopology().describe();
return out + br;
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -189,6 +189,12 @@
waitForBindings(3, "queues.testaddress", 1, 1, true);
waitForBindings(4, "queues.testaddress", 1, 1, true);
+ System.out.println(clusterDescription(servers[0]));
+ System.out.println(clusterDescription(servers[1]));
+ System.out.println(clusterDescription(servers[2]));
+ System.out.println(clusterDescription(servers[3]));
+ System.out.println(clusterDescription(servers[4]));
+
waitForBindings(0, "queues.testaddress", 4, 4, false);
waitForBindings(1, "queues.testaddress", 4, 4, false);
waitForBindings(2, "queues.testaddress", 4, 4, false);
@@ -1326,6 +1332,11 @@
public void testStartStopServers() throws Exception
{
+ doTestStartStopServers(1, 3000);
+ }
+
+ public void doTestStartStopServers(long pauseBeforeServerRestarts, long pauseAfterServerRestarts) throws Exception
+ {
setupCluster();
startServers();
@@ -1412,6 +1423,15 @@
waitForBindings(3, "queues.testaddress", 6, 6, true);
waitForBindings(4, "queues.testaddress", 7, 7, true);
+ Thread.sleep(2000);
+ System.out.println("#####################################");
+ System.out.println(clusterDescription(servers[0]));
+ System.out.println(clusterDescription(servers[1]));
+ System.out.println(clusterDescription(servers[2]));
+ System.out.println(clusterDescription(servers[3]));
+ System.out.println(clusterDescription(servers[4]));
+ System.out.println("#####################################");
+
waitForBindings(0, "queues.testaddress", 23, 23, false);
waitForBindings(1, "queues.testaddress", 23, 23, false);
waitForBindings(2, "queues.testaddress", 23, 23, false);
@@ -1455,9 +1475,11 @@
System.out.println(clusterDescription(servers[4]));
System.out.println("#####################################");
+ Thread.sleep(pauseBeforeServerRestarts);
+
startServers(3, 0);
- Thread.sleep(3000);
+ Thread.sleep(pauseAfterServerRestarts);
setupSessionFactory(0, isNetty());
setupSessionFactory(3, isNetty());
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -98,193 +98,7 @@
*/
public void testStartStopServersWithPauseBeforeRestarting() throws Exception
{
- setupCluster();
-
- startServers();
-
- setupSessionFactory(0, isNetty());
- setupSessionFactory(1, isNetty());
- setupSessionFactory(2, isNetty());
- setupSessionFactory(3, isNetty());
- setupSessionFactory(4, isNetty());
-
- createQueue(0, "queues.testaddress", "queue0", null, false);
- createQueue(1, "queues.testaddress", "queue1", null, false);
- createQueue(2, "queues.testaddress", "queue2", null, false);
- createQueue(3, "queues.testaddress", "queue3", null, false);
- createQueue(4, "queues.testaddress", "queue4", null, false);
-
- createQueue(0, "queues.testaddress", "queue5", null, false);
- createQueue(1, "queues.testaddress", "queue6", null, false);
- createQueue(2, "queues.testaddress", "queue7", null, false);
- createQueue(3, "queues.testaddress", "queue8", null, false);
- createQueue(4, "queues.testaddress", "queue9", null, false);
-
- createQueue(0, "queues.testaddress", "queue10", null, false);
- createQueue(1, "queues.testaddress", "queue11", null, false);
- createQueue(2, "queues.testaddress", "queue12", null, false);
- createQueue(3, "queues.testaddress", "queue13", null, false);
- createQueue(4, "queues.testaddress", "queue14", null, false);
-
- createQueue(0, "queues.testaddress", "queue15", null, false);
- createQueue(1, "queues.testaddress", "queue15", null, false);
- createQueue(2, "queues.testaddress", "queue15", null, false);
- createQueue(3, "queues.testaddress", "queue15", null, false);
- createQueue(4, "queues.testaddress", "queue15", null, false);
-
- createQueue(2, "queues.testaddress", "queue16", null, false);
- createQueue(3, "queues.testaddress", "queue16", null, false);
- createQueue(4, "queues.testaddress", "queue16", null, false);
-
- createQueue(0, "queues.testaddress", "queue17", null, false);
- createQueue(1, "queues.testaddress", "queue17", null, false);
- createQueue(4, "queues.testaddress", "queue17", null, false);
-
- createQueue(3, "queues.testaddress", "queue18", null, false);
- createQueue(4, "queues.testaddress", "queue18", null, false);
-
- addConsumer(0, 0, "queue0", null);
- addConsumer(1, 1, "queue1", null);
- addConsumer(2, 2, "queue2", null);
- addConsumer(3, 3, "queue3", null);
- addConsumer(4, 4, "queue4", null);
-
- addConsumer(5, 0, "queue5", null);
- addConsumer(6, 1, "queue6", null);
- addConsumer(7, 2, "queue7", null);
- addConsumer(8, 3, "queue8", null);
- addConsumer(9, 4, "queue9", null);
-
- addConsumer(10, 0, "queue10", null);
- addConsumer(11, 1, "queue11", null);
- addConsumer(12, 2, "queue12", null);
- addConsumer(13, 3, "queue13", null);
- addConsumer(14, 4, "queue14", null);
-
- addConsumer(15, 0, "queue15", null);
- addConsumer(16, 1, "queue15", null);
- addConsumer(17, 2, "queue15", null);
- addConsumer(18, 3, "queue15", null);
- addConsumer(19, 4, "queue15", null);
-
- addConsumer(20, 2, "queue16", null);
- addConsumer(21, 3, "queue16", null);
- addConsumer(22, 4, "queue16", null);
-
- addConsumer(23, 0, "queue17", null);
- addConsumer(24, 1, "queue17", null);
- addConsumer(25, 4, "queue17", null);
-
- addConsumer(26, 3, "queue18", null);
- addConsumer(27, 4, "queue18", null);
-
- waitForBindings(0, "queues.testaddress", 5, 5, true);
- waitForBindings(1, "queues.testaddress", 5, 5, true);
- waitForBindings(2, "queues.testaddress", 5, 5, true);
- waitForBindings(3, "queues.testaddress", 6, 6, true);
- waitForBindings(4, "queues.testaddress", 7, 7, true);
-
- waitForBindings(0, "queues.testaddress", 23, 23, false);
- waitForBindings(1, "queues.testaddress", 23, 23, false);
- waitForBindings(2, "queues.testaddress", 23, 23, false);
- waitForBindings(3, "queues.testaddress", 22, 22, false);
- waitForBindings(4, "queues.testaddress", 21, 21, false);
-
- send(0, "queues.testaddress", 10, false, null);
-
- verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
-
- verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
-
- verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
-
- verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
-
- verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
-
- removeConsumer(0);
- removeConsumer(5);
- removeConsumer(10);
- removeConsumer(15);
- removeConsumer(23);
- removeConsumer(3);
- removeConsumer(8);
- removeConsumer(13);
- removeConsumer(18);
- removeConsumer(21);
- removeConsumer(26);
-
- closeSessionFactory(0);
- closeSessionFactory(3);
-
- stopServers(0, 3);
-
- Thread.sleep(10000);
-
- startServers(3, 0);
-
- setupSessionFactory(0, isNetty());
- setupSessionFactory(3, isNetty());
-
- createQueue(0, "queues.testaddress", "queue0", null, false);
- createQueue(3, "queues.testaddress", "queue3", null, false);
-
- createQueue(0, "queues.testaddress", "queue5", null, false);
- createQueue(3, "queues.testaddress", "queue8", null, false);
-
- createQueue(0, "queues.testaddress", "queue10", null, false);
- createQueue(3, "queues.testaddress", "queue13", null, false);
-
- createQueue(0, "queues.testaddress", "queue15", null, false);
- createQueue(3, "queues.testaddress", "queue15", null, false);
-
- createQueue(3, "queues.testaddress", "queue16", null, false);
-
- createQueue(0, "queues.testaddress", "queue17", null, false);
-
- createQueue(3, "queues.testaddress", "queue18", null, false);
-
- addConsumer(0, 0, "queue0", null);
- addConsumer(3, 3, "queue3", null);
-
- addConsumer(5, 0, "queue5", null);
- addConsumer(8, 3, "queue8", null);
-
- addConsumer(10, 0, "queue10", null);
- addConsumer(13, 3, "queue13", null);
-
- addConsumer(15, 0, "queue15", null);
- addConsumer(18, 3, "queue15", null);
-
- addConsumer(21, 3, "queue16", null);
-
- addConsumer(23, 0, "queue17", null);
-
- addConsumer(26, 3, "queue18", null);
-
- waitForBindings(0, "queues.testaddress", 5, 5, true);
- waitForBindings(1, "queues.testaddress", 5, 5, true);
- waitForBindings(2, "queues.testaddress", 5, 5, true);
- waitForBindings(3, "queues.testaddress", 6, 6, true);
- waitForBindings(4, "queues.testaddress", 7, 7, true);
-
- waitForBindings(0, "queues.testaddress", 23, 23, false);
- waitForBindings(1, "queues.testaddress", 23, 23, false);
- waitForBindings(2, "queues.testaddress", 23, 23, false);
- waitForBindings(3, "queues.testaddress", 22, 22, false);
- waitForBindings(4, "queues.testaddress", 21, 21, false);
-
- send(0, "queues.testaddress", 10, false, null);
-
- verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
-
- verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
-
- verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
-
- verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
-
- verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+ doTestStartStopServers(10000, 3000);
}
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -92,7 +92,39 @@
stopServers(0, 1);
}
+
+ public void testStartPauseStartOther() throws Exception
+ {
+ startServers(0);
+
+ setupSessionFactory(0, isNetty());
+ createQueue(0, "queues", "queue0", null, false);
+ addConsumer(0, 0, "queue0", null);
+
+ // we let the discovery initial timeout expire,
+ // #0 will be alone in the cluster
+ Thread.sleep(12000);
+
+ startServers(1);
+ setupSessionFactory(1, isNetty());
+ createQueue(1, "queues", "queue0", null, false);
+
+ addConsumer(1, 1, "queue0", null);
+
+ waitForBindings(0, "queues", 1, 1, true);
+ waitForBindings(1, "queues", 1, 1, true);
+
+ waitForBindings(0, "queues", 1, 1, false);
+ waitForBindings(1, "queues", 1, 1, false);
+
+ send(0, "queues", 10, false, null);
+ verifyReceiveRoundRobin(10, 0, 1);
+ verifyNotReceive(0, 1);
+
+ stopServers(0, 1);
+ }
+
public void testStopStart() throws Exception
{
startServers(0, 1);
@@ -127,7 +159,12 @@
System.out.println(clusterDescription(servers[0]));
startServers(1);
+
+ Thread.sleep(3000);
+ System.out.println(clusterDescription(servers[0]));
+ System.out.println(clusterDescription(servers[1]));
+
setupSessionFactory(1, isNetty());
createQueue(1, "queues", "queue0", null, false);
@@ -137,9 +174,6 @@
waitForBindings(0, "queues", 1, 1, true);
waitForBindings(1, "queues", 1, 1, true);
- System.out.println(clusterDescription(servers[0]));
- System.out.println(clusterDescription(servers[1]));
-
waitForBindings(1, "queues", 1, 1, false);
waitForBindings(0, "queues", 1, 1, false);
15 years, 9 months
JBoss hornetq SVN: r9564 - tags/HornetQ_2_1_2_Final.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-18 15:29:30 -0400 (Wed, 18 Aug 2010)
New Revision: 9564
Added:
tags/HornetQ_2_1_2_Final/remove-twitter.patch
Log:
Adding patch/option to remove twitter support from the build
Added: tags/HornetQ_2_1_2_Final/remove-twitter.patch
===================================================================
--- tags/HornetQ_2_1_2_Final/remove-twitter.patch (rev 0)
+++ tags/HornetQ_2_1_2_Final/remove-twitter.patch 2010-08-18 19:29:30 UTC (rev 9564)
@@ -0,0 +1,1304 @@
+#This file is used to remove twitter support from HornetQ
+#As EAP doesn't have twitter4j.jar certified for the build
+Index: src/main/org/hornetq/integration/twitter/TwitterConstants.java
+===================================================================
+--- src/main/org/hornetq/integration/twitter/TwitterConstants.java (revision 9563)
++++ src/main/org/hornetq/integration/twitter/TwitterConstants.java (working copy)
+@@ -1,81 +0,0 @@
+-/*
+- * Copyright 2010 Red Hat, Inc.
+- * Red Hat licenses this file to you under the Apache License, version
+- * 2.0 (the "License"); you may not use this file except in compliance
+- * with the License. You may obtain a copy of the License at
+- * http://www.apache.org/licenses/LICENSE-2.0
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+- * implied. See the License for the specific language governing
+- * permissions and limitations under the License.
+- */
+-
+-package org.hornetq.integration.twitter;
+-
+-import java.util.HashSet;
+-import java.util.Set;
+-
+-/**
+- * A TwitterConstants
+- *
+- * @author <a href="tm.igarashi(a)gmail.com">Tomohisa Igarashi</a>
+- */
+-public class TwitterConstants
+-{
+- public static final String KEY_ID = "id";
+- public static final String KEY_SOURCE = "source";
+- public static final String KEY_CREATED_AT = "createdAt";
+- public static final String KEY_IS_TRUNCATED = "isTruncated";
+- public static final String KEY_IN_REPLY_TO_STATUS_ID = "inReplyToStatusId";
+- public static final String KEY_IN_REPLY_TO_USER_ID = "inReplyToUserId";
+- public static final String KEY_IN_REPLY_TO_SCREEN_NAME = "inReplyToScreenName";
+- public static final String KEY_IS_FAVORITED = "isFavorited";
+- public static final String KEY_IS_RETWEET = "isRetweet";
+- public static final String KEY_CONTRIBUTORS = "contributors";
+- public static final String KEY_GEO_LOCATION_LATITUDE = "geoLocation.latitude";
+- public static final String KEY_GEO_LOCATION_LONGITUDE = "geoLocation.longitude";
+- public static final String KEY_PLACE_ID = "place.id";
+- public static final String KEY_DISPLAY_COODINATES = "displayCoodinates";
+-
+- public static final int DEFAULT_POLLING_INTERVAL_SECS = 10;
+- public static final int DEFAULT_PAGE_SIZE = 100;
+- public static final int FIRST_ATTEMPT_PAGE_SIZE = 1;
+- public static final int START_SINCE_ID = 1;
+- public static final int INITIAL_MESSAGE_BUFFER_SIZE = 50;
+-
+- public static final Set<String> ALLOWABLE_INCOMING_CONNECTOR_KEYS;
+- public static final Set<String> REQUIRED_INCOMING_CONNECTOR_KEYS;
+-
+- public static final Set<String> ALLOWABLE_OUTGOING_CONNECTOR_KEYS;
+- public static final Set<String> REQUIRED_OUTGOING_CONNECTOR_KEYS;
+-
+- public static final String USER_NAME = "username";
+- public static final String PASSWORD = "password";
+- public static final String QUEUE_NAME = "queue";
+- public static final String INCOMING_INTERVAL = "interval";
+-
+- static
+- {
+- ALLOWABLE_INCOMING_CONNECTOR_KEYS = new HashSet<String>();
+- ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(USER_NAME);
+- ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(PASSWORD);
+- ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(QUEUE_NAME);
+- ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(INCOMING_INTERVAL);
+-
+- REQUIRED_INCOMING_CONNECTOR_KEYS = new HashSet<String>();
+- REQUIRED_INCOMING_CONNECTOR_KEYS.add(USER_NAME);
+- REQUIRED_INCOMING_CONNECTOR_KEYS.add(PASSWORD);
+- REQUIRED_INCOMING_CONNECTOR_KEYS.add(QUEUE_NAME);
+-
+- ALLOWABLE_OUTGOING_CONNECTOR_KEYS = new HashSet<String>();
+- ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(USER_NAME);
+- ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(PASSWORD);
+- ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(QUEUE_NAME);
+-
+- REQUIRED_OUTGOING_CONNECTOR_KEYS = new HashSet<String>();
+- REQUIRED_OUTGOING_CONNECTOR_KEYS.add(USER_NAME);
+- REQUIRED_OUTGOING_CONNECTOR_KEYS.add(PASSWORD);
+- REQUIRED_OUTGOING_CONNECTOR_KEYS.add(QUEUE_NAME);
+- }
+-}
+Index: src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java
+===================================================================
+--- src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java (revision 9563)
++++ src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java (working copy)
+@@ -1,202 +0,0 @@
+-/*
+- * Copyright 2009 Red Hat, Inc.
+- * Red Hat licenses this file to you under the Apache License, version
+- * 2.0 (the "License"); you may not use this file except in compliance
+- * with the License. You may obtain a copy of the License at
+- * http://www.apache.org/licenses/LICENSE-2.0
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+- * implied. See the License for the specific language governing
+- * permissions and limitations under the License.
+- */
+-package org.hornetq.integration.twitter.impl;
+-
+-import org.hornetq.api.core.SimpleString;
+-import org.hornetq.core.filter.Filter;
+-import org.hornetq.core.logging.Logger;
+-import org.hornetq.core.postoffice.Binding;
+-import org.hornetq.core.postoffice.PostOffice;
+-import org.hornetq.core.server.*;
+-import org.hornetq.integration.twitter.TwitterConstants;
+-import org.hornetq.utils.ConfigurationHelper;
+-import twitter4j.*;
+-
+-import java.util.Map;
+-
+-/**
+- * OutgoingTweetsHandler consumes from configured HornetQ address
+- * and forwards to the twitter.
+- */
+-public class OutgoingTweetsHandler implements Consumer, ConnectorService
+-{
+- private static final Logger log = Logger.getLogger(OutgoingTweetsHandler.class);
+-
+- private final String connectorName;
+-
+- private final String userName;
+-
+- private final String password;
+-
+- private final String queueName;
+-
+- private final PostOffice postOffice;
+-
+- private Twitter twitter = null;
+-
+- private Queue queue = null;
+-
+- private Filter filter = null;
+-
+- private boolean isStarted = false;
+-
+- public OutgoingTweetsHandler(final String connectorName,
+- final Map<String, Object> configuration,
+- final PostOffice postOffice)
+- {
+- this.connectorName = connectorName;
+- this.userName = ConfigurationHelper.getStringProperty(TwitterConstants.USER_NAME, null, configuration);
+- this.password = ConfigurationHelper.getStringProperty(TwitterConstants.PASSWORD, null, configuration);
+- this.queueName = ConfigurationHelper.getStringProperty(TwitterConstants.QUEUE_NAME, null, configuration);
+- this.postOffice = postOffice;
+- }
+-
+- /**
+- * TODO streaming API support
+- * TODO rate limit support
+- */
+- public synchronized void start() throws Exception
+- {
+- if(this.isStarted)
+- {
+- return;
+- }
+-
+- if(this.connectorName == null || this.connectorName.trim().equals(""))
+- {
+- throw new Exception("invalid connector name: " + this.connectorName);
+- }
+-
+- if(this.queueName == null || this.queueName.trim().equals(""))
+- {
+- throw new Exception("invalid queue name: " + queueName);
+- }
+-
+- SimpleString name = new SimpleString(this.queueName);
+- Binding b = this.postOffice.getBinding(name);
+- if(b == null)
+- {
+- throw new Exception(connectorName + ": queue " + queueName + " not found");
+- }
+- this.queue = (Queue)b.getBindable();
+-
+- TwitterFactory tf = new TwitterFactory();
+- this.twitter = tf.getInstance(userName, password);
+- this.twitter.verifyCredentials();
+- // TODO make filter-string configurable
+- // this.filter = FilterImpl.createFilter(filterString);
+- this.filter = null;
+-
+- this.queue.addConsumer(this);
+-
+- this.queue.deliverAsync();
+- this.isStarted = true;
+- log.debug(connectorName + ": started");
+- }
+-
+- public boolean isStarted()
+- {
+- return isStarted; //To change body of implemented methods use File | Settings | File Templates.
+- }
+-
+- public synchronized void stop() throws Exception
+- {
+- if(!this.isStarted)
+- {
+- return;
+- }
+-
+- log.debug(connectorName + ": receive shutdown request");
+-
+- this.queue.removeConsumer(this);
+-
+- this.isStarted = false;
+- log.debug(connectorName + ": shutdown");
+- }
+-
+- public String getName()
+- {
+- return connectorName;
+- }
+-
+- public Filter getFilter()
+- {
+- return filter;
+- }
+-
+- public HandleStatus handle(final MessageReference ref) throws Exception
+- {
+- if (filter != null && !filter.match(ref.getMessage()))
+- {
+- return HandleStatus.NO_MATCH;
+- }
+-
+- synchronized (this)
+- {
+- ref.handled();
+-
+- ServerMessage message = ref.getMessage();
+-
+- StatusUpdate status = new StatusUpdate(message.getBodyBuffer().readString());
+-
+- // set optional property
+-
+- if(message.containsProperty(TwitterConstants.KEY_IN_REPLY_TO_STATUS_ID))
+- {
+- status.setInReplyToStatusId(message.getLongProperty(TwitterConstants.KEY_IN_REPLY_TO_STATUS_ID));
+- }
+-
+- if(message.containsProperty(TwitterConstants.KEY_GEO_LOCATION_LATITUDE))
+- {
+- double geolat = message.getDoubleProperty(TwitterConstants.KEY_GEO_LOCATION_LATITUDE);
+- double geolong = message.getDoubleProperty(TwitterConstants.KEY_GEO_LOCATION_LONGITUDE);
+- status.setLocation(new GeoLocation(geolat, geolong));
+- }
+-
+- if(message.containsProperty(TwitterConstants.KEY_PLACE_ID))
+- {
+- status.setPlaceId(message.getStringProperty(TwitterConstants.KEY_PLACE_ID));
+- }
+-
+- if(message.containsProperty(TwitterConstants.KEY_DISPLAY_COODINATES))
+- {
+- status.setDisplayCoordinates(message.getBooleanProperty(TwitterConstants.KEY_DISPLAY_COODINATES));
+- }
+-
+- // send to Twitter
+- try
+- {
+- this.twitter.updateStatus(status);
+- }
+- catch (TwitterException e)
+- {
+- if(e.getStatusCode() == 403 )
+- {
+- // duplicated message
+- log.warn(connectorName + ": HTTP status code = 403: Ignore duplicated message");
+- queue.acknowledge(ref);
+-
+- return HandleStatus.HANDLED;
+- }
+- else
+- {
+- throw e;
+- }
+- }
+-
+- queue.acknowledge(ref);
+- log.debug(connectorName + ": forwarded to twitter: " + message.getMessageID());
+- return HandleStatus.HANDLED;
+- }
+- }
+-}
+Index: src/main/org/hornetq/integration/twitter/impl/IncomingTweetsHandler.java
+===================================================================
+--- src/main/org/hornetq/integration/twitter/impl/IncomingTweetsHandler.java (revision 9563)
++++ src/main/org/hornetq/integration/twitter/impl/IncomingTweetsHandler.java (working copy)
+@@ -1,213 +0,0 @@
+-/*
+- * Copyright 2009 Red Hat, Inc.
+- * Red Hat licenses this file to you under the Apache License, version
+- * 2.0 (the "License"); you may not use this file except in compliance
+- * with the License. You may obtain a copy of the License at
+- * http://www.apache.org/licenses/LICENSE-2.0
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+- * implied. See the License for the specific language governing
+- * permissions and limitations under the License.
+- */
+-package org.hornetq.integration.twitter.impl;
+-
+-import org.hornetq.api.core.SimpleString;
+-import org.hornetq.core.logging.Logger;
+-import org.hornetq.core.persistence.StorageManager;
+-import org.hornetq.core.postoffice.Binding;
+-import org.hornetq.core.postoffice.PostOffice;
+-import org.hornetq.core.server.ConnectorService;
+-import org.hornetq.core.server.ServerMessage;
+-import org.hornetq.core.server.impl.ServerMessageImpl;
+-import org.hornetq.integration.twitter.TwitterConstants;
+-import org.hornetq.utils.ConfigurationHelper;
+-import twitter4j.*;
+-
+-import java.util.Map;
+-import java.util.concurrent.ScheduledExecutorService;
+-import java.util.concurrent.ScheduledFuture;
+-import java.util.concurrent.TimeUnit;
+-
+-/**
+- * IncomingTweetsHandler consumes from twitter and forwards to the
+- * configured HornetQ address.
+- */
+-public class IncomingTweetsHandler implements ConnectorService
+-{
+- private static final Logger log = Logger.getLogger(IncomingTweetsHandler.class);
+-
+- private final String connectorName;
+-
+- private final String userName;
+-
+- private final String password;
+-
+- private final String queueName;
+-
+- private final int intervalSeconds;
+-
+- private final StorageManager storageManager;
+-
+- private final PostOffice postOffice;
+-
+- private Paging paging;
+-
+- private Twitter twitter;
+-
+- private boolean isStarted = false;
+-
+- private final ScheduledExecutorService scheduledPool;
+-
+- private ScheduledFuture scheduledFuture;
+-
+- public IncomingTweetsHandler(final String connectorName,
+- final Map<String, Object> configuration,
+- final StorageManager storageManager,
+- final PostOffice postOffice,
+- final ScheduledExecutorService scheduledThreadPool)
+- {
+- this.connectorName = connectorName;
+- this.userName = ConfigurationHelper.getStringProperty(TwitterConstants.USER_NAME, null, configuration);
+- this.password = ConfigurationHelper.getStringProperty(TwitterConstants.PASSWORD, null, configuration);
+- this.queueName = ConfigurationHelper.getStringProperty(TwitterConstants.QUEUE_NAME, null, configuration);
+- Integer intervalSeconds = ConfigurationHelper.getIntProperty(TwitterConstants.INCOMING_INTERVAL, 0, configuration);
+- if (intervalSeconds > 0)
+- {
+- this.intervalSeconds = intervalSeconds;
+- }
+- else
+- {
+- this.intervalSeconds = TwitterConstants.DEFAULT_POLLING_INTERVAL_SECS;
+- }
+- this.storageManager = storageManager;
+- this.postOffice = postOffice;
+- this.scheduledPool = scheduledThreadPool;
+- }
+-
+- public void start() throws Exception
+- {
+- Binding b = postOffice.getBinding(new SimpleString(queueName));
+- if (b == null)
+- {
+- throw new Exception(connectorName + ": queue " + queueName + " not found");
+- }
+-
+- paging = new Paging();
+- TwitterFactory tf = new TwitterFactory();
+- this.twitter = tf.getInstance(userName, password);
+- this.twitter.verifyCredentials();
+-
+- // getting latest ID
+- this.paging.setCount(TwitterConstants.FIRST_ATTEMPT_PAGE_SIZE);
+- ResponseList<Status> res = this.twitter.getHomeTimeline(paging);
+- this.paging.setSinceId(res.get(0).getId());
+- log.debug(connectorName + " initialise(): got latest ID: " + this.paging.getSinceId());
+-
+- // TODO make page size configurable
+- this.paging.setCount(TwitterConstants.DEFAULT_PAGE_SIZE);
+-
+- scheduledFuture = this.scheduledPool.scheduleWithFixedDelay(new TweetsRunnable(),
+- intervalSeconds,
+- intervalSeconds,
+- TimeUnit.SECONDS);
+- isStarted = true;
+- }
+-
+- public void stop() throws Exception
+- {
+- if(!isStarted)
+- {
+- return;
+- }
+- scheduledFuture.cancel(true);
+- paging = null;
+- isStarted = false;
+- }
+-
+- public boolean isStarted()
+- {
+- return isStarted;
+- }
+-
+- private void poll() throws Exception
+- {
+- // get new tweets
+- ResponseList<Status> res = this.twitter.getHomeTimeline(paging);
+-
+- if (res == null || res.size() == 0)
+- {
+- return;
+- }
+-
+- for (int i = res.size() - 1; i >= 0; i--)
+- {
+- Status status = res.get(i);
+-
+- ServerMessage msg = new ServerMessageImpl(this.storageManager.generateUniqueID(),
+- TwitterConstants.INITIAL_MESSAGE_BUFFER_SIZE);
+- msg.setAddress(new SimpleString(this.queueName));
+- msg.setDurable(true);
+- msg.encodeMessageIDToBuffer();
+-
+- putTweetIntoMessage(status, msg);
+-
+- this.postOffice.route(msg, false);
+- log.debug(connectorName + ": routed: " + status.toString());
+- }
+-
+- this.paging.setSinceId(res.get(0).getId());
+- log.debug(connectorName + ": update latest ID: " + this.paging.getSinceId());
+- }
+-
+- private void putTweetIntoMessage(final Status status, final ServerMessage msg)
+- {
+- msg.getBodyBuffer().writeString(status.getText());
+- msg.putLongProperty(TwitterConstants.KEY_ID, status.getId());
+- msg.putStringProperty(TwitterConstants.KEY_SOURCE, status.getSource());
+-
+- msg.putLongProperty(TwitterConstants.KEY_CREATED_AT, status.getCreatedAt().getTime());
+- msg.putBooleanProperty(TwitterConstants.KEY_IS_TRUNCATED, status.isTruncated());
+- msg.putLongProperty(TwitterConstants.KEY_IN_REPLY_TO_STATUS_ID, status.getInReplyToStatusId());
+- msg.putIntProperty(TwitterConstants.KEY_IN_REPLY_TO_USER_ID, status.getInReplyToUserId());
+- msg.putBooleanProperty(TwitterConstants.KEY_IS_FAVORITED, status.isFavorited());
+- msg.putBooleanProperty(TwitterConstants.KEY_IS_RETWEET, status.isRetweet());
+- msg.putObjectProperty(TwitterConstants.KEY_CONTRIBUTORS, status.getContributors());
+- GeoLocation gl;
+- if ((gl = status.getGeoLocation()) != null)
+- {
+- msg.putDoubleProperty(TwitterConstants.KEY_GEO_LOCATION_LATITUDE, gl.getLatitude());
+- msg.putDoubleProperty(TwitterConstants.KEY_GEO_LOCATION_LONGITUDE, gl.getLongitude());
+- }
+- Place place;
+- if ((place = status.getPlace()) != null)
+- {
+- msg.putStringProperty(TwitterConstants.KEY_PLACE_ID, place.getId());
+- }
+- }
+-
+- public String getName()
+- {
+- return connectorName;
+- }
+-
+- private final class TweetsRunnable implements Runnable
+- {
+- /**
+- * TODO streaming API support
+- * TODO rate limit support
+- */
+- public void run()
+- {
+- // Avoid cancelling the task with RuntimeException
+- try
+- {
+- poll();
+- }
+- catch (Throwable t)
+- {
+- log.warn(connectorName, t);
+- }
+- }
+- }
+-}
+Index: src/main/org/hornetq/integration/twitter/TwitterOutgoingConnectorServiceFactory.java
+===================================================================
+--- src/main/org/hornetq/integration/twitter/TwitterOutgoingConnectorServiceFactory.java (revision 9563)
++++ src/main/org/hornetq/integration/twitter/TwitterOutgoingConnectorServiceFactory.java (working copy)
+@@ -1,45 +0,0 @@
+-/*
+- * Copyright 2009 Red Hat, Inc.
+- * Red Hat licenses this file to you under the Apache License, version
+- * 2.0 (the "License"); you may not use this file except in compliance
+- * with the License. You may obtain a copy of the License at
+- * http://www.apache.org/licenses/LICENSE-2.0
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+- * implied. See the License for the specific language governing
+- * permissions and limitations under the License.
+- */
+-package org.hornetq.integration.twitter;
+-
+-import org.hornetq.core.persistence.StorageManager;
+-import org.hornetq.core.postoffice.PostOffice;
+-import org.hornetq.core.server.ConnectorService;
+-import org.hornetq.core.server.ConnectorServiceFactory;
+-import org.hornetq.integration.twitter.impl.OutgoingTweetsHandler;
+-
+-import java.util.Map;
+-import java.util.Set;
+-import java.util.concurrent.ScheduledExecutorService;
+-
+-/**
+- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+- * Created Jun 29, 2010
+- */
+-public class TwitterOutgoingConnectorServiceFactory implements ConnectorServiceFactory
+-{
+- public ConnectorService createConnectorService(String connectorName, Map<String, Object> configuration, StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService scheduledThreadPool)
+- {
+- return new OutgoingTweetsHandler(connectorName, configuration, postOffice);
+- }
+-
+- public Set<String> getAllowableProperties()
+- {
+- return TwitterConstants.ALLOWABLE_OUTGOING_CONNECTOR_KEYS;
+- }
+-
+- public Set<String> getRequiredProperties()
+- {
+- return TwitterConstants.REQUIRED_OUTGOING_CONNECTOR_KEYS;
+- }
+-}
+Index: src/main/org/hornetq/integration/twitter/TwitterIncomingConnectorServiceFactory.java
+===================================================================
+--- src/main/org/hornetq/integration/twitter/TwitterIncomingConnectorServiceFactory.java (revision 9563)
++++ src/main/org/hornetq/integration/twitter/TwitterIncomingConnectorServiceFactory.java (working copy)
+@@ -1,48 +0,0 @@
+-/*
+- * Copyright 2009 Red Hat, Inc.
+- * Red Hat licenses this file to you under the Apache License, version
+- * 2.0 (the "License"); you may not use this file except in compliance
+- * with the License. You may obtain a copy of the License at
+- * http://www.apache.org/licenses/LICENSE-2.0
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+- * implied. See the License for the specific language governing
+- * permissions and limitations under the License.
+- */
+-package org.hornetq.integration.twitter;
+-
+-import org.hornetq.core.persistence.StorageManager;
+-import org.hornetq.core.postoffice.PostOffice;
+-import org.hornetq.core.server.ConnectorService;
+-import org.hornetq.core.server.ConnectorServiceFactory;
+-import org.hornetq.integration.twitter.impl.IncomingTweetsHandler;
+-
+-import java.util.Map;
+-import java.util.Set;
+-import java.util.concurrent.ScheduledExecutorService;
+-
+-/**
+- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+- * Created Jun 29, 2010
+- */
+-public class TwitterIncomingConnectorServiceFactory implements ConnectorServiceFactory
+-{
+- public ConnectorService createConnectorService(String connectorName, final Map<String, Object> configuration,
+- final StorageManager storageManager,
+- final PostOffice postOffice,
+- final ScheduledExecutorService scheduledThreadPool)
+- {
+- return new IncomingTweetsHandler(connectorName, configuration, storageManager, postOffice, scheduledThreadPool);
+- }
+-
+- public Set<String> getAllowableProperties()
+- {
+- return TwitterConstants.ALLOWABLE_INCOMING_CONNECTOR_KEYS;
+- }
+-
+- public Set<String> getRequiredProperties()
+- {
+- return TwitterConstants.REQUIRED_INCOMING_CONNECTOR_KEYS;
+- }
+-}
+Index: pom.xml
+===================================================================
+--- pom.xml (revision 9563)
++++ pom.xml (working copy)
+@@ -236,11 +236,6 @@
+ <version>2.1.0.GA</version>
+ </dependency>
+ <!--needed to compile twitter support-->
+- <dependency>
+- <groupId>org.twitter4j</groupId>
+- <artifactId>twitter4j-core</artifactId>
+- <version>2.1.2</version>
+- </dependency>
+ <!-- needed to compile the tests-->
+ <dependency>
+ <groupId>junit</groupId>
+Index: tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java
+===================================================================
+--- tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java (revision 9563)
++++ tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java (working copy)
+@@ -1,511 +0,0 @@
+-/*
+- * Copyright 2010 Red Hat, Inc.
+- * Red Hat licenses this file to you under the Apache License, version
+- * 2.0 (the "License"); you may not use this file except in compliance
+- * with the License. You may obtain a copy of the License at
+- * http://www.apache.org/licenses/LICENSE-2.0
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+- * implied. See the License for the specific language governing
+- * permissions and limitations under the License.
+- */
+-
+-package org.hornetq.tests.integration.twitter;
+-
+-import java.util.HashMap;
+-import java.util.Iterator;
+-import java.util.Set;
+-
+-import junit.framework.Assert;
+-
+-import org.hornetq.api.core.TransportConfiguration;
+-import org.hornetq.api.core.client.ClientConsumer;
+-import org.hornetq.api.core.client.ClientMessage;
+-import org.hornetq.api.core.client.ClientProducer;
+-import org.hornetq.api.core.client.ClientSession;
+-import org.hornetq.api.core.client.ClientSessionFactory;
+-import org.hornetq.api.core.client.HornetQClient;
+-import org.hornetq.core.config.Configuration;
+-import org.hornetq.core.config.ConnectorServiceConfiguration;
+-import org.hornetq.core.config.CoreQueueConfiguration;
+-import org.hornetq.core.logging.Logger;
+-import org.hornetq.core.server.ConnectorService;
+-import org.hornetq.core.server.HornetQServer;
+-import org.hornetq.integration.twitter.TwitterConstants;
+-import org.hornetq.integration.twitter.TwitterIncomingConnectorServiceFactory;
+-import org.hornetq.integration.twitter.TwitterOutgoingConnectorServiceFactory;
+-import org.hornetq.tests.util.ServiceTestBase;
+-import org.hornetq.tests.util.UnitTestCase;
+-import twitter4j.*;
+-
+-/**
+- * A TwitterTest
+- *
+- * @author tm.igarashi(a)gmail.com
+- *
+- *
+- */
+-public class TwitterTest extends ServiceTestBase
+-{
+- private static final Logger log = Logger.getLogger(TwitterTest.class);
+- private static final String KEY_CONNECTOR_NAME = "connector.name";
+- private static final String KEY_USERNAME = "username";
+- private static final String KEY_PASSWORD = "password";
+- private static final String KEY_QUEUE_NAME = "queue.name";
+-
+- private static final String TWITTER_USERNAME = System.getProperty("twitter.username");
+- private static final String TWITTER_PASSWORD = System.getProperty("twitter.password");
+-
+- @Override
+- protected void setUp() throws Exception
+- {
+- if(TWITTER_USERNAME == null || TWITTER_PASSWORD == null)
+- {
+- throw new Exception("* * * Please set twitter.username and twitter.password in system property * * *");
+- }
+- super.setUp();
+- }
+-
+- // incoming
+-
+- public void testSimpleIncoming() throws Exception
+- {
+- internalTestIncoming(true,false);
+- }
+-
+- public void testIncomingNoQueue() throws Exception
+- {
+- internalTestIncoming(false,false);
+- }
+-
+- public void testIncomingWithRestart() throws Exception
+- {
+- internalTestIncoming(true,true);
+- }
+-
+- public void testIncomingWithEmptyConnectorName() throws Exception
+- {
+- HashMap<String,String> params = new HashMap<String,String>();
+- params.put(KEY_CONNECTOR_NAME, "");
+- internalTestIncomingFailedToInitialize(params);
+- }
+-
+- public void testIncomingWithEmptyQueueName() throws Exception
+- {
+- HashMap<String,String> params = new HashMap<String,String>();
+- params.put(KEY_QUEUE_NAME, "");
+- internalTestIncomingFailedToInitialize(params);
+- }
+-
+- public void testIncomingWithInvalidCredentials() throws Exception
+- {
+- HashMap<String,String> params = new HashMap<String,String>();
+- params.put(KEY_USERNAME, "invalidUsername");
+- params.put(KEY_PASSWORD, "invalidPassword");
+- internalTestIncomingFailedToInitialize(params);
+- }
+-
+- //outgoing
+-
+- public void testSimpleOutgoing() throws Exception
+- {
+- internalTestOutgoing(true,false);
+- }
+-
+- public void testOutgoingNoQueue() throws Exception
+- {
+- internalTestOutgoing(false,false);
+- }
+- public void testOutgoingWithRestart() throws Exception
+- {
+- internalTestOutgoing(true,true);
+- }
+-
+- public void testOutgoingWithEmptyConnectorName() throws Exception
+- {
+- HashMap<String,String> params = new HashMap<String,String>();
+- params.put(KEY_CONNECTOR_NAME, "");
+- internalTestOutgoingFailedToInitialize(params);
+- }
+-
+- public void testOutgoingWithEmptyQueueName() throws Exception
+- {
+- HashMap<String,String> params = new HashMap<String,String>();
+- params.put(KEY_QUEUE_NAME, "");
+- internalTestOutgoingFailedToInitialize(params);
+- }
+-
+- public void testOutgoingWithInvalidCredentials() throws Exception
+- {
+- HashMap<String,String> params = new HashMap<String,String>();
+- params.put(KEY_USERNAME, "invalidUsername");
+- params.put(KEY_PASSWORD, "invalidPassword");
+- internalTestOutgoingFailedToInitialize(params);
+- }
+-
+- /**
+- * This will fail until TFJ-347 is fixed.
+- * http://twitter4j.org/jira/browse/TFJ-347
+- *
+- * @throws Exception
+- */
+- public void _testOutgoingWithInReplyTo() throws Exception
+- {
+- internalTestOutgoingWithInReplyTo();
+- }
+-
+- protected void internalTestIncoming(boolean createQueue, boolean restart) throws Exception
+- {
+- HornetQServer server0 = null;
+- ClientSession session = null;
+- String queue = "TwitterTestQueue";
+- int interval = 5;
+- Twitter twitter = new TwitterFactory().getInstance(TWITTER_USERNAME,TWITTER_PASSWORD);
+- String testMessage = "TwitterTest/incoming: " + System.currentTimeMillis();
+- log.debug("test incoming: " + testMessage);
+-
+- try
+- {
+- Configuration configuration = createDefaultConfig(false);
+- HashMap<String, Object> config = new HashMap<String, Object>();
+- config.put(TwitterConstants.INCOMING_INTERVAL, interval);
+- config.put(TwitterConstants.QUEUE_NAME, queue);
+- config.put(TwitterConstants.USER_NAME, TWITTER_USERNAME);
+- config.put(TwitterConstants.PASSWORD, TWITTER_PASSWORD);
+- ConnectorServiceConfiguration inconf =
+- new ConnectorServiceConfiguration(
+- TwitterIncomingConnectorServiceFactory.class.getName(),
+- config,"test-incoming-connector");
+- configuration.getConnectorServiceConfigurations().add(inconf);
+-
+- if(createQueue)
+- {
+- CoreQueueConfiguration qc = new CoreQueueConfiguration(queue, queue, null, true);
+- configuration.getQueueConfigurations().add(qc);
+- }
+-
+- server0 = createServer(false,configuration);
+- server0.start();
+-
+- if(restart)
+- {
+- server0.getConnectorsService().stop();
+- server0.getConnectorsService().start();
+- }
+-
+- assertEquals(1, server0.getConnectorsService().getConnectors().size());
+- Iterator<ConnectorService> connectorServiceIterator = server0.getConnectorsService().getConnectors().iterator();
+- if(createQueue)
+- {
+- Assert.assertTrue(connectorServiceIterator.next().isStarted());
+- }
+- else
+- {
+- Assert.assertFalse(connectorServiceIterator.next().isStarted());
+- return;
+- }
+-
+- twitter.updateStatus(testMessage);
+-
+- TransportConfiguration tpconf = new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY);
+- ClientSessionFactory sf = HornetQClient.createClientSessionFactory(tpconf);
+- session = sf.createSession(false, true, true);
+- ClientConsumer consumer = session.createConsumer(queue);
+- session.start();
+- ClientMessage msg = consumer.receive(60*1000);
+-
+- Assert.assertNotNull(msg);
+- Assert.assertEquals(testMessage, msg.getBodyBuffer().readString());
+-
+- msg.acknowledge();
+- }
+- finally
+- {
+- try
+- {
+- session.close();
+- }
+- catch(Throwable t)
+- {
+- }
+- try
+- {
+- server0.stop();
+- }
+- catch(Throwable ignored)
+- {
+- }
+- }
+- }
+-
+- protected void internalTestIncomingFailedToInitialize(HashMap<String,String> params) throws Exception
+- {
+- HornetQServer server0 = null;
+- String connectorName = "test-incoming-connector";
+- String queue = "TwitterTestQueue";
+- String userName = "invalidUsername";
+- String password = "invalidPassword";
+- int interval = 5;
+-
+- if(params.containsKey(KEY_CONNECTOR_NAME))
+- {
+- connectorName = params.get(KEY_CONNECTOR_NAME);
+- }
+- if(params.containsKey(KEY_USERNAME))
+- {
+- userName = params.get(KEY_USERNAME);
+- }
+- if(params.containsKey(KEY_PASSWORD))
+- {
+- password = params.get(KEY_PASSWORD);
+- }
+- if(params.containsKey(KEY_QUEUE_NAME))
+- {
+- queue = params.get(KEY_QUEUE_NAME);
+- }
+-
+- try
+- {
+- Configuration configuration = createDefaultConfig(false);
+- HashMap<String, Object> config = new HashMap<String, Object>();
+- config.put(TwitterConstants.INCOMING_INTERVAL, interval);
+- config.put(TwitterConstants.QUEUE_NAME, queue);
+- config.put(TwitterConstants.USER_NAME, userName);
+- config.put(TwitterConstants.PASSWORD, password);
+- ConnectorServiceConfiguration inconf =
+- new ConnectorServiceConfiguration(TwitterIncomingConnectorServiceFactory.class.getName(),
+- config,
+- connectorName);
+- configuration.getConnectorServiceConfigurations().add(inconf);
+- CoreQueueConfiguration qc = new CoreQueueConfiguration(queue, queue, null, true);
+- configuration.getQueueConfigurations().add(qc);
+-
+- server0 = createServer(false,configuration);
+- server0.start();
+-
+- Set<ConnectorService> conns = server0.getConnectorsService().getConnectors();
+- Assert.assertEquals(1, conns.size());
+- Iterator<ConnectorService> it = conns.iterator();
+- Assert.assertFalse(it.next().isStarted());
+- }
+- finally
+- {
+- try
+- {
+- server0.stop();
+- }
+- catch(Throwable ignored)
+- {
+- }
+- }
+- }
+-
+- protected void internalTestOutgoing(boolean createQueue, boolean restart) throws Exception
+- {
+- HornetQServer server0 = null;
+- ClientSession session = null;
+- String queue = "TwitterTestQueue";
+- Twitter twitter = new TwitterFactory().getInstance(TWITTER_USERNAME,TWITTER_PASSWORD);
+- String testMessage = "TwitterTest/outgoing: " + System.currentTimeMillis();
+- log.debug("test outgoing: " + testMessage);
+-
+- try
+- {
+- Configuration configuration = createDefaultConfig(false);
+- HashMap<String, Object> config = new HashMap<String, Object>();
+- config.put(TwitterConstants.QUEUE_NAME, queue);
+- config.put(TwitterConstants.USER_NAME, TWITTER_USERNAME);
+- config.put(TwitterConstants.PASSWORD, TWITTER_PASSWORD);
+- ConnectorServiceConfiguration outconf =
+- new ConnectorServiceConfiguration(TwitterOutgoingConnectorServiceFactory.class.getName(),
+- config,
+- "test-outgoing-connector");
+- configuration.getConnectorServiceConfigurations().add(outconf);
+- if(createQueue)
+- {
+- CoreQueueConfiguration qc = new CoreQueueConfiguration(queue, queue, null, false);
+- configuration.getQueueConfigurations().add(qc);
+- }
+-
+- server0 = createServer(false,configuration);
+- server0.start();
+-
+- if(restart)
+- {
+- server0.getConnectorsService().stop();
+- server0.getConnectorsService().start();
+- }
+-
+- assertEquals(1, server0.getConnectorsService().getConnectors().size());
+- Iterator<ConnectorService> connectorServiceIterator = server0.getConnectorsService().getConnectors().iterator();
+- if(createQueue)
+- {
+- Assert.assertTrue(connectorServiceIterator.next().isStarted());
+- }
+- else
+- {
+- Assert.assertFalse(connectorServiceIterator.next().isStarted());
+- return;
+- }
+-
+- TransportConfiguration tpconf = new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY);
+- ClientSessionFactory sf = HornetQClient.createClientSessionFactory(tpconf);
+- session = sf.createSession(false, true, true);
+- ClientProducer producer = session.createProducer(queue);
+- ClientMessage msg = session.createMessage(false);
+- msg.getBodyBuffer().writeString(testMessage);
+- session.start();
+- producer.send(msg);
+-
+- Thread.sleep(3000);
+-
+- Paging page = new Paging();
+- page.setCount(1);
+- ResponseList<Status> res = twitter.getHomeTimeline(page);
+-
+- Assert.assertEquals(testMessage, res.get(0).getText());
+- }
+- finally
+- {
+- try
+- {
+- session.close();
+- }
+- catch(Throwable t)
+- {
+- }
+- try
+- {
+- server0.stop();
+- }
+- catch(Throwable ignored)
+- {
+- }
+- }
+- }
+-
+- protected void internalTestOutgoingFailedToInitialize(HashMap<String,String> params) throws Exception
+- {
+- HornetQServer server0 = null;
+- String connectorName = "test-outgoing-connector";
+- String queue = "TwitterTestQueue";
+- String userName = TWITTER_USERNAME;
+- String password = TWITTER_PASSWORD;
+-
+- if(params.containsKey(KEY_CONNECTOR_NAME))
+- {
+- connectorName = params.get(KEY_CONNECTOR_NAME);
+- }
+- if(params.containsKey(KEY_USERNAME))
+- {
+- userName = params.get(KEY_USERNAME);
+- }
+- if(params.containsKey(KEY_PASSWORD))
+- {
+- password = params.get(KEY_PASSWORD);
+- }
+- if(params.containsKey(KEY_QUEUE_NAME))
+- {
+- queue = params.get(KEY_QUEUE_NAME);
+- }
+-
+- try
+- {
+- Configuration configuration = createDefaultConfig(false);
+- HashMap<String, Object> config = new HashMap<String, Object>();
+- config.put(TwitterConstants.QUEUE_NAME, queue);
+- config.put(TwitterConstants.USER_NAME, userName);
+- config.put(TwitterConstants.PASSWORD, password);
+- ConnectorServiceConfiguration outconf =
+- new ConnectorServiceConfiguration(TwitterOutgoingConnectorServiceFactory.class.getName(),
+- config,
+- "test-outgoing-connector");
+- configuration.getConnectorServiceConfigurations().add(outconf);
+- CoreQueueConfiguration qc = new CoreQueueConfiguration(queue, queue, null, false);
+- configuration.getQueueConfigurations().add(qc);
+-
+- server0 = createServer(false,configuration);
+- server0.start();
+-
+- }
+- finally
+- {
+- try
+- {
+- server0.stop();
+- }
+- catch(Throwable ignored)
+- {
+- }
+- }
+- }
+-
+- protected void internalTestOutgoingWithInReplyTo() throws Exception
+- {
+- HornetQServer server0 = null;
+- ClientSession session = null;
+- String queue = "TwitterTestQueue";
+- Twitter twitter = new TwitterFactory().getInstance(TWITTER_USERNAME,TWITTER_PASSWORD);
+- String testMessage = "TwitterTest/outgoing with in_reply_to: " + System.currentTimeMillis();
+- String replyMessage = "@" + TWITTER_USERNAME + " TwitterTest/outgoing reply: " + System.currentTimeMillis();
+- try
+- {
+- Configuration configuration = createDefaultConfig(false);
+- HashMap<String, Object> config = new HashMap<String, Object>();
+- config.put(TwitterConstants.QUEUE_NAME, queue);
+- config.put(TwitterConstants.USER_NAME, TWITTER_USERNAME);
+- config.put(TwitterConstants.PASSWORD, TWITTER_PASSWORD);
+- ConnectorServiceConfiguration outconf =
+- new ConnectorServiceConfiguration(TwitterOutgoingConnectorServiceFactory.class.getName(),
+- config,
+- "test-outgoing-with-in-reply-to");
+- configuration.getConnectorServiceConfigurations().add(outconf);
+- CoreQueueConfiguration qc = new CoreQueueConfiguration(queue, queue, null, false);
+- configuration.getQueueConfigurations().add(qc);
+-
+- Status s = twitter.updateStatus(testMessage);
+-
+- server0 = createServer(false,configuration);
+- server0.start();
+-
+- TransportConfiguration tpconf = new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY);
+- ClientSessionFactory sf = HornetQClient.createClientSessionFactory(tpconf);
+- session = sf.createSession(false, true, true);
+- ClientProducer producer = session.createProducer(queue);
+- ClientMessage msg = session.createMessage(false);
+- msg.getBodyBuffer().writeString(replyMessage);
+- msg.putLongProperty(TwitterConstants.KEY_IN_REPLY_TO_STATUS_ID, s.getId());
+- session.start();
+- producer.send(msg);
+-
+- Thread.sleep(3000);
+-
+- Paging page = new Paging();
+- page.setCount(2);
+- ResponseList<Status> res = twitter.getHomeTimeline(page);
+-
+- Assert.assertEquals(testMessage, res.get(1).getText());
+- Assert.assertEquals(-1, res.get(1).getInReplyToStatusId());
+- Assert.assertEquals(replyMessage, res.get(0).getText());
+- Assert.assertEquals(s.getId(), res.get(0).getInReplyToStatusId());
+- }
+- finally
+- {
+- try
+- {
+- session.close();
+- }
+- catch(Throwable t)
+- {
+- }
+- try
+- {
+- server0.stop();
+- }
+- catch(Throwable ignored)
+- {
+- }
+- }
+- }
+-}
+Index: build-hornetq.xml
+===================================================================
+--- build-hornetq.xml (revision 9563)
++++ build-hornetq.xml (working copy)
+@@ -65,8 +65,8 @@
+ <property name="jnp.client.jar.name" value="jnp-client.jar"/>
+ <property name="jboss.integration.jar.name" value="hornetq-jboss-as-integration.jar"/>
+ <property name="jboss.integration.sources.jar.name" value="hornetq-jboss-as-integration-sources.jar"/>
+- <property name="twitter.integration.jar.name" value="hornetq-twitter-integration.jar"/>
+- <property name="twitter.integration.sources.jar.name" value="hornetq-twitter-integration-sources.jar"/>
++ <!-- <property name="twitter.integration.jar.name" value="hornetq-twitter-integration.jar"/>
++ <property name="twitter.integration.sources.jar.name" value="hornetq-twitter-integration-sources.jar"/> -->
+ <property name="bootstrap.jar.name" value="hornetq-bootstrap.jar"/>
+ <property name="bootstrap.sources.jar.name" value="hornetq-bootstrap-sources.jar"/>
+ <property name="logging.jar.name" value="hornetq-logging.jar"/>
+@@ -84,7 +84,7 @@
+ <property name="service.sources.sar.name" value="hornetq-service-sources.sar"/>
+ <property name="resources.jar.name" value="hornetq-resources.jar"/>
+ <property name="resources.sources.jar.name" value="hornetq-resources-sources.jar"/>
+- <property name="twitter4j.jar.name" value="twitter4j-core.jar"/>
++ <!-- <property name="twitter4j.jar.name" value="twitter4j-core.jar"/> -->
+
+ <!--source and build dirs-->
+ <property name="build.dir" value="build"/>
+@@ -94,7 +94,7 @@
+ <property name="build.jms.classes.dir" value="${build.dir}/classes/jms"/>
+ <property name="build.jms.java5.classes.dir" value="${build.dir}/classes/jms-java5"/>
+ <property name="build.jboss.integration.classes.dir" value="${build.dir}/classes/jboss-integration"/>
+- <property name="build.twitter.integration.classes.dir" value="${build.dir}/classes/twitter-integration"/>
++<!-- <property name="build.twitter.integration.classes.dir" value="${build.dir}/classes/twitter-integration"/> -->
+ <property name="build.service.classes.dir" value="${build.dir}/classes/service"/>
+ <property name="build.bootstrap.classes.dir" value="${build.dir}/classes/bootstrap"/>
+ <property name="build.logging.classes.dir" value="${build.dir}/classes/logging"/>
+@@ -213,10 +213,10 @@
+ <path refid="org.jboss.javaee.classpath"/>
+ </path>
+
+- <path id="twitter.integration.compilation.classpath">
++ <!-- <path id="twitter.integration.compilation.classpath">
+ <path location="${build.core.classes.dir}"/>
+ <path refid="org.twitter4j.classpath"/>
+- </path>
++ </path> -->
+
+ <path id="jboss.service.compilation.classpath">
+ <path refid="org.jboss.javaee.classpath"/>
+@@ -246,13 +246,13 @@
+ <path refid="jboss.integration.compilation.classpath"/>
+ <path refid="bootstrap.compilation.classpath"/>
+ <path refid="junit.junit.classpath"/>
+- <path refid="org.twitter4j.classpath"/>
++ <!-- <path refid="org.twitter4j.classpath"/> -->
+ <path location="${build.jars.dir}/${ra.jar.name}"/>
+ <path location="${build.jars.dir}/${jms.jar.name}"/>
+ <path location="${build.jars.dir}/${jboss.integration.jar.name}"/>
+ <path location="${build.jars.dir}/${bootstrap.jar.name}"/>
+ <path location="${build.jars.dir}/${logging.jar.name}"/>
+- <path location="${build.jars.dir}/${twitter.integration.jar.name}"/>
++ <!-- <path location="${build.jars.dir}/${twitter.integration.jar.name}"/> -->
+ </path>
+
+ <path id="jms.test.compilation.classpath">
+@@ -305,7 +305,7 @@
+ <!-- we must include Apache commons logging -->
+ <!-- as a transitive dependency from JBoss TM -->
+ <path refid="apache.logging.classpath"/>
+- <path refid="org.twitter4j.classpath"/>
++ <!-- <path refid="org.twitter4j.classpath"/> -->
+ </path>
+
+ <path id="emma.unit.test.execution.classpath">
+@@ -382,7 +382,7 @@
+ <mkdir dir="${build.jms.classes.dir}"/>
+ <mkdir dir="${build.jms.java5.classes.dir}"/>
+ <mkdir dir="${build.jboss.integration.classes.dir}"/>
+- <mkdir dir="${build.twitter.integration.classes.dir}"/>
++ <!-- <mkdir dir="${build.twitter.integration.classes.dir}"/> -->
+ <mkdir dir="${build.service.classes.dir}"/>
+ <mkdir dir="${build.bootstrap.classes.dir}"/>
+ <mkdir dir="${build.logging.classes.dir}"/>
+@@ -560,7 +560,7 @@
+ </javac>
+ </target>
+
+- <target name="compile-twitter-integration" depends="compile-core">
++ <!-- <target name="compile-twitter-integration" depends="compile-core">
+ <javac destdir="${build.twitter.integration.classes.dir}"
+ target="${javac.target}"
+ source="${javac.source}"
+@@ -578,7 +578,7 @@
+ <include name="org/hornetq/integration/twitter/**/*.java"/>
+ <classpath refid="twitter.integration.compilation.classpath"/>
+ </javac>
+- </target>
++ </target> -->
+
+ <!-- author: Lucas Amador -->
+ <target name="compile-jboss-service" depends="compile-core">
+@@ -719,12 +719,19 @@
+ <!-- Jar Targets -->
+ <!-- ======================================================================================== -->
+
++ <!-- <target name="sources-jar" description="create jar files containing source code"
++ depends="jar-core-sources, jar-core-client-sources, jar-core-client-java5-sources, jar-jms-sources, jar-jms-client-sources, jar-jms-client-java5-sources, jar-jboss-integration-sources, jar-jboss-service-sources, jar-bootstrap-sources, jar-logging-sources, jar-ra-sources, jar-resources-sources, jar-twitter-integration-sources">
++ </target> -->
+ <target name="sources-jar" description="create jar files containing source code"
+- depends="jar-core-sources, jar-core-client-sources, jar-core-client-java5-sources, jar-jms-sources, jar-jms-client-sources, jar-jms-client-java5-sources, jar-jboss-integration-sources, jar-jboss-service-sources, jar-bootstrap-sources, jar-logging-sources, jar-ra-sources, jar-resources-sources, jar-twitter-integration-sources">
++ depends="jar-core-sources, jar-core-client-sources, jar-core-client-java5-sources, jar-jms-sources, jar-jms-client-sources, jar-jms-client-java5-sources, jar-jboss-integration-sources, jar-jboss-service-sources, jar-bootstrap-sources, jar-logging-sources, jar-ra-sources, jar-resources-sources">
+ </target>
+
++ <!-- <target name="jar"
++ depends="jar-core, jar-core-client, jar-core-client-java5, jar-jms, jar-jms-client, jar-jms-client-java5, jar-jboss-integration, jar-jboss-service, jar-bootstrap, jar-logging, jar-ra, jar-mc, jar-jnp-client, jar-resources, sources-jar, jar-twitter-integration">
++ </target> -->
++
+ <target name="jar"
+- depends="jar-core, jar-core-client, jar-core-client-java5, jar-jms, jar-jms-client, jar-jms-client-java5, jar-jboss-integration, jar-jboss-service, jar-bootstrap, jar-logging, jar-ra, jar-mc, jar-jnp-client, jar-resources, sources-jar, jar-twitter-integration">
++ depends="jar-core, jar-core-client, jar-core-client-java5, jar-jms, jar-jms-client, jar-jms-client-java5, jar-jboss-integration, jar-jboss-service, jar-bootstrap, jar-logging, jar-ra, jar-mc, jar-jnp-client, jar-resources, sources-jar">
+ </target>
+
+ <target name="jar-jnp-client" depends="init">
+@@ -868,7 +875,7 @@
+ </jar>
+ </target>
+
+- <target name="jar-twitter-integration" depends="compile-twitter-integration">
++<!-- <target name="jar-twitter-integration" depends="compile-twitter-integration">
+
+ <jar jarfile="${build.jars.dir}/${twitter.integration.jar.name}">
+ <fileset dir="${build.twitter.integration.classes.dir}" includes="**"/>
+@@ -882,7 +889,7 @@
+ <include name="org/hornetq/integration/twitter/**/*.java"/>
+ </fileset>
+ </jar>
+- </target>
++ </target> -->
+
+ <!-- author: Lucas Amador -->
+ <target name="jar-jboss-service" depends="compile-jboss-service">
+@@ -1148,7 +1155,7 @@
+ <include name="${jms.client.jar.name}"/>
+ <include name="${jms.client.java5.jar.name}"/>
+ <include name="${jnp.client.jar.name}"/>
+- <include name="${twitter.integration.jar.name}"/>
++ <!-- <include name="${twitter.integration.jar.name}"/> -->
+ </fileset>
+ <fileset dir="${org.jboss.naming.lib}">
+ <include name="jnpserver.jar"/>
+@@ -1158,7 +1165,7 @@
+ </fileset>
+ </copy>
+ <copy file="${org.jboss.netty.lib}/${netty.jar.name}" tofile="${build.distro.lib.dir}/netty.jar"/>
+- <copy file="${org.twitter4j.lib}/${twitter4j.jar.name}" tofile="${build.distro.lib.dir}/${twitter4j.jar.name}"/>
++ <!-- <copy file="${org.twitter4j.lib}/${twitter4j.jar.name}" tofile="${build.distro.lib.dir}/${twitter4j.jar.name}"/> -->
+ <copy todir="${build.distro.config.dir}">
+ <fileset dir="${src.config.dir}">
+ <include name="*.xml"/>
15 years, 9 months
JBoss hornetq SVN: r9563 - in trunk: src/main/org/hornetq/core/journal/impl and 20 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-08-18 13:03:33 -0400 (Wed, 18 Aug 2010)
New Revision: 9563
Added:
trunk/src/main/org/hornetq/utils/LinkedList.java
trunk/src/main/org/hornetq/utils/LinkedListImpl.java
trunk/src/main/org/hornetq/utils/LinkedListIterator.java
trunk/tests/src/org/hornetq/tests/unit/core/list/impl/PriorityLinkedListTest.java
trunk/tests/src/org/hornetq/tests/unit/util/LinkedListTest.java
Removed:
trunk/src/main/org/hornetq/utils/NonConcurrentHQDeque.java
trunk/src/main/org/hornetq/utils/concurrent/
trunk/tests/src/org/hornetq/tests/unit/core/list/impl/ConcurrentPriorityLinkedListTest.java
trunk/tests/src/org/hornetq/tests/unit/core/list/impl/NonConcurrentPriorityLinkedListTest.java
trunk/tests/src/org/hornetq/tests/unit/util/concurrent/
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/postoffice/Binding.java
trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java
trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
trunk/src/main/org/hornetq/core/server/Queue.java
trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/utils/HQIterator.java
trunk/src/main/org/hornetq/utils/PriorityLinkedList.java
trunk/src/main/org/hornetq/utils/PriorityLinkedListImpl.java
trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
trunk/tests/src/org/hornetq/tests/integration/client/AcknowledgeTest.java
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerFilterTest.java
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerRoundRobinTest.java
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java
trunk/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/OrderReattachTest.java
trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
trunk/tests/src/org/hornetq/tests/integration/scheduling/DelayedMessageTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/list/impl/PriorityLinkedListTestBase.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-469
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -14,6 +14,7 @@
package org.hornetq.core.client.impl;
import java.io.File;
+import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
@@ -30,7 +31,6 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
import org.hornetq.utils.Future;
-import org.hornetq.utils.HQIterator;
import org.hornetq.utils.PriorityLinkedList;
import org.hornetq.utils.PriorityLinkedListImpl;
import org.hornetq.utils.TokenBucketLimiter;
@@ -209,7 +209,7 @@
synchronized (this)
{
- while ((stopped || (m = buffer.removeFirst()) == null) && !closed && toWait > 0)
+ while ((stopped || (m = buffer.poll()) == null) && !closed && toWait > 0)
{
if (start == -1)
{
@@ -367,7 +367,6 @@
// if no previous handler existed queue up messages for delivery
if (handler != null && noPreviousHandler)
{
-
requeueExecutors();
}
// if unsetting a previous handler may be in onMessage so wait for completion
@@ -496,7 +495,7 @@
}
// Add it to the buffer
- buffer.addLast(messageToHandle, messageToHandle.getPriority());
+ buffer.addTail(messageToHandle, messageToHandle.getPriority());
if (handler != null)
{
@@ -567,12 +566,12 @@
{
// Need to send credits for the messages in the buffer
- HQIterator<ClientMessageInternal> iter = buffer.iterator();
+ Iterator<ClientMessageInternal> iter = buffer.iterator();
- ClientMessageInternal message;
-
- while ((message = iter.next()) != null)
+ while (iter.hasNext())
{
+ ClientMessageInternal message = iter.next();
+
flowControlBeforeConsumption(message);
}
@@ -802,7 +801,7 @@
synchronized (this)
{
- message = buffer.removeFirst();
+ message = buffer.poll();
}
if (message != null)
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -67,6 +68,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.DataConstants;
+
/**
*
* <p>A circular log implementation.</p
@@ -183,9 +185,9 @@
public final String fileExtension;
- private final LinkedBlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
+ private final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
- private final LinkedBlockingDeque<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
+ private final BlockingQueue<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
private final ConcurrentLinkedQueue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -396,6 +396,7 @@
{
Filter filter = FilterImpl.createFilter(filterStr);
List<Map<String, Object>> messages = new ArrayList<Map<String,Object>>();
+ queue.blockOnExecutorFuture();
Iterator<MessageReference> iterator = queue.iterator();
while (iterator.hasNext())
{
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -1397,7 +1397,6 @@
}
case ADD_REF:
{
-
long messageID = record.id;
RefEncoding encoding = new RefEncoding();
@@ -1436,8 +1435,8 @@
{
throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
}
-
- // TODO - this involves a scan - we should find a quicker qay of doing it
+
+ // TODO - this involves a scan - we should find a quicker way of doing it
MessageReference removed = queue.removeReferenceWithID(messageID);
referencesToAck.add(removed);
Modified: trunk/src/main/org/hornetq/core/postoffice/Binding.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/Binding.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/src/main/org/hornetq/core/postoffice/Binding.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -51,4 +51,6 @@
int getDistance();
void route(ServerMessage message, RoutingContext context) throws Exception;
+
+ void close() throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -132,5 +132,9 @@
{
return "DivertBinding [divert=" + divert + "]";
}
+
+ public void close() throws Exception
+ {
+ }
}
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -144,5 +144,10 @@
{
return "LocalQueueBinding [address=" + address + ", name=" + name + ", filter=" + filter + "]";
}
+
+ public void close() throws Exception
+ {
+ queue.close();
+ }
}
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -501,6 +501,8 @@
managementService.sendNotification(new Notification(null, NotificationType.BINDING_REMOVED, props));
+ binding.close();
+
return binding;
}
@@ -692,7 +694,7 @@
if (tx == null)
{
- queue.addLast(reference, false);
+ queue.reload(reference);
}
else
{
@@ -968,7 +970,7 @@
{
for (MessageReference ref : refs)
{
- ref.getQueue().addLast(ref, direct);
+ ref.getQueue().addTail(ref, direct);
}
}
@@ -1262,7 +1264,7 @@
{
for (MessageReference ref : refs)
{
- ref.getQueue().addLast(ref, false);
+ ref.getQueue().addTail(ref, false);
}
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -15,7 +15,6 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/src/main/org/hornetq/core/server/Queue.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -49,11 +49,13 @@
int getConsumerCount();
- void addLast(MessageReference ref);
+ void reload(MessageReference ref);
+
+ void addTail(MessageReference ref);
- void addLast(MessageReference ref, boolean direct);
+ void addTail(MessageReference ref, boolean direct);
- void addFirst(MessageReference ref);
+ void addHead(MessageReference ref);
void acknowledge(MessageReference ref) throws Exception;
@@ -81,8 +83,6 @@
MessageReference removeReferenceWithID(long id) throws Exception;
- MessageReference removeFirstReference(long id) throws Exception;
-
MessageReference getReference(long id);
int deleteAllReferences() throws Exception;
@@ -124,9 +124,6 @@
boolean checkDLQ(MessageReference ref) throws Exception;
- /**
- * @return an immutable iterator which does not allow to remove references
- */
Iterator<MessageReference> iterator();
void setExpiryAddress(SimpleString expiryAddress);
@@ -152,6 +149,10 @@
boolean isPaused();
Executor getExecutor();
+
+ void resetAllIterators();
-
+ void blockOnExecutorFuture();
+
+ void close() throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -289,5 +289,10 @@
uniqueName +
"]";
}
+
+ public void close() throws Exception
+ {
+ storeAndForwardQueue.close();
+ }
}
Modified: trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -71,7 +71,7 @@
}
@Override
- public synchronized void add(final MessageReference ref, final boolean first, final boolean direct)
+ public synchronized void addTail(final MessageReference ref, final boolean direct)
{
SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
@@ -79,67 +79,75 @@
{
HolderReference hr = map.get(prop);
- if (!first)
+ if (hr != null)
{
- if (hr != null)
- {
- // We need to overwrite the old ref with the new one and ack the old one
+ // We need to overwrite the old ref with the new one and ack the old one
- MessageReference oldRef = hr.getReference();
+ MessageReference oldRef = hr.getReference();
- super.referenceHandled();
+ super.referenceHandled();
- try
- {
- super.acknowledge(oldRef);
- }
- catch (Exception e)
- {
- LastValueQueue.log.error("Failed to ack old reference", e);
- }
-
- hr.setReference(ref);
-
+ try
+ {
+ super.acknowledge(oldRef);
}
- else
+ catch (Exception e)
{
- hr = new HolderReference(prop, ref);
+ LastValueQueue.log.error("Failed to ack old reference", e);
+ }
- map.put(prop, hr);
+ hr.setReference(ref);
- super.add(hr, first, direct);
- }
}
else
{
- // Add to front
+ hr = new HolderReference(prop, ref);
- if (hr != null)
- {
- // We keep the current ref and ack the one we are returning
+ map.put(prop, hr);
- super.referenceHandled();
+ super.addTail(hr, direct);
+ }
+ }
+ else
+ {
+ super.addTail(ref, direct);
+ }
+ }
- try
- {
- super.acknowledge(ref);
- }
- catch (Exception e)
- {
- LastValueQueue.log.error("Failed to ack old reference", e);
- }
+ @Override
+ public synchronized void addHead(final MessageReference ref)
+ {
+ SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+
+ if (prop != null)
+ {
+ HolderReference hr = map.get(prop);
+
+ if (hr != null)
+ {
+ // We keep the current ref and ack the one we are returning
+
+ super.referenceHandled();
+
+ try
+ {
+ super.acknowledge(ref);
}
- else
+ catch (Exception e)
{
- map.put(prop, (HolderReference)ref);
-
- super.add(ref, first, direct);
+ LastValueQueue.log.error("Failed to ack old reference", e);
}
}
+ else
+ {
+ map.put(prop, (HolderReference)ref);
+
+ super.addHead(ref);
+ }
}
else
{
- super.add(ref, first, direct);
+ super.addHead(ref);
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -21,12 +21,12 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
@@ -50,14 +50,16 @@
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.ConcurrentHashSet;
-import org.hornetq.utils.HQIterator;
+import org.hornetq.utils.Future;
+import org.hornetq.utils.LinkedListIterator;
import org.hornetq.utils.PriorityLinkedList;
-import org.hornetq.utils.concurrent.ConcurrentPriorityLinkedListImpl;
+import org.hornetq.utils.PriorityLinkedListImpl;
/**
* Implementation of a Queue
*
* Completely non blocking between adding to queue and delivering to consumers.
+ *
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="ataylor(a)redhat.com">Andy Taylor</a>
@@ -71,8 +73,10 @@
public static final int REDISTRIBUTOR_BATCH_SIZE = 100;
public static final int NUM_PRIORITIES = 10;
+
+ public static final int MAX_DELIVERIES_IN_LOOP = 1000;
- public static final int MAX_DELIVERIES_IN_LOOP = 1000;
+ private static final int CHECK_QUEUE_SIZE_PERIOD = 2000;
private final long id;
@@ -86,13 +90,15 @@
private final PostOffice postOffice;
- private final PriorityLinkedList<MessageReference> messageReferences = new ConcurrentPriorityLinkedListImpl<MessageReference>(QueueImpl.NUM_PRIORITIES);
+ private final ConcurrentLinkedQueue<MessageReference> concurrentQueue = new ConcurrentLinkedQueue<MessageReference>();
+ private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(QueueImpl.NUM_PRIORITIES);
+
private final List<ConsumerHolder> consumerList = new ArrayList<ConsumerHolder>();
private final ScheduledDeliveryHandler scheduledDeliveryHandler;
- private final AtomicLong messagesAdded = new AtomicLong(0);
+ private long messagesAdded;
protected final AtomicInteger deliveringCount = new AtomicInteger(0);
@@ -112,8 +118,10 @@
private final Set<ScheduledFuture<?>> futures = new ConcurrentHashSet<ScheduledFuture<?>>();
- private ScheduledFuture<?> future;
+ private ScheduledFuture<?> redistributorFuture;
+ private ScheduledFuture<?> checkQueueSizeFuture;
+
// We cache the consumers here since we don't want to include the redistributor
private final Set<Consumer> consumerSet = new HashSet<Consumer>();
@@ -125,21 +133,15 @@
private int pos;
private final Executor executor;
-
+
private volatile int consumerWithFilterCount;
- private static class ConsumerHolder
- {
- ConsumerHolder(final Consumer consumer)
- {
- this.consumer = consumer;
- }
+ private final Runnable concurrentPoller = new ConcurrentPoller();
- final Consumer consumer;
+ private volatile boolean queued;
+
+ private volatile boolean checkQueueSize = true;
- volatile HQIterator<MessageReference> iter;
- }
-
public QueueImpl(final long id,
final SimpleString address,
final SimpleString name,
@@ -184,8 +186,16 @@
}
this.executor = executor;
+
+ checkQueueSizeFuture = scheduledExecutor.scheduleWithFixedDelay(new Runnable()
+ {
+ public void run()
+ {
+ checkQueueSize = true;
+ }
+ }, CHECK_QUEUE_SIZE_PERIOD, CHECK_QUEUE_SIZE_PERIOD, TimeUnit.MILLISECONDS);
}
-
+
// Bindable implementation -------------------------------------------------------------------------------------
public SimpleString getRoutingName()
@@ -235,38 +245,117 @@
return filter;
}
- public void addLast(final MessageReference ref)
+ /* Called when a message is cancelled back into the queue */
+ public synchronized void addHead(final MessageReference ref)
{
- addLast(ref, false);
+ if (scheduledDeliveryHandler.checkAndSchedule(ref))
+ {
+ return;
+ }
+
+ messageReferences.addHead(ref, ref.getMessage().getPriority());
+
+ queued = true;
+
+ checkQueueSize = false;
}
- public void addLast(final MessageReference ref, final boolean direct)
+ public synchronized void reload(final MessageReference ref)
{
- messagesAdded.incrementAndGet();
+ if (!scheduledDeliveryHandler.checkAndSchedule(ref))
+ {
+ messageReferences.addTail(ref, ref.getMessage().getPriority());
+ }
- add(ref, false, direct);
+ queued = true;
+
+ checkQueueSize = false;
+
+ messagesAdded++;
}
- public void addFirst(final MessageReference ref)
+ public void addTail(final MessageReference ref)
{
- add(ref, true, false);
+ addTail(ref, false);
}
+ public void addTail(final MessageReference ref, final boolean direct)
+ {
+ if (scheduledDeliveryHandler.checkAndSchedule(ref))
+ {
+ synchronized (this)
+ {
+ messagesAdded++;
+ }
+
+ return;
+ }
+
+ if (checkQueueSize)
+ {
+ // This is an expensive operation so we don't want to do it every time we add a message, that's why we use the checkQueueSize flag
+ // which is set to true periodically using a scheduled executor
+
+ queued = !messageReferences.isEmpty() || !concurrentQueue.isEmpty();
+
+ checkQueueSize = false;
+ }
+
+ if (direct & !queued)
+ {
+ if (deliverDirect(ref))
+ {
+ return;
+ }
+ }
+
+ concurrentQueue.add(ref);
+
+ executor.execute(concurrentPoller);
+ }
+
public void deliverAsync()
{
executor.execute(deliverRunner);
}
+
+ public void close() throws Exception
+ {
+ if (checkQueueSizeFuture != null)
+ {
+ checkQueueSizeFuture.cancel(false);
+ }
+
+ cancelRedistributor();
+ }
public Executor getExecutor()
{
return executor;
}
- public synchronized void deliverNow()
+ /* Only used on tests */
+ public void deliverNow()
{
- deliver();
+ deliverAsync();
+
+ blockOnExecutorFuture();
}
+ public void blockOnExecutorFuture()
+ {
+ Future future = new Future();
+
+ executor.execute(future);
+
+ boolean ok = future.await(10000);
+
+ if (!ok)
+ {
+ throw new IllegalStateException("Timed out waiting for future to complete");
+ }
+ }
+
public synchronized void addConsumer(final Consumer consumer) throws Exception
{
cancelRedistributor();
@@ -291,6 +380,11 @@
if (holder.consumer == consumer)
{
+ if (holder.iter != null)
+ {
+ holder.iter.close();
+ }
+
iter.remove();
break;
@@ -327,11 +421,11 @@
public synchronized void addRedistributor(final long delay)
{
- if (future != null)
+ if (redistributorFuture != null)
{
- future.cancel(false);
+ redistributorFuture.cancel(false);
- futures.remove(future);
+ futures.remove(redistributorFuture);
}
if (redistributor != null)
@@ -346,9 +440,9 @@
{
DelayedAddRedistributor dar = new DelayedAddRedistributor(executor);
- future = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS);
+ redistributorFuture = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS);
- futures.add(future);
+ futures.add(redistributorFuture);
}
}
else
@@ -385,13 +479,26 @@
}
}
- if (future != null)
+ if (redistributorFuture != null)
{
- future.cancel(false);
+ redistributorFuture.cancel(false);
- future = null;
+ redistributorFuture = null;
}
}
+
+ @Override
+ protected void finalize() throws Throwable
+ {
+ if (checkQueueSizeFuture != null)
+ {
+ checkQueueSizeFuture.cancel(false);
+ }
+
+ cancelRedistributor();
+
+ super.finalize();
+ }
public synchronized int getConsumerCount()
{
@@ -433,39 +540,10 @@
public Iterator<MessageReference> iterator()
{
- return new Iterator<MessageReference>()
- {
- private final HQIterator<MessageReference> iterator = messageReferences.iterator();
-
- private MessageReference next;
-
- public boolean hasNext()
- {
- if (next == null)
- {
- next = iterator.next();
- }
-
- return next != null;
- }
-
- public MessageReference next()
- {
- MessageReference n = next;
-
- next = null;
-
- return n;
- }
-
- public void remove()
- {
- iterator.remove();
- }
- };
+ return new SynchronizedIterator(messageReferences.iterator());
}
- public MessageReference removeReferenceWithID(final long id) throws Exception
+ public synchronized MessageReference removeReferenceWithID(final long id) throws Exception
{
Iterator<MessageReference> iterator = iterator();
@@ -494,26 +572,8 @@
return removed;
}
- public synchronized MessageReference removeFirstReference(final long id) throws Exception
+ public synchronized MessageReference getReference(final long id)
{
- MessageReference ref = messageReferences.peekFirst();
-
- if (ref != null && ref.getMessage().getMessageID() == id)
- {
- messageReferences.removeFirst();
-
- return ref;
- }
- else
- {
- ref = scheduledDeliveryHandler.removeReferenceWithID(id);
- }
-
- return ref;
- }
-
- public MessageReference getReference(final long id)
- {
Iterator<MessageReference> iterator = iterator();
while (iterator.hasNext())
@@ -529,9 +589,14 @@
return null;
}
- public synchronized int getMessageCount()
+ public int getMessageCount()
{
- return messageReferences.size() + getScheduledCount() + getDeliveringCount();
+ blockOnExecutorFuture();
+
+ synchronized (this)
+ {
+ return messageReferences.size() + getScheduledCount() + getDeliveringCount();
+ }
}
public synchronized int getScheduledCount()
@@ -621,8 +686,10 @@
{
if (!scheduledDeliveryHandler.checkAndSchedule(reference))
{
- messageReferences.addFirst(reference, reference.getMessage().getPriority());
+ messageReferences.addHead(reference, reference.getMessage().getPriority());
}
+
+ resetAllIterators();
}
}
@@ -650,7 +717,12 @@
public long getMessagesAdded()
{
- return messagesAdded.get();
+ blockOnExecutorFuture();
+
+ synchronized (this)
+ {
+ return messagesAdded;
+ }
}
public int deleteAllReferences() throws Exception
@@ -658,7 +730,7 @@
return deleteMatchingReferences(null);
}
- public int deleteMatchingReferences(final Filter filter) throws Exception
+ public synchronized int deleteMatchingReferences(final Filter filter) throws Exception
{
int count = 0;
@@ -692,7 +764,7 @@
return count;
}
- public boolean deleteReference(final long messageID) throws Exception
+ public synchronized boolean deleteReference(final long messageID) throws Exception
{
boolean deleted = false;
@@ -718,7 +790,7 @@
return deleted;
}
- public boolean expireReference(final long messageID) throws Exception
+ public synchronized boolean expireReference(final long messageID) throws Exception
{
Iterator<MessageReference> iter = iterator();
@@ -736,7 +808,7 @@
return false;
}
- public int expireReferences(final Filter filter) throws Exception
+ public synchronized int expireReferences(final Filter filter) throws Exception
{
Transaction tx = new TransactionImpl(storageManager);
@@ -760,7 +832,7 @@
return count;
}
- public void expireReferences() throws Exception
+ public synchronized void expireReferences() throws Exception
{
Iterator<MessageReference> iter = iterator();
@@ -776,7 +848,7 @@
}
}
- public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception
+ public synchronized boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception
{
Iterator<MessageReference> iter = iterator();
@@ -794,7 +866,7 @@
return false;
}
- public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception
+ public synchronized int sendMessagesToDeadLetterAddress(Filter filter) throws Exception
{
int count = 0;
Iterator<MessageReference> iter = iterator();
@@ -813,7 +885,7 @@
return count;
}
- public boolean moveReference(final long messageID, final SimpleString toAddress) throws Exception
+ public synchronized boolean moveReference(final long messageID, final SimpleString toAddress) throws Exception
{
Iterator<MessageReference> iter = iterator();
@@ -831,7 +903,7 @@
return false;
}
- public int moveReferences(final Filter filter, final SimpleString toAddress) throws Exception
+ public synchronized int moveReferences(final Filter filter, final SimpleString toAddress) throws Exception
{
Transaction tx = new TransactionImpl(storageManager);
@@ -864,7 +936,7 @@
return count;
}
- public boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception
+ public synchronized boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception
{
Iterator<MessageReference> iter = iterator();
@@ -875,7 +947,7 @@
{
iter.remove();
ref.getMessage().setPriority(newPriority);
- addLast(ref, false);
+ addTail(ref, false);
return true;
}
}
@@ -883,7 +955,7 @@
return false;
}
- public int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception
+ public synchronized int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception
{
Iterator<MessageReference> iter = iterator();
@@ -896,12 +968,37 @@
count++;
iter.remove();
ref.getMessage().setPriority(newPriority);
- addLast(ref, false);
+ addTail(ref, false);
}
}
return count;
}
+ public synchronized void resetAllIterators()
+ {
+ for (ConsumerHolder holder : this.consumerList)
+ {
+ holder.iter = null;
+ }
+ }
+
+ public synchronized void pause()
+ {
+ paused = true;
+ }
+
+ public synchronized void resume()
+ {
+ paused = false;
+
+ deliverAsync();
+ }
+
+ public synchronized boolean isPaused()
+ {
+ return paused;
+ }
+
// Public
// -----------------------------------------------------------------------------
@@ -933,6 +1030,147 @@
// Private
// ------------------------------------------------------------------------------
+ private synchronized void doPoll()
+ {
+ MessageReference ref = concurrentQueue.poll();
+
+ if (ref != null)
+ {
+ messageReferences.addTail(ref, ref.getMessage().getPriority());
+
+ messagesAdded++;
+
+ if (consumerWithFilterCount > 0 || messageReferences.size() == 1)
+ {
+ deliver();
+ }
+ }
+ }
+
+ // This method will deliver as many messages as possible until all consumers are busy or there are no more matching
+ // or available messages
+ private synchronized void deliver()
+ {
+ if (paused || consumerList.isEmpty())
+ {
+ return;
+ }
+
+ int busyCount = 0;
+
+ int nullRefCount = 0;
+
+ int size = consumerList.size();
+
+ int endPos = pos == size - 1 ? 0 : size - 1;
+
+ int numRefs = messageReferences.size();
+
+ int handled = 0;
+
+ while (handled < numRefs)
+ {
+ ConsumerHolder holder = consumerList.get(pos);
+
+ Consumer consumer = holder.consumer;
+
+ if (holder.iter == null)
+ {
+ holder.iter = messageReferences.iterator();
+ }
+
+ MessageReference ref;
+
+ if (holder.iter.hasNext())
+ {
+ ref = holder.iter.next();
+ }
+ else
+ {
+ ref = null;
+ }
+
+ if (ref == null)
+ {
+ nullRefCount++;
+ }
+ else
+ {
+ if (checkExpired(ref))
+ {
+ holder.iter.remove();
+
+ continue;
+ }
+
+ Consumer groupConsumer = null;
+
+ // If a group id is set, then this overrides the consumer chosen round-robin
+
+ SimpleString groupID = ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
+
+ if (groupID != null)
+ {
+ groupConsumer = groups.get(groupID);
+
+ if (groupConsumer != null)
+ {
+ consumer = groupConsumer;
+ }
+ }
+
+ HandleStatus status = handle(ref, consumer);
+
+ if (status == HandleStatus.HANDLED)
+ {
+ holder.iter.remove();
+
+ if (groupID != null && groupConsumer == null)
+ {
+ groups.put(groupID, consumer);
+ }
+
+ handled++;
+ }
+ else if (status == HandleStatus.BUSY)
+ {
+ holder.iter.repeat();
+
+ busyCount++;
+ }
+ else if (status == HandleStatus.NO_MATCH)
+ {
+ }
+ }
+
+ if (pos == endPos)
+ {
+ // Round robin'd all
+
+ if (nullRefCount + busyCount == size)
+ {
+ break;
+ }
+
+ nullRefCount = busyCount = 0;
+ }
+
+ pos++;
+
+ if (pos == size)
+ {
+ pos = 0;
+ }
+
+ if (handled == MAX_DELIVERIES_IN_LOOP)
+ {
+ // Schedule another one - we do this to prevent a single thread getting caught up in this loop for too long
+
+ deliverAsync();
+ }
+ }
+ }
+
private void internalAddRedistributor(final Executor executor)
{
// create the redistributor only once if there are no local consumers
@@ -1098,151 +1336,6 @@
tx.commit();
}
- private synchronized void deliver()
- {
- if (paused || consumerList.isEmpty())
- {
- return;
- }
-
- int busyCount = 0;
-
- int nullRefCount = 0;
-
- int noMatchCount = 0;
-
- int size = consumerList.size();
-
- int startPos = pos;
-
- // Deliver at most 1000 messages in one go, to prevent tying this thread up for too long
- int loop = Math.min(messageReferences.size(), MAX_DELIVERIES_IN_LOOP);
-
- for (int i = 0; i < loop; i++)
- {
- ConsumerHolder holder = consumerList.get(pos);
-
- Consumer consumer = holder.consumer;
-
- MessageReference ref;
-
- if (holder.iter == null)
- {
- ref = messageReferences.removeFirst();
- }
- else
- {
- ref = holder.iter.next();
- }
-
- if (ref == null)
- {
- nullRefCount++;
-
- if (holder.iter != null)
- {
- noMatchCount++;
- }
- }
- else
- {
- if (checkExpired(ref))
- {
- if (holder.iter != null)
- {
- holder.iter.remove();
- }
-
- continue;
- }
-
- Consumer groupConsumer = null;
-
- // If a group id is set, then this overrides the consumer chosen round-robin
-
- SimpleString groupID = ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
-
- if (groupID != null)
- {
- groupConsumer = groups.get(groupID);
-
- if (groupConsumer != null)
- {
- consumer = groupConsumer;
- }
- }
-
- HandleStatus status = handle(ref, consumer);
-
- if (status == HandleStatus.HANDLED)
- {
- if (holder.iter != null)
- {
- holder.iter.remove();
- }
-
- if (groupID != null && groupConsumer == null)
- {
- groups.put(groupID, consumer);
- }
- }
- else if (status == HandleStatus.BUSY)
- {
- if (holder.iter == null)
- {
- // Put the ref back
-
- messageReferences.addFirst(ref, ref.getMessage().getPriority());
- }
-
- busyCount++;
- }
- else if (status == HandleStatus.NO_MATCH)
- {
- if (holder.iter == null)
- {
- // Put the ref back
-
- messageReferences.addFirst(ref, ref.getMessage().getPriority());
-
- holder.iter = messageReferences.iterator();
-
- // Skip past the one we just put back
-
- holder.iter.next();
- }
- }
- }
-
- pos++;
-
- if (pos == size)
- {
- pos = 0;
- }
-
- if (pos == startPos)
- {
- // Round robin'd all
-
- if (nullRefCount + busyCount == size)
- {
- break;
- }
-
- nullRefCount = busyCount = noMatchCount = 0;
- }
- }
-
- if (messageReferences.size() > 0 && busyCount != size && noMatchCount != size)
- {
- // More messages to deliver so need to prompt another runner - note we don't
- // prompt another one if all consumers are busy
-
- executor.execute(deliverRunner);
- }
- }
-
/*
* This method delivers the reference on the callers thread - this can give us better latency in the case there is nothing in the queue
*/
@@ -1300,6 +1393,8 @@
groups.put(groupID, consumer);
}
+ messagesAdded++;
+
return true;
}
@@ -1335,47 +1430,6 @@
}
}
- protected void add(final MessageReference ref, final boolean first, final boolean direct)
- {
- if (scheduledDeliveryHandler.checkAndSchedule(ref))
- {
- return;
- }
-
- if (direct && messageReferences.isEmpty())
- {
- if (deliverDirect(ref))
- {
- return;
- }
- }
-
- int refs;
-
- if (first)
- {
- refs = messageReferences.addFirst(ref, ref.getMessage().getPriority());
- }
- else
- {
- refs = messageReferences.addLast(ref, ref.getMessage().getPriority());
- }
-
- /*
- * We only prompt delivery if there are no messages waiting for delivery - this prevents many executors being
- * unnecessarily queued up
- * During delivery toDeliver is decremented before the message is delivered, therefore if it's delivering the last
- * message, then we cannot have a situation where this delivery is not prompted and message remains stranded in the
- * queue.
- * The exception to this is if we have consumers with filters - these will maintain an iterator, so we need to prompt delivery every time
- * in this case, since there may be many non matching messages already in the queue
- */
- if (consumerWithFilterCount > 0 || refs == 1)
- {
- deliverAsync();
- }
- }
-
private synchronized HandleStatus handle(final MessageReference reference, final Consumer consumer)
{
HandleStatus status;
@@ -1433,7 +1487,7 @@
// also note then when this happens as part of a trasaction its the tx commt of the ack that is important
// not this
-
+
// Also note that this delete shouldn't sync to disk, or else we would build up the executor's queue
// as we can't delete each messaging with sync=true while adding messages transactionally.
// There is a startup check to remove non referenced messages case these deletes fail
@@ -1467,9 +1521,13 @@
{
for (MessageReference ref : refs)
{
- add(ref, true, false);
+ addHead(ref);
}
+ // Need to reset all iterators
+
+ resetAllIterators();
+
deliverAsync();
}
}
@@ -1477,6 +1535,18 @@
// Inner classes
// --------------------------------------------------------------------------
+ private static class ConsumerHolder
+ {
+ ConsumerHolder(final Consumer consumer)
+ {
+ this.consumer = consumer;
+ }
+
+ final Consumer consumer;
+
+ LinkedListIterator<MessageReference> iter;
+ }
+
private final class RefsOperation implements TransactionOperation
{
List<MessageReference> refsToAck = new ArrayList<MessageReference>();
@@ -1575,35 +1645,79 @@
}
}
- public synchronized void pause()
+ private class DeliverRunner implements Runnable
{
- paused = true;
+ public void run()
+ {
+ try
+ {
+ deliver();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to deliver", e);
+ }
+ }
}
- public synchronized void resume()
+ private class ConcurrentPoller implements Runnable
{
- paused = false;
-
- deliverAsync();
+ public void run()
+ {
+ doPoll();
+ }
}
- public synchronized boolean isPaused()
+ /* For external use we need to use a synchronized version since the list is not thread safe */
+ private class SynchronizedIterator implements LinkedListIterator<MessageReference>
{
- return paused;
- }
+ private final LinkedListIterator<MessageReference> iter;
- private class DeliverRunner implements Runnable
- {
- public void run()
+ SynchronizedIterator(LinkedListIterator<MessageReference> iter)
{
- try
+ this.iter = iter;
+ }
+
+ public void close()
+ {
+ synchronized (QueueImpl.this)
{
- deliver();
+ iter.close();
}
- catch (Exception e)
+ }
+
+ public void repeat()
+ {
+ synchronized (QueueImpl.this)
{
- log.error("Failed to deliver", e);
+ iter.repeat();
}
}
+
+ public boolean hasNext()
+ {
+ synchronized (QueueImpl.this)
+ {
+ return iter.hasNext();
+ }
+ }
+
+ public MessageReference next()
+ {
+ synchronized (QueueImpl.this)
+ {
+ return iter.next();
+ }
+ }
+
+ public void remove()
+ {
+ synchronized (QueueImpl.this)
+ {
+ iter.remove();
+ }
+ }
+
}
+
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -203,9 +203,15 @@
}
ref.setScheduledDeliveryTime(0);
- // TODO - need to replicate this so backup node also adds back to
- // front of queue
- ref.getQueue().addFirst(ref);
+
+ synchronized (ref.getQueue())
+ {
+ ref.getQueue().resetAllIterators();
+
+ ref.getQueue().addHead(ref);
+
+ ref.getQueue().deliverAsync();
+ }
}
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -509,8 +509,8 @@
public void acknowledge(final long consumerID, final long messageID) throws Exception
{
ServerConsumer consumer = consumers.get(consumerID);
-
- consumer.acknowledge(autoCommitAcks, tx, messageID);
+
+ consumer.acknowledge(autoCommitAcks, tx, messageID);
}
public void individualAcknowledge(final long consumerID, final long messageID) throws Exception
@@ -1131,7 +1131,7 @@
toCancel.addAll(consumer.cancelRefs(false, lastMessageAsDelived, theTx));
}
-
+
for (MessageReference ref : toCancel)
{
ref.getQueue().cancel(theTx, ref);
Modified: trunk/src/main/org/hornetq/utils/HQIterator.java
===================================================================
--- trunk/src/main/org/hornetq/utils/HQIterator.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/src/main/org/hornetq/utils/HQIterator.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -25,4 +25,6 @@
E next();
void remove();
+
+ void prev();
}
Added: trunk/src/main/org/hornetq/utils/LinkedList.java
===================================================================
--- trunk/src/main/org/hornetq/utils/LinkedList.java (rev 0)
+++ trunk/src/main/org/hornetq/utils/LinkedList.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+/**
+ * A LinkedList
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface LinkedList<E>
+{
+ void addHead(E e);
+
+ void addTail(E e);
+
+ E poll();
+
+ LinkedListIterator<E> iterator();
+
+ void clear();
+
+ int size();
+}
Added: trunk/src/main/org/hornetq/utils/LinkedListImpl.java
===================================================================
--- trunk/src/main/org/hornetq/utils/LinkedListImpl.java (rev 0)
+++ trunk/src/main/org/hornetq/utils/LinkedListImpl.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -0,0 +1,447 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.lang.reflect.Array;
+import java.util.NoSuchElementException;
+
+import org.hornetq.core.logging.Logger;
+
+/**
+ * A linked list implementation which allows multiple iterators to exist at the same time on the queue, and which see any
+ * elements added or removed from the queue either directly or via iterators.
+ *
+ * This class is not thread safe.
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class LinkedListImpl<E> implements LinkedList<E>
+{
+ private static final Logger log = Logger.getLogger(LinkedListImpl.class);
+
+ private static final int INITIAL_ITERATOR_ARRAY_SIZE = 10;
+
+ private Node<E> head = new Node<E>(null);
+
+ private Node<E> tail = null;
+
+ private int size;
+
+ // We store in an array rather than a Map for the best performance
+ private Iterator[] iters;
+
+ private int numIters;
+
+ private int nextIndex;
+
+ public LinkedListImpl()
+ {
+ iters = createIteratorArray(INITIAL_ITERATOR_ARRAY_SIZE);
+ }
+
+ public void addHead(E e)
+ {
+ Node<E> node = new Node<E>(e);
+
+ node.next = head.next;
+
+ node.prev = head;
+
+ head.next = node;
+
+ if (size == 0)
+ {
+ tail = node;
+ }
+
+ size++;
+ }
+
+ public void addTail(E e)
+ {
+ if (size == 0)
+ {
+ addHead(e);
+ }
+ else
+ {
+ Node<E> node = new Node<E>(e);
+
+ node.prev = tail;
+
+ tail.next = node;
+
+ tail = node;
+
+ size++;
+ }
+ }
+
+ public E poll()
+ {
+ Node<E> ret = head.next;
+
+ if (ret != null)
+ {
+ removeAfter(head);
+
+ return ret.val;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public void clear()
+ {
+ tail = head.next = null;
+
+ size = 0;
+ }
+
+ public int size()
+ {
+ return size;
+ }
+
+ public LinkedListIterator<E> iterator()
+ {
+ return new Iterator();
+ }
+
+ public String toString()
+ {
+ StringBuilder str = new StringBuilder("LinkedListImpl [ ");
+
+ Node<E> node = head;
+
+ while (node != null)
+ {
+ str.append(node.toString());
+
+ if (node.next != null)
+ {
+ str.append(", ");
+ }
+
+ node = node.next;
+ }
+
+ return str.toString();
+ }
+
+ public int numIters()
+ {
+ return numIters;
+ }
+
+ private Iterator[] createIteratorArray(int size)
+ {
+ return (Iterator[])Array.newInstance(Iterator.class, size);
+ }
+
+ private void removeAfter(Node<E> after)
+ {
+ Node<E> toRemove = after.next;
+
+ after.next = toRemove.next;
+
+ if (toRemove.next != null)
+ {
+ toRemove.next.prev = after;
+ }
+
+ if (toRemove == tail)
+ {
+ tail = after;
+ }
+
+ size--;
+
+ if (toRemove.iterCount != 0)
+ {
+ LinkedListImpl.this.nudgeIterators(toRemove);
+ }
+ }
+
+ private void nudgeIterators(Node<E> node)
+ {
+ for (int i = 0; i < numIters; i++)
+ {
+ iters[i].nudged(node);
+ }
+ }
+
+ private void addIter(Iterator iter)
+ {
+ if (numIters == iters.length)
+ {
+ resize(2 * numIters);
+ }
+
+ iters[nextIndex++] = iter;
+
+ numIters++;
+ }
+
+ private void resize(int newSize)
+ {
+ Iterator[] newIters = createIteratorArray(newSize);
+
+ System.arraycopy(iters, 0, newIters, 0, numIters);
+
+ iters = newIters;
+ }
+
+ private void removeIter(Iterator iter)
+ {
+ for (int i = 0; i < numIters; i++)
+ {
+ if (iter == iters[i])
+ {
+ iters[i] = null;
+
+ if (i != numIters - 1)
+ {
+ // Fill in the hole
+
+ System.arraycopy(iters, i + 1, iters, i, numIters - i - 1);
+ }
+
+ numIters--;
+
+ if (numIters >= INITIAL_ITERATOR_ARRAY_SIZE && numIters == iters.length / 2)
+ {
+ resize(numIters);
+ }
+
+ nextIndex--;
+
+ return;
+ }
+ }
+
+ throw new IllegalStateException("Cannot find iter to remove");
+ }
+
+ private static final class Node<E>
+ {
+ Node<E> next;
+
+ Node<E> prev;
+
+ final E val;
+
+ int iterCount;
+
+ Node(E e)
+ {
+ val = e;
+ }
+
+ public String toString()
+ {
+ return "Node, value = " + val;
+ }
+ }
+
+ private class Iterator implements LinkedListIterator<E>
+ {
+ Node<E> last;
+
+ Node<E> current = head.next;
+
+ boolean repeat;
+
+ Iterator()
+ {
+ if (current != null)
+ {
+ current.iterCount++;
+ }
+
+ addIter(this);
+ }
+
+ public void repeat()
+ {
+ repeat = true;
+ }
+
+ public boolean hasNext()
+ {
+ Node<E> e = getNode();
+
+ if (e != null && (e != last || repeat))
+ {
+ return true;
+ }
+
+ return canAdvance();
+ }
+
+ public E next()
+ {
+ Node<E> e = getNode();
+
+ if (repeat)
+ {
+ repeat = false;
+
+ if (e != null)
+ {
+ return e.val;
+ }
+ else
+ {
+ if (canAdvance())
+ {
+ advance();
+
+ e = getNode();
+
+ return e.val;
+ }
+ else
+ {
+ throw new NoSuchElementException();
+ }
+ }
+ }
+
+ if (e == null || e == last)
+ {
+ if (canAdvance())
+ {
+ advance();
+
+ e = getNode();
+ }
+ else
+ {
+ throw new NoSuchElementException();
+ }
+ }
+
+ last = e;
+
+ repeat = false;
+
+ return e.val;
+ }
+
+ public void remove()
+ {
+ if (last == null)
+ {
+ throw new NoSuchElementException();
+ }
+
+ if (current == null)
+ {
+ throw new NoSuchElementException();
+ }
+
+ LinkedListImpl.this.removeAfter(current.prev);
+
+ last = null;
+ }
+
+ public void close()
+ {
+ removeIter(this);
+ }
+
+ public void nudged(Node<E> node)
+ {
+ if (current == node)
+ {
+ if (canAdvance())
+ {
+ advance();
+ }
+ else
+ {
+ if (current.prev != head)
+ {
+ current.iterCount--;
+
+ current = current.prev;
+
+ current.iterCount++;
+ }
+ else
+ {
+ current = null;
+ }
+ }
+ }
+ }
+
+ private Node<E> getNode()
+ {
+ if (current == null)
+ {
+ current = head.next;
+
+ if (current != null)
+ {
+ current.iterCount++;
+ }
+ }
+
+ if (current != null)
+ {
+ return current;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ private boolean canAdvance()
+ {
+ if (current == null)
+ {
+ current = head.next;
+
+ if (current != null)
+ {
+ current.iterCount++;
+ }
+ }
+
+ return current != null && current.next != null;
+ }
+
+ private void advance()
+ {
+ if (current == null || current.next == null)
+ {
+ throw new NoSuchElementException();
+ }
+
+ current.iterCount--;
+
+ current = current.next;
+
+ current.iterCount++;
+ }
+
+ }
+}
Added: trunk/src/main/org/hornetq/utils/LinkedListIterator.java
===================================================================
--- trunk/src/main/org/hornetq/utils/LinkedListIterator.java (rev 0)
+++ trunk/src/main/org/hornetq/utils/LinkedListIterator.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.util.Iterator;
+
+
+/**
+ * A LinkedListIterator
+ *
+ * This iterator allows the last element to be repeated in the next call to hasNext or next
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface LinkedListIterator<E> extends Iterator<E>
+{
+ void repeat();
+
+ void close();
+}
Deleted: trunk/src/main/org/hornetq/utils/NonConcurrentHQDeque.java
===================================================================
--- trunk/src/main/org/hornetq/utils/NonConcurrentHQDeque.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/src/main/org/hornetq/utils/NonConcurrentHQDeque.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -1,101 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.utils;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-
-import org.hornetq.core.logging.Logger;
-
-/**
- * A NonConcurrentHQDeque
- *
- * @author Tim Fox
- *
- *
- */
-public class NonConcurrentHQDeque<T> implements HQDeque<T>
-{
- private static final Logger log = Logger.getLogger(NonConcurrentHQDeque.class);
-
- private final LinkedList<T> queue;
-
- public NonConcurrentHQDeque()
- {
- this.queue = new LinkedList<T>();
- }
-
- public synchronized void addFirst(T t)
- {
- queue.addFirst(t);
- }
-
- public void addLast(T t)
- {
- queue.add(t);
- }
-
- public void clear()
- {
- queue.clear();
- }
-
- public synchronized T getFirst()
- {
- return queue.getFirst();
- }
-
- public boolean isEmpty()
- {
- return queue.isEmpty();
- }
-
- public HQIterator<T> iterator()
- {
- return new Iter();
- }
-
- public T removeFirst()
- {
- return queue.removeFirst();
- }
-
- private class Iter implements HQIterator<T>
- {
- private Iterator<T> iter;
-
- private Iter()
- {
- iter = queue.iterator();
- }
-
- public T next()
- {
- if (iter.hasNext())
- {
- return iter.next();
- }
- else
- {
- return null;
- }
- }
-
- public void remove()
- {
- iter.remove();
- }
-
- }
-}
Modified: trunk/src/main/org/hornetq/utils/PriorityLinkedList.java
===================================================================
--- trunk/src/main/org/hornetq/utils/PriorityLinkedList.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/src/main/org/hornetq/utils/PriorityLinkedList.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -25,19 +25,17 @@
*/
public interface PriorityLinkedList<T>
{
- int addFirst(T t, int priority);
+ void addHead(T t, int priority);
- int addLast(T t, int priority);
+ void addTail(T t, int priority);
- T removeFirst();
+ T poll();
- T peekFirst();
-
void clear();
int size();
- HQIterator<T> iterator();
+ LinkedListIterator<T> iterator();
boolean isEmpty();
}
Modified: trunk/src/main/org/hornetq/utils/PriorityLinkedListImpl.java
===================================================================
--- trunk/src/main/org/hornetq/utils/PriorityLinkedListImpl.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/src/main/org/hornetq/utils/PriorityLinkedListImpl.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -15,7 +15,6 @@
import java.lang.reflect.Array;
import java.util.NoSuchElementException;
-import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.logging.Logger;
@@ -34,80 +33,87 @@
{
private static final Logger log = Logger.getLogger(PriorityLinkedListImpl.class);
- protected HQDeque<T>[] levels;
+ protected LinkedListImpl<T>[] levels;
protected final int priorities;
- private final AtomicInteger size = new AtomicInteger(0);
+ private int size;
+ private int lastReset;
+
+ private int highestPriority = -1;
+
public PriorityLinkedListImpl(final int priorities)
{
this.priorities = priorities;
- levels = (HQDeque<T>[])Array.newInstance(HQDeque.class, priorities);
+ levels = (LinkedListImpl<T>[])Array.newInstance(LinkedListImpl.class, priorities);
for (int i = 0; i < priorities; i++)
{
- levels[i] = new NonConcurrentHQDeque<T>();
+ levels[i] = new LinkedListImpl<T>();
}
}
+
+ private void checkHighest(int priority)
+ {
+ if (priority > highestPriority)
+ {
+ highestPriority = priority;
+
+ lastReset++;
+ }
+ }
- public int addFirst(final T t, final int priority)
+ public void addHead(final T t, final int priority)
{
- levels[priority].addFirst(t);
-
- return size.incrementAndGet();
+ checkHighest(priority);
+
+ levels[priority].addHead(t);
+
+ size++;
}
- public int addLast(final T t, final int priority)
+ public void addTail(final T t, final int priority)
{
- levels[priority].addLast(t);
-
- return size.incrementAndGet();
+ checkHighest(priority);
+
+ levels[priority].addTail(t);
+
+ size++;
}
- public T removeFirst()
+ public T poll()
{
T t = null;
- // Initially we are just using a simple prioritization algorithm:
+ // We are just using a simple prioritization algorithm:
// Highest priority refs always get returned first.
// This could cause starvation of lower priority refs.
// TODO - A better prioritization algorithm
- for (int i = priorities - 1; i >= 0; i--)
+ for (int i = highestPriority; i >= 0; i--)
{
- HQDeque<T> ll = levels[i];
+ LinkedListImpl<T> ll = levels[i];
- if (!ll.isEmpty())
+ if (ll.size() != 0)
{
- t = ll.removeFirst();
- break;
- }
- }
+ t = ll.poll();
- if (t != null)
- {
- size.decrementAndGet();
- }
+ if (t != null)
+ {
+ size--;
- return t;
- }
+ if (ll.size() == 0)
+ {
+ if (highestPriority == i)
+ {
+ highestPriority--;
+ }
+ }
+ }
- public T peekFirst()
- {
- T t = null;
-
- for (int i = priorities - 1; i >= 0; i--)
- {
- HQDeque<T> ll = levels[i];
- if (!ll.isEmpty())
- {
- t = ll.getFirst();
- }
- if (t != null)
- {
break;
}
}
@@ -117,83 +123,134 @@
public void clear()
{
- for (HQDeque<T> list : levels)
+ for (LinkedListImpl<T> list : levels)
{
list.clear();
}
- size.set(0);
+ size = 0;
}
public int size()
{
- return size.get();
+ return size;
}
public boolean isEmpty()
{
- return size.get() == 0;
+ return size == 0;
}
- public HQIterator<T> iterator()
+ public LinkedListIterator<T> iterator()
{
return new PriorityLinkedListIterator();
}
- private class PriorityLinkedListIterator implements HQIterator<T>
+ private class PriorityLinkedListIterator implements LinkedListIterator<T>
{
private int index;
- private HQIterator<T>[] cachedIters = new HQIterator[levels.length];
+ private LinkedListIterator<T>[] cachedIters = new LinkedListIterator[levels.length];
+ private LinkedListIterator<T> lastIter;
+
+ private int resetCount = lastReset;
+
PriorityLinkedListIterator()
{
index = levels.length - 1;
}
- public T next()
+ public void repeat()
{
- while (index >= 0)
+ if (lastIter == null)
{
- HQIterator<T> iter = cachedIters[index];
-
- if (iter == null)
+ throw new NoSuchElementException();
+ }
+
+ lastIter.repeat();
+ }
+
+ public void close()
+ {
+ lastIter = null;
+
+ for (LinkedListIterator<T> iter : cachedIters)
+ {
+ if (iter != null)
{
- iter = cachedIters[index] = levels[index].iterator();
+ iter.close();
}
+ }
+ }
+
+ private void checkReset()
+ {
+ if (lastReset > resetCount)
+ {
+ index = highestPriority;
- T t = iter.next();
-
- if (t != null)
+ resetCount = lastReset;
+ }
+ }
+
+ public boolean hasNext()
+ {
+ checkReset();
+
+ while (index >= 0)
+ {
+ lastIter = cachedIters[index];
+
+ if (lastIter == null)
{
- return t;
+ lastIter = cachedIters[index] = levels[index].iterator();
}
-
+
+ boolean b = lastIter.hasNext();
+
+ if (b)
+ {
+ return true;
+ }
+
index--;
-
+
if (index < 0)
{
index = levels.length - 1;
-
+
break;
}
}
-
- return null;
+ return false;
}
+ public T next()
+ {
+ if (lastIter == null)
+ {
+ throw new NoSuchElementException();
+ }
+
+ return lastIter.next();
+ }
+
public void remove()
{
- HQIterator<T> iter = cachedIters[index];
-
- if (iter == null)
+ if (lastIter == null)
{
throw new NoSuchElementException();
}
+
+ lastIter.remove();
- iter.remove();
+ if (index == highestPriority && levels[index].size() == 0)
+ {
+ highestPriority--;
+ }
- size.decrementAndGet();
+ size--;
}
}
}
Modified: trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -150,7 +150,7 @@
MessageReference ref = message.createReference(queue);
- queue.addLast(ref, false);
+ queue.addTail(ref, false);
refs.add(ref);
Modified: trunk/tests/src/org/hornetq/tests/integration/client/AcknowledgeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/AcknowledgeTest.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/tests/src/org/hornetq/tests/integration/client/AcknowledgeTest.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -25,6 +25,7 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.tests.util.ServiceTestBase;
@@ -34,6 +35,8 @@
*/
public class AcknowledgeTest extends ServiceTestBase
{
+ private static final Logger log = Logger.getLogger(AcknowledgeTest.class);
+
public final SimpleString addressA = new SimpleString("addressA");
public final SimpleString queueA = new SimpleString("queueA");
@@ -96,17 +99,23 @@
sendSession.createQueue(addressA, queueA, false);
ClientProducer cp = sendSession.createProducer(addressA);
ClientConsumer cc = session.createConsumer(queueA);
- int numMessages = 100;
+ int numMessages = 3;
for (int i = 0; i < numMessages; i++)
{
cp.send(sendSession.createMessage(false));
}
+
+ Thread.sleep(500);
+ log.info("woke up");
+
final CountDownLatch latch = new CountDownLatch(numMessages);
session.start();
cc.setMessageHandler(new MessageHandler()
{
+ int c = 0;
public void onMessage(final ClientMessage message)
{
+ log.info("Got message " + c++);
latch.countDown();
}
});
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerFilterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerFilterTest.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerFilterTest.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -76,7 +76,7 @@
producer.send(message);
- assertNull(consumer.receive(500));
+ assertNull(consumer.receiveImmediate());
message = session.createMessage(false);
@@ -86,14 +86,14 @@
producer.send(message);
- ClientMessage received = consumer.receive(500);
+ ClientMessage received = consumer.receiveImmediate();
assertNotNull(received);
assertEquals("giraffe", received.getStringProperty("animal"));
- assertNull(consumer.receive(500));
-
+ assertNull(consumer.receiveImmediate());
+
session.close();
}
@@ -113,7 +113,6 @@
for (int i = 0; i < QueueImpl.MAX_DELIVERIES_IN_LOOP * 2; i++)
{
-
ClientMessage message = session.createMessage(false);
message.putStringProperty("animal", "hippo");
@@ -121,7 +120,7 @@
producer.send(message);
}
- assertNull(consumer.receive(500));
+ assertNull(consumer.receiveImmediate());
for (int i = 0; i < QueueImpl.MAX_DELIVERIES_IN_LOOP * 2; i++)
{
@@ -134,15 +133,101 @@
for (int i = 0; i < QueueImpl.MAX_DELIVERIES_IN_LOOP * 2; i++)
{
- ClientMessage received = consumer.receive(500);
+ ClientMessage received = consumer.receiveImmediate();
assertNotNull(received);
assertEquals("giraffe", received.getStringProperty("animal"));
}
+
+ assertNull(consumer.receiveImmediate());
- assertNull(consumer.receive(500));
+ session.close();
+ }
+
+ public void testTwoConsumers() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+ ClientSession session = sf.createSession();
+
+ session.start();
+
+ session.createQueue("foo", "foo");
+
+ ClientProducer producer = session.createProducer("foo");
+
+ ClientConsumer consumer1 = session.createConsumer("foo", "animal='giraffe'");
+
+ ClientConsumer consumer2 = session.createConsumer("foo", "animal='elephant'");
+
+ //Create and consume message that matches the first consumer's filter
+
+ ClientMessage message = session.createMessage(false);
+
+ message.putStringProperty("animal", "giraffe");
+
+ producer.send(message);
+
+ ClientMessage received = consumer1.receive(10000);
+
+ assertNotNull(received);
+
+ assertEquals("giraffe", received.getStringProperty("animal"));
+
+ assertNull(consumer1.receiveImmediate());
+ assertNull(consumer2.receiveImmediate());
+
+ //Create and consume another message that matches the first consumer's filter
+ message = session.createMessage(false);
+
+ message.putStringProperty("animal", "giraffe");
+
+ producer.send(message);
+
+ received = consumer1.receive(10000);
+
+ assertNotNull(received);
+
+ assertEquals("giraffe", received.getStringProperty("animal"));
+
+ assertNull(consumer1.receiveImmediate());
+ assertNull(consumer2.receiveImmediate());
+
+ //Create and consume a message that matches the second consumer's filter
+
+ message = session.createMessage(false);
+
+ message.putStringProperty("animal", "elephant");
+
+ producer.send(message);
+
+ received = consumer2.receive(10000);
+
+ assertNotNull(received);
+
+ assertEquals("elephant", received.getStringProperty("animal"));
+
+ assertNull(consumer1.receiveImmediate());
+ assertNull(consumer2.receiveImmediate());
+
+ //Create and consume another message that matches the second consumer's filter
+
+ message = session.createMessage(false);
+
+ message.putStringProperty("animal", "elephant");
+
+ producer.send(message);
+
+ received = consumer2.receive(1000);
+
+ assertNotNull(received);
+
+ assertEquals("elephant", received.getStringProperty("animal"));
+
+ assertNull(consumer1.receiveImmediate());
+ assertNull(consumer2.receiveImmediate());
+
session.close();
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerRoundRobinTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerRoundRobinTest.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerRoundRobinTest.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -57,7 +57,7 @@
consumers[4] = session.createConsumer(queueA);
ClientProducer cp = session.createProducer(addressA);
- int numMessage = 100;
+ int numMessage = 10;
for (int i = 0; i < numMessage; i++)
{
ClientMessage cm = session.createMessage(false);
@@ -67,8 +67,10 @@
int currMessage = 0;
for (int i = 0; i < numMessage / 5; i++)
{
+ log.info("i is " + i);
for (int j = 0; j < 5; j++)
{
+ log.info("j is " + j);
ClientMessage cm = consumers[j].receive(5000);
Assert.assertNotNull(cm);
Assert.assertEquals(currMessage++, cm.getBodyBuffer().readInt());
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -610,7 +610,7 @@
try
{
- final int numberOfMessages = 100;
+ final int numberOfMessages = 10;
server.start();
@@ -660,6 +660,8 @@
String str = getTextMessage(msg);
Assert.assertEquals("Msg" + i, str);
+
+ log.info("got msg " + str);
msg.acknowledge();
@@ -673,8 +675,13 @@
ClientMessage msg = cons2.receive(1000);
Assert.assertNotNull("expected message at i = " + i, msg);
+
+ String str = getTextMessage(msg);
+
+ log.info("got msg " + str);
- Assert.assertEquals("Msg" + i, msg.getBodyBuffer().readString());
+
+ Assert.assertEquals("Msg" + i, str);
msg.acknowledge();
@@ -1010,11 +1017,14 @@
{
try
{
+ log.info("received msg " + message);
String str = getTextMessage(message);
if (ConsumerWindowSizeTest.isTrace)
{
ConsumerWindowSizeTest.log.trace("Received message " + str);
}
+
+ ConsumerWindowSizeTest.log.info("Received message " + str);
failed = failed || !str.equals("Msg" + count);
@@ -1058,9 +1068,12 @@
Assert.assertTrue(latchReceived.await(TIMEOUT, TimeUnit.SECONDS));
+ log.info("bs " + consReceiveOneAndHold.getBufferSize());
+
long timeout = System.currentTimeMillis() + 1000 * TIMEOUT;
while (consReceiveOneAndHold.getBufferSize() == 0 && System.currentTimeMillis() < timeout)
{
+ log.info("bs " + consReceiveOneAndHold.getBufferSize());
Thread.sleep(10);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -480,7 +480,7 @@
SimpleString groupId = new SimpleString("grp1");
SimpleString groupId2 = new SimpleString("grp2");
- int numMessages = 10;
+ int numMessages = 4;
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = createTextMessage("m" + i, clientSession);
Modified: trunk/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -126,6 +126,9 @@
for (int i = 9; i >= 0; i--)
{
ClientMessage m = consumer.receive(500);
+
+ log.info("received msg " + m.getPriority());
+
Assert.assertNotNull(m);
Assert.assertEquals(i, m.getPriority());
}
@@ -221,9 +224,9 @@
ClientMessage m = createTextMessage(Integer.toString(i), session);
m.setPriority((byte)i);
producer.send(m);
-
- Thread.sleep(20);
}
+
+ Thread.sleep(500);
// Now we wait a little bit to make sure the messages are in the client side buffer
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/OrderReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/OrderReattachTest.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/OrderReattachTest.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -16,6 +16,7 @@
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
@@ -35,7 +36,6 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.utils.concurrent.LinkedBlockingDeque;
/**
* A OrderReattachTest
Modified: trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -372,7 +372,7 @@
ClientMessage message = session.createMessage(false);
message.putIntProperty(new SimpleString("key"), intValue);
producer.send(message);
-
+
String jsonString = queueControl.listMessagesAsJSON(null);
Assert.assertNotNull(jsonString);
JSONArray array = new JSONArray(jsonString);
Modified: trunk/tests/src/org/hornetq/tests/integration/scheduling/DelayedMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/scheduling/DelayedMessageTest.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/tests/src/org/hornetq/tests/integration/scheduling/DelayedMessageTest.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -136,6 +136,8 @@
Assert.assertNotNull(tm);
long time = System.currentTimeMillis();
+
+ log.info("delay " + (time-now));
Assert.assertTrue(time - now >= DelayedMessageTest.DELAY);
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -20,6 +20,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -41,7 +42,6 @@
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.OrderedExecutorFactory;
import org.hornetq.utils.SimpleIDGenerator;
-import org.hornetq.utils.concurrent.LinkedBlockingDeque;
/**
* A SoakJournal
Modified: trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -84,34 +84,34 @@
MessageReference ref1 = generateReference(queue, 1);
ref1.setScheduledDeliveryTime(now + 7000);
- queue.addLast(ref1);
+ queue.addTail(ref1);
// Send some non scheduled messages
MessageReference ref2 = generateReference(queue, 2);
- queue.addLast(ref2);
+ queue.addTail(ref2);
MessageReference ref3 = generateReference(queue, 3);
- queue.addLast(ref3);
+ queue.addTail(ref3);
MessageReference ref4 = generateReference(queue, 4);
- queue.addLast(ref4);
+ queue.addTail(ref4);
// Now send some more scheduled messages
MessageReference ref5 = generateReference(queue, 5);
ref5.setScheduledDeliveryTime(now + 5000);
- queue.addLast(ref5);
+ queue.addTail(ref5);
MessageReference ref6 = generateReference(queue, 6);
ref6.setScheduledDeliveryTime(now + 4000);
- queue.addLast(ref6);
+ queue.addTail(ref6);
MessageReference ref7 = generateReference(queue, 7);
ref7.setScheduledDeliveryTime(now + 3000);
- queue.addLast(ref7);
+ queue.addTail(ref7);
MessageReference ref8 = generateReference(queue, 8);
ref8.setScheduledDeliveryTime(now + 6000);
- queue.addLast(ref8);
+ queue.addTail(ref8);
List<MessageReference> refs = new ArrayList<MessageReference>();
@@ -162,34 +162,34 @@
MessageReference ref1 = generateReference(queue, 1);
ref1.setScheduledDeliveryTime(now + 7000);
- queue.addLast(ref1);
+ queue.addTail(ref1);
// Send some non scheduled messages
MessageReference ref2 = generateReference(queue, 2);
- queue.addLast(ref2);
+ queue.addTail(ref2);
MessageReference ref3 = generateReference(queue, 3);
- queue.addLast(ref3);
+ queue.addTail(ref3);
MessageReference ref4 = generateReference(queue, 4);
- queue.addLast(ref4);
+ queue.addTail(ref4);
// Now send some more scheduled messages
MessageReference ref5 = generateReference(queue, 5);
ref5.setScheduledDeliveryTime(now + 5000);
- queue.addLast(ref5);
+ queue.addTail(ref5);
MessageReference ref6 = generateReference(queue, 6);
ref6.setScheduledDeliveryTime(now + 4000);
- queue.addLast(ref6);
+ queue.addTail(ref6);
MessageReference ref7 = generateReference(queue, 7);
ref7.setScheduledDeliveryTime(now + 3000);
- queue.addLast(ref7);
+ queue.addTail(ref7);
MessageReference ref8 = generateReference(queue, 8);
ref8.setScheduledDeliveryTime(now + 6000);
- queue.addLast(ref8);
+ queue.addTail(ref8);
consumer = new FakeConsumer();
@@ -263,7 +263,7 @@
MessageReference messageReference = generateReference(queue, 1);
queue.addConsumer(consumer);
messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000);
- queue.addFirst(messageReference);
+ queue.addHead(messageReference);
boolean gotLatch = countDownLatch.await(3000, TimeUnit.MILLISECONDS);
Assert.assertTrue(gotLatch);
Deleted: trunk/tests/src/org/hornetq/tests/unit/core/list/impl/ConcurrentPriorityLinkedListTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/list/impl/ConcurrentPriorityLinkedListTest.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/tests/src/org/hornetq/tests/unit/core/list/impl/ConcurrentPriorityLinkedListTest.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -1,35 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.unit.core.list.impl;
-
-import org.hornetq.utils.PriorityLinkedListImpl;
-import org.hornetq.utils.concurrent.ConcurrentPriorityLinkedListImpl;
-
-/**
- * A ConcurrentPriorityLinkedListTest
- *
- * @author Tim Fox
- *
- *
- */
-public class ConcurrentPriorityLinkedListTest extends PriorityLinkedListTestBase
-{
-
- @Override
- protected PriorityLinkedListImpl<Wibble> getList()
- {
- return new ConcurrentPriorityLinkedListImpl<Wibble>(10);
- }
-
-}
Deleted: trunk/tests/src/org/hornetq/tests/unit/core/list/impl/NonConcurrentPriorityLinkedListTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/list/impl/NonConcurrentPriorityLinkedListTest.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/tests/src/org/hornetq/tests/unit/core/list/impl/NonConcurrentPriorityLinkedListTest.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -1,34 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.unit.core.list.impl;
-
-import org.hornetq.utils.PriorityLinkedListImpl;
-
-/**
- * A NonConcurrentPriorityLinkedListTest
- *
- * @author Tim Fox
- *
- *
- */
-public class NonConcurrentPriorityLinkedListTest extends PriorityLinkedListTestBase
-{
-
- @Override
- protected PriorityLinkedListImpl<Wibble> getList()
- {
- return new PriorityLinkedListImpl<Wibble>(10);
- }
-
-}
Copied: trunk/tests/src/org/hornetq/tests/unit/core/list/impl/PriorityLinkedListTest.java (from rev 9484, trunk/tests/src/org/hornetq/tests/unit/core/list/impl/NonConcurrentPriorityLinkedListTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/list/impl/PriorityLinkedListTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/unit/core/list/impl/PriorityLinkedListTest.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.core.list.impl;
+
+import org.hornetq.utils.PriorityLinkedListImpl;
+
+/**
+ * A PriorityLinkedListTest
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class PriorityLinkedListTest extends PriorityLinkedListTestBase
+{
+
+ @Override
+ protected PriorityLinkedListImpl<Wibble> getList()
+ {
+ return new PriorityLinkedListImpl<Wibble>(10);
+ }
+
+}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/list/impl/PriorityLinkedListTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/list/impl/PriorityLinkedListTestBase.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/tests/src/org/hornetq/tests/unit/core/list/impl/PriorityLinkedListTestBase.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -16,7 +16,7 @@
import junit.framework.Assert;
import junit.framework.TestCase;
-import org.hornetq.utils.HQIterator;
+import org.hornetq.utils.LinkedListIterator;
import org.hornetq.utils.PriorityLinkedListImpl;
/**
@@ -126,414 +126,451 @@
{
Assert.assertTrue(list.isEmpty());
- list.addFirst(a, 0);
+ list.addHead(a, 0);
Assert.assertFalse(list.isEmpty());
- Wibble w = list.removeFirst();
+ Wibble w = list.poll();
Assert.assertEquals(a, w);
Assert.assertTrue(list.isEmpty());
+
+ assertEquals(0, list.size());
}
- public void testAddFirst() throws Exception
+ public void testaddHead() throws Exception
{
- list.addFirst(a, 0);
- list.addFirst(b, 0);
- list.addFirst(c, 0);
- list.addFirst(d, 0);
- list.addFirst(e, 0);
+ list.addHead(a, 0);
+ list.addHead(b, 0);
+ list.addHead(c, 0);
+ list.addHead(d, 0);
+ list.addHead(e, 0);
+
+ assertEquals(5, list.size());
- Assert.assertEquals(e, list.removeFirst());
- Assert.assertEquals(d, list.removeFirst());
- Assert.assertEquals(c, list.removeFirst());
- Assert.assertEquals(b, list.removeFirst());
- Assert.assertEquals(a, list.removeFirst());
- Assert.assertNull(list.removeFirst());
+ Assert.assertEquals(e, list.poll());
+ Assert.assertEquals(d, list.poll());
+ Assert.assertEquals(c, list.poll());
+ Assert.assertEquals(b, list.poll());
+ Assert.assertEquals(a, list.poll());
+ Assert.assertNull(list.poll());
+
+ assertEquals(0, list.size());
}
- public void testAddLast() throws Exception
+ public void testaddTail() throws Exception
{
- list.addLast(a, 0);
- list.addLast(b, 0);
- list.addLast(c, 0);
- list.addLast(d, 0);
- list.addLast(e, 0);
+ list.addTail(a, 0);
+ list.addTail(b, 0);
+ list.addTail(c, 0);
+ list.addTail(d, 0);
+ list.addTail(e, 0);
+ assertEquals(5, list.size());
- Assert.assertEquals(a, list.removeFirst());
- Assert.assertEquals(b, list.removeFirst());
- Assert.assertEquals(c, list.removeFirst());
- Assert.assertEquals(d, list.removeFirst());
- Assert.assertEquals(e, list.removeFirst());
- Assert.assertNull(list.removeFirst());
+ Assert.assertEquals(a, list.poll());
+ Assert.assertEquals(b, list.poll());
+ Assert.assertEquals(c, list.poll());
+ Assert.assertEquals(d, list.poll());
+ Assert.assertEquals(e, list.poll());
+ Assert.assertNull(list.poll());
+
+ assertEquals(0, list.size());
}
public void testAddLastAndFirst() throws Exception
{
- list.addLast(a, 0);
- list.addLast(b, 0);
- list.addLast(c, 0);
- list.addLast(d, 0);
- list.addLast(e, 0);
- list.addLast(f, 0);
- list.addLast(g, 0);
- list.addLast(h, 0);
- list.addLast(i, 0);
- list.addLast(j, 0);
+ list.addTail(a, 0);
+ list.addTail(b, 0);
+ list.addTail(c, 0);
+ list.addTail(d, 0);
+ list.addTail(e, 0);
+ list.addTail(f, 0);
+ list.addTail(g, 0);
+ list.addTail(h, 0);
+ list.addTail(i, 0);
+ list.addTail(j, 0);
- list.addFirst(k, 0);
- list.addFirst(l, 0);
- list.addFirst(m, 0);
- list.addFirst(n, 0);
- list.addFirst(o, 0);
- list.addFirst(p, 0);
- list.addFirst(q, 0);
- list.addFirst(r, 0);
- list.addFirst(s, 0);
- list.addFirst(t, 0);
+ list.addHead(k, 0);
+ list.addHead(l, 0);
+ list.addHead(m, 0);
+ list.addHead(n, 0);
+ list.addHead(o, 0);
+ list.addHead(p, 0);
+ list.addHead(q, 0);
+ list.addHead(r, 0);
+ list.addHead(s, 0);
+ list.addHead(t, 0);
- assertEquals(t, list.removeFirst());
- assertEquals(s, list.removeFirst());
- assertEquals(r, list.removeFirst());
- assertEquals(q, list.removeFirst());
- assertEquals(p, list.removeFirst());
- assertEquals(o, list.removeFirst());
- assertEquals(n, list.removeFirst());
- assertEquals(m, list.removeFirst());
- assertEquals(l, list.removeFirst());
- assertEquals(k, list.removeFirst());
+ assertEquals(t, list.poll());
+ assertEquals(s, list.poll());
+ assertEquals(r, list.poll());
+ assertEquals(q, list.poll());
+ assertEquals(p, list.poll());
+ assertEquals(o, list.poll());
+ assertEquals(n, list.poll());
+ assertEquals(m, list.poll());
+ assertEquals(l, list.poll());
+ assertEquals(k, list.poll());
- assertEquals(a, list.removeFirst());
- assertEquals(b, list.removeFirst());
- assertEquals(c, list.removeFirst());
- assertEquals(d, list.removeFirst());
- assertEquals(e, list.removeFirst());
- assertEquals(f, list.removeFirst());
- assertEquals(g, list.removeFirst());
- assertEquals(h, list.removeFirst());
- assertEquals(i, list.removeFirst());
- assertEquals(j, list.removeFirst());
+ assertEquals(a, list.poll());
+ assertEquals(b, list.poll());
+ assertEquals(c, list.poll());
+ assertEquals(d, list.poll());
+ assertEquals(e, list.poll());
+ assertEquals(f, list.poll());
+ assertEquals(g, list.poll());
+ assertEquals(h, list.poll());
+ assertEquals(i, list.poll());
+ assertEquals(j, list.poll());
}
public void testAddLastAndFirstWithIterator() throws Exception
{
- list.addLast(a, 0);
- list.addLast(b, 0);
- list.addLast(c, 0);
- list.addLast(d, 0);
- list.addLast(e, 0);
- list.addLast(f, 0);
- list.addLast(g, 0);
- list.addLast(h, 0);
- list.addLast(i, 0);
- list.addLast(j, 0);
+ list.addTail(a, 0);
+ list.addTail(b, 0);
+ list.addTail(c, 0);
+ list.addTail(d, 0);
+ list.addTail(e, 0);
+ list.addTail(f, 0);
+ list.addTail(g, 0);
+ list.addTail(h, 0);
+ list.addTail(i, 0);
+ list.addTail(j, 0);
- list.addFirst(k, 0);
- list.addFirst(l, 0);
- list.addFirst(m, 0);
- list.addFirst(n, 0);
- list.addFirst(o, 0);
- list.addFirst(p, 0);
- list.addFirst(q, 0);
- list.addFirst(r, 0);
- list.addFirst(s, 0);
- list.addFirst(t, 0);
+ list.addHead(k, 0);
+ list.addHead(l, 0);
+ list.addHead(m, 0);
+ list.addHead(n, 0);
+ list.addHead(o, 0);
+ list.addHead(p, 0);
+ list.addHead(q, 0);
+ list.addHead(r, 0);
+ list.addHead(s, 0);
+ list.addHead(t, 0);
- HQIterator<Wibble> iter = list.iterator();
+ LinkedListIterator<Wibble> iter = list.iterator();
+ assertTrue(iter.hasNext());
assertEquals(t, iter.next());
+ assertTrue(iter.hasNext());
assertEquals(s, iter.next());
+ assertTrue(iter.hasNext());
assertEquals(r, iter.next());
+ assertTrue(iter.hasNext());
assertEquals(q, iter.next());
+ assertTrue(iter.hasNext());
assertEquals(p, iter.next());
+ assertTrue(iter.hasNext());
assertEquals(o, iter.next());
+ assertTrue(iter.hasNext());
assertEquals(n, iter.next());
+ assertTrue(iter.hasNext());
assertEquals(m, iter.next());
+ assertTrue(iter.hasNext());
assertEquals(l, iter.next());
+ assertTrue(iter.hasNext());
assertEquals(k, iter.next());
-
+ assertTrue(iter.hasNext());
assertEquals(a, iter.next());
+ assertTrue(iter.hasNext());
assertEquals(b, iter.next());
+ assertTrue(iter.hasNext());
assertEquals(c, iter.next());
+ assertTrue(iter.hasNext());
assertEquals(d, iter.next());
+ assertTrue(iter.hasNext());
assertEquals(e, iter.next());
+ assertTrue(iter.hasNext());
assertEquals(f, iter.next());
+ assertTrue(iter.hasNext());
assertEquals(g, iter.next());
+ assertTrue(iter.hasNext());
assertEquals(h, iter.next());
+ assertTrue(iter.hasNext());
assertEquals(i, iter.next());
+ assertTrue(iter.hasNext());
assertEquals(j, iter.next());
}
- public void testPeekFirst()
+ public void testPoll() throws Exception
{
- list.addLast(a, 0);
- list.addLast(b, 1);
- list.addLast(c, 2);
- list.addLast(d, 3);
- list.addLast(e, 4);
- list.addLast(f, 5);
- list.addLast(g, 6);
- list.addLast(h, 7);
- list.addLast(i, 8);
- list.addLast(j, 9);
+ list.addTail(a, 0);
+ list.addTail(b, 1);
+ list.addTail(c, 2);
+ list.addTail(d, 3);
+ list.addTail(e, 4);
+ list.addTail(f, 5);
+ list.addTail(g, 6);
+ list.addTail(h, 7);
+ list.addTail(i, 8);
+ list.addTail(j, 9);
- Assert.assertEquals(j, list.peekFirst());
- Assert.assertEquals(j, list.peekFirst());
+ Assert.assertEquals(j, list.poll());
+ Assert.assertEquals(i, list.poll());
+ Assert.assertEquals(h, list.poll());
+ Assert.assertEquals(g, list.poll());
+ Assert.assertEquals(f, list.poll());
+ Assert.assertEquals(e, list.poll());
+ Assert.assertEquals(d, list.poll());
+ Assert.assertEquals(c, list.poll());
+ Assert.assertEquals(b, list.poll());
+ Assert.assertEquals(a, list.poll());
- list.removeFirst();
+ Assert.assertNull(list.poll());
- Assert.assertEquals(i, list.peekFirst());
- Assert.assertEquals(i, list.peekFirst());
+ list.addTail(a, 9);
+ list.addTail(b, 8);
+ list.addTail(c, 7);
+ list.addTail(d, 6);
+ list.addTail(e, 5);
+ list.addTail(f, 4);
+ list.addTail(g, 3);
+ list.addTail(h, 2);
+ list.addTail(i, 1);
+ list.addTail(j, 0);
- list.clear();
- }
+ Assert.assertEquals(a, list.poll());
+ Assert.assertEquals(b, list.poll());
+ Assert.assertEquals(c, list.poll());
+ Assert.assertEquals(d, list.poll());
+ Assert.assertEquals(e, list.poll());
+ Assert.assertEquals(f, list.poll());
+ Assert.assertEquals(g, list.poll());
+ Assert.assertEquals(h, list.poll());
+ Assert.assertEquals(i, list.poll());
+ Assert.assertEquals(j, list.poll());
- public void testRemoveFirst() throws Exception
- {
- list.addLast(a, 0);
- list.addLast(b, 1);
- list.addLast(c, 2);
- list.addLast(d, 3);
- list.addLast(e, 4);
- list.addLast(f, 5);
- list.addLast(g, 6);
- list.addLast(h, 7);
- list.addLast(i, 8);
- list.addLast(j, 9);
+ Assert.assertNull(list.poll());
- Assert.assertEquals(j, list.removeFirst());
- Assert.assertEquals(i, list.removeFirst());
- Assert.assertEquals(h, list.removeFirst());
- Assert.assertEquals(g, list.removeFirst());
- Assert.assertEquals(f, list.removeFirst());
- Assert.assertEquals(e, list.removeFirst());
- Assert.assertEquals(d, list.removeFirst());
- Assert.assertEquals(c, list.removeFirst());
- Assert.assertEquals(b, list.removeFirst());
- Assert.assertEquals(a, list.removeFirst());
+ list.addTail(a, 9);
+ list.addTail(b, 0);
+ list.addTail(c, 8);
+ list.addTail(d, 1);
+ list.addTail(e, 7);
+ list.addTail(f, 2);
+ list.addTail(g, 6);
+ list.addTail(h, 3);
+ list.addTail(i, 5);
+ list.addTail(j, 4);
- Assert.assertNull(list.removeFirst());
+ Assert.assertEquals(a, list.poll());
+ Assert.assertEquals(c, list.poll());
+ Assert.assertEquals(e, list.poll());
+ Assert.assertEquals(g, list.poll());
+ Assert.assertEquals(i, list.poll());
+ Assert.assertEquals(j, list.poll());
+ Assert.assertEquals(h, list.poll());
+ Assert.assertEquals(f, list.poll());
+ Assert.assertEquals(d, list.poll());
+ Assert.assertEquals(b, list.poll());
- list.addLast(a, 9);
- list.addLast(b, 8);
- list.addLast(c, 7);
- list.addLast(d, 6);
- list.addLast(e, 5);
- list.addLast(f, 4);
- list.addLast(g, 3);
- list.addLast(h, 2);
- list.addLast(i, 1);
- list.addLast(j, 0);
+ Assert.assertNull(list.poll());
- Assert.assertEquals(a, list.removeFirst());
- Assert.assertEquals(b, list.removeFirst());
- Assert.assertEquals(c, list.removeFirst());
- Assert.assertEquals(d, list.removeFirst());
- Assert.assertEquals(e, list.removeFirst());
- Assert.assertEquals(f, list.removeFirst());
- Assert.assertEquals(g, list.removeFirst());
- Assert.assertEquals(h, list.removeFirst());
- Assert.assertEquals(i, list.removeFirst());
- Assert.assertEquals(j, list.removeFirst());
+ list.addTail(a, 0);
+ list.addTail(b, 3);
+ list.addTail(c, 3);
+ list.addTail(d, 3);
+ list.addTail(e, 6);
+ list.addTail(f, 6);
+ list.addTail(g, 6);
+ list.addTail(h, 9);
+ list.addTail(i, 9);
+ list.addTail(j, 9);
- Assert.assertNull(list.removeFirst());
+ Assert.assertEquals(h, list.poll());
+ Assert.assertEquals(i, list.poll());
+ Assert.assertEquals(j, list.poll());
+ Assert.assertEquals(e, list.poll());
+ Assert.assertEquals(f, list.poll());
+ Assert.assertEquals(g, list.poll());
+ Assert.assertEquals(b, list.poll());
+ Assert.assertEquals(c, list.poll());
+ Assert.assertEquals(d, list.poll());
+ Assert.assertEquals(a, list.poll());
- list.addLast(a, 9);
- list.addLast(b, 0);
- list.addLast(c, 8);
- list.addLast(d, 1);
- list.addLast(e, 7);
- list.addLast(f, 2);
- list.addLast(g, 6);
- list.addLast(h, 3);
- list.addLast(i, 5);
- list.addLast(j, 4);
+ Assert.assertNull(list.poll());
- Assert.assertEquals(a, list.removeFirst());
- Assert.assertEquals(c, list.removeFirst());
- Assert.assertEquals(e, list.removeFirst());
- Assert.assertEquals(g, list.removeFirst());
- Assert.assertEquals(i, list.removeFirst());
- Assert.assertEquals(j, list.removeFirst());
- Assert.assertEquals(h, list.removeFirst());
- Assert.assertEquals(f, list.removeFirst());
- Assert.assertEquals(d, list.removeFirst());
- Assert.assertEquals(b, list.removeFirst());
+ list.addTail(a, 5);
+ list.addTail(b, 5);
+ list.addTail(c, 5);
+ list.addTail(d, 5);
+ list.addTail(e, 5);
+ list.addTail(f, 5);
+ list.addTail(g, 5);
+ list.addTail(h, 5);
+ list.addTail(i, 5);
+ list.addTail(j, 5);
- Assert.assertNull(list.removeFirst());
+ Assert.assertEquals(a, list.poll());
+ Assert.assertEquals(b, list.poll());
+ Assert.assertEquals(c, list.poll());
+ Assert.assertEquals(d, list.poll());
+ Assert.assertEquals(e, list.poll());
+ Assert.assertEquals(f, list.poll());
+ Assert.assertEquals(g, list.poll());
+ Assert.assertEquals(h, list.poll());
+ Assert.assertEquals(i, list.poll());
+ Assert.assertEquals(j, list.poll());
- list.addLast(a, 0);
- list.addLast(b, 3);
- list.addLast(c, 3);
- list.addLast(d, 3);
- list.addLast(e, 6);
- list.addLast(f, 6);
- list.addLast(g, 6);
- list.addLast(h, 9);
- list.addLast(i, 9);
- list.addLast(j, 9);
+ Assert.assertNull(list.poll());
- Assert.assertEquals(h, list.removeFirst());
- Assert.assertEquals(i, list.removeFirst());
- Assert.assertEquals(j, list.removeFirst());
- Assert.assertEquals(e, list.removeFirst());
- Assert.assertEquals(f, list.removeFirst());
- Assert.assertEquals(g, list.removeFirst());
- Assert.assertEquals(b, list.removeFirst());
- Assert.assertEquals(c, list.removeFirst());
- Assert.assertEquals(d, list.removeFirst());
- Assert.assertEquals(a, list.removeFirst());
+ list.addTail(j, 5);
+ list.addTail(i, 5);
+ list.addTail(h, 5);
+ list.addTail(g, 5);
+ list.addTail(f, 5);
+ list.addTail(e, 5);
+ list.addTail(d, 5);
+ list.addTail(c, 5);
+ list.addTail(b, 5);
+ list.addTail(a, 5);
- Assert.assertNull(list.removeFirst());
+ Assert.assertEquals(j, list.poll());
+ Assert.assertEquals(i, list.poll());
+ Assert.assertEquals(h, list.poll());
+ Assert.assertEquals(g, list.poll());
+ Assert.assertEquals(f, list.poll());
+ Assert.assertEquals(e, list.poll());
+ Assert.assertEquals(d, list.poll());
+ Assert.assertEquals(c, list.poll());
+ Assert.assertEquals(b, list.poll());
+ Assert.assertEquals(a, list.poll());
- list.addLast(a, 5);
- list.addLast(b, 5);
- list.addLast(c, 5);
- list.addLast(d, 5);
- list.addLast(e, 5);
- list.addLast(f, 5);
- list.addLast(g, 5);
- list.addLast(h, 5);
- list.addLast(i, 5);
- list.addLast(j, 5);
+ Assert.assertNull(list.poll());
+
+ assertEquals(0, list.size());
- Assert.assertEquals(a, list.removeFirst());
- Assert.assertEquals(b, list.removeFirst());
- Assert.assertEquals(c, list.removeFirst());
- Assert.assertEquals(d, list.removeFirst());
- Assert.assertEquals(e, list.removeFirst());
- Assert.assertEquals(f, list.removeFirst());
- Assert.assertEquals(g, list.removeFirst());
- Assert.assertEquals(h, list.removeFirst());
- Assert.assertEquals(i, list.removeFirst());
- Assert.assertEquals(j, list.removeFirst());
-
- Assert.assertNull(list.removeFirst());
-
- list.addLast(j, 5);
- list.addLast(i, 5);
- list.addLast(h, 5);
- list.addLast(g, 5);
- list.addLast(f, 5);
- list.addLast(e, 5);
- list.addLast(d, 5);
- list.addLast(c, 5);
- list.addLast(b, 5);
- list.addLast(a, 5);
-
- Assert.assertEquals(j, list.removeFirst());
- Assert.assertEquals(i, list.removeFirst());
- Assert.assertEquals(h, list.removeFirst());
- Assert.assertEquals(g, list.removeFirst());
- Assert.assertEquals(f, list.removeFirst());
- Assert.assertEquals(e, list.removeFirst());
- Assert.assertEquals(d, list.removeFirst());
- Assert.assertEquals(c, list.removeFirst());
- Assert.assertEquals(b, list.removeFirst());
- Assert.assertEquals(a, list.removeFirst());
-
- Assert.assertNull(list.removeFirst());
-
}
public void testIterator()
{
- list.addLast(a, 9);
- list.addLast(b, 9);
- list.addLast(c, 8);
- list.addLast(d, 8);
- list.addLast(e, 7);
- list.addLast(f, 7);
- list.addLast(g, 7);
- list.addLast(h, 6);
- list.addLast(i, 6);
- list.addLast(j, 6);
- list.addLast(k, 5);
- list.addLast(l, 5);
- list.addLast(m, 4);
- list.addLast(n, 4);
- list.addLast(o, 4);
- list.addLast(p, 3);
- list.addLast(q, 3);
- list.addLast(r, 3);
- list.addLast(s, 2);
- list.addLast(t, 2);
- list.addLast(u, 2);
- list.addLast(v, 1);
- list.addLast(w, 1);
- list.addLast(x, 1);
- list.addLast(y, 0);
- list.addLast(z, 0);
+ list.addTail(a, 9);
+ list.addTail(b, 9);
+ list.addTail(c, 8);
+ list.addTail(d, 8);
+ list.addTail(e, 7);
+ list.addTail(f, 7);
+ list.addTail(g, 7);
+ list.addTail(h, 6);
+ list.addTail(i, 6);
+ list.addTail(j, 6);
+ list.addTail(k, 5);
+ list.addTail(l, 5);
+ list.addTail(m, 4);
+ list.addTail(n, 4);
+ list.addTail(o, 4);
+ list.addTail(p, 3);
+ list.addTail(q, 3);
+ list.addTail(r, 3);
+ list.addTail(s, 2);
+ list.addTail(t, 2);
+ list.addTail(u, 2);
+ list.addTail(v, 1);
+ list.addTail(w, 1);
+ list.addTail(x, 1);
+ list.addTail(y, 0);
+ list.addTail(z, 0);
- HQIterator<Wibble> iter = list.iterator();
+ LinkedListIterator<Wibble> iter = list.iterator();
- int c = 0;
+ int count = 0;
Wibble w;
- while ((w = iter.next()) != null)
+ while (iter.hasNext())
{
- c++;
+ w = iter.next();
+ count++;
}
- Assert.assertEquals(c, 26);
+ Assert.assertEquals(26, count);
Assert.assertEquals(26, list.size());
iter = list.iterator();
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
+ assertTrue(iter.hasNext());
Assert.assertEquals("a", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
+ assertTrue(iter.hasNext());
Assert.assertEquals("b", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("c", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("d", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("e", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("f", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("g", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("h", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("i", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("j", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("k", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("l", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("m", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("n", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("o", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("p", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("q", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("r", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("s", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("t", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("u", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("v", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("w", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("x", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("y", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("z", w.s);
- Assert.assertNull(iter.next());
-
+ assertFalse(iter.hasNext());
+
+
iter = list.iterator();
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("a", w.s);
@@ -541,10 +578,13 @@
Assert.assertEquals(25, list.size());
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("b", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("c", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("d", w.s);
@@ -552,126 +592,241 @@
Assert.assertEquals(24, list.size());
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
+ Assert.assertEquals("c", w.s);
+ assertTrue(iter.hasNext());
+ w = (Wibble)iter.next();
Assert.assertEquals("e", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("f", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("g", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("h", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("i", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("j", w.s);
iter.remove();
Assert.assertEquals(23, list.size());
-
+
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
+ Assert.assertEquals("i", w.s);
+ assertTrue(iter.hasNext());
+ w = (Wibble)iter.next();
Assert.assertEquals("k", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("l", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("m", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("n", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("o", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("p", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("q", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("r", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("s", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("t", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("u", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("v", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("w", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("x", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("y", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("z", w.s);
iter.remove();
- Assert.assertNull(iter.next());
+
iter = list.iterator();
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("b", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("c", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("e", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("f", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("g", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("h", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("i", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("k", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("l", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("m", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("n", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("o", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("p", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("q", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("r", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("s", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("t", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("u", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("v", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("w", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("x", w.s);
+ assertTrue(iter.hasNext());
w = (Wibble)iter.next();
Assert.assertEquals("y", w.s);
- Assert.assertNull(iter.next());
- //We test again - should still be null (there was a bug here)
- Assert.assertNull(iter.next());
+ assertFalse(iter.hasNext());
+ assertFalse(iter.hasNext());
+ //Test the elements added after iter created are seen
+
+ list.addTail(a, 4);
+ list.addTail(b, 4);
+
+ assertTrue(iter.hasNext());
+ w = (Wibble)iter.next();
+ Assert.assertEquals("a", w.s);
+
+ assertTrue(iter.hasNext());
+ w = (Wibble)iter.next();
+ Assert.assertEquals("b", w.s);
+
+ assertFalse(iter.hasNext());
+
+ list.addTail(c, 4);
+ list.addTail(d, 4);
+
+ assertTrue(iter.hasNext());
+ w = (Wibble)iter.next();
+ Assert.assertEquals("c", w.s);
+
+ assertTrue(iter.hasNext());
+ w = (Wibble)iter.next();
+ Assert.assertEquals("d", w.s);
+
+ assertFalse(iter.hasNext());
+
+
}
+
+ public void testIteratorPicksUpHigherPriorities()
+ {
+ list.addTail(a, 4);
+ list.addTail(b, 4);
+ list.addTail(c, 4);
+
+ LinkedListIterator<Wibble> iter = list.iterator();
+
+ assertTrue(iter.hasNext());
+ assertEquals(a, iter.next());
+
+ assertTrue(iter.hasNext());
+ assertEquals(b, iter.next());
+
+ list.addTail(d, 5);
+ list.addTail(e, 5);
+
+ assertTrue(iter.hasNext());
+ assertEquals(d, iter.next());
+
+ assertTrue(iter.hasNext());
+ assertEquals(e, iter.next());
+
+ assertTrue(iter.hasNext());
+ assertEquals(c, iter.next());
+
+ list.addTail(f, 1);
+ list.addTail(g, 9);
+
+ assertTrue(iter.hasNext());
+ assertEquals(g, iter.next());
+
+ assertTrue(iter.hasNext());
+ assertEquals(f, iter.next());
+ }
+
public void testClear()
{
- list.addLast(a, 0);
- list.addLast(b, 3);
- list.addLast(c, 3);
- list.addLast(d, 3);
- list.addLast(e, 6);
- list.addLast(f, 6);
- list.addLast(g, 6);
- list.addLast(h, 9);
- list.addLast(i, 9);
- list.addLast(j, 9);
+ list.addTail(a, 0);
+ list.addTail(b, 3);
+ list.addTail(c, 3);
+ list.addTail(d, 3);
+ list.addTail(e, 6);
+ list.addTail(f, 6);
+ list.addTail(g, 6);
+ list.addTail(h, 9);
+ list.addTail(i, 9);
+ list.addTail(j, 9);
list.clear();
- Assert.assertNull(list.removeFirst());
+ Assert.assertNull(list.poll());
}
class Wibble
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -964,6 +964,12 @@
class FakeBinding implements Binding
{
+ public void close() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
final SimpleString name;
FakeBinding(final SimpleString name)
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -29,6 +29,54 @@
public class FakeQueue implements Queue
{
+ public void close()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void forceCheckQueueSize()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void reload(MessageReference ref)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void blockOnExecutorFuture()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void addHead(MessageReference ref)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void addTail(MessageReference ref, boolean direct)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void addTail(MessageReference ref)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void resetAllIterators()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
private final SimpleString name;
public FakeQueue(final SimpleString name)
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2010-08-18 14:49:50 UTC (rev 9562)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -243,7 +243,7 @@
{
MessageReference ref = generateReference(queue, i);
- queue.addLast(ref);
+ queue.addTail(ref);
}
Assert.assertEquals(numMessages, queue.getMessageCount());
@@ -276,7 +276,7 @@
refs.add(ref);
- queue.addLast(ref);
+ queue.addTail(ref);
}
Assert.assertEquals(10, queue.getMessageCount());
@@ -330,7 +330,7 @@
refs.add(ref);
- queue.addLast(ref);
+ queue.addTail(ref);
}
Assert.assertEquals(10, queue.getMessageCount());
@@ -384,7 +384,7 @@
refs.add(ref);
- queue.addLast(ref);
+ queue.addTail(ref);
}
Assert.assertEquals(10, queue.getMessageCount());
@@ -404,7 +404,7 @@
refs.add(ref);
- queue.addLast(ref);
+ queue.addTail(ref);
}
Assert.assertEquals(20, queue.getMessageCount());
@@ -420,7 +420,7 @@
refs.add(ref);
- queue.addLast(ref);
+ queue.addTail(ref);
}
queue.deliverNow();
@@ -431,7 +431,7 @@
Assert.assertEquals(30, queue.getDeliveringCount());
}
- public void testAddFirstadd() throws Exception
+ public void testaddHeadadd() throws Exception
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
@@ -455,7 +455,7 @@
refs1.add(ref);
- queue.addLast(ref);
+ queue.addTail(ref);
}
LinkedList<MessageReference> refs2 = new LinkedList<MessageReference>();
@@ -466,7 +466,7 @@
refs2.addFirst(ref);
- queue.addFirst(ref);
+ queue.addHead(ref);
}
List<MessageReference> refs3 = new ArrayList<MessageReference>();
@@ -477,7 +477,7 @@
refs3.add(ref);
- queue.addLast(ref);
+ queue.addTail(ref);
}
FakeConsumer consumer = new FakeConsumer();
@@ -519,7 +519,7 @@
refs.add(ref);
- queue.addLast(ref);
+ queue.addTail(ref);
}
Assert.assertEquals(numMessages, queue.getMessageCount());
@@ -559,7 +559,7 @@
refs.add(ref);
- queue.addLast(ref);
+ queue.addTail(ref);
}
queue.deliverNow();
@@ -593,7 +593,7 @@
refs.add(ref);
- queue.addLast(ref);
+ queue.addTail(ref);
}
queue.deliverNow();
@@ -625,7 +625,7 @@
refs.add(ref);
- queue.addLast(ref);
+ queue.addTail(ref);
}
queue.deliverNow();
@@ -654,7 +654,7 @@
refs.add(ref);
- queue.addLast(ref);
+ queue.addTail(ref);
}
queue.deliverNow();
@@ -694,7 +694,7 @@
refs.add(ref);
- queue.addLast(ref);
+ queue.addTail(ref);
}
FakeConsumer cons1 = new FakeConsumer();
@@ -747,8 +747,10 @@
refs.add(ref);
- queue.addLast(ref);
+ queue.addTail(ref);
}
+
+ queue.deliverNow();
FakeConsumer consumer = new FakeConsumer();
@@ -820,7 +822,7 @@
{
MessageReference ref = generateReference(queue, i);
- queue.addLast(ref);
+ queue.addTail(ref);
refs.add(ref);
}
@@ -872,13 +874,13 @@
ref1.getMessage().putStringProperty(new SimpleString("fruit"), new SimpleString("banana"));
- queue.addLast(ref1);
+ queue.addTail(ref1);
MessageReference ref2 = generateReference(queue, 2);
ref2.getMessage().putStringProperty(new SimpleString("fruit"), new SimpleString("orange"));
- queue.addLast(ref2);
+ queue.addTail(ref2);
refs.add(ref2);
@@ -908,13 +910,13 @@
ref3.getMessage().putStringProperty(new SimpleString("fruit"), new SimpleString("banana"));
- queue.addLast(ref3);
+ queue.addTail(ref3);
MessageReference ref4 = generateReference(queue, 4);
ref4.getMessage().putStringProperty(new SimpleString("fruit"), new SimpleString("orange"));
- queue.addLast(ref4);
+ queue.addTail(ref4);
refs.add(ref4);
@@ -959,7 +961,7 @@
ref.getMessage().putStringProperty("color", "green");
refs.add(ref);
- queue.addLast(ref);
+ queue.addTail(ref);
}
Assert.assertEquals(10, queue.getMessageCount());
@@ -1010,7 +1012,7 @@
ref.getMessage().putStringProperty("color", "red");
refs.add(ref);
- queue.addLast(ref);
+ queue.addTail(ref);
}
Assert.assertEquals(10, queue.getMessageCount());
@@ -1030,7 +1032,7 @@
refs.add(ref);
ref.getMessage().putStringProperty("color", "green");
- queue.addLast(ref);
+ queue.addTail(ref);
}
Assert.assertEquals(20, queue.getMessageCount());
@@ -1046,7 +1048,7 @@
refs.add(ref);
- queue.addLast(ref);
+ queue.addTail(ref);
}
queue.deliverNow();
@@ -1087,7 +1089,7 @@
ref.getMessage().putStringProperty("color", "red");
refs.add(ref);
- queue.addLast(ref);
+ queue.addTail(ref);
}
Assert.assertEquals(10, queue.getMessageCount());
@@ -1106,7 +1108,7 @@
refs.add(ref);
ref.getMessage().putStringProperty("color", "green");
- queue.addLast(ref);
+ queue.addTail(ref);
}
FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'"));
@@ -1125,7 +1127,7 @@
refs.add(ref);
ref.getMessage().putStringProperty("color", "green");
- queue.addLast(ref);
+ queue.addTail(ref);
}
queue.deliverNow();
@@ -1167,19 +1169,19 @@
ref1.getMessage().putStringProperty(new SimpleString("fruit"), new SimpleString("banana"));
- queue.addLast(ref1);
+ queue.addTail(ref1);
MessageReference ref2 = generateReference(queue, 2);
ref2.getMessage().putStringProperty(new SimpleString("cheese"), new SimpleString("stilton"));
- queue.addLast(ref2);
+ queue.addTail(ref2);
MessageReference ref3 = generateReference(queue, 3);
ref3.getMessage().putStringProperty(new SimpleString("cake"), new SimpleString("sponge"));
- queue.addLast(ref3);
+ queue.addTail(ref3);
MessageReference ref4 = generateReference(queue, 4);
@@ -1187,13 +1189,13 @@
refs.add(ref4);
- queue.addLast(ref4);
+ queue.addTail(ref4);
MessageReference ref5 = generateReference(queue, 5);
ref5.getMessage().putStringProperty(new SimpleString("fruit"), new SimpleString("apple"));
- queue.addLast(ref5);
+ queue.addTail(ref5);
MessageReference ref6 = generateReference(queue, 6);
@@ -1201,7 +1203,7 @@
refs.add(ref6);
- queue.addLast(ref6);
+ queue.addTail(ref6);
if (!direct)
{
@@ -1255,9 +1257,9 @@
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
- queue.addFirst(messageReference);
- queue.addLast(messageReference2);
- queue.addFirst(messageReference3);
+ queue.addHead(messageReference);
+ queue.addTail(messageReference2);
+ queue.addHead(messageReference3);
Assert.assertEquals(0, consumer.getReferences().size());
queue.addConsumer(consumer);
@@ -1285,9 +1287,9 @@
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
- queue.addLast(messageReference);
- queue.addLast(messageReference2);
- queue.addLast(messageReference3);
+ queue.addTail(messageReference);
+ queue.addTail(messageReference2);
+ queue.addTail(messageReference3);
Assert.assertEquals(queue.getMessagesAdded(), 3);
}
@@ -1307,9 +1309,9 @@
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
- queue.addFirst(messageReference);
- queue.addFirst(messageReference2);
- queue.addFirst(messageReference3);
+ queue.addHead(messageReference);
+ queue.addHead(messageReference2);
+ queue.addHead(messageReference3);
Assert.assertEquals(queue.getReference(2), messageReference2);
}
@@ -1330,9 +1332,9 @@
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
- queue.addFirst(messageReference);
- queue.addFirst(messageReference2);
- queue.addFirst(messageReference3);
+ queue.addHead(messageReference);
+ queue.addHead(messageReference2);
+ queue.addHead(messageReference3);
Assert.assertNull(queue.getReference(5));
}
@@ -1368,7 +1370,7 @@
refs.add(ref);
- queue.addLast(ref);
+ queue.addTail(ref);
}
// even as this queue is paused, it will receive the messages anyway
Assert.assertEquals(10, queue.getMessageCount());
@@ -1438,7 +1440,7 @@
{
MessageReference ref = generateReference(queue, i);
refs.add(ref);
- queue.addLast(ref);
+ queue.addTail(ref);
}
// the queue even if it's paused will receive the message but won't forward
@@ -1488,11 +1490,11 @@
{
if (first)
{
- queue.addFirst(messageReference);
+ queue.addHead(messageReference);
}
else
{
- queue.addLast(messageReference);
+ queue.addTail(messageReference);
}
added = true;
countDownLatch.countDown();
Added: trunk/tests/src/org/hornetq/tests/unit/util/LinkedListTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/util/LinkedListTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/unit/util/LinkedListTest.java 2010-08-18 17:03:33 UTC (rev 9563)
@@ -0,0 +1,1124 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.util;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import junit.framework.TestCase;
+
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.LinkedListImpl;
+import org.hornetq.utils.LinkedListIterator;
+
+/**
+ * A LinkedListTest
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class LinkedListTest extends TestCase
+{
+ private static final Logger log = Logger.getLogger(LinkedListTest.class);
+
+ private LinkedListImpl<Integer> list;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ list = new LinkedListImpl<Integer>();
+ }
+
+ public void testAddTail()
+ {
+ int num = 10;
+
+ assertEquals(0, list.size());
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addTail(i);
+
+ assertEquals(i + 1, list.size());
+ }
+
+ for (int i = 0; i < num; i++)
+ {
+ assertEquals(i, list.poll().intValue());
+
+ assertEquals(num - i - 1, list.size());
+ }
+ }
+
+ public void testAddHead()
+ {
+ int num = 10;
+
+ assertEquals(0, list.size());
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addHead(i);
+
+ assertEquals(i + 1, list.size());
+ }
+
+ for (int i = num - 1; i >= 0; i--)
+ {
+ assertEquals(i, list.poll().intValue());
+
+ assertEquals(i, list.size());
+ }
+ }
+
+ public void testAddHeadAndTail()
+ {
+ int num = 10;
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addHead(i);
+ }
+
+ for (int i = num; i < num * 2; i++)
+ {
+ list.addTail(i);
+ }
+
+ for (int i = num * 2; i < num * 3; i++)
+ {
+ list.addHead(i);
+ }
+
+ for (int i = num * 3; i < num * 4; i++)
+ {
+ list.addTail(i);
+ }
+
+ for (int i = num * 3 - 1; i >= num * 2; i--)
+ {
+ assertEquals(i, list.poll().intValue());
+ }
+
+ for (int i = num - 1; i >= 0; i--)
+ {
+ assertEquals(i, list.poll().intValue());
+ }
+
+ for (int i = num; i < num * 2; i++)
+ {
+ assertEquals(i, list.poll().intValue());
+ }
+
+ for (int i = num * 3; i < num * 4; i++)
+ {
+ assertEquals(i, list.poll().intValue());
+ }
+
+ }
+
+ public void testPoll()
+ {
+ int num = 10;
+
+ assertNull(list.poll());
+ assertNull(list.poll());
+ assertNull(list.poll());
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addTail(i);
+ }
+
+ for (int i = 0; i < num; i++)
+ {
+ assertEquals(i, list.poll().intValue());
+ }
+
+ assertNull(list.poll());
+ assertNull(list.poll());
+ assertNull(list.poll());
+
+ for (int i = num; i < num * 2; i++)
+ {
+ list.addHead(i);
+ }
+
+ for (int i = num * 2 - 1; i >= num; i--)
+ {
+ assertEquals(i, list.poll().intValue());
+ }
+
+ assertNull(list.poll());
+ assertNull(list.poll());
+ assertNull(list.poll());
+
+ }
+
+ public void testIterateNoElements()
+ {
+ LinkedListIterator<Integer> iter = list.iterator();
+
+ assertNotNull(iter);
+
+ try
+ {
+ iter.next();
+
+ fail("Should throw NoSuchElementException");
+ }
+ catch (NoSuchElementException e)
+ {
+ // OK
+ }
+
+ try
+ {
+ iter.remove();
+
+ fail("Should throw NoSuchElementException");
+ }
+ catch (NoSuchElementException e)
+ {
+ // OK
+ }
+ }
+
+ public void testCreateIteratorBeforeAddElements()
+ {
+ int num = 10;
+
+ LinkedListIterator<Integer> iter = list.iterator();
+
+ assertNotNull(iter);
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addTail(i);
+ }
+
+ testIterate1(num, iter);
+ }
+
+ public void testCreateIteratorAfterAddElements()
+ {
+ int num = 10;
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addTail(i);
+ }
+
+ LinkedListIterator<Integer> iter = list.iterator();
+
+ assertNotNull(iter);
+
+ testIterate1(num, iter);
+ }
+
+ public void testIterateThenAddMoreAndIterateAgain()
+ {
+ int num = 10;
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addTail(i);
+ }
+
+ LinkedListIterator<Integer> iter = list.iterator();
+
+ assertNotNull(iter);
+
+ for (int i = 0; i < num; i++)
+ {
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ }
+ assertFalse(iter.hasNext());
+
+ try
+ {
+ iter.next();
+
+ fail("Should throw NoSuchElementException");
+ }
+ catch (NoSuchElementException e)
+ {
+ // OK
+ }
+
+ // Add more
+
+ for (int i = num; i < num * 2; i++)
+ {
+ list.addTail(i);
+ }
+
+ for (int i = num; i < num * 2; i++)
+ {
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ }
+ assertFalse(iter.hasNext());
+
+ try
+ {
+ iter.next();
+
+ fail("Should throw NoSuchElementException");
+ }
+ catch (NoSuchElementException e)
+ {
+ // OK
+ }
+
+ // Add some more at head
+
+ for (int i = num * 2; i < num * 3; i++)
+ {
+ list.addHead(i);
+ }
+
+ iter = list.iterator();
+
+ for (int i = num * 3 - 1; i >= num * 2; i--)
+ {
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ }
+ for (int i = 0; i < num * 2; i++)
+ {
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ }
+ assertFalse(iter.hasNext());
+ }
+
+ private void testIterate1(int num, LinkedListIterator<Integer> iter)
+ {
+ for (int i = 0; i < num; i++)
+ {
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ }
+ assertFalse(iter.hasNext());
+
+ try
+ {
+ iter.next();
+
+ fail("Should throw NoSuchElementException");
+ }
+ catch (NoSuchElementException e)
+ {
+ // OK
+ }
+ }
+
+ public void testRemoveAll()
+ {
+ int num = 10;
+
+ LinkedListIterator<Integer> iter = list.iterator();
+
+ try
+ {
+ iter.remove();
+
+ fail("Should throw NoSuchElementException");
+ }
+ catch (NoSuchElementException e)
+ {
+ // OK
+ }
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addTail(i);
+ }
+
+ assertEquals(num, list.size());
+
+ try
+ {
+ iter.remove();
+
+ fail("Should throw NoSuchElementException");
+ }
+ catch (NoSuchElementException e)
+ {
+ // OK
+ }
+
+ for (int i = 0; i < num; i++)
+ {
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ iter.remove();
+ assertEquals(num - i - 1, list.size());
+ }
+
+ assertFalse(iter.hasNext());
+ }
+
+ public void testRemoveOdd()
+ {
+ int num = 10;
+
+ LinkedListIterator<Integer> iter = list.iterator();
+
+ try
+ {
+ iter.remove();
+
+ fail("Should throw NoSuchElementException");
+ }
+ catch (NoSuchElementException e)
+ {
+ // OK
+ }
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addTail(i);
+ }
+
+ try
+ {
+ iter.remove();
+
+ fail("Should throw NoSuchElementException");
+ }
+ catch (NoSuchElementException e)
+ {
+ // OK
+ }
+
+ int size = num;
+ for (int i = 0; i < num; i++)
+ {
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ if (i % 2 == 0)
+ {
+ iter.remove();
+ size--;
+ }
+ assertEquals(list.size(), size);
+ }
+
+ iter = list.iterator();
+ for (int i = 0; i < num; i++)
+ {
+ if (i % 2 == 1)
+ {
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ }
+ }
+
+ assertFalse(iter.hasNext());
+ }
+
+ public void testRemoveHead1()
+ {
+ int num = 10;
+
+ LinkedListIterator<Integer> iter = list.iterator();
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addTail(i);
+ }
+
+ iter.next();
+ iter.remove();
+
+ for (int i = 1; i < num; i++)
+ {
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ }
+
+ assertFalse(iter.hasNext());
+ }
+
+ public void testRemoveHead2()
+ {
+ int num = 10;
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addTail(i);
+ }
+
+ LinkedListIterator<Integer> iter = list.iterator();
+
+ iter.next();
+ iter.remove();
+
+ iter = list.iterator();
+
+ for (int i = 1; i < num; i++)
+ {
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ }
+
+ assertFalse(iter.hasNext());
+ }
+
+ public void testRemoveHead3()
+ {
+ int num = 10;
+
+ LinkedListIterator<Integer> iter = list.iterator();
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addTail(i);
+ }
+
+ for (int i = 0; i < num; i++)
+ {
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ iter.remove();
+ }
+
+ for (int i = num; i < num * 2; i++)
+ {
+ list.addTail(i);
+ }
+
+ for (int i = num; i < num * 2; i++)
+ {
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ iter.remove();
+ }
+
+ }
+
+ public void testRemoveTail1()
+ {
+ int num = 10;
+
+ LinkedListIterator<Integer> iter = list.iterator();
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addTail(i);
+ }
+
+ for (int i = 0; i < num; i++)
+ {
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ }
+
+ assertFalse(iter.hasNext());
+
+ // Remove the last one, that's element 9
+ iter.remove();
+
+ iter = list.iterator();
+
+ for (int i = 0; i < num - 1; i++)
+ {
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ }
+
+ assertFalse(iter.hasNext());
+ }
+
+ public void testRemoveMiddle()
+ {
+ int num = 10;
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addTail(i);
+ }
+
+ LinkedListIterator<Integer> iter = list.iterator();
+
+ for (int i = 0; i < num / 2; i++)
+ {
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ }
+
+ // Remove the 4th element
+ iter.remove();
+
+ iter = list.iterator();
+
+ for (int i = 0; i < num; i++)
+ {
+ if (i != num / 2 - 1)
+ {
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ }
+ }
+
+ assertFalse(iter.hasNext());
+ }
+
+ public void testRemoveTail2()
+ {
+ int num = 10;
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addTail(i);
+ }
+
+ LinkedListIterator<Integer> iter = list.iterator();
+
+ for (int i = 0; i < num; i++)
+ {
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ }
+
+ assertFalse(iter.hasNext());
+
+ // Remove the last one, that's element 9
+ iter.remove();
+
+ try
+ {
+ iter.remove();
+ fail("Should throw exception");
+ }
+ catch (NoSuchElementException e)
+ {
+ }
+
+ iter = list.iterator();
+
+ for (int i = 0; i < num - 1; i++)
+ {
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ }
+
+ assertFalse(iter.hasNext());
+ }
+
+ public void testRemoveTail3()
+ {
+ int num = 10;
+
+ LinkedListIterator<Integer> iter = list.iterator();
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addTail(i);
+ }
+
+ for (int i = 0; i < num; i++)
+ {
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ }
+
+ assertFalse(iter.hasNext());
+
+ // This should remove the 9th element and move the iterator back to position 8
+ iter.remove();
+
+ for (int i = num; i < num * 2; i++)
+ {
+ list.addTail(i);
+ }
+
+ assertTrue(iter.hasNext());
+ assertEquals(8, iter.next().intValue());
+
+ for (int i = num; i < num * 2; i++)
+ {
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ }
+ }
+
+ public void testRemoveHeadAndTail1()
+ {
+ LinkedListIterator<Integer> iter = list.iterator();
+
+ int num = 10;
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addTail(i);
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ iter.remove();
+ }
+
+ }
+
+ public void testRemoveHeadAndTail2()
+ {
+ LinkedListIterator<Integer> iter = list.iterator();
+
+ int num = 10;
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addHead(i);
+ assertEquals(1, list.size());
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ iter.remove();
+ }
+
+ }
+
+ public void testRemoveHeadAndTail3()
+ {
+ LinkedListIterator<Integer> iter = list.iterator();
+
+ int num = 10;
+
+ for (int i = 0; i < num; i++)
+ {
+ if (i % 2 == 0)
+ {
+ list.addHead(i);
+ }
+ else
+ {
+ list.addTail(i);
+ }
+ assertEquals(1, list.size());
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ iter.remove();
+ }
+
+ }
+
+ public void testRemoveInTurn()
+ {
+ LinkedListIterator<Integer> iter = list.iterator();
+
+ int num = 10;
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addTail(i);
+ }
+
+ for (int i = 0; i < num; i++)
+ {
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ iter.remove();
+ }
+
+ assertFalse(iter.hasNext());
+ assertEquals(0, list.size());
+
+ }
+
+ public void testClear()
+ {
+
+ int num = 10;
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addTail(i);
+ }
+
+ assertEquals(num, list.size());
+
+ list.clear();
+
+ assertEquals(0, list.size());
+
+ assertNull(list.poll());
+
+ LinkedListIterator<Integer> iter = list.iterator();
+
+ assertFalse(iter.hasNext());
+
+ try
+ {
+ iter.next();
+ }
+ catch (NoSuchElementException e)
+ {
+ }
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addTail(i);
+ }
+
+ assertEquals(num, list.size());
+
+ iter = list.iterator();
+
+ for (int i = 0; i < num; i++)
+ {
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ }
+ assertFalse(iter.hasNext());
+
+ for (int i = 0; i < num; i++)
+ {
+ assertEquals(i, list.poll().intValue());
+ }
+ assertNull(list.poll());
+ assertEquals(0, list.size());
+
+ }
+
+ public void testMultipleIterators1()
+ {
+ int num = 10;
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addTail(i);
+ }
+
+ LinkedListIterator<Integer> iter1 = list.iterator();
+ LinkedListIterator<Integer> iter2 = list.iterator();
+ LinkedListIterator<Integer> iter3 = list.iterator();
+
+ for (int i = 0; i < num;)
+ {
+ assertTrue(iter1.hasNext());
+ assertEquals(i++, iter1.next().intValue());
+ iter1.remove();
+
+ if (i == 10)
+ {
+ break;
+ }
+
+ assertTrue(iter2.hasNext());
+ assertEquals(i++, iter2.next().intValue());
+ iter2.remove();
+
+ assertTrue(iter3.hasNext());
+ assertEquals(i++, iter3.next().intValue());
+ iter3.remove();
+ }
+ }
+
+ public void testRepeat()
+ {
+ int num = 10;
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addTail(i);
+ }
+
+ LinkedListIterator<Integer> iter = list.iterator();
+
+ assertTrue(iter.hasNext());
+ assertEquals(0, iter.next().intValue());
+
+ iter.repeat();
+ assertTrue(iter.hasNext());
+ assertEquals(0, iter.next().intValue());
+
+ iter.next();
+ iter.next();
+ iter.next();
+ iter.hasNext();
+ assertEquals(4, iter.next().intValue());
+
+ iter.repeat();
+ assertTrue(iter.hasNext());
+ assertEquals(4, iter.next().intValue());
+
+ iter.next();
+ iter.next();
+ iter.next();
+ iter.next();
+ assertEquals(9, iter.next().intValue());
+ assertFalse(iter.hasNext());
+
+ iter.repeat();
+ assertTrue(iter.hasNext());
+ assertEquals(9, iter.next().intValue());
+ assertFalse(iter.hasNext());
+ }
+
+ public void testRepeatAndRemove()
+ {
+ int num = 10;
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addTail(i);
+ }
+
+ LinkedListIterator<Integer> iter1 = list.iterator();
+
+ LinkedListIterator<Integer> iter2 = list.iterator();
+
+ assertTrue(iter1.hasNext());
+ assertEquals(0, iter1.next().intValue());
+
+ assertTrue(iter2.hasNext());
+ assertEquals(0, iter2.next().intValue());
+
+ iter2.remove();
+
+ iter1.repeat();
+
+ // Should move to the next one
+ assertTrue(iter1.hasNext());
+ assertEquals(1, iter1.next().intValue());
+
+ iter1.next();
+ iter1.next();
+ iter1.next();
+ iter1.next();
+ iter1.next();
+ iter1.next();
+ iter1.next();
+ iter1.next();
+ assertEquals(9, iter1.next().intValue());
+
+ iter2.next();
+ iter2.next();
+ iter2.next();
+ iter2.next();
+ iter2.next();
+ iter2.next();
+ iter2.next();
+ iter2.next();
+ assertEquals(9, iter2.next().intValue());
+
+ iter1.remove();
+
+ iter2.repeat();
+
+ // Go back one since can't go forward
+ assertEquals(8, iter2.next().intValue());
+
+ }
+
+ public void testMultipleIterators2()
+ {
+ int num = 10;
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addTail(i);
+ }
+
+ LinkedListIterator<Integer> iter1 = list.iterator();
+ LinkedListIterator<Integer> iter2 = list.iterator();
+ LinkedListIterator<Integer> iter3 = list.iterator();
+ LinkedListIterator<Integer> iter4 = list.iterator();
+ LinkedListIterator<Integer> iter5 = list.iterator();
+
+ assertTrue(iter1.hasNext());
+ assertTrue(iter2.hasNext());
+ assertTrue(iter3.hasNext());
+ assertTrue(iter4.hasNext());
+ assertTrue(iter5.hasNext());
+
+ assertEquals(0, iter2.next().intValue());
+ assertTrue(iter2.hasNext());
+ assertEquals(1, iter2.next().intValue());
+
+ assertEquals(0, iter1.next().intValue());
+ iter1.remove();
+
+ assertTrue(iter1.hasNext());
+ assertEquals(1, iter1.next().intValue());
+
+ // The others should get nudged onto the next value up
+ assertEquals(1, iter3.next().intValue());
+ assertEquals(1, iter4.next().intValue());
+ assertEquals(1, iter5.next().intValue());
+
+ assertTrue(iter4.hasNext());
+ assertEquals(2, iter4.next().intValue());
+ assertEquals(3, iter4.next().intValue());
+ assertEquals(4, iter4.next().intValue());
+ assertEquals(5, iter4.next().intValue());
+ assertEquals(6, iter4.next().intValue());
+ assertEquals(7, iter4.next().intValue());
+ assertEquals(8, iter4.next().intValue());
+ assertEquals(9, iter4.next().intValue());
+ assertFalse(iter4.hasNext());
+
+ assertTrue(iter5.hasNext());
+ assertEquals(2, iter5.next().intValue());
+ assertEquals(3, iter5.next().intValue());
+ assertEquals(4, iter5.next().intValue());
+ assertEquals(5, iter5.next().intValue());
+ assertEquals(6, iter5.next().intValue());
+
+ assertTrue(iter3.hasNext());
+ assertEquals(2, iter3.next().intValue());
+ assertEquals(3, iter3.next().intValue());
+ assertEquals(4, iter3.next().intValue());
+
+ assertTrue(iter2.hasNext());
+ assertEquals(2, iter2.next().intValue());
+ assertEquals(3, iter2.next().intValue());
+ assertEquals(4, iter2.next().intValue());
+
+ assertTrue(iter1.hasNext());
+ assertEquals(2, iter1.next().intValue());
+ assertEquals(3, iter1.next().intValue());
+ assertEquals(4, iter1.next().intValue());
+
+ // 1, 2, 3 are on element 4
+
+ iter2.remove();
+ assertEquals(5, iter2.next().intValue());
+ iter2.remove();
+
+ // Should be nudged to element 6
+
+ assertTrue(iter1.hasNext());
+ assertEquals(6, iter1.next().intValue());
+ assertTrue(iter2.hasNext());
+ assertEquals(6, iter2.next().intValue());
+ assertTrue(iter3.hasNext());
+ assertEquals(6, iter3.next().intValue());
+
+ iter5.remove();
+ assertTrue(iter5.hasNext());
+ assertEquals(7, iter5.next().intValue());
+
+ // Should be nudged to 7
+
+ assertTrue(iter1.hasNext());
+ assertEquals(7, iter1.next().intValue());
+ assertTrue(iter2.hasNext());
+ assertEquals(7, iter2.next().intValue());
+ assertTrue(iter3.hasNext());
+ assertEquals(7, iter3.next().intValue());
+
+ // Delete last element
+
+ assertTrue(iter5.hasNext());
+ assertEquals(8, iter5.next().intValue());
+ assertTrue(iter5.hasNext());
+ assertEquals(9, iter5.next().intValue());
+ assertFalse(iter5.hasNext());
+
+ iter5.remove();
+
+ // iter4 should be nudged back to 8, now remove element 8
+ iter4.remove();
+
+ // add a new element on tail
+
+ list.addTail(10);
+
+ // should be nudged back to 7
+
+ assertTrue(iter5.hasNext());
+ assertEquals(7, iter5.next().intValue());
+ assertTrue(iter5.hasNext());
+ assertEquals(10, iter5.next().intValue());
+
+ assertTrue(iter4.hasNext());
+ assertEquals(7, iter4.next().intValue());
+ assertTrue(iter4.hasNext());
+ assertEquals(10, iter4.next().intValue());
+
+ assertTrue(iter3.hasNext());
+ assertEquals(10, iter3.next().intValue());
+
+ assertTrue(iter2.hasNext());
+ assertEquals(10, iter2.next().intValue());
+
+ assertTrue(iter1.hasNext());
+ assertEquals(10, iter1.next().intValue());
+
+ }
+
+ public void testResizing()
+ {
+ int numIters = 1000;
+
+ List<LinkedListIterator<Integer>> iters = new java.util.LinkedList<LinkedListIterator<Integer>>();
+
+ int num = 10;
+
+ for (int i = 0; i < num; i++)
+ {
+ list.addTail(i);
+ }
+
+ for (int i = 0; i < numIters; i++)
+ {
+ LinkedListIterator<Integer> iter = list.iterator();
+
+ iters.add(iter);
+
+ for (int j = 0; j < num / 2; j++)
+ {
+ assertTrue(iter.hasNext());
+
+ assertEquals(j, iter.next().intValue());
+ }
+ }
+
+ assertEquals(numIters, list.numIters());
+
+ // Close the odd ones
+
+ boolean b = false;
+ for (LinkedListIterator<Integer> iter: iters)
+ {
+ if (b)
+ {
+ iter.close();
+ }
+ b = !b;
+ }
+
+ assertEquals(numIters / 2, list.numIters());
+
+ // close the even ones
+
+ b = true;
+ for (LinkedListIterator<Integer> iter: iters)
+ {
+ if (b)
+ {
+ iter.close();
+ }
+ b = !b;
+ }
+
+ assertEquals(0, list.numIters());
+
+ }
+}
15 years, 9 months
JBoss hornetq SVN: r9562 - in branches/2_2_0_HA_Improvements/src/main/org/hornetq/core: server/cluster/impl and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-08-18 10:49:50 -0400 (Wed, 18 Aug 2010)
New Revision: 9562
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/DelegatingSession.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
Log:
tidied up
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-18 14:03:20 UTC (rev 9561)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-18 14:49:50 UTC (rev 9562)
@@ -100,11 +100,11 @@
private final Object exitLock = new Object();
- private final Object createSessionLock = new CreateSessionLock();
+ private final Object createSessionLock = new Object();
private boolean inCreateSession;
- private final Object failoverLock = new FailoverLock();
+ private final Object failoverLock = new Object();
private final ExecutorFactory orderedExecutorFactory;
@@ -1283,14 +1283,4 @@
cancelled = true;
}
}
-
- class CreateSessionLock
- {
-
- }
-
- class FailoverLock
- {
-
- }
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-08-18 14:03:20 UTC (rev 9561)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-08-18 14:49:50 UTC (rev 9562)
@@ -841,11 +841,6 @@
doCleanup(false);
}
- public synchronized void cleanUp() throws Exception
- {
- cleanUp(false);
- }
-
public synchronized void cleanUp(boolean failingOver) throws Exception
{
if (closed)
@@ -1489,7 +1484,7 @@
{
try
{
- cleanUp();
+ cleanUp(false);
}
catch (Exception e)
{
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-08-18 14:03:20 UTC (rev 9561)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-08-18 14:49:50 UTC (rev 9562)
@@ -60,8 +60,6 @@
RemotingConnection getConnection();
- void cleanUp() throws Exception;
-
void cleanUp(boolean failingOver) throws Exception;
void returnBlocking();
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-08-18 14:03:20 UTC (rev 9561)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-08-18 14:49:50 UTC (rev 9562)
@@ -134,10 +134,6 @@
session.forceDelivery(consumerID, sequence);
}
- public void cleanUp() throws Exception
- {
- session.cleanUp();
- }
public void cleanUp(boolean failingOver) throws Exception
{
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2010-08-18 14:03:20 UTC (rev 9561)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2010-08-18 14:49:50 UTC (rev 9562)
@@ -234,7 +234,7 @@
{
try
{
- session.cleanUp();
+ session.cleanUp(false);
}
catch (Exception e)
{
15 years, 9 months
JBoss hornetq SVN: r9561 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-08-18 10:03:20 -0400 (Wed, 18 Aug 2010)
New Revision: 9561
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/DelegatingSession.java
Log:
fixed deadlock during failover
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-18 05:42:32 UTC (rev 9560)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-18 14:03:20 UTC (rev 9561)
@@ -100,11 +100,11 @@
private final Object exitLock = new Object();
- private final Object createSessionLock = new Object();
+ private final Object createSessionLock = new CreateSessionLock();
private boolean inCreateSession;
- private final Object failoverLock = new Object();
+ private final Object failoverLock = new FailoverLock();
private final ExecutorFactory orderedExecutorFactory;
@@ -349,15 +349,22 @@
// Must be synchronized to prevent it happening concurrently with failover which can lead to
// inconsistencies
- public void removeSession(final ClientSessionInternal session)
+ public void removeSession(final ClientSessionInternal session, boolean failingOver)
{
- synchronized (createSessionLock)
+ if (!failingOver)
{
- synchronized (failoverLock)
+ synchronized (createSessionLock)
{
- sessions.remove(session);
+ synchronized (failoverLock)
+ {
+ sessions.remove(session);
+ }
}
}
+ else
+ {
+ sessions.remove(session);
+ }
}
public synchronized int numConnections()
@@ -610,7 +617,7 @@
{
try
{
- session.cleanUp();
+ session.cleanUp(true);
}
catch (Exception e)
{
@@ -1276,4 +1283,14 @@
cancelled = true;
}
}
+
+ class CreateSessionLock
+ {
+
+ }
+
+ class FailoverLock
+ {
+
+ }
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java 2010-08-18 05:42:32 UTC (rev 9560)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java 2010-08-18 14:03:20 UTC (rev 9561)
@@ -35,6 +35,6 @@
int numSessions();
- void removeSession(final ClientSessionInternal session);
+ void removeSession(final ClientSessionInternal session, boolean failingOver);
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-08-18 05:42:32 UTC (rev 9560)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-08-18 14:03:20 UTC (rev 9561)
@@ -838,11 +838,16 @@
ClientSessionImpl.log.trace("Failed to close session", e);
}
- doCleanup();
+ doCleanup(false);
}
public synchronized void cleanUp() throws Exception
{
+ cleanUp(false);
+ }
+
+ public synchronized void cleanUp(boolean failingOver) throws Exception
+ {
if (closed)
{
return;
@@ -852,7 +857,7 @@
cleanUpChildren();
- doCleanup();
+ doCleanup(failingOver);
}
public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler)
@@ -1661,7 +1666,7 @@
}
}
- private void doCleanup()
+ private void doCleanup(boolean failingOver)
{
remotingConnection.removeFailureListener(this);
@@ -1672,7 +1677,7 @@
channel.close();
}
- sessionFactory.removeSession(this);
+ sessionFactory.removeSession(this, failingOver);
}
private void cleanUpChildren() throws Exception
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-08-18 05:42:32 UTC (rev 9560)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-08-18 14:03:20 UTC (rev 9561)
@@ -62,6 +62,8 @@
void cleanUp() throws Exception;
+ void cleanUp(boolean failingOver) throws Exception;
+
void returnBlocking();
void setForceNotSameRM(boolean force);
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-08-18 05:42:32 UTC (rev 9560)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-08-18 14:03:20 UTC (rev 9561)
@@ -139,6 +139,11 @@
session.cleanUp();
}
+ public void cleanUp(boolean failingOver) throws Exception
+ {
+ session.cleanUp(failingOver);
+ }
+
public void close() throws HornetQException
{
closed = true;
15 years, 9 months