[jboss-cvs] JBoss Messaging SVN: r5955 - in trunk: src/main/org/jboss/messaging/core/server/impl and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Mar 2 07:00:26 EST 2009


Author: ataylor
Date: 2009-03-02 07:00:26 -0500 (Mon, 02 Mar 2009)
New Revision: 5955

Modified:
   trunk/build-thirdparty.xml
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
   trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
   trunk/src/main/org/jboss/messaging/integration/security/JBossASSecurityManager.java
   trunk/src/main/org/jboss/messaging/integration/security/SecurityActions.java
   trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
   trunk/src/main/org/jboss/messaging/ra/JBMResourceAdapter.java
   trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java
   trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java
Log:
Changed the resource adapter (RA) to use the core client API; also includes application server (AS) integration fixes.

Modified: trunk/build-thirdparty.xml
===================================================================
--- trunk/build-thirdparty.xml	2009-03-02 11:54:04 UTC (rev 5954)
+++ trunk/build-thirdparty.xml	2009-03-02 12:00:26 UTC (rev 5955)
@@ -92,8 +92,8 @@
       -->
 
       <componentref name="jboss/integration" version="5.0.0.Beta4"/>
-      <componentref name="jboss/jboss-security-spi" version="2.0.1.GA"/>
-      <componentref name="jboss/jbosssx-client" version="2.0.1.GA"/>
+      <componentref name="jboss/jboss-security-spi" version="2.0.2-SNAPSHOT"/>
+      <componentref name="jboss/jbosssx-client" version="2.0.2-SNAPSHOT"/>
       <componentref name="jboss/jboss-jaspi-api" version="1.0-BETA1"/>
 
       <!--

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-03-02 11:54:04 UTC (rev 5954)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-03-02 12:00:26 UTC (rev 5955)
@@ -1891,7 +1891,7 @@
                }
                else
                {
-                  theTx.commit();
+                  theTx.commit(packet.isOnePhase());
 
                   response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
                }

Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java	2009-03-02 11:54:04 UTC (rev 5954)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java	2009-03-02 12:00:26 UTC (rev 5955)
@@ -38,6 +38,8 @@
 
    void commit() throws Exception;
 
+   void commit(boolean onePhase) throws Exception;
+
    void rollback() throws Exception;
 
    int getOperationsCount();

Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2009-03-02 11:54:04 UTC (rev 5954)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2009-03-02 12:00:26 UTC (rev 5955)
@@ -145,6 +145,11 @@
 
    public void commit() throws Exception
    {           
+      commit(true);
+   }
+
+   public void commit(boolean onePhase) throws Exception
+   {
       synchronized (timeoutLock)
       {
          if (state == State.ROLLBACK_ONLY)
@@ -159,9 +164,16 @@
                return;
             }
          }
-         
+
          if (xid != null)
          {
+            if (onePhase)
+            {
+               if(state == State.ACTIVE)
+               {
+                  prepare();
+               }
+            }
             if (state != State.PREPARED)
             {
                throw new IllegalStateException("Transaction is in invalid state " + state);
@@ -174,7 +186,7 @@
                throw new IllegalStateException("Transaction is in invalid state " + state);
             }
          }
-         
+
          if (operations != null)
          {
             for (TransactionOperation operation : operations)
@@ -196,7 +208,7 @@
             {
                operation.afterCommit(this);
             }
-         }                  
+         }
       }
    }
 

Modified: trunk/src/main/org/jboss/messaging/integration/security/JBossASSecurityManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/security/JBossASSecurityManager.java	2009-03-02 11:54:04 UTC (rev 5954)
+++ trunk/src/main/org/jboss/messaging/integration/security/JBossASSecurityManager.java	2009-03-02 12:00:26 UTC (rev 5955)
@@ -24,6 +24,7 @@
 
 import java.util.HashSet;
 import java.util.Set;
+import java.security.Principal;
 
 import javax.naming.InitialContext;
 import javax.security.auth.Subject;
@@ -107,8 +108,8 @@
       // security my be screwed up, on account of thread local security stack being corrupted.
       if (authenticated)
       {
-         SecurityActions.pushSubjectContext(principal, passwordChars, subject);
-         Set<SimplePrincipal> rolePrincipals = getRolePrincipals(checkType, roles);
+         SecurityActions.pushSubjectContext(principal, passwordChars, subject, securityDomainName);
+         Set<Principal> rolePrincipals = getRolePrincipals(checkType, roles);
 
          authenticated = realmMapping.doesUserHaveRole(principal, rolePrincipals);
 
@@ -121,9 +122,9 @@
       return authenticated;
    }
 
