[hornetq-commits] JBoss hornetq SVN: r12070 - in branches/Branch_2_2_EAP/src/main/org/hornetq: ra and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Feb 3 05:56:00 EST 2012


Author: ataylor
Date: 2012-02-03 05:55:59 -0500 (Fri, 03 Feb 2012)
New Revision: 12070

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java
Log:
https://issues.jboss.org/browse/JBPAPP-8038

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java	2012-02-03 10:25:34 UTC (rev 12069)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java	2012-02-03 10:55:59 UTC (rev 12070)
@@ -349,6 +349,10 @@
          throw new IllegalArgumentException("Cannot find channel with id " + id + " to close");
       }
 
+      if(failingOver)
+      {
+         unlock();
+      }
       closed = true;
    }
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java	2012-02-03 10:25:34 UTC (rev 12069)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java	2012-02-03 10:55:59 UTC (rev 12070)
@@ -77,6 +77,12 @@
     */
    private HornetQConnectionFactory connectionFactory;
 
+
+   /**
+    * Connection Factory used if properties are set
+    */
+   private HornetQConnectionFactory recoveryConnectionFactory;
+
    /*
    * The resource recovery if there is one
    * */
@@ -141,7 +147,8 @@
       if (connectionFactory == null)
       {
          connectionFactory = ra.createHornetQConnectionFactory(mcfProperties);
-         resourceRecovery = ra.getRecoveryManager().register(connectionFactory, null, null);
+         recoveryConnectionFactory = ra.createRecoveryHornetQConnectionFactory(mcfProperties);
+         resourceRecovery = ra.getRecoveryManager().register(recoveryConnectionFactory, null, null);
       }
       return cf;
    }
@@ -314,6 +321,7 @@
       }
 
       this.ra = (HornetQResourceAdapter)ra;
+      this.ra.setManagedConnectionFactory(this);
    }
 
    /**
@@ -758,7 +766,8 @@
       if (connectionFactory == null)
       {
          connectionFactory = ra.createHornetQConnectionFactory(mcfProperties);
-         resourceRecovery = ra.getRecoveryManager().register(connectionFactory, null, null);
+         recoveryConnectionFactory = ra.createRecoveryHornetQConnectionFactory(mcfProperties);
+         resourceRecovery = ra.getRecoveryManager().register(recoveryConnectionFactory, null, null);
       }
       return connectionFactory;
    }
@@ -810,5 +819,17 @@
       {
          ra.getRecoveryManager().unRegister(resourceRecovery);
       }
+
+      if(connectionFactory != null)
+      {
+         connectionFactory.close();
+         connectionFactory = null;
+      }
+
+      if(recoveryConnectionFactory != null)
+      {
+         recoveryConnectionFactory.close();
+         recoveryConnectionFactory = null;
+      }
    }
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java	2012-02-03 10:25:34 UTC (rev 12069)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java	2012-02-03 10:55:59 UTC (rev 12070)
@@ -13,6 +13,7 @@
 package org.hornetq.ra;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.List;
@@ -103,13 +104,17 @@
    private final Map<ActivationSpec, HornetQActivation> activations;
 
    private HornetQConnectionFactory defaultHornetQConnectionFactory;
+
+   private HornetQConnectionFactory recoveryHornetQConnectionFactory;
    
    private TransactionManager tm;
 
    private String unparsedJndiParams;
 
-   RecoveryManager recoveryManager;
+   private RecoveryManager recoveryManager;
 
+   private final List<HornetQRAManagedConnectionFactory> managedConnectionFactories = new ArrayList<HornetQRAManagedConnectionFactory>();
+
    /**
     * Constructor
     */
