[jboss-cvs] JBoss Messaging SVN: r3650 - in trunk: src/main/org/jboss/jms/client and 8 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jan 30 17:51:50 EST 2008


Author: clebert.suconic at jboss.com
Date: 2008-01-30 17:51:49 -0500 (Wed, 30 Jan 2008)
New Revision: 3650

Removed:
   trunk/src/main/org/jboss/jms/client/api/ClientProducer.java
   trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java
Modified:
   trunk/docs/examples/embedded/src/org/jboss/example/embedded/EmbeddedExample.java
   trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java
   trunk/src/main/org/jboss/jms/client/JBossSession.java
   trunk/src/main/org/jboss/jms/client/api/ClientSession.java
   trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionSendMessageCodec.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionSendMessage.java
   trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/ConnectionClosedTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/message/JMSMessageIDHeaderTest.java
Log:
JBMESSAGING-681 - Removing ClientProducer/ClientProducerImpl. All the JMS-specific implementation was moved to JBossMessageProducer.

Modified: trunk/docs/examples/embedded/src/org/jboss/example/embedded/EmbeddedExample.java
===================================================================
--- trunk/docs/examples/embedded/src/org/jboss/example/embedded/EmbeddedExample.java	2008-01-30 22:47:52 UTC (rev 3649)
+++ trunk/docs/examples/embedded/src/org/jboss/example/embedded/EmbeddedExample.java	2008-01-30 22:51:49 UTC (rev 3650)
@@ -55,9 +55,8 @@
 
       MessageImpl message = new MessageImpl();
       Destination destination = new DestinationImpl(DestinationType.QUEUE, "Queue1", false);
-      message.putHeader(org.jboss.messaging.core.Message.TEMP_DEST_HEADER_NAME, destination);
       message.setPayload("hello".getBytes());
-      clientSession.send(message);
+      clientSession.send(message, destination);
 
       ClientConsumer clientConsumer = clientSession.createClientConsumer(destination, null, false, null);
       clientConnection.start();

Modified: trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java	2008-01-30 22:47:52 UTC (rev 3649)
+++ trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java	2008-01-30 22:51:49 UTC (rev 3650)
@@ -21,22 +21,42 @@
   */
 package org.jboss.jms.client;
 
+import java.util.UUID;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
 import javax.jms.Destination;
+import javax.jms.IllegalStateException;
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
+import javax.jms.MapMessage;
 import javax.jms.Message;
+import javax.jms.MessageFormatException;
 import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
 import javax.jms.Queue;
 import javax.jms.QueueSender;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
 import javax.jms.Topic;
 import javax.jms.TopicPublisher;
 
+import org.jboss.jms.client.api.ClientSession;
 import org.jboss.jms.destination.JBossDestination;
