[hornetq-commits] JBoss hornetq SVN: r8832 - in trunk: src/main/org/hornetq/core/protocol/aardvark and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Jan 21 15:09:24 EST 2010


Author: timfox
Date: 2010-01-21 15:09:23 -0500 (Thu, 21 Jan 2010)
New Revision: 8832

Added:
   trunk/src/main/org/hornetq/core/protocol/aardvark/
   trunk/src/main/org/hornetq/core/protocol/aardvark/impl/
   trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java
   trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java
   trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManagerFactory.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManagerFactory.java
   trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManagerFactory.java
   trunk/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java
Log:
refactoring to abstract out protocol

Added: trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java	2010-01-21 20:09:23 UTC (rev 8832)
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.aardvark.impl;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Connection;
+
+/**
+ * A AardvarkConnection
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class AardvarkConnection implements RemotingConnection
+{
+   private static final Logger log = Logger.getLogger(AardvarkConnection.class);
+
+   private final AardvarkProtocolManager manager;
+   
+   private final Connection transportConnection;
+      
+   AardvarkConnection(final Connection transportConnection, final AardvarkProtocolManager manager)
+   {
+      this.transportConnection = transportConnection;
+      
+      this.manager = manager;
+   }
+
+   public void addCloseListener(CloseListener listener)
+   {
+   }
+
+   public void addFailureListener(FailureListener listener)
+   {
+   }
+
+   public boolean checkDataReceived()
+   {
+      return true;
+   }
+
+   public HornetQBuffer createBuffer(int size)
+   {
+      return HornetQBuffers.dynamicBuffer(size);
+   }
+
+   public void destroy()
+   {
+   }
+
+   public void disconnect()
+   {
+   }
+
+   public void fail(HornetQException me)
+   {
+   }
+
+   public void flush()
+   {  
+   }
+
+   public List<FailureListener> getFailureListeners()
+   {
+      return Collections.EMPTY_LIST;
+   }
+
+   public Object getID()
+   {
+      return transportConnection.getID();
+   }
+
+   public String getRemoteAddress()
+   {      
+      return transportConnection.getRemoteAddress();
+   }
+
+   public Connection getTransportConnection()
+   {
+      return transportConnection;
+   }
+
+   public boolean isClient()
+   {
+      return false;
+   }
+
+   public boolean isDestroyed()
+   {
+      return false;
+   }
+
+   public boolean removeCloseListener(CloseListener listener)
+   {
+      return false;
+   }
+
+   public boolean removeFailureListener(FailureListener listener)
+   {
+      return false;
+   }
+
+   public void setFailureListeners(List<FailureListener> listeners)
+   {
+   }
+
+   public void bufferReceived(Object connectionID, HornetQBuffer buffer)
+   {
+      manager.handleBuffer(this, buffer);
+   }
+
+   public int isReadyToHandle(HornetQBuffer buffer)
+   {
+      return -1;
+   }
+
+}

Added: trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java	2010-01-21 20:09:23 UTC (rev 8832)
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.aardvark.impl;
+
+import java.util.List;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.spi.core.protocol.ConnectionEntry;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.protocol.SessionCallback;
+import org.hornetq.spi.core.remoting.Connection;
+
+/**
+ * AardvarkProtocolManager
+ * 
+ * A stupid protocol to demonstrate how to implement a new protocol in HornetQ
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class AardvarkProtocolManager implements ProtocolManager
+{
+   private static final Logger log = Logger.getLogger(AardvarkProtocolManager.class);
+
+   private final HornetQServer server;
+
+   public AardvarkProtocolManager(final HornetQServer server, final List<Interceptor> interceptors)
+   {
+      this.server = server;
+   }
+
+   public ConnectionEntry createConnectionEntry(final Connection connection)
+   {
+      AardvarkConnection conn = new AardvarkConnection(connection, this);
+
+      return new ConnectionEntry(conn, 0, 0);
+   }
+
+   public void handleBuffer(final RemotingConnection conn, final HornetQBuffer buffer)
+   {                 
+      try
+      {
+         ServerSession session = server.createSession("aardvark",
+                                                      null,
+                                                      null,
+                                                      Integer.MAX_VALUE,
+                                                      conn,
+                                                      true,
+                                                      true,
+                                                      true,
+                                                      false);
+         
+         session.setCallback(new AardvarkSessionCallback(conn.getTransportConnection()));
+         
+         final SimpleString name = new SimpleString("hornetq.aardvark");
+         
+         session.createQueue(name, name, null, false, false);
+         
+         session.createConsumer(0, name, null, false);
+         
+         session.receiveConsumerCredits(0, -1); // No flow control
+         
+         session.start();
+         
+         ServerMessage message = new ServerMessageImpl(0, 1000);
+         
+         message.setAddress(name);
+         
+         message.getBodyBuffer().writeUTF("GIRAFFE\n");
+         
+         session.send(message);
+         
+         session.start();
+         
+         session.closeConsumer(0);
+         
+         session.deleteQueue(name);
+         
+         session.close();
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to create session", e);
+      }
+   }
+
+   private class AardvarkSessionCallback implements SessionCallback
+   {
+      private final Connection connection;
+      
+      AardvarkSessionCallback(final Connection connection)
+      {
+         this.connection = connection;
+      }
+
+      public void closed()
+      {
+      }
+
+      public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
+      {
+         return 0;
+      }
+
+      public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse)
+      {
+         return 0;
+      }
+
+      public int sendMessage(ServerMessage message, long consumerID, int deliveryCount)
+      {
+         HornetQBuffer buffer = message.getBodyBuffer();
+         
+         buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE);
+         
+         connection.write(buffer);
+         
+         return -1;
+      }
+
+      public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
+      {
+      }
+      
+   }
+
+   public void bufferReceived(Object connectionID, HornetQBuffer buffer)
+   {      
+   }
+
+   public int isReadyToHandle(HornetQBuffer buffer)
+   {
+      //Look for a new-line
+      
+      //BTW this is very inefficient - in a real protocol you'd want to do this better
+      
+      for (int i = buffer.readerIndex(); i < buffer.writerIndex(); i++)
+      {
+         byte b = buffer.getByte(i);
+         
+         if (b == (byte)'\n')
+         {
+            return buffer.writerIndex();
+         }
+      }
+            
+      return -1;      
+   }
+
+}

Added: trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManagerFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManagerFactory.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManagerFactory.java	2010-01-21 20:09:23 UTC (rev 8832)
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.aardvark.impl;
+
+import java.util.List;
+
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.ProtocolManagerFactory;
+
+/**
+ * A AardvarkProtocolManagerFactory
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class AardvarkProtocolManagerFactory implements ProtocolManagerFactory
+{
+
+   public ProtocolManager createProtocolManager(final HornetQServer server, final List<Interceptor> interceptors)
+   {
+      return new AardvarkProtocolManager(server, interceptors);
+   }
+
+}

Added: trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManagerFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManagerFactory.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManagerFactory.java	2010-01-21 20:09:23 UTC (rev 8832)
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl;
+
+import java.util.List;
+
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.ProtocolManagerFactory;
+
+/**
+ * A CoreProtocolManagerFactory
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class CoreProtocolManagerFactory implements ProtocolManagerFactory
+{
+   public ProtocolManager createProtocolManager(final HornetQServer server, final List<Interceptor> interceptors)
+   {
+      return new CoreProtocolManager(server, interceptors);
+   }
+}

Added: trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManagerFactory.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManagerFactory.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManagerFactory.java	2010-01-21 20:09:23 UTC (rev 8832)
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.spi.core.protocol;
+
+import java.util.List;
+
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.core.server.HornetQServer;
+
+/**
+ * A ProtocolManagerFactory
+ *
+ * @author tim
+ *
+ *
+ */
+public interface ProtocolManagerFactory
+{
+   ProtocolManager createProtocolManager(HornetQServer server, List<Interceptor> interceptors);
+}

