[jboss-cvs] JBoss Messaging SVN: r1988 - trunk/src/main/org/jboss/jms/recovery.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jan 18 16:24:04 EST 2007


Author: timfox
Date: 2007-01-18 16:24:04 -0500 (Thu, 18 Jan 2007)
New Revision: 1988

Added:
   trunk/src/main/org/jboss/jms/recovery/BridgeXAResourceRecovery.java
   trunk/src/main/org/jboss/jms/recovery/BridgeXAResourceWrapper.java
Log:
Bridge recovery


Added: trunk/src/main/org/jboss/jms/recovery/BridgeXAResourceRecovery.java
===================================================================
--- trunk/src/main/org/jboss/jms/recovery/BridgeXAResourceRecovery.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/recovery/BridgeXAResourceRecovery.java	2007-01-18 21:24:04 UTC (rev 1988)
@@ -0,0 +1,188 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.jms.recovery;
+
+import java.io.InputStream;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+import javax.transaction.xa.XAResource;
+
+import org.jboss.logging.Logger;
+
+import com.arjuna.ats.jta.recovery.XAResourceRecovery;
+
+/**
+ * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * @author <a href="juha at jboss.com">Juha Lindfors</a>
+ * @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ *
+ * @version $Revision: 1.1 $
+ */
+public class BridgeXAResourceRecovery implements XAResourceRecovery
+{
+   private boolean trace = log.isTraceEnabled();
+
+   private static final Logger log = Logger.getLogger(BridgeXAResourceRecovery.class);
+
+   private BridgeXAResourceWrapper wrapper;
+
+   private boolean working = false;
+   
+   private Hashtable jndiProperties;
+   
+   private String connectionFactoryLookup;
+
+   public BridgeXAResourceRecovery()
+   {
+      if(trace) log.trace("Constructing BridgeXAResourceRecovery2..");
+   }
+
+   public boolean initialise(String config)
+   {
+      if (log.isTraceEnabled()) { log.trace(this + " intialise: " + config); }
+      
+      StringTokenizer tok = new StringTokenizer(config, ",");
+      
+      if (tok.countTokens() != 2)
+      {
+         log.error("Invalid config: " + config);
+         return false;
+      }
+      
+      String provider = tok.nextToken();
+      
+      String propsFile = tok.nextToken();
+      
+      try
+      {
+         //The config should point to a properties file on the classpath that holds the actual config
+         InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(propsFile);
+         
+         Properties props = new Properties();
+         
+         props.load(is);
+         
+         /*
+          * provider1.jndi.prop1=xxxx
+          * provider1.jndi.prop2=yyyy
+          * provider1.jndi.prop3=zzzz
+          * 
+          * provider1.xaconnectionfactorylookup=xyz
+          * 
+          * provider2.jndi.prop1=xxxx
+          * provider2.jndi.prop2=yyyy
+          * provider2.jndi.prop3=zzzz
+          * 
+          * provider2.xaconnectionfactorylookup=xyz
+          *           
+          */
+         
+         Iterator iter = props.entrySet().iterator();
+         
+         String jndiPrefix = provider + ".jndi.";
+         
+         String cfKey = provider + ".xaconnectionfactorylookup";
+         
+         jndiProperties = new Hashtable();
+         
+         while (iter.hasNext())
+         {
+            Map.Entry entry = (Map.Entry)iter.next();
+            
+            String key = (String)entry.getKey();
+            String value = (String)entry.getValue();
+            
+            if (key.startsWith(jndiPrefix))
+            {
+               String actualKey = key.substring(jndiPrefix.length());
+               
+               jndiProperties.put(actualKey, value);
+            }
+            else if (key.equals(cfKey))
+            {
+               connectionFactoryLookup = value;
+            }
+         }
+         
+         if (connectionFactoryLookup == null)
+         {
+            log.error("Key " + cfKey + " does not exist in config");
+            return false;
+         }
+         
+         if (log.isTraceEnabled()) { log.trace(this + " initialised"); }
+         
+         return true;
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to load config file: " + config, e);
+         
+         return false;
+      }
+   }
+
+   public boolean hasMoreResources()
+   {
+      if (log.isTraceEnabled()) { log.trace(this + " hasMoreResources"); }
+            
+      // If the XAResource is already working
+      if (working)
+      {
+         log.info("Returning false");
+         return false;
+      }
+
+      // Have we initialized yet?
+      if (wrapper == null)
+      {
+         wrapper = new BridgeXAResourceWrapper(jndiProperties, connectionFactoryLookup);
+      }
+
+      // Test the connection
+      try
+      {
+         wrapper.getTransactionTimeout();
+         working = true;
+      }
+      catch (Exception ignored)
+      {
+      }
+      
+      log.info("Returning: " + working);
+
+      // This will return false until we get a successful connection
+      return working;
+   }
+
+   public XAResource getXAResource()
+   {
+      if (log.isTraceEnabled()) { log.trace(this + " getXAResource"); }
+      
+      return wrapper;
+   }
+}
+

