Author: jmesnil
Date: 2009-12-16 08:53:24 -0500 (Wed, 16 Dec 2009)
New Revision: 8695
Added:
trunk/tests/src/org/hornetq/tests/integration/client/IncompatibleVersionTest.java
Modified:
trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
Log:
HORNETQ-252: Connections not closed if client/server version mismatch
* destroy the connection on the client if the server reports an incompatible version
* send a HornetQException to the client, flush it and destroy the connection when the
  server detects an incompatible version from the client
* added IncompatibleVersionTest
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-12-15
15:09:02 UTC (rev 8694)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-12-16
13:53:24 UTC (rev 8695)
@@ -378,6 +378,11 @@
}
catch (HornetQException e)
{
+ if (e.getCode() ==
HornetQException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS)
+ {
+ theConnection.destroy();
+ }
+
if (e.getCode() == HornetQException.UNBLOCKED)
{
// This means the thread was blocked on create session and failover
unblocked it
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2009-12-15
15:09:02 UTC (rev 8694)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2009-12-16
13:53:24 UTC (rev 8695)
@@ -112,6 +112,7 @@
private void handleCreateSession(final CreateSessionMessage request)
{
+ boolean incompatibleVersion = false;
Packet response;
try
{
@@ -133,6 +134,11 @@
if (e instanceof HornetQException)
{
response = new HornetQExceptionMessage((HornetQException)e);
+
+ if (((HornetQException)e).getCode() ==
HornetQException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS)
+ {
+ incompatibleVersion = true;
+ }
}
else
{
@@ -140,8 +146,19 @@
response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
}
}
-
- channel1.send(response);
+
+ // send the exception to the client and destroy
+ // the connection if the client and server versions
+ // are not compatible
+ if (incompatibleVersion)
+ {
+ channel1.sendAndFlush(response);
+ connection.destroy();
+ }
+ else
+ {
+ channel1.send(response);
+ }
}
private void handleReattachSession(final ReattachSessionMessage request)
Added: trunk/tests/src/org/hornetq/tests/integration/client/IncompatibleVersionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/IncompatibleVersionTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/client/IncompatibleVersionTest.java 2009-12-16
13:53:24 UTC (rev 8695)
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import static org.hornetq.tests.util.RandomUtil.randomString;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.FailoverManagerImpl;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.remoting.Channel;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
+import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.VersionLoader;
+
+/**
+ * Checks how the server answers a CreateSessionMessage depending on the version it carries.
+ * <p>
+ * A request carrying the version reported by VersionLoader must be answered with a
+ * CreateSessionResponseMessage and leave one connection on the server. A request carrying
+ * a version of -1 must fail with HornetQException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS and
+ * leave no connection on the server.
+ *
+ * @author jmesnil
+ */
+public class IncompatibleVersionTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   /** Version number sent by the test when it plays an incompatible client. */
+   private static final int INCOMPATIBLE_VERSION = -1;
+
+   // Attributes ----------------------------------------------------
+
+   private HornetQServer server;
+
+   private RemotingConnection connection;
+
+   // The pools handed to the FailoverManagerImpl are kept as fields so that
+   // tearDown() can shut them down instead of leaking their worker threads.
+   private ExecutorService executorService;
+
+   private ScheduledExecutorService scheduledExecutorService;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testCompatibleClientVersion() throws Exception
+   {
+      doTestClientVersionCompatibility(true);
+   }
+
+   public void testIncompatibleClientVersion() throws Exception
+   {
+      doTestClientVersionCompatibility(false);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   /**
+    * Starts a server and opens a client connection to it over the in-VM connector.
+    *
+    * @throws Exception if the server or the connection can not be set up
+    */
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      server = createServer(false, false);
+      server.start();
+
+      TransportConfiguration config = new TransportConfiguration(InVMConnectorFactory.class.getName());
+      ClientSessionFactory csf = new ClientSessionFactoryImpl(config);
+      executorService = Executors.newFixedThreadPool(1);
+      scheduledExecutorService = Executors.newScheduledThreadPool(1);
+      FailoverManagerImpl failoverManager = new FailoverManagerImpl(csf,
+                                                                    config,
+                                                                    null,
+                                                                    ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
+                                                                    ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+                                                                    ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+                                                                    ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
+                                                                    ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL,
+                                                                    ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+                                                                    ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL,
+                                                                    ClientSessionFactoryImpl.DEFAULT_RECONNECT_ATTEMPTS,
+                                                                    executorService,
+                                                                    scheduledExecutorService,
+                                                                    null);
+      connection = failoverManager.getConnection();
+   }
+
+   /**
+    * Stops the server, then shuts down the thread pools created by {@link #setUp()}.
+    *
+    * @throws Exception if the server can not be stopped
+    */
+   @Override
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         server.stop();
+      }
+      finally
+      {
+         // runs even when server.stop() throws, so a failing test does not leave threads behind
+         executorService.shutdownNow();
+         scheduledExecutorService.shutdownNow();
+
+         super.tearDown();
+      }
+   }
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Sends a CreateSessionMessage on channel 1 of the connection and checks the outcome.
+    *
+    * @param compatible true to send the version reported by VersionLoader,
+    *                   false to send {@link #INCOMPATIBLE_VERSION}
+    * @throws Exception if the request fails in a way the test does not expect
+    */
+   private void doTestClientVersionCompatibility(boolean compatible) throws Exception
+   {
+      Channel channel1 = connection.getChannel(1, -1);
+      long sessionChannelID = connection.generateChannelID();
+      int version = compatible ? VersionLoader.getVersion().getIncrementingVersion() : INCOMPATIBLE_VERSION;
+      Packet request = new CreateSessionMessage(randomString(),
+                                                sessionChannelID,
+                                                version,
+                                                null,
+                                                null,
+                                                ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                                false,
+                                                true,
+                                                true,
+                                                false,
+                                                ClientSessionFactoryImpl.DEFAULT_CONFIRMATION_WINDOW_SIZE);
+
+      if (compatible)
+      {
+         Packet packet = channel1.sendBlocking(request);
+         assertNotNull(packet);
+         assertTrue(packet instanceof CreateSessionResponseMessage);
+         // 1 connection on the server
+         assertEquals(1, server.getConnectionCount());
+      }
+      else
+      {
+         try
+         {
+            channel1.sendBlocking(request);
+            fail("the server must reject a session request carrying an incompatible version");
+         }
+         catch (HornetQException e)
+         {
+            assertEquals(HornetQException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS, e.getCode());
+         }
+         // no connection on the server
+         assertEquals(0, server.getConnectionCount());
+      }
+   }
+
+   // Inner classes -------------------------------------------------
+
+}