Added: trunk/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java	2010-01-21 20:09:23 UTC (rev 8832)
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.remoting;
+
+import java.io.BufferedOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.integration.transports.netty.TransportConstants;
+import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * An AardvarkProtocolTest
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class AardvarkProtocolTest extends ServiceTestBase
+{
+   private static final Logger log = Logger.getLogger(AardvarkProtocolTest.class);
+
+   public void testAardvark() throws Exception
+   {
+      RemotingServiceImpl.hackProtocol = ProtocolType.AARDVARK;
+      
+      Configuration config = new ConfigurationImpl();
+      
+      config.setSecurityEnabled(false);
+      config.setPersistenceEnabled(false);
+      
+      Map<String, Object> params = new HashMap<String, Object>();
+      
+      params.put(TransportConstants.PORT_PROP_NAME, 9876);
+      params.put(TransportConstants.HOST_PROP_NAME, "127.0.0.1");
+      
+      TransportConfiguration tc = new TransportConfiguration(NettyAcceptorFactory.class.getCanonicalName(),
+                                                             params);
+      
+      config.getAcceptorConfigurations().add(tc);
+      
+      HornetQServer server = HornetQServers.newHornetQServer(config);
+      
+      server.start();
+      
+      //Now we should be able to make a connection to this port and talk the Aardvark protocol!
+      
+      Socket socket = new Socket("127.0.0.1", 9876);
+      
+      
+      
+      OutputStream out = new BufferedOutputStream(socket.getOutputStream());
+      
+      String s = "AARDVARK!\n";
+      
+      byte[] bytes = s.getBytes("UTF-8");
+      
+      out.write(bytes);
+      
+      out.flush();
+      
+      InputStream in = socket.getInputStream();
+
+      log.info("writing bytes");
+            byte b;
+      
+      while ((b = (byte)in.read()) != '\n')
+      {
+         log.info("read " + (char)b);
+      }
+            
+      socket.close();
+      
+      server.stop();
+      
+      RemotingServiceImpl.hackProtocol = ProtocolType.CORE;
+   }
+}



More information about the hornetq-commits mailing list