Author: clebert.suconic(a)jboss.com
Date: 2011-08-10 13:30:15 -0400 (Wed, 10 Aug 2011)
New Revision: 11185
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
tweaks on my latest changes
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-10
16:57:38 UTC (rev 11184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-10
17:30:15 UTC (rev 11185)
@@ -49,8 +49,6 @@
* */
private volatile Object owner;
- private volatile Executor executor;
-
public Topology(final Object owner)
{
this.owner = owner;
@@ -66,11 +64,6 @@
*/
private final Map<String, TopologyMember> topology = new
ConcurrentHashMap<String, TopologyMember>();
- public void setExecutor(Executor executor)
- {
- this.executor = executor;
- }
-
public void addClusterTopologyListener(final ClusterTopologyListener listener)
{
if (log.isDebugEnabled())
@@ -95,7 +88,7 @@
}
}
- public boolean addMember(final String nodeId, final TopologyMember member, final
boolean last)
+ public boolean addMember(final String nodeId, final TopologyMember member, final
boolean last)
{
boolean replaced = false;
@@ -157,8 +150,7 @@
if (Topology.log.isDebugEnabled())
{
- Topology.log.debug(this +
- " Add member nodeId=" +
+ Topology.log.debug(this + " Add member nodeId=" +
nodeId +
" member = " +
member +
@@ -169,35 +161,28 @@
}
}
-
+
if (replaced)
{
final ArrayList<ClusterTopologyListener> copy = copyListeners();
-
// Has to use a different thread otherwise we may get dead locks case the remove
is coming from the channel
- execute(new Runnable(){
- public void run()
+ for (ClusterTopologyListener listener : copy)
+ {
+ if (Topology.log.isTraceEnabled())
{
- for (ClusterTopologyListener listener : copy)
- {
- if (Topology.log.isTraceEnabled())
- {
- Topology.log.trace(this + " informing " + listener +
" about node up = " + nodeId);
- }
+ Topology.log.trace(this + " informing " + listener + "
about node up = " + nodeId);
+ }
- try
- {
- listener.nodeUP(nodeId, member.getConnector(), last);
- }
- catch (Throwable e)
- {
- log.warn (e.getMessage(), e);
- }
- }
+ try
+ {
+ listener.nodeUP(nodeId, member.getConnector(), last);
}
- });
-
+ catch (Throwable e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
}
return replaced;
@@ -208,7 +193,7 @@
*/
private ArrayList<ClusterTopologyListener> copyListeners()
{
- ArrayList <ClusterTopologyListener> listenersCopy;
+ ArrayList<ClusterTopologyListener> listenersCopy;
synchronized (topologyListeners)
{
listenersCopy = new
ArrayList<ClusterTopologyListener>(topologyListeners);
@@ -219,12 +204,11 @@
public boolean removeMember(final String nodeId)
{
TopologyMember member;
-
+
synchronized (this)
{
member = topology.remove(nodeId);
}
-
if (Topology.log.isDebugEnabled())
{
@@ -241,20 +225,14 @@
{
final ArrayList<ClusterTopologyListener> copy = copyListeners();
- // Has to use a different thread otherwise we may get dead locks case the remove
is coming from the channel
- execute(new Runnable(){
- public void run()
+ for (ClusterTopologyListener listener : copy)
+ {
+ if (Topology.log.isTraceEnabled())
{
- for (ClusterTopologyListener listener : copy)
- {
- if (Topology.log.isTraceEnabled())
- {
- Topology.log.trace(this + " informing " + listener +
" about node down = " + nodeId);
- }
- listener.nodeDown(nodeId);
- }
+ Topology.log.trace(this + " informing " + listener + "
about node down = " + nodeId);
}
- });
+ listener.nodeDown(nodeId);
+ }
}
return member != null;
}
@@ -268,7 +246,7 @@
{
// To make sure it was updated
addMember(nodeID, member, false);
-
+
ArrayList<ClusterTopologyListener> copy = copyListeners();
// Now force sending it
@@ -394,18 +372,6 @@
}
return null;
}
-
- private void execute(Runnable runnable)
- {
- if (executor != null)
- {
- executor.execute(runnable);
- }
- else
- {
- runnable.run();
- }
- }
/* (non-Javadoc)
* @see java.lang.Object#toString()
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-08-10
16:57:38 UTC (rev 11184)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-08-10
17:30:15 UTC (rev 11185)
@@ -162,7 +162,7 @@
// This needs to be a different thread pool to the main thread pool especially for
OIO where we may need
// to support many hundreds of connections, but the main thread pool must be kept
small for better performance
- ThreadFactory tFactory = new
HornetQThreadFactory("HornetQ-remoting-threads" +
System.identityHashCode(this),
+ ThreadFactory tFactory = new
HornetQThreadFactory("HornetQ-remoting-threads-" + server.toString() +
"-" + System.identityHashCode(this),
false,
tccl);
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-10
16:57:38 UTC (rev 11184)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-10
17:30:15 UTC (rev 11185)
@@ -95,7 +95,7 @@
private final long connectionTTL;
private final long retryInterval;
-
+
private final long callTimeout;
private final double retryIntervalMultiplier;
@@ -107,7 +107,7 @@
private final boolean useDuplicateDetection;
private final boolean routeWhenNoConsumers;
-
+
private final int confirmationWindowSize;
private final Map<String, MessageFlowRecord> records = new
ConcurrentHashMap<String, MessageFlowRecord>();
@@ -195,7 +195,7 @@
this.useDuplicateDetection = useDuplicateDetection;
this.routeWhenNoConsumers = routeWhenNoConsumers;
-
+
this.confirmationWindowSize = confirmationWindowSize;
this.executorFactory = executorFactory;
@@ -221,7 +221,7 @@
this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
this.manager = manager;
-
+
this.callTimeout = callTimeout;
this.clusterManagerTopology = clusterManagerTopology;
@@ -293,7 +293,7 @@
this.maxRetryInterval = maxRetryInterval;
this.reconnectAttempts = reconnectAttempts;
-
+
this.callTimeout = callTimeout;
this.useDuplicateDetection = useDuplicateDetection;
@@ -353,7 +353,7 @@
{
return;
}
-
+
if (log.isDebugEnabled())
{
log.debug(this + "::stopping ClusterConnection");
@@ -382,32 +382,32 @@
{
}
}
+ }
- if (managementService != null)
- {
- TypedProperties props = new TypedProperties();
- props.putSimpleStringProperty(new SimpleString("name"), name);
- Notification notification = new Notification(nodeUUID.toString(),
-
NotificationType.CLUSTER_CONNECTION_STOPPED,
- props);
- managementService.sendNotification(notification);
- }
+ if (managementService != null)
+ {
+ TypedProperties props = new TypedProperties();
+ props.putSimpleStringProperty(new SimpleString("name"), name);
+ Notification notification = new Notification(nodeUUID.toString(),
+
NotificationType.CLUSTER_CONNECTION_STOPPED,
+ props);
+ managementService.sendNotification(notification);
+ }
- executor.execute(new Runnable()
+ executor.execute(new Runnable()
+ {
+ public void run()
{
- public void run()
+ if (serverLocator != null)
{
- if (serverLocator != null)
- {
- serverLocator.close();
- serverLocator = null;
- }
-
+ serverLocator.close();
+ serverLocator = null;
}
- });
- started = false;
- }
+ }
+ });
+
+ started = false;
}
public boolean isStarted()
@@ -636,8 +636,7 @@
{
if (isTrace)
{
- log.trace(this +
- " ignored nodeUp record for " +
+ log.trace(this + " ignored nodeUp record for " +
connectorPair +
" on nodeID=" +
nodeID +
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-10
16:57:38 UTC (rev 11184)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-10
17:30:15 UTC (rev 11185)
@@ -378,8 +378,6 @@
backup = false;
String nodeID = server.getNodeID().toString();
-
- topology.setExecutor(executor);
TopologyMember member = topology.getMember(nodeID);
//swap backup as live and send it to everybody