[exo-jcr-commits] exo-jcr SVN: r2812 - in kernel/trunk/exo.kernel.component.common/src: test/java/org/exoplatform/services/listener and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Jul 26 09:21:09 EDT 2010


Author: sergiykarpenko
Date: 2010-07-26 09:21:09 -0400 (Mon, 26 Jul 2010)
New Revision: 2812

Added:
   kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/listener/Asynchronous.java
   kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/listener/ListenerThreadFactory.java
   kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/listener/TestAsynchronousListener.java
Modified:
   kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/listener/ListenerService.java
Log:
EXOJCR-311: Make broadcasting of events of ListenerService asynchronous - implemented, test added

Added: kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/listener/Asynchronous.java
===================================================================
--- kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/listener/Asynchronous.java	                        (rev 0)
+++ kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/listener/Asynchronous.java	2010-07-26 13:21:09 UTC (rev 2812)
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2003-2010 eXo Platform SAS.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Affero General Public License
+ * as published by the Free Software Foundation; either version 3
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, see<http://www.gnu.org/licenses/>.
+ */
+package org.exoplatform.services.listener;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marker annotation for a {@link Listener} class whose events must be handled asynchronously.
+ * ListenerService looks for it on the listener class and its superclasses and, if found, runs the listener via its executor.
+ * 
+ * Created by The eXo Platform SAS.
+ * 
+ * @author <a href="karpenko.sergiy at gmail.com">Karpenko Sergiy</a> 
+ * @version $Id: Asynchronous.java 111 2008-11-11 11:11:11Z serg $
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Asynchronous {
+}

Modified: kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/listener/ListenerService.java
===================================================================
--- kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/listener/ListenerService.java	2010-07-26 12:00:42 UTC (rev 2811)
+++ kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/listener/ListenerService.java	2010-07-26 13:21:09 UTC (rev 2812)
@@ -18,13 +18,17 @@
  */
 package org.exoplatform.services.listener;
 
+import org.exoplatform.container.xml.InitParams;
 import org.exoplatform.services.log.ExoLogger;
 import org.exoplatform.services.log.Log;
+import org.exoplatform.services.naming.InitialContextInitializer;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 
 /**
  * Created by The eXo Platform SAS Author : Nhu Dinh Thuan
@@ -32,29 +36,73 @@
  */
 public class ListenerService
 {
+   /** 
+    * Executor on which listeners wrapped as asynchronous are run.
+    */
+   private final Executor executor;
 
+   /**
+    * Registered listeners, keyed by listener name.
+    */
    private Map<String, List<Listener>> listeners_;
 
    private static Log log = ExoLogger.getLogger("exo.kernel.component.common.ListenerService");
-   
+
    /**
     * Creates a listener service that runs asynchronous listeners on a pool of one thread.
     */
    public ListenerService()
    {
+      executor = Executors.newFixedThreadPool(1, new ListenerThreadFactory());
       listeners_ = new HashMap<String, List<Listener>>();
    }
 
    /**
+    * Creates a listener service whose asynchronous pool size may come from configuration.
+    * 
+    * @param initializer not referenced in the constructor body
+    * @param params optional init params; the value param "asynchPoolSize", when present,
+    *          is parsed as the number of pool threads (1 is used otherwise)
+    */
+   public ListenerService(InitialContextInitializer initializer, InitParams params)
+   {
+      int threads = 1;
+      if (params != null && params.getValueParam("asynchPoolSize") != null)
+      {
+         threads = Integer.parseInt(params.getValueParam("asynchPoolSize").getValue());
+      }
+
+      listeners_ = new HashMap<String, List<Listener>>();
+      executor = Executors.newFixedThreadPool(threads, new ListenerThreadFactory());
+   }
+
+   /**
     * This method is used to register a listener with the service. The method
     * should: 1. Check to see if there is a list of listener with the listener
     * name, create one if the listener list doesn't exit 2. Add the new listener
-    * to the listener list
+    * to the listener list.
     * 
     * @param listener
     */
    public void addListener(Listener listener)
    {
+      // Check is Listener or its superclass asynchronous, if so - wrap it in AsynchronousListener.
+      Class listenerClass = listener.getClass();
+
+      do
+      {
+         if (listenerClass.isAnnotationPresent(Asynchronous.class))
+         {
+            listener = new AsynchronousListener(listener);
+            break;
+         }
+         else
+         {
+            listenerClass = listenerClass.getSuperclass();
+         }
+      }
+      while (listenerClass != null);
+
       String name = listener.getName();
       List<Listener> list = listeners_.get(name);
       if (list == null)
@@ -101,7 +149,17 @@
          {
             log.debug("broadcasting event " + name + " on " + listener.getName());
          }
-         listener.onEvent(new Event<S, D>(name, source, data));
+
+         try
+         {
+            listener.onEvent(new Event<S, D>(name, source, data));
+         }
+         catch (Exception e)
+         {
+            // log exception and keep broadcast events
+            log.error("Exception on broadcasting events occures: " + e.getMessage(), e.getCause());
+            log.info("Exception occures but keep broadcast events.");
+         }
       }
    }
 
