[jboss-svn-commits] JBL Code SVN: r35127 - in labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta: tests/src/org/jboss/soa/esb/listeners/message and 1 other directory.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Mon Sep 13 09:29:49 EDT 2010
Author: kevin.conner at jboss.com
Date: 2010-09-13 09:29:48 -0400 (Mon, 13 Sep 2010)
New Revision: 35127
Added:
labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MessageAwareListenerUnitTest.java
Modified:
labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
Log:
Make sure last message is always executed: JBESB-3473
Modified: labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2010-09-13 13:14:50 UTC (rev 35126)
+++ labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageAwareListener.java 2010-09-13 13:29:48 UTC (rev 35127)
@@ -25,6 +25,7 @@
import java.lang.reflect.Method;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
@@ -349,21 +350,30 @@
if (null != message)
{
+ final Message pipelineMessage = message ;
+ final TransactionalRunner txRunner ;
try
{
- final Message pipelineMessage = message ;
final Object txHandle = transactionStrategy.suspend();
- final TransactionalRunner txRunner = new TransactionalRunner(pickUpCourier, pipelineMessage, txHandle);
-
- updateThreadCount(+1);
- _execService.execute(txRunner);
+ txRunner = new TransactionalRunner(pickUpCourier, pipelineMessage, txHandle);
}
catch (TransactionStrategyException ex)
{
_logger.warn("Caught transaction related exception: ", ex);
cleanCourier(pickUpCourier);
rollbackTransaction();
+ return ;
}
+
+ updateThreadCount(+1);
+ try
+ {
+ _execService.execute(txRunner);
+ }
+ catch (final RejectedExecutionException ree)
+ {
+ txRunner.run() ;
+ }
}
} // ________________________________
@@ -468,7 +478,7 @@
}
}
- private PickUpOnlyCourier getCourier()
+ protected PickUpOnlyCourier getCourier()
throws MalformedEPRException, CourierException
{
PickUpOnlyCourier pickUpCourier = _pickUpCourier;
Added: labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MessageAwareListenerUnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MessageAwareListenerUnitTest.java (rev 0)
+++ labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MessageAwareListenerUnitTest.java 2010-09-13 13:29:48 UTC (rev 35127)
@@ -0,0 +1,224 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat Middleware LLC, and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ */
+package org.jboss.soa.esb.listeners.message;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+import junit.framework.JUnit4TestAdapter;
+
+import org.jboss.internal.soa.esb.couriers.PickUpOnlyCourier;
+import org.jboss.internal.soa.esb.services.registry.MockRegistry;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.actions.AbstractActionPipelineProcessor;
+import org.jboss.soa.esb.actions.ActionProcessingException;
+import org.jboss.soa.esb.addressing.MalformedEPRException;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.couriers.CourierTimeoutException;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleException;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+/**
+ * Tests that MessageAwareListener still executes the last message picked up when the listener is stopped during the pickup (JBESB-3473).
+ *
+ * @author <a href="mailto:kevin.conner at jboss.com">Kevin Conner</a>
+ */
+public class MessageAwareListenerUnitTest
+{
+ @Before
+ public void setUp()
+ throws Exception
+ {
+ MockRegistry.install() ; // install the mock registry before each test
+ }
+
+ @After
+ public void tearDown()
+ throws Exception
+ {
+ MockRegistry.uninstall() ; // remove the mock registry again after each test
+ }
+
+ @Test
+ public void testShutdownExecutor() // a message picked up while the listener is being stopped must still reach the action
+ throws Exception
+ {
+ final ConfigTree config = new ConfigTree("MessageAwareListener") ;
+ final String category = "TestCategory" ;
+ final String service = "TestName" ;
+
+ config.setAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG, category) ;
+ config.setAttribute(ListenerTagNames.SERVICE_NAME_TAG, service) ;
+ config.setAttribute(ListenerTagNames.MAX_THREADS_TAG, "1") ; // NOTE(review): presumably caps the listener at one worker thread - confirm
+
+ final ConfigTree configEPR = new ConfigTree(ListenerTagNames.EPR_TAG, config) ; // EPR child element of the listener config
+ configEPR.setAttribute(ListenerTagNames.URL_TAG, "invm://test-epr") ;
+
+ final ConfigTree actionTree = new ConfigTree(ListenerTagNames.ACTION_ELEMENT_TAG, config) ; // single action: the thread-recording test action below
+ actionTree.setAttribute(ListenerTagNames.ACTION_ELEMENT_TAG, "TestAction");
+ actionTree.setAttribute(ListenerTagNames.ACTION_CLASS_TAG, MessageAwareListenerTestAction.class.getName());
+
+ final MessageAwareListenerMockCourier courier = new MessageAwareListenerMockCourier() ; // mock courier that stops the listener from inside pickup()
+ final MessageAwareListenerTestListener listener = new MessageAwareListenerTestListener(config, courier) ;
+ courier.setListener(listener) ; // pickup() only stops the listener once this reference is set
+
+ listener.initialise() ;
+ listener.start() ;
+
+ final boolean result = courier.waitUntilTriggered(10000) ; // wait up to 10 seconds for pickup() to have stopped the listener
+ assertTrue("Triggered", result) ;
+
+ listener.destroy() ;
+ assertNotNull("Message threads", courier.getThread()) ; // a pickup happened with the listener attached
+ assertNotNull("Execution thread", MessageAwareListenerTestAction.getThread()) ; // the action was executed at least once
+ assertEquals("Message and execution threads", courier.getThread(), MessageAwareListenerTestAction.getThread()) ; // the action ran on the same thread that performed the pickup
+ }
+
+ public static junit.framework.Test suite() // adapter so JUnit 3 style runners can execute this JUnit 4 test
+ {
+ return new JUnit4TestAdapter(MessageAwareListenerUnitTest.class) ;
+ }
+
+ public static final class MessageAwareListenerTestListener extends MessageAwareListener // listener whose courier is supplied by the test
+ {
+ private final PickUpOnlyCourier courier ; // courier handed in through the constructor
+
+ public MessageAwareListenerTestListener(final ConfigTree config, final PickUpOnlyCourier courier)
+ throws ConfigurationException
+ {
+ super(config) ;
+ this.courier = courier ;
+ }
+
+ @Override
+ protected PickUpOnlyCourier getCourier()
+ throws MalformedEPRException, CourierException
+ {
+ return courier ; // always return the courier injected through the constructor
+ }
+ }
+
+ public static final class MessageAwareListenerTestAction extends AbstractActionPipelineProcessor // action that records which thread executed it
+ {
+ private static Thread thread ; // thread of the most recent process() call; read by the test through getThread()
+
+ public MessageAwareListenerTestAction(final ConfigTree config) // config is not used by this action
+ {
+ }
+
+ public Message process(final Message message)
+ throws ActionProcessingException
+ {
+ setThread() ; // record the thread executing the action
+ return message ; // pass the message through unchanged
+ }
+
+ private static synchronized void setThread()
+ {
+ thread = Thread.currentThread() ;
+ }
+
+ public static synchronized Thread getThread()
+ {
+ return thread ;
+ }
+ }
+
+ public static final class MessageAwareListenerMockCourier implements PickUpOnlyCourier // courier that stops its listener during pickup
+ {
+ private MessageAwareListenerTestListener listener ; // listener to stop from pickup(); null until setListener() is called
+ private boolean triggered ; // set under this object's monitor once pickup() has stopped the listener
+ private Thread thread ; // thread that most recently ran pickup() with a listener set
+
+ public Message pickup(final long millis)
+ throws CourierException, CourierTimeoutException
+ {
+ if (listener != null)
+ {
+ setThread() ; // remember which thread performed the pickup
+ try
+ {
+ listener.stop() ; // stop the listener from inside pickup(), before the message is returned
+ }
+ catch (final ManagedLifecycleException mle)
+ {
+ throw new CourierException("Unexpected lifecycle exception", mle) ;
+ }
+ synchronized(this)
+ {
+ triggered = true ;
+ notify() ; // wake a thread blocked in waitUntilTriggered()
+ }
+ }
+ return MessageFactory.getInstance().getMessage() ; // a message is returned on every call, including the one that stopped the listener
+ }
+
+ public boolean waitUntilTriggered(final long millis)
+ throws InterruptedException
+ {
+ final long end = System.currentTimeMillis() + millis ; // absolute deadline
+ do
+ {
+ final long delay = end - System.currentTimeMillis() ; // time remaining; recomputed each pass because wait() may return early
+ if (delay > 0)
+ {
+ synchronized(this)
+ {
+ if (!triggered)
+ {
+ wait(delay) ;
+ }
+ if (triggered)
+ {
+ return true ;
+ }
+ }
+ }
+ else
+ {
+ return false ; // deadline passed without pickup() signalling
+ }
+ }
+ while(true) ; // the loop is only left through the returns above
+ }
+
+ public void cleanup() // nothing to release in this mock
+ {
+ }
+
+ public void setListener(final MessageAwareListenerTestListener listener)
+ {
+ this.listener = listener ;
+ }
+
+ private synchronized void setThread()
+ {
+ thread = Thread.currentThread() ;
+ }
+
+ public synchronized Thread getThread()
+ {
+ return thread ;
+ }
+ }
+}
Property changes on: labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MessageAwareListenerUnitTest.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
More information about the jboss-svn-commits
mailing list