[jboss-cvs] JBoss Messaging SVN: r3783 - in trunk: src/main/org/jboss/jms/client/api and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Feb 25 07:15:14 EST 2008


Author: timfox
Date: 2008-02-25 07:15:14 -0500 (Mon, 25 Feb 2008)
New Revision: 3783

Added:
   trunk/src/main/org/jboss/jms/server/endpoint/ServerProducer.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerProducerEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerProducerPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionCreateProducerMessageCodec.java
   trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionCreateProducerResponseMessageCodec.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ProducerSendMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateProducerMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateProducerResponseMessage.java
Removed:
   trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionSendMessageCodec.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionSendMessage.java
Modified:
   trunk/src/main/org/jboss/jms/client/JBossConnection.java
   trunk/src/main/org/jboss/jms/client/JBossSession.java
   trunk/src/main/org/jboss/jms/client/api/ClientConnection.java
   trunk/src/main/org/jboss/jms/client/api/ClientProducer.java
   trunk/src/main/org/jboss/jms/client/api/ClientSession.java
   trunk/src/main/org/jboss/jms/client/impl/ClientConnectionImpl.java
   trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java
   trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/jms/client/impl/ClientSessionInternal.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSession.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionCreateSessionMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionCreateSessionResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerFlowTokenMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/CreateConnectionRequest.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/CreateConnectionResponse.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/DeliverMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/MessagingExceptionMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Ping.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Pong.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionAcknowledgeMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionAddAddressMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBindingQueryMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBindingQueryResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserHasNextMessageMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserHasNextMessageResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserNextMessageBlockMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserNextMessageBlockResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserNextMessageResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserResetMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCancelMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateBrowserMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateBrowserResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateConsumerMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateConsumerResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateQueueMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionDeleteQueueMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionQueueQueryMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionQueueQueryResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionRemoveAddressMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXACommitMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAEndMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAForgetMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAGetInDoubtXidsResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAGetTimeoutResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAJoinMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAPrepareMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAResumeMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXARollbackMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXASetTimeoutMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXASetTimeoutResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAStartMessage.java
   trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java
Log:
Various tweaks plus create producer endpoint in preparation for producer flow control, also added producer caching


Modified: trunk/src/main/org/jboss/jms/client/JBossConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossConnection.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/client/JBossConnection.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -115,9 +115,9 @@
 
    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException
    {
-      return createSessionInternal(transacted, acknowledgeMode, false, TYPE_GENERIC_CONNECTION);
+      return createSessionInternal(transacted, acknowledgeMode, false, TYPE_GENERIC_CONNECTION, false);
    }
-
+   
    public String getClientID() throws JMSException
    {
       checkClosed();
@@ -263,7 +263,7 @@
                                           int acknowledgeMode) throws JMSException
    {
        return createSessionInternal(transacted, acknowledgeMode, false,
-                                    JBossSession.TYPE_QUEUE_SESSION);
+                                    JBossSession.TYPE_QUEUE_SESSION, false);
    }
 
    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
@@ -281,9 +281,9 @@
                                           int acknowledgeMode) throws JMSException
    {
       return createSessionInternal(transacted, acknowledgeMode, false,
-                                   JBossSession.TYPE_TOPIC_SESSION);
+                                   JBossSession.TYPE_TOPIC_SESSION, false);
    }
-
+   
    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
                                                       ServerSessionPool sessionPool,
                                                       int maxMessages) throws JMSException
@@ -298,29 +298,63 @@
    public XASession createXASession() throws JMSException
    {
        return createSessionInternal(true, Session.SESSION_TRANSACTED, true,
-                                    JBossSession.TYPE_GENERIC_SESSION);
+                                    JBossSession.TYPE_GENERIC_SESSION, false);
    }
-
+   
    // XAQueueConnection implementation -------------------------------------------------------------
 
    public XAQueueSession createXAQueueSession() throws JMSException
    {
       return createSessionInternal(true, Session.SESSION_TRANSACTED, true,
-                                   JBossSession.TYPE_QUEUE_SESSION);
+                                   JBossSession.TYPE_QUEUE_SESSION, false);
 
    }
+   
 
    // XATopicConnection implementation -------------------------------------------------------------
 
    public XATopicSession createXATopicSession() throws JMSException
    {
       return createSessionInternal(true, Session.SESSION_TRANSACTED, true,
-                                   JBossSession.TYPE_TOPIC_SESSION);
+                                   JBossSession.TYPE_TOPIC_SESSION, false);
 
    }
-
+   
    // Public ---------------------------------------------------------------------------------------
 
+   // We provide some overloaded createSession methods to allow the value of cacheProducers to be specified
+   
+   public Session createSession(boolean transacted, int acknowledgeMode, boolean cacheProducers) throws JMSException
+   {
+      return createSessionInternal(transacted, acknowledgeMode, false, TYPE_GENERIC_CONNECTION, cacheProducers);
+   }
+   
+   public TopicSession createTopicSession(boolean transacted,
+         int acknowledgeMode, boolean cacheProducers) throws JMSException
+   {
+      return createSessionInternal(transacted, acknowledgeMode, false,
+                                   JBossSession.TYPE_TOPIC_SESSION, cacheProducers);
+   }
+
+   public XASession createXASession(boolean cacheProducers) throws JMSException
+   {
+       return createSessionInternal(true, Session.SESSION_TRANSACTED, true,
+                                    JBossSession.TYPE_GENERIC_SESSION, cacheProducers);
+   }
+   
+   public XAQueueSession createXAQueueSession(boolean cacheProducers) throws JMSException
+   {
+      return createSessionInternal(true, Session.SESSION_TRANSACTED, true,
+                                   JBossSession.TYPE_QUEUE_SESSION, cacheProducers);
+   }
+   
+   public XATopicSession createXATopicSession(boolean cacheProducers) throws JMSException
+   {
+      return createSessionInternal(true, Session.SESSION_TRANSACTED, true,
+                                   JBossSession.TYPE_TOPIC_SESSION, cacheProducers);
+   }
+
+
    public ClientConnection getConnection()
    {
       return connection;
@@ -342,7 +376,7 @@
    // Protected ------------------------------------------------------------------------------------
 
    protected JBossSession createSessionInternal(boolean transacted, int acknowledgeMode,
-                                                boolean isXA, int type) throws JMSException
+                                                boolean isXA, int type, boolean cacheProducers) throws JMSException
    {
       if (transacted)
       {
@@ -387,7 +421,8 @@
             }
          }
 
-         ClientSession session =  connection.createClientSession(isXA, autoCommitSends, autoCommitAcks, ackBatchSize);
+         ClientSession session =
+         	connection.createClientSession(isXA, autoCommitSends, autoCommitAcks, ackBatchSize, cacheProducers);
 
          justCreated = false;
          

Modified: trunk/src/main/org/jboss/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossSession.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/client/JBossSession.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -343,11 +343,13 @@
          throw new InvalidDestinationException("Not a JBoss Destination:" + d);
       }           
       
