From do-not-reply at jboss.org Wed Oct 6 07:24:45 2010 Content-Type: multipart/mixed; boundary="===============6641207432056589441==" MIME-Version: 1.0 From: do-not-reply at jboss.org To: hornetq-commits at lists.jboss.org Subject: [hornetq-commits] JBoss hornetq SVN: r9758 - in trunk: src/main/org/hornetq/core/protocol/core/impl and 5 other directories. Date: Wed, 06 Oct 2010 07:24:45 -0400 Message-ID: <201010061124.o96BOjtK028870@svn01.web.mwc.hst.phx2.redhat.com> --===============6641207432056589441== Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: quoted-printable Author: timfox Date: 2010-10-06 07:24:44 -0400 (Wed, 06 Oct 2010) New Revision: 9758 Added: trunk/tests/src/org/hornetq/tests/integration/stomp/StompConnectionClean= upTest.java trunk/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler= .java trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.j= ava trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImp= l.java trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl= .java trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java Log: https://jira.jboss.org/browse/HORNETQ-526 Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacket= Handler.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandle= r.java 2010-10-06 08:18:15 UTC (rev 9757) +++ trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandle= r.java 2010-10-06 11:24:44 UTC (rev 9758) @@ -107,7 +107,7 @@ * @author Andy Taylor * @author Clebert Suconic */ -public class ServerSessionPacketHandler implements ChannelHandler, CloseLi= stener, FailureListener +public class ServerSessionPacketHandler implements ChannelHandler { private static final Logger log =3D Logger.getLogger(ServerSessionPacke= tHandler.class); = @@ -150,8 +150,6 @@ { direct =3D false; } - = - addConnectionListeners(); } = public long getID() @@ -159,22 +157,6 @@ return channel.getID(); } = - public void connectionFailed(final HornetQException exception) - { - log.warn("Client connection failed, clearing up resources for sessio= n " + session.getName()); - - try - { - session.close(true); - } - catch (Exception e) - { - log.error("Failed to close session", e); - } - - log.warn("Cleared up resources for session " + session.getName()); - } - public void close() { channel.flushConfirmations(); @@ -189,22 +171,6 @@ } } = - public void connectionClosed() - { - } - - private void addConnectionListeners() - { - remotingConnection.addFailureListener(this); - remotingConnection.addCloseListener(this); - } - - private void removeConnectionListeners() - { - remotingConnection.removeFailureListener(this); - remotingConnection.removeCloseListener(this); - } - public Channel getChannel() { return channel; @@ -423,7 +389,7 @@ { requiresResponse =3D true; session.close(false); - removeConnectionListeners(); + // removeConnectionListeners(); response =3D new NullResponseMessage(); flush =3D true; closeChannel =3D true; @@ -601,10 +567,10 @@ // might be executed // before we have transferred the connection, leaving it in a starte= d state session.setTransferring(true); - - remotingConnection.removeFailureListener(this); - remotingConnection.removeCloseListener(this); - + = + List closeListeners =3D remotingConnection.removeClos= eListeners(); + List failureListeners =3D remotingConnection.remove= FailureListeners(); + = // Note. We do not destroy the replicating connection here. In the c= ase the live server has really crashed // then the connection will get cleaned up anyway when the server pi= ng timeout kicks in. // In the case the live server is really still up, i.e. a split brai= n situation (or in tests), then closing @@ -618,8 +584,8 @@ = remotingConnection =3D newConnection; = - remotingConnection.addFailureListener(this); - remotingConnection.addCloseListener(this); + remotingConnection.setCloseListeners(closeListeners); + remotingConnection.setFailureListeners(failureListeners); = int serverLastReceivedCommandID =3D channel.getLastConfirmedCommandI= D(); = Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCal= lback.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.= java 2010-10-06 08:18:15 UTC (rev 9757) +++ trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.= java 2010-10-06 11:24:44 UTC (rev 9758) @@ -27,7 +27,7 @@ /** * A CoreSessionCallback * - * @author Jeff Mesnil + * @author Tim Fox * * */ @@ -40,7 +40,7 @@ private ProtocolManager protocolManager; = private String name; - = + public CoreSessionCallback(String name, ProtocolManager protocolManager= , Channel channel) { this.name =3D name; @@ -54,8 +54,8 @@ = channel.send(packet); = - int size =3D packet.getPacketSize(); - = + int size =3D packet.getPacketSize(); + return size; } = @@ -67,15 +67,15 @@ = return packet.getPacketSize(); } - = + public int sendMessage(ServerMessage message, long consumerID, int deli= veryCount) { Packet packet =3D new SessionReceiveMessage(consumerID, message, del= iveryCount); = channel.sendBatched(packet); - = - int size =3D packet.getPacketSize(); = + int size =3D packet.getPacketSize(); + return size; } = Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnec= tionImpl.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionIm= pl.java 2010-10-06 08:18:15 UTC (rev 9757) +++ trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionIm= pl.java 2010-10-06 11:24:44 UTC (rev 9758) @@ -225,6 +225,31 @@ return closeListeners.remove(listener); } = + public List removeCloseListeners() + { + List ret =3D new ArrayList(closeListen= ers); + = + closeListeners.clear(); + = + return ret; + } + + public List removeFailureListeners() + { + List ret =3D new ArrayList(failure= Listeners); + = + failureListeners.clear(); + = + return ret; = + } + + public void setCloseListeners(List listeners) + { + closeListeners.clear(); + = + closeListeners.addAll(listeners); = + } + public HornetQBuffer createBuffer(final int size) { return transportConnection.createBuffer(size); @@ -471,6 +496,7 @@ channels.clear(); } } = + = private void callFailureListeners(final HornetQException me) { final List listenersClone =3D new ArrayList(failureListeners); Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.ja= va =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 201= 0-10-06 08:18:15 UTC (rev 9757) +++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 201= 0-10-06 11:24:44 UTC (rev 9758) @@ -13,8 +13,10 @@ = package org.hornetq.core.protocol.stomp; = +import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; = import org.hornetq.api.core.HornetQBuffer; import org.hornetq.api.core.HornetQBuffers; @@ -32,8 +34,9 @@ * * */ -class StompConnection implements RemotingConnection +public class StompConnection implements RemotingConnection { + private static final Logger log =3D Logger.getLogger(StompConnection.cl= ass); = private final StompProtocolManager manager; @@ -49,9 +52,17 @@ private boolean valid; = private boolean destroyed =3D false; - = + private StompDecoder decoder =3D new StompDecoder(); + + private final List failureListeners =3D new CopyOnWrit= eArrayList(); + + private final List closeListeners =3D new CopyOnWriteArr= ayList(); + + private final Object failLock =3D new Object(); = + private volatile boolean dataReceived; + public StompDecoder getDecoder() { return decoder; @@ -64,17 +75,90 @@ this.manager =3D manager; } = - public void addCloseListener(CloseListener listener) + public void addFailureListener(final FailureListener listener) { + if (listener =3D=3D null) + { + throw new IllegalStateException("FailureListener cannot be null"); + } + + failureListeners.add(listener); } = - public void addFailureListener(FailureListener listener) + public boolean removeFailureListener(final FailureListener listener) { + if (listener =3D=3D null) + { + throw new IllegalStateException("FailureListener cannot be null"); + } + + return failureListeners.remove(listener); } = + public void addCloseListener(final CloseListener listener) + { + if (listener =3D=3D null) + { + throw new IllegalStateException("CloseListener cannot be null"); + } + + closeListeners.add(listener); + } + + public boolean removeCloseListener(final CloseListener listener) + { + if (listener =3D=3D null) + { + throw new IllegalStateException("CloseListener cannot be null"); + } + + return closeListeners.remove(listener); + } + + public List removeCloseListeners() + { + List ret =3D new ArrayList(closeListen= ers); + + closeListeners.clear(); + + return ret; + } + + public List removeFailureListeners() + { + List ret =3D new ArrayList(failure= Listeners); + + failureListeners.clear(); + + return ret; + } + + public void setCloseListeners(List listeners) + { + closeListeners.clear(); + + closeListeners.addAll(listeners); + } + + public void setFailureListeners(final List listeners) + { + failureListeners.clear(); + + failureListeners.addAll(listeners); + } + = + public void setDataReceived() + { + dataReceived =3D true; + } + public boolean checkDataReceived() { - return true; + boolean res =3D dataReceived; + + dataReceived =3D false; + + return res; } = public HornetQBuffer createBuffer(int size) @@ -84,13 +168,23 @@ = public void destroy() { - if (destroyed) + synchronized (failLock) { - return; + if (destroyed) + { + return; + } } = destroyed =3D true; = + internalClose(); + + callClosingListeners(); + } + + private void internalClose() + { transportConnection.close(); = manager.cleanup(this); @@ -100,8 +194,29 @@ { } = - public void fail(HornetQException me) + public void fail(final HornetQException me) { + synchronized (failLock) + { + if (destroyed) + { + return; + } + + destroyed =3D true; + } + + log.warn("Connection failure has been detected: " + me.getMessage() + + " [code=3D" + + me.getCode() + + "]"); + + // Then call the listeners + callFailureListeners(me); + + callClosingListeners(); + = + internalClose(); } = public void flush() @@ -140,20 +255,6 @@ return destroyed; } = - public boolean removeCloseListener(CloseListener listener) - { - return false; - } - - public boolean removeFailureListener(FailureListener listener) - { - return false; - } - - public void setFailureListeners(List listeners) - { - } - public void bufferReceived(Object connectionID, HornetQBuffer buffer) { manager.handleBuffer(this, buffer); @@ -188,7 +289,7 @@ { return clientID; } - = + public boolean isValid() { return valid; @@ -198,4 +299,45 @@ { this.valid =3D valid; } + + private void callFailureListeners(final HornetQException me) + { + final List listenersClone =3D new ArrayList(failureListeners); + + for (final FailureListener listener : listenersClone) + { + try + { + listener.connectionFailed(me); + } + catch (final Throwable t) + { + // Failure of one listener to execute shouldn't prevent others + // from + // executing + log.error("Failed to execute failure listener", t); + } + } + } + + private void callClosingListeners() + { + final List listenersClone =3D new ArrayList(closeListeners); + + for (final CloseListener listener : listenersClone) + { + try + { + listener.connectionClosed(); + } + catch (final Throwable t) + { + // Failure of one listener to execute shouldn't prevent others + // from + // executing + log.error("Failed to execute failure listener", t); + } + } + } + } Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManag= er.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.jav= a 2010-10-06 08:18:15 UTC (rev 9757) +++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.jav= a 2010-10-06 11:24:44 UTC (rev 9758) @@ -124,6 +124,7 @@ else { // Default to 1 minute - which is same as core protocol + return new ConnectionEntry(conn, System.currentTimeMillis(), 1 * = 60 * 1000); } } @@ -143,6 +144,8 @@ { StompConnection conn =3D (StompConnection)connection; = + conn.setDataReceived(); + = StompDecoder decoder =3D conn.getDecoder(); = do @@ -217,7 +220,6 @@ = if (request.getHeaders().containsKey(Stomp.Headers.RECEIPT_REQ= UESTED)) { - log.info("receipt requested"); if (response =3D=3D null) { Map h =3D new HashMap(); Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServ= iceImpl.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImp= l.java 2010-10-06 08:18:15 UTC (rev 9757) +++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImp= l.java 2010-10-06 11:24:44 UTC (rev 9758) @@ -33,13 +33,13 @@ import org.hornetq.api.core.TransportConfiguration; import org.hornetq.core.config.Configuration; import org.hornetq.core.logging.Logger; -import org.hornetq.core.protocol.core.ServerSessionPacketHandler; import org.hornetq.core.protocol.core.impl.CoreProtocolManagerFactory; import org.hornetq.core.protocol.stomp.StompProtocolManagerFactory; import org.hornetq.core.remoting.FailureListener; import org.hornetq.core.remoting.impl.netty.TransportConstants; import org.hornetq.core.remoting.server.RemotingService; import org.hornetq.core.server.HornetQServer; +import org.hornetq.core.server.impl.ServerSessionImpl; import org.hornetq.core.server.management.ManagementService; import org.hornetq.spi.core.protocol.ConnectionEntry; import org.hornetq.spi.core.protocol.ProtocolManager; @@ -133,7 +133,8 @@ // difference between Stomp and Stomp over Web Sockets is handled in= NettyAcceptor.getPipeline() this.protocolMap.put(ProtocolType.STOMP, new StompProtocolManagerFac= tory().createProtocolManager(server, = interceptors)); - this.protocolMap.put(ProtocolType.STOMP_WS, new StompProtocolManager= Factory().createProtocolManager(server, interceptors)); + this.protocolMap.put(ProtocolType.STOMP_WS, new StompProtocolManager= Factory().createProtocolManager(server, + = interceptors)); } = // RemotingService implementation ------------------------------- @@ -144,15 +145,14 @@ { return; } - = - ClassLoader tccl =3D - AccessController.doPrivileged(new PrivilegedAction() + + ClassLoader tccl =3D AccessController.doPrivileged(new PrivilegedAct= ion() + { + public ClassLoader run() { - public ClassLoader run() - { - return Thread.currentThread().getContextClassLoader(); - } - }); + return Thread.currentThread().getContextClassLoader(); + } + }); = // The remoting service maintains it's own thread pool for handling = remoting traffic // If OIO each connection will have it's own thread @@ -161,7 +161,8 @@ // to support many hundreds of connections, but the main thread pool= must be kept small for better performance = ThreadFactory tFactory =3D new HornetQThreadFactory("HornetQ-remotin= g-threads" + System.identityHashCode(this), - false, tccl); + false, + tccl); = threadPool =3D Executors.newCachedThreadPool(tFactory); = @@ -322,6 +323,8 @@ } else { + log.info("failed to remove connection"); + return null; } } @@ -388,7 +391,7 @@ = for (FailureListener listener : failureListeners) { - if (listener instanceof ServerSessionPacketHandler) + if (listener instanceof ServerSessionImpl) { empty =3D false; = @@ -528,9 +531,12 @@ RemotingConnection conn =3D removeConnection(id); = HornetQException me =3D new HornetQException(HornetQExcepti= on.CONNECTION_TIMEDOUT, - "Did not receive= ping from " + conn.getRemoteAddress() + + "Did not receive= data from " + conn.getRemoteAddress() + ". It i= s likely the client has exited or crashed without " + - "closin= g its connection, or the network between the server and client has failed. = The connection will now be closed."); + "closin= g its connection, or the network between the server and client has failed. = " + + "You al= so might have configured connection-ttl and client-failure-check-period inc= orrectly. " + + "Please= check user manual for more information." + + " The c= onnection will now be closed."); conn.fail(me); } = Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010= -10-06 08:18:15 UTC (rev 9757) +++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010= -10-06 11:24:44 UTC (rev 9758) @@ -73,7 +73,7 @@ * @author Jeff Mesnil * @author Andy Taylor */ -public class ServerSessionImpl implements ServerSession, FailureListener +public class ServerSessionImpl implements ServerSession , FailureListener { // Constants ----------------------------------------------------------= ------------------- = Modified: trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.j= ava =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java 20= 10-10-06 08:18:15 UTC (rev 9757) +++ trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java 20= 10-10-06 11:24:44 UTC (rev 9758) @@ -77,14 +77,22 @@ * @return true if removed */ boolean removeCloseListener(CloseListener listener); - + = + List removeCloseListeners(); + = + void setCloseListeners(List listeners); + = + = /** * return all the failure listeners * * @return the listeners */ List getFailureListeners(); + = + List removeFailureListeners(); = + /** * set the failure listeners. *

