From do-not-reply at jboss.org Mon Jul 26 09:21:09 2010 Content-Type: multipart/mixed; boundary="===============6245671062824710199==" MIME-Version: 1.0 From: do-not-reply at jboss.org To: exo-jcr-commits at lists.jboss.org Subject: [exo-jcr-commits] exo-jcr SVN: r2812 - in kernel/trunk/exo.kernel.component.common/src: test/java/org/exoplatform/services/listener and 1 other directory. Date: Mon, 26 Jul 2010 09:21:09 -0400 Message-ID: <201007261321.o6QDL9PP019762@svn01.web.mwc.hst.phx2.redhat.com> --===============6245671062824710199== Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: quoted-printable Author: sergiykarpenko Date: 2010-07-26 09:21:09 -0400 (Mon, 26 Jul 2010) New Revision: 2812 Added: kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/s= ervices/listener/Asynchronous.java kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/s= ervices/listener/ListenerThreadFactory.java kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/s= ervices/listener/TestAsynchronousListener.java Modified: kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/s= ervices/listener/ListenerService.java Log: EXOJCR-311: Make broadcasting of events of ListenerService asynchronous - i= mplemented, test added Added: kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatfo= rm/services/listener/Asynchronous.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/= services/listener/Asynchronous.java (rev 0) +++ kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/= services/listener/Asynchronous.java 2010-07-26 13:21:09 UTC (rev 2812) @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2003-2010 eXo Platform SAS. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Affero General Public License + * as published by the Free Software Foundation; either version 3 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, see. + */ +package org.exoplatform.services.listener; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * This marker annotation shows that Listener must be executed in asynchro= nous way. = + * ListenerService will execute asynchronous-marked listener in dedicated = thread. + * = + * Created by The eXo Platform SAS. + * = + * @author Karpenko Sergiy = + * @version $Id: Asynchronous.java 111 2008-11-11 11:11:11Z serg $ + */ +(a)Target(ElementType.TYPE) +(a)Retention(RetentionPolicy.RUNTIME) +public @interface Asynchronous { +} Modified: kernel/trunk/exo.kernel.component.common/src/main/java/org/exopla= tform/services/listener/ListenerService.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/= services/listener/ListenerService.java 2010-07-26 12:00:42 UTC (rev 2811) +++ kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/= services/listener/ListenerService.java 2010-07-26 13:21:09 UTC (rev 2812) @@ -18,13 +18,17 @@ */ package org.exoplatform.services.listener; = +import org.exoplatform.container.xml.InitParams; import org.exoplatform.services.log.ExoLogger; import org.exoplatform.services.log.Log; +import org.exoplatform.services.naming.InitialContextInitializer; = import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; = /** * Created by The eXo Platform SAS Author : Nhu Dinh Thuan @@ -32,29 +36,73 @@ */ public class ListenerService { + /** = + * This executor used for asynchronously event broadcast. = + */ + private final Executor executor; = + /** + * Listeners by name map. + */ private Map> listeners_; = private static Log log =3D ExoLogger.getLogger("exo.kernel.component.co= mmon.ListenerService"); - = + /** * Construct a listener service. */ public ListenerService() { listeners_ =3D new HashMap>(); + executor =3D Executors.newFixedThreadPool(1, new ListenerThreadFacto= ry()); } = /** + * Construct a listener service. + */ + public ListenerService(InitialContextInitializer initializer, InitParam= s params) + { + listeners_ =3D new HashMap>(); + int poolSize =3D 1; + + if (params !=3D null) + { + if (params.getValueParam("asynchPoolSize") !=3D null) + { + + poolSize =3D Integer.parseInt(params.getValueParam("asynchPool= Size").getValue()); + } + } + executor =3D Executors.newFixedThreadPool(poolSize, new ListenerThre= adFactory()); + } + + /** * This method is used to register a listener with the service. The met= hod * should: 1. Check to see if there is a list of listener with the list= ener * name, create one if the listener list doesn't exit 2. Add the new li= stener - * to the listener list + * to the listener list. * = * @param listener */ public void addListener(Listener listener) { + // Check is Listener or its superclass asynchronous, if so - wrap it= in AsynchronousListener. + Class listenerClass =3D listener.getClass(); + + do + { + if (listenerClass.isAnnotationPresent(Asynchronous.class)) + { + listener =3D new AsynchronousListener(listener); + break; + } + else + { + listenerClass =3D listenerClass.getSuperclass(); + } + } + while (listenerClass !=3D null); + String name =3D listener.getName(); List list =3D listeners_.get(name); if (list =3D=3D null) @@ -101,7 +149,17 @@ { log.debug("broadcasting event " + name + " on " + listener.get= Name()); } - listener.onEvent(new Event(name, source, data)); + + try + { + listener.onEvent(new Event(name, source, data)); + } + catch (Exception e) + { + // log exception and keep broadcast events + log.error("Exception on broadcasting events occures: " + e.get= Message(), e.getCause()); + log.info("Exception occures but keep broadcast events."); + } } } = @@ -121,8 +179,98 @@ { List list =3D listeners_.get(event.getEventName()); if (list =3D=3D null) + { return; + } for (Listener listener : list) - listener.onEvent(event); + { + try + { + listener.onEvent(event); + } + catch (Exception e) + { + // log exception and keep broadcast events + log.error("Exception on broadcasting events occures: " + e.get= Message(), e.getCause()); + log.info("Exception occures but keep broadcast events."); + } + } } + + /** + * This AsynchronousListener is a wrapper for original listener, that = + * executes wrapped listeners onEvent() in separate thread. = + */ + protected class AsynchronousListener extends Listener + { + private Listener listener; + + public AsynchronousListener(Listener listener) + { + this.listener =3D listener; + } + + @Override + public String getName() + { + return listener.getName(); + } + + @Override + public void setName(String s) + { + listener.setName(s); + } + + @Override + public String getDescription() + { + return listener.getDescription(); + } + + @Override + public void setDescription(String s) + { + listener.setDescription(s); + } + + @Override + public void onEvent(Event event) throws Exception + { + executor.execute(new RunListener(listener, event)); + } + } + + /** = + * This thread executes listener.onEvent(event) method. + */ + protected class RunListener implements Runnable + { + private Listener listener; + + private Event event; + + public RunListener(Listener listener, Event event) + { + this.listener =3D listener; + this.event =3D event; + } + + /** + * {@inheritDoc} + */ + public void run() + { + try + { + listener.onEvent(event); + } + catch (Exception e) + { + // Do not throw exception. Event is asynchronous so just repor= t error. + // Must say that exception will be ignored even in synchronous= events. + log.error("Exception on broadcasting events occures: " + e.get= Message(), e.getCause()); + } + } + } } Added: kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatfo= rm/services/listener/ListenerThreadFactory.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/= services/listener/ListenerThreadFactory.java (rev 0) +++ kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/= services/listener/ListenerThreadFactory.java 2010-07-26 13:21:09 UTC (rev 2= 812) @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2003-2010 eXo Platform SAS. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Affero General Public License + * as published by the Free Software Foundation; either version 3 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, see. + */ +package org.exoplatform.services.listener; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This ThreadFactory implementation is a copy of Executors$DefaultThreadF= actory. + * Its copied for single purpose - make readable thread name. = + * = + * Created by The eXo Platform SAS. + * = + *
Date: = + * + * @author Karpenko Sergiy = + * @version $Id: ListenerThreadFactory.java 111 2008-11-11 11:11:11Z serg $ + */ +public class ListenerThreadFactory implements ThreadFactory +{ + static final AtomicInteger poolNumber =3D new AtomicInteger(1); + + final ThreadGroup group; + + final AtomicInteger threadNumber =3D new AtomicInteger(1); + + final String namePrefix; + + ListenerThreadFactory() + { + SecurityManager s =3D System.getSecurityManager(); + group =3D (s !=3D null) ? s.getThreadGroup() : Thread.currentThread(= ).getThreadGroup(); + namePrefix =3D "asynch-event-" + poolNumber.getAndIncrement() + "-th= read-"; + } + + /** + * {@inheritDoc} + */ + public Thread newThread(Runnable r) + { + Thread t =3D new Thread(group, r, namePrefix + threadNumber.getAndIn= crement(), 0); + if (t.isDaemon()) + t.setDaemon(false); + if (t.getPriority() !=3D Thread.NORM_PRIORITY) + t.setPriority(Thread.NORM_PRIORITY); + return t; + } +} Added: kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatfo= rm/services/listener/TestAsynchronousListener.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/= services/listener/TestAsynchronousListener.java (re= v 0) +++ kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/= services/listener/TestAsynchronousListener.java 2010-07-26 13:21:09 UTC (re= v 2812) @@ -0,0 +1,249 @@ +/* + * Copyright (C) 2003-2010 eXo Platform SAS. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Affero General Public License + * as published by the Free Software Foundation; either version 3 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, see. + */ +package org.exoplatform.services.listener; + +import org.exoplatform.container.PortalContainer; +import org.exoplatform.test.BasicTestCase; + +/** + * Created by The eXo Platform SAS. + * = + *
Date: = + * + * @author Karpenko Sergiy = + * @version $Id: TestAsynchronousListener.java 111 2008-11-11 11:11:11Z se= rg $ + */ +public class TestAsynchronousListener extends BasicTestCase +{ + private ListenerService service_; + + public void setUp() throws Exception + { + setTestNumber(1); + PortalContainer manager =3D PortalContainer.getInstance(); + service_ =3D (ListenerService)manager.getComponentInstanceOfType(Lis= tenerService.class); + } + + public void testAsynchronousListener() throws Exception + { + final String listenerName =3D "test_asynch"; + final String baseString =3D "Value not changed"; + + assertTrue(service_ !=3D null); + Listener listener =3D new AsynchListener(); + listener.setName(listenerName); + listener.setDescription("Asynchronous listener"); + + service_.addListener(listener); + + StrValue testValue =3D new StrValue(baseString); + + service_.broadcast(listenerName, new Object(), testValue); + + // if asynch enabled value must be changed later so it's same exact = after listener = + // broadcasting + assertEquals(baseString, testValue.getValue()); + } + + public void testParentAsynchListener() throws Exception + { + final String listenerName =3D "test_parent_asynch"; + final String baseString =3D "Value not changed"; + + assertTrue(service_ !=3D null); + Listener listener =3D new ExtendedAsynchListener(); + listener.setName(listenerName); + listener.setDescription("Asynchronous listener"); + + service_.addListener(listener); + + StrValue testValue =3D new StrValue(baseString); + + service_.broadcast(listenerName, new Object(), testValue); + + // if asynch enabled value must be changed later so it's same exact = after listener = + // broadcasting + assertEquals(baseString, testValue.getValue()); + } + + public void testSynchronousListener() throws Exception + { + final String listenerName =3D "test_synch"; + final String baseString =3D "Value not changed"; + + assertTrue(service_ !=3D null); + Listener listener =3D new SynchListener(); + listener.setName(listenerName); + listener.setDescription("Synchronous listener"); + + service_.addListener(listener); + + StrValue testValue =3D new StrValue(baseString); + + service_.broadcast(listenerName, null, testValue); + + // if Synch enabled - broadcast must wait until all events will be p= rocessed, = + // so value must be changed + assertFalse(baseString.equals(testValue.getValue())); + } + + public void testSynchronousExeption() throws Exception + { + try + { + final String listenerName =3D "test_synch_exeption"; + + assertTrue(service_ !=3D null); + Listener listener =3D new SynchListenerWithExce= ption(); + listener.setName(listenerName); + listener.setDescription("Synchronous listener with exception"); + + service_.addListener(listener); + + StrValue testValue =3D new StrValue("no matter"); + + service_.broadcast(listenerName, null, testValue); + // exception must be ignored + } + catch (Exception e) + { + fail("Exception must be ignored."); + } + } + + public void testAsynchronousExeption() throws Exception + { + try + { + final String listenerName =3D "test_asynch_exeption"; + + assertTrue(service_ !=3D null); + Listener listener =3D new AsynchListenerWithExc= eption(); + listener.setName(listenerName); + listener.setDescription("Asynchronous listener with exception"); + + service_.addListener(listener); + + StrValue testValue =3D new StrValue("no matter"); + + service_.broadcast(listenerName, null, testValue); + // exception must be ignored + + Object obj =3D new Object(); + synchronized (obj) + { + obj.wait(1000); + } + } + catch (Exception e) + { + fail("Exception must be ignored."); + } + } + + class StrValue + { + private String val; + + public StrValue(String value) + { + val =3D value; + } + + public void setValue(String value) + { + val =3D value; + } + + public String getValue() + { + return val; + } + } + + @Asynchronous + class AsynchListener extends Listener + { + @Override + public void onEvent(Event event) throws Exception + { + //wait + Object obj =3D new Object(); + synchronized (obj) + { + obj.wait(1000); + } + //change test value + event.getData().setValue("Value become changed"); + } + } + + class SynchListener extends Listener + { + @Override + public void onEvent(Event event) throws Exception + { + //wait + Object obj =3D new Object(); + synchronized (obj) + { + obj.wait(1000); + } + //change test value + event.getData().setValue("Value become changed"); + } + } + + class ExtendedAsynchListener extends AsynchListener + { + // do nothing. This class exist only for check, does ListenerService= process = + // extended Asynchronous listeners as asynchronous + } + + @Asynchronous + class AsynchListenerWithException extends Listener + { + @Override + public void onEvent(Event event) throws Exception + { + //wait + Object obj =3D new Object(); + synchronized (obj) + { + obj.wait(1000); + } + + throw new Exception("This is test exception"); + } + } + + class SynchListenerWithException extends Listener + { + @Override + public void onEvent(Event event) throws Exception + { + //wait + Object obj =3D new Object(); + synchronized (obj) + { + obj.wait(1000); + } + + throw new Exception("This is test exception"); + } + } +} --===============6245671062824710199==--