[jboss-cvs] JBoss Messaging SVN: r3654 - in trunk: src/main/org/jboss/jms/client and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jan 31 10:28:11 EST 2008


Author: clebert.suconic at jboss.com
Date: 2008-01-31 10:28:11 -0500 (Thu, 31 Jan 2008)
New Revision: 3654

Added:
   trunk/src/main/org/jboss/jms/client/api/ClientProducer.java
   trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java
Modified:
   trunk/docs/examples/embedded/src/org/jboss/example/embedded/EmbeddedExample.java
   trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java
   trunk/src/main/org/jboss/jms/client/JBossSession.java
   trunk/src/main/org/jboss/jms/client/api/ClientSession.java
   trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionSendMessageCodec.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionSendMessage.java
   trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/message/JMSMessageIDHeaderTest.java
Log:
JBMESSAGING-681 - Reverting changes

Modified: trunk/docs/examples/embedded/src/org/jboss/example/embedded/EmbeddedExample.java
===================================================================
--- trunk/docs/examples/embedded/src/org/jboss/example/embedded/EmbeddedExample.java	2008-01-31 10:48:18 UTC (rev 3653)
+++ trunk/docs/examples/embedded/src/org/jboss/example/embedded/EmbeddedExample.java	2008-01-31 15:28:11 UTC (rev 3654)
@@ -56,8 +56,9 @@
 
       MessageImpl message = new MessageImpl();
       Destination destination = new DestinationImpl(DestinationType.QUEUE, "Queue1", false);
+      message.putHeader(org.jboss.messaging.core.Message.TEMP_DEST_HEADER_NAME, destination);
       message.setPayload("hello".getBytes());
-      clientSession.send(message, destination);
+      clientSession.send(message);
 
       ClientConsumer clientConsumer = clientSession.createClientConsumer(destination, null, false, null);
       clientConnection.start();

Modified: trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java	2008-01-31 10:48:18 UTC (rev 3653)
+++ trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java	2008-01-31 15:28:11 UTC (rev 3654)
@@ -21,42 +21,22 @@
   */
 package org.jboss.jms.client;
 
-import java.util.UUID;
-
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
 import javax.jms.Destination;
-import javax.jms.IllegalStateException;
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
-import javax.jms.MapMessage;
 import javax.jms.Message;
-import javax.jms.MessageFormatException;
 import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
 import javax.jms.Queue;
 import javax.jms.QueueSender;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
 import javax.jms.Topic;
 import javax.jms.TopicPublisher;
 
-import org.jboss.jms.client.api.ClientSession;
 import org.jboss.jms.destination.JBossDestination;
