[jboss-svn-commits] JBL Code SVN: r35127 - in labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta: tests/src/org/jboss/soa/esb/listeners/message and 1 other directory.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Mon Sep 13 09:29:49 EDT 2010


Author: kevin.conner at jboss.com
Date: 2010-09-13 09:29:48 -0400 (Mon, 13 Sep 2010)
New Revision: 35127

Added:
   labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MessageAwareListenerUnitTest.java
Modified:
   labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
Log:
Make sure last message is always executed: JBESB-3473

Modified: labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2010-09-13 13:14:50 UTC (rev 35126)
+++ labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java	2010-09-13 13:29:48 UTC (rev 35127)
@@ -25,6 +25,7 @@
 import java.lang.reflect.Method;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Logger;
@@ -349,21 +350,30 @@
 
 		if (null != message)
 		{
+			final Message pipelineMessage = message ;
+			final TransactionalRunner txRunner ;
 			try
 			{
-				final Message pipelineMessage = message ;
 				final Object txHandle = transactionStrategy.suspend();
-				final TransactionalRunner txRunner = new TransactionalRunner(pickUpCourier, pipelineMessage, txHandle);
-				
-				updateThreadCount(+1);
-				_execService.execute(txRunner);
+				txRunner = new TransactionalRunner(pickUpCourier, pipelineMessage, txHandle);
 			}
 			catch (TransactionStrategyException ex)
 			{
 				_logger.warn("Caught transaction related exception: ", ex);
 				cleanCourier(pickUpCourier);
 				rollbackTransaction();
+				return ;
 			}
+			
+			updateThreadCount(+1);
+			try
+			{
+				_execService.execute(txRunner);
+			}
+			catch (final RejectedExecutionException ree)
+			{
+				txRunner.run() ;
+			}
 		}
 	} // ________________________________
 
@@ -468,7 +478,7 @@
         	}
         }
         
