Author: ataylor
Date: 2012-02-03 05:55:59 -0500 (Fri, 03 Feb 2012)
New Revision: 12070
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java
Log:
https://issues.jboss.org/browse/JBPAPP-8038
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2012-02-03 10:25:34 UTC (rev 12069)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2012-02-03 10:55:59 UTC (rev 12070)
@@ -349,6 +349,10 @@
throw new IllegalArgumentException("Cannot find channel with id " + id
+ " to close");
}
+ if(failingOver)
+ {
+ unlock();
+ }
closed = true;
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java 2012-02-03 10:25:34 UTC (rev 12069)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java 2012-02-03 10:55:59 UTC (rev 12070)
@@ -77,6 +77,12 @@
*/
private HornetQConnectionFactory connectionFactory;
+
+ /**
+ * Connection Factory used if properties are set
+ */
+ private HornetQConnectionFactory recoveryConnectionFactory;
+
/*
* The resource recovery if there is one
* */
@@ -141,7 +147,8 @@
if (connectionFactory == null)
{
connectionFactory = ra.createHornetQConnectionFactory(mcfProperties);
- resourceRecovery = ra.getRecoveryManager().register(connectionFactory, null, null);
+ recoveryConnectionFactory = ra.createRecoveryHornetQConnectionFactory(mcfProperties);
+ resourceRecovery = ra.getRecoveryManager().register(recoveryConnectionFactory, null, null);
}
return cf;
}
@@ -314,6 +321,7 @@
}
this.ra = (HornetQResourceAdapter)ra;
+ this.ra.setManagedConnectionFactory(this);
}
/**
@@ -758,7 +766,8 @@
if (connectionFactory == null)
{
connectionFactory = ra.createHornetQConnectionFactory(mcfProperties);
- resourceRecovery = ra.getRecoveryManager().register(connectionFactory, null, null);
+ recoveryConnectionFactory = ra.createRecoveryHornetQConnectionFactory(mcfProperties);
+ resourceRecovery = ra.getRecoveryManager().register(recoveryConnectionFactory, null, null);
}
return connectionFactory;
}
@@ -810,5 +819,17 @@
{
ra.getRecoveryManager().unRegister(resourceRecovery);
}
+
+ if(connectionFactory != null)
+ {
+ connectionFactory.close();
+ connectionFactory = null;
+ }
+
+ if(recoveryConnectionFactory != null)
+ {
+ recoveryConnectionFactory.close();
+ recoveryConnectionFactory = null;
+ }
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2012-02-03 10:25:34 UTC (rev 12069)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2012-02-03 10:55:59 UTC (rev 12070)
@@ -13,6 +13,7 @@
package org.hornetq.ra;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
@@ -103,13 +104,17 @@
private final Map<ActivationSpec, HornetQActivation> activations;
private HornetQConnectionFactory defaultHornetQConnectionFactory;
+
+ private HornetQConnectionFactory recoveryHornetQConnectionFactory;
private TransactionManager tm;
private String unparsedJndiParams;
- RecoveryManager recoveryManager;
+ private RecoveryManager recoveryManager;
+ private final List<HornetQRAManagedConnectionFactory> managedConnectionFactories = new ArrayList<HornetQRAManagedConnectionFactory>();
+
/**
* Constructor
*/
@@ -240,7 +245,7 @@
{
if (HornetQResourceAdapter.trace)
{
- HornetQResourceAdapter.log.trace("stop()");
+ HornetQResourceAdapter.log.info("stop()*******************************************************************");
}
for (Map.Entry<ActivationSpec, HornetQActivation> entry : activations.entrySet())
@@ -257,11 +262,21 @@
activations.clear();
+ for (HornetQRAManagedConnectionFactory managedConnectionFactory : managedConnectionFactories)
+ {
+ managedConnectionFactory.stop();
+ }
+
+ managedConnectionFactories.clear();
+
if (defaultHornetQConnectionFactory != null)
{
defaultHornetQConnectionFactory.close();
+ }
- XARecoveryConfig xaRecoveryConfig = new XARecoveryConfig(defaultHornetQConnectionFactory, raProperties.getUserName(), raProperties.getPassword());
+ if(recoveryHornetQConnectionFactory != null)
+ {
+ recoveryHornetQConnectionFactory.close();
}
recoveryManager.stop();
@@ -1368,7 +1383,8 @@
protected void setup() throws HornetQException
{
defaultHornetQConnectionFactory = createHornetQConnectionFactory(raProperties);
- recoveryManager.register(defaultHornetQConnectionFactory, raProperties.getUserName(), raProperties.getPassword());
+ recoveryHornetQConnectionFactory = createRecoveryHornetQConnectionFactory(raProperties);
+ recoveryManager.register(recoveryHornetQConnectionFactory, raProperties.getUserName(), raProperties.getPassword());
}
public Map<ActivationSpec, HornetQActivation> getActivations()
@@ -1542,6 +1558,106 @@
return cf;
}
+ public HornetQConnectionFactory createRecoveryHornetQConnectionFactory(final ConnectionFactoryProperties overrideProperties)
+ {
+ HornetQConnectionFactory cf;
+ List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames()
+ : raProperties.getParsedConnectorClassNames();
+
+ String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress()
+ : getDiscoveryAddress();
+
+ if (discoveryAddress != null)
+ {
+ Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort()
+ : getDiscoveryPort();
+
+ if(discoveryPort == null)
+ {
+ discoveryPort = HornetQClient.DEFAULT_DISCOVERY_PORT;
+ }
+
+ DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(discoveryAddress, discoveryPort);
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Creating Recovery Connection Factory on the resource adapter for discovery=" + groupConfiguration);
+ }
+
+ Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout()
+ : raProperties.getDiscoveryRefreshTimeout();
+ if (refreshTimeout == null)
+ {
+ refreshTimeout = HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
+ }
+
+ Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout()
+ : raProperties.getDiscoveryInitialWaitTimeout();
+
+ if(initialTimeout == null)
+ {
+ initialTimeout = HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
+ }
+
+ groupConfiguration.setDiscoveryInitialWaitTimeout(initialTimeout);
+
+ groupConfiguration.setRefreshTimeout(refreshTimeout);
+
+ cf = HornetQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF);
+ }
+ else
+ if (connectorClassName != null)
+ {
+ TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()];
+
+ List<Map<String, Object>> connectionParams;
+ if(overrideProperties.getParsedConnectorClassNames() != null)
+ {
+ connectionParams = overrideProperties.getParsedConnectionParameters();
+ }
+ else
+ {
+ connectionParams = raProperties.getParsedConnectionParameters();
+ }
+
+ for (int i = 0; i < connectorClassName.size(); i++)
+ {
+ TransportConfiguration tc;
+ if(connectionParams == null || i >= connectionParams.size())
+ {
+ tc = new TransportConfiguration(connectorClassName.get(i));
+ log.debug("No connector params provided using default");
+ }
+ else
+ {
+ tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i));
+ }
+
+ transportConfigurations[i] = tc;
+ }
+
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Creating Recovery Connection Factory on the resource adapter for transport=" + transportConfigurations);
+ }
+
+ cf = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations);
+
+ }
+ else
+ {
+ throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for HornetQ ResourceAdapter Connection Factory");
+ }
+ setParams(cf, overrideProperties);
+
+ //now make sure we are HA in any way
+
+ cf.setReconnectAttempts(0);
+ cf.setInitialConnectAttempts(0);
+ return cf;
+ }
+
public Map<String, Object> overrideConnectionParameters(final Map<String, Object> connectionParams,
final Map<String, Object> overrideConnectionParams)
{
@@ -1735,4 +1851,9 @@
cf.setConnectionLoadBalancingPolicyClassName(val5);
}
}
+
+ public void setManagedConnectionFactory(HornetQRAManagedConnectionFactory hornetQRAManagedConnectionFactory)
+ {
+ managedConnectionFactories.add(hornetQRAManagedConnectionFactory);
+ }
}