[hornetq-commits] JBoss hornetq SVN: r9758 - in trunk: src/main/org/hornetq/core/protocol/core/impl and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Oct 6 07:24:45 EDT 2010


Author: timfox
Date: 2010-10-06 07:24:44 -0400 (Wed, 06 Oct 2010)
New Revision: 9758

Added:
   trunk/tests/src/org/hornetq/tests/integration/stomp/StompConnectionCleanupTest.java
   trunk/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java
Modified:
   trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
   trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-526

Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2010-10-06 08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2010-10-06 11:24:44 UTC (rev 9758)
@@ -107,7 +107,7 @@
  * @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
  * @author <a href="mailto:clebert.suconic at jboss.org>Clebert Suconic</a>
  */
-public class ServerSessionPacketHandler implements ChannelHandler, CloseListener, FailureListener
+public class ServerSessionPacketHandler implements ChannelHandler
 {
    private static final Logger log = Logger.getLogger(ServerSessionPacketHandler.class);
 
@@ -150,8 +150,6 @@
       {
          direct = false;
       }
-      
-      addConnectionListeners();
    }
 
    public long getID()
@@ -159,22 +157,6 @@
       return channel.getID();
    }
 
-   public void connectionFailed(final HornetQException exception)
-   {
-      log.warn("Client connection failed, clearing up resources for session " + session.getName());
-
-      try
-      {
-         session.close(true);
-      }
-      catch (Exception e)
-      {
-         log.error("Failed to close session", e);
-      }
-
-      log.warn("Cleared up resources for session " + session.getName());
-   }
-
    public void close()
    {
       channel.flushConfirmations();
@@ -189,22 +171,6 @@
       }
    }
 
-   public void connectionClosed()
-   {
-   }
-
-   private void addConnectionListeners()
-   {
-      remotingConnection.addFailureListener(this);
-      remotingConnection.addCloseListener(this);
-   }
-
-   private void removeConnectionListeners()
-   {
-      remotingConnection.removeFailureListener(this);
-      remotingConnection.removeCloseListener(this);
-   }
-
    public Channel getChannel()
    {
       return channel;
@@ -423,7 +389,7 @@
                {
                   requiresResponse = true;
                   session.close(false);
-                  removeConnectionListeners();
+                 // removeConnectionListeners();
                   response = new NullResponseMessage();
                   flush = true;
                   closeChannel = true;
@@ -601,10 +567,10 @@
       // might be executed
       // before we have transferred the connection, leaving it in a started state
       session.setTransferring(true);
-
-      remotingConnection.removeFailureListener(this);
-      remotingConnection.removeCloseListener(this);
-
+            
+      List<CloseListener> closeListeners = remotingConnection.removeCloseListeners();
+      List<FailureListener> failureListeners = remotingConnection.removeFailureListeners();
+      
       // Note. We do not destroy the replicating connection here. In the case the live server has really crashed
       // then the connection will get cleaned up anyway when the server ping timeout kicks in.
       // In the case the live server is really still up, i.e. a split brain situation (or in tests), then closing
@@ -618,8 +584,8 @@
 
       remotingConnection = newConnection;
 
-      remotingConnection.addFailureListener(this);
-      remotingConnection.addCloseListener(this);
+      remotingConnection.setCloseListeners(closeListeners);
+      remotingConnection.setFailureListeners(failureListeners);
 
       int serverLastReceivedCommandID = channel.getLastConfirmedCommandID();
 

Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java	2010-10-06 08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java	2010-10-06 11:24:44 UTC (rev 9758)
@@ -27,7 +27,7 @@
 /**
  * A CoreSessionCallback
  *
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author Tim Fox
  *
  *
  */
@@ -40,7 +40,7 @@
    private ProtocolManager protocolManager;
 
    private String name;
-   
+
    public CoreSessionCallback(String name, ProtocolManager protocolManager, Channel channel)
    {
       this.name = name;
@@ -54,8 +54,8 @@
 
       channel.send(packet);
 
-      int size =  packet.getPacketSize();
-      
+      int size = packet.getPacketSize();
+
       return size;
    }
 
@@ -67,15 +67,15 @@
 
       return packet.getPacketSize();
    }
-     
+
    public int sendMessage(ServerMessage message, long consumerID, int deliveryCount)
    {
       Packet packet = new SessionReceiveMessage(consumerID, message, deliveryCount);
 
       channel.sendBatched(packet);
-      
-      int size =  packet.getPacketSize();
 
+      int size = packet.getPacketSize();
+
       return size;
    }
 

Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java	2010-10-06 08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java	2010-10-06 11:24:44 UTC (rev 9758)
@@ -225,6 +225,31 @@
       return closeListeners.remove(listener);
    }
 
