[jboss-cvs] JBoss Messaging SVN: r5826 - in trunk/src/main/org/jboss/messaging/ra: inflow and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Feb 5 14:47:14 EST 2009


Author: jesper.pedersen
Date: 2009-02-05 14:47:14 -0500 (Thu, 05 Feb 2009)
New Revision: 5826

Added:
   trunk/src/main/org/jboss/messaging/ra/inflow/
   trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java
   trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivationSpec.java
Modified:
   trunk/src/main/org/jboss/messaging/ra/JBMResourceAdapter.java
   trunk/src/main/org/jboss/messaging/ra/Util.java
Log:
[JBMESSAGING-1367] Create JCA resource adapter for JBM 2.0

Modified: trunk/src/main/org/jboss/messaging/ra/JBMResourceAdapter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/JBMResourceAdapter.java	2009-02-05 19:24:16 UTC (rev 5825)
+++ trunk/src/main/org/jboss/messaging/ra/JBMResourceAdapter.java	2009-02-05 19:47:14 UTC (rev 5826)
@@ -21,6 +21,14 @@
  */
 package org.jboss.messaging.ra;
 
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.jms.client.JBossConnectionFactory;
+import org.jboss.messaging.ra.inflow.JBMActivation;
+import org.jboss.messaging.ra.inflow.JBMActivationSpec;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.resource.NotSupportedException;
@@ -30,11 +38,9 @@
 import javax.resource.spi.ResourceAdapter;
 import javax.resource.spi.ResourceAdapterInternalException;
 import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.resource.spi.work.WorkManager;
 import javax.transaction.xa.XAResource;
 
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.jms.client.JBossConnectionFactory;
-
 /**
  * The resource adapter for JBoss Messaging
  * 
@@ -62,6 +68,9 @@
    /** Have the factory been configured */
    private AtomicBoolean configured;
 
+   /** The activations by activation spec */
+   private Map activations;
+
    /**
     * Constructor
     */
@@ -73,22 +82,46 @@
       raProperties = new JBMRAProperties();
       factory = null;
       configured = new AtomicBoolean(false);
+      activations = new ConcurrentHashMap();
    }
 
+   /**
+    * Creates a {@link JBMActivation} for the endpoint, registers it under the spec and starts it
+    * @param endpointFactory The endpoint factory
+    * @param spec The activation spec; it is cast to {@link JBMActivationSpec}
+    * @exception ResourceException Thrown if an error occurs
+    */
    public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec spec) throws ResourceException
    {
       if (trace)
          log.trace("endpointActivation(" + endpointFactory + ", " + spec + ")");
 
-      throw new NotSupportedException("Unsupported");
+      JBMActivation activation = new JBMActivation(this, endpointFactory, (JBMActivationSpec) spec);
+      activations.put(spec, activation);
+      activation.start();
    }
 
+   /**
+    * Unregisters the activation stored for the spec and stops it, if one exists
+    * @param endpointFactory The endpoint factory
+    * @param spec The activation spec
+    */
    public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec spec)
    {
       if (trace)
          log.trace("endpointDeactivation(" + endpointFactory + ", " + spec + ")");
+
+      JBMActivation activation = (JBMActivation) activations.remove(spec);
+      if (activation != null)
+         activation.stop();
    }
    
+   /**
+    * Get XA resources
+    * @param specs The activation specs
+    * @return The XA resources
+    * @exception ResourceException Thrown if an error occurs or unsupported
+    */
    public XAResource[] getXAResources(ActivationSpec[] specs) throws ResourceException
    {
       if (trace)
@@ -97,6 +130,11 @@
       throw new ResourceException("Unsupported");
    }
    
+   /**
+    * Start
+    * @param ctx The bootstrap context
+    * @exception ResourceAdapterInternalException Thrown if an error occurs
+    */
    public void start(BootstrapContext ctx) throws ResourceAdapterInternalException
    {
       if (trace)
@@ -107,11 +145,30 @@
       log.info("JBoss Messaging resource adapter started");
    }
 
+   /**
+    * Stops and unregisters every activation; stop failures are logged at debug level and skipped
+    */
    public void stop()
    {
       if (trace)
          log.trace("stop()");
 
+      Iterator entries = activations.entrySet().iterator();
+      while (entries.hasNext())
+      {
+         Object value = ((Map.Entry) entries.next()).getValue();
+         try
+         {
+            if (value != null)
+               ((JBMActivation) value).stop();
+         }
+         catch (Exception ignored)
+         {
+            log.debug("Ignored", ignored);
+         }
+         entries.remove();
+      }
+
       log.info("JBoss Messaging resource adapter stopped");
    }
 
@@ -847,10 +904,25 @@
    }
 
    /**
+    * Get the work manager
+    * @return The manager
+    */
+   public WorkManager getWorkManager()
+   {
+      if (trace)
+      {
+         log.trace("getWorkManager()");
+      }
+
+      // Without a context there is no work manager to hand out
+      return (ctx != null) ? ctx.getWorkManager() : null;
+   }
+
+   /**
     * Get the JBoss connection factory
     * @return The factory
     */
