[hornetq-commits] JBoss hornetq SVN: r9615 - in branches/2_2_0_HA_Improvements/src/main/org/hornetq/core: protocol/core/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Aug 31 11:22:51 EDT 2010


Author: jmesnil
Date: 2010-08-31 11:22:50 -0400 (Tue, 31 Aug 2010)
New Revision: 9615

Added:
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java
Modified:
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
Log:
Clean server shutdown

* when the server shuts down, it sends a DISCONNECT packet to its connections with its node ID.
* ClientSessionFactory handles the DISCONNECT by triggering the connection failure and *only afterwards* notifying the server locator that the node is down

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-08-31 12:14:53 UTC (rev 9614)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-08-31 15:22:50 UTC (rev 9615)
@@ -43,6 +43,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.Ping;
 import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
@@ -1152,6 +1153,7 @@
 
          if (type == PacketImpl.DISCONNECT)
          {
+            final DisconnectMessage msg = (DisconnectMessage)packet;
             closeExecutor.execute(new Runnable()
             {
                // Must be executed on new thread since cannot block the netty thread for a long time and fail can
@@ -1160,6 +1162,10 @@
                {
                   conn.fail(new HornetQException(HornetQException.DISCONNECTED,
                                                  "The connection was disconnected because of server shutdown"));
+                  if (msg.getNodeID() != null)
+                  {
+                     serverLocator.notifyNodeDown(msg.getNodeID().toString());
+                  }
                }
             });
          }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-08-31 12:14:53 UTC (rev 9614)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-08-31 15:22:50 UTC (rev 9615)
@@ -579,12 +579,13 @@
                      throw new HornetQException(HornetQException.NOT_CONNECTED,
                            "Cannot connect to server(s). Tried with all available servers.");
                   }
+                  /*
                   if (topologyArray == null && initialConnectors != null && attempts == initialConnectors.length)
                   {
                      throw new HornetQException(HornetQException.NOT_CONNECTED,
                      "Cannot connect to server(s). Tried with all available servers.");
                   }
-
+                  */
                   retry = true;
                }
                else

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2010-08-31 12:14:53 UTC (rev 9614)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2010-08-31 15:22:50 UTC (rev 9615)
@@ -90,6 +90,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.CreateReplicationSessionMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
@@ -174,7 +175,7 @@
          }
          case DISCONNECT:
          {
-            packet = new PacketImpl(PacketImpl.DISCONNECT);
+            packet = new DisconnectMessage();
             break;
          }
          case EXCEPTION:

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java	2010-08-31 12:14:53 UTC (rev 9614)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java	2010-08-31 15:22:50 UTC (rev 9615)
@@ -31,6 +31,7 @@
 import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
 import org.hornetq.core.remoting.CloseListener;
 import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.spi.core.remoting.BufferHandler;
@@ -307,11 +308,8 @@
          channel.flushConfirmations();
       }
 
-      if (nodeID != null)
-      {
-         channel0.send(new ClusterTopologyChangeMessage(nodeID.toString()));
-      }
-      channel0.sendAndFlush(new PacketImpl(PacketImpl.DISCONNECT));
+      Packet disconnect = new DisconnectMessage(nodeID);
+      channel0.sendAndFlush(disconnect);
    }
 
    public long generateChannelID()

Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java	                        (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java	2010-08-31 15:22:50 UTC (rev 9615)
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * 
+ * A DisconnectMessage: the DISCONNECT packet a server sends on shutdown, optionally carrying its node ID.
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class DisconnectMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   /** ID of the disconnecting node; null when none was supplied or decoded. */
+   private SimpleString nodeID;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /** Creates a DISCONNECT packet carrying the given node ID, which may be null. */
+   public DisconnectMessage(final SimpleString nodeID)
+   {
+      super(PacketImpl.DISCONNECT);
+      this.nodeID = nodeID;
+   }
+
+   /** Creates a DISCONNECT packet whose node ID stays null until decodeRest sets it. */
+   public DisconnectMessage()
+   {
+      super(PacketImpl.DISCONNECT);
+   }
+
+   // Public --------------------------------------------------------
+
+   /** Returns the node ID carried by this packet, or null if it has none. */
+   public SimpleString getNodeID()
+   {
+      return nodeID;
+   }
+
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeNullableSimpleString(nodeID);
+   }
+
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      nodeID = buffer.readNullableSimpleString();
+   }
+
+   @Override
+   public String toString()
+   {
+      return getParentString() + ", nodeID=" + nodeID + "]";
+   }
+
+   @Override
+   public boolean equals(final Object other)
+   {
+      if (other instanceof DisconnectMessage == false)
+      {
+         return false;
+      }
+
+      DisconnectMessage r = (DisconnectMessage)other;
+      // nodeID may be null, so compare it null-safely instead of dereferencing it.
+      return super.equals(other) && (nodeID == null ? r.nodeID == null : nodeID.equals(r.nodeID));
+   }
+
+   @Override
+   public final boolean isRequiresConfirmations()
+   {
+      return false;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2010-08-31 12:14:53 UTC (rev 9614)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2010-08-31 15:22:50 UTC (rev 9615)
@@ -227,4 +227,52 @@
       
       return serverLocator.createSessionFactory(connector);      
    }
+   
+   /** Cleans up after a failed bridge connection; without discovery, also retries the connector. */
+   @Override
+   public void connectionFailed(HornetQException me)
+   {
+      if (!session.isClosed())
+      {
+         try
+         {
+            session.cleanUp(false);
+         }
+         catch (Exception e)
+         {
+            log.warn("Unable to clean up the session after a connection failure", e);
+         }
+         serverLocator.notifyNodeDown(targetNodeID);
+         // No discovery address: call createSessionFactory(connector) on the executor until it succeeds.
+         if (serverLocator.getDiscoveryAddress() == null)
+         {
+            executor.execute(new Runnable()
+            {
+               public void run()
+               {
+                  ClientSessionFactory sf = null;
+                  do
+                  {
+                     try
+                     {
+                        sf = serverLocator.createSessionFactory(connector);
+                     }
+                     catch (HornetQException e)
+                     {
+                        // NOTE(review): every error code is retried, not only NOT_CONNECTED, and
+                        // there is no back-off between attempts -- confirm this is intended.
+                     }
+                     catch (Exception e)
+                     {
+                        log.warn("Giving up reconnecting the bridge after an unexpected error", e);
+                        break;
+                     }
+                  }
+                  while (sf == null);
+               }
+            });
+         }
+      }
+      super.connectionFailed(me);
+   }
 }



More information about the hornetq-commits mailing list