[jboss-cvs] JBossAS SVN: r93219 - in branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752: testsuite/src/main/org/jboss/test/jca/inflow and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Sep 4 14:33:23 EDT 2009
Author: jbertram at redhat.com
Date: 2009-09-04 14:33:23 -0400 (Fri, 04 Sep 2009)
New Revision: 93219
Modified:
branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/server/src/main/org/jboss/ejb/plugins/inflow/MessageEndpointInterceptor.java
branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/main/org/jboss/test/jca/inflow/TestMessageListener.java
branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/main/org/jboss/test/jca/inflow/TestResourceAdapterInflow.java
branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/main/org/jboss/test/jca/inflowmdb/TestMDBMessageListener.java
branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/resources/jca/inflowmdb/META-INF/ejb-jar.xml
Log:
[JBPAPP-2752]
Modified: branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/server/src/main/org/jboss/ejb/plugins/inflow/MessageEndpointInterceptor.java
===================================================================
--- branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/server/src/main/org/jboss/ejb/plugins/inflow/MessageEndpointInterceptor.java 2009-09-04 18:27:00 UTC (rev 93218)
+++ branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/server/src/main/org/jboss/ejb/plugins/inflow/MessageEndpointInterceptor.java 2009-09-04 18:33:23 UTC (rev 93219)
@@ -22,6 +22,7 @@
package org.jboss.ejb.plugins.inflow;
import java.lang.reflect.Method;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.resource.ResourceException;
import javax.transaction.Status;
@@ -34,9 +35,6 @@
import org.jboss.logging.Logger;
import org.jboss.proxy.Interceptor;
-
-import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
-
/**
* Implements the application server message endpoint requirements.
*
@@ -47,8 +45,6 @@
{
/** The serialVersionUID */
private static final long serialVersionUID = -8740717288847385688L;
-
- // Constants -----------------------------------------------------
/** The log */
private static final Logger log = Logger.getLogger(MessageEndpointInterceptor.class);
@@ -59,8 +55,6 @@
/** The key for the xa resource */
public static final String MESSAGE_ENDPOINT_XARESOURCE = "MessageEndpoint.XAResource";
- // Attributes ----------------------------------------------------
-
/** Whether trace is enabled */
private boolean trace = log.isTraceEnabled();
@@ -68,10 +62,10 @@
private String cachedProxyString = null;
/** Whether this proxy has been released */
- protected SynchronizedBoolean released = new SynchronizedBoolean(false);
+ protected AtomicBoolean released = new AtomicBoolean(false);
/** Whether we have delivered a message */
- protected boolean delivered = false;
+ protected AtomicBoolean delivered = new AtomicBoolean(false);
/** The in use thread */
protected Thread inUseThread = null;
@@ -91,18 +85,9 @@
/** The message endpoint factory */
private JBossMessageEndpointFactory endpointFactory;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
public MessageEndpointInterceptor()
{
}
-
- // Public --------------------------------------------------------
-
- // Interceptor implementation ------------------------------------
public Object invoke(Invocation mi) throws Throwable
{
@@ -111,10 +96,13 @@
throw new IllegalStateException("This message endpoint + " + getProxyString(mi) + " has been released");
// Concurrent invocation?
- Thread currentThread = Thread.currentThread();
- if (inUseThread != null && inUseThread.equals(currentThread) == false)
- throw new IllegalStateException("This message endpoint + " + getProxyString(mi) + " is already in use by another thread " + inUseThread);
- inUseThread = currentThread;
+ synchronized (this)
+ {
+ Thread currentThread = Thread.currentThread();
+ if (inUseThread != null && inUseThread.equals(currentThread) == false)
+ throw new IllegalStateException("This message endpoint + " + getProxyString(mi) + " is already in use by another thread " + inUseThread);
+ inUseThread = currentThread;
+ }
String method = mi.getMethod().getName();
if (trace)
@@ -140,10 +128,6 @@
return delivery(mi);
}
- // Package Protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
/**
* Release this message endpoint.
*
@@ -159,7 +143,7 @@
log.trace("MessageEndpoint " + getProxyString(mi) + " released");
// Tidyup any outstanding delivery
- if (oldClassLoader != null)
+ if (getOldClassLoader() != null)
{
try
{
@@ -186,8 +170,11 @@
// Set the classloader
MessageDrivenContainer container = getContainer(mi);
- oldClassLoader = GetTCLAction.getContextClassLoader(inUseThread);
- SetTCLAction.setContextClassLoader(inUseThread, container.getClassLoader());
+ synchronized (this)
+ {
+ oldClassLoader = GetTCLAction.getContextClassLoader(inUseThread);
+ SetTCLAction.setContextClassLoader(inUseThread, container.getClassLoader());
+ }
if (trace)
log.trace("MessageEndpoint " + getProxyString(mi) + " set context classloader to " + container.getClassLoader());
@@ -242,23 +229,22 @@
protected Object delivery(Invocation mi) throws Throwable
{
// Have we already delivered a message?
- if (delivered)
+ if (delivered.get())
throw new IllegalStateException("Multiple message delivery between before and after delivery is not allowed for message endpoint " + getProxyString(mi));
if (trace)
log.trace("MessageEndpoint " + getProxyString(mi) + " delivering");
// Mark delivery if beforeDelivery was invoked
- if (oldClassLoader != null)
- delivered = true;
-
+ if (getOldClassLoader() != null)
+ delivered.set(true);
MessageDrivenContainer container = getContainer(mi);
boolean commit = true;
try
{
// Check for starting a transaction
- if (oldClassLoader == null)
+ if (getOldClassLoader() == null)
startTransaction("delivery", mi, container);
return getNext().invoke(mi);
}
@@ -268,6 +254,7 @@
log.trace("MessageEndpoint " + getProxyString(mi) + " delivery error", t);
if (t instanceof Error || t instanceof RuntimeException)
{
+ Transaction transaction = getTransaction();
if (transaction != null)
transaction.setRollbackOnly();
commit = false;
@@ -277,7 +264,7 @@
finally
{
// No before/after delivery, end any transaction and release the lock
- if (oldClassLoader == null)
+ if (getOldClassLoader() == null)
{
try
{
@@ -310,7 +297,7 @@
{
setBeforeDeliveryInvoke(false);
// Reset delivered flag
- delivered = false;
+ delivered.set(false);
// Change back to the original context classloader
resetContextClassLoader(mi);
// We no longer hold the lock
@@ -348,7 +335,11 @@
// Get the transaction status
TransactionManager tm = container.getTransactionManager();
- suspended = tm.suspend();
+ Transaction tx = tm.suspend();
+ synchronized (this)
+ {
+ suspended = tx;
+ }
if (trace)
log.trace("MessageEndpoint " + getProxyString(mi) + " " + context + " currentTx=" + suspended);
@@ -360,7 +351,11 @@
if (suspended == null)
{
tm.begin();
- transaction = tm.getTransaction();
+ tx = tm.getTransaction();
+ synchronized (this)
+ {
+ transaction = tx;
+ }
if (trace)
log.trace("MessageEndpoint " + getProxyString(mi) + " started transaction=" + transaction);
@@ -381,7 +376,10 @@
}
finally
{
- suspended = null;
+ synchronized (this)
+ {
+ suspended = null;
+ }
if (trace)
log.trace("MessageEndpoint " + getProxyString(mi) + " transaction=" + suspended + " already active, IGNORED=" + resource);
}
@@ -403,6 +401,7 @@
try
{
// If we started the transaction, commit it
+ Transaction transaction = getTransaction();
if (transaction != null)
{
tm = getContainer(mi).getTransactionManager();
@@ -437,6 +436,7 @@
}
// If we suspended the incoming transaction, resume it
+ Transaction suspended = getSuspended();
if (suspended != null)
{
try
@@ -446,12 +446,20 @@
}
finally
{
- suspended = null;
+ synchronized (this)
+ {
+ this.suspended = null;
+ }
}
}
}
finally
{
+ synchronized (this)
+ {
+ transaction = null;
+ }
+
// Resume any suspended transaction
if (currentTx != null)
{
@@ -462,7 +470,6 @@
catch (Throwable t)
{
log.warn("MessageEndpoint " + getProxyString(mi) + " failed to resume old transaction " + currentTx);
-
}
}
}
@@ -475,10 +482,13 @@
*/
protected void resetContextClassLoader(Invocation mi)
{
- if (trace)
- log.trace("MessageEndpoint " + getProxyString(mi) + " reset classloader " + oldClassLoader);
- SetTCLAction.setContextClassLoader(inUseThread, oldClassLoader);
- oldClassLoader = null;
+ synchronized (this)
+ {
+ if (trace)
+ log.trace("MessageEndpoint " + getProxyString(mi) + " reset classloader " + oldClassLoader);
+ SetTCLAction.setContextClassLoader(inUseThread, oldClassLoader);
+ oldClassLoader = null;
+ }
}
protected void setBeforeDeliveryInvoke(boolean bdi)
@@ -499,9 +509,12 @@
*/
protected void releaseThreadLock(Invocation mi)
{
- if (trace)
- log.trace("MessageEndpoint " + getProxyString(mi) + " no longer in use by " + inUseThread);
- inUseThread = null;
+ synchronized (this)
+ {
+ if (trace)
+ log.trace("MessageEndpoint " + getProxyString(mi) + " no longer in use by " + inUseThread);
+ inUseThread = null;
+ }
}
/**
@@ -520,26 +533,45 @@
/**
* Get the message endpoint factory
*
+ * @param mi the invocation
* @return the message endpoint factory
*/
protected JBossMessageEndpointFactory getMessageEndpointFactory(Invocation mi)
{
if (endpointFactory == null)
endpointFactory = (JBossMessageEndpointFactory) mi.getInvocationContext().getValue(MESSAGE_ENDPOINT_FACTORY);
+ if (endpointFactory == null)
+ throw new IllegalStateException("No message endpoint factory in " + mi.getInvocationContext().context);
return endpointFactory;
}
/**
* Get the container
*
+ * @param mi the invocation
* @return the container
*/
protected MessageDrivenContainer getContainer(Invocation mi)
{
- return getMessageEndpointFactory(mi).getContainer();
+ JBossMessageEndpointFactory messageEndpointFactory = getMessageEndpointFactory(mi);
+ MessageDrivenContainer container = messageEndpointFactory.getContainer();
+ if (container == null)
+ throw new IllegalStateException("No container associated with message endpoint factory: " + messageEndpointFactory.getServiceName());
+ return container;
}
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
+ protected synchronized ClassLoader getOldClassLoader()
+ {
+ return oldClassLoader;
+ }
+
+ protected synchronized Transaction getTransaction()
+ {
+ return transaction;
+ }
+
+ protected synchronized Transaction getSuspended()
+ {
+ return suspended;
+ }
}
Modified: branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/main/org/jboss/test/jca/inflow/TestMessageListener.java
===================================================================
--- branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/main/org/jboss/test/jca/inflow/TestMessageListener.java 2009-09-04 18:27:00 UTC (rev 93218)
+++ branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/main/org/jboss/test/jca/inflow/TestMessageListener.java 2009-09-04 18:33:23 UTC (rev 93219)
@@ -30,4 +30,5 @@
public interface TestMessageListener
{
void deliverMessage(TestMessage message);
+ void deliverMessageNoTransaction(TestMessage message);
}
Modified: branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/main/org/jboss/test/jca/inflow/TestResourceAdapterInflow.java
===================================================================
--- branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/main/org/jboss/test/jca/inflow/TestResourceAdapterInflow.java 2009-09-04 18:27:00 UTC (rev 93218)
+++ branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/main/org/jboss/test/jca/inflow/TestResourceAdapterInflow.java 2009-09-04 18:33:23 UTC (rev 93219)
@@ -62,5 +62,9 @@
((TestMessageListener) endpoint).deliverMessage(message);
if (message.acknowledged == false)
throw new Exception("MDB did not acknowledge the message");
+ message = new TestMessage();
+ ((TestMessageListener) endpoint).deliverMessageNoTransaction(message);
+ if (message.acknowledged == false)
+ throw new Exception("MDB did not acknowledge the message");
}
}
Modified: branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/main/org/jboss/test/jca/inflowmdb/TestMDBMessageListener.java
===================================================================
--- branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/main/org/jboss/test/jca/inflowmdb/TestMDBMessageListener.java 2009-09-04 18:27:00 UTC (rev 93218)
+++ branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/main/org/jboss/test/jca/inflowmdb/TestMDBMessageListener.java 2009-09-04 18:33:23 UTC (rev 93219)
@@ -42,6 +42,11 @@
{
message.acknowledge();
}
+
+ public void deliverMessageNoTransaction(TestMessage message)
+ {
+ deliverMessage(message);
+ }
public void ejbCreate()
{
Modified: branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/resources/jca/inflowmdb/META-INF/ejb-jar.xml
===================================================================
--- branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/resources/jca/inflowmdb/META-INF/ejb-jar.xml 2009-09-04 18:27:00 UTC (rev 93218)
+++ branches/JBPAPP_4_3_0_GA_CP06_JBPAPP-2752/testsuite/src/resources/jca/inflowmdb/META-INF/ejb-jar.xml 2009-09-04 18:33:23 UTC (rev 93219)
@@ -41,37 +41,6 @@
<transaction-type>Container</transaction-type>
</message-driven>
-<!--
- <message-driven>
- <description>An MDB that accepts mail messages</description>
- <ejb-name>MailMDB</ejb-name>
- <ejb-class>org.jboss.test.jca.inflowmdb.TestJavaMailMDB</ejb-class>
- <messaging-type>org.jboss.resource.adapter.mail.inflow.MailListener</messaging-type>
- <activation-config>
- <activation-config-property>
- <activation-config-property-name>mailServer</activation-config-property-name>
- <activation-config-property-value>${mailhost:mailhost}</activation-config-property-value>
- </activation-config-property>
- <activation-config-property>
- <activation-config-property-name>mailFolder</activation-config-property-name>
- <activation-config-property-value>INBOX</activation-config-property-value>
- </activation-config-property>
- <activation-config-property>
- <activation-config-property-name>storeProtocol</activation-config-property-name>
- <activation-config-property-value>imap</activation-config-property-value>
- </activation-config-property>
- <activation-config-property>
- <activation-config-property-name>userName</activation-config-property-name>
- <activation-config-property-value>jduke</activation-config-property-value>
- </activation-config-property>
- <activation-config-property>
- <activation-config-property-name>password</activation-config-property-name>
- <activation-config-property-value>theduke</activation-config-property-value>
- </activation-config-property>
- </activation-config>
- <transaction-type>Container</transaction-type>
- </message-driven>
--->
</enterprise-beans>
<assembly-descriptor>
@@ -83,6 +52,13 @@
</method>
<trans-attribute>Required</trans-attribute>
</container-transaction>
+ <container-transaction>
+ <method>
+ <ejb-name>TestMDB</ejb-name>
+ <method-name>deliverMessageNoTransaction</method-name>
+ </method>
+ <trans-attribute>NotSupported</trans-attribute>
+ </container-transaction>
</assembly-descriptor>
More information about the jboss-cvs-commits mailing list