+   public List<CloseListener> removeCloseListeners()
+   {
+      List<CloseListener> ret = new ArrayList<CloseListener>(closeListeners);
+      
+      closeListeners.clear();
+      
+      return ret;
+   }
+
+   public List<FailureListener> removeFailureListeners()
+   {
+      List<FailureListener> ret = new ArrayList<FailureListener>(failureListeners);
+      
+      failureListeners.clear();
+      
+      return ret; 
+   }
+
+   public void setCloseListeners(List<CloseListener> listeners)
+   {
+      closeListeners.clear();
+      
+      closeListeners.addAll(listeners);      
+   }
+
    public HornetQBuffer createBuffer(final int size)
    {
       return transportConnection.createBuffer(size);
@@ -471,6 +496,7 @@
          channels.clear();
       }
    }  
+   
    private void callFailureListeners(final HornetQException me)
    {
       final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2010-10-06 08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2010-10-06 11:24:44 UTC (rev 9758)
@@ -13,8 +13,10 @@
 
 package org.hornetq.core.protocol.stomp;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQBuffers;
@@ -32,8 +34,9 @@
  *
  *
  */
-class StompConnection implements RemotingConnection
+public class StompConnection implements RemotingConnection
 {
+
    private static final Logger log = Logger.getLogger(StompConnection.class);
 
    private final StompProtocolManager manager;
@@ -49,9 +52,17 @@
    private boolean valid;
 
    private boolean destroyed = false;
-   
+
    private StompDecoder decoder = new StompDecoder();
+
+   private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>();
+
+   private final List<CloseListener> closeListeners = new CopyOnWriteArrayList<CloseListener>();
+
+   private final Object failLock = new Object();
    
+   private volatile boolean dataReceived;
+
    public StompDecoder getDecoder()
    {
       return decoder;
@@ -64,17 +75,90 @@
       this.manager = manager;
    }
 
-   public void addCloseListener(CloseListener listener)
+   public void addFailureListener(final FailureListener listener)
    {
+      if (listener == null)
+      {
+         throw new IllegalStateException("FailureListener cannot be null");
+      }
+
+      failureListeners.add(listener);
    }
 
-   public void addFailureListener(FailureListener listener)
+   public boolean removeFailureListener(final FailureListener listener)
    {
+      if (listener == null)
+      {
+         throw new IllegalStateException("FailureListener cannot be null");
+      }
+
+      return failureListeners.remove(listener);
    }
 
+   public void addCloseListener(final CloseListener listener)
+   {
+      if (listener == null)
+      {
+         throw new IllegalStateException("CloseListener cannot be null");
+      }
+
+      closeListeners.add(listener);
+   }
+
+   public boolean removeCloseListener(final CloseListener listener)
+   {
+      if (listener == null)
+      {
+         throw new IllegalStateException("CloseListener cannot be null");
+      }
+
+      return closeListeners.remove(listener);
+   }
+
+   public List<CloseListener> removeCloseListeners()
+   {
+      List<CloseListener> ret = new ArrayList<CloseListener>(closeListeners);
+
+      closeListeners.clear();
+
+      return ret;
+   }
+
+   public List<FailureListener> removeFailureListeners()
+   {
+      List<FailureListener> ret = new ArrayList<FailureListener>(failureListeners);
+
+      failureListeners.clear();
+
+      return ret;
+   }
+
+   public void setCloseListeners(List<CloseListener> listeners)
+   {
+      closeListeners.clear();
+
+      closeListeners.addAll(listeners);
+   }
+
+   public void setFailureListeners(final List<FailureListener> listeners)
+   {
+      failureListeners.clear();
+
+      failureListeners.addAll(listeners);
+   }
+   
+   public void setDataReceived()
+   {
+      dataReceived = true;
+   }
+
    public boolean checkDataReceived()
    {
-      return true;
+      boolean res = dataReceived;
+
+      dataReceived = false;
+
+      return res;
    }
 
    public HornetQBuffer createBuffer(int size)
@@ -84,13 +168,23 @@
 
    public void destroy()
    {
-      if (destroyed)
+      synchronized (failLock)
       {
-         return;
+         if (destroyed)
+         {
+            return;
+         }
       }
 
       destroyed = true;
 
+      internalClose();
+
+      callClosingListeners();
+   }
+
+   private void internalClose()
+   {
       transportConnection.close();
 
       manager.cleanup(this);
@@ -100,8 +194,29 @@
    {
    }
 
-   public void fail(HornetQException me)
+   public void fail(final HornetQException me)
    {
+      synchronized (failLock)
+      {
+         if (destroyed)
+         {
+            return;
+         }
+
+         destroyed = true;
+      }
+
+      log.warn("Connection failure has been detected: " + me.getMessage() +
+                                      " [code=" +
+                                      me.getCode() +
+                                      "]");
+
+      // Then call the listeners
+      callFailureListeners(me);
+
+      callClosingListeners();
+      
+      internalClose();
    }
 
    public void flush()
@@ -140,20 +255,6 @@
       return destroyed;
    }
 
-   public boolean removeCloseListener(CloseListener listener)
-   {
-      return false;
-   }
-
-   public boolean removeFailureListener(FailureListener listener)
-   {
-      return false;
-   }
-
-   public void setFailureListeners(List<FailureListener> listeners)
-   {
-   }
-
    public void bufferReceived(Object connectionID, HornetQBuffer buffer)
    {
       manager.handleBuffer(this, buffer);
@@ -188,7 +289,7 @@
    {
       return clientID;
    }
-   
+
    public boolean isValid()
    {
       return valid;
@@ -198,4 +299,45 @@
    {
       this.valid = valid;
    }
+
+   private void callFailureListeners(final HornetQException me)
+   {
+      final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
+
+      for (final FailureListener listener : listenersClone)
+      {
+         try
+         {
+            listener.connectionFailed(me);
+         }
+         catch (final Throwable t)
+         {
+            // Failure of one listener to execute shouldn't prevent others
+            // from
+            // executing
+            log.error("Failed to execute failure listener", t);
+         }
+      }
+   }
+
+   private void callClosingListeners()
+   {
+      final List<CloseListener> listenersClone = new ArrayList<CloseListener>(closeListeners);
+
+      for (final CloseListener listener : listenersClone)
+      {
+         try
+         {
+            listener.connectionClosed();
+         }
+         catch (final Throwable t)
+         {
+            // Failure of one listener to execute shouldn't prevent others
+            // from
+            // executing
+            log.error("Failed to execute failure listener", t);
+         }
+      }
+   }
+
 }

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-10-06 08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-10-06 11:24:44 UTC (rev 9758)
@@ -124,6 +124,7 @@
       else
       {
          // Default to 1 minute - which is same as core protocol
+
          return new ConnectionEntry(conn, System.currentTimeMillis(), 1 * 60 * 1000);
       }
    }
@@ -143,6 +144,8 @@
    {
       StompConnection conn = (StompConnection)connection;
       
+      conn.setDataReceived();
+      
       StompDecoder decoder = conn.getDecoder();
 
       do
@@ -217,7 +220,6 @@
 
             if (request.getHeaders().containsKey(Stomp.Headers.RECEIPT_REQUESTED))
             {
-               log.info("receipt requested");
                if (response == null)
                {
                   Map<String, Object> h = new HashMap<String, Object>();

Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-10-06 08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-10-06 11:24:44 UTC (rev 9758)
@@ -33,13 +33,13 @@
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
 import org.hornetq.core.protocol.core.impl.CoreProtocolManagerFactory;
 import org.hornetq.core.protocol.stomp.StompProtocolManagerFactory;
 import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.core.remoting.impl.netty.TransportConstants;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.impl.ServerSessionImpl;
 import org.hornetq.core.server.management.ManagementService;
 import org.hornetq.spi.core.protocol.ConnectionEntry;
 import org.hornetq.spi.core.protocol.ProtocolManager;
@@ -133,7 +133,8 @@
       // difference between Stomp and Stomp over Web Sockets is handled in NettyAcceptor.getPipeline()
       this.protocolMap.put(ProtocolType.STOMP, new StompProtocolManagerFactory().createProtocolManager(server,
                                                                                                        interceptors));
-      this.protocolMap.put(ProtocolType.STOMP_WS, new StompProtocolManagerFactory().createProtocolManager(server, interceptors));
+      this.protocolMap.put(ProtocolType.STOMP_WS, new StompProtocolManagerFactory().createProtocolManager(server,
+                                                                                                          interceptors));
    }
 
    // RemotingService implementation -------------------------------
@@ -144,15 +145,14 @@
       {
          return;
       }
-      
-      ClassLoader tccl =
-         AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
+
+      ClassLoader tccl = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
+      {
+         public ClassLoader run()
          {
-            public ClassLoader run()
-            {
-               return Thread.currentThread().getContextClassLoader();
-            }
-         });
+            return Thread.currentThread().getContextClassLoader();
+         }
+      });
 
       // The remoting service maintains it's own thread pool for handling remoting traffic
       // If OIO each connection will have it's own thread
@@ -161,7 +161,8 @@
       // to support many hundreds of connections, but the main thread pool must be kept small for better performance
 
       ThreadFactory tFactory = new HornetQThreadFactory("HornetQ-remoting-threads" + System.identityHashCode(this),
-                                                        false, tccl);
+                                                        false,
+                                                        tccl);
 
       threadPool = Executors.newCachedThreadPool(tFactory);
 
