[jboss-cvs] JBoss Messaging SVN: r1988 - trunk/src/main/org/jboss/jms/recovery.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jan 18 16:24:04 EST 2007
Author: timfox
Date: 2007-01-18 16:24:04 -0500 (Thu, 18 Jan 2007)
New Revision: 1988
Added:
trunk/src/main/org/jboss/jms/recovery/BridgeXAResourceRecovery.java
trunk/src/main/org/jboss/jms/recovery/BridgeXAResourceWrapper.java
Log:
Bridge recovery
Added: trunk/src/main/org/jboss/jms/recovery/BridgeXAResourceRecovery.java
===================================================================
--- trunk/src/main/org/jboss/jms/recovery/BridgeXAResourceRecovery.java (rev 0)
+++ trunk/src/main/org/jboss/jms/recovery/BridgeXAResourceRecovery.java 2007-01-18 21:24:04 UTC (rev 1988)
@@ -0,0 +1,188 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.jms.recovery;
+
+import java.io.InputStream;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+import javax.transaction.xa.XAResource;
+
+import org.jboss.logging.Logger;
+
+import com.arjuna.ats.jta.recovery.XAResourceRecovery;
+
+/**
+ * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * @author <a href="juha at jboss.com">Juha Lindfors</a>
+ * @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ *
+ * @version $Revision: 1.1 $
+ */
+public class BridgeXAResourceRecovery implements XAResourceRecovery
+{
+ private boolean trace = log.isTraceEnabled();
+
+ private static final Logger log = Logger.getLogger(BridgeXAResourceRecovery.class);
+
+ private BridgeXAResourceWrapper wrapper;
+
+ private boolean working = false;
+
+ private Hashtable jndiProperties;
+
+ private String connectionFactoryLookup;
+
+ public BridgeXAResourceRecovery()
+ {
+ if(trace) log.trace("Constructing BridgeXAResourceRecovery2..");
+ }
+
+ public boolean initialise(String config)
+ {
+ if (log.isTraceEnabled()) { log.trace(this + " intialise: " + config); }
+
+ StringTokenizer tok = new StringTokenizer(config, ",");
+
+ if (tok.countTokens() != 2)
+ {
+ log.error("Invalid config: " + config);
+ return false;
+ }
+
+ String provider = tok.nextToken();
+
+ String propsFile = tok.nextToken();
+
+ try
+ {
+ //The config should point to a properties file on the classpath that holds the actual config
+ InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(propsFile);
+
+ Properties props = new Properties();
+
+ props.load(is);
+
+ /*
+ * provider1.jndi.prop1=xxxx
+ * provider1.jndi.prop2=yyyy
+ * provider1.jndi.prop3=zzzz
+ *
+ * provider1.xaconnectionfactorylookup=xyz
+ *
+ * provider2.jndi.prop1=xxxx
+ * provider2.jndi.prop2=yyyy
+ * provider2.jndi.prop3=zzzz
+ *
+ * provider2.xaconnectionfactorylookup=xyz
+ *
+ */
+
+ Iterator iter = props.entrySet().iterator();
+
+ String jndiPrefix = provider + ".jndi.";
+
+ String cfKey = provider + ".xaconnectionfactorylookup";
+
+ jndiProperties = new Hashtable();
+
+ while (iter.hasNext())
+ {
+ Map.Entry entry = (Map.Entry)iter.next();
+
+ String key = (String)entry.getKey();
+ String value = (String)entry.getValue();
+
+ if (key.startsWith(jndiPrefix))
+ {
+ String actualKey = key.substring(jndiPrefix.length());
+
+ jndiProperties.put(actualKey, value);
+ }
+ else if (key.equals(cfKey))
+ {
+ connectionFactoryLookup = value;
+ }
+ }
+
+ if (connectionFactoryLookup == null)
+ {
+ log.error("Key " + cfKey + " does not exist in config");
+ return false;
+ }
+
+ if (log.isTraceEnabled()) { log.trace(this + " initialised"); }
+
+ return true;
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to load config file: " + config, e);
+
+ return false;
+ }
+ }
+
+ public boolean hasMoreResources()
+ {
+ if (log.isTraceEnabled()) { log.trace(this + " hasMoreResources"); }
+
+ // If the XAResource is already working
+ if (working)
+ {
+ log.info("Returning false");
+ return false;
+ }
+
+ // Have we initialized yet?
+ if (wrapper == null)
+ {
+ wrapper = new BridgeXAResourceWrapper(jndiProperties, connectionFactoryLookup);
+ }
+
+ // Test the connection
+ try
+ {
+ wrapper.getTransactionTimeout();
+ working = true;
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ log.info("Returning: " + working);
+
+ // This will return false until we get a successful connection
+ return working;
+ }
+
+ public XAResource getXAResource()
+ {
+ if (log.isTraceEnabled()) { log.trace(this + " getXAResource"); }
+
+ return wrapper;
+ }
+}
+
Added: trunk/src/main/org/jboss/jms/recovery/BridgeXAResourceWrapper.java
===================================================================
--- trunk/src/main/org/jboss/jms/recovery/BridgeXAResourceWrapper.java (rev 0)
+++ trunk/src/main/org/jboss/jms/recovery/BridgeXAResourceWrapper.java 2007-01-18 21:24:04 UTC (rev 1988)
@@ -0,0 +1,446 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.jms.recovery;
+
+import java.util.Hashtable;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.naming.InitialContext;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.logging.Logger;
+
+/**
+ * BridgeXAResourceWrapper.
+ *
+ * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ * @version $Revision: 1.2 $
+ */
+public class BridgeXAResourceWrapper implements XAResource, ExceptionListener
+{
+ /** The log */
+ private static final Logger log = Logger.getLogger(XAResourceWrapper.class);
+
+ private boolean trace = log.isTraceEnabled();
+
+ /** The state lock */
+ private static final Object lock = new Object();
+
+ /** The connection */
+ private XAConnection connection;
+
+ /** The delegate XAResource */
+ private XAResource delegate;
+
+ private Hashtable jndiProperties;
+
+ private String connectionFactoryLookup;
+
+ public BridgeXAResourceWrapper(Hashtable jndiProperties, String connectionFactoryLookup)
+ {
+ this.jndiProperties = jndiProperties;
+
+ this.connectionFactoryLookup = connectionFactoryLookup;
+ }
+
+ public Xid[] recover(int flag) throws XAException
+ {
+ XAResource xaResource = getDelegate();
+
+ if (trace) { log.trace(this + " Calling recover"); }
+
+ try
+ {
+ return xaResource.recover(flag);
+ }
+ catch (XAException e)
+ {
+ log.info("Caught exception in recover", e);
+ throw check(e);
+ }
+ catch (Exception e)
+ {
+ log.info("Caught e in recover", e);
+ throw new RuntimeException(e.toString());
+ }
+ }
+
+ public void commit(Xid xid, boolean onePhase) throws XAException
+ {
+ XAResource xaResource = getDelegate();
+
+ if (trace) { log.trace(this + " Calling commit"); }
+
+ try
+ {
+ xaResource.commit(xid, onePhase);
+ }
+ catch (XAException e)
+ {
+ log.info("Caught exception in commit", e);
+ throw check(e);
+ }
+ catch (Exception e)
+ {
+ log.info("Caught e in commit", e);
+ throw new RuntimeException(e.toString());
+ }
+ }
+
+ public void rollback(Xid xid) throws XAException
+ {
+ XAResource xaResource = getDelegate();
+
+ if (trace) { log.trace(this + " Calling rollback"); }
+
+ try
+ {
+ xaResource.rollback(xid);
+ }
+ catch (XAException e)
+ {
+ log.info("Caught exception in rollback", e);
+ throw check(e);
+ }
+ catch (Exception e)
+ {
+ log.info("Caught e in rollback", e);
+ throw new RuntimeException(e.toString());
+ }
+ }
+
+ public void forget(Xid xid) throws XAException
+ {
+ XAResource xaResource = getDelegate();
+
+ if (trace) { log.trace(this + " Calling forget"); }
+
+
+ try
+ {
+ xaResource.forget(xid);
+ }
+ catch (XAException e)
+ {
+ throw check(e);
+ }
+ catch (Exception e)
+ {
+ log.info("Caught e in forget", e);
+ throw new RuntimeException(e.toString());
+ }
+ }
+
+ public boolean isSameRM(XAResource xaRes) throws XAException
+ {
+ if (xaRes instanceof XAResourceWrapper)
+ xaRes = ((XAResourceWrapper) xaRes).getDelegate();
+
+ if (trace) { log.trace(this + " Calling isSameRM"); }
+
+
+ XAResource xaResource = getDelegate();
+ try
+ {
+ return xaResource.isSameRM(xaRes);
+ }
+ catch (XAException e)
+ {
+ throw check(e);
+ }
+ catch (Exception e)
+ {
+ log.info("Caught e in issamerm", e);
+ throw new RuntimeException(e.toString());
+ }
+ }
+
+ public int prepare(Xid xid) throws XAException
+ {
+ XAResource xaResource = getDelegate();
+
+ if (trace) { log.trace(this + " Calling prepare"); }
+
+
+ try
+ {
+ return xaResource.prepare(xid);
+ }
+ catch (XAException e)
+ {
+ throw check(e);
+ }
+ catch (Exception e)
+ {
+ log.info("Caught e in prepare", e);
+ throw new RuntimeException(e.toString());
+ }
+ }
+
+ public void start(Xid xid, int flags) throws XAException
+ {
+ XAResource xaResource = getDelegate();
+
+ if (trace) { log.trace(this + " Calling start"); }
+
+ try
+ {
+ xaResource.start(xid, flags);
+ }
+ catch (XAException e)
+ {
+ throw check(e);
+ }
+ catch (Exception e)
+ {
+ log.info("Caught e in start", e);
+ throw new RuntimeException(e.toString());
+ }
+ }
+
+ public void end(Xid xid, int flags) throws XAException
+ {
+ XAResource xaResource = getDelegate();
+
+ if (trace) { log.trace(this + " Calling end"); }
+
+ try
+ {
+ xaResource.end(xid, flags);
+ }
+ catch (XAException e)
+ {
+ throw check(e);
+ }
+ catch (Exception e)
+ {
+ log.info("Caught e in end", e);
+ throw new RuntimeException(e.toString());
+ }
+ }
+
+ public int getTransactionTimeout() throws XAException
+ {
+ XAResource xaResource = getDelegate();
+
+ if (trace) { log.trace(this + " Calling getTransactionTimeout"); }
+
+ try
+ {
+ return xaResource.getTransactionTimeout();
+ }
+ catch (XAException e)
+ {
+ throw check(e);
+ }
+ catch (Exception e)
+ {
+ log.info("Caught e in getTransactiontimeoiut", e);
+ throw new RuntimeException(e.toString());
+ }
+ }
+
+ public boolean setTransactionTimeout(int seconds) throws XAException
+ {
+ XAResource xaResource = getDelegate();
+
+ if (trace) { log.trace(this + " Calling setTransactionTimeout"); }
+ try
+ {
+ return xaResource.setTransactionTimeout(seconds);
+ }
+ catch (XAException e)
+ {
+ throw check(e);
+ }
+ catch (Exception e)
+ {
+ log.info("Caught e in settranactiotntimeoiut", e);
+ throw new RuntimeException(e.toString());
+ }
+ }
+
+ public void onException(JMSException exception)
+ {
+ log.warn("Notified of connection failure in recovery delegate", exception);
+ close();
+ }
+
+ /**
+ * Get the delegate XAResource
+ *
+ * @return the delegate
+ * @throws XAException for any problem
+ */
+ public XAResource getDelegate() throws XAException
+ {
+ XAResource result = null;
+ Exception error = null;
+ try
+ {
+ result = connect();
+ }
+ catch (Exception e)
+ {
+ error = e;
+ }
+
+ if (result == null)
+ {
+ XAException xae = new XAException("Error trying to connect");
+ xae.errorCode = XAException.XAER_RMERR;
+ if (error != null)
+ xae.initCause(error);
+ log.debug("Cannot get delegate XAResource", xae);
+ throw xae;
+ }
+
+ return result;
+ }
+
+ /**
+ * Connect to the server if not already done so
+ *
+ * @return the delegate XAResource
+ * @throws Exception for any problem
+ */
+ protected XAResource connect() throws Exception
+ {
+ // Do we already have a valid delegate?
+ synchronized (lock)
+ {
+ if (delegate != null)
+ return delegate;
+ }
+
+ if (trace) { log.trace(this + " Connecting"); }
+
+
+ // Create the connection
+ XAConnection xaConnection = getConnectionFactory().createXAConnection();
+ synchronized (lock)
+ {
+ connection = xaConnection;
+ }
+
+ // Retrieve the delegate XAResource
+ try
+ {
+ XASession session = connection.createXASession();
+ XAResource result = session.getXAResource();
+ synchronized (lock)
+ {
+ delegate = result;
+ }
+ return delegate;
+ }
+ catch (Exception e)
+ {
+ close();
+ throw e;
+ }
+ }
+
+ /**
+ * Get the XAConnectionFactory
+ *
+ * @return the connection
+ * @throws Exception for any problem
+ */
+ protected XAConnectionFactory getConnectionFactory() throws Exception
+ {
+ InitialContext ic = null;
+
+ try
+ {
+ ic = new InitialContext(jndiProperties);
+
+ XAConnectionFactory connectionFactory = (XAConnectionFactory)ic.lookup(connectionFactoryLookup);
+
+ return connectionFactory;
+ }
+ finally
+ {
+ if (ic != null)
+ {
+ ic.close();
+ }
+ }
+ }
+
+ /**
+ * Close the connection
+ */
+ public void close()
+ {
+ if (trace) { log.trace(this + " Close"); }
+
+ try
+ {
+ XAConnection oldConnection = null;
+ synchronized (lock)
+ {
+ oldConnection = connection;
+ connection = null;
+ delegate = null;
+ }
+ if (oldConnection != null)
+ oldConnection.close();
+ }
+ catch (Exception ignored)
+ {
+ log.trace("Ignored error during close", ignored);
+ }
+ }
+
+ /**
+ * Check whether an XAException is fatal. If it is an RM problem
+ * we close the connection so the next call will reconnect.
+ *
+ * @param e the xa exception
+ * @return never
+ * @throws XAException always
+ */
+ protected XAException check(XAException e) throws XAException
+ {
+ if (trace) { log.trace(this + " check " + e); }
+
+ if (e.errorCode == XAException.XAER_RMERR || e.errorCode == XAException.XAER_RMFAIL)
+ {
+ log.debug("Fatal error", e);
+ close();
+ }
+ throw e;
+ }
+
+ protected void finalize() throws Throwable
+ {
+ close();
+ }
+}
More information about the jboss-cvs-commits
mailing list