[hornetq-commits] JBoss hornetq SVN: r8848 - in trunk: tests/src/org/hornetq/tests/integration/stomp and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Jan 27 10:15:43 EST 2010


Author: jmesnil
Date: 2010-01-27 10:15:42 -0500 (Wed, 27 Jan 2010)
New Revision: 8848

Modified:
   trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
   trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0

* create a HornetQ queue when a subcription to a topic is made and create
  a consumer on this queue.

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-01-27 12:23:11 UTC (rev 8847)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-01-27 15:15:42 UTC (rev 8848)
@@ -189,15 +189,20 @@
 
          if (response != null)
          {
-            send(connection, response);
+            send(conn, response);
          }
+         
+         if (Stomp.Commands.DISCONNECT.equals(command))
+         {
+            conn.destroy();
+         }
       }
       catch (Exception e)
       {
          StompFrame error = createError(e, request);
          if (error != null)
          {
-            send(connection, error);
+            send(conn, error);
          }
       }
    }
@@ -235,6 +240,7 @@
          }
          subscriptionID = "subscription/" + destination;
       }
+      String hornetqDestination = StompUtils.toHornetQAddress(destination);
       StompSession stompSession = getSession(connection);
       if (stompSession.containsSubscription(subscriptionID))
       {
@@ -242,7 +248,7 @@
                                   ". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
       }
       long consumerID = server.getStorageManager().generateUniqueID();
-      stompSession.addSubscription(consumerID, subscriptionID, destination, selector, ack);
+      stompSession.addSubscription(consumerID, subscriptionID, hornetqDestination, selector, ack);
 
       return null;
    }
@@ -284,12 +290,9 @@
       StompSession stompSession = null;
       if (txID != null)
       {
-         throw new StompException("transactional ACK are not supported");
+         log.warn("Transactional acknowledgement is not supported");
       }
-      else
-      {
-         stompSession = getSession(connection);
-      }
+      stompSession = getSession(connection);
       stompSession.acknowledge(messageID);
       
       return null;
@@ -508,12 +511,12 @@
       return new StompFrame(Stomp.Responses.CONNECTED, h, StompMarshaller.NO_DATA);
    }
 
