[jboss-cvs] JBoss Messaging SVN: r5955 - in trunk: src/main/org/jboss/messaging/core/server/impl and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Mar 2 07:00:26 EST 2009
Author: ataylor
Date: 2009-03-02 07:00:26 -0500 (Mon, 02 Mar 2009)
New Revision: 5955
Modified:
trunk/build-thirdparty.xml
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
trunk/src/main/org/jboss/messaging/integration/security/JBossASSecurityManager.java
trunk/src/main/org/jboss/messaging/integration/security/SecurityActions.java
trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
trunk/src/main/org/jboss/messaging/ra/JBMResourceAdapter.java
trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java
trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java
Log:
changed RA to use core and AS integration fixes
Modified: trunk/build-thirdparty.xml
===================================================================
--- trunk/build-thirdparty.xml 2009-03-02 11:54:04 UTC (rev 5954)
+++ trunk/build-thirdparty.xml 2009-03-02 12:00:26 UTC (rev 5955)
@@ -92,8 +92,8 @@
-->
<componentref name="jboss/integration" version="5.0.0.Beta4"/>
- <componentref name="jboss/jboss-security-spi" version="2.0.1.GA"/>
- <componentref name="jboss/jbosssx-client" version="2.0.1.GA"/>
+ <componentref name="jboss/jboss-security-spi" version="2.0.2-SNAPSHOT"/>
+ <componentref name="jboss/jbosssx-client" version="2.0.2-SNAPSHOT"/>
<componentref name="jboss/jboss-jaspi-api" version="1.0-BETA1"/>
<!--
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-03-02 11:54:04 UTC (rev 5954)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-03-02 12:00:26 UTC (rev 5955)
@@ -1891,7 +1891,7 @@
}
else
{
- theTx.commit();
+ theTx.commit(packet.isOnePhase());
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
}
Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2009-03-02 11:54:04 UTC (rev 5954)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2009-03-02 12:00:26 UTC (rev 5955)
@@ -38,6 +38,8 @@
void commit() throws Exception;
+ void commit(boolean onePhase) throws Exception;
+
void rollback() throws Exception;
int getOperationsCount();
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2009-03-02 11:54:04 UTC (rev 5954)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2009-03-02 12:00:26 UTC (rev 5955)
@@ -145,6 +145,11 @@
public void commit() throws Exception
{
+ commit(true);
+ }
+
+ public void commit(boolean onePhase) throws Exception
+ {
synchronized (timeoutLock)
{
if (state == State.ROLLBACK_ONLY)
@@ -159,9 +164,16 @@
return;
}
}
-
+
if (xid != null)
{
+ if (onePhase)
+ {
+ if(state == State.ACTIVE)
+ {
+ prepare();
+ }
+ }
if (state != State.PREPARED)
{
throw new IllegalStateException("Transaction is in invalid state " + state);
@@ -174,7 +186,7 @@
throw new IllegalStateException("Transaction is in invalid state " + state);
}
}
-
+
if (operations != null)
{
for (TransactionOperation operation : operations)
@@ -196,7 +208,7 @@
{
operation.afterCommit(this);
}
- }
+ }
}
}
Modified: trunk/src/main/org/jboss/messaging/integration/security/JBossASSecurityManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/security/JBossASSecurityManager.java 2009-03-02 11:54:04 UTC (rev 5954)
+++ trunk/src/main/org/jboss/messaging/integration/security/JBossASSecurityManager.java 2009-03-02 12:00:26 UTC (rev 5955)
@@ -24,6 +24,7 @@
import java.util.HashSet;
import java.util.Set;
+import java.security.Principal;
import javax.naming.InitialContext;
import javax.security.auth.Subject;
@@ -107,8 +108,8 @@
// security my be screwed up, on account of thread local security stack being corrupted.
if (authenticated)
{
- SecurityActions.pushSubjectContext(principal, passwordChars, subject);
- Set<SimplePrincipal> rolePrincipals = getRolePrincipals(checkType, roles);
+ SecurityActions.pushSubjectContext(principal, passwordChars, subject, securityDomainName);
+ Set<Principal> rolePrincipals = getRolePrincipals(checkType, roles);
authenticated = realmMapping.doesUserHaveRole(principal, rolePrincipals);
@@ -121,9 +122,9 @@
return authenticated;
}
- private Set<SimplePrincipal> getRolePrincipals(final CheckType checkType, final Set<Role> roles)
+ private Set<Principal> getRolePrincipals(final CheckType checkType, final Set<Role> roles)
{
- Set<SimplePrincipal> principals = new HashSet<SimplePrincipal>();
+ Set<Principal> principals = new HashSet<Principal>();
for (Role role : roles)
{
if ((checkType.equals(CheckType.CREATE) && role.isCheckType(CheckType.CREATE)) ||
Modified: trunk/src/main/org/jboss/messaging/integration/security/SecurityActions.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/security/SecurityActions.java 2009-03-02 11:54:04 UTC (rev 5954)
+++ trunk/src/main/org/jboss/messaging/integration/security/SecurityActions.java 2009-03-02 12:00:26 UTC (rev 5955)
@@ -29,6 +29,9 @@
import javax.security.auth.Subject;
import org.jboss.security.SecurityAssociation;
+import org.jboss.security.SecurityContextAssociation;
+import org.jboss.security.SecurityContextFactory;
+import org.jboss.security.SecurityContext;
/** A collection of privileged actions for this package
@@ -39,65 +42,101 @@
*/
class SecurityActions
{
- interface PrincipalInfoAction
- {
- PrincipalInfoAction PRIVILEGED = new PrincipalInfoAction()
- {
- public void push(final Principal principal, final Object credential,
- final Subject subject)
- {
- AccessController.doPrivileged(
- new PrivilegedAction()
- {
- public Object run()
- {
- SecurityAssociation.pushSubjectContext(subject, principal, credential);
- return null;
- }
- }
- );
- }
- public void pop()
- {
- AccessController.doPrivileged(
- new PrivilegedAction()
- {
- public Object run()
- {
- SecurityAssociation.popSubjectContext();
- return null;
- }
- }
- );
- }
- };
+ interface PrincipalInfoAction
+ {
+ PrincipalInfoAction PRIVILEGED = new PrincipalInfoAction()
+ {
+ public void push(final Principal principal, final Object credential,
+ final Subject subject, final String securityDomain)
+ {
+ AccessController.doPrivileged(
+ new PrivilegedAction<Object>()
+ {
+ public Object run()
+ {
+ //SecurityAssociation.pushSubjectContext(subject, principal, credential);
+ SecurityContext sc = SecurityContextAssociation.getSecurityContext();
+ if(sc == null)
+ {
+ try
+ {
+ sc = SecurityContextFactory.createSecurityContext(principal, credential,
+ subject, securityDomain);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ SecurityContextAssociation.setSecurityContext(sc);
+ return null;
+ }
+ }
+ );
+ }
+ public void pop()
+ {
+ AccessController.doPrivileged(
+ new PrivilegedAction<Object>()
+ {
+ public Object run()
+ {
+ //SecurityAssociation.popSubjectContext();
+ SecurityContextAssociation.clearSecurityContext();
+ return null;
+ }
+ }
+ );
+ }
+ };
- PrincipalInfoAction NON_PRIVILEGED = new PrincipalInfoAction()
- {
- public void push(final Principal principal, final Object credential, final Subject subject)
- {
- SecurityAssociation.pushSubjectContext(subject, principal, credential);
- }
- public void pop()
- {
- SecurityAssociation.popSubjectContext();
- }
- };
+ PrincipalInfoAction NON_PRIVILEGED = new PrincipalInfoAction()
+ {
+ public void push(Principal principal, Object credential, Subject subject,
+ String securityDomain)
+ {
+ //SecurityAssociation.pushSubjectContext(subject, principal, credential);
+ SecurityContext sc = SecurityContextAssociation.getSecurityContext();
+ if(sc == null)
+ {
+ try
+ {
+ sc = SecurityContextFactory.createSecurityContext(principal, credential,
+ subject, securityDomain);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ else
+ {
+ sc.getUtil().createSubjectInfo(principal, credential, subject);
+ }
+ SecurityContextAssociation.setSecurityContext(sc);
+ }
+ public void pop()
+ {
+ //SecurityAssociation.popSubjectContext();
+ SecurityContextAssociation.clearSecurityContext();
+ }
+ };
- void push(Principal principal, Object credential, Subject subject);
- void pop();
+
+ void push(Principal principal, Object credential, Subject subject, String securityDomain);
+ void pop();
}
static void pushSubjectContext(final Principal principal, final Object credential,
- final Subject subject)
+ final Subject subject, String securityDomainName)
{
if(System.getSecurityManager() == null)
{
- PrincipalInfoAction.NON_PRIVILEGED.push(principal, credential, subject);
+ PrincipalInfoAction.NON_PRIVILEGED.push(principal, credential, subject, securityDomainName);
}
else
{
- PrincipalInfoAction.PRIVILEGED.push(principal, credential, subject);
+ PrincipalInfoAction.PRIVILEGED.push(principal, credential, subject, securityDomainName);
}
}
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java 2009-03-02 11:54:04 UTC (rev 5954)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java 2009-03-02 12:00:26 UTC (rev 5955)
@@ -553,58 +553,7 @@
{
if (sessionFactory == null)
{
- if (connectorConfigs != null)
- {
- sessionFactory = new ClientSessionFactoryImpl(connectorConfigs,
- connectionLoadBalancingPolicyClassName,
- pingPeriod,
- connectionTTL,
- callTimeout,
- consumerWindowSize,
- consumerMaxRate,
- sendWindowSize,
- producerMaxRate,
- minLargeMessageSize,
- blockOnAcknowledge,
- blockOnNonPersistentSend,
- blockOnPersistentSend,
- autoGroup,
- maxConnections,
- preAcknowledge,
- dupsOKBatchSize,
- retryInterval,
- retryIntervalMultiplier,
- maxRetriesBeforeFailover,
- maxRetriesAfterFailover);
- }
- else
- {
- sessionFactory = new ClientSessionFactoryImpl(discoveryGroupAddress,
- discoveryGroupPort,
- discoveryRefreshTimeout,
- discoveryInitialWaitTimeout,
- connectionLoadBalancingPolicyClassName,
- pingPeriod,
- connectionTTL,
- callTimeout,
- consumerWindowSize,
- consumerMaxRate,
- sendWindowSize,
- producerMaxRate,
- minLargeMessageSize,
- blockOnAcknowledge,
- blockOnNonPersistentSend,
- blockOnPersistentSend,
- autoGroup,
- maxConnections,
- preAcknowledge,
- dupsOKBatchSize,
- retryInterval,
- retryIntervalMultiplier,
- maxRetriesBeforeFailover,
- maxRetriesAfterFailover);
- }
-
+ createFactory();
}
}
catch (MessagingException me)
@@ -650,7 +599,71 @@
sessionFactory);
}
+ private void createFactory() throws MessagingException
+ {
+ if (connectorConfigs != null)
+ {
+ sessionFactory = new ClientSessionFactoryImpl(connectorConfigs,
+ connectionLoadBalancingPolicyClassName,
+ pingPeriod,
+ connectionTTL,
+ callTimeout,
+ consumerWindowSize,
+ consumerMaxRate,
+ sendWindowSize,
+ producerMaxRate,
+ minLargeMessageSize,
+ blockOnAcknowledge,
+ blockOnNonPersistentSend,
+ blockOnPersistentSend,
+ autoGroup,
+ maxConnections,
+ preAcknowledge,
+ dupsOKBatchSize,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover);
+ }
+ else
+ {
+ sessionFactory = new ClientSessionFactoryImpl(discoveryGroupAddress,
+ discoveryGroupPort,
+ discoveryRefreshTimeout,
+ discoveryInitialWaitTimeout,
+ connectionLoadBalancingPolicyClassName,
+ pingPeriod,
+ connectionTTL,
+ callTimeout,
+ consumerWindowSize,
+ consumerMaxRate,
+ sendWindowSize,
+ producerMaxRate,
+ minLargeMessageSize,
+ blockOnAcknowledge,
+ blockOnNonPersistentSend,
+ blockOnPersistentSend,
+ autoGroup,
+ maxConnections,
+ preAcknowledge,
+ dupsOKBatchSize,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover);
+ }
+ }
+
// Private --------------------------------------------------------------------------------------
// Inner classes --------------------------------------------------------------------------------
+
+ public ClientSessionFactory getCoreFactory() throws MessagingException
+ {
+ if (sessionFactory == null)
+ {
+ createFactory();
+ }
+ return sessionFactory;
+ }
}
Modified: trunk/src/main/org/jboss/messaging/ra/JBMResourceAdapter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/JBMResourceAdapter.java 2009-03-02 11:54:04 UTC (rev 5954)
+++ trunk/src/main/org/jboss/messaging/ra/JBMResourceAdapter.java 2009-03-02 12:00:26 UTC (rev 5955)
@@ -21,14 +21,19 @@
*/
package org.jboss.messaging.ra;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.jms.client.JBossConnectionFactory;
+import org.jboss.messaging.jms.client.JBossSession;
import org.jboss.messaging.ra.inflow.JBMActivation;
import org.jboss.messaging.ra.inflow.JBMActivationSpec;
+import javax.jms.Session;
import javax.resource.ResourceException;
import javax.resource.spi.ActivationSpec;
import javax.resource.spi.BootstrapContext;
@@ -75,7 +80,7 @@
/**
* The JBoss connection factory
*/
- private JBossConnectionFactory factory;
+ private ClientSessionFactory sessionFactory;
/**
* Have the factory been configured
@@ -87,6 +92,8 @@
*/
private Map activations;
+ private JBossConnectionFactory jBossConnectionFactory;
+
/**
* Constructor
*/
@@ -98,7 +105,7 @@
}
raProperties = new JBMRAProperties();
- factory = null;
+ sessionFactory = null;
configured = new AtomicBoolean(false);
activations = new ConcurrentHashMap();
}
@@ -112,6 +119,17 @@
*/
public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec spec) throws ResourceException
{
+ if(!configured.getAndSet(true))
+ {
+ try
+ {
+ setup();
+ }
+ catch (MessagingException e)
+ {
+ throw new ResourceException("Unable to create activation", e);
+ }
+ }
if (trace)
{
log.trace("endpointActivation(" + endpointFactory + ", " + spec + ")");
@@ -1238,19 +1256,39 @@
return ctx.getWorkManager();
}
- /**
- * Get the JBoss connection factory
- *
- * @return The factory
- */
- public JBossConnectionFactory getJBossConnectionFactory()
+ public ClientSession createSession(int ackMode, String user, String pass, Boolean preAck, Integer dupsOkBatchSize, Integer transactionBatchSize, boolean deliveryTransacted) throws Exception
{
- if (!configured.get())
+
+ ClientSession result;
+
+ boolean actPreAck = preAck != null ? preAck : ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
+ int actDupsOkBatchSize = dupsOkBatchSize != null ? dupsOkBatchSize : ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
+ int actTxBatchSize = transactionBatchSize != null ? transactionBatchSize : ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
+ switch (ackMode)
{
- setup();
+ case Session.SESSION_TRANSACTED:
+ result = sessionFactory.createSession(user, pass, deliveryTransacted, false, false, actPreAck, actTxBatchSize);
+ break;
+ case Session.AUTO_ACKNOWLEDGE:
+ result = sessionFactory.createSession(user, pass, deliveryTransacted, true, false, actPreAck, actTxBatchSize);
+ break;
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ result = sessionFactory.createSession(user, pass, deliveryTransacted, true, false, actPreAck, actDupsOkBatchSize);
+ break;
+ case Session.CLIENT_ACKNOWLEDGE:
+ result = sessionFactory.createSession(user, pass, deliveryTransacted, false, false, actPreAck, actTxBatchSize);
+ break;
+ case JBossSession.SERVER_ACKNOWLEDGE:
+ result = sessionFactory.createSession(user, pass, deliveryTransacted, false, true, actPreAck, actTxBatchSize);
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid ackmode: " + ackMode);
}
- return factory;
+ log.debug("Using queue connection " + result);
+
+ return result;
+
}
/**
@@ -1271,74 +1309,74 @@
/**
* Setup the factory
*/
- protected void setup()
+ protected void setup() throws MessagingException
{
if (getTransportType() != null)
{
TransportConfiguration transportConf = new TransportConfiguration(getTransportType(), getTransportConfiguration());
TransportConfiguration backup = getBackUpTransportType() == null ? null : new TransportConfiguration(getBackUpTransportType(), getBackupTransportConfiguration());
- factory = new JBossConnectionFactory(transportConf,
- backup,
- getLoadBalancingPolicyClassName() == null ? ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME : getLoadBalancingPolicyClassName(),
- getPingPeriod() == null ? ClientSessionFactoryImpl.DEFAULT_PING_PERIOD : getPingPeriod(),
- getConnectionTTL() == null ? ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL : getConnectionTTL(),
- getCallTimeout() == null ? ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT : getCallTimeout(),
- getClientID(),
- getDupsOKBatchSize() == null ? ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE : getDupsOKBatchSize(),
- getTransactionBatchSize() == null ? ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE : getTransactionBatchSize(),
- getConsumerWindowSize() == null ? ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE : getConsumerWindowSize(),
- getConsumerMaxRate() == null ? ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE : getConsumerMaxRate(),
- getSendWindowSize() == null ? ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE : getSendWindowSize(),
- getProducerMaxRate() == null ? ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE : getProducerMaxRate(),
- getMinLargeMessageSize() == null ? ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE : getMinLargeMessageSize(),
- getBlockOnAcknowledge() == null ? ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE : getBlockOnAcknowledge(),
- getBlockOnNonPersistentSend() == null ? ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND : getBlockOnNonPersistentSend(),
- getBlockOnPersistentSend() == null ? ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND : getBlockOnPersistentSend(),
- getAutoGroup() == null ? ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP : getAutoGroup(),
- getMaxConnections() == null ? ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS : getMaxConnections(),
- getPreAcknowledge() == null ? ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE : getPreAcknowledge(),
- getRetryInterval() == null ? ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL : getRetryInterval(),
- getRetryIntervalMultiplier() == null ? ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER : getRetryIntervalMultiplier(),
- getMaxRetriesBeforeFailover() == null ? ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER : getMaxRetriesBeforeFailover(),
- getMaxRetriesAfterFailover() == null ? ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER : getMaxRetriesAfterFailover()
+ jBossConnectionFactory = new JBossConnectionFactory(transportConf,
+ backup,
+ getLoadBalancingPolicyClassName() == null ? ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME : getLoadBalancingPolicyClassName(),
+ getPingPeriod() == null ? ClientSessionFactoryImpl.DEFAULT_PING_PERIOD : getPingPeriod(),
+ getConnectionTTL() == null ? ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL : getConnectionTTL(),
+ getCallTimeout() == null ? ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT : getCallTimeout(),
+ getClientID(),
+ getDupsOKBatchSize() == null ? ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE : getDupsOKBatchSize(),
+ getTransactionBatchSize() == null ? ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE : getTransactionBatchSize(),
+ getConsumerWindowSize() == null ? ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE : getConsumerWindowSize(),
+ getConsumerMaxRate() == null ? ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE : getConsumerMaxRate(),
+ getSendWindowSize() == null ? ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE : getSendWindowSize(),
+ getProducerMaxRate() == null ? ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE : getProducerMaxRate(),
+ getMinLargeMessageSize() == null ? ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE : getMinLargeMessageSize(),
+ getBlockOnAcknowledge() == null ? ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE : getBlockOnAcknowledge(),
+ getBlockOnNonPersistentSend() == null ? ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND : getBlockOnNonPersistentSend(),
+ getBlockOnPersistentSend() == null ? ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND : getBlockOnPersistentSend(),
+ getAutoGroup() == null ? ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP : getAutoGroup(),
+ getMaxConnections() == null ? ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS : getMaxConnections(),
+ getPreAcknowledge() == null ? ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE : getPreAcknowledge(),
+ getRetryInterval() == null ? ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL : getRetryInterval(),
+ getRetryIntervalMultiplier() == null ? ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER : getRetryIntervalMultiplier(),
+ getMaxRetriesBeforeFailover() == null ? ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER : getMaxRetriesBeforeFailover(),
+ getMaxRetriesAfterFailover() == null ? ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER : getMaxRetriesAfterFailover()
);
- configured.set(true);
}
else if (getDiscoveryGroupAddress() != null && getDiscoveryGroupPort() != null)
{
- factory = new JBossConnectionFactory(getDiscoveryGroupAddress(),
- getDiscoveryGroupPort(),
- getDiscoveryRefreshTimeout() == null ? ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT : getDiscoveryRefreshTimeout(),
- getDiscoveryInitialWaitTimeout() == null ? ClientSessionFactoryImpl.DEFAULT_DISCOVERY_INITIAL_WAIT : getDiscoveryInitialWaitTimeout(),
- getLoadBalancingPolicyClassName() == null ? ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME : getLoadBalancingPolicyClassName(),
- getPingPeriod() == null ? ClientSessionFactoryImpl.DEFAULT_PING_PERIOD : getPingPeriod(),
- getConnectionTTL() == null ? ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL : getConnectionTTL(),
- getCallTimeout() == null ? ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT : getCallTimeout(),
- getClientID(),
- getDupsOKBatchSize() == null ? ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE : getDupsOKBatchSize(),
- getTransactionBatchSize() == null ? ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE : getTransactionBatchSize(),
- getConsumerWindowSize() == null ? ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE : getConsumerWindowSize(),
- getConsumerMaxRate() == null ? ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE : getConsumerMaxRate(),
- getSendWindowSize() == null ? ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE : getSendWindowSize(),
- getProducerMaxRate() == null ? ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE : getProducerMaxRate(),
- getMinLargeMessageSize() == null ? ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE : getMinLargeMessageSize(),
- getBlockOnAcknowledge() == null ? ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE : getBlockOnAcknowledge(),
- getBlockOnNonPersistentSend() == null ? ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND : getBlockOnNonPersistentSend(),
- getBlockOnPersistentSend() == null ? ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND : getBlockOnPersistentSend(),
- getAutoGroup() == null ? ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP : getAutoGroup(),
- getMaxConnections() == null ? ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS : getMaxConnections(),
- getPreAcknowledge() == null ? ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE : getPreAcknowledge(),
- getRetryInterval() == null ? ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL : getRetryInterval(),
- getRetryIntervalMultiplier() == null ? ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER : getRetryIntervalMultiplier(),
- getMaxRetriesBeforeFailover() == null ? ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER : getMaxRetriesBeforeFailover(),
- getMaxRetriesAfterFailover() == null ? ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER : getMaxRetriesAfterFailover()
+ jBossConnectionFactory = new JBossConnectionFactory(getDiscoveryGroupAddress(),
+ getDiscoveryGroupPort(),
+ getDiscoveryRefreshTimeout() == null ? ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT : getDiscoveryRefreshTimeout(),
+ getDiscoveryInitialWaitTimeout() == null ? ClientSessionFactoryImpl.DEFAULT_DISCOVERY_INITIAL_WAIT : getDiscoveryInitialWaitTimeout(),
+ getLoadBalancingPolicyClassName() == null ? ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME : getLoadBalancingPolicyClassName(),
+ getPingPeriod() == null ? ClientSessionFactoryImpl.DEFAULT_PING_PERIOD : getPingPeriod(),
+ getConnectionTTL() == null ? ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL : getConnectionTTL(),
+ getCallTimeout() == null ? ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT : getCallTimeout(),
+ getClientID(),
+ getDupsOKBatchSize() == null ? ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE : getDupsOKBatchSize(),
+ getTransactionBatchSize() == null ? ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE : getTransactionBatchSize(),
+ getConsumerWindowSize() == null ? ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE : getConsumerWindowSize(),
+ getConsumerMaxRate() == null ? ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE : getConsumerMaxRate(),
+ getSendWindowSize() == null ? ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE : getSendWindowSize(),
+ getProducerMaxRate() == null ? ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE : getProducerMaxRate(),
+ getMinLargeMessageSize() == null ? ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE : getMinLargeMessageSize(),
+ getBlockOnAcknowledge() == null ? ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE : getBlockOnAcknowledge(),
+ getBlockOnNonPersistentSend() == null ? ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND : getBlockOnNonPersistentSend(),
+ getBlockOnPersistentSend() == null ? ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND : getBlockOnPersistentSend(),
+ getAutoGroup() == null ? ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP : getAutoGroup(),
+ getMaxConnections() == null ? ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS : getMaxConnections(),
+ getPreAcknowledge() == null ? ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE : getPreAcknowledge(),
+ getRetryInterval() == null ? ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL : getRetryInterval(),
+ getRetryIntervalMultiplier() == null ? ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER : getRetryIntervalMultiplier(),
+ getMaxRetriesBeforeFailover() == null ? ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER : getMaxRetriesBeforeFailover(),
+ getMaxRetriesAfterFailover() == null ? ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER : getMaxRetriesAfterFailover()
);
- configured.set(true);
}
else
{
log.fatal("must provide either TransportTyoe or DiscoveryGroupAddress and DiscoveryGroupPort for JBM ResourceAdapter");
}
+
+ sessionFactory = jBossConnectionFactory.getCoreFactory();
}
@@ -1381,4 +1419,9 @@
}
return val;
}
+
+ public JBossConnectionFactory getJBossConnectionFactory()
+ {
+ return jBossConnectionFactory;
+ }
}
Modified: trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java 2009-03-02 11:54:04 UTC (rev 5954)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivation.java 2009-03-02 12:00:26 UTC (rev 5955)
@@ -21,27 +21,25 @@
*/
package org.jboss.messaging.ra.inflow;
-import org.jboss.messaging.jms.client.JBossConnectionFactory;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.jms.client.JBossSession;
+import org.jboss.messaging.jms.JBossDestination;
import org.jboss.messaging.ra.JBMResourceAdapter;
import org.jboss.messaging.ra.Util;
-import org.jboss.messaging.ra.JBMMessageListener;
-import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.utils.SimpleString;
+import org.jboss.tm.TransactionManagerLocator;
-import java.lang.reflect.Method;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.List;
-import java.util.ArrayList;
-
-import javax.jms.Connection;
import javax.jms.Destination;
-import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
-import javax.jms.QueueConnection;
+import javax.jms.Session;
import javax.jms.Topic;
-import javax.jms.TopicConnection;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.resource.ResourceException;
@@ -49,59 +47,76 @@
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkManager;
import javax.transaction.TransactionManager;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
-import org.jboss.tm.TransactionManagerLocator;
-
/**
* The activation.
- *
+ *
* @author <a href="adrian at jboss.com">Adrian Brock</a>
* @author <a href="jesper.pedersen at jboss.org">Jesper Pedersen</a>
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
* @version $Revision: $
*/
-public class JBMActivation implements ExceptionListener
+public class JBMActivation implements FailureListener
{
- /** The logger */
+ /**
+ * The logger
+ */
private static final Logger log = Logger.getLogger(JBMActivation.class);
- /** Trace enabled */
+ /**
+ * Trace enabled
+ */
private static boolean trace = log.isTraceEnabled();
-
- /** The onMessage method */
- public static final Method ONMESSAGE;
-
- /** The resource adapter */
+
+ /**
+ * The onMessage method
+ */
+ public static final Method ONMESSAGE;
+
+ /**
+ * The resource adapter
+ */
protected JBMResourceAdapter ra;
-
- /** The activation spec */
+
+ /**
+ * The activation spec
+ */
protected JBMActivationSpec spec;
- /** The message endpoint factory */
+ /**
+ * The message endpoint factory
+ */
protected MessageEndpointFactory endpointFactory;
-
- /** Whether delivery is active */
+
+ /**
+ * Whether delivery is active
+ */
protected AtomicBoolean deliveryActive = new AtomicBoolean(false);
- /** Whether we are in the failure recovery loop */
+ /**
+ * Whether we are in the failure recovery loop
+ */
private AtomicBoolean inFailure = new AtomicBoolean(false);
- /** The destination */
- protected Destination destination;
-
- /** The destination type */
+ /**
+ * The destination type
+ */
protected boolean isTopic = false;
-
- /** The connection */
- protected Connection connection;
-
- /** Is the delivery transacted */
+
+ /**
+ * Is the delivery transacted
+ */
protected boolean isDeliveryTransacted;
-
- /** The message handler pool */
- //protected JBMMessageHandlerPool pool;
- /** The TransactionManager */
+ JBossDestination destination;
+
+ /**
+ * The TransactionManager
+ */
protected TransactionManager tm;
private List<JBMMessageHandler> handlers = new ArrayList<JBMMessageHandler>();
@@ -110,7 +125,7 @@
{
try
{
- ONMESSAGE = MessageListener.class.getMethod("onMessage", new Class[] { Message.class });
+ ONMESSAGE = MessageListener.class.getMethod("onMessage", new Class[]{Message.class});
}
catch (Exception e)
{
@@ -120,15 +135,18 @@
/**
* Constructor
- * @param ra The resource adapter
+ *
+ * @param ra The resource adapter
* @param endpointFactory The endpoint factory
- * @param spec The activation spec
- * @exception ResourceException Thrown if an error occurs
+ * @param spec The activation spec
+ * @throws ResourceException Thrown if an error occurs
*/
public JBMActivation(JBMResourceAdapter ra, MessageEndpointFactory endpointFactory, JBMActivationSpec spec) throws ResourceException
{
if (trace)
+ {
log.trace("constructor(" + ra + ", " + endpointFactory + ", " + spec + ")");
+ }
this.ra = ra;
this.endpointFactory = endpointFactory;
@@ -145,113 +163,119 @@
/**
* Get the activation spec
+ *
* @return The value
*/
public JBMActivationSpec getActivationSpec()
{
if (trace)
+ {
log.trace("getActivationSpec()");
+ }
return spec;
}
/**
* Get the message endpoint factory
+ *
* @return The value
*/
public MessageEndpointFactory getMessageEndpointFactory()
{
if (trace)
+ {
log.trace("getMessageEndpointFactory()");
+ }
return endpointFactory;
}
/**
* Get whether delivery is transacted
+ *
* @return The value
*/
public boolean isDeliveryTransacted()
{
if (trace)
+ {
log.trace("isDeliveryTransacted()");
+ }
return isDeliveryTransacted;
}
/**
* Get the work manager
+ *
* @return The value
*/
public WorkManager getWorkManager()
{
if (trace)
+ {
log.trace("getWorkManager()");
+ }
return ra.getWorkManager();
}
-
+
/**
* Get the transaction manager
+ *
* @return The value
*/
public TransactionManager getTransactionManager()
{
if (trace)
+ {
log.trace("getTransactionManager()");
+ }
if (tm == null)
+ {
tm = TransactionManagerLocator.locateTransactionManager();
+ }
return tm;
}
/**
- * Get the connection
- * @return The value
- */
- public Connection getConnection()
- {
- if (trace)
- log.trace("getConnection()");
-
- return connection;
- }
-
- /**
- * Get the destination
- * @return The value
- */
- public Destination getDestination()
- {
- if (trace)
- log.trace("getDestination()");
-
- return destination;
- }
-
- /**
* Is the destination a topic
+ *
* @return The value
*/
public boolean isTopic()
{
if (trace)
+ {
log.trace("isTopic()");
+ }
return isTopic;
}
-
+
/**
* Start the activation
+ *
* @throws ResourceException Thrown if an error occurs
*/
public void start() throws ResourceException
{
if (trace)
+ {
log.trace("start()");
+ }
- ra.getWorkManager().scheduleWork(new SetupActivation());
+ try
+ {
+ setup();
+ }
+ catch (Exception e)
+ {
+ throw new ResourceException("unable to start Activation", e);
+ }
deliveryActive.set(true);
}
@@ -261,7 +285,9 @@
public void stop()
{
if (trace)
+ {
log.trace("stop()");
+ }
deliveryActive.set(false);
teardown();
@@ -269,16 +295,19 @@
/**
* Handles any failure by trying to reconnect
+ *
* @param failure The reason for the failure
*/
public void handleFailure(Throwable failure)
{
log.warn("Failure in jms activation " + spec, failure);
int reconnectCount = 0;
-
+
// Only enter the failure loop once
if (inFailure.getAndSet(true))
+ {
return;
+ }
try
{
@@ -289,7 +318,9 @@
try
{
if (spec.getReconnectIntervalMillis() > 0)
+ {
Thread.sleep(spec.getReconnectIntervalMillis());
+ }
}
catch (InterruptedException e)
{
@@ -301,7 +332,7 @@
try
{
setup();
- log.info("Reconnected with messaging provider.");
+ log.info("Reconnected with messaging provider.");
break;
}
catch (Throwable t)
@@ -320,12 +351,15 @@
/**
* On exception
+ *
* @param exception The reason for the failure
*/
public void onException(JMSException exception)
{
if (trace)
+ {
log.trace("onException(" + exception + ")");
+ }
handleFailure(exception);
}
@@ -333,35 +367,26 @@
/**
* Setup the activation
+ *
* @throws Exception Thrown if an error occurs
*/
protected void setup() throws Exception
{
log.debug("Setting up " + spec);
-
- Context ctx = new InitialContext();
- log.debug("Using context " + ctx.getEnvironment() + " for " + spec);
- try
+
+ setupDestination();
+ for (int i = 0; i < spec.getMaxSessionInt(); i++)
{
- setupDestination(ctx);
- setupConnection(ctx);
- }
- finally
- {
- if (ctx != null)
- ctx.close();
- }
- for(int i = 0; i < spec.getMaxSessionInt(); i++)
- {
- JBMMessageHandler handler = new JBMMessageHandler(this);
+ ClientSession session = setupSession(spec.getUser(), spec.getPassword(), spec.getClientId());
+ JBMMessageHandler handler = new JBMMessageHandler(this, session);
handler.setup();
+ session.start();
handlers.add(handler);
}
- connection.start();
log.debug("Setup complete " + this);
}
-
+
/**
* Teardown the activation
*/
@@ -373,128 +398,30 @@
{
handler.teardown();
}
- teardownConnection();
- teardownDestination();
log.debug("Tearing down complete " + this);
}
-
- /**
- * Setup the destination
- * @param ctx The naming context
- * @throws Exception Thrown if an error occurs
- */
- protected void setupDestination(Context ctx) throws Exception
- {
- if (trace)
- log.trace("setupDestination(" + ctx + ")");
- String destinationName = spec.getDestination();
- String destinationTypeString = spec.getDestinationType();
- if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
- {
- log.debug("Destination type defined as " + destinationTypeString);
- Class<?> destinationType;
- if (Topic.class.getName().equals(destinationTypeString))
- {
- destinationType = Topic.class;
- isTopic = true;
- }
- else
- {
- destinationType = Queue.class;
- }
-
- log.debug("Retrieving destination " + destinationName + " of type " + destinationType.getName());
- destination = (Destination) Util.lookup(ctx, destinationName, destinationType);
- }
- else
- {
- log.debug("Destination type not defined");
- log.debug("Retrieving destination " + destinationName + " of type " + Destination.class.getName());
-
- destination = (Destination) Util.lookup(ctx, destinationName, Destination.class);
- if (destination instanceof Topic)
- {
- isTopic = true;
- }
- }
-
- log.debug("Got destination " + destination + " from " + destinationName);
- }
-
/**
- * Teardown the destination
- */
- protected void teardownDestination()
- {
- if (trace)
- log.trace("teardownDestination()");
-
- destination = null;
- }
-
- /**
- * Setup the Connection
- * @param ctx the naming context
- * @throws Exception for any error
- */
- protected void setupConnection(Context ctx) throws Exception
- {
- log.debug("Setup connection " + this);
-
- String user = spec.getUser();
- String pass = spec.getPassword();
- String clientID = spec.getClientId();
-
- if (isTopic)
- connection = setupTopicConnection(ctx, user, pass, clientID);
- else
- connection = setupQueueConnection(ctx, user, pass, clientID);
-
- log.debug("Established connection " + this);
- }
-
- /**
- * Setup a queue connection
- * @param ctx The naming context
- * @param user The user
- * @param pass The password
+ * Setup a session
+ *
+ * @param user The user
+ * @param pass The password
* @param clientID The client id
* @return The connection
* @throws Exception Thrown if an error occurs
*/
- protected QueueConnection setupQueueConnection(Context ctx, String user, String pass, String clientID) throws Exception
+ protected ClientSession setupSession(String user, String pass, String clientID) throws Exception
{
- if (trace)
- log.trace("setupQueueConnection(" + ctx + ", " + user + ", ****, " + clientID + ")");
+ ClientSession result = null;
- QueueConnection result = null;
-
- JBossConnectionFactory jcf = ra.getJBossConnectionFactory();
-
- if (isDeliveryTransacted)
- {
- if (user != null)
- result = jcf.createXAQueueConnection(user, pass);
- else
- result = jcf.createXAQueueConnection();
- }
- else
- {
- if (user != null)
- result = jcf.createQueueConnection(user, pass);
- else
- result = jcf.createQueueConnection();
- }
try
{
- if (clientID != null)
- result.setClientID(clientID);
+ result = ra.createSession(spec.getAcknowledgeModeInt(), user, pass, ra.getPreAcknowledge(), ra.getDupsOKBatchSize(), ra.getTransactionBatchSize(), isDeliveryTransacted);
- result.setExceptionListener(this);
+ result.addFailureListener(this);
log.debug("Using queue connection " + result);
@@ -505,176 +432,73 @@
try
{
if (result != null)
+ {
result.close();
+ }
}
catch (Exception e)
{
log.trace("Ignored error closing connection", e);
}
if (t instanceof Exception)
+ {
throw (Exception) t;
+ }
throw new RuntimeException("Error configuring connection", t);
}
}
-
- /**
- * Setup a topic connection
- * @param ctx The naming context
- * @param user The user
- * @param pass The password
- * @param clientID The client id
- * @return The connection
- * @throws Exception Thrown if an error occurs
- */
- protected TopicConnection setupTopicConnection(Context ctx, String user, String pass, String clientID) throws Exception
+
+ public SimpleString getAddress()
{
+ return destination.getSimpleAddress();
+ }
+
+ protected void setupDestination() throws Exception
+ {
+ Context ctx = new InitialContext();
+ log.debug("Using context " + ctx.getEnvironment() + " for " + spec);
if (trace)
- log.trace("setupTopicConnection(" + ctx + ", " + user + ", ****, " + clientID + ")");
+ log.trace("setupDestination(" + ctx + ")");
- TopicConnection result = null;
+ String destinationName = spec.getDestination();
- JBossConnectionFactory jcf = ra.getJBossConnectionFactory();
-
- if (isDeliveryTransacted)
+ String destinationTypeString = spec.getDestinationType();
+ if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
{
- if (user != null)
- result = jcf.createXATopicConnection(user, pass);
- else
- result = jcf.createXATopicConnection();
- }
- else
- {
- if (user != null)
- result = jcf.createTopicConnection(user, pass);
- else
- result = jcf.createTopicConnection();
- }
- try
- {
- if (clientID != null)
- result.setClientID(clientID);
+ log.debug("Destination type defined as " + destinationTypeString);
- result.setExceptionListener(this);
-
- log.debug("Using topic connection " + result);
-
- return result;
- }
- catch (Throwable t)
- {
- try
+ Class<?> destinationType;
+ if (Topic.class.getName().equals(destinationTypeString))
{
- if (result != null)
- result.close();
+ destinationType = Topic.class;
+ isTopic = true;
}
- catch (Exception e)
+ else
{
- log.trace("Ignored error closing connection", e);
+ destinationType = Queue.class;
}
- if (t instanceof Exception)
- throw (Exception) t;
- throw new RuntimeException("Error configuring connection", t);
- }
- }
-
- /**
- * Teardown the connection
- */
- protected void teardownConnection()
- {
- if (trace)
- log.trace("teardownConnection()");
- try
- {
- if (connection != null)
- {
- log.debug("Closing the " + connection);
- connection.close();
- }
+ log.debug("Retrieving destination " + destinationName + " of type " + destinationType.getName());
+ destination = (JBossDestination) Util.lookup(ctx, destinationName, destinationType);
}
- catch (Throwable t)
+ else
{
- log.debug("Error closing the connection " + connection, t);
- }
- connection = null;
- }
-
- /**
- * Setup the pool
- * @throws Exception for any error
- */
- /*protected void setupPool() throws Exception
- {
- pool = new JBMMessageHandlerPool(this);
- log.debug("Created pool " + pool);
+ log.debug("Destination type not defined");
+ log.debug("Retrieving destination " + destinationName + " of type " + Destination.class.getName());
- log.debug("Starting pool " + pool);
- pool.start();
- log.debug("Started pool " + pool);
-
- log.debug("Starting delivery " + connection);
- connection.start();
- log.debug("Started delivery " + connection);
- }*/
-
- /**
- * Teardown the pool
- */
- /*protected void teardownPool()
- {
- try
- {
- if (connection != null)
+ destination = (JBossDestination) Util.lookup(ctx, destinationName, Destination.class);
+ if (destination instanceof Topic)
{
- log.debug("Stopping delivery " + connection);
- connection.stop();
+ isTopic = true;
}
}
- catch (Throwable t)
- {
- log.debug("Error stopping delivery " + connection, t);
- }
- try
- {
- if (pool != null)
- {
- log.debug("Stopping the pool " + pool);
- pool.stop();
- }
- }
- catch (Throwable t)
- {
- log.debug("Error clearing the pool " + pool, t);
- }
- pool = null;
- }*/
-
- /**
- * Handles the setup
- */
- private class SetupActivation implements Work
- {
- public void run()
- {
- try
- {
- setup();
- }
- catch (Throwable t)
- {
- handleFailure(t);
- }
- }
-
- public void release()
- {
- }
+ log.debug("Got destination " + destination + " from " + destinationName);
}
/**
* Get a string representation
+ *
* @return The value
*/
public String toString()
@@ -684,14 +508,32 @@
buffer.append("spec=").append(spec.getClass().getName());
buffer.append(" mepf=").append(endpointFactory.getClass().getName());
buffer.append(" active=").append(deliveryActive.get());
- if (destination != null)
- buffer.append(" destination=").append(destination);
- if (connection != null)
- buffer.append(" connection=").append(connection);
+ if (spec.getDestination() != null)
+ {
+ buffer.append(" destination=").append(spec.getDestination());
+ }
+ /*if (session != null)
+ {
+ buffer.append(" connection=").append(session);
+ }*/
//if (pool != null)
- //buffer.append(" pool=").append(pool.getClass().getName());
+ //buffer.append(" pool=").append(pool.getClass().getName());
buffer.append(" transacted=").append(isDeliveryTransacted);
buffer.append(')');
return buffer.toString();
}
+
+ public boolean connectionFailed(MessagingException me)
+ {
+ if (trace)
+ {
+ log.trace("onException(" + me + ")");
+ }
+
+ handleFailure(me);
+ return true;
+ }
}
+
+
+
Modified: trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java 2009-03-02 11:54:04 UTC (rev 5954)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java 2009-03-02 12:00:26 UTC (rev 5955)
@@ -21,23 +21,28 @@
*/
package org.jboss.messaging.ra.inflow;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.jms.JBossTopic;
+import org.jboss.messaging.jms.client.JBossMessage;
+import org.jboss.messaging.utils.SimpleString;
+import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
+import javax.jms.MessageListener;
import javax.jms.Session;
-import javax.jms.XASession;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Connection;
-import javax.jms.XAConnection;
-import javax.jms.Topic;
-import javax.jms.Message;
+import javax.resource.spi.endpoint.MessageEndpoint;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.transaction.Status;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
-import javax.transaction.Status;
-import javax.transaction.xa.XAResource;
-import javax.resource.spi.endpoint.MessageEndpoint;
-import javax.resource.spi.endpoint.MessageEndpointFactory;
+import java.util.UUID;
+
/**
* The message handler
*
@@ -46,95 +51,128 @@
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
* @version $Revision: $
*/
-public class JBMMessageHandler implements MessageListener
+public class JBMMessageHandler implements MessageHandler
{
- /** The logger */
+ /**
+ * The logger
+ */
private static final Logger log = Logger.getLogger(JBMMessageHandler.class);
- /** Trace enabled */
- private static boolean trace = log.isTraceEnabled();
+ /**
+ * Trace enabled
+ */
+ private static boolean trace = log.isTraceEnabled();
- /** The session */
- private Session session;
+ /**
+ * The session
+ */
+ private final ClientSession session;
- /** Any XA session */
- private XASession xaSession;
-
- /** The endpoint */
+ /**
+ * The endpoint
+ */
private MessageEndpoint endpoint;
private final JBMActivation activation;
- /** The transaction demarcation strategy factory */
+ /**
+ * The transaction demarcation strategy factory
+ */
private DemarcationStrategyFactory strategyFactory = new DemarcationStrategyFactory();
- public JBMMessageHandler(JBMActivation activation)
+ public JBMMessageHandler(JBMActivation activation, ClientSession session)
{
this.activation = activation;
+ this.session = session;
}
public void setup() throws Exception
{
if (trace)
+ {
log.trace("setup()");
+ }
JBMActivationSpec spec = activation.getActivationSpec();
String selector = spec.getMessageSelector();
- Connection connection = activation.getConnection();
-
- // Create the session
- if (activation.isDeliveryTransacted())
- {
- xaSession = ((XAConnection)connection).createXASession();
- session = xaSession.getSession();
- }
- else
- {
- boolean transacted = spec.isSessionTransacted();
- int acknowledge = spec.getAcknowledgeModeInt();
- session = connection.createSession(transacted, acknowledge);
- }
-
// Create the message consumer
- MessageConsumer messageConsumer;
+ ClientConsumer consumer;
+ SimpleString selectorString = selector == null || selector.trim().equals("") ? null : new SimpleString(selector);
if (activation.isTopic() && spec.isSubscriptionDurable())
{
- Topic topic = (Topic) activation.getDestination();
String subscriptionName = spec.getSubscriptionName();
- if (selector == null || selector.trim().equals(""))
+ // Durable sub
+
+ if (activation.getActivationSpec().getClientId() == null)
{
- messageConsumer = session.createDurableSubscriber(topic, subscriptionName);
+ throw new InvalidClientIDException("Cannot create durable subscription - client ID has not been set");
}
+
+ SimpleString queueName = new SimpleString(JBossTopic.createQueueNameForDurableSubscription(activation.getActivationSpec().getClientId(),
+ subscriptionName));
+
+ SessionQueueQueryResponseMessage subResponse = session.queueQuery(queueName);
+
+ if (!subResponse.isExists())
+ {
+ session.createQueue(activation.getAddress(), queueName, selectorString, true, false);
+ }
else
{
- messageConsumer = session.createDurableSubscriber(topic, subscriptionName, selector, false);
+ // Already exists
+ if (subResponse.getConsumerCount() > 0)
+ {
+ throw new javax.jms.IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)");
+ }
+
+ SimpleString oldFilterString = subResponse.getFilterString();
+
+ boolean selectorChanged = (selector == null && oldFilterString != null) || (oldFilterString == null && selector != null) ||
+ (oldFilterString != null && selector != null && !oldFilterString.equals(selector));
+
+ SimpleString oldTopicName = subResponse.getAddress();
+
+ boolean topicChanged = !oldTopicName.equals(activation.getAddress());
+
+ if (selectorChanged || topicChanged)
+ {
+ // Delete the old durable sub
+ session.deleteQueue(queueName);
+
+ // Create the new one
+ session.createQueue(activation.getAddress(), queueName, selectorString, true, false);
+ }
}
+ consumer = session.createConsumer(queueName, null, false);
}
else
{
- if (selector == null || selector.trim().equals(""))
+ SimpleString queueName;
+ if (activation.isTopic())
{
- messageConsumer = session.createConsumer(activation.getDestination());
+ queueName = new SimpleString(UUID.randomUUID().toString());
+ session.createQueue(activation.getAddress(), queueName, selectorString, false, false);
}
else
{
- messageConsumer = session.createConsumer(activation.getDestination(), selector);
+ queueName = activation.getAddress();
}
+ consumer = session.createConsumer(queueName, selectorString);
}
// Create the endpoint
MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();
- XAResource xaResource = null;
-
- if (activation.isDeliveryTransacted() && xaSession != null)
- xaResource = xaSession.getXAResource();
-
- endpoint = endpointFactory.createEndpoint(xaResource);
-
- // Set the message listener
- messageConsumer.setMessageListener(this);
+ if (activation.isDeliveryTransacted())
+ {
+ endpoint = endpointFactory.createEndpoint(session);
+ }
+ else
+ {
+ endpoint = endpointFactory.createEndpoint(null);
+ }
+ consumer.setMessageHandler(this);
}
/**
@@ -143,12 +181,16 @@
public void teardown()
{
if (trace)
+ {
log.trace("teardown()");
+ }
try
{
if (endpoint != null)
+ {
endpoint.release();
+ }
}
catch (Throwable t)
{
@@ -157,18 +199,10 @@
try
{
- if (xaSession != null)
- xaSession.close();
- }
- catch (Throwable t)
- {
- log.debug("Error releasing xaSession " + xaSession, t);
- }
-
- try
- {
if (session != null)
+ {
session.close();
+ }
}
catch (Throwable t)
{
@@ -176,11 +210,7 @@
}
}
- /**
- * On message
- * @param message The message
- */
- public void onMessage(Message message)
+ public void onMessage(ClientMessage message)
{
if (trace)
log.trace("onMessage(" + message + ")");
@@ -195,14 +225,39 @@
log.warn("Unable to create transaction: " + throwable.getMessage());
txnStrategy = null;
}
+
+ JBossMessage jbm = JBossMessage.createMessage(message, session);
+
try
{
- endpoint.beforeDelivery(JBMActivation.ONMESSAGE);
+ jbm.doBeforeReceive();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to prepare message for receipt", e);
+ return;
+ }
+
+ if (activation.getActivationSpec().getAcknowledgeModeInt() == Session.SESSION_TRANSACTED ||
+ activation.getActivationSpec().getAcknowledgeModeInt() == Session.CLIENT_ACKNOWLEDGE)
+ {
try
{
+ message.acknowledge();
+ }
+ catch (MessagingException e)
+ {
+ log.error("Failed to process message", e);
+ }
+ }
+ try
+ {
+ endpoint.beforeDelivery(JBMActivation.ONMESSAGE);
+ try
+ {
MessageListener listener = (MessageListener) endpoint;
- listener.onMessage(message);
+ listener.onMessage(jbm);
}
finally
{
@@ -214,13 +269,18 @@
log.error("Unexpected error delivering message " + message, t);
if (txnStrategy != null)
+ {
txnStrategy.error();
+ }
}
finally
{
if (txnStrategy != null)
+ {
txnStrategy.end();
+ }
}
+
}
/**
@@ -230,12 +290,15 @@
{
/**
* Get the transaction demarcation strategy
+ *
* @return The strategy
*/
TransactionDemarcationStrategy getStrategy()
{
if (trace)
+ {
log.trace("getStrategy()");
+ }
if (activation.isDeliveryTransacted())
{
@@ -266,6 +329,7 @@
* Start
*/
void start() throws Throwable;
+
/**
* Error
*/
@@ -285,6 +349,7 @@
/*
* Start
*/
+
public void start()
{
}
@@ -295,7 +360,9 @@
public void error()
{
if (trace)
+ {
log.trace("error()");
+ }
final JBMActivationSpec spec = activation.getActivationSpec();
@@ -319,7 +386,8 @@
{
session.rollback();
}
- } catch (JMSException e)
+ }
+ catch (MessagingException e)
{
log.error("Failed to rollback session transaction", e);
}
@@ -333,7 +401,9 @@
public void end()
{
if (trace)
+ {
log.trace("error()");
+ }
final JBMActivationSpec spec = activation.getActivationSpec();
@@ -344,7 +414,8 @@
try
{
session.commit();
- } catch (JMSException e)
+ }
+ catch (MessagingException e)
{
log.error("Failed to commit session transaction", e);
}
@@ -359,6 +430,7 @@
private class XATransactionDemarcationStrategy implements TransactionDemarcationStrategy
{
private Transaction trans = null;
+
private TransactionManager tm = activation.getTransactionManager();
public void start() throws Throwable
@@ -368,7 +440,9 @@
if (timeout > 0)
{
if (trace)
+ {
log.trace("Setting transactionTimeout for JMSSessionPool to " + timeout);
+ }
tm.setTransactionTimeout(timeout);
}
@@ -380,19 +454,19 @@
trans = tm.getTransaction();
if (trace)
+ {
log.trace(this + " using tx=" + trans);
+ }
- if (xaSession != null)
+ if (!trans.enlistResource(session))
{
- XAResource res = xaSession.getXAResource();
+ throw new JMSException("could not enlist resource");
+ }
+ if (trace)
+ {
+ log.trace(this + " XAResource '" + session + " enlisted.");
+ }
- if (!trans.enlistResource(res))
- {
- throw new JMSException("could not enlist resource");
- }
- if (trace)
- log.trace(this + " XAResource '" + res + " enlisted.");
- }
}
catch (Throwable t)
{
@@ -414,7 +488,9 @@
try
{
if (trace)
+ {
log.trace(this + " using TM to mark TX for rollback tx=" + trans);
+ }
trans.setRollbackOnly();
}
@@ -431,24 +507,21 @@
// Use the TM to commit the Tx (assert the correct association)
Transaction currentTx = tm.getTransaction();
if (trans.equals(currentTx) == false)
+ {
throw new IllegalStateException("Wrong tx association: expected " + trans + " was " + currentTx);
+ }
// Marked rollback
if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
{
if (trace)
+ {
log.trace(this + " rolling back JMS transaction tx=" + trans);
+ }
// Actually roll it back
tm.rollback();
- // NO XASession? then manually rollback.
- // This is not so good but
- // it's the best we can do if we have no XASession.
- if (xaSession == null && activation.isDeliveryTransacted())
- {
- session.rollback();
- }
}
else if (trans.getStatus() == Status.STATUS_ACTIVE)
{
@@ -457,28 +530,19 @@
// a) everything goes well
// b) app. exception was thrown
if (trace)
+ {
log.trace(this + " commiting the JMS transaction tx=" + trans);
+ }
tm.commit();
- // NO XASession? then manually commit. This is not so good but
- // it's the best we can do if we have no XASession.
- if (xaSession == null && activation.isDeliveryTransacted())
- {
- session.commit();
- }
-
}
else
{
tm.suspend();
-
- if (xaSession == null && activation.isDeliveryTransacted())
- {
- session.rollback();
- }
}
- } catch (Throwable t)
+ }
+ catch (Throwable t)
{
log.error(this + " failed to commit/rollback", t);
}
More information about the jboss-cvs-commits
mailing list