[hornetq-commits] JBoss hornetq SVN: r7996 - in branches/Replication_Clebert: src/main/org/hornetq/core/remoting/server and 6 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Sep 25 21:59:14 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-09-25 21:59:13 -0400 (Fri, 25 Sep 2009)
New Revision: 7996
Added:
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/HandlerFactory.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/
branches/Replication_Clebert/src/main/org/hornetq/core/replication/BackupListener.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationToken.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationPacketHandler.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
Just backing up ideas.. a lot of this will change
Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/HandlerFactory.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/HandlerFactory.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/HandlerFactory.java 2009-09-26 01:59:13 UTC (rev 7996)
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.server;
+
+import org.hornetq.core.remoting.Channel;
+import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.RemotingConnection;
+
+/**
+ * The RemotingService could be used by either the Replication Manager or by the Server.
+ *
+ * Each will need to use a different Handler, so this factory may be used to pass what handler needs to be created
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface HandlerFactory
+{
+ ChannelHandler getHandler(RemotingConnection conn, Channel channel);
+}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-09-26 01:57:13 UTC (rev 7995)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-09-26 01:59:13 UTC (rev 7996)
@@ -15,7 +15,6 @@
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.DISCONNECT;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -43,6 +42,7 @@
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.core.remoting.impl.wireformat.Ping;
+import org.hornetq.core.remoting.server.HandlerFactory;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.spi.Acceptor;
import org.hornetq.core.remoting.spi.AcceptorFactory;
@@ -52,6 +52,7 @@
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.impl.HornetQPacketHandler;
+import org.hornetq.utils.ExecutorFactory;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -83,8 +84,10 @@
private final Configuration config;
- private volatile HornetQServer server;
+ private final ExecutorFactory executorFactory;
+ private final HandlerFactory handlerFactory;
+
private ManagementService managementService;
private volatile RemotingConnection serverSideReplicatingConnection;
@@ -102,7 +105,8 @@
// Constructors --------------------------------------------------
public RemotingServiceImpl(final Configuration config,
- final HornetQServer server,
+ final HandlerFactory handlerFactory,
+ final ExecutorFactory executorFactory,
final ManagementService managementService,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool,
@@ -110,6 +114,10 @@
{
transportConfigs = config.getAcceptorConfigurations();
+ this.executorFactory = executorFactory;
+
+ this.handlerFactory = handlerFactory;
+
ClassLoader loader = Thread.currentThread().getContextClassLoader();
for (String interceptorClass : config.getInterceptorClassNames())
{
@@ -125,7 +133,6 @@
}
this.config = config;
- this.server = server;
this.managementService = managementService;
this.threadPool = threadPool;
this.scheduledThreadPool = scheduledThreadPool;
@@ -176,7 +183,7 @@
// Remove this code when this is implemented without having to require a special acceptor
// https://jira.jboss.org/jira/browse/JBMESSAGING-1649
- if (config.isJMXManagementEnabled())
+ if (config.isJMXManagementEnabled() && managementService != null)
{
Map<String, Object> params = new HashMap<String, Object>();
@@ -188,12 +195,9 @@
acceptors.add(acceptor);
- if (managementService != null)
- {
- TransportConfiguration info = new TransportConfiguration(InVMAcceptorFactory.class.getName(), params);
+ TransportConfiguration info = new TransportConfiguration(InVMAcceptorFactory.class.getName(), params);
- managementService.registerAcceptor(acceptor, info);
- }
+ managementService.registerAcceptor(acceptor, info);
}
for (Acceptor a : acceptors)
@@ -233,28 +237,31 @@
}
failureCheckThread.close();
-
+
// We need to stop them accepting first so no new connections are accepted after we send the disconnect message
for (Acceptor acceptor : acceptors)
{
acceptor.pause();
}
-
+
for (ConnectionEntry entry : connections.values())
- {
+ {
entry.connection.getChannel(0, -1, false).sendAndFlush(new PacketImpl(DISCONNECT));
}
-
+
for (Acceptor acceptor : acceptors)
{
acceptor.stop();
}
-
+
acceptors.clear();
connections.clear();
- managementService.unregisterAcceptors();
+ if (managementService != null)
+ {
+ managementService.unregisterAcceptors();
+ }
started = false;
}
@@ -299,31 +306,14 @@
public void connectionCreated(final Connection connection)
{
- if (server == null)
- {
- throw new IllegalStateException("Unable to create connection, server hasn't finished starting up");
- }
-
- RemotingConnection rc = new RemotingConnectionImpl(connection,
- interceptors,
- server.getConfiguration().isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory()
- .getExecutor()
- : null);
+ RemotingConnection rc = createChannel(connection);
- Channel channel1 = rc.getChannel(1, -1, false);
-
- ChannelHandler handler = new HornetQPacketHandler(server, channel1, rc);
-
- channel1.setHandler(handler);
-
long ttl = ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
if (config.getConnectionTTLOverride() != -1)
{
ttl = config.getConnectionTTLOverride();
}
- final ConnectionEntry entry = new ConnectionEntry(rc,
- System.currentTimeMillis(),
- ttl);
+ final ConnectionEntry entry = new ConnectionEntry(rc, System.currentTimeMillis(), ttl);
connections.put(connection.getID(), entry);
@@ -402,6 +392,29 @@
// Protected -----------------------------------------------------
+ protected List<Interceptor> getInterceptors()
+ {
+ return this.interceptors;
+ }
+
+ /**
+ * Subclasses (on tests) may use this to create a different channel.
+ */
+ protected RemotingConnection createChannel(final Connection connection)
+ {
+ RemotingConnection rc = new RemotingConnectionImpl(connection,
+ interceptors,
+ executorFactory != null ? executorFactory.getExecutor() : null);
+
+ Channel channel1 = rc.getChannel(1, -1, false);
+
+ ChannelHandler handler = handlerFactory.getHandler(rc, channel1);
+
+ channel1.setHandler(handler);
+
+ return rc;
+ }
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
@@ -467,7 +480,7 @@
}
public void run()
- {
+ {
while (!closed)
{
long now = System.currentTimeMillis();
@@ -499,9 +512,9 @@
RemotingConnection conn = removeConnection(id);
HornetQException me = new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Did not receive ping from " + conn.getRemoteAddress() +
- ". It is likely the client has exited or crashed without " +
- "closing its connection, or the network between the server and client has failed. The connection will now be closed.");
+ "Did not receive ping from " + conn.getRemoteAddress() +
+ ". It is likely the client has exited or crashed without " +
+ "closing its connection, or the network between the server and client has failed. The connection will now be closed.");
conn.fail(me);
}
Added: branches/Replication_Clebert/src/main/org/hornetq/core/replication/BackupListener.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/BackupListener.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/BackupListener.java 2009-09-26 01:59:13 UTC (rev 7996)
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.replication;
+
+/**
+ * A ReplicationListener
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface BackupListener
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ void onReplication(byte data[]);
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-09-26 01:59:13 UTC (rev 7996)
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.replication;
+
+
+/**
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface ReplicationManager
+{
+ void replicate(byte[] bytes, ReplicationToken token);
+
+
+ /** to be used on the backup node only */
+ void addBackupListener(BackupListener listener);
+}
Added: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationToken.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationToken.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationToken.java 2009-09-26 01:59:13 UTC (rev 7996)
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.replication;
+
+
+/**
+ * This represents a set of operations done as part of replication.
+ * When the entire set is done a group of Runnables can be executed.
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface ReplicationToken
+{
+ /** To be called by the replication manager, when new replication is added to the queue */
+ void linedUp();
+
+ /** To be called by the replication manager, when data is confirmed on the channel */
+ void replicated();
+
+ void addFutureCompletion(Runnable runnable);
+
+ /** To be called when there are no more operations pending */
+ void complete();
+
+}
Added: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-09-26 01:59:13 UTC (rev 7996)
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.replication.impl;
+
+import org.hornetq.core.replication.BackupListener;
+import org.hornetq.core.replication.ReplicationManager;
+import org.hornetq.core.replication.ReplicationToken;
+
+/**
+ * A RepplicationManagerImpl
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationManagerImpl implements ReplicationManager
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#addListener(org.hornetq.core.replication.ReplicationListener)
+ */
+ public void addBackupListener(BackupListener listener)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#replicate(byte[], org.hornetq.core.replication.ReplicationToken)
+ */
+ public void replicate(byte[] bytes, ReplicationToken token)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationPacketHandler.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationPacketHandler.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationPacketHandler.java 2009-09-26 01:59:13 UTC (rev 7996)
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.replication.impl;
+
+import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.Packet;
+
+/**
+ * A ReplicationPacketHandler
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationPacketHandler implements ChannelHandler
+{
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
+ */
+ public void handlePacket(Packet packet)
+ {
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java 2009-09-26 01:59:13 UTC (rev 7996)
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.replication.impl;
+
+import java.util.ArrayList;
+import java.util.concurrent.Executor;
+
+import org.hornetq.core.replication.ReplicationToken;
+
+/**
+ * A ReplicationToken
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationTokenImpl implements ReplicationToken
+{
+ final Executor executor;
+
+ private ArrayList<Runnable> tasks;
+
+ private int pendings;
+
+ /**
+ * @param executor
+ */
+ public ReplicationTokenImpl(Executor executor)
+ {
+ super();
+ this.executor = executor;
+ }
+
+ /** To be called by the replication manager, when new replication is added to the queue */
+ public synchronized void linedUp()
+ {
+ pendings++;
+ }
+
+ /** To be called by the replication manager, when data is confirmed on the channel */
+ public synchronized void replicated()
+ {
+ if (pendings-- == 0)
+ {
+ if (tasks != null)
+ {
+ for (Runnable run : tasks)
+ {
+ executor.execute(run);
+ }
+ tasks.clear();
+ }
+ }
+ }
+
+ /** You may have several actions to be done after a replication operation is completed. */
+ public synchronized void addFutureCompletion(Runnable runnable)
+ {
+ if (pendings == 0)
+ {
+ executor.execute(runnable);
+ }
+ else
+ {
+ if (tasks == null)
+ {
+ tasks = new ArrayList<Runnable>();
+ }
+
+ tasks.add(runnable);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationToken#complete()
+ */
+ public void complete()
+ {
+ // TODO Auto-generated method stub
+
+ }
+}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-09-26 01:57:13 UTC (rev 7995)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-09-26 01:59:13 UTC (rev 7996)
@@ -65,9 +65,11 @@
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
import org.hornetq.core.remoting.Channel;
+import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.hornetq.core.remoting.server.HandlerFactory;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
import org.hornetq.core.security.CheckType;
@@ -959,8 +961,19 @@
managementService = new ManagementServiceImpl(mbeanServer, configuration, managementConnectorID);
+ final HandlerFactory handlerFactory = new HandlerFactory()
+ {
+
+ public ChannelHandler getHandler(RemotingConnection conn, Channel channel)
+ {
+ return new HornetQPacketHandler(HornetQServerImpl.this, channel, conn);
+ }
+
+ };
+
remotingService = new RemotingServiceImpl(configuration,
- this,
+ handlerFactory,
+ (configuration.isAsyncConnectionExecutionEnabled() ? this.executorFactory : null),
managementService,
threadPool,
scheduledPool,
Added: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java (rev 0)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-09-26 01:59:13 UTC (rev 7996)
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.replication;
+
+import static org.hornetq.tests.util.RandomUtil.randomString;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+
+import javax.management.MBeanServer;
+
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ConnectionManager;
+import org.hornetq.core.client.impl.ConnectionManagerImpl;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.management.ManagementService;
+import org.hornetq.core.management.impl.HornetQServerControlImpl;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.remoting.Channel;
+import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.Interceptor;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.hornetq.core.remoting.server.HandlerFactory;
+import org.hornetq.core.remoting.server.RemotingService;
+import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
+import org.hornetq.core.remoting.spi.Connection;
+import org.hornetq.core.security.HornetQSecurityManager;
+import org.hornetq.core.security.Role;
+import org.hornetq.core.server.ActivateCallback;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.QueueFactory;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.cluster.ClusterManager;
+import org.hornetq.core.server.impl.HornetQPacketHandler;
+import org.hornetq.core.settings.HierarchicalRepository;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.transaction.ResourceManager;
+import org.hornetq.core.version.Version;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * A ReplicationTest
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private RemotingService remoting;
+
+ private ThreadFactory tFactory;
+
+ private ExecutorService executor;
+
+ private ConnectionManager connectionManagerLive;
+
+ private ScheduledExecutorService scheduledExecutor;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testBasicConnection() throws Exception
+ {
+
+ RemotingConnection conn = connectionManagerLive.getConnection(1);
+
+ Channel chann = conn.getChannel(2, -1, false);
+
+ chann.close();
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ class LocalHandler implements ChannelHandler
+ {
+
+ final Channel channel;
+
+ /**
+ * @param channel
+ */
+ public LocalHandler(Channel channel)
+ {
+ super();
+ this.channel = channel;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
+ */
+ public void handlePacket(Packet packet)
+ {
+ channel.send(new NullResponseMessage());
+ }
+
+ }
+
+ HandlerFactory handlerFactory = new HandlerFactory()
+ {
+
+ public ChannelHandler getHandler(RemotingConnection conn, Channel channel)
+ {
+ System.out.println("Created a handler");
+ return new LocalHandler(channel);
+ }
+
+ };
+
+ // Protected -----------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ Configuration config = createDefaultConfig(false);
+
+ tFactory = new HornetQThreadFactory("HornetQ-ReplicationTest", false);
+
+ executor = Executors.newCachedThreadPool(tFactory);
+
+ scheduledExecutor = new ScheduledThreadPoolExecutor(10, tFactory);
+
+ remoting = new RemotingServiceImpl(config, handlerFactory, null, null, executor, scheduledExecutor, 0);
+
+ remoting.start();
+
+ TransportConfiguration connectorConfig = new TransportConfiguration(InVMConnectorFactory.class.getName(),
+ new HashMap<String, Object>(),
+ randomString());
+
+ List<Interceptor> interceptors = new ArrayList<Interceptor>();
+
+ connectionManagerLive = new ConnectionManagerImpl(null,
+ connectorConfig,
+ null,
+ false,
+ 1,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
+ 0,
+ 1.0d,
+ 0,
+ false,
+ executor,
+ scheduledExecutor,
+ interceptors);
+
+ }
+
+ protected void tearDown() throws Exception
+ {
+ remoting.stop();
+
+ executor.shutdown();
+
+ scheduledExecutor.shutdown();
+
+ remoting = null;
+
+ tFactory = null;
+
+ scheduledExecutor = null;
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
More information about the hornetq-commits
mailing list