Author: gaohoward
Date: 2011-09-11 21:47:54 -0400 (Sun, 11 Sep 2011)
New Revision: 11317
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServers.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/spi/core/security/HornetQSecurityManagerImpl.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompConnectionCleanupTest.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTest.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTestBase.java
Log:
fixed some stomp tests, with some dirty debug logs, remove later.
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java 2011-09-10
02:13:50 UTC (rev 11316)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java 2011-09-12
01:47:54 UTC (rev 11317)
@@ -238,12 +238,15 @@
{
if (binding.getFilter() == null || binding.getFilter().match(message))
{
+ log.error("---------------------- route to binding: " +
binding);
binding.getBindable().route(message, context);
routed = true;
}
}
}
+
+ log.error("-------- now routed is: " + routed);
if (!routed)
{
@@ -276,6 +279,7 @@
if (theBinding != null)
{
+ log.error("------------------- route theBinding: " +
theBinding + " mesage: " + message);
theBinding.route(message, context);
}
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-09-10
02:13:50 UTC (rev 11316)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-09-12
01:47:54 UTC (rev 11317)
@@ -591,8 +591,11 @@
cleanupInternalPropertiesBeforeRouting(message);
}
+ log.error("----------get address: " + address + " addressManager:
" + addressManager);
Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
+
+ log.error("-------------------Bindings: " + bindings);
if (bindings != null)
{
@@ -631,6 +634,7 @@
}
else
{
+ log.error("----------processing route: " + context + " direct
" + direct);
processRoute(message, context, direct);
}
@@ -967,6 +971,8 @@
message.incrementRefCount();
}
}
+
+ log.error("In processing, tx: " + tx);
if (tx != null)
{
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-09-10
02:13:50 UTC (rev 11316)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-09-12
01:47:54 UTC (rev 11317)
@@ -220,6 +220,7 @@
// The actual send must be outside the lock, or with OIO transport, the write
can block if the tcp
// buffer is full, preventing any incoming buffers being handled and blocking
failover
+ log.error("------------------------ write buffer " + connection);
connection.getTransportConnection().write(buffer, flush, batch);
}
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java 2011-09-10
02:13:50 UTC (rev 11316)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java 2011-09-12
01:47:54 UTC (rev 11317)
@@ -73,6 +73,8 @@
{
Packet packet = new SessionReceiveMessage(consumerID, message, deliveryCount);
+ log.error("------------------channel sent " + channel);
+
channel.sendBatched(packet);
int size = packet.getPacketSize();
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-10
02:13:50 UTC (rev 11316)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-12
01:47:54 UTC (rev 11317)
@@ -55,6 +55,7 @@
private String clientID;
+ //this means login is valid. (stomp connection ok)
private boolean valid;
private boolean destroyed = false;
@@ -75,6 +76,7 @@
private VersionedStompFrameHandler frameHandler;
+ //this means the version negotiation done.
private boolean initialized;
private FrameEventListener stompListener;
@@ -459,6 +461,7 @@
public void sendFrame(StompFrame frame)
{
+ log.error("--------------- sending reply: " + frame);
manager.sendReply(this, frame);
}
@@ -518,7 +521,9 @@
}
try
{
+ log.error("--------------------- sending mesage: " + message);
stompSession.getSession().send(message, true);
+ log.error("----------------------sent by " +
stompSession.getSession());
}
catch (Exception e)
{
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-09-10
02:13:50 UTC (rev 11316)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-09-12
01:47:54 UTC (rev 11317)
@@ -565,6 +565,8 @@
data = data - pos;
// reset
+
+ log.error("-------new Frame decoded: " + command + " headers
" + headers + " content " + content);
StompFrame ret = new StompFrame(command, headers, content);
@@ -588,6 +590,8 @@
pos = 0;
command = null;
+
+ headers = new HashMap<String, String>();
this.headerBytesCopyStart = -1;
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-10
02:13:50 UTC (rev 11316)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-12
01:47:54 UTC (rev 11317)
@@ -94,7 +94,7 @@
@Override
public String toString()
{
- return "StompFrame[command=" + command + ", headers=" + headers
+ ", content-length=";
+ return "StompFrame[command=" + command + ", headers=" + headers
+ ", content= " + this.body + " bytes " + this.bytesBody;
}
public String asString()
@@ -113,7 +113,14 @@
{
if (buffer == null)
{
- buffer = HornetQBuffers.dynamicBuffer(bytesBody.length + 512);
+ if (bytesBody != null)
+ {
+ buffer = HornetQBuffers.dynamicBuffer(bytesBody.length + 512);
+ }
+ else
+ {
+ buffer = HornetQBuffers.dynamicBuffer(512);
+ }
StringBuffer head = new StringBuffer();
head.append(command);
@@ -130,7 +137,10 @@
head.append(Stomp.NEWLINE);
buffer.writeBytes(head.toString().getBytes("UTF-8"));
- buffer.writeBytes(bytesBody);
+ if (bytesBody != null)
+ {
+ buffer.writeBytes(bytesBody);
+ }
buffer.writeBytes(END_OF_FRAME);
size = buffer.writerIndex();
@@ -195,8 +205,15 @@
return headers.containsKey(key);
}
- public String getBody()
+ public String getBody() throws UnsupportedEncodingException
{
+ if (body == null)
+ {
+ if (bytesBody != null)
+ {
+ body = new String(bytesBody, "UTF-8");
+ }
+ }
return body;
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-10
02:13:50 UTC (rev 11316)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-12
01:47:54 UTC (rev 11317)
@@ -149,7 +149,7 @@
}
synchronized (connection)
{
- if (connection.isDestroyed() || !connection.isValid())
+ if (connection.isDestroyed())
{
log.warn("Connection closed " + connection);
return;
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-10
02:13:50 UTC (rev 11316)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-12
01:47:54 UTC (rev 11317)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.protocol.stomp.FrameEventListener;
import org.hornetq.core.protocol.stomp.HornetQStompException;
import org.hornetq.core.protocol.stomp.Stomp;
import org.hornetq.core.protocol.stomp.StompConnection;
@@ -39,18 +40,20 @@
*
* @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
*/
-public class StompFrameHandlerV10 extends VersionedStompFrameHandler
+public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements
FrameEventListener
{
private static final Logger log = Logger.getLogger(StompFrameHandlerV10.class);
public StompFrameHandlerV10(StompConnection connection)
{
this.connection = connection;
+ connection.addStompEventListener(this);
}
@Override
public StompFrame onConnect(StompFrame frame)
{
+ log.error("-----------------onConnection ()");
StompFrame response = null;
Map<String, String> headers = frame.getHeadersMap();
String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
@@ -58,12 +61,14 @@
String clientID = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
+ log.error("------------ validating user: " + login + " code " +
passcode);
if (connection.validateUser(login, passcode))
{
+ log.error("-------user OK!!!");
connection.setClientID(clientID);
connection.setValid(true);
- response = new StompFrame(Stomp.Responses.CONNECTED);
+ response = new StompFrameV10(Stomp.Responses.CONNECTED);
response.addHeader(Stomp.Headers.Connected.SESSION,
connection.getID().toString());
@@ -74,6 +79,7 @@
}
else
{
+ log.error("--------user NOT ok!!");
//not valid
response = new StompFrameV10(Stomp.Responses.ERROR);
response.addHeader(Stomp.Headers.Error.MESSAGE, "Failed to connect");
@@ -86,11 +92,6 @@
log.error("Encoding problem", e);
//then we will send a null body message.
}
-
- connection.sendFrame(response);
- connection.destroy();
-
- return null;
}
return response;
}
@@ -105,10 +106,12 @@
@Override
public StompFrame onSend(StompFrame frame)
{
+ log.error("-------------on Send: " + frame);
StompFrame response = null;
try
{
connection.validate();
+ log.error("-----------connection is valid");
String destination = frame.getHeader(Stomp.Headers.Send.DESTINATION);
String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
@@ -120,11 +123,13 @@
StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
if (frame.hasHeader(Stomp.Headers.CONTENT_LENGTH))
{
+ log.error("--------------------------------it's a bryte
type");
message.setType(Message.BYTES_TYPE);
message.getBodyBuffer().writeBytes(frame.getBodyAsBytes());
}
else
{
+ log.error("------------------ it's a text type");
message.setType(Message.TEXT_TYPE);
String text = frame.getBody();
message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(text));
@@ -367,4 +372,21 @@
return decoder.defaultDecode(buffer);
}
+ @Override
+ public void replySent(StompFrame reply)
+ {
+ log.error("-----------------------need destroy? " +
reply.needsDisconnect());
+ if (reply.needsDisconnect())
+ {
+ connection.destroy();
+ }
+ }
+
+ @Override
+ public void requestAccepted(StompFrame request)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServers.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServers.java 2011-09-10
02:13:50 UTC (rev 11316)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServers.java 2011-09-12
01:47:54 UTC (rev 11317)
@@ -93,4 +93,19 @@
return server;
}
+ public static HornetQServer newHornetQServer(Configuration config,
+ String defUser, String defPass)
+ {
+ HornetQSecurityManager securityManager = new HornetQSecurityManagerImpl();
+
+ securityManager.addUser(defUser, defPass);
+
+ HornetQServer server = HornetQServers.newHornetQServer(config,
+
ManagementFactory.getPlatformMBeanServer(),
+ securityManager,
+ config.isPersistenceEnabled());
+
+ return server;
+ }
+
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java 2011-09-10
02:13:50 UTC (rev 11316)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java 2011-09-12
01:47:54 UTC (rev 11317)
@@ -279,6 +279,7 @@
public void route(final ServerMessage message, final RoutingContext context) throws
Exception
{
+ log.error("-------------------in queue route, context: " + context);
context.addQueue(address, this);
}
@@ -362,6 +363,7 @@
return;
}
+ log.error("----------------checkingDirect " + checkDirect);
// The checkDirect flag is periodically set to true, if the delivery is specified
as direct then this causes the
// directDeliver flag to be re-computed resulting in direct delivery if the queue
is empty
// We don't recompute it on every delivery since executing isEmpty is expensive
for a ConcurrentQueue
@@ -384,10 +386,13 @@
checkDirect = false;
}
+ log.error("-----now direct " + direct + " directDeliver " +
directDeliver );
if (direct && directDeliver && deliverDirect(ref))
{
return;
}
+
+ log.error("------- ok, adding ref to the queue");
queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
@@ -396,6 +401,8 @@
directDeliver = false;
executor.execute(concurrentPoller);
+
+ log.error("-----------executing : " + concurrentPoller);
}
public void deliverAsync()
@@ -1946,7 +1953,10 @@
HandleStatus status;
try
{
+ log.error("-------------------Now let consumer " + consumer + "
handle " + reference);
status = consumer.handle(reference);
+
+ log.error("-------------- returned status: " + status);
}
catch (Throwable t)
{
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-09-10
02:13:50 UTC (rev 11316)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-09-12
01:47:54 UTC (rev 11317)
@@ -286,6 +286,7 @@
}
else
{
+ log.error("--------------------- deliver standard");
deliverStandardMessage(ref, message);
}
@@ -695,6 +696,7 @@
*/
private void deliverStandardMessage(final MessageReference ref, final ServerMessage
message)
{
+ log.error("------------------ calling callback " + callback + " to
send message");
int packetSize = callback.sendMessage(message, id, ref.getDeliveryCount());
if (availableCredits != null)
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/spi/core/security/HornetQSecurityManagerImpl.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/spi/core/security/HornetQSecurityManagerImpl.java 2011-09-10
02:13:50 UTC (rev 11316)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/spi/core/security/HornetQSecurityManagerImpl.java 2011-09-12
01:47:54 UTC (rev 11317)
@@ -119,6 +119,7 @@
public void addUser(final String user, final String password)
{
+ log.error("-------------------------------adding user: " + user + "
password " + password);
if (user == null)
{
throw new IllegalArgumentException("User cannot be null");
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompConnectionCleanupTest.java
===================================================================
---
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompConnectionCleanupTest.java 2011-09-10
02:13:50 UTC (rev 11316)
+++
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompConnectionCleanupTest.java 2011-09-12
01:47:54 UTC (rev 11317)
@@ -39,6 +39,8 @@
frame = receiveFrame(10000);
//We send and consumer a message to ensure a STOMP connection and server session is
created
+
+ System.out.println("Received frame: " + frame);
Assert.assertTrue(frame.startsWith("CONNECTED"));
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
---
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTest.java 2011-09-10
02:13:50 UTC (rev 11316)
+++
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTest.java 2011-09-12
01:47:54 UTC (rev 11317)
@@ -370,12 +370,13 @@
frame = receiveFrame(100000);
Assert.assertTrue(frame.startsWith("CONNECTED"));
- frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() +
getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() +
getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL;
sendFrame(frame);
sendMessage(getName());
frame = receiveFrame(10000);
+ System.out.println("-------- frame received: " + frame);
Assert.assertTrue(frame.startsWith("MESSAGE"));
Assert.assertTrue(frame.indexOf("destination:") > 0);
Assert.assertTrue(frame.indexOf(getName()) > 0);
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTestBase.java
===================================================================
---
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTestBase.java 2011-09-10
02:13:50 UTC (rev 11316)
+++
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTestBase.java 2011-09-12
01:47:54 UTC (rev 11317)
@@ -39,7 +39,6 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
@@ -80,7 +79,11 @@
protected JMSServerManager server;
+ protected String defUser = "brianm";
+ protected String defPass = "wombats";
+
+
// Implementation methods
// -------------------------------------------------------------------------
@@ -118,7 +121,7 @@
TransportConfiguration stompTransport = new
TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
config.getAcceptorConfigurations().add(stompTransport);
config.getAcceptorConfigurations().add(new
TransportConfiguration(InVMAcceptorFactory.class.getName()));
- HornetQServer hornetQServer = HornetQServers.newHornetQServer(config);
+ HornetQServer hornetQServer = HornetQServers.newHornetQServer(config, defUser,
defPass);
JMSConfiguration jmsConfig = new JMSConfigurationImpl();
jmsConfig.getQueueConfigurations()