Author: clebert.suconic(a)jboss.com
Date: 2011-01-25 22:21:12 -0500 (Tue, 25 Jan 2011)
New Revision: 10146
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/api/jms/management/JMSServerControl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
Log:
HORNETQ-616 / HORNETQ-617 - settings of connection factories through management
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/api/jms/management/JMSServerControl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/api/jms/management/JMSServerControl.java 2011-01-25
21:58:43 UTC (rev 10145)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/api/jms/management/JMSServerControl.java 2011-01-26
03:21:12 UTC (rev 10146)
@@ -161,7 +161,82 @@
@Parameter(name = "cfType", desc =
"RegularCF=0, QueueCF=1, TopicCF=2, XACF=3, QueueXACF=4, TopicXACF=5") int
cfType,
@Parameter(name = "connectorNames", desc =
"comma-separated list of connectorNames or the discovery group name") String
connectors,
@Parameter(name = "jndiBindings", desc =
"comma-separated list of JNDI bindings (use ',' if u need to use
commas in your jndi name)") String jndiBindings) throws Exception;
+
+ @Operation(desc = "Create a JMS ConnectionFactory", impact =
MBeanOperationInfo.ACTION)
+ void createConnectionFactory(@Parameter(name = "name") String name,
+ @Parameter(name = "ha") boolean ha,
+ @Parameter(name = "useDiscovery", desc =
"should we use discovery or a connector configuration") boolean useDiscovery,
+ @Parameter(name = "cfType", desc =
"RegularCF=0, QueueCF=1, TopicCF=2, XACF=3, QueueXACF=4, TopicXACF=5") int
cfType,
+ @Parameter(name = "connectorNames", desc =
"An array of connector or the binding address") String[] connectors,
+ @Parameter(name = "jndiBindings", desc =
"array JNDI bindings (use ',' if u need to use commas in your jndi
name)") String[] jndiBindings,
+ @Parameter(name = "clientID", desc = "The
clientID configured for the connectionFactory") String clientID,
+ @Parameter(name = "clientFailureCheckPeriod",
desc = "clientFailureCheckPeriod") long clientFailureCheckPeriod,
+ @Parameter(name = "connectionTTL", desc =
"connectionTTL") long connectionTTL,
+ @Parameter(name = "callTimeout", desc =
"callTimeout") long callTimeout,
+ @Parameter(name = "minLargeMessageSize", desc =
"minLargeMessageSize") int minLargeMessageSize,
+ @Parameter(name = "compressLargeMessages", desc
= "compressLargeMessages") boolean compressLargeMessages,
+ @Parameter(name = "consumerWindowSize", desc =
"consumerWindowSize") int consumerWindowSize,
+ @Parameter(name = "consumerMaxRate", desc =
"consumerMaxRate") int consumerMaxRate,
+ @Parameter(name = "confirmationWindowSize",
desc = "confirmationWindowSize") int confirmationWindowSize,
+ @Parameter(name = "producerWindowSize", desc =
"producerWindowSize") int producerWindowSize,
+ @Parameter(name = "producerMaxRate", desc =
"producerMaxRate") int producerMaxRate,
+ @Parameter(name = "blockOnAcknowledge", desc =
"blockOnAcknowledge") boolean blockOnAcknowledge,
+ @Parameter(name = "blockOnDurableSend", desc =
"blockOnDurableSend") boolean blockOnDurableSend,
+ @Parameter(name = "blockOnNonDurableSend", desc
= "blockOnNonDurableSend") boolean blockOnNonDurableSend,
+ @Parameter(name = "autoGroup", desc =
"autoGroup") boolean autoGroup,
+ @Parameter(name = "preAcknowledge", desc =
"preAcknowledge") boolean preAcknowledge,
+ @Parameter(name =
"loadBalancingPolicyClassName", desc = "loadBalancingPolicyClassName (null
or blank mean use the default value)") String loadBalancingPolicyClassName,
+ @Parameter(name = "transactionBatchSize", desc
= "transactionBatchSize") int transactionBatchSize,
+ @Parameter(name = "dupsOKBatchSize", desc =
"dupsOKBatchSize") int dupsOKBatchSize,
+ @Parameter(name = "useGlobalPools", desc =
"useGlobalPools") boolean useGlobalPools,
+ @Parameter(name = "scheduledThreadPoolMaxSize",
desc = "scheduledThreadPoolMaxSize") int scheduledThreadPoolMaxSize,
+ @Parameter(name = "threadPoolMaxSize", desc =
"threadPoolMaxSize") int threadPoolMaxSize,
+ @Parameter(name = "retryInterval", desc =
"retryInterval") long retryInterval,
+ @Parameter(name = "retryIntervalMultiplier",
desc = "retryIntervalMultiplier") double retryIntervalMultiplier,
+ @Parameter(name = "maxRetryInterval", desc =
"maxRetryInterval") long maxRetryInterval,
+ @Parameter(name = "reconnectAttempts", desc =
"reconnectAttempts") int reconnectAttempts,
+ @Parameter(name =
"failoverOnInitialConnection", desc = "failoverOnInitialConnection")
boolean failoverOnInitialConnection,
+ @Parameter(name = "groupId", desc =
"groupId") String groupId) throws Exception;
+
+ @Operation(desc = "Create a JMS ConnectionFactory", impact =
MBeanOperationInfo.ACTION)
+ void createConnectionFactory(@Parameter(name = "name") String name,
+ @Parameter(name = "ha") boolean ha,
+ @Parameter(name = "useDiscovery", desc =
"should we use discovery or a connector configuration") boolean useDiscovery,
+ @Parameter(name = "cfType", desc =
"RegularCF=0, QueueCF=1, TopicCF=2, XACF=3, QueueXACF=4, TopicXACF=5") int
cfType,
+ @Parameter(name = "connectorNames", desc =
"comma-separated list of connectorNames or the discovery group name") String
connectors,
+ @Parameter(name = "jndiBindings", desc =
"comma-separated list of JNDI bindings (use ',' if u need to use
commas in your jndi name)") String jndiBindings,
+ @Parameter(name = "clientID", desc = "The
clientID configured for the connectionFactory") String clientID,
+ @Parameter(name = "clientFailureCheckPeriod",
desc = "clientFailureCheckPeriod") long clientFailureCheckPeriod,
+ @Parameter(name = "connectionTTL", desc =
"connectionTTL") long connectionTTL,
+ @Parameter(name = "callTimeout", desc =
"callTimeout") long callTimeout,
+ @Parameter(name = "minLargeMessageSize", desc =
"minLargeMessageSize") int minLargeMessageSize,
+ @Parameter(name = "compressLargeMessages", desc
= "compressLargeMessages") boolean compressLargeMessages,
+ @Parameter(name = "consumerWindowSize", desc =
"consumerWindowSize") int consumerWindowSize,
+ @Parameter(name = "consumerMaxRate", desc =
"consumerMaxRate") int consumerMaxRate,
+ @Parameter(name = "confirmationWindowSize",
desc = "confirmationWindowSize") int confirmationWindowSize,
+ @Parameter(name = "producerWindowSize", desc =
"producerWindowSize") int producerWindowSize,
+ @Parameter(name = "producerMaxRate", desc =
"producerMaxRate") int producerMaxRate,
+ @Parameter(name = "blockOnAcknowledge", desc =
"blockOnAcknowledge") boolean blockOnAcknowledge,
+ @Parameter(name = "blockOnDurableSend", desc =
"blockOnDurableSend") boolean blockOnDurableSend,
+ @Parameter(name = "blockOnNonDurableSend", desc
= "blockOnNonDurableSend") boolean blockOnNonDurableSend,
+ @Parameter(name = "autoGroup", desc =
"autoGroup") boolean autoGroup,
+ @Parameter(name = "preAcknowledge", desc =
"preAcknowledge") boolean preAcknowledge,
+ @Parameter(name =
"loadBalancingPolicyClassName", desc = "loadBalancingPolicyClassName (null
or blank mean use the default value)") String loadBalancingPolicyClassName,
+ @Parameter(name = "transactionBatchSize", desc
= "transactionBatchSize") int transactionBatchSize,
+ @Parameter(name = "dupsOKBatchSize", desc =
"dupsOKBatchSize") int dupsOKBatchSize,
+ @Parameter(name = "useGlobalPools", desc =
"useGlobalPools") boolean useGlobalPools,
+ @Parameter(name = "scheduledThreadPoolMaxSize",
desc = "scheduledThreadPoolMaxSize") int scheduledThreadPoolMaxSize,
+ @Parameter(name = "threadPoolMaxSize", desc =
"threadPoolMaxSize") int threadPoolMaxSize,
+ @Parameter(name = "retryInterval", desc =
"retryInterval") long retryInterval,
+ @Parameter(name = "retryIntervalMultiplier",
desc = "retryIntervalMultiplier") double retryIntervalMultiplier,
+ @Parameter(name = "maxRetryInterval", desc =
"maxRetryInterval") long maxRetryInterval,
+ @Parameter(name = "reconnectAttempts", desc =
"reconnectAttempts") int reconnectAttempts,
+ @Parameter(name =
"failoverOnInitialConnection", desc = "failoverOnInitialConnection")
boolean failoverOnInitialConnection,
+ @Parameter(name = "groupId", desc =
"groupId") String groupId) throws Exception;
+
+
+
@Operation(desc = "Destroy a JMS ConnectionFactory", impact =
MBeanOperationInfo.ACTION)
void destroyConnectionFactory(@Parameter(name = "name", desc = "Name of
the ConnectionFactory to destroy") String name) throws Exception;
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2011-01-25
21:58:43 UTC (rev 10145)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2011-01-26
03:21:12 UTC (rev 10146)
@@ -604,6 +604,7 @@
{
JMSException jmse = new JMSException("Failed to create session
factory");
+ jmse.initCause(e);
jmse.setLinkedException(e);
throw jmse;
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2011-01-25
21:58:43 UTC (rev 10145)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2011-01-26
03:21:12 UTC (rev 10146)
@@ -29,6 +29,7 @@
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.Parameter;
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.api.jms.management.ConnectionFactoryControl;
@@ -44,6 +45,8 @@
import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQQueue;
import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
+import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.utils.json.JSONArray;
import org.hornetq.utils.json.JSONObject;
@@ -206,6 +209,186 @@
}
}
+ /* (non-Javadoc)
+ * @see
org.hornetq.api.jms.management.JMSServerControl#createConnectionFactory(java.lang.String,
boolean, boolean, int, java.lang.String, java.lang.String, java.lang.String, long, long,
long, int, boolean, int, int, int, int, int, boolean, boolean, boolean, boolean, boolean,
java.lang.String, int, int, boolean, int, int, long, double, long, int, boolean,
java.lang.String)
+ */
+ public void createConnectionFactory(String name,
+ boolean ha,
+ boolean useDiscovery,
+ int cfType,
+ String connectors,
+ String jndiBindings,
+ String clientID,
+ long clientFailureCheckPeriod,
+ long connectionTTL,
+ long callTimeout,
+ int minLargeMessageSize,
+ boolean compressLargeMessages,
+ int consumerWindowSize,
+ int consumerMaxRate,
+ int confirmationWindowSize,
+ int producerWindowSize,
+ int producerMaxRate,
+ boolean blockOnAcknowledge,
+ boolean blockOnDurableSend,
+ boolean blockOnNonDurableSend,
+ boolean autoGroup,
+ boolean preAcknowledge,
+ String loadBalancingPolicyClassName,
+ int transactionBatchSize,
+ int dupsOKBatchSize,
+ boolean useGlobalPools,
+ int scheduledThreadPoolMaxSize,
+ int threadPoolMaxSize,
+ long retryInterval,
+ double retryIntervalMultiplier,
+ long maxRetryInterval,
+ int reconnectAttempts,
+ boolean failoverOnInitialConnection,
+ String groupId) throws Exception
+ {
+ createConnectionFactory(name,
+ ha,
+ useDiscovery,
+ cfType,
+ toArray(connectors),
+ toArray(jndiBindings),
+ clientID,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ callTimeout,
+ minLargeMessageSize,
+ compressLargeMessages,
+ consumerWindowSize,
+ consumerMaxRate,
+ confirmationWindowSize,
+ producerWindowSize,
+ producerMaxRate,
+ blockOnAcknowledge,
+ blockOnDurableSend,
+ blockOnNonDurableSend,
+ autoGroup,
+ preAcknowledge,
+ loadBalancingPolicyClassName,
+ transactionBatchSize,
+ dupsOKBatchSize,
+ useGlobalPools,
+ scheduledThreadPoolMaxSize,
+ threadPoolMaxSize,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ failoverOnInitialConnection,
+ groupId);
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.api.jms.management.JMSServerControl#createConnectionFactory(java.lang.String,
boolean, boolean, int, java.lang.String[], java.lang.String[], java.lang.String, long,
long, long, int, boolean, int, int, int, int, int, boolean, boolean, boolean, boolean,
boolean, java.lang.String, int, int, boolean, int, int, long, double, long, int, boolean,
java.lang.String)
+ */
+ public void createConnectionFactory(String name,
+ boolean ha,
+ boolean useDiscovery,
+ int cfType,
+ String[] connectorNames,
+ String[] bindings,
+ String clientID,
+ long clientFailureCheckPeriod,
+ long connectionTTL,
+ long callTimeout,
+ int minLargeMessageSize,
+ boolean compressLargeMessages,
+ int consumerWindowSize,
+ int consumerMaxRate,
+ int confirmationWindowSize,
+ int producerWindowSize,
+ int producerMaxRate,
+ boolean blockOnAcknowledge,
+ boolean blockOnDurableSend,
+ boolean blockOnNonDurableSend,
+ boolean autoGroup,
+ boolean preAcknowledge,
+ String loadBalancingPolicyClassName,
+ int transactionBatchSize,
+ int dupsOKBatchSize,
+ boolean useGlobalPools,
+ int scheduledThreadPoolMaxSize,
+ int threadPoolMaxSize,
+ long retryInterval,
+ double retryIntervalMultiplier,
+ long maxRetryInterval,
+ int reconnectAttempts,
+ boolean failoverOnInitialConnection,
+ String groupId) throws Exception
+ {
+ checkStarted();
+
+ clearIO();
+
+ try
+ {
+ ConnectionFactoryConfiguration configuration = new
ConnectionFactoryConfigurationImpl(name, ha, bindings);
+
+ if (useDiscovery)
+ {
+ configuration.setDiscoveryGroupName(connectorNames[0]);
+ }
+ else
+ {
+ ArrayList<String> connectorNamesList = new ArrayList<String>();
+ for (String nameC : connectorNames)
+ {
+ connectorNamesList.add(nameC);
+ }
+ configuration.setConnectorNames(connectorNamesList);
+ }
+
+ configuration.setFactoryType(JMSFactoryType.valueOf(cfType));
+ configuration.setClientID(clientID);
+ configuration.setClientFailureCheckPeriod(clientFailureCheckPeriod);
+ configuration.setConnectionTTL(connectionTTL);
+ configuration.setCallTimeout(callTimeout);
+ configuration.setMinLargeMessageSize(minLargeMessageSize);
+ configuration.setCompressLargeMessages(compressLargeMessages);
+ configuration.setConsumerWindowSize(consumerWindowSize);
+ configuration.setConsumerMaxRate(consumerMaxRate);
+ configuration.setConfirmationWindowSize(confirmationWindowSize);
+ configuration.setProducerWindowSize(producerWindowSize);
+ configuration.setProducerMaxRate(producerMaxRate);
+ configuration.setBlockOnAcknowledge(blockOnAcknowledge);
+ configuration.setBlockOnDurableSend(blockOnDurableSend);
+ configuration.setBlockOnNonDurableSend(blockOnNonDurableSend);
+ configuration.setAutoGroup(autoGroup);
+ configuration.setPreAcknowledge(preAcknowledge);
+
+ if (loadBalancingPolicyClassName == null ||
loadBalancingPolicyClassName.trim().equals(""))
+ {
+ loadBalancingPolicyClassName =
HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+ }
+
+ configuration.setLoadBalancingPolicyClassName(loadBalancingPolicyClassName);
+ configuration.setTransactionBatchSize(transactionBatchSize);
+ configuration.setDupsOKBatchSize(dupsOKBatchSize);
+ configuration.setUseGlobalPools(useGlobalPools);
+ configuration.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize);
+ configuration.setThreadPoolMaxSize(threadPoolMaxSize);
+ configuration.setRetryInterval(retryInterval);
+ configuration.setRetryIntervalMultiplier(retryIntervalMultiplier);
+ configuration.setMaxRetryInterval(maxRetryInterval);
+ configuration.setReconnectAttempts(reconnectAttempts);
+ configuration.setFailoverOnInitialConnection(failoverOnInitialConnection);
+ configuration.setGroupID(groupId);
+
+ server.createConnectionFactory(true, configuration, bindings);
+
+ sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
+
/**
* Create a JMS ConnectionFactory with the specified name connected to a single
live-backup pair of servers.
* <br>
@@ -700,7 +883,6 @@
return MBeanInfoHelper.getMBeanOperationsInfo(JMSServerControl.class);
}
-
// Private -------------------------------------------------------
private void sendNotification(final NotificationType type, final String message)
@@ -716,6 +898,7 @@
throw new IllegalStateException("HornetQ JMS Server is not started. it can
not be managed yet");
}
}
+
// Inner classes -------------------------------------------------
public static enum NotificationType
@@ -860,5 +1043,4 @@
return obj;
}
-
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2011-01-25
21:58:43 UTC (rev 10145)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2011-01-26
03:21:12 UTC (rev 10146)
@@ -516,6 +516,8 @@
connectorNames.add(str.toString());
}
}
+
+ ha = buffer.readBoolean();
clientID = BufferHelper.readNullableSimpleStringAsString(buffer);
@@ -602,6 +604,8 @@
BufferHelper.writeAsSimpleString(buffer, tc);
}
}
+
+ buffer.writeBoolean(ha);
BufferHelper.writeAsNullableSimpleString(buffer, clientID);
@@ -686,6 +690,9 @@
}
size += BufferHelper.sizeOfNullableSimpleString(clientID) +
+
+ DataConstants.SIZE_BOOLEAN +
+ // ha
DataConstants.SIZE_LONG +
// clientFailureCheckPeriod
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2011-01-25
21:58:43 UTC (rev 10145)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2011-01-26
03:21:12 UTC (rev 10146)
@@ -912,6 +912,17 @@
storage.addJNDI(PersistedType.ConnectionFactory, cfConfig.getName(), usedJNDI);
}
}
+
+ public JMSStorageManager getJMSStorageManager()
+ {
+ return storage;
+ }
+
+ // used by tests only
+ public void replaceStorageManager(JMSStorageManager newStorage)
+ {
+ this.storage = newStorage;
+ }
private String[] getJNDIList(final Map<String, List<String>> map, final
String name)
{
@@ -1085,6 +1096,7 @@
cf.setReconnectAttempts(cfConfig.getReconnectAttempts());
cf.setFailoverOnInitialConnection(cfConfig.isFailoverOnInitialConnection());
cf.setCompressLargeMessage(cfConfig.isCompressLargeMessages());
+ cf.setGroupID(cfConfig.getGroupID());
}
connectionFactories.put(cfConfig.getName(), cf);
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2011-01-25
21:58:43 UTC (rev 10145)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2011-01-26
03:21:12 UTC (rev 10146)
@@ -14,7 +14,6 @@
package org.hornetq.tests.integration.jms.server.management;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -35,31 +34,31 @@
import junit.framework.Assert;
-import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.AddressControl;
import org.hornetq.api.core.management.ObjectNameBuilder;
import org.hornetq.api.core.management.ResourceNames;
import org.hornetq.api.jms.management.JMSServerControl;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.QueueBinding;
-import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.jms.client.HornetQConnection;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.jms.client.HornetQQueueConnectionFactory;
import org.hornetq.jms.persistence.JMSStorageManager;
import org.hornetq.jms.persistence.config.PersistedConnectionFactory;
import org.hornetq.jms.persistence.config.PersistedDestination;
import org.hornetq.jms.persistence.config.PersistedJNDI;
import org.hornetq.jms.persistence.config.PersistedType;
-import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.tests.integration.management.ManagementControlHelper;
import org.hornetq.tests.integration.management.ManagementTestBase;
@@ -112,7 +111,7 @@
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
-
+
/** Number of consumers used by the test itself */
protected int getNumberOfConsumers()
{
@@ -395,7 +394,6 @@
Assert.assertNull(fakeJMSStorageManager.destinationMap.get(topicName));
}
-
public void testListAllConsumers() throws Exception
{
String topicJNDIBinding = RandomUtil.randomString();
@@ -416,15 +414,15 @@
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// create a consumer will create a Core queue bound to the topic address
MessageConsumer cons = session.createConsumer(topic);
-
+
JSONArray jsonArray = new JSONArray(control.listAllConsumersAsJSON());
-
+
assertEquals(1 + getNumberOfConsumers(), jsonArray.length());
-
+
cons.close();
-
+
jsonArray = new JSONArray(control.listAllConsumersAsJSON());
-
+
assertEquals(getNumberOfConsumers(), jsonArray.length());
String topicAddress =
HornetQDestination.createTopicAddressFromName(topicName).toString();
@@ -465,8 +463,10 @@
public void testCreateConnectionFactory_3b() throws Exception
{
- server.getConfiguration().getConnectorConfigurations().put("tst", new
TransportConfiguration(INVM_CONNECTOR_FACTORY));
-
+ server.getConfiguration()
+ .getConnectorConfigurations()
+ .put("tst", new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+
doCreateConnectionFactory(new ConnectionFactoryCreator()
{
public void createConnectionFactory(final JMSServerControl control,
@@ -475,16 +475,119 @@
{
String jndiBindings = JMSServerControlTest.toCSV(bindings);
- control.createConnectionFactory(cfName,
- false,
- false,
- 0,
- "tst",
- jndiBindings);
+ control.createConnectionFactory(cfName, false, false, 0, "tst",
jndiBindings);
}
});
}
+ public void testCreateConnectionFactory_CopmleteList() throws Exception
+ {
+ JMSServerControl control = createManagementControl();
+ control.createConnectionFactory("test", //name
+ true, // ha
+ false, // useDiscovery
+ 1, // cfType
+ "invm", // connectorNames
+ "tst", // jndiBindins
+ "tst", // clientID
+ 1, // clientFailureCheckPeriod
+ 1, // connectionTTL
+ 1, // callTimeout
+ 1, // minLargeMessageSize
+ true, // compressLargeMessages
+ 1, // consumerWindowSize
+ 1, // consumerMaxRate
+ 1, // confirmationWindowSize
+ 1, // ProducerWindowSize
+ 1, // producerMaxRate
+ true, // blockOnACK
+ true, // blockOnDurableSend
+ true, // blockOnNonDurableSend
+ true, // autoGroup
+ true, // preACK
+
HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, //
loadBalancingClassName
+ 1, // transactionBatchSize
+ 1, // dupsOKBatchSize
+ true, // useGlobalPools
+ 1, // scheduleThreadPoolSize
+ 1, // threadPoolMaxSize
+ 1, // retryInterval
+ 1, // retryIntervalMultiplier
+ 1, // maxRetryInterval
+ 1, // reconnectAttempts
+ true, // failoverOnInitialConnection
+ "tst"); // groupID
+
+
+ HornetQQueueConnectionFactory cf =
(HornetQQueueConnectionFactory)context.lookup("tst");
+
+ assertEquals(true, cf.isHA());
+ assertEquals("tst", cf.getClientID());
+ assertEquals(1, cf.getClientFailureCheckPeriod());
+ assertEquals(1, cf.getConnectionTTL());
+ assertEquals(1, cf.getCallTimeout());
+ assertEquals(1, cf.getMinLargeMessageSize());
+ assertEquals(true, cf.isCompressLargeMessage());
+ assertEquals(1, cf.getConsumerWindowSize());
+ assertEquals(1, cf.getConfirmationWindowSize());
+ assertEquals(1, cf.getProducerWindowSize());
+ assertEquals(1, cf.getProducerMaxRate());
+ assertEquals(true, cf.isBlockOnAcknowledge());
+ assertEquals(true, cf.isBlockOnDurableSend());
+ assertEquals(true, cf.isBlockOnNonDurableSend());
+ assertEquals(true, cf.isAutoGroup());
+ assertEquals(true, cf.isPreAcknowledge());
+ assertEquals(HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
cf.getConnectionLoadBalancingPolicyClassName());
+ assertEquals(1, cf.getTransactionBatchSize());
+ assertEquals(1, cf.getDupsOKBatchSize());
+ assertEquals(true, cf.isUseGlobalPools());
+ assertEquals(1, cf.getScheduledThreadPoolMaxSize());
+ assertEquals(1, cf.getThreadPoolMaxSize());
+ assertEquals(1, cf.getRetryInterval());
+ assertEquals(1.0, cf.getRetryIntervalMultiplier());
+ assertEquals(1, cf.getMaxRetryInterval());
+ assertEquals(1, cf.getReconnectAttempts());
+ assertEquals(true, cf.isFailoverOnInitialConnection());
+ assertEquals("tst", cf.getGroupID());
+
+ stopServer();
+
+ startServer();
+
+ cf = (HornetQQueueConnectionFactory)context.lookup("tst");
+
+ assertEquals(true, cf.isHA());
+ assertEquals("tst", cf.getClientID());
+ assertEquals(1, cf.getClientFailureCheckPeriod());
+ assertEquals(1, cf.getConnectionTTL());
+ assertEquals(1, cf.getCallTimeout());
+ assertEquals(1, cf.getMinLargeMessageSize());
+ assertEquals(true, cf.isCompressLargeMessage());
+ assertEquals(1, cf.getConsumerWindowSize());
+ assertEquals(1, cf.getConfirmationWindowSize());
+ assertEquals(1, cf.getProducerWindowSize());
+ assertEquals(1, cf.getProducerMaxRate());
+ assertEquals(true, cf.isBlockOnAcknowledge());
+ assertEquals(true, cf.isBlockOnDurableSend());
+ assertEquals(true, cf.isBlockOnNonDurableSend());
+ assertEquals(true, cf.isAutoGroup());
+ assertEquals(true, cf.isPreAcknowledge());
+ assertEquals(HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
cf.getConnectionLoadBalancingPolicyClassName());
+ assertEquals(1, cf.getTransactionBatchSize());
+ assertEquals(1, cf.getDupsOKBatchSize());
+ assertEquals(true, cf.isUseGlobalPools());
+ assertEquals(1, cf.getScheduledThreadPoolMaxSize());
+ assertEquals(1, cf.getThreadPoolMaxSize());
+ assertEquals(1, cf.getRetryInterval());
+ assertEquals(1.0, cf.getRetryIntervalMultiplier());
+ assertEquals(1, cf.getMaxRetryInterval());
+ assertEquals(1, cf.getReconnectAttempts());
+ assertEquals(true, cf.isFailoverOnInitialConnection());
+ assertEquals("tst", cf.getGroupID());
+
+
+ }
+
public void testListPreparedTransactionDetails() throws Exception
{
Xid xid = newXID();
@@ -492,9 +595,11 @@
JMSServerControl control = createManagementControl();
String cfJNDIBinding = "/cf";
String cfName = "cf";
-
- server.getConfiguration().getConnectorConfigurations().put("tst", new
TransportConfiguration(INVM_CONNECTOR_FACTORY));
+ server.getConfiguration()
+ .getConnectorConfigurations()
+ .put("tst", new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+
control.createConnectionFactory(cfName, false, false, 0, "tst",
cfJNDIBinding);
control.createQueue("q", "/q");
@@ -535,11 +640,13 @@
TransportConfiguration tc = new
TransportConfiguration(InVMConnectorFactory.class.getName());
String cfJNDIBinding = "/cf";
String cfName = "cf";
-
- server.getConfiguration().getConnectorConfigurations().put("tst", new
TransportConfiguration(INVM_CONNECTOR_FACTORY));
- control.createConnectionFactory(cfName, false, false, 0, "tst",
cfJNDIBinding);
+ server.getConfiguration()
+ .getConnectorConfigurations()
+ .put("tst", new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+ control.createConnectionFactory(cfName, false, false, 0, "tst",
cfJNDIBinding);
+
control.createQueue("q", "/q");
ConnectionFactory cf = (ConnectionFactory)context.lookup("/cf");
@@ -580,23 +687,50 @@
{
super.setUp();
+ startServer();
+ }
+
+ /**
+ * @throws Exception
+ */
+ protected void startServer() throws Exception
+ {
Configuration conf = createBasicConfig();
conf.setSecurityEnabled(false);
conf.setJMXManagementEnabled(true);
- conf.getAcceptorConfigurations().add(new
TransportConfiguration(InVMAcceptorFactory.class.getName()));
- server = HornetQServers.newHornetQServer(conf, mbeanServer, false);
+ conf.setPersistenceEnabled(true);
+ conf.getAcceptorConfigurations().add(new
TransportConfiguration(NettyAcceptorFactory.class.getName()));
+ conf.getAcceptorConfigurations().add(new
TransportConfiguration(INVM_ACCEPTOR_FACTORY));
+ conf.getConnectorConfigurations().put("netty", new
TransportConfiguration(NettyConnectorFactory.class.getName()));
+ conf.getConnectorConfigurations().put("invm", new
TransportConfiguration(INVM_CONNECTOR_FACTORY));
+
+ server = HornetQServers.newHornetQServer(conf, mbeanServer, true);
+
+ serverManager = new JMSServerManagerImpl(server);
context = new InVMContext();
- fakeJMSStorageManager = new FakeJMSStorageManager();
- serverManager = new JMSServerManagerImpl(server, null, fakeJMSStorageManager);
serverManager.setContext(context);
serverManager.start();
serverManager.activated();
+
+ this.fakeJMSStorageManager = new
FakeJMSStorageManager(serverManager.getJMSStorageManager());
+
+ serverManager.replaceStorageManager(fakeJMSStorageManager);
}
@Override
protected void tearDown() throws Exception
{
+ stopServer();
+
+ super.tearDown();
+ }
+
+ /**
+ * @throws Exception
+ */
+ protected void stopServer() throws Exception
+ {
serverManager.stop();
server.stop();
@@ -604,8 +738,6 @@
serverManager = null;
server = null;
-
- super.tearDown();
}
protected JMSServerControl createManagementControl() throws Exception
@@ -615,9 +747,7 @@
// Private -------------------------------------------------------
- private void
-
- doCreateConnectionFactory(final ConnectionFactoryCreator creator) throws Exception
+ private void doCreateConnectionFactory(final ConnectionFactoryCreator creator) throws
Exception
{
Object[] cfJNDIBindings = new Object[] { RandomUtil.randomString(),
RandomUtil.randomString(),
@@ -650,30 +780,6 @@
Assert.assertTrue(fakeJMSStorageManager.persistedJNDIMap.get(cfName).contains(cfJNDIBindings[2]));
}
- private JMSServerManager startHornetQServer(final int discoveryPort) throws Exception
- {
- Configuration conf = createBasicConfig();
- conf.setSecurityEnabled(false);
- conf.setJMXManagementEnabled(true);
- conf.getDiscoveryGroupConfigurations()
- .put("discovery",
- new DiscoveryGroupConfiguration("discovery",
- null,
- "231.7.7.7",
- discoveryPort,
-
ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT,
-
ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT));
- HornetQServer server = HornetQServers.newHornetQServer(conf, mbeanServer, false);
-
- context = new InVMContext();
- JMSServerManagerImpl serverManager = new JMSServerManagerImpl(server);
- serverManager.setContext(context);
- serverManager.start();
- serverManager.activated();
-
- return serverManager;
- }
-
// Inner classes -------------------------------------------------
interface ConnectionFactoryCreator
@@ -688,35 +794,46 @@
Map<String, PersistedConnectionFactory> connectionFactoryMap = new HashMap<String, PersistedConnectionFactory>();
ConcurrentHashMap<String, List<String>> persistedJNDIMap = new ConcurrentHashMap<String, List<String>>();
+
+ JMSStorageManager delegate;
+
+ public FakeJMSStorageManager(JMSStorageManager delegate)
+ {
+ this.delegate = delegate;
+ }
public void storeDestination(PersistedDestination destination) throws Exception
{
destinationMap.put(destination.getName(), destination);
+ delegate.storeDestination(destination);
}
public void deleteDestination(PersistedType type, String name) throws Exception
{
destinationMap.remove(name);
+ delegate.deleteDestination(type, name);
}
public List<PersistedDestination> recoverDestinations()
{
- return Collections.EMPTY_LIST;
+ return delegate.recoverDestinations();
}
public void deleteConnectionFactory(String connectionFactory) throws Exception
{
connectionFactoryMap.remove(connectionFactory);
+ delegate.deleteConnectionFactory(connectionFactory);
}
public void storeConnectionFactory(PersistedConnectionFactory connectionFactory) throws Exception
{
connectionFactoryMap.put(connectionFactory.getName(), connectionFactory);
+ delegate.storeConnectionFactory(connectionFactory);
}
public List<PersistedConnectionFactory> recoverConnectionFactories()
{
- return Collections.EMPTY_LIST;
+ return delegate.recoverConnectionFactories();
}
public void addJNDI(PersistedType type, String name, String... address) throws Exception
@@ -726,36 +843,39 @@
{
persistedJNDIMap.get(name).add(ad);
}
+ delegate.addJNDI(type, name, address);
}
public List<PersistedJNDI> recoverPersistedJNDI() throws Exception
{
- return Collections.EMPTY_LIST;
+ return delegate.recoverPersistedJNDI();
}
public void deleteJNDI(PersistedType type, String name, String address) throws Exception
{
persistedJNDIMap.get(name).remove(address);
+ delegate.deleteJNDI(type, name, address);
}
public void deleteJNDI(PersistedType type, String name) throws Exception
{
persistedJNDIMap.get(name).clear();
+ delegate.deleteJNDI(type, name);
}
public void start() throws Exception
{
- // To change body of implemented methods use File | Settings | File Templates.
+ delegate.start();
}
public void stop() throws Exception
{
- // To change body of implemented methods use File | Settings | File Templates.
+ delegate.stop();
}
public boolean isStarted()
{
- return false; // To change body of implemented methods use File | Settings | File Templates.
+ return delegate.isStarted();
}
/* (non-Javadoc)
@@ -763,6 +883,7 @@
*/
public void installReplication(ReplicationEndpoint replicationEndpoint) throws Exception
{
+ delegate.installReplication(replicationEndpoint);
}
/* (non-Javadoc)
@@ -770,6 +891,7 @@
*/
public void load() throws Exception
{
+ delegate.load();
}
}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2011-01-25 21:58:43 UTC (rev 10145)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2011-01-26 03:21:12 UTC (rev 10146)
@@ -66,13 +66,13 @@
return 1;
}
-
@Override
protected void setUp() throws Exception
{
super.setUp();
- HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ HornetQConnectionFactory cf = (HornetQConnectionFactory)HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
+ new TransportConfiguration(InVMConnectorFactory.class.getName()));
connection = cf.createQueueConnection();
session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
@@ -93,13 +93,12 @@
@Override
protected JMSServerControl createManagementControl() throws Exception
{
- HornetQQueue managementQueue = (HornetQQueue) HornetQJMSClient.createQueue("hornetq.management");
+ HornetQQueue managementQueue = (HornetQQueue)HornetQJMSClient.createQueue("hornetq.management");
final JMSMessagingProxy proxy = new JMSMessagingProxy(session, managementQueue, ResourceNames.JMS_SERVER);
return new JMSServerControl()
{
-
public boolean closeConnectionsForAddress(final String ipAddress) throws Exception
{
return (Boolean)proxy.invokeOperation("closeConnectionsForAddress", ipAddress);
@@ -119,7 +118,7 @@
{
return (Boolean)proxy.invokeOperation("createQueue", name, jndiBindings, selector, durable);
}
-
+
public boolean createTopic(final String name) throws Exception
{
return (Boolean)proxy.invokeOperation("createTopic", name);
@@ -169,12 +168,12 @@
{
return (String[])proxy.invokeOperation("listConnectionIDs");
}
-
+
public String listConnectionsAsJSON() throws Exception
{
return (String)proxy.invokeOperation("listConnectionsAsJSON");
}
-
+
public String listConsumersAsJSON(String connectionID) throws Exception
{
return (String)proxy.invokeOperation("listConsumersAsJSON", connectionID);
@@ -199,13 +198,13 @@
{
proxy.invokeOperation("removeSecuritySettings", addressMatch);
}
-
+
@SuppressWarnings("unchecked")
public Set<Role> getSecuritySettings(String addressMatch) throws Exception
{
return (Set<Role>)proxy.invokeOperation("getSecuritySettings", addressMatch);
}
-
+
public String getSecuritySettingsAsJSON(String addressMatch) throws Exception
{
return (String)proxy.invokeOperation("getSecuritySettingsAsJSON", addressMatch);
@@ -259,10 +258,15 @@
Object[] bindings) throws Exception
{
proxy.invokeOperation("createConnectionFactory", name, ha, useDiscovery, cfType, connectorNames, bindings);
-
+
}
- public void createConnectionFactory(String name, boolean ha, boolean useDiscovery, int cfType, String connectors, String jndiBindings) throws Exception
+ public void createConnectionFactory(String name,
+ boolean ha,
+ boolean useDiscovery,
+ int cfType,
+ String connectors,
+ String jndiBindings) throws Exception
{
proxy.invokeOperation("createConnectionFactory", name, ha, useDiscovery, cfType, connectors, jndiBindings);
}
@@ -272,7 +276,150 @@
return (String)proxy.invokeOperation("listAllConsumersAsJSON");
}
+ public void createConnectionFactory(String name,
+ boolean ha,
+ boolean useDiscovery,
+ int cfType,
+ String[] connectors,
+ String[] jndiBindings,
+ String clientID,
+ long clientFailureCheckPeriod,
+ long connectionTTL,
+ long callTimeout,
+ int minLargeMessageSize,
+ boolean compressLargeMessages,
+ int consumerWindowSize,
+ int consumerMaxRate,
+ int confirmationWindowSize,
+ int producerWindowSize,
+ int producerMaxRate,
+ boolean blockOnAcknowledge,
+ boolean blockOnDurableSend,
+ boolean blockOnNonDurableSend,
+ boolean autoGroup,
+ boolean preAcknowledge,
+ String loadBalancingPolicyClassName,
+ int transactionBatchSize,
+ int dupsOKBatchSize,
+ boolean useGlobalPools,
+ int scheduledThreadPoolMaxSize,
+ int threadPoolMaxSize,
+ long retryInterval,
+ double retryIntervalMultiplier,
+ long maxRetryInterval,
+ int reconnectAttempts,
+ boolean failoverOnInitialConnection,
+ String groupId) throws Exception
+ {
+ proxy.invokeOperation("createConnectionFactory",
+ name,
+ ha,
+ useDiscovery,
+ cfType,
+ connectors,
+ jndiBindings,
+ clientID,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ callTimeout,
+ minLargeMessageSize,
+ compressLargeMessages,
+ consumerWindowSize,
+ consumerMaxRate,
+ confirmationWindowSize,
+ producerWindowSize,
+ producerMaxRate,
+ blockOnAcknowledge,
+ blockOnDurableSend,
+ blockOnNonDurableSend,
+ autoGroup,
+ preAcknowledge,
+ loadBalancingPolicyClassName,
+ transactionBatchSize,
+ dupsOKBatchSize,
+ useGlobalPools,
+ scheduledThreadPoolMaxSize,
+ threadPoolMaxSize,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ failoverOnInitialConnection,
+ groupId);
+ }
+ public void createConnectionFactory(String name,
+ boolean ha,
+ boolean useDiscovery,
+ int cfType,
+ String connectors,
+ String jndiBindings,
+ String clientID,
+ long clientFailureCheckPeriod,
+ long connectionTTL,
+ long callTimeout,
+ int minLargeMessageSize,
+ boolean compressLargeMessages,
+ int consumerWindowSize,
+ int consumerMaxRate,
+ int confirmationWindowSize,
+ int producerWindowSize,
+ int producerMaxRate,
+ boolean blockOnAcknowledge,
+ boolean blockOnDurableSend,
+ boolean blockOnNonDurableSend,
+ boolean autoGroup,
+ boolean preAcknowledge,
+ String loadBalancingPolicyClassName,
+ int transactionBatchSize,
+ int dupsOKBatchSize,
+ boolean useGlobalPools,
+ int scheduledThreadPoolMaxSize,
+ int threadPoolMaxSize,
+ long retryInterval,
+ double retryIntervalMultiplier,
+ long maxRetryInterval,
+ int reconnectAttempts,
+ boolean failoverOnInitialConnection,
+ String groupId) throws Exception
+ {
+ proxy.invokeOperation("createConnectionFactory",
+ name,
+ ha,
+ useDiscovery,
+ cfType,
+ connectors,
+ jndiBindings,
+ clientID,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ callTimeout,
+ minLargeMessageSize,
+ compressLargeMessages,
+ consumerWindowSize,
+ consumerMaxRate,
+ confirmationWindowSize,
+ producerWindowSize,
+ producerMaxRate,
+ blockOnAcknowledge,
+ blockOnDurableSend,
+ blockOnNonDurableSend,
+ autoGroup,
+ preAcknowledge,
+ loadBalancingPolicyClassName,
+ transactionBatchSize,
+ dupsOKBatchSize,
+ useGlobalPools,
+ scheduledThreadPoolMaxSize,
+ threadPoolMaxSize,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ failoverOnInitialConnection,
+ groupId);
+ }
+
};
}
// Public --------------------------------------------------------