[jboss-cvs] JBoss Messaging SVN: r8418 - branches/JBMESSAGING_1890/src/main/org/jboss/jms/server/recovery.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Aug 15 09:42:32 EDT 2011


Author: gaohoward
Date: 2011-08-15 09:42:32 -0400 (Mon, 15 Aug 2011)
New Revision: 8418

Added:
   branches/JBMESSAGING_1890/src/main/org/jboss/jms/server/recovery/MessagingXAResourceRecovery2.java
   branches/JBMESSAGING_1890/src/main/org/jboss/jms/server/recovery/MessagingXAResourceWrapper2.java
Log:
first fix


Added: branches/JBMESSAGING_1890/src/main/org/jboss/jms/server/recovery/MessagingXAResourceRecovery2.java
===================================================================
--- branches/JBMESSAGING_1890/src/main/org/jboss/jms/server/recovery/MessagingXAResourceRecovery2.java	                        (rev 0)
+++ branches/JBMESSAGING_1890/src/main/org/jboss/jms/server/recovery/MessagingXAResourceRecovery2.java	2011-08-15 13:42:32 UTC (rev 8418)
@@ -0,0 +1,296 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.jms.server.recovery;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.StringTokenizer;
+
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.transaction.xa.XAResource;
+
+import org.jboss.jms.client.ClientAOPStackLoader;
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
+import org.jboss.jms.delegate.ConnectionFactoryDelegate;
+import org.jboss.jms.delegate.CreateConnectionResult;
+import org.jboss.jms.jndi.JMSProviderAdapter;
+import org.jboss.logging.Logger;
+import org.jboss.util.naming.Util;
+
+import com.arjuna.ats.jta.recovery.XAResourceRecovery;
+
+/**
+ * 
+ * A XAResourceRecovery instance that can be used to recover a JBM cluster.
+ * 
+ * This class needs a clustered connection factory to create a connection to JBM cluster.
+ * It keeps track of all living nodes in the cluster. On requesting for recovery,
+ * it can returns all XAResources for all the live nodes,
+ * therefore the recovery manager can use it alone to recover all prepared transactions
+ * in the cluster.
+ * 
+ * 
+ * It also can be used in a non clustered JBM node.
+ * 
+ */
+public class MessagingXAResourceRecovery2 implements XAResourceRecovery
+{
+   private boolean trace = log.isTraceEnabled();
+
+   private static final Logger log = Logger.getLogger(MessagingXAResourceRecovery2.class);
+   
+   private String providerAdaptorName;
+   
+   private String username;
+   
+   private String password;
+   
+   private Queue<XAResource> resources = null;
+   
+   private Iterator<XAResource> iterator = null;
+   
+   private ConnectionFactoryDelegate factoryDelegate;
+
+   public MessagingXAResourceRecovery2()
+   {
+      if(trace) log.trace("Constructing BridgeXAResourceRecovery2");
+      log.error(this + " xxxxxxxx constructing XAre2");
+   }
+
+   public boolean initialise(String config)
+   {
+      if (log.isTraceEnabled()) { log.trace(this + " intialise: " + config); }
+      
+      log.error(this + " xxxxxxx initializing ...");
+      
+      StringTokenizer tok = new StringTokenizer(config, ",");
+      
+      //First (mandatory) param is the provider adaptor name
+      
+      if (!tok.hasMoreTokens())
+      {
+         throw new IllegalArgumentException("Must specify provider adaptor name in config");
+      }
+      
+      providerAdaptorName = tok.nextToken();
+                  
+      //Next two (optional) parameters are the username and password to use for creating the connection
+      //for recovery
+      
+      if (tok.hasMoreTokens())
+      {
+         username = tok.nextToken();
+         
+         if (!tok.hasMoreTokens())
+         {
+            throw new IllegalArgumentException("If username is specified, password must be specified too");
+         }
+         
+         password = tok.nextToken();
+      }
+             
+      if (log.isTraceEnabled()) { log.trace(this + " initialised"); }      
+      
+      return true;      
+   }
+
+   /*
+    * Iterates the resources. If no more, clean up the resources
+    * and update again.
+    */
+   public boolean hasMoreResources()
+   {
+      if (log.isTraceEnabled()) { log.trace(this + " hasMoreResources"); }
+      
+      log.error(this + " xxxxxxx hasMore res " + resources);
+      
+      if (resources == null)
+      {
+         updateResources();
+      }
+      
+      if (iterator.hasNext())
+      {
+         log.error(this + " xxxxxxx yes we have");
+         return true;
+      }
+      
+      this.shutdownResources();
+      
+      log.error(this + " xxxxxxx no we dont");
+      
+      return false;
+   }
+
+   public XAResource getXAResource()
+   {
+      if (log.isTraceEnabled()) { log.trace(this + " getXAResource"); }
+      
+      log.error(this + " xxxxxxx getXARes called");
+      
+      return iterator.next();
+   }
+   
+   protected void finalize()
+   {
+      log.error(this + " xxxxxxxxxxxxxxx finalize called");
+      shutdownResources(); 
+   }
+   
+   private void shutdownResources()
+   {
+      if (resources == null) return;
+      
+      for (XAResource res : resources)
+      {
+         ((MessagingXAResourceWrapper2)res).close();
+      }
+      resources = null;
+      iterator = null;
+   }
+   
+   /*
+    * Get XAResources for each node. If it is a non-clustered factory,
+    * only one XAresource is returned.
+    */
+   private void updateResources()
+   {
+      if (log.isTraceEnabled()) { log.trace(this + " updating Resources"); }
+      resources = new LinkedList<XAResource>();
+      try
+      {
+         if (factoryDelegate == null)
+         {
+            JBossConnectionFactory factory = getConnectionFactory();
+            factoryDelegate = factory.getDelegate();
+            
+            try
+            {
+               ClientAOPStackLoader.getInstance().load(factoryDelegate);
+            }
+            catch(Exception e)
+            {
+               // Need to log message since no guarantee that client will log it
+               final String msg = "Failed to download and/or install client side AOP stack";
+               log.error(msg, e);
+               throw new RuntimeException(msg, e);
+            }
+
+         }
+         
+         if (factoryDelegate instanceof ClientClusteredConnectionFactoryDelegate)
+         {
+            if (log.isTraceEnabled()) { log.trace(this + " getting XAResources from the cluster"); }
+
+            ClientClusteredConnectionFactoryDelegate clusteredFactoryDelegate = (ClientClusteredConnectionFactoryDelegate)factoryDelegate;
+
+            ClientConnectionFactoryDelegate[] delegates = clusteredFactoryDelegate.getDelegates();
+
+            for (ClientConnectionFactoryDelegate del : delegates)
+            {
+               try
+               {
+                  CreateConnectionResult result = del.createConnectionDelegate(username, password, -1);
+                  MessagingXAResourceWrapper2 res = new MessagingXAResourceWrapper2(result.getDelegate(),
+                                                                                    del.getServerID());
+                  resources.add(res);
+               }
+               catch (Throwable e)
+               {
+                  log.error("Failed to create resource for node " + del.getServerID(), e);
+               }
+            }
+         }
+         else
+         {
+            if (log.isTraceEnabled()) { log.trace(this + " getting XAResources from single node"); }
+            
+            ClientConnectionDelegate delegate = null;
+            try
+            {
+               CreateConnectionResult result = factoryDelegate.createConnectionDelegate(username, password, -1);
+               delegate = result.getDelegate();
+               MessagingXAResourceWrapper2 res = new MessagingXAResourceWrapper2(delegate, delegate.getServerID());
+
+               resources.add(res);
+            }
+            catch (Throwable e)
+            {
+               log.error("Failed to create resource using factory " + factoryDelegate, e);
+            }
+         }
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to get connection factory.", e);
+      }
+      
+      iterator = resources.iterator();
+      
+      if (resources.isEmpty())
+      {
+         //if no resources, this could mean all nodes shut down for the moment
+         //so we set the delegate to null to enable a fresh jndi lookup
+         //next time.
+         this.factoryDelegate = null;
+      }
+   }
+   
+   //the provider adaptor should use HA-JNDI
+   private JBossConnectionFactory getConnectionFactory() throws Exception
+   {
+      if (log.isTraceEnabled()) { log.trace(this + " look up CF via provider " + providerAdaptorName); }
+      
+      log.error(this + " xxxxxxx look up CF via provider " + providerAdaptorName);
+      
+      Context ctx = new InitialContext();
+      
+      JMSProviderAdapter adapter = (JMSProviderAdapter) ctx.lookup(providerAdaptorName);
+
+      String connectionFactoryRef = adapter.getFactoryRef();
+      if (connectionFactoryRef == null)
+         throw new IllegalStateException("Provider '" + providerAdaptorName + "' has no FactoryRef");
+      
+      log.error(this + " xxxxxxx connectionFactoryRef: " + connectionFactoryRef);
+      
+      // Lookup the connection factory
+      ctx = adapter.getInitialContext();
+
+      try
+      {
+         JBossConnectionFactory factory = (JBossConnectionFactory)Util.lookup(ctx, connectionFactoryRef, JBossConnectionFactory.class);
+         log.error(this + " xxxxxxx got bare facotry " + factory);
+
+         return factory;
+      }
+      finally
+      {
+         ctx.close();
+      }
+   }
+}
+

