[jboss-svn-commits] JBL Code SVN: r20698 - in labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product: rosetta/src/org/jboss/internal/soa/esb/dependencies and 3 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Mon Jun 23 07:52:17 EDT 2008
Author: kevin.conner at jboss.com
Date: 2008-06-23 07:52:16 -0400 (Mon, 23 Jun 2008)
New Revision: 20698
Modified:
labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/dependencies/H2Database.java
labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/remoting/HttpUnmarshaller.java
labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java
labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/services/jbossesb/src/main/resources/META-INF/deployment.xml
Log:
Merged third spin of CP3
Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java 2008-06-23 11:44:58 UTC (rev 20697)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java 2008-06-23 11:52:16 UTC (rev 20698)
@@ -28,12 +28,12 @@
import java.util.Properties;
import javax.jms.DeliveryMode;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
-import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.NamingException;
import javax.xml.parsers.ParserConfigurationException;
@@ -98,26 +98,28 @@
public void cleanup() {
synchronized(this) {
- if (_messageProducer != null) {
- try {
- _messageProducer.close();
- } catch (Exception e) {
- _logger.debug(e.getMessage(), e);
- } finally {
- _messageProducer = null;
- closeSession();
+ try {
+ if (_messageProducer != null) {
+ try {
+ _messageProducer.close();
+ } catch (Exception e) {
+ _logger.debug(e.getMessage(), e);
+ } finally {
+ _messageProducer = null;
+ }
}
- }
- if (_messageConsumer != null) {
- try {
- _messageConsumer.close();
- } catch (JMSException e) {
- _logger.debug(e.getMessage(), e);
- } finally {
- _messageConsumer = null;
- closeSession();
+ if (_messageConsumer != null) {
+ try {
+ _messageConsumer.close();
+ } catch (JMSException e) {
+ _logger.debug(e.getMessage(), e);
+ } finally {
+ _messageConsumer = null;
+ }
}
+ } finally {
+ closeSession() ;
}
}
} // ________________________________
@@ -311,48 +313,46 @@
} // ________________________________
private void createMessageProducer() throws CourierException, NamingContextException {
- Context oJndiCtx = null;
-
synchronized(this) {
if (_messageProducer == null) {
try {
- oJndiCtx = NamingContextPool.getNamingContext(_epr.getJndiEnvironment());
-
- String sType = _epr.getDestinationType();
- if (JMSEpr.QUEUE_TYPE.equals(sType)) {
- Session qSess = getJmsSession(_epr.getAcknowledgeMode());
- javax.jms.Queue queue = null;
- try {
- queue = (javax.jms.Queue) oJndiCtx.lookup(_epr
- .getDestinationName());
- } catch (NamingException ne) {
+ final Session session = getJmsSession(_epr.getAcknowledgeMode());
+ Destination destination = null ;
+ final String destinationName = _epr.getDestinationName() ;
+ Context oJndiCtx = NamingContextPool.getNamingContext(_epr.getJndiEnvironment());
+ try {
+ String sType = _epr.getDestinationType();
+ if (JMSEpr.QUEUE_TYPE.equals(sType)) {
try {
- oJndiCtx = NamingContextPool.replaceNamingContext(oJndiCtx, _epr.getJndiEnvironment());
- queue = (javax.jms.Queue) oJndiCtx.lookup(_epr
- .getDestinationName());
- } catch (NamingException nex) {
- //ActiveMQ
- queue = qSess.createQueue(_epr.getDestinationName());
+ destination = (Destination) oJndiCtx.lookup(destinationName);
+ } catch (NamingException ne) {
+ try {
+ oJndiCtx = NamingContextPool.replaceNamingContext(oJndiCtx, _epr.getJndiEnvironment());
+ destination = (Destination) oJndiCtx.lookup(destinationName);
+ } catch (NamingException nex) {
+ //ActiveMQ
+ destination = session.createQueue(destinationName);
+ }
}
+ } else if (JMSEpr.TOPIC_TYPE.equals(sType)) {
+ try {
+ destination = (Destination) oJndiCtx.lookup(destinationName);
+ }
+ catch (NamingException ne) {
+ destination = session.createTopic(destinationName);
+ }
+ } else {
+ throw new CourierException("Unknown destination type");
}
- _messageProducer = qSess.createProducer(queue);
- } else if (JMSEpr.TOPIC_TYPE.equals(sType)) {
- Session tSess = getJmsSession(_epr.getAcknowledgeMode());
- Topic topic = null;
- try {
- topic = (Topic) oJndiCtx.lookup(_epr
- .getDestinationName());
+ _messageProducer = session.createProducer(destination);
+ _messageProducer.setDeliveryMode(_epr.getPersistent()?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);
+ if ( _logger.isDebugEnabled() )
+ _logger.debug("JMSCourier deliveryMode: " + _messageProducer.getDeliveryMode() + ", peristent:" + _epr.getPersistent());
+ } finally {
+ if (oJndiCtx != null) {
+ NamingContextPool.releaseNamingContext(oJndiCtx) ;
}
- catch (NamingException ne) {
- topic = tSess.createTopic(_epr.getDestinationName());
- }
- _messageProducer = tSess.createProducer(topic);
- } else {
- throw new CourierException("Unknown destination type");
}
- _messageProducer.setDeliveryMode(_epr.getPersistent()?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);
- if ( _logger.isDebugEnabled() )
- _logger.debug("JMSCourier deliveryMode: " + _messageProducer.getDeliveryMode() + ", peristent:" + _epr.getPersistent());
}
catch (JMSException ex) {
_logger.debug("Error from JMS system.", ex);
@@ -361,10 +361,6 @@
}
catch (URISyntaxException ex) {
throw new CourierException(ex);
- } finally {
- if (oJndiCtx != null) {
- NamingContextPool.releaseNamingContext(oJndiCtx) ;
- }
}
}
}
@@ -491,42 +487,39 @@
boolean success = false;
try {
Properties environment = _epr.getJndiEnvironment();
+ final Session session = getJmsSession(_epr.getAcknowledgeMode());
+ Destination destination = null ;
+ final String destinationName = _epr.getDestinationName() ;
oJndiCtx = NamingContextPool.getNamingContext(environment);
try
{
String sType = _epr.getDestinationType();
if (JMSEpr.QUEUE_TYPE.equals(sType)) {
- Session qSess = getJmsSession(_epr.getAcknowledgeMode());
- javax.jms.Queue queue = null;
try {
- queue = (javax.jms.Queue) oJndiCtx.lookup(_epr
- .getDestinationName());
+ destination = (Destination) oJndiCtx.lookup(destinationName);
} catch (NamingException ne) {
try {
oJndiCtx = NamingContextPool.replaceNamingContext(oJndiCtx, environment);
- queue = (javax.jms.Queue) oJndiCtx.lookup(_epr
- .getDestinationName());
+ destination = (Destination) oJndiCtx.lookup(destinationName);
} catch (NamingException nex) {
//ActiveMQ
- queue = qSess.createQueue(_epr.getDestinationName());
+ destination = session.createQueue(destinationName);
}
}
- _messageConsumer = qSess.createConsumer(queue, _epr.getMessageSelector());
} else if (JMSEpr.TOPIC_TYPE.equals(sType)) {
- Session tSess = getJmsSession(_epr.getAcknowledgeMode());
- Topic topic = null;
try {
- topic = (Topic) oJndiCtx.lookup(_epr
- .getDestinationName());
+ destination = (Destination) oJndiCtx.lookup(destinationName);
}
catch (NamingException ne) {
- topic = tSess.createTopic(_epr.getDestinationName());
+ destination = session.createTopic(destinationName);
}
- _messageConsumer = tSess.createConsumer(topic, _epr
- .getMessageSelector());
} else {
throw new CourierException("Unknown destination type");
}
+ if (destination == null) {
+ throw new CourierException("Could not locate destination: " + destinationName);
+ }
+ _messageConsumer = session.createConsumer(destination, _epr.getMessageSelector());
success = true;
} finally {
NamingContextPool.releaseNamingContext(oJndiCtx) ;
Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/dependencies/H2Database.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/dependencies/H2Database.java 2008-06-23 11:44:58 UTC (rev 20697)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/dependencies/H2Database.java 2008-06-23 11:52:16 UTC (rev 20698)
@@ -427,7 +427,7 @@
{
log.debug( "Starting remote h2 db with port : " + port );
final String[] args = new String[] {
- "-baseDir ", dbPath.toURI().toString(),
+ "-baseDir", dbPath.getAbsolutePath(),
"-tcpPort", String.valueOf(port),
"-tcpAllowOthers","" }; // need the extra empty string or a exception is thrown by H2
final Server server = Server.createTcpServer(args) ;
@@ -507,7 +507,7 @@
final Server server = getRemoteServer() ;
if (server != null)
{
- server.shutdown() ;
+ server.stop() ;
}
}
Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/remoting/HttpUnmarshaller.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/remoting/HttpUnmarshaller.java 2008-06-23 11:44:58 UTC (rev 20697)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/remoting/HttpUnmarshaller.java 2008-06-23 11:52:16 UTC (rev 20698)
@@ -101,10 +101,6 @@
int amtRead = inputStream.read(byteBuffer);
while (amtRead > 0) {
byteOutputStream.write(byteBuffer, pointer, amtRead);
- if (amtRead < bufferSize && byteOutputStream.size() >= contentLength) {
- //done reading, so process
- break;
- }
amtRead = inputStream.read(byteBuffer);
}
Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2008-06-23 11:44:58 UTC (rev 20697)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java 2008-06-23 11:52:16 UTC (rev 20698)
@@ -24,6 +24,7 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
@@ -121,6 +122,11 @@
private boolean terminated ;
/**
+ * The pool instance id.
+ */
+ private long id ;
+
+ /**
* Contructor of the pool.
*
*/
@@ -161,9 +167,9 @@
{
final JmsSession session ;
if (transacted) {
- session = new JmsXASession(JmsConnectionPool.this, ((XAConnection)jmsConnection).createXASession());
+ session = new JmsXASession(JmsConnectionPool.this, ((XAConnection)jmsConnection).createXASession(), id);
} else {
- session = new JmsSession(jmsConnection.createSession(transacted, acknowledgeMode));
+ session = new JmsSession(jmsConnection.createSession(transacted, acknowledgeMode), id);
}
return session ;
}
@@ -303,19 +309,22 @@
*/
synchronized void handleCloseSession(final JmsSession session)
{
- final int mode ;
- try {
- mode = session.getAcknowledgeMode() ;
- } catch (final JMSException jmse) {
- logger.warn("JMSException while calling getAcknowledgeMode") ;
- logger.debug("JMSException while calling getAcknowledgeMode", jmse) ;
- return ;
+ if (session.getId() == id)
+ {
+ final int mode ;
+ try {
+ mode = session.getAcknowledgeMode() ;
+ } catch (final JMSException jmse) {
+ logger.warn("JMSException while calling getAcknowledgeMode") ;
+ logger.debug("JMSException while calling getAcknowledgeMode", jmse) ;
+ return ;
+ }
+
+ final ArrayList<JmsSession> sessions = (freeSessionsMap == null ? null : freeSessionsMap.get(mode));
+ if (sessions != null) {
+ sessions.add(session) ;
+ }
}
-
- final ArrayList<JmsSession> sessions = (freeSessionsMap == null ? null : freeSessionsMap.get(mode));
- if (sessions != null) {
- sessions.add(session) ;
- }
handleReleaseSession(session) ;
}
@@ -366,6 +375,42 @@
}
/**
+ * This method is called when the pool needs to cleaned. It closes all open sessions
+ * and the connection.
+ */
+ private void cleanSessionPool()
+ {
+ final Connection connection ;
+ synchronized(this)
+ {
+ if (terminated)
+ {
+ return ;
+ }
+ id++ ;
+ for (List<JmsSession> list : freeSessionsMap.values())
+ {
+ list.clear() ;
+ }
+ for (List<JmsSession> list : inUseSessionsMap.values())
+ {
+ list.clear() ;
+ }
+ transactionsToSessions.clear() ;
+ sessionsToTransactions.clear() ;
+
+ logger.debug("Cleared the session pool now closing the connection to the factory.");
+ connection = jmsConnection ;
+ jmsConnection = null ;
+ }
+ if (connection!=null) {
+ try {
+ connection.close();
+ } catch (final Exception ex) {} // ignore
+ }
+ }
+
+ /**
* This method is called when the pool needs to destroyed. It closes all open sessions
* and the connection and removes it from the container's poolMap.
*/
@@ -481,7 +526,7 @@
jmsConnection.setExceptionListener(new ExceptionListener() {
public void onException(JMSException arg0)
{
- removeSessionPool() ;
+ cleanSessionPool() ;
}
}) ;
jmsConnection.start();
Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java 2008-06-23 11:44:58 UTC (rev 20697)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java 2008-06-23 11:52:16 UTC (rev 20698)
@@ -52,6 +52,10 @@
* The session delegate.
*/
private final Session session ;
+ /**
+ * The pool instance id.
+ */
+ private final long id ;
/**
* The set of active queue browsers.
@@ -69,15 +73,22 @@
/**
* Create the session wrapper.
* @param session The session delegate.
+ * @param id The pool instance id.
* @param isJTA True if this tales part in a JTA transaction
* @throws JMSException
*/
- JmsSession(final Session session)
+ JmsSession(final Session session, final long id)
throws JMSException
{
+ this.id = id ;
this.session = session ;
}
+ public long getId()
+ {
+ return id ;
+ }
+
public void close() throws JMSException
{
session.close();
Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java 2008-06-23 11:44:58 UTC (rev 20697)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java 2008-06-23 11:52:16 UTC (rev 20698)
@@ -72,13 +72,13 @@
* Create the session wrapper.
* @param pool The current connection pool
* @param session The session delegate.
- * @param isJTA True if this tales part in a JTA transaction
+ * @param id The pool instance id.
* @throws JMSException
*/
- JmsXASession(final JmsConnectionPool pool, final XASession session)
+ JmsXASession(final JmsConnectionPool pool, final XASession session, final long id)
throws JMSException
{
- super(session) ;
+ super(session, id) ;
this.pool = pool ;
this.session = session ;
}
Modified: labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/services/jbossesb/src/main/resources/META-INF/deployment.xml
===================================================================
--- labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/services/jbossesb/src/main/resources/META-INF/deployment.xml 2008-06-23 11:44:58 UTC (rev 20697)
+++ labs/jbossesb/workspace/platform/JBESB_4_2_1_SOA_4_2/product/services/jbossesb/src/main/resources/META-INF/deployment.xml 2008-06-23 11:52:16 UTC (rev 20698)
@@ -3,4 +3,7 @@
<depends>jboss.jca:service=DataSourceBinding,name=JBossESBDS</depends>
<depends>jboss.esb.destination:service=Queue,name=DeadMessageQueue</depends>
<depends>jboss.esb:service=MessageStoreDatabaseInitializer</depends>
+ <depends>jboss.esb.destination:service=Queue,name=DataCollectorQueue</depends>
+ <depends>jboss.esb.destination:service=Queue,name=OperationsCollectorQueue</depends>
+ <depends>jboss.esb.destination:service=Queue,name=OperationsResultCollectorQueue</depends>
</jbossesb-deployment>
More information about the jboss-svn-commits mailing list