[jboss-cvs] JBoss Messaging SVN: r5912 - in trunk: examples/jms/src/org/jboss/jms/example and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Feb 20 09:38:51 EST 2009


Author: ataylor
Date: 2009-02-20 09:38:51 -0500 (Fri, 20 Feb 2009)
New Revision: 5912

Removed:
   trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandlerPool.java
Modified:
   trunk/examples/jms/build.xml
   trunk/examples/jms/src/org/jboss/jms/example/EJB3MDBExample.java
   trunk/examples/jms/src/org/jboss/jms/example/Sender.java
   trunk/examples/messaging/src/org/jboss/messaging/example/SimpleClient.java
   trunk/src/main/org/jboss/messaging/ra/JBMConnectionRequestInfo.java
   trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java
   trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java
Log:
removed pooling from ResourceAdapter


Modified: trunk/examples/jms/build.xml
===================================================================
--- trunk/examples/jms/build.xml	2009-02-20 12:12:32 UTC (rev 5911)
+++ trunk/examples/jms/build.xml	2009-02-20 14:38:51 UTC (rev 5912)
@@ -79,6 +79,18 @@
       <pathelement location="config"/>
    </path>
 
+   <property name="jboss.home" value="${JBOSS_HOME}" />
+
+   <path id="as.classpath">
+      <path refid="runtime.classpath"/>
+		<fileset file="${jboss.home}/client/" >
+         <include name="**/*.jar"/>
+      </fileset>
+      <fileset file="${jboss.home}/common/lib/" >
+         <include name="**/*.jar"/>
+      </fileset>
+	</path>
+
    <target name="help" description="-> display help">
       <echo>*****************************************************************</echo>
       <echo>* to run examples execute one of the following *</echo>
@@ -219,7 +231,7 @@
 
    <target name="Sender" depends="compile" description="-> run performance test when sending to a queue">
       <java classname="org.jboss.jms.example.Sender" fork="true">
-         <classpath refid="runtime.classpath"/>
+         <classpath refid="as.classpath"/>
          <jvmarg value="-Xmx512M"/>
          <jvmarg value="-XX:+UseParallelGC"/>
          <jvmarg value="-XX:+AggressiveOpts"/>

Modified: trunk/examples/jms/src/org/jboss/jms/example/EJB3MDBExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/EJB3MDBExample.java	2009-02-20 12:12:32 UTC (rev 5911)
+++ trunk/examples/jms/src/org/jboss/jms/example/EJB3MDBExample.java	2009-02-20 14:38:51 UTC (rev 5912)
@@ -44,7 +44,8 @@
 @MessageDriven(activationConfig =
 {
       @ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
-      @ActivationConfigProperty(propertyName="destination", propertyValue="queue/testQueue")
+      @ActivationConfigProperty(propertyName="destination", propertyValue="queue/testQueue"),
+      @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "10")
 })
 public class EJB3MDBExample implements MessageListener
 {
@@ -65,16 +66,10 @@
          String text = tm.getText();
          System.out.println("message " + text + " received");
 
-         // flip the string
-         String result = "";
-         for(int i = 0; i < text.length(); i++)
-         {
-            result = text.charAt(i) + result;
-         }
+         System.out.print("Sleeping for 2 secs");
+         Thread.sleep(2000);
+         System.out.println("awake: sending message");
 
-         System.out.println("message processed, result: " + result);
-
-
          InitialContext ic = new InitialContext();
          ConnectionFactory cf = (ConnectionFactory)ic.lookup("java:/JmsXA");
          ic.close();
@@ -85,7 +80,7 @@
 
          Destination replyTo = m.getJMSReplyTo();
          MessageProducer producer = session.createProducer(replyTo);
-         TextMessage reply = session.createTextMessage(result);
+         TextMessage reply = session.createTextMessage(text);
 
          producer.send(reply);
          producer.close();

Modified: trunk/examples/jms/src/org/jboss/jms/example/Sender.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/Sender.java	2009-02-20 12:12:32 UTC (rev 5911)
+++ trunk/examples/jms/src/org/jboss/jms/example/Sender.java	2009-02-20 14:38:51 UTC (rev 5912)
@@ -28,7 +28,12 @@
 import javax.jms.MessageConsumer;
 import javax.jms.TextMessage;
 import javax.jms.Session;
+import javax.jms.MessageListener;
+import javax.jms.Message;
+import javax.jms.JMSException;
 import javax.naming.InitialContext;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
@@ -48,19 +53,33 @@
 
       Queue temporaryQueue = session.createTemporaryQueue();
       MessageConsumer consumer = session.createConsumer(temporaryQueue);
-      TextMessage message = session.createTextMessage("Hello!");
-      message.setJMSReplyTo(temporaryQueue);
-      sender.send(message);
+      final CountDownLatch latch = new CountDownLatch(30);
+      consumer.setMessageListener(new MessageListener()
+      {
+         public void onMessage(Message message)
+         {
+            try
+            {
+               TextMessage m = (TextMessage)message;
+               System.out.println("received " + m.getText());
+               latch.countDown();
+            }
+            catch (JMSException e)
+            {
+               e.printStackTrace();
+            }
+         }
+      });
+
       connection.start();
 
-      message = (TextMessage)consumer.receive(500000);
-
-
-      if (message == null)
+      for(int i = 1; i <= 30; i++)
       {
-         throw new Exception("Have not received any reply. The example failed!");
+         TextMessage message = session.createTextMessage("Message " + i);
+         message.setJMSReplyTo(temporaryQueue);
+         sender.send(message);
       }
-
+      latch.await(60, TimeUnit.SECONDS);
       connection.close();
    }
 }

