[hornetq-commits] JBoss hornetq SVN: r7996 - in branches/Replication_Clebert: src/main/org/hornetq/core/remoting/server and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Sep 25 21:59:14 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-09-25 21:59:13 -0400 (Fri, 25 Sep 2009)
New Revision: 7996

Added:
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/HandlerFactory.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/BackupListener.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationToken.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationPacketHandler.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
   branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/
   branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Modified:
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
Just backing up ideas.. a lot of this will change

Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/HandlerFactory.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/HandlerFactory.java	                        (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/HandlerFactory.java	2009-09-26 01:59:13 UTC (rev 7996)
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.server;
+
+import org.hornetq.core.remoting.Channel;
+import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.RemotingConnection;
+
+/**
+ * The RemotingService could be used by either the Replication Manager or by the Server.
+ * 
+ * Each will need to use a different Handler, so this factory may be used to pass what handler needs to be created
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface HandlerFactory
+{
+   ChannelHandler getHandler(RemotingConnection conn, Channel channel);
+}

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2009-09-26 01:57:13 UTC (rev 7995)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2009-09-26 01:59:13 UTC (rev 7996)
@@ -15,7 +15,6 @@
 
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.DISCONNECT;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -43,6 +42,7 @@
 import org.hornetq.core.remoting.impl.invm.TransportConstants;
 import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
 import org.hornetq.core.remoting.impl.wireformat.Ping;
+import org.hornetq.core.remoting.server.HandlerFactory;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.remoting.spi.Acceptor;
 import org.hornetq.core.remoting.spi.AcceptorFactory;
@@ -52,6 +52,7 @@
 import org.hornetq.core.remoting.spi.HornetQBuffer;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.impl.HornetQPacketHandler;
+import org.hornetq.utils.ExecutorFactory;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -83,8 +84,10 @@
 
    private final Configuration config;
 
-   private volatile HornetQServer server;
+   private final ExecutorFactory executorFactory;
 
+   private final HandlerFactory handlerFactory;
+
    private ManagementService managementService;
 
    private volatile RemotingConnection serverSideReplicatingConnection;
@@ -102,7 +105,8 @@
    // Constructors --------------------------------------------------
 
    public RemotingServiceImpl(final Configuration config,
-                              final HornetQServer server,
+                              final HandlerFactory handlerFactory,
+                              final ExecutorFactory executorFactory,
                               final ManagementService managementService,
                               final Executor threadPool,
                               final ScheduledExecutorService scheduledThreadPool,
@@ -110,6 +114,10 @@
    {
       transportConfigs = config.getAcceptorConfigurations();
 
+      this.executorFactory = executorFactory;
+
+      this.handlerFactory = handlerFactory;
+
       ClassLoader loader = Thread.currentThread().getContextClassLoader();
       for (String interceptorClass : config.getInterceptorClassNames())
       {
@@ -125,7 +133,6 @@
       }
 
       this.config = config;
-      this.server = server;
       this.managementService = managementService;
       this.threadPool = threadPool;
       this.scheduledThreadPool = scheduledThreadPool;
@@ -176,7 +183,7 @@
       // Remove this code when this is implemented without having to require a special acceptor
       // https://jira.jboss.org/jira/browse/JBMESSAGING-1649
 
-      if (config.isJMXManagementEnabled())
+      if (config.isJMXManagementEnabled() && managementService != null)
       {
          Map<String, Object> params = new HashMap<String, Object>();
 
@@ -188,12 +195,9 @@
 
          acceptors.add(acceptor);
 
-         if (managementService != null)
-         {
-            TransportConfiguration info = new TransportConfiguration(InVMAcceptorFactory.class.getName(), params);
+         TransportConfiguration info = new TransportConfiguration(InVMAcceptorFactory.class.getName(), params);
 
-            managementService.registerAcceptor(acceptor, info);
-         }
+         managementService.registerAcceptor(acceptor, info);
       }
 
       for (Acceptor a : acceptors)
@@ -233,28 +237,31 @@
       }
 
       failureCheckThread.close();
-      
+
       // We need to stop them accepting first so no new connections are accepted after we send the disconnect message
       for (Acceptor acceptor : acceptors)
       {
          acceptor.pause();
       }
-     
+
       for (ConnectionEntry entry : connections.values())
-      {       
+      {
          entry.connection.getChannel(0, -1, false).sendAndFlush(new PacketImpl(DISCONNECT));
       }
-           
+
       for (Acceptor acceptor : acceptors)
       {
          acceptor.stop();
       }
-     
+
       acceptors.clear();
 
       connections.clear();
 
-      managementService.unregisterAcceptors();
+      if (managementService != null)
+      {
+         managementService.unregisterAcceptors();
+      }
 
       started = false;
    }
@@ -299,31 +306,14 @@
 
    public void connectionCreated(final Connection connection)
    {
-      if (server == null)
-      {
-         throw new IllegalStateException("Unable to create connection, server hasn't finished starting up");
-      }
-      
-      RemotingConnection rc = new RemotingConnectionImpl(connection,
-                                                         interceptors,                                                        
-                                                         server.getConfiguration().isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory()
-                                                                                                                               .getExecutor()
-                                                                                                                      : null);
+      RemotingConnection rc = createChannel(connection);
 
-      Channel channel1 = rc.getChannel(1, -1, false);
-
-      ChannelHandler handler = new HornetQPacketHandler(server, channel1, rc);
-
-      channel1.setHandler(handler);
-
       long ttl = ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
       if (config.getConnectionTTLOverride() != -1)
       {
          ttl = config.getConnectionTTLOverride();
       }
-      final ConnectionEntry entry = new ConnectionEntry(rc,
-                                                        System.currentTimeMillis(),
-                                                        ttl);
+      final ConnectionEntry entry = new ConnectionEntry(rc, System.currentTimeMillis(), ttl);
 
       connections.put(connection.getID(), entry);
 
@@ -402,6 +392,29 @@
 
    // Protected -----------------------------------------------------
 
+   protected List<Interceptor> getInterceptors()
+   {
+      return this.interceptors;
+   }
+
+   /**
+    * Subclasses (on tests) may use this to create a different channel.
+    */
+   protected RemotingConnection createChannel(final Connection connection)
+   {
+      RemotingConnection rc = new RemotingConnectionImpl(connection,
+                                                         interceptors,
+                                                         executorFactory != null ? executorFactory.getExecutor() : null);
+
+      Channel channel1 = rc.getChannel(1, -1, false);
+
+      ChannelHandler handler = handlerFactory.getHandler(rc, channel1);
+
+      channel1.setHandler(handler);
+
+      return rc;
+   }
+
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------
@@ -467,7 +480,7 @@
       }
 
       public void run()
