[hornetq-commits] JBoss hornetq SVN: r10849 - in branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq: core/client/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Jun 17 12:37:51 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-06-17 12:37:51 -0400 (Fri, 17 Jun 2011)
New Revision: 10849

Modified:
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
tweaks

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java	2011-06-17 15:52:20 UTC (rev 10848)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java	2011-06-17 16:37:51 UTC (rev 10849)
@@ -179,4 +179,26 @@
       result = 31 * result + (int) (discoveryInitialWaitTimeout ^ (discoveryInitialWaitTimeout >>> 32));
       return result;
    }
+
+   /* (non-Javadoc)
+    * @see java.lang.Object#toString()
+    */
+   @Override
+   public String toString()
+   {
+      return "DiscoveryGroupConfiguration [discoveryInitialWaitTimeout=" + discoveryInitialWaitTimeout +
+             ", groupAddress=" +
+             groupAddress +
+             ", groupPort=" +
+             groupPort +
+             ", localBindAddress=" +
+             localBindAddress +
+             ", name=" +
+             name +
+             ", refreshTimeout=" +
+             refreshTimeout +
+             "]";
+   }
+   
+   
 }

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-06-17 15:52:20 UTC (rev 10848)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-06-17 16:37:51 UTC (rev 10849)
@@ -422,6 +422,13 @@
          return;
       }
 
+      synchronized (exitLock)
+      {
+         exitLock.notifyAll();
+      }
+
+      forceReturnChannel1();
+
       // we need to stop the factory from connecting if it is in the middle of trying to failover before we get the lock
       causeExit();
       synchronized (createSessionLock)
@@ -942,7 +949,7 @@
          {
             if (isDebug)
             {
-               log.debug("Trying reconnection attempt " + count);
+               log.debug("Trying reconnection attempt " + count + "/" + reconnectAttempts);
             }
 
             getConnection();
@@ -1055,6 +1062,11 @@
          try
          {
             DelegatingBufferHandler handler = new DelegatingBufferHandler();
+            
+            if (log.isDebugEnabled())
+            {
+               log.debug("Trying to connect with connector = " + connectorFactory + ", parameters = " + connectorConfig.getParams());
+            }
 
             connector = connectorFactory.createConnector(connectorConfig.getParams(),
                                                          handler,

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-06-17 15:52:20 UTC (rev 10848)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-06-17 16:37:51 UTC (rev 10849)
@@ -65,7 +65,7 @@
 
    private StaticConnector staticConnector = new StaticConnector();
 
-   private Topology topology = new Topology();
+   private final Topology topology = new Topology();
 
    private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
 
@@ -599,7 +599,7 @@
                                                       threadPool,
                                                       scheduledThreadPool,
                                                       interceptors);
-               factory.connect(initialConnectAttempts, failoverOnInitialConnection);
+               factory.connect(reconnectAttempts, failoverOnInitialConnection);
             }
             catch (HornetQException e)
             {
@@ -1096,8 +1096,10 @@
       {
          staticConnector.disconnect();
       }
+      
+      Set<ClientSessionFactory> clonedFactory = new HashSet<ClientSessionFactory>(factories);
 
-      for (ClientSessionFactory factory : factories)
+      for (ClientSessionFactory factory : clonedFactory)
       {
          factory.close();
       }
@@ -1263,16 +1265,6 @@
    public synchronized void factoryClosed(final ClientSessionFactory factory)
    {
       factories.remove(factory);
-
-      if (factories.isEmpty())
-      {
-         // Go back to using the broadcast or static list
-
-         receivedTopology = false;
-
-         topology = null;
-
-      }
    }
 
    public Topology getTopology()

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/ClusterConnection.java	2011-06-17 15:52:20 UTC (rev 10848)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/ClusterConnection.java	2011-06-17 16:37:51 UTC (rev 10849)
@@ -41,14 +41,6 @@
     */
    Map<String, String> getNodes();
 
-   void handleReplicatedAddBinding(SimpleString address,
-                                   SimpleString uniqueName,
-                                   SimpleString routingName,
-                                   long queueID,
-                                   SimpleString filterString,
-                                   SimpleString queueName,
-                                   int distance) throws Exception;
-
    void activate() throws Exception;
    
    TransportConfiguration getConnector();

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java	2011-06-17 15:52:20 UTC (rev 10848)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java	2011-06-17 16:37:51 UTC (rev 10849)
@@ -33,14 +33,8 @@
    Bridge getBridge();
 
    void close() throws Exception;
-   
-   public void resume() throws Exception;
-   
+
    boolean isClosed();
 
    void reset() throws Exception;
-
-   void pause() throws Exception;
-
-    boolean isPaused();
 }

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-06-17 15:52:20 UTC (rev 10848)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-06-17 16:37:51 UTC (rev 10849)
@@ -617,7 +617,7 @@
    protected ClientSessionFactoryInternal createSessionFactory() throws Exception
    {
       ClientSessionFactoryInternal csf = (ClientSessionFactoryInternal)serverLocator.createSessionFactory();
-      // csf.setReconnectAttempts(0);
+      csf.setReconnectAttempts(0);
       //csf.setInitialReconnectAttempts(1);
       return csf;
    }

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-06-17 15:52:20 UTC (rev 10848)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-06-17 16:37:51 UTC (rev 10849)
@@ -489,7 +489,7 @@
 
 
    // TODO: does it need to be sync?
-   public synchronized void nodeUP(final String nodeID,
+   public void nodeUP(final String nodeID,
                                    final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                                    final boolean last)
    {
@@ -563,10 +563,6 @@
             else
             {
                log.info("Reattaching nodeID=" + nodeID);
-               if (record.isPaused())
-               {
-                  record.resume();
-               }
             }
          }
          catch (Exception e)
@@ -808,26 +804,8 @@
          bridge.stop();
       }
 
-      public void pause() throws Exception
+       public boolean isClosed()
       {
-           paused = true;
-           clearBindings();
-           bridge.pause();
-      }
-
-       public boolean isPaused()
-       {
-           return paused;
-       }
-
-       public void resume() throws Exception
-      {
-         paused = false;
-         bridge.resume();
-      }
-      
-      public boolean isClosed()
-      {
          return isClosed;
       }
 
@@ -836,7 +814,6 @@
          clearBindings();
       }
 
