[jboss-cvs] JBoss Messaging SVN: r4427 - in trunk: src/main/org/jboss/messaging/core/remoting and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jun 11 08:06:27 EDT 2008
Author: jmesnil
Date: 2008-06-11 08:06:26 -0400 (Wed, 11 Jun 2008)
New Revision: 4427
Added:
trunk/src/main/org/jboss/messaging/core/remoting/ResponseHandler.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/ResponseHandlerImpl.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/ping/
trunk/tests/src/org/jboss/messaging/tests/unit/core/ping/impl/
trunk/tests/src/org/jboss/messaging/tests/unit/core/ping/impl/PingerImplTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/ping/impl/PingerImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/RemotingServiceImpl.java
Log:
added unit tests for PingerImpl
factored the code duplicated in PingerImpl's PongHandler and RemotingConnectionImpl's ResponseHandler out into a new ResponseHandlerImpl class
Modified: trunk/src/main/org/jboss/messaging/core/ping/impl/PingerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ping/impl/PingerImpl.java 2008-06-11 11:04:37 UTC (rev 4426)
+++ trunk/src/main/org/jboss/messaging/core/ping/impl/PingerImpl.java 2008-06-11 12:06:26 UTC (rev 4427)
@@ -31,7 +31,7 @@
private final NIOSession session;
- private final PongHandler pongHandler;
+ private final ResponseHandler pongHandler;
private final long pongTimeout;
@@ -39,20 +39,18 @@
public PingerImpl(final PacketDispatcher dispatcher, final NIOSession session, final long pongTimeout,
- final CleanUpNotifier failureHandler)
+ final ResponseHandler pongHandler, final CleanUpNotifier failureHandler)
{
this.dispatcher = dispatcher;
this.session = session;
- long handlerID = dispatcher.generateID();
-
this.pongTimeout = pongTimeout;
this.failureHandler = failureHandler;
- pongHandler = new PongHandler(handlerID);
-
+ this.pongHandler = pongHandler;
+
dispatcher.register(pongHandler);
}
@@ -68,7 +66,7 @@
ping.setTargetID(0);
ping.setExecutorID(session.getID());
ping.setResponseTargetID(pongHandler.getID());
- pongHandler.response = null;
+ pongHandler.reset();
try
{
if (isTraceEnabled)
@@ -82,13 +80,16 @@
log.error("Caught unexpected exception", e);
failureHandler.fireCleanup(session.getID(), new MessagingException(MessagingException.CONNECTION_TIMEDOUT, e.getMessage()));
+ return;
}
+
//now we have sent a ping, wait for a pong
Packet response = pongHandler.waitForResponse(pongTimeout);
if (response == null)
{
failureHandler.fireCleanup(session.getID(), new MessagingException(MessagingException.CONNECTION_TIMEDOUT, "no pong received"));
+ return;
}
else
{
@@ -106,80 +107,4 @@
}
}
}
-
- //TODO - duplicated from RemotingConnectionImpl - TODO combine
- private static class PongHandler implements PacketHandler
- {
- private long id;
-
- private Packet response;
-
- private boolean failed;
-
- PongHandler(final long id)
- {
- this.id = id;
- }
-
- public long getID()
- {
- return id;
- }
-
- public synchronized void setFailed()
- {
- failed = true;
- }
-
- public synchronized void handle(final Packet packet, final PacketReturner sender)
- {
- if (failed)
- {
- //Ignore any responses that come back after we timed out
- return;
- }
-
- this.response = packet;
-
- notify();
- }
-
- public synchronized Packet waitForResponse(final long timeout)
- {
- if (failed)
- {
- throw new IllegalStateException("Cannot wait for response - pinger has failed");
- }
-
- long toWait = timeout;
- long start = System.currentTimeMillis();
-
- while (response == null && toWait > 0)
- {
- try
- {
- wait(toWait);
- }
- catch (InterruptedException e)
- {
- }
-
- long now = System.currentTimeMillis();
-
- toWait -= now - start;
-
- start = now;
- }
-
- if (response == null)
- {
- failed = true;
- }
-
- return response;
- }
-
- }
-
-
}
Added: trunk/src/main/org/jboss/messaging/core/remoting/ResponseHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/ResponseHandler.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/ResponseHandler.java 2008-06-11 12:06:26 UTC (rev 4427)
@@ -0,0 +1,23 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+
+/**
+ * A PacketHandler that a caller can block on: waitForResponse(timeout) returns the handled
+ * packet, or null if none arrived in time (see ResponseHandlerImpl, which waits in milliseconds).
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @version <tt>$Revision$</tt>
+ */
+public interface ResponseHandler extends PacketHandler
+{
+ Packet waitForResponse(final long timeout);
+ /** Clears the stored response (ResponseHandlerImpl sets it to null); PingerImpl calls this before each ping. */
+ void reset();
+ /** Marks the handler as failed; ResponseHandlerImpl then ignores later packets and refuses to wait again. */
+ void setFailed();
+}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-06-11 11:04:37 UTC (rev 4426)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-06-11 12:06:26 UTC (rev 4427)
@@ -132,7 +132,7 @@
long handlerID = connector.getDispatcher().generateID();
- ResponseHandler handler = new ResponseHandler(handlerID);
+ ResponseHandlerImpl handler = new ResponseHandlerImpl(handlerID);
connector.getDispatcher().register(handler);
@@ -230,56 +230,6 @@
// Private --------------------------------------------------------------------------------------
- private static class ResponseHandler implements PacketHandler
- {
- private long id;
-
- private Packet response;
-
- ResponseHandler(final long id)
- {
- this.id = id;
- }
-
- public long getID()
- {
- return id;
- }
-
- public synchronized void handle(final Packet packet, final PacketReturner sender)
- {
- this.response = packet;
-
- notify();
- }
-
- public synchronized Packet waitForResponse(final long timeout)
- {
- long toWait = timeout;
- long start = System.currentTimeMillis();
-
- while (response == null && toWait > 0)
- {
- try
- {
- wait(toWait);
- }
- catch (InterruptedException e)
- {
- }
-
- long now = System.currentTimeMillis();
-
- toWait -= now - start;
-
- start = now;
- }
-
- return response;
- }
-
- }
-
private void checkConnected() throws MessagingException
{
if (session == null)
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/ResponseHandlerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ResponseHandlerImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ResponseHandlerImpl.java 2008-06-11 12:06:26 UTC (rev 4427)
@@ -0,0 +1,123 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl;
+
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.PacketReturner;
+import org.jboss.messaging.core.remoting.ResponseHandler;
+
+/**
+ * ResponseHandler that stores the packet passed to handle() and lets a caller
+ * block in waitForResponse() until it arrives or a timeout elapses.
+ *
+ * Every access to the mutable state (response, failed) is made while holding
+ * this instance's monitor.
+ */
+public class ResponseHandlerImpl implements ResponseHandler
+{
+   private final long id;
+
+   private Packet response;
+
+   private boolean failed = false;
+
+   public ResponseHandlerImpl(final long id)
+   {
+      this.id = id;
+   }
+
+   public long getID()
+   {
+      return id;
+   }
+
+   /**
+    * Stores the packet as the response and wakes up a thread blocked in waitForResponse().
+    * The packet is dropped if this handler has already failed.
+    */
+   public synchronized void handle(final Packet packet, final PacketReturner sender)
+   {
+      if (failed)
+      {
+         //Ignore any responses that come back after we timed out
+         return;
+      }
+
+      this.response = packet;
+
+      notify();
+   }
+
+   /**
+    * Marks this handler as failed. Synchronized so that the write to the flag is
+    * visible to handle() and waitForResponse(), which read it under the same monitor.
+    */
+   public synchronized void setFailed()
+   {
+      failed = true;
+   }
+
+   /**
+    * Blocks until a response has been handled or the timeout has elapsed.
+    *
+    * @param timeout maximum time to wait, in milliseconds
+    * @return the response, or null if none arrived in time (the handler is then marked as failed)
+    * @throws IllegalStateException if this handler has already failed
+    */
+   public synchronized Packet waitForResponse(final long timeout)
+   {
+      if (failed)
+      {
+         throw new IllegalStateException("Cannot wait for response - handler has failed");
+      }
+
+      long toWait = timeout;
+      long start = System.currentTimeMillis();
+      boolean interrupted = false;
+
+      while (response == null && toWait > 0)
+      {
+         try
+         {
+            wait(toWait);
+         }
+         catch (InterruptedException e)
+         {
+            //Keep waiting for the remaining time, but remember the interruption
+            interrupted = true;
+         }
+
+         long now = System.currentTimeMillis();
+
+         toWait -= now - start;
+
+         start = now;
+      }
+
+      if (interrupted)
+      {
+         //Restore the interrupt status that wait() cleared so the caller can still see it
+         Thread.currentThread().interrupt();
+      }
+
+      if (response == null)
+      {
+         failed = true;
+      }
+
+      return response;
+   }
+
+   /**
+    * Discards the stored response so that the handler can wait for the next one.
+    * Synchronized for the same reason as setFailed(); the failed flag is left untouched.
+    */
+   public synchronized void reset()
+   {
+      response = null;
+   }
+}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-06-11 11:04:37 UTC (rev 4426)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-06-11 12:06:26 UTC (rev 4427)
@@ -6,7 +6,26 @@
*/
package org.jboss.messaging.core.remoting.impl.mina;
-import org.apache.mina.common.*;
+import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
+import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addSSLFilter;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.mina.common.CloseFuture;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoServiceListener;
+import org.apache.mina.common.IoSession;
import org.apache.mina.filter.ssl.SslFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.jboss.messaging.core.client.ConnectionParams;
@@ -17,18 +36,18 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.ping.Pinger;
import org.jboss.messaging.core.ping.impl.PingerImpl;
-import org.jboss.messaging.core.remoting.*;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addSSLFilter;
+import org.jboss.messaging.core.remoting.NIOConnector;
+import org.jboss.messaging.core.remoting.NIOSession;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketReturner;
+import org.jboss.messaging.core.remoting.ResponseHandler;
+import org.jboss.messaging.core.remoting.TransportType;
+import org.jboss.messaging.core.remoting.impl.ResponseHandlerImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.*;
-
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
* @version <tt>$Revision$</tt>
@@ -195,7 +214,8 @@
*/
if (connectionParams.getKeepAliveInterval() > 0 && location.getTransport() == TransportType.TCP)
{
- Pinger pinger = new PingerImpl(dispatcher, minaSession, connectionParams.getKeepAliveTimeout(), this);
+ ResponseHandler pongHandler = new ResponseHandlerImpl(dispatcher.generateID());
+ Pinger pinger = new PingerImpl(dispatcher, minaSession, connectionParams.getKeepAliveTimeout(), pongHandler, this);
scheduledExecutor.scheduleAtFixedRate(pinger, connectionParams.getKeepAliveInterval(), connectionParams.getKeepAliveInterval(), TimeUnit.MILLISECONDS);
}
return minaSession;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/RemotingServiceImpl.java 2008-06-11 11:04:37 UTC (rev 4426)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/RemotingServiceImpl.java 2008-06-11 12:06:26 UTC (rev 4427)
@@ -20,8 +20,13 @@
import org.jboss.messaging.core.remoting.Interceptor;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.RemotingService;
+import org.jboss.messaging.core.remoting.ResponseHandler;
+
import static org.jboss.messaging.core.remoting.TransportType.INVM;
import org.jboss.messaging.core.remoting.impl.PacketDispatcherImpl;
+import org.jboss.messaging.core.remoting.impl.ResponseHandlerImpl;
+import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
+
import static org.jboss.messaging.core.remoting.impl.RemotingConfigurationValidator.validate;
import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addSSLFilter;
@@ -277,7 +282,8 @@
//register pinger
if (config.getKeepAliveInterval() > 0)
{
- Pinger pinger = new PingerImpl(getDispatcher(), new MinaSession(session, null), config.getKeepAliveTimeout(), RemotingServiceImpl.this);
+ ResponseHandler pongHandler = new ResponseHandlerImpl(dispatcher.generateID());
+ Pinger pinger = new PingerImpl(getDispatcher(), new MinaSession(session, null), config.getKeepAliveTimeout(), pongHandler, RemotingServiceImpl.this);
ScheduledFuture<?> future = scheduledExecutor.scheduleAtFixedRate(pinger, config.getKeepAliveInterval(), config.getKeepAliveInterval(), TimeUnit.MILLISECONDS);
currentScheduledPingers.put(session, future);
currentPingers.put(session, pinger);
Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/ping/impl/PingerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/ping/impl/PingerImplTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/ping/impl/PingerImplTest.java 2008-06-11 12:06:26 UTC (rev 4427)
@@ -0,0 +1,221 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.tests.unit.core.ping.impl;
+
+import static org.easymock.EasyMock.anyLong;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.ping.Pinger;
+import org.jboss.messaging.core.ping.impl.PingerImpl;
+import org.jboss.messaging.core.remoting.NIOSession;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.ResponseHandler;
+import org.jboss.messaging.core.remoting.impl.mina.CleanUpNotifier;
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
+
+/**
+ * Unit tests for PingerImpl, run against EasyMock mocks of its collaborators
+ * (NIOSession, PacketDispatcher, ResponseHandler and CleanUpNotifier).
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @version <tt>$Revision$</tt>
+ */
+public class PingerImplTest extends TestCase
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+ // Expects the handler to be registered once and then unregistered by its ID when the pinger is constructed and closed.
+ public void testClose() throws Exception
+ {
+ long timeout = 500;
+
+ NIOSession session = createMock(NIOSession.class);
+
+ ResponseHandler pongHandler = createMock(ResponseHandler.class);
+ long handlerID = randomLong();
+ expect(pongHandler.getID()).andReturn(handlerID);
+
+ PacketDispatcher dispatcher = createMock(PacketDispatcher.class);
+ dispatcher.register(pongHandler);
+ expectLastCall().once();
+ dispatcher.unregister(handlerID);
+
+ CleanUpNotifier failureNotifier = createMock(CleanUpNotifier.class);
+
+ replay(dispatcher, session, pongHandler, failureNotifier);
+
+ Pinger pinger = new PingerImpl(dispatcher, session, timeout, pongHandler , failureNotifier);
+ pinger.close();
+
+ verify(dispatcher, session, pongHandler, failureNotifier);
+ }
+ // The handler returns a Pong created with 'false': no call is recorded on the failure notifier.
+ public void testPingSuccess() throws Exception
+ {
+ long timeout = 500;
+
+ long sessionID = randomLong();
+ NIOSession session = createMock(NIOSession.class);
+ expect(session.getID()).andStubReturn(sessionID);
+
+ Ping ping = new Ping(sessionID);
+ session.write(ping);
+ expectLastCall().once();
+
+ Pong pong = new Pong(sessionID, false);
+ ResponseHandler pongHandler = createMock(ResponseHandler.class);
+ long handlerID = randomLong();
+ expect(pongHandler.getID()).andReturn(handlerID);
+ pongHandler.reset();
+ expectLastCall().once();
+ expect(pongHandler.waitForResponse(timeout)).andReturn(pong);
+
+ PacketDispatcher dispatcher = createMock(PacketDispatcher.class);
+ dispatcher.register(pongHandler);
+ expectLastCall().once();
+
+ CleanUpNotifier failureNotifier = createMock(CleanUpNotifier.class);
+
+ replay(dispatcher, session, pongHandler, failureNotifier);
+
+ Pinger pinger = new PingerImpl(dispatcher, session, timeout, pongHandler , failureNotifier);
+ pinger.run();
+
+ verify(dispatcher, session, pongHandler, failureNotifier);
+ }
+ // The handler returns a Pong created with 'true': setFailed() and one fireCleanup() call are expected.
+ public void testPingFailureWithPongFailed() throws Exception
+ {
+ long timeout = 500;
+
+ long sessionID = randomLong();
+ NIOSession session = createMock(NIOSession.class);
+ expect(session.getID()).andStubReturn(sessionID);
+
+ Ping ping = new Ping(sessionID);
+ session.write(ping);
+ expectLastCall().once();
+
+ Pong pong = new Pong(sessionID, true);
+ ResponseHandler pongHandler = createMock(ResponseHandler.class);
+ long handlerID = randomLong();
+ expect(pongHandler.getID()).andReturn(handlerID);
+ pongHandler.reset();
+ expectLastCall().once();
+ pongHandler.setFailed();
+ expectLastCall().once();
+ expect(pongHandler.waitForResponse(timeout)).andReturn(pong);
+
+ PacketDispatcher dispatcher = createMock(PacketDispatcher.class);
+ dispatcher.register(pongHandler);
+ expectLastCall().once();
+
+ CleanUpNotifier failureNotifier = createMock(CleanUpNotifier.class);
+ failureNotifier.fireCleanup(anyLong(), (MessagingException) anyObject());
+ expectLastCall().once();
+
+ replay(dispatcher, session, pongHandler, failureNotifier);
+
+ Pinger pinger = new PingerImpl(dispatcher, session, timeout, pongHandler , failureNotifier);
+ pinger.run();
+
+ verify(dispatcher, session, pongHandler, failureNotifier);
+ }
+ // session.write() throws: one fireCleanup() call is expected and no waitForResponse() call is recorded.
+ public void testWritePingFailure() throws Exception
+ {
+ long timeout = 500;
+
+ long sessionID = randomLong();
+ NIOSession session = createMock(NIOSession.class);
+ expect(session.getID()).andStubReturn(sessionID);
+
+ session.write((Packet) anyObject());
+ expectLastCall().andThrow(new Exception());
+
+ ResponseHandler pongHandler = createMock(ResponseHandler.class);
+ long handlerID = randomLong();
+ expect(pongHandler.getID()).andReturn(handlerID);
+ pongHandler.reset();
+ expectLastCall().once();
+
+ PacketDispatcher dispatcher = createMock(PacketDispatcher.class);
+ dispatcher.register(pongHandler);
+ expectLastCall().once();
+
+ CleanUpNotifier failureNotifier = createMock(CleanUpNotifier.class);
+ failureNotifier.fireCleanup(anyLong(), (MessagingException) anyObject());
+ expectLastCall().once();
+
+ replay(dispatcher, session, pongHandler, failureNotifier);
+
+ Pinger pinger = new PingerImpl(dispatcher, session, timeout, pongHandler , failureNotifier);
+ pinger.run();
+
+ verify(dispatcher, session, pongHandler, failureNotifier);
+ }
+ // waitForResponse() returns null (no pong received): one fireCleanup() call is expected.
+ public void testPingFailure() throws Exception
+ {
+ long timeout = 500;
+
+ long sessionID = randomLong();
+ NIOSession session = createMock(NIOSession.class);
+ expect(session.getID()).andStubReturn(sessionID);
+
+ Ping ping = new Ping(sessionID);
+ session.write(ping);
+ expectLastCall().once();
+
+ ResponseHandler pongHandler = createMock(ResponseHandler.class);
+ long handlerID = randomLong();
+ expect(pongHandler.getID()).andReturn(handlerID);
+ pongHandler.reset();
+ expectLastCall().once();
+ expect(pongHandler.waitForResponse(timeout)).andReturn(null);
+
+ PacketDispatcher dispatcher = createMock(PacketDispatcher.class);
+ dispatcher.register(pongHandler);
+ expectLastCall().once();
+
+ CleanUpNotifier failureNotifier = createMock(CleanUpNotifier.class);
+ failureNotifier.fireCleanup(anyLong(), (MessagingException) anyObject());
+ expectLastCall().once();
+
+ replay(dispatcher, session, pongHandler, failureNotifier);
+
+ Pinger pinger = new PingerImpl(dispatcher, session, timeout, pongHandler , failureNotifier);
+ pinger.run();
+
+ verify(dispatcher, session, pongHandler, failureNotifier);
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
More information about the jboss-cvs-commits
mailing list