Author: jmesnil
Date: 2009-12-07 11:31:21 -0500 (Mon, 07 Dec 2009)
New Revision: 8609
Modified:
trunk/src/main/org/hornetq/core/client/ClientSession.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
trunk/src/main/org/hornetq/jms/client/HornetQSession.java
trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
trunk/tests/src/org/hornetq/tests/integration/client/SessionTest.java
Log:
HORNETQ-185: review core API
* removed dependency from ClientSession API to wireformat classes
by introducing interfaces ClientSession.BindingQuery & ClientSession.QueueQuery
Modified: trunk/src/main/org/hornetq/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/ClientSession.java 2009-12-07 15:48:53 UTC (rev 8608)
+++ trunk/src/main/org/hornetq/core/client/ClientSession.java 2009-12-07 16:31:21 UTC (rev 8609)
@@ -13,11 +13,12 @@
package org.hornetq.core.client;
+
+import java.util.List;
+
import javax.transaction.xa.XAResource;
import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.utils.SimpleString;
/**
@@ -29,6 +30,28 @@
*/
public interface ClientSession extends XAResource
{
+ public interface BindingQuery // Result type of bindingQuery(SimpleString address); introduced so this API no longer exposes wireformat response classes.
+ {
+ boolean isExists(); // NOTE(review): presumably whether the queried address exists -- confirm against the server side.
+ /**
+  * Returns the queue names carried by the query result.
+  * NOTE(review): presumably the queues bound to the queried address -- confirm.
+  */
+ public List<SimpleString> getQueueNames();
+ }
+ /**
+  * Result type of queueQuery(SimpleString queueName), exposed as an interface so that
+  * this API does not depend on the wireformat response classes.
+  */
+ public interface QueueQuery
+ {
+ boolean isExists(); // NOTE(review): presumably whether the queried queue exists -- confirm against the server side.
+ /** Returns whether the queue is reported as durable. */
+ boolean isDurable();
+ /** Returns the consumer count reported for the queue. */
+ int getConsumerCount();
+ /** Returns the message count reported for the queue. */
+ int getMessageCount();
+ /** Returns the filter string reported for the queue. NOTE(review): possibly null when no filter is set -- confirm. */
+ SimpleString getFilterString();
+ /** Returns the address reported for the queue; SessionTest expects null here when isExists() is false. */
+ SimpleString getAddress();
+ }
+
// Lifecycle operations ------------------------------------------
/**
@@ -399,9 +422,9 @@
// Query operations ----------------------------------------------
- SessionQueueQueryResponseMessage queueQuery(SimpleString queueName) throws HornetQException;
+ QueueQuery queueQuery(SimpleString queueName) throws HornetQException;
- SessionBindingQueryResponseMessage bindingQuery(SimpleString address) throws HornetQException;
+ BindingQuery bindingQuery(SimpleString address) throws HornetQException;
// Transaction operations ----------------------------------------
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-07 15:48:53
UTC (rev 8608)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-07 16:31:21
UTC (rev 8609)
@@ -15,6 +15,7 @@
import static org.hornetq.core.exception.HornetQException.TRANSACTION_ROLLED_BACK;
import static org.hornetq.utils.SimpleString.toSimpleString;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -104,7 +105,7 @@
private static final Logger log = Logger.getLogger(ClientSessionImpl.class);
private final boolean trace = log.isTraceEnabled();
-
+
// Attributes ----------------------------------------------------------------------------
private final FailoverManager failoverManager;
@@ -323,7 +324,7 @@
deleteQueue(toSimpleString(queueName));
}
- public SessionQueueQueryResponseMessage queueQuery(final SimpleString queueName)
throws HornetQException
+ public QueueQuery queueQuery(final SimpleString queueName) throws HornetQException
{
checkClosed();
@@ -331,10 +332,10 @@
SessionQueueQueryResponseMessage response =
(SessionQueueQueryResponseMessage)channel.sendBlocking(request);
- return response;
- }
+ return new QueueQueryImpl(response.isDurable(), response.getConsumerCount(), response.getMessageCount(), response.getFilterString(), response.getAddress(), response.isExists());
+ }
- public SessionBindingQueryResponseMessage bindingQuery(final SimpleString address)
throws HornetQException
+ public BindingQuery bindingQuery(final SimpleString address) throws HornetQException
{
checkClosed();
@@ -342,7 +343,7 @@
SessionBindingQueryResponseMessage response =
(SessionBindingQueryResponseMessage)channel.sendBlocking(request);
- return response;
+ return new BindingQueryImpl(response.isExists(), response.getQueueNames());
}
public void forceDelivery(long consumerID, long sequence) throws HornetQException
@@ -1588,4 +1589,84 @@
consumer.flushAcks();
}
}
+ /**
+  * Value holder implementing BindingQuery. bindingQuery() builds one from the
+  * isExists() and getQueueNames() values of the SessionBindingQueryResponseMessage,
+  * so the wireformat message itself is no longer returned to callers.
+  */
+ private static class BindingQueryImpl implements BindingQuery
+ {
+ // Both fields are final and assigned once in the constructor.
+ private final boolean exists;
+ private final ArrayList<SimpleString> queueNames;
+ /**
+  * @param exists value to be reported by isExists()
+  * @param queueNames names to expose; copied into a new ArrayList, so it must not be
+  *        null and later changes to the argument are not reflected here
+  */
+ public BindingQueryImpl(final boolean exists, List<SimpleString> queueNames)
+ {
+ this.exists = exists;
+ this.queueNames = new ArrayList<SimpleString>(queueNames);
+ }
+ /**
+  * Returns the stored queue names.
+  * NOTE(review): this returns the internal ArrayList itself, not a copy or an
+  * unmodifiable view, so a caller can modify this object's state through it.
+  */
+ public List<SimpleString> getQueueNames()
+ {
+ return queueNames;
+ }
+ /** Returns the exists flag passed to the constructor. */
+ public boolean isExists()
+ {
+ return exists;
+ }
+ }
+ /**
+  * Value holder implementing QueueQuery. queueQuery() builds one from the fields of
+  * the SessionQueueQueryResponseMessage, so the wireformat message itself is no
+  * longer returned to callers.
+  */
+ private static class QueueQueryImpl implements QueueQuery
+ {
+ // All fields are final and assigned once in the constructor; no setters exist.
+ private final boolean exists;
+ private final boolean durable;
+ private final int messageCount;
+ private final SimpleString filterString;
+ private final int consumerCount;
+ private final SimpleString address;
+ /**
+  * Stores the given values unchanged. Note that the parameter order differs from
+  * the field declaration order, and that exists comes last.
+  *
+  * @param durable value returned by isDurable()
+  * @param consumerCount value returned by getConsumerCount()
+  * @param messageCount value returned by getMessageCount()
+  * @param filterString value returned by getFilterString(); stored without a null check
+  * @param address value returned by getAddress(); stored without a null check
+  * @param exists value returned by isExists()
+  */
+ public QueueQueryImpl(final boolean durable,
+ final int consumerCount,
+ final int messageCount,
+ final SimpleString filterString,
+ final SimpleString address,
+ final boolean exists)
+ {
+
+ this.durable = durable;
+ this.consumerCount = consumerCount;
+ this.messageCount = messageCount;
+ this.filterString = filterString;
+ this.address = address;
+ this.exists = exists;
+ }
+ public SimpleString getAddress() // returns the address passed to the constructor
+ {
+ return address;
+ }
+ /** Returns the consumer count passed to the constructor. */
+ public int getConsumerCount()
+ {
+ return consumerCount;
+ }
+ /** Returns the filter string passed to the constructor. */
+ public SimpleString getFilterString()
+ {
+ return filterString;
+ }
+ /** Returns the message count passed to the constructor. */
+ public int getMessageCount()
+ {
+ return messageCount;
+ }
+ /** Returns the durable flag passed to the constructor. */
+ public boolean isDurable()
+ {
+ return durable;
+ }
+ /** Returns the exists flag passed to the constructor. */
+ public boolean isExists()
+ {
+ return exists;
+ }
+
+ }
}
Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2009-12-07 15:48:53
UTC (rev 8608)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2009-12-07 16:31:21
UTC (rev 8609)
@@ -27,8 +27,6 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionReceiveLargeMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionReceiveMessage;
@@ -115,7 +113,7 @@
session.addProducer(producer);
}
- public SessionBindingQueryResponseMessage bindingQuery(SimpleString address) throws
HornetQException
+ public BindingQuery bindingQuery(SimpleString address) throws HornetQException
{
return session.bindingQuery(address);
}
@@ -403,7 +401,7 @@
return session.prepare(xid);
}
- public SessionQueueQueryResponseMessage queueQuery(SimpleString queueName) throws
HornetQException
+ public QueueQuery queueQuery(SimpleString queueName) throws HornetQException
{
return session.queueQuery(queueName);
}
Modified: trunk/src/main/org/hornetq/jms/client/HornetQSession.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQSession.java 2009-12-07 15:48:53 UTC (rev
8608)
+++ trunk/src/main/org/hornetq/jms/client/HornetQSession.java 2009-12-07 16:31:21 UTC (rev
8609)
@@ -54,11 +54,11 @@
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientProducer;
import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSession.BindingQuery;
+import org.hornetq.core.client.ClientSession.QueueQuery;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.impl.FilterImpl;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.jms.HornetQDestination;
import org.hornetq.jms.HornetQQueue;
import org.hornetq.jms.HornetQTemporaryQueue;
@@ -331,7 +331,7 @@
{
if (jbd instanceof Queue)
{
- SessionQueueQueryResponseMessage response =
session.queueQuery(jbd.getSimpleAddress());
+ QueueQuery response = session.queueQuery(jbd.getSimpleAddress());
if (!response.isExists())
{
@@ -340,7 +340,7 @@
}
else
{
- SessionBindingQueryResponseMessage response =
session.bindingQuery(jbd.getSimpleAddress());
+ BindingQuery response = session.bindingQuery(jbd.getSimpleAddress());
if (!response.isExists())
{
@@ -408,7 +408,7 @@
try
{
- SessionQueueQueryResponseMessage response =
session.queueQuery(queue.getSimpleAddress());
+ QueueQuery response = session.queueQuery(queue.getSimpleAddress());
if (!response.isExists())
{
@@ -437,9 +437,9 @@
try
{
- SessionBindingQueryResponseMessage response =
session.bindingQuery(topic.getSimpleAddress());
+ BindingQuery query = session.bindingQuery(topic.getSimpleAddress());
- if (!response.isExists())
+ if (!query.isExists())
{
throw new JMSException("There is no topic with name " +
topicName);
}
@@ -527,7 +527,7 @@
if (dest instanceof Queue)
{
- SessionQueueQueryResponseMessage response =
session.queueQuery(dest.getSimpleAddress());
+ QueueQuery response = session.queueQuery(dest.getSimpleAddress());
if (!response.isExists())
{
@@ -538,7 +538,7 @@
}
else
{
- SessionBindingQueryResponseMessage response =
session.bindingQuery(dest.getSimpleAddress());
+ BindingQuery response = session.bindingQuery(dest.getSimpleAddress());
if (!response.isExists())
{
@@ -576,7 +576,7 @@
queueName = new
SimpleString(HornetQTopic.createQueueNameForDurableSubscription(connection.getClientID(),
subscriptionName));
- SessionQueueQueryResponseMessage subResponse =
session.queueQuery(queueName);
+ QueueQuery subResponse = session.queueQuery(queueName);
if (!subResponse.isExists())
{
@@ -678,7 +678,7 @@
try
{
- SessionBindingQueryResponseMessage message = session.bindingQuery(new
SimpleString(jbq.getAddress()));
+ BindingQuery message = session.bindingQuery(new
SimpleString(jbq.getAddress()));
if (!message.isExists())
{
throw new InvalidDestinationException(jbq.getAddress() + " does not
exist");
@@ -767,7 +767,7 @@
try
{
- SessionQueueQueryResponseMessage response = session.queueQuery(queueName);
+ QueueQuery response = session.queueQuery(queueName);
if (!response.isExists())
{
@@ -880,7 +880,7 @@
{
try
{
- SessionBindingQueryResponseMessage response =
session.bindingQuery(tempTopic.getSimpleAddress());
+ BindingQuery response = session.bindingQuery(tempTopic.getSimpleAddress());
if (!response.isExists())
{
@@ -910,7 +910,7 @@
{
try
{
- SessionQueueQueryResponseMessage response =
session.queueQuery(tempQueue.getSimpleAddress());
+ QueueQuery response = session.queueQuery(tempQueue.getSimpleAddress());
if (!response.isExists())
{
Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2009-12-07 15:48:53
UTC (rev 8608)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2009-12-07 16:31:21
UTC (rev 8609)
@@ -24,9 +24,9 @@
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.client.MessageHandler;
+import org.hornetq.core.client.ClientSession.QueueQuery;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.jms.HornetQTopic;
import org.hornetq.jms.client.HornetQMessage;
import org.hornetq.utils.SimpleString;
@@ -99,7 +99,7 @@
.getClientID(),
subscriptionName));
- SessionQueueQueryResponseMessage subResponse = session.queueQuery(queueName);
+ QueueQuery subResponse = session.queueQuery(queueName);
if (!subResponse.isExists())
{
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
===================================================================
---
trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java 2009-12-07
15:48:53 UTC (rev 8608)
+++
trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java 2009-12-07
16:31:21 UTC (rev 8609)
@@ -43,8 +43,6 @@
import org.hornetq.core.client.impl.ClientMessageImpl;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.jms.client.HornetQBytesMessage;
import org.hornetq.jms.client.HornetQMapMessage;
import org.hornetq.jms.client.HornetQMessage;
@@ -1003,12 +1001,12 @@
return null;
}
- public SessionQueueQueryResponseMessage queueQuery(SimpleString queueName) throws
HornetQException
+ public QueueQuery queueQuery(SimpleString queueName) throws HornetQException
{
return null;
}
- public SessionBindingQueryResponseMessage bindingQuery(SimpleString address) throws
HornetQException
+ public BindingQuery bindingQuery(SimpleString address) throws HornetQException
{
return null;
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/SessionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/SessionTest.java 2009-12-07
15:48:53 UTC (rev 8608)
+++ trunk/tests/src/org/hornetq/tests/integration/client/SessionTest.java 2009-12-07
16:31:21 UTC (rev 8609)
@@ -22,11 +22,11 @@
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.client.ClientSessionFactory;
import org.hornetq.core.client.SessionFailureListener;
+import org.hornetq.core.client.ClientSession.BindingQuery;
+import org.hornetq.core.client.ClientSession.QueueQuery;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.tests.util.ServiceTestBase;
@@ -186,7 +186,7 @@
clientSession.createQueue("a2", "q3", false);
clientSession.createQueue("a2", "q4", false);
clientSession.createQueue("a2", "q5", false);
- SessionBindingQueryResponseMessage resp = clientSession.bindingQuery(new
SimpleString("a"));
+ BindingQuery resp = clientSession.bindingQuery(new
SimpleString("a"));
List<SimpleString> queues = resp.getQueueNames();
assertTrue(queues.isEmpty());
resp = clientSession.bindingQuery(new SimpleString("a1"));
@@ -225,7 +225,7 @@
ClientProducer cp = clientSession.createProducer("a1");
cp.send(clientSession.createClientMessage(false));
cp.send(clientSession.createClientMessage(false));
- SessionQueueQueryResponseMessage resp = clientSession.queueQuery(new
SimpleString(queueName));
+ QueueQuery resp = clientSession.queueQuery(new SimpleString(queueName));
assertEquals(new SimpleString("a1"), resp.getAddress());
assertEquals(2, resp.getConsumerCount());
assertEquals(2, resp.getMessageCount());
@@ -252,7 +252,7 @@
clientSession.createQueue("a1", queueName, "foo=bar",
false);
clientSession.createConsumer(queueName);
clientSession.createConsumer(queueName);
- SessionQueueQueryResponseMessage resp = clientSession.queueQuery(new
SimpleString(queueName));
+ QueueQuery resp = clientSession.queueQuery(new SimpleString(queueName));
assertEquals(new SimpleString("a1"), resp.getAddress());
assertEquals(2, resp.getConsumerCount());
assertEquals(0, resp.getMessageCount());
@@ -276,7 +276,7 @@
server.start();
ClientSessionFactory cf = createInVMFactory();
ClientSession clientSession = cf.createSession(false, true, true);
- SessionQueueQueryResponseMessage resp = clientSession.queueQuery(new
SimpleString(queueName));
+ QueueQuery resp = clientSession.queueQuery(new SimpleString(queueName));
assertFalse(resp.isExists());
assertEquals(null, resp.getAddress());
clientSession.close();