[hornetq-commits] JBoss hornetq SVN: r8829 - in branches/HORNETQ-129_STOMP_protocol: src/main/org/hornetq/core/remoting/server and 7 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Jan 21 11:46:55 EST 2010


Author: jmesnil
Date: 2010-01-21 11:46:55 -0500 (Thu, 21 Jan 2010)
New Revision: 8829

Added:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/CoreSessionCallback.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompChannelHandler.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompSessionCallback.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/AbstractServerChannelHandler.java
Removed:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/
Modified:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/HornetQPacketHandler.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/ProtocolManager.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/HornetQServer.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerSession.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/ProtocolException.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrame.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameError.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompMarshaller.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompPacketDecoder.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java
   branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0

* refactored so that Stomp implementation is all located in o.h.integration.protocol.stomp
* modified HornetQServer.createSession to pass the SessionCallback directly

Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/CoreSessionCallback.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/CoreSessionCallback.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/CoreSessionCallback.java	2010-01-21 16:46:55 UTC (rev 8829)
@@ -0,0 +1,72 @@
+package org.hornetq.core.protocol.core;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.protocol.core.wireformat.SessionProducerCreditsMessage;
+import org.hornetq.core.protocol.core.wireformat.SessionReceiveContinuationMessage;
+import org.hornetq.core.protocol.core.wireformat.SessionReceiveLargeMessage;
+import org.hornetq.core.protocol.core.wireformat.SessionReceiveMessage;
+import org.hornetq.core.remoting.server.ProtocolManager;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.SessionCallback;
+
+/**
+ * A CoreSessionCallback
+ *
+ * @author jmesnil
+ *
+ *
+ */
+public final class CoreSessionCallback implements SessionCallback
+{
+   private final Channel channel;
+
+   private ProtocolManager protocolManager;
+
+   private String name;
+
+   public CoreSessionCallback(String name, ProtocolManager protocolManager, Channel channel)
+   {
+      this.name = name;
+      this.protocolManager = protocolManager;
+      this.channel = channel;
+   }
+
+   public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
+   {
+      Packet packet = new SessionReceiveLargeMessage(consumerID, headerBuffer, bodySize, deliveryCount);
+
+      channel.send(packet);
+
+      return packet.getPacketSize();
+   }
+
+   public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse)
+   {
+      Packet packet = new SessionReceiveContinuationMessage(consumerID, body, continues, requiresResponse);
+
+      channel.send(packet);
+
+      return packet.getPacketSize();
+   }
+
+   public int sendMessage(ServerMessage message, long consumerID, int deliveryCount)
+   {
+      Packet packet = new SessionReceiveMessage(consumerID, message, deliveryCount);
+
+      channel.send(packet);
+
+      return packet.getPacketSize();
+   }
+
+   public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
+   {
+      Packet packet = new SessionProducerCreditsMessage(credits, address, offset);
+
+      channel.send(packet);
+   }
+
+   public void closed()
+   {
+      protocolManager.removeHandler(name);
+   }
+}
\ No newline at end of file

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/HornetQPacketHandler.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/HornetQPacketHandler.java	2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/HornetQPacketHandler.java	2010-01-21 16:46:55 UTC (rev 8829)
@@ -148,7 +148,7 @@
                                        "Server will not accept create session requests");
          }
 
-         Channel channel = connection.getChannel(request.getSessionChannelID(), request.getWindowSize());
+         final Channel channel = connection.getChannel(request.getSessionChannelID(), request.getWindowSize());
 
          ServerSession session = server.createSession(request.getName(),                                                      
                                                       request.getUsername(),
@@ -158,18 +158,16 @@
                                                       request.isAutoCommitSends(),
                                                       request.isAutoCommitAcks(),
                                                       request.isPreAcknowledge(),
-                                                      request.isXA());
+                                                      request.isXA(),
+                                                      new CoreSessionCallback(request.getName(), protocolManager, channel));
 
-         ServerSessionPacketHandler handler = new ServerSessionPacketHandler(protocolManager,
-                                                                             session,
+         ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session,
                                                                              server.getStorageManager()
                                                                                    .newContext(server.getExecutorFactory()
                                                                                                      .getExecutor()),
                                                                              server.getStorageManager(),
                                                                              channel);
 
-         session.setCallback(handler);
-
          channel.setHandler(handler);
 
          // TODO - where is this removed?

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2010-01-21 16:46:55 UTC (rev 8829)
@@ -50,7 +50,6 @@
 import javax.transaction.xa.Xid;
 
 import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.exception.HornetQXAException;
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.logging.Logger;
@@ -69,12 +68,8 @@
 import org.hornetq.core.protocol.core.wireformat.SessionDeleteQueueMessage;
 import org.hornetq.core.protocol.core.wireformat.SessionExpiredMessage;
 import org.hornetq.core.protocol.core.wireformat.SessionForceConsumerDelivery;
-import org.hornetq.core.protocol.core.wireformat.SessionProducerCreditsMessage;
 import org.hornetq.core.protocol.core.wireformat.SessionQueueQueryMessage;
 import org.hornetq.core.protocol.core.wireformat.SessionQueueQueryResponseMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionReceiveContinuationMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionReceiveLargeMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionReceiveMessage;
 import org.hornetq.core.protocol.core.wireformat.SessionRequestProducerCreditsMessage;
 import org.hornetq.core.protocol.core.wireformat.SessionSendContinuationMessage;
 import org.hornetq.core.protocol.core.wireformat.SessionSendLargeMessage;
@@ -98,7 +93,6 @@
 import org.hornetq.core.server.QueueQueryResult;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.ServerSession;
