[jboss-cvs] JBossAS SVN: r93219 - in branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752: testsuite/src/main/org/jboss/test/jca/inflow and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Sep 4 14:33:23 EDT 2009


Author: jbertram at redhat.com
Date: 2009-09-04 14:33:23 -0400 (Fri, 04 Sep 2009)
New Revision: 93219

Modified:
   branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/server/src/main/org/jboss/ejb/plugins/inflow/MessageEndpointInterceptor.java
   branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/main/org/jboss/test/jca/inflow/TestMessageListener.java
   branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/main/org/jboss/test/jca/inflow/TestResourceAdapterInflow.java
   branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/main/org/jboss/test/jca/inflowmdb/TestMDBMessageListener.java
   branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/resources/jca/inflowmdb/META-INF/ejb-jar.xml
Log:
[JBPAPP-2752]

Modified: branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/server/src/main/org/jboss/ejb/plugins/inflow/MessageEndpointInterceptor.java
===================================================================
--- branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/server/src/main/org/jboss/ejb/plugins/inflow/MessageEndpointInterceptor.java	2009-09-04 18:27:00 UTC (rev 93218)
+++ branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/server/src/main/org/jboss/ejb/plugins/inflow/MessageEndpointInterceptor.java	2009-09-04 18:33:23 UTC (rev 93219)
@@ -22,6 +22,7 @@
 package org.jboss.ejb.plugins.inflow;
 
 import java.lang.reflect.Method;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.resource.ResourceException;
 import javax.transaction.Status;
@@ -34,9 +35,6 @@
 import org.jboss.logging.Logger;
 import org.jboss.proxy.Interceptor;
 
-
-import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
-
 /**
  * Implements the application server message endpoint requirements.
  * 
@@ -47,8 +45,6 @@
 {
    /** The serialVersionUID */
    private static final long serialVersionUID =  -8740717288847385688L;
-
-   // Constants -----------------------------------------------------
    
    /** The log */
    private static final Logger log = Logger.getLogger(MessageEndpointInterceptor.class);
@@ -59,8 +55,6 @@
    /** The key for the xa resource */
    public static final String MESSAGE_ENDPOINT_XARESOURCE = "MessageEndpoint.XAResource";
    
-   // Attributes ----------------------------------------------------
-   
    /** Whether trace is enabled */
    private boolean trace = log.isTraceEnabled(); 
    
@@ -68,10 +62,10 @@
    private String cachedProxyString = null;
    
    /** Whether this proxy has been released */
-   protected SynchronizedBoolean released = new SynchronizedBoolean(false);
+   protected AtomicBoolean released = new AtomicBoolean(false);
    
    /** Whether we have delivered a message */
-   protected boolean delivered = false;
+   protected AtomicBoolean delivered = new AtomicBoolean(false);
    
    /** The in use thread */
    protected Thread inUseThread = null;
@@ -91,18 +85,9 @@
    /** The message endpoint factory */
    private JBossMessageEndpointFactory endpointFactory;
    
-   
-   // Static --------------------------------------------------------
-   
-   // Constructors --------------------------------------------------
-   
    public MessageEndpointInterceptor()
    {
    }
-   
-   // Public --------------------------------------------------------
-   
-   // Interceptor implementation ------------------------------------
 
    public Object invoke(Invocation mi) throws Throwable
    {
@@ -111,10 +96,13 @@
          throw new IllegalStateException("This message endpoint + " + getProxyString(mi) + " has been released");
 
       // Concurrent invocation?
-      Thread currentThread = Thread.currentThread();
-      if (inUseThread != null && inUseThread.equals(currentThread) == false)
-         throw new IllegalStateException("This message endpoint + " + getProxyString(mi) + " is already in use by another thread " + inUseThread);
-      inUseThread = currentThread;
+      synchronized (this)
+      {
+         Thread currentThread = Thread.currentThread();
+         if (inUseThread != null && inUseThread.equals(currentThread) == false)
+            throw new IllegalStateException("This message endpoint + " + getProxyString(mi) + " is already in use by another thread " + inUseThread);
+         inUseThread = currentThread;
+      }
       
       String method = mi.getMethod().getName();
       if (trace)
@@ -140,10 +128,6 @@
          return delivery(mi);
    }
    
