[jboss-cvs] JBossAS SVN: r66024 - trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Oct 11 08:24:43 EDT 2007


Author: adrian at jboss.org
Date: 2007-10-11 08:24:43 -0400 (Thu, 11 Oct 2007)
New Revision: 66024

Modified:
   trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java
Log:
[JBAS-4517] - Add support for the old JMSContainerInvoker notifications

Modified: trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java
===================================================================
--- trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java	2007-10-11 12:19:57 UTC (rev 66023)
+++ trunk/connector/src/main/org/jboss/resource/adapter/jms/inflow/JmsActivation.java	2007-10-11 12:24:43 UTC (rev 66024)
@@ -22,6 +22,7 @@
 package org.jboss.resource.adapter.jms.inflow;
 
 import java.lang.reflect.Method;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -37,6 +38,7 @@
 import javax.jms.TopicConnectionFactory;
 import javax.jms.XAQueueConnectionFactory;
 import javax.jms.XATopicConnectionFactory;
+import javax.management.Notification;
 import javax.naming.Context;
 import javax.resource.ResourceException;
 import javax.resource.spi.endpoint.MessageEndpointFactory;
@@ -46,13 +48,12 @@
 
 import org.jboss.jms.jndi.JMSProviderAdapter;
 import org.jboss.logging.Logger;
+import org.jboss.mx.util.JBossNotificationBroadcasterSupport;
 import org.jboss.resource.adapter.jms.JmsResourceAdapter;
 import org.jboss.tm.TransactionManagerLocator;
 import org.jboss.util.Strings;
 import org.jboss.util.naming.Util;
 
-import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
-
 /**
  * A generic jms Activation.
  * 
@@ -64,6 +65,21 @@
    /** The log */
    private static final Logger log = Logger.getLogger(JmsActivation.class);
    
+   /** Notification sent before connectioning */
+   private static final String CONNECTING_NOTIFICATION = "org.jboss.ejb.plugins.jms.CONNECTING";
+
+   /** Notification sent after connection */
+   private static final String CONNECTED_NOTIFICATION = "org.jboss.ejb.plugins.jms.CONNECTED";
+
+   /** Notification sent before disconnection */
+   private static final String DISCONNECTING_NOTIFICATION = "org.jboss.ejb.plugins.jms.DISCONNECTING";
+
+   /** Notification sent before disconnected */
+   private static final String DISCONNECTED_NOTIFICATION = "org.jboss.ejb.plugins.jms.DISCONNECTED";
+
+   /** Notification sent at connection failure */
+   private static final String FAILURE_NOTIFICATION = "org.jboss.ejb.plugins.jms.FAILURE";
+
    /** The onMessage method */
    public static final Method ONMESSAGE; 
    
@@ -76,11 +92,14 @@
    /** The message endpoint factory */
    protected MessageEndpointFactory endpointFactory;
    
+   /** The notification emitter */
+   protected JBossNotificationBroadcasterSupport emitter;
+   
    /** Whether delivery is active */
-   protected SynchronizedBoolean deliveryActive;
+   protected AtomicBoolean deliveryActive = new AtomicBoolean(false);
 
    // Whether we are in the failure recovery loop
-   private SynchronizedBoolean inFailure = new SynchronizedBoolean(false);
+   private AtomicBoolean inFailure = new AtomicBoolean(false);
 
    /** The jms provider adapter */
    protected JMSProviderAdapter adapter;
@@ -129,6 +148,8 @@
       {
          throw new ResourceException(e);
       }
+      if (endpointFactory instanceof JBossNotificationBroadcasterSupport)
+         emitter = (JBossNotificationBroadcasterSupport) endpointFactory;
    }
 
    /**
@@ -166,11 +187,7 @@
    public TransactionManager getTransactionManager()
    {
       if (tm == null)
-      {
-         tm = TransactionManagerLocator.getInstance().locate();
-
-      }
-
+         tm = TransactionManagerLocator.locateTransactionManager();
       return tm;
    }
 
@@ -213,7 +230,7 @@
     */
    public void start() throws ResourceException
    {
-      deliveryActive = new SynchronizedBoolean(true);
+      deliveryActive.set(true);
       ra.getWorkManager().scheduleWork(new SetupActivation());
    }
 