-import org.hornetq.core.server.SessionCallback;
 
 /**
  * A ServerSessionPacketHandler
@@ -108,12 +102,10 @@
  * @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
  * @author <a href="mailto:clebert.suconic at jboss.org>Clebert Suconic</a>
  */
-public class ServerSessionPacketHandler implements ChannelHandler, CloseListener, FailureListener, SessionCallback
+public class ServerSessionPacketHandler implements ChannelHandler, CloseListener, FailureListener
 {
    private static final Logger log = Logger.getLogger(ServerSessionPacketHandler.class);
 
-   private final CoreProtocolManager protocolManager;
-   
    private final ServerSession session;
 
    private final OperationContext sessionContext;
@@ -125,14 +117,11 @@
 
    private volatile CoreRemotingConnection remotingConnection;
 
-   public ServerSessionPacketHandler(final CoreProtocolManager protocolManager,
-                                     final ServerSession session,
+   public ServerSessionPacketHandler(final ServerSession session,
                                      final OperationContext sessionContext,
                                      final StorageManager storageManager,
                                      final Channel channel)
    {
-      this.protocolManager = protocolManager;
-      
       this.session = session;
 
       this.storageManager = storageManager;
@@ -565,46 +554,7 @@
          channel.close();
       }
    }
-
-   public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
-   {
-      Packet packet = new SessionReceiveLargeMessage(consumerID, headerBuffer, bodySize, deliveryCount);
-
-      channel.send(packet);
-
-      return packet.getPacketSize();
-   }
-
-   public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse)
-   {
-      Packet packet = new SessionReceiveContinuationMessage(consumerID, body, continues, requiresResponse);
-
-      channel.send(packet);
-
-      return packet.getPacketSize();
-   }
-
-   public int sendMessage(ServerMessage message, long consumerID, int deliveryCount)
-   {
-      Packet packet = new SessionReceiveMessage(consumerID, message, deliveryCount);
-
-      channel.send(packet);
-
-      return packet.getPacketSize();
-   }
-
-   public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
-   {
-      Packet packet = new SessionProducerCreditsMessage(credits, address, offset);
-
-      channel.send(packet);
-   }
    
-   public void closed()
-   {
-      protocolManager.removeHandler(session.getName());
-   }
-   
    public int transferConnection(final CoreRemotingConnection newConnection, final int lastReceivedCommandID)
    {
       // We need to disable delivery on all the consumers while the transfer is occurring- otherwise packets might get

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/ProtocolManager.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/ProtocolManager.java	2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/ProtocolManager.java	2010-01-21 16:46:55 UTC (rev 8829)
@@ -26,4 +26,6 @@
 {
    ConnectionEntry createConnectionEntry(Connection connection);
 
+   void removeHandler(String name);
+
 }

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-01-21 16:46:55 UTC (rev 8829)
@@ -29,7 +29,6 @@
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.protocol.core.CoreProtocolManager;
-import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.remoting.ProtocolType;
 import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.server.ConnectionEntry;
@@ -170,14 +169,13 @@
                   return server;
                }
                
-               public CoreRemotingConnection getRemotingConnection(int connectionID)
+               public RemotingConnection getRemotingConnection(int connectionID)
                {
                   ConnectionEntry conn = connections.get(connectionID);
 
                   if (conn != null)
                   {
-                     // FIXME ....
-                     return (CoreRemotingConnection)conn.connection;
+                     return conn.connection;
                   }
                   else
                   {

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/HornetQServer.java	2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/HornetQServer.java	2010-01-21 16:46:55 UTC (rev 8829)
@@ -24,7 +24,7 @@
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.protocol.core.Channel;
-import org.hornetq.core.protocol.core.CoreRemotingConnection;
+import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.replication.ReplicationEndpoint;
 import org.hornetq.core.security.Role;
@@ -76,11 +76,12 @@
                                String username,
                                String password,
                                int minLargeMessageSize,
-                               CoreRemotingConnection remotingConnection,
+                               RemotingConnection remotingConnection,
                                boolean autoCommitSends,
                                boolean autoCommitAcks,
                                boolean preAcknowledge,
-                               boolean xa) throws Exception;
+                               boolean xa,
+                               final SessionCallback callback) throws Exception;
 
    void removeSession(String name) throws Exception;
 

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerSession.java	2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerSession.java	2010-01-21 16:46:55 UTC (rev 8829)
@@ -110,6 +110,4 @@
    void setTransferring(boolean transferring);
    
    void runConnectionFailureRunners();
-   
-   void setCallback(SessionCallback callback);
 }

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-01-21 16:46:55 UTC (rev 8829)
@@ -69,7 +69,7 @@
 import org.hornetq.core.postoffice.impl.LocalQueueBinding;
 import org.hornetq.core.postoffice.impl.PostOfficeImpl;
 import org.hornetq.core.protocol.core.Channel;
-import org.hornetq.core.protocol.core.CoreRemotingConnection;
+import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
 import org.hornetq.core.replication.ReplicationEndpoint;
@@ -87,6 +87,7 @@
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.QueueFactory;
 import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.SessionCallback;
 import org.hornetq.core.server.cluster.ClusterManager;
 import org.hornetq.core.server.cluster.DivertConfiguration;
 import org.hornetq.core.server.cluster.QueueConfiguration;
@@ -538,11 +539,12 @@
                                       final String username,
                                       final String password,
                                       final int minLargeMessageSize,
-                                      final CoreRemotingConnection connection,
+                                      final RemotingConnection connection,
                                       final boolean autoCommitSends,
                                       final boolean autoCommitAcks,
                                       final boolean preAcknowledge,
