[hornetq-commits] JBoss hornetq SVN: r11317 - in branches/STOMP11: hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sun Sep 11 21:47:55 EDT 2011


Author: gaohoward
Date: 2011-09-11 21:47:54 -0400 (Sun, 11 Sep 2011)
New Revision: 11317

Modified:
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServers.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/spi/core/security/HornetQSecurityManagerImpl.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompConnectionCleanupTest.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTest.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTestBase.java
Log:
fixed some stomp tests, with some dirty debug logs, remove later.



Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java	2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java	2011-09-12 01:47:54 UTC (rev 11317)
@@ -238,12 +238,15 @@
          {
             if (binding.getFilter() == null || binding.getFilter().match(message))
             {
+               log.error("---------------------- route to binding: " + binding);
                binding.getBindable().route(message, context);
 
                routed = true;
             }
          }
       }
+      
+      log.error("-------- now routed is: " + routed);
 
       if (!routed)
       {
@@ -276,6 +279,7 @@
 
                if (theBinding != null)
                {
+                  log.error("------------------- route theBinding: " + theBinding + " mesage: " + message);
                   theBinding.route(message, context);
                }
             }

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-09-12 01:47:54 UTC (rev 11317)
@@ -591,8 +591,11 @@
          cleanupInternalPropertiesBeforeRouting(message);
       }
 
+      log.error("----------get address: " + address + " addressManager: " + addressManager);
 
       Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
+      
+      log.error("-------------------Bindings: " + bindings);
 
       if (bindings != null)
       {
@@ -631,6 +634,7 @@
       }
       else
       {
+         log.error("----------processing route: " + context + " direct " + direct);
          processRoute(message, context, direct);
       }
 
@@ -967,6 +971,8 @@
             message.incrementRefCount();
          }
       }