-   private Set<SimplePrincipal> getRolePrincipals(final CheckType checkType, final Set<Role> roles)
+   private Set<Principal> getRolePrincipals(final CheckType checkType, final Set<Role> roles)
    {
-      Set<SimplePrincipal> principals = new HashSet<SimplePrincipal>();
+      Set<Principal> principals = new HashSet<Principal>();
       for (Role role : roles)
       {
          if ((checkType.equals(CheckType.CREATE) && role.isCheckType(CheckType.CREATE)) ||

Modified: trunk/src/main/org/jboss/messaging/integration/security/SecurityActions.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/security/SecurityActions.java	2009-03-02 11:54:04 UTC (rev 5954)
+++ trunk/src/main/org/jboss/messaging/integration/security/SecurityActions.java	2009-03-02 12:00:26 UTC (rev 5955)
@@ -29,6 +29,9 @@
 import javax.security.auth.Subject;
 
 import org.jboss.security.SecurityAssociation;
+import org.jboss.security.SecurityContextAssociation;
+import org.jboss.security.SecurityContextFactory;
+import org.jboss.security.SecurityContext;
 
 
 /** A collection of privileged actions for this package
@@ -39,65 +42,101 @@
  */
 class SecurityActions
 {
-	interface PrincipalInfoAction
-	{
-		PrincipalInfoAction PRIVILEGED = new PrincipalInfoAction()
-		{
-			public void push(final Principal principal, final Object credential,
-					           final Subject subject)
-			{
-				AccessController.doPrivileged(
-						new PrivilegedAction()
-						{
-							public Object run()
-							{
-								SecurityAssociation.pushSubjectContext(subject, principal, credential);
-								return null;
-							}
-						}
-				);
-			}
-			public void pop()
-			{
-				AccessController.doPrivileged(
-						new PrivilegedAction()
-						{
-							public Object run()
-							{
-								SecurityAssociation.popSubjectContext();
-								return null;
-							}
-						}
-				);
-			}
-		};
+   interface PrincipalInfoAction
+   {
+      PrincipalInfoAction PRIVILEGED = new PrincipalInfoAction()
+      {
+         public void push(final Principal principal, final Object credential,
+            final Subject subject, final String securityDomain)
+         {
+            AccessController.doPrivileged(
+               new PrivilegedAction<Object>()
+               {
+                  public Object run()
+                  {
+                     //SecurityAssociation.pushSubjectContext(subject, principal, credential);
+                     SecurityContext sc = SecurityContextAssociation.getSecurityContext();
+                     if(sc == null)
+                     {
+                        try
+                        {
+                           sc = SecurityContextFactory.createSecurityContext(principal, credential,
+                                 subject, securityDomain);
+                        }
+                        catch (Exception e)
+                        {
+                           throw new RuntimeException(e);
+                        }
+                     }
+                     SecurityContextAssociation.setSecurityContext(sc);
+                     return null;
+                  }
+               }
+            );
+         }
+         public void pop()
+         {
+            AccessController.doPrivileged(
+               new PrivilegedAction<Object>()
+               {
+                  public Object run()
+                  {
+                     //SecurityAssociation.popSubjectContext();
+                     SecurityContextAssociation.clearSecurityContext();
+                     return null;
+                  }
+               }
+            );
+         }
+      };
 
-		PrincipalInfoAction NON_PRIVILEGED = new PrincipalInfoAction()
-		{
-			public void push(final Principal principal, final Object credential, final Subject subject)
-			{
-				SecurityAssociation.pushSubjectContext(subject, principal, credential);
-			}
-			public void pop()
-			{
-				SecurityAssociation.popSubjectContext();
-			}
-		};
+      PrincipalInfoAction NON_PRIVILEGED = new PrincipalInfoAction()
+      {
+         public void push(Principal principal, Object credential, Subject subject,
+               String securityDomain)
+         {
+            //SecurityAssociation.pushSubjectContext(subject, principal, credential);
+            SecurityContext sc = SecurityContextAssociation.getSecurityContext();
+            if(sc == null)
+            {
+               try
+               {
+                  sc = SecurityContextFactory.createSecurityContext(principal, credential,
+                        subject, securityDomain);
+               }
+               catch (Exception e)
+               {
+                  throw new RuntimeException(e);
+               }
+            }
+            else
+            {
+               sc.getUtil().createSubjectInfo(principal, credential, subject);
+            }
+            SecurityContextAssociation.setSecurityContext(sc);
+         }
+         public void pop()
+         {
+            //SecurityAssociation.popSubjectContext();
+            SecurityContextAssociation.clearSecurityContext();
+         }
+      };
 
-		void push(Principal principal, Object credential, Subject subject);
-		void pop();
+
+		void push(Principal principal, Object credential, Subject subject, String securityDomain);
+      void pop();
 	}
 
 	static void pushSubjectContext(final Principal principal, final Object credential,
-			                         final Subject subject)
+                                  final Subject subject, String securityDomainName)
 	{
 		if(System.getSecurityManager() == null)
 		{
-			PrincipalInfoAction.NON_PRIVILEGED.push(principal, credential, subject);
+			PrincipalInfoAction.NON_PRIVILEGED.push(principal, credential, subject, securityDomainName);
 		}
 		else
 		{
-			PrincipalInfoAction.PRIVILEGED.push(principal, credential, subject);
+			PrincipalInfoAction.PRIVILEGED.push(principal, credential, subject, securityDomainName);
 		}
 	}
 	

Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java	2009-03-02 11:54:04 UTC (rev 5954)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java	2009-03-02 12:00:26 UTC (rev 5955)
@@ -553,58 +553,7 @@
       {
          if (sessionFactory == null)
          {
-            if (connectorConfigs != null)
-            {
-               sessionFactory = new ClientSessionFactoryImpl(connectorConfigs,
-                                                             connectionLoadBalancingPolicyClassName,
-                                                             pingPeriod,
-                                                             connectionTTL,
-                                                             callTimeout,
-                                                             consumerWindowSize,
-                                                             consumerMaxRate,
-                                                             sendWindowSize,
-                                                             producerMaxRate,
-                                                             minLargeMessageSize,
-                                                             blockOnAcknowledge,
-                                                             blockOnNonPersistentSend,
-                                                             blockOnPersistentSend,
-                                                             autoGroup,
-                                                             maxConnections,
-                                                             preAcknowledge,
-                                                             dupsOKBatchSize,                                                     
-                                                             retryInterval,
-                                                             retryIntervalMultiplier,                                                             
-                                                             maxRetriesBeforeFailover,
-                                                             maxRetriesAfterFailover);
-            }
-            else
-            {
-               sessionFactory = new ClientSessionFactoryImpl(discoveryGroupAddress,
-                                                             discoveryGroupPort,
-                                                             discoveryRefreshTimeout,
-                                                             discoveryInitialWaitTimeout,
-                                                             connectionLoadBalancingPolicyClassName,
-                                                             pingPeriod,
-                                                             connectionTTL,
-                                                             callTimeout,
-                                                             consumerWindowSize,
-                                                             consumerMaxRate,
-                                                             sendWindowSize,
-                                                             producerMaxRate,
-                                                             minLargeMessageSize,
-                                                             blockOnAcknowledge,
-                                                             blockOnNonPersistentSend,
-                                                             blockOnPersistentSend,
-                                                             autoGroup,
-                                                             maxConnections,
-                                                             preAcknowledge,
-                                                             dupsOKBatchSize,                                                             
-                                                             retryInterval,
-                                                             retryIntervalMultiplier,
-                                                             maxRetriesBeforeFailover,
-                                                             maxRetriesAfterFailover);
-            }
-   
+            createFactory();
          }
       }
       catch (MessagingException me)
@@ -650,7 +599,71 @@
                                  sessionFactory);
    }
 
+   private void createFactory() throws MessagingException
+   {
+      if (connectorConfigs != null)
+      {
+         sessionFactory = new ClientSessionFactoryImpl(connectorConfigs,
+                                                       connectionLoadBalancingPolicyClassName,
+                                                       pingPeriod,
+                                                       connectionTTL,
+                                                       callTimeout,
+                                                       consumerWindowSize,
+                                                       consumerMaxRate,
+                                                       sendWindowSize,
+                                                       producerMaxRate,
+                                                       minLargeMessageSize,
+                                                       blockOnAcknowledge,
+                                                       blockOnNonPersistentSend,
+                                                       blockOnPersistentSend,
+                                                       autoGroup,
+                                                       maxConnections,
+                                                       preAcknowledge,
+                                                       dupsOKBatchSize,
+                                                       retryInterval,
+                                                       retryIntervalMultiplier,
+                                                       maxRetriesBeforeFailover,
+                                                       maxRetriesAfterFailover);
+      }
+      else
+      {
+         sessionFactory = new ClientSessionFactoryImpl(discoveryGroupAddress,
+                                                       discoveryGroupPort,
+                                                       discoveryRefreshTimeout,
+                                                       discoveryInitialWaitTimeout,
+                                                       connectionLoadBalancingPolicyClassName,
+                                                       pingPeriod,
+                                                       connectionTTL,
+                                                       callTimeout,
+                                                       consumerWindowSize,
+                                                       consumerMaxRate,
+                                                       sendWindowSize,
+                                                       producerMaxRate,
+                                                       minLargeMessageSize,
+                                                       blockOnAcknowledge,
+                                                       blockOnNonPersistentSend,
+                                                       blockOnPersistentSend,
+                                                       autoGroup,
+                                                       maxConnections,
+                                                       preAcknowledge,
+                                                       dupsOKBatchSize,
+                                                       retryInterval,
+                                                       retryIntervalMultiplier,
+                                                       maxRetriesBeforeFailover,
+                                                       maxRetriesAfterFailover);
+      }
+   }
+
    // Private --------------------------------------------------------------------------------------
 
    // Inner classes --------------------------------------------------------------------------------
+
+   public ClientSessionFactory getCoreFactory() throws MessagingException
+   {
+      if (sessionFactory == null)
+      {
+         createFactory();
+      }
+      return sessionFactory;
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/ra/JBMResourceAdapter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/JBMResourceAdapter.java	2009-03-02 11:54:04 UTC (rev 5954)
+++ trunk/src/main/org/jboss/messaging/ra/JBMResourceAdapter.java	2009-03-02 12:00:26 UTC (rev 5955)
@@ -21,14 +21,19 @@
  */
 package org.jboss.messaging.ra;
 
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.jms.client.JBossConnectionFactory;
+import org.jboss.messaging.jms.client.JBossSession;
 import org.jboss.messaging.ra.inflow.JBMActivation;
 import org.jboss.messaging.ra.inflow.JBMActivationSpec;
 
+import javax.jms.Session;
 import javax.resource.ResourceException;
 import javax.resource.spi.ActivationSpec;
 import javax.resource.spi.BootstrapContext;
@@ -75,7 +80,7 @@
    /**
     * The JBoss connection factory
     */
-   private JBossConnectionFactory factory;
+   private ClientSessionFactory sessionFactory;
 
    /**
     * Have the factory been configured
@@ -87,6 +92,8 @@
     */
    private Map activations;
 
+   private JBossConnectionFactory jBossConnectionFactory;
+
    /**
     * Constructor
     */
@@ -98,7 +105,7 @@
       }
 
       raProperties = new JBMRAProperties();
-      factory = null;
+      sessionFactory = null;
       configured = new AtomicBoolean(false);
       activations = new ConcurrentHashMap();
    }
@@ -112,6 +119,17 @@
     */
    public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec spec) throws ResourceException
    {
+      if(!configured.getAndSet(true))
+      {
+         try
+         {
+            setup();
+         }
+         catch (MessagingException e)
+         {
+            throw new ResourceException("Unable to create activation", e);
+         }
+      }
       if (trace)
       {
          log.trace("endpointActivation(" + endpointFactory + ", " + spec + ")");
@@ -1238,19 +1256,39 @@
       return ctx.getWorkManager();
    }
 
-   /**
-    * Get the JBoss connection factory
-    *
-    * @return The factory
-    */
-   public JBossConnectionFactory getJBossConnectionFactory()
+   public ClientSession createSession(int ackMode, String user, String pass, Boolean preAck, Integer dupsOkBatchSize, Integer transactionBatchSize, boolean deliveryTransacted) throws Exception
    {
-      if (!configured.get())
+
+      ClientSession result;
+
+      boolean actPreAck = preAck != null ? preAck : ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
+      int actDupsOkBatchSize = dupsOkBatchSize != null ? dupsOkBatchSize : ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
+      int actTxBatchSize = transactionBatchSize != null ? transactionBatchSize : ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
+      switch (ackMode)
       {
-         setup();
+         case Session.SESSION_TRANSACTED:
+            result = sessionFactory.createSession(user, pass, deliveryTransacted, false, false, actPreAck, actTxBatchSize);
+            break;
+         case Session.AUTO_ACKNOWLEDGE:
+            result = sessionFactory.createSession(user, pass, deliveryTransacted, true, false, actPreAck, actTxBatchSize);
+            break;
+         case Session.DUPS_OK_ACKNOWLEDGE:
+            result = sessionFactory.createSession(user, pass, deliveryTransacted, true, false, actPreAck, actDupsOkBatchSize);
+            break;
+         case Session.CLIENT_ACKNOWLEDGE:
+            result = sessionFactory.createSession(user, pass, deliveryTransacted, false, false, actPreAck, actTxBatchSize);
+            break;
+         case JBossSession.SERVER_ACKNOWLEDGE:
+            result = sessionFactory.createSession(user, pass, deliveryTransacted, false, true, actPreAck, actTxBatchSize);
+            break;
+         default:
+            throw new IllegalArgumentException("Invalid ackmode: " + ackMode);
       }
 
-      return factory;
+      log.debug("Using queue connection " + result);
+
+      return result;
+
    }
 
    /**
@@ -1271,74 +1309,74 @@
    /**
     * Setup the factory
     */
-   protected void setup()
+   protected void setup() throws MessagingException
    {
       if (getTransportType() != null)
       {
          TransportConfiguration transportConf = new TransportConfiguration(getTransportType(), getTransportConfiguration());
          TransportConfiguration backup = getBackUpTransportType() == null ? null : new TransportConfiguration(getBackUpTransportType(), getBackupTransportConfiguration());
-         factory = new JBossConnectionFactory(transportConf,
-                                              backup,
-                                              getLoadBalancingPolicyClassName() == null ? ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME : getLoadBalancingPolicyClassName(),
-                                              getPingPeriod() == null ? ClientSessionFactoryImpl.DEFAULT_PING_PERIOD : getPingPeriod(),
-                                              getConnectionTTL() == null ? ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL : getConnectionTTL(),
-                                              getCallTimeout() == null ? ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT : getCallTimeout(),
-                                              getClientID(),
-                                              getDupsOKBatchSize() == null ? ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE : getDupsOKBatchSize(),
-                                              getTransactionBatchSize() == null ? ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE : getTransactionBatchSize(),
-                                              getConsumerWindowSize() == null ? ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE : getConsumerWindowSize(),
-                                              getConsumerMaxRate() == null ? ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE : getConsumerMaxRate(),
-                                              getSendWindowSize() == null ? ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE : getSendWindowSize(),
-                                              getProducerMaxRate() == null ? ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE : getProducerMaxRate(),
-                                              getMinLargeMessageSize() == null ? ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE : getMinLargeMessageSize(),
-                                              getBlockOnAcknowledge() == null ? ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE : getBlockOnAcknowledge(),
-                                              getBlockOnNonPersistentSend() == null ? ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND : getBlockOnNonPersistentSend(),
-                                              getBlockOnPersistentSend() == null ? ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND : getBlockOnPersistentSend(),
-                                              getAutoGroup() == null ? ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP : getAutoGroup(),
-                                              getMaxConnections() == null ? ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS : getMaxConnections(),
-                                              getPreAcknowledge() == null ? ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE : getPreAcknowledge(),
-                                              getRetryInterval() == null ? ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL : getRetryInterval(),
-                                              getRetryIntervalMultiplier() == null ? ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER : getRetryIntervalMultiplier(),
-                                              getMaxRetriesBeforeFailover() == null ? ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER : getMaxRetriesBeforeFailover(),
-                                              getMaxRetriesAfterFailover() == null ? ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER : getMaxRetriesAfterFailover()
+         jBossConnectionFactory = new JBossConnectionFactory(transportConf,
+                                                             backup,
+                                                             getLoadBalancingPolicyClassName() == null ? ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME : getLoadBalancingPolicyClassName(),
+                                                             getPingPeriod() == null ? ClientSessionFactoryImpl.DEFAULT_PING_PERIOD : getPingPeriod(),
+                                                             getConnectionTTL() == null ? ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL : getConnectionTTL(),
+                                                             getCallTimeout() == null ? ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT : getCallTimeout(),
+                                                             getClientID(),
+                                                             getDupsOKBatchSize() == null ? ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE : getDupsOKBatchSize(),
+                                                             getTransactionBatchSize() == null ? ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE : getTransactionBatchSize(),
+                                                             getConsumerWindowSize() == null ? ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE : getConsumerWindowSize(),
+                                                             getConsumerMaxRate() == null ? ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE : getConsumerMaxRate(),
+                                                             getSendWindowSize() == null ? ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE : getSendWindowSize(),
+                                                             getProducerMaxRate() == null ? ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE : getProducerMaxRate(),
+                                                             getMinLargeMessageSize() == null ? ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE : getMinLargeMessageSize(),
+                                                             getBlockOnAcknowledge() == null ? ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE : getBlockOnAcknowledge(),
+                                                             getBlockOnNonPersistentSend() == null ? ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND : getBlockOnNonPersistentSend(),
+                                                             getBlockOnPersistentSend() == null ? ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND : getBlockOnPersistentSend(),
+                                                             getAutoGroup() == null ? ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP : getAutoGroup(),
+                                                             getMaxConnections() == null ? ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS : getMaxConnections(),
+                                                             getPreAcknowledge() == null ? ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE : getPreAcknowledge(),
+                                                             getRetryInterval() == null ? ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL : getRetryInterval(),
+                                                             getRetryIntervalMultiplier() == null ? ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER : getRetryIntervalMultiplier(),
+                                                             getMaxRetriesBeforeFailover() == null ? ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER : getMaxRetriesBeforeFailover(),
+                                                             getMaxRetriesAfterFailover() == null ? ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER : getMaxRetriesAfterFailover()
          );
-         configured.set(true);
       }
       else if (getDiscoveryGroupAddress() != null && getDiscoveryGroupPort() != null)
       {
-         factory = new JBossConnectionFactory(getDiscoveryGroupAddress(),
-                                              getDiscoveryGroupPort(),
-                                              getDiscoveryRefreshTimeout() == null ? ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT : getDiscoveryRefreshTimeout(),
-                                              getDiscoveryInitialWaitTimeout() == null ? ClientSessionFactoryImpl.DEFAULT_DISCOVERY_INITIAL_WAIT : getDiscoveryInitialWaitTimeout(),
-                                              getLoadBalancingPolicyClassName() == null ? ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME : getLoadBalancingPolicyClassName(),
-                                              getPingPeriod() == null ? ClientSessionFactoryImpl.DEFAULT_PING_PERIOD : getPingPeriod(),
-                                              getConnectionTTL() == null ? ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL : getConnectionTTL(),
-                                              getCallTimeout() == null ? ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT : getCallTimeout(),
-                                              getClientID(),
-                                              getDupsOKBatchSize() == null ? ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE : getDupsOKBatchSize(),
-                                              getTransactionBatchSize() == null ? ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE : getTransactionBatchSize(),
-                                              getConsumerWindowSize() == null ? ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE : getConsumerWindowSize(),
-                                              getConsumerMaxRate() == null ? ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE : getConsumerMaxRate(),
-                                              getSendWindowSize() == null ? ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE : getSendWindowSize(),
-                                              getProducerMaxRate() == null ? ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE : getProducerMaxRate(),
-                                              getMinLargeMessageSize() == null ? ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE : getMinLargeMessageSize(),
-                                              getBlockOnAcknowledge() == null ? ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE : getBlockOnAcknowledge(),
-                                              getBlockOnNonPersistentSend() == null ? ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND : getBlockOnNonPersistentSend(),
-                                              getBlockOnPersistentSend() == null ? ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND : getBlockOnPersistentSend(),
-                                              getAutoGroup() == null ? ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP : getAutoGroup(),
-                                              getMaxConnections() == null ? ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS : getMaxConnections(),
-                                              getPreAcknowledge() == null ? ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE : getPreAcknowledge(),
-                                              getRetryInterval() == null ? ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL : getRetryInterval(),
-                                              getRetryIntervalMultiplier() == null ? ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER : getRetryIntervalMultiplier(),
-                                              getMaxRetriesBeforeFailover() == null ? ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER : getMaxRetriesBeforeFailover(),
-                                              getMaxRetriesAfterFailover() == null ? ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER : getMaxRetriesAfterFailover()
+         jBossConnectionFactory = new JBossConnectionFactory(getDiscoveryGroupAddress(),
+                                                             getDiscoveryGroupPort(),
+                                                             getDiscoveryRefreshTimeout() == null ? ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT : getDiscoveryRefreshTimeout(),
+                                                             getDiscoveryInitialWaitTimeout() == null ? ClientSessionFactoryImpl.DEFAULT_DISCOVERY_INITIAL_WAIT : getDiscoveryInitialWaitTimeout(),
+                                                             getLoadBalancingPolicyClassName() == null ? ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME : getLoadBalancingPolicyClassName(),
+                                                             getPingPeriod() == null ? ClientSessionFactoryImpl.DEFAULT_PING_PERIOD : getPingPeriod(),
+                                                             getConnectionTTL() == null ? ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL : getConnectionTTL(),
+                                                             getCallTimeout() == null ? ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT : getCallTimeout(),
+                                                             getClientID(),
+                                                             getDupsOKBatchSize() == null ? ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE : getDupsOKBatchSize(),
+                                                             getTransactionBatchSize() == null ? ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE : getTransactionBatchSize(),
+                                                             getConsumerWindowSize() == null ? ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE : getConsumerWindowSize(),
+                                                             getConsumerMaxRate() == null ? ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE : getConsumerMaxRate(),
+                                                             getSendWindowSize() == null ? ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE : getSendWindowSize(),
+                                                             getProducerMaxRate() == null ? ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE : getProducerMaxRate(),
+                                                             getMinLargeMessageSize() == null ? ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE : getMinLargeMessageSize(),
+                                                             getBlockOnAcknowledge() == null ? ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE : getBlockOnAcknowledge(),
+                                                             getBlockOnNonPersistentSend() == null ? ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND : getBlockOnNonPersistentSend(),
+                                                             getBlockOnPersistentSend() == null ? ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND : getBlockOnPersistentSend(),
+                                                             getAutoGroup() == null ? ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP : getAutoGroup(),
+                                                             getMaxConnections() == null ? ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS : getMaxConnections(),
+                                                             getPreAcknowledge() == null ? ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE : getPreAcknowledge(),
+                                                             getRetryInterval() == null ? ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL : getRetryInterval(),
+                                                             getRetryIntervalMultiplier() == null ? ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER : getRetryIntervalMultiplier(),
+                                                             getMaxRetriesBeforeFailover() == null ? ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER : getMaxRetriesBeforeFailover(),
+                                                             getMaxRetriesAfterFailover() == null ? ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER : getMaxRetriesAfterFailover()
          );
-         configured.set(true);
       }
       else
       {
          log.fatal("must provide either TransportTyoe or DiscoveryGroupAddress and DiscoveryGroupPort for JBM ResourceAdapter");
       }
+
+         sessionFactory = jBossConnectionFactory.getCoreFactory();
    }
 
 
@@ -1381,4 +1419,9 @@
       }
       return val;
    }
+
+   public JBossConnectionFactory getJBossConnectionFactory()
+   {
+      return jBossConnectionFactory;
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java	2009-03-02 11:54:04 UTC (rev 5954)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java	2009-03-02 12:00:26 UTC (rev 5955)
@@ -21,27 +21,25 @@
  */
 package org.jboss.messaging.ra.inflow;
 
-import org.jboss.messaging.jms.client.JBossConnectionFactory;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.jms.client.JBossSession;
+import org.jboss.messaging.jms.JBossDestination;
 import org.jboss.messaging.ra.JBMResourceAdapter;
 import org.jboss.messaging.ra.Util;
-import org.jboss.messaging.ra.JBMMessageListener;
-import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.utils.SimpleString;
+import org.jboss.tm.TransactionManagerLocator;
 
-import java.lang.reflect.Method;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.List;
-import java.util.ArrayList;
-
-import javax.jms.Connection;
 import javax.jms.Destination;
-import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
 import javax.jms.Queue;
-import javax.jms.QueueConnection;
+import javax.jms.Session;
 import javax.jms.Topic;
-import javax.jms.TopicConnection;
 import javax.naming.Context;
 import javax.naming.InitialContext;
 import javax.resource.ResourceException;
@@ -49,59 +47,76 @@
 import javax.resource.spi.work.Work;
 import javax.resource.spi.work.WorkManager;
 import javax.transaction.TransactionManager;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.jboss.tm.TransactionManagerLocator;
-
 /**
  * The activation.
- * 
+ *
  * @author <a href="adrian at jboss.com">Adrian Brock</a>
  * @author <a href="jesper.pedersen at jboss.org">Jesper Pedersen</a>
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  * @version $Revision: $
  */
-public class JBMActivation implements ExceptionListener
+public class JBMActivation implements FailureListener
 {
-   /** The logger */
+   /**
+    * The logger
+    */
    private static final Logger log = Logger.getLogger(JBMActivation.class);
 
-   /** Trace enabled */
+   /**
+    * Trace enabled
+    */
    private static boolean trace = log.isTraceEnabled();
-   
-   /** The onMessage method */
-   public static final Method ONMESSAGE; 
-   
-   /** The resource adapter */
+
+   /**
+    * The onMessage method
+    */
+   public static final Method ONMESSAGE;
+
+   /**
+    * The resource adapter
+    */
    protected JBMResourceAdapter ra;
-   
-   /** The activation spec */
+
+   /**
+    * The activation spec
+    */
    protected JBMActivationSpec spec;
 
-   /** The message endpoint factory */
+   /**
+    * The message endpoint factory
+    */
    protected MessageEndpointFactory endpointFactory;
-   
-   /** Whether delivery is active */
+
+   /**
+    * Whether delivery is active
+    */
    protected AtomicBoolean deliveryActive = new AtomicBoolean(false);
 
-   /** Whether we are in the failure recovery loop */
+   /**
+    * Whether we are in the failure recovery loop
+    */
    private AtomicBoolean inFailure = new AtomicBoolean(false);
 
-   /** The destination */
-   protected Destination destination;
-
-   /** The destination type */
+   /**
+    * The destination type
+    */
    protected boolean isTopic = false;
-   
-   /** The connection */
-   protected Connection connection;
-   
-   /** Is the delivery transacted */
+
+   /**
+    * Is the delivery transacted
+    */
    protected boolean isDeliveryTransacted;
-   
-   /** The message handler pool */
-   //protected JBMMessageHandlerPool pool;
 
-   /** The TransactionManager */
+   JBossDestination destination;
+
+   /**
+    * The TransactionManager
+    */
    protected TransactionManager tm;
 
    private List<JBMMessageHandler> handlers = new ArrayList<JBMMessageHandler>();
@@ -110,7 +125,7 @@
    {
       try
       {
-         ONMESSAGE = MessageListener.class.getMethod("onMessage", new Class[] { Message.class });
+         ONMESSAGE = MessageListener.class.getMethod("onMessage", new Class[]{Message.class});
       }
       catch (Exception e)
       {
@@ -120,15 +135,18 @@
 
    /**
     * Constructor
-    * @param ra The resource adapter
+    *
+    * @param ra              The resource adapter
     * @param endpointFactory The endpoint factory
-    * @param spec The activation spec
-    * @exception ResourceException Thrown if an error occurs
+    * @param spec            The activation spec
+    * @throws ResourceException Thrown if an error occurs
     */
    public JBMActivation(JBMResourceAdapter ra, MessageEndpointFactory endpointFactory, JBMActivationSpec spec) throws ResourceException
    {
       if (trace)
+      {
          log.trace("constructor(" + ra + ", " + endpointFactory + ", " + spec + ")");
+      }
 
       this.ra = ra;
       this.endpointFactory = endpointFactory;
@@ -145,113 +163,119 @@
 
    /**
     * Get the activation spec
+    *
     * @return The value
     */
    public JBMActivationSpec getActivationSpec()
    {
       if (trace)
+      {
          log.trace("getActivationSpec()");
+      }
 
       return spec;
    }
 
    /**
     * Get the message endpoint factory
+    *
     * @return The value
     */
    public MessageEndpointFactory getMessageEndpointFactory()
    {
       if (trace)
+      {
          log.trace("getMessageEndpointFactory()");
+      }
 
       return endpointFactory;
    }
 
    /**
     * Get whether delivery is transacted
+    *
     * @return The value
     */
    public boolean isDeliveryTransacted()
    {
       if (trace)
+      {
          log.trace("isDeliveryTransacted()");
+      }
 
       return isDeliveryTransacted;
    }
 
    /**
     * Get the work manager
+    *
     * @return The value
     */
    public WorkManager getWorkManager()
    {
       if (trace)
+      {
          log.trace("getWorkManager()");
+      }
 
       return ra.getWorkManager();
    }
-   
+
    /**
     * Get the transaction manager
+    *
     * @return The value
     */
    public TransactionManager getTransactionManager()
    {
       if (trace)
+      {
          log.trace("getTransactionManager()");
+      }
 
       if (tm == null)
+      {
          tm = TransactionManagerLocator.locateTransactionManager();
+      }
 
       return tm;
    }
 
    /**
-    * Get the connection
-    * @return The value
-    */
-   public Connection getConnection()
-   {
-      if (trace)
-         log.trace("getConnection()");
-
-      return connection;
-   }
-
-   /**
-    * Get the destination 
-    * @return The value
-    */
-   public Destination getDestination()
-   {
-      if (trace)
-         log.trace("getDestination()");
-
-      return destination;
-   }
-
-   /**
     * Is the destination a topic
+    *
     * @return The value
     */
    public boolean isTopic()
    {
       if (trace)
+      {
          log.trace("isTopic()");
+      }
 
       return isTopic;
    }
-   
+
    /**
     * Start the activation
+    *
     * @throws ResourceException Thrown if an error occurs
     */
    public void start() throws ResourceException
    {
       if (trace)
+      {
          log.trace("start()");
+      }
 
-      ra.getWorkManager().scheduleWork(new SetupActivation());
+      try
+      {
+         setup();
+      }
+      catch (Exception e)
+      {
+         throw new ResourceException("unable to start Activation", e);
+      }
       deliveryActive.set(true);
    }
 
@@ -261,7 +285,9 @@
    public void stop()
    {
       if (trace)
+      {
          log.trace("stop()");
+      }
 
       deliveryActive.set(false);
       teardown();
@@ -269,16 +295,19 @@
 
    /**
     * Handles any failure by trying to reconnect
+    *
     * @param failure The reason for the failure
     */
    public void handleFailure(Throwable failure)
    {
       log.warn("Failure in jms activation " + spec, failure);
       int reconnectCount = 0;
-      
+
       // Only enter the failure loop once
       if (inFailure.getAndSet(true))
+      {
          return;
+      }
 
       try
       {
@@ -289,7 +318,9 @@
             try
             {
                if (spec.getReconnectIntervalMillis() > 0)
+               {
                   Thread.sleep(spec.getReconnectIntervalMillis());
+               }
             }
             catch (InterruptedException e)
             {
@@ -301,7 +332,7 @@
             try
             {
                setup();
-               log.info("Reconnected with messaging provider.");            
+               log.info("Reconnected with messaging provider.");
                break;
             }
             catch (Throwable t)
@@ -320,12 +351,15 @@
 
    /**
     * On exception
+    *
     * @param exception The reason for the failure
     */
    public void onException(JMSException exception)
    {
       if (trace)
+      {
          log.trace("onException(" + exception + ")");
+      }
 
       handleFailure(exception);
    }
@@ -333,35 +367,26 @@
 
    /**
     * Setup the activation
+    *
     * @throws Exception Thrown if an error occurs
     */
    protected void setup() throws Exception
    {
       log.debug("Setting up " + spec);
-      
-      Context ctx = new InitialContext();
-      log.debug("Using context " + ctx.getEnvironment() + " for " + spec);
-      try
+
+      setupDestination();
+      for (int i = 0; i < spec.getMaxSessionInt(); i++)
       {
-         setupDestination(ctx);
-         setupConnection(ctx);
-      }
-      finally
-      {
-         if (ctx != null)
-            ctx.close();
-      }
-      for(int i = 0; i < spec.getMaxSessionInt(); i++)
-      {
-         JBMMessageHandler handler = new JBMMessageHandler(this);
+         ClientSession session = setupSession(spec.getUser(), spec.getPassword(), spec.getClientId());
+         JBMMessageHandler handler = new JBMMessageHandler(this, session);
          handler.setup();
+         session.start();
          handlers.add(handler);
       }
 
-      connection.start();
       log.debug("Setup complete " + this);
    }
-   
+
    /**
     * Teardown the activation
     */
@@ -373,128 +398,30 @@
       {
          handler.teardown();
       }
-      teardownConnection();
-      teardownDestination();
 
       log.debug("Tearing down complete " + this);
    }
-   
-   /**
-    * Setup the destination
-    * @param ctx The naming context
-    * @throws Exception Thrown if an error occurs
-    */
-   protected void setupDestination(Context ctx) throws Exception
-   {
-      if (trace)
-         log.trace("setupDestination(" + ctx + ")");
 
-      String destinationName = spec.getDestination();
 
-      String destinationTypeString = spec.getDestinationType();
-      if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
-      {
-         log.debug("Destination type defined as " + destinationTypeString);
 
-         Class<?> destinationType;
-         if (Topic.class.getName().equals(destinationTypeString))
-         {
-            destinationType = Topic.class;
-            isTopic = true;
-         }
-         else
-         {
-            destinationType = Queue.class;
-         }
-
-         log.debug("Retrieving destination " + destinationName + " of type " + destinationType.getName());
-         destination = (Destination) Util.lookup(ctx, destinationName, destinationType);
-      }
-      else
-      {
-         log.debug("Destination type not defined");
-         log.debug("Retrieving destination " + destinationName + " of type " + Destination.class.getName());
-
-         destination = (Destination) Util.lookup(ctx, destinationName, Destination.class);
-         if (destination instanceof Topic)
-         {
-            isTopic = true;
-         }
-      }
-
-      log.debug("Got destination " + destination + " from " + destinationName);
-   }
-   
    /**
-    * Teardown the destination
-    */
-   protected void teardownDestination()
-   {
-      if (trace)
-         log.trace("teardownDestination()");
-
-      destination = null;
-   }
-   
-   /**
-    * Setup the Connection
-    * @param ctx the naming context
-    * @throws Exception for any error
-    */
-   protected void setupConnection(Context ctx) throws Exception
-   {
-      log.debug("Setup connection " + this);
-
-      String user = spec.getUser();
-      String pass = spec.getPassword();
-      String clientID = spec.getClientId();
-
-      if (isTopic)
-         connection = setupTopicConnection(ctx, user, pass, clientID);
-      else
-         connection = setupQueueConnection(ctx, user, pass, clientID);
-      
-      log.debug("Established connection " + this);
-   }
-   
-   /**
-    * Setup a queue connection
-    * @param ctx The naming context
-    * @param user The user
-    * @param pass The password
+    * Setup a session
+    *
+    * @param user     The user
+    * @param pass     The password
     * @param clientID The client id
-    * @return The connection
+    * @return The session
     * @throws Exception Thrown if an error occurs
     */
-   protected QueueConnection setupQueueConnection(Context ctx, String user, String pass, String clientID) throws Exception
+   protected ClientSession setupSession(String user, String pass, String clientID) throws Exception
    {
-      if (trace)
-         log.trace("setupQueueConnection(" + ctx + ", " + user + ", ****, " + clientID + ")");
+      ClientSession result = null;
 
-      QueueConnection result = null;
-
-      JBossConnectionFactory jcf = ra.getJBossConnectionFactory();
-
-      if (isDeliveryTransacted)
-      {
-         if (user != null)
-            result = jcf.createXAQueueConnection(user, pass);
-         else
-            result = jcf.createXAQueueConnection();
-      }
-      else
-      {
-         if (user != null)
-            result = jcf.createQueueConnection(user, pass);
-         else
-            result = jcf.createQueueConnection();
-      }
       try
       {
-         if (clientID != null)
-            result.setClientID(clientID);
+         result = ra.createSession(spec.getAcknowledgeModeInt(), user, pass, ra.getPreAcknowledge(), ra.getDupsOKBatchSize(), ra.getTransactionBatchSize(), isDeliveryTransacted);
 
-         result.setExceptionListener(this);
+         result.addFailureListener(this);
 
          log.debug("Using queue connection " + result);
 
@@ -505,176 +432,73 @@
          try
          {
             if (result != null)
+            {
                result.close();
+            }
          }
          catch (Exception e)
          {
             log.trace("Ignored error closing connection", e);
          }
          if (t instanceof Exception)
+         {
             throw (Exception) t;
+         }
          throw new RuntimeException("Error configuring connection", t);
       }
    }
-   
-   /**
-    * Setup a topic connection
-    * @param ctx The naming context
-    * @param user The user
-    * @param pass The password
-    * @param clientID The client id
-    * @return The connection
-    * @throws Exception Thrown if an error occurs
-    */
-   protected TopicConnection setupTopicConnection(Context ctx, String user, String pass, String clientID) throws Exception
+
+   public SimpleString getAddress()
    {
+      return destination.getSimpleAddress();
+   }
+
+   protected void setupDestination() throws Exception
+   {
+      Context ctx = new InitialContext();
+      log.debug("Using context " + ctx.getEnvironment() + " for " + spec);
       if (trace)
-         log.trace("setupTopicConnection(" + ctx + ", " + user + ", ****, " + clientID + ")");
+         log.trace("setupDestination(" + ctx + ")");
 
-      TopicConnection result = null;
+      String destinationName = spec.getDestination();
 
-      JBossConnectionFactory jcf = ra.getJBossConnectionFactory();
-
-      if (isDeliveryTransacted)
+      String destinationTypeString = spec.getDestinationType();
+      if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
       {
-         if (user != null)
-            result = jcf.createXATopicConnection(user, pass);
-         else
-            result = jcf.createXATopicConnection();
-      }
-      else
-      {
-         if (user != null)
-            result = jcf.createTopicConnection(user, pass);
-         else
-            result = jcf.createTopicConnection();
-      }
-      try
-      {
-         if (clientID != null)
-            result.setClientID(clientID);
+         log.debug("Destination type defined as " + destinationTypeString);
 
-         result.setExceptionListener(this);
-
-         log.debug("Using topic connection " + result);
-
-         return result;
-      }
-      catch (Throwable t)
-      {
-         try
+         Class<?> destinationType;
+         if (Topic.class.getName().equals(destinationTypeString))
          {
-            if (result != null)
-               result.close();
+            destinationType = Topic.class;
+            isTopic = true;
          }
-         catch (Exception e)
+         else
          {
-            log.trace("Ignored error closing connection", e);
+            destinationType = Queue.class;
          }
-         if (t instanceof Exception)
-            throw (Exception) t;
-         throw new RuntimeException("Error configuring connection", t);
-      }
-   }
-   
-   /**
-    * Teardown the connection
-    */
-   protected void teardownConnection()
-   {
-      if (trace)
-         log.trace("teardownConnection()");
 
-      try
-      {
-         if (connection != null)
-         {
-            log.debug("Closing the " + connection);
-            connection.close();
-         }
+         log.debug("Retrieving destination " + destinationName + " of type " + destinationType.getName());
+         destination = (JBossDestination) Util.lookup(ctx, destinationName, destinationType);
       }
-      catch (Throwable t)
+      else
       {
-         log.debug("Error closing the connection " + connection, t);
-      }
-      connection = null;
-   }
-   
-   /**
-    * Setup the pool
-    * @throws Exception for any error
-    */
-   /*protected void setupPool() throws Exception
-   {
-      pool = new JBMMessageHandlerPool(this);
-      log.debug("Created pool " + pool);
+         log.debug("Destination type not defined");
+         log.debug("Retrieving destination " + destinationName + " of type " + Destination.class.getName());
 
-      log.debug("Starting pool " + pool);
-      pool.start();
-      log.debug("Started pool " + pool);
-
-      log.debug("Starting delivery " + connection);
-      connection.start();
-      log.debug("Started delivery " + connection);
-   }*/
-
-   /**
-    * Teardown the pool
-    */
-   /*protected void teardownPool()
-   {
-      try
-      {
-         if (connection != null)
+         destination = (JBossDestination) Util.lookup(ctx, destinationName, Destination.class);
+         if (destination instanceof Topic)
          {
-            log.debug("Stopping delivery " + connection);
-            connection.stop();
+            isTopic = true;
          }
       }
-      catch (Throwable t)
-      {
-         log.debug("Error stopping delivery " + connection, t);
-      }
 
-      try
-      {
-         if (pool != null)
-         {
-            log.debug("Stopping the pool " + pool);
-            pool.stop();
-         }
-      }
-      catch (Throwable t)
-      {
-         log.debug("Error clearing the pool " + pool, t);
-      }
-      pool = null;
-   }*/
-
-   /**
-    * Handles the setup
-    */
-   private class SetupActivation implements Work
-   {
-      public void run()
-      {
-         try
-         {
-            setup();
-         }
-         catch (Throwable t)
-         {
-            handleFailure(t);
-         }
-      }
-
-      public void release()
-      {
-      }
+      log.debug("Got destination " + destination + " from " + destinationName);
    }
 
    /**
     * Get a string representation
+    *
     * @return The value
     */
    public String toString()
@@ -684,14 +508,32 @@
       buffer.append("spec=").append(spec.getClass().getName());
       buffer.append(" mepf=").append(endpointFactory.getClass().getName());
       buffer.append(" active=").append(deliveryActive.get());
-      if (destination != null)
-         buffer.append(" destination=").append(destination);
-      if (connection != null)
-         buffer.append(" connection=").append(connection);
+      if (spec.getDestination() != null)
+      {
+         buffer.append(" destination=").append(spec.getDestination());
+      }
+      /*if (session != null)
+      {
+         buffer.append(" connection=").append(session);
+      }*/
       //if (pool != null)
-         //buffer.append(" pool=").append(pool.getClass().getName());
+      //buffer.append(" pool=").append(pool.getClass().getName());
       buffer.append(" transacted=").append(isDeliveryTransacted);
       buffer.append(')');
       return buffer.toString();
    }
+
+   public boolean connectionFailed(MessagingException me)
+   {
+      if (trace)
+      {
+         log.trace("onException(" + me + ")");
+      }
+
+      handleFailure(me);
+      return true;
+   }
 }
+
+
+

Modified: trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java	2009-03-02 11:54:04 UTC (rev 5954)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java	2009-03-02 12:00:26 UTC (rev 5955)
@@ -21,23 +21,28 @@
  */
 package org.jboss.messaging.ra.inflow;
 
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.jms.JBossTopic;
+import org.jboss.messaging.jms.client.JBossMessage;
+import org.jboss.messaging.utils.SimpleString;
 
+import javax.jms.InvalidClientIDException;
 import javax.jms.JMSException;
+import javax.jms.MessageListener;
 import javax.jms.Session;
-import javax.jms.XASession;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Connection;
-import javax.jms.XAConnection;
-import javax.jms.Topic;
-import javax.jms.Message;
+import javax.resource.spi.endpoint.MessageEndpoint;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.transaction.Status;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
-import javax.transaction.Status;
-import javax.transaction.xa.XAResource;
-import javax.resource.spi.endpoint.MessageEndpoint;
-import javax.resource.spi.endpoint.MessageEndpointFactory;
+import java.util.UUID;
+
 /**
  * The message handler
  *
@@ -46,95 +51,128 @@
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  * @version $Revision: $
  */
-public class JBMMessageHandler implements MessageListener
+public class JBMMessageHandler implements MessageHandler
 {
-     /** The logger */
+   /**
+    * The logger
+    */
    private static final Logger log = Logger.getLogger(JBMMessageHandler.class);
 
-   /** Trace enabled */
-   private static boolean trace  = log.isTraceEnabled();
+   /**
+    * Trace enabled
+    */
+   private static boolean trace = log.isTraceEnabled();
 
-   /** The session */
-   private Session session;
+   /**
+    * The session
+    */
+   private final ClientSession session;
 
-   /** Any XA session */
-   private XASession xaSession;
-
-   /** The endpoint */
+   /**
+    * The endpoint
+    */
    private MessageEndpoint endpoint;
 
    private final JBMActivation activation;
 
-   /** The transaction demarcation strategy factory */
+   /**
+    * The transaction demarcation strategy factory
+    */
    private DemarcationStrategyFactory strategyFactory = new DemarcationStrategyFactory();
 
-   public JBMMessageHandler(JBMActivation activation)
+   public JBMMessageHandler(JBMActivation activation, ClientSession session)
    {
       this.activation = activation;
+      this.session = session;
    }
 
    public void setup() throws Exception
    {
       if (trace)
+      {
          log.trace("setup()");
+      }
 
       JBMActivationSpec spec = activation.getActivationSpec();
       String selector = spec.getMessageSelector();
 
-      Connection connection = activation.getConnection();
-
-      // Create the session
-      if (activation.isDeliveryTransacted())
-      {
-         xaSession = ((XAConnection)connection).createXASession();
-         session = xaSession.getSession();
-      }
-      else
-      {
-         boolean transacted = spec.isSessionTransacted();
-         int acknowledge = spec.getAcknowledgeModeInt();
-         session = connection.createSession(transacted, acknowledge);
-      }
-
       // Create the message consumer
-      MessageConsumer messageConsumer;
+      ClientConsumer consumer;
+      SimpleString selectorString = selector == null || selector.trim().equals("") ? null : new SimpleString(selector);
       if (activation.isTopic() && spec.isSubscriptionDurable())
       {
-         Topic topic = (Topic) activation.getDestination();
          String subscriptionName = spec.getSubscriptionName();
 
-         if (selector == null || selector.trim().equals(""))
+         // Durable sub
+
+         if (activation.getActivationSpec().getClientId() == null)
          {
-            messageConsumer = session.createDurableSubscriber(topic, subscriptionName);
+            throw new InvalidClientIDException("Cannot create durable subscription - client ID has not been set");
          }
+
+         SimpleString queueName = new SimpleString(JBossTopic.createQueueNameForDurableSubscription(activation.getActivationSpec().getClientId(),
+                                                                                                    subscriptionName));
+
+         SessionQueueQueryResponseMessage subResponse = session.queueQuery(queueName);
+
+         if (!subResponse.isExists())
+         {
+            session.createQueue(activation.getAddress(), queueName, selectorString, true, false);
+         }
          else
          {
-            messageConsumer = session.createDurableSubscriber(topic, subscriptionName, selector, false);
+            // Already exists
+            if (subResponse.getConsumerCount() > 0)
+            {
+               throw new javax.jms.IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)");
+            }
+
+            SimpleString oldFilterString = subResponse.getFilterString();
+
+            boolean selectorChanged = (selector == null && oldFilterString != null) || (oldFilterString == null && selector != null) ||
+                                      (oldFilterString != null && selector != null && !oldFilterString.equals(selector));
+
+            SimpleString oldTopicName = subResponse.getAddress();
+
+            boolean topicChanged = !oldTopicName.equals(activation.getAddress());
+
+            if (selectorChanged || topicChanged)
+            {
+               // Delete the old durable sub
+               session.deleteQueue(queueName);
+
+               // Create the new one
+               session.createQueue(activation.getAddress(), queueName, selectorString, true, false);
+            }
          }
+         consumer = session.createConsumer(queueName, null, false);
       }
       else
       {
-         if (selector == null || selector.trim().equals(""))
+         SimpleString queueName;
+         if (activation.isTopic())
          {
-            messageConsumer = session.createConsumer(activation.getDestination());
+            queueName = new SimpleString(UUID.randomUUID().toString());
+            session.createQueue(activation.getAddress(), queueName, selectorString, false, false);
          }
          else
          {
-            messageConsumer = session.createConsumer(activation.getDestination(), selector);
+            queueName = activation.getAddress();
          }
+         consumer = session.createConsumer(queueName, selectorString);
       }
 
       // Create the endpoint
       MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();
-      XAResource xaResource = null;
-
-      if (activation.isDeliveryTransacted() && xaSession != null)
-         xaResource = xaSession.getXAResource();
-
-      endpoint = endpointFactory.createEndpoint(xaResource);
-
-      // Set the message listener
-      messageConsumer.setMessageListener(this);
+      if (activation.isDeliveryTransacted())
+      {
+         endpoint = endpointFactory.createEndpoint(session);
+      }
+      else
+      {
+         endpoint = endpointFactory.createEndpoint(null);
+      }
+      consumer.setMessageHandler(this);
    }
 
    /**
@@ -143,12 +181,16 @@
    public void teardown()
    {
       if (trace)
+      {
          log.trace("teardown()");
+      }
 
       try
       {
          if (endpoint != null)
+         {
             endpoint.release();
+         }
       }
       catch (Throwable t)
       {
@@ -157,18 +199,10 @@
 
       try
       {
-         if (xaSession != null)
-            xaSession.close();
-      }
-      catch (Throwable t)
-      {
-         log.debug("Error releasing xaSession " + xaSession, t);
-      }
-
-      try
-      {
          if (session != null)
+         {
             session.close();
+         }
       }
       catch (Throwable t)
       {
@@ -176,11 +210,7 @@
       }
    }
 
-   /**
-    * On message
-    * @param message The message
-    */
-   public void onMessage(Message message)
+   public void onMessage(ClientMessage message)
    {
       if (trace)
          log.trace("onMessage(" + message + ")");
@@ -195,14 +225,39 @@
          log.warn("Unable to create transaction: " + throwable.getMessage());
          txnStrategy = null;
       }
+
+      JBossMessage jbm = JBossMessage.createMessage(message, session);
+
       try
       {
-         endpoint.beforeDelivery(JBMActivation.ONMESSAGE);
+         jbm.doBeforeReceive();
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to prepare message for receipt", e);
 
+         return;
+      }
+
+      if (activation.getActivationSpec().getAcknowledgeModeInt() == Session.SESSION_TRANSACTED ||
+          activation.getActivationSpec().getAcknowledgeModeInt() == Session.CLIENT_ACKNOWLEDGE)
+      {
          try
          {
+            message.acknowledge();
+         }
+         catch (MessagingException e)
+         {
+            log.error("Failed to process message", e);
+         }
+      }
+      try
+      {
+         endpoint.beforeDelivery(JBMActivation.ONMESSAGE);
+         try
+         {
             MessageListener listener = (MessageListener) endpoint;
-            listener.onMessage(message);
+            listener.onMessage(jbm);
          }
          finally
          {
@@ -214,13 +269,18 @@
          log.error("Unexpected error delivering message " + message, t);
 
          if (txnStrategy != null)
+         {
             txnStrategy.error();
+         }
       }
       finally
       {
          if (txnStrategy != null)
+         {
             txnStrategy.end();
+         }
       }
+
    }
 
    /**
@@ -230,12 +290,15 @@
    {
       /**
        * Get the transaction demarcation strategy
+       *
        * @return The strategy
        */
       TransactionDemarcationStrategy getStrategy()
       {
          if (trace)
+         {
             log.trace("getStrategy()");
+         }
 
          if (activation.isDeliveryTransacted())
          {
@@ -266,6 +329,7 @@
       * Start
       */
       void start() throws Throwable;
+
       /**
        * Error
        */
@@ -285,6 +349,7 @@
       /*
    * Start
    */
+
       public void start()
       {
       }
@@ -295,7 +360,9 @@
       public void error()
       {
          if (trace)
+         {
             log.trace("error()");
+         }
 
          final JBMActivationSpec spec = activation.getActivationSpec();
 
@@ -319,7 +386,8 @@
                   {
                      session.rollback();
                   }
-               } catch (JMSException e)
+               }
+               catch (MessagingException e)
                {
                   log.error("Failed to rollback session transaction", e);
                }
@@ -333,7 +401,9 @@
       public void end()
       {
          if (trace)
+         {
             log.trace("error()");
+         }
 
          final JBMActivationSpec spec = activation.getActivationSpec();
 
@@ -344,7 +414,8 @@
                try
                {
                   session.commit();
-               } catch (JMSException e)
+               }
+               catch (MessagingException e)
                {
                   log.error("Failed to commit session transaction", e);
                }
@@ -359,6 +430,7 @@
    private class XATransactionDemarcationStrategy implements TransactionDemarcationStrategy
    {
       private Transaction trans = null;
+
       private TransactionManager tm = activation.getTransactionManager();
 
       public void start() throws Throwable
@@ -368,7 +440,9 @@
          if (timeout > 0)
          {
             if (trace)
+            {
                log.trace("Setting transactionTimeout for JMSSessionPool to " + timeout);
+            }
 
             tm.setTransactionTimeout(timeout);
          }
@@ -380,19 +454,19 @@
             trans = tm.getTransaction();
 
             if (trace)
+            {
                log.trace(this + " using tx=" + trans);
+            }
 
-            if (xaSession != null)
+            if (!trans.enlistResource(session))
             {
-               XAResource res = xaSession.getXAResource();
+               throw new JMSException("could not enlist resource");
+            }
+            if (trace)
+            {
+               log.trace(this + " XAResource '" + session + " enlisted.");
+            }
 
-               if (!trans.enlistResource(res))
-               {
-                  throw new JMSException("could not enlist resource");
-               }
-               if (trace)
-                  log.trace(this + " XAResource '" + res + " enlisted.");
-            }
          }
          catch (Throwable t)
          {
@@ -414,7 +488,9 @@
          try
          {
             if (trace)
+            {
                log.trace(this + " using TM to mark TX for rollback tx=" + trans);
+            }
 
             trans.setRollbackOnly();
          }
@@ -431,24 +507,21 @@
             // Use the TM to commit the Tx (assert the correct association)
             Transaction currentTx = tm.getTransaction();
             if (trans.equals(currentTx) == false)
+            {
                throw new IllegalStateException("Wrong tx association: expected " + trans + " was " + currentTx);
+            }
 
             // Marked rollback
             if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
             {
                if (trace)
+               {
                   log.trace(this + " rolling back JMS transaction tx=" + trans);
+               }
 
                // Actually roll it back
                tm.rollback();
 
-               // NO XASession? then manually rollback.
-               // This is not so good but
-               // it's the best we can do if we have no XASession.
-               if (xaSession == null && activation.isDeliveryTransacted())
-               {
-                  session.rollback();
-               }
             }
             else if (trans.getStatus() == Status.STATUS_ACTIVE)
             {
@@ -457,28 +530,19 @@
                // a) everything goes well
                // b) app. exception was thrown
                if (trace)
+               {
                   log.trace(this + " commiting the JMS transaction tx=" + trans);
+               }
 
                tm.commit();
 
-               // NO XASession? then manually commit. This is not so good but
-               // it's the best we can do if we have no XASession.
-               if (xaSession == null && activation.isDeliveryTransacted())
-               {
-                  session.commit();
-               }
-
             }
             else
             {
                tm.suspend();
-
-               if (xaSession == null && activation.isDeliveryTransacted())
-               {
-                  session.rollback();
-               }
             }
-         } catch (Throwable t)
+         }
+         catch (Throwable t)
          {
             log.error(this + " failed to commit/rollback", t);
          }




More information about the jboss-cvs-commits mailing list