-                                      final boolean xa) throws Exception
+                                      final boolean xa,
+                                      final SessionCallback callback) throws Exception
    {
       if (securityStore != null)
       {
@@ -565,7 +567,8 @@
                                                               securityStore,
                                                               managementService,
                                                               this,
-                                                              configuration.getManagementAddress());
+                                                              configuration.getManagementAddress(),
+                                                              callback);
 
       sessions.put(name, session);
 

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-01-21 16:46:55 UTC (rev 8829)
@@ -42,9 +42,9 @@
 import org.hornetq.core.postoffice.Bindings;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.postoffice.QueueBinding;
-import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.remoting.CloseListener;
 import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.security.CheckType;
 import org.hornetq.core.security.SecurityStore;
 import org.hornetq.core.server.BindingQueryResult;
@@ -99,7 +99,7 @@
 
    private final boolean strictUpdateDeliveryCount;
 
-   private CoreRemotingConnection remotingConnection;
+   private RemotingConnection remotingConnection;
 
    private final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<Long, ServerConsumer>();
 
@@ -134,7 +134,7 @@
 
    private final RoutingContext routingContext = new RoutingContextImpl(null);
 
-   private SessionCallback callback;
+   private final SessionCallback callback;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -147,14 +147,15 @@
                             final boolean preAcknowledge,
                             final boolean strictUpdateDeliveryCount,
                             final boolean xa,
-                            final CoreRemotingConnection remotingConnection,                     
+                            final RemotingConnection remotingConnection,                     
                             final StorageManager storageManager,
                             final PostOffice postOffice,
                             final ResourceManager resourceManager,
                             final SecurityStore securityStore,
                             final ManagementService managementService,
                             final HornetQServer server,
