[hornetq-commits] JBoss hornetq SVN: r8623 - in trunk: src/main/org/hornetq/core/remoting/impl/wireformat and 6 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Dec 8 13:40:16 EST 2009
Author: timfox
Date: 2009-12-08 13:40:15 -0500 (Tue, 08 Dec 2009)
New Revision: 8623
Added:
trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSReconnectTest.java
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/CreateQueueMessage.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
trunk/src/main/org/hornetq/jms/client/HornetQMessageConsumer.java
trunk/src/main/org/hornetq/jms/client/HornetQSession.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-121 and https://jira.jboss.org/jira/browse/HORNETQ-238
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-12-08 18:40:15 UTC (rev 8623)
@@ -27,6 +27,7 @@
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionReceiveLargeMessage;
import org.hornetq.utils.Future;
@@ -112,6 +113,8 @@
private boolean stopped = false;
private final AtomicLong forceDeliveryCount = new AtomicLong(0);
+
+ private final SessionQueueQueryResponseMessage queueInfo;
// Constructors
// ---------------------------------------------------------------------------------
@@ -125,7 +128,8 @@
final int ackBatchSize,
final TokenBucketLimiter rateLimiter,
final Executor executor,
- final Channel channel)
+ final Channel channel,
+ final SessionQueueQueryResponseMessage queueInfo)
{
this.id = id;
@@ -146,6 +150,8 @@
this.clientWindowSize = clientWindowSize;
this.ackBatchSize = ackBatchSize;
+
+ this.queueInfo = queueInfo;
}
// ClientConsumer implementation
@@ -424,6 +430,11 @@
// ClientConsumerInternal implementation
// --------------------------------------------------------------
+ public SessionQueueQueryResponseMessage getQueueInfo()
+ {
+ return queueInfo;
+ }
+
public long getID()
{
return id;
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java 2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java 2009-12-08 18:40:15 UTC (rev 8623)
@@ -16,6 +16,7 @@
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionReceiveLargeMessage;
import org.hornetq.utils.SimpleString;
@@ -62,4 +63,6 @@
void stop() throws HornetQException;
void start();
+
+ SessionQueueQueryResponseMessage getQueueInfo();
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-08 18:40:15 UTC (rev 8623)
@@ -922,6 +922,23 @@
for (Map.Entry<Long, ClientConsumerInternal> entry : consumers.entrySet())
{
+ SessionQueueQueryResponseMessage queueInfo = entry.getValue().getQueueInfo();
+
+ // We try and recreate any non durable queues, since they probably won't be there unless
+ // they are defined in hornetq-configuration.xml
+ // This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover
+ if (!queueInfo.isDurable())
+ {
+ CreateQueueMessage createQueueRequest = new CreateQueueMessage(queueInfo.getAddress(),
+ queueInfo.getName(),
+ queueInfo.getFilterString(),
+ false,
+ queueInfo.isTemporary(),
+ false);
+
+ sendPacketWithoutLock(createQueueRequest);
+ }
+
SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(entry.getKey(),
entry.getValue()
.getQueueName(),
@@ -931,14 +948,8 @@
.isBrowseOnly(),
false);
- createConsumerRequest.setChannelID(channel.getID());
-
- Connection conn = channel.getConnection().getTransportConnection();
-
- HornetQBuffer buffer = createConsumerRequest.encode(channel.getConnection());
-
- conn.write(buffer, false);
-
+ sendPacketWithoutLock(createConsumerRequest);
+
int clientWindowSize = entry.getValue().getClientWindowSize();
if (clientWindowSize != 0)
@@ -946,11 +957,7 @@
SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(entry.getKey(),
clientWindowSize);
- packet.setChannelID(channel.getID());
-
- buffer = packet.encode(channel.getConnection());
-
- conn.write(buffer, false);
+ sendPacketWithoutLock(packet);
}
}
@@ -1006,6 +1013,17 @@
}
}
+ private void sendPacketWithoutLock(final Packet packet)
+ {
+ packet.setChannelID(channel.getID());
+
+ Connection conn = channel.getConnection().getTransportConnection();
+
+ HornetQBuffer buffer = packet.encode(channel.getConnection());
+
+ conn.write(buffer, false);
+ }
+
public void workDone()
{
workDone = true;
@@ -1478,7 +1496,7 @@
browseOnly,
true);
- channel.sendBlocking(request);
+ SessionQueueQueryResponseMessage queueInfo = (SessionQueueQueryResponseMessage)channel.sendBlocking(request);
// The actual windows size that gets used is determined by the user since
// could be overridden on the queue settings
@@ -1497,7 +1515,8 @@
false)
: null,
executor,
- channel);
+ channel,
+ queueInfo);
addConsumer(consumer);
@@ -1546,7 +1565,7 @@
throw new HornetQException(HornetQException.INTERNAL_ERROR, "Queue can not be both durable and temporay");
}
- CreateQueueMessage request = new CreateQueueMessage(address, queueName, filterString, durable, temp);
+ CreateQueueMessage request = new CreateQueueMessage(address, queueName, filterString, durable, temp, true);
channel.sendBlocking(request);
}
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-12-08 18:40:15 UTC (rev 8623)
@@ -546,7 +546,7 @@
private void failoverOrReconnect(final Object connectionID, final HornetQException me)
{
Set<ClientSessionInternal> sessionsToClose = null;
-
+
synchronized (failoverLock)
{
if (connection == null || connection.getID() != connectionID)
@@ -555,6 +555,7 @@
// over then a async connection exception or disconnect
// came in for one of the already exitLoop connections, so we return true - we don't want to call the
// listeners again
+
return;
}
@@ -602,7 +603,7 @@
{
attemptReconnect = reconnectAttempts != 0;
}
-
+
if (attemptFailover || attemptReconnect)
{
lockChannel1();
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/CreateQueueMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/CreateQueueMessage.java 2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/CreateQueueMessage.java 2009-12-08 18:40:15 UTC (rev 8623)
@@ -39,6 +39,8 @@
private boolean durable;
private boolean temporary;
+
+ private boolean requiresResponse;
// Static --------------------------------------------------------
@@ -48,7 +50,8 @@
final SimpleString queueName,
final SimpleString filterString,
final boolean durable,
- final boolean temporary)
+ final boolean temporary,
+ final boolean requiresResponse)
{
super(PacketImpl.CREATE_QUEUE);
@@ -57,6 +60,7 @@
this.filterString = filterString;
this.durable = durable;
this.temporary = temporary;
+ this.requiresResponse = requiresResponse;
}
public CreateQueueMessage()
@@ -103,6 +107,11 @@
{
return temporary;
}
+
+ public boolean isRequiresResponse()
+ {
+ return requiresResponse;
+ }
@Override
public void encodeRest(final HornetQBuffer buffer)
@@ -112,6 +121,7 @@
buffer.writeNullableSimpleString(filterString);
buffer.writeBoolean(durable);
buffer.writeBoolean(temporary);
+ buffer.writeBoolean(requiresResponse);
}
@Override
@@ -122,6 +132,7 @@
filterString = buffer.readNullableSimpleString();
durable = buffer.readBoolean();
temporary = buffer.readBoolean();
+ requiresResponse = buffer.readBoolean();
}
@Override
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java 2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java 2009-12-08 18:40:15 UTC (rev 8623)
@@ -25,6 +25,8 @@
*/
public class SessionQueueQueryResponseMessage extends PacketImpl
{
+ private SimpleString name;
+
private boolean exists;
private boolean durable;
@@ -36,31 +38,39 @@
private SimpleString filterString;
private SimpleString address;
+
+ private boolean temporary;
- public SessionQueueQueryResponseMessage(final boolean durable,
+ public SessionQueueQueryResponseMessage(final SimpleString name,
+ final SimpleString address,
+ final boolean durable,
+ final boolean temporary,
+ final SimpleString filterString,
final int consumerCount,
- final int messageCount,
- final SimpleString filterString,
- final SimpleString address)
+ final int messageCount)
{
- this(durable, consumerCount, messageCount, filterString, address, true);
+ this(name, address, durable, temporary, filterString, consumerCount, messageCount, true);
}
public SessionQueueQueryResponseMessage()
{
- this(false, 0, 0, null, null, false);
+ this(null, null, false, false, null, 0, 0, false);
}
- private SessionQueueQueryResponseMessage(final boolean durable,
+ private SessionQueueQueryResponseMessage(final SimpleString name,
+ final SimpleString address,
+ final boolean durable,
+ final boolean temporary,
+ final SimpleString filterString,
final int consumerCount,
final int messageCount,
- final SimpleString filterString,
- final SimpleString address,
final boolean exists)
{
super(PacketImpl.SESS_QUEUEQUERY_RESP);
this.durable = durable;
+
+ this.temporary = temporary;
this.consumerCount = consumerCount;
@@ -69,6 +79,8 @@
this.filterString = filterString;
this.address = address;
+
+ this.name = name;
this.exists = exists;
}
@@ -108,16 +120,28 @@
{
return address;
}
+
+ public SimpleString getName()
+ {
+ return name;
+ }
+
+ public boolean isTemporary()
+ {
+ return temporary;
+ }
@Override
public void encodeRest(final HornetQBuffer buffer)
{
buffer.writeBoolean(exists);
buffer.writeBoolean(durable);
+ buffer.writeBoolean(temporary);
buffer.writeInt(consumerCount);
buffer.writeInt(messageCount);
buffer.writeNullableSimpleString(filterString);
buffer.writeNullableSimpleString(address);
+ buffer.writeNullableSimpleString(name);
}
@Override
@@ -125,10 +149,12 @@
{
exists = buffer.readBoolean();
durable = buffer.readBoolean();
+ temporary = buffer.readBoolean();
consumerCount = buffer.readInt();
messageCount = buffer.readInt();
filterString = buffer.readNullableSimpleString();
address = buffer.readNullableSimpleString();
+ name = buffer.readNullableSimpleString();
}
@Override
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-12-08 18:40:15 UTC (rev 8623)
@@ -367,8 +367,7 @@
}
private void updateConnectors(final Map<String, DiscoveryEntry> connectors) throws Exception
- {
- System.out.println("ClusterConnectionImpl.updateConnectors");
+ {
Iterator<Map.Entry<String, MessageFlowRecord>> iter = records.entrySet().iterator();
while (iter.hasNext())
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-08 18:40:15 UTC (rev 8623)
@@ -376,7 +376,7 @@
public void handleCreateConsumer(final SessionCreateConsumerMessage packet)
{
SimpleString name = packet.getQueueName();
-
+
SimpleString filterString = packet.getFilterString();
boolean browseOnly = packet.isBrowseOnly();
@@ -437,7 +437,17 @@
managementService.sendNotification(notification);
}
- response = new NullResponseMessage();
+ //We send back queue information on the queue as a response- this allows the queue to
+ //be automaticall recreated on failover
+
+ if (packet.isRequiresResponse())
+ {
+ response = doExecuteQueueQuery(name);
+ }
+ else
+ {
+ response = null;
+ }
}
catch (Exception e)
{
@@ -451,7 +461,7 @@
response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
}
}
-
+
sendResponse(packet, response, false, false);
}
@@ -460,7 +470,7 @@
SimpleString address = packet.getAddress();
final SimpleString name = packet.getQueueName();
-
+
SimpleString filterString = packet.getFilterString();
boolean temporary = packet.isTemporary();
@@ -510,7 +520,14 @@
});
}
- response = new NullResponseMessage();
+ if (packet.isRequiresResponse())
+ {
+ response = new NullResponseMessage();
+ }
+ else
+ {
+ response = null;
+ }
}
catch (Exception e)
{
@@ -562,7 +579,7 @@
sendResponse(packet, response, false, false);
}
-
+
public void handleExecuteQueueQuery(final SessionQueueQueryMessage packet)
{
SimpleString name = packet.getQueueName();
@@ -571,35 +588,7 @@
try
{
- if (name == null)
- {
- throw new IllegalArgumentException("Queue name is null");
- }
-
- Binding binding = postOffice.getBinding(name);
-
- if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE)
- {
- Queue queue = (Queue)binding.getBindable();
-
- Filter filter = queue.getFilter();
-
- SimpleString filterString = filter == null ? null : filter.getFilterString();
- response = new SessionQueueQueryResponseMessage(queue.isDurable(),
- queue.getConsumerCount(),
- queue.getMessageCount(),
- filterString,
- binding.getAddress());
- }
- // make an exception for the management address (see HORNETQ-29)
- else if (name.equals(managementAddress))
- {
- response = new SessionQueueQueryResponseMessage(true, -1, -1, null, managementAddress);
- }
- else
- {
- response = new SessionQueueQueryResponseMessage();
- }
+ response = doExecuteQueueQuery(name);
}
catch (Exception e)
{
@@ -1419,6 +1408,7 @@
if (consumer == null)
{
ServerSessionImpl.log.error("There is no consumer with id " + packet.getConsumerID());
+
return;
}
@@ -1710,6 +1700,46 @@
// Private
// ----------------------------------------------------------------------------
+ private SessionQueueQueryResponseMessage doExecuteQueueQuery(final SimpleString name) throws Exception
+ {
+ if (name == null)
+ {
+ throw new IllegalArgumentException("Queue name is null");
+ }
+
+ SessionQueueQueryResponseMessage response;
+
+ Binding binding = postOffice.getBinding(name);
+
+ if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE)
+ {
+ Queue queue = (Queue)binding.getBindable();
+
+ Filter filter = queue.getFilter();
+
+ SimpleString filterString = filter == null ? null : filter.getFilterString();
+
+ response = new SessionQueueQueryResponseMessage(name,
+ binding.getAddress(),
+ queue.isDurable(),
+ queue.isTemporary(),
+ filterString,
+ queue.getConsumerCount(),
+ queue.getMessageCount());
+ }
+ // make an exception for the management address (see HORNETQ-29)
+ else if (name.equals(managementAddress))
+ {
+ response = new SessionQueueQueryResponseMessage(name, managementAddress, true, false, null, -1, -1);
+ }
+ else
+ {
+ response = new SessionQueueQueryResponseMessage();
+ }
+
+ return response;
+ }
+
private void sendResponse(final Packet confirmPacket,
final Packet response,
final boolean flush,
Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnection.java 2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnection.java 2009-12-08 18:40:15 UTC (rev 8623)
@@ -261,16 +261,26 @@
{
if (!tempQueues.isEmpty())
{
- if (initialSession == null)
- {
- initialSession = sessionFactory.createSession(username, password, false, true, true, false, 0);
- }
+// if (initialSession == null)
+// {
+// initialSession = sessionFactory.createSession(username, password, false, true, true, false, 0);
+// }
// Remove any temporary queues
for (SimpleString queueName : tempQueues)
{
- initialSession.deleteQueue(queueName);
+ if (!initialSession.isClosed())
+ {
+ try
+ {
+ initialSession.deleteQueue(queueName);
+ }
+ catch (HornetQException ignore)
+ {
+ //Exception on deleting queue shouldn't prevent close from completing
+ }
+ }
}
}
}
Modified: trunk/src/main/org/hornetq/jms/client/HornetQMessageConsumer.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMessageConsumer.java 2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMessageConsumer.java 2009-12-08 18:40:15 UTC (rev 8623)
@@ -145,7 +145,7 @@
if (autoDeleteQueueName != null)
{
- // If non durable subscriber need to delete subscription too
+ // If non durable subscriber need to delete subscription too
session.deleteQueue(autoDeleteQueueName);
}
Modified: trunk/src/main/org/hornetq/jms/client/HornetQSession.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQSession.java 2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/src/main/org/hornetq/jms/client/HornetQSession.java 2009-12-08 18:40:15 UTC (rev 8623)
@@ -939,19 +939,7 @@
throw JMSExceptionHelper.convertFromHornetQException(e);
}
}
-
- public void deleteQueue(final SimpleString queueName) throws JMSException
- {
- try
- {
- session.deleteQueue(queueName);
- }
- catch (HornetQException e)
- {
- throw JMSExceptionHelper.convertFromHornetQException(e);
- }
- }
-
+
public void start() throws JMSException
{
try
@@ -983,6 +971,21 @@
// Package protected ---------------------------------------------
+ void deleteQueue(final SimpleString queueName) throws JMSException
+ {
+ if (!session.isClosed())
+ {
+ try
+ {
+ session.deleteQueue(queueName);
+ }
+ catch (HornetQException ignore)
+ {
+ //Exception on deleting queue shouldn't prevent close from completing
+ }
+ }
+ }
+
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-12-08 18:40:15 UTC (rev 8623)
@@ -1350,7 +1350,7 @@
{
ClientSessionFactoryInternal sf = getSessionFactory();
- ClientSession session = sendAndConsume(sf);
+ ClientSession session = sendAndConsume(sf, true);
final CountDownLatch latch = new CountDownLatch(1);
@@ -1379,7 +1379,7 @@
sf = new ClientSessionFactoryImpl(getConnectorTransportConfiguration(false));
- session = sendAndConsume(sf);
+ session = sendAndConsume(sf, false);
session.close();
@@ -1757,8 +1757,28 @@
Assert.assertEquals(0, sf.numConnections());
}
- public void testSimpleSendAfterFailover() throws Exception
+ public void testSimpleSendAfterFailoverDurableTemporary() throws Exception
{
+ testSimpleSendAfterFailover(true, true);
+ }
+
+ public void testSimpleSendAfterFailoverNonDurableTemporary() throws Exception
+ {
+ testSimpleSendAfterFailover(false, true);
+ }
+
+ public void testSimpleSendAfterFailoverDurableNonTemporary() throws Exception
+ {
+ testSimpleSendAfterFailover(true, false);
+ }
+
+ public void testSimpleSendAfterFailoverNonDurableNonTemporary() throws Exception
+ {
+ testSimpleSendAfterFailover(false, false);
+ }
+
+ private void testSimpleSendAfterFailover(final boolean durable, final boolean temporary) throws Exception
+ {
ClientSessionFactoryInternal sf = getSessionFactory();
sf.setBlockOnNonDurableSend(true);
@@ -1767,8 +1787,15 @@
ClientSession session = sf.createSession(true, true, 0);
- session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
-
+ if (temporary)
+ {
+ session.createTemporaryQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null);
+ }
+ else
+ {
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, durable);
+ }
+
final CountDownLatch latch = new CountDownLatch(1);
class MyListener extends BaseListener
@@ -2254,11 +2281,14 @@
// Private -------------------------------------------------------
- private ClientSession sendAndConsume(final ClientSessionFactory sf) throws Exception
+ private ClientSession sendAndConsume(final ClientSessionFactory sf, final boolean createQueue) throws Exception
{
ClientSession session = sf.createSession(false, true, true);
- session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, false);
+ if (createQueue)
+ {
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, false);
+ }
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
Added: trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSReconnectTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSReconnectTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSReconnectTest.java 2009-12-08 18:40:15 UTC (rev 8623)
@@ -0,0 +1,371 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.jms.cluster;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import junit.framework.Assert;
+
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.impl.invm.InVMRegistry;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.jms.HornetQQueue;
+import org.hornetq.jms.HornetQTopic;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQSession;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.SimpleString;
+
+/**
+ *
+ * A JMSReconnectTest
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class JMSReconnectTest extends UnitTestCase
+{
+ private static final Logger log = Logger.getLogger(JMSReconnectTest.class);
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private HornetQServer liveService;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ //In this test we re-attach to the same node without restarting the server
+ public void testReattachSameNode() throws Exception
+ {
+ testReconnectOrReattachSameNode(true);
+ }
+
+ //In this test, we reconnect to the same node without restarting the server
+ public void testReconnectSameNode() throws Exception
+ {
+ testReconnectOrReattachSameNode(false);
+ }
+
+ private void testReconnectOrReattachSameNode(boolean reattach) throws Exception
+ {
+ HornetQConnectionFactory jbcf = new HornetQConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ jbcf.setBlockOnDurableSend(true);
+ jbcf.setBlockOnNonDurableSend(true);
+
+ jbcf.setReconnectAttempts(-1);
+
+ if (reattach)
+ {
+ jbcf.setConfirmationWindowSize(1024 * 1024);
+ }
+
+ // Note we set consumer window size to a value so we can verify that consumer credit re-sending
+ // works properly on failover
+ // The value is small enough that credits will have to be resent several time
+
+ final int numMessages = 10;
+
+ final int bodySize = 1000;
+
+ jbcf.setConsumerWindowSize(numMessages * bodySize / 10);
+
+ Connection conn = jbcf.createConnection();
+
+ MyExceptionListener listener = new MyExceptionListener();
+
+ conn.setExceptionListener(listener);
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ ClientSession coreSession = ((HornetQSession)sess).getCoreSession();
+
+ RemotingConnection coreConn = ((ClientSessionInternal)coreSession).getConnection();
+
+ SimpleString jmsQueueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + "myqueue");
+
+ coreSession.createQueue(jmsQueueName, jmsQueueName, null, true);
+
+ Queue queue = sess.createQueue("myqueue");
+
+ MessageProducer producer = sess.createProducer(queue);
+
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ MessageConsumer consumer = sess.createConsumer(queue);
+
+ byte[] body = RandomUtil.randomBytes(bodySize);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ BytesMessage bm = sess.createBytesMessage();
+
+ bm.writeBytes(body);
+
+ producer.send(bm);
+ }
+
+ conn.start();
+
+ log.info("sent messages and started connection");
+
+ Thread.sleep(2000);
+
+ HornetQException me = new HornetQException(HornetQException.NOT_CONNECTED);
+
+ coreConn.fail(me);
+
+ //It should reconnect to the same node
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ log.info("got message " + i);
+
+ BytesMessage bm = (BytesMessage)consumer.receive(1000);
+
+ Assert.assertNotNull(bm);
+
+ Assert.assertEquals(body.length, bm.getBodyLength());
+ }
+
+ TextMessage tm = (TextMessage)consumer.receiveNoWait();
+
+ Assert.assertNull(tm);
+
+ conn.close();
+
+ Assert.assertNotNull(listener.e);
+
+ Assert.assertTrue(me == listener.e.getCause());
+ }
+
+ public void testReconnectSameNodeServerRestartedWithNonDurableSub() throws Exception
+ {
+ testReconnectSameNodeServerRestartedWithNonDurableSubOrTempQueue(true);
+ }
+
+ public void testReconnectSameNodeServerRestartedWithTempQueue() throws Exception
+ {
+ testReconnectSameNodeServerRestartedWithNonDurableSubOrTempQueue(false);
+ }
+
+ //Test that non durable JMS sub gets recreated in auto reconnect
+ private void testReconnectSameNodeServerRestartedWithNonDurableSubOrTempQueue(final boolean durableSub) throws Exception
+ {
+ HornetQConnectionFactory jbcf = new HornetQConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ jbcf.setReconnectAttempts(-1);
+
+ Connection conn = jbcf.createConnection();
+
+ MyExceptionListener listener = new MyExceptionListener();
+
+ conn.setExceptionListener(listener);
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ ClientSession coreSession = ((HornetQSession)sess).getCoreSession();
+
+ Destination dest;
+
+ if (durableSub)
+ {
+ coreSession.createQueue(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX + "mytopic", "blahblah", null, false);
+
+ dest = new HornetQTopic("mytopic");
+ }
+ else
+ {
+ dest = sess.createTemporaryQueue();
+ }
+
+ MessageProducer producer = sess.createProducer(dest);
+
+ //Create a non durable subscriber
+ MessageConsumer consumer = sess.createConsumer(dest);
+
+ this.liveService.stop();
+
+ this.liveService.start();
+
+ //Allow client some time to reconnect
+ Thread.sleep(3000);
+
+ log.info("now sending some messages");
+
+ final int numMessages = 100;
+
+ byte[] body = RandomUtil.randomBytes(1000);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ BytesMessage bm = sess.createBytesMessage();
+
+ bm.writeBytes(body);
+
+ producer.send(bm);
+ }
+
+ conn.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ BytesMessage bm = (BytesMessage)consumer.receive(1000);
+
+ Assert.assertNotNull(bm);
+
+ Assert.assertEquals(body.length, bm.getBodyLength());
+ }
+
+ TextMessage tm = (TextMessage)consumer.receiveNoWait();
+
+ Assert.assertNull(tm);
+
+ conn.close();
+
+ Assert.assertNotNull(listener.e);
+ }
+
+ //If the server is shutdown after a non durable sub is created, then close on the connection should proceed normally
+ public void testNoReconnectCloseAfterFailToReconnectWithTopicConsumer() throws Exception
+ {
+ HornetQConnectionFactory jbcf = new HornetQConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ jbcf.setReconnectAttempts(0);
+
+ Connection conn = jbcf.createConnection();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ ClientSession coreSession = ((HornetQSession)sess).getCoreSession();
+
+ coreSession.createQueue(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX + "mytopic", "blahblah", null, false);
+
+ Topic topic = new HornetQTopic("mytopic");
+
+ //Create a non durable subscriber
+ MessageConsumer consumer = sess.createConsumer(topic);
+
+ Thread.sleep(2000);
+
+ this.liveService.stop();
+
+ this.liveService.start();
+
+ sess.close();
+
+ conn.close();
+ }
+
+ //If server is shutdown, and then connection is closed, after a temp queue has been created, the close should complete normally
+ public void testNoReconnectCloseAfterFailToReconnectWithTempQueue() throws Exception
+ {
+ HornetQConnectionFactory jbcf = new HornetQConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ jbcf.setReconnectAttempts(0);
+
+ Connection conn = jbcf.createConnection();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ sess.createTemporaryQueue();
+
+ Thread.sleep(2000);
+
+ this.liveService.stop();
+
+ this.liveService.start();
+
+ sess.close();
+
+ conn.close();
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ Configuration liveConf = new ConfigurationImpl();
+ liveConf.setSecurityEnabled(false);
+ liveConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory"));
+ liveConf.setBindingsDirectory(getBindingsDir());
+ liveConf.setJournalMinFiles(2);
+ liveConf.setJournalDirectory(getJournalDir());
+ liveConf.setPagingDirectory(getPageDir());
+ liveConf.setLargeMessagesDirectory(getLargeMessagesDir());
+
+ liveService = HornetQ.newHornetQServer(liveConf, true);
+ liveService.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ liveService.stop();
+
+ Assert.assertEquals(0, InVMRegistry.instance.size());
+
+ liveService = null;
+
+ super.tearDown();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ private static class MyExceptionListener implements ExceptionListener
+ {
+ volatile JMSException e;
+
+ public void onException(final JMSException e)
+ {
+ this.e = e;
+ }
+ }
+
+}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java 2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java 2009-12-08 18:40:15 UTC (rev 8623)
@@ -35,6 +35,7 @@
import org.hornetq.core.client.impl.ClientMessageInternal;
import org.hornetq.core.client.impl.LargeMessageBufferImpl;
import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionReceiveLargeMessage;
import org.hornetq.tests.util.RandomUtil;
@@ -724,6 +725,12 @@
}
+ public SessionQueueQueryResponseMessage getQueueInfo()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
}
More information about the hornetq-commits
mailing list