-   protected JBossConnectionFactory getJBossConnectionFactory()
+   public JBossConnectionFactory getJBossConnectionFactory()
    {
       if (!configured.get()) {
          setup();

Modified: trunk/src/main/org/jboss/messaging/ra/Util.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/Util.java	2009-02-05 19:24:16 UTC (rev 5825)
+++ trunk/src/main/org/jboss/messaging/ra/Util.java	2009-02-05 19:47:14 UTC (rev 5826)
@@ -21,6 +21,8 @@
  */
 package org.jboss.messaging.ra;
 
+import javax.naming.Context;
+
 /**
  * Various utility functions
  *
@@ -136,4 +138,17 @@
       // me will not be null, test for equality
       return me.equals(you);
    }
+
+   /**
+    * Lookup an object in the given naming context
+    * @param context The context to use
+    * @param name the name to lookup
+    * @param clazz the expected type; currently unused, the result is not checked against it
+    * @return the object
+    * @throws Exception for any error
+    */
+   public static Object lookup(Context context, String name, Class clazz) throws Exception
+   {
+      return context.lookup(name);
+   }
 }

Added: trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java	2009-02-05 19:47:14 UTC (rev 5826)
@@ -0,0 +1,656 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.ra.inflow;
+
+import org.jboss.messaging.jms.client.JBossConnectionFactory;
+import org.jboss.messaging.ra.JBMResourceAdapter;
+import org.jboss.messaging.ra.Util;
+import org.jboss.messaging.core.logging.Logger;
+
+import java.lang.reflect.Method;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.resource.ResourceException;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkManager;
+import javax.transaction.TransactionManager;
+
+import org.jboss.tm.TransactionManagerLocator;
+
+/**
+ * The activation.
+ * 
+ * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * @author <a href="jesper.pedersen at jboss.org">Jesper Pedersen</a>
+ * @version $Revision: $
+ */
+public class JBMActivation implements ExceptionListener
+{
+   /** The logger */
+   private static final Logger log = Logger.getLogger(JBMActivation.class);
+
+   /** Trace enabled */
+   private static boolean trace = log.isTraceEnabled();
+   
+   /** The onMessage method */
+   public static final Method ONMESSAGE; 
+   
+   /** The resource adapter */
+   protected JBMResourceAdapter ra;
+   
+   /** The activation spec */
+   protected JBMActivationSpec spec;
+
+   /** The message endpoint factory */
+   protected MessageEndpointFactory endpointFactory;
+   
+   /** Whether delivery is active */
+   protected AtomicBoolean deliveryActive = new AtomicBoolean(false);
+
+   /** Whether we are in the failure recovery loop */
+   private AtomicBoolean inFailure = new AtomicBoolean(false);
+
+   /** The destination */
+   protected Destination destination;
+
+   /** The destination type */
+   protected boolean isTopic = false;
+   
+   /** The connection */
+   protected Connection connection;
+   
+   /** Is the delivery transacted */
+   protected boolean isDeliveryTransacted;
+   
+   /** The TransactionManager */
+   protected TransactionManager tm;
+   
+   static
+   {
+      // Resolve MessageListener.onMessage(Message) once; a failed lookup aborts class initialisation
+      Method onMessage;
+      try
+      {
+         onMessage = MessageListener.class.getMethod("onMessage", new Class[] { Message.class });
+      }
+      catch (Exception e)
+      {
+         throw new RuntimeException(e);
+      }
+      ONMESSAGE = onMessage;
+   }
+
+   /**
+    * Creates the activation and records whether the endpoint delivers onMessage transacted
+    * @param ra The resource adapter
+    * @param endpointFactory The endpoint factory
+    * @param spec The activation spec
+    * @exception ResourceException Thrown if the transacted state cannot be determined
+    */
+   public JBMActivation(JBMResourceAdapter ra, MessageEndpointFactory endpointFactory, JBMActivationSpec spec) throws ResourceException
+   {
+      if (trace)
+      {
+         log.trace("constructor(" + ra + ", " + endpointFactory + ", " + spec + ")");
+      }
+
+      boolean transacted;
+      try
+      {
+         transacted = endpointFactory.isDeliveryTransacted(ONMESSAGE);
+      }
+      catch (Exception e)
+      {
+         throw new ResourceException(e);
+      }
+
+      this.ra = ra;
+      this.endpointFactory = endpointFactory;
+      this.spec = spec;
+      this.isDeliveryTransacted = transacted;
+   }
+
+   /**
+    * Returns the activation spec this activation was created with
+    * @return The activation spec
+    */
+   public JBMActivationSpec getActivationSpec()
+   {
+      if (trace)
+      {
+         log.trace("getActivationSpec()");
+      }
+
+      return spec;
+   }
+
+   /**
+    * Returns the message endpoint factory this activation was created with
+    * @return The message endpoint factory
+    */
+   public MessageEndpointFactory getMessageEndpointFactory()
+   {
+      if (trace)
+      {
+         log.trace("getMessageEndpointFactory()");
+      }
+
+      return endpointFactory;
+   }
+
+   /**
+    * Returns the transacted state captured from the endpoint factory at construction time
+    * @return True if delivery is transacted
+    */
+   public boolean isDeliveryTransacted()
+   {
+      if (trace)
+      {
+         log.trace("isDeliveryTransacted()");
+      }
+
+      return isDeliveryTransacted;
+   }
+
+   /**
+    * Returns the work manager provided by the resource adapter
+    * @return The work manager
+    */
+   public WorkManager getWorkManager()
+   {
+      if (trace)
+      {
+         log.trace("getWorkManager()");
+      }
+
+      return ra.getWorkManager();
+   }
+   
+   /**
+    * Returns the transaction manager, locating it on first use and caching it afterwards
+    * @return The transaction manager
+    */
+   public TransactionManager getTransactionManager()
+   {
+      if (trace)
+      {
+         log.trace("getTransactionManager()");
+      }
+
+      if (tm == null)
+      {
+         tm = TransactionManagerLocator.locateTransactionManager();
+      }
+
+      return tm;
+   }
+
+   /**
+    * Returns the current JMS connection
+    * @return The connection; null before the connection is set up and after it is torn down
+    */
+   public Connection getConnection()
+   {
+      if (trace)
+      {
+         log.trace("getConnection()");
+      }
+
+      return connection;
+   }
+
+   /**
+    * Returns the resolved JMS destination
+    * @return The destination; null before the destination is set up and after it is torn down
+    */
+   public Destination getDestination()
+   {
+      if (trace)
+      {
+         log.trace("getDestination()");
+      }
+
+      return destination;
+   }
+
+   /**
+    * Tells whether the destination was identified as a topic during destination setup
+    * @return True if the destination is a topic
+    */
+   public boolean isTopic()
+   {
+      if (trace)
+      {
+         log.trace("isTopic()");
+      }
+
+      return isTopic;
+   }
+   
+   /**
+    * Start the activation by scheduling the setup work on the resource adapter's work manager
+    * @throws ResourceException Thrown if the setup work cannot be scheduled
+    */
+   public void start() throws ResourceException
+   {
+      if (trace)
+         log.trace("start()");
+
+      // Mark delivery active before scheduling: the setup work may run and fail right away, and
+      // handleFailure() only retries while delivery is flagged active
+      deliveryActive.set(true);
+
+      boolean scheduled = false;
+      try
+      {
+         ra.getWorkManager().scheduleWork(new SetupActivation());
+         scheduled = true;
+      }
+      finally
+      {
+         // Nothing was scheduled, so the activation must not be reported as active
+         if (!scheduled)
+            deliveryActive.set(false);
+      }
+   }
+
+   /**
+    * Stop the activation: flag delivery as inactive, then tear everything down
+    */
+   public void stop()
+   {
+      if (trace)
+      {
+         log.trace("stop()");
+      }
+
+      deliveryActive.set(false);
+      teardown();
+   }
+
+   /**
+    * Handles any failure by tearing the activation down and trying to set it up again.
+    * The loop runs while delivery is active and the spec's reconnect attempts are not used up;
+    * only one caller at a time enters it, other callers return after logging the failure.
+    * @param failure The reason for the failure
+    */
+   public void handleFailure(Throwable failure)
+   {
+      log.warn("Failure in jms activation " + spec, failure);
+
+      // Only enter the failure loop once
+      if (inFailure.getAndSet(true))
+         return;
+
+      try
+      {
+         int reconnectCount = 0;
+         while (deliveryActive.get() && reconnectCount < spec.getReconnectAttempts())
+         {
+            teardown();
+
+            try
+            {
+               if (spec.getReconnectIntervalMillis() > 0)
+                  Thread.sleep(spec.getReconnectIntervalMillis());
+            }
+            catch (InterruptedException e)
+            {
+               log.debug("Interrupted trying to reconnect " + spec, e);
+               // Restore the interrupt status so the owner of this thread can still observe it
+               Thread.currentThread().interrupt();
+               break;
+            }
+
+            log.info("Attempting to reconnect " + spec);
+            try
+            {
+               setup();
+               log.info("Reconnected with messaging provider.");
+               break;
+            }
+            catch (Throwable t)
+            {
+               log.error("Unable to reconnect " + spec, t);
+            }
+            ++reconnectCount;
+         }
+      }
+      finally
+      {
+         // Leaving failure recovery loop
+         inFailure.set(false);
+      }
+   }
+
+   /**
+    * JMS exception listener callback; forwards the exception to the failure handling
+    * @param exception The reason for the failure
+    */
+   public void onException(JMSException exception)
+   {
+      if (trace)
+      {
+         log.trace("onException(" + exception + ")");
+      }
+
+      handleFailure(exception);
+   }
+
+
+   /**
+    * Setup the activation: resolve the destination, create the connection and start delivery
+    * @throws Exception Thrown if an error occurs
+    */
+   protected void setup() throws Exception
+   {
+      log.debug("Setting up " + spec);
+
+      Context ctx = new InitialContext();
+      try
+      {
+         // Inside the try block so the context is closed even if reading its environment fails
+         log.debug("Using context " + ctx.getEnvironment() + " for " + spec);
+         setupDestination(ctx);
+         setupConnection(ctx);
+      }
+      finally
+      {
+         // The naming context is only needed for the lookups above
+         ctx.close();
+      }
+      setupSessionPool();
+
+      log.debug("Setup complete " + this);
+   }
+   
+   /**
+    * Teardown the activation: stop delivery, close the connection and drop the destination
+    */
+   protected void teardown()
+   {
+      log.debug("Tearing down " + spec);
+
+      // Delivery is stopped before the connection is closed
+      teardownSessionPool();
+      teardownConnection();
+      teardownDestination();
+
+      log.debug("Tearing down complete " + this);
+   }
+   
+   /**
+    * Looks the destination up in the naming context and records whether it is a topic.
+    * With a destination type on the spec, the topic flag follows that type name; without
+    * one, it follows the type of the object that was looked up.
+    * @param ctx The naming context
+    * @throws Exception Thrown if an error occurs
+    */
+   protected void setupDestination(Context ctx) throws Exception
+   {
+      if (trace)
+      {
+         log.trace("setupDestination(" + ctx + ")");
+      }
+
+      String name = spec.getDestination();
+      String typeName = spec.getDestinationType();
+      boolean typeDeclared = typeName != null && typeName.trim().length() > 0;
+
+      Class<?> lookupType;
+      if (typeDeclared)
+      {
+         log.debug("Destination type defined as " + typeName);
+
+         // Anything other than the Topic interface name is treated as a queue
+         if (Topic.class.getName().equals(typeName))
+         {
+            lookupType = Topic.class;
+            isTopic = true;
+         }
+         else
+         {
+            lookupType = Queue.class;
+         }
+      }
+      else
+      {
+         log.debug("Destination type not defined");
+         lookupType = Destination.class;
+      }
+
+      log.debug("Retrieving destination " + name + " of type " + lookupType.getName());
+      destination = (Destination) Util.lookup(ctx, name, lookupType);
+
+      if (!typeDeclared && destination instanceof Topic)
+      {
+         isTopic = true;
+      }
+
+      log.debug("Got destination " + destination + " from " + name);
+   }
+   
+   /**
+    * Drops the reference to the resolved destination
+    */
+   protected void teardownDestination()
+   {
+      if (trace)
+      {
+         log.trace("teardownDestination()");
+      }
+
+      destination = null;
+   }
+   
+   /**
+    * Creates a topic or queue connection, depending on the topic flag, using the
+    * user, password and client id configured on the spec
+    * @param ctx the naming context
+    * @throws Exception for any error
+    */
+   protected void setupConnection(Context ctx) throws Exception
+   {
+      log.debug("Setup connection " + this);
+
+      if (isTopic)
+      {
+         connection = setupTopicConnection(ctx, spec.getUser(), spec.getPassword(), spec.getClientId());
+      }
+      else
+      {
+         connection = setupQueueConnection(ctx, spec.getUser(), spec.getPassword(), spec.getClientId());
+      }
+
+      log.debug("Established connection " + this);
+   }
+   
+   /**
+    * Creates a queue connection from the resource adapter's connection factory and
+    * configures it with the client id and this activation as exception listener.
+    * The connection is closed again if configuring it fails.
+    * @param ctx The naming context
+    * @param user The user; when null the connection is created without credentials
+    * @param pass The password
+    * @param clientID The client id; when null no client id is set
+    * @return The connection
+    * @throws Exception Thrown if an error occurs
+    */
+   protected QueueConnection setupQueueConnection(Context ctx, String user, String pass, String clientID) throws Exception
+   {
+      if (trace)
+      {
+         log.trace("setupQueueConnection(" + ctx + ", " + user + ", ****, " + clientID + ")");
+      }
+
+      JBossConnectionFactory factory = ra.getJBossConnectionFactory();
+      boolean withCredentials = user != null;
+
+      // XA connections are used when the endpoint delivers transacted
+      QueueConnection queueConnection;
+      if (isDeliveryTransacted && withCredentials)
+      {
+         queueConnection = factory.createXAQueueConnection(user, pass);
+      }
+      else if (isDeliveryTransacted)
+      {
+         queueConnection = factory.createXAQueueConnection();
+      }
+      else if (withCredentials)
+      {
+         queueConnection = factory.createQueueConnection(user, pass);
+      }
+      else
+      {
+         queueConnection = factory.createQueueConnection();
+      }
+
+      try
+      {
+         if (clientID != null)
+         {
+            queueConnection.setClientID(clientID);
+         }
+
+         queueConnection.setExceptionListener(this);
+
+         log.debug("Using queue connection " + queueConnection);
+
+         return queueConnection;
+      }
+      catch (Throwable t)
+      {
+         // Do not leave a half configured connection open
+         try
+         {
+            if (queueConnection != null)
+            {
+               queueConnection.close();
+            }
+         }
+         catch (Exception e)
+         {
+            log.trace("Ignored error closing connection", e);
+         }
+
+         if (t instanceof Exception)
+         {
+            throw (Exception) t;
+         }
+         throw new RuntimeException("Error configuring connection", t);
+      }
+   }
+   
+   /**
+    * Creates a topic connection from the resource adapter's connection factory and
+    * configures it with the client id and this activation as exception listener.
+    * The connection is closed again if configuring it fails.
+    * @param ctx The naming context
+    * @param user The user; when null the connection is created without credentials
+    * @param pass The password
+    * @param clientID The client id; when null no client id is set
+    * @return The connection
+    * @throws Exception Thrown if an error occurs
+    */
+   protected TopicConnection setupTopicConnection(Context ctx, String user, String pass, String clientID) throws Exception
+   {
+      if (trace)
+      {
+         log.trace("setupTopicConnection(" + ctx + ", " + user + ", ****, " + clientID + ")");
+      }
+
+      JBossConnectionFactory factory = ra.getJBossConnectionFactory();
+      boolean withCredentials = user != null;
+
+      // XA connections are used when the endpoint delivers transacted
+      TopicConnection topicConnection;
+      if (isDeliveryTransacted && withCredentials)
+      {
+         topicConnection = factory.createXATopicConnection(user, pass);
+      }
+      else if (isDeliveryTransacted)
+      {
+         topicConnection = factory.createXATopicConnection();
+      }
+      else if (withCredentials)
+      {
+         topicConnection = factory.createTopicConnection(user, pass);
+      }
+      else
+      {
+         topicConnection = factory.createTopicConnection();
+      }
+
+      try
+      {
+         if (clientID != null)
+         {
+            topicConnection.setClientID(clientID);
+         }
+
+         topicConnection.setExceptionListener(this);
+
+         log.debug("Using topic connection " + topicConnection);
+
+         return topicConnection;
+      }
+      catch (Throwable t)
+      {
+         // Do not leave a half configured connection open
+         try
+         {
+            if (topicConnection != null)
+            {
+               topicConnection.close();
+            }
+         }
+         catch (Exception e)
+         {
+            log.trace("Ignored error closing connection", e);
+         }
+
+         if (t instanceof Exception)
+         {
+            throw (Exception) t;
+         }
+         throw new RuntimeException("Error configuring connection", t);
+      }
+   }
+   
+   /**
+    * Closes the connection, if there is one, and forgets it; close errors are only logged
+    */
+   protected void teardownConnection()
+   {
+      if (trace)
+      {
+         log.trace("teardownConnection()");
+      }
+
+      if (connection != null)
+      {
+         try
+         {
+            log.debug("Closing the " + connection);
+            connection.close();
+         }
+         catch (Throwable t)
+         {
+            log.debug("Error closing the connection " + connection, t);
+         }
+      }
+      connection = null;
+   }
+   
+   /**
+    * Setup the server session pool. As written, this only starts delivery on the
+    * connection; no sessions are created in this method.
+    * @throws Exception for any error
+    */
+   protected void setupSessionPool() throws Exception
+   {
+      log.debug("Starting delivery " + connection);
+      connection.start();
+      log.debug("Started delivery " + connection);
+   }
+   
+   /**
+    * Stops delivery on the connection, if there is one; errors are only logged
+    */
+   protected void teardownSessionPool()
+   {
+      if (connection == null)
+      {
+         return;
+      }
+
+      try
+      {
+         log.debug("Stopping delivery " + connection);
+         connection.stop();
+      }
+      catch (Throwable t)
+      {
+         log.debug("Error stopping delivery " + connection, t);
+      }
+   }
+
+   /**
+    * Work item that runs the activation setup; it is scheduled from start()
+    */
+   private class SetupActivation implements Work
+   {
+      /**
+       * Runs the setup and hands any failure to the reconnect handling
+       */
+      public void run()
+      {
+         try
+         {
+            setup();
+         }
+         catch (Throwable t)
+         {
+            handleFailure(t);
+         }
+      }
+
+      /**
+       * Release the work; intentionally does nothing
+       */
+      public void release()
+      {
+      }
+   }
+
+   /**
+    * Builds a description from the spec and endpoint factory class names, the delivery
+    * state, the destination and connection (when set) and the transacted flag
+    * @return The description
+    */
+   public String toString()
+   {
+      StringBuilder text = new StringBuilder(JBMActivation.class.getName());
+      text.append('(');
+      text.append("spec=").append(spec.getClass().getName());
+      text.append(" mepf=").append(endpointFactory.getClass().getName());
+      text.append(" active=").append(deliveryActive.get());
+      if (destination != null)
+      {
+         text.append(" destination=").append(destination);
+      }
+      if (connection != null)
+      {
+         text.append(" connection=").append(connection);
+      }
+      text.append(" transacted=").append(isDeliveryTransacted);
+      text.append(')');
+      return text.toString();
+   }
+}

Added: trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivationSpec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivationSpec.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivationSpec.java	2009-02-05 19:47:14 UTC (rev 5826)
@@ -0,0 +1,899 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.ra.inflow;
+
+import org.jboss.messaging.ra.JBMResourceAdapter;
+
+import javax.jms.Session;
+import javax.resource.ResourceException;
+import javax.resource.spi.ActivationSpec;
+import javax.resource.spi.InvalidPropertyException;
+import javax.resource.spi.ResourceAdapter;
+
+import org.jboss.messaging.core.logging.Logger;
+
+/**
+ * The activation spec
+ * 
+ * @author <a href="adrian at jboss.com">Adrian Brock</a>
+ * @author <a href="jesper.pedersen at jboss.org">Jesper Pedersen</a>
+ * @version $Revision: $
+ */
+public class JBMActivationSpec implements ActivationSpec
+{
+   /** The logger */
+   private static final Logger log = Logger.getLogger(JBMActivationSpec.class);
+   
+   /** Whether trace is enabled */
+   private static boolean trace = log.isTraceEnabled();
+
+   /** The resource adapter */
+   private JBMResourceAdapter ra;
+
+   /** The destination */
+   private String destination;
+
+   /** The destination type */
+   private String destinationType;
+
+   /** The message selector */
+   private String messageSelector;
+
+   /** The acknowledgement mode */
+   private int acknowledgeMode;
+
+   /** The subscription durability */
+   private boolean subscriptionDurability;
+
+   /** The subscription name */
+   private String subscriptionName;
+
+   /** The client id */
+   private String clientId;
+
+   /** The reconnect interval in seconds */
+   private Long reconnectInterval;
+
+   /** The user */
+   private String user;
+
+   /** The password */
+   private String password;
+
+   /** The maximum number of messages */
+   private Integer maxMessages;
+
+   /** The minimum number of sessions */
+   private Integer minSession;
+
+   /** The maximum number of sessions */
+   private Integer maxSession;
+
+   /** The keep alive time for sessions */
+   private Long keepAlive;
+
+   /** Is the session transacted */
+   private Boolean sessionTransacted;
+
+   /** The number of reconnection attempts */
+   private Integer reconnectAttempts;
+
+   /** Unspecified redelivery */
+   private Boolean redeliverUnspecified;
+   
+   /** Transaction timeout */
+   private Integer transactionTimeout;
+   
+   /** Is same RM override */
+   private Boolean isSameRMOverrideValue;
+   
+   /** Force clear on shutdown */
+   private Boolean forceClearOnShutdown;
+   
+   /** Force clear interval */
+   private Long forceClearOnShutdownInterval;
+   
+   /** Force clear attempts */
+   private Integer forceClearAttempts;
+   
+   /**
+    * Creates an activation spec populated with its default values
+    */
+   public JBMActivationSpec()
+   {
+      if (trace)
+      {
+         log.trace("constructor()");
+      }
+
+      // Nothing is bound until the container configures the spec
+      ra = null;
+      destination = null;
+      destinationType = null;
+      messageSelector = null;
+      subscriptionName = null;
+      clientId = null;
+      user = null;
+      password = null;
+      transactionTimeout = null;
+      isSameRMOverrideValue = null;
+
+      // Acknowledgement and subscription defaults
+      acknowledgeMode = Session.SESSION_TRANSACTED;
+      sessionTransacted = Boolean.TRUE;
+      subscriptionDurability = false;
+      redeliverUnspecified = Boolean.TRUE;
+
+      // Session and message limits
+      maxMessages = Integer.valueOf(1);
+      minSession = Integer.valueOf(1);
+      maxSession = Integer.valueOf(15);
+      keepAlive = Long.valueOf(60000);
+
+      // Reconnect behaviour
+      reconnectInterval = Long.valueOf(10);
+      reconnectAttempts = Integer.valueOf(5);
+
+      // Shutdown behaviour
+      forceClearOnShutdown = Boolean.FALSE;
+      forceClearOnShutdownInterval = Long.valueOf(1000);
+      forceClearAttempts = Integer.valueOf(0);
+   }
+
+   /**
+    * Returns the resource adapter this spec is associated with
+    * @return The resource adapter; null until one has been set
+    */
+   public ResourceAdapter getResourceAdapter()
+   {
+      if (trace)
+      {
+         log.trace("getResourceAdapter()");
+      }
+
+      return ra;
+   }
+
+   /**
+    * Associates this spec with a resource adapter, which must be a {@link JBMResourceAdapter}
+    * @param ra The resource adapter
+    * @exception ResourceException Thrown if the adapter is null or of another type
+    */
+   public void setResourceAdapter(ResourceAdapter ra) throws ResourceException
+   {
+      if (trace)
+      {
+         log.trace("setResourceAdapter(" + ra + ")");
+      }
+
+      // instanceof is false for null, so this also rejects a missing adapter
+      if (ra instanceof JBMResourceAdapter)
+      {
+         this.ra = (JBMResourceAdapter) ra;
+         return;
+      }
+
+      throw new ResourceException("Resource adapter is " + ra);
+   }
+
+   /**
+    * Returns the configured destination name
+    * @return The destination
+    */
+   public String getDestination()
+   {
+      if (trace)
+      {
+         log.trace("getDestination()");
+      }
+
+      return destination;
+   }
+
+   /**
+    * Stores the destination name
+    * @param value The destination
+    */
+   public void setDestination(String value)
+   {
+      if (trace)
+      {
+         log.trace("setDestination(" + value + ")");
+      }
+
+      destination = value;
+   }
+
+   /**
+    * Returns the configured destination type
+    * @return The destination type
+    */
+   public String getDestinationType()
+   {
+      if (trace)
+      {
+         log.trace("getDestinationType()");
+      }
+
+      return destinationType;
+   }
+
+   /**
+    * Set the destination type
+    * @param value The destination type
+    */
+   public void setDestinationType(String value)
+   {
+      if (trace)
+         log.trace("setDestinationType(" + value + ")");
+
+      // Store the argument; assigning the field to itself would drop the configured value
+      this.destinationType = value;
+   }
+
+   /**
+    * Returns the configured message selector
+    * @return The message selector
+    */
+   public String getMessageSelector()
+   {
+      if (trace)
+      {
+         log.trace("getMessageSelector()");
+      }
+
+      return messageSelector;
+   }
+
+   /**
+    * Set the message selector
+    * @param value The message selector
+    */
+   public void setMessageSelector(String value)
+   {
+      if (trace)
+         log.trace("setMessageSelector(" + value + ")");
+
+      // Store the argument; assigning the field to itself would drop the configured value
+      this.messageSelector = value;
+   }
+
+   /**
+    * Returns the acknowledge mode as text. A transacted session always reports
+    * "Transacted"; otherwise the stored mode decides between dups-ok and auto acknowledge.
+    * @return The acknowledge mode name
+    */
+   public String getAcknowledgeMode()
+   {
+      if (trace)
+      {
+         log.trace("getAcknowledgeMode()");
+      }
+
+      if (sessionTransacted.booleanValue())
+      {
+         return "Transacted";
+      }
+
+      return (Session.DUPS_OK_ACKNOWLEDGE == acknowledgeMode) ? "Dups-ok-acknowledge" : "Auto-acknowledge";
+   }
+
+   /**
+    * Parses the acknowledge mode from its textual form
+    * @param value One of DUPS_OK_ACKNOWLEDGE, Dups-ok-acknowledge, AUTO_ACKNOWLEDGE,
+    *              Auto-acknowledge or SESSION_TRANSACTED; anything else raises
+    *              an IllegalArgumentException
+    */
+   public void setAcknowledgeMode(String value)
+   {
+      if (trace)
+      {
+         log.trace("setAcknowledgeMode(" + value + ")");
+      }
+
+      if ("DUPS_OK_ACKNOWLEDGE".equals(value) || "Dups-ok-acknowledge".equals(value))
+      {
+         acknowledgeMode = Session.DUPS_OK_ACKNOWLEDGE;
+         return;
+      }
+      if ("AUTO_ACKNOWLEDGE".equals(value) || "Auto-acknowledge".equals(value))
+      {
+         acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+         return;
+      }
+      if ("SESSION_TRANSACTED".equals(value))
+      {
+         acknowledgeMode = Session.SESSION_TRANSACTED;
+         return;
+      }
+
+      throw new IllegalArgumentException("Unsupported acknowledgement mode " + value);
+   }
+
+   /**
+    * @return the acknowledgement mode
+    */
+   public int getAcknowledgeModeInt()
+   {
+      // Trace under this method's own name so the log can be told apart from getAcknowledgeMode()
+      if (trace)
+         log.trace("getAcknowledgeModeInt()");
+
+      // sessionTransacted is a Boolean and may be null; treat null as "not transacted"
+      // (as isSessionTransacted() does) instead of failing while unboxing it
+      if (sessionTransacted != null && sessionTransacted.booleanValue())
+         return Session.SESSION_TRANSACTED;
+
+      return acknowledgeMode;
+   }
+
+   /**
+    * Get the subscription durability
+    * @return The value
+    */
+   public String getSubscriptionDurability()
+   {
+      if (trace)
+      {
+         log.trace("getSubscriptionDurability()");
+      }
+
+      // Map the boolean flag onto its textual form
+      return subscriptionDurability ? "Durable" : "NonDurable";
+   }
+
+   /**
+    * Set the subscription durability
+    * @param value The value
+    */
+   public void setSubscriptionDurability(String value)
+   {
+      if (trace)
+         log.trace("setSubscriptionDurability(" + value + ")");
+
+      // Compare the supplied string, not the boolean field, which can never equal "Durable";
+      // any value other than "Durable" (including null) yields non-durable
+      this.subscriptionDurability = "Durable".equals(value);
+   }
+
+   /**
+    * Get the status of subscription durability
+    * @return The value
+    */
+   public boolean isSubscriptionDurable()
+   {
+      // Emit a trace entry only when the trace flag is set
+      if (trace)
+      {
+         log.trace("isSubscriptionDurable()");
+      }
+
+      return this.subscriptionDurability;
+   }
+
+   /**
+    * Get the subscription name
+    * @return The value
+    */
+   public String getSubscriptionName()
+   {
+      // Emit a trace entry only when the trace flag is set
+      if (trace)
+      {
+         log.trace("getSubscriptionName()");
+      }
+
+      return this.subscriptionName;
+   }
+
+   /**
+    * Set the subscription name
+    * @param value The value
+    */
+   public void setSubscriptionName(String value)
+   {
+      // The new value is included in the trace entry
+      if (trace)
+      {
+         log.trace("setSubscriptionName(" + value + ")");
+      }
+
+      subscriptionName = value;
+   }
+
+   /**
+    * Get the client id
+    * @return The value
+    */
+   public String getClientId()
+   {
+      // Emit a trace entry only when the trace flag is set
+      if (trace)
+      {
+         log.trace("getClientId()");
+      }
+
+      return this.clientId;
+   }
+
+   /**
+    * Set the client id
+    * @param value The value
+    */
+   public void setClientId(String value)
+   {
+      // The new value is included in the trace entry
+      if (trace)
+      {
+         log.trace("setClientId(" + value + ")");
+      }
+
+      clientId = value;
+   }
+
+   /**
+    * Get the reconnection interval
+    * @return The value
+    */
+   public Long getReconnectInterval()
+   {
+      // Emit a trace entry only when the trace flag is set
+      if (trace)
+      {
+         log.trace("getReconnectInterval()");
+      }
+
+      return this.reconnectInterval;
+   }
+
+   /**
+    * Set the reconnection interval
+    * @param value The value
+    */
+   public void setReconnectInterval(Long value)
+   {
+      // The new value is included in the trace entry
+      if (trace)
+      {
+         log.trace("setReconnectInterval(" + value + ")");
+      }
+
+      reconnectInterval = value;
+   }
+
+   /**
+    * Get the reconnection interval in milliseconds
+    * @return The value; if 0 == disable
+    */
+   public long getReconnectIntervalMillis()
+   {
+      if (trace)
+      {
+         log.trace("getReconnectIntervalMillis()");
+      }
+
+      // An unset interval yields 0; otherwise the stored value is multiplied by 1000
+      return (reconnectInterval == null) ? 0L : reconnectInterval.longValue() * 1000L;
+   }
+
+   /**
+    * Get the user
+    * @return The value
+    */
+   public String getUser()
+   {
+      // Emit a trace entry only when the trace flag is set
+      if (trace)
+      {
+         log.trace("getUser()");
+      }
+
+      return this.user;
+   }
+
+   /**
+    * Set the user
+    * @param value The value
+    */
+   public void setUser(String value)
+   {
+      // The new value is included in the trace entry
+      if (trace)
+      {
+         log.trace("setUser(" + value + ")");
+      }
+
+      user = value;
+   }
+
+   /**
+    * Get the password
+    * @return The value
+    */
+   public String getPassword()
+   {
+      // Only the call itself is traced
+      if (trace)
+      {
+         log.trace("getPassword()");
+      }
+
+      return this.password;
+   }
+
+   /**
+    * Set the password
+    * @param value The value
+    */
+   public void setPassword(String value)
+   {
+      // Never write the clear-text password to the log; mask it the same way toString() does
+      if (trace)
+         log.trace("setPassword(****)");
+
+      this.password = value;
+   }
+
+   /**
+    * Get the number of max messages
+    * @return The value
+    */
+   public Integer getMaxMessages()
+   {
+      // Emit a trace entry only when the trace flag is set
+      if (trace)
+      {
+         log.trace("getMaxMessages()");
+      }
+
+      return this.maxMessages;
+   }
+
+   /**
+    * Set the number of max messages
+    * @param value The value
+    */
+   public void setMaxMessages(Integer value)
+   {
+      if (trace)
+         log.trace("setMaxMessages(" + value + ")");
+
+      // Store the supplied value; assigning the field to itself would silently discard it
+      this.maxMessages = value;
+   }
+
+   /**
+    * Get the number of max messages
+    * @return The value
+    */
+   public int getMaxMessagesInt()
+   {
+      if (trace)
+      {
+         log.trace("getMaxMessagesInt()");
+      }
+
+      // An unset value is reported as 0
+      return (maxMessages == null) ? 0 : maxMessages.intValue();
+   }
+
+   /**
+    * Get the number of min session
+    * @return The value
+    */
+   public Integer getMinSession()
+   {
+      // Emit a trace entry only when the trace flag is set
+      if (trace)
+      {
+         log.trace("getMinSession()");
+      }
+
+      return this.minSession;
+   }
+
+   /**
+    * Set the number of min session
+    * @param value The value
+    */
+   public void setMinSession(Integer value)
+   {
+      // The new value is included in the trace entry
+      if (trace)
+      {
+         log.trace("setMinSession(" + value + ")");
+      }
+
+      minSession = value;
+   }
+
+   /**
+    * Get the number of min session
+    * @return The value
+    */
+   public int getMinSessionInt()
+   {
+      if (trace)
+      {
+         log.trace("getMinSessionInt()");
+      }
+
+      // An unset value is reported as 0
+      return (minSession == null) ? 0 : minSession.intValue();
+   }
+
+   /**
+    * Get the number of max session
+    * @return The value
+    */
+   public Integer getMaxSession()
+   {
+      // Emit a trace entry only when the trace flag is set
+      if (trace)
+      {
+         log.trace("getMaxSession()");
+      }
+
+      return this.maxSession;
+   }
+
+   /**
+    * Set the number of max session
+    * @param value The value
+    */
+   public void setMaxSession(Integer value)
+   {
+      // The new value is included in the trace entry
+      if (trace)
+      {
+         log.trace("setMaxSession(" + value + ")");
+      }
+
+      this.maxSession = value;
+   }
+
+   /**
+    * Get the number of max session
+    * @return The value
+    */
+   public int getMaxSessionInt()
+   {
+      if (trace)
+      {
+         log.trace("getMaxSessionInt()");
+      }
+
+      // An unset value is reported as 0
+      return (maxSession == null) ? 0 : maxSession.intValue();
+   }
+
+   /**
+    * Get the keep alive
+    * @return The value
+    */
+   public Long getKeepAlive()
+   {
+      // Emit a trace entry only when the trace flag is set
+      if (trace)
+      {
+         log.trace("getKeepAlive()");
+      }
+
+      return this.keepAlive;
+   }
+
+   /**
+    * Set the keep alive
+    * @param value The value
+    */
+   public void setKeepAlive(Long value)
+   {
+      // The new value is included in the trace entry
+      if (trace)
+      {
+         log.trace("setKeepAlive(" + value + ")");
+      }
+
+      keepAlive = value;
+   }
+
+   /**
+    * Get the keep alive
+    * @return The value
+    */
+   public long getKeepAliveLong()
+   {
+      if (trace)
+      {
+         log.trace("getKeepAliveLong()");
+      }
+
+      // An unset value is reported as 0
+      return (keepAlive == null) ? 0L : keepAlive.longValue();
+   }
+
+   /**
+    * Get the session transacted
+    * @return The value
+    */
+   public Boolean getSessionTransacted()
+   {
+      // Emit a trace entry only when the trace flag is set
+      if (trace)
+      {
+         log.trace("getSessionTransacted()");
+      }
+
+      return this.sessionTransacted;
+   }
+
+   /**
+    * Set the session transacted
+    * @param value The value
+    */
+   public void setSessionTransacted(Boolean value)
+   {
+      // Trace under this method's own name; the previous text named setTransactionTimeout
+      if (trace)
+         log.trace("setSessionTransacted(" + value + ")");
+
+      this.sessionTransacted = value;
+   }
+
+   /**
+    * Get the session transacted
+    * @return The value
+    */
+   public boolean isSessionTransacted()
+   {
+      if (trace)
+      {
+         log.trace("isSessionTransacted()");
+      }
+
+      // A null flag counts as not transacted
+      return sessionTransacted != null && sessionTransacted.booleanValue();
+   }
+
+   /**
+    * Get the reconnect attempts
+    * @return The value
+    */
+   public Integer getReconnectAttempts()
+   {
+      // Emit a trace entry only when the trace flag is set
+      if (trace)
+      {
+         log.trace("getReconnectAttempts()");
+      }
+
+      return this.reconnectAttempts;
+   }
+
+   /**
+    * Set the reconnect attempts
+    * @param value The value
+    */
+   public void setReconnectAttempts(Integer value)
+   {
+      // The new value is included in the trace entry
+      if (trace)
+      {
+         log.trace("setReconnectAttempts(" + value + ")");
+      }
+
+      reconnectAttempts = value;
+   }
+
+   /**
+    * Get the redeliver unspecified
+    * @return The value
+    */
+   public Boolean getRedeliverUnspecified()
+   {
+      // Emit a trace entry only when the trace flag is set
+      if (trace)
+      {
+         log.trace("getRedeliverUnspecified()");
+      }
+
+      return this.redeliverUnspecified;
+   }
+
+   /**
+    * Set the redeliver unspecified
+    * @param value The value
+    */
+   public void setRedeliverUnspecified(Boolean value)
+   {
+      // The new value is included in the trace entry
+      if (trace)
+      {
+         log.trace("setRedeliverUnspecified(" + value + ")");
+      }
+
+      redeliverUnspecified = value;
+   }
+   
+   /**
+    * Get the transaction timeout
+    * @return The value
+    */
+   public Integer getTransactionTimeout()
+   {
+      // Emit a trace entry only when the trace flag is set
+      if (trace)
+      {
+         log.trace("getTransactionTimeout()");
+      }
+
+      return this.transactionTimeout;
+   }
+
+   /**
+    * Set the transaction timeout
+    * @param value The value
+    */
+   public void setTransactionTimeout(Integer value)
+   {
+      // The new value is included in the trace entry
+      if (trace)
+      {
+         log.trace("setTransactionTimeout(" + value + ")");
+      }
+
+      this.transactionTimeout = value;
+   }
+   
+   /**
+    * Get the is same rm override
+    * @return The value
+    */
+   public Boolean getIsSameRMOverrideValue()
+   {
+      // Emit a trace entry only when the trace flag is set
+      if (trace)
+      {
+         log.trace("getIsSameRMOverrideValue()");
+      }
+
+      return this.isSameRMOverrideValue;
+   }
+
+   /**
+    * Set the is same RM override
+    * @param value The value
+    */
+   public void setIsSameRMOverrideValue(Boolean value)
+   {
+      // The new value is included in the trace entry
+      if (trace)
+      {
+         log.trace("setIsSameRMOverrideValue(" + value + ")");
+      }
+
+      this.isSameRMOverrideValue = value;
+   }
+
+   /**
+    * Get force clear on shutdown
+    * @return The value
+    */
+   public Boolean getForceClearOnShutdown()
+   {
+      // Emit a trace entry only when the trace flag is set
+      if (trace)
+      {
+         log.trace("getForceClearOnShutdown()");
+      }
+
+      return this.forceClearOnShutdown;
+   }
+
+   /**
+    * Set the force clear on shutdown
+    * @param value The value
+    */
+   public void setForceClearOnShutdown(Boolean value)
+   {
+      // The new value is included in the trace entry
+      if (trace)
+      {
+         log.trace("setForceClearOnShutdown(" + value + ")");
+      }
+
+      forceClearOnShutdown = value;
+   }
+   
+   /**
+    * Get force clear on shutdown
+    * @return The value
+    */
+   public boolean isForceClearOnShutdown()
+   {
+      if (trace)
+      {
+         log.trace("isForceClearOnShutdown()");
+      }
+
+      // A null flag counts as disabled
+      return forceClearOnShutdown != null && forceClearOnShutdown.booleanValue();
+   }
+
+   /**
+    * Get force clear on shutdown interval
+    * @return The value
+    */
+   public Long getForceClearOnShutdownInterval()
+   {
+      // Emit a trace entry only when the trace flag is set
+      if (trace)
+      {
+         log.trace("getForceClearOnShutdownInterval()");
+      }
+
+      return this.forceClearOnShutdownInterval;
+   }
+   
+   /**
+    * Set the force clear on shutdown interval
+    * @param value The value
+    */
+   public void setForceClearOnShutdownInterval(Long value)
+   {
+      // The new value is included in the trace entry
+      if (trace)
+      {
+         log.trace("setForceClearOnShutdownInterval(" + value + ")");
+      }
+
+      this.forceClearOnShutdownInterval = value;
+   }
+   
+   /**
+    * Get force clear attempts
+    * @return The value
+    */
+   public Integer getForceClearAttempts()
+   {
+      // Emit a trace entry only when the trace flag is set
+      if (trace)
+      {
+         log.trace("getForceClearAttempts()");
+      }
+
+      return this.forceClearAttempts;
+   }
+   
+   /**
+    * Set the force clear attempts
+    * @param value The value
+    */
+   public void setForceClearAttempts(Integer value)
+   {
+      // The new value is included in the trace entry
+      if (trace)
+      {
+         log.trace("setForceClearAttempts(" + value + ")");
+      }
+
+      this.forceClearAttempts = value;
+   }
+
+   /**
+    * Validate
+    * @exception InvalidPropertyException Thrown if a validation exception occurs
+    */
+   public void validate() throws InvalidPropertyException
+   {
+      if (trace)
+      {
+         log.trace("validate()");
+      }
+
+      // A null, empty or whitespace-only destination is rejected
+      boolean missingDestination = destination == null || destination.trim().equals("");
+      if (missingDestination)
+      {
+         throw new InvalidPropertyException("Destination is mandatory");
+      }
+   }
+
+   /**
+    * Get a string representation
+    * @return The value
+    */
+   public String toString()
+   {
+      // The buffer never leaves this method, so the unsynchronized StringBuilder is sufficient
+      StringBuilder buffer = new StringBuilder();
+      buffer.append(JBMActivationSpec.class.getName()).append('(');
+      buffer.append("ra=").append(ra);
+      buffer.append(" destination=").append(destination);
+      buffer.append(" destinationType=").append(destinationType);
+      if (messageSelector != null)
+         buffer.append(" selector=").append(messageSelector);
+      buffer.append(" tx=").append(sessionTransacted);
+      // sessionTransacted is a Boolean; comparing it with == false would unbox it and
+      // throw a NullPointerException when it is null. The ack mode is only appended
+      // when the flag is explicitly false.
+      if (Boolean.FALSE.equals(sessionTransacted))
+         buffer.append(" ack=").append(getAcknowledgeMode());
+      buffer.append(" durable=").append(subscriptionDurability);
+      if (clientId != null)
+         buffer.append(" clientID=").append(clientId);
+      if (subscriptionName != null)
+         buffer.append(" subscription=").append(subscriptionName);
+      buffer.append(" reconnect=").append(reconnectInterval);
+      buffer.append(" user=").append(user);
+      // The password itself is never printed, only a mask indicating that one is set
+      if (password != null)
+         buffer.append(" password=").append("****");
+      buffer.append(" maxMessages=").append(maxMessages);
+      buffer.append(" minSession=").append(minSession);
+      buffer.append(" maxSession=").append(maxSession);
+      buffer.append(" keepAlive=").append(keepAlive);
+      buffer.append(')');
+      return buffer.toString();
+   }
+}




More information about the jboss-cvs-commits mailing list