[hornetq-commits] JBoss hornetq SVN: r12271 - in branches/Branch_2_2_EAP/src/main/org/hornetq: jms/server/recovery and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Mar 8 17:57:58 EST 2012


Author: clebert.suconic
Date: 2012-03-08 17:57:55 -0500 (Thu, 08 Mar 2012)
New Revision: 12271

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java
Log:
JBPAPP-8377 - Improving recovery registry while avoiding duplicates

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java	2012-03-08 16:08:38 UTC (rev 12270)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java	2012-03-08 22:57:55 UTC (rev 12271)
@@ -113,6 +113,18 @@
    }
    
    /**
+    * Create a ServerLocator which creates session factories using a static list of transportConfigurations, the ServerLocator is not updated automatically
+    * as the cluster topology changes, and no HA backup information is propagated to the client
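+    * @param ha the HA flag handed to the ServerLocatorImpl constructor
+    * @param transportConfigurations the static list of transport configurations used to create session factories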
+    * @return the ServerLocator
+    */
+   public static ServerLocator createServerLocator(final boolean ha, TransportConfiguration... transportConfigurations)
+   {
+      return new ServerLocatorImpl(ha, transportConfigurations);
+   }
+   
+   /**
     * Create a ServerLocator which creates session factories from a set of live servers, no HA backup information is propagated to the client
     * 
     * The UDP address and port are used to listen for live servers in the cluster
@@ -127,6 +139,20 @@
    }
    
    /**
+    * Create a ServerLocator which creates session factories from a set of live servers, no HA backup information is propagated to the client
+    * 
+    * The UDP address and port are used to listen for live servers in the cluster
+    * 
+    * @param ha the HA flag handed to the ServerLocatorImpl constructor
+    * @param groupConfiguration the discovery group configuration handed to the ServerLocatorImpl constructor
+    * @return the ServerLocator
+    */
+   public static ServerLocator createServerLocator(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration)
+   {
+      return new ServerLocatorImpl(ha, groupConfiguration);
+   }
+   
+   /**
     * Create a ServerLocator which will receive cluster topology updates from the cluster as servers leave or join and new backups are appointed or removed.
     * The initial list of servers supplied in this method is simply to make an initial connection to the cluster, once that connection is made, up to date
     * cluster topology information is downloaded and automatically updated whenever the cluster topology changes. If the topology includes backup servers

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java	2012-03-08 16:08:38 UTC (rev 12270)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java	2012-03-08 22:57:55 UTC (rev 12271)
@@ -13,14 +13,9 @@
 
 package org.hornetq.jms.server.recovery;
 
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.HashMap;
 
-import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.jms.client.HornetQConnectionFactory;
 import org.jboss.tm.XAResourceRecoveryRegistry;
 
 /**
@@ -28,7 +23,6 @@
  * we verify if a given connection factory already have a recovery registered
  *
  * @author Clebert
- * @author Andy Taylor
  *
  *
  */
@@ -40,7 +34,7 @@
 
    // Attributes ----------------------------------------------------
 
-   private static Set<HornetQResourceRecovery> configSet = new HashSet<HornetQResourceRecovery>();
+   private static HashMap<XARecoveryConfig, HornetQResourceRecovery> configSet = new HashMap<XARecoveryConfig, HornetQResourceRecovery>();
 
    // Static --------------------------------------------------------
 
@@ -54,32 +48,46 @@
    {
       synchronized (configSet)
       {
-         HornetQResourceRecovery usedInstance = locateSimilarResource(resourceRecovery);
-         if (usedInstance == null)
+         HornetQResourceRecovery recovery = configSet.get(resourceRecovery.getConfig());
+         
+         if (recovery == null)
          {
+            recovery = resourceRecovery;
             if (log.isDebugEnabled())
             {
-               log.debug("Adding " + resourceRecovery.getConfig() + " resource = " + resourceRecovery);
+               log.debug("Registering a new recovery for " + recovery.getConfig() + ", recovery = " + resourceRecovery);
             }
-            usedInstance = resourceRecovery;
-            configSet.add(usedInstance);
-            getTMRegistry().addXAResourceRecovery(usedInstance);
+            configSet.put(resourceRecovery.getConfig(), resourceRecovery);
+            getTMRegistry().addXAResourceRecovery(recovery);
          }
-         usedInstance.incrementUsage();
-         return usedInstance;
+         else
+         {
+            if (log.isDebugEnabled())
+            {
+               log.info("Return pre-existent recovery=" + recovery + " for configuration = " + resourceRecovery.getConfig());
+            }
+         }
+         recovery.incrementUsage();
+         return recovery;
       }
    }
 
 
 
-   public synchronized void unRegister(final HornetQResourceRecovery resourceRecovery)
+   public void unRegister(final HornetQResourceRecovery resourceRecovery)
    {
       synchronized (configSet)
       {
-         // The same resource could have been reused by more than one resource manager or factory
-         if (resourceRecovery.decrementUsage() == 0)
+         HornetQResourceRecovery recFound = configSet.get(resourceRecovery.getConfig());
+         // The registered recovery may be shared by several registrations, so only drop it once its usage count reaches zero
+         if (recFound != null && recFound.decrementUsage() == 0)
          {
-            getTMRegistry().removeXAResourceRecovery(resourceRecovery);
+            if (log.isDebugEnabled())
+            {
+               log.debug("Removing recovery information for " + recFound + " as all the deployments were already removed");
+            }
+            getTMRegistry().removeXAResourceRecovery(recFound);
+            configSet.remove(resourceRecovery.getConfig()); // configSet is keyed by XARecoveryConfig, not by the recovery itself
          }
       }
    }
@@ -89,77 +97,6 @@
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------
-   
-   private static HornetQResourceRecovery locateSimilarResource(HornetQResourceRecovery resourceInput)
-   {
-      HornetQConnectionFactory factory = resourceInput.getConfig().getFactory();
-      
-      TransportConfiguration[] transportConfigurations = resourceInput.getConfig().getFactory().getServerLocator()
-               .getStaticTransportConfigurations();
+    // Inner classes -------------------------------------------------
 
-      
-      if (log.isTraceEnabled())
-      {
-         log.trace("############################################## looking for a place on " + Arrays.toString(transportConfigurations));
-      }
-      
-      for (HornetQResourceRecovery resourceScan : configSet)
-      {
-         XARecoveryConfig xaRecoveryConfig = resourceScan.getConfig();
-
-         if (transportConfigurations != null)
-         {
-            TransportConfiguration[] xaConfigurations = xaRecoveryConfig.getHornetQConnectionFactory().getServerLocator()
-                  .getStaticTransportConfigurations();
-            
-            if (log.isTraceEnabled())
-            {
-               log.trace("Checking " + Arrays.toString(transportConfigurations) + " against " + Arrays.toString(xaConfigurations));
-            }
-
-            if (xaConfigurations == null)
-            {
-               continue;
-            }
-            if (transportConfigurations.length != xaConfigurations.length)
-            {
-               if (log.isTraceEnabled())
-               {
-                  log.trace(Arrays.toString(transportConfigurations) + " != " + Arrays.toString(xaConfigurations) + " because of size");
-               }
-               continue;
-            }
-            boolean theSame = true;
-            for (int i = 0; i < transportConfigurations.length; i++)
-            {
-               TransportConfiguration tc = transportConfigurations[i];
-               TransportConfiguration xaTc = xaConfigurations[i];
-               if (!tc.equals(xaTc))
-               {
-                  log.info(Arrays.toString(transportConfigurations) + " != " + Arrays.toString(xaConfigurations) + " because of " + tc + " != " + xaTc);
-                  theSame = false;
-                  break;
-               }
-            }
-            if (theSame)
-            {
-               return resourceScan;
-            }
-         } else
-         {
-            DiscoveryGroupConfiguration discoveryGroupConfiguration = xaRecoveryConfig.getHornetQConnectionFactory()
-                  .getServerLocator().getDiscoveryGroupConfiguration();
-            if (discoveryGroupConfiguration != null && discoveryGroupConfiguration.equals(factory.getDiscoveryGroupConfiguration()))
-            {
-               return resourceScan;
-            }
-         }
-      }
-
-      return null;
-
-   }
-
-   // Inner classes -------------------------------------------------
-
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java	2012-03-08 16:08:38 UTC (rev 12270)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java	2012-03-08 22:57:55 UTC (rev 12271)
@@ -22,6 +22,7 @@
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.api.core.client.SessionFailureListener;
 import org.hornetq.core.logging.Logger;
@@ -316,7 +317,14 @@
 
          try
          {
-            serverLocator = xaRecoveryConfig.getHornetQConnectionFactory().getServerLocator();
+            if (xaRecoveryConfig.getDiscoveryConfiguration() != null)
+            {
+               serverLocator = HornetQClient.createServerLocator(xaRecoveryConfig.isHA(), xaRecoveryConfig.getDiscoveryConfiguration());
+            }
+            else
+            {
+               serverLocator = HornetQClient.createServerLocator(xaRecoveryConfig.isHA(), xaRecoveryConfig.getTransportConfig());
+            }
             serverLocator.disableFinalizeCheck();
             csf = serverLocator.createSessionFactory();
             if (xaRecoveryConfig.getUsername() == null)
@@ -334,10 +342,29 @@
                                       1);
             }
          }
-         catch (HornetQException e)
+         catch (Throwable e)
          {
+            log.warn("Can't connect to " + xaRecoveryConfig + " on auto-generated resource recovery", e);
+            if (log.isDebugEnabled())
+            {
+               log.debug(e.getMessage(), e);
+            }
+            
+            try
+            {
+               if (cs != null) cs.close();
+               if (serverLocator != null) serverLocator.close();
+            }
+            catch (Throwable ignored)
+            {
+               if (log.isTraceEnabled())
+               {
+                  log.trace(e.getMessage(), ignored);
+               }
+            }
             continue;
          }
+         
          cs.addFailureListener(this);
 
          synchronized (HornetQXAResourceWrapper.lock)
@@ -392,9 +419,9 @@
             oldServerLocator.close();
          }
       }
-      catch (Exception ignored)
+      catch (Throwable ignored)
       {
-         HornetQXAResourceWrapper.log.trace("Ignored error during close", ignored);
+         HornetQXAResourceWrapper.log.debug("Ignored error during close", ignored);
       }
    }
 
@@ -410,10 +437,9 @@
    {
       log.warn(e.getMessage(), e);
 
-      if (e.errorCode == XAException.XA_RETRY)
-      {
-         close();
-      }
+
+      // If any exception happened, we close the connection so we may start fresh
+      close();
       throw e;
    }
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java	2012-03-08 16:08:38 UTC (rev 12270)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java	2012-03-08 22:57:55 UTC (rev 12271)
@@ -13,6 +13,10 @@
 
 package org.hornetq.jms.server.recovery;
 
+import java.util.Arrays;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.jms.client.HornetQConnectionFactory;
 
 /**
@@ -27,22 +31,46 @@
  */
 public class XARecoveryConfig
 {
-   private final HornetQConnectionFactory hornetQConnectionFactory;
+   
+   private final boolean ha;
+   private final TransportConfiguration[] transportConfiguration;
+   private final DiscoveryGroupConfiguration discoveryConfiguration;
    private final String username;
    private final String password;
 
-   public XARecoveryConfig(HornetQConnectionFactory hornetQConnectionFactory, String username, String password)
+   public XARecoveryConfig(final boolean ha, final TransportConfiguration[] transportConfiguration, final String username, final String password)
    {
-      this.hornetQConnectionFactory = hornetQConnectionFactory;
+      this.transportConfiguration = transportConfiguration;
+      this.discoveryConfiguration = null;
       this.username = username;
       this.password = password;
+      this.ha = ha;
    }
 
-   public HornetQConnectionFactory getHornetQConnectionFactory()
+   public XARecoveryConfig(final boolean ha, final DiscoveryGroupConfiguration discoveryConfiguration, final String username, final String password)
    {
-      return hornetQConnectionFactory;
+      this.discoveryConfiguration = discoveryConfiguration;
+      this.transportConfiguration = null;
+      this.username = username;
+      this.password = password;
+      this.ha = ha;
    }
+   
+   public boolean isHA()
+   {
+      return ha;
+   }
 
+   public DiscoveryGroupConfiguration getDiscoveryConfiguration()
+   {
+      return discoveryConfiguration;
+   }
+   
+   public TransportConfiguration[] getTransportConfig()
+   {
+      return transportConfiguration;
+   }
+
    public String getUsername()
    {
       return username;
@@ -53,28 +81,43 @@
       return password;
    }
    
-   public HornetQConnectionFactory getFactory()
+   /* (non-Javadoc)
+    * @see java.lang.Object#hashCode()
+    */
+   @Override
+   public int hashCode()
    {
-      return hornetQConnectionFactory;
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((discoveryConfiguration == null) ? 0 : discoveryConfiguration.hashCode());
+      result = prime * result + Arrays.hashCode(transportConfiguration);
+      return result;
    }
 
+   /* (non-Javadoc)
+    * @see java.lang.Object#equals(java.lang.Object)
+    */
    @Override
-   public boolean equals(Object o)
+   public boolean equals(Object obj)
    {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
-
-      XARecoveryConfig that = (XARecoveryConfig) o;
-
-      if (hornetQConnectionFactory != null ? !hornetQConnectionFactory.equals(that.hornetQConnectionFactory) : that.hornetQConnectionFactory != null)
+      if (this == obj)
+         return true;
+      if (obj == null)
          return false;
-      if (password != null ? !password.equals(that.password) : that.password != null) return false;
-      if (username != null ? !username.equals(that.username) : that.username != null) return false;
-
+      if (getClass() != obj.getClass())
+         return false;
+      XARecoveryConfig other = (XARecoveryConfig)obj;
+      if (discoveryConfiguration == null)
+      {
+         if (other.discoveryConfiguration != null)
+            return false;
+      }
+      else if (!discoveryConfiguration.equals(other.discoveryConfiguration))
+         return false;
+      if (!Arrays.equals(transportConfiguration, other.transportConfiguration))
+         return false;
       return true;
    }
-   
-   
 
    /* (non-Javadoc)
     * @see java.lang.Object#toString()
@@ -82,20 +125,12 @@
    @Override
    public String toString()
    {
-      return "XARecoveryConfig [hornetQConnectionFactory=" + hornetQConnectionFactory +
+      return "XARecoveryConfig [transportConfiguration = " + Arrays.toString(transportConfiguration) +
+             ", discoveryConfiguration = " + discoveryConfiguration +
              ", username=" +
              username +
              ", password=" +
              password +
              "]";
    }
-
-   @Override
-   public int hashCode()
-   {
-      int result = hornetQConnectionFactory != null ? hornetQConnectionFactory.hashCode() : 0;
-      result = 31 * result + (username != null ? username.hashCode() : 0);
-      result = 31 * result + (password != null ? password.hashCode() : 0);
-      return result;
-   }
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java	2012-03-08 16:08:38 UTC (rev 12270)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java	2012-03-08 22:57:55 UTC (rev 12271)
@@ -54,8 +54,7 @@
    {
       log.debug("registering recovery for factory : " + factory);
       
-      XARecoveryConfig xaRecoveryConfig = new XARecoveryConfig(factory, userName, password);
-      HornetQResourceRecovery resourceRecovery = new HornetQResourceRecovery(xaRecoveryConfig);
+      HornetQResourceRecovery resourceRecovery = newResourceRecovery(factory, userName, password);
       
       if (registry != null)
       {
@@ -69,6 +68,31 @@
       return resourceRecovery;
    }
 
+   /**
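+    * @param factory the connection factory whose server locator supplies the HA flag and the discovery group or static transport configurations
+    * @param userName the user name stored in the XARecoveryConfig
+    * @param password the password stored in the XARecoveryConfig
+    * @return a new HornetQResourceRecovery wrapping the XARecoveryConfig built from the factory's server locator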
+    */
+   private HornetQResourceRecovery newResourceRecovery(HornetQConnectionFactory factory,
+                                                       String userName,
+                                                       String password)
+   {
+      XARecoveryConfig xaRecoveryConfig;
+
+      if (factory.getServerLocator().getDiscoveryGroupConfiguration() != null)
+      {
+         xaRecoveryConfig = new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getDiscoveryGroupConfiguration(), userName, password);
+      }
+      else
+      {
+         xaRecoveryConfig = new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getStaticTransportConfigurations(), userName, password);
+      }
+      
+      HornetQResourceRecovery resourceRecovery = new HornetQResourceRecovery(xaRecoveryConfig);
+      return resourceRecovery;
+   }
+
    public void unRegister(HornetQResourceRecovery resourceRecovery)
    {
       registry.unRegister(resourceRecovery);



More information about the hornetq-commits mailing list