[hornetq-commits] JBoss hornetq SVN: r10902 - in branches/Branch_2_2_EAP_cluster_clean2: src/main/org/hornetq/core/postoffice/impl and 7 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Jun 30 01:48:56 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-06-30 01:48:56 -0400 (Thu, 30 Jun 2011)
New Revision: 10902

Modified:
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java
   branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
   branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
Log:
changes on my branch

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-06-30 05:48:56 UTC (rev 10902)
@@ -366,8 +366,18 @@
 
    public void connectionDestroyed(final Object connectionID)
    {
-      handleConnectionFailure(connectionID,
-                              new HornetQException(HornetQException.NOT_CONNECTED, "Channel disconnected"));
+      // It has to use the same executor as the disconnect message is being sent through
+      
+      final HornetQException ex = new HornetQException(HornetQException.DISCONNECTED, "Channel disconnected");
+      
+      closeExecutor.execute(new Runnable()
+      {
+         public void run()
+         {
+            handleConnectionFailure(connectionID, ex);
+         }
+      });
+
    }
 
    public void connectionException(final Object connectionID, final HornetQException me)
@@ -1365,6 +1375,8 @@
          if (type == PacketImpl.DISCONNECT)
          {
             final DisconnectMessage msg = (DisconnectMessage)packet;
+            
+            log.info("PUTZ10 Disconnect arrived: " + msg);
 
             closeExecutor.execute(new Runnable()
             {
@@ -1378,6 +1390,7 @@
                      serverLocator.notifyNodeDown(msg.getNodeID().toString());
                   }
 
+                  log.info("Disconnect being called on connection");
                   conn.fail(new HornetQException(HornetQException.DISCONNECTED,
                                                  "The connection was disconnected because of server shutdown"));
 

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-06-30 05:48:56 UTC (rev 10902)
@@ -19,6 +19,7 @@
 import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -1232,6 +1233,18 @@
       // Notify if waiting on getting topology
       notify();
    }
+   
+   /* (non-Javadoc)
+    * @see java.lang.Object#toString()
+    */
+   @Override
+   public String toString()
+   {
+      return "ServerLocatorImpl [initialConnectors=" + Arrays.toString(initialConnectors) +
+             ", discoveryGroupConfiguration=" +
+             discoveryGroupConfiguration +
+             "]";
+   }
 
    private void updateArraysAndPairs()
    {

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-06-30 05:48:56 UTC (rev 10902)
@@ -816,6 +816,15 @@
 
    }
 
+   /* (non-Javadoc)
+    * @see java.lang.Object#toString()
+    */
+   @Override
+   public String toString()
+   {
+      return "PostOfficeImpl [server=" + server + "]";
+   }
+
    // Private -----------------------------------------------------------------
 
    /**

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java	2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java	2011-06-30 05:48:56 UTC (rev 10902)
@@ -305,7 +305,7 @@
 
    protected String getParentString()
    {
-      return "PACKET[type=" + type + ", channelID=" + channelID + ", packetObject=" + this.getClass().getSimpleName();
+      return "PACKET("  + this.getClass().getSimpleName() + ")[type=" + type + ", channelID=" + channelID + ", packetObject=" + this.getClass().getSimpleName();
    }
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java	2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java	2011-06-30 05:48:56 UTC (rev 10902)
@@ -158,8 +158,6 @@
    public String toString()
    {
       return "RemotingConnectionImpl [clientID=" + clientID +
-             ", decoder=" +
-             decoder +
              ", nodeID=" +
              nodeID +
              ", transportConnection=" +

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java	2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java	2011-06-30 05:48:56 UTC (rev 10902)
@@ -228,20 +228,22 @@
 
       public void connectionDestroyed(final Object connectionID)
       {
-         if (connections.remove(connectionID) != null)
+         InVMConnection connection = (InVMConnection)connections.remove(connectionID);
+         
+         if (connection != null)
          {
+ 
             listener.connectionDestroyed(connectionID);
 
-            // Execute on different thread to avoid deadlocks
-            new Thread()
+              // Execute on different thread after all the packets are sent, to avoid deadlocks
+            connection.getExecutor().execute(new Runnable()
             {
-               @Override
                public void run()
                {
-                  // Remove on the other side too
-                  connector.disconnect((String)connectionID);
+                 // Remove on the other side too
+                   connector.disconnect((String)connectionID);
                }
-            }.start();
+            });
          }
       }
 

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java	2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java	2011-06-30 05:48:56 UTC (rev 10902)
@@ -169,6 +169,11 @@
    public void removeReadyListener(ReadyListener listener)
    {
    }
+   
+   public Executor getExecutor()
+   {
+      return executor;
+   }
 
    /* (non-Javadoc)
     * @see java.lang.Object#toString()

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-06-30 05:48:56 UTC (rev 10902)
@@ -472,7 +472,7 @@
 
          if (isTrace)
          {
-            log.trace("Bridge " + name + " is handling reference=" + ref);
+            log.trace("Bridge " + this + " is handling reference=" + ref);
          }
          ref.handled();
 
@@ -518,16 +518,11 @@
 
    public final void connectionFailed(final HornetQException me, boolean failedOver)
    {
-      log.warn(name + "::Connection failed with failedOver=" + failedOver, me);
-      if (isTrace)
-      {
-         log.trace("Calling BridgeImpl::connectionFailed(HOrnetQException me=" + me +
-                   ", boolean failedOver=" +
-                   failedOver);
-      }
+      log.warn(this + "::Connection failed with failedOver=" + failedOver + "-" + me, me);
+      
       try
       {
-         csf.cleanup();
+         // csf.cleanup();
       }
       catch (Throwable dontCare)
       {
@@ -535,21 +530,29 @@
 
       try
       {
-         session.cleanUp(false);
+         // session.cleanUp(false);
       }
       catch (Throwable dontCare)
       {
       }
-
-      fail(false);
-
-      scheduleRetryConnect();
+      
+      if (me.getCode() == HornetQException.DISCONNECTED)
+      {
+         log.warn(this + "::Connection failed with failedOver=" + failedOver + "-" + me, me);
+         fail(true);
+      }
+      else
+      {
+         log.warn(this + "::Connection failed with failedOver=" + failedOver + "-" + me, me);
+         fail(false);
+         scheduleRetryConnect();
+      }
    }
 
    public void beforeReconnect(final HornetQException exception)
    {
-      log.warn(name + "::Connection failed before reconnect ", exception);
-      fail(false);
+//      log.warn(name + "::Connection failed before reconnect ", exception);
+//      fail(false);
    }
 
    // Package protected ---------------------------------------------
@@ -564,28 +567,12 @@
    @Override
    public String toString()
    {
-      return this.getClass().getName() + " [name=" +
-             name +
-             ", nodeUUID=" +
-             nodeUUID +
-             ", queue=" +
-             queue +
-             ", filter=" +
-             filter +
-             ", forwardingAddress=" +
-             forwardingAddress +
-             ", useDuplicateDetection=" +
-             useDuplicateDetection +
-             ", active=" +
-             active +
-             ", stopping=" +
-             stopping +
-             "]";
+      return this.getClass().getName() + " [name=" + name + ", queue=" + queue + " targetConnector=" + this.serverLocator + "]";
    }
 
    protected void fail(final boolean permanently)
    {
-      log.debug(name + "::BridgeImpl::fail being called, permanently=" + permanently);
+      log.debug(this + "::fail being called, permanently=" + permanently);
 
       if (queue != null)
       {
@@ -625,7 +612,7 @@
    /* This is called only when the bridge is activated */
    protected void connect()
    {
-      BridgeImpl.log.info("Connecting bridge " + name + " to its destination [" + nodeUUID.toString() + "], csf=" + this.csf);
+      BridgeImpl.log.info("Connecting  " + this + " to its destination [" + nodeUUID.toString() + "], csf=" + this.csf);
 
       retryCount++;
 
@@ -702,7 +689,7 @@
          queue.addConsumer(BridgeImpl.this);
          queue.deliverAsync();
 
-         BridgeImpl.log.info("Bridge " + name + " is connected [" + nodeUUID + "-> " + name + "]");
+         BridgeImpl.log.info("Bridge " + this + " is connected");
 
          return;
       }
@@ -720,12 +707,12 @@
          }
          else
          {
-            BridgeImpl.log.warn("Bridge " + name + " is unable to connect to destination. Retrying", e);
+            BridgeImpl.log.warn("Bridge " + this + " is unable to connect to destination. Retrying", e);
          }
       }
       catch (Exception e)
       {
-         BridgeImpl.log.warn("Bridge " + name + " is unable to connect to destination. It will be disabled.", e);
+         BridgeImpl.log.warn("Bridge " + this + " is unable to connect to destination. It will be disabled.", e);
       }
 
       scheduleRetryConnect();

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-06-30 05:48:56 UTC (rev 10902)
@@ -1433,7 +1433,7 @@
    @Override
    public String toString()
    {
-      return "QueueImpl[name=" + name.toString() + "]@" + Integer.toHexString(System.identityHashCode(this));
+      return "QueueImpl[name=" + name.toString() + ", postOffice=" + this.postOffice + "]@" + Integer.toHexString(System.identityHashCode(this));
    }
 
    // Private

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java	2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java	2011-06-30 05:48:56 UTC (rev 10902)
@@ -59,15 +59,21 @@
          {
             StringWriter sw = new StringWriter();
             PrintWriter pw = new PrintWriter(sw);
-            record.getThrown().printStackTrace(pw);
+            
+            pw.println(record.getThrown() );
+            StackTraceElement[] trace = record.getThrown().getStackTrace();
+            for (int i=0; i < trace.length; i++)
+                pw.println("\tat " + trace[i]);
             pw.close();
+
             sb.append(sw.toString());
          }
          catch (Exception ex)
          {
          }
       }
