[hornetq-commits] JBoss hornetq SVN: r8874 - in trunk: src/main/org/hornetq/core/protocol/stomp and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Feb 15 09:52:00 EST 2010


Author: jmesnil
Date: 2010-02-15 09:51:59 -0500 (Mon, 15 Feb 2010)
New Revision: 8874

Modified:
   trunk/docs/user-manual/en/interoperability.xml
   trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
   trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0

* added missing calls to coordinate with the storageManager
* added support for no-local header

Modified: trunk/docs/user-manual/en/interoperability.xml
===================================================================
--- trunk/docs/user-manual/en/interoperability.xml	2010-02-15 11:07:59 UTC (rev 8873)
+++ trunk/docs/user-manual/en/interoperability.xml	2010-02-15 14:51:59 UTC (rev 8874)
@@ -89,7 +89,7 @@
                 </programlisting>
               </listitem>
               <listitem>
-                <para>send or subscribe to a JMS <emphasis>Queue</emphasis> by prepending the topic name by <literal>jms.topic.</literal>.</para>
+                <para>send or subscribe to a JMS <emphasis>Topic</emphasis> by prepending the topic name by <literal>jms.topic.</literal>.</para>
                 <para>For example to subscribe to the <literal>stocks</literal> JMS Topic, the Stomp client must send the frame:</para>
                 <programlisting>
 SUBSCRIBE

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java	2010-02-15 11:07:59 UTC (rev 8873)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java	2010-02-15 14:51:59 UTC (rev 8874)
@@ -35,6 +35,8 @@
 
    private byte[] content = StompFrame.NO_DATA;
 
+   private int size = -1;
+
    public StompFrame()
    {
       this.headers = new HashMap<String, Object>();
@@ -62,9 +64,25 @@
       return headers;
    }
 
+   public int getEncodedSize()
+   {
+      if (size == -1)
+      {
+         throw new IllegalStateException("Frame has not been encoded yet");
+      }
+
+      return size ;
+   }
+
+   public void setEncodedSize(int size)
+   {
+      this.size = size;
+   }
+
    @Override
    public String toString()
    {
       return "StompFrame[command=" + command + ", headers=" + headers + ", content-length=" + content.length + "]";
    }
+
 }

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java	2010-02-15 11:07:59 UTC (rev 8873)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java	2010-02-15 14:51:59 UTC (rev 8874)
@@ -62,7 +62,9 @@
       DataOutputStream dos = new DataOutputStream(baos);
       marshal(command, dos);
       dos.close();
-      return baos.toByteArray();
+      byte[] bytes = baos.toByteArray();
+      command.setEncodedSize(bytes.length);
+      return bytes;
    }
 
    public void marshal(StompFrame stomp, DataOutput os) throws IOException

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-02-15 11:07:59 UTC (rev 8873)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-02-15 14:51:59 UTC (rev 8874)
@@ -27,10 +27,12 @@
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Interceptor;
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.ServerSession;
@@ -52,18 +54,21 @@
 
    private static final Logger log = Logger.getLogger(StompProtocolManager.class);
 
+   // TODO use the same value as HornetQConnection
+   private static final String CONNECTION_ID_PROP = "__HQ_CID";
+   
    // Attributes ----------------------------------------------------
 
    private final HornetQServer server;
 
    private final StompMarshaller marshaller;
 
+   private final Executor executor;
+
    private final Map<String, StompSession> transactedSessions = new HashMap<String, StompSession>();
 
    private final Map<RemotingConnection, StompSession> sessions = new HashMap<RemotingConnection, StompSession>();
 
-   private Executor executor;
-
    // Static --------------------------------------------------------
 
    private static StompFrame createError(Exception e, StompFrame request)
@@ -129,7 +134,14 @@
       {
          public void run()
          {
-            doHandleBuffer(connection, buffer);
+            try
+            {
+               doHandleBuffer(connection, buffer);
+            } 
+            finally
+            {
+               server.getStorageManager().clearContext();
+            }
          }
       });
    }
@@ -240,7 +252,22 @@
       String selector = (String)headers.get(Stomp.Headers.Subscribe.SELECTOR);
       String ack = (String)headers.get(Stomp.Headers.Subscribe.ACK_MODE);
       String id = (String)headers.get(Stomp.Headers.Subscribe.ID);
