[hornetq-commits] JBoss hornetq SVN: r10357 - in branches/Branch_2_2_EAP: src/main/org/hornetq/jms/server/recovery and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Mar 23 10:52:23 EDT 2011


Author: ataylor
Date: 2011-03-23 10:52:22 -0400 (Wed, 23 Mar 2011)
New Revision: 10357

Added:
   branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
Modified:
   branches/Branch_2_2_EAP/examples/javaee/xarecovery/server/jbossts-properties.xml
   branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceRecovery.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
Log:
https://issues.jboss.org/browse/JBPAPP-6049 - fixed recovery to handle multiple connectors and failover

Modified: branches/Branch_2_2_EAP/examples/javaee/xarecovery/server/jbossts-properties.xml
===================================================================
--- branches/Branch_2_2_EAP/examples/javaee/xarecovery/server/jbossts-properties.xml	2011-03-23 03:54:43 UTC (rev 10356)
+++ branches/Branch_2_2_EAP/examples/javaee/xarecovery/server/jbossts-properties.xml	2011-03-23 14:52:22 UTC (rev 10357)
@@ -242,11 +242,17 @@
 			<property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ1"
                    value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"/>
 
+       <!--you'll need something like this if the HornetQ Server is remote-->
        <!--
 			<property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ2"
                    value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445"/>-->
 
+       <!--you'll need something like this if the HornetQ Server is remote and has failover configured-->
+              <!--
+                <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ2"
+                          value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost2,port=5446"/>-->
 
+
 		   <property name="com.arjuna.ats.jta.xaRecoveryNode" value="1"/>
     </properties>
     <properties depends="arjuna,txoj,jta" name="recoverymanager">

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceRecovery.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceRecovery.java	2011-03-23 03:54:43 UTC (rev 10356)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceRecovery.java	2011-03-23 14:52:22 UTC (rev 10357)
@@ -20,14 +20,19 @@
 
 import com.arjuna.ats.jta.recovery.XAResourceRecovery;
 
+import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.core.logging.Logger;
 
 /**
  * 
  * A XAResourceRecovery instance that can be used to recover any JMS provider.
+ *
+ * In reality only recover, rollback and commit will be called, but we still need to
+ * implement all methods just in case.
  * 
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  * @version <tt>$Revision: 1.1 $</tt>
  *
  * $Id$
@@ -58,14 +63,24 @@
          HornetQXAResourceRecovery.log.trace(this + " intialise: " + config);
       }
 
-      ConfigParser parser = new ConfigParser(config);
-      String connectorFactoryClassName = parser.getConnectorFactoryClassName();
-      Map<String, Object> connectorParams = parser.getConnectorParameters();
-      String username = parser.getUsername();
-      String password = parser.getPassword();
+      String[] configs = config.split(";");
+      XARecoveryConfig[] xaRecoveryConfigs = new XARecoveryConfig[configs.length];
+      for (int i = 0, configsLength = configs.length; i < configsLength; i++)
+      {
+         String s = configs[i];
+         ConfigParser parser = new ConfigParser(s);
+         String connectorFactoryClassName = parser.getConnectorFactoryClassName();
+         Map<String, Object> connectorParams = parser.getConnectorParameters();
+         String username = parser.getUsername();
+         String password = parser.getPassword();
+         TransportConfiguration transportConfiguration = new TransportConfiguration(connectorFactoryClassName, connectorParams);
+         xaRecoveryConfigs[i] = new XARecoveryConfig(transportConfiguration, username, password);
+      }
 
-      res = new HornetQXAResourceWrapper(connectorFactoryClassName, connectorParams, username, password);
 
+
+      res = new HornetQXAResourceWrapper(xaRecoveryConfigs);
+
       if (HornetQXAResourceRecovery.log.isTraceEnabled())
       {
          HornetQXAResourceRecovery.log.trace(this + " initialised");

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java	2011-03-23 03:54:43 UTC (rev 10356)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java	2011-03-23 14:52:22 UTC (rev 10357)
@@ -40,6 +40,7 @@
  * @author <a href="adrian at jboss.com">Adrian Brock</a>
  * @author <a href="tim.fox at jboss.com">Tim Fox/a>
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  * 
  * @version $Revision: 45341 $
  */
@@ -51,36 +52,26 @@
    /** The state lock */
    private static final Object lock = new Object();
 
-   /** The JNDI lookup for the XA connection factory */
-   private final String connectorFactoryClassName;
-
-   private final Map<String, Object> connectorConfig;
-
-   private final String username;
-
-   private final String password;
-
    private ServerLocator serverLocator;
    
    private ClientSessionFactory csf;
 
    private XAResource delegate;
 
