[jboss-cvs] JBossAS SVN: r66024 - trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Oct 11 08:24:43 EDT 2007
Author: adrian at jboss.org
Date: 2007-10-11 08:24:43 -0400 (Thu, 11 Oct 2007)
New Revision: 66024
Modified:
trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java
Log:
[JBAS-4517] - Add support for the old JMSContainerInvoker notifications
Modified: trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java
===================================================================
--- trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java 2007-10-11 12:19:57 UTC (rev 66023)
+++ trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java 2007-10-11 12:24:43 UTC (rev 66024)
@@ -22,6 +22,7 @@
package org.jboss.resource.adapter.jms.inflow;
import java.lang.reflect.Method;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.Destination;
@@ -37,6 +38,7 @@
import javax.jms.TopicConnectionFactory;
import javax.jms.XAQueueConnectionFactory;
import javax.jms.XATopicConnectionFactory;
+import javax.management.Notification;
import javax.naming.Context;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
@@ -46,13 +48,12 @@
import org.jboss.jms.jndi.JMSProviderAdapter;
import org.jboss.logging.Logger;
+import org.jboss.mx.util.JBossNotificationBroadcasterSupport;
import org.jboss.resource.adapter.jms.JmsResourceAdapter;
import org.jboss.tm.TransactionManagerLocator;
import org.jboss.util.Strings;
import org.jboss.util.naming.Util;
-import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
-
/**
* A generic jms Activation.
*
@@ -64,6 +65,21 @@
/** The log */
private static final Logger log = Logger.getLogger(JmsActivation.class);
+ /** Notification sent before connectioning */
+ private static final String CONNECTING_NOTIFICATION = "org.jboss.ejb.plugins.jms.CONNECTING";
+
+ /** Notification sent after connection */
+ private static final String CONNECTED_NOTIFICATION = "org.jboss.ejb.plugins.jms.CONNECTED";
+
+ /** Notification sent before disconnection */
+ private static final String DISCONNECTING_NOTIFICATION = "org.jboss.ejb.plugins.jms.DISCONNECTING";
+
+ /** Notification sent before disconnected */
+ private static final String DISCONNECTED_NOTIFICATION = "org.jboss.ejb.plugins.jms.DISCONNECTED";
+
+ /** Notification sent at connection failure */
+ private static final String FAILURE_NOTIFICATION = "org.jboss.ejb.plugins.jms.FAILURE";
+
/** The onMessage method */
public static final Method ONMESSAGE;
@@ -76,11 +92,14 @@
/** The message endpoint factory */
protected MessageEndpointFactory endpointFactory;
+ /** The notification emitter */
+ protected JBossNotificationBroadcasterSupport emitter;
+
/** Whether delivery is active */
- protected SynchronizedBoolean deliveryActive;
+ protected AtomicBoolean deliveryActive = new AtomicBoolean(false);
// Whether we are in the failure recovery loop
- private SynchronizedBoolean inFailure = new SynchronizedBoolean(false);
+ private AtomicBoolean inFailure = new AtomicBoolean(false);
/** The jms provider adapter */
protected JMSProviderAdapter adapter;
@@ -129,6 +148,8 @@
{
throw new ResourceException(e);
}
+ if (endpointFactory instanceof JBossNotificationBroadcasterSupport)
+ emitter = (JBossNotificationBroadcasterSupport) endpointFactory;
}
/**
@@ -166,11 +187,7 @@
public TransactionManager getTransactionManager()
{
if (tm == null)
- {
- tm = TransactionManagerLocator.getInstance().locate();
-
- }
-
+ tm = TransactionManagerLocator.locateTransactionManager();
return tm;
}
@@ -213,7 +230,7 @@
*/
public void start() throws ResourceException
{
- deliveryActive = new SynchronizedBoolean(true);
+ deliveryActive.set(true);
ra.getWorkManager().scheduleWork(new SetupActivation());
}
@@ -228,6 +245,8 @@
/**
* Handles any failure by trying to reconnect
+ *
+ * @param failure the reason for the failure
*/
public void handleFailure(Throwable failure)
{
@@ -235,13 +254,16 @@
int reconnectCount = 0;
// Only enter the failure loop once
- if (inFailure.set(true))
+ if (inFailure.getAndSet(true))
return;
try
{
while (deliveryActive.get() && reconnectCount < spec.getReconnectAttempts())
{
teardown();
+
+ sendNotification(FAILURE_NOTIFICATION, failure);
+
try
{
Thread.sleep(spec.getReconnectIntervalLong());
@@ -306,6 +328,8 @@
protected void setup() throws Exception
{
log.debug("Setting up " + spec);
+
+ sendNotification(CONNECTING_NOTIFICATION, null);
setupJMSProviderAdapter();
Context ctx = adapter.getInitialContext();
@@ -323,6 +347,8 @@
setupSessionPool();
log.debug("Setup complete " + this);
+
+ sendNotification(CONNECTED_NOTIFICATION, null);
}
/**
@@ -332,16 +358,22 @@
{
log.debug("Tearing down " + spec);
+ sendNotification(DISCONNECTING_NOTIFICATION, null);
+
teardownSessionPool();
teardownConnection();
teardownDestination();
teardownDLQ();
log.debug("Tearing down complete " + this);
+
+ sendNotification(DISCONNECTED_NOTIFICATION, null);
}
/**
* Get the jms provider
+ *
+ * @throws Exception for any error
*/
protected void setupJMSProviderAdapter() throws Exception
{
@@ -364,7 +396,7 @@
{
if (spec.isUseDLQ())
{
- Class clazz = Thread.currentThread().getContextClassLoader().loadClass(spec.getDLQHandler());
+ Class<?> clazz = Thread.currentThread().getContextClassLoader().loadClass(spec.getDLQHandler());
dlqHandler = (DLQHandler) clazz.newInstance();
dlqHandler.setup(this, ctx);
}
@@ -398,7 +430,7 @@
*/
protected void setupDestination(Context ctx) throws Exception
{
- Class destinationType;
+ Class<?> destinationType;
if (spec.isTopic())
destinationType = Topic.class;
else
@@ -446,6 +478,7 @@
* @param user the user
* @param pass the password
* @param clientID the client id
+ * @return the connection
* @throws Exception for any error
*/
protected QueueConnection setupQueueConnection(Context ctx, String user, String pass, String clientID) throws Exception
@@ -502,6 +535,7 @@
* @param user the user
* @param pass the password
* @param clientID the client id
+ * @return the connection
* @throws Exception for any error
*/
protected TopicConnection setupTopicConnection(Context ctx, String user, String pass, String clientID) throws Exception
@@ -624,6 +658,29 @@
}
/**
+ * Notify of an event
+ *
+ * @param event the event
+ * @param userData any user data, e.g. the exception on a failure
+ */
+ protected void sendNotification(String event, Object userData)
+ {
+ if (emitter == null)
+ return;
+
+ try
+ {
+ Notification notif = new Notification(event, spec, emitter.nextNotificationSequenceNumber());
+ notif.setUserData(userData);
+ emitter.sendNotification(notif);
+ }
+ catch (Throwable t)
+ {
+ log.warn("Error sending notification: " + event, t);
+ }
+ }
+
+ /**
* Handles the setup
*/
private class SetupActivation implements Work
More information about the jboss-cvs-commits
mailing list