-        private PickUpOnlyCourier getCourier()
+        protected PickUpOnlyCourier getCourier()
             throws MalformedEPRException, CourierException
         {
             PickUpOnlyCourier pickUpCourier = _pickUpCourier;

Added: labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MessageAwareListenerUnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MessageAwareListenerUnitTest.java	                        (rev 0)
+++ labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MessageAwareListenerUnitTest.java	2010-09-13 13:29:48 UTC (rev 35127)
@@ -0,0 +1,224 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat Middleware LLC, and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA  02110-1301, USA.
+ */
+package org.jboss.soa.esb.listeners.message;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+import junit.framework.JUnit4TestAdapter;
+
+import org.jboss.internal.soa.esb.couriers.PickUpOnlyCourier;
+import org.jboss.internal.soa.esb.services.registry.MockRegistry;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.actions.AbstractActionPipelineProcessor;
+import org.jboss.soa.esb.actions.ActionProcessingException;
+import org.jboss.soa.esb.addressing.MalformedEPRException;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.couriers.CourierTimeoutException;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleException;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+/**
+ * Tests for Call modifications while invoking through ServiceInvoker.
+ *
+ * @author <a href="mailto:kevin.conner at jboss.com">Kevin Conner</a>
+ */
+public class MessageAwareListenerUnitTest
+{
+    @Before
+    public void setUp()
+        throws Exception
+    {
+        MockRegistry.install() ;
+    }
+    
+    @After
+    public void tearDown()
+        throws Exception
+    {
+        MockRegistry.uninstall() ;
+    }
+    
+    @Test
+    public void testShutdownExecutor()
+        throws Exception
+    {
+        final ConfigTree config = new ConfigTree("MessageAwareListener") ;
+        final String category = "TestCategory" ;
+        final String service = "TestName" ;
+        
+        config.setAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG, category) ;
+        config.setAttribute(ListenerTagNames.SERVICE_NAME_TAG, service) ;
+        config.setAttribute(ListenerTagNames.MAX_THREADS_TAG, "1") ;
+        
+        final ConfigTree configEPR = new ConfigTree(ListenerTagNames.EPR_TAG, config) ;
+        configEPR.setAttribute(ListenerTagNames.URL_TAG, "invm://test-epr") ;
+        
+        final ConfigTree actionTree = new ConfigTree(ListenerTagNames.ACTION_ELEMENT_TAG, config) ;
+        actionTree.setAttribute(ListenerTagNames.ACTION_ELEMENT_TAG, "TestAction");
+        actionTree.setAttribute(ListenerTagNames.ACTION_CLASS_TAG, MessageAwareListenerTestAction.class.getName());
+
+        final MessageAwareListenerMockCourier courier = new MessageAwareListenerMockCourier() ;
+        final MessageAwareListenerTestListener listener = new MessageAwareListenerTestListener(config, courier) ;
+        courier.setListener(listener) ;
+        
+        listener.initialise() ;
+        listener.start() ;
+        
+        final boolean result = courier.waitUntilTriggered(10000) ;
+        assertTrue("Triggered", result) ;
+        
+        listener.destroy() ;
+        assertNotNull("Message threads", courier.getThread()) ;
+        assertNotNull("Execution thread", MessageAwareListenerTestAction.getThread()) ;
+        assertEquals("Message and execution threads", courier.getThread(), MessageAwareListenerTestAction.getThread()) ;
+    }
+    
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(MessageAwareListenerUnitTest.class) ;
+    }
+    
+    public static final class MessageAwareListenerTestListener extends MessageAwareListener
+    {
+        private final PickUpOnlyCourier courier ;
+        
+        public MessageAwareListenerTestListener(final ConfigTree config, final PickUpOnlyCourier courier)
+            throws ConfigurationException
+        {
+            super(config) ;
+            this.courier = courier ;
+        }
+
+        @Override
+        protected PickUpOnlyCourier getCourier()
+            throws MalformedEPRException, CourierException
+        {
+            return courier ;
+        }
+    }
+    
+    public static final class MessageAwareListenerTestAction extends AbstractActionPipelineProcessor
+    {
+        private static Thread thread ;
+        
+        public MessageAwareListenerTestAction(final ConfigTree config)
+        {
+        }
+        
+        public Message process(final Message message)
+            throws ActionProcessingException
+        {
+            setThread() ;
+            return message ;
+        }
+        
+        private static synchronized void setThread()
+        {
+            thread = Thread.currentThread() ;
+        }
+        
+        public static synchronized Thread getThread()
+        {
+            return thread ;
+        }
+    }
+    
+    public static final class MessageAwareListenerMockCourier implements PickUpOnlyCourier
+    {
+        private MessageAwareListenerTestListener listener ;
+        private boolean triggered ;
+        private Thread thread ;
+        
+        public Message pickup(final long millis)
+            throws CourierException, CourierTimeoutException
+        {
+            if (listener != null)
+            {
+                setThread() ;
+                try
+                {
+                    listener.stop() ;
+                }
+                catch (final ManagedLifecycleException mle)
+                {
+                    throw new CourierException("Unexpected lifecycle exception", mle) ;
+                }
+                synchronized(this)
+                {
+                    triggered = true ;
+                    notify() ;
+                }
+            }
+            return MessageFactory.getInstance().getMessage() ;
+        }
+
+        public boolean waitUntilTriggered(final long millis)
+            throws InterruptedException
+        {
+            final long end = System.currentTimeMillis() + millis ;
+            do
+            {
+                final long delay = end - System.currentTimeMillis() ;
+                if (delay > 0)
+                {
+                    synchronized(this)
+                    {
+                        if (!triggered)
+                        {
+                            wait(delay) ;
+                        }
+                        if (triggered)
+                        {
+                            return true ;
+                        }
+                    }
+                }
+                else
+                {
+                    return false ;
+                }
+            }
+            while(true) ;
+        }
+
+        public void cleanup()
+        {
+        }
+
+        public void setListener(final MessageAwareListenerTestListener listener)
+        {
+            this.listener = listener ;
+        }
+        
+        private synchronized void setThread()
+        {
+            thread = Thread.currentThread() ;
+        }
+        
+        public synchronized Thread getThread()
+        {
+            return thread ;
+        }
+    }
+}


Property changes on: labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MessageAwareListenerUnitTest.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native



More information about the jboss-svn-commits mailing list