[jboss-cvs] JBoss Messaging SVN: r5840 - in trunk/src: main/org/jboss/messaging/ra/inflow and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Feb 9 13:58:14 EST 2009


Author: jesper.pedersen
Date: 2009-02-09 13:58:14 -0500 (Mon, 09 Feb 2009)
New Revision: 5840

Added:
   trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java
   trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandlerPool.java
Modified:
   trunk/src/config/ra.xml
   trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java
Log:
[JBMESSAGING-1367] Create JCA resource adapter for JBM 2.0

Modified: trunk/src/config/ra.xml
===================================================================
--- trunk/src/config/ra.xml	2009-02-09 05:32:16 UTC (rev 5839)
+++ trunk/src/config/ra.xml	2009-02-09 18:58:14 UTC (rev 5840)
@@ -250,5 +250,19 @@
          <reauthentication-support>false</reauthentication-support>
       </outbound-resourceadapter>
 
+      <inbound-resourceadapter>
+         <messageadapter>
+            <messagelistener>
+               <messagelistener-type>javax.jms.MessageListener</messagelistener-type>
+               <activationspec>
+                  <activationspec-class>org.jboss.messaging.ra.inflow.JBMActivationSpec</activationspec-class>
+                  <required-config-property>
+                      <config-property-name>destination</config-property-name>
+                  </required-config-property>
+               </activationspec>
+            </messagelistener>
+         </messageadapter>
+      </inbound-resourceadapter>
+
    </resourceadapter>
 </connector>

Modified: trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java	2009-02-09 05:32:16 UTC (rev 5839)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java	2009-02-09 18:58:14 UTC (rev 5840)
@@ -94,6 +94,9 @@
    /** Is the delivery transacted */
    protected boolean isDeliveryTransacted;
    
+   /** The message handler pool */
+   protected JBMMessageHandlerPool pool;
+
    /** The TransactionManager */
    protected TransactionManager tm;
    
@@ -342,7 +345,7 @@
          if (ctx != null)
             ctx.close();
       }
-      setupSessionPool();
+      setupPool();
       
       log.debug("Setup complete " + this);
    }
@@ -354,7 +357,7 @@
    {
       log.debug("Tearing down " + spec);
 
-      teardownSessionPool();
+      teardownPool();
       teardownConnection();
       teardownDestination();
 
@@ -583,20 +586,27 @@
    }
    
    /**
-    * Setup the server session pool
+    * Setup the pool
     * @throws Exception for any error
     */
-   protected void setupSessionPool() throws Exception
+   protected void setupPool() throws Exception
    {
+      pool = new JBMMessageHandlerPool(this);
+      log.debug("Created pool " + pool);
+
+      log.debug("Starting pool " + pool);
+      pool.start();
+      log.debug("Started pool " + pool);
+
       log.debug("Starting delivery " + connection);
       connection.start();
       log.debug("Started delivery " + connection);
    }
    
    /**
-    * Teardown the server session pool
+    * Teardown the pool
     */