+import org.jboss.jms.message.JBossBytesMessage;
+import org.jboss.jms.message.JBossMapMessage;
+import org.jboss.jms.message.JBossMessage;
+import org.jboss.jms.message.JBossObjectMessage;
+import org.jboss.jms.message.JBossStreamMessage;
+import org.jboss.jms.message.JBossTextMessage;
+import org.jboss.messaging.core.DestinationType;
+import org.jboss.messaging.core.impl.DestinationImpl;
 import org.jboss.messaging.util.Logger;
 
 /**
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  * @version <tt>$Revision$</tt>
  *
  * $Id$
@@ -51,84 +71,119 @@
    
    // Attributes ----------------------------------------------------
    
-   protected org.jboss.jms.client.api.ClientProducer producer;
+   private boolean trace = log.isTraceEnabled();
+   
+   private ClientSession session;
 
+   private Destination destination;
+
+   private volatile boolean closed;
+   
+   private boolean disableMessageID = false;
+   
+   private boolean disableMessageTimestamp = false;
+   
+   private int priority = 4;
+   
+   private long timeToLive = 0;
+   
+   private int deliveryMode = DeliveryMode.PERSISTENT;
+   
+
    // Constructors --------------------------------------------------
    
-   public JBossMessageProducer(org.jboss.jms.client.api.ClientProducer producer)
+   public JBossMessageProducer(org.jboss.jms.client.api.ClientSession session, Destination destination)
    {
-      this.producer = producer;     
+      this.session = session;
+      this.destination = destination;
    }
    
    // MessageProducer implementation --------------------------------
    
    public void setDisableMessageID(boolean value) throws JMSException
    {
+      checkClosed();
       log.warn("JBoss Messaging does not support disabling message ID generation");
 
-      producer.setDisableMessageID(value);
+      this.disableMessageID = value;
    }
    
    public boolean getDisableMessageID() throws JMSException
    {
-      return producer.isDisableMessageID();
+      checkClosed();
+      return disableMessageID;
    }
    
    public void setDisableMessageTimestamp(boolean value) throws JMSException
    {
-      producer.setDisableMessageTimestamp(value);
+      checkClosed();
+      this.disableMessageTimestamp = value;
    }
    
    public boolean getDisableMessageTimestamp() throws JMSException
    {
-      return producer.isDisableMessageTimestamp();
+      checkClosed();
+      return disableMessageTimestamp;
    }
    
    public void setDeliveryMode(int deliveryMode) throws JMSException
    {
-      producer.setDeliveryMode(deliveryMode);
+      checkClosed();
+      this.deliveryMode = deliveryMode;
    }
    
    public int getDeliveryMode() throws JMSException
    {
-      return producer.getDeliveryMode();
+      checkClosed();
+      return deliveryMode;
    }
    
    public void setPriority(int defaultPriority) throws JMSException
    {
-      producer.setPriority(defaultPriority);
+      checkClosed();
+      
+      if (defaultPriority < 0 || defaultPriority > 9)
+      {
+         throw new JMSException("Invalid message priority (" + priority + "). " +
+                                          "Valid priorities are 0-9");
+      }
+      
+      this.priority = defaultPriority;
    }
    
    public int getPriority() throws JMSException
    {
-      return producer.getPriority();
+      checkClosed();
+      return this.priority;
    }
    
    public void setTimeToLive(long timeToLive) throws JMSException
    {
-      producer.setTimeToLive(timeToLive);
+      checkClosed();
+      this.timeToLive = timeToLive;
    }
    
    public long getTimeToLive() throws JMSException
    {
-      return producer.getTimeToLive();
+      checkClosed();
+      return this.timeToLive;
    }
    
    public Destination getDestination() throws JMSException
    {
-      return producer.getDestination();
+      checkClosed();
+      return this.destination;
    }
    
    public void close() throws JMSException
    {
-      producer.closing();
-      producer.close();
+      closed = true;
    }
    
    public void send(Message message) throws JMSException
    {
       // by default the message never expires
-      send(message, -1, -1, Long.MIN_VALUE);
+      send(message, this.deliveryMode, this.priority, timeToLive);
    }
    
    /**
@@ -142,7 +197,7 @@
    
    public void send(Destination destination, Message message) throws JMSException
    {      
-      send(destination, message, -1, -1, Long.MIN_VALUE);
+      send(destination, message, deliveryMode, priority, timeToLive);
    }
 
    public void send(Destination destination,
@@ -155,11 +210,133 @@
       {
          throw new InvalidDestinationException("Not a JBossDestination:" + destination);
       }
+      
+      
+      checkClosed();
+      
+      m.setJMSDeliveryMode(deliveryMode);
 
-      producer.send((JBossDestination)destination, m, deliveryMode, priority, timeToLive);
+      if (priority < 0 || priority > 9)
+      {
+         throw new MessageFormatException("Invalid message priority (" + priority + "). " +
+                                          "Valid priorities are 0-9");
+      }
+      m.setJMSPriority(priority);
+
+      if (this.getDisableMessageTimestamp())
+      {
+         m.setJMSTimestamp(0l);
+      }
+      else
+      {
+         m.setJMSTimestamp(System.currentTimeMillis());
+      }
+
+      if (timeToLive == 0)
+      {
+         // Zero implies never expires
+         m.setJMSExpiration(0);
+      }
+      else
+      {
+         m.setJMSExpiration(System.currentTimeMillis() + timeToLive);
+      }
+
+      if (destination == null)
+      {
+         // use destination from producer
+         destination = (JBossDestination)getDestination();
+
+         if (destination == null)
+         {
+            throw new UnsupportedOperationException("Destination not specified");
+         }
+
+         if (trace) { log.trace("Using producer's default destination: " + destination); }
+      }
+      else
+      {
+         // if a default destination was already specified then this must be same destination as
+         // that specified in the arguments
+
+         if (getDestination() != null &&
+             !getDestination().equals(destination))
+         {
+            throw new UnsupportedOperationException("Where a default destination is specified " +
+                                                    "for the sender and a destination is " +
+                                                    "specified in the arguments to the send, " +
+                                                    "these destinations must be equal");
+         }
+      }
+
+      JBossMessage jbm;
+
+      boolean foreign = false;
+
+      //First convert from foreign message if appropriate
+      if (!(m instanceof JBossMessage))
+      {
+         // it's a foreign message
+
+         // JMS 1.1 Sect. 3.11.4: A provider must be prepared to accept, from a client,
+         // a message whose implementation is not one of its own.
+
+         // create a matching JBossMessage Type from JMS Type
+         jbm = convertMessage(destination, m);
+
+         foreign = true;
+      }
+      else
+      {
+         jbm = (JBossMessage)m;
+      }
+
+      try
+      {
+         jbm.doBeforeSend();
+      }
+      catch (Exception e)
+      {
+         JMSException exthrown = new JMSException (e.toString());
+         exthrown.initCause(e);
+         throw exthrown;
+      }
+
+      final boolean keepID = false;
+      
+      if (!keepID)
+      {
+         //Generate an id
+         
+         String id = UUID.randomUUID().toString();
+         
+         jbm.setJMSMessageID("ID:" + id);
+      }
+
+      if (foreign)
+      {
+         m.setJMSMessageID(jbm.getJMSMessageID());
+      }
+
+      jbm.setJMSDestination(destination);
+
+      JBossDestination dest = (JBossDestination)destination;
+
+      //Set the destination on the core message - TODO temp for refactoring
+      org.jboss.messaging.core.Destination coreDest =
+         new DestinationImpl(dest.isQueue() ? DestinationType.QUEUE : DestinationType.TOPIC, dest.getName(), dest.isTemporary());
+      
+      //TODO - can optimise this copy to do copy lazily.
+      org.jboss.messaging.core.Message messageToSend = jbm.getCoreMessage().copy();
+
+      //FIXME - temp - for now we set destination as a header - should really be an attribute of the
+      //send packet - along with scheduleddelivery time
+
+      // we now invoke the send(Message) method on the session, which will eventually be fielded
+      // by connection endpoint
+      session.send(messageToSend, coreDest);
    }
 
-
    // TopicPublisher Implementation ---------------------------------
 
    public Topic getTopic() throws JMSException
@@ -209,14 +386,14 @@
    
    // Public --------------------------------------------------------
 
-   public org.jboss.jms.client.api.ClientProducer getDelegate()
+   public ClientSession getDelegate()
    {
-      return producer;
+      return session;
    }
 
    public String toString()
    {
-      return "JBossMessageProducer->" + producer;
+      return "JBossMessageProducer->" + session;
    }
 
    // Package protected ---------------------------------------------
@@ -224,6 +401,51 @@
    // Protected -----------------------------------------------------
    
    // Private -------------------------------------------------------
+
+
+   private void checkClosed() throws IllegalStateException
+   {
+      if (closed || session.isClosed())
+      {
+         throw new IllegalStateException("Producer is closed");
+      }
+   }
+
+   private JBossMessage convertMessage(Destination destination, Message m)
+         throws JMSException
+   {
+      JBossMessage jbm;
+      if (m instanceof BytesMessage)
+      {
+         jbm = new JBossBytesMessage((BytesMessage) m);
+      } 
+      else if (m instanceof MapMessage)
+      {
+         jbm = new JBossMapMessage((MapMessage) m);
+      } 
+      else if (m instanceof ObjectMessage)
+      {
+         jbm = new JBossObjectMessage((ObjectMessage) m);
+      } 
+      else if (m instanceof StreamMessage)
+      {
+         jbm = new JBossStreamMessage((StreamMessage) m);
+      } 
+      else if (m instanceof TextMessage)
+      {
+         jbm = new JBossTextMessage((TextMessage) m);
+      } 
+      else
+      {
+         jbm = new JBossMessage(m);
+      }
+
+      //Set the destination on the original message
+      m.setJMSDestination(destination);
+      return jbm;
+   }
+
+
    
    // Inner classes -------------------------------------------------
 }

Modified: trunk/src/main/org/jboss/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossSession.java	2008-01-30 22:47:52 UTC (rev 3649)
+++ trunk/src/main/org/jboss/jms/client/JBossSession.java	2008-01-30 22:51:49 UTC (rev 3650)
@@ -55,7 +55,6 @@
 import javax.transaction.xa.XAResource;
 
 import org.jboss.jms.client.api.ClientConsumer;
-import org.jboss.jms.client.api.ClientProducer;
 import org.jboss.jms.client.api.ClientSession;
 import org.jboss.jms.destination.JBossDestination;
 import org.jboss.jms.destination.JBossQueue;
@@ -286,14 +285,16 @@
 
    public MessageProducer createProducer(Destination d) throws JMSException
    {
+      if (session.isClosed())
+      {
+         throw new IllegalStateException("Session is closed");
+      }
       if (d != null && !(d instanceof JBossDestination))
       {
          throw new InvalidDestinationException("Not a JBossDestination:" + d);
       }
            
-      ClientProducer producerDelegate = session.createClientProducer((JBossDestination)d);
-      
-      return new JBossMessageProducer(producerDelegate);
+      return new JBossMessageProducer(session, d);
    }
 
   public MessageConsumer createConsumer(Destination d) throws JMSException

Deleted: trunk/src/main/org/jboss/jms/client/api/ClientProducer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientProducer.java	2008-01-30 22:47:52 UTC (rev 3649)
+++ trunk/src/main/org/jboss/jms/client/api/ClientProducer.java	2008-01-30 22:51:49 UTC (rev 3650)
@@ -1,56 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-
-package org.jboss.jms.client.api;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-
-import org.jboss.jms.destination.JBossDestination;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- */
-public interface ClientProducer
-{
-   String getID();
-   
-   void setDisableMessageID(boolean value) throws JMSException;
-   
-   boolean isDisableMessageID() throws JMSException;
-   
-   void setDisableMessageTimestamp(boolean value) throws JMSException;
-   
-   boolean isDisableMessageTimestamp() throws JMSException;
-   
-   void setDeliveryMode(int deliveryMode) throws JMSException;
-   
-   int getDeliveryMode() throws JMSException;
-   
-   void setPriority(int defaultPriority) throws JMSException;
-   
-   int getPriority() throws JMSException;
-   
-   void setTimeToLive(long timeToLive) throws JMSException;
-   
-   long getTimeToLive() throws JMSException;
-   
-   JBossDestination getDestination() throws JMSException;
-   
-   void send(JBossDestination destination,
-             Message message,
-             int deliveryMode,
-             int priority,
-             long timeToLive) throws JMSException;
-   
-   void closing() throws JMSException;
-   
-   void close() throws JMSException;
-
-}