Added: trunk/tests/src/org/hornetq/tests/integration/stomp/StompConnectionC= leanupTest.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/tests/src/org/hornetq/tests/integration/stomp/StompConnectionClea= nupTest.java (rev 0) +++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompConnectionClea= nupTest.java 2010-10-06 11:24:44 UTC (rev 9758) @@ -0,0 +1,128 @@ +/* + * Copyright 2010 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.tests.integration.stomp; + +import javax.jms.Message; +import javax.jms.MessageConsumer; + +import junit.framework.Assert; + +import org.hornetq.core.protocol.stomp.Stomp; +import org.hornetq.jms.server.JMSServerManager; + +/** + * A StompConnectionCleanupTest + * + * @author Tim Fox + * + * + */ +public class StompConnectionCleanupTest extends StompTestBase +{ + private static final long CONNECTION_TTL =3D 2000; + + public void testConnectionCleanup() throws Exception + { + String frame =3D "CONNECT\n" + "login: brianm\n" + "passcode: wombat= s\n\n" + Stomp.NULL; + sendFrame(frame); + frame =3D receiveFrame(10000); + = + //We send and consumer a message to ensure a STOMP connection and se= rver session is created + + Assert.assertTrue(frame.startsWith("CONNECTED")); + + frame =3D "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQue= ueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; + sendFrame(frame); + + frame =3D "SEND\n" + "destination:" + getQueuePrefix() + getQueueNam= e() + "\n\n" + "Hello World" + Stomp.NULL; + sendFrame(frame); + + frame =3D receiveFrame(10000); + Assert.assertTrue(frame.startsWith("MESSAGE")); + Assert.assertTrue(frame.indexOf("destination:") > 0); + = + // Now we wait until the connection is cleared on the server, which = will happen some time after ttl, since no data + // is being sent + + long start =3D System.currentTimeMillis(); + + while (true) + { + int connCount =3D server.getHornetQServer().getRemotingService().= getConnections().size(); + + int sessionCount =3D server.getHornetQServer().getSessions().size= (); + = + // All connections and sessions should be timed out including STO= MP + JMS connection + + if (connCount =3D=3D 0 && sessionCount =3D=3D 0) + { + break; + } + = + Thread.sleep(10); + + if (System.currentTimeMillis() - start > 10000) + { + fail("Timed out waiting for connection to be cleared up"); + } + } = + } + = + public void testConnectionNotCleanedUp() throws Exception + { + String frame =3D "CONNECT\n" + "login: brianm\n" + "passcode: wombat= s\n\n" + Stomp.NULL; + sendFrame(frame); + frame =3D receiveFrame(10000); + = + //We send and consumer a message to ensure a STOMP connection and se= rver session is created + + Assert.assertTrue(frame.startsWith("CONNECTED")); + + MessageConsumer consumer =3D session.createConsumer(queue); + = + long time =3D CONNECTION_TTL * 3; + = + long start =3D System.currentTimeMillis(); + = + //Send msgs for an amount of time > connection_ttl make sure connect= ion is not closed + while (true) + { + //Send and receive a msg + = + frame =3D "SEND\n" + "destination:" + getQueuePrefix() + getQueue= Name() + "\n\n" + "Hello World" + Stomp.NULL; + sendFrame(frame); + + Message msg =3D consumer.receive(1000); + assertNotNull(msg); + = + Thread.sleep(100); + = + if (System.currentTimeMillis() - start > time) + { + break; + } + } + = + } + = + @Override + protected JMSServerManager createServer() throws Exception + { + JMSServerManager s =3D super.createServer(); + = + s.getHornetQServer().getConfiguration().setConnectionTTLOverride(CON= NECTION_TTL); + = + return s; + } +} Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010= -10-06 08:18:15 UTC (rev 9757) +++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010= -10-06 11:24:44 UTC (rev 9758) @@ -19,77 +19,29 @@ = import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; import java.net.SocketTimeoutException; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; = import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; -import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; import javax.jms.TextMessage; -import javax.jms.Topic; = import junit.framework.Assert; = -import org.hornetq.api.core.TransportConfiguration; -import org.hornetq.core.config.Configuration; -import org.hornetq.core.config.impl.ConfigurationImpl; import org.hornetq.core.logging.Logger; import org.hornetq.core.protocol.stomp.Stomp; -import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory; -import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory; -import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory; -import org.hornetq.core.remoting.impl.netty.TransportConstants; -import org.hornetq.core.server.HornetQServer; -import org.hornetq.core.server.HornetQServers; -import org.hornetq.jms.client.HornetQConnectionFactory; -import org.hornetq.jms.server.JMSServerManager; -import org.hornetq.jms.server.config.JMSConfiguration; -import org.hornetq.jms.server.config.impl.JMSConfigurationImpl; -import org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl; -import org.hornetq.jms.server.config.impl.TopicConfigurationImpl; -import org.hornetq.jms.server.impl.JMSServerManagerImpl; -import org.hornetq.spi.core.protocol.ProtocolType; -import org.hornetq.tests.unit.util.InVMContext; -import org.hornetq.tests.util.UnitTestCase; = -public class StompTest extends UnitTestCase +public class StompTest extends StompTestBase { private static final transient Logger log =3D Logger.getLogger(StompTes= t.class); = - private int port =3D 61613; - - private Socket stompSocket; - - private ByteArrayOutputStream inputBuffer; - - private ConnectionFactory connectionFactory; - - private Connection connection; - - private Session session; - - private Queue queue; - - private Topic topic; - - private JMSServerManager server; - = public void testSendManyMessages() throws Exception { MessageConsumer consumer =3D session.createConsumer(queue); @@ -106,7 +58,7 @@ = public void onMessage(Message arg0) { - //System.out.println("<<< " + (1000 - latch.getCount())); + // System.out.println("<<< " + (1000 - latch.getCount())); latch.countDown(); } }); @@ -115,7 +67,7 @@ for (int i =3D 1; i <=3D count; i++) { // Thread.sleep(1); - //System.out.println(">>> " + i); + // System.out.println(">>> " + i); sendFrame(frame); } = @@ -191,14 +143,14 @@ TextMessage message =3D (TextMessage)consumer.receive(1000); Assert.assertNotNull(message); Assert.assertEquals("Hello World", message.getText()); - = + // Make sure that the timestamp is valid - should // be very close to the current time. long tnow =3D System.currentTimeMillis(); long tmsg =3D message.getJMSTimestamp(); Assert.assertTrue(Math.abs(tnow - tmsg) < 1000); } - = + /* * Some STOMP clients erroneously put a new line \n *after* the termina= ting NUL char at the end of the frame * This means next frame read might have a \n a the beginning. @@ -215,14 +167,20 @@ frame =3D receiveFrame(10000); Assert.assertTrue(frame.startsWith("CONNECTED")); = - frame =3D "SEND\n" + "destination:" + getQueuePrefix() + getQueueNam= e() + "\n\n" + "Hello World" + Stomp.NULL + "\n"; + frame =3D "SEND\n" + "destination:" + + getQueuePrefix() + + getQueueName() + + "\n\n" + + "Hello World" + + Stomp.NULL + + "\n"; = sendFrame(frame); = TextMessage message =3D (TextMessage)consumer.receive(1000); Assert.assertNotNull(message); Assert.assertEquals("Hello World", message.getText()); - = + // Make sure that the timestamp is valid - should // be very close to the current time. long tnow =3D System.currentTimeMillis(); @@ -258,7 +216,7 @@ TextMessage message =3D (TextMessage)consumer.receive(1000); Assert.assertNotNull(message); Assert.assertEquals("Hello World", message.getText()); - = + // Make sure that the timestamp is valid - should // be very close to the current time. long tnow =3D System.currentTimeMillis(); @@ -1279,210 +1237,4 @@ frame =3D "DISCONNECT\n" + "\n\n" + Stomp.NULL; sendFrame(frame); } - - // Implementation methods - // --------------------------------------------------------------------= ----- - protected void setUp() throws Exception - { - super.setUp(); - - server =3D createServer(); - server.start(); - connectionFactory =3D createConnectionFactory(); - - stompSocket =3D createSocket(); - inputBuffer =3D new ByteArrayOutputStream(); - - connection =3D connectionFactory.createConnection(); - session =3D connection.createSession(false, Session.AUTO_ACKNOWLEDGE= ); - queue =3D session.createQueue(getQueueName()); - topic =3D session.createTopic(getTopicName()); - connection.start(); - } - - /** - * @return - * @throws Exception = - */ - private JMSServerManager createServer() throws Exception - { - Configuration config =3D new ConfigurationImpl(); - config.setSecurityEnabled(false); - config.setPersistenceEnabled(false); - - Map params =3D new HashMap(); - params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP= .toString()); - params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEF= AULT_STOMP_PORT); - TransportConfiguration stompTransport =3D new TransportConfiguration= (NettyAcceptorFactory.class.getName(), params); - config.getAcceptorConfigurations().add(stompTransport); - config.getAcceptorConfigurations().add(new TransportConfiguration(In= VMAcceptorFactory.class.getName())); - HornetQServer hornetQServer =3D HornetQServers.newHornetQServer(conf= ig); - - JMSConfiguration jmsConfig =3D new JMSConfigurationImpl(); - jmsConfig.getQueueConfigurations() - .add(new JMSQueueConfigurationImpl(getQueueName(), null, fa= lse, getQueueName())); - jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl(ge= tTopicName(), getTopicName())); - server =3D new JMSServerManagerImpl(hornetQServer, jmsConfig); - server.setContext(new InVMContext()); - return server; - } - - protected void tearDown() throws Exception - { - connection.close(); - if (stompSocket !=3D null) - { - stompSocket.close(); - } - server.stop(); - - super.tearDown(); - } - - protected void reconnect() throws Exception - { - reconnect(0); - } - - protected void reconnect(long sleep) throws Exception - { - stompSocket.close(); - - if (sleep > 0) - { - Thread.sleep(sleep); - } - - stompSocket =3D createSocket(); - inputBuffer =3D new ByteArrayOutputStream(); - } - - protected ConnectionFactory createConnectionFactory() - { - return new HornetQConnectionFactory(new TransportConfiguration(InVMC= onnectorFactory.class.getName())); - } - - protected Socket createSocket() throws IOException - { - return new Socket("127.0.0.1", port); - } - - protected String getQueueName() - { - return "test"; - } - - protected String getQueuePrefix() - { - return "jms.queue."; - } - - protected String getTopicName() - { - return "testtopic"; - } - - protected String getTopicPrefix() - { - return "jms.topic."; - } - - public void sendFrame(String data) throws Exception - { - byte[] bytes =3D data.getBytes("UTF-8"); - OutputStream outputStream =3D stompSocket.getOutputStream(); - for (int i =3D 0; i < bytes.length; i++) - { - outputStream.write(bytes[i]); - } - outputStream.flush(); - } - - public void sendFrame(byte[] data) throws Exception - { - OutputStream outputStream =3D stompSocket.getOutputStream(); - for (int i =3D 0; i < data.length; i++) - { - outputStream.write(data[i]); - } - outputStream.flush(); - } - - public String receiveFrame(long timeOut) throws Exception - { - stompSocket.setSoTimeout((int)timeOut); - InputStream is =3D stompSocket.getInputStream(); - int c =3D 0; - for (;;) - { - c =3D is.read(); - if (c < 0) - { - throw new IOException("socket closed."); - } - else if (c =3D=3D 0) - { - c =3D is.read(); - if (c !=3D '\n') - { - byte[] ba =3D inputBuffer.toByteArray(); - System.out.println(new String(ba, "UTF-8")); - } - Assert.assertEquals("Expecting stomp frame to terminate with \= 0\n", c, '\n'); - byte[] ba =3D inputBuffer.toByteArray(); - inputBuffer.reset(); - return new String(ba, "UTF-8"); - } - else - { - inputBuffer.write(c); - } - } - } - - public void sendMessage(String msg) throws Exception - { - sendMessage(msg, queue); - } - - public void sendMessage(String msg, Destination destination) throws Exc= eption - { - MessageProducer producer =3D session.createProducer(destination); - TextMessage message =3D session.createTextMessage(msg); - producer.send(message); - } - - public void sendMessage(byte[] data, Destination destination) throws Ex= ception - { - sendMessage(data, "foo", "xyz", destination); - } - - public void sendMessage(String msg, String propertyName, String propert= yValue) throws Exception - { - sendMessage(msg.getBytes("UTF-8"), propertyName, propertyValue, queu= e); - } - - public void sendMessage(byte[] data, String propertyName, String proper= tyValue, Destination destination) throws Exception - { - MessageProducer producer =3D session.createProducer(destination); - BytesMessage message =3D session.createBytesMessage(); - message.setStringProperty(propertyName, propertyValue); - message.writeBytes(data); - producer.send(message); - } - - protected void waitForReceipt() throws Exception - { - String frame =3D receiveFrame(50000); - assertNotNull(frame); - assertTrue(frame.indexOf("RECEIPT") > -1); - } - - protected void waitForFrameToTakeEffect() throws InterruptedException - { - // bit of a dirty hack :) - // another option would be to force some kind of receipt to be retur= ned - // from the frame - Thread.sleep(2000); - } } Added: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.ja= va =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java = (rev 0) +++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java = 2010-10-06 11:24:44 UTC (rev 9758) @@ -0,0 +1,290 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.hornetq.tests.integration.stomp; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.util.HashMap; +import java.util.Map; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import junit.framework.Assert; + +import org.hornetq.api.core.TransportConfiguration; +import org.hornetq.core.config.Configuration; +import org.hornetq.core.config.impl.ConfigurationImpl; +import org.hornetq.core.logging.Logger; +import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory; +import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory; +import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory; +import org.hornetq.core.remoting.impl.netty.TransportConstants; +import org.hornetq.core.server.HornetQServer; +import org.hornetq.core.server.HornetQServers; +import org.hornetq.jms.client.HornetQConnectionFactory; +import org.hornetq.jms.server.JMSServerManager; +import org.hornetq.jms.server.config.JMSConfiguration; +import org.hornetq.jms.server.config.impl.JMSConfigurationImpl; +import org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl; +import org.hornetq.jms.server.config.impl.TopicConfigurationImpl; +import org.hornetq.jms.server.impl.JMSServerManagerImpl; +import org.hornetq.spi.core.protocol.ProtocolType; +import org.hornetq.tests.unit.util.InVMContext; +import org.hornetq.tests.util.UnitTestCase; + +public abstract class StompTestBase extends UnitTestCase +{ + private static final transient Logger log =3D Logger.getLogger(StompTes= tBase.class); + + private int port =3D 61613; + + private Socket stompSocket; + + private ByteArrayOutputStream inputBuffer; + + private ConnectionFactory connectionFactory; + + private Connection connection; + + protected Session session; + + protected Queue queue; + + protected Topic topic; + + protected JMSServerManager server; + = + = + + // Implementation methods + // --------------------------------------------------------------------= ----- + protected void setUp() throws Exception + { + super.setUp(); + + server =3D createServer(); + server.start(); + connectionFactory =3D createConnectionFactory(); + + stompSocket =3D createSocket(); + inputBuffer =3D new ByteArrayOutputStream(); + + connection =3D connectionFactory.createConnection(); + session =3D connection.createSession(false, Session.AUTO_ACKNOWLEDGE= ); + queue =3D session.createQueue(getQueueName()); + topic =3D session.createTopic(getTopicName()); + connection.start(); + } + + /** + * @return + * @throws Exception = + */ + protected JMSServerManager createServer() throws Exception + { + Configuration config =3D new ConfigurationImpl(); + config.setSecurityEnabled(false); + config.setPersistenceEnabled(false); + + Map params =3D new HashMap(); + params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP= .toString()); + params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEF= AULT_STOMP_PORT); + TransportConfiguration stompTransport =3D new TransportConfiguration= (NettyAcceptorFactory.class.getName(), params); + config.getAcceptorConfigurations().add(stompTransport); + config.getAcceptorConfigurations().add(new TransportConfiguration(In= VMAcceptorFactory.class.getName())); + HornetQServer hornetQServer =3D HornetQServers.newHornetQServer(conf= ig); + + JMSConfiguration jmsConfig =3D new JMSConfigurationImpl(); + jmsConfig.getQueueConfigurations() + .add(new JMSQueueConfigurationImpl(getQueueName(), null, fa= lse, getQueueName())); + jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl(ge= tTopicName(), getTopicName())); + server =3D new JMSServerManagerImpl(hornetQServer, jmsConfig); + server.setContext(new InVMContext()); + return server; + } + + protected void tearDown() throws Exception + { + connection.close(); + if (stompSocket !=3D null) + { + stompSocket.close(); + } + server.stop(); + + super.tearDown(); + } + + protected void reconnect() throws Exception + { + reconnect(0); + } + + protected void reconnect(long sleep) throws Exception + { + stompSocket.close(); + + if (sleep > 0) + { + Thread.sleep(sleep); + } + + stompSocket =3D createSocket(); + inputBuffer =3D new ByteArrayOutputStream(); + } + + protected ConnectionFactory createConnectionFactory() + { + return new HornetQConnectionFactory(new TransportConfiguration(InVMC= onnectorFactory.class.getName())); + } + + protected Socket createSocket() throws IOException + { + return new Socket("127.0.0.1", port); + } + + protected String getQueueName() + { + return "test"; + } + + protected String getQueuePrefix() + { + return "jms.queue."; + } + + protected String getTopicName() + { + return "testtopic"; + } + + protected String getTopicPrefix() + { + return "jms.topic."; + } + + public void sendFrame(String data) throws Exception + { + byte[] bytes =3D data.getBytes("UTF-8"); + OutputStream outputStream =3D stompSocket.getOutputStream(); + for (int i =3D 0; i < bytes.length; i++) + { + outputStream.write(bytes[i]); + } + outputStream.flush(); + } + + public void sendFrame(byte[] data) throws Exception + { + OutputStream outputStream =3D stompSocket.getOutputStream(); + for (int i =3D 0; i < data.length; i++) + { + outputStream.write(data[i]); + } + outputStream.flush(); + } + + public String receiveFrame(long timeOut) throws Exception + { + stompSocket.setSoTimeout((int)timeOut); + InputStream is =3D stompSocket.getInputStream(); + int c =3D 0; + for (;;) + { + c =3D is.read(); + if (c < 0) + { + throw new IOException("socket closed."); + } + else if (c =3D=3D 0) + { + c =3D is.read(); + if (c !=3D '\n') + { + byte[] ba =3D inputBuffer.toByteArray(); + System.out.println(new String(ba, "UTF-8")); + } + Assert.assertEquals("Expecting stomp frame to terminate with \= 0\n", c, '\n'); + byte[] ba =3D inputBuffer.toByteArray(); + inputBuffer.reset(); + return new String(ba, "UTF-8"); + } + else + { + inputBuffer.write(c); + } + } + } + + public void sendMessage(String msg) throws Exception + { + sendMessage(msg, queue); + } + + public void sendMessage(String msg, Destination destination) throws Exc= eption + { + MessageProducer producer =3D session.createProducer(destination); + TextMessage message =3D session.createTextMessage(msg); + producer.send(message); + } + + public void sendMessage(byte[] data, Destination destination) throws Ex= ception + { + sendMessage(data, "foo", "xyz", destination); + } + + public void sendMessage(String msg, String propertyName, String propert= yValue) throws Exception + { + sendMessage(msg.getBytes("UTF-8"), propertyName, propertyValue, queu= e); + } + + public void sendMessage(byte[] data, String propertyName, String proper= tyValue, Destination destination) throws Exception + { + MessageProducer producer =3D session.createProducer(destination); + BytesMessage message =3D session.createBytesMessage(); + message.setStringProperty(propertyName, propertyValue); + message.writeBytes(data); + producer.send(message); + } + + protected void waitForReceipt() throws Exception + { + String frame =3D receiveFrame(50000); + assertNotNull(frame); + assertTrue(frame.indexOf("RECEIPT") > -1); + } + + protected void waitForFrameToTakeEffect() throws InterruptedException + { + // bit of a dirty hack :) + // another option would be to force some kind of receipt to be retur= ned + // from the frame + Thread.sleep(2000); + } +} --===============6641207432056589441==--