Modified: trunk/examples/messaging/src/org/jboss/messaging/example/SimpleClient.java
===================================================================
--- trunk/examples/messaging/src/org/jboss/messaging/example/SimpleClient.java	2009-02-20 12:12:32 UTC (rev 5911)
+++ trunk/examples/messaging/src/org/jboss/messaging/example/SimpleClient.java	2009-02-20 14:38:51 UTC (rev 5912)
@@ -30,6 +30,8 @@
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.jms.client.JBossMessage;
+import org.jboss.messaging.jms.client.JBossTextMessage;
 
 /**
  * Uses the core messaging API to send and receive a message to a queue.
@@ -48,7 +50,7 @@
          clientSession = sessionFactory.createSession(false, true, true);
          SimpleString queue = new SimpleString("queuejms.testQueue");
          ClientProducer clientProducer = clientSession.createProducer(queue);
-         ClientMessage message = clientSession.createClientMessage(false);
+         ClientMessage message = clientSession.createClientMessage(JBossTextMessage.TYPE, false);
          message.getBody().putString("Hello!");
          clientProducer.send(message);
          ClientConsumer clientConsumer = clientSession.createConsumer(queue);

Modified: trunk/src/main/org/jboss/messaging/ra/JBMConnectionRequestInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/JBMConnectionRequestInfo.java	2009-02-20 12:12:32 UTC (rev 5911)
+++ trunk/src/main/org/jboss/messaging/ra/JBMConnectionRequestInfo.java	2009-02-20 14:38:51 UTC (rev 5912)
@@ -32,6 +32,7 @@
  *
  * @author <a href="mailto:adrian at jboss.com">Adrian Brock</a>
  * @author <a href="mailto:jesper.pedersen at jboss.org">Jesper Pedersen</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  * @version $Revision:  $
  */
 public class JBMConnectionRequestInfo implements ConnectionRequestInfo
@@ -127,6 +128,7 @@
          password = prop.getPassword();
       if (clientID == null) 
          clientID = prop.getClientID();
+      useXA = prop.isUseXA();
    }
 
    /**

Modified: trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java	2009-02-20 12:12:32 UTC (rev 5911)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java	2009-02-20 14:38:51 UTC (rev 5912)
@@ -24,10 +24,13 @@
 import org.jboss.messaging.jms.client.JBossConnectionFactory;
 import org.jboss.messaging.ra.JBMResourceAdapter;
 import org.jboss.messaging.ra.Util;
+import org.jboss.messaging.ra.JBMMessageListener;
 import org.jboss.messaging.core.logging.Logger;
 
 import java.lang.reflect.Method;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.List;
+import java.util.ArrayList;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -54,6 +57,7 @@
  * 
  * @author <a href="adrian at jboss.com">Adrian Brock</a>
  * @author <a href="jesper.pedersen at jboss.org">Jesper Pedersen</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  * @version $Revision: $
  */
 public class JBMActivation implements ExceptionListener
