Author: gurkapa
Date: 2011-07-05 10:51:49 -0400 (Tue, 05 Jul 2011)
New Revision: 10918
Modified:
branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java
branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompSession.java
Log:
HORNETQ-553 - work in progress but adding code for new ACK header, subscription; some
interface changes not yet implemented for 1.1 and almost finished code for new command
NACK
commiting before completed and tested since my computer is acting up and I don't want
to lose anything.
Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java 2011-07-05
14:08:46 UTC (rev 10917)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java 2011-07-05
14:51:49 UTC (rev 10918)
@@ -62,6 +62,8 @@
String ABORT = "ABORT";
String ACK = "ACK";
+
+ String NACK = "NACK";
}
public interface Responses
@@ -153,6 +155,8 @@
String AUTO = "auto";
String CLIENT = "client";
+
+ String CLIENT_INDIVIDUAL = "client-individual";
}
}
@@ -201,6 +205,15 @@
public interface Ack
{
String MESSAGE_ID = "message-id";
+
+ String SUBSCRIPTION = "subscription";
}
+
+ public interface Nack
+ {
+ String MESSAGE_ID = "message-id";
+
+ String SUBSCRIPTION = "subscription";
+ }
}
}
Modified:
branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
---
branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-07-05
14:08:46 UTC (rev 10917)
+++
branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-07-05
14:51:49 UTC (rev 10918)
@@ -205,6 +205,10 @@
{
response = onAck(request, conn);
}
+ else if (Stomp.Commands.NACK.equals(command))
+ {
+ response = onNack(request, conn);
+ }
else if (Stomp.Commands.BEGIN.equals(command))
{
response = onBegin(request, server, conn);
@@ -336,7 +340,12 @@
}
else
{
- if (destination == null)
+ if (Stomp.Versions.V11.equals(connection.getVersion()))
+ {
+ // Subscription id is mandatory in version 1.1 of STOMP
+ throw new StompException("Client must set id header to a SUBSCRIBE
command");
+ }
+ else if (destination == null)
{
throw new StompException("Client must set destination or id header to a
SUBSCRIBE command");
}
@@ -375,8 +384,13 @@
}
else
{
- if (destination == null)
+ if (Stomp.Versions.V11.equals(connection.getVersion()))
{
+ // Subscription id is mandatory in version 1.1 of STOMP
+ throw new StompException("Must specify the subscription's id you are
unsubscribing from");
+ }
+ else if (destination == null)
+ {
throw new StompException("Must specify the subscription's id or the
destination you are unsubscribing from");
}
subscriptionID = "subscription/" + destination;
@@ -397,16 +411,56 @@
String messageID = (String)headers.get(Stomp.Headers.Ack.MESSAGE_ID);
String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
StompSession stompSession = null;
+ stompSession = getSession(connection);
+ if (connection.getVersion() == Stomp.Versions.V11)
+ {
+ String subscriptionID = (String)headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
+ if (subscriptionID == null)
+ {
+ throw new StompException("Subscription header is mandatory in ACK
command when using STOMP 1.1");
+ }
+ if (!stompSession.containsSubscription(subscriptionID))
+ {
+ throw new StompException("No subscription with the given id was found in
this session");
+ }
+ }
if (txID != null)
{
log.warn("Transactional acknowledgement is not supported");
}
- stompSession = getSession(connection);
stompSession.acknowledge(messageID);
return null;
}
+ private StompFrame onNack(StompFrame frame, StompConnection connection) throws
Exception
+ {
+ Map<String, Object> headers = frame.getHeaders();
+ String messageID = (String)headers.get(Stomp.Headers.Nack.MESSAGE_ID);
+ String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
+ StompSession stompSession = null;
+ stompSession = getSession(connection);
+ if (connection.getVersion() == Stomp.Versions.V11)
+ {
+ String subscriptionID = (String)headers.get(Stomp.Headers.Nack.SUBSCRIPTION);
+ if (subscriptionID == null)
+ {
+ throw new StompException("Subscription header is mandatory in NACK
command");
+ }
+ if (!stompSession.containsSubscription(subscriptionID))
+ {
+ throw new StompException("No subscription with the given id was found in
this session");
+ }
+ }
+ if (txID != null)
+ {
+ log.warn("Transactional acknowledgement is not supported");
+ }
+ stompSession.nacknowledge(messageID);
+
+ return null;
+ }
+
private StompFrame onBegin(StompFrame frame, HornetQServer server, StompConnection
connection) throws Exception
{
Map<String, Object> headers = frame.getHeaders();
Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
---
branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2011-07-05
14:08:46 UTC (rev 10917)
+++
branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2011-07-05
14:51:49 UTC (rev 10918)
@@ -176,6 +176,19 @@
session.commit();
}
+ public void nacknowledge(String messageID) throws Exception
+ {
+ long id = Long.parseLong(messageID);
+ long consumerID = messagesToAck.remove(id);
+ StompSubscription subscription = subscriptions.get(consumerID);
+ boolean nackAllNonAckedMessages = true;
+ if (subscription.getAck() ==
Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL)
+ {
+ nackAllNonAckedMessages = false;
+ }
+
+ }
+
public void addSubscription(long consumerID,
String subscriptionID,
String clientID,
Show replies by date