@@ -322,6 +323,8 @@
       }
       else
       {
+         log.info("failed to remove connection");
+
          return null;
       }
    }
@@ -388,7 +391,7 @@
 
          for (FailureListener listener : failureListeners)
          {
-            if (listener instanceof ServerSessionPacketHandler)
+            if (listener instanceof ServerSessionImpl)
             {
                empty = false;
 
@@ -528,9 +531,12 @@
                RemotingConnection conn = removeConnection(id);
 
                HornetQException me = new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
-                                                          "Did not receive ping from " + conn.getRemoteAddress() +
+                                                          "Did not receive data from " + conn.getRemoteAddress() +
                                                                    ". It is likely the client has exited or crashed without " +
-                                                                   "closing its connection, or the network between the server and client has failed. The connection will now be closed.");
+                                                                   "closing its connection, or the network between the server and client has failed. " +
+                                                                   "You also might have configured connection-ttl and client-failure-check-period incorrectly. " +
+                                                                   "Please check user manual for more information." +
+                                                                   " The connection will now be closed.");
                conn.fail(me);
             }
 

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-10-06 08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-10-06 11:24:44 UTC (rev 9758)
@@ -73,7 +73,7 @@
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
  */
-public class ServerSessionImpl implements ServerSession, FailureListener
+public class ServerSessionImpl implements ServerSession , FailureListener
 {
    // Constants -----------------------------------------------------------------------------
 

Modified: trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java	2010-10-06 08:18:15 UTC (rev 9757)
+++ trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java	2010-10-06 11:24:44 UTC (rev 9758)
@@ -77,14 +77,22 @@
     * @return true if removed
     */
    boolean removeCloseListener(CloseListener listener);
-
+   
+   List<CloseListener> removeCloseListeners();
+   
+   void setCloseListeners(List<CloseListener> listeners);
+   
+   
    /**
     * return all the failure listeners
     *
     * @return the listeners
     */
    List<FailureListener> getFailureListeners();
+   
+   List<FailureListener> removeFailureListeners();
 
+
    /**
     * set the failure listeners.
     * <p/>

Added: trunk/tests/src/org/hornetq/tests/integration/stomp/StompConnectionCleanupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompConnectionCleanupTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompConnectionCleanupTest.java	2010-10-06 11:24:44 UTC (rev 9758)
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.stomp;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+
+import junit.framework.Assert;
+
+import org.hornetq.core.protocol.stomp.Stomp;
+import org.hornetq.jms.server.JMSServerManager;
+
+/**
+ * A StompConnectionCleanupTest
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class StompConnectionCleanupTest extends StompTestBase
+{
+   private static final long CONNECTION_TTL = 2000;
+
+   public void testConnectionCleanup() throws Exception
+   {
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
+      frame = receiveFrame(10000);
+      
+      //We send and consume a message to ensure a STOMP connection and server session is created
+
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+      frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+      sendFrame(frame);
+
+      frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+      sendFrame(frame);
+
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("MESSAGE"));
+      Assert.assertTrue(frame.indexOf("destination:") > 0);
+    
+      // Now we wait until the connection is cleared on the server, which will happen some time after ttl, since no data
+      // is being sent
+
+      long start = System.currentTimeMillis();
+
+      while (true)
+      {
+         int connCount = server.getHornetQServer().getRemotingService().getConnections().size();
+
+         int sessionCount = server.getHornetQServer().getSessions().size();
+         
+         // All connections and sessions should be timed out including STOMP + JMS connection
+
+         if (connCount == 0 && sessionCount == 0)
+         {
+            break;
+         }
+         
+         Thread.sleep(10);
+
+         if (System.currentTimeMillis() - start > 10000)
+         {
+            fail("Timed out waiting for connection to be cleared up");
+         }
+      }      
+   }
+   
+   public void testConnectionNotCleanedUp() throws Exception
+   {
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
+      frame = receiveFrame(10000);
+      
+      //We send and consume a message to ensure a STOMP connection and server session is created
+
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+      MessageConsumer consumer = session.createConsumer(queue);
+      
+      long time = CONNECTION_TTL * 3;
+      
+      long start = System.currentTimeMillis();
+      
+      //Send msgs for an amount of time > connection_ttl to make sure the connection is not closed
+      while (true)
+      {
+         //Send and receive a msg
+         
+         frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+         sendFrame(frame);
+
+         Message msg = consumer.receive(1000);
+         assertNotNull(msg);
+                
+         Thread.sleep(100);
+         
+         if (System.currentTimeMillis() - start > time)
+         {
+            break;
+         }
+      }
+    
+   }
+   
+   @Override
+   protected JMSServerManager createServer() throws Exception
+   {
+      JMSServerManager s = super.createServer();
+      
+      s.getHornetQServer().getConfiguration().setConnectionTTLOverride(CONNECTION_TTL);
+      
+      return s;
+   }
+}

Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-10-06 08:18:15 UTC (rev 9757)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-10-06 11:24:44 UTC (rev 9758)
@@ -19,77 +19,29 @@
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
 import java.net.SocketTimeoutException;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
-import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
 import javax.jms.TextMessage;
-import javax.jms.Topic;
 
 import junit.framework.Assert;
 
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.protocol.stomp.Stomp;
-import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
-import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
-import org.hornetq.core.remoting.impl.netty.TransportConstants;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.HornetQServers;
-import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.server.JMSServerManager;
-import org.hornetq.jms.server.config.JMSConfiguration;
-import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
-import org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl;
-import org.hornetq.jms.server.config.impl.TopicConfigurationImpl;
-import org.hornetq.jms.server.impl.JMSServerManagerImpl;
-import org.hornetq.spi.core.protocol.ProtocolType;
-import org.hornetq.tests.unit.util.InVMContext;
-import org.hornetq.tests.util.UnitTestCase;
 
-public class StompTest extends UnitTestCase
+public class StompTest extends StompTestBase
 {
    private static final transient Logger log = Logger.getLogger(StompTest.class);
 
-   private int port = 61613;
-
-   private Socket stompSocket;
-
-   private ByteArrayOutputStream inputBuffer;
-
-   private ConnectionFactory connectionFactory;
-
-   private Connection connection;
-
-   private Session session;
-
-   private Queue queue;
-
-   private Topic topic;
-
-   private JMSServerManager server;
-   
    public void testSendManyMessages() throws Exception
    {
       MessageConsumer consumer = session.createConsumer(queue);
@@ -106,7 +58,7 @@
 
          public void onMessage(Message arg0)
          {
-            //System.out.println("<<< " + (1000 - latch.getCount()));
+            // System.out.println("<<< " + (1000 - latch.getCount()));
             latch.countDown();
          }
       });
@@ -115,7 +67,7 @@
       for (int i = 1; i <= count; i++)
       {
          // Thread.sleep(1);
-         //System.out.println(">>> " + i);
+         // System.out.println(">>> " + i);
          sendFrame(frame);
       }
 
@@ -191,14 +143,14 @@
       TextMessage message = (TextMessage)consumer.receive(1000);
       Assert.assertNotNull(message);
       Assert.assertEquals("Hello World", message.getText());
-      
+
       // Make sure that the timestamp is valid - should
       // be very close to the current time.
       long tnow = System.currentTimeMillis();
       long tmsg = message.getJMSTimestamp();
       Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
    }
-   
+
    /*
     * Some STOMP clients erroneously put a new line \n *after* the terminating NUL char at the end of the frame
     * This means next frame read might have a \n a the beginning.
@@ -215,14 +167,20 @@
       frame = receiveFrame(10000);
       Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-      frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL + "\n";
+      frame = "SEND\n" + "destination:" +
+              getQueuePrefix() +
+              getQueueName() +
+              "\n\n" +
+              "Hello World" +
+              Stomp.NULL +
+              "\n";
 
       sendFrame(frame);
 
       TextMessage message = (TextMessage)consumer.receive(1000);
       Assert.assertNotNull(message);
       Assert.assertEquals("Hello World", message.getText());
-      
+
       // Make sure that the timestamp is valid - should
       // be very close to the current time.
       long tnow = System.currentTimeMillis();
@@ -258,7 +216,7 @@
       TextMessage message = (TextMessage)consumer.receive(1000);
       Assert.assertNotNull(message);
       Assert.assertEquals("Hello World", message.getText());
-      
+
       // Make sure that the timestamp is valid - should
       // be very close to the current time.
       long tnow = System.currentTimeMillis();
@@ -1279,210 +1237,4 @@
       frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
       sendFrame(frame);
    }
-
-   // Implementation methods
-   // -------------------------------------------------------------------------
-   protected void setUp() throws Exception
-   {
-      super.setUp();
-
-      server = createServer();
-      server.start();
-      connectionFactory = createConnectionFactory();
-
-      stompSocket = createSocket();
-      inputBuffer = new ByteArrayOutputStream();
-
-      connection = connectionFactory.createConnection();
-      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      queue = session.createQueue(getQueueName());
-      topic = session.createTopic(getTopicName());
-      connection.start();
-   }
-
-   /**
-   * @return
-   * @throws Exception 
-   */
-   private JMSServerManager createServer() throws Exception
-   {
-      Configuration config = new ConfigurationImpl();
-      config.setSecurityEnabled(false);
-      config.setPersistenceEnabled(false);
-
-      Map<String, Object> params = new HashMap<String, Object>();
-      params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP.toString());
-      params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
-      TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
-      config.getAcceptorConfigurations().add(stompTransport);
-      config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
-      HornetQServer hornetQServer = HornetQServers.newHornetQServer(config);
-
-      JMSConfiguration jmsConfig = new JMSConfigurationImpl();
-      jmsConfig.getQueueConfigurations()
-               .add(new JMSQueueConfigurationImpl(getQueueName(), null, false, getQueueName()));
-      jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl(getTopicName(), getTopicName()));
-      server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
-      server.setContext(new InVMContext());
-      return server;
-   }
-
-   protected void tearDown() throws Exception
-   {
-      connection.close();
-      if (stompSocket != null)
-      {
-         stompSocket.close();
-      }
-      server.stop();
-
-      super.tearDown();
-   }
-
-   protected void reconnect() throws Exception
-   {
-      reconnect(0);
-   }
-
-   protected void reconnect(long sleep) throws Exception
-   {
-      stompSocket.close();
-
-      if (sleep > 0)
-      {
-         Thread.sleep(sleep);
-      }
-
-      stompSocket = createSocket();
-      inputBuffer = new ByteArrayOutputStream();
-   }
-
-   protected ConnectionFactory createConnectionFactory()
-   {
-      return new HornetQConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
-   }
-
-   protected Socket createSocket() throws IOException
-   {
-      return new Socket("127.0.0.1", port);
-   }
-
-   protected String getQueueName()
-   {
-      return "test";
-   }
-
-   protected String getQueuePrefix()
-   {
-      return "jms.queue.";
-   }
-
-   protected String getTopicName()
-   {
-      return "testtopic";
-   }
-
-   protected String getTopicPrefix()
-   {
-      return "jms.topic.";
-   }
-
-   public void sendFrame(String data) throws Exception
-   {
-      byte[] bytes = data.getBytes("UTF-8");
-      OutputStream outputStream = stompSocket.getOutputStream();
-      for (int i = 0; i < bytes.length; i++)
-      {
-         outputStream.write(bytes[i]);
-      }
-      outputStream.flush();
-   }
-
-   public void sendFrame(byte[] data) throws Exception
-   {
-      OutputStream outputStream = stompSocket.getOutputStream();
-      for (int i = 0; i < data.length; i++)
-      {
-         outputStream.write(data[i]);
-      }
-      outputStream.flush();
-   }
-
-   public String receiveFrame(long timeOut) throws Exception
-   {
-      stompSocket.setSoTimeout((int)timeOut);
-      InputStream is = stompSocket.getInputStream();
-      int c = 0;
-      for (;;)
-      {
-         c = is.read();
-         if (c < 0)
-         {
-            throw new IOException("socket closed.");
-         }
-         else if (c == 0)
-         {
-            c = is.read();
-            if (c != '\n')
-            {
-               byte[] ba = inputBuffer.toByteArray();
-               System.out.println(new String(ba, "UTF-8"));
-            }
-            Assert.assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
-            byte[] ba = inputBuffer.toByteArray();
-            inputBuffer.reset();
-            return new String(ba, "UTF-8");
-         }
-         else
-         {
-            inputBuffer.write(c);
-         }
-      }
-   }
-
-   public void sendMessage(String msg) throws Exception
-   {
-      sendMessage(msg, queue);
-   }
-
-   public void sendMessage(String msg, Destination destination) throws Exception
-   {
-      MessageProducer producer = session.createProducer(destination);
-      TextMessage message = session.createTextMessage(msg);
-      producer.send(message);
-   }
-
-   public void sendMessage(byte[] data, Destination destination) throws Exception
-   {
-      sendMessage(data, "foo", "xyz", destination);
-   }
-
-   public void sendMessage(String msg, String propertyName, String propertyValue) throws Exception
-   {
-      sendMessage(msg.getBytes("UTF-8"), propertyName, propertyValue, queue);
-   }
-
-   public void sendMessage(byte[] data, String propertyName, String propertyValue, Destination destination) throws Exception
-   {
-      MessageProducer producer = session.createProducer(destination);
-      BytesMessage message = session.createBytesMessage();
-      message.setStringProperty(propertyName, propertyValue);
-      message.writeBytes(data);
-      producer.send(message);
-   }
-
-   protected void waitForReceipt() throws Exception
-   {
-      String frame = receiveFrame(50000);
-      assertNotNull(frame);
-      assertTrue(frame.indexOf("RECEIPT") > -1);
-   }
-
-   protected void waitForFrameToTakeEffect() throws InterruptedException
-   {
-      // bit of a dirty hack :)
-      // another option would be to force some kind of receipt to be returned
-      // from the frame
-      Thread.sleep(2000);
-   }
 }