-      //sb.append(HornetQLoggerFormatter.LINE_SEPARATOR);
+      
+      sb.append(HornetQLoggerFormatter.LINE_SEPARATOR);
 
       return sb.toString();
    }

Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-06-30 05:48:56 UTC (rev 10902)
@@ -13,9 +13,11 @@
 
 package org.hornetq.tests.integration.cluster.distribution;
 
+import java.io.PrintStream;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -360,25 +362,27 @@
             System.out.println("Binding = " + qBinding + ", queue=" + qBinding.getQueue());
          }
       }
-      System.out.println("=======================================================================");
 
+      StringWriter writer = new StringWriter();
+      PrintWriter out = new PrintWriter(writer);
+      
       try
       {
          for (HornetQServer hornetQServer : servers)
          {
             if (hornetQServer != null)
             {
-               System.out.println(clusterDescription(hornetQServer));
-               System.out.println(debugBindings(hornetQServer, hornetQServer.getConfiguration().getManagementNotificationAddress().toString()));
+               out.println(clusterDescription(hornetQServer));
+               out.println(debugBindings(hornetQServer, hornetQServer.getConfiguration().getManagementNotificationAddress().toString()));
             }
          }
          
          for (HornetQServer hornetQServer : servers)
          {
-            System.out.println("Management bindings on " + hornetQServer);
+            out.println("Management bindings on " + hornetQServer);
             if (hornetQServer != null)
             {
-               System.out.println(debugBindings(hornetQServer, hornetQServer.getConfiguration().getManagementNotificationAddress().toString()));
+               out.println(debugBindings(hornetQServer, hornetQServer.getConfiguration().getManagementNotificationAddress().toString()));
             }
          }
       }