-
+      // no-local: the subscriber must not receive messages sent on its own connection.
+      // Boolean.parseBoolean(null) is false, so an absent header leaves noLocal off.
+      boolean noLocal = Boolean.parseBoolean((String)headers.get(Stomp.Headers.Subscribe.NO_LOCAL));
+      if (noLocal)
+      {
+         String noLocalFilter = CONNECTION_ID_PROP + " <> '" + connection.getID().toString() + "'";
+         if (selector == null)
+         {
+            selector = noLocalFilter;
+         }
+         else
+         {
+            // parenthesize the client selector: AND binds tighter than OR, so "a OR b" would bypass the filter
+            selector = "(" + selector + ") AND " + noLocalFilter;
+         }
+      }
       if (ack == null)
       {
          ack = Stomp.Headers.Subscribe.AckModeValues.AUTO;
@@ -259,11 +286,13 @@
          subscriptionID = "subscription/" + destination;
       }
       StompSession stompSession = getSession(connection);
+      stompSession.setNoLocal(noLocal);
       if (stompSession.containsSubscription(subscriptionID))
       {
          throw new StompException("There already is a subscription for: " + subscriptionID +
                                   ". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
       }
+      server.getStorageManager().setContext(stompSession.getContext());
       long consumerID = server.getStorageManager().generateUniqueID();
       stompSession.addSubscription(consumerID, subscriptionID, destination, selector, ack);
 
@@ -291,6 +320,7 @@
       }
 
       StompSession stompSession = getSession(connection);
+      server.getStorageManager().setContext(stompSession.getContext());
       boolean unsubscribed = stompSession.unsubscribe(subscriptionID);
       if (!unsubscribed)
       {
@@ -310,6 +340,7 @@
          log.warn("Transactional acknowledgement is not supported");
       }
       stompSession = getSession(connection);
+      server.getStorageManager().setContext(stompSession.getContext());
       stompSession.acknowledge(messageID);
 
       return null;
@@ -386,7 +417,7 @@
       StompSession stompSession = sessions.get(connection);
       if (stompSession == null)
       {
-         stompSession = new StompSession(connection, this);
+         stompSession = new StompSession(connection, this, server.getStorageManager().newContext(executor));
          String name = UUIDGenerator.getInstance().generateStringUUID();
          ServerSession session = server.createSession(name,
                                                       connection.getLogin(),
@@ -409,7 +440,7 @@
       StompSession stompSession = transactedSessions.get(txID);
       if (stompSession == null)
       {
-         stompSession = new StompSession(connection, this);
+         stompSession = new StompSession(connection, this, server.getStorageManager().newContext(executor));
          String name = UUIDGenerator.getInstance().generateStringUUID();
          ServerSession session = server.createSession(name,
                                                       connection.getLogin(),
@@ -490,17 +521,21 @@
          message.getBodyBuffer().writeBytes(content);
       }
 
-      ServerSession session = null;
+      StompSession stompSession = null;
       if (txID == null)
       {
-         session = getSession(connection).getSession();
+         stompSession = getSession(connection);
       }
       else
       {
-         session = transactedSessions.get(txID).getSession();
+         stompSession = transactedSessions.get(txID);
       }
-
-      session.send(message);
+      server.getStorageManager().setContext(stompSession.getContext());
+      if (stompSession.isNoLocal())
+      {
+         message.putStringProperty(CONNECTION_ID_PROP, connection.getID().toString());
+      }
+      stompSession.getSession().send(message);
       return null;
    }
 
@@ -528,32 +563,23 @@
       return new StompFrame(Stomp.Responses.CONNECTED, h, StompMarshaller.NO_DATA);
    }
 
-   public int send(StompConnection connection, StompFrame frame)
+   public void send(final StompConnection connection, final StompFrame frame)
    {
-      if (log.isTraceEnabled())
+      server.getStorageManager().afterCompleteOperations(new IOAsyncTask()
       {
-         log.trace("sent " + frame);
-      }
-      synchronized (connection)
-      {
-         if (connection.isDestroyed() || !connection.isValid())
+         public void onError(final int errorCode, final String errorMessage)
          {
-            log.warn("Connection closed " + connection);
-            return 0;
+            log.warn("Error processing IOCallback code = " + errorCode + " message = " + errorMessage);
+
+            StompFrame error = createError(new HornetQException(errorCode, errorMessage), frame);
+            doSend(connection, error);
          }
-         try
+
+         public void done()
          {
-            byte[] bytes = marshaller.marshal(frame);
-            HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
-            connection.getTransportConnection().write(buffer, true);
-            return bytes.length;
+            doSend(connection, frame);
          }
-         catch (IOException e)
-         {
-            log.error("Unable to send frame " + frame, e);
-            return 0;
-         }
-      }
+      });
    }
 
    public void cleanup(StompConnection connection)
@@ -597,6 +623,31 @@
          }
       }
    }
