[hornetq-commits] JBoss hornetq SVN: r8878 - in trunk: tests/src/org/hornetq/tests/integration/stomp and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Feb 16 04:50:11 EST 2010


Author: jmesnil
Date: 2010-02-16 04:50:10 -0500 (Tue, 16 Feb 2010)
New Revision: 8878

Modified:
   trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0

* fixed deadlock

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java	2010-02-15 20:45:11 UTC (rev 8877)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java	2010-02-16 09:50:10 UTC (rev 8878)
@@ -17,6 +17,7 @@
  */
 package org.hornetq.core.protocol.stomp;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -68,17 +69,20 @@
    {
       if (size == -1)
       {
-         throw new IllegalStateException("Frame has not been encoded yet");
+         StompMarshaller marshaller = new StompMarshaller();
+         try
+         {
+            size = marshaller.marshal(this).length;
+         }
+         catch (IOException e)
+         {
+            return -1;
+         }
       }
 
       return size ;
    }
 
-   public void setEncodedSize(int size)
-   {
-      this.size = size;
-   }
-
    @Override
    public String toString()
    {

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java	2010-02-15 20:45:11 UTC (rev 8877)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java	2010-02-16 09:50:10 UTC (rev 8878)
@@ -62,9 +62,7 @@
       DataOutputStream dos = new DataOutputStream(baos);
       marshal(command, dos);
       dos.close();
-      byte[] bytes = baos.toByteArray();
-      command.setEncodedSize(bytes.length);
-      return bytes;
+      return baos.toByteArray();
    }
 
    public void marshal(StompFrame stomp, DataOutput os) throws IOException

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-02-15 20:45:11 UTC (rev 8877)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-02-16 09:50:10 UTC (rev 8878)
@@ -34,7 +34,6 @@
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.ServerSession;
 import org.hornetq.core.server.impl.ServerMessageImpl;