-   protected void teardownSessionPool()
+   protected void teardownPool()
    {
       try
       {
@@ -610,6 +620,20 @@
       {
          log.debug("Error stopping delivery " + connection, t);
       }
+
+      try
+      {
+         if (pool != null)
+         {
+            log.debug("Stopping the pool " + pool);
+            pool.stop();
+         }
+      }
+      catch (Throwable t)
+      {
+         log.debug("Error clearing the pool " + pool, t);
+      }
+      pool = null;
    }
 
    /**
@@ -649,6 +673,8 @@
          buffer.append(" destination=").append(destination);
       if (connection != null)
          buffer.append(" connection=").append(connection);
+      if (pool != null)
+         buffer.append(" pool=").append(pool.getClass().getName());
       buffer.append(" transacted=").append(isDeliveryTransacted);
       buffer.append(')');
       return buffer.toString();

Added: trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java	2009-02-09 18:58:14 UTC (rev 5840)
@@ -0,0 +1,515 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.ra.inflow;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.XAConnection;
+import javax.jms.XASession;
+import javax.resource.spi.endpoint.MessageEndpoint;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.transaction.Status;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+import org.jboss.messaging.core.logging.Logger;
+
+/**
+ * The message handler
+ * 
+ * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * @author <a href="mailto:jesper.pedersen at jboss.org">Jesper Pedersen</a>
+ * @version $Revision: $
+ */
+public class JBMMessageHandler implements MessageListener
+{
+     /** The logger */
+   private static final Logger log = Logger.getLogger(JBMMessageHandler.class);
+
+   /** Trace enabled */
+   private static boolean trace = log.isTraceEnabled();
+
+   /** The message handler pool */
+   private JBMMessageHandlerPool pool;
+
+   /** The transacted flag */
+   private boolean transacted;
+
+   /** The acknowledge mode */
+   private int acknowledge;
+
+   /** The session */
+   private Session session;
+
+   /** Any XA session */
+   private XASession xaSession;
+
+   /** The message consumer */
+   private MessageConsumer messageConsumer;
+
+   /** The endpoint */
+   private MessageEndpoint endpoint;
+
+   /** The transaction demarcation strategy */
+   private TransactionDemarcationStrategy txnStrategy;
+
+   /**
+    * Constructor
+    * @param pool The message handler pool
+    */
+   public JBMMessageHandler(JBMMessageHandlerPool pool)
+   {
+      if (trace)
+         log.trace("constructor(" + pool + ")");
+
+      this.pool = pool;
+   }
+
+   /**
+    * Setup the session
+    */
+   public void setup() throws Exception
+   {
+      if (trace)
+         log.trace("setup()");
+      
+      JBMActivation activation = pool.getActivation();
+      JBMActivationSpec spec = activation.getActivationSpec();
+      String selector = spec.getMessageSelector();
+
+      Connection connection = activation.getConnection();
+
+      // Create the session
+      if (activation.isDeliveryTransacted())
+      {
+         xaSession = ((XAConnection)connection).createXASession();
+         session = xaSession.getSession();
+      } 
+      else
+      {
+         transacted = spec.isSessionTransacted();
+         acknowledge = spec.getAcknowledgeModeInt();
+         session = connection.createSession(transacted, acknowledge);
+      }
+
+      // Create the message consumer
+      if (activation.isTopic() && spec.isSubscriptionDurable())
+      {
+         Topic topic = (Topic) activation.getDestination();
+         String subscriptionName = spec.getSubscriptionName();
+
+         if (selector == null || selector.trim().equals(""))
+         {
+            messageConsumer = (MessageConsumer)session.createDurableSubscriber(topic, subscriptionName);
+         }
+         else
+         {
+            messageConsumer = (MessageConsumer)session.createDurableSubscriber(topic, subscriptionName, selector, false);
+         }
+      }
+      else
+      {
+         if (selector == null || selector.trim().equals(""))
+         {
+            messageConsumer = session.createConsumer(activation.getDestination());
+         }
+         else
+         {
+            messageConsumer = session.createConsumer(activation.getDestination(), selector);
+         }
+      }
+
+      // Create the endpoint
+      MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();
+      XAResource xaResource = null;
+      
+      if (activation.isDeliveryTransacted() && xaSession != null)
+         xaResource = xaSession.getXAResource();
+      
+      endpoint = endpointFactory.createEndpoint(xaResource);
+
+      // Set the message listener
+      messageConsumer.setMessageListener(this);
+   }
+
+   /**
+    * Stop the handler
+    */
+   public void teardown()
+   {
+      if (trace)
+         log.trace("teardown()");
+      
+      try
+      {
+         if (endpoint != null)
+            endpoint.release();
+      } 
+      catch (Throwable t)
+      {
+         log.debug("Error releasing endpoint " + endpoint, t);
+      }
+
+      try
+      {
+         if (xaSession != null)
+            xaSession.close();
+      }
+      catch (Throwable t)
+      {
+         log.debug("Error releasing xaSession " + xaSession, t);
+      }
+
+      try
+      {
+         if (session != null)
+            session.close();
+      }
+      catch (Throwable t)
+      {
+         log.debug("Error releasing session " + session, t);
+      }
+   }
+
+   /**
+    * Deliver a message to the endpoint inside a freshly created transaction demarcation, then ask the pool to remove this handler
+    * @param message The message
+    */
+   public void onMessage(Message message)
+   {
+      if (trace)
+         log.trace("onMessage(" + message + ")");
+      
+      try
+      {
+         txnStrategy = createTransactionDemarcation();
+      } 
+      catch (Throwable t)
+      {
+         log.error("Error creating transaction demarcation. Cannot continue.", t);
+         return;
+      }
+
+      try
+      {
+         endpoint.beforeDelivery(JBMActivation.ONMESSAGE);
+         // Only reached if beforeDelivery() succeeded, so afterDelivery() always pairs with it
+         try
+         {
+            MessageListener listener = (MessageListener) endpoint;
+            listener.onMessage(message);
+         } 
+         finally
+         {
+            endpoint.afterDelivery();
+         }
+      }
+      catch (Throwable t)
+      {
+         log.error("Unexpected error delivering message " + message, t);
+
+         if (txnStrategy != null)
+            txnStrategy.error();
+      }
+      finally
+      {
+         if (txnStrategy != null)
+            txnStrategy.end();
+
+         txnStrategy = null;
+      }
+
+      pool.removeHandler(this);
+   }
+
+   /**
+    * Create the transaction demarcation strategy
+    * @return The strategy
+    */
+   private TransactionDemarcationStrategy createTransactionDemarcation()
+   {
+      if (trace)
+         log.trace("createTransactionDemarcation()");
+
+      return new DemarcationStrategyFactory().getStrategy();
+   }
+
+   /**
+    * Demarcation strategy factory
+    */
+   private class DemarcationStrategyFactory
+   {
+      /**
+       * Get the transaction demarcation strategy
+       * @return The strategy
+       */
+      TransactionDemarcationStrategy getStrategy()
+      {
+         if (trace)
+            log.trace("getStrategy()");
+
+         final JBMActivationSpec spec = pool.getActivation().getActivationSpec();
+         final JBMActivation activation = pool.getActivation();
+
+         if (activation.isDeliveryTransacted())
+         {
+            try
+            {
+               return new XATransactionDemarcationStrategy();
+            } 
+            catch (Throwable t)
+            {
+               log.error(this + " error creating transaction demarcation ", t);
+            }
+         } 
+         else
+         {
+            return new LocalDemarcationStrategy();
+         }
+
+         return null;
+      }
+   }
+
+   /**
+    * Transaction demarcation strategy
+    */
+   private interface TransactionDemarcationStrategy
+   {
+      /**
+       * Error
+       */
+      void error();
+
+      /**
+       * End
+       */
+      void end();
+   }
+
+   /**
+    * Local demarcation strategy
+    */
+   private class LocalDemarcationStrategy implements TransactionDemarcationStrategy
+   {
+      /**
+       * Error
+       */
+      public void error()
+      {
+         if (trace)
+            log.trace("error()");
+
+         final JBMActivationSpec spec = pool.getActivation().getActivationSpec();
+
+         if (spec.isSessionTransacted())
+         {
+            if (session != null)
+            {
+               try
+               {
+                  /*
+                   * Looks strange, but this basically means
+                   * 
+                   * If the underlying connection was non-XA and the transaction
+                   * attribute is REQUIRED we rollback. Also, if the underlying
+                   * connection was non-XA and the transaction attribute is
+                   * NOT_SUPPORT and the non standard redelivery behavior is
+                   * enabled we rollback to force redelivery.
+                   * 
+                   */
+                  if (pool.getActivation().isDeliveryTransacted() || spec.getRedeliverUnspecified())
+                  {
+                     session.rollback();
+                  }
+               } catch (JMSException e)
+               {
+                  log.error("Failed to rollback session transaction", e);
+               }
+            }
+         }
+      }
+
+      /**
+       * End the delivery: commit the session when the activation spec asks for a transacted session
+       */
+      public void end()
+      {
+         if (trace)
+            log.trace("end()");
+
+         final JBMActivationSpec spec = pool.getActivation().getActivationSpec();
+
+         if (spec.isSessionTransacted())
+         {
+            if (session != null)
+            {
+               try
+               {
+                  session.commit();
+               } catch (JMSException e)
+               {
+                  log.error("Failed to commit session transaction", e);
+               }
+            }
+         }
+      }
+   }
+
+   /**
+    * XA demarcation strategy
+    */
+   private class XATransactionDemarcationStrategy implements TransactionDemarcationStrategy
+   {
+      private Transaction trans = null;
+      private TransactionManager tm = pool.getActivation().getTransactionManager();;
+
+      public XATransactionDemarcationStrategy() throws Throwable
+      {
+         final int timeout = pool.getActivation().getActivationSpec().getTransactionTimeout();
+
+         if (timeout > 0)
+         {
+            if (trace)
+               log.trace("Setting transactionTimeout for JMSSessionPool to " + timeout);
+
+            tm.setTransactionTimeout(timeout);
+         }
+
+         tm.begin();
+
+         try
+         {
+            trans = tm.getTransaction();
+
+            if (trace)
+               log.trace(this + " using tx=" + trans);
+
+            if (xaSession != null)
+            {
+               XAResource res = xaSession.getXAResource();
+
+               if (!trans.enlistResource(res))
+               {
+                  throw new JMSException("could not enlist resource");
+               }
+               if (trace)
+                  log.trace(this + " XAResource '" + res + " enlisted.");
+            }
+         } 
+         catch (Throwable t)
+         {
+            try
+            {
+               tm.rollback();
+            } 
+            catch (Throwable ignored)
+            {
+               log.trace(this + " ignored error rolling back after failed enlist", ignored);
+            }
+            throw t;
+         }
+      }
+
+      public void error()
+      {
+         // Mark the transaction for rollback on the Transaction object; end() checks the status and performs the actual rollback
+         try
+         {
+            if (trace)
+               log.trace(this + " using TM to mark TX for rollback tx=" + trans);
+
+            trans.setRollbackOnly();
+         } 
+         catch (Throwable t)
+         {
+            log.error(this + " failed to set rollback only", t);
+         }
+      }
+
+      public void end()
+      {
+         try
+         {
+            // Use the TM to commit the Tx (assert the correct association)
+            Transaction currentTx = tm.getTransaction();
+            if (trans.equals(currentTx) == false)
+               throw new IllegalStateException("Wrong tx association: expected " + trans + " was " + currentTx);
+
+            // Marked rollback
+            if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
+            {
+               if (trace)
+                  log.trace(this + " rolling back JMS transaction tx=" + trans);
+
+               // Actually roll it back
+               tm.rollback();
+
+               // NO XASession? then manually rollback.
+               // This is not so good but
+               // it's the best we can do if we have no XASession.
+               if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+               {
+                  session.rollback();
+               }
+            }
+            else if (trans.getStatus() == Status.STATUS_ACTIVE)
+            {
+               // Commit tx
+               // This will happen if
+               // a) everything goes well
+               // b) app. exception was thrown
+               if (trace)
+                  log.trace(this + " commiting the JMS transaction tx=" + trans);
+
+               tm.commit();
+
+               // NO XASession? then manually commit. This is not so good but
+               // it's the best we can do if we have no XASession.
+               if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+               {
+                  session.commit();
+               }
+
+            } 
+            else
+            {
+               tm.suspend();
+
+               if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+               {
+                  session.rollback();
+               }
+            }
+         } catch (Throwable t)
+         {
+            log.error(this + " failed to commit/rollback", t);
+         }
+      }
+   }
+}

Added: trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandlerPool.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandlerPool.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandlerPool.java	2009-02-09 18:58:14 UTC (rev 5840)
@@ -0,0 +1,224 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.ra.inflow;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.jboss.messaging.core.logging.Logger;
+
+/**
+ * The message handler pool
+ * 
+ * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * @author <a href="jesper.pedersen at jboss.org">Jesper Pedersen</a>
+ * @version $Revision:  $
+ */
+public class JBMMessageHandlerPool
+{
+    /** The logger */
+   private static final Logger log = Logger.getLogger(JBMMessageHandlerPool.class);
+
+   /** Trace enabled */
+   private static boolean trace = log.isTraceEnabled();
+      
+   /** The activation */
+   private JBMActivation activation;
+
+   /** The active sessions */
+   private List<JBMMessageHandler> activeSessions;
+   
+   /** Whether the pool is stopped */
+   private AtomicBoolean stopped;
+   
+   /**
+    * Constructor
+    * @param activation The activation
+    */
+   public JBMMessageHandlerPool(JBMActivation activation)
+   {
+      if (trace)
+         log.trace("constructor(" + activation + ")");
+
+      this.activation = activation;
+      this.activeSessions = new ArrayList<JBMMessageHandler>();
+      this.stopped = new AtomicBoolean(false);
+   }
+
+   /**
+    * Get the activation
+    * @return The value
+    */
+   public JBMActivation getActivation()
+   {
+      if (trace)
+         log.trace("getActivation()");
+
+      return activation;
+   }
+   
+   /**
+    * Start the pool
+    * @exception Exception Thrown if an error occurs
+    */
+   public void start() throws Exception
+   {
+      if (trace)
+         log.trace("start()");
+
+      setupSessions();
+   }
+
+   /**
+    * Stop the message handler pool: flag it as stopped so removed handlers are not replaced, then wait in teardownSessions()
+    */
+   public void stop()
+   {
+      if (trace)
+         log.trace("stop()");
+
+      // Disallow any new sessions; removeHandler() checks this flag before setting up a replacement
+      stopped.set(true);
+      
+      teardownSessions();
+   }
+   
+   /**
+    * Remove a message handler, set up a replacement unless the pool is stopped, and wake threads waiting on the session list
+    * @param handler The handler
+    */
+   protected void removeHandler(JBMMessageHandler handler)
+   {
+      if (trace)
+         log.trace("removeHandler(" + handler + ")");
+
+      synchronized (activeSessions)
+      {
+         activeSessions.remove(handler);
+         
+         if (!stopped.get())
+         {
+            try
+            {
+               setupSession();
+            }
+            catch (Exception e)
+            {
+               log.error("Unable to restart handler", e);
+            }
+         }
+         activeSessions.notifyAll(); // must own the monitor here, otherwise IllegalMonitorStateException is thrown
+      }
+   }
+   
+   /**
+    * Starts the sessions
+    * @exception Exception Thrown if an error occurs
+    */
+   protected void setupSessions() throws Exception
+   {
+      if (trace)
+         log.trace("setupSessions()");
+
+      JBMActivationSpec spec = activation.getActivationSpec();
+
+      // Create the sessions
+      synchronized (activeSessions)
+      {
+         for (int i = 0; i < spec.getMaxSessionInt(); ++i)
+         {
+            setupSession();
+         }
+      }
+   }
+
+   /**
+    * Setup a session
+    * @exception Exception Thrown if an error occurs
+    */
+   protected void setupSession() throws Exception
+   {
+      if (trace)
+         log.trace("setupSession()");
+
+      // Create the session
+      JBMMessageHandler handler = new JBMMessageHandler(this);
+      handler.setup();
+
+      activeSessions.add(handler);
+   }
+
+   /**
+    * Wait for the active sessions to drain; with force clear enabled, give up after the configured number of unchanged attempts
+    */
+   protected void teardownSessions()
+   {
+      if (trace)
+         log.trace("teardownSessions()");
+
+      synchronized (activeSessions)
+      {
+         if (activation.getActivationSpec().isForceClearOnShutdown())
+         {        
+            int attempts = 0;
+            int forceClearAttempts = activation.getActivationSpec().getForceClearAttempts();
+            long forceClearInterval = activation.getActivationSpec().getForceClearOnShutdownInterval();
+
+            if (trace)
+               log.trace(this + " force clear behavior in effect. Waiting for " + forceClearInterval + " milliseconds for " + forceClearAttempts + " attempts.");
+           
+            while((activeSessions.size() > 0) && (attempts < forceClearAttempts))
+            {
+               try
+               {
+                  int currentSessions = activeSessions.size();
+                  activeSessions.wait(forceClearInterval);
+                  // The number of sessions did not change during the wait: count it as a failed attempt
+                  if (activeSessions.size() == currentSessions)
+                  {
+                     ++attempts;
+                     log.trace(this + " clear attempt failed " + attempts); 
+                  }
+               }
+               catch(InterruptedException ignore)
+               {
+               }            
+            }
+         }
+         else
+         {
+            // Wait for in-use sessions with no timeout. NOTE(review): interrupts are swallowed and the loop keeps waiting - confirm intended
+            while (activeSessions.size() > 0)
+            {
+               try
+               {
+                  activeSessions.wait();
+               }
+               catch (InterruptedException ignore)
+               {
+               }
+            }
+         }
+      }
+   }
+}




More information about the jboss-cvs-commits mailing list