[hornetq-commits] JBoss hornetq SVN: r8845 - in trunk: tests/src/org/hornetq/tests/integration/stomp and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Jan 26 10:27:22 EST 2010


Author: jmesnil
Date: 2010-01-26 10:27:21 -0500 (Tue, 26 Jan 2010)
New Revision: 8845

Modified:
   trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
   trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0

* added checks and headers support

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2010-01-26 13:31:50 UTC (rev 8844)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2010-01-26 15:27:21 UTC (rev 8845)
@@ -22,7 +22,6 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.CloseListener;
 import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.spi.core.protocol.ProtocolManager;
 import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.spi.core.remoting.Connection;
 

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-01-26 13:31:50 UTC (rev 8844)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-01-26 15:27:21 UTC (rev 8845)
@@ -19,12 +19,13 @@
 import java.io.PrintWriter;
 import java.io.UnsupportedEncodingException;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQBuffers;
-import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Interceptor;
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
@@ -81,7 +82,9 @@
             headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
          }
 
-         return new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
+         byte[] payload = baos.toByteArray();
+         headers.put(Stomp.Headers.CONTENT_LENGTH, payload.length);
+         return new StompFrame(Stomp.Responses.ERROR, headers, payload);
       }
       catch (UnsupportedEncodingException ex)
       {
@@ -175,12 +178,13 @@
 
          if (request.getHeaders().containsKey(Stomp.Headers.RECEIPT_REQUESTED))
          {
-            if (response == null) 
+            if (response == null)
             {
                Map<String, Object> h = new HashMap<String, Object>();
                response = new StompFrame(Stomp.Responses.RECEIPT, h, StompMarshaller.NO_DATA);
             }
-            response.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED));
+            response.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID,
+                                      request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED));
          }
 
          if (response != null)
@@ -206,52 +210,88 @@
 
    // Private -------------------------------------------------------
 