+      JBossDestination jbd = (JBossDestination)d;
+      
       try
       {
-         ClientProducer producer = session.createProducer();
+         ClientProducer producer = session.createProducer(jbd == null ? null : jbd.getAddress());
 
-         return new JBossMessageProducer(producer, (JBossDestination)d);
+         return new JBossMessageProducer(producer, jbd);
       }
       catch (MessagingException e)
       {

Modified: trunk/src/main/org/jboss/jms/client/api/ClientConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientConnection.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/client/api/ClientConnection.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -16,7 +16,7 @@
 public interface ClientConnection
 {    
    ClientSession createClientSession(boolean xa, boolean autoCommitSends, boolean autoCommitAcks,
-                                     int ackBatchSize) throws MessagingException;
+                                     int ackBatchSize, boolean cacheProducers) throws MessagingException;
 
    void start() throws MessagingException;
 

Modified: trunk/src/main/org/jboss/jms/client/api/ClientProducer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientProducer.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/client/api/ClientProducer.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -15,6 +15,8 @@
  */
 public interface ClientProducer
 {        
+	String getAddress();
+	
    void send(String address, Message message) throws MessagingException;
    
    void registerAcknowledgementHandler(AcknowledgementHandler handler);

Modified: trunk/src/main/org/jboss/jms/client/api/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientSession.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/client/api/ClientSession.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -38,7 +38,7 @@
    
    ClientBrowser createBrowser(String queueName, String messageSelector) throws MessagingException;
    
-   ClientProducer createProducer() throws MessagingException;
+   ClientProducer createProducer(String address) throws MessagingException;
    
    XAResource getXAResource();
 

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConnectionImpl.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConnectionImpl.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -90,7 +90,7 @@
    // ClientConnection implementation --------------------------------------------------------------
 
    public ClientSession createClientSession(boolean xa, boolean autoCommitSends, boolean autoCommitAcks,
-                                            int ackBatchSize) throws MessagingException
+                                            int ackBatchSize, boolean cacheProducers) throws MessagingException
    {
       checkClosed();
 
@@ -98,7 +98,7 @@
 
       ConnectionCreateSessionResponseMessage response = (ConnectionCreateSessionResponseMessage)remotingConnection.send(id, request);   
 
-      ClientSession session =  new ClientSessionImpl(this, response.getSessionID(), ackBatchSize);
+      ClientSession session =  new ClientSessionImpl(this, response.getSessionID(), ackBatchSize, cacheProducers);
 
       children.put(response.getSessionID(), session);
 

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -23,7 +23,9 @@
 
 import org.jboss.jms.client.api.AcknowledgementHandler;
 import org.jboss.jms.client.api.ClientProducer;
+import org.jboss.jms.client.remoting.RemotingConnection;
 import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.remoting.wireformat.ProducerSendMessage;
 import org.jboss.messaging.util.Logger;
 import org.jboss.messaging.util.MessagingException;
 
@@ -47,26 +49,46 @@
 
    private boolean trace = log.isTraceEnabled();
    
-   private ClientSessionInternal session;
+   private final String address;
    
+   private final String id;
+   
+   private final ClientSessionInternal session;
+   
+   private final RemotingConnection remotingConnection;
+   
    private volatile boolean closed;
    
    // Static ---------------------------------------------------------------------------------------
 
    // Constructors ---------------------------------------------------------------------------------
       
-   public ClientProducerImpl(ClientSessionInternal session)
-   {
+   public ClientProducerImpl(final ClientSessionInternal session, final String id, final String address,
+   		                    final RemotingConnection remotingConnection)
+   {   	
       this.session = session;
+      
+      this.id = id;
+      
+      this.address = address;
+      
+      this.remotingConnection = remotingConnection;
    }
    
    // ClientProducer implementation ----------------------------------------------------------------
 
-   public void send(String address, Message message) throws MessagingException
+   public String getAddress()
    {
+   	return address;
+   }
+   
+   public void send(String address, Message msg) throws MessagingException
+   {
       checkClosed();
       
-      session.send(address, message);
+      ProducerSendMessage message = new ProducerSendMessage(address, msg.copy());
+      
+      remotingConnection.send(id, message, !msg.isDurable());
    }
    
    public void registerAcknowledgementHandler(AcknowledgementHandler handler)

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -38,7 +38,6 @@
 import org.jboss.jms.client.api.ClientConsumer;
 import org.jboss.jms.client.api.ClientProducer;
 import org.jboss.jms.client.remoting.RemotingConnection;
-import org.jboss.messaging.core.Message;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
 import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
@@ -53,13 +52,14 @@
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserResponseMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateProducerMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateProducerResponseMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateQueueMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionDeleteQueueMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryResponseMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionRemoveAddressMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionRollbackMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionSendMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionXACommitMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionXAEndMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionXAForgetMessage;
@@ -99,10 +99,16 @@
 
    // Attributes -----------------------------------------------------------------------------------
 
-   private String id;
+   private final ClientConnectionInternal connection;
+      
+   private final String id;
    
-   private int lazyAckBatchSize;
+   private final int lazyAckBatchSize;
    
+   private final boolean cacheProducers;
+   
+   private final ExecutorService executor;
+   
    private volatile boolean closed;
       
    private boolean acked = true;
@@ -117,25 +123,23 @@
    
    private boolean deliveryExpired;   
 
-   private ExecutorService executor;
-
-   private RemotingConnection remotingConnection;
-         
-   private ClientConnectionInternal connection;
+   private final RemotingConnection remotingConnection;         
    
-   private Set<ClientBrowser> browsers = new HashSet<ClientBrowser>();
+   private final Set<ClientBrowser> browsers = new HashSet<ClientBrowser>();
    
-   private Set<ClientProducer> producers = new HashSet<ClientProducer>();
+   private final Set<ClientProducer> producers = new HashSet<ClientProducer>();
    
-   private Map<String, ClientConsumerInternal> consumers = new HashMap<String, ClientConsumerInternal>();
+   private final Map<String, ClientConsumerInternal> consumers = new HashMap<String, ClientConsumerInternal>();
    
+   private final Map<String, ClientProducer> producerCache;
+   
    //For testing only
    private boolean forceNotSameRM;
    
    // Constructors ---------------------------------------------------------------------------------
    
-   public ClientSessionImpl(ClientConnectionInternal connection, String id,
-                            int lazyAckBatchSize) throws MessagingException
+   public ClientSessionImpl(final ClientConnectionInternal connection, final String id,
+                            final int lazyAckBatchSize, final boolean cacheProducers) throws MessagingException
    {
       this.id = id;
       
@@ -143,9 +147,20 @@
       
       this.remotingConnection = connection.getRemotingConnection();
       
+      this.cacheProducers = cacheProducers;
+      
       executor = Executors.newSingleThreadExecutor();
       
-      this.lazyAckBatchSize = lazyAckBatchSize;   
+      this.lazyAckBatchSize = lazyAckBatchSize;
+      
+      if (cacheProducers)
+      {
+      	producerCache = new HashMap<String, ClientProducer>();
+      }
+      else
+      {
+      	producerCache = null;
+      }
    }
    
    // ClientSession implementation -----------------------------------------------------------------
@@ -263,11 +278,26 @@
       return browser;
    }
 
-   public ClientProducer createProducer() throws MessagingException
+   public ClientProducer createProducer(String address) throws MessagingException
    {
       checkClosed();
+      
+      ClientProducer producer = null;
+      
+      if (cacheProducers)
+      {
+      	producer = producerCache.remove(address);
+      }
 
-      ClientProducer producer = new ClientProducerImpl(this);
+      if (producer == null)
+      {
+      	SessionCreateProducerMessage request = new SessionCreateProducerMessage(address);
+      	
+      	SessionCreateProducerResponseMessage response =
+      		(SessionCreateProducerResponseMessage)remotingConnection.send(id, request);
+      	
+      	producer = new ClientProducerImpl(this, response.getProducerID(), address, remotingConnection);      	
+      }
 
       producers.add(producer);
 
@@ -353,6 +383,11 @@
       {
          closeChildren();
          
+         if (cacheProducers)
+         {
+         	producerCache.clear();
+         }
+         
          //Make sure any remaining acks make it to the server
          
          acknowledgeInternal(false);      
@@ -418,6 +453,11 @@
    public void removeProducer(ClientProducer producer)
    {
       producers.remove(producer);
+      
+      if (cacheProducers && !producerCache.containsKey(producer.getAddress()))
+      {
+      	producerCache.put(producer.getAddress(), producer);
+      }
    }
    
    public void removeBrowser(ClientBrowser browser)
@@ -425,15 +465,8 @@
       browsers.remove(browser);
    }
    
-   public void send(String address, Message m) throws MessagingException
-   {
-      checkClosed();
+   
       
-      SessionSendMessage message = new SessionSendMessage(address, m.copy());
-      
-      remotingConnection.send(id, message, !m.isDurable());
-   }
-      
    // XAResource implementation --------------------------------------------------------------------
    
    public void commit(Xid xid, boolean onePhase) throws XAException

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientSessionInternal.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientSessionInternal.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -33,7 +33,5 @@
    
    void removeProducer(ClientProducer producer);
    
-   void removeBrowser(ClientBrowser browser);
-   
-   void send(String address, Message message) throws MessagingException;   
+   void removeBrowser(ClientBrowser browser);  
 }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -131,8 +131,6 @@
    {
       if (enableFlowControl && availableTokens.get() == 0)
       {
-         if (trace) { log.trace(this + " is NOT accepting messages!"); }
-
          return HandleStatus.BUSY;
       }
 

Added: trunk/src/main/org/jboss/jms/server/endpoint/ServerProducer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerProducer.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerProducer.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -0,0 +1,23 @@
+package org.jboss.jms.server.endpoint;
+
+import org.jboss.messaging.core.Message;
+
+/**
+ * 
+ * A ServerProducer
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface ServerProducer
+{
+	String getID();
+	
+	void close() throws Exception;
+	
+	void send(String address, Message msg) throws Exception;
+	
+	void sendCredits(int credits);
+	
+	int getNumCredits();
+}

Added: trunk/src/main/org/jboss/jms/server/endpoint/ServerProducerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerProducerEndpoint.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerProducerEndpoint.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -0,0 +1,90 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.jms.server.endpoint;
+
+import java.util.UUID;
+
+import org.jboss.messaging.core.Message;
+
+/**
+ * 
+ * A ServerProducerEndpoint
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class ServerProducerEndpoint implements ServerProducer
+{
+	private final String id;
+	
+	private final ServerSession session;
+	
+	private final String address;
+		
+	// Constructors ----------------------------------------------------------------
+	
+	public ServerProducerEndpoint(final ServerSession session, final String address)
+	{
+		id = UUID.randomUUID().toString();
+      
+		this.session = session;
+		
+		this.address = address;
+	}
+	
+	// ServerProducer implementation --------------------------------------------
+	
+	public String getID()
+	{
+		return id;
+	}
+	
+	public void close() throws Exception
+	{
+		session.removeProducer(id);
+	}
+	
+	public void send(final String address, final Message message) throws Exception
+	{
+		if (address != null)
+		{
+			//Anonymous producer - no flow control
+			session.send(address, message);
+		}
+		else
+		{			
+			session.send(this.address, message);
+		}
+	}
+	
+	public int getNumCredits()
+	{
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	public void sendCredits(int credits)
+	{
+		// TODO Auto-generated method stub
+		
+	}
+}

Added: trunk/src/main/org/jboss/jms/server/endpoint/ServerProducerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerProducerPacketHandler.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerProducerPacketHandler.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -0,0 +1,87 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.jms.server.endpoint;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONS_FLOWTOKEN;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.PROD_SEND;
+
+import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.wireformat.ConsumerFlowTokenMessage;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
+import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.ProducerSendMessage;
+import org.jboss.messaging.util.MessagingException;
+
+
+public class ServerProducerPacketHandler extends ServerPacketHandlerSupport
+{
+	private final ServerProducer producer;
+	
+	public ServerProducerPacketHandler(final ServerProducer producer)
+	{
+		this.producer = producer;
+	}
+
+   public String getID()
+   {
+      return producer.getID();
+   }
+
+   public Packet doHandle(final Packet packet, final PacketSender sender) throws Exception
+   {
+      Packet response = null;
+
+      PacketType type = packet.getType();
+      
+      if (type == PROD_SEND)
+      {
+         ProducerSendMessage message = (ProducerSendMessage) packet;
+         
+         producer.send(message.getAddress(), message.getMessage());
+      }
+      else if (type == CLOSE)
+      {
+         producer.close();
+      }
+      else
+      {
+         throw new MessagingException(MessagingException.UNSUPPORTED_PACKET,
+               "Unsupported packet " + type);
+      }
+
+      // reply if necessary
+      if (response == null && packet.isOneWay() == false)
+      {
+         response = new NullPacket();               
+      }
+      
+      return response;
+   }
+
+   @Override
+   public String toString()
+   {
+      return "ServerConsumerEndpointPacketHandler[id=" + producer.getID() + "]";
+   }
+}

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSession.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSession.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -32,6 +32,7 @@
 import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryResponseMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserResponseMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateProducerResponseMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryResponseMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionXAResponseMessage;
@@ -51,6 +52,8 @@
 	
 	void removeConsumer(String consumerID) throws Exception;
 	
+	void removeProducer(String producerID) throws Exception;
+	
 	void close() throws Exception;
 	
 	void setStarted(boolean started) throws Exception;
@@ -59,7 +62,7 @@
 	
 	void promptDelivery(Queue queue);
 	
-	boolean send(String address, Message msg) throws Exception;
+	void send(String address, Message msg) throws Exception;
 
    void acknowledge(long deliveryID, boolean allUpTo) throws Exception;
 
@@ -103,6 +106,8 @@
 
    SessionCreateConsumerResponseMessage  createConsumer(String queueName, String filterString,
                      boolean noLocal, boolean autoDeleteQueue, int prefetchSize) throws Exception;
+   
+   SessionCreateProducerResponseMessage createProducer(String address) throws Exception;   
 
    SessionQueueQueryResponseMessage executeQueueQuery(SessionQueueQueryMessage request) throws Exception;
 

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -57,6 +57,7 @@
 import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryResponseMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserResponseMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateProducerResponseMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryResponseMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionXAResponseMessage;
@@ -115,6 +116,8 @@
    private final Map<String, ServerConsumer> consumers = new ConcurrentHashMap<String, ServerConsumer>();
 
    private final Map<String, ServerBrowserEndpoint> browsers = new ConcurrentHashMap<String, ServerBrowserEndpoint>();
+   
+   private final Map<String, ServerProducer> producers = new ConcurrentHashMap<String, ServerProducer>();
 
    private final LinkedList<Delivery> deliveries = new LinkedList<Delivery>();
 
@@ -176,26 +179,36 @@
       return connection;
    }
 
-   public void removeBrowser(final String browserId) throws Exception
+   public void removeBrowser(final String browserID) throws Exception
    {
-      if (browsers.remove(browserId) == null)
+      if (browsers.remove(browserID) == null)
       {
-         throw new IllegalStateException("Cannot find browser with id " + browserId + " to remove");
+         throw new IllegalStateException("Cannot find browser with id " + browserID + " to remove");
       }
       
-      dispatcher.unregister(browserId);           
+      dispatcher.unregister(browserID);           
    }
 
-   public void removeConsumer(final String consumerId) throws Exception
+   public void removeConsumer(final String consumerID) throws Exception
    {
-      if (consumers.remove(consumerId) == null)
+      if (consumers.remove(consumerID) == null)
       {
-         throw new IllegalStateException("Cannot find consumer with id " + consumerId + " to remove");
+         throw new IllegalStateException("Cannot find consumer with id " + consumerID + " to remove");
       }
       
-      dispatcher.unregister(consumerId);           
+      dispatcher.unregister(consumerID);           
    }
    
+   public void removeProducer(final String producerID) throws Exception
+   {
+      if (producers.remove(producerID) == null)
+      {
+         throw new IllegalStateException("Cannot find producer with id " + producerID + " to remove");
+      }
+      
+      dispatcher.unregister(producerID);           
+   }
+   
    public synchronized void handleDelivery(final MessageReference ref, final ServerConsumer consumer) throws Exception
    {
       Delivery delivery = new DeliveryImpl(ref, consumer.getID(), deliveryIDSequence++, sender);
@@ -233,10 +246,17 @@
          browser.close();
       }
 
-      consumers.clear();
-
       browsers.clear();
+      
+      Map<String, ServerProducer> producersClone = new HashMap<String, ServerProducer>(producers);
 
+      for (ServerProducer producer: producersClone.values())
+      {
+         producer.close();
+      }
+
+      producers.clear();
+      
       rollback();
 
       executor.shutdown();
@@ -258,7 +278,7 @@
       });
    }
    
-   public boolean send(final String address, final Message msg) throws Exception
+   public void send(final String address, final Message msg) throws Exception
    {
       //check the address exists, if it doesnt add if the user has the correct privileges
       if (!postOffice.containsAllowableAddress(address))
@@ -287,14 +307,8 @@
 
       postOffice.route(address, msg);
 
-      if (msg.getReferences().isEmpty())
+      if (!msg.getReferences().isEmpty())
       {
-         // Didn't route anywhere
-
-         return false;
-      }
-      else
-      {
          if (autoCommitSends)
          {
             if (msg.getNumDurableReferences() != 0)
@@ -308,8 +322,6 @@
          {
             tx.addMessage(msg);
          }
-
-         return true;
       }
    }
 
@@ -885,10 +897,7 @@
       SessionCreateConsumerResponseMessage response = new SessionCreateConsumerResponseMessage(consumer.getID(),
             prefetchSize);
 
-      synchronized (consumers)
-      {
-         consumers.put(consumer.getID(), consumer);
-      }
+      consumers.put(consumer.getID(), consumer);      
 
       log.trace(this + " created and registered " + consumer);
 
@@ -953,7 +962,7 @@
    public SessionCreateBrowserResponseMessage createBrowser(final String queueName, final String selector)
          throws Exception
    {
-      if(!postOffice.containsAllowableAddress(queueName))
+      if (!postOffice.containsAllowableAddress(queueName))
       {
          try
          {
@@ -966,6 +975,7 @@
             throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
          }
       }
+      
       Binding binding = postOffice.getBinding(queueName);
 
       if (binding == null)
@@ -976,20 +986,25 @@
       
       ServerBrowserEndpoint browser = new ServerBrowserEndpoint(this, binding.getQueue(), selector);
 
-      // still need to synchronized since close() can come in on a different
-      // thread
-      synchronized (browsers)
-      {
-         browsers.put(browser.getID(), browser);
-      }
-
+      browsers.put(browser.getID(), browser);
+      
       dispatcher.register(browser.newHandler());
 
-      log.trace(this + " created and registered " + browser);
-
       return new SessionCreateBrowserResponseMessage(browser.getID());
    }
    
+   public SessionCreateProducerResponseMessage createProducer(String address) throws Exception
+   {
+   	ServerProducerEndpoint producer = new ServerProducerEndpoint(this, address);
+   	
+   	producers.put(producer.getID(), producer);
+   	
+   	dispatcher.register(new ServerProducerPacketHandler(producer));
+   	
+   	return new SessionCreateProducerResponseMessage(producer.getID());
+   }
+
+   
    // Public ---------------------------------------------------------------------------------------------
    
    public String toString()

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionPacketHandler.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionPacketHandler.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -28,11 +28,11 @@
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_COMMIT;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEBROWSER;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATECONSUMER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEPRODUCER;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEQUEUE;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_DELETE_QUEUE;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_QUEUEQUERY;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_ROLLBACK;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_SEND;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_COMMIT;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_END;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_FORGET;
@@ -60,11 +60,11 @@
 import org.jboss.messaging.core.remoting.wireformat.SessionCancelMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateProducerMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateQueueMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionDeleteQueueMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionRemoveAddressMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionSendMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionXACommitMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionXAEndMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionXAForgetMessage;
@@ -79,7 +79,6 @@
 import org.jboss.messaging.core.remoting.wireformat.SessionXAStartMessage;
 import org.jboss.messaging.util.MessagingException;
 
-
 /**
  * 
  * A ServerSessionPacketHandler
@@ -113,14 +112,8 @@
       PacketType type = packet.getType();
 
       // TODO use a switch for this
-      if (type == SESS_SEND)
+      if (type == SESS_CREATECONSUMER)
       {
-         SessionSendMessage message = (SessionSendMessage) packet;
-
-         session.send(message.getAddress(), message.getMessage());
-      }
-      else if (type == SESS_CREATECONSUMER)
-      {
          SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
 
          response = session.createConsumer(request.getQueueName(), request
@@ -159,6 +152,12 @@
          response = session.createBrowser(request.getQueueName(), request
                .getFilterString());
       }
+      else if (type == SESS_CREATEPRODUCER)
+      {
+         SessionCreateProducerMessage request = (SessionCreateProducerMessage) packet;
+
+         response = session.createProducer(request.getAddress());
+      }
       else if (type == CLOSE)
       {
       	session.close();

Added: trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionCreateProducerMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionCreateProducerMessageCodec.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionCreateProducerMessageCodec.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -0,0 +1,73 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEPRODUCER;
+
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateProducerMessage;
+
+/**
+ * 
+ * A SessionCreateProducerMessageCodec
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class SessionCreateProducerMessageCodec extends
+      AbstractPacketCodec<SessionCreateProducerMessage>
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public SessionCreateProducerMessageCodec()
+   {
+      super(SESS_CREATEPRODUCER);
+   }
+
+   // Public --------------------------------------------------------
+
+   // AbstractPacketCodec overrides ---------------------------------
+
+   @Override
+   protected void encodeBody(SessionCreateProducerMessage request, RemotingBuffer out) throws Exception
+   {
+      String address = request.getAddress();
+     
+      int bodyLength = sizeof(address);
+
+      out.putInt(bodyLength);
+      out.putNullableString(address);
+   }
+
+   @Override
+   protected SessionCreateProducerMessage decodeBody(RemotingBuffer in)
+         throws Exception
+   {
+      int bodyLength = in.getInt();
+      if (in.remaining() < bodyLength)
+      {
+         return null;
+      }
+
+      String address = in.getNullableString();
+
+      return new SessionCreateProducerMessage(address);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionCreateProducerResponseMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionCreateProducerResponseMessageCodec.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionCreateProducerResponseMessageCodec.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -0,0 +1,74 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEPRODUCER_RESP;
+
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateProducerResponseMessage;
+
+/**
+ * 
+ * A SessionCreateProducerResponseMessageCodec
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class SessionCreateProducerResponseMessageCodec extends
+      AbstractPacketCodec<SessionCreateProducerResponseMessage>
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public SessionCreateProducerResponseMessageCodec()
+   {
+      super(SESS_CREATEPRODUCER_RESP);
+   }
+
+   // Public --------------------------------------------------------
+
+   // AbstractPacketCodec overrides ---------------------------------
+
+   @Override
+   protected void encodeBody(SessionCreateProducerResponseMessage response,
+                             RemotingBuffer out) throws Exception
+   {
+      String producerID = response.getProducerID();
+
+      int bodyLength = sizeof(producerID);
+       
+      out.putInt(bodyLength);
+      out.putNullableString(producerID);
+   }
+
+   @Override
+   protected SessionCreateProducerResponseMessage decodeBody(RemotingBuffer in)
+         throws Exception
+   {
+      int bodyLength = in.getInt();
+      if (in.remaining() < bodyLength)
+      {
+         return null;
+      }
+
+      String producerID = in.getNullableString();
+ 
+      return new SessionCreateProducerResponseMessage(producerID);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionSendMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionSendMessageCodec.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionSendMessageCodec.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -1,79 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.codec;
-
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_SEND;
-
-import org.jboss.messaging.core.Message;
-import org.jboss.messaging.core.impl.MessageImpl;
-import org.jboss.messaging.core.remoting.wireformat.SessionSendMessage;
-import org.jboss.messaging.util.StreamUtils;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- */
-public class SessionSendMessageCodec extends AbstractPacketCodec<SessionSendMessage>
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public SessionSendMessageCodec()
-   {
-      super(SESS_SEND);
-   }
-
-   // Public --------------------------------------------------------
-
-   // AbstractPacketCodec overrides ---------------------------------
-
-   @Override
-   protected void encodeBody(SessionSendMessage message, RemotingBuffer out) throws Exception
-   {
-      String address = message.getAddress();
-      byte[] encodedMsg = StreamUtils.toBytes(message.getMessage());   
-
-      int bodyLength = sizeof(address) + INT_LENGTH + encodedMsg.length;
-
-      out.putInt(bodyLength);
-      out.putNullableString(address);
-      out.putInt(encodedMsg.length);
-      out.put(encodedMsg);
-   }
-
-   @Override
-   protected SessionSendMessage decodeBody(RemotingBuffer in)
-         throws Exception
-   {
-      int bodyLength = in.getInt();
-      if (in.remaining() < bodyLength)
-      {
-         return null;
-      }
-
-      String address = in.getNullableString();
-      int msgLength = in.getInt();
-      byte[] encodedMsg = new byte[msgLength];
-      in.get(encodedMsg);
-      Message message = new MessageImpl();
-      StreamUtils.fromBytes(message, encodedMsg);
-
-      return new SessionSendMessage(address, message);
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private ----------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -20,6 +20,7 @@
 import org.jboss.messaging.core.remoting.codec.MessagingExceptionMessageCodec;
 import org.jboss.messaging.core.remoting.codec.PingCodec;
 import org.jboss.messaging.core.remoting.codec.PongCodec;
+import org.jboss.messaging.core.remoting.codec.ProducerSendMessageCodec;
 import org.jboss.messaging.core.remoting.codec.RemotingBuffer;
 import org.jboss.messaging.core.remoting.codec.SessionAcknowledgeMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionAddAddressMessageCodec;
@@ -34,12 +35,13 @@
 import org.jboss.messaging.core.remoting.codec.SessionCreateBrowserResponseMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionCreateConsumerMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionCreateConsumerResponseMessageCodec;
+import org.jboss.messaging.core.remoting.codec.SessionCreateProducerMessageCodec;
+import org.jboss.messaging.core.remoting.codec.SessionCreateProducerResponseMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionCreateQueueMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionDeleteQueueMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionQueueQueryMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionQueueQueryResponseMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionRemoveAddressMessageCodec;
-import org.jboss.messaging.core.remoting.codec.SessionSendMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionXACommitMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionXAEndMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionXAForgetMessageCodec;
@@ -70,6 +72,7 @@
 import org.jboss.messaging.core.remoting.wireformat.PacketType;
 import org.jboss.messaging.core.remoting.wireformat.Ping;
 import org.jboss.messaging.core.remoting.wireformat.Pong;
+import org.jboss.messaging.core.remoting.wireformat.ProducerSendMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionAcknowledgeMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionAddAddressMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryMessage;
@@ -87,6 +90,8 @@
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserResponseMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateProducerMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateProducerResponseMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateQueueMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionDeleteQueueMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryMessage;
@@ -94,7 +99,6 @@
 import org.jboss.messaging.core.remoting.wireformat.SessionRecoverMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionRemoveAddressMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionRollbackMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionSendMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionXACommitMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionXAEndMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionXAForgetMessage;
@@ -153,12 +157,14 @@
 
       addCodec(ConnectionCreateSessionResponseMessage.class, ConnectionCreateSessionResponseMessageCodec.class);
 
-      addCodec(SessionSendMessage.class, SessionSendMessageCodec.class);
-
       addCodec(SessionCreateConsumerMessage.class, SessionCreateConsumerMessageCodec.class);
 
       addCodec(SessionCreateConsumerResponseMessage.class, SessionCreateConsumerResponseMessageCodec.class);
+      
+      addCodec(SessionCreateProducerMessage.class, SessionCreateProducerMessageCodec.class);
 
+      addCodec(SessionCreateProducerResponseMessage.class, SessionCreateProducerResponseMessageCodec.class);
+
       addCodec(SessionCreateBrowserMessage.class, SessionCreateBrowserMessageCodec.class);
 
       addCodec(SessionCreateBrowserResponseMessage.class, SessionCreateBrowserResponseMessageCodec.class);
@@ -256,6 +262,9 @@
       addCodec(SessionBindingQueryResponseMessage.class, SessionBindingQueryResponseMessageCodec.class);
       
       addCodec(SessionDeleteQueueMessage.class, SessionDeleteQueueMessageCodec.class);
+      
+      addCodec(ProducerSendMessage.class, ProducerSendMessageCodec.class);
+
    }
 
    // Public --------------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionCreateSessionMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionCreateSessionMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionCreateSessionMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -30,7 +30,7 @@
 
    // Constructors --------------------------------------------------
 
-   public ConnectionCreateSessionMessage(boolean xa, boolean autoCommitSends, boolean autoCommitAcks)
+   public ConnectionCreateSessionMessage(final boolean xa, final boolean autoCommitSends, final boolean autoCommitAcks)
    {
       super(CONN_CREATESESSION);
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionCreateSessionResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionCreateSessionResponseMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionCreateSessionResponseMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -26,7 +26,7 @@
 
    // Constructors --------------------------------------------------
 
-   public ConnectionCreateSessionResponseMessage(String sessionID)
+   public ConnectionCreateSessionResponseMessage(final String sessionID)
    {
       super(PacketType.CONN_CREATESESSION_RESP);
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerFlowTokenMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerFlowTokenMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerFlowTokenMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -26,7 +26,7 @@
 
    // Constructors --------------------------------------------------
 
-   public ConsumerFlowTokenMessage(int tokens)
+   public ConsumerFlowTokenMessage(final int tokens)
    {
       super(CONS_FLOWTOKEN);
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/CreateConnectionRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/CreateConnectionRequest.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/CreateConnectionRequest.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -27,15 +27,15 @@
    private final String clientVMID;
    private final String username;
    private final String password;
-   private int prefetchSize;
+   private final int prefetchSize;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public CreateConnectionRequest(byte version,
-         String remotingSessionID, String clientVMID, String username, String password,
-         int prefetchSize)
+   public CreateConnectionRequest(final byte version,
+         final String remotingSessionID, final String clientVMID, final String username, final String password,
+         final int prefetchSize)
    {
       super(CREATECONNECTION);
 
@@ -95,16 +95,9 @@
       return prefetchSize;
    }
 
-   public void setPrefetchSize(int prefetchSize)
-   {
-      this.prefetchSize = prefetchSize;
-   }
-
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------
-
-
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/CreateConnectionResponse.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/CreateConnectionResponse.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/CreateConnectionResponse.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -27,7 +27,7 @@
 
    // Constructors --------------------------------------------------
 
-   public CreateConnectionResponse(String connectionID)
+   public CreateConnectionResponse(final String connectionID)
    {
       super(CREATECONNECTION_RESP);
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/DeliverMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/DeliverMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/DeliverMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -31,7 +31,7 @@
 
    // Constructors --------------------------------------------------
 
-   public DeliverMessage(Message message, long deliveryID)
+   public DeliverMessage(final Message message, final long deliveryID)
    {
       super(SESS_DELIVER);
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/MessagingExceptionMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/MessagingExceptionMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/MessagingExceptionMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -28,7 +28,7 @@
 
    // Constructors --------------------------------------------------
 
-   public MessagingExceptionMessage(MessagingException exception)
+   public MessagingExceptionMessage(final MessagingException exception)
    {
       super(EXCEPTION);
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -36,51 +36,55 @@
 
    // Session   
    SESS_CREATECONSUMER                 ((byte)41),
-   SESS_CREATECONSUMER_RESP            ((byte)42),   
-   SESS_CREATEBROWSER                  ((byte)43),
-   SESS_CREATEBROWSER_RESP             ((byte)44),
-   SESS_SEND                           ((byte)45),
-   SESS_DELIVER                        ((byte)46),  
-   SESS_ACKNOWLEDGE                    ((byte)47),
-   SESS_RECOVER                        ((byte)48),
-   SESS_COMMIT                         ((byte)49),
-   SESS_ROLLBACK                       ((byte)50),
-   SESS_CANCEL                         ((byte)51),
-   SESS_QUEUEQUERY                     ((byte)52),
-   SESS_QUEUEQUERY_RESP                ((byte)53),
-   SESS_CREATEQUEUE                    ((byte)54),
-   SESS_DELETE_QUEUE                   ((byte)55),   
-   SESS_ADD_ADDRESS                    ((byte)56),
-   SESS_REMOVE_ADDRESS                 ((byte)57),
-   SESS_BINDINGQUERY                   ((byte)58),
-   SESS_BINDINGQUERY_RESP              ((byte)59),  
-   SESS_BROWSER_RESET                  ((byte)60),
-   SESS_BROWSER_HASNEXTMESSAGE         ((byte)61),
-   SESS_BROWSER_HASNEXTMESSAGE_RESP    ((byte)62),
-   SESS_BROWSER_NEXTMESSAGEBLOCK       ((byte)63),
-   SESS_BROWSER_NEXTMESSAGEBLOCK_RESP  ((byte)64),
-   SESS_BROWSER_NEXTMESSAGE            ((byte)65),
-   SESS_BROWSER_NEXTMESSAGE_RESP       ((byte)66),      
-   SESS_XA_START                       ((byte)67),
-   SESS_XA_END                         ((byte)68),
-   SESS_XA_COMMIT                      ((byte)69),
-   SESS_XA_PREPARE                     ((byte)70),
-   SESS_XA_RESP                        ((byte)71),
-   SESS_XA_ROLLBACK                    ((byte)72),
-   SESS_XA_JOIN                        ((byte)73),
-   SESS_XA_SUSPEND                     ((byte)74),
-   SESS_XA_RESUME                      ((byte)75),
-   SESS_XA_FORGET                      ((byte)76),
-   SESS_XA_INDOUBT_XIDS                ((byte)77),
-   SESS_XA_INDOUBT_XIDS_RESP           ((byte)78),
-   SESS_XA_SET_TIMEOUT                 ((byte)79),
-   SESS_XA_SET_TIMEOUT_RESP            ((byte)80),
-   SESS_XA_GET_TIMEOUT                 ((byte)81),
-   SESS_XA_GET_TIMEOUT_RESP            ((byte)82),
+   SESS_CREATECONSUMER_RESP            ((byte)42),
+   SESS_CREATEPRODUCER                 ((byte)43),
+   SESS_CREATEPRODUCER_RESP            ((byte)44),
+   SESS_CREATEBROWSER                  ((byte)45),
+   SESS_CREATEBROWSER_RESP             ((byte)46),   
+   SESS_DELIVER                        ((byte)47),  
+   SESS_ACKNOWLEDGE                    ((byte)48),
+   SESS_RECOVER                        ((byte)49),
+   SESS_COMMIT                         ((byte)50),
+   SESS_ROLLBACK                       ((byte)51),
+   SESS_CANCEL                         ((byte)52),
+   SESS_QUEUEQUERY                     ((byte)53),
+   SESS_QUEUEQUERY_RESP                ((byte)54),
+   SESS_CREATEQUEUE                    ((byte)55),
+   SESS_DELETE_QUEUE                   ((byte)56),   
+   SESS_ADD_ADDRESS                    ((byte)57),
+   SESS_REMOVE_ADDRESS                 ((byte)58),
+   SESS_BINDINGQUERY                   ((byte)59),
+   SESS_BINDINGQUERY_RESP              ((byte)60),  
+   SESS_BROWSER_RESET                  ((byte)61),
+   SESS_BROWSER_HASNEXTMESSAGE         ((byte)62),
+   SESS_BROWSER_HASNEXTMESSAGE_RESP    ((byte)63),
+   SESS_BROWSER_NEXTMESSAGEBLOCK       ((byte)64),
+   SESS_BROWSER_NEXTMESSAGEBLOCK_RESP  ((byte)65),
+   SESS_BROWSER_NEXTMESSAGE            ((byte)66),
+   SESS_BROWSER_NEXTMESSAGE_RESP       ((byte)67),      
+   SESS_XA_START                       ((byte)68),
+   SESS_XA_END                         ((byte)69),
+   SESS_XA_COMMIT                      ((byte)70),
+   SESS_XA_PREPARE                     ((byte)71),
+   SESS_XA_RESP                        ((byte)72),
+   SESS_XA_ROLLBACK                    ((byte)73),
+   SESS_XA_JOIN                        ((byte)74),
+   SESS_XA_SUSPEND                     ((byte)75),
+   SESS_XA_RESUME                      ((byte)76),
+   SESS_XA_FORGET                      ((byte)77),
+   SESS_XA_INDOUBT_XIDS                ((byte)78),
+   SESS_XA_INDOUBT_XIDS_RESP           ((byte)79),
+   SESS_XA_SET_TIMEOUT                 ((byte)80),
+   SESS_XA_SET_TIMEOUT_RESP            ((byte)81),
+   SESS_XA_GET_TIMEOUT                 ((byte)82),
+   SESS_XA_GET_TIMEOUT_RESP            ((byte)83),
        
    // Consumer 
-   CONS_FLOWTOKEN                      ((byte)90);
+   CONS_FLOWTOKEN                      ((byte)90),
    
+   //Producer
+   PROD_SEND                           ((byte)91);
+   
 
    private final byte type;
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Ping.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Ping.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Ping.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -27,7 +27,7 @@
 
    // Constructors --------------------------------------------------
 
-   public Ping(String sessionID)
+   public Ping(final String sessionID)
    {
       super(PING);
       

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Pong.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Pong.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Pong.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -29,7 +29,7 @@
 
    // Constructors --------------------------------------------------
 
-   public Pong(String sessionID, boolean sessionFailed)
+   public Pong(final String sessionID, final boolean sessionFailed)
    {
       super(PONG);
 

Added: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ProducerSendMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ProducerSendMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ProducerSendMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.PROD_SEND;
+
+import org.jboss.messaging.core.Message;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ */
+public class ProducerSendMessage extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+	private String address;
+	
+   private final Message message;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ProducerSendMessage(final String address, final Message message)
+   {
+      super(PROD_SEND);
+
+      this.address = address;
+      
+      this.message = message;
+   }
+
+   // Public --------------------------------------------------------
+
+   public String getAddress()
+   {
+   	return address;
+   }
+   
+   public Message getMessage()
+   {
+      return message;
+   }
+   
+   @Override
+   public String toString()
+   {
+      return getParentString() + ", address=" + address + ", message=" + message
+            + "]";
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionAcknowledgeMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionAcknowledgeMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionAcknowledgeMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -18,15 +18,15 @@
 
    // Attributes ----------------------------------------------------
    
-   private long deliveryID;
+   private final long deliveryID;
    
-   private boolean allUpTo;
+   private final boolean allUpTo;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionAcknowledgeMessage(long deliveryID, boolean allUpTo)
+   public SessionAcknowledgeMessage(final long deliveryID, final boolean allUpTo)
    {
       super(PacketType.SESS_ACKNOWLEDGE);
       

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionAddAddressMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionAddAddressMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionAddAddressMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -22,13 +22,13 @@
 
    // Attributes ----------------------------------------------------
    
-   private String address;
+   private final String address;
    
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionAddAddressMessage(String address)
+   public SessionAddAddressMessage(final String address)
    {
       super(SESS_ADD_ADDRESS);
       

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBindingQueryMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBindingQueryMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBindingQueryMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -16,9 +16,9 @@
  */
 public class SessionBindingQueryMessage extends AbstractPacket
 {
-   private String address;
+   private final String address;
 
-   public SessionBindingQueryMessage(String address)
+   public SessionBindingQueryMessage(final String address)
    {
       super(PacketType.SESS_BINDINGQUERY);
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBindingQueryResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBindingQueryResponseMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBindingQueryResponseMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -13,11 +13,11 @@
  */
 public class SessionBindingQueryResponseMessage extends AbstractPacket
 {
-   private boolean exists;
+   private final boolean exists;
    
-   private List<String> queueNames;
+   private final List<String> queueNames;
    
-   public SessionBindingQueryResponseMessage(boolean exists, List<String> queueNames)
+   public SessionBindingQueryResponseMessage(final boolean exists, final List<String> queueNames)
    {
       super(SESS_BINDINGQUERY_RESP);
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserHasNextMessageMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserHasNextMessageMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserHasNextMessageMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -27,7 +27,7 @@
 
    public SessionBrowserHasNextMessageMessage()
    {
-    super(SESS_BROWSER_HASNEXTMESSAGE);
+      super(SESS_BROWSER_HASNEXTMESSAGE);
    }
 
    // Public --------------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserHasNextMessageResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserHasNextMessageResponseMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserHasNextMessageResponseMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -27,7 +27,7 @@
 
    // Constructors --------------------------------------------------
 
-   public SessionBrowserHasNextMessageResponseMessage(boolean hasNext)
+   public SessionBrowserHasNextMessageResponseMessage(final boolean hasNext)
    {
       super(SESS_BROWSER_HASNEXTMESSAGE_RESP);
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserNextMessageBlockMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserNextMessageBlockMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserNextMessageBlockMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -27,7 +27,7 @@
 
    // Constructors --------------------------------------------------
 
-   public SessionBrowserNextMessageBlockMessage(long maxMessages)
+   public SessionBrowserNextMessageBlockMessage(final long maxMessages)
    {
       super(SESS_BROWSER_NEXTMESSAGEBLOCK);
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserNextMessageBlockResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserNextMessageBlockResponseMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserNextMessageBlockResponseMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -30,7 +30,7 @@
 
    // Constructors --------------------------------------------------
 
-   public SessionBrowserNextMessageBlockResponseMessage(Message[] messages)
+   public SessionBrowserNextMessageBlockResponseMessage(final Message[] messages)
    {
       super(SESS_BROWSER_NEXTMESSAGEBLOCK_RESP);
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserNextMessageResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserNextMessageResponseMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserNextMessageResponseMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -26,7 +26,7 @@
 
    // Constructors --------------------------------------------------
 
-   public SessionBrowserNextMessageResponseMessage(Message message)
+   public SessionBrowserNextMessageResponseMessage(final Message message)
    {
       super(PacketType.SESS_BROWSER_NEXTMESSAGE_RESP);
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserResetMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserResetMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserResetMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -27,7 +27,7 @@
 
    public SessionBrowserResetMessage()
    {
-    super(SESS_BROWSER_RESET);
+      super(SESS_BROWSER_RESET);
    }
 
    // Public --------------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCancelMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCancelMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCancelMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -20,15 +20,15 @@
 
    // Attributes ----------------------------------------------------
    
-   private long deliveryID;
+   private final long deliveryID;
    
-   private boolean expired;
+   private final boolean expired;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionCancelMessage(long deliveryID, boolean expired)
+   public SessionCancelMessage(final long deliveryID, final boolean expired)
    {
       super(SESS_CANCEL);
       

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateBrowserMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateBrowserMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateBrowserMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -22,13 +22,14 @@
    // Attributes ----------------------------------------------------
 
    private final String queueName;
+   
    private final String filterString;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionCreateBrowserMessage(String queueName, String filterString)
+   public SessionCreateBrowserMessage(final String queueName, final String filterString)
    {
       super(SESS_CREATEBROWSER);
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateBrowserResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateBrowserResponseMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateBrowserResponseMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -28,7 +28,7 @@
 
    // Constructors --------------------------------------------------
 
-   public SessionCreateBrowserResponseMessage(String browserID)
+   public SessionCreateBrowserResponseMessage(final String browserID)
    {
       super(SESS_CREATEBROWSER_RESP);
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateConsumerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateConsumerMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateConsumerMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -18,16 +18,20 @@
 
    // Attributes ----------------------------------------------------
 
-   private String queueName;
-   private String filterString;
-   private boolean noLocal;
-   private boolean autoDeleteQueue;
+   private final String queueName;
+   
+   private final String filterString;
+   
+   private final boolean noLocal;
+   
+   private final boolean autoDeleteQueue;
       
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionCreateConsumerMessage(String queueName, String filterString, boolean noLocal, boolean autoDeleteQueue)
+   public SessionCreateConsumerMessage(final String queueName, final String filterString,
+   		                              final boolean noLocal, final boolean autoDeleteQueue)
    {
       super(PacketType.SESS_CREATECONSUMER);
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateConsumerResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateConsumerResponseMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateConsumerResponseMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -23,13 +23,14 @@
    // Attributes ----------------------------------------------------
 
    private final String consumerID;
+   
    private final int prefetchSize;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionCreateConsumerResponseMessage(String consumerID, int prefetchSize)
+   public SessionCreateConsumerResponseMessage(final String consumerID, final int prefetchSize)
    {
       super(SESS_CREATECONSUMER_RESP);
 

Added: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateProducerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateProducerMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateProducerMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -0,0 +1,58 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class SessionCreateProducerMessage extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final String address;
+      
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public SessionCreateProducerMessage(final String address)
+   {
+      super(PacketType.SESS_CREATEPRODUCER);
+
+      this.address = address;
+   }
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public String toString()
+   {
+      StringBuffer buff = new StringBuffer(getParentString());
+      buff.append(", address=" + address);
+      buff.append("]");
+      return buff.toString();
+   }
+
+   public String getAddress()
+   {
+      return address;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
+

Added: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateProducerResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateProducerResponseMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateProducerResponseMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -0,0 +1,59 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEPRODUCER_RESP;
+
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ */
+public class SessionCreateProducerResponseMessage extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final String producerID;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public SessionCreateProducerResponseMessage(final String producerID)
+   {
+      super(SESS_CREATEPRODUCER_RESP);
+
+      this.producerID = producerID;
+   }
+
+   // Public --------------------------------------------------------
+
+   public String getProducerID()
+   {
+      return producerID;
+   }
+
+   @Override
+   public String toString()
+   {
+      StringBuffer buf = new StringBuffer(getParentString());
+      buf.append(", producerID=" + producerID);
+      buf.append("]");
+      return buf.toString();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateQueueMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateQueueMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateQueueMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -20,17 +20,18 @@
 
    // Attributes ----------------------------------------------------
 
-   private String address;
-   private String queueName;
-   private String filterString;
-   private boolean durable;
-   private boolean temporary;
+   private final String address;
+   private final String queueName;
+   private final String filterString;
+   private final boolean durable;
+   private final boolean temporary;
    
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionCreateQueueMessage(String address, String queueName, String filterString, boolean durable, boolean temporary)
+   public SessionCreateQueueMessage(final String address, final String queueName,
+   		final String filterString, final boolean durable, final boolean temporary)
    {
       super(SESS_CREATEQUEUE);
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionDeleteQueueMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionDeleteQueueMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionDeleteQueueMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -20,13 +20,13 @@
 
    // Attributes ----------------------------------------------------
 
-   private String queueName;
+   private final String queueName;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionDeleteQueueMessage(String queueName)
+   public SessionDeleteQueueMessage(final String queueName)
    {
       super(SESS_DELETE_QUEUE);
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionQueueQueryMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionQueueQueryMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionQueueQueryMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -16,9 +16,9 @@
  */
 public class SessionQueueQueryMessage extends AbstractPacket
 {
-   private String queueName;
+   private final String queueName;
 
-   public SessionQueueQueryMessage(String queueName)
+   public SessionQueueQueryMessage(final String queueName)
    {
       super(PacketType.SESS_QUEUEQUERY);
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionQueueQueryResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionQueueQueryResponseMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionQueueQueryResponseMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -11,31 +11,39 @@
  */
 public class SessionQueueQueryResponseMessage extends AbstractPacket
 {
-   private boolean exists;
+   private final boolean exists;
    
-   private boolean durable;
+   private final boolean durable;
    
-   private boolean temporary;
+   private final boolean temporary;
    
-   private int maxSize;
+   private final int maxSize;
    
-   private int consumerCount;
+   private final int consumerCount;
    
-   private int messageCount;
+   private final int messageCount;
    
-   private String filterString;
+   private final String filterString;
    
-   private String address;
+   private final String address;
    
-   //etc
+   public SessionQueueQueryResponseMessage(final boolean durable, final boolean temporary, final int maxSize, 
+   		final int consumerCount, final int messageCount, final String filterString, final String address)
+   {
+   	this(durable, temporary, maxSize, consumerCount, messageCount, filterString, address, true);
+   }
    
-   public SessionQueueQueryResponseMessage(boolean durable, boolean temporary, int maxSize, int consumerCount,
-                             int messageCount, String filterString, String address)
+   public SessionQueueQueryResponseMessage()
    {
+      this(false, false, 0, 0, 0, null, null, false);
+   }
+   
+   private SessionQueueQueryResponseMessage(final boolean durable, final boolean temporary, final int maxSize, 
+   		final int consumerCount, final int messageCount, final String filterString, final String address,
+   		final boolean exists)
+   {
       super(SESS_QUEUEQUERY_RESP);
-      
-      this.exists = true;
-      
+       
       this.durable = durable;
       
       this.temporary = temporary;
@@ -49,15 +57,10 @@
       this.filterString = filterString;
       
       this.address = address;
+      
+      this.exists = exists;      
    }
-   
-   public SessionQueueQueryResponseMessage()
-   {
-      super(SESS_QUEUEQUERY_RESP);
       
-      this.exists = false;
-   }
-   
    public boolean isExists()
    {
       return exists;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionRemoveAddressMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionRemoveAddressMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionRemoveAddressMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -22,13 +22,13 @@
 
    // Attributes ----------------------------------------------------
    
-   private String address;
+   private final String address;
    
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
    
-   public SessionRemoveAddressMessage(String address)
+   public SessionRemoveAddressMessage(final String address)
    {
       super(SESS_REMOVE_ADDRESS);
       

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionSendMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionSendMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionSendMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -1,70 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.wireformat;
-
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_SEND;
-
-import org.jboss.messaging.core.Message;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * 
- * @version <tt>$Revision$</tt>
- */
-public class SessionSendMessage extends AbstractPacket
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private String address;
-   
-   private final Message message;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public SessionSendMessage(String address, Message message)
-   {
-      super(SESS_SEND);
-
-      assert message != null;
-            
-      this.address = address;
-      
-      this.message = message;
-   }
-
-   // Public --------------------------------------------------------
-
-   public Message getMessage()
-   {
-      return message;
-   }
-   
-   public String getAddress()
-   {
-      return address;
-   }
-
-   @Override
-   public String toString()
-   {
-      return getParentString() + ", message=" + message + ", address=" + address
-            + "]";
-   }
-   
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXACommitMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXACommitMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXACommitMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -20,15 +20,15 @@
 
    // Attributes ----------------------------------------------------
    
-   private boolean onePhase;
+   private final boolean onePhase;
    
-   private Xid xid;
+   private final Xid xid;
    
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionXACommitMessage(Xid xid, boolean onePhase)
+   public SessionXACommitMessage(final Xid xid, final boolean onePhase)
    {
       super(PacketType.SESS_XA_COMMIT);
       

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAEndMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAEndMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAEndMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -20,15 +20,15 @@
 
    // Attributes ----------------------------------------------------
    
-   private Xid xid;
+   private final Xid xid;
    
-   private boolean failed;
+   private final boolean failed;
    
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionXAEndMessage(Xid xid, boolean failed)
+   public SessionXAEndMessage(final Xid xid, final boolean failed)
    {
       super(PacketType.SESS_XA_END);
       

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAForgetMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAForgetMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAForgetMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -20,13 +20,13 @@
 
    // Attributes ----------------------------------------------------
    
-   private Xid xid;
+   private final Xid xid;
       
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionXAForgetMessage(Xid xid)
+   public SessionXAForgetMessage(final Xid xid)
    {
       super(PacketType.SESS_XA_FORGET);
       

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAGetInDoubtXidsResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAGetInDoubtXidsResponseMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAGetInDoubtXidsResponseMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -22,13 +22,13 @@
 
    // Attributes ----------------------------------------------------
    
-   private List<Xid> xids;
+   private final List<Xid> xids;
    
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionXAGetInDoubtXidsResponseMessage(List<Xid> xids)
+   public SessionXAGetInDoubtXidsResponseMessage(final List<Xid> xids)
    {
       super(PacketType.SESS_XA_INDOUBT_XIDS_RESP);
       

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAGetTimeoutResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAGetTimeoutResponseMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAGetTimeoutResponseMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -19,13 +19,13 @@
 
    // Attributes ----------------------------------------------------
    
-   private int timeoutSeconds;
+   private final int timeoutSeconds;
    
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionXAGetTimeoutResponseMessage(int timeoutSeconds)
+   public SessionXAGetTimeoutResponseMessage(final int timeoutSeconds)
    {
       super(PacketType.SESS_XA_GET_TIMEOUT_RESP);
       

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAJoinMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAJoinMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAJoinMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -20,13 +20,13 @@
 
    // Attributes ----------------------------------------------------
    
-   private Xid xid;
+   private final Xid xid;
    
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionXAJoinMessage(Xid xid)
+   public SessionXAJoinMessage(final Xid xid)
    {
       super(PacketType.SESS_XA_JOIN);
       

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAPrepareMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAPrepareMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAPrepareMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -20,13 +20,13 @@
 
    // Attributes ----------------------------------------------------
    
-   private Xid xid;
+   private final Xid xid;
    
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionXAPrepareMessage(Xid xid)
+   public SessionXAPrepareMessage(final Xid xid)
    {
       super(PacketType.SESS_XA_PREPARE);
       

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAResponseMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAResponseMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -18,17 +18,17 @@
 
    // Attributes ----------------------------------------------------
    
-   private boolean error;
+   private final boolean error;
    
-   private int responseCode;
+   private final int responseCode;
    
-   private String message;
+   private final String message;
       
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionXAResponseMessage(boolean isError, int responseCode, String message)
+   public SessionXAResponseMessage(final boolean isError, final int responseCode, final String message)
    {
       super(PacketType.SESS_XA_RESP);
       

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAResumeMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAResumeMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAResumeMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -20,13 +20,13 @@
 
    // Attributes ----------------------------------------------------
    
-   private Xid xid;
+   private final Xid xid;
    
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionXAResumeMessage(Xid xid)
+   public SessionXAResumeMessage(final Xid xid)
    {
       super(PacketType.SESS_XA_RESUME);
       

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXARollbackMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXARollbackMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXARollbackMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -20,13 +20,13 @@
 
    // Attributes ----------------------------------------------------
    
-   private Xid xid;
+   private final Xid xid;
    
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionXARollbackMessage(Xid xid)
+   public SessionXARollbackMessage(final Xid xid)
    {
       super(PacketType.SESS_XA_ROLLBACK);
       

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXASetTimeoutMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXASetTimeoutMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXASetTimeoutMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -19,13 +19,13 @@
 
    // Attributes ----------------------------------------------------
    
-   private int timeoutSeconds;
+   private final int timeoutSeconds;
    
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionXASetTimeoutMessage(int timeoutSeconds)
+   public SessionXASetTimeoutMessage(final int timeoutSeconds)
    {
       super(PacketType.SESS_XA_SET_TIMEOUT);
       

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXASetTimeoutResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXASetTimeoutResponseMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXASetTimeoutResponseMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -18,13 +18,13 @@
 
    // Attributes ----------------------------------------------------
    
-   private boolean ok;
+   private final boolean ok;
    
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionXASetTimeoutResponseMessage(boolean ok)
+   public SessionXASetTimeoutResponseMessage(final boolean ok)
    {
       super(PacketType.SESS_XA_SET_TIMEOUT_RESP);
       

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAStartMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAStartMessage.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAStartMessage.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -20,13 +20,13 @@
 
    // Attributes ----------------------------------------------------
    
-   private Xid xid;
+   private final Xid xid;
    
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionXAStartMessage(Xid xid)
+   public SessionXAStartMessage(final Xid xid)
    {
       super(PacketType.SESS_XA_START);
       

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java	2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java	2008-02-25 12:15:14 UTC (rev 3783)
@@ -29,6 +29,7 @@
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.NULL;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.PING;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.PONG;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.PROD_SEND;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_ACKNOWLEDGE;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_ADD_ADDRESS;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_BINDINGQUERY;
@@ -46,6 +47,8 @@
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEBROWSER_RESP;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATECONSUMER;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATECONSUMER_RESP;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEPRODUCER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEPRODUCER_RESP;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEQUEUE;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_DELETE_QUEUE;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_DELIVER;
@@ -54,7 +57,6 @@
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_RECOVER;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_REMOVE_ADDRESS;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_ROLLBACK;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_SEND;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_COMMIT;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_END;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_FORGET;
@@ -100,6 +102,7 @@
 import org.jboss.messaging.core.remoting.codec.DeliverMessageCodec;
 import org.jboss.messaging.core.remoting.codec.PingCodec;
 import org.jboss.messaging.core.remoting.codec.PongCodec;
+import org.jboss.messaging.core.remoting.codec.ProducerSendMessageCodec;
 import org.jboss.messaging.core.remoting.codec.RemotingBuffer;
 import org.jboss.messaging.core.remoting.codec.SessionAcknowledgeMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionAddAddressMessageCodec;
@@ -114,12 +117,13 @@
 import org.jboss.messaging.core.remoting.codec.SessionCreateBrowserResponseMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionCreateConsumerMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionCreateConsumerResponseMessageCodec;
+import org.jboss.messaging.core.remoting.codec.SessionCreateProducerMessageCodec;
+import org.jboss.messaging.core.remoting.codec.SessionCreateProducerResponseMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionCreateQueueMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionDeleteQueueMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionQueueQueryMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionQueueQueryResponseMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionRemoveAddressMessageCodec;
-import org.jboss.messaging.core.remoting.codec.SessionSendMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionXACommitMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionXAEndMessageCodec;
 import org.jboss.messaging.core.remoting.codec.SessionXAForgetMessageCodec;
@@ -151,6 +155,7 @@
 import org.jboss.messaging.core.remoting.wireformat.PacketType;
 import org.jboss.messaging.core.remoting.wireformat.Ping;
 import org.jboss.messaging.core.remoting.wireformat.Pong;
+import org.jboss.messaging.core.remoting.wireformat.ProducerSendMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionAcknowledgeMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionAddAddressMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryMessage;
@@ -168,6 +173,8 @@
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserResponseMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateProducerMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateProducerResponseMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionCreateQueueMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionDeleteQueueMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryMessage;
@@ -175,7 +182,6 @@
 import org.jboss.messaging.core.remoting.wireformat.SessionRecoverMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionRemoveAddressMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionRollbackMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionSendMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionXACommitMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionXAEndMessage;
 import org.jboss.messaging.core.remoting.wireformat.SessionXAForgetMessage;
@@ -543,9 +549,9 @@
 
    public void testSendMessage() throws Exception
    {
-      SessionSendMessage packet = new SessionSendMessage(randomString(), new MessageImpl());
+      ProducerSendMessage packet = new ProducerSendMessage(randomString(), new MessageImpl());
 
-      AbstractPacketCodec codec = new SessionSendMessageCodec();
+      AbstractPacketCodec codec = new ProducerSendMessageCodec();
       SimpleRemotingBuffer buffer = encode(packet, codec);
       checkHeader(buffer, packet);
       checkBody(buffer, packet.getAddress(), StreamUtils.toBytes(packet.getMessage()));
@@ -553,9 +559,9 @@
 
       AbstractPacket p = codec.decode(buffer);
 
-      assertTrue(p instanceof SessionSendMessage);
-      SessionSendMessage decodedPacket = (SessionSendMessage) p;
-      assertEquals(SESS_SEND, decodedPacket.getType());
+      assertTrue(p instanceof ProducerSendMessage);
+      ProducerSendMessage decodedPacket = (ProducerSendMessage) p;
+      assertEquals(PROD_SEND, decodedPacket.getType());
       assertEquals(packet.getAddress(), decodedPacket.getAddress());
       assertEquals(packet.getMessage().getMessageID(), decodedPacket
             .getMessage().getMessageID());
@@ -604,7 +610,44 @@
       assertEquals(SESS_CREATECONSUMER_RESP, decodedResponse.getType());
       assertEquals(response.getPrefetchSize(), decodedResponse.getPrefetchSize());
    }
+   
+   public void testCreateProducerRequest() throws Exception
+   {      
+      String destination = "queue.testCreateProducerRequest";
+      SessionCreateProducerMessage request = new SessionCreateProducerMessage(destination);
 
+      AbstractPacketCodec codec = new SessionCreateProducerMessageCodec();
+      SimpleRemotingBuffer buffer = encode(request, codec);
+      checkHeader(buffer, request);
+      checkBody(buffer, request.getAddress());
+      buffer.rewind();
+
+      AbstractPacket decodedPacket = codec.decode(buffer);
+
+      assertTrue(decodedPacket instanceof SessionCreateProducerMessage);
+      SessionCreateProducerMessage decodedRequest = (SessionCreateProducerMessage) decodedPacket;
+      assertEquals(SESS_CREATEPRODUCER, decodedRequest.getType());
+      assertEquals(request.getAddress(), decodedRequest.getAddress());
+   }
+   
+   public void testCreateProducerResponse() throws Exception
+   {
+      SessionCreateProducerResponseMessage response = new SessionCreateProducerResponseMessage(randomString());
+
+      AbstractPacketCodec codec = new SessionCreateProducerResponseMessageCodec();
+      SimpleRemotingBuffer buffer = encode(response, codec);
+      checkHeader(buffer, response);
+      checkBody(buffer, response.getProducerID());
+      buffer.rewind();
+
+      AbstractPacket decodedPacket = codec.decode(buffer);
+
+      assertTrue(decodedPacket instanceof SessionCreateProducerResponseMessage);
+      SessionCreateProducerResponseMessage decodedResponse = (SessionCreateProducerResponseMessage) decodedPacket;
+      assertEquals(SESS_CREATEPRODUCER_RESP, decodedResponse.getType());
+      assertEquals(response.getProducerID(), decodedResponse.getProducerID());;
+   }
+
    public void testStartConnectionMessage() throws Exception
    {
       ConnectionStartMessage packet = new ConnectionStartMessage();




More information about the jboss-cvs-commits mailing list