From do-not-reply at jboss.org Mon Jul 26 09:21:09 2010
Content-Type: multipart/mixed; boundary="===============6245671062824710199=="
MIME-Version: 1.0
From: do-not-reply at jboss.org
To: exo-jcr-commits at lists.jboss.org
Subject: [exo-jcr-commits] exo-jcr SVN: r2812 - in
kernel/trunk/exo.kernel.component.common/src:
test/java/org/exoplatform/services/listener and 1 other directory.
Date: Mon, 26 Jul 2010 09:21:09 -0400
Message-ID: <201007261321.o6QDL9PP019762@svn01.web.mwc.hst.phx2.redhat.com>
--===============6245671062824710199==
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: quoted-printable
Author: sergiykarpenko
Date: 2010-07-26 09:21:09 -0400 (Mon, 26 Jul 2010)
New Revision: 2812
Added:
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/s=
ervices/listener/Asynchronous.java
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/s=
ervices/listener/ListenerThreadFactory.java
kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/s=
ervices/listener/TestAsynchronousListener.java
Modified:
kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/s=
ervices/listener/ListenerService.java
Log:
EXOJCR-311: Make broadcasting of events of ListenerService asynchronous - i=
mplemented, test added
Added: kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatfo=
rm/services/listener/Asynchronous.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/=
services/listener/Asynchronous.java (rev 0)
+++ kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/=
services/listener/Asynchronous.java 2010-07-26 13:21:09 UTC (rev 2812)
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2003-2010 eXo Platform SAS.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Affero General Public License
+ * as published by the Free Software Foundation; either version 3
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, see<http://www.gnu.org/licenses/>.
+ */
+package org.exoplatform.services.listener;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * This marker annotation shows that Listener must be executed in asynchro=
nous way. =
+ * ListenerService will execute asynchronous-marked listener in dedicated =
thread.
+ * =
+ * Created by The eXo Platform SAS.
+ * =
+ * @author Karpenko Sergiy =
+ * @version $Id: Asynchronous.java 111 2008-11-11 11:11:11Z serg $
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Asynchronous {
+}
Modified: kernel/trunk/exo.kernel.component.common/src/main/java/org/exopla=
tform/services/listener/ListenerService.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/=
services/listener/ListenerService.java 2010-07-26 12:00:42 UTC (rev 2811)
+++ kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/=
services/listener/ListenerService.java 2010-07-26 13:21:09 UTC (rev 2812)
@@ -18,13 +18,17 @@
*/
package org.exoplatform.services.listener;
=
+import org.exoplatform.container.xml.InitParams;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;
+import org.exoplatform.services.naming.InitialContextInitializer;
=
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
=
/**
* Created by The eXo Platform SAS Author : Nhu Dinh Thuan
@@ -32,29 +36,73 @@
*/
public class ListenerService
{
+ /** =
+ * This executor is used for asynchronous event broadcast. =
+ */
+ private final Executor executor;
=
+ /**
+ * Listeners by name map.
+ */
private Map<String, List<Listener>> listeners_;
=
private static Log log =3D ExoLogger.getLogger("exo.kernel.component.co=
mmon.ListenerService");
- =
+
/**
* Construct a listener service.
*/
public ListenerService()
{
listeners_ =3D new HashMap<String, List<Listener>>();
+ executor =3D Executors.newFixedThreadPool(1, new ListenerThreadFacto=
ry());
}
=
/**
+ * Construct a listener service.
+ */
+ public ListenerService(InitialContextInitializer initializer, InitParam=
s params)
+ {
+ listeners_ =3D new HashMap<String, List<Listener>>();
+ int poolSize =3D 1;
+
+ if (params !=3D null)
+ {
+ if (params.getValueParam("asynchPoolSize") !=3D null)
+ {
+
+ poolSize =3D Integer.parseInt(params.getValueParam("asynchPool=
Size").getValue());
+ }
+ }
+ executor =3D Executors.newFixedThreadPool(poolSize, new ListenerThre=
adFactory());
+ }
+
+ /**
* This method is used to register a listener with the service. The met=
hod
* should: 1. Check to see if there is a list of listener with the list=
ener
* name, create one if the listener list doesn't exit 2. Add the new li=
stener
- * to the listener list
+ * to the listener list.
* =
* @param listener
*/
public void addListener(Listener listener)
{
+ // Check whether the Listener or a superclass is asynchronous; if so, w=
rap it in AsynchronousListener.
+ Class listenerClass =3D listener.getClass();
+
+ do
+ {
+ if (listenerClass.isAnnotationPresent(Asynchronous.class))
+ {
+ listener =3D new AsynchronousListener(listener);
+ break;
+ }
+ else
+ {
+ listenerClass =3D listenerClass.getSuperclass();
+ }
+ }
+ while (listenerClass !=3D null);
+
String name =3D listener.getName();
List<Listener> list =3D listeners_.get(name);
if (list =3D=3D null)
@@ -101,7 +149,17 @@
{
log.debug("broadcasting event " + name + " on " + listener.get=
Name());
}
- listener.onEvent(new Event(name, source, data));
+
+ try
+ {
+ listener.onEvent(new Event(name, source, data));
+ }
+ catch (Exception e)
+ {
+ // log exception and keep broadcast events
+ log.error("Exception on broadcasting events occures: " + e.get=
Message(), e.getCause());
+ log.info("Exception occures but keep broadcast events.");
+ }
}
}
=
@@ -121,8 +179,98 @@
{
List<Listener> list =3D listeners_.get(event.getEventName());
if (list =3D=3D null)
+ {
return;
+ }
for (Listener listener : list)
- listener.onEvent(event);
+ {
+ try
+ {
+ listener.onEvent(event);
+ }
+ catch (Exception e)
+ {
+ // log exception and keep broadcast events
+ log.error("Exception on broadcasting events occures: " + e.get=
Message(), e.getCause());
+ log.info("Exception occures but keep broadcast events.");
+ }
+ }
}
+
+ /**
+ * This AsynchronousListener is a wrapper for original listener, that =
+ * executes wrapped listeners onEvent() in separate thread. =
+ */
+ protected class AsynchronousListener extends Listener
+ {
+ private Listener listener;
+
+ public AsynchronousListener(Listener listener)
+ {
+ this.listener =3D listener;
+ }
+
+ @Override
+ public String getName()
+ {
+ return listener.getName();
+ }
+
+ @Override
+ public void setName(String s)
+ {
+ listener.setName(s);
+ }
+
+ @Override
+ public String getDescription()
+ {
+ return listener.getDescription();
+ }
+
+ @Override
+ public void setDescription(String s)
+ {
+ listener.setDescription(s);
+ }
+
+ @Override
+ public void onEvent(Event event) throws Exception
+ {
+ executor.execute(new RunListener(listener, event));
+ }
+ }
+
+ /** =
+ * This thread executes listener.onEvent(event) method.
+ */
+ protected class RunListener implements Runnable
+ {
+ private Listener listener;
+
+ private Event event;
+
+ public RunListener(Listener listener, Event event)
+ {
+ this.listener =3D listener;
+ this.event =3D event;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void run()
+ {
+ try
+ {
+ listener.onEvent(event);
+ }
+ catch (Exception e)
+ {
+ // Do not throw exception. Event is asynchronous so just repor=
t error.
+ // Must say that exception will be ignored even in synchronous=
events.
+ log.error("Exception on broadcasting events occures: " + e.get=
Message(), e.getCause());
+ }
+ }
+ }
}
Added: kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatfo=
rm/services/listener/ListenerThreadFactory.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/=
services/listener/ListenerThreadFactory.java (rev 0)
+++ kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/=
services/listener/ListenerThreadFactory.java 2010-07-26 13:21:09 UTC (rev 2=
812)
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2003-2010 eXo Platform SAS.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Affero General Public License
+ * as published by the Free Software Foundation; either version 3
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, see<http://www.gnu.org/licenses/>.
+ */
+package org.exoplatform.services.listener;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This ThreadFactory implementation is a copy of Executors$DefaultThreadF=
actory.
+ * It is copied for a single purpose - to make thread names readable. =
+ * =
+ * Created by The eXo Platform SAS.
+ * =
+ * Date: =
+ *
+ * @author Karpenko Sergiy =
+ * @version $Id: ListenerThreadFactory.java 111 2008-11-11 11:11:11Z serg $
+ */
+public class ListenerThreadFactory implements ThreadFactory
+{
+ static final AtomicInteger poolNumber =3D new AtomicInteger(1);
+
+ final ThreadGroup group;
+
+ final AtomicInteger threadNumber =3D new AtomicInteger(1);
+
+ final String namePrefix;
+
+ ListenerThreadFactory()
+ {
+ SecurityManager s =3D System.getSecurityManager();
+ group =3D (s !=3D null) ? s.getThreadGroup() : Thread.currentThread(=
).getThreadGroup();
+ namePrefix =3D "asynch-event-" + poolNumber.getAndIncrement() + "-th=
read-";
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public Thread newThread(Runnable r)
+ {
+ Thread t =3D new Thread(group, r, namePrefix + threadNumber.getAndIn=
crement(), 0);
+ if (t.isDaemon())
+ t.setDaemon(false);
+ if (t.getPriority() !=3D Thread.NORM_PRIORITY)
+ t.setPriority(Thread.NORM_PRIORITY);
+ return t;
+ }
+}
Added: kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatfo=
rm/services/listener/TestAsynchronousListener.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/=
services/listener/TestAsynchronousListener.java (re=
v 0)
+++ kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/=
services/listener/TestAsynchronousListener.java 2010-07-26 13:21:09 UTC (re=
v 2812)
@@ -0,0 +1,249 @@
+/*
+ * Copyright (C) 2003-2010 eXo Platform SAS.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Affero General Public License
+ * as published by the Free Software Foundation; either version 3
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, see<http://www.gnu.org/licenses/>.
+ */
+package org.exoplatform.services.listener;
+
+import org.exoplatform.container.PortalContainer;
+import org.exoplatform.test.BasicTestCase;
+
+/**
+ * Created by The eXo Platform SAS.
+ * =
+ * Date: =
+ *
+ * @author Karpenko Sergiy =
+ * @version $Id: TestAsynchronousListener.java 111 2008-11-11 11:11:11Z se=
rg $
+ */
+public class TestAsynchronousListener extends BasicTestCase
+{
+ private ListenerService service_;
+
+ public void setUp() throws Exception
+ {
+ setTestNumber(1);
+ PortalContainer manager =3D PortalContainer.getInstance();
+ service_ =3D (ListenerService)manager.getComponentInstanceOfType(Lis=
tenerService.class);
+ }
+
+ public void testAsynchronousListener() throws Exception
+ {
+ final String listenerName =3D "test_asynch";
+ final String baseString =3D "Value not changed";
+
+ assertTrue(service_ !=3D null);
+ Listener