Author: clebert.suconic@jboss.com
Date: 2011-08-30 13:13:32 -0400 (Tue, 30 Aug 2011)
New Revision: 11244
Added:
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/AfterConnectInternalListener.java
Modified:
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
Log:
Adding AfterConnectInternalListener to my branch, and adjusting versioning
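
A minimal wiring sketch (not part of this commit; the example package and class names below are made up for illustration) of how the new listener is used, based on the changes in this revision: ClusterConnectionImpl registers itself on its ServerLocatorInternal through setAfterConnectionInternalListener(...), and ClientSessionFactoryImpl calls onConnection(...) right after a connection is established, which is where the cluster connection now sends its node announce.

package org.hornetq.examples; // hypothetical package, illustration only

import org.hornetq.core.client.impl.AfterConnectInternalListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;

public class AfterConnectWiringSketch
{
   // Registers a listener the same way ClusterConnectionImpl does in this revision,
   // both on its own serverLocator and on each per-node targetLocator.
   public static void register(final ServerLocatorInternal locator)
   {
      locator.setAfterConnectionInternalListener(new AfterConnectInternalListener()
      {
         public void onConnection(final ClientSessionFactoryInternal sf)
         {
            // ClusterConnectionImpl.onConnection(...) calls sf.sendNodeAnnounce(...)
            // here with the local member's uniqueEventID, node id and connector pair.
         }
      });
   }
}
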
Added: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/AfterConnectInternalListener.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/AfterConnectInternalListener.java (rev 0)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/AfterConnectInternalListener.java 2011-08-30 17:13:32 UTC (rev 11244)
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+/**
+ * To be called right after the ConnectionFactory creates a connection.
+ * This listener is not part of the public API and shouldn't be used by users
+ * (if you do, we can't guarantee any API compatibility for this class).
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public interface AfterConnectInternalListener
+{
+ void onConnection(ClientSessionFactoryInternal sf);
+}
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-08-30 17:13:32 UTC (rev 11244)
@@ -1293,23 +1293,15 @@
}
channel0.send(new SubscribeClusterTopologyUpdatesMessageV2(serverLocator.isClusterConnection(), VersionLoader.getVersion().getIncrementingVersion()));
-
-
- if (serverLocator.isClusterConnection())
- {
- TransportConfiguration config = serverLocator.getClusterTransportConfiguration();
- if (ClientSessionFactoryImpl.isDebug)
- {
- ClientSessionFactoryImpl.log.debug("Announcing node " + serverLocator.getNodeID() +
- ", isBackup=" +
- serverLocator.isBackup());
- }
- sendNodeAnnounce(System.currentTimeMillis(), serverLocator.getNodeID(), serverLocator.isBackup(), config, null);
- //channel0.send(new NodeAnnounceMessage(serverLocator.getNodeID(), serverLocator.isBackup(), config));
- }
+
}
}
+ if (serverLocator.getAfterConnectInternalListener() != null)
+ {
+ serverLocator.getAfterConnectInternalListener().onConnection(this);
+ }
+
if (ClientSessionFactoryImpl.log.isTraceEnabled())
{
ClientSessionFactoryImpl.log.trace("returning " + connection);
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-08-30 17:13:32 UTC (rev 11244)
@@ -165,6 +165,8 @@
private Executor startExecutor;
private static ScheduledExecutorService globalScheduledThreadPool;
+
+ private AfterConnectInternalListener afterConnectListener;
private String groupID;
@@ -578,6 +580,19 @@
return sf;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.client.impl.ServerLocatorInternal#setAfterConnectionInternalListener(org.hornetq.core.client.impl.AfterConnectInternalListener)
+ */
+ public void setAfterConnectionInternalListener(AfterConnectInternalListener listener)
+ {
+ this.afterConnectListener = listener;
+ }
+
+ public AfterConnectInternalListener getAfterConnectInternalListener()
+ {
+ return afterConnectListener;
+ }
+
public boolean isClosed()
{
return closed || closing;
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-08-30 17:13:32 UTC (rev 11244)
@@ -35,6 +35,10 @@
void factoryClosed(final ClientSessionFactory factory);
+ AfterConnectInternalListener getAfterConnectInternalListener();
+
+ void setAfterConnectionInternalListener(AfterConnectInternalListener listener);
+
/** Used to better identify Cluster Connection Locators on logs while debugging logs
*/
void setIdentity(String identity);
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-30 17:13:32 UTC (rev 11244)
@@ -64,7 +64,7 @@
* values are a pair of live/backup transport configurations
*/
private final Map<String, TopologyMember> mapTopology = new ConcurrentHashMap<String, TopologyMember>();
-
+
private final Map<String, Long> mapDelete = new ConcurrentHashMap<String, Long>();
public Topology(final Object owner)
@@ -132,14 +132,14 @@
synchronized (this)
{
- // TODO treat versioning here. it should remove any previous version
- // However, if the previous version has a higher time (say if the node time where the system died), we should
- // use that number ++
-
TopologyMember currentMember = getMember(nodeId);
if (currentMember == null)
{
- log.warn("There's no live to be updated on backup update", new
Exception("trace"));
+ log.warn("There's no live to be updated on backup update,
node=" + nodeId + " memberInput=" + memberInput,
+ new Exception("trace"));
+
+ currentMember = memberInput;
+ mapTopology.put(nodeId, currentMember);
}
TopologyMember newMember = new TopologyMember(currentMember.getConnector().a, memberInput.getConnector().b);
@@ -162,16 +162,24 @@
*/
public boolean updateMember(final long uniqueEventID, final String nodeId, final TopologyMember memberInput)
{
+
+ if (memberInput.getConnector().a == null && memberInput.getConnector().b != null)
+ {
+ updateBackup(nodeId, memberInput);
+ return true;
+ }
+
Long deleteTme = mapDelete.get(nodeId);
if (deleteTme != null && uniqueEventID < deleteTme)
{
+ log.debug("Update uniqueEvent=" + uniqueEventID +
+ ", nodeId=" +
+ nodeId +
+ ", memberInput=" +
+ memberInput +
+ " being rejected as there was a delete done after that");
return false;
}
-
- if (log.isTraceEnabled())
- {
- // log.trace(this + "::UpdateMember::" + uniqueEventID + ", nodeID=" + nodeId + ", memberInput=" + memberInput);
- }
synchronized (this)
{
@@ -201,7 +209,8 @@
{
if (log.isDebugEnabled())
{
- log.debug(this + "::updated currentMember=nodeID=" + nodeId +
+ log.debug(this + "::updated currentMember=nodeID=" +
+ nodeId +
currentMember +
" of memberInput=" +
memberInput);
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-08-30 17:13:32 UTC (rev 11244)
@@ -159,7 +159,6 @@
try
{
TopologyMember member = clusterManager.getLocalMember();
- factory.sendNodeAnnounce(member.getUniqueEventID(), clusterManager.getNodeId(), false, member.getConnector().a, member.getConnector().b);
}
catch (Exception e)
{
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-30 17:13:32 UTC (rev 11244)
@@ -33,9 +33,9 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.client.impl.AfterConnectInternalListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -70,7 +70,7 @@
*
*
*/
-public class ClusterConnectionImpl implements ClusterConnection
+public class ClusterConnectionImpl implements ClusterConnection, AfterConnectInternalListener
{
private static final Logger log = Logger.getLogger(ClusterConnectionImpl.class);
@@ -412,6 +412,25 @@
started = false;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.client.impl.AfterConnectInternalListener#onConnection(org.hornetq.core.client.impl.ClientSessionFactoryInternal)
+ */
+ public void onConnection(ClientSessionFactoryInternal sf)
+ {
+ TopologyMember localMember = manager.getLocalMember();
+ sf.sendNodeAnnounce(localMember.getUniqueEventID(),
+ manager.getNodeId(),
+ false,
+ localMember.getConnector().a,
+ localMember.getConnector().b);
+
+ // sf.sendNodeAnnounce(System.currentTimeMillis(),
+ // manager.getNodeId(),
+ // false,
+ // localMember.getConnector().a,
+ // localMember.getConnector().b);
+ }
+
public boolean isStarted()
{
return started;
@@ -471,13 +490,13 @@
{
log.debug("DuplicateDetection is disabled, sending clustered messages
blocked");
}
-
+
final TopologyMember currentMember = clusterManagerTopology.getMember(nodeUUID.toString());
-
+
if (currentMember == null)
{
// sanity check only
- throw new IllegalStateException ("InternalError! The ClusterConnection doesn't know about its own node = " + this);
+ throw new IllegalStateException("InternalError! The ClusterConnection doesn't know about its own node = " + this);
}
serverLocator.setNodeID(nodeUUID.toString());
@@ -502,27 +521,9 @@
serverLocator.addClusterTopologyListener(this);
+ serverLocator.setAfterConnectionInternalListener(this);
+
serverLocator.start(server.getExecutorFactory().getExecutor());
-
- /* serverLocator.getExecutor().execute(new Runnable(){
- public void run()
- {
- try
- {
- ClientSessionFactoryInternal csf = serverLocator.connect();
-
- log.info(this + "::YYY " + nodeUUID.toString() + " Cluster connection " + ClusterConnectionImpl.this +
- " connected, sending announce node, connector=" +
- manager.getLocalMember().getConnector().a + "/" + manager.getLocalMember().getConnector().b);
-
- csf.sendNodeAnnounce(currentMember.getUniqueEventID(), nodeUUID.toString(), false, manager.getLocalMember().getConnector().a, manager.getLocalMember().getConnector().b);
- }
- catch (Exception e)
- {
- log.warn("Error on connectin Cluster connection to other
nodes", e);
- }
- }
- });*/
}
if (managementService != null)
@@ -576,7 +577,7 @@
}
}
- public void nodeUP(final long eventUID,
+ public void nodeUP(final long eventUID,
final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
@@ -590,14 +591,18 @@
if (nodeID.equals(nodeUUID.toString()))
{
- if (log.isTraceEnabled())
- {
- log.trace(this + "::informing about backup to itself, nodeUUID=" + nodeUUID + ", connectorPair=" + connectorPair + " this = " + this);
- }
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::informing about backup to itself, nodeUUID=" +
+ nodeUUID +
+ ", connectorPair=" +
+ connectorPair +
+ " this = " +
+ this);
+ }
return;
}
-
// if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
if (allowDirectConnectionsOnly &&
!allowableConnections.contains(connectorPair.a))
{
@@ -703,6 +708,8 @@
targetLocator.setMaxRetryInterval(maxRetryInterval);
targetLocator.setRetryIntervalMultiplier(retryIntervalMultiplier);
+ targetLocator.setAfterConnectionInternalListener(this);
+
targetLocator.setNodeID(serverLocator.getNodeID());
targetLocator.setClusterTransportConfiguration(serverLocator.getClusterTransportConfiguration());
@@ -713,9 +720,14 @@
}
targetLocator.disableFinalizeCheck();
-
- MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator, eventUID, targetNodeID, connector, queueName, queue);
+ MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator,
+ eventUID,
+ targetNodeID,
+ connector,
+ queueName,
+ queue);
+
ClusterConnectionBridge bridge = new ClusterConnectionBridge(this,
manager,
targetLocator,
@@ -768,7 +780,7 @@
private BridgeImpl bridge;
private final long eventUID;
-
+
private final String targetNodeID;
private final TransportConfiguration connector;
@@ -831,7 +843,7 @@
{
return address.toString();
}
-
+
/**
* @return the eventUID
*/
@@ -1129,7 +1141,8 @@
// hops is too high
// or there are multiple cluster connections for the same address
- ClusterConnectionImpl.log.warn(this + "::Remote queue binding " + clusterName +
+ ClusterConnectionImpl.log.warn(this + "::Remote queue binding " +
+ clusterName +
" has already been bound in the post office. Most likely cause for this is you have a loop " +
"in your cluster due to cluster max-hops being too large or you have multiple cluster connections to the same nodes using overlapping addresses");
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-30 17:13:32 UTC (rev 11244)
@@ -100,8 +100,6 @@
private final Map<String, ClusterConnection> clusterConnections = new HashMap<String, ClusterConnection>();
private final Topology topology = new Topology(this);
-
- private TopologyMember localMember;
private volatile ServerLocatorInternal backupServerLocator;
@@ -173,7 +171,7 @@
public TopologyMember getLocalMember()
{
- return localMember;
+ return topology.getMember(nodeUUID.toString());
}
public String getNodeId()
@@ -301,8 +299,9 @@
{
if (log.isDebugEnabled())
{
- log.debug(this + "::NodeAnnounced, backup=" + backup + nodeID +
connectorPair);
+ log.info(this + "::NodeAnnounced, backup=" + backup + nodeID +
connectorPair);
}
+ System.out.println(this + "::NodeAnnounced, backup=" + backup + nodeID +
connectorPair);
TopologyMember newMember = new TopologyMember(connectorPair.a, connectorPair.b);
newMember.setUniqueEventID(uniqueEventID);
@@ -475,6 +474,7 @@
{
String nodeID = server.getNodeID().toString();
+ TopologyMember localMember;
if (backup)
{
localMember = new TopologyMember(null, nodeConnector);
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-30 17:13:32 UTC (rev 11244)
@@ -2029,6 +2029,7 @@
servers[node].setIdentity("server " + node);
log.info("starting server " + servers[node]);
servers[node].start();
+ Thread.sleep(100);
// for (int i = 0 ; i <= node; i++)
// {
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-08-30 17:13:32 UTC (rev 11244)
@@ -20,7 +20,6 @@
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
-
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
@@ -34,6 +33,8 @@
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.tests.integration.cluster.util.TestableServer;
import org.hornetq.tests.util.ServiceTestBase;
@@ -47,6 +48,7 @@
*/
public abstract class MultipleBackupsFailoverTestBase extends ServiceTestBase
{
+ Logger log = Logger.getLogger(this.getClass());
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
@@ -102,7 +104,7 @@
}
}
}
-
+
try
{
Thread.sleep(100);
@@ -170,6 +172,13 @@
protected ClientSessionFactoryInternal createSessionFactoryAndWaitForTopology(ServerLocator locator,
int topologyMembers) throws Exception
{
+ return createSessionFactoryAndWaitForTopology(locator, topologyMembers, null);
+ }
+
+ protected ClientSessionFactoryInternal createSessionFactoryAndWaitForTopology(ServerLocator locator,
+ int topologyMembers,
+ HornetQServer server) throws Exception
+ {
ClientSessionFactoryInternal sf;
CountDownLatch countDownLatch = new CountDownLatch(topologyMembers);
@@ -179,12 +188,16 @@
sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
boolean ok = countDownLatch.await(5, TimeUnit.SECONDS);
+ locator.removeClusterTopologyListener(topListener);
if (!ok)
{
- System.out.println(((ServerLocatorInternal)locator).getTopology().describe());
+ log.info("failed topology, Topology on client = " +
(((ServerLocatorInternal)locator).getTopology().describe()));
+ if (server != null)
+ {
+ log.info("failed topology, Topology on server = " +
server.getClusterManager().getTopology().describe());
+ }
}
- locator.removeClusterTopologyListener(topListener);
- assertTrue(ok);
+ assertTrue("expected " + topologyMembers + " members", ok);
return sf;
}
@@ -219,7 +232,10 @@
this.latch = latch;
}
- public void nodeUP(final long uniqueEventID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
+ public void nodeUP(final long uniqueEventID,
+ String nodeID,
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ boolean last)
{
if (connectorPair.a != null &&
!liveNode.contains(connectorPair.a.getName()))
{
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-08-30 17:13:32 UTC (rev 11244)
@@ -41,7 +41,7 @@
{
for (TestableServer testableServer : servers.values())
{
- if(testableServer != null)
+ if (testableServer != null)
{
try
{
@@ -55,44 +55,51 @@
}
super.tearDown();
}
-
+
public void testMultipleFailovers2LiveServers() throws Exception
{
- // TODO: remove these sleeps
NodeManager nodeManager1 = new InVMNodeManager();
NodeManager nodeManager2 = new InVMNodeManager();
createLiveConfig(nodeManager1, 0, 3, 4, 5);
- createBackupConfig(nodeManager1, 0, 1, true, new int[] {0, 2}, 3, 4, 5);
- createBackupConfig(nodeManager1, 0, 2, true, new int[] {0, 1}, 3, 4, 5);
+ createBackupConfig(nodeManager1, 0, 1, true, new int[] { 0, 2 }, 3, 4, 5);
+ createBackupConfig(nodeManager1, 0, 2, true, new int[] { 0, 1 }, 3, 4, 5);
createLiveConfig(nodeManager2, 3, 0);
- createBackupConfig(nodeManager2, 3, 4, true, new int[] {3, 5}, 0, 1, 2);
- createBackupConfig(nodeManager2, 3, 5, true, new int[] {3, 4}, 0, 1, 2);
-
- Thread.sleep(500);
+ createBackupConfig(nodeManager2, 3, 4, true, new int[] { 3, 5 }, 0, 1, 2);
+ createBackupConfig(nodeManager2, 3, 5, true, new int[] { 3, 4 }, 0, 1, 2);
+
servers.get(0).start();
- Thread.sleep(500);
+ waitForServer(servers.get(0).getServer());
+
servers.get(3).start();
- Thread.sleep(500);
+ waitForServer(servers.get(3).getServer());
+
servers.get(1).start();
- Thread.sleep(500);
+ waitForServer(servers.get(1).getServer());
+
servers.get(2).start();
- Thread.sleep(500);
+
servers.get(4).start();
- Thread.sleep(500);
+ waitForServer(servers.get(4).getServer());
+
servers.get(5).start();
+
+ waitForServer(servers.get(4).getServer());
+
ServerLocator locator = getServerLocator(0);
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 4);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 4, servers.get(0).getServer());
ClientSession session = sendAndConsume(sf, true);
System.out.println(((ServerLocatorInternal)locator).getTopology().describe());
Thread.sleep(500);
servers.get(0).crash(session);
+ System.out.println("server3 " +
servers.get(3).getServer().getClusterManager().getTopology().describe());
+
int liveAfter0 = waitForNewLive(10000, true, servers, 1, 2);
ServerLocator locator2 = getServerLocator(3);
@@ -139,11 +146,18 @@
}
}
- protected void createBackupConfig(NodeManager nodeManager, int liveNode, int nodeid, boolean createClusterConnections, int[] otherBackupNodes, int... otherClusterNodes)
+ protected void createBackupConfig(NodeManager nodeManager,
+ int liveNode,
+ int nodeid,
+ boolean createClusterConnections,
+ int[] otherBackupNodes,
+ int... otherClusterNodes)
{
Configuration config1 = super.createDefaultConfig();
config1.getAcceptorConfigurations().clear();
- config1.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(), true, generateParams(nodeid, isNetty())));
+ config1.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(),
+ true,
+ generateParams(nodeid, isNetty())));
config1.setSecurityEnabled(false);
config1.setSharedStore(true);
config1.setBackup(true);
@@ -152,21 +166,36 @@
List<String> staticConnectors = new ArrayList<String>();
for (int node : otherBackupNodes)
{
- TransportConfiguration liveConnector = createTransportConfiguration(isNetty(), false, generateParams(node, isNetty()));
+ TransportConfiguration liveConnector = createTransportConfiguration(isNetty(),
+ false,
+ generateParams(node, isNetty()));
config1.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
staticConnectors.add(liveConnector.getName());
}
- TransportConfiguration backupConnector = createTransportConfiguration(isNetty(), false, generateParams(nodeid, isNetty()));
+ TransportConfiguration backupConnector = createTransportConfiguration(isNetty(),
+ false,
+ generateParams(nodeid, isNetty()));
config1.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
List<String> clusterNodes = new ArrayList<String>();
for (int node : otherClusterNodes)
{
- TransportConfiguration connector = createTransportConfiguration(isNetty(), false, generateParams(node, isNetty()));
+ TransportConfiguration connector = createTransportConfiguration(isNetty(),
+ false,
+ generateParams(node, isNetty()));
config1.getConnectorConfigurations().put(connector.getName(), connector);
clusterNodes.add(connector.getName());
}
- ClusterConnectionConfiguration ccc1 = new ClusterConnectionConfiguration("cluster1", "jms", backupConnector.getName(), -1, false, false, 1, 1, clusterNodes, false);
+ ClusterConnectionConfiguration ccc1 = new ClusterConnectionConfiguration("cluster1",
+ "jms",
+ backupConnector.getName(),
+ -1,
+ false,
+ false,
+ 1,
+ 1,
+ clusterNodes,
+ false);
config1.getClusterConfigurations().add(ccc1);
config1.setBindingsDirectory(config1.getBindingsDirectory() + "_" + liveNode);
@@ -177,25 +206,39 @@
servers.put(nodeid, new SameProcessHornetQServer(createInVMFailoverServer(true, config1, nodeManager, liveNode)));
}
- protected void createLiveConfig(NodeManager nodeManager, int liveNode, int ... otherLiveNodes)
+ protected void createLiveConfig(NodeManager nodeManager, int liveNode, int... otherLiveNodes)
{
- TransportConfiguration liveConnector = createTransportConfiguration(isNetty(), false, generateParams(liveNode, isNetty()));
+ TransportConfiguration liveConnector = createTransportConfiguration(isNetty(),
+ false,
+ generateParams(liveNode, isNetty()));
Configuration config0 = super.createDefaultConfig();
config0.getAcceptorConfigurations().clear();
- config0.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(), true, generateParams(liveNode, isNetty())));
+ config0.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(),
+ true,
+ generateParams(liveNode, isNetty())));
config0.setSecurityEnabled(false);
config0.setSharedStore(true);
config0.setClustered(true);
List<String> pairs = new ArrayList<String>();
for (int node : otherLiveNodes)
{
- TransportConfiguration otherLiveConnector = createTransportConfiguration(isNetty(), false, generateParams(node, isNetty()));
+ TransportConfiguration otherLiveConnector = createTransportConfiguration(isNetty(),
+ false,
+ generateParams(node, isNetty()));
config0.getConnectorConfigurations().put(otherLiveConnector.getName(), otherLiveConnector);
- pairs.add(otherLiveConnector.getName());
+ pairs.add(otherLiveConnector.getName());
}
- ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1", "jms", liveConnector.getName(), -1, false, false, 1, 1,
- pairs, false);
+ ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1",
+ "jms",
+ liveConnector.getName(),
+ -1,
+ false,
+ false,
+ 1,
+ 1,
+ pairs,
+ false);
config0.getClusterConfigurations().add(ccc0);
config0.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
@@ -204,7 +247,8 @@
config0.setPagingDirectory(config0.getPagingDirectory() + "_" + liveNode);
config0.setLargeMessagesDirectory(config0.getLargeMessagesDirectory() + "_" + liveNode);
- servers.put(liveNode, new SameProcessHornetQServer(createInVMFailoverServer(true, config0, nodeManager, liveNode)));
+ servers.put(liveNode,
+ new SameProcessHornetQServer(createInVMFailoverServer(true, config0, nodeManager, liveNode)));
}
protected boolean isNetty()
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2011-08-30 17:13:32 UTC (rev 11244)
@@ -25,6 +25,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
+import org.hornetq.tests.util.ServiceTestBase;
/**
* A SameProcessHornetQServer