-   public HornetQXAResourceWrapper(final String connectorFactoryClassName,
-                                   final Map<String, Object> connectorConfig,
-                                   final String username,
-                                   final String password)
+   private XARecoveryConfig[] xaRecoveryConfigs;
+
+   private TransportConfiguration currentConnection;
+
+   public HornetQXAResourceWrapper(XARecoveryConfig... xaRecoveryConfigs)
    {
-      this.connectorFactoryClassName = connectorFactoryClassName;
-      this.connectorConfig = connectorConfig;
-      this.username = username;
-      this.password = password;
+
+      this.xaRecoveryConfigs = xaRecoveryConfigs;
    }
 
    public Xid[] recover(final int flag) throws XAException
    {
-      HornetQXAResourceWrapper.log.debug("Recover " + connectorFactoryClassName);
-      XAResource xaResource = getDelegate();
+      XAResource xaResource = getDelegate(false);
+      HornetQXAResourceWrapper.log.debug("Recover " + currentConnection);
       try
       {
          return xaResource.recover(flag);
@@ -93,8 +84,8 @@
 
    public void commit(final Xid xid, final boolean onePhase) throws XAException
    {
-      HornetQXAResourceWrapper.log.debug("Commit " + connectorFactoryClassName + " xid " + " onePhase=" + onePhase);
-      XAResource xaResource = getDelegate();
+      XAResource xaResource = getDelegate(true);
+      HornetQXAResourceWrapper.log.debug("Commit " + currentConnection + " xid " + " onePhase=" + onePhase);
       try
       {
          xaResource.commit(xid, onePhase);
@@ -107,8 +98,8 @@
 
    public void rollback(final Xid xid) throws XAException
    {
-      HornetQXAResourceWrapper.log.debug("Rollback " + connectorFactoryClassName + " xid ");
-      XAResource xaResource = getDelegate();
+      XAResource xaResource = getDelegate(true);
+      HornetQXAResourceWrapper.log.debug("Rollback " + currentConnection + " xid ");
       try
       {
          xaResource.rollback(xid);
@@ -121,8 +112,8 @@
 
    public void forget(final Xid xid) throws XAException
    {
-      HornetQXAResourceWrapper.log.debug("Forget " + connectorFactoryClassName + " xid ");
-      XAResource xaResource = getDelegate();
+      XAResource xaResource = getDelegate(false);
+      HornetQXAResourceWrapper.log.debug("Forget " + currentConnection + " xid ");
       try
       {
          xaResource.forget(xid);
@@ -137,10 +128,10 @@
    {
       if (xaRes instanceof HornetQXAResourceWrapper)
       {
-         xaRes = ((HornetQXAResourceWrapper)xaRes).getDelegate();
+         xaRes = ((HornetQXAResourceWrapper)xaRes).getDelegate(false);
       }
 
-      XAResource xaResource = getDelegate();
+      XAResource xaResource = getDelegate(false);
       try
       {
          return xaResource.isSameRM(xaRes);
@@ -153,7 +144,8 @@
 
    public int prepare(final Xid xid) throws XAException
    {
-      XAResource xaResource = getDelegate();
+      XAResource xaResource = getDelegate(true);
+      HornetQXAResourceWrapper.log.debug("prepare " + currentConnection + " xid ");
       try
       {
          return xaResource.prepare(xid);
@@ -166,7 +158,8 @@
 
    public void start(final Xid xid, final int flags) throws XAException
    {
-      XAResource xaResource = getDelegate();
+      XAResource xaResource = getDelegate(false);
+      HornetQXAResourceWrapper.log.debug("start " + currentConnection + " xid ");
       try
       {
          xaResource.start(xid, flags);
@@ -179,7 +172,8 @@
 
    public void end(final Xid xid, final int flags) throws XAException
    {
-      XAResource xaResource = getDelegate();
+      XAResource xaResource = getDelegate(false);
+      HornetQXAResourceWrapper.log.debug("end " + currentConnection + " xid ");
       try
       {
          xaResource.end(xid, flags);
@@ -192,7 +186,8 @@
 
    public int getTransactionTimeout() throws XAException
    {
-      XAResource xaResource = getDelegate();
+      XAResource xaResource = getDelegate(false);
+      HornetQXAResourceWrapper.log.debug("getTransactionTimeout " + currentConnection + " xid ");
       try
       {
          return xaResource.getTransactionTimeout();
@@ -205,7 +200,8 @@
 
    public boolean setTransactionTimeout(final int seconds) throws XAException
    {
-      XAResource xaResource = getDelegate();
+      XAResource xaResource = getDelegate(false);
+      HornetQXAResourceWrapper.log.debug("setTransactionTimeout " + currentConnection + " xid ");
       try
       {
          return xaResource.setTransactionTimeout(seconds);
@@ -218,7 +214,7 @@
 
    public void connectionFailed(final HornetQException me, boolean failedOver)
    {
-      HornetQXAResourceWrapper.log.warn("Notified of connection failure in recovery connectionFactory for provider " + connectorFactoryClassName,
+      HornetQXAResourceWrapper.log.warn("Notified of connection failure in xa recovery connectionFactory for provider " + currentConnection + " will attempt reconnect on next pass",
                                         me);
       close();
    }
@@ -233,7 +229,7 @@
     * @return the connectionFactory
     * @throws XAException for any problem
     */
-   public XAResource getDelegate() throws XAException
+   public XAResource getDelegate(boolean retry) throws XAException
    {
       XAResource result = null;
       Exception error = null;
@@ -243,20 +239,36 @@
       }
       catch (Exception e)
       {
-         HornetQXAResourceWrapper.log.error("********************************Failed to connect to server", e);
          error = e;
       }
 
       if (result == null)
       {
-         XAException xae = new XAException("Error trying to connect to provider " + connectorFactoryClassName);
-         xae.errorCode = XAException.XAER_RMERR;
-         if (error != null)
+         //we should always throw a retry for certain methods (commit etc.); if not, the tx is marked as a heuristic and
+         //all chaos is let loose
+         if(retry)
          {
-            xae.initCause(error);
+            XAException xae = new XAException("Connection unavailable for xa recovery");
+            xae.errorCode = XAException.XA_RETRY;
+            if (error != null)
+            {
+               xae.initCause(error);
+            }
+            HornetQXAResourceWrapper.log.debug("Cannot get connectionFactory XAResource", xae);
+            throw xae;
          }
-         HornetQXAResourceWrapper.log.debug("Cannot get connectionFactory XAResource", xae);
-         throw xae;
+         else
+         {
+            XAException xae = new XAException("Error trying to connect to any providers for xa recovery");
+            xae.errorCode = XAException.XAER_RMERR;
+            if (error != null)
+            {
+               xae.initCause(error);
+            }
+            HornetQXAResourceWrapper.log.debug("Cannot get connectionFactory XAResource", xae);
+            throw xae;
+         }
+
       }
 
       return result;
@@ -279,28 +291,42 @@
          }
       }
 
-      TransportConfiguration config = new TransportConfiguration(connectorFactoryClassName, connectorConfig);
-      serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration[]{config});
-      serverLocator.disableFinalizeCheck();
-      csf = serverLocator.createSessionFactory();
-      ClientSession cs = null;
-
-      if (username == null)
+      for (XARecoveryConfig xaRecoveryConfig : xaRecoveryConfigs)
       {
-         cs = csf.createSession(true, false, false);
-      }
-      else
-      {
-         cs = csf.createSession(username, password, true, false, false, false, 1);
-      }
-      cs.addFailureListener(this);
 
-      synchronized (HornetQXAResourceWrapper.lock)
-      {
-         delegate = cs;
-      }
 
-      return delegate;
+         ClientSession cs = null;
+
+         try
+         {
+            serverLocator = HornetQClient.createServerLocatorWithoutHA(xaRecoveryConfig.getTransportConfiguration());
+            serverLocator.disableFinalizeCheck();
+            csf = serverLocator.createSessionFactory();
+            if (xaRecoveryConfig.getUsername() == null)
+            {
+               cs = csf.createSession(true, false, false);
+            }
+            else
+            {
+               cs = csf.createSession(xaRecoveryConfig.getUsername(), xaRecoveryConfig.getPassword(), true, false, false, false, 1);
+            }
+         }
+         catch (HornetQException e)
+         {
+            continue;
+         }
+         cs.addFailureListener(this);
+
+         synchronized (HornetQXAResourceWrapper.lock)
+         {
+            delegate = cs;
+            currentConnection = xaRecoveryConfig.getTransportConfiguration();
+         }
+
+         return delegate;
+       }
+      currentConnection = null;
+      throw new HornetQException(HornetQException.NOT_CONNECTED);
    }
 
    /**
@@ -344,10 +370,9 @@
    {
       if (e.errorCode == XAException.XA_RETRY)
       {
-         HornetQXAResourceWrapper.log.debug("Fatal error in provider " + connectorFactoryClassName, e);
          close();
       }
-      throw new XAException(XAException.XAER_RMFAIL);
+      throw e;
    }
 
    @Override

Added: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java	                        (rev 0)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java	2011-03-23 14:52:22 UTC (rev 10357)
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *  Red Hat licenses this file to you under the Apache License, version
+ *  2.0 (the "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ *  implied.  See the License for the specific language governing
+ *  permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.server.recovery;
+
+import org.hornetq.api.core.TransportConfiguration;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.com">Andy Taylor</a>
+ *
+ * A wrapper around info needed for the xa recovery resource
+ *         Date: 3/23/11
+ *         Time: 10:15 AM
+ */
+public class XARecoveryConfig // value holder: one connector plus the credentials used with it for XA recovery
+{
+   private final TransportConfiguration transportConfiguration; // connector this entry describes
+   private final String username; // HornetQXAResourceWrapper creates the session without credentials when this is null
+   private final String password; // passed to createSession together with username when username is non-null
+
+   public XARecoveryConfig(TransportConfiguration transportConfiguration, String username, String password) // stores the three values as given; no validation or copying
+   {
+      this.transportConfiguration = transportConfiguration;
+      this.username = username;
+      this.password = password;
+   }
+
+   public TransportConfiguration getTransportConfiguration() // returns the same instance that was passed to the constructor
+   {
+      return transportConfiguration;
+   }
+
+   public String getUsername() // may be null; callers in this commit test for null before using it
+   {
+      return username;
+   }
+
+   public String getPassword() // returned as stored
+   {
+      return password;
+   }
+}



More information about the hornetq-commits mailing list