[jboss-cvs] JBossAS SVN: r66023 - in trunk/server/src/main/org/jboss/ejb/plugins: jms and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Oct 11 08:19:57 EDT 2007
Author: adrian at jboss.org
Date: 2007-10-11 08:19:57 -0400 (Thu, 11 Oct 2007)
New Revision: 66023
Removed:
trunk/server/src/main/org/jboss/ejb/plugins/jms/DLQHandler.java
trunk/server/src/main/org/jboss/ejb/plugins/jms/SecurityActions.java
Modified:
trunk/server/src/main/org/jboss/ejb/plugins/inflow/GetTCLAction.java
trunk/server/src/main/org/jboss/ejb/plugins/inflow/JBossJMSMessageEndpointFactory.java
trunk/server/src/main/org/jboss/ejb/plugins/inflow/JBossMessageEndpointFactory.java
trunk/server/src/main/org/jboss/ejb/plugins/inflow/JBossMessageEndpointFactoryMBean.java
trunk/server/src/main/org/jboss/ejb/plugins/inflow/MessageEndpointInterceptor.java
trunk/server/src/main/org/jboss/ejb/plugins/inflow/SetTCLAction.java
trunk/server/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java
trunk/server/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvokerMBean.java
Log:
[JBAS-4517] - Convert the old JMSContainerInvoker to use the JCA1.5 inbound resource adapter
Modified: trunk/server/src/main/org/jboss/ejb/plugins/inflow/GetTCLAction.java
===================================================================
--- trunk/server/src/main/org/jboss/ejb/plugins/inflow/GetTCLAction.java 2007-10-11 12:11:24 UTC (rev 66022)
+++ trunk/server/src/main/org/jboss/ejb/plugins/inflow/GetTCLAction.java 2007-10-11 12:19:57 UTC (rev 66023)
@@ -1,24 +1,24 @@
/*
-* JBoss, Home of Professional Open Source
-* Copyright 2005, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
package org.jboss.ejb.plugins.inflow;
import java.security.AccessController;
@@ -28,16 +28,16 @@
* @author Scott.Stark at jboss.org
* @version $Revison:$
*/
-public class GetTCLAction implements PrivilegedAction
+public class GetTCLAction implements PrivilegedAction<ClassLoader>
{
- static PrivilegedAction ACTION = new GetTCLAction(null);
+ static PrivilegedAction<ClassLoader> ACTION = new GetTCLAction(null);
Thread t;
GetTCLAction(Thread t)
{
this.t = t;
}
- public Object run()
+ public ClassLoader run()
{
Thread thread = t;
if (thread == null)
@@ -48,13 +48,13 @@
static ClassLoader getContextClassLoader()
{
- ClassLoader loader = (ClassLoader) AccessController.doPrivileged(ACTION);
+ ClassLoader loader = AccessController.doPrivileged(ACTION);
return loader;
}
static ClassLoader getContextClassLoader(Thread t)
{
GetTCLAction action = new GetTCLAction(t);
- ClassLoader loader = (ClassLoader) AccessController.doPrivileged(action);
+ ClassLoader loader = AccessController.doPrivileged(action);
return loader;
}
Modified: trunk/server/src/main/org/jboss/ejb/plugins/inflow/JBossJMSMessageEndpointFactory.java
===================================================================
--- trunk/server/src/main/org/jboss/ejb/plugins/inflow/JBossJMSMessageEndpointFactory.java 2007-10-11 12:11:24 UTC (rev 66022)
+++ trunk/server/src/main/org/jboss/ejb/plugins/inflow/JBossJMSMessageEndpointFactory.java 2007-10-11 12:19:57 UTC (rev 66023)
@@ -1,26 +1,27 @@
/*
-* JBoss, Home of Professional Open Source
-* Copyright 2005, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
package org.jboss.ejb.plugins.inflow;
+import javax.jms.MessageListener;
import javax.jms.Session;
import org.jboss.deployment.DeploymentException;
@@ -35,32 +36,29 @@
* @author <a href="mailto:adrian at jboss.com">Adrian Brock</a> .
* @version <tt>$Revision$</tt>
*/
-public class JBossJMSMessageEndpointFactory
- extends JBossMessageEndpointFactory
+public class JBossJMSMessageEndpointFactory extends JBossMessageEndpointFactory
{
- // Constants -----------------------------------------------------
-
/** The JBoss resource adapter deployment name */
protected static String jmsra = "jms-ra.rar";
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Protected -----------------------------------------------------
protected String resolveResourceAdapterName() throws DeploymentException
{
+ // No resource adapter specified assume jms
String result = super.resolveResourceAdapterName();
if (result == null)
result = jmsra;
return result;
}
+ protected void resolveMessageListener() throws DeploymentException
+ {
+ // No messaging type use jms
+ if (metaData.getMessagingType() == null)
+ messagingTypeClass = MessageListener.class;
+ else
+ super.resolveMessageListener();
+ }
+
/**
* Add activation config properties
*
@@ -71,21 +69,25 @@
super.augmentActivationConfigProperties();
// Hack for old style deployments (jms)
- if (metaData.isJMSMessagingType())
+ if (messagingTypeClass.equals(MessageListener.class))
{
- checkActivationConfig("destination", metaData.getDestinationJndiName());
- checkActivationConfig("destinationType", metaData.getDestinationType());
- checkActivationConfig("messageSelector", metaData.getMessageSelector());
- if (Session.DUPS_OK_ACKNOWLEDGE == metaData.getAcknowledgeMode())
- checkActivationConfig("acknowledgeMode", "DUPS_OK_ACKNOWLEDGE");
- else
- checkActivationConfig("acknowledgeMode", "AUTO_ACKNOWLEDGE");
- if (MessageDrivenMetaData.DURABLE_SUBSCRIPTION == metaData.getSubscriptionDurability())
- checkActivationConfig("subscriptionDurability", "Durable");
- else
- checkActivationConfig("subscriptionDurability", "NonDurable");
- checkActivationConfig("clientID", metaData.getClientId());
- checkActivationConfig("subscriptionName", metaData.getSubscriptionId());
+ // Old style deployment
+ if (metaData.getMessagingType() == null)
+ {
+ checkActivationConfig("destination", metaData.getDestinationJndiName());
+ checkActivationConfig("destinationType", metaData.getDestinationType());
+ checkActivationConfig("messageSelector", metaData.getMessageSelector());
+ if (Session.DUPS_OK_ACKNOWLEDGE == metaData.getAcknowledgeMode())
+ checkActivationConfig("acknowledgeMode", "DUPS_OK_ACKNOWLEDGE");
+ else
+ checkActivationConfig("acknowledgeMode", "AUTO_ACKNOWLEDGE");
+ if (MessageDrivenMetaData.DURABLE_SUBSCRIPTION == metaData.getSubscriptionDurability())
+ checkActivationConfig("subscriptionDurability", "Durable");
+ else
+ checkActivationConfig("subscriptionDurability", "NonDurable");
+ checkActivationConfig("clientID", metaData.getClientId());
+ checkActivationConfig("subscriptionName", metaData.getSubscriptionId());
+ }
// Only for JBoss's resource adapter
if (jmsra.equals(resourceAdapterName))
@@ -100,10 +102,45 @@
Element mdbConfig = MetaData.getOptionalChild(proxyConfig, "MDBConfig");
if (mdbConfig != null)
{
+ try
+ {
+ if ("false".equalsIgnoreCase(MetaData.getElementContent(MetaData.getUniqueChild(mdbConfig, "DeliveryActive"))))
+ {
+ setDeliveryActive(false);
+ }
+ }
+ catch (Exception ignore)
+ {
+ }
+
checkActivationConfig("reconnectInterval", MetaData.getOptionalChildContent(proxyConfig, "ReconnectIntervalSec"));
checkActivationConfig("deliveryActive", MetaData.getOptionalChildContent(proxyConfig, "DeliveryActive"));
checkActivationConfig("providerAdapterJNDI", MetaData.getOptionalChildContent(proxyConfig, "JMSProviderAdapterJNDI"));
- // TODO DLQ
+
+ Element dlqEl = MetaData.getOptionalChild(mdbConfig, "DLQConfig");
+ if (dlqEl != null)
+ {
+ checkActivationConfig("useDLQ", "true");
+ checkActivationConfig("DLQJNDIName", MetaData.getElementContent(MetaData.getUniqueChild(dlqEl, "DestinationQueue")));
+ try
+ {
+ checkActivationConfig("DLQMaxResent", MetaData.getElementContent(MetaData.getUniqueChild(dlqEl, "MaxTimesRedelivered")));
+ }
+ catch (Exception ignored)
+ {
+ // backwards compatibility
+ }
+
+ // TODO TimeToLive
+
+ checkActivationConfig("DLQUser", MetaData.getElementContent(MetaData.getOptionalChild(dlqEl, "DLQUser")));
+ checkActivationConfig("DLQPassword", MetaData.getElementContent(MetaData.getOptionalChild(dlqEl, "DLQPassword")));
+ }
+ else
+ {
+ // backwards compatibility - no DLQConfig in MDBConfig means no DLQ
+ checkActivationConfig("useDLQ", "false");
+ }
}
}
}
@@ -123,10 +160,4 @@
properties.put(name, md);
}
}
-
- // Package Private -----------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner Classes -------------------------------------------------
}
Modified: trunk/server/src/main/org/jboss/ejb/plugins/inflow/JBossMessageEndpointFactory.java
===================================================================
--- trunk/server/src/main/org/jboss/ejb/plugins/inflow/JBossMessageEndpointFactory.java 2007-10-11 12:11:24 UTC (rev 66022)
+++ trunk/server/src/main/org/jboss/ejb/plugins/inflow/JBossMessageEndpointFactory.java 2007-10-11 12:19:57 UTC (rev 66023)
@@ -1,24 +1,24 @@
/*
-* JBoss, Home of Professional Open Source
-* Copyright 2005, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
package org.jboss.ejb.plugins.inflow;
import java.lang.reflect.Method;
@@ -26,6 +26,8 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.ejb.EJBMetaData;
import javax.management.ObjectName;
@@ -54,24 +56,15 @@
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
-import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
-
/**
* EJBProxyFactory for inflow message driven beans
*
- * @jmx:mbean extends="org.jboss.system.ServiceMBean"
- *
* @author <a href="mailto:adrian at jboss.com">Adrian Brock</a> .
* @version <tt>$Revision$</tt>
*/
-public class JBossMessageEndpointFactory
- extends ServiceMBeanSupport
+public class JBossMessageEndpointFactory extends ServiceMBeanSupport
implements EJBProxyFactory, MessageEndpointFactory, JBossMessageEndpointFactoryMBean
{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
/** Whether trace is enabled */
protected boolean trace = log.isTraceEnabled();
@@ -88,13 +81,13 @@
protected InvokerProxyBindingMetaData invokerMetaData;
/** The activation properties */
- protected HashMap properties = new HashMap();
+ protected HashMap<String, ActivationConfigPropertyMetaData> properties = new HashMap<String, ActivationConfigPropertyMetaData>();
/** The proxy factory */
protected GenericProxyFactory proxyFactory = new GenericProxyFactory();
/** The messaging type class */
- protected Class messagingTypeClass;
+ protected Class<?> messagingTypeClass;
/** The resource adapter name */
protected String resourceAdapterName;
@@ -106,16 +99,17 @@
protected ActivationSpec activationSpec;
/** The interceptors */
- protected ArrayList interceptors;
+ protected ArrayList<Class<?>> interceptors;
/** The interfaces */
- protected Class[] interfaces;
+ protected Class<?>[] interfaces;
/** The next proxy id */
- protected SynchronizedInt nextProxyId = new SynchronizedInt(0);
+ protected AtomicInteger nextProxyId = new AtomicInteger(0);
+
+ /** Whether delivery is active */
+ protected AtomicBoolean deliveryActive = new AtomicBoolean(true);
- // Static --------------------------------------------------------
-
/** The signature for createActivationSpec */
protected String[] createActivationSpecSig = new String[]
{
@@ -129,11 +123,7 @@
MessageEndpointFactory.class.getName(),
ActivationSpec.class.getName()
};
-
- // Constructors --------------------------------------------------
- // Public --------------------------------------------------------
-
/**
* Get the message driven container
*
@@ -143,20 +133,11 @@
{
return container;
}
-
- /**
- * Display the configuration
- *
- * @jmx:managed-attribute
- *
- * @return the configuration
- */
+
public String getConfig()
{
return toString();
}
-
- // MessageEndpointFactory implementation -------------------------
public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException
{
@@ -176,7 +157,7 @@
MessageEndpoint endpoint = (MessageEndpoint) proxyFactory.createProxy
(
- ejbName + "@" + nextProxyId.increment(),
+ ejbName + "@" + nextProxyId.incrementAndGet(),
container.getServiceName(),
InvokerInterceptor.getLocal(),
null,
@@ -204,8 +185,6 @@
return result;
}
- // ServiceMBeanSupport overrides ---------------------------------
-
protected void startService() throws Exception
{
// Lets take a reference to our metadata
@@ -227,8 +206,6 @@
// Deactivate
deactivate();
}
-
- // EJBProxyFactory implementation --------------------------------
public boolean isIdentical(Container container, Invocation mi)
{
@@ -245,7 +222,7 @@
throw new Error("Not valid for MessageDriven beans");
}
- public Collection getEntityCollection(Collection c)
+ public Collection getEntityCollection(Collection collection)
{
throw new Error("Not valid for MessageDriven beans");
}
@@ -275,8 +252,6 @@
this.invokerMetaData = imd;
}
- // ContainerService implementation -------------------------------
-
/**
* Set the container for which this is an invoker to.
*
@@ -287,8 +262,6 @@
this.container = (MessageDrivenContainer) container;
}
- // Object overrides ----------------------------------------------
-
/**
* Return a string representation of the current config state.
*/
@@ -304,8 +277,6 @@
buffer.append("}");
return buffer.toString();
}
-
- // Protected -----------------------------------------------------
/**
* Resolve message listener class
@@ -368,7 +339,7 @@
interfaces = new Class[] { MessageEndpoint.class, messagingTypeClass };
// Set the interceptors
- interceptors = new ArrayList();
+ interceptors = new ArrayList<Class<?>>();
Element proxyConfig = invokerMetaData.getProxyFactoryConfig();
Element endpointInterceptors = MetaData.getOptionalChild(proxyConfig, "endpoint-interceptors", null);
if (endpointInterceptors == null)
@@ -385,7 +356,7 @@
String className = MetaData.getElementContent(interceptor);
try
{
- Class clazz = container.getClassLoader().loadClass(className);
+ Class<?> clazz = container.getClassLoader().loadClass(className);
interceptors.add(clazz);
}
catch (Throwable t)
@@ -409,10 +380,10 @@
Element activationConfig = MetaData.getOptionalChild(proxyConfig, "activation-config");
if (activationConfig != null)
{
- Iterator iterator = MetaData.getChildrenByTagName(activationConfig, "activation-config-property");
+ Iterator<Element> iterator = MetaData.getChildrenByTagName(activationConfig, "activation-config-property");
while (iterator.hasNext())
{
- Element resourceRef = (Element) iterator.next();
+ Element resourceRef = iterator.next();
ActivationConfigPropertyMetaData metaData = new ActivationConfigPropertyMetaData();
metaData.importXml(resourceRef);
if (properties.containsKey(metaData.getName()) == false)
@@ -438,7 +409,7 @@
String jndiName = destinationMetaData.getJNDIName();
if (jndiName == null)
throw new DeploymentException("The message-destination '" + link + "' has no jndi-name in jboss.xml");
- properties.put("destination", jndiName);
+ properties.put("destination", new ActivationConfigPropertyMetaData("destination", jndiName));
}
}
}
@@ -470,7 +441,56 @@
" messaging-type=" + messagingTypeClass.getName() + " properties=" + metaData.getActivationConfigProperties(), t);
}
}
+
+ public void startDelivery() throws Exception
+ {
+ if (getState() != STARTED)
+ throw new IllegalStateException("The MDB is not started");
+
+ if (deliveryActive.getAndSet(true))
+ return;
+ activate();
+ }
+
+ public void stopDelivery() throws Exception
+ {
+ stopDelivery(false);
+ }
+
+ public void stopDelivery(boolean asynch) throws Exception
+ {
+ if (getState() != STARTED)
+ throw new IllegalStateException("The MDB is not started");
+
+ if (deliveryActive.getAndSet(false) == false)
+ return;
+
+ if (asynch)
+ {
+ new Thread("StopDelivery: " + getServiceName())
+ {
+ public void run()
+ {
+ deactivate();
+ }
+ }.start();
+ }
+ else
+ {
+ deactivate();
+ }
+ }
+ public boolean getDeliveryActive()
+ {
+ return deliveryActive.get();
+ }
+
+ public void setDeliveryActive(boolean active)
+ {
+ deliveryActive.set(active);
+ }
+
/**
* Activate
*
@@ -478,6 +498,12 @@
*/
protected void activate() throws DeploymentException
{
+ if (deliveryActive.get() == false)
+ {
+ log.info("Delivery is disabled: " + getServiceName());
+ return;
+ }
+
Object[] params = new Object[] { this, activationSpec };
try
{
@@ -508,10 +534,4 @@
" activationSpec=" + activationSpec, t);
}
}
-
- // Package Private -----------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner Classes -------------------------------------------------
}
Modified: trunk/server/src/main/org/jboss/ejb/plugins/inflow/JBossMessageEndpointFactoryMBean.java
===================================================================
--- trunk/server/src/main/org/jboss/ejb/plugins/inflow/JBossMessageEndpointFactoryMBean.java 2007-10-11 12:11:24 UTC (rev 66022)
+++ trunk/server/src/main/org/jboss/ejb/plugins/inflow/JBossMessageEndpointFactoryMBean.java 2007-10-11 12:19:57 UTC (rev 66023)
@@ -1,34 +1,64 @@
/*
-* JBoss, Home of Professional Open Source
-* Copyright 2005, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
package org.jboss.ejb.plugins.inflow;
/**
* MBean interface.
*/
-public interface JBossMessageEndpointFactoryMBean extends org.jboss.system.ServiceMBean {
-
+public interface JBossMessageEndpointFactoryMBean extends org.jboss.system.ServiceMBean
+{
/**
* Display the configuration
- * @return the configuration */
- java.lang.String getConfig() ;
+ *
+ * @return the configuration
+ */
+ String getConfig();
+ /**
+ * Get whether delivery is active
+ *
+ * @return true when active
+ */
+ boolean getDeliveryActive();
+
+ /**
+ * Start delivery
+ *
+ * @throws Exception for any error
+ */
+ void startDelivery() throws Exception;
+
+ /**
+ * Stop delivery synchronously
+ *
+ * @throws Exception for any error
+ */
+ void stopDelivery() throws Exception;
+
+ /**
+ * Stop delivery, optionally asynchronously
+ *
+ * @param asynch whether to stop delivery asynchronously
+ * @throws Exception for any error
+ */
+ void stopDelivery(boolean asynch) throws Exception;
}
Modified: trunk/server/src/main/org/jboss/ejb/plugins/inflow/MessageEndpointInterceptor.java
===================================================================
--- trunk/server/src/main/org/jboss/ejb/plugins/inflow/MessageEndpointInterceptor.java 2007-10-11 12:11:24 UTC (rev 66022)
+++ trunk/server/src/main/org/jboss/ejb/plugins/inflow/MessageEndpointInterceptor.java 2007-10-11 12:19:57 UTC (rev 66023)
@@ -1,27 +1,28 @@
/*
-* JBoss, Home of Professional Open Source
-* Copyright 2005, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
package org.jboss.ejb.plugins.inflow;
import java.lang.reflect.Method;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.resource.ResourceException;
import javax.transaction.Status;
@@ -34,8 +35,6 @@
import org.jboss.logging.Logger;
import org.jboss.proxy.Interceptor;
-import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
-
/**
* Implements the application server message endpoint requirements.
*
@@ -44,7 +43,8 @@
*/
public class MessageEndpointInterceptor extends Interceptor
{
- // Constants -----------------------------------------------------
+ /** The serialVersionUID */
+ private static final long serialVersionUID = -8740717288847385688L;
/** The log */
private static final Logger log = Logger.getLogger(MessageEndpointInterceptor.class);
@@ -55,8 +55,6 @@
/** The key for the xa resource */
public static final String MESSAGE_ENDPOINT_XARESOURCE = "MessageEndpoint.XAResource";
- // Attributes ----------------------------------------------------
-
/** Whether trace is enabled */
private boolean trace = log.isTraceEnabled();
@@ -64,10 +62,10 @@
private String cachedProxyString = null;
/** Whether this proxy has been released */
- protected SynchronizedBoolean released = new SynchronizedBoolean(false);
+ protected AtomicBoolean released = new AtomicBoolean(false);
/** Whether we have delivered a message */
- protected boolean delivered = false;
+ protected AtomicBoolean delivered = new AtomicBoolean(false);
/** The in use thread */
protected Thread inUseThread = null;
@@ -81,20 +79,15 @@
/** Any suspended transaction */
protected Transaction suspended = null;
+ /** Whether beforeDelivery has been invoked; used to check the before/after invocation sequence */
+ protected boolean beforeDeliveryInvoked = false;
+
/** The message endpoint factory */
private JBossMessageEndpointFactory endpointFactory;
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
public MessageEndpointInterceptor()
{
}
-
- // Public --------------------------------------------------------
-
- // Interceptor implementation ------------------------------------
public Object invoke(Invocation mi) throws Throwable
{
@@ -103,10 +96,13 @@
throw new IllegalStateException("This message endpoint + " + getProxyString(mi) + " has been released");
// Concurrent invocation?
- Thread currentThread = Thread.currentThread();
- if (inUseThread != null && inUseThread.equals(currentThread) == false)
- throw new IllegalStateException("This message endpoint + " + getProxyString(mi) + " is already in use by another thread " + inUseThread);
- inUseThread = currentThread;
+ synchronized (this)
+ {
+ Thread currentThread = Thread.currentThread();
+ if (inUseThread != null && inUseThread.equals(currentThread) == false)
+ throw new IllegalStateException("This message endpoint + " + getProxyString(mi) + " is already in use by another thread " + inUseThread);
+ inUseThread = currentThread;
+ }
String method = mi.getMethod().getName();
if (trace)
@@ -132,10 +128,6 @@
return delivery(mi);
}
- // Package Protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
/**
* Release this message endpoint.
*
@@ -151,7 +143,7 @@
log.trace("MessageEndpoint " + getProxyString(mi) + " released");
// Tidyup any outstanding delivery
- if (oldClassLoader != null)
+ if (getOldClassLoader() != null)
{
try
{
@@ -173,16 +165,16 @@
protected void before(Invocation mi) throws Throwable
{
// Called out of sequence
- if (oldClassLoader != null)
+ if (getBeforeDeliveryInvoke())
throw new IllegalStateException("Missing afterDelivery from the previous beforeDelivery for message endpoint " + getProxyString(mi));
- if (trace)
- log.trace("MessageEndpoint " + getProxyString(mi) + " released");
-
// Set the classloader
MessageDrivenContainer container = getContainer(mi);
- oldClassLoader = GetTCLAction.getContextClassLoader(inUseThread);
- SetTCLAction.setContextClassLoader(inUseThread, container.getClassLoader());
+ synchronized (this)
+ {
+ oldClassLoader = GetTCLAction.getContextClassLoader(inUseThread);
+ SetTCLAction.setContextClassLoader(inUseThread, container.getClassLoader());
+ }
if (trace)
log.trace("MessageEndpoint " + getProxyString(mi) + " set context classloader to " + container.getClassLoader());
@@ -190,9 +182,11 @@
try
{
startTransaction("beforeDelivery", mi, container);
+ setBeforeDeliveryInvoke(true);
}
catch (Throwable t)
{
+ setBeforeDeliveryInvoke(false);
resetContextClassLoader(mi);
throw new ResourceException(t);
}
@@ -207,8 +201,11 @@
protected void after(Invocation mi) throws Throwable
{
// Called out of sequence
- if (oldClassLoader == null)
+ if(!getBeforeDeliveryInvoke())
+ {
throw new IllegalStateException("afterDelivery without a previous beforeDelivery for message endpoint " + getProxyString(mi));
+
+ }
// Finish this delivery committing if we can
try
@@ -218,6 +215,7 @@
catch (Throwable t)
{
throw new ResourceException(t);
+
}
}
@@ -231,24 +229,22 @@
protected Object delivery(Invocation mi) throws Throwable
{
// Have we already delivered a message?
- if (delivered)
+ if (delivered.get())
throw new IllegalStateException("Multiple message delivery between before and after delivery is not allowed for message endpoint " + getProxyString(mi));
if (trace)
log.trace("MessageEndpoint " + getProxyString(mi) + " delivering");
// Mark delivery if beforeDelivery was invoked
- if (oldClassLoader != null)
- delivered = true;
-
+ if (getOldClassLoader() != null)
+ delivered.set(true);
MessageDrivenContainer container = getContainer(mi);
boolean commit = true;
- container.pushENC();
try
{
// Check for starting a transaction
- if (oldClassLoader == null)
+ if (getOldClassLoader() == null)
startTransaction("delivery", mi, container);
return getNext().invoke(mi);
}
@@ -258,6 +254,7 @@
log.trace("MessageEndpoint " + getProxyString(mi) + " delivery error", t);
if (t instanceof Error || t instanceof RuntimeException)
{
+ Transaction transaction = getTransaction();
if (transaction != null)
transaction.setRollbackOnly();
commit = false;
@@ -266,9 +263,8 @@
}
finally
{
- container.popENC();
// No before/after delivery, end any transaction and release the lock
- if (oldClassLoader == null)
+ if (getOldClassLoader() == null)
{
try
{
@@ -299,9 +295,9 @@
}
finally
{
+ setBeforeDeliveryInvoke(false);
// Reset delivered flag
- delivered = false;
-
+ delivered.set(false);
// Change back to the original context classloader
resetContextClassLoader(mi);
// We no longer hold the lock
@@ -339,7 +335,11 @@
// Get the transaction status
TransactionManager tm = container.getTransactionManager();
- suspended = tm.suspend();
+ Transaction tx = tm.suspend();
+ synchronized (this)
+ {
+ suspended = tx;
+ }
if (trace)
log.trace("MessageEndpoint " + getProxyString(mi) + " " + context + " currentTx=" + suspended);
@@ -351,7 +351,11 @@
if (suspended == null)
{
tm.begin();
- transaction = tm.getTransaction();
+ tx = tm.getTransaction();
+ synchronized (this)
+ {
+ transaction = tx;
+ }
if (trace)
log.trace("MessageEndpoint " + getProxyString(mi) + " started transaction=" + transaction);
@@ -372,7 +376,10 @@
}
finally
{
- suspended = null;
+ synchronized (this)
+ {
+ suspended = null;
+ }
if (trace)
log.trace("MessageEndpoint " + getProxyString(mi) + " transaction=" + suspended + " already active, IGNORED=" + resource);
}
@@ -394,6 +401,7 @@
try
{
// If we started the transaction, commit it
+ Transaction transaction = getTransaction();
if (transaction != null)
{
tm = getContainer(mi).getTransactionManager();
@@ -428,6 +436,7 @@
}
// If we suspended the incoming transaction, resume it
+ Transaction suspended = getSuspended();
if (suspended != null)
{
try
@@ -437,7 +446,10 @@
}
finally
{
- suspended = null;
+ synchronized (this)
+ {
+ this.suspended = null;
+ }
}
}
}
@@ -453,7 +465,6 @@
catch (Throwable t)
{
log.warn("MessageEndpoint " + getProxyString(mi) + " failed to resume old transaction " + currentTx);
-
}
}
}
@@ -466,12 +477,26 @@
*/
protected void resetContextClassLoader(Invocation mi)
{
- if (trace)
- log.trace("MessageEndpoint " + getProxyString(mi) + " reset classloader " + oldClassLoader);
- SetTCLAction.setContextClassLoader(inUseThread, oldClassLoader);
- oldClassLoader = null;
+ synchronized (this)
+ {
+ if (trace)
+ log.trace("MessageEndpoint " + getProxyString(mi) + " reset classloader " + oldClassLoader);
+ SetTCLAction.setContextClassLoader(inUseThread, oldClassLoader);
+ oldClassLoader = null;
+ }
}
-
+
+ protected void setBeforeDeliveryInvoke(boolean bdi)
+ {
+ this.beforeDeliveryInvoked = bdi;
+
+ }
+
+ protected boolean getBeforeDeliveryInvoke()
+ {
+ return this.beforeDeliveryInvoked;
+
+ }
/**
* Release the thread lock
*
@@ -479,9 +504,12 @@
*/
protected void releaseThreadLock(Invocation mi)
{
- if (trace)
- log.trace("MessageEndpoint " + getProxyString(mi) + " no longer in use by " + inUseThread);
- inUseThread = null;
+ synchronized (this)
+ {
+ if (trace)
+ log.trace("MessageEndpoint " + getProxyString(mi) + " no longer in use by " + inUseThread);
+ inUseThread = null;
+ }
}
/**
@@ -500,26 +528,45 @@
/**
* Get the message endpoint factory
*
+ * @param mi the invocation
* @return the message endpoint factory
*/
protected JBossMessageEndpointFactory getMessageEndpointFactory(Invocation mi)
{
if (endpointFactory == null)
endpointFactory = (JBossMessageEndpointFactory) mi.getInvocationContext().getValue(MESSAGE_ENDPOINT_FACTORY);
+ if (endpointFactory == null)
+ throw new IllegalStateException("No message endpoint factory in " + mi.getInvocationContext().context);
return endpointFactory;
}
/**
* Get the container
*
+ * @param mi the invocation
* @return the container
*/
protected MessageDrivenContainer getContainer(Invocation mi)
{
- return getMessageEndpointFactory(mi).getContainer();
+ JBossMessageEndpointFactory messageEndpointFactory = getMessageEndpointFactory(mi);
+ MessageDrivenContainer container = messageEndpointFactory.getContainer();
+ if (container == null)
+ throw new IllegalStateException("No container associated with message endpoint factory: " + messageEndpointFactory.getServiceName());
+ return container;
}
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
+ protected synchronized ClassLoader getOldClassLoader()
+ {
+ return oldClassLoader;
+ }
+
+ protected synchronized Transaction getTransaction()
+ {
+ return transaction;
+ }
+
+ protected synchronized Transaction getSuspended()
+ {
+ return suspended;
+ }
}
Modified: trunk/server/src/main/org/jboss/ejb/plugins/inflow/SetTCLAction.java
===================================================================
--- trunk/server/src/main/org/jboss/ejb/plugins/inflow/SetTCLAction.java 2007-10-11 12:11:24 UTC (rev 66022)
+++ trunk/server/src/main/org/jboss/ejb/plugins/inflow/SetTCLAction.java 2007-10-11 12:19:57 UTC (rev 66023)
@@ -1,24 +1,24 @@
/*
-* JBoss, Home of Professional Open Source
-* Copyright 2005, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
package org.jboss.ejb.plugins.inflow;
import java.security.AccessController;
@@ -28,7 +28,7 @@
* @author Scott.Stark at jboss.org
* @version $Revison:$
*/
-public class SetTCLAction implements PrivilegedAction
+public class SetTCLAction implements PrivilegedAction<Object>
{
Thread t;
ClassLoader loader;
Deleted: trunk/server/src/main/org/jboss/ejb/plugins/jms/DLQHandler.java
===================================================================
--- trunk/server/src/main/org/jboss/ejb/plugins/jms/DLQHandler.java 2007-10-11 12:11:24 UTC (rev 66022)
+++ trunk/server/src/main/org/jboss/ejb/plugins/jms/DLQHandler.java 2007-10-11 12:19:57 UTC (rev 66023)
@@ -1,518 +0,0 @@
-/*
-* JBoss, Home of Professional Open Source
-* Copyright 2005, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
-package org.jboss.ejb.plugins.jms;
-
-import java.util.Hashtable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Enumeration;
-import java.util.Iterator;
-
-import javax.naming.Context;
-import javax.jms.ExceptionListener;
-import javax.jms.Session;
-import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
-import javax.jms.QueueSession;
-import javax.jms.QueueSender;
-import javax.jms.Queue;
-import javax.jms.Message;
-import javax.jms.JMSException;
-import javax.jms.Destination;
-import javax.transaction.Status;
-import javax.transaction.Synchronization;
-import javax.transaction.Transaction;
-
-import org.w3c.dom.Element;
-
-import org.jboss.deployment.DeploymentException;
-import org.jboss.metadata.MetaData;
-import org.jboss.jms.jndi.JMSProviderAdapter;
-import org.jboss.system.ServiceMBeanSupport;
-
-/**
- * Places redeliveded messages on a Dead Letter Queue.
- *
- *<p>
- *The Dead Letter Queue handler is used to not set JBoss in an endles loop
- * when a message is resent on and on due to transaction rollback for
- * message receipt.
- *
- * <p>
- * It sends message to a dead letter queue (configurable, defaults to
- * queue/DLQ) when the message has been resent a configurable amount of times,
- * defaults to 10.
- *
- * <p>
- * The handler is configured through the element MDBConfig in
- * container-invoker-conf.
- *
- * <p>
- * The JMS property JBOSS_ORIG_DESTINATION in the resent message is set
- * to the name of the original destination (Destination.toString())
- * if it is present.
- *
- * <p>
- * The JMS property JBOSS_ORIG_MESSAGEID in the resent message is set
- * to the id of the original message.
- *
- * @author <a href="mailto:jason at planet57.com">Jason Dillon</a>
- * @author Scott.Stark at jboss.org
- * @author Adrian Brock
- * @version <tt>$Revision$</tt>
- */
-public class DLQHandler extends ServiceMBeanSupport implements ExceptionListener
-{
- /** Standard property for delivery count */
- public static final String PROPERTY_DELIVERY_COUNT = "JMSXDeliveryCount";
-
- /** JMS property name holding original destination. */
- public static final String JBOSS_ORIG_DESTINATION = "JBOSS_ORIG_DESTINATION";
-
- /** JMS property name holding original JMS message id. */
- public static final String JBOSS_ORIG_MESSAGEID = "JBOSS_ORIG_MESSAGEID";
-
- /** Properties copied from org.jboss.mq.SpyMessage */
- private static final String JMS_JBOSS_REDELIVERY_COUNT = "JMS_JBOSS_REDELIVERY_COUNT";
- private static final String JMS_JBOSS_REDELIVERY_LIMIT = "JMS_JBOSS_REDELIVERY_LIMIT";
-
- /**
- * Destination to send dead letters to.
- *
- * <p>
- * Defaults to <em>queue/DLQ</em>, configurable through
- * <tt>DestinationQueue</tt> element.
- */
- private String destinationJNDI = "queue/DLQ";
-
- /**
- * Maximum times a message is alowed to be resent.
- *
- * <p>Defaults to <em>10</em>, configurable through
- * <tt>MaxTimesRedelivered</tt> element.
- */
- private int maxResent = 10;
-
- /**
- * Time to live for the message.
- *
- * <p>
- * Defaults to <em>{@link Message#DEFAULT_TIME_TO_LIVE}</em>,
- * configurable through the <tt>TimeToLive</tt> element.
- */
- private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
-
- // May become configurable
-
- /** Delivery mode for message, Message.DEFAULT_DELIVERY_MODE. */
- private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
-
- /** Priority for the message, Message.DEFAULT_PRIORITY */
- private int priority = Message.DEFAULT_PRIORITY;
-
- /** The dlq user for the connection */
- private String dlqUser;
-
- /** The dlq password for the connection */
- private String dlqPass;
-
- // Private stuff
- private QueueConnection connection;
- private Queue dlq;
- private JMSProviderAdapter providerAdapter;
- private JMSContainerInvoker invoker;
- private Hashtable resentBuffer = new Hashtable();
-
- public DLQHandler(final JMSProviderAdapter providerAdapter, final JMSContainerInvoker invoker)
- {
- this.providerAdapter = providerAdapter;
- this.invoker = invoker;
- }
-
- public void onException(JMSException e)
- {
- if (invoker != null && invoker.exListener != null)
- invoker.exListener.handleFailure(e);
- else
- {
- log.warn("DLQHandler got JMS Failure but there is no link to JMSContainerInvoker's exception listener.", e);
-
- // We shouldn't get here, but if we do, we should at least close the connection
- if (connection != null)
- {
- try
- {
- connection.close();
- }
- catch (Throwable ignored)
- {
- log.trace("Ignored error closing connection", ignored);
- }
- connection = null;
- }
- }
- }
-
- protected void createService() throws Exception
- {
- Context ctx = providerAdapter.getInitialContext();
-
- try
- {
- String factoryName = providerAdapter.getQueueFactoryRef();
- QueueConnectionFactory factory = (QueueConnectionFactory)
- ctx.lookup(factoryName);
- log.debug("Using factory: " + factory);
-
- if (dlqUser == null)
- connection = factory.createQueueConnection();
- else
- connection = factory.createQueueConnection(dlqUser, dlqPass);
- log.debug("Created connection: " + connection);
-
- dlq = (Queue) ctx.lookup(destinationJNDI);
- log.debug("Using Queue: " + dlq);
- }
- finally
- {
- ctx.close();
- }
- }
-
- protected void startService() throws Exception
- {
- connection.setExceptionListener(this);
- connection.start();
- }
-
- protected void stopService() throws Exception
- {
- try
- {
- connection.setExceptionListener(null);
- connection.stop();
- }
- catch (Throwable t)
- {
- log.trace("Ignored error stopping DLQ", t);
- }
- }
-
- protected void destroyService() throws Exception
- {
- // Help the GC
- if (connection != null)
- connection.close();
- connection = null;
- dlq = null;
- providerAdapter = null;
- }
-
- /**
- * Check if a message has been redelivered to many times.
- *
- * If message has been redelivered to many times, send it to the
- * dead letter queue (default to queue/DLQ).
- *
- * @return true if message is handled (i.e resent), false if not.
- */
- public boolean handleRedeliveredMessage(final Message msg, final Transaction tx)
- {
- boolean handled = false;
- int max = this.maxResent;
- String id = null;
- boolean fromMessage = true;
- int count = 0;
-
- try
- {
-
- if (msg.propertyExists(JMS_JBOSS_REDELIVERY_LIMIT))
- max = msg.getIntProperty(JMS_JBOSS_REDELIVERY_LIMIT);
-
- try
- {
- if (msg.propertyExists(PROPERTY_DELIVERY_COUNT))
- count = msg.getIntProperty(PROPERTY_DELIVERY_COUNT);
- }
- catch (JMSException ignored)
- {
- }
- if (count > 0)
- {
- // The delivery count is one too many
- --count;
- }
- else if (msg.propertyExists(JMS_JBOSS_REDELIVERY_COUNT))
- count = msg.getIntProperty(JMS_JBOSS_REDELIVERY_COUNT);
- else
- {
- id = msg.getJMSMessageID();
- if (id == null)
- {
- // if we can't get the id we are basically fucked
- log.error("Message id is null, can't handle message");
- return false;
- }
- count = incrementResentCount(id);
- fromMessage = false;
- }
-
- if (count > max)
- {
- id = msg.getJMSMessageID();
- log.warn("Message resent too many times; sending it to DLQ; message id=" + id);
-
- sendMessage(msg);
- deleteFromBuffer(id);
-
- handled = true;
- }
- else if (fromMessage == false && tx != null)
- {
- // Register a synchronization to remove the buffer entry
- // should the transaction commit
- DLQSynchronization synch = new DLQSynchronization(id);
- try
- {
- tx.registerSynchronization(synch);
- }
- catch (Exception e)
- {
- log.warn("Error registering DlQ Synchronization with transaction " + tx, e);
- }
- }
- }
- catch (JMSException e)
- {
- // If we can't send it ahead, we do not dare to just drop it...or?
- log.error("Could not send message to Dead Letter Queue", e);
- }
-
- return handled;
- }
-
- /**
- * Send message to the configured dead letter queue, defaults to queue/DLQ.
- */
- protected void sendMessage(Message msg) throws JMSException
- {
- boolean trace = log.isTraceEnabled();
-
- QueueSession session = null;
- QueueSender sender = null;
-
- try
- {
- msg = makeWritable(msg, trace); // Don't know yet if we are gona clone or not
-
- // Set the properties
- msg.setStringProperty(JBOSS_ORIG_MESSAGEID, msg.getJMSMessageID());
- // Some providers (say Websphere MQ) don't set this to something we can use
- Destination d = msg.getJMSDestination();
- if (d != null)
- msg.setStringProperty(JBOSS_ORIG_DESTINATION, d.toString());
-
- session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
- sender = session.createSender(dlq);
- if (trace)
- {
- log.trace("Sending message to DLQ; destination=" +
- dlq + ", session=" + session + ", sender=" + sender);
- }
-
- sender.send(msg, deliveryMode, priority, timeToLive);
-
- if (trace)
- {
- log.trace("Message sent.");
- }
-
- }
- finally
- {
- try
- {
- if (sender != null) sender.close();
- if (session != null) session.close();
- }
- catch (Exception e)
- {
- log.warn("Failed to close sender or session; ignoring", e);
- }
- }
- }
-
- /**
- * Increment the counter for the specific JMS message id.
- *
- * @return the new counter value.
- */
- protected int incrementResentCount(String id)
- {
- BufferEntry entry = null;
- boolean trace = log.isTraceEnabled();
- if (!resentBuffer.containsKey(id))
- {
- if (trace)
- log.trace("Making new entry for id " + id);
- entry = new BufferEntry();
- entry.id = id;
- entry.count = 1;
- resentBuffer.put(id, entry);
- }
- else
- {
- entry = (BufferEntry) resentBuffer.get(id);
- entry.count++;
- if (trace)
- log.trace("Incremented old entry for id " + id + " count " + entry.count);
- }
- return entry.count;
- }
-
- /**
- * Delete the entry in the message counter buffer for specifyed JMS id.
- */
- protected void deleteFromBuffer(String id)
- {
- resentBuffer.remove(id);
- }
-
- /**
- * Make the Message properties writable.
- *
- * @return the writable message.
- */
- protected Message makeWritable(Message msg, boolean trace) throws JMSException
- {
- HashMap tmp = new HashMap();
-
- // Save properties
- for (Enumeration en = msg.getPropertyNames(); en.hasMoreElements();)
- {
- String key = (String) en.nextElement();
- tmp.put(key, msg.getObjectProperty(key));
- }
-
- // Make them writable
- msg.clearProperties();
-
- Iterator i = tmp.entrySet().iterator();
- while (i.hasNext())
- {
- Map.Entry me = (Map.Entry)i.next();
- String key = (String) me.getKey();
- try
- {
- msg.setObjectProperty(key, me.getValue());
- }
- catch (JMSException ignored)
- {
- if (trace)
- log.trace("Could not copy message property " + key, ignored);
- }
- }
-
- return msg;
- }
-
- /**
- * Takes an MDBConfig Element
- */
- public void importXml(final Element element) throws DeploymentException
- {
- destinationJNDI = MetaData.getElementContent
- (MetaData.getUniqueChild(element, "DestinationQueue"));
-
- try
- {
- String mr = MetaData.getElementContent
- (MetaData.getUniqueChild(element, "MaxTimesRedelivered"));
- maxResent = Integer.parseInt(mr);
- }
- catch (Exception ignore)
- {
- }
-
- try
- {
- String ttl = MetaData.getElementContent
- (MetaData.getUniqueChild(element, "TimeToLive"));
- timeToLive = Long.parseLong(ttl);
-
- if (timeToLive < 0)
- {
- log.warn("Invalid TimeToLive: " + timeToLive + "; using default");
- timeToLive = Message.DEFAULT_TIME_TO_LIVE;
- }
- }
- catch (Exception ignore)
- {
- }
-
- dlqUser = MetaData.getElementContent(MetaData.getOptionalChild(element, "DLQUser"));
- dlqPass = MetaData.getElementContent(MetaData.getOptionalChild(element, "DLQPassword"));
- }
-
- public String toString()
- {
- return super.toString() +
- "{ destinationJNDI=" + destinationJNDI +
- ", maxResent=" + maxResent +
- ", timeToLive=" + timeToLive +
- " }";
- }
-
- private static class BufferEntry
- {
- int count;
- String id;
- }
-
- /**
- * Remove a redelivered message from the DLQ's buffer when it is acknowledged
- */
- protected class DLQSynchronization implements Synchronization
- {
- /** The message id */
- String id;
-
- public DLQSynchronization(String id)
- {
- this.id = id;
- }
-
- public void beforeCompletion()
- {
- }
-
- /**
- * Forget the message when the transaction commits
- */
- public void afterCompletion(int status)
- {
- if (status == Status.STATUS_COMMITTED)
- deleteFromBuffer(id);
- }
- }
-}
Modified: trunk/server/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java
===================================================================
--- trunk/server/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java 2007-10-11 12:11:24 UTC (rev 66022)
+++ trunk/server/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java 2007-10-11 12:19:57 UTC (rev 66023)
@@ -1,71 +1,39 @@
/*
-* JBoss, Home of Professional Open Source
-* Copyright 2005, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
package org.jboss.ejb.plugins.jms;
-import java.lang.reflect.Method;
-import java.security.AccessController;
-import java.security.Principal;
-import java.security.PrivilegedAction;
-import java.util.Collection;
+import java.util.ArrayList;
-import javax.ejb.EJBMetaData;
-import javax.jms.Connection;
-import javax.jms.ConnectionConsumer;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
import javax.jms.MessageListener;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.ServerSessionPool;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.management.MBeanServer;
-import javax.management.Notification;
-import javax.management.ObjectName;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
+import javax.resource.spi.endpoint.MessageEndpoint;
import org.jboss.deployment.DeploymentException;
-import org.jboss.ejb.Container;
import org.jboss.ejb.EJBProxyFactory;
-import org.jboss.invocation.Invocation;
-import org.jboss.invocation.InvocationType;
-import org.jboss.jms.ConnectionFactoryHelper;
-import org.jboss.jms.asf.ServerSessionPoolFactory;
-import org.jboss.jms.asf.StdServerSessionPool;
-import org.jboss.jms.jndi.JMSProviderAdapter;
-import org.jboss.logging.Logger;
-import org.jboss.metadata.ActivationConfigPropertyMetaData;
-import org.jboss.metadata.InvokerProxyBindingMetaData;
-import org.jboss.metadata.MessageDestinationMetaData;
+import org.jboss.ejb.plugins.inflow.JBossJMSMessageEndpointFactory;
+import org.jboss.ejb.plugins.inflow.MessageEndpointInterceptor;
+import org.jboss.invocation.InvokerInterceptor;
import org.jboss.metadata.MessageDrivenMetaData;
-import org.jboss.metadata.MetaData;
-import org.jboss.system.ServiceMBeanSupport;
-import org.w3c.dom.Element;
+import org.jboss.proxy.ClientMethodInterceptor;
+import org.jboss.proxy.TransactionInterceptor;
/**
* EJBProxyFactory for JMS MessageDrivenBeans
@@ -78,187 +46,22 @@
* @author <a href="mailto:adrian at jboss.com">Adrian Brock</a>
* @version <tt>$Revision$</tt>
*/
-public class JMSContainerInvoker extends ServiceMBeanSupport
+public class JMSContainerInvoker extends JBossJMSMessageEndpointFactory
implements EJBProxyFactory, JMSContainerInvokerMBean
{
- /** The logger */
- private static final Logger log = Logger.getLogger(JMSContainerInvoker.class);
-
- /** Notification sent before connectioning */
- private static final String CONNECTING_NOTIFICATION = "org.jboss.ejb.plugins.jms.CONNECTING";
-
- /** Notification sent after connection */
- private static final String CONNECTED_NOTIFICATION = "org.jboss.ejb.plugins.jms.CONNECTED";
-
- /** Notification sent before disconnection */
- private static final String DISCONNECTING_NOTIFICATION = "org.jboss.ejb.plugins.jms.DISCONNECTING";
-
- /** Notification sent before disconnected */
- private static final String DISCONNECTED_NOTIFICATION = "org.jboss.ejb.plugins.jms.DISCONNECTED";
-
- /** Notification sent at connection failure */
- private static final String FAILURE_NOTIFICATION = "org.jboss.ejb.plugins.jms.FAILURE";
-
- /** {@link MessageListener#onMessage} reference. */
- protected static Method ON_MESSAGE;
-
- /**
- * Default destination type. Used when no message-driven-destination is given
- * in ejb-jar, and a lookup of destinationJNDI from jboss.xml is not
- * successfull. Default value: javax.jms.Topic.
- */
- protected final static String DEFAULT_DESTINATION_TYPE = "javax.jms.Topic";
-
- /**
- * Initialize the ON_MESSAGE reference.
- */
- static
+ protected void setupProxyParameters() throws DeploymentException
{
- try
- {
- final Class type = MessageListener.class;
- final Class arg = Message.class;
- ON_MESSAGE = type.getMethod("onMessage", new Class[]{arg});
- }
- catch (Exception e)
- {
- throw new ExceptionInInitializerError(e);
- }
+ // Set the interfaces
+ interfaces = new Class[] { MessageEndpoint.class, MessageListener.class };
+
+ // Set the interceptors
+ interceptors = new ArrayList<Class<?>>();
+ interceptors.add(ClientMethodInterceptor.class);
+ interceptors.add(MessageEndpointInterceptor.class);
+ interceptors.add(TransactionInterceptor.class);
+ interceptors.add(InvokerInterceptor.class);
}
-
- protected boolean optimize;
- /** Maximum number provider is allowed to stuff into a session. */
- protected int maxMessagesNr = 1;
-
- /** Minimun pool size of server sessions. */
- protected int minPoolSize = 1;
-
- /** Keep alive server sessions. */
- protected long keepAlive = 30 * 1000;
-
- /** Maximun pool size of server sessions. */
- protected int maxPoolSize = 15;
-
- /** Time to wait before retrying to reconnect a lost connection. */
- protected long reconnectInterval = 10000;
-
- /** If Dead letter queue should be used or not. */
- protected boolean useDLQ = false;
-
- /**
- * JNDI name of the provider adapter.
- *
- * @see org.jboss.jms.jndi.JMSProviderAdapter
- */
- protected String providerAdapterJNDI;
-
- /**
- * JNDI name of the server session factory.
- *
- * @see org.jboss.jms.asf.ServerSessionPoolFactory
- */
- protected String serverSessionPoolFactoryJNDI;
-
- /** JMS acknowledge mode, used when session is not XA. */
- protected int acknowledgeMode;
-
- protected boolean isContainerManagedTx;
- protected boolean isNotSupportedTx;
-
- /** The container. */
- protected Container container;
-
- /** The JMS connection. */
- protected Connection connection;
-
- /** The JMS connection consumer. */
- protected ConnectionConsumer connectionConsumer;
-
- protected TransactionManager tm;
- protected ServerSessionPool pool;
- protected ExceptionListenerImpl exListener;
-
- /** Dead letter queue handler. */
- protected DLQHandler dlqHandler;
-
- /** DLQConfig element from MDBConfig element from jboss.xml. */
- protected Element dlqConfig;
-
- protected InvokerProxyBindingMetaData invokerMetaData;
- protected String invokerBinding;
-
- protected boolean deliveryActive = true;
-
- protected boolean createJBossMQDestination = false;
-
- /**
- * Set the invoker meta data so that the ProxyFactory can initialize
- * properly
- */
- public void setInvokerMetaData(InvokerProxyBindingMetaData imd)
- {
- invokerMetaData = imd;
- }
-
- /**
- * Set the invoker jndi binding
- */
- public void setInvokerBinding(String binding)
- {
- invokerBinding = binding;
- }
-
- /**
- * Set the container for which this is an invoker to.
- *
- * @param container The container for which this is an invoker to.
- */
- public void setContainer(final Container container)
- {
- this.container = container;
- }
-
- public int getMinPoolSize()
- {
- return minPoolSize;
- }
-
- public void setMinPoolSize(int minPoolSize)
- {
- this.minPoolSize = minPoolSize;
- }
-
- public int getMaxPoolSize()
- {
- return maxPoolSize;
- }
-
- public void setMaxPoolSize(int maxPoolSize)
- {
- this.maxPoolSize = maxPoolSize;
- }
-
- public long getKeepAliveMillis()
- {
- return keepAlive;
- }
-
- public void setKeepAliveMillis(long keepAlive)
- {
- this.keepAlive = keepAlive;
- }
-
- public int getMaxMessages()
- {
- return maxMessagesNr;
- }
-
- public void setMaxMessages(int maxMessages)
- {
- this.maxMessagesNr = maxMessages;
- }
-
public MessageDrivenMetaData getMetaData()
{
MessageDrivenMetaData config =
@@ -266,1335 +69,60 @@
return config;
}
- public boolean getDeliveryActive()
- {
- return deliveryActive;
- }
-
public boolean getCreateJBossMQDestination()
{
- return createJBossMQDestination;
+ // TODO getCreateJBossMQDestination
+ return false;
}
- public void startDelivery()
- throws Exception
+ public long getKeepAliveMillis()
{
- if (getState() != STARTED)
- throw new IllegalStateException("The MDB is not started");
- if (deliveryActive)
- return;
- deliveryActive = true;
- startService();
+ // TODO getKeepAliveMillis
+ return 0;
}
- public void stopDelivery()
- throws Exception
+ public int getMaxMessages()
{
- if (getState() != STARTED)
- throw new IllegalStateException("The MDB is not started");
- if (deliveryActive == false)
- return;
- deliveryActive = false;
- stopService();
+ // TODO getMaxMessages
+ return 0;
}
- /**
- * Sets the Optimized attribute of the JMSContainerInvoker object
- *
- * @param optimize The new Optimized value
- */
- public void setOptimized(final boolean optimize)
+ public int getMaxPoolSize()
{
- this.optimize = optimize;
+ // TODO getMaxPoolSize
+ return 0;
}
- public boolean isIdentical(Container container, Invocation mi)
+ public int getMinPoolSize()
{
- throw new Error("Not valid for MessageDriven beans");
+ // TODO getMinPoolSize
+ return 0;
}
- public Object getEJBHome()
+ public void setKeepAliveMillis(long keepAlive)
{
- throw new Error("Not valid for MessageDriven beans");
- }
-
- public EJBMetaData getEJBMetaData()
- {
- throw new Error("Not valid for MessageDriven beans");
- }
-
- public Collection getEntityCollection(Collection ids)
- {
- throw new Error("Not valid for MessageDriven beans");
- }
-
- public Object getEntityEJBObject(Object id)
- {
- throw new Error("Not valid for MessageDriven beans");
- }
-
- public Object getStatefulSessionEJBObject(Object id)
- {
- throw new Error("Not valid for MessageDriven beans");
- }
-
- public Object getStatelessSessionEJBObject()
- {
- throw new Error("Not valid for MessageDriven beans");
- }
-
- public boolean isOptimized()
- {
- return optimize;
- }
-
- /**
- * XmlLoadable implementation.
- *
- * @todo FIXME - we ought to move all config into MDBConfig, but I do not do that
- * now due to backward compatibility.
- *
- * @param element Description of Parameter
- * @throws DeploymentException Description of Exception
- */
- public void importXml(final Element element) throws Exception
- {
- try
- {
- if ("false".equalsIgnoreCase(MetaData.getElementContent(MetaData.getUniqueChild(element, "CreateJBossMQDestination"))))
- {
- createJBossMQDestination = false;
- }
- }
- catch (Exception ignore)
- {
- }
+ // TODO setKeepAliveMillis
+ throw new org.jboss.util.NotImplementedException("setKeepAliveMillis");
- try
- {
- String maxMessages = MetaData.getElementContent
- (MetaData.getUniqueChild(element, "MaxMessages"));
- maxMessagesNr = Integer.parseInt(maxMessages);
- }
- catch (Exception ignore)
- {
- }
-
- try
- {
- String minSize = MetaData.getElementContent
- (MetaData.getUniqueChild(element, "MinimumSize"));
- minPoolSize = Integer.parseInt(minSize);
- }
- catch (Exception ignore)
- {
- }
-
- try
- {
- String maxSize = MetaData.getElementContent
- (MetaData.getUniqueChild(element, "MaximumSize"));
- maxPoolSize = Integer.parseInt(maxSize);
- }
- catch (Exception ignore)
- {
- }
-
- try
- {
- String keepAliveMillis = MetaData.getElementContent
- (MetaData.getUniqueChild(element, "KeepAliveMillis"));
- keepAlive = Integer.parseInt(keepAliveMillis);
- }
- catch (Exception ignore)
- {
- }
-
- Element mdbConfig = MetaData.getUniqueChild(element, "MDBConfig");
-
- try
- {
- String reconnect = MetaData.getElementContent
- (MetaData.getUniqueChild(mdbConfig, "ReconnectIntervalSec"));
- reconnectInterval = Long.parseLong(reconnect) * 1000;
- }
- catch (Exception ignore)
- {
- }
-
- try
- {
- if ("false".equalsIgnoreCase(MetaData.getElementContent(MetaData.getUniqueChild(mdbConfig, "DeliveryActive"))))
- {
- deliveryActive = false;
- }
- }
- catch (Exception ignore)
- {
- }
-
- // Get Dead letter queue config - and save it for later use
- Element dlqEl = MetaData.getOptionalChild(mdbConfig, "DLQConfig");
- if (dlqEl != null)
- {
- dlqConfig = (Element) dlqEl.cloneNode(true);
- useDLQ = true;
- }
- else
- {
- useDLQ = false;
- }
-
- // If these are not found we will get a DeploymentException, I hope
- providerAdapterJNDI = MetaData.getElementContent
- (MetaData.getUniqueChild(element, "JMSProviderAdapterJNDI"));
-
- serverSessionPoolFactoryJNDI = MetaData.getElementContent
- (MetaData.getUniqueChild(element, "ServerSessionPoolFactoryJNDI"));
-
- // Check java:/ prefix
- if (!providerAdapterJNDI.startsWith("java:/"))
- {
- providerAdapterJNDI = "java:/" + providerAdapterJNDI;
- }
-
- if (!serverSessionPoolFactoryJNDI.startsWith("java:/"))
- {
- serverSessionPoolFactoryJNDI = "java:/" + serverSessionPoolFactoryJNDI;
- }
}
- /**
- * Initialize the container invoker. Sets up a connection, a server session
- * pool and a connection consumer for the configured destination.
- *
- * <p>Any JMSExceptions produced while initializing will be assumed to be caused
- * due to JMS Provider failure.
- *
- * @throws Exception Failed to initalize.
- */
- protected void createService() throws Exception
+ public void setMaxMessages(int maxMessages)
{
- importXml(invokerMetaData.getProxyFactoryConfig());
-
- exListener = new ExceptionListenerImpl(this);
- }
-
- /**
- * Initialize the container invoker. Sets up a connection, a server session
- * pool and a connection consumer for the configured destination.
- *
- * @throws Exception Failed to initalize.
- */
- protected void innerStartDelivery() throws Exception
- {
- if (deliveryActive == false)
- {
- log.debug("Delivery is disabled");
- return;
- }
+ // TODO setMaxMessages
+ throw new org.jboss.util.NotImplementedException("setMaxMessages");
- sendNotification(CONNECTING_NOTIFICATION, null);
-
- log.debug("Initializing");
-
- // Get the JMS provider
- JMSProviderAdapter adapter = getJMSProviderAdapter();
- log.debug("Provider adapter: " + adapter);
-
- // Set up Dead Letter Queue handler
- if (useDLQ)
- {
- dlqHandler = new DLQHandler(adapter, this);
- dlqHandler.importXml(dlqConfig);
- dlqHandler.create();
- }
-
- // Store TM reference locally - should we test for CMT Required
- tm = container.getTransactionManager();
-
- // Get configuration information - from EJB-xml
- MessageDrivenMetaData config = getMetaData();
-
- // Selector
- String messageSelector = config.getMessageSelector();
- String activationConfig = getActivationConfigProperty("messageSelector");
- if (activationConfig != null)
- messageSelector = activationConfig;
-
- // Queue or Topic - optional unfortunately
- String destinationType = config.getDestinationType();
- activationConfig = getActivationConfigProperty("destinationType");
- if (activationConfig != null)
- destinationType = activationConfig;
-
- // Is container managed?
- isContainerManagedTx = config.isContainerManagedTx();
- acknowledgeMode = config.getAcknowledgeMode();
- activationConfig = getActivationConfigProperty("acknowledgeMode");
- if (activationConfig != null)
- {
- if (activationConfig.equals("DUPS_OK_ACKNOWLEDGE"))
- acknowledgeMode = MessageDrivenMetaData.DUPS_OK_ACKNOWLEDGE_MODE;
- else
- acknowledgeMode = MessageDrivenMetaData.AUTO_ACKNOWLEDGE_MODE;
- }
-
- byte txType = config.getMethodTransactionType("onMessage",
- new Class[]{Message.class},
- InvocationType.LOCAL);
- isNotSupportedTx = txType == MetaData.TX_NOT_SUPPORTED;
-
- // Get configuration data from jboss.xml
- String destinationJNDI = config.getDestinationJndiName();
- activationConfig = getActivationConfigProperty("destination");
- if (activationConfig != null)
- destinationJNDI = activationConfig;
- // Try any EJB21 destination link
- if (destinationJNDI == null)
- {
- String link = config.getDestinationLink();
- if (link != null)
- {
- link = link.trim();
- if (link.length() > 0)
- {
- MessageDestinationMetaData destinationMetaData = container.getMessageDestination(link);
- if (destinationMetaData == null)
- log.warn("Unresolved message-destination-link '" + link + "' no message-destination in ejb-jar.xml");
- else
- {
- String jndiName = destinationMetaData.getJNDIName();
- if (jndiName == null)
- log.warn("The message-destination '" + link + "' has no jndi-name in jboss.xml");
- else
- destinationJNDI = jndiName;
- }
- }
- }
- }
-
- String user = config.getUser();
- String password = config.getPasswd();
-
- // Connect to the JNDI server and get a reference to root context
- Context context = adapter.getInitialContext();
- log.debug("context: " + context);
-
- // if we can't get the root context then exit with an exception
- if (context == null)
- {
- throw new RuntimeException("Failed to get the root context");
- }
-
- // Get the JNDI suffix of the destination
- String jndiSuffix = parseJndiSuffix(destinationJNDI, config.getEjbName());
- log.debug("jndiSuffix: " + jndiSuffix);
-
- // Unfortunately the destination is optional, so if we do not have one
- // here we have to look it up if we have a destinationJNDI, else give it
- // a default.
- if (destinationType == null)
- {
- log.warn("No message-driven-destination given; using; guessing type");
- destinationType = getDestinationType(context, destinationJNDI);
- }
-
- if ("javax.jms.Topic".equals(destinationType))
- {
- log.debug("Got destination type Topic for " + config.getEjbName());
-
- // create a topic connection
- Object factory = context.lookup(adapter.getTopicFactoryRef());
- TopicConnection tConnection = null;
- try
- {
- tConnection = ConnectionFactoryHelper.createTopicConnection(factory, user, password);
- connection = tConnection;
- }
- catch (ClassCastException e)
- {
- throw new DeploymentException("Expected a TopicConnection check your provider adaptor: "
- + adapter.getTopicFactoryRef());
- }
-
- try
- {
- // Fix: ClientId must be set as the first method call after connection creation.
- // Fix: ClientId is necessary for durable subscriptions.
-
- String clientId = config.getClientId();
- activationConfig = getActivationConfigProperty("clientID");
- if (activationConfig != null)
- clientId = activationConfig;
-
- log.debug("Using client id: " + clientId);
- if (clientId != null && clientId.length() > 0)
- connection.setClientID(clientId);
-
- // lookup or create the destination topic
- Topic topic = null;
- try
- {
- // First we try the specified topic
- if (destinationJNDI != null)
- topic = (Topic) context.lookup(destinationJNDI);
- else if (createJBossMQDestination == false)
- throw new DeploymentException("Unable to determine destination for '" + container.getBeanMetaData().getEjbName()
- + "' use destination-jndi-name in jboss.xml, an activation config property or a message-destination-link");
- }
- catch (NamingException e)
- {
- if (createJBossMQDestination == false)
- throw new DeploymentException("Could not find the topic destination-jndi-name=" + destinationJNDI, e);
- log.warn("Could not find the topic destination-jndi-name=" + destinationJNDI, e);
- }
- catch (ClassCastException e)
- {
- throw new DeploymentException("Expected a Topic destination-jndi-name=" + destinationJNDI, e);
- }
-
- // FIXME: This is not portable, only works for JBossMQ
- if (topic == null)
- topic = (Topic) createDestination(Topic.class,
- context,
- "topic/" + jndiSuffix,
- jndiSuffix);
-
- // set up the server session pool
- pool = createSessionPool(
- topic,
- tConnection,
- minPoolSize,
- maxPoolSize,
- keepAlive,
- true, // tx
- acknowledgeMode,
- new MessageListenerImpl(this));
-
- int subscriptionDurablity = config.getSubscriptionDurability();
- activationConfig = getActivationConfigProperty("subscriptionDurability");
- if (activationConfig != null)
- {
- if (activationConfig.equals("Durable"))
- subscriptionDurablity = MessageDrivenMetaData.DURABLE_SUBSCRIPTION;
- else
- subscriptionDurablity = MessageDrivenMetaData.NON_DURABLE_SUBSCRIPTION;
- }
- // To be no-durable or durable
- if (subscriptionDurablity != MessageDrivenMetaData.DURABLE_SUBSCRIPTION)
- {
- // Create non durable
- connectionConsumer =
- tConnection.createConnectionConsumer(topic,
- messageSelector,
- pool,
- maxMessagesNr);
- }
- else
- {
- // Durable subscription
- String durableName = config.getSubscriptionId();
- activationConfig = getActivationConfigProperty("subscriptionName");
- if (activationConfig != null)
- durableName = activationConfig;
-
- connectionConsumer =
- tConnection.createDurableConnectionConsumer(topic,
- durableName,
- messageSelector,
- pool,
- maxMessagesNr);
- }
- log.debug("Topic connectionConsumer set up");
- }
- catch (Throwable t)
- {
- try
- {
- tConnection.close();
- }
- catch (Throwable ignored)
- {
- }
- DeploymentException.rethrowAsDeploymentException("Error during topic setup", t);
- }
- }
- else if ("javax.jms.Queue".equals(destinationType))
- {
- log.debug("Got destination type Queue for " + config.getEjbName());
-
- // create a queue connection
- Object qFactory = context.lookup(adapter.getQueueFactoryRef());
- QueueConnection qConnection = null;
- try
- {
- qConnection = ConnectionFactoryHelper.createQueueConnection(qFactory, user, password);
- connection = qConnection;
- }
- catch (ClassCastException e)
- {
- throw new DeploymentException("Expected a QueueConnection check your provider adaptor: "
- + adapter.getQueueFactoryRef());
- }
-
- try
- {
- // Set the optional client id
- String clientId = config.getClientId();
- activationConfig = getActivationConfigProperty("clientID");
- if (activationConfig != null)
- clientId = activationConfig;
-
- log.debug("Using client id: " + clientId);
- if (clientId != null && clientId.length() > 0)
- connection.setClientID(clientId);
-
- // lookup or create the destination queue
- Queue queue = null;
- try
- {
- // First we try the specified queue
- if (destinationJNDI != null)
- queue = (Queue) context.lookup(destinationJNDI);
- else if (createJBossMQDestination == false)
- throw new DeploymentException("Unable to determine destination for '" + container.getBeanMetaData().getEjbName()
- + "' use destination-jndi-name in jboss.xml, an activation config property or a message-destination-link");
- }
- catch (NamingException e)
- {
- if (createJBossMQDestination == false)
- throw new DeploymentException("Could not find the queue destination-jndi-name=" + destinationJNDI, e);
- log.warn("Could not find the queue destination-jndi-name=" + destinationJNDI);
- }
- catch (ClassCastException e)
- {
- throw new DeploymentException("Expected a Queue destination-jndi-name=" + destinationJNDI);
- }
-
- // FIXME: This is not portable, only works for JBossMQ
- if (queue == null)
- queue = (Queue) createDestination(Queue.class,
- context,
- "queue/" + jndiSuffix,
- jndiSuffix);
-
- // set up the server session pool
- pool = createSessionPool(
- queue,
- qConnection,
- minPoolSize,
- maxPoolSize,
- keepAlive,
- true, // tx
- acknowledgeMode,
- new MessageListenerImpl(this));
- log.debug("Server session pool: " + pool);
-
- // create the connection consumer
- connectionConsumer =
- qConnection.createConnectionConsumer(queue,
- messageSelector,
- pool,
- maxMessagesNr);
- log.debug("Connection consumer: " + connectionConsumer);
- }
- catch (Throwable t)
- {
- try
- {
- qConnection.close();
- }
- catch (Throwable ignored)
- {
- }
- DeploymentException.rethrowAsDeploymentException("Error during queue setup", t);
- }
- }
- else
- throw new DeploymentException("Unknown destination-type " + destinationType);
-
- log.debug("Initialized with config " + toString());
-
- context.close();
-
- if (dlqHandler != null)
- {
- dlqHandler.start();
- }
-
- if (connection != null)
- {
- connection.setExceptionListener(exListener);
- connection.start();
- }
-
- sendNotification(CONNECTED_NOTIFICATION, null);
}
- protected void startService() throws Exception
+ public void setMaxPoolSize(int maxPoolSize)
{
- try
- {
- innerStartDelivery();
- }
- catch (final Throwable t)
- {
- // start a thread up to handle recovering the connection. so we can
- // attach to the jms resources once they become available
- exListener.handleFailure(t);
- return;
- }
- finally
- {
- // Clear any security context established by the jms connection
- SecurityActions.clear();
- }
- }
-
- protected void stopService() throws Exception
- {
- // Silence the exception listener
- if (exListener != null)
- {
- exListener.stop();
- }
-
- innerStopDelivery();
- }
-
- /**
- * Stop done from inside, we should not stop the exceptionListener in inner
- * stop.
- */
- protected void innerStopDelivery()
- {
- log.debug("innerStop");
-
- sendNotification(DISCONNECTING_NOTIFICATION, null);
+ // TODO setMaxPoolSize
+ throw new org.jboss.util.NotImplementedException("setMaxPoolSize");
- try
- {
- if (connection != null)
- {
- connection.setExceptionListener(null);
- log.debug("unset exception listener");
- }
- }
- catch (Throwable t)
- {
- log.trace("Could not set ExceptionListener to null", t);
- }
-
- // Stop the connection
- try
- {
- if (connection != null)
- {
- connection.stop();
- log.debug("connection stopped");
- }
- }
- catch (Throwable t)
- {
- log.trace("Could not stop JMS connection", t);
- }
-
- try
- {
- if (dlqHandler != null)
- dlqHandler.stop();
- }
- catch (Throwable t)
- {
- log.trace("Failed to stop the dlq handler", t);
- }
-
- // close the connection consumer
- try
- {
- if (connectionConsumer != null)
- connectionConsumer.close();
- }
- catch (Throwable t)
- {
- log.trace("Failed to close connection consumer", t);
- }
- connectionConsumer = null;
-
- // clear the server session pool (if it is clearable)
- try
- {
- if (pool instanceof StdServerSessionPool)
- {
- StdServerSessionPool p = (StdServerSessionPool) pool;
- p.clear();
- }
- }
- catch (Throwable t)
- {
- log.trace("Failed to clear session pool", t);
- }
-
- // close the connection
- if (connection != null)
- {
- try
- {
- connection.close();
- }
- catch (Throwable t)
- {
- log.trace("Failed to close connection", t);
- }
- }
- connection = null;
-
- // Take down DLQ
- try
- {
- if (dlqHandler != null)
- {
- dlqHandler.destroy();
- }
- }
- catch (Throwable t)
- {
- log.trace("Failed to close the dlq handler", t);
- }
- dlqHandler = null;
-
- sendNotification(DISCONNECTED_NOTIFICATION, null);
}
-
- public Object invoke(Object id,
- Method m,
- Object[] args,
- Transaction tx,
- Principal identity,
- Object credential)
- throws Exception
- {
- Invocation invocation = new Invocation(id, m, args, tx, identity, credential);
- invocation.setType(InvocationType.LOCAL);
-
- // Set the right context classloader
- ClassLoader oldCL = TCLAction.UTIL.getContextClassLoader();
- TCLAction.UTIL.setContextClassLoader(container.getClassLoader());
- container.pushENC();
- try
- {
- return container.invoke(invocation);
- }
- finally
- {
- container.popENC();
- TCLAction.UTIL.setContextClassLoader(oldCL);
- }
- }
-
- /**
- * Try to get a destination type by looking up the destination JNDI, or
- * provide a default if there is not destinationJNDI or if it is not possible
- * to lookup.
- *
- * @param ctx The naming context to lookup destinations from.
- * @param destinationJNDI The name to use when looking up destinations.
- * @return The destination type, either derived from destinationJDNI or
- * DEFAULT_DESTINATION_TYPE
- */
- protected String getDestinationType(Context ctx, String destinationJNDI)
+ public void setMinPoolSize(int minPoolSize)
{
- String destType = null;
-
- if (destinationJNDI != null)
- {
- try
- {
- Destination dest = (Destination) ctx.lookup(destinationJNDI);
- if (dest instanceof javax.jms.Topic)
- {
- destType = "javax.jms.Topic";
- }
- else if (dest instanceof javax.jms.Queue)
- {
- destType = "javax.jms.Queue";
- }
- }
- catch (NamingException ex)
- {
- log.debug("Could not do heristic lookup of destination ", ex);
- }
-
- }
- if (destType == null)
- {
- log.warn("Could not determine destination type, defaults to: " +
- DEFAULT_DESTINATION_TYPE);
-
- destType = DEFAULT_DESTINATION_TYPE;
- }
-
- return destType;
+ // TODO setMinPoolSize
+ throw new org.jboss.util.NotImplementedException("setMinPoolSize");
}
-
- /**
- * Return the JMSProviderAdapter that should be used.
- *
- * @return The JMSProviderAdapter to use.
- */
- protected JMSProviderAdapter getJMSProviderAdapter() throws NamingException
- {
- Context context = new InitialContext();
- try
- {
- log.debug("Looking up provider adapter: " + providerAdapterJNDI);
- return (JMSProviderAdapter) context.lookup(providerAdapterJNDI);
- }
- finally
- {
- context.close();
- }
- }
-
- /**
- * Create and or lookup a JMS destination.
- *
- * @param type Either javax.jms.Queue or javax.jms.Topic.
- * @param ctx The naming context to lookup destinations from.
- * @param jndiName The name to use when looking up destinations.
- * @param jndiSuffix The name to use when creating destinations.
- * @return The destination.
- * @throws IllegalArgumentException Type is not Queue or Topic.
- * @throws Exception Description of Exception
- */
- protected Destination createDestination(final Class type,
- final Context ctx,
- final String jndiName,
- final String jndiSuffix)
- throws Exception
- {
- try
- {
- // first try to look it up
- return (Destination) ctx.lookup(jndiName);
- }
- catch (NamingException e)
- {
- // if the lookup failes, the try to create it
- log.warn("destination not found: " + jndiName + " reason: " + e);
- log.warn("creating a new temporary destination: " + jndiName);
-
- //
- // jason: we should do away with this...
- //
- // attempt to create the destination (note, this is very
- // very, very unportable).
- //
-
- MBeanServer server = org.jboss.mx.util.MBeanServerLocator.locateJBoss();
-
- String methodName;
- if (type == Topic.class)
- {
- methodName = "createTopic";
- }
- else if (type == Queue.class)
- {
- methodName = "createQueue";
- }
- else
- {
- // type was not a Topic or Queue, bad user
- throw new IllegalArgumentException
- ("Expected javax.jms.Queue or javax.jms.Topic: " + type);
- }
-
- // invoke the server to create the destination
- server.invoke(new ObjectName("jboss.mq:service=DestinationManager"),
- methodName,
- new Object[]{jndiSuffix},
- new String[]{"java.lang.String"});
-
- // try to look it up again
- return (Destination) ctx.lookup(jndiName);
- }
- }
-
- protected String getActivationConfigProperty(String property)
- {
- MessageDrivenMetaData mdmd = getMetaData();
- ActivationConfigPropertyMetaData acpmd = mdmd.getActivationConfigProperty(property);
- if (acpmd != null)
- return acpmd.getValue();
- else
- return null;
- }
-
- /**
- * Create a server session pool for the given connection.
- *
- * @param destination the destination
- * @param connection The connection to use.
- * @param minSession The minumum number of sessions
- * @param maxSession The maximum number of sessions.
- * @param keepAlive The time to keep sessions alive
- * @param isTransacted True if the sessions are transacted.
- * @param ack The session acknowledgement mode.
- * @param listener The message listener.
- * @return A server session pool.
- * @throws JMSException
- * @throws NamingException Description of Exception
- */
- protected ServerSessionPool createSessionPool(
- final Destination destination,
- final Connection connection,
- final int minSession,
- final int maxSession,
- final long keepAlive,
- final boolean isTransacted,
- final int ack,
- final MessageListener listener)
- throws NamingException, JMSException
- {
- ServerSessionPool pool;
- Context context = new InitialContext();
-
- try
- {
- // first lookup the factory
- log.debug("looking up session pool factory: " +
- serverSessionPoolFactoryJNDI);
- ServerSessionPoolFactory factory = (ServerSessionPoolFactory)
- context.lookup(serverSessionPoolFactoryJNDI);
-
- // the create the pool
- pool = factory.getServerSessionPool(destination, connection, minSession, maxSession, keepAlive, isTransacted, ack, !isContainerManagedTx || isNotSupportedTx, listener);
- }
- finally
- {
- context.close();
- }
-
- return pool;
- }
-
- /**
- * Notify of an event
- *
- * @param event the event
- * @param userData any user data, e.g. the exception on a failure
- */
- protected void sendNotification(String event, Object userData)
- {
- Notification notif = new Notification(event, getServiceName(), getNextNotificationSequenceNumber());
- notif.setUserData(userData);
- sendNotification(notif);
- }
-
- /**
- * Parse the JNDI suffix from the given JNDI name.
- *
- * @param jndiname The JNDI name used to lookup the destination.
- * @param defautSuffix Description of Parameter
- * @return The parsed suffix or the defaultSuffix
- */
- protected String parseJndiSuffix(final String jndiname,
- final String defautSuffix)
- {
- // jndiSuffix is merely the name that the user has given the MDB.
- // since the jndi name contains the message type I have to split
- // at the "/" if there is no slash then I use the entire jndi name...
- String jndiSuffix = "";
-
- if (jndiname != null)
- {
- int indexOfSlash = jndiname.indexOf("/");
- if (indexOfSlash != -1)
- {
- jndiSuffix = jndiname.substring(indexOfSlash + 1);
- }
- else
- {
- jndiSuffix = jndiname;
- }
- }
- else
- {
- // if the jndi name from jboss.xml is null then lets use the ejbName
- jndiSuffix = defautSuffix;
- }
-
- return jndiSuffix;
- }
-
- /**
- * An implementation of MessageListener that passes messages on to the
- * container invoker.
- */
- class MessageListenerImpl implements MessageListener
- {
- /** The container invoker. */
- JMSContainerInvoker invoker;
-
- /**
- * Construct a <tt>MessageListenerImpl</tt> .
- *
- * @param invoker The container invoker. Must not be null.
- */
- MessageListenerImpl(final JMSContainerInvoker invoker)
- {
- this.invoker = invoker;
- }
-
- /**
- * Process a message.
- *
- * @param message The message to process.
- */
- public void onMessage(final Message message)
- {
- if (log.isTraceEnabled())
- {
- log.trace("processing message: " + message);
- }
-
- Object id;
- try
- {
- id = message.getJMSMessageID();
- }
- catch (JMSException e)
- {
- // what ?
- id = "JMSContainerInvoker";
- }
-
- // Invoke, shuld we catch any Exceptions??
- try
- {
- Transaction tx = tm.getTransaction();
-
- // DLQHandling
- if (useDLQ && // Is Dead Letter Queue used at all
- message.getJMSRedelivered() && // Was message resent
- dlqHandler.handleRedeliveredMessage(message, tx)) //Did the DLQ handler take care of the message
- {
- // Message will be placed on Dead Letter Queue,
- // if redelivered to many times
- return;
- }
-
- invoker.invoke(id, // Object id - where used?
- ON_MESSAGE, // Method to invoke
- new Object[]{message}, // argument
- tx, // Transaction
- null, // Principal
- null); // Cred
-
- }
- catch (Exception e)
- {
- log.error("Exception in JMSCI message listener", e);
- }
- }
- }
-
- /** ExceptionListener for failover handling. */
- class ExceptionListenerImpl implements ExceptionListener
- {
- Object lock = new Object();
- JMSContainerInvoker invoker;
- Thread currentThread;
- boolean notStopped = true;
-
- /**
- * Create a new ExceptionListenerImpl.
- *
- * @param invoker the container invoker
- */
- ExceptionListenerImpl(final JMSContainerInvoker invoker)
- {
- this.invoker = invoker;
- }
-
- /**
- * Called on jms connection failure events
- *
- * @param ex the jms connection failure exception
- */
- public void onException(JMSException ex)
- {
- handleFailure(ex);
- }
-
- /**
- * Handle a failure
- *
- * @param t the failure
- */
- public void handleFailure(Throwable t)
- {
- MessageDrivenMetaData metaData = invoker.getMetaData();
- log.warn("JMS provider failure detected for " + metaData.getEjbName(), t);
-
- // JBAS-3750 - Help debug integration with foreign JMS providers
- if (t instanceof JMSException)
- {
- Exception le = ((JMSException)t).getLinkedException();
- if (le != null)
- log.debug("Linked exception: " + le + ", cause: " + le.getCause());
- }
-
- // Run the reconnection in the background
- String name = "JMSContainerInvoker("+metaData.getEjbName()+") Reconnect";
- synchronized (lock)
- {
- if (currentThread != null)
- {
- log.debug("Already a reconnect thread: " + currentThread + " for " + metaData.getEjbName());
- return;
- }
- Runnable runnable = new ExceptionListenerRunnable(t);
- currentThread = new Thread(runnable, name);
- try
- {
- currentThread.setDaemon(true);
- currentThread.start();
- }
- catch (RuntimeException rethrow)
- {
- currentThread = null;
- throw rethrow;
- }
- catch (Error rethrow)
- {
- currentThread = null;
- throw rethrow;
- }
- }
- }
-
- class ExceptionListenerRunnable implements Runnable
- {
- Throwable failure;
-
- /**
- * Create a new ExceptionListenerRunnable.
- *
- * @param failure the error
- */
- public ExceptionListenerRunnable(Throwable failure)
- {
- this.failure = failure;
- }
-
- /**
- * Try to reconnect to the jms provider until explicitly stopped.
- */
- public void run()
- {
- MessageDrivenMetaData metaData = invoker.getMetaData();
- try
- {
- boolean tryIt = true;
- while (tryIt && notStopped)
- {
- try
- {
- invoker.innerStopDelivery();
- }
- catch (Throwable t)
- {
- log.error("Unhandled error stopping connection for " + metaData.getEjbName(), t);
- }
-
- sendNotification(FAILURE_NOTIFICATION, failure);
-
- try
- {
- log.info("Waiting for reconnect internal " + reconnectInterval + "ms for " + metaData.getEjbName());
- try
- {
- Thread.sleep(reconnectInterval);
- }
- catch (InterruptedException ie)
- {
- tryIt = false;
- return;
- }
-
- // Reboot container
- log.info("Trying to reconnect to JMS provider for " + metaData.getEjbName());
- invoker.innerStartDelivery();
- tryIt = false;
-
- log.info("Reconnected to JMS provider for " + metaData.getEjbName());
- }
- catch (Throwable t)
- {
- log.error("Reconnect failed: JMS provider failure detected for " + metaData.getEjbName(), t);
- }
- }
- }
- finally
- {
- synchronized (lock)
- {
- currentThread = null;
- }
- }
- }
- }
-
- void stop()
- {
- synchronized (lock)
- {
- log.debug("Stop requested for recovery thread: " + currentThread);
- notStopped = false;
- if (currentThread != null)
- {
- currentThread.interrupt();
- log.debug("Recovery thread interrupted: " + currentThread);
- }
- }
- }
- }
-
- /**
- * Return a string representation of the current config state.
- */
- public String toString()
- {
- MessageDrivenMetaData metaData = getMetaData();
- String destinationJNDI = metaData.getDestinationJndiName();
- return super.toString() +
- "{ maxMessagesNr=" + maxMessagesNr +
- ", maxPoolSize=" + maxPoolSize +
- ", reconnectInterval=" + reconnectInterval +
- ", providerAdapterJNDI=" + providerAdapterJNDI +
- ", serverSessionPoolFactoryJNDI=" + serverSessionPoolFactoryJNDI +
- ", acknowledgeMode=" + acknowledgeMode +
- ", isContainerManagedTx=" + isContainerManagedTx +
- ", isNotSupportedTx=" + isNotSupportedTx +
- ", useDLQ=" + useDLQ +
- ", dlqHandler=" + dlqHandler +
- ", destinationJNDI=" + destinationJNDI +
- " }";
- }
-
- interface TCLAction
- {
- class UTIL
- {
- static TCLAction getTCLAction()
- {
- return System.getSecurityManager() == null ? NON_PRIVILEGED : PRIVILEGED;
- }
-
- static ClassLoader getContextClassLoader()
- {
- return getTCLAction().getContextClassLoader();
- }
-
- static ClassLoader getContextClassLoader(Thread thread)
- {
- return getTCLAction().getContextClassLoader(thread);
- }
-
- static void setContextClassLoader(ClassLoader cl)
- {
- getTCLAction().setContextClassLoader(cl);
- }
-
- static void setContextClassLoader(Thread thread, ClassLoader cl)
- {
- getTCLAction().setContextClassLoader(thread, cl);
- }
- }
-
- TCLAction NON_PRIVILEGED = new TCLAction()
- {
- public ClassLoader getContextClassLoader()
- {
- return Thread.currentThread().getContextClassLoader();
- }
-
- public ClassLoader getContextClassLoader(Thread thread)
- {
- return thread.getContextClassLoader();
- }
-
- public void setContextClassLoader(ClassLoader cl)
- {
- Thread.currentThread().setContextClassLoader(cl);
- }
-
- public void setContextClassLoader(Thread thread, ClassLoader cl)
- {
- thread.setContextClassLoader(cl);
- }
- };
-
- TCLAction PRIVILEGED = new TCLAction()
- {
- private final PrivilegedAction getTCLPrivilegedAction = new PrivilegedAction()
- {
- public Object run()
- {
- return Thread.currentThread().getContextClassLoader();
- }
- };
-
- public ClassLoader getContextClassLoader()
- {
- return (ClassLoader) AccessController.doPrivileged(getTCLPrivilegedAction);
- }
-
- public ClassLoader getContextClassLoader(final Thread thread)
- {
- return (ClassLoader) AccessController.doPrivileged(new PrivilegedAction()
- {
- public Object run()
- {
- return thread.getContextClassLoader();
- }
- });
- }
-
- public void setContextClassLoader(final ClassLoader cl)
- {
- AccessController.doPrivileged(new PrivilegedAction()
- {
- public Object run()
- {
- Thread.currentThread().setContextClassLoader(cl);
- return null;
- }
- });
- }
-
- public void setContextClassLoader(final Thread thread, final ClassLoader cl)
- {
- AccessController.doPrivileged(new PrivilegedAction()
- {
- public Object run()
- {
- thread.setContextClassLoader(cl);
- return null;
- }
- });
- }
- };
-
- ClassLoader getContextClassLoader();
-
- ClassLoader getContextClassLoader(Thread thread);
-
- void setContextClassLoader(ClassLoader cl);
-
- void setContextClassLoader(Thread thread, ClassLoader cl);
- }
}
Modified: trunk/server/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvokerMBean.java
===================================================================
--- trunk/server/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvokerMBean.java 2007-10-11 12:11:24 UTC (rev 66022)
+++ trunk/server/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvokerMBean.java 2007-10-11 12:19:57 UTC (rev 66023)
@@ -1,8 +1,8 @@
/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
@@ -21,6 +21,7 @@
*/
package org.jboss.ejb.plugins.jms;
+import org.jboss.ejb.plugins.inflow.JBossMessageEndpointFactoryMBean;
import org.jboss.metadata.MessageDrivenMetaData;
/**
@@ -29,7 +30,7 @@
* @author <a href="mailto:adrian at jboss.com">Adrian Brock</a>
* @version <tt>$Revision$</tt>
*/
-public interface JMSContainerInvokerMBean extends org.jboss.system.ServiceMBean
+public interface JMSContainerInvokerMBean extends JBossMessageEndpointFactoryMBean
{
/**
* Get the minimum pool size
@@ -108,18 +109,4 @@
* @return true to create
*/
boolean getCreateJBossMQDestination();
-
- /**
- * Start delivery
- *
- * @throws Exception for any error
- */
- void startDelivery() throws Exception;
-
- /**
- * Stop delivery
- *
- * @throws Exception for any error
- */
- void stopDelivery() throws Exception;
}
Deleted: trunk/server/src/main/org/jboss/ejb/plugins/jms/SecurityActions.java
===================================================================
--- trunk/server/src/main/org/jboss/ejb/plugins/jms/SecurityActions.java 2007-10-11 12:11:24 UTC (rev 66022)
+++ trunk/server/src/main/org/jboss/ejb/plugins/jms/SecurityActions.java 2007-10-11 12:19:57 UTC (rev 66023)
@@ -1,77 +0,0 @@
-/*
-* JBoss, Home of Professional Open Source
-* Copyright 2005, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
-package org.jboss.ejb.plugins.jms;
-
-import java.security.PrivilegedAction;
-import java.security.AccessController;
-
-import org.jboss.security.SecurityAssociation;
-
-/** The privileged actions used in this package
- *
- * @author Scott.Stark at jboss.org
- * @version $Revision$
- */
-class SecurityActions
-{
- interface SubjectActions
- {
- SubjectActions PRIVILEGED = new SubjectActions()
- {
- private final PrivilegedAction clearAction = new PrivilegedAction()
- {
- public Object run()
- {
- SecurityAssociation.clear();
- return null;
- }
- };
-
- public void clear()
- {
- AccessController.doPrivileged(clearAction);
- }
- };
-
- SubjectActions NON_PRIVILEGED = new SubjectActions()
- {
- public void clear()
- {
- SecurityAssociation.clear();
- }
- };
-
- void clear();
- }
-
- static void clear()
- {
- if(System.getSecurityManager() == null)
- {
- SubjectActions.NON_PRIVILEGED.clear();
- }
- else
- {
- SubjectActions.PRIVILEGED.clear();
- }
- }
-}
More information about the jboss-cvs-commits
mailing list