Author: sergiykarpenko
Date: 2010-07-26 09:21:09 -0400 (Mon, 26 Jul 2010)
New Revision: 2812
Added:
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/listener/Asynchronous.java
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/listener/ListenerThreadFactory.java
kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/listener/TestAsynchronousListener.java
Modified:
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/listener/ListenerService.java
Log:
EXOJCR-311: Make broadcasting of events of ListenerService asynchronous - implemented,
test added
Added:
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/listener/Asynchronous.java
===================================================================
---
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/listener/Asynchronous.java
(rev 0)
+++
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/listener/Asynchronous.java 2010-07-26
13:21:09 UTC (rev 2812)
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2003-2010 eXo Platform SAS.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Affero General Public License
+ * as published by the Free Software Foundation; either version 3
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not,
see<http://www.gnu.org/licenses/>.
+ */
+package org.exoplatform.services.listener;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * This marker annotation shows that Listener must be executed in asynchronous way.
+ * ListenerService will execute asynchronous-marked listener in dedicated thread.
+ *
+ * Created by The eXo Platform SAS.
+ *
+ * @author <a href="karpenko.sergiy(a)gmail.com">Karpenko Sergiy</a>
+ * @version $Id: Asynchronous.java 111 2008-11-11 11:11:11Z serg $
+ */
+(a)Target(ElementType.TYPE)
+(a)Retention(RetentionPolicy.RUNTIME)
+public @interface Asynchronous {
+}
Modified:
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/listener/ListenerService.java
===================================================================
---
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/listener/ListenerService.java 2010-07-26
12:00:42 UTC (rev 2811)
+++
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/listener/ListenerService.java 2010-07-26
13:21:09 UTC (rev 2812)
@@ -18,13 +18,17 @@
*/
package org.exoplatform.services.listener;
+import org.exoplatform.container.xml.InitParams;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;
+import org.exoplatform.services.naming.InitialContextInitializer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
/**
* Created by The eXo Platform SAS Author : Nhu Dinh Thuan
@@ -32,29 +36,73 @@
*/
public class ListenerService
{
+ /**
+ * This executor used for asynchronously event broadcast.
+ */
+ private final Executor executor;
+ /**
+ * Listeners by name map.
+ */
private Map<String, List<Listener>> listeners_;
private static Log log =
ExoLogger.getLogger("exo.kernel.component.common.ListenerService");
-
+
/**
* Construct a listener service.
*/
public ListenerService()
{
listeners_ = new HashMap<String, List<Listener>>();
+ executor = Executors.newFixedThreadPool(1, new ListenerThreadFactory());
}
/**
+ * Construct a listener service.
+ */
+ public ListenerService(InitialContextInitializer initializer, InitParams params)
+ {
+ listeners_ = new HashMap<String, List<Listener>>();
+ int poolSize = 1;
+
+ if (params != null)
+ {
+ if (params.getValueParam("asynchPoolSize") != null)
+ {
+
+ poolSize =
Integer.parseInt(params.getValueParam("asynchPoolSize").getValue());
+ }
+ }
+ executor = Executors.newFixedThreadPool(poolSize, new ListenerThreadFactory());
+ }
+
+ /**
* This method is used to register a listener with the service. The method
* should: 1. Check to see if there is a list of listener with the listener
* name, create one if the listener list doesn't exit 2. Add the new listener
- * to the listener list
+ * to the listener list.
*
* @param listener
*/
public void addListener(Listener listener)
{
+ // Check is Listener or its superclass asynchronous, if so - wrap it in
AsynchronousListener.
+ Class listenerClass = listener.getClass();
+
+ do
+ {
+ if (listenerClass.isAnnotationPresent(Asynchronous.class))
+ {
+ listener = new AsynchronousListener(listener);
+ break;
+ }
+ else
+ {
+ listenerClass = listenerClass.getSuperclass();
+ }
+ }
+ while (listenerClass != null);
+
String name = listener.getName();
List<Listener> list = listeners_.get(name);
if (list == null)
@@ -101,7 +149,17 @@
{
log.debug("broadcasting event " + name + " on " +
listener.getName());
}
- listener.onEvent(new Event<S, D>(name, source, data));
+
+ try
+ {
+ listener.onEvent(new Event<S, D>(name, source, data));
+ }
+ catch (Exception e)
+ {
+ // log exception and keep broadcast events
+ log.error("Exception on broadcasting events occures: " +
e.getMessage(), e.getCause());
+ log.info("Exception occures but keep broadcast events.");
+ }
}
}
@@ -121,8 +179,98 @@
{
List<Listener> list = listeners_.get(event.getEventName());
if (list == null)
+ {
return;
+ }
for (Listener listener : list)
- listener.onEvent(event);
+ {
+ try
+ {
+ listener.onEvent(event);
+ }
+ catch (Exception e)
+ {
+ // log exception and keep broadcast events
+ log.error("Exception on broadcasting events occures: " +
e.getMessage(), e.getCause());
+ log.info("Exception occures but keep broadcast events.");
+ }
+ }
}
+
+ /**
+ * This AsynchronousListener is a wrapper for original listener, that
+ * executes wrapped listeners onEvent() in separate thread.
+ */
+ protected class AsynchronousListener<S, D> extends Listener<S, D>
+ {
+ private Listener<S, D> listener;
+
+ public AsynchronousListener(Listener<S, D> listener)
+ {
+ this.listener = listener;
+ }
+
+ @Override
+ public String getName()
+ {
+ return listener.getName();
+ }
+
+ @Override
+ public void setName(String s)
+ {
+ listener.setName(s);
+ }
+
+ @Override
+ public String getDescription()
+ {
+ return listener.getDescription();
+ }
+
+ @Override
+ public void setDescription(String s)
+ {
+ listener.setDescription(s);
+ }
+
+ @Override
+ public void onEvent(Event<S, D> event) throws Exception
+ {
+ executor.execute(new RunListener<S, D>(listener, event));
+ }
+ }
+
+ /**
+ * This thread executes listener.onEvent(event) method.
+ */
+ protected class RunListener<S, D> implements Runnable
+ {
+ private Listener<S, D> listener;
+
+ private Event<S, D> event;
+
+ public RunListener(Listener<S, D> listener, Event<S, D> event)
+ {
+ this.listener = listener;
+ this.event = event;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void run()
+ {
+ try
+ {
+ listener.onEvent(event);
+ }
+ catch (Exception e)
+ {
+ // Do not throw exception. Event is asynchronous so just report error.
+ // Must say that exception will be ignored even in synchronous events.
+ log.error("Exception on broadcasting events occures: " +
e.getMessage(), e.getCause());
+ }
+ }
+ }
}
Added:
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/listener/ListenerThreadFactory.java
===================================================================
---
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/listener/ListenerThreadFactory.java
(rev 0)
+++
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/listener/ListenerThreadFactory.java 2010-07-26
13:21:09 UTC (rev 2812)
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2003-2010 eXo Platform SAS.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Affero General Public License
+ * as published by the Free Software Foundation; either version 3
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not,
see<http://www.gnu.org/licenses/>.
+ */
+package org.exoplatform.services.listener;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This ThreadFactory implementation is a copy of Executors$DefaultThreadFactory.
+ * Its copied for single purpose - make readable thread name.
+ *
+ * Created by The eXo Platform SAS.
+ *
+ * <br/>Date:
+ *
+ * @author <a href="karpenko.sergiy(a)gmail.com">Karpenko Sergiy</a>
+ * @version $Id: ListenerThreadFactory.java 111 2008-11-11 11:11:11Z serg $
+ */
+public class ListenerThreadFactory implements ThreadFactory
+{
+ static final AtomicInteger poolNumber = new AtomicInteger(1);
+
+ final ThreadGroup group;
+
+ final AtomicInteger threadNumber = new AtomicInteger(1);
+
+ final String namePrefix;
+
+ ListenerThreadFactory()
+ {
+ SecurityManager s = System.getSecurityManager();
+ group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
+ namePrefix = "asynch-event-" + poolNumber.getAndIncrement() +
"-thread-";
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public Thread newThread(Runnable r)
+ {
+ Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
+ if (t.isDaemon())
+ t.setDaemon(false);
+ if (t.getPriority() != Thread.NORM_PRIORITY)
+ t.setPriority(Thread.NORM_PRIORITY);
+ return t;
+ }
+}
Added:
kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/listener/TestAsynchronousListener.java
===================================================================
---
kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/listener/TestAsynchronousListener.java
(rev 0)
+++
kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/listener/TestAsynchronousListener.java 2010-07-26
13:21:09 UTC (rev 2812)
@@ -0,0 +1,249 @@
+/*
+ * Copyright (C) 2003-2010 eXo Platform SAS.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Affero General Public License
+ * as published by the Free Software Foundation; either version 3
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not,
see<http://www.gnu.org/licenses/>.
+ */
+package org.exoplatform.services.listener;
+
+import org.exoplatform.container.PortalContainer;
+import org.exoplatform.test.BasicTestCase;
+
+/**
+ * Created by The eXo Platform SAS.
+ *
+ * <br/>Date:
+ *
+ * @author <a href="karpenko.sergiy(a)gmail.com">Karpenko Sergiy</a>
+ * @version $Id: TestAsynchronousListener.java 111 2008-11-11 11:11:11Z serg $
+ */
+public class TestAsynchronousListener extends BasicTestCase
+{
+ private ListenerService service_;
+
+ public void setUp() throws Exception
+ {
+ setTestNumber(1);
+ PortalContainer manager = PortalContainer.getInstance();
+ service_ =
(ListenerService)manager.getComponentInstanceOfType(ListenerService.class);
+ }
+
+ public void testAsynchronousListener() throws Exception
+ {
+ final String listenerName = "test_asynch";
+ final String baseString = "Value not changed";
+
+ assertTrue(service_ != null);
+ Listener<Object, StrValue> listener = new AsynchListener();
+ listener.setName(listenerName);
+ listener.setDescription("Asynchronous listener");
+
+ service_.addListener(listener);
+
+ StrValue testValue = new StrValue(baseString);
+
+ service_.broadcast(listenerName, new Object(), testValue);
+
+ // if asynch enabled value must be changed later so it's same exact after
listener
+ // broadcasting
+ assertEquals(baseString, testValue.getValue());
+ }
+
+ public void testParentAsynchListener() throws Exception
+ {
+ final String listenerName = "test_parent_asynch";
+ final String baseString = "Value not changed";
+
+ assertTrue(service_ != null);
+ Listener<Object, StrValue> listener = new ExtendedAsynchListener();
+ listener.setName(listenerName);
+ listener.setDescription("Asynchronous listener");
+
+ service_.addListener(listener);
+
+ StrValue testValue = new StrValue(baseString);
+
+ service_.broadcast(listenerName, new Object(), testValue);
+
+ // if asynch enabled value must be changed later so it's same exact after
listener
+ // broadcasting
+ assertEquals(baseString, testValue.getValue());
+ }
+
+ public void testSynchronousListener() throws Exception
+ {
+ final String listenerName = "test_synch";
+ final String baseString = "Value not changed";
+
+ assertTrue(service_ != null);
+ Listener<Object, StrValue> listener = new SynchListener();
+ listener.setName(listenerName);
+ listener.setDescription("Synchronous listener");
+
+ service_.addListener(listener);
+
+ StrValue testValue = new StrValue(baseString);
+
+ service_.broadcast(listenerName, null, testValue);
+
+ // if Synch enabled - broadcast must wait until all events will be processed,
+ // so value must be changed
+ assertFalse(baseString.equals(testValue.getValue()));
+ }
+
+ public void testSynchronousExeption() throws Exception
+ {
+ try
+ {
+ final String listenerName = "test_synch_exeption";
+
+ assertTrue(service_ != null);
+ Listener<Object, StrValue> listener = new SynchListenerWithException();
+ listener.setName(listenerName);
+ listener.setDescription("Synchronous listener with exception");
+
+ service_.addListener(listener);
+
+ StrValue testValue = new StrValue("no matter");
+
+ service_.broadcast(listenerName, null, testValue);
+ // exception must be ignored
+ }
+ catch (Exception e)
+ {
+ fail("Exception must be ignored.");
+ }
+ }
+
+ public void testAsynchronousExeption() throws Exception
+ {
+ try
+ {
+ final String listenerName = "test_asynch_exeption";
+
+ assertTrue(service_ != null);
+ Listener<Object, StrValue> listener = new AsynchListenerWithException();
+ listener.setName(listenerName);
+ listener.setDescription("Asynchronous listener with exception");
+
+ service_.addListener(listener);
+
+ StrValue testValue = new StrValue("no matter");
+
+ service_.broadcast(listenerName, null, testValue);
+ // exception must be ignored
+
+ Object obj = new Object();
+ synchronized (obj)
+ {
+ obj.wait(1000);
+ }
+ }
+ catch (Exception e)
+ {
+ fail("Exception must be ignored.");
+ }
+ }
+
+ class StrValue
+ {
+ private String val;
+
+ public StrValue(String value)
+ {
+ val = value;
+ }
+
+ public void setValue(String value)
+ {
+ val = value;
+ }
+
+ public String getValue()
+ {
+ return val;
+ }
+ }
+
+ @Asynchronous
+ class AsynchListener extends Listener<Object, StrValue>
+ {
+ @Override
+ public void onEvent(Event<Object, StrValue> event) throws Exception
+ {
+ //wait
+ Object obj = new Object();
+ synchronized (obj)
+ {
+ obj.wait(1000);
+ }
+ //change test value
+ event.getData().setValue("Value become changed");
+ }
+ }
+
+ class SynchListener extends Listener<Object, StrValue>
+ {
+ @Override
+ public void onEvent(Event<Object, StrValue> event) throws Exception
+ {
+ //wait
+ Object obj = new Object();
+ synchronized (obj)
+ {
+ obj.wait(1000);
+ }
+ //change test value
+ event.getData().setValue("Value become changed");
+ }
+ }
+
+ class ExtendedAsynchListener extends AsynchListener
+ {
+ // do nothing. This class exist only for check, does ListenerService process
+ // extended Asynchronous listeners as asynchronous
+ }
+
+ @Asynchronous
+ class AsynchListenerWithException extends Listener<Object, StrValue>
+ {
+ @Override
+ public void onEvent(Event<Object, StrValue> event) throws Exception
+ {
+ //wait
+ Object obj = new Object();
+ synchronized (obj)
+ {
+ obj.wait(1000);
+ }
+
+ throw new Exception("This is test exception");
+ }
+ }
+
+ class SynchListenerWithException extends Listener<Object, StrValue>
+ {
+ @Override
+ public void onEvent(Event<Object, StrValue> event) throws Exception
+ {
+ //wait
+ Object obj = new Object();
+ synchronized (obj)
+ {
+ obj.wait(1000);
+ }
+
+ throw new Exception("This is test exception");
+ }
+ }
+}