Author: clebert.suconic
Date: 2011-08-13 01:48:57 -0400 (Sat, 13 Aug 2011)
New Revision: 11199
Added:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
Fixing the testsuite
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-08-12
15:10:56 UTC (rev 11198)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-08-13
05:48:57 UTC (rev 11199)
@@ -1399,27 +1399,19 @@
log.trace("Disconnect being called on client:" + msg + "
server locator = " + serverLocator, new Exception ("trace"));
}
- closeExecutor.execute(new Runnable()
+ conn.fail(new HornetQException(HornetQException.DISCONNECTED,
+ "The connection was disconnected because
of server shutdown"));
+
+ SimpleString nodeID = msg.getNodeID();
+ if (log.isTraceEnabled())
{
- // Must be executed on new thread since cannot block the netty thread for
a long time and fail can
- // cause reconnect loop
- public void run()
- {
- SimpleString nodeID = msg.getNodeID();
- if (log.isTraceEnabled())
- {
- log.trace("notifyDown nodeID=" + msg.getNodeID() + "
on serverLocator=" + serverLocator + " csf created at ",
ClientSessionFactoryImpl.this.e);
- }
- if (nodeID != null)
- {
- serverLocator.notifyNodeDown(msg.getNodeID().toString());
- }
+ log.trace("notifyDown nodeID=" + msg.getNodeID() + " on
serverLocator=" + serverLocator + " csf created at ",
ClientSessionFactoryImpl.this.e);
+ }
+ if (nodeID != null)
+ {
+ serverLocator.notifyNodeDown(msg.getNodeID().toString());
+ }
- conn.fail(new HornetQException(HornetQException.DISCONNECTED,
- "The connection was disconnected
because of server shutdown"));
-
- }
- });
}
else if (type == PacketImpl.CLUSTER_TOPOLOGY)
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-08-12
15:10:56 UTC (rev 11198)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-08-13
05:48:57 UTC (rev 11199)
@@ -282,6 +282,11 @@
// ClientSession implementation
// -----------------------------------------------------------------
+
+ public Channel getChannel()
+ {
+ return channel;
+ }
public void createQueue(final SimpleString address, final SimpleString queueName)
throws HornetQException
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2011-08-12
15:10:56 UTC (rev 11198)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2011-08-13
05:48:57 UTC (rev 11199)
@@ -17,6 +17,7 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
@@ -61,6 +62,8 @@
void handleFailover(CoreRemotingConnection backupConnection);
RemotingConnection getConnection();
+
+ Channel getChannel();
void cleanUp(boolean failingOver) throws Exception;
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2011-08-12
15:10:56 UTC (rev 11198)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2011-08-13
05:48:57 UTC (rev 11199)
@@ -28,6 +28,7 @@
import org.hornetq.api.core.client.SendAcknowledgementHandler;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
@@ -584,4 +585,12 @@
return "DelegatingSession [session=" + session + "]";
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.client.impl.ClientSessionInternal#getChannel()
+ */
+ public Channel getChannel()
+ {
+ return session.getChannel();
+ }
+
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-08-12
15:10:56 UTC (rev 11198)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-08-13
05:48:57 UTC (rev 11199)
@@ -69,7 +69,7 @@
private transient String identity;
- private Set<ClientSessionFactory> factories = new
HashSet<ClientSessionFactory>();
+ private Set<ClientSessionFactoryInternal> factories = new
HashSet<ClientSessionFactoryInternal>();
private TransportConfiguration[] initialConnectors;
@@ -84,7 +84,7 @@
private boolean receivedTopology;
private boolean compressLargeMessage;
-
+
// if the system should shutdown the pool when shutting down
private transient boolean shutdownPool;
@@ -252,13 +252,12 @@
private void setThreadPools()
{
- if (threadPool != null)
- {
- return;
- }
- else
- if (useGlobalPools)
+ if (threadPool != null)
{
+ return;
+ }
+ else if (useGlobalPools)
+ {
threadPool = getGlobalThreadPool();
scheduledThreadPool = getGlobalScheduledThreadPool();
@@ -266,7 +265,7 @@
else
{
this.shutdownPool = true;
-
+
ThreadFactory factory = new
HornetQThreadFactory("HornetQ-client-factory-threads-" +
System.identityHashCode(this),
true,
getThisClassLoader());
@@ -369,19 +368,13 @@
private ServerLocatorImpl(final Topology topology,
final boolean useHA,
- final ExecutorService threadPool,
- final ScheduledExecutorService scheduledExecutor,
final DiscoveryGroupConfiguration
discoveryGroupConfiguration,
final TransportConfiguration[] transportConfigs)
{
e.fillInStackTrace();
-
- this.scheduledThreadPool = scheduledExecutor;
-
- this.threadPool = threadPool;
-
+
this.topology = topology;
-
+
this.ha = useHA;
this.discoveryGroupConfiguration = discoveryGroupConfiguration;
@@ -459,8 +452,13 @@
*/
public ServerLocatorImpl(final boolean useHA, final DiscoveryGroupConfiguration
groupConfiguration)
{
- this(new Topology(null), useHA, null, null, groupConfiguration, null);
- topology.setOwner(this);
+ this(useHA ? new Topology(null) : null, useHA, groupConfiguration, null);
+ if (useHA)
+ {
+ // We only set the owner at where the Topology was created.
+ // For that reason we can't set it at the main constructor
+ topology.setOwner(this);
+ }
}
/**
@@ -470,8 +468,13 @@
*/
public ServerLocatorImpl(final boolean useHA, final TransportConfiguration...
transportConfigs)
{
- this(new Topology(null), useHA, null, null, null, transportConfigs);
- topology.setOwner(this);
+ this(useHA ? new Topology(null) : null, useHA, null, transportConfigs);
+ if (useHA)
+ {
+ // We only set the owner at where the Topology was created.
+ // For that reason we can't set it at the main constructor
+ topology.setOwner(this);
+ }
}
/**
@@ -480,10 +483,10 @@
* @param discoveryAddress
* @param discoveryPort
*/
- public ServerLocatorImpl(final Topology topology, final boolean useHA, final
ExecutorService threadPool, final ScheduledExecutorService scheduledExecutor, final
DiscoveryGroupConfiguration groupConfiguration)
+ public ServerLocatorImpl(final Topology topology, final boolean useHA, final
DiscoveryGroupConfiguration groupConfiguration)
{
- this(topology, useHA, threadPool, scheduledExecutor, groupConfiguration, null);
-
+ this(topology, useHA, groupConfiguration, null);
+
}
/**
@@ -491,9 +494,11 @@
*
* @param transportConfigs
*/
- public ServerLocatorImpl(final Topology topology, final boolean useHA, final
ExecutorService threadPool, final ScheduledExecutorService scheduledExecutor, final
TransportConfiguration... transportConfigs)
+ public ServerLocatorImpl(final Topology topology,
+ final boolean useHA,
+ final TransportConfiguration... transportConfigs)
{
- this(topology, useHA, threadPool, scheduledExecutor, null, transportConfigs);
+ this(topology, useHA, null, transportConfigs);
}
private TransportConfiguration selectConnector()
@@ -565,7 +570,7 @@
addFactory(sf);
return sf;
}
-
+
public boolean isClosed()
{
return closed || closing;
@@ -609,7 +614,7 @@
public ClientSessionFactory createSessionFactory() throws Exception
{
- if (closed)
+ if (closed || closing)
{
throw new IllegalStateException("Cannot create session factory, server
locator is closed (maybe it has been garbage collected)");
}
@@ -697,10 +702,12 @@
if (ha || clusterConnection)
{
long timeout = System.currentTimeMillis() + 30000;
- while (!ServerLocatorImpl.this.closed &&
!ServerLocatorImpl.this.closing && !receivedTopology && timeout >
System.currentTimeMillis())
+ while (!ServerLocatorImpl.this.closed &&
!ServerLocatorImpl.this.closing &&
+ !receivedTopology &&
+ timeout > System.currentTimeMillis())
{
// Now wait for the topology
-
+
try
{
wait(1000);
@@ -711,7 +718,7 @@
}
- if (System.currentTimeMillis() > timeout && ! receivedTopology
&& !closed && !closing)
+ if (System.currentTimeMillis() > timeout && !receivedTopology
&& !closed && !closing)
{
throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
"Timed out waiting to receive cluster
topology");
@@ -1137,7 +1144,7 @@
super.finalize();
}
-
+
public void cleanup()
{
doClose(false);
@@ -1147,7 +1154,7 @@
{
doClose(true);
}
-
+
protected void doClose(final boolean sendClose)
{
if (closed)
@@ -1182,7 +1189,7 @@
staticConnector.disconnect();
}
- Set<ClientSessionFactory> clonedFactory = new
HashSet<ClientSessionFactory>(factories);
+ Set<ClientSessionFactoryInternal> clonedFactory = new
HashSet<ClientSessionFactoryInternal>(factories);
for (ClientSessionFactory factory : clonedFactory)
{
@@ -1239,12 +1246,10 @@
public void notifyNodeDown(final String nodeID)
{
- if (!clusterConnection && !ha)
+
+ if (topology == null)
{
- if (log.isDebugEnabled())
- {
- log.debug(this + "::ignoring notifyNodeDown=" + nodeID + " as
isHA=false");
- }
+ // there's no topology here
return;
}
@@ -1254,7 +1259,7 @@
}
topology.removeMember(nodeID);
-
+
if (!topology.isEmpty())
{
updateArraysAndPairs();
@@ -1274,25 +1279,18 @@
}
public void notifyNodeUp(final String nodeID,
- final Pair<TransportConfiguration,
TransportConfiguration> connectorPair,
- final boolean last)
+ final Pair<TransportConfiguration,
TransportConfiguration> connectorPair,
+ final boolean last)
{
- if (!clusterConnection && !ha)
+ if (topology == null)
{
- if (log.isDebugEnabled())
- {
- log.debug(this + "::Ignoring notifyNodeUp for " +
- nodeID +
- " connectorPair=" +
- connectorPair +
- ", since ha=false and clusterConnection=false");
- }
+ // there's no topology
return;
}
if (log.isDebugEnabled())
{
- log.debug("NodeUp " + this + "::nodeID=" + nodeID + ",
connectorPair=" + connectorPair, new Exception ("trace"));
+ log.debug("NodeUp " + this + "::nodeID=" + nodeID + ",
connectorPair=" + connectorPair, new Exception("trace"));
}
topology.addMember(nodeID, new TopologyMember(connectorPair), last);
@@ -1352,7 +1350,7 @@
private synchronized void updateArraysAndPairs()
{
Collection<TopologyMember> membersCopy = topology.getMembers();
-
+
topologyArray = (Pair<TransportConfiguration,
TransportConfiguration>[])Array.newInstance(Pair.class,
membersCopy.size());
@@ -1395,12 +1393,12 @@
{
factories.remove(factory);
- if (!clusterConnection && factories.isEmpty())
+ if (!clusterConnection && factories.isEmpty())
{
// Go back to using the broadcast or static list
receivedTopology = false;
-
+
topologyArray = null;
}
}
@@ -1424,7 +1422,13 @@
{
if (factory != null)
{
- TransportConfiguration backup =
topology.getBackupForConnector(factory.getConnectorConfiguration());
+ TransportConfiguration backup = null;
+
+ if (topology != null)
+ {
+ backup =
topology.getBackupForConnector(factory.getConnectorConfiguration());
+ }
+
factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
factories.add(factory);
}
@@ -1476,10 +1480,11 @@
{
csf.getConnection().addFailureListener(new FailureListener()
{
- // Case the node where we were connected is gone, we need to
restart the connection
+ // Case the node where the cluster connection was connected is
gone, we need to restart the
+ // connection
public void connectionFailed(HornetQException exception, boolean
failedOver)
{
- if (exception.getCode() == HornetQException.DISCONNECTED)
+ if (clusterConnection && exception.getCode() ==
HornetQException.DISCONNECTED)
{
try
{
@@ -1496,7 +1501,7 @@
if (log.isDebugEnabled())
{
- log.debug("XXX Returning " + csf +
+ log.debug("Returning " + csf +
" after " +
retryNumber +
" retries on StaticConnector " +
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-12
15:10:56 UTC (rev 11198)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-13
05:48:57 UTC (rev 11199)
@@ -40,6 +40,8 @@
private static final Logger log = Logger.getLogger(Topology.class);
+ private Executor executor = null;
+
/** Used to debug operations.
*
* Someone may argue this is not needed. But it's impossible to debg anything
related to topology without knowing what node
@@ -49,14 +51,8 @@
* */
private volatile Object owner;
- public Topology(final Object owner)
- {
- this.owner = owner;
- Topology.log.debug("Topology@" +
Integer.toHexString(System.identityHashCode(this)) + " CREATE",
- new Exception("trace")); // Delete this line
- }
- /*
+ /**
* topology describes the other cluster nodes that this server knows about:
*
* keys are node IDs
@@ -64,6 +60,18 @@
*/
private final Map<String, TopologyMember> topology = new
ConcurrentHashMap<String, TopologyMember>();
+ public Topology(final Object owner)
+ {
+ this.owner = owner;
+ Topology.log.debug("Topology@" +
Integer.toHexString(System.identityHashCode(this)) + " CREATE",
+ new Exception("trace")); // Delete this line
+ }
+
+ public void setExecutor(final Executor executor)
+ {
+ this.executor = executor;
+ }
+
public void addClusterTopologyListener(final ClusterTopologyListener listener)
{
if (log.isDebugEnabled())
@@ -164,25 +172,32 @@
if (replaced)
{
+
+
final ArrayList<ClusterTopologyListener> copy = copyListeners();
-
- // Has to use a different thread otherwise we may get dead locks case the remove
is coming from the channel
- for (ClusterTopologyListener listener : copy)
+
+ execute(new Runnable()
{
- if (Topology.log.isTraceEnabled())
+ public void run()
{
- Topology.log.trace(this + " informing " + listener + "
about node up = " + nodeId);
- }
+ for (ClusterTopologyListener listener : copy)
+ {
+ if (Topology.log.isTraceEnabled())
+ {
+ Topology.log.trace(this + " informing " + listener +
" about node up = " + nodeId);
+ }
- try
- {
- listener.nodeUP(nodeId, member.getConnector(), last);
+ try
+ {
+ listener.nodeUP(nodeId, member.getConnector(), last);
+ }
+ catch (Throwable e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
}
- catch (Throwable e)
- {
- log.warn(e.getMessage(), e);
- }
- }
+ });
}
return replaced;
@@ -212,7 +227,7 @@
if (Topology.log.isDebugEnabled())
{
- Topology.log.debug("ZZZ removeMember " + this +
+ Topology.log.debug("removeMember " + this +
" removing nodeID=" +
nodeId +
", result=" +
@@ -225,17 +240,43 @@
{
final ArrayList<ClusterTopologyListener> copy = copyListeners();
- for (ClusterTopologyListener listener : copy)
+ execute(new Runnable()
{
- if (Topology.log.isTraceEnabled())
+ public void run()
{
- Topology.log.trace(this + " informing " + listener + "
about node down = " + nodeId);
+ for (ClusterTopologyListener listener : copy)
+ {
+ if (Topology.log.isTraceEnabled())
+ {
+ Topology.log.trace(this + " informing " + listener +
" about node down = " + nodeId);
+ }
+ try
+ {
+ listener.nodeDown(nodeId);
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
}
- listener.nodeDown(nodeId);
- }
+ });
+
}
return member != null;
}
+
+ protected void execute(final Runnable runnable)
+ {
+ if (executor != null)
+ {
+ executor.execute(runnable);
+ }
+ else
+ {
+ runnable.run();
+ }
+ }
/**
* it will send all the member updates to listeners, independently of being changed or
not
@@ -264,7 +305,7 @@
}
}
- public synchronized void sendTopology(final ClusterTopologyListener listener)
+ public void sendTopology(final ClusterTopologyListener listener)
{
int count = 0;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java 2011-08-12
15:10:56 UTC (rev 11198)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java 2011-08-13
05:48:57 UTC (rev 11199)
@@ -53,4 +53,10 @@
void pause() throws Exception;
void resume() throws Exception;
+
+ /**
+ * To be called when the server sent a disconnect to the client.
+ * Basically this is for cluster bridges being disconnected
+ */
+ void disconnect();
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java 2011-08-12
15:10:56 UTC (rev 11198)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java 2011-08-13
05:48:57 UTC (rev 11199)
@@ -33,6 +33,8 @@
Bridge getBridge();
void close() throws Exception;
+
+ void serverDisconnected();
boolean isClosed();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-08-12
15:10:56 UTC (rev 11198)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-08-13
05:48:57 UTC (rev 11199)
@@ -25,14 +25,14 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientProducer;
-import org.hornetq.api.core.client.ClientSession.BindingQuery;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.SendAcknowledgementHandler;
-import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.client.SessionFailureListener;
+import org.hornetq.api.core.client.ClientSession.BindingQuery;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
import org.hornetq.core.logging.Logger;
@@ -47,6 +47,7 @@
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.Future;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUID;
@@ -77,7 +78,7 @@
private static final SimpleString JMS_TOPIC_ADDRESS_PREFIX = new
SimpleString("jms.topic.");
- protected final ServerLocator serverLocator;
+ protected final ServerLocatorInternal serverLocator;
private final UUID nodeUUID;
@@ -138,7 +139,7 @@
// Public --------------------------------------------------------
- public BridgeImpl(final ServerLocator serverLocator,
+ public BridgeImpl(final ServerLocatorInternal serverLocator,
final int reconnectAttempts,
final long retryInterval,
final double retryMultiplier,
@@ -159,7 +160,7 @@
{
this.reconnectAttempts = reconnectAttempts;
-
+
this.reconnectAttemptsInUse = -1;
this.retryInterval = retryInterval;
@@ -199,7 +200,6 @@
{
this.notificationService = notificationService;
}
-
public synchronized void start() throws Exception
{
if (started)
@@ -278,6 +278,35 @@
}
}
+ public void disconnect()
+ {
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ if (session != null)
+ {
+ try
+ {
+ session.cleanUp(false);
+ }
+ catch (Exception dontcare)
+ {
+ log.debug(dontcare.getMessage(), dontcare);
+ }
+ session = null;
+ }
+ }
+ });
+ }
+
+ /** The cluster manager needs to use the same executor to close the serverLocator,
otherwise the stop will break.
+ * This method is intended to expose this executor to the ClusterManager */
+ public Executor getExecutor()
+ {
+ return executor;
+ }
+
public void stop() throws Exception
{
if (log.isDebugEnabled())
@@ -468,7 +497,7 @@
{
if (log.isDebugEnabled())
{
- log.debug(this + "::Ignoring reference on bridge as it is set to
iniactive ref=" + ref);
+ log.debug(this + "::Ignoring reference on bridge as it is set to
iniactive ref=" + ref);
}
return HandleStatus.BUSY;
}
@@ -521,9 +550,9 @@
public void connectionFailed(final HornetQException me, boolean failedOver)
{
-
+
log.warn(this + "::Connection failed with failedOver=" + failedOver +
"-" + me, me);
-
+
try
{
csf.cleanup();
@@ -539,12 +568,12 @@
catch (Throwable dontCare)
{
}
-
+
fail(me.getCode() == HornetQException.DISCONNECTED);
tryScheduleRetryReconnect(me.getCode());
}
-
+
protected void tryScheduleRetryReconnect(final int code)
{
scheduleRetryConnect();
@@ -552,8 +581,8 @@
public void beforeReconnect(final HornetQException exception)
{
-// log.warn(name + "::Connection failed before reconnect ", exception);
-// fail(false);
+ // log.warn(name + "::Connection failed before reconnect ", exception);
+ // fail(false);
}
// Package protected ---------------------------------------------
@@ -568,7 +597,15 @@
@Override
public String toString()
{
- return this.getClass().getSimpleName() + "@" +
Integer.toHexString(System.identityHashCode(this)) + " [name=" + name + ",
queue=" + queue + " targetConnector=" + this.serverLocator +
"]";
+ return this.getClass().getSimpleName() + "@" +
+ Integer.toHexString(System.identityHashCode(this)) +
+ " [name=" +
+ name +
+ ", queue=" +
+ queue +
+ " targetConnector=" +
+ this.serverLocator +
+ "]";
}
protected void fail(final boolean permanently)
@@ -590,7 +627,7 @@
log.debug(dontcare);
}
}
-
+
cancelRefs();
if (queue != null)
{
@@ -615,7 +652,6 @@
{
ClientSessionFactoryInternal csf =
(ClientSessionFactoryInternal)serverLocator.createSessionFactory();
csf.setReconnectAttempts(0);
- //csf.setInitialReconnectAttempts(1);
return csf;
}
@@ -625,7 +661,7 @@
BridgeImpl.log.debug("Connecting " + this + " to its destination
[" + nodeUUID.toString() + "], csf=" + this.csf);
retryCount++;
-
+
try
{
if (csf == null || csf.isClosed())
@@ -712,15 +748,14 @@
{
log.debug("Bridge " + this + " is unable to connect to
destination. Retrying", e);
}
+
+ scheduleRetryConnect();
}
}
catch (Exception e)
{
BridgeImpl.log.warn("Bridge " + this + " is unable to connect to
destination. It will be disabled.", e);
}
-
- scheduleRetryConnect();
-
}
protected void scheduleRetryConnect()
@@ -730,7 +765,7 @@
log.warn("ServerLocator was shutdown, can't retry on opening connection
for bridge");
return;
}
-
+
if (reconnectAttemptsInUse >= 0 && retryCount > reconnectAttempts)
{
log.warn("Bridge " + this.name +
@@ -752,9 +787,15 @@
{
timeout = maxRetryInterval;
}
-
- log.debug("Bridge " + this + " retrying connection #" +
retryCount + ", maxRetry=" + reconnectAttemptsInUse + ", timeout=" +
timeout);
+ log.debug("Bridge " + this +
+ " retrying connection #" +
+ retryCount +
+ ", maxRetry=" +
+ reconnectAttemptsInUse +
+ ", timeout=" +
+ timeout);
+
scheduleRetryConnectFixedTimeout(timeout);
}
@@ -770,13 +811,15 @@
{
}
}
-
+
if (log.isDebugEnabled())
{
log.debug("Scheduling retry for bridge " + this.name + " in
" + milliseconds + " milliseconds");
}
- futureScheduledReconnection = scheduledExecutor.schedule(new
FutureConnectRunnable(), milliseconds, TimeUnit.MILLISECONDS);
+ futureScheduledReconnection = scheduledExecutor.schedule(new
FutureConnectRunnable(),
+ milliseconds,
+ TimeUnit.MILLISECONDS);
}
// Inner classes -------------------------------------------------
@@ -788,12 +831,23 @@
try
{
log.debug("stopping bridge " + BridgeImpl.this);
-
+
+ queue.removeConsumer(BridgeImpl.this);
+
+ internalCancelReferences();
+
if (session != null)
{
log.debug("Cleaning up session " + session);
- session.close();
session.removeFailureListener(BridgeImpl.this);
+ try
+ {
+ session.close();
+ session = null;
+ }
+ catch (Exception dontcare)
+ {
+ }
}
if (csf != null)
@@ -815,10 +869,6 @@
{
log.trace("Removing consumer on stopRunnable " + this + "
from queue " + queue);
}
- queue.removeConsumer(BridgeImpl.this);
-
- internalCancelReferences();
-
log.info("stopped bridge " + name);
}
catch (Exception e)
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-08-12
15:10:56 UTC (rev 11198)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-08-13
05:48:57 UTC (rev 11199)
@@ -24,15 +24,16 @@
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
-import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.api.core.management.ResourceNames;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.BindingType;
+import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.cluster.ClusterConnection;
@@ -57,6 +58,8 @@
private final ClusterConnection clusterConnection;
+ private final ClusterManagerInternal clusterManager;
+
private final MessageFlowRecord flowRecord;
private final SimpleString managementAddress;
@@ -74,7 +77,8 @@
private final ServerLocatorInternal discoveryLocator;
public ClusterConnectionBridge(final ClusterConnection clusterConnection,
- final ServerLocator targetLocator,
+ final ClusterManagerInternal clusterManager,
+ final ServerLocatorInternal targetLocator,
final ServerLocatorInternal discoveryLocator,
final int reconnectAttempts,
final long retryInterval,
@@ -124,6 +128,8 @@
this.clusterConnection = clusterConnection;
+ this.clusterManager = clusterManager;
+
this.targetNodeID = targetNodeID;
this.managementAddress = managementAddress;
this.managementNotificationAddress = managementNotificationAddress;
@@ -132,13 +138,32 @@
// we need to disable DLQ check on the clustered bridges
queue.setInternalQueue(true);
-
+
if (log.isDebugEnabled())
{
- log.debug("Setting up bridge between " +
clusterConnection.getConnector() + " and " + targetLocator, new Exception
("trace"));
+ log.debug("Setting up bridge between " +
clusterConnection.getConnector() + " and " + targetLocator,
+ new Exception("trace"));
}
}
-
+
+ protected ClientSessionFactoryInternal createSessionFactory() throws Exception
+ {
+ ClientSessionFactoryInternal factory = super.createSessionFactory();
+ factory.getConnection().addFailureListener(new FailureListener()
+ {
+
+ public void connectionFailed(HornetQException exception, boolean failedOver)
+ {
+ if (exception.getCode() == HornetQException.DISCONNECTED)
+ {
+ flowRecord.serverDisconnected();
+ clusterManager.removeClusterLocator(serverLocator);
+ }
+ }
+ });
+ return factory;
+ }
+
@Override
protected ServerMessage beforeForward(ServerMessage message)
{
@@ -247,7 +272,7 @@
ClientMessage message = session.createMessage(false);
- log.debug("Requesting sendQueueInfoToQueue through " + this, new
Exception ("trace"));
+ log.debug("Requesting sendQueueInfoToQueue through " + this, new
Exception("trace"));
ManagementHelper.putOperationInvocation(message,
ResourceNames.CORE_SERVER,
"sendQueueInfoToQueue",
@@ -286,7 +311,6 @@
}
}
-
protected void fail(final boolean permanently)
{
log.debug("Cluster Bridge " + this.getName() + " failed,
permanently=" + permanently);
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-12
15:10:56 UTC (rev 11198)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-13
05:48:57 UTC (rev 11199)
@@ -33,12 +33,15 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.client.impl.Topology;
+import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
@@ -48,6 +51,7 @@
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.cluster.Bridge;
import org.hornetq.core.server.cluster.ClusterConnection;
+import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.cluster.MessageFlowRecord;
import org.hornetq.core.server.cluster.RemoteQueueBinding;
import org.hornetq.core.server.group.impl.Proposal;
@@ -136,9 +140,9 @@
private final Set<TransportConfiguration> allowableConnections = new
HashSet<TransportConfiguration>();
- private final ClusterManagerImpl manager;
+ private final ClusterManagerInternal manager;
- public ClusterConnectionImpl(final ClusterManagerImpl manager,
+ public ClusterConnectionImpl(final ClusterManagerInternal manager,
final Topology clusterManagerTopology,
final TransportConfiguration[] tcConfigs,
final TransportConfiguration connector,
@@ -657,38 +661,10 @@
final Queue queue,
final boolean start) throws Exception
{
- MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetNodeID, connector,
queueName, queue);
-
- Bridge bridge = createClusteredBridge(record);
-
- if (log.isDebugEnabled())
- {
- log.debug("creating record between " + this.connector + " and
" + connector + bridge);
- }
-
- record.setBridge(bridge);
-
- records.put(targetNodeID, record);
-
- if (start)
- {
- bridge.start();
- }
- }
-
- /**
- * @param record
- * @return
- * @throws Exception
- */
- protected Bridge createClusteredBridge(MessageFlowRecordImpl record) throws Exception
- {
-
- final ServerLocatorInternal targetLocator = new
ServerLocatorImpl(this.clusterManagerTopology,
+ Topology topology = new Topology(null);
+ final ServerLocatorInternal targetLocator = new ServerLocatorImpl(topology,
false,
-
server.getThreadPool(),
-
server.getScheduledPool(),
-
record.getConnector());
+ connector);
targetLocator.setReconnectAttempts(0);
@@ -700,7 +676,6 @@
targetLocator.setConfirmationWindowSize(confirmationWindowSize);
targetLocator.setBlockOnDurableSend(!useDuplicateDetection);
targetLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
- targetLocator.setClusterConnection(true);
targetLocator.setRetryInterval(retryInterval);
targetLocator.setMaxRetryInterval(maxRetryInterval);
@@ -714,10 +689,37 @@
{
targetLocator.setRetryInterval(retryInterval);
}
+
+ targetLocator.disableFinalizeCheck();
+
+ targetLocator.connect();
+
- manager.addClusterLocator(targetLocator);
+ MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator,
targetNodeID, connector, queueName, queue);
+ topology.setOwner(record);
+
+ // Establish a proxy to the main topology.
+ // We are going to listen for adds and removes on the bridges as well
+ topology.addClusterTopologyListener(new ClusterTopologyListener(){
+
+ public void nodeDown(String nodeID)
+ {
+ clusterManagerTopology.removeMember(nodeID);
+ }
+
+ public void nodeUP(String nodeID,
+ Pair<TransportConfiguration, TransportConfiguration>
connectorPair,
+ boolean last)
+ {
+ clusterManagerTopology.addMember(nodeID,new TopologyMember(connectorPair),
last);
+ }
+
+ });
+
+
ClusterConnectionBridge bridge = new ClusterConnectionBridge(this,
+ manager,
targetLocator,
serverLocator,
reconnectAttempts,
@@ -745,20 +747,36 @@
targetLocator.setIdentity("(Cluster-connection-bridge::" +
bridge.toString() + "::" + this.toString() + ")");
- return bridge;
+ if (log.isDebugEnabled())
+ {
+ log.debug("creating record between " + this.connector + " and
" + connector + bridge);
+ }
+
+ record.setBridge(bridge);
+
+ records.put(targetNodeID, record);
+
+ if (start)
+ {
+ bridge.start();
+ }
}
-
+
// Inner classes
-----------------------------------------------------------------------------------
private class MessageFlowRecordImpl implements MessageFlowRecord
{
- private Bridge bridge;
+ private BridgeImpl bridge;
private final String targetNodeID;
private final TransportConfiguration connector;
+
+ private final ServerLocatorInternal targetLocator;
private final SimpleString queueName;
+
+ private boolean disconnected = false;;
private final Queue queue;
@@ -768,11 +786,13 @@
private volatile boolean firstReset = false;
- public MessageFlowRecordImpl(final String targetNodeID,
+ public MessageFlowRecordImpl(final ServerLocatorInternal targetLocator,
+ final String targetNodeID,
final TransportConfiguration connector,
final SimpleString queueName,
final Queue queue)
{
+ this.targetLocator = targetLocator;
this.queue = queue;
this.targetNodeID = targetNodeID;
this.connector = connector;
@@ -798,6 +818,11 @@
firstReset +
"]";
}
+
+ public void serverDisconnected()
+ {
+ this.disconnected = true;
+ }
public String getAddress()
{
@@ -850,8 +875,35 @@
isClosed = true;
clearBindings();
+
+ if (disconnected)
+ {
+ bridge.disconnect();
+ }
bridge.stop();
+
+ bridge.getExecutor().execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ if (disconnected)
+ {
+ targetLocator.cleanup();
+ }
+ else
+ {
+ targetLocator.close();
+ }
+ }
+ catch (Exception ignored)
+ {
+ log.debug(ignored.getMessage(), ignored);
+ }
+ }
+ });
}
public boolean isClosed()
@@ -864,7 +916,7 @@
clearBindings();
}
- public void setBridge(final Bridge bridge)
+ public void setBridge(final BridgeImpl bridge)
{
this.bridge = bridge;
}
@@ -1306,8 +1358,6 @@
}
return new ServerLocatorImpl(clusterManagerTopology,
true,
- server.getThreadPool(),
- server.getScheduledPool(),
tcConfigs);
}
else
@@ -1340,8 +1390,6 @@
{
return new ServerLocatorImpl(clusterManagerTopology,
true,
- server.getThreadPool(),
- server.getScheduledPool(),
dg);
}
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-12
15:10:56 UTC (rev 11198)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-13
05:48:57 UTC (rev 11199)
@@ -36,7 +36,6 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.client.impl.TopologyMember;
@@ -53,7 +52,6 @@
import org.hornetq.core.server.cluster.Bridge;
import org.hornetq.core.server.cluster.BroadcastGroup;
import org.hornetq.core.server.cluster.ClusterConnection;
-import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.cluster.Transformer;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.utils.ConcurrentHashSet;
@@ -69,7 +67,7 @@
*
*
*/
-public class ClusterManagerImpl implements ClusterManager
+public class ClusterManagerImpl implements ClusterManagerInternal
{
private static final Logger log = Logger.getLogger(ClusterManagerImpl.class);
@@ -126,6 +124,8 @@
this.executorFactory = executorFactory;
executor = executorFactory.getExecutor();
+
+ topology.setExecutor(executorFactory.getExecutor());
this.server = server;
@@ -482,10 +482,15 @@
}
}
- void addClusterLocator(final ServerLocatorInternal serverLocator)
+ public void addClusterLocator(final ServerLocatorInternal serverLocator)
{
this.clusterLocators.add(serverLocator);
}
+
+ public void removeClusterLocator(final ServerLocatorInternal serverLocator)
+ {
+ this.clusterLocators.remove(serverLocator);
+ }
private synchronized void announceNode()
{
Added:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java
(rev 0)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java 2011-08-13
05:48:57 UTC (rev 11199)
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.cluster.impl;
+
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.server.cluster.ClusterManager;
+
+/**
+ * A ClusterManagerInternal
+ *
+ * @author clebert
+ *
+ *
+ */
+public interface ClusterManagerInternal extends ClusterManager
+{
+ void addClusterLocator(ServerLocatorInternal locator);
+
+ void removeClusterLocator(ServerLocatorInternal locator);
+
+}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-08-12
15:10:56 UTC (rev 11198)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-08-13
05:48:57 UTC (rev 11199)
@@ -523,7 +523,7 @@
queue.isTemporary(),
filterString,
queue.getConsumerCount(),
- queue.getMessageCount());
+ queue.getInstantMessageCount());
}
// make an exception for the management address (see HORNETQ-29)
else if (name.equals(managementAddress))
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-12
15:10:56 UTC (rev 11198)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-13
05:48:57 UTC (rev 11199)
@@ -111,7 +111,7 @@
}
locators = new ServerLocator[ClusterTestBase.MAX_SERVERS];
-
+
// To make sure the test will start with a clean VM
forceGC();
@@ -146,8 +146,6 @@
consumers = new ConsumerHolder[ClusterTestBase.MAX_CONSUMERS];
-
-
nodeManagers = null;
super.tearDown();
@@ -195,7 +193,7 @@
{
return consumers[node].consumer;
}
-
+
protected void waitForMessages(final int node, final String address, final int count)
throws Exception
{
HornetQServer server = servers[node];
@@ -247,37 +245,41 @@
throw new IllegalStateException(msg);
}
-
+
protected void waitForTopology(final HornetQServer server, final int nodes) throws
Exception
{
waitForTopology(server, nodes, WAIT_TIMEOUT);
}
-
+
protected void waitForTopology(final HornetQServer server, final int nodes, final long
timeout) throws Exception
{
log.debug("waiting for " + nodes + " on the topology for server =
" + server);
+ long start = System.currentTimeMillis();
- long start = System.currentTimeMillis();
-
Topology topology = server.getClusterManager().getTopology();
do
{
if (nodes == topology.getMembers().size())
{
- return;
+ return;
}
Thread.sleep(10);
}
while (System.currentTimeMillis() - start < timeout);
-
- String msg = "Timed out waiting for cluster topology of " + nodes +
" (received " + topology.getMembers().size() + ") topology = " +
topology + ")";
+ String msg = "Timed out waiting for cluster topology of " + nodes +
+ " (received " +
+ topology.getMembers().size() +
+ ") topology = " +
+ topology +
+ ")";
+
ClusterTestBase.log.error(msg);
-
- throw new Exception (msg);
+
+ throw new Exception(msg);
}
protected void waitForBindings(final int node,
@@ -340,9 +342,15 @@
}
while (System.currentTimeMillis() - start < ClusterTestBase.WAIT_TIMEOUT);
- String msg = "Timed out waiting for bindings (bindingCount = " +
bindingCount + " (expecting " + expectedBindingCount + ") "+
+ String msg = "Timed out waiting for bindings (bindingCount = " +
bindingCount +
+ " (expecting " +
+ expectedBindingCount +
+ ") " +
", totConsumers = " +
- totConsumers + " (expecting " + expectedConsumerCount +
")" +
+ totConsumers +
+ " (expecting " +
+ expectedConsumerCount +
+ ")" +
")";
ClusterTestBase.log.error(msg);
@@ -364,7 +372,7 @@
StringWriter writer = new StringWriter();
PrintWriter out = new PrintWriter(writer);
-
+
try
{
for (HornetQServer hornetQServer : servers)
@@ -372,35 +380,38 @@
if (hornetQServer != null)
{
out.println(clusterDescription(hornetQServer));
- out.println(debugBindings(hornetQServer,
hornetQServer.getConfiguration().getManagementNotificationAddress().toString()));
+ out.println(debugBindings(hornetQServer, hornetQServer.getConfiguration()
+
.getManagementNotificationAddress()
+ .toString()));
}
}
-
+
for (HornetQServer hornetQServer : servers)
{
out.println("Management bindings on " + hornetQServer);
if (hornetQServer != null)
{
- out.println(debugBindings(hornetQServer,
hornetQServer.getConfiguration().getManagementNotificationAddress().toString()));
+ out.println(debugBindings(hornetQServer, hornetQServer.getConfiguration()
+
.getManagementNotificationAddress()
+ .toString()));
}
}
}
catch (Throwable dontCare)
{
}
-
+
logAndSystemOut(writer.toString());
-
+
throw new IllegalStateException(msg);
}
-
-
+
protected String debugBindings(final HornetQServer server, final String address)
throws Exception
{
-
+
StringWriter str = new StringWriter();
PrintWriter out = new PrintWriter(str);
-
+
if (server == null)
{
return "server is shutdown";
@@ -414,7 +425,7 @@
Bindings bindings = po.getBindingsForAddress(new SimpleString(address));
out.println("=======================================================================");
- out.println("Binding information for address = " + address + " on
" + server);
+ out.println("Binding information for address = " + address + " on
" + server);
for (Binding binding : bindings.getBindings())
{
@@ -423,7 +434,7 @@
out.println("Binding = " + qBinding + ", queue=" +
qBinding.getQueue());
}
out.println("=======================================================================");
-
+
return str.toString();
}
@@ -449,7 +460,7 @@
{
filterString = ClusterTestBase.FILTER_PROP.toString() + "='" +
filterVal + "'";
}
-
+
log.info("Creating " + queueName + " , address " + address +
" on " + servers[node]);
session.createQueue(address, queueName, filterString, durable);
@@ -538,7 +549,7 @@
if (holder != null)
{
holder.consumer.close();
- // holder.session.close();
+ // holder.session.close();
consumers[i] = null;
}
@@ -621,13 +632,13 @@
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
producer.send(message);
-
+
if (i % 100 == 0)
{
session.commit();
}
}
-
+
session.commit();
}
finally
@@ -851,10 +862,9 @@
for (int j = msgStart; j < msgEnd; j++)
{
-
+
ClientMessage message = holder.consumer.receive(WAIT_TIMEOUT);
-
-
+
if (message == null)
{
ClusterTestBase.log.info("*** dumping consumers:");
@@ -863,9 +873,9 @@
Assert.fail("consumer " + consumerID + " did not receive
message " + j);
}
-
- log.info("msg on ClusterTestBase = " + message);
+ log.info("msg on ClusterTestBase = " + message);
+
if (ack)
{
message.acknowledge();
@@ -880,15 +890,17 @@
{
if (firstOutOfOrderMessage == null)
{
- firstOutOfOrderMessage = "expected " + j + " received
" + message.getObjectProperty(ClusterTestBase.COUNT_PROP);
+ firstOutOfOrderMessage = "expected " + j +
+ " received " +
+
message.getObjectProperty(ClusterTestBase.COUNT_PROP);
}
outOfOrder = true;
System.out.println("Message j=" + j +
" was received out of order = " +
message.getObjectProperty(ClusterTestBase.COUNT_PROP));
log.info("Message j=" + j +
- " was received out of order = " +
-
message.getObjectProperty(ClusterTestBase.COUNT_PROP));
+ " was received out of order = " +
+ message.getObjectProperty(ClusterTestBase.COUNT_PROP));
}
}
}
@@ -908,7 +920,7 @@
}
}
}
-
+
protected String clusterDescription(HornetQServer server)
{
String br = "-------------------------\n";
@@ -990,28 +1002,28 @@
if (consumerIDs[count] >= 0)
{
ConsumerHolder holder = consumers[consumerIDs[count]];
-
+
if (holder == null)
{
throw new IllegalArgumentException("No consumer at " +
consumerIDs[i]);
}
-
+
ClientMessage message = holder.consumer.receive(WAIT_TIMEOUT);
-
+
message.acknowledge();
-
+
consumers[consumerIDs[count]].session.commit();
-
+
System.out.println("Msg: " + message);
-
+
Assert.assertNotNull("consumer " + consumerIDs[count] + " did
not receive message " + i, message);
-
+
Assert.assertEquals("consumer " + consumerIDs[count] + "
message " + i,
i,
message.getObjectProperty(ClusterTestBase.COUNT_PROP));
}
-
+
count++;
if (count == consumerIDs.length)
@@ -1277,7 +1289,7 @@
{
res[j++] = i;
}
-
+
if (ack)
{
// just to flush acks
@@ -1340,11 +1352,11 @@
locators[node].setBlockOnDurableSend(true);
ClientSessionFactory sf = locators[node].createSessionFactory();
- sf.createSession().close();
+ ClientSession session = sf.createSession();
+ session.close();
sfs[node] = sf;
}
-
protected void setupSessionFactory(final int node, final boolean netty, int
reconnectAttempts) throws Exception
{
if (sfs[node] != null)
@@ -1395,7 +1407,6 @@
serverTotc = new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY,
params);
}
-
locators[node] = HornetQClient.createServerLocatorWithHA(serverTotc);
locators[node].setRetryInterval(100);
locators[node].setRetryIntervalMultiplier(1d);
@@ -1431,79 +1442,79 @@
final boolean fileStorage,
final boolean sharedStorage,
final boolean netty)
+ {
+ if (servers[node] != null)
{
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " +
node);
- }
+ throw new IllegalArgumentException("Already a server at node " +
node);
+ }
- Configuration configuration = createBasicConfig();
+ Configuration configuration = createBasicConfig();
- configuration.setSecurityEnabled(false);
- configuration.setJournalMinFiles(2);
- configuration.setJournalMaxIO_AIO(1000);
- configuration.setJournalFileSize(100 * 1024);
- configuration.setJournalType(getDefaultJournalType());
- configuration.setSharedStore(sharedStorage);
- configuration.setThreadPoolMaxSize(10);
+ configuration.setSecurityEnabled(false);
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalMaxIO_AIO(1000);
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setJournalType(getDefaultJournalType());
+ configuration.setSharedStore(sharedStorage);
+ configuration.setThreadPoolMaxSize(10);
+ if (sharedStorage)
+ {
+ // Shared storage will share the node between the backup and live node
+ configuration.setBindingsDirectory(getBindingsDir(node, false));
+ configuration.setJournalDirectory(getJournalDir(node, false));
+ configuration.setPagingDirectory(getPageDir(node, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ }
+ else
+ {
+ configuration.setBindingsDirectory(getBindingsDir(node, true));
+ configuration.setJournalDirectory(getJournalDir(node, true));
+ configuration.setPagingDirectory(getPageDir(node, true));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ }
+ configuration.setClustered(true);
+ configuration.setJournalCompactMinFiles(0);
+
+ configuration.getAcceptorConfigurations().clear();
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty,
true, generateParams(node,
+
netty)));
+
+ HornetQServer server;
+
+ if (fileStorage)
+ {
if (sharedStorage)
{
- // Shared storage will share the node between the backup and live node
- configuration.setBindingsDirectory(getBindingsDir(node, false));
- configuration.setJournalDirectory(getJournalDir(node, false));
- configuration.setPagingDirectory(getPageDir(node, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ server = createInVMFailoverServer(true, configuration, nodeManagers[node],
node);
}
else
{
- configuration.setBindingsDirectory(getBindingsDir(node, true));
- configuration.setJournalDirectory(getJournalDir(node, true));
- configuration.setPagingDirectory(getPageDir(node, true));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ server = HornetQServers.newHornetQServer(configuration);
+ server.setIdentity("Server " + node);
}
- configuration.setClustered(true);
- configuration.setJournalCompactMinFiles(0);
-
- configuration.getAcceptorConfigurations().clear();
-
configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true,
generateParams(node, netty)));
-
- HornetQServer server;
-
- if (fileStorage)
+ }
+ else
+ {
+ if (sharedStorage)
{
- if (sharedStorage)
- {
- server = createInVMFailoverServer(true, configuration, nodeManagers[node],
node);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration);
- server.setIdentity("Server " + node);
- }
+ server = createInVMFailoverServer(false, configuration, nodeManagers[node],
node);
}
else
{
- if (sharedStorage)
- {
- server = createInVMFailoverServer(false, configuration,
nodeManagers[node], node);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- server.setIdentity("Server " + node);
- }
+ server = HornetQServers.newHornetQServer(configuration, false);
+ server.setIdentity("Server " + node);
}
-
- server.setIdentity(this.getClass().getSimpleName() + "/Live(" + node +
")");
- servers[node] = server;
}
+ server.setIdentity(this.getClass().getSimpleName() + "/Live(" + node +
")");
+ servers[node] = server;
+ }
- protected void setupBackupServer(final int node,
- final int liveNode,
- final boolean fileStorage,
- final boolean sharedStorage,
- final boolean netty)
+ protected void setupBackupServer(final int node,
+ final int liveNode,
+ final boolean fileStorage,
+ final boolean sharedStorage,
+ final boolean netty)
{
if (servers[node] != null)
{
@@ -1540,7 +1551,7 @@
configuration.getAcceptorConfigurations().clear();
TransportConfiguration acceptorConfig = createTransportConfiguration(netty, true,
generateParams(node, netty));
configuration.getAcceptorConfigurations().add(acceptorConfig);
- //add backup connector
+ // add backup connector
TransportConfiguration liveConfig = createTransportConfiguration(netty, false,
generateParams(liveNode, netty));
configuration.getConnectorConfigurations().put(liveConfig.getName(), liveConfig);
TransportConfiguration backupConfig = createTransportConfiguration(netty, false,
generateParams(node, netty));
@@ -1577,174 +1588,183 @@
}
protected void setupLiveServerWithDiscovery(final int node,
- final String groupAddress,
- final int port,
- final boolean fileStorage,
- final boolean netty,
- final boolean sharedStorage)
- {
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " +
node);
- }
+ final String groupAddress,
+ final int port,
+ final boolean fileStorage,
+ final boolean netty,
+ final boolean sharedStorage)
+ {
+ if (servers[node] != null)
+ {
+ throw new IllegalArgumentException("Already a server at node " +
node);
+ }
- Configuration configuration = createBasicConfig();
+ Configuration configuration = createBasicConfig();
- configuration.setSecurityEnabled(false);
- configuration.setBindingsDirectory(getBindingsDir(node, false));
- configuration.setJournalMinFiles(2);
- configuration.setJournalDirectory(getJournalDir(node, false));
- configuration.setJournalFileSize(100 * 1024);
- configuration.setJournalType(getDefaultJournalType());
- configuration.setJournalMaxIO_AIO(1000);
- configuration.setPagingDirectory(getPageDir(node, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
- configuration.setClustered(true);
- configuration.setBackup(false);
+ configuration.setSecurityEnabled(false);
+ configuration.setBindingsDirectory(getBindingsDir(node, false));
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalDirectory(getJournalDir(node, false));
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setJournalType(getDefaultJournalType());
+ configuration.setJournalMaxIO_AIO(1000);
+ configuration.setPagingDirectory(getPageDir(node, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ configuration.setClustered(true);
+ configuration.setBackup(false);
- configuration.getAcceptorConfigurations().clear();
+ configuration.getAcceptorConfigurations().clear();
- Map<String, Object> params = generateParams(node, netty);
+ Map<String, Object> params = generateParams(node, netty);
- configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty,
true, params));
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty,
true, params));
- TransportConfiguration connector = createTransportConfiguration(netty, false,
params);
- configuration.getConnectorConfigurations().put(connector.getName(), connector);
+ TransportConfiguration connector = createTransportConfiguration(netty, false,
params);
+ configuration.getConnectorConfigurations().put(connector.getName(), connector);
- List<String> connectorPairs = new ArrayList<String>();
- connectorPairs.add(connector.getName());
+ List<String> connectorPairs = new ArrayList<String>();
+ connectorPairs.add(connector.getName());
- BroadcastGroupConfiguration bcConfig = new
BroadcastGroupConfiguration("bg1",
- null,
- -1,
-
groupAddress,
- port,
- 1000,
-
connectorPairs);
+ BroadcastGroupConfiguration bcConfig = new
BroadcastGroupConfiguration("bg1",
+ null,
+ -1,
+
groupAddress,
+ port,
+ 1000,
+
connectorPairs);
- configuration.getBroadcastGroupConfigurations().add(bcConfig);
+ configuration.getBroadcastGroupConfigurations().add(bcConfig);
- DiscoveryGroupConfiguration dcConfig = new
DiscoveryGroupConfiguration("dg1", null, groupAddress, port, 5000, 5000);
+ DiscoveryGroupConfiguration dcConfig = new
DiscoveryGroupConfiguration("dg1",
+ null,
+
groupAddress,
+ port,
+ 5000,
+ 5000);
- configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(),
dcConfig);
+ configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
- HornetQServer server;
- if (fileStorage)
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(true, configuration, nodeManagers[node],
node);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration);
- server.setIdentity("Server " + node);
- }
- }
- else
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(false, configuration, nodeManagers[node],
node);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- server.setIdentity("Server " + node);
- }
- }
- servers[node] = server;
- }
+ HornetQServer server;
+ if (fileStorage)
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(true, configuration, nodeManagers[node],
node);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration);
+ server.setIdentity("Server " + node);
+ }
+ }
+ else
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(false, configuration, nodeManagers[node],
node);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration, false);
+ server.setIdentity("Server " + node);
+ }
+ }
+ servers[node] = server;
+ }
protected void setupBackupServerWithDiscovery(final int node,
- final int liveNode,
- final String groupAddress,
- final int port,
- final boolean fileStorage,
- final boolean netty,
- final boolean sharedStorage)
- {
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " +
node);
- }
+ final int liveNode,
+ final String groupAddress,
+ final int port,
+ final boolean fileStorage,
+ final boolean netty,
+ final boolean sharedStorage)
+ {
+ if (servers[node] != null)
+ {
+ throw new IllegalArgumentException("Already a server at node " +
node);
+ }
- Configuration configuration = createBasicConfig();
+ Configuration configuration = createBasicConfig();
- configuration.setSecurityEnabled(false);
- configuration.setSharedStore(sharedStorage);
- if (sharedStorage)
- {
- // Shared storage will share the node between the backup and live node
- configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
- configuration.setJournalDirectory(getJournalDir(liveNode, false));
- configuration.setPagingDirectory(getPageDir(liveNode, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode,
false));
- }
- else
- {
- configuration.setBindingsDirectory(getBindingsDir(node, true));
- configuration.setJournalDirectory(getJournalDir(node, true));
- configuration.setPagingDirectory(getPageDir(node, true));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
- }
- configuration.setClustered(true);
- configuration.setBackup(true);
+ configuration.setSecurityEnabled(false);
+ configuration.setSharedStore(sharedStorage);
+ if (sharedStorage)
+ {
+ // Shared storage will share the node between the backup and live node
+ configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
+ configuration.setJournalDirectory(getJournalDir(liveNode, false));
+ configuration.setPagingDirectory(getPageDir(liveNode, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode, false));
+ }
+ else
+ {
+ configuration.setBindingsDirectory(getBindingsDir(node, true));
+ configuration.setJournalDirectory(getJournalDir(node, true));
+ configuration.setPagingDirectory(getPageDir(node, true));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ }
+ configuration.setClustered(true);
+ configuration.setBackup(true);
- configuration.getAcceptorConfigurations().clear();
+ configuration.getAcceptorConfigurations().clear();
- Map<String, Object> params = generateParams(node, netty);
+ Map<String, Object> params = generateParams(node, netty);
- configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty,
true, params));
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty,
true, params));
- TransportConfiguration connector = createTransportConfiguration(netty, false,
params);
- configuration.getConnectorConfigurations().put(connector.getName(), connector);
+ TransportConfiguration connector = createTransportConfiguration(netty, false,
params);
+ configuration.getConnectorConfigurations().put(connector.getName(), connector);
- List<String> connectorPairs = new ArrayList<String>();
- connectorPairs.add(connector.getName());
+ List<String> connectorPairs = new ArrayList<String>();
+ connectorPairs.add(connector.getName());
- BroadcastGroupConfiguration bcConfig = new
BroadcastGroupConfiguration("bg1",
- null,
- -1,
-
groupAddress,
- port,
- 1000,
-
connectorPairs);
+ BroadcastGroupConfiguration bcConfig = new
BroadcastGroupConfiguration("bg1",
+ null,
+ -1,
+
groupAddress,
+ port,
+ 1000,
+
connectorPairs);
- configuration.getBroadcastGroupConfigurations().add(bcConfig);
+ configuration.getBroadcastGroupConfigurations().add(bcConfig);
- DiscoveryGroupConfiguration dcConfig = new
DiscoveryGroupConfiguration("dg1", null, groupAddress, port, 5000, 5000);
+ DiscoveryGroupConfiguration dcConfig = new
DiscoveryGroupConfiguration("dg1",
+ null,
+
groupAddress,
+ port,
+ 5000,
+ 5000);
- configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(),
dcConfig);
+ configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
- HornetQServer server;
- if (fileStorage)
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(true, configuration,
nodeManagers[liveNode], liveNode);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration);
- server.setIdentity("Server " + liveNode);
- }
- }
- else
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(false, configuration,
nodeManagers[liveNode], liveNode);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- }
- }
- servers[node] = server;
- }
+ HornetQServer server;
+ if (fileStorage)
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(true, configuration,
nodeManagers[liveNode], liveNode);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration);
+ server.setIdentity("Server " + liveNode);
+ }
+ }
+ else
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(false, configuration,
nodeManagers[liveNode], liveNode);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration, false);
+ }
+ }
+ servers[node] = server;
+ }
-
protected void clearServer(final int... nodes)
{
for (int i = 0; i < nodes.length; i++)
@@ -1781,12 +1801,12 @@
{
throw new IllegalStateException("No server at node " + nodeFrom);
}
-
+
TransportConfiguration connectorFrom = createTransportConfiguration(netty, false,
generateParams(nodeFrom, netty));
serverFrom.getConfiguration().getConnectorConfigurations().put(name,
connectorFrom);
List<String> pairs = null;
-
+
if (nodeTo != -1)
{
TransportConfiguration serverTotc = createTransportConfiguration(netty, false,
generateParams(nodeTo, netty));
@@ -1803,11 +1823,11 @@
forwardWhenNoConsumers,
maxHops,
1024,
-
pairs, allowDirectConnectionsOnly);
+
pairs,
+
allowDirectConnectionsOnly);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
-
protected void setupClusterConnection(final String name,
final String address,
final boolean forwardWhenNoConsumers,
@@ -1825,7 +1845,7 @@
TransportConfiguration connectorFrom = createTransportConfiguration(netty, false,
generateParams(nodeFrom, netty));
serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(),
connectorFrom);
-
+
List<String> pairs = new ArrayList<String>();
for (int element : nodesTo)
{
@@ -1863,7 +1883,7 @@
TransportConfiguration connectorFrom = createTransportConfiguration(netty, false,
generateParams(nodeFrom, netty));
serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(),
connectorFrom);
-
+
List<String> pairs = new ArrayList<String>();
for (int element : nodesTo)
{
@@ -1872,21 +1892,21 @@
pairs.add(serverTotc.getName());
}
ClusterConnectionConfiguration clusterConf = new
ClusterConnectionConfiguration(name,
- address,
- connectorFrom.getName(),
- ConfigurationImpl.DEFAULT_CLUSTER_FAILURE_CHECK_PERIOD,
- ConfigurationImpl.DEFAULT_CLUSTER_CONNECTION_TTL,
- retryInterval,
- ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER,
- ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
- reconnectAttempts,
- 1000,
- true,
- forwardWhenNoConsumers,
- maxHops,
- 1024,
- pairs,
- false);
+
address,
+
connectorFrom.getName(),
+
ConfigurationImpl.DEFAULT_CLUSTER_FAILURE_CHECK_PERIOD,
+
ConfigurationImpl.DEFAULT_CLUSTER_CONNECTION_TTL,
+
retryInterval,
+
ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER,
+
ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
+
reconnectAttempts,
+
1000,
+
true,
+
forwardWhenNoConsumers,
+
maxHops,
+
1024,
+
pairs,
+
false);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1906,18 +1926,19 @@
final int maxHops,
TransportConfiguration
connectorFrom,
List<String>
pairs)
- {
- ClusterConnectionConfiguration clusterConf = new
ClusterConnectionConfiguration(name,
-
address,
-
connectorFrom.getName(),
-
250,
-
true,
-
forwardWhenNoConsumers,
-
maxHops,
-
1024,
-
pairs, false);
- return clusterConf;
- }
+ {
+ ClusterConnectionConfiguration clusterConf = new
ClusterConnectionConfiguration(name,
+
address,
+
connectorFrom.getName(),
+
250,
+
true,
+
forwardWhenNoConsumers,
+
maxHops,
+
1024,
+
pairs,
+
false);
+ return clusterConf;
+ }
protected void setupClusterConnectionWithBackups(final String name,
final String address,
@@ -1953,7 +1974,8 @@
forwardWhenNoConsumers,
maxHops,
1024,
-
pairs, false);
+
pairs,
+
false);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1975,7 +1997,7 @@
TransportConfiguration connectorConfig = createTransportConfiguration(netty, false,
generateParams(node, netty));
server.getConfiguration().getConnectorConfigurations().put(name, connectorConfig);
-
+
ClusterConnectionConfiguration clusterConf = new
ClusterConnectionConfiguration(name,
address,
name,
@@ -2011,22 +2033,20 @@
* */
Thread.sleep(500);
}
-
-
+
}
- protected void waitForServer(HornetQServer server)
- throws InterruptedException
+ protected void waitForServer(HornetQServer server) throws InterruptedException
{
- long timetowait =System.currentTimeMillis() + 5000;
- while(!server.isStarted())
+ long timetowait = System.currentTimeMillis() + 5000;
+ while (!server.isStarted())
{
Thread.sleep(100);
- if(server.isStarted())
+ if (server.isStarted())
{
break;
}
- else if(System.currentTimeMillis() > timetowait)
+ else if (System.currentTimeMillis() > timetowait)
{
fail("server didnt start");
}
@@ -2049,7 +2069,7 @@
protected void stopServers(final int... nodes) throws Exception
{
- log.info("Stopping nodes " + Arrays.toString(nodes));
+ log.info("Stopping nodes " + Arrays.toString(nodes));
for (int node : nodes)
{
log.info("#test stop server " + node);
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterTest.java 2011-08-12
15:10:56 UTC (rev 11198)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterTest.java 2011-08-13
05:48:57 UTC (rev 11199)
@@ -13,6 +13,8 @@
package org.hornetq.tests.integration.cluster.distribution;
+import org.hornetq.core.logging.Logger;
+
/**
* A NettyFileStorageSymmetricClusterTest
*
@@ -22,17 +24,17 @@
*/
public class NettyFileStorageSymmetricClusterTest extends SymmetricClusterTest
{
+ Logger log = Logger.getLogger(NettyFileStorageSymmetricClusterTest.class);
+
@Override
protected boolean isNetty()
{
return true;
}
-
-
+
protected boolean isFileStorage()
{
return true;
}
-
}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2011-08-12
15:10:56 UTC (rev 11198)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2011-08-13
05:48:57 UTC (rev 11199)
@@ -242,12 +242,6 @@
waitForBindings(3, "queues.testaddress", 1, 1, true);
waitForBindings(4, "queues.testaddress", 1, 1, true);
- System.out.println(clusterDescription(servers[0]));
- System.out.println(clusterDescription(servers[1]));
- System.out.println(clusterDescription(servers[2]));
- System.out.println(clusterDescription(servers[3]));
- System.out.println(clusterDescription(servers[4]));
-
waitForBindings(0, "queues.testaddress", 4, 4, false);
waitForBindings(1, "queues.testaddress", 4, 4, false);
waitForBindings(2, "queues.testaddress", 4, 4, false);
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-12
15:10:56 UTC (rev 11198)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-13
05:48:57 UTC (rev 11199)
@@ -980,7 +980,7 @@
for (Thread aliveThread : postThreads.keySet())
{
- if (!aliveThread.getName().contains("SunPKCS11") &&
!previousThreads.containsKey(aliveThread))
+ if (!aliveThread.getName().contains("SunPKCS11") &&
!aliveThread.getName().contains("Attach Listener") &&
!previousThreads.containsKey(aliveThread))
{
failedThread = true;
buffer.append("=============================================================================\n");