[hornetq-commits] JBoss hornetq SVN: r8609 - in trunk: src/main/org/hornetq/core/client/impl and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Dec 7 11:31:22 EST 2009


Author: jmesnil
Date: 2009-12-07 11:31:21 -0500 (Mon, 07 Dec 2009)
New Revision: 8609

Modified:
   trunk/src/main/org/hornetq/core/client/ClientSession.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
   trunk/src/main/org/hornetq/jms/client/HornetQSession.java
   trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
   trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
   trunk/tests/src/org/hornetq/tests/integration/client/SessionTest.java
Log:
HORNETQ-185: review core API

* removed dependency from ClientSession API to wireformat classes
  by introducing interfaces ClientSession.BindingQuery & ClientSession.QueueQuery

Modified: trunk/src/main/org/hornetq/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/ClientSession.java	2009-12-07 15:48:53 UTC (rev 8608)
+++ trunk/src/main/org/hornetq/core/client/ClientSession.java	2009-12-07 16:31:21 UTC (rev 8609)
@@ -13,11 +13,12 @@
 
 package org.hornetq.core.client;
 
+
+import java.util.List;
+
 import javax.transaction.xa.XAResource;
 
 import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.hornetq.utils.SimpleString;
 
 /**
@@ -29,6 +30,28 @@
  */
 public interface ClientSession extends XAResource
 {
+   public interface BindingQuery
+   {
+      boolean isExists();
+
+      public List<SimpleString> getQueueNames();
+   }
+   
+   public interface QueueQuery
+   {
+      boolean isExists();
+
+      boolean isDurable();
+
+      int getConsumerCount();
+
+      int getMessageCount();
+
+      SimpleString getFilterString();
+
+      SimpleString getAddress();
+   }
+   
    // Lifecycle operations ------------------------------------------
 
    /**
@@ -399,9 +422,9 @@
 
    // Query operations ----------------------------------------------
 
-   SessionQueueQueryResponseMessage queueQuery(SimpleString queueName) throws HornetQException;
+   QueueQuery queueQuery(SimpleString queueName) throws HornetQException;
 
-   SessionBindingQueryResponseMessage bindingQuery(SimpleString address) throws HornetQException;
+   BindingQuery bindingQuery(SimpleString address) throws HornetQException;
 
    // Transaction operations ----------------------------------------
 

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-12-07 15:48:53 UTC (rev 8608)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-12-07 16:31:21 UTC (rev 8609)
@@ -15,6 +15,7 @@
 import static org.hornetq.core.exception.HornetQException.TRANSACTION_ROLLED_BACK;
 import static org.hornetq.utils.SimpleString.toSimpleString;
 
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -104,7 +105,7 @@
    private static final Logger log = Logger.getLogger(ClientSessionImpl.class);
 
    private final boolean trace = log.isTraceEnabled();
-
+   
    // Attributes ----------------------------------------------------------------------------
 
    private final FailoverManager failoverManager;
@@ -323,7 +324,7 @@
       deleteQueue(toSimpleString(queueName));
    }
 
-   public SessionQueueQueryResponseMessage queueQuery(final SimpleString queueName) throws HornetQException
+   public QueueQuery queueQuery(final SimpleString queueName) throws HornetQException
    {
       checkClosed();
 
@@ -331,10 +332,10 @@
 
       SessionQueueQueryResponseMessage response = (SessionQueueQueryResponseMessage)channel.sendBlocking(request);
 
-      return response;
-   }
+      return new QueueQueryImpl(response.isDurable(), response.getConsumerCount(), response.getMessageCount(), response.getFilterString(), response.getAddress(), response.isExists());
+      }
 
-   public SessionBindingQueryResponseMessage bindingQuery(final SimpleString address) throws HornetQException
+   public BindingQuery bindingQuery(final SimpleString address) throws HornetQException
    {
       checkClosed();
 
@@ -342,7 +343,7 @@
 
       SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage)channel.sendBlocking(request);
 
-      return response;
+      return new BindingQueryImpl(response.isExists(), response.getQueueNames());
    }
 
    public void forceDelivery(long consumerID, long sequence) throws HornetQException
@@ -1588,4 +1589,84 @@
          consumer.flushAcks();
       }
    }
+   
+   private static class BindingQueryImpl implements BindingQuery
+   {
+
+      private final boolean exists;
+      private final ArrayList<SimpleString> queueNames;
+
+      public BindingQueryImpl(final boolean exists, List<SimpleString> queueNames)
+      {
+         this.exists = exists;
+         this.queueNames = new ArrayList<SimpleString>(queueNames);
+      }
+
+      public List<SimpleString> getQueueNames()
+      {
+         return queueNames;
+      }
+
+      public boolean isExists()
+      {
+         return exists;
+      }
+   }
+   
+   private static class QueueQueryImpl implements QueueQuery
+   {
+
+      private final boolean exists;
+      private final boolean durable;
+      private final int messageCount;
+      private final SimpleString filterString;
+      private final int consumerCount;
+      private final SimpleString address;
+
+      public QueueQueryImpl(final boolean durable,
+                            final int consumerCount,
+                            final int messageCount,
+                            final SimpleString filterString,
+                            final SimpleString address,
+                            final boolean exists)
+      {
+
+         this.durable = durable;
+         this.consumerCount = consumerCount;
+         this.messageCount = messageCount;
+         this.filterString = filterString;
+         this.address = address;
+         this.exists = exists;
+      }
+      public SimpleString getAddress()
+      {
+         return address;
+      }
+
+      public int getConsumerCount()
+      {
+         return consumerCount;
+      }
+
+      public SimpleString getFilterString()
+      {
+         return filterString;
+      }
+
+      public int getMessageCount()
+      {
+         return messageCount;
+      }
+
+      public boolean isDurable()
+      {
+         return durable;
+      }
+
+      public boolean isExists()
+      {
+         return exists;
+      }
+      
+   }
 }

Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2009-12-07 15:48:53 UTC (rev 8608)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2009-12-07 16:31:21 UTC (rev 8609)
@@ -27,8 +27,6 @@
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionReceiveLargeMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionReceiveMessage;
@@ -115,7 +113,7 @@
       session.addProducer(producer);
    }
 
-   public SessionBindingQueryResponseMessage bindingQuery(SimpleString address) throws HornetQException
+   public BindingQuery bindingQuery(SimpleString address) throws HornetQException
    {
       return session.bindingQuery(address);
    }
@@ -403,7 +401,7 @@
       return session.prepare(xid);
    }
 
-   public SessionQueueQueryResponseMessage queueQuery(SimpleString queueName) throws HornetQException
+   public QueueQuery queueQuery(SimpleString queueName) throws HornetQException
    {
       return session.queueQuery(queueName);
    }

Modified: trunk/src/main/org/hornetq/jms/client/HornetQSession.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQSession.java	2009-12-07 15:48:53 UTC (rev 8608)
+++ trunk/src/main/org/hornetq/jms/client/HornetQSession.java	2009-12-07 16:31:21 UTC (rev 8609)
@@ -54,11 +54,11 @@
 import org.hornetq.core.client.ClientConsumer;
 import org.hornetq.core.client.ClientProducer;
 import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSession.BindingQuery;
+import org.hornetq.core.client.ClientSession.QueueQuery;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.filter.impl.FilterImpl;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.hornetq.jms.HornetQDestination;
 import org.hornetq.jms.HornetQQueue;
 import org.hornetq.jms.HornetQTemporaryQueue;
@@ -331,7 +331,7 @@
          {
             if (jbd instanceof Queue)
             {
-               SessionQueueQueryResponseMessage response = session.queueQuery(jbd.getSimpleAddress());
+               QueueQuery response = session.queueQuery(jbd.getSimpleAddress());
 
                if (!response.isExists())
                {
@@ -340,7 +340,7 @@
             }
             else
             {
-               SessionBindingQueryResponseMessage response = session.bindingQuery(jbd.getSimpleAddress());
+               BindingQuery response = session.bindingQuery(jbd.getSimpleAddress());
 
                if (!response.isExists())
                {
@@ -408,7 +408,7 @@
 
       try
       {
-         SessionQueueQueryResponseMessage response = session.queueQuery(queue.getSimpleAddress());
+         QueueQuery response = session.queueQuery(queue.getSimpleAddress());
 
          if (!response.isExists())
          {
@@ -437,9 +437,9 @@
 
       try
       {
-         SessionBindingQueryResponseMessage response = session.bindingQuery(topic.getSimpleAddress());
+         BindingQuery query = session.bindingQuery(topic.getSimpleAddress());
 
-         if (!response.isExists())
+         if (!query.isExists())
          {
             throw new JMSException("There is no topic with name " + topicName);
          }
@@ -527,7 +527,7 @@
 
          if (dest instanceof Queue)
          {
-            SessionQueueQueryResponseMessage response = session.queueQuery(dest.getSimpleAddress());
+            QueueQuery response = session.queueQuery(dest.getSimpleAddress());
 
             if (!response.isExists())
             {
@@ -538,7 +538,7 @@
          }
          else
          {
-            SessionBindingQueryResponseMessage response = session.bindingQuery(dest.getSimpleAddress());
+            BindingQuery response = session.bindingQuery(dest.getSimpleAddress());
 
             if (!response.isExists())
             {
@@ -576,7 +576,7 @@
                queueName = new SimpleString(HornetQTopic.createQueueNameForDurableSubscription(connection.getClientID(),
                                                                                                subscriptionName));
 
-               SessionQueueQueryResponseMessage subResponse = session.queueQuery(queueName);
+               QueueQuery subResponse = session.queueQuery(queueName);
 
                if (!subResponse.isExists())
                {
@@ -678,7 +678,7 @@
 
       try
       {
-         SessionBindingQueryResponseMessage message = session.bindingQuery(new SimpleString(jbq.getAddress()));
+         BindingQuery message = session.bindingQuery(new SimpleString(jbq.getAddress()));
          if (!message.isExists())
          {
             throw new InvalidDestinationException(jbq.getAddress() + " does not exist");
@@ -767,7 +767,7 @@
 
       try
       {
-         SessionQueueQueryResponseMessage response = session.queueQuery(queueName);
+         QueueQuery response = session.queueQuery(queueName);
 
          if (!response.isExists())
          {
@@ -880,7 +880,7 @@
    {
       try
       {
-         SessionBindingQueryResponseMessage response = session.bindingQuery(tempTopic.getSimpleAddress());
+         BindingQuery response = session.bindingQuery(tempTopic.getSimpleAddress());
 
          if (!response.isExists())
          {
@@ -910,7 +910,7 @@
    {
       try
       {
-         SessionQueueQueryResponseMessage response = session.queueQuery(tempQueue.getSimpleAddress());
+         QueueQuery response = session.queueQuery(tempQueue.getSimpleAddress());
 
          if (!response.isExists())
          {

Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java	2009-12-07 15:48:53 UTC (rev 8608)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java	2009-12-07 16:31:21 UTC (rev 8609)
@@ -24,9 +24,9 @@
 import org.hornetq.core.client.ClientMessage;
 import org.hornetq.core.client.ClientSession;
 import org.hornetq.core.client.MessageHandler;
+import org.hornetq.core.client.ClientSession.QueueQuery;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.hornetq.jms.HornetQTopic;
 import org.hornetq.jms.client.HornetQMessage;
 import org.hornetq.utils.SimpleString;
@@ -99,7 +99,7 @@
                                                                                                                 .getClientID(),
                                                                                                       subscriptionName));
 
-         SessionQueueQueryResponseMessage subResponse = session.queueQuery(queueName);
+         QueueQuery subResponse = session.queueQuery(queueName);
 
          if (!subResponse.isExists())
          {

Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java	2009-12-07 15:48:53 UTC (rev 8608)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java	2009-12-07 16:31:21 UTC (rev 8609)
@@ -43,8 +43,6 @@
 import org.hornetq.core.client.impl.ClientMessageImpl;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.hornetq.jms.client.HornetQBytesMessage;
 import org.hornetq.jms.client.HornetQMapMessage;
 import org.hornetq.jms.client.HornetQMessage;
@@ -1003,12 +1001,12 @@
          return null;
       }
 
-      public SessionQueueQueryResponseMessage queueQuery(SimpleString queueName) throws HornetQException
+      public QueueQuery queueQuery(SimpleString queueName) throws HornetQException
       {
          return null;
       }
 
-      public SessionBindingQueryResponseMessage bindingQuery(SimpleString address) throws HornetQException
+      public BindingQuery bindingQuery(SimpleString address) throws HornetQException
       {
          return null;
       }

Modified: trunk/tests/src/org/hornetq/tests/integration/client/SessionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/SessionTest.java	2009-12-07 15:48:53 UTC (rev 8608)
+++ trunk/tests/src/org/hornetq/tests/integration/client/SessionTest.java	2009-12-07 16:31:21 UTC (rev 8609)
@@ -22,11 +22,11 @@
 import org.hornetq.core.client.ClientSession;
 import org.hornetq.core.client.ClientSessionFactory;
 import org.hornetq.core.client.SessionFailureListener;
+import org.hornetq.core.client.ClientSession.BindingQuery;
+import org.hornetq.core.client.ClientSession.QueueQuery;
 import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
 import org.hornetq.tests.util.ServiceTestBase;
@@ -186,7 +186,7 @@
          clientSession.createQueue("a2", "q3", false);
          clientSession.createQueue("a2", "q4", false);
          clientSession.createQueue("a2", "q5", false);
-         SessionBindingQueryResponseMessage resp = clientSession.bindingQuery(new SimpleString("a"));
+         BindingQuery resp = clientSession.bindingQuery(new SimpleString("a"));
          List<SimpleString> queues = resp.getQueueNames();
          assertTrue(queues.isEmpty());
          resp = clientSession.bindingQuery(new SimpleString("a1"));
@@ -225,7 +225,7 @@
          ClientProducer cp = clientSession.createProducer("a1");
          cp.send(clientSession.createClientMessage(false));
          cp.send(clientSession.createClientMessage(false));
-         SessionQueueQueryResponseMessage resp = clientSession.queueQuery(new SimpleString(queueName));
+         QueueQuery resp = clientSession.queueQuery(new SimpleString(queueName));
          assertEquals(new SimpleString("a1"), resp.getAddress());
          assertEquals(2, resp.getConsumerCount());
          assertEquals(2, resp.getMessageCount());
@@ -252,7 +252,7 @@
          clientSession.createQueue("a1", queueName, "foo=bar", false);
          clientSession.createConsumer(queueName);
          clientSession.createConsumer(queueName);
-         SessionQueueQueryResponseMessage resp = clientSession.queueQuery(new SimpleString(queueName));
+         QueueQuery resp = clientSession.queueQuery(new SimpleString(queueName));
          assertEquals(new SimpleString("a1"), resp.getAddress());
          assertEquals(2, resp.getConsumerCount());
          assertEquals(0, resp.getMessageCount());
@@ -276,7 +276,7 @@
          server.start();
          ClientSessionFactory cf = createInVMFactory();
          ClientSession clientSession = cf.createSession(false, true, true);
-         SessionQueueQueryResponseMessage resp = clientSession.queueQuery(new SimpleString(queueName));
+         QueueQuery resp = clientSession.queueQuery(new SimpleString(queueName));
          assertFalse(resp.isExists());
          assertEquals(null, resp.getAddress());
          clientSession.close();



More information about the hornetq-commits mailing list