Author: dallen6
Date: 2009-03-04 10:24:10 -0500 (Wed, 04 Mar 2009)
New Revision: 1760
Added:
tck/trunk/impl/src/main/java/org/jboss/jsr299/tck/tests/event/AsynchronousObserver.java
Modified:
ri/trunk/webbeans-ri/src/main/java/org/jboss/webbeans/ManagerImpl.java
ri/trunk/webbeans-ri/src/main/java/org/jboss/webbeans/event/DeferredEventNotification.java
ri/trunk/webbeans-ri/src/main/java/org/jboss/webbeans/event/ObserverImpl.java
ri/trunk/webbeans-ri/src/main/java/org/jboss/webbeans/event/TransactionalObserverImpl.java
ri/trunk/webbeans-ri/src/main/java/org/jboss/webbeans/introspector/AnnotatedMethod.java
tck/trunk/impl/src/main/java/org/jboss/jsr299/tck/tests/event/EventTest.java
tck/trunk/impl/src/main/java/org/jboss/jsr299/tck/tests/event/transactionalObservers/Pomeranian.java
Log:
Added support for asynchronously executed observers.
Modified: ri/trunk/webbeans-ri/src/main/java/org/jboss/webbeans/ManagerImpl.java
===================================================================
--- ri/trunk/webbeans-ri/src/main/java/org/jboss/webbeans/ManagerImpl.java 2009-03-04
10:41:38 UTC (rev 1759)
+++ ri/trunk/webbeans-ri/src/main/java/org/jboss/webbeans/ManagerImpl.java 2009-03-04
15:24:10 UTC (rev 1760)
@@ -33,6 +33,9 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import javax.context.Context;
import javax.context.ContextNotActiveException;
@@ -91,8 +94,6 @@
private static final long serialVersionUID = 3021562879133838561L;
- private static final Annotation[] EMPTY_ANNOTATION_ARRAY = new Annotation[0];
-
// The JNDI key to place the manager under
public static final String JNDI_KEY = "java:comp/Manager";
@@ -101,6 +102,9 @@
// The Web Beans event manager
private transient final EventManager eventManager;
+ // An executor service for asynchronous tasks
+ private transient final ExecutorService taskExecutor =
Executors.newSingleThreadExecutor();
+
// An injection point metadata beans factory
private transient final ThreadLocal<InjectionPoint> currentInjectionPoint;
@@ -591,6 +595,7 @@
*
* @see javax.inject.manager.Manager#getInstance(javax.inject.manager.Bean)
*/
+ @SuppressWarnings("unchecked")
private <T> T getInstance(Bean<T> bean, CreationalContextImpl<T>
creationalContext)
{
if (specializedBeans.containsKey(bean))
@@ -619,6 +624,7 @@
return this.<T>getInstanceToInject(injectionPoint, null);
}
+ @SuppressWarnings("unchecked")
public <T> T getInstanceToInject(InjectionPoint injectionPoint,
CreationalContext<?> creationalContext)
{
boolean registerInjectionPoint =
!injectionPoint.getType().equals(InjectionPoint.class);
@@ -889,6 +895,12 @@
return resourceLoader;
}
+ /**
+ * Provides access to the transaction services provided by the container
+ * or application server.
+ *
+ * @return a TransactionServices provider per the SPI
+ */
public TransactionServices getTransactionServices()
{
return transactionServices;
@@ -922,4 +934,36 @@
return CurrentManager.rootManager();
}
+ /**
+ * Provides access to the executor service used for asynchronous tasks.
+ *
+ * @return the ExecutorService for this manager
+ */
+ public ExecutorService getTaskExecutor()
+ {
+ return taskExecutor;
+ }
+
+ @Override
+ protected void finalize() throws Throwable
+ {
+ taskExecutor.shutdown();
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!taskExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
+ taskExecutor.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!taskExecutor.awaitTermination(60, TimeUnit.SECONDS))
+ {
+ // Log the error here
+ }
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ taskExecutor.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+
}
Modified:
ri/trunk/webbeans-ri/src/main/java/org/jboss/webbeans/event/DeferredEventNotification.java
===================================================================
---
ri/trunk/webbeans-ri/src/main/java/org/jboss/webbeans/event/DeferredEventNotification.java 2009-03-04
10:41:38 UTC (rev 1759)
+++
ri/trunk/webbeans-ri/src/main/java/org/jboss/webbeans/event/DeferredEventNotification.java 2009-03-04
15:24:10 UTC (rev 1760)
@@ -17,7 +17,6 @@
package org.jboss.webbeans.event;
-import javax.event.Observer;
/**
* A task that will notify the observer of a specific event at some
@@ -28,7 +27,7 @@
public class DeferredEventNotification<T> implements Runnable
{
// The observer
- private Observer<T> observer;
+ private ObserverImpl<T> observer;
// The event object
private T event;
@@ -38,7 +37,7 @@
* @param observer The observer to be notified
* @param event The event being fired
*/
- public DeferredEventNotification(T event, Observer<T> observer)
+ public DeferredEventNotification(T event, ObserverImpl<T> observer)
{
this.observer = observer;
this.event = event;
@@ -46,6 +45,6 @@
public void run()
{
- observer.notify(event);
+ observer.sendEvent(event);
}
}
Modified: ri/trunk/webbeans-ri/src/main/java/org/jboss/webbeans/event/ObserverImpl.java
===================================================================
---
ri/trunk/webbeans-ri/src/main/java/org/jboss/webbeans/event/ObserverImpl.java 2009-03-04
10:41:38 UTC (rev 1759)
+++
ri/trunk/webbeans-ri/src/main/java/org/jboss/webbeans/event/ObserverImpl.java 2009-03-04
15:24:10 UTC (rev 1760)
@@ -18,14 +18,10 @@
package org.jboss.webbeans.event;
import java.lang.annotation.Annotation;
-import java.util.ArrayList;
import java.util.List;
import javax.context.Dependent;
-import javax.event.AfterTransactionCompletion;
-import javax.event.AfterTransactionFailure;
-import javax.event.AfterTransactionSuccess;
-import javax.event.BeforeTransactionCompletion;
+import javax.event.Asynchronously;
import javax.event.IfExists;
import javax.event.Observer;
import javax.event.ObserverException;
@@ -35,11 +31,8 @@
import javax.inject.Initializer;
import javax.inject.Produces;
import javax.inject.manager.Bean;
-import javax.transaction.Status;
-import javax.transaction.SystemException;
import org.jboss.webbeans.ManagerImpl;
-import org.jboss.webbeans.bean.AbstractClassBean;
import org.jboss.webbeans.bean.RIBean;
import org.jboss.webbeans.context.DependentContext;
import org.jboss.webbeans.context.DependentInstancesStore;
@@ -47,8 +40,6 @@
import org.jboss.webbeans.injection.MethodInjectionPoint;
import org.jboss.webbeans.introspector.AnnotatedMethod;
import org.jboss.webbeans.introspector.AnnotatedParameter;
-import org.jboss.webbeans.transaction.UserTransaction;
-import org.jboss.webbeans.transaction.spi.TransactionServices;
import org.jboss.webbeans.util.Reflections;
/**
@@ -67,6 +58,7 @@
protected final Bean<?> observerBean;
protected final MethodInjectionPoint<?> observerMethod;
private final boolean conditional;
+ private final boolean asynchronous;
protected ManagerImpl manager;
private final Class<T> eventType;
private final Annotation[] bindings;
@@ -92,6 +84,7 @@
this.bindings =
observerMethod.getAnnotatedParameters(Observes.class).get(0).getBindingsAsArray();
this.conditional =
!observerMethod.getAnnotatedParameters(IfExists.class).isEmpty();
+ this.asynchronous =
!observerMethod.getAnnotatedParameters(Asynchronously.class).isEmpty();
init();
}
@@ -146,7 +139,14 @@
public void notify(final T event)
{
- sendEvent(event);
+ if (this.asynchronous)
+ {
+ sendEventAsynchronously(event);
+ }
+ else
+ {
+ sendEvent(event);
+ }
}
/**
@@ -182,6 +182,16 @@
}
}
+ /**
+ * Queues the event for later execution
+ * @param event
+ */
+ protected void sendEventAsynchronously(final T event)
+ {
+ DeferredEventNotification<T> deferredEvent = new
DeferredEventNotification<T>(event, this);
+ manager.getTaskExecutor().execute(deferredEvent);
+ }
+
private <B> B getInstance(Bean<B> observerBean)
{
return manager.getInstance(observerBean, !isConditional());
Modified:
ri/trunk/webbeans-ri/src/main/java/org/jboss/webbeans/event/TransactionalObserverImpl.java
===================================================================
---
ri/trunk/webbeans-ri/src/main/java/org/jboss/webbeans/event/TransactionalObserverImpl.java 2009-03-04
10:41:38 UTC (rev 1759)
+++
ri/trunk/webbeans-ri/src/main/java/org/jboss/webbeans/event/TransactionalObserverImpl.java 2009-03-04
15:24:10 UTC (rev 1760)
@@ -76,7 +76,6 @@
}
private TransactionObservationPhase transactionObservationPhase;
- private boolean deferred = false;
/**
* Tests an observer method to see if it is transactional.
@@ -115,7 +114,7 @@
@Override
public void notify(T event)
{
- if (!deferred && manager.getTransactionServices().isTransactionActive())
+ if (manager.getTransactionServices().isTransactionActive())
{
deferEvent(event);
}
@@ -168,7 +167,6 @@
{
DeferredEventNotification<T> deferredEvent = new
DeferredEventNotification<T>(event, this);
transactionObservationPhase.registerTask(manager.getTransactionServices(),
deferredEvent);
- deferred = true;
}
}
Modified:
ri/trunk/webbeans-ri/src/main/java/org/jboss/webbeans/introspector/AnnotatedMethod.java
===================================================================
---
ri/trunk/webbeans-ri/src/main/java/org/jboss/webbeans/introspector/AnnotatedMethod.java 2009-03-04
10:41:38 UTC (rev 1759)
+++
ri/trunk/webbeans-ri/src/main/java/org/jboss/webbeans/introspector/AnnotatedMethod.java 2009-03-04
15:24:10 UTC (rev 1760)
@@ -28,6 +28,7 @@
import javax.event.AfterTransactionCompletion;
import javax.event.AfterTransactionFailure;
import javax.event.AfterTransactionSuccess;
+import javax.event.Asynchronously;
import javax.event.BeforeTransactionCompletion;
import javax.event.Fires;
import javax.event.IfExists;
@@ -44,7 +45,7 @@
public interface AnnotatedMethod<T> extends AnnotatedMember<T, Method>
{
@SuppressWarnings("unchecked")
- public static final Set<Class<? extends Annotation>>
MAPPED_PARAMETER_ANNOTATIONS = new HashSet<Class<? extends
Annotation>>(Arrays.asList(Disposes.class, Observes.class, Fires.class,
IfExists.class, BeforeTransactionCompletion.class, AfterTransactionCompletion.class,
AfterTransactionFailure.class, AfterTransactionSuccess.class));
+ public static final Set<Class<? extends Annotation>>
MAPPED_PARAMETER_ANNOTATIONS = new HashSet<Class<? extends
Annotation>>(Arrays.asList(Disposes.class, Observes.class, Fires.class,
IfExists.class, BeforeTransactionCompletion.class, AfterTransactionCompletion.class,
AfterTransactionFailure.class, AfterTransactionSuccess.class, Asynchronously.class));
/**
* Gets the abstracted parameters of the method
Added:
tck/trunk/impl/src/main/java/org/jboss/jsr299/tck/tests/event/AsynchronousObserver.java
===================================================================
---
tck/trunk/impl/src/main/java/org/jboss/jsr299/tck/tests/event/AsynchronousObserver.java
(rev 0)
+++
tck/trunk/impl/src/main/java/org/jboss/jsr299/tck/tests/event/AsynchronousObserver.java 2009-03-04
15:24:10 UTC (rev 1760)
@@ -0,0 +1,24 @@
+package org.jboss.jsr299.tck.tests.event;
+
+import javax.event.Asynchronously;
+import javax.event.Observes;
+
+public class AsynchronousObserver
+{
+ private static Thread threadObservingEvent = null;
+
+ public void observes(@Observes @Asynchronously Boolean event)
+ {
+ threadObservingEvent = Thread.currentThread();
+ }
+
+ public static Thread getThreadObservingEvent()
+ {
+ return threadObservingEvent;
+ }
+
+ public static void setThreadObservingEvent(Thread threadObservingEvent)
+ {
+ AsynchronousObserver.threadObservingEvent = threadObservingEvent;
+ }
+}
Property changes on:
tck/trunk/impl/src/main/java/org/jboss/jsr299/tck/tests/event/AsynchronousObserver.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: tck/trunk/impl/src/main/java/org/jboss/jsr299/tck/tests/event/EventTest.java
===================================================================
---
tck/trunk/impl/src/main/java/org/jboss/jsr299/tck/tests/event/EventTest.java 2009-03-04
10:41:38 UTC (rev 1759)
+++
tck/trunk/impl/src/main/java/org/jboss/jsr299/tck/tests/event/EventTest.java 2009-03-04
15:24:10 UTC (rev 1760)
@@ -372,7 +372,8 @@
@SpecAssertion(section = "7.5.7", id = "a")
public void testAsynchronousObserverIsAsynchronous()
{
- assert false;
+ getCurrentManager().fireEvent(new Boolean(true));
+ assert
!AsynchronousObserver.getThreadObservingEvent().equals(Thread.currentThread());
}
@Test(groups = { "stub", "events", "webbeansxml" })
Modified:
tck/trunk/impl/src/main/java/org/jboss/jsr299/tck/tests/event/transactionalObservers/Pomeranian.java
===================================================================
---
tck/trunk/impl/src/main/java/org/jboss/jsr299/tck/tests/event/transactionalObservers/Pomeranian.java 2009-03-04
10:41:38 UTC (rev 1759)
+++
tck/trunk/impl/src/main/java/org/jboss/jsr299/tck/tests/event/transactionalObservers/Pomeranian.java 2009-03-04
15:24:10 UTC (rev 1760)
@@ -3,13 +3,13 @@
import static javax.ejb.TransactionManagementType.BEAN;
import static javax.transaction.Status.STATUS_COMMITTED;
import static javax.transaction.Status.STATUS_NO_TRANSACTION;
-import static javax.transaction.Status.STATUS_PREPARED;
import static javax.transaction.Status.STATUS_ROLLEDBACK;
import java.math.BigInteger;
import javax.annotation.Named;
import javax.annotation.Resource;
+import javax.context.SessionScoped;
import javax.ejb.EJBException;
import javax.ejb.SessionContext;
import javax.ejb.Stateful;
@@ -27,6 +27,7 @@
@TransactionManagement(BEAN)
@Tame
@Named("Teddy")
+@SessionScoped
public class Pomeranian implements PomeranianInterface
{
@Resource