Author: timfox
Date: 2010-07-11 07:05:40 -0400 (Sun, 11 Jul 2010)
New Revision: 9397
Modified:
branches/2_2_0_HA_Improvements/src/config/common/schema/hornetq-configuration.xsd
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HA improvements
Modified:
branches/2_2_0_HA_Improvements/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- branches/2_2_0_HA_Improvements/src/config/common/schema/hornetq-configuration.xsd 2010-07-11 10:11:08 UTC (rev 9396)
+++ branches/2_2_0_HA_Improvements/src/config/common/schema/hornetq-configuration.xsd 2010-07-11 11:05:40 UTC (rev 9397)
@@ -245,13 +245,6 @@
</xsd:complexType>
</xsd:element>
- <xsd:complexType name="connector-refType">
- <xsd:attribute name="connector-name" type="xsd:IDREF" use="required">
- </xsd:attribute>
- <xsd:attribute name="backup-connector-name" type="xsd:IDREF" use="optional">
- </xsd:attribute>
- </xsd:complexType>
-
<xsd:element name="remoting-interceptors">
<xsd:complexType>
<xsd:sequence>
@@ -321,7 +314,7 @@
<xsd:element maxOccurs="1" minOccurs="1"
name="static-connectors">
<xsd:complexType>
<xsd:sequence>
- <xsd:element maxOccurs="unbounded" minOccurs="1" name="connector-ref" type="connectorRefType"/>
+ <xsd:element maxOccurs="unbounded" minOccurs="1" name="connector-ref" type="xsd:string"/>
</xsd:sequence>
</xsd:complexType>
</xsd:element>
@@ -340,6 +333,8 @@
<xsd:sequence>
<xsd:element maxOccurs="1" minOccurs="1"
name="address" type="xsd:string">
</xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="1" name="connector-ref" type="xsd:string">
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="retry-interval" type="xsd:long">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="use-duplicate-detection" type="xsd:boolean">
@@ -354,7 +349,7 @@
<xsd:element maxOccurs="1" minOccurs="1"
name="static-connectors">
<xsd:complexType>
<xsd:sequence>
- <xsd:element maxOccurs="unbounded" minOccurs="0" name="connector-ref" type="connectorRefType"/>
+ <xsd:element maxOccurs="unbounded" minOccurs="0" name="connector-ref" type="xsd:string"/>
</xsd:sequence>
</xsd:complexType>
</xsd:element>
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java 2010-07-11 10:11:08 UTC (rev 9396)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java 2010-07-11 11:05:40 UTC (rev 9397)
@@ -32,6 +32,8 @@
private final String name;
private final String address;
+
+ private final String connectorName;
private final long retryInterval;
@@ -49,6 +51,7 @@
public ClusterConnectionConfiguration(final String name,
final String address,
+ final String connectorName,
final long retryInterval,
final boolean duplicateDetection,
final boolean forwardWhenNoConsumers,
@@ -58,6 +61,7 @@
{
this.name = name;
this.address = address;
+ this.connectorName = connectorName;
this.retryInterval = retryInterval;
this.staticConnectors = staticConnectors;
this.duplicateDetection = duplicateDetection;
@@ -69,6 +73,7 @@
public ClusterConnectionConfiguration(final String name,
final String address,
+ final String connectorName,
final long retryInterval,
final boolean duplicateDetection,
final boolean forwardWhenNoConsumers,
@@ -78,6 +83,7 @@
{
this.name = name;
this.address = address;
+ this.connectorName = connectorName;
this.retryInterval = retryInterval;
this.duplicateDetection = duplicateDetection;
this.forwardWhenNoConsumers = forwardWhenNoConsumers;
@@ -96,6 +102,11 @@
{
return address;
}
+
+ public String getConnectorName()
+ {
+ return connectorName;
+ }
public boolean isDuplicateDetection()
{
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2010-07-11 10:11:08 UTC (rev 9396)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2010-07-11 11:05:40 UTC (rev 9397)
@@ -935,6 +935,8 @@
String name = e.getAttribute("name");
String address = XMLConfigurationUtil.getString(e, "address", null,
Validators.NOT_NULL_OR_EMPTY);
+
+ String connectorName = XMLConfigurationUtil.getString(e, "connector-ref", null, Validators.NOT_NULL_OR_EMPTY);
boolean duplicateDetection = XMLConfigurationUtil.getBoolean(e,
"use-duplicate-detection",
@@ -983,9 +985,9 @@
if (child2.getNodeName().equals("connector-ref"))
{
- String connectorName = child.getAttributes().getNamedItem("connector-name").getNodeValue();
+ String connName = child.getAttributes().getNamedItem("connector-name").getNodeValue();
- staticConnectorNames.add(connectorName);
+ staticConnectorNames.add(connName);
}
}
}
@@ -997,6 +999,7 @@
{
config = new ClusterConnectionConfiguration(name,
address,
+ connectorName,
retryInterval,
duplicateDetection,
forwardWhenNoConsumers,
@@ -1008,6 +1011,7 @@
{
config = new ClusterConnectionConfiguration(name,
address,
+ connectorName,
retryInterval,
duplicateDetection,
forwardWhenNoConsumers,
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2010-07-11 10:11:08 UTC (rev 9396)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2010-07-11 11:05:40 UTC (rev 9397)
@@ -51,4 +51,6 @@
void activate();
Pair<TransportConfiguration, TransportConfiguration>[] getTopology();
+
+ TransportConfiguration getConnector();
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-07-11 10:11:08 UTC (rev 9396)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-07-11 11:05:40 UTC (rev 9397)
@@ -17,7 +17,6 @@
import java.util.Set;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.core.server.HornetQComponent;
@@ -45,5 +44,5 @@
void unregisterTopologyListener(ClusterTopologyListener listener, boolean
clusterConnection);
- void announceNode(String nodeID, boolean backup, TransportConfiguration connector);
+ void announceNode(String nodeID, boolean backup);
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-07-11 10:11:08 UTC (rev 9396)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-07-11 11:05:40 UTC (rev 9397)
@@ -97,8 +97,11 @@
private Pair<TransportConfiguration, TransportConfiguration>[] topology;
private final ServerLocator serverLocator;
+
+ private final TransportConfiguration connector;
public ClusterConnectionImpl(final ServerLocator serverLocator,
+ final TransportConfiguration connector,
final SimpleString name,
final SimpleString address,
final long retryInterval,
@@ -117,6 +120,8 @@
final String clusterPassword) throws Exception
{
this.serverLocator = serverLocator;
+
+ this.connector = connector;
this.name = name;
@@ -249,6 +254,11 @@
backup = false;
}
+
+ public TransportConfiguration getConnector()
+ {
+ return connector;
+ }
// ClusterTopologyListener implementation
------------------------------------------------------------------
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-07-11 10:11:08 UTC (rev 9396)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-07-11 11:05:40 UTC (rev 9397)
@@ -283,21 +283,24 @@
}
}
- public synchronized void announceNode(final String nodeID,
- final boolean backup,
- final TransportConfiguration connector)
+ public synchronized void announceNode(final String nodeID, final boolean backup)
{
+ // TODO does this really work with more than one cluster connection? I think not
+
+ // Just take the first one for now
+ ClusterConnection cc = clusterConnections.values().iterator().next();
+
Pair<TransportConfiguration, TransportConfiguration> pair =
topology.get(nodeID);
if (pair == null)
{
if (backup)
{
- pair = new Pair<TransportConfiguration, TransportConfiguration>(null, connector);
+ pair = new Pair<TransportConfiguration, TransportConfiguration>(null, cc.getConnector());
}
else
{
- pair = new Pair<TransportConfiguration, TransportConfiguration>(connector, null);
+ pair = new Pair<TransportConfiguration, TransportConfiguration>(cc.getConnector(), null);
}
topology.put(nodeID, pair);
@@ -306,11 +309,11 @@
{
if (backup)
{
- pair.b = connector;
+ pair.b = cc.getConnector();
}
else
{
- pair.a = connector;
+ pair.a = cc.getConnector();
}
}
@@ -569,6 +572,15 @@
return;
}
+ TransportConfiguration connector = configuration.getConnectorConfigurations().get(config.getConnectorName());
+
+ if (connector == null)
+ {
+ log.warn("No connecor with name '" + config.getConnectorName() +
+ "'. The cluster connection will not be deployed.");
+ return;
+ }
+
ServerLocator serverLocator;
if (config.getStaticConnectors() != null)
@@ -592,6 +604,7 @@
}
ClusterConnection clusterConnection = new ClusterConnectionImpl(serverLocator,
+ connector,
new
SimpleString(config.getName()),
new
SimpleString(config.getAddress()),
config.getRetryInterval(),
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-07-11 10:11:08 UTC (rev 9396)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-07-11 11:05:40 UTC (rev 9397)
@@ -368,7 +368,7 @@
// Announce presence of live node to cluster
- clusterManager.announceNode(nodeID, false, connector);
+ clusterManager.announceNode(nodeID.toString(), false);
log.info("Server is now live");
}
@@ -492,7 +492,7 @@
//Announce presence of this backup to rest of cluster
- clusterManager.announceNode(nodeID, true, connector);
+ clusterManager.announceNode(nodeID.toString(), true);
// We now look for the live.lock file - if it doesn't exist it means the
live isn't started yet, so we wait
// for that
@@ -526,7 +526,7 @@
// Announce presence of live node to cluster
- clusterManager.announceNode(nodeID, false, connector);
+ clusterManager.announceNode(nodeID.toString(), false);
break;
}