Added: trunk/src/main/org/jboss/jms/recovery/BridgeXAResourceWrapper.java
===================================================================
--- trunk/src/main/org/jboss/jms/recovery/BridgeXAResourceWrapper.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/recovery/BridgeXAResourceWrapper.java	2007-01-18 21:24:04 UTC (rev 1988)
@@ -0,0 +1,446 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.jms.recovery;
+
+import java.util.Hashtable;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.naming.InitialContext;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.logging.Logger;
+
+/**
+ * BridgeXAResourceWrapper.
+ * 
+ * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ * @version $Revision: 1.2 $
+ */
+public class BridgeXAResourceWrapper implements XAResource, ExceptionListener
+{
+   /** The log */
+   private static final Logger log = Logger.getLogger(XAResourceWrapper.class);
+   
+   private boolean trace = log.isTraceEnabled();
+
+   /** The state lock */
+   private static final Object lock = new Object();
+   
+   /** The connection */
+   private XAConnection connection;
+   
+   /** The delegate XAResource */
+   private XAResource delegate;
+   
+   private Hashtable jndiProperties;
+   
+   private String connectionFactoryLookup;
+   
+   public BridgeXAResourceWrapper(Hashtable jndiProperties, String connectionFactoryLookup)
+   {
+      this.jndiProperties = jndiProperties;
+      
+      this.connectionFactoryLookup = connectionFactoryLookup;
+   }
+   
+   public Xid[] recover(int flag) throws XAException
+   {
+      XAResource xaResource = getDelegate();
+      
+      if (trace) { log.trace(this + " Calling recover"); }
+                  
+      try
+      {
+         return xaResource.recover(flag);
+      }
+      catch (XAException e)
+      {
+         log.info("Caught exception in recover", e);
+         throw check(e);
+      }
+      catch (Exception e)
+      {
+         log.info("Caught e in recover", e);
+         throw new RuntimeException(e.toString());
+      }
+   }
+
+   public void commit(Xid xid, boolean onePhase) throws XAException
+   {
+      XAResource xaResource = getDelegate();
+      
+      if (trace) { log.trace(this + " Calling commit"); }
+      
+      try
+      {
+         xaResource.commit(xid, onePhase);
+      }
+      catch (XAException e)
+      {
+         log.info("Caught exception in commit", e);
+         throw check(e);
+      }
+      catch (Exception e)
+      {
+         log.info("Caught e in commit", e);
+         throw new RuntimeException(e.toString());
+      }
+   }
+
+   public void rollback(Xid xid) throws XAException
+   {
+      XAResource xaResource = getDelegate();
+      
+      if (trace) { log.trace(this + " Calling rollback"); }
+      
+      try
+      {
+         xaResource.rollback(xid);
+      }
+      catch (XAException e)
+      {
+         log.info("Caught exception in rollback", e);
+         throw check(e);
+      }
+      catch (Exception e)
+      {
+         log.info("Caught e in rollback", e);
+         throw new RuntimeException(e.toString());
+      }
+   }
+
+   public void forget(Xid xid) throws XAException
+   {
+      XAResource xaResource = getDelegate();
+      
+      if (trace) { log.trace(this + " Calling forget"); }
+      
+      
+      try
+      {
+         xaResource.forget(xid);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+      catch (Exception e)
+      {
+         log.info("Caught e in forget", e);
+         throw new RuntimeException(e.toString());
+      }
+   }
+
+   public boolean isSameRM(XAResource xaRes) throws XAException
+   {
+      if (xaRes instanceof XAResourceWrapper)
+         xaRes = ((XAResourceWrapper) xaRes).getDelegate();
+      
+      if (trace) { log.trace(this + " Calling isSameRM"); }
+      
+
+      XAResource xaResource = getDelegate();
+      try
+      {
+         return xaResource.isSameRM(xaRes);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+      catch (Exception e)
+      {
+         log.info("Caught e in issamerm", e);
+         throw new RuntimeException(e.toString());
+      }
+   }
+
+   public int prepare(Xid xid) throws XAException
+   {
+      XAResource xaResource = getDelegate();
+      
+      if (trace) { log.trace(this + " Calling prepare"); }
+      
+      
+      try
+      {
+         return xaResource.prepare(xid);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+      catch (Exception e)
+      {
+         log.info("Caught e in prepare", e);
+         throw new RuntimeException(e.toString());
+      }
+   }
+
+   public void start(Xid xid, int flags) throws XAException
+   {
+      XAResource xaResource = getDelegate();
+      
+      if (trace) { log.trace(this + " Calling start"); }
+      
+      try
+      {
+         xaResource.start(xid, flags);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+      catch (Exception e)
+      {
+         log.info("Caught e in start", e);
+         throw new RuntimeException(e.toString());
+      }
+   }
+
+   public void end(Xid xid, int flags) throws XAException
+   {
+      XAResource xaResource = getDelegate();
+      
+      if (trace) { log.trace(this + " Calling end"); }
+      
+      try
+      {
+         xaResource.end(xid, flags);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+      catch (Exception e)
+      {
+         log.info("Caught e in end", e);
+         throw new RuntimeException(e.toString());
+      }
+   }
+
+   public int getTransactionTimeout() throws XAException
+   {
+      XAResource xaResource = getDelegate();
+      
+      if (trace) { log.trace(this + " Calling getTransactionTimeout"); }
+      
+      try
+      {
+         return xaResource.getTransactionTimeout();
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+      catch (Exception e)
+      {
+         log.info("Caught e in getTransactiontimeoiut", e);
+         throw new RuntimeException(e.toString());
+      }
+   }
+
+   public boolean setTransactionTimeout(int seconds) throws XAException
+   {
+      XAResource xaResource = getDelegate();
+      
+      if (trace) { log.trace(this + " Calling setTransactionTimeout"); }
+      try
+      {
+         return xaResource.setTransactionTimeout(seconds);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+      catch (Exception e)
+      {
+         log.info("Caught e in settranactiotntimeoiut", e);
+         throw new RuntimeException(e.toString());
+      }
+   }
+
+   public void onException(JMSException exception)
+   {
+      log.warn("Notified of connection failure in recovery delegate", exception);
+      close();
+   }
+   
+   /**
+    * Get the delegate XAResource
+    * 
+    * @return the delegate
+    * @throws XAException for any problem
+    */
+   public XAResource getDelegate() throws XAException
+   {
+      XAResource result = null;
+      Exception error = null;
+      try
+      {
+         result = connect();
+      }
+      catch (Exception e)
+      {
+         error = e;
+      }
+
+      if (result == null)
+      {
+         XAException xae = new XAException("Error trying to connect");
+         xae.errorCode = XAException.XAER_RMERR;
+         if (error != null)
+            xae.initCause(error);
+         log.debug("Cannot get delegate XAResource", xae);
+         throw xae;
+      }
+      
+      return result;
+   }
+   
+   /**
+    * Connect to the server if not already done so
+    * 
+    * @return the delegate XAResource
+    * @throws Exception for any problem
+    */
+   protected XAResource connect() throws Exception
+   {
+      // Do we already have a valid delegate?
+      synchronized (lock)
+      {
+         if (delegate != null)
+            return delegate;
+      }
+      
+      if (trace) { log.trace(this + " Connecting"); }
+      
+      
+      // Create the connection
+      XAConnection xaConnection = getConnectionFactory().createXAConnection();
+      synchronized (lock)
+      {
+         connection = xaConnection;
+      }
+
+      // Retrieve the delegate XAResource
+      try
+      {
+         XASession session = connection.createXASession();
+         XAResource result = session.getXAResource();
+         synchronized (lock)
+         {
+            delegate = result;
+         }
+         return delegate;
+      }
+      catch (Exception e)
+      {
+         close();
+         throw e;
+      }
+   }
+
+   /**
+    * Get the XAConnectionFactory
+    * 
+    * @return the connection
+    * @throws Exception for any problem
+    */
+   protected XAConnectionFactory getConnectionFactory() throws Exception
+   {
+      InitialContext ic = null;
+      
+      try
+      {
+         ic = new InitialContext(jndiProperties);
+         
+         XAConnectionFactory connectionFactory = (XAConnectionFactory)ic.lookup(connectionFactoryLookup);
+         
+         return connectionFactory;
+      }
+      finally
+      {
+         if (ic != null)
+         {
+            ic.close();
+         }
+      }
+   }
+   
+   /**
+    * Close the connection
+    */
+   public void close()
+   {
+      if (trace) { log.trace(this + " Close"); }
+      
+      try
+      {
+         XAConnection oldConnection = null;
+         synchronized (lock)
+         {
+            oldConnection = connection;
+            connection = null;
+            delegate = null;
+         }
+         if (oldConnection != null)
+            oldConnection.close();
+      }
+      catch (Exception ignored)
+      {
+         log.trace("Ignored error during close", ignored);
+      }
+   }
+
+   /**
+    * Check whether an XAException is fatal. If it is an RM problem
+    * we close the connection so the next call will reconnect.
+    * 
+    * @param e the xa exception
+    * @return never
+    * @throws XAException always
+    */
+   protected XAException check(XAException e) throws XAException
+   {
+      if (trace) { log.trace(this + " check " + e); }
+      
+      if (e.errorCode == XAException.XAER_RMERR || e.errorCode == XAException.XAER_RMFAIL)
+      {
+         log.debug("Fatal error", e);
+         close();
+      }
+      throw e;
+   }
+
+   protected void finalize() throws Throwable
+   {
+      close();
+   }
+}




More information about the jboss-cvs-commits mailing list