[hornetq-commits] JBoss hornetq SVN: r10237 - in branches/Branch_2_2_EAP: src/main/org/hornetq/jms/management/impl and 8 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Feb 21 16:59:43 EST 2011
Author: clebert.suconic at jboss.com
Date: 2011-02-21 16:59:42 -0500 (Mon, 21 Feb 2011)
New Revision: 10237
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/ConnectionFactoryControlTest.java
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/api/jms/management/ConnectionFactoryControl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/management/impl/JMSConnectionFactoryControlImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/JMSServerManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/JMSManagementService.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/DivertControlTest.java
Log:
HORNETQ-647 - Persisting changes on ConnectionFactoryControl
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/api/jms/management/ConnectionFactoryControl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/api/jms/management/ConnectionFactoryControl.java 2011-02-21 17:33:22 UTC (rev 10236)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/api/jms/management/ConnectionFactoryControl.java 2011-02-21 21:59:42 UTC (rev 10237)
@@ -63,6 +63,10 @@
* Sets the Client ID for this connection factory.
*/
void setClientID(String clientID);
+
+ boolean isCompressLargeMessages();
+
+ void setCompressLargeMessages(boolean compress);
/**
* @see ClientSessionFactory#getClientFailureCheckPeriod()
@@ -327,11 +331,6 @@
int getInitialMessagePacketSize();
/**
- * @see ClientSessionFactory#setInitialMessagePacketSize(int)
- */
- void setInitialMessagePacketSize(int size);
-
- /**
* @see ClientSessionFactory#isUseGlobalPools()
*/
boolean isUseGlobalPools();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/management/impl/JMSConnectionFactoryControlImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/management/impl/JMSConnectionFactoryControlImpl.java 2011-02-21 17:33:22 UTC (rev 10236)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/management/impl/JMSConnectionFactoryControlImpl.java 2011-02-21 21:59:42 UTC (rev 10237)
@@ -24,6 +24,7 @@
import org.hornetq.core.management.impl.MBeanInfoHelper;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -37,7 +38,9 @@
// Attributes ----------------------------------------------------
- private final HornetQConnectionFactory cf;
+ private final ConnectionFactoryConfiguration cfConfig;
+
+ private HornetQConnectionFactory cf;
private final String name;
@@ -47,11 +50,13 @@
// Constructors --------------------------------------------------
- public JMSConnectionFactoryControlImpl(final HornetQConnectionFactory cf,
+ public JMSConnectionFactoryControlImpl(final ConnectionFactoryConfiguration cfConfig,
+ final HornetQConnectionFactory cf,
final JMSServerManager jmsManager,
final String name) throws NotCompliantMBeanException
{
super(ConnectionFactoryControl.class);
+ this.cfConfig = cfConfig;
this.cf = cf;
this.name = name;
this.jmsManager = jmsManager;
@@ -65,170 +70,200 @@
{
return jmsManager.getJNDIOnConnectionFactory(name);
}
+
+ public boolean isCompressLargeMessages()
+ {
+ return cf.isCompressLargeMessage();
+ }
+
+ public void setCompressLargeMessages(final boolean compress)
+ {
+ cfConfig.setCompressLargeMessages(compress);
+ recreateCF();
+ }
public boolean isHA()
{
- return cf.isHA();
+ return cfConfig.isHA();
}
public int getFactoryType()
{
- return cf.getFactoryType();
+ return cfConfig.getFactoryType().intValue();
}
public String getClientID()
{
- return cf.getClientID();
+ return cfConfig.getClientID();
}
public long getClientFailureCheckPeriod()
{
- return cf.getClientFailureCheckPeriod();
+ return cfConfig.getClientFailureCheckPeriod();
}
- public String getConnectionLoadBalancingPolicyClassName()
- {
- return cf.getConnectionLoadBalancingPolicyClassName();
- }
-
public void setClientID(String clientID)
{
- cf.setClientID(clientID);
+ cfConfig.setClientID(clientID);
+ recreateCF();
}
public void setDupsOKBatchSize(int dupsOKBatchSize)
{
- cf.setDupsOKBatchSize(dupsOKBatchSize);
+ cfConfig.setDupsOKBatchSize(dupsOKBatchSize);
+ recreateCF();
}
public void setTransactionBatchSize(int transactionBatchSize)
{
- cf.setTransactionBatchSize(transactionBatchSize);
+ cfConfig.setTransactionBatchSize(transactionBatchSize);
+ recreateCF();
}
public void setClientFailureCheckPeriod(long clientFailureCheckPeriod)
{
- cf.setClientFailureCheckPeriod(clientFailureCheckPeriod);
+ cfConfig.setClientFailureCheckPeriod(clientFailureCheckPeriod);
+ recreateCF();
}
public void setConnectionTTL(long connectionTTL)
{
- cf.setConnectionTTL(connectionTTL);
+ cfConfig.setConnectionTTL(connectionTTL);
+ recreateCF();
}
public void setCallTimeout(long callTimeout)
{
- cf.setCallTimeout(callTimeout);
+ cfConfig.setCallTimeout(callTimeout);
+ recreateCF();
}
public void setConsumerWindowSize(int consumerWindowSize)
{
- cf.setConsumerWindowSize(consumerWindowSize);
+ cfConfig.setConsumerWindowSize(consumerWindowSize);
+ recreateCF();
}
public void setConsumerMaxRate(int consumerMaxRate)
{
- cf.setConsumerMaxRate(consumerMaxRate);
+ cfConfig.setConsumerMaxRate(consumerMaxRate);
+ recreateCF();
}
public void setConfirmationWindowSize(int confirmationWindowSize)
{
- cf.setConfirmationWindowSize(confirmationWindowSize);
+ cfConfig.setConfirmationWindowSize(confirmationWindowSize);
+ recreateCF();
}
public void setProducerMaxRate(int producerMaxRate)
{
- cf.setProducerMaxRate(producerMaxRate);
+ cfConfig.setProducerMaxRate(producerMaxRate);
+ recreateCF();
}
public int getProducerWindowSize()
{
- return cf.getProducerWindowSize();
+ return cfConfig.getProducerWindowSize();
}
public void setProducerWindowSize(int producerWindowSize)
{
- cf.setProducerWindowSize(producerWindowSize);
+ cfConfig.setProducerWindowSize(producerWindowSize);
+ recreateCF();
}
public void setCacheLargeMessagesClient(boolean cacheLargeMessagesClient)
{
- cf.setCacheLargeMessagesClient(cacheLargeMessagesClient);
+ cfConfig.setCacheLargeMessagesClient(cacheLargeMessagesClient);
+ recreateCF();
}
public boolean isCacheLargeMessagesClient()
{
- return cf.isCacheLargeMessagesClient();
+ return cfConfig.isCacheLargeMessagesClient();
}
public void setMinLargeMessageSize(int minLargeMessageSize)
{
- cf.setMinLargeMessageSize(minLargeMessageSize);
+ cfConfig.setMinLargeMessageSize(minLargeMessageSize);
+ recreateCF();
}
public void setBlockOnNonDurableSend(boolean blockOnNonDurableSend)
{
- cf.setBlockOnNonDurableSend(blockOnNonDurableSend);
+ cfConfig.setBlockOnNonDurableSend(blockOnNonDurableSend);
+ recreateCF();
}
public void setBlockOnAcknowledge(boolean blockOnAcknowledge)
{
- cf.setBlockOnAcknowledge(blockOnAcknowledge);
+ cfConfig.setBlockOnAcknowledge(blockOnAcknowledge);
+ recreateCF();
}
public void setBlockOnDurableSend(boolean blockOnDurableSend)
{
- cf.setBlockOnDurableSend(blockOnDurableSend);
+ cfConfig.setBlockOnDurableSend(blockOnDurableSend);
+ recreateCF();
}
public void setAutoGroup(boolean autoGroup)
{
- cf.setAutoGroup(autoGroup);
+ cfConfig.setAutoGroup(autoGroup);
+ recreateCF();
}
public void setPreAcknowledge(boolean preAcknowledge)
{
- cf.setPreAcknowledge(preAcknowledge);
+ cfConfig.setPreAcknowledge(preAcknowledge);
+ recreateCF();
}
public void setMaxRetryInterval(long retryInterval)
{
- cf.setMaxRetryInterval(retryInterval);
+ cfConfig.setMaxRetryInterval(retryInterval);
+ recreateCF();
}
public void setRetryIntervalMultiplier(double retryIntervalMultiplier)
{
- cf.setRetryIntervalMultiplier(retryIntervalMultiplier);
+ cfConfig.setRetryIntervalMultiplier(retryIntervalMultiplier);
+ recreateCF();
}
public void setReconnectAttempts(int reconnectAttempts)
{
- cf.setReconnectAttempts(reconnectAttempts);
+ cfConfig.setReconnectAttempts(reconnectAttempts);
+ recreateCF();
}
public void setFailoverOnInitialConnection(boolean failover)
{
- cf.setFailoverOnInitialConnection(failover);
+ cfConfig.setFailoverOnInitialConnection(failover);
+ recreateCF();
}
public boolean isUseGlobalPools()
{
- return cf.isUseGlobalPools();
+ return cfConfig.isUseGlobalPools();
}
public void setScheduledThreadPoolMaxSize(int scheduledThreadPoolMaxSize)
{
- cf.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize);
+ cfConfig.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize);
+ recreateCF();
}
public int getThreadPoolMaxSize()
{
- return cf.getThreadPoolMaxSize();
+ return cfConfig.getThreadPoolMaxSize();
}
public void setThreadPoolMaxSize(int threadPoolMaxSize)
{
- cf.setThreadPoolMaxSize(threadPoolMaxSize);
+ cfConfig.setThreadPoolMaxSize(threadPoolMaxSize);
+ recreateCF();
}
public int getInitialMessagePacketSize()
@@ -238,43 +273,47 @@
public void setGroupID(String groupID)
{
- cf.setGroupID(groupID);
+ cfConfig.setGroupID(groupID);
+ recreateCF();
}
public String getGroupID()
{
- return cf.getGroupID();
+ return cfConfig.getGroupID();
}
- public void setInitialMessagePacketSize(int size)
- {
- cf.setInitialMessagePacketSize(size);
- }
-
public void setUseGlobalPools(boolean useGlobalPools)
{
- cf.setUseGlobalPools(useGlobalPools);
+ cfConfig.setUseGlobalPools(useGlobalPools);
+ recreateCF();
}
public int getScheduledThreadPoolMaxSize()
{
- return cf.getScheduledThreadPoolMaxSize();
+ return cfConfig.getScheduledThreadPoolMaxSize();
}
public void setRetryInterval(long retryInterval)
{
- cf.setRetryInterval(retryInterval);
+ cfConfig.setRetryInterval(retryInterval);
+ recreateCF();
}
public long getMaxRetryInterval()
{
- return cf.getMaxRetryInterval();
+ return cfConfig.getMaxRetryInterval();
}
- public void setConnectionLoadBalancingPolicyClassName(String connectionLoadBalancingPolicyClassName)
+ public String getConnectionLoadBalancingPolicyClassName()
{
- cf.setConnectionLoadBalancingPolicyClassName(connectionLoadBalancingPolicyClassName);
+ return cfConfig.getLoadBalancingPolicyClassName();
}
+
+ public void setConnectionLoadBalancingPolicyClassName(String name)
+ {
+ cfConfig.setLoadBalancingPolicyClassName(name);
+ recreateCF();
+ }
public TransportConfiguration[] getStaticConnectors()
{
@@ -293,52 +332,52 @@
public long getCallTimeout()
{
- return cf.getCallTimeout();
+ return cfConfig.getCallTimeout();
}
public int getConsumerMaxRate()
{
- return cf.getConsumerMaxRate();
+ return cfConfig.getConsumerMaxRate();
}
public int getConsumerWindowSize()
{
- return cf.getConsumerWindowSize();
+ return cfConfig.getConsumerWindowSize();
}
public int getProducerMaxRate()
{
- return cf.getProducerMaxRate();
+ return cfConfig.getProducerMaxRate();
}
public int getConfirmationWindowSize()
{
- return cf.getConfirmationWindowSize();
+ return cfConfig.getConfirmationWindowSize();
}
public int getDupsOKBatchSize()
{
- return cf.getDupsOKBatchSize();
+ return cfConfig.getDupsOKBatchSize();
}
public boolean isBlockOnAcknowledge()
{
- return cf.isBlockOnAcknowledge();
+ return cfConfig.isBlockOnAcknowledge();
}
public boolean isBlockOnNonDurableSend()
{
- return cf.isBlockOnNonDurableSend();
+ return cfConfig.isBlockOnNonDurableSend();
}
public boolean isBlockOnDurableSend()
{
- return cf.isBlockOnDurableSend();
+ return cfConfig.isBlockOnDurableSend();
}
public boolean isPreAcknowledge()
{
- return cf.isPreAcknowledge();
+ return cfConfig.isPreAcknowledge();
}
public String getName()
@@ -348,42 +387,42 @@
public long getConnectionTTL()
{
- return cf.getConnectionTTL();
+ return cfConfig.getConnectionTTL();
}
public int getReconnectAttempts()
{
- return cf.getReconnectAttempts();
+ return cfConfig.getReconnectAttempts();
}
public boolean isFailoverOnInitialConnection()
{
- return cf.isFailoverOnInitialConnection();
+ return cfConfig.isFailoverOnInitialConnection();
}
public int getMinLargeMessageSize()
{
- return cf.getMinLargeMessageSize();
+ return cfConfig.getMinLargeMessageSize();
}
public long getRetryInterval()
{
- return cf.getRetryInterval();
+ return cfConfig.getRetryInterval();
}
public double getRetryIntervalMultiplier()
{
- return cf.getRetryIntervalMultiplier();
+ return cfConfig.getRetryIntervalMultiplier();
}
public int getTransactionBatchSize()
{
- return cf.getTransactionBatchSize();
+ return cfConfig.getTransactionBatchSize();
}
public boolean isAutoGroup()
{
- return cf.isAutoGroup();
+ return cfConfig.isAutoGroup();
}
@Override
@@ -402,6 +441,18 @@
// Protected -----------------------------------------------------
+ private void recreateCF()
+ {
+ try
+ {
+ this.cf = jmsManager.recreateCF(this.name, this.cfConfig);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException (e.getMessage(), e);
+ }
+ }
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/JMSServerManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/JMSServerManager.java 2011-02-21 17:33:22 UTC (rev 10236)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/JMSServerManager.java 2011-02-21 21:59:42 UTC (rev 10237)
@@ -24,6 +24,7 @@
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
import org.hornetq.spi.core.naming.BindingRegistry;
@@ -158,6 +159,10 @@
* if a problem occurred destroying the topic
*/
boolean destroyTopic(String name) throws Exception;
+
+ /** Call this method to have a CF rebound to JNDI and stored on the Journal
+ * @throws Exception */
+ HornetQConnectionFactory recreateCF(String name, ConnectionFactoryConfiguration cf) throws Exception;
void createConnectionFactory(String name, boolean ha, JMSFactoryType cfType, String discoveryGroupName, String ... jndiBindings) throws Exception;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2011-02-21 17:33:22 UTC (rev 10236)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2011-02-21 21:59:42 UTC (rev 10237)
@@ -15,7 +15,6 @@
import java.util.List;
-import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.core.journal.EncodingSupport;
@@ -28,6 +27,9 @@
*/
public interface ConnectionFactoryConfiguration extends EncodingSupport
{
+
+ boolean isPersisted();
+
String getName();
String[] getBindings();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2011-02-21 17:33:22 UTC (rev 10236)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2011-02-21 21:59:42 UTC (rev 10237)
@@ -42,6 +42,8 @@
// Attributes ----------------------------------------------------
private String name;
+
+ private boolean persisted;
private String[] bindings;
@@ -155,6 +157,11 @@
{
return name;
}
+
+ public boolean isPersisted()
+ {
+ return persisted;
+ }
/**
* @return the discoveryGroupName
@@ -499,6 +506,8 @@
*/
public void decode(final HornetQBuffer buffer)
{
+ persisted = true;
+
name = buffer.readSimpleString().toString();
discoveryGroupName = BufferHelper.readNullableSimpleStringAsString(buffer);
@@ -587,6 +596,8 @@
*/
public void encode(final HornetQBuffer buffer)
{
+ persisted = true;
+
BufferHelper.writeAsSimpleString(buffer, name);
BufferHelper.writeAsNullableSimpleString(buffer, discoveryGroupName);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2011-02-21 17:33:22 UTC (rev 10236)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2011-02-21 21:59:42 UTC (rev 10237)
@@ -13,8 +13,16 @@
package org.hornetq.jms.server.impl;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import javax.naming.Context;
import javax.naming.InitialContext;
@@ -214,13 +222,13 @@
{
deploy();
}
-
- for (Runnable run: cachedCommands)
+
+ for (Runnable run : cachedCommands)
{
log.info("Running cached command for " + run);
run.run();
}
-
+
cachedCommands.clear();
}
@@ -401,7 +409,7 @@
{
return "createQueue for " + queueName;
}
-
+
public void runException() throws Exception
{
if (internalCreateQueue(queueName, selectorString, durable))
@@ -429,7 +437,10 @@
if (storeConfig && durable)
{
- storage.storeDestination(new PersistedDestination(PersistedType.Queue, queueName, selectorString, durable));
+ storage.storeDestination(new PersistedDestination(PersistedType.Queue,
+ queueName,
+ selectorString,
+ durable));
storage.addJNDI(PersistedType.Queue, queueName, usedJNDI);
}
}
@@ -452,7 +463,7 @@
{
return "createTopic for " + topicName;
}
-
+
public void runException() throws Exception
{
if (internalCreateTopic(topicName))
@@ -914,23 +925,49 @@
}
}
+ public synchronized HornetQConnectionFactory recreateCF(String name, ConnectionFactoryConfiguration cf) throws Exception
+ {
+ List<String> jndi = connectionFactoryJNDI.get(name);
+
+ if (jndi == null)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Connection Factory " + name + " doesn't exist");
+ }
+
+ String[] usedJNDI = jndi.toArray(new String[jndi.size()]);
+
+ HornetQConnectionFactory realCF = internalCreateCFPOJO(cf);
+
+ if (cf.isPersisted())
+ {
+ storage.storeConnectionFactory(new PersistedConnectionFactory(cf));
+ storage.addJNDI(PersistedType.ConnectionFactory, cf.getName(), usedJNDI);
+ }
+
+ for (String jndiElement : usedJNDI)
+ {
+ this.bindToJndi(jndiElement, realCF);
+ }
+
+ return realCF;
+ }
+
public synchronized void createConnectionFactory(final boolean storeConfig,
final ConnectionFactoryConfiguration cfConfig,
final String... jndi) throws Exception
{
runAfterActive(new RunnableException()
{
-
+
public String toString()
{
return "createConnectionFactory for " + cfConfig.getName();
}
-
public void runException() throws Exception
{
- HornetQConnectionFactory cf = internalCreateCF(cfConfig);
+ HornetQConnectionFactory cf = internalCreateCF(storeConfig, cfConfig);
ArrayList<String> bindings = new ArrayList<String>();
@@ -1053,7 +1090,8 @@
* @throws HornetQException
* @throws Exception
*/
- private HornetQConnectionFactory internalCreateCF(final ConnectionFactoryConfiguration cfConfig) throws HornetQException,
+ private HornetQConnectionFactory internalCreateCF(final boolean persisted,
+ final ConnectionFactoryConfiguration cfConfig) throws HornetQException,
Exception
{
checkInitialised();
@@ -1062,95 +1100,107 @@
if (cf == null)
{
- if (cfConfig.getDiscoveryGroupName() != null)
- {
- DiscoveryGroupConfiguration groupConfig = server.getConfiguration()
- .getDiscoveryGroupConfigurations()
- .get(cfConfig.getDiscoveryGroupName());
+ cf = internalCreateCFPOJO(cfConfig);
+ }
- if (groupConfig == null)
- {
- throw new HornetQException(HornetQException.ILLEGAL_STATE,
- "Discovery Group '" + cfConfig.getDiscoveryGroupName() +
- "' doesn't exist on maing config");
- }
+ connectionFactories.put(cfConfig.getName(), cf);
- if (cfConfig.isHA())
- {
- cf = HornetQJMSClient.createConnectionFactoryWithHA(groupConfig, cfConfig.getFactoryType());
- }
- else
- {
- cf = HornetQJMSClient.createConnectionFactoryWithoutHA(groupConfig, cfConfig.getFactoryType());
- }
+ jmsManagementService.registerConnectionFactory(cfConfig.getName(), cfConfig, cf);
+
+ return cf;
+ }
+
+ /**
+ * @param cfConfig
+ * @return
+ * @throws HornetQException
+ */
+ protected HornetQConnectionFactory internalCreateCFPOJO(final ConnectionFactoryConfiguration cfConfig) throws HornetQException
+ {
+ HornetQConnectionFactory cf;
+ if (cfConfig.getDiscoveryGroupName() != null)
+ {
+ DiscoveryGroupConfiguration groupConfig = server.getConfiguration()
+ .getDiscoveryGroupConfigurations()
+ .get(cfConfig.getDiscoveryGroupName());
+
+ if (groupConfig == null)
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE,
+ "Discovery Group '" + cfConfig.getDiscoveryGroupName() +
+ "' doesn't exist on maing config");
}
+
+ if (cfConfig.isHA())
+ {
+ cf = HornetQJMSClient.createConnectionFactoryWithHA(groupConfig, cfConfig.getFactoryType());
+ }
else
{
- if (cfConfig.getConnectorNames() == null || cfConfig.getConnectorNames().size() == 0)
- {
- throw new HornetQException(HornetQException.ILLEGAL_STATE,
- "Null Connector name passed to create ConnectionFactory");
- }
+ cf = HornetQJMSClient.createConnectionFactoryWithoutHA(groupConfig, cfConfig.getFactoryType());
+ }
+ }
+ else
+ {
+ if (cfConfig.getConnectorNames() == null || cfConfig.getConnectorNames().size() == 0)
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE,
+ "Null Connector name passed to create ConnectionFactory");
+ }
- TransportConfiguration[] configs = new TransportConfiguration[cfConfig.getConnectorNames().size()];
+ TransportConfiguration[] configs = new TransportConfiguration[cfConfig.getConnectorNames().size()];
- int count = 0;
- for (String name : cfConfig.getConnectorNames())
+ int count = 0;
+ for (String name : cfConfig.getConnectorNames())
+ {
+ TransportConfiguration connector = server.getConfiguration().getConnectorConfigurations().get(name);
+ if (connector == null)
{
- TransportConfiguration connector = server.getConfiguration().getConnectorConfigurations().get(name);
- if (connector == null)
- {
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "Connector '" + name +
- "' not found on the main configuration file");
- }
- configs[count++] = connector;
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Connector '" + name +
+ "' not found on the main configuration file");
}
+ configs[count++] = connector;
+ }
- if (cfConfig.isHA())
- {
- cf = HornetQJMSClient.createConnectionFactoryWithHA(cfConfig.getFactoryType(), configs);
- }
- else
- {
- cf = HornetQJMSClient.createConnectionFactoryWithoutHA(cfConfig.getFactoryType(), configs);
- }
+ if (cfConfig.isHA())
+ {
+ cf = HornetQJMSClient.createConnectionFactoryWithHA(cfConfig.getFactoryType(), configs);
}
-
- cf.setClientID(cfConfig.getClientID());
- cf.setClientFailureCheckPeriod(cfConfig.getClientFailureCheckPeriod());
- cf.setConnectionTTL(cfConfig.getConnectionTTL());
- cf.setCallTimeout(cfConfig.getCallTimeout());
- cf.setCacheLargeMessagesClient(cfConfig.isCacheLargeMessagesClient());
- cf.setMinLargeMessageSize(cfConfig.getMinLargeMessageSize());
- cf.setConsumerWindowSize(cfConfig.getConsumerWindowSize());
- cf.setConsumerMaxRate(cfConfig.getConsumerMaxRate());
- cf.setConfirmationWindowSize(cfConfig.getConfirmationWindowSize());
- cf.setProducerWindowSize(cfConfig.getProducerWindowSize());
- cf.setProducerMaxRate(cfConfig.getProducerMaxRate());
- cf.setBlockOnAcknowledge(cfConfig.isBlockOnAcknowledge());
- cf.setBlockOnDurableSend(cfConfig.isBlockOnDurableSend());
- cf.setBlockOnNonDurableSend(cfConfig.isBlockOnNonDurableSend());
- cf.setAutoGroup(cfConfig.isAutoGroup());
- cf.setPreAcknowledge(cfConfig.isPreAcknowledge());
- cf.setConnectionLoadBalancingPolicyClassName(cfConfig.getLoadBalancingPolicyClassName());
- cf.setTransactionBatchSize(cfConfig.getTransactionBatchSize());
- cf.setDupsOKBatchSize(cfConfig.getDupsOKBatchSize());
- cf.setUseGlobalPools(cfConfig.isUseGlobalPools());
- cf.setScheduledThreadPoolMaxSize(cfConfig.getScheduledThreadPoolMaxSize());
- cf.setThreadPoolMaxSize(cfConfig.getThreadPoolMaxSize());
- cf.setRetryInterval(cfConfig.getRetryInterval());
- cf.setRetryIntervalMultiplier(cfConfig.getRetryIntervalMultiplier());
- cf.setMaxRetryInterval(cfConfig.getMaxRetryInterval());
- cf.setReconnectAttempts(cfConfig.getReconnectAttempts());
- cf.setFailoverOnInitialConnection(cfConfig.isFailoverOnInitialConnection());
- cf.setCompressLargeMessage(cfConfig.isCompressLargeMessages());
- cf.setGroupID(cfConfig.getGroupID());
+ else
+ {
+ cf = HornetQJMSClient.createConnectionFactoryWithoutHA(cfConfig.getFactoryType(), configs);
+ }
}
- connectionFactories.put(cfConfig.getName(), cf);
-
- jmsManagementService.registerConnectionFactory(cfConfig.getName(), cf);
-
+ cf.setClientID(cfConfig.getClientID());
+ cf.setClientFailureCheckPeriod(cfConfig.getClientFailureCheckPeriod());
+ cf.setConnectionTTL(cfConfig.getConnectionTTL());
+ cf.setCallTimeout(cfConfig.getCallTimeout());
+ cf.setCacheLargeMessagesClient(cfConfig.isCacheLargeMessagesClient());
+ cf.setMinLargeMessageSize(cfConfig.getMinLargeMessageSize());
+ cf.setConsumerWindowSize(cfConfig.getConsumerWindowSize());
+ cf.setConsumerMaxRate(cfConfig.getConsumerMaxRate());
+ cf.setConfirmationWindowSize(cfConfig.getConfirmationWindowSize());
+ cf.setProducerWindowSize(cfConfig.getProducerWindowSize());
+ cf.setProducerMaxRate(cfConfig.getProducerMaxRate());
+ cf.setBlockOnAcknowledge(cfConfig.isBlockOnAcknowledge());
+ cf.setBlockOnDurableSend(cfConfig.isBlockOnDurableSend());
+ cf.setBlockOnNonDurableSend(cfConfig.isBlockOnNonDurableSend());
+ cf.setAutoGroup(cfConfig.isAutoGroup());
+ cf.setPreAcknowledge(cfConfig.isPreAcknowledge());
+ cf.setConnectionLoadBalancingPolicyClassName(cfConfig.getLoadBalancingPolicyClassName());
+ cf.setTransactionBatchSize(cfConfig.getTransactionBatchSize());
+ cf.setDupsOKBatchSize(cfConfig.getDupsOKBatchSize());
+ cf.setUseGlobalPools(cfConfig.isUseGlobalPools());
+ cf.setScheduledThreadPoolMaxSize(cfConfig.getScheduledThreadPoolMaxSize());
+ cf.setThreadPoolMaxSize(cfConfig.getThreadPoolMaxSize());
+ cf.setRetryInterval(cfConfig.getRetryInterval());
+ cf.setRetryIntervalMultiplier(cfConfig.getRetryIntervalMultiplier());
+ cf.setMaxRetryInterval(cfConfig.getMaxRetryInterval());
+ cf.setReconnectAttempts(cfConfig.getReconnectAttempts());
+ cf.setFailoverOnInitialConnection(cfConfig.isFailoverOnInitialConnection());
+ cf.setCompressLargeMessage(cfConfig.isCompressLargeMessages());
+ cf.setGroupID(cfConfig.getGroupID());
return cf;
}
@@ -1360,6 +1410,7 @@
{
if (registry != null)
{
+ registry.unbind(jndiName);
registry.bind(jndiName, objectToBind);
}
return true;
@@ -1437,7 +1488,7 @@
for (PersistedConnectionFactory cf : cfs)
{
- internalCreateCF(cf.getConfig());
+ internalCreateCF(true, cf.getConfig());
}
List<PersistedDestination> destinations = storage.recoverDestinations();
@@ -1576,7 +1627,7 @@
return false;
}
}
-
+
private boolean runAfterActive(RunnableException runnable) throws Exception
{
if (active)
@@ -1592,8 +1643,6 @@
}
}
-
-
private abstract class RunnableException implements Runnable
{
public void run()
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/JMSManagementService.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/JMSManagementService.java 2011-02-21 17:33:22 UTC (rev 10236)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/JMSManagementService.java 2011-02-21 21:59:42 UTC (rev 10237)
@@ -18,6 +18,7 @@
import org.hornetq.jms.client.HornetQQueue;
import org.hornetq.jms.client.HornetQTopic;
import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -39,7 +40,7 @@
void unregisterTopic(String name) throws Exception;
- void registerConnectionFactory(String name, HornetQConnectionFactory connectionFactory) throws Exception;
+ void registerConnectionFactory(String name, ConnectionFactoryConfiguration config, HornetQConnectionFactory connectionFactory) throws Exception;
void unregisterConnectionFactory(String name) throws Exception;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java 2011-02-21 17:33:22 UTC (rev 10236)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java 2011-02-21 21:59:42 UTC (rev 10237)
@@ -34,6 +34,7 @@
import org.hornetq.jms.management.impl.JMSServerControlImpl;
import org.hornetq.jms.management.impl.JMSTopicControlImpl;
import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
import org.hornetq.jms.server.management.JMSManagementService;
/*
@@ -121,10 +122,11 @@
}
public synchronized void registerConnectionFactory(final String name,
+ final ConnectionFactoryConfiguration cfConfig,
final HornetQConnectionFactory connectionFactory) throws Exception
{
ObjectName objectName = managementService.getObjectNameBuilder().getConnectionFactoryObjectName(name);
- JMSConnectionFactoryControlImpl control = new JMSConnectionFactoryControlImpl(connectionFactory, jmsServerManager, name);
+ JMSConnectionFactoryControlImpl control = new JMSConnectionFactoryControlImpl(cfConfig, connectionFactory, jmsServerManager, name);
managementService.registerInJMX(objectName, control);
managementService.registerInRegistry(ResourceNames.JMS_CONNECTION_FACTORY + name, control);
}
Added: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/ConnectionFactoryControlTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/ConnectionFactoryControlTest.java (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/ConnectionFactoryControlTest.java 2011-02-21 21:59:42 UTC (rev 10237)
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.jms.server.management;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.jms.management.ConnectionFactoryControl;
+import org.hornetq.api.jms.management.JMSServerControl;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+import org.hornetq.tests.integration.management.ManagementControlHelper;
+import org.hornetq.tests.integration.management.ManagementTestBase;
+import org.hornetq.tests.unit.util.InVMContext;
+
+/**
+ * A Connection Factory Control Test
+ *
+ * @author <a href="csuconic at redhat.com">Clebert Suconic</a>
+ *
+ * Created 13 nov. 2008 16:50:53
+ *
+ *
+ */
+public class ConnectionFactoryControlTest extends ManagementTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private HornetQServer server;
+
+ private JMSServerManagerImpl serverManager;
+
+ private InVMContext ctx;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testCreateCF() throws Exception
+ {
+ JMSServerControl control = createJMSControl();
+ control.createConnectionFactory("test", false, false, 0, "invm", "test");
+
+ ConnectionFactoryControl controlCF = createCFControl("test");
+
+ HornetQConnectionFactory cf = (HornetQConnectionFactory)ctx.lookup("test");
+
+ assertFalse(cf.isCompressLargeMessage());
+
+ controlCF.setCompressLargeMessages(true);
+
+ cf = (HornetQConnectionFactory)ctx.lookup("test");
+ assertTrue(cf.isCompressLargeMessage());
+
+ stopServer();
+
+ Thread.sleep(500);
+
+ startServer();
+
+ cf = (HornetQConnectionFactory)ctx.lookup("test");
+ assertTrue(cf.isCompressLargeMessage());
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ startServer();
+
+ }
+
+ /**
+ * @throws Exception
+ */
+ protected void startServer() throws Exception
+ {
+ Configuration conf = createDefaultConfig(false);
+ conf.setClustered(false);
+ conf.getConnectorConfigurations().put("invm", new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ conf.setSecurityEnabled(false);
+ conf.setJMXManagementEnabled(true);
+ conf.setSharedStore(false);
+ conf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory"));
+ server = HornetQServers.newHornetQServer(conf, mbeanServer, true);
+ server.start();
+
+ serverManager = new JMSServerManagerImpl(server);
+ serverManager.start();
+
+ ctx = new InVMContext();
+
+ serverManager.setContext(ctx);
+ serverManager.activated();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ stopServer();
+
+ super.tearDown();
+ }
+
+ /**
+ * @throws Exception
+ */
+ protected void stopServer() throws Exception
+ {
+ serverManager.stop();
+
+ server.stop();
+
+ serverManager = null;
+
+ server = null;
+ }
+
+ protected ConnectionFactoryControl createCFControl(String name) throws Exception
+ {
+ return ManagementControlHelper.createConnectionFactoryControl(name, mbeanServer);
+ }
+
+ protected JMSServerControl createJMSControl() throws Exception
+ {
+ return ManagementControlHelper.createJMSServerControl(mbeanServer);
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/DivertControlTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/DivertControlTest.java 2011-02-21 17:33:22 UTC (rev 10236)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/DivertControlTest.java 2011-02-21 21:59:42 UTC (rev 10237)
@@ -15,14 +15,12 @@
import junit.framework.Assert;
-import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.DivertControl;
import org.hornetq.api.core.management.ObjectNameBuilder;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.config.CoreQueueConfiguration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
More information about the hornetq-commits mailing list