Added: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java	2010-10-06 11:24:44 UTC (rev 9758)
@@ -0,0 +1,290 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.hornetq.core.remoting.impl.netty.TransportConstants;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.config.JMSConfiguration;
+import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
+import org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl;
+import org.hornetq.jms.server.config.impl.TopicConfigurationImpl;
+import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.tests.unit.util.InVMContext;
+import org.hornetq.tests.util.UnitTestCase;
+
+public abstract class StompTestBase extends UnitTestCase
+{
+   private static final transient Logger log = Logger.getLogger(StompTestBase.class);
+
+   private int port = 61613;
+
+   private Socket stompSocket;
+
+   private ByteArrayOutputStream inputBuffer;
+
+   private ConnectionFactory connectionFactory;
+
+   private Connection connection;
+
+   protected Session session;
+
+   protected Queue queue;
+
+   protected Topic topic;
+
+   protected JMSServerManager server;
+   
+   
+
+   // Implementation methods
+   // -------------------------------------------------------------------------
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      server = createServer();
+      server.start();
+      connectionFactory = createConnectionFactory();
+
+      stompSocket = createSocket();
+      inputBuffer = new ByteArrayOutputStream();
+
+      connection = connectionFactory.createConnection();
+      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      queue = session.createQueue(getQueueName());
+      topic = session.createTopic(getTopicName());
+      connection.start();
+   }
+
+   /**
+   * @return
+   * @throws Exception 
+   */
+   protected JMSServerManager createServer() throws Exception
+   {
+      Configuration config = new ConfigurationImpl();
+      config.setSecurityEnabled(false);
+      config.setPersistenceEnabled(false);
+
+      Map<String, Object> params = new HashMap<String, Object>();
+      params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP.toString());
+      params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
+      TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
+      config.getAcceptorConfigurations().add(stompTransport);
+      config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+      HornetQServer hornetQServer = HornetQServers.newHornetQServer(config);
+
+      JMSConfiguration jmsConfig = new JMSConfigurationImpl();
+      jmsConfig.getQueueConfigurations()
+               .add(new JMSQueueConfigurationImpl(getQueueName(), null, false, getQueueName()));
+      jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl(getTopicName(), getTopicName()));
+      server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
+      server.setContext(new InVMContext());
+      return server;
+   }
+
+   protected void tearDown() throws Exception
+   {
+      connection.close();
+      if (stompSocket != null)
+      {
+         stompSocket.close();
+      }
+      server.stop();
+
+      super.tearDown();
+   }
+
+   protected void reconnect() throws Exception
+   {
+      reconnect(0);
+   }
+
+   protected void reconnect(long sleep) throws Exception
+   {
+      stompSocket.close();
+
+      if (sleep > 0)
+      {
+         Thread.sleep(sleep);
+      }
+
+      stompSocket = createSocket();
+      inputBuffer = new ByteArrayOutputStream();
+   }
+
+   protected ConnectionFactory createConnectionFactory()
+   {
+      return new HornetQConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+   }
+
+   protected Socket createSocket() throws IOException
+   {
+      return new Socket("127.0.0.1", port);
+   }
+
+   protected String getQueueName()
+   {
+      return "test";
+   }
+
+   protected String getQueuePrefix()
+   {
+      return "jms.queue.";
+   }
+
+   protected String getTopicName()
+   {
+      return "testtopic";
+   }
+
+   protected String getTopicPrefix()
+   {
+      return "jms.topic.";
+   }
+
+   public void sendFrame(String data) throws Exception
+   {
+      byte[] bytes = data.getBytes("UTF-8");
+      OutputStream outputStream = stompSocket.getOutputStream();
+      for (int i = 0; i < bytes.length; i++)
+      {
+         outputStream.write(bytes[i]);
+      }
+      outputStream.flush();
+   }
+
+   public void sendFrame(byte[] data) throws Exception
+   {
+      OutputStream outputStream = stompSocket.getOutputStream();
+      for (int i = 0; i < data.length; i++)
+      {
+         outputStream.write(data[i]);
+      }
+      outputStream.flush();
+   }
+
+   public String receiveFrame(long timeOut) throws Exception
+   {
+      stompSocket.setSoTimeout((int)timeOut);
+      InputStream is = stompSocket.getInputStream();
+      int c = 0;
+      for (;;)
+      {
+         c = is.read();
+         if (c < 0)
+         {
+            throw new IOException("socket closed.");
+         }
+         else if (c == 0)
+         {
+            c = is.read();
+            if (c != '\n')
+            {
+               byte[] ba = inputBuffer.toByteArray();
+               System.out.println(new String(ba, "UTF-8"));
+            }
+            Assert.assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
+            byte[] ba = inputBuffer.toByteArray();
+            inputBuffer.reset();
+            return new String(ba, "UTF-8");
+         }
+         else
+         {
+            inputBuffer.write(c);
+         }
+      }
+   }
+
+   public void sendMessage(String msg) throws Exception
+   {
+      sendMessage(msg, queue);
+   }
+
+   public void sendMessage(String msg, Destination destination) throws Exception
+   {
+      MessageProducer producer = session.createProducer(destination);
+      TextMessage message = session.createTextMessage(msg);
+      producer.send(message);
+   }
+
+   public void sendMessage(byte[] data, Destination destination) throws Exception
+   {
+      sendMessage(data, "foo", "xyz", destination);
+   }
+
+   public void sendMessage(String msg, String propertyName, String propertyValue) throws Exception
+   {
+      sendMessage(msg.getBytes("UTF-8"), propertyName, propertyValue, queue);
+   }
+
+   public void sendMessage(byte[] data, String propertyName, String propertyValue, Destination destination) throws Exception
+   {
+      MessageProducer producer = session.createProducer(destination);
+      BytesMessage message = session.createBytesMessage();
+      message.setStringProperty(propertyName, propertyValue);
+      message.writeBytes(data);
+      producer.send(message);
+   }
+
+   protected void waitForReceipt() throws Exception
+   {
+      String frame = receiveFrame(50000);
+      assertNotNull(frame);
+      assertTrue(frame.indexOf("RECEIPT") > -1);
+   }
+
+   protected void waitForFrameToTakeEffect() throws InterruptedException
+   {
+      // bit of a dirty hack :)
+      // another option would be to force some kind of receipt to be returned
+      // from the frame
+      Thread.sleep(2000);
+   }
+}



More information about the hornetq-commits mailing list