-      {         
+      {
          while (!closed)
          {
             long now = System.currentTimeMillis();
@@ -499,9 +512,9 @@
                RemotingConnection conn = removeConnection(id);
 
                HornetQException me = new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
-                                                              "Did not receive ping from " + conn.getRemoteAddress() +
-                                                                       ". It is likely the client has exited or crashed without " +
-                                                                       "closing its connection, or the network between the server and client has failed. The connection will now be closed.");
+                                                          "Did not receive ping from " + conn.getRemoteAddress() +
+                                                                   ". It is likely the client has exited or crashed without " +
+                                                                   "closing its connection, or the network between the server and client has failed. The connection will now be closed.");
                conn.fail(me);
             }
 

Added: branches/Replication_Clebert/src/main/org/hornetq/core/replication/BackupListener.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/BackupListener.java	                        (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/BackupListener.java	2009-09-26 01:59:13 UTC (rev 7996)
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.replication;
+
+/**
+ * A ReplicationListener
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface BackupListener
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   
+   void onReplication(byte data[]);
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java	                        (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-09-26 01:59:13 UTC (rev 7996)
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.replication;
+
+
+/**
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface ReplicationManager
+{
+   void replicate(byte[] bytes, ReplicationToken token);
+   
+ 
+   /** to be used on the backup node only */
+   void addBackupListener(BackupListener listener);
+}

