[jboss-cvs] JBoss Messaging SVN: r4736 - in trunk/src/main/org/jboss/messaging: core/remoting/spi and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jul 28 13:02:55 EDT 2008
Author: timfox
Date: 2008-07-28 13:02:54 -0400 (Mon, 28 Jul 2008)
New Revision: 4736
Added:
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
trunk/src/main/org/jboss/messaging/core/remoting/spi/Connector.java
trunk/src/main/org/jboss/messaging/util/JBMThreadFactory.java
Log:
Some missing files
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-07-28 17:02:54 UTC (rev 4736)
@@ -0,0 +1,245 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina;
+
+import java.net.InetSocketAddress;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoServiceListener;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.transport.socket.SocketConnector;
+import org.apache.mina.transport.socket.SocketSessionConfig;
+import org.apache.mina.transport.socket.nio.NioSocketConnector;
+import org.jboss.messaging.core.client.ConnectionParams;
+import org.jboss.messaging.core.client.Location;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
+import org.jboss.messaging.core.remoting.RemotingHandler;
+import org.jboss.messaging.core.remoting.spi.Connector;
+import org.jboss.messaging.core.remoting.spi.Connection;
+
+/**
+ *
+ * A MinaConnector
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class MinaConnector implements Connector
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(MinaConnection.class);
+
+ // Attributes ----------------------------------------------------
+
+ private SocketConnector connector;
+
+ private final RemotingHandler handler;
+
+ private final Location location;
+
+ private final ConnectionLifeCycleListener listener;
+
+ private final ConnectionParams params;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public MinaConnector(final Location location, final ConnectionParams params,
+ final RemotingHandler handler,
+ final ConnectionLifeCycleListener listener)
+ {
+ if (location == null)
+ {
+ throw new IllegalArgumentException("Invalid argument null location");
+ }
+
+ if (params == null)
+ {
+ throw new IllegalArgumentException("Invalid argument null connection params");
+ }
+
+ if (handler == null)
+ {
+ throw new IllegalArgumentException("Invalid argument null handler");
+ }
+
+ if (listener == null)
+ {
+ throw new IllegalArgumentException("Invalid argument null listener");
+ }
+
+ this.handler = handler;
+ this.location = location;
+ this.listener = listener;
+ this.params = params;
+ }
+
+ public synchronized void start()
+ {
+ if (connector != null)
+ {
+ return;
+ }
+
+ connector = new NioSocketConnector();
+
+ SocketSessionConfig connectorConfig = connector.getSessionConfig();
+
+ DefaultIoFilterChainBuilder filterChain = connector.getFilterChain();
+
+ connector.setSessionDataStructureFactory(new MessagingIOSessionDataStructureFactory());
+
+ FilterChainSupport.addCodecFilter(filterChain, handler);
+
+ connectorConfig.setTcpNoDelay(params.isTcpNoDelay());
+ if (params.getTcpReceiveBufferSize() != -1)
+ {
+ connectorConfig.setReceiveBufferSize(params.getTcpReceiveBufferSize());
+ }
+ if (params.getTcpSendBufferSize() != -1)
+ {
+ connectorConfig.setSendBufferSize(params.getTcpSendBufferSize());
+ }
+ connectorConfig.setKeepAlive(true);
+ connectorConfig.setReuseAddress(true);
+
+ if (params.isSSLEnabled())
+ {
+ try
+ {
+ FilterChainSupport.addSSLFilter(filterChain, true, params.getKeyStorePath(), params.getKeyStorePassword(), null, null);
+ }
+ catch (Exception e)
+ {
+ IllegalStateException ise = new IllegalStateException("Unable to create MinaConnection for " + location);
+ ise.initCause(e);
+ throw ise;
+ }
+ }
+
+
+ connector.setHandler(new MinaHandler());
+
+ connector.addListener(new ServiceListener());
+ }
+
+ public synchronized void close()
+ {
+ if (connector != null)
+ {
+ connector.dispose();
+ }
+ }
+
+ public Connection createConnection()
+ {
+ InetSocketAddress address = new InetSocketAddress(location.getHost(), location.getPort());
+ ConnectFuture future = connector.connect(address);
+ connector.setDefaultRemoteAddress(address);
+
+ future.awaitUninterruptibly();
+
+ if (future.isConnected())
+ {
+ IoSession session = future.getSession();
+
+ return new MinaConnection(session);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ private final class ServiceListener implements IoServiceListener
+ {
+ private ServiceListener()
+ {
+ }
+
+ public void serviceActivated(IoService service)
+ {
+ }
+
+ public void serviceDeactivated(IoService service)
+ {
+ }
+
+ public void serviceIdle(IoService service, IdleStatus idleStatus)
+ {
+ }
+
+ public void sessionCreated(IoSession session)
+ {
+ }
+
+ public void sessionDestroyed(IoSession session)
+ {
+ listener.connectionDestroyed(session.getId());
+ }
+ }
+
+ private final class MinaHandler extends IoHandlerAdapter
+ {
+ public void exceptionCaught(final IoSession session, final Throwable cause)
+ throws Exception
+ {
+ log.error("caught exception " + cause + " for session " + session, cause);
+
+ MessagingException me = new MessagingException(MessagingException.INTERNAL_ERROR, "MINA exception");
+
+ me.initCause(cause);
+
+ listener.connectionException(session.getId(), me);
+ }
+
+ public void messageReceived(final IoSession session, final Object message)
+ throws Exception
+ {
+ IoBuffer buffer = (IoBuffer) message;
+
+ handler.bufferReceived(session.getId(), new IoBufferWrapper(buffer));
+ }
+ }
+
+}
Added: trunk/src/main/org/jboss/messaging/core/remoting/spi/Connector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/spi/Connector.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/spi/Connector.java 2008-07-28 17:02:54 UTC (rev 4736)
@@ -0,0 +1,42 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.spi;
+
+
+
+/**
+ *
+ * A Connector
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface Connector
+{
+ void start();
+
+ void close();
+
+ //Actually we should allow many connections to be created but in the case of our connection registry we register
+ //a callback which closes the connector - so it's actually one to one
+ Connection createConnection();
+}
Added: trunk/src/main/org/jboss/messaging/util/JBMThreadFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/JBMThreadFactory.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/util/JBMThreadFactory.java 2008-07-28 17:02:54 UTC (rev 4736)
@@ -0,0 +1,51 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.util;
+
+import java.util.concurrent.ThreadFactory;
+
+/**
+ *
+ * A JBMThreadFactory
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class JBMThreadFactory implements ThreadFactory
+{
+ private ThreadGroup group;
+
+ public JBMThreadFactory(final String groupName)
+ {
+ this.group = new ThreadGroup(groupName);
+ }
+
+ public Thread newThread(final Runnable command)
+ {
+ Thread t = new Thread(group, command);
+
+ //Don't want to prevent VM from exiting
+ t.setDaemon(true);
+
+ return t;
+ }
+}
More information about the jboss-cvs-commits
mailing list