Added: branches/JBMESSAGING_1890/src/main/org/jboss/jms/server/recovery/MessagingXAResourceWrapper2.java
===================================================================
--- branches/JBMESSAGING_1890/src/main/org/jboss/jms/server/recovery/MessagingXAResourceWrapper2.java	                        (rev 0)
+++ branches/JBMESSAGING_1890/src/main/org/jboss/jms/server/recovery/MessagingXAResourceWrapper2.java	2011-08-15 13:42:32 UTC (rev 8418)
@@ -0,0 +1,270 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.jms.server.recovery;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
+import org.jboss.logging.Logger;
+
+/**
+ * XAResourceWrapper.
+ * 
+ * Mainly from org.jboss.server.XAResourceWrapper from the JBoss AS server module
+ * 
+ * The reason why we don't use that class directly is that it assumes on failure of connection
+ * the RM_FAIL or RM_ERR is thrown, but in JBM we throw XA_RETRY since we want the recovery manager to be able
+ * to retry on failure without having to manually retry
+ * 
+ * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * 
+ * @author <a href="tim.fox at jboss.com">Tim Fox/a>
+ * 
+ * @version $Revision: 45341 $
+ */
+public class MessagingXAResourceWrapper2 implements XAResource, ExceptionListener
+{
+   /** The log */
+   private static final Logger log = Logger.getLogger(MessagingXAResourceWrapper2.class);
+
+   /** The state lock */
+   private static final Object lock = new Object();
+   
+   /** The connection */
+   private ClientConnectionDelegate connection;
+   
+   /** The delegate XAResource */
+   private XAResource delegate;
+   
+   private int node;
+   
+   public MessagingXAResourceWrapper2(ClientConnectionDelegate connectionDelegate, int node) throws JMSException
+   {
+      try
+      {
+         log.error(this + " xxxxxx construct wrapper delegate: " + connectionDelegate + " node " + node);
+         
+         connection = connectionDelegate;
+         ClientSessionDelegate session = (ClientSessionDelegate)connection.createSessionDelegate(true, Session.SESSION_TRANSACTED, true);
+         delegate = session.getXAResource();
+         
+         log.error(this + " xxxxxxxxx get XAResource delegate: " + delegate);
+         
+         this.node = node;
+         connection.setExceptionListener(this);
+      }
+      catch(JMSException e)
+      {
+         log.error(this + " got exception creating XAResource", e);
+         this.close();
+         throw e;
+      }
+   }
+   
+   public Xid[] recover(int flag) throws XAException
+   {
+      try
+      {
+         log.debug("Invoking recover(" + flag + ") on the underlying XAResource.");
+         return delegate.recover(flag);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public void commit(Xid xid, boolean onePhase) throws XAException
+   {
+      log.debug("Commit " + node + " xid " + " onePhase=" + onePhase);
+      try
+      {
+         delegate.commit(xid, onePhase);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public void rollback(Xid xid) throws XAException
+   {
+      log.debug("Rollback " + node + " xid " + xid);
+      try
+      {
+         delegate.rollback(xid);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public void forget(Xid xid) throws XAException
+   {
+      log.debug("Forget " + node + " xid " + xid);
+      try
+      {
+         delegate.forget(xid);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public boolean isSameRM(XAResource xaRes) throws XAException
+   {
+      if (xaRes instanceof MessagingXAResourceWrapper)
+         xaRes = ((MessagingXAResourceWrapper) xaRes).getDelegate();
+
+      try
+      {
+         return delegate.isSameRM(xaRes);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public int prepare(Xid xid) throws XAException
+   {
+      try
+      {
+         return delegate.prepare(xid);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public void start(Xid xid, int flags) throws XAException
+   {
+      try
+      {
+         delegate.start(xid, flags);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public void end(Xid xid, int flags) throws XAException
+   {
+      try
+      {
+         delegate.end(xid, flags);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public int getTransactionTimeout() throws XAException
+   {
+      try
+      {
+         return delegate.getTransactionTimeout();
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   public boolean setTransactionTimeout(int seconds) throws XAException
+   {
+      try
+      {
+         return delegate.setTransactionTimeout(seconds);
+      }
+      catch (XAException e)
+      {
+         throw check(e);
+      }
+   }
+
+   /**
+    * Close the connection
+    */
+   public void close()
+   {
+      try
+      {
+         ClientConnectionDelegate oldConnection = null;
+         synchronized (lock)
+         {
+            oldConnection = connection;
+            connection = null;
+            delegate = null;
+         }
+         if (oldConnection != null)
+         {
+            oldConnection.closing(-1);
+            oldConnection.close();
+         }
+      }
+      catch (Exception ignored)
+      {
+         log.trace("Ignored error during close", ignored);
+      }
+   }
+
+   /**
+    * Check whether an XAException is fatal. If it is an RM problem
+    * we close the connection so the next call will reconnect.
+    * 
+    * @param e the xa exception
+    * @return never
+    * @throws XAException always
+    */
+   protected XAException check(XAException e) throws XAException
+   {
+      if (e.errorCode == XAException.XA_RETRY)
+      {
+         log.debug("Fatal error in provider " + node, e);
+         close();
+      }
+      log.debug("Caught XAException. Error code: " + e.errorCode + ". Will rethrow as XAER_RMFAIL.", e);
+      throw new XAException(XAException.XAER_RMFAIL);
+   }
+
+   protected void finalize() throws Throwable
+   {
+      close();
+   }
+
+   public void onException(JMSException arg0)
+   {
+      close();
+   }
+}



More information about the jboss-cvs-commits mailing list