-   // Package Protected ---------------------------------------------
-   
-   // Protected -----------------------------------------------------
-   
    /**
     * Release this message endpoint.
     * 
@@ -159,7 +143,7 @@
          log.trace("MessageEndpoint " + getProxyString(mi) + " released");
       
       // Tidyup any outstanding delivery
-      if (oldClassLoader != null)
+      if (getOldClassLoader() != null)
       {
          try
          {
@@ -186,8 +170,11 @@
 
       // Set the classloader
       MessageDrivenContainer container = getContainer(mi);
-      oldClassLoader = GetTCLAction.getContextClassLoader(inUseThread);
-      SetTCLAction.setContextClassLoader(inUseThread, container.getClassLoader());
+      synchronized (this)
+      {
+         oldClassLoader = GetTCLAction.getContextClassLoader(inUseThread);
+         SetTCLAction.setContextClassLoader(inUseThread, container.getClassLoader());
+      }
       if (trace)
          log.trace("MessageEndpoint " + getProxyString(mi) + " set context classloader to " + container.getClassLoader());
 
@@ -242,23 +229,22 @@
    protected Object delivery(Invocation mi) throws Throwable
    {
       // Have we already delivered a message?
-      if (delivered)
+      if (delivered.get())
          throw new IllegalStateException("Multiple message delivery between before and after delivery is not allowed for message endpoint " + getProxyString(mi));
 
       if (trace)
          log.trace("MessageEndpoint " + getProxyString(mi) + " delivering");
       
       // Mark delivery if beforeDelivery was invoked
-      if (oldClassLoader != null)
-         delivered = true;
-
+      if (getOldClassLoader() != null)
+         delivered.set(true);
      
       MessageDrivenContainer container = getContainer(mi);
       boolean commit = true;
       try
       {
          // Check for starting a transaction
-         if (oldClassLoader == null)
+         if (getOldClassLoader() == null)
             startTransaction("delivery", mi, container);
          return getNext().invoke(mi);
       }
@@ -268,6 +254,7 @@
             log.trace("MessageEndpoint " + getProxyString(mi) + " delivery error", t);
          if (t instanceof Error || t instanceof RuntimeException)
          {
+            Transaction transaction = getTransaction();
             if (transaction != null)
                transaction.setRollbackOnly();
             commit = false;
@@ -277,7 +264,7 @@
       finally
       {
          // No before/after delivery, end any transaction and release the lock
-         if (oldClassLoader == null)
+         if (getOldClassLoader() == null)
          {
             try
             {
@@ -310,7 +297,7 @@
       {
          setBeforeDeliveryInvoke(false);
          // Reset delivered flag
-         delivered = false;
+         delivered.set(false);
          // Change back to the original context classloader
          resetContextClassLoader(mi);
          // We no longer hold the lock
@@ -348,7 +335,11 @@
 
       // Get the transaction status
       TransactionManager tm = container.getTransactionManager();
-      suspended = tm.suspend();
+      Transaction tx = tm.suspend();
+      synchronized (this)
+      {
+         suspended = tx;
+      }
 
       if (trace)
          log.trace("MessageEndpoint " + getProxyString(mi) + " " + context + " currentTx=" + suspended);
@@ -360,7 +351,11 @@
          if (suspended == null)
          {
             tm.begin();
-            transaction = tm.getTransaction();
+            tx = tm.getTransaction();
+            synchronized (this)
+            {
+               transaction = tx;
+            }
             if (trace)
                log.trace("MessageEndpoint " + getProxyString(mi) + " started transaction=" + transaction);
       
@@ -381,7 +376,10 @@
             }
             finally
             {
-               suspended = null;
+               synchronized (this)
+               {
+                  suspended = null;
+               }
                if (trace)
                   log.trace("MessageEndpoint " + getProxyString(mi) + " transaction=" + suspended + " already active, IGNORED=" + resource);
             }
@@ -403,6 +401,7 @@
       try
       {
          // If we started the transaction, commit it
+         Transaction transaction = getTransaction();
          if (transaction != null)
          {
             tm = getContainer(mi).getTransactionManager();
@@ -437,6 +436,7 @@
          }
 
          // If we suspended the incoming transaction, resume it
+         Transaction suspended = getSuspended();
          if (suspended != null)
          {
             try
@@ -446,12 +446,20 @@
             }
             finally
             {
-               suspended = null;
+               synchronized (this)
+               {
+                  this.suspended = null;
+               }
             }
          }
       }
       finally
       {
+         synchronized (this)
+         {
+            transaction = null;
+         }
+      
          // Resume any suspended transaction
          if (currentTx != null)
          {
@@ -462,7 +470,6 @@
             catch (Throwable t)
             {
                log.warn("MessageEndpoint " + getProxyString(mi) + " failed to resume old transaction " + currentTx);
-               
             }
          }
       }
@@ -475,10 +482,13 @@
     */
    protected void resetContextClassLoader(Invocation mi)
    {
-      if (trace)
-         log.trace("MessageEndpoint " + getProxyString(mi) + " reset classloader " + oldClassLoader);
-      SetTCLAction.setContextClassLoader(inUseThread, oldClassLoader);
-      oldClassLoader = null;
+      synchronized (this)
+      {
+         if (trace)
+            log.trace("MessageEndpoint " + getProxyString(mi) + " reset classloader " + oldClassLoader);
+         SetTCLAction.setContextClassLoader(inUseThread, oldClassLoader);
+         oldClassLoader = null;
+      }
    }
    
    protected void setBeforeDeliveryInvoke(boolean bdi)
