Author: timfox
Date: 2010-07-01 12:14:07 -0400 (Thu, 01 Jul 2010)
New Revision: 9377
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/HornetQClient.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/jms/HornetQJMSClient.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/jms/management/JMSServerControl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/JMSServerManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/ra/ConnectionFactoryProperties.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/ra/HornetQRAMCFProperties.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/ra/HornetQResourceAdapter.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/ra/inflow/HornetQActivation.java
Log:
HA improvements
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/HornetQClient.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/HornetQClient.java 2010-06-30
17:22:16 UTC (rev 9376)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/HornetQClient.java 2010-07-01
16:14:07 UTC (rev 9377)
@@ -93,6 +93,8 @@
public static final int DEFAULT_INITIAL_MESSAGE_PACKET_SIZE = 1500;
+ public static final boolean DEFAULT_HA = false;
+
/**
* Create a ServerLocator which creates session factories using a static list of
transportConfigurations, the ServerLocator is not updated automatically
* as the cluster topology changes, and no HA backup information is propagated to the
client
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/jms/HornetQJMSClient.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/jms/HornetQJMSClient.java 2010-06-30
17:22:16 UTC (rev 9376)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/jms/HornetQJMSClient.java 2010-07-01
16:14:07 UTC (rev 9377)
@@ -79,9 +79,9 @@
* @param transportConfigurations
* @return the HornetQConnectionFactory
*/
- public static HornetQConnectionFactory createConnectionFactoryWithoutHA(final
TransportConfiguration... initialServers)
+ public static HornetQConnectionFactory createConnectionFactoryWithoutHA(final
TransportConfiguration... transportConfigurations)
{
- return new HornetQConnectionFactory(false, initialServers);
+ return new HornetQConnectionFactory(false, transportConfigurations);
}
/**
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/jms/management/JMSServerControl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/jms/management/JMSServerControl.java 2010-06-30
17:22:16 UTC (rev 9376)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/jms/management/JMSServerControl.java 2010-07-01
16:14:07 UTC (rev 9377)
@@ -64,8 +64,8 @@
*
* @return {@code true} if the queue was created, {@code false} else
*/
- @Operation(desc = "Create a JMS Queue", impact =
MBeanOperationInfo.ACTION)
- boolean createQueue(@Parameter(name = "name", desc = "Name of the
queue to create") String name) throws Exception;
+ @Operation(desc = "Create a JMS Queue", impact = MBeanOperationInfo.ACTION)
+ boolean createQueue(@Parameter(name = "name", desc = "Name of the queue
to create") String name) throws Exception;
/**
* Creates a JMS Queue with the specified name and JNDI binding.
@@ -81,12 +81,11 @@
*
* @return {@code true} if the queue was created, {@code false} else
*/
- @Operation(desc = "Create a JMS Queue", impact =
MBeanOperationInfo.ACTION)
- boolean createQueue(@Parameter(name = "name", desc = "Name of the
queue to create") String name,
- @Parameter(name = "jndiBindings", desc =
"comma-separated list of JNDI bindings (use ',' if u need to use
commas in your jndi name)") String jndiBindings,
- @Parameter(name = "selector", desc = "the jms
selector") String selector) throws Exception;
+ @Operation(desc = "Create a JMS Queue", impact = MBeanOperationInfo.ACTION)
+ boolean createQueue(@Parameter(name = "name", desc = "Name of the queue
to create") String name,
+ @Parameter(name = "jndiBindings", desc =
"comma-separated list of JNDI bindings (use ',' if u need to use
commas in your jndi name)") String jndiBindings,
+ @Parameter(name = "selector", desc = "the jms
selector") String selector) throws Exception;
-
/**
* Destroys a JMS Queue with the specified name.
*
@@ -125,41 +124,40 @@
* <br>
* The ConnectionFactory is bound to JNDI for all the specified bindings.
*/
- void createConnectionFactory(final String name,
- final String liveTransportClassName,
- final Map<String, Object>
liveTransportParams,
- final Object[] jndiBindings) throws Exception;
+ void createConnectionFactory(String name,
+ boolean ha,
+ String liveTransportClassName,
+ Map<String, Object> liveTransportParams,
+ Object[] jndiBindings) throws Exception;
+
/**
* Create a JMS ConnectionFactory with the specified name connected to a static list
of live-backup servers.
* <br>
* The ConnectionFactory is bound to JNDI for all the specified bindings Strings.
* <br>
- * {@code liveConnectorsTransportClassNames} (resp. {@code
backupConnectorsTransportClassNames}) are the class names
- * of the {@link ConnectorFactory} to connect to the live (resp. backup) servers
- * and {@code liveConnectorTransportParams} (resp. backupConnectorTransportParams) are
Map<String, Object> for the corresponding {@link
TransportConfiguration}'s parameters.
+ * {@code liveConnectorsTransportClassNames} are the class names
+ * of the {@link ConnectorFactory} to connect to the live servers
+ * and {@code liveConnectorTransportParams} are Map<String, Object> for
the corresponding {@link TransportConfiguration}'s parameters.
*
* @see ClientSessionFactory#setStaticConnectors(java.util.List)
*/
void createConnectionFactory(String name,
+ boolean ha,
Object[] liveConnectorsTransportClassNames,
- Object[] liveConnectorTransportParams,
- Object[] backupConnectorsTransportClassNames,
- Object[] backupConnectorTransportParams,
+ Object[] liveConnectorTransportParams,
Object[] bindings) throws Exception;
/**
* Create a JMS ConnectionFactory with the specified name connected to a single
live-backup pair of servers.
* <br>
* The ConnectionFactory is bound to JNDI for all the specified bindings Strings.
- * <br>
- * {@code backupTransportClassNames} and {@code backupTransportParams} can be {@code
null} if there is no backup server.
+ *
*/
@Operation(desc = "Create a JMS ConnectionFactory", impact =
MBeanOperationInfo.ACTION)
void createConnectionFactory(@Parameter(name = "name") String name,
+ @Parameter(name = "ha") boolean ha,
@Parameter(name = "liveTransportClassNames",
desc = "comma-separated list of class names for transport to live servers")
String liveTransportClassNames,
@Parameter(name = "liveTransportParams", desc =
"comma-separated list of key=value parameters for the live transports (enclosed
between { } for each transport)") String liveTransportParams,
- @Parameter(name = "backupTransportClassNames",
desc = "comma-separated list of class names for transport to backup servers")
String backupTransportClassNames,
- @Parameter(name = "backupTransportParams", desc
= "comma-separated list of key=value parameters for the backup transports (enclosed
between { } for each transport)") String backupTransportParams,
@Parameter(name = "jndiBindings", desc =
"comma-separated list of JNDI bindings (use ',' if u need to use
commas in your jndi name)") String jndiBindings) throws Exception;
/**
@@ -171,10 +169,7 @@
*
* @see #createConnectionFactory(String, Object[], Object[], Object[], Object[])
*/
- void createConnectionFactory(String name,
- String discoveryAddress,
- int discoveryPort,
- Object[] bindings) throws Exception;
+ void createConnectionFactory(String name, boolean ha, String discoveryAddress, int
discoveryPort, Object[] bindings) throws Exception;
/**
* Create a JMS ConnectionFactory with the specified name using a discovery group to
discover HornetQ servers.
@@ -187,6 +182,7 @@
*/
@Operation(desc = "Create a JMS ConnectionFactory", impact =
MBeanOperationInfo.ACTION)
void createConnectionFactory(@Parameter(name = "name") String name,
+ @Parameter(name = "ha") boolean ha,
@Parameter(name = "discoveryAddress") String
discoveryAddress,
@Parameter(name = "discoveryPort") int
discoveryPort,
@Parameter(name = "jndiBindings") String
jndiBindings) throws Exception;
@@ -227,5 +223,4 @@
@Operation(desc = "List the sessions for the given connectionID", impact =
MBeanOperationInfo.INFO)
String[] listSessions(@Parameter(desc = "a connection ID", name =
"connectionID") String connectionID) throws Exception;
-
}
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
(rev 0)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2010-07-01
16:14:07 UTC (rev 9377)
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class NodeAnnounceMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(NodeAnnounceMessage.class);
+
+ // Attributes ----------------------------------------------------
+
+ private String nodeID;
+
+ private boolean backup;
+
+ private TransportConfiguration connector;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public NodeAnnounceMessage(final String nodeID, final boolean backup, final
TransportConfiguration tc)
+ {
+ super(PacketImpl.NODE_ANNOUNCE);
+
+ this.nodeID = nodeID;
+
+ this.backup = backup;
+
+ this.connector = tc;
+ }
+
+ public NodeAnnounceMessage()
+ {
+ super(PacketImpl.NODE_ANNOUNCE);
+ }
+
+ // Public --------------------------------------------------------
+
+
+ public String getNodeID()
+ {
+ return nodeID;
+ }
+
+ public boolean isBackup()
+ {
+ return backup;
+ }
+
+ public TransportConfiguration getConnector()
+ {
+ return connector;
+ }
+
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeString(nodeID);
+ buffer.writeBoolean(backup);
+ connector.encode(buffer);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ this.nodeID = buffer.readString();
+ this.backup = buffer.readBoolean();
+ connector = new TransportConfiguration();
+ connector.decode(buffer);
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-06-30
17:22:16 UTC (rev 9376)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-07-01
16:14:07 UTC (rev 9377)
@@ -208,7 +208,7 @@
private boolean initialised;
- private FailoverManager replicationFailoverManager;
+ // private FailoverManager replicationFailoverManager;
private ReplicationManager replicationManager;
@@ -1153,26 +1153,26 @@
* Protected so tests can change this behaviour
* @param backupConnector
*/
- protected FailoverManagerImpl createBackupConnectionFailoverManager(final
TransportConfiguration backupConnector,
- final
ExecutorService threadPool,
- final
ScheduledExecutorService scheduledPool)
- {
- return new FailoverManagerImpl((ClientSessionFactory)null,
- backupConnector,
- null,
- false,
- HornetQClient.DEFAULT_CALL_TIMEOUT,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- HornetQClient.DEFAULT_CONNECTION_TTL,
- 0,
- 1.0d,
- 0,
- 1,
- false,
- threadPool,
- scheduledPool,
- null);
- }
+// protected FailoverManagerImpl createBackupConnectionFailoverManager(final
TransportConfiguration backupConnector,
+// final
ExecutorService threadPool,
+// final
ScheduledExecutorService scheduledPool)
+// {
+// return new FailoverManagerImpl((ClientSessionFactory)null,
+// backupConnector,
+// null,
+// false,
+// HornetQClient.DEFAULT_CALL_TIMEOUT,
+//
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+// HornetQClient.DEFAULT_CONNECTION_TTL,
+// 0,
+// 1.0d,
+// 0,
+// 1,
+// false,
+// threadPool,
+// scheduledPool,
+// null);
+// }
protected PagingManager createPagingManager()
{
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2010-06-30
17:22:16 UTC (rev 9376)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2010-07-01
16:14:07 UTC (rev 9377)
@@ -569,7 +569,12 @@
{
serverLocator.close();
}
-
+
+ public ServerLocator getServerLocator()
+ {
+ return serverLocator;
+ }
+
// Package protected
----------------------------------------------------------------------------
// Protected
------------------------------------------------------------------------------------
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-06-30
17:22:16 UTC (rev 9376)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-07-01
16:14:07 UTC (rev 9377)
@@ -28,7 +28,6 @@
import javax.management.NotificationListener;
import javax.management.StandardMBean;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.Parameter;
@@ -87,12 +86,10 @@
return trimmed;
}
- private static List<Pair<TransportConfiguration, TransportConfiguration>>
convertToConnectorPairs(final Object[] liveConnectorsTransportClassNames,
-
final Object[] liveConnectorTransportParams,
-
final Object[] backupConnectorsTransportClassNames,
-
final Object[] backupConnectorTransportParams)
-
{
- List<Pair<TransportConfiguration, TransportConfiguration>> pairs = new
ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
+ private static List<TransportConfiguration>
convertToTransportConfigurationArray(final Object[] liveConnectorsTransportClassNames,
+ final
Object[] liveConnectorTransportParams)
+ {
+ List<TransportConfiguration> tcs = new
ArrayList<TransportConfiguration>();
for (int i = 0; i < liveConnectorsTransportClassNames.length; i++)
{
@@ -105,25 +102,11 @@
TransportConfiguration tcLive = new
TransportConfiguration(liveConnectorsTransportClassNames[i].toString(),
liveParams);
- Map<String, Object> backupParams = null;
- if (backupConnectorTransportParams.length > i)
- {
- backupParams = (Map<String, Object>)backupConnectorTransportParams[i];
- }
-
- TransportConfiguration tcBackup = null;
- if (backupConnectorsTransportClassNames.length > i)
- {
- new TransportConfiguration(backupConnectorsTransportClassNames[i].toString(),
backupParams);
- }
- Pair<TransportConfiguration, TransportConfiguration> pair = new
Pair<TransportConfiguration, TransportConfiguration>(tcLive,
- tcBackup);
-
- pairs.add(pair);
+ tcs.add(tcLive);
}
- return pairs;
-
}
+ return tcs;
+ }
public static MBeanNotificationInfo[] getNotificationInfos()
{
@@ -135,7 +118,7 @@
}
return new MBeanNotificationInfo[] { new MBeanNotificationInfo(names,
JMSServerControl.class.getName(),
- "Notifications emitted by a JMS Server") };
+ "Notifications
emitted by a JMS Server") };
}
// Constructors --------------------------------------------------
@@ -151,10 +134,11 @@
// JMSServerControlMBean implementation --------------------------
public void createConnectionFactory(final String name,
+ final boolean ha,
final String liveTransportClassName,
final Map<String, Object>
liveTransportParams,
final Object[] jndiBindings) throws Exception
- {
+ {
checkStarted();
clearIO();
@@ -162,8 +146,12 @@
try
{
TransportConfiguration liveTC = new
TransportConfiguration(liveTransportClassName, liveTransportParams);
+
+ List<TransportConfiguration> list = new
ArrayList<TransportConfiguration>(1);
+
+ list.add(liveTC);
- server.createConnectionFactory(name, liveTC,
JMSServerControlImpl.convert(jndiBindings));
+ server.createConnectionFactory(name, ha, list,
JMSServerControlImpl.convert(jndiBindings));
sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
}
@@ -171,26 +159,23 @@
{
blockOnIO();
}
- }
+ }
public void createConnectionFactory(final String name,
+ final boolean ha,
final Object[] liveConnectorsTransportClassNames,
final Object[] liveConnectorTransportParams,
- final Object[]
backupConnectorsTransportClassNames,
- final Object[] backupConnectorTransportParams,
final Object[] jndiBindings) throws Exception
- {
+ {
checkStarted();
clearIO();
try
{
- List<Pair<TransportConfiguration, TransportConfiguration>> pairs =
JMSServerControlImpl.convertToConnectorPairs(liveConnectorsTransportClassNames,
-
liveConnectorTransportParams,
-
backupConnectorsTransportClassNames,
-
backupConnectorTransportParams);
- server.createConnectionFactory(name, pairs,
JMSServerControlImpl.convert(jndiBindings));
+ List<TransportConfiguration> pairs =
JMSServerControlImpl.convertToTransportConfigurationArray(liveConnectorsTransportClassNames,
+
liveConnectorTransportParams);
+ server.createConnectionFactory(name, ha, pairs,
JMSServerControlImpl.convert(jndiBindings));
sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
}
@@ -198,15 +183,14 @@
{
blockOnIO();
}
- }
+ }
public void createConnectionFactory(final String name,
+ final boolean ha,
final String liveTransportClassNames,
- final String liveTransportParams,
- final String backupTransportClassNames,
- final String backupTransportParams,
+ final String liveTransportParams,
final String jndiBindings) throws Exception
- {
+ {
checkStarted();
clearIO();
@@ -214,32 +198,33 @@
try
{
Object[] liveClassNames =
JMSServerControlImpl.toArray(liveTransportClassNames);
- Object[] liveParams =
ManagementHelper.fromCommaSeparatedArrayOfCommaSeparatedKeyValues(liveTransportParams);
- Object[] backupClassNames =
JMSServerControlImpl.toArray(backupTransportClassNames);
- Object[] backupParams =
ManagementHelper.fromCommaSeparatedArrayOfCommaSeparatedKeyValues(backupTransportParams);;
+ Object[] liveParams =
ManagementHelper.fromCommaSeparatedArrayOfCommaSeparatedKeyValues(liveTransportParams);
Object[] bindings = JMSServerControlImpl.toArray(jndiBindings);
- createConnectionFactory(name, liveClassNames, liveParams, backupClassNames,
backupParams, bindings);
+ createConnectionFactory(name, ha, liveClassNames, liveParams, bindings);
}
finally
{
blockOnIO();
}
- }
+ }
-
-
public void createConnectionFactory(final String name,
+ final boolean ha,
final String discoveryAddress,
final int discoveryPort,
final Object[] jndiBindings) throws Exception
- {
+ {
checkStarted();
clearIO();
try
{
- server.createConnectionFactory(name, discoveryAddress, discoveryPort,
JMSServerControlImpl.convert(jndiBindings));
+ server.createConnectionFactory(name,
+ ha,
+ discoveryAddress,
+ discoveryPort,
+ JMSServerControlImpl.convert(jndiBindings));
sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
}
@@ -247,13 +232,14 @@
{
blockOnIO();
}
- }
+ }
public void createConnectionFactory(final String name,
+ final boolean ha,
final String discoveryAddress,
final int discoveryPort,
final String jndiBindings) throws Exception
- {
+ {
checkStarted();
clearIO();
@@ -262,29 +248,28 @@
{
Object[] bindings = JMSServerControlImpl.toArray(jndiBindings);
- createConnectionFactory(name, discoveryAddress, discoveryPort, bindings);
+ createConnectionFactory(name, ha, discoveryAddress, discoveryPort, bindings);
}
finally
{
blockOnIO();
}
- }
+ }
-
public boolean createQueue(@Parameter(name = "name", desc = "Name of
the queue to create") String name) throws Exception
{
return createQueue(name, null, null);
}
-
-
public boolean createQueue(final String name, final String jndiBindings) throws
Exception
{
return createQueue(name, jndiBindings, null);
}
- public boolean createQueue(@Parameter(name = "name", desc = "Name of
the queue to create") String name, @Parameter(name = "jndiBindings", desc =
"comma-separated list of JNDI bindings (use ',' if u need to use
commas in your jndi name)") String jndiBindings, @Parameter(name =
"selector", desc = "the jms selector") String selector) throws
Exception
+ public boolean createQueue(@Parameter(name = "name", desc = "Name of
the queue to create") String name,
+ @Parameter(name = "jndiBindings", desc =
"comma-separated list of JNDI bindings (use ',' if u need to use
commas in your jndi name)") String jndiBindings,
+ @Parameter(name = "selector", desc = "the
jms selector") String selector) throws Exception
{
checkStarted();
@@ -460,8 +445,8 @@
try
{
Object[] cfControls = server.getHornetQServer()
- .getManagementService()
- .getResources(ConnectionFactoryControl.class);
+ .getManagementService()
+ .getResources(ConnectionFactoryControl.class);
String[] names = new String[cfControls.length];
for (int i = 0; i < cfControls.length; i++)
{
@@ -481,9 +466,9 @@
public void removeNotificationListener(final NotificationListener listener,
final NotificationFilter filter,
final Object handback) throws
ListenerNotFoundException
- {
+ {
broadcaster.removeNotificationListener(listener, filter, handback);
- }
+ }
public void removeNotificationListener(final NotificationListener listener) throws
ListenerNotFoundException
{
@@ -493,9 +478,9 @@
public void addNotificationListener(final NotificationListener listener,
final NotificationFilter filter,
final Object handback) throws
IllegalArgumentException
- {
+ {
broadcaster.addNotificationListener(listener, filter, handback);
- }
+ }
public MBeanNotificationInfo[] getNotificationInfo()
{
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/JMSServerManager.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/JMSServerManager.java 2010-06-30
17:22:16 UTC (rev 9376)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/JMSServerManager.java 2010-07-01
16:14:07 UTC (rev 9377)
@@ -18,7 +18,6 @@
import javax.naming.Context;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.security.Role;
import org.hornetq.core.server.HornetQComponent;
@@ -158,41 +157,17 @@
*/
boolean destroyTopic(String name) throws Exception;
- void createConnectionFactory(String name, String discoveryAddress, int discoveryPort,
String ... jndiBindings) throws Exception;
+ void createConnectionFactory(String name, boolean ha, String discoveryAddress, int
discoveryPort, String ... jndiBindings) throws Exception;
void createConnectionFactory(String name,
- List<Pair<TransportConfiguration,
TransportConfiguration>> connectorConfigs,
+ boolean ha,
+ List<TransportConfiguration> connectorConfigs,
String ... jndiBindings) throws Exception;
void createConnectionFactory(String name,
- TransportConfiguration liveTC,
- TransportConfiguration backupTC,
- String ... jndiBindings) throws Exception;
-
- void createConnectionFactory(String name, TransportConfiguration liveTC, String ...
jndiBindings) throws Exception;
-
- void createConnectionFactory(String name,
+ boolean ha,
+ List<TransportConfiguration> connectorConfigs,
String clientID,
- String discoveryAddress,
- int discoveryPort,
- String ... jndiBindings) throws Exception;
-
- void createConnectionFactory(String name,
- String clientID,
- List<Pair<TransportConfiguration,
TransportConfiguration>> connectorConfigs,
- String ... jndiBindings) throws Exception;
-
- void createConnectionFactory(String name,
- String clientID,
- TransportConfiguration liveTC,
- TransportConfiguration backupTC,
- String ... jndiBindings) throws Exception;
-
- void createConnectionFactory(String name, String clientID, TransportConfiguration
liveTC, String ... jndiBindings) throws Exception;
-
- void createConnectionFactory(String name,
- List<Pair<TransportConfiguration,
TransportConfiguration>> connectorConfigs,
- String clientID,
long clientFailureCheckPeriod,
long connectionTTL,
long callTimeout,
@@ -224,6 +199,7 @@
String ... jndiBindings) throws Exception;
void createConnectionFactory(String name,
+ boolean ha,
String localBindAdress,
String discoveryAddress,
int discoveryPort,
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2010-06-30
17:22:16 UTC (rev 9376)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2010-07-01
16:14:07 UTC (rev 9377)
@@ -15,10 +15,8 @@
import java.util.List;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.journal.EncodingSupport;
-import org.hornetq.jms.server.JMSServerManager;
/**
* A ConnectionFactoryConfiguration
@@ -47,38 +45,14 @@
void setDiscoveryPort(int discoveryPort);
+ List<TransportConfiguration> getConnectorConfigs();
- /**
- * A Reference to the group configuration.
- */
- String getDiscoveryGroupName();
+ void setConnectorConfigs(List<TransportConfiguration> connectorConfigs);
- /**
- * A Reference to the group configuration.
- */
- void setDiscoveryGroupName(String groupName);
+ boolean isHA();
-
- /**
- * A List of connector names that will be converted into ConnnectorConfigs.
- * This is useful when using the method {@link
JMSServerManager#createConnectionFactory(ConnectionFactoryConfiguration)}
- *
- * @return
- */
- List<Pair<String, String>> getConnectorNames();
+ void setHA(boolean ha);
- /**
- * A List of connector names that will be converted into ConnnectorConfigs.
- * This is useful when using the method {@link
JMSServerManager#createConnectionFactory(ConnectionFactoryConfiguration)}
- *
- * @return
- */
- void setConnectorNames(List<Pair<String, String>> connectors);
-
- List<Pair<TransportConfiguration, TransportConfiguration>>
getConnectorConfigs();
-
- void setConnectorConfigs(List<Pair<TransportConfiguration,
TransportConfiguration>> connectorConfigs);
-
String getClientID();
void setClientID(String clientID);
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2010-06-30
17:22:16 UTC (rev 9376)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2010-07-01
16:14:07 UTC (rev 9377)
@@ -17,7 +17,6 @@
import java.util.List;
import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
@@ -41,20 +40,18 @@
private String[] bindings;
- private String discoveryGroupName;
-
private String localBindAddress;
private String discoveryAddress;
private int discoveryPort;
- private List<Pair<String, String>> connectorNames;
+ private List<TransportConfiguration> connectorConfigs;
- private List<Pair<TransportConfiguration, TransportConfiguration>>
connectorConfigs;
-
private String clientID = null;
+ private boolean ha = HornetQClient.DEFAULT_HA;
+
private long discoveryRefreshTimeout =
HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
private long clientFailureCheckPeriod =
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
@@ -112,9 +109,9 @@
private boolean failoverOnInitialConnection =
HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION;
private boolean failoverOnServerShutdown =
HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
-
+
private String groupID = null;
-
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -123,58 +120,31 @@
public ConnectionFactoryConfigurationImpl()
{
}
-
+
public ConnectionFactoryConfigurationImpl(final String name,
+ final boolean ha,
final String discoveryAddress,
final int discoveryPort,
final String... bindings)
{
- this(name, bindings);
+ this(name, ha, bindings);
this.discoveryAddress = discoveryAddress;
this.discoveryPort = discoveryPort;
}
-
- public ConnectionFactoryConfigurationImpl(final String name,
- final String localBindAddress,
- final String discoveryAddress,
- final int discoveryPort,
- final String... bindings)
- {
- this(name, bindings);
- this.localBindAddress = localBindAddress;
- this.discoveryAddress = discoveryAddress;
- this.discoveryPort = discoveryPort;
- }
public ConnectionFactoryConfigurationImpl(final String name,
- final TransportConfiguration liveConfig,
+ final boolean ha,
+ final List<TransportConfiguration>
transportConfigs,
final String... bindings)
{
- this(name, liveConfig, null, bindings);
+ this(name, ha, bindings);
+ connectorConfigs = transportConfigs;
}
- public ConnectionFactoryConfigurationImpl(final String name,
- final TransportConfiguration liveConfig,
- final TransportConfiguration backupConfig,
- final String... bindings)
+ private ConnectionFactoryConfigurationImpl(final String name, final boolean ha, final
String... bindings)
{
- this(name, bindings);
- connectorConfigs = new ArrayList<Pair<TransportConfiguration,
TransportConfiguration>>();
- connectorConfigs.add(new Pair<TransportConfiguration,
TransportConfiguration>(liveConfig, backupConfig));
- }
-
- public ConnectionFactoryConfigurationImpl(final String name,
- final
List<Pair<TransportConfiguration, TransportConfiguration>> transportConfigs,
- final String... bindings)
- {
- this(name, bindings);
- connectorConfigs = new ArrayList<Pair<TransportConfiguration,
TransportConfiguration>>();
- connectorConfigs.addAll(transportConfigs);
- }
-
- public ConnectionFactoryConfigurationImpl(final String name, final String...
bindings)
- {
this.name = name;
+ this.ha = ha;
this.bindings = new String[bindings.length];
System.arraycopy(bindings, 0, this.bindings, 0, bindings.length);
}
@@ -195,7 +165,7 @@
{
return name;
}
-
+
public String getLocalBindAddress()
{
return localBindAddress;
@@ -225,17 +195,27 @@
{
this.discoveryPort = discoveryPort;
}
-
- public List<Pair<TransportConfiguration, TransportConfiguration>>
getConnectorConfigs()
+
+ public List<TransportConfiguration> getConnectorConfigs()
{
- return connectorConfigs;
+ return this.connectorConfigs;
}
- public void setConnectorConfigs(final List<Pair<TransportConfiguration,
TransportConfiguration>> connectorConfigs)
+ public void setConnectorConfigs(final List<TransportConfiguration>
connectorConfigs)
{
this.connectorConfigs = connectorConfigs;
}
+
+ public boolean isHA()
+ {
+ return ha;
+ }
+ public void setHA(boolean ha)
+ {
+ this.ha = ha;
+ }
+
public String getClientID()
{
return clientID;
@@ -515,7 +495,7 @@
{
this.reconnectAttempts = reconnectAttempts;
}
-
+
public boolean isFailoverOnInitialConnection()
{
return failoverOnInitialConnection;
@@ -546,70 +526,37 @@
this.groupID = groupID;
}
- /* (non-Javadoc)
- * @see
org.hornetq.jms.server.config.ConnectionFactoryConfiguration#getConnectorNames()
- */
- public List<Pair<String, String>> getConnectorNames()
- {
- return connectorNames;
- }
+ // Encoding Support Implementation
--------------------------------------------------------------
/* (non-Javadoc)
- * @see
org.hornetq.jms.server.config.ConnectionFactoryConfiguration#setConnectorNames(java.util.List)
- */
- public void setConnectorNames(List<Pair<String, String>> connectors)
- {
- this.connectorNames = connectors;
- }
-
- /* (non-Javadoc)
- * @see
org.hornetq.jms.server.config.ConnectionFactoryConfiguration#getDiscoveryGroupName()
- */
- public String getDiscoveryGroupName()
- {
- return discoveryGroupName;
- }
-
- /* (non-Javadoc)
- * @see
org.hornetq.jms.server.config.ConnectionFactoryConfiguration#setDiscoveryGroupName(java.lang.String)
- */
- public void setDiscoveryGroupName(String groupName)
- {
- this.discoveryGroupName = groupName;
- }
-
- // Encoding Support Implementation
--------------------------------------------------------------
-
- /* (non-Javadoc)
* @see
org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.api.core.HornetQBuffer)
*/
public void decode(HornetQBuffer buffer)
{
name = buffer.readSimpleString().toString();
- discoveryGroupName = BufferHelper.readNullableSimpleStringAsString(buffer);
-
localBindAddress = BufferHelper.readNullableSimpleStringAsString(buffer);
-
+
discoveryAddress = BufferHelper.readNullableSimpleStringAsString(buffer);
discoveryPort = buffer.readInt();
int nConnectors = buffer.readInt();
- connectorNames = new ArrayList<Pair<String, String>>(nConnectors);
-
- for (int i = 0; i < nConnectors; i++)
+ if (nConnectors > 0)
{
- String a = BufferHelper.readNullableSimpleStringAsString(buffer);
-
- String b = BufferHelper.readNullableSimpleStringAsString(buffer);
-
- connectorNames.add(new Pair<String, String>(a, b));
- }
+ connectorConfigs = new ArrayList<TransportConfiguration>(nConnectors);
- connectorConfigs = TransportConfigurationEncodingSupport.decodeConfigs(buffer);
+ for (int i = 0; i < nConnectors; i++)
+ {
+ TransportConfiguration tc = new TransportConfiguration();
+ tc.decode(buffer);
+
+ connectorConfigs.add(tc);
+ }
+ }
+
clientID = BufferHelper.readNullableSimpleStringAsString(buffer);
discoveryRefreshTimeout = buffer.readLong();
@@ -665,7 +612,7 @@
maxRetryInterval = buffer.readLong();
reconnectAttempts = buffer.readInt();
-
+
failoverOnInitialConnection = buffer.readBoolean();
failoverOnServerShutdown = buffer.readBoolean();
@@ -680,27 +627,26 @@
{
BufferHelper.writeAsSimpleString(buffer, name);
- BufferHelper.writeAsNullableSimpleString(buffer, discoveryGroupName);
-
BufferHelper.writeAsNullableSimpleString(buffer, localBindAddress);
-
+
BufferHelper.writeAsNullableSimpleString(buffer, discoveryAddress);
buffer.writeInt(discoveryPort);
- buffer.writeInt(connectorNames == null ? 0 : connectorNames.size());
-
- if (connectorNames != null)
+ if (connectorConfigs == null)
{
- for (Pair<String, String> namePair : connectorNames)
+ buffer.writeInt(0);
+ }
+ else
+ {
+ buffer.writeInt(connectorConfigs.size());
+
+ for (TransportConfiguration tc : connectorConfigs)
{
- BufferHelper.writeAsNullableSimpleString(buffer, namePair.a);
- BufferHelper.writeAsNullableSimpleString(buffer, namePair.b);
+ tc.encode(buffer);
}
}
- TransportConfigurationEncodingSupport.encodeConfigs(buffer, connectorConfigs);
-
BufferHelper.writeAsNullableSimpleString(buffer, clientID);
buffer.writeLong(discoveryRefreshTimeout);
@@ -756,7 +702,7 @@
buffer.writeLong(maxRetryInterval);
buffer.writeInt(reconnectAttempts);
-
+
buffer.writeBoolean(failoverOnInitialConnection);
buffer.writeBoolean(failoverOnServerShutdown);
@@ -764,105 +710,119 @@
BufferHelper.writeAsNullableSimpleString(buffer, groupID);
}
-
- private int sizeOfConnectors()
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
{
- int size = DataConstants.SIZE_INT; // for the number of connectors persisted
+ int size = BufferHelper.sizeOfSimpleString(name) +
- if (connectorNames != null)
+ BufferHelper.sizeOfNullableSimpleString(localBindAddress) +
+
+ BufferHelper.sizeOfNullableSimpleString(discoveryAddress) +
+
+ DataConstants.SIZE_INT; // discoveryPort
+
+ if (this.connectorConfigs != null)
{
- for (Pair<String, String> pair : connectorNames)
+ for (TransportConfiguration tc : this.connectorConfigs)
{
- size += BufferHelper.sizeOfNullableSimpleString(pair.a);
- size += BufferHelper.sizeOfNullableSimpleString(pair.b);
+ size += TransportConfigurationEncodingSupport.getEncodeSize(tc);
}
}
+ size += BufferHelper.sizeOfNullableSimpleString(clientID) +
- return size;
- }
+ DataConstants.SIZE_LONG +
+ // discoveryRefreshTimeout
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
- */
- public int getEncodeSize()
- {
- return BufferHelper.sizeOfSimpleString(name) +
-
- BufferHelper.sizeOfNullableSimpleString(discoveryGroupName) +
-
- BufferHelper.sizeOfNullableSimpleString(localBindAddress)+
-
- BufferHelper.sizeOfNullableSimpleString(discoveryAddress)+
-
- DataConstants.SIZE_INT + // discoveryPort
-
- sizeOfConnectors() +
-
- TransportConfigurationEncodingSupport.getEncodeSize(connectorConfigs) +
+ DataConstants.SIZE_LONG +
+ // clientFailureCheckPeriod
- BufferHelper.sizeOfNullableSimpleString(clientID) +
-
- DataConstants.SIZE_LONG + // discoveryRefreshTimeout
-
- DataConstants.SIZE_LONG + // clientFailureCheckPeriod
+ DataConstants.SIZE_LONG +
+ // connectionTTL
- DataConstants.SIZE_LONG + // connectionTTL
+ DataConstants.SIZE_LONG +
+ // callTimeout
- DataConstants.SIZE_LONG + // callTimeout
+ DataConstants.SIZE_BOOLEAN +
+ // cacheLargeMessagesClient
- DataConstants.SIZE_BOOLEAN + // cacheLargeMessagesClient
-
- DataConstants.SIZE_INT + // minLargeMessageSize
+ DataConstants.SIZE_INT +
+ // minLargeMessageSize
- DataConstants.SIZE_INT + // consumerWindowSize
+ DataConstants.SIZE_INT +
+ // consumerWindowSize
- DataConstants.SIZE_INT + // consumerMaxRate
+ DataConstants.SIZE_INT +
+ // consumerMaxRate
- DataConstants.SIZE_INT + // confirmationWindowSize
+ DataConstants.SIZE_INT +
+ // confirmationWindowSize
- DataConstants.SIZE_INT + // producerWindowSize
+ DataConstants.SIZE_INT +
+ // producerWindowSize
- DataConstants.SIZE_INT + // producerMaxRate
+ DataConstants.SIZE_INT +
+ // producerMaxRate
- DataConstants.SIZE_BOOLEAN + // blockOnAcknowledge
+ DataConstants.SIZE_BOOLEAN +
+ // blockOnAcknowledge
- DataConstants.SIZE_BOOLEAN + // blockOnDurableSend
+ DataConstants.SIZE_BOOLEAN +
+ // blockOnDurableSend
- DataConstants.SIZE_BOOLEAN + // blockOnNonDurableSend
+ DataConstants.SIZE_BOOLEAN +
+ // blockOnNonDurableSend
- DataConstants.SIZE_BOOLEAN + // autoGroup
+ DataConstants.SIZE_BOOLEAN +
+ // autoGroup
- DataConstants.SIZE_BOOLEAN + // preAcknowledge
+ DataConstants.SIZE_BOOLEAN +
+ // preAcknowledge
- BufferHelper.sizeOfSimpleString(loadBalancingPolicyClassName) +
+ BufferHelper.sizeOfSimpleString(loadBalancingPolicyClassName) +
- DataConstants.SIZE_INT + // transactionBatchSize
+ DataConstants.SIZE_INT +
+ // transactionBatchSize
- DataConstants.SIZE_INT + // dupsOKBatchSize
+ DataConstants.SIZE_INT +
+ // dupsOKBatchSize
- DataConstants.SIZE_LONG + // initialWaitTimeout
+ DataConstants.SIZE_LONG +
+ // initialWaitTimeout
- DataConstants.SIZE_BOOLEAN + // useGlobalPools
+ DataConstants.SIZE_BOOLEAN +
+ // useGlobalPools
- DataConstants.SIZE_INT + // scheduledThreadPoolMaxSize
+ DataConstants.SIZE_INT +
+ // scheduledThreadPoolMaxSize
- DataConstants.SIZE_INT + // threadPoolMaxSize
+ DataConstants.SIZE_INT +
+ // threadPoolMaxSize
- DataConstants.SIZE_LONG + // retryInterval
+ DataConstants.SIZE_LONG +
+ // retryInterval
- DataConstants.SIZE_DOUBLE + // retryIntervalMultiplier
+ DataConstants.SIZE_DOUBLE +
+ // retryIntervalMultiplier
- DataConstants.SIZE_LONG + // maxRetryInterval
+ DataConstants.SIZE_LONG +
+ // maxRetryInterval
- DataConstants.SIZE_INT + // reconnectAttempts
-
- DataConstants.SIZE_BOOLEAN + // failoverOnInitialConnection
+ DataConstants.SIZE_INT +
+ // reconnectAttempts
- DataConstants.SIZE_BOOLEAN + // failoverOnServerShutdown
-
- BufferHelper.sizeOfNullableSimpleString(groupID);
+ DataConstants.SIZE_BOOLEAN +
+ // failoverOnInitialConnection
+
+ DataConstants.SIZE_BOOLEAN +
+ // failoverOnServerShutdown
+
+ BufferHelper.sizeOfNullableSimpleString(groupID);
+
+ return size;
}
-
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java 2010-06-30
17:22:16 UTC (rev 9376)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java 2010-07-01
16:14:07 UTC (rev 9377)
@@ -20,8 +20,10 @@
import java.util.List;
import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.DiscoveryGroupConfiguration;
import org.hornetq.core.config.impl.Validators;
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.server.JMSServerConfigParser;
@@ -57,12 +59,19 @@
// Attributes ----------------------------------------------------
+ private final Configuration config;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
+ public JMSServerConfigParserImpl(final Configuration config)
+ {
+ this.config = config;
+ }
+
/**
* Parse the JMS Configuration XML as a JMSConfiguration object
*/
@@ -188,6 +197,32 @@
return newQueue(queueName, selectorString, durable, jndiArray);
}
+ // private void lookupDiscovery(final String discoveryGroupName, final
ConnectionFactoryConfiguration cfConfig)
+ // throws HornetQException
+ // {
+ // Configuration configuration = server.getConfiguration();
+ //
+ // DiscoveryGroupConfiguration discoveryGroupConfiguration =
configuration.getDiscoveryGroupConfigurations()
+ // .get(cfConfig.getDiscoveryGroupName());
+ //
+ // if (discoveryGroupConfiguration == null)
+ // {
+ // JMSServerManagerImpl.log.warn("There is no discovery group with name
'" + cfConfig.getDiscoveryGroupName() +
+ // "' deployed.");
+ //
+ // throw new HornetQException(HornetQException.ILLEGAL_STATE,
+ // "There is no discovery group with name '" +
cfConfig.getDiscoveryGroupName() +
+ // "' deployed.");
+ // }
+ //
+ // cfConfig.setLocalBindAddress(discoveryGroupConfiguration.getLocalBindAddress());
+ // cfConfig.setDiscoveryAddress(discoveryGroupConfiguration.getGroupAddress());
+ // cfConfig.setDiscoveryPort(discoveryGroupConfiguration.getGroupPort());
+ //
cfConfig.setDiscoveryRefreshTimeout(discoveryGroupConfiguration.getRefreshTimeout());
+ //
+ //
+ // }
+
/**
* Parse the Connection Configuration node as a ConnectionFactoryConfiguration object
* @param node
@@ -310,9 +345,11 @@
HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT,
Validators.GT_ZERO);
+ boolean ha = XMLConfigurationUtil.getBoolean(e, "ha",
HornetQClient.DEFAULT_HA);
+
String groupid = XMLConfigurationUtil.getString(e, "group-id", null,
Validators.NO_CHECK);
List<String> jndiBindings = new ArrayList<String>();
- List<Pair<String, String>> connectorNames = new
ArrayList<Pair<String, String>>();
+ List<String> connectorNames = new ArrayList<String>();
String discoveryGroupName = null;
NodeList children = node.getChildNodes();
@@ -345,16 +382,7 @@
{
String connectorName =
entry.getAttributes().getNamedItem("connector-name").getNodeValue();
- String backupConnectorName = null;
-
- Node backupNode =
entry.getAttributes().getNamedItem("backup-connector-name");
- if (backupNode != null)
- {
- backupConnectorName = backupNode.getNodeValue();
- }
-
- connectorNames.add(new Pair<String, String>(connectorName,
backupConnectorName));
-
+ connectorNames.add(connectorName);
}
}
}
@@ -371,14 +399,43 @@
if (discoveryGroupName != null)
{
- cfConfig = new ConnectionFactoryConfigurationImpl(name, strbindings);
+ DiscoveryGroupConfiguration discoveryGroupConfiguration =
config.getDiscoveryGroupConfigurations()
+
.get(discoveryGroupName);
+
+ if (discoveryGroupConfiguration == null)
+ {
+ log.warn("There is no discovery group with name '" +
discoveryGroupName + "' deployed.");
+
+ throw new IllegalArgumentException("There is no discovery group with
name '" + discoveryGroupName +
+ "' deployed.");
+ }
+
+ cfConfig = new ConnectionFactoryConfigurationImpl(name,
+ ha,
+
discoveryGroupConfiguration.getGroupAddress(),
+
discoveryGroupConfiguration.getGroupPort(),
+ strbindings);
+
cfConfig.setLocalBindAddress(discoveryGroupConfiguration.getLocalBindAddress());
cfConfig.setInitialWaitTimeout(discoveryInitialWaitTimeout);
- cfConfig.setDiscoveryGroupName(discoveryGroupName);
}
else
{
- cfConfig = new ConnectionFactoryConfigurationImpl(name, strbindings);
- cfConfig.setConnectorNames(connectorNames);
+ List<TransportConfiguration> tcList = new
ArrayList<TransportConfiguration>();
+
+ for (String connectorName : connectorNames)
+ {
+ TransportConfiguration tc =
config.getConnectorConfigurations().get(connectorName);
+
+ if (tc == null)
+ {
+ log.warn("There is no connector with name '" + connectorName
+ "' deployed.");
+
+ throw new IllegalArgumentException("There is no connector with name
'" + connectorName + "' deployed.");
+ }
+
+ tcList.add(tc);
+ }
+ cfConfig = new ConnectionFactoryConfigurationImpl(name, ha, tcList,
strbindings);
}
cfConfig.setClientID(clientID);
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java 2010-06-30
17:22:16 UTC (rev 9376)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java 2010-07-01
16:14:07 UTC (rev 9377)
@@ -44,7 +44,7 @@
protected static final String ENTRY_NODE_NAME = "entry";
- protected static final String CONNECTORS_NODE_NAME = "connectors";
+ protected static final String CONNECTORS_NODE_NAME = "static-connectors";
protected static final String CONNECTION_FACTORY_NODE_NAME =
"connection-factory";
@@ -63,7 +63,7 @@
this.jmsServerManager = jmsServerManager;
- parser = new JMSServerConfigParserImpl();
+ parser = new
JMSServerConfigParserImpl(jmsServerManager.getHornetQServer().getConfiguration());
}
/**
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-06-30
17:22:16 UTC (rev 9376)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-07-01
16:14:07 UTC (rev 9377)
@@ -27,14 +27,12 @@
import javax.naming.NamingException;
import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.AddressControl;
import org.hornetq.api.core.management.ResourceNames;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.DiscoveryGroupConfiguration;
import org.hornetq.core.deployers.DeploymentManager;
import org.hornetq.core.deployers.impl.FileDeploymentManager;
import org.hornetq.core.deployers.impl.XmlDeployer;
@@ -170,9 +168,9 @@
public void preActivate()
{
-
+
}
-
+
public synchronized void activated()
{
active = true;
@@ -676,36 +674,25 @@
}
public synchronized void createConnectionFactory(final String name,
- final
List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
+ final boolean ha,
+ final
List<TransportConfiguration> connectorConfigs,
String... jndiBindings) throws
Exception
{
checkInitialised();
HornetQConnectionFactory cf = connectionFactories.get(name);
if (cf == null)
{
- ConnectionFactoryConfiguration configuration = new
ConnectionFactoryConfigurationImpl(name, connectorConfigs);
+ ConnectionFactoryConfiguration configuration = new
ConnectionFactoryConfigurationImpl(name,
+
ha,
+
connectorConfigs);
createConnectionFactory(true, configuration, jndiBindings);
}
}
public synchronized void createConnectionFactory(final String name,
+ final boolean ha,
+ final
List<TransportConfiguration> connectorConfigs,
final String clientID,
- final
List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
- String... jndiBindings) throws
Exception
- {
- checkInitialised();
- HornetQConnectionFactory cf = connectionFactories.get(name);
- if (cf == null)
- {
- ConnectionFactoryConfiguration configuration = new
ConnectionFactoryConfigurationImpl(name, connectorConfigs);
- configuration.setClientID(clientID);
- createConnectionFactory(true, configuration, jndiBindings);
- }
- }
-
- public synchronized void createConnectionFactory(final String name,
- final
List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
- final String clientID,
final long clientFailureCheckPeriod,
final long connectionTTL,
final long callTimeout,
@@ -740,7 +727,9 @@
HornetQConnectionFactory cf = connectionFactories.get(name);
if (cf == null)
{
- ConnectionFactoryConfiguration configuration = new
ConnectionFactoryConfigurationImpl(name, connectorConfigs);
+ ConnectionFactoryConfiguration configuration = new
ConnectionFactoryConfigurationImpl(name,
+
ha,
+
connectorConfigs);
configuration.setClientID(clientID);
configuration.setClientFailureCheckPeriod(clientFailureCheckPeriod);
configuration.setConnectionTTL(connectionTTL);
@@ -775,6 +764,7 @@
}
public synchronized void createConnectionFactory(final String name,
+ final boolean ha,
final String localBindAddress,
final String discoveryAddress,
final int discoveryPort,
@@ -798,7 +788,7 @@
final boolean preAcknowledge,
final String
loadBalancingPolicyClassName,
final int transactionBatchSize,
- final int dupsOKBatchSize,
+ final int dupsOKBatchSize,
final boolean useGlobalPools,
final int
scheduledThreadPoolMaxSize,
final int threadPoolMaxSize,
@@ -810,15 +800,16 @@
final boolean
failoverOnServerShutdown,
final String groupId,
final String... jndiBindings) throws
Exception
- {
+ {
checkInitialised();
HornetQConnectionFactory cf = connectionFactories.get(name);
if (cf == null)
{
ConnectionFactoryConfiguration configuration = new
ConnectionFactoryConfigurationImpl(name,
-
localBindAddress,
+
ha,
discoveryAddress,
discoveryPort);
+ configuration.setLocalBindAddress(localBindAddress);
configuration.setClientID(clientID);
configuration.setDiscoveryRefreshTimeout(discoveryRefreshTimeout);
configuration.setInitialWaitTimeout(discoveryInitialWaitTimeout);
@@ -854,6 +845,7 @@
}
public synchronized void createConnectionFactory(final String name,
+ final boolean ha,
final String discoveryAddress,
final int discoveryPort,
final String... jndiBindings) throws
Exception
@@ -863,31 +855,13 @@
if (cf == null)
{
ConnectionFactoryConfiguration configuration = new
ConnectionFactoryConfigurationImpl(name,
+
ha,
discoveryAddress,
discoveryPort);
createConnectionFactory(true, configuration, jndiBindings);
}
}
- public synchronized void createConnectionFactory(final String name,
- final String clientID,
- final String discoveryAddress,
- final int discoveryPort,
- final String... jndiBindings) throws
Exception
- {
- checkInitialised();
-
- HornetQConnectionFactory cf = connectionFactories.get(name);
- if (cf == null)
- {
- ConnectionFactoryConfiguration configuration = new
ConnectionFactoryConfigurationImpl(name,
-
discoveryAddress,
-
discoveryPort);
- configuration.setClientID(clientID);
- createConnectionFactory(true, configuration, jndiBindings);
- }
- }
-
public synchronized void createConnectionFactory(final boolean storeConfig,
final ConnectionFactoryConfiguration
cfConfig,
String... jndi) throws Exception
@@ -914,153 +888,6 @@
}
}
- private HornetQConnectionFactory internalCreateConnectionFactory(final String name,
- final String
localBindAddress,
- final String
discoveryAddress,
- final int
discoveryPort,
- final String
clientID,
- final long
discoveryRefreshTimeout,
- final long
discoveryInitialWaitTimeout,
- final long
clientFailureCheckPeriod,
- final long
connectionTTL,
- final long
callTimeout,
- final boolean
cacheLargeMessagesClient,
- final int
minLargeMessageSize,
- final int
consumerWindowSize,
- final int
consumerMaxRate,
- final int
confirmationWindowSize,
- final int
producerWindowSize,
- final int
producerMaxRate,
- final boolean
blockOnAcknowledge,
- final boolean
blockOnDurableSend,
- final boolean
blockOnNonDurableSend,
- final boolean
autoGroup,
- final boolean
preAcknowledge,
- final String
loadBalancingPolicyClassName,
- final int
transactionBatchSize,
- final int
dupsOKBatchSize,
- final boolean
useGlobalPools,
- final int
scheduledThreadPoolMaxSize,
- final int
threadPoolMaxSize,
- final long
retryInterval,
- final double
retryIntervalMultiplier,
- final long
maxRetryInterval,
- final int
reconnectAttempts,
- final boolean
failoverOnInitialConnection,
- final boolean
failoverOnServerShutdown,
- final String groupId)
throws Exception
- {
- checkInitialised();
- HornetQConnectionFactory cf = connectionFactories.get(name);
- if (cf == null)
- {
- cf =
(HornetQConnectionFactory)HornetQJMSClient.createConnectionFactory(discoveryAddress,
discoveryPort);
- cf.setClientID(clientID);
- cf.setLocalBindAddress(localBindAddress);
- cf.setDiscoveryRefreshTimeout(discoveryRefreshTimeout);
- cf.setDiscoveryInitialWaitTimeout(discoveryInitialWaitTimeout);
- cf.setClientFailureCheckPeriod(clientFailureCheckPeriod);
- cf.setConnectionTTL(connectionTTL);
- cf.setCallTimeout(callTimeout);
- cf.setCacheLargeMessagesClient(cacheLargeMessagesClient);
- cf.setMinLargeMessageSize(minLargeMessageSize);
- cf.setConsumerWindowSize(consumerWindowSize);
- cf.setConsumerMaxRate(consumerMaxRate);
- cf.setConfirmationWindowSize(confirmationWindowSize);
- cf.setProducerWindowSize(producerWindowSize);
- cf.setProducerMaxRate(producerMaxRate);
- cf.setBlockOnAcknowledge(blockOnAcknowledge);
- cf.setBlockOnDurableSend(blockOnDurableSend);
- cf.setBlockOnNonDurableSend(blockOnNonDurableSend);
- cf.setAutoGroup(autoGroup);
- cf.setPreAcknowledge(preAcknowledge);
- cf.setConnectionLoadBalancingPolicyClassName(loadBalancingPolicyClassName);
- cf.setTransactionBatchSize(transactionBatchSize);
- cf.setDupsOKBatchSize(dupsOKBatchSize);
- cf.setUseGlobalPools(useGlobalPools);
- cf.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize);
- cf.setThreadPoolMaxSize(threadPoolMaxSize);
- cf.setRetryInterval(retryInterval);
- cf.setRetryIntervalMultiplier(retryIntervalMultiplier);
- cf.setMaxRetryInterval(maxRetryInterval);
- cf.setReconnectAttempts(reconnectAttempts);
- cf.setFailoverOnInitialConnection(failoverOnInitialConnection);
- cf.setFailoverOnServerShutdown(failoverOnServerShutdown);
- }
-
- return cf;
- }
-
- private HornetQConnectionFactory internalCreateConnectionFactory(final String name,
- final
List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
- final String
clientID,
- final long
clientFailureCheckPeriod,
- final long
connectionTTL,
- final long
callTimeout,
- final boolean
cacheLargeMessagesClient,
- final int
minLargeMessageSize,
- final int
consumerWindowSize,
- final int
consumerMaxRate,
- final int
confirmationWindowSize,
- final int
producerWindowSize,
- final int
producerMaxRate,
- final boolean
blockOnAcknowledge,
- final boolean
blockOnDurableSend,
- final boolean
blockOnNonDurableSend,
- final boolean
autoGroup,
- final boolean
preAcknowledge,
- final String
loadBalancingPolicyClassName,
- final int
transactionBatchSize,
- final int
dupsOKBatchSize,
- final boolean
useGlobalPools,
- final int
scheduledThreadPoolMaxSize,
- final int
threadPoolMaxSize,
- final long
retryInterval,
- final double
retryIntervalMultiplier,
- final long
maxRetryInterval,
- final int
reconnectAttempts,
- final boolean
failoverOnInitialConnection,
- final boolean
failoverOnServerShutdown,
- final String groupId)
throws Exception
- {
- checkInitialised();
- HornetQConnectionFactory cf = connectionFactories.get(name);
- if (cf == null)
- {
- cf =
(HornetQConnectionFactory)HornetQJMSClient.createConnectionFactory(connectorConfigs);
- cf.setClientID(clientID);
- cf.setClientFailureCheckPeriod(clientFailureCheckPeriod);
- cf.setConnectionTTL(connectionTTL);
- cf.setCallTimeout(callTimeout);
- cf.setCacheLargeMessagesClient(cacheLargeMessagesClient);
- cf.setMinLargeMessageSize(minLargeMessageSize);
- cf.setConsumerWindowSize(consumerWindowSize);
- cf.setConsumerMaxRate(consumerMaxRate);
- cf.setConfirmationWindowSize(confirmationWindowSize);
- cf.setProducerWindowSize(producerWindowSize);
- cf.setProducerMaxRate(producerMaxRate);
- cf.setBlockOnAcknowledge(blockOnAcknowledge);
- cf.setBlockOnDurableSend(blockOnDurableSend);
- cf.setBlockOnNonDurableSend(blockOnNonDurableSend);
- cf.setAutoGroup(autoGroup);
- cf.setPreAcknowledge(preAcknowledge);
- cf.setConnectionLoadBalancingPolicyClassName(loadBalancingPolicyClassName);
- cf.setTransactionBatchSize(transactionBatchSize);
- cf.setDupsOKBatchSize(dupsOKBatchSize);
- cf.setUseGlobalPools(useGlobalPools);
- cf.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize);
- cf.setThreadPoolMaxSize(threadPoolMaxSize);
- cf.setRetryInterval(retryInterval);
- cf.setRetryIntervalMultiplier(retryIntervalMultiplier);
- cf.setMaxRetryInterval(maxRetryInterval);
- cf.setReconnectAttempts(reconnectAttempts);
- cf.setFailoverOnInitialConnection(failoverOnInitialConnection);
- cf.setFailoverOnServerShutdown(failoverOnServerShutdown);
- cf.setGroupID(groupId);
- }
- return cf;
- }
-
private String[] getJNDIList(final Map<String, List<String>> map, final
String name)
{
List<String> result = map.get(name);
@@ -1151,82 +978,73 @@
private HornetQConnectionFactory internalCreateCF(final ConnectionFactoryConfiguration
cfConfig) throws HornetQException,
Exception
{
- List<Pair<TransportConfiguration, TransportConfiguration>>
connectorConfigs = lookupConnectors(cfConfig);
+ checkInitialised();
- lookupDiscovery(cfConfig);
- HornetQConnectionFactory cf;
- if (cfConfig.getDiscoveryAddress() != null)
- {
- cf = internalCreateConnectionFactory(cfConfig.getName(),
- cfConfig.getLocalBindAddress(),
- cfConfig.getDiscoveryAddress(),
- cfConfig.getDiscoveryPort(),
- cfConfig.getClientID(),
- cfConfig.getDiscoveryRefreshTimeout(),
- cfConfig.getInitialWaitTimeout(),
- cfConfig.getClientFailureCheckPeriod(),
- cfConfig.getConnectionTTL(),
- cfConfig.getCallTimeout(),
- cfConfig.isCacheLargeMessagesClient(),
- cfConfig.getMinLargeMessageSize(),
- cfConfig.getConsumerWindowSize(),
- cfConfig.getConsumerMaxRate(),
- cfConfig.getConfirmationWindowSize(),
- cfConfig.getProducerWindowSize(),
- cfConfig.getProducerMaxRate(),
- cfConfig.isBlockOnAcknowledge(),
- cfConfig.isBlockOnDurableSend(),
- cfConfig.isBlockOnNonDurableSend(),
- cfConfig.isAutoGroup(),
- cfConfig.isPreAcknowledge(),
-
cfConfig.getLoadBalancingPolicyClassName(),
- cfConfig.getTransactionBatchSize(),
- cfConfig.getDupsOKBatchSize(),
- cfConfig.isUseGlobalPools(),
- cfConfig.getScheduledThreadPoolMaxSize(),
- cfConfig.getThreadPoolMaxSize(),
- cfConfig.getRetryInterval(),
- cfConfig.getRetryIntervalMultiplier(),
- cfConfig.getMaxRetryInterval(),
- cfConfig.getReconnectAttempts(),
- cfConfig.isFailoverOnInitialConnection(),
- cfConfig.isFailoverOnServerShutdown(),
- cfConfig.getGroupID());
- }
- else
+ HornetQConnectionFactory cf = connectionFactories.get(cfConfig.getName());
+
+ if (cf == null)
{
- cf = internalCreateConnectionFactory(cfConfig.getName(),
- connectorConfigs,
- cfConfig.getClientID(),
- cfConfig.getClientFailureCheckPeriod(),
- cfConfig.getConnectionTTL(),
- cfConfig.getCallTimeout(),
- cfConfig.isCacheLargeMessagesClient(),
- cfConfig.getMinLargeMessageSize(),
- cfConfig.getConsumerWindowSize(),
- cfConfig.getConsumerMaxRate(),
- cfConfig.getConfirmationWindowSize(),
- cfConfig.getProducerWindowSize(),
- cfConfig.getProducerMaxRate(),
- cfConfig.isBlockOnAcknowledge(),
- cfConfig.isBlockOnDurableSend(),
- cfConfig.isBlockOnNonDurableSend(),
- cfConfig.isAutoGroup(),
- cfConfig.isPreAcknowledge(),
-
cfConfig.getLoadBalancingPolicyClassName(),
- cfConfig.getTransactionBatchSize(),
- cfConfig.getDupsOKBatchSize(),
- cfConfig.isUseGlobalPools(),
- cfConfig.getScheduledThreadPoolMaxSize(),
- cfConfig.getThreadPoolMaxSize(),
- cfConfig.getRetryInterval(),
- cfConfig.getRetryIntervalMultiplier(),
- cfConfig.getMaxRetryInterval(),
- cfConfig.getReconnectAttempts(),
- cfConfig.isFailoverOnInitialConnection(),
- cfConfig.isFailoverOnServerShutdown(),
- cfConfig.getGroupID());
+ if (cfConfig.getDiscoveryAddress() != null)
+ {
+
+ if (cfConfig.isHA())
+ {
+ cf =
HornetQJMSClient.createConnectionFactoryWithHA(cfConfig.getDiscoveryAddress(),
+
cfConfig.getDiscoveryPort());
+ }
+ else
+ {
+ cf =
HornetQJMSClient.createConnectionFactoryWithoutHA(cfConfig.getDiscoveryAddress(),
+
cfConfig.getDiscoveryPort());
+ }
+ cf.setLocalBindAddress(cfConfig.getLocalBindAddress());
+ cf.setDiscoveryRefreshTimeout(cfConfig.getDiscoveryRefreshTimeout());
+ cf.setDiscoveryInitialWaitTimeout(cfConfig.getInitialWaitTimeout());
+ }
+ else
+ {
+ if (cfConfig.isHA())
+ {
+ cf =
HornetQJMSClient.createConnectionFactoryWithHA(cfConfig.getConnectorConfigs()
+ .toArray(new
TransportConfiguration[0]));
+ }
+ else
+ {
+ cf =
HornetQJMSClient.createConnectionFactoryWithoutHA(cfConfig.getConnectorConfigs()
+
.toArray(new TransportConfiguration[0]));
+ }
+ }
+
+ cf.setClientID(cfConfig.getClientID());
+ cf.setClientFailureCheckPeriod(cfConfig.getClientFailureCheckPeriod());
+ cf.setConnectionTTL(cfConfig.getConnectionTTL());
+ cf.setCallTimeout(cfConfig.getCallTimeout());
+ cf.setCacheLargeMessagesClient(cfConfig.isCacheLargeMessagesClient());
+ cf.setMinLargeMessageSize(cfConfig.getMinLargeMessageSize());
+ cf.setConsumerWindowSize(cfConfig.getConsumerWindowSize());
+ cf.setConsumerMaxRate(cfConfig.getConsumerMaxRate());
+ cf.setConfirmationWindowSize(cfConfig.getConfirmationWindowSize());
+ cf.setProducerWindowSize(cfConfig.getProducerWindowSize());
+ cf.setProducerMaxRate(cfConfig.getProducerMaxRate());
+ cf.setBlockOnAcknowledge(cfConfig.isBlockOnAcknowledge());
+ cf.setBlockOnDurableSend(cfConfig.isBlockOnDurableSend());
+ cf.setBlockOnNonDurableSend(cfConfig.isBlockOnNonDurableSend());
+ cf.setAutoGroup(cfConfig.isAutoGroup());
+ cf.setPreAcknowledge(cfConfig.isPreAcknowledge());
+
cf.setConnectionLoadBalancingPolicyClassName(cfConfig.getLoadBalancingPolicyClassName());
+ cf.setTransactionBatchSize(cfConfig.getTransactionBatchSize());
+ cf.setDupsOKBatchSize(cfConfig.getDupsOKBatchSize());
+ cf.setUseGlobalPools(cfConfig.isUseGlobalPools());
+ cf.setScheduledThreadPoolMaxSize(cfConfig.getScheduledThreadPoolMaxSize());
+ cf.setThreadPoolMaxSize(cfConfig.getThreadPoolMaxSize());
+ cf.setRetryInterval(cfConfig.getRetryInterval());
+ cf.setRetryIntervalMultiplier(cfConfig.getRetryIntervalMultiplier());
+ cf.setMaxRetryInterval(cfConfig.getMaxRetryInterval());
+ cf.setReconnectAttempts(cfConfig.getReconnectAttempts());
+ cf.setFailoverOnInitialConnection(cfConfig.isFailoverOnInitialConnection());
+ cf.setFailoverOnServerShutdown(cfConfig.isFailoverOnServerShutdown());
}
+
connectionFactories.put(cfConfig.getName(), cf);
jmsManagementService.registerConnectionFactory(cfConfig.getName(), cf);
@@ -1234,64 +1052,6 @@
return cf;
}
- public synchronized void createConnectionFactory(final String name,
- final TransportConfiguration liveTC,
- final String... jndiBindings) throws
Exception
- {
- checkInitialised();
- HornetQConnectionFactory cf = connectionFactories.get(name);
- if (cf == null)
- {
- ConnectionFactoryConfiguration configuration = new
ConnectionFactoryConfigurationImpl(name, liveTC);
- createConnectionFactory(true, configuration, jndiBindings);
- }
- }
-
- public synchronized void createConnectionFactory(final String name,
- final String clientID,
- final TransportConfiguration liveTC,
- final String... jndiBindings) throws
Exception
- {
- checkInitialised();
- HornetQConnectionFactory cf = connectionFactories.get(name);
- if (cf == null)
- {
- ConnectionFactoryConfiguration configuration = new
ConnectionFactoryConfigurationImpl(name, liveTC);
- configuration.setClientID(clientID);
- createConnectionFactory(true, configuration, jndiBindings);
- }
- }
-
- public synchronized void createConnectionFactory(final String name,
- final TransportConfiguration liveTC,
- final TransportConfiguration
backupTC,
- final String... jndiBindings) throws
Exception
- {
- checkInitialised();
- HornetQConnectionFactory cf = connectionFactories.get(name);
- if (cf == null)
- {
- ConnectionFactoryConfiguration configuration = new
ConnectionFactoryConfigurationImpl(name, liveTC, backupTC);
- createConnectionFactory(true, configuration, jndiBindings);
- }
- }
-
- public synchronized void createConnectionFactory(final String name,
- final String clientID,
- final TransportConfiguration liveTC,
- final TransportConfiguration
backupTC,
- final String... jndiBindings) throws
Exception
- {
- checkInitialised();
- HornetQConnectionFactory cf = connectionFactories.get(name);
- if (cf == null)
- {
- ConnectionFactoryConfiguration configuration = new
ConnectionFactoryConfigurationImpl(name, liveTC, backupTC);
- configuration.setClientID(clientID);
- createConnectionFactory(true, configuration, jndiBindings);
- }
- }
-
public synchronized boolean destroyConnectionFactory(final String name) throws
Exception
{
checkInitialised();
@@ -1412,38 +1172,6 @@
return true;
}
- /**
- * @param cfConfig
- * @throws HornetQException
- */
- private void lookupDiscovery(final ConnectionFactoryConfiguration cfConfig) throws
HornetQException
- {
- if (cfConfig.getDiscoveryGroupName() != null)
- {
- Configuration configuration = server.getConfiguration();
-
- DiscoveryGroupConfiguration discoveryGroupConfiguration = null;
- discoveryGroupConfiguration = configuration.getDiscoveryGroupConfigurations()
-
.get(cfConfig.getDiscoveryGroupName());
-
- if (discoveryGroupConfiguration == null)
- {
- JMSServerManagerImpl.log.warn("There is no discovery group with name
'" + cfConfig.getDiscoveryGroupName() +
- "' deployed.");
-
- throw new HornetQException(HornetQException.ILLEGAL_STATE,
- "There is no discovery group with name
'" + cfConfig.getDiscoveryGroupName() +
- "' deployed.");
- }
-
-
cfConfig.setLocalBindAddress(discoveryGroupConfiguration.getLocalBindAddress());
- cfConfig.setDiscoveryAddress(discoveryGroupConfiguration.getGroupAddress());
- cfConfig.setDiscoveryPort(discoveryGroupConfiguration.getGroupPort());
-
cfConfig.setDiscoveryRefreshTimeout(discoveryGroupConfiguration.getRefreshTimeout());
-
- }
- }
-
private void deploy() throws Exception
{
if (config == null)
@@ -1656,60 +1384,4 @@
}
}
- /**
- * @param cfConfig
- * @return
- * @throws HornetQException
- */
- private List<Pair<TransportConfiguration, TransportConfiguration>>
lookupConnectors(final ConnectionFactoryConfiguration cfConfig) throws HornetQException
- {
- if (cfConfig.getConnectorConfigs() != null &&
cfConfig.getConnectorConfigs().size() > 0)
- {
- return cfConfig.getConnectorConfigs();
- }
- else if (cfConfig.getConnectorNames() != null)
- {
- Configuration configuration = server.getConfiguration();
- List<Pair<TransportConfiguration, TransportConfiguration>>
connectorConfigs = new ArrayList<Pair<TransportConfiguration,
TransportConfiguration>>();
-
- for (Pair<String, String> configConnector : cfConfig.getConnectorNames())
- {
- String connectorName = configConnector.a;
- String backupConnectorName = configConnector.b;
-
- TransportConfiguration connector =
configuration.getConnectorConfigurations().get(connectorName);
-
- if (connector == null)
- {
- JMSServerManagerImpl.log.warn("There is no connector with name
'" + connectorName + "' deployed.");
- throw new HornetQException(HornetQException.ILLEGAL_STATE,
- "There is no connector with name
'" + connectorName + "' deployed.");
- }
-
- TransportConfiguration backupConnector = null;
-
- if (backupConnectorName != null)
- {
- backupConnector =
configuration.getConnectorConfigurations().get(backupConnectorName);
-
- if (backupConnector == null)
- {
- JMSServerManagerImpl.log.warn("There is no backup connector with
name '" + backupConnectorName +
- "' deployed.");
- throw new HornetQException(HornetQException.ILLEGAL_STATE,
- "There is no backup connector with name
'" + backupConnectorName +
- "' deployed.");
- }
- }
-
- connectorConfigs.add(new Pair<TransportConfiguration,
TransportConfiguration>(connector, backupConnector));
- }
- return connectorConfigs;
-
- }
- else
- {
- return null;
- }
- }
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java 2010-06-30
17:22:16 UTC (rev 9376)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java 2010-07-01
16:14:07 UTC (rev 9377)
@@ -21,7 +21,11 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.logging.Logger;
/**
@@ -56,6 +60,8 @@
private final String password;
+ private ServerLocator serverLocator;
+
private ClientSessionFactory csf;
private XAResource delegate;
@@ -274,7 +280,8 @@
}
TransportConfiguration config = new
TransportConfiguration(connectorFactoryClassName, connectorConfig);
- csf = HornetQClient.createClientSessionFactory(config);
+ serverLocator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration[]{config});
+ csf = serverLocator.createSessionFactory();
ClientSession cs = null;
if (username == null)
@@ -302,16 +309,20 @@
{
try
{
+ ServerLocator oldServerLocator = null;
ClientSessionFactory oldCSF = null;
synchronized (HornetQXAResourceWrapper.lock)
{
oldCSF = csf;
csf = null;
delegate = null;
+ oldServerLocator = serverLocator;
+ serverLocator = null;
}
if (oldCSF != null)
{
oldCSF.close();
+ oldServerLocator.close();
}
}
catch (Exception ignored)
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/ra/ConnectionFactoryProperties.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/ra/ConnectionFactoryProperties.java 2010-06-30
17:22:16 UTC (rev 9376)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/ra/ConnectionFactoryProperties.java 2010-07-01
16:14:07 UTC (rev 9377)
@@ -42,14 +42,9 @@
* The transport config, changing the default configured from the RA
*/
private Map<String, Object> connectionParameters;
+
+ private Boolean ha;
- /**
- * The transport config, changing the default configured from the RA
- */
- private Map<String, Object> backupConnectionParameters;
-
- private String backupConnectorClassName;
-
private String connectionLoadBalancingPolicyClassName;
private String discoveryAddress;
@@ -131,35 +126,23 @@
hasBeenUpdated = true;
}
- public String getBackupConnectorClassName()
+ public Boolean isHA()
{
- return backupConnectorClassName;
+ return ha;
}
-
- public Map<String, Object> getParsedBackupConnectionParameters()
+
+ public void setHA(final Boolean ha)
{
- return backupConnectionParameters;
- }
-
- public void setParsedBackupConnectionParameters(final Map<String, Object>
backupConnectionParameters)
- {
- this.backupConnectionParameters = backupConnectionParameters;
hasBeenUpdated = true;
+ this.ha = ha;
}
-
- public void setBackupConnectorClassName(final String backupConnectorClassName)
- {
- this.backupConnectorClassName = backupConnectorClassName;
- hasBeenUpdated = true;
- }
-
+
public String getConnectionLoadBalancingPolicyClassName()
{
if (ConnectionFactoryProperties.trace)
{
ConnectionFactoryProperties.log.trace("getConnectionLoadBalancingPolicyClassName()");
}
- hasBeenUpdated = true;
return connectionLoadBalancingPolicyClassName;
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/ra/HornetQRAMCFProperties.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/ra/HornetQRAMCFProperties.java 2010-06-30
17:22:16 UTC (rev 9376)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/ra/HornetQRAMCFProperties.java 2010-07-01
16:14:07 UTC (rev 9377)
@@ -113,20 +113,6 @@
}
/**
- * @return the connectionParameters
- */
- public String getBackupConnectionParameters()
- {
- return strBackupConnectionParameters;
- }
-
- public void setBackupConnectionParameters(final String configuration)
- {
- strBackupConnectionParameters = configuration;
- setParsedBackupConnectionParameters(Util.parseConfig(configuration));
- }
-
- /**
* Set the default session type.
*
* @param defaultType either javax.jms.Topic or javax.jms.Queue
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java 2010-06-30
17:22:16 UTC (rev 9376)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java 2010-07-01
16:14:07 UTC (rev 9377)
@@ -396,26 +396,6 @@
mcfProperties.setConnectionParameters(configuration);
}
- public String getBackupConnectorClassName()
- {
- return mcfProperties.getBackupConnectorClassName();
- }
-
- public void setBackupConnectorClassName(final String backupConnectorClassName)
- {
- mcfProperties.setBackupConnectorClassName(backupConnectorClassName);
- }
-
- public String getBackupConnectionParameters()
- {
- return mcfProperties.getBackupConnectionParameters();
- }
-
- public void setBackupConnectionParameters(final String configuration)
- {
- mcfProperties.setBackupConnectionParameters(configuration);
- }
-
/**
* @return the transportType
*/
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2010-06-30
17:22:16 UTC (rev 9376)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2010-07-01
16:14:07 UTC (rev 9377)
@@ -33,9 +33,9 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.core.logging.Logger;
+import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.ra.inflow.HornetQActivation;
import org.hornetq.ra.inflow.HornetQActivationSpec;
@@ -80,11 +80,6 @@
private String unparsedProperties;
/**
- * The JBoss connection factory
- */
- private ClientSessionFactory sessionFactory;
-
- /**
* Have the factory been configured
*/
private final AtomicBoolean configured;
@@ -107,7 +102,6 @@
}
raProperties = new HornetQRAProperties();
- sessionFactory = null;
configured = new AtomicBoolean(false);
activations = new ConcurrentHashMap<ActivationSpec, HornetQActivation>();
}
@@ -254,33 +248,6 @@
}
}
- public String getBackupConnectorClassName()
- {
- return raProperties.getBackupConnectorClassName();
- }
-
- public void setBackupConnectorClassName(final String backupConnector)
- {
- if (HornetQResourceAdapter.trace)
- {
- HornetQResourceAdapter.log.trace("setBackUpTransportType(" +
backupConnector + ")");
- }
- raProperties.setBackupConnectorClassName(backupConnector);
- }
-
- public Map<String, Object> getBackupConnectionParameters()
- {
- return raProperties.getParsedBackupConnectionParameters();
- }
-
- public void setBackupTransportConfiguration(final String config)
- {
- if (config != null)
- {
- raProperties.setParsedBackupConnectionParameters(Util.parseConfig(config));
- }
- }
-
/**
* Get the discovery group name
*
@@ -340,6 +307,16 @@
raProperties.setDiscoveryPort(dgp);
}
+
+ public Boolean isHA()
+ {
+ return raProperties.isHA();
+ }
+
+ public void setHA(final Boolean ha)
+ {
+ this.raProperties.setHA(ha);
+ }
/**
* Get discovery refresh timeout
@@ -1331,7 +1308,6 @@
protected void setup() throws HornetQException
{
defaultHornetQConnectionFactory = createHornetQConnectionFactory(raProperties);
- sessionFactory = defaultHornetQConnectionFactory.getCoreFactory();
}
public HornetQConnectionFactory getDefaultHornetQConnectionFactory() throws
ResourceException
@@ -1350,6 +1326,7 @@
return defaultHornetQConnectionFactory;
}
+ //TODO - currently RA only allows a single target server to be specified we should
allow a list of servers to be passed in
public HornetQConnectionFactory createHornetQConnectionFactory(final
ConnectionFactoryProperties overrideProperties)
{
HornetQConnectionFactory cf;
@@ -1357,28 +1334,38 @@
:
getConnectorClassName();
String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ?
overrideProperties.getDiscoveryAddress()
:
getDiscoveryAddress();
+
+ Boolean ha = overrideProperties.isHA() != null ? overrideProperties.isHA() :
isHA();
+
if (connectorClassName != null)
{
Map<String, Object> connectionParams =
overrideConnectionParameters(overrideProperties.getParsedConnectionParameters(),raProperties.getParsedConnectionParameters());
+
TransportConfiguration transportConf = new
TransportConfiguration(connectorClassName, connectionParams);
-
- String backUpCOnnectorClassname =
overrideProperties.getBackupConnectorClassName() != null ?
overrideProperties.getBackupConnectorClassName()
-
: getBackupConnectorClassName();
- Map<String, Object> backupConnectionParams =
-
overrideConnectionParameters(overrideProperties.getParsedBackupConnectionParameters(),
- getBackupConnectionParameters());
- TransportConfiguration backup = backUpCOnnectorClassname == null ? null
- : new
TransportConfiguration(backUpCOnnectorClassname,
-
backupConnectionParams);
-
- cf = HornetQJMSClient.createConnectionFactory(transportConf, backup);
+
+ if (ha)
+ {
+ cf = HornetQJMSClient.createConnectionFactoryWithHA(new
TransportConfiguration[] {transportConf});
+ }
+ else
+ {
+ cf = HornetQJMSClient.createConnectionFactoryWithoutHA(new
TransportConfiguration[] {transportConf});
+ }
}
else if (discoveryAddress != null)
{
Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ?
overrideProperties.getDiscoveryPort()
:
getDiscoveryPort();
- cf = HornetQJMSClient.createConnectionFactory(discoveryAddress, discoveryPort);
+
+ if (ha)
+ {
+ cf = HornetQJMSClient.createConnectionFactoryWithHA(discoveryAddress,
discoveryPort);
+ }
+ else
+ {
+ cf = HornetQJMSClient.createConnectionFactoryWithoutHA(discoveryAddress,
discoveryPort);
+ }
}
else
{
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/ra/inflow/HornetQActivation.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2010-06-30
17:22:16 UTC (rev 9376)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2010-07-01
16:14:07 UTC (rev 9377)
@@ -32,10 +32,10 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
-import org.hornetq.api.jms.HornetQJMSClient;
-import org.hornetq.core.logging.Logger;
import org.hornetq.ra.HornetQResourceAdapter;
import org.hornetq.ra.Util;
@@ -352,7 +352,7 @@
try
{
- result = ra.createSession(factory.getCoreFactory(),
+ result = ra.createSession(factory.getServerLocator().createSessionFactory(),
spec.getAcknowledgeModeInt(),
spec.getUser(),
spec.getPassword(),