[hornetq-commits] JBoss hornetq SVN: r8898 - in trunk: tests/src/org/hornetq/tests/concurrent and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Feb 24 04:36:54 EST 2010


Author: jmesnil
Date: 2010-02-24 04:36:53 -0500 (Wed, 24 Feb 2010)
New Revision: 8898

Added:
   trunk/tests/src/org/hornetq/tests/concurrent/stomp/
   trunk/tests/src/org/hornetq/tests/concurrent/stomp/ConcurrentStompTest.java
Modified:
   trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0

* added concurrent test with 1 socket sending Stomp messages and another socket receiving them

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-02-23 16:03:09 UTC (rev 8897)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-02-24 09:36:53 UTC (rev 8898)
@@ -141,14 +141,20 @@
 
    public void handleBuffer(final RemotingConnection connection, final HornetQBuffer buffer)
    {
-      try
+      executor.execute(new Runnable()
       {
-         doHandleBuffer(connection, buffer);
-      } 
-      finally
-      {
-         server.getStorageManager().clearContext();
-      }
+         public void run()
+         {
+            try
+            {
+               doHandleBuffer(connection, buffer);
+            } 
+            finally
+            {
+               server.getStorageManager().clearContext();
+            }
+         }
+      });
    }
    
    private void doHandleBuffer(final RemotingConnection connection, final HornetQBuffer buffer)
@@ -542,13 +548,7 @@
 
          public void done()
          {
-            executor.execute(new Runnable()
-            {
-               public void run()
-               {
-                  doSend(connection, frame);
-               }
-            });
+            doSend(connection, frame);
          }
       });
    }

Added: trunk/tests/src/org/hornetq/tests/concurrent/stomp/ConcurrentStompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/concurrent/stomp/ConcurrentStompTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/concurrent/stomp/ConcurrentStompTest.java	2010-02-24 09:36:53 UTC (rev 8898)
@@ -0,0 +1,218 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.concurrent.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.CoreQueueConfiguration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.stomp.Stomp;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.integration.transports.netty.TransportConstants;
+import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.tests.util.UnitTestCase;
+
+public class ConcurrentStompTest extends UnitTestCase
+{
+   private static final transient Logger log = Logger.getLogger(ConcurrentStompTest.class);
+
+   private int port = 61613;
+
+   private Socket stompSocket;
+
+   private ByteArrayOutputStream inputBuffer;
+
+   private Socket stompSocket_2;
+
+   private ByteArrayOutputStream inputBuffer_2;
+
+   private HornetQServer server;
+
+   /**
+    * Sends messages on one socket and concurrently receives them on another socket.
+    */
+   public void testSendManyMessages() throws Exception
+   {
+      String connect = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+
+      sendFrame(stompSocket, connect);
+      String connected = receiveFrame(stompSocket, inputBuffer, 10000);
+      Assert.assertTrue(connected.startsWith("CONNECTED"));
+      
+      sendFrame(stompSocket_2, connect);
+      connected = receiveFrame(stompSocket_2, inputBuffer_2, 10000);
+      Assert.assertTrue(connected.startsWith("CONNECTED"));
+      
+      final int count = 1000;
+      final CountDownLatch latch = new CountDownLatch(count);
+      
+      String subscribe = 
+         "SUBSCRIBE\n" +
+         "destination:" + getQueueName() + "\n" +
+         "ack:auto\n\n" +
+         Stomp.NULL;
+      sendFrame(stompSocket_2, subscribe);
+      Thread.sleep(2000);
+      
+      new Thread()
+      {
+         public void run()
+         {
+            int i = 0;
+            while (true)
+            {
+               try
+               {
+                  String frame = receiveFrame(stompSocket_2, inputBuffer_2, 10000);
+                  Assert.assertTrue(frame.startsWith("MESSAGE"));
+                  Assert.assertTrue(frame.indexOf("destination:") > 0);
+                  System.out.println("<<< " + i++);
+                  latch.countDown();
+               }
+               catch (Exception e)
+               {
+                  break;
+               }
+            }
+         };
+      }.start();
+
+      String send = "SEND\n" + "destination:" + getQueueName() + "\n";
+      for (int i = 1; i <= count; i++)
+      {
+         // Thread.sleep(1);
+         System.out.println(">>> " + i);
+         sendFrame(stompSocket, send + "count:" + i + "\n\n" + Stomp.NULL);
+      }
+
+      assertTrue(latch.await(60, TimeUnit.SECONDS));
+
+   }
+
+   // Implementation methods
+   // -------------------------------------------------------------------------
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      server = createServer();
+      server.start();
+
+      stompSocket = createSocket();
+      inputBuffer = new ByteArrayOutputStream();
+      stompSocket_2 = createSocket();
+      inputBuffer_2 = new ByteArrayOutputStream();
+
+   }
+
+   private HornetQServer createServer() throws Exception
+   {
+      Configuration config = new ConfigurationImpl();
+      config.setSecurityEnabled(false);
+      config.setPersistenceEnabled(false);
+
+      Map<String, Object> params = new HashMap<String, Object>();
+      params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP.toString());
+      params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
+      TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
+      config.getAcceptorConfigurations().add(stompTransport);
+      config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+      config.getQueueConfigurations().add(new CoreQueueConfiguration(getQueueName(), getQueueName(), null, false));
+      return HornetQServers.newHornetQServer(config);
+   }
+
+   protected void tearDown() throws Exception
+   {
+      if (stompSocket != null)
+      {
+         stompSocket.close();
+      }
+
+      if (stompSocket_2 != null)
+      {
+         stompSocket_2.close();
+      }
+      server.stop();
+
+      super.tearDown();
+   }
+
+   protected Socket createSocket() throws IOException
+   {
+      return new Socket("127.0.0.1", port);
+   }
+
+   protected String getQueueName()
+   {
+      return "test";
+   }
+
+   public void sendFrame(Socket socket, String data) throws Exception
+   {
+      byte[] bytes = data.getBytes("UTF-8");
+      OutputStream outputStream = socket.getOutputStream();
+      for (int i = 0; i < bytes.length; i++)
+      {
+         outputStream.write(bytes[i]);
+      }
+      outputStream.flush();
+   }
+
+   public String receiveFrame(Socket socket, ByteArrayOutputStream inputBuffer, long timeOut) throws Exception
+   {
+      socket.setSoTimeout((int)timeOut);
+      InputStream is = socket.getInputStream();
+      int c = 0;
+      for (;;)
+      {
+         c = is.read();
+         if (c < 0)
+         {
+            throw new IOException("socket closed.");
+         }
+         else if (c == 0)
+         {
+            c = is.read();
+            if (c != '\n')
+            {
+               byte[] ba = inputBuffer.toByteArray();
+               System.out.println(new String(ba, "UTF-8"));
+            }
+            Assert.assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
+            byte[] ba = inputBuffer.toByteArray();
+            inputBuffer.reset();
+            return new String(ba, "UTF-8");
+         }
+         else
+         {
+            inputBuffer.write(c);
+         }
+      }
+   }
+}



More information about the hornetq-commits mailing list