-   public int send(RemotingConnection connection, StompFrame frame)
+   public int send(StompConnection connection, StompFrame frame)
    {
       System.out.println(">>> " + frame);
       synchronized (connection)
       {
-         if (connection.isDestroyed())
+         if (connection.isDestroyed() || !connection.isValid())
          {
             log.warn("Connection closed " + connection);
             return 0;

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2010-01-27 12:23:11 UTC (rev 8847)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2010-01-27 15:15:42 UTC (rev 8848)
@@ -26,6 +26,7 @@
 import org.hornetq.core.server.ServerSession;
 import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.spi.core.protocol.SessionCallback;
+import org.hornetq.utils.UUIDGenerator;
 
 /**
  * A StompSession
@@ -146,17 +147,23 @@
 
    public void addSubscription(long consumerID, String subscriptionID, String destination, String selector, String ack) throws Exception
    {
-      String queue = StompUtils.toHornetQAddress(destination);
-         session.createConsumer(consumerID,
-                                SimpleString.toSimpleString(queue),
-                                SimpleString.toSimpleString(selector),
-                                false);
-         session.receiveConsumerCredits(consumerID, -1);
-         StompSubscription subscription = new StompSubscription(subscriptionID, destination, ack);
-         subscriptions.put(consumerID, subscription);
-         // FIXME not very smart: since we can't start the consumer, we start the session
-         // everytime to start the new consumer (and all previous consumers...)
-         session.start();
+      SimpleString queue = SimpleString.toSimpleString(destination);
+      if (destination.startsWith(StompUtils.HQ_TOPIC_PREFIX))
+      {
+         //subscribes to a topic
+         queue = UUIDGenerator.getInstance().generateSimpleStringUUID();
+         session.createQueue(SimpleString.toSimpleString(destination), queue, null, true, false);
+      }
+      session.createConsumer(consumerID,
+                             queue,
+                             SimpleString.toSimpleString(selector),
+                             false);
+      session.receiveConsumerCredits(consumerID, -1);
+      StompSubscription subscription = new StompSubscription(subscriptionID, destination, ack);
+      subscriptions.put(consumerID, subscription);
+      // FIXME not very smart: since we can't start the consumer, we start the session
+      // every time to start the new consumer (and all previous consumers...)
+      session.start();
    }
 
    public boolean unsubscribe(String id) throws Exception

Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-01-27 12:23:11 UTC (rev 8847)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-01-27 15:15:42 UTC (rev 8848)
@@ -32,15 +32,16 @@
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.Topic;
 
 import junit.framework.Assert;
-import junit.framework.TestCase;
 
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.core.config.Configuration;
@@ -58,6 +59,7 @@
 import org.hornetq.jms.server.config.JMSConfiguration;
 import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
 import org.hornetq.jms.server.config.impl.QueueConfigurationImpl;
+import org.hornetq.jms.server.config.impl.TopicConfigurationImpl;
 import org.hornetq.jms.server.impl.JMSServerManagerImpl;
 import org.hornetq.spi.core.protocol.ProtocolType;
 import org.hornetq.tests.util.UnitTestCase;
@@ -71,6 +73,7 @@
     private Connection connection;
     private Session session;
     private Queue queue;
+    private Topic topic;
     private JMSServerManager server;
 
     public void testConnect() throws Exception {
@@ -101,10 +104,12 @@
                   "destination:/queue/" + getQueueName() + "\n\n" +
                   "Hello World" +
                   Stomp.NULL;
-       sendFrame(frame);
-
-       f = receiveFrame(10000);
-       Assert.assertTrue(f.startsWith("ERROR"));
+       try {
+          sendFrame(frame);
+          Assert.fail("the socket must have been closed when the server handled the DISCONNECT");
+       } catch (IOException e)
+       {
+       }
    }
 
 
@@ -599,7 +604,7 @@
        sendMessage(getName());
        frame = receiveFrame(10000);
        Assert.assertTrue(frame.startsWith("MESSAGE"));
-       Pattern cl = Pattern.compile("message-id:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
+       Pattern cl = Pattern.compile("message-id:\\s*(\\S+)", Pattern.CASE_INSENSITIVE);
        Matcher cl_matcher = cl.matcher(frame);
        Assert.assertTrue(cl_matcher.find());
        String messageID = cl_matcher.group(1);
@@ -1069,7 +1074,137 @@
         Assert.assertNotNull(message);
         Assert.assertEquals("second message", message.getText().trim());
     }
+    
+    public void testSubscribeToTopic() throws Exception {
 
+       String frame =
+               "CONNECT\n" +
+                       "login: brianm\n" +
+                       "passcode: wombats\n\n" +
+                       Stomp.NULL;
+       sendFrame(frame);
+
+       frame = receiveFrame(100000);
+       Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+       frame =
+               "SUBSCRIBE\n" +
+                       "destination:/topic/" + getTopicName() + "\n" +
+                       "\n\n" +
+                       Stomp.NULL;
+       sendFrame(frame);
+
+       sendMessage(getName(), topic);
+
+       frame = receiveFrame(10000);
+       Assert.assertTrue(frame.startsWith("MESSAGE"));
+       Assert.assertTrue(frame.indexOf("destination:") > 0);
+       Assert.assertTrue(frame.indexOf(getName()) > 0);
+
+       frame =
+          "UNSUBSCRIBE\n" +
+                  "destination:/topic/" + getTopicName() + "\n" +
+                  "\n\n" +
+                  Stomp.NULL;
+       sendFrame(frame);
+  
+       sendMessage(getName(), topic);
+
+       try {
+          frame = receiveFrame(1000);
+          log.info("Received frame: " + frame);
+          Assert.fail("No message should have been received since subscription was removed");
+      }
+      catch (SocketTimeoutException e) {
+
+      }
+       
+      frame =
+               "DISCONNECT\n" +
+                       "\n\n" +
+                       Stomp.NULL;
+       sendFrame(frame);
+   }
+    
+    public void testClientAckNotPartOfTransaction() throws Exception {
+
+       String frame =
+               "CONNECT\n" +
+                       "login: brianm\n" +
+                       "passcode: wombats\n\n" +
+                       Stomp.NULL;
+       sendFrame(frame);
+
+       frame = receiveFrame(100000);
+       Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+       frame =
+               "SUBSCRIBE\n" +
+                       "destination:/queue/" + getQueueName() + "\n" +
+                       "ack:client\n" +
+                       "\n\n" +
+                       Stomp.NULL;
+       sendFrame(frame);
+
+       sendMessage(getName());
+
+       frame = receiveFrame(10000);
+       System.out.println(frame);
+       Assert.assertTrue(frame.startsWith("MESSAGE"));
+       Assert.assertTrue(frame.indexOf("destination:") > 0);
+       Assert.assertTrue(frame.indexOf(getName()) > 0);
+       Assert.assertTrue(frame.indexOf("message-id:") > 0);
+       Pattern cl = Pattern.compile("message-id:\\s*(\\S+)", Pattern.CASE_INSENSITIVE);
+       Matcher cl_matcher = cl.matcher(frame);
+       Assert.assertTrue(cl_matcher.find());
+       String messageID = cl_matcher.group(1);
+       System.out.println(messageID);
+
+       frame =
+          "BEGIN\n" +
+                  "transaction: tx1\n" +
+                  "\n\n" +
+                  Stomp.NULL;
+       sendFrame(frame);
+
+       frame =
+          "ACK\n" +
+                  "message-id:" + messageID + "\n" +
+                  "transaction: tx1\n" +
+                  "\n" +
+                  "second message" +
+                  Stomp.NULL;
+       sendFrame(frame);
+
+       frame =
+          "ABORT\n" +
+                  "transaction: tx1\n" +
+                  "\n\n" +
+                  Stomp.NULL;
+       sendFrame(frame);
+  
+       try {
+           frame = receiveFrame(1000);
+           log.info("Received frame: " + frame);
+           Assert.fail("No message should have been received as the message was acked even though the transaction has been aborted");
+       }
+       catch (SocketTimeoutException e) {
+       }
+       
+       frame =
+          "UNSUBSCRIBE\n" +
+                  "destination:/topic/" + getQueueName() + "\n" +
+                  "\n\n" +
+                  Stomp.NULL;
+       sendFrame(frame);
+       
+      frame =
+               "DISCONNECT\n" +
+                       "\n\n" +
+                       Stomp.NULL;
+       sendFrame(frame);
+   }
+   
     // Implementation methods
     //-------------------------------------------------------------------------
     protected void setUp() throws Exception {
@@ -1083,6 +1218,7 @@
         connection = connectionFactory.createConnection();
         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         queue = session.createQueue(getQueueName());
+        topic = session.createTopic(getTopicName());
         connection.start();
     }
 
@@ -1106,6 +1242,7 @@
        
        JMSConfiguration jmsConfig = new JMSConfigurationImpl();
        jmsConfig.getQueueConfigurations().add(new QueueConfigurationImpl(getQueueName(), null, false, getQueueName()));
+       jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl(getTopicName(), getTopicName()));
        server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
        server.setContext(null);
        return server;
@@ -1145,6 +1282,10 @@
         return "test";
     }
 
+    protected String getTopicName() {
+       return "testtopic";
+   }
+    
     public void sendFrame(String data) throws Exception {
         byte[] bytes = data.getBytes("UTF-8");
         OutputStream outputStream = stompSocket.getOutputStream();
@@ -1182,11 +1323,19 @@
     }
 
     public void sendMessage(String msg) throws Exception {
-        sendMessage(msg, "foo", "xyz");
+       sendMessage(msg, "foo", "xyz", queue);
+   }
+
+    public void sendMessage(String msg, Destination destination) throws Exception {
+        sendMessage(msg, "foo", "xyz", destination);
     }
 
     public void sendMessage(String msg, String propertyName, String propertyValue) throws JMSException {
-        MessageProducer producer = session.createProducer(queue);
+       sendMessage(msg, propertyName, propertyValue, queue);
+    }
+    
+    public void sendMessage(String msg, String propertyName, String propertyValue, Destination destination) throws JMSException {
+        MessageProducer producer = session.createProducer(destination);
         TextMessage message = session.createTextMessage(msg);
         message.setStringProperty(propertyName, propertyValue);
         producer.send(message);



More information about the hornetq-commits mailing list