Modified: trunk/src/main/org/jboss/jms/client/api/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientSession.java	2008-01-30 22:47:52 UTC (rev 3649)
+++ trunk/src/main/org/jboss/jms/client/api/ClientSession.java	2008-01-30 22:51:49 UTC (rev 3650)
@@ -32,8 +32,6 @@
    
    ClientBrowser createClientBrowser(Destination queue, String messageSelector) throws JMSException;
    
-   ClientProducer createClientProducer(JBossDestination destination) throws JMSException;
-
    JBossQueue createQueue(String queueName) throws JMSException;
 
    JBossTopic createTopic(String topicName) throws JMSException;
@@ -46,7 +44,7 @@
 
    void unsubscribe(String subscriptionName) throws JMSException;
 
-   void send(Message message) throws JMSException;
+   void send(Message message, Destination destination) throws JMSException;
 
    XAResource getXAResource();
 
@@ -58,8 +56,6 @@
    
    void removeConsumer(ClientConsumer consumer) throws JMSException;
    
-   void removeProducer(ClientProducer producer);
-   
    void removeBrowser(ClientBrowser browser);
    
    boolean isClosed();

Deleted: trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java	2008-01-30 22:47:52 UTC (rev 3649)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java	2008-01-30 22:51:49 UTC (rev 3650)
@@ -1,394 +0,0 @@
-/*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
-package org.jboss.jms.client.impl;
-
-import java.util.UUID;
-
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageFormatException;
-import javax.jms.ObjectMessage;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
-
-import org.jboss.jms.client.api.ClientProducer;
-import org.jboss.jms.client.api.ClientSession;
-import org.jboss.jms.destination.JBossDestination;
-import org.jboss.jms.message.JBossBytesMessage;
-import org.jboss.jms.message.JBossMapMessage;
-import org.jboss.jms.message.JBossMessage;
-import org.jboss.jms.message.JBossObjectMessage;
-import org.jboss.jms.message.JBossStreamMessage;
-import org.jboss.jms.message.JBossTextMessage;
-import org.jboss.messaging.core.DestinationType;
-import org.jboss.messaging.core.impl.DestinationImpl;
-import org.jboss.messaging.util.Logger;
-
-/**
- * The client-side Producer delegate class.
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- * @version <tt>$Revision: 3602 $</tt>
- *
- * $Id: ClientProducerImpl.java 3602 2008-01-21 17:48:32Z timfox $
- */
-public class ClientProducerImpl implements ClientProducer
-{
-   // Constants ------------------------------------------------------------------------------------
-
-   private static final Logger log = Logger.getLogger(ClientProducerImpl.class);
-
-   // Attributes -----------------------------------------------------------------------------------
-
-   private boolean trace = log.isTraceEnabled();
-   
-   private ClientSession session;
-   
-   private JBossDestination destination;
-
-   private boolean disableMessageID = false;
-   
-   private boolean disableMessageTimestamp = false;
-   
-   private int priority = 4;
-   
-   private long timeToLive = 0;
-   
-   private int deliveryMode = DeliveryMode.PERSISTENT;
-   
-   private String id;
-   
-   private volatile boolean closed;
-   
-   // Static ---------------------------------------------------------------------------------------
-
-   // Constructors ---------------------------------------------------------------------------------
-      
-   public ClientProducerImpl(ClientSession session, JBossDestination destination)
-   {
-      this.session = session;
-      
-      this.destination = destination;
-      
-      this.id = UUID.randomUUID().toString();
-   }
-
-   public String getID()
-   {
-      return id;
-   }
-
-   public synchronized void close() throws JMSException
-   {
-      if (closed)
-      {
-         return;         
-      }
-      session.removeProducer(this);
-      
-      closed = true;
-   }
-
-   public synchronized void closing() throws JMSException
-   {    
-   }
-
-   public JBossDestination getDestination() throws JMSException
-   {
-      checkClosed();
-      
-      return this.destination;
-   }
-
-   public void send(JBossDestination destination, Message m, int deliveryMode, int priority,
-                    long timeToLive) throws JMSException
-   {
-      checkClosed();
-      
-      // configure the message for sending, using attributes stored as metadata
-
-      if (deliveryMode == -1)
-      {
-         // Use the delivery mode of the producer
-         deliveryMode = getDeliveryMode();
-         if (trace) { log.trace("Using producer's default delivery mode: " + deliveryMode); }
-      }
-      m.setJMSDeliveryMode(deliveryMode);
-
-      if (priority == -1)
-      {
-         // Use the priority of the producer
-         priority = getPriority();
-         if (trace) { log.trace("Using producer's default priority: " + priority); }
-      }
-      if (priority < 0 || priority > 9)
-      {
-         throw new MessageFormatException("Invalid message priority (" + priority + "). " +
-                                          "Valid priorities are 0-9");
-      }
-      m.setJMSPriority(priority);
-
-      if (this.isDisableMessageTimestamp())
-      {
-         m.setJMSTimestamp(0l);
-      }
-      else
-      {
-         m.setJMSTimestamp(System.currentTimeMillis());
-      }
-
-      if (timeToLive == Long.MIN_VALUE)
-      {
-         // Use time to live value from producer
-         timeToLive = getTimeToLive();
-         if (trace) { log.trace("Using producer's default timeToLive: " + timeToLive); }
-      }
-
-      if (timeToLive == 0)
-      {
-         // Zero implies never expires
-         m.setJMSExpiration(0);
-      }
-      else
-      {
-         m.setJMSExpiration(System.currentTimeMillis() + timeToLive);
-      }
-
-      if (destination == null)
-      {
-         // use destination from producer
-         destination = (JBossDestination)getDestination();
-
-         if (destination == null)
-         {
-            throw new UnsupportedOperationException("Destination not specified");
-         }
-
-         if (trace) { log.trace("Using producer's default destination: " + destination); }
-      }
-      else
-      {
-         // if a default destination was already specified then this must be same destination as
-         // that specified in the arguments
-
-         if (getDestination() != null &&
-             !getDestination().equals(destination))
-         {
-            throw new UnsupportedOperationException("Where a default destination is specified " +
-                                                    "for the sender and a destination is " +
-                                                    "specified in the arguments to the send, " +
-                                                    "these destinations must be equal");
-         }
-      }
-
-      JBossMessage jbm;
-
-      boolean foreign = false;
-
-      //First convert from foreign message if appropriate
-      if (!(m instanceof JBossMessage))
-      {
-         // it's a foreign message
-
-         // JMS 1.1 Sect. 3.11.4: A provider must be prepared to accept, from a client,
-         // a message whose implementation is not one of its own.
-
-         // create a matching JBossMessage Type from JMS Type
-         if (m instanceof BytesMessage)
-         {
-            jbm = new JBossBytesMessage((BytesMessage)m);
-         }
-         else if (m instanceof MapMessage)
-         {
-            jbm = new JBossMapMessage((MapMessage)m);
-         }
-         else if (m instanceof ObjectMessage)
-         {
-            jbm = new JBossObjectMessage((ObjectMessage)m);
-         }
-         else if (m instanceof StreamMessage)
-         {
-            jbm = new JBossStreamMessage((StreamMessage)m);
-         }
-         else if (m instanceof TextMessage)
-         {
-            jbm = new JBossTextMessage((TextMessage)m);
-         }
-         else
-         {
-            jbm = new JBossMessage(m);
-         }
-
-         //Set the destination on the original message
-         m.setJMSDestination(destination);
-
-         foreign = true;
-      }
-      else
-      {
-         jbm = (JBossMessage)m;
-      }
-
-      final boolean keepID = false;
-      
-      if (!keepID)
-      {
-         //Generate an id
-         
-         String id = UUID.randomUUID().toString();
-         
-         jbm.setJMSMessageID("ID:" + id);
-      }
-
-      if (foreign)
-      {
-         m.setJMSMessageID(jbm.getJMSMessageID());
-      }
-
-      jbm.setJMSDestination(destination);
-
-      try
-      {
-         jbm.doBeforeSend();
-      }
-      catch (Exception e)
-      {
-         JMSException exthrown = new JMSException (e.toString());
-         exthrown.initCause(e);
-         throw exthrown;
-      }
-
-      JBossDestination dest = (JBossDestination)destination;
-
-      //Set the destination on the core message - TODO temp for refactoring
-      org.jboss.messaging.core.Destination coreDest =
-         new DestinationImpl(dest.isQueue() ? DestinationType.QUEUE : DestinationType.TOPIC, dest.getName(), dest.isTemporary());
-      
-      //TODO - can optimise this copy to do copy lazily.
-      org.jboss.messaging.core.Message messageToSend = jbm.getCoreMessage().copy();
-
-      //FIXME - temp - for now we set destination as a header - should really be an attribute of the
-      //send packet - along with scheduleddelivery time
-
-      messageToSend.putHeader(org.jboss.messaging.core.Message.TEMP_DEST_HEADER_NAME, coreDest);
-
-      // we now invoke the send(Message) method on the session, which will eventually be fielded
-      // by connection endpoint
-      session.send(messageToSend);
-   }
-
-   public void setDeliveryMode(int deliveryMode) throws JMSException
-   {
-      checkClosed();
-      
-      this.deliveryMode = deliveryMode;
-   }
-
-   public int getDeliveryMode() throws JMSException
-   {
-      checkClosed();
-      
-      return this.deliveryMode;
-   }
-  
-   public boolean isDisableMessageID() throws JMSException
-   {
-      checkClosed();
-      
-      return this.disableMessageID;
-   }
-
-   public void setDisableMessageID(boolean value) throws JMSException
-   {
-      checkClosed();
-      
-      this.disableMessageID = value;   
-   }
-
-   public boolean isDisableMessageTimestamp() throws JMSException
-   {      
-      checkClosed();
-      
-      return this.disableMessageTimestamp;
-   }
-
-   public void setDisableMessageTimestamp(boolean value) throws JMSException
-   {
-      checkClosed();
-      
-      this.disableMessageTimestamp = value;
-   }
-
-   public void setPriority(int priority) throws JMSException
-   {
-      checkClosed();
-      
-      this.priority = priority;
-   }
-
-   public int getPriority() throws JMSException
-   {
-      checkClosed();
-      
-      return this.priority;
-   }
-
-   public long getTimeToLive() throws JMSException
-   {
-      checkClosed();
-      
-      return this.timeToLive;
-   }
-
-   public void setTimeToLive(long timeToLive) throws JMSException
-   {
-      checkClosed();
-      
-      this.timeToLive = timeToLive;
-   }
-
-   // Public ---------------------------------------------------------------------------------------
-
-   // Protected ------------------------------------------------------------------------------------
-   
-   // Package Private ------------------------------------------------------------------------------
-
-   // Private --------------------------------------------------------------------------------------
-
-   private void checkClosed() throws IllegalStateException
-   {
-      if (closed)
-      {
-         throw new IllegalStateException("Producer is closed");
-      }
-   }
-   
-   // Inner Classes --------------------------------------------------------------------------------
-
-}

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java	2008-01-30 22:47:52 UTC (rev 3649)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java	2008-01-30 22:51:49 UTC (rev 3650)
@@ -35,7 +35,6 @@
 import org.jboss.jms.client.api.ClientBrowser;
 import org.jboss.jms.client.api.ClientConnection;
 import org.jboss.jms.client.api.ClientConsumer;
