[jboss-cvs] JBoss Messaging SVN: r5840 - in trunk/src: main/org/jboss/messaging/ra/inflow and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Feb 9 13:58:14 EST 2009
Author: jesper.pedersen
Date: 2009-02-09 13:58:14 -0500 (Mon, 09 Feb 2009)
New Revision: 5840
Added:
trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java
trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandlerPool.java
Modified:
trunk/src/config/ra.xml
trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java
Log:
[JBMESSAGING-1367] Create JCA resource adapter for JBM 2.0
Modified: trunk/src/config/ra.xml
===================================================================
--- trunk/src/config/ra.xml 2009-02-09 05:32:16 UTC (rev 5839)
+++ trunk/src/config/ra.xml 2009-02-09 18:58:14 UTC (rev 5840)
@@ -250,5 +250,19 @@
<reauthentication-support>false</reauthentication-support>
</outbound-resourceadapter>
+ <inbound-resourceadapter>
+ <messageadapter>
+ <messagelistener>
+ <messagelistener-type>javax.jms.MessageListener</messagelistener-type>
+ <activationspec>
+ <activationspec-class>org.jboss.messaging.ra.inflow.JBMActivationSpec</activationspec-class>
+ <required-config-property>
+ <config-property-name>destination</config-property-name>
+ </required-config-property>
+ </activationspec>
+ </messagelistener>
+ </messageadapter>
+ </inbound-resourceadapter>
+
</resourceadapter>
</connector>
Modified: trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java 2009-02-09 05:32:16 UTC (rev 5839)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java 2009-02-09 18:58:14 UTC (rev 5840)
@@ -94,6 +94,9 @@
/** Is the delivery transacted */
protected boolean isDeliveryTransacted;
+ /** The message handler pool */
+ protected JBMMessageHandlerPool pool;
+
/** The TransactionManager */
protected TransactionManager tm;
@@ -342,7 +345,7 @@
if (ctx != null)
ctx.close();
}
- setupSessionPool();
+ setupPool();
log.debug("Setup complete " + this);
}
@@ -354,7 +357,7 @@
{
log.debug("Tearing down " + spec);
- teardownSessionPool();
+ teardownPool();
teardownConnection();
teardownDestination();
@@ -583,20 +586,27 @@
}
/**
- * Setup the server session pool
+ * Setup the pool
* @throws Exception for any error
*/
- protected void setupSessionPool() throws Exception
+ protected void setupPool() throws Exception
{
+ pool = new JBMMessageHandlerPool(this);
+ log.debug("Created pool " + pool);
+
+ log.debug("Starting pool " + pool);
+ pool.start();
+ log.debug("Started pool " + pool);
+
log.debug("Starting delivery " + connection);
connection.start();
log.debug("Started delivery " + connection);
}
/**
- * Teardown the server session pool
+ * Teardown the pool
*/
- protected void teardownSessionPool()
+ protected void teardownPool()
{
try
{
@@ -610,6 +620,20 @@
{
log.debug("Error stopping delivery " + connection, t);
}
+
+ try
+ {
+ if (pool != null)
+ {
+ log.debug("Stopping the pool " + pool);
+ pool.stop();
+ }
+ }
+ catch (Throwable t)
+ {
+ log.debug("Error clearing the pool " + pool, t);
+ }
+ pool = null;
}
/**
@@ -649,6 +673,8 @@
buffer.append(" destination=").append(destination);
if (connection != null)
buffer.append(" connection=").append(connection);
+ if (pool != null)
+ buffer.append(" pool=").append(pool.getClass().getName());
buffer.append(" transacted=").append(isDeliveryTransacted);
buffer.append(')');
return buffer.toString();
Added: trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java 2009-02-09 18:58:14 UTC (rev 5840)
@@ -0,0 +1,515 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.ra.inflow;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.XAConnection;
+import javax.jms.XASession;
+import javax.resource.spi.endpoint.MessageEndpoint;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.transaction.Status;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+import org.jboss.messaging.core.logging.Logger;
+
+/**
+ * The message handler
+ *
+ * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * @author <a href="mailto:jesper.pedersen at jboss.org">Jesper Pedersen</a>
+ * @version $Revision: $
+ */
+public class JBMMessageHandler implements MessageListener
+{
+ /** The logger */
+ private static final Logger log = Logger.getLogger(JBMMessageHandler.class);
+
+ /** Trace enabled */
+ private static boolean trace = log.isTraceEnabled();
+
+ /** The message handler pool */
+ private JBMMessageHandlerPool pool;
+
+ /** The transacted flag */
+ private boolean transacted;
+
+ /** The acknowledge mode */
+ private int acknowledge;
+
+ /** The session */
+ private Session session;
+
+ /** Any XA session */
+ private XASession xaSession;
+
+ /** The message consumer */
+ private MessageConsumer messageConsumer;
+
+ /** The endpoint */
+ private MessageEndpoint endpoint;
+
+ /** The transaction demarcation strategy */
+ private TransactionDemarcationStrategy txnStrategy;
+
+   /**
+    * Creates a message handler that belongs to the given pool.
+    * @param pool The message handler pool that owns this handler
+    */
+   public JBMMessageHandler(JBMMessageHandlerPool pool)
+   {
+      if (trace)
+      {
+         log.trace("constructor(" + pool + ")");
+      }
+
+      this.pool = pool;
+   }
+
+   /**
+    * Setup the handler: create the session, the message consumer and the
+    * message endpoint, then register this handler as the consumer's listener.
+    * @throws Exception for any error
+    */
+   public void setup() throws Exception
+   {
+      if (trace)
+      {
+         log.trace("setup()");
+      }
+
+      final JBMActivation activation = pool.getActivation();
+      final JBMActivationSpec spec = activation.getActivationSpec();
+      final String selector = spec.getMessageSelector();
+      final Connection connection = activation.getConnection();
+
+      // Non-transacted delivery uses a plain session configured from the
+      // activation spec; transacted delivery goes through an XA session
+      if (!activation.isDeliveryTransacted())
+      {
+         transacted = spec.isSessionTransacted();
+         acknowledge = spec.getAcknowledgeModeInt();
+         session = connection.createSession(transacted, acknowledge);
+      }
+      else
+      {
+         xaSession = ((XAConnection) connection).createXASession();
+         session = xaSession.getSession();
+      }
+
+      // A null or blank selector is treated as "no selector"
+      final boolean noSelector = selector == null || selector.trim().equals("");
+
+      // Durable topic subscriptions need a durable subscriber, everything
+      // else gets a plain consumer on the destination
+      if (activation.isTopic() && spec.isSubscriptionDurable())
+      {
+         final Topic topic = (Topic) activation.getDestination();
+         final String subscriptionName = spec.getSubscriptionName();
+
+         messageConsumer = noSelector
+            ? session.createDurableSubscriber(topic, subscriptionName)
+            : session.createDurableSubscriber(topic, subscriptionName, selector, false);
+      }
+      else
+      {
+         messageConsumer = noSelector
+            ? session.createConsumer(activation.getDestination())
+            : session.createConsumer(activation.getDestination(), selector);
+      }
+
+      // The endpoint only receives an XAResource when delivery is
+      // transacted and an XA session was created
+      final MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();
+      final XAResource xaResource =
+         (activation.isDeliveryTransacted() && xaSession != null) ? xaSession.getXAResource() : null;
+
+      endpoint = endpointFactory.createEndpoint(xaResource);
+
+      // From here on messages are delivered to onMessage()
+      messageConsumer.setMessageListener(this);
+   }
+
+   /**
+    * Tear down the handler: release the endpoint, then close the XA session
+    * and the session. Each step is attempted independently; a failure is
+    * logged at debug level and does not prevent the remaining steps.
+    */
+   public void teardown()
+   {
+      if (trace)
+      {
+         log.trace("teardown()");
+      }
+
+      if (endpoint != null)
+      {
+         try
+         {
+            endpoint.release();
+         }
+         catch (Throwable t)
+         {
+            log.debug("Error releasing endpoint " + endpoint, t);
+         }
+      }
+
+      if (xaSession != null)
+      {
+         try
+         {
+            xaSession.close();
+         }
+         catch (Throwable t)
+         {
+            log.debug("Error releasing xaSession " + xaSession, t);
+         }
+      }
+
+      if (session != null)
+      {
+         try
+         {
+            session.close();
+         }
+         catch (Throwable t)
+         {
+            log.debug("Error releasing session " + session, t);
+         }
+      }
+   }
+
+   /**
+    * On message: deliver the message to the endpoint inside the transaction
+    * demarcation selected for this activation, then hand this handler back
+    * to the pool.
+    * <p>
+    * If the demarcation strategy cannot be created because of an exception
+    * the message is not delivered and the method returns immediately.
+    * NOTE(review): createTransactionDemarcation() returns <code>null</code>
+    * when the XA strategy fails to start; in that case delivery proceeds
+    * without any demarcation - confirm this is intended.
+    * @param message The message
+    */
+   public void onMessage(Message message)
+   {
+      if (trace)
+         log.trace("onMessage(" + message + ")");
+
+      try
+      {
+         txnStrategy = createTransactionDemarcation();
+      }
+      catch (Throwable t)
+      {
+         // Keep the cause in the log, otherwise the failure cannot be diagnosed
+         log.error("Error creating transaction demarcation. Cannot continue.", t);
+         return;
+      }
+
+      try
+      {
+         endpoint.beforeDelivery(JBMActivation.ONMESSAGE);
+
+         try
+         {
+            MessageListener listener = (MessageListener) endpoint;
+            listener.onMessage(message);
+         }
+         finally
+         {
+            // afterDelivery must pair with every successful beforeDelivery
+            endpoint.afterDelivery();
+         }
+      }
+      catch (Throwable t)
+      {
+         log.error("Unexpected error delivering message " + message, t);
+
+         // Let the strategy roll back / mark the transaction for rollback
+         if (txnStrategy != null)
+            txnStrategy.error();
+      }
+      finally
+      {
+         // Always complete the demarcation, whether delivery worked or not
+         if (txnStrategy != null)
+            txnStrategy.end();
+
+         txnStrategy = null;
+      }
+
+      pool.removeHandler(this);
+   }
+
+ /**
+ * Create the transaction demarcation strategy
+ * @return The strategy
+ */
+ private TransactionDemarcationStrategy createTransactionDemarcation()
+ {
+ if (trace)
+ log.trace("createTransactionDemarcation()");
+
+ return new DemarcationStrategyFactory().getStrategy();
+ }
+
+   /**
+    * Demarcation strategy factory: selects the XA or the local strategy
+    * depending on whether the activation delivers transacted.
+    */
+   private class DemarcationStrategyFactory
+   {
+      /**
+       * Get the transaction demarcation strategy
+       * @return The XA strategy for transacted delivery, the local strategy
+       *         otherwise; <code>null</code> if the XA strategy could not be
+       *         created (the error is logged)
+       */
+      TransactionDemarcationStrategy getStrategy()
+      {
+         if (trace)
+            log.trace("getStrategy()");
+
+         final JBMActivation activation = pool.getActivation();
+
+         if (!activation.isDeliveryTransacted())
+         {
+            return new LocalDemarcationStrategy();
+         }
+
+         try
+         {
+            // The constructor begins the transaction and enlists the resource
+            return new XATransactionDemarcationStrategy();
+         }
+         catch (Throwable t)
+         {
+            log.error(this + " error creating transaction demarcation ", t);
+            return null;
+         }
+      }
+   }
+
+   /**
+    * Transaction demarcation strategy. One instance is created per delivery
+    * in onMessage(); it is notified of a delivery failure and of the end of
+    * the delivery.
+    */
+   private interface TransactionDemarcationStrategy
+   {
+      /**
+       * Error: called by onMessage() when delivering the message threw.
+       */
+      void error();
+
+      /**
+       * End: called by onMessage() after every delivery attempt, including
+       * those for which error() was called.
+       */
+      void end();
+   }
+
+ /**
+ * Local demarcation strategy
+ */
+ private class LocalDemarcationStrategy implements TransactionDemarcationStrategy
+ {
+ /**
+ * Error
+ */
+ public void error()
+ {
+ if (trace)
+ log.trace("error()");
+
+ final JBMActivationSpec spec = pool.getActivation().getActivationSpec();
+
+ if (spec.isSessionTransacted())
+ {
+ if (session != null)
+ {
+ try
+ {
+ /*
+ * Looks strange, but this basically means
+ *
+ * If the underlying connection was non-XA and the transaction
+ * attribute is REQUIRED we rollback. Also, if the underlying
+ * connection was non-XA and the transaction attribute is
+ * NOT_SUPPORT and the non standard redelivery behavior is
+ * enabled we rollback to force redelivery.
+ *
+ */
+ if (pool.getActivation().isDeliveryTransacted() || spec.getRedeliverUnspecified())
+ {
+ session.rollback();
+ }
+ } catch (JMSException e)
+ {
+ log.error("Failed to rollback session transaction", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * End
+ */
+ public void end()
+ {
+ if (trace)
+ log.trace("error()");
+
+ final JBMActivationSpec spec = pool.getActivation().getActivationSpec();
+
+ if (spec.isSessionTransacted())
+ {
+ if (session != null)
+ {
+ try
+ {
+ session.commit();
+ } catch (JMSException e)
+ {
+ log.error("Failed to commit session transaction", e);
+ }
+ }
+ }
+ }
+ }
+
+   /**
+    * XA demarcation strategy: begins a transaction on the activation's
+    * transaction manager when it is constructed, enlists the XA session's
+    * resource in it, and commits or rolls it back in end().
+    */
+   private class XATransactionDemarcationStrategy implements TransactionDemarcationStrategy
+   {
+      /** The transaction started by the constructor */
+      private Transaction trans = null;
+      /** The transaction manager obtained from the activation */
+      private TransactionManager tm = pool.getActivation().getTransactionManager();;
+
+      /**
+       * Begin the transaction and enlist the XA resource.
+       * @throws Throwable for any error; if the failure happens after the
+       *         transaction was begun a rollback is attempted first
+       */
+      public XATransactionDemarcationStrategy() throws Throwable
+      {
+         final int timeout = pool.getActivation().getActivationSpec().getTransactionTimeout();
+
+         // Only a positive timeout from the activation spec is applied
+         if (timeout > 0)
+         {
+            if (trace)
+               log.trace("Setting transactionTimeout for JMSSessionPool to " + timeout);
+
+            tm.setTransactionTimeout(timeout);
+         }
+
+         tm.begin();
+
+         try
+         {
+            trans = tm.getTransaction();
+
+            if (trace)
+               log.trace(this + " using tx=" + trans);
+
+            if (xaSession != null)
+            {
+               XAResource res = xaSession.getXAResource();
+
+               if (!trans.enlistResource(res))
+               {
+                  throw new JMSException("could not enlist resource");
+               }
+               if (trace)
+                  log.trace(this + " XAResource '" + res + " enlisted.");
+            }
+         }
+         catch (Throwable t)
+         {
+            // Undo the begin() above before reporting the original failure
+            try
+            {
+               tm.rollback();
+            }
+            catch (Throwable ignored)
+            {
+               log.trace(this + " ignored error rolling back after failed enlist", ignored);
+            }
+            throw t;
+         }
+      }
+
+      /**
+       * Error: mark the transaction for rollback. A failure to do so is
+       * logged, not propagated.
+       */
+      public void error()
+      {
+         // Mark for rollback TX via TM
+         try
+         {
+            if (trace)
+               log.trace(this + " using TM to mark TX for rollback tx=" + trans);
+
+            trans.setRollbackOnly();
+         }
+         catch (Throwable t)
+         {
+            log.error(this + " failed to set rollback only", t);
+         }
+      }
+
+      /**
+       * End: roll back the transaction if it is marked for rollback, commit
+       * it if it is still active, otherwise suspend it. Any failure,
+       * including a wrong transaction association, is logged and not
+       * propagated.
+       */
+      public void end()
+      {
+         try
+         {
+            // Use the TM to commit the Tx (assert the correct association)
+            Transaction currentTx = tm.getTransaction();
+            if (trans.equals(currentTx) == false)
+               throw new IllegalStateException("Wrong tx association: expected " + trans + " was " + currentTx);
+
+            // Marked rollback
+            if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
+            {
+               if (trace)
+                  log.trace(this + " rolling back JMS transaction tx=" + trans);
+
+               // Actually roll it back
+               tm.rollback();
+
+               // NO XASession? then manually rollback.
+               // This is not so good but
+               // it's the best we can do if we have no XASession.
+               if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+               {
+                  session.rollback();
+               }
+            }
+            else if (trans.getStatus() == Status.STATUS_ACTIVE)
+            {
+               // Commit tx
+               // This will happen if
+               // a) everything goes well
+               // b) app. exception was thrown
+               if (trace)
+                  log.trace(this + " commiting the JMS transaction tx=" + trans);
+
+               tm.commit();
+
+               // NO XASession? then manually commit. This is not so good but
+               // it's the best we can do if we have no XASession.
+               if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+               {
+                  session.commit();
+               }
+
+            }
+            else
+            {
+               // Neither marked for rollback nor active: only detach the
+               // transaction from the current thread
+               tm.suspend();
+
+               if (xaSession == null && pool.getActivation().isDeliveryTransacted())
+               {
+                  session.rollback();
+               }
+            }
+         } catch (Throwable t)
+         {
+            log.error(this + " failed to commit/rollback", t);
+         }
+      }
+   }
+}
Added: trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandlerPool.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandlerPool.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandlerPool.java 2009-02-09 18:58:14 UTC (rev 5840)
@@ -0,0 +1,224 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.ra.inflow;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.jboss.messaging.core.logging.Logger;
+
+/**
+ * The message handler pool
+ *
+ * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * @author <a href="jesper.pedersen at jboss.org">Jesper Pedersen</a>
+ * @version $Revision: $
+ */
+public class JBMMessageHandlerPool
+{
+ /** The logger */
+ private static final Logger log = Logger.getLogger(JBMMessageHandlerPool.class);
+
+ /** Trace enabled */
+ private static boolean trace = log.isTraceEnabled();
+
+ /** The activation */
+ private JBMActivation activation;
+
+ /** The active sessions */
+ private List<JBMMessageHandler> activeSessions;
+
+ /** Whether the pool is stopped */
+ private AtomicBoolean stopped;
+
+   /**
+    * Creates an empty, not yet stopped pool for the given activation.
+    * @param activation The activation this pool serves
+    */
+   public JBMMessageHandlerPool(JBMActivation activation)
+   {
+      if (trace)
+      {
+         log.trace("constructor(" + activation + ")");
+      }
+
+      this.stopped = new AtomicBoolean(false);
+      this.activeSessions = new ArrayList<JBMMessageHandler>();
+      this.activation = activation;
+   }
+
+   /**
+    * Get the activation this pool was created for
+    * @return The activation
+    */
+   public JBMActivation getActivation()
+   {
+      if (trace)
+      {
+         log.trace("getActivation()");
+      }
+
+      return activation;
+   }
+
+   /**
+    * Start the pool by setting up its sessions
+    * @exception Exception Thrown if a session cannot be set up
+    */
+   public void start() throws Exception
+   {
+      if (trace)
+      {
+         log.trace("start()");
+      }
+
+      setupSessions();
+   }
+
+   /**
+    * Stop the message handler pool: flag it as stopped so that no new
+    * sessions are set up, then tear the sessions down.
+    */
+   public void stop()
+   {
+      if (trace)
+      {
+         log.trace("stop()");
+      }
+
+      // From now on removeHandler() no longer replaces handlers
+      stopped.set(true);
+
+      teardownSessions();
+   }
+
+   /**
+    * Remove message handler from the active sessions. Unless the pool has
+    * been stopped a replacement session is set up; a failure to do so is
+    * logged. Threads waiting on the active sessions (see teardownSessions)
+    * are woken up afterwards.
+    * @param handler The handler
+    */
+   protected void removeHandler(JBMMessageHandler handler)
+   {
+      if (trace)
+         log.trace("removeHandler(" + handler + ")");
+
+      synchronized (activeSessions)
+      {
+         activeSessions.remove(handler);
+
+         if (!stopped.get())
+         {
+            try
+            {
+               setupSession();
+            }
+            catch (Exception e)
+            {
+               log.error("Unable to restart handler", e);
+            }
+         }
+
+         // notifyAll() requires the monitor to be held; calling it after the
+         // synchronized block threw IllegalMonitorStateException every time
+         activeSessions.notifyAll();
+      }
+   }
+
+ /**
+ * Starts the sessions
+ * @exception Exception Thrown if an error occurs
+ */
+ protected void setupSessions() throws Exception
+ {
+ if (trace)
+ log.trace("setupSessions()");
+
+ JBMActivationSpec spec = activation.getActivationSpec();
+
+ // Create the sessions
+ synchronized (activeSessions)
+ {
+ for (int i = 0; i < spec.getMaxSessionInt(); ++i)
+ {
+ setupSession();
+ }
+ }
+ }
+
+   /**
+    * Setup a session: create a new message handler, set it up and register
+    * it as active. Both callers in this class invoke this method while
+    * holding the lock on the active sessions.
+    * @exception Exception Thrown if the handler cannot be set up; whatever
+    *            the handler created before failing is released first
+    */
+   protected void setupSession() throws Exception
+   {
+      if (trace)
+         log.trace("setupSession()");
+
+      // Create the session
+      JBMMessageHandler handler = new JBMMessageHandler(this);
+
+      boolean ready = false;
+      try
+      {
+         handler.setup();
+         ready = true;
+      }
+      finally
+      {
+         if (!ready)
+         {
+            // setup() may fail after the session (or consumer/endpoint) was
+            // created; nobody else holds a reference, so release it here.
+            // teardown() logs and swallows its own errors, so the original
+            // failure is the one that propagates.
+            handler.teardown();
+         }
+      }
+
+      activeSessions.add(handler);
+   }
+
+   /**
+    * Stop the sessions: wait for the active sessions to be removed.
+    * <p>
+    * With force clear enabled in the activation spec, the wait is bounded:
+    * it gives up after the configured number of intervals in which the
+    * number of sessions did not change. Otherwise the wait lasts until no
+    * active session remains.
+    * <p>
+    * An interrupt does not abort the wait, but the interrupt status of the
+    * calling thread is restored before returning.
+    * NOTE(review): handlers still registered when this method returns are
+    * not torn down here - confirm they are released elsewhere.
+    */
+   protected void teardownSessions()
+   {
+      if (trace)
+         log.trace("teardownSessions()");
+
+      boolean interrupted = false;
+
+      synchronized (activeSessions)
+      {
+         if (activation.getActivationSpec().isForceClearOnShutdown())
+         {
+            int attempts = 0;
+            int forceClearAttempts = activation.getActivationSpec().getForceClearAttempts();
+            long forceClearInterval = activation.getActivationSpec().getForceClearOnShutdownInterval();
+
+            if (trace)
+               log.trace(this + " force clear behavior in effect. Waiting for " + forceClearInterval + " milliseconds for " + forceClearAttempts + " attempts.");
+
+            while ((activeSessions.size() > 0) && (attempts < forceClearAttempts))
+            {
+               try
+               {
+                  int currentSessions = activeSessions.size();
+                  activeSessions.wait(forceClearInterval);
+                  // Number of session didn't change
+                  if (activeSessions.size() == currentSessions)
+                  {
+                     ++attempts;
+                     if (trace)
+                        log.trace(this + " clear attempt failed " + attempts);
+                  }
+               }
+               catch (InterruptedException ignored)
+               {
+                  // Keep waiting, but remember the interrupt for the caller
+                  interrupted = true;
+               }
+            }
+         }
+         else
+         {
+            // Wait for inuse sessions
+            while (activeSessions.size() > 0)
+            {
+               try
+               {
+                  activeSessions.wait();
+               }
+               catch (InterruptedException ignored)
+               {
+                  // Keep waiting, but remember the interrupt for the caller
+                  interrupted = true;
+               }
+            }
+         }
+      }
+
+      // Restore the interrupt status that catching InterruptedException cleared
+      if (interrupted)
+         Thread.currentThread().interrupt();
+   }
+}
More information about the jboss-cvs-commits
mailing list