@@ -240,7 +245,7 @@
    {
       if (HornetQResourceAdapter.trace)
       {
-         HornetQResourceAdapter.log.trace("stop()");
+         HornetQResourceAdapter.log.info("stop()*******************************************************************");
       }
 
       for (Map.Entry<ActivationSpec, HornetQActivation> entry : activations.entrySet())
@@ -257,11 +262,21 @@
 
       activations.clear();
 
+      for (HornetQRAManagedConnectionFactory managedConnectionFactory : managedConnectionFactories)
+      {
+         managedConnectionFactory.stop();
+      }
+
+      managedConnectionFactories.clear();
+
       if (defaultHornetQConnectionFactory != null)
       {
          defaultHornetQConnectionFactory.close();
+      }
 
-         XARecoveryConfig xaRecoveryConfig = new XARecoveryConfig(defaultHornetQConnectionFactory, raProperties.getUserName(), raProperties.getPassword());
+      if(recoveryHornetQConnectionFactory != null)
+      {
+         recoveryHornetQConnectionFactory.close();
       }
 
       recoveryManager.stop();
@@ -1368,7 +1383,8 @@
    protected void setup() throws HornetQException
    {
       defaultHornetQConnectionFactory = createHornetQConnectionFactory(raProperties);
-      recoveryManager.register(defaultHornetQConnectionFactory, raProperties.getUserName(), raProperties.getPassword());
+      recoveryHornetQConnectionFactory = createRecoveryHornetQConnectionFactory(raProperties);
+      recoveryManager.register(recoveryHornetQConnectionFactory, raProperties.getUserName(), raProperties.getPassword());
    }
 
    public Map<ActivationSpec, HornetQActivation> getActivations()
@@ -1542,6 +1558,106 @@
       return cf;
    }
 
+   public HornetQConnectionFactory createRecoveryHornetQConnectionFactory(final ConnectionFactoryProperties overrideProperties)
+   {
+      HornetQConnectionFactory cf;
+      List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames()
+                                                                                    : raProperties.getParsedConnectorClassNames();
+
+      String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress()
+                                                                                : getDiscoveryAddress();
+
+      if (discoveryAddress != null)
+      {
+         Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort()
+                                                                              : getDiscoveryPort();
+
+         if(discoveryPort == null)
+         {
+            discoveryPort = HornetQClient.DEFAULT_DISCOVERY_PORT;
+         }
+
+         DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(discoveryAddress, discoveryPort);
+
+         if (log.isDebugEnabled())
+         {
+            log.debug("Creating Recovery Connection Factory on the resource adapter for discovery=" + groupConfiguration);
+         }
+
+         Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout()
+                                                                    : raProperties.getDiscoveryRefreshTimeout();
+         if (refreshTimeout == null)
+         {
+            refreshTimeout = HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
+         }
+
+         Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout()
+                                                                        : raProperties.getDiscoveryInitialWaitTimeout();
+
+         if(initialTimeout == null)
+         {
+            initialTimeout = HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
+         }
+
+         groupConfiguration.setDiscoveryInitialWaitTimeout(initialTimeout);
+
+         groupConfiguration.setRefreshTimeout(refreshTimeout);
+
+         cf = HornetQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF);
+      }
+      else
+      if (connectorClassName != null)
+      {
+         TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()];
+
+         List<Map<String, Object>> connectionParams;
+         if(overrideProperties.getParsedConnectorClassNames() != null)
+         {
+            connectionParams = overrideProperties.getParsedConnectionParameters();
+         }
+         else
+         {
+            connectionParams = raProperties.getParsedConnectionParameters();
+         }
+
+         for (int i = 0; i < connectorClassName.size(); i++)
+         {
+            TransportConfiguration tc;
+            if(connectionParams == null || i >= connectionParams.size())
+            {
+               tc = new TransportConfiguration(connectorClassName.get(i));
+               log.debug("No connector params provided using default");
+            }
+            else
+            {
+               tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i));
+            }
+
+            transportConfigurations[i] = tc;
+         }
+
+
+         if (log.isDebugEnabled())
+         {
+            log.debug("Creating Recovery Connection Factory on the resource adapter for transport=" + transportConfigurations);
+         }
+
+         cf = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations);
+
+      }
+      else
+      {
+         throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for HornetQ ResourceAdapter Connection Factory");
+      }
+      setParams(cf, overrideProperties);
+
+      //now make sure we are HA in any way
+
+      cf.setReconnectAttempts(0);
+      cf.setInitialConnectAttempts(0);
+      return cf;
+   }
+
    public Map<String, Object> overrideConnectionParameters(final Map<String, Object> connectionParams,
                                                            final Map<String, Object> overrideConnectionParams)
    {
@@ -1735,4 +1851,9 @@
          cf.setConnectionLoadBalancingPolicyClassName(val5);
       }
    }
+
+   public void setManagedConnectionFactory(HornetQRAManagedConnectionFactory hornetQRAManagedConnectionFactory)
+   {
+      managedConnectionFactories.add(hornetQRAManagedConnectionFactory);
+   }
 }



More information about the hornetq-commits mailing list