[jboss-cvs] JBoss Messaging SVN: r5902 - in trunk/src/main/org/jboss/messaging/ra: inflow and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Feb 19 10:30:08 EST 2009


Author: jesper.pedersen
Date: 2009-02-19 10:30:08 -0500 (Thu, 19 Feb 2009)
New Revision: 5902

Modified:
   trunk/src/main/org/jboss/messaging/ra/JBMManagedConnection.java
   trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivationSpec.java
   trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java
   trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandlerPool.java
Log:
[JBMESSAGING-1367] Create JCA resource adapter for JBM 2.0

Modified: trunk/src/main/org/jboss/messaging/ra/JBMManagedConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/JBMManagedConnection.java	2009-02-19 15:04:28 UTC (rev 5901)
+++ trunk/src/main/org/jboss/messaging/ra/JBMManagedConnection.java	2009-02-19 15:30:08 UTC (rev 5902)
@@ -188,7 +188,7 @@
       if (isDestroyed.get())
          throw new IllegalStateException("The managed connection is already destroyed");
 
-      Object session = null; // TODO - get one from JBossConnection -- new JBMSession(this, (JBMConnectionRequestInfo)cxRequestInfo);
+      Object session = new JBMSession(this, (JBMConnectionRequestInfo)cxRequestInfo);
       handles.add(session);
       return session;
    }

Modified: trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivationSpec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivationSpec.java	2009-02-19 15:04:28 UTC (rev 5901)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivationSpec.java	2009-02-19 15:30:08 UTC (rev 5902)
@@ -137,7 +137,7 @@
       password = null;
       maxMessages = Integer.valueOf(1);
       minSession = Integer.valueOf(1);
-      maxSession = Integer.valueOf(1);
+      maxSession = Integer.valueOf(15);
       keepAlive = Long.valueOf(60000);
       sessionTransacted = Boolean.TRUE;
       reconnectAttempts = Integer.valueOf(5);

Modified: trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java	2009-02-19 15:04:28 UTC (rev 5901)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java	2009-02-19 15:30:08 UTC (rev 5902)
@@ -21,6 +21,8 @@
  */
 package org.jboss.messaging.ra.inflow;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -32,6 +34,10 @@
 import javax.jms.XASession;
 import javax.resource.spi.endpoint.MessageEndpoint;
 import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkEvent;
+import javax.resource.spi.work.WorkException;
+import javax.resource.spi.work.WorkListener;
 import javax.transaction.Status;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
@@ -46,7 +52,7 @@
  * @author <a href="mailto:jesper.pedersen at jboss.org">Jesper Pedersen</a>
  * @version $Revision: $
  */
-public class JBMMessageHandler implements MessageListener
+public class JBMMessageHandler implements MessageListener, Work, WorkListener
 {
      /** The logger */
    private static final Logger log = Logger.getLogger(JBMMessageHandler.class);
@@ -57,6 +63,12 @@
    /** The message handler pool */
    private JBMMessageHandlerPool pool;
 
+   /** Is in use */
+   private AtomicBoolean inUse;
+
+   /** Done latch */
+   private CountDownLatch done;
+
    /** The transacted flag */
    private boolean transacted;
 
@@ -97,6 +109,9 @@
    {
       if (trace)
          log.trace("setup()");
+
+      inUse = new AtomicBoolean(false);
+      done = new CountDownLatch(1);
       
       JBMActivation activation = pool.getActivation();
       JBMActivationSpec spec = activation.getActivationSpec();
@@ -153,9 +168,6 @@
       
       endpoint = endpointFactory.createEndpoint(xaResource);
 
-      // Create the transaction demarcation strategy
-      txnStrategy = createTransactionDemarcation();
-
       // Set the message listener
       messageConsumer.setMessageListener(this);
    }
@@ -200,6 +212,18 @@
    }
 
    /**
+    * Is in use
+    * @return True if in use; otherwise false
+    */
+   public boolean isInUse()
+   {
+      if (trace)
+         log.trace("isInUse()");
+      
+      return inUse.get();
+   }
+
+   /**
     * On message
     * @param message The message
     */
