[hornetq-commits] JBoss hornetq SVN: r12271 - in branches/Branch_2_2_EAP/src/main/org/hornetq: jms/server/recovery and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Mar 8 17:57:58 EST 2012
Author: clebert.suconic
Date: 2012-03-08 17:57:55 -0500 (Thu, 08 Mar 2012)
New Revision: 12271
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java
Log:
JBPAPP-8377 - Improving recovery registry while avoiding duplicates
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java 2012-03-08 16:08:38 UTC (rev 12270)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java 2012-03-08 22:57:55 UTC (rev 12271)
@@ -113,6 +113,18 @@
}
/**
+ * Create a ServerLocator which creates session factories using a static list of transportConfigurations, the ServerLocator is not updated automatically
+ * as the cluster topology changes, and no HA backup information is propagated to the client
+ *
+ * @param transportConfigurations
+ * @return the ServerLocator
+ */
+ public static ServerLocator createServerLocator(final boolean ha, TransportConfiguration... transportConfigurations)
+ {
+ return new ServerLocatorImpl(ha, transportConfigurations);
+ }
+
+ /**
* Create a ServerLocator which creates session factories from a set of live servers, no HA backup information is propagated to the client
*
* The UDP address and port are used to listen for live servers in the cluster
@@ -127,6 +139,20 @@
}
/**
+ * Create a ServerLocator which creates session factories from a set of live servers, no HA backup information is propagated to the client
+ *
+ * The UDP address and port are used to listen for live servers in the cluster
+ *
+ * @param discoveryAddress The UDP group address to listen for updates
+ * @param discoveryPort the UDP port to listen for updates
+ * @return the ServerLocator
+ */
+ public static ServerLocator createServerLocator(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration)
+ {
+ return new ServerLocatorImpl(ha, groupConfiguration);
+ }
+
+ /**
* Create a ServerLocator which will receive cluster topology updates from the cluster as servers leave or join and new backups are appointed or removed.
* The initial list of servers supplied in this method is simply to make an initial connection to the cluster, once that connection is made, up to date
* cluster topology information is downloaded and automatically updated whenever the cluster topology changes. If the topology includes backup servers
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java 2012-03-08 16:08:38 UTC (rev 12270)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java 2012-03-08 22:57:55 UTC (rev 12271)
@@ -13,14 +13,9 @@
package org.hornetq.jms.server.recovery;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.HashMap;
-import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.logging.Logger;
-import org.hornetq.jms.client.HornetQConnectionFactory;
import org.jboss.tm.XAResourceRecoveryRegistry;
/**
@@ -28,7 +23,6 @@
* we verify if a given connection factory already have a recovery registered
*
* @author Clebert
- * @author Andy Taylor
*
*
*/
@@ -40,7 +34,7 @@
// Attributes ----------------------------------------------------
- private static Set<HornetQResourceRecovery> configSet = new HashSet<HornetQResourceRecovery>();
+ private static HashMap<XARecoveryConfig, HornetQResourceRecovery> configSet = new HashMap<XARecoveryConfig, HornetQResourceRecovery>();
// Static --------------------------------------------------------
@@ -54,32 +48,46 @@
{
synchronized (configSet)
{
- HornetQResourceRecovery usedInstance = locateSimilarResource(resourceRecovery);
- if (usedInstance == null)
+ HornetQResourceRecovery recovery = configSet.get(resourceRecovery.getConfig());
+
+ if (recovery == null)
{
+ recovery = resourceRecovery;
if (log.isDebugEnabled())
{
- log.debug("Adding " + resourceRecovery.getConfig() + " resource = " + resourceRecovery);
+ log.debug("Registering a new recovery for " + recovery.getConfig() + ", recovery = " + resourceRecovery);
}
- usedInstance = resourceRecovery;
- configSet.add(usedInstance);
- getTMRegistry().addXAResourceRecovery(usedInstance);
+ configSet.put(resourceRecovery.getConfig(), resourceRecovery);
+ getTMRegistry().addXAResourceRecovery(recovery);
}
- usedInstance.incrementUsage();
- return usedInstance;
+ else
+ {
+ if (log.isDebugEnabled())
+ {
+ log.info("Return pre-existent recovery=" + recovery + " for configuration = " + resourceRecovery.getConfig());
+ }
+ }
+ recovery.incrementUsage();
+ return recovery;
}
}
- public synchronized void unRegister(final HornetQResourceRecovery resourceRecovery)
+ public void unRegister(final HornetQResourceRecovery resourceRecovery)
{
synchronized (configSet)
{
- // The same resource could have been reused by more than one resource manager or factory
- if (resourceRecovery.decrementUsage() == 0)
+ HornetQResourceRecovery recFound = configSet.get(resourceRecovery.getConfig());
+
+ if (recFound != null && recFound.decrementUsage() == 0)
{
- getTMRegistry().removeXAResourceRecovery(resourceRecovery);
+ if (log.isDebugEnabled())
+ {
+ log.debug("Removing recovery information for " + recFound + " as all the deployments were already removed");
+ }
+ getTMRegistry().removeXAResourceRecovery(recFound);
+ configSet.remove(resourceRecovery);
}
}
}
@@ -89,77 +97,6 @@
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
-
- private static HornetQResourceRecovery locateSimilarResource(HornetQResourceRecovery resourceInput)
- {
- HornetQConnectionFactory factory = resourceInput.getConfig().getFactory();
-
- TransportConfiguration[] transportConfigurations = resourceInput.getConfig().getFactory().getServerLocator()
- .getStaticTransportConfigurations();
+ // Inner classes -------------------------------------------------
-
- if (log.isTraceEnabled())
- {
- log.trace("############################################## looking for a place on " + Arrays.toString(transportConfigurations));
- }
-
- for (HornetQResourceRecovery resourceScan : configSet)
- {
- XARecoveryConfig xaRecoveryConfig = resourceScan.getConfig();
-
- if (transportConfigurations != null)
- {
- TransportConfiguration[] xaConfigurations = xaRecoveryConfig.getHornetQConnectionFactory().getServerLocator()
- .getStaticTransportConfigurations();
-
- if (log.isTraceEnabled())
- {
- log.trace("Checking " + Arrays.toString(transportConfigurations) + " against " + Arrays.toString(xaConfigurations));
- }
-
- if (xaConfigurations == null)
- {
- continue;
- }
- if (transportConfigurations.length != xaConfigurations.length)
- {
- if (log.isTraceEnabled())
- {
- log.trace(Arrays.toString(transportConfigurations) + " != " + Arrays.toString(xaConfigurations) + " because of size");
- }
- continue;
- }
- boolean theSame = true;
- for (int i = 0; i < transportConfigurations.length; i++)
- {
- TransportConfiguration tc = transportConfigurations[i];
- TransportConfiguration xaTc = xaConfigurations[i];
- if (!tc.equals(xaTc))
- {
- log.info(Arrays.toString(transportConfigurations) + " != " + Arrays.toString(xaConfigurations) + " because of " + tc + " != " + xaTc);
- theSame = false;
- break;
- }
- }
- if (theSame)
- {
- return resourceScan;
- }
- } else
- {
- DiscoveryGroupConfiguration discoveryGroupConfiguration = xaRecoveryConfig.getHornetQConnectionFactory()
- .getServerLocator().getDiscoveryGroupConfiguration();
- if (discoveryGroupConfiguration != null && discoveryGroupConfiguration.equals(factory.getDiscoveryGroupConfiguration()))
- {
- return resourceScan;
- }
- }
- }
-
- return null;
-
- }
-
- // Inner classes -------------------------------------------------
-
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java 2012-03-08 16:08:38 UTC (rev 12270)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java 2012-03-08 22:57:55 UTC (rev 12271)
@@ -22,6 +22,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.logging.Logger;
@@ -316,7 +317,14 @@
try
{
- serverLocator = xaRecoveryConfig.getHornetQConnectionFactory().getServerLocator();
+ if (xaRecoveryConfig.getDiscoveryConfiguration() != null)
+ {
+ serverLocator = HornetQClient.createServerLocator(xaRecoveryConfig.isHA(), xaRecoveryConfig.getDiscoveryConfiguration());
+ }
+ else
+ {
+ serverLocator = HornetQClient.createServerLocator(xaRecoveryConfig.isHA(), xaRecoveryConfig.getTransportConfig());
+ }
serverLocator.disableFinalizeCheck();
csf = serverLocator.createSessionFactory();
if (xaRecoveryConfig.getUsername() == null)
@@ -334,10 +342,29 @@
1);
}
}
- catch (HornetQException e)
+ catch (Throwable e)
{
+ log.warn("Can't connect to " + xaRecoveryConfig + " on auto-generated resource recovery", e);
+ if (log.isDebugEnabled())
+ {
+ log.debug(e.getMessage(), e);
+ }
+
+ try
+ {
+ if (cs != null) cs.close();
+ if (serverLocator != null) serverLocator.close();
+ }
+ catch (Throwable ignored)
+ {
+ if (log.isTraceEnabled())
+ {
+ log.trace(e.getMessage(), ignored);
+ }
+ }
continue;
}
+
cs.addFailureListener(this);
synchronized (HornetQXAResourceWrapper.lock)
@@ -392,9 +419,9 @@
oldServerLocator.close();
}
}
- catch (Exception ignored)
+ catch (Throwable ignored)
{
- HornetQXAResourceWrapper.log.trace("Ignored error during close", ignored);
+ HornetQXAResourceWrapper.log.debug("Ignored error during close", ignored);
}
}
@@ -410,10 +437,9 @@
{
log.warn(e.getMessage(), e);
- if (e.errorCode == XAException.XA_RETRY)
- {
- close();
- }
+
+ // If any exception happened, we close the connection so we may start fresh
+ close();
throw e;
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java 2012-03-08 16:08:38 UTC (rev 12270)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java 2012-03-08 22:57:55 UTC (rev 12271)
@@ -13,6 +13,10 @@
package org.hornetq.jms.server.recovery;
+import java.util.Arrays;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.jms.client.HornetQConnectionFactory;
/**
@@ -27,22 +31,46 @@
*/
public class XARecoveryConfig
{
- private final HornetQConnectionFactory hornetQConnectionFactory;
+
+ private final boolean ha;
+ private final TransportConfiguration[] transportConfiguration;
+ private final DiscoveryGroupConfiguration discoveryConfiguration;
private final String username;
private final String password;
- public XARecoveryConfig(HornetQConnectionFactory hornetQConnectionFactory, String username, String password)
+ public XARecoveryConfig(final boolean ha, final TransportConfiguration[] transportConfiguration, final String username, final String password)
{
- this.hornetQConnectionFactory = hornetQConnectionFactory;
+ this.transportConfiguration = transportConfiguration;
+ this.discoveryConfiguration = null;
this.username = username;
this.password = password;
+ this.ha = ha;
}
- public HornetQConnectionFactory getHornetQConnectionFactory()
+ public XARecoveryConfig(final boolean ha, final DiscoveryGroupConfiguration discoveryConfiguration, final String username, final String password)
{
- return hornetQConnectionFactory;
+ this.discoveryConfiguration = discoveryConfiguration;
+ this.transportConfiguration = null;
+ this.username = username;
+ this.password = password;
+ this.ha = ha;
}
+
+ public boolean isHA()
+ {
+ return ha;
+ }
+ public DiscoveryGroupConfiguration getDiscoveryConfiguration()
+ {
+ return discoveryConfiguration;
+ }
+
+ public TransportConfiguration[] getTransportConfig()
+ {
+ return transportConfiguration;
+ }
+
public String getUsername()
{
return username;
@@ -53,28 +81,43 @@
return password;
}
- public HornetQConnectionFactory getFactory()
+ /* (non-Javadoc)
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode()
{
- return hornetQConnectionFactory;
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((discoveryConfiguration == null) ? 0 : discoveryConfiguration.hashCode());
+ result = prime * result + Arrays.hashCode(transportConfiguration);
+ return result;
}
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
@Override
- public boolean equals(Object o)
+ public boolean equals(Object obj)
{
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- XARecoveryConfig that = (XARecoveryConfig) o;
-
- if (hornetQConnectionFactory != null ? !hornetQConnectionFactory.equals(that.hornetQConnectionFactory) : that.hornetQConnectionFactory != null)
+ if (this == obj)
+ return true;
+ if (obj == null)
return false;
- if (password != null ? !password.equals(that.password) : that.password != null) return false;
- if (username != null ? !username.equals(that.username) : that.username != null) return false;
-
+ if (getClass() != obj.getClass())
+ return false;
+ XARecoveryConfig other = (XARecoveryConfig)obj;
+ if (discoveryConfiguration == null)
+ {
+ if (other.discoveryConfiguration != null)
+ return false;
+ }
+ else if (!discoveryConfiguration.equals(other.discoveryConfiguration))
+ return false;
+ if (!Arrays.equals(transportConfiguration, other.transportConfiguration))
+ return false;
return true;
}
-
-
/* (non-Javadoc)
* @see java.lang.Object#toString()
@@ -82,20 +125,12 @@
@Override
public String toString()
{
- return "XARecoveryConfig [hornetQConnectionFactory=" + hornetQConnectionFactory +
+ return "XARecoveryConfig [transportConfiguration = " + Arrays.toString(transportConfiguration) +
+ ", discoveryConfiguration = " + discoveryConfiguration +
", username=" +
username +
", password=" +
password +
"]";
}
-
- @Override
- public int hashCode()
- {
- int result = hornetQConnectionFactory != null ? hornetQConnectionFactory.hashCode() : 0;
- result = 31 * result + (username != null ? username.hashCode() : 0);
- result = 31 * result + (password != null ? password.hashCode() : 0);
- return result;
- }
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java 2012-03-08 16:08:38 UTC (rev 12270)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java 2012-03-08 22:57:55 UTC (rev 12271)
@@ -54,8 +54,7 @@
{
log.debug("registering recovery for factory : " + factory);
- XARecoveryConfig xaRecoveryConfig = new XARecoveryConfig(factory, userName, password);
- HornetQResourceRecovery resourceRecovery = new HornetQResourceRecovery(xaRecoveryConfig);
+ HornetQResourceRecovery resourceRecovery = newResourceRecovery(factory, userName, password);
if (registry != null)
{
@@ -69,6 +68,31 @@
return resourceRecovery;
}
+ /**
+ * @param factory
+ * @param userName
+ * @param password
+ * @return
+ */
+ private HornetQResourceRecovery newResourceRecovery(HornetQConnectionFactory factory,
+ String userName,
+ String password)
+ {
+ XARecoveryConfig xaRecoveryConfig;
+
+ if (factory.getServerLocator().getDiscoveryGroupConfiguration() != null)
+ {
+ xaRecoveryConfig = new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getDiscoveryGroupConfiguration(), userName, password);
+ }
+ else
+ {
+ xaRecoveryConfig = new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getStaticTransportConfigurations(), userName, password);
+ }
+
+ HornetQResourceRecovery resourceRecovery = new HornetQResourceRecovery(xaRecoveryConfig);
+ return resourceRecovery;
+ }
+
public void unRegister(HornetQResourceRecovery resourceRecovery)
{
registry.unRegister(resourceRecovery);
More information about the hornetq-commits mailing list