[hornetq-commits] JBoss hornetq SVN: r10918 - branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Jul 5 10:51:50 EDT 2011


Author: gurkapa
Date: 2011-07-05 10:51:49 -0400 (Tue, 05 Jul 2011)
New Revision: 10918

Modified:
   branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java
   branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompSession.java
Log:
HORNETQ-553 - work in progress but adding code for new ACK header, subscription; some interface changes not yet implemented for 1.1 and almost finished code for new command NACK
commiting before completed and tested since my computer is acting up and I don't want to lose anything.


Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java	2011-07-05 14:08:46 UTC (rev 10917)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java	2011-07-05 14:51:49 UTC (rev 10918)
@@ -62,6 +62,8 @@
       String ABORT = "ABORT";
 
       String ACK = "ACK";
+      
+      String NACK = "NACK";
    }
 
    public interface Responses
@@ -153,6 +155,8 @@
             String AUTO = "auto";
 
             String CLIENT = "client";
+            
+            String CLIENT_INDIVIDUAL = "client-individual";
          }
       }
 
@@ -201,6 +205,15 @@
       public interface Ack
       {
          String MESSAGE_ID = "message-id";
+         
+         String SUBSCRIPTION = "subscription";
       }
+      
+      public interface Nack
+      {
+         String MESSAGE_ID = "message-id";
+
+         String SUBSCRIPTION = "subscription";
+      }
    }
 }

Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2011-07-05 14:08:46 UTC (rev 10917)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2011-07-05 14:51:49 UTC (rev 10918)
@@ -205,6 +205,10 @@
             {
                response = onAck(request, conn);
             }
+            else if (Stomp.Commands.NACK.equals(command))
+            {
+               response = onNack(request, conn);
+            }
             else if (Stomp.Commands.BEGIN.equals(command))
             {
                response = onBegin(request, server, conn);
@@ -336,7 +340,12 @@
       }
       else
       {
-         if (destination == null)
+         if (Stomp.Versions.V11.equals(connection.getVersion()))
+         {            
+            // Subscription id is mandatory in version 1.1 of STOMP
+            throw new StompException("Client must set id header to a SUBSCRIBE command");
+         }
+         else if (destination == null)
          {
             throw new StompException("Client must set destination or id header to a SUBSCRIBE command");
          }
@@ -375,8 +384,13 @@
       }
       else
       {
-         if (destination == null)
+         if (Stomp.Versions.V11.equals(connection.getVersion()))
          {
+            // Subscription id is mandatory in version 1.1 of STOMP
+            throw new StompException("Must specify the subscription's id you are unsubscribing from");
+         }
+         else if (destination == null)
+         {
             throw new StompException("Must specify the subscription's id or the destination you are unsubscribing from");
          }
          subscriptionID = "subscription/" + destination;
@@ -397,16 +411,56 @@
       String messageID = (String)headers.get(Stomp.Headers.Ack.MESSAGE_ID);
       String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
       StompSession stompSession = null;
+      stompSession = getSession(connection);
+      if (connection.getVersion() == Stomp.Versions.V11)
+      {
+         String subscriptionID = (String)headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
+         if (subscriptionID == null)
+         {
+            throw new StompException("Subscription header is mandatory in ACK command when using STOMP 1.1");         
+         }
+         if (!stompSession.containsSubscription(subscriptionID))
+         {
+            throw new StompException("No subscription with the given id was found in this session");
+         }
+      }
       if (txID != null)
       {
          log.warn("Transactional acknowledgement is not supported");
       }
-      stompSession = getSession(connection);
       stompSession.acknowledge(messageID);
 
       return null;
    }
 
+   private StompFrame onNack(StompFrame frame, StompConnection connection) throws Exception
+   {
+      Map<String, Object> headers = frame.getHeaders();
+      String messageID = (String)headers.get(Stomp.Headers.Nack.MESSAGE_ID);
+      String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
+      StompSession stompSession = null;
+      stompSession = getSession(connection);
+      if (connection.getVersion() == Stomp.Versions.V11)
+      {
+         String subscriptionID = (String)headers.get(Stomp.Headers.Nack.SUBSCRIPTION);
+         if (subscriptionID == null)
+         {
+            throw new StompException("Subscription header is mandatory in NACK command");         
+         }
+         if (!stompSession.containsSubscription(subscriptionID))
+         {
+            throw new StompException("No subscription with the given id was found in this session");
+         }
+      }
+      if (txID != null)
+      {
+         log.warn("Transactional acknowledgement is not supported");
+      }
+      stompSession.nacknowledge(messageID);
+      
+      return null;
+   }
+   
    private StompFrame onBegin(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
    {
       Map<String, Object> headers = frame.getHeaders();

Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2011-07-05 14:08:46 UTC (rev 10917)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2011-07-05 14:51:49 UTC (rev 10918)
@@ -176,6 +176,19 @@
       session.commit();
    }
 
+   public void nacknowledge(String messageID) throws Exception
+   {
+      long id = Long.parseLong(messageID);
+      long consumerID = messagesToAck.remove(id);
+      StompSubscription subscription = subscriptions.get(consumerID);
+      boolean nackAllNonAckedMessages = true;
+      if (subscription.getAck() == Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL)
+      {
+         nackAllNonAckedMessages = false;
+      }
+      
+   }
+   
    public void addSubscription(long consumerID,
                                String subscriptionID,
                                String clientID,



More information about the hornetq-commits mailing list