[hornetq-commits] JBoss hornetq SVN: r8844 - in trunk: src/main/org/hornetq/core/server and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Jan 26 08:31:50 EST 2010


Author: jmesnil
Date: 2010-01-26 08:31:50 -0500 (Tue, 26 Jan 2010)
New Revision: 8844

Added:
   trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
Removed:
   trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java
Modified:
   trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
   trunk/src/main/org/hornetq/core/server/ServerSession.java
   trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0

* added missing Stomp commands and responses

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2010-01-22 15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2010-01-26 13:31:50 UTC (rev 8844)
@@ -33,15 +33,23 @@
  *
  *
  */
-public class StompConnection implements RemotingConnection
+class StompConnection implements RemotingConnection
 {
    private static final Logger log = Logger.getLogger(StompConnection.class);
 
-   private final ProtocolManager manager;
+   private final StompProtocolManager manager;
    
    private final Connection transportConnection;
       
-   StompConnection(final Connection transportConnection, final ProtocolManager manager)
+   private String login;
+   
+   private String passcode;
+
+   private String clientID;
+
+   private boolean valid;
+
+   StompConnection(final Connection transportConnection, final StompProtocolManager manager)
    {
       this.transportConnection = transportConnection;
       
@@ -68,6 +76,8 @@
 
    public void destroy()
    {
+      manager.cleanup(this);
+      transportConnection.close();
    }
 
    public void disconnect()
@@ -84,7 +94,7 @@
 
    public List<FailureListener> getFailureListeners()
    {
-      return Collections.EMPTY_LIST;
+      return Collections.emptyList();
    }
 
    public Object getID()
@@ -132,9 +142,38 @@
       manager.handleBuffer(this, buffer);
    }
 
-   public int isReadyToHandle(HornetQBuffer buffer)
+   public void setLogin(String login)
    {
-      return -1;
+      this.login = login;
    }
 
+   public String getLogin()
+   {
+      return login;
+   }
+
+   public void setPasscode(String passcode)
+   {
+      this.passcode = passcode;
+   }
+
+   public String getPasscode()
+   {
+      return passcode;
+   }
+
+   public void setClientID(String clientID)
+   {
+      this.clientID = clientID;
+   }
+
+   public boolean isValid()
+   {
+      return valid;
+   }
+   
+   public void setValid(boolean valid)
+   {
+      this.valid = valid;
+   }
 }

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java	2010-01-22 15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java	2010-01-26 13:31:50 UTC (rev 8844)
@@ -65,6 +65,6 @@
    @Override
    public String toString()
    {
-      return "StompFrame[command=" + command + ", headers=" + headers + ",content-length=" + content.length + "]";
+      return "StompFrame[command=" + command + ", headers=" + headers + ", content-length=" + content.length + "]";
    }
 }

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java	2010-01-22 15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java	2010-01-26 13:31:50 UTC (rev 8844)
@@ -31,7 +31,7 @@
  * Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
  */
 class StompMarshaller {
-    private static final byte[] NO_DATA = new byte[]{};
+    public static final byte[] NO_DATA = new byte[]{};
     private static final byte[] END_OF_FRAME = new byte[]{0, '\n'};
     private static final int MAX_COMMAND_LENGTH = 1024;
     private static final int MAX_HEADER_LENGTH = 1024 * 10;

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-01-22 15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-01-26 13:31:50 UTC (rev 8844)
@@ -31,7 +31,6 @@
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.ServerSession;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.spi.core.protocol.ConnectionEntry;
@@ -43,28 +42,64 @@
 /**
  * StompProtocolManager
  * 
- * A stupid protocol to demonstrate how to implement a new protocol in HornetQ
- *
- * @author Tim Fox
- *
- *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  */
-public class StompProtocolManager implements ProtocolManager
+class StompProtocolManager implements ProtocolManager
 {
+   // Constants -----------------------------------------------------
+
    private static final Logger log = Logger.getLogger(StompProtocolManager.class);
 
+   // Attributes ----------------------------------------------------
+
    private final HornetQServer server;
 
    private final StompMarshaller marshaller;
 
-   private final Map<RemotingConnection, ServerSession> sessions = new HashMap<RemotingConnection, ServerSession>();
+   private final Map<String, StompSession> transactedSessions = new HashMap<String, StompSession>();
 
+   private final Map<RemotingConnection, StompSession> sessions = new HashMap<RemotingConnection, StompSession>();
+
+   // Static --------------------------------------------------------
+
+   private static StompFrame createError(Exception e, StompFrame request)
+   {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      try
+      {
+         // Let the stomp client know about any protocol errors.
+         PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
+         e.printStackTrace(stream);
+         stream.close();
+
+         Map<String, Object> headers = new HashMap<String, Object>();
+         headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
+
+         final String receiptId = (String)request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+         if (receiptId != null)
+         {
+            headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+         }
+
+         return new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
+      }
+      catch (UnsupportedEncodingException ex)
+      {
+         log.warn("Unable to create ERROR frame from the exception", ex);
+         return null;
+      }
+   }
+
+   // Constructors --------------------------------------------------
+
    public StompProtocolManager(final HornetQServer server, final List<Interceptor> interceptors)
    {
       this.server = server;
       this.marshaller = new StompMarshaller();
    }
 
+   // ProtocolManager implementation --------------------------------
+
    public ConnectionEntry createConnectionEntry(final Connection connection)
    {
       StompConnection conn = new StompConnection(connection, this);
@@ -76,160 +111,293 @@
    {
    }
 
+   public int isReadyToHandle(HornetQBuffer buffer)
+   {
+      return -1;
+   }
+
    public void handleBuffer(RemotingConnection connection, HornetQBuffer buffer)
    {
-      StompFrame frame = null;
+      StompConnection conn = (StompConnection)connection;
+      StompFrame request = null;
       try
       {
-         frame = marshaller.unmarshal(buffer);
-         System.out.println("RECEIVED " + frame);
+         request = marshaller.unmarshal(buffer);
+         System.out.println("<<< " + request);
 
-         String command = frame.getCommand();
+         String command = request.getCommand();
 
          StompFrame response = null;
          if (Stomp.Commands.CONNECT.equals(command))
          {
-            response = onConnect(frame, server, connection);
+            response = onConnect(request, server, conn);
          }
          else if (Stomp.Commands.DISCONNECT.equals(command))
          {
-            response = onDisconnect(frame, server, connection);
+            response = onDisconnect(request, server, conn);
          }
          else if (Stomp.Commands.SEND.equals(command))
          {
-            response = onSend(frame, server, connection);
+            response = onSend(request, server, conn);
          }
          else if (Stomp.Commands.SUBSCRIBE.equals(command))
          {
-            response = onSubscribe(frame, server, connection);
+            response = onSubscribe(request, server, conn);
          }
+         else if (Stomp.Commands.UNSUBSCRIBE.equals(command))
+         {
+            response = onUnsubscribe(request, server, conn);
+         }
+         else if (Stomp.Commands.ACK.equals(command))
+         {
+            response = onAck(request, server, conn);
+         }
+         else if (Stomp.Commands.BEGIN.equals(command))
+         {
+            response = onBegin(request, server, conn);
+         }
+         else if (Stomp.Commands.COMMIT.equals(command))
+         {
+            response = onCommit(request, server, conn);
+         }
+         else if (Stomp.Commands.ABORT.equals(command))
+         {
+            response = onAbort(request, server, conn);
+         }
          else
          {
-            log.error("Unsupported Stomp frame: " + frame);
+
+            log.error("Unsupported Stomp frame: " + request);
             response = new StompFrame(Stomp.Responses.ERROR,
                                       new HashMap<String, Object>(),
                                       ("Unsupported frame: " + command).getBytes());
          }
 
+         if (request.getHeaders().containsKey(Stomp.Headers.RECEIPT_REQUESTED))
+         {
+            if (response == null) 
+            {
+               Map<String, Object> h = new HashMap<String, Object>();
+               response = new StompFrame(Stomp.Responses.RECEIPT, h, StompMarshaller.NO_DATA);
+            }
+            response.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED));
+         }
+
          if (response != null)
          {
             send(connection, response);
          }
       }
-      catch (StompException ex)
+      catch (Exception e)
       {
-         ByteArrayOutputStream baos = new ByteArrayOutputStream();
-         try
+         StompFrame error = createError(e, request);
+         if (error != null)
          {
-            // Let the stomp client know about any protocol errors.
-            PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
-            ex.printStackTrace(stream);
-            stream.close();
+            send(connection, error);
          }
-         catch (UnsupportedEncodingException e)
-         {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-         }
+      }
+   }
 
-         Map<String, Object> headers = new HashMap<String, Object>();
-         headers.put(Stomp.Headers.Error.MESSAGE, ex.getMessage());
+   // Public --------------------------------------------------------
 
-         final String receiptId = (String)frame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
-         if (receiptId != null)
-         {
-            headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
-         }
+   // Package protected ---------------------------------------------
 
-         StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
-         try
-         {
-            send(connection, errorMessage);
-         }
-         catch (IOException e)
-         {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-         }
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   private StompFrame onSubscribe(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception,
+                                                                                                     StompException,
+                                                                                                     HornetQException
+   {
+      Map<String, Object> headers = frame.getHeaders();
+      String destination = (String)headers.get(Stomp.Headers.Subscribe.DESTINATION);
+      String selector = (String)headers.get(Stomp.Headers.Subscribe.SELECTOR);
+      String ack = (String)headers.get(Stomp.Headers.Subscribe.ACK_MODE);
+      String subID = (String)headers.get(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME);
+
+      if (ack == null)
+      {
+         ack = Stomp.Headers.Subscribe.AckModeValues.AUTO;
       }
-      catch (Exception ex)
+      StompSession stompSession = getSession(connection);
+      long consumerID = server.getStorageManager().generateUniqueID();
+      stompSession.addSubscription(consumerID, subID, destination, selector, ack);
+
+      return null;
+   }
+
+   private StompFrame onUnsubscribe(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception,
+                                                                                                       StompException,
+                                                                                                       HornetQException
+   {
+      Map<String, Object> headers = frame.getHeaders();
+      String destination = (String)headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
+      String id = (String)headers.get(Stomp.Headers.Unsubscribe.ID);
+
+      StompSession stompSession = getSession(connection);
+      stompSession.unsubscribe(destination);
+
+      return null;
+   }
+
+   private StompFrame onAck(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception,
+                                                                                               StompException,
+                                                                                               HornetQException
+   {
+      Map<String, Object> headers = frame.getHeaders();
+      String messageID = (String)headers.get(Stomp.Headers.Ack.MESSAGE_ID);
+      String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
+
+      StompSession stompSession = getSession(connection);
+      stompSession.acknowledge(messageID);
+
+      return null;
+   }
+
+   private StompFrame onBegin(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
+   {
+      Map<String, Object> headers = frame.getHeaders();
+      String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
+
+      if (transactedSessions.containsKey(txID))
       {
-         ex.printStackTrace();
+         throw new StompException("Transaction already started: " + txID);
       }
+      StompSession stompSession = getTransactedSession(connection, txID);
+      return null;
    }
 
-   private StompFrame onSubscribe(StompFrame frame, HornetQServer server, RemotingConnection connection) throws Exception,
-                                                                                                        StompException,
-                                                                                                        HornetQException
+   private StompFrame onCommit(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception,
+                                                                                                  StompException,
+                                                                                                  HornetQException
    {
       Map<String, Object> headers = frame.getHeaders();
-      String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
-      SimpleString queueName = SimpleString.toSimpleString(StompUtils.toHornetQAddress(queue));
+      String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
 
-      ServerSession session = checkAndGetSession(connection);
-      long consumerID = server.getStorageManager().generateUniqueID();
-      session.createConsumer(consumerID, queueName, null, false);
-      session.receiveConsumerCredits(consumerID, -1);
-      session.start();
+      StompSession session = transactedSessions.remove(txID);
 
+      if (session == null)
+      {
+         throw new StompException("No transaction started: " + txID);
+      }
+
+      session.getSession().commit();
+
       return null;
    }
 
-   private ServerSession checkAndGetSession(RemotingConnection connection) throws StompException
+   private StompFrame onAbort(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception,
+                                                                                                    StompException,
+                                                                                                    HornetQException
    {
-      ServerSession session = sessions.get(connection);
+      Map<String, Object> headers = frame.getHeaders();
+      String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
+
+      StompSession session = transactedSessions.remove(txID);
+
       if (session == null)
       {
+         throw new StompException("No transaction started: " + txID);
+      }
+      session.getSession().rollback(false);
+
+      return null;
+   }
+
+   private void checkConnected(StompConnection connection) throws StompException
+   {
+      if (!connection.isValid())
+      {
          throw new StompException("Not connected");
       }
-      return session;
    }
 
-   private StompFrame onDisconnect(StompFrame frame, HornetQServer server, RemotingConnection connection) throws StompException
+   private StompSession getSession(StompConnection connection) throws Exception
    {
-      ServerSession session = checkAndGetSession(connection);
+      StompSession stompSession = sessions.get(connection);
+      if (stompSession == null)
+      {
+         stompSession = new StompSession(marshaller, connection);
+         String name = UUIDGenerator.getInstance().generateStringUUID();
+         ServerSession session = server.createSession(name,
+                                                      connection.getLogin(),
+                                                      connection.getPasscode(),
+                                                      HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                                      connection,
+                                                      true,
+                                                      false,
+                                                      false,
+                                                      false,
+                                                      stompSession);
+         stompSession.setServerSession(session);
+         sessions.put(connection, stompSession);
+      }
+      return stompSession;
+   }
+
+   private StompSession getTransactedSession(StompConnection connection, String txID) throws Exception
+   {
+      StompSession stompSession = transactedSessions.get(txID);
+      if (stompSession == null)
+      {
+         stompSession = new StompSession(marshaller, connection);
+         String name = UUIDGenerator.getInstance().generateStringUUID();
+         ServerSession session = server.createSession(name,
+                                                      connection.getLogin(),
+                                                      connection.getPasscode(),
+                                                      HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                                      connection,
+                                                      false,
+                                                      false,
+                                                      false,
+                                                      false,
+                                                      stompSession);
+         stompSession.setServerSession(session);
+         transactedSessions.put(txID, stompSession);
+      }
+      return stompSession;
+   }
+
+   private StompFrame onDisconnect(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
+   {
+      StompSession session = getSession(connection);
       if (session != null)
       {
          try
          {
-            session.close();
+            session.getSession().rollback(true);
+            session.getSession().close();
          }
          catch (Exception e)
          {
             throw new StompException(e.getMessage());
          }
          sessions.remove(connection);
+         connection.setValid(false);
       }
       return null;
    }
 
-   private StompFrame onSend(StompFrame frame, HornetQServer server, RemotingConnection connection) throws Exception
+   private StompFrame onSend(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
    {
-      ServerSession session = checkAndGetSession(connection);
-
+      checkConnected(connection);
       Map<String, Object> headers = frame.getHeaders();
-      String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
-      /*
-      String type = (String)headers.get(Stomp.Headers.Send.TYPE);
-      long expiration = (Long)headers.get(Stomp.Headers.Send.EXPIRATION_TIME);
-      byte priority = (Byte)headers.get(Stomp.Headers.Send.PRIORITY);
-      boolean durable = (Boolean)headers.get(Stomp.Headers.Send.PERSISTENT);
-      */
+      String queue = (String)headers.remove(Stomp.Headers.Send.DESTINATION);
+      String txID = (String)headers.remove(Stomp.Headers.TRANSACTION);
       byte type = Message.TEXT_TYPE;
       if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH))
       {
          type = Message.BYTES_TYPE;
       }
       long timestamp = System.currentTimeMillis();
-      boolean durable = false;
-      long expiration = -1;
-      byte priority = 9;
       SimpleString address = SimpleString.toSimpleString(StompUtils.toHornetQAddress(queue));
 
       ServerMessageImpl message = new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
       message.setType(type);
       message.setTimestamp(timestamp);
       message.setAddress(address);
+      StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
       byte[] content = frame.getContent();
       if (type == Message.TEXT_TYPE)
       {
@@ -240,56 +408,75 @@
          message.getBodyBuffer().writeBytes(content);
       }
 
-      session.send(message);
-      if (headers.containsKey(Stomp.Headers.RECEIPT_REQUESTED))
+      ServerSession session = null;
+      if (txID == null)
       {
-         Map<String, Object> h = new HashMap<String, Object>();
-         h.put(Stomp.Headers.Response.RECEIPT_ID, headers.get(Stomp.Headers.RECEIPT_REQUESTED));
-         return new StompFrame(Stomp.Responses.RECEIPT, h, new byte[] {});
-      }
-      else
+         session = getSession(connection).getSession();
+      } else
       {
-         return null;
+         session = transactedSessions.get(txID).getSession();
       }
+
+      session.send(message);
+      return null;
    }
 
-   private StompFrame onConnect(StompFrame frame, HornetQServer server, final RemotingConnection connection) throws Exception
+   private StompFrame onConnect(StompFrame frame, HornetQServer server, final StompConnection connection) throws Exception
    {
       Map<String, Object> headers = frame.getHeaders();
       String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
       String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
+      String clientID = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
       String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
 
-      String name = UUIDGenerator.getInstance().generateStringUUID();
-      ServerSession session = server.createSession(name,
-                                                   login,
-                                                   passcode,
-                                                   HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
-                                                   connection,
-                                                   true,
-                                                   true,
-                                                   false,
-                                                   false,
-                                                   new StompSessionCallback(marshaller, connection));
-      sessions.put(connection, session);
-      System.out.println(">>> created session " + session);
+      server.getSecurityManager().validateUser(login, passcode);
+
+      connection.setLogin(login);
+      connection.setPasscode(passcode);
+      connection.setClientID(clientID);
+      connection.setValid(true);
+
       HashMap<String, Object> h = new HashMap<String, Object>();
-      h.put(Stomp.Headers.Connected.SESSION, name);
-      h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
-      return new StompFrame(Stomp.Responses.CONNECTED, h, new byte[] {});
+      h.put(Stomp.Headers.Connected.SESSION, connection.getID());
+      if (requestID != null)
+      {
+         h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
+      }
+      return new StompFrame(Stomp.Responses.CONNECTED, h, StompMarshaller.NO_DATA);
    }
 
-   private void send(RemotingConnection connection, StompFrame frame) throws IOException
+   private void send(RemotingConnection connection, StompFrame frame)
    {
-      System.out.println("SENDING >>> " + frame);
-      byte[] bytes = marshaller.marshal(frame);
-      HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
-      System.out.println("ready to send reply: " + buffer);
-      connection.getTransportConnection().write(buffer, true);
+      System.out.println(">>> " + frame);
+
+      try
+      {
+         byte[] bytes = marshaller.marshal(frame);
+         HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+         connection.getTransportConnection().write(buffer, true);
+      }
+      catch (IOException e)
+      {
+         log.error("Unable to send frame " + frame, e);
+      }
    }
 
-   public int isReadyToHandle(HornetQBuffer buffer)
+   public synchronized void cleanup(StompConnection conn)
    {
-      return -1;
+      StompSession session = sessions.remove(conn);
+      if (session != null)
+      {
+         try
+         {
+            session.getSession().rollback(true);
+            session.getSession().close();
+         }
+         catch (Exception e)
+         {
+            log.error(e);
+         }
+      }
    }
+
+   // Inner classes -------------------------------------------------
 }

Copied: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java (from rev 8843, trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java)
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2010-01-26 13:31:50 UTC (rev 8844)
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.protocol.stomp.Stomp.Headers;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.protocol.SessionCallback;
+
+/**
+ * A StompSession
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+class StompSession implements SessionCallback
+{
+   private final RemotingConnection connection;
+
+   private final StompMarshaller marshaller;
+
+   private ServerSession session;
+
+   private final Map<Long, StompSubscription> subscriptions = new HashMap<Long, StompSubscription>();
+
+   // key = message ID, value = consumer ID
+   private final Map<Long, Long> messagesToAck = new HashMap<Long, Long>();
+
+   StompSession(final StompMarshaller marshaller, final RemotingConnection connection)
+   {
+      this.marshaller = marshaller;
+      this.connection = connection;
+   }
+
+   void setServerSession(ServerSession session)
+   {
+      this.session = session;
+   }
+
+   public ServerSession getSession()
+   {
+      return session;
+   }
+
+   public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
+   {
+   }
+
+   public int sendMessage(ServerMessage serverMessage, long consumerID, int deliveryCount)
+   {
+      try
+      {
+         Map<String, Object> headers = new HashMap<String, Object>();
+         headers.put(Stomp.Headers.Message.DESTINATION, StompUtils.toStompDestination(serverMessage.getAddress()
+                                                                                                   .toString()));
+         byte[] data = new byte[] {};
+         if (serverMessage.getType() == Message.TEXT_TYPE)
+         {
+            SimpleString text = serverMessage.getBodyBuffer().readNullableSimpleString();
+            if (text != null)
+            {
+               data = text.toString().getBytes();
+            }
+         }
+         else
+         {
+            HornetQBuffer buffer = serverMessage.getBodyBuffer();
+            buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE);
+            int size = serverMessage.getEndOfBodyPosition() - buffer.readerIndex();
+            data = new byte[size];
+            buffer.readBytes(data);
+            headers.put(Headers.CONTENT_LENGTH, data.length);
+         }
+         StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
+         StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
+         System.out.println(">>> " + frame);
+         byte[] bytes = marshaller.marshal(frame);
+         HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+         connection.getTransportConnection().write(buffer, true);
+
+         StompSubscription subscription = subscriptions.get(consumerID);
+
+         if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
+         {
+            session.acknowledge(consumerID, serverMessage.getMessageID());
+            session.commit();
+         }
+         else
+         {
+            messagesToAck.put(serverMessage.getMessageID(), consumerID);
+         }
+         return bytes.length;
+
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         return 0;
+      }
+
+   }
+
+   public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse)
+   {
+      return 0;
+   }
+
+   public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
+   {
+      return 0;
+   }
+
+   public void closed()
+   {
+   }
+
+   public void acknowledge(String messageID) throws Exception
+   {
+      long id = Long.parseLong(messageID);
+      long consumerID = messagesToAck.remove(id);
+      session.acknowledge(consumerID, id);
+      session.commit();
+   }
+
+   public void addSubscription(long consumerID, String clientID, String destination, String selector, String ack) throws Exception
+   {
+      String queue = StompUtils.toHornetQAddress(destination);
+      synchronized (session)
+      {
+         session.createConsumer(consumerID,
+                                SimpleString.toSimpleString(queue),
+                                SimpleString.toSimpleString(selector),
+                                false);
+         session.receiveConsumerCredits(consumerID, -1);
+         StompSubscription subscription = new StompSubscription(consumerID, clientID, destination, ack);
+         subscriptions.put(consumerID, subscription);
+         // FIXME not very smart: since we can't start the consumer, we start the session
+         // everytime to start the new consumer (and all previous consumers...)
+         session.start();
+      }
+   }
+
+   public void unsubscribe(String destination) throws Exception
+   {
+      Iterator<Entry<Long, StompSubscription>> iterator = subscriptions.entrySet().iterator();
+      while (iterator.hasNext())
+      {
+         Map.Entry<Long, StompSubscription> entry = (Map.Entry<Long, StompSubscription>)iterator.next();
+         long consumerID = entry.getKey();
+         StompSubscription sub = entry.getValue();
+         if (sub.getDestination().equals(destination))
+         {
+            iterator.remove();
+            session.closeConsumer(consumerID);
+         }
+      }
+   }
+}
\ No newline at end of file

Deleted: trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java	2010-01-22 15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java	2010-01-26 13:31:50 UTC (rev 8844)
@@ -1,93 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.protocol.stomp;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQBuffers;
-import org.hornetq.api.core.Message;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.spi.core.protocol.RemotingConnection;
-import org.hornetq.spi.core.protocol.SessionCallback;
-
-/**
- * A StompSessionCallback
- *
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- */
-class StompSessionCallback implements SessionCallback
-{
-   private final RemotingConnection connection;
-
-   private final StompMarshaller marshaller;
-
-   StompSessionCallback(final StompMarshaller marshaller, final RemotingConnection connection)
-   {
-      this.marshaller = marshaller;
-      this.connection = connection;
-   }
-
-   public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
-   {
-   }
-
-   public int sendMessage(ServerMessage serverMessage, long consumerID, int deliveryCount)
-   {
-      try
-      {
-         Map<String, Object> headers = new HashMap<String, Object>();
-         headers.put(Stomp.Headers.Message.DESTINATION, StompUtils.toStompDestination(serverMessage.getAddress()
-                                                                                                       .toString()));
-         byte[] data = new byte[] {};
-         if (serverMessage.getType() == Message.TEXT_TYPE)
-         {
-            SimpleString text = serverMessage.getBodyBuffer().readNullableSimpleString();
-            if (text != null)
-            {
-               data = text.toString().getBytes();
-            }
-         }
-         StompFrame msg = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
-         System.out.println("SENDING : " + msg);
-         byte[] bytes = marshaller.marshal(msg);
-         HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
-         connection.getTransportConnection().write(buffer, true);
-
-         return bytes.length;
-
-      }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-         return 0;
-      }
-
-   }
-
-   public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse)
-   {
-      return 0;
-   }
-
-   public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
-   {
-      return 0;
-   }
-
-   public void closed()
-   {
-   }
-}
\ No newline at end of file

Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java	2010-01-26 13:31:50 UTC (rev 8844)
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+/**
+ * A StompSubscription
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public class StompSubscription
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final long consumerID;
+
+   private final String subID;
+
+   private final String destination;
+
+   private final String ack;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public StompSubscription(long consumerID, String subID, String destination, String ack)
+   {
+      this.consumerID = consumerID;
+      this.subID = subID;
+      this.destination = destination;
+      this.ack = ack;
+   }
+
+   // Public --------------------------------------------------------
+
+   public String getAck()
+   {
+      return ack;
+   }
+
+   public String getDestination()
+   {
+      return destination;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java	2010-01-22 15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java	2010-01-26 13:31:50 UTC (rev 8844)
@@ -13,7 +13,16 @@
 
 package org.hornetq.core.protocol.stomp;
 
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.client.impl.ClientMessageImpl;
+import org.hornetq.core.server.impl.ServerMessageImpl;
 
 /**
  * A StompUtils
@@ -71,9 +80,9 @@
       }
       else
       {
-         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal destination name: [" + stompDestination +
-                                                                    "] -- StompConnect destinations " +
-                                                                    "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
+         // it is also possible the STOMP client send a message directly to a HornetQ address
+         // in that case, we do nothing:
+         return stompDestination;
       }
    }
 
@@ -101,17 +110,85 @@
       }
       else
       {
-         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal address name: [" + hornetqAddress +
-                                                                    "] -- Acceptable address must comply to JMS semantics");
+         // do nothing
+         return hornetqAddress;
       }
    }
-   
-   private static String convert(String str, String oldPrefix, String newPrefix) 
+
+   private static String convert(String str, String oldPrefix, String newPrefix)
    {
       String sub = str.substring(oldPrefix.length(), str.length());
       return new String(newPrefix + sub);
    }
 
+   public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, ServerMessageImpl msg) throws Exception
+   {
+      Map<String, Object> headers = new HashMap<String, Object>(frame.getHeaders());
+      
+      String priority = (String)headers.remove(Stomp.Headers.Send.PRIORITY);
+      if (priority != null)
+      {
+      msg.setPriority(Byte.parseByte(priority));
+      }
+      String persistent = (String)headers.remove(Stomp.Headers.Send.PERSISTENT);
+      if (persistent != null)
+      {
+         msg.setDurable(Boolean.parseBoolean(persistent));
+      }
+      // FIXME should use a proper constant
+      msg.putObjectProperty("JMSCorrelationID", headers.remove(Stomp.Headers.Send.CORRELATION_ID));
+      msg.putObjectProperty("JMSType", headers.remove(Stomp.Headers.Send.TYPE));
+
+      String groupID = (String)headers.remove("JMSXGroupID");
+      if (groupID != null)
+      {
+         msg.putStringProperty(Message.HDR_GROUP_ID, SimpleString.toSimpleString(groupID));
+      }
+      Object o = headers.remove(Stomp.Headers.Send.REPLY_TO);
+      if (o != null)
+      {
+         msg.putStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME, SimpleString.toSimpleString((String)o));
+      }
+
+      // now the general headers
+      for (Iterator<Map.Entry<String, Object>> iter = headers.entrySet().iterator(); iter.hasNext();)
+      {
+         Map.Entry<String, Object> entry = iter.next();
+         String name = (String)entry.getKey();
+         Object value = entry.getValue();
+         System.out.println(name + "=" + value);   
+         msg.putObjectProperty(name, value);
+      }
+   }
+
+   public static void copyStandardHeadersFromMessageToFrame(Message message, StompFrame command, int deliveryCount) throws Exception {
+      final Map<String, Object> headers = command.getHeaders();
+      headers.put(Stomp.Headers.Message.DESTINATION, toStompDestination(message.getAddress().toString()));
+      headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getMessageID());
+
+      if (message.getObjectProperty("JMSCorrelationID") != null) {
+          headers.put(Stomp.Headers.Message.CORRELATION_ID, message.getObjectProperty("JMSCorrelationID"));
+      }
+      headers.put(Stomp.Headers.Message.EXPIRATION_TIME, "" + message.getExpiration());
+      headers.put(Stomp.Headers.Message.REDELIVERED, deliveryCount > 1);
+      headers.put(Stomp.Headers.Message.PRORITY, "" + message.getPriority());
+
+      if (message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME) != null) {
+          headers.put(Stomp.Headers.Message.REPLY_TO, toStompDestination(message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME)));
+      }
+      headers.put(Stomp.Headers.Message.TIMESTAMP, "" + message.getTimestamp());
+
+      if (message.getObjectProperty("JMSType") != null) {
+          headers.put(Stomp.Headers.Message.TYPE, message.getObjectProperty("JMSType"));
+      }
+
+      // now lets add all the message headers
+      Set<SimpleString> names = message.getPropertyNames();
+      for (SimpleString name : names)
+      {
+          headers.put(name.toString(), message.getObjectProperty(name));
+      }
+  }
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------

Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java	2010-01-22 15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java	2010-01-26 13:31:50 UTC (rev 8844)
@@ -40,7 +40,7 @@
 
    Object getConnectionID();
 
-   void removeConsumer(ServerConsumer consumer) throws Exception;
+   void removeConsumer(long consumerID) throws Exception;
 
    void acknowledge(long consumerID, long messageID) throws Exception;
 

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-01-22 15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-01-26 13:31:50 UTC (rev 8844)
@@ -284,7 +284,7 @@
          messageQueue.removeConsumer(this);
       }
 
-      session.removeConsumer(this);
+      session.removeConsumer(id);
 
       LinkedList<MessageReference> refs = cancelRefs(false, null);
 

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-01-22 15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-01-26 13:31:50 UTC (rev 8844)
@@ -228,11 +228,11 @@
       return remotingConnection.getID();
    }
 
-   public void removeConsumer(final ServerConsumer consumer) throws Exception
+   public void removeConsumer(final long consumerID) throws Exception
    {
-      if (consumers.remove(consumer.getID()) == null)
+      if (consumers.remove(consumerID) == null)
       {
-         throw new IllegalStateException("Cannot find consumer with id " + consumer.getID() + " to remove");
+         throw new IllegalStateException("Cannot find consumer with id " + consumerID + " to remove");
       }
    }
 

Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-01-22 15:32:13 UTC (rev 8843)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-01-26 13:31:50 UTC (rev 8844)
@@ -31,6 +31,7 @@
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -217,7 +218,7 @@
        Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
    }
 
-    public void _testJMSXGroupIdCanBeSet() throws Exception {
+    public void testJMSXGroupIdCanBeSet() throws Exception {
 
         MessageConsumer consumer = session.createConsumer(queue);
 
@@ -242,11 +243,11 @@
 
         TextMessage message = (TextMessage) consumer.receive(1000);
         Assert.assertNotNull(message);
-        // TODO do we support it?
-        //Assert.assertEquals("TEST", ((TextMessage) message).getGroupID());
+        // differ from StompConnect
+        Assert.assertEquals("TEST", ((TextMessage) message).getStringProperty("JMSXGroupID"));
     }
 
-    public void _testSendMessageWithCustomHeadersAndSelector() throws Exception {
+    public void testSendMessageWithCustomHeadersAndSelector() throws Exception {
 
         MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
 
@@ -277,7 +278,7 @@
         Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
     }
 
-    public void _testSendMessageWithStandardHeaders() throws Exception {
+    public void testSendMessageWithStandardHeaders() throws Exception {
 
         MessageConsumer consumer = session.createConsumer(queue);
 
@@ -294,6 +295,7 @@
         frame =
                 "SEND\n" +
                         "correlation-id:c123\n" +
+                        "persistent:true\n" +
                         "priority:3\n" +
                         "type:t345\n" +
                         "JMSXGroupID:abc\n" +
@@ -311,6 +313,7 @@
         Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
         Assert.assertEquals("getJMSType", "t345", message.getJMSType());
         Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
+        Assert.assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
         Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
         Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
 
@@ -350,9 +353,15 @@
                         "\n\n" +
                         Stomp.NULL;
         sendFrame(frame);
+        
+        // message should not be received as it was auto-acked
+        MessageConsumer consumer = session.createConsumer(queue);
+        TextMessage message = (TextMessage) consumer.receive(1000);
+        Assert.assertNull(message);
+
     }
 
-    public void _testSubscribeWithAutoAckAndBytesMessage() throws Exception {
+    public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
 
         String frame =
                 "CONNECT\n" +
@@ -371,7 +380,8 @@
                         Stomp.NULL;
         sendFrame(frame);
 
-        sendBytesMessage(new byte[]{1, 2, 3, 4, 5});
+        byte[] payload = new byte[]{1, 2, 3, 4, 5}; 
+        sendBytesMessage(payload);
 
         frame = receiveFrame(10000);
         Assert.assertTrue(frame.startsWith("MESSAGE"));
@@ -382,7 +392,8 @@
         Assert.assertEquals("5", cl_matcher.group(1));
 
         Assert.assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find());
-
+        Assert.assertTrue(frame.indexOf(new String(payload)) > -1);
+        
         frame =
                 "DISCONNECT\n" +
                         "\n\n" +
@@ -390,7 +401,7 @@
         sendFrame(frame);
     }
 
-    public void _testSubscribeWithMessageSentWithProperties() throws Exception {
+    public void testSubscribeWithMessageSentWithProperties() throws Exception {
 
         String frame =
                 "CONNECT\n" +
@@ -422,6 +433,8 @@
         producer.send(message);
 
         frame = receiveFrame(10000);
+        Assert.assertNotNull(frame);
+        System.out.println(frame);
         Assert.assertTrue(frame.startsWith("MESSAGE"));
         Assert.assertTrue(frame.indexOf("S:") > 0);
         Assert.assertTrue(frame.indexOf("n:") > 0);
@@ -442,7 +455,7 @@
         sendFrame(frame);
     }
 
-    public void _testMessagesAreInOrder() throws Exception {
+    public void testMessagesAreInOrder() throws Exception {
         int ctr = 10;
         String[] data = new String[ctr];
 
@@ -493,7 +506,7 @@
         sendFrame(frame);
     }
 
-    public void _testSubscribeWithAutoAckAndSelector() throws Exception {
+    public void testSubscribeWithAutoAckAndSelector() throws Exception {
 
         String frame =
                 "CONNECT\n" +
@@ -527,8 +540,54 @@
         sendFrame(frame);
     }
 
-    public void _testSubscribeWithClientAck() throws Exception {
+    public void testSubscribeWithClientAck() throws Exception {
 
+       String frame =
+               "CONNECT\n" +
+                       "login: brianm\n" +
+                       "passcode: wombats\n\n" +
+                       Stomp.NULL;
+       sendFrame(frame);
+
+       frame = receiveFrame(10000);
+       Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+       frame =
+               "SUBSCRIBE\n" +
+                       "destination:/queue/" + getQueueName() + "\n" +
+                       "ack:client\n\n" +
+                       Stomp.NULL;
+
+       sendFrame(frame);
+       
+       sendMessage(getName());
+       frame = receiveFrame(10000);
+       Assert.assertTrue(frame.startsWith("MESSAGE"));
+       Pattern cl = Pattern.compile("message-id:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
+       Matcher cl_matcher = cl.matcher(frame);
+       Assert.assertTrue(cl_matcher.find());
+       String messageID = cl_matcher.group(1);
+
+       frame =
+          "ACK\n" +
+                  "message-id: " + messageID + "\n\n" +
+                  Stomp.NULL;
+       sendFrame(frame);
+
+       frame =
+               "DISCONNECT\n" +
+                       "\n\n" +
+                       Stomp.NULL;
+       sendFrame(frame);
+
+       // message should not be received since message was acknowledged by the client
+       MessageConsumer consumer = session.createConsumer(queue);
+       TextMessage message = (TextMessage) consumer.receive(1000);
+       Assert.assertNull(message);
+   }
+    
+    public void testRedeliveryWithClientAck() throws Exception {
+
         String frame =
                 "CONNECT\n" +
                         "login: brianm\n" +
@@ -546,6 +605,7 @@
                         Stomp.NULL;
 
         sendFrame(frame);
+        
         sendMessage(getName());
         frame = receiveFrame(10000);
         Assert.assertTrue(frame.startsWith("MESSAGE"));
@@ -563,11 +623,11 @@
         Assert.assertTrue(message.getJMSRedelivered());
     }
 
-    public void _testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception {
+    public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception {
         assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
     }
 
-    public void _testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws Exception {
+    public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws Exception {
         assertSubscribeWithClientAckThenConsumeWithAutoAck(true);
     }
 
@@ -663,7 +723,7 @@
         Assert.assertTrue(frame.contains("shouldBeNextMessage"));
     }
 
-    public void _testUnsubscribe() throws Exception {
+    public void testUnsubscribe() throws Exception {
 
         String frame =
                 "CONNECT\n" +
@@ -685,7 +745,7 @@
         sendMessage("first message");
 
         //receive message from socket
-        frame = receiveFrame(1000);
+        frame = receiveFrame(10000);
         Assert.assertTrue(frame.startsWith("MESSAGE"));
 
         //remove suscription
@@ -711,7 +771,7 @@
         }
     }
 
-    public void _testTransactionCommit() throws Exception {
+    public void testTransactionCommit() throws Exception {
         MessageConsumer consumer = session.createConsumer(queue);
 
         String frame =
@@ -757,7 +817,7 @@
         Assert.assertNotNull("Should have received a message", message);
     }
 
-    public void _testTransactionRollback() throws Exception {
+    public void testTransactionRollback() throws Exception {
         MessageConsumer consumer = session.createConsumer(queue);
 
         String frame =



More information about the hornetq-commits mailing list