+      
+      log.error("In processing, tx: " + tx);
 
       if (tx != null)
       {

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java	2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java	2011-09-12 01:47:54 UTC (rev 11317)
@@ -220,6 +220,7 @@
 
          // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
          // buffer is full, preventing any incoming buffers being handled and blocking failover
+         log.error("------------------------ write buffer " + connection);
          connection.getTransportConnection().write(buffer, flush, batch);
       }
    }

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java	2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java	2011-09-12 01:47:54 UTC (rev 11317)
@@ -73,6 +73,8 @@
    {
       Packet packet = new SessionReceiveMessage(consumerID, message, deliveryCount);
 
+      log.error("------------------channel sent " + channel);
+      
       channel.sendBatched(packet);
 
       int size = packet.getPacketSize();

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java	2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java	2011-09-12 01:47:54 UTC (rev 11317)
@@ -55,6 +55,7 @@
 
    private String clientID;
 
+   //this means login is valid. (stomp connection ok)
    private boolean valid;
 
    private boolean destroyed = false;
@@ -75,6 +76,7 @@
    
    private VersionedStompFrameHandler frameHandler;
    
+   //this means the version negotiation done.
    private boolean initialized;
    
    private FrameEventListener stompListener;
@@ -459,6 +461,7 @@
 
    public void sendFrame(StompFrame frame)
    {
+      log.error("--------------- sending reply: " + frame);
       manager.sendReply(this, frame);
    }
 
@@ -518,7 +521,9 @@
       }
       try
       {
+         log.error("--------------------- sending mesage: " + message);
          stompSession.getSession().send(message, true);
+         log.error("----------------------sent by " + stompSession.getSession());
       }
       catch (Exception e)
       {

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java	2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java	2011-09-12 01:47:54 UTC (rev 11317)
@@ -565,6 +565,8 @@
          data = data - pos;
 
          // reset
+         
+         log.error("-------new Frame decoded: " + command + " headers " + headers + " content " + content);
 
          StompFrame ret = new StompFrame(command, headers, content);
 
@@ -588,6 +590,8 @@
       pos = 0;
 
       command = null;
+      
+      headers = new HashMap<String, String>();
 
       this.headerBytesCopyStart = -1;
 

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java	2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java	2011-09-12 01:47:54 UTC (rev 11317)
@@ -94,7 +94,7 @@
    @Override
    public String toString()
    {
-      return "StompFrame[command=" + command + ", headers=" + headers + ", content-length=";
+      return "StompFrame[command=" + command + ", headers=" + headers + ", content= " + this.body + " bytes " + this.bytesBody;
    }
 
    public String asString()
@@ -113,7 +113,14 @@
    {
       if (buffer == null)
       {
-         buffer = HornetQBuffers.dynamicBuffer(bytesBody.length + 512);
+         if (bytesBody != null)
+         {
+            buffer = HornetQBuffers.dynamicBuffer(bytesBody.length + 512);
+         }
+         else
+         {
+            buffer = HornetQBuffers.dynamicBuffer(512);
+         }
 
          StringBuffer head = new StringBuffer();
          head.append(command);
@@ -130,7 +137,10 @@
          head.append(Stomp.NEWLINE);
 
          buffer.writeBytes(head.toString().getBytes("UTF-8"));
-         buffer.writeBytes(bytesBody);
+         if (bytesBody != null)
+         {
+            buffer.writeBytes(bytesBody);
+         }
          buffer.writeBytes(END_OF_FRAME);
 
          size = buffer.writerIndex();
@@ -195,8 +205,15 @@
       return headers.containsKey(key);
    }
 
-   public String getBody()
+   public String getBody() throws UnsupportedEncodingException
    {
+      if (body == null)
+      {
+         if (bytesBody != null)
+         {
+            body = new String(bytesBody, "UTF-8");
+         }
+      }
       return body;
    }
    

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2011-09-12 01:47:54 UTC (rev 11317)
@@ -149,7 +149,7 @@
       }
       synchronized (connection)
       {
-         if (connection.isDestroyed() || !connection.isValid())
+         if (connection.isDestroyed())
          {
             log.warn("Connection closed " + connection);
             return;

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java	2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java	2011-09-12 01:47:54 UTC (rev 11317)
@@ -21,6 +21,7 @@
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.protocol.stomp.FrameEventListener;
 import org.hornetq.core.protocol.stomp.HornetQStompException;
 import org.hornetq.core.protocol.stomp.Stomp;
 import org.hornetq.core.protocol.stomp.StompConnection;
@@ -39,18 +40,20 @@
 *
 * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
 */
-public class StompFrameHandlerV10 extends VersionedStompFrameHandler
+public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements FrameEventListener
 {
    private static final Logger log = Logger.getLogger(StompFrameHandlerV10.class);
    
    public StompFrameHandlerV10(StompConnection connection)
    {
       this.connection = connection;
+      connection.addStompEventListener(this);
    }
 
    @Override
    public StompFrame onConnect(StompFrame frame)
    {
+      log.error("-----------------onConnection ()");
       StompFrame response = null;
       Map<String, String> headers = frame.getHeadersMap();
       String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
@@ -58,12 +61,14 @@
       String clientID = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
       String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
 
+      log.error("------------ validating user: " + login + " code " + passcode);
       if (connection.validateUser(login, passcode))
       {
+         log.error("-------user OK!!!");
          connection.setClientID(clientID);
          connection.setValid(true);
          
-         response = new StompFrame(Stomp.Responses.CONNECTED);
+         response = new StompFrameV10(Stomp.Responses.CONNECTED);
          
          response.addHeader(Stomp.Headers.Connected.SESSION, connection.getID().toString());
          
@@ -74,6 +79,7 @@
       }
       else
       {
+         log.error("--------user NOT ok!!");
          //not valid
          response = new StompFrameV10(Stomp.Responses.ERROR);
          response.addHeader(Stomp.Headers.Error.MESSAGE, "Failed to connect");
@@ -86,11 +92,6 @@
             log.error("Encoding problem", e);
             //then we will send a null body message.
          }
-         
-         connection.sendFrame(response);
-         connection.destroy();
-         
-         return null;
       }
       return response;
    }
@@ -105,10 +106,12 @@
    @Override
    public StompFrame onSend(StompFrame frame)
    {
+      log.error("-------------on Send: " + frame);
       StompFrame response = null;
       try
       {
          connection.validate();
+         log.error("-----------connection is valid");
          String destination = frame.getHeader(Stomp.Headers.Send.DESTINATION);
          String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
 
@@ -120,11 +123,13 @@
          StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
          if (frame.hasHeader(Stomp.Headers.CONTENT_LENGTH))
          {
+            log.error("--------------------------------it's a bryte type");
             message.setType(Message.BYTES_TYPE);
             message.getBodyBuffer().writeBytes(frame.getBodyAsBytes());
          }
          else
          {
+            log.error("------------------ it's a text type");
             message.setType(Message.TEXT_TYPE);
             String text = frame.getBody();
             message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(text));
@@ -367,4 +372,21 @@
       return decoder.defaultDecode(buffer);
    }
 
+   @Override
+   public void replySent(StompFrame reply)
+   {
+      log.error("-----------------------need destroy? " + reply.needsDisconnect());
+      if (reply.needsDisconnect())
+      {
+         connection.destroy();
+      }
+   }
+
+   @Override
+   public void requestAccepted(StompFrame request)
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
 }

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServers.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServers.java	2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServers.java	2011-09-12 01:47:54 UTC (rev 11317)
@@ -93,4 +93,19 @@
       return server;
    }
 
+   public static HornetQServer newHornetQServer(Configuration config,
+         String defUser, String defPass)
+   {
+      HornetQSecurityManager securityManager = new HornetQSecurityManagerImpl();
+      
+      securityManager.addUser(defUser, defPass);
+
+      HornetQServer server = HornetQServers.newHornetQServer(config,
+                                                      ManagementFactory.getPlatformMBeanServer(),
+                                                      securityManager,
+                                                      config.isPersistenceEnabled());
+
+      return server;
+   }
+
 }

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java	2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java	2011-09-12 01:47:54 UTC (rev 11317)
@@ -279,6 +279,7 @@
 
    public void route(final ServerMessage message, final RoutingContext context) throws Exception
    {
+      log.error("-------------------in queue route, context: " + context);
       context.addQueue(address, this);
    }
 
@@ -362,6 +363,7 @@
          return;
       }
 