-import org.jboss.jms.client.api.ClientProducer;
 import org.jboss.jms.client.api.ClientSession;
 import org.jboss.jms.client.remoting.MessagingRemotingConnection;
 import org.jboss.jms.destination.JBossDestination;
@@ -116,8 +115,6 @@
    
    private Set<ClientBrowser> browsers = new HashSet<ClientBrowser>();
    
-   private Set<ClientProducer> producers = new HashSet<ClientProducer>();
-   
    private Map<String, ClientConsumer> consumers = new HashMap<String, ClientConsumer>();
    
       
@@ -284,17 +281,6 @@
       return consumer;
    }
    
-   public ClientProducer createClientProducer(JBossDestination destination) throws JMSException
-   {
-      checkClosed();
-      
-      ClientProducer producer = new ClientProducerImpl(this, destination);
-  
-      producers.add(producer);
-      
-      return producer;
-   }
-
    public JBossQueue createQueue(String queueName) throws JMSException
    {
       checkClosed();
@@ -395,11 +381,11 @@
       return xaResource;
    }
 
-   public void send(Message m) throws JMSException
+   public void send(Message m, Destination dest) throws JMSException
    {
       checkClosed();
       
-      SessionSendMessage message = new SessionSendMessage(m);
+      SessionSendMessage message = new SessionSendMessage(m, dest);
       
       remotingConnection.send(id, message, !m.isDurable());
    }
