JBoss hornetq SVN: r8810 - in branches/HORNETQ-129_STOMP_protocol: src/main/org/hornetq/core/server and 4 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-20 08:24:55 -0500 (Wed, 20 Jan 2010)
New Revision: 8810
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompDestinationConverter.java
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerMessage.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* barely enough ugly code to make StompTest.testSendMessage pass this time
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-01-20 11:02:05 UTC (rev 8809)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-01-20 13:24:55 UTC (rev 8810)
@@ -249,7 +249,14 @@
{
return type;
}
+
+ public void setType(byte type)
+ {
+ this.type = type;
+ }
+
+
public boolean isDurable()
{
return durable;
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerMessage.java 2010-01-20 11:02:05 UTC (rev 8809)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerMessage.java 2010-01-20 13:24:55 UTC (rev 8810)
@@ -62,4 +62,6 @@
boolean storeIsPaging();
void encodeMessageIDToBuffer();
+
+ void setType(byte type);
}
Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompDestinationConverter.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompDestinationConverter.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompDestinationConverter.java 2010-01-20 13:24:55 UTC (rev 8810)
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.stomp;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.jms.client.HornetQQueue;
+import org.hornetq.jms.client.HornetQTemporaryQueue;
+import org.hornetq.jms.client.HornetQTemporaryTopic;
+import org.hornetq.jms.client.HornetQTopic;
+
+/**
+ * A StompDestinationConverter
+ *
+ * @author jmesnil
+ *
+ *
+ */
+public class StompDestinationConverter
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ public static SimpleString convertDestination(String name) throws HornetQException
+ {
+ if (name == null)
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination is specified!");
+ }
+ else if (name.startsWith("/queue/"))
+ {
+ String queueName = name.substring("/queue/".length(), name.length());
+ return HornetQQueue.createAddressFromName(queueName);
+ }
+ else if (name.startsWith("/topic/"))
+ {
+ String topicName = name.substring("/topic/".length(), name.length());
+ return HornetQTopic.createAddressFromName(topicName);
+ }
+ else if (name.startsWith("/temp-queue/"))
+ {
+ String tempName = name.substring("/temp-queue/".length(), name.length());
+ return HornetQTemporaryQueue.createAddressFromName(tempName);
+ }
+ else if (name.startsWith("/temp-topic/"))
+ {
+ String tempName = name.substring("/temp-topic/".length(), name.length());
+ return HornetQTemporaryTopic.createAddressFromName(tempName);
+ }
+ else
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal destination name: [" + name +
+ "] -- StompConnect destinations " +
+ "must begine with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
+ }
+ }
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-20 11:02:05 UTC (rev 8809)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-20 13:24:55 UTC (rev 8810)
@@ -40,13 +40,18 @@
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.CorePacketDecoder;
import org.hornetq.core.remoting.impl.ssl.SSLSupport;
+import org.hornetq.core.remoting.impl.wireformat.SessionSendMessage;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
import org.hornetq.integration.stomp.Stomp;
+import org.hornetq.integration.stomp.StompDestinationConverter;
import org.hornetq.integration.stomp.StompFrame;
import org.hornetq.integration.stomp.StompMarshaller;
+import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
@@ -160,7 +165,7 @@
this.handler = handler;
this.serverHandler = serverHandler;
-
+
this.listener = listener;
sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME,
@@ -308,7 +313,10 @@
if (protocol.equals(TransportConstants.STOMP_PROTOCOL))
{
ChannelPipelineSupport.addStompStack(pipeline, serverHandler);
- pipeline.addLast("handler", new StompChannelHandler(serverHandler, new StompMarshaller(), channelGroup, new Listener()));
+ pipeline.addLast("handler", new StompChannelHandler(serverHandler,
+ new StompMarshaller(),
+ channelGroup,
+ new Listener()));
}
else
{
@@ -512,19 +520,20 @@
private final class HornetQServerChannelHandler extends AbstractServerChannelHandler
{
private PacketDecoder decoder;
+
private BufferHandler handler;
HornetQServerChannelHandler(final ChannelGroup group,
- final PacketDecoder decoder,
- final BufferHandler handler,
- final ConnectionLifeCycleListener listener)
- {
- super(group, listener);
-
- this.decoder = decoder;
- this.handler = handler;
- }
+ final PacketDecoder decoder,
+ final BufferHandler handler,
+ final ConnectionLifeCycleListener listener)
+ {
+ super(group, listener);
+ this.decoder = decoder;
+ this.handler = handler;
+ }
+
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
{
@@ -532,14 +541,16 @@
handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer), decoder);
}
-
+
}
-
+
@ChannelPipelineCoverage("one")
public final class StompChannelHandler extends AbstractServerChannelHandler
{
private final StompMarshaller marshaller;
+ private final Map<RemotingConnection, ServerSession> sessions = new HashMap<RemotingConnection, ServerSession>();
+
private ServerHolder serverHandler;
public StompChannelHandler(ServerHolder serverHolder,
@@ -554,31 +565,82 @@
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
{
- StompFrame frame = (StompFrame)e.getMessage();
- System.out.println(">>> got frame " + frame);
+ try
+ {
+ StompFrame frame = (StompFrame)e.getMessage();
+ System.out.println(">>> got frame " + frame);
- // need to interact with HornetQ server & session
- HornetQServer server = serverHandler.getServer();
- RemotingConnection connection = serverHandler.getRemotingConnection(e.getChannel().getId());
+ // need to interact with HornetQ server & session
+ HornetQServer server = serverHandler.getServer();
+ RemotingConnection connection = serverHandler.getRemotingConnection(e.getChannel().getId());
- String command = frame.getCommand();
+ String command = frame.getCommand();
- StompFrame response = null;
- if (Stomp.Commands.CONNECT.equals(command))
+ StompFrame response = null;
+ if (Stomp.Commands.CONNECT.equals(command))
+ {
+ response = onConnect(frame, server, connection);
+ }
+ else if (Stomp.Commands.SEND.equals(command))
+ {
+ response = onSend(frame, server, connection);
+ }
+ else
+ {
+ log.error("Unsupported Stomp frame: " + frame);
+ }
+ if (response != null)
+ {
+ System.out.println(">>> will reply " + response);
+ byte[] bytes = marshaller.marshal(response);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+ System.out.println("ready to send reply: " + buffer);
+ connection.getTransportConnection().write(buffer, true);
+ }
+ }
+ catch (Exception ex)
{
- response = onConnect(frame, server, connection);
+ ex.printStackTrace();
}
- else
+ }
+
+ private StompFrame onSend(StompFrame frame, HornetQServer server, RemotingConnection connection) throws HornetQException
+ {
+ Map<String, Object> headers = frame.getHeaders();
+ String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
+ /*
+ String type = (String)headers.get(Stomp.Headers.Send.TYPE);
+ long expiration = (Long)headers.get(Stomp.Headers.Send.EXPIRATION_TIME);
+ byte priority = (Byte)headers.get(Stomp.Headers.Send.PRIORITY);
+ boolean durable = (Boolean)headers.get(Stomp.Headers.Send.PERSISTENT);
+ */
+ byte type = HornetQTextMessage.TYPE;
+ long timestamp = System.currentTimeMillis();
+ boolean durable = false;
+ long expiration = -1;
+ byte priority = 9;
+ SimpleString address = StompDestinationConverter.convertDestination(queue);
+
+ ServerMessage message = new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
+ message.setType(type);
+ message.setTimestamp(timestamp);
+ message.setAddress(address);
+ String content = new String(frame.getContent());
+ System.out.println(">>> got: " + content);
+ message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(content));
+
+ ServerSession session = sessions.get(connection);
+ SessionSendMessage packet = new SessionSendMessage(message, false);
+ session.handleSend(packet);
+ if (headers.containsKey(Stomp.Headers.RECEIPT_REQUESTED))
{
- log.error("Unsupported Stomp frame: " + frame);
+ Map<String, Object> h = new HashMap<String, Object>();
+ h.put(Stomp.Headers.Response.RECEIPT_ID, headers.get(Stomp.Headers.RECEIPT_REQUESTED));
+ return new StompFrame(Stomp.Responses.RECEIPT, h, new byte[] {});
}
- if (response != null)
+ else
{
- System.out.println(">>> will reply " + response);
- byte[] bytes = marshaller.marshal(response);
- HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
- System.out.println("ready to send reply: " + buffer);
- connection.getTransportConnection().write(buffer, true);
+ return null;
}
}
@@ -603,6 +665,7 @@
false,
-1);
ServerSession session = server.getSession(name);
+ sessions.put(connection, session);
System.out.println(">>> created session " + session);
HashMap<String, Object> h = new HashMap<String, Object>();
h.put(Stomp.Headers.Connected.SESSION, name);
@@ -610,12 +673,11 @@
return new StompFrame(Stomp.Responses.CONNECTED, h, new byte[] {});
}
}
-
+
@ChannelPipelineCoverage("one")
public abstract class AbstractServerChannelHandler extends HornetQChannelHandler
{
- protected AbstractServerChannelHandler(final ChannelGroup group,
- final ConnectionLifeCycleListener listener)
+ protected AbstractServerChannelHandler(final ChannelGroup group, final ConnectionLifeCycleListener listener)
{
super(group, listener);
}
Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-20 11:02:05 UTC (rev 8809)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-20 13:24:55 UTC (rev 8810)
@@ -41,11 +41,10 @@
import junit.framework.Assert;
import junit.framework.TestCase;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
@@ -61,7 +60,7 @@
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
public class StompTest extends TestCase {
- private static final transient Log log = LogFactory.getLog(StompTest.class);
+ private static final transient Logger log = Logger.getLogger(StompTest.class);
private int port = 61613;
private Socket stompSocket;
private ByteArrayOutputStream inputBuffer;
@@ -102,7 +101,7 @@
Stomp.NULL;
sendFrame(frame);
-
+
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals("Hello World", message.getText());
@@ -113,7 +112,45 @@
long tmsg = message.getJMSTimestamp();
Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
}
+
+ public void testSendMessageWithReceipt() throws Exception {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "receipt: 1234\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ String f = receiveFrame(10000);
+ Assert.assertTrue(f.startsWith("RECEIPT"));
+ Assert.assertTrue(f.indexOf("receipt-id:1234") >= 0);
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+ }
+
public void _testJMSXGroupIdCanBeSet() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-01-20 11:02:05 UTC (rev 8809)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-01-20 13:24:55 UTC (rev 8810)
@@ -663,6 +663,15 @@
// TODO Auto-generated method stub
return 0;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.ServerMessage#setType(byte)
+ */
+ public void setType(byte type)
+ {
+ // TODO Auto-generated method stub
+
+ }
public HornetQBuffer getWholeBuffer()
{
14 years, 11 months
JBoss hornetq SVN: r8809 - in branches/HORNETQ-129_STOMP_protocol: src/main/org/hornetq/core/remoting/server/impl and 5 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-20 06:02:05 -0500 (Wed, 20 Jan 2010)
New Revision: 8809
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/DummyServerHolder.java
Removed:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolConverter.java
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompPacketDecoder.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQChannelHandler.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* barely enough ugly code to make StompTest.testConnect pass :)
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -23,6 +23,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
+import org.hornetq.integration.transports.netty.ServerHolder;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
@@ -60,6 +61,7 @@
public InVMAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
+ final ServerHolder holder,
final ConnectionLifeCycleListener listener,
final Executor threadPool)
{
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -17,6 +17,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import org.hornetq.integration.transports.netty.ServerHolder;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.AcceptorFactory;
import org.hornetq.spi.core.remoting.BufferHandler;
@@ -32,11 +33,12 @@
{
public Acceptor createAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
+ final ServerHolder holder,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
- return new InVMAcceptor(configuration, handler, listener, threadPool);
+ return new InVMAcceptor(configuration, handler, holder, listener, threadPool);
}
public Set<String> getAllowableProperties()
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -41,6 +41,7 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.impl.HornetQPacketHandler;
import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.integration.transports.netty.ServerHolder;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.AcceptorFactory;
import org.hornetq.spi.core.remoting.BufferHandler;
@@ -161,6 +162,27 @@
Acceptor acceptor = factory.createAcceptor(info.getParams(),
bufferHandler,
+ new ServerHolder()
+ {
+ public HornetQServer getServer()
+ {
+ return server;
+ }
+
+ public RemotingConnection getRemotingConnection(int connectionID)
+ {
+ ConnectionEntry conn = connections.get(connectionID);
+
+ if (conn != null)
+ {
+ return conn.connection;
+ }
+ else
+ {
+ return null;
+ }
+ }
+ },
this,
threadPool,
scheduledThreadPool);
Deleted: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolConverter.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolConverter.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolConverter.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -1,86 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.integration.stomp;
-
-import java.util.Map;
-
-import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionCloseMessage;
-import org.hornetq.utils.UUIDGenerator;
-import org.hornetq.utils.VersionLoader;
-
-/**
- * A ProtocolConverter
- *
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- *
- *
- */
-public class ProtocolConverter
-{
-
- public Packet toPacket(StompFrame frame)
- {
- String command = frame.getCommand();
- Map<String, Object> headers = frame.getHeaders();
- if (Stomp.Commands.CONNECT.equals(command))
- {
- String login = (String)headers.get("login");
- String password = (String)headers.get("passcode");
-
- String name = UUIDGenerator.getInstance().generateStringUUID();
- long sessionChannelID = 12;
- return new CreateSessionMessage(name,
- sessionChannelID,
- VersionLoader.getVersion().getIncrementingVersion(),
- login,
- password,
- HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- false,
- true,
- true,
- false,
- HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE);
- }
- if (Stomp.Commands.DISCONNECT.equals(command))
- {
- return new SessionCloseMessage();
- }
- else
- {
- throw new RuntimeException("frame not supported: " + frame);
- }
- }
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompPacketDecoder.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompPacketDecoder.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompPacketDecoder.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -14,39 +14,38 @@
package org.hornetq.integration.stomp;
import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.PacketDecoder;
+import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
/**
* A StompPacketDecoder
*
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
*/
-public class StompPacketDecoder implements PacketDecoder
+@ChannelPipelineCoverage("one")
+public class StompPacketDecoder extends SimpleChannelHandler
{
- private final StompMarshaller marshaller = new StompMarshaller();
-
- private final ProtocolConverter converter = new ProtocolConverter();
-
+ private final StompMarshaller marshaller;
+
// PacketDecoder implementation ----------------------------------
- public Packet decode(HornetQBuffer in)
+ public StompPacketDecoder(final StompMarshaller marshaller)
{
- StompFrame frame;
- try
- {
- frame = marshaller.unmarshal(in);
- System.out.println(">>> " + frame);
- Packet packet = converter.toPacket(frame);
- packet.setChannelID(1);
- System.out.println(">>> " + packet);
+ this.marshaller = marshaller;
+ }
- return packet;
- }
- catch (Exception e)
- {
- e.printStackTrace();
- return null;
- }
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+ {
+ ChannelBuffer in = (ChannelBuffer)e.getMessage();
+ HornetQBuffer buffer = new ChannelBufferWrapper(in);
+ StompFrame frame = marshaller.unmarshal(buffer);
+
+ Channels.fireMessageReceived(ctx, frame);
}
}
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -17,6 +17,8 @@
import javax.net.ssl.SSLEngine;
import org.hornetq.integration.stomp.StompFrameDelimiter;
+import org.hornetq.integration.stomp.StompMarshaller;
+import org.hornetq.integration.stomp.StompPacketDecoder;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.handler.ssl.SslHandler;
@@ -45,10 +47,12 @@
// Public --------------------------------------------------------
- public static void addStompCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)
+ public static void addStompStack(final ChannelPipeline pipeline, final ServerHolder serverHandler)
{
assert pipeline != null;
+ StompMarshaller marshaller = new StompMarshaller();
pipeline.addLast("delimiter", new StompFrameDelimiter());
+ pipeline.addLast("codec", new StompPacketDecoder(marshaller));
}
public static void addHornetQCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQChannelHandler.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQChannelHandler.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQChannelHandler.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -13,12 +13,8 @@
package org.hornetq.integration.transports.netty;
import org.hornetq.api.core.HornetQException;
-import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.PacketDecoder;
-import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
-import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
@@ -32,28 +28,20 @@
* @author <a href="mailto:tlee@redhat.com">Trustin Lee</a>
* @version $Rev$, $Date$
*/
-public class HornetQChannelHandler extends SimpleChannelHandler
+public abstract class HornetQChannelHandler extends SimpleChannelHandler
{
private static final Logger log = Logger.getLogger(HornetQChannelHandler.class);
private final ChannelGroup group;
- private final PacketDecoder decoder;
-
- private final BufferHandler handler;
-
private final ConnectionLifeCycleListener listener;
volatile boolean active;
public HornetQChannelHandler(final ChannelGroup group,
- final PacketDecoder decoder,
- final BufferHandler handler,
final ConnectionLifeCycleListener listener)
{
this.group = group;
- this.decoder = decoder;
- this.handler = handler;
this.listener = listener;
}
@@ -63,16 +51,10 @@
group.add(e.getChannel());
ctx.sendUpstream(e);
}
+
+ public abstract void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception;
@Override
- public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
- {
- ChannelBuffer buffer = (ChannelBuffer)e.getMessage();
-
- handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer), decoder);
- }
-
- @Override
public void channelDisconnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
{
synchronized (this)
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -15,6 +15,7 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -26,25 +27,36 @@
import javax.net.ssl.SSLContext;
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.PacketDecoder;
+import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.CorePacketDecoder;
import org.hornetq.core.remoting.impl.ssl.SSLSupport;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerSession;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
-import org.hornetq.integration.stomp.StompPacketDecoder;
+import org.hornetq.integration.stomp.Stomp;
+import org.hornetq.integration.stomp.StompFrame;
+import org.hornetq.integration.stomp.StompMarshaller;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
import org.hornetq.utils.ConfigurationHelper;
import org.hornetq.utils.TypedProperties;
+import org.hornetq.utils.UUIDGenerator;
import org.hornetq.utils.VersionLoader;
import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
@@ -55,6 +67,7 @@
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.group.DefaultChannelGroup;
@@ -106,6 +119,7 @@
private final boolean useInvm;
private final String protocol;
+
private final String host;
private final int port;
@@ -134,14 +148,19 @@
private VirtualExecutorService bossExecutor;
+ private ServerHolder serverHandler;
+
public NettyAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
+ final ServerHolder serverHandler,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
this.handler = handler;
+ this.serverHandler = serverHandler;
+
this.listener = listener;
sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME,
@@ -181,8 +200,8 @@
TransportConstants.DEFAULT_USE_INVM,
configuration);
protocol = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME,
- TransportConstants.DEFAULT_PROTOCOL,
- configuration);
+ TransportConstants.DEFAULT_PROTOCOL,
+ configuration);
host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME,
TransportConstants.DEFAULT_HOST,
configuration);
@@ -286,18 +305,21 @@
pipeline.addLast("httpResponseEncoder", new HttpResponseEncoder());
pipeline.addLast("httphandler", new HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime));
}
- PacketDecoder decoder;
if (protocol.equals(TransportConstants.STOMP_PROTOCOL))
{
- ChannelPipelineSupport.addStompCodecFilter(pipeline, handler);
- decoder = new StompPacketDecoder();
- } else
+ ChannelPipelineSupport.addStompStack(pipeline, serverHandler);
+ pipeline.addLast("handler", new StompChannelHandler(serverHandler, new StompMarshaller(), channelGroup, new Listener()));
+ }
+ else
{
ChannelPipelineSupport.addHornetQCodecFilter(pipeline, handler);
- decoder = new CorePacketDecoder();
+ PacketDecoder decoder = new CorePacketDecoder();
+ pipeline.addLast("handler", new HornetQServerChannelHandler(channelGroup,
+ decoder,
+ handler,
+ new Listener()));
}
-
- pipeline.addLast("handler", new HornetQServerChannelHandler(channelGroup, decoder, handler, new Listener()));
+
return pipeline;
}
};
@@ -487,15 +509,115 @@
// Inner classes -----------------------------------------------------------------------------
+ private final class HornetQServerChannelHandler extends AbstractServerChannelHandler
+ {
+ private PacketDecoder decoder;
+ private BufferHandler handler;
+
+ HornetQServerChannelHandler(final ChannelGroup group,
+ final PacketDecoder decoder,
+ final BufferHandler handler,
+ final ConnectionLifeCycleListener listener)
+ {
+ super(group, listener);
+
+ this.decoder = decoder;
+ this.handler = handler;
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+ {
+ ChannelBuffer buffer = (ChannelBuffer)e.getMessage();
+
+ handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer), decoder);
+ }
+
+ }
+
@ChannelPipelineCoverage("one")
- private final class HornetQServerChannelHandler extends HornetQChannelHandler
+ public final class StompChannelHandler extends AbstractServerChannelHandler
{
- HornetQServerChannelHandler(final ChannelGroup group,
- final PacketDecoder decoder,
- final BufferHandler handler,
+ private final StompMarshaller marshaller;
+
+ private ServerHolder serverHandler;
+
+ public StompChannelHandler(ServerHolder serverHolder,
+ StompMarshaller marshaller,
+ final ChannelGroup group,
+ final ConnectionLifeCycleListener listener)
+ {
+ super(group, listener);
+ this.serverHandler = serverHolder;
+ this.marshaller = marshaller;
+ }
+
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+ {
+ StompFrame frame = (StompFrame)e.getMessage();
+ System.out.println(">>> got frame " + frame);
+
+ // need to interact with HornetQ server & session
+ HornetQServer server = serverHandler.getServer();
+ RemotingConnection connection = serverHandler.getRemotingConnection(e.getChannel().getId());
+
+ String command = frame.getCommand();
+
+ StompFrame response = null;
+ if (Stomp.Commands.CONNECT.equals(command))
+ {
+ response = onConnect(frame, server, connection);
+ }
+ else
+ {
+ log.error("Unsupported Stomp frame: " + frame);
+ }
+ if (response != null)
+ {
+ System.out.println(">>> will reply " + response);
+ byte[] bytes = marshaller.marshal(response);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+ System.out.println("ready to send reply: " + buffer);
+ connection.getTransportConnection().write(buffer, true);
+ }
+ }
+
+ private StompFrame onConnect(StompFrame frame, HornetQServer server, RemotingConnection connection) throws Exception
+ {
+ Map<String, Object> headers = frame.getHeaders();
+ String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
+ String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
+ String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
+
+ String name = UUIDGenerator.getInstance().generateStringUUID();
+ server.createSession(name,
+ 1,
+ login,
+ passcode,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ VersionLoader.getVersion().getIncrementingVersion(),
+ connection,
+ true,
+ true,
+ false,
+ false,
+ -1);
+ ServerSession session = server.getSession(name);
+ System.out.println(">>> created session " + session);
+ HashMap<String, Object> h = new HashMap<String, Object>();
+ h.put(Stomp.Headers.Connected.SESSION, name);
+ h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
+ return new StompFrame(Stomp.Responses.CONNECTED, h, new byte[] {});
+ }
+ }
+
+ @ChannelPipelineCoverage("one")
+ public abstract class AbstractServerChannelHandler extends HornetQChannelHandler
+ {
+ protected AbstractServerChannelHandler(final ChannelGroup group,
final ConnectionLifeCycleListener listener)
{
- super(group, decoder, handler, listener);
+ super(group, listener);
}
@Override
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -32,11 +32,12 @@
{
public Acceptor createAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
+ final ServerHolder serverHolder,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
- return new NettyAcceptor(configuration, handler, listener, threadPool, scheduledThreadPool);
+ return new NettyAcceptor(configuration, handler, serverHolder, listener, threadPool, scheduledThreadPool);
}
public Set<String> getAllowableProperties()
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -29,6 +29,7 @@
import javax.net.ssl.SSLException;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.PacketDecoder;
import org.hornetq.core.remoting.impl.CorePacketDecoder;
@@ -441,12 +442,23 @@
@ChannelPipelineCoverage("one")
private final class HornetQClientChannelHandler extends HornetQChannelHandler
{
+ private BufferHandler handler;
+
HornetQClientChannelHandler(final ChannelGroup group,
final BufferHandler handler,
final ConnectionLifeCycleListener listener)
{
- super(group, decoder, handler, listener);
+ super(group, listener);
+ this.handler = handler;
}
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+ {
+ ChannelBuffer buffer = (ChannelBuffer)e.getMessage();
+
+ handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer), decoder);
+ }
}
@ChannelPipelineCoverage("one")
Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.transports.netty;
+
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.server.HornetQServer;
+
+/**
+ * A ServerHolder
+ *
+ * @author jmesnil
+ *
+ *
+ */
+public interface ServerHolder
+{
+ HornetQServer getServer();
+
+ RemotingConnection getRemotingConnection(int connectionID);
+}
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -18,6 +18,8 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import org.hornetq.integration.transports.netty.ServerHolder;
+
/**
* A factory for creating acceptors.
* <p/>
@@ -41,6 +43,7 @@
*/
Acceptor createAcceptor(final Map<String, Object> configuration,
BufferHandler handler,
+ ServerHolder holder,
ConnectionLifeCycleListener listener,
Executor threadPool,
ScheduledExecutorService scheduledThreadPool);
Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -32,6 +32,7 @@
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
+import org.hornetq.tests.unit.core.remoting.impl.netty.DummyServerHolder;
import org.hornetq.tests.util.UnitTestCase;
/**
@@ -98,7 +99,8 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+ DummyServerHolder serverHolder = new DummyServerHolder();
+ acceptor = new NettyAcceptor(conf, acceptorHandler, serverHolder, acceptorListener, threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
@@ -152,7 +154,8 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+ DummyServerHolder serverHolder = new DummyServerHolder();
+ acceptor = new NettyAcceptor(conf, acceptorHandler, serverHolder, acceptorListener, threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
@@ -210,7 +213,8 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+ DummyServerHolder serverHolder = new DummyServerHolder();
+ acceptor = new NettyAcceptor(conf, acceptorHandler, serverHolder, acceptorListener, threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -269,7 +273,8 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+ DummyServerHolder serverHolder = new DummyServerHolder();
+ acceptor = new NettyAcceptor(conf, acceptorHandler, serverHolder, acceptorListener, threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -327,7 +332,8 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME, 500l);
DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+ DummyServerHolder serverHolder = new DummyServerHolder();
+ acceptor = new NettyAcceptor(conf, acceptorHandler, serverHolder, acceptorListener, threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -381,7 +387,8 @@
conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000l);
DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+ DummyServerHolder serverHolder = new DummyServerHolder();
+ acceptor = new NettyAcceptor(conf, acceptorHandler, serverHolder, acceptorListener, threadPool, scheduledThreadPool);
acceptor.start();
BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
@@ -428,7 +435,8 @@
conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000l);
DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler2 acceptorHandler = new SimpleBufferHandler2(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+ DummyServerHolder serverHolder = new DummyServerHolder();
+ acceptor = new NettyAcceptor(conf, acceptorHandler, serverHolder, acceptorListener, threadPool, scheduledThreadPool);
acceptor.start();
BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
Added: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/DummyServerHolder.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/DummyServerHolder.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/DummyServerHolder.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.core.remoting.impl.netty;
+
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.integration.transports.netty.ServerHolder;
+
+/**
+ * A DummyServerHolder
+ *
+ * @author jmesnil
+ *
+ *
+ */
+public class DummyServerHolder implements ServerHolder
+{
+ public HornetQServer getServer()
+ {
+ return null;
+ }
+
+ public RemotingConnection getRemotingConnection(int connectionID)
+ {
+ return null;
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -24,6 +24,7 @@
import org.hornetq.core.remoting.PacketDecoder;
import org.hornetq.integration.transports.netty.NettyAcceptor;
import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.integration.transports.netty.ServerHolder;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
@@ -67,9 +68,10 @@
{
}
};
-
+ ServerHolder holder = new DummyServerHolder();
Acceptor acceptor = factory.createAcceptor(params,
handler,
+ holder,
listener,
Executors.newCachedThreadPool(),
Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE));
Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -24,6 +24,7 @@
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.PacketDecoder;
import org.hornetq.integration.transports.netty.NettyAcceptor;
+import org.hornetq.integration.transports.netty.ServerHolder;
import org.hornetq.integration.transports.netty.TransportConstants;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
@@ -81,8 +82,11 @@
{
}
};
+ ServerHolder holder = new DummyServerHolder();
+
NettyAcceptor acceptor = new NettyAcceptor(params,
handler,
+ holder,
listener,
Executors.newCachedThreadPool(),
Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE));
14 years, 11 months
JBoss hornetq SVN: r8808 - in branches/HORNETQ-129_STOMP_protocol: src/main/org/hornetq/core/remoting/impl and 5 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-20 04:17:21 -0500 (Wed, 20 Jan 2010)
New Revision: 8808
Removed:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/AbstractBufferHandler.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
Log:
* removed unused HornetQFrameDecoder
* removed unused method from BufferHandler
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-01-19 16:04:31 UTC (rev 8807)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-01-20 09:17:21 UTC (rev 8808)
@@ -41,7 +41,6 @@
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.PacketDecoder;
import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.core.remoting.impl.CorePacketDecoder;
import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
@@ -49,6 +48,7 @@
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.core.remoting.impl.wireformat.Ping;
import org.hornetq.core.version.Version;
+import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
import org.hornetq.spi.core.remoting.Connector;
@@ -1088,7 +1088,7 @@
}
}
- private class DelegatingBufferHandler extends AbstractBufferHandler
+ private class DelegatingBufferHandler implements BufferHandler
{
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer, final PacketDecoder decoder)
{
Deleted: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/AbstractBufferHandler.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/AbstractBufferHandler.java 2010-01-19 16:04:31 UTC (rev 8807)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/AbstractBufferHandler.java 2010-01-20 09:17:21 UTC (rev 8808)
@@ -1,46 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.remoting.impl;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.spi.core.remoting.BufferHandler;
-import org.hornetq.utils.DataConstants;
-
-/**
- * A AbstractBufferHandler
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- */
-public abstract class AbstractBufferHandler implements BufferHandler
-{
- private static final Logger log = Logger.getLogger(AbstractBufferHandler.class);
-
- public int isReadyToHandle(final HornetQBuffer buffer)
- {
- if (buffer.readableBytes() < DataConstants.SIZE_INT)
- {
- return -1;
- }
-
- int length = buffer.readInt();
-
- if (buffer.readableBytes() < length)
- {
- return -1;
- }
-
- return length;
- }
-}
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2010-01-19 16:04:31 UTC (rev 8807)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2010-01-20 09:17:21 UTC (rev 8808)
@@ -30,6 +30,7 @@
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.PacketDecoder;
import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.utils.SimpleIDGenerator;
@@ -38,7 +39,7 @@
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
* @version <tt>$Revision$</tt> $Id$
*/
-public class RemotingConnectionImpl extends AbstractBufferHandler implements RemotingConnection
+public class RemotingConnectionImpl implements RemotingConnection, BufferHandler
{
// Constants
// ------------------------------------------------------------------------------------
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-19 16:04:31 UTC (rev 8807)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-20 09:17:21 UTC (rev 8808)
@@ -34,7 +34,6 @@
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.PacketDecoder;
import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.core.remoting.impl.wireformat.Ping;
@@ -42,7 +41,6 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.impl.HornetQPacketHandler;
import org.hornetq.core.server.management.ManagementService;
-import org.hornetq.integration.stomp.StompPacketDecoder;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.AcceptorFactory;
import org.hornetq.spi.core.remoting.BufferHandler;
@@ -423,7 +421,7 @@
// Inner classes -------------------------------------------------
- private final class DelegatingBufferHandler extends AbstractBufferHandler
+ private final class DelegatingBufferHandler implements BufferHandler
{
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer, final PacketDecoder decoder)
{
Deleted: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java 2010-01-19 16:04:31 UTC (rev 8807)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java 2010-01-20 09:17:21 UTC (rev 8808)
@@ -1,78 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.integration.transports.netty;
-
-import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.spi.core.remoting.BufferHandler;
-import org.hornetq.utils.DataConstants;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.frame.FrameDecoder;
-
-/**
- * A Netty FrameDecoder used to decode messages.
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @author <a href="ataylor(a)redhat.com">Andy Taylor</a>
- * @author <a href="tlee(a)redhat.com">Trustin Lee</a>
- *
- * @version $Revision$, $Date$
- */
-public class HornetQFrameDecoder extends FrameDecoder
-{
- private static final Logger log = Logger.getLogger(HornetQFrameDecoder.class);
-
- private final BufferHandler handler;
-
- public HornetQFrameDecoder(final BufferHandler handler)
- {
- this.handler = handler;
- }
-
- // FrameDecoder overrides
- // -------------------------------------------------------------------------------------
-
- @Override
- protected Object decode(final ChannelHandlerContext ctx, final Channel channel, final ChannelBuffer in) throws Exception
- {
- // TODO - we can avoid this entirely if we maintain fragmented packets in the handler
- int start = in.readerIndex();
-
- int length = handler.isReadyToHandle(new ChannelBufferWrapper(in));
-
- in.readerIndex(start);
-
- if (length == -1)
- {
- return null;
- }
-
- // in.readerIndex(start + SIZE_INT);
-
- ChannelBuffer buffer = in.readBytes(length + DataConstants.SIZE_INT);
-
- // FIXME - we should get Netty to give us a DynamicBuffer - seems to currently give us a non resizable buffer
-
- ChannelBuffer newBuffer = ChannelBuffers.dynamicBuffer(buffer.writerIndex());
-
- newBuffer.writeBytes(buffer);
-
- newBuffer.readInt();
-
- return newBuffer;
- }
-}
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java 2010-01-19 16:04:31 UTC (rev 8807)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java 2010-01-20 09:17:21 UTC (rev 8808)
@@ -31,14 +31,4 @@
* @param buffer the buffer to decode
*/
void bufferReceived(Object connectionID, HornetQBuffer buffer, PacketDecoder decoder);
-
- /**
- * called by the remoting connection prior to {@link org.hornetq.spi.core.remoting.BufferHandler#bufferReceived(Object, org.hornetq.api.core.HornetQBuffer)}.
- * <p/>
- * The implementation should return true if there is enough data in the buffer to decode. otherwise false.
- *
- * @param buffer the buffer
- * @return true id the buffer can be decoded..
- */
- int isReadyToHandle(HornetQBuffer buffer);
}
Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-01-19 16:04:31 UTC (rev 8807)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-01-20 09:17:21 UTC (rev 8808)
@@ -26,7 +26,6 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.PacketDecoder;
-import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.integration.transports.netty.NettyAcceptor;
import org.hornetq.integration.transports.netty.NettyConnector;
import org.hornetq.integration.transports.netty.TransportConstants;
@@ -462,7 +461,7 @@
}
}
- class SimpleBufferHandler extends AbstractBufferHandler
+ class SimpleBufferHandler implements BufferHandler
{
int messagesReceieved = 0;
@@ -484,7 +483,7 @@
}
}
- class SimpleBufferHandler2 extends AbstractBufferHandler
+ class SimpleBufferHandler2 implements BufferHandler
{
int messagesReceieved = 0;
Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-01-19 16:04:31 UTC (rev 8807)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-01-20 09:17:21 UTC (rev 8808)
@@ -22,7 +22,6 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.PacketDecoder;
-import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.integration.transports.netty.NettyAcceptor;
import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
import org.hornetq.spi.core.remoting.Acceptor;
@@ -45,7 +44,7 @@
NettyAcceptorFactory factory = new NettyAcceptorFactory();
Map<String, Object> params = new HashMap<String, Object>();
- BufferHandler handler = new AbstractBufferHandler()
+ BufferHandler handler = new BufferHandler()
{
public void bufferReceived(Object connectionID, HornetQBuffer buffer, PacketDecoder decoder)
Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-01-19 16:04:31 UTC (rev 8807)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-01-20 09:17:21 UTC (rev 8808)
@@ -23,7 +23,6 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.PacketDecoder;
-import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.integration.transports.netty.NettyAcceptor;
import org.hornetq.integration.transports.netty.TransportConstants;
import org.hornetq.spi.core.remoting.BufferHandler;
@@ -58,7 +57,7 @@
public void testStartStop() throws Exception
{
- BufferHandler handler = new AbstractBufferHandler()
+ BufferHandler handler = new BufferHandler()
{
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer, final PacketDecoder decoder)
Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2010-01-19 16:04:31 UTC (rev 8807)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2010-01-20 09:17:21 UTC (rev 8808)
@@ -22,7 +22,6 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.remoting.PacketDecoder;
-import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.integration.transports.netty.NettyConnector;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
@@ -50,7 +49,7 @@
public void testStartStop() throws Exception
{
- BufferHandler handler = new AbstractBufferHandler()
+ BufferHandler handler = new BufferHandler()
{
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer, final PacketDecoder decoder)
{
@@ -87,7 +86,7 @@
public void testNullParams() throws Exception
{
- BufferHandler handler = new AbstractBufferHandler()
+ BufferHandler handler = new BufferHandler()
{
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer, final PacketDecoder decoder)
{
14 years, 11 months
JBoss hornetq SVN: r8807 - in branches/HORNETQ-129_STOMP_protocol: src/main/org/hornetq/core/client/impl and 12 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-19 11:04:31 -0500 (Tue, 19 Jan 2010)
New Revision: 8807
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/PacketDecoder.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/CorePacketDecoder.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolConverter.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolException.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/Stomp.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrame.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameDelimiter.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameError.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompMarshaller.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompPacketDecoder.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest2.java
Removed:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
Modified:
branches/HORNETQ-129_STOMP_protocol/.classpath
branches/HORNETQ-129_STOMP_protocol/build-hornetq.xml
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQChannelHandler.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/TransportConstants.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* incomplete prototype
* copied StompTest from StompConnect project and adapt it to use HornetQ server
Modified: branches/HORNETQ-129_STOMP_protocol/.classpath
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/.classpath 2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/.classpath 2010-01-19 16:04:31 UTC (rev 8807)
@@ -100,7 +100,7 @@
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="lib" path="tests/tmpfiles"/>
<classpathentry kind="lib" path="thirdparty/net/java/dev/javacc/lib/javacc.jar"/>
- <classpathentry kind="lib" path="thirdparty/org/jboss/netty/lib/netty.jar"/>
+ <classpathentry kind="lib" path="thirdparty/org/jboss/netty/lib/netty.jar" sourcepath="/Users/jmesnil/.m2/repository/org/jboss/netty/netty/3.1.5.GA/netty-3.1.5.GA-sources.jar"/>
<classpathentry kind="lib" path="thirdparty/log4j/lib/log4j.jar"/>
<classpathentry kind="lib" path="thirdparty/org/jboss/naming/lib/jnpserver.jar"/>
<classpathentry kind="lib" path="thirdparty/org/jboss/security/lib/jbosssx.jar"/>
Modified: branches/HORNETQ-129_STOMP_protocol/build-hornetq.xml
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/build-hornetq.xml 2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/build-hornetq.xml 2010-01-19 16:04:31 UTC (rev 8807)
@@ -1511,7 +1511,7 @@
<jvmarg value="-Dcom.sun.management.jmxremote"/>
<jvmarg value="-Djava.util.logging.config.file=${src.config.trunk.non-clustered.dir}/logging.properties"/>
<jvmarg value="-Djava.library.path=${native.bin.dir}"/>
- <!--<jvmarg line="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000"/>-->
+ <!-- <jvmarg line="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000"/> -->
<arg line="hornetq-beans.xml"/>
<classpath path="${src.config.trunk.non-clustered.dir}" />
<classpath refid="jms.standalone.server.classpath"/>
@@ -1540,23 +1540,20 @@
</target>
<target name="debugServer" depends="jar">
+ <mkdir dir="logs"/>
<java classname="org.hornetq.integration.bootstrap.HornetQBootstrapServer" fork="true">
<jvmarg value="-XX:+UseParallelGC"/>
<jvmarg value="-Xms512M"/>
<jvmarg value="-Xmx2048M"/>
<jvmarg value="-XX:+AggressiveOpts"/>
<jvmarg value="-XX:+UseFastAccessorMethods"/>
- <jvmarg value="-Xdebug"/>
- <jvmarg value="-Xnoagent"/>
- <jvmarg value="-Djava.compiler=NONE"/>
<jvmarg value="-Dcom.sun.management.jmxremote"/>
- <jvmarg value="-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000"/>
- <jvmarg value="-Djava.util.logging.config.file=${src.config.standalone.non-clustered.dir}/logging.properties"/>
+ <jvmarg value="-Djava.util.logging.config.file=${src.config.trunk.non-clustered.dir}/logging.properties"/>
<jvmarg value="-Djava.library.path=${native.bin.dir}"/>
- <jvmarg value="-Djava.naming.factory.initial=org.jnp.interfaces.NamingContextFactory"/>
- <jvmarg value="-Djava.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces"/>
+ <jvmarg line="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000"/>
<arg line="hornetq-beans.xml"/>
- <classpath path="${src.config.standalone.non-clustered.dir}" />
+ <classpath path="${src.config.trunk.non-clustered.dir}" />
+ <classpath refid="jms.standalone.server.classpath"/>
</java>
</target>
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -39,8 +39,10 @@
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.PacketDecoder;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
+import org.hornetq.core.remoting.impl.CorePacketDecoder;
import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
@@ -529,6 +531,8 @@
private volatile boolean stopPingingAfterOne;
+ private final PacketDecoder decoder = new CorePacketDecoder();
+
public void stopPingingAfterOne()
{
stopPingingAfterOne = true;
@@ -1086,13 +1090,13 @@
private class DelegatingBufferHandler extends AbstractBufferHandler
{
- public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+ public void bufferReceived(final Object connectionID, final HornetQBuffer buffer, final PacketDecoder decoder)
{
RemotingConnection theConn = connection;
if (theConn != null && connectionID == theConn.getID())
{
- theConn.bufferReceived(connectionID, buffer);
+ theConn.bufferReceived(connectionID, buffer, decoder );
}
}
}
Copied: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/PacketDecoder.java (from rev 8798, trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java)
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/PacketDecoder.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/PacketDecoder.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting;
+
+import org.hornetq.api.core.HornetQBuffer;
+
+/**
+ * A PacketDecoder
+ *
+ * @author jmesnil
+ *
+ *
+ */
+public interface PacketDecoder
+{
+
+ public abstract Packet decode(final HornetQBuffer in);
+
+}
\ No newline at end of file
Copied: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/CorePacketDecoder.java (from rev 8798, trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java)
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/CorePacketDecoder.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/CorePacketDecoder.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -0,0 +1,495 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl;
+
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_REPLICATION;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.DELETE_QUEUE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.DISCONNECT;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.NULL_RESPONSE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.PACKETS_CONFIRMED;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.PING;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION_RESP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND_TX;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMMIT_ROLLBACK;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMPARE_DATA;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE_TX;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_END;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PAGE_EVENT;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PAGE_WRITE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PREPARE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_ACKNOWLEDGE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY_RESP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_CREDITS;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY_RESP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_CONTINUATION;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_LARGE_MSG;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_CONTINUATION;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_LARGE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_START;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_COMMIT;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_END;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_FORGET;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT_RESP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS_RESP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_JOIN;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_PREPARE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESUME;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_ROLLBACK;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.PacketDecoder;
+import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
+import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
+import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
+import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
+import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
+import org.hornetq.core.remoting.impl.wireformat.PacketsConfirmedMessage;
+import org.hornetq.core.remoting.impl.wireformat.Ping;
+import org.hornetq.core.remoting.impl.wireformat.ReattachSessionMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageWriteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargemessageEndMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPageEventMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionCloseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionCommitMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionExpiredMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
+import org.hornetq.core.remoting.impl.wireformat.SessionProducerCreditsMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionReceiveLargeMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionReceiveMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionRequestProducerCreditsMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionSendContinuationMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionSendLargeMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionSendMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXACommitMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAEndMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAForgetMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAJoinMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAPrepareMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAResumeMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXARollbackMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAStartMessage;
+
+/**
+ * A CorePacketDecoder
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class CorePacketDecoder implements PacketDecoder
+{
+ private static final Logger log = Logger.getLogger(CorePacketDecoder.class);
+
+ public Packet decode(final HornetQBuffer in)
+ {
+ final byte packetType = in.readByte();
+
+ Packet packet;
+
+ switch (packetType)
+ {
+ case PING:
+ {
+ packet = new Ping();
+ break;
+ }
+ case DISCONNECT:
+ {
+ packet = new PacketImpl(PacketImpl.DISCONNECT);
+ break;
+ }
+ case EXCEPTION:
+ {
+ packet = new HornetQExceptionMessage();
+ break;
+ }
+ case PACKETS_CONFIRMED:
+ {
+ packet = new PacketsConfirmedMessage();
+ break;
+ }
+ case CREATESESSION:
+ {
+ packet = new CreateSessionMessage();
+ break;
+ }
+ case CREATESESSION_RESP:
+ {
+ packet = new CreateSessionResponseMessage();
+ break;
+ }
+ case REATTACH_SESSION:
+ {
+ packet = new ReattachSessionMessage();
+ break;
+ }
+ case REATTACH_SESSION_RESP:
+ {
+ packet = new ReattachSessionResponseMessage();
+ break;
+ }
+ case SESS_CLOSE:
+ {
+ packet = new SessionCloseMessage();
+ break;
+ }
+ case SESS_CREATECONSUMER:
+ {
+ packet = new SessionCreateConsumerMessage();
+ break;
+ }
+ case SESS_ACKNOWLEDGE:
+ {
+ packet = new SessionAcknowledgeMessage();
+ break;
+ }
+ case SESS_EXPIRED:
+ {
+ packet = new SessionExpiredMessage();
+ break;
+ }
+ case SESS_COMMIT:
+ {
+ packet = new SessionCommitMessage();
+ break;
+ }
+ case SESS_ROLLBACK:
+ {
+ packet = new RollbackMessage();
+ break;
+ }
+ case SESS_QUEUEQUERY:
+ {
+ packet = new SessionQueueQueryMessage();
+ break;
+ }
+ case SESS_QUEUEQUERY_RESP:
+ {
+ packet = new SessionQueueQueryResponseMessage();
+ break;
+ }
+ case CREATE_QUEUE:
+ {
+ packet = new CreateQueueMessage();
+ break;
+ }
+ case DELETE_QUEUE:
+ {
+ packet = new SessionDeleteQueueMessage();
+ break;
+ }
+ case SESS_BINDINGQUERY:
+ {
+ packet = new SessionBindingQueryMessage();
+ break;
+ }
+ case SESS_BINDINGQUERY_RESP:
+ {
+ packet = new SessionBindingQueryResponseMessage();
+ break;
+ }
+ case SESS_XA_START:
+ {
+ packet = new SessionXAStartMessage();
+ break;
+ }
+ case SESS_XA_END:
+ {
+ packet = new SessionXAEndMessage();
+ break;
+ }
+ case SESS_XA_COMMIT:
+ {
+ packet = new SessionXACommitMessage();
+ break;
+ }
+ case SESS_XA_PREPARE:
+ {
+ packet = new SessionXAPrepareMessage();
+ break;
+ }
+ case SESS_XA_RESP:
+ {
+ packet = new SessionXAResponseMessage();
+ break;
+ }
+ case SESS_XA_ROLLBACK:
+ {
+ packet = new SessionXARollbackMessage();
+ break;
+ }
+ case SESS_XA_JOIN:
+ {
+ packet = new SessionXAJoinMessage();
+ break;
+ }
+ case SESS_XA_SUSPEND:
+ {
+ packet = new PacketImpl(PacketImpl.SESS_XA_SUSPEND);
+ break;
+ }
+ case SESS_XA_RESUME:
+ {
+ packet = new SessionXAResumeMessage();
+ break;
+ }
+ case SESS_XA_FORGET:
+ {
+ packet = new SessionXAForgetMessage();
+ break;
+ }
+ case SESS_XA_INDOUBT_XIDS:
+ {
+ packet = new PacketImpl(PacketImpl.SESS_XA_INDOUBT_XIDS);
+ break;
+ }
+ case SESS_XA_INDOUBT_XIDS_RESP:
+ {
+ packet = new SessionXAGetInDoubtXidsResponseMessage();
+ break;
+ }
+ case SESS_XA_SET_TIMEOUT:
+ {
+ packet = new SessionXASetTimeoutMessage();
+ break;
+ }
+ case SESS_XA_SET_TIMEOUT_RESP:
+ {
+ packet = new SessionXASetTimeoutResponseMessage();
+ break;
+ }
+ case SESS_XA_GET_TIMEOUT:
+ {
+ packet = new PacketImpl(PacketImpl.SESS_XA_GET_TIMEOUT);
+ break;
+ }
+ case SESS_XA_GET_TIMEOUT_RESP:
+ {
+ packet = new SessionXAGetTimeoutResponseMessage();
+ break;
+ }
+ case SESS_START:
+ {
+ packet = new PacketImpl(PacketImpl.SESS_START);
+ break;
+ }
+ case SESS_STOP:
+ {
+ packet = new PacketImpl(PacketImpl.SESS_STOP);
+ break;
+ }
+ case SESS_FLOWTOKEN:
+ {
+ packet = new SessionConsumerFlowCreditMessage();
+ break;
+ }
+ case SESS_SEND:
+ {
+ packet = new SessionSendMessage();
+ break;
+ }
+ case SESS_SEND_LARGE:
+ {
+ packet = new SessionSendLargeMessage();
+ break;
+ }
+ case SESS_RECEIVE_MSG:
+ {
+ packet = new SessionReceiveMessage();
+ break;
+ }
+ case SESS_RECEIVE_LARGE_MSG:
+ {
+ packet = new SessionReceiveLargeMessage();
+ break;
+ }
+ case SESS_CONSUMER_CLOSE:
+ {
+ packet = new SessionConsumerCloseMessage();
+ break;
+ }
+ case NULL_RESPONSE:
+ {
+ packet = new NullResponseMessage();
+ break;
+ }
+ case SESS_RECEIVE_CONTINUATION:
+ {
+ packet = new SessionReceiveContinuationMessage();
+ break;
+ }
+ case SESS_SEND_CONTINUATION:
+ {
+ packet = new SessionSendContinuationMessage();
+ break;
+ }
+ case SESS_PRODUCER_REQUEST_CREDITS:
+ {
+ packet = new SessionRequestProducerCreditsMessage();
+ break;
+ }
+ case SESS_PRODUCER_CREDITS:
+ {
+ packet = new SessionProducerCreditsMessage();
+ break;
+ }
+ case CREATE_REPLICATION:
+ {
+ packet = new CreateReplicationSessionMessage();
+ break;
+ }
+ case REPLICATION_APPEND:
+ {
+ packet = new ReplicationAddMessage();
+ break;
+ }
+ case REPLICATION_APPEND_TX:
+ {
+ packet = new ReplicationAddTXMessage();
+ break;
+ }
+ case REPLICATION_DELETE:
+ {
+ packet = new ReplicationDeleteMessage();
+ break;
+ }
+ case REPLICATION_DELETE_TX:
+ {
+ packet = new ReplicationDeleteTXMessage();
+ break;
+ }
+ case REPLICATION_PREPARE:
+ {
+ packet = new ReplicationPrepareMessage();
+ break;
+ }
+ case REPLICATION_COMMIT_ROLLBACK:
+ {
+ packet = new ReplicationCommitMessage();
+ break;
+ }
+ case REPLICATION_RESPONSE:
+ {
+ packet = new ReplicationResponseMessage();
+ break;
+ }
+ case REPLICATION_PAGE_WRITE:
+ {
+ packet = new ReplicationPageWriteMessage();
+ break;
+ }
+ case REPLICATION_PAGE_EVENT:
+ {
+ packet = new ReplicationPageEventMessage();
+ break;
+ }
+ case REPLICATION_LARGE_MESSAGE_BEGIN:
+ {
+ packet = new ReplicationLargeMessageBeingMessage();
+ break;
+ }
+ case REPLICATION_LARGE_MESSAGE_END:
+ {
+ packet = new ReplicationLargemessageEndMessage();
+ break;
+ }
+ case REPLICATION_LARGE_MESSAGE_WRITE:
+ {
+ packet = new ReplicationLargeMessageWriteMessage();
+ break;
+ }
+ case REPLICATION_COMPARE_DATA:
+ {
+ packet = new ReplicationCompareDataMessage();
+ break;
+ }
+ case SESS_FORCE_CONSUMER_DELIVERY:
+ {
+ packet = new SessionForceConsumerDelivery();
+ break;
+ }
+ default:
+ {
+ throw new IllegalArgumentException("Invalid type: " + packetType);
+ }
+ }
+
+ packet.decode(in);
+
+ return packet;
+ }
+
+}
Deleted: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -1,494 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.remoting.impl;
-
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_REPLICATION;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.DELETE_QUEUE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.DISCONNECT;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.NULL_RESPONSE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.PACKETS_CONFIRMED;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.PING;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION_RESP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND_TX;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMMIT_ROLLBACK;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMPARE_DATA;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE_TX;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_END;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PAGE_EVENT;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PAGE_WRITE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PREPARE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_ACKNOWLEDGE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY_RESP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_CREDITS;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY_RESP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_CONTINUATION;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_LARGE_MSG;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_CONTINUATION;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_LARGE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_START;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_COMMIT;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_END;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_FORGET;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT_RESP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS_RESP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_JOIN;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_PREPARE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESUME;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_ROLLBACK;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
-import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
-import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
-import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
-import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
-import org.hornetq.core.remoting.impl.wireformat.PacketsConfirmedMessage;
-import org.hornetq.core.remoting.impl.wireformat.Ping;
-import org.hornetq.core.remoting.impl.wireformat.ReattachSessionMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageWriteMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationLargemessageEndMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationPageEventMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionCloseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionCommitMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionExpiredMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
-import org.hornetq.core.remoting.impl.wireformat.SessionProducerCreditsMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionReceiveLargeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionReceiveMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionRequestProducerCreditsMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendContinuationMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendLargeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXACommitMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAEndMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAForgetMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAJoinMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAPrepareMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAResumeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXARollbackMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAStartMessage;
-
-/**
- * A PacketDecoder
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- *
- */
-public class PacketDecoder
-{
- private static final Logger log = Logger.getLogger(PacketDecoder.class);
-
- public Packet decode(final HornetQBuffer in)
- {
- final byte packetType = in.readByte();
-
- Packet packet;
-
- switch (packetType)
- {
- case PING:
- {
- packet = new Ping();
- break;
- }
- case DISCONNECT:
- {
- packet = new PacketImpl(PacketImpl.DISCONNECT);
- break;
- }
- case EXCEPTION:
- {
- packet = new HornetQExceptionMessage();
- break;
- }
- case PACKETS_CONFIRMED:
- {
- packet = new PacketsConfirmedMessage();
- break;
- }
- case CREATESESSION:
- {
- packet = new CreateSessionMessage();
- break;
- }
- case CREATESESSION_RESP:
- {
- packet = new CreateSessionResponseMessage();
- break;
- }
- case REATTACH_SESSION:
- {
- packet = new ReattachSessionMessage();
- break;
- }
- case REATTACH_SESSION_RESP:
- {
- packet = new ReattachSessionResponseMessage();
- break;
- }
- case SESS_CLOSE:
- {
- packet = new SessionCloseMessage();
- break;
- }
- case SESS_CREATECONSUMER:
- {
- packet = new SessionCreateConsumerMessage();
- break;
- }
- case SESS_ACKNOWLEDGE:
- {
- packet = new SessionAcknowledgeMessage();
- break;
- }
- case SESS_EXPIRED:
- {
- packet = new SessionExpiredMessage();
- break;
- }
- case SESS_COMMIT:
- {
- packet = new SessionCommitMessage();
- break;
- }
- case SESS_ROLLBACK:
- {
- packet = new RollbackMessage();
- break;
- }
- case SESS_QUEUEQUERY:
- {
- packet = new SessionQueueQueryMessage();
- break;
- }
- case SESS_QUEUEQUERY_RESP:
- {
- packet = new SessionQueueQueryResponseMessage();
- break;
- }
- case CREATE_QUEUE:
- {
- packet = new CreateQueueMessage();
- break;
- }
- case DELETE_QUEUE:
- {
- packet = new SessionDeleteQueueMessage();
- break;
- }
- case SESS_BINDINGQUERY:
- {
- packet = new SessionBindingQueryMessage();
- break;
- }
- case SESS_BINDINGQUERY_RESP:
- {
- packet = new SessionBindingQueryResponseMessage();
- break;
- }
- case SESS_XA_START:
- {
- packet = new SessionXAStartMessage();
- break;
- }
- case SESS_XA_END:
- {
- packet = new SessionXAEndMessage();
- break;
- }
- case SESS_XA_COMMIT:
- {
- packet = new SessionXACommitMessage();
- break;
- }
- case SESS_XA_PREPARE:
- {
- packet = new SessionXAPrepareMessage();
- break;
- }
- case SESS_XA_RESP:
- {
- packet = new SessionXAResponseMessage();
- break;
- }
- case SESS_XA_ROLLBACK:
- {
- packet = new SessionXARollbackMessage();
- break;
- }
- case SESS_XA_JOIN:
- {
- packet = new SessionXAJoinMessage();
- break;
- }
- case SESS_XA_SUSPEND:
- {
- packet = new PacketImpl(PacketImpl.SESS_XA_SUSPEND);
- break;
- }
- case SESS_XA_RESUME:
- {
- packet = new SessionXAResumeMessage();
- break;
- }
- case SESS_XA_FORGET:
- {
- packet = new SessionXAForgetMessage();
- break;
- }
- case SESS_XA_INDOUBT_XIDS:
- {
- packet = new PacketImpl(PacketImpl.SESS_XA_INDOUBT_XIDS);
- break;
- }
- case SESS_XA_INDOUBT_XIDS_RESP:
- {
- packet = new SessionXAGetInDoubtXidsResponseMessage();
- break;
- }
- case SESS_XA_SET_TIMEOUT:
- {
- packet = new SessionXASetTimeoutMessage();
- break;
- }
- case SESS_XA_SET_TIMEOUT_RESP:
- {
- packet = new SessionXASetTimeoutResponseMessage();
- break;
- }
- case SESS_XA_GET_TIMEOUT:
- {
- packet = new PacketImpl(PacketImpl.SESS_XA_GET_TIMEOUT);
- break;
- }
- case SESS_XA_GET_TIMEOUT_RESP:
- {
- packet = new SessionXAGetTimeoutResponseMessage();
- break;
- }
- case SESS_START:
- {
- packet = new PacketImpl(PacketImpl.SESS_START);
- break;
- }
- case SESS_STOP:
- {
- packet = new PacketImpl(PacketImpl.SESS_STOP);
- break;
- }
- case SESS_FLOWTOKEN:
- {
- packet = new SessionConsumerFlowCreditMessage();
- break;
- }
- case SESS_SEND:
- {
- packet = new SessionSendMessage();
- break;
- }
- case SESS_SEND_LARGE:
- {
- packet = new SessionSendLargeMessage();
- break;
- }
- case SESS_RECEIVE_MSG:
- {
- packet = new SessionReceiveMessage();
- break;
- }
- case SESS_RECEIVE_LARGE_MSG:
- {
- packet = new SessionReceiveLargeMessage();
- break;
- }
- case SESS_CONSUMER_CLOSE:
- {
- packet = new SessionConsumerCloseMessage();
- break;
- }
- case NULL_RESPONSE:
- {
- packet = new NullResponseMessage();
- break;
- }
- case SESS_RECEIVE_CONTINUATION:
- {
- packet = new SessionReceiveContinuationMessage();
- break;
- }
- case SESS_SEND_CONTINUATION:
- {
- packet = new SessionSendContinuationMessage();
- break;
- }
- case SESS_PRODUCER_REQUEST_CREDITS:
- {
- packet = new SessionRequestProducerCreditsMessage();
- break;
- }
- case SESS_PRODUCER_CREDITS:
- {
- packet = new SessionProducerCreditsMessage();
- break;
- }
- case CREATE_REPLICATION:
- {
- packet = new CreateReplicationSessionMessage();
- break;
- }
- case REPLICATION_APPEND:
- {
- packet = new ReplicationAddMessage();
- break;
- }
- case REPLICATION_APPEND_TX:
- {
- packet = new ReplicationAddTXMessage();
- break;
- }
- case REPLICATION_DELETE:
- {
- packet = new ReplicationDeleteMessage();
- break;
- }
- case REPLICATION_DELETE_TX:
- {
- packet = new ReplicationDeleteTXMessage();
- break;
- }
- case REPLICATION_PREPARE:
- {
- packet = new ReplicationPrepareMessage();
- break;
- }
- case REPLICATION_COMMIT_ROLLBACK:
- {
- packet = new ReplicationCommitMessage();
- break;
- }
- case REPLICATION_RESPONSE:
- {
- packet = new ReplicationResponseMessage();
- break;
- }
- case REPLICATION_PAGE_WRITE:
- {
- packet = new ReplicationPageWriteMessage();
- break;
- }
- case REPLICATION_PAGE_EVENT:
- {
- packet = new ReplicationPageEventMessage();
- break;
- }
- case REPLICATION_LARGE_MESSAGE_BEGIN:
- {
- packet = new ReplicationLargeMessageBeingMessage();
- break;
- }
- case REPLICATION_LARGE_MESSAGE_END:
- {
- packet = new ReplicationLargemessageEndMessage();
- break;
- }
- case REPLICATION_LARGE_MESSAGE_WRITE:
- {
- packet = new ReplicationLargeMessageWriteMessage();
- break;
- }
- case REPLICATION_COMPARE_DATA:
- {
- packet = new ReplicationCompareDataMessage();
- break;
- }
- case SESS_FORCE_CONSUMER_DELIVERY:
- {
- packet = new SessionForceConsumerDelivery();
- break;
- }
- default:
- {
- throw new IllegalArgumentException("Invalid type: " + packetType);
- }
- }
-
- packet.decode(in);
-
- return packet;
- }
-
-}
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -28,8 +28,8 @@
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.PacketDecoder;
import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.utils.SimpleIDGenerator;
@@ -79,8 +79,6 @@
private final Object failLock = new Object();
- private final PacketDecoder decoder = new PacketDecoder();
-
private volatile boolean dataReceived;
private final Executor executor;
@@ -123,7 +121,7 @@
this.interceptors = interceptors;
this.client = client;
-
+
this.executor = executor;
}
@@ -351,7 +349,7 @@
private volatile boolean executing;
- public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+ public void bufferReceived(final Object connectionID, final HornetQBuffer buffer, final PacketDecoder decoder)
{
final Packet packet = decoder.decode(buffer);
@@ -391,7 +389,46 @@
dataReceived = true;
}
+
+ public void packetReceived(final Object connectionID, final Packet packet)
+ {
+ if (packet.isAsyncExec() && executor != null)
+ {
+ executing = true;
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ doBufferReceived(packet);
+ }
+ catch (Throwable t)
+ {
+ RemotingConnectionImpl.log.error("Unexpected error", t);
+ }
+
+ executing = false;
+ }
+ });
+ }
+ else
+ {
+ //To prevent out of order execution if interleaving sync and async operations on same connection
+ while (executing)
+ {
+ Thread.yield();
+ }
+
+ // Pings must always be handled out of band so we can send pings back to the client quickly
+ // otherwise they would get in the queue with everything else which might give an intolerable delay
+ doBufferReceived(packet);
+ }
+
+ dataReceived = true;
+ }
+
private void doBufferReceived(final Packet packet)
{
if (interceptors != null)
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -19,6 +19,8 @@
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.PacketDecoder;
+import org.hornetq.core.remoting.impl.CorePacketDecoder;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -36,6 +38,8 @@
private final BufferHandler handler;
+ private final PacketDecoder decoder = new CorePacketDecoder();
+
private final ConnectionLifeCycleListener listener;
private final String id;
@@ -128,7 +132,7 @@
{
copied.readInt(); // read and discard
- handler.bufferReceived(id, copied);
+ handler.bufferReceived(id, copied, decoder);
}
}
catch (Exception e)
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -32,6 +32,7 @@
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.PacketDecoder;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
@@ -41,6 +42,7 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.impl.HornetQPacketHandler;
import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.integration.stomp.StompPacketDecoder;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.AcceptorFactory;
import org.hornetq.spi.core.remoting.BufferHandler;
@@ -184,7 +186,7 @@
a.start();
}
- //This thread checks connections that need to be closed, and also flushes confirmations
+ // This thread checks connections that need to be closed, and also flushes confirmations
failureCheckAndFlushThread = new FailureCheckAndFlushThread(RemotingServiceImpl.CONNECTION_TTL_CHECK_INTERVAL);
failureCheckAndFlushThread.start();
@@ -423,13 +425,13 @@
private final class DelegatingBufferHandler extends AbstractBufferHandler
{
- public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+ public void bufferReceived(final Object connectionID, final HornetQBuffer buffer, final PacketDecoder decoder)
{
ConnectionEntry conn = connections.get(connectionID);
if (conn != null)
{
- conn.connection.bufferReceived(connectionID, buffer);
+ conn.connection.bufferReceived(connectionID, buffer, decoder);
}
}
}
@@ -495,9 +497,9 @@
for (ConnectionEntry entry : connections.values())
{
RemotingConnection conn = entry.connection;
-
+
boolean flush = true;
-
+
if (entry.ttl != -1)
{
if (now >= entry.lastCheck + entry.ttl)
@@ -505,7 +507,7 @@
if (!conn.checkDataReceived())
{
idsToRemove.add(conn.getID());
-
+
flush = false;
}
else
@@ -514,12 +516,12 @@
}
}
}
-
+
if (flush)
{
- //We flush any confirmations on the connection - this prevents idle bridges for example
- //sitting there with many unacked messages
-
+ // We flush any confirmations on the connection - this prevents idle bridges for example
+ // sitting there with many unacked messages
+
conn.flushConfirmations();
}
}
Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolConverter.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolConverter.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolConverter.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.stomp;
+
+import java.util.Map;
+
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionCloseMessage;
+import org.hornetq.utils.UUIDGenerator;
+import org.hornetq.utils.VersionLoader;
+
+/**
+ * A ProtocolConverter
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public class ProtocolConverter
+{
+
+ public Packet toPacket(StompFrame frame)
+ {
+ String command = frame.getCommand();
+ Map<String, Object> headers = frame.getHeaders();
+ if (Stomp.Commands.CONNECT.equals(command))
+ {
+ String login = (String)headers.get("login");
+ String password = (String)headers.get("passcode");
+
+ String name = UUIDGenerator.getInstance().generateStringUUID();
+ long sessionChannelID = 12;
+ return new CreateSessionMessage(name,
+ sessionChannelID,
+ VersionLoader.getVersion().getIncrementingVersion(),
+ login,
+ password,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ false,
+ true,
+ true,
+ false,
+ HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE);
+ }
+ if (Stomp.Commands.DISCONNECT.equals(command))
+ {
+ return new SessionCloseMessage();
+ }
+ else
+ {
+ throw new RuntimeException("frame not supported: " + frame);
+ }
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolException.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolException.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolException.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.integration.stomp;
+
+import java.io.IOException;
+
+/**
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+public class ProtocolException extends IOException {
+ private static final long serialVersionUID = -2869735532997332242L;
+ private final boolean fatal;
+
+ public ProtocolException() {
+ this(null);
+ }
+
+ public ProtocolException(String s) {
+ this(s, false);
+ }
+
+ public ProtocolException(String s, boolean fatal) {
+ this(s, fatal, null);
+ }
+
+ public ProtocolException(String s, boolean fatal, Throwable cause) {
+ super(s);
+ this.fatal = fatal;
+ initCause(cause);
+ }
+
+ public boolean isFatal() {
+ return fatal;
+ }
+}
Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/Stomp.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/Stomp.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/Stomp.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -0,0 +1,124 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.integration.stomp;
+
+
+/**
+ * The standard verbs and headers used for the <a href="http://stomp.codehaus.org/">STOMP</a> protocol.
+ *
+ * @version $Revision: 57 $
+ */
+public interface Stomp {
+ String NULL = "\u0000";
+ String NEWLINE = "\n";
+
+ public static interface Commands {
+ String CONNECT = "CONNECT";
+ String SEND = "SEND";
+ String DISCONNECT = "DISCONNECT";
+ String SUBSCRIBE = "SUB";
+ String UNSUBSCRIBE = "UNSUB";
+ String BEGIN_TRANSACTION = "BEGIN";
+ String COMMIT_TRANSACTION = "COMMIT";
+ String ABORT_TRANSACTION = "ABORT";
+ String BEGIN = "BEGIN";
+ String COMMIT = "COMMIT";
+ String ABORT = "ABORT";
+ String ACK = "ACK";
+ }
+
+ public interface Responses {
+ String CONNECTED = "CONNECTED";
+ String ERROR = "ERROR";
+ String MESSAGE = "MESSAGE";
+ String RECEIPT = "RECEIPT";
+ }
+
+ public interface Headers {
+ String SEPERATOR = ":";
+ String RECEIPT_REQUESTED = "receipt";
+ String TRANSACTION = "transaction";
+ String CONTENT_LENGTH = "content-length";
+
+ public interface Response {
+ String RECEIPT_ID = "receipt-id";
+ }
+
+ public interface Send {
+ String DESTINATION = "destination";
+ String CORRELATION_ID = "correlation-id";
+ String REPLY_TO = "reply-to";
+ String EXPIRATION_TIME = "expires";
+ String PRIORITY = "priority";
+ String TYPE = "type";
+ Object PERSISTENT = "persistent";
+ }
+
+ public interface Message {
+ String MESSAGE_ID = "message-id";
+ String DESTINATION = "destination";
+ String CORRELATION_ID = "correlation-id";
+ String EXPIRATION_TIME = "expires";
+ String REPLY_TO = "reply-to";
+ String PRORITY = "priority";
+ String REDELIVERED = "redelivered";
+ String TIMESTAMP = "timestamp";
+ String TYPE = "type";
+ String SUBSCRIPTION = "subscription";
+ }
+
+ public interface Subscribe {
+ String DESTINATION = "destination";
+ String ACK_MODE = "ack";
+ String ID = "id";
+ String SELECTOR = "selector";
+ String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
+ String NO_LOCAL = "no-local";
+
+ public interface AckModeValues {
+ String AUTO = "auto";
+ String CLIENT = "client";
+ }
+ }
+
+ public interface Unsubscribe {
+ String DESTINATION = "destination";
+ String ID = "id";
+ }
+
+ public interface Connect {
+ String LOGIN = "login";
+ String PASSCODE = "passcode";
+ String CLIENT_ID = "client-id";
+ String REQUEST_ID = "request-id";
+ }
+
+ public interface Error {
+ String MESSAGE = "message";
+ }
+
+ public interface Connected {
+ String SESSION = "session";
+ String RESPONSE_ID = "response-id";
+ }
+
+ public interface Ack {
+ String MESSAGE_ID = "message-id";
+ }
+ }
+}
Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrame.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrame.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrame.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -0,0 +1,70 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.integration.stomp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Represents all the data in a STOMP frame.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+public class StompFrame
+{
+ private static final byte[] NO_DATA = new byte[] {};
+
+ private String command;
+
+ private Map<String, Object> headers;
+
+ private byte[] content = StompFrame.NO_DATA;
+
+ public StompFrame()
+ {
+ this.headers = new HashMap<String, Object>();
+ }
+
+ public StompFrame(String command, Map<String, Object> headers, byte[] data)
+ {
+ this.command = command;
+ this.headers = headers;
+ this.content = data;
+ }
+
+ public String getCommand()
+ {
+ return command;
+ }
+
+ public byte[] getContent()
+ {
+ return content;
+ }
+
+ public Map<String, Object> getHeaders()
+ {
+ return headers;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "StompFrame[command=" + command + ", headers=" + headers + ",content-length=" + content.length + "]";
+ }
+}
Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameDelimiter.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameDelimiter.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameDelimiter.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.stomp;
+
+import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.Delimiters;
+
+/**
+ * A StompFrameDelimiter
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public class StompFrameDelimiter extends DelimiterBasedFrameDecoder
+{
+
+ private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
+
+ public StompFrameDelimiter()
+ {
+ super(MAX_DATA_LENGTH, true, Delimiters.nulDelimiter());
+ }
+}
Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameError.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameError.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameError.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -0,0 +1,35 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.integration.stomp;
+
+/**
+ * Command indicating that an invalid Stomp Frame was received.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+public class StompFrameError extends StompFrame {
+ private final ProtocolException exception;
+
+ public StompFrameError(ProtocolException exception) {
+ this.exception = exception;
+ }
+
+ public ProtocolException getException() {
+ return exception;
+ }
+}
Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompMarshaller.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompMarshaller.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompMarshaller.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -0,0 +1,191 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.integration.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+
+/**
+ * Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
+ */
+public class StompMarshaller {
+ private static final byte[] NO_DATA = new byte[]{};
+ private static final byte[] END_OF_FRAME = new byte[]{0, '\n'};
+ private static final int MAX_COMMAND_LENGTH = 1024;
+ private static final int MAX_HEADER_LENGTH = 1024 * 10;
+ private static final int MAX_HEADERS = 1000;
+ private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
+ private int version = 1;
+
+ public int getVersion() {
+ return version;
+ }
+
+ public void setVersion(int version) {
+ this.version = version;
+ }
+
+ public byte[] marshal(StompFrame command) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ marshal(command, dos);
+ dos.close();
+ return baos.toByteArray();
+ }
+
+ public void marshal(StompFrame stomp, DataOutput os) throws IOException {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append(stomp.getCommand());
+ buffer.append(Stomp.NEWLINE);
+
+ // Output the headers.
+ for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) {
+ Map.Entry entry = (Map.Entry) iter.next();
+ buffer.append(entry.getKey());
+ buffer.append(Stomp.Headers.SEPERATOR);
+ buffer.append(entry.getValue());
+ buffer.append(Stomp.NEWLINE);
+ }
+
+ // Add a newline to seperate the headers from the content.
+ buffer.append(Stomp.NEWLINE);
+
+ os.write(buffer.toString().getBytes("UTF-8"));
+ os.write(stomp.getContent());
+ os.write(END_OF_FRAME);
+ }
+
+ public StompFrame unmarshal(HornetQBuffer in) throws IOException {
+
+ try {
+ String action = null;
+
+ // skip white space to next real action line
+ while (true) {
+ action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
+ if (action == null) {
+ throw new IOException("connection was closed");
+ }
+ else {
+ action = action.trim();
+ if (action.length() > 0) {
+ break;
+ }
+ }
+ }
+
+ // Parse the headers
+ HashMap headers = new HashMap(25);
+ while (true) {
+ String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
+ if (line != null && line.trim().length() > 0) {
+
+ if (headers.size() > MAX_HEADERS) {
+ throw new ProtocolException("The maximum number of headers was exceeded", true);
+ }
+
+ try {
+ int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
+ String name = line.substring(0, seperator_index).trim();
+ String value = line.substring(seperator_index + 1, line.length()).trim();
+ headers.put(name, value);
+ }
+ catch (Exception e) {
+ throw new ProtocolException("Unable to parser header line [" + line + "]", true);
+ }
+ }
+ else {
+ break;
+ }
+ }
+
+ // Read in the data part.
+ byte[] data = NO_DATA;
+ String contentLength = (String) headers.get(Stomp.Headers.CONTENT_LENGTH);
+ if (contentLength != null) {
+
+ // Bless the client, he's telling us how much data to read in.
+ int length;
+ try {
+ length = Integer.parseInt(contentLength.trim());
+ }
+ catch (NumberFormatException e) {
+ throw new ProtocolException("Specified content-length is not a valid integer", true);
+ }
+
+ if (length > MAX_DATA_LENGTH) {
+ throw new ProtocolException("The maximum data length was exceeded", true);
+ }
+
+ data = new byte[length];
+ in.readBytes(data);
+
+ if (in.readByte() != 0) {
+ throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " + "there was no trailing null byte", true);
+ }
+ }
+ else {
+
+ // We don't know how much to read.. data ends when we hit a 0
+ byte b;
+ ByteArrayOutputStream baos = null;
+ while (in.readableBytes() > 0 && (b = in.readByte()) != 0) {
+
+ if (baos == null) {
+ baos = new ByteArrayOutputStream();
+ }
+ else if (baos.size() > MAX_DATA_LENGTH) {
+ throw new ProtocolException("The maximum data length was exceeded", true);
+ }
+
+ baos.write(b);
+ }
+
+ if (baos != null) {
+ baos.close();
+ data = baos.toByteArray();
+ }
+ }
+
+ return new StompFrame(action, headers, data);
+ }
+ catch (ProtocolException e) {
+ return new StompFrameError(e);
+ }
+ }
+
+ protected String readLine(HornetQBuffer in, int maxLength, String errorMessage) throws IOException {
+ byte b;
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength);
+ while ((b = in.readByte()) != '\n') {
+ if (baos.size() > maxLength) {
+ throw new ProtocolException(errorMessage, true);
+ }
+ baos.write(b);
+ }
+ byte[] sequence = baos.toByteArray();
+ return new String(sequence, "UTF-8");
+ }
+}
Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompPacketDecoder.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompPacketDecoder.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompPacketDecoder.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.stomp;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.PacketDecoder;
+
+/**
+ * A StompPacketDecoder
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public class StompPacketDecoder implements PacketDecoder
+{
+ private final StompMarshaller marshaller = new StompMarshaller();
+
+ private final ProtocolConverter converter = new ProtocolConverter();
+
+ // PacketDecoder implementation ----------------------------------
+
+ public Packet decode(HornetQBuffer in)
+ {
+ StompFrame frame;
+ try
+ {
+ frame = marshaller.unmarshal(in);
+ System.out.println(">>> " + frame);
+ Packet packet = converter.toPacket(frame);
+ packet.setChannelID(1);
+ System.out.println(">>> " + packet);
+
+ return packet;
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ return null;
+ }
+ }
+}
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -16,6 +16,7 @@
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
+import org.hornetq.integration.stomp.StompFrameDelimiter;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.handler.ssl.SslHandler;
@@ -44,9 +45,15 @@
// Public --------------------------------------------------------
- public static void addCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)
+ public static void addStompCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)
{
assert pipeline != null;
+ pipeline.addLast("delimiter", new StompFrameDelimiter());
+ }
+
+ public static void addHornetQCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)
+ {
+ assert pipeline != null;
pipeline.addLast("decoder", new HornetQFrameDecoder2());
}
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQChannelHandler.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQChannelHandler.java 2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQChannelHandler.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -15,6 +15,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.PacketDecoder;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -31,23 +32,27 @@
* @author <a href="mailto:tlee@redhat.com">Trustin Lee</a>
* @version $Rev$, $Date$
*/
-class HornetQChannelHandler extends SimpleChannelHandler
+public class HornetQChannelHandler extends SimpleChannelHandler
{
private static final Logger log = Logger.getLogger(HornetQChannelHandler.class);
private final ChannelGroup group;
+ private final PacketDecoder decoder;
+
private final BufferHandler handler;
private final ConnectionLifeCycleListener listener;
volatile boolean active;
- HornetQChannelHandler(final ChannelGroup group,
- final BufferHandler handler,
- final ConnectionLifeCycleListener listener)
+ public HornetQChannelHandler(final ChannelGroup group,
+ final PacketDecoder decoder,
+ final BufferHandler handler,
+ final ConnectionLifeCycleListener listener)
{
this.group = group;
+ this.decoder = decoder;
this.handler = handler;
this.listener = listener;
}
@@ -64,7 +69,7 @@
{
ChannelBuffer buffer = (ChannelBuffer)e.getMessage();
- handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer));
+ handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer), decoder);
}
@Override
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -31,9 +31,12 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.PacketDecoder;
+import org.hornetq.core.remoting.impl.CorePacketDecoder;
import org.hornetq.core.remoting.impl.ssl.SSLSupport;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
+import org.hornetq.integration.stomp.StompPacketDecoder;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
@@ -102,6 +105,7 @@
private final boolean useInvm;
+ private final String protocol;
private final String host;
private final int port;
@@ -176,6 +180,9 @@
useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME,
TransportConstants.DEFAULT_USE_INVM,
configuration);
+ protocol = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME,
+ TransportConstants.DEFAULT_PROTOCOL,
+ configuration);
host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME,
TransportConstants.DEFAULT_HOST,
configuration);
@@ -279,9 +286,18 @@
pipeline.addLast("httpResponseEncoder", new HttpResponseEncoder());
pipeline.addLast("httphandler", new HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime));
}
-
- ChannelPipelineSupport.addCodecFilter(pipeline, handler);
- pipeline.addLast("handler", new HornetQServerChannelHandler(channelGroup, handler, new Listener()));
+ PacketDecoder decoder;
+ if (protocol.equals(TransportConstants.STOMP_PROTOCOL))
+ {
+ ChannelPipelineSupport.addStompCodecFilter(pipeline, handler);
+ decoder = new StompPacketDecoder();
+ } else
+ {
+ ChannelPipelineSupport.addHornetQCodecFilter(pipeline, handler);
+ decoder = new CorePacketDecoder();
+ }
+
+ pipeline.addLast("handler", new HornetQServerChannelHandler(channelGroup, decoder, handler, new Listener()));
return pipeline;
}
};
@@ -475,10 +491,11 @@
private final class HornetQServerChannelHandler extends HornetQChannelHandler
{
HornetQServerChannelHandler(final ChannelGroup group,
+ final PacketDecoder decoder,
final BufferHandler handler,
final ConnectionLifeCycleListener listener)
{
- super(group, handler, listener);
+ super(group, decoder, handler, listener);
}
@Override
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnection.java 2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnection.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -14,7 +14,6 @@
package org.hornetq.integration.transports.netty;
import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQException;
import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
import org.hornetq.core.logging.Logger;
import org.hornetq.spi.core.remoting.Connection;
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java 2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -30,6 +30,8 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.PacketDecoder;
+import org.hornetq.core.remoting.impl.CorePacketDecoder;
import org.hornetq.core.remoting.impl.ssl.SSLSupport;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
@@ -93,6 +95,8 @@
private ChannelGroup channelGroup;
private final BufferHandler handler;
+
+ private final PacketDecoder decoder = new CorePacketDecoder();
private final ConnectionLifeCycleListener listener;
@@ -310,7 +314,7 @@
pipeline.addLast("httpResponseDecoder", new HttpResponseDecoder());
pipeline.addLast("httphandler", new HttpHandler());
}
- ChannelPipelineSupport.addCodecFilter(pipeline, handler);
+ ChannelPipelineSupport.addHornetQCodecFilter(pipeline, handler);
pipeline.addLast("handler", new HornetQClientChannelHandler(channelGroup, handler, new Listener()));
return pipeline;
}
@@ -441,7 +445,7 @@
final BufferHandler handler,
final ConnectionLifeCycleListener listener)
{
- super(group, handler, listener);
+ super(group, decoder, handler, listener);
}
}
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/TransportConstants.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/TransportConstants.java 2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/TransportConstants.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -46,6 +46,8 @@
public static final String USE_INVM_PROP_NAME = "use-invm";
+ public static final String PROTOCOL_PROP_NAME = "protocol";
+
public static final String HOST_PROP_NAME = "host";
public static final String PORT_PROP_NAME = "port";
@@ -75,10 +77,18 @@
public static final boolean DEFAULT_USE_SERVLET = false;
+ public static final String HORNETQ_PROTOCOL = "hornetq";
+
+ public static final String STOMP_PROTOCOL = "stomp";
+
+ public static final String DEFAULT_PROTOCOL = HORNETQ_PROTOCOL;
+
public static final String DEFAULT_HOST = "localhost";
public static final int DEFAULT_PORT = 5445;
+ public static final int DEFAULT_STOMP_PORT = 61613;
+
public static final String DEFAULT_KEYSTORE_PATH = "hornetq.keystore";
public static final String DEFAULT_KEYSTORE_PASSWORD = "secureexample";
@@ -120,6 +130,7 @@
allowableAcceptorKeys.add(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_NIO_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME);
+ allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.HOST_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.PORT_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.KEYSTORE_PATH_PROP_NAME);
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java 2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -13,6 +13,7 @@
package org.hornetq.spi.core.remoting;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.remoting.PacketDecoder;
/**
* A BufferHandler that will handle buffers received by an acceptor.
@@ -29,7 +30,7 @@
* @param connectionID the connection the buffer was received on
* @param buffer the buffer to decode
*/
- void bufferReceived(Object connectionID, HornetQBuffer buffer);
+ void bufferReceived(Object connectionID, HornetQBuffer buffer, PacketDecoder decoder);
/**
* called by the remoting connection prior to {@link org.hornetq.spi.core.remoting.BufferHandler#bufferReceived(Object, org.hornetq.api.core.HornetQBuffer)}.
Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -25,6 +25,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.PacketDecoder;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.integration.transports.netty.NettyAcceptor;
import org.hornetq.integration.transports.netty.NettyConnector;
@@ -474,7 +475,7 @@
this.latch = latch;
}
- public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+ public void bufferReceived(final Object connectionID, final HornetQBuffer buffer, final PacketDecoder decoder)
{
int i = buffer.readInt();
messages.add(i);
@@ -496,7 +497,7 @@
this.latch = latch;
}
- public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+ public void bufferReceived(final Object connectionID, final HornetQBuffer buffer, final PacketDecoder decoder)
{
int i = buffer.readInt();
@@ -529,7 +530,7 @@
return 0;
}
- public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+ public void bufferReceived(final Object connectionID, final HornetQBuffer buffer, final PacketDecoder decoder)
{
int i = buffer.readInt();
messages.add(i);
Added: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -0,0 +1,841 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.integration.stomp.Stomp;
+import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.integration.transports.netty.TransportConstants;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.config.JMSConfiguration;
+import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
+import org.hornetq.jms.server.config.impl.QueueConfigurationImpl;
+import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+
+public class StompTest extends TestCase {
+ private static final transient Log log = LogFactory.getLog(StompTest.class);
+ private int port = 61613;
+ private Socket stompSocket;
+ private ByteArrayOutputStream inputBuffer;
+ private ConnectionFactory connectionFactory;
+ private Connection connection;
+ private Session session;
+ private Queue queue;
+ private JMSServerManager server;
+
+ public void testConnect() throws Exception {
+
+ String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
+ sendFrame(connect_frame);
+
+ String f = receiveFrame(10000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+ Assert.assertTrue(f.indexOf("response-id:1") >= 0);
+ }
+
+ public void testSendMessage() throws Exception {
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+ }
+
+ public void _testJMSXGroupIdCanBeSet() throws Exception {
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "JMSXGroupID: TEST\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ // TODO do we support it?
+ //Assert.assertEquals("TEST", ((TextMessage) message).getGroupID());
+ }
+
+ public void testSendMessageWithCustomHeadersAndSelector() throws Exception {
+
+ MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SEND\n" +
+ "foo:abc\n" +
+ "bar:123\n" +
+ "destination:/queue/" + getQueueName() + "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+ Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
+ Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
+ }
+
+ public void _testSendMessageWithStandardHeaders() throws Exception {
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SEND\n" +
+ "correlation-id:c123\n" +
+ "priority:3\n" +
+ "type:t345\n" +
+ "JMSXGroupID:abc\n" +
+ "foo:abc\n" +
+ "bar:123\n" +
+ "destination:/queue/" + getQueueName() + "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+ Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
+ Assert.assertEquals("getJMSType", "t345", message.getJMSType());
+ Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
+ Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
+ Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
+
+ Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
+ // FIXME do we support it?
+ //Assert.assertEquals("GroupID", "abc", amqMessage.getGroupID());
+ }
+
+ public void testSubscribeWithAutoAck() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ sendMessage(getName());
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
+ public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ sendBytesMessage(new byte[]{1, 2, 3, 4, 5});
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
+ Matcher cl_matcher = cl.matcher(frame);
+ Assert.assertTrue(cl_matcher.find());
+ Assert.assertEquals("5", cl_matcher.group(1));
+
+ Assert.assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find());
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
+ public void testSubscribeWithMessageSentWithProperties() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ MessageProducer producer = session.createProducer(queue);
+ TextMessage message = session.createTextMessage("Hello World");
+ message.setStringProperty("s", "value");
+ message.setBooleanProperty("n", false);
+ message.setByteProperty("byte", (byte) 9);
+ message.setDoubleProperty("d", 2.0);
+ message.setFloatProperty("f", (float) 6.0);
+ message.setIntProperty("i", 10);
+ message.setLongProperty("l", 121);
+ message.setShortProperty("s", (short) 12);
+ producer.send(message);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+// System.out.println("out: "+frame);
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
+ public void testMessagesAreInOrder() throws Exception {
+ int ctr = 10;
+ String[] data = new String[ctr];
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ for (int i = 0; i < ctr; ++i) {
+ data[i] = getName() + i;
+ sendMessage(data[i]);
+ }
+
+ for (int i = 0; i < ctr; ++i) {
+ frame = receiveFrame(1000);
+ Assert.assertTrue("Message not in order", frame.indexOf(data[i]) >= 0);
+ }
+
+ // sleep a while before publishing another set of messages
+ waitForFrameToTakeEffect();
+
+ for (int i = 0; i < ctr; ++i) {
+ data[i] = getName() + ":second:" + i;
+ sendMessage(data[i]);
+ }
+
+ for (int i = 0; i < ctr; ++i) {
+ frame = receiveFrame(1000);
+ Assert.assertTrue("Message not in order", frame.indexOf(data[i]) >= 0);
+ }
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
+ public void testSubscribeWithAutoAckAndSelector() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "selector: foo = 'zzz'\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ sendMessage("Ignored message", "foo", "1234");
+ sendMessage("Real message", "foo", "zzz");
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real message") > 0);
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
+ public void testSubscribeWithClientAck() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:client\n\n" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+ sendMessage(getName());
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ // message should be received since message was not acknowledged
+ MessageConsumer consumer = session.createConsumer(queue);
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertTrue(message.getJMSRedelivered());
+ }
+
+ public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception {
+ assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
+ }
+
+ public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws Exception {
+ assertSubscribeWithClientAckThenConsumeWithAutoAck(true);
+ }
+
+ protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:client\n\n" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+ sendMessage(getName());
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ log.info("Reconnecting!");
+
+ if (sendDisconnect) {
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ reconnect();
+ }
+ else {
+ reconnect(1000);
+ }
+
+
+ // message should be received since message was not acknowledged
+ frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n\n" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ // now lets make sure we don't see the message again
+ reconnect();
+
+ frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n\n" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+ sendMessage("shouldBeNextMessage");
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.contains("shouldBeNextMessage"));
+ }
+
+ public void testUnsubscribe() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ //send a message to our queue
+ sendMessage("first message");
+
+ //receive message from socket
+ frame = receiveFrame(1000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ //remove suscription
+ frame =
+ "UNSUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ waitForFrameToTakeEffect();
+
+ //send a message to our queue
+ sendMessage("second message");
+
+ try {
+ frame = receiveFrame(1000);
+ log.info("Received frame: " + frame);
+ Assert.fail("No message should have been received since subscription was removed");
+ }
+ catch (SocketTimeoutException e) {
+
+ }
+ }
+
+ public void testTransactionCommit() throws Exception {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ String f = receiveFrame(1000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+
+ frame =
+ "BEGIN\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "COMMIT\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ waitForFrameToTakeEffect();
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
+ }
+
+ public void testTransactionRollback() throws Exception {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ String f = receiveFrame(1000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+
+ frame =
+ "BEGIN\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "transaction: tx1\n" +
+ "\n" +
+ "first message" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ //rollback first message
+ frame =
+ "ABORT\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "BEGIN\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "transaction: tx1\n" +
+ "\n" +
+ "second message" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "COMMIT\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ // This test case is currently failing
+ waitForFrameToTakeEffect();
+
+ //only second msg should be received since first msg was rolled back
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("second message", message.getText().trim());
+ }
+
+ // Implementation methods
+ //-------------------------------------------------------------------------
+ protected void setUp() throws Exception {
+ server = createServer();
+ server.start();
+ connectionFactory = createConnectionFactory();
+
+ stompSocket = createSocket();
+ inputBuffer = new ByteArrayOutputStream();
+
+ connection = connectionFactory.createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ queue = session.createQueue(getQueueName());
+ connection.start();
+ }
+
+ /**
+ * @return
+ * @throws Exception
+ */
+ private JMSServerManager createServer() throws Exception
+ {
+ Configuration config = new ConfigurationImpl();
+ config.setSecurityEnabled(false);
+
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(TransportConstants.PROTOCOL_PROP_NAME, TransportConstants.STOMP_PROTOCOL);
+ params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
+ TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
+ config.getAcceptorConfigurations().add(stompTransport);
+ config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+ HornetQServer hornetQServer = HornetQServers.newHornetQServer(config);
+
+ JMSConfiguration jmsConfig = new JMSConfigurationImpl();
+ jmsConfig.getQueueConfigurations().add(new QueueConfigurationImpl(getQueueName(), null, false, getQueueName()));
+ server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
+ server.setContext(null);
+ return server;
+ }
+
+ protected void tearDown() throws Exception {
+ connection.close();
+ if (stompSocket != null) {
+ stompSocket.close();
+ }
+ server.stop();
+ }
+
+ protected void reconnect() throws Exception {
+ reconnect(0);
+ }
+ protected void reconnect(long sleep) throws Exception {
+ stompSocket.close();
+
+ if (sleep > 0) {
+ Thread.sleep(sleep);
+ }
+
+ stompSocket = createSocket();
+ inputBuffer = new ByteArrayOutputStream();
+ }
+
+ protected ConnectionFactory createConnectionFactory() {
+ return new HornetQConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ }
+
+ protected Socket createSocket() throws IOException {
+ return new Socket("127.0.0.1", port);
+ }
+
+ protected String getQueueName() {
+ return "test";
+ }
+
+ public void sendFrame(String data) throws Exception {
+ byte[] bytes = data.getBytes("UTF-8");
+ OutputStream outputStream = stompSocket.getOutputStream();
+ for (int i = 0; i < bytes.length; i++) {
+ outputStream.write(bytes[i]);
+ }
+ outputStream.flush();
+ }
+
+ public String receiveFrame(long timeOut) throws Exception {
+ stompSocket.setSoTimeout((int) timeOut);
+ InputStream is = stompSocket.getInputStream();
+ int c = 0;
+ for (; ;) {
+ c = is.read();
+ System.out.println(c);
+ if (c < 0) {
+ throw new IOException("socket closed.");
+ }
+ else if (c == 0) {
+ c = is.read();
+ Assert.assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
+ byte[] ba = inputBuffer.toByteArray();
+ inputBuffer.reset();
+ return new String(ba, "UTF-8");
+ }
+ else {
+ inputBuffer.write(c);
+ }
+ }
+ }
+
+ public void sendMessage(String msg) throws Exception {
+ sendMessage(msg, "foo", "xyz");
+ }
+
+ public void sendMessage(String msg, String propertyName, String propertyValue) throws JMSException {
+ MessageProducer producer = session.createProducer(queue);
+ TextMessage message = session.createTextMessage(msg);
+ message.setStringProperty(propertyName, propertyValue);
+ producer.send(message);
+ }
+
+ public void sendBytesMessage(byte[] msg) throws Exception {
+ MessageProducer producer = session.createProducer(queue);
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(msg);
+ producer.send(message);
+ }
+
+ protected void waitForFrameToTakeEffect() throws InterruptedException {
+ // bit of a dirty hack :)
+ // another option would be to force some kind of receipt to be returned
+ // from the frame
+ Thread.sleep(2000);
+ }
+}
Added: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest2.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest2.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest2.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.stomp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.integration.transports.netty.NettyAcceptor;
+import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.integration.transports.netty.TransportConstants;
+
+import junit.framework.TestCase;
+
+/**
+ * A StompTest
+ *
+ * @author jmesnil
+ *
+ *
+ */
+public class StompTest2 extends TestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private HornetQServer server;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testFoo() throws Exception
+ {
+ Thread.sleep(10);
+ }
+
+ // Package protected ---------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ Configuration config = new ConfigurationImpl();
+ config.setSecurityEnabled(false);
+
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(TransportConstants.PROTOCOL_PROP_NAME, TransportConstants.STOMP_PROTOCOL);
+ params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
+ TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
+ config.getAcceptorConfigurations().add(stompTransport);
+
+ server = HornetQServers.newHornetQServer(config);
+ server.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+
+ super.tearDown();
+ }
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.PacketDecoder;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.integration.transports.netty.NettyAcceptor;
import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
@@ -47,7 +48,7 @@
BufferHandler handler = new AbstractBufferHandler()
{
- public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+ public void bufferReceived(Object connectionID, HornetQBuffer buffer, PacketDecoder decoder)
{
}
};
Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -22,6 +22,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.PacketDecoder;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.integration.transports.netty.NettyAcceptor;
import org.hornetq.integration.transports.netty.TransportConstants;
@@ -60,7 +61,7 @@
BufferHandler handler = new AbstractBufferHandler()
{
- public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+ public void bufferReceived(final Object connectionID, final HornetQBuffer buffer, final PacketDecoder decoder)
{
}
};
Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2010-01-19 16:04:31 UTC (rev 8807)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.remoting.PacketDecoder;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.integration.transports.netty.NettyConnector;
import org.hornetq.spi.core.remoting.BufferHandler;
@@ -51,7 +52,7 @@
{
BufferHandler handler = new AbstractBufferHandler()
{
- public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+ public void bufferReceived(final Object connectionID, final HornetQBuffer buffer, final PacketDecoder decoder)
{
}
};
@@ -88,7 +89,7 @@
{
BufferHandler handler = new AbstractBufferHandler()
{
- public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+ public void bufferReceived(final Object connectionID, final HornetQBuffer buffer, final PacketDecoder decoder)
{
}
};
14 years, 11 months
JBoss hornetq SVN: r8805 - trunk/.settings.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-18 12:35:51 -0500 (Mon, 18 Jan 2010)
New Revision: 8805
Modified:
trunk/.settings/org.eclipse.jdt.ui.prefs
Log:
updated copyright year for new files
Modified: trunk/.settings/org.eclipse.jdt.ui.prefs
===================================================================
--- trunk/.settings/org.eclipse.jdt.ui.prefs 2010-01-18 11:27:20 UTC (rev 8804)
+++ trunk/.settings/org.eclipse.jdt.ui.prefs 2010-01-18 17:35:51 UTC (rev 8805)
@@ -1,4 +1,4 @@
-#Mon Dec 07 20:39:16 GMT 2009
+#Mon Jan 18 18:34:30 CET 2010
cleanup.add_default_serial_version_id=false
cleanup.add_generated_serial_version_id=true
cleanup.add_missing_annotations=true
@@ -64,7 +64,7 @@
org.eclipse.jdt.ui.ondemandthreshold=9999
org.eclipse.jdt.ui.overrideannotation=true
org.eclipse.jdt.ui.staticondemandthreshold=9999
-org.eclipse.jdt.ui.text.custom_code_templates=<?xml version\="1.0" encoding\="UTF-8" standalone\="no"?><templates><template autoinsert\="true" context\="gettercomment_context" deleted\="false" description\="Comment for getter method" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.gettercomment" name\="gettercomment">/**\n * @return the ${bare_field_name}\n */</template><template autoinsert\="true" context\="settercomment_context" deleted\="false" description\="Comment for setter method" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.settercomment" name\="settercomment">/**\n * @param ${param} the ${bare_field_name} to set\n */</template><template autoinsert\="true" context\="constructorcomment_context" deleted\="false" description\="Comment for created constructors" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.constructorcomment" name\="constructorcomment">/**\n * ${tags}\n */</template><template autoinsert\="false" context\="filecommen!
t_context" deleted\="false" description\="Comment for created Java files" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.filecomment" name\="filecomment"/><template autoinsert\="false" context\="typecomment_context" deleted\="false" description\="Comment for created types" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.typecomment" name\="typecomment">/**\n * A ${type_name}\n *\n * @author ${user}\n *\n * ${tags}\n *\n */</template><template autoinsert\="true" context\="fieldcomment_context" deleted\="false" description\="Comment for fields" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.fieldcomment" name\="fieldcomment">/**\n * \n */</template><template autoinsert\="true" context\="methodcomment_context" deleted\="false" description\="Comment for non-overriding methods" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.methodcomment" name\="methodcomment">/**\n * ${tags}\n */</template><template autoinsert\="true" context\="ove!
rridecomment_context" deleted\="false" description\="Comment f!
or overr
iding methods" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.overridecomment" name\="overridecomment">/* (non-Javadoc)\n * ${see_to_overridden}\n */</template><template autoinsert\="true" context\="delegatecomment_context" deleted\="false" description\="Comment for delegate methods" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.delegatecomment" name\="delegatecomment">/**\n * ${tags}\n * ${see_to_target}\n */</template><template autoinsert\="false" context\="newtype_context" deleted\="false" description\="Newly created files" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.newtype" name\="newtype">/*\n * Copyright 2009 Red Hat, Inc.\n * Red Hat licenses this file to you under the Apache License, version\n * 2.0 (the "License"); you may not use this file except in compliance\n * with the License. You may obtain a copy of the License at\n * http\://www.apache.org/licenses/LICENSE-2.0\n * Unless required by applicable law or agreed to in!
writing, software\n * distributed under the License is distributed on an "AS IS" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or\n * implied. See the License for the specific language governing\n * permissions and limitations under the License.\n */\n\n${filecomment}\n${package_declaration}\n\n${typecomment}\n${type_declaration}</template><template autoinsert\="false" context\="classbody_context" deleted\="false" description\="Code in new class type bodies" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.classbody" name\="classbody">\n // Constants -----------------------------------------------------\n\n // Attributes ----------------------------------------------------\n\n // Static --------------------------------------------------------\n\n // Constructors --------------------------------------------------\n\n // Public --------------------------------------------------------\n\n // Package protected ------------------!
---------------------------\n\n // Protected ---------------!
--------
------------------------------\n\n // Private -------------------------------------------------------\n\n // Inner classes -------------------------------------------------\n\n</template><template autoinsert\="true" context\="interfacebody_context" deleted\="false" description\="Code in new interface type bodies" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.interfacebody" name\="interfacebody">\n</template><template autoinsert\="true" context\="enumbody_context" deleted\="false" description\="Code in new enum type bodies" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.enumbody" name\="enumbody">\n</template><template autoinsert\="true" context\="annotationbody_context" deleted\="false" description\="Code in new annotation type bodies" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.annotationbody" name\="annotationbody">\n</template><template autoinsert\="true" context\="catchblock_context" deleted\="false" description\="Code in new c!
atch blocks" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.catchblock" name\="catchblock">// ${todo} Auto-generated catch block\n${exception_var}.printStackTrace();</template><template autoinsert\="true" context\="methodbody_context" deleted\="false" description\="Code in created method stubs" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.methodbody" name\="methodbody">// ${todo} Auto-generated method stub\n${body_statement}</template><template autoinsert\="true" context\="constructorbody_context" deleted\="false" description\="Code in created constructor stubs" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.constructorbody" name\="constructorbody">${body_statement}\n// ${todo} Auto-generated constructor stub</template><template autoinsert\="true" context\="getterbody_context" deleted\="false" description\="Code in created getters" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.getterbody" name\="getterbody">return ${field};!
</template><template autoinsert\="true" context\="setterbody_c!
ontext"
deleted\="false" description\="Code in created setters" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.setterbody" name\="setterbody">${field} \= ${param};</template></templates>
+org.eclipse.jdt.ui.text.custom_code_templates=<?xml version\="1.0" encoding\="UTF-8" standalone\="no"?><templates><template autoinsert\="true" context\="gettercomment_context" deleted\="false" description\="Comment for getter method" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.gettercomment" name\="gettercomment">/**\n * @return the ${bare_field_name}\n */</template><template autoinsert\="true" context\="settercomment_context" deleted\="false" description\="Comment for setter method" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.settercomment" name\="settercomment">/**\n * @param ${param} the ${bare_field_name} to set\n */</template><template autoinsert\="true" context\="constructorcomment_context" deleted\="false" description\="Comment for created constructors" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.constructorcomment" name\="constructorcomment">/**\n * ${tags}\n */</template><template autoinsert\="false" context\="filecommen!
t_context" deleted\="false" description\="Comment for created Java files" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.filecomment" name\="filecomment"/><template autoinsert\="false" context\="typecomment_context" deleted\="false" description\="Comment for created types" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.typecomment" name\="typecomment">/**\n * A ${type_name}\n *\n * @author ${user}\n *\n * ${tags}\n *\n */</template><template autoinsert\="true" context\="fieldcomment_context" deleted\="false" description\="Comment for fields" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.fieldcomment" name\="fieldcomment">/**\n * \n */</template><template autoinsert\="true" context\="methodcomment_context" deleted\="false" description\="Comment for non-overriding methods" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.methodcomment" name\="methodcomment">/**\n * ${tags}\n */</template><template autoinsert\="true" context\="ove!
rridecomment_context" deleted\="false" description\="Comment f!
or overr
iding methods" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.overridecomment" name\="overridecomment">/* (non-Javadoc)\n * ${see_to_overridden}\n */</template><template autoinsert\="true" context\="delegatecomment_context" deleted\="false" description\="Comment for delegate methods" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.delegatecomment" name\="delegatecomment">/**\n * ${tags}\n * ${see_to_target}\n */</template><template autoinsert\="false" context\="newtype_context" deleted\="false" description\="Newly created files" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.newtype" name\="newtype">/*\n * Copyright 2010 Red Hat, Inc.\n * Red Hat licenses this file to you under the Apache License, version\n * 2.0 (the "License"); you may not use this file except in compliance\n * with the License. You may obtain a copy of the License at\n * http\://www.apache.org/licenses/LICENSE-2.0\n * Unless required by applicable law or agreed to in!
writing, software\n * distributed under the License is distributed on an "AS IS" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or\n * implied. See the License for the specific language governing\n * permissions and limitations under the License.\n */\n\n${filecomment}\n${package_declaration}\n\n${typecomment}\n${type_declaration}</template><template autoinsert\="false" context\="classbody_context" deleted\="false" description\="Code in new class type bodies" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.classbody" name\="classbody">\n // Constants -----------------------------------------------------\n\n // Attributes ----------------------------------------------------\n\n // Static --------------------------------------------------------\n\n // Constructors --------------------------------------------------\n\n // Public --------------------------------------------------------\n\n // Package protected ------------------!
---------------------------\n\n // Protected ---------------!
--------
------------------------------\n\n // Private -------------------------------------------------------\n\n // Inner classes -------------------------------------------------\n\n</template><template autoinsert\="true" context\="interfacebody_context" deleted\="false" description\="Code in new interface type bodies" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.interfacebody" name\="interfacebody">\n</template><template autoinsert\="true" context\="enumbody_context" deleted\="false" description\="Code in new enum type bodies" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.enumbody" name\="enumbody">\n</template><template autoinsert\="true" context\="annotationbody_context" deleted\="false" description\="Code in new annotation type bodies" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.annotationbody" name\="annotationbody">\n</template><template autoinsert\="true" context\="catchblock_context" deleted\="false" description\="Code in new c!
atch blocks" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.catchblock" name\="catchblock">// ${todo} Auto-generated catch block\n${exception_var}.printStackTrace();</template><template autoinsert\="true" context\="methodbody_context" deleted\="false" description\="Code in created method stubs" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.methodbody" name\="methodbody">// ${todo} Auto-generated method stub\n${body_statement}</template><template autoinsert\="true" context\="constructorbody_context" deleted\="false" description\="Code in created constructor stubs" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.constructorbody" name\="constructorbody">${body_statement}\n// ${todo} Auto-generated constructor stub</template><template autoinsert\="true" context\="getterbody_context" deleted\="false" description\="Code in created getters" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.getterbody" name\="getterbody">return ${field};!
</template><template autoinsert\="true" context\="setterbody_c!
ontext"
deleted\="false" description\="Code in created setters" enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.setterbody" name\="setterbody">${field} \= ${param};</template></templates>
sp_cleanup.add_default_serial_version_id=true
sp_cleanup.add_generated_serial_version_id=false
sp_cleanup.add_missing_annotations=true
14 years, 11 months
JBoss hornetq SVN: r8804 - in trunk: src/main/org/hornetq/core/postoffice/impl and 2 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-01-18 06:27:20 -0500 (Mon, 18 Jan 2010)
New Revision: 8804
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java
Modified:
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-271
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-01-14 12:06:43 UTC (rev 8803)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-01-18 11:27:20 UTC (rev 8804)
@@ -53,8 +53,6 @@
public static final SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_HQ_ROUTE_TO");
- public static final SimpleString HDR_FROM_CLUSTER = new SimpleString("_HQ_FROM_CLUSTER");
-
protected long messageID;
protected SimpleString address;
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2010-01-14 12:06:43 UTC (rev 8803)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2010-01-18 11:27:20 UTC (rev 8804)
@@ -238,7 +238,9 @@
if (!routed)
{
- if (message.containsProperty(MessageImpl.HDR_FROM_CLUSTER))
+ //TODO this is a little inefficient since we do the lookup once to see if the property
+ //is there, then do it again to remove the actual property
+ if (message.containsProperty(MessageImpl.HDR_ROUTE_TO_IDS))
{
routeFromCluster(message, context);
}
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-01-14 12:06:43 UTC (rev 8803)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-01-18 11:27:20 UTC (rev 8804)
@@ -427,8 +427,6 @@
}
message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
-
- message.putBooleanProperty(MessageImpl.HDR_FROM_CLUSTER, Boolean.TRUE);
}
if (useDuplicateDetection && !message.containsProperty(Message.HDR_DUPLICATE_DETECTION_ID))
Added: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java 2010-01-18 11:27:20 UTC (rev 8804)
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
+
+public class ClusterHeadersRemovedTest extends ClusterTestBase
+{
+ private static final Logger log = Logger.getLogger(ClusterHeadersRemovedTest.class);
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1);
+
+ super.tearDown();
+ }
+
+ protected boolean isNetty()
+ {
+ return false;
+ }
+
+ public void testHeadersRemoved() throws Exception
+ {
+ setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
+ startServers(1, 0);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+
+ addConsumer(1, 1, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(0, "queues.testaddress", 1, 1, false);
+
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+ ClientSessionFactory sf = sfs[0];
+
+ ClientSession session0 = sf.createSession(false, true, true);
+
+ try
+ {
+ ClientProducer producer = session0.createProducer("queues.testaddress");
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage message = session0.createMessage(true);
+
+ producer.send(message);
+ }
+ }
+ finally
+ {
+ session0.close();
+ }
+
+ ClientConsumer consumer = super.getConsumer(1);
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage message = consumer.receive(5000);
+
+ assertNotNull(message);
+
+ assertFalse(message.containsProperty(MessageImpl.HDR_ROUTE_TO_IDS));
+ }
+ }
+
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-01-14 12:06:43 UTC (rev 8803)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-01-18 11:27:20 UTC (rev 8804)
@@ -28,7 +28,12 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
@@ -144,6 +149,11 @@
protected ClientSessionFactory[] sfs;
+ protected ClientConsumer getConsumer(final int node)
+ {
+ return consumers[node].consumer;
+ }
+
protected void waitForMessages(final int node, final String address, final int count) throws Exception
{
HornetQServer server = servers[node];
14 years, 11 months
JBoss hornetq SVN: r8803 - trunk/docs/quickstart-guide/en.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-01-14 07:06:43 -0500 (Thu, 14 Jan 2010)
New Revision: 8803
Modified:
trunk/docs/quickstart-guide/en/installation.xml
Log:
https://jira.jboss.org/jira/browse/HORNETQ-269
Modified: trunk/docs/quickstart-guide/en/installation.xml
===================================================================
--- trunk/docs/quickstart-guide/en/installation.xml 2010-01-13 16:42:15 UTC (rev 8802)
+++ trunk/docs/quickstart-guide/en/installation.xml 2010-01-14 12:06:43 UTC (rev 8803)
@@ -109,7 +109,7 @@
directory where you installed JBoss AS 5</para>
</listitem>
<listitem>
- <para>run <literal>./build.sh</literal> (or <literal>build.bat</literal> if you are on
+ <para>run <literal>./build.sh as5</literal> (or <literal>build.bat as5</literal> if you are on
Windows) in HornetQ's <literal>config/jboss-as</literal> directory</para>
</listitem>
</orderedlist>
@@ -143,7 +143,7 @@
directory where you installed JBoss AS 4</para>
</listitem>
<listitem>
- <para>run <literal><literal>./build.sh</literal> (or <literal>build.bat</literal> if you
+ <para>run <literal><literal>./build.sh as4</literal> (or <literal>build.bat as4</literal> if you
are on Windows)</literal> in HornetQ's <literal>config/jboss-as</literal>
directory</para>
</listitem>
14 years, 11 months
JBoss hornetq SVN: r8802 - tags.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-01-13 11:42:15 -0500 (Wed, 13 Jan 2010)
New Revision: 8802
Removed:
tags/HornetQ_2_0_0_GA_pending/
Log:
removing pending tag
14 years, 11 months
JBoss hornetq SVN: r8801 - tags.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-01-13 11:41:50 -0500 (Wed, 13 Jan 2010)
New Revision: 8801
Removed:
tags/HornetQ_2_0_0_CR3_pending/
Log:
removing pending tag
14 years, 11 months