+   
+   /** Marshals {@code frame} and writes it to {@code connection}'s transport; a destroyed or invalid connection is only logged. I/O failures are logged, not thrown. */
+   private void doSend(final StompConnection connection, final StompFrame frame)
+   {
+      if (log.isTraceEnabled())
+      {
+         log.trace("sent " + frame);
+      }
+      synchronized (connection)
+      {
+         if (connection.isDestroyed() || !connection.isValid())
+         {
+            log.warn("Connection closed " + connection);
+            return; // connection is gone: skip the write instead of falling through to it
+         }
+         try
+         {
+            connection.getTransportConnection().write(HornetQBuffers.wrappedBuffer(marshaller.marshal(frame)), true);
+         }
+         catch (IOException e)
+         {
+            log.error("Unable to send frame " + frame, e);
+         }
+      }
+   }
 
    // Inner classes -------------------------------------------------
 }

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2010-02-15 11:07:59 UTC (rev 8873)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2010-02-15 14:51:59 UTC (rev 8874)
@@ -21,6 +21,7 @@
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.protocol.stomp.Stomp.Headers;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.ServerSession;
@@ -41,15 +42,20 @@
 
    private ServerSession session;
 
+   private final OperationContext sessionContext;
+
    private final Map<Long, StompSubscription> subscriptions = new HashMap<Long, StompSubscription>();
 
    // key = message ID, value = consumer ID
    private final Map<Long, Long> messagesToAck = new HashMap<Long, Long>();
 
-   StompSession(final StompConnection connection, final StompProtocolManager manager)
+   private boolean noLocal = false;
+
+   StompSession(final StompConnection connection, final StompProtocolManager manager, OperationContext sessionContext)
    {
       this.connection = connection;
       this.manager = manager;
+      this.sessionContext = sessionContext;
    }
 
    void setServerSession(ServerSession session)
@@ -102,8 +108,9 @@
          StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
          StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
 
-         int length = manager.send(connection, frame);
-
+         manager.send(connection, frame);
+         int size =  frame.getEncodedSize();
+         
          if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
          {
             session.acknowledge(consumerID, serverMessage.getMessageID());
@@ -113,7 +120,7 @@
          {
             messagesToAck.put(serverMessage.getMessageID(), consumerID);
          }
-         return length;
+         return size;
 
       }
       catch (Exception e)
@@ -201,4 +208,19 @@
    {
       return connection;
    }
+
+   public OperationContext getContext()
+   {
+      return sessionContext;
+   }
+
+   public boolean isNoLocal()
+   {
+      return noLocal;
+   }
+   
+   public void setNoLocal(boolean noLocal)
+   {
+      this.noLocal = noLocal;
+   }
 }
\ No newline at end of file

Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-02-15 11:07:59 UTC (rev 8873)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-02-15 14:51:59 UTC (rev 8874)
@@ -1143,6 +1143,59 @@
        sendFrame(frame);
    }
     
+    public void testSubscribeToTopicWithNoLocal() throws Exception {
+
+       String frame =
+               "CONNECT\n" +
+                       "login: brianm\n" +
+                       "passcode: wombats\n\n" +
+                       Stomp.NULL;
+       sendFrame(frame);
+
+       frame = receiveFrame(100000);
+       Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+       frame =
+               "SUBSCRIBE\n" +
+                       "destination:" + getTopicPrefix() + getTopicName() + "\n" +
+                       "receipt: 12\n" +
+                       "no-local: true\n" +
+                       "\n\n" +
+                       Stomp.NULL;
+       sendFrame(frame);
+       // wait for SUBSCRIBE's receipt
+       frame = receiveFrame(10000);
+       Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+       // send a message on the same connection => it should not be received
+       frame = "SEND\n" +
+          "destination:" + getTopicPrefix() + getTopicName() + "\n\n" +
+                  "Hello World" +
+                  Stomp.NULL;
+       sendFrame(frame);
+  
+       try {
+          frame = receiveFrame(2000);
+          log.info("Received frame: " + frame);
+          Assert.fail("No message should have been received since subscription is noLocal");
+      }
+      catch (SocketTimeoutException e) {
+      }
+      
+      // send message on another JMS connection => it should be received
+      sendMessage(getName(), topic);
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("MESSAGE"));
+      Assert.assertTrue(frame.indexOf("destination:") > 0);
+      Assert.assertTrue(frame.indexOf(getName()) > 0);
+      
+      frame =
+               "DISCONNECT\n" +
+                       "\n\n" +
+                       Stomp.NULL;
+       sendFrame(frame);
+   }
+    
     public void testClientAckNotPartOfTransaction() throws Exception {
 
        String frame =



More information about the hornetq-commits mailing list