@@ -417,11 +403,6 @@
       remotingConnection.send(id, new SessionCancelMessage(-1, false));      
    }
    
-   public void removeProducer(ClientProducer producer)
-   {
-      producers.remove(producer);
-   }
-   
    public void removeBrowser(ClientBrowser browser)
    {
       browsers.remove(browser);
@@ -474,15 +455,6 @@
          consumer.close();
       }
       
-      Set<ClientProducer> producersClone = new HashSet<ClientProducer>(producers);
-      
-      for (ClientProducer producer: producersClone)
-      {
-         producer.closing();
-         
-         producer.close();
-      }
-      
       Set<ClientBrowser> browsersClone = new HashSet<ClientBrowser>(browsers);
       
       for (ClientBrowser browser: browsersClone)

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-01-30 22:47:52 UTC (rev 3649)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-01-30 22:51:49 UTC (rev 3650)
@@ -478,12 +478,10 @@
       }
    }
 
-   private void send(Message msg) throws JMSException
+   private void send(Message msg, Destination dest) throws JMSException
    {
       try
       {
-         Destination dest = (Destination)msg.getHeader(org.jboss.messaging.core.Message.TEMP_DEST_HEADER_NAME);
-
          //Assign the message an internal id - this is used to key it in the store and also used to 
          //handle delivery
          
@@ -1505,7 +1503,7 @@
             {
                SessionSendMessage message = (SessionSendMessage) packet;
               
-               send(message.getMessage());
+               send(message.getMessage(), message.getDestination());
 
                if (message.getMessage().isDurable())
                {

Modified: trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionSendMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionSendMessageCodec.java	2008-01-30 22:47:52 UTC (rev 3649)
+++ trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionSendMessageCodec.java	2008-01-30 22:51:49 UTC (rev 3650)
@@ -8,7 +8,10 @@
 
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDMESSAGE;
 
+import org.jboss.messaging.core.Destination;
+import org.jboss.messaging.core.DestinationType;
 import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.impl.DestinationImpl;
 import org.jboss.messaging.core.remoting.wireformat.SessionSendMessage;
 
 /**
@@ -36,13 +39,17 @@
    @Override
    protected void encodeBody(SessionSendMessage message, RemotingBuffer out) throws Exception
    {
+      Destination dest = message.getDestination();
       byte[] encodedMsg = encodeMessage(message.getMessage());   
 
-      int bodyLength = INT_LENGTH + encodedMsg.length;
+      int bodyLength = INT_LENGTH + encodedMsg.length + INT_LENGTH + BOOLEAN_LENGTH + sizeof(dest.getName());
 
       out.putInt(bodyLength);
       out.putInt(encodedMsg.length);
       out.put(encodedMsg);
+      out.putInt(DestinationType.toInt(dest.getType()));
+      out.putBoolean(dest.isTemporary());
+      out.putNullableString(dest.getName());
    }
 
    @Override
@@ -59,8 +66,14 @@
       byte[] encodedMsg = new byte[msgLength];
       in.get(encodedMsg);
       Message msg = decodeMessage(encodedMsg);
+      
+      int destinationType = in.getInt();
+      boolean isTemporary = in.getBoolean();
+      String name = in.getNullableString();
+      
+      DestinationImpl dest = new DestinationImpl(DestinationType.fromInt(destinationType), name, isTemporary);
 
-      return new SessionSendMessage(msg);
+      return new SessionSendMessage(msg, dest);
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionSendMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionSendMessage.java	2008-01-30 22:47:52 UTC (rev 3649)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionSendMessage.java	2008-01-30 22:51:49 UTC (rev 3650)
@@ -8,6 +8,7 @@
 
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDMESSAGE;
 
+import org.jboss.messaging.core.Destination;
 import org.jboss.messaging.core.Message;
 
 /**
@@ -23,18 +24,20 @@
    // Attributes ----------------------------------------------------
 
    private final Message message;
+   private final Destination destination;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionSendMessage(Message message)
+   public SessionSendMessage(Message message, Destination destination)
    {
       super(MSG_SENDMESSAGE);
 
       assert message != null;
 
       this.message = message;
+      this.destination = destination;
    }
 
    // Public --------------------------------------------------------
@@ -43,6 +46,11 @@
    {
       return message;
    }
+   
+   public Destination getDestination()
+   {
+      return destination;
+   }
 
    @Override
    public String toString()

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java	2008-01-30 22:47:52 UTC (rev 3649)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java	2008-01-30 22:51:49 UTC (rev 3650)
@@ -547,12 +547,14 @@
 
    public void testSendMessage() throws Exception
    {
-      SessionSendMessage packet = new SessionSendMessage(new MessageImpl());
+      Destination originalDestination = new DestinationImpl(DestinationType.QUEUE, java.util.UUID.randomUUID().toString(), true);
+      SessionSendMessage packet = new SessionSendMessage(new MessageImpl(), originalDestination);
 
       AbstractPacketCodec codec = new SessionSendMessageCodec();
       SimpleRemotingBuffer buffer = encode(packet, codec);
       checkHeader(buffer, packet);
-      checkBody(buffer, encodeMessage(packet.getMessage()));
+      checkBody(buffer, encodeMessage(packet.getMessage()), DestinationType.toInt(originalDestination.getType()),
+                        originalDestination.isTemporary(), originalDestination.getName());
       buffer.rewind();
 
       AbstractPacket p = codec.decode(buffer);
@@ -562,6 +564,9 @@
       assertEquals(MSG_SENDMESSAGE, decodedPacket.getType());
       assertEquals(packet.getMessage().getMessageID(), decodedPacket
             .getMessage().getMessageID());
+      assertEquals(originalDestination.getName(), packet.getDestination().getName());
+      assertEquals(DestinationType.QUEUE, packet.getDestination().getType());
+      assertEquals(originalDestination.isTemporary(), packet.getDestination().isTemporary());
    }
 
    public void testCreateConsumerRequest() throws Exception

Modified: trunk/tests/src/org/jboss/test/messaging/jms/ConnectionClosedTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ConnectionClosedTest.java	2008-01-30 22:47:52 UTC (rev 3649)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ConnectionClosedTest.java	2008-01-30 22:51:49 UTC (rev 3650)
@@ -106,7 +106,7 @@
       int count = 0;
       while (true)
       {
-         TextMessage tm = (TextMessage)sub1.receive(10000);
+         TextMessage tm = (TextMessage)sub1.receive(500);
          if (tm == null)
          {
             break;
@@ -125,7 +125,7 @@
       count = 0;
       while (true)
       {
-         TextMessage tm = (TextMessage)sub2.receive(10000);
+         TextMessage tm = (TextMessage)sub2.receive(500);
          if (tm == null)
          {
             break;

Modified: trunk/tests/src/org/jboss/test/messaging/jms/message/JMSMessageIDHeaderTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/message/JMSMessageIDHeaderTest.java	2008-01-30 22:47:52 UTC (rev 3649)
+++ trunk/tests/src/org/jboss/test/messaging/jms/message/JMSMessageIDHeaderTest.java	2008-01-30 22:51:49 UTC (rev 3650)
@@ -49,6 +49,7 @@
       Message m = queueProducerSession.createMessage();
       queueProducer.send(m);
       String messageID = queueConsumer.receive().getJMSMessageID();
+      assertNotNull(messageID);
       // JMS1.1 specs 3.4.3
       assertTrue(messageID.startsWith("ID:"));
    }




More information about the jboss-cvs-commits mailing list