JBoss hornetq SVN: r8850 - trunk/src/main/org/hornetq/integration/transports/netty.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-27 11:25:14 -0500 (Wed, 27 Jan 2010)
New Revision: 8850
Modified:
trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
Log:
fix protocol configuration
* upper case the value read from the configuration to ensure it matches one of the ProtocolType values
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-27 16:24:10 UTC (rev 8849)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-27 16:25:14 UTC (rev 8850)
@@ -190,7 +190,7 @@
String protocolStr = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME,
TransportConstants.DEFAULT_PROTOCOL,
configuration);
- protocol = ProtocolType.valueOf(protocolStr);
+ protocol = ProtocolType.valueOf(protocolStr.toUpperCase());
host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME,
TransportConstants.DEFAULT_HOST,
14 years, 11 months
JBoss hornetq SVN: r8849 - trunk.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-27 11:24:10 -0500 (Wed, 27 Jan 2010)
New Revision: 8849
Modified:
trunk/build-hornetq.xml
Log:
fix jar-core-client target
* add missing packages added after protocol refactoring
Modified: trunk/build-hornetq.xml
===================================================================
--- trunk/build-hornetq.xml 2010-01-27 15:15:42 UTC (rev 8848)
+++ trunk/build-hornetq.xml 2010-01-27 16:24:10 UTC (rev 8849)
@@ -760,12 +760,14 @@
<include name="org/hornetq/core/list/**/*.class"/>
<include name="org/hornetq/core/logging/**/*.class"/>
<include name="org/hornetq/core/message/**/*.class"/>
+ <include name="org/hornetq/core/protocol/core/**/*.class"/>
<include name="org/hornetq/core/remoting/**/*.class"/>
<include name="org/hornetq/core/version/**/*.class"/>
<include name="org/hornetq/core/management/*.class"/>
<include name="org/hornetq/core/transaction/impl/XidImpl.class"/>
+ <include name="org/hornetq/spi/core/logging/*.class"/>
+ <include name="org/hornetq/spi/core/protocol/*.class"/>
<include name="org/hornetq/spi/core/remoting/*.class"/>
- <include name="org/hornetq/spi/core/logging/*.class"/>
<!-- required by SessionSendMessage -->
<include name="org/hornetq/core/server/ServerMessage.class"/>
14 years, 11 months
JBoss hornetq SVN: r8848 - in trunk: tests/src/org/hornetq/tests/integration/stomp and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-27 10:15:42 -0500 (Wed, 27 Jan 2010)
New Revision: 8848
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* create a HornetQ queue when a subscription to a topic is made and create
a consumer on this queue.
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-27 12:23:11 UTC (rev 8847)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-27 15:15:42 UTC (rev 8848)
@@ -189,15 +189,20 @@
if (response != null)
{
- send(connection, response);
+ send(conn, response);
}
+
+ if (Stomp.Commands.DISCONNECT.equals(command))
+ {
+ conn.destroy();
+ }
}
catch (Exception e)
{
StompFrame error = createError(e, request);
if (error != null)
{
- send(connection, error);
+ send(conn, error);
}
}
}
@@ -235,6 +240,7 @@
}
subscriptionID = "subscription/" + destination;
}
+ String hornetqDestination = StompUtils.toHornetQAddress(destination);
StompSession stompSession = getSession(connection);
if (stompSession.containsSubscription(subscriptionID))
{
@@ -242,7 +248,7 @@
". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
}
long consumerID = server.getStorageManager().generateUniqueID();
- stompSession.addSubscription(consumerID, subscriptionID, destination, selector, ack);
+ stompSession.addSubscription(consumerID, subscriptionID, hornetqDestination, selector, ack);
return null;
}
@@ -284,12 +290,9 @@
StompSession stompSession = null;
if (txID != null)
{
- throw new StompException("transactional ACK are not supported");
+ log.warn("Transactional acknowledgement is not supported");
}
- else
- {
- stompSession = getSession(connection);
- }
+ stompSession = getSession(connection);
stompSession.acknowledge(messageID);
return null;
@@ -508,12 +511,12 @@
return new StompFrame(Stomp.Responses.CONNECTED, h, StompMarshaller.NO_DATA);
}
- public int send(RemotingConnection connection, StompFrame frame)
+ public int send(StompConnection connection, StompFrame frame)
{
System.out.println(">>> " + frame);
synchronized (connection)
{
- if (connection.isDestroyed())
+ if (connection.isDestroyed() || !connection.isValid())
{
log.warn("Connection closed " + connection);
return 0;
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-01-27 12:23:11 UTC (rev 8847)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-01-27 15:15:42 UTC (rev 8848)
@@ -26,6 +26,7 @@
import org.hornetq.core.server.ServerSession;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.protocol.SessionCallback;
+import org.hornetq.utils.UUIDGenerator;
/**
* A StompSession
@@ -146,17 +147,23 @@
public void addSubscription(long consumerID, String subscriptionID, String destination, String selector, String ack) throws Exception
{
- String queue = StompUtils.toHornetQAddress(destination);
- session.createConsumer(consumerID,
- SimpleString.toSimpleString(queue),
- SimpleString.toSimpleString(selector),
- false);
- session.receiveConsumerCredits(consumerID, -1);
- StompSubscription subscription = new StompSubscription(subscriptionID, destination, ack);
- subscriptions.put(consumerID, subscription);
- // FIXME not very smart: since we can't start the consumer, we start the session
- // everytime to start the new consumer (and all previous consumers...)
- session.start();
+ SimpleString queue = SimpleString.toSimpleString(destination);
+ if (destination.startsWith(StompUtils.HQ_TOPIC_PREFIX))
+ {
+ //subscribes to a topic
+ queue = UUIDGenerator.getInstance().generateSimpleStringUUID();
+ session.createQueue(SimpleString.toSimpleString(destination), queue, null, true, false);
+ }
+ session.createConsumer(consumerID,
+ queue,
+ SimpleString.toSimpleString(selector),
+ false);
+ session.receiveConsumerCredits(consumerID, -1);
+ StompSubscription subscription = new StompSubscription(subscriptionID, destination, ack);
+ subscriptions.put(consumerID, subscription);
+ // FIXME not very smart: since we can't start the consumer, we start the session
+ // every time to start the new consumer (and all previous consumers...)
+ session.start();
}
public boolean unsubscribe(String id) throws Exception
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-27 12:23:11 UTC (rev 8847)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-27 15:15:42 UTC (rev 8848)
@@ -32,15 +32,16 @@
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.jms.Topic;
import junit.framework.Assert;
-import junit.framework.TestCase;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.Configuration;
@@ -58,6 +59,7 @@
import org.hornetq.jms.server.config.JMSConfiguration;
import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
import org.hornetq.jms.server.config.impl.QueueConfigurationImpl;
+import org.hornetq.jms.server.config.impl.TopicConfigurationImpl;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.spi.core.protocol.ProtocolType;
import org.hornetq.tests.util.UnitTestCase;
@@ -71,6 +73,7 @@
private Connection connection;
private Session session;
private Queue queue;
+ private Topic topic;
private JMSServerManager server;
public void testConnect() throws Exception {
@@ -101,10 +104,12 @@
"destination:/queue/" + getQueueName() + "\n\n" +
"Hello World" +
Stomp.NULL;
- sendFrame(frame);
-
- f = receiveFrame(10000);
- Assert.assertTrue(f.startsWith("ERROR"));
+ try {
+ sendFrame(frame);
+ Assert.fail("the socket must have been closed when the server handled the DISCONNECT");
+ } catch (IOException e)
+ {
+ }
}
@@ -599,7 +604,7 @@
sendMessage(getName());
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("MESSAGE"));
- Pattern cl = Pattern.compile("message-id:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
+ Pattern cl = Pattern.compile("message-id:\\s*(\\S+)", Pattern.CASE_INSENSITIVE);
Matcher cl_matcher = cl.matcher(frame);
Assert.assertTrue(cl_matcher.find());
String messageID = cl_matcher.group(1);
@@ -1069,7 +1074,137 @@
Assert.assertNotNull(message);
Assert.assertEquals("second message", message.getText().trim());
}
+
+ public void testSubscribeToTopic() throws Exception {
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/topic/" + getTopicName() + "\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ sendMessage(getName(), topic);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf(getName()) > 0);
+
+ frame =
+ "UNSUBSCRIBE\n" +
+ "destination:/topic/" + getTopicName() + "\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ sendMessage(getName(), topic);
+
+ try {
+ frame = receiveFrame(1000);
+ log.info("Received frame: " + frame);
+ Assert.fail("No message should have been received since subscription was removed");
+ }
+ catch (SocketTimeoutException e) {
+
+ }
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
+ public void testClientAckNotPartOfTransaction() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:client\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ sendMessage(getName());
+
+ frame = receiveFrame(10000);
+ System.out.println(frame);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf(getName()) > 0);
+ Assert.assertTrue(frame.indexOf("message-id:") > 0);
+ Pattern cl = Pattern.compile("message-id:\\s*(\\S+)", Pattern.CASE_INSENSITIVE);
+ Matcher cl_matcher = cl.matcher(frame);
+ Assert.assertTrue(cl_matcher.find());
+ String messageID = cl_matcher.group(1);
+ System.out.println(messageID);
+
+ frame =
+ "BEGIN\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "ACK\n" +
+ "message-id:" + messageID + "\n" +
+ "transaction: tx1\n" +
+ "\n" +
+ "second message" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "ABORT\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ try {
+ frame = receiveFrame(1000);
+ log.info("Received frame: " + frame);
+ Assert.fail("No message should have been received as the message was acked even though the transaction has been aborted");
+ }
+ catch (SocketTimeoutException e) {
+ }
+
+ frame =
+ "UNSUBSCRIBE\n" +
+ "destination:/topic/" + getQueueName() + "\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
// Implementation methods
//-------------------------------------------------------------------------
protected void setUp() throws Exception {
@@ -1083,6 +1218,7 @@
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = session.createQueue(getQueueName());
+ topic = session.createTopic(getTopicName());
connection.start();
}
@@ -1106,6 +1242,7 @@
JMSConfiguration jmsConfig = new JMSConfigurationImpl();
jmsConfig.getQueueConfigurations().add(new QueueConfigurationImpl(getQueueName(), null, false, getQueueName()));
+ jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl(getTopicName(), getTopicName()));
server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
server.setContext(null);
return server;
@@ -1145,6 +1282,10 @@
return "test";
}
+ protected String getTopicName() {
+ return "testtopic";
+ }
+
public void sendFrame(String data) throws Exception {
byte[] bytes = data.getBytes("UTF-8");
OutputStream outputStream = stompSocket.getOutputStream();
@@ -1182,11 +1323,19 @@
}
public void sendMessage(String msg) throws Exception {
- sendMessage(msg, "foo", "xyz");
+ sendMessage(msg, "foo", "xyz", queue);
+ }
+
+ public void sendMessage(String msg, Destination destination) throws Exception {
+ sendMessage(msg, "foo", "xyz", destination);
}
public void sendMessage(String msg, String propertyName, String propertyValue) throws JMSException {
- MessageProducer producer = session.createProducer(queue);
+ sendMessage(msg, propertyName, propertyValue, queue);
+ }
+
+ public void sendMessage(String msg, String propertyName, String propertyValue, Destination destination) throws JMSException {
+ MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage(msg);
message.setStringProperty(propertyName, propertyValue);
producer.send(message);
14 years, 11 months
JBoss hornetq SVN: r8847 - in trunk: tests/src/org/hornetq/tests/integration/stomp and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-27 07:23:11 -0500 (Wed, 27 Jan 2010)
New Revision: 8847
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* fixed disconnection of stomp connections
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-01-26 16:14:02 UTC (rev 8846)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-01-27 12:23:11 UTC (rev 8847)
@@ -13,8 +13,10 @@
package org.hornetq.core.protocol.stomp;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
@@ -48,6 +50,10 @@
private boolean valid;
+ private boolean destroyed = false;
+
+ private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>();
+
StompConnection(final Connection transportConnection, final StompProtocolManager manager)
{
this.transportConnection = transportConnection;
@@ -61,6 +67,12 @@
public void addFailureListener(FailureListener listener)
{
+ if (listener == null)
+ {
+ throw new IllegalStateException("FailureListener cannot be null");
+ }
+
+ failureListeners.add(listener);
}
public boolean checkDataReceived()
@@ -75,8 +87,16 @@
public void destroy()
{
- manager.cleanup(this);
+ if (destroyed)
+ {
+ return;
+ }
+
+ destroyed = true;
+
transportConnection.close();
+
+ callFailureListeners(new HornetQException(HornetQException.INTERNAL_ERROR, "Stomp connection destroyed"));
}
public void disconnect()
@@ -93,6 +113,8 @@
public List<FailureListener> getFailureListeners()
{
+ // we do not return the listeners otherwise the remoting service
+ // would NOT destroy the connection.
return Collections.emptyList();
}
@@ -118,7 +140,7 @@
public boolean isDestroyed()
{
- return false;
+ return destroyed;
}
public boolean removeCloseListener(CloseListener listener)
@@ -128,11 +150,19 @@
public boolean removeFailureListener(FailureListener listener)
{
- return false;
+ if (listener == null)
+ {
+ throw new IllegalStateException("FailureListener cannot be null");
+ }
+
+ return failureListeners.remove(listener);
}
public void setFailureListeners(List<FailureListener> listeners)
{
+ failureListeners.clear();
+
+ failureListeners.addAll(listeners);
}
@@ -175,4 +205,25 @@
{
this.valid = valid;
}
+
+ private void callFailureListeners(final HornetQException me)
+ {
+ final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
+
+ for (final FailureListener listener : listenersClone)
+ {
+ try
+ {
+ listener.connectionFailed(me);
+ }
+ catch (final Throwable t)
+ {
+ // Failure of one listener to execute shouldn't prevent others
+ // from
+ // executing
+ log.error("Failed to execute failure listener", t);
+ }
+ }
+ }
+
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-26 16:14:02 UTC (rev 8846)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-27 12:23:11 UTC (rev 8847)
@@ -366,7 +366,7 @@
StompSession stompSession = sessions.get(connection);
if (stompSession == null)
{
- stompSession = new StompSession(marshaller, connection);
+ stompSession = new StompSession(connection, this);
String name = UUIDGenerator.getInstance().generateStringUUID();
ServerSession session = server.createSession(name,
connection.getLogin(),
@@ -389,7 +389,7 @@
StompSession stompSession = transactedSessions.get(txID);
if (stompSession == null)
{
- stompSession = new StompSession(marshaller, connection);
+ stompSession = new StompSession(connection, this);
String name = UUIDGenerator.getInstance().generateStringUUID();
ServerSession session = server.createSession(name,
connection.getLogin(),
@@ -508,35 +508,27 @@
return new StompFrame(Stomp.Responses.CONNECTED, h, StompMarshaller.NO_DATA);
}
- private void send(RemotingConnection connection, StompFrame frame)
+ public int send(RemotingConnection connection, StompFrame frame)
{
System.out.println(">>> " + frame);
-
- try
+ synchronized (connection)
{
- byte[] bytes = marshaller.marshal(frame);
- HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
- connection.getTransportConnection().write(buffer, true);
- }
- catch (IOException e)
- {
- log.error("Unable to send frame " + frame, e);
- }
- }
-
- synchronized void cleanup(StompConnection conn)
- {
- StompSession session = sessions.remove(conn);
- if (session != null)
- {
+ if (connection.isDestroyed())
+ {
+ log.warn("Connection closed " + connection);
+ return 0;
+ }
try
{
- session.getSession().rollback(true);
- session.getSession().close();
+ byte[] bytes = marshaller.marshal(frame);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+ connection.getTransportConnection().write(buffer, true);
+ return bytes.length;
}
- catch (Exception e)
+ catch (IOException e)
{
- log.error(e);
+ log.error("Unable to send frame " + frame, e);
+ return 0;
}
}
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-01-26 16:14:02 UTC (rev 8846)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-01-27 12:23:11 UTC (rev 8847)
@@ -18,7 +18,6 @@
import java.util.Map.Entry;
import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.message.impl.MessageImpl;
@@ -35,9 +34,9 @@
*/
class StompSession implements SessionCallback
{
- private final RemotingConnection connection;
+ private final StompProtocolManager manager;
- private final StompMarshaller marshaller;
+ private final StompConnection connection;
private ServerSession session;
@@ -46,10 +45,10 @@
// key = message ID, value = consumer ID
private final Map<Long, Long> messagesToAck = new HashMap<Long, Long>();
- StompSession(final StompMarshaller marshaller, final RemotingConnection connection)
+ StompSession(final StompConnection connection, final StompProtocolManager manager)
{
- this.marshaller = marshaller;
this.connection = connection;
+ this.manager = manager;
}
void setServerSession(ServerSession session)
@@ -101,10 +100,7 @@
StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
- System.out.println(">>> " + frame);
- byte[] bytes = marshaller.marshal(frame);
- HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
- connection.getTransportConnection().write(buffer, true);
+ int length = manager.send(connection, frame);
if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
{
@@ -115,7 +111,7 @@
{
messagesToAck.put(serverMessage.getMessageID(), consumerID);
}
- return bytes.length;
+ return length;
}
catch (Exception e)
@@ -151,8 +147,6 @@
public void addSubscription(long consumerID, String subscriptionID, String destination, String selector, String ack) throws Exception
{
String queue = StompUtils.toHornetQAddress(destination);
- synchronized (session)
- {
session.createConsumer(consumerID,
SimpleString.toSimpleString(queue),
SimpleString.toSimpleString(selector),
@@ -163,7 +157,6 @@
// FIXME not very smart: since we can't start the consumer, we start the session
// everytime to start the new consumer (and all previous consumers...)
session.start();
- }
}
public boolean unsubscribe(String id) throws Exception
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java 2010-01-26 16:14:02 UTC (rev 8846)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java 2010-01-27 12:23:11 UTC (rev 8847)
@@ -156,7 +156,6 @@
Map.Entry<String, Object> entry = iter.next();
String name = (String)entry.getKey();
Object value = entry.getValue();
- System.out.println(name + "=" + value);
msg.putObjectProperty(name, value);
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-26 16:14:02 UTC (rev 8846)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-27 12:23:11 UTC (rev 8847)
@@ -60,8 +60,9 @@
import org.hornetq.jms.server.config.impl.QueueConfigurationImpl;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.tests.util.UnitTestCase;
-public class StompTest extends TestCase {
+public class StompTest extends UnitTestCase {
private static final transient Logger log = Logger.getLogger(StompTest.class);
private int port = 61613;
private Socket stompSocket;
14 years, 11 months
JBoss hornetq SVN: r8846 - trunk/docs/user-manual/en.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-26 11:14:02 -0500 (Tue, 26 Jan 2010)
New Revision: 8846
Modified:
trunk/docs/user-manual/en/configuration-index.xml
Log:
doc fix
fixed link references for cluster-user & cluster-password in configuration index
Modified: trunk/docs/user-manual/en/configuration-index.xml
===================================================================
--- trunk/docs/user-manual/en/configuration-index.xml 2010-01-26 15:27:21 UTC (rev 8845)
+++ trunk/docs/user-manual/en/configuration-index.xml 2010-01-26 16:14:02 UTC (rev 8846)
@@ -226,19 +226,19 @@
<entry>jms.queue.hornetq.management</entry>
</row>
<row>
- <entry><link linkend="management.replication"
+ <entry><link linkend="clusters.clusteruser"
>cluster-user</link></entry>
<entry>String</entry>
- <entry>the user used to for replicating management operations between
+ <entry>the user used by cluster connections to communicate between the
clustered nodes</entry>
<entry>HORNETQ.CLUSTER.ADMIN.USER</entry>
</row>
<row>
- <entry><link linkend="management.replication"
+ <entry><link linkend="clusters.clusteruser"
>cluster-password</link></entry>
<entry>String</entry>
- <entry>the password used to for replicating management operations
- between clustered nodes</entry>
+ <entry>the password used by cluster connections to communicate between the
+ clustered nodes</entry>
<entry>CHANGE ME!!</entry>
</row>
<row>
14 years, 11 months
JBoss hornetq SVN: r8845 - in trunk: tests/src/org/hornetq/tests/integration/stomp and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-26 10:27:21 -0500 (Tue, 26 Jan 2010)
New Revision: 8845
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* added checks and headers support
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-01-26 13:31:50 UTC (rev 8844)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-01-26 15:27:21 UTC (rev 8845)
@@ -22,7 +22,6 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.spi.core.protocol.ProtocolManager;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.remoting.Connection;
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-26 13:31:50 UTC (rev 8844)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-26 15:27:21 UTC (rev 8845)
@@ -19,12 +19,13 @@
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
-import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
@@ -81,7 +82,9 @@
headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
}
- return new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
+ byte[] payload = baos.toByteArray();
+ headers.put(Stomp.Headers.CONTENT_LENGTH, payload.length);
+ return new StompFrame(Stomp.Responses.ERROR, headers, payload);
}
catch (UnsupportedEncodingException ex)
{
@@ -175,12 +178,13 @@
if (request.getHeaders().containsKey(Stomp.Headers.RECEIPT_REQUESTED))
{
- if (response == null)
+ if (response == null)
{
Map<String, Object> h = new HashMap<String, Object>();
response = new StompFrame(Stomp.Responses.RECEIPT, h, StompMarshaller.NO_DATA);
}
- response.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED));
+ response.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID,
+ request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED));
}
if (response != null)
@@ -206,52 +210,88 @@
// Private -------------------------------------------------------
- private StompFrame onSubscribe(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception,
- StompException,
- HornetQException
+ private StompFrame onSubscribe(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
{
Map<String, Object> headers = frame.getHeaders();
String destination = (String)headers.get(Stomp.Headers.Subscribe.DESTINATION);
String selector = (String)headers.get(Stomp.Headers.Subscribe.SELECTOR);
String ack = (String)headers.get(Stomp.Headers.Subscribe.ACK_MODE);
- String subID = (String)headers.get(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME);
+ String id = (String)headers.get(Stomp.Headers.Subscribe.ID);
if (ack == null)
{
ack = Stomp.Headers.Subscribe.AckModeValues.AUTO;
}
+ String subscriptionID = null;
+ if (id != null)
+ {
+ subscriptionID = id;
+ }
+ else
+ {
+ if (destination == null)
+ {
+ throw new StompException("Client must set destination or id header to a SUBSCRIBE command");
+ }
+ subscriptionID = "subscription/" + destination;
+ }
StompSession stompSession = getSession(connection);
+ if (stompSession.containsSubscription(subscriptionID))
+ {
+ throw new StompException("There already is a subscription for: " + subscriptionID +
+ ". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
+ }
long consumerID = server.getStorageManager().generateUniqueID();
- stompSession.addSubscription(consumerID, subID, destination, selector, ack);
+ stompSession.addSubscription(consumerID, subscriptionID, destination, selector, ack);
return null;
}
- private StompFrame onUnsubscribe(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception,
- StompException,
- HornetQException
+ private StompFrame onUnsubscribe(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
{
Map<String, Object> headers = frame.getHeaders();
String destination = (String)headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
String id = (String)headers.get(Stomp.Headers.Unsubscribe.ID);
+ String subscriptionID = null;
+ if (id != null)
+ {
+ subscriptionID = id;
+ }
+ else
+ {
+ if (destination == null)
+ {
+ throw new StompException("Must specify the subscription's id or the destination you are unsubscribing from");
+ }
+ subscriptionID = "subscription/" + destination;
+ }
+
StompSession stompSession = getSession(connection);
- stompSession.unsubscribe(destination);
-
+ boolean unsubscribed = stompSession.unsubscribe(subscriptionID);
+ if (!unsubscribed)
+ {
+ throw new StompException("Cannot unsubscribe as o subscription exists for id: " + subscriptionID);
+ }
return null;
}
- private StompFrame onAck(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception,
- StompException,
- HornetQException
+ private StompFrame onAck(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
{
Map<String, Object> headers = frame.getHeaders();
- String messageID = (String)headers.get(Stomp.Headers.Ack.MESSAGE_ID);
+ String messageID = (String)headers.get(Stomp.Headers.Ack.MESSAGE_ID);
String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
-
- StompSession stompSession = getSession(connection);
+ StompSession stompSession = null;
+ if (txID != null)
+ {
+ throw new StompException("transactional ACK are not supported");
+ }
+ else
+ {
+ stompSession = getSession(connection);
+ }
stompSession.acknowledge(messageID);
-
+
return null;
}
@@ -259,24 +299,30 @@
{
Map<String, Object> headers = frame.getHeaders();
String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
-
+ if (txID == null)
+ {
+ throw new StompException("transaction header is mandatory to BEGIN a transaction");
+ }
if (transactedSessions.containsKey(txID))
{
throw new StompException("Transaction already started: " + txID);
}
- StompSession stompSession = getTransactedSession(connection, txID);
+ // create the transacted session
+ getTransactedSession(connection, txID);
+
return null;
}
- private StompFrame onCommit(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception,
- StompException,
- HornetQException
+ private StompFrame onCommit(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
{
Map<String, Object> headers = frame.getHeaders();
String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
+ if (txID == null)
+ {
+ throw new StompException("transaction header is mandatory to COMMIT a transaction");
+ }
StompSession session = transactedSessions.remove(txID);
-
if (session == null)
{
throw new StompException("No transaction started: " + txID);
@@ -287,12 +333,14 @@
return null;
}
- private StompFrame onAbort(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception,
- StompException,
- HornetQException
+ private StompFrame onAbort(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
{
Map<String, Object> headers = frame.getHeaders();
String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
+ if (txID == null)
+ {
+ throw new StompException("transaction header is mandatory to ABORT a transaction");
+ }
StompSession session = transactedSessions.remove(txID);
@@ -361,7 +409,7 @@
private StompFrame onDisconnect(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
{
- StompSession session = getSession(connection);
+ StompSession session = sessions.remove(connection);
if (session != null)
{
try
@@ -373,9 +421,23 @@
{
throw new StompException(e.getMessage());
}
- sessions.remove(connection);
- connection.setValid(false);
}
+
+ // remove the transacted sessions belonging to the connection
+ Iterator<Entry<String, StompSession>> iterator = transactedSessions.entrySet().iterator();
+ while (iterator.hasNext())
+ {
+ Map.Entry<String, StompSession> entry = (Map.Entry<String, StompSession>)iterator.next();
+ if (entry.getValue().getConnection() == connection)
+ {
+ ServerSession serverSession = entry.getValue().getSession();
+ serverSession.rollback(true);
+ serverSession.close();
+ iterator.remove();
+ }
+ }
+ connection.setValid(false);
+
return null;
}
@@ -412,7 +474,8 @@
if (txID == null)
{
session = getSession(connection).getSession();
- } else
+ }
+ else
{
session = transactedSessions.get(txID).getSession();
}
@@ -461,7 +524,7 @@
}
}
- public synchronized void cleanup(StompConnection conn)
+ synchronized void cleanup(StompConnection conn)
{
StompSession session = sessions.remove(conn);
if (session != null)
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-01-26 13:31:50 UTC (rev 8844)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-01-26 15:27:21 UTC (rev 8845)
@@ -70,9 +70,15 @@
{
try
{
+ StompSubscription subscription = subscriptions.get(consumerID);
+
Map<String, Object> headers = new HashMap<String, Object>();
headers.put(Stomp.Headers.Message.DESTINATION, StompUtils.toStompDestination(serverMessage.getAddress()
.toString()));
+ if (subscription.getID() != null)
+ {
+ headers.put(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
+ }
byte[] data = new byte[] {};
if (serverMessage.getType() == Message.TEXT_TYPE)
{
@@ -91,15 +97,15 @@
buffer.readBytes(data);
headers.put(Headers.CONTENT_LENGTH, data.length);
}
+
StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
+
System.out.println(">>> " + frame);
byte[] bytes = marshaller.marshal(frame);
HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
connection.getTransportConnection().write(buffer, true);
- StompSubscription subscription = subscriptions.get(consumerID);
-
if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
{
session.acknowledge(consumerID, serverMessage.getMessageID());
@@ -142,7 +148,7 @@
session.commit();
}
- public void addSubscription(long consumerID, String clientID, String destination, String selector, String ack) throws Exception
+ public void addSubscription(long consumerID, String subscriptionID, String destination, String selector, String ack) throws Exception
{
String queue = StompUtils.toHornetQAddress(destination);
synchronized (session)
@@ -152,7 +158,7 @@
SimpleString.toSimpleString(selector),
false);
session.receiveConsumerCredits(consumerID, -1);
- StompSubscription subscription = new StompSubscription(consumerID, clientID, destination, ack);
+ StompSubscription subscription = new StompSubscription(subscriptionID, destination, ack);
subscriptions.put(consumerID, subscription);
// FIXME not very smart: since we can't start the consumer, we start the session
// every time to start the new consumer (and all previous consumers...)
@@ -160,7 +166,7 @@
}
}
- public void unsubscribe(String destination) throws Exception
+ public boolean unsubscribe(String id) throws Exception
{
Iterator<Entry<Long, StompSubscription>> iterator = subscriptions.entrySet().iterator();
while (iterator.hasNext())
@@ -168,11 +174,33 @@
Map.Entry<Long, StompSubscription> entry = (Map.Entry<Long, StompSubscription>)iterator.next();
long consumerID = entry.getKey();
StompSubscription sub = entry.getValue();
- if (sub.getDestination().equals(destination))
+ if (id != null && id.equals(sub.getID()))
{
iterator.remove();
session.closeConsumer(consumerID);
+ return true;
}
}
+ return false;
}
+
+ boolean containsSubscription(String subscriptionID)
+ {
+ Iterator<Entry<Long, StompSubscription>> iterator = subscriptions.entrySet().iterator();
+ while (iterator.hasNext())
+ {
+ Map.Entry<Long, StompSubscription> entry = (Map.Entry<Long, StompSubscription>)iterator.next();
+ StompSubscription sub = entry.getValue();
+ if (sub.getID().equals(subscriptionID))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public RemotingConnection getConnection()
+ {
+ return connection;
+ }
}
\ No newline at end of file
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java 2010-01-26 13:31:50 UTC (rev 8844)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java 2010-01-26 15:27:21 UTC (rev 8845)
@@ -26,8 +26,6 @@
// Attributes ----------------------------------------------------
- private final long consumerID;
-
private final String subID;
private final String destination;
@@ -38,9 +36,8 @@
// Constructors --------------------------------------------------
- public StompSubscription(long consumerID, String subID, String destination, String ack)
+ public StompSubscription(String subID, String destination, String ack)
{
- this.consumerID = consumerID;
this.subID = subID;
this.destination = destination;
this.ack = ack;
@@ -57,6 +54,11 @@
{
return destination;
}
+
+ public String getID()
+ {
+ return subID;
+ }
// Package protected ---------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-26 13:31:50 UTC (rev 8844)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-26 15:27:21 UTC (rev 8845)
@@ -454,7 +454,42 @@
Stomp.NULL;
sendFrame(frame);
}
+
+ public void testSubscribeWithID() throws Exception {
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:auto\n" +
+ "id: mysubid\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ sendMessage(getName());
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf("subscription:") > 0);
+ Assert.assertTrue(frame.indexOf(getName()) > 0);
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
public void testMessagesAreInOrder() throws Exception {
int ctr = 10;
String[] data = new String[ctr];
@@ -622,7 +657,7 @@
Assert.assertNotNull(message);
Assert.assertTrue(message.getJMSRedelivered());
}
-
+
public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception {
assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
}
@@ -771,6 +806,55 @@
}
}
+ public void testUnsubscribeWithID() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "id: mysubid\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ //send a message to our queue
+ sendMessage("first message");
+
+ //receive message from socket
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ //remove subscription
+ frame =
+ "UNSUBSCRIBE\n" +
+ "id:mysubid\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ waitForFrameToTakeEffect();
+
+ //send a message to our queue
+ sendMessage("second message");
+
+ try {
+ frame = receiveFrame(1000);
+ log.info("Received frame: " + frame);
+ Assert.fail("No message should have been received since subscription was removed");
+ }
+ catch (SocketTimeoutException e) {
+
+ }
+ }
+
public void testTransactionCommit() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@@ -816,7 +900,106 @@
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull("Should have received a message", message);
}
+
+ public void testSuccessiveTransactionsWithSameID() throws Exception {
+ MessageConsumer consumer = session.createConsumer(queue);
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ String f = receiveFrame(1000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+
+ // first tx
+ frame =
+ "BEGIN\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "COMMIT\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
+
+ // 2nd tx with same tx ID
+ frame =
+ "BEGIN\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "COMMIT\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
+}
+
+ public void testBeginSameTransactionTwice() throws Exception {
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ String f = receiveFrame(1000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+
+ frame =
+ "BEGIN\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ // begin the tx a 2nd time
+ frame =
+ "BEGIN\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ f = receiveFrame(1000);
+ Assert.assertTrue(f.startsWith("ERROR"));
+
+ }
+
public void testTransactionRollback() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
14 years, 11 months
JBoss hornetq SVN: r8844 - in trunk: src/main/org/hornetq/core/server and 2 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-26 08:31:50 -0500 (Tue, 26 Jan 2010)
New Revision: 8844
Added:
trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
Removed:
trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* added missing Stomp commands and responses
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-01-22 15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-01-26 13:31:50 UTC (rev 8844)
@@ -33,15 +33,23 @@
*
*
*/
-public class StompConnection implements RemotingConnection
+class StompConnection implements RemotingConnection
{
private static final Logger log = Logger.getLogger(StompConnection.class);
- private final ProtocolManager manager;
+ private final StompProtocolManager manager;
private final Connection transportConnection;
- StompConnection(final Connection transportConnection, final ProtocolManager manager)
+ private String login;
+
+ private String passcode;
+
+ private String clientID;
+
+ private boolean valid;
+
+ StompConnection(final Connection transportConnection, final StompProtocolManager manager)
{
this.transportConnection = transportConnection;
@@ -68,6 +76,8 @@
public void destroy()
{
+ manager.cleanup(this);
+ transportConnection.close();
}
public void disconnect()
@@ -84,7 +94,7 @@
public List<FailureListener> getFailureListeners()
{
- return Collections.EMPTY_LIST;
+ return Collections.emptyList();
}
public Object getID()
@@ -132,9 +142,38 @@
manager.handleBuffer(this, buffer);
}
- public int isReadyToHandle(HornetQBuffer buffer)
+ public void setLogin(String login)
{
- return -1;
+ this.login = login;
}
+ public String getLogin()
+ {
+ return login;
+ }
+
+ public void setPasscode(String passcode)
+ {
+ this.passcode = passcode;
+ }
+
+ public String getPasscode()
+ {
+ return passcode;
+ }
+
+ public void setClientID(String clientID)
+ {
+ this.clientID = clientID;
+ }
+
+ public boolean isValid()
+ {
+ return valid;
+ }
+
+ public void setValid(boolean valid)
+ {
+ this.valid = valid;
+ }
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java 2010-01-22 15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java 2010-01-26 13:31:50 UTC (rev 8844)
@@ -65,6 +65,6 @@
@Override
public String toString()
{
- return "StompFrame[command=" + command + ", headers=" + headers + ",content-length=" + content.length + "]";
+ return "StompFrame[command=" + command + ", headers=" + headers + ", content-length=" + content.length + "]";
}
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java 2010-01-22 15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java 2010-01-26 13:31:50 UTC (rev 8844)
@@ -31,7 +31,7 @@
* Implements marshalling and unmarshalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
*/
class StompMarshaller {
- private static final byte[] NO_DATA = new byte[]{};
+ public static final byte[] NO_DATA = new byte[]{};
private static final byte[] END_OF_FRAME = new byte[]{0, '\n'};
private static final int MAX_COMMAND_LENGTH = 1024;
private static final int MAX_HEADER_LENGTH = 1024 * 10;
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-22 15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-26 13:31:50 UTC (rev 8844)
@@ -31,7 +31,6 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.spi.core.protocol.ConnectionEntry;
@@ -43,28 +42,64 @@
/**
* StompProtocolManager
*
- * A stupid protocol to demonstrate how to implement a new protocol in HornetQ
- *
- * @author Tim Fox
- *
- *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
*/
-public class StompProtocolManager implements ProtocolManager
+class StompProtocolManager implements ProtocolManager
{
+ // Constants -----------------------------------------------------
+
private static final Logger log = Logger.getLogger(StompProtocolManager.class);
+ // Attributes ----------------------------------------------------
+
private final HornetQServer server;
private final StompMarshaller marshaller;
- private final Map<RemotingConnection, ServerSession> sessions = new HashMap<RemotingConnection, ServerSession>();
+ private final Map<String, StompSession> transactedSessions = new HashMap<String, StompSession>();
+ private final Map<RemotingConnection, StompSession> sessions = new HashMap<RemotingConnection, StompSession>();
+
+ // Static --------------------------------------------------------
+
+ private static StompFrame createError(Exception e, StompFrame request)
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try
+ {
+ // Let the stomp client know about any protocol errors.
+ PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
+ e.printStackTrace(stream);
+ stream.close();
+
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
+
+ final String receiptId = (String)request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+ if (receiptId != null)
+ {
+ headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+ }
+
+ return new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
+ }
+ catch (UnsupportedEncodingException ex)
+ {
+ log.warn("Unable to create ERROR frame from the exception", ex);
+ return null;
+ }
+ }
+
+ // Constructors --------------------------------------------------
+
public StompProtocolManager(final HornetQServer server, final List<Interceptor> interceptors)
{
this.server = server;
this.marshaller = new StompMarshaller();
}
+ // ProtocolManager implementation --------------------------------
+
public ConnectionEntry createConnectionEntry(final Connection connection)
{
StompConnection conn = new StompConnection(connection, this);
@@ -76,160 +111,293 @@
{
}
+ public int isReadyToHandle(HornetQBuffer buffer)
+ {
+ return -1;
+ }
+
public void handleBuffer(RemotingConnection connection, HornetQBuffer buffer)
{
- StompFrame frame = null;
+ StompConnection conn = (StompConnection)connection;
+ StompFrame request = null;
try
{
- frame = marshaller.unmarshal(buffer);
- System.out.println("RECEIVED " + frame);
+ request = marshaller.unmarshal(buffer);
+ System.out.println("<<< " + request);
- String command = frame.getCommand();
+ String command = request.getCommand();
StompFrame response = null;
if (Stomp.Commands.CONNECT.equals(command))
{
- response = onConnect(frame, server, connection);
+ response = onConnect(request, server, conn);
}
else if (Stomp.Commands.DISCONNECT.equals(command))
{
- response = onDisconnect(frame, server, connection);
+ response = onDisconnect(request, server, conn);
}
else if (Stomp.Commands.SEND.equals(command))
{
- response = onSend(frame, server, connection);
+ response = onSend(request, server, conn);
}
else if (Stomp.Commands.SUBSCRIBE.equals(command))
{
- response = onSubscribe(frame, server, connection);
+ response = onSubscribe(request, server, conn);
}
+ else if (Stomp.Commands.UNSUBSCRIBE.equals(command))
+ {
+ response = onUnsubscribe(request, server, conn);
+ }
+ else if (Stomp.Commands.ACK.equals(command))
+ {
+ response = onAck(request, server, conn);
+ }
+ else if (Stomp.Commands.BEGIN.equals(command))
+ {
+ response = onBegin(request, server, conn);
+ }
+ else if (Stomp.Commands.COMMIT.equals(command))
+ {
+ response = onCommit(request, server, conn);
+ }
+ else if (Stomp.Commands.ABORT.equals(command))
+ {
+ response = onAbort(request, server, conn);
+ }
else
{
- log.error("Unsupported Stomp frame: " + frame);
+
+ log.error("Unsupported Stomp frame: " + request);
response = new StompFrame(Stomp.Responses.ERROR,
new HashMap<String, Object>(),
("Unsupported frame: " + command).getBytes());
}
+ if (request.getHeaders().containsKey(Stomp.Headers.RECEIPT_REQUESTED))
+ {
+ if (response == null)
+ {
+ Map<String, Object> h = new HashMap<String, Object>();
+ response = new StompFrame(Stomp.Responses.RECEIPT, h, StompMarshaller.NO_DATA);
+ }
+ response.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED));
+ }
+
if (response != null)
{
send(connection, response);
}
}
- catch (StompException ex)
+ catch (Exception e)
{
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try
+ StompFrame error = createError(e, request);
+ if (error != null)
{
- // Let the stomp client know about any protocol errors.
- PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
- ex.printStackTrace(stream);
- stream.close();
+ send(connection, error);
}
- catch (UnsupportedEncodingException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ }
+ }
- Map<String, Object> headers = new HashMap<String, Object>();
- headers.put(Stomp.Headers.Error.MESSAGE, ex.getMessage());
+ // Public --------------------------------------------------------
- final String receiptId = (String)frame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
- if (receiptId != null)
- {
- headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
- }
+ // Package protected ---------------------------------------------
- StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
- try
- {
- send(connection, errorMessage);
- }
- catch (IOException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ private StompFrame onSubscribe(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception,
+ StompException,
+ HornetQException
+ {
+ Map<String, Object> headers = frame.getHeaders();
+ String destination = (String)headers.get(Stomp.Headers.Subscribe.DESTINATION);
+ String selector = (String)headers.get(Stomp.Headers.Subscribe.SELECTOR);
+ String ack = (String)headers.get(Stomp.Headers.Subscribe.ACK_MODE);
+ String subID = (String)headers.get(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME);
+
+ if (ack == null)
+ {
+ ack = Stomp.Headers.Subscribe.AckModeValues.AUTO;
}
- catch (Exception ex)
+ StompSession stompSession = getSession(connection);
+ long consumerID = server.getStorageManager().generateUniqueID();
+ stompSession.addSubscription(consumerID, subID, destination, selector, ack);
+
+ return null;
+ }
+
+ private StompFrame onUnsubscribe(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception,
+ StompException,
+ HornetQException
+ {
+ Map<String, Object> headers = frame.getHeaders();
+ String destination = (String)headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
+ String id = (String)headers.get(Stomp.Headers.Unsubscribe.ID);
+
+ StompSession stompSession = getSession(connection);
+ stompSession.unsubscribe(destination);
+
+ return null;
+ }
+
+ private StompFrame onAck(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception,
+ StompException,
+ HornetQException
+ {
+ Map<String, Object> headers = frame.getHeaders();
+ String messageID = (String)headers.get(Stomp.Headers.Ack.MESSAGE_ID);
+ String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
+
+ StompSession stompSession = getSession(connection);
+ stompSession.acknowledge(messageID);
+
+ return null;
+ }
+
+ private StompFrame onBegin(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
+ {
+ Map<String, Object> headers = frame.getHeaders();
+ String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
+
+ if (transactedSessions.containsKey(txID))
{
- ex.printStackTrace();
+ throw new StompException("Transaction already started: " + txID);
}
+ StompSession stompSession = getTransactedSession(connection, txID);
+ return null;
}
- private StompFrame onSubscribe(StompFrame frame, HornetQServer server, RemotingConnection connection) throws Exception,
- StompException,
- HornetQException
+ private StompFrame onCommit(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception,
+ StompException,
+ HornetQException
{
Map<String, Object> headers = frame.getHeaders();
- String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
- SimpleString queueName = SimpleString.toSimpleString(StompUtils.toHornetQAddress(queue));
+ String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
- ServerSession session = checkAndGetSession(connection);
- long consumerID = server.getStorageManager().generateUniqueID();
- session.createConsumer(consumerID, queueName, null, false);
- session.receiveConsumerCredits(consumerID, -1);
- session.start();
+ StompSession session = transactedSessions.remove(txID);
+ if (session == null)
+ {
+ throw new StompException("No transaction started: " + txID);
+ }
+
+ session.getSession().commit();
+
return null;
}
- private ServerSession checkAndGetSession(RemotingConnection connection) throws StompException
+ private StompFrame onAbort(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception,
+ StompException,
+ HornetQException
{
- ServerSession session = sessions.get(connection);
+ Map<String, Object> headers = frame.getHeaders();
+ String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
+
+ StompSession session = transactedSessions.remove(txID);
+
if (session == null)
{
+ throw new StompException("No transaction started: " + txID);
+ }
+ session.getSession().rollback(false);
+
+ return null;
+ }
+
+ private void checkConnected(StompConnection connection) throws StompException
+ {
+ if (!connection.isValid())
+ {
throw new StompException("Not connected");
}
- return session;
}
- private StompFrame onDisconnect(StompFrame frame, HornetQServer server, RemotingConnection connection) throws StompException
+ private StompSession getSession(StompConnection connection) throws Exception
{
- ServerSession session = checkAndGetSession(connection);
+ StompSession stompSession = sessions.get(connection);
+ if (stompSession == null)
+ {
+ stompSession = new StompSession(marshaller, connection);
+ String name = UUIDGenerator.getInstance().generateStringUUID();
+ ServerSession session = server.createSession(name,
+ connection.getLogin(),
+ connection.getPasscode(),
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ connection,
+ true,
+ false,
+ false,
+ false,
+ stompSession);
+ stompSession.setServerSession(session);
+ sessions.put(connection, stompSession);
+ }
+ return stompSession;
+ }
+
+ private StompSession getTransactedSession(StompConnection connection, String txID) throws Exception
+ {
+ StompSession stompSession = transactedSessions.get(txID);
+ if (stompSession == null)
+ {
+ stompSession = new StompSession(marshaller, connection);
+ String name = UUIDGenerator.getInstance().generateStringUUID();
+ ServerSession session = server.createSession(name,
+ connection.getLogin(),
+ connection.getPasscode(),
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ connection,
+ false,
+ false,
+ false,
+ false,
+ stompSession);
+ stompSession.setServerSession(session);
+ transactedSessions.put(txID, stompSession);
+ }
+ return stompSession;
+ }
+
+ private StompFrame onDisconnect(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
+ {
+ StompSession session = getSession(connection);
if (session != null)
{
try
{
- session.close();
+ session.getSession().rollback(true);
+ session.getSession().close();
}
catch (Exception e)
{
throw new StompException(e.getMessage());
}
sessions.remove(connection);
+ connection.setValid(false);
}
return null;
}
- private StompFrame onSend(StompFrame frame, HornetQServer server, RemotingConnection connection) throws Exception
+ private StompFrame onSend(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
{
- ServerSession session = checkAndGetSession(connection);
-
+ checkConnected(connection);
Map<String, Object> headers = frame.getHeaders();
- String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
- /*
- String type = (String)headers.get(Stomp.Headers.Send.TYPE);
- long expiration = (Long)headers.get(Stomp.Headers.Send.EXPIRATION_TIME);
- byte priority = (Byte)headers.get(Stomp.Headers.Send.PRIORITY);
- boolean durable = (Boolean)headers.get(Stomp.Headers.Send.PERSISTENT);
- */
+ String queue = (String)headers.remove(Stomp.Headers.Send.DESTINATION);
+ String txID = (String)headers.remove(Stomp.Headers.TRANSACTION);
byte type = Message.TEXT_TYPE;
if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH))
{
type = Message.BYTES_TYPE;
}
long timestamp = System.currentTimeMillis();
- boolean durable = false;
- long expiration = -1;
- byte priority = 9;
SimpleString address = SimpleString.toSimpleString(StompUtils.toHornetQAddress(queue));
ServerMessageImpl message = new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
message.setType(type);
message.setTimestamp(timestamp);
message.setAddress(address);
+ StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
byte[] content = frame.getContent();
if (type == Message.TEXT_TYPE)
{
@@ -240,56 +408,75 @@
message.getBodyBuffer().writeBytes(content);
}
- session.send(message);
- if (headers.containsKey(Stomp.Headers.RECEIPT_REQUESTED))
+ ServerSession session = null;
+ if (txID == null)
{
- Map<String, Object> h = new HashMap<String, Object>();
- h.put(Stomp.Headers.Response.RECEIPT_ID, headers.get(Stomp.Headers.RECEIPT_REQUESTED));
- return new StompFrame(Stomp.Responses.RECEIPT, h, new byte[] {});
- }
- else
+ session = getSession(connection).getSession();
+ } else
{
- return null;
+ session = transactedSessions.get(txID).getSession();
}
+
+ session.send(message);
+ return null;
}
- private StompFrame onConnect(StompFrame frame, HornetQServer server, final RemotingConnection connection) throws Exception
+ private StompFrame onConnect(StompFrame frame, HornetQServer server, final StompConnection connection) throws Exception
{
Map<String, Object> headers = frame.getHeaders();
String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
+ String clientID = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
- String name = UUIDGenerator.getInstance().generateStringUUID();
- ServerSession session = server.createSession(name,
- login,
- passcode,
- HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- connection,
- true,
- true,
- false,
- false,
- new StompSessionCallback(marshaller, connection));
- sessions.put(connection, session);
- System.out.println(">>> created session " + session);
+ server.getSecurityManager().validateUser(login, passcode);
+
+ connection.setLogin(login);
+ connection.setPasscode(passcode);
+ connection.setClientID(clientID);
+ connection.setValid(true);
+
HashMap<String, Object> h = new HashMap<String, Object>();
- h.put(Stomp.Headers.Connected.SESSION, name);
- h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
- return new StompFrame(Stomp.Responses.CONNECTED, h, new byte[] {});
+ h.put(Stomp.Headers.Connected.SESSION, connection.getID());
+ if (requestID != null)
+ {
+ h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
+ }
+ return new StompFrame(Stomp.Responses.CONNECTED, h, StompMarshaller.NO_DATA);
}
- private void send(RemotingConnection connection, StompFrame frame) throws IOException
+ private void send(RemotingConnection connection, StompFrame frame)
{
- System.out.println("SENDING >>> " + frame);
- byte[] bytes = marshaller.marshal(frame);
- HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
- System.out.println("ready to send reply: " + buffer);
- connection.getTransportConnection().write(buffer, true);
+ System.out.println(">>> " + frame);
+
+ try
+ {
+ byte[] bytes = marshaller.marshal(frame);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+ connection.getTransportConnection().write(buffer, true);
+ }
+ catch (IOException e)
+ {
+ log.error("Unable to send frame " + frame, e);
+ }
}
- public int isReadyToHandle(HornetQBuffer buffer)
+ public synchronized void cleanup(StompConnection conn)
{
- return -1;
+ StompSession session = sessions.remove(conn);
+ if (session != null)
+ {
+ try
+ {
+ session.getSession().rollback(true);
+ session.getSession().close();
+ }
+ catch (Exception e)
+ {
+ log.error(e);
+ }
+ }
}
+
+ // Inner classes -------------------------------------------------
}
Copied: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java (from rev 8843, trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java)
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-01-26 13:31:50 UTC (rev 8844)
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.protocol.stomp.Stomp.Headers;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.protocol.SessionCallback;
+
+/**
+ * A StompSession
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+class StompSession implements SessionCallback
+{
+ private final RemotingConnection connection;
+
+ private final StompMarshaller marshaller;
+
+ private ServerSession session;
+
+ private final Map<Long, StompSubscription> subscriptions = new HashMap<Long, StompSubscription>();
+
+ // key = message ID, value = consumer ID
+ private final Map<Long, Long> messagesToAck = new HashMap<Long, Long>();
+
+ StompSession(final StompMarshaller marshaller, final RemotingConnection connection)
+ {
+ this.marshaller = marshaller;
+ this.connection = connection;
+ }
+
+ void setServerSession(ServerSession session)
+ {
+ this.session = session;
+ }
+
+ public ServerSession getSession()
+ {
+ return session;
+ }
+
+ public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
+ {
+ }
+
+ public int sendMessage(ServerMessage serverMessage, long consumerID, int deliveryCount)
+ {
+ try
+ {
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put(Stomp.Headers.Message.DESTINATION, StompUtils.toStompDestination(serverMessage.getAddress()
+ .toString()));
+ byte[] data = new byte[] {};
+ if (serverMessage.getType() == Message.TEXT_TYPE)
+ {
+ SimpleString text = serverMessage.getBodyBuffer().readNullableSimpleString();
+ if (text != null)
+ {
+ data = text.toString().getBytes();
+ }
+ }
+ else
+ {
+ HornetQBuffer buffer = serverMessage.getBodyBuffer();
+ buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE);
+ int size = serverMessage.getEndOfBodyPosition() - buffer.readerIndex();
+ data = new byte[size];
+ buffer.readBytes(data);
+ headers.put(Headers.CONTENT_LENGTH, data.length);
+ }
+ StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
+ StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
+ System.out.println(">>> " + frame);
+ byte[] bytes = marshaller.marshal(frame);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+ connection.getTransportConnection().write(buffer, true);
+
+ StompSubscription subscription = subscriptions.get(consumerID);
+
+ if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
+ {
+ session.acknowledge(consumerID, serverMessage.getMessageID());
+ session.commit();
+ }
+ else
+ {
+ messagesToAck.put(serverMessage.getMessageID(), consumerID);
+ }
+ return bytes.length;
+
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ return 0;
+ }
+
+ }
+
+ public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse)
+ {
+ return 0;
+ }
+
+ public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
+ {
+ return 0;
+ }
+
+ public void closed()
+ {
+ }
+
+ public void acknowledge(String messageID) throws Exception
+ {
+ long id = Long.parseLong(messageID);
+ long consumerID = messagesToAck.remove(id);
+ session.acknowledge(consumerID, id);
+ session.commit();
+ }
+
+ public void addSubscription(long consumerID, String clientID, String destination, String selector, String ack) throws Exception
+ {
+ String queue = StompUtils.toHornetQAddress(destination);
+ synchronized (session)
+ {
+ session.createConsumer(consumerID,
+ SimpleString.toSimpleString(queue),
+ SimpleString.toSimpleString(selector),
+ false);
+ session.receiveConsumerCredits(consumerID, -1);
+ StompSubscription subscription = new StompSubscription(consumerID, clientID, destination, ack);
+ subscriptions.put(consumerID, subscription);
+ // FIXME not very smart: since we can't start the consumer, we start the session
+ // everytime to start the new consumer (and all previous consumers...)
+ session.start();
+ }
+ }
+
+ public void unsubscribe(String destination) throws Exception
+ {
+ Iterator<Entry<Long, StompSubscription>> iterator = subscriptions.entrySet().iterator();
+ while (iterator.hasNext())
+ {
+ Map.Entry<Long, StompSubscription> entry = (Map.Entry<Long, StompSubscription>)iterator.next();
+ long consumerID = entry.getKey();
+ StompSubscription sub = entry.getValue();
+ if (sub.getDestination().equals(destination))
+ {
+ iterator.remove();
+ session.closeConsumer(consumerID);
+ }
+ }
+ }
+}
\ No newline at end of file
Deleted: trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java 2010-01-22 15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java 2010-01-26 13:31:50 UTC (rev 8844)
@@ -1,93 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.protocol.stomp;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQBuffers;
-import org.hornetq.api.core.Message;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.spi.core.protocol.RemotingConnection;
-import org.hornetq.spi.core.protocol.SessionCallback;
-
-/**
- * A StompSessionCallback
- *
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- */
-class StompSessionCallback implements SessionCallback
-{
- private final RemotingConnection connection;
-
- private final StompMarshaller marshaller;
-
- StompSessionCallback(final StompMarshaller marshaller, final RemotingConnection connection)
- {
- this.marshaller = marshaller;
- this.connection = connection;
- }
-
- public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
- {
- }
-
- public int sendMessage(ServerMessage serverMessage, long consumerID, int deliveryCount)
- {
- try
- {
- Map<String, Object> headers = new HashMap<String, Object>();
- headers.put(Stomp.Headers.Message.DESTINATION, StompUtils.toStompDestination(serverMessage.getAddress()
- .toString()));
- byte[] data = new byte[] {};
- if (serverMessage.getType() == Message.TEXT_TYPE)
- {
- SimpleString text = serverMessage.getBodyBuffer().readNullableSimpleString();
- if (text != null)
- {
- data = text.toString().getBytes();
- }
- }
- StompFrame msg = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
- System.out.println("SENDING : " + msg);
- byte[] bytes = marshaller.marshal(msg);
- HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
- connection.getTransportConnection().write(buffer, true);
-
- return bytes.length;
-
- }
- catch (Exception e)
- {
- e.printStackTrace();
- return 0;
- }
-
- }
-
- public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse)
- {
- return 0;
- }
-
- public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
- {
- return 0;
- }
-
- public void closed()
- {
- }
-}
\ No newline at end of file
Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java 2010-01-26 13:31:50 UTC (rev 8844)
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+/**
+ * A StompSubscription
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public class StompSubscription
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final long consumerID;
+
+ private final String subID;
+
+ private final String destination;
+
+ private final String ack;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public StompSubscription(long consumerID, String subID, String destination, String ack)
+ {
+ this.consumerID = consumerID;
+ this.subID = subID;
+ this.destination = destination;
+ this.ack = ack;
+ }
+
+ // Public --------------------------------------------------------
+
+ public String getAck()
+ {
+ return ack;
+ }
+
+ public String getDestination()
+ {
+ return destination;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java 2010-01-22 15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java 2010-01-26 13:31:50 UTC (rev 8844)
@@ -13,7 +13,16 @@
package org.hornetq.core.protocol.stomp;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.client.impl.ClientMessageImpl;
+import org.hornetq.core.server.impl.ServerMessageImpl;
/**
* A StompUtils
@@ -71,9 +80,9 @@
}
else
{
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal destination name: [" + stompDestination +
- "] -- StompConnect destinations " +
- "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
+ // it is also possible the STOMP client send a message directly to a HornetQ address
+ // in that case, we do nothing:
+ return stompDestination;
}
}
@@ -101,17 +110,85 @@
}
else
{
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal address name: [" + hornetqAddress +
- "] -- Acceptable address must comply to JMS semantics");
+ // do nothing
+ return hornetqAddress;
}
}
-
- private static String convert(String str, String oldPrefix, String newPrefix)
+
+ private static String convert(String str, String oldPrefix, String newPrefix)
{
String sub = str.substring(oldPrefix.length(), str.length());
return new String(newPrefix + sub);
}
+ public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, ServerMessageImpl msg) throws Exception
+ {
+ Map<String, Object> headers = new HashMap<String, Object>(frame.getHeaders());
+
+ String priority = (String)headers.remove(Stomp.Headers.Send.PRIORITY);
+ if (priority != null)
+ {
+ msg.setPriority(Byte.parseByte(priority));
+ }
+ String persistent = (String)headers.remove(Stomp.Headers.Send.PERSISTENT);
+ if (persistent != null)
+ {
+ msg.setDurable(Boolean.parseBoolean(persistent));
+ }
+ // FIXME should use a proper constant
+ msg.putObjectProperty("JMSCorrelationID", headers.remove(Stomp.Headers.Send.CORRELATION_ID));
+ msg.putObjectProperty("JMSType", headers.remove(Stomp.Headers.Send.TYPE));
+
+ String groupID = (String)headers.remove("JMSXGroupID");
+ if (groupID != null)
+ {
+ msg.putStringProperty(Message.HDR_GROUP_ID, SimpleString.toSimpleString(groupID));
+ }
+ Object o = headers.remove(Stomp.Headers.Send.REPLY_TO);
+ if (o != null)
+ {
+ msg.putStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME, SimpleString.toSimpleString((String)o));
+ }
+
+ // now the general headers
+ for (Iterator<Map.Entry<String, Object>> iter = headers.entrySet().iterator(); iter.hasNext();)
+ {
+ Map.Entry<String, Object> entry = iter.next();
+ String name = (String)entry.getKey();
+ Object value = entry.getValue();
+ System.out.println(name + "=" + value);
+ msg.putObjectProperty(name, value);
+ }
+ }
+
+ public static void copyStandardHeadersFromMessageToFrame(Message message, StompFrame command, int deliveryCount) throws Exception {
+ final Map<String, Object> headers = command.getHeaders();
+ headers.put(Stomp.Headers.Message.DESTINATION, toStompDestination(message.getAddress().toString()));
+ headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getMessageID());
+
+ if (message.getObjectProperty("JMSCorrelationID") != null) {
+ headers.put(Stomp.Headers.Message.CORRELATION_ID, message.getObjectProperty("JMSCorrelationID"));
+ }
+ headers.put(Stomp.Headers.Message.EXPIRATION_TIME, "" + message.getExpiration());
+ headers.put(Stomp.Headers.Message.REDELIVERED, deliveryCount > 1);
+ headers.put(Stomp.Headers.Message.PRORITY, "" + message.getPriority());
+
+ if (message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME) != null) {
+ headers.put(Stomp.Headers.Message.REPLY_TO, toStompDestination(message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME)));
+ }
+ headers.put(Stomp.Headers.Message.TIMESTAMP, "" + message.getTimestamp());
+
+ if (message.getObjectProperty("JMSType") != null) {
+ headers.put(Stomp.Headers.Message.TYPE, message.getObjectProperty("JMSType"));
+ }
+
+ // now lets add all the message headers
+ Set<SimpleString> names = message.getPropertyNames();
+ for (SimpleString name : names)
+ {
+ headers.put(name.toString(), message.getObjectProperty(name));
+ }
+ }
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-01-22 15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-01-26 13:31:50 UTC (rev 8844)
@@ -40,7 +40,7 @@
Object getConnectionID();
- void removeConsumer(ServerConsumer consumer) throws Exception;
+ void removeConsumer(long consumerID) throws Exception;
void acknowledge(long consumerID, long messageID) throws Exception;
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-01-22 15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-01-26 13:31:50 UTC (rev 8844)
@@ -284,7 +284,7 @@
messageQueue.removeConsumer(this);
}
- session.removeConsumer(this);
+ session.removeConsumer(id);
LinkedList<MessageReference> refs = cancelRefs(false, null);
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-22 15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-26 13:31:50 UTC (rev 8844)
@@ -228,11 +228,11 @@
return remotingConnection.getID();
}
- public void removeConsumer(final ServerConsumer consumer) throws Exception
+ public void removeConsumer(final long consumerID) throws Exception
{
- if (consumers.remove(consumer.getID()) == null)
+ if (consumers.remove(consumerID) == null)
{
- throw new IllegalStateException("Cannot find consumer with id " + consumer.getID() + " to remove");
+ throw new IllegalStateException("Cannot find consumer with id " + consumerID + " to remove");
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-22 15:32:13 UTC (rev 8843)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-26 13:31:50 UTC (rev 8844)
@@ -31,6 +31,7 @@
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -217,7 +218,7 @@
Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
}
- public void _testJMSXGroupIdCanBeSet() throws Exception {
+ public void testJMSXGroupIdCanBeSet() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@@ -242,11 +243,11 @@
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
- // TODO do we support it?
- //Assert.assertEquals("TEST", ((TextMessage) message).getGroupID());
+ // differ from StompConnect
+ Assert.assertEquals("TEST", ((TextMessage) message).getStringProperty("JMSXGroupID"));
}
- public void _testSendMessageWithCustomHeadersAndSelector() throws Exception {
+ public void testSendMessageWithCustomHeadersAndSelector() throws Exception {
MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
@@ -277,7 +278,7 @@
Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
}
- public void _testSendMessageWithStandardHeaders() throws Exception {
+ public void testSendMessageWithStandardHeaders() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@@ -294,6 +295,7 @@
frame =
"SEND\n" +
"correlation-id:c123\n" +
+ "persistent:true\n" +
"priority:3\n" +
"type:t345\n" +
"JMSXGroupID:abc\n" +
@@ -311,6 +313,7 @@
Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
Assert.assertEquals("getJMSType", "t345", message.getJMSType());
Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
+ Assert.assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
@@ -350,9 +353,15 @@
"\n\n" +
Stomp.NULL;
sendFrame(frame);
+
+ // message should not be received as it was auto-acked
+ MessageConsumer consumer = session.createConsumer(queue);
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNull(message);
+
}
- public void _testSubscribeWithAutoAckAndBytesMessage() throws Exception {
+ public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
String frame =
"CONNECT\n" +
@@ -371,7 +380,8 @@
Stomp.NULL;
sendFrame(frame);
- sendBytesMessage(new byte[]{1, 2, 3, 4, 5});
+ byte[] payload = new byte[]{1, 2, 3, 4, 5};
+ sendBytesMessage(payload);
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("MESSAGE"));
@@ -382,7 +392,8 @@
Assert.assertEquals("5", cl_matcher.group(1));
Assert.assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find());
-
+ Assert.assertTrue(frame.indexOf(new String(payload)) > -1);
+
frame =
"DISCONNECT\n" +
"\n\n" +
@@ -390,7 +401,7 @@
sendFrame(frame);
}
- public void _testSubscribeWithMessageSentWithProperties() throws Exception {
+ public void testSubscribeWithMessageSentWithProperties() throws Exception {
String frame =
"CONNECT\n" +
@@ -422,6 +433,8 @@
producer.send(message);
frame = receiveFrame(10000);
+ Assert.assertNotNull(frame);
+ System.out.println(frame);
Assert.assertTrue(frame.startsWith("MESSAGE"));
Assert.assertTrue(frame.indexOf("S:") > 0);
Assert.assertTrue(frame.indexOf("n:") > 0);
@@ -442,7 +455,7 @@
sendFrame(frame);
}
- public void _testMessagesAreInOrder() throws Exception {
+ public void testMessagesAreInOrder() throws Exception {
int ctr = 10;
String[] data = new String[ctr];
@@ -493,7 +506,7 @@
sendFrame(frame);
}
- public void _testSubscribeWithAutoAckAndSelector() throws Exception {
+ public void testSubscribeWithAutoAckAndSelector() throws Exception {
String frame =
"CONNECT\n" +
@@ -527,8 +540,54 @@
sendFrame(frame);
}
- public void _testSubscribeWithClientAck() throws Exception {
+ public void testSubscribeWithClientAck() throws Exception {
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:client\n\n" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ sendMessage(getName());
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Pattern cl = Pattern.compile("message-id:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
+ Matcher cl_matcher = cl.matcher(frame);
+ Assert.assertTrue(cl_matcher.find());
+ String messageID = cl_matcher.group(1);
+
+ frame =
+ "ACK\n" +
+ "message-id: " + messageID + "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ // message should not be received since message was acknowledged by the client
+ MessageConsumer consumer = session.createConsumer(queue);
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testRedeliveryWithClientAck() throws Exception {
+
String frame =
"CONNECT\n" +
"login: brianm\n" +
@@ -546,6 +605,7 @@
Stomp.NULL;
sendFrame(frame);
+
sendMessage(getName());
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("MESSAGE"));
@@ -563,11 +623,11 @@
Assert.assertTrue(message.getJMSRedelivered());
}
- public void _testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception {
+ public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception {
assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
}
- public void _testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws Exception {
+ public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws Exception {
assertSubscribeWithClientAckThenConsumeWithAutoAck(true);
}
@@ -663,7 +723,7 @@
Assert.assertTrue(frame.contains("shouldBeNextMessage"));
}
- public void _testUnsubscribe() throws Exception {
+ public void testUnsubscribe() throws Exception {
String frame =
"CONNECT\n" +
@@ -685,7 +745,7 @@
sendMessage("first message");
//receive message from socket
- frame = receiveFrame(1000);
+ frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("MESSAGE"));
//remove suscription
@@ -711,7 +771,7 @@
}
}
- public void _testTransactionCommit() throws Exception {
+ public void testTransactionCommit() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
String frame =
@@ -757,7 +817,7 @@
Assert.assertNotNull("Should have received a message", message);
}
- public void _testTransactionRollback() throws Exception {
+ public void testTransactionRollback() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
String frame =
14 years, 11 months
JBoss hornetq SVN: r8843 - in trunk: src/main/org/hornetq/api/core and 19 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-22 10:32:13 -0500 (Fri, 22 Jan 2010)
New Revision: 8843
Added:
trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
trunk/src/main/org/hornetq/core/protocol/stomp/
trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompException.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManagerFactory.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
trunk/tests/src/org/hornetq/tests/integration/stomp/
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Modified:
trunk/build-hornetq.xml
trunk/src/main/org/hornetq/api/core/Message.java
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/hornetq/core/server/HornetQServer.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
trunk/src/main/org/hornetq/integration/transports/netty/TransportConstants.java
trunk/src/main/org/hornetq/jms/client/HornetQBytesMessage.java
trunk/src/main/org/hornetq/jms/client/HornetQMapMessage.java
trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
trunk/src/main/org/hornetq/jms/client/HornetQObjectMessage.java
trunk/src/main/org/hornetq/jms/client/HornetQStreamMessage.java
trunk/src/main/org/hornetq/jms/client/HornetQTextMessage.java
trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java
trunk/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
trunk/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* bootstrap of the Stomp protocol implementation
Modified: trunk/build-hornetq.xml
===================================================================
--- trunk/build-hornetq.xml 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/build-hornetq.xml 2010-01-22 15:32:13 UTC (rev 8843)
@@ -1540,23 +1540,20 @@
</target>
<target name="debugServer" depends="jar">
+ <mkdir dir="logs"/>
<java classname="org.hornetq.integration.bootstrap.HornetQBootstrapServer" fork="true">
<jvmarg value="-XX:+UseParallelGC"/>
<jvmarg value="-Xms512M"/>
<jvmarg value="-Xmx2048M"/>
<jvmarg value="-XX:+AggressiveOpts"/>
<jvmarg value="-XX:+UseFastAccessorMethods"/>
- <jvmarg value="-Xdebug"/>
- <jvmarg value="-Xnoagent"/>
- <jvmarg value="-Djava.compiler=NONE"/>
<jvmarg value="-Dcom.sun.management.jmxremote"/>
- <jvmarg value="-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000"/>
- <jvmarg value="-Djava.util.logging.config.file=${src.config.standalone.non-clustered.dir}/logging.properties"/>
+ <jvmarg value="-Djava.util.logging.config.file=${src.config.trunk.non-clustered.dir}/logging.properties"/>
<jvmarg value="-Djava.library.path=${native.bin.dir}"/>
- <jvmarg value="-Djava.naming.factory.initial=org.jnp.interfaces.NamingContextFactory"/>
- <jvmarg value="-Djava.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces"/>
+ <jvmarg line="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000"/>
<arg line="hornetq-beans.xml"/>
- <classpath path="${src.config.standalone.non-clustered.dir}" />
+ <classpath path="${src.config.trunk.non-clustered.dir}" />
+ <classpath refid="jms.standalone.server.classpath"/>
</java>
</target>
Modified: trunk/src/main/org/hornetq/api/core/Message.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/Message.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/api/core/Message.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -70,6 +70,18 @@
public static final SimpleString HDR_LAST_VALUE_NAME = new SimpleString("_HQ_LVQ_NAME");
+ public static final byte DEFAULT_TYPE = 0;
+
+ public static final byte OBJECT_TYPE = 2;
+
+ public static final byte TEXT_TYPE = 3;
+
+ public static final byte BYTES_TYPE = 4;
+
+ public static final byte MAP_TYPE = 5;
+
+ public static final byte STREAM_TYPE = 6;
+
/**
* Returns the messageID.
* <br>
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -252,6 +252,12 @@
return type;
}
+ public void setType(byte type)
+ {
+ this.type = type;
+ }
+
+
public boolean isDurable()
{
return durable;
Modified: trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -56,6 +56,10 @@
return new ConnectionEntry(conn, 0, 0);
}
+
+ public void removeHandler(String name)
+ {
+ }
public void handleBuffer(final RemotingConnection conn, final HornetQBuffer buffer)
{
@@ -69,10 +73,9 @@
true,
true,
true,
- false);
+ false,
+ new AardvarkSessionCallback(conn.getTransportConnection()));
- session.setCallback(new AardvarkSessionCallback(conn.getTransportConnection()));
-
final SimpleString name = new SimpleString("hornetq.aardvark");
session.createQueue(name, name, null, false, false);
Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -50,13 +50,11 @@
import javax.transaction.xa.Xid;
import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.SimpleString;
import org.hornetq.core.exception.HornetQXAException;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.protocol.core.impl.CoreProtocolManager;
import org.hornetq.core.protocol.core.impl.PacketImpl;
import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
@@ -71,12 +69,8 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionDeleteQueueMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionExpiredMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionForceConsumerDelivery;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
@@ -100,7 +94,6 @@
import org.hornetq.core.server.QueueQueryResult;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
-import org.hornetq.spi.core.protocol.SessionCallback;
/**
* A ServerSessionPacketHandler
@@ -110,12 +103,10 @@
* @author <a href="mailto:andy.taylor@jboss.org>Andy Taylor</a>
* @author <a href="mailto:clebert.suconic@jboss.org>Clebert Suconic</a>
*/
-public class ServerSessionPacketHandler implements ChannelHandler, CloseListener, FailureListener, SessionCallback
+public class ServerSessionPacketHandler implements ChannelHandler, CloseListener, FailureListener
{
private static final Logger log = Logger.getLogger(ServerSessionPacketHandler.class);
- private final CoreProtocolManager protocolManager;
-
private final ServerSession session;
private final OperationContext sessionContext;
@@ -127,14 +118,11 @@
private volatile CoreRemotingConnection remotingConnection;
- public ServerSessionPacketHandler(final CoreProtocolManager protocolManager,
- final ServerSession session,
+ public ServerSessionPacketHandler(final ServerSession session,
final OperationContext sessionContext,
final StorageManager storageManager,
final Channel channel)
{
- this.protocolManager = protocolManager;
-
this.session = session;
this.storageManager = storageManager;
@@ -567,46 +555,7 @@
channel.close();
}
}
-
- public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
- {
- Packet packet = new SessionReceiveLargeMessage(consumerID, headerBuffer, bodySize, deliveryCount);
-
- channel.send(packet);
-
- return packet.getPacketSize();
- }
-
- public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse)
- {
- Packet packet = new SessionReceiveContinuationMessage(consumerID, body, continues, requiresResponse);
-
- channel.send(packet);
-
- return packet.getPacketSize();
- }
-
- public int sendMessage(ServerMessage message, long consumerID, int deliveryCount)
- {
- Packet packet = new SessionReceiveMessage(consumerID, message, deliveryCount);
-
- channel.send(packet);
-
- return packet.getPacketSize();
- }
-
- public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
- {
- Packet packet = new SessionProducerCreditsMessage(credits, address, offset);
-
- channel.send(packet);
- }
- public void closed()
- {
- protocolManager.removeHandler(session.getName());
- }
-
public int transferConnection(final CoreRemotingConnection newConnection, final int lastReceivedCommandID)
{
// We need to disable delivery on all the consumers while the transfer is occurring- otherwise packets might get
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -30,6 +30,7 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.spi.core.protocol.ConnectionEntry;
import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.remoting.Connection;
/**
@@ -120,7 +121,7 @@
sessionHandlers.remove(name);
}
- public void bufferReceived(Object connectionID, HornetQBuffer buffer)
+ public void handleBuffer(RemotingConnection connection, HornetQBuffer buffer)
{
}
Added: trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.core.impl;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.SessionCallback;
+
+/**
+ * A {@link SessionCallback} for the core protocol.
+ * <p>
+ * Each callback builds the matching core wire-format packet and sends it on the
+ * {@link Channel} given at construction time.
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public final class CoreSessionCallback implements SessionCallback
+{
+   private final Channel channel;
+
+   // Assigned once in the constructor; final so the references can never change afterwards.
+   private final ProtocolManager protocolManager;
+
+   private final String name;
+
+   /**
+    * @param name the name passed to {@code ProtocolManager.removeHandler} when {@link #closed()} is called
+    * @param protocolManager the protocol manager notified by {@link #closed()}
+    * @param channel the channel on which every packet is sent
+    */
+   public CoreSessionCallback(String name, ProtocolManager protocolManager, Channel channel)
+   {
+      this.name = name;
+      this.protocolManager = protocolManager;
+      this.channel = channel;
+   }
+
+   /**
+    * Sends a {@link SessionReceiveLargeMessage} packet on the channel.
+    *
+    * @return the size of the packet, as reported by {@code Packet.getPacketSize()} after sending
+    */
+   public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
+   {
+      Packet packet = new SessionReceiveLargeMessage(consumerID, headerBuffer, bodySize, deliveryCount);
+
+      channel.send(packet);
+
+      return packet.getPacketSize();
+   }
+
+   /**
+    * Sends a {@link SessionReceiveContinuationMessage} packet on the channel.
+    *
+    * @return the size of the packet, as reported by {@code Packet.getPacketSize()} after sending
+    */
+   public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse)
+   {
+      Packet packet = new SessionReceiveContinuationMessage(consumerID, body, continues, requiresResponse);
+
+      channel.send(packet);
+
+      return packet.getPacketSize();
+   }
+
+   /**
+    * Sends a {@link SessionReceiveMessage} packet on the channel.
+    *
+    * @return the size of the packet, as reported by {@code Packet.getPacketSize()} after sending
+    */
+   public int sendMessage(ServerMessage message, long consumerID, int deliveryCount)
+   {
+      Packet packet = new SessionReceiveMessage(consumerID, message, deliveryCount);
+
+      channel.send(packet);
+
+      return packet.getPacketSize();
+   }
+
+   /**
+    * Sends a {@link SessionProducerCreditsMessage} packet on the channel.
+    */
+   public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
+   {
+      Packet packet = new SessionProducerCreditsMessage(credits, address, offset);
+
+      channel.send(packet);
+   }
+
+   /**
+    * Asks the protocol manager to remove the handler registered under this callback's name.
+    */
+   public void closed()
+   {
+      protocolManager.removeHandler(name);
+   }
+}
\ No newline at end of file
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -163,18 +163,15 @@
request.isAutoCommitSends(),
request.isAutoCommitAcks(),
request.isPreAcknowledge(),
- request.isXA());
+ request.isXA(),
+ new CoreSessionCallback(request.getName(), protocolManager, channel));
- ServerSessionPacketHandler handler = new ServerSessionPacketHandler(protocolManager,
- session,
+ ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session,
server.getStorageManager()
.newContext(server.getExecutorFactory()
.getExecutor()),
server.getStorageManager(),
channel);
-
- session.setCallback(handler);
-
channel.setHandler(handler);
// TODO - where is this removed?
Added: trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,124 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+
+/**
+ * The standard verbs and headers used for the <a href="http://stomp.codehaus.org/">STOMP</a> protocol.
+ * <p>
+ * All members are string constants. The misspelled names {@code SEPERATOR} and {@code PRORITY}
+ * are kept for existing callers; correctly spelled aliases with the same values are provided
+ * next to them.
+ *
+ * @version $Revision: 57 $
+ */
+public interface Stomp {
+    /** The NUL character, as a one-character string. */
+    String NULL = "\u0000";
+    String NEWLINE = "\n";
+
+    /** Frame commands. */
+    public interface Commands {
+        String CONNECT = "CONNECT";
+        String SEND = "SEND";
+        String DISCONNECT = "DISCONNECT";
+        String SUBSCRIBE = "SUBSCRIBE";
+        String UNSUBSCRIBE = "UNSUBSCRIBE";
+        // The *_TRANSACTION names hold the same values as BEGIN, COMMIT and ABORT below.
+        String BEGIN_TRANSACTION = "BEGIN";
+        String COMMIT_TRANSACTION = "COMMIT";
+        String ABORT_TRANSACTION = "ABORT";
+        String BEGIN = "BEGIN";
+        String COMMIT = "COMMIT";
+        String ABORT = "ABORT";
+        String ACK = "ACK";
+    }
+
+    /** Frame responses. */
+    public interface Responses {
+        String CONNECTED = "CONNECTED";
+        String ERROR = "ERROR";
+        String MESSAGE = "MESSAGE";
+        String RECEIPT = "RECEIPT";
+    }
+
+    /** Header names, grouped by the frame they belong to. */
+    public interface Headers {
+        String SEPERATOR = ":";
+        /** Correctly spelled alias of {@link #SEPERATOR}. */
+        String SEPARATOR = SEPERATOR;
+        String RECEIPT_REQUESTED = "receipt";
+        String TRANSACTION = "transaction";
+        String CONTENT_LENGTH = "content-length";
+
+        public interface Response {
+            String RECEIPT_ID = "receipt-id";
+        }
+
+        public interface Send {
+            String DESTINATION = "destination";
+            String CORRELATION_ID = "correlation-id";
+            String REPLY_TO = "reply-to";
+            String EXPIRATION_TIME = "expires";
+            String PRIORITY = "priority";
+            String TYPE = "type";
+            // Declared as String like every other header name (it used to be typed Object).
+            String PERSISTENT = "persistent";
+        }
+
+        public interface Message {
+            String MESSAGE_ID = "message-id";
+            String DESTINATION = "destination";
+            String CORRELATION_ID = "correlation-id";
+            String EXPIRATION_TIME = "expires";
+            String REPLY_TO = "reply-to";
+            String PRORITY = "priority";
+            /** Correctly spelled alias of {@link #PRORITY}. */
+            String PRIORITY = PRORITY;
+            String REDELIVERED = "redelivered";
+            String TIMESTAMP = "timestamp";
+            String TYPE = "type";
+            String SUBSCRIPTION = "subscription";
+        }
+
+        public interface Subscribe {
+            String DESTINATION = "destination";
+            String ACK_MODE = "ack";
+            String ID = "id";
+            String SELECTOR = "selector";
+            String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
+            String NO_LOCAL = "no-local";
+
+            /** Values accepted for the {@link Subscribe#ACK_MODE} header. */
+            public interface AckModeValues {
+                String AUTO = "auto";
+                String CLIENT = "client";
+            }
+        }
+
+        public interface Unsubscribe {
+            String DESTINATION = "destination";
+            String ID = "id";
+        }
+
+        public interface Connect {
+            String LOGIN = "login";
+            String PASSCODE = "passcode";
+            String CLIENT_ID = "client-id";
+            String REQUEST_ID = "request-id";
+        }
+
+        public interface Error {
+            String MESSAGE = "message";
+        }
+
+        public interface Connected {
+            String SESSION = "session";
+            String RESPONSE_ID = "response-id";
+        }
+
+        public interface Ack {
+            String MESSAGE_ID = "message-id";
+        }
+    }
+}
Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Connection;
+
+/**
+ * {@link RemotingConnection} implementation for Stomp connections.
+ * <p>
+ * This is only a skeleton: the listener, failure and life-cycle methods have empty bodies
+ * or return constants. The methods that do real work are the accessors delegating to the
+ * transport {@link Connection}, and {@link #bufferReceived(Object, HornetQBuffer)}, which
+ * hands incoming data to the {@link ProtocolManager}.
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public class StompConnection implements RemotingConnection
+{
+   private static final Logger log = Logger.getLogger(StompConnection.class);
+
+   private final ProtocolManager manager;
+
+   private final Connection transportConnection;
+
+   /**
+    * @param transportConnection the transport connection the accessors delegate to
+    * @param manager the protocol manager that receives every incoming buffer
+    */
+   StompConnection(final Connection transportConnection, final ProtocolManager manager)
+   {
+      this.transportConnection = transportConnection;
+
+      this.manager = manager;
+   }
+
+   /** Not implemented: the listener is ignored. */
+   public void addCloseListener(CloseListener listener)
+   {
+   }
+
+   /** Not implemented: the listener is ignored. */
+   public void addFailureListener(FailureListener listener)
+   {
+   }
+
+   /** @return always {@code true} */
+   public boolean checkDataReceived()
+   {
+      return true;
+   }
+
+   /** @return a new dynamic buffer created with the given size */
+   public HornetQBuffer createBuffer(int size)
+   {
+      return HornetQBuffers.dynamicBuffer(size);
+   }
+
+   /** Not implemented: does nothing. */
+   public void destroy()
+   {
+   }
+
+   /** Not implemented: does nothing. */
+   public void disconnect()
+   {
+   }
+
+   /** Not implemented: the exception is ignored. */
+   public void fail(HornetQException me)
+   {
+   }
+
+   /** Not implemented: does nothing. */
+   public void flush()
+   {
+   }
+
+   /** @return always an empty, immutable list, since listeners are not tracked */
+   public List<FailureListener> getFailureListeners()
+   {
+      // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
+      return Collections.emptyList();
+   }
+
+   /** @return the ID of the underlying transport connection */
+   public Object getID()
+   {
+      return transportConnection.getID();
+   }
+
+   /** @return the remote address of the underlying transport connection */
+   public String getRemoteAddress()
+   {
+      return transportConnection.getRemoteAddress();
+   }
+
+   public Connection getTransportConnection()
+   {
+      return transportConnection;
+   }
+
+   /** @return always {@code false} */
+   public boolean isClient()
+   {
+      return false;
+   }
+
+   /** @return always {@code false}, since {@link #destroy()} does nothing */
+   public boolean isDestroyed()
+   {
+      return false;
+   }
+
+   /** Not implemented. @return always {@code false} */
+   public boolean removeCloseListener(CloseListener listener)
+   {
+      return false;
+   }
+
+   /** Not implemented. @return always {@code false} */
+   public boolean removeFailureListener(FailureListener listener)
+   {
+      return false;
+   }
+
+   /** Not implemented: the listeners are ignored. */
+   public void setFailureListeners(List<FailureListener> listeners)
+   {
+   }
+
+
+   /**
+    * Passes the buffer, together with this connection, to the protocol manager.
+    * The {@code connectionID} argument is not used.
+    */
+   public void bufferReceived(Object connectionID, HornetQBuffer buffer)
+   {
+      manager.handleBuffer(this, buffer);
+   }
+
+   /**
+    * @return always {@code -1}
+    */
+   // NOTE(review): what -1 means to the caller is not visible here - confirm against the interface contract.
+   public int isReadyToHandle(HornetQBuffer buffer)
+   {
+      return -1;
+   }
+
+}
Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompException.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompException.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompException.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.io.IOException;
+
+/**
+ * An {@link IOException} raised while handling Stomp data, carrying a flag that marks
+ * the error as fatal or not.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+class StompException extends IOException
+{
+   private static final long serialVersionUID = -2869735532997332242L;
+
+   private final boolean fatal;
+
+   /** Creates a non-fatal exception with no message and no cause. */
+   public StompException()
+   {
+      this(null, false, null);
+   }
+
+   /** Creates a non-fatal exception with the given message and no cause. */
+   public StompException(String s)
+   {
+      this(s, false, null);
+   }
+
+   /** Creates an exception with the given message and fatal flag, and no cause. */
+   public StompException(String s, boolean fatal)
+   {
+      this(s, fatal, null);
+   }
+
+   /**
+    * @param s the detail message, may be {@code null}
+    * @param fatal the value later returned by {@link #isFatal()}
+    * @param cause the cause, may be {@code null}
+    */
+   public StompException(String s, boolean fatal, Throwable cause)
+   {
+      super(s);
+      initCause(cause);
+      this.fatal = fatal;
+   }
+
+   /** @return the fatal flag given at construction time */
+   public boolean isFatal()
+   {
+      return fatal;
+   }
+}
Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,70 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Represents all the data in a STOMP frame: a command, a map of headers and a content body.
+ * <p>
+ * The headers and the content are never {@code null}: a {@code null} passed to the
+ * constructor is replaced by an empty map or an empty array.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+class StompFrame
+{
+   private static final byte[] NO_DATA = new byte[] {};
+
+   private final String command;
+
+   private final Map<String, Object> headers;
+
+   private final byte[] content;
+
+   /** Creates a frame with no command, no headers and an empty content. */
+   public StompFrame()
+   {
+      this(null, null, null);
+   }
+
+   /**
+    * @param command the frame command, may be {@code null}
+    * @param headers the frame headers, stored by reference; {@code null} means no headers
+    * @param data the frame content, stored by reference; {@code null} means empty content
+    */
+   public StompFrame(String command, Map<String, Object> headers, byte[] data)
+   {
+      this.command = command;
+      // Normalise nulls so that toString() and users of the getters never hit a NullPointerException.
+      this.headers = headers != null ? headers : new HashMap<String, Object>();
+      this.content = data != null ? data : StompFrame.NO_DATA;
+   }
+
+   public String getCommand()
+   {
+      return command;
+   }
+
+   /** @return the content, never {@code null} */
+   public byte[] getContent()
+   {
+      return content;
+   }
+
+   /** @return the headers, never {@code null} */
+   public Map<String, Object> getHeaders()
+   {
+      return headers;
+   }
+
+   @Override
+   public String toString()
+   {
+      return "StompFrame[command=" + command + ", headers=" + headers + ",content-length=" + content.length + "]";
+   }
+}
Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.Delimiters;
+
+/**
+ * A {@link DelimiterBasedFrameDecoder} configured with the NUL delimiter and a
+ * maximum frame length of 100 MiB.
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public class StompFrameDelimiter extends DelimiterBasedFrameDecoder
+{
+
+   /** Largest frame accepted, in bytes (100 MiB). */
+   private static final int MAX_FRAME_SIZE = 100 * 1024 * 1024;
+
+   public StompFrameDelimiter()
+   {
+      // The second argument is Netty's stripDelimiter flag: false keeps the delimiter in the decoded frame.
+      super(MAX_FRAME_SIZE, false, Delimiters.nulDelimiter());
+   }
+}
Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,35 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+/**
+ * A {@link StompFrame} standing in for an invalid Stomp frame: it carries the
+ * {@link StompException} describing the problem instead of frame data.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+class StompFrameError extends StompFrame
+{
+   private final StompException exception;
+
+   /**
+    * @param exception the error to report for the invalid frame
+    */
+   public StompFrameError(StompException exception)
+   {
+      this.exception = exception;
+   }
+
+   /** @return the exception given at construction time */
+   public StompException getException()
+   {
+      return exception;
+   }
+}
Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,191 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+
+/**
+ * Implements marshalling and unmarshalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
+ * <p>
+ * A frame is written as: the command line, one {@code name:value} line per header, an empty
+ * line, the content bytes and finally {@link #END_OF_FRAME}. Text is encoded as UTF-8.
+ */
+class StompMarshaller {
+    private static final byte[] NO_DATA = new byte[]{};
+    /** Frame terminator written by {@code marshal}: a NUL byte followed by a newline. */
+    private static final byte[] END_OF_FRAME = new byte[]{0, '\n'};
+    private static final int MAX_COMMAND_LENGTH = 1024;
+    private static final int MAX_HEADER_LENGTH = 1024 * 10;
+    private static final int MAX_HEADERS = 1000;
+    private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
+    /** Only exposed through the getter and setter; not consulted by the marshalling code in this class. */
+    private int version = 1;
+
+    public int getVersion() {
+        return version;
+    }
+
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+    /**
+     * Marshals a frame into a new byte array.
+     *
+     * @param command the frame to marshal
+     * @return the bytes written by {@link #marshal(StompFrame, DataOutput)}
+     * @throws IOException if writing fails
+     */
+    public byte[] marshal(StompFrame command) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        marshal(command, dos);
+        dos.close();
+        return baos.toByteArray();
+    }
+
+    /**
+     * Writes a frame to the given output.
+     *
+     * @param stomp the frame to write
+     * @param os the output receiving the bytes
+     * @throws IOException if writing fails
+     */
+    public void marshal(StompFrame stomp, DataOutput os) throws IOException {
+        StringBuilder buffer = new StringBuilder();
+        buffer.append(stomp.getCommand());
+        buffer.append(Stomp.NEWLINE);
+
+        // Output the headers.
+        for (Map.Entry<String, Object> entry : stomp.getHeaders().entrySet()) {
+            buffer.append(entry.getKey());
+            buffer.append(Stomp.Headers.SEPERATOR);
+            buffer.append(entry.getValue());
+            buffer.append(Stomp.NEWLINE);
+        }
+
+        // Add a newline to separate the headers from the content.
+        buffer.append(Stomp.NEWLINE);
+
+        os.write(buffer.toString().getBytes("UTF-8"));
+        os.write(stomp.getContent());
+        os.write(END_OF_FRAME);
+    }
+
+    /**
+     * Reads one frame from the buffer: the command, then the headers, then the content.
+     *
+     * @param in the buffer to read from
+     * @return the parsed frame, or a {@link StompFrameError} wrapping the {@link StompException}
+     *         raised while parsing
+     * @throws IOException for errors other than a {@link StompException}
+     */
+    public StompFrame unmarshal(HornetQBuffer in) throws IOException {
+
+        try {
+            String action = readCommand(in);
+            Map<String, Object> headers = readHeaders(in);
+            byte[] data = readContent(in, headers);
+
+            return new StompFrame(action, headers, data);
+        }
+        catch (StompException e) {
+            return new StompFrameError(e);
+        }
+    }
+
+    /** Skips blank lines and returns the first non-blank line, trimmed. */
+    private String readCommand(HornetQBuffer in) throws IOException {
+        while (true) {
+            String action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
+            if (action == null) {
+                throw new IOException("connection was closed");
+            }
+            action = action.trim();
+            if (action.length() > 0) {
+                return action;
+            }
+        }
+    }
+
+    /** Reads {@code name:value} lines up to the first blank line; names and values are trimmed. */
+    private Map<String, Object> readHeaders(HornetQBuffer in) throws IOException {
+        Map<String, Object> headers = new HashMap<String, Object>(25);
+        while (true) {
+            String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
+            if (line == null || line.trim().length() == 0) {
+                return headers;
+            }
+
+            // Checked before the header is stored, so up to MAX_HEADERS + 1 headers are accepted.
+            if (headers.size() > MAX_HEADERS) {
+                throw new StompException("The maximum number of headers was exceeded", true);
+            }
+
+            // Reject a line without separator explicitly instead of relying on substring() failing.
+            int separatorIndex = line.indexOf(Stomp.Headers.SEPERATOR);
+            if (separatorIndex < 0) {
+                throw new StompException("Unable to parser header line [" + line + "]", true);
+            }
+            String name = line.substring(0, separatorIndex).trim();
+            String value = line.substring(separatorIndex + 1).trim();
+            headers.put(name, value);
+        }
+    }
+
+    /**
+     * Reads the content part. With a content-length header exactly that many bytes are read and
+     * must be followed by a NUL byte; without it, bytes are read up to the first NUL byte or
+     * the end of the readable bytes.
+     */
+    private byte[] readContent(HornetQBuffer in, Map<String, Object> headers) throws IOException {
+        String contentLength = (String) headers.get(Stomp.Headers.CONTENT_LENGTH);
+        if (contentLength != null) {
+
+            // Bless the client, he's telling us how much data to read in.
+            int length;
+            try {
+                length = Integer.parseInt(contentLength.trim());
+            }
+            catch (NumberFormatException e) {
+                throw new StompException("Specified content-length is not a valid integer", true);
+            }
+
+            // A negative length would otherwise escape as NegativeArraySizeException below.
+            if (length < 0) {
+                throw new StompException("Specified content-length must not be negative", true);
+            }
+
+            if (length > MAX_DATA_LENGTH) {
+                throw new StompException("The maximum data length was exceeded", true);
+            }
+
+            byte[] data = new byte[length];
+            in.readBytes(data);
+
+            if (in.readByte() != 0) {
+                throw new StompException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " + "there was no trailing null byte", true);
+            }
+            return data;
+        }
+
+        // We don't know how much to read.. data ends when we hit a 0
+        byte b;
+        ByteArrayOutputStream baos = null;
+        while (in.readableBytes() > 0 && (b = in.readByte()) != 0) {
+
+            if (baos == null) {
+                baos = new ByteArrayOutputStream();
+            }
+            else if (baos.size() > MAX_DATA_LENGTH) {
+                throw new StompException("The maximum data length was exceeded", true);
+            }
+
+            baos.write(b);
+        }
+
+        return baos == null ? NO_DATA : baos.toByteArray();
+    }
+
+    /**
+     * Reads bytes up to, and excluding, the next {@code '\n'} and decodes them as UTF-8.
+     *
+     * @param in the buffer to read from
+     * @param maxLength the length limit; it is checked before each byte is appended, so a line of
+     *        up to {@code maxLength + 1} bytes is accepted
+     * @param errorMessage the message of the {@link StompException} thrown when the limit is exceeded
+     * @return the line without its terminating newline; this method never returns {@code null}
+     * @throws IOException a fatal {@link StompException} when the line is too long
+     */
+    // NOTE(review): callers test the result for null, but running out of bytes before a newline
+    // is left to in.readByte() - presumably it throws; confirm against HornetQBuffer.
+    protected String readLine(HornetQBuffer in, int maxLength, String errorMessage) throws IOException {
+        byte b;
+        // Default capacity: avoids allocating maxLength bytes up front for every (usually short) line.
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        while ((b = in.readByte()) != '\n') {
+            if (baos.size() > maxLength) {
+                throw new StompException(errorMessage, true);
+            }
+            baos.write(b);
+        }
+        byte[] sequence = baos.toByteArray();
+        return new String(sequence, "UTF-8");
+    }
+}
Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,295 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.spi.core.protocol.ConnectionEntry;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Connection;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * Protocol manager for Stomp connections.
+ * <p>
+ * Each incoming buffer is unmarshalled into a {@link StompFrame}, dispatched on its command
+ * (CONNECT, DISCONNECT, SEND or SUBSCRIBE) and answered with the response frame produced by the
+ * handler, if any. One {@link ServerSession} is tracked per {@link RemotingConnection}; it is
+ * created on CONNECT and removed on DISCONNECT.
+ *
+ * @author Tim Fox
+ */
+public class StompProtocolManager implements ProtocolManager
+{
+   private static final Logger log = Logger.getLogger(StompProtocolManager.class);
+
+   /** Charset used for every text payload exchanged with Stomp clients. */
+   private static final String UTF_8 = "UTF-8";
+
+   private final HornetQServer server;
+
+   private final StompMarshaller marshaller;
+
+   // NOTE(review): plain HashMap; if handleBuffer can run concurrently for different
+   // connections this map needs synchronization -- confirm against the transport threading model.
+   private final Map<RemotingConnection, ServerSession> sessions = new HashMap<RemotingConnection, ServerSession>();
+
+   /**
+    * @param server the server on which sessions are created
+    * @param interceptors currently unused by this protocol manager
+    */
+   public StompProtocolManager(final HornetQServer server, final List<Interceptor> interceptors)
+   {
+      this.server = server;
+      this.marshaller = new StompMarshaller();
+   }
+
+   /**
+    * Wraps the transport connection in a {@link StompConnection}.
+    *
+    * @param connection the transport connection that was just accepted
+    * @return a connection entry holding the new Stomp connection
+    */
+   public ConnectionEntry createConnectionEntry(final Connection connection)
+   {
+      StompConnection conn = new StompConnection(connection, this);
+
+      return new ConnectionEntry(conn, 0, 0);
+   }
+
+   /** No-op: this protocol manager keeps no named handlers. */
+   public void removeHandler(String name)
+   {
+   }
+
+   /**
+    * Decodes one Stomp frame from the buffer, dispatches it and sends back the response, if any.
+    * <p>
+    * A {@link StompException} raised while handling the frame is reported to the client as an
+    * ERROR frame; any other exception is only logged.
+    *
+    * @param connection the connection the buffer was received on
+    * @param buffer the buffer holding the frame
+    */
+   public void handleBuffer(RemotingConnection connection, HornetQBuffer buffer)
+   {
+      StompFrame frame = null;
+      try
+      {
+         frame = marshaller.unmarshal(buffer);
+         log.debug("RECEIVED " + frame);
+
+         String command = frame.getCommand();
+
+         StompFrame response = null;
+         if (Stomp.Commands.CONNECT.equals(command))
+         {
+            response = onConnect(frame, server, connection);
+         }
+         else if (Stomp.Commands.DISCONNECT.equals(command))
+         {
+            response = onDisconnect(frame, server, connection);
+         }
+         else if (Stomp.Commands.SEND.equals(command))
+         {
+            response = onSend(frame, server, connection);
+         }
+         else if (Stomp.Commands.SUBSCRIBE.equals(command))
+         {
+            response = onSubscribe(frame, server, connection);
+         }
+         else
+         {
+            log.error("Unsupported Stomp frame: " + frame);
+            response = new StompFrame(Stomp.Responses.ERROR,
+                                      new HashMap<String, Object>(),
+                                      ("Unsupported frame: " + command).getBytes(UTF_8));
+         }
+
+         if (response != null)
+         {
+            send(connection, response);
+         }
+      }
+      catch (StompException ex)
+      {
+         sendError(connection, frame, ex);
+      }
+      catch (Exception ex)
+      {
+         log.error("Failed to handle Stomp frame " + frame, ex);
+      }
+   }
+
+   /**
+    * Reports a protocol error to the client as an ERROR frame whose body is the stack trace.
+    *
+    * @param connection the connection to write the ERROR frame to
+    * @param frame the frame being handled, or <code>null</code> if it could not be unmarshalled
+    * @param ex the protocol error to report
+    */
+   private void sendError(RemotingConnection connection, StompFrame frame, StompException ex)
+   {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      try
+      {
+         // Let the stomp client know about any protocol errors.
+         PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, UTF_8));
+         ex.printStackTrace(stream);
+         stream.close();
+      }
+      catch (UnsupportedEncodingException e)
+      {
+         log.error("Unable to encode the body of the Stomp error frame", e);
+      }
+
+      Map<String, Object> headers = new HashMap<String, Object>();
+      headers.put(Stomp.Headers.Error.MESSAGE, ex.getMessage());
+
+      // The frame is still null when the failure happened before it was unmarshalled;
+      // in that case there is no receipt header to echo back.
+      if (frame != null && frame.getHeaders() != null)
+      {
+         final String receiptId = (String)frame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+         if (receiptId != null)
+         {
+            headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+         }
+      }
+
+      StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
+      try
+      {
+         send(connection, errorMessage);
+      }
+      catch (IOException e)
+      {
+         log.error("Unable to send Stomp error frame " + errorMessage, e);
+      }
+   }
+
+   /**
+    * Handles SUBSCRIBE: creates a consumer on the destination queue, grants it credits
+    * (<code>-1</code>) and starts the session.
+    *
+    * @return always <code>null</code>, no response frame is sent
+    * @throws StompException if the connection has no session yet
+    */
+   private StompFrame onSubscribe(StompFrame frame, HornetQServer server, RemotingConnection connection) throws Exception
+   {
+      Map<String, Object> headers = frame.getHeaders();
+      String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
+      SimpleString queueName = SimpleString.toSimpleString(StompUtils.toHornetQAddress(queue));
+
+      ServerSession session = checkAndGetSession(connection);
+      long consumerID = server.getStorageManager().generateUniqueID();
+      session.createConsumer(consumerID, queueName, null, false);
+      session.receiveConsumerCredits(consumerID, -1);
+      session.start();
+
+      return null;
+   }
+
+   /**
+    * Looks up the session bound to the connection.
+    *
+    * @return the session, never <code>null</code>
+    * @throws StompException if no session is registered for the connection
+    */
+   private ServerSession checkAndGetSession(RemotingConnection connection) throws StompException
+   {
+      ServerSession session = sessions.get(connection);
+      if (session == null)
+      {
+         throw new StompException("Not connected");
+      }
+      return session;
+   }
+
+   /**
+    * Handles DISCONNECT: closes the session bound to the connection and forgets it.
+    *
+    * @return always <code>null</code>, no response frame is sent
+    * @throws StompException if the connection has no session or the session fails to close
+    */
+   private StompFrame onDisconnect(StompFrame frame, HornetQServer server, RemotingConnection connection) throws StompException
+   {
+      ServerSession session = checkAndGetSession(connection);
+      try
+      {
+         session.close();
+      }
+      catch (Exception e)
+      {
+         // Only the message travels with the StompException, so keep the cause in the log.
+         log.error("Failed to close the session on DISCONNECT", e);
+         throw new StompException(e.getMessage());
+      }
+      finally
+      {
+         // Forget the session even when close() failed, otherwise it stays in the map.
+         sessions.remove(connection);
+      }
+      return null;
+   }
+
+   /**
+    * Handles SEND: builds a server message from the frame and sends it through the session.
+    * <p>
+    * A frame carrying a content-length header becomes a bytes message; any other frame becomes
+    * a text message whose body is the frame content decoded as UTF-8.
+    *
+    * @return a RECEIPT frame if the client requested one, <code>null</code> otherwise
+    * @throws StompException if the connection has no session yet
+    */
+   private StompFrame onSend(StompFrame frame, HornetQServer server, RemotingConnection connection) throws Exception
+   {
+      ServerSession session = checkAndGetSession(connection);
+
+      Map<String, Object> headers = frame.getHeaders();
+      String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
+      // TODO honour the type, expiration time, priority and persistent headers of the frame
+      byte type = Message.TEXT_TYPE;
+      if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH))
+      {
+         type = Message.BYTES_TYPE;
+      }
+      long timestamp = System.currentTimeMillis();
+      SimpleString address = SimpleString.toSimpleString(StompUtils.toHornetQAddress(queue));
+
+      ServerMessageImpl message = new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
+      message.setType(type);
+      message.setTimestamp(timestamp);
+      message.setAddress(address);
+      byte[] content = frame.getContent();
+      if (type == Message.TEXT_TYPE)
+      {
+         // Decode explicitly as UTF-8 rather than with the platform default charset.
+         message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(new String(content, UTF_8)));
+      }
+      else
+      {
+         message.getBodyBuffer().writeBytes(content);
+      }
+
+      session.send(message);
+      if (headers.containsKey(Stomp.Headers.RECEIPT_REQUESTED))
+      {
+         Map<String, Object> h = new HashMap<String, Object>();
+         h.put(Stomp.Headers.Response.RECEIPT_ID, headers.get(Stomp.Headers.RECEIPT_REQUESTED));
+         return new StompFrame(Stomp.Responses.RECEIPT, h, new byte[] {});
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+   /**
+    * Handles CONNECT: creates a server session with the given login and passcode and binds it
+    * to the connection.
+    *
+    * @return the CONNECTED frame carrying the generated session name and, when the client sent
+    *         a request id, the matching response id
+    */
+   private StompFrame onConnect(StompFrame frame, HornetQServer server, final RemotingConnection connection) throws Exception
+   {
+      Map<String, Object> headers = frame.getHeaders();
+      String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
+      String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
+      String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
+
+      String name = UUIDGenerator.getInstance().generateStringUUID();
+      ServerSession session = server.createSession(name,
+                                                   login,
+                                                   passcode,
+                                                   HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                                   connection,
+                                                   true,
+                                                   true,
+                                                   false,
+                                                   false,
+                                                   new StompSessionCallback(marshaller, connection));
+      sessions.put(connection, session);
+      log.debug(">>> created session " + session);
+      HashMap<String, Object> h = new HashMap<String, Object>();
+      h.put(Stomp.Headers.Connected.SESSION, name);
+      // Only echo a response id when the client actually supplied a request id.
+      if (requestID != null)
+      {
+         h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
+      }
+      return new StompFrame(Stomp.Responses.CONNECTED, h, new byte[] {});
+   }
+
+   /**
+    * Marshals the frame and writes it on the transport connection.
+    *
+    * @throws IOException if the frame cannot be marshalled
+    */
+   private void send(RemotingConnection connection, StompFrame frame) throws IOException
+   {
+      log.debug("SENDING >>> " + frame);
+      byte[] bytes = marshaller.marshal(frame);
+      HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+      log.debug("ready to send reply: " + buffer);
+      connection.getTransportConnection().write(buffer, true);
+   }
+
+   /** Always returns <code>-1</code>. */
+   public int isReadyToHandle(HornetQBuffer buffer)
+   {
+      return -1;
+   }
+}
Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManagerFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManagerFactory.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManagerFactory.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+import java.util.List;
+
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.ProtocolManagerFactory;
+
+/**
+ * Factory that builds the {@link ProtocolManager} used for Stomp connections.
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public class StompProtocolManagerFactory implements ProtocolManagerFactory
+{
+
+   /**
+    * Creates a new {@link StompProtocolManager}.
+    *
+    * @param server the server handed to the protocol manager
+    * @param interceptors the interceptors handed to the protocol manager
+    * @return a new Stomp protocol manager
+    */
+   public ProtocolManager createProtocolManager(final HornetQServer server, final List<Interceptor> interceptors)
+   {
+      final ProtocolManager stompManager = new StompProtocolManager(server, interceptors);
+      return stompManager;
+   }
+
+}
Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.protocol.SessionCallback;
+
+/**
+ * Session callback that delivers server messages to a Stomp client.
+ * <p>
+ * Each delivered message is turned into a MESSAGE frame, marshalled and written on the
+ * transport connection. Producer credits and large messages are not supported: the
+ * corresponding callbacks do nothing.
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+class StompSessionCallback implements SessionCallback
+{
+   private final RemotingConnection connection;
+
+   private final StompMarshaller marshaller;
+
+   /**
+    * @param marshaller marshaller used to encode outgoing frames
+    * @param connection connection the frames are written to
+    */
+   StompSessionCallback(final StompMarshaller marshaller, final RemotingConnection connection)
+   {
+      this.marshaller = marshaller;
+      this.connection = connection;
+   }
+
+   /** No-op. */
+   public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
+   {
+   }
+
+   /**
+    * Sends the message to the client as a MESSAGE frame.
+    * <p>
+    * Only text messages carry a body (encoded as UTF-8); any other message type is sent with
+    * an empty body.
+    *
+    * @return the number of bytes written, or <code>0</code> if the message could not be sent
+    */
+   public int sendMessage(ServerMessage serverMessage, long consumerID, int deliveryCount)
+   {
+      try
+      {
+         Map<String, Object> headers = new HashMap<String, Object>();
+         headers.put(Stomp.Headers.Message.DESTINATION, StompUtils.toStompDestination(serverMessage.getAddress()
+                                                                                                   .toString()));
+         byte[] data = new byte[] {};
+         if (serverMessage.getType() == Message.TEXT_TYPE)
+         {
+            SimpleString text = serverMessage.getBodyBuffer().readNullableSimpleString();
+            if (text != null)
+            {
+               // Encode explicitly as UTF-8 rather than with the platform default charset.
+               data = text.toString().getBytes("UTF-8");
+            }
+         }
+         // TODO bodies of non-text messages are not forwarded yet
+         StompFrame msg = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
+         System.out.println("SENDING : " + msg);
+         byte[] bytes = marshaller.marshal(msg);
+         HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+         connection.getTransportConnection().write(buffer, true);
+
+         return bytes.length;
+
+      }
+      catch (Exception e)
+      {
+         // Best effort: a failed delivery is reported as zero bytes written.
+         e.printStackTrace();
+         return 0;
+      }
+
+   }
+
+   /** Large messages are not supported; always returns <code>0</code>. */
+   public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse)
+   {
+      return 0;
+   }
+
+   /** Large messages are not supported; always returns <code>0</code>. */
+   public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
+   {
+      return 0;
+   }
+
+   /** No-op. */
+   public void closed()
+   {
+   }
+}
\ No newline at end of file
Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+import org.hornetq.api.core.HornetQException;
+
+/**
+ * Converts destination names between the Stomp form (<code>/queue/foo</code>) and the HornetQ
+ * address form (<code>jms.queue.foo</code>).
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+class StompUtils
+{
+
+   // Constants -----------------------------------------------------
+
+   public static final String HQ_QUEUE_PREFIX = "jms.queue.";
+
+   public static final String STOMP_QUEUE_PREFIX = "/queue/";
+
+   public static final String HQ_TEMP_QUEUE_PREFIX = "jms.tempqueue.";
+
+   public static final String STOMP_TEMP_QUEUE_PREFIX = "/temp-queue/";
+
+   public static final String HQ_TOPIC_PREFIX = "jms.topic.";
+
+   public static final String STOMP_TOPIC_PREFIX = "/topic/";
+
+   public static final String HQ_TEMP_TOPIC_PREFIX = "jms.temptopic.";
+
+   public static final String STOMP_TEMP_TOPIC_PREFIX = "/temp-topic/";
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   /**
+    * Converts a Stomp destination into the matching HornetQ address.
+    *
+    * @param stompDestination destination starting with one of /queue/ /topic/ /temp-queue/ /temp-topic/
+    * @return the address with the Stomp prefix replaced by the HornetQ prefix
+    * @throws HornetQException (ILLEGAL_STATE) if the destination is <code>null</code> or has no known prefix
+    */
+   public static String toHornetQAddress(String stompDestination) throws HornetQException
+   {
+      if (stompDestination == null)
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination is specified!");
+      }
+      else if (stompDestination.startsWith(STOMP_QUEUE_PREFIX))
+      {
+         return convert(stompDestination, STOMP_QUEUE_PREFIX, HQ_QUEUE_PREFIX);
+      }
+      else if (stompDestination.startsWith(STOMP_TOPIC_PREFIX))
+      {
+         return convert(stompDestination, STOMP_TOPIC_PREFIX, HQ_TOPIC_PREFIX);
+      }
+      else if (stompDestination.startsWith(STOMP_TEMP_QUEUE_PREFIX))
+      {
+         return convert(stompDestination, STOMP_TEMP_QUEUE_PREFIX, HQ_TEMP_QUEUE_PREFIX);
+      }
+      else if (stompDestination.startsWith(STOMP_TEMP_TOPIC_PREFIX))
+      {
+         return convert(stompDestination, STOMP_TEMP_TOPIC_PREFIX, HQ_TEMP_TOPIC_PREFIX);
+      }
+      else
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal destination name: [" + stompDestination +
+                                                                    "] -- StompConnect destinations " +
+                                                                    "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
+      }
+   }
+
+   /**
+    * Converts a HornetQ address into the matching Stomp destination.
+    *
+    * @param hornetqAddress address starting with one of the <code>jms.*</code> prefixes
+    * @return the destination with the HornetQ prefix replaced by the Stomp prefix
+    * @throws HornetQException (ILLEGAL_STATE) if the address is <code>null</code> or has no known prefix
+    */
+   public static String toStompDestination(String hornetqAddress) throws HornetQException
+   {
+      if (hornetqAddress == null)
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination is specified!");
+      }
+      else if (hornetqAddress.startsWith(HQ_QUEUE_PREFIX))
+      {
+         return convert(hornetqAddress, HQ_QUEUE_PREFIX, STOMP_QUEUE_PREFIX);
+      }
+      else if (hornetqAddress.startsWith(HQ_TOPIC_PREFIX))
+      {
+         return convert(hornetqAddress, HQ_TOPIC_PREFIX, STOMP_TOPIC_PREFIX);
+      }
+      else if (hornetqAddress.startsWith(HQ_TEMP_QUEUE_PREFIX))
+      {
+         return convert(hornetqAddress, HQ_TEMP_QUEUE_PREFIX, STOMP_TEMP_QUEUE_PREFIX);
+      }
+      else if (hornetqAddress.startsWith(HQ_TEMP_TOPIC_PREFIX))
+      {
+         return convert(hornetqAddress, HQ_TEMP_TOPIC_PREFIX, STOMP_TEMP_TOPIC_PREFIX);
+      }
+      else
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal address name: [" + hornetqAddress +
+                                                                    "] -- Acceptable address must comply to JMS semantics");
+      }
+   }
+
+   /**
+    * Replaces the leading <code>oldPrefix</code> of <code>str</code> with <code>newPrefix</code>.
+    * Callers have already checked that <code>str</code> starts with <code>oldPrefix</code>.
+    */
+   private static String convert(String str, String oldPrefix, String newPrefix)
+   {
+      return newPrefix + str.substring(oldPrefix.length());
+   }
+
+   // Constructors --------------------------------------------------
+
+   /** Utility class, not meant to be instantiated. */
+   private StompUtils()
+   {
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -58,14 +58,11 @@
private boolean paused;
private NotificationService notificationService;
-
- private final ProtocolType protocol;
public InVMAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
final ConnectionLifeCycleListener listener,
- final Executor threadPool,
- final ProtocolType protocol)
+ final Executor threadPool)
{
this.handler = handler;
@@ -74,8 +71,6 @@
id = ConfigurationHelper.getIntProperty(TransportConstants.SERVER_ID_PROP_NAME, 0, configuration);
executorFactory = new OrderedExecutorFactory(threadPool);
-
- this.protocol = protocol;
}
public synchronized void start() throws Exception
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -17,7 +17,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
-import org.hornetq.spi.core.protocol.ProtocolType;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.AcceptorFactory;
import org.hornetq.spi.core.remoting.BufferDecoder;
@@ -37,10 +36,9 @@
final BufferDecoder decoder,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
- final ScheduledExecutorService scheduledThreadPool,
- final ProtocolType protocol)
+ final ScheduledExecutorService scheduledThreadPool)
{
- return new InVMAcceptor(configuration, handler, listener, threadPool, protocol);
+ return new InVMAcceptor(configuration, handler, listener, threadPool);
}
public Set<String> getAllowableProperties()
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -30,9 +30,11 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.aardvark.impl.AardvarkProtocolManagerFactory;
import org.hornetq.core.protocol.core.impl.CoreProtocolManagerFactory;
+import org.hornetq.core.protocol.stomp.StompProtocolManagerFactory;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.integration.transports.netty.TransportConstants;
import org.hornetq.spi.core.protocol.ConnectionEntry;
import org.hornetq.spi.core.protocol.ProtocolManager;
import org.hornetq.spi.core.protocol.ProtocolType;
@@ -121,6 +123,7 @@
this.scheduledThreadPool = scheduledThreadPool;
this.protocolMap.put(ProtocolType.CORE, new CoreProtocolManagerFactory().createProtocolManager(server, interceptors));
+ this.protocolMap.put(ProtocolType.STOMP, new StompProtocolManagerFactory().createProtocolManager(server, interceptors));
this.protocolMap.put(ProtocolType.AARDVARK, new AardvarkProtocolManagerFactory().createProtocolManager(server, interceptors));
}
@@ -159,9 +162,9 @@
}
}
- //TODO - allow protocol type to be configured from Configuration for each acceptor
+ String protocolString = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME, TransportConstants.DEFAULT_PROTOCOL, info.getParams());
- ProtocolType protocol = hackProtocol;
+ ProtocolType protocol = ProtocolType.valueOf(protocolString.toUpperCase());
ProtocolManager manager = protocolMap.get(protocol);
@@ -170,8 +173,7 @@
manager,
this,
threadPool,
- scheduledThreadPool,
- protocol);
+ scheduledThreadPool);
acceptors.add(acceptor);
@@ -200,9 +202,6 @@
started = true;
}
-
- //FIXME - temp hack so we can choose AARDVARK as protocol
- public static ProtocolType hackProtocol = ProtocolType.CORE;
public synchronized void freeze()
{
Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -35,6 +35,7 @@
import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.core.version.Version;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.protocol.SessionCallback;
import org.hornetq.spi.core.security.HornetQSecurityManager;
import org.hornetq.utils.ExecutorFactory;
@@ -80,12 +81,11 @@
boolean autoCommitSends,
boolean autoCommitAcks,
boolean preAcknowledge,
- boolean xa) throws Exception;
+ boolean xa,
+ final SessionCallback callback) throws Exception;
void removeSession(String name) throws Exception;
- ServerSession getSession(String name);
-
Set<ServerSession> getSessions();
boolean isStarted();
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -111,6 +111,4 @@
void setTransferring(boolean transferring);
void runConnectionFailureRunners();
-
- void setCallback(SessionCallback callback);
}
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -106,6 +106,7 @@
import org.hornetq.core.version.Version;
import org.hornetq.spi.core.logging.LogDelegateFactory;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.protocol.SessionCallback;
import org.hornetq.spi.core.security.HornetQSecurityManager;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
@@ -542,7 +543,8 @@
final boolean autoCommitSends,
final boolean autoCommitAcks,
final boolean preAcknowledge,
- final boolean xa) throws Exception
+ final boolean xa,
+ final SessionCallback callback) throws Exception
{
if (securityStore != null)
{
@@ -565,7 +567,8 @@
securityStore,
managementService,
this,
- configuration.getManagementAddress());
+ configuration.getManagementAddress(),
+ callback);
sessions.put(name, session);
@@ -595,11 +598,6 @@
sessions.remove(name);
}
- public ServerSession getSession(final String name)
- {
- return sessions.get(name);
- }
-
public synchronized List<ServerSession> getSessions(final String connectionID)
{
Set<Entry<String, ServerSession>> sessionEntries = sessions.entrySet();
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -134,7 +134,7 @@
private final RoutingContext routingContext = new RoutingContextImpl(null);
- private SessionCallback callback;
+ private final SessionCallback callback;
// Constructors ---------------------------------------------------------------------------------
@@ -154,7 +154,8 @@
final SecurityStore securityStore,
final ManagementService managementService,
final HornetQServer server,
- final SimpleString managementAddress) throws Exception
+ final SimpleString managementAddress,
+ final SessionCallback callback) throws Exception
{
this.username = username;
@@ -193,6 +194,8 @@
this.managementAddress = managementAddress;
+ this.callback = callback;
+
remotingConnection.addFailureListener(this);
remotingConnection.addCloseListener(this);
@@ -200,11 +203,6 @@
// ServerSession implementation ----------------------------------------------------------------------------
- public void setCallback(final SessionCallback callback)
- {
- this.callback = callback;
- }
-
public String getUsername()
{
return username;
Modified: trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -16,6 +16,7 @@
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
+import org.hornetq.core.protocol.stomp.StompFrameDelimiter;
import org.hornetq.spi.core.protocol.ProtocolType;
import org.hornetq.spi.core.remoting.BufferDecoder;
import org.jboss.netty.channel.ChannelPipeline;
@@ -54,9 +55,12 @@
//Core protocol uses it's own optimised decoder
pipeline.addLast("decoder", new HornetQFrameDecoder2());
}
+ else if (protocol == ProtocolType.STOMP)
+ {
+ pipeline.addLast("decoder", new StompFrameDelimiter());
+ }
else
{
- //Use the old frame decoder for other protocols
pipeline.addLast("decoder", new HornetQFrameDecoder(decoder));
}
}
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -78,7 +78,7 @@
*/
public class NettyAcceptor implements Acceptor
{
- private static final Logger log = Logger.getLogger(NettyAcceptor.class);
+ static final Logger log = Logger.getLogger(NettyAcceptor.class);
private ChannelFactory channelFactory;
@@ -106,6 +106,8 @@
private final boolean useInvm;
+ private final ProtocolType protocol;
+
private final String host;
private final int port;
@@ -134,8 +136,6 @@
private VirtualExecutorService bossExecutor;
- private final ProtocolType protocol;
-
private boolean paused;
public NettyAcceptor(final Map<String, Object> configuration,
@@ -143,8 +143,7 @@
final BufferDecoder decoder,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
- final ScheduledExecutorService scheduledThreadPool,
- final ProtocolType protocol)
+ final ScheduledExecutorService scheduledThreadPool)
{
this.handler = handler;
@@ -152,8 +151,6 @@
this.listener = listener;
- this.protocol = protocol;
-
sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME,
TransportConstants.DEFAULT_SSL_ENABLED,
configuration);
@@ -190,6 +187,11 @@
useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME,
TransportConstants.DEFAULT_USE_INVM,
configuration);
+ String protocolStr = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME,
+ TransportConstants.DEFAULT_PROTOCOL,
+ configuration);
+ protocol = ProtocolType.valueOf(protocolStr);
+
host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME,
TransportConstants.DEFAULT_HOST,
configuration);
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -18,7 +18,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
-import org.hornetq.spi.core.protocol.ProtocolType;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.AcceptorFactory;
import org.hornetq.spi.core.remoting.BufferDecoder;
@@ -37,11 +36,9 @@
final BufferDecoder decoder,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
- final ScheduledExecutorService scheduledThreadPool,
- final ProtocolType protocol)
+ final ScheduledExecutorService scheduledThreadPool)
{
- return new NettyAcceptor(configuration, handler, decoder, listener, threadPool, scheduledThreadPool,
- protocol);
+ return new NettyAcceptor(configuration, handler, decoder, listener, threadPool, scheduledThreadPool);
}
public Set<String> getAllowableProperties()
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -14,7 +14,6 @@
package org.hornetq.integration.transports.netty;
import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQException;
import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
import org.hornetq.core.logging.Logger;
import org.hornetq.spi.core.protocol.ProtocolType;
Modified: trunk/src/main/org/hornetq/integration/transports/netty/TransportConstants.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/TransportConstants.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/integration/transports/netty/TransportConstants.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -16,6 +16,8 @@
import java.util.HashSet;
import java.util.Set;
+import org.hornetq.spi.core.protocol.ProtocolType;
+
/**
* A TransportConstants
*
@@ -46,6 +48,8 @@
public static final String USE_INVM_PROP_NAME = "use-invm";
+ public static final String PROTOCOL_PROP_NAME = "protocol";
+
public static final String HOST_PROP_NAME = "host";
public static final String PORT_PROP_NAME = "port";
@@ -75,10 +79,14 @@
public static final boolean DEFAULT_USE_SERVLET = false;
+ public static final String DEFAULT_PROTOCOL = ProtocolType.CORE.toString();
+
public static final String DEFAULT_HOST = "localhost";
public static final int DEFAULT_PORT = 5445;
+ public static final int DEFAULT_STOMP_PORT = 61613;
+
public static final String DEFAULT_KEYSTORE_PATH = "hornetq.keystore";
public static final String DEFAULT_KEYSTORE_PASSWORD = "secureexample";
@@ -120,6 +128,7 @@
allowableAcceptorKeys.add(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_NIO_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME);
+ allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.HOST_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.PORT_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.KEYSTORE_PATH_PROP_NAME);
Modified: trunk/src/main/org/hornetq/jms/client/HornetQBytesMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQBytesMessage.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/jms/client/HornetQBytesMessage.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -19,6 +19,7 @@
import javax.jms.MessageFormatException;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Message;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.core.logging.Logger;
@@ -42,7 +43,7 @@
private static final Logger log = Logger.getLogger(HornetQBytesMessage.class);
- public static final byte TYPE = 4;
+ public static final byte TYPE = Message.BYTES_TYPE;
// Attributes ----------------------------------------------------
Modified: trunk/src/main/org/hornetq/jms/client/HornetQMapMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMapMessage.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMapMessage.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -22,6 +22,7 @@
import javax.jms.MapMessage;
import javax.jms.MessageFormatException;
+import org.hornetq.api.core.Message;
import org.hornetq.api.core.PropertyConversionException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientMessage;
@@ -45,7 +46,7 @@
{
// Constants -----------------------------------------------------
- public static final byte TYPE = 5;
+ public static final byte TYPE = Message.MAP_TYPE;
// Attributes ----------------------------------------------------
Modified: trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMessage.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMessage.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -83,7 +83,7 @@
private static final String JMSXGROUPID = "JMSXGroupID";
- public static final byte TYPE = 0;
+ public static final byte TYPE = org.hornetq.api.core.Message.DEFAULT_TYPE;
public static Map<String, Object> coreMaptoJMSMap(final Map<String, Object> coreMessage)
{
Modified: trunk/src/main/org/hornetq/jms/client/HornetQObjectMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQObjectMessage.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/jms/client/HornetQObjectMessage.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -22,6 +22,7 @@
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
+import org.hornetq.api.core.Message;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientSession;
@@ -44,7 +45,7 @@
{
// Constants -----------------------------------------------------
- public static final byte TYPE = 2;
+ public static final byte TYPE = Message.OBJECT_TYPE;
// Attributes ----------------------------------------------------
Modified: trunk/src/main/org/hornetq/jms/client/HornetQStreamMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQStreamMessage.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/jms/client/HornetQStreamMessage.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -19,6 +19,7 @@
import javax.jms.StreamMessage;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Message;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.core.client.impl.ClientMessageImpl;
@@ -48,7 +49,7 @@
private static final Logger log = Logger.getLogger(HornetQStreamMessage.class);
- public static final byte TYPE = 6;
+ public static final byte TYPE = Message.STREAM_TYPE;
// Attributes ----------------------------------------------------
Modified: trunk/src/main/org/hornetq/jms/client/HornetQTextMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQTextMessage.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/jms/client/HornetQTextMessage.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -17,6 +17,7 @@
import javax.jms.TextMessage;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientSession;
@@ -42,7 +43,7 @@
{
// Constants -----------------------------------------------------
- public static final byte TYPE = 3;
+ public static final byte TYPE = Message.TEXT_TYPE;
public static final Logger log = Logger.getLogger(HornetQTextMessage.class);
Modified: trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -13,8 +13,8 @@
package org.hornetq.spi.core.protocol;
+import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.spi.core.remoting.BufferDecoder;
-import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
/**
@@ -24,7 +24,14 @@
*
*
*/
-public interface ProtocolManager extends BufferHandler, BufferDecoder
+public interface ProtocolManager extends BufferDecoder
{
ConnectionEntry createConnectionEntry(Connection connection);
+
+ public void removeHandler(final String name);
+
+ public int isReadyToHandle(HornetQBuffer buffer);
+
+ void handleBuffer(RemotingConnection connection, HornetQBuffer buffer);
+
}
Modified: trunk/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -18,12 +18,10 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
-import org.hornetq.spi.core.protocol.ProtocolType;
-
/**
* A factory for creating acceptors.
* <p/>
- * An Acceptor is an endpoin that a {@link org.hornetq.spi.core.remoting.Connector} will connect to and is used by the remoting service.
+ * An Acceptor is an endpoint that a {@link org.hornetq.spi.core.remoting.Connector} will connect to and is used by the remoting service.
*
* @author <a href="ataylor(a)redhat.com">Andy Taylor</a>
* @author <a href="tim.fox(a)jboss.com">Tim Fox</a>
@@ -47,8 +45,7 @@
BufferDecoder decoder,
ConnectionLifeCycleListener listener,
Executor threadPool,
- ScheduledExecutorService scheduledThreadPool,
- ProtocolType protocol);
+ ScheduledExecutorService scheduledThreadPool);
/**
* Returns the allowable properties for this acceptor.
Modified: trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -98,7 +98,7 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
@@ -152,7 +152,7 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
@@ -210,7 +210,7 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -269,7 +269,7 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -327,7 +327,7 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME, 500l);
DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -381,7 +381,7 @@
conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000l);
DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
acceptor.start();
BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
@@ -428,7 +428,7 @@
conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000l);
DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler2 acceptorHandler = new SimpleBufferHandler2(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
acceptor.start();
BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
Modified: trunk/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -45,8 +45,6 @@
public void testAardvark() throws Exception
{
- RemotingServiceImpl.hackProtocol = ProtocolType.AARDVARK;
-
Configuration config = new ConfigurationImpl();
config.setSecurityEnabled(false);
@@ -56,6 +54,7 @@
params.put(TransportConstants.PORT_PROP_NAME, 9876);
params.put(TransportConstants.HOST_PROP_NAME, "127.0.0.1");
+ params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.AARDVARK.toString());
TransportConfiguration tc = new TransportConfiguration(NettyAcceptorFactory.class.getCanonicalName(),
params);
@@ -95,7 +94,5 @@
socket.close();
server.stop();
-
- RemotingServiceImpl.hackProtocol = ProtocolType.CORE;
}
}
Added: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,964 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.stomp.Stomp;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.integration.transports.netty.TransportConstants;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.config.JMSConfiguration;
+import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
+import org.hornetq.jms.server.config.impl.QueueConfigurationImpl;
+import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+import org.hornetq.spi.core.protocol.ProtocolType;
+
+public class StompTest extends TestCase {
+ private static final transient Logger log = Logger.getLogger(StompTest.class);
+ private int port = 61613;
+ private Socket stompSocket;
+ private ByteArrayOutputStream inputBuffer;
+ private ConnectionFactory connectionFactory;
+ private Connection connection;
+ private Session session;
+ private Queue queue;
+ private JMSServerManager server;
+
+ public void testConnect() throws Exception {
+
+ String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
+ sendFrame(connect_frame);
+
+ String f = receiveFrame(10000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+ Assert.assertTrue(f.indexOf("response-id:1") >= 0);
+ }
+
+ public void testDisconnectAndError() throws Exception {
+
+ String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
+ sendFrame(connect_frame);
+
+ String f = receiveFrame(10000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+ Assert.assertTrue(f.indexOf("response-id:1") >= 0);
+
+ connect_frame = "DISCONNECT\n\n" + Stomp.NULL;
+ sendFrame(connect_frame);
+
+ // sending a message will result in an error
+ String frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ f = receiveFrame(10000);
+ Assert.assertTrue(f.startsWith("ERROR"));
+ }
+
+
+ public void testSendMessage() throws Exception {
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+ }
+
+ public void testSendMessageWithReceipt() throws Exception {
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "receipt: 1234\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ String f = receiveFrame(10000);
+ Assert.assertTrue(f.startsWith("RECEIPT"));
+ Assert.assertTrue(f.indexOf("receipt-id:1234") >= 0);
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+ }
+
+ public void testSendMessageWithContentLength() throws Exception {
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ byte[] data = new byte[] {1, 2, 3, 4};
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "content-length:" + data.length + "\n\n" +
+ new String(data) +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ BytesMessage message = (BytesMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ assertEquals(data.length, message.getBodyLength());
+ assertEquals(data[0], message.readByte());
+ assertEquals(data[1], message.readByte());
+ assertEquals(data[2], message.readByte());
+ assertEquals(data[3], message.readByte());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+ }
+
+ public void _testJMSXGroupIdCanBeSet() throws Exception {
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "JMSXGroupID: TEST\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ // TODO do we support it?
+ //Assert.assertEquals("TEST", ((TextMessage) message).getGroupID());
+ }
+
+ public void _testSendMessageWithCustomHeadersAndSelector() throws Exception {
+
+ MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SEND\n" +
+ "foo:abc\n" +
+ "bar:123\n" +
+ "destination:/queue/" + getQueueName() + "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+ Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
+ Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
+ }
+
+ public void _testSendMessageWithStandardHeaders() throws Exception {
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SEND\n" +
+ "correlation-id:c123\n" +
+ "priority:3\n" +
+ "type:t345\n" +
+ "JMSXGroupID:abc\n" +
+ "foo:abc\n" +
+ "bar:123\n" +
+ "destination:/queue/" + getQueueName() + "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+ Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
+ Assert.assertEquals("getJMSType", "t345", message.getJMSType());
+ Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
+ Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
+ Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
+
+ Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
+ // FIXME do we support it?
+ //Assert.assertEquals("GroupID", "abc", amqMessage.getGroupID());
+ }
+
+ public void testSubscribeWithAutoAck() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ sendMessage(getName());
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf(getName()) > 0);
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
+ public void _testSubscribeWithAutoAckAndBytesMessage() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ sendBytesMessage(new byte[]{1, 2, 3, 4, 5});
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
+ Matcher cl_matcher = cl.matcher(frame);
+ Assert.assertTrue(cl_matcher.find());
+ Assert.assertEquals("5", cl_matcher.group(1));
+
+ Assert.assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find());
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
+ public void _testSubscribeWithMessageSentWithProperties() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ MessageProducer producer = session.createProducer(queue);
+ TextMessage message = session.createTextMessage("Hello World");
+ message.setStringProperty("S", "value");
+ message.setBooleanProperty("n", false);
+ message.setByteProperty("byte", (byte) 9);
+ message.setDoubleProperty("d", 2.0);
+ message.setFloatProperty("f", (float) 6.0);
+ message.setIntProperty("i", 10);
+ message.setLongProperty("l", 121);
+ message.setShortProperty("s", (short) 12);
+ producer.send(message);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("S:") > 0);
+ Assert.assertTrue(frame.indexOf("n:") > 0);
+ Assert.assertTrue(frame.indexOf("byte:") > 0);
+ Assert.assertTrue(frame.indexOf("d:") > 0);
+ Assert.assertTrue(frame.indexOf("f:") > 0);
+ Assert.assertTrue(frame.indexOf("i:") > 0);
+ Assert.assertTrue(frame.indexOf("l:") > 0);
+ Assert.assertTrue(frame.indexOf("s:") > 0);
+ Assert.assertTrue(frame.indexOf("Hello World") > 0);
+
+// System.out.println("out: "+frame);
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
+ public void _testMessagesAreInOrder() throws Exception {
+ int ctr = 10;
+ String[] data = new String[ctr];
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ for (int i = 0; i < ctr; ++i) {
+ data[i] = getName() + i;
+ sendMessage(data[i]);
+ }
+
+ for (int i = 0; i < ctr; ++i) {
+ frame = receiveFrame(1000);
+ Assert.assertTrue("Message not in order", frame.indexOf(data[i]) >= 0);
+ }
+
+ // sleep a while before publishing another set of messages
+ waitForFrameToTakeEffect();
+
+ for (int i = 0; i < ctr; ++i) {
+ data[i] = getName() + ":second:" + i;
+ sendMessage(data[i]);
+ }
+
+ for (int i = 0; i < ctr; ++i) {
+ frame = receiveFrame(1000);
+ Assert.assertTrue("Message not in order", frame.indexOf(data[i]) >= 0);
+ }
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
+ public void _testSubscribeWithAutoAckAndSelector() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "selector: foo = 'zzz'\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ sendMessage("Ignored message", "foo", "1234");
+ sendMessage("Real message", "foo", "zzz");
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real message") > 0);
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
+ public void _testSubscribeWithClientAck() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:client\n\n" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+ sendMessage(getName());
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ // message should be received since message was not acknowledged
+ MessageConsumer consumer = session.createConsumer(queue);
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertTrue(message.getJMSRedelivered());
+ }
+
+ public void _testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception {
+ assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
+ }
+
+ public void _testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws Exception {
+ assertSubscribeWithClientAckThenConsumeWithAutoAck(true);
+ }
+
+ protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:client\n\n" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+ sendMessage(getName());
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ log.info("Reconnecting!");
+
+ if (sendDisconnect) {
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ reconnect();
+ }
+ else {
+ reconnect(1000);
+ }
+
+
+ // message should be received since message was not acknowledged
+ frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n\n" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ // now let's make sure we don't see the message again
+ reconnect();
+
+ frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n\n" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+ sendMessage("shouldBeNextMessage");
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.contains("shouldBeNextMessage"));
+ }
+
+ public void _testUnsubscribe() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ //send a message to our queue
+ sendMessage("first message");
+
+ //receive message from socket
+ frame = receiveFrame(1000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ //remove subscription
+ frame =
+ "UNSUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ waitForFrameToTakeEffect();
+
+ //send a message to our queue
+ sendMessage("second message");
+
+ try {
+ frame = receiveFrame(1000);
+ log.info("Received frame: " + frame);
+ Assert.fail("No message should have been received since subscription was removed");
+ }
+ catch (SocketTimeoutException e) {
+
+ }
+ }
+
+ public void _testTransactionCommit() throws Exception {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ String f = receiveFrame(1000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+
+ frame =
+ "BEGIN\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ waitForFrameToTakeEffect();
+ // check the message is not committed
+ assertNull(consumer.receive(100));
+
+ frame =
+ "COMMIT\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ waitForFrameToTakeEffect();
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
+ }
+
+ public void _testTransactionRollback() throws Exception {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ String f = receiveFrame(1000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+
+ frame =
+ "BEGIN\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "transaction: tx1\n" +
+ "\n" +
+ "first message" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ //rollback first message
+ frame =
+ "ABORT\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "BEGIN\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "transaction: tx1\n" +
+ "\n" +
+ "second message" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "COMMIT\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ // This test case is currently failing
+ waitForFrameToTakeEffect();
+
+ //only second msg should be received since first msg was rolled back
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("second message", message.getText().trim());
+ }
+
+ // Implementation methods
+ //-------------------------------------------------------------------------
+ protected void setUp() throws Exception {
+ server = createServer();
+ server.start();
+ connectionFactory = createConnectionFactory();
+
+ stompSocket = createSocket();
+ inputBuffer = new ByteArrayOutputStream();
+
+ connection = connectionFactory.createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ queue = session.createQueue(getQueueName());
+ connection.start();
+ }
+
+ /**
+ * @return
+ * @throws Exception
+ */
+ private JMSServerManager createServer() throws Exception
+ {
+ Configuration config = new ConfigurationImpl();
+ config.setSecurityEnabled(false);
+ config.setPersistenceEnabled(false);
+
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP.toString());
+ params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
+ TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
+ config.getAcceptorConfigurations().add(stompTransport);
+ config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+ HornetQServer hornetQServer = HornetQServers.newHornetQServer(config);
+
+ JMSConfiguration jmsConfig = new JMSConfigurationImpl();
+ jmsConfig.getQueueConfigurations().add(new QueueConfigurationImpl(getQueueName(), null, false, getQueueName()));
+ server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
+ server.setContext(null);
+ return server;
+ }
+
+ protected void tearDown() throws Exception {
+ connection.close();
+ if (stompSocket != null) {
+ stompSocket.close();
+ }
+ server.stop();
+ }
+
+ protected void reconnect() throws Exception {
+ reconnect(0);
+ }
+ protected void reconnect(long sleep) throws Exception {
+ stompSocket.close();
+
+ if (sleep > 0) {
+ Thread.sleep(sleep);
+ }
+
+ stompSocket = createSocket();
+ inputBuffer = new ByteArrayOutputStream();
+ }
+
+ protected ConnectionFactory createConnectionFactory() {
+ return new HornetQConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ }
+
+ protected Socket createSocket() throws IOException {
+ return new Socket("127.0.0.1", port);
+ }
+
+ protected String getQueueName() {
+ return "test";
+ }
+
+ public void sendFrame(String data) throws Exception {
+ byte[] bytes = data.getBytes("UTF-8");
+ OutputStream outputStream = stompSocket.getOutputStream();
+ for (int i = 0; i < bytes.length; i++) {
+ outputStream.write(bytes[i]);
+ }
+ outputStream.flush();
+ }
+
+ public String receiveFrame(long timeOut) throws Exception {
+ stompSocket.setSoTimeout((int) timeOut);
+ InputStream is = stompSocket.getInputStream();
+ int c = 0;
+ for (; ;) {
+ c = is.read();
+ if (c < 0) {
+ throw new IOException("socket closed.");
+ }
+ else if (c == 0) {
+ c = is.read();
+ if (c != '\n')
+ {
+ byte[] ba = inputBuffer.toByteArray();
+ System.out.println(new String(ba, "UTF-8"));
+ }
+ Assert.assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
+ byte[] ba = inputBuffer.toByteArray();
+ inputBuffer.reset();
+ return new String(ba, "UTF-8");
+ }
+ else {
+ inputBuffer.write(c);
+ }
+ }
+ }
+
+ public void sendMessage(String msg) throws Exception {
+ sendMessage(msg, "foo", "xyz");
+ }
+
+ public void sendMessage(String msg, String propertyName, String propertyValue) throws JMSException {
+ MessageProducer producer = session.createProducer(queue);
+ TextMessage message = session.createTextMessage(msg);
+ message.setStringProperty(propertyName, propertyValue);
+ producer.send(message);
+ }
+
+ public void sendBytesMessage(byte[] msg) throws Exception {
+ MessageProducer producer = session.createProducer(queue);
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(msg);
+ producer.send(message);
+ }
+
+ protected void waitForFrameToTakeEffect() throws InterruptedException {
+ // bit of a dirty hack :)
+ // another option would be to force some kind of receipt to be returned
+ // from the frame
+ Thread.sleep(2000);
+ }
+}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -73,8 +73,7 @@
null,
listener,
Executors.newCachedThreadPool(),
- Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE),
- ProtocolType.CORE);
+ Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE));
Assert.assertTrue(acceptor instanceof NettyAcceptor);
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -86,8 +86,7 @@
null,
listener,
Executors.newCachedThreadPool(),
- Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE),
- ProtocolType.CORE);
+ Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE));
acceptor.start();
Assert.assertTrue(acceptor.isStarted());
14 years, 11 months
JBoss hornetq SVN: r8842 - trunk/examples/jms/message-group.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-22 09:25:11 -0500 (Fri, 22 Jan 2010)
New Revision: 8842
Modified:
trunk/examples/jms/message-group/readme.html
Log:
https://jira.jboss.org/jira/browse/HORNETQ-276: message-group jms example's readme.html is incorrect
* fixed code listing
Modified: trunk/examples/jms/message-group/readme.html
===================================================================
--- trunk/examples/jms/message-group/readme.html 2010-01-22 13:21:27 UTC (rev 8841)
+++ trunk/examples/jms/message-group/readme.html 2010-01-22 14:25:11 UTC (rev 8842)
@@ -77,7 +77,7 @@
for (int i = 0; i < msgCount; i++)
{
groupMessages[i] = session.createTextMessage("Group-0 message " + i);
- groupMessages[i].setStringProperty(HornetQMessage.JMSXGROUPID, "Group-0");
+ groupMessages[i].setStringProperty("JMSXGroupID", "Group-0");
producer.send(groupMessages[i]);
System.out.println("Sent message: " + groupMessages[i].getText());
}
14 years, 11 months
JBoss hornetq SVN: r8841 - in branches/HORNETQ-129_STOMP_protocol: src/main/org/hornetq/core/protocol/core/impl and 10 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-22 08:21:27 -0500 (Fri, 22 Jan 2010)
New Revision: 8841
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferDecoder.java
Removed:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/AbstractBufferHandler.java
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* sync with the trunk: svn merge -r 8832:8839 https://svn.jboss.org/repos/hornetq/trunk
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java 2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java 2010-01-22 13:21:27 UTC (rev 8841)
@@ -130,10 +130,4 @@
{
manager.handleBuffer(this, buffer);
}
-
- public int isReadyToHandle(HornetQBuffer buffer)
- {
- return -1;
- }
-
}
Deleted: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/AbstractBufferHandler.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/AbstractBufferHandler.java 2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/AbstractBufferHandler.java 2010-01-22 13:21:27 UTC (rev 8841)
@@ -1,46 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.protocol.core.impl;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.spi.core.remoting.BufferHandler;
-import org.hornetq.utils.DataConstants;
-
-/**
- * A AbstractBufferHandler
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- */
-public abstract class AbstractBufferHandler implements BufferHandler
-{
- private static final Logger log = Logger.getLogger(AbstractBufferHandler.class);
-
- public int isReadyToHandle(final HornetQBuffer buffer)
- {
- if (buffer.readableBytes() < DataConstants.SIZE_INT)
- {
- return -1;
- }
-
- int length = buffer.readInt();
-
- if (buffer.readableBytes() < length)
- {
- return -1;
- }
-
- return length;
- }
-}
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-01-22 13:21:27 UTC (rev 8841)
@@ -125,6 +125,8 @@
{
}
+ //This is never called using the core protocol, since we override the HornetQFrameDecoder with our core
+ //optimised version HornetQFrameDecoder2, which never calls this
public int isReadyToHandle(HornetQBuffer buffer)
{
return -1;
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-01-22 13:21:27 UTC (rev 8841)
@@ -31,6 +31,7 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.utils.SimpleIDGenerator;
@@ -39,7 +40,7 @@
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
* @version <tt>$Revision$</tt> $Id$
*/
-public class RemotingConnectionImpl extends AbstractBufferHandler implements CoreRemotingConnection
+public class RemotingConnectionImpl implements BufferHandler, CoreRemotingConnection
{
// Constants
// ------------------------------------------------------------------------------------
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-22 13:21:27 UTC (rev 8841)
@@ -292,5 +292,4 @@
{
return -1;
}
-
}
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2010-01-22 13:21:27 UTC (rev 8841)
@@ -46,7 +46,7 @@
private final int id;
private final BufferHandler handler;
-
+
private final ConnectionLifeCycleListener listener;
private final ConcurrentMap<String, Connection> connections = new ConcurrentHashMap<String, Connection>();
@@ -62,12 +62,12 @@
private final ProtocolType protocol;
public InVMAcceptor(final Map<String, Object> configuration,
- final BufferHandler handler,
+ final BufferHandler handler,
final ConnectionLifeCycleListener listener,
final Executor threadPool)
{
this.handler = handler;
-
+
this.listener = listener;
id = ConfigurationHelper.getIntProperty(TransportConstants.SERVER_ID_PROP_NAME, 0, configuration);
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java 2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java 2010-01-22 13:21:27 UTC (rev 8841)
@@ -19,6 +19,7 @@
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.AcceptorFactory;
+import org.hornetq.spi.core.remoting.BufferDecoder;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -32,6 +33,7 @@
{
public Acceptor createAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
+ final BufferDecoder decoder,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-22 13:21:27 UTC (rev 8841)
@@ -22,8 +22,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
-import com.sun.corba.se.spi.activation.ServerHolder;
-
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
@@ -32,9 +30,11 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.aardvark.impl.AardvarkProtocolManagerFactory;
import org.hornetq.core.protocol.core.impl.CoreProtocolManagerFactory;
+import org.hornetq.core.protocol.stomp.StompProtocolManagerFactory;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.integration.transports.netty.TransportConstants;
import org.hornetq.spi.core.protocol.ConnectionEntry;
import org.hornetq.spi.core.protocol.ProtocolManager;
import org.hornetq.spi.core.protocol.ProtocolType;
@@ -72,8 +72,6 @@
private final Map<Object, ConnectionEntry> connections = new ConcurrentHashMap<Object, ConnectionEntry>();
- //private final BufferHandler bufferHandler = new DelegatingBufferHandler();
-
private final Configuration config;
private final HornetQServer server;
@@ -125,6 +123,7 @@
this.scheduledThreadPool = scheduledThreadPool;
this.protocolMap.put(ProtocolType.CORE, new CoreProtocolManagerFactory().createProtocolManager(server, interceptors));
+ this.protocolMap.put(ProtocolType.STOMP, new StompProtocolManagerFactory().createProtocolManager(server, interceptors));
this.protocolMap.put(ProtocolType.AARDVARK, new AardvarkProtocolManagerFactory().createProtocolManager(server, interceptors));
}
@@ -163,14 +162,15 @@
}
}
- //TODO - allow protocol type to be configured from Configuration for each acceptor
+ String protocolString = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME, TransportConstants.DEFAULT_PROTOCOL, info.getParams());
- ProtocolType protocol = hackProtocol;
+ ProtocolType protocol = ProtocolType.valueOf(protocolString.toUpperCase());
ProtocolManager manager = protocolMap.get(protocol);
Acceptor acceptor = factory.createAcceptor(info.getParams(),
new DelegatingBufferHandler(manager),
+ manager,
this,
threadPool,
scheduledThreadPool);
@@ -202,9 +202,6 @@
started = true;
}
-
- //FIXME - temp hack so we can choose AARDVARK as protocol
- public static ProtocolType hackProtocol = ProtocolType.CORE;
public synchronized void freeze()
{
@@ -406,11 +403,6 @@
conn.connection.bufferReceived(connectionID, buffer);
}
}
-
- public int isReadyToHandle(HornetQBuffer buffer)
- {
- return manager.isReadyToHandle(buffer);
- }
}
private final class FailureCheckAndFlushThread extends Thread
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2010-01-22 13:21:27 UTC (rev 8841)
@@ -18,7 +18,7 @@
import org.hornetq.core.protocol.stomp.StompFrameDelimiter;
import org.hornetq.spi.core.protocol.ProtocolType;
-import org.hornetq.spi.core.remoting.BufferHandler;
+import org.hornetq.spi.core.remoting.BufferDecoder;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.handler.ssl.SslHandler;
@@ -46,24 +46,22 @@
// Public --------------------------------------------------------
- public static void addCodecFilter(final ProtocolType protocol, final ChannelPipeline pipeline, final BufferHandler handler)
+ public static void addCodecFilter(final ProtocolType protocol, final ChannelPipeline pipeline, final BufferDecoder decoder)
{
assert pipeline != null;
if (protocol == ProtocolType.CORE)
{
+ //Core protocol uses its own optimised decoder
pipeline.addLast("decoder", new HornetQFrameDecoder2());
}
else if (protocol == ProtocolType.STOMP)
{
- pipeline.addLast("delimiter", new StompFrameDelimiter());
+ pipeline.addLast("decoder", new StompFrameDelimiter());
}
else
{
-
- // FIXME
- //Use the old frame decoder for other protocols
- //pipeline.addLast("decoder", new HornetQFrameDecoder(handler));
+ pipeline.addLast("decoder", new HornetQFrameDecoder(decoder));
}
}
Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java 2010-01-22 13:21:27 UTC (rev 8841)
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.transports.netty;
+
+import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.spi.core.remoting.BufferDecoder;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+/**
+ * A Netty FrameDecoder used to decode messages.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="ataylor(a)redhat.com">Andy Taylor</a>
+ * @author <a href="tlee(a)redhat.com">Trustin Lee</a>
+ *
+ * @version $Revision: 8837 $, $Date: 2010-01-22 12:06:32 +0100 (Ven, 22 jan 2010) $
+ */
+public class HornetQFrameDecoder extends FrameDecoder
+{
+ private static final Logger log = Logger.getLogger(HornetQFrameDecoder.class);
+
+ private final BufferDecoder decoder;
+
+ public HornetQFrameDecoder(final BufferDecoder decoder)
+ {
+ this.decoder = decoder;
+ }
+
+ // FrameDecoder overrides
+ // -------------------------------------------------------------------------------------
+
+ @Override
+ protected Object decode(final ChannelHandlerContext ctx, final Channel channel, final ChannelBuffer in) throws Exception
+ {
+ int start = in.readerIndex();
+
+ int length = decoder.isReadyToHandle(new ChannelBufferWrapper(in));
+
+ in.readerIndex(start);
+
+ if (length == -1)
+ {
+ return null;
+ }
+
+ ChannelBuffer buffer = in.readBytes(length);
+
+ ChannelBuffer newBuffer = ChannelBuffers.dynamicBuffer(buffer.writerIndex());
+
+ newBuffer.writeBytes(buffer);
+
+ return newBuffer;
+ }
+}
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java 2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java 2010-01-22 13:21:27 UTC (rev 8841)
@@ -24,7 +24,7 @@
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
/**
- * A Netty decoder used to decode messages.
+ * A Netty decoder specially optimised to decode messages on the core protocol only
*
* @author <a href="tlee(a)redhat.com">Trustin Lee</a>
*
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-22 13:21:27 UTC (rev 8841)
@@ -37,6 +37,7 @@
import org.hornetq.core.server.management.NotificationService;
import org.hornetq.spi.core.protocol.ProtocolType;
import org.hornetq.spi.core.remoting.Acceptor;
+import org.hornetq.spi.core.remoting.BufferDecoder;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -86,6 +87,8 @@
private ServerBootstrap bootstrap;
private final BufferHandler handler;
+
+ private final BufferDecoder decoder;
private final ConnectionLifeCycleListener listener;
@@ -135,11 +138,14 @@
public NettyAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
+ final BufferDecoder decoder,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
this.handler = handler;
+
+ this.decoder = decoder;
this.listener = listener;
@@ -287,21 +293,9 @@
pipeline.addLast("httpResponseEncoder", new HttpResponseEncoder());
pipeline.addLast("httphandler", new HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime));
}
- /*
- if (protocol == ProtocolType.STOMP)
- {
- pipeline.addLast("delimiter", new StompFrameDelimiter());
- pipeline.addLast("codec", new StompFrameDecoder());
- pipeline.addLast("handler", new StompChannelHandler(serverHolder,
- channelGroup,
- NettyAcceptor.this,
- new Listener()));
- }
- else
- {*/
- ChannelPipelineSupport.addCodecFilter(protocol, pipeline, handler);
- pipeline.addLast("handler", new HornetQServerChannelHandler(channelGroup, handler, new Listener()));
-// }
+
+ ChannelPipelineSupport.addCodecFilter(protocol, pipeline, decoder);
+ pipeline.addLast("handler", new HornetQServerChannelHandler(channelGroup, handler, new Listener()));
return pipeline;
}
};
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java 2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java 2010-01-22 13:21:27 UTC (rev 8841)
@@ -20,6 +20,7 @@
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.AcceptorFactory;
+import org.hornetq.spi.core.remoting.BufferDecoder;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -32,11 +33,12 @@
{
public Acceptor createAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
+ final BufferDecoder decoder,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
- return new NettyAcceptor(configuration, handler, listener, threadPool, scheduledThreadPool);
+ return new NettyAcceptor(configuration, handler, decoder, listener, threadPool, scheduledThreadPool);
}
public Set<String> getAllowableProperties()
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java 2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java 2010-01-22 13:21:27 UTC (rev 8841)
@@ -315,7 +315,7 @@
pipeline.addLast("httpResponseDecoder", new HttpResponseDecoder());
pipeline.addLast("httphandler", new HttpHandler());
}
- ChannelPipelineSupport.addCodecFilter(ProtocolType.CORE, pipeline, handler);
+ ChannelPipelineSupport.addCodecFilter(ProtocolType.CORE, pipeline, null);
pipeline.addLast("handler", new HornetQClientChannelHandler(channelGroup, handler, new Listener()));
return pipeline;
}
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java 2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java 2010-01-22 13:21:27 UTC (rev 8841)
@@ -14,6 +14,7 @@
package org.hornetq.spi.core.protocol;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.spi.core.remoting.BufferDecoder;
import org.hornetq.spi.core.remoting.Connection;
/**
@@ -23,7 +24,7 @@
*
*
*/
-public interface ProtocolManager
+public interface ProtocolManager extends BufferDecoder
{
ConnectionEntry createConnectionEntry(Connection connection);
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java 2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java 2010-01-22 13:21:27 UTC (rev 8841)
@@ -34,6 +34,7 @@
*
* @param configuration the configuration
* @param handler the handler
+ * @param decoder the decoder
* @param listener the listener
* @param threadPool the threadpool
* @param scheduledThreadPool a scheduled thread pool
@@ -41,6 +42,7 @@
*/
Acceptor createAcceptor(final Map<String, Object> configuration,
BufferHandler handler,
+ BufferDecoder decoder,
ConnectionLifeCycleListener listener,
Executor threadPool,
ScheduledExecutorService scheduledThreadPool);
Copied: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferDecoder.java (from rev 8839, trunk/src/main/org/hornetq/spi/core/remoting/BufferDecoder.java)
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferDecoder.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferDecoder.java 2010-01-22 13:21:27 UTC (rev 8841)
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.spi.core.remoting;
+
+import org.hornetq.api.core.HornetQBuffer;
+
+/**
+ * A BufferDecoder
+ *
+ * @author tim
+ *
+ *
+ */
+public interface BufferDecoder
+{
+ /**
+ * called by the remoting system prior to {@link org.hornetq.spi.core.remoting.BufferHandler#bufferReceived(Object, org.hornetq.api.core.HornetQBuffer)}.
+ * <p/>
+ * The implementation should return true if there is enough data in the buffer to decode, otherwise false.
+ *
+ * @param buffer the buffer
+ * @return true if the buffer can be decoded.
+ */
+ int isReadyToHandle(HornetQBuffer buffer);
+}
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java 2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java 2010-01-22 13:21:27 UTC (rev 8841)
@@ -28,5 +28,5 @@
*
* @param buffer the buffer to decode
*/
- void bufferReceived(Object connectionID, HornetQBuffer buffer);
+ void bufferReceived(Object connectionID, HornetQBuffer buffer);
}
Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2010-01-22 13:21:27 UTC (rev 8841)
@@ -56,10 +56,10 @@
suite.addTest(new LargeMessageFailoverTest("testCreateNewFactoryAfterFailover"));
// Those tests are temporarily disabled for LargeMessage
- // suite.addTest(new LargeMessageFailoverTest("testFailoverMultipleSessionsWithConsumers"));
- // suite.addTest(new LargeMessageFailoverTest("testFailWithBrowser"));
- // suite.addTest(new LargeMessageFailoverTest("testFailThenReceiveMoreMessagesAfterFailover"));
- // suite.addTest(new LargeMessageFailoverTest("testFailThenReceiveMoreMessagesAfterFailover2"));
+ suite.addTest(new LargeMessageFailoverTest("testFailoverMultipleSessionsWithConsumers"));
+ suite.addTest(new LargeMessageFailoverTest("testFailWithBrowser"));
+ suite.addTest(new LargeMessageFailoverTest("testFailThenReceiveMoreMessagesAfterFailover"));
+ suite.addTest(new LargeMessageFailoverTest("testFailThenReceiveMoreMessagesAfterFailover2"));
suite.addTest(new LargeMessageFailoverTest("testForceBlockingReturn"));
suite.addTest(new LargeMessageFailoverTest("testCommitOccurredUnblockedAndResendNoDuplicates"));
Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-01-22 13:21:27 UTC (rev 8841)
@@ -98,7 +98,7 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
@@ -152,7 +152,7 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
@@ -210,7 +210,7 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -269,7 +269,7 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -327,7 +327,7 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME, 500l);
DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -381,7 +381,7 @@
conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000l);
DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
acceptor.start();
BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
@@ -428,7 +428,7 @@
conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000l);
DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler2 acceptorHandler = new SimpleBufferHandler2(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
acceptor.start();
BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
@@ -524,11 +524,6 @@
this.latch = latch;
}
- public int isReadyToHandle(final HornetQBuffer buffer)
- {
- return 0;
- }
-
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
{
int i = buffer.readInt();
Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java 2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java 2010-01-22 13:21:27 UTC (rev 8841)
@@ -45,8 +45,6 @@
public void testAardvark() throws Exception
{
- RemotingServiceImpl.hackProtocol = ProtocolType.AARDVARK;
-
Configuration config = new ConfigurationImpl();
config.setSecurityEnabled(false);
@@ -56,6 +54,7 @@
params.put(TransportConstants.PORT_PROP_NAME, 9876);
params.put(TransportConstants.HOST_PROP_NAME, "127.0.0.1");
+ params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.AARDVARK.toString());
TransportConfiguration tc = new TransportConfiguration(NettyAcceptorFactory.class.getCanonicalName(),
params);
@@ -95,7 +94,5 @@
socket.close();
server.stop();
-
- RemotingServiceImpl.hackProtocol = ProtocolType.CORE;
}
}
Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-01-22 13:21:27 UTC (rev 8841)
@@ -69,6 +69,7 @@
};
Acceptor acceptor = factory.createAcceptor(params,
handler,
+ null,
listener,
Executors.newCachedThreadPool(),
Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE));
Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-01-22 13:21:27 UTC (rev 8841)
@@ -84,6 +84,7 @@
NettyAcceptor acceptor = new NettyAcceptor(params,
handler,
+ null,
listener,
Executors.newCachedThreadPool(),
Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE));
Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2010-01-22 12:46:06 UTC (rev 8840)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2010-01-22 13:21:27 UTC (rev 8841)
@@ -72,7 +72,7 @@
};
NettyConnector connector = new NettyConnector(params,
- handler,
+ handler,
listener,
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool(),
14 years, 11 months