-import org.jboss.jms.message.JBossBytesMessage;
-import org.jboss.jms.message.JBossMapMessage;
-import org.jboss.jms.message.JBossMessage;
-import org.jboss.jms.message.JBossObjectMessage;
-import org.jboss.jms.message.JBossStreamMessage;
-import org.jboss.jms.message.JBossTextMessage;
-import org.jboss.messaging.core.DestinationType;
-import org.jboss.messaging.core.impl.DestinationImpl;
 import org.jboss.messaging.util.Logger;
 
 /**
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  * @version <tt>$Revision$</tt>
  *
  * $Id$
@@ -71,119 +51,84 @@
    
    // Attributes ----------------------------------------------------
    
-   private boolean trace = log.isTraceEnabled();
-   
-   private ClientSession session;
+   protected org.jboss.jms.client.api.ClientProducer producer;
 
-   private Destination destination;
-
-   private volatile boolean closed;
-   
-   private boolean disableMessageID = false;
-   
-   private boolean disableMessageTimestamp = false;
-   
-   private int priority = 4;
-   
-   private long timeToLive = 0;
-   
-   private int deliveryMode = DeliveryMode.PERSISTENT;
-   
-
    // Constructors --------------------------------------------------
    
-   public JBossMessageProducer(org.jboss.jms.client.api.ClientSession session, Destination destination)
+   public JBossMessageProducer(org.jboss.jms.client.api.ClientProducer producer)
    {
-      this.session = session;
-      this.destination = destination;
+      this.producer = producer;     
    }
    
    // MessageProducer implementation --------------------------------
    
    public void setDisableMessageID(boolean value) throws JMSException
    {
-      checkClosed();
       log.warn("JBoss Messaging does not support disabling message ID generation");
 
-      this.disableMessageID = value;
+      producer.setDisableMessageID(value);
    }
    
    public boolean getDisableMessageID() throws JMSException
    {
-      checkClosed();
-      return disableMessageID;
+      return producer.isDisableMessageID();
    }
    
    public void setDisableMessageTimestamp(boolean value) throws JMSException
    {
-      checkClosed();
-      this.disableMessageTimestamp = value;
+      producer.setDisableMessageTimestamp(value);
    }
    
    public boolean getDisableMessageTimestamp() throws JMSException
    {
-      checkClosed();
-      return disableMessageTimestamp;
+      return producer.isDisableMessageTimestamp();
    }
    
    public void setDeliveryMode(int deliveryMode) throws JMSException
    {
-      checkClosed();
-      this.deliveryMode = deliveryMode;
+      producer.setDeliveryMode(deliveryMode);
    }
    
    public int getDeliveryMode() throws JMSException
    {
-      checkClosed();
-      return deliveryMode;
+      return producer.getDeliveryMode();
    }
    
    public void setPriority(int defaultPriority) throws JMSException
    {
-      checkClosed();
-      
-      if (defaultPriority < 0 || defaultPriority > 9)
-      {
-         throw new JMSException("Invalid message priority (" + priority + "). " +
-                                          "Valid priorities are 0-9");
-      }
-      
-      this.priority = defaultPriority;
+      producer.setPriority(defaultPriority);
    }
    
    public int getPriority() throws JMSException
    {
-      checkClosed();
-      return this.priority;
+      return producer.getPriority();
    }
    
    public void setTimeToLive(long timeToLive) throws JMSException
    {
-      checkClosed();
-      this.timeToLive = timeToLive;
+      producer.setTimeToLive(timeToLive);
    }
    
    public long getTimeToLive() throws JMSException
    {
-      checkClosed();
-      return this.timeToLive;
+      return producer.getTimeToLive();
    }
    
    public Destination getDestination() throws JMSException
    {
-      checkClosed();
-      return this.destination;
+      return producer.getDestination();
    }
    
    public void close() throws JMSException
    {
-      closed = true;
+      producer.closing();
+      producer.close();
    }
    
    public void send(Message message) throws JMSException
    {
       // by default the message never expires
-      send(message, this.deliveryMode, this.priority, timeToLive);
+      send(message, -1, -1, Long.MIN_VALUE);
    }
    
    /**
@@ -197,7 +142,7 @@
    
    public void send(Destination destination, Message message) throws JMSException
    {      
-      send(destination, message, deliveryMode, priority, timeToLive);
+      send(destination, message, -1, -1, Long.MIN_VALUE);
    }
 
    public void send(Destination destination,
@@ -210,133 +155,11 @@
       {
          throw new InvalidDestinationException("Not a JBossDestination:" + destination);
       }
-      
-      
-      checkClosed();
-      
-      m.setJMSDeliveryMode(deliveryMode);
 
-      if (priority < 0 || priority > 9)
-      {
-         throw new MessageFormatException("Invalid message priority (" + priority + "). " +
-                                          "Valid priorities are 0-9");
-      }
-      m.setJMSPriority(priority);
-
-      if (this.getDisableMessageTimestamp())
-      {
-         m.setJMSTimestamp(0l);
-      }
-      else
-      {
-         m.setJMSTimestamp(System.currentTimeMillis());
-      }
-
-      if (timeToLive == 0)
-      {
-         // Zero implies never expires
-         m.setJMSExpiration(0);
-      }
-      else
-      {
-         m.setJMSExpiration(System.currentTimeMillis() + timeToLive);
-      }
-
-      if (destination == null)
-      {
-         // use destination from producer
-         destination = (JBossDestination)getDestination();
-
-         if (destination == null)
-         {
-            throw new UnsupportedOperationException("Destination not specified");
-         }
-
-         if (trace) { log.trace("Using producer's default destination: " + destination); }
-      }
-      else
-      {
-         // if a default destination was already specified then this must be same destination as
-         // that specified in the arguments
-
-         if (getDestination() != null &&
-             !getDestination().equals(destination))
-         {
-            throw new UnsupportedOperationException("Where a default destination is specified " +
-                                                    "for the sender and a destination is " +
-                                                    "specified in the arguments to the send, " +
-                                                    "these destinations must be equal");
-         }
-      }
-
-      JBossMessage jbm;
-
-      boolean foreign = false;
-
-      //First convert from foreign message if appropriate
-      if (!(m instanceof JBossMessage))
-      {
-         // it's a foreign message
-
-         // JMS 1.1 Sect. 3.11.4: A provider must be prepared to accept, from a client,
-         // a message whose implementation is not one of its own.
-
-         // create a matching JBossMessage Type from JMS Type
-         jbm = convertMessage(destination, m);
-
-         foreign = true;
-      }
-      else
-      {
-         jbm = (JBossMessage)m;
-      }
-
-      try
-      {
-         jbm.doBeforeSend();
-      }
-      catch (Exception e)
-      {
-         JMSException exthrown = new JMSException (e.toString());
-         exthrown.initCause(e);
-         throw exthrown;
-      }
-
-      final boolean keepID = false;
-      
-      if (!keepID)
-      {
-         //Generate an id
-         
-         String id = UUID.randomUUID().toString();
-         
-         jbm.setJMSMessageID("ID:" + id);
-      }
-
-      if (foreign)
-      {
-         m.setJMSMessageID(jbm.getJMSMessageID());
-      }
-
-      jbm.setJMSDestination(destination);
-
-      JBossDestination dest = (JBossDestination)destination;
-
-      //Set the destination on the core message - TODO temp for refactoring
-      org.jboss.messaging.core.Destination coreDest =
-         new DestinationImpl(dest.isQueue() ? DestinationType.QUEUE : DestinationType.TOPIC, dest.getName(), dest.isTemporary());
-      
-      //TODO - can optimise this copy to do copy lazily.
-      org.jboss.messaging.core.Message messageToSend = jbm.getCoreMessage().copy();
-
-      //FIXME - temp - for now we set destination as a header - should really be an attribute of the
-      //send packet - along with scheduleddelivery time
-
-      // we now invoke the send(Message) method on the session, which will eventually be fielded
-      // by connection endpoint
-      session.send(messageToSend, coreDest);
+      producer.send((JBossDestination)destination, m, deliveryMode, priority, timeToLive);
    }
 
+
    // TopicPublisher Implementation ---------------------------------
 
    public Topic getTopic() throws JMSException
@@ -386,14 +209,14 @@
    
    // Public --------------------------------------------------------
 
-   public ClientSession getDelegate()
+   public org.jboss.jms.client.api.ClientProducer getDelegate()
    {
-      return session;
+      return producer;
    }
 
    public String toString()
    {
-      return "JBossMessageProducer->" + session;
+      return "JBossMessageProducer->" + producer;
    }
 
    // Package protected ---------------------------------------------
@@ -401,51 +224,6 @@
    // Protected -----------------------------------------------------
    
    // Private -------------------------------------------------------
-
-
-   private void checkClosed() throws IllegalStateException
-   {
-      if (closed || session.isClosed())
-      {
-         throw new IllegalStateException("Producer is closed");
-      }
-   }
-
-   private JBossMessage convertMessage(Destination destination, Message m)
-         throws JMSException
-   {
-      JBossMessage jbm;
-      if (m instanceof BytesMessage)
-      {
-         jbm = new JBossBytesMessage((BytesMessage) m);
-      } 
-      else if (m instanceof MapMessage)
-      {
-         jbm = new JBossMapMessage((MapMessage) m);
-      } 
-      else if (m instanceof ObjectMessage)
-      {
-         jbm = new JBossObjectMessage((ObjectMessage) m);
-      } 
-      else if (m instanceof StreamMessage)
-      {
-         jbm = new JBossStreamMessage((StreamMessage) m);
-      } 
-      else if (m instanceof TextMessage)
-      {
-         jbm = new JBossTextMessage((TextMessage) m);
-      } 
-      else
-      {
-         jbm = new JBossMessage(m);
-      }
-
-      //Set the destination on the original message
-      m.setJMSDestination(destination);
-      return jbm;
-   }
-
-
    
    // Inner classes -------------------------------------------------
 }

Modified: trunk/src/main/org/jboss/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossSession.java	2008-01-31 10:48:18 UTC (rev 3653)
+++ trunk/src/main/org/jboss/jms/client/JBossSession.java	2008-01-31 15:28:11 UTC (rev 3654)
@@ -55,6 +55,7 @@
 import javax.transaction.xa.XAResource;
 
 import org.jboss.jms.client.api.ClientConsumer;
+import org.jboss.jms.client.api.ClientProducer;
 import org.jboss.jms.client.api.ClientSession;
 import org.jboss.jms.destination.JBossDestination;
 import org.jboss.jms.destination.JBossQueue;
@@ -285,16 +286,14 @@
 
    public MessageProducer createProducer(Destination d) throws JMSException
    {
-      if (session.isClosed())
-      {
-         throw new IllegalStateException("Session is closed");
-      }
       if (d != null && !(d instanceof JBossDestination))
       {
          throw new InvalidDestinationException("Not a JBossDestination:" + d);
       }
            
-      return new JBossMessageProducer(session, d);
+      ClientProducer producerDelegate = session.createClientProducer((JBossDestination)d);
+      
+      return new JBossMessageProducer(producerDelegate);
    }
 
   public MessageConsumer createConsumer(Destination d) throws JMSException

Added: trunk/src/main/org/jboss/jms/client/api/ClientProducer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientProducer.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/client/api/ClientProducer.java	2008-01-31 15:28:11 UTC (rev 3654)
@@ -0,0 +1,56 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.jms.client.api;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.jboss.jms.destination.JBossDestination;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public interface ClientProducer
+{
+   String getID();
+   
+   void setDisableMessageID(boolean value) throws JMSException;
+   
+   boolean isDisableMessageID() throws JMSException;
+   
+   void setDisableMessageTimestamp(boolean value) throws JMSException;
+   
+   boolean isDisableMessageTimestamp() throws JMSException;
+   
+   void setDeliveryMode(int deliveryMode) throws JMSException;
+   
+   int getDeliveryMode() throws JMSException;
+   
+   void setPriority(int defaultPriority) throws JMSException;
+   
+   int getPriority() throws JMSException;
+   
+   void setTimeToLive(long timeToLive) throws JMSException;
+   
+   long getTimeToLive() throws JMSException;
+   
+   JBossDestination getDestination() throws JMSException;
+   
+   void send(JBossDestination destination,
+             Message message,
+             int deliveryMode,
+             int priority,
+             long timeToLive) throws JMSException;
+   
+   void closing() throws JMSException;
+   
+   void close() throws JMSException;
+
+}


Property changes on: trunk/src/main/org/jboss/jms/client/api/ClientProducer.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Modified: trunk/src/main/org/jboss/jms/client/api/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientSession.java	2008-01-31 10:48:18 UTC (rev 3653)
+++ trunk/src/main/org/jboss/jms/client/api/ClientSession.java	2008-01-31 15:28:11 UTC (rev 3654)
@@ -32,6 +32,8 @@
    
    ClientBrowser createClientBrowser(Destination queue, String messageSelector) throws JMSException;
    
+   ClientProducer createClientProducer(JBossDestination destination) throws JMSException;
+
    JBossQueue createQueue(String queueName) throws JMSException;
 
    JBossTopic createTopic(String topicName) throws JMSException;
@@ -44,7 +46,7 @@
 
    void unsubscribe(String subscriptionName) throws JMSException;
 
-   void send(Message message, Destination destination) throws JMSException;
+   void send(Message message) throws JMSException;
 
    XAResource getXAResource();
 
@@ -56,6 +58,8 @@
    
    void removeConsumer(ClientConsumer consumer) throws JMSException;
    
+   void removeProducer(ClientProducer producer);
+   
    void removeBrowser(ClientBrowser browser);
    
    boolean isClosed();

Added: trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java	2008-01-31 15:28:11 UTC (rev 3654)
@@ -0,0 +1,394 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.jms.client.impl;
+
+import java.util.UUID;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageFormatException;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.client.api.ClientProducer;
+import org.jboss.jms.client.api.ClientSession;
+import org.jboss.jms.destination.JBossDestination;
+import org.jboss.jms.message.JBossBytesMessage;
+import org.jboss.jms.message.JBossMapMessage;
+import org.jboss.jms.message.JBossMessage;
+import org.jboss.jms.message.JBossObjectMessage;
+import org.jboss.jms.message.JBossStreamMessage;
+import org.jboss.jms.message.JBossTextMessage;
+import org.jboss.messaging.core.DestinationType;
+import org.jboss.messaging.core.impl.DestinationImpl;
+import org.jboss.messaging.util.Logger;
+
+/**
+ * The client-side Producer delegate class.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class ClientProducerImpl implements ClientProducer
+{
+   // Constants ------------------------------------------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(ClientProducerImpl.class);
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   private boolean trace = log.isTraceEnabled();
+   
+   private ClientSession session;
+   
+   private JBossDestination destination;
+
+   private boolean disableMessageID = false;
+   
+   private boolean disableMessageTimestamp = false;
+   
+   private int priority = 4;
+   
+   private long timeToLive = 0;
+   
+   private int deliveryMode = DeliveryMode.PERSISTENT;
+   
+   private String id;
+   
+   private volatile boolean closed;
+   
+   // Static ---------------------------------------------------------------------------------------
+
+   // Constructors ---------------------------------------------------------------------------------
+      
+   public ClientProducerImpl(ClientSession session, JBossDestination destination)
+   {
+      this.session = session;
+      
+      this.destination = destination;
+      
+      this.id = UUID.randomUUID().toString();
+   }
+
+   public String getID()
+   {
+      return id;
+   }
+
+   public synchronized void close() throws JMSException
+   {
+      if (closed)
+      {
+         return;         
+      }
+      session.removeProducer(this);
+      
+      closed = true;
+   }
+
+   public synchronized void closing() throws JMSException
+   {    
+   }
+
+   public JBossDestination getDestination() throws JMSException
+   {
+      checkClosed();
+      
+      return this.destination;
+   }
+
+   public void send(JBossDestination destination, Message m, int deliveryMode, int priority,
+                    long timeToLive) throws JMSException
+   {
+      checkClosed();
+      
+      // configure the message for sending, using attributes stored as metadata
+
+      if (deliveryMode == -1)
+      {
+         // Use the delivery mode of the producer
+         deliveryMode = getDeliveryMode();
+         if (trace) { log.trace("Using producer's default delivery mode: " + deliveryMode); }
+      }
+      m.setJMSDeliveryMode(deliveryMode);
+
+      if (priority == -1)
+      {
+         // Use the priority of the producer
+         priority = getPriority();
+         if (trace) { log.trace("Using producer's default priority: " + priority); }
+      }
+      if (priority < 0 || priority > 9)
+      {
+         throw new MessageFormatException("Invalid message priority (" + priority + "). " +
+                                          "Valid priorities are 0-9");
+      }
+      m.setJMSPriority(priority);
+
+      if (this.isDisableMessageTimestamp())
+      {
+         m.setJMSTimestamp(0l);
+      }
+      else
+      {
+         m.setJMSTimestamp(System.currentTimeMillis());
+      }
+
+      if (timeToLive == Long.MIN_VALUE)
+      {
+         // Use time to live value from producer
+         timeToLive = getTimeToLive();
+         if (trace) { log.trace("Using producer's default timeToLive: " + timeToLive); }
+      }
+
+      if (timeToLive == 0)
+      {
+         // Zero implies never expires
+         m.setJMSExpiration(0);
+      }
+      else
+      {
+         m.setJMSExpiration(System.currentTimeMillis() + timeToLive);
+      }
+
+      if (destination == null)
+      {
+         // use destination from producer
+         destination = (JBossDestination)getDestination();
+
+         if (destination == null)
+         {
+            throw new UnsupportedOperationException("Destination not specified");
+         }
+
+         if (trace) { log.trace("Using producer's default destination: " + destination); }
+      }
+      else
+      {
+         // if a default destination was already specified then this must be same destination as
+         // that specified in the arguments
+
+         if (getDestination() != null &&
+             !getDestination().equals(destination))
+         {
+            throw new UnsupportedOperationException("Where a default destination is specified " +
+                                                    "for the sender and a destination is " +
+                                                    "specified in the arguments to the send, " +
+                                                    "these destinations must be equal");
+         }
+      }
+
+      JBossMessage jbm;
+
+      boolean foreign = false;
+
+      //First convert from foreign message if appropriate
+      if (!(m instanceof JBossMessage))
+      {
+         // it's a foreign message
+
+         // JMS 1.1 Sect. 3.11.4: A provider must be prepared to accept, from a client,
+         // a message whose implementation is not one of its own.
+
+         // create a matching JBossMessage Type from JMS Type
+         if (m instanceof BytesMessage)
+         {
+            jbm = new JBossBytesMessage((BytesMessage)m);
+         }
+         else if (m instanceof MapMessage)
+         {
+            jbm = new JBossMapMessage((MapMessage)m);
+         }
+         else if (m instanceof ObjectMessage)
+         {
+            jbm = new JBossObjectMessage((ObjectMessage)m);
+         }
+         else if (m instanceof StreamMessage)
+         {
+            jbm = new JBossStreamMessage((StreamMessage)m);
+         }
+         else if (m instanceof TextMessage)
+         {
+            jbm = new JBossTextMessage((TextMessage)m);
+         }
+         else
+         {
+            jbm = new JBossMessage(m);
+         }
+
+         //Set the destination on the original message
+         m.setJMSDestination(destination);
+
+         foreign = true;
+      }
+      else
+      {
+         jbm = (JBossMessage)m;
+      }
+
+      final boolean keepID = false;
+      
+      if (!keepID)
+      {
+         //Generate an id
+         
+         String id = UUID.randomUUID().toString();
+         
+         jbm.setJMSMessageID("ID:" + id);
+      }
+
+      if (foreign)
+      {
+         m.setJMSMessageID(jbm.getJMSMessageID());
+      }
+
+      jbm.setJMSDestination(destination);
+
+      try
+      {
+         jbm.doBeforeSend();
+      }
+      catch (Exception e)
+      {
+         JMSException exthrown = new JMSException (e.toString());
+         exthrown.initCause(e);
+         throw exthrown;
+      }
+
+      JBossDestination dest = (JBossDestination)destination;
+
+      //Set the destination on the core message - TODO temp for refactoring
+      org.jboss.messaging.core.Destination coreDest =
+         new DestinationImpl(dest.isQueue() ? DestinationType.QUEUE : DestinationType.TOPIC, dest.getName(), dest.isTemporary());
+      
+      //TODO - can optimise this copy to do copy lazily.
+      org.jboss.messaging.core.Message messageToSend = jbm.getCoreMessage().copy();
+
+      //FIXME - temp - for now we set destination as a header - should really be an attribute of the
+      //send packet - along with scheduleddelivery time
+
+      messageToSend.putHeader(org.jboss.messaging.core.Message.TEMP_DEST_HEADER_NAME, coreDest);
+
+      // we now invoke the send(Message) method on the session, which will eventually be fielded
+      // by connection endpoint
+      session.send(messageToSend);
+   }
+
+   public void setDeliveryMode(int deliveryMode) throws JMSException
+   {
+      checkClosed();
+      
+      this.deliveryMode = deliveryMode;
+   }
+
+   public int getDeliveryMode() throws JMSException
+   {
+      checkClosed();
+      
+      return this.deliveryMode;
+   }
+  
+   public boolean isDisableMessageID() throws JMSException
+   {
+      checkClosed();
+      
+      return this.disableMessageID;
+   }
+
+   public void setDisableMessageID(boolean value) throws JMSException
+   {
+      checkClosed();
+      
+      this.disableMessageID = value;   
+   }
+
+   public boolean isDisableMessageTimestamp() throws JMSException
+   {      
+      checkClosed();
+      
+      return this.disableMessageTimestamp;
+   }
+
+   public void setDisableMessageTimestamp(boolean value) throws JMSException
+   {
+      checkClosed();
+      
+      this.disableMessageTimestamp = value;
+   }
+
+   public void setPriority(int priority) throws JMSException
+   {
+      checkClosed();
+      
+      this.priority = priority;
+   }
+
+   public int getPriority() throws JMSException
+   {
+      checkClosed();
+      
+      return this.priority;
+   }
+
+   public long getTimeToLive() throws JMSException
+   {
+      checkClosed();
+      
+      return this.timeToLive;
+   }
+
+   public void setTimeToLive(long timeToLive) throws JMSException
+   {
+      checkClosed();
+      
+      this.timeToLive = timeToLive;
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+   
+   // Package Private ------------------------------------------------------------------------------
+
+   // Private --------------------------------------------------------------------------------------
+
+   private void checkClosed() throws IllegalStateException
+   {
+      if (closed)
+      {
+         throw new IllegalStateException("Producer is closed");
+      }
+   }
+   
+   // Inner Classes --------------------------------------------------------------------------------
+
+}


Property changes on: trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java	2008-01-31 10:48:18 UTC (rev 3653)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java	2008-01-31 15:28:11 UTC (rev 3654)
@@ -35,6 +35,7 @@
 import org.jboss.jms.client.api.ClientBrowser;
 import org.jboss.jms.client.api.ClientConnection;
 import org.jboss.jms.client.api.ClientConsumer;
+import org.jboss.jms.client.api.ClientProducer;
 import org.jboss.jms.client.api.ClientSession;
 import org.jboss.jms.client.remoting.MessagingRemotingConnection;
 import org.jboss.jms.destination.JBossDestination;
@@ -115,6 +116,8 @@
    
    private Set<ClientBrowser> browsers = new HashSet<ClientBrowser>();
    
+   private Set<ClientProducer> producers = new HashSet<ClientProducer>();
+   
    private Map<String, ClientConsumer> consumers = new HashMap<String, ClientConsumer>();
    
       
@@ -281,6 +284,17 @@
       return consumer;
    }
    
+   public ClientProducer createClientProducer(JBossDestination destination) throws JMSException
+   {
+      checkClosed();
+      
+      ClientProducer producer = new ClientProducerImpl(this, destination);
+  
+      producers.add(producer);
+      
+      return producer;
+   }
+
    public JBossQueue createQueue(String queueName) throws JMSException
    {
       checkClosed();
@@ -381,11 +395,11 @@
       return xaResource;
    }
 
-   public void send(Message m, Destination dest) throws JMSException
+   public void send(Message m) throws JMSException
    {
       checkClosed();
       
-      SessionSendMessage message = new SessionSendMessage(m, dest);
+      SessionSendMessage message = new SessionSendMessage(m);
       
       remotingConnection.send(id, message, !m.isDurable());
    }
@@ -403,6 +417,11 @@
       remotingConnection.send(id, new SessionCancelMessage(-1, false));      
    }
    
+   public void removeProducer(ClientProducer producer)
+   {
+      producers.remove(producer);
+   }
+   
    public void removeBrowser(ClientBrowser browser)
    {
       browsers.remove(browser);
@@ -455,6 +474,15 @@
          consumer.close();
       }
       
+      Set<ClientProducer> producersClone = new HashSet<ClientProducer>(producers);
+      
+      for (ClientProducer producer: producersClone)
+      {
+         producer.closing();
+         
+         producer.close();
+      }
+      
       Set<ClientBrowser> browsersClone = new HashSet<ClientBrowser>(browsers);
       
       for (ClientBrowser browser: browsersClone)

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-01-31 10:48:18 UTC (rev 3653)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-01-31 15:28:11 UTC (rev 3654)
@@ -478,10 +478,12 @@
       }
    }
 
-   private void send(Message msg, Destination dest) throws JMSException
+   private void send(Message msg) throws JMSException
    {
       try
       {
+         Destination dest = (Destination)msg.getHeader(org.jboss.messaging.core.Message.TEMP_DEST_HEADER_NAME);
+
          //Assign the message an internal id - this is used to key it in the store and also used to 
          //handle delivery
          
@@ -1503,7 +1505,7 @@
             {
                SessionSendMessage message = (SessionSendMessage) packet;
               
-               send(message.getMessage(), message.getDestination());
+               send(message.getMessage());
 
                if (message.getMessage().isDurable())
                {

Modified: trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionSendMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionSendMessageCodec.java	2008-01-31 10:48:18 UTC (rev 3653)
+++ trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionSendMessageCodec.java	2008-01-31 15:28:11 UTC (rev 3654)
@@ -8,10 +8,7 @@
 
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDMESSAGE;
 
-import org.jboss.messaging.core.Destination;
-import org.jboss.messaging.core.DestinationType;
 import org.jboss.messaging.core.Message;
-import org.jboss.messaging.core.impl.DestinationImpl;
 import org.jboss.messaging.core.remoting.wireformat.SessionSendMessage;
 
 /**
@@ -39,17 +36,13 @@
    @Override
    protected void encodeBody(SessionSendMessage message, RemotingBuffer out) throws Exception
    {
-      Destination dest = message.getDestination();
       byte[] encodedMsg = encodeMessage(message.getMessage());   
 
-      int bodyLength = INT_LENGTH + encodedMsg.length + INT_LENGTH + BOOLEAN_LENGTH + sizeof(dest.getName());
+      int bodyLength = INT_LENGTH + encodedMsg.length;
 
       out.putInt(bodyLength);
       out.putInt(encodedMsg.length);
       out.put(encodedMsg);
-      out.putInt(DestinationType.toInt(dest.getType()));
-      out.putBoolean(dest.isTemporary());
-      out.putNullableString(dest.getName());
    }
 
    @Override
@@ -66,14 +59,8 @@
       byte[] encodedMsg = new byte[msgLength];
       in.get(encodedMsg);
       Message msg = decodeMessage(encodedMsg);
-      
-      int destinationType = in.getInt();
-      boolean isTemporary = in.getBoolean();
-      String name = in.getNullableString();
-      
-      DestinationImpl dest = new DestinationImpl(DestinationType.fromInt(destinationType), name, isTemporary);
 
-      return new SessionSendMessage(msg, dest);
+      return new SessionSendMessage(msg);
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionSendMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionSendMessage.java	2008-01-31 10:48:18 UTC (rev 3653)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionSendMessage.java	2008-01-31 15:28:11 UTC (rev 3654)
@@ -8,7 +8,6 @@
 
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDMESSAGE;
 
-import org.jboss.messaging.core.Destination;
 import org.jboss.messaging.core.Message;
 
 /**
@@ -24,20 +23,18 @@
    // Attributes ----------------------------------------------------
 
    private final Message message;
-   private final Destination destination;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionSendMessage(Message message, Destination destination)
+   public SessionSendMessage(Message message)
    {
       super(MSG_SENDMESSAGE);
 
       assert message != null;
 
       this.message = message;
-      this.destination = destination;
    }
 
    // Public --------------------------------------------------------
@@ -46,11 +43,6 @@
    {
       return message;
    }
-   
-   public Destination getDestination()
-   {
-      return destination;
-   }
 
    @Override
    public String toString()

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java	2008-01-31 10:48:18 UTC (rev 3653)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java	2008-01-31 15:28:11 UTC (rev 3654)
@@ -547,14 +547,12 @@
 
    public void testSendMessage() throws Exception
    {
-      Destination originalDestination = new DestinationImpl(DestinationType.QUEUE, java.util.UUID.randomUUID().toString(), true);
-      SessionSendMessage packet = new SessionSendMessage(new MessageImpl(), originalDestination);
+      SessionSendMessage packet = new SessionSendMessage(new MessageImpl());
 
       AbstractPacketCodec codec = new SessionSendMessageCodec();
       SimpleRemotingBuffer buffer = encode(packet, codec);
       checkHeader(buffer, packet);
-      checkBody(buffer, encodeMessage(packet.getMessage()), DestinationType.toInt(originalDestination.getType()),
-                        originalDestination.isTemporary(), originalDestination.getName());
+      checkBody(buffer, encodeMessage(packet.getMessage()));
       buffer.rewind();
 
       AbstractPacket p = codec.decode(buffer);
@@ -564,9 +562,6 @@
       assertEquals(MSG_SENDMESSAGE, decodedPacket.getType());
       assertEquals(packet.getMessage().getMessageID(), decodedPacket
             .getMessage().getMessageID());
-      assertEquals(originalDestination.getName(), packet.getDestination().getName());
-      assertEquals(DestinationType.QUEUE, packet.getDestination().getType());
-      assertEquals(originalDestination.isTemporary(), packet.getDestination().isTemporary());
    }
 
    public void testCreateConsumerRequest() throws Exception

Modified: trunk/tests/src/org/jboss/test/messaging/jms/message/JMSMessageIDHeaderTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/message/JMSMessageIDHeaderTest.java	2008-01-31 10:48:18 UTC (rev 3653)
+++ trunk/tests/src/org/jboss/test/messaging/jms/message/JMSMessageIDHeaderTest.java	2008-01-31 15:28:11 UTC (rev 3654)
@@ -49,7 +49,6 @@
       Message m = queueProducerSession.createMessage();
       queueProducer.send(m);
       String messageID = queueConsumer.receive().getJMSMessageID();
-      assertNotNull(messageID);
       // JMS1.1 specs 3.4.3
       assertTrue(messageID.startsWith("ID:"));
    }




More information about the jboss-cvs-commits mailing list