@@ -154,7 +153,6 @@
       try
       {
          request = marshaller.unmarshal(buffer);
-         System.out.println("received " + request);
          if (log.isTraceEnabled())
          {
             log.trace("received " + request);
@@ -165,27 +163,27 @@
          StompFrame response = null;
          if (Stomp.Commands.CONNECT.equals(command))
          {
-            response = onConnect(request, server, conn);
+            response = onConnect(request, conn);
          }
          else if (Stomp.Commands.DISCONNECT.equals(command))
          {
-            response = onDisconnect(request, server, conn);
+            response = onDisconnect(request, conn);
          }
          else if (Stomp.Commands.SEND.equals(command))
          {
-            response = onSend(request, server, conn);
+            response = onSend(request, conn);
          }
          else if (Stomp.Commands.SUBSCRIBE.equals(command))
          {
-            response = onSubscribe(request, server, conn);
+            response = onSubscribe(request, conn);
          }
          else if (Stomp.Commands.UNSUBSCRIBE.equals(command))
          {
-            response = onUnsubscribe(request, server, conn);
+            response = onUnsubscribe(request, conn);
          }
          else if (Stomp.Commands.ACK.equals(command))
          {
-            response = onAck(request, server, conn);
+            response = onAck(request, conn);
          }
          else if (Stomp.Commands.BEGIN.equals(command))
          {
@@ -193,11 +191,11 @@
          }
          else if (Stomp.Commands.COMMIT.equals(command))
          {
-            response = onCommit(request, server, conn);
+            response = onCommit(request, conn);
          }
          else if (Stomp.Commands.ABORT.equals(command))
          {
-            response = onAbort(request, server, conn);
+            response = onAbort(request, conn);
          }
          else
          {
@@ -248,7 +246,7 @@
 
    // Private -------------------------------------------------------
 
-   private StompFrame onSubscribe(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
+   private StompFrame onSubscribe(StompFrame frame, StompConnection connection) throws Exception
    {
       Map<String, Object> headers = frame.getHeaders();
       String destination = (String)headers.get(Stomp.Headers.Subscribe.DESTINATION);
@@ -296,7 +294,6 @@
          throw new StompException("There already is a subscription for: " + subscriptionID +
                                   ". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
       }
-      server.getStorageManager().setContext(stompSession.getContext());
       long consumerID = server.getStorageManager().generateUniqueID();
       String clientID = (connection.getClientID() != null) ? connection.getClientID() : null;
       stompSession.addSubscription(consumerID, subscriptionID, clientID, durableSubscriptionName, destination, selector, ack);
@@ -304,7 +301,7 @@
       return null;
    }
 
-   private StompFrame onUnsubscribe(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
+   private StompFrame onUnsubscribe(StompFrame frame, StompConnection connection) throws Exception
    {
       Map<String, Object> headers = frame.getHeaders();
       String destination = (String)headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
@@ -325,7 +322,6 @@
       }
 
       StompSession stompSession = getSession(connection);
-      server.getStorageManager().setContext(stompSession.getContext());
       boolean unsubscribed = stompSession.unsubscribe(subscriptionID);
       if (!unsubscribed)
       {
@@ -334,7 +330,7 @@
       return null;
    }
 
-   private StompFrame onAck(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
+   private StompFrame onAck(StompFrame frame, StompConnection connection) throws Exception
    {
       Map<String, Object> headers = frame.getHeaders();
       String messageID = (String)headers.get(Stomp.Headers.Ack.MESSAGE_ID);
@@ -345,7 +341,6 @@
          log.warn("Transactional acknowledgement is not supported");
       }
       stompSession = getSession(connection);
-      server.getStorageManager().setContext(stompSession.getContext());
       stompSession.acknowledge(messageID);
 
       return null;
@@ -369,7 +364,7 @@
       return null;
    }
 
-   private StompFrame onCommit(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
+   private StompFrame onCommit(StompFrame frame, StompConnection connection) throws Exception
    {
       Map<String, Object> headers = frame.getHeaders();
       String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
@@ -378,18 +373,18 @@
          throw new StompException("transaction header is mandatory to COMMIT a transaction");
       }
 
-      StompSession session = transactedSessions.remove(txID);
+      StompSession session = getTransactedSession(connection, txID);
       if (session == null)
       {
          throw new StompException("No transaction started: " + txID);
       }
-
+      transactedSessions.remove(txID);
       session.getSession().commit();
 
       return null;
    }
 
-   private StompFrame onAbort(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
+   private StompFrame onAbort(StompFrame frame, StompConnection connection) throws Exception
    {
       Map<String, Object> headers = frame.getHeaders();
       String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
@@ -398,12 +393,13 @@
          throw new StompException("transaction header is mandatory to ABORT a transaction");
       }
 
-      StompSession session = transactedSessions.remove(txID);
-
+      StompSession session = getTransactedSession(connection, txID);
+      
       if (session == null)
       {
          throw new StompException("No transaction started: " + txID);
       }
+      transactedSessions.remove(txID);
       session.getSession().rollback(false);
 
       return null;
@@ -437,6 +433,7 @@
          stompSession.setServerSession(session);
          sessions.put(connection, stompSession);
       }
+      server.getStorageManager().setContext(stompSession.getContext());
       return stompSession;
    }
 
@@ -460,45 +457,17 @@
          stompSession.setServerSession(session);
          transactedSessions.put(txID, stompSession);
       }
+      server.getStorageManager().setContext(stompSession.getContext());
       return stompSession;
    }
 
-   private StompFrame onDisconnect(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
+   private StompFrame onDisconnect(StompFrame frame, StompConnection connection) throws Exception
    {
-      connection.setValid(false);
-
-      StompSession session = sessions.remove(connection);
-      if (session != null)
-      {
-         try
-         {
-            session.getSession().rollback(true);
-            session.getSession().close();
-         }
-         catch (Exception e)
-         {
-            throw new StompException(e.getMessage());
-         }
-      }
-
-      // removed the transacted session belonging to the connection
-      Iterator<Entry<String, StompSession>> iterator = transactedSessions.entrySet().iterator();
-      while (iterator.hasNext())
-      {
-         Map.Entry<String, StompSession> entry = (Map.Entry<String, StompSession>)iterator.next();
-         if (entry.getValue().getConnection() == connection)
-         {
-            ServerSession serverSession = entry.getValue().getSession();
-            serverSession.rollback(true);
-            serverSession.close();
-            iterator.remove();
-         }
-      }
-
+      cleanup(connection);
       return null;
    }
 
-   private StompFrame onSend(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
+   private StompFrame onSend(StompFrame frame, StompConnection connection) throws Exception
    {
       checkConnected(connection);
       Map<String, Object> headers = frame.getHeaders();
@@ -533,9 +502,8 @@
       }
       else
       {
-         stompSession = transactedSessions.get(txID);
+         stompSession = getTransactedSession(connection, txID);
       }
-      server.getStorageManager().setContext(stompSession.getContext());
       if (stompSession.isNoLocal())
       {
          message.putStringProperty(CONNECTION_ID_PROP, connection.getID().toString());
@@ -544,7 +512,7 @@
       return null;
    }
 
-   private StompFrame onConnect(StompFrame frame, HornetQServer server, final StompConnection connection) throws Exception
+   private StompFrame onConnect(StompFrame frame, final StompConnection connection) throws Exception
    {
       Map<String, Object> headers = frame.getHeaders();
       String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
@@ -637,7 +605,6 @@
 
    private void doSend(final StompConnection connection, final StompFrame frame)
    {
-      System.out.println("sent " + frame);
       if (log.isTraceEnabled())
       {
          log.trace("sent " + frame);
@@ -647,7 +614,9 @@
          if (connection.isDestroyed() || !connection.isValid())
          {
             log.warn("Connection closed " + connection);
+            return;
          }
+
          try
          {
             byte[] bytes = marshaller.marshal(frame);

Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-02-15 20:45:11 UTC (rev 8877)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-02-16 09:50:10 UTC (rev 8878)
@@ -665,11 +665,11 @@
         Assert.assertTrue(message.getJMSRedelivered());
     }
     
-    public void _testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception {
+    public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception {
         assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
     }
 
-    public void _testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws Exception {
+    public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws Exception {
         assertSubscribeWithClientAckThenConsumeWithAutoAck(true);
     }
 
@@ -1174,7 +1174,8 @@
                   "\n\n" +
                   Stomp.NULL;
        sendFrame(disconnectFrame);
-
+       stompSocket.close();
+       
        // send the message when the durable subscriber is disconnected
        sendMessage(getName(), topic);
   



More information about the hornetq-commits mailing list