[jboss-svn-commits] JBL Code SVN: r15053 - in labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb: client and 1 other directory.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed Sep 12 10:42:22 EDT 2007


Author: tfennelly
Date: 2007-09-12 10:42:22 -0400 (Wed, 12 Sep 2007)
New Revision: 15053

Modified:
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
Log:
Remove use of ThreadLocal in MessageDeliveryAdapter: http://jira.jboss.com/jira/browse/JBESB-628

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java	2007-09-12 13:53:04 UTC (rev 15052)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/routing/JMSRouter.java	2007-09-12 14:42:22 UTC (rev 15053)
@@ -326,10 +326,13 @@
         
         private void close() {
             try {
-                if (jmsProducer!=null) jmsProducer.close();
-                pool.closeSession(jmsSession);
+                if (jmsProducer!=null) {
+                    jmsProducer.close();                    
+                }
             } catch (Exception e) {
                 logger.error("Unable to close JMS Queue Setup.", e);
+            } finally {
+                pool.closeSession(jmsSession);
             }
         }
     }

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java	2007-09-12 13:53:04 UTC (rev 15052)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java	2007-09-12 14:42:22 UTC (rev 15053)
@@ -81,10 +81,6 @@
      */
     private ServiceClusterInfo serviceClusterInfo;
     /**
-     * Synchronous courier "pickup" deliver timeout.
-     */
-    private ThreadLocal<Long> syncPickupDeliveryTimeout = new ThreadLocal<Long>();
-    /**
      * 
      */
     private Date expirationDate;
