[hornetq-commits] JBoss hornetq SVN: r10357 - in branches/Branch_2_2_EAP: src/main/org/hornetq/jms/server/recovery and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Mar 23 10:52:23 EDT 2011
Author: ataylor
Date: 2011-03-23 10:52:22 -0400 (Wed, 23 Mar 2011)
New Revision: 10357
Added:
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
Modified:
branches/Branch_2_2_EAP/examples/javaee/xarecovery/server/jbossts-properties.xml
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceRecovery.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
Log:
https://issues.jboss.org/browse/JBPAPP-6049 - fixed recovery to handle multiple connectors and failover
Modified: branches/Branch_2_2_EAP/examples/javaee/xarecovery/server/jbossts-properties.xml
===================================================================
--- branches/Branch_2_2_EAP/examples/javaee/xarecovery/server/jbossts-properties.xml 2011-03-23 03:54:43 UTC (rev 10356)
+++ branches/Branch_2_2_EAP/examples/javaee/xarecovery/server/jbossts-properties.xml 2011-03-23 14:52:22 UTC (rev 10357)
@@ -242,11 +242,17 @@
<property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ1"
value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"/>
+ <!--you'll need something like this if the HornetQ Server is remote-->
<!--
<property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ2"
value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445"/>-->
+ <!--you'll need something like this if the HornetQ Server is remote and has failover configured-->
+ <!--
+ <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ2"
+ value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost2,port=5446"/>-->
+
<property name="com.arjuna.ats.jta.xaRecoveryNode" value="1"/>
</properties>
<properties depends="arjuna,txoj,jta" name="recoverymanager">
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceRecovery.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceRecovery.java 2011-03-23 03:54:43 UTC (rev 10356)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceRecovery.java 2011-03-23 14:52:22 UTC (rev 10357)
@@ -20,14 +20,19 @@
import com.arjuna.ats.jta.recovery.XAResourceRecovery;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.logging.Logger;
/**
*
* A XAResourceRecovery instance that can be used to recover any JMS provider.
+ *
+ * In reality only recover,rollback and commit will be called but we still need to
+ * be implement all methods just in case.
*
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
* @version <tt>$Revision: 1.1 $</tt>
*
* $Id$
@@ -58,14 +63,24 @@
HornetQXAResourceRecovery.log.trace(this + " intialise: " + config);
}
- ConfigParser parser = new ConfigParser(config);
- String connectorFactoryClassName = parser.getConnectorFactoryClassName();
- Map<String, Object> connectorParams = parser.getConnectorParameters();
- String username = parser.getUsername();
- String password = parser.getPassword();
+ String[] configs = config.split(";");
+ XARecoveryConfig[] xaRecoveryConfigs = new XARecoveryConfig[configs.length];
+ for (int i = 0, configsLength = configs.length; i < configsLength; i++)
+ {
+ String s = configs[i];
+ ConfigParser parser = new ConfigParser(s);
+ String connectorFactoryClassName = parser.getConnectorFactoryClassName();
+ Map<String, Object> connectorParams = parser.getConnectorParameters();
+ String username = parser.getUsername();
+ String password = parser.getPassword();
+ TransportConfiguration transportConfiguration = new TransportConfiguration(connectorFactoryClassName, connectorParams);
+ xaRecoveryConfigs[i] = new XARecoveryConfig(transportConfiguration, username, password);
+ }
- res = new HornetQXAResourceWrapper(connectorFactoryClassName, connectorParams, username, password);
+
+ res = new HornetQXAResourceWrapper(xaRecoveryConfigs);
+
if (HornetQXAResourceRecovery.log.isTraceEnabled())
{
HornetQXAResourceRecovery.log.trace(this + " initialised");
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java 2011-03-23 03:54:43 UTC (rev 10356)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java 2011-03-23 14:52:22 UTC (rev 10357)
@@ -40,6 +40,7 @@
* @author <a href="adrian at jboss.com">Adrian Brock</a>
* @author <a href="tim.fox at jboss.com">Tim Fox/a>
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*
* @version $Revision: 45341 $
*/
@@ -51,36 +52,26 @@
/** The state lock */
private static final Object lock = new Object();
- /** The JNDI lookup for the XA connection factory */
- private final String connectorFactoryClassName;
-
- private final Map<String, Object> connectorConfig;
-
- private final String username;
-
- private final String password;
-
private ServerLocator serverLocator;
private ClientSessionFactory csf;
private XAResource delegate;
- public HornetQXAResourceWrapper(final String connectorFactoryClassName,
- final Map<String, Object> connectorConfig,
- final String username,
- final String password)
+ private XARecoveryConfig[] xaRecoveryConfigs;
+
+ private TransportConfiguration currentConnection;
+
+ public HornetQXAResourceWrapper(XARecoveryConfig... xaRecoveryConfigs)
{
- this.connectorFactoryClassName = connectorFactoryClassName;
- this.connectorConfig = connectorConfig;
- this.username = username;
- this.password = password;
+
+ this.xaRecoveryConfigs = xaRecoveryConfigs;
}
public Xid[] recover(final int flag) throws XAException
{
- HornetQXAResourceWrapper.log.debug("Recover " + connectorFactoryClassName);
- XAResource xaResource = getDelegate();
+ XAResource xaResource = getDelegate(false);
+ HornetQXAResourceWrapper.log.debug("Recover " + currentConnection);
try
{
return xaResource.recover(flag);
@@ -93,8 +84,8 @@
public void commit(final Xid xid, final boolean onePhase) throws XAException
{
- HornetQXAResourceWrapper.log.debug("Commit " + connectorFactoryClassName + " xid " + " onePhase=" + onePhase);
- XAResource xaResource = getDelegate();
+ XAResource xaResource = getDelegate(true);
+ HornetQXAResourceWrapper.log.debug("Commit " + currentConnection + " xid " + " onePhase=" + onePhase);
try
{
xaResource.commit(xid, onePhase);
@@ -107,8 +98,8 @@
public void rollback(final Xid xid) throws XAException
{
- HornetQXAResourceWrapper.log.debug("Rollback " + connectorFactoryClassName + " xid ");
- XAResource xaResource = getDelegate();
+ XAResource xaResource = getDelegate(true);
+ HornetQXAResourceWrapper.log.debug("Rollback " + currentConnection + " xid ");
try
{
xaResource.rollback(xid);
@@ -121,8 +112,8 @@
public void forget(final Xid xid) throws XAException
{
- HornetQXAResourceWrapper.log.debug("Forget " + connectorFactoryClassName + " xid ");
- XAResource xaResource = getDelegate();
+ XAResource xaResource = getDelegate(false);
+ HornetQXAResourceWrapper.log.debug("Forget " + currentConnection + " xid ");
try
{
xaResource.forget(xid);
@@ -137,10 +128,10 @@
{
if (xaRes instanceof HornetQXAResourceWrapper)
{
- xaRes = ((HornetQXAResourceWrapper)xaRes).getDelegate();
+ xaRes = ((HornetQXAResourceWrapper)xaRes).getDelegate(false);
}
- XAResource xaResource = getDelegate();
+ XAResource xaResource = getDelegate(false);
try
{
return xaResource.isSameRM(xaRes);
@@ -153,7 +144,8 @@
public int prepare(final Xid xid) throws XAException
{
- XAResource xaResource = getDelegate();
+ XAResource xaResource = getDelegate(true);
+ HornetQXAResourceWrapper.log.debug("prepare " + currentConnection + " xid ");
try
{
return xaResource.prepare(xid);
@@ -166,7 +158,8 @@
public void start(final Xid xid, final int flags) throws XAException
{
- XAResource xaResource = getDelegate();
+ XAResource xaResource = getDelegate(false);
+ HornetQXAResourceWrapper.log.debug("start " + currentConnection + " xid ");
try
{
xaResource.start(xid, flags);
@@ -179,7 +172,8 @@
public void end(final Xid xid, final int flags) throws XAException
{
- XAResource xaResource = getDelegate();
+ XAResource xaResource = getDelegate(false);
+ HornetQXAResourceWrapper.log.debug("end " + currentConnection + " xid ");
try
{
xaResource.end(xid, flags);
@@ -192,7 +186,8 @@
public int getTransactionTimeout() throws XAException
{
- XAResource xaResource = getDelegate();
+ XAResource xaResource = getDelegate(false);
+ HornetQXAResourceWrapper.log.debug("getTransactionTimeout " + currentConnection + " xid ");
try
{
return xaResource.getTransactionTimeout();
@@ -205,7 +200,8 @@
public boolean setTransactionTimeout(final int seconds) throws XAException
{
- XAResource xaResource = getDelegate();
+ XAResource xaResource = getDelegate(false);
+ HornetQXAResourceWrapper.log.debug("setTransactionTimeout " + currentConnection + " xid ");
try
{
return xaResource.setTransactionTimeout(seconds);
@@ -218,7 +214,7 @@
public void connectionFailed(final HornetQException me, boolean failedOver)
{
- HornetQXAResourceWrapper.log.warn("Notified of connection failure in recovery connectionFactory for provider " + connectorFactoryClassName,
+ HornetQXAResourceWrapper.log.warn("Notified of connection failure in xa recovery connectionFactory for provider " + currentConnection + " will attempt reconnect on next pass",
me);
close();
}
@@ -233,7 +229,7 @@
* @return the connectionFactory
* @throws XAException for any problem
*/
- public XAResource getDelegate() throws XAException
+ public XAResource getDelegate(boolean retry) throws XAException
{
XAResource result = null;
Exception error = null;
@@ -243,20 +239,36 @@
}
catch (Exception e)
{
- HornetQXAResourceWrapper.log.error("********************************Failed to connect to server", e);
error = e;
}
if (result == null)
{
- XAException xae = new XAException("Error trying to connect to provider " + connectorFactoryClassName);
- xae.errorCode = XAException.XAER_RMERR;
- if (error != null)
+ //we should always throw a retry for certain methods comit etc, if not the tx is marked as a heuristic and
+ //all chaos is let loose
+ if(retry)
{
- xae.initCause(error);
+ XAException xae = new XAException("Connection unavailable for xa recovery");
+ xae.errorCode = XAException.XA_RETRY;
+ if (error != null)
+ {
+ xae.initCause(error);
+ }
+ HornetQXAResourceWrapper.log.debug("Cannot get connectionFactory XAResource", xae);
+ throw xae;
}
- HornetQXAResourceWrapper.log.debug("Cannot get connectionFactory XAResource", xae);
- throw xae;
+ else
+ {
+ XAException xae = new XAException("Error trying to connect to any providers for xa recovery");
+ xae.errorCode = XAException.XAER_RMERR;
+ if (error != null)
+ {
+ xae.initCause(error);
+ }
+ HornetQXAResourceWrapper.log.debug("Cannot get connectionFactory XAResource", xae);
+ throw xae;
+ }
+
}
return result;
@@ -279,28 +291,42 @@
}
}
- TransportConfiguration config = new TransportConfiguration(connectorFactoryClassName, connectorConfig);
- serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration[]{config});
- serverLocator.disableFinalizeCheck();
- csf = serverLocator.createSessionFactory();
- ClientSession cs = null;
-
- if (username == null)
+ for (XARecoveryConfig xaRecoveryConfig : xaRecoveryConfigs)
{
- cs = csf.createSession(true, false, false);
- }
- else
- {
- cs = csf.createSession(username, password, true, false, false, false, 1);
- }
- cs.addFailureListener(this);
- synchronized (HornetQXAResourceWrapper.lock)
- {
- delegate = cs;
- }
- return delegate;
+ ClientSession cs = null;
+
+ try
+ {
+ serverLocator = HornetQClient.createServerLocatorWithoutHA(xaRecoveryConfig.getTransportConfiguration());
+ serverLocator.disableFinalizeCheck();
+ csf = serverLocator.createSessionFactory();
+ if (xaRecoveryConfig.getUsername() == null)
+ {
+ cs = csf.createSession(true, false, false);
+ }
+ else
+ {
+ cs = csf.createSession(xaRecoveryConfig.getUsername(), xaRecoveryConfig.getPassword(), true, false, false, false, 1);
+ }
+ }
+ catch (HornetQException e)
+ {
+ continue;
+ }
+ cs.addFailureListener(this);
+
+ synchronized (HornetQXAResourceWrapper.lock)
+ {
+ delegate = cs;
+ currentConnection = xaRecoveryConfig.getTransportConfiguration();
+ }
+
+ return delegate;
+ }
+ currentConnection = null;
+ throw new HornetQException(HornetQException.NOT_CONNECTED);
}
/**
@@ -344,10 +370,9 @@
{
if (e.errorCode == XAException.XA_RETRY)
{
- HornetQXAResourceWrapper.log.debug("Fatal error in provider " + connectorFactoryClassName, e);
close();
}
- throw new XAException(XAException.XAER_RMFAIL);
+ throw e;
}
@Override
Added: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java (rev 0)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java 2011-03-23 14:52:22 UTC (rev 10357)
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.server.recovery;
+
+import org.hornetq.api.core.TransportConfiguration;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.com">Andy Taylor</a>
+ *
+ * A wrapper around info needed for the xa recovery resource
+ * Date: 3/23/11
+ * Time: 10:15 AM
+ */
+public class XARecoveryConfig
+{
+ private final TransportConfiguration transportConfiguration;
+ private final String username;
+ private final String password;
+
+ public XARecoveryConfig(TransportConfiguration transportConfiguration, String username, String password)
+ {
+ this.transportConfiguration = transportConfiguration;
+ this.username = username;
+ this.password = password;
+ }
+
+ public TransportConfiguration getTransportConfiguration()
+ {
+ return transportConfiguration;
+ }
+
+ public String getUsername()
+ {
+ return username;
+ }
+
+ public String getPassword()
+ {
+ return password;
+ }
+}
More information about the hornetq-commits
mailing list