[jboss-cvs] JBoss Messaging SVN: r4807 - in trunk: src/main/org/jboss/messaging/core/remoting and 9 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Aug 18 02:38:52 EDT 2008
Author: trustin
Date: 2008-08-18 02:38:52 -0400 (Mon, 18 Aug 2008)
New Revision: 4807
Added:
trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/
trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/ChannelBufferWrapper.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/ChannelPipelineSupport.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/MessagingFrameDecoder.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptor.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptorFactory.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnection.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnector.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnectorFactory.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/
trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/ChannelBufferWrapper2Test.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/ChannelBufferWrapperTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerPacketHandler.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java
trunk/src/main/org/jboss/messaging/core/remoting/ConnectionLifeCycleListener.java
trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
trunk/src/main/org/jboss/messaging/core/remoting/PacketHandler.java
trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
trunk/src/main/org/jboss/messaging/core/remoting/RemotingHandler.java
trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/ResponseHandlerImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnection.java
trunk/src/main/org/jboss/messaging/core/remoting/spi/Connection.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerPacketHandler.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
trunk/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/RemotingHandlerImplTest.java
trunk/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/RemotingServiceImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/MessagingBufferTestBase.java
Log:
* Added Netty transport and its related unit tests
* Changed connectionID type from Long to Object because Netty uses UUID as a connectionID
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerPacketHandler.java 2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerPacketHandler.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,7 +18,7 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.client.impl;
@@ -46,7 +46,7 @@
public ClientConsumerPacketHandler(final ClientConsumerInternal clientConsumer, final long consumerID)
{
this.clientConsumer = clientConsumer;
-
+
this.consumerID = consumerID;
}
@@ -55,14 +55,14 @@
return consumerID;
}
- public void handle(final long sessionID, final Packet packet)
+ public void handle(final Object connectionID, final Packet packet)
{
byte type = packet.getType();
-
+
if (type == PacketImpl.RECEIVE_MSG)
{
ReceiveMessage message = (ReceiveMessage) packet;
-
+
try
{
clientConsumer.handleMessage(message.getClientMessage());
@@ -83,16 +83,17 @@
{
return "ClientConsumerPacketHandler[id=" + consumerID + "]";
}
-
+
+ @Override
public boolean equals(Object other)
{
if (other instanceof ClientConsumerPacketHandler == false)
{
return false;
}
-
+
ClientConsumerPacketHandler r = (ClientConsumerPacketHandler)other;
-
- return r.consumerID == this.consumerID;
+
+ return r.consumerID == consumerID;
}
}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java 2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,7 +18,7 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.client.impl;
@@ -29,9 +29,9 @@
import org.jboss.messaging.core.remoting.impl.wireformat.ProducerFlowCreditMessage;
/**
- *
+ *
* A ClientProducerPacketHandler
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
@@ -46,7 +46,7 @@
public ClientProducerPacketHandler(final ClientProducerInternal clientProducer, final long producerID)
{
this.clientProducer = clientProducer;
-
+
this.producerID = producerID;
}
@@ -55,14 +55,14 @@
return producerID;
}
- public void handle(final long sessionID, final Packet packet)
- {
+ public void handle(final Object connectionID, final Packet packet)
+ {
byte type = packet.getType();
-
+
if (type == PacketImpl.PROD_RECEIVETOKENS)
{
ProducerFlowCreditMessage message = (ProducerFlowCreditMessage) packet;
-
+
try
{
clientProducer.receiveCredits(message.getTokens());
@@ -75,7 +75,7 @@
else
{
throw new IllegalStateException("Invalid packet: " + type);
- }
+ }
}
@Override
@@ -83,16 +83,17 @@
{
return "ClientProducerPacketHandler[id=" + producerID + "]";
}
-
+
+ @Override
public boolean equals(Object other)
{
if (other instanceof ClientProducerPacketHandler == false)
{
return false;
}
-
+
ClientProducerPacketHandler r = (ClientProducerPacketHandler)other;
-
- return r.producerID == this.producerID;
+
+ return r.producerID == producerID;
}
}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/core/remoting/ConnectionLifeCycleListener.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/ConnectionLifeCycleListener.java 2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/ConnectionLifeCycleListener.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -25,17 +25,17 @@
import org.jboss.messaging.core.remoting.spi.Connection;
/**
- *
+ *
* A ConnectionLifeCycleListener
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
public interface ConnectionLifeCycleListener
{
void connectionCreated(Connection connection);
-
- void connectionDestroyed(long connectionID);
-
- void connectionException(long connectionID, MessagingException me);
+
+ void connectionDestroyed(Object connectionID);
+
+ void connectionException(Object connectionID, MessagingException me);
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java 2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,7 +18,7 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.remoting;
@@ -34,13 +34,13 @@
void unregister(long handlerID);
- void dispatch(long connectionID, Packet packet) throws Exception;
+ void dispatch(Object connectionID, Packet packet) throws Exception;
long generateID();
-
+
void addInterceptor(Interceptor filter);
void removeInterceptor(Interceptor filter);
-
+
List<Interceptor> getInterceptors();
}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/core/remoting/PacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/PacketHandler.java 2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/PacketHandler.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,7 +18,7 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.remoting;
@@ -26,20 +26,20 @@
/**
* A PacketHandler handles packets (as defined by {@link Packet} and its
* subclasses).
- *
+ *
* It must have an ID unique among all PacketHandlers (or at least among those
* registered into the same RemoteDispatcher).
- *
+ *
* @see PacketDispatcher#register(PacketHandler)
* @see PacketDispatcher#unregister(long)
- *
+ *
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- *
+ *
* @version <tt>$Revision$</tt>
*/
public interface PacketHandler
{
long getID();
- void handle(long connectionID, Packet packet);
+ void handle(Object connectionID, Packet packet);
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,7 +18,7 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.remoting;
@@ -26,36 +26,36 @@
import org.jboss.messaging.core.exception.MessagingException;
/**
- *
+ *
* A RemotingConnection
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
*
*/
public interface RemotingConnection
{
- long getID();
-
+ Object getID();
+
Packet sendBlocking(long targetID, long executorID, Packet packet) throws MessagingException;
-
+
void sendOneWay(long targetID, long executorID, Packet packet);
-
+
Packet sendBlocking(Packet packet) throws MessagingException;
-
+
void sendOneWay(Packet packet);
-
+
void addFailureListener(FailureListener listener);
-
+
boolean removeFailureListener(FailureListener listener);
-
+
PacketDispatcher getPacketDispatcher();
-
+
Location getLocation();
-
+
MessagingBuffer createBuffer(int size);
-
+
void fail(MessagingException me);
-
+
void destroy();
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingHandler.java 2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingHandler.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,27 +18,27 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.remoting;
import java.util.Set;
/**
- *
+ *
* A RemotingHandler
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
public interface RemotingHandler
{
- void bufferReceived(long connectionID, MessagingBuffer buffer) throws Exception;
-
+ void bufferReceived(Object connectionID, MessagingBuffer buffer) throws Exception;
+
int isReadyToHandle(MessagingBuffer buffer);
-
+
void closeExecutor(long executorID);
-
- Set<Long> scanForFailedConnections(long expirePeriod);
-
- void removeLastPing(long connectionID);
+
+ Set<Object> scanForFailedConnections(long expirePeriod);
+
+ void removeLastPing(Object connectionID);
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java 2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,7 +18,7 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.remoting;
@@ -37,11 +37,11 @@
{
PacketDispatcher getDispatcher();
- RemotingConnection getConnection(long remotingConnectionID);
-
+ RemotingConnection getConnection(Object remotingConnectionID);
+
Set<RemotingConnection> getConnections();
-
+
void registerAcceptorFactory(AcceptorFactory factory);
-
+
void unregisterAcceptorFactory(AcceptorFactory factory);
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java 2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,7 +18,7 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.remoting.impl;
@@ -36,9 +36,9 @@
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.RemotingHandler;
import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.core.remoting.spi.Connector;
import org.jboss.messaging.core.remoting.spi.ConnectorFactory;
-import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.util.JBMThreadFactory;
/**
@@ -49,23 +49,23 @@
public class ConnectionRegistryImpl implements ConnectionRegistry, ConnectionLifeCycleListener
{
// Constants -----------------------------------------------------
-
+
public static final Logger log = Logger.getLogger(ConnectionRegistryImpl.class);
-
+
// Attributes ----------------------------------------------------
- private Map<String, ConnectionHolder> connections = new HashMap<String, ConnectionHolder>();
-
- private Map<TransportType, ConnectorFactory> connectorFactories = new HashMap<TransportType, ConnectorFactory>();
-
- private Map<Long, RemotingConnection> remotingConnections = new HashMap<Long, RemotingConnection>();
-
+ private final Map<String, ConnectionHolder> connections = new HashMap<String, ConnectionHolder>();
+
+ private final Map<TransportType, ConnectorFactory> connectorFactories = new HashMap<TransportType, ConnectorFactory>();
+
+ private final Map<Object, RemotingConnection> remotingConnections = new HashMap<Object, RemotingConnection>();
+
//TODO - core pool size should be configurable
- private ScheduledThreadPoolExecutor pingExecutor = new ScheduledThreadPoolExecutor(20, new JBMThreadFactory("jbm-pinger-threads"));
+ private final ScheduledThreadPoolExecutor pingExecutor = new ScheduledThreadPoolExecutor(20, new JBMThreadFactory("jbm-pinger-threads"));
// Static --------------------------------------------------------
-
+
// ConnectionRegistry implementation -----------------------------
public synchronized RemotingConnection getConnection(final Location location, final ConnectionParams connectionParams)
@@ -73,11 +73,11 @@
String key = location.getLocation();
ConnectionHolder holder = connections.get(key);
-
+
if (holder != null)
- {
+ {
holder.increment();
-
+
RemotingConnection connection = holder.getConnection();
if (log.isDebugEnabled())
@@ -88,22 +88,22 @@
return connection;
}
else
- {
+ {
PacketDispatcher dispatcher = new PacketDispatcherImpl(null);
-
+
RemotingHandler handler = new RemotingHandlerImpl(dispatcher, null);
-
+
Connector connector = createConnector(location, connectionParams, handler, this);
-
+
connector.start();
-
+
Connection tc = connector.createConnection();
-
+
if (tc == null)
{
throw new IllegalStateException("Failed to connect to " + location);
}
-
+
long pingInterval = connectionParams.getPingInterval();
RemotingConnection connection;
if (pingInterval != -1)
@@ -118,26 +118,26 @@
}
remotingConnections.put(tc.getID(), connection);
-
+
if (log.isDebugEnabled())
{
log.debug("Created " + connector + " to connect to " + location);
}
-
+
holder = new ConnectionHolder(connection, connector);
-
+
connections.put(key, holder);
-
+
return connection;
}
}
-
+
public synchronized void returnConnection(final Location location)
{
String key = location.getLocation();
ConnectionHolder holder = connections.get(key);
-
+
if (holder == null)
{
throw new IllegalStateException("No connection for location " + key);
@@ -149,13 +149,13 @@
{
log.debug("Removed connection for " + key);
}
-
+
RemotingConnection conn = remotingConnections.remove(holder.getConnection().getID());
-
+
conn.destroy();
-
+
holder.getConnector().close();
-
+
connections.remove(key);
}
else
@@ -164,15 +164,15 @@
if (log.isDebugEnabled())
{
log.debug(holder.getCount() + " remaining references to " + key);
- }
+ }
}
}
public synchronized int size()
{
- return this.connections.size();
+ return connections.size();
}
-
+
public synchronized void registerConnectorFactory(final TransportType transport, final ConnectorFactory factory)
{
connectorFactories.put(transport, factory);
@@ -182,7 +182,7 @@
{
connectorFactories.remove(transport);
}
-
+
public synchronized ConnectorFactory getConnectorFactory(final TransportType transport)
{
return connectorFactories.get(transport);
@@ -191,51 +191,51 @@
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
-
+
// ConnectionLifeCycleListener implementation --------------------
-
+
public void connectionCreated(final Connection connection)
- {
+ {
}
- public void connectionDestroyed(final long connectionID)
- {
+ public void connectionDestroyed(final Object connectionID)
+ {
RemotingConnection conn = remotingConnections.remove(connectionID);
-
+
if (conn != null)
{
ConnectionHolder holder = connections.remove(conn.getLocation().getLocation());
-
+
//If conn still exists here this means that the underlying transport connection has been closed from the server side without
//being returned from the client side so we need to fail the connection and call its listeners
MessagingException me = new MessagingException(MessagingException.OBJECT_CLOSED,
"The connection has been closed.");
conn.fail(me);
-
- holder.getConnector().close();
+
+ holder.getConnector().close();
}
}
- public void connectionException(final long connectionID, final MessagingException me)
+ public void connectionException(final Object connectionID, final MessagingException me)
{
RemotingConnection conn = remotingConnections.remove(connectionID);
-
+
if (conn == null)
{
throw new IllegalStateException("Cannot find connection with id " + connectionID);
}
-
+
ConnectionHolder holder = connections.remove(conn.getLocation().getLocation());
-
+
conn.fail(me);
-
+
holder.getConnector().close();
}
-
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
-
+
// Private -------------------------------------------------------
private Connector createConnector(final Location location,
@@ -244,7 +244,7 @@
final ConnectionLifeCycleListener listener)
{
ConnectorFactory factory = connectorFactories.get(location.getTransport());
-
+
if (factory == null)
{
throw new IllegalStateException("No connector factory registered for transport " + location.getTransport());
@@ -258,9 +258,9 @@
private static class ConnectionHolder
{
private final RemotingConnection connection;
-
+
private final Connector connector;
-
+
private int count;
public ConnectionHolder(final RemotingConnection connection, final Connector connector)
@@ -269,7 +269,7 @@
this.connection = connection;
this.connector = connector;
- this.count = 1;
+ count = 1;
}
public void increment()
@@ -291,7 +291,7 @@
{
return connection;
}
-
+
public Connector getConnector()
{
return connector;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java 2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,7 +18,7 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.remoting.impl;
@@ -56,7 +56,7 @@
private final AtomicLong idSequence = new AtomicLong(0);
- private List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
+ private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
// Static --------------------------------------------------------
@@ -67,7 +67,7 @@
handlers = new ConcurrentHashMap<Long, PacketHandler>();
if (filters != null)
{
- this.interceptors.addAll(filters);
+ interceptors.addAll(filters);
}
}
@@ -115,21 +115,21 @@
{
interceptors.remove(filter);
}
-
+
public List<Interceptor> getInterceptors()
{
return new ArrayList<Interceptor>(interceptors);
}
- public void dispatch(final long connectionID, final Packet packet) throws Exception
+ public void dispatch(final Object connectionID, final Packet packet) throws Exception
{
long targetID = packet.getTargetID();
-
+
PacketHandler handler = getHandler(targetID);
-
+
if (handler != null)
{
- if (callInterceptors(packet))
+ if (callInterceptors(packet))
{
handler.handle(connectionID, packet);
}
@@ -161,7 +161,7 @@
{
log.warn("Failed in calling interceptor: " + interceptor, e);
}
- }
+ }
}
return true;
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -127,7 +127,7 @@
// RemotingConnection implementation
// ------------------------------------------------------------
- public long getID()
+ public Object getID()
{
return transportConnection.getID();
}
@@ -240,9 +240,9 @@
}
public synchronized void fail(final MessagingException me)
- {
+ {
log.warn(me.getMessage());
-
+
destroy();
// Then call the listeners
@@ -258,7 +258,7 @@
// executing
log.error("Failed to execute failure listener", t);
}
- }
+ }
}
public synchronized void destroy()
@@ -292,22 +292,22 @@
// Private
// --------------------------------------------------------------------------------------
-
+
private void doWrite(final Packet packet)
{
if (destroyed)
{
throw new IllegalStateException("Cannot write packet to connection, it is destroyed");
}
-
+
MessagingBuffer buffer = transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
packet.encode(buffer);
transportConnection.write(buffer);
}
-
+
// Inner classes
// --------------------------------------------------------------------------------
@@ -323,7 +323,7 @@
fail(me);
}
-
+
gotPong = false;
firstTime = false;
@@ -351,9 +351,9 @@
return id;
}
- public void handle(long connectionID, Packet packet)
+ public void handle(Object connectionID, Packet packet)
{
- gotPong = true;
+ gotPong = true;
}
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java 2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingHandlerImpl.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,17 +18,11 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.remoting.impl;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CLOSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATECONNECTION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATECONNECTION_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.NULL;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PING;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PONG;
-import static org.jboss.messaging.util.DataConstants.SIZE_INT;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.*;
+import static org.jboss.messaging.util.DataConstants.*;
import java.util.HashSet;
import java.util.Map;
@@ -87,33 +81,33 @@
import org.jboss.messaging.util.OrderedExecutorFactory;
/**
- *
+ *
* A RemotingHandlerImpl
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
public class RemotingHandlerImpl implements RemotingHandler
-{
+{
private static final Logger log = Logger.getLogger(RemotingHandlerImpl.class);
-
+
private final PacketDispatcher dispatcher;
private final ExecutorFactory executorFactory;
private final ConcurrentMap<Long, Executor> executors = new ConcurrentHashMap<Long, Executor>();
-
- private final ConcurrentMap<Long, Long> lastPings = new ConcurrentHashMap<Long, Long>();
- public RemotingHandlerImpl(final PacketDispatcher dispatcher, final ExecutorService executorService)
+ private final ConcurrentMap<Object, Long> lastPings = new ConcurrentHashMap<Object, Long>();
+
+ public RemotingHandlerImpl(final PacketDispatcher dispatcher, final ExecutorService executorService)
{
if (dispatcher == null)
{
throw new IllegalArgumentException ("argument dispatcher can't be null");
}
-
+
this.dispatcher = dispatcher;
-
+
if (executorService != null)
{
executorFactory = new OrderedExecutorFactory(executorService);
@@ -123,30 +117,30 @@
executorFactory = null;
}
}
-
- public Set<Long> scanForFailedConnections(final long expirePeriod)
+
+ public Set<Object> scanForFailedConnections(final long expirePeriod)
{
long now = System.currentTimeMillis();
-
- Set<Long> failedIDs = new HashSet<Long>();
-
- for (Map.Entry<Long, Long> entry: lastPings.entrySet())
+
+ Set<Object> failedIDs = new HashSet<Object>();
+
+ for (Map.Entry<Object, Long> entry: lastPings.entrySet())
{
long lastPing = entry.getValue();
-
+
if (now - lastPing > expirePeriod)
{
failedIDs.add(entry.getKey());
}
}
-
+
return failedIDs;
}
-
- public void bufferReceived(final long connectionID, final MessagingBuffer buffer) throws Exception
+
+ public void bufferReceived(final Object connectionID, final MessagingBuffer buffer) throws Exception
{
final Packet packet = decode(connectionID, buffer);
-
+
if (executorFactory != null)
{
long executorID = packet.getExecutorID();
@@ -183,9 +177,9 @@
else
{
dispatcher.dispatch(connectionID, packet);
- }
+ }
}
-
+
public int isReadyToHandle(final MessagingBuffer buffer)
{
if (buffer.remaining() <= SIZE_INT)
@@ -194,15 +188,15 @@
}
int length = buffer.getInt();
-
+
if (buffer.remaining() < length)
{
return -1;
}
-
+
return length;
}
-
+
public void closeExecutor(final long executorID)
{
if (executors != null)
@@ -210,28 +204,28 @@
executors.remove(executorID);
}
}
-
- public void removeLastPing(final long connectionID)
+
+ public void removeLastPing(final Object connectionID)
{
lastPings.remove(connectionID);
}
-
+
// Public ------------------------------------------------------------------------------
-
+
public int getNumExecutors()
{
return executors.size();
}
-
- public Packet decode(final long connectionID, final MessagingBuffer in) throws Exception
- {
+
+ public Packet decode(final Object connectionID, final MessagingBuffer in) throws Exception
+ {
byte packetType = in.getByte();
-
+
Packet packet;
switch (packetType)
{
- case NULL:
+ case PacketImpl.NULL:
{
packet = new PacketImpl(PacketImpl.NULL);
break;
@@ -509,12 +503,12 @@
}
packet.decode(in);
-
+
return packet;
- }
-
+ }
+
// Private -----------------------------------------------------------------------------
-
-
+
+
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java 2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,11 +18,11 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.remoting.impl;
-import static org.jboss.messaging.core.remoting.impl.RemotingConfigurationValidator.validate;
+import static org.jboss.messaging.core.remoting.impl.RemotingConfigurationValidator.*;
import java.util.HashSet;
import java.util.Map;
@@ -51,13 +51,13 @@
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
* @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
+ *
* @version <tt>$Revision$</tt>
*/
public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycleListener
{
// Constants -----------------------------------------------------
-
+
private static final Logger log = Logger.getLogger(RemotingServiceImpl.class);
// Attributes ----------------------------------------------------
@@ -71,37 +71,37 @@
private final PacketDispatcher dispatcher;
private final ExecutorService remotingExecutor;
-
+
private RemotingHandler handler;
-
+
private final long connectionExpirePeriod;
-
- private final Map<Long, RemotingConnection> connections = new ConcurrentHashMap<Long, RemotingConnection>();
-
+
+ private final Map<Object, RemotingConnection> connections = new ConcurrentHashMap<Object, RemotingConnection>();
+
private final Set<AcceptorFactory> acceptorFactories = new HashSet<AcceptorFactory>();
-
+
private final Timer failedConnectionTimer = new Timer(true);
-
+
private TimerTask failedConnectionsTask;
-
+
// Static --------------------------------------------------------
-
+
// Constructors --------------------------------------------------
-
+
public RemotingServiceImpl(final Configuration config)
{
validate(config);
this.config = config;
-
+
dispatcher = new PacketDispatcherImpl(null);
remotingExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-session-ordering-threads"));
-
+
handler = new RemotingHandlerImpl(dispatcher, remotingExecutor);
-
+
long pingPeriod = config.getConnectionParams().getPingInterval();
-
+
if (pingPeriod != -1)
{
connectionExpirePeriod = (long)(1.5 * pingPeriod);
@@ -109,8 +109,8 @@
else
{
connectionExpirePeriod = -1;
- }
-
+ }
+
ClassLoader loader = Thread.currentThread().getContextClassLoader();
for (String interceptorClass : config.getInterceptorClassNames())
{
@@ -124,12 +124,12 @@
log.warn("Error instantiating interceptor \"" + interceptorClass + "\"", e);
}
}
-
+
for (String factoryClass: config.getAcceptorFactoryClassNames())
{
try
{
- Class<?> clazz = loader.loadClass(factoryClass);
+ Class<?> clazz = loader.loadClass(factoryClass);
acceptorFactories.add((AcceptorFactory)clazz.newInstance());
}
catch (Exception e)
@@ -138,7 +138,7 @@
}
}
}
-
+
// RemotingService implementation -------------------------------
public synchronized void start() throws Exception
@@ -147,23 +147,23 @@
{
return;
}
-
+
for (AcceptorFactory factory: acceptorFactories)
{
Acceptor acceptor = factory.createAcceptor(config, handler, this);
-
+
acceptors.add(acceptor);
}
-
+
for (Acceptor a : acceptors)
{
a.start();
}
-
+
if (connectionExpirePeriod != -1)
{
failedConnectionsTask = new FailedConnectionsTask();
-
+
failedConnectionTimer.schedule(failedConnectionsTask, 0, 1000);
}
@@ -176,22 +176,22 @@
{
return;
}
-
+
if (failedConnectionsTask != null)
{
failedConnectionsTask.cancel();
-
+
failedConnectionsTask = null;
}
-
+
for (Acceptor acceptor : acceptors)
{
acceptor.stop();
}
-
+
started = false;
}
-
+
public boolean isStarted()
{
return started;
@@ -206,12 +206,12 @@
{
return acceptors;
}
-
- public RemotingConnection getConnection(final long remotingConnectionID)
+
+ public RemotingConnection getConnection(final Object remotingConnectionID)
{
return connections.get(remotingConnectionID);
}
-
+
public synchronized void registerAcceptorFactory(final AcceptorFactory factory)
{
acceptorFactories.add(factory);
@@ -221,46 +221,46 @@
{
acceptorFactories.remove(factory);
}
-
+
public synchronized Set<RemotingConnection> getConnections()
{
return new HashSet<RemotingConnection>(connections.values());
}
// ConnectionLifeCycleListener implementation -----------------------------------
-
+
public void connectionCreated(final Connection connection)
{
RemotingConnection rc =
new RemotingConnectionImpl(connection, dispatcher, null, config.getConnectionParams().getCallTimeout());
-
- this.connections.put(connection.getID(), rc);
+
+ connections.put(connection.getID(), rc);
}
- public void connectionDestroyed(long connectionID)
+ public void connectionDestroyed(Object connectionID)
{
handler.removeLastPing(connectionID);
-
+
if (connections.remove(connectionID) == null)
{
throw new IllegalStateException("Cannot find connection with id " + connectionID);
- }
+ }
}
- public void connectionException(long connectionID, MessagingException me)
+ public void connectionException(Object connectionID, MessagingException me)
{
RemotingConnection rc = connections.remove(connectionID);
-
+
if (rc == null)
{
throw new IllegalStateException("Cannot find connection with id " + connectionID);
}
-
+
rc.fail(me);
}
-
+
// Public --------------------------------------------------------
-
+
/*
* Used in testing
*/
@@ -276,44 +276,46 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
-
+
private class FailedConnectionsTask extends TimerTask
{
private boolean cancelled;
-
+
+ @Override
public synchronized void run()
{
if (cancelled)
{
return;
}
-
- Set<Long> failedIDs = handler.scanForFailedConnections(connectionExpirePeriod);
-
- for (long id: failedIDs)
+
+ Set<Object> failedIDs = handler.scanForFailedConnections(connectionExpirePeriod);
+
+ for (Object id: failedIDs)
{
RemotingConnection conn = connections.get(id);
-
+
if (conn == null)
{
throw new IllegalStateException("Cannot find connection with id " + id);
}
-
+
MessagingException me = new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
"Did not receive ping on connection. It is likely a client has exited or crashed without " +
"closing its connection, or the network between the server and client has failed. The connection will now be closed.");
-
+
conn.fail(me);
}
}
-
+
+ @Override
public synchronized boolean cancel()
{
cancelled = true;
-
+
return super.cancel();
}
-
+
}
}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/ResponseHandlerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ResponseHandlerImpl.java 2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ResponseHandlerImpl.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,7 +18,7 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.remoting.impl;
@@ -26,9 +26,9 @@
import org.jboss.messaging.core.remoting.ResponseHandler;
/**
- *
+ *
* A ResponseHandlerImpl
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
@@ -50,7 +50,7 @@
return id;
}
- public synchronized void handle(final long remotingConnectionID, final Packet packet)
+ public synchronized void handle(final Object remotingConnectionID, final Packet packet)
{
if (failed)
{
@@ -58,7 +58,7 @@
return;
}
- this.response = packet;
+ response = packet;
notify();
}
@@ -104,7 +104,7 @@
return response;
}
-
+
public void reset()
{
response = null;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnection.java 2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnection.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -95,9 +95,9 @@
return new IoBufferWrapper(buffer);
}
- public long getID()
+ public Object getID()
{
- return session.getId();
+ return Long.valueOf(session.getId());
}
public void write(final MessagingBuffer buffer)
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/ChannelBufferWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/ChannelBufferWrapper.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/ChannelBufferWrapper.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,458 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.netty;
+
+import static org.jboss.messaging.util.DataConstants.*;
+import static org.jboss.netty.buffer.ChannelBuffers.*;
+
+import java.nio.ByteBuffer;
+
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.util.SimpleString;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * Wraps Netty {@link ChannelBuffer} with {@link MessagingBuffer}.
+ * A Netty buffer has neither {@code position()} nor {@code limit()}, so
+ * {@link ChannelBuffer#readerIndex()} and {@link ChannelBuffer#writerIndex()}
+ * are used as the {@code position} and {@code limit} of the buffer,
+ * respectively.
+ *
+ * @author <a href="mailto:tlee at redhat.com">Trustin Lee</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * @version $Rev$, $Date$
+ */
+public class ChannelBufferWrapper implements MessagingBuffer
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final ChannelBuffer buffer;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ChannelBufferWrapper(final int size)
+ {
+ buffer = dynamicBuffer(size);
+ buffer.writerIndex(buffer.capacity());
+ }
+
+ public ChannelBufferWrapper(final ChannelBuffer buffer)
+ {
+ this.buffer = buffer;
+ }
+
+ // Public --------------------------------------------------------
+
+ // MessagingBuffer implementation ----------------------------------------------
+
+   /**
+    * Returns the content of the wrapped buffer as a byte array.
+    * <p>
+    * The buffer is first viewed through {@link ChannelBuffer#toByteBuffer()}.
+    * When that view exposes a writable backing array, the backing array itself
+    * is returned; otherwise the remaining bytes of the view are copied into a
+    * freshly allocated array.
+    *
+    * @return the backing array of the NIO view, or a copy of its remaining bytes
+    */
+   public byte[] array()
+   {
+      final ByteBuffer nioView = buffer.toByteBuffer();
+
+      // No accessible (or only a read-only) backing array: fall back to a copy.
+      if (!nioView.hasArray() || nioView.isReadOnly())
+      {
+         final byte[] copy = new byte[nioView.remaining()];
+         nioView.get(copy);
+         return copy;
+      }
+
+      return nioView.array();
+   }
+
+ public int position()
+ {
+ return buffer.readerIndex();
+ }
+
+ public void position(final int position)
+ {
+ buffer.readerIndex(position);
+ }
+
+ public int limit()
+ {
+ return buffer.writerIndex();
+ }
+
+ public void limit(final int limit)
+ {
+ buffer.writerIndex(limit);
+ }
+
+ public int capacity()
+ {
+ return buffer.capacity();
+ }
+
+ public void flip()
+ {
+ int oldPosition = position();
+ position(0);
+ limit(oldPosition);
+ }
+
+ public MessagingBuffer slice()
+ {
+ return new ChannelBufferWrapper(buffer.slice());
+ }
+
+ public MessagingBuffer createNewBuffer(int len)
+ {
+ return new ChannelBufferWrapper(len);
+ }
+
+ public int remaining()
+ {
+ return buffer.readableBytes();
+ }
+
+ public void rewind()
+ {
+ position(0);
+ buffer.markReaderIndex();
+ }
+
+   /**
+    * Writes a single byte at the current position (the reader index) and
+    * advances the position past it.
+    * <p>
+    * The writer index is temporarily moved to the position so that the Netty
+    * relative write lands there. Afterwards the limit (writer index) is
+    * restored, or raised to the new position if the write went past it.
+    *
+    * @param byteValue the byte to write
+    */
+   public void putByte(byte byteValue)
+   {
+      final int savedLimit = buffer.writerIndex();
+      buffer.writerIndex(buffer.readerIndex());
+      try
+      {
+         buffer.writeByte(byteValue);
+      }
+      finally
+      {
+         // The writer index now marks the end of what was written.
+         final int newPosition = buffer.writerIndex();
+         buffer.readerIndex(newPosition);
+         buffer.writerIndex(Math.max(savedLimit, newPosition));
+      }
+   }
+
+   /**
+    * Writes the whole array at the current position (the reader index) and
+    * advances the position past the written bytes.
+    * <p>
+    * The limit (writer index) is restored afterwards, or raised to the new
+    * position if the write went past it.
+    *
+    * @param byteArray the bytes to write
+    */
+   public void putBytes(final byte[] byteArray)
+   {
+      final int savedLimit = buffer.writerIndex();
+      buffer.writerIndex(buffer.readerIndex());
+      try
+      {
+         buffer.writeBytes(byteArray);
+      }
+      finally
+      {
+         // The writer index now marks the end of what was written.
+         final int newPosition = buffer.writerIndex();
+         buffer.readerIndex(newPosition);
+         buffer.writerIndex(Math.max(savedLimit, newPosition));
+      }
+   }
+
+   /**
+    * Writes {@code length} bytes of {@code bytes}, starting at {@code offset},
+    * at the current position (the reader index) and advances the position
+    * past the written bytes.
+    * <p>
+    * The limit (writer index) is restored afterwards, or raised to the new
+    * position if the write went past it.
+    *
+    * @param bytes  the source array
+    * @param offset index of the first byte of {@code bytes} to write
+    * @param length number of bytes to write
+    */
+   public void putBytes(final byte[] bytes, int offset, int length)
+   {
+      final int savedLimit = buffer.writerIndex();
+      buffer.writerIndex(buffer.readerIndex());
+      try
+      {
+         buffer.writeBytes(bytes, offset, length);
+      }
+      finally
+      {
+         // The writer index now marks the end of what was written.
+         final int newPosition = buffer.writerIndex();
+         buffer.readerIndex(newPosition);
+         buffer.writerIndex(Math.max(savedLimit, newPosition));
+      }
+   }
+
+   /**
+    * Writes an {@code int} at the current position (the reader index) and
+    * advances the position past it.
+    * <p>
+    * The limit (writer index) is restored afterwards, or raised to the new
+    * position if the write went past it.
+    *
+    * @param intValue the value to write
+    */
+   public void putInt(final int intValue)
+   {
+      final int savedLimit = buffer.writerIndex();
+      buffer.writerIndex(buffer.readerIndex());
+      try
+      {
+         buffer.writeInt(intValue);
+      }
+      finally
+      {
+         // The writer index now marks the end of what was written.
+         final int newPosition = buffer.writerIndex();
+         buffer.readerIndex(newPosition);
+         buffer.writerIndex(Math.max(savedLimit, newPosition));
+      }
+   }
+
+ public void putInt(final int pos, final int intValue)
+ {
+ buffer.setInt(pos, intValue);
+ }
+
+   /**
+    * Writes a {@code long} at the current position (the reader index) and
+    * advances the position past it.
+    * <p>
+    * The limit (writer index) is restored afterwards, or raised to the new
+    * position if the write went past it.
+    *
+    * @param longValue the value to write
+    */
+   public void putLong(final long longValue)
+   {
+      final int savedLimit = buffer.writerIndex();
+      buffer.writerIndex(buffer.readerIndex());
+      try
+      {
+         buffer.writeLong(longValue);
+      }
+      finally
+      {
+         // The writer index now marks the end of what was written.
+         final int newPosition = buffer.writerIndex();
+         buffer.readerIndex(newPosition);
+         buffer.writerIndex(Math.max(savedLimit, newPosition));
+      }
+   }
+
+ public void putFloat(final float floatValue)
+ {
+ putInt(Float.floatToIntBits(floatValue));
+ }
+
+ public void putDouble(final double d)
+ {
+ putLong(Double.doubleToLongBits(d));
+ }
+
+   /**
+    * Writes a {@code short} at the current position (the reader index) and
+    * advances the position past it.
+    * <p>
+    * The limit (writer index) is restored afterwards, or raised to the new
+    * position if the write went past it.
+    *
+    * @param s the value to write
+    */
+   public void putShort(final short s)
+   {
+      final int savedLimit = buffer.writerIndex();
+      buffer.writerIndex(buffer.readerIndex());
+      try
+      {
+         buffer.writeShort(s);
+      }
+      finally
+      {
+         // The writer index now marks the end of what was written.
+         final int newPosition = buffer.writerIndex();
+         buffer.readerIndex(newPosition);
+         buffer.writerIndex(Math.max(savedLimit, newPosition));
+      }
+   }
+
+ public void putChar(final char chr)
+ {
+ putShort((short) chr);
+ }
+
+ public byte getByte()
+ {
+ return buffer.readByte();
+ }
+
+ public short getUnsignedByte()
+ {
+ return buffer.readUnsignedByte();
+ }
+
+ public void getBytes(final byte[] b)
+ {
+ buffer.readBytes(b);
+ }
+
+ public void getBytes(final byte[] b, final int offset, final int length)
+ {
+ buffer.readBytes(b, offset, length);
+ }
+
+ public int getInt()
+ {
+ return buffer.readInt();
+ }
+
+ public long getLong()
+ {
+ return buffer.readLong();
+ }
+
+ public float getFloat()
+ {
+ return Float.intBitsToFloat(getInt());
+ }
+
+ public short getShort()
+ {
+ return buffer.readShort();
+ }
+
+ public int getUnsignedShort()
+ {
+ return buffer.readUnsignedShort();
+ }
+
+ public double getDouble()
+ {
+ return Double.longBitsToDouble(getLong());
+ }
+
+ public char getChar()
+ {
+ return (char) getShort();
+ }
+
+ public void putBoolean(final boolean b)
+ {
+ if (b)
+ {
+ putByte(TRUE);
+ } else
+ {
+ putByte(FALSE);
+ }
+ }
+
+   /**
+    * Reads one byte and interprets it as a boolean.
+    *
+    * @return {@code true} if the byte read equals {@code TRUE}, {@code false} otherwise
+    */
+   public boolean getBoolean()
+   {
+      return getByte() == TRUE;
+   }
+
+   /**
+    * Writes a string as its length (an {@code int}) followed by each of its
+    * characters, written one at a time with {@link #putChar(char)}.
+    * <p>
+    * Despite the parameter name, a {@code null} argument is not handled here
+    * and results in a {@link NullPointerException}.
+    *
+    * @param nullableString the string to write
+    */
+   public void putString(final String nullableString)
+   {
+      final int length = nullableString.length();
+      putInt(length);
+
+      for (int index = 0; index < length; index++)
+      {
+         putChar(nullableString.charAt(index));
+      }
+   }
+
+ public void putNullableString(final String nullableString)
+ {
+ if (nullableString == null)
+ {
+ putByte(NULL);
+ }
+ else
+ {
+ putByte(NOT_NULL);
+ putString(nullableString);
+ }
+ }
+
+   /**
+    * Reads a string stored as a length (an {@code int}) followed by that many
+    * characters, each read with {@link #getChar()}.
+    *
+    * @return the decoded string
+    */
+   public String getString()
+   {
+      final char[] decoded = new char[getInt()];
+
+      for (int index = 0; index < decoded.length; index++)
+      {
+         decoded[index] = getChar();
+      }
+
+      return String.valueOf(decoded);
+   }
+
+ public String getNullableString()
+ {
+ byte check = getByte();
+
+ if (check == NULL)
+ {
+ return null;
+ }
+ else
+ {
+ return getString();
+ }
+ }
+
+   /**
+    * Writes a string as a 16-bit byte count followed by its UTF-8 encoded
+    * bytes, at the current position (the reader index), and advances the
+    * position past the written data.
+    * <p>
+    * The limit (writer index) is restored afterwards, or raised to the new
+    * position if the write went past it.
+    *
+    * @param str the string to write
+    * @throws IllegalArgumentException if the UTF-8 encoding of {@code str}
+    *         is 65536 bytes or longer
+    */
+   public void putUTF(final String str) throws Exception
+   {
+      final ChannelBuffer utf8 = copiedBuffer(str, "UTF-8");
+      final int byteCount = utf8.readableBytes();
+      if (byteCount >= 65536)
+      {
+         throw new IllegalArgumentException("the specified string is too long (" + byteCount + ")");
+      }
+
+      final int savedLimit = buffer.writerIndex();
+      buffer.writerIndex(buffer.readerIndex());
+      try
+      {
+         buffer.writeShort((short) byteCount);
+         buffer.writeBytes(utf8);
+      }
+      finally
+      {
+         // The writer index now marks the end of what was written.
+         final int newPosition = buffer.writerIndex();
+         buffer.readerIndex(newPosition);
+         buffer.writerIndex(Math.max(savedLimit, newPosition));
+      }
+   }
+
+ public void putNullableSimpleString(final SimpleString string)
+ {
+ if (string == null)
+ {
+ putByte(NULL);
+ }
+ else
+ {
+ putByte(NOT_NULL);
+ putSimpleString(string);
+ }
+ }
+
+ public void putSimpleString(final SimpleString string)
+ {
+ byte[] data = string.getData();
+
+ putInt(data.length);
+ putBytes(data);
+ }
+
+ public SimpleString getSimpleString()
+ {
+ int len = getInt();
+
+ byte[] data = new byte[len];
+ getBytes(data);
+
+ return new SimpleString(data);
+ }
+
+ public SimpleString getNullableSimpleString()
+ {
+ int b = getByte();
+ if (b == NULL)
+ {
+ return null;
+ }
+ else
+ {
+ return getSimpleString();
+ }
+ }
+
+   /**
+    * Reads a string stored as an unsigned 16-bit byte count followed by that
+    * many UTF-8 encoded bytes.
+    *
+    * @return the decoded string
+    */
+   public String getUTF() throws Exception
+   {
+      final int byteCount = buffer.readUnsignedShort();
+      return buffer.readSlice(byteCount).toString("UTF-8");
+   }
+
+ public Object getUnderlyingBuffer()
+ {
+ return buffer;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
\ No newline at end of file
Property changes on: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/ChannelBufferWrapper.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/ChannelPipelineSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/ChannelPipelineSupport.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/ChannelPipelineSupport.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,85 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.netty;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import org.jboss.messaging.core.remoting.RemotingHandler;
+import org.jboss.messaging.core.remoting.impl.ssl.SSLSupport;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.handler.ssl.SslHandler;
+
+/**
+ * Static helpers that install the messaging codec and the SSL handler into a
+ * Netty {@link ChannelPipeline}.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ * @author <a href="mailto:tlee at redhat.com">Trustin Lee</a>
+ * @version $Rev$, $Date$
+ */
+public class ChannelPipelineSupport
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   private ChannelPipelineSupport()
+   {
+      // Unused
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Appends a {@link MessagingFrameDecoder} named {@code "decoder"} to the
+    * end of the pipeline.
+    *
+    * @param pipeline the pipeline to extend; must not be {@code null}
+    * @param handler  the remoting handler handed to the frame decoder
+    */
+   public static void addCodecFilter(final ChannelPipeline pipeline,
+                                     final RemotingHandler handler)
+   {
+      assert pipeline != null;
+      pipeline.addLast("decoder", new MessagingFrameDecoder(handler));
+   }
+
+   /**
+    * Appends an {@link SslHandler} named {@code "ssl"} to the end of the
+    * pipeline, backed by an {@link SSLEngine} created from the given key and
+    * trust stores.
+    *
+    * @param pipeline           the pipeline to extend
+    * @param client             {@code true} to configure the engine for the
+    *                           client side of the handshake, {@code false}
+    *                           for the server side
+    * @param keystorePath       path of the key store
+    * @param keystorePassword   password of the key store
+    * @param trustStorePath     path of the trust store
+    * @param trustStorePassword password of the trust store
+    * @throws Exception if the SSL context cannot be created
+    */
+   public static void addSSLFilter(
+         final ChannelPipeline pipeline, final boolean client,
+         final String keystorePath, final String keystorePassword, final String trustStorePath,
+         final String trustStorePassword) throws Exception
+   {
+      SSLContext context = SSLSupport.getInstance(client, keystorePath, keystorePassword,
+            trustStorePath, trustStorePassword);
+      SSLEngine engine = context.createSSLEngine();
+
+      // An SSLEngine must be told explicitly whether it acts as client or
+      // server before handshaking starts. Previously only the client mode was
+      // ever set, leaving the acceptor-side engine without a mode.
+      engine.setUseClientMode(client);
+      if (client)
+      {
+         // NOTE(review): wantClientAuth is documented as a server-mode
+         // option; kept here to preserve the existing client configuration.
+         engine.setWantClientAuth(true);
+      }
+
+      SslHandler handler = new SslHandler(engine);
+      pipeline.addLast("ssl", handler);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
Property changes on: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/ChannelPipelineSupport.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/MessagingFrameDecoder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/MessagingFrameDecoder.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/MessagingFrameDecoder.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,74 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.netty;
+
+import static org.jboss.messaging.util.DataConstants.*;
+
+import org.jboss.messaging.core.remoting.RemotingHandler;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+/**
+ * A Netty FrameDecoder used to decode messages.
+ * <p>
+ * Whether a complete packet is available is decided by
+ * {@link RemotingHandler#isReadyToHandle}; the decoder then cuts exactly
+ * that packet out of the accumulated input.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ * @author <a href="tlee at redhat.com">Trustin Lee</a>
+ *
+ * @version $Revision$, $Date$
+ */
+public class MessagingFrameDecoder extends FrameDecoder
+{
+   private final RemotingHandler handler;
+
+   /**
+    * @param handler the remoting handler asked whether a full packet has arrived
+    */
+   public MessagingFrameDecoder(final RemotingHandler handler)
+   {
+      this.handler = handler;
+   }
+
+   // FrameDecoder overrides
+   // -------------------------------------------------------------------------------------
+
+   /**
+    * Tries to extract one complete packet from the accumulated input.
+    *
+    * @param ctx     the handler context (unused)
+    * @param channel the channel the data was read from (unused)
+    * @param in      the cumulative input buffer
+    * @return a buffer holding the packet body, or {@code null} when the
+    *         handler reports ({@code -1}) that no complete packet is
+    *         available yet
+    */
+   @Override
+   protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer in) throws Exception
+   {
+      //TODO - we can avoid this entirely if we maintain fragmented packets in the handler
+      int start = in.readerIndex();
+
+      int length = handler.isReadyToHandle(new ChannelBufferWrapper(in));
+      if (length == -1)
+      {
+         // Undo whatever the readiness check consumed and signal "no frame
+         // yet". FrameDecoder requires null here: any non-null value (such as
+         // the Boolean previously returned) is fired upstream as a decoded
+         // message.
+         in.readerIndex(start);
+         return null;
+      }
+
+      // Skip SIZE_INT bytes from the start (presumably the length header)
+      // and hand the following 'length' bytes upstream.
+      in.readerIndex(start + SIZE_INT);
+      return in.readBytes(length);
+   }
+}
+
+
+
+
Property changes on: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/MessagingFrameDecoder.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptor.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptor.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,196 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.netty;
+
+import static org.jboss.netty.channel.Channels.*;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
+import org.jboss.messaging.core.remoting.RemotingHandler;
+import org.jboss.messaging.core.remoting.spi.Acceptor;
+import org.jboss.messaging.core.remoting.spi.Connection;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChildChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+/**
+ * A Netty TCP Acceptor that supports SSL
+ * <p>
+ * {@link #start()} binds a server channel on the configured host and port and
+ * {@link #stop()} closes it again. Connection life-cycle events of the
+ * accepted child channels are reported to the
+ * {@link ConnectionLifeCycleListener}; received buffers are forwarded to the
+ * {@link RemotingHandler}.
+ *
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ * @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="tlee at redhat.com">Trustin Lee</a>
+ *
+ * @version $Rev$, $Date$
+ */
+public class NettyAcceptor implements Acceptor
+{
+   private static final Logger log = Logger.getLogger(NettyAcceptor.class);
+
+   private ExecutorService bossExecutor;
+   private ExecutorService workerExecutor;
+   // Non-null exactly while the acceptor is started.
+   private ChannelFactory channelFactory;
+   private Channel serverChannel;
+   private ServerBootstrap bootstrap;
+   private NettyChildChannelHandler childChannelHandler;
+
+   private final Configuration configuration;
+
+   private final RemotingHandler handler;
+
+   private final ConnectionLifeCycleListener listener;
+
+   /**
+    * @param configuration source of the bind address, SSL and TCP settings
+    * @param handler       receives the buffers read from accepted connections
+    * @param listener      notified when connections are created, destroyed or fail
+    */
+   public NettyAcceptor(final Configuration configuration, final RemotingHandler handler,
+                        final ConnectionLifeCycleListener listener)
+   {
+      this.configuration = configuration;
+
+      this.handler = handler;
+
+      this.listener = listener;
+   }
+
+   /**
+    * Creates the executors, channel factory and bootstrap, then binds the
+    * server channel. Does nothing if the acceptor is already started.
+    * <p>
+    * If anything fails after the executors have been created (for example the
+    * bind itself), the executors are shut down and the acceptor is left in
+    * the stopped state, so that a later {@link #start()} or {@link #stop()}
+    * behaves normally.
+    *
+    * @throws Exception if the server channel cannot be set up or bound
+    */
+   public synchronized void start() throws Exception
+   {
+      if (channelFactory != null)
+      {
+         //Already started
+         return;
+      }
+
+      bossExecutor = Executors.newCachedThreadPool();
+      workerExecutor = Executors.newCachedThreadPool();
+
+      boolean bound = false;
+      try
+      {
+         channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor);
+         bootstrap = new ServerBootstrap(channelFactory);
+         childChannelHandler = new NettyChildChannelHandler();
+         bootstrap.setParentHandler(childChannelHandler);
+
+         bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+            public ChannelPipeline getPipeline() throws Exception
+            {
+               ChannelPipeline pipeline = pipeline();
+               if (configuration.isSSLEnabled())
+               {
+                  ChannelPipelineSupport.addSSLFilter(pipeline, false, configuration.getKeyStorePath(),
+                        configuration.getKeyStorePassword(),
+                        configuration.getTrustStorePath(),
+                        configuration.getTrustStorePassword());
+               }
+               ChannelPipelineSupport.addCodecFilter(pipeline, handler);
+               pipeline.addLast("handler", new NettyHandler());
+               return pipeline;
+            }
+         });
+
+         // Bind
+         bootstrap.setOption("localAddress", new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+         bootstrap.setOption("child.tcpNoDelay", configuration.getConnectionParams().isTcpNoDelay());
+         int receiveBufferSize = configuration.getConnectionParams().getTcpReceiveBufferSize();
+         if (receiveBufferSize != -1)
+         {
+            bootstrap.setOption("child.receiveBufferSize", receiveBufferSize);
+         }
+         int sendBufferSize = configuration.getConnectionParams().getTcpSendBufferSize();
+         if (sendBufferSize != -1)
+         {
+            bootstrap.setOption("child.sendBufferSize", sendBufferSize);
+         }
+         bootstrap.setOption("reuseAddress", true);
+         bootstrap.setOption("child.reuseAddress", true);
+         bootstrap.setOption("child.keepAlive", true);
+
+         serverChannel = bootstrap.bind();
+         bound = true;
+      }
+      finally
+      {
+         if (!bound)
+         {
+            // Start failed: do not stay half-started with live thread pools.
+            releaseResources();
+         }
+      }
+   }
+
+   /**
+    * Closes the server channel, waits for the close to complete and shuts
+    * down the executors. Does nothing if the acceptor is not started.
+    */
+   public synchronized void stop()
+   {
+      if (channelFactory == null)
+      {
+         return;
+      }
+
+      // remove the listener before disposing the acceptor
+      // so that we're not notified when the sessions are destroyed
+      serverChannel.getPipeline().remove(childChannelHandler);
+      serverChannel.close().awaitUninterruptibly();
+      releaseResources();
+   }
+
+   // Private -----------------------------------------------------------------------------------
+
+   /**
+    * Shuts down both executors and clears all per-start state, returning the
+    * acceptor to the stopped state.
+    */
+   private void releaseResources()
+   {
+      bossExecutor.shutdown();
+      workerExecutor.shutdown();
+      bossExecutor = null;
+      workerExecutor = null;
+      serverChannel = null;
+      bootstrap = null;
+      childChannelHandler = null;
+      channelFactory = null;
+   }
+
+   // Inner classes -----------------------------------------------------------------------------
+
+   /**
+    * Per-connection handler: forwards received buffers to the remoting
+    * handler and reports exceptions to the life-cycle listener.
+    */
+   private final class NettyHandler extends SimpleChannelHandler
+   {
+      @Override
+      public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception
+      {
+         log.error(
+               "caught exception " + e.getCause() + " for channel " +
+               e.getChannel(), e.getCause());
+         MessagingException me = new MessagingException(MessagingException.INTERNAL_ERROR, "Netty exception");
+         me.initCause(e.getCause());
+         listener.connectionException(e.getChannel().getId(), me);
+      }
+
+      @Override
+      public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+      {
+         ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
+         handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer));
+      }
+   }
+
+   /**
+    * Parent (server channel) handler: reports opened and closed child
+    * channels to the life-cycle listener, then passes the event on upstream.
+    */
+   private final class NettyChildChannelHandler extends SimpleChannelHandler
+   {
+
+      @Override
+      public void childChannelOpen(ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception
+      {
+         Connection tc = new NettyConnection(e.getChildChannel());
+         listener.connectionCreated(tc);
+         ctx.sendUpstream(e);
+      }
+
+      @Override
+      public void childChannelClosed(ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception
+      {
+         listener.connectionDestroyed(e.getChildChannel().getId());
+         ctx.sendUpstream(e);
+      }
+   }
+}
Property changes on: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptor.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptorFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptorFactory.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptorFactory.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.netty;
+
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
+import org.jboss.messaging.core.remoting.RemotingHandler;
+import org.jboss.messaging.core.remoting.spi.Acceptor;
+import org.jboss.messaging.core.remoting.spi.AcceptorFactory;
+
+/**
+ * A NettyAcceptorFactory
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class NettyAcceptorFactory implements AcceptorFactory
+{
+   /**
+    * Builds a new {@link NettyAcceptor} from the given collaborators.
+    *
+    * @param configuration configuration passed to the acceptor
+    * @param handler       remoting handler passed to the acceptor
+    * @param listener      connection life cycle listener passed to the acceptor
+    * @return a newly created acceptor
+    */
+   public Acceptor createAcceptor(final Configuration configuration,
+                                  final RemotingHandler handler,
+                                  final ConnectionLifeCycleListener listener)
+   {
+      final Acceptor acceptor = new NettyAcceptor(configuration, handler, listener);
+      return acceptor;
+   }
+}
Property changes on: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptorFactory.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnection.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnection.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,110 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.netty;
+
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.core.remoting.spi.Connection;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.handler.ssl.SslHandler;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class NettyConnection implements Connection
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   /** The Netty channel all operations of this connection are delegated to. */
+   private final Channel channel;
+
+   /** Set once {@link #close()} has run; only read and written inside the synchronized close(). */
+   private boolean closed;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param channel the Netty channel backing this connection
+    */
+   public NettyConnection(final Channel channel)
+   {
+      this.channel = channel;
+   }
+
+   // Public --------------------------------------------------------
+
+   // Connection implementation ----------------------------
+
+   /**
+    * Closes the connection and waits for the close to complete. Calling it
+    * again after the first call has returned is a no-op.
+    *
+    * If an "ssl" handler is installed in the pipeline, the SSL layer is shut
+    * down before the channel is closed, so that the SSL close message can
+    * still be written to the peer. Any failure of the SSL shutdown is ignored
+    * and the channel is closed regardless.
+    */
+   public synchronized void close()
+   {
+      if (closed)
+      {
+         return;
+      }
+
+      // The SSL shutdown must happen while the channel is still open;
+      // doing it after channel.close() would write to a closed channel.
+      SslHandler sslHandler = (SslHandler) channel.getPipeline().get("ssl");
+      if (sslHandler != null)
+      {
+         try
+         {
+            sslHandler.close(channel).awaitUninterruptibly();
+         }
+         catch (Throwable t)
+         {
+            // best effort only: the channel is closed below whatever happened here
+         }
+      }
+
+      channel.close().awaitUninterruptibly();
+
+      closed = true;
+   }
+
+   /**
+    * @param size the size passed to the new {@link ChannelBufferWrapper}
+    * @return a new buffer wrapper created with the given size
+    */
+   public MessagingBuffer createBuffer(int size)
+   {
+      return new ChannelBufferWrapper(size);
+   }
+
+   /**
+    * @return the id of the underlying channel
+    */
+   public Object getID()
+   {
+      return channel.getId();
+   }
+
+   /**
+    * Writes the underlying buffer of the given messaging buffer to the channel.
+    * The call does not wait for the write to complete.
+    */
+   public void write(final MessagingBuffer buffer)
+   {
+      channel.write(buffer.getUnderlyingBuffer());
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
Property changes on: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnection.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnector.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnector.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,230 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl.netty;
+
+import static org.jboss.netty.channel.Channels.*;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.jboss.messaging.core.client.ConnectionParams;
+import org.jboss.messaging.core.client.Location;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
+import org.jboss.messaging.core.remoting.RemotingHandler;
+import org.jboss.messaging.core.remoting.spi.Connection;
+import org.jboss.messaging.core.remoting.spi.Connector;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+
+/**
+ *
+ * A NettyConnector
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:tlee at redhat.com">Trustin Lee</a>
+ */
+public class NettyConnector implements Connector
+{
+ // Constants -----------------------------------------------------
+
+ // Logger for this class (was mistakenly created for NettyConnection)
+ private static final Logger log = Logger.getLogger(NettyConnector.class);
+
+ // Attributes ----------------------------------------------------
+
+ private ExecutorService bossExecutor;
+ private ExecutorService workerExecutor;
+ private ChannelFactory channelFactory;
+ private ClientBootstrap bootstrap;
+
+ private final RemotingHandler handler;
+
+ private final Location location;
+
+ private final ConnectionLifeCycleListener listener;
+
+ private final ConnectionParams params;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ /**
+  * Creates a connector. All four arguments are mandatory.
+  *
+  * @param location the location whose host and port are connected to
+  * @param params   connection parameters (TCP and SSL settings)
+  * @param handler  remoting handler that receives incoming buffers
+  * @param listener listener notified of connection life cycle events
+  * @throws IllegalArgumentException if any argument is null
+  */
+ public NettyConnector(final Location location, final ConnectionParams params,
+                       final RemotingHandler handler,
+                       final ConnectionLifeCycleListener listener)
+ {
+    // checks run in the same order as before: location, params, handler, listener
+    rejectNull(location, "Invalid argument null location");
+    rejectNull(params, "Invalid argument null connection params");
+    rejectNull(handler, "Invalid argument null handler");
+    rejectNull(listener, "Invalid argument null listener");
+
+    this.location = location;
+    this.params = params;
+    this.handler = handler;
+    this.listener = listener;
+ }
+
+ /** Throws an IllegalArgumentException carrying the given message if the argument is null. */
+ private static void rejectNull(final Object argument, final String message)
+ {
+    if (argument == null)
+    {
+       throw new IllegalArgumentException(message);
+    }
+ }
+
+ /**
+  * Starts the connector: creates the boss and worker executors, the NIO client
+  * channel factory and the client bootstrap, applies the TCP options taken from
+  * the connection parameters and installs the pipeline factory. Does nothing
+  * if the connector is already started.
+  */
+ public synchronized void start()
+ {
+    if (channelFactory != null)
+    {
+       return;
+    }
+
+    bossExecutor = Executors.newCachedThreadPool();
+    workerExecutor = Executors.newCachedThreadPool();
+    channelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor);
+    bootstrap = new ClientBootstrap(channelFactory);
+
+    bootstrap.setOption("tcpNoDelay", params.isTcpNoDelay());
+    // -1 means "not configured": leave the buffer sizes untouched in that case
+    if (params.getTcpReceiveBufferSize() != -1)
+    {
+       bootstrap.setOption("receiveBufferSize", params.getTcpReceiveBufferSize());
+    }
+    if (params.getTcpSendBufferSize() != -1)
+    {
+       bootstrap.setOption("sendBufferSize", params.getTcpSendBufferSize());
+    }
+    bootstrap.setOption("keepAlive", true);
+    bootstrap.setOption("reuseAddress", true);
+
+    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+
+       // Pipeline layout: optional SSL filter, then the codec filter, then the NettyHandler
+       public ChannelPipeline getPipeline() throws Exception
+       {
+          ChannelPipeline pipeline = pipeline();
+          if (params.isSSLEnabled())
+          {
+             try
+             {
+                ChannelPipelineSupport.addSSLFilter(pipeline, true, params.getKeyStorePath(), params.getKeyStorePassword(), null, null);
+             }
+             catch (Exception e)
+             {
+                // message used to say "MinaConnection", a leftover from the MINA connector
+                throw new IllegalStateException("Unable to create NettyConnection for " + location, e);
+             }
+          }
+          ChannelPipelineSupport.addCodecFilter(pipeline, handler);
+          pipeline.addLast("handler", new NettyHandler());
+          return pipeline;
+       }
+    });
+ }
+
+ /**
+  * Stops the connector: shuts down both executors and drops the bootstrap and
+  * channel factory references. Does nothing if the connector is not started.
+  */
+ public synchronized void close()
+ {
+    if (channelFactory != null)
+    {
+       bossExecutor.shutdown();
+       workerExecutor.shutdown();
+       bootstrap = null;
+       channelFactory = null;
+    }
+ }
+
+ public Connection createConnection()
+ {
+ InetSocketAddress address = new InetSocketAddress(location.getHost(), location.getPort());
+ ChannelFuture future = bootstrap.connect(address);
+ future.awaitUninterruptibly();
+
+ if (future.isSuccess())
+ {
+ return new NettyConnection(future.getChannel());
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ private final class NettyHandler extends SimpleChannelHandler
+ {
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+ {
+ ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
+ handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer));
+ }
+
+ @Override
+ public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
+ {
+ listener.connectionDestroyed(e.getChannel().getId());
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception
+ {
+ log.error(
+ "caught exception " + e.getCause() + " for session " +
+ e.getChannel(), e.getCause());
+
+ MessagingException me = new MessagingException(MessagingException.INTERNAL_ERROR, "Netty exception");
+ me.initCause(e.getCause());
+ listener.connectionException(e.getChannel().getId(), me);
+ }
+ }
+}
Property changes on: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnector.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnectorFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnectorFactory.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnectorFactory.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,22 @@
+package org.jboss.messaging.core.remoting.impl.netty;
+
+import org.jboss.messaging.core.client.ConnectionParams;
+import org.jboss.messaging.core.client.Location;
+import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
+import org.jboss.messaging.core.remoting.RemotingHandler;
+import org.jboss.messaging.core.remoting.spi.Connector;
+import org.jboss.messaging.core.remoting.spi.ConnectorFactory;
+
+/**
+ * A NettyConnectorFactory
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class NettyConnectorFactory implements ConnectorFactory
+{
+   /**
+    * Builds a new {@link NettyConnector} from the given collaborators.
+    *
+    * @param location location passed to the connector
+    * @param params   connection parameters passed to the connector
+    * @param handler  remoting handler passed to the connector
+    * @param listener connection life cycle listener passed to the connector
+    * @return a newly created connector
+    */
+   public Connector createConnector(final Location location, final ConnectionParams params,
+                                    final RemotingHandler handler, final ConnectionLifeCycleListener listener)
+   {
+      final Connector connector = new NettyConnector(location, params, handler, listener);
+      return connector;
+   }
+}
Property changes on: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnectorFactory.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
Modified: trunk/src/main/org/jboss/messaging/core/remoting/spi/Connection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/spi/Connection.java 2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/remoting/spi/Connection.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,26 +18,26 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.remoting.spi;
import org.jboss.messaging.core.remoting.MessagingBuffer;
/**
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
+ *
* @version <tt>$Revision$</tt>
- *
+ *
*/
public interface Connection
{
MessagingBuffer createBuffer(int size);
-
- long getID();
+ Object getID();
+
void write(MessagingBuffer buffer);
- void close();
+ void close();
}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,13 +18,11 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.server.impl;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATECONNECTION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PING;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PONG;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.*;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
@@ -49,13 +47,13 @@
private static final Logger log = Logger.getLogger(MessagingServerPacketHandler.class);
private final MessagingServer server;
-
+
private final RemotingService remotingService;
public MessagingServerPacketHandler(final MessagingServer server, final RemotingService remotingService)
{
this.server = server;
-
+
this.remotingService = remotingService;
}
@@ -64,15 +62,15 @@
//0 is reserved for this handler
return 0;
}
-
- public void handle(final long connectionID, final Packet packet)
+
+ public void handle(final Object connectionID, final Packet packet)
{
Packet response = null;
-
+
RemotingConnection connection = remotingService.getConnection(connectionID);
byte type = packet.getType();
-
+
try
{
if (type == PING)
@@ -80,13 +78,13 @@
response = new PacketImpl(PONG);
}
else if (type == CREATECONNECTION)
- {
+ {
CreateConnectionRequest request = (CreateConnectionRequest) packet;
-
+
response =
- server.createConnection(request.getUsername(), request.getPassword(),
+ server.createConnection(request.getUsername(), request.getPassword(),
request.getVersion(),
- connection);
+ connection);
}
else
{
@@ -97,23 +95,23 @@
catch (Throwable t)
{
MessagingException me;
-
- log.error("Caught unexpected exception", t);
-
+
+ log.error("Caught unexpected exception", t);
+
if (t instanceof MessagingException)
{
me = (MessagingException)t;
}
else
- {
+ {
me = new MessagingException(MessagingException.INTERNAL_ERROR);
}
-
- response = new MessagingExceptionMessage(me);
+
+ response = new MessagingExceptionMessage(me);
}
-
+
response.normalize(packet);
-
+
connection.sendOneWay(response);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java 2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,16 +18,11 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.server.impl;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CLOSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.NO_ID_SET;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.NULL;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_HASNEXTMESSAGE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_NEXTMESSAGE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_RESET;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.*;
import java.util.ArrayList;
import java.util.Iterator;
@@ -54,11 +49,11 @@
/**
* Concrete implementation of BrowserEndpoint.
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @version <tt>$Revision: 3778 $</tt>
- *
+ *
* $Id: ServerBrowserImpl.java 3778 2008-02-24 12:15:29Z timfox $
*/
public class ServerBrowserImpl
@@ -77,29 +72,29 @@
private final Filter filter;
private Iterator<ServerMessage> iterator;
private final RemotingConnection remotingConnection;
-
+
// Constructors ---------------------------------------------------------------------------------
public ServerBrowserImpl(final ServerSession session,
final Queue destination, final String messageFilter,
final PacketDispatcher dispatcher,
final RemotingConnection remotingConnection) throws MessagingException
- {
+ {
this.session = session;
-
- this.id = dispatcher.generateID();
-
+
+ id = dispatcher.generateID();
+
this.destination = destination;
if (messageFilter != null)
- {
+ {
filter = new FilterImpl(new SimpleString(messageFilter));
}
else
{
filter = null;
}
-
+
this.remotingConnection = remotingConnection;
}
@@ -109,7 +104,7 @@
{
return id;
}
-
+
public void reset() throws Exception
{
iterator = createIterator();
@@ -126,7 +121,7 @@
return has;
}
-
+
public ServerMessage nextMessage() throws Exception
{
if (iterator == null)
@@ -161,29 +156,33 @@
messages.add(m);
i++;
}
- else break;
- }
- return (Message[])messages.toArray(new Message[messages.size()]);
+ else
+ {
+ break;
+ }
+ }
+ return messages.toArray(new Message[messages.size()]);
}
-
+
public void close() throws Exception
{
iterator = null;
-
+
session.removeBrowser(this);
-
+
log.trace(this + " closed");
}
-
+
// Public ---------------------------------------------------------------------------------------
+ @Override
public String toString()
{
return "BrowserEndpoint[" + id + "]";
}
// Package protected ----------------------------------------------------------------------------
-
+
// Protected ------------------------------------------------------------------------------------
// Private --------------------------------------------------------------------------------------
@@ -191,14 +190,14 @@
private Iterator<ServerMessage> createIterator()
{
List<MessageReference> refs = destination.list(filter);
-
+
List<ServerMessage> msgs = new ArrayList<ServerMessage>();
-
+
for (MessageReference ref: refs)
{
msgs.add(ref.getMessage());
}
-
+
return msgs.iterator();
}
@@ -208,15 +207,15 @@
}
// Inner classes --------------------------------------------------------------------------------
-
+
private class ServerBrowserEndpointHandler implements PacketHandler
{
public long getID()
{
- return ServerBrowserImpl.this.id;
+ return id;
}
-
- public void handle(final long connectionID, final Packet packet)
+
+ public void handle(final Object connectionID, final Packet packet)
{
Packet response = null;
@@ -226,19 +225,19 @@
switch (type)
{
case SESS_BROWSER_HASNEXTMESSAGE:
- response = new SessionBrowserHasNextMessageResponseMessage(hasNextMessage());
+ response = new SessionBrowserHasNextMessageResponseMessage(hasNextMessage());
break;
case SESS_BROWSER_NEXTMESSAGE:
- ServerMessage message = nextMessage();
+ ServerMessage message = nextMessage();
response = new ReceiveMessage(message, 0, 0);
break;
- case SESS_BROWSER_RESET:
+ case SESS_BROWSER_RESET:
reset();
- response = new PacketImpl(NULL);
+ response = new PacketImpl(NULL);
break;
case CLOSE:
close();
- response = new PacketImpl(NULL);
+ response = new PacketImpl(NULL);
break;
default:
response = new MessagingExceptionMessage(new MessagingException(MessagingException.UNSUPPORTED_PACKET,
@@ -248,24 +247,24 @@
catch (Throwable t)
{
MessagingException me;
-
- log.error("Caught unexpected exception", t);
-
+
+ log.error("Caught unexpected exception", t);
+
if (t instanceof MessagingException)
{
me = (MessagingException)t;
}
else
- {
+ {
me = new MessagingException(MessagingException.INTERNAL_ERROR);
}
-
- response = new MessagingExceptionMessage(me);
+
+ response = new MessagingExceptionMessage(me);
}
-
+
response.normalize(packet);
-
- remotingConnection.sendOneWay(response);
+
+ remotingConnection.sendOneWay(response);
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java 2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,7 +18,7 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.server.impl;
@@ -43,13 +43,13 @@
private static final Logger log = Logger.getLogger(ServerConnectionPacketHandler.class);
private final ServerConnection connection;
-
+
private final RemotingConnection remotingConnection;
public ServerConnectionPacketHandler(final ServerConnection connection, final RemotingConnection remotingConnection)
{
this.connection = connection;
-
+
this.remotingConnection = remotingConnection;
}
@@ -58,14 +58,14 @@
return connection.getID();
}
- public void handle(final long remotingConnectionID, final Packet packet)
+ public void handle(final Object remotingConnectionID, final Packet packet)
{
Packet response = null;
byte type = packet.getType();
try
- {
+ {
switch (type)
{
case PacketImpl.CONN_CREATESESSION:
@@ -87,26 +87,26 @@
default:
response = new MessagingExceptionMessage(new MessagingException(MessagingException.UNSUPPORTED_PACKET,
"Unsupported packet " + type));
- }
+ }
}
catch (Throwable t)
{
MessagingException me;
-
- log.error("Caught unexpected exception", t);
-
+
+ log.error("Caught unexpected exception", t);
+
if (t instanceof MessagingException)
{
me = (MessagingException)t;
}
else
- {
+ {
me = new MessagingException(MessagingException.INTERNAL_ERROR);
}
-
- response = new MessagingExceptionMessage(me);
+
+ response = new MessagingExceptionMessage(me);
}
-
+
if (response instanceof MessagingExceptionMessage)
{
MessagingExceptionMessage mee = (MessagingExceptionMessage)response;
@@ -115,8 +115,8 @@
if (response != null)
{
response.normalize(packet);
-
- remotingConnection.sendOneWay(response);
+
+ remotingConnection.sendOneWay(response);
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java 2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerPacketHandler.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,7 +18,7 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.server.impl;
@@ -33,7 +33,7 @@
import org.jboss.messaging.core.server.ServerConsumer;
/**
- *
+ *
* A ServerConsumerPacketHandler
*
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -45,13 +45,13 @@
private static final Logger log = Logger.getLogger(ServerConsumerPacketHandler.class);
private final ServerConsumer consumer;
-
+
private final RemotingConnection remotingConnection;
-
+
public ServerConsumerPacketHandler(final ServerConsumer consumer, final RemotingConnection remotingConnection)
{
this.consumer = consumer;
-
+
this.remotingConnection = remotingConnection;
}
@@ -60,12 +60,12 @@
return consumer.getID();
}
- public void handle(final long remotingConnectionID, final Packet packet)
+ public void handle(final Object remotingConnectionID, final Packet packet)
{
Packet response = null;
byte type = packet.getType();
-
+
try
{
switch (type)
@@ -76,7 +76,7 @@
break;
case PacketImpl.CLOSE:
consumer.close();
- response = new PacketImpl(PacketImpl.NULL);
+ response = new PacketImpl(PacketImpl.NULL);
break;
default:
response = new MessagingExceptionMessage(new MessagingException(MessagingException.UNSUPPORTED_PACKET,
@@ -86,26 +86,26 @@
catch (Throwable t)
{
MessagingException me;
-
- log.error("Caught unexpected exception", t);
-
+
+ log.error("Caught unexpected exception", t);
+
if (t instanceof MessagingException)
{
me = (MessagingException)t;
}
else
- {
+ {
me = new MessagingException(MessagingException.INTERNAL_ERROR);
}
-
- response = new MessagingExceptionMessage(me);
+
+ response = new MessagingExceptionMessage(me);
}
if (response != null)
{
response.normalize(packet);
-
- remotingConnection.sendOneWay(response);
+
+ remotingConnection.sendOneWay(response);
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerPacketHandler.java 2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerPacketHandler.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,7 +18,7 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.server.impl;
@@ -33,24 +33,24 @@
import org.jboss.messaging.core.server.ServerProducer;
/**
- *
+ *
* A ServerProducerPacketHandler
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
public class ServerProducerPacketHandler implements PacketHandler
{
private static final Logger log = Logger.getLogger(ServerProducerPacketHandler.class);
-
+
private final ServerProducer producer;
-
+
private final RemotingConnection remotingConnection;
-
+
public ServerProducerPacketHandler(final ServerProducer producer, final RemotingConnection remotingConnection)
{
this.producer = producer;
-
+
this.remotingConnection = remotingConnection;
}
@@ -59,14 +59,14 @@
return producer.getID();
}
- public void handle(final long remotingConnectionID, final Packet packet)
+ public void handle(final Object remotingConnectionID, final Packet packet)
{
Packet response = null;
byte type = packet.getType();
-
+
try
- {
+ {
switch (type)
{
case PacketImpl.PROD_SEND:
@@ -89,26 +89,26 @@
catch (Throwable t)
{
MessagingException me;
-
- log.error("Caught unexpected exception", t);
-
+
+ log.error("Caught unexpected exception", t);
+
if (t instanceof MessagingException)
{
me = (MessagingException)t;
}
else
- {
+ {
me = new MessagingException(MessagingException.INTERNAL_ERROR);
}
-
- response = new MessagingExceptionMessage(me);
+
+ response = new MessagingExceptionMessage(me);
}
if (response != null)
{
response.normalize(packet);
-
- remotingConnection.sendOneWay(response);
+
+ remotingConnection.sendOneWay(response);
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,12 +18,10 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.server.impl;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.NO_ID_SET;
-
import java.util.List;
import javax.transaction.xa.Xid;
@@ -61,9 +59,9 @@
import org.jboss.messaging.core.server.ServerSession;
/**
- *
+ *
* A ServerSessionPacketHandler
- *
+ *
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
@@ -71,15 +69,15 @@
public class ServerSessionPacketHandler implements PacketHandler
{
private static final Logger log = Logger.getLogger(ServerSessionPacketHandler.class);
-
+
private final ServerSession session;
-
+
private final RemotingConnection remotingConnection;
-
+
public ServerSessionPacketHandler(final ServerSession session, final RemotingConnection remotingConnection)
{
this.session = session;
-
+
this.remotingConnection = remotingConnection;
}
@@ -88,20 +86,20 @@
return session.getID();
}
- public void handle(final long remotingConnectionID, final Packet packet)
+ public void handle(final Object remotingConnectionID, final Packet packet)
{
Packet response = null;
byte type = packet.getType();
-
+
try
- {
+ {
switch (type)
{
case PacketImpl.SESS_CREATECONSUMER:
{
SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
-
+
response = session.createConsumer(request.getClientTargetID(), request.getQueueName(), request.getFilterString(),
request.isNoLocal(), request.isAutoDeleteQueue(),
request.getWindowSize(), request.getMaxRate());
@@ -268,31 +266,31 @@
response = new MessagingExceptionMessage(new MessagingException(MessagingException.UNSUPPORTED_PACKET,
"Unsupported packet " + type));
}
-
+
}
catch (Throwable t)
{
MessagingException me;
-
- log.error("Caught unexpected exception", t);
-
+
+ log.error("Caught unexpected exception", t);
+
if (t instanceof MessagingException)
{
me = (MessagingException)t;
}
else
- {
+ {
me = new MessagingException(MessagingException.INTERNAL_ERROR);
}
-
- response = new MessagingExceptionMessage(me);
+
+ response = new MessagingExceptionMessage(me);
}
if (response != null)
{
response.normalize(packet);
-
- remotingConnection.sendOneWay(response);
+
+ remotingConnection.sendOneWay(response);
}
}
}
Modified: trunk/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/RemotingHandlerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/RemotingHandlerImplTest.java 2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/RemotingHandlerImplTest.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -35,9 +35,9 @@
import org.jboss.messaging.tests.util.UnitTestCase;
/**
- *
+ *
* A RemotingHandlerImplTest
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
@@ -48,6 +48,7 @@
private PacketDispatcher dispatcher;
private ExecutorService executorService;
+ @Override
protected void setUp() throws Exception
{
super.setUp();
@@ -57,67 +58,68 @@
buff = new ByteBufferWrapper(ByteBuffer.allocate(1024));
}
+ @Override
protected void tearDown() throws Exception
{
super.tearDown();
handler = null;
buff = null;
}
-
+
public void testScanForFailedConnections() throws Exception
{
final long expirePeriod = 100;
-
+
MessagingBuffer buff1 = new ByteBufferWrapper(ByteBuffer.allocate(1024));
Packet ping1 = new PacketImpl(PacketImpl.PING);
- ping1.encode(buff1);
+ ping1.encode(buff1);
buff1.getInt();
final long connectionID1 = 120912;
-
+
MessagingBuffer buff2 = new ByteBufferWrapper(ByteBuffer.allocate(1024));
Packet ping2 = new PacketImpl(PacketImpl.PING);
- ping2.encode(buff2);
+ ping2.encode(buff2);
buff2.getInt();
final long connectionID2 = 12023;
-
+
MessagingBuffer buff3 = new ByteBufferWrapper(ByteBuffer.allocate(1024));
Packet ping3 = new PacketImpl(PacketImpl.PING);
- ping3.encode(buff3);
+ ping3.encode(buff3);
buff3.getInt();
final long connectionID3 = 1123128;
-
+
MessagingBuffer buff4 = new ByteBufferWrapper(ByteBuffer.allocate(1024));
Packet ping4 = new PacketImpl(PacketImpl.PING);
ping4.encode(buff4);
buff4.getInt();
final long connectionID4 = 127987;
-
- Set<Long> failed = handler.scanForFailedConnections(expirePeriod);
-
+
+ Set<Object> failed = handler.scanForFailedConnections(expirePeriod);
+
assertEquals(0, failed.size());
-
- handler.bufferReceived(connectionID1, buff1);
+
+ handler.bufferReceived(connectionID1, buff1);
handler.bufferReceived(connectionID2, buff2);
handler.bufferReceived(connectionID3, buff3);
handler.bufferReceived(connectionID4, buff4);
-
- failed = handler.scanForFailedConnections(expirePeriod);
+
+ failed = handler.scanForFailedConnections(expirePeriod);
assertEquals(0, failed.size());
-
+
Thread.sleep(expirePeriod + 10);
-
- failed = handler.scanForFailedConnections(expirePeriod);
+
+ failed = handler.scanForFailedConnections(expirePeriod);
assertEquals(4, failed.size());
-
- handler.removeLastPing(connectionID1);
- handler.removeLastPing(connectionID2);
- handler.removeLastPing(connectionID3);
+
+ handler.removeLastPing(connectionID1);
+ handler.removeLastPing(connectionID2);
+ handler.removeLastPing(connectionID3);
handler.removeLastPing(connectionID4);
-
- failed = handler.scanForFailedConnections(expirePeriod);
- assertEquals(0, failed.size());
+
+ failed = handler.scanForFailedConnections(expirePeriod);
+ assertEquals(0, failed.size());
}
-
+
}
Modified: trunk/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/RemotingServiceImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/RemotingServiceImplTest.java 2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/RemotingServiceImplTest.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -35,9 +35,9 @@
import org.jboss.messaging.tests.util.UnitTestCase;
/**
- *
+ *
* A RemotingServiceImplTest
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
@@ -49,51 +49,51 @@
final long interval = 100;
config.getConnectionParams().setPingInterval(interval);
RemotingServiceImpl remotingService = new RemotingServiceImpl(config);
-
+
RemotingHandler handler = EasyMock.createStrictMock(RemotingHandler.class);
remotingService.setHandler(handler);
-
- Set<Long> failed = new HashSet<Long>();
-
+
+ Set<Object> failed = new HashSet<Object>();
+
EasyMock.expect(handler.scanForFailedConnections((long)(1.5 * interval))).andReturn(failed);
-
+
EasyMock.replay(handler);
-
+
remotingService.start();
-
+
Thread.sleep(interval * 2);
-
+
EasyMock.verify(handler);
-
+
}
-
+
public void testScanForFailedConnectionsFailed() throws Exception
{
ConfigurationImpl config = new ConfigurationImpl();
final long interval = 100;
config.getConnectionParams().setPingInterval(interval);
RemotingServiceImpl remotingService = new RemotingServiceImpl(config);
-
+
RemotingHandler handler = EasyMock.createStrictMock(RemotingHandler.class);
remotingService.setHandler(handler);
-
- Set<Long> failed = new HashSet<Long>();
+
+ Set<Object> failed = new HashSet<Object>();
failed.add(2L);
failed.add(3L);
-
+
EasyMock.expect(handler.scanForFailedConnections((long)(1.5 * interval))).andReturn(failed);
-
+
Connection conn1 = EasyMock.createStrictMock(Connection.class);
Connection conn2 = EasyMock.createStrictMock(Connection.class);
Connection conn3 = EasyMock.createStrictMock(Connection.class);
-
+
EasyMock.expect(conn1.getID()).andStubReturn(1);
EasyMock.expect(conn2.getID()).andStubReturn(2);
EasyMock.expect(conn3.getID()).andStubReturn(3);
-
+
conn2.close();
- conn3.close();
-
+ conn3.close();
+
class Listener implements FailureListener
{
volatile MessagingException me;
@@ -102,39 +102,39 @@
this.me = me;
}
}
-
+
EasyMock.replay(handler, conn1, conn2, conn3);
-
+
remotingService.start();
-
+
remotingService.connectionCreated(conn1);
remotingService.connectionCreated(conn2);
remotingService.connectionCreated(conn3);
-
+
RemotingConnection rc1 = remotingService.getConnection(1);
RemotingConnection rc2 = remotingService.getConnection(2);
RemotingConnection rc3 = remotingService.getConnection(3);
-
+
Listener listener1 = new Listener();
rc1.addFailureListener(listener1);
-
+
Listener listener2 = new Listener();
rc2.addFailureListener(listener2);
-
+
Listener listener3 = new Listener();
rc3.addFailureListener(listener3);
-
+
Thread.sleep(interval * 2);
-
+
EasyMock.verify(handler, conn1, conn2, conn3);
-
+
assertNull(listener1.me);
assertNotNull(listener2.me);
assertNotNull(listener3.me);
-
+
assertEquals(MessagingException.CONNECTION_TIMEDOUT, listener2.me.getCode());
assertEquals(MessagingException.CONNECTION_TIMEDOUT, listener3.me.getCode());
-
+
}
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/MessagingBufferTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/MessagingBufferTestBase.java 2008-08-18 04:41:51 UTC (rev 4806)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/MessagingBufferTestBase.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -18,18 +18,12 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.tests.unit.core.remoting;
-import static org.jboss.messaging.tests.util.RandomUtil.randomByte;
-import static org.jboss.messaging.tests.util.RandomUtil.randomBytes;
-import static org.jboss.messaging.tests.util.RandomUtil.randomDouble;
-import static org.jboss.messaging.tests.util.RandomUtil.randomFloat;
-import static org.jboss.messaging.tests.util.RandomUtil.randomInt;
-import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
-import static org.jboss.messaging.tests.util.RandomUtil.randomString;
-import static org.jboss.messaging.tests.util.UnitTestCase.assertEqualsByteArrays;
+import static org.jboss.messaging.tests.util.RandomUtil.*;
+import static org.jboss.messaging.tests.util.UnitTestCase.*;
import junit.framework.TestCase;
import org.jboss.messaging.core.remoting.MessagingBuffer;
@@ -38,7 +32,7 @@
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- *
+ *
* @version <tt>$Revision$</tt>
*/
public abstract class MessagingBufferTestBase extends TestCase
@@ -73,7 +67,7 @@
{
assertNull(putAndGetNullableString(null));
}
-
+
public void testEmptyString() throws Exception
{
String result = putAndGetNullableString("");
@@ -85,7 +79,7 @@
public void testNonEmptyString() throws Exception
{
String junk = randomString();
-
+
String result = putAndGetNullableString(junk);
assertNotNull(result);
@@ -96,7 +90,7 @@
{
assertNull(putAndGetNullableSimpleString(null));
}
-
+
public void testEmptySimpleString() throws Exception
{
SimpleString emptySimpleString = new SimpleString("");
@@ -119,53 +113,53 @@
{
byte b = randomByte();
wrapper.putByte(b);
-
+
wrapper.flip();
-
+
assertEquals(b, wrapper.getByte());
}
-
+
public void testUnsignedByte() throws Exception
{
byte b = (byte) 0xff;
wrapper.putByte(b);
-
+
wrapper.flip();
-
+
assertEquals(255, wrapper.getUnsignedByte());
wrapper.rewind();
-
+
b = (byte) 0xf;
wrapper.putByte(b);
-
+
wrapper.flip();
-
+
assertEquals(b, wrapper.getUnsignedByte());
}
-
-
-
+
+
+
public void testBytes() throws Exception
{
byte[] bytes = randomBytes();
wrapper.putBytes(bytes);
-
+
wrapper.flip();
-
+
byte[] b = new byte[bytes.length];
wrapper.getBytes(b);
assertEqualsByteArrays(bytes, b);
}
-
+
public void testBytesWithLength() throws Exception
{
byte[] bytes = randomBytes();
// put only half of the bytes
wrapper.putBytes(bytes, 0, bytes.length / 2);
-
+
wrapper.flip();
-
+
byte[] b = new byte[bytes.length / 2];
wrapper.getBytes(b, 0, b.length);
assertEqualsByteArrays(b.length, bytes, b);
@@ -174,151 +168,151 @@
public void testPutTrueBoolean() throws Exception
{
wrapper.putBoolean(true);
-
+
wrapper.flip();
-
+
assertTrue(wrapper.getBoolean());
}
public void testPutFalseBoolean() throws Exception
{
wrapper.putBoolean(false);
-
+
wrapper.flip();
-
+
assertFalse(wrapper.getBoolean());
}
-
+
public void testChar() throws Exception
{
wrapper.putChar('a');
-
+
wrapper.flip();
-
+
assertEquals('a', wrapper.getChar());
}
-
+
public void testInt() throws Exception
{
int i = randomInt();
wrapper.putInt(i);
-
+
wrapper.flip();
-
+
assertEquals(i, wrapper.getInt());
}
-
+
public void testIntAtPosition() throws Exception
{
int firstInt = randomInt();
int secondInt = randomInt();
-
+
wrapper.putInt(secondInt);
wrapper.putInt(secondInt);
// rewrite firstInt at the beginning
wrapper.putInt(0, firstInt);
wrapper.flip();
-
+
assertEquals(firstInt, wrapper.getInt());
assertEquals(secondInt, wrapper.getInt());
}
-
+
public void testLong() throws Exception
{
long l = randomLong();
wrapper.putLong(l);
-
+
wrapper.flip();
-
+
assertEquals(l, wrapper.getLong());
}
-
+
public void testUnsignedShort() throws Exception
{
short s1 = Short.MAX_VALUE;
-
+
wrapper.putShort(s1);
-
+
wrapper.flip();
-
+
int s2 = wrapper.getUnsignedShort();
-
- assertEquals((int) s1, s2);
-
+
+ assertEquals(s1, s2);
+
wrapper.rewind();
-
+
s1 = Short.MIN_VALUE;
-
+
wrapper.putShort(s1);
-
+
wrapper.flip();
-
+
s2 = wrapper.getUnsignedShort();
-
- assertEquals(((int) s1) * -1, s2);
-
+
+ assertEquals(s1 * -1, s2);
+
wrapper.rewind();
-
+
s1 = -1;
-
+
wrapper.putShort(s1);
-
+
wrapper.flip();
-
+
s2 = wrapper.getUnsignedShort();
-
+
// The max of an unsigned short
// (http://en.wikipedia.org/wiki/Unsigned_short)
assertEquals(s2, 65535);
}
-
+
public void testShort() throws Exception
{
wrapper.putShort((short) 1);
-
+
wrapper.flip();
-
+
assertEquals((short)1, wrapper.getShort());
}
-
+
public void testDouble() throws Exception
{
double d = randomDouble();
wrapper.putDouble(d);
-
+
wrapper.flip();
-
+
assertEquals(d, wrapper.getDouble());
}
-
+
public void testFloat() throws Exception
{
float f = randomFloat();
wrapper.putFloat(f);
-
+
wrapper.flip();
-
+
assertEquals(f, wrapper.getFloat());
}
-
+
public void testUTF() throws Exception
{
String str = randomString();
wrapper.putUTF(str);
-
+
wrapper.flip();
-
+
assertEquals(str, wrapper.getUTF());
}
-
+
public void testArray() throws Exception
{
byte[] bytes = randomBytes(128);
wrapper.putBytes(bytes);
-
+
wrapper.flip();
-
+
byte[] array = wrapper.array();
assertEquals(wrapper.capacity(), array.length);
assertEqualsByteArrays(128, bytes, wrapper.array());
@@ -328,16 +322,16 @@
{
int i = randomInt();
wrapper.putInt(i);
-
+
wrapper.flip();
-
+
assertEquals(i, wrapper.getInt());
-
+
wrapper.rewind();
assertEquals(i, wrapper.getInt());
}
-
+
public void testRemaining() throws Exception
{
int capacity = wrapper.capacity();
@@ -347,22 +341,22 @@
int fill = capacity / 3;
byte[] bytes = randomBytes(fill);
wrapper.putBytes(bytes);
-
+
// check the remaining is 2/3
assertEquals(capacity - fill, wrapper.remaining());
}
-
+
public void testPosition() throws Exception
{
assertEquals(0, wrapper.position());
-
+
byte[] bytes = randomBytes(128);
wrapper.putBytes(bytes);
-
+
assertEquals(bytes.length, wrapper.position());
-
+
wrapper.position(0);
- assertEquals(0, wrapper.position());
+ assertEquals(0, wrapper.position());
}
public void testLimit() throws Exception
@@ -372,12 +366,12 @@
byte[] bytes = randomBytes(128);
wrapper.putBytes(bytes);
- assertEquals(wrapper.capacity(), wrapper.limit());
-
+ assertTrue(wrapper.limit() >= bytes.length);
+
wrapper.limit(128);
assertEquals(128, wrapper.limit());
}
-
+
public void testSlice() throws Exception
{
byte[] bytes = randomBytes(128);
@@ -385,16 +379,16 @@
wrapper.position(0);
wrapper.limit(128);
-
+
MessagingBuffer slicedBuffer = wrapper.slice();
assertEquals(128, slicedBuffer.capacity());
-
+
byte[] slicedBytes = new byte[128];
slicedBuffer.getBytes(slicedBytes);
-
+
assertEqualsByteArrays(bytes, slicedBytes);
}
-
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -406,18 +400,18 @@
wrapper.putNullableString(nullableString);
wrapper.flip();
-
+
return wrapper.getNullableString();
}
-
+
private SimpleString putAndGetNullableSimpleString(SimpleString nullableSimpleString) throws Exception
{
wrapper.putNullableSimpleString(nullableSimpleString);
wrapper.flip();
-
+
return wrapper.getNullableSimpleString();
}
-
+
// Inner classes -------------------------------------------------
}
Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/ChannelBufferWrapper2Test.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/ChannelBufferWrapper2Test.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/ChannelBufferWrapper2Test.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,65 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.unit.core.remoting.impl.netty;
+
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.core.remoting.impl.netty.ChannelBufferWrapper;
+import org.jboss.messaging.tests.unit.core.remoting.MessagingBufferTestBase;
+
+/**
+ * Same as ChannelBufferWrapperTest, but using a different constructor
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:csuconic at redhat.com">Clebert Suconic</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class ChannelBufferWrapper2Test extends MessagingBufferTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // MessagingBufferTestBase overrides ------------------------------
+
+ @Override
+ protected MessagingBuffer createBuffer()
+ {
+ return new ChannelBufferWrapper(512);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Property changes on: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/ChannelBufferWrapper2Test.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/ChannelBufferWrapperTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/ChannelBufferWrapperTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/ChannelBufferWrapperTest.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.unit.core.remoting.impl.netty;
+
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.core.remoting.impl.netty.ChannelBufferWrapper;
+import org.jboss.messaging.tests.unit.core.remoting.MessagingBufferTestBase;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class ChannelBufferWrapperTest extends MessagingBufferTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // MessagingBufferTestBase overrides ------------------------------
+
+ @Override
+ protected MessagingBuffer createBuffer()
+ {
+ ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(512);
+ buffer.writerIndex(buffer.capacity());
+ return new ChannelBufferWrapper(buffer);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Property changes on: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/ChannelBufferWrapperTest.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,55 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.unit.core.remoting.impl.netty;
+
+import org.easymock.EasyMock;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
+import org.jboss.messaging.core.remoting.RemotingHandler;
+import org.jboss.messaging.core.remoting.impl.netty.NettyAcceptor;
+import org.jboss.messaging.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.jboss.messaging.core.remoting.spi.Acceptor;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
+ *
+ * A NettyAcceptorFactoryTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class NettyAcceptorFactoryTest extends UnitTestCase
+{
+ public void testCreateAcceptor() throws Exception
+ {
+ NettyAcceptorFactory factory = new NettyAcceptorFactory();
+
+ RemotingHandler handler = EasyMock.createStrictMock(RemotingHandler.class);
+ Configuration config = new ConfigurationImpl();
+ ConnectionLifeCycleListener listener = EasyMock.createStrictMock(ConnectionLifeCycleListener.class);
+
+ Acceptor acceptor = factory.createAcceptor(config, handler, listener);
+
+ assertTrue(acceptor instanceof NettyAcceptor);
+ }
+}
Property changes on: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,55 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.unit.core.remoting.impl.netty;
+
+import org.easymock.EasyMock;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
+import org.jboss.messaging.core.remoting.RemotingHandler;
+import org.jboss.messaging.core.remoting.impl.netty.NettyAcceptor;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
+ *
+ * A NettyAcceptorTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class NettyAcceptorTest extends UnitTestCase
+{
+ public void testStartStop() throws Exception
+ {
+ RemotingHandler handler = EasyMock.createStrictMock(RemotingHandler.class);
+ Configuration config = new ConfigurationImpl();
+ ConnectionLifeCycleListener listener = EasyMock.createStrictMock(ConnectionLifeCycleListener.class);
+ NettyAcceptor acceptor = new NettyAcceptor(config, handler, listener);
+
+ acceptor.start();
+ acceptor.stop();
+ acceptor.start();
+ acceptor.stop();
+
+ }
+}
Property changes on: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,96 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.unit.core.remoting.impl.netty;
+
+import java.util.UUID;
+
+import org.easymock.EasyMock;
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.core.remoting.impl.netty.NettyConnection;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.netty.channel.Channel;
+
+/**
+ *
+ * A NettyConnectionTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class NettyConnectionTest extends UnitTestCase
+{
+ public void testGetID() throws Exception
+ {
+ Channel channel = EasyMock.createStrictMock(Channel.class);
+
+ final UUID id = UUID.randomUUID();
+
+ EasyMock.expect(channel.getId()).andReturn(id);
+
+ NettyConnection conn = new NettyConnection(channel);
+
+ EasyMock.replay(channel);
+
+ assertEquals(id, conn.getID());
+
+ EasyMock.verify(channel);
+ }
+
+ public void testWrite() throws Exception
+ {
+ Channel channel = EasyMock.createStrictMock(Channel.class);
+
+ final Object underlying = new Object();
+
+ MessagingBuffer buff = EasyMock.createStrictMock(MessagingBuffer.class);
+
+ EasyMock.expect(buff.getUnderlyingBuffer()).andReturn(underlying);
+
+ EasyMock.expect(channel.write(underlying)).andReturn(null);
+
+ NettyConnection conn = new NettyConnection(channel);
+
+ EasyMock.replay(channel, buff);
+
+ conn.write(buff);
+
+ EasyMock.verify(channel, buff);
+ }
+
+ public void testCreateBuffer() throws Exception
+ {
+ Channel channel = EasyMock.createStrictMock(Channel.class);
+
+ NettyConnection conn = new NettyConnection(channel);
+
+ EasyMock.replay(channel);
+
+ final int size = 1234;
+
+ MessagingBuffer buff = conn.createBuffer(size);
+ buff.putByte((byte) 0x00); // Netty buffer does lazy initialization.
+ assertEquals(size, buff.capacity());
+
+ EasyMock.verify(channel);
+ }
+
+}
Property changes on: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2008-08-18 06:38:52 UTC (rev 4807)
@@ -0,0 +1,119 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.unit.core.remoting.impl.netty;
+
+import org.easymock.EasyMock;
+import org.jboss.messaging.core.client.ConnectionParams;
+import org.jboss.messaging.core.client.Location;
+import org.jboss.messaging.core.client.impl.ConnectionParamsImpl;
+import org.jboss.messaging.core.client.impl.LocationImpl;
+import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
+import org.jboss.messaging.core.remoting.RemotingHandler;
+import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.impl.mina.MinaConnector;
+import org.jboss.messaging.core.remoting.impl.netty.NettyConnector;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
+ *
+ * A MinaConnectorTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class NettyConnectorTest extends UnitTestCase
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testStartStop() throws Exception
+ {
+ RemotingHandler handler = EasyMock.createStrictMock(RemotingHandler.class);
+ ConnectionParams params = new ConnectionParamsImpl();
+ Location location = new LocationImpl(TransportType.TCP, "blah", 1234);
+ ConnectionLifeCycleListener listener = EasyMock.createStrictMock(ConnectionLifeCycleListener.class);
+
+ MinaConnector connector = new MinaConnector(location, params, handler, listener);
+
+ connector.start();
+ connector.close();
+ }
+
+ public void testNullParams() throws Exception
+ {
+ RemotingHandler handler = EasyMock.createStrictMock(RemotingHandler.class);
+ ConnectionParams params = new ConnectionParamsImpl();
+ Location location = new LocationImpl(TransportType.TCP, "blah", 1234);
+ ConnectionLifeCycleListener listener = EasyMock.createStrictMock(ConnectionLifeCycleListener.class);
+
+ try
+ {
+ new MinaConnector(null, params, handler, listener);
+
+ fail("Should throw Exception");
+ }
+ catch (IllegalArgumentException e)
+ {
+ //Ok
+ }
+
+ try
+ {
+ new MinaConnector(location, null, handler, listener);
+
+ fail("Should throw Exception");
+ }
+ catch (IllegalArgumentException e)
+ {
+ //Ok
+ }
+
+ try
+ {
+ new MinaConnector(location, params, null, listener);
+
+ fail("Should throw Exception");
+ }
+ catch (IllegalArgumentException e)
+ {
+ //Ok
+ }
+
+ try
+ {
+ new MinaConnector(location, params, handler, null);
+
+ fail("Should throw Exception");
+ }
+ catch (IllegalArgumentException e)
+ {
+ //Ok
+ }
+ }
+}
Property changes on: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
More information about the jboss-cvs-commits
mailing list