Added: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationToken.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationToken.java	                        (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationToken.java	2009-09-26 01:59:13 UTC (rev 7996)
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.replication;
+
+
+/**
+ * This represents a set of operations done as part of replication. 
+ * When the entire set is done a group of Runnables can be executed.
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface ReplicationToken
+{
+   /** To be called by the replication manager, when new replication is added to the queue */
+   void linedUp();
+
+   /** To be called by the replication manager, when data is confirmed on the channel */
+   void replicated();
+   
+   void addFutureCompletion(Runnable runnable);
+   
+   /** To be called when there are no more operations pending */
+   void complete();
+
+}

Added: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	                        (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-09-26 01:59:13 UTC (rev 7996)
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.replication.impl;
+
+import org.hornetq.core.replication.BackupListener;
+import org.hornetq.core.replication.ReplicationManager;
+import org.hornetq.core.replication.ReplicationToken;
+
+/**
+ * A RepplicationManagerImpl
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationManagerImpl implements ReplicationManager
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationManager#addListener(org.hornetq.core.replication.ReplicationListener)
+    */
+   public void addBackupListener(BackupListener listener)
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationManager#replicate(byte[], org.hornetq.core.replication.ReplicationToken)
+    */
+   public void replicate(byte[] bytes, ReplicationToken token)
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationPacketHandler.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationPacketHandler.java	                        (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationPacketHandler.java	2009-09-26 01:59:13 UTC (rev 7996)
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.replication.impl;
+
+import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.Packet;
+
+/**
+ * A ReplicationPacketHandler
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationPacketHandler implements ChannelHandler
+{
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
+    */
+   public void handlePacket(Packet packet)
+   {
+   }
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java	                        (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java	2009-09-26 01:59:13 UTC (rev 7996)
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.replication.impl;
+
+import java.util.ArrayList;
+import java.util.concurrent.Executor;
+
+import org.hornetq.core.replication.ReplicationToken;
+
+/**
+ * A ReplicationToken
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationTokenImpl implements ReplicationToken
+{
+   final Executor executor;
+   
+   private ArrayList<Runnable> tasks;
+   
+   private int pendings;
+   
+   /**
+    * @param executor
+    */
+   public ReplicationTokenImpl(Executor executor)
+   {
+      super();
+      this.executor = executor;
+   }
+
+   /** To be called by the replication manager, when new replication is added to the queue */
+   public synchronized void linedUp()
+   {
+      pendings++;
+   }
+
+   /** To be called by the replication manager, when data is confirmed on the channel */
+   public synchronized void replicated()
+   {
+      if (pendings-- == 0)
+      {
+         if (tasks != null)
+         {
+            for (Runnable run : tasks)
+            {
+               executor.execute(run);
+            }
+            tasks.clear();
+         }
+      }
+   }
+   
+   /** You may have several actions to be done after a replication operation is completed. */
+   public synchronized void addFutureCompletion(Runnable runnable)
+   {
+      if (pendings == 0)
+      {
+         executor.execute(runnable);
+      }
+      else
+      {
+         if (tasks == null)
+         {
+            tasks = new ArrayList<Runnable>();
+         }
+         
+         tasks.add(runnable);
+      }
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationToken#complete()
+    */
+   public void complete()
+   {
+      // TODO Auto-generated method stub
+      
+   }
+}

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-09-26 01:57:13 UTC (rev 7995)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-09-26 01:59:13 UTC (rev 7996)
@@ -65,9 +65,11 @@
 import org.hornetq.core.postoffice.impl.LocalQueueBinding;
 import org.hornetq.core.postoffice.impl.PostOfficeImpl;
 import org.hornetq.core.remoting.Channel;
+import org.hornetq.core.remoting.ChannelHandler;
 import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.hornetq.core.remoting.server.HandlerFactory;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
 import org.hornetq.core.security.CheckType;
@@ -959,8 +961,19 @@
 
       managementService = new ManagementServiceImpl(mbeanServer, configuration, managementConnectorID);
 
+      final HandlerFactory handlerFactory = new HandlerFactory()
+      {
+
+         public ChannelHandler getHandler(RemotingConnection conn, Channel channel)
+         {
+            return new HornetQPacketHandler(HornetQServerImpl.this, channel, conn);
+         }
+         
+      };
+      
       remotingService = new RemotingServiceImpl(configuration,
-                                                this,
+                                                handlerFactory,
+                                                (configuration.isAsyncConnectionExecutionEnabled() ? this.executorFactory : null),
                                                 managementService,
                                                 threadPool,
                                                 scheduledPool,

Added: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	                        (rev 0)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-09-26 01:59:13 UTC (rev 7996)
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.replication;
+
+import static org.hornetq.tests.util.RandomUtil.randomString;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+
+import javax.management.MBeanServer;
+
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ConnectionManager;
+import org.hornetq.core.client.impl.ConnectionManagerImpl;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.management.ManagementService;
+import org.hornetq.core.management.impl.HornetQServerControlImpl;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.remoting.Channel;
+import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.Interceptor;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.hornetq.core.remoting.server.HandlerFactory;
+import org.hornetq.core.remoting.server.RemotingService;
+import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
+import org.hornetq.core.remoting.spi.Connection;
+import org.hornetq.core.security.HornetQSecurityManager;
+import org.hornetq.core.security.Role;
+import org.hornetq.core.server.ActivateCallback;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.QueueFactory;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.cluster.ClusterManager;
+import org.hornetq.core.server.impl.HornetQPacketHandler;
+import org.hornetq.core.settings.HierarchicalRepository;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.transaction.ResourceManager;
+import org.hornetq.core.version.Version;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * A ReplicationTest
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private RemotingService remoting;
+
+   private ThreadFactory tFactory;
+
+   private ExecutorService executor;
+
+   private ConnectionManager connectionManagerLive;
+
+   private ScheduledExecutorService scheduledExecutor;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testBasicConnection() throws Exception
+   {
+
+      RemotingConnection conn = connectionManagerLive.getConnection(1);
+
+      Channel chann = conn.getChannel(2, -1, false);
+
+      chann.close();
+
+   }
+
+   // Package protected ---------------------------------------------
+
+   class LocalHandler implements ChannelHandler
+   {
+
+      final Channel channel;
+
+      /**
+       * @param channel
+       */
+      public LocalHandler(Channel channel)
+      {
+         super();
+         this.channel = channel;
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
+       */
+      public void handlePacket(Packet packet)
+      {
+         channel.send(new NullResponseMessage());
+      }
+
+   }
+
+   HandlerFactory handlerFactory = new HandlerFactory()
+   {
+
+      public ChannelHandler getHandler(RemotingConnection conn, Channel channel)
+      {
+         System.out.println("Created a handler");
+         return new LocalHandler(channel);
+      }
+
+   };
+
+   // Protected -----------------------------------------------------
+
+   protected void setUp() throws Exception
+   {
+      Configuration config = createDefaultConfig(false);
+
+      tFactory = new HornetQThreadFactory("HornetQ-ReplicationTest", false);
+
+      executor = Executors.newCachedThreadPool(tFactory);
+
+      scheduledExecutor = new ScheduledThreadPoolExecutor(10, tFactory);
+
+      remoting = new RemotingServiceImpl(config, handlerFactory, null, null, executor, scheduledExecutor, 0);
+
+      remoting.start();
+
+      TransportConfiguration connectorConfig = new TransportConfiguration(InVMConnectorFactory.class.getName(),
+                                                                          new HashMap<String, Object>(),
+                                                                          randomString());
+
+      List<Interceptor> interceptors = new ArrayList<Interceptor>();
+
+      connectionManagerLive = new ConnectionManagerImpl(null,
+                                                        connectorConfig,
+                                                        null,
+                                                        false,
+                                                        1,
+                                                        ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+                                                        ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+                                                        ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
+                                                        0,
+                                                        1.0d,
+                                                        0,
+                                                        false,
+                                                        executor,
+                                                        scheduledExecutor,
+                                                        interceptors);
+
+   }
+
+   protected void tearDown() throws Exception
+   {
+      remoting.stop();
+
+      executor.shutdown();
+
+      scheduledExecutor.shutdown();
+
+      remoting = null;
+
+      tFactory = null;
+
+      scheduledExecutor = null;
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}



More information about the hornetq-commits mailing list