[jboss-svn-commits] JBL Code SVN: r15053 - in labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb: client and 1 other directory.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Sep 12 10:42:22 EDT 2007
Author: tfennelly
Date: 2007-09-12 10:42:22 -0400 (Wed, 12 Sep 2007)
New Revision: 15053
Modified:
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
Log:
Remove use of ThreadLocal in MessageDeliveryAdapter: http://jira.jboss.com/jira/browse/JBESB-628
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java 2007-09-12 13:53:04 UTC (rev 15052)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java 2007-09-12 14:42:22 UTC (rev 15053)
@@ -326,10 +326,13 @@
private void close() {
try {
- if (jmsProducer!=null) jmsProducer.close();
- pool.closeSession(jmsSession);
+ if (jmsProducer!=null) {
+ jmsProducer.close();
+ }
} catch (Exception e) {
logger.error("Unable to close JMS Queue Setup.", e);
+ } finally {
+ pool.closeSession(jmsSession);
}
}
}
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java 2007-09-12 13:53:04 UTC (rev 15052)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java 2007-09-12 14:42:22 UTC (rev 15053)
@@ -81,10 +81,6 @@
*/
private ServiceClusterInfo serviceClusterInfo;
/**
- * Synchronous courier "pickup" deliver timeout.
- */
- private ThreadLocal<Long> syncPickupDeliveryTimeout = new ThreadLocal<Long>();
- /**
*
*/
private Date expirationDate;
@@ -154,9 +150,8 @@
*/
public Message deliverSync(Message message, long timeoutMillis) throws MessageDeliverException, RegistryException, FaultMessageException {
AssertArgument.isNotNull(message, "message");
- syncPickupDeliveryTimeout.set(timeoutMillis);
try {
- message = post(message, true);
+ message = post(message, new EPRInvoker(timeoutMillis));
} catch (MessageDeliverException mde) {
if ("true".equalsIgnoreCase(Configuration.getRedeliveryDlsOn())
&& !service.equals(dlqService)) {
@@ -182,7 +177,7 @@
AssertArgument.isNotNull(message, "message");
// Not interested in a reply
try {
- post(message, false);
+ post(message, new EPRInvoker());
} catch (MessageDeliverException mde) {
if (message.getProperties().getProperty(IS_REDELIVERY)==null
&& "true".equalsIgnoreCase(Configuration.getRedeliveryDlsOn())
@@ -223,13 +218,13 @@
/**
* Deliver the supplied message to the target service associated with this invoker instance.
*
- * @param message The message to be delivered.
- * @param synchronous Is the message to be delivered synchronously or not (asynchronously).
+ * @param message The message to be delivered.
+ * @param eprInvoker The EPRInvoker to be used (sync or async).
* @return Returns the message (or a reply message if synchronous) if the message was delivered
* without error, otherwise an exception is thrown.
* @throws MessageDeliverException Failed to deliver message, after trying all available EPRs.
*/
- private Message post(Message message, boolean synchronous) throws MessageDeliverException, FaultMessageException {
+ private Message post(Message message, EPRInvoker eprInvoker) throws MessageDeliverException, FaultMessageException {
boolean staleEPRCache = true;
//We are removing dead EPRs from the serviceClusterInfo. *Previous* deliveries maybe have
//removed EPRs that have now come back to life. We should try once more to pull a fresh list of EPRS
@@ -247,7 +242,7 @@
EPR epr;
// Iterate over all the EPRs in the list until delivered
while ((epr = loadBalancer.chooseEPR(serviceClusterInfo)) != null) {
- replyMessage = attemptDelivery(message, epr, synchronous);
+ replyMessage = eprInvoker.attemptDelivery(message, epr);
if (replyMessage != null) {
// We've delivered it, we're done!
return replyMessage;
@@ -290,86 +285,6 @@
}
/**
- * Attempt to deliver the supplied message using the supplied EPR.
- *
- * @param message The message to be delivered.
- * @param epr The EPR to be used in the delivery attempt.
- * @param synchronous Is the message to be delivered synchronously or not (asynchronously).
- * @return Returns the message (or a reply message if synchronous) if the message was delivered
- * without error, otherwise null.
- */
- private Message attemptDelivery(Message message, EPR epr, boolean synchronous) throws FaultMessageException {
- TwoWayCourier courier = null;
-
- // Get a courier for the EPR...
- try {
- courier = getCourier(epr);
- } catch (CourierException e) {
- logger.debug("Courier lookup failed for EPR [" + epr + "] for Service [" + service + "].", e);
- } catch (MalformedEPRException e) {
- logger.warn("Badly formed EPR [" + epr + "] for Service [" + service + "]. " + e.getMessage());
- } catch (Throwable t) {
- logger.warn("Unexpected exception during Courier lookup for EPR [" + epr + "] for Service [" + service + "].", t);
- }
-
- // Try delivering the message using the courier we just looked up....
-
- if (courier != null) {
- // make sure the message header does not change when we exit
-
- EPR currentEpr = message.getHeader().getCall().getTo();
-
- try {
- EPR replyToEPR = message.getHeader().getCall().getReplyTo();
-
- message.getHeader().getCall().setTo(epr);
-
- if (synchronous) {
- if (replyToEPR == null)
- replyToEPR = getReplyToAddress(epr);
-
- if (replyToEPR == null) {
- logger.debug("Not using epr [" + epr + "] for Service [" + service + "]. No reply-to address available for synchronous response.");
- return null;
- }
- message.getHeader().getCall().setReplyTo(replyToEPR);
- }
- if (courier.deliver(message)) {
- if (synchronous) {
- // JBESB-1016 replyToEPR has to be non-null or we'd have dropped out by this point!
-
- // do we need to do this for synchronous calls? Vagueries of Couriers?
-
- courier.setReplyToEpr(replyToEPR);
- return courier.pickup(syncPickupDeliveryTimeout.get());
- } else {
- return message;
- }
- }
- } catch (FaultMessageException e) {
- throw e;
- } catch (CourierException e) {
- logger.debug("Badly formed EPR [" + epr + "] for Service [" + service + "]. " + e.getMessage());
- } catch (MalformedEPRException e) {
- // Hmmmm???... Can this really happen? The Courier has already been created. Haven't we already validated the EPR during the Courier lookup (above)??
- logger.warn("Unexpected error. Badly formed EPR [" + epr + "] for Service [" + service + "]. But the EPR has already been validated!!");
- } catch (Throwable t) {
- logger.warn("Unexpected exception during attempted message delivery over Courier for EPR [" + epr + "] for Service [" + service + "].", t);
- } finally {
- // TODO: So does this mean that Couriers are stateful? If so, do we need to synchronize on using them??
- CourierUtil.cleanCourier(courier);
-
- // put back the old To since we will have changed it.
-
- if (currentEpr != null)
- message.getHeader().getCall().setTo(currentEpr);
- }
- }
-
- return null;
- }
-
- /**
* Get the reply to address for synchronous delivery.
*
* @param toEpr The to address.
@@ -421,4 +336,104 @@
expirationDate = new Date(java.lang.System.currentTimeMillis() + registryCacheLife);
}
+ private class EPRInvoker {
+
+ private boolean synchronous = false;
+ private long timeout;
+
+ /**
+ * Create an asynchronous EPRInvoker instance.
+ */
+ private EPRInvoker() {
+ synchronous = false;
+ }
+
+ /**
+ * Create a synchronous EPRInvoker instance with the specified timeout.
+ */
+ private EPRInvoker(long timeout) {
+ synchronous = true;
+ this.timeout = timeout;
+ }
+
+
+ /**
+ * Attempt to deliver the supplied message using the supplied EPR.
+ *
+ * @param message The message to be delivered.
+ * @param epr The EPR to be used in the delivery attempt.
+ * @return Returns the message (or a reply message if synchronous) if the message was delivered
+ * without error, otherwise null.
+ */
+ private Message attemptDelivery(Message message, EPR epr) throws FaultMessageException {
+ TwoWayCourier courier = null;
+
+ // Get a courier for the EPR...
+ try {
+ courier = getCourier(epr);
+ } catch (CourierException e) {
+ logger.debug("Courier lookup failed for EPR [" + epr + "] for Service [" + service + "].", e);
+ } catch (MalformedEPRException e) {
+ logger.warn("Badly formed EPR [" + epr + "] for Service [" + service + "]. " + e.getMessage());
+ } catch (Throwable t) {
+ logger.warn("Unexpected exception during Courier lookup for EPR [" + epr + "] for Service [" + service + "].", t);
+ }
+
+ // Try delivering the message using the courier we just looked up....
+
+ if (courier != null) {
+ // make sure the message header does not change when we exit
+
+ EPR currentEpr = message.getHeader().getCall().getTo();
+
+ try {
+ EPR replyToEPR = message.getHeader().getCall().getReplyTo();
+
+ message.getHeader().getCall().setTo(epr);
+
+ if (synchronous) {
+ if (replyToEPR == null)
+ replyToEPR = getReplyToAddress(epr);
+
+ if (replyToEPR == null) {
+ logger.debug("Not using epr [" + epr + "] for Service [" + service + "]. No reply-to address available for synchronous response.");
+ return null;
+ }
+ message.getHeader().getCall().setReplyTo(replyToEPR);
+ }
+ if (courier.deliver(message)) {
+ if (synchronous) {
+ // JBESB-1016 replyToEPR has to be non-null or we'd have dropped out by this point!
+
+ // do we need to do this for synchronous calls? Vagueries of Couriers?
+
+ courier.setReplyToEpr(replyToEPR);
+ return courier.pickup(timeout);
+ } else {
+ return message;
+ }
+ }
+ } catch (FaultMessageException e) {
+ throw e;
+ } catch (CourierException e) {
+ logger.debug("Badly formed EPR [" + epr + "] for Service [" + service + "]. " + e.getMessage());
+ } catch (MalformedEPRException e) {
+ // Hmmmm???... Can this really happen? The Courier has already been created. Haven't we already validated the EPR during the Courier lookup (above)??
+ logger.warn("Unexpected error. Badly formed EPR [" + epr + "] for Service [" + service + "]. But the EPR has already been validated!!");
+ } catch (Throwable t) {
+ logger.warn("Unexpected exception during attempted message delivery over Courier for EPR [" + epr + "] for Service [" + service + "].", t);
+ } finally {
+ // TODO: So does this mean that Couriers are stateful? If so, do we need to synchronize on using them??
+ CourierUtil.cleanCourier(courier);
+
+ // put back the old To since we will have changed it.
+
+ if (currentEpr != null)
+ message.getHeader().getCall().setTo(currentEpr);
+ }
+ }
+
+ return null;
+ }
+ }
}
More information about the jboss-svn-commits mailing list