[jboss-cvs] JBoss Messaging SVN: r6231 - trunk/src/main/org/jboss/messaging/jms/client.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Mar 31 06:51:19 EDT 2009


Author: timfox
Date: 2009-03-31 06:51:19 -0400 (Tue, 31 Mar 2009)
New Revision: 6231

Modified:
   trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java
Log:
fixed message producer test

Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java	2009-03-31 10:22:59 UTC (rev 6230)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java	2009-03-31 10:51:19 UTC (rev 6231)
@@ -98,9 +98,9 @@
    public static final int TYPE_TOPIC_SESSION = 2;
 
    public static final int SERVER_ACKNOWLEDGE = 4;
-   
+
    private static SimpleString REJECTING_FILTER = new SimpleString("_JBMX=-1");
-   
+
    // Static --------------------------------------------------------
 
    private static final Logger log = Logger.getLogger(JBossSession.class);
@@ -123,7 +123,6 @@
 
    private final Set<JBossMessageConsumer> consumers = new HashSet<JBossMessageConsumer>();
 
-
    // Constructors --------------------------------------------------
 
    public JBossSession(final JBossConnection connection,
@@ -328,10 +327,32 @@
          throw new InvalidDestinationException("Not a JBoss Destination:" + destination);
       }
 
-      JBossDestination jbd = (JBossDestination) destination;
-
       try
       {
+         JBossDestination jbd = (JBossDestination)destination;
+
+         if (jbd != null)
+         {
+            if (jbd instanceof Queue)
+            {
+               SessionQueueQueryResponseMessage response = session.queueQuery(jbd.getSimpleAddress());
+   
+               if (!response.isExists())
+               {
+                  throw new InvalidDestinationException("Queue " + jbd.getName() + " does not exist");
+               }
+            }
+            else
+            {
+               SessionBindingQueryResponseMessage response = session.bindingQuery(jbd.getSimpleAddress());
+   
+               if (!response.isExists())
+               {
+                  throw new InvalidDestinationException("Topic " + jbd.getName() + " does not exist");
+               }
+            }
+         }
+
          ClientProducer producer = session.createProducer(jbd == null ? null : jbd.getSimpleAddress());
 
          return new JBossMessageProducer(connection, producer, jbd, session);
@@ -366,7 +387,7 @@
          throw new InvalidDestinationException("Not a JBossDestination:" + destination);
       }
 
-      JBossDestination jbdest = (JBossDestination) destination;
+      JBossDestination jbdest = (JBossDestination)destination;
 
       JBossMessageConsumer consumer = createConsumer(jbdest, null, messageSelector, noLocal);
 
@@ -459,7 +480,7 @@
          messageSelector = null;
       }
 
-      JBossDestination jbdest = (JBossDestination) topic;
+      JBossDestination jbdest = (JBossDestination)topic;
 
       return createConsumer(jbdest, name, messageSelector, noLocal);
    }