-
        public void setBridge(final Bridge bridge)
       {
          this.bridge = bridge;
@@ -972,6 +949,7 @@
 
       private synchronized void clearBindings() throws Exception
       {
+         log.debug(ClusterConnectionImpl.this + " clearing bindings");
          for (RemoteQueueBinding binding : new HashSet<RemoteQueueBinding>(bindings.values()))
          {
             removeBinding(binding.getClusterName());
@@ -980,6 +958,10 @@
 
       private synchronized void doBindingAdded(final ClientMessage message) throws Exception
       {
+         if (log.isTraceEnabled())
+         {
+            log.trace(ClusterConnectionImpl.this + " Adding binding " + message);
+         }
          if (!message.containsProperty(ManagementHelper.HDR_DISTANCE))
          {
             throw new IllegalStateException("distance is null");
@@ -1039,6 +1021,11 @@
 
             return;
          }
+         
+         if (isTrace)
+         {
+            log.trace("Adding binding " + clusterName + " into " + ClusterConnectionImpl.this);
+         }
 
          bindings.put(clusterName, binding);
 
@@ -1058,6 +1045,10 @@
 
       private void doBindingRemoved(final ClientMessage message) throws Exception
       {
+         if (log.isTraceEnabled())
+         {
+            log.trace(ClusterConnectionImpl.this + " Removing binding " + message);
+         }
          if (!message.containsProperty(ManagementHelper.HDR_CLUSTER_NAME))
          {
             throw new IllegalStateException("clusterName is null");
@@ -1082,6 +1073,10 @@
 
       private synchronized void doConsumerCreated(final ClientMessage message) throws Exception
       {
+         if (log.isTraceEnabled())
+         {
+            log.trace(ClusterConnectionImpl.this + " Consumer created " + message);
+         }
          if (!message.containsProperty(ManagementHelper.HDR_DISTANCE))
          {
             throw new IllegalStateException("distance is null");
@@ -1136,6 +1131,10 @@
 
       private synchronized void doConsumerClosed(final ClientMessage message) throws Exception
       {
+         if (log.isTraceEnabled())
+         {
+            log.trace(ClusterConnectionImpl.this + " Consumer closed " + message);
+         }
          if (!message.containsProperty(ManagementHelper.HDR_DISTANCE))
          {
             throw new IllegalStateException("distance is null");
@@ -1189,49 +1188,6 @@
 
    }
 
-   public void handleReplicatedAddBinding(final SimpleString address,
-                                          final SimpleString uniqueName,
-                                          final SimpleString routingName,
-                                          final long queueID,
-                                          final SimpleString filterString,
-                                          final SimpleString queueName,
-                                          final int distance) throws Exception
-   {
-      Binding queueBinding = postOffice.getBinding(queueName);
-
-      if (queueBinding == null)
-      {
-         throw new IllegalStateException("Cannot find s & f queue " + queueName);
-      }
-
-      Queue queue = (Queue)queueBinding.getBindable();
-
-      RemoteQueueBinding binding = new RemoteQueueBindingImpl(server.getStorageManager().generateUniqueID(),
-                                                              address,
-                                                              uniqueName,
-                                                              routingName,
-                                                              queueID,
-                                                              filterString,
-                                                              queue,
-                                                              queueName,
-                                                              distance);
-
-      if (postOffice.getBinding(uniqueName) != null)
-      {
-         ClusterConnectionImpl.log.warn("Remoting queue binding " + uniqueName +
-                                        " has already been bound in the post office. Most likely cause for this is you have a loop " +
-                                        "in your cluster due to cluster max-hops being too large or you have multiple cluster connections to the same nodes using overlapping addresses");
-
-         return;
-      }
-
-      postOffice.addBinding(binding);
-
-      Bindings theBindings = postOffice.getBindingsForAddress(address);
-
-      theBindings.setRouteWhenNoConsumers(routeWhenNoConsumers);
-   }
-
    // for testing only
    public Map<String, MessageFlowRecord> getRecords()
    {
@@ -1286,6 +1242,17 @@
             return null;
          }
       }
+
+      /* (non-Javadoc)
+       * @see java.lang.Object#toString()
+       */
+      @Override
+      public String toString()
+      {
+         return "StaticClusterConnector [tcConfigs=" + Arrays.toString(tcConfigs) + "]";
+      }
+      
+      
    }
 
    private class DiscoveryClusterConnector implements ClusterConnector

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-06-17 15:52:20 UTC (rev 10848)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-06-17 16:37:51 UTC (rev 10849)
@@ -97,14 +97,8 @@
    // the cluster connections which links this node to other cluster nodes
    private final Map<String, ClusterConnection> clusterConnections = new HashMap<String, ClusterConnection>();
 
-   // regular client listeners to be notified of cluster topology changes.
-   // they correspond to regular clients using a HA ServerLocator
-   private Set<ClusterTopologyListener> clientListeners = new ConcurrentHashSet<ClusterTopologyListener>();
+   private Set<ClusterTopologyListener> topologyListeners = new ConcurrentHashSet<ClusterTopologyListener>();
 
-   // cluster connections listeners to be notified of cluster topology changes
-   // they correspond to cluster connections on *other nodes connected to this one*
-   private Set<ClusterTopologyListener> clusterConnectionListeners = new ConcurrentHashSet<ClusterTopologyListener>();
-
    private Topology topology = new Topology();
 
    private volatile ServerLocatorInternal backupServerLocator;
@@ -208,8 +202,7 @@
             managementService.unregisterCluster(clusterConnection.getName().toString());
          }
 
-         clusterConnectionListeners.clear();
-         clientListeners.clear();
+         topologyListeners.clear();
          clusterConnections.clear();
          topology.clear();
 
@@ -249,15 +242,10 @@
       if (removed)
       {
 
-         for (ClusterTopologyListener listener : clientListeners)
+         for (ClusterTopologyListener listener : topologyListeners)
          {
             listener.nodeDown(nodeID);
          }
-
-         for (ClusterTopologyListener listener : clusterConnectionListeners)
-         {
-            listener.nodeDown(nodeID);
-         }
       }
    }
 
@@ -274,16 +262,11 @@
          return;
       }
 
-      for (ClusterTopologyListener listener : clientListeners)
+      for (ClusterTopologyListener listener : topologyListeners)
       {
          listener.nodeUP(nodeID, member.getConnector(), last);
       }
 
-      for (ClusterTopologyListener listener : clusterConnectionListeners)
-      {
-         listener.nodeUP(nodeID, member.getConnector(), last);
-      }
-
       // if this is a node being announced we are hearing it direct from the nodes CM so need to inform our cluster
       // connections.
       if (nodeAnnounce)
@@ -322,18 +305,7 @@
 
    public void addClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
    {
-      synchronized (this)
-      {
-         if (clusterConnection)
-         {
-            this.clusterConnectionListeners.add(listener);
-         }
-         else
-         {
-            this.clientListeners.add(listener);
-         }
-      }
-
+      topologyListeners.add(listener);
       // We now need to send the current topology to the client
       topology.sendTopology(listener);
    }
@@ -341,14 +313,7 @@
    public void removeClusterTopologyListener(final ClusterTopologyListener listener,
                                                           final boolean clusterConnection)
    {
-      if (clusterConnection)
-      {
-         this.clusterConnectionListeners.remove(listener);
-      }
-      else
-      {
-         this.clientListeners.remove(listener);
-      }
+      topologyListeners.add(listener);
    }
 
    public Topology getTopology()
@@ -425,15 +390,14 @@
             }
          }
 
-         for (ClusterTopologyListener listener : clientListeners)
+         for (ClusterTopologyListener listener : topologyListeners)
          {
+            if (log.isDebugEnabled())
+            {
+               log.debug("Informing client listener " + listener + " about itself node " + nodeID + " with connector=" + member.getConnector());
+            }
             listener.nodeUP(nodeID, member.getConnector(), false);
          }
-
-         for (ClusterTopologyListener listener : clusterConnectionListeners)
-         {
-            listener.nodeUP(nodeID, member.getConnector(), false);
-         }
       }
    }
 
@@ -499,16 +463,10 @@
 
       // Propagate the announcement
 
-      for (ClusterTopologyListener listener : clientListeners)
+      for (ClusterTopologyListener listener : topologyListeners)
       {
          listener.nodeUP(nodeID, member.getConnector(), false);
       }
-
-      for (ClusterTopologyListener listener : clusterConnectionListeners)
-      {
-         listener.nodeUP(nodeID, member.getConnector(), false);
-      }
-
    }
 
    private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration config) throws Exception



More information about the hornetq-commits mailing list