[hornetq-commits] JBoss hornetq SVN: r9615 - in branches/2_2_0_HA_Improvements/src/main/org/hornetq/core: protocol/core/impl and 2 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Aug 31 11:22:51 EDT 2010
Author: jmesnil
Date: 2010-08-31 11:22:50 -0400 (Tue, 31 Aug 2010)
New Revision: 9615
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
Log:
Clean server shutdown
* when the server shuts down, it sends a DISCONNECT packet to its connections with its node ID.
* ClientSessionFactory will handle the DISCONNECT, trigger connection failures *and after* will notify the server locator that the node is down
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-31 12:14:53 UTC (rev 9614)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-31 15:22:50 UTC (rev 9615)
@@ -43,6 +43,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
@@ -1152,6 +1153,7 @@
if (type == PacketImpl.DISCONNECT)
{
+ final DisconnectMessage msg = (DisconnectMessage)packet;
closeExecutor.execute(new Runnable()
{
// Must be executed on new thread since cannot block the netty thread for a long time and fail can
@@ -1160,6 +1162,10 @@
{
conn.fail(new HornetQException(HornetQException.DISCONNECTED,
"The connection was disconnected because of server shutdown"));
+ if (msg.getNodeID() != null)
+ {
+ serverLocator.notifyNodeDown(msg.getNodeID().toString());
+ }
}
});
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-31 12:14:53 UTC (rev 9614)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-31 15:22:50 UTC (rev 9615)
@@ -579,12 +579,13 @@
throw new HornetQException(HornetQException.NOT_CONNECTED,
"Cannot connect to server(s). Tried with all available servers.");
}
+ /*
if (topologyArray == null && initialConnectors != null && attempts == initialConnectors.length)
{
throw new HornetQException(HornetQException.NOT_CONNECTED,
"Cannot connect to server(s). Tried with all available servers.");
}
-
+ */
retry = true;
}
else
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-08-31 12:14:53 UTC (rev 9614)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-08-31 15:22:50 UTC (rev 9615)
@@ -90,6 +90,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
@@ -174,7 +175,7 @@
}
case DISCONNECT:
{
- packet = new PacketImpl(PacketImpl.DISCONNECT);
+ packet = new DisconnectMessage();
break;
}
case EXCEPTION:
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-08-31 12:14:53 UTC (rev 9614)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-08-31 15:22:50 UTC (rev 9615)
@@ -31,6 +31,7 @@
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.spi.core.remoting.BufferHandler;
@@ -307,11 +308,8 @@
channel.flushConfirmations();
}
- if (nodeID != null)
- {
- channel0.send(new ClusterTopologyChangeMessage(nodeID.toString()));
- }
- channel0.sendAndFlush(new PacketImpl(PacketImpl.DISCONNECT));
+ Packet disconnect = new DisconnectMessage(nodeID);
+ channel0.sendAndFlush(disconnect);
}
public long generateChannelID()
Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java 2010-08-31 15:22:50 UTC (rev 9615)
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ *
+ * A Ping
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class DisconnectMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private SimpleString nodeID;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public DisconnectMessage(final SimpleString nodeID)
+ {
+ super(PacketImpl.DISCONNECT);
+
+ this.nodeID = nodeID;
+ }
+
+ public DisconnectMessage()
+ {
+ super(PacketImpl.DISCONNECT);
+ }
+
+ // Public --------------------------------------------------------
+
+ public SimpleString getNodeID()
+ {
+ return nodeID;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeNullableSimpleString(nodeID);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ nodeID = buffer.readNullableSimpleString();
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuffer buf = new StringBuffer(getParentString());
+ buf.append(", nodeID=" + nodeID);
+ buf.append("]");
+ return buf.toString();
+ }
+
+ @Override
+ public boolean equals(final Object other)
+ {
+ if (other instanceof DisconnectMessage == false)
+ {
+ return false;
+ }
+
+ DisconnectMessage r = (DisconnectMessage)other;
+
+ return super.equals(other) && nodeID.equals(r.nodeID);
+ }
+
+ @Override
+ public final boolean isRequiresConfirmations()
+ {
+ return false;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2010-08-31 12:14:53 UTC (rev 9614)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2010-08-31 15:22:50 UTC (rev 9615)
@@ -227,4 +227,52 @@
return serverLocator.createSessionFactory(connector);
}
+
+ @Override
+ public void connectionFailed(HornetQException me)
+ {
+ if (!session.isClosed())
+ {
+ try
+ {
+ session.cleanUp(false);
+ }
+ catch (Exception e)
+ {
+ log.warn("Unable to clean up the session after a connection failure", e);
+ }
+ serverLocator.notifyNodeDown(targetNodeID);
+ if (serverLocator.getDiscoveryAddress() == null)
+ {
+ executor.execute(new Runnable()
+ {
+
+ public void run()
+ {
+ ClientSessionFactory sf = null;
+ do
+ {
+ try
+ {
+ sf = serverLocator.createSessionFactory(connector);
+ }
+ catch (HornetQException e)
+ {
+ if (e.getCode() == HornetQException.NOT_CONNECTED)
+ {
+ continue;
+ }
+ }
+ catch (Exception e)
+ {
+ break;
+ }
+ }
+ while (sf == null);
+ }
+ });
+ }
+ }
+ super.connectionFailed(me);
+ }
}
More information about the hornetq-commits
mailing list