@@ -95,11 +99,13 @@
    protected boolean isDeliveryTransacted;
    
    /** The message handler pool */
-   protected JBMMessageHandlerPool pool;
+   //protected JBMMessageHandlerPool pool;
 
    /** The TransactionManager */
    protected TransactionManager tm;
-   
+
+   private List<JBMMessageHandler> handlers = new ArrayList<JBMMessageHandler>();
+
    static
    {
       try
@@ -345,8 +351,14 @@
          if (ctx != null)
             ctx.close();
       }
-      setupPool();
-      
+      for(int i = 0; i < spec.getMaxSessionInt(); i++)
+      {
+         JBMMessageHandler handler = new JBMMessageHandler(this);
+         handler.setup();
+         handlers.add(handler);
+      }
+
+      connection.start();
       log.debug("Setup complete " + this);
    }
    
@@ -357,7 +369,10 @@
    {
       log.debug("Tearing down " + spec);
 
-      teardownPool();
+      for (JBMMessageHandler handler : handlers)
+      {
+         handler.teardown();
+      }
       teardownConnection();
       teardownDestination();
 
@@ -589,7 +604,7 @@
     * Setup the pool
     * @throws Exception for any error
     */
-   protected void setupPool() throws Exception
+   /*protected void setupPool() throws Exception
    {
       pool = new JBMMessageHandlerPool(this);
       log.debug("Created pool " + pool);
@@ -601,12 +616,12 @@
       log.debug("Starting delivery " + connection);
       connection.start();
       log.debug("Started delivery " + connection);
-   }
-   
+   }*/
+
    /**
     * Teardown the pool
     */
-   protected void teardownPool()
+   /*protected void teardownPool()
    {
       try
       {
@@ -634,7 +649,7 @@
          log.debug("Error clearing the pool " + pool, t);
       }
       pool = null;
-   }
+   }*/
 
    /**
     * Handles the setup
@@ -673,8 +688,8 @@
          buffer.append(" destination=").append(destination);
       if (connection != null)
          buffer.append(" connection=").append(connection);
-      if (pool != null)
-         buffer.append(" pool=").append(pool.getClass().getName());
+      //if (pool != null)
+         //buffer.append(" pool=").append(pool.getClass().getName());
       buffer.append(" transacted=").append(isDeliveryTransacted);
       buffer.append(')');
       return buffer.toString();

Modified: trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java	2009-02-20 12:12:32 UTC (rev 5911)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java	2009-02-20 14:38:51 UTC (rev 5912)
@@ -1,8 +1,8 @@
 /*
- * JBoss, Home of Professional Open Source.
- * Copyright 2006, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
  *
  * This is free software; you can redistribute it and/or modify it
  * under the terms of the GNU Lesser General Public License as
@@ -21,99 +21,63 @@
  */
 package org.jboss.messaging.ra.inflow;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.jms.Connection;
+import org.jboss.messaging.core.logging.Logger;
+
 import javax.jms.JMSException;
-import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.XASession;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
-import javax.jms.Session;
-import javax.jms.Topic;
+import javax.jms.Connection;
 import javax.jms.XAConnection;
-import javax.jms.XASession;
-import javax.resource.spi.endpoint.MessageEndpoint;
-import javax.resource.spi.endpoint.MessageEndpointFactory;
-import javax.resource.spi.work.Work;
-import javax.resource.spi.work.WorkEvent;
-import javax.resource.spi.work.WorkException;
-import javax.resource.spi.work.WorkListener;
-import javax.transaction.Status;
+import javax.jms.Topic;
+import javax.jms.Message;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
+import javax.transaction.Status;
 import javax.transaction.xa.XAResource;
-
-import org.jboss.messaging.core.logging.Logger;
-
+import javax.resource.spi.endpoint.MessageEndpoint;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
 /**
  * The message handler
- * 
+ *
  * @author <a href="adrian at jboss.com">Adrian Brock</a>
  * @author <a href="mailto:jesper.pedersen at jboss.org">Jesper Pedersen</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  * @version $Revision: $
  */
-public class JBMMessageHandler implements MessageListener, Work, WorkListener
+public class JBMMessageHandler implements MessageListener
 {
      /** The logger */
    private static final Logger log = Logger.getLogger(JBMMessageHandler.class);
 
    /** Trace enabled */
-   private static boolean trace = log.isTraceEnabled();
+   private static boolean trace  = log.isTraceEnabled();
 
-   /** The message handler pool */
-   private JBMMessageHandlerPool pool;
-
-   /** Is in use */
-   private AtomicBoolean inUse;
-
-   /** Done latch */
-   private CountDownLatch done;
-
-   /** The transacted flag */
-   private boolean transacted;
-
-   /** The acknowledge mode */
-   private int acknowledge;
-
    /** The session */
    private Session session;
 
    /** Any XA session */
    private XASession xaSession;
 
-   /** The message consumer */
-   private MessageConsumer messageConsumer;
-
    /** The endpoint */
    private MessageEndpoint endpoint;
 
-   /** The transaction demarcation strategy */
-   private TransactionDemarcationStrategy txnStrategy;
+   private final JBMActivation activation;
 
-   /**
-    * Constructor
-    * @param pool The message handler pool
-    */
-   public JBMMessageHandler(JBMMessageHandlerPool pool)
-   {
-      if (trace)
-         log.trace("constructor(" + pool + ")");
+   /** The transaction demarcation strategy factory */
+   private DemarcationStrategyFactory strategyFactory = new DemarcationStrategyFactory();
 
-      this.pool = pool;
+   public JBMMessageHandler(JBMActivation activation)
+   {
+      this.activation = activation;
    }
 
-   /**
-    * Setup the session
-    */
    public void setup() throws Exception
    {
       if (trace)
          log.trace("setup()");
 
-      inUse = new AtomicBoolean(false);
-      done = new CountDownLatch(1);
-      
-      JBMActivation activation = pool.getActivation();
       JBMActivationSpec spec = activation.getActivationSpec();
       String selector = spec.getMessageSelector();
 
@@ -124,15 +88,16 @@
       {
          xaSession = ((XAConnection)connection).createXASession();
          session = xaSession.getSession();
-      } 
+      }
       else
       {
-         transacted = spec.isSessionTransacted();
-         acknowledge = spec.getAcknowledgeModeInt();
+         boolean transacted = spec.isSessionTransacted();
+         int acknowledge = spec.getAcknowledgeModeInt();
          session = connection.createSession(transacted, acknowledge);
       }
 
       // Create the message consumer
+      MessageConsumer messageConsumer;
       if (activation.isTopic() && spec.isSubscriptionDurable())
       {
          Topic topic = (Topic) activation.getDestination();
@@ -140,11 +105,11 @@
 
          if (selector == null || selector.trim().equals(""))
          {
-            messageConsumer = (MessageConsumer)session.createDurableSubscriber(topic, subscriptionName);
+            messageConsumer = session.createDurableSubscriber(topic, subscriptionName);
          }
          else
          {
-            messageConsumer = (MessageConsumer)session.createDurableSubscriber(topic, subscriptionName, selector, false);
+            messageConsumer = session.createDurableSubscriber(topic, subscriptionName, selector, false);
          }
       }
       else
@@ -162,10 +127,10 @@
       // Create the endpoint
       MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();
       XAResource xaResource = null;
-      
+
       if (activation.isDeliveryTransacted() && xaSession != null)
          xaResource = xaSession.getXAResource();
-      
+
       endpoint = endpointFactory.createEndpoint(xaResource);
 
       // Set the message listener
@@ -179,12 +144,12 @@
    {
       if (trace)
          log.trace("teardown()");
-      
+
       try
       {
          if (endpoint != null)
             endpoint.release();
-      } 
+      }
       catch (Throwable t)
       {
          log.debug("Error releasing endpoint " + endpoint, t);
@@ -212,18 +177,6 @@
    }
 
    /**
-    * Is in use
-    * @return True if in use; otherwise false
-    */
-   public boolean isInUse()
-   {
-      if (trace)
-         log.trace("isInUse()");
-      
-      return inUse.get();
-   }
-
-   /**
     * On message
     * @param message The message
     */
@@ -232,17 +185,25 @@
       if (trace)
          log.trace("onMessage(" + message + ")");
 
-      inUse.set(true);
-      
+      TransactionDemarcationStrategy txnStrategy = strategyFactory.getStrategy();
       try
       {
+         txnStrategy.start();
+      }
+      catch (Throwable throwable)
+      {
+         log.warn("Unable to create transaction: " + throwable.getMessage());
+         txnStrategy = null;
+      }
+      try
+      {
          endpoint.beforeDelivery(JBMActivation.ONMESSAGE);
 
          try
          {
             MessageListener listener = (MessageListener) endpoint;
             listener.onMessage(message);
-         } 
+         }
          finally
          {
             endpoint.afterDelivery();
@@ -255,128 +216,14 @@
          if (txnStrategy != null)
             txnStrategy.error();
       }
-
-      inUse.set(false);
-      done.countDown();
-   }
-
-   /**
-    * Run
-    */
-   public void run()
-   {
-      if (trace)
-         log.trace("run()");
-
-      try
-      {
-         setup();
-
-         txnStrategy = createTransactionDemarcation();
-      } 
-      catch (Throwable t)
-      {
-         log.error("Error creating transaction demarcation. Cannot continue.");
-         return;
-      }
-
-      try
-      {
-         // Wait for onMessage
-         while (done.getCount() > 0)
-         {
-            try
-            {
-               done.await();
-            }
-            catch (InterruptedException ignore)
-            {
-            }
-         }
-      }
-      catch (Throwable t)
-      {
-         if (txnStrategy != null)
-            txnStrategy.error();
-
-      }
       finally
       {
          if (txnStrategy != null)
             txnStrategy.end();
-
-         txnStrategy = null;
       }
    }
 
    /**
-    * Release
-    */
-   public void release()
-   {
-      if (trace)
-         log.trace("release()");
-   }
-
-   /**
-    * Work accepted
-    * @param e The work event
-    */
-   public void workAccepted(WorkEvent e)
-   {
-      if (trace)
-         log.trace("workAccepted()");
-   }
-
-   /**
-    * Work completed
-    * @param e The work event
-    */
-   public void workCompleted(WorkEvent e)
-   {
-      if (trace)
-         log.trace("workCompleted()");
-
-      teardown();
-      pool.removeHandler(this);
-   }
-
-   /**
-    * Work rejected
-    * @param e The work event
-    */
-   public void workRejected(WorkEvent e)
-   {
-      if (trace)
-         log.trace("workRejected()");
-
-      teardown();
-      pool.removeHandler(this);
-   }
-
-   /**
-    * Work started
-    * @param e The work event
-    */
-   public void workStarted(WorkEvent e)
-   {
-      if (trace)
-         log.trace("workStarted()");
-   }
-
-   /**
-    * Create the transaction demarcation strategy
-    * @return The strategy
-    */
-   private TransactionDemarcationStrategy createTransactionDemarcation()
-   {
-      if (trace)
-         log.trace("createTransactionDemarcation()");
-
-      return new DemarcationStrategyFactory().getStrategy();
-   }
-
-   /**
     * Demarcation strategy factory
     */
    private class DemarcationStrategyFactory
@@ -390,20 +237,17 @@
          if (trace)
             log.trace("getStrategy()");
 
-         final JBMActivationSpec spec = pool.getActivation().getActivationSpec();
-         final JBMActivation activation = pool.getActivation();
-
          if (activation.isDeliveryTransacted())
          {
             try
             {
                return new XATransactionDemarcationStrategy();
-            } 
+            }
             catch (Throwable t)
             {
                log.error(this + " error creating transaction demarcation ", t);
             }
-         } 
+         }
          else
          {
             return new LocalDemarcationStrategy();
@@ -418,6 +262,10 @@
     */
    private interface TransactionDemarcationStrategy
    {
+      /*
+      * Start
+      */
+      void start() throws Throwable;
       /**
        * Error
        */
@@ -434,6 +282,13 @@
     */
    private class LocalDemarcationStrategy implements TransactionDemarcationStrategy
    {
+      /*
+   * Start
+   */
+      public void start()
+      {
+      }
+
       /**
        * Error
        */
@@ -442,7 +297,7 @@
          if (trace)
             log.trace("error()");
 
-         final JBMActivationSpec spec = pool.getActivation().getActivationSpec();
+         final JBMActivationSpec spec = activation.getActivationSpec();
 
          if (spec.isSessionTransacted())
          {
@@ -452,15 +307,15 @@
                {
                   /*
                    * Looks strange, but this basically means
-                   * 
+                   *
                    * If the underlying connection was non-XA and the transaction
                    * attribute is REQUIRED we rollback. Also, if the underlying
                    * connection was non-XA and the transaction attribute is
                    * NOT_SUPPORT and the non standard redelivery behavior is
                    * enabled we rollback to force redelivery.
-                   * 
+                   *
                    */
-                  if (pool.getActivation().isDeliveryTransacted() || spec.getRedeliverUnspecified())
+                  if (activation.isDeliveryTransacted() || spec.getRedeliverUnspecified())
                   {
                      session.rollback();
                   }
@@ -480,7 +335,7 @@
          if (trace)
             log.trace("error()");
 
-         final JBMActivationSpec spec = pool.getActivation().getActivationSpec();
+         final JBMActivationSpec spec = activation.getActivationSpec();
 
          if (spec.isSessionTransacted())
          {
@@ -504,11 +359,11 @@
    private class XATransactionDemarcationStrategy implements TransactionDemarcationStrategy
    {
       private Transaction trans = null;
-      private TransactionManager tm = pool.getActivation().getTransactionManager();
+      private TransactionManager tm = activation.getTransactionManager();
 
-      public XATransactionDemarcationStrategy() throws Throwable
+      public void start() throws Throwable
       {
-         final int timeout = pool.getActivation().getActivationSpec().getTransactionTimeout();
+         final int timeout = activation.getActivationSpec().getTransactionTimeout();
 
          if (timeout > 0)
          {
@@ -538,13 +393,13 @@
                if (trace)
                   log.trace(this + " XAResource '" + res + " enlisted.");
             }
-         } 
+         }
          catch (Throwable t)
          {
             try
             {
                tm.rollback();
-            } 
+            }
             catch (Throwable ignored)
             {
                log.trace(this + " ignored error rolling back after failed enlist", ignored);
@@ -562,7 +417,7 @@
                log.trace(this + " using TM to mark TX for rollback tx=" + trans);
 
             trans.setRollbackOnly();
-         } 
+         }
          catch (Throwable t)
          {
             log.error(this + " failed to set rollback only", t);
@@ -590,7 +445,7 @@
                // NO XASession? then manually rollback.
                // This is not so good but
                // it's the best we can do if we have no XASession.
-               if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+               if (xaSession == null && activation.isDeliveryTransacted())
                {
                   session.rollback();
                }
@@ -608,17 +463,17 @@
 
                // NO XASession? then manually commit. This is not so good but
                // it's the best we can do if we have no XASession.
-               if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+               if (xaSession == null && activation.isDeliveryTransacted())
                {
                   session.commit();
                }
 
-            } 
+            }
             else
             {
                tm.suspend();
 
-               if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+               if (xaSession == null && activation.isDeliveryTransacted())
                {
                   session.rollback();
                }
@@ -630,3 +485,4 @@
       }
    }
 }
+

Deleted: trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandlerPool.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandlerPool.java	2009-02-20 12:12:32 UTC (rev 5911)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandlerPool.java	2009-02-20 14:38:51 UTC (rev 5912)
@@ -1,244 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2009, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.ra.inflow;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.resource.spi.work.WorkManager;
-
-import org.jboss.messaging.core.logging.Logger;
-
-/**
- * The message handler pool
- * 
- * @author <a href="adrian at jboss.com">Adrian Brock</a>
- * @author <a href="jesper.pedersen at jboss.org">Jesper Pedersen</a>
- * @version $Revision:  $
- */
-public class JBMMessageHandlerPool
-{
-    /** The logger */
-   private static final Logger log = Logger.getLogger(JBMMessageHandlerPool.class);
-
-   /** Trace enabled */
-   private static boolean trace = log.isTraceEnabled();
-      
-   /** The activation */
-   private JBMActivation activation;
-
-   /** The active sessions */
-   private ArrayList<JBMMessageHandler> activeSessions;
-   
-   /** Whether the pool is stopped */
-   private AtomicBoolean stopped;
-   
-   /**
-    * Constructor
-    * @param activation The activation
-    */
-   public JBMMessageHandlerPool(JBMActivation activation)
-   {
-      if (trace)
-         log.trace("constructor(" + activation + ")");
-
-      this.activation = activation;
-      this.activeSessions = new ArrayList<JBMMessageHandler>();
-      this.stopped = new AtomicBoolean(false);
-   }
-
-   /**
-    * Get the activation
-    * @return The value
-    */
-   public JBMActivation getActivation()
-   {
-      if (trace)
-         log.trace("getActivation()");
-
-      return activation;
-   }
-   
-   /**
-    * Start the pool
-    * @exception Exception Thrown if an error occurs
-    */
-   public void start() throws Exception
-   {
-      if (trace)
-         log.trace("start()");
-
-      setupSessions();
-   }
-
-   /**
-    * Stop the server session pool
-    */
-   public void stop()
-   {
-      if (trace)
-         log.trace("stop()");
-
-      // Disallow any new sessions
-      stopped.set(true);
-      
-      teardownSessions();
-   }
-   
-   /**
-    * Remove message handler
-    * @param handler The handler
-    */
-   protected void removeHandler(JBMMessageHandler handler)
-   {
-      if (trace)
-         log.trace("removeHandler(" + handler + ")");
-
-      synchronized (activeSessions)
-      {
-         activeSessions.remove(handler);
-         
-         if (!stopped.get())
-         {
-            try
-            {
-               setupSession();
-            }
-            catch (Exception e)
-            {
-               log.error("Unable to restart handler", e);
-            }
-         }
-         activeSessions.notifyAll();
-      }
-   }
-   
-   /**
-    * Starts the sessions
-    * @exception Exception Thrown if an error occurs
-    */
-   protected void setupSessions() throws Exception
-   {
-      if (trace)
-         log.trace("setupSessions()");
-
-      JBMActivationSpec spec = activation.getActivationSpec();
-
-      // Create the sessions
-      synchronized (activeSessions)
-      {
-         for (int i = 0; i < spec.getMaxSessionInt(); ++i)
-         {
-            setupSession();
-         }
-      }
-   }
-
-   /**
-    * Setup a session
-    * @exception Exception Thrown if an error occurs
-    */
-   protected void setupSession() throws Exception
-   {
-      if (trace)
-         log.trace("setupSession()");
-
-      WorkManager workManager = activation.getWorkManager();
-
-      // Create the session
-      JBMMessageHandler handler = new JBMMessageHandler(this);
-      workManager.scheduleWork(handler, 0, null, handler);
-
-      activeSessions.add(handler);
-   }
-
-   /**
-    * Stop the sessions
-    */
-   protected void teardownSessions()
-   {
-      if (trace)
-         log.trace("teardownSessions()");
-
-      synchronized (activeSessions)
-      {
-         List<JBMMessageHandler> cloned = (List<JBMMessageHandler>)activeSessions.clone();
-         for (int i = 0; i < cloned.size(); ++i)
-         {
-            JBMMessageHandler handler = cloned.get(i);
-            if (!handler.isInUse())
-            {
-               handler.teardown();
-               activeSessions.remove(handler);
-            }
-         }
-
-         activeSessions.notifyAll();
-      }
-
-      synchronized (activeSessions)
-      {
-         if (activation.getActivationSpec().isForceClearOnShutdown())
-         {        
-            int attempts = 0;
-            int forceClearAttempts = activation.getActivationSpec().getForceClearAttempts();
-            long forceClearInterval = activation.getActivationSpec().getForceClearOnShutdownInterval();
-
-            if (trace)
-               log.trace(this + " force clear behavior in effect. Waiting for " + forceClearInterval + " milliseconds for " + forceClearAttempts + " attempts.");
-           
-            while((activeSessions.size() > 0) && (attempts < forceClearAttempts))
-            {
-               try
-               {
-                  int currentSessions = activeSessions.size();
-                  activeSessions.wait(forceClearInterval);
-                  // Number of session didn't change
-                  if (activeSessions.size() == currentSessions)
-                  {
-                     ++attempts;
-                     log.trace(this + " clear attempt failed " + attempts); 
-                  }
-               }
-               catch(InterruptedException ignore)
-               {
-               }            
-            }
-         }
-         else
-         {
-            // Wait for inuse sessions
-            while (activeSessions.size() > 0)
-            {
-               try
-               {
-                  activeSessions.wait();
-               }
-               catch (InterruptedException ignore)
-               {
-               }
-            }
-         }
-      }
-   }
-}




More information about the jboss-cvs-commits mailing list