@@ -228,6 +245,8 @@
 
    /**
     * Handles any failure by trying to reconnect
+    * 
+    * @param failure the reason for the failure
     */
    public void handleFailure(Throwable failure)
    {
@@ -235,13 +254,16 @@
       int reconnectCount = 0;
       
       // Only enter the failure loop once
-      if (inFailure.set(true))
+      if (inFailure.getAndSet(true))
          return;
       try
       {
          while (deliveryActive.get() && reconnectCount < spec.getReconnectAttempts())
          {
             teardown();
+
+            sendNotification(FAILURE_NOTIFICATION, failure);
+
             try
             {
                Thread.sleep(spec.getReconnectIntervalLong());
@@ -306,6 +328,8 @@
    protected void setup() throws Exception
    {
       log.debug("Setting up " + spec);
+      
+      sendNotification(CONNECTING_NOTIFICATION, null);
 
       setupJMSProviderAdapter();
       Context ctx = adapter.getInitialContext();
@@ -323,6 +347,8 @@
       setupSessionPool();
       
       log.debug("Setup complete " + this);
+
+      sendNotification(CONNECTED_NOTIFICATION, null);
    }
    
    /**
@@ -332,16 +358,22 @@
    {
       log.debug("Tearing down " + spec);
 
+      sendNotification(DISCONNECTING_NOTIFICATION, null);
+
       teardownSessionPool();
       teardownConnection();
       teardownDestination();
       teardownDLQ();
 
       log.debug("Tearing down complete " + this);
+      
+      sendNotification(DISCONNECTED_NOTIFICATION, null);
    }
 
    /**
     * Get the jms provider
+    * 
+    * @throws Exception for any error
     */
    protected void setupJMSProviderAdapter() throws Exception
    {
@@ -364,7 +396,7 @@
    {
       if (spec.isUseDLQ())
       {
-         Class clazz = Thread.currentThread().getContextClassLoader().loadClass(spec.getDLQHandler());
+         Class<?> clazz = Thread.currentThread().getContextClassLoader().loadClass(spec.getDLQHandler());
          dlqHandler = (DLQHandler) clazz.newInstance();
          dlqHandler.setup(this, ctx);
       }
@@ -398,7 +430,7 @@
     */
    protected void setupDestination(Context ctx) throws Exception
    {
-      Class destinationType;
+      Class<?> destinationType;
       if (spec.isTopic())
          destinationType = Topic.class;
       else
@@ -446,6 +478,7 @@
     * @param user the user
     * @param pass the password
     * @param clientID the client id
+    * @return the connection
     * @throws Exception for any error
     */
    protected QueueConnection setupQueueConnection(Context ctx, String user, String pass, String clientID) throws Exception
@@ -502,6 +535,7 @@
     * @param user the user
     * @param pass the password
     * @param clientID the client id
+    * @return the connection
     * @throws Exception for any error
     */
    protected TopicConnection setupTopicConnection(Context ctx, String user, String pass, String clientID) throws Exception
@@ -624,6 +658,29 @@
    }
 
    /**
+    * Notify of an event
+    * 
+    * @param event the event
+    * @param userData any user data, e.g. the exception on a failure
+    */
+   protected void sendNotification(String event, Object userData)
+   {
+      if (emitter == null)
+         return;
+      
+      try
+      {
+         Notification notif = new Notification(event, spec, emitter.nextNotificationSequenceNumber());
+         notif.setUserData(userData);
+         emitter.sendNotification(notif);
+      }
+      catch (Throwable t)
+      {
+         log.warn("Error sending notification: " + event, t);
+      }
+   }
+
+   /**
     * Handles the setup
     */
    private class SetupActivation implements Work




More information about the jboss-cvs-commits mailing list