@@ -537,7 +558,7 @@
             else
             {
                // Durable sub
-               
+
                if (connection.getClientID() == null)
                {
                   throw new InvalidClientIDException("Cannot create durable subscription - client ID has not been set");
@@ -550,7 +571,7 @@
 
                queueName = new SimpleString(JBossTopic.createQueueNameForDurableSubscription(connection.getClientID(),
                                                                                              subscriptionName));
-               
+
                SessionQueueQueryResponseMessage subResponse = session.queueQuery(queueName);
 
                if (!subResponse.isExists())
@@ -580,9 +601,9 @@
                                             (oldFilterString != null && coreFilterString != null && !oldFilterString.equals(coreFilterString));
 
                   SimpleString oldTopicName = subResponse.getAddress();
-                  
+
                   boolean topicChanged = !oldTopicName.equals(dest.getSimpleAddress());
-                  
+
                   if (selectorChanged || topicChanged)
                   {
                      // Delete the old durable sub
@@ -640,19 +661,21 @@
       }
 
       // eager test of the filter syntax as required by JMS spec
-      try {
+      try
+      {
          FilterImpl.createFilter(filterString);
-      } catch (MessagingException e)
+      }
+      catch (MessagingException e)
       {
          throw JMSExceptionHelper.convertFromMessagingException(e);
       }
-      
-      JBossQueue jbq = (JBossQueue) queue;
 
+      JBossQueue jbq = (JBossQueue)queue;
+
       try
       {
-         SessionBindingQueryResponseMessage message  = session.bindingQuery(new SimpleString(jbq.getAddress()));
-         if(!message.isExists())
+         SessionBindingQueryResponseMessage message = session.bindingQuery(new SimpleString(jbq.getAddress()));
+         if (!message.isExists())
          {
             throw new InvalidDestinationException(jbq.getAddress() + " does not exist");
          }
@@ -709,14 +732,16 @@
          JBossTemporaryTopic topic = new JBossTemporaryTopic(this, topicName);
 
          SimpleString simpleAddress = topic.getSimpleAddress();
-         
-         //We create a dummy subscription on the topic, that never receives messages - this is so we can perform JMS checks when routing messages to a topic that
-         //does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no subscriptions - core has no notion of a topic
-         
+
+         // We create a dummy subscription on the topic, that never receives messages - this is so we can perform JMS
+         // checks when routing messages to a topic that
+         // does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no
+         // subscriptions - core has no notion of a topic
+
          session.createQueue(simpleAddress, simpleAddress, REJECTING_FILTER, false);
 
          connection.addTemporaryQueue(simpleAddress);
-         
+
          return topic;
       }
       catch (MessagingException e)
@@ -724,7 +749,7 @@
          throw JMSExceptionHelper.convertFromMessagingException(e);
       }
    }
-   
+
    public void unsubscribe(final String name) throws JMSException
    {
       // As per spec. section 4.11
@@ -781,48 +806,48 @@
 
    public QueueReceiver createReceiver(final Queue queue, final String messageSelector) throws JMSException
    {
-      return (QueueReceiver) createConsumer(queue, messageSelector);
+      return (QueueReceiver)createConsumer(queue, messageSelector);
    }
 
    public QueueReceiver createReceiver(final Queue queue) throws JMSException
    {
-      return (QueueReceiver) createConsumer(queue);
+      return (QueueReceiver)createConsumer(queue);
    }
 
    public QueueSender createSender(final Queue queue) throws JMSException
    {
-      return (QueueSender) createProducer(queue);
+      return (QueueSender)createProducer(queue);
    }
 
    // XAQueueSession implementation
 
    public QueueSession getQueueSession() throws JMSException
    {
-      return (QueueSession) getSession();
+      return (QueueSession)getSession();
    }
 
    // TopicSession implementation
 
    public TopicPublisher createPublisher(final Topic topic) throws JMSException
    {
-      return (TopicPublisher) createProducer(topic);
+      return (TopicPublisher)createProducer(topic);
    }
 
    public TopicSubscriber createSubscriber(final Topic topic, final String messageSelector, final boolean noLocal) throws JMSException
    {
-      return (TopicSubscriber) createConsumer(topic, messageSelector, noLocal);
+      return (TopicSubscriber)createConsumer(topic, messageSelector, noLocal);
    }
 
    public TopicSubscriber createSubscriber(final Topic topic) throws JMSException
    {
-      return (TopicSubscriber) createConsumer(topic);
+      return (TopicSubscriber)createConsumer(topic);
    }
 
    // XATopicSession implementation
 
    public TopicSession getTopicSession() throws JMSException
    {
-      return (TopicSession) getSession();
+      return (TopicSession)getSession();
    }
 
    // Public --------------------------------------------------------
@@ -848,27 +873,27 @@
    }
 
    public void deleteTemporaryTopic(final JBossTemporaryTopic tempTopic) throws JMSException
-   { 
+   {
       try
       {
          SessionBindingQueryResponseMessage response = session.bindingQuery(tempTopic.getSimpleAddress());
-     
+
          if (!response.isExists())
          {
             throw new InvalidDestinationException("Cannot delete temporary topic " + tempTopic.getName() +
                                                   " does not exist");
          }
-                 
+
          if (response.getQueueNames().size() > 1)
          {
             throw new IllegalStateException("Cannot delete temporary topic " + tempTopic.getName() +
                                             " since it has subscribers");
          }
-         
+
          SimpleString address = tempTopic.getSimpleAddress();
 
          session.deleteQueue(address);
- 
+
          connection.removeTemporaryQueue(address);
       }
       catch (MessagingException e)




More information about the jboss-cvs-commits mailing list