@@ -121,8 +179,98 @@
    {
       List<Listener> list = listeners_.get(event.getEventName());
       if (list == null)
+      {
          return;
+      }
       for (Listener listener : list)
-         listener.onEvent(event);
+      {
+         try
+         {
+            listener.onEvent(event);
+         }
+         catch (Exception e)
+         {
+            // log exception and keep broadcast events
+            log.error("Exception on broadcasting events occures: " + e.getMessage(), e.getCause());
+            log.info("Exception occures but keep broadcast events.");
+         }
+      }
    }
+
+   /**
+    * Wrapper that exposes the name and description of the wrapped listener but
+    * hands every event to the service's executor instead of handling it inline.
+    */
+   protected class AsynchronousListener<S, D> extends Listener<S, D>
+   {
+      private final Listener<S, D> delegate;
+
+      public AsynchronousListener(Listener<S, D> listener)
+      {
+         delegate = listener;
+      }
+
+      @Override
+      public String getName()
+      {
+         return delegate.getName();
+      }
+
+      @Override
+      public void setName(String name)
+      {
+         delegate.setName(name);
+      }
+
+      @Override
+      public String getDescription()
+      {
+         return delegate.getDescription();
+      }
+
+      @Override
+      public void setDescription(String description)
+      {
+         delegate.setDescription(description);
+      }
+      // Only queues the event: the wrapped listener is invoked by the executor.
+      @Override
+      public void onEvent(Event<S, D> event) throws Exception
+      {
+         ListenerService.this.executor.execute(new RunListener<S, D>(delegate, event));
+      }
+   }
+
+   /** 
+    * Task that delivers one event to one listener; instances are run by the executor.
+    */
+   protected class RunListener<S, D> implements Runnable
+   {
+      private final Listener<S, D> listener;
+
+      private final Event<S, D> event;
+
+      public RunListener(Listener<S, D> listener, Event<S, D> event)
+      {
+         this.listener = listener;
+         this.event = event;
+      }
+
+      /**
+       * {@inheritDoc}
+       */
+      public void run()
+      {
+         try
+         {
+            listener.onEvent(event);
+         }
+         catch (Exception e)
+         {
+            // The event is asynchronous, so the failure is only reported, never rethrown.
+            // Log the exception itself rather than its cause (often null) to keep the stack trace.
+            log.error("Exception on broadcasting events occures: " + e.getMessage(), e);
+         }
+      }
+   }
 }

