Author: gaohoward
Date: 2011-09-23 12:09:48 -0400 (Fri, 23 Sep 2011)
New Revision: 11404
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
tests
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-23
13:53:53 UTC (rev 11403)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-23
16:09:48 UTC (rev 11404)
@@ -13,7 +13,6 @@
package org.hornetq.core.protocol.stomp.v10;
import java.io.UnsupportedEncodingException;
-import java.util.List;
import java.util.Map;
import org.hornetq.api.core.HornetQBuffer;
@@ -27,7 +26,6 @@
import org.hornetq.core.protocol.stomp.StompConnection;
import org.hornetq.core.protocol.stomp.StompDecoder;
import org.hornetq.core.protocol.stomp.StompFrame;
-import org.hornetq.core.protocol.stomp.StompFrame.Header;
import org.hornetq.core.protocol.stomp.StompSubscription;
import org.hornetq.core.protocol.stomp.StompUtils;
import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler;
@@ -53,7 +51,6 @@
@Override
public StompFrame onConnect(StompFrame frame)
{
- log.error("-----------------onConnection ()");
StompFrame response = null;
Map<String, String> headers = frame.getHeadersMap();
String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
@@ -61,10 +58,8 @@
String clientID = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
- log.error("------------ validating user: " + login + " code " +
passcode);
if (connection.validateUser(login, passcode))
{
- log.error("-------user OK!!!");
connection.setClientID(clientID);
connection.setValid(true);
@@ -84,7 +79,6 @@
}
else
{
- log.error("--------user NOT ok!!");
//not valid
response = new StompFrameV10(Stomp.Responses.ERROR);
response.addHeader(Stomp.Headers.Error.MESSAGE, "Failed to connect");
@@ -95,7 +89,6 @@
catch (UnsupportedEncodingException e)
{
log.error("Encoding problem", e);
- //then we will send a null body message.
}
}
return response;
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
===================================================================
---
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java 2011-09-23
13:53:53 UTC (rev 11403)
+++
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java 2011-09-23
16:09:48 UTC (rev 11404)
@@ -70,7 +70,7 @@
}
sb.append((char)0);
- String data = new String(sb.toString());
+ String data = sb.toString();
byte[] byteValue = data.getBytes("UTF-8");
@@ -82,6 +82,35 @@
}
@Override
+ public ByteBuffer toByteBufferWithExtra(String str) throws
UnsupportedEncodingException
+ {
+ StringBuffer sb = new StringBuffer();
+ sb.append(command + "\n");
+ int n = headers.size();
+ for (int i = 0; i < n; i++)
+ {
+ sb.append(headers.get(i).key + ":" + headers.get(i).val +
"\n");
+ }
+ sb.append("\n");
+ if (body != null)
+ {
+ sb.append(body);
+ }
+ sb.append((char)0);
+ sb.append(str);
+
+ String data = sb.toString();
+
+ byte[] byteValue = data.getBytes("UTF-8");
+
+ ByteBuffer buffer = ByteBuffer.allocateDirect(byteValue.length);
+ buffer.put(byteValue);
+
+ buffer.rewind();
+ return buffer;
+ }
+
+ @Override
public boolean needsReply()
{
if ("CONNECT".equals(command) || headerKeys.contains(HEADER_RECEIPT))
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
===================================================================
---
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-23
13:53:53 UTC (rev 11403)
+++
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-23
16:09:48 UTC (rev 11404)
@@ -113,7 +113,38 @@
}
return response;
}
-
+
+ public ClientStompFrame sendWickedFrame(ClientStompFrame frame) throws IOException,
InterruptedException
+ {
+ ClientStompFrame response = null;
+ ByteBuffer buffer = frame.toByteBufferWithExtra("\n");
+
+ while (buffer.remaining() > 0)
+ {
+ socketChannel.write(buffer);
+ }
+
+ //now response
+ if (frame.needsReply())
+ {
+ response = receiveFrame();
+
+ //filter out server ping
+ while (response != null)
+ {
+ if (response.getCommand().equals("STOMP"))
+ {
+ response = receiveFrame();
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+ return response;
+ }
+
public ClientStompFrame receiveFrame() throws InterruptedException
{
return frameQueue.poll(10, TimeUnit.SECONDS);
@@ -138,9 +169,16 @@
frameBytes[j] = receiveList.get(j);
}
ClientStompFrame frame = factory.createFrame(new String(frameBytes,
"UTF-8"));
- frameQueue.offer(frame);
- receiveList.clear();
+ if (validateFrame(frame))
+ {
+ frameQueue.offer(frame);
+ receiveList.clear();
+ }
+ else
+ {
+ receiveList.add(b);
+ }
}
}
else
@@ -152,6 +190,20 @@
readBuffer.rewind();
}
+ private boolean validateFrame(ClientStompFrame f) throws UnsupportedEncodingException
+ {
+ String h = f.getHeader("content-length");
+ if (h != null)
+ {
+ int len = Integer.valueOf(h);
+ if (f.getBody().getBytes("UTF-8").length < len)
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
protected void close() throws IOException
{
socketChannel.close();
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java
===================================================================
---
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java 2011-09-23
13:53:53 UTC (rev 11403)
+++
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java 2011-09-23
16:09:48 UTC (rev 11404)
@@ -39,5 +39,7 @@
public String getHeader(String header);
public String getBody();
+
+ public ByteBuffer toByteBufferWithExtra(String str) throws
UnsupportedEncodingException;
}
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
===================================================================
---
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java 2011-09-23
13:53:53 UTC (rev 11403)
+++
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java 2011-09-23
16:09:48 UTC (rev 11404)
@@ -48,6 +48,8 @@
void stopPinger();
void destroy();
+
+ ClientStompFrame sendWickedFrame(ClientStompFrame frame) throws IOException,
InterruptedException;
}
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
---
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-23
13:53:53 UTC (rev 11403)
+++
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-23
16:09:48 UTC (rev 11404)
@@ -22,6 +22,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import javax.jms.BytesMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@@ -1210,7 +1211,104 @@
connV11.disconnect();
}
+ public void testSendMessage() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+ // Assert default priority 4 is used when priority header is not set
+ Assert.assertEquals("getJMSPriority", 4, message.getJMSPriority());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+ }
+
+ public void testSendMessageWithContentLength() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ byte[] data = new byte[] { 1, 0, 0, 4 };
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.setBody(new String(data, "UTF-8"));
+
+ frame.addHeader("content-length", String.valueOf(data.length));
+
+ connV11.sendFrame(frame);
+
+ BytesMessage message = (BytesMessage)consumer.receive(10000);
+ Assert.assertNotNull(message);
+ //there is one extra null byte
+ assertEquals(data.length + 1, message.getBodyLength());
+ assertEquals(data[0], message.readByte());
+ assertEquals(data[1], message.readByte());
+ assertEquals(data[2], message.readByte());
+ assertEquals(data[3], message.readByte());
+ }
+
+ public void testSendMessageWithCustomHeadersAndSelector() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue, "foo =
'abc'");
+
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("foo", "abc");
+ frame.addHeader("bar", "123");
+
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+ Assert.assertEquals("foo", "abc",
message.getStringProperty("foo"));
+ Assert.assertEquals("bar", "123",
message.getStringProperty("bar"));
+ }
+
+ public void testSendMessageWithLeadingNewLine() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.setBody("Hello World");
+
+ connV11.sendWickedFrame(frame);
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+ }
+
//-----------------private help methods
private void abortTransaction(StompClientConnection conn, String txID) throws
IOException, InterruptedException