[jboss-cvs] JBoss Messaging SVN: r5912 - in trunk: examples/jms/src/org/jboss/jms/example and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Feb 20 09:38:51 EST 2009
Author: ataylor
Date: 2009-02-20 09:38:51 -0500 (Fri, 20 Feb 2009)
New Revision: 5912
Removed:
trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandlerPool.java
Modified:
trunk/examples/jms/build.xml
trunk/examples/jms/src/org/jboss/jms/example/EJB3MDBExample.java
trunk/examples/jms/src/org/jboss/jms/example/Sender.java
trunk/examples/messaging/src/org/jboss/messaging/example/SimpleClient.java
trunk/src/main/org/jboss/messaging/ra/JBMConnectionRequestInfo.java
trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java
trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java
Log:
removed pooling from ResourceAdapter
Modified: trunk/examples/jms/build.xml
===================================================================
--- trunk/examples/jms/build.xml 2009-02-20 12:12:32 UTC (rev 5911)
+++ trunk/examples/jms/build.xml 2009-02-20 14:38:51 UTC (rev 5912)
@@ -79,6 +79,18 @@
<pathelement location="config"/>
</path>
+ <property name="jboss.home" value="${JBOSS_HOME}" />
+
+ <path id="as.classpath">
+ <path refid="runtime.classpath"/>
+ <fileset file="${jboss.home}/client/" >
+ <include name="**/*.jar"/>
+ </fileset>
+ <fileset file="${jboss.home}/common/lib/" >
+ <include name="**/*.jar"/>
+ </fileset>
+ </path>
+
<target name="help" description="-> display help">
<echo>*****************************************************************</echo>
<echo>* to run examples execute one of the following *</echo>
@@ -219,7 +231,7 @@
<target name="Sender" depends="compile" description="-> run performance test when sending to a queue">
<java classname="org.jboss.jms.example.Sender" fork="true">
- <classpath refid="runtime.classpath"/>
+ <classpath refid="as.classpath"/>
<jvmarg value="-Xmx512M"/>
<jvmarg value="-XX:+UseParallelGC"/>
<jvmarg value="-XX:+AggressiveOpts"/>
Modified: trunk/examples/jms/src/org/jboss/jms/example/EJB3MDBExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/EJB3MDBExample.java 2009-02-20 12:12:32 UTC (rev 5911)
+++ trunk/examples/jms/src/org/jboss/jms/example/EJB3MDBExample.java 2009-02-20 14:38:51 UTC (rev 5912)
@@ -44,7 +44,8 @@
@MessageDriven(activationConfig =
{
@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
- @ActivationConfigProperty(propertyName="destination", propertyValue="queue/testQueue")
+ @ActivationConfigProperty(propertyName="destination", propertyValue="queue/testQueue"),
+ @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "10")
})
public class EJB3MDBExample implements MessageListener
{
@@ -65,16 +66,10 @@
String text = tm.getText();
System.out.println("message " + text + " received");
- // flip the string
- String result = "";
- for(int i = 0; i < text.length(); i++)
- {
- result = text.charAt(i) + result;
- }
+ System.out.print("Sleeping for 2 secs");
+ Thread.sleep(2000);
+ System.out.println("awake: sending message");
- System.out.println("message processed, result: " + result);
-
-
InitialContext ic = new InitialContext();
ConnectionFactory cf = (ConnectionFactory)ic.lookup("java:/JmsXA");
ic.close();
@@ -85,7 +80,7 @@
Destination replyTo = m.getJMSReplyTo();
MessageProducer producer = session.createProducer(replyTo);
- TextMessage reply = session.createTextMessage(result);
+ TextMessage reply = session.createTextMessage(text);
producer.send(reply);
producer.close();
Modified: trunk/examples/jms/src/org/jboss/jms/example/Sender.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/Sender.java 2009-02-20 12:12:32 UTC (rev 5911)
+++ trunk/examples/jms/src/org/jboss/jms/example/Sender.java 2009-02-20 14:38:51 UTC (rev 5912)
@@ -28,7 +28,12 @@
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;
import javax.jms.Session;
+import javax.jms.MessageListener;
+import javax.jms.Message;
+import javax.jms.JMSException;
import javax.naming.InitialContext;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
@@ -48,19 +53,33 @@
Queue temporaryQueue = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(temporaryQueue);
- TextMessage message = session.createTextMessage("Hello!");
- message.setJMSReplyTo(temporaryQueue);
- sender.send(message);
+ final CountDownLatch latch = new CountDownLatch(30);
+ consumer.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ try
+ {
+ TextMessage m = (TextMessage)message;
+ System.out.println("received " + m.getText());
+ latch.countDown();
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ });
+
connection.start();
- message = (TextMessage)consumer.receive(500000);
-
-
- if (message == null)
+ for(int i = 1; i <= 30; i++)
{
- throw new Exception("Have not received any reply. The example failed!");
+ TextMessage message = session.createTextMessage("Message " + i);
+ message.setJMSReplyTo(temporaryQueue);
+ sender.send(message);
}
-
+ latch.await(60, TimeUnit.SECONDS);
connection.close();
}
}
Modified: trunk/examples/messaging/src/org/jboss/messaging/example/SimpleClient.java
===================================================================
--- trunk/examples/messaging/src/org/jboss/messaging/example/SimpleClient.java 2009-02-20 12:12:32 UTC (rev 5911)
+++ trunk/examples/messaging/src/org/jboss/messaging/example/SimpleClient.java 2009-02-20 14:38:51 UTC (rev 5912)
@@ -30,6 +30,8 @@
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.jms.client.JBossMessage;
+import org.jboss.messaging.jms.client.JBossTextMessage;
/**
* Uses the core messaging API to send and receive a message to a queue.
@@ -48,7 +50,7 @@
clientSession = sessionFactory.createSession(false, true, true);
SimpleString queue = new SimpleString("queuejms.testQueue");
ClientProducer clientProducer = clientSession.createProducer(queue);
- ClientMessage message = clientSession.createClientMessage(false);
+ ClientMessage message = clientSession.createClientMessage(JBossTextMessage.TYPE, false);
message.getBody().putString("Hello!");
clientProducer.send(message);
ClientConsumer clientConsumer = clientSession.createConsumer(queue);
Modified: trunk/src/main/org/jboss/messaging/ra/JBMConnectionRequestInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/JBMConnectionRequestInfo.java 2009-02-20 12:12:32 UTC (rev 5911)
+++ trunk/src/main/org/jboss/messaging/ra/JBMConnectionRequestInfo.java 2009-02-20 14:38:51 UTC (rev 5912)
@@ -32,6 +32,7 @@
*
* @author <a href="mailto:adrian at jboss.com">Adrian Brock</a>
* @author <a href="mailto:jesper.pedersen at jboss.org">Jesper Pedersen</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
* @version $Revision: $
*/
public class JBMConnectionRequestInfo implements ConnectionRequestInfo
@@ -127,6 +128,7 @@
password = prop.getPassword();
if (clientID == null)
clientID = prop.getClientID();
+ useXA = prop.isUseXA();
}
/**
Modified: trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java 2009-02-20 12:12:32 UTC (rev 5911)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java 2009-02-20 14:38:51 UTC (rev 5912)
@@ -24,10 +24,13 @@
import org.jboss.messaging.jms.client.JBossConnectionFactory;
import org.jboss.messaging.ra.JBMResourceAdapter;
import org.jboss.messaging.ra.Util;
+import org.jboss.messaging.ra.JBMMessageListener;
import org.jboss.messaging.core.logging.Logger;
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.List;
+import java.util.ArrayList;
import javax.jms.Connection;
import javax.jms.Destination;
@@ -54,6 +57,7 @@
*
* @author <a href="adrian at jboss.com">Adrian Brock</a>
* @author <a href="jesper.pedersen at jboss.org">Jesper Pedersen</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
* @version $Revision: $
*/
public class JBMActivation implements ExceptionListener
@@ -95,11 +99,13 @@
protected boolean isDeliveryTransacted;
/** The message handler pool */
- protected JBMMessageHandlerPool pool;
+ //protected JBMMessageHandlerPool pool;
/** The TransactionManager */
protected TransactionManager tm;
-
+
+ private List<JBMMessageHandler> handlers = new ArrayList<JBMMessageHandler>();
+
static
{
try
@@ -345,8 +351,14 @@
if (ctx != null)
ctx.close();
}
- setupPool();
-
+ for(int i = 0; i < spec.getMaxSessionInt(); i++)
+ {
+ JBMMessageHandler handler = new JBMMessageHandler(this);
+ handler.setup();
+ handlers.add(handler);
+ }
+
+ connection.start();
log.debug("Setup complete " + this);
}
@@ -357,7 +369,10 @@
{
log.debug("Tearing down " + spec);
- teardownPool();
+ for (JBMMessageHandler handler : handlers)
+ {
+ handler.teardown();
+ }
teardownConnection();
teardownDestination();
@@ -589,7 +604,7 @@
* Setup the pool
* @throws Exception for any error
*/
- protected void setupPool() throws Exception
+ /*protected void setupPool() throws Exception
{
pool = new JBMMessageHandlerPool(this);
log.debug("Created pool " + pool);
@@ -601,12 +616,12 @@
log.debug("Starting delivery " + connection);
connection.start();
log.debug("Started delivery " + connection);
- }
-
+ }*/
+
/**
* Teardown the pool
*/
- protected void teardownPool()
+ /*protected void teardownPool()
{
try
{
@@ -634,7 +649,7 @@
log.debug("Error clearing the pool " + pool, t);
}
pool = null;
- }
+ }*/
/**
* Handles the setup
@@ -673,8 +688,8 @@
buffer.append(" destination=").append(destination);
if (connection != null)
buffer.append(" connection=").append(connection);
- if (pool != null)
- buffer.append(" pool=").append(pool.getClass().getName());
+ //if (pool != null)
+ //buffer.append(" pool=").append(pool.getClass().getName());
buffer.append(" transacted=").append(isDeliveryTransacted);
buffer.append(')');
return buffer.toString();
Modified: trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java 2009-02-20 12:12:32 UTC (rev 5911)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java 2009-02-20 14:38:51 UTC (rev 5912)
@@ -1,8 +1,8 @@
/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2006, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
@@ -21,99 +21,63 @@
*/
package org.jboss.messaging.ra.inflow;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.jms.Connection;
+import org.jboss.messaging.core.logging.Logger;
+
import javax.jms.JMSException;
-import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.XASession;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
-import javax.jms.Session;
-import javax.jms.Topic;
+import javax.jms.Connection;
import javax.jms.XAConnection;
-import javax.jms.XASession;
-import javax.resource.spi.endpoint.MessageEndpoint;
-import javax.resource.spi.endpoint.MessageEndpointFactory;
-import javax.resource.spi.work.Work;
-import javax.resource.spi.work.WorkEvent;
-import javax.resource.spi.work.WorkException;
-import javax.resource.spi.work.WorkListener;
-import javax.transaction.Status;
+import javax.jms.Topic;
+import javax.jms.Message;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
+import javax.transaction.Status;
import javax.transaction.xa.XAResource;
-
-import org.jboss.messaging.core.logging.Logger;
-
+import javax.resource.spi.endpoint.MessageEndpoint;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
/**
* The message handler
- *
+ *
* @author <a href="adrian at jboss.com">Adrian Brock</a>
* @author <a href="mailto:jesper.pedersen at jboss.org">Jesper Pedersen</a>
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
* @version $Revision: $
*/
-public class JBMMessageHandler implements MessageListener, Work, WorkListener
+public class JBMMessageHandler implements MessageListener
{
/** The logger */
private static final Logger log = Logger.getLogger(JBMMessageHandler.class);
/** Trace enabled */
- private static boolean trace = log.isTraceEnabled();
+ private static boolean trace = log.isTraceEnabled();
- /** The message handler pool */
- private JBMMessageHandlerPool pool;
-
- /** Is in use */
- private AtomicBoolean inUse;
-
- /** Done latch */
- private CountDownLatch done;
-
- /** The transacted flag */
- private boolean transacted;
-
- /** The acknowledge mode */
- private int acknowledge;
-
/** The session */
private Session session;
/** Any XA session */
private XASession xaSession;
- /** The message consumer */
- private MessageConsumer messageConsumer;
-
/** The endpoint */
private MessageEndpoint endpoint;
- /** The transaction demarcation strategy */
- private TransactionDemarcationStrategy txnStrategy;
+ private final JBMActivation activation;
- /**
- * Constructor
- * @param pool The message handler pool
- */
- public JBMMessageHandler(JBMMessageHandlerPool pool)
- {
- if (trace)
- log.trace("constructor(" + pool + ")");
+ /** The transaction demarcation strategy factory */
+ private DemarcationStrategyFactory strategyFactory = new DemarcationStrategyFactory();
- this.pool = pool;
+ public JBMMessageHandler(JBMActivation activation)
+ {
+ this.activation = activation;
}
- /**
- * Setup the session
- */
public void setup() throws Exception
{
if (trace)
log.trace("setup()");
- inUse = new AtomicBoolean(false);
- done = new CountDownLatch(1);
-
- JBMActivation activation = pool.getActivation();
JBMActivationSpec spec = activation.getActivationSpec();
String selector = spec.getMessageSelector();
@@ -124,15 +88,16 @@
{
xaSession = ((XAConnection)connection).createXASession();
session = xaSession.getSession();
- }
+ }
else
{
- transacted = spec.isSessionTransacted();
- acknowledge = spec.getAcknowledgeModeInt();
+ boolean transacted = spec.isSessionTransacted();
+ int acknowledge = spec.getAcknowledgeModeInt();
session = connection.createSession(transacted, acknowledge);
}
// Create the message consumer
+ MessageConsumer messageConsumer;
if (activation.isTopic() && spec.isSubscriptionDurable())
{
Topic topic = (Topic) activation.getDestination();
@@ -140,11 +105,11 @@
if (selector == null || selector.trim().equals(""))
{
- messageConsumer = (MessageConsumer)session.createDurableSubscriber(topic, subscriptionName);
+ messageConsumer = session.createDurableSubscriber(topic, subscriptionName);
}
else
{
- messageConsumer = (MessageConsumer)session.createDurableSubscriber(topic, subscriptionName, selector, false);
+ messageConsumer = session.createDurableSubscriber(topic, subscriptionName, selector, false);
}
}
else
@@ -162,10 +127,10 @@
// Create the endpoint
MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();
XAResource xaResource = null;
-
+
if (activation.isDeliveryTransacted() && xaSession != null)
xaResource = xaSession.getXAResource();
-
+
endpoint = endpointFactory.createEndpoint(xaResource);
// Set the message listener
@@ -179,12 +144,12 @@
{
if (trace)
log.trace("teardown()");
-
+
try
{
if (endpoint != null)
endpoint.release();
- }
+ }
catch (Throwable t)
{
log.debug("Error releasing endpoint " + endpoint, t);
@@ -212,18 +177,6 @@
}
/**
- * Is in use
- * @return True if in use; otherwise false
- */
- public boolean isInUse()
- {
- if (trace)
- log.trace("isInUse()");
-
- return inUse.get();
- }
-
- /**
* On message
* @param message The message
*/
@@ -232,17 +185,25 @@
if (trace)
log.trace("onMessage(" + message + ")");
- inUse.set(true);
-
+ TransactionDemarcationStrategy txnStrategy = strategyFactory.getStrategy();
try
{
+ txnStrategy.start();
+ }
+ catch (Throwable throwable)
+ {
+ log.warn("Unable to create transaction: " + throwable.getMessage());
+ txnStrategy = null;
+ }
+ try
+ {
endpoint.beforeDelivery(JBMActivation.ONMESSAGE);
try
{
MessageListener listener = (MessageListener) endpoint;
listener.onMessage(message);
- }
+ }
finally
{
endpoint.afterDelivery();
@@ -255,128 +216,14 @@
if (txnStrategy != null)
txnStrategy.error();
}
-
- inUse.set(false);
- done.countDown();
- }
-
- /**
- * Run
- */
- public void run()
- {
- if (trace)
- log.trace("run()");
-
- try
- {
- setup();
-
- txnStrategy = createTransactionDemarcation();
- }
- catch (Throwable t)
- {
- log.error("Error creating transaction demarcation. Cannot continue.");
- return;
- }
-
- try
- {
- // Wait for onMessage
- while (done.getCount() > 0)
- {
- try
- {
- done.await();
- }
- catch (InterruptedException ignore)
- {
- }
- }
- }
- catch (Throwable t)
- {
- if (txnStrategy != null)
- txnStrategy.error();
-
- }
finally
{
if (txnStrategy != null)
txnStrategy.end();
-
- txnStrategy = null;
}
}
/**
- * Release
- */
- public void release()
- {
- if (trace)
- log.trace("release()");
- }
-
- /**
- * Work accepted
- * @param e The work event
- */
- public void workAccepted(WorkEvent e)
- {
- if (trace)
- log.trace("workAccepted()");
- }
-
- /**
- * Work completed
- * @param e The work event
- */
- public void workCompleted(WorkEvent e)
- {
- if (trace)
- log.trace("workCompleted()");
-
- teardown();
- pool.removeHandler(this);
- }
-
- /**
- * Work rejected
- * @param e The work event
- */
- public void workRejected(WorkEvent e)
- {
- if (trace)
- log.trace("workRejected()");
-
- teardown();
- pool.removeHandler(this);
- }
-
- /**
- * Work started
- * @param e The work event
- */
- public void workStarted(WorkEvent e)
- {
- if (trace)
- log.trace("workStarted()");
- }
-
- /**
- * Create the transaction demarcation strategy
- * @return The strategy
- */
- private TransactionDemarcationStrategy createTransactionDemarcation()
- {
- if (trace)
- log.trace("createTransactionDemarcation()");
-
- return new DemarcationStrategyFactory().getStrategy();
- }
-
- /**
* Demarcation strategy factory
*/
private class DemarcationStrategyFactory
@@ -390,20 +237,17 @@
if (trace)
log.trace("getStrategy()");
- final JBMActivationSpec spec = pool.getActivation().getActivationSpec();
- final JBMActivation activation = pool.getActivation();
-
if (activation.isDeliveryTransacted())
{
try
{
return new XATransactionDemarcationStrategy();
- }
+ }
catch (Throwable t)
{
log.error(this + " error creating transaction demarcation ", t);
}
- }
+ }
else
{
return new LocalDemarcationStrategy();
@@ -418,6 +262,10 @@
*/
private interface TransactionDemarcationStrategy
{
+ /*
+ * Start
+ */
+ void start() throws Throwable;
/**
* Error
*/
@@ -434,6 +282,13 @@
*/
private class LocalDemarcationStrategy implements TransactionDemarcationStrategy
{
+ /*
+ * Start
+ */
+ public void start()
+ {
+ }
+
/**
* Error
*/
@@ -442,7 +297,7 @@
if (trace)
log.trace("error()");
- final JBMActivationSpec spec = pool.getActivation().getActivationSpec();
+ final JBMActivationSpec spec = activation.getActivationSpec();
if (spec.isSessionTransacted())
{
@@ -452,15 +307,15 @@
{
/*
* Looks strange, but this basically means
- *
+ *
* If the underlying connection was non-XA and the transaction
* attribute is REQUIRED we rollback. Also, if the underlying
* connection was non-XA and the transaction attribute is
* NOT_SUPPORT and the non standard redelivery behavior is
* enabled we rollback to force redelivery.
- *
+ *
*/
- if (pool.getActivation().isDeliveryTransacted() || spec.getRedeliverUnspecified())
+ if (activation.isDeliveryTransacted() || spec.getRedeliverUnspecified())
{
session.rollback();
}
@@ -480,7 +335,7 @@
if (trace)
log.trace("error()");
- final JBMActivationSpec spec = pool.getActivation().getActivationSpec();
+ final JBMActivationSpec spec = activation.getActivationSpec();
if (spec.isSessionTransacted())
{
@@ -504,11 +359,11 @@
private class XATransactionDemarcationStrategy implements TransactionDemarcationStrategy
{
private Transaction trans = null;
- private TransactionManager tm = pool.getActivation().getTransactionManager();
+ private TransactionManager tm = activation.getTransactionManager();
- public XATransactionDemarcationStrategy() throws Throwable
+ public void start() throws Throwable
{
- final int timeout = pool.getActivation().getActivationSpec().getTransactionTimeout();
+ final int timeout = activation.getActivationSpec().getTransactionTimeout();
if (timeout > 0)
{
@@ -538,13 +393,13 @@
if (trace)
log.trace(this + " XAResource '" + res + " enlisted.");
}
- }
+ }
catch (Throwable t)
{
try
{
tm.rollback();
- }
+ }
catch (Throwable ignored)
{
log.trace(this + " ignored error rolling back after failed enlist", ignored);
@@ -562,7 +417,7 @@
log.trace(this + " using TM to mark TX for rollback tx=" + trans);
trans.setRollbackOnly();
- }
+ }
catch (Throwable t)
{
log.error(this + " failed to set rollback only", t);
@@ -590,7 +445,7 @@
// NO XASession? then manually rollback.
// This is not so good but
// it's the best we can do if we have no XASession.
- if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+ if (xaSession == null && activation.isDeliveryTransacted())
{
session.rollback();
}
@@ -608,17 +463,17 @@
// NO XASession? then manually commit. This is not so good but
// it's the best we can do if we have no XASession.
- if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+ if (xaSession == null && activation.isDeliveryTransacted())
{
session.commit();
}
- }
+ }
else
{
tm.suspend();
- if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+ if (xaSession == null && activation.isDeliveryTransacted())
{
session.rollback();
}
@@ -630,3 +485,4 @@
}
}
}
+
Deleted: trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandlerPool.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandlerPool.java 2009-02-20 12:12:32 UTC (rev 5911)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandlerPool.java 2009-02-20 14:38:51 UTC (rev 5912)
@@ -1,244 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2009, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.ra.inflow;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.resource.spi.work.WorkManager;
-
-import org.jboss.messaging.core.logging.Logger;
-
-/**
- * The message handler pool
- *
- * @author <a href="adrian at jboss.com">Adrian Brock</a>
- * @author <a href="jesper.pedersen at jboss.org">Jesper Pedersen</a>
- * @version $Revision: $
- */
-public class JBMMessageHandlerPool
-{
- /** The logger */
- private static final Logger log = Logger.getLogger(JBMMessageHandlerPool.class);
-
- /** Trace enabled */
- private static boolean trace = log.isTraceEnabled();
-
- /** The activation */
- private JBMActivation activation;
-
- /** The active sessions */
- private ArrayList<JBMMessageHandler> activeSessions;
-
- /** Whether the pool is stopped */
- private AtomicBoolean stopped;
-
- /**
- * Constructor
- * @param activation The activation
- */
- public JBMMessageHandlerPool(JBMActivation activation)
- {
- if (trace)
- log.trace("constructor(" + activation + ")");
-
- this.activation = activation;
- this.activeSessions = new ArrayList<JBMMessageHandler>();
- this.stopped = new AtomicBoolean(false);
- }
-
- /**
- * Get the activation
- * @return The value
- */
- public JBMActivation getActivation()
- {
- if (trace)
- log.trace("getActivation()");
-
- return activation;
- }
-
- /**
- * Start the pool
- * @exception Exception Thrown if an error occurs
- */
- public void start() throws Exception
- {
- if (trace)
- log.trace("start()");
-
- setupSessions();
- }
-
- /**
- * Stop the server session pool
- */
- public void stop()
- {
- if (trace)
- log.trace("stop()");
-
- // Disallow any new sessions
- stopped.set(true);
-
- teardownSessions();
- }
-
- /**
- * Remove message handler
- * @param handler The handler
- */
- protected void removeHandler(JBMMessageHandler handler)
- {
- if (trace)
- log.trace("removeHandler(" + handler + ")");
-
- synchronized (activeSessions)
- {
- activeSessions.remove(handler);
-
- if (!stopped.get())
- {
- try
- {
- setupSession();
- }
- catch (Exception e)
- {
- log.error("Unable to restart handler", e);
- }
- }
- activeSessions.notifyAll();
- }
- }
-
- /**
- * Starts the sessions
- * @exception Exception Thrown if an error occurs
- */
- protected void setupSessions() throws Exception
- {
- if (trace)
- log.trace("setupSessions()");
-
- JBMActivationSpec spec = activation.getActivationSpec();
-
- // Create the sessions
- synchronized (activeSessions)
- {
- for (int i = 0; i < spec.getMaxSessionInt(); ++i)
- {
- setupSession();
- }
- }
- }
-
- /**
- * Setup a session
- * @exception Exception Thrown if an error occurs
- */
- protected void setupSession() throws Exception
- {
- if (trace)
- log.trace("setupSession()");
-
- WorkManager workManager = activation.getWorkManager();
-
- // Create the session
- JBMMessageHandler handler = new JBMMessageHandler(this);
- workManager.scheduleWork(handler, 0, null, handler);
-
- activeSessions.add(handler);
- }
-
- /**
- * Stop the sessions
- */
- protected void teardownSessions()
- {
- if (trace)
- log.trace("teardownSessions()");
-
- synchronized (activeSessions)
- {
- List<JBMMessageHandler> cloned = (List<JBMMessageHandler>)activeSessions.clone();
- for (int i = 0; i < cloned.size(); ++i)
- {
- JBMMessageHandler handler = cloned.get(i);
- if (!handler.isInUse())
- {
- handler.teardown();
- activeSessions.remove(handler);
- }
- }
-
- activeSessions.notifyAll();
- }
-
- synchronized (activeSessions)
- {
- if (activation.getActivationSpec().isForceClearOnShutdown())
- {
- int attempts = 0;
- int forceClearAttempts = activation.getActivationSpec().getForceClearAttempts();
- long forceClearInterval = activation.getActivationSpec().getForceClearOnShutdownInterval();
-
- if (trace)
- log.trace(this + " force clear behavior in effect. Waiting for " + forceClearInterval + " milliseconds for " + forceClearAttempts + " attempts.");
-
- while((activeSessions.size() > 0) && (attempts < forceClearAttempts))
- {
- try
- {
- int currentSessions = activeSessions.size();
- activeSessions.wait(forceClearInterval);
- // Number of session didn't change
- if (activeSessions.size() == currentSessions)
- {
- ++attempts;
- log.trace(this + " clear attempt failed " + attempts);
- }
- }
- catch(InterruptedException ignore)
- {
- }
- }
- }
- else
- {
- // Wait for inuse sessions
- while (activeSessions.size() > 0)
- {
- try
- {
- activeSessions.wait();
- }
- catch (InterruptedException ignore)
- {
- }
- }
- }
- }
- }
-}
More information about the jboss-cvs-commits
mailing list