Added: kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/listener/ListenerThreadFactory.java
===================================================================
--- kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/listener/ListenerThreadFactory.java	                        (rev 0)
+++ kernel/trunk/exo.kernel.component.common/src/main/java/org/exoplatform/services/listener/ListenerThreadFactory.java	2010-07-26 13:21:09 UTC (rev 2812)
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2003-2010 eXo Platform SAS.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Affero General Public License
+ * as published by the Free Software Foundation; either version 3
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, see<http://www.gnu.org/licenses/>.
+ */
+package org.exoplatform.services.listener;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Thread factory modelled on Executors$DefaultThreadFactory. It exists only to give
+ * the threads a readable name of the form asynch-event-P-thread-N.
+ * 
+ * Created by The eXo Platform SAS.
+ *
+ * @author <a href="karpenko.sergiy at gmail.com">Karpenko Sergiy</a> 
+ * @version $Id: ListenerThreadFactory.java 111 2008-11-11 11:11:11Z serg $
+ */
+public class ListenerThreadFactory implements ThreadFactory
+{
+   static final AtomicInteger poolNumber = new AtomicInteger(1); // numbers the factories (P)
+   final ThreadGroup group;
+   final AtomicInteger threadNumber = new AtomicInteger(1); // numbers this factory's threads (N)
+   final String namePrefix;
+
+   ListenerThreadFactory()
+   {
+      SecurityManager manager = System.getSecurityManager();
+      group = (manager == null) ? Thread.currentThread().getThreadGroup() : manager.getThreadGroup();
+      namePrefix = "asynch-event-" + poolNumber.getAndIncrement() + "-thread-";
+   }
+
+   /**
+    * {@inheritDoc}
+    */
+   public Thread newThread(Runnable r)
+   {
+      String threadName = namePrefix + threadNumber.getAndIncrement();
+      Thread worker = new Thread(group, r, threadName, 0);
+      if (worker.isDaemon())
+      {
+         worker.setDaemon(false);
+      }
+      if (Thread.NORM_PRIORITY != worker.getPriority())
+      {
+         worker.setPriority(Thread.NORM_PRIORITY);
+      }
+      return worker;
+   }
+}

