JBoss hornetq SVN: r8860 - in trunk: src/main/org/hornetq/jms/client and 1 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-01-31 06:49:12 -0500 (Sun, 31 Jan 2010)
New Revision: 8860
Added:
trunk/tests/src/org/hornetq/tests/integration/jms/client/ReceiveNoWaitTest.java
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/jms/client/HornetQMessageConsumer.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-284
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-01-29 18:51:42 UTC (rev 8859)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-01-31 11:49:12 UTC (rev 8860)
@@ -191,7 +191,7 @@
// Effectively infinite
timeout = Long.MAX_VALUE;
}
-
+
boolean deliveryForced = false;
long start = -1;
Modified: trunk/src/main/org/hornetq/jms/client/HornetQMessageConsumer.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMessageConsumer.java 2010-01-29 18:51:42 UTC (rev 8859)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMessageConsumer.java 2010-01-31 11:49:12 UTC (rev 8860)
@@ -123,17 +123,17 @@
public Message receive() throws JMSException
{
- return getMessage(0);
+ return getMessage(0, false);
}
public Message receive(final long timeout) throws JMSException
{
- return getMessage(timeout);
+ return getMessage(timeout, false);
}
public Message receiveNoWait() throws JMSException
{
- return getMessage(-1);
+ return getMessage(0, true);
}
public void close() throws JMSException
@@ -197,11 +197,20 @@
}
}
- private HornetQMessage getMessage(final long timeout) throws JMSException
+ private HornetQMessage getMessage(final long timeout, final boolean noWait) throws JMSException
{
try
{
- ClientMessage message = consumer.receive(timeout);
+ ClientMessage message;
+
+ if (noWait)
+ {
+ message = consumer.receiveImmediate();
+ }
+ else
+ {
+ message = consumer.receive(timeout);
+ }
HornetQMessage msg = null;
Added: trunk/tests/src/org/hornetq/tests/integration/jms/client/ReceiveNoWaitTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/ReceiveNoWaitTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/ReceiveNoWaitTest.java 2010-01-31 11:49:12 UTC (rev 8860)
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.jms.client;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.hornetq.core.logging.Logger;
+import org.hornetq.tests.util.JMSTestBase;
+
+/**
+ *
+ * A ReceiveNoWaitTest
+ *
+ * @author tim
+ *
+ *
+ */
+public class ReceiveNoWaitTest extends JMSTestBase
+{
+ private static final Logger log = Logger.getLogger(ReceiveNoWaitTest.class);
+
+ private Queue queue;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ queue = createQueue("TestQueue");
+ }
+
+ protected void tearDown() throws Exception
+ {
+ jmsServer.destroyQueue("TestQueue");
+
+ super.tearDown();
+ }
+
+
+ /*
+ * Test that, after sending persistent messages to a queue (these are sent blocking),
+ * all messages are available for consumption by receiveNoWait()
+ * https://jira.jboss.org/jira/browse/HORNETQ-284
+ */
+ public void testReceiveNoWait() throws Exception
+ {
+ assertNotNull(queue);
+
+ for (int i = 0; i < 10; i++)
+ {
+ log.info("Iteration " + i);
+
+ Connection connection = cf.createConnection();
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ MessageProducer producer = session.createProducer(queue);
+
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ for (int j = 0; j < 100; j++)
+ {
+ String text = "Message" + j;
+
+ TextMessage message = session.createTextMessage();
+
+ message.setText(text);
+
+ producer.send(message);
+ }
+
+ connection.start();
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ for (int j = 0; j < 100; j++)
+ {
+ TextMessage m = (TextMessage)consumer.receiveNoWait();
+
+ if (m == null)
+ {
+ throw new IllegalStateException("msg null");
+ }
+
+ assertEquals("Message" + j, m.getText());
+
+ m.acknowledge();
+ }
+
+ connection.close();
+ }
+ }
+}
14 years, 10 months
JBoss hornetq SVN: r8859 - in trunk: src/main/org/hornetq/core/server/impl and 16 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-01-29 13:51:42 -0500 (Fri, 29 Jan 2010)
New Revision: 8859
Added:
trunk/tests/src/org/hornetq/tests/integration/jms/client/CreateQueueTest.java
Removed:
trunk/src/main/org/hornetq/jms/client/HornetQQueue.java
trunk/src/main/org/hornetq/jms/client/HornetQTemporaryQueue.java
trunk/src/main/org/hornetq/jms/client/HornetQTemporaryTopic.java
trunk/src/main/org/hornetq/jms/client/HornetQTopic.java
trunk/tests/src/org/hornetq/tests/unit/jms/HornetQQueueTest.java
trunk/tests/src/org/hornetq/tests/unit/jms/HornetQTopicTest.java
Modified:
trunk/src/main/org/hornetq/api/jms/HornetQJMSClient.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/jms/client/HornetQDestination.java
trunk/src/main/org/hornetq/jms/client/HornetQQueueBrowser.java
trunk/src/main/org/hornetq/jms/client/HornetQSession.java
trunk/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java
trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
trunk/src/main/org/hornetq/jms/server/management/JMSManagementService.java
trunk/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/ReferenceableTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSReconnectTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java
trunk/tests/src/org/hornetq/tests/unit/jms/HornetQDestinationTest.java
trunk/tests/src/org/hornetq/tests/unit/jms/referenceable/DestinationObjectFactoryTest.java
trunk/tests/src/org/hornetq/tests/unit/ra/ResourceAdapterTest.java
trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-281
Modified: trunk/src/main/org/hornetq/api/jms/HornetQJMSClient.java
===================================================================
--- trunk/src/main/org/hornetq/api/jms/HornetQJMSClient.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/src/main/org/hornetq/api/jms/HornetQJMSClient.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -23,8 +23,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.client.HornetQQueue;
-import org.hornetq.jms.client.HornetQTopic;
+import org.hornetq.jms.client.HornetQDestination;
/**
* A utility class for creating HornetQ client-side JMS managed resources.
@@ -111,7 +110,7 @@
*/
public static Topic createTopic(final String name)
{
- return new HornetQTopic(name);
+ return HornetQDestination.createTopic(name);
}
/**
@@ -122,7 +121,7 @@
*/
public static Queue createQueue(final String name)
{
- return new HornetQQueue(name);
+ return HornetQDestination.createQueue(name);
}
private HornetQJMSClient()
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -147,7 +147,7 @@
final boolean preAcknowledge,
final boolean strictUpdateDeliveryCount,
final boolean xa,
- final RemotingConnection remotingConnection,
+ final RemotingConnection remotingConnection,
final StorageManager storageManager,
final PostOffice postOffice,
final ResourceManager resourceManager,
@@ -195,7 +195,7 @@
this.managementAddress = managementAddress;
this.callback = callback;
-
+
remotingConnection.addFailureListener(this);
remotingConnection.addCloseListener(this);
@@ -278,14 +278,14 @@
{
holder.store.returnProducerCredits(holder.outstandingCredits);
}
-
+
callback.closed();
}
public void createConsumer(final long consumerID,
- final SimpleString queueName,
- final SimpleString filterString,
- final boolean browseOnly) throws Exception
+ final SimpleString queueName,
+ final SimpleString filterString,
+ final boolean browseOnly) throws Exception
{
Binding binding = postOffice.getBinding(queueName);
@@ -340,11 +340,11 @@
}
public void createQueue(final SimpleString address,
- final SimpleString name,
- final SimpleString filterString,
- final boolean temporary,
- final boolean durable) throws Exception
- {
+ final SimpleString name,
+ final SimpleString filterString,
+ final boolean temporary,
+ final boolean durable) throws Exception
+ {
if (durable)
{
// make sure the user has privileges to create this queue
Modified: trunk/src/main/org/hornetq/jms/client/HornetQDestination.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQDestination.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/src/main/org/hornetq/jms/client/HornetQDestination.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -14,12 +14,17 @@
package org.hornetq.jms.client;
import java.io.Serializable;
+import java.util.UUID;
import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
import javax.naming.NamingException;
import javax.naming.Reference;
import javax.naming.Referenceable;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.jms.referenceable.DestinationObjectFactory;
import org.hornetq.jms.referenceable.SerializableObjectRefAddr;
@@ -27,13 +32,12 @@
/**
* HornetQ implementation of a JMS Destination.
*
- * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @version <tt>$Revision$</tt>
*
* $Id$
*/
-public abstract class HornetQDestination implements Destination, Serializable, Referenceable
+public class HornetQDestination implements TemporaryQueue, TemporaryTopic, Serializable, Referenceable
{
// Constants -----------------------------------------------------
@@ -43,7 +47,13 @@
*
*/
private static final long serialVersionUID = 5027962425462382883L;
-
+
+ public static final String JMS_QUEUE_ADDRESS_PREFIX = "jms.queue.";
+
+ public static final String JMS_TOPIC_ADDRESS_PREFIX = "jms.topic.";
+
+ private static final char SEPARATOR = '.';
+
protected static String escape(final String input)
{
if (input == null)
@@ -55,55 +65,155 @@
public static Destination fromAddress(final String address)
{
- if (address.startsWith(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX))
+ if (address.startsWith(HornetQDestination.JMS_QUEUE_ADDRESS_PREFIX))
{
- String name = address.substring(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX.length());
+ String name = address.substring(HornetQDestination.JMS_QUEUE_ADDRESS_PREFIX.length());
- return new HornetQQueue(address, name);
+ return createQueue(name);
}
- else if (address.startsWith(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX))
+ else if (address.startsWith(HornetQDestination.JMS_TOPIC_ADDRESS_PREFIX))
{
- String name = address.substring(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX.length());
+ String name = address.substring(HornetQDestination.JMS_TOPIC_ADDRESS_PREFIX.length());
- return new HornetQTopic(address, name);
+ return createTopic(name);
}
- else if (address.startsWith(HornetQTemporaryQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX))
+ else
{
- String name = address.substring(HornetQTemporaryQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX.length());
+ throw new IllegalArgumentException("Invalid address " + address);
+ }
+ }
+
+ public static String createQueueNameForDurableSubscription(final String clientID, final String subscriptionName)
+ {
+ return HornetQDestination.escape(clientID) + SEPARATOR + HornetQDestination.escape(subscriptionName);
+ }
+
+ public static Pair<String, String> decomposeQueueNameForDurableSubscription(final String queueName)
+ {
+ StringBuffer[] parts = new StringBuffer[2];
+ int currentPart = 0;
- return new HornetQTemporaryQueue(null, name);
- }
- else if (address.startsWith(HornetQTemporaryTopic.JMS_TEMP_TOPIC_ADDRESS_PREFIX))
+ parts[0] = new StringBuffer();
+ parts[1] = new StringBuffer();
+
+ int pos = 0;
+ while (pos < queueName.length())
{
- String name = address.substring(HornetQTemporaryTopic.JMS_TEMP_TOPIC_ADDRESS_PREFIX.length());
+ char ch = queueName.charAt(pos);
+ pos++;
- return new HornetQTemporaryTopic(null, name);
+ if (ch == SEPARATOR)
+ {
+ currentPart++;
+ if (currentPart >= parts.length)
+ {
+ throw new IllegalArgumentException("Invalid message queue name: " + queueName);
+ }
+
+ continue;
+ }
+
+ if (ch == '\\')
+ {
+ if (pos >= queueName.length())
+ {
+ throw new IllegalArgumentException("Invalid message queue name: " + queueName);
+ }
+ ch = queueName.charAt(pos);
+ pos++;
+ }
+
+ parts[currentPart].append(ch);
}
- else
+
+ if (currentPart != 1)
{
- throw new IllegalArgumentException("Invalid address " + address);
+ throw new IllegalArgumentException("Invalid message queue name: " + queueName);
}
+
+ Pair<String, String> pair = new Pair<String, String>(parts[0].toString(), parts[1].toString());
+
+ return pair;
}
+
+ public static SimpleString createQueueAddressFromName(final String name)
+ {
+ return new SimpleString(JMS_QUEUE_ADDRESS_PREFIX + name);
+ }
+
+ public static SimpleString createTopicAddressFromName(final String name)
+ {
+ return new SimpleString(JMS_TOPIC_ADDRESS_PREFIX + name);
+ }
+
+ public static HornetQDestination createQueue(final String name)
+ {
+ return new HornetQDestination(JMS_QUEUE_ADDRESS_PREFIX.concat(name), name, false, true, null);
+ }
+
+ public static HornetQDestination createTopic(final String name)
+ {
+ return new HornetQDestination(JMS_TOPIC_ADDRESS_PREFIX.concat(name), name, false, false, null);
+ }
+
+ public static HornetQDestination createTemporaryQueue(final HornetQSession session)
+ {
+ String name = UUID.randomUUID().toString();
+
+ return new HornetQDestination(JMS_QUEUE_ADDRESS_PREFIX.concat(name), name, true, true, session);
+ }
+
+ public static HornetQDestination createTemporaryTopic(final HornetQSession session)
+ {
+ String name = UUID.randomUUID().toString();
+
+ return new HornetQDestination(JMS_TOPIC_ADDRESS_PREFIX.concat(name), name, true, false, session);
+ }
+
// Attributes ----------------------------------------------------
+ /**
+ * The JMS name
+ */
protected final String name;
+ /**
+ * The core address
+ */
private final String address;
+ /**
+ * SimpleString version of address
+ */
private final SimpleString simpleAddress;
-
+
+ private final boolean temporary;
+
+ private final boolean queue;
+
+ private final HornetQSession session;
+
// Constructors --------------------------------------------------
- public HornetQDestination(final String address, final String name)
+ private HornetQDestination(final String address, final String name,
+ final boolean temporary,
+ final boolean queue,
+ final HornetQSession session)
{
this.address = address;
this.name = name;
simpleAddress = new SimpleString(address);
+
+ this.temporary = temporary;
+
+ this.queue = queue;
+
+ this.session = session;
}
-
+
// Referenceable implementation ---------------------------------------
public Reference getReference() throws NamingException
@@ -114,6 +224,36 @@
null);
}
+ public String getQueueName()
+ {
+ return name;
+ }
+
+ public String getTopicName()
+ {
+ return name;
+ }
+
+ public void delete() throws JMSException
+ {
+ if (session != null)
+ {
+ if (queue)
+ {
+ session.deleteTemporaryQueue(this);
+ }
+ else
+ {
+ session.deleteTemporaryTopic(this);
+ }
+ }
+ }
+
+ public boolean isQueue()
+ {
+ return queue;
+ }
+
// Public --------------------------------------------------------
public String getAddress()
@@ -131,8 +271,11 @@
return name;
}
- public abstract boolean isTemporary();
-
+ public boolean isTemporary()
+ {
+ return temporary;
+ }
+
@Override
public boolean equals(final Object o)
{
Deleted: trunk/src/main/org/hornetq/jms/client/HornetQQueue.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQQueue.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/src/main/org/hornetq/jms/client/HornetQQueue.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -1,93 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.jms.client;
-
-import javax.jms.JMSException;
-import javax.jms.Queue;
-
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.jms.client.HornetQDestination;
-import org.hornetq.core.logging.Logger;
-
-/**
- * HornetQ implementation of a JMS Queue.
- * <br>
- * This class can be instantiated directly.
- *
- * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- */
-public class HornetQQueue extends HornetQDestination implements Queue
-{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(HornetQQueue.class);
-
- private static final long serialVersionUID = -1106092883162295462L;
-
- public static final String JMS_QUEUE_ADDRESS_PREFIX = "jms.queue.";
-
- // Static --------------------------------------------------------
-
- public static SimpleString createAddressFromName(final String name)
- {
- return new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + name);
- }
-
- // Attributes ----------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public HornetQQueue(final String name)
- {
- super(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + name, name);
- }
-
- protected HornetQQueue(final String address, final String name)
- {
- super(address, name);
- }
-
- // Queue implementation ------------------------------------------
-
- public String getQueueName() throws JMSException
- {
- return name;
- }
-
- // Public --------------------------------------------------------
-
- @Override
- public boolean isTemporary()
- {
- return false;
- }
-
- @Override
- public String toString()
- {
- return "HornetQQueue[" + name + "]";
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Modified: trunk/src/main/org/hornetq/jms/client/HornetQQueueBrowser.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQQueueBrowser.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/src/main/org/hornetq/jms/client/HornetQQueueBrowser.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -49,13 +49,13 @@
private ClientConsumer consumer;
- private final HornetQQueue queue;
+ private final HornetQDestination queue;
private SimpleString filterString;
// Constructors ---------------------------------------------------------------------------------
- protected HornetQQueueBrowser(final HornetQQueue queue, final String messageSelector, final ClientSession session) throws JMSException
+ protected HornetQQueueBrowser(final HornetQDestination queue, final String messageSelector, final ClientSession session) throws JMSException
{
this.session = session;
this.queue = queue;
Modified: trunk/src/main/org/hornetq/jms/client/HornetQSession.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQSession.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/src/main/org/hornetq/jms/client/HornetQSession.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -57,10 +57,8 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSession.BindingQuery;
import org.hornetq.api.core.client.ClientSession.QueueQuery;
-import org.hornetq.api.jms.*;
import org.hornetq.core.filter.impl.FilterImpl;
import org.hornetq.core.logging.Logger;
-import org.hornetq.jms.*;
/**
* HornetQ implementation of a JMS Session.
@@ -324,7 +322,7 @@
if (jbd != null)
{
- if (jbd instanceof Queue)
+ if (jbd.isQueue())
{
QueueQuery response = session.queueQuery(jbd.getSimpleAddress());
@@ -399,8 +397,8 @@
throw new IllegalStateException("Cannot create a queue using a TopicSession");
}
- HornetQQueue queue = (HornetQQueue) HornetQJMSClient.createQueue(queueName);
-
+ HornetQDestination queue = HornetQDestination.createQueue(queueName);
+
try
{
QueueQuery response = session.queueQuery(queue.getSimpleAddress());
@@ -428,8 +426,8 @@
throw new IllegalStateException("Cannot create a topic on a QueueSession");
}
- HornetQTopic topic = (HornetQTopic) HornetQJMSClient.createTopic(topicName);
-
+ HornetQDestination topic = HornetQDestination.createTopic(topicName);
+
try
{
BindingQuery query = session.bindingQuery(topic.getSimpleAddress());
@@ -468,7 +466,7 @@
{
throw new InvalidDestinationException("Cannot create a durable subscriber on a null topic");
}
- if (!(topic instanceof HornetQTopic))
+ if (!(topic instanceof HornetQDestination))
{
throw new InvalidDestinationException("Not a HornetQTopic:" + topic);
}
@@ -478,6 +476,11 @@
}
HornetQDestination jbdest = (HornetQDestination)topic;
+
+ if (jbdest.isQueue())
+ {
+ throw new InvalidDestinationException("Cannot create a subscriber on a queue");
+ }
return createConsumer(jbdest, name, messageSelector, noLocal);
}
@@ -486,7 +489,7 @@
final String subscriptionName,
String selectorString,
final boolean noLocal) throws JMSException
- {
+ {
try
{
selectorString = "".equals(selectorString) ? null : selectorString;
@@ -520,8 +523,8 @@
SimpleString autoDeleteQueueName = null;
- if (dest instanceof Queue)
- {
+ if (dest.isQueue())
+ {
QueueQuery response = session.queueQuery(dest.getSimpleAddress());
if (!response.isExists())
@@ -568,7 +571,7 @@
throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic");
}
- queueName = new SimpleString(HornetQTopic.createQueueNameForDurableSubscription(connection.getClientID(),
+ queueName = new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(connection.getClientID(),
subscriptionName));
QueueQuery subResponse = session.queueQuery(queueName);
@@ -654,7 +657,7 @@
{
throw new InvalidDestinationException("Cannot create a browser with a null queue");
}
- if (!(queue instanceof HornetQQueue))
+ if (!(queue instanceof HornetQDestination))
{
throw new InvalidDestinationException("Not a HornetQQueue:" + queue);
}
@@ -673,7 +676,12 @@
throw JMSExceptionHelper.convertFromHornetQException(e);
}
- HornetQQueue jbq = (HornetQQueue)queue;
+ HornetQDestination jbq = (HornetQDestination)queue;
+
+ if (!jbq.isQueue())
+ {
+ throw new InvalidDestinationException("Cannot create a browser on a topic");
+ }
try
{
@@ -700,11 +708,9 @@
throw new IllegalStateException("Cannot create a temporary queue using a TopicSession");
}
- String queueName = UUID.randomUUID().toString();
-
try
{
- HornetQTemporaryQueue queue = new HornetQTemporaryQueue(this, queueName);
+ HornetQDestination queue = HornetQDestination.createTemporaryQueue(this);
SimpleString simpleAddress = queue.getSimpleAddress();
@@ -728,11 +734,9 @@
throw new IllegalStateException("Cannot create a temporary topic on a QueueSession");
}
- String topicName = UUID.randomUUID().toString();
-
try
{
- HornetQTemporaryTopic topic = new HornetQTemporaryTopic(this, topicName);
+ HornetQDestination topic = HornetQDestination.createTemporaryTopic(this);
SimpleString simpleAddress = topic.getSimpleAddress();
@@ -761,7 +765,7 @@
throw new IllegalStateException("Cannot unsubscribe using a QueueSession");
}
- SimpleString queueName = new SimpleString(HornetQTopic.createQueueNameForDurableSubscription(connection.getClientID(),
+ SimpleString queueName = new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(connection.getClientID(),
name));
try
@@ -876,8 +880,13 @@
this.recoverCalled = recoverCalled;
}
- public void deleteTemporaryTopic(final HornetQTemporaryTopic tempTopic) throws JMSException
+ public void deleteTemporaryTopic(final HornetQDestination tempTopic) throws JMSException
{
+ if (!tempTopic.isTemporary())
+ {
+ throw new InvalidDestinationException("Not a temporary topic " + tempTopic);
+ }
+
try
{
BindingQuery response = session.bindingQuery(tempTopic.getSimpleAddress());
@@ -906,8 +915,12 @@
}
}
- public void deleteTemporaryQueue(final HornetQTemporaryQueue tempQueue) throws JMSException
+ public void deleteTemporaryQueue(final HornetQDestination tempQueue) throws JMSException
{
+ if (!tempQueue.isTemporary())
+ {
+ throw new InvalidDestinationException("Not a temporary queue " + tempQueue);
+ }
try
{
QueueQuery response = session.queueQuery(tempQueue.getSimpleAddress());
Deleted: trunk/src/main/org/hornetq/jms/client/HornetQTemporaryQueue.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQTemporaryQueue.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/src/main/org/hornetq/jms/client/HornetQTemporaryQueue.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -1,81 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.jms.client;
-
-import javax.jms.JMSException;
-import javax.jms.TemporaryQueue;
-
-
-/**
- * HornetQ implementation of a JMS TemporaryQueue.
- * <br>
- * This class can be instantiated directly.
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 3569 $</tt>
- *
- * $Id: HornetQQueue.java 3569 2008-01-15 21:14:04Z timfox $
- */
-public class HornetQTemporaryQueue extends HornetQQueue implements TemporaryQueue
-{
- // Constants -----------------------------------------------------
-
- private static final long serialVersionUID = -4624930377557954624L;
-
- public static final String JMS_TEMP_QUEUE_ADDRESS_PREFIX = "jms.tempqueue.";
-
- // Static --------------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private final transient HornetQSession session;
-
- // Constructors --------------------------------------------------
-
- public HornetQTemporaryQueue(final HornetQSession session, final String name)
- {
- super(HornetQTemporaryQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX + name, name);
-
- this.session = session;
- }
-
- // TemporaryQueue implementation ------------------------------------------
-
- public void delete() throws JMSException
- {
- session.deleteTemporaryQueue(this);
- }
-
- // Public --------------------------------------------------------
-
- @Override
- public boolean isTemporary()
- {
- return true;
- }
-
- @Override
- public String toString()
- {
- return "HornetQTemporaryQueue[" + name + "]";
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Deleted: trunk/src/main/org/hornetq/jms/client/HornetQTemporaryTopic.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQTemporaryTopic.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/src/main/org/hornetq/jms/client/HornetQTemporaryTopic.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -1,81 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.jms.client;
-
-import javax.jms.JMSException;
-import javax.jms.TemporaryTopic;
-
-
-/**
- * HornetQ implementation of a JMS TemporaryTopic.
- * <br>
- * This class can be instantiated directly.
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 3569 $</tt>
- *
- * $Id: HornetQQueue.java 3569 2008-01-15 21:14:04Z timfox $
- */
-public class HornetQTemporaryTopic extends HornetQTopic implements TemporaryTopic
-{
- // Constants -----------------------------------------------------
-
- private static final long serialVersionUID = 845450764835635266L;
-
- public static final String JMS_TEMP_TOPIC_ADDRESS_PREFIX = "jms.temptopic.";
-
- // Static --------------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private final transient HornetQSession session;
-
- // Constructors --------------------------------------------------
-
- public HornetQTemporaryTopic(final HornetQSession session, final String name)
- {
- super(HornetQTemporaryTopic.JMS_TEMP_TOPIC_ADDRESS_PREFIX + name, name);
-
- this.session = session;
- }
-
- // TemporaryTopic implementation ------------------------------------------
-
- public void delete() throws JMSException
- {
- session.deleteTemporaryTopic(this);
- }
-
- // Public --------------------------------------------------------
-
- @Override
- public boolean isTemporary()
- {
- return true;
- }
-
- @Override
- public String toString()
- {
- return "HornetQTemporaryTopic[" + name + "]";
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Deleted: trunk/src/main/org/hornetq/jms/client/HornetQTopic.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQTopic.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/src/main/org/hornetq/jms/client/HornetQTopic.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -1,146 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.jms.client;
-
-import javax.jms.JMSException;
-import javax.jms.Topic;
-
-import org.hornetq.api.core.Pair;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.jms.client.HornetQDestination;
-
-/**
- * HornetQ implementation of a JMS Topic.
- * <br>
- * This class can be instantiated directly.
- *
- * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- */
-public class HornetQTopic extends HornetQDestination implements Topic
-{
- // Constants -----------------------------------------------------
-
- private static final long serialVersionUID = 7873614001276404156L;
-
- public static final String JMS_TOPIC_ADDRESS_PREFIX = "jms.topic.";
-
- private static final char SEPARATOR = '.';
-
- // Static --------------------------------------------------------
-
- public static String createQueueNameForDurableSubscription(final String clientID, final String subscriptionName)
- {
- return HornetQDestination.escape(clientID) + HornetQTopic.SEPARATOR + HornetQDestination.escape(subscriptionName);
- }
-
- public static Pair<String, String> decomposeQueueNameForDurableSubscription(final String queueName)
- {
- StringBuffer[] parts = new StringBuffer[2];
- int currentPart = 0;
-
- parts[0] = new StringBuffer();
- parts[1] = new StringBuffer();
-
- int pos = 0;
- while (pos < queueName.length())
- {
- char ch = queueName.charAt(pos);
- pos++;
-
- if (ch == HornetQTopic.SEPARATOR)
- {
- currentPart++;
- if (currentPart >= parts.length)
- {
- throw new IllegalArgumentException("Invalid message queue name: " + queueName);
- }
-
- continue;
- }
-
- if (ch == '\\')
- {
- if (pos >= queueName.length())
- {
- throw new IllegalArgumentException("Invalid message queue name: " + queueName);
- }
- ch = queueName.charAt(pos);
- pos++;
- }
-
- parts[currentPart].append(ch);
- }
-
- if (currentPart != 1)
- {
- throw new IllegalArgumentException("Invalid message queue name: " + queueName);
- }
-
- Pair<String, String> pair = new Pair<String, String>(parts[0].toString(), parts[1].toString());
-
- return pair;
- }
-
- public static SimpleString createAddressFromName(final String name)
- {
- return new SimpleString(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX + name);
- }
-
- // Attributes ----------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public HornetQTopic(final String name)
- {
- super(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX + name, name);
- }
-
- protected HornetQTopic(final String address, final String name)
- {
- super(address, name);
- }
-
- // Topic implementation ------------------------------------------
-
- public String getTopicName() throws JMSException
- {
- return name;
- }
-
- // Public --------------------------------------------------------
-
- @Override
- public boolean isTemporary()
- {
- return false;
- }
-
- @Override
- public String toString()
- {
- return "HornetQTopic[" + name + "]";
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Modified: trunk/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -27,8 +27,8 @@
import org.hornetq.core.management.impl.MBeanInfoHelper;
import org.hornetq.core.messagecounter.MessageCounter;
import org.hornetq.core.messagecounter.impl.MessageCounterHelper;
+import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQMessage;
-import org.hornetq.jms.client.HornetQQueue;
import org.hornetq.jms.client.SelectorTranslator;
import org.hornetq.utils.json.JSONArray;
import org.hornetq.utils.json.JSONObject;
@@ -47,7 +47,7 @@
// Attributes ----------------------------------------------------
- private final HornetQQueue managedQueue;
+ private final HornetQDestination managedQueue;
private final QueueControl coreQueueControl;
@@ -83,7 +83,7 @@
// Constructors --------------------------------------------------
- public JMSQueueControlImpl(final HornetQQueue managedQueue,
+ public JMSQueueControlImpl(final HornetQDestination managedQueue,
final QueueControl coreQueueControl,
final String jndiBinding,
final MessageCounter counter) throws Exception
@@ -275,7 +275,7 @@
public boolean moveMessage(final String messageID, final String otherQueueName) throws Exception
{
String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID);
- HornetQQueue otherQueue = (HornetQQueue) HornetQJMSClient.createQueue(otherQueueName);
+ HornetQDestination otherQueue = HornetQDestination.createQueue(otherQueueName);
int moved = coreQueueControl.moveMessages(filter, otherQueue.getAddress());
if (moved != 1)
{
@@ -288,7 +288,7 @@
public int moveMessages(final String filterStr, final String otherQueueName) throws Exception
{
String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
- HornetQQueue otherQueue = (HornetQQueue) HornetQJMSClient.createQueue(otherQueueName);
+ HornetQDestination otherQueue = HornetQDestination.createQueue(otherQueueName);
return coreQueueControl.moveMessages(filter, otherQueue.getAddress());
}
Modified: trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -31,8 +31,8 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.impl.MBeanInfoHelper;
import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQMessage;
-import org.hornetq.jms.client.HornetQTopic;
import org.hornetq.jms.client.SelectorTranslator;
import org.hornetq.utils.json.JSONArray;
import org.hornetq.utils.json.JSONObject;
@@ -51,7 +51,7 @@
// Attributes ----------------------------------------------------
- private final HornetQTopic managedTopic;
+ private final HornetQDestination managedTopic;
private final String binding;
@@ -69,7 +69,7 @@
// Constructors --------------------------------------------------
- public JMSTopicControlImpl(final HornetQTopic topic,
+ public JMSTopicControlImpl(final HornetQDestination topic,
final AddressControl addressControl,
final String jndiBinding,
final ManagementService managementService) throws Exception
@@ -191,7 +191,7 @@
public int countMessagesForSubscription(final String clientID, final String subscriptionName, final String filterStr) throws Exception
{
- String queueName = HornetQTopic.createQueueNameForDurableSubscription(clientID, subscriptionName);
+ String queueName = HornetQDestination.createQueueNameForDurableSubscription(clientID, subscriptionName);
QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
if (coreQueueControl == null)
{
@@ -217,7 +217,7 @@
public void dropDurableSubscription(final String clientID, final String subscriptionName) throws Exception
{
- String queueName = HornetQTopic.createQueueNameForDurableSubscription(clientID, subscriptionName);
+ String queueName = HornetQDestination.createQueueNameForDurableSubscription(clientID, subscriptionName);
QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
if (coreQueueControl == null)
{
@@ -255,7 +255,7 @@
if (queue.isDurable())
{
- Pair<String, String> pair = HornetQTopic.decomposeQueueNameForDurableSubscription(queue.getName()
+ Pair<String, String> pair = HornetQDestination.decomposeQueueNameForDurableSubscription(queue.getName()
.toString());
clientID = pair.a;
subName = pair.b;
@@ -287,7 +287,7 @@
if (queue.isDurable())
{
- Pair<String, String> pair = HornetQTopic.decomposeQueueNameForDurableSubscription(queue.getName()
+ Pair<String, String> pair = HornetQDestination.decomposeQueueNameForDurableSubscription(queue.getName()
.toString());
clientID = pair.a;
subName = pair.b;
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -36,8 +36,7 @@
import org.hornetq.core.server.ActivateCallback;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.client.HornetQQueue;
-import org.hornetq.jms.client.HornetQTopic;
+import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.SelectorTranslator;
import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
@@ -258,7 +257,7 @@
final boolean durable) throws Exception
{
checkInitialised();
- HornetQQueue jBossQueue = new HornetQQueue(queueName);
+ HornetQDestination jBossQueue = HornetQDestination.createQueue(queueName);
// Convert from JMS selector to core filter
String coreFilterString = null;
@@ -287,7 +286,7 @@
public synchronized boolean createTopic(final String topicName, final String jndiBinding) throws Exception
{
checkInitialised();
- HornetQTopic jBossTopic = new HornetQTopic(topicName);
+ HornetQDestination jBossTopic = HornetQDestination.createTopic(topicName);
// We create a dummy subscription on the topic, that never receives messages - this is so we can perform JMS
// checks when routing messages to a topic that
// does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no
@@ -333,7 +332,7 @@
destinations.remove(name);
jmsManagementService.unregisterQueue(name);
- server.getHornetQServerControl().destroyQueue(HornetQQueue.createAddressFromName(name).toString());
+ server.getHornetQServerControl().destroyQueue(HornetQDestination.createQueueAddressFromName(name).toString());
return true;
}
@@ -345,7 +344,7 @@
destinations.remove(name);
jmsManagementService.unregisterTopic(name);
- server.getHornetQServerControl().destroyQueue(HornetQTopic.createAddressFromName(name).toString());
+ server.getHornetQServerControl().destroyQueue(HornetQDestination.createTopicAddressFromName(name).toString());
return true;
}
Modified: trunk/src/main/org/hornetq/jms/server/management/JMSManagementService.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/JMSManagementService.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/src/main/org/hornetq/jms/server/management/JMSManagementService.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -17,8 +17,7 @@
import org.hornetq.api.jms.management.JMSServerControl;
import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.client.HornetQQueue;
-import org.hornetq.jms.client.HornetQTopic;
+import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.server.JMSServerManager;
/**
@@ -33,11 +32,11 @@
void unregisterJMSServer() throws Exception;
- void registerQueue(HornetQQueue queue, String jndiBinding) throws Exception;
+ void registerQueue(HornetQDestination queue, String jndiBinding) throws Exception;
void unregisterQueue(String name) throws Exception;
- void registerTopic(HornetQTopic topic, String jndiBinding) throws Exception;
+ void registerTopic(HornetQDestination topic, String jndiBinding) throws Exception;
void unregisterTopic(String name) throws Exception;
Modified: trunk/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -28,8 +28,7 @@
import org.hornetq.core.messagecounter.MessageCounterManager;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.client.HornetQQueue;
-import org.hornetq.jms.client.HornetQTopic;
+import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.management.impl.JMSConnectionFactoryControlImpl;
import org.hornetq.jms.management.impl.JMSQueueControlImpl;
import org.hornetq.jms.management.impl.JMSServerControlImpl;
@@ -78,7 +77,7 @@
managementService.unregisterFromRegistry(ResourceNames.JMS_SERVER);
}
- public synchronized void registerQueue(final HornetQQueue queue, final String jndiBinding) throws Exception
+ public synchronized void registerQueue(final HornetQDestination queue, final String jndiBinding) throws Exception
{
QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue.getAddress());
MessageCounterManager messageCounterManager = managementService.getMessageCounterManager();
@@ -102,7 +101,7 @@
managementService.unregisterFromRegistry(ResourceNames.JMS_QUEUE + name);
}
- public synchronized void registerTopic(final HornetQTopic topic, final String jndiBinding) throws Exception
+ public synchronized void registerTopic(final HornetQDestination topic, final String jndiBinding) throws Exception
{
ObjectName objectName = managementService.getObjectNameBuilder().getJMSTopicObjectName(topic.getTopicName());
AddressControl addressControl = (AddressControl)managementService.getResource(ResourceNames.CORE_ADDRESS + topic.getAddress());
Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -28,8 +28,8 @@
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.api.core.client.ClientSession.QueueQuery;
import org.hornetq.core.logging.Logger;
+import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQMessage;
-import org.hornetq.jms.client.HornetQTopic;
/**
* The message handler
@@ -95,7 +95,7 @@
throw new InvalidClientIDException("Cannot create durable subscription - client ID has not been set");
}
- SimpleString queueName = new SimpleString(HornetQTopic.createQueueNameForDurableSubscription(activation.getActivationSpec()
+ SimpleString queueName = new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(activation.getActivationSpec()
.getClientID(),
subscriptionName));
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/ReferenceableTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/ReferenceableTest.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/ReferenceableTest.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -26,8 +26,7 @@
import javax.naming.Referenceable;
import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.client.HornetQQueue;
-import org.hornetq.jms.client.HornetQTopic;
+import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.referenceable.ConnectionFactoryObjectFactory;
import org.hornetq.jms.referenceable.DestinationObjectFactory;
import org.hornetq.jms.tests.util.ProxyAssertSupport;
@@ -104,9 +103,9 @@
Object instance = factory.getObjectInstance(queueRef, null, null, null);
- ProxyAssertSupport.assertTrue(instance instanceof HornetQQueue);
+ ProxyAssertSupport.assertTrue(instance instanceof HornetQDestination);
- HornetQQueue queue2 = (HornetQQueue)instance;
+ HornetQDestination queue2 = (HornetQDestination)instance;
ProxyAssertSupport.assertEquals(HornetQServerTestCase.queue1.getQueueName(), queue2.getQueueName());
@@ -126,10 +125,10 @@
Object instance = factory.getObjectInstance(topicRef, null, null, null);
- ProxyAssertSupport.assertTrue(instance instanceof HornetQTopic);
+ ProxyAssertSupport.assertTrue(instance instanceof HornetQDestination);
+
+ HornetQDestination topic2 = (HornetQDestination)instance;
- HornetQTopic topic2 = (HornetQTopic)instance;
-
ProxyAssertSupport.assertEquals(HornetQServerTestCase.topic1.getTopicName(), topic2.getTopicName());
simpleSendReceive(JMSTestCase.cf, topic2);
Added: trunk/tests/src/org/hornetq/tests/integration/jms/client/CreateQueueTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/CreateQueueTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/CreateQueueTest.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.jms.client;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.hornetq.core.logging.Logger;
+import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.tests.util.JMSTestBase;
+
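+/**
+ * Tests that the name reported by a queue or topic (regular or temporary) does not start
+ * with the JMS queue/topic address prefix, and that a destination created from that name
+ * via Session.createQueue()/createTopic() can be used to send and receive a message.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */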
+public class CreateQueueTest extends JMSTestBase
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(CreateQueueTest.class);
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testCreateQueueTempQueue() throws Exception
+ {
+ Connection conn = cf.createConnection();
+
+ try
+ {
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue tempQueue = session.createTemporaryQueue();
+
+ String tempQueueName = tempQueue.getQueueName();
+
+ assertFalse(tempQueueName.startsWith(HornetQDestination.JMS_QUEUE_ADDRESS_PREFIX));
+
+ Queue replyQueue = session.createQueue(tempQueueName);
+
+ MessageProducer producer = session.createProducer(replyQueue);
+
+ producer.send(session.createMessage());
+
+ MessageConsumer consumer= session.createConsumer(replyQueue);
+
+ conn.start();
+
+ assertNotNull(consumer.receive(10000));
+
+ }
+ finally
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (Throwable igonred)
+ {
+ }
+ }
+ }
+
+ public void testCreateQueue() throws Exception
+ {
+ Connection conn = cf.createConnection();
+
+ try
+ {
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue = createQueue("TestQueue");
+
+ String queueName = queue.getQueueName();
+
+ log.info("queue name is " + queueName);
+
+ assertFalse(queueName.startsWith(HornetQDestination.JMS_QUEUE_ADDRESS_PREFIX));
+
+ Queue replyQueue = session.createQueue(queueName);
+
+ MessageProducer producer = session.createProducer(replyQueue);
+
+ producer.send(session.createMessage());
+
+ MessageConsumer consumer= session.createConsumer(replyQueue);
+
+ conn.start();
+
+ assertNotNull(consumer.receive(10000));
+
+ }
+ finally
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (Throwable igonred)
+ {
+ }
+ }
+ }
+
+ public void testCreateTopic() throws Exception
+ {
+ Connection conn = cf.createConnection();
+
+ try
+ {
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Topic topic = createTopic("TestTopic");
+
+ String topicName = topic.getTopicName();
+
+ assertFalse(topicName.startsWith(HornetQDestination.JMS_TOPIC_ADDRESS_PREFIX));
+
+ Topic replyTopic = session.createTopic(topicName);
+
+ MessageConsumer consumer= session.createConsumer(replyTopic);
+
+ conn.start();
+
+ MessageProducer producer = session.createProducer(replyTopic);
+
+ producer.send(session.createMessage());
+
+ assertNotNull(consumer.receive(10000));
+ }
+ finally
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (Throwable igonred)
+ {
+ }
+ }
+ }
+
+ public void testCreateTopicTempTopic() throws Exception
+ {
+ Connection conn = cf.createConnection();
+
+ try
+ {
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Topic tempTopic = session.createTemporaryTopic();
+
+ String tempTopicName = tempTopic.getTopicName();
+
+ assertFalse(tempTopicName.startsWith(HornetQDestination.JMS_TOPIC_ADDRESS_PREFIX));
+
+ Topic replyTopic = session.createTopic(tempTopicName);
+
+ MessageConsumer consumer= session.createConsumer(replyTopic);
+
+ conn.start();
+
+ MessageProducer producer = session.createProducer(replyTopic);
+
+ producer.send(session.createMessage());
+
+ assertNotNull(consumer.receive(10000));
+ }
+ finally
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (Throwable igonred)
+ {
+ }
+ }
+ }
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -43,7 +43,7 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.client.HornetQQueue;
+import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQSession;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.util.RandomUtil;
@@ -113,7 +113,7 @@
RemotingConnection coreConn = ((ClientSessionInternal)coreSession).getConnection();
- SimpleString jmsQueueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + "myqueue");
+ SimpleString jmsQueueName = new SimpleString(HornetQDestination.JMS_QUEUE_ADDRESS_PREFIX + "myqueue");
coreSession.createQueue(jmsQueueName, jmsQueueName, null, true);
@@ -192,7 +192,7 @@
RemotingConnection coreConnLive = ((ClientSessionInternal)coreSessionLive).getConnection();
- SimpleString jmsQueueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + "myqueue");
+ SimpleString jmsQueueName = new SimpleString(HornetQDestination.JMS_QUEUE_ADDRESS_PREFIX + "myqueue");
coreSessionLive.createQueue(jmsQueueName, jmsQueueName, null, true);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSReconnectTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSReconnectTest.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSReconnectTest.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -41,9 +41,8 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.client.HornetQQueue;
+import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQSession;
-import org.hornetq.jms.client.HornetQTopic;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
@@ -120,7 +119,7 @@
RemotingConnection coreConn = ((ClientSessionInternal)coreSession).getConnection();
- SimpleString jmsQueueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + "myqueue");
+ SimpleString jmsQueueName = new SimpleString(HornetQDestination.JMS_QUEUE_ADDRESS_PREFIX + "myqueue");
coreSession.createQueue(jmsQueueName, jmsQueueName, null, true);
@@ -145,8 +144,6 @@
conn.start();
- log.info("sent messages and started connection");
-
Thread.sleep(2000);
HornetQException me = new HornetQException(HornetQException.NOT_CONNECTED);
@@ -157,8 +154,6 @@
for (int i = 0; i < numMessages; i++)
{
- log.info("got message " + i);
-
BytesMessage bm = (BytesMessage)consumer.receive(1000);
Assert.assertNotNull(bm);
@@ -188,7 +183,7 @@
}
//Test that non durable JMS sub gets recreated in auto reconnect
- private void testReconnectSameNodeServerRestartedWithNonDurableSubOrTempQueue(final boolean durableSub) throws Exception
+ private void testReconnectSameNodeServerRestartedWithNonDurableSubOrTempQueue(final boolean nonDurableSub) throws Exception
{
HornetQConnectionFactory jbcf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
@@ -206,9 +201,9 @@
Destination dest;
- if (durableSub)
- {
- coreSession.createQueue(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX + "mytopic", "blahblah", null, false);
+ if (nonDurableSub)
+ {
+ coreSession.createQueue(HornetQDestination.JMS_TOPIC_ADDRESS_PREFIX + "mytopic", "blahblah", null, false);
dest = HornetQJMSClient.createTopic("mytopic");
}
@@ -229,8 +224,6 @@
//Allow client some time to reconnect
Thread.sleep(3000);
- log.info("now sending some messages");
-
final int numMessages = 100;
byte[] body = RandomUtil.randomBytes(1000);
@@ -277,7 +270,7 @@
ClientSession coreSession = ((HornetQSession)sess).getCoreSession();
- coreSession.createQueue(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX + "mytopic", "blahblah", null, false);
+ coreSession.createQueue(HornetQDestination.JMS_TOPIC_ADDRESS_PREFIX + "mytopic", "blahblah", null, false);
Topic topic = HornetQJMSClient.createTopic("mytopic");
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -30,7 +30,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.Queue;
import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.client.HornetQQueue;
+import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.tests.util.JMSTestBase;
/**
@@ -42,7 +42,7 @@
private static final String Q_NAME = "ConsumerTestQueue";
- private HornetQQueue jBossQueue;
+ private javax.jms.Queue jBossQueue;
@Override
protected void setUp() throws Exception
@@ -65,7 +65,7 @@
{
Connection conn = cf.createConnection();
Session session = conn.createSession(false, HornetQJMSConstants.PRE_ACKNOWLEDGE);
- jBossQueue = (HornetQQueue) HornetQJMSClient.createQueue(ConsumerTest.Q_NAME);
+ jBossQueue = HornetQJMSClient.createQueue(ConsumerTest.Q_NAME);
MessageProducer producer = session.createProducer(jBossQueue);
MessageConsumer consumer = session.createConsumer(jBossQueue);
int noOfMessages = 100;
@@ -81,7 +81,7 @@
Assert.assertNotNull(m);
}
- SimpleString queueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + ConsumerTest.Q_NAME);
+ SimpleString queueName = new SimpleString(HornetQDestination.JMS_QUEUE_ADDRESS_PREFIX + ConsumerTest.Q_NAME);
Assert.assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
Assert.assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getMessageCount());
conn.close();
@@ -93,7 +93,7 @@
Connection conn = cf.createConnection();
Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- jBossQueue = (HornetQQueue) HornetQJMSClient.createQueue(ConsumerTest.Q_NAME);
+ jBossQueue = HornetQJMSClient.createQueue(ConsumerTest.Q_NAME);
MessageProducer producer = session.createProducer(jBossQueue);
MessageConsumer consumer = session.createConsumer(jBossQueue);
int noOfMessages = 100;
@@ -110,7 +110,7 @@
}
// Messages should all have been acked since we set pre ack on the cf
- SimpleString queueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + ConsumerTest.Q_NAME);
+ SimpleString queueName = new SimpleString(HornetQDestination.JMS_QUEUE_ADDRESS_PREFIX + ConsumerTest.Q_NAME);
Assert.assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
Assert.assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getMessageCount());
conn.close();
@@ -122,7 +122,7 @@
Connection conn = cf.createConnection();
Session session = conn.createSession(false, HornetQJMSConstants.PRE_ACKNOWLEDGE);
- jBossQueue = (HornetQQueue) HornetQJMSClient.createQueue(ConsumerTest.Q_NAME);
+ jBossQueue = HornetQJMSClient.createQueue(ConsumerTest.Q_NAME);
MessageProducer producer = session.createProducer(jBossQueue);
MessageConsumer consumer = session.createConsumer(jBossQueue);
int noOfMessages = 1000;
@@ -152,7 +152,7 @@
((HornetQConnectionFactory)cf).setPreAcknowledge(true);
Connection conn = cf.createConnection();
Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- jBossQueue = (HornetQQueue) HornetQJMSClient.createQueue(ConsumerTest.Q_NAME);
+ jBossQueue = HornetQJMSClient.createQueue(ConsumerTest.Q_NAME);
MessageProducer producer = session.createProducer(jBossQueue);
MessageConsumer consumer = session.createConsumer(jBossQueue);
int noOfMessages = 1000;
@@ -180,7 +180,7 @@
{
Connection conn = cf.createConnection();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- jBossQueue = (HornetQQueue) HornetQJMSClient.createQueue(ConsumerTest.Q_NAME);
+ jBossQueue = HornetQJMSClient.createQueue(ConsumerTest.Q_NAME);
MessageConsumer consumer = session.createConsumer(jBossQueue);
consumer.setMessageListener(new MessageListener()
{
@@ -199,7 +199,7 @@
{
Connection conn = cf.createConnection();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- jBossQueue = (HornetQQueue) HornetQJMSClient.createQueue(ConsumerTest.Q_NAME);
+ jBossQueue = HornetQJMSClient.createQueue(ConsumerTest.Q_NAME);
MessageConsumer consumer = session.createConsumer(jBossQueue);
consumer.setMessageListener(new MessageListener()
{
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -19,6 +19,7 @@
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
@@ -27,7 +28,6 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.jms.client.HornetQQueue;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.server.cluster.DivertConfiguration;
import org.hornetq.tests.util.JMSTestBase;
@@ -56,8 +56,8 @@
public void testAutoACK() throws Exception
{
- HornetQQueue queueSource = (HornetQQueue)createQueue("Source");
- HornetQQueue queueTarget = (HornetQQueue)createQueue("Dest");
+ Queue queueSource = createQueue("Source");
+ Queue queueTarget = createQueue("Dest");
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -79,8 +79,8 @@
public void testClientACK() throws Exception
{
- HornetQQueue queueSource = (HornetQQueue)createQueue("Source");
- HornetQQueue queueTarget = (HornetQQueue)createQueue("Dest");
+ Queue queueSource = createQueue("Source");
+ Queue queueTarget = createQueue("Dest");
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -39,7 +39,7 @@
import org.hornetq.core.server.HornetQServers;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.client.HornetQQueue;
+import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.tests.integration.management.ManagementControlHelper;
import org.hornetq.tests.integration.management.ManagementTestBase;
@@ -66,7 +66,7 @@
private JMSServerManagerImpl serverManager;
- protected HornetQQueue queue;
+ protected HornetQDestination queue;
protected Context context;
@@ -416,7 +416,7 @@
{
JMSQueueControl queueControl = createManagementControl();
String expiryQueueName = RandomUtil.randomString();
- HornetQQueue expiryQueue = (HornetQQueue) HornetQJMSClient.createQueue(expiryQueueName);
+ HornetQDestination expiryQueue = (HornetQDestination)HornetQJMSClient.createQueue(expiryQueueName);
serverManager.createQueue(expiryQueueName, expiryQueueName, null, true);
queueControl.setExpiryAddress(expiryQueue.getAddress());
@@ -545,7 +545,7 @@
{
String deadLetterQueue = RandomUtil.randomString();
serverManager.createQueue(deadLetterQueue, deadLetterQueue, null, true);
- HornetQQueue dlq = (HornetQQueue) HornetQJMSClient.createQueue(deadLetterQueue);
+ HornetQDestination dlq = (HornetQDestination)HornetQJMSClient.createQueue(deadLetterQueue);
Connection conn = createConnection();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -606,7 +606,7 @@
String deadLetterQueue = RandomUtil.randomString();
serverManager.createQueue(deadLetterQueue, deadLetterQueue, null, true);
- HornetQQueue dlq = (HornetQQueue) HornetQJMSClient.createQueue(deadLetterQueue);
+ HornetQDestination dlq = (HornetQDestination)HornetQJMSClient.createQueue(deadLetterQueue);
Connection conn = createConnection();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -648,7 +648,7 @@
String otherQueueName = RandomUtil.randomString();
serverManager.createQueue(otherQueueName, otherQueueName, null, true);
- HornetQQueue otherQueue = (HornetQQueue) HornetQJMSClient.createQueue(otherQueueName);
+ HornetQDestination otherQueue = (HornetQDestination)HornetQJMSClient.createQueue(otherQueueName);
// send on queue
JMSUtil.sendMessages(queue, 2);
@@ -695,7 +695,7 @@
String otherQueueName = RandomUtil.randomString();
serverManager.createQueue(otherQueueName, otherQueueName, null, true);
- HornetQQueue otherQueue = (HornetQQueue) HornetQJMSClient.createQueue(otherQueueName);
+ HornetQDestination otherQueue = (HornetQDestination)HornetQJMSClient.createQueue(otherQueueName);
Connection connection = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -730,7 +730,7 @@
String otherQueueName = RandomUtil.randomString();
serverManager.createQueue(otherQueueName, otherQueueName, null, true);
- HornetQQueue otherQueue = (HornetQQueue) HornetQJMSClient.createQueue(otherQueueName);
+ HornetQDestination otherQueue = (HornetQDestination)HornetQJMSClient.createQueue(otherQueueName);
String[] messageIDs = JMSUtil.sendMessages(queue, 1);
@@ -834,7 +834,7 @@
String queueName = RandomUtil.randomString();
serverManager.createQueue(queueName, queueName, null, true);
- queue = (HornetQQueue) HornetQJMSClient.createQueue(queueName);
+ queue = (HornetQDestination)HornetQJMSClient.createQueue(queueName);
}
@Override
@@ -866,7 +866,7 @@
private Connection createConnection() throws JMSException
{
- HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ HornetQConnectionFactory cf = (HornetQConnectionFactory)HornetQJMSClient.createConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
cf.setBlockOnDurableSend(true);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -22,10 +22,10 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.ResourceNames;
import org.hornetq.api.jms.HornetQJMSClient;
-import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.client.HornetQQueue;
import org.hornetq.api.jms.management.JMSQueueControl;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQDestination;
/**
*
@@ -77,7 +77,7 @@
@Override
protected JMSQueueControl createManagementControl() throws Exception
{
- HornetQQueue managementQueue = (HornetQQueue) HornetQJMSClient.createQueue("hornetq.management");
+ HornetQDestination managementQueue = (HornetQDestination) HornetQJMSClient.createQueue("hornetq.management");
final JMSMessagingProxy proxy = new JMSMessagingProxy(session,
managementQueue,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -22,11 +22,10 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.ResourceNames;
import org.hornetq.api.jms.HornetQJMSClient;
-import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.client.HornetQQueue;
import org.hornetq.api.jms.management.JMSServerControl;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQDestination;
/**
* A JMSServerControlUsingCoreTest
@@ -86,7 +85,7 @@
@Override
protected JMSServerControl createManagementControl() throws Exception
{
- HornetQQueue managementQueue = (HornetQQueue) HornetQJMSClient.createQueue("hornetq.management");
+ HornetQDestination managementQueue = (HornetQDestination) HornetQJMSClient.createQueue("hornetq.management");
final JMSMessagingProxy proxy = new JMSMessagingProxy(session, managementQueue, ResourceNames.JMS_SERVER);
return new JMSServerControl()
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -30,7 +30,7 @@
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
-import org.hornetq.jms.client.HornetQTopic;
+import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.tests.integration.management.ManagementControlHelper;
import org.hornetq.tests.integration.management.ManagementTestBase;
@@ -61,7 +61,7 @@
private String subscriptionName;
- protected HornetQTopic topic;
+ protected HornetQDestination topic;
// Static --------------------------------------------------------
@@ -338,8 +338,8 @@
JMSUtil.sendMessages(topic, 3);
TopicControl topicControl = createManagementControl();
- Map<String, Object>[] messages = topicControl.listMessagesForSubscription(HornetQTopic.createQueueNameForDurableSubscription(clientID,
- subscriptionName));
+ Map<String, Object>[] messages = topicControl.listMessagesForSubscription(HornetQDestination.createQueueNameForDurableSubscription(clientID,
+ subscriptionName));
Assert.assertEquals(3, messages.length);
connection.close();
@@ -354,8 +354,8 @@
String[] ids = JMSUtil.sendMessages(topic, 3);
TopicControl topicControl = createManagementControl();
- String jsonString = topicControl.listMessagesForSubscriptionAsJSON(HornetQTopic.createQueueNameForDurableSubscription(clientID,
- subscriptionName));
+ String jsonString = topicControl.listMessagesForSubscriptionAsJSON(HornetQDestination.createQueueNameForDurableSubscription(clientID,
+ subscriptionName));
Assert.assertNotNull(jsonString);
JSONArray array = new JSONArray(jsonString);
Assert.assertEquals(3, array.length());
@@ -375,8 +375,8 @@
try
{
- topicControl.listMessagesForSubscription(HornetQTopic.createQueueNameForDurableSubscription(unknownClientID,
- subscriptionName));
+ topicControl.listMessagesForSubscription(HornetQDestination.createQueueNameForDurableSubscription(unknownClientID,
+ subscriptionName));
Assert.fail();
}
catch (Exception e)
@@ -392,8 +392,8 @@
try
{
- topicControl.listMessagesForSubscription(HornetQTopic.createQueueNameForDurableSubscription(clientID,
- unknownSubscription));
+ topicControl.listMessagesForSubscription(HornetQDestination.createQueueNameForDurableSubscription(clientID,
+ unknownSubscription));
Assert.fail();
}
catch (Exception e)
@@ -428,7 +428,7 @@
String topicName = RandomUtil.randomString();
serverManager.createTopic(topicName, topicName);
- topic = (HornetQTopic) HornetQJMSClient.createTopic(topicName);
+ topic = (HornetQDestination)HornetQJMSClient.createTopic(topicName);
}
@Override
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -30,8 +30,7 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.client.HornetQQueue;
-import org.hornetq.jms.client.HornetQTopic;
+import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.tests.integration.management.ManagementTestBase;
import org.hornetq.tests.util.RandomUtil;
@@ -51,7 +50,7 @@
private String subscriptionName;
- protected HornetQTopic topic;
+ protected HornetQDestination topic;
protected JMSMessagingProxy proxy;
@@ -288,7 +287,7 @@
JMSUtil.sendMessages(topic, 3);
Object[] data = (Object[])proxy.invokeOperation("listMessagesForSubscription",
- HornetQTopic.createQueueNameForDurableSubscription(clientID,
+ HornetQDestination.createQueueNameForDurableSubscription(clientID,
subscriptionName));
Assert.assertEquals(3, data.length);
@@ -302,7 +301,7 @@
try
{
proxy.invokeOperation("listMessagesForSubscription",
- HornetQTopic.createQueueNameForDurableSubscription(unknownClientID, subscriptionName));
+ HornetQDestination.createQueueNameForDurableSubscription(unknownClientID, subscriptionName));
Assert.fail();
}
catch (Exception e)
@@ -317,7 +316,7 @@
try
{
proxy.invokeOperation("listMessagesForSubscription",
- HornetQTopic.createQueueNameForDurableSubscription(clientID, unknownSubscription));
+ HornetQDestination.createQueueNameForDurableSubscription(clientID, unknownSubscription));
Assert.fail();
}
catch (Exception e)
@@ -352,14 +351,14 @@
String topicName = RandomUtil.randomString();
serverManager.createTopic(topicName, topicName);
- topic = (HornetQTopic)HornetQJMSClient.createTopic(topicName);
+ topic = (HornetQDestination)HornetQJMSClient.createTopic(topicName);
HornetQConnectionFactory cf = (HornetQConnectionFactory)HornetQJMSClient.createConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
connection = cf.createQueueConnection();
session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
- HornetQQueue managementQueue = (HornetQQueue)HornetQJMSClient.createQueue("hornetq.management");
+ HornetQDestination managementQueue = (HornetQDestination)HornetQJMSClient.createQueue("hornetq.management");
proxy = new JMSMessagingProxy(session, managementQueue, ResourceNames.JMS_TOPIC + topic.getTopicName());
}
Modified: trunk/tests/src/org/hornetq/tests/unit/jms/HornetQDestinationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/jms/HornetQDestinationTest.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/tests/src/org/hornetq/tests/unit/jms/HornetQDestinationTest.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -14,18 +14,11 @@
package org.hornetq.tests.unit.jms;
import javax.jms.Queue;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import junit.framework.Assert;
-import org.hornetq.jms.*;
import org.hornetq.jms.client.HornetQDestination;
-import org.hornetq.jms.client.HornetQQueue;
-import org.hornetq.jms.client.HornetQTemporaryQueue;
-import org.hornetq.jms.client.HornetQTemporaryTopic;
-import org.hornetq.jms.client.HornetQTopic;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
@@ -50,7 +43,7 @@
public void testEquals() throws Exception
{
String destinationName = RandomUtil.randomString();
- String address = HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + destinationName;
+ String address = HornetQDestination.JMS_QUEUE_ADDRESS_PREFIX + destinationName;
HornetQDestination destination = (HornetQDestination) HornetQDestination.fromAddress(address);
HornetQDestination sameDestination = (HornetQDestination) HornetQDestination.fromAddress(address);
HornetQDestination differentDestination = (HornetQDestination) HornetQDestination.fromAddress(address + RandomUtil.randomString());
@@ -64,7 +57,7 @@
public void testFromAddressWithQueueAddressPrefix() throws Exception
{
String destinationName = RandomUtil.randomString();
- String address = HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + destinationName;
+ String address = HornetQDestination.JMS_QUEUE_ADDRESS_PREFIX + destinationName;
HornetQDestination destination = (HornetQDestination) HornetQDestination.fromAddress(address);
Assert.assertTrue(destination instanceof Queue);
Assert.assertEquals(destinationName, ((Queue)destination).getQueueName());
@@ -73,30 +66,12 @@
public void testFromAddressWithTopicAddressPrefix() throws Exception
{
String destinationName = RandomUtil.randomString();
- String address = HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX + destinationName;
+ String address = HornetQDestination.JMS_TOPIC_ADDRESS_PREFIX + destinationName;
HornetQDestination destination = (HornetQDestination) HornetQDestination.fromAddress(address);
Assert.assertTrue(destination instanceof Topic);
Assert.assertEquals(destinationName, ((Topic)destination).getTopicName());
}
-
- public void testFromAddressWithTemporaryQueueAddressPrefix() throws Exception
- {
- String destinationName = RandomUtil.randomString();
- String address = HornetQTemporaryQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX + destinationName;
- HornetQDestination destination = (HornetQDestination) HornetQDestination.fromAddress(address);
- Assert.assertTrue(destination instanceof TemporaryQueue);
- Assert.assertEquals(destinationName, ((TemporaryQueue)destination).getQueueName());
- }
-
- public void testFromAddressWithTemporaryTopicAddressPrefix() throws Exception
- {
- String destinationName = RandomUtil.randomString();
- String address = HornetQTemporaryTopic.JMS_TEMP_TOPIC_ADDRESS_PREFIX + destinationName;
- HornetQDestination destination = (HornetQDestination) HornetQDestination.fromAddress(address);
- Assert.assertTrue(destination instanceof TemporaryTopic);
- Assert.assertEquals(destinationName, ((TemporaryTopic)destination).getTopicName());
- }
-
+
public void testFromAddressWithInvalidPrefix() throws Exception
{
String invalidPrefix = "junk";
Deleted: trunk/tests/src/org/hornetq/tests/unit/jms/HornetQQueueTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/jms/HornetQQueueTest.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/tests/src/org/hornetq/tests/unit/jms/HornetQQueueTest.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -1,61 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.unit.jms;
-
-import junit.framework.Assert;
-
-import org.hornetq.api.jms.HornetQJMSClient;
-import org.hornetq.jms.client.HornetQQueue;
-import org.hornetq.tests.util.RandomUtil;
-import org.hornetq.tests.util.UnitTestCase;
-
-/**
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- *
- * @version <tt>$Revision$</tt>
- *
- */
-public class HornetQQueueTest extends UnitTestCase
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testIsTemporary() throws Exception
- {
- HornetQQueue queue = (HornetQQueue) HornetQJMSClient.createQueue(RandomUtil.randomString());
- Assert.assertFalse(queue.isTemporary());
- }
-
- public void testGetQueueName() throws Exception
- {
- String queueName = RandomUtil.randomString();
- HornetQQueue queue = (HornetQQueue) HornetQJMSClient.createQueue(queueName);
- Assert.assertEquals(queueName, queue.getQueueName());
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Deleted: trunk/tests/src/org/hornetq/tests/unit/jms/HornetQTopicTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/jms/HornetQTopicTest.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/tests/src/org/hornetq/tests/unit/jms/HornetQTopicTest.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -1,97 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.unit.jms;
-
-import junit.framework.Assert;
-
-import org.hornetq.api.core.Pair;
-import org.hornetq.api.jms.HornetQJMSClient;
-import org.hornetq.jms.client.HornetQTopic;
-import org.hornetq.tests.util.RandomUtil;
-import org.hornetq.tests.util.UnitTestCase;
-
-/**
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- *
- * @version <tt>$Revision$</tt>
- *
- */
-public class HornetQTopicTest extends UnitTestCase
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testIsTemporary() throws Exception
- {
- HornetQTopic topic = (HornetQTopic) HornetQJMSClient.createTopic(RandomUtil.randomString());
- Assert.assertFalse(topic.isTemporary());
- }
-
- public void testGetTopicName() throws Exception
- {
- String topicName = RandomUtil.randomString();
- HornetQTopic queue = (HornetQTopic) HornetQJMSClient.createTopic(topicName);
- Assert.assertEquals(topicName, queue.getTopicName());
- }
-
- public void testDecomposeQueueNameForDurableSubscription() throws Exception
- {
- String clientID = RandomUtil.randomString();
- String subscriptionName = RandomUtil.randomString();
-
- Pair<String, String> pair = HornetQTopic.decomposeQueueNameForDurableSubscription(clientID + '.' +
- subscriptionName);
- Assert.assertEquals(clientID, pair.a);
- Assert.assertEquals(subscriptionName, pair.b);
- }
-
- public void testdDecomposeQueueNameForDurableSubscriptionWithInvalidQueueName() throws Exception
- {
- try
- {
- HornetQTopic.decomposeQueueNameForDurableSubscription("queueNameHasNoDot");
- Assert.fail("IllegalArgumentException");
- }
- catch (IllegalArgumentException e)
- {
- }
- }
-
- public void testdDcomposeQueueNameForDurableSubscriptionWithInvalidQueueName_2() throws Exception
- {
- try
- {
- HornetQTopic.decomposeQueueNameForDurableSubscription("queueName.HasTooMany.Dots");
- Assert.fail("IllegalArgumentException");
- }
- catch (IllegalArgumentException e)
- {
- }
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Modified: trunk/tests/src/org/hornetq/tests/unit/jms/referenceable/DestinationObjectFactoryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/jms/referenceable/DestinationObjectFactoryTest.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/tests/src/org/hornetq/tests/unit/jms/referenceable/DestinationObjectFactoryTest.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -19,7 +19,6 @@
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.jms.client.HornetQDestination;
-import org.hornetq.jms.client.HornetQQueue;
import org.hornetq.jms.referenceable.DestinationObjectFactory;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
@@ -44,7 +43,7 @@
public void testReference() throws Exception
{
- HornetQDestination queue = (HornetQQueue) HornetQJMSClient.createQueue(RandomUtil.randomString());
+ HornetQDestination queue = (HornetQDestination) HornetQJMSClient.createQueue(RandomUtil.randomString());
Reference reference = queue.getReference();
DestinationObjectFactory factory = new DestinationObjectFactory();
Modified: trunk/tests/src/org/hornetq/tests/unit/ra/ResourceAdapterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/ra/ResourceAdapterTest.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/tests/src/org/hornetq/tests/unit/ra/ResourceAdapterTest.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -35,11 +35,11 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.jms.HornetQJMSClient;
-import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.client.HornetQQueue;
import org.hornetq.core.remoting.impl.invm.InVMConnector;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.integration.transports.netty.NettyConnector;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.ra.ConnectionFactoryProperties;
import org.hornetq.ra.HornetQRAManagedConnectionFactory;
import org.hornetq.ra.HornetQResourceAdapter;
@@ -387,7 +387,7 @@
ClientSessionFactory factory = createInVMFactory();
ClientSession session = factory.createSession(false, false, false);
- HornetQQueue queue = (HornetQQueue) HornetQJMSClient.createQueue("test");
+ HornetQDestination queue = (HornetQDestination) HornetQJMSClient.createQueue("test");
session.createQueue(queue.getSimpleAddress(), queue.getSimpleAddress(), true);
session.close();
Modified: trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2010-01-29 13:43:32 UTC (rev 8858)
+++ trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2010-01-29 18:51:42 UTC (rev 8859)
@@ -18,6 +18,7 @@
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
+import javax.jms.Topic;
import javax.naming.NamingException;
import org.hornetq.api.core.Pair;
@@ -88,7 +89,14 @@
return (Queue)context.lookup("/jms/" + name);
}
+
+ protected Topic createTopic(final String name) throws Exception, NamingException
+ {
+ jmsServer.createTopic(name, "/jms/" + name);
+ return (Topic)context.lookup("/jms/" + name);
+ }
+
@Override
protected void setUp() throws Exception
{
14 years, 11 months
JBoss hornetq SVN: r8858 - in trunk: src/main/org/hornetq/core/protocol/stomp and 1 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-29 08:43:32 -0500 (Fri, 29 Jan 2010)
New Revision: 8858
Modified:
trunk/docs/user-manual/en/examples.xml
trunk/docs/user-manual/en/interoperability.xml
trunk/docs/user-manual/en/messaging-concepts.xml
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* add documentation
* replace sysout calls by log.trace
Modified: trunk/docs/user-manual/en/examples.xml
===================================================================
--- trunk/docs/user-manual/en/examples.xml 2010-01-28 15:16:36 UTC (rev 8857)
+++ trunk/docs/user-manual/en/examples.xml 2010-01-29 13:43:32 UTC (rev 8858)
@@ -416,6 +416,11 @@
HornetQ queue with static message selectors (filters) using JMS.</para>
</section>
<section>
+ <title>Stomp</title>
+ <para>The <literal>stomp</literal> example shows you how to configure a
+ HornetQ server to send and receive Stomp messages.</para>
+ </section>
+ <section>
<title>Symmetric Cluster</title>
<para>The <literal>symmetric-cluster</literal> example demonstrates a symmetric cluster
set-up with HornetQ.</para>
Modified: trunk/docs/user-manual/en/interoperability.xml
===================================================================
--- trunk/docs/user-manual/en/interoperability.xml 2010-01-28 15:16:36 UTC (rev 8857)
+++ trunk/docs/user-manual/en/interoperability.xml 2010-01-29 13:43:32 UTC (rev 8858)
@@ -18,28 +18,58 @@
<!-- ============================================================================= -->
<chapter id="interoperability">
<title>Interoperability</title>
- <section>
- <title>Stomp and StompConnect</title>
- <para><ulink url="http://stomp.codehaus.org/">Stomp</ulink> is a wire protocol that allows
- Stomp clients to communicate with Stomp Brokers. <ulink
- url="http://stomp.codehaus.org/StompConnect">StompConnect</ulink> is a server that
+ <section id="stomp">
+ <title>Stomp</title>
+ <para><ulink url="http://stomp.codehaus.org/">Stomp</ulink> is a text-orientated wire protocol that allows
+ Stomp clients to communicate with Stomp Brokers.</para>
+ <para><ulink url="http://stomp.codehaus.org/Clients">Stomp clients</ulink> are available for
+ several languages and platforms making it a good choice for interoperability.</para>
+ <section id="stomp.native">
+ <title>Native Stomp support</title>
+ <para>HornetQ provides native support for Stomp. To be able to send and receive Stomp messages,
+ you must configure a <literal>NettyAcceptor</literal> with a <literal>protocol</literal>
+ parameter set to <literal>stomp</literal>:</para>
+<programlisting>
+ <acceptor name="stomp-acceptor">
+ <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="protocol" value="stomp"/>
+ <param key="port" value="61613"/>
+ </acceptor>
+</programlisting>
+ <para>With this configuration, HornetQ will accept Stomp connections on
+ the port <literal>61613</literal> (which is the default port of Stomp brokers).</para>
+ <para>See the <literal>stomp</literal> example which shows how to configure a HornetQ server with Stomp.</para>
+ <section>
+ <title>Limitations</title>
+ <para>Message acknowledgements are not transactional. The ACK frame can not be part of a transaction
+ (it will be ignored if its <literal>transaction</literal> header is set).</para>
+ </section>
+ <section>
+ <title>Destination Mapping</title>
+ <para>Stomp messages are sent and received by specifying "destinations".
+ If the Stomp destinations start with <literal>/queue/</literal>, <literal>/topic/</literal>,
+ <literal>/temp-queue/</literal> or <literal>/temp-topic/</literal>, they will be mapped to the corresponding
+ JMS Destinations. Otherwise, they will be treated as regular HornetQ addresses (for sent messages) and
+ queues (for subscription and received messages).</para>
+ </section>
+ </section>
+ <section id="stompconnect">
+ <title>StompConnect</title>
+ <para><ulink url="http://stomp.codehaus.org/StompConnect">StompConnect</ulink> is a server that
can act as a Stomp broker and proxy the Stomp protocol to the standard JMS API.
Consequently, using StompConnect it is possible to turn HornetQ into a Stomp Broker and
use any of the available stomp clients. These include clients written in C, C++, c# and
.net etc.</para>
- <para>To run StompConnect first start the HornetQ server and make sure that it is using
+ <para>To run StompConnect first start the HornetQ server and make sure that it is using
JNDI.</para>
- <para>Stomp requires the file <literal>jndi.properties</literal> to be available on the
+ <para>Stomp requires the file <literal>jndi.properties</literal> to be available on the
classpath. This should look something like:</para>
- <programlisting>java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+ <programlisting>java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
java.naming.provider.url=jnp://localhost:1099
java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces</programlisting>
- <para>Make sure this file is in the classpath along with the StompConnect jar and the
+ <para>Make sure this file is in the classpath along with the StompConnect jar and the
HornetQ jars and simply run <literal>java org.codehaus.stomp.jms.Main</literal>.</para>
- <para>HornetQ will shortly be implementing the Stomp protocol directly, so you won't have to
- use StompConnect to be able to use HornetQ with Stomp clients.</para>
- <para>A list of STOMP clients is available <ulink url="http://stomp.codehaus.org/Clients"
- >here.</ulink></para>
+ </section>
</section>
<section>
<title>REST</title>
Modified: trunk/docs/user-manual/en/messaging-concepts.xml
===================================================================
--- trunk/docs/user-manual/en/messaging-concepts.xml 2010-01-28 15:16:36 UTC (rev 8857)
+++ trunk/docs/user-manual/en/messaging-concepts.xml 2010-01-29 13:43:32 UTC (rev 8858)
@@ -203,15 +203,14 @@
<title>STOMP</title>
<para><ulink
url="http://en.wikipedia.org/wiki/Streaming_Text_Orientated_Messaging_Protocol"
- >STOMP</ulink> is a very simple protocol for interoperating with messaging
- systems. It defines a wire format, so theoretically any STOMP client can work with
- any messaging system that supports STOMP. STOMP clients are available in many
+ >Stomp</ulink> is a very simple protocol for interoperating with messaging
+ systems. It defines a wire format, so theoretically any Stomp client can work with
+ any messaging system that supports Stomp. Stomp clients are available in many
different programming languages.</para>
- <para>HornetQ can be used by any STOMP client when using the <ulink
+ <para>HornetQ can be used by any Stomp client when using the <ulink
url="http://stomp.codehaus.org/StompConnect">StompConnect</ulink> broker which
translates the STOMP protocol to the JMS API.</para>
- <para>HornetQ will be shortly implementing the STOMP protocol on the broker, thus
- avoiding having to use StompConnect.</para>
+ <para>Please see <xref linkend="stomp"/> for using STOMP with HornetQ.</para>
</section>
<section>
<title>AMQP</title>
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-28 15:16:36 UTC (rev 8857)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-29 13:43:32 UTC (rev 8858)
@@ -126,8 +126,11 @@
try
{
request = marshaller.unmarshal(buffer);
- System.out.println("<<< " + request);
-
+ if (log.isTraceEnabled())
+ {
+ log.trace("received " + request);
+ }
+
String command = request.getCommand();
StompFrame response = null;
@@ -514,7 +517,10 @@
public int send(StompConnection connection, StompFrame frame)
{
- System.out.println(">>> " + frame);
+ if (log.isTraceEnabled())
+ {
+ log.trace("sent " + frame);
+ }
synchronized (connection)
{
if (connection.isDestroyed() || !connection.isValid())
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-01-28 15:16:36 UTC (rev 8857)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-01-29 13:43:32 UTC (rev 8858)
@@ -80,6 +80,7 @@
headers.put(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
}
byte[] data = new byte[] {};
+ serverMessage.getBodyBuffer().markReaderIndex();
if (serverMessage.getType() == Message.TEXT_TYPE)
{
SimpleString text = serverMessage.getBodyBuffer().readNullableSimpleString();
@@ -97,6 +98,7 @@
buffer.readBytes(data);
headers.put(Headers.CONTENT_LENGTH, data.length);
}
+ serverMessage.getBodyBuffer().resetReaderIndex();
StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-28 15:16:36 UTC (rev 8857)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-29 13:43:32 UTC (rev 8858)
@@ -769,6 +769,7 @@
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("MESSAGE"));
+ System.out.println(frame);
Assert.assertTrue(frame.contains("shouldBeNextMessage"));
}
@@ -801,12 +802,12 @@
frame =
"UNSUBSCRIBE\n" +
"destination:/queue/" + getQueueName() + "\n" +
+ "receipt:567\n" +
"\n\n" +
Stomp.NULL;
sendFrame(frame);
+ waitForReceipt();
- waitForFrameToTakeEffect();
-
//send a message to our queue
sendMessage("second message");
@@ -850,12 +851,12 @@
frame =
"UNSUBSCRIBE\n" +
"id:mysubid\n" +
+ "receipt: 345\n" +
"\n\n" +
Stomp.NULL;
sendFrame(frame);
+ waitForReceipt();
- waitForFrameToTakeEffect();
-
//send a message to our queue
sendMessage("second message");
@@ -893,24 +894,25 @@
"SEND\n" +
"destination:/queue/" + getQueueName() + "\n" +
"transaction: tx1\n" +
+ "receipt: 123\n" +
"\n\n" +
"Hello World" +
Stomp.NULL;
sendFrame(frame);
-
- waitForFrameToTakeEffect();
+ waitForReceipt();
+
// check the message is not committed
assertNull(consumer.receive(100));
frame =
"COMMIT\n" +
"transaction: tx1\n" +
+ "receipt:456\n" +
"\n\n" +
Stomp.NULL;
sendFrame(frame);
+ waitForReceipt();
- waitForFrameToTakeEffect();
-
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull("Should have received a message", message);
}
@@ -1070,13 +1072,12 @@
frame =
"COMMIT\n" +
"transaction: tx1\n" +
+ "receipt:789\n" +
"\n\n" +
Stomp.NULL;
sendFrame(frame);
+ waitForReceipt();
- // This test case is currently failing
- waitForFrameToTakeEffect();
-
//only second msg should be received since first msg was rolled back
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
@@ -1366,6 +1367,12 @@
producer.send(message);
}
+ protected void waitForReceipt() throws Exception {
+ String frame = receiveFrame(50000);
+ assertNotNull(frame);
+ assertTrue(frame.indexOf("RECEIPT") > -1);
+ }
+
protected void waitForFrameToTakeEffect() throws InterruptedException {
// bit of a dirty hack :)
// another option would be to force some kind of receipt to be returned
14 years, 11 months
JBoss hornetq SVN: r8857 - trunk/tests/src/org/hornetq/tests/integration/stomp.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-28 10:16:36 -0500 (Thu, 28 Jan 2010)
New Revision: 8857
Modified:
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* test fixes
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-28 14:36:24 UTC (rev 8856)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-28 15:16:36 UTC (rev 8857)
@@ -1098,16 +1098,20 @@
frame =
"SUBSCRIBE\n" +
"destination:/topic/" + getTopicName() + "\n" +
+ "receipt: 12\n" +
"\n\n" +
Stomp.NULL;
sendFrame(frame);
+ // wait for SUBSCRIBE's receipt
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("RECEIPT"));
sendMessage(getName(), topic);
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("MESSAGE"));
Assert.assertTrue(frame.indexOf("destination:") > 0);
- Assert.assertTrue(frame.indexOf(getTopicName()) > 0);
+ Assert.assertTrue(frame.indexOf(getName()) > 0);
frame =
"UNSUBSCRIBE\n" +
14 years, 11 months
JBoss hornetq SVN: r8856 - in trunk: examples/jms and 8 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-28 09:36:24 -0500 (Thu, 28 Jan 2010)
New Revision: 8856
Added:
trunk/examples/jms/stomp/
trunk/examples/jms/stomp/build.bat
trunk/examples/jms/stomp/build.sh
trunk/examples/jms/stomp/build.xml
trunk/examples/jms/stomp/readme.html
trunk/examples/jms/stomp/server0/
trunk/examples/jms/stomp/server0/client-jndi.properties
trunk/examples/jms/stomp/server0/hornetq-beans.xml
trunk/examples/jms/stomp/server0/hornetq-configuration.xml
trunk/examples/jms/stomp/server0/hornetq-jms.xml
trunk/examples/jms/stomp/server0/hornetq-users.xml
trunk/examples/jms/stomp/src/
trunk/examples/jms/stomp/src/org/
trunk/examples/jms/stomp/src/org/hornetq/
trunk/examples/jms/stomp/src/org/hornetq/jms/
trunk/examples/jms/stomp/src/org/hornetq/jms/example/
trunk/examples/jms/stomp/src/org/hornetq/jms/example/StompExample.java
Modified:
trunk/.classpath
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* add jms/stomp example
Modified: trunk/.classpath
===================================================================
--- trunk/.classpath 2010-01-28 13:27:55 UTC (rev 8855)
+++ trunk/.classpath 2010-01-28 14:36:24 UTC (rev 8856)
@@ -66,6 +66,7 @@
<classpathentry kind="src" path="examples/jms/ssl-enabled/src"/>
<classpathentry kind="src" path="examples/jms/static-selector/src"/>
<classpathentry kind="src" path="examples/jms/static-selector-jms/src"/>
+ <classpathentry kind="src" path="examples/jms/stomp/src"/>
<classpathentry kind="src" path="examples/jms/symmetric-cluster/src"/>
<classpathentry kind="src" path="examples/jms/temp-queue/src"/>
<classpathentry kind="src" path="examples/jms/topic/src"/>
Added: trunk/examples/jms/stomp/build.bat
===================================================================
--- trunk/examples/jms/stomp/build.bat (rev 0)
+++ trunk/examples/jms/stomp/build.bat 2010-01-28 14:36:24 UTC (rev 8856)
@@ -0,0 +1,13 @@
+@echo off
+
+set "OVERRIDE_ANT_HOME=..\..\..\tools\ant"
+
+if exist "..\..\..\src\bin\build.bat" (
+ rem running from TRUNK
+ call ..\..\..\src\bin\build.bat %*
+) else (
+ rem running from the distro
+ call ..\..\..\bin\build.bat %*
+)
+
+set "OVERRIDE_ANT_HOME="
Added: trunk/examples/jms/stomp/build.sh
===================================================================
--- trunk/examples/jms/stomp/build.sh (rev 0)
+++ trunk/examples/jms/stomp/build.sh 2010-01-28 14:36:24 UTC (rev 8856)
@@ -0,0 +1,15 @@
+#!/bin/sh
+
+OVERRIDE_ANT_HOME=../../../tools/ant
+export OVERRIDE_ANT_HOME
+
+if [ -f "../../../src/bin/build.sh" ]; then
+ # running from TRUNK
+ ../../../src/bin/build.sh "$@"
+else
+ # running from the distro
+ ../../../bin/build.sh "$@"
+fi
+
+
+
Property changes on: trunk/examples/jms/stomp/build.sh
___________________________________________________________________
Name: svn:executable
+ *
Added: trunk/examples/jms/stomp/build.xml
===================================================================
--- trunk/examples/jms/stomp/build.xml (rev 0)
+++ trunk/examples/jms/stomp/build.xml 2010-01-28 14:36:24 UTC (rev 8856)
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE project [
+ <!ENTITY libraries SYSTEM "../../../thirdparty/libraries.ent">
+ ]>
+<!--
+ ~ Copyright 2009 Red Hat, Inc.
+ ~ Red Hat licenses this file to you under the Apache License, version
+ ~ 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ ~ implied. See the License for the specific language governing
+ ~ permissions and limitations under the License.
+ -->
+<project default="run" name="HornetQ Stomp Example">
+
+ <import file="../../common/build.xml"/>
+
+ <target name="run">
+ <antcall target="runExample">
+ <param name="example.classname" value="org.hornetq.jms.example.StompExample"/>
+ </antcall>
+ </target>
+
+ <target name="runRemote">
+ <antcall target="runExample">
+ <param name="example.classname" value="org.hornetq.jms.example.StompExample"/>
+ <param name="hornetq.example.runServer" value="false"/>
+ </antcall>
+ </target>
+
+</project>
\ No newline at end of file
Added: trunk/examples/jms/stomp/readme.html
===================================================================
--- trunk/examples/jms/stomp/readme.html (rev 0)
+++ trunk/examples/jms/stomp/readme.html 2010-01-28 14:36:24 UTC (rev 8856)
@@ -0,0 +1,111 @@
+<html>
+ <head>
+ <title>HornetQ Stomp Example</title>
+ <link rel="stylesheet" type="text/css" href="../../common/common.css" />
+ <link rel="stylesheet" type="text/css" href="../../common/prettify.css" />
+ <script type="text/javascript" src="../../common/prettify.js"></script>
+ </head>
+ <body onload="prettyPrint()">
+ <h1>Stomp Example</h1>
+
+ <p>This example shows you how to configure HornetQ to send and receive Stomp messages.</p>
+ <p>The example will start a HornetQ server configured with Stomp and JMS.</p>
+ <p>The client will open a socket to send one Stomp message (using TCP directly).
+ The client will then consume a message from a JMS Queue and check it is the message sent with Stomp.</p>
+
+ <h2>Example step-by-step</h2>
+ <p><i>To run the example, simply type <code>./build.sh</code> (or <code>build.bat</code> on windows) from this directory</i></p>
+
+ <ol>
+ <li>We create a TCP socket to connect to the Stomp port</li>
+ <pre class="prettyprint">
+ Socket socket = new Socket("localhost", 61613);
+ </pre>
+
+ <li>We send a CONNECT frame to connect to the server</li>
+ <pre class="prettyprint">
+ String connectFrame = "CONNECT\n" +
+ "login: guest\n" +
+ "passcode: guest\n" +
+ "request-id: 1\n" +
+ "\n" +
+ Stomp.NULL;
+ sendFrame(socket, connectFrame);
+ </pre>
+
+ <li>We send a SEND frame (a Stomp message) to the queue <code>/queue/exampleQueue</code> with a text body</li>
+ <pre class="prettyprint">
+ String text = "Hello, world from Stomp!";
+ String message = "SEND\n" +
+ "destination: /queue/exampleQueue\n" +
+ "\n" +
+ text +
+ Stomp.NULL;
+ sendFrame(socket, message);
+ System.out.println("Sent Stomp message: " + text);
+ </pre>
+
+ <li>We send a DISCONNECT frame to disconnect from the server</li>
+ <pre class="prettyprint">
+ String disconnectFrame = "DISCONNECT\n" +
+ "\n" +
+ Stomp.NULL;
+ sendFrame(socket, disconnectFrame);
+ </pre>
+
+ <li>We close the TCP socket</li>
+ <pre class="prettyprint">
+ socket.close();
+ </pre>
+
+ <li>We create an initial context to perform the JNDI lookup.</li>
+ <pre class="prettyprint">
+ initialContext = getContext(0);
+ </pre>
+
+ <li>We perform a lookup on the queue and the connection factory</li>
+ <pre class="prettyprint">
+ Queue queue = (Queue)initialContext.lookup("/queue/exampleQueue");
+ ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
+ </pre>
+
+ <li>We create a JMS Connection, Session and a MessageConsumer on the queue</li>
+ <pre class="prettyprint">
+ connection = cf.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(queue);
+ </pre>
+
+ <li>We start the connection</li>
+ <pre class="prettyprint">
+ <code>connection.start();</code>
+ </pre>
+
+ <li>We receive the message. Since the Stomp message contained a text body,
+ the corresponding JMS Message is a TextMessage</li>
+ <pre class="prettyprint">
+ TextMessage messageReceived = (TextMessage)consumer.receive(5000);
+ System.out.println("Received JMS message: " + messageReceived.getText());
+ </pre>
+
+ <li>And finally, <b>always</b> remember to close your JMS connections and resources after use, in a <code>finally</code> block. Closing a JMS connection will automatically close all of its sessions, consumers, producer and browser objects</li>
+
+ <pre class="prettyprint">
+ <code>finally
+ {
+ if (initialContext != null)
+ {
+ initialContext.close();
+ }
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }</code>
+ </pre>
+
+
+
+ </ol>
+ </body>
+</html>
\ No newline at end of file
Added: trunk/examples/jms/stomp/server0/client-jndi.properties
===================================================================
--- trunk/examples/jms/stomp/server0/client-jndi.properties (rev 0)
+++ trunk/examples/jms/stomp/server0/client-jndi.properties 2010-01-28 14:36:24 UTC (rev 8856)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:1099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Added: trunk/examples/jms/stomp/server0/hornetq-beans.xml
===================================================================
--- trunk/examples/jms/stomp/server0/hornetq-beans.xml (rev 0)
+++ trunk/examples/jms/stomp/server0/hornetq-beans.xml 2010-01-28 14:36:24 UTC (rev 8856)
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+ <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+ <!-- JNDI server. Disable this if you don't want JNDI -->
+ <bean name="JNDIServer" class="org.jnp.server.Main">
+ <property name="namingInfo">
+ <inject bean="Naming"/>
+ </property>
+ <property name="port">1099</property>
+ <property name="bindAddress">localhost</property>
+ <property name="rmiPort">1098</property>
+ <property name="rmiBindAddress">localhost</property>
+ </bean>
+
+ <!-- MBean server -->
+ <bean name="MBeanServer" class="javax.management.MBeanServer">
+ <constructor factoryClass="java.lang.management.ManagementFactory"
+ factoryMethod="getPlatformMBeanServer"/>
+ </bean>
+
+ <!-- The core configuration -->
+ <bean name="Configuration" class="org.hornetq.core.config.impl.FileConfiguration"/>
+
+ <!-- The security manager -->
+ <bean name="HornetQSecurityManager" class="org.hornetq.spi.core.security.HornetQSecurityManagerImpl">
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The core server -->
+ <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="Configuration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="HornetQSecurityManager"/>
+ </parameter>
+ </constructor>
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The JMS server -->
+ <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="HornetQServer"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+</deployment>
Added: trunk/examples/jms/stomp/server0/hornetq-configuration.xml
===================================================================
--- trunk/examples/jms/stomp/server0/hornetq-configuration.xml (rev 0)
+++ trunk/examples/jms/stomp/server0/hornetq-configuration.xml 2010-01-28 14:36:24 UTC (rev 8856)
@@ -0,0 +1,42 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
+
+ <!-- Connectors -->
+
+ <connectors>
+ <connector name="netty-connector">
+ <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ </connector>
+ </connectors>
+
+ <!-- Acceptors -->
+ <acceptors>
+ <!-- a regular Netty acceptor used by the JMS client -->
+ <acceptor name="netty-acceptor">
+ <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ </acceptor>
+ <!-- the stomp-acceptor is configured for the Stomp protocol and -->
+ <!-- will listen on port 61613 (default Stomp port) -->
+ <acceptor name="stomp-acceptor">
+ <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="protocol" value="stomp" />
+ <param key="port" value="61613" />
+ </acceptor>
+ </acceptors>
+
+ <!-- Other config -->
+
+ <security-settings>
+ <!--security for example queue-->
+ <security-setting match="jms.queue.exampleQueue">
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createTempQueue" roles="guest"/>
+ <permission type="deleteTempQueue" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+</configuration>
Added: trunk/examples/jms/stomp/server0/hornetq-jms.xml
===================================================================
--- trunk/examples/jms/stomp/server0/hornetq-jms.xml (rev 0)
+++ trunk/examples/jms/stomp/server0/hornetq-jms.xml 2010-01-28 14:36:24 UTC (rev 8856)
@@ -0,0 +1,19 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+ <!--the connection factory used by the example-->
+ <connection-factory name="ConnectionFactory">
+ <connectors>
+ <connector-ref connector-name="netty-connector"/>
+ </connectors>
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <!--the queue used by the example-->
+ <queue name="exampleQueue">
+ <entry name="/queue/exampleQueue"/>
+ </queue>
+
+</configuration>
Added: trunk/examples/jms/stomp/server0/hornetq-users.xml
===================================================================
--- trunk/examples/jms/stomp/server0/hornetq-users.xml (rev 0)
+++ trunk/examples/jms/stomp/server0/hornetq-users.xml 2010-01-28 14:36:24 UTC (rev 8856)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+ <!-- the default user. this is used where username is null-->
+ <defaultuser name="guest" password="guest">
+ <role name="guest"/>
+ </defaultuser>
+</configuration>
\ No newline at end of file
Added: trunk/examples/jms/stomp/src/org/hornetq/jms/example/StompExample.java
===================================================================
--- trunk/examples/jms/stomp/src/org/hornetq/jms/example/StompExample.java (rev 0)
+++ trunk/examples/jms/stomp/src/org/hornetq/jms/example/StompExample.java 2010-01-28 14:36:24 UTC (rev 8856)
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.jms.example;
+
+import java.io.OutputStream;
+import java.net.Socket;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.InitialContext;
+
+import org.hornetq.common.example.HornetQExample;
+
+/**
+ * An example where a client will send a Stomp message on a TCP socket
+ * and consume it from a JMS MessageConsumer.
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public class StompExample extends HornetQExample
+{
+ private static final String END_OF_FRAME = "\u0000";
+
+ public static void main(final String[] args)
+ {
+ new StompExample().run(args);
+ }
+
+ @Override
+ public boolean runExample() throws Exception
+ {
+ Connection connection = null;
+ InitialContext initialContext = null;
+
+ try
+ {
+ // Step 1. Create a TCP socket to connect to the Stomp port
+ Socket socket = new Socket("localhost", 61613);
+
+ // Step 2. Send a CONNECT frame to connect to the server
+ String connectFrame = "CONNECT\n" +
+ "login: guest\n" +
+ "passcode: guest\n" +
+ "request-id: 1\n" +
+ "\n" +
+ END_OF_FRAME;
+ sendFrame(socket, connectFrame);
+
+ // Step 3. Send a SEND frame (a Stomp message) to the
+ // queue /queue/exampleQueue with a text body
+ String text = "Hello, world from Stomp!";
+ String message = "SEND\n" +
+ "destination: /queue/exampleQueue\n" +
+ "\n" +
+ text +
+ END_OF_FRAME;
+ sendFrame(socket, message);
+ System.out.println("Sent Stomp message: " + text);
+
+ // Step 4. Send a DISCONNECT frame to disconnect from the server
+ String disconnectFrame = "DISCONNECT\n" +
+ "\n" +
+ END_OF_FRAME;
+ sendFrame(socket, disconnectFrame);
+
+ // Step 5. Close the TCP socket
+ socket.close();
+
+ // We will now consume from JMS the message sent with Stomp.
+
+ // Step 6. Create an initial context to perform the JNDI lookup.
+ initialContext = getContext(0);
+
+ // Step 7. Perform a lookup on the queue and the connection factory
+ Queue queue = (Queue)initialContext.lookup("/queue/exampleQueue");
+ ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
+
+ // Step 8. Create a JMS Connection, Session and a MessageConsumer on the queue
+ connection = cf.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ // Step 9. Start the Connection
+ connection.start();
+
+ // Step 10. Receive the message
+ TextMessage messageReceived = (TextMessage)consumer.receive(5000);
+ System.out.println("Received JMS message: " + messageReceived.getText());
+
+ return true;
+ }
+ finally
+ {
+ // Step 11. Be sure to close our JMS resources!
+ if (initialContext != null)
+ {
+ initialContext.close();
+ }
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }
+ }
+
+ private static void sendFrame(Socket socket, String data) throws Exception
+ {
+ byte[] bytes = data.getBytes("UTF-8");
+ OutputStream outputStream = socket.getOutputStream();
+ for (int i = 0; i < bytes.length; i++)
+ {
+ outputStream.write(bytes[i]);
+ }
+ outputStream.flush();
+ }
+
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-28 13:27:55 UTC (rev 8855)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-28 14:36:24 UTC (rev 8856)
@@ -705,6 +705,7 @@
"\n\n" +
Stomp.NULL;
sendFrame(frame);
+ waitForFrameToTakeEffect();
reconnect();
}
else {
@@ -738,7 +739,8 @@
"\n\n" +
Stomp.NULL;
sendFrame(frame);
-
+ waitForFrameToTakeEffect();
+
// now lets make sure we don't see the message again
reconnect();
14 years, 11 months
JBoss hornetq SVN: r8855 - trunk/src/main/org/hornetq/core/protocol/stomp.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-28 08:27:55 -0500 (Thu, 28 Jan 2010)
New Revision: 8855
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompException.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
Log:
fix code format
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java 2010-01-28 13:26:39 UTC (rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java 2010-01-28 13:27:55 UTC (rev 8855)
@@ -17,108 +17,165 @@
*/
package org.hornetq.core.protocol.stomp;
-
/**
* The standard verbs and headers used for the <a href="http://stomp.codehaus.org/">STOMP</a> protocol.
*
* @version $Revision: 57 $
*/
-public interface Stomp {
- String NULL = "\u0000";
- String NEWLINE = "\n";
+public interface Stomp
+{
+ String NULL = "\u0000";
- public static interface Commands {
- String CONNECT = "CONNECT";
- String SEND = "SEND";
- String DISCONNECT = "DISCONNECT";
- String SUBSCRIBE = "SUBSCRIBE";
- String UNSUBSCRIBE = "UNSUBSCRIBE";
- String BEGIN_TRANSACTION = "BEGIN";
- String COMMIT_TRANSACTION = "COMMIT";
- String ABORT_TRANSACTION = "ABORT";
- String BEGIN = "BEGIN";
- String COMMIT = "COMMIT";
- String ABORT = "ABORT";
- String ACK = "ACK";
- }
+ String NEWLINE = "\n";
- public interface Responses {
- String CONNECTED = "CONNECTED";
- String ERROR = "ERROR";
- String MESSAGE = "MESSAGE";
- String RECEIPT = "RECEIPT";
- }
+ public static interface Commands
+ {
+ String CONNECT = "CONNECT";
- public interface Headers {
- String SEPERATOR = ":";
- String RECEIPT_REQUESTED = "receipt";
- String TRANSACTION = "transaction";
- String CONTENT_LENGTH = "content-length";
+ String SEND = "SEND";
- public interface Response {
- String RECEIPT_ID = "receipt-id";
- }
+ String DISCONNECT = "DISCONNECT";
- public interface Send {
- String DESTINATION = "destination";
- String CORRELATION_ID = "correlation-id";
- String REPLY_TO = "reply-to";
- String EXPIRATION_TIME = "expires";
- String PRIORITY = "priority";
- String TYPE = "type";
- Object PERSISTENT = "persistent";
- }
+ String SUBSCRIBE = "SUBSCRIBE";
- public interface Message {
- String MESSAGE_ID = "message-id";
- String DESTINATION = "destination";
- String CORRELATION_ID = "correlation-id";
- String EXPIRATION_TIME = "expires";
- String REPLY_TO = "reply-to";
- String PRORITY = "priority";
- String REDELIVERED = "redelivered";
- String TIMESTAMP = "timestamp";
- String TYPE = "type";
- String SUBSCRIPTION = "subscription";
- }
+ String UNSUBSCRIBE = "UNSUBSCRIBE";
- public interface Subscribe {
- String DESTINATION = "destination";
- String ACK_MODE = "ack";
- String ID = "id";
- String SELECTOR = "selector";
- String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
- String NO_LOCAL = "no-local";
+ String BEGIN_TRANSACTION = "BEGIN";
- public interface AckModeValues {
- String AUTO = "auto";
- String CLIENT = "client";
- }
- }
+ String COMMIT_TRANSACTION = "COMMIT";
- public interface Unsubscribe {
- String DESTINATION = "destination";
- String ID = "id";
- }
+ String ABORT_TRANSACTION = "ABORT";
- public interface Connect {
- String LOGIN = "login";
- String PASSCODE = "passcode";
- String CLIENT_ID = "client-id";
- String REQUEST_ID = "request-id";
- }
+ String BEGIN = "BEGIN";
- public interface Error {
- String MESSAGE = "message";
- }
+ String COMMIT = "COMMIT";
- public interface Connected {
- String SESSION = "session";
- String RESPONSE_ID = "response-id";
- }
+ String ABORT = "ABORT";
- public interface Ack {
- String MESSAGE_ID = "message-id";
- }
- }
+ String ACK = "ACK";
+ }
+
+ public interface Responses
+ {
+ String CONNECTED = "CONNECTED";
+
+ String ERROR = "ERROR";
+
+ String MESSAGE = "MESSAGE";
+
+ String RECEIPT = "RECEIPT";
+ }
+
+ public interface Headers
+ {
+ String SEPERATOR = ":";
+
+ String RECEIPT_REQUESTED = "receipt";
+
+ String TRANSACTION = "transaction";
+
+ String CONTENT_LENGTH = "content-length";
+
+ public interface Response
+ {
+ String RECEIPT_ID = "receipt-id";
+ }
+
+ public interface Send
+ {
+ String DESTINATION = "destination";
+
+ String CORRELATION_ID = "correlation-id";
+
+ String REPLY_TO = "reply-to";
+
+ String EXPIRATION_TIME = "expires";
+
+ String PRIORITY = "priority";
+
+ String TYPE = "type";
+
+ Object PERSISTENT = "persistent";
+ }
+
+ public interface Message
+ {
+ String MESSAGE_ID = "message-id";
+
+ String DESTINATION = "destination";
+
+ String CORRELATION_ID = "correlation-id";
+
+ String EXPIRATION_TIME = "expires";
+
+ String REPLY_TO = "reply-to";
+
+ String PRORITY = "priority";
+
+ String REDELIVERED = "redelivered";
+
+ String TIMESTAMP = "timestamp";
+
+ String TYPE = "type";
+
+ String SUBSCRIPTION = "subscription";
+ }
+
+ public interface Subscribe
+ {
+ String DESTINATION = "destination";
+
+ String ACK_MODE = "ack";
+
+ String ID = "id";
+
+ String SELECTOR = "selector";
+
+ String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
+
+ String NO_LOCAL = "no-local";
+
+ public interface AckModeValues
+ {
+ String AUTO = "auto";
+
+ String CLIENT = "client";
+ }
+ }
+
+ public interface Unsubscribe
+ {
+ String DESTINATION = "destination";
+
+ String ID = "id";
+ }
+
+ public interface Connect
+ {
+ String LOGIN = "login";
+
+ String PASSCODE = "passcode";
+
+ String CLIENT_ID = "client-id";
+
+ String REQUEST_ID = "request-id";
+ }
+
+ public interface Error
+ {
+ String MESSAGE = "message";
+ }
+
+ public interface Connected
+ {
+ String SESSION = "session";
+
+ String RESPONSE_ID = "response-id";
+ }
+
+ public interface Ack
+ {
+ String MESSAGE_ID = "message-id";
+ }
+ }
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-01-28 13:26:39 UTC (rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-01-28 13:27:55 UTC (rev 8855)
@@ -39,11 +39,11 @@
private static final Logger log = Logger.getLogger(StompConnection.class);
private final StompProtocolManager manager;
-
+
private final Connection transportConnection;
-
+
private String login;
-
+
private String passcode;
private String clientID;
@@ -52,12 +52,10 @@
private boolean destroyed = false;
- private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>();
-
StompConnection(final Connection transportConnection, final StompProtocolManager manager)
{
this.transportConnection = transportConnection;
-
+
this.manager = manager;
}
@@ -67,12 +65,6 @@
public void addFailureListener(FailureListener listener)
{
- if (listener == null)
- {
- throw new IllegalStateException("FailureListener cannot be null");
- }
-
- failureListeners.add(listener);
}
public boolean checkDataReceived()
@@ -95,8 +87,8 @@
destroyed = true;
transportConnection.close();
-
- callFailureListeners(new HornetQException(HornetQException.INTERNAL_ERROR, "Stomp connection destroyed"));
+
+ manager.cleanup(this);
}
public void disconnect()
@@ -108,7 +100,7 @@
}
public void flush()
- {
+ {
}
public List<FailureListener> getFailureListeners()
@@ -124,7 +116,7 @@
}
public String getRemoteAddress()
- {
+ {
return transportConnection.getRemoteAddress();
}
@@ -150,22 +142,13 @@
public boolean removeFailureListener(FailureListener listener)
{
- if (listener == null)
- {
- throw new IllegalStateException("FailureListener cannot be null");
- }
-
- return failureListeners.remove(listener);
+ return false;
}
public void setFailureListeners(List<FailureListener> listeners)
{
- failureListeners.clear();
-
- failureListeners.addAll(listeners);
}
-
public void bufferReceived(Object connectionID, HornetQBuffer buffer)
{
manager.handleBuffer(this, buffer);
@@ -200,30 +183,9 @@
{
return valid;
}
-
+
public void setValid(boolean valid)
{
this.valid = valid;
}
-
- private void callFailureListeners(final HornetQException me)
- {
- final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
-
- for (final FailureListener listener : listenersClone)
- {
- try
- {
- listener.connectionFailed(me);
- }
- catch (final Throwable t)
- {
- // Failure of one listener to execute shouldn't prevent others
- // from
- // executing
- log.error("Failed to execute failure listener", t);
- }
- }
- }
-
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompException.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompException.java 2010-01-28 13:26:39 UTC (rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompException.java 2010-01-28 13:27:55 UTC (rev 8855)
@@ -22,29 +22,36 @@
/**
* @author <a href="http://hiramchirino.com">chirino</a>
*/
-class StompException extends IOException {
- private static final long serialVersionUID = -2869735532997332242L;
- private final boolean fatal;
+class StompException extends IOException
+{
+ private static final long serialVersionUID = -2869735532997332242L;
- public StompException() {
- this(null);
- }
+ private final boolean fatal;
- public StompException(String s) {
- this(s, false);
- }
+ public StompException()
+ {
+ this(null);
+ }
- public StompException(String s, boolean fatal) {
- this(s, fatal, null);
- }
+ public StompException(String s)
+ {
+ this(s, false);
+ }
- public StompException(String s, boolean fatal, Throwable cause) {
- super(s);
- this.fatal = fatal;
- initCause(cause);
- }
+ public StompException(String s, boolean fatal)
+ {
+ this(s, fatal, null);
+ }
- public boolean isFatal() {
- return fatal;
- }
+ public StompException(String s, boolean fatal, Throwable cause)
+ {
+ super(s);
+ this.fatal = fatal;
+ initCause(cause);
+ }
+
+ public boolean isFatal()
+ {
+ return fatal;
+ }
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java 2010-01-28 13:26:39 UTC (rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java 2010-01-28 13:27:55 UTC (rev 8855)
@@ -22,14 +22,17 @@
*
* @author <a href="http://hiramchirino.com">chirino</a>
*/
-class StompFrameError extends StompFrame {
- private final StompException exception;
+class StompFrameError extends StompFrame
+{
+ private final StompException exception;
- public StompFrameError(StompException exception) {
- this.exception = exception;
- }
+ public StompFrameError(StompException exception)
+ {
+ this.exception = exception;
+ }
- public StompException getException() {
- return exception;
- }
+ public StompException getException()
+ {
+ return exception;
+ }
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java 2010-01-28 13:26:39 UTC (rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java 2010-01-28 13:27:55 UTC (rev 8855)
@@ -30,169 +30,206 @@
/**
* Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
*/
-class StompMarshaller {
- public static final byte[] NO_DATA = new byte[]{};
- private static final byte[] END_OF_FRAME = new byte[]{0, '\n'};
- private static final int MAX_COMMAND_LENGTH = 1024;
- private static final int MAX_HEADER_LENGTH = 1024 * 10;
- private static final int MAX_HEADERS = 1000;
- private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
- private int version = 1;
+class StompMarshaller
+{
+ public static final byte[] NO_DATA = new byte[] {};
- public int getVersion() {
- return version;
- }
+ private static final byte[] END_OF_FRAME = new byte[] { 0, '\n' };
- public void setVersion(int version) {
- this.version = version;
- }
+ private static final int MAX_COMMAND_LENGTH = 1024;
- public byte[] marshal(StompFrame command) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(baos);
- marshal(command, dos);
- dos.close();
- return baos.toByteArray();
- }
+ private static final int MAX_HEADER_LENGTH = 1024 * 10;
- public void marshal(StompFrame stomp, DataOutput os) throws IOException {
- StringBuffer buffer = new StringBuffer();
- buffer.append(stomp.getCommand());
- buffer.append(Stomp.NEWLINE);
+ private static final int MAX_HEADERS = 1000;
- // Output the headers.
- for (Iterator<Map.Entry<String, Object>> iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) {
- Map.Entry<String, Object> entry = iter.next();
- buffer.append(entry.getKey());
- buffer.append(Stomp.Headers.SEPERATOR);
- buffer.append(entry.getValue());
- buffer.append(Stomp.NEWLINE);
- }
+ private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
- // Add a newline to seperate the headers from the content.
- buffer.append(Stomp.NEWLINE);
+ private int version = 1;
- os.write(buffer.toString().getBytes("UTF-8"));
- os.write(stomp.getContent());
- os.write(END_OF_FRAME);
- }
+ public int getVersion()
+ {
+ return version;
+ }
- public StompFrame unmarshal(HornetQBuffer in) throws IOException {
+ public void setVersion(int version)
+ {
+ this.version = version;
+ }
- try {
- String action = null;
+ public byte[] marshal(StompFrame command) throws IOException
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ marshal(command, dos);
+ dos.close();
+ return baos.toByteArray();
+ }
- // skip white space to next real action line
- while (true) {
- action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
- if (action == null) {
- throw new IOException("connection was closed");
- }
- else {
- action = action.trim();
- if (action.length() > 0) {
- break;
- }
- }
+ public void marshal(StompFrame stomp, DataOutput os) throws IOException
+ {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append(stomp.getCommand());
+ buffer.append(Stomp.NEWLINE);
+
+ // Output the headers.
+ for (Iterator<Map.Entry<String, Object>> iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();)
+ {
+ Map.Entry<String, Object> entry = iter.next();
+ buffer.append(entry.getKey());
+ buffer.append(Stomp.Headers.SEPERATOR);
+ buffer.append(entry.getValue());
+ buffer.append(Stomp.NEWLINE);
+ }
+
+ // Add a newline to seperate the headers from the content.
+ buffer.append(Stomp.NEWLINE);
+
+ os.write(buffer.toString().getBytes("UTF-8"));
+ os.write(stomp.getContent());
+ os.write(END_OF_FRAME);
+ }
+
+ public StompFrame unmarshal(HornetQBuffer in) throws IOException
+ {
+
+ try
+ {
+ String action = null;
+
+ // skip white space to next real action line
+ while (true)
+ {
+ action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
+ if (action == null)
+ {
+ throw new IOException("connection was closed");
}
+ else
+ {
+ action = action.trim();
+ if (action.length() > 0)
+ {
+ break;
+ }
+ }
+ }
- // Parse the headers
- HashMap<String, Object> headers = new HashMap<String, Object>(25);
- while (true) {
- String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
- if (line != null && line.trim().length() > 0) {
+ // Parse the headers
+ HashMap<String, Object> headers = new HashMap<String, Object>(25);
+ while (true)
+ {
+ String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
+ if (line != null && line.trim().length() > 0)
+ {
- if (headers.size() > MAX_HEADERS) {
- throw new StompException("The maximum number of headers was exceeded", true);
- }
+ if (headers.size() > MAX_HEADERS)
+ {
+ throw new StompException("The maximum number of headers was exceeded", true);
+ }
- try {
- int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
- String name = line.substring(0, seperator_index).trim();
- String value = line.substring(seperator_index + 1, line.length()).trim();
- headers.put(name, value);
- }
- catch (Exception e) {
- throw new StompException("Unable to parser header line [" + line + "]", true);
- }
- }
- else {
- break;
- }
+ try
+ {
+ int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
+ String name = line.substring(0, seperator_index).trim();
+ String value = line.substring(seperator_index + 1, line.length()).trim();
+ headers.put(name, value);
+ }
+ catch (Exception e)
+ {
+ throw new StompException("Unable to parser header line [" + line + "]", true);
+ }
}
+ else
+ {
+ break;
+ }
+ }
- // Read in the data part.
- byte[] data = NO_DATA;
- String contentLength = (String) headers.get(Stomp.Headers.CONTENT_LENGTH);
- if (contentLength != null) {
+ // Read in the data part.
+ byte[] data = NO_DATA;
+ String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH);
+ if (contentLength != null)
+ {
- // Bless the client, he's telling us how much data to read in.
- int length;
- try {
- length = Integer.parseInt(contentLength.trim());
- }
- catch (NumberFormatException e) {
- throw new StompException("Specified content-length is not a valid integer", true);
- }
+ // Bless the client, he's telling us how much data to read in.
+ int length;
+ try
+ {
+ length = Integer.parseInt(contentLength.trim());
+ }
+ catch (NumberFormatException e)
+ {
+ throw new StompException("Specified content-length is not a valid integer", true);
+ }
- if (length > MAX_DATA_LENGTH) {
- throw new StompException("The maximum data length was exceeded", true);
- }
+ if (length > MAX_DATA_LENGTH)
+ {
+ throw new StompException("The maximum data length was exceeded", true);
+ }
- data = new byte[length];
- in.readBytes(data);
+ data = new byte[length];
+ in.readBytes(data);
- if (in.readByte() != 0) {
- throw new StompException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " + "there was no trailing null byte", true);
- }
+ if (in.readByte() != 0)
+ {
+ throw new StompException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " +
+ "there was no trailing null byte", true);
}
- else {
+ }
+ else
+ {
- // We don't know how much to read.. data ends when we hit a 0
- byte b;
- ByteArrayOutputStream baos = null;
- while (in.readableBytes() > 0 && (b = in.readByte()) != 0) {
+ // We don't know how much to read.. data ends when we hit a 0
+ byte b;
+ ByteArrayOutputStream baos = null;
+ while (in.readableBytes() > 0 && (b = in.readByte()) != 0)
+ {
- if (baos == null) {
- baos = new ByteArrayOutputStream();
- }
- else if (baos.size() > MAX_DATA_LENGTH) {
- throw new StompException("The maximum data length was exceeded", true);
- }
+ if (baos == null)
+ {
+ baos = new ByteArrayOutputStream();
+ }
+ else if (baos.size() > MAX_DATA_LENGTH)
+ {
+ throw new StompException("The maximum data length was exceeded", true);
+ }
- baos.write(b);
- }
+ baos.write(b);
+ }
- if (baos != null) {
- baos.close();
- data = baos.toByteArray();
- }
+ if (baos != null)
+ {
+ baos.close();
+ data = baos.toByteArray();
}
+ }
- return new StompFrame(action, headers, data);
- }
- catch (StompException e) {
- return new StompFrameError(e);
- }
- }
+ return new StompFrame(action, headers, data);
+ }
+ catch (StompException e)
+ {
+ return new StompFrameError(e);
+ }
+ }
- protected String readLine(HornetQBuffer in, int maxLength, String errorMessage) throws IOException {
- char[] chars = new char[MAX_HEADER_LENGTH];
-
- int count = 0;
- while (in.readable())
- {
- byte b = in.readByte();
-
- if (b == (byte)'\n')
- {
- break;
- }
- else
- {
- chars[count++] = (char)b;
- }
- }
- return new String(chars, 0, count);
- }
+ protected String readLine(HornetQBuffer in, int maxLength, String errorMessage) throws IOException
+ {
+ char[] chars = new char[MAX_HEADER_LENGTH];
+
+ int count = 0;
+ while (in.readable())
+ {
+ byte b = in.readByte();
+
+ if (b == (byte)'\n')
+ {
+ break;
+ }
+ else
+ {
+ chars[count++] = (char)b;
+ }
+ }
+ return new String(chars, 0, count);
+ }
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-28 13:26:39 UTC (rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-28 13:27:55 UTC (rev 8855)
@@ -191,7 +191,7 @@
{
send(conn, response);
}
-
+
if (Stomp.Commands.DISCONNECT.equals(command))
{
conn.destroy();
@@ -285,7 +285,7 @@
private StompFrame onAck(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
{
Map<String, Object> headers = frame.getHeaders();
- String messageID = (String)headers.get(Stomp.Headers.Ack.MESSAGE_ID);
+ String messageID = (String)headers.get(Stomp.Headers.Ack.MESSAGE_ID);
String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
StompSession stompSession = null;
if (txID != null)
@@ -294,7 +294,7 @@
}
stompSession = getSession(connection);
stompSession.acknowledge(messageID);
-
+
return null;
}
@@ -537,5 +537,47 @@
}
}
+ public void cleanup(StompConnection connection)
+ {
+ connection.setValid(false);
+
+ StompSession session = sessions.remove(connection);
+ if (session != null)
+ {
+ try
+ {
+ session.getSession().rollback(true);
+ session.getSession().close();
+ session.getSession().runConnectionFailureRunners();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
+
+ // removed the transacted session belonging to the connection
+ Iterator<Entry<String, StompSession>> iterator = transactedSessions.entrySet().iterator();
+ while (iterator.hasNext())
+ {
+ Map.Entry<String, StompSession> entry = (Map.Entry<String, StompSession>)iterator.next();
+ if (entry.getValue().getConnection() == connection)
+ {
+ ServerSession serverSession = entry.getValue().getSession();
+ try
+ {
+ serverSession.rollback(true);
+ serverSession.close();
+ serverSession.runConnectionFailureRunners();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ iterator.remove();
+ }
+ }
+ }
+
// Inner classes -------------------------------------------------
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-01-28 13:26:39 UTC (rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-01-28 13:27:55 UTC (rev 8855)
@@ -97,10 +97,10 @@
buffer.readBytes(data);
headers.put(Headers.CONTENT_LENGTH, data.length);
}
-
+
StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
-
+
int length = manager.send(connection, frame);
if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
@@ -150,14 +150,11 @@
SimpleString queue = SimpleString.toSimpleString(destination);
if (destination.startsWith(StompUtils.HQ_TOPIC_PREFIX))
{
- //subscribes to a topic
+ // subscribes to a topic
queue = UUIDGenerator.getInstance().generateSimpleStringUUID();
session.createQueue(SimpleString.toSimpleString(destination), queue, null, true, false);
}
- session.createConsumer(consumerID,
- queue,
- SimpleString.toSimpleString(selector),
- false);
+ session.createConsumer(consumerID, queue, SimpleString.toSimpleString(selector), false);
session.receiveConsumerCredits(consumerID, -1);
StompSubscription subscription = new StompSubscription(subscriptionID, destination, ack);
subscriptions.put(consumerID, subscription);
@@ -185,7 +182,7 @@
}
boolean containsSubscription(String subscriptionID)
- {
+ {
Iterator<Entry<Long, StompSubscription>> iterator = subscriptions.entrySet().iterator();
while (iterator.hasNext())
{
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java 2010-01-28 13:26:39 UTC (rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java 2010-01-28 13:27:55 UTC (rev 8855)
@@ -54,7 +54,7 @@
{
return destination;
}
-
+
public String getID()
{
return subID;
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java 2010-01-28 13:26:39 UTC (rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java 2010-01-28 13:27:55 UTC (rev 8855)
@@ -124,11 +124,11 @@
public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, ServerMessageImpl msg) throws Exception
{
Map<String, Object> headers = new HashMap<String, Object>(frame.getHeaders());
-
+
String priority = (String)headers.remove(Stomp.Headers.Send.PRIORITY);
if (priority != null)
{
- msg.setPriority(Byte.parseByte(priority));
+ msg.setPriority(Byte.parseByte(priority));
}
String persistent = (String)headers.remove(Stomp.Headers.Send.PERSISTENT);
if (persistent != null)
@@ -160,34 +160,39 @@
}
}
- public static void copyStandardHeadersFromMessageToFrame(Message message, StompFrame command, int deliveryCount) throws Exception {
+ public static void copyStandardHeadersFromMessageToFrame(Message message, StompFrame command, int deliveryCount) throws Exception
+ {
final Map<String, Object> headers = command.getHeaders();
headers.put(Stomp.Headers.Message.DESTINATION, toStompDestination(message.getAddress().toString()));
headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getMessageID());
- if (message.getObjectProperty("JMSCorrelationID") != null) {
- headers.put(Stomp.Headers.Message.CORRELATION_ID, message.getObjectProperty("JMSCorrelationID"));
+ if (message.getObjectProperty("JMSCorrelationID") != null)
+ {
+ headers.put(Stomp.Headers.Message.CORRELATION_ID, message.getObjectProperty("JMSCorrelationID"));
}
headers.put(Stomp.Headers.Message.EXPIRATION_TIME, "" + message.getExpiration());
headers.put(Stomp.Headers.Message.REDELIVERED, deliveryCount > 1);
headers.put(Stomp.Headers.Message.PRORITY, "" + message.getPriority());
- if (message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME) != null) {
- headers.put(Stomp.Headers.Message.REPLY_TO, toStompDestination(message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME)));
+ if (message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME) != null)
+ {
+ headers.put(Stomp.Headers.Message.REPLY_TO,
+ toStompDestination(message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME)));
}
headers.put(Stomp.Headers.Message.TIMESTAMP, "" + message.getTimestamp());
- if (message.getObjectProperty("JMSType") != null) {
- headers.put(Stomp.Headers.Message.TYPE, message.getObjectProperty("JMSType"));
+ if (message.getObjectProperty("JMSType") != null)
+ {
+ headers.put(Stomp.Headers.Message.TYPE, message.getObjectProperty("JMSType"));
}
// now lets add all the message headers
Set<SimpleString> names = message.getPropertyNames();
for (SimpleString name : names)
{
- headers.put(name.toString(), message.getObjectProperty(name));
+ headers.put(name.toString(), message.getObjectProperty(name));
}
- }
+ }
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
14 years, 11 months
JBoss hornetq SVN: r8854 - trunk/tests/src/org/hornetq/tests/integration/stomp.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-28 08:26:39 -0500 (Thu, 28 Jan 2010)
New Revision: 8854
Modified:
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
add missing calls to super.setUp() & super.tearDown()
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-28 10:42:51 UTC (rev 8853)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-28 13:26:39 UTC (rev 8854)
@@ -1216,6 +1216,8 @@
// Implementation methods
//-------------------------------------------------------------------------
protected void setUp() throws Exception {
+ super.setUp();
+
server = createServer();
server.start();
connectionFactory = createConnectionFactory();
@@ -1262,6 +1264,8 @@
stompSocket.close();
}
server.stop();
+
+ super.tearDown();
}
protected void reconnect() throws Exception {
14 years, 11 months
JBoss hornetq SVN: r8853 - trunk/tests/src/org/hornetq/tests/integration/stomp.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-28 05:42:51 -0500 (Thu, 28 Jan 2010)
New Revision: 8853
Modified:
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* test fixes
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-28 09:52:47 UTC (rev 8852)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-28 10:42:51 UTC (rev 8853)
@@ -754,10 +754,15 @@
frame =
"SUBSCRIBE\n" +
- "destination:/queue/" + getQueueName() + "\n\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "receipt: 1234\n\n" +
Stomp.NULL;
sendFrame(frame);
+ // wait for SUBSCRIBE's receipt
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("RECEIPT"));
+
sendMessage("shouldBeNextMessage");
frame = receiveFrame(10000);
@@ -1109,7 +1114,7 @@
"\n\n" +
Stomp.NULL;
sendFrame(frame);
- // wait for UNSUBSCRIPE's receipt
+ // wait for UNSUBSCRIBE's receipt
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("RECEIPT"));
14 years, 11 months
JBoss hornetq SVN: r8852 - trunk/tests/src/org/hornetq/tests/integration/stomp.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-28 04:52:47 -0500 (Thu, 28 Jan 2010)
New Revision: 8852
Modified:
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* fix testSubscribeToTopic. Make sure the Stomp client is unsubscribed before sending
another message on the topic.
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-27 17:07:27 UTC (rev 8851)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-28 09:52:47 UTC (rev 8852)
@@ -442,7 +442,6 @@
frame = receiveFrame(10000);
Assert.assertNotNull(frame);
- System.out.println(frame);
Assert.assertTrue(frame.startsWith("MESSAGE"));
Assert.assertTrue(frame.indexOf("S:") > 0);
Assert.assertTrue(frame.indexOf("n:") > 0);
@@ -1101,14 +1100,18 @@
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("MESSAGE"));
Assert.assertTrue(frame.indexOf("destination:") > 0);
- Assert.assertTrue(frame.indexOf(getName()) > 0);
+ Assert.assertTrue(frame.indexOf(getTopicName()) > 0);
frame =
"UNSUBSCRIBE\n" +
"destination:/topic/" + getTopicName() + "\n" +
+ "receipt: 1234\n" +
"\n\n" +
Stomp.NULL;
sendFrame(frame);
+ // wait for UNSUBSCRIPE's receipt
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("RECEIPT"));
sendMessage(getName(), topic);
@@ -1151,7 +1154,6 @@
sendMessage(getName());
frame = receiveFrame(10000);
- System.out.println(frame);
Assert.assertTrue(frame.startsWith("MESSAGE"));
Assert.assertTrue(frame.indexOf("destination:") > 0);
Assert.assertTrue(frame.indexOf(getName()) > 0);
@@ -1160,7 +1162,6 @@
Matcher cl_matcher = cl.matcher(frame);
Assert.assertTrue(cl_matcher.find());
String messageID = cl_matcher.group(1);
- System.out.println(messageID);
frame =
"BEGIN\n" +
14 years, 11 months
JBoss hornetq SVN: r8851 - in trunk: tests/src/org/hornetq/tests/integration/stomp and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-27 12:07:27 -0500 (Wed, 27 Jan 2010)
New Revision: 8851
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* test fixes
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java 2010-01-27 16:25:14 UTC (rev 8850)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java 2010-01-27 17:07:27 UTC (rev 8851)
@@ -61,8 +61,8 @@
buffer.append(Stomp.NEWLINE);
// Output the headers.
- for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) {
- Map.Entry entry = (Map.Entry) iter.next();
+ for (Iterator<Map.Entry<String, Object>> iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) {
+ Map.Entry<String, Object> entry = iter.next();
buffer.append(entry.getKey());
buffer.append(Stomp.Headers.SEPERATOR);
buffer.append(entry.getValue());
@@ -97,7 +97,7 @@
}
// Parse the headers
- HashMap headers = new HashMap(25);
+ HashMap<String, Object> headers = new HashMap<String, Object>(25);
while (true) {
String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
if (line != null && line.trim().length() > 0) {
@@ -177,15 +177,22 @@
}
protected String readLine(HornetQBuffer in, int maxLength, String errorMessage) throws IOException {
- byte b;
- ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength);
- while ((b = in.readByte()) != '\n') {
- if (baos.size() > maxLength) {
- throw new StompException(errorMessage, true);
- }
- baos.write(b);
- }
- byte[] sequence = baos.toByteArray();
- return new String(sequence, "UTF-8");
+ char[] chars = new char[MAX_HEADER_LENGTH];
+
+ int count = 0;
+ while (in.readable())
+ {
+ byte b = in.readByte();
+
+ if (b == (byte)'\n')
+ {
+ break;
+ }
+ else
+ {
+ chars[count++] = (char)b;
+ }
+ }
+ return new String(chars, 0, count);
}
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-27 16:25:14 UTC (rev 8850)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-27 17:07:27 UTC (rev 8851)
@@ -277,7 +277,7 @@
boolean unsubscribed = stompSession.unsubscribe(subscriptionID);
if (!unsubscribed)
{
- throw new StompException("Cannot unsubscribe as o subscription exists for id: " + subscriptionID);
+ throw new StompException("Cannot unsubscribe as a subscription exists for id: " + subscriptionID);
}
return null;
}
@@ -412,6 +412,8 @@
private StompFrame onDisconnect(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
{
+ connection.setValid(false);
+
StompSession session = sessions.remove(connection);
if (session != null)
{
@@ -439,7 +441,6 @@
iterator.remove();
}
}
- connection.setValid(false);
return null;
}
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-27 16:25:14 UTC (rev 8850)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-27 17:07:27 UTC (rev 8851)
@@ -98,6 +98,8 @@
connect_frame = "DISCONNECT\n\n" + Stomp.NULL;
sendFrame(connect_frame);
+ waitForFrameToTakeEffect();
+
// sending a message will result in an error
String frame =
"SEND\n" +
@@ -1193,7 +1195,7 @@
frame =
"UNSUBSCRIBE\n" +
- "destination:/topic/" + getQueueName() + "\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
"\n\n" +
Stomp.NULL;
sendFrame(frame);
14 years, 11 months