Author: timfox
Date: 2010-01-21 15:09:23 -0500 (Thu, 21 Jan 2010)
New Revision: 8832
Added:
trunk/src/main/org/hornetq/core/protocol/aardvark/
trunk/src/main/org/hornetq/core/protocol/aardvark/impl/
trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java
trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManagerFactory.java
trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManagerFactory.java
trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManagerFactory.java
trunk/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java
Log:
refactoring to abstract out protocol
Added: trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java 2010-01-21 20:09:23 UTC (rev 8832)
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.aardvark.impl;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Connection;
+
+/**
+ * A AardvarkConnection
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class AardvarkConnection implements RemotingConnection
+{
+ private static final Logger log = Logger.getLogger(AardvarkConnection.class);
+
+ private final AardvarkProtocolManager manager;
+
+ private final Connection transportConnection;
+
+ AardvarkConnection(final Connection transportConnection, final AardvarkProtocolManager
manager)
+ {
+ this.transportConnection = transportConnection;
+
+ this.manager = manager;
+ }
+
+ public void addCloseListener(CloseListener listener)
+ {
+ }
+
+ public void addFailureListener(FailureListener listener)
+ {
+ }
+
+ public boolean checkDataReceived()
+ {
+ return true;
+ }
+
+ public HornetQBuffer createBuffer(int size)
+ {
+ return HornetQBuffers.dynamicBuffer(size);
+ }
+
+ public void destroy()
+ {
+ }
+
+ public void disconnect()
+ {
+ }
+
+ public void fail(HornetQException me)
+ {
+ }
+
+ public void flush()
+ {
+ }
+
+ public List<FailureListener> getFailureListeners()
+ {
+ return Collections.EMPTY_LIST;
+ }
+
+ public Object getID()
+ {
+ return transportConnection.getID();
+ }
+
+ public String getRemoteAddress()
+ {
+ return transportConnection.getRemoteAddress();
+ }
+
+ public Connection getTransportConnection()
+ {
+ return transportConnection;
+ }
+
+ public boolean isClient()
+ {
+ return false;
+ }
+
+ public boolean isDestroyed()
+ {
+ return false;
+ }
+
+ public boolean removeCloseListener(CloseListener listener)
+ {
+ return false;
+ }
+
+ public boolean removeFailureListener(FailureListener listener)
+ {
+ return false;
+ }
+
+ public void setFailureListeners(List<FailureListener> listeners)
+ {
+ }
+
+ public void bufferReceived(Object connectionID, HornetQBuffer buffer)
+ {
+ manager.handleBuffer(this, buffer);
+ }
+
+ public int isReadyToHandle(HornetQBuffer buffer)
+ {
+ return -1;
+ }
+
+}
Added:
trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java 2010-01-21 20:09:23 UTC (rev 8832)
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.aardvark.impl;
+
+import java.util.List;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.spi.core.protocol.ConnectionEntry;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.protocol.SessionCallback;
+import org.hornetq.spi.core.remoting.Connection;
+
+/**
+ * AardvarkProtocolManager
+ *
+ * A stupid protocol to demonstrate how to implement a new protocol in HornetQ
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class AardvarkProtocolManager implements ProtocolManager
+{
+ private static final Logger log = Logger.getLogger(AardvarkProtocolManager.class);
+
+ private final HornetQServer server;
+
+ public AardvarkProtocolManager(final HornetQServer server, final
List<Interceptor> interceptors)
+ {
+ this.server = server;
+ }
+
+ public ConnectionEntry createConnectionEntry(final Connection connection)
+ {
+ AardvarkConnection conn = new AardvarkConnection(connection, this);
+
+ return new ConnectionEntry(conn, 0, 0);
+ }
+
+ public void handleBuffer(final RemotingConnection conn, final HornetQBuffer buffer)
+ {
+ try
+ {
+ ServerSession session = server.createSession("aardvark",
+ null,
+ null,
+ Integer.MAX_VALUE,
+ conn,
+ true,
+ true,
+ true,
+ false);
+
+ session.setCallback(new
AardvarkSessionCallback(conn.getTransportConnection()));
+
+ final SimpleString name = new SimpleString("hornetq.aardvark");
+
+ session.createQueue(name, name, null, false, false);
+
+ session.createConsumer(0, name, null, false);
+
+ session.receiveConsumerCredits(0, -1); // No flow control
+
+ session.start();
+
+ ServerMessage message = new ServerMessageImpl(0, 1000);
+
+ message.setAddress(name);
+
+ message.getBodyBuffer().writeUTF("GIRAFFE\n");
+
+ session.send(message);
+
+ session.start();
+
+ session.closeConsumer(0);
+
+ session.deleteQueue(name);
+
+ session.close();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to create session", e);
+ }
+ }
+
+ private class AardvarkSessionCallback implements SessionCallback
+ {
+ private final Connection connection;
+
+ AardvarkSessionCallback(final Connection connection)
+ {
+ this.connection = connection;
+ }
+
+ public void closed()
+ {
+ }
+
+ public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize,
int deliveryCount)
+ {
+ return 0;
+ }
+
+ public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean
continues, boolean requiresResponse)
+ {
+ return 0;
+ }
+
+ public int sendMessage(ServerMessage message, long consumerID, int deliveryCount)
+ {
+ HornetQBuffer buffer = message.getBodyBuffer();
+
+ buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE);
+
+ connection.write(buffer);
+
+ return -1;
+ }
+
+ public void sendProducerCreditsMessage(int credits, SimpleString address, int
offset)
+ {
+ }
+
+ }
+
+ public void bufferReceived(Object connectionID, HornetQBuffer buffer)
+ {
+ }
+
+ public int isReadyToHandle(HornetQBuffer buffer)
+ {
+ //Look for a new-line
+
+ //BTW this is very inefficient - in a real protocol you'd want to do this
better
+
+ for (int i = buffer.readerIndex(); i < buffer.writerIndex(); i++)
+ {
+ byte b = buffer.getByte(i);
+
+ if (b == (byte)'\n')
+ {
+ return buffer.writerIndex();
+ }
+ }
+
+ return -1;
+ }
+
+}
Added:
trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManagerFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManagerFactory.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManagerFactory.java 2010-01-21 20:09:23 UTC (rev 8832)
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.aardvark.impl;
+
+import java.util.List;
+
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.ProtocolManagerFactory;
+
+/**
+ * {@link ProtocolManagerFactory} that produces {@link AardvarkProtocolManager} instances.
+ *
+ * @author Tim Fox
+ */
+public class AardvarkProtocolManagerFactory implements ProtocolManagerFactory
+{
+
+   /**
+    * Creates a new {@link AardvarkProtocolManager}.
+    *
+    * @param server the server handed to the new manager
+    * @param interceptors the interceptors handed to the new manager
+    * @return the newly created manager
+    */
+   public ProtocolManager createProtocolManager(final HornetQServer server, final List<Interceptor> interceptors)
+   {
+      final ProtocolManager protocolManager = new AardvarkProtocolManager(server, interceptors);
+
+      return protocolManager;
+   }
+
+}
Added: trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManagerFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManagerFactory.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManagerFactory.java 2010-01-21 20:09:23 UTC (rev 8832)
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl;
+
+import java.util.List;
+
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.ProtocolManagerFactory;
+
+/**
+ * {@link ProtocolManagerFactory} that produces {@link CoreProtocolManager} instances.
+ *
+ * @author Tim Fox
+ */
+public class CoreProtocolManagerFactory implements ProtocolManagerFactory
+{
+   /**
+    * Creates a new {@link CoreProtocolManager}.
+    *
+    * @param server the server handed to the new manager
+    * @param interceptors the interceptors handed to the new manager
+    * @return the newly created manager
+    */
+   public ProtocolManager createProtocolManager(final HornetQServer server, final List<Interceptor> interceptors)
+   {
+      final ProtocolManager protocolManager = new CoreProtocolManager(server, interceptors);
+
+      return protocolManager;
+   }
+}
Added: trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManagerFactory.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManagerFactory.java (rev 0)
+++ trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManagerFactory.java 2010-01-21 20:09:23 UTC (rev 8832)
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.spi.core.protocol;
+
+import java.util.List;
+
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.core.server.HornetQServer;
+
+/**
+ * Factory for {@link ProtocolManager} instances.
+ *
+ * @author tim
+ */
+public interface ProtocolManagerFactory
+{
+ /**
+  * Creates a protocol manager.
+  *
+  * @param server the HornetQ server instance
+  * @param interceptors the list of interceptors
+  * @return the created protocol manager
+  */
+ ProtocolManager createProtocolManager(HornetQServer server, List<Interceptor>
interceptors);
+}
Added: trunk/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java 2010-01-21 20:09:23 UTC (rev 8832)
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.remoting;
+
+import java.io.BufferedOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.integration.transports.netty.TransportConstants;
+import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * An AardvarkProtocolTest
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class AardvarkProtocolTest extends ServiceTestBase
+{
+ private static final Logger log = Logger.getLogger(AardvarkProtocolTest.class);
+
+ public void testAardvark() throws Exception
+ {
+ RemotingServiceImpl.hackProtocol = ProtocolType.AARDVARK;
+
+ Configuration config = new ConfigurationImpl();
+
+ config.setSecurityEnabled(false);
+ config.setPersistenceEnabled(false);
+
+ Map<String, Object> params = new HashMap<String, Object>();
+
+ params.put(TransportConstants.PORT_PROP_NAME, 9876);
+ params.put(TransportConstants.HOST_PROP_NAME, "127.0.0.1");
+
+ TransportConfiguration tc = new
TransportConfiguration(NettyAcceptorFactory.class.getCanonicalName(),
+ params);
+
+ config.getAcceptorConfigurations().add(tc);
+
+ HornetQServer server = HornetQServers.newHornetQServer(config);
+
+ server.start();
+
+ //Now we should be able to make a connection to this port and talk the Aardvark
protocol!
+
+ Socket socket = new Socket("127.0.0.1", 9876);
+
+
+
+ OutputStream out = new BufferedOutputStream(socket.getOutputStream());
+
+ String s = "AARDVARK!\n";
+
+ byte[] bytes = s.getBytes("UTF-8");
+
+ out.write(bytes);
+
+ out.flush();
+
+ InputStream in = socket.getInputStream();
+
+ log.info("writing bytes");
+ byte b;
+
+ while ((b = (byte)in.read()) != '\n')
+ {
+ log.info("read " + (char)b);
+ }
+
+ socket.close();
+
+ server.stop();
+
+ RemotingServiceImpl.hackProtocol = ProtocolType.CORE;
+ }
+}