-   private StompFrame onSubscribe(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception,
-                                                                                                     StompException,
-                                                                                                     HornetQException
+   private StompFrame onSubscribe(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
    {
       Map<String, Object> headers = frame.getHeaders();
       String destination = (String)headers.get(Stomp.Headers.Subscribe.DESTINATION);
       String selector = (String)headers.get(Stomp.Headers.Subscribe.SELECTOR);
       String ack = (String)headers.get(Stomp.Headers.Subscribe.ACK_MODE);
-      String subID = (String)headers.get(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME);
+      String id = (String)headers.get(Stomp.Headers.Subscribe.ID);
 
       if (ack == null)
       {
          ack = Stomp.Headers.Subscribe.AckModeValues.AUTO;
       }
+      String subscriptionID = null;
+      if (id != null)
+      {
+         subscriptionID = id;
+      }
+      else
+      {
+         if (destination == null)
+         {
+            throw new StompException("Client must set destination or id header to a SUBSCRIBE command");
+         }
+         subscriptionID = "subscription/" + destination;
+      }
       StompSession stompSession = getSession(connection);
+      if (stompSession.containsSubscription(subscriptionID))
+      {
+         throw new StompException("There already is a subscription for: " + subscriptionID +
+                                  ". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
+      }
       long consumerID = server.getStorageManager().generateUniqueID();
-      stompSession.addSubscription(consumerID, subID, destination, selector, ack);
+      stompSession.addSubscription(consumerID, subscriptionID, destination, selector, ack);
 
       return null;
    }
 
-   private StompFrame onUnsubscribe(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception,
-                                                                                                       StompException,
-                                                                                                       HornetQException
+   private StompFrame onUnsubscribe(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
    {
       Map<String, Object> headers = frame.getHeaders();
       String destination = (String)headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
       String id = (String)headers.get(Stomp.Headers.Unsubscribe.ID);
 
+      String subscriptionID = null;
+      if (id != null)
+      {
+         subscriptionID = id;
+      }
+      else
+      {
+         if (destination == null)
+         {
+            throw new StompException("Must specify the subscription's id or the destination you are unsubscribing from");
+         }
+         subscriptionID = "subscription/" + destination;
+      }
+
       StompSession stompSession = getSession(connection);
-      stompSession.unsubscribe(destination);
-
+      boolean unsubscribed = stompSession.unsubscribe(subscriptionID);
+      if (!unsubscribed)
+      {
+         throw new StompException("Cannot unsubscribe as o subscription exists for id: " + subscriptionID);
+      }
       return null;
    }
 
-   private StompFrame onAck(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception,
-                                                                                               StompException,
-                                                                                               HornetQException
+   private StompFrame onAck(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
    {
       Map<String, Object> headers = frame.getHeaders();
-      String messageID = (String)headers.get(Stomp.Headers.Ack.MESSAGE_ID);
+      String messageID = (String)headers.get(Stomp.Headers.Ack.MESSAGE_ID);      
       String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
-
-      StompSession stompSession = getSession(connection);
+      StompSession stompSession = null;
+      if (txID != null)
+      {
+         throw new StompException("transactional ACK are not supported");
+      }
+      else
+      {
+         stompSession = getSession(connection);
+      }
       stompSession.acknowledge(messageID);
-
+      
       return null;
    }
 
@@ -259,24 +299,30 @@
    {
       Map<String, Object> headers = frame.getHeaders();
       String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
-
+      if (txID == null)
+      {
+         throw new StompException("transaction header is mandatory to BEGIN a transaction");
+      }
       if (transactedSessions.containsKey(txID))
       {
          throw new StompException("Transaction already started: " + txID);
       }
-      StompSession stompSession = getTransactedSession(connection, txID);
+      // create the transacted session
+      getTransactedSession(connection, txID);
+
       return null;
    }
 
-   private StompFrame onCommit(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception,
-                                                                                                  StompException,
-                                                                                                  HornetQException
+   private StompFrame onCommit(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
    {
       Map<String, Object> headers = frame.getHeaders();
       String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
+      if (txID == null)
+      {
+         throw new StompException("transaction header is mandatory to COMMIT a transaction");
+      }
 
       StompSession session = transactedSessions.remove(txID);
-
       if (session == null)
       {
          throw new StompException("No transaction started: " + txID);
@@ -287,12 +333,14 @@
       return null;
    }
 
-   private StompFrame onAbort(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception,
-                                                                                                    StompException,
-                                                                                                    HornetQException
+   private StompFrame onAbort(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
    {
       Map<String, Object> headers = frame.getHeaders();
       String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
+      if (txID == null)
+      {
+         throw new StompException("transaction header is mandatory to ABORT a transaction");
+      }
 
       StompSession session = transactedSessions.remove(txID);
 
@@ -361,7 +409,7 @@
 
    private StompFrame onDisconnect(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
    {
-      StompSession session = getSession(connection);
+      StompSession session = sessions.remove(connection);
       if (session != null)
       {
          try
@@ -373,9 +421,23 @@
          {
             throw new StompException(e.getMessage());
          }
-         sessions.remove(connection);
-         connection.setValid(false);
       }
+
+      // removed the transacted session belonging to the connection
+      Iterator<Entry<String, StompSession>> iterator = transactedSessions.entrySet().iterator();
+      while (iterator.hasNext())
+      {
+         Map.Entry<String, StompSession> entry = (Map.Entry<String, StompSession>)iterator.next();
+         if (entry.getValue().getConnection() == connection)
+         {
+            ServerSession serverSession = entry.getValue().getSession();
+            serverSession.rollback(true);
+            serverSession.close();
+            iterator.remove();
+         }
+      }
+      connection.setValid(false);
+
       return null;
    }
 
@@ -412,7 +474,8 @@
       if (txID == null)
       {
          session = getSession(connection).getSession();
-      } else
+      }
+      else
       {
          session = transactedSessions.get(txID).getSession();
       }
@@ -461,7 +524,7 @@
       }
    }
 
-   public synchronized void cleanup(StompConnection conn)
+   synchronized void cleanup(StompConnection conn)
    {
       StompSession session = sessions.remove(conn);
       if (session != null)

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2010-01-26 13:31:50 UTC (rev 8844)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2010-01-26 15:27:21 UTC (rev 8845)
@@ -70,9 +70,15 @@
    {
       try
       {
+         StompSubscription subscription = subscriptions.get(consumerID);
+
          Map<String, Object> headers = new HashMap<String, Object>();
          headers.put(Stomp.Headers.Message.DESTINATION, StompUtils.toStompDestination(serverMessage.getAddress()
                                                                                                    .toString()));
+         if (subscription.getID() != null)
+         {
+            headers.put(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
+         }
          byte[] data = new byte[] {};
          if (serverMessage.getType() == Message.TEXT_TYPE)
          {
@@ -91,15 +97,15 @@
             buffer.readBytes(data);
             headers.put(Headers.CONTENT_LENGTH, data.length);
          }
+         
          StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
          StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
+         
          System.out.println(">>> " + frame);
          byte[] bytes = marshaller.marshal(frame);
          HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
          connection.getTransportConnection().write(buffer, true);
 
-         StompSubscription subscription = subscriptions.get(consumerID);
-
          if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
          {
             session.acknowledge(consumerID, serverMessage.getMessageID());
@@ -142,7 +148,7 @@
       session.commit();
    }
 
-   public void addSubscription(long consumerID, String clientID, String destination, String selector, String ack) throws Exception
+   public void addSubscription(long consumerID, String subscriptionID, String destination, String selector, String ack) throws Exception
    {
       String queue = StompUtils.toHornetQAddress(destination);
       synchronized (session)
@@ -152,7 +158,7 @@
                                 SimpleString.toSimpleString(selector),
                                 false);
          session.receiveConsumerCredits(consumerID, -1);
-         StompSubscription subscription = new StompSubscription(consumerID, clientID, destination, ack);
+         StompSubscription subscription = new StompSubscription(subscriptionID, destination, ack);
          subscriptions.put(consumerID, subscription);
          // FIXME not very smart: since we can't start the consumer, we start the session
          // everytime to start the new consumer (and all previous consumers...)
@@ -160,7 +166,7 @@
       }
    }
 
-   public void unsubscribe(String destination) throws Exception
+   public boolean unsubscribe(String id) throws Exception
    {
       Iterator<Entry<Long, StompSubscription>> iterator = subscriptions.entrySet().iterator();
       while (iterator.hasNext())
@@ -168,11 +174,33 @@
          Map.Entry<Long, StompSubscription> entry = (Map.Entry<Long, StompSubscription>)iterator.next();
          long consumerID = entry.getKey();
          StompSubscription sub = entry.getValue();
-         if (sub.getDestination().equals(destination))
+         if (id != null && id.equals(sub.getID()))
          {
             iterator.remove();
             session.closeConsumer(consumerID);
+            return true;
          }
       }
+      return false;
    }
+
+   boolean containsSubscription(String subscriptionID)
+   {     
+      Iterator<Entry<Long, StompSubscription>> iterator = subscriptions.entrySet().iterator();
+      while (iterator.hasNext())
+      {
+         Map.Entry<Long, StompSubscription> entry = (Map.Entry<Long, StompSubscription>)iterator.next();
+         StompSubscription sub = entry.getValue();
+         if (sub.getID().equals(subscriptionID))
+         {
+            return true;
+         }
+      }
+      return false;
+   }
+
+   public RemotingConnection getConnection()
+   {
+      return connection;
+   }
 }
\ No newline at end of file

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java	2010-01-26 13:31:50 UTC (rev 8844)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java	2010-01-26 15:27:21 UTC (rev 8845)
@@ -26,8 +26,6 @@
 
    // Attributes ----------------------------------------------------
 
-   private final long consumerID;
-
    private final String subID;
 
    private final String destination;
@@ -38,9 +36,8 @@
 
    // Constructors --------------------------------------------------
 
-   public StompSubscription(long consumerID, String subID, String destination, String ack)
+   public StompSubscription(String subID, String destination, String ack)
    {
-      this.consumerID = consumerID;
       this.subID = subID;
       this.destination = destination;
       this.ack = ack;
@@ -57,6 +54,11 @@
    {
       return destination;
    }
+   
+   public String getID()
+   {
+      return subID;
+   }
 
    // Package protected ---------------------------------------------
 

Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-01-26 13:31:50 UTC (rev 8844)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-01-26 15:27:21 UTC (rev 8845)
@@ -454,7 +454,42 @@
                         Stomp.NULL;
         sendFrame(frame);
     }
+    
+    public void testSubscribeWithID() throws Exception {
 
+       String frame =
+               "CONNECT\n" +
+                       "login: brianm\n" +
+                       "passcode: wombats\n\n" +
+                       Stomp.NULL;
+       sendFrame(frame);
+
+       frame = receiveFrame(100000);
+       Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+       frame =
+               "SUBSCRIBE\n" +
+                       "destination:/queue/" + getQueueName() + "\n" +
+                       "ack:auto\n" +
+                       "id: mysubid\n\n" +
+                       Stomp.NULL;
+       sendFrame(frame);
+
+       sendMessage(getName());
+
+       frame = receiveFrame(10000);
+       Assert.assertTrue(frame.startsWith("MESSAGE"));
+       Assert.assertTrue(frame.indexOf("destination:") > 0);
+       Assert.assertTrue(frame.indexOf("subscription:") > 0);
+       Assert.assertTrue(frame.indexOf(getName()) > 0);
+
+       frame =
+               "DISCONNECT\n" +
+                       "\n\n" +
+                       Stomp.NULL;
+       sendFrame(frame);
+   }
+
     public void testMessagesAreInOrder() throws Exception {
         int ctr = 10;
         String[] data = new String[ctr];
@@ -622,7 +657,7 @@
         Assert.assertNotNull(message);
         Assert.assertTrue(message.getJMSRedelivered());
     }
-
+    
     public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception {
         assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
     }
@@ -771,6 +806,55 @@
         }
     }
 
+    public void testUnsubscribeWithID() throws Exception {
+
+        String frame =
+                "CONNECT\n" +
+                        "login: brianm\n" +
+                        "passcode: wombats\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+        frame = receiveFrame(100000);
+        Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+        frame =
+                "SUBSCRIBE\n" +
+                        "destination:/queue/" + getQueueName() + "\n" +
+                        "id: mysubid\n" +
+                        "ack:auto\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        //send a message to our queue
+        sendMessage("first message");
+
+        //receive message from socket
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+        //remove suscription
+        frame =
+                "UNSUBSCRIBE\n" +
+                        "id:mysubid\n" +
+                        "\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        waitForFrameToTakeEffect();
+
+        //send a message to our queue
+        sendMessage("second message");
+
+        try {
+            frame = receiveFrame(1000);
+            log.info("Received frame: " + frame);
+            Assert.fail("No message should have been received since subscription was removed");
+        }
+        catch (SocketTimeoutException e) {
+
+        }
+    }
+
     public void testTransactionCommit() throws Exception {
         MessageConsumer consumer = session.createConsumer(queue);
 
@@ -816,7 +900,106 @@
         TextMessage message = (TextMessage) consumer.receive(1000);
         Assert.assertNotNull("Should have received a message", message);
     }
+    
+    public void testSuccessiveTransactionsWithSameID() throws Exception {
+       MessageConsumer consumer = session.createConsumer(queue);
 
+       String frame =
+               "CONNECT\n" +
+                       "login: brianm\n" +
+                       "passcode: wombats\n\n" +
+                       Stomp.NULL;
+       sendFrame(frame);
+
+       String f = receiveFrame(1000);
+       Assert.assertTrue(f.startsWith("CONNECTED"));
+
+       // first tx
+       frame =
+               "BEGIN\n" +
+                       "transaction: tx1\n" +
+                       "\n\n" +
+                       Stomp.NULL;
+       sendFrame(frame);
+
+       frame =
+               "SEND\n" +
+                       "destination:/queue/" + getQueueName() + "\n" +
+                       "transaction: tx1\n" +
+                       "\n\n" +
+                       "Hello World" +
+                       Stomp.NULL;
+       sendFrame(frame);
+
+       frame =
+               "COMMIT\n" +
+                       "transaction: tx1\n" +
+                       "\n\n" +
+                       Stomp.NULL;
+       sendFrame(frame);
+
+       TextMessage message = (TextMessage) consumer.receive(1000);
+       Assert.assertNotNull("Should have received a message", message);
+
+       // 2nd tx with same tx ID
+       frame =
+               "BEGIN\n" +
+                       "transaction: tx1\n" +
+                       "\n\n" +
+                       Stomp.NULL;
+       sendFrame(frame);
+
+       frame =
+               "SEND\n" +
+                       "destination:/queue/" + getQueueName() + "\n" +
+                       "transaction: tx1\n" +
+                       "\n\n" +
+                       "Hello World" +
+                       Stomp.NULL;
+       sendFrame(frame);
+
+       frame =
+               "COMMIT\n" +
+                       "transaction: tx1\n" +
+                       "\n\n" +
+                       Stomp.NULL;
+       sendFrame(frame);
+
+       message = (TextMessage) consumer.receive(1000);
+       Assert.assertNotNull("Should have received a message", message);
+}
+    
+    public void testBeginSameTransactionTwice() throws Exception {
+       String frame =
+               "CONNECT\n" +
+                       "login: brianm\n" +
+                       "passcode: wombats\n\n" +
+                       Stomp.NULL;
+       sendFrame(frame);
+
+       String f = receiveFrame(1000);
+       Assert.assertTrue(f.startsWith("CONNECTED"));
+
+       frame =
+               "BEGIN\n" +
+                       "transaction: tx1\n" +
+                       "\n\n" +
+                       Stomp.NULL;
+       sendFrame(frame);
+
+       // begin the tx a 2nd time
+       frame =
+          "BEGIN\n" +
+                  "transaction: tx1\n" +
+                  "\n\n" +
+                  Stomp.NULL;
+       sendFrame(frame);
+
+       f = receiveFrame(1000);
+       Assert.assertTrue(f.startsWith("ERROR"));
+
+   }
+
     public void testTransactionRollback() throws Exception {
         MessageConsumer consumer = session.createConsumer(queue);
 



More information about the hornetq-commits mailing list