[jboss-cvs] JBoss Messaging SVN: r3654 - in trunk: src/main/org/jboss/jms/client and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jan 31 10:28:11 EST 2008
Author: clebert.suconic at jboss.com
Date: 2008-01-31 10:28:11 -0500 (Thu, 31 Jan 2008)
New Revision: 3654
Added:
trunk/src/main/org/jboss/jms/client/api/ClientProducer.java
trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java
Modified:
trunk/docs/examples/embedded/src/org/jboss/example/embedded/EmbeddedExample.java
trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java
trunk/src/main/org/jboss/jms/client/JBossSession.java
trunk/src/main/org/jboss/jms/client/api/ClientSession.java
trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionSendMessageCodec.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionSendMessage.java
trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java
trunk/tests/src/org/jboss/test/messaging/jms/message/JMSMessageIDHeaderTest.java
Log:
JBMESSAGING-681 - Reverting changes
Modified: trunk/docs/examples/embedded/src/org/jboss/example/embedded/EmbeddedExample.java
===================================================================
--- trunk/docs/examples/embedded/src/org/jboss/example/embedded/EmbeddedExample.java 2008-01-31 10:48:18 UTC (rev 3653)
+++ trunk/docs/examples/embedded/src/org/jboss/example/embedded/EmbeddedExample.java 2008-01-31 15:28:11 UTC (rev 3654)
@@ -56,8 +56,9 @@
MessageImpl message = new MessageImpl();
Destination destination = new DestinationImpl(DestinationType.QUEUE, "Queue1", false);
+ message.putHeader(org.jboss.messaging.core.Message.TEMP_DEST_HEADER_NAME, destination);
message.setPayload("hello".getBytes());
- clientSession.send(message, destination);
+ clientSession.send(message);
ClientConsumer clientConsumer = clientSession.createClientConsumer(destination, null, false, null);
clientConnection.start();
Modified: trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java 2008-01-31 10:48:18 UTC (rev 3653)
+++ trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java 2008-01-31 15:28:11 UTC (rev 3654)
@@ -21,42 +21,22 @@
*/
package org.jboss.jms.client;
-import java.util.UUID;
-
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
import javax.jms.Destination;
-import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
-import javax.jms.MapMessage;
import javax.jms.Message;
-import javax.jms.MessageFormatException;
import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueSender;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
-import org.jboss.jms.client.api.ClientSession;
import org.jboss.jms.destination.JBossDestination;
-import org.jboss.jms.message.JBossBytesMessage;
-import org.jboss.jms.message.JBossMapMessage;
-import org.jboss.jms.message.JBossMessage;
-import org.jboss.jms.message.JBossObjectMessage;
-import org.jboss.jms.message.JBossStreamMessage;
-import org.jboss.jms.message.JBossTextMessage;
-import org.jboss.messaging.core.DestinationType;
-import org.jboss.messaging.core.impl.DestinationImpl;
import org.jboss.messaging.util.Logger;
/**
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
* @version <tt>$Revision$</tt>
*
* $Id$
@@ -71,119 +51,84 @@
// Attributes ----------------------------------------------------
- private boolean trace = log.isTraceEnabled();
-
- private ClientSession session;
+ protected org.jboss.jms.client.api.ClientProducer producer;
- private Destination destination;
-
- private volatile boolean closed;
-
- private boolean disableMessageID = false;
-
- private boolean disableMessageTimestamp = false;
-
- private int priority = 4;
-
- private long timeToLive = 0;
-
- private int deliveryMode = DeliveryMode.PERSISTENT;
-
-
// Constructors --------------------------------------------------
- public JBossMessageProducer(org.jboss.jms.client.api.ClientSession session, Destination destination)
+ public JBossMessageProducer(org.jboss.jms.client.api.ClientProducer producer)
{
- this.session = session;
- this.destination = destination;
+ this.producer = producer;
}
// MessageProducer implementation --------------------------------
public void setDisableMessageID(boolean value) throws JMSException
{
- checkClosed();
log.warn("JBoss Messaging does not support disabling message ID generation");
- this.disableMessageID = value;
+ producer.setDisableMessageID(value);
}
public boolean getDisableMessageID() throws JMSException
{
- checkClosed();
- return disableMessageID;
+ return producer.isDisableMessageID();
}
public void setDisableMessageTimestamp(boolean value) throws JMSException
{
- checkClosed();
- this.disableMessageTimestamp = value;
+ producer.setDisableMessageTimestamp(value);
}
public boolean getDisableMessageTimestamp() throws JMSException
{
- checkClosed();
- return disableMessageTimestamp;
+ return producer.isDisableMessageTimestamp();
}
public void setDeliveryMode(int deliveryMode) throws JMSException
{
- checkClosed();
- this.deliveryMode = deliveryMode;
+ producer.setDeliveryMode(deliveryMode);
}
public int getDeliveryMode() throws JMSException
{
- checkClosed();
- return deliveryMode;
+ return producer.getDeliveryMode();
}
public void setPriority(int defaultPriority) throws JMSException
{
- checkClosed();
-
- if (defaultPriority < 0 || defaultPriority > 9)
- {
- throw new JMSException("Invalid message priority (" + priority + "). " +
- "Valid priorities are 0-9");
- }
-
- this.priority = defaultPriority;
+ producer.setPriority(defaultPriority);
}
public int getPriority() throws JMSException
{
- checkClosed();
- return this.priority;
+ return producer.getPriority();
}
public void setTimeToLive(long timeToLive) throws JMSException
{
- checkClosed();
- this.timeToLive = timeToLive;
+ producer.setTimeToLive(timeToLive);
}
public long getTimeToLive() throws JMSException
{
- checkClosed();
- return this.timeToLive;
+ return producer.getTimeToLive();
}
public Destination getDestination() throws JMSException
{
- checkClosed();
- return this.destination;
+ return producer.getDestination();
}
public void close() throws JMSException
{
- closed = true;
+ producer.closing();
+ producer.close();
}
public void send(Message message) throws JMSException
{
// by default the message never expires
- send(message, this.deliveryMode, this.priority, timeToLive);
+ send(message, -1, -1, Long.MIN_VALUE);
}
/**
@@ -197,7 +142,7 @@
public void send(Destination destination, Message message) throws JMSException
{
- send(destination, message, deliveryMode, priority, timeToLive);
+ send(destination, message, -1, -1, Long.MIN_VALUE);
}
public void send(Destination destination,
@@ -210,133 +155,11 @@
{
throw new InvalidDestinationException("Not a JBossDestination:" + destination);
}
-
-
- checkClosed();
-
- m.setJMSDeliveryMode(deliveryMode);
- if (priority < 0 || priority > 9)
- {
- throw new MessageFormatException("Invalid message priority (" + priority + "). " +
- "Valid priorities are 0-9");
- }
- m.setJMSPriority(priority);
-
- if (this.getDisableMessageTimestamp())
- {
- m.setJMSTimestamp(0l);
- }
- else
- {
- m.setJMSTimestamp(System.currentTimeMillis());
- }
-
- if (timeToLive == 0)
- {
- // Zero implies never expires
- m.setJMSExpiration(0);
- }
- else
- {
- m.setJMSExpiration(System.currentTimeMillis() + timeToLive);
- }
-
- if (destination == null)
- {
- // use destination from producer
- destination = (JBossDestination)getDestination();
-
- if (destination == null)
- {
- throw new UnsupportedOperationException("Destination not specified");
- }
-
- if (trace) { log.trace("Using producer's default destination: " + destination); }
- }
- else
- {
- // if a default destination was already specified then this must be same destination as
- // that specified in the arguments
-
- if (getDestination() != null &&
- !getDestination().equals(destination))
- {
- throw new UnsupportedOperationException("Where a default destination is specified " +
- "for the sender and a destination is " +
- "specified in the arguments to the send, " +
- "these destinations must be equal");
- }
- }
-
- JBossMessage jbm;
-
- boolean foreign = false;
-
- //First convert from foreign message if appropriate
- if (!(m instanceof JBossMessage))
- {
- // it's a foreign message
-
- // JMS 1.1 Sect. 3.11.4: A provider must be prepared to accept, from a client,
- // a message whose implementation is not one of its own.
-
- // create a matching JBossMessage Type from JMS Type
- jbm = convertMessage(destination, m);
-
- foreign = true;
- }
- else
- {
- jbm = (JBossMessage)m;
- }
-
- try
- {
- jbm.doBeforeSend();
- }
- catch (Exception e)
- {
- JMSException exthrown = new JMSException (e.toString());
- exthrown.initCause(e);
- throw exthrown;
- }
-
- final boolean keepID = false;
-
- if (!keepID)
- {
- //Generate an id
-
- String id = UUID.randomUUID().toString();
-
- jbm.setJMSMessageID("ID:" + id);
- }
-
- if (foreign)
- {
- m.setJMSMessageID(jbm.getJMSMessageID());
- }
-
- jbm.setJMSDestination(destination);
-
- JBossDestination dest = (JBossDestination)destination;
-
- //Set the destination on the core message - TODO temp for refactoring
- org.jboss.messaging.core.Destination coreDest =
- new DestinationImpl(dest.isQueue() ? DestinationType.QUEUE : DestinationType.TOPIC, dest.getName(), dest.isTemporary());
-
- //TODO - can optimise this copy to do copy lazily.
- org.jboss.messaging.core.Message messageToSend = jbm.getCoreMessage().copy();
-
- //FIXME - temp - for now we set destination as a header - should really be an attribute of the
- //send packet - along with scheduleddelivery time
-
- // we now invoke the send(Message) method on the session, which will eventually be fielded
- // by connection endpoint
- session.send(messageToSend, coreDest);
+ producer.send((JBossDestination)destination, m, deliveryMode, priority, timeToLive);
}
+
// TopicPublisher Implementation ---------------------------------
public Topic getTopic() throws JMSException
@@ -386,14 +209,14 @@
// Public --------------------------------------------------------
- public ClientSession getDelegate()
+ public org.jboss.jms.client.api.ClientProducer getDelegate()
{
- return session;
+ return producer;
}
public String toString()
{
- return "JBossMessageProducer->" + session;
+ return "JBossMessageProducer->" + producer;
}
// Package protected ---------------------------------------------
@@ -401,51 +224,6 @@
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
-
-
- private void checkClosed() throws IllegalStateException
- {
- if (closed || session.isClosed())
- {
- throw new IllegalStateException("Producer is closed");
- }
- }
-
- private JBossMessage convertMessage(Destination destination, Message m)
- throws JMSException
- {
- JBossMessage jbm;
- if (m instanceof BytesMessage)
- {
- jbm = new JBossBytesMessage((BytesMessage) m);
- }
- else if (m instanceof MapMessage)
- {
- jbm = new JBossMapMessage((MapMessage) m);
- }
- else if (m instanceof ObjectMessage)
- {
- jbm = new JBossObjectMessage((ObjectMessage) m);
- }
- else if (m instanceof StreamMessage)
- {
- jbm = new JBossStreamMessage((StreamMessage) m);
- }
- else if (m instanceof TextMessage)
- {
- jbm = new JBossTextMessage((TextMessage) m);
- }
- else
- {
- jbm = new JBossMessage(m);
- }
-
- //Set the destination on the original message
- m.setJMSDestination(destination);
- return jbm;
- }
-
-
// Inner classes -------------------------------------------------
}
Modified: trunk/src/main/org/jboss/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossSession.java 2008-01-31 10:48:18 UTC (rev 3653)
+++ trunk/src/main/org/jboss/jms/client/JBossSession.java 2008-01-31 15:28:11 UTC (rev 3654)
@@ -55,6 +55,7 @@
import javax.transaction.xa.XAResource;
import org.jboss.jms.client.api.ClientConsumer;
+import org.jboss.jms.client.api.ClientProducer;
import org.jboss.jms.client.api.ClientSession;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.destination.JBossQueue;
@@ -285,16 +286,14 @@
public MessageProducer createProducer(Destination d) throws JMSException
{
- if (session.isClosed())
- {
- throw new IllegalStateException("Session is closed");
- }
if (d != null && !(d instanceof JBossDestination))
{
throw new InvalidDestinationException("Not a JBossDestination:" + d);
}
- return new JBossMessageProducer(session, d);
+ ClientProducer producerDelegate = session.createClientProducer((JBossDestination)d);
+
+ return new JBossMessageProducer(producerDelegate);
}
public MessageConsumer createConsumer(Destination d) throws JMSException
Added: trunk/src/main/org/jboss/jms/client/api/ClientProducer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientProducer.java (rev 0)
+++ trunk/src/main/org/jboss/jms/client/api/ClientProducer.java 2008-01-31 15:28:11 UTC (rev 3654)
@@ -0,0 +1,56 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.jms.client.api;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.jboss.jms.destination.JBossDestination;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public interface ClientProducer
+{
+ String getID();
+
+ void setDisableMessageID(boolean value) throws JMSException;
+
+ boolean isDisableMessageID() throws JMSException;
+
+ void setDisableMessageTimestamp(boolean value) throws JMSException;
+
+ boolean isDisableMessageTimestamp() throws JMSException;
+
+ void setDeliveryMode(int deliveryMode) throws JMSException;
+
+ int getDeliveryMode() throws JMSException;
+
+ void setPriority(int defaultPriority) throws JMSException;
+
+ int getPriority() throws JMSException;
+
+ void setTimeToLive(long timeToLive) throws JMSException;
+
+ long getTimeToLive() throws JMSException;
+
+ JBossDestination getDestination() throws JMSException;
+
+ void send(JBossDestination destination,
+ Message message,
+ int deliveryMode,
+ int priority,
+ long timeToLive) throws JMSException;
+
+ void closing() throws JMSException;
+
+ void close() throws JMSException;
+
+}
Property changes on: trunk/src/main/org/jboss/jms/client/api/ClientProducer.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Modified: trunk/src/main/org/jboss/jms/client/api/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientSession.java 2008-01-31 10:48:18 UTC (rev 3653)
+++ trunk/src/main/org/jboss/jms/client/api/ClientSession.java 2008-01-31 15:28:11 UTC (rev 3654)
@@ -32,6 +32,8 @@
ClientBrowser createClientBrowser(Destination queue, String messageSelector) throws JMSException;
+ ClientProducer createClientProducer(JBossDestination destination) throws JMSException;
+
JBossQueue createQueue(String queueName) throws JMSException;
JBossTopic createTopic(String topicName) throws JMSException;
@@ -44,7 +46,7 @@
void unsubscribe(String subscriptionName) throws JMSException;
- void send(Message message, Destination destination) throws JMSException;
+ void send(Message message) throws JMSException;
XAResource getXAResource();
@@ -56,6 +58,8 @@
void removeConsumer(ClientConsumer consumer) throws JMSException;
+ void removeProducer(ClientProducer producer);
+
void removeBrowser(ClientBrowser browser);
boolean isClosed();
Added: trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java (rev 0)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java 2008-01-31 15:28:11 UTC (rev 3654)
@@ -0,0 +1,394 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.client.impl;
+
+import java.util.UUID;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageFormatException;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.client.api.ClientProducer;
+import org.jboss.jms.client.api.ClientSession;
+import org.jboss.jms.destination.JBossDestination;
+import org.jboss.jms.message.JBossBytesMessage;
+import org.jboss.jms.message.JBossMapMessage;
+import org.jboss.jms.message.JBossMessage;
+import org.jboss.jms.message.JBossObjectMessage;
+import org.jboss.jms.message.JBossStreamMessage;
+import org.jboss.jms.message.JBossTextMessage;
+import org.jboss.messaging.core.DestinationType;
+import org.jboss.messaging.core.impl.DestinationImpl;
+import org.jboss.messaging.util.Logger;
+
+/**
+ * The client-side Producer delegate class.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class ClientProducerImpl implements ClientProducer
+{
+ // Constants ------------------------------------------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(ClientProducerImpl.class);
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ private boolean trace = log.isTraceEnabled();
+
+ private ClientSession session;
+
+ private JBossDestination destination;
+
+ private boolean disableMessageID = false;
+
+ private boolean disableMessageTimestamp = false;
+
+ private int priority = 4;
+
+ private long timeToLive = 0;
+
+ private int deliveryMode = DeliveryMode.PERSISTENT;
+
+ private String id;
+
+ private volatile boolean closed;
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ public ClientProducerImpl(ClientSession session, JBossDestination destination)
+ {
+ this.session = session;
+
+ this.destination = destination;
+
+ this.id = UUID.randomUUID().toString();
+ }
+
+ public String getID()
+ {
+ return id;
+ }
+
+ public synchronized void close() throws JMSException
+ {
+ if (closed)
+ {
+ return;
+ }
+ session.removeProducer(this);
+
+ closed = true;
+ }
+
+ public synchronized void closing() throws JMSException
+ {
+ }
+
+ public JBossDestination getDestination() throws JMSException
+ {
+ checkClosed();
+
+ return this.destination;
+ }
+
+ public void send(JBossDestination destination, Message m, int deliveryMode, int priority,
+ long timeToLive) throws JMSException
+ {
+ checkClosed();
+
+ // configure the message for sending, using attributes stored as metadata
+
+ if (deliveryMode == -1)
+ {
+ // Use the delivery mode of the producer
+ deliveryMode = getDeliveryMode();
+ if (trace) { log.trace("Using producer's default delivery mode: " + deliveryMode); }
+ }
+ m.setJMSDeliveryMode(deliveryMode);
+
+ if (priority == -1)
+ {
+ // Use the priority of the producer
+ priority = getPriority();
+ if (trace) { log.trace("Using producer's default priority: " + priority); }
+ }
+ if (priority < 0 || priority > 9)
+ {
+ throw new MessageFormatException("Invalid message priority (" + priority + "). " +
+ "Valid priorities are 0-9");
+ }
+ m.setJMSPriority(priority);
+
+ if (this.isDisableMessageTimestamp())
+ {
+ m.setJMSTimestamp(0l);
+ }
+ else
+ {
+ m.setJMSTimestamp(System.currentTimeMillis());
+ }
+
+ if (timeToLive == Long.MIN_VALUE)
+ {
+ // Use time to live value from producer
+ timeToLive = getTimeToLive();
+ if (trace) { log.trace("Using producer's default timeToLive: " + timeToLive); }
+ }
+
+ if (timeToLive == 0)
+ {
+ // Zero implies never expires
+ m.setJMSExpiration(0);
+ }
+ else
+ {
+ m.setJMSExpiration(System.currentTimeMillis() + timeToLive);
+ }
+
+ if (destination == null)
+ {
+ // use destination from producer
+ destination = (JBossDestination)getDestination();
+
+ if (destination == null)
+ {
+ throw new UnsupportedOperationException("Destination not specified");
+ }
+
+ if (trace) { log.trace("Using producer's default destination: " + destination); }
+ }
+ else
+ {
+ // if a default destination was already specified then this must be same destination as
+ // that specified in the arguments
+
+ if (getDestination() != null &&
+ !getDestination().equals(destination))
+ {
+ throw new UnsupportedOperationException("Where a default destination is specified " +
+ "for the sender and a destination is " +
+ "specified in the arguments to the send, " +
+ "these destinations must be equal");
+ }
+ }
+
+ JBossMessage jbm;
+
+ boolean foreign = false;
+
+ //First convert from foreign message if appropriate
+ if (!(m instanceof JBossMessage))
+ {
+ // it's a foreign message
+
+ // JMS 1.1 Sect. 3.11.4: A provider must be prepared to accept, from a client,
+ // a message whose implementation is not one of its own.
+
+ // create a matching JBossMessage Type from JMS Type
+ if (m instanceof BytesMessage)
+ {
+ jbm = new JBossBytesMessage((BytesMessage)m);
+ }
+ else if (m instanceof MapMessage)
+ {
+ jbm = new JBossMapMessage((MapMessage)m);
+ }
+ else if (m instanceof ObjectMessage)
+ {
+ jbm = new JBossObjectMessage((ObjectMessage)m);
+ }
+ else if (m instanceof StreamMessage)
+ {
+ jbm = new JBossStreamMessage((StreamMessage)m);
+ }
+ else if (m instanceof TextMessage)
+ {
+ jbm = new JBossTextMessage((TextMessage)m);
+ }
+ else
+ {
+ jbm = new JBossMessage(m);
+ }
+
+ //Set the destination on the original message
+ m.setJMSDestination(destination);
+
+ foreign = true;
+ }
+ else
+ {
+ jbm = (JBossMessage)m;
+ }
+
+ final boolean keepID = false;
+
+ if (!keepID)
+ {
+ //Generate an id
+
+ String id = UUID.randomUUID().toString();
+
+ jbm.setJMSMessageID("ID:" + id);
+ }
+
+ if (foreign)
+ {
+ m.setJMSMessageID(jbm.getJMSMessageID());
+ }
+
+ jbm.setJMSDestination(destination);
+
+ try
+ {
+ jbm.doBeforeSend();
+ }
+ catch (Exception e)
+ {
+ JMSException exthrown = new JMSException (e.toString());
+ exthrown.initCause(e);
+ throw exthrown;
+ }
+
+ JBossDestination dest = (JBossDestination)destination;
+
+ //Set the destination on the core message - TODO temp for refactoring
+ org.jboss.messaging.core.Destination coreDest =
+ new DestinationImpl(dest.isQueue() ? DestinationType.QUEUE : DestinationType.TOPIC, dest.getName(), dest.isTemporary());
+
+ //TODO - can optimise this copy to do copy lazily.
+ org.jboss.messaging.core.Message messageToSend = jbm.getCoreMessage().copy();
+
+ //FIXME - temp - for now we set destination as a header - should really be an attribute of the
+ //send packet - along with scheduleddelivery time
+
+ messageToSend.putHeader(org.jboss.messaging.core.Message.TEMP_DEST_HEADER_NAME, coreDest);
+
+ // we now invoke the send(Message) method on the session, which will eventually be fielded
+ // by connection endpoint
+ session.send(messageToSend);
+ }
+
+ public void setDeliveryMode(int deliveryMode) throws JMSException
+ {
+ checkClosed();
+
+ this.deliveryMode = deliveryMode;
+ }
+
+ public int getDeliveryMode() throws JMSException
+ {
+ checkClosed();
+
+ return this.deliveryMode;
+ }
+
+ public boolean isDisableMessageID() throws JMSException
+ {
+ checkClosed();
+
+ return this.disableMessageID;
+ }
+
+ public void setDisableMessageID(boolean value) throws JMSException
+ {
+ checkClosed();
+
+ this.disableMessageID = value;
+ }
+
+ public boolean isDisableMessageTimestamp() throws JMSException
+ {
+ checkClosed();
+
+ return this.disableMessageTimestamp;
+ }
+
+ public void setDisableMessageTimestamp(boolean value) throws JMSException
+ {
+ checkClosed();
+
+ this.disableMessageTimestamp = value;
+ }
+
+ public void setPriority(int priority) throws JMSException
+ {
+ checkClosed();
+
+ this.priority = priority;
+ }
+
+ public int getPriority() throws JMSException
+ {
+ checkClosed();
+
+ return this.priority;
+ }
+
+ public long getTimeToLive() throws JMSException
+ {
+ checkClosed();
+
+ return this.timeToLive;
+ }
+
+ public void setTimeToLive(long timeToLive) throws JMSException
+ {
+ checkClosed();
+
+ this.timeToLive = timeToLive;
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Package Private ------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ private void checkClosed() throws IllegalStateException
+ {
+ if (closed)
+ {
+ throw new IllegalStateException("Producer is closed");
+ }
+ }
+
+ // Inner Classes --------------------------------------------------------------------------------
+
+}
Property changes on: trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java 2008-01-31 10:48:18 UTC (rev 3653)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java 2008-01-31 15:28:11 UTC (rev 3654)
@@ -35,6 +35,7 @@
import org.jboss.jms.client.api.ClientBrowser;
import org.jboss.jms.client.api.ClientConnection;
import org.jboss.jms.client.api.ClientConsumer;
+import org.jboss.jms.client.api.ClientProducer;
import org.jboss.jms.client.api.ClientSession;
import org.jboss.jms.client.remoting.MessagingRemotingConnection;
import org.jboss.jms.destination.JBossDestination;
@@ -115,6 +116,8 @@
private Set<ClientBrowser> browsers = new HashSet<ClientBrowser>();
+ private Set<ClientProducer> producers = new HashSet<ClientProducer>();
+
private Map<String, ClientConsumer> consumers = new HashMap<String, ClientConsumer>();
@@ -281,6 +284,17 @@
return consumer;
}
+ public ClientProducer createClientProducer(JBossDestination destination) throws JMSException
+ {
+ checkClosed();
+
+ ClientProducer producer = new ClientProducerImpl(this, destination);
+
+ producers.add(producer);
+
+ return producer;
+ }
+
public JBossQueue createQueue(String queueName) throws JMSException
{
checkClosed();
@@ -381,11 +395,11 @@
return xaResource;
}
- public void send(Message m, Destination dest) throws JMSException
+ public void send(Message m) throws JMSException
{
checkClosed();
- SessionSendMessage message = new SessionSendMessage(m, dest);
+ SessionSendMessage message = new SessionSendMessage(m);
remotingConnection.send(id, message, !m.isDurable());
}
@@ -403,6 +417,11 @@
remotingConnection.send(id, new SessionCancelMessage(-1, false));
}
+ public void removeProducer(ClientProducer producer)
+ {
+ producers.remove(producer);
+ }
+
public void removeBrowser(ClientBrowser browser)
{
browsers.remove(browser);
@@ -455,6 +474,15 @@
consumer.close();
}
+ Set<ClientProducer> producersClone = new HashSet<ClientProducer>(producers);
+
+ for (ClientProducer producer: producersClone)
+ {
+ producer.closing();
+
+ producer.close();
+ }
+
Set<ClientBrowser> browsersClone = new HashSet<ClientBrowser>(browsers);
for (ClientBrowser browser: browsersClone)
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-01-31 10:48:18 UTC (rev 3653)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-01-31 15:28:11 UTC (rev 3654)
@@ -478,10 +478,12 @@
}
}
- private void send(Message msg, Destination dest) throws JMSException
+ private void send(Message msg) throws JMSException
{
try
{
+ Destination dest = (Destination)msg.getHeader(org.jboss.messaging.core.Message.TEMP_DEST_HEADER_NAME);
+
//Assign the message an internal id - this is used to key it in the store and also used to
//handle delivery
@@ -1503,7 +1505,7 @@
{
SessionSendMessage message = (SessionSendMessage) packet;
- send(message.getMessage(), message.getDestination());
+ send(message.getMessage());
if (message.getMessage().isDurable())
{
Modified: trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionSendMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionSendMessageCodec.java 2008-01-31 10:48:18 UTC (rev 3653)
+++ trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionSendMessageCodec.java 2008-01-31 15:28:11 UTC (rev 3654)
@@ -8,10 +8,7 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDMESSAGE;
-import org.jboss.messaging.core.Destination;
-import org.jboss.messaging.core.DestinationType;
import org.jboss.messaging.core.Message;
-import org.jboss.messaging.core.impl.DestinationImpl;
import org.jboss.messaging.core.remoting.wireformat.SessionSendMessage;
/**
@@ -39,17 +36,13 @@
@Override
protected void encodeBody(SessionSendMessage message, RemotingBuffer out) throws Exception
{
- Destination dest = message.getDestination();
byte[] encodedMsg = encodeMessage(message.getMessage());
- int bodyLength = INT_LENGTH + encodedMsg.length + INT_LENGTH + BOOLEAN_LENGTH + sizeof(dest.getName());
+ int bodyLength = INT_LENGTH + encodedMsg.length;
out.putInt(bodyLength);
out.putInt(encodedMsg.length);
out.put(encodedMsg);
- out.putInt(DestinationType.toInt(dest.getType()));
- out.putBoolean(dest.isTemporary());
- out.putNullableString(dest.getName());
}
@Override
@@ -66,14 +59,8 @@
byte[] encodedMsg = new byte[msgLength];
in.get(encodedMsg);
Message msg = decodeMessage(encodedMsg);
-
- int destinationType = in.getInt();
- boolean isTemporary = in.getBoolean();
- String name = in.getNullableString();
-
- DestinationImpl dest = new DestinationImpl(DestinationType.fromInt(destinationType), name, isTemporary);
- return new SessionSendMessage(msg, dest);
+ return new SessionSendMessage(msg);
}
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionSendMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionSendMessage.java 2008-01-31 10:48:18 UTC (rev 3653)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionSendMessage.java 2008-01-31 15:28:11 UTC (rev 3654)
@@ -8,7 +8,6 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDMESSAGE;
-import org.jboss.messaging.core.Destination;
import org.jboss.messaging.core.Message;
/**
@@ -24,20 +23,18 @@
// Attributes ----------------------------------------------------
private final Message message;
- private final Destination destination;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionSendMessage(Message message, Destination destination)
+ public SessionSendMessage(Message message)
{
super(MSG_SENDMESSAGE);
assert message != null;
this.message = message;
- this.destination = destination;
}
// Public --------------------------------------------------------
@@ -46,11 +43,6 @@
{
return message;
}
-
- public Destination getDestination()
- {
- return destination;
- }
@Override
public String toString()
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java 2008-01-31 10:48:18 UTC (rev 3653)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java 2008-01-31 15:28:11 UTC (rev 3654)
@@ -547,14 +547,12 @@
public void testSendMessage() throws Exception
{
- Destination originalDestination = new DestinationImpl(DestinationType.QUEUE, java.util.UUID.randomUUID().toString(), true);
- SessionSendMessage packet = new SessionSendMessage(new MessageImpl(), originalDestination);
+ SessionSendMessage packet = new SessionSendMessage(new MessageImpl());
AbstractPacketCodec codec = new SessionSendMessageCodec();
SimpleRemotingBuffer buffer = encode(packet, codec);
checkHeader(buffer, packet);
- checkBody(buffer, encodeMessage(packet.getMessage()), DestinationType.toInt(originalDestination.getType()),
- originalDestination.isTemporary(), originalDestination.getName());
+ checkBody(buffer, encodeMessage(packet.getMessage()));
buffer.rewind();
AbstractPacket p = codec.decode(buffer);
@@ -564,9 +562,6 @@
assertEquals(MSG_SENDMESSAGE, decodedPacket.getType());
assertEquals(packet.getMessage().getMessageID(), decodedPacket
.getMessage().getMessageID());
- assertEquals(originalDestination.getName(), packet.getDestination().getName());
- assertEquals(DestinationType.QUEUE, packet.getDestination().getType());
- assertEquals(originalDestination.isTemporary(), packet.getDestination().isTemporary());
}
public void testCreateConsumerRequest() throws Exception
Modified: trunk/tests/src/org/jboss/test/messaging/jms/message/JMSMessageIDHeaderTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/message/JMSMessageIDHeaderTest.java 2008-01-31 10:48:18 UTC (rev 3653)
+++ trunk/tests/src/org/jboss/test/messaging/jms/message/JMSMessageIDHeaderTest.java 2008-01-31 15:28:11 UTC (rev 3654)
@@ -49,7 +49,6 @@
Message m = queueProducerSession.createMessage();
queueProducer.send(m);
String messageID = queueConsumer.receive().getJMSMessageID();
- assertNotNull(messageID);
// JMS1.1 specs 3.4.3
assertTrue(messageID.startsWith("ID:"));
}
More information about the jboss-cvs-commits
mailing list