JBoss hornetq SVN: r11246 - branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-30 15:53:38 -0400 (Tue, 30 Aug 2011)
New Revision: 11246
Modified:
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/TopologyMember.java
Log:
tweaks on my branch
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-30 17:51:05 UTC (rev 11245)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-30 19:53:38 UTC (rev 11246)
@@ -36,16 +36,12 @@
public class Topology implements Serializable
{
- private static final int BACKOF_TIMEOUT = 500;
-
private static final long serialVersionUID = -9037171688692471371L;
private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
private static final Logger log = Logger.getLogger(Topology.class);
- private transient HashMap<String, Pair<Long, Integer>> mapBackof = new HashMap<String, Pair<Long, Integer>>();
-
private Executor executor = null;
/** Used to debug operations.
@@ -142,7 +138,7 @@
mapTopology.put(nodeId, currentMember);
}
- TopologyMember newMember = new TopologyMember(currentMember.getConnector().a, memberInput.getConnector().b);
+ TopologyMember newMember = new TopologyMember(currentMember.getA(), memberInput.getB());
newMember.setUniqueEventID(System.currentTimeMillis());
mapTopology.remove(nodeId);
mapTopology.put(nodeId, newMember);
@@ -162,12 +158,6 @@
*/
public boolean updateMember(final long uniqueEventID, final String nodeId, final TopologyMember memberInput)
{
-
-// if (memberInput.getConnector().a == null && memberInput.getConnector().b != null)
-// {
-// updateBackup(nodeId, memberInput);
-// return true;
-// }
Long deleteTme = mapDelete.get(nodeId);
if (deleteTme != null && uniqueEventID < deleteTme)
@@ -216,7 +206,22 @@
memberInput);
}
- TopologyMember newMember = new TopologyMember(memberInput.getConnector().a, memberInput.getConnector().b);
+ TopologyMember newMember = new TopologyMember(currentMember.getA(), memberInput.getB());
+
+ if (memberInput.getA() == null && memberInput.getB() != null)
+ {
+ // Updating what appears to be a backup update
+ newMember.setA(currentMember.getA());
+ }
+ else
+ if (currentMember.getA() == null && currentMember.getB() != null && newMember.getA() != null && newMember.getB() == null)
+ {
+ // This is a situation where we have:
+ // CurrentMember (null, X) && Input(X, null)
+ // This means the backup has arrived before, hence we need to merge the results
+ newMember.setA(currentMember.getA());
+ }
+
newMember.setUniqueEventID(uniqueEventID);
mapTopology.remove(nodeId);
mapTopology.put(nodeId, newMember);
@@ -276,46 +281,6 @@
}
/**
- * @param nodeId
- * @param backOfData
- */
- private boolean testBackof(final String nodeId)
- {
- Pair<Long, Integer> backOfData = mapBackof.get(nodeId);
-
- if (backOfData != null)
- {
- backOfData.b += 1;
-
- long timeDiff = System.currentTimeMillis() - backOfData.a;
-
- // To prevent a loop where nodes are being considered down and up
- if (backOfData.b > 5 && timeDiff < BACKOF_TIMEOUT)
- {
-
- // The cluster may get in loop without this..
- // Case one node is stll sending nodeDown while another member is sending nodeUp
- log.warn(backOfData.b + ", The topology controller identified a blast events and it's interrupting the flow of the loop, nodeID=" +
- nodeId +
- ", topologyInstance=" +
- this,
- new Exception("this exception is just to trace location"));
- return false;
- }
- else if (timeDiff < BACKOF_TIMEOUT)
- {
- log.warn(this + "::Simple blast of " + nodeId, new Exception("this exception is just to trace location"));
- }
- else if (timeDiff >= BACKOF_TIMEOUT)
- {
- mapBackof.remove(nodeId);
- }
- }
-
- return true;
- }
-
- /**
* @return
*/
private ArrayList<ClusterTopologyListener> copyListeners()
@@ -500,11 +465,11 @@
int count = 0;
for (TopologyMember member : mapTopology.values())
{
- if (member.getConnector().a != null)
+ if (member.getA() != null)
{
count++;
}
- if (member.getConnector().b != null)
+ if (member.getB() != null)
{
count++;
}
@@ -551,32 +516,13 @@
this.owner = owner;
}
- private boolean hasChanged(final String debugInfo, final TransportConfiguration a, final TransportConfiguration b)
- {
- boolean changed = a == null && b != null || a != null && b != null && !a.equals(b);
-
- if (log.isTraceEnabled())
- {
-
- log.trace(this + "::Validating current=" +
- a +
- " != input=" +
- b +
- (changed ? " and it has changed" : " and it didn't change") +
- ", for validation of " +
- debugInfo);
- }
-
- return changed;
- }
-
public TransportConfiguration getBackupForConnector(final TransportConfiguration connectorConfiguration)
{
for (TopologyMember member : mapTopology.values())
{
- if (member.getConnector().a != null && member.getConnector().a.equals(connectorConfiguration))
+ if (member.getA() != null && member.getA().equals(connectorConfiguration))
{
- return member.getConnector().b;
+ return member.getB();
}
}
return null;
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/TopologyMember.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/TopologyMember.java 2011-08-30 17:51:05 UTC (rev 11245)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/TopologyMember.java 2011-08-30 19:53:38 UTC (rev 11246)
@@ -41,7 +41,27 @@
this(new Pair<TransportConfiguration, TransportConfiguration>(a, b));
}
+
+ public TransportConfiguration getA()
+ {
+ return connector.a;
+ }
+ public TransportConfiguration getB()
+ {
+ return connector.b;
+ }
+
+ public void setB(TransportConfiguration param)
+ {
+ this.connector.b = param;
+ }
+
+ public void setA(TransportConfiguration param)
+ {
+ this.connector.a = param;
+ }
+
/**
* @return the uniqueEventID
*/
12 years, 8 months
JBoss hornetq SVN: r11245 - branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-30 13:51:05 -0400 (Tue, 30 Aug 2011)
New Revision: 11245
Modified:
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java
Log:
just a tweak for my tests
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-30 17:13:32 UTC (rev 11244)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-30 17:51:05 UTC (rev 11245)
@@ -163,11 +163,11 @@
public boolean updateMember(final long uniqueEventID, final String nodeId, final TopologyMember memberInput)
{
- if (memberInput.getConnector().a == null && memberInput.getConnector().b != null)
- {
- updateBackup(nodeId, memberInput);
- return true;
- }
+// if (memberInput.getConnector().a == null && memberInput.getConnector().b != null)
+// {
+// updateBackup(nodeId, memberInput);
+// return true;
+// }
Long deleteTme = mapDelete.get(nodeId);
if (deleteTme != null && uniqueEventID < deleteTme)
12 years, 8 months
JBoss hornetq SVN: r11244 - in branches/Branch_2_2_EAP_cluster_clean3: src/main/org/hornetq/core/server/cluster/impl and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-30 13:13:32 -0400 (Tue, 30 Aug 2011)
New Revision: 11244
Added:
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/AfterConnectInternalListener.java
Modified:
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
Log:
Adding AfterConnectioninternal to my branch, and adjusting versioning
Added: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/AfterConnectInternalListener.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/AfterConnectInternalListener.java (rev 0)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/AfterConnectInternalListener.java 2011-08-30 17:13:32 UTC (rev 11244)
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+/**
+ * To be called right after the ConnectionFactory created a connection.
+ * This listener is not part of the API and shouldn't be used by users.
+ * (if you do so we can't guarantee any API compatibility on this class)
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public interface AfterConnectInternalListener
+{
+ void onConnection(ClientSessionFactoryInternal sf);
+}
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-08-30 17:13:32 UTC (rev 11244)
@@ -1293,23 +1293,15 @@
}
channel0.send(new SubscribeClusterTopologyUpdatesMessageV2(serverLocator.isClusterConnection(), VersionLoader.getVersion().getIncrementingVersion()));
-
-
- if (serverLocator.isClusterConnection())
- {
- TransportConfiguration config = serverLocator.getClusterTransportConfiguration();
- if (ClientSessionFactoryImpl.isDebug)
- {
- ClientSessionFactoryImpl.log.debug("Announcing node " + serverLocator.getNodeID() +
- ", isBackup=" +
- serverLocator.isBackup());
- }
- sendNodeAnnounce(System.currentTimeMillis(), serverLocator.getNodeID(), serverLocator.isBackup(), config, null);
- //channel0.send(new NodeAnnounceMessage(serverLocator.getNodeID(), serverLocator.isBackup(), config));
- }
+
}
}
+ if (serverLocator.getAfterConnectInternalListener() != null)
+ {
+ serverLocator.getAfterConnectInternalListener().onConnection(this);
+ }
+
if (ClientSessionFactoryImpl.log.isTraceEnabled())
{
ClientSessionFactoryImpl.log.trace("returning " + connection);
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-08-30 17:13:32 UTC (rev 11244)
@@ -165,6 +165,8 @@
private Executor startExecutor;
private static ScheduledExecutorService globalScheduledThreadPool;
+
+ private AfterConnectInternalListener afterConnectListener;
private String groupID;
@@ -578,6 +580,19 @@
return sf;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.client.impl.ServerLocatorInternal#setAfterConnectionInternalListener(org.hornetq.core.client.impl.AfterConnectInternalListener)
+ */
+ public void setAfterConnectionInternalListener(AfterConnectInternalListener listener)
+ {
+ this.afterConnectListener = listener;
+ }
+
+ public AfterConnectInternalListener getAfterConnectInternalListener()
+ {
+ return afterConnectListener;
+ }
+
public boolean isClosed()
{
return closed || closing;
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-08-30 17:13:32 UTC (rev 11244)
@@ -35,6 +35,10 @@
void factoryClosed(final ClientSessionFactory factory);
+ AfterConnectInternalListener getAfterConnectInternalListener();
+
+ void setAfterConnectionInternalListener(AfterConnectInternalListener listener);
+
/** Used to better identify Cluster Connection Locators on logs while debugging logs */
void setIdentity(String identity);
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-30 17:13:32 UTC (rev 11244)
@@ -64,7 +64,7 @@
* values are a pair of live/backup transport configurations
*/
private final Map<String, TopologyMember> mapTopology = new ConcurrentHashMap<String, TopologyMember>();
-
+
private final Map<String, Long> mapDelete = new ConcurrentHashMap<String, Long>();
public Topology(final Object owner)
@@ -132,14 +132,14 @@
synchronized (this)
{
- // TODO treat versioning here. it should remove any previous version
- // However, if the previous version has a higher time (say if the node time where the system died), we should
- // use that number ++
-
TopologyMember currentMember = getMember(nodeId);
if (currentMember == null)
{
- log.warn("There's no live to be updated on backup update", new Exception("trace"));
+ log.warn("There's no live to be updated on backup update, node=" + nodeId + " memberInput=" + memberInput,
+ new Exception("trace"));
+
+ currentMember = memberInput;
+ mapTopology.put(nodeId, currentMember);
}
TopologyMember newMember = new TopologyMember(currentMember.getConnector().a, memberInput.getConnector().b);
@@ -162,16 +162,24 @@
*/
public boolean updateMember(final long uniqueEventID, final String nodeId, final TopologyMember memberInput)
{
+
+ if (memberInput.getConnector().a == null && memberInput.getConnector().b != null)
+ {
+ updateBackup(nodeId, memberInput);
+ return true;
+ }
+
Long deleteTme = mapDelete.get(nodeId);
if (deleteTme != null && uniqueEventID < deleteTme)
{
+ log.debug("Update uniqueEvent=" + uniqueEventID +
+ ", nodeId=" +
+ nodeId +
+ ", memberInput=" +
+ memberInput +
+ " being rejected as there was a delete done after that");
return false;
}
-
- if (log.isTraceEnabled())
- {
- // log.trace(this + "::UpdateMember::" + uniqueEventID + ", nodeID=" + nodeId + ", memberInput=" + memberInput);
- }
synchronized (this)
{
@@ -201,7 +209,8 @@
{
if (log.isDebugEnabled())
{
- log.debug(this + "::updated currentMember=nodeID=" + nodeId +
+ log.debug(this + "::updated currentMember=nodeID=" +
+ nodeId +
currentMember +
" of memberInput=" +
memberInput);
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-08-30 17:13:32 UTC (rev 11244)
@@ -159,7 +159,6 @@
try
{
TopologyMember member = clusterManager.getLocalMember();
- factory.sendNodeAnnounce(member.getUniqueEventID(), clusterManager.getNodeId(), false, member.getConnector().a, member.getConnector().b);
}
catch (Exception e)
{
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-30 17:13:32 UTC (rev 11244)
@@ -33,9 +33,9 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.client.impl.AfterConnectInternalListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -70,7 +70,7 @@
*
*
*/
-public class ClusterConnectionImpl implements ClusterConnection
+public class ClusterConnectionImpl implements ClusterConnection, AfterConnectInternalListener
{
private static final Logger log = Logger.getLogger(ClusterConnectionImpl.class);
@@ -412,6 +412,25 @@
started = false;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.client.impl.AfterConnectInternalListener#onConnection(org.hornetq.core.client.impl.ClientSessionFactoryInternal)
+ */
+ public void onConnection(ClientSessionFactoryInternal sf)
+ {
+ TopologyMember localMember = manager.getLocalMember();
+ sf.sendNodeAnnounce(localMember.getUniqueEventID(),
+ manager.getNodeId(),
+ false,
+ localMember.getConnector().a,
+ localMember.getConnector().b);
+
+ // sf.sendNodeAnnounce(System.currentTimeMillis(),
+ // manager.getNodeId(),
+ // false,
+ // localMember.getConnector().a,
+ // localMember.getConnector().b);
+ }
+
public boolean isStarted()
{
return started;
@@ -471,13 +490,13 @@
{
log.debug("DuplicateDetection is disabled, sending clustered messages blocked");
}
-
+
final TopologyMember currentMember = clusterManagerTopology.getMember(nodeUUID.toString());
-
+
if (currentMember == null)
{
// sanity check only
- throw new IllegalStateException ("InternalError! The ClusterConnection doesn't know about its own node = " + this);
+ throw new IllegalStateException("InternalError! The ClusterConnection doesn't know about its own node = " + this);
}
serverLocator.setNodeID(nodeUUID.toString());
@@ -502,27 +521,9 @@
serverLocator.addClusterTopologyListener(this);
+ serverLocator.setAfterConnectionInternalListener(this);
+
serverLocator.start(server.getExecutorFactory().getExecutor());
-
- /* serverLocator.getExecutor().execute(new Runnable(){
- public void run()
- {
- try
- {
- ClientSessionFactoryInternal csf = serverLocator.connect();
-
- log.info(this + "::YYY " + nodeUUID.toString() + " Cluster connection " + ClusterConnectionImpl.this +
- " connected, sending announce node, connector=" +
- manager.getLocalMember().getConnector().a + "/" + manager.getLocalMember().getConnector().b);
-
- csf.sendNodeAnnounce(currentMember.getUniqueEventID(), nodeUUID.toString(), false, manager.getLocalMember().getConnector().a, manager.getLocalMember().getConnector().b);
- }
- catch (Exception e)
- {
- log.warn("Error on connectin Cluster connection to other nodes", e);
- }
- }
- });*/
}
if (managementService != null)
@@ -576,7 +577,7 @@
}
}
- public void nodeUP(final long eventUID,
+ public void nodeUP(final long eventUID,
final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
@@ -590,14 +591,18 @@
if (nodeID.equals(nodeUUID.toString()))
{
- if (log.isTraceEnabled())
- {
- log.trace(this + "::informing about backup to itself, nodeUUID=" + nodeUUID + ", connectorPair=" + connectorPair + " this = " + this);
- }
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::informing about backup to itself, nodeUUID=" +
+ nodeUUID +
+ ", connectorPair=" +
+ connectorPair +
+ " this = " +
+ this);
+ }
return;
}
-
// if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
if (allowDirectConnectionsOnly && !allowableConnections.contains(connectorPair.a))
{
@@ -703,6 +708,8 @@
targetLocator.setMaxRetryInterval(maxRetryInterval);
targetLocator.setRetryIntervalMultiplier(retryIntervalMultiplier);
+ targetLocator.setAfterConnectionInternalListener(this);
+
targetLocator.setNodeID(serverLocator.getNodeID());
targetLocator.setClusterTransportConfiguration(serverLocator.getClusterTransportConfiguration());
@@ -713,9 +720,14 @@
}
targetLocator.disableFinalizeCheck();
-
- MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator, eventUID, targetNodeID, connector, queueName, queue);
+ MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator,
+ eventUID,
+ targetNodeID,
+ connector,
+ queueName,
+ queue);
+
ClusterConnectionBridge bridge = new ClusterConnectionBridge(this,
manager,
targetLocator,
@@ -768,7 +780,7 @@
private BridgeImpl bridge;
private final long eventUID;
-
+
private final String targetNodeID;
private final TransportConfiguration connector;
@@ -831,7 +843,7 @@
{
return address.toString();
}
-
+
/**
* @return the eventUID
*/
@@ -1129,7 +1141,8 @@
// hops is too high
// or there are multiple cluster connections for the same address
- ClusterConnectionImpl.log.warn(this + "::Remote queue binding " + clusterName +
+ ClusterConnectionImpl.log.warn(this + "::Remote queue binding " +
+ clusterName +
" has already been bound in the post office. Most likely cause for this is you have a loop " +
"in your cluster due to cluster max-hops being too large or you have multiple cluster connections to the same nodes using overlapping addresses");
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-30 17:13:32 UTC (rev 11244)
@@ -100,8 +100,6 @@
private final Map<String, ClusterConnection> clusterConnections = new HashMap<String, ClusterConnection>();
private final Topology topology = new Topology(this);
-
- private TopologyMember localMember;
private volatile ServerLocatorInternal backupServerLocator;
@@ -173,7 +171,7 @@
public TopologyMember getLocalMember()
{
- return localMember;
+ return topology.getMember(nodeUUID.toString());
}
public String getNodeId()
@@ -301,8 +299,9 @@
{
if (log.isDebugEnabled())
{
- log.debug(this + "::NodeAnnounced, backup=" + backup + nodeID + connectorPair);
+ log.info(this + "::NodeAnnounced, backup=" + backup + nodeID + connectorPair);
}
+ System.out.println(this + "::NodeAnnounced, backup=" + backup + nodeID + connectorPair);
TopologyMember newMember = new TopologyMember(connectorPair.a, connectorPair.b);
newMember.setUniqueEventID(uniqueEventID);
@@ -475,6 +474,7 @@
{
String nodeID = server.getNodeID().toString();
+ TopologyMember localMember;
if (backup)
{
localMember = new TopologyMember(null, nodeConnector);
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-30 17:13:32 UTC (rev 11244)
@@ -2029,6 +2029,7 @@
servers[node].setIdentity("server " + node);
log.info("starting server " + servers[node]);
servers[node].start();
+ Thread.sleep(100);
// for (int i = 0 ; i <= node; i++)
// {
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-08-30 17:13:32 UTC (rev 11244)
@@ -20,7 +20,6 @@
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
-
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
@@ -34,6 +33,8 @@
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.tests.integration.cluster.util.TestableServer;
import org.hornetq.tests.util.ServiceTestBase;
@@ -47,6 +48,7 @@
*/
public abstract class MultipleBackupsFailoverTestBase extends ServiceTestBase
{
+ Logger log = Logger.getLogger(this.getClass());
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
@@ -102,7 +104,7 @@
}
}
}
-
+
try
{
Thread.sleep(100);
@@ -170,6 +172,13 @@
protected ClientSessionFactoryInternal createSessionFactoryAndWaitForTopology(ServerLocator locator,
int topologyMembers) throws Exception
{
+ return createSessionFactoryAndWaitForTopology(locator, topologyMembers, null);
+ }
+
+ protected ClientSessionFactoryInternal createSessionFactoryAndWaitForTopology(ServerLocator locator,
+ int topologyMembers,
+ HornetQServer server) throws Exception
+ {
ClientSessionFactoryInternal sf;
CountDownLatch countDownLatch = new CountDownLatch(topologyMembers);
@@ -179,12 +188,16 @@
sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
boolean ok = countDownLatch.await(5, TimeUnit.SECONDS);
+ locator.removeClusterTopologyListener(topListener);
if (!ok)
{
- System.out.println(((ServerLocatorInternal)locator).getTopology().describe());
+ log.info("failed topology, Topology on client = " + (((ServerLocatorInternal)locator).getTopology().describe()));
+ if (server != null)
+ {
+ log.info("failed topology, Topology on server = " + server.getClusterManager().getTopology().describe());
+ }
}
- locator.removeClusterTopologyListener(topListener);
- assertTrue(ok);
+ assertTrue("expected " + topologyMembers + " members", ok);
return sf;
}
@@ -219,7 +232,10 @@
this.latch = latch;
}
- public void nodeUP(final long uniqueEventID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
+ public void nodeUP(final long uniqueEventID,
+ String nodeID,
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ boolean last)
{
if (connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
{
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-08-30 17:13:32 UTC (rev 11244)
@@ -41,7 +41,7 @@
{
for (TestableServer testableServer : servers.values())
{
- if(testableServer != null)
+ if (testableServer != null)
{
try
{
@@ -55,44 +55,51 @@
}
super.tearDown();
}
-
+
public void testMultipleFailovers2LiveServers() throws Exception
{
- // TODO: remove these sleeps
NodeManager nodeManager1 = new InVMNodeManager();
NodeManager nodeManager2 = new InVMNodeManager();
createLiveConfig(nodeManager1, 0, 3, 4, 5);
- createBackupConfig(nodeManager1, 0, 1, true, new int[] {0, 2}, 3, 4, 5);
- createBackupConfig(nodeManager1, 0, 2, true, new int[] {0, 1}, 3, 4, 5);
+ createBackupConfig(nodeManager1, 0, 1, true, new int[] { 0, 2 }, 3, 4, 5);
+ createBackupConfig(nodeManager1, 0, 2, true, new int[] { 0, 1 }, 3, 4, 5);
createLiveConfig(nodeManager2, 3, 0);
- createBackupConfig(nodeManager2, 3, 4, true, new int[] {3, 5}, 0, 1, 2);
- createBackupConfig(nodeManager2, 3, 5, true, new int[] {3, 4}, 0, 1, 2);
-
- Thread.sleep(500);
+ createBackupConfig(nodeManager2, 3, 4, true, new int[] { 3, 5 }, 0, 1, 2);
+ createBackupConfig(nodeManager2, 3, 5, true, new int[] { 3, 4 }, 0, 1, 2);
+
servers.get(0).start();
- Thread.sleep(500);
+ waitForServer(servers.get(0).getServer());
+
servers.get(3).start();
- Thread.sleep(500);
+ waitForServer(servers.get(3).getServer());
+
servers.get(1).start();
- Thread.sleep(500);
+ waitForServer(servers.get(1).getServer());
+
servers.get(2).start();
- Thread.sleep(500);
+
servers.get(4).start();
- Thread.sleep(500);
+ waitForServer(servers.get(4).getServer());
+
servers.get(5).start();
+
+ waitForServer(servers.get(4).getServer());
+
ServerLocator locator = getServerLocator(0);
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 4);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 4, servers.get(0).getServer());
ClientSession session = sendAndConsume(sf, true);
System.out.println(((ServerLocatorInternal)locator).getTopology().describe());
Thread.sleep(500);
servers.get(0).crash(session);
+ System.out.println("server3 " + servers.get(3).getServer().getClusterManager().getTopology().describe());
+
int liveAfter0 = waitForNewLive(10000, true, servers, 1, 2);
ServerLocator locator2 = getServerLocator(3);
@@ -139,11 +146,18 @@
}
}
- protected void createBackupConfig(NodeManager nodeManager, int liveNode, int nodeid, boolean createClusterConnections, int[] otherBackupNodes, int... otherClusterNodes)
+ protected void createBackupConfig(NodeManager nodeManager,
+ int liveNode,
+ int nodeid,
+ boolean createClusterConnections,
+ int[] otherBackupNodes,
+ int... otherClusterNodes)
{
Configuration config1 = super.createDefaultConfig();
config1.getAcceptorConfigurations().clear();
- config1.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(), true, generateParams(nodeid, isNetty())));
+ config1.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(),
+ true,
+ generateParams(nodeid, isNetty())));
config1.setSecurityEnabled(false);
config1.setSharedStore(true);
config1.setBackup(true);
@@ -152,21 +166,36 @@
List<String> staticConnectors = new ArrayList<String>();
for (int node : otherBackupNodes)
{
- TransportConfiguration liveConnector = createTransportConfiguration(isNetty(), false, generateParams(node, isNetty()));
+ TransportConfiguration liveConnector = createTransportConfiguration(isNetty(),
+ false,
+ generateParams(node, isNetty()));
config1.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
staticConnectors.add(liveConnector.getName());
}
- TransportConfiguration backupConnector = createTransportConfiguration(isNetty(), false, generateParams(nodeid, isNetty()));
+ TransportConfiguration backupConnector = createTransportConfiguration(isNetty(),
+ false,
+ generateParams(nodeid, isNetty()));
config1.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
List<String> clusterNodes = new ArrayList<String>();
for (int node : otherClusterNodes)
{
- TransportConfiguration connector = createTransportConfiguration(isNetty(), false, generateParams(node, isNetty()));
+ TransportConfiguration connector = createTransportConfiguration(isNetty(),
+ false,
+ generateParams(node, isNetty()));
config1.getConnectorConfigurations().put(connector.getName(), connector);
clusterNodes.add(connector.getName());
}
- ClusterConnectionConfiguration ccc1 = new ClusterConnectionConfiguration("cluster1", "jms", backupConnector.getName(), -1, false, false, 1, 1, clusterNodes, false);
+ ClusterConnectionConfiguration ccc1 = new ClusterConnectionConfiguration("cluster1",
+ "jms",
+ backupConnector.getName(),
+ -1,
+ false,
+ false,
+ 1,
+ 1,
+ clusterNodes,
+ false);
config1.getClusterConfigurations().add(ccc1);
config1.setBindingsDirectory(config1.getBindingsDirectory() + "_" + liveNode);
@@ -177,25 +206,39 @@
servers.put(nodeid, new SameProcessHornetQServer(createInVMFailoverServer(true, config1, nodeManager, liveNode)));
}
- protected void createLiveConfig(NodeManager nodeManager, int liveNode, int ... otherLiveNodes)
+ protected void createLiveConfig(NodeManager nodeManager, int liveNode, int... otherLiveNodes)
{
- TransportConfiguration liveConnector = createTransportConfiguration(isNetty(), false, generateParams(liveNode, isNetty()));
+ TransportConfiguration liveConnector = createTransportConfiguration(isNetty(),
+ false,
+ generateParams(liveNode, isNetty()));
Configuration config0 = super.createDefaultConfig();
config0.getAcceptorConfigurations().clear();
- config0.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(), true, generateParams(liveNode, isNetty())));
+ config0.getAcceptorConfigurations().add(createTransportConfiguration(isNetty(),
+ true,
+ generateParams(liveNode, isNetty())));
config0.setSecurityEnabled(false);
config0.setSharedStore(true);
config0.setClustered(true);
List<String> pairs = new ArrayList<String>();
for (int node : otherLiveNodes)
{
- TransportConfiguration otherLiveConnector = createTransportConfiguration(isNetty(), false, generateParams(node, isNetty()));
+ TransportConfiguration otherLiveConnector = createTransportConfiguration(isNetty(),
+ false,
+ generateParams(node, isNetty()));
config0.getConnectorConfigurations().put(otherLiveConnector.getName(), otherLiveConnector);
- pairs.add(otherLiveConnector.getName());
+ pairs.add(otherLiveConnector.getName());
}
- ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1", "jms", liveConnector.getName(), -1, false, false, 1, 1,
- pairs, false);
+ ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1",
+ "jms",
+ liveConnector.getName(),
+ -1,
+ false,
+ false,
+ 1,
+ 1,
+ pairs,
+ false);
config0.getClusterConfigurations().add(ccc0);
config0.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
@@ -204,7 +247,8 @@
config0.setPagingDirectory(config0.getPagingDirectory() + "_" + liveNode);
config0.setLargeMessagesDirectory(config0.getLargeMessagesDirectory() + "_" + liveNode);
- servers.put(liveNode, new SameProcessHornetQServer(createInVMFailoverServer(true, config0, nodeManager, liveNode)));
+ servers.put(liveNode,
+ new SameProcessHornetQServer(createInVMFailoverServer(true, config0, nodeManager, liveNode)));
}
protected boolean isNetty()
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2011-08-30 11:05:47 UTC (rev 11243)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2011-08-30 17:13:32 UTC (rev 11244)
@@ -25,6 +25,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
+import org.hornetq.tests.util.ServiceTestBase;
/**
* A SameProcessHornetQServer
12 years, 8 months
JBoss hornetq SVN: r11243 - in branches/HORNETQ-720_Replication: tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-30 07:05:47 -0400 (Tue, 30 Aug 2011)
New Revision: 11243
Removed:
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagePositionTest.java
Modified:
branches/HORNETQ-720_Replication/
branches/HORNETQ-720_Replication/pom.xml
Log:
merge from trunk
Property changes on: branches/HORNETQ-720_Replication
___________________________________________________________________
Modified: svn:mergeinfo
- /trunk:10878-11155
+ /trunk:10878-11242
Modified: branches/HORNETQ-720_Replication/pom.xml
===================================================================
--- branches/HORNETQ-720_Replication/pom.xml 2011-08-30 11:00:53 UTC (rev 11242)
+++ branches/HORNETQ-720_Replication/pom.xml 2011-08-30 11:05:47 UTC (rev 11243)
@@ -425,7 +425,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
- <version>2.3.1</version>
+ <version>2.3.2</version>
</plugin>
<plugin>
<groupId>net.sf.maven-sar</groupId>
@@ -435,7 +435,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-site-plugin</artifactId>
- <version>3.0-beta-3</version>
+ <version>3.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -450,7 +450,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
- <version>2.6</version>
+ <version>2.7</version>
</plugin>
<plugin>
@@ -606,7 +606,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
- <version>2.6</version>
+ <version>2.7</version>
<configuration>
<configLocation>checkstyle.xml</configLocation>
</configuration>
Deleted: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagePositionTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagePositionTest.java 2011-08-30 11:00:53 UTC (rev 11242)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagePositionTest.java 2011-08-30 11:05:47 UTC (rev 11243)
@@ -1,51 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.unit.core.paging.impl;
-
-import org.hornetq.tests.util.UnitTestCase;
-
-/**
- * A PagePositionTest
- *
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class PagePositionTest extends UnitTestCase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testNextSequenceOf()
- {
-
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
12 years, 8 months
JBoss hornetq SVN: r11242 - trunk.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-30 07:00:53 -0400 (Tue, 30 Aug 2011)
New Revision: 11242
Modified:
trunk/pom.xml
Log:
Upgrade Maven 'jar' and 'deploy' plugins
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2011-08-30 00:03:18 UTC (rev 11241)
+++ trunk/pom.xml 2011-08-30 11:00:53 UTC (rev 11242)
@@ -425,7 +425,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
- <version>2.3.1</version>
+ <version>2.3.2</version>
</plugin>
<plugin>
<groupId>net.sf.maven-sar</groupId>
@@ -450,7 +450,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
- <version>2.6</version>
+ <version>2.7</version>
</plugin>
<plugin>
12 years, 8 months
JBoss hornetq SVN: r11241 - in branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core: server and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-29 20:03:18 -0400 (Mon, 29 Aug 2011)
New Revision: 11241
Modified:
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/ServerSession.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
just a few tweaks to my branch
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-08-29 21:57:04 UTC (rev 11240)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-08-30 00:03:18 UTC (rev 11241)
@@ -1297,8 +1297,6 @@
if (serverLocator.isClusterConnection())
{
-
- new Exception("Announcing node::").printStackTrace();
TransportConfiguration config = serverLocator.getClusterTransportConfiguration();
if (ClientSessionFactoryImpl.isDebug)
{
@@ -1331,9 +1329,6 @@
ClientSessionFactoryImpl.log.debug("Announcing node " + serverLocator.getNodeID() +
", isBackup=" + isBackup);
}
- ClientSessionFactoryImpl.log.info("YYY Announcing node " + serverLocator.getNodeID() + ", config=" + config +
- ", backup=" + backupConfig +
- ", isBackup=" + isBackup);
channel0.send(new NodeAnnounceMessage(currentEventID, nodeID, isBackup, config, backupConfig));
}
@@ -1462,6 +1457,7 @@
if (nodeID != null)
{
+ // TODO: calculate the time of node down
serverLocator.notifyNodeDown(System.currentTimeMillis(), msg.getNodeID().toString());
}
@@ -1479,8 +1475,6 @@
}
else if (type == PacketImpl.CLUSTER_TOPOLOGY)
{
- log.warn("Server is sending packets from an older version. " +
- "You must update all the servers to the same version on a cluster!");
ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage)packet;
if (topMessage.isExit())
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/ServerSession.java 2011-08-29 21:57:04 UTC (rev 11240)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/ServerSession.java 2011-08-30 00:03:18 UTC (rev 11241)
@@ -15,12 +15,9 @@
import java.util.List;
import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.xa.Xid;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.persistence.OperationContext;
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-29 21:57:04 UTC (rev 11240)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-30 00:03:18 UTC (rev 11241)
@@ -357,7 +357,7 @@
//looks like we've failed over at some point need to inform that we are the backup so when the current live
// goes down they failover to us
clusterManager.announceBackup();
- //Thread.sleep(configuration.getFailbackDelay());
+ Thread.sleep(configuration.getFailbackDelay());
}
nodeManager.startLiveNode();
@@ -2003,7 +2003,14 @@
public String toString()
{
- return "HornetQServerImpl::" + (identity == null ? "" : (identity + ", "))/* + (nodeManager != null ? ("serverUUID=" + nodeManager.getUUID()) : "")*/;
+ if (identity != null)
+ {
+ return "HornetQServerImpl::" + identity;
+ }
+ else
+ {
+ return "HornetQServerImpl::" + (nodeManager != null ? "serverUUID=" + nodeManager.getUUID() : "");
+ }
}
// Inner classes
12 years, 8 months
JBoss hornetq SVN: r11240 - branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-29 17:57:04 -0400 (Mon, 29 Aug 2011)
New Revision: 11240
Modified:
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java
Log:
removing trace
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-29 21:52:17 UTC (rev 11239)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-29 21:57:04 UTC (rev 11240)
@@ -199,10 +199,13 @@
{
if (uniqueEventID > currentMember.getUniqueEventID())
{
- log.info(this + "::updated currentMember=nodeID=" + nodeId +
- currentMember +
- " of memberInput=" +
- memberInput, new Exception ("trace"));
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + "::updated currentMember=nodeID=" + nodeId +
+ currentMember +
+ " of memberInput=" +
+ memberInput);
+ }
TopologyMember newMember = new TopologyMember(memberInput.getConnector().a, memberInput.getConnector().b);
newMember.setUniqueEventID(uniqueEventID);
12 years, 8 months
JBoss hornetq SVN: r11239 - branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-29 17:52:17 -0400 (Mon, 29 Aug 2011)
New Revision: 11239
Modified:
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java
Log:
Adding back a sleep that was needed
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java 2011-08-29 20:30:46 UTC (rev 11238)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java 2011-08-29 21:52:17 UTC (rev 11239)
@@ -316,6 +316,9 @@
stopServers(2);
+ waitForTopology(servers[1], 4);
+
+ Thread.sleep(1000);
log.info("============================================ after stop");
log.info(clusterDescription(servers[0]));
log.info(clusterDescription(servers[1]));
@@ -323,7 +326,12 @@
log.info(clusterDescription(servers[4]));
startServers(2);
+
+ Thread.sleep(1000);
+
+ waitForTopology(servers[1], 5);
+
log.info("============================================ after start");
log.info(clusterDescription(servers[0]));
log.info(clusterDescription(servers[1]));
12 years, 8 months
JBoss hornetq SVN: r11238 - branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/bridge.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-29 16:30:46 -0400 (Mon, 29 Aug 2011)
New Revision: 11238
Modified:
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
Log:
tweak on test
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-08-29 20:10:05 UTC (rev 11237)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-08-29 20:30:46 UTC (rev 11238)
@@ -434,7 +434,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer1.receive(200);
+ ClientMessage message = consumer1.receive(2000);
Assert.assertNotNull(message);
12 years, 8 months
JBoss hornetq SVN: r11237 - branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-29 16:10:05 -0400 (Mon, 29 Aug 2011)
New Revision: 11237
Modified:
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
Log:
speeding up tests
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-29 19:59:41 UTC (rev 11236)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-29 20:10:05 UTC (rev 11237)
@@ -73,7 +73,7 @@
*/
public abstract class ClusterTestBase extends ServiceTestBase
{
- private static final Logger log = Logger.getLogger(ClusterTestBase.class);
+ private final Logger log = Logger.getLogger(this.getClass());
private static final int[] PORTS = { TransportConstants.DEFAULT_PORT,
TransportConstants.DEFAULT_PORT + 1,
@@ -116,9 +116,6 @@
locators = new ServerLocator[ClusterTestBase.MAX_SERVERS];
- // To make sure the test will start with a clean VM
- forceGC();
-
}
@Override
@@ -247,7 +244,7 @@
while (System.currentTimeMillis() - start < ClusterTestBase.WAIT_TIMEOUT);
String msg = "Timed out waiting for server starting = " + node;
- ClusterTestBase.log.error(msg);
+ log.error(msg);
throw new IllegalStateException(msg);
}
@@ -283,7 +280,7 @@
topology +
")";
- ClusterTestBase.log.error(msg);
+ log.error(msg);
throw new Exception(msg);
}
@@ -359,7 +356,7 @@
")" +
")";
- ClusterTestBase.log.error(msg);
+ log.error(msg);
Bindings bindings = po.getBindingsForAddress(new SimpleString(address));
@@ -772,7 +769,7 @@
if (message == null)
{
- ClusterTestBase.log.info("*** dumping consumers:");
+ log.info("*** dumping consumers:");
dumpConsumers();
@@ -873,7 +870,7 @@
if (message == null)
{
- ClusterTestBase.log.info("*** dumping consumers:");
+ log.info("*** dumping consumers:");
dumpConsumers();
@@ -920,7 +917,7 @@
{
if (consumers[i] != null && !consumers[i].consumer.isClosed())
{
- ClusterTestBase.log.info("Dumping consumer " + i);
+ log.info("Dumping consumer " + i);
checkReceive(i);
}
@@ -984,13 +981,13 @@
if (message != null)
{
- ClusterTestBase.log.info("check receive Consumer " + consumerID +
+ log.info("check receive Consumer " + consumerID +
" received message " +
message.getObjectProperty(ClusterTestBase.COUNT_PROP));
}
else
{
- ClusterTestBase.log.info("check receive Consumer " + consumerID + " null message");
+ log.info("check receive Consumer " + consumerID + " null message");
}
}
while (message != null);
@@ -2030,7 +2027,7 @@
timeStarts[node] = System.currentTimeMillis();
servers[node].setIdentity("server " + node);
- ClusterTestBase.log.info("starting server " + servers[node]);
+ log.info("starting server " + servers[node]);
servers[node].start();
// for (int i = 0 ; i <= node; i++)
@@ -2038,9 +2035,9 @@
// System.out.println(servers[node].getClusterManager().getTopology().describe());
// }
- ClusterTestBase.log.info("started server " + servers[node]);
+ log.info("started server " + servers[node]);
- ClusterTestBase.log.info("started server " + node);
+ log.info("started server " + node);
waitForServer(servers[node]);
@@ -2082,13 +2079,13 @@
timeStarts[node] = System.currentTimeMillis();
- ClusterTestBase.log.info("stopping server " + node);
+ log.info("stopping server " + node);
servers[node].stop();
- ClusterTestBase.log.info("server " + node + " stopped");
+ log.info("server " + node + " stopped");
}
catch (Exception e)
{
- ClusterTestBase.log.warn(e.getMessage(), e);
+ log.warn(e.getMessage(), e);
}
}
}
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java 2011-08-29 19:59:41 UTC (rev 11236)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java 2011-08-29 20:10:05 UTC (rev 11237)
@@ -97,9 +97,7 @@
setupCluster();
startServers(0, 1, 2, 3, 4);
-
- Thread.sleep(1000);
-
+
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java 2011-08-29 19:59:41 UTC (rev 11236)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java 2011-08-29 20:10:05 UTC (rev 11237)
@@ -88,13 +88,6 @@
waitForBindings(0, "queues.testaddress", 1, 1, true);
- Thread.sleep(2000);
- System.out.println(clusterDescription(servers[0]));
- System.out.println(clusterDescription(servers[1]));
- System.out.println(clusterDescription(servers[2]));
- System.out.println(clusterDescription(servers[3]));
- System.out.println(clusterDescription(servers[4]));
-
waitForBindings(0, "queues.testaddress", 1, 1, false);
send(0, "queues.testaddress", 10, false, null);
@@ -323,8 +316,6 @@
stopServers(2);
- Thread.sleep(2000);
-
log.info("============================================ after stop");
log.info(clusterDescription(servers[0]));
log.info(clusterDescription(servers[1]));
@@ -333,8 +324,6 @@
startServers(2);
- Thread.sleep(2000);
-
log.info("============================================ after start");
log.info(clusterDescription(servers[0]));
log.info(clusterDescription(servers[1]));
@@ -358,7 +347,6 @@
setupClusterConnection("cluster4-X", 4, -1, "queues", false, 4, isNetty(), true);
startServers(0, 1, 2, 3, 4);
- Thread.sleep(2000);
Set<ClusterConnection> connectionSet = getServer(0).getClusterManager().getClusterConnections();
assertNotNull(connectionSet);
assertEquals(1, connectionSet.size());
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2011-08-29 19:59:41 UTC (rev 11236)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2011-08-29 20:10:05 UTC (rev 11237)
@@ -72,7 +72,7 @@
startServers(0);
// Give it a little time for the bridge to try to start
- Thread.sleep(2000);
+ Thread.sleep(500);
stopServers(0);
}
@@ -102,7 +102,11 @@
public void testStartSourceServerBeforeTargetServer() throws Exception
{
startServers(0, 1);
+
+ waitForTopology(servers[0], 2);
+ waitForTopology(servers[1], 2);
+
setupSessionFactory(0, isNetty(), true);
setupSessionFactory(1, isNetty(), true);
@@ -124,6 +128,13 @@
public void testStopAndStartTarget() throws Exception
{
startServers(0, 1);
+
+ waitForTopology(servers[0], 2);
+ waitForTopology(servers[1], 2);
+
+ System.out.println(servers[0].getClusterManager().getTopology().describe());
+
+ System.out.println(servers[1].getClusterManager().getTopology().describe());
setupSessionFactory(0, isNetty(), true);
setupSessionFactory(1, isNetty(), true);
@@ -150,12 +161,14 @@
OnewayTwoNodeClusterTest.log.info("stopping server 1");
stopServers(1);
+
+ waitForTopology(servers[0], 1);
OnewayTwoNodeClusterTest.log.info("restarting server 1(" + servers[1].getIdentity() + ")");
startServers(1);
- //Thread.sleep(1000);
+ waitForTopology(servers[0], 2);
log.info("Server 1 id=" + servers[1].getNodeID());
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2011-08-29 19:59:41 UTC (rev 11236)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2011-08-29 20:10:05 UTC (rev 11237)
@@ -1085,9 +1085,14 @@
public void testRouteWhenNoConsumersFalseNoLocalConsumerLoadBalancedQueues() throws Exception
{
setupCluster(false);
-
+
startServers();
+ for (int i = 0 ; i <= 4; i++)
+ {
+ waitForTopology(servers[i], 5);
+ }
+
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
@@ -1241,10 +1246,6 @@
waitForBindings(3, "queues.testaddress", 4, 4, false);
waitForBindings(4, "queues.testaddress", 4, 4, false);
- // this.checkReceive(0, 1, 2, 3, 4);
-
- // Thread.sleep(300000);
-
verifyReceiveAll(10, 0, 1, 2, 3, 4);
}
@@ -1470,7 +1471,6 @@
waitForBindings(3, "queues.testaddress", 6, 6, true);
waitForBindings(4, "queues.testaddress", 7, 7, true);
- Thread.sleep(2000);
System.out.println("#####################################");
System.out.println(clusterDescription(servers[0]));
System.out.println(clusterDescription(servers[1]));
12 years, 8 months