[hornetq-commits] JBoss hornetq SVN: r12261 - in branches/Branch_2_2_EAP/src/main/org/hornetq: ra/recovery and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Mar 7 18:59:46 EST 2012


Author: clebert.suconic
Date: 2012-03-07 18:59:46 -0500 (Wed, 07 Mar 2012)
New Revision: 12261

Added:
   branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java
Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQResourceRecovery.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/RecoveryRegistry.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java
Log:
JBPAPP-8366 & JBPAPP-8377 - fixing leaks and duplicated resources (commit first part)

Added: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java	                        (rev 0)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java	2012-03-07 23:59:46 UTC (rev 12261)
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.server.recovery;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.jboss.tm.XAResourceRecoveryRegistry;
+
+/**
+ * This class is a base class for the integration layer, where
+ * we verify whether a given connection factory already has a recovery registered.
+ *
+ * @author Clebert
+ * @author Andy Taylor
+ *
+ *
+ */
+public abstract class HornetQRegistryBase implements RecoveryRegistry
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(HornetQRegistryBase.class);
+
+   // Attributes ----------------------------------------------------
+
+   /** Recoveries currently registered with the TM registry; static, so shared by every registry instance. */
+   private static final Set<HornetQResourceRecovery> configSet = new HashSet<HornetQResourceRecovery>();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /** @return the transaction manager registry that recoveries are added to and removed from */
+   public abstract XAResourceRecoveryRegistry getTMRegistry();
+
+   /**
+    * Registers a recovery, reusing an already registered one that targets the same servers.
+    *
+    * @param resourceRecovery the candidate recovery
+    * @return the instance actually in use (the candidate, or the equivalent one registered earlier);
+    *         its usage count has been incremented, so pass this instance to unRegister
+    */
+   public HornetQResourceRecovery register(final HornetQResourceRecovery resourceRecovery)
+   {
+      synchronized (configSet)
+      {
+         HornetQResourceRecovery usedInstance = locateSimilarResource(resourceRecovery);
+         if (usedInstance == null)
+         {
+            if (log.isDebugEnabled())
+            {
+               log.debug("Adding " + resourceRecovery.getConfig() + " resource = " + resourceRecovery);
+            }
+            usedInstance = resourceRecovery;
+            configSet.add(usedInstance);
+            getTMRegistry().addXAResourceRecovery(usedInstance);
+         }
+         usedInstance.incrementUsage();
+         return usedInstance;
+      }
+   }
+
+   /**
+    * Releases one usage of a recovery; the last release removes it from the TM registry and from
+    * the set of known recoveries.
+    *
+    * @param resourceRecovery the instance previously returned by register
+    */
+   public synchronized void unRegister(final HornetQResourceRecovery resourceRecovery)
+   {
+      synchronized (configSet)
+      {
+         // The same resource could have been reused by more than one resource manager or factory
+         if (resourceRecovery.decrementUsage() == 0)
+         {
+            getTMRegistry().removeXAResourceRecovery(resourceRecovery);
+            // Forget it here as well: a stale entry would be handed out again by register()
+            // without being re-added to the TM registry, and would never be released.
+            configSet.remove(resourceRecovery);
+         }
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Looks for a registered recovery whose connection factory targets the same servers as the input:
+    * equal static connectors in the same order or, when the input has no static connectors, an
+    * equal discovery group. Iterates configSet, so the caller must hold its monitor.
+    *
+    * @param resourceInput the recovery about to be registered
+    * @return the matching registered recovery, or null when there is none
+    */
+   private static HornetQResourceRecovery locateSimilarResource(HornetQResourceRecovery resourceInput)
+   {
+      HornetQConnectionFactory factory = resourceInput.getConfig().getFactory();
+      TransportConfiguration[] transportConfigurations = factory.getServerLocator().getStaticTransportConfigurations();
+
+      if (log.isTraceEnabled())
+      {
+         log.trace("############################################## looking for a place on " + Arrays.toString(transportConfigurations));
+      }
+
+      for (HornetQResourceRecovery resourceScan : configSet)
+      {
+         HornetQConnectionFactory scanFactory = resourceScan.getConfig().getHornetQConnectionFactory();
+
+         if (transportConfigurations != null)
+         {
+            TransportConfiguration[] xaConfigurations = scanFactory.getServerLocator().getStaticTransportConfigurations();
+            if (log.isTraceEnabled())
+            {
+               log.trace("Checking " + Arrays.toString(transportConfigurations) + " against " + Arrays.toString(xaConfigurations));
+            }
+
+            // Arrays.equals covers the null, length and element-by-element comparisons in one call
+            if (Arrays.equals(transportConfigurations, xaConfigurations))
+            {
+               return resourceScan;
+            }
+            // A mismatch is the normal outcome while scanning, so it is only worth a trace entry
+            if (log.isTraceEnabled())
+            {
+               log.trace(Arrays.toString(transportConfigurations) + " != " + Arrays.toString(xaConfigurations));
+            }
+         }
+         else
+         {
+            DiscoveryGroupConfiguration discoveryGroupConfiguration = scanFactory.getServerLocator().getDiscoveryGroupConfiguration();
+            if (discoveryGroupConfiguration != null && discoveryGroupConfiguration.equals(factory.getDiscoveryGroupConfiguration()))
+            {
+               return resourceScan;
+            }
+         }
+      }
+
+      return null;
+   }
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQResourceRecovery.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQResourceRecovery.java	2012-03-07 14:02:01 UTC (rev 12260)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQResourceRecovery.java	2012-03-07 23:59:46 UTC (rev 12261)
@@ -32,16 +32,39 @@
 public class HornetQResourceRecovery implements XAResourceRecovery
 {
    private final XARecoveryConfig config;
+   
+   private final XAResource[] xaResources;
+   
+   private int usage;
 
    public HornetQResourceRecovery(XARecoveryConfig config)
    {
       this.config = config;
+      this.xaResources =  new XAResource[]{new HornetQXAResourceWrapper(config)};
    }
 
    public XAResource[] getXAResources()
    {
-      return new XAResource[]{new HornetQXAResourceWrapper(config)};
+      return xaResources;
    }
+   
+   public XARecoveryConfig getConfig()
+   {
+      return config;
+   }
+   
+   /** We may have several connection factories referencing the same connection recovery entry.
+    *  Because of that we keep a count of the instances that are referencing it,
+    *  so that it is removed only once the last of them is done with it. */
+   public synchronized int incrementUsage()
+   {
+      return ++usage;
+   }
+   
+   public synchronized int decrementUsage()
+   {
+      return --usage;
+   }
 
    @Override
    public boolean equals(Object o)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/RecoveryRegistry.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/RecoveryRegistry.java	2012-03-07 14:02:01 UTC (rev 12260)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/RecoveryRegistry.java	2012-03-07 23:59:46 UTC (rev 12261)
@@ -28,7 +28,7 @@
  */
 public interface RecoveryRegistry
 {
-   void register(HornetQResourceRecovery resourceRecovery);
+   HornetQResourceRecovery register(HornetQResourceRecovery resourceRecovery);
 
    void unRegister(HornetQResourceRecovery xaRecoveryConfig);
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java	2012-03-07 14:02:01 UTC (rev 12260)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java	2012-03-07 23:59:46 UTC (rev 12261)
@@ -16,6 +16,9 @@
 import org.hornetq.jms.client.HornetQConnectionFactory;
 
 /**
+ * 
+ * This represents the configuration of a single connection factory.
+ * 
  * @author <a href="mailto:andy.taylor at jboss.com">Andy Taylor</a>
  *
  * A wrapper around info needed for the xa recovery resource
@@ -49,6 +52,11 @@
    {
       return password;
    }
+   
+   public HornetQConnectionFactory getFactory()
+   {
+      return hornetQConnectionFactory;
+   }
 
    @Override
    public boolean equals(Object o)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java	2012-03-07 14:02:01 UTC (rev 12260)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java	2012-03-07 23:59:46 UTC (rev 12261)
@@ -21,18 +21,16 @@
 */
 package org.hornetq.ra.recovery;
 
-import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.TransportConfiguration;
+import java.util.Set;
+
 import org.hornetq.core.logging.Logger;
 import org.hornetq.jms.client.HornetQConnectionFactory;
 import org.hornetq.jms.server.recovery.HornetQResourceRecovery;
 import org.hornetq.jms.server.recovery.RecoveryRegistry;
 import org.hornetq.jms.server.recovery.XARecoveryConfig;
 import org.hornetq.ra.Util;
+import org.hornetq.utils.ConcurrentHashSet;
 
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  *         9/21/11
@@ -44,9 +42,9 @@
    private RecoveryRegistry registry;
 
    private String resourceRecoveryClassNames = "org.jboss.as.integration.hornetq.recovery.AS5RecoveryRegistry";
+   
+   private final Set<HornetQResourceRecovery> resources = new ConcurrentHashSet<HornetQResourceRecovery>(); 
 
-   private Map<XARecoveryConfig, HornetQResourceRecovery> configMap = new HashMap<XARecoveryConfig, HornetQResourceRecovery>();
-
    public void start()
    {
       locateRecoveryRegistry();
@@ -56,15 +54,19 @@
    {
       log.debug("registering recovery for factory : " + factory);
       
-      if(!isRegistered(factory) && registry != null)
+      XARecoveryConfig xaRecoveryConfig = new XARecoveryConfig(factory, userName, password);
+      HornetQResourceRecovery resourceRecovery = new HornetQResourceRecovery(xaRecoveryConfig);
+      
+      if (registry != null)
       {
-         XARecoveryConfig xaRecoveryConfig = new XARecoveryConfig(factory, userName, password);
-         HornetQResourceRecovery resourceRecovery = new HornetQResourceRecovery(xaRecoveryConfig);
-         registry.register(resourceRecovery);
-         configMap.put(xaRecoveryConfig, resourceRecovery);
-         return resourceRecovery;
+         resourceRecovery = registry.register(resourceRecovery);
+         if (resourceRecovery != null)
+         {
+            resources.add(resourceRecovery);
+         }
       }
-      return null;
+      
+      return resourceRecovery;
    }
 
    public void unRegister(HornetQResourceRecovery resourceRecovery)
@@ -74,11 +76,11 @@
 
    public void stop()
    {
-      for (HornetQResourceRecovery hornetQResourceRecovery : configMap.values())
+      for (HornetQResourceRecovery hornetQResourceRecovery : resources)
       {
          registry.unRegister(hornetQResourceRecovery);
       }
-      configMap.clear();
+      resources.clear();
    }
 
    private void locateRecoveryRegistry()
@@ -105,9 +107,9 @@
       {
          registry = new RecoveryRegistry()
          {
-            public void register(HornetQResourceRecovery resourceRecovery)
+            public HornetQResourceRecovery register(HornetQResourceRecovery resourceRecovery)
             {
-               //no op
+               return null;
             }
 
             public void unRegister(HornetQResourceRecovery xaRecoveryConfig)
@@ -123,49 +125,4 @@
    }
 
 
-   public boolean isRegistered(HornetQConnectionFactory factory)
-   {
-      for (XARecoveryConfig xaRecoveryConfig : configMap.keySet())
-      {
-         TransportConfiguration[] transportConfigurations = factory.getServerLocator().getStaticTransportConfigurations();
-
-         if (transportConfigurations != null)
-         {
-            TransportConfiguration[] xaConfigurations = xaRecoveryConfig.getHornetQConnectionFactory().getServerLocator().getStaticTransportConfigurations();
-            if(xaConfigurations == null)
-            {
-               break;
-            }
-            if(transportConfigurations.length != xaConfigurations.length)
-            {
-               break;
-            }
-            boolean theSame=true;
-            for(int i = 0; i < transportConfigurations.length; i++)
-            {
-              TransportConfiguration tc = transportConfigurations[i];
-              TransportConfiguration xaTc = xaConfigurations[i];
-              if(!tc.equals(xaTc))
-              {
-                 theSame = false;
-                 break;
-              }
-            }
-            if(theSame)
-            {
-               return theSame;
-            }
-         }
-         else
-         {
-            DiscoveryGroupConfiguration discoveryGroupConfiguration = xaRecoveryConfig.getHornetQConnectionFactory().getServerLocator().getDiscoveryGroupConfiguration();
-            if(discoveryGroupConfiguration != null && discoveryGroupConfiguration.equals(factory.getDiscoveryGroupConfiguration()))
-            {
-               return true;
-            }
-         }
-      }
-      return false;
-   }
-
 }



More information about the hornetq-commits mailing list