@@ -207,6 +231,8 @@
    {
       if (trace)
          log.trace("onMessage(" + message + ")");
+
+      inUse.set(true);
       
       try
       {
@@ -229,6 +255,51 @@
          if (txnStrategy != null)
             txnStrategy.error();
       }
+
+      inUse.set(false);
+      done.countDown();
+   }
+
+   /**
+    * Run
+    */
+   public void run()
+   {
+      if (trace)
+         log.trace("run()");
+
+      try
+      {
+         setup();
+
+         txnStrategy = createTransactionDemarcation();
+      } 
+      catch (Throwable t)
+      {
+         log.error("Error creating transaction demarcation. Cannot continue.");
+         return;
+      }
+
+      try
+      {
+         // Wait for onMessage
+         while (done.getCount() > 0)
+         {
+            try
+            {
+               done.wait();
+            }
+            catch (InterruptedException ignore)
+            {
+            }
+         }
+      }
+      catch (Throwable t)
+      {
+         if (txnStrategy != null)
+            txnStrategy.error();
+
+      }
       finally
       {
          if (txnStrategy != null)
@@ -236,11 +307,64 @@
 
          txnStrategy = null;
       }
+   }
 
+   /**
+    * Release
+    */
+   public void release()
+   {
+      if (trace)
+         log.trace("release()");
+   }
+
+   /**
+    * Work accepted
+    * @param e The work event
+    */
+   public void workAccepted(WorkEvent e)
+   {
+      if (trace)
+         log.trace("workAccepted()");
+   }
+
+   /**
+    * Work completed
+    * @param e The work event
+    */
+   public void workCompleted(WorkEvent e)
+   {
+      if (trace)
+         log.trace("workCompleted()");
+
+      teardown();
       pool.removeHandler(this);
    }
 
    /**
+    * Work rejected
+    * @param e The work event
+    */
+   public void workRejected(WorkEvent e)
+   {
+      if (trace)
+         log.trace("workRejected()");
+
+      teardown();
+      pool.removeHandler(this);
+   }
+
+   /**
+    * Work started
+    * @param e The work event
+    */
+   public void workStarted(WorkEvent e)
+   {
+      if (trace)
+         log.trace("workStarted()");
+   }
+
+   /**
     * Create the transaction demarcation strategy
     * @return The strategy
     */

Modified: trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandlerPool.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandlerPool.java	2009-02-19 15:04:28 UTC (rev 5901)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandlerPool.java	2009-02-19 15:30:08 UTC (rev 5902)
@@ -25,6 +25,8 @@
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import javax.resource.spi.work.WorkManager;
+
 import org.jboss.messaging.core.logging.Logger;
 
 /**
@@ -46,7 +48,7 @@
    private JBMActivation activation;
 
    /** The active sessions */
-   private List<JBMMessageHandler> activeSessions;
+   private ArrayList<JBMMessageHandler> activeSessions;
    
    /** Whether the pool is stopped */
    private AtomicBoolean stopped;
@@ -114,7 +116,6 @@
 
       synchronized (activeSessions)
       {
-         handler.teardown();
          activeSessions.remove(handler);
          
          if (!stopped.get())
@@ -128,8 +129,8 @@
                log.error("Unable to restart handler", e);
             }
          }
+         activeSessions.notifyAll();
       }
-      activeSessions.notifyAll();
    }
    
    /**
@@ -162,9 +163,11 @@
       if (trace)
          log.trace("setupSession()");
 
+      WorkManager workManager = activation.getWorkManager();
+
       // Create the session
       JBMMessageHandler handler = new JBMMessageHandler(this);
-      handler.setup();
+      workManager.scheduleWork(handler, 0, null, handler);
 
       activeSessions.add(handler);
    }
@@ -179,6 +182,22 @@
 
       synchronized (activeSessions)
       {
+         List<JBMMessageHandler> cloned = (List<JBMMessageHandler>)activeSessions.clone();
+         for (int i = 0; i < cloned.size(); ++i)
+         {
+            JBMMessageHandler handler = cloned.get(i);
+            if (!handler.isInUse())
+            {
+               handler.teardown();
+               activeSessions.remove(handler);
+            }
+         }
+
+         activeSessions.notifyAll();
+      }
+
+      synchronized (activeSessions)
+      {
          if (activation.getActivationSpec().isForceClearOnShutdown())
          {        
             int attempts = 0;




More information about the jboss-cvs-commits mailing list