-                            final SimpleString managementAddress) throws Exception
+                            final SimpleString managementAddress,
+                            final SessionCallback callback) throws Exception
    {
       this.username = username;
 
@@ -193,6 +194,8 @@
 
       this.managementAddress = managementAddress;
 
+      this.callback = callback;
+      
       remotingConnection.addFailureListener(this);
 
       remotingConnection.addCloseListener(this);
@@ -200,11 +203,6 @@
 
    // ServerSession implementation ----------------------------------------------------------------------------
 
-   public void setCallback(final SessionCallback callback)
-   {
-      this.callback = callback;
-   }
-
    public String getUsername()
    {
       return username;

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/ProtocolException.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/ProtocolException.java	2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/ProtocolException.java	2010-01-21 16:46:55 UTC (rev 8829)
@@ -22,7 +22,7 @@
 /**
  * @author <a href="http://hiramchirino.com">chirino</a>
  */
-public class ProtocolException extends IOException {
+class ProtocolException extends IOException {
     private static final long serialVersionUID = -2869735532997332242L;
     private final boolean fatal;
 

Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompChannelHandler.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompChannelHandler.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompChannelHandler.java	2010-01-21 16:46:55 UTC (rev 8829)
@@ -0,0 +1,273 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.protocol.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.integration.transports.netty.AbstractServerChannelHandler;
+import org.hornetq.integration.transports.netty.NettyAcceptor;
+import org.hornetq.integration.transports.netty.ServerHolder;
+import org.hornetq.jms.client.HornetQBytesMessage;
+import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
+import org.hornetq.utils.UUIDGenerator;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.group.ChannelGroup;
+
+@ChannelPipelineCoverage("one")
+public final class StompChannelHandler extends AbstractServerChannelHandler
+{
+   static final Logger log = Logger.getLogger(StompChannelHandler.class);
+
+   final StompMarshaller marshaller;
+
+   private final Map<RemotingConnection, ServerSession> sessions = new HashMap<RemotingConnection, ServerSession>();
+
+   private ServerHolder serverHandler;
+
+   public StompChannelHandler(ServerHolder serverHolder,
+                              final ChannelGroup group,
+                              NettyAcceptor acceptor,
+                              final ConnectionLifeCycleListener listener)
+   {
+      super(group, listener, acceptor);
+      this.serverHandler = serverHolder;
+      this.marshaller = new StompMarshaller();
+   }
+
+   public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+   {
+      StompFrame frame = (StompFrame)e.getMessage();
+      System.out.println(">>> got frame " + frame);
+
+      // need to interact with HornetQ server & session
+      HornetQServer server = serverHandler.getServer();
+      RemotingConnection connection = serverHandler.getRemotingConnection(e.getChannel().getId());
+
+      try
+      {
+
+         String command = frame.getCommand();
+
+         StompFrame response = null;
+         if (Stomp.Commands.CONNECT.equals(command))
+         {
+            response = onConnect(frame, server, connection);
+         }
+         else if (Stomp.Commands.DISCONNECT.equals(command))
+         {
+            response = onDisconnect(frame, server, connection);
+         }
+         else if (Stomp.Commands.SEND.equals(command))
+         {
+            response = onSend(frame, server, connection);
+         }
+         else if (Stomp.Commands.SUBSCRIBE.equals(command))
+         {
+            response = onSubscribe(frame, server, connection);
+         }
+         else
+         {
+            log.error("Unsupported Stomp frame: " + frame);
+            response = new StompFrame(Stomp.Responses.ERROR,
+                                      new HashMap<String, Object>(),
+                                      ("Unsupported frame: " + command).getBytes());
+         }
+
+         if (response != null)
+         {
+            System.out.println(">>> will reply " + response);
+            byte[] bytes = marshaller.marshal(response);
+            HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+            System.out.println("ready to send reply: " + buffer);
+            connection.getTransportConnection().write(buffer, true);
+         }
+      }
+      catch (StompException ex)
+      {
+         // Let the stomp client know about any protocol errors.
+         ByteArrayOutputStream baos = new ByteArrayOutputStream();
+         PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
+         ex.printStackTrace(stream);
+         stream.close();
+
+         Map<String, Object> headers = new HashMap<String, Object>();
+         headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
+
+         final String receiptId = (String)frame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+         if (receiptId != null)
+         {
+            headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+         }
+
+         StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
+         byte[] bytes = marshaller.marshal(errorMessage);
+         HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+         System.out.println("ready to send reply: " + buffer);
+         connection.getTransportConnection().write(buffer, true);
+
+      }
+      catch (Exception ex)
+      {
+         ex.printStackTrace();
+      }
+   }
+
+   /**
+    * @param frame
+    * @param server
+    * @param connection
+    * @return
+    * @throws StompException 
+    * @throws HornetQException 
+    */
+   private StompFrame onSubscribe(StompFrame frame, HornetQServer server, RemotingConnection connection) throws Exception,
+                                                                                                        StompException,
+                                                                                                        HornetQException
+   {
+      Map<String, Object> headers = frame.getHeaders();
+      String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
+      SimpleString queueName = StompDestinationConverter.toHornetQAddress(queue);
+
+      ServerSession session = checkAndGetSession(connection);
+      long consumerID = server.getStorageManager().generateUniqueID();
+      session.createConsumer(consumerID, queueName, null, false);
+      session.receiveConsumerCredits(consumerID, -1);
+      session.start();
+
+      return null;
+   }
+
+   private ServerSession checkAndGetSession(RemotingConnection connection) throws StompException
+   {
+      ServerSession session = sessions.get(connection);
+      if (session == null)
+      {
+         throw new StompException("Not connected");
+      }
+      return session;
+   }
+
+   private StompFrame onDisconnect(StompFrame frame, HornetQServer server, RemotingConnection connection) throws StompException
+   {
+      ServerSession session = checkAndGetSession(connection);
+      if (session != null)
+      {
+         try
+         {
+            session.close();
+         }
+         catch (Exception e)
+         {
+            throw new StompException(e.getMessage());
+         }
+         sessions.remove(connection);
+      }
+      return null;
+   }
+
+   private StompFrame onSend(StompFrame frame, HornetQServer server, RemotingConnection connection) throws Exception
+   {
+      ServerSession session = checkAndGetSession(connection);
+
+      Map<String, Object> headers = frame.getHeaders();
+      String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
+      /*
+      String type = (String)headers.get(Stomp.Headers.Send.TYPE);
+      long expiration = (Long)headers.get(Stomp.Headers.Send.EXPIRATION_TIME);
+      byte priority = (Byte)headers.get(Stomp.Headers.Send.PRIORITY);
+      boolean durable = (Boolean)headers.get(Stomp.Headers.Send.PERSISTENT);
+      */
+      byte type = HornetQTextMessage.TYPE;
+      if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH))
+      {
+         type = HornetQBytesMessage.TYPE;
+      }
+      long timestamp = System.currentTimeMillis();
+      boolean durable = false;
+      long expiration = -1;
+      byte priority = 9;
+      SimpleString address = StompDestinationConverter.toHornetQAddress(queue);
+
+      ServerMessage message = new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
+      message.setType(type);
+      message.setTimestamp(timestamp);
+      message.setAddress(address);
+      byte[] content = frame.getContent();
+      if (type == HornetQTextMessage.TYPE)
+      {
+         message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(new String(content)));
+      }
+      else
+      {
+         message.getBodyBuffer().writeBytes(content);
+      }
+
+      session.send(message);
+      if (headers.containsKey(Stomp.Headers.RECEIPT_REQUESTED))
+      {
+         Map<String, Object> h = new HashMap<String, Object>();
+         h.put(Stomp.Headers.Response.RECEIPT_ID, headers.get(Stomp.Headers.RECEIPT_REQUESTED));
+         return new StompFrame(Stomp.Responses.RECEIPT, h, new byte[] {});
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+   private StompFrame onConnect(StompFrame frame, HornetQServer server, final RemotingConnection connection) throws Exception
+   {
+      Map<String, Object> headers = frame.getHeaders();
+      String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
+      String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
+      String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
+
+      String name = UUIDGenerator.getInstance().generateStringUUID();
+      server.createSession(name,
+                           login,
+                           passcode,
+                           HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                           connection,
+                           true,
+                           true,
+                           false,
+                           false,
+                           new StompSessionCallback(marshaller, connection));
+      ServerSession session = server.getSession(name);
+      sessions.put(connection, session);
+      System.out.println(">>> created session " + session);
+      HashMap<String, Object> h = new HashMap<String, Object>();
+      h.put(Stomp.Headers.Connected.SESSION, name);
+      h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
+      return new StompFrame(Stomp.Responses.CONNECTED, h, new byte[] {});
+   }
+}
\ No newline at end of file

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java	2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java	2010-01-21 16:46:55 UTC (rev 8829)
@@ -27,7 +27,7 @@
  *
  *
  */
-public class StompDestinationConverter
+class StompDestinationConverter
 {
 
    // Constants -----------------------------------------------------
@@ -36,65 +36,69 @@
 
    // Static --------------------------------------------------------
 
-   public static SimpleString convertDestination(String name) throws HornetQException
+   public static SimpleString toHornetQAddress(String stompDestination) throws HornetQException
    {
-      if (name == null)
+      if (stompDestination == null)
       {
          throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination is specified!");
       }
-      else if (name.startsWith("/queue/"))
+      else if (stompDestination.startsWith("/queue/"))
       {
-         String queueName = name.substring("/queue/".length(), name.length());
+         String queueName = stompDestination.substring("/queue/".length(), stompDestination.length());
          return HornetQQueue.createAddressFromName(queueName);
       }
-      else if (name.startsWith("/topic/"))
+      else if (stompDestination.startsWith("/topic/"))
       {
-         String topicName = name.substring("/topic/".length(), name.length());
+         String topicName = stompDestination.substring("/topic/".length(), stompDestination.length());
          return HornetQTopic.createAddressFromName(topicName);
       }
-      else if (name.startsWith("/temp-queue/"))
+      else if (stompDestination.startsWith("/temp-queue/"))
       {
-         String tempName = name.substring("/temp-queue/".length(), name.length());
+         String tempName = stompDestination.substring("/temp-queue/".length(), stompDestination.length());
          return HornetQTemporaryQueue.createAddressFromName(tempName);
       }
-      else if (name.startsWith("/temp-topic/"))
+      else if (stompDestination.startsWith("/temp-topic/"))
       {
-         String tempName = name.substring("/temp-topic/".length(), name.length());
+         String tempName = stompDestination.substring("/temp-topic/".length(), stompDestination.length());
          return HornetQTemporaryTopic.createAddressFromName(tempName);
       }
       else
       {
-         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal destination name: [" + name +
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal destination name: [" + stompDestination +
                                                                     "] -- StompConnect destinations " +
                                                                     "must begine with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
       }
    }
 
-   public static String toStomp(String address) throws HornetQException
+   public static String toStompDestination(String hornetqAddress) throws HornetQException
    {
-      if (address == null)
+      if (hornetqAddress == null)
       {
          throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination is specified!");
       }
-      else if (address.startsWith(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX))
+      else if (hornetqAddress.startsWith(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX))
       {
-         return "/queue/" + address.substring(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX.length(), address.length());
+         return "/queue/" + hornetqAddress.substring(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX.length(),
+                                                     hornetqAddress.length());
       }
-      else if (address.startsWith(HornetQTemporaryQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX))
+      else if (hornetqAddress.startsWith(HornetQTemporaryQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX))
       {
-         return "/temp-queue/" + address.substring(HornetQTemporaryQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX.length(), address.length());
+         return "/temp-queue/" + hornetqAddress.substring(HornetQTemporaryQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX.length(),
+                                                          hornetqAddress.length());
       }
-      else if (address.startsWith(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX))
+      else if (hornetqAddress.startsWith(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX))
       {
-         return "/topic/" + address.substring(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX.length(), address.length());
+         return "/topic/" + hornetqAddress.substring(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX.length(),
+                                                     hornetqAddress.length());
       }
-      else if (address.startsWith(HornetQTemporaryTopic.JMS_TEMP_TOPIC_ADDRESS_PREFIX))
+      else if (hornetqAddress.startsWith(HornetQTemporaryTopic.JMS_TEMP_TOPIC_ADDRESS_PREFIX))
       {
-         return "/temp-topic/" + address.substring(HornetQTemporaryTopic.JMS_TEMP_TOPIC_ADDRESS_PREFIX.length(), address.length());
+         return "/temp-topic/" + hornetqAddress.substring(HornetQTemporaryTopic.JMS_TEMP_TOPIC_ADDRESS_PREFIX.length(),
+                                                          hornetqAddress.length());
       }
       else
       {
-         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal address name: [" + address +
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal address name: [" + hornetqAddress +
                                                                     "] -- Acceptable address must comply to JMS semantics");
       }
    }

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java	2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java	2010-01-21 16:46:55 UTC (rev 8829)
@@ -16,11 +16,9 @@
 /**
  * A StompException
  *
- * @author jmesnil
- *
- *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  */
-public class StompException extends Exception
+class StompException extends Exception
 {
 
    /**

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrame.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrame.java	2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrame.java	2010-01-21 16:46:55 UTC (rev 8829)
@@ -25,7 +25,7 @@
  *
  * @author <a href="http://hiramchirino.com">chirino</a>
  */
-public class StompFrame
+class StompFrame
 {
    private static final byte[] NO_DATA = new byte[] {};
 

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameError.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameError.java	2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameError.java	2010-01-21 16:46:55 UTC (rev 8829)
@@ -22,7 +22,7 @@
  *
  * @author <a href="http://hiramchirino.com">chirino</a>
  */
-public class StompFrameError extends StompFrame {
+class StompFrameError extends StompFrame {
     private final ProtocolException exception;
 
     public StompFrameError(ProtocolException exception) {

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompMarshaller.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompMarshaller.java	2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompMarshaller.java	2010-01-21 16:46:55 UTC (rev 8829)
@@ -30,7 +30,7 @@
 /**
  * Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
  */
-public class StompMarshaller {
+class StompMarshaller {
     private static final byte[] NO_DATA = new byte[]{};
     private static final byte[] END_OF_FRAME = new byte[]{0, '\n'};
     private static final int MAX_COMMAND_LENGTH = 1024;

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompPacketDecoder.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompPacketDecoder.java	2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompPacketDecoder.java	2010-01-21 16:46:55 UTC (rev 8829)
@@ -34,9 +34,9 @@
    
    // PacketDecoder implementation ----------------------------------
 
-   public StompPacketDecoder(final StompMarshaller marshaller)
+   public StompPacketDecoder()
    {
-      this.marshaller = marshaller;
+      this.marshaller = new StompMarshaller();
    }
 
    @Override

Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompSessionCallback.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompSessionCallback.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompSessionCallback.java	2010-01-21 16:46:55 UTC (rev 8829)
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.integration.protocol.stomp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.SessionCallback;
+import org.hornetq.jms.client.HornetQTextMessage;
+
+/**
+ * A StompSessionCallback
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+class StompSessionCallback implements SessionCallback
+{
+   private final RemotingConnection connection;
+
+   private final StompMarshaller marshaller;
+
+   StompSessionCallback(final StompMarshaller marshaller, final RemotingConnection connection)
+   {
+      this.marshaller = marshaller;
+      this.connection = connection;
+   }
+
+   public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
+   {
+   }
+
+   public int sendMessage(ServerMessage serverMessage, long consumerID, int deliveryCount)
+   {
+      try
+      {
+         Map<String, Object> headers = new HashMap<String, Object>();
+         headers.put(Stomp.Headers.Message.DESTINATION, StompDestinationConverter.toStompDestination(serverMessage.getAddress()
+                                                                                                       .toString()));
+         byte[] data = new byte[] {};
+         if (serverMessage.getType() == HornetQTextMessage.TYPE)
+         {
+            SimpleString text = serverMessage.getBodyBuffer().readNullableSimpleString();
+            if (text != null)
+            {
+               data = text.toString().getBytes();
+            }
+         }
+         StompFrame msg = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
+         System.out.println("SENDING : " + msg);
+         byte[] bytes = marshaller.marshal(msg);
+         HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+         connection.getTransportConnection().write(buffer, true);
+
+         return bytes.length;
+
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         return 0;
+      }
+
+   }
+
+   public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse)
+   {
+      return 0;
+   }
+
+   public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
+   {
+      return 0;
+   }
+
+   public void closed()
+   {
+   }
+}
\ No newline at end of file

Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/AbstractServerChannelHandler.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/AbstractServerChannelHandler.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/AbstractServerChannelHandler.java	2010-01-21 16:46:55 UTC (rev 8829)
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.transports.netty;
+
+import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.handler.ssl.SslHandler;
+
+@ChannelPipelineCoverage("one")
+public abstract class AbstractServerChannelHandler extends HornetQChannelHandler
+{
+   private NettyAcceptor acceptor;
+
+   protected AbstractServerChannelHandler(final ChannelGroup group, final ConnectionLifeCycleListener listener, NettyAcceptor acceptor)
+   {
+      super(group, listener);
+      this.acceptor = acceptor;
+   }
+
+   @Override
+   public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
+   {
+      new NettyConnection(e.getChannel(), acceptor.newListener());
+
+      SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
+      if (sslHandler != null)
+      {
+         sslHandler.handshake(e.getChannel()).addListener(new ChannelFutureListener()
+         {
+            public void operationComplete(final ChannelFuture future) throws Exception
+            {
+               if (future.isSuccess())
+               {
+                  active = true;
+               }
+               else
+               {
+                  future.getChannel().close();
+               }
+            }
+         });
+      }
+      else
+      {
+         active = true;
+      }
+   }
+}
\ No newline at end of file

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java	2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java	2010-01-21 16:46:55 UTC (rev 8829)
@@ -17,7 +17,6 @@
 import javax.net.ssl.SSLEngine;
 
 import org.hornetq.integration.protocol.stomp.StompFrameDelimiter;
-import org.hornetq.integration.protocol.stomp.StompMarshaller;
 import org.hornetq.integration.protocol.stomp.StompPacketDecoder;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.jboss.netty.channel.ChannelPipeline;
@@ -50,9 +49,8 @@
    public static void addStompStack(final ChannelPipeline pipeline, final ServerHolder serverHandler)
    {
       assert pipeline != null;
-      StompMarshaller marshaller = new StompMarshaller();
       pipeline.addLast("delimiter", new StompFrameDelimiter());
-      pipeline.addLast("codec", new StompPacketDecoder(marshaller));
+      pipeline.addLast("codec", new StompPacketDecoder());
    }
 
    public static void addHornetQCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-01-21 16:46:55 UTC (rev 8829)
@@ -13,12 +13,8 @@
 
 package org.hornetq.integration.transports.netty;
 
-import java.io.ByteArrayOutputStream;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -30,52 +26,31 @@
 
 import javax.net.ssl.SSLContext;
 
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQBuffers;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.remoting.ProtocolType;
-import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.ssl.SSLSupport;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.ServerSession;
-import org.hornetq.core.server.SessionCallback;
-import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.core.server.management.Notification;
 import org.hornetq.core.server.management.NotificationService;
-import org.hornetq.integration.protocol.stomp.Stomp;
-import org.hornetq.integration.protocol.stomp.StompDestinationConverter;
-import org.hornetq.integration.protocol.stomp.StompException;
-import org.hornetq.integration.protocol.stomp.StompFrame;
-import org.hornetq.integration.protocol.stomp.StompMarshaller;
-import org.hornetq.jms.client.HornetQBytesMessage;
-import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.integration.protocol.stomp.StompChannelHandler;
 import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
 import org.hornetq.utils.ConfigurationHelper;
 import org.hornetq.utils.TypedProperties;
-import org.hornetq.utils.UUIDGenerator;
 import org.hornetq.utils.VersionLoader;
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
 import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.group.ChannelGroup;
@@ -87,7 +62,6 @@
 import org.jboss.netty.channel.socket.oio.OioServerSocketChannelFactory;
 import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
 import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.ssl.SslHandler;
 import org.jboss.netty.util.Version;
 import org.jboss.netty.util.VirtualExecutorService;
 
@@ -102,7 +76,7 @@
  */
 public class NettyAcceptor implements Acceptor
 {
-   private static final Logger log = Logger.getLogger(NettyAcceptor.class);
+   static final Logger log = Logger.getLogger(NettyAcceptor.class);
 
    private ChannelFactory channelFactory;
 
@@ -319,8 +293,8 @@
             {
                ChannelPipelineSupport.addStompStack(pipeline, serverHandler);
                pipeline.addLast("handler", new StompChannelHandler(serverHandler,
-                                                                   new StompMarshaller(),
                                                                    channelGroup,
+                                                                   NettyAcceptor.this,
                                                                    new Listener()));
             }
             else
@@ -516,6 +490,11 @@
       this.notificationService = notificationService;
    }
 
+   public ConnectionLifeCycleListener newListener()
+   {
+      return new Listener();
+   }
+
    // Inner classes -----------------------------------------------------------------------------
 
    private final class HornetQServerChannelHandler extends AbstractServerChannelHandler
@@ -526,7 +505,7 @@
                                   final BufferHandler handler,
                                   final ConnectionLifeCycleListener listener)
       {
-         super(group, listener);
+         super(group, listener, NettyAcceptor.this);
 
          this.handler = handler;
       }
@@ -541,329 +520,8 @@
 
    }
 
-   @ChannelPipelineCoverage("one")
-   public final class StompChannelHandler extends AbstractServerChannelHandler
+   class Listener implements ConnectionLifeCycleListener
    {
-      private final StompMarshaller marshaller;
-
-      private final Map<RemotingConnection, ServerSession> sessions = new HashMap<RemotingConnection, ServerSession>();
-
-      private ServerHolder serverHandler;
-
-      public StompChannelHandler(ServerHolder serverHolder,
-                                 StompMarshaller marshaller,
-                                 final ChannelGroup group,
-                                 final ConnectionLifeCycleListener listener)
-      {
-         super(group, listener);
-         this.serverHandler = serverHolder;
-         this.marshaller = marshaller;
-      }
-
-      public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
-      {
-         StompFrame frame = (StompFrame)e.getMessage();
-         System.out.println(">>> got frame " + frame);
-
-         // need to interact with HornetQ server & session
-         HornetQServer server = serverHandler.getServer();
-         CoreRemotingConnection connection = serverHandler.getRemotingConnection(e.getChannel().getId());
-
-         try
-         {
-
-            String command = frame.getCommand();
-
-            StompFrame response = null;
-            if (Stomp.Commands.CONNECT.equals(command))
-            {
-               response = onConnect(frame, server, connection);
-            }
-            else if (Stomp.Commands.DISCONNECT.equals(command))
-            {
-               response = onDisconnect(frame, server, connection);
-            }
-            else if (Stomp.Commands.SEND.equals(command))
-            {
-               response = onSend(frame, server, connection);
-            }
-            else if (Stomp.Commands.SUBSCRIBE.equals(command))
-            {
-               response = onSubscribe(frame, server, connection);
-            }
-            else
-            {
-               log.error("Unsupported Stomp frame: " + frame);
-               response = new StompFrame(Stomp.Responses.ERROR,
-                                         new HashMap<String, Object>(),
-                                         ("Unsupported frame: " + command).getBytes());
-            }
-
-            if (response != null)
-            {
-               System.out.println(">>> will reply " + response);
-               byte[] bytes = marshaller.marshal(response);
-               HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
-               System.out.println("ready to send reply: " + buffer);
-               connection.getTransportConnection().write(buffer, true);
-            }
-         }
-         catch (StompException ex)
-         {
-            // Let the stomp client know about any protocol errors.
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
-            ex.printStackTrace(stream);
-            stream.close();
-
-            Map<String, Object> headers = new HashMap<String, Object>();
-            headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
-
-            final String receiptId = (String)frame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
-            if (receiptId != null)
-            {
-               headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
-            }
-
-            StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
-            byte[] bytes = marshaller.marshal(errorMessage);
-            HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
-            System.out.println("ready to send reply: " + buffer);
-            connection.getTransportConnection().write(buffer, true);
-
-         }
-         catch (Exception ex)
-         {
-            ex.printStackTrace();
-         }
-      }
-
-      /**
-       * @param frame
-       * @param server
-       * @param connection
-       * @return
-       * @throws StompException 
-       * @throws HornetQException 
-       */
-      private StompFrame onSubscribe(StompFrame frame, HornetQServer server, RemotingConnection connection) throws Exception,
-                                                                                                           StompException,
-                                                                                                           HornetQException
-      {
-         Map<String, Object> headers = frame.getHeaders();
-         String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
-         SimpleString queueName = StompDestinationConverter.convertDestination(queue);
-
-         ServerSession session = checkAndGetSession(connection);
-         long consumerID = server.getStorageManager().generateUniqueID();
-         session.createConsumer(consumerID, queueName, null, false);
-         session.receiveConsumerCredits(consumerID, -1);
-         session.start();
-
-         return null;
-      }
-
-      private ServerSession checkAndGetSession(RemotingConnection connection) throws StompException
-      {
-         ServerSession session = sessions.get(connection);
-         if (session == null)
-         {
-            throw new StompException("Not connected");
-         }
-         return session;
-      }
-
-      private StompFrame onDisconnect(StompFrame frame, HornetQServer server, RemotingConnection connection) throws StompException
-      {
-         ServerSession session = checkAndGetSession(connection);
-         if (session != null)
-         {
-            try
-            {
-               session.close();
-            }
-            catch (Exception e)
-            {
-               throw new StompException(e.getMessage());
-            }
-            sessions.remove(connection);
-         }
-         return null;
-      }
-
-      private StompFrame onSend(StompFrame frame, HornetQServer server, RemotingConnection connection) throws Exception
-      {
-         ServerSession session = checkAndGetSession(connection);
-
-         Map<String, Object> headers = frame.getHeaders();
-         String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
-         /*
-         String type = (String)headers.get(Stomp.Headers.Send.TYPE);
-         long expiration = (Long)headers.get(Stomp.Headers.Send.EXPIRATION_TIME);
-         byte priority = (Byte)headers.get(Stomp.Headers.Send.PRIORITY);
-         boolean durable = (Boolean)headers.get(Stomp.Headers.Send.PERSISTENT);
-         */
-         byte type = HornetQTextMessage.TYPE;
-         if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH))
-         {
-            type = HornetQBytesMessage.TYPE;
-         }
-         long timestamp = System.currentTimeMillis();
-         boolean durable = false;
-         long expiration = -1;
-         byte priority = 9;
-         SimpleString address = StompDestinationConverter.convertDestination(queue);
-
-         ServerMessage message = new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
-         message.setType(type);
-         message.setTimestamp(timestamp);
-         message.setAddress(address);
-         byte[] content = frame.getContent();
-         if (type == HornetQTextMessage.TYPE)
-         {
-            message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(new String(content)));
-         }
-         else
-         {
-            message.getBodyBuffer().writeBytes(content);
-         }
-
-         session.send(message);
-         if (headers.containsKey(Stomp.Headers.RECEIPT_REQUESTED))
-         {
-            Map<String, Object> h = new HashMap<String, Object>();
-            h.put(Stomp.Headers.Response.RECEIPT_ID, headers.get(Stomp.Headers.RECEIPT_REQUESTED));
-            return new StompFrame(Stomp.Responses.RECEIPT, h, new byte[] {});
-         }
-         else
-         {
-            return null;
-         }
-      }
-
-      private StompFrame onConnect(StompFrame frame, HornetQServer server, final CoreRemotingConnection connection) throws Exception
-      {
-         Map<String, Object> headers = frame.getHeaders();
-         String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
-         String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
-         String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
-
-         String name = UUIDGenerator.getInstance().generateStringUUID();
-         server.createSession(name,
-                              login,
-                              passcode,
-                              HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
-                              connection,
-                              true,
-                              true,
-                              false,
-                              false);
-         ServerSession session = server.getSession(name);
-         session.setCallback(new SessionCallback()
-         {
-            public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
-            {
-            }
-
-            public int sendMessage(ServerMessage serverMessage, long consumerID, int deliveryCount)
-            {
-               try
-               {
-                  Map<String, Object> headers = new HashMap<String, Object>();
-                  headers.put(Stomp.Headers.Message.DESTINATION,
-                              StompDestinationConverter.toStomp(serverMessage.getAddress().toString()));
-                  byte[] data = new byte[] {};
-                  if (serverMessage.getType() == HornetQTextMessage.TYPE)
-                  {
-                     SimpleString text = serverMessage.getBodyBuffer().readNullableSimpleString();
-                     if (text != null)
-                     {
-                        data = text.toString().getBytes();
-                     }
-                  }
-                  StompFrame msg = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
-                  System.out.println("SENDING : " + msg);
-                  byte[] bytes = marshaller.marshal(msg);
-                  HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
-                  connection.getTransportConnection().write(buffer, true);
-
-                  return bytes.length;
-
-               }
-               catch (Exception e)
-               {
-                  e.printStackTrace();
-                  return 0;
-               }
-
-            }
-
-            public int sendLargeMessageContinuation(long consumerID,
-                                                    byte[] body,
-                                                    boolean continues,
-                                                    boolean requiresResponse)
-            {
-               return 0;
-            }
-
-            public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
-            {
-               return 0;
-            }
-
-            public void closed()
-            {
-            }
-         });
-         sessions.put(connection, session);
-         System.out.println(">>> created session " + session);
-         HashMap<String, Object> h = new HashMap<String, Object>();
-         h.put(Stomp.Headers.Connected.SESSION, name);
-         h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
-         return new StompFrame(Stomp.Responses.CONNECTED, h, new byte[] {});
-      }
-   }
-
-   @ChannelPipelineCoverage("one")
-   public abstract class AbstractServerChannelHandler extends HornetQChannelHandler
-   {
-      protected AbstractServerChannelHandler(final ChannelGroup group, final ConnectionLifeCycleListener listener)
-      {
-         super(group, listener);
-      }
-
-      @Override
-      public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
-      {
-         new NettyConnection(e.getChannel(), new Listener());
-
-         SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
-         if (sslHandler != null)
-         {
-            sslHandler.handshake(e.getChannel()).addListener(new ChannelFutureListener()
-            {
-               public void operationComplete(final ChannelFuture future) throws Exception
-               {
-                  if (future.isSuccess())
-                  {
-                     active = true;
-                  }
-                  else
-                  {
-                     future.getChannel().close();
-                  }
-               }
-            });
-         }
-         else
-         {
-            active = true;
-         }
-      }
-   }
-
-   private class Listener implements ConnectionLifeCycleListener
-   {
       public void connectionCreated(final Connection connection, final ProtocolType protocol)
       {
          if (connections.putIfAbsent(connection.getID(), connection) != null)
@@ -896,4 +554,5 @@
 
       }
    }
+
 }

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java	2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java	2010-01-21 16:46:55 UTC (rev 8829)
@@ -13,7 +13,7 @@
 
 package org.hornetq.integration.transports.netty;
 
-import org.hornetq.core.protocol.core.CoreRemotingConnection;
+import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.server.HornetQServer;
 
 /**
@@ -27,6 +27,5 @@
 {
    HornetQServer getServer();
 
-   // FIXME should NOT be CoreRemotingConnection but RemotingConnection
-   CoreRemotingConnection getRemotingConnection(int connectionID);
+   RemotingConnection getRemotingConnection(int connectionID);
 }

Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-01-21 16:46:55 UTC (rev 8829)
@@ -849,6 +849,7 @@
    {
       Configuration config = new ConfigurationImpl();
       config.setSecurityEnabled(false);
+      config.setPersistenceEnabled(false);
 
       Map<String, Object> params = new HashMap<String, Object>();
       params.put(TransportConstants.PROTOCOL_PROP_NAME, TransportConstants.STOMP_PROTOCOL);



More information about the hornetq-commits mailing list