[jboss-cvs] JBoss Messaging SVN: r6231 - trunk/src/main/org/jboss/messaging/jms/client.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Mar 31 06:51:19 EDT 2009
Author: timfox
Date: 2009-03-31 06:51:19 -0400 (Tue, 31 Mar 2009)
New Revision: 6231
Modified:
trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java
Log:
fixed message producer test
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java 2009-03-31 10:22:59 UTC (rev 6230)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java 2009-03-31 10:51:19 UTC (rev 6231)
@@ -98,9 +98,9 @@
public static final int TYPE_TOPIC_SESSION = 2;
public static final int SERVER_ACKNOWLEDGE = 4;
-
+
private static SimpleString REJECTING_FILTER = new SimpleString("_JBMX=-1");
-
+
// Static --------------------------------------------------------
private static final Logger log = Logger.getLogger(JBossSession.class);
@@ -123,7 +123,6 @@
private final Set<JBossMessageConsumer> consumers = new HashSet<JBossMessageConsumer>();
-
// Constructors --------------------------------------------------
public JBossSession(final JBossConnection connection,
@@ -328,10 +327,32 @@
throw new InvalidDestinationException("Not a JBoss Destination:" + destination);
}
- JBossDestination jbd = (JBossDestination) destination;
-
try
{
+ JBossDestination jbd = (JBossDestination)destination;
+
+ if (jbd != null)
+ {
+ if (jbd instanceof Queue)
+ {
+ SessionQueueQueryResponseMessage response = session.queueQuery(jbd.getSimpleAddress());
+
+ if (!response.isExists())
+ {
+ throw new InvalidDestinationException("Queue " + jbd.getName() + " does not exist");
+ }
+ }
+ else
+ {
+ SessionBindingQueryResponseMessage response = session.bindingQuery(jbd.getSimpleAddress());
+
+ if (!response.isExists())
+ {
+ throw new InvalidDestinationException("Topic " + jbd.getName() + " does not exist");
+ }
+ }
+ }
+
ClientProducer producer = session.createProducer(jbd == null ? null : jbd.getSimpleAddress());
return new JBossMessageProducer(connection, producer, jbd, session);
@@ -366,7 +387,7 @@
throw new InvalidDestinationException("Not a JBossDestination:" + destination);
}
- JBossDestination jbdest = (JBossDestination) destination;
+ JBossDestination jbdest = (JBossDestination)destination;
JBossMessageConsumer consumer = createConsumer(jbdest, null, messageSelector, noLocal);
@@ -459,7 +480,7 @@
messageSelector = null;
}
- JBossDestination jbdest = (JBossDestination) topic;
+ JBossDestination jbdest = (JBossDestination)topic;
return createConsumer(jbdest, name, messageSelector, noLocal);
}
@@ -537,7 +558,7 @@
else
{
// Durable sub
-
+
if (connection.getClientID() == null)
{
throw new InvalidClientIDException("Cannot create durable subscription - client ID has not been set");
@@ -550,7 +571,7 @@
queueName = new SimpleString(JBossTopic.createQueueNameForDurableSubscription(connection.getClientID(),
subscriptionName));
-
+
SessionQueueQueryResponseMessage subResponse = session.queueQuery(queueName);
if (!subResponse.isExists())
@@ -580,9 +601,9 @@
(oldFilterString != null && coreFilterString != null && !oldFilterString.equals(coreFilterString));
SimpleString oldTopicName = subResponse.getAddress();
-
+
boolean topicChanged = !oldTopicName.equals(dest.getSimpleAddress());
-
+
if (selectorChanged || topicChanged)
{
// Delete the old durable sub
@@ -640,19 +661,21 @@
}
// eager test of the filter syntax as required by JMS spec
- try {
+ try
+ {
FilterImpl.createFilter(filterString);
- } catch (MessagingException e)
+ }
+ catch (MessagingException e)
{
throw JMSExceptionHelper.convertFromMessagingException(e);
}
-
- JBossQueue jbq = (JBossQueue) queue;
+ JBossQueue jbq = (JBossQueue)queue;
+
try
{
- SessionBindingQueryResponseMessage message = session.bindingQuery(new SimpleString(jbq.getAddress()));
- if(!message.isExists())
+ SessionBindingQueryResponseMessage message = session.bindingQuery(new SimpleString(jbq.getAddress()));
+ if (!message.isExists())
{
throw new InvalidDestinationException(jbq.getAddress() + " does not exist");
}
@@ -709,14 +732,16 @@
JBossTemporaryTopic topic = new JBossTemporaryTopic(this, topicName);
SimpleString simpleAddress = topic.getSimpleAddress();
-
- //We create a dummy subscription on the topic, that never receives messages - this is so we can perform JMS checks when routing messages to a topic that
- //does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no subscriptions - core has no notion of a topic
-
+
+ // We create a dummy subscription on the topic that never receives messages. This lets us perform JMS
+ // checks when routing messages to a topic that does not exist; otherwise we would not be able to
+ // distinguish between a non-existent topic and one with no subscriptions, since core has no notion
+ // of a topic.
+
session.createQueue(simpleAddress, simpleAddress, REJECTING_FILTER, false);
connection.addTemporaryQueue(simpleAddress);
-
+
return topic;
}
catch (MessagingException e)
@@ -724,7 +749,7 @@
throw JMSExceptionHelper.convertFromMessagingException(e);
}
}
-
+
public void unsubscribe(final String name) throws JMSException
{
// As per spec. section 4.11
@@ -781,48 +806,48 @@
public QueueReceiver createReceiver(final Queue queue, final String messageSelector) throws JMSException
{
- return (QueueReceiver) createConsumer(queue, messageSelector);
+ return (QueueReceiver)createConsumer(queue, messageSelector);
}
public QueueReceiver createReceiver(final Queue queue) throws JMSException
{
- return (QueueReceiver) createConsumer(queue);
+ return (QueueReceiver)createConsumer(queue);
}
public QueueSender createSender(final Queue queue) throws JMSException
{
- return (QueueSender) createProducer(queue);
+ return (QueueSender)createProducer(queue);
}
// XAQueueSession implementation
public QueueSession getQueueSession() throws JMSException
{
- return (QueueSession) getSession();
+ return (QueueSession)getSession();
}
// TopicSession implementation
public TopicPublisher createPublisher(final Topic topic) throws JMSException
{
- return (TopicPublisher) createProducer(topic);
+ return (TopicPublisher)createProducer(topic);
}
public TopicSubscriber createSubscriber(final Topic topic, final String messageSelector, final boolean noLocal) throws JMSException
{
- return (TopicSubscriber) createConsumer(topic, messageSelector, noLocal);
+ return (TopicSubscriber)createConsumer(topic, messageSelector, noLocal);
}
public TopicSubscriber createSubscriber(final Topic topic) throws JMSException
{
- return (TopicSubscriber) createConsumer(topic);
+ return (TopicSubscriber)createConsumer(topic);
}
// XATopicSession implementation
public TopicSession getTopicSession() throws JMSException
{
- return (TopicSession) getSession();
+ return (TopicSession)getSession();
}
// Public --------------------------------------------------------
@@ -848,27 +873,27 @@
}
public void deleteTemporaryTopic(final JBossTemporaryTopic tempTopic) throws JMSException
- {
+ {
try
{
SessionBindingQueryResponseMessage response = session.bindingQuery(tempTopic.getSimpleAddress());
-
+
if (!response.isExists())
{
throw new InvalidDestinationException("Cannot delete temporary topic " + tempTopic.getName() +
" does not exist");
}
-
+
if (response.getQueueNames().size() > 1)
{
throw new IllegalStateException("Cannot delete temporary topic " + tempTopic.getName() +
" since it has subscribers");
}
-
+
SimpleString address = tempTopic.getSimpleAddress();
session.deleteQueue(address);
-
+
connection.removeTemporaryQueue(address);
}
catch (MessagingException e)
More information about the jboss-cvs-commits
mailing list