Author: igarashitm
Date: 2011-02-02 08:44:15 -0500 (Wed, 02 Feb 2011)
New Revision: 10174
Added:
branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConstants.java
branches/HORNETQ-316/src/main/org/hornetq/core/config/BroadcastGroupConstants.java
Removed:
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/DiscoveryGroupConstants.java
Modified:
branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java
branches/HORNETQ-316/src/main/org/hornetq/api/core/client/HornetQClient.java
branches/HORNETQ-316/src/main/org/hornetq/api/core/management/HornetQServerControl.java
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java
branches/HORNETQ-316/src/main/org/hornetq/core/config/BridgeConfiguration.java
branches/HORNETQ-316/src/main/org/hornetq/core/config/BroadcastGroupConfiguration.java
branches/HORNETQ-316/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
branches/HORNETQ-316/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
branches/HORNETQ-316/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java
branches/HORNETQ-316/src/main/org/hornetq/core/management/impl/BroadcastGroupControlImpl.java
branches/HORNETQ-316/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
branches/HORNETQ-316/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/BroadcastGroup.java
branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java
branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java
Log:
also made BroadcastGroup pluggable
Modified:
branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java 2011-02-02 13:09:22 UTC (rev 10173)
+++ branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java 2011-02-02 13:44:15 UTC (rev 10174)
@@ -54,5 +54,4 @@
{
return this.name;
}
-
}
Copied: branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConstants.java
(from rev 10150,
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/DiscoveryGroupConstants.java)
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConstants.java (rev 0)
+++ branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConstants.java 2011-02-02 13:44:15 UTC (rev 10174)
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.api.core;
+
+/**
+ * A DiscoveryGroupConstants
+ *
+ * @author "<a href=\"tm.igarashi(a)gmail.com\">Tomohisa Igarashi</a>"
+ *
+ */
+public class DiscoveryGroupConstants
+{
+ // for static discovery
+ public static final String STATIC_CONNECTORS_CONNECTOR_REF_LIST_NAME = "static-connector-ref-list";
+ public static final String STATIC_CONNECTORS_LIST_NAME = "static-connector-list";
+
+ // for simple UDP discovery
+ public static final String LOCAL_BIND_ADDRESS_NAME = "local-bind-address";
+ public static final String GROUP_ADDRESS_NAME = "group-address";
+ public static final String GROUP_PORT_NAME = "group-port";
+ public static final String INITIAL_WAIT_TIMEOUT_NAME = "initial-wait-timeout";
+ public static final String REFRESH_TIMEOUT_NAME = "refresh-timeout";
+}
Property changes on:
branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConstants.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: branches/HORNETQ-316/src/main/org/hornetq/api/core/client/HornetQClient.java
===================================================================
---
branches/HORNETQ-316/src/main/org/hornetq/api/core/client/HornetQClient.java 2011-02-02
13:09:22 UTC (rev 10173)
+++
branches/HORNETQ-316/src/main/org/hornetq/api/core/client/HornetQClient.java 2011-02-02
13:44:15 UTC (rev 10174)
@@ -13,10 +13,10 @@
package org.hornetq.api.core.client;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.DiscoveryGroupConstants;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy;
-import org.hornetq.core.client.impl.DiscoveryGroupConstants;
import org.hornetq.core.client.impl.StaticServerLocatorImpl;
import java.lang.reflect.Constructor;
Modified:
branches/HORNETQ-316/src/main/org/hornetq/api/core/management/HornetQServerControl.java
===================================================================
---
branches/HORNETQ-316/src/main/org/hornetq/api/core/management/HornetQServerControl.java 2011-02-02
13:09:22 UTC (rev 10173)
+++
branches/HORNETQ-316/src/main/org/hornetq/api/core/management/HornetQServerControl.java 2011-02-02
13:44:15 UTC (rev 10174)
@@ -565,8 +565,7 @@
@Parameter(name="useDuplicateDetection", desc="Use duplicate detection") boolean useDuplicateDetection,
@Parameter(name="confirmationWindowSize", desc="Confirmation window size") int confirmationWindowSize,
@Parameter(name="clientFailureCheckPeriod", desc="Period to check client failure") long clientFailureCheckPeriod,
- @Parameter(name="staticConnectorNames", desc="comma separated list of connector names or name of discovery group if 'useDiscoveryGroup' is set to true") String connectorNames,
- @Parameter(name="useDiscoveryGroup", desc="use discovery group")boolean useDiscoveryGroup,
+ @Parameter(name="discoveryGroupName", desc="name of discovery group") String connectorNames,
@Parameter(name="ha", desc="Is it using HA") boolean ha,
@Parameter(name="user", desc="User name") String user,
@Parameter(name="password", desc="User password") String password) throws Exception;
Deleted:
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/DiscoveryGroupConstants.java
===================================================================
---
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/DiscoveryGroupConstants.java 2011-02-02
13:09:22 UTC (rev 10173)
+++
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/DiscoveryGroupConstants.java 2011-02-02
13:44:15 UTC (rev 10174)
@@ -1,34 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.client.impl;
-
-/**
- * A DiscoveryGroupConstants
- *
- * @author "<a href=\"tm.igarashi(a)gmail.com\">Tomohisa Igarashi</a>"
- *
- */
-public class DiscoveryGroupConstants
-{
- // for static discovery
- public static final String STATIC_CONNECTORS_CONNECTOR_REF_LIST_NAME = "static-connector-ref-list";
- public static final String STATIC_CONNECTORS_LIST_NAME = "static-connector-list";
-
- // for simple UDP discovery
- public static final String LOCAL_BIND_ADDRESS_NAME = "local-bind-address";
- public static final String GROUP_ADDRESS_NAME = "group-address";
- public static final String GROUP_PORT_NAME = "group-port";
- public static final String INITIAL_WAIT_TIMEOUT_NAME = "initial-wait-timeout";
- public static final String REFRESH_TIMEOUT_NAME = "refresh-timeout";
-}
Modified:
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java
===================================================================
---
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java 2011-02-02
13:09:22 UTC (rev 10173)
+++
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java 2011-02-02
13:44:15 UTC (rev 10174)
@@ -19,6 +19,7 @@
import java.util.concurrent.*;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.DiscoveryGroupConstants;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
Modified:
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java
===================================================================
---
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java 2011-02-02
13:09:22 UTC (rev 10173)
+++
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java 2011-02-02
13:44:15 UTC (rev 10174)
@@ -18,6 +18,7 @@
import java.util.concurrent.*;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.DiscoveryGroupConstants;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
Modified: branches/HORNETQ-316/src/main/org/hornetq/core/config/BridgeConfiguration.java
===================================================================
---
branches/HORNETQ-316/src/main/org/hornetq/core/config/BridgeConfiguration.java 2011-02-02
13:09:22 UTC (rev 10173)
+++
branches/HORNETQ-316/src/main/org/hornetq/core/config/BridgeConfiguration.java 2011-02-02
13:44:15 UTC (rev 10174)
@@ -16,6 +16,8 @@
import java.io.Serializable;
import java.util.List;
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+
/**
* A BridgeConfiguration
*
@@ -37,9 +39,7 @@
private String filterString;
- private List<String> staticConnectors;
-
- private String discoveryGroupName;
+ private DiscoveryGroupConfiguration discoveryGroupConfiguration;
private boolean ha;
@@ -72,7 +72,7 @@
final boolean useDuplicateDetection,
final int confirmationWindowSize,
final long clientFailureCheckPeriod,
- final List<String> staticConnectors,
+ final DiscoveryGroupConfiguration
discoveryGroupConfiguration,
final boolean ha,
final String user,
final String password)
@@ -88,41 +88,7 @@
this.useDuplicateDetection = useDuplicateDetection;
this.confirmationWindowSize = confirmationWindowSize;
this.clientFailureCheckPeriod = clientFailureCheckPeriod;
- this.staticConnectors = staticConnectors;
- this.user = user;
- this.password = password;
- discoveryGroupName = null;
- }
-
- public BridgeConfiguration(final String name,
- final String queueName,
- final String forwardingAddress,
- final String filterString,
- final String transformerClassName,
- final long retryInterval,
- final double retryIntervalMultiplier,
- final int reconnectAttempts,
- final boolean useDuplicateDetection,
- final int confirmationWindowSize,
- final long clientFailureCheckPeriod,
- final String discoveryGroupName,
- final boolean ha,
- final String user,
- final String password)
- {
- this.name = name;
- this.queueName = queueName;
- this.forwardingAddress = forwardingAddress;
- this.filterString = filterString;
- this.transformerClassName = transformerClassName;
- this.retryInterval = retryInterval;
- this.retryIntervalMultiplier = retryIntervalMultiplier;
- this.reconnectAttempts = reconnectAttempts;
- this.useDuplicateDetection = useDuplicateDetection;
- this.confirmationWindowSize = confirmationWindowSize;
- this.clientFailureCheckPeriod = clientFailureCheckPeriod;
- this.staticConnectors = null;
- this.discoveryGroupName = discoveryGroupName;
+ this.discoveryGroupConfiguration = discoveryGroupConfiguration;
this.ha = ha;
this.user = user;
this.password = password;
@@ -153,15 +119,10 @@
return transformerClassName;
}
- public List<String> getStaticConnectors()
+ public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration()
{
- return staticConnectors;
+ return discoveryGroupConfiguration;
}
-
- public String getDiscoveryGroupName()
- {
- return discoveryGroupName;
- }
public boolean isHA()
{
@@ -231,19 +192,11 @@
}
/**
- * @param staticConnectors the staticConnectors to set
- */
- public void setStaticConnectors(final List<String> staticConnectors)
- {
- this.staticConnectors = staticConnectors;
- }
-
- /**
* @param discoveryGroupName the discoveryGroupName to set
*/
- public void setDiscoveryGroupName(final String discoveryGroupName)
+ public void setDiscoveryGroupConfiguration(final DiscoveryGroupConfiguration
discoveryGroupConfiguration)
{
- this.discoveryGroupName = discoveryGroupName;
+ this.discoveryGroupConfiguration = discoveryGroupConfiguration;
}
/**
Modified:
branches/HORNETQ-316/src/main/org/hornetq/core/config/BroadcastGroupConfiguration.java
===================================================================
---
branches/HORNETQ-316/src/main/org/hornetq/core/config/BroadcastGroupConfiguration.java 2011-02-02
13:09:22 UTC (rev 10173)
+++
branches/HORNETQ-316/src/main/org/hornetq/core/config/BroadcastGroupConfiguration.java 2011-02-02
13:44:15 UTC (rev 10174)
@@ -15,6 +15,7 @@
import java.io.Serializable;
import java.util.List;
+import java.util.Map;
import org.hornetq.core.logging.Logger;
@@ -32,73 +33,38 @@
private static final Logger log =
Logger.getLogger(BroadcastGroupConfiguration.class);
+ private String broadcastGroupClassName;
+
+ private Map<String,Object> params;
+
private String name;
- private String localBindAddress;
-
- private int localBindPort;
-
- private String groupAddress;
-
- private int groupPort;
-
- private long broadcastPeriod;
-
- private List<String> connectorInfos;
-
- public BroadcastGroupConfiguration(final String name,
- final String localBindAddress,
- final int localBindPort,
- final String groupAddress,
- final int groupPort,
- final long broadcastPeriod,
- final List<String> connectorInfos)
+ public BroadcastGroupConfiguration(final String clazz,
+ final Map<String,Object> params,
+ final String name)
{
super();
+ this.broadcastGroupClassName = clazz;
+ this.params = params;
this.name = name;
- this.localBindAddress = localBindAddress;
- this.localBindPort = localBindPort;
- this.groupAddress = groupAddress;
- this.groupPort = groupPort;
- this.broadcastPeriod = broadcastPeriod;
- this.connectorInfos = connectorInfos;
+
}
- public String getName()
+ public String getBroadcastGroupClassName()
{
- return name;
+ return this.broadcastGroupClassName;
}
- public String getLocalBindAddress()
+ public Map<String, Object> getParams()
{
- return localBindAddress;
+ return this.params;
}
- public int getLocalBindPort()
+ public String getName()
{
- return localBindPort;
+ return name;
}
- public String getGroupAddress()
- {
- return groupAddress;
- }
-
- public int getGroupPort()
- {
- return groupPort;
- }
-
- public long getBroadcastPeriod()
- {
- return broadcastPeriod;
- }
-
- public List<String> getConnectorInfos()
- {
- return connectorInfos;
- }
-
/**
* @param name the name to set
*/
@@ -106,53 +72,4 @@
{
this.name = name;
}
-
- /**
- * @param localBindAddress the localBindAddress to set
- */
- public void setLocalBindAddress(final String localBindAddress)
- {
- this.localBindAddress = localBindAddress;
- }
-
- /**
- * @param localBindPort the localBindPort to set
- */
- public void setLocalBindPort(final int localBindPort)
- {
- this.localBindPort = localBindPort;
- }
-
- /**
- * @param groupAddress the groupAddress to set
- */
- public void setGroupAddress(final String groupAddress)
- {
- this.groupAddress = groupAddress;
- }
-
- /**
- * @param groupPort the groupPort to set
- */
- public void setGroupPort(final int groupPort)
- {
- this.groupPort = groupPort;
- }
-
- /**
- * @param broadcastPeriod the broadcastPeriod to set
- */
- public void setBroadcastPeriod(final long broadcastPeriod)
- {
- this.broadcastPeriod = broadcastPeriod;
- }
-
- /**
- * @param connectorInfos the connectorInfos to set
- */
- public void setConnectorInfos(final List<String> connectorInfos)
- {
- this.connectorInfos = connectorInfos;
- }
-
}
Added: branches/HORNETQ-316/src/main/org/hornetq/core/config/BroadcastGroupConstants.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/config/BroadcastGroupConstants.java (rev 0)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/config/BroadcastGroupConstants.java 2011-02-02 13:44:15 UTC (rev 10174)
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.config;
+
+/**
+ * A BroadcastGroupConstants
+ *
+ * @author "<a href=\"tm.igarashi(a)gmail.com\">Tomohisa Igarashi</a>"
+ *
+ *
+ */
+public class BroadcastGroupConstants
+{
+ // for simple UDP broadcast
+ public static final String LOCAL_BIND_ADDRESS_NAME = "local-bind-address";
+ public static final String LOCAL_BIND_PORT_NAME = "local-bind-port";
+ public static final String GROUP_ADDRESS_NAME = "group-address";
+ public static final String GROUP_PORT_NAME = "group-port";
+ public static final String BROADCAST_PERIOD_NAME = "broadcast-period";
+ public static final String CONNECTOR_REF_LIST_NAME = "connector-ref-list";
+ public static final String CONNECTOR_LIST_NAME = "connector-list";
+
+}
Property changes on:
branches/HORNETQ-316/src/main/org/hornetq/core/config/BroadcastGroupConstants.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified:
branches/HORNETQ-316/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
===================================================================
---
branches/HORNETQ-316/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java 2011-02-02
13:09:22 UTC (rev 10173)
+++
branches/HORNETQ-316/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java 2011-02-02
13:44:15 UTC (rev 10174)
@@ -14,8 +14,9 @@
package org.hornetq.core.config;
import java.io.Serializable;
-import java.util.List;
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+
/**
* A ClusterConnectionConfiguration
*
@@ -41,12 +42,8 @@
private final boolean forwardWhenNoConsumers;
- private final List<String> staticConnectors;
-
- private final List<String> allowableConnectors;
+ private final DiscoveryGroupConfiguration discoveryGroupConfiguration;
- private final String discoveryGroupName;
-
private final int maxHops;
private final int confirmationWindowSize;
@@ -61,10 +58,8 @@
final boolean forwardWhenNoConsumers,
final int maxHops,
final int confirmationWindowSize,
- final List<String> staticConnectors,
- final String discoveryGroupName,
- final boolean allowableConnectionsOnly,
- final List<String>
allowableConnectorNames)
+ final DiscoveryGroupConfiguration
discoveryGroupConfiguration,
+ final boolean allowableConnectionsOnly)
{
this.name = name;
this.address = address;
@@ -72,11 +67,9 @@
this.retryInterval = retryInterval;
this.duplicateDetection = duplicateDetection;
this.forwardWhenNoConsumers = forwardWhenNoConsumers;
- this.discoveryGroupName = discoveryGroupName;
+ this.discoveryGroupConfiguration = discoveryGroupConfiguration;
this.maxHops = maxHops;
this.confirmationWindowSize = confirmationWindowSize;
- this.staticConnectors = staticConnectors;
- this.allowableConnectors = allowableConnectorNames;
this.allowableConnectionsOnly = allowableConnectionsOnly;
}
@@ -115,21 +108,11 @@
return confirmationWindowSize;
}
- public List<String> getStaticConnectors()
+ public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration()
{
- return staticConnectors;
+ return discoveryGroupConfiguration;
}
-
- public List<String> getAllowableConnectors()
- {
- return allowableConnectors;
- }
- public String getDiscoveryGroupName()
- {
- return discoveryGroupName;
- }
-
public long getRetryInterval()
{
return retryInterval;
Modified:
branches/HORNETQ-316/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
---
branches/HORNETQ-316/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-02-02
13:09:22 UTC (rev 10173)
+++
branches/HORNETQ-316/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-02-02
13:44:15 UTC (rev 10174)
@@ -25,11 +25,11 @@
import java.util.StringTokenizer;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.DiscoveryGroupConstants;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.core.client.impl.DiscoveryGroupConstants;
import org.hornetq.core.config.*;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.config.impl.FileConfiguration;
@@ -894,46 +894,41 @@
{
String name = e.getAttribute("name");
- String localAddress = XMLConfigurationUtil.getString(e,
"local-bind-address", null, Validators.NO_CHECK);
+ String clazz = XMLConfigurationUtil.getString(e, "broadcast-group-class",
null, Validators.NOT_NULL_OR_EMPTY);
- int localBindPort = XMLConfigurationUtil.getInteger(e, "local-bind-port",
-1, Validators.MINUS_ONE_OR_GT_ZERO);
+ Map<String, Object> params = new HashMap<String, Object>();
- String groupAddress = XMLConfigurationUtil.getString(e, "group-address",
null, Validators.NOT_NULL_OR_EMPTY);
+ NodeList paramsNodes = e.getElementsByTagName("param");
- int groupPort = XMLConfigurationUtil.getInteger(e, "group-port", -1,
Validators.GT_ZERO);
+ for (int i = 0; i < paramsNodes.getLength(); i++)
+ {
+ Node paramNode = paramsNodes.item(i);
- long broadcastPeriod = XMLConfigurationUtil.getLong(e,
- "broadcast-period",
-
ConfigurationImpl.DEFAULT_BROADCAST_PERIOD,
- Validators.GT_ZERO);
+ NamedNodeMap attributes = paramNode.getAttributes();
- NodeList children = e.getChildNodes();
+ Node nkey = attributes.getNamedItem("key");
- List<String> connectorNames = new ArrayList<String>();
+ String key = nkey.getTextContent();
- for (int j = 0; j < children.getLength(); j++)
- {
- Node child = children.item(j);
+ Node nValue = attributes.getNamedItem("value");
- if (child.getNodeName().equals("connector-ref"))
- {
- String connectorName = XMLConfigurationUtil.getString(e,
-
"connector-ref",
- null,
-
Validators.NOT_NULL_OR_EMPTY);
+ params.put(key, nValue.getTextContent());
+ }
- connectorNames.add(connectorName);
+ String connectorList =
(String)params.get(BroadcastGroupConstants.CONNECTOR_REF_LIST_NAME);
+ if(connectorList != null)
+ {
+ List<TransportConfiguration> connectors = new
ArrayList<TransportConfiguration>();
+ StringTokenizer token = new StringTokenizer(connectorList, ",",
false);
+ while(token.hasMoreElements())
+ {
+
connectors.add(mainConfig.getConnectorConfigurations().get(token.nextElement()));
}
+ params.put(BroadcastGroupConstants.CONNECTOR_LIST_NAME, connectors.toArray(new
TransportConfiguration[0]));
}
+
+ BroadcastGroupConfiguration config = new BroadcastGroupConfiguration(clazz, params,
name);
- BroadcastGroupConfiguration config = new BroadcastGroupConfiguration(name,
- localAddress,
-
localBindPort,
- groupAddress,
- groupPort,
-
broadcastPeriod,
-
connectorNames);
-
mainConfig.getBroadcastGroupConfigurations().add(config);
}
@@ -962,7 +957,6 @@
params.put(key, nValue.getTextContent());
}
- // discovery-group configuration contains static connector list
String connectorList =
(String)params.get(DiscoveryGroupConstants.STATIC_CONNECTORS_CONNECTOR_REF_LIST_NAME);
if(connectorList != null)
{
@@ -1043,34 +1037,8 @@
}
}
- List<String> staticConnectors = new ArrayList<String>();
- DiscoveryGroupConfiguration discovery =
mainConfig.getDiscoveryGroupConfigurations().get(discoveryGroupName);
- Map<String,Object> params = discovery.getParams();
- String connectorList =
(String)params.get(DiscoveryGroupConstants.STATIC_CONNECTORS_CONNECTOR_REF_LIST_NAME);
- if(connectorList != null)
- {
- StringTokenizer token = new StringTokenizer(connectorList, ",",
false);
- while(token.hasMoreElements())
- {
- staticConnectors.add(token.nextToken());
- }
- }
+ DiscoveryGroupConfiguration discoveryGroupConfiguration =
mainConfig.getDiscoveryGroupConfigurations().get(discoveryGroupName);
- List<String> allowableConnectionNames = null;
- if(allowDirectConnectionsOnly)
- {
- if(connectorList == null)
- {
- log.warn("allow-direct-connections-only was found, but "
- + DiscoveryGroupConstants.STATIC_CONNECTORS_CONNECTOR_REF_LIST_NAME
- + " was not found in discovery-group. ignore.");
- }
- else
- {
- allowableConnectionNames = staticConnectors;
- }
- }
-
ClusterConnectionConfiguration config;
config = new ClusterConnectionConfiguration(name,
@@ -1081,10 +1049,8 @@
forwardWhenNoConsumers,
maxHops,
confirmationWindowSize,
- staticConnectors,
- discoveryGroupName,
- allowDirectConnectionsOnly,
- allowableConnectionNames);
+ discoveryGroupConfiguration,
+ allowDirectConnectionsOnly);
mainConfig.getClusterConfigurations().add(config);
}
@@ -1157,10 +1123,8 @@
String filterString = null;
- List<String> staticConnectorNames = new ArrayList<String>();
+ DiscoveryGroupConfiguration discoveryGroupConfiguration = null;
- String discoveryGroupName = null;
-
NodeList children = brNode.getChildNodes();
for (int j = 0; j < children.getLength(); j++)
@@ -1173,70 +1137,32 @@
}
else if (child.getNodeName().equals("discovery-group-ref"))
{
- discoveryGroupName =
child.getAttributes().getNamedItem("discovery-group-name").getNodeValue();
+ String discoveryGroupName =
child.getAttributes().getNamedItem("discovery-group-name").getNodeValue();
+ discoveryGroupConfiguration =
mainConfig.getDiscoveryGroupConfigurations().get(discoveryGroupName);
}
- else if (child.getNodeName().equals("static-connectors"))
- {
- getStaticConnectors(staticConnectorNames, child);
- }
}
BridgeConfiguration config;
- if (!staticConnectorNames.isEmpty())
- {
- config = new BridgeConfiguration(name,
- queueName,
- forwardingAddress,
- filterString,
- transformerClassName,
- retryInterval,
- retryIntervalMultiplier,
- reconnectAttempts,
- useDuplicateDetection,
- confirmationWindowSize,
-
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- staticConnectorNames,
- ha,
- user,
- password);
- }
- else
- {
- config = new BridgeConfiguration(name,
- queueName,
- forwardingAddress,
- filterString,
- transformerClassName,
- retryInterval,
- retryIntervalMultiplier,
- reconnectAttempts,
- useDuplicateDetection,
- confirmationWindowSize,
-
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- discoveryGroupName,
- ha,
- user,
- password);
- }
+ config = new BridgeConfiguration(name,
+ queueName,
+ forwardingAddress,
+ filterString,
+ transformerClassName,
+ retryInterval,
+ retryIntervalMultiplier,
+ reconnectAttempts,
+ useDuplicateDetection,
+ confirmationWindowSize,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ discoveryGroupConfiguration,
+ ha,
+ user,
+ password);
mainConfig.getBridgeConfigurations().add(config);
}
- private void getStaticConnectors(List<String> staticConnectorNames, Node child)
- {
- NodeList children2 =
((Element)child).getElementsByTagName("connector-ref");
-
- for (int k = 0; k < children2.getLength(); k++)
- {
- Element child2 = (Element)children2.item(k);
-
- String connectorName = child2.getChildNodes().item(0).getNodeValue();
-
- staticConnectorNames.add(connectorName);
- }
- }
-
private void parseDivertConfiguration(final Element e, final Configuration
mainConfig)
{
String name = e.getAttribute("name");
Modified:
branches/HORNETQ-316/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java
===================================================================
---
branches/HORNETQ-316/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java 2011-02-02
13:09:22 UTC (rev 10173)
+++
branches/HORNETQ-316/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java 2011-02-02
13:44:15 UTC (rev 10174)
@@ -13,8 +13,12 @@
package org.hornetq.core.management.impl;
+import java.util.Map;
+
import javax.management.MBeanOperationInfo;
+import org.hornetq.api.core.DiscoveryGroupConstants;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.BridgeControl;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.persistence.StorageManager;
@@ -58,7 +62,19 @@
clearIO();
try
{
- return configuration.getStaticConnectors().toArray(new String[0]);
+ Map<String,Object> params =
configuration.getDiscoveryGroupConfiguration().getParams();
+ TransportConfiguration[] staticConnectors =
(TransportConfiguration[])params.get(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME);
+ if(staticConnectors == null)
+ {
+ return null;
+ }
+
+ String[] staticConnectorNames = new String[staticConnectors.length];
+ for(int i=0; i<staticConnectors.length; i++)
+ {
+ staticConnectorNames[i] = staticConnectors[i].getName();
+ }
+ return staticConnectorNames;
}
finally
{
@@ -97,7 +113,7 @@
clearIO();
try
{
- return configuration.getDiscoveryGroupName();
+ return configuration.getDiscoveryGroupConfiguration().getName();
}
finally
{
Modified:
branches/HORNETQ-316/src/main/org/hornetq/core/management/impl/BroadcastGroupControlImpl.java
===================================================================
---
branches/HORNETQ-316/src/main/org/hornetq/core/management/impl/BroadcastGroupControlImpl.java 2011-02-02
13:09:22 UTC (rev 10173)
+++
branches/HORNETQ-316/src/main/org/hornetq/core/management/impl/BroadcastGroupControlImpl.java 2011-02-02
13:44:15 UTC (rev 10174)
@@ -13,10 +13,14 @@
package org.hornetq.core.management.impl;
+import java.util.Map;
+
import javax.management.MBeanOperationInfo;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.BroadcastGroupControl;
import org.hornetq.core.config.BroadcastGroupConfiguration;
+import org.hornetq.core.config.BroadcastGroupConstants;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.cluster.BroadcastGroup;
import org.hornetq.utils.json.JSONArray;
@@ -72,7 +76,16 @@
clearIO();
try
{
- return configuration.getBroadcastPeriod();
+ Map<String,Object> params = configuration.getParams();
+ String period =
(String)params.get(BroadcastGroupConstants.BROADCAST_PERIOD_NAME);
+ if(period == null)
+ {
+ return -1;
+ }
+ else
+ {
+ return Long.parseLong(period);
+ }
}
finally
{
@@ -85,12 +98,15 @@
clearIO();
try
{
- Object[] ret = new Object[configuration.getConnectorInfos().size()];
+ Map<String,Object> params = configuration.getParams();
+ TransportConfiguration[] connectors =
(TransportConfiguration[])params.get(BroadcastGroupConstants.CONNECTOR_LIST_NAME);
+
+ Object[] ret = new Object[connectors.length];
int i = 0;
- for (String connector : configuration.getConnectorInfos())
+ for (TransportConfiguration conn : connectors)
{
- ret[i++] = connector;
+ ret[i++] = conn.getName();
}
return ret;
@@ -108,9 +124,12 @@
{
JSONArray array = new JSONArray();
- for (String connector : configuration.getConnectorInfos())
+ Map<String,Object> params = configuration.getParams();
+ TransportConfiguration[] connectors = (TransportConfiguration[])params.get(BroadcastGroupConstants.CONNECTOR_LIST_NAME);
+
+ for (TransportConfiguration conn : connectors)
{
- array.put(connector);
+ array.put(conn.getName());
}
return array.toString();
}
@@ -125,7 +144,8 @@
clearIO();
try
{
- return configuration.getGroupAddress();
+ Map<String,Object> params = configuration.getParams();
+ return (String)params.get(BroadcastGroupConstants.GROUP_ADDRESS_NAME);
}
finally
{
@@ -138,7 +158,16 @@
clearIO();
try
{
- return configuration.getGroupPort();
+ Map<String,Object> params = configuration.getParams();
+ String port = (String)params.get(BroadcastGroupConstants.GROUP_PORT_NAME);
+ if(port == null)
+ {
+ return -1;
+ }
+ else
+ {
+ return Integer.parseInt(port);
+ }
}
finally
{
@@ -151,7 +180,16 @@
clearIO();
try
{
- return configuration.getLocalBindPort();
+ Map<String,Object> params = configuration.getParams();
+ String port = (String)params.get(BroadcastGroupConstants.LOCAL_BIND_PORT_NAME);
+ if(port == null)
+ {
+ return -1;
+ }
+ else
+ {
+ return Integer.parseInt(port);
+ }
}
finally
{
Modified:
branches/HORNETQ-316/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2011-02-02 13:09:22 UTC (rev 10173)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2011-02-02 13:44:15 UTC (rev 10174)
@@ -18,6 +18,8 @@
import javax.management.MBeanOperationInfo;
+import org.hornetq.api.core.DiscoveryGroupConstants;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.ClusterConnectionControl;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.persistence.StorageManager;
@@ -74,7 +76,7 @@
clearIO();
try
{
- return configuration.getDiscoveryGroupName();
+ return configuration.getDiscoveryGroupConfiguration().getName();
}
finally
{
@@ -143,14 +145,19 @@
clearIO();
try
{
- if (configuration.getStaticConnectors() == null)
+ Map<String,Object> params = configuration.getDiscoveryGroupConfiguration().getParams();
+ TransportConfiguration[] staticConnectors = (TransportConfiguration[])params.get(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME);
+ if(staticConnectors == null)
{
return null;
}
- else
+
+ String[] staticConnectorNames = new String[staticConnectors.length];
+ for(int i=0; i<staticConnectors.length; i++)
{
- return configuration.getStaticConnectors().toArray(new String[0]);
+ staticConnectorNames[i] = staticConnectors[i].getName();
}
+ return staticConnectorNames;
}
finally
{
@@ -163,7 +170,7 @@
clearIO();
try
{
- List<String> connectors = configuration.getStaticConnectors();
+ String[] connectors = getStaticConnectors();
if (connectors == null)
{
Modified:
branches/HORNETQ-316/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2011-02-02 13:09:22 UTC (rev 10173)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2011-02-02 13:44:15 UTC (rev 10174)
@@ -1641,8 +1641,7 @@
final boolean useDuplicateDetection,
final int confirmationWindowSize,
final long clientFailureCheckPeriod,
- final String connectorNames,
- boolean useDiscoveryGroup,
+ final String discoveryGroupName,
final boolean ha,
final String user,
final String password) throws Exception
@@ -1655,43 +1654,22 @@
try
{
BridgeConfiguration config = null;
- if (useDiscoveryGroup)
- {
- config = new BridgeConfiguration(name,
- queueName,
- forwardingAddress,
- filterString,
- transformerClassName,
- retryInterval,
- retryIntervalMultiplier,
- reconnectAttempts,
- useDuplicateDetection,
- confirmationWindowSize,
- clientFailureCheckPeriod,
- connectorNames,
- ha,
- user,
- password);
- }
- else
- {
- List<String> connectors = toList(connectorNames);
- config = new BridgeConfiguration(name,
- queueName,
- forwardingAddress,
- filterString,
- transformerClassName,
- retryInterval,
- retryIntervalMultiplier,
- reconnectAttempts,
- useDuplicateDetection,
- confirmationWindowSize,
- clientFailureCheckPeriod,
- connectors,
- ha,
- user,
- password);
- }
+ config = new BridgeConfiguration(name,
+ queueName,
+ forwardingAddress,
+ filterString,
+ transformerClassName,
+ retryInterval,
+ retryIntervalMultiplier,
+ reconnectAttempts,
+ useDuplicateDetection,
+ confirmationWindowSize,
+ clientFailureCheckPeriod,
+ configuration.getDiscoveryGroupConfigurations().get(discoveryGroupName),
+ ha,
+ user,
+ password);
+
server.deployBridge(config);
}
finally
Modified:
branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/BroadcastGroup.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/BroadcastGroup.java 2011-02-02 13:09:22 UTC (rev 10173)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/BroadcastGroup.java 2011-02-02 13:44:15 UTC (rev 10174)
@@ -13,6 +13,8 @@
package org.hornetq.core.server.cluster;
+import java.util.concurrent.ScheduledExecutorService;
+
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.management.NotificationService;
@@ -42,4 +44,6 @@
void broadcastConnectors() throws Exception;
void activate();
+
+ void schedule(ScheduledExecutorService scheduler);
}
Modified:
branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java 2011-02-02 13:09:22 UTC (rev 10173)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java 2011-02-02 13:44:15 UTC (rev 10174)
@@ -18,13 +18,18 @@
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.config.BroadcastGroupConfiguration;
+import org.hornetq.core.config.BroadcastGroupConstants;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.cluster.BroadcastGroup;
import org.hornetq.core.server.management.Notification;
@@ -48,14 +53,8 @@
private final String name;
- private final InetAddress localAddress;
-
- private final int localPort;
-
- private final InetAddress groupAddress;
-
- private final int groupPort;
-
+ private final BroadcastGroupConfiguration broadcastGroupConfiguration;
+
private DatagramSocket socket;
private final List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
@@ -77,26 +76,17 @@
*/
public BroadcastGroupImpl(final String nodeID,
final String name,
- final InetAddress localAddress,
- final int localPort,
- final InetAddress groupAddress,
- final int groupPort,
- final boolean active) throws Exception
+ final boolean active,
+ final BroadcastGroupConfiguration config) throws Exception
{
this.nodeID = nodeID;
this.name = name;
- this.localAddress = localAddress;
-
- this.localPort = localPort;
-
- this.groupAddress = groupAddress;
-
- this.groupPort = groupPort;
-
this.active = active;
+ this.broadcastGroupConfiguration = config;
+
uniqueID = UUIDGenerator.getInstance().generateStringUUID();
}
@@ -112,6 +102,11 @@
return;
}
+ Map<String,Object> params = this.broadcastGroupConfiguration.getParams();
+ int localPort = Integer.parseInt((String)params.get(BroadcastGroupConstants.LOCAL_BIND_PORT_NAME));
+ String localAddr = (String)params.get(BroadcastGroupConstants.LOCAL_BIND_ADDRESS_NAME);
+ InetAddress localAddress = InetAddress.getByName(localAddr);
+
if (localPort != -1)
{
socket = new DatagramSocket(localPort, localAddress);
@@ -222,6 +217,11 @@
byte[] data = buff.toByteBuffer().array();
+ Map<String,Object> params = broadcastGroupConfiguration.getParams();
+ int groupPort = Integer.parseInt((String)params.get(BroadcastGroupConstants.GROUP_PORT_NAME));
+ String groupAddr = (String)params.get(BroadcastGroupConstants.GROUP_ADDRESS_NAME);
+ InetAddress groupAddress = InetAddress.getByName(groupAddr);
+
DatagramPacket packet = new DatagramPacket(data, data.length, groupAddress, groupPort);
socket.send(packet);
@@ -244,9 +244,13 @@
}
}
- public synchronized void setScheduledFuture(final ScheduledFuture<?> future)
+ public void schedule(ScheduledExecutorService scheduler)
{
- this.future = future;
+ Map<String,Object> params = broadcastGroupConfiguration.getParams();
+
+ this.future = scheduler.scheduleWithFixedDelay(this,
+ 0L,
+ Long.parseLong((String)params.get(BroadcastGroupConstants.BROADCAST_PERIOD_NAME)),
+ TimeUnit.MILLISECONDS);
}
-
}
Modified:
branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-02-02 13:09:22 UTC (rev 10173)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-02-02 13:44:15 UTC (rev 10174)
@@ -21,6 +21,7 @@
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.DiscoveryGroupConstants;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
@@ -128,8 +129,7 @@
final boolean backup,
final String clusterUser,
final String clusterPassword,
- final boolean allowableConnectionsOnly,
- final TransportConfiguration[] allowableConnections) throws Exception
+ final boolean allowableConnectionsOnly) throws Exception
{
if (nodeUUID == null)
@@ -306,6 +306,14 @@
props);
managementService.sendNotification(notification);
}
+
+ if(this.allowableConnectionsOnly)
+ {
+ Map<String,Object> params = discoveryGroupConfiguration.getParams();
+ TransportConfiguration[] sc = (TransportConfiguration[])params.get(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME);
+ List<TransportConfiguration> staticConnectors = java.util.Arrays.asList(sc);
+ this.allowableConnections.addAll(staticConnectors);
+ }
}
public TransportConfiguration getConnector()
Modified:
branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-02-02 13:09:22 UTC (rev 10173)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-02-02 13:44:15 UTC (rev 10174)
@@ -16,6 +16,7 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.lang.reflect.Array;
+import java.lang.reflect.Constructor;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.ScheduledExecutorService;
@@ -23,6 +24,7 @@
import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.DiscoveryGroupConstants;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
@@ -498,43 +500,23 @@
return;
}
- InetAddress localAddress = null;
- if (config.getLocalBindAddress() != null)
- {
- localAddress = InetAddress.getByName(config.getLocalBindAddress());
- }
+ String className = config.getBroadcastGroupClassName();
- InetAddress groupAddress = InetAddress.getByName(config.getGroupAddress());
-
- BroadcastGroupImpl group = new BroadcastGroupImpl(nodeUUID.toString(),
- config.getName(),
- localAddress,
- config.getLocalBindPort(),
- groupAddress,
- config.getGroupPort(),
- !backup);
-
- for (String connectorInfo : config.getConnectorInfos())
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ Class<?> clazz = loader.loadClass(className);
+ Constructor<?> constructor = clazz.getConstructor(String.class, String.class, Boolean.class, DiscoveryGroupConfiguration.class);
+ BroadcastGroup group = (BroadcastGroup)constructor.newInstance(nodeUUID.toString(), config.getName(), !backup, config);
+
+ if (group.size() == 0)
{
- TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorInfo);
-
- if (connector == null)
- {
- logWarnNoConnector(config.getName(), connectorInfo);
-
- return;
- }
-
- group.addConnector(connector);
+ ClusterManagerImpl.log.warn("There is no connector deployed for the broadcast group with name '" +
+ group.getName() +
+ "'. That will not be deployed.");
+ return;
}
- ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(group,
- 0L,
- config.getBroadcastPeriod(),
- MILLISECONDS);
-
- group.setScheduledFuture(future);
-
+ group.schedule(scheduledExecutor);
+
broadcastGroups.put(config.getName(), group);
managementService.registerBroadcastGroup(group, config);
@@ -545,37 +527,6 @@
}
}
- private void logWarnNoConnector(final String connectorName, final String bgName)
- {
- ClusterManagerImpl.log.warn("There is no connector deployed with name '" + connectorName +
- "'. The broadcast group with name '" +
- bgName +
- "' will not be deployed.");
- }
-
- private TransportConfiguration[] connectorNameListToArray(final List<String> connectorNames)
- {
- TransportConfiguration[] tcConfigs = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
- connectorNames.size());
- int count = 0;
- for (String connectorName : connectorNames)
- {
- TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorName);
-
- if (connector == null)
- {
- ClusterManagerImpl.log.warn("No connector defined with name '" + connectorName +
- "'. The bridge will not be deployed.");
-
- return null;
- }
-
- tcConfigs[count++] = connector;
- }
-
- return tcConfigs;
- }
-
public synchronized void deployBridge(final BridgeConfiguration config) throws Exception
{
if (config.getName() == null)
@@ -621,16 +572,8 @@
ServerLocatorInternal serverLocator;
- DiscoveryGroupConfiguration discoveryGroupConfiguration = configuration.getDiscoveryGroupConfigurations()
- .get(config.getDiscoveryGroupName());
- if (discoveryGroupConfiguration == null)
- {
- ClusterManagerImpl.log.warn("No discovery group configured with name '" + config.getDiscoveryGroupName() +
- "'. The bridge will not be deployed.");
+ DiscoveryGroupConfiguration discoveryGroupConfiguration = config.getDiscoveryGroupConfiguration();
- return;
- }
-
if (config.isHA())
{
serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(discoveryGroupConfiguration);
@@ -717,20 +660,14 @@
ClusterConnectionImpl clusterConnection;
DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations()
- .get(config.getDiscoveryGroupName());
+ .get(config.getDiscoveryGroupConfiguration().getName());
if (dg == null)
{
- ClusterManagerImpl.log.warn("No discovery group with name '" + config.getDiscoveryGroupName() +
+ ClusterManagerImpl.log.warn("No discovery group with name '" + config.getDiscoveryGroupConfiguration().getName() +
"'. The cluster connection will not be deployed.");
}
- List<String> connectorNames = config.getAllowableConnectors();
- TransportConfiguration[] allowableConnections = null;
- if(connectorNames != null)
- {
- allowableConnections = connectorNameListToArray(connectorNames);
- }
clusterConnection = new ClusterConnectionImpl(dg,
connector,
new SimpleString(config.getName()),
@@ -749,8 +686,7 @@
backup,
server.getConfiguration().getClusterUser(),
server.getConfiguration().getClusterPassword(),
- config.isAllowableConnectionsOnly(),
- allowableConnections);
+ config.isAllowableConnectionsOnly());
managementService.registerCluster(clusterConnection, config);
@@ -767,11 +703,11 @@
private void announceBackup(final ClusterConnectionConfiguration config, final TransportConfiguration connector) throws Exception
{
DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations()
- .get(config.getDiscoveryGroupName());
+ .get(config.getDiscoveryGroupConfiguration().getName());
if (dg == null)
{
- ClusterManagerImpl.log.warn("No discovery group with name '" + config.getDiscoveryGroupName() +
+ ClusterManagerImpl.log.warn("No discovery group with name '" + config.getDiscoveryGroupConfiguration().getName() +
"'. The cluster connection will not be deployed.");
}
Modified: branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2011-02-02 13:09:22 UTC (rev 10173)
+++ branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2011-02-02 13:44:15 UTC (rev 10174)
@@ -30,6 +30,7 @@
import javax.transaction.xa.XAResource;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.DiscoveryGroupConstants;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
@@ -37,7 +38,6 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.JMSFactoryType;
-import org.hornetq.core.client.impl.DiscoveryGroupConstants;
import org.hornetq.core.client.impl.SimpleUDPServerLocatorImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;