Author: timfox
Date: 2010-10-06 07:24:44 -0400 (Wed, 06 Oct 2010)
New Revision: 9758
Added:
trunk/tests/src/org/hornetq/tests/integration/stomp/StompConnectionCleanupTest.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java
Modified:
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-526
Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
---
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-10-06
08:18:15 UTC (rev 9757)
+++
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-10-06
11:24:44 UTC (rev 9758)
@@ -107,7 +107,7 @@
* @author <a href="mailto:andy.taylor@jboss.org>Andy Taylor</a>
* @author <a href="mailto:clebert.suconic@jboss.org>Clebert
Suconic</a>
*/
-public class ServerSessionPacketHandler implements ChannelHandler, CloseListener,
FailureListener
+public class ServerSessionPacketHandler implements ChannelHandler
{
private static final Logger log = Logger.getLogger(ServerSessionPacketHandler.class);
@@ -150,8 +150,6 @@
{
direct = false;
}
-
- addConnectionListeners();
}
public long getID()
@@ -159,22 +157,6 @@
return channel.getID();
}
- public void connectionFailed(final HornetQException exception)
- {
- log.warn("Client connection failed, clearing up resources for session " +
session.getName());
-
- try
- {
- session.close(true);
- }
- catch (Exception e)
- {
- log.error("Failed to close session", e);
- }
-
- log.warn("Cleared up resources for session " + session.getName());
- }
-
public void close()
{
channel.flushConfirmations();
@@ -189,22 +171,6 @@
}
}
- public void connectionClosed()
- {
- }
-
- private void addConnectionListeners()
- {
- remotingConnection.addFailureListener(this);
- remotingConnection.addCloseListener(this);
- }
-
- private void removeConnectionListeners()
- {
- remotingConnection.removeFailureListener(this);
- remotingConnection.removeCloseListener(this);
- }
-
public Channel getChannel()
{
return channel;
@@ -423,7 +389,7 @@
{
requiresResponse = true;
session.close(false);
- removeConnectionListeners();
+ // removeConnectionListeners();
response = new NullResponseMessage();
flush = true;
closeChannel = true;
@@ -601,10 +567,10 @@
// might be executed
// before we have transferred the connection, leaving it in a started state
session.setTransferring(true);
-
- remotingConnection.removeFailureListener(this);
- remotingConnection.removeCloseListener(this);
-
+
+ List<CloseListener> closeListeners =
remotingConnection.removeCloseListeners();
+ List<FailureListener> failureListeners =
remotingConnection.removeFailureListeners();
+
// Note. We do not destroy the replicating connection here. In the case the live
server has really crashed
// then the connection will get cleaned up anyway when the server ping timeout
kicks in.
// In the case the live server is really still up, i.e. a split brain situation (or
in tests), then closing
@@ -618,8 +584,8 @@
remotingConnection = newConnection;
- remotingConnection.addFailureListener(this);
- remotingConnection.addCloseListener(this);
+ remotingConnection.setCloseListeners(closeListeners);
+ remotingConnection.setFailureListeners(failureListeners);
int serverLastReceivedCommandID = channel.getLastConfirmedCommandID();
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java 2010-10-06
08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java 2010-10-06
11:24:44 UTC (rev 9758)
@@ -27,7 +27,7 @@
/**
* A CoreSessionCallback
*
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ * @author Tim Fox
*
*
*/
@@ -40,7 +40,7 @@
private ProtocolManager protocolManager;
private String name;
-
+
public CoreSessionCallback(String name, ProtocolManager protocolManager, Channel
channel)
{
this.name = name;
@@ -54,8 +54,8 @@
channel.send(packet);
- int size = packet.getPacketSize();
-
+ int size = packet.getPacketSize();
+
return size;
}
@@ -67,15 +67,15 @@
return packet.getPacketSize();
}
-
+
public int sendMessage(ServerMessage message, long consumerID, int deliveryCount)
{
Packet packet = new SessionReceiveMessage(consumerID, message, deliveryCount);
channel.sendBatched(packet);
-
- int size = packet.getPacketSize();
+ int size = packet.getPacketSize();
+
return size;
}
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-10-06
08:18:15 UTC (rev 9757)
+++
trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-10-06
11:24:44 UTC (rev 9758)
@@ -225,6 +225,31 @@
return closeListeners.remove(listener);
}
+ public List<CloseListener> removeCloseListeners()
+ {
+ List<CloseListener> ret = new
ArrayList<CloseListener>(closeListeners);
+
+ closeListeners.clear();
+
+ return ret;
+ }
+
+ public List<FailureListener> removeFailureListeners()
+ {
+ List<FailureListener> ret = new
ArrayList<FailureListener>(failureListeners);
+
+ failureListeners.clear();
+
+ return ret;
+ }
+
+ public void setCloseListeners(List<CloseListener> listeners)
+ {
+ closeListeners.clear();
+
+ closeListeners.addAll(listeners);
+ }
+
public HornetQBuffer createBuffer(final int size)
{
return transportConnection.createBuffer(size);
@@ -471,6 +496,7 @@
channels.clear();
}
}
+
private void callFailureListeners(final HornetQException me)
{
final List<FailureListener> listenersClone = new
ArrayList<FailureListener>(failureListeners);
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-10-06
08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-10-06
11:24:44 UTC (rev 9758)
@@ -13,8 +13,10 @@
package org.hornetq.core.protocol.stomp;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
@@ -32,8 +34,9 @@
*
*
*/
-class StompConnection implements RemotingConnection
+public class StompConnection implements RemotingConnection
{
+
private static final Logger log = Logger.getLogger(StompConnection.class);
private final StompProtocolManager manager;
@@ -49,9 +52,17 @@
private boolean valid;
private boolean destroyed = false;
-
+
private StompDecoder decoder = new StompDecoder();
+
+ private final List<FailureListener> failureListeners = new
CopyOnWriteArrayList<FailureListener>();
+
+ private final List<CloseListener> closeListeners = new
CopyOnWriteArrayList<CloseListener>();
+
+ private final Object failLock = new Object();
+ private volatile boolean dataReceived;
+
public StompDecoder getDecoder()
{
return decoder;
@@ -64,17 +75,90 @@
this.manager = manager;
}
- public void addCloseListener(CloseListener listener)
+ public void addFailureListener(final FailureListener listener)
{
+ if (listener == null)
+ {
+ throw new IllegalStateException("FailureListener cannot be null");
+ }
+
+ failureListeners.add(listener);
}
- public void addFailureListener(FailureListener listener)
+ public boolean removeFailureListener(final FailureListener listener)
{
+ if (listener == null)
+ {
+ throw new IllegalStateException("FailureListener cannot be null");
+ }
+
+ return failureListeners.remove(listener);
}
+ public void addCloseListener(final CloseListener listener)
+ {
+ if (listener == null)
+ {
+ throw new IllegalStateException("CloseListener cannot be null");
+ }
+
+ closeListeners.add(listener);
+ }
+
+ public boolean removeCloseListener(final CloseListener listener)
+ {
+ if (listener == null)
+ {
+ throw new IllegalStateException("CloseListener cannot be null");
+ }
+
+ return closeListeners.remove(listener);
+ }
+
+ public List<CloseListener> removeCloseListeners()
+ {
+ List<CloseListener> ret = new
ArrayList<CloseListener>(closeListeners);
+
+ closeListeners.clear();
+
+ return ret;
+ }
+
+ public List<FailureListener> removeFailureListeners()
+ {
+ List<FailureListener> ret = new
ArrayList<FailureListener>(failureListeners);
+
+ failureListeners.clear();
+
+ return ret;
+ }
+
+ public void setCloseListeners(List<CloseListener> listeners)
+ {
+ closeListeners.clear();
+
+ closeListeners.addAll(listeners);
+ }
+
+ public void setFailureListeners(final List<FailureListener> listeners)
+ {
+ failureListeners.clear();
+
+ failureListeners.addAll(listeners);
+ }
+
+ public void setDataReceived()
+ {
+ dataReceived = true;
+ }
+
public boolean checkDataReceived()
{
- return true;
+ boolean res = dataReceived;
+
+ dataReceived = false;
+
+ return res;
}
public HornetQBuffer createBuffer(int size)
@@ -84,13 +168,23 @@
public void destroy()
{
- if (destroyed)
+ synchronized (failLock)
{
- return;
+ if (destroyed)
+ {
+ return;
+ }
}
destroyed = true;
+ internalClose();
+
+ callClosingListeners();
+ }
+
+ private void internalClose()
+ {
transportConnection.close();
manager.cleanup(this);
@@ -100,8 +194,29 @@
{
}
- public void fail(HornetQException me)
+ public void fail(final HornetQException me)
{
+ synchronized (failLock)
+ {
+ if (destroyed)
+ {
+ return;
+ }
+
+ destroyed = true;
+ }
+
+ log.warn("Connection failure has been detected: " + me.getMessage() +
+ " [code=" +
+ me.getCode() +
+ "]");
+
+ // Then call the listeners
+ callFailureListeners(me);
+
+ callClosingListeners();
+
+ internalClose();
}
public void flush()
@@ -140,20 +255,6 @@
return destroyed;
}
- public boolean removeCloseListener(CloseListener listener)
- {
- return false;
- }
-
- public boolean removeFailureListener(FailureListener listener)
- {
- return false;
- }
-
- public void setFailureListeners(List<FailureListener> listeners)
- {
- }
-
public void bufferReceived(Object connectionID, HornetQBuffer buffer)
{
manager.handleBuffer(this, buffer);
@@ -188,7 +289,7 @@
{
return clientID;
}
-
+
public boolean isValid()
{
return valid;
@@ -198,4 +299,45 @@
{
this.valid = valid;
}
+
+ private void callFailureListeners(final HornetQException me)
+ {
+ final List<FailureListener> listenersClone = new
ArrayList<FailureListener>(failureListeners);
+
+ for (final FailureListener listener : listenersClone)
+ {
+ try
+ {
+ listener.connectionFailed(me);
+ }
+ catch (final Throwable t)
+ {
+ // Failure of one listener to execute shouldn't prevent others
+ // from
+ // executing
+ log.error("Failed to execute failure listener", t);
+ }
+ }
+ }
+
+ private void callClosingListeners()
+ {
+ final List<CloseListener> listenersClone = new
ArrayList<CloseListener>(closeListeners);
+
+ for (final CloseListener listener : listenersClone)
+ {
+ try
+ {
+ listener.connectionClosed();
+ }
+ catch (final Throwable t)
+ {
+ // Failure of one listener to execute shouldn't prevent others
+ // from
+ // executing
+ log.error("Failed to execute failure listener", t);
+ }
+ }
+ }
+
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-10-06
08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-10-06
11:24:44 UTC (rev 9758)
@@ -124,6 +124,7 @@
else
{
// Default to 1 minute - which is same as core protocol
+
return new ConnectionEntry(conn, System.currentTimeMillis(), 1 * 60 * 1000);
}
}
@@ -143,6 +144,8 @@
{
StompConnection conn = (StompConnection)connection;
+ conn.setDataReceived();
+
StompDecoder decoder = conn.getDecoder();
do
@@ -217,7 +220,6 @@
if (request.getHeaders().containsKey(Stomp.Headers.RECEIPT_REQUESTED))
{
- log.info("receipt requested");
if (response == null)
{
Map<String, Object> h = new HashMap<String, Object>();
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-10-06
08:18:15 UTC (rev 9757)
+++
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-10-06
11:24:44 UTC (rev 9758)
@@ -33,13 +33,13 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
import org.hornetq.core.protocol.core.impl.CoreProtocolManagerFactory;
import org.hornetq.core.protocol.stomp.StompProtocolManagerFactory;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.impl.ServerSessionImpl;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.spi.core.protocol.ConnectionEntry;
import org.hornetq.spi.core.protocol.ProtocolManager;
@@ -133,7 +133,8 @@
// difference between Stomp and Stomp over Web Sockets is handled in
NettyAcceptor.getPipeline()
this.protocolMap.put(ProtocolType.STOMP, new
StompProtocolManagerFactory().createProtocolManager(server,
interceptors));
- this.protocolMap.put(ProtocolType.STOMP_WS, new
StompProtocolManagerFactory().createProtocolManager(server, interceptors));
+ this.protocolMap.put(ProtocolType.STOMP_WS, new
StompProtocolManagerFactory().createProtocolManager(server,
+
interceptors));
}
// RemotingService implementation -------------------------------
@@ -144,15 +145,14 @@
{
return;
}
-
- ClassLoader tccl =
- AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
+
+ ClassLoader tccl = AccessController.doPrivileged(new
PrivilegedAction<ClassLoader>()
+ {
+ public ClassLoader run()
{
- public ClassLoader run()
- {
- return Thread.currentThread().getContextClassLoader();
- }
- });
+ return Thread.currentThread().getContextClassLoader();
+ }
+ });
// The remoting service maintains it's own thread pool for handling remoting
traffic
// If OIO each connection will have it's own thread
@@ -161,7 +161,8 @@
// to support many hundreds of connections, but the main thread pool must be kept
small for better performance
ThreadFactory tFactory = new
HornetQThreadFactory("HornetQ-remoting-threads" +
System.identityHashCode(this),
- false, tccl);
+ false,
+ tccl);
threadPool = Executors.newCachedThreadPool(tFactory);
@@ -322,6 +323,8 @@
}
else
{
+ log.info("failed to remove connection");
+
return null;
}
}
@@ -388,7 +391,7 @@
for (FailureListener listener : failureListeners)
{
- if (listener instanceof ServerSessionPacketHandler)
+ if (listener instanceof ServerSessionImpl)
{
empty = false;
@@ -528,9 +531,12 @@
RemotingConnection conn = removeConnection(id);
HornetQException me = new
HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Did not receive ping from
" + conn.getRemoteAddress() +
+ "Did not receive data from
" + conn.getRemoteAddress() +
". It is likely
the client has exited or crashed without " +
- "closing its
connection, or the network between the server and client has failed. The connection will
now be closed.");
+ "closing its
connection, or the network between the server and client has failed. " +
+ "You also might
have configured connection-ttl and client-failure-check-period incorrectly. " +
+ "Please check
user manual for more information." +
+ " The connection
will now be closed.");
conn.fail(me);
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-10-06 08:18:15
UTC (rev 9757)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-10-06 11:24:44
UTC (rev 9758)
@@ -73,7 +73,7 @@
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
* @author <a href="mailto:andy.taylor@jboss.org>Andy Taylor</a>
*/
-public class ServerSessionImpl implements ServerSession, FailureListener
+public class ServerSessionImpl implements ServerSession , FailureListener
{
// Constants
-----------------------------------------------------------------------------
Modified: trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java 2010-10-06
08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java 2010-10-06
11:24:44 UTC (rev 9758)
@@ -77,14 +77,22 @@
* @return true if removed
*/
boolean removeCloseListener(CloseListener listener);
-
+
+ List<CloseListener> removeCloseListeners();
+
+ void setCloseListeners(List<CloseListener> listeners);
+
+
/**
* return all the failure listeners
*
* @return the listeners
*/
List<FailureListener> getFailureListeners();
+
+ List<FailureListener> removeFailureListeners();
+
/**
* set the failure listeners.
* <p/>
Added:
trunk/tests/src/org/hornetq/tests/integration/stomp/StompConnectionCleanupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompConnectionCleanupTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/stomp/StompConnectionCleanupTest.java 2010-10-06
11:24:44 UTC (rev 9758)
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.stomp;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+
+import junit.framework.Assert;
+
+import org.hornetq.core.protocol.stomp.Stomp;
+import org.hornetq.jms.server.JMSServerManager;
+
+/**
+ * A StompConnectionCleanupTest
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class StompConnectionCleanupTest extends StompTestBase
+{
+ private static final long CONNECTION_TTL = 2000;
+
+ public void testConnectionCleanup() throws Exception
+ {
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ frame = receiveFrame(10000);
+
+ //We send and consumer a message to ensure a STOMP connection and server session is
created
+
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() +
getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ frame = "SEND\n" + "destination:" + getQueuePrefix() +
getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+
+ // Now we wait until the connection is cleared on the server, which will happen
some time after ttl, since no data
+ // is being sent
+
+ long start = System.currentTimeMillis();
+
+ while (true)
+ {
+ int connCount =
server.getHornetQServer().getRemotingService().getConnections().size();
+
+ int sessionCount = server.getHornetQServer().getSessions().size();
+
+ // All connections and sessions should be timed out including STOMP + JMS
connection
+
+ if (connCount == 0 && sessionCount == 0)
+ {
+ break;
+ }
+
+ Thread.sleep(10);
+
+ if (System.currentTimeMillis() - start > 10000)
+ {
+ fail("Timed out waiting for connection to be cleared up");
+ }
+ }
+ }
+
+ public void testConnectionNotCleanedUp() throws Exception
+ {
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ frame = receiveFrame(10000);
+
+ //We send and consumer a message to ensure a STOMP connection and server session is
created
+
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ long time = CONNECTION_TTL * 3;
+
+ long start = System.currentTimeMillis();
+
+ //Send msgs for an amount of time > connection_ttl make sure connection is not
closed
+ while (true)
+ {
+ //Send and receive a msg
+
+ frame = "SEND\n" + "destination:" + getQueuePrefix() +
getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+ sendFrame(frame);
+
+ Message msg = consumer.receive(1000);
+ assertNotNull(msg);
+
+ Thread.sleep(100);
+
+ if (System.currentTimeMillis() - start > time)
+ {
+ break;
+ }
+ }
+
+ }
+
+ @Override
+ protected JMSServerManager createServer() throws Exception
+ {
+ JMSServerManager s = super.createServer();
+
+ s.getHornetQServer().getConfiguration().setConnectionTTLOverride(CONNECTION_TTL);
+
+ return s;
+ }
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-10-06 08:18:15
UTC (rev 9757)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-10-06 11:24:44
UTC (rev 9758)
@@ -19,77 +19,29 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
import java.net.SocketTimeoutException;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
-import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
import javax.jms.TextMessage;
-import javax.jms.Topic;
import junit.framework.Assert;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.stomp.Stomp;
-import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
-import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
-import org.hornetq.core.remoting.impl.netty.TransportConstants;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.HornetQServers;
-import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.server.JMSServerManager;
-import org.hornetq.jms.server.config.JMSConfiguration;
-import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
-import org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl;
-import org.hornetq.jms.server.config.impl.TopicConfigurationImpl;
-import org.hornetq.jms.server.impl.JMSServerManagerImpl;
-import org.hornetq.spi.core.protocol.ProtocolType;
-import org.hornetq.tests.unit.util.InVMContext;
-import org.hornetq.tests.util.UnitTestCase;
-public class StompTest extends UnitTestCase
+public class StompTest extends StompTestBase
{
private static final transient Logger log = Logger.getLogger(StompTest.class);
- private int port = 61613;
-
- private Socket stompSocket;
-
- private ByteArrayOutputStream inputBuffer;
-
- private ConnectionFactory connectionFactory;
-
- private Connection connection;
-
- private Session session;
-
- private Queue queue;
-
- private Topic topic;
-
- private JMSServerManager server;
-
public void testSendManyMessages() throws Exception
{
MessageConsumer consumer = session.createConsumer(queue);
@@ -106,7 +58,7 @@
public void onMessage(Message arg0)
{
- //System.out.println("<<< " + (1000 - latch.getCount()));
+ // System.out.println("<<< " + (1000 -
latch.getCount()));
latch.countDown();
}
});
@@ -115,7 +67,7 @@
for (int i = 1; i <= count; i++)
{
// Thread.sleep(1);
- //System.out.println(">>> " + i);
+ // System.out.println(">>> " + i);
sendFrame(frame);
}
@@ -191,14 +143,14 @@
TextMessage message = (TextMessage)consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals("Hello World", message.getText());
-
+
// Make sure that the timestamp is valid - should
// be very close to the current time.
long tnow = System.currentTimeMillis();
long tmsg = message.getJMSTimestamp();
Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
}
-
+
/*
* Some STOMP clients erroneously put a new line \n *after* the terminating NUL char
at the end of the frame
* This means next frame read might have a \n a the beginning.
@@ -215,14 +167,20 @@
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("CONNECTED"));
- frame = "SEND\n" + "destination:" + getQueuePrefix() +
getQueueName() + "\n\n" + "Hello World" + Stomp.NULL +
"\n";
+ frame = "SEND\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n\n" +
+ "Hello World" +
+ Stomp.NULL +
+ "\n";
sendFrame(frame);
TextMessage message = (TextMessage)consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals("Hello World", message.getText());
-
+
// Make sure that the timestamp is valid - should
// be very close to the current time.
long tnow = System.currentTimeMillis();
@@ -258,7 +216,7 @@
TextMessage message = (TextMessage)consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals("Hello World", message.getText());
-
+
// Make sure that the timestamp is valid - should
// be very close to the current time.
long tnow = System.currentTimeMillis();
@@ -1279,210 +1237,4 @@
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
sendFrame(frame);
}
-
- // Implementation methods
- // -------------------------------------------------------------------------
- protected void setUp() throws Exception
- {
- super.setUp();
-
- server = createServer();
- server.start();
- connectionFactory = createConnectionFactory();
-
- stompSocket = createSocket();
- inputBuffer = new ByteArrayOutputStream();
-
- connection = connectionFactory.createConnection();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- queue = session.createQueue(getQueueName());
- topic = session.createTopic(getTopicName());
- connection.start();
- }
-
- /**
- * @return
- * @throws Exception
- */
- private JMSServerManager createServer() throws Exception
- {
- Configuration config = new ConfigurationImpl();
- config.setSecurityEnabled(false);
- config.setPersistenceEnabled(false);
-
- Map<String, Object> params = new HashMap<String, Object>();
- params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP.toString());
- params.put(TransportConstants.PORT_PROP_NAME,
TransportConstants.DEFAULT_STOMP_PORT);
- TransportConfiguration stompTransport = new
TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
- config.getAcceptorConfigurations().add(stompTransport);
- config.getAcceptorConfigurations().add(new
TransportConfiguration(InVMAcceptorFactory.class.getName()));
- HornetQServer hornetQServer = HornetQServers.newHornetQServer(config);
-
- JMSConfiguration jmsConfig = new JMSConfigurationImpl();
- jmsConfig.getQueueConfigurations()
- .add(new JMSQueueConfigurationImpl(getQueueName(), null, false,
getQueueName()));
- jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl(getTopicName(),
getTopicName()));
- server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
- server.setContext(new InVMContext());
- return server;
- }
-
- protected void tearDown() throws Exception
- {
- connection.close();
- if (stompSocket != null)
- {
- stompSocket.close();
- }
- server.stop();
-
- super.tearDown();
- }
-
- protected void reconnect() throws Exception
- {
- reconnect(0);
- }
-
- protected void reconnect(long sleep) throws Exception
- {
- stompSocket.close();
-
- if (sleep > 0)
- {
- Thread.sleep(sleep);
- }
-
- stompSocket = createSocket();
- inputBuffer = new ByteArrayOutputStream();
- }
-
- protected ConnectionFactory createConnectionFactory()
- {
- return new HornetQConnectionFactory(new
TransportConfiguration(InVMConnectorFactory.class.getName()));
- }
-
- protected Socket createSocket() throws IOException
- {
- return new Socket("127.0.0.1", port);
- }
-
- protected String getQueueName()
- {
- return "test";
- }
-
- protected String getQueuePrefix()
- {
- return "jms.queue.";
- }
-
- protected String getTopicName()
- {
- return "testtopic";
- }
-
- protected String getTopicPrefix()
- {
- return "jms.topic.";
- }
-
- public void sendFrame(String data) throws Exception
- {
- byte[] bytes = data.getBytes("UTF-8");
- OutputStream outputStream = stompSocket.getOutputStream();
- for (int i = 0; i < bytes.length; i++)
- {
- outputStream.write(bytes[i]);
- }
- outputStream.flush();
- }
-
- public void sendFrame(byte[] data) throws Exception
- {
- OutputStream outputStream = stompSocket.getOutputStream();
- for (int i = 0; i < data.length; i++)
- {
- outputStream.write(data[i]);
- }
- outputStream.flush();
- }
-
- public String receiveFrame(long timeOut) throws Exception
- {
- stompSocket.setSoTimeout((int)timeOut);
- InputStream is = stompSocket.getInputStream();
- int c = 0;
- for (;;)
- {
- c = is.read();
- if (c < 0)
- {
- throw new IOException("socket closed.");
- }
- else if (c == 0)
- {
- c = is.read();
- if (c != '\n')
- {
- byte[] ba = inputBuffer.toByteArray();
- System.out.println(new String(ba, "UTF-8"));
- }
- Assert.assertEquals("Expecting stomp frame to terminate with \0\n",
c, '\n');
- byte[] ba = inputBuffer.toByteArray();
- inputBuffer.reset();
- return new String(ba, "UTF-8");
- }
- else
- {
- inputBuffer.write(c);
- }
- }
- }
-
- public void sendMessage(String msg) throws Exception
- {
- sendMessage(msg, queue);
- }
-
- public void sendMessage(String msg, Destination destination) throws Exception
- {
- MessageProducer producer = session.createProducer(destination);
- TextMessage message = session.createTextMessage(msg);
- producer.send(message);
- }
-
- public void sendMessage(byte[] data, Destination destination) throws Exception
- {
- sendMessage(data, "foo", "xyz", destination);
- }
-
- public void sendMessage(String msg, String propertyName, String propertyValue) throws
Exception
- {
- sendMessage(msg.getBytes("UTF-8"), propertyName, propertyValue, queue);
- }
-
- public void sendMessage(byte[] data, String propertyName, String propertyValue,
Destination destination) throws Exception
- {
- MessageProducer producer = session.createProducer(destination);
- BytesMessage message = session.createBytesMessage();
- message.setStringProperty(propertyName, propertyValue);
- message.writeBytes(data);
- producer.send(message);
- }
-
- protected void waitForReceipt() throws Exception
- {
- String frame = receiveFrame(50000);
- assertNotNull(frame);
- assertTrue(frame.indexOf("RECEIPT") > -1);
- }
-
- protected void waitForFrameToTakeEffect() throws InterruptedException
- {
- // bit of a dirty hack :)
- // another option would be to force some kind of receipt to be returned
- // from the frame
- Thread.sleep(2000);
- }
}
Added: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java
(rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java 2010-10-06
11:24:44 UTC (rev 9758)
@@ -0,0 +1,290 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.hornetq.core.remoting.impl.netty.TransportConstants;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.config.JMSConfiguration;
+import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
+import org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl;
+import org.hornetq.jms.server.config.impl.TopicConfigurationImpl;
+import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.tests.unit.util.InVMContext;
+import org.hornetq.tests.util.UnitTestCase;
+
+public abstract class StompTestBase extends UnitTestCase
+{
+ private static final transient Logger log = Logger.getLogger(StompTestBase.class);
+
+ private int port = 61613;
+
+ private Socket stompSocket;
+
+ private ByteArrayOutputStream inputBuffer;
+
+ private ConnectionFactory connectionFactory;
+
+ private Connection connection;
+
+ protected Session session;
+
+ protected Queue queue;
+
+ protected Topic topic;
+
+ protected JMSServerManager server;
+
+
+
+ // Implementation methods
+ // -------------------------------------------------------------------------
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ server = createServer();
+ server.start();
+ connectionFactory = createConnectionFactory();
+
+ stompSocket = createSocket();
+ inputBuffer = new ByteArrayOutputStream();
+
+ connection = connectionFactory.createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ queue = session.createQueue(getQueueName());
+ topic = session.createTopic(getTopicName());
+ connection.start();
+ }
+
+ /**
+ * @return
+ * @throws Exception
+ */
+ protected JMSServerManager createServer() throws Exception
+ {
+ Configuration config = new ConfigurationImpl();
+ config.setSecurityEnabled(false);
+ config.setPersistenceEnabled(false);
+
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP.toString());
+ params.put(TransportConstants.PORT_PROP_NAME,
TransportConstants.DEFAULT_STOMP_PORT);
+ TransportConfiguration stompTransport = new
TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
+ config.getAcceptorConfigurations().add(stompTransport);
+ config.getAcceptorConfigurations().add(new
TransportConfiguration(InVMAcceptorFactory.class.getName()));
+ HornetQServer hornetQServer = HornetQServers.newHornetQServer(config);
+
+ JMSConfiguration jmsConfig = new JMSConfigurationImpl();
+ jmsConfig.getQueueConfigurations()
+ .add(new JMSQueueConfigurationImpl(getQueueName(), null, false,
getQueueName()));
+ jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl(getTopicName(),
getTopicName()));
+ server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
+ server.setContext(new InVMContext());
+ return server;
+ }
+
+ protected void tearDown() throws Exception
+ {
+ connection.close();
+ if (stompSocket != null)
+ {
+ stompSocket.close();
+ }
+ server.stop();
+
+ super.tearDown();
+ }
+
+ protected void reconnect() throws Exception
+ {
+ reconnect(0);
+ }
+
+ protected void reconnect(long sleep) throws Exception
+ {
+ stompSocket.close();
+
+ if (sleep > 0)
+ {
+ Thread.sleep(sleep);
+ }
+
+ stompSocket = createSocket();
+ inputBuffer = new ByteArrayOutputStream();
+ }
+
+ protected ConnectionFactory createConnectionFactory()
+ {
+ return new HornetQConnectionFactory(new
TransportConfiguration(InVMConnectorFactory.class.getName()));
+ }
+
+ protected Socket createSocket() throws IOException
+ {
+ return new Socket("127.0.0.1", port);
+ }
+
+ protected String getQueueName()
+ {
+ return "test";
+ }
+
+ protected String getQueuePrefix()
+ {
+ return "jms.queue.";
+ }
+
+ protected String getTopicName()
+ {
+ return "testtopic";
+ }
+
+ protected String getTopicPrefix()
+ {
+ return "jms.topic.";
+ }
+
+ public void sendFrame(String data) throws Exception
+ {
+ byte[] bytes = data.getBytes("UTF-8");
+ OutputStream outputStream = stompSocket.getOutputStream();
+ for (int i = 0; i < bytes.length; i++)
+ {
+ outputStream.write(bytes[i]);
+ }
+ outputStream.flush();
+ }
+
+ public void sendFrame(byte[] data) throws Exception
+ {
+ OutputStream outputStream = stompSocket.getOutputStream();
+ for (int i = 0; i < data.length; i++)
+ {
+ outputStream.write(data[i]);
+ }
+ outputStream.flush();
+ }
+
+ public String receiveFrame(long timeOut) throws Exception
+ {
+ stompSocket.setSoTimeout((int)timeOut);
+ InputStream is = stompSocket.getInputStream();
+ int c = 0;
+ for (;;)
+ {
+ c = is.read();
+ if (c < 0)
+ {
+ throw new IOException("socket closed.");
+ }
+ else if (c == 0)
+ {
+ c = is.read();
+ if (c != '\n')
+ {
+ byte[] ba = inputBuffer.toByteArray();
+ System.out.println(new String(ba, "UTF-8"));
+ }
+ Assert.assertEquals("Expecting stomp frame to terminate with \0\n",
c, '\n');
+ byte[] ba = inputBuffer.toByteArray();
+ inputBuffer.reset();
+ return new String(ba, "UTF-8");
+ }
+ else
+ {
+ inputBuffer.write(c);
+ }
+ }
+ }
+
+ public void sendMessage(String msg) throws Exception
+ {
+ sendMessage(msg, queue);
+ }
+
+ public void sendMessage(String msg, Destination destination) throws Exception
+ {
+ MessageProducer producer = session.createProducer(destination);
+ TextMessage message = session.createTextMessage(msg);
+ producer.send(message);
+ }
+
+ public void sendMessage(byte[] data, Destination destination) throws Exception
+ {
+ sendMessage(data, "foo", "xyz", destination);
+ }
+
+ public void sendMessage(String msg, String propertyName, String propertyValue) throws
Exception
+ {
+ sendMessage(msg.getBytes("UTF-8"), propertyName, propertyValue, queue);
+ }
+
+ public void sendMessage(byte[] data, String propertyName, String propertyValue,
Destination destination) throws Exception
+ {
+ MessageProducer producer = session.createProducer(destination);
+ BytesMessage message = session.createBytesMessage();
+ message.setStringProperty(propertyName, propertyValue);
+ message.writeBytes(data);
+ producer.send(message);
+ }
+
+ protected void waitForReceipt() throws Exception
+ {
+ String frame = receiveFrame(50000);
+ assertNotNull(frame);
+ assertTrue(frame.indexOf("RECEIPT") > -1);
+ }
+
+ protected void waitForFrameToTakeEffect() throws InterruptedException
+ {
+ // bit of a dirty hack :)
+ // another option would be to force some kind of receipt to be returned
+ // from the frame
+ Thread.sleep(2000);
+ }
+}