Author: gaohoward
Date: 2011-09-20 10:43:22 -0400 (Tue, 20 Sep 2011)
New Revision: 11375
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
more tests
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-20 11:53:56 UTC (rev 11374)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-20 14:43:22 UTC (rev 11375)
@@ -410,6 +410,14 @@
response.setNeedsDisconnect(true);
}
}
+ else
+ {
+ //request null, disconnect if so.
+ if (request.getCommand().equals(Stomp.Commands.DISCONNECT))
+ {
+ this.connection.disconnect();
+ }
+ }
return response;
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-20 11:53:56 UTC (rev 11374)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-20 14:43:22 UTC (rev 11375)
@@ -180,6 +180,14 @@
response.setNeedsDisconnect(true);
}
}
+ else
+ {
+ //request null, disconnect if so.
+ if (request.getCommand().equals(Stomp.Commands.DISCONNECT))
+ {
+ this.connection.disconnect();
+ }
+ }
return response;
}
@@ -461,7 +469,7 @@
if (reply.needsDisconnect())
{
- connection.destroy();
+ connection.disconnect();
}
else
{
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-20 11:53:56 UTC (rev 11374)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-20 14:43:22 UTC (rev 11375)
@@ -37,7 +37,10 @@
protected static final String LOGIN_HEADER = "login";
protected static final String PASSCODE_HEADER = "passcode";
+ //ext
+ protected static final String CLIENT_ID_HEADER = "client-id";
+
protected String version;
protected String host;
protected int port;
@@ -193,6 +196,21 @@
connect(null, null);
}
+ public void destroy()
+ {
+ try
+ {
+ close();
+ }
+ catch (IOException e)
+ {
+ }
+ finally
+ {
+ this.connected = false;
+ }
+ }
+
public void connect(String username, String password) throws Exception
{
throw new RuntimeException("connect method not implemented!");
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java 2011-09-20 11:53:56 UTC (rev 11374)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java 2011-09-20 14:43:22 UTC (rev 11375)
@@ -32,6 +32,8 @@
void connect(String defUser, String defPass) throws Exception;
+ void connect(String defUser, String defPass, String clientId) throws Exception;
+
boolean isConnected();
String getVersion();
@@ -44,6 +46,8 @@
void startPinger(long interval);
void stopPinger();
+
+ void destroy();
}
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java 2011-09-20 11:53:56 UTC (rev 11374)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java 2011-09-20 14:43:22 UTC (rev 11375)
@@ -47,6 +47,26 @@
}
}
+ public void connect(String username, String passcode, String clientID) throws IOException, InterruptedException
+ {
+ ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND);
+ frame.addHeader(LOGIN_HEADER, username);
+ frame.addHeader(PASSCODE_HEADER, passcode);
+ frame.addHeader(CLIENT_ID_HEADER, clientID);
+
+ ClientStompFrame response = this.sendFrame(frame);
+
+ if (response.getCommand().equals(CONNECTED_COMMAND))
+ {
+ connected = true;
+ }
+ else
+ {
+ System.out.println("Connection failed with: " + response);
+ connected = false;
+ }
+ }
+
@Override
public void disconnect() throws IOException, InterruptedException
{
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java 2011-09-20 11:53:56 UTC (rev 11374)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java 2011-09-20 14:43:22 UTC (rev 11375)
@@ -62,7 +62,37 @@
connected = false;
}
}
+
+ public void connect(String username, String passcode, String clientID) throws IOException, InterruptedException
+ {
+ ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND);
+ frame.addHeader(ACCEPT_HEADER, "1.1");
+ frame.addHeader(HOST_HEADER, "localhost");
+ frame.addHeader(CLIENT_ID_HEADER, clientID);
+
+ if (username != null)
+ {
+ frame.addHeader(LOGIN_HEADER, username);
+ frame.addHeader(PASSCODE_HEADER, passcode);
+ }
+ ClientStompFrame response = this.sendFrame(frame);
+
+ if (response.getCommand().equals(CONNECTED_COMMAND))
+ {
+ String version = response.getHeader(VERSION_HEADER);
+ assert(version.equals("1.1"));
+
+ this.username = username;
+ this.passcode = passcode;
+ this.connected = true;
+ }
+ else
+ {
+ connected = false;
+ }
+ }
+
public void connect1(String username, String passcode) throws IOException,
InterruptedException
{
ClientStompFrame frame = factory.newFrame(STOMP_COMMAND);
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-20 11:53:56 UTC (rev 11374)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-20 14:43:22 UTC (rev 11375)
@@ -18,6 +18,7 @@
package org.hornetq.tests.integration.stomp.v11;
import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -631,7 +632,7 @@
System.out.println("Received message with id " + messageID);
- ack(connV11, "sub1", messageID);
+ ack(connV11, "sub1", messageID, null);
unsubscribe(connV11, "sub1");
@@ -657,7 +658,7 @@
System.out.println("Received message with id " + messageID);
- ack(connV11, "sub2", messageID);
+ ack(connV11, "sub2", messageID, null);
ClientStompFrame error = connV11.receiveFrame();
@@ -687,7 +688,7 @@
System.out.println("Received message with id " + messageID);
- ack(connV11, "sub2", "someother");
+ ack(connV11, "sub2", "someother", null);
ClientStompFrame error = connV11.receiveFrame();
@@ -931,28 +932,197 @@
Assert.assertNull(message);
}
- private void ack(StompClientConnection connV112, String subId,
+ //tests below are adapted from StompTest
+ public void testBeginSameTransactionTwice() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ beginTransaction(connV11, "tx1");
+
+ beginTransaction(connV11, "tx1");
+
+ ClientStompFrame f = connV11.receiveFrame();
+ Assert.assertTrue(f.getCommand().equals("ERROR"));
+ }
+
+ public void testBodyWithUTF8() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, getName(), "auto");
+
+ String text = "A" + "\u00ea" + "\u00f1" + "\u00fc" + "C";
+ System.out.println(text);
+ sendMessage(text);
+
+ ClientStompFrame frame = connV11.receiveFrame();
+ System.out.println(frame);
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertNotNull(frame.getHeader("destination"));
+ Assert.assertTrue(frame.getBody().equals(text));
+
+ connV11.disconnect();
+ }
+
+ public void testClientAckNotPartOfTransaction() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, getName(), "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertNotNull(frame.getHeader("destination"));
+ Assert.assertTrue(frame.getBody().equals(getName()));
+ Assert.assertNotNull(frame.getHeader("message-id"));
+
+ String messageID = frame.getHeader("message-id");
+
+ beginTransaction(connV11, "tx1");
+
+ this.ack(connV11, getName(), messageID, "tx1");
+
+ abortTransaction(connV11, "tx1");
+
+ frame = connV11.receiveFrame();
+
+ assertNull(frame);
+
+ this.unsubscribe(connV11, getName());
+
+ connV11.disconnect();
+ }
+
+ public void testDisconnectAndError() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, getName(), "client");
+
+ ClientStompFrame frame = connV11.createFrame("DISCONNECT");
+ frame.addHeader("receipt", "1");
+
+ ClientStompFrame result = connV11.sendFrame(frame);
+
+ if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id"))))
+ {
+ fail("Disconnect failed! " + result);
+ }
+
+ // sending a message will result in an error
+ ClientStompFrame sendFrame = connV11.createFrame("SEND");
+ sendFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ sendFrame.setBody("Hello World");
+
+ try
+ {
+ connV11.sendFrame(sendFrame);
+ fail("connection should have been closed by server.");
+ }
+ catch (ClosedChannelException e)
+ {
+ //ok.
+ }
+
+ connV11.destroy();
+ }
+
+ public void testDurableSubscriber() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "client", getName());
+
+ this.subscribe(connV11, "sub1", "client", getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+ Assert.assertTrue(frame.getCommand().equals("ERROR"));
+
+ connV11.disconnect();
+ }
+
+ public void testDurableSubscriberWithReconnection() throws Exception
+ {
+ connV11.connect(defUser, defPass, "myclientid");
+
+ this.subscribeTopic(connV11, "sub1", "auto", getName());
+
+ ClientStompFrame frame = connV11.createFrame("DISCONNECT");
+ frame.addHeader("receipt", "1");
+
+ ClientStompFrame result = connV11.sendFrame(frame);
+
+ if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id"))))
+ {
+ fail("Disconnect failed! " + result);
+ }
+
+ // send the message when the durable subscriber is disconnected
+ sendMessage(getName(), topic);
+
+ connV11.destroy();
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ connV11.connect(defUser, defPass, "myclientid");
+
+ this.subscribeTopic(connV11, "sub1", "auto", getName());
+
+ // we must have received the message
+ frame = connV11.receiveFrame();
+
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertNotNull(frame.getHeader("destination"));
+ Assert.assertEquals(getName(), frame.getBody());
+
+ this.unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+ }
+
+ private void abortTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException
+ {
+ ClientStompFrame abortFrame = conn.createFrame("ABORT");
+ abortFrame.addHeader("transaction", txID);
+
+ conn.sendFrame(abortFrame);
+ }
+
+ private void beginTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException
+ {
+ ClientStompFrame beginFrame = conn.createFrame("BEGIN");
+ beginFrame.addHeader("transaction", txID);
+
+ conn.sendFrame(beginFrame);
+ }
+
+ private void ack(StompClientConnection conn, String subId,
ClientStompFrame frame) throws IOException, InterruptedException
{
String messageID = frame.getHeader("message-id");
- ClientStompFrame ackFrame = connV11.createFrame("ACK");
+ ClientStompFrame ackFrame = conn.createFrame("ACK");
ackFrame.addHeader("subscription", subId);
ackFrame.addHeader("message-id", messageID);
- ClientStompFrame response = connV11.sendFrame(ackFrame);
+ ClientStompFrame response = conn.sendFrame(ackFrame);
if (response != null)
{
throw new IOException("failed to ack " + response);
}
}
- private void ack(StompClientConnection conn, String subId, String mid) throws IOException, InterruptedException
+ private void ack(StompClientConnection conn, String subId, String mid, String txID) throws IOException, InterruptedException
{
ClientStompFrame ackFrame = conn.createFrame("ACK");
ackFrame.addHeader("subscription", subId);
ackFrame.addHeader("message-id", mid);
+ if (txID != null)
+ {
+ ackFrame.addHeader("transaction", txID);
+ }
conn.sendFrame(ackFrame);
}
@@ -976,6 +1146,30 @@
conn.sendFrame(subFrame);
}
+ private void subscribe(StompClientConnection conn, String subId,
+ String ack, String durableId) throws IOException, InterruptedException
+ {
+ ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", subId);
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", ack);
+ subFrame.addHeader("durable-subscriber-name", durableId);
+
+ conn.sendFrame(subFrame);
+ }
+
+ private void subscribeTopic(StompClientConnection conn, String subId,
+ String ack, String durableId) throws IOException, InterruptedException
+ {
+ ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", subId);
+ subFrame.addHeader("destination", getTopicPrefix() + getTopicName());
+ subFrame.addHeader("ack", ack);
+ subFrame.addHeader("durable-subscriber-name", durableId);
+
+ conn.sendFrame(subFrame);
+ }
+
private void unsubscribe(StompClientConnection conn, String subId) throws IOException,
InterruptedException
{
ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");