+      log.error("----------------checkingDirect " + checkDirect);
       // The checkDirect flag is periodically set to true, if the delivery is specified as direct then this causes the
       // directDeliver flag to be re-computed resulting in direct delivery if the queue is empty
       // We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
@@ -384,10 +386,13 @@
          checkDirect = false;
       }
 
+      log.error("-----now direct " + direct + " directDeliver " + directDeliver );
       if (direct && directDeliver && deliverDirect(ref))
       {
          return;
       }
+      
+      log.error("------- ok, adding ref to the queue");
 
       queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
 
@@ -396,6 +401,8 @@
       directDeliver = false;
 
       executor.execute(concurrentPoller);
+      
+      log.error("-----------executing : " + concurrentPoller);
    }
 
    public void deliverAsync()
@@ -1946,7 +1953,10 @@
       HandleStatus status;
       try
       {
+         log.error("-------------------Now let consumer " + consumer + " handle " + reference);
          status = consumer.handle(reference);
+         
+         log.error("-------------- returned status: " + status);
       }
       catch (Throwable t)
       {

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-09-12 01:47:54 UTC (rev 11317)
@@ -286,6 +286,7 @@
          }
          else
          {
+            log.error("--------------------- deliver standard");
             deliverStandardMessage(ref, message);
          }
 
@@ -695,6 +696,7 @@
     */
    private void deliverStandardMessage(final MessageReference ref, final ServerMessage message)
    {
+      log.error("------------------ calling callback " + callback + " to send message");
       int packetSize = callback.sendMessage(message, id, ref.getDeliveryCount());
 
       if (availableCredits != null)

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/spi/core/security/HornetQSecurityManagerImpl.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/spi/core/security/HornetQSecurityManagerImpl.java	2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/spi/core/security/HornetQSecurityManagerImpl.java	2011-09-12 01:47:54 UTC (rev 11317)
@@ -119,6 +119,7 @@
 
    public void addUser(final String user, final String password)
    {
+      log.error("-------------------------------adding user: " + user + " password " + password);
       if (user == null)
       {
          throw new IllegalArgumentException("User cannot be null");

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompConnectionCleanupTest.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompConnectionCleanupTest.java	2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompConnectionCleanupTest.java	2011-09-12 01:47:54 UTC (rev 11317)
@@ -39,6 +39,8 @@
       frame = receiveFrame(10000);
       
       //We send and consumer a message to ensure a STOMP connection and server session is created
+      
+      System.out.println("Received frame: " + frame);
 
       Assert.assertTrue(frame.startsWith("CONNECTED"));
 

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTest.java	2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTest.java	2011-09-12 01:47:54 UTC (rev 11317)
@@ -370,12 +370,13 @@
       frame = receiveFrame(100000);
       Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-      frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+      frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL;
       sendFrame(frame);
 
       sendMessage(getName());
 
       frame = receiveFrame(10000);
+      System.out.println("-------- frame received: " + frame);
       Assert.assertTrue(frame.startsWith("MESSAGE"));
       Assert.assertTrue(frame.indexOf("destination:") > 0);
       Assert.assertTrue(frame.indexOf(getName()) > 0);

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTestBase.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTestBase.java	2011-09-10 02:13:50 UTC (rev 11316)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTestBase.java	2011-09-12 01:47:54 UTC (rev 11317)
@@ -39,7 +39,6 @@
 
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
 import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
@@ -80,7 +79,11 @@
 
    protected JMSServerManager server;
    
+   protected String defUser = "brianm";
    
+   protected String defPass = "wombats";
+   
+   
 
    // Implementation methods
    // -------------------------------------------------------------------------
@@ -118,7 +121,7 @@
       TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
       config.getAcceptorConfigurations().add(stompTransport);
       config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
-      HornetQServer hornetQServer = HornetQServers.newHornetQServer(config);
+      HornetQServer hornetQServer = HornetQServers.newHornetQServer(config, defUser, defPass);
 
       JMSConfiguration jmsConfig = new JMSConfigurationImpl();
       jmsConfig.getQueueConfigurations()



More information about the hornetq-commits mailing list