[hornetq-commits] JBoss hornetq SVN: r11617 - in branches/Branch_2_2_EAP/src/main/org/hornetq: core/client/impl and 7 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sat Oct 29 17:33:04 EDT 2011


Author: clebert.suconic
Date: 2011-10-29 17:33:03 -0400 (Sat, 29 Oct 2011)
New Revision: 11617

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ServerLocator.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/DivertImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessage.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java
Log:
JBPAPP-7333 - Fixing reconnection on clustered bridge

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ServerLocator.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ServerLocator.java	2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ServerLocator.java	2011-10-29 21:33:03 UTC (rev 11617)
@@ -54,6 +54,15 @@
     * Create a ClientSessionFactory to a specific server. The server must already be known about by this ServerLocator.
     * This method allows the user to make a connection to a specific server bypassing any load balancing policy in force
     * @param transportConfiguration
+    * @return The ClientSessionFactory or null if the node is not present on the topology
+    * @throws Exception if a failure happened in creating the ClientSessionFactory or the ServerLocator does not know about the passed in transportConfiguration
+    */
+   ClientSessionFactory createSessionFactory(final String nodeID) throws Exception;
+   
+   /**
+    * Create a ClientSessionFactory to a specific server. The server must already be known about by this ServerLocator.
+    * This method allows the user to make a connection to a specific server bypassing any load balancing policy in force
+    * @param transportConfiguration
     * @return The ClientSesionFactory
     * @throws Exception if a failure happened in creating the ClientSessionFactory or the ServerLocator does not know about the passed in transportConfiguration
     */

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-10-29 21:33:03 UTC (rev 11617)
@@ -598,6 +598,40 @@
    {
       return afterConnectListener;
    }
+   
+   public ClientSessionFactory createSessionFactory(String nodeID) throws Exception
+   {
+      log.info(topology.describe("full topology"));
+      TopologyMember topologyMember = topology.getMember(nodeID);
+      
+      log.info("Creating connection factory towards " + nodeID + " = " + topologyMember);
+      
+      if (topologyMember == null)
+      {
+         return null;
+      }
+      else
+      if (topologyMember.getA() != null)
+      {
+         ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal)createSessionFactory(topologyMember.getA());
+         if (topologyMember.getB() != null)
+         {
+            factory.setBackupConnector(topologyMember.getA(), topologyMember.getB());
+         }
+         return factory;
+      }
+      else if (topologyMember.getA() == null && topologyMember.getB() != null)
+      {
+         // This shouldn't happen, however I wanted this to consider all possible cases
+         ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal)createSessionFactory(topologyMember.getB());
+         return factory;
+      }
+      else
+      {
+         // it shouldn't happen
+         return null;
+      }
+   }
 
    public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
    {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-10-29 21:33:03 UTC (rev 11617)
@@ -815,6 +815,11 @@
 
                PagingStoreImpl.log.warn("Messages are being dropped on address " + getStoreName());
             }
+            
+            if (log.isDebugEnabled())
+            {
+               log.debug("Message " + message + " beig dropped for fullAddressPolicy==DROP");
+            }
 
             // Address is full, we just pretend we are paging, and drop the data
             return true;

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java	2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java	2011-10-29 21:33:03 UTC (rev 11617)
@@ -595,6 +595,10 @@
          {
             binding.route(message, context);
          }
+         else
+         {
+            log.warn("Couldn't find binding with id=" + bindingID + " on routeFromCluster for message=" + message + " binding = " + this);
+         }
       }
    }
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-10-29 21:33:03 UTC (rev 11617)
@@ -611,6 +611,19 @@
       {
          bindings.route(message, context);
       }
+      else
+      {
+    	 // this is a debug and not warn because this could be a regular scenario on publish-subscribe queues (or topic subscriptions on JMS)
+    	 if (log.isDebugEnabled())
+    	 {
+    	    log.debug("Couldn't find any bindings for address=" + address + " on message=" + message);
+    	 }
+      }
+      
+      if (log.isTraceEnabled())
+      {
+         log.trace("Message after routed=" + message);
+      }
 
       if (context.getQueueCount() == 0)
       {
@@ -625,6 +638,11 @@
             // Send to the DLA for the address
 
             SimpleString dlaAddress = addressSettings.getDeadLetterAddress();
+            
+            if (log.isDebugEnabled())
+            {
+               log.debug("sending message to dla address = " + dlaAddress + ", message=" + message);
+            }
 
             if (dlaAddress == null)
             {
@@ -641,6 +659,13 @@
                route(message, context.getTransaction(), false);
             }
          }
+         else
+         {
+            if (log.isDebugEnabled())
+            {
+               log.debug("Message " + message + " is not going anywhere as it didn't have a binding on address:" + address);
+            }
+         }
       }
       else
       {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java	2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java	2011-10-29 21:33:03 UTC (rev 11617)
@@ -52,6 +52,11 @@
       {
          throw new IllegalStateException("Binding already exists " + binding);
       }
+      
+      if (log.isDebugEnabled())
+      {
+         log.debug("Adding binding " + binding + " with address = " + binding.getUniqueName(), new Exception ("trace"));
+      }
 
       return addMappingInternal(binding.getAddress(), binding);
    }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2011-10-29 21:33:03 UTC (rev 11617)
@@ -514,6 +514,7 @@
          {
             if (requiresResponse)
             {
+               log.debug("Sending exception to client", e);
                response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage());
             }
             else
@@ -525,6 +526,7 @@
          {
             if (requiresResponse)
             {
+               log.debug("Sending exception to client", e);
                response = new HornetQExceptionMessage((HornetQException)e);
             }
             else

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-10-29 21:33:03 UTC (rev 11617)
@@ -701,6 +701,12 @@
          if (csf == null || csf.isClosed())
          {
             csf = createSessionFactory();
+            if (csf == null)
+            {
+               // Retrying. This probably means the node is not available (for the cluster connection case)
+               scheduleRetryConnect();
+               return;
+            }
             // Session is pre-acknowledge
             session = (ClientSessionInternal)csf.createSession(user, password, false, true, true, true, 1);
          }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-10-29 21:33:03 UTC (rev 11617)
@@ -74,8 +74,6 @@
    
    private final long targetNodeEventUID;
 
-   private final TransportConfiguration connector;
-
    private final ServerLocatorInternal discoveryLocator;
 
    public ClusterConnectionBridge(final ClusterConnection clusterConnection,
@@ -138,7 +136,6 @@
       this.managementAddress = managementAddress;
       this.managementNotificationAddress = managementNotificationAddress;
       this.flowRecord = flowRecord;
-      this.connector = connector;
 
       // we need to disable DLQ check on the clustered bridges
       queue.setInternalQueue(true);
@@ -152,7 +149,14 @@
 
    protected ClientSessionFactoryInternal createSessionFactory() throws Exception
    {
-      ClientSessionFactoryInternal factory = super.createSessionFactory();
+      ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal)serverLocator.createSessionFactory(targetNodeID);
+      
+      if (factory == null)
+      {
+         log.warn("NodeID=" + targetNodeID + " is not available on the topology. Retrying the connection to that node now");
+         return null;
+      }
+      factory.setReconnectAttempts(0);
       factory.getConnection().addFailureListener(new FailureListener()
       {
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java	2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java	2011-10-29 21:33:03 UTC (rev 11617)
@@ -311,6 +311,11 @@
       buff.putLong(remoteQueueID);
 
       message.putBytesProperty(idsHeaderName, ids);
+      
+      if (log.isTraceEnabled())
+      {
+         log.trace("Adding remoteQueue ID = " + remoteQueueID + " into message=" + message + " store-forward-queue=" + storeAndForwardQueue);
+      }
    }
 
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/DivertImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/DivertImpl.java	2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/DivertImpl.java	2011-10-29 21:33:03 UTC (rev 11617)
@@ -84,6 +84,11 @@
       // properly on ack, since the original address will be overwritten
 
       // TODO we can optimise this so it doesn't copy if it's not routed anywhere else
+      
+      if (log.isTraceEnabled())
+      {
+         log.trace("Diverting message " + message + " into " + this);
+      }
 
       long id = storageManager.generateUniqueID();
       

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessage.java	2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessage.java	2011-10-29 21:33:03 UTC (rev 11617)
@@ -961,6 +961,7 @@
       sb.append(getJMSMessageID());
       sb.append("]:");
       sb.append(message.isDurable() ? "PERSISTENT" : "NON-PERSISTENT");
+      sb.append(",coreMessage=" +  message);
       return sb.toString();
    }
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java	2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java	2011-10-29 21:33:03 UTC (rev 11617)
@@ -25,8 +25,12 @@
 import static org.hornetq.utils.DataConstants.SHORT;
 import static org.hornetq.utils.DataConstants.STRING;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.Map.Entry;
@@ -138,7 +142,7 @@
       checkCreateProperties();
       doPutValue(key, value == null ? new NullValue() : new StringValue(value));
    }
-   
+
    public void putNullValue(final SimpleString key)
    {
       checkCreateProperties();
@@ -604,9 +608,65 @@
    @Override
    public String toString()
    {
-      return "TypedProperties[" + properties + "]";
+      StringWriter strWriter = new StringWriter();
+      PrintWriter out = new PrintWriter(strWriter);
+      Iterator<Entry<SimpleString, PropertyValue>> propIter = properties.entrySet().iterator();
+      while (propIter.hasNext())
+      {
+         Entry<SimpleString, PropertyValue> item = propIter.next();
+
+         // When debugging TypedProperties, we need to identify the actual array on PrintData and other log outputs
+         if (item.getKey().toString().startsWith("_HQ_ROUTE_TO"))
+         {
+            String outstr = toLongArray(item.getValue());
+            
+            out.print(item.getKey() + "=" + outstr);
+         }
+         else
+         {
+            out.print(item.getKey() + "=" + item.getValue());
+         }
+         if (propIter.hasNext())
+         {
+            out.print(",");
+         }
+      }
+      return "TypedProperties[" + strWriter.toString() + "]";
    }
 
+   /**
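+    * @param value the property value whose payload is expected to be a byte array of packed longs
+    * @return the decoded longs as a comma-separated list in square brackets, or the value's own string form in brackets if decoding fails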
+    */
+   private String toLongArray(PropertyValue value)
+   {
+      StringBuffer outstr = new StringBuffer();
+
+      try
+      {
+         byte[] ids = (byte [])value.getValue();
+
+         ByteBuffer buff = ByteBuffer.wrap(ids);
+
+         while (buff.hasRemaining())
+         {
+            long bindingID = buff.getLong();
+            outstr.append(bindingID);
+            if (buff.hasRemaining())
+            {
+               outstr.append(",");
+            }
+         }
+      }
+      catch (Throwable e)
+      {
+         log.warn(e.getMessage(), e);
+         outstr = new StringBuffer();
+         outstr.append(value.toString());
+      }
+      return "[" + outstr.toString() + "]";
+   }
+
    // Private ------------------------------------------------------------------------------------
 
    private void checkCreateProperties()



More information about the hornetq-commits mailing list