Author: gaohoward
Date: 2011-09-19 09:17:33 -0400 (Mon, 19 Sep 2011)
New Revision: 11363
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
tests
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java 2011-09-19
04:53:40 UTC (rev 11362)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java 2011-09-19
13:17:33 UTC (rev 11363)
@@ -238,15 +238,12 @@
{
if (binding.getFilter() == null || binding.getFilter().match(message))
{
- log.error("---------------------- route to binding: " +
binding);
binding.getBindable().route(message, context);
routed = true;
}
}
}
-
- log.error("-------- now routed is: " + routed);
if (!routed)
{
@@ -279,7 +276,6 @@
if (theBinding != null)
{
- log.error("------------------- route theBinding: " +
theBinding + " mesage: " + message);
theBinding.route(message, context);
}
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-09-19
04:53:40 UTC (rev 11362)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-09-19
13:17:33 UTC (rev 11363)
@@ -591,11 +591,7 @@
cleanupInternalPropertiesBeforeRouting(message);
}
- log.error("----------get address: " + address + " addressManager:
" + addressManager);
-
Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
-
- log.error("-------------------Bindings: " + bindings);
if (bindings != null)
{
@@ -634,7 +630,6 @@
}
else
{
- log.error("----------processing route: " + context + " direct
" + direct);
processRoute(message, context, direct);
}
@@ -971,8 +966,6 @@
message.incrementRefCount();
}
}
-
- log.error("In processing, tx: " + tx);
if (tx != null)
{
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-09-19
04:53:40 UTC (rev 11362)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-09-19
13:17:33 UTC (rev 11363)
@@ -220,7 +220,6 @@
// The actual send must be outside the lock, or with OIO transport, the write
can block if the tcp
// buffer is full, preventing any incoming buffers being handled and blocking
failover
- log.error("------------------------ write buffer " + connection);
connection.getTransportConnection().write(buffer, flush, batch);
}
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java 2011-09-19
04:53:40 UTC (rev 11362)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java 2011-09-19
13:17:33 UTC (rev 11363)
@@ -72,8 +72,6 @@
public int sendMessage(ServerMessage message, long consumerID, int deliveryCount)
{
Packet packet = new SessionReceiveMessage(consumerID, message, deliveryCount);
-
- log.error("------------------channel sent " + channel);
channel.sendBatched(packet);
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-19
04:53:40 UTC (rev 11362)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-19
13:17:33 UTC (rev 11363)
@@ -469,7 +469,6 @@
public void sendFrame(StompFrame frame)
{
- log.error("--------------- sending reply: " + frame);
manager.sendReply(this, frame);
}
@@ -529,9 +528,7 @@
}
try
{
- log.error("--------------------- sending mesage: " + message);
stompSession.getSession().send(message, true);
- log.error("----------------------sent by " +
stompSession.getSession());
}
catch (Exception e)
{
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-09-19
04:53:40 UTC (rev 11362)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-09-19
13:17:33 UTC (rev 11363)
@@ -579,8 +579,6 @@
data = data - pos;
// reset
-
- log.error("-------new Frame decoded: " + command + " headers
" + headers + " content " + content);
StompFrame ret = new StompFrame(command, headers, content);
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-19
04:53:40 UTC (rev 11362)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-19
13:17:33 UTC (rev 11363)
@@ -177,15 +177,11 @@
public String getEscapedKey()
{
- log.error("----------------key is : |" + key + "|");
- log.error("----------------esc'd: |" + escape(key) +
"|");
return escape(key);
}
public String getEscapedValue()
{
- log.error("----------------val is : |" + val + "|");
- log.error("----------------esc'd v: |" + escape(val) +
"|");
return escape(val);
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-19
04:53:40 UTC (rev 11362)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-19
13:17:33 UTC (rev 11363)
@@ -156,6 +156,7 @@
if (sub.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL))
{
+ log.error("---------------------client-individual ack: " + id);
session.individualAcknowledge(consumerID, id);
}
else
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-19
04:53:40 UTC (rev 11362)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-19
13:17:33 UTC (rev 11363)
@@ -95,14 +95,10 @@
{
response = onUnknown(request.getCommand());
}
-
- log.error("-------------------- handled " + request);
if (response == null)
{
response = postprocess(request);
-
- log.error("---------------postprocessed response: " + response);
}
else
{
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-19
04:53:40 UTC (rev 11362)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-19
13:17:33 UTC (rev 11363)
@@ -153,7 +153,6 @@
@Override
public StompFrame onDisconnect(StompFrame frame)
{
- log.error("----------------- frame: " + frame);
if (this.heartBeater != null)
{
heartBeater.shutdown();
@@ -447,8 +446,6 @@
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame,
deliveryCount);
- log.error("-------------------- frame created: " + frame);
-
return frame;
}
@@ -456,8 +453,6 @@
@Override
public void replySent(StompFrame reply)
{
- log.error("----------------------- reply sent notified: " + reply);
-
if (reply.getCommand().equals(Stomp.Responses.CONNECTED))
{
//kick off the pinger
@@ -574,8 +569,6 @@
{
dur2 = System.currentTimeMillis() - lastAccepted.get();
- log.error("-------------------------- dur2 is " + dur2);
-
if (dur2 > (2 * serverAcceptPing))
{
connection.disconnect();
@@ -614,21 +607,17 @@
try
{
- log.error("-------------------waiting for " + waitTime);
this.wait(waitTime);
- log.error("--------------------wake up " );
}
catch (InterruptedException e)
{
}
}
- log.error("-------------------------HeartBeat thread shut down!");
}
}
public void pingAccepted()
{
- log.error("------------------------Ping accepted!");
this.lastAccepted.set(System.currentTimeMillis());
}
}
@@ -638,7 +627,6 @@
{
if (heartBeater != null)
{
- log.error("----------------------PING accepted: " + request);
heartBeater.pingAccepted();
}
}
@@ -897,8 +885,6 @@
boolean isEscaping = false;
SimpleBytes holder = new SimpleBytes(1024);
- log.error("--------------------------------- Decoding command: " +
decoder.command);
-
outer: while (true)
{
byte b = decoder.workingBuffer[decoder.pos++];
@@ -990,8 +976,6 @@
}
holder.reset();
- log.error("---------- A new header decoded: " +
decoder.headerName + " : " + headerValue);
-
decoder.headers.put(decoder.headerName, headerValue);
if
(decoder.headerName.equals(StompDecoder.CONTENT_LENGTH_HEADER_NAME))
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java 2011-09-19
04:53:40 UTC (rev 11362)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java 2011-09-19
13:17:33 UTC (rev 11363)
@@ -71,8 +71,6 @@
}
// Add a newline to separate the headers from the content.
head.append(Stomp.NEWLINE);
-
- log.error("------------------------_______now head: " + head);
buffer.writeBytes(head.toString().getBytes("UTF-8"));
if (bytesBody != null)
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java 2011-09-19
04:53:40 UTC (rev 11362)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java 2011-09-19
13:17:33 UTC (rev 11363)
@@ -279,7 +279,6 @@
public void route(final ServerMessage message, final RoutingContext context) throws
Exception
{
- log.error("-------------------in queue route, context: " + context);
context.addQueue(address, this);
}
@@ -363,7 +362,6 @@
return;
}
- log.error("----------------checkingDirect " + checkDirect);
// The checkDirect flag is periodically set to true, if the delivery is specified
as direct then this causes the
// directDeliver flag to be re-computed resulting in direct delivery if the queue
is empty
// We don't recompute it on every delivery since executing isEmpty is expensive
for a ConcurrentQueue
@@ -386,13 +384,10 @@
checkDirect = false;
}
- log.error("-----now direct " + direct + " directDeliver " +
directDeliver );
if (direct && directDeliver && deliverDirect(ref))
{
return;
}
-
- log.error("------- ok, adding ref to the queue");
queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
@@ -401,8 +396,6 @@
directDeliver = false;
executor.execute(concurrentPoller);
-
- log.error("-----------executing : " + concurrentPoller);
}
public void deliverAsync()
@@ -1953,10 +1946,7 @@
HandleStatus status;
try
{
- log.error("-------------------Now let consumer " + consumer + "
handle " + reference);
status = consumer.handle(reference);
-
- log.error("-------------- returned status: " + status);
}
catch (Throwable t)
{
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-09-19
04:53:40 UTC (rev 11362)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-09-19
13:17:33 UTC (rev 11363)
@@ -286,7 +286,6 @@
}
else
{
- log.error("--------------------- deliver standard");
deliverStandardMessage(ref, message);
}
@@ -696,7 +695,6 @@
*/
private void deliverStandardMessage(final MessageReference ref, final ServerMessage
message)
{
- log.error("------------------ calling callback " + callback + " to
send message");
int packetSize = callback.sendMessage(message, id, ref.getDeliveryCount());
if (availableCredits != null)
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
===================================================================
---
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java 2011-09-19
04:53:40 UTC (rev 11362)
+++
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java 2011-09-19
13:17:33 UTC (rev 11363)
@@ -72,7 +72,6 @@
String data = new String(sb.toString());
- System.out.println("---------------------------full frame is : " +
data);
byte[] byteValue = data.getBytes("UTF-8");
ByteBuffer buffer = ByteBuffer.allocateDirect(byteValue.length);
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
===================================================================
---
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-19
04:53:40 UTC (rev 11362)
+++
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-19
13:17:33 UTC (rev 11363)
@@ -164,7 +164,6 @@
while (n >= 0)
{
- System.out.println("read " + n);
if (n > 0)
{
receiveBytes(n);
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV11.java
===================================================================
---
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV11.java 2011-09-19
04:53:40 UTC (rev 11362)
+++
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV11.java 2011-09-19
13:17:33 UTC (rev 11363)
@@ -46,15 +46,9 @@
public ClientStompFrame createFrame(final String data)
{
- System.out.println("Data: |" + data + "|");
//split the string at "\n\n"
String[] dataFields = data.split("\n\n");
- System.out.println("DataFields[0] |" + dataFields[0]);
- if (dataFields.length > 1)
- {
- System.out.println("DataFields[1] |" + dataFields[1]);
- }
StringTokenizer tokenizer = new StringTokenizer(dataFields[0], "\n");
String command = tokenizer.nextToken();
@@ -63,7 +57,6 @@
while (tokenizer.hasMoreTokens())
{
String header = tokenizer.nextToken();
- System.out.println("header is: " + header);
String[] fields = splitHeader(header);
frame.addHeader(fields[0], fields[1]);
}
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
---
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-19
04:53:40 UTC (rev 11362)
+++
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-19
13:17:33 UTC (rev 11363)
@@ -21,6 +21,7 @@
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
import junit.framework.Assert;
@@ -545,6 +546,8 @@
nack(connV11, "sub1", messageID);
+ unsubscribe(connV11, "sub1");
+
connV11.disconnect();
//Nack makes the message be dropped.
@@ -573,6 +576,8 @@
System.out.println("Receiver error: " + error);
+ unsubscribe(connV11, "sub1");
+
connV11.disconnect();
//message should be still there
@@ -601,6 +606,8 @@
System.out.println("Receiver error: " + error);
+ unsubscribe(connV11, "sub1");
+
connV11.disconnect();
//message should still there
@@ -626,6 +633,8 @@
ack(connV11, "sub1", messageID);
+ unsubscribe(connV11, "sub1");
+
connV11.disconnect();
//Nack makes the message be dropped.
@@ -654,6 +663,8 @@
System.out.println("Receiver error: " + error);
+ unsubscribe(connV11, "sub1");
+
connV11.disconnect();
//message should be still there
@@ -682,6 +693,8 @@
System.out.println("Receiver error: " + error);
+ unsubscribe(connV11, "sub1");
+
connV11.disconnect();
//message should still there
@@ -718,6 +731,8 @@
assertEquals("answer-me", error.getHeader("receipt-id"));
+ unsubscribe(connV11, "sub1");
+
connV11.disconnect();
//message should still there
@@ -754,6 +769,8 @@
assertEquals("answer-me", error.getHeader("receipt-id"));
+ unsubscribe(connV11, "sub1");
+
connV11.disconnect();
//message should still there
@@ -786,6 +803,8 @@
//ack the last
this.ack(connV11, "sub1", frame);
+ unsubscribe(connV11, "sub1");
+
connV11.disconnect();
//no messages can be received.
@@ -821,6 +840,8 @@
}
}
+ unsubscribe(connV11, "sub1");
+
connV11.disconnect();
//no messages can be received.
@@ -852,6 +873,8 @@
assertNotNull(frame);
}
+ unsubscribe(connV11, "sub1");
+
connV11.disconnect();
//no messages can be received.
@@ -880,6 +903,7 @@
frame = connV11.receiveFrame();
assertNotNull(frame);
+ System.out.println(i + " == received: " + frame);
//ack on even numbers
if (i%2 == 0)
{
@@ -887,18 +911,23 @@
}
}
+ unsubscribe(connV11, "sub1");
+
connV11.disconnect();
//no messages can be received.
MessageConsumer consumer = session.createConsumer(queue);
- Message message = null;
+ TextMessage message = null;
for (int i = 0; i < num/2; i++)
{
- message = consumer.receive(1000);
+ message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
+ System.out.println("Legal: " + message.getText());
}
- message = consumer.receive(1000);
+
+ message = (TextMessage) consumer.receive(1000);
+
Assert.assertNull(message);
}
@@ -908,12 +937,15 @@
String messageID = frame.getHeader("message-id");
ClientStompFrame ackFrame = connV11.createFrame("ACK");
- //give it a wrong sub id
+
ackFrame.addHeader("subscription", subId);
ackFrame.addHeader("message-id", messageID);
- ackFrame.addHeader("receipt", "answer-me");
- connV11.sendFrame(ackFrame);
+ ClientStompFrame response = connV11.sendFrame(ackFrame);
+ if (response != null)
+ {
+ throw new IOException("failed to ack " + response);
+ }
}
private void ack(StompClientConnection conn, String subId, String mid) throws
IOException, InterruptedException
@@ -944,6 +976,14 @@
conn.sendFrame(subFrame);
}
+ private void unsubscribe(StompClientConnection conn, String subId) throws IOException,
InterruptedException
+ {
+ ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
+ subFrame.addHeader("id", subId);
+
+ conn.sendFrame(subFrame);
+ }
+
}