Added: kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/listener/TestAsynchronousListener.java
===================================================================
--- kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/listener/TestAsynchronousListener.java	                        (rev 0)
+++ kernel/trunk/exo.kernel.component.common/src/test/java/org/exoplatform/services/listener/TestAsynchronousListener.java	2010-07-26 13:21:09 UTC (rev 2812)
@@ -0,0 +1,249 @@
+/*
+ * Copyright (C) 2003-2010 eXo Platform SAS.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Affero General Public License
+ * as published by the Free Software Foundation; either version 3
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, see<http://www.gnu.org/licenses/>.
+ */
+package org.exoplatform.services.listener;
+
+import org.exoplatform.container.PortalContainer;
+import org.exoplatform.test.BasicTestCase;
+
+/**
+ * Checks that ListenerService does not block broadcast() on {@link Asynchronous} listeners,
+ * runs other listeners inside broadcast(), and does not propagate listener exceptions.
+ *
+ * Created by The eXo Platform SAS.
+ * @author <a href="karpenko.sergiy at gmail.com">Karpenko Sergiy</a> 
+ * @version $Id: TestAsynchronousListener.java 111 2008-11-11 11:11:11Z serg $
+ */
+public class TestAsynchronousListener extends BasicTestCase
+{
+   private ListenerService service_;
+
+   public void setUp() throws Exception
+   {
+      setTestNumber(1);
+      PortalContainer manager = PortalContainer.getInstance();
+      service_ = (ListenerService)manager.getComponentInstanceOfType(ListenerService.class);
+   }
+
+   public void testAsynchronousListener() throws Exception
+   {
+      final String listenerName = "test_asynch";
+      final String baseString = "Value not changed";
+
+      assertTrue(service_ != null);
+      Listener<Object, StrValue> listener = new AsynchListener();
+      listener.setName(listenerName);
+      listener.setDescription("Asynchronous listener");
+
+      service_.addListener(listener);
+
+      StrValue testValue = new StrValue(baseString);
+
+      service_.broadcast(listenerName, new Object(), testValue);
+
+      // The listener waits about a second before writing, so if it really runs
+      // asynchronously the value is still the original one right after broadcast() returns.
+      assertEquals(baseString, testValue.getValue());
+   }
+
+   public void testParentAsynchListener() throws Exception
+   {
+      final String listenerName = "test_parent_asynch";
+      final String baseString = "Value not changed";
+
+      assertTrue(service_ != null);
+      Listener<Object, StrValue> listener = new ExtendedAsynchListener();
+      listener.setName(listenerName);
+      listener.setDescription("Asynchronous listener");
+
+      service_.addListener(listener);
+
+      StrValue testValue = new StrValue(baseString);
+
+      service_.broadcast(listenerName, new Object(), testValue);
+
+      // The annotation sits on the superclass only; if it is honoured the value is
+      // still the original one right after broadcast() returns.
+      assertEquals(baseString, testValue.getValue());
+   }
+
+   public void testSynchronousListener() throws Exception
+   {
+      final String listenerName = "test_synch";
+      final String baseString = "Value not changed";
+
+      assertTrue(service_ != null);
+      Listener<Object, StrValue> listener = new SynchListener();
+      listener.setName(listenerName);
+      listener.setDescription("Synchronous listener");
+
+      service_.addListener(listener);
+
+      StrValue testValue = new StrValue(baseString);
+
+      service_.broadcast(listenerName, null, testValue);
+
+      // A listener without the annotation runs inside broadcast(), so the value
+      // must already have been changed when broadcast() returns.
+      assertFalse(baseString.equals(testValue.getValue()));
+   }
+
+   public void testSynchronousExeption() throws Exception
+   {
+      try
+      {
+         final String listenerName = "test_synch_exeption";
+
+         assertTrue(service_ != null);
+         Listener<Object, StrValue> listener = new SynchListenerWithException();
+         listener.setName(listenerName);
+         listener.setDescription("Synchronous listener with exception");
+
+         service_.addListener(listener);
+
+         StrValue testValue = new StrValue("no matter");
+
+         service_.broadcast(listenerName, null, testValue);
+         // the listener's exception must not reach the caller of broadcast()
+      }
+      catch (Exception e)
+      {
+         fail("Exception must be ignored.");
+      }
+   }
+
+   public void testAsynchronousExeption() throws Exception
+   {
+      try
+      {
+         final String listenerName = "test_asynch_exeption";
+
+         assertTrue(service_ != null);
+         Listener<Object, StrValue> listener = new AsynchListenerWithException();
+         listener.setName(listenerName);
+         listener.setDescription("Asynchronous listener with exception");
+
+         service_.addListener(listener);
+
+         StrValue testValue = new StrValue("no matter");
+
+         service_.broadcast(listenerName, null, testValue);
+         // exception must be ignored; the wait below presumably gives the listener time to throw
+
+         Object obj = new Object();
+         synchronized (obj)
+         {
+            obj.wait(1000);
+         }
+      }
+      catch (Exception e)
+      {
+         fail("Exception must be ignored.");
+      }
+   }
+
+   class StrValue
+   {
+      private String val;
+
+      public StrValue(String value)
+      {
+         val = value;
+      }
+
+      public void setValue(String value)
+      {
+         val = value;
+      }
+
+      public String getValue()
+      {
+         return val;
+      }
+   }
+
+   @Asynchronous
+   class AsynchListener extends Listener<Object, StrValue>
+   {
+      @Override
+      public void onEvent(Event<Object, StrValue> event) throws Exception
+      {
+         // pause for up to a second on a private monitor nobody notifies
+         Object obj = new Object();
+         synchronized (obj)
+         {
+            obj.wait(1000);
+         }
+         // overwrite the value carried by the event
+         event.getData().setValue("Value become changed");
+      }
+   }
+
+   class SynchListener extends Listener<Object, StrValue>
+   {
+      @Override
+      public void onEvent(Event<Object, StrValue> event) throws Exception
+      {
+         // pause for up to a second on a private monitor nobody notifies
+         Object obj = new Object();
+         synchronized (obj)
+         {
+            obj.wait(1000);
+         }
+         // overwrite the value carried by the event
+         event.getData().setValue("Value become changed");
+      }
+   }
+
+   class ExtendedAsynchListener extends AsynchListener
+   {
+      // Intentionally empty. This class exists only to check that ListenerService treats
+      // a subclass of an @Asynchronous listener as asynchronous too.
+   }
+
+   @Asynchronous
+   class AsynchListenerWithException extends Listener<Object, StrValue>
+   {
+      @Override
+      public void onEvent(Event<Object, StrValue> event) throws Exception
+      {
+         // pause for up to a second on a private monitor nobody notifies
+         Object obj = new Object();
+         synchronized (obj)
+         {
+            obj.wait(1000);
+         }
+
+         throw new Exception("This is test exception");
+      }
+   }
+
+   class SynchListenerWithException extends Listener<Object, StrValue>
+   {
+      @Override
+      public void onEvent(Event<Object, StrValue> event) throws Exception
+      {
+         // pause for up to a second on a private monitor nobody notifies
+         Object obj = new Object();
+         synchronized (obj)
+         {
+            obj.wait(1000);
+         }
+
+         throw new Exception("This is test exception");
+      }
+   }
+}



More information about the exo-jcr-commits mailing list