From do-not-reply at jboss.org Wed Oct 6 07:24:45 2010
Content-Type: multipart/mixed; boundary="===============6641207432056589441=="
MIME-Version: 1.0
From: do-not-reply at jboss.org
To: hornetq-commits at lists.jboss.org
Subject: [hornetq-commits] JBoss hornetq SVN: r9758 - in trunk:
src/main/org/hornetq/core/protocol/core/impl and 5 other directories.
Date: Wed, 06 Oct 2010 07:24:45 -0400
Message-ID: <201010061124.o96BOjtK028870@svn01.web.mwc.hst.phx2.redhat.com>
--===============6641207432056589441==
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: quoted-printable
Author: timfox
Date: 2010-10-06 07:24:44 -0400 (Wed, 06 Oct 2010)
New Revision: 9758
Added:
trunk/tests/src/org/hornetq/tests/integration/stomp/StompConnectionClean=
upTest.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java
Modified:
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler=
.java
trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.j=
ava
trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImp=
l.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl=
.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-526
Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacket=
Handler.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandle=
r.java 2010-10-06 08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandle=
r.java 2010-10-06 11:24:44 UTC (rev 9758)
@@ -107,7 +107,7 @@
* @author Andy Taylor
* @author Clebert Suconic=
a>
*/
-public class ServerSessionPacketHandler implements ChannelHandler, CloseLi=
stener, FailureListener
+public class ServerSessionPacketHandler implements ChannelHandler
{
private static final Logger log =3D Logger.getLogger(ServerSessionPacke=
tHandler.class);
=
@@ -150,8 +150,6 @@
{
direct =3D false;
}
- =
- addConnectionListeners();
}
=
public long getID()
@@ -159,22 +157,6 @@
return channel.getID();
}
=
- public void connectionFailed(final HornetQException exception)
- {
- log.warn("Client connection failed, clearing up resources for sessio=
n " + session.getName());
-
- try
- {
- session.close(true);
- }
- catch (Exception e)
- {
- log.error("Failed to close session", e);
- }
-
- log.warn("Cleared up resources for session " + session.getName());
- }
-
public void close()
{
channel.flushConfirmations();
@@ -189,22 +171,6 @@
}
}
=
- public void connectionClosed()
- {
- }
-
- private void addConnectionListeners()
- {
- remotingConnection.addFailureListener(this);
- remotingConnection.addCloseListener(this);
- }
-
- private void removeConnectionListeners()
- {
- remotingConnection.removeFailureListener(this);
- remotingConnection.removeCloseListener(this);
- }
-
public Channel getChannel()
{
return channel;
@@ -423,7 +389,7 @@
{
requiresResponse =3D true;
session.close(false);
- removeConnectionListeners();
+ // removeConnectionListeners();
response =3D new NullResponseMessage();
flush =3D true;
closeChannel =3D true;
@@ -601,10 +567,10 @@
// might be executed
// before we have transferred the connection, leaving it in a starte=
d state
session.setTransferring(true);
-
- remotingConnection.removeFailureListener(this);
- remotingConnection.removeCloseListener(this);
-
+ =
+ List closeListeners =3D remotingConnection.removeClos=
eListeners();
+ List failureListeners =3D remotingConnection.remove=
FailureListeners();
+ =
// Note. We do not destroy the replicating connection here. In the c=
ase the live server has really crashed
// then the connection will get cleaned up anyway when the server pi=
ng timeout kicks in.
// In the case the live server is really still up, i.e. a split brai=
n situation (or in tests), then closing
@@ -618,8 +584,8 @@
=
remotingConnection =3D newConnection;
=
- remotingConnection.addFailureListener(this);
- remotingConnection.addCloseListener(this);
+ remotingConnection.setCloseListeners(closeListeners);
+ remotingConnection.setFailureListeners(failureListeners);
=
int serverLastReceivedCommandID =3D channel.getLastConfirmedCommandI=
D();
=
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCal=
lback.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.=
java 2010-10-06 08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.=
java 2010-10-06 11:24:44 UTC (rev 9758)
@@ -27,7 +27,7 @@
/**
* A CoreSessionCallback
*
- * @author Jeff Mesnil
+ * @author Tim Fox
*
*
*/
@@ -40,7 +40,7 @@
private ProtocolManager protocolManager;
=
private String name;
- =
+
public CoreSessionCallback(String name, ProtocolManager protocolManager=
, Channel channel)
{
this.name =3D name;
@@ -54,8 +54,8 @@
=
channel.send(packet);
=
- int size =3D packet.getPacketSize();
- =
+ int size =3D packet.getPacketSize();
+
return size;
}
=
@@ -67,15 +67,15 @@
=
return packet.getPacketSize();
}
- =
+
public int sendMessage(ServerMessage message, long consumerID, int deli=
veryCount)
{
Packet packet =3D new SessionReceiveMessage(consumerID, message, del=
iveryCount);
=
channel.sendBatched(packet);
- =
- int size =3D packet.getPacketSize();
=
+ int size =3D packet.getPacketSize();
+
return size;
}
=
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnec=
tionImpl.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionIm=
pl.java 2010-10-06 08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionIm=
pl.java 2010-10-06 11:24:44 UTC (rev 9758)
@@ -225,6 +225,31 @@
return closeListeners.remove(listener);
}
=
+ public List removeCloseListeners()
+ {
+ List ret =3D new ArrayList(closeListen=
ers);
+ =
+ closeListeners.clear();
+ =
+ return ret;
+ }
+
+ public List removeFailureListeners()
+ {
+ List ret =3D new ArrayList(failure=
Listeners);
+ =
+ failureListeners.clear();
+ =
+ return ret; =
+ }
+
+ public void setCloseListeners(List listeners)
+ {
+ closeListeners.clear();
+ =
+ closeListeners.addAll(listeners); =
+ }
+
public HornetQBuffer createBuffer(final int size)
{
return transportConnection.createBuffer(size);
@@ -471,6 +496,7 @@
channels.clear();
}
} =
+ =
private void callFailureListeners(final HornetQException me)
{
final List listenersClone =3D new ArrayList(failureListeners);
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.ja=
va
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 201=
0-10-06 08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 201=
0-10-06 11:24:44 UTC (rev 9758)
@@ -13,8 +13,10 @@
=
package org.hornetq.core.protocol.stomp;
=
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
=
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
@@ -32,8 +34,9 @@
*
*
*/
-class StompConnection implements RemotingConnection
+public class StompConnection implements RemotingConnection
{
+
private static final Logger log =3D Logger.getLogger(StompConnection.cl=
ass);
=
private final StompProtocolManager manager;
@@ -49,9 +52,17 @@
private boolean valid;
=
private boolean destroyed =3D false;
- =
+
private StompDecoder decoder =3D new StompDecoder();
+
+ private final List failureListeners =3D new CopyOnWrit=
eArrayList();
+
+ private final List closeListeners =3D new CopyOnWriteArr=
ayList();
+
+ private final Object failLock =3D new Object();
=
+ private volatile boolean dataReceived;
+
public StompDecoder getDecoder()
{
return decoder;
@@ -64,17 +75,90 @@
this.manager =3D manager;
}
=
- public void addCloseListener(CloseListener listener)
+ public void addFailureListener(final FailureListener listener)
{
+ if (listener =3D=3D null)
+ {
+ throw new IllegalStateException("FailureListener cannot be null");
+ }
+
+ failureListeners.add(listener);
}
=
- public void addFailureListener(FailureListener listener)
+ public boolean removeFailureListener(final FailureListener listener)
{
+ if (listener =3D=3D null)
+ {
+ throw new IllegalStateException("FailureListener cannot be null");
+ }
+
+ return failureListeners.remove(listener);
}
=
+ public void addCloseListener(final CloseListener listener)
+ {
+ if (listener =3D=3D null)
+ {
+ throw new IllegalStateException("CloseListener cannot be null");
+ }
+
+ closeListeners.add(listener);
+ }
+
+ public boolean removeCloseListener(final CloseListener listener)
+ {
+ if (listener =3D=3D null)
+ {
+ throw new IllegalStateException("CloseListener cannot be null");
+ }
+
+ return closeListeners.remove(listener);
+ }
+
+ public List removeCloseListeners()
+ {
+ List ret =3D new ArrayList(closeListen=
ers);
+
+ closeListeners.clear();
+
+ return ret;
+ }
+
+ public List removeFailureListeners()
+ {
+ List ret =3D new ArrayList(failure=
Listeners);
+
+ failureListeners.clear();
+
+ return ret;
+ }
+
+ public void setCloseListeners(List listeners)
+ {
+ closeListeners.clear();
+
+ closeListeners.addAll(listeners);
+ }
+
+ public void setFailureListeners(final List listeners)
+ {
+ failureListeners.clear();
+
+ failureListeners.addAll(listeners);
+ }
+ =
+ public void setDataReceived()
+ {
+ dataReceived =3D true;
+ }
+
public boolean checkDataReceived()
{
- return true;
+ boolean res =3D dataReceived;
+
+ dataReceived =3D false;
+
+ return res;
}
=
public HornetQBuffer createBuffer(int size)
@@ -84,13 +168,23 @@
=
public void destroy()
{
- if (destroyed)
+ synchronized (failLock)
{
- return;
+ if (destroyed)
+ {
+ return;
+ }
}
=
destroyed =3D true;
=
+ internalClose();
+
+ callClosingListeners();
+ }
+
+ private void internalClose()
+ {
transportConnection.close();
=
manager.cleanup(this);
@@ -100,8 +194,29 @@
{
}
=
- public void fail(HornetQException me)
+ public void fail(final HornetQException me)
{
+ synchronized (failLock)
+ {
+ if (destroyed)
+ {
+ return;
+ }
+
+ destroyed =3D true;
+ }
+
+ log.warn("Connection failure has been detected: " + me.getMessage() +
+ " [code=3D" +
+ me.getCode() +
+ "]");
+
+ // Then call the listeners
+ callFailureListeners(me);
+
+ callClosingListeners();
+ =
+ internalClose();
}
=
public void flush()
@@ -140,20 +255,6 @@
return destroyed;
}
=
- public boolean removeCloseListener(CloseListener listener)
- {
- return false;
- }
-
- public boolean removeFailureListener(FailureListener listener)
- {
- return false;
- }
-
- public void setFailureListeners(List listeners)
- {
- }
-
public void bufferReceived(Object connectionID, HornetQBuffer buffer)
{
manager.handleBuffer(this, buffer);
@@ -188,7 +289,7 @@
{
return clientID;
}
- =
+
public boolean isValid()
{
return valid;
@@ -198,4 +299,45 @@
{
this.valid =3D valid;
}
+
+ private void callFailureListeners(final HornetQException me)
+ {
+ final List listenersClone =3D new ArrayList(failureListeners);
+
+ for (final FailureListener listener : listenersClone)
+ {
+ try
+ {
+ listener.connectionFailed(me);
+ }
+ catch (final Throwable t)
+ {
+ // Failure of one listener to execute shouldn't prevent others
+ // from
+ // executing
+ log.error("Failed to execute failure listener", t);
+ }
+ }
+ }
+
+ private void callClosingListeners()
+ {
+ final List listenersClone =3D new ArrayList(closeListeners);
+
+ for (final CloseListener listener : listenersClone)
+ {
+ try
+ {
+ listener.connectionClosed();
+ }
+ catch (final Throwable t)
+ {
+ // Failure of one listener to execute shouldn't prevent others
+ // from
+ // executing
+ log.error("Failed to execute failure listener", t);
+ }
+ }
+ }
+
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManag=
er.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.jav=
a 2010-10-06 08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.jav=
a 2010-10-06 11:24:44 UTC (rev 9758)
@@ -124,6 +124,7 @@
else
{
// Default to 1 minute - which is same as core protocol
+
return new ConnectionEntry(conn, System.currentTimeMillis(), 1 * =
60 * 1000);
}
}
@@ -143,6 +144,8 @@
{
StompConnection conn =3D (StompConnection)connection;
=
+ conn.setDataReceived();
+ =
StompDecoder decoder =3D conn.getDecoder();
=
do
@@ -217,7 +220,6 @@
=
if (request.getHeaders().containsKey(Stomp.Headers.RECEIPT_REQ=
UESTED))
{
- log.info("receipt requested");
if (response =3D=3D null)
{
Map h =3D new HashMap();
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServ=
iceImpl.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImp=
l.java 2010-10-06 08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImp=
l.java 2010-10-06 11:24:44 UTC (rev 9758)
@@ -33,13 +33,13 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
import org.hornetq.core.protocol.core.impl.CoreProtocolManagerFactory;
import org.hornetq.core.protocol.stomp.StompProtocolManagerFactory;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.impl.ServerSessionImpl;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.spi.core.protocol.ConnectionEntry;
import org.hornetq.spi.core.protocol.ProtocolManager;
@@ -133,7 +133,8 @@
// difference between Stomp and Stomp over Web Sockets is handled in=
NettyAcceptor.getPipeline()
this.protocolMap.put(ProtocolType.STOMP, new StompProtocolManagerFac=
tory().createProtocolManager(server,
=
interceptors));
- this.protocolMap.put(ProtocolType.STOMP_WS, new StompProtocolManager=
Factory().createProtocolManager(server, interceptors));
+ this.protocolMap.put(ProtocolType.STOMP_WS, new StompProtocolManager=
Factory().createProtocolManager(server,
+ =
interceptors));
}
=
// RemotingService implementation -------------------------------
@@ -144,15 +145,14 @@
{
return;
}
- =
- ClassLoader tccl =3D
- AccessController.doPrivileged(new PrivilegedAction()
+
+ ClassLoader tccl =3D AccessController.doPrivileged(new PrivilegedAct=
ion()
+ {
+ public ClassLoader run()
{
- public ClassLoader run()
- {
- return Thread.currentThread().getContextClassLoader();
- }
- });
+ return Thread.currentThread().getContextClassLoader();
+ }
+ });
=
// The remoting service maintains it's own thread pool for handling =
remoting traffic
// If OIO each connection will have it's own thread
@@ -161,7 +161,8 @@
// to support many hundreds of connections, but the main thread pool=
must be kept small for better performance
=
ThreadFactory tFactory =3D new HornetQThreadFactory("HornetQ-remotin=
g-threads" + System.identityHashCode(this),
- false, tccl);
+ false,
+ tccl);
=
threadPool =3D Executors.newCachedThreadPool(tFactory);
=
@@ -322,6 +323,8 @@
}
else
{
+ log.info("failed to remove connection");
+
return null;
}
}
@@ -388,7 +391,7 @@
=
for (FailureListener listener : failureListeners)
{
- if (listener instanceof ServerSessionPacketHandler)
+ if (listener instanceof ServerSessionImpl)
{
empty =3D false;
=
@@ -528,9 +531,12 @@
RemotingConnection conn =3D removeConnection(id);
=
HornetQException me =3D new HornetQException(HornetQExcepti=
on.CONNECTION_TIMEDOUT,
- "Did not receive=
ping from " + conn.getRemoteAddress() +
+ "Did not receive=
data from " + conn.getRemoteAddress() +
". It i=
s likely the client has exited or crashed without " +
- "closin=
g its connection, or the network between the server and client has failed. =
The connection will now be closed.");
+ "closin=
g its connection, or the network between the server and client has failed. =
" +
+ "You al=
so might have configured connection-ttl and client-failure-check-period inc=
orrectly. " +
+ "Please=
check user manual for more information." +
+ " The c=
onnection will now be closed.");
conn.fail(me);
}
=
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010=
-10-06 08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010=
-10-06 11:24:44 UTC (rev 9758)
@@ -73,7 +73,7 @@
* @author Jeff Mesnil
* @author Andy Taylor
*/
-public class ServerSessionImpl implements ServerSession, FailureListener
+public class ServerSessionImpl implements ServerSession , FailureListener
{
// Constants ----------------------------------------------------------=
-------------------
=
Modified: trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.j=
ava
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java 20=
10-10-06 08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java 20=
10-10-06 11:24:44 UTC (rev 9758)
@@ -77,14 +77,22 @@
* @return true if removed
*/
boolean removeCloseListener(CloseListener listener);
-
+ =
+ List removeCloseListeners();
+ =
+ void setCloseListeners(List listeners);
+ =
+ =
/**
* return all the failure listeners
*
* @return the listeners
*/
List getFailureListeners();
+ =
+ List removeFailureListeners();
=
+
/**
* set the failure listeners.
*
Added: trunk/tests/src/org/hornetq/tests/integration/stomp/StompConnectionC=
leanupTest.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompConnectionClea=
nupTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompConnectionClea=
nupTest.java 2010-10-06 11:24:44 UTC (rev 9758)
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.stomp;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+
+import junit.framework.Assert;
+
+import org.hornetq.core.protocol.stomp.Stomp;
+import org.hornetq.jms.server.JMSServerManager;
+
+/**
+ * A StompConnectionCleanupTest
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class StompConnectionCleanupTest extends StompTestBase
+{
+ private static final long CONNECTION_TTL =3D 2000;
+
+ public void testConnectionCleanup() throws Exception
+ {
+ String frame =3D "CONNECT\n" + "login: brianm\n" + "passcode: wombat=
s\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ frame =3D receiveFrame(10000);
+ =
+ //We send and consume a message to ensure a STOMP connection and se=
rver session is created
+
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =3D "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQue=
ueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ frame =3D "SEND\n" + "destination:" + getQueuePrefix() + getQueueNam=
e() + "\n\n" + "Hello World" + Stomp.NULL;
+ sendFrame(frame);
+
+ frame =3D receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ =
+ // Now we wait until the connection is cleared on the server, which =
will happen some time after ttl, since no data
+ // is being sent
+
+ long start =3D System.currentTimeMillis();
+
+ while (true)
+ {
+ int connCount =3D server.getHornetQServer().getRemotingService().=
getConnections().size();
+
+ int sessionCount =3D server.getHornetQServer().getSessions().size=
();
+ =
+ // All connections and sessions should be timed out including STO=
MP + JMS connection
+
+ if (connCount =3D=3D 0 && sessionCount =3D=3D 0)
+ {
+ break;
+ }
+ =
+ Thread.sleep(10);
+
+ if (System.currentTimeMillis() - start > 10000)
+ {
+ fail("Timed out waiting for connection to be cleared up");
+ }
+ } =
+ }
+ =
+ public void testConnectionNotCleanedUp() throws Exception
+ {
+ String frame =3D "CONNECT\n" + "login: brianm\n" + "passcode: wombat=
s\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ frame =3D receiveFrame(10000);
+ =
+ //We send and consume a message to ensure a STOMP connection and se=
rver session is created
+
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ MessageConsumer consumer =3D session.createConsumer(queue);
+ =
+ long time =3D CONNECTION_TTL * 3;
+ =
+ long start =3D System.currentTimeMillis();
+ =
+ //Send msgs for an amount of time > connection_ttl make sure connect=
ion is not closed
+ while (true)
+ {
+ //Send and receive a msg
+ =
+ frame =3D "SEND\n" + "destination:" + getQueuePrefix() + getQueue=
Name() + "\n\n" + "Hello World" + Stomp.NULL;
+ sendFrame(frame);
+
+ Message msg =3D consumer.receive(1000);
+ assertNotNull(msg);
+ =
+ Thread.sleep(100);
+ =
+ if (System.currentTimeMillis() - start > time)
+ {
+ break;
+ }
+ }
+ =
+ }
+ =
+ @Override
+ protected JMSServerManager createServer() throws Exception
+ {
+ JMSServerManager s =3D super.createServer();
+ =
+ s.getHornetQServer().getConfiguration().setConnectionTTLOverride(CON=
NECTION_TTL);
+ =
+ return s;
+ }
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010=
-10-06 08:18:15 UTC (rev 9757)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010=
-10-06 11:24:44 UTC (rev 9758)
@@ -19,77 +19,29 @@
=
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
import java.net.SocketTimeoutException;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
=
import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
-import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
import javax.jms.TextMessage;
-import javax.jms.Topic;
=
import junit.framework.Assert;
=
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.stomp.Stomp;
-import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
-import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
-import org.hornetq.core.remoting.impl.netty.TransportConstants;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.HornetQServers;
-import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.server.JMSServerManager;
-import org.hornetq.jms.server.config.JMSConfiguration;
-import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
-import org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl;
-import org.hornetq.jms.server.config.impl.TopicConfigurationImpl;
-import org.hornetq.jms.server.impl.JMSServerManagerImpl;
-import org.hornetq.spi.core.protocol.ProtocolType;
-import org.hornetq.tests.unit.util.InVMContext;
-import org.hornetq.tests.util.UnitTestCase;
=
-public class StompTest extends UnitTestCase
+public class StompTest extends StompTestBase
{
private static final transient Logger log =3D Logger.getLogger(StompTes=
t.class);
=
- private int port =3D 61613;
-
- private Socket stompSocket;
-
- private ByteArrayOutputStream inputBuffer;
-
- private ConnectionFactory connectionFactory;
-
- private Connection connection;
-
- private Session session;
-
- private Queue queue;
-
- private Topic topic;
-
- private JMSServerManager server;
- =
public void testSendManyMessages() throws Exception
{
MessageConsumer consumer =3D session.createConsumer(queue);
@@ -106,7 +58,7 @@
=
public void onMessage(Message arg0)
{
- //System.out.println("<<< " + (1000 - latch.getCount()));
+ // System.out.println("<<< " + (1000 - latch.getCount()));
latch.countDown();
}
});
@@ -115,7 +67,7 @@
for (int i =3D 1; i <=3D count; i++)
{
// Thread.sleep(1);
- //System.out.println(">>> " + i);
+ // System.out.println(">>> " + i);
sendFrame(frame);
}
=
@@ -191,14 +143,14 @@
TextMessage message =3D (TextMessage)consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals("Hello World", message.getText());
- =
+
// Make sure that the timestamp is valid - should
// be very close to the current time.
long tnow =3D System.currentTimeMillis();
long tmsg =3D message.getJMSTimestamp();
Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
}
- =
+
/*
* Some STOMP clients erroneously put a new line \n *after* the termina=
ting NUL char at the end of the frame
* This means next frame read might have a \n a the beginning.
@@ -215,14 +167,20 @@
frame =3D receiveFrame(10000);
Assert.assertTrue(frame.startsWith("CONNECTED"));
=
- frame =3D "SEND\n" + "destination:" + getQueuePrefix() + getQueueNam=
e() + "\n\n" + "Hello World" + Stomp.NULL + "\n";
+ frame =3D "SEND\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n\n" +
+ "Hello World" +
+ Stomp.NULL +
+ "\n";
=
sendFrame(frame);
=
TextMessage message =3D (TextMessage)consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals("Hello World", message.getText());
- =
+
// Make sure that the timestamp is valid - should
// be very close to the current time.
long tnow =3D System.currentTimeMillis();
@@ -258,7 +216,7 @@
TextMessage message =3D (TextMessage)consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals("Hello World", message.getText());
- =
+
// Make sure that the timestamp is valid - should
// be very close to the current time.
long tnow =3D System.currentTimeMillis();
@@ -1279,210 +1237,4 @@
frame =3D "DISCONNECT\n" + "\n\n" + Stomp.NULL;
sendFrame(frame);
}
-
- // Implementation methods
- // --------------------------------------------------------------------=
-----
- protected void setUp() throws Exception
- {
- super.setUp();
-
- server =3D createServer();
- server.start();
- connectionFactory =3D createConnectionFactory();
-
- stompSocket =3D createSocket();
- inputBuffer =3D new ByteArrayOutputStream();
-
- connection =3D connectionFactory.createConnection();
- session =3D connection.createSession(false, Session.AUTO_ACKNOWLEDGE=
);
- queue =3D session.createQueue(getQueueName());
- topic =3D session.createTopic(getTopicName());
- connection.start();
- }
-
- /**
- * @return
- * @throws Exception =
- */
- private JMSServerManager createServer() throws Exception
- {
- Configuration config =3D new ConfigurationImpl();
- config.setSecurityEnabled(false);
- config.setPersistenceEnabled(false);
-
- Map params =3D new HashMap();
- params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP=
.toString());
- params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEF=
AULT_STOMP_PORT);
- TransportConfiguration stompTransport =3D new TransportConfiguration=
(NettyAcceptorFactory.class.getName(), params);
- config.getAcceptorConfigurations().add(stompTransport);
- config.getAcceptorConfigurations().add(new TransportConfiguration(In=
VMAcceptorFactory.class.getName()));
- HornetQServer hornetQServer =3D HornetQServers.newHornetQServer(conf=
ig);
-
- JMSConfiguration jmsConfig =3D new JMSConfigurationImpl();
- jmsConfig.getQueueConfigurations()
- .add(new JMSQueueConfigurationImpl(getQueueName(), null, fa=
lse, getQueueName()));
- jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl(ge=
tTopicName(), getTopicName()));
- server =3D new JMSServerManagerImpl(hornetQServer, jmsConfig);
- server.setContext(new InVMContext());
- return server;
- }
-
- protected void tearDown() throws Exception
- {
- connection.close();
- if (stompSocket !=3D null)
- {
- stompSocket.close();
- }
- server.stop();
-
- super.tearDown();
- }
-
- protected void reconnect() throws Exception
- {
- reconnect(0);
- }
-
- protected void reconnect(long sleep) throws Exception
- {
- stompSocket.close();
-
- if (sleep > 0)
- {
- Thread.sleep(sleep);
- }
-
- stompSocket =3D createSocket();
- inputBuffer =3D new ByteArrayOutputStream();
- }
-
- protected ConnectionFactory createConnectionFactory()
- {
- return new HornetQConnectionFactory(new TransportConfiguration(InVMC=
onnectorFactory.class.getName()));
- }
-
- protected Socket createSocket() throws IOException
- {
- return new Socket("127.0.0.1", port);
- }
-
- protected String getQueueName()
- {
- return "test";
- }
-
- protected String getQueuePrefix()
- {
- return "jms.queue.";
- }
-
- protected String getTopicName()
- {
- return "testtopic";
- }
-
- protected String getTopicPrefix()
- {
- return "jms.topic.";
- }
-
- public void sendFrame(String data) throws Exception
- {
- byte[] bytes =3D data.getBytes("UTF-8");
- OutputStream outputStream =3D stompSocket.getOutputStream();
- for (int i =3D 0; i < bytes.length; i++)
- {
- outputStream.write(bytes[i]);
- }
- outputStream.flush();
- }
-
- public void sendFrame(byte[] data) throws Exception
- {
- OutputStream outputStream =3D stompSocket.getOutputStream();
- for (int i =3D 0; i < data.length; i++)
- {
- outputStream.write(data[i]);
- }
- outputStream.flush();
- }
-
- public String receiveFrame(long timeOut) throws Exception
- {
- stompSocket.setSoTimeout((int)timeOut);
- InputStream is =3D stompSocket.getInputStream();
- int c =3D 0;
- for (;;)
- {
- c =3D is.read();
- if (c < 0)
- {
- throw new IOException("socket closed.");
- }
- else if (c =3D=3D 0)
- {
- c =3D is.read();
- if (c !=3D '\n')
- {
- byte[] ba =3D inputBuffer.toByteArray();
- System.out.println(new String(ba, "UTF-8"));
- }
- Assert.assertEquals("Expecting stomp frame to terminate with \=
0\n", c, '\n');
- byte[] ba =3D inputBuffer.toByteArray();
- inputBuffer.reset();
- return new String(ba, "UTF-8");
- }
- else
- {
- inputBuffer.write(c);
- }
- }
- }
-
- public void sendMessage(String msg) throws Exception
- {
- sendMessage(msg, queue);
- }
-
- public void sendMessage(String msg, Destination destination) throws Exc=
eption
- {
- MessageProducer producer =3D session.createProducer(destination);
- TextMessage message =3D session.createTextMessage(msg);
- producer.send(message);
- }
-
- public void sendMessage(byte[] data, Destination destination) throws Ex=
ception
- {
- sendMessage(data, "foo", "xyz", destination);
- }
-
- public void sendMessage(String msg, String propertyName, String propert=
yValue) throws Exception
- {
- sendMessage(msg.getBytes("UTF-8"), propertyName, propertyValue, queu=
e);
- }
-
- public void sendMessage(byte[] data, String propertyName, String proper=
tyValue, Destination destination) throws Exception
- {
- MessageProducer producer =3D session.createProducer(destination);
- BytesMessage message =3D session.createBytesMessage();
- message.setStringProperty(propertyName, propertyValue);
- message.writeBytes(data);
- producer.send(message);
- }
-
- protected void waitForReceipt() throws Exception
- {
- String frame =3D receiveFrame(50000);
- assertNotNull(frame);
- assertTrue(frame.indexOf("RECEIPT") > -1);
- }
-
- protected void waitForFrameToTakeEffect() throws InterruptedException
- {
- // bit of a dirty hack :)
- // another option would be to force some kind of receipt to be retur=
ned
- // from the frame
- Thread.sleep(2000);
- }
}
Added: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.ja=
va
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java =
(rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java =
2010-10-06 11:24:44 UTC (rev 9758)
@@ -0,0 +1,290 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.hornetq.core.remoting.impl.netty.TransportConstants;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.config.JMSConfiguration;
+import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
+import org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl;
+import org.hornetq.jms.server.config.impl.TopicConfigurationImpl;
+import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.tests.unit.util.InVMContext;
+import org.hornetq.tests.util.UnitTestCase;
+
+public abstract class StompTestBase extends UnitTestCase
+{
+ private static final transient Logger log =3D Logger.getLogger(StompTes=
tBase.class);
+
+ private int port =3D 61613;
+
+ private Socket stompSocket;
+
+ private ByteArrayOutputStream inputBuffer;
+
+ private ConnectionFactory connectionFactory;
+
+ private Connection connection;
+
+ protected Session session;
+
+ protected Queue queue;
+
+ protected Topic topic;
+
+ protected JMSServerManager server;
+ =
+ =
+
+ // Implementation methods
+ // --------------------------------------------------------------------=
-----
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ server =3D createServer();
+ server.start();
+ connectionFactory =3D createConnectionFactory();
+
+ stompSocket =3D createSocket();
+ inputBuffer =3D new ByteArrayOutputStream();
+
+ connection =3D connectionFactory.createConnection();
+ session =3D connection.createSession(false, Session.AUTO_ACKNOWLEDGE=
);
+ queue =3D session.createQueue(getQueueName());
+ topic =3D session.createTopic(getTopicName());
+ connection.start();
+ }
+
+ /**
+ * @return
+ * @throws Exception =
+ */
+ protected JMSServerManager createServer() throws Exception
+ {
+ Configuration config =3D new ConfigurationImpl();
+ config.setSecurityEnabled(false);
+ config.setPersistenceEnabled(false);
+
+ Map params =3D new HashMap();
+ params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP=
.toString());
+ params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEF=
AULT_STOMP_PORT);
+ TransportConfiguration stompTransport =3D new TransportConfiguration=
(NettyAcceptorFactory.class.getName(), params);
+ config.getAcceptorConfigurations().add(stompTransport);
+ config.getAcceptorConfigurations().add(new TransportConfiguration(In=
VMAcceptorFactory.class.getName()));
+ HornetQServer hornetQServer =3D HornetQServers.newHornetQServer(conf=
ig);
+
+ JMSConfiguration jmsConfig =3D new JMSConfigurationImpl();
+ jmsConfig.getQueueConfigurations()
+ .add(new JMSQueueConfigurationImpl(getQueueName(), null, fa=
lse, getQueueName()));
+ jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl(ge=
tTopicName(), getTopicName()));
+ server =3D new JMSServerManagerImpl(hornetQServer, jmsConfig);
+ server.setContext(new InVMContext());
+ return server;
+ }
+
+ protected void tearDown() throws Exception
+ {
+ connection.close();
+ if (stompSocket !=3D null)
+ {
+ stompSocket.close();
+ }
+ server.stop();
+
+ super.tearDown();
+ }
+
+ protected void reconnect() throws Exception
+ {
+ reconnect(0);
+ }
+
+ protected void reconnect(long sleep) throws Exception
+ {
+ stompSocket.close();
+
+ if (sleep > 0)
+ {
+ Thread.sleep(sleep);
+ }
+
+ stompSocket =3D createSocket();
+ inputBuffer =3D new ByteArrayOutputStream();
+ }
+
+ protected ConnectionFactory createConnectionFactory()
+ {
+ return new HornetQConnectionFactory(new TransportConfiguration(InVMC=
onnectorFactory.class.getName()));
+ }
+
+ protected Socket createSocket() throws IOException
+ {
+ return new Socket("127.0.0.1", port);
+ }
+
+ protected String getQueueName()
+ {
+ return "test";
+ }
+
+ protected String getQueuePrefix()
+ {
+ return "jms.queue.";
+ }
+
+ protected String getTopicName()
+ {
+ return "testtopic";
+ }
+
+ protected String getTopicPrefix()
+ {
+ return "jms.topic.";
+ }
+
+ public void sendFrame(String data) throws Exception
+ {
+ byte[] bytes =3D data.getBytes("UTF-8");
+ OutputStream outputStream =3D stompSocket.getOutputStream();
+ for (int i =3D 0; i < bytes.length; i++)
+ {
+ outputStream.write(bytes[i]);
+ }
+ outputStream.flush();
+ }
+
+ public void sendFrame(byte[] data) throws Exception
+ {
+ OutputStream outputStream =3D stompSocket.getOutputStream();
+ for (int i =3D 0; i < data.length; i++)
+ {
+ outputStream.write(data[i]);
+ }
+ outputStream.flush();
+ }
+
+ public String receiveFrame(long timeOut) throws Exception
+ {
+ stompSocket.setSoTimeout((int)timeOut);
+ InputStream is =3D stompSocket.getInputStream();
+ int c =3D 0;
+ for (;;)
+ {
+ c =3D is.read();
+ if (c < 0)
+ {
+ throw new IOException("socket closed.");
+ }
+ else if (c =3D=3D 0)
+ {
+ c =3D is.read();
+ if (c !=3D '\n')
+ {
+ byte[] ba =3D inputBuffer.toByteArray();
+ System.out.println(new String(ba, "UTF-8"));
+ }
+ Assert.assertEquals("Expecting stomp frame to terminate with \=
0\n", c, '\n');
+ byte[] ba =3D inputBuffer.toByteArray();
+ inputBuffer.reset();
+ return new String(ba, "UTF-8");
+ }
+ else
+ {
+ inputBuffer.write(c);
+ }
+ }
+ }
+
+ public void sendMessage(String msg) throws Exception
+ {
+ sendMessage(msg, queue);
+ }
+
+ public void sendMessage(String msg, Destination destination) throws Exc=
eption
+ {
+ MessageProducer producer =3D session.createProducer(destination);
+ TextMessage message =3D session.createTextMessage(msg);
+ producer.send(message);
+ }
+
+ public void sendMessage(byte[] data, Destination destination) throws Ex=
ception
+ {
+ sendMessage(data, "foo", "xyz", destination);
+ }
+
+ public void sendMessage(String msg, String propertyName, String propert=
yValue) throws Exception
+ {
+ sendMessage(msg.getBytes("UTF-8"), propertyName, propertyValue, queu=
e);
+ }
+
+ public void sendMessage(byte[] data, String propertyName, String proper=
tyValue, Destination destination) throws Exception
+ {
+ MessageProducer producer =3D session.createProducer(destination);
+ BytesMessage message =3D session.createBytesMessage();
+ message.setStringProperty(propertyName, propertyValue);
+ message.writeBytes(data);
+ producer.send(message);
+ }
+
+ protected void waitForReceipt() throws Exception
+ {
+ String frame =3D receiveFrame(50000);
+ assertNotNull(frame);
+ assertTrue(frame.indexOf("RECEIPT") > -1);
+ }
+
+ protected void waitForFrameToTakeEffect() throws InterruptedException
+ {
+ // bit of a dirty hack :)
+ // another option would be to force some kind of receipt to be retur=
ned
+ // from the frame
+ Thread.sleep(2000);
+ }
+}
--===============6641207432056589441==--