@@ -154,9 +150,8 @@
      */
     public Message deliverSync(Message message, long timeoutMillis) throws MessageDeliverException, RegistryException, FaultMessageException {
         AssertArgument.isNotNull(message, "message");
-        syncPickupDeliveryTimeout.set(timeoutMillis);
         try {
-            message = post(message, true);
+            message = post(message, new EPRInvoker(timeoutMillis));
         } catch (MessageDeliverException mde) {
             if ("true".equalsIgnoreCase(Configuration.getRedeliveryDlsOn())
                     && !service.equals(dlqService)) {
@@ -182,7 +177,7 @@
         AssertArgument.isNotNull(message, "message");
         // Not interested in a reply
         try {
-            post(message, false);
+            post(message, new EPRInvoker());
         } catch (MessageDeliverException mde) {
             if (message.getProperties().getProperty(IS_REDELIVERY)==null
                     && "true".equalsIgnoreCase(Configuration.getRedeliveryDlsOn())
@@ -223,13 +218,13 @@
     /**
      * Deliver the supplied message to the target service associated with this invoker instance.
      *
-     * @param message     The message to be delivered.
-     * @param synchronous Is the message to be delivered synchronously or not (asynchronously).
+     * @param message  The message to be delivered.
+     * @param eprInvoker The EPRInvoker to be used (sync or async).
      * @return Returns the message (or a reply message if synchronous) if the message was delivered
      *         without error, otherwise an exception is thrown.
      * @throws MessageDeliverException Failed to deliver message, after trying all available EPRs.
      */
-    private Message post(Message message, boolean synchronous) throws MessageDeliverException, FaultMessageException {
+    private Message post(Message message, EPRInvoker eprInvoker) throws MessageDeliverException, FaultMessageException {
         boolean staleEPRCache = true;
         //We are removing dead EPRs from the serviceClusterInfo. *Previous* deliveries maybe have
         //removed EPRs that have now come back to life. We should try once more to pull a fresh list of EPRS
@@ -247,7 +242,7 @@
             EPR epr;
             // Iterate over all the EPRs in the list until delivered
             while ((epr = loadBalancer.chooseEPR(serviceClusterInfo)) != null) {
-                replyMessage = attemptDelivery(message, epr, synchronous);
+                replyMessage = eprInvoker.attemptDelivery(message, epr);
                 if (replyMessage != null) {
                     // We've delivered it, we're done!
                     return replyMessage;
@@ -290,86 +285,6 @@
     }
 
     /**
-     * Attempt to deliver the supplied message using the supplied EPR.
-     *
-     * @param message     The message to be delivered.
-     * @param epr         The EPR to be used in the delivery attempt.
-     * @param synchronous Is the message to be delivered synchronously or not (asynchronously).
-     * @return Returns the message (or a reply message if synchronous) if the message was delivered
-     *         without error, otherwise null.
-     */
-    private Message attemptDelivery(Message message, EPR epr, boolean synchronous) throws FaultMessageException {
-        TwoWayCourier courier = null;
-
-        // Get a courier for the EPR...
-        try {
-            courier = getCourier(epr);
-        } catch (CourierException e) {
-            logger.debug("Courier lookup failed for EPR [" + epr + "] for Service [" + service + "].", e);
-        } catch (MalformedEPRException e) {
-            logger.warn("Badly formed EPR [" + epr + "] for Service [" + service + "]. " + e.getMessage());
-        } catch (Throwable t) {
-            logger.warn("Unexpected exception during Courier lookup for EPR [" + epr + "] for Service [" + service + "].", t);
-        }
-        
-        // Try delivering the message using the courier we just looked up....
-        
-        if (courier != null) {
-            // make sure the message header does not change when we exit
-            
-            EPR currentEpr = message.getHeader().getCall().getTo();
-            
-            try {
-                EPR replyToEPR = message.getHeader().getCall().getReplyTo();
-
-                message.getHeader().getCall().setTo(epr);
-                
-                if (synchronous) {
-                    if (replyToEPR == null)
-                        replyToEPR = getReplyToAddress(epr);
-
-                    if (replyToEPR == null) {
-                        logger.debug("Not using epr [" + epr + "] for Service [" + service + "]. No reply-to address available for synchronous response.");
-                        return null;
-                    }
-                    message.getHeader().getCall().setReplyTo(replyToEPR);
-                }
-                if (courier.deliver(message)) {
-                    if (synchronous) {
-                    	// JBESB-1016 replyToEPR has to be non-null or we'd have dropped out by this point!
-                    	
-                        // do we need to do this for synchronous calls? Vagueries of Couriers?
-
-                        courier.setReplyToEpr(replyToEPR);
-                        return courier.pickup(syncPickupDeliveryTimeout.get());
-                    } else {
-                        return message;
-                    }
-                }
-            } catch (FaultMessageException e) {
-                throw e;
-            } catch (CourierException e) {
-                logger.debug("Badly formed EPR [" + epr + "] for Service [" + service + "]. " + e.getMessage());
-            } catch (MalformedEPRException e) {
-                // Hmmmm???... Can this really happen?  The Courier has already been created.  Haven't we already validated the EPR during the Courier lookup (above)??
-                logger.warn("Unexpected error.  Badly formed EPR [" + epr + "] for Service [" + service + "]. But the EPR has already been validated!!");
-            } catch (Throwable t) {
-                logger.warn("Unexpected exception during attempted message delivery over Courier for EPR [" + epr + "] for Service [" + service + "].", t);
-            } finally {
-                // TODO: So does this mean that Couriers are stateful?  If so, do we need to synchronize on using them??
-                CourierUtil.cleanCourier(courier);
-                
-                // put back the old To since we will have changed it.
-                
-                if (currentEpr != null)
-                	message.getHeader().getCall().setTo(currentEpr);
-            }
-        }
-
-        return null;
-    }
-
-    /**
      * Get the reply to address for synchronous delivery.
      *
      * @param toEpr The to address.
@@ -421,4 +336,104 @@
         expirationDate = new Date(java.lang.System.currentTimeMillis() + registryCacheLife);
     }
 
+    private class EPRInvoker {
+
+        private boolean synchronous = false;
+        private long timeout;
+
+        /**
+         * Create an asynchronous EPRInvoker instance.
+         */
+        private EPRInvoker() {
+            synchronous = false;
+        }
+
+        /**
+         * Create a synchronous EPRInvoker instance with the specified timeout.
+         */
+        private EPRInvoker(long timeout) {
+            synchronous = true;
+            this.timeout = timeout;
+        }
+
+
+        /**
+         * Attempt to deliver the supplied message using the supplied EPR.
+         *
+         * @param message     The message to be delivered.
+         * @param epr         The EPR to be used in the delivery attempt.
+         * @return Returns the message (or a reply message if synchronous) if the message was delivered
+         *         without error, otherwise null.
+         */
+        private Message attemptDelivery(Message message, EPR epr) throws FaultMessageException {
+            TwoWayCourier courier = null;
+
+            // Get a courier for the EPR...
+            try {
+                courier = getCourier(epr);
+            } catch (CourierException e) {
+                logger.debug("Courier lookup failed for EPR [" + epr + "] for Service [" + service + "].", e);
+            } catch (MalformedEPRException e) {
+                logger.warn("Badly formed EPR [" + epr + "] for Service [" + service + "]. " + e.getMessage());
+            } catch (Throwable t) {
+                logger.warn("Unexpected exception during Courier lookup for EPR [" + epr + "] for Service [" + service + "].", t);
+            }
+
+            // Try delivering the message using the courier we just looked up....
+
+            if (courier != null) {
+                // make sure the message header does not change when we exit
+
+                EPR currentEpr = message.getHeader().getCall().getTo();
+
+                try {
+                    EPR replyToEPR = message.getHeader().getCall().getReplyTo();
+
+                    message.getHeader().getCall().setTo(epr);
+
+                    if (synchronous) {
+                        if (replyToEPR == null)
+                            replyToEPR = getReplyToAddress(epr);
+
+                        if (replyToEPR == null) {
+                            logger.debug("Not using epr [" + epr + "] for Service [" + service + "]. No reply-to address available for synchronous response.");
+                            return null;
+                        }
+                        message.getHeader().getCall().setReplyTo(replyToEPR);
+                    }
+                    if (courier.deliver(message)) {
+                        if (synchronous) {
+                            // JBESB-1016 replyToEPR has to be non-null or we'd have dropped out by this point!
+
+                            // do we need to do this for synchronous calls? Vagaries of Couriers?
+
+                            courier.setReplyToEpr(replyToEPR);
+                            return courier.pickup(timeout);
+                        } else {
+                            return message;
+                        }
+                    }
+                } catch (FaultMessageException e) {
+                    throw e;
+                } catch (CourierException e) {
+                    logger.debug("Badly formed EPR [" + epr + "] for Service [" + service + "]. " + e.getMessage());
+                } catch (MalformedEPRException e) {
+                    // Hmmmm???... Can this really happen?  The Courier has already been created.  Haven't we already validated the EPR during the Courier lookup (above)??
+                    logger.warn("Unexpected error.  Badly formed EPR [" + epr + "] for Service [" + service + "]. But the EPR has already been validated!!");
+                } catch (Throwable t) {
+                    logger.warn("Unexpected exception during attempted message delivery over Courier for EPR [" + epr + "] for Service [" + service + "].", t);
+                } finally {
+                    // TODO: So does this mean that Couriers are stateful?  If so, do we need to synchronize on using them??
+                    CourierUtil.cleanCourier(courier);
+
+                    // put back the old To since we will have changed it.
+
+                    if (currentEpr != null)
+                        message.getHeader().getCall().setTo(currentEpr);
+                }
+            }
+
+            return null;
+        }
+    }
 }




More information about the jboss-svn-commits mailing list