[hornetq-commits] JBoss hornetq SVN: r12261 - in branches/Branch_2_2_EAP/src/main/org/hornetq: ra/recovery and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Mar 7 18:59:46 EST 2012
Author: clebert.suconic
Date: 2012-03-07 18:59:46 -0500 (Wed, 07 Mar 2012)
New Revision: 12261
Added:
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQResourceRecovery.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/RecoveryRegistry.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java
Log:
JBPAPP-8366 & JBPAPP-8377 - fixing leaks and duplicated resources (commit first part)
Added: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java (rev 0)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java 2012-03-07 23:59:46 UTC (rev 12261)
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.server.recovery;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.jboss.tm.XAResourceRecoveryRegistry;
+
+/**
+ * Base class for the integration layer that registers HornetQ XA recovery resources
+ * with the transaction manager.
+ * <p>
+ * Before registering, it verifies whether a resource with an equivalent connection
+ * configuration already has a recovery registered. If so, that instance is shared and
+ * its usage count is incremented instead of registering a duplicate.
+ *
+ * @author Clebert
+ * @author Andy Taylor
+ */
+public abstract class HornetQRegistryBase implements RecoveryRegistry
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(HornetQRegistryBase.class);
+
+   // Attributes ----------------------------------------------------
+
+   /**
+    * Recovery resources currently registered with the transaction manager, shared by
+    * every registry instance. The set is also the lock guarding all registration state,
+    * which is why it must never be reassigned.
+    */
+   private static final Set<HornetQResourceRecovery> configSet = new HashSet<HornetQResourceRecovery>();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * @return the transaction manager registry that recovery resources are added to and removed from
+    */
+   public abstract XAResourceRecoveryRegistry getTMRegistry();
+
+   /**
+    * Registers a recovery resource, reusing an already registered one when its connection
+    * configuration matches.
+    *
+    * @param resourceRecovery the candidate resource to register
+    * @return the instance that is actually registered (either {@code resourceRecovery} or a
+    *         previously registered equivalent); its usage count has been incremented
+    */
+   public HornetQResourceRecovery register(final HornetQResourceRecovery resourceRecovery)
+   {
+      synchronized (configSet)
+      {
+         HornetQResourceRecovery usedInstance = locateSimilarResource(resourceRecovery);
+         if (usedInstance == null)
+         {
+            if (log.isDebugEnabled())
+            {
+               log.debug("Adding " + resourceRecovery.getConfig() + " resource = " + resourceRecovery);
+            }
+            usedInstance = resourceRecovery;
+            configSet.add(usedInstance);
+            getTMRegistry().addXAResourceRecovery(usedInstance);
+         }
+         usedInstance.incrementUsage();
+         return usedInstance;
+      }
+   }
+
+   /**
+    * Releases one usage of a recovery resource and, when no user is left, removes it from
+    * the transaction manager registry.
+    * <p>
+    * Only the shared {@code configSet} lock is taken, matching {@link #register}, so both
+    * methods acquire locks in the same order.
+    *
+    * @param resourceRecovery the instance previously returned by {@link #register}
+    */
+   public void unRegister(final HornetQResourceRecovery resourceRecovery)
+   {
+      synchronized (configSet)
+      {
+         // The same resource could have been reused by more than one resource manager or factory
+         if (resourceRecovery.decrementUsage() == 0)
+         {
+            // Forget the resource as well: a stale entry would be handed out again by
+            // register() without ever being re-added to the transaction manager registry.
+            configSet.remove(resourceRecovery);
+            getTMRegistry().removeXAResourceRecovery(resourceRecovery);
+         }
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Looks for an already registered resource whose connection factory points at the same
+    * servers as the given one. Callers must hold the {@code configSet} lock.
+    * <p>
+    * When the input factory has static transport configurations, they are compared in order
+    * with those of each registered resource; otherwise the discovery group configurations
+    * are compared.
+    *
+    * @param resourceInput the resource about to be registered
+    * @return the matching registered resource, or {@code null} if there is none
+    */
+   private static HornetQResourceRecovery locateSimilarResource(HornetQResourceRecovery resourceInput)
+   {
+      HornetQConnectionFactory factory = resourceInput.getConfig().getFactory();
+
+      TransportConfiguration[] transportConfigurations = factory.getServerLocator()
+                                                                .getStaticTransportConfigurations();
+
+      if (log.isTraceEnabled())
+      {
+         log.trace("############################################## looking for a place on " + Arrays.toString(transportConfigurations));
+      }
+
+      for (HornetQResourceRecovery resourceScan : configSet)
+      {
+         XARecoveryConfig xaRecoveryConfig = resourceScan.getConfig();
+
+         if (transportConfigurations != null)
+         {
+            TransportConfiguration[] xaConfigurations = xaRecoveryConfig.getHornetQConnectionFactory()
+                                                                        .getServerLocator()
+                                                                        .getStaticTransportConfigurations();
+
+            if (sameTransportConfigurations(transportConfigurations, xaConfigurations))
+            {
+               return resourceScan;
+            }
+         }
+         else
+         {
+            DiscoveryGroupConfiguration discoveryGroupConfiguration = xaRecoveryConfig.getHornetQConnectionFactory()
+                                                                                      .getServerLocator()
+                                                                                      .getDiscoveryGroupConfiguration();
+            if (discoveryGroupConfiguration != null && discoveryGroupConfiguration.equals(factory.getDiscoveryGroupConfiguration()))
+            {
+               return resourceScan;
+            }
+         }
+      }
+
+      return null;
+   }
+
+   /**
+    * Compares two static transport configuration arrays element by element, in order.
+    * Every mismatch is reported at trace level only, since a mismatch is the normal outcome
+    * whenever several distinct factories are registered.
+    *
+    * @param transportConfigurations the configurations of the resource being registered (not null)
+    * @param xaConfigurations the configurations of an already registered resource (may be null)
+    * @return {@code true} only if both arrays have the same length and equal elements at each index
+    */
+   private static boolean sameTransportConfigurations(TransportConfiguration[] transportConfigurations,
+                                                      TransportConfiguration[] xaConfigurations)
+   {
+      if (log.isTraceEnabled())
+      {
+         log.trace("Checking " + Arrays.toString(transportConfigurations) + " against " + Arrays.toString(xaConfigurations));
+      }
+
+      if (xaConfigurations == null)
+      {
+         return false;
+      }
+
+      if (transportConfigurations.length != xaConfigurations.length)
+      {
+         if (log.isTraceEnabled())
+         {
+            log.trace(Arrays.toString(transportConfigurations) + " != " + Arrays.toString(xaConfigurations) + " because of size");
+         }
+         return false;
+      }
+
+      for (int i = 0; i < transportConfigurations.length; i++)
+      {
+         TransportConfiguration tc = transportConfigurations[i];
+         TransportConfiguration xaTc = xaConfigurations[i];
+         if (!tc.equals(xaTc))
+         {
+            if (log.isTraceEnabled())
+            {
+               log.trace(Arrays.toString(transportConfigurations) + " != " + Arrays.toString(xaConfigurations) + " because of " + tc + " != " + xaTc);
+            }
+            return false;
+         }
+      }
+
+      return true;
+   }
+
+   // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQResourceRecovery.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQResourceRecovery.java 2012-03-07 14:02:01 UTC (rev 12260)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQResourceRecovery.java 2012-03-07 23:59:46 UTC (rev 12261)
@@ -32,16 +32,39 @@
public class HornetQResourceRecovery implements XAResourceRecovery
{
private final XARecoveryConfig config;
+
+ private final XAResource[] xaResources;
+
+ private int usage;
public HornetQResourceRecovery(XARecoveryConfig config)
{
this.config = config;
+ this.xaResources = new XAResource[]{new HornetQXAResourceWrapper(config)};
}
public XAResource[] getXAResources()
{
- return new XAResource[]{new HornetQXAResourceWrapper(config)};
+ return xaResources;
}
+
+ /**
+ * @return the recovery configuration this resource was constructed with
+ */
+ public XARecoveryConfig getConfig()
+ {
+ return config;
+ }
+
+ /**
+ * Records one more user of this recovery entry.
+ * <p>
+ * Several connection factories may reference the same recovery entry. Because of that
+ * we count the number of instances referencing it, so that it can be removed as soon
+ * as the last one is done with it.
+ *
+ * @return the usage count after the increment
+ */
+ public synchronized int incrementUsage()
+ {
+ return ++usage;
+ }
+
+ /**
+ * Records that one user of this recovery entry is done with it.
+ * <p>
+ * The counter is not clamped: calling this more often than {@code incrementUsage()}
+ * yields a negative value.
+ *
+ * @return the usage count after the decrement
+ */
+ public synchronized int decrementUsage()
+ {
+ return --usage;
+ }
@Override
public boolean equals(Object o)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/RecoveryRegistry.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/RecoveryRegistry.java 2012-03-07 14:02:01 UTC (rev 12260)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/RecoveryRegistry.java 2012-03-07 23:59:46 UTC (rev 12261)
@@ -28,7 +28,7 @@
*/
public interface RecoveryRegistry
{
- void register(HornetQResourceRecovery resourceRecovery);
+ HornetQResourceRecovery register(HornetQResourceRecovery resourceRecovery);
void unRegister(HornetQResourceRecovery xaRecoveryConfig);
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java 2012-03-07 14:02:01 UTC (rev 12260)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java 2012-03-07 23:59:46 UTC (rev 12261)
@@ -16,6 +16,9 @@
import org.hornetq.jms.client.HornetQConnectionFactory;
/**
+ *
+ * This represents the configuration of a single connection factory.
+ *
* @author <a href="mailto:andy.taylor at jboss.com">Andy Taylor</a>
*
* A wrapper around info needed for the xa recovery resource
@@ -49,6 +52,11 @@
{
return password;
}
+
+ /**
+ * @return the connection factory held by this configuration
+ */
+ public HornetQConnectionFactory getFactory()
+ {
+ return hornetQConnectionFactory;
+ }
@Override
public boolean equals(Object o)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java 2012-03-07 14:02:01 UTC (rev 12260)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java 2012-03-07 23:59:46 UTC (rev 12261)
@@ -21,18 +21,16 @@
*/
package org.hornetq.ra.recovery;
-import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.TransportConfiguration;
+import java.util.Set;
+
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.server.recovery.HornetQResourceRecovery;
import org.hornetq.jms.server.recovery.RecoveryRegistry;
import org.hornetq.jms.server.recovery.XARecoveryConfig;
import org.hornetq.ra.Util;
+import org.hornetq.utils.ConcurrentHashSet;
-import java.util.HashMap;
-import java.util.Map;
-
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
* 9/21/11
@@ -44,9 +42,9 @@
private RecoveryRegistry registry;
private String resourceRecoveryClassNames = "org.jboss.as.integration.hornetq.recovery.AS5RecoveryRegistry";
+
+ private final Set<HornetQResourceRecovery> resources = new ConcurrentHashSet<HornetQResourceRecovery>();
- private Map<XARecoveryConfig, HornetQResourceRecovery> configMap = new HashMap<XARecoveryConfig, HornetQResourceRecovery>();
-
public void start()
{
locateRecoveryRegistry();
@@ -56,15 +54,19 @@
{
log.debug("registering recovery for factory : " + factory);
- if(!isRegistered(factory) && registry != null)
+ XARecoveryConfig xaRecoveryConfig = new XARecoveryConfig(factory, userName, password);
+ HornetQResourceRecovery resourceRecovery = new HornetQResourceRecovery(xaRecoveryConfig);
+
+ if (registry != null)
{
- XARecoveryConfig xaRecoveryConfig = new XARecoveryConfig(factory, userName, password);
- HornetQResourceRecovery resourceRecovery = new HornetQResourceRecovery(xaRecoveryConfig);
- registry.register(resourceRecovery);
- configMap.put(xaRecoveryConfig, resourceRecovery);
- return resourceRecovery;
+ resourceRecovery = registry.register(resourceRecovery);
+ if (resourceRecovery != null)
+ {
+ resources.add(resourceRecovery);
+ }
}
- return null;
+
+ return resourceRecovery;
}
public void unRegister(HornetQResourceRecovery resourceRecovery)
@@ -74,11 +76,11 @@
public void stop()
{
- for (HornetQResourceRecovery hornetQResourceRecovery : configMap.values())
+ for (HornetQResourceRecovery hornetQResourceRecovery : resources)
{
registry.unRegister(hornetQResourceRecovery);
}
- configMap.clear();
+ resources.clear();
}
private void locateRecoveryRegistry()
@@ -105,9 +107,9 @@
{
registry = new RecoveryRegistry()
{
- public void register(HornetQResourceRecovery resourceRecovery)
+ public HornetQResourceRecovery register(HornetQResourceRecovery resourceRecovery)
{
- //no op
+ return null;
}
public void unRegister(HornetQResourceRecovery xaRecoveryConfig)
@@ -123,49 +125,4 @@
}
- public boolean isRegistered(HornetQConnectionFactory factory)
- {
- for (XARecoveryConfig xaRecoveryConfig : configMap.keySet())
- {
- TransportConfiguration[] transportConfigurations = factory.getServerLocator().getStaticTransportConfigurations();
-
- if (transportConfigurations != null)
- {
- TransportConfiguration[] xaConfigurations = xaRecoveryConfig.getHornetQConnectionFactory().getServerLocator().getStaticTransportConfigurations();
- if(xaConfigurations == null)
- {
- break;
- }
- if(transportConfigurations.length != xaConfigurations.length)
- {
- break;
- }
- boolean theSame=true;
- for(int i = 0; i < transportConfigurations.length; i++)
- {
- TransportConfiguration tc = transportConfigurations[i];
- TransportConfiguration xaTc = xaConfigurations[i];
- if(!tc.equals(xaTc))
- {
- theSame = false;
- break;
- }
- }
- if(theSame)
- {
- return theSame;
- }
- }
- else
- {
- DiscoveryGroupConfiguration discoveryGroupConfiguration = xaRecoveryConfig.getHornetQConnectionFactory().getServerLocator().getDiscoveryGroupConfiguration();
- if(discoveryGroupConfiguration != null && discoveryGroupConfiguration.equals(factory.getDiscoveryGroupConfiguration()))
- {
- return true;
- }
- }
- }
- return false;
- }
-
}
More information about the hornetq-commits
mailing list