@@ -387,7 +391,7 @@
       }
       
       
-      throw new IllegalStateException(msg);
+      throw new IllegalStateException(msg + "\n" + writer.toString());
    }
    
    
@@ -2016,6 +2020,7 @@
 
    protected void stopServers(final int... nodes) throws Exception
    {
+      log.info("Stopping nodes "  + Arrays.toString(nodes));
       for (int node : nodes)
       {
          if (servers[node] != null && servers[node].isStarted())

Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java	2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java	2011-06-30 05:48:56 UTC (rev 10902)
@@ -52,21 +52,6 @@
    {
       return false;
    }
-   
-   public void _testLoop() throws Throwable
-   {
-      for (int i = 0 ; i < 1000; i++)
-      {
-         log.info("#test " + i);
-         testStopAllStartAll();
-         if (i + 1  < 1000)
-         {
-            tearDown();
-            setUp();
-         }
-      }
-   }
-
    public void testStopAllStartAll() throws Throwable
    {
       try
@@ -1620,6 +1605,22 @@
       verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
    }
 
+   
+   public void _testLoop() throws Throwable
+   {
+      for (int i = 0 ; i < 1000; i++)
+      {
+         log.info("#test " + i);
+         testStopSuccessiveServers();
+         if (i + 1  < 1000)
+         {
+            tearDown();
+            setUp();
+         }
+      }
+   }
+
+
    public void testStopSuccessiveServers() throws Exception
    {
       setupCluster();
@@ -1842,5 +1843,12 @@
 
       stopServers(0, 1, 2, 3, 4);
    }
+   
 
+   protected boolean isFileStorage()
+   {
+      return false;
+   }
+
+
 }

Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java	2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java	2011-06-30 05:48:56 UTC (rev 10902)
@@ -79,7 +79,7 @@
          waitForBindings(0, "queues.testaddress", 1, 1, true);
          waitForBindings(1, "queues.testaddress", 1, 1, true);
 
-         waitForServerTopology(servers[1], 3, 5);
+         waitForTopology(servers[1], 3);
 
          sendWithProperty(0, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("id1"));
 
@@ -162,7 +162,7 @@
          waitForBindings(0, "queues.testaddress", 1, 1, true);
          waitForBindings(1, "queues.testaddress", 1, 1, true);
 
-         waitForServerTopology(servers[1], 3, 5);
+         waitForTopology(servers[1], 3);
 
 
          sendWithProperty(0, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("id1"));
@@ -215,21 +215,6 @@
       }
    }
 
-   private void waitForServerTopology(HornetQServer server, int nodes, int seconds)
-         throws InterruptedException
-   {
-      Topology topology = server.getClusterManager().getTopology();
-      long timeToWait = System.currentTimeMillis() + (seconds * 1000);
-      while(topology.nodes()!= nodes)
-      {
-         Thread.sleep(100);
-         if(System.currentTimeMillis() > timeToWait)
-         {
-            fail("timed out waiting for server topology");
-         }
-      }
-   }
-
    public boolean isNetty()
    {
       return true;



More information about the hornetq-commits mailing list