JBoss hornetq SVN: r9700 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-09-20 10:15:39 -0400 (Mon, 20 Sep 2010)
New Revision: 9700
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
Log:
simplify Topology.nodes()
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java 2010-09-16 15:39:03 UTC (rev 9699)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java 2010-09-20 14:15:39 UTC (rev 9700)
@@ -39,8 +39,6 @@
*/
private Map<String, TopologyMember> topology = new HashMap<String, TopologyMember>();
- int nodes = 0;
-
public synchronized boolean addMember(String nodeId, TopologyMember member)
{
boolean replaced = false;
@@ -49,32 +47,17 @@
{
topology.put(nodeId, member);
replaced = true;
- if(member.getConnector().a != null)
- {
- nodes++;
- }
- if(member.getConnector().b != null)
- {
- nodes++;
- }
}
else
{
+ System.out.println("current=" + currentMember + ", new=" + member);
if(hasChanged(currentMember.getConnector().a, member.getConnector().a) && member.getConnector().a != null)
{
- if(currentMember.getConnector().a == null)
- {
- nodes++;
- }
currentMember.getConnector().a = member.getConnector().a;
replaced = true;
}
if(hasChanged(currentMember.getConnector().b, member.getConnector().b) && member.getConnector().b != null)
{
- if(currentMember.getConnector().b == null)
- {
- nodes++;
- }
currentMember.getConnector().b = member.getConnector().b;
replaced = true;
}
@@ -85,18 +68,6 @@
public synchronized boolean removeMember(String nodeId)
{
TopologyMember member = topology.remove(nodeId);
- if(member != null)
- {
- if(member.getConnector().a != null)
- {
- nodes--;
- }
- if(member.getConnector().b != null)
- {
- nodes--;
- }
- }
-
return (member != null);
}
@@ -126,7 +97,19 @@
public int nodes()
{
- return nodes;
+ int count = 0;
+ for (TopologyMember member : topology.values())
+ {
+ if (member.getConnector().a != null)
+ {
+ count++;
+ }
+ if (member.getConnector().b != null)
+ {
+ count++;
+ }
+ }
+ return count;
}
public String describe()
@@ -137,14 +120,13 @@
{
desc += "\t" + entry.getKey() + " => " + entry.getValue() + "\n";
}
- desc += "\t" + "nodes=" + nodes + "\t" + "members=" + members();
+ desc += "\t" + "nodes=" + nodes() + "\t" + "members=" + members();
return desc;
}
public void clear()
{
topology.clear();
- nodes = 0;
}
public int members()
14 years, 5 months
JBoss hornetq SVN: r9699 - in trunk: src/main/org/hornetq/core/server/impl and 2 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-09-16 11:39:03 -0400 (Thu, 16 Sep 2010)
New Revision: 9699
Added:
trunk/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java
Modified:
trunk/src/main/org/hornetq/core/server/Queue.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
Log:
https://jira.jboss.org/browse/HORNETQ-469
Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java 2010-09-16 15:34:41 UTC (rev 9698)
+++ trunk/src/main/org/hornetq/core/server/Queue.java 2010-09-16 15:39:03 UTC (rev 9699)
@@ -155,4 +155,6 @@
void blockOnExecutorFuture();
void close() throws Exception;
+
+ boolean isDirectDeliver();
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-09-16 15:34:41 UTC (rev 9698)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-09-16 15:39:03 UTC (rev 9699)
@@ -75,9 +75,9 @@
public static final int NUM_PRIORITIES = 10;
public static final int MAX_DELIVERIES_IN_LOOP = 1000;
-
- private static final int CHECK_QUEUE_SIZE_PERIOD = 2000;
+ public static final int CHECK_QUEUE_SIZE_PERIOD = 100;
+
private final long id;
private final SimpleString name;
@@ -138,10 +138,10 @@
private final Runnable concurrentPoller = new ConcurrentPoller();
- private volatile boolean queued;
-
- private volatile boolean checkQueueSize = true;
+ private volatile boolean checkDirect;
+ private volatile boolean directDeliver = true;
+
public QueueImpl(final long id,
final SimpleString address,
final SimpleString name,
@@ -191,11 +191,15 @@
{
public void run()
{
- checkQueueSize = true;
+ // This flag is periodically set to true. This enables the directDeliver flag to be set to true if the queue
+ // is empty
+ // We don't want to evaluate that on every delivery since that's too expensive
+
+ checkDirect = true;
}
}, CHECK_QUEUE_SIZE_PERIOD, CHECK_QUEUE_SIZE_PERIOD, TimeUnit.MILLISECONDS);
}
-
+
// Bindable implementation -------------------------------------------------------------------------------------
public SimpleString getRoutingName()
@@ -252,12 +256,10 @@
{
return;
}
-
+
messageReferences.addHead(ref, ref.getMessage().getPriority());
-
- queued = true;
-
- checkQueueSize = false;
+
+ directDeliver = false;
}
public synchronized void reload(final MessageReference ref)
@@ -267,9 +269,7 @@
messageReferences.addTail(ref, ref.getMessage().getPriority());
}
- queued = true;
-
- checkQueueSize = false;
+ directDeliver = false;
messagesAdded++;
}
@@ -291,41 +291,47 @@
return;
}
- if (checkQueueSize)
+ // The checkDirect flag is periodically set to true, if the delivery is specified as direct then this causes the
+ // directDeliver flag to be re-computed resulting in direct delivery if the queue is empty
+ // We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
+ if (checkDirect)
{
- // This is an expensive operation so we don't want to do it every time we add a message, that's why we use the checkQueueSize flag
- // which is set to true periodically using a scheduled executor
+ if (direct && !directDeliver && concurrentQueue.isEmpty() && messageReferences.isEmpty())
+ {
+ // We must block on the executor to ensure any async deliveries have completed or we might get out of order
+ // deliveries
+ blockOnExecutorFuture();
- queued = !messageReferences.isEmpty() || !concurrentQueue.isEmpty();
-
- checkQueueSize = false;
+ // Go into direct delivery mode
+ directDeliver = true;
+ }
+ checkDirect = false;
}
- if (direct & !queued)
+ if (direct && directDeliver && deliverDirect(ref))
{
- if (deliverDirect(ref))
- {
- return;
- }
+ return;
}
concurrentQueue.add(ref);
+ directDeliver = false;
+
executor.execute(concurrentPoller);
}
-
+
public void deliverAsync()
{
executor.execute(deliverRunner);
}
-
+
public void close() throws Exception
{
if (checkQueueSizeFuture != null)
{
checkQueueSizeFuture.cancel(false);
}
-
+
cancelRedistributor();
}
@@ -486,7 +492,7 @@
redistributorFuture = null;
}
}
-
+
@Override
protected void finalize() throws Throwable
{
@@ -494,9 +500,9 @@
{
checkQueueSizeFuture.cancel(false);
}
-
+
cancelRedistributor();
-
+
super.finalize();
}
@@ -998,6 +1004,11 @@
{
return paused;
}
+
+ public boolean isDirectDeliver()
+ {
+ return directDeliver;
+ }
// Public
// -----------------------------------------------------------------------------
@@ -1075,10 +1086,10 @@
// Schedule another one - we do this to prevent a single thread getting caught up in this loop for too long
deliverAsync();
-
+
return;
}
-
+
ConsumerHolder holder = consumerList.get(pos);
Consumer consumer = holder.consumer;
@@ -1169,7 +1180,7 @@
if (pos == size)
{
pos = 0;
- }
+ }
}
}
Added: trunk/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java 2010-09-16 15:39:03 UTC (rev 9699)
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.remoting;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
+import org.hornetq.core.remoting.impl.netty.TransportConstants;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ *
+ * A DirectDeliverTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class DirectDeliverTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(DirectDeliverTest.class);
+
+ // Attributes ----------------------------------------------------
+
+ private HornetQServer server;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(TransportConstants.DIRECT_DELIVER, true);
+
+ TransportConfiguration tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
+
+ Configuration config = new ConfigurationImpl();
+ config.getAcceptorConfigurations().add(tc);
+
+ config.setSecurityEnabled(false);
+ server = createServer(false, config);
+ server.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+
+ server = null;
+
+ super.tearDown();
+ }
+
+ protected ClientSessionFactory createSessionFactory()
+ {
+ ClientSessionFactory sf = HornetQClient.createClientSessionFactory(new TransportConfiguration(NettyConnectorFactory.class.getName()));
+
+ return sf;
+ }
+
+ public void testDirectDeliver() throws Exception
+ {
+ final String foo = "foo";
+
+ ClientSessionFactory sf = createSessionFactory();
+
+ ClientSession session = sf.createSession();
+
+ session.createQueue(foo, foo);
+
+ Binding binding = server.getPostOffice().getBinding(new SimpleString(foo));
+
+ Queue queue = (Queue)binding.getBindable();
+
+ assertTrue(queue.isDirectDeliver());
+
+ ClientProducer prod = session.createProducer(foo);
+
+ ClientConsumer cons = session.createConsumer(foo);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+
+ prod.send(msg);
+ }
+
+ queue.blockOnExecutorFuture();
+
+ //Consumer is not started so should go queued
+ assertFalse(queue.isDirectDeliver());
+
+ session.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage msg = cons.receive(10000);
+
+ assertNotNull(msg);
+
+ msg.acknowledge();
+ }
+
+ Thread.sleep((long)(QueueImpl.CHECK_QUEUE_SIZE_PERIOD * 1.5));
+
+ //Add another message, should go direct
+ ClientMessage msg = session.createMessage(true);
+
+ prod.send(msg);
+
+ queue.blockOnExecutorFuture();
+
+ assertTrue(queue.isDirectDeliver());
+
+ //Send some more
+ for (int i = 0; i < numMessages; i++)
+ {
+ msg = session.createMessage(true);
+
+ prod.send(msg);
+ }
+
+ for (int i = 0; i < numMessages + 1; i++)
+ {
+ msg = cons.receive(10000);
+
+ assertNotNull(msg);
+
+ msg.acknowledge();
+ }
+
+ assertTrue(queue.isDirectDeliver());
+
+ session.stop();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ msg = session.createMessage(true);
+
+ prod.send(msg);
+ }
+
+ assertFalse(queue.isDirectDeliver());
+
+
+ sf.close();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-09-16 15:34:41 UTC (rev 9698)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-09-16 15:39:03 UTC (rev 9699)
@@ -27,8 +27,21 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.Transaction;
+/**
+ * A FakeQueue
+ *
+ * @author tim
+ *
+ *
+ */
public class FakeQueue implements Queue
{
+ public boolean isDirectDeliver()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
public void close()
{
// TODO Auto-generated method stub
14 years, 6 months
JBoss hornetq SVN: r9698 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-09-16 11:34:41 -0400 (Thu, 16 Sep 2010)
New Revision: 9698
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
fix FailoverTest
* do not include backups in the connector list of a server locator
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-09-16 14:39:42 UTC (rev 9697)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-09-16 15:34:41 UTC (rev 9698)
@@ -333,7 +333,7 @@
protected ServerLocatorInternal getServerLocator() throws Exception
{
- ServerLocator locator = HornetQClient.createServerLocatorWithHA(getConnectorTransportConfiguration(true), getConnectorTransportConfiguration(false));
+ ServerLocator locator = HornetQClient.createServerLocatorWithHA(getConnectorTransportConfiguration(true));// , getConnectorTransportConfiguration(false));
return (ServerLocatorInternal) locator;
}
14 years, 6 months
JBoss hornetq SVN: r9697 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-09-16 10:39:42 -0400 (Thu, 16 Sep 2010)
New Revision: 9697
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServerSupport.java
Log:
do not specify a logger delegate factory for remote servers
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServerSupport.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServerSupport.java 2010-09-16 13:41:15 UTC (rev 9696)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServerSupport.java 2010-09-16 14:39:42 UTC (rev 9697)
@@ -96,7 +96,7 @@
public static Process start(String serverClassName, final RemoteProcessHornetQServer remoteProcessHornetQServer) throws Exception
{
- String[] vmArgs = new String[] { "-Dorg.hornetq.logger-delegate-factory-class-name=org.hornetq.jms.SysoutLoggerDelegateFactory" };
+ String[] vmArgs = new String[] {};
Process serverProcess = SpawnedVMSupport.spawnVM(RemoteProcessHornetQServerSupport.class.getName(), vmArgs, false, serverClassName);
InputStreamReader isr = new InputStreamReader(serverProcess.getInputStream());
14 years, 6 months
JBoss hornetq SVN: r9696 - in branches/2_2_0_HA_Improvements: src/main/org/hornetq/core/server/cluster/impl and 4 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-09-16 09:41:15 -0400 (Thu, 16 Sep 2010)
New Revision: 9696
Added:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ServerLocatorConnectTest.java
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/TestableServer.java
Log:
added connector code to connect to static connectors in parallel
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-09-16 12:55:50 UTC (rev 9695)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-09-16 13:41:15 UTC (rev 9696)
@@ -157,8 +157,6 @@
final double retryIntervalMultiplier,
final long maxRetryInterval,
final int reconnectAttempts,
- final int initialConnectAttempts,
- final boolean failoverOnInitialConnection,
final ExecutorService threadPool,
final ScheduledExecutorService scheduledThreadPool,
final List<Interceptor> interceptors) throws HornetQException
@@ -197,6 +195,10 @@
this.interceptors = interceptors;
+ }
+
+ public void connect(int initialConnectAttempts, boolean failoverOnInitialConnection) throws HornetQException
+ {
// Get the connection
getConnectionWithRetry(initialConnectAttempts);
@@ -210,9 +212,9 @@
// Try and connect to the backup
log.warn("Server is not available to make initial connection to. Will try backup server instead.");
-
+
this.connectorConfig = backupConfig;
-
+
connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
transportParams = this.connectorConfig.getParams();
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java 2010-09-16 12:55:50 UTC (rev 9695)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java 2010-09-16 13:41:15 UTC (rev 9696)
@@ -12,6 +12,7 @@
*/
package org.hornetq.core.client.impl;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.SessionFailureListener;
@@ -36,4 +37,6 @@
int numSessions();
void removeSession(final ClientSessionInternal session, boolean failingOver);
+
+ void connect(int reconnectAttempts, boolean failoverOnInitialConnection) throws HornetQException;
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-09-16 12:55:50 UTC (rev 9695)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-09-16 13:41:15 UTC (rev 9696)
@@ -18,17 +18,8 @@
import java.net.InetAddress;
import java.security.AccessController;
import java.security.PrivilegedAction;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.util.concurrent.*;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
@@ -71,6 +62,8 @@
private TransportConfiguration[] initialConnectors;
+ private StaticConnector staticConnector = new StaticConnector();
+
private Topology topology = new Topology();
private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
@@ -162,9 +155,9 @@
private String groupID;
private String nodeID;
-
+
private TransportConfiguration clusterTransportConfiguration;
-
+
private boolean backup;
private static synchronized ExecutorService getGlobalThreadPool()
@@ -184,8 +177,8 @@
if (globalScheduledThreadPool == null)
{
ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-scheduled-threads",
- true,
- getThisClassLoader());
+ true,
+ getThisClassLoader());
globalScheduledThreadPool = Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
@@ -428,48 +421,22 @@
{
initialise();
}
-
- public void connect()
+
+ public ClientSessionFactory connect() throws Exception
{
+ ClientSessionFactory sf;
// static list of initial connectors
if (initialConnectors != null && discoveryGroup == null)
{
- for (TransportConfiguration connector : initialConnectors)
- {
- ClientSessionFactory sf = null;
- do
- {
- try
- {
- sf = createSessionFactory(connector);
- }
- catch (HornetQException e)
- {
- if (e.getCode() == HornetQException.NOT_CONNECTED)
- {
- continue;
- }
- }
- catch (Exception e)
- {
- break;
- }
- }
- while (sf == null);
- }
+ sf = staticConnector.connect();
}
// wait for discovery group to get the list of initial connectors
else
{
- try
- {
- ClientSessionFactory sf = createSessionFactory();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
+ sf = createSessionFactory();
}
+ factories.add(sf);
+ return sf;
}
public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
@@ -488,7 +455,7 @@
throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
}
- ClientSessionFactory factory = new ClientSessionFactoryImpl(this,
+ ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
transportConfiguration,
callTimeout,
clientFailureCheckPeriod,
@@ -497,14 +464,14 @@
retryIntervalMultiplier,
maxRetryInterval,
reconnectAttempts,
- initialConnectAttempts,
- failoverOnInitialConnection,
threadPool,
scheduledThreadPool,
interceptors);
+ factory.connect(reconnectAttempts, failoverOnInitialConnection);
+
factories.add(factory);
-
+
return factory;
}
@@ -537,7 +504,7 @@
}
}
- ClientSessionFactory factory = null;
+ ClientSessionFactoryInternal factory = null;
synchronized (this)
{
@@ -562,11 +529,10 @@
retryIntervalMultiplier,
maxRetryInterval,
reconnectAttempts,
- initialConnectAttempts,
- failoverOnInitialConnection,
threadPool,
scheduledThreadPool,
interceptors);
+ factory.connect(reconnectAttempts, failoverOnInitialConnection);
}
catch (HornetQException e)
{
@@ -582,7 +548,7 @@
if (topologyArray == null && initialConnectors != null && attempts == initialConnectors.length)
{
throw new HornetQException(HornetQException.NOT_CONNECTED,
- "Cannot connect to server(s). Tried with all available servers.");
+ "Cannot connect to server(s). Tried with all available servers.");
}
retry = true;
}
@@ -1007,7 +973,7 @@
{
this.nodeID = nodeID;
}
-
+
public String getNodeID()
{
return nodeID;
@@ -1022,12 +988,12 @@
{
return clusterConnection;
}
-
+
public TransportConfiguration getClusterTransportConfiguration()
{
return clusterTransportConfiguration;
}
-
+
public void setClusterTransportConfiguration(TransportConfiguration tc)
{
this.clusterTransportConfiguration = tc;
@@ -1037,17 +1003,12 @@
{
return backup;
}
-
+
public void setBackup(boolean backup)
{
this.backup = backup;
}
- public void announceBackup()
- {
- connect();
- }
-
@Override
protected void finalize() throws Throwable
{
@@ -1074,6 +1035,10 @@
log.error("Failed to stop discovery group", e);
}
}
+ else
+ {
+ staticConnector.disconnect();
+ }
for (ClientSessionFactory factory : factories)
{
@@ -1214,12 +1179,19 @@
{
this.initialConnectors[count++] = entry.getConnector();
}
-
+
if (ha && clusterConnection && !receivedTopology && initialConnectors.length > 0)
{
// FIXME the node is alone in the cluster. We create a connection to the new node
// to trigger the node notification to form the cluster.
- connect();
+ try
+ {
+ connect();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
}
}
@@ -1253,4 +1225,176 @@
return pairs.get(live);
}
+ class StaticConnector
+ {
+ private List<Connector> connectors;
+
+ public ClientSessionFactory connect() throws HornetQException
+ {
+ if (closed)
+ {
+ throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+ }
+
+ try
+ {
+ initialise();
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+ }
+
+ ClientSessionFactory csf = null;
+
+ createConnectors();
+
+ try
+ {
+ List<Future<ClientSessionFactory>> futures = threadPool.invokeAll(connectors);
+ for (int i = 0, futuresSize = futures.size(); i < futuresSize; i++)
+ {
+ Future<ClientSessionFactory> future = futures.get(i);
+ try
+ {
+ csf = future.get();
+ if(csf != null)
+ break;
+ }
+ catch (Exception e)
+ {
+ log.debug("unable to connect with static connector " + connectors.get(i).initialConnector);
+ }
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors", e);
+ }
+
+ if (csf == null)
+ {
+ throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
+ }
+ return csf;
+ }
+
+ private synchronized void createConnectors()
+ {
+ connectors = new ArrayList<Connector>();
+ for (TransportConfiguration initialConnector : initialConnectors)
+ {
+ connectors.add(new Connector(initialConnector));
+ }
+ }
+
+
+ public synchronized void disconnect()
+ {
+ if (connectors != null)
+ {
+ for (Connector connector : connectors)
+ {
+ connector.disconnect();
+ }
+ }
+ }
+
+
+ class Connector implements Callable<ClientSessionFactory>
+ {
+ private TransportConfiguration initialConnector;
+ private ClientSessionFactoryInternal factory;
+ private boolean isConnected = false;
+ private boolean interrupted = false;
+ private Exception e;
+
+ public Connector(TransportConfiguration initialConnector)
+ {
+ this.initialConnector = initialConnector;
+ }
+
+ public ClientSessionFactory call() throws HornetQException
+ {
+ factory = getFactory();
+ try
+ {
+ factory.connect(reconnectAttempts, failoverOnInitialConnection);
+ }
+ catch (HornetQException e)
+ {
+ if (!interrupted)
+ {
+ this.e = e;
+ throw e;
+ }
+ return null;
+ }
+ isConnected = true;
+ for (Connector connector : connectors)
+ {
+ if (!connector.isConnected())
+ {
+ connector.disconnect();
+ }
+ }
+ return factory;
+ }
+
+ public boolean isConnected()
+ {
+ return isConnected;
+ }
+
+ public void disconnect()
+ {
+ interrupted = true;
+ try
+ {
+ ClientSessionFactoryInternal factory = getFactory();
+ if (factory != null)
+ {
+ factory.causeExit();
+ }
+ else
+ {
+ System.out.println("ServerLocatorImpl$StaticConnector$Connector.disconnect");
+ }
+ }
+ catch (HornetQException e1)
+ {
+ log.debug("exception closing factory");
+ }
+ }
+
+ private synchronized ClientSessionFactoryInternal getFactory() throws HornetQException
+ {
+ if (factory == null)
+ {
+ try
+ {
+ initialise();
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+ }
+
+ factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this,
+ initialConnector,
+ callTimeout,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ threadPool,
+ scheduledThreadPool,
+ interceptors);
+ }
+ return factory;
+ }
+ }
+ }
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-09-16 12:55:50 UTC (rev 9695)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-09-16 13:41:15 UTC (rev 9696)
@@ -13,6 +13,7 @@
package org.hornetq.core.client.impl;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
@@ -37,7 +38,7 @@
String getNodeID();
- void connect();
+ ClientSessionFactory connect() throws Exception;
void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance);
@@ -55,7 +56,5 @@
void setBackup(boolean backup);
- void announceBackup();
-
void setInitialConnectAttempts(int reconnectAttempts);
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-09-16 12:55:50 UTC (rev 9695)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-09-16 13:41:15 UTC (rev 9696)
@@ -134,7 +134,10 @@
this.serverLocator.setClusterTransportConfiguration(connector);
this.serverLocator.setBackup(server.getConfiguration().isBackup());
this.serverLocator.setReconnectAttempts(-1);
- this.serverLocator.setRetryInterval(retryInterval);
+ if(retryInterval > 0)
+ {
+ this.serverLocator.setRetryInterval(retryInterval);
+ }
// a cluster connection will connect to other nodes only if they are directly connected
// through a static list of connectors
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-09-16 12:55:50 UTC (rev 9695)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-09-16 13:41:15 UTC (rev 9696)
@@ -183,7 +183,7 @@
private void deployBackupListener(BackupConnectorConfiguration connectorConfiguration)
throws Exception
{
- ServerLocator locator;
+ ServerLocatorInternal locator;
if (connectorConfiguration.getDiscoveryGroupName() != null)
{
DiscoveryGroupConfiguration groupConfiguration = configuration.getDiscoveryGroupConfigurations().get(connectorConfiguration.getDiscoveryGroupName());
@@ -217,7 +217,7 @@
//todo update the topology
}
});
- backupSessionFactory = locator.createSessionFactory();
+ backupSessionFactory = locator.connect();
backupSessionFactory.getConnection().getChannel(0, -1).send(new NodeAnnounceMessage(nodeUUID.toString(), true, configuration.getConnectorConfigurations().get(connectorConfiguration.getConnector())));
}
@@ -370,7 +370,7 @@
}
// backup node becomes live
- public synchronized void activate()
+ public synchronized void activate()
{
if (backup)
{
@@ -437,10 +437,15 @@
}
}
- if (clusterConnections.size() > 0)
+ for (ClusterTopologyListener listener : clientListeners)
{
- announceNode();
+ listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
}
+
+ for (ClusterTopologyListener listener : clusterConnectionListeners)
+ {
+ listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
+ }
}
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-09-16 12:55:50 UTC (rev 9695)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-09-16 13:41:15 UTC (rev 9696)
@@ -601,9 +601,9 @@
configuration.setBackup(false);
+ initialisePart2();
+
clusterManager.activate();
-
- initialisePart2();
log.info("Backup Server is now live");
Added: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ServerLocatorConnectTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ServerLocatorConnectTest.java (rev 0)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ServerLocatorConnectTest.java 2010-09-16 13:41:15 UTC (rev 9696)
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.ServiceTestBase;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests ServerLocator.createSessionFactory() and ServerLocatorInternal.connect() against
+ * the single server started in setUp(); isNetty() returns false for every helper call.
+ * Created by andy on Sep 15, 2010, 2:27:07 PM.
+ * @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
+ */
+public class ServerLocatorConnectTest extends ServiceTestBase
+{
+ private HornetQServer server;
+ // setUp: builds a server from the default configuration and starts it.
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ Configuration configuration = createDefaultConfig(isNetty());
+ server = createServer(false, configuration);
+ server.start();
+ }
+ // tearDown: stops the server, then delegates to the superclass.
+ @Override
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+ super.tearDown();
+ }
+ // A locator with one transport configuration can create and then close a session factory.
+ public void testSingleConnectorSingleServer() throws Exception
+ {
+
+ ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(createTransportConfiguration(isNetty(), false, generateParams(0, isNetty())));
+ ClientSessionFactory csf = locator.createSessionFactory();
+ csf.close();
+ }
+ // connect() on a single-configuration locator returns a factory reporting exactly one connection.
+ public void testSingleConnectorSingleServerConnect() throws Exception
+ {
+ ServerLocatorInternal locator = (ServerLocatorInternal) HornetQClient.createServerLocatorWithoutHA(createTransportConfiguration(isNetty(), false, generateParams(0, isNetty())));
+ ClientSessionFactoryInternal csf = (ClientSessionFactoryInternal) locator.connect();
+ assertNotNull(csf);
+ assertEquals(csf.numConnections(), 1);
+ locator.close();
+ }
+ // Same check with five transport configurations (generateParams indices 0-4).
+ public void testMultipleConnectorSingleServerConnect() throws Exception
+ {
+ ServerLocatorInternal locator = (ServerLocatorInternal) HornetQClient.createServerLocatorWithoutHA(
+ createTransportConfiguration(isNetty(), false, generateParams(0, isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(1, isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(2, isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(3, isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(4, isNetty()))
+ );
+ ClientSessionFactoryInternal csf = (ClientSessionFactoryInternal) locator.connect();
+ assertNotNull(csf);
+ assertEquals(csf.numConnections(), 1);
+ locator.close();
+ }
+ // Same five configurations, but with reconnect attempts set to -1 before connecting.
+ public void testMultipleConnectorSingleServerConnectReconnect() throws Exception
+ {
+ ServerLocatorInternal locator = (ServerLocatorInternal) HornetQClient.createServerLocatorWithoutHA(
+ createTransportConfiguration(isNetty(), false, generateParams(0, isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(1, isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(2, isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(3, isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(4, isNetty()))
+ );
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal csf = (ClientSessionFactoryInternal) locator.connect();
+ assertNotNull(csf);
+ assertEquals(csf.numConnections(), 1);
+ locator.close();
+ }
+ // connect() with generateParams indices 1-5 must not yield a factory (presumably no server listens there - confirm).
+ public void testMultipleConnectorSingleServerNoConnect() throws Exception
+ {
+ ServerLocatorInternal locator = (ServerLocatorInternal) HornetQClient.createServerLocatorWithoutHA(
+ createTransportConfiguration(isNetty(), false, generateParams(1, isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(2, isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(3, isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(4, isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(5, isNetty()))
+ );
+ ClientSessionFactoryInternal csf = null;
+ try
+ {
+ csf = (ClientSessionFactoryInternal) locator.connect();
+ }
+ catch (Exception e)
+ {
+ assertTrue(e instanceof HornetQException);
+ assertEquals(((HornetQException)e).getCode(), HornetQException.NOT_CONNECTED);
+ }
+ assertNull(csf);
+ locator.close();
+ }
+ // With reconnect attempts -1, connect() runs on another thread; closing the locator is expected to end it with NOT_CONNECTED.
+ public void testMultipleConnectorSingleServerNoConnectAttemptReconnect() throws Exception
+ {
+ ServerLocatorInternal locator = (ServerLocatorInternal) HornetQClient.createServerLocatorWithoutHA(
+ createTransportConfiguration(isNetty(), false, generateParams(1, isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(2, isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(3, isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(4, isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(5, isNetty()))
+ );
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal csf = null; // NOTE(review): never read in this test
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ Connector target = new Connector(locator, countDownLatch);
+ Thread t = new Thread(target);
+ t.start();
+ //let them get started
+ Thread.sleep(1500);
+ locator.close();
+ assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
+ assertTrue(target.e instanceof HornetQException);
+ assertEquals(((HornetQException)target.e).getCode(), HornetQException.NOT_CONNECTED);
+ }
+ // Flag passed to the configuration and transport helpers above; always false in this class.
+ public boolean isNetty()
+ {
+ return false;
+ }
+ // Runnable that calls locator.connect(), stores any exception in e, then counts down the latch.
+ static class Connector implements Runnable
+ {
+ private ServerLocatorInternal locator;
+ CountDownLatch latch;
+ Exception e;
+ public Connector(ServerLocatorInternal locator, CountDownLatch latch)
+ {
+ this.locator = locator;
+ this.latch = latch;
+ }
+
+ public void run()
+ {
+ try
+ {
+ locator.connect();
+ }
+ catch (Exception e)
+ {
+ this.e = e;
+ }
+ latch.countDown();
+ }
+ }
+}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2010-09-16 12:55:50 UTC (rev 9695)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2010-09-16 13:41:15 UTC (rev 9696)
@@ -34,14 +34,34 @@
{
protected Map<Integer, TestableServer> servers = new HashMap<Integer, TestableServer>();
+ @Override
+ protected void tearDown() throws Exception
+ {
+ for (TestableServer testableServer : servers.values()) // destroy every non-null server held in the servers map
+ {
+ if(testableServer != null)
+ {
+ try
+ {
+ testableServer.destroy();
+ }
+ catch (Throwable e)
+ {
+ // best-effort cleanup: ignore the failure so the remaining servers are still destroyed
+ }
+ }
+ }
+ super.tearDown();
+ }
+
public void testMultipleFailovers2LiveServers() throws Exception
{
createLiveConfig(0, 3);
createBackupConfig(0, 1, true, 0, 3);
- createBackupConfig(0, 2,true, 0, 3);
+ createBackupConfig(0, 2, true, 0, 3);
createLiveConfig(3, 0);
- createBackupConfig(3, 4, true,0, 3);
- createBackupConfig(3, 5, true,0, 3);
+ createBackupConfig(3, 4, true, 0, 3, 1, 4);
+ createBackupConfig(3, 5, true, 0, 3, 1, 4);
servers.get(0).start();
servers.get(3).start();
servers.get(1).start();
@@ -60,7 +80,7 @@
servers.get(0).crash(session);
int liveAfter0 = waitForBackup(10000, servers, 1, 2);
-
+
ServerLocator locator2 = getServerLocator(3);
locator2.setBlockOnNonDurableSend(true);
locator2.setBlockOnDurableSend(true);
@@ -74,23 +94,23 @@
if (liveAfter0 == 2)
{
servers.get(1).stop();
- servers.get(2).stop();
+ servers.get(2).stop();
}
else
{
servers.get(2).stop();
- servers.get(1).stop();
+ servers.get(1).stop();
}
-
+
if (liveAfter3 == 4)
{
servers.get(5).stop();
- servers.get(4).stop();
+ servers.get(4).stop();
}
else
{
servers.get(4).stop();
- servers.get(5).stop();
+ servers.get(5).stop();
}
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java 2010-09-16 12:55:50 UTC (rev 9695)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java 2010-09-16 13:41:15 UTC (rev 9696)
@@ -65,7 +65,15 @@
}
return false;
}
-
+
+ public void destroy() // Calls destroy() on serverProcess if one is set; otherwise does nothing.
+ {
+ if(serverProcess != null)
+ {
+ serverProcess.destroy();
+ }
+ }
+
public void setInitialised(boolean initialised)
{
this.initialised = initialised;
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2010-09-16 12:55:50 UTC (rev 9695)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2010-09-16 13:41:15 UTC (rev 9696)
@@ -56,7 +56,12 @@
{
return server.isInitialised();
}
-
+
+ public void destroy()
+ {
+ // No-op in this implementation.
+ }
+
public void start() throws Exception
{
server.start();
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/TestableServer.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/TestableServer.java 2010-09-16 12:55:50 UTC (rev 9695)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/TestableServer.java 2010-09-16 13:41:15 UTC (rev 9696)
@@ -31,4 +31,6 @@
public void crash(ClientSession... sessions) throws Exception;
public boolean isInitialised();
+
+ void destroy();
}
14 years, 6 months
JBoss hornetq SVN: r9695 - in trunk: tests/src/org/hornetq/tests/soak/client and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-09-16 08:55:50 -0400 (Thu, 16 Sep 2010)
New Revision: 9695
Modified:
trunk/src/main/org/hornetq/utils/LinkedListImpl.java
trunk/tests/src/org/hornetq/tests/soak/client/SimpleSendReceiveSoakTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-469
Modified: trunk/src/main/org/hornetq/utils/LinkedListImpl.java
===================================================================
--- trunk/src/main/org/hornetq/utils/LinkedListImpl.java 2010-09-16 12:11:07 UTC (rev 9694)
+++ trunk/src/main/org/hornetq/utils/LinkedListImpl.java 2010-09-16 12:55:50 UTC (rev 9695)
@@ -176,6 +176,10 @@
{
LinkedListImpl.this.nudgeIterators(toRemove);
}
+
+ //Help GC - otherwise GC potentially has to traverse a very long list to see if elements are reachable, this can result in OOM
+ //https://jira.jboss.org/browse/HORNETQ-469
+ toRemove.next = toRemove.prev = null;
}
private void nudgeIterators(Node<E> node)
Modified: trunk/tests/src/org/hornetq/tests/soak/client/SimpleSendReceiveSoakTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/client/SimpleSendReceiveSoakTest.java 2010-09-16 12:11:07 UTC (rev 9694)
+++ trunk/tests/src/org/hornetq/tests/soak/client/SimpleSendReceiveSoakTest.java 2010-09-16 12:55:50 UTC (rev 9695)
@@ -88,7 +88,7 @@
server = null;
}
- public void testSoakClient() throws Exception
+ public void testSoakClientTransactions() throws Exception
{
final ClientSessionFactory sf = createFactory(IS_NETTY);
@@ -123,8 +123,6 @@
producer.send(msg);
}
- session.commit();
-
for (int i = 0; i < MIN_MESSAGES_ON_QUEUE; i++)
{
ClientMessage msg = consumer.receive(5000);
@@ -132,8 +130,6 @@
msg.acknowledge();
assertEquals(msgReceivedID++, msg.getLongProperty("count").longValue());
}
-
- sessionConsumer.commit();
}
sessionConsumer.close();
@@ -141,6 +137,7 @@
sf.close();
}
+
// Package protected ---------------------------------------------
14 years, 6 months
JBoss hornetq SVN: r9693 - trunk/tests/src/org/hornetq/tests/soak/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-15 16:30:04 -0400 (Wed, 15 Sep 2010)
New Revision: 9693
Added:
trunk/tests/src/org/hornetq/tests/soak/client/SimpleSendReceiveSoakTest.java
Log:
Adding a new simple test on sending and receiving messages to investigate a leak on LinkedList
Added: trunk/tests/src/org/hornetq/tests/soak/client/SimpleSendReceiveSoakTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/client/SimpleSendReceiveSoakTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/soak/client/SimpleSendReceiveSoakTest.java 2010-09-15 20:30:04 UTC (rev 9693)
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.soak.client;
+
+import java.util.HashMap;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * Soak test that repeatedly sends a batch of messages to one queue and consumes the
+ * same number back, asserting that the "count" property arrives in sending order.
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class SimpleSendReceiveSoakTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private static final SimpleString ADDRESS = new SimpleString("ADD");
+
+ private static final boolean IS_NETTY = false;
+
+ private static final boolean IS_JOURNAL = false;
+ // Number of messages sent in the pre-load and in each send/receive round.
+ public static final int MIN_MESSAGES_ON_QUEUE = 1000;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ private HornetQServer server;
+ // setUp: calls clearData(), starts a server and creates a queue named ADDRESS on address ADDRESS.
+ @Override
+ protected void setUp() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig(SimpleSendReceiveSoakTest.IS_NETTY);
+
+ config.setJournalFileSize(10 * 1024 * 1024);
+
+ server = createServer(IS_JOURNAL, config, -1, -1, new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ ClientSessionFactory sf = createFactory(SimpleSendReceiveSoakTest.IS_NETTY);
+
+ ClientSession session = sf.createSession();
+ // the trailing true is presumably the durable flag - confirm against ClientSession.createQueue
+ session.createQueue(SimpleSendReceiveSoakTest.ADDRESS, SimpleSendReceiveSoakTest.ADDRESS, true);
+
+ session.close();
+
+ sf.close();
+
+ }
+ // tearDown: stops the server. NOTE(review): does not call super.tearDown().
+ @Override
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+ server = null;
+ }
+ // Pre-loads MIN_MESSAGES_ON_QUEUE messages, then 1000 times sends and receives that many, checking "count" order.
+ public void testSoakClient() throws Exception
+ {
+ final ClientSessionFactory sf = createFactory(IS_NETTY);
+
+ ClientSession session = sf.createSession(true, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+ // next "count" value to stamp on a sent message
+ long msgId = 0;
+ // next "count" value expected from the consumer
+ long msgReceivedID = 0;
+ // pre-load: each message carries a 10 KiB zero-filled body
+ for (int i = 0; i < MIN_MESSAGES_ON_QUEUE; i++)
+ {
+ ClientMessage msg = session.createMessage(IS_JOURNAL);
+ msg.putLongProperty("count", msgId++);
+ msg.getBodyBuffer().writeBytes(new byte[10 * 1024]);
+ producer.send(msg);
+ }
+ // the consumer uses its own session; the trailing 0 is presumably the ack batch size - confirm
+ ClientSession sessionConsumer = sf.createSession(true, true, 0);
+ ClientConsumer consumer = sessionConsumer.createConsumer(ADDRESS);
+ sessionConsumer.start();
+
+ for (int loopNumber = 0; loopNumber < 1000; loopNumber++)
+ {
+ System.out.println("Loop " + loopNumber);
+ for (int i = 0; i < MIN_MESSAGES_ON_QUEUE; i++)
+ {
+ ClientMessage msg = session.createMessage(IS_JOURNAL);
+ msg.putLongProperty("count", msgId++);
+ msg.getBodyBuffer().writeBytes(new byte[10 * 1024]);
+ producer.send(msg);
+ }
+ // NOTE(review): session was created with (true, true), presumably auto-commit, so this commit may be redundant - confirm
+ session.commit();
+
+ for (int i = 0; i < MIN_MESSAGES_ON_QUEUE; i++)
+ {
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ assertEquals(msgReceivedID++, msg.getLongProperty("count").longValue());
+ }
+
+ sessionConsumer.commit();
+ }
+
+ sessionConsumer.close();
+ session.close();
+ sf.close();
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
14 years, 6 months
JBoss hornetq SVN: r9692 - trunk/tests/src/org/hornetq/tests/unit/util.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-15 11:30:02 -0400 (Wed, 15 Sep 2010)
New Revision: 9692
Modified:
trunk/tests/src/org/hornetq/tests/unit/util/LinkedListTest.java
Log:
Adding test on LinkedList
Modified: trunk/tests/src/org/hornetq/tests/unit/util/LinkedListTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/util/LinkedListTest.java 2010-09-15 14:36:37 UTC (rev 9691)
+++ trunk/tests/src/org/hornetq/tests/unit/util/LinkedListTest.java 2010-09-15 15:30:02 UTC (rev 9692)
@@ -15,10 +15,10 @@
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicInteger;
-import junit.framework.TestCase;
-
import org.hornetq.core.logging.Logger;
+import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.LinkedListImpl;
import org.hornetq.utils.LinkedListIterator;
@@ -29,7 +29,7 @@
*
*
*/
-public class LinkedListTest extends TestCase
+public class LinkedListTest extends UnitTestCase
{
private static final Logger log = Logger.getLogger(LinkedListTest.class);
@@ -43,6 +43,89 @@
list = new LinkedListImpl<Integer>();
}
+ public void testAddAndRemove() // asserts that elements removed from a LinkedListImpl get finalized after forceGC()
+ {
+ final AtomicInteger count = new AtomicInteger(0);
+ class MyObject
+ {
+
+ public byte payload[];
+
+ MyObject()
+ {
+ count.incrementAndGet();
+ payload = new byte[10 * 1024];
+ }
+
+ protected void finalize() throws Exception
+ {
+ count.decrementAndGet();
+ }
+ };
+ // count = MyObject instances constructed minus instances finalized so far
+ LinkedListImpl<MyObject> objs = new LinkedListImpl<MyObject>();
+
+ // Initial add
+ for (int i = 0; i < 1000; i++)
+ {
+ objs.addTail(new MyObject());
+ }
+ // a single iterator is reused for every removal below
+ LinkedListIterator<MyObject> iter = objs.iterator();
+
+ for (int i = 0; i < 5000; i++)
+ {
+ // each round adds 1000 objects at the tail and removes 1000 through the iterator
+ for (int add = 0; add < 1000; add++)
+ {
+ objs.addTail(new MyObject());
+ }
+
+ for (int remove = 0; remove < 1000; remove++)
+ {
+ assertNotNull(iter.next());
+ iter.remove();
+ }
+ // every 1000th round, check that exactly 1000 instances remain unfinalized
+ if (i % 1000 == 0)
+ {
+ System.out.println("Checking on " + i);
+ // retry forceGC() up to 5 times, stopping early once count reaches 1000
+ for (int gcLoop = 0 ; gcLoop < 5; gcLoop++)
+ {
+ forceGC();
+ if (count.get() == 1000)
+ {
+ break;
+ }
+ else
+ {
+ System.out.println("Trying a GC again");
+ }
+ }
+
+ assertEquals(1000, count.get());
+ }
+ }
+
+ forceGC();
+
+ assertEquals(1000, count.get());
+ // drain whatever the iterator can still reach
+ int removed = 0;
+ while (iter.hasNext())
+ {
+ System.out.println("removed " + (removed++));
+ iter.next();
+ iter.remove();
+ }
+
+ forceGC();
+ // after draining and one more GC the test expects every MyObject to have been finalized
+ assertEquals(0, count.get());
+
+ }
+
public void testAddTail()
{
int num = 10;
@@ -1095,29 +1178,29 @@
// Close the odd ones
boolean b = false;
- for (LinkedListIterator<Integer> iter: iters)
+ for (LinkedListIterator<Integer> iter : iters)
{
if (b)
{
- iter.close();
+ iter.close();
}
b = !b;
}
-
+
assertEquals(numIters / 2, list.numIters());
-
+
// close the even ones
-
+
b = true;
- for (LinkedListIterator<Integer> iter: iters)
+ for (LinkedListIterator<Integer> iter : iters)
{
if (b)
{
- iter.close();
+ iter.close();
}
b = !b;
}
-
+
assertEquals(0, list.numIters());
}
14 years, 6 months
JBoss hornetq SVN: r9691 - in branches/Branch_2_1: tests/src/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-15 10:36:37 -0400 (Wed, 15 Sep 2010)
New Revision: 9691
Modified:
branches/Branch_2_1/merge-activity.txt
branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
tweak on paging-test (sync from trunk)
Modified: branches/Branch_2_1/merge-activity.txt
===================================================================
--- branches/Branch_2_1/merge-activity.txt 2010-09-15 14:25:44 UTC (rev 9690)
+++ branches/Branch_2_1/merge-activity.txt 2010-09-15 14:36:37 UTC (rev 9691)
@@ -18,5 +18,7 @@
- 14-sep-2010 - jmesnil - merger from trunk -r 9682:9683 - https://jira.jboss.org/browse/HORNETQ-509
- 14-sep-2010 - clebert - merger from trunk -r 9685:9686
+
+- 15-sep-2010 - clebert - merger from trunk -r 9689:9690 - tweak on PagingTest
TODO: Bring changes from HORNETQ-469 as soon as it's stable (remove this line as soon as it's done)
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-09-15 14:25:44 UTC (rev 9690)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-09-15 14:36:37 UTC (rev 9691)
@@ -34,7 +34,6 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.paging.impl.TestSupportPageStore;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
@@ -268,8 +267,10 @@
threads[i].join();
}
+ assertEquals(0, errors.get());
+
assertEquals(0, server.getPostOffice().getPagingManager().getTransactions().size());
-
+
}
finally
{
14 years, 6 months
JBoss hornetq SVN: r9690 - trunk/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-15 10:25:44 -0400 (Wed, 15 Sep 2010)
New Revision: 9690
Modified:
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
tweak on test
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-09-15 13:00:32 UTC (rev 9689)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-09-15 14:25:44 UTC (rev 9690)
@@ -34,7 +34,6 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.paging.impl.TestSupportPageStore;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
@@ -268,8 +267,10 @@
threads[i].join();
}
+ assertEquals(0, errors.get());
+
assertEquals(0, server.getPostOffice().getPagingManager().getTransactions().size());
-
+
}
finally
{
14 years, 6 months