@@ -499,9 +509,12 @@
     */
    protected void releaseThreadLock(Invocation mi)
    {
-      if (trace)
-         log.trace("MessageEndpoint " + getProxyString(mi) + " no longer in use by " + inUseThread);
-      inUseThread = null;
+      synchronized (this)
+      {
+         if (trace)
+            log.trace("MessageEndpoint " + getProxyString(mi) + " no longer in use by " + inUseThread);
+         inUseThread = null;
+      }
    }
    
    /**
@@ -520,26 +533,45 @@
    /**
     * Get the message endpoint factory
     *
+    * @param mi the invocation
     * @return the message endpoint factory
     */
    protected JBossMessageEndpointFactory getMessageEndpointFactory(Invocation mi)
    {
       if (endpointFactory == null)
          endpointFactory = (JBossMessageEndpointFactory) mi.getInvocationContext().getValue(MESSAGE_ENDPOINT_FACTORY);
+      if (endpointFactory == null)
+         throw new IllegalStateException("No message endpoint factory in " + mi.getInvocationContext().context);
       return endpointFactory;
    }
    
    /**
     * Get the container
     *
+    * @param mi the invocation
     * @return the container
     */
    protected MessageDrivenContainer getContainer(Invocation mi)
    {
-      return getMessageEndpointFactory(mi).getContainer();
+      JBossMessageEndpointFactory messageEndpointFactory = getMessageEndpointFactory(mi);
+      MessageDrivenContainer container = messageEndpointFactory.getContainer();
+      if (container == null)
+         throw new IllegalStateException("No container associated with message endpoint factory: " + messageEndpointFactory.getServiceName());
+      return container;
    }
 
-   // Private -------------------------------------------------------
-   
-   // Inner classes -------------------------------------------------
+   protected synchronized ClassLoader getOldClassLoader()
+   {
+      return oldClassLoader;
+   }
+
+   protected synchronized Transaction getTransaction()
+   {
+      return transaction;
+   }
+
+   protected synchronized Transaction getSuspended()
+   {
+      return suspended;
+   }
 }

Modified: branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/main/org/jboss/test/jca/inflow/TestMessageListener.java
===================================================================
--- branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/main/org/jboss/test/jca/inflow/TestMessageListener.java	2009-09-04 18:27:00 UTC (rev 93218)
+++ branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/main/org/jboss/test/jca/inflow/TestMessageListener.java	2009-09-04 18:33:23 UTC (rev 93219)
@@ -30,4 +30,5 @@
 public interface TestMessageListener
 {
    void deliverMessage(TestMessage message);
+   void deliverMessageNoTransaction(TestMessage message);
 }

Modified: branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/main/org/jboss/test/jca/inflow/TestResourceAdapterInflow.java
===================================================================
--- branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/main/org/jboss/test/jca/inflow/TestResourceAdapterInflow.java	2009-09-04 18:27:00 UTC (rev 93218)
+++ branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/main/org/jboss/test/jca/inflow/TestResourceAdapterInflow.java	2009-09-04 18:33:23 UTC (rev 93219)
@@ -62,5 +62,9 @@
       ((TestMessageListener) endpoint).deliverMessage(message);
       if (message.acknowledged == false)
          throw new Exception("MDB did not acknowledge the message");
+      message = new TestMessage();
+      ((TestMessageListener) endpoint).deliverMessageNoTransaction(message);
+      if (message.acknowledged == false)
+         throw new Exception("MDB did not acknowledge the message");
    }
 }

Modified: branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/main/org/jboss/test/jca/inflowmdb/TestMDBMessageListener.java
===================================================================
--- branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/main/org/jboss/test/jca/inflowmdb/TestMDBMessageListener.java	2009-09-04 18:27:00 UTC (rev 93218)
+++ branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/main/org/jboss/test/jca/inflowmdb/TestMDBMessageListener.java	2009-09-04 18:33:23 UTC (rev 93219)
@@ -42,6 +42,11 @@
    {
       message.acknowledge();
    }
+   
+   public void deliverMessageNoTransaction(TestMessage message)
+   {
+      deliverMessage(message);
+   }
 
    public void ejbCreate()
    {

Modified: branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/resources/jca/inflowmdb/META-INF/ejb-jar.xml
===================================================================
--- branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/resources/jca/inflowmdb/META-INF/ejb-jar.xml	2009-09-04 18:27:00 UTC (rev 93218)
+++ branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/resources/jca/inflowmdb/META-INF/ejb-jar.xml	2009-09-04 18:33:23 UTC (rev 93219)
@@ -41,37 +41,6 @@
         <transaction-type>Container</transaction-type>
       </message-driven>
 
-<!--
-      <message-driven>
-         <description>An MDB that accepts mail messages</description>
-         <ejb-name>MailMDB</ejb-name>
-         <ejb-class>org.jboss.test.jca.inflowmdb.TestJavaMailMDB</ejb-class>
-         <messaging-type>org.jboss.resource.adapter.mail.inflow.MailListener</messaging-type>
-         <activation-config>
-            <activation-config-property>
-               <activation-config-property-name>mailServer</activation-config-property-name>
-               <activation-config-property-value>${mailhost:mailhost}</activation-config-property-value>
-            </activation-config-property>
-            <activation-config-property>
-               <activation-config-property-name>mailFolder</activation-config-property-name>
-               <activation-config-property-value>INBOX</activation-config-property-value>
-            </activation-config-property>
-            <activation-config-property>
-               <activation-config-property-name>storeProtocol</activation-config-property-name>
-               <activation-config-property-value>imap</activation-config-property-value>
-            </activation-config-property>
-            <activation-config-property>
-               <activation-config-property-name>userName</activation-config-property-name>
-               <activation-config-property-value>jduke</activation-config-property-value>
-            </activation-config-property>
-            <activation-config-property>
-               <activation-config-property-name>password</activation-config-property-name>
-               <activation-config-property-value>theduke</activation-config-property-value>
-            </activation-config-property>
-        </activation-config>
-        <transaction-type>Container</transaction-type>
-      </message-driven>
--->
    </enterprise-beans>
 
    <assembly-descriptor>
@@ -83,6 +52,13 @@
          </method>
          <trans-attribute>Required</trans-attribute>
       </container-transaction>
+      <container-transaction>
+         <method>
+            <ejb-name>TestMDB</ejb-name>
+            <method-name>deliverMessageNoTransaction</method-name>
+         </method>
+         <trans-attribute>NotSupported</trans-attribute>
+      </container-transaction>
 
    </assembly-descriptor>
 




More information about the jboss-cvs-commits mailing list