JBoss hornetq SVN: r11350 - in branches/STOMP11: hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11 and 1 other directory.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-15 02:25:53 -0400 (Thu, 15 Sep 2011)
New Revision: 11350
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
test
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-14 14:25:20 UTC (rev 11349)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-15 06:25:53 UTC (rev 11350)
@@ -86,6 +86,8 @@
StompFrame frame = connection.createStompMessage(serverMessage, subscription, deliveryCount);
+ log.error("--------------lllll- Sending frame: " + frame);
+
if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
{
session.acknowledge(consumerID, serverMessage.getMessageID());
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-14 14:25:20 UTC (rev 11349)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-15 06:25:53 UTC (rev 11350)
@@ -417,9 +417,11 @@
{
data = new byte[0];
}
+ frame.addHeader(Stomp.Headers.CONTENT_TYPE, "text/plain");
}
- frame.addHeader(Stomp.Headers.CONTENT_TYPE, "text/plain");
+ frame.setByteBody(data);
+
serverMessage.getBodyBuffer().resetReaderIndex();
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
@@ -961,11 +963,27 @@
}
else
{
- content = new byte[decoder.contentLength];
+ content = new byte[decoder.contentLength + 1];
System.arraycopy(decoder.workingBuffer, decoder.pos, content, 0, decoder.contentLength);
decoder.pos += decoder.contentLength + 1;
+
+ content[decoder.contentLength] = 0;
+
+ //drain all the rest
+ if (decoder.bodyStart == -1)
+ {
+ decoder.bodyStart = decoder.pos;
+ }
+
+ while (decoder.pos < decoder.data)
+ {
+ if (decoder.workingBuffer[decoder.pos++] == 0)
+ {
+ break;
+ }
+ }
}
}
else
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-14 14:25:20 UTC (rev 11349)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-15 06:25:53 UTC (rev 11350)
@@ -224,8 +224,85 @@
unsubFrame.addHeader("id", "a-sub");
newConn.disconnect();
+ }
+
+ public void testHeaderContentType() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "application/xml");
+ frame.setBody("Hello World 1!");
+ connV11.sendFrame(frame);
+
+ //subscribe
+ StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ newConn.connect(defUser, defPass);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ frame = newConn.receiveFrame();
+
+ System.out.println("received " + frame);
+
+ assertEquals("MESSAGE", frame.getCommand());
+
+ assertEquals("application/xml", frame.getHeader("content-type"));
+
+ //unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+
+ newConn.disconnect();
}
+
+ public void testHeaderContentLength() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+ ClientStompFrame frame = connV11.createFrame("SEND");
+
+ String body = "Hello World 1!";
+ String cLen = String.valueOf(body.getBytes("UTF-8").length);
+
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "application/xml");
+ frame.addHeader("content-length", cLen);
+ frame.setBody(body + "extra");
+
+ connV11.sendFrame(frame);
+
+ //subscribe
+ StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ newConn.connect(defUser, defPass);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ frame = newConn.receiveFrame();
+
+ System.out.println("received " + frame);
+
+ assertEquals("MESSAGE", frame.getCommand());
+
+ assertEquals(cLen, frame.getHeader("content-length"));
+
+ //unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+
+ newConn.disconnect();
+ }
+
}
13 years, 3 months
JBoss hornetq SVN: r11349 - in branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp: v11 and 1 other directory.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-14 10:25:20 -0400 (Wed, 14 Sep 2011)
New Revision: 11349
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
tests
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java 2011-09-14 12:29:51 UTC (rev 11348)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java 2011-09-14 14:25:20 UTC (rev 11349)
@@ -111,6 +111,12 @@
this.body = body;
}
+ @Override
+ public String getBody()
+ {
+ return body;
+ }
+
private class Header
{
public String key;
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java 2011-09-14 12:29:51 UTC (rev 11348)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java 2011-09-14 14:25:20 UTC (rev 11349)
@@ -37,5 +37,7 @@
public String getCommand();
public String getHeader(String header);
+
+ public String getBody();
}
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java 2011-09-14 12:29:51 UTC (rev 11348)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java 2011-09-14 14:25:20 UTC (rev 11349)
@@ -36,7 +36,7 @@
public void connect(String username, String passcode) throws IOException, InterruptedException
{
ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND);
- frame.addHeader(ACCEPT_HEADER, "1.0,1.1");
+ frame.addHeader(ACCEPT_HEADER, "1.1");
frame.addHeader(HOST_HEADER, "localhost");
if (username != null)
{
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-14 12:29:51 UTC (rev 11348)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-14 14:25:20 UTC (rev 11349)
@@ -86,7 +86,6 @@
{
// case 1 accept-version absent. It is a 1.0 connect
ClientStompFrame frame = connV11.createFrame("CONNECT");
- //frame.addHeader("accept-version", "1.0,1.1,1.2");
frame.addHeader("host", "127.0.0.1");
frame.addHeader("login", this.defUser);
frame.addHeader("passcode", this.defPass);
@@ -167,4 +166,69 @@
connV11.disconnect();
}
+
+ public void testSendAndReceive() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+ frame.setBody("Hello World 1!");
+
+ ClientStompFrame response = connV11.sendFrame(frame);
+
+ assertNull(response);
+
+ frame.addHeader("receipt", "1234");
+ frame.setBody("Hello World 2!");
+
+ response = connV11.sendFrame(frame);
+
+ assertNotNull(response);
+
+ assertEquals("RECEIPT", response.getCommand());
+
+ assertEquals("1234", response.getHeader("receipt-id"));
+
+ //subscribe
+ StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ newConn.connect(defUser, defPass);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ frame = newConn.receiveFrame();
+
+ System.out.println("received " + frame);
+
+ assertEquals("MESSAGE", frame.getCommand());
+
+ assertEquals("a-sub", frame.getHeader("subscription"));
+
+ assertNotNull(frame.getHeader("message-id"));
+
+ assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
+
+ assertEquals("Hello World 1!", frame.getBody());
+
+ frame = newConn.receiveFrame();
+
+ System.out.println("received " + frame);
+
+ //unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+
+ newConn.disconnect();
+
+ }
}
+
+
+
+
+
13 years, 3 months
JBoss hornetq SVN: r11348 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-09-14 08:29:51 -0400 (Wed, 14 Sep 2011)
New Revision: 11348
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
Log:
https://issues.jboss.org/browse/JBPAPP-5747
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-09-14 08:04:17 UTC (rev 11347)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-09-14 12:29:51 UTC (rev 11348)
@@ -798,7 +798,10 @@
public void addConsumer(final ClientConsumerInternal consumer)
{
- consumers.put(consumer.getID(), consumer);
+ synchronized (consumers)
+ {
+ consumers.put(consumer.getID(), consumer);
+ }
}
public void addProducer(final ClientProducerInternal producer)
@@ -808,7 +811,10 @@
public void removeConsumer(final ClientConsumerInternal consumer) throws HornetQException
{
- consumers.remove(consumer.getID());
+ synchronized (consumers)
+ {
+ consumers.remove(consumer.getID());
+ }
}
public void removeProducer(final ClientProducerInternal producer)
@@ -1899,9 +1905,12 @@
private void flushAcks() throws HornetQException
{
- for (ClientConsumerInternal consumer : consumers.values())
+ synchronized (consumers)
{
- consumer.flushAcks();
+ for (ClientConsumerInternal consumer : consumers.values())
+ {
+ consumer.flushAcks();
+ }
}
}
13 years, 3 months
JBoss hornetq SVN: r11347 - in branches/Branch_2_2_EAP/src/config: jboss-as-4/non-clustered and 4 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-09-14 04:04:17 -0400 (Wed, 14 Sep 2011)
New Revision: 11347
Modified:
branches/Branch_2_2_EAP/src/config/jboss-as-4/clustered/hornetq-configuration.xml
branches/Branch_2_2_EAP/src/config/jboss-as-4/non-clustered/hornetq-configuration.xml
branches/Branch_2_2_EAP/src/config/jboss-as-5/clustered/hornetq-configuration.xml
branches/Branch_2_2_EAP/src/config/jboss-as-5/non-clustered/hornetq-configuration.xml
branches/Branch_2_2_EAP/src/config/jboss-as-6/clustered/hornetq-configuration.xml
branches/Branch_2_2_EAP/src/config/jboss-as-6/non-clustered/hornetq-configuration.xml
Log:
https://issues.jboss.org/browse/JBPAPP-5760
Modified: branches/Branch_2_2_EAP/src/config/jboss-as-4/clustered/hornetq-configuration.xml
===================================================================
--- branches/Branch_2_2_EAP/src/config/jboss-as-4/clustered/hornetq-configuration.xml 2011-09-14 03:43:57 UTC (rev 11346)
+++ branches/Branch_2_2_EAP/src/config/jboss-as-4/clustered/hornetq-configuration.xml 2011-09-14 08:04:17 UTC (rev 11347)
@@ -116,7 +116,7 @@
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>0</redelivery-delay>
+ <redelivery-delay>60000</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
Modified: branches/Branch_2_2_EAP/src/config/jboss-as-4/non-clustered/hornetq-configuration.xml
===================================================================
--- branches/Branch_2_2_EAP/src/config/jboss-as-4/non-clustered/hornetq-configuration.xml 2011-09-14 03:43:57 UTC (rev 11346)
+++ branches/Branch_2_2_EAP/src/config/jboss-as-4/non-clustered/hornetq-configuration.xml 2011-09-14 08:04:17 UTC (rev 11347)
@@ -89,7 +89,7 @@
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>0</redelivery-delay>
+ <redelivery-delay>60000</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
Modified: branches/Branch_2_2_EAP/src/config/jboss-as-5/clustered/hornetq-configuration.xml
===================================================================
--- branches/Branch_2_2_EAP/src/config/jboss-as-5/clustered/hornetq-configuration.xml 2011-09-14 03:43:57 UTC (rev 11346)
+++ branches/Branch_2_2_EAP/src/config/jboss-as-5/clustered/hornetq-configuration.xml 2011-09-14 08:04:17 UTC (rev 11347)
@@ -116,7 +116,7 @@
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>0</redelivery-delay>
+ <redelivery-delay>60000</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
Modified: branches/Branch_2_2_EAP/src/config/jboss-as-5/non-clustered/hornetq-configuration.xml
===================================================================
--- branches/Branch_2_2_EAP/src/config/jboss-as-5/non-clustered/hornetq-configuration.xml 2011-09-14 03:43:57 UTC (rev 11346)
+++ branches/Branch_2_2_EAP/src/config/jboss-as-5/non-clustered/hornetq-configuration.xml 2011-09-14 08:04:17 UTC (rev 11347)
@@ -89,7 +89,7 @@
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>0</redelivery-delay>
+ <redelivery-delay>60000</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
Modified: branches/Branch_2_2_EAP/src/config/jboss-as-6/clustered/hornetq-configuration.xml
===================================================================
--- branches/Branch_2_2_EAP/src/config/jboss-as-6/clustered/hornetq-configuration.xml 2011-09-14 03:43:57 UTC (rev 11346)
+++ branches/Branch_2_2_EAP/src/config/jboss-as-6/clustered/hornetq-configuration.xml 2011-09-14 08:04:17 UTC (rev 11347)
@@ -116,7 +116,7 @@
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>0</redelivery-delay>
+ <redelivery-delay>60000</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
Modified: branches/Branch_2_2_EAP/src/config/jboss-as-6/non-clustered/hornetq-configuration.xml
===================================================================
--- branches/Branch_2_2_EAP/src/config/jboss-as-6/non-clustered/hornetq-configuration.xml 2011-09-14 03:43:57 UTC (rev 11346)
+++ branches/Branch_2_2_EAP/src/config/jboss-as-6/non-clustered/hornetq-configuration.xml 2011-09-14 08:04:17 UTC (rev 11347)
@@ -89,7 +89,7 @@
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>0</redelivery-delay>
+ <redelivery-delay>60000</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
13 years, 3 months
JBoss hornetq SVN: r11346 - in branches/STOMP11: tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util and 1 other directory.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-13 23:43:57 -0400 (Tue, 13 Sep 2011)
New Revision: 11346
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
test
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-13 15:05:32 UTC (rev 11345)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-14 03:43:57 UTC (rev 11346)
@@ -442,7 +442,8 @@
{
if (!initialized)
{
- if (!Stomp.Commands.CONNECT.equals(request.getCommand()))
+ String cmd = request.getCommand();
+ if ( ! (Stomp.Commands.CONNECT.equals(cmd) || Stomp.Commands.STOMP.equals(cmd)))
{
throw new HornetQStompException("Connection hasn't been established.");
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-09-13 15:05:32 UTC (rev 11345)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-09-14 03:43:57 UTC (rev 11346)
@@ -104,6 +104,8 @@
public static final byte E = (byte)'E';
+ public static final byte T = (byte)'T';
+
public static final byte M = (byte)'M';
public static final byte S = (byte)'S';
@@ -367,6 +369,16 @@
// SEND
command = COMMAND_SEND;
}
+ else if (workingBuffer[offset + 1] == T)
+ {
+ if (!tryIncrement(offset + COMMAND_STOMP_LENGTH + 1))
+ {
+ return null;
+ }
+
+ // SEND
+ command = COMMAND_STOMP;
+ }
else
{
if (!tryIncrement(offset + COMMAND_SUBSCRIBE_LENGTH + 1))
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-13 15:05:32 UTC (rev 11345)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-14 03:43:57 UTC (rev 11346)
@@ -179,7 +179,6 @@
public void connect() throws Exception
{
connect(null, null);
- connected = true;
}
public void connect(String username, String password) throws Exception
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java 2011-09-13 15:05:32 UTC (rev 11345)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java 2011-09-14 03:43:57 UTC (rev 11346)
@@ -35,9 +35,16 @@
frame.addHeader(PASSCODE_HEADER, passcode);
ClientStompFrame response = this.sendFrame(frame);
- System.out.println("Got response : " + response);
- connected = true;
+ if (response.getCommand().equals(CONNECTED_COMMAND))
+ {
+ connected = true;
+ }
+ else
+ {
+ System.out.println("Connection failed with: " + response);
+ connected = false;
+ }
}
@Override
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java 2011-09-13 15:05:32 UTC (rev 11345)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java 2011-09-14 03:43:57 UTC (rev 11346)
@@ -55,14 +55,17 @@
this.passcode = passcode;
this.connected = true;
}
- connected = true;
+ else
+ {
+ connected = false;
+ }
}
public void connect1(String username, String passcode) throws IOException, InterruptedException
{
ClientStompFrame frame = factory.newFrame(STOMP_COMMAND);
frame.addHeader(ACCEPT_HEADER, "1.0,1.1");
- frame.addHeader(HOST_HEADER, "localhost");
+ frame.addHeader(HOST_HEADER, "127.0.0.1");
if (username != null)
{
frame.addHeader(LOGIN_HEADER, username);
@@ -80,6 +83,11 @@
this.passcode = passcode;
this.connected = true;
}
+ else
+ {
+ System.out.println("Connection failed with frame " + response);
+ connected = false;
+ }
}
@Override
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-13 15:05:32 UTC (rev 11345)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-14 03:43:57 UTC (rev 11346)
@@ -21,6 +21,7 @@
import org.hornetq.tests.integration.stomp.util.ClientStompFrame;
import org.hornetq.tests.integration.stomp.util.StompClientConnection;
import org.hornetq.tests.integration.stomp.util.StompClientConnectionFactory;
+import org.hornetq.tests.integration.stomp.util.StompClientConnectionV11;
public class StompTestV11 extends StompTestBase2
@@ -65,18 +66,25 @@
assertEquals("1.1", connection.getVersion());
connection.disconnect();
+
+ connection = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+
+ connection.connect();
+
+ assertFalse(connection.isConnected());
+
+ //new way of connection
+ StompClientConnectionV11 conn = (StompClientConnectionV11) StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ conn.connect1(defUser, defPass);
+
+ assertTrue(conn.isConnected());
+
+ conn.disconnect();
}
- /*
- * test case:
- * 1 accept-version absent. It is a 1.0 connect
- * 2 accept-version=1.0, result: 1.0
- * 3 accept-version=1.0,1.1,1.2, result 1.1
- * 4 accept-version="1.2,1.3", result error
- */
public void testNegotiation() throws Exception
{
- // case 1
+ // case 1 accept-version absent. It is a 1.0 connect
ClientStompFrame frame = connV11.createFrame("CONNECT");
//frame.addHeader("accept-version", "1.0,1.1,1.2");
frame.addHeader("host", "127.0.0.1");
@@ -91,6 +99,72 @@
assertEquals(null, reply.getHeader("version"));
connV11.disconnect();
+
+ // case 2 accept-version=1.0, result: 1.0
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("accept-version", "1.0");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ //reply headers: version, session, server
+ assertEquals("1.0", reply.getHeader("version"));
+
+ connV11.disconnect();
+
+ // case 3 accept-version=1.1, result: 1.1
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("accept-version", "1.1");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ //reply headers: version, session, server
+ assertEquals("1.1", reply.getHeader("version"));
+
+ connV11.disconnect();
+
+ // case 4 accept-version=1.0,1.1,1.2, result 1.1
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("accept-version", "1.0,1.1,1.2");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ //reply headers: version, session, server
+ assertEquals("1.1", reply.getHeader("version"));
+
+ connV11.disconnect();
+
+ // case 5 accept-version=1.2, result error
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("accept-version", "1.2");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("ERROR", reply.getCommand());
+
+ System.out.println("Got error frame " + reply);
+
+ connV11.disconnect();
}
}
13 years, 3 months
JBoss hornetq SVN: r11345 - in branches/STOMP11: hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10 and 1 other directory.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-13 11:05:32 -0400 (Tue, 13 Sep 2011)
New Revision: 11345
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
test
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-13 14:44:04 UTC (rev 11344)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-13 15:05:32 UTC (rev 11345)
@@ -371,6 +371,8 @@
{
String acceptVersion = frame.getHeader(Stomp.Headers.ACCEPT_VERSION);
+ log.error("----------------- acceptVersion: " + acceptVersion);
+
if (acceptVersion == null)
{
this.version = StompVersions.V1_0;
@@ -401,6 +403,7 @@
error.setBody("Supported protocol version are " + manager.getSupportedVersionsAsString());
throw error;
}
+ log.error("------------------ negotiated version is " + this.version);
}
this.frameHandler = VersionedStompFrameHandler.getHandler(this, this.version);
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-13 14:44:04 UTC (rev 11344)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-13 15:05:32 UTC (rev 11345)
@@ -70,6 +70,11 @@
response = new StompFrameV10(Stomp.Responses.CONNECTED);
+ if (frame.hasHeader(Stomp.Headers.ACCEPT_VERSION))
+ {
+ response.addHeader(Stomp.Headers.Connected.VERSION, "1.0");
+ }
+
response.addHeader(Stomp.Headers.Connected.SESSION, connection.getID().toString());
if (requestID != null)
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-13 14:44:04 UTC (rev 11344)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-13 15:05:32 UTC (rev 11345)
@@ -27,22 +27,16 @@
{
private static final transient Logger log = Logger.getLogger(StompTestV11.class);
- private StompClientConnection connV10;
private StompClientConnection connV11;
protected void setUp() throws Exception
{
super.setUp();
- connV10 = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
}
protected void tearDown() throws Exception
{
- if (connV10.isConnected())
- {
- connV10.disconnect();
- }
if (connV11.isConnected())
{
connV11.disconnect();
@@ -73,10 +67,18 @@
connection.disconnect();
}
+ /*
+ * test case:
+ * 1 accept-version absent. It is a 1.0 connect
+ * 2 accept-version=1.0, result: 1.0
+ * 3 accept-version=1.0,1.1,1.2, result 1.1
+ * 4 accept-version="1.2,1.3", result error
+ */
public void testNegotiation() throws Exception
{
+ // case 1
ClientStompFrame frame = connV11.createFrame("CONNECT");
- frame.addHeader("accept-version", "1.0,1.1");
+ //frame.addHeader("accept-version", "1.0,1.1,1.2");
frame.addHeader("host", "127.0.0.1");
frame.addHeader("login", this.defUser);
frame.addHeader("passcode", this.defPass);
@@ -86,20 +88,8 @@
assertEquals("CONNECTED", reply.getCommand());
//reply headers: version, session, server
- assertEquals("1.1", reply.getHeader("version"));
+ assertEquals(null, reply.getHeader("version"));
- String sessionId = reply.getHeader("session");
-
- log.info("session id: " + sessionId);
-
- assertNotNull(sessionId);
-
- String server = reply.getHeader("server");
-
- log.info("server: " + server);
-
- assertNotNull(server);
-
connV11.disconnect();
}
13 years, 3 months
JBoss hornetq SVN: r11344 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-13 10:44:04 -0400 (Tue, 13 Sep 2011)
New Revision: 11344
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
Log:
clean up of locking and scheduled threads
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-09-13 14:43:29 UTC (rev 11343)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-09-13 14:44:04 UTC (rev 11344)
@@ -238,6 +238,11 @@
{
for (PageSubscription cursor : activeCursors.values())
{
+ cursor.disableAutoCleanup();
+ }
+
+ for (PageSubscription cursor : activeCursors.values())
+ {
cursor.stop();
}
@@ -247,7 +252,7 @@
while (!future.await(10000))
{
- log.warn("Waiting cursor provider " + this + " to finish executors");
+ log.warn("Waiting cursor provider " + this + " to finish executors" + executor);
}
}
@@ -265,7 +270,7 @@
while (!future.await(10000))
{
- log.warn("Waiting cursor provider " + this + " to finish executors");
+ log.warn("Waiting cursor provider " + this + " to finish executors " + executor);
}
}
@@ -463,7 +468,7 @@
* @param currentPage
* @throws Exception
*/
- protected void storePositions(ArrayList<PageSubscription> cursorList, Page currentPage) throws Exception
+ private void storePositions(ArrayList<PageSubscription> cursorList, Page currentPage) throws Exception
{
try
{
@@ -500,8 +505,7 @@
// Protected -----------------------------------------------------
- /* Protected as we may let test cases to instrument the test */
- protected PageCacheImpl createPageCache(final long pageId) throws Exception
+ private PageCacheImpl createPageCache(final long pageId) throws Exception
{
return new PageCacheImpl(pagingStore.createPage((int)pageId));
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-09-13 14:43:29 UTC (rev 11343)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-09-13 14:44:04 UTC (rev 11344)
@@ -59,8 +59,6 @@
*
* A page cursor will always store its
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
- *
- *
*/
class PageSubscriptionImpl implements PageSubscription
{
@@ -161,11 +159,6 @@
autoCleanup = true;
}
- public PageCursorProvider getProvider()
- {
- return cursorProvider;
- }
-
public void bookmark(PagePosition position) throws Exception
{
PageCursorInfo cursorInfo = getPageInfo(position);
@@ -206,6 +199,12 @@
PageSubscriptionImpl.log.warn("Error on cleaning up cursor pages", e);
}
}
+
+ @Override
+ public String toString()
+ {
+ return "PageSubscription.scheduleCleanupCheck()";
+ }
});
}
}
@@ -215,6 +214,8 @@
* */
public void cleanupEntries() throws Exception
{
+ if (!autoCleanup)
+ return;
Transaction tx = new TransactionImpl(store);
boolean persist = false;
@@ -664,7 +665,7 @@
executor.execute(future);
while (!future.await(1000))
{
- PageSubscriptionImpl.log.warn("Waiting page cursor to finish executors - " + this);
+ PageSubscriptionImpl.log.warn("Waiting page cursor to finish executors - " + this + "\n" + executor);
}
}
13 years, 3 months
JBoss hornetq SVN: r11343 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging: impl and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-13 10:43:29 -0400 (Tue, 13 Sep 2011)
New Revision: 11343
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
Log:
FIX dead-lock: Mark PagingStore as closed, before trying to stop SubscriptionProviders
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-09-13 14:42:49 UTC (rev 11342)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-09-13 14:43:29 UTC (rev 11343)
@@ -350,6 +350,12 @@
break;
}
}
+
+ if (!pagingStore.isStarted())
+ {
+ return;
+ }
+
if (complete)
{
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java 2011-09-13 14:42:49 UTC (rev 11342)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java 2011-09-13 14:43:29 UTC (rev 11343)
@@ -227,18 +227,18 @@
}
}
- public void stop() throws Exception
+ public synchronized void stop() throws Exception
{
+ if (!started)
+ {
+ return;
+ }
+ started = false;
+
lock();
try
{
- if (!started)
- {
- return;
- }
- started = false;
-
for (PagingStore store : stores.values())
{
store.stop();
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-13 14:42:49 UTC (rev 11342)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-13 14:43:29 UTC (rev 11343)
@@ -391,10 +391,10 @@
{
if (running)
{
+ running = false;
+
cursorProvider.stop();
- running = false;
-
flushExecutors();
if (currentPage != null)
13 years, 3 months
JBoss hornetq SVN: r11342 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core: paging/cursor/impl and 1 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-13 10:42:49 -0400 (Tue, 13 Sep 2011)
New Revision: 11342
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
Log:
Delete unused code.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/PageCursorProvider.java 2011-09-13 14:13:22 UTC (rev 11341)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/PageCursorProvider.java 2011-09-13 14:42:49 UTC (rev 11342)
@@ -15,7 +15,6 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.paging.PagedMessage;
-import org.hornetq.core.paging.PagingStore;
/**
* The provider of Cursor for a given Address
@@ -43,8 +42,6 @@
void addPageCache(PageCache cache);
- PagingStore getAssociatedStore();
-
/**
*
* @param queueId The cursorID should be the same as the queueId associated for persistence
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-09-13 14:13:22 UTC (rev 11341)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-09-13 14:42:49 UTC (rev 11342)
@@ -80,11 +80,6 @@
// Public --------------------------------------------------------
- public PagingStore getAssociatedStore()
- {
- return pagingStore;
- }
-
public synchronized PageSubscription createSubscription(long cursorID, Filter filter, boolean persistent)
{
PageSubscription activeCursor = activeCursors.get(cursorID);
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-13 14:13:22 UTC (rev 11341)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-13 14:42:49 UTC (rev 11342)
@@ -104,9 +104,6 @@
private final ConcurrentMap<Long, LargeServerMessage> largeMessages =
new ConcurrentHashMap<Long, LargeServerMessage>();
- // Used on tests, to simulate failures on delete pages
- private boolean deletePages = true;
-
private boolean started;
// Constructors --------------------------------------------------
@@ -349,12 +346,6 @@
}
- /** Used on tests only. To simulate missing page deletes*/
- public void setDeletePages(final boolean deletePages)
- {
- this.deletePages = deletePages;
- }
-
/**
* @param journalInformation
*/
@@ -736,10 +727,7 @@
{
if (packet.isDelete())
{
- if (deletePages)
- {
- page.delete(null);
- }
+ page.delete(null);
}
else
{
13 years, 3 months
JBoss hornetq SVN: r11341 - in branches/STOMP11: hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11 and 2 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-13 10:13:22 -0400 (Tue, 13 Sep 2011)
New Revision: 11341
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompVersions.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
more tests
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-13 14:13:22 UTC (rev 11341)
@@ -20,6 +20,7 @@
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -184,13 +185,41 @@
return escape(val);
}
- private String escape(String str)
+ public static String escape(String str)
{
- str = str.replaceAll("\n", "\\n");
- str = str.replaceAll("\\", "\\\\");
- str = str.replaceAll(":", "\\:");
+ int len = str.length();
- return str;
+ char[] buffer = new char[2*len];
+ int iBuffer = 0;
+ for (int i = 0; i < len; i++)
+ {
+ char c = str.charAt(i);
+ if (c == '\n')
+ {
+ buffer[iBuffer++] = '\\';
+ buffer[iBuffer] = 'n';
+ }
+ else if (c == '\\')
+ {
+ buffer[iBuffer++] = '\\';
+ buffer[iBuffer] = '\\';
+ }
+ else if (c == ':')
+ {
+ buffer[iBuffer++] = '\\';
+ buffer[iBuffer] = ':';
+ }
+ else
+ {
+ buffer[iBuffer] = c;
+ }
+ iBuffer++;
+ }
+
+ char[] total = new char[iBuffer];
+ System.arraycopy(buffer, 0, total, 0, iBuffer);
+
+ return new String(total);
}
}
@@ -232,4 +261,9 @@
{
this.bytesBody = content;
}
+
+ public void setNeedsDisconnect(boolean b)
+ {
+ disconnect = b;
+ }
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompVersions.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompVersions.java 2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompVersions.java 2011-09-13 14:13:22 UTC (rev 11341)
@@ -21,6 +21,15 @@
*/
public enum StompVersions
{
- V1_0,
- V1_1
+ V1_0,
+ V1_1;
+
+ public String toString()
+ {
+ if (this == V1_0)
+ {
+ return "1.0";
+ }
+ return "1.1";
+ }
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-13 14:13:22 UTC (rev 11341)
@@ -15,6 +15,7 @@
import java.io.UnsupportedEncodingException;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.stomp.v10.StompFrameHandlerV10;
import org.hornetq.core.protocol.stomp.v11.StompFrameHandlerV11;
import org.hornetq.core.server.ServerMessage;
@@ -24,7 +25,9 @@
* @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
*/
public abstract class VersionedStompFrameHandler
-{
+{
+ private static final Logger log = Logger.getLogger(VersionedStompFrameHandler.class);
+
protected StompConnection connection;
public static VersionedStompFrameHandler getHandler(StompConnection connection, StompVersions version)
@@ -93,9 +96,13 @@
response = onUnknown(request.getCommand());
}
- if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED) && (response == null))
+ log.error("-------------------- handled " + request);
+
+ if (response == null)
{
- response = handleReceipt(request.getHeader(Stomp.Headers.RECEIPT_REQUESTED));
+ response = postprocess(request);
+
+ log.error("---------------postprocessed response: " + response);
}
return response;
@@ -126,6 +133,11 @@
return receipt;
}
+
+ public StompFrame postprocess(StompFrame request)
+ {
+ return null;
+ }
public abstract StompFrame createMessageFrame(ServerMessage serverMessage,
StompSubscription subscription, int deliveryCount) throws Exception;
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-13 14:13:22 UTC (rev 11341)
@@ -146,9 +146,25 @@
@Override
public StompFrame onDisconnect(StompFrame frame)
{
- connection.destroy();
+ log.error("----------------- frame: " + frame);
+
return null;
}
+
+ @Override
+ public StompFrame postprocess(StompFrame request)
+ {
+ StompFrame response = null;
+ if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED))
+ {
+ response = handleReceipt(request.getHeader(Stomp.Headers.RECEIPT_REQUESTED));
+ if (request.getCommand().equals(Stomp.Commands.DISCONNECT))
+ {
+ response.setNeedsDisconnect(true);
+ }
+ }
+ return response;
+ }
@Override
public StompFrame onSend(StompFrame frame)
@@ -415,6 +431,8 @@
@Override
public void replySent(StompFrame reply)
{
+ log.error("----------------------- reply sent notified: " + reply);
+
if (reply.getCommand().equals(Stomp.Responses.CONNECTED))
{
//kick off the pinger
@@ -807,8 +825,10 @@
// Now the headers
boolean isEscaping = false;
- SimpleBytes holder = new SimpleBytes(1024);
+ SimpleBytes holder = new SimpleBytes(1024);
+ log.error("--------------------------------- Decoding command: " + decoder.command);
+
outer: while (true)
{
byte b = decoder.workingBuffer[decoder.pos++];
@@ -887,6 +907,8 @@
}
holder.reset();
+ log.error("---------- A new header decoded: " + decoder.headerName + " : " + headerValue);
+
decoder.headers.put(decoder.headerName, headerValue);
if (decoder.headerName.equals(StompDecoder.CONTENT_LENGTH_HEADER_NAME))
@@ -914,6 +936,8 @@
decoder.whiteSpaceOnly = false;
decoder.headerValueWhitespace = false;
+
+ holder.append(b);
}
}
if (decoder.pos == decoder.data)
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java 2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java 2011-09-13 14:13:22 UTC (rev 11341)
@@ -49,7 +49,14 @@
{
if (buffer == null)
{
- buffer = HornetQBuffers.dynamicBuffer(bytesBody.length + 512);
+ if (bytesBody != null)
+ {
+ buffer = HornetQBuffers.dynamicBuffer(bytesBody.length + 512);
+ }
+ else
+ {
+ buffer = HornetQBuffers.dynamicBuffer(512);
+ }
StringBuffer head = new StringBuffer();
head.append(command);
@@ -66,7 +73,11 @@
head.append(Stomp.NEWLINE);
buffer.writeBytes(head.toString().getBytes("UTF-8"));
- buffer.writeBytes(bytesBody);
+ if (bytesBody != null)
+ {
+ buffer.writeBytes(bytesBody);
+ }
+
buffer.writeBytes(END_OF_FRAME);
size = buffer.writerIndex();
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java 2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java 2011-09-13 14:13:22 UTC (rev 11341)
@@ -64,10 +64,15 @@
sb.append(headers.get(i).key + ":" + headers.get(i).val + "\n");
}
sb.append("\n");
- sb.append(body);
+ if (body != null)
+ {
+ sb.append(body);
+ }
sb.append((char)0);
String data = new String(sb.toString());
+
+ System.out.println("---------------------------full frame is : " + data);
byte[] byteValue = data.getBytes("UTF-8");
ByteBuffer buffer = ByteBuffer.allocateDirect(byteValue.length);
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-13 14:13:22 UTC (rev 11341)
@@ -129,7 +129,6 @@
}
else
{
- System.out.println("Added to list: " + b);
receiveList.add(b);
}
}
@@ -180,11 +179,22 @@
public void connect() throws Exception
{
connect(null, null);
+ connected = true;
}
public void connect(String username, String password) throws Exception
{
throw new RuntimeException("connect method not implemented!");
}
+
+ public boolean isConnected()
+ {
+ return connected;
+ }
+
+ public String getVersion()
+ {
+ return version;
+ }
}
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java 2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java 2011-09-13 14:13:22 UTC (rev 11341)
@@ -29,6 +29,14 @@
void connect() throws Exception;
void disconnect() throws IOException, InterruptedException;
+
+ void connect(String defUser, String defPass) throws Exception;
+
+ boolean isConnected();
+
+ String getVersion();
+
+ ClientStompFrame createFrame(String command);
}
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java 2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java 2011-09-13 14:13:22 UTC (rev 11341)
@@ -36,6 +36,8 @@
ClientStompFrame response = this.sendFrame(frame);
System.out.println("Got response : " + response);
+
+ connected = true;
}
@Override
@@ -45,5 +47,15 @@
this.sendFrame(frame);
close();
+
+ connected = false;
}
+
+ @Override
+ public ClientStompFrame createFrame(
+ String command)
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java 2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java 2011-09-13 14:13:22 UTC (rev 11341)
@@ -55,6 +55,7 @@
this.passcode = passcode;
this.connected = true;
}
+ connected = true;
}
public void connect1(String username, String passcode) throws IOException, InterruptedException
@@ -85,11 +86,18 @@
public void disconnect() throws IOException, InterruptedException
{
ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND);
- frame.addHeader(RECEIPT_HEADER, "77");
- this.sendFrame(frame);
+ ClientStompFrame result = this.sendFrame(frame);
close();
+
+ connected = false;
}
+ @Override
+ public ClientStompFrame createFrame(String command)
+ {
+ return new ClientStompFrameV11(command);
+ }
+
}
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-13 14:13:22 UTC (rev 11341)
@@ -18,14 +18,89 @@
package org.hornetq.tests.integration.stomp.v11;
import org.hornetq.core.logging.Logger;
+import org.hornetq.tests.integration.stomp.util.ClientStompFrame;
import org.hornetq.tests.integration.stomp.util.StompClientConnection;
+import org.hornetq.tests.integration.stomp.util.StompClientConnectionFactory;
+
public class StompTestV11 extends StompTestBase2
{
private static final transient Logger log = Logger.getLogger(StompTestV11.class);
+ private StompClientConnection connV10;
+ private StompClientConnection connV11;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ connV10 = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ if (connV10.isConnected())
+ {
+ connV10.disconnect();
+ }
+ if (connV11.isConnected())
+ {
+ connV11.disconnect();
+ }
+ super.tearDown();
+ }
+
public void testConnection() throws Exception
{
- StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+
+ connection.connect(defUser, defPass);
+
+ assertTrue(connection.isConnected());
+
+ assertEquals("1.0", connection.getVersion());
+
+ connection.disconnect();
+
+ connection = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+
+ connection.connect(defUser, defPass);
+
+ assertTrue(connection.isConnected());
+
+ assertEquals("1.1", connection.getVersion());
+
+ connection.disconnect();
}
+
+ public void testNegotiation() throws Exception
+ {
+ ClientStompFrame frame = connV11.createFrame("CONNECT");
+ frame.addHeader("accept-version", "1.0,1.1");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+
+ ClientStompFrame reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ //reply headers: version, session, server
+ assertEquals("1.1", reply.getHeader("version"));
+
+ String sessionId = reply.getHeader("session");
+
+ log.info("session id: " + sessionId);
+
+ assertNotNull(sessionId);
+
+ String server = reply.getHeader("server");
+
+ log.info("server: " + server);
+
+ assertNotNull(server);
+
+ connV11.disconnect();
+
+ }
}
13 years, 3 months