JBoss Remoting SVN: r5331 - remoting2/branches/2.2/src/tests/org/jboss/test/remoting/transport/bisocket.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2009-08-14 16:59:58 -0400 (Fri, 14 Aug 2009)
New Revision: 5331
Added:
remoting2/branches/2.2/src/tests/org/jboss/test/remoting/transport/bisocket/BisocketControlConnectionReplacementTestCase.java
Log:
JBREM-1147: New unit test.
Added: remoting2/branches/2.2/src/tests/org/jboss/test/remoting/transport/bisocket/BisocketControlConnectionReplacementTestCase.java
===================================================================
--- remoting2/branches/2.2/src/tests/org/jboss/test/remoting/transport/bisocket/BisocketControlConnectionReplacementTestCase.java (rev 0)
+++ remoting2/branches/2.2/src/tests/org/jboss/test/remoting/transport/bisocket/BisocketControlConnectionReplacementTestCase.java 2009-08-14 20:59:58 UTC (rev 5331)
@@ -0,0 +1,486 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.remoting.transport.bisocket;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.management.MBeanServer;
+import javax.net.ServerSocketFactory;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.jboss.logging.XLevel;
+import org.jboss.remoting.Client;
+import org.jboss.remoting.InvocationRequest;
+import org.jboss.remoting.InvokerLocator;
+import org.jboss.remoting.ServerInvocationHandler;
+import org.jboss.remoting.ServerInvoker;
+import org.jboss.remoting.callback.Callback;
+import org.jboss.remoting.callback.DefaultCallbackErrorHandler;
+import org.jboss.remoting.callback.HandleCallbackException;
+import org.jboss.remoting.callback.InvokerCallbackHandler;
+import org.jboss.remoting.transport.Connector;
+import org.jboss.remoting.transport.PortUtil;
+import org.jboss.remoting.transport.bisocket.Bisocket;
+
+
+/**
+ * Unit test for JBREM-1147.
+ *
+ * @author <a href="ron.sigal(a)jboss.com">Ron Sigal</a>
+ * @version
+ * <p>
+ * Copyright Aug 14, 2009
+ * </p>
+ */
+public class BisocketControlConnectionReplacementTestCase extends TestCase
+{
+ private static Logger log = Logger.getLogger(BisocketControlConnectionReplacementTestCase.class);
+
+ protected static boolean firstTime = true;
+ protected static int secondaryServerSocketPort;
+ protected static int numberOfCallbacks = 10;
+ protected static Object lock = new Object();
+ protected static TestCallbackHandler testCallbackHandler;
+
+ protected String host;
+ protected int port;
+ protected String locatorURI;
+ protected InvokerLocator serverLocator;
+ protected Connector connector;
+ protected TestInvocationHandler invocationHandler;
+
+
+ public void setUp() throws Exception
+ {
+ if (firstTime)
+ {
+ firstTime = false;
+ Logger.getLogger("org.jboss.remoting").setLevel(XLevel.DEBUG);
+ Logger.getLogger("org.jboss.test.remoting").setLevel(Level.INFO);
+ String pattern = "[%d{ABSOLUTE}] [%t] %5p (%F:%L) - %m%n";
+ PatternLayout layout = new PatternLayout(pattern);
+ ConsoleAppender consoleAppender = new ConsoleAppender(layout);
+ Logger.getRootLogger().addAppender(consoleAppender);
+ }
+ }
+
+
+ public void tearDown()
+ {
+ }
+
+
+ public void testCreateSocketWithReplacedControlConnection() throws Throwable
+ {
+ log.info("entering " + getName());
+
+ // Start server.
+ setupServer();
+
+ // Create client.
+ InvokerLocator clientLocator = new InvokerLocator(locatorURI);
+ HashMap clientConfig = new HashMap();
+ clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
+ addExtraClientConfig(clientConfig);
+ Client client = new Client(clientLocator, clientConfig);
+ client.connect();
+ log.info("client is connected");
+
+ // Test connections.
+ assertEquals("abc", client.invoke("abc"));
+ log.info("connection is good");
+
+ // Add callback handler.
+ testCallbackHandler = new TestCallbackHandler();
+ HashMap metadata = new HashMap();
+ metadata.put(Bisocket.IS_CALLBACK_SERVER, "true");
+ client.addListener(testCallbackHandler, metadata);
+
+ synchronized (lock)
+ {
+ lock.wait(120000);
+ }
+
+ assertEquals(numberOfCallbacks, testCallbackHandler.counter);
+ assertEquals(numberOfCallbacks - 1, testCallbackHandler.max);
+
+ client.removeListener(testCallbackHandler);
+ client.disconnect();
+ shutdownServer();
+ log.info(getName() + " PASSES");
+ }
+
+
+ protected String getTransport()
+ {
+ return "bisocket";
+ }
+
+
+ protected String getServerSocketName()
+ {
+ return TestServerSocketFactory.class.getName();
+ }
+
+
+ protected void addExtraClientConfig(Map config) {}
+ protected void addExtraServerConfig(Map config) {}
+
+
+ protected void setupServer() throws Exception
+ {
+ host = InetAddress.getLocalHost().getHostAddress();
+ port = PortUtil.findFreePort(host);
+ locatorURI = getTransport() + "://" + host + ":" + port;
+ locatorURI += "/?" + Bisocket.PING_FREQUENCY + "=2000";
+ locatorURI += "&" + DefaultCallbackErrorHandler.CALLBACK_ERRORS_ALLOWED + "=100";
+ String metadata = System.getProperty("remoting.metadata");
+ if (metadata != null)
+ {
+ locatorURI += "&" + metadata;
+ }
+ serverLocator = new InvokerLocator(locatorURI);
+ log.info("Starting remoting server with locator uri of: " + locatorURI);
+ HashMap config = new HashMap();
+ config.put(InvokerLocator.FORCE_REMOTE, "true");
+ secondaryServerSocketPort = PortUtil.findFreePort(host);
+ config.put(Bisocket.SECONDARY_BIND_PORT, Integer.toString(secondaryServerSocketPort));
+ config.put(ServerInvoker.SERVER_SOCKET_FACTORY, getServerSocketName());
+ addExtraServerConfig(config);
+ connector = new Connector(serverLocator, config);
+ connector.create();
+ invocationHandler = new TestInvocationHandler();
+ connector.addInvocationHandler("test", invocationHandler);
+ connector.start();
+ }
+
+
+ protected void shutdownServer() throws Exception
+ {
+ if (connector != null)
+ connector.stop();
+ }
+
+
+ static class TestInvocationHandler implements ServerInvocationHandler
+ {
+ int counter;
+
+ public void addListener(final InvokerCallbackHandler callbackHandler)
+ {
+ if (counter++ > 0)
+ return;
+
+ new Thread()
+ {
+ public void run()
+ {
+ for (int i = 0; i < 10 * numberOfCallbacks; i++)
+ {
+ try
+ {
+ if (testCallbackHandler.counter >= numberOfCallbacks)
+ {
+ return;
+ }
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e)
+ {
+ log.error("Unexpected interrupt", e);
+ }
+ log.info("sending callback: " + i);
+ callbackHandler.handleCallback(new Callback(Integer.toString(i)));
+ log.info("sent callback: " + i);
+ }
+ catch (HandleCallbackException e)
+ {
+ log.error("Callback error", e);
+ }
+ }
+ }
+ }.start();
+ }
+ public Object invoke(final InvocationRequest invocation) throws Throwable
+ {
+ return invocation.getParameter();
+ }
+ public void removeListener(InvokerCallbackHandler callbackHandler) {}
+ public void setMBeanServer(MBeanServer server) {}
+ public void setInvoker(ServerInvoker invoker) {}
+ }
+
+
+ static class TestCallbackHandler implements InvokerCallbackHandler
+ {
+ int counter;
+ int max;
+
+ public void handleCallback(Callback callback) throws HandleCallbackException
+ {
+ log.info("received callback: " + counter++);
+ max = Math.max(Integer.valueOf((String) callback.getParameter()).intValue(), max);
+ log.info("max: " + max);
+ if (counter >= numberOfCallbacks)
+ {
+ synchronized (lock)
+ {
+ lock.notifyAll();
+ }
+ }
+ }
+ }
+
+ static public class TestServerSocketFactory extends ServerSocketFactory
+ {
+ int timeout;
+ int initialWrites;
+
+ public TestServerSocketFactory()
+ {
+ this.timeout = 5000;
+ this.initialWrites = 2;
+ }
+ public TestServerSocketFactory(int timeout, int initialWrites)
+ {
+ this.timeout = timeout;
+ this.initialWrites = initialWrites;
+ }
+ public ServerSocket createServerSocket() throws IOException
+ {
+ ServerSocket ss = new TestServerSocket(timeout, initialWrites);
+ log.info("returning: " + ss);
+ return ss;
+ }
+ public ServerSocket createServerSocket(int port) throws IOException
+ {
+ ServerSocket ss = null;
+ if (port != secondaryServerSocketPort)
+ {
+ ss = ServerSocketFactory.getDefault().createServerSocket(port);
+ }
+ else
+ {
+ ss = new TestServerSocket(port, timeout, initialWrites);
+ }
+ log.info("returning: " + ss);
+ return ss;
+ }
+
+ public ServerSocket createServerSocket(int port, int backlog) throws IOException
+ {
+ ServerSocket ss = null;
+ if (port != secondaryServerSocketPort)
+ {
+ ss = ServerSocketFactory.getDefault().createServerSocket(port, backlog);
+ }
+ else
+ {
+ ss = new TestServerSocket(port, backlog, timeout, initialWrites);
+ }
+ log.info("returning: " + ss);
+ return ss;
+ }
+
+ public ServerSocket createServerSocket(int port, int backlog, InetAddress ifAddress) throws IOException
+ {
+ ServerSocket ss = null;
+ if (port != secondaryServerSocketPort)
+ {
+ ss = ServerSocketFactory.getDefault().createServerSocket(port, backlog, ifAddress);
+ }
+ else
+ {
+ ss = new TestServerSocket(port, backlog, ifAddress, timeout, initialWrites);
+ }
+ log.info("returning: " + ss);
+ return ss;
+ }
+ }
+
+
+ static class TestServerSocket extends ServerSocket
+ {
+ int timeout;
+ int initialWrites;
+
+ public TestServerSocket(int timeout, int initialWrites) throws IOException
+ {
+ super();
+ this.timeout = timeout;
+ this.initialWrites = initialWrites;
+ }
+ public TestServerSocket(int port, int timeout, int initialWrites) throws IOException
+ {
+ super(port);
+ this.timeout = timeout;
+ this.initialWrites = initialWrites;
+ }
+ public TestServerSocket(int port, int backlog, int timeout, int initialWrites) throws IOException
+ {
+ super(port, backlog);
+ this.timeout = timeout;
+ this.initialWrites = initialWrites;
+ }
+ public TestServerSocket(int port, int backlog, InetAddress bindAddr, int timeout, int initialWrites) throws IOException
+ {
+ super(port, backlog, bindAddr);
+ this.timeout = timeout;
+ this.initialWrites = initialWrites;
+ }
+ public Socket accept() throws IOException
+ {
+ Socket s = new TestSocket(timeout, initialWrites);
+ implAccept(s);
+ return s;
+ }
+ public String toString()
+ {
+ return "TestServerSocket[" + getLocalPort() + "]";
+ }
+ }
+
+
+ static class TestSocket extends Socket
+ {
+ int timeout;
+ int initialWrites;
+
+ public TestSocket(int timeout, int initialWrites)
+ {
+ this.timeout = timeout;
+ this.initialWrites = initialWrites;
+ }
+ public TestSocket(String host, int port, int timeout, int initialWrites) throws UnknownHostException, IOException
+ {
+ super(host, port);
+ this.timeout = timeout;
+ this.initialWrites = initialWrites;
+ }
+ public TestSocket(InetAddress address, int port, int timeout, int initialWrites) throws IOException
+ {
+ super(address, port);
+ this.timeout = timeout;
+ this.initialWrites = initialWrites;
+ }
+ public TestSocket(String host, int port, InetAddress localAddr, int localPort, int timeout, int initialWrites) throws IOException
+ {
+ super(host, port, localAddr, localPort);
+ this.timeout = timeout;
+ this.initialWrites = initialWrites;
+ }
+ public TestSocket(InetAddress address, int port, InetAddress localAddr, int localPort, int timeout, int initialWrites) throws IOException
+ {
+ super(address, port, localAddr, localPort);
+ this.timeout = timeout;
+ this.initialWrites = initialWrites;
+ }
+ public OutputStream getOutputStream() throws IOException
+ {
+ return new TestOutputStream(super.getOutputStream(), timeout, initialWrites);
+ }
+ public String toString()
+ {
+ return "TestSocket[" + getLocalPort() + "->" + getPort() + "]";
+ }
+ }
+
+ public static class TestOutputStream extends OutputStream
+ {
+ OutputStream os;
+ int timeout;
+ boolean closed;
+ int initialWrites;
+ boolean doCounterTest = true;
+ int counter;
+
+ public TestOutputStream(OutputStream os, int timeout, int initialWrites)
+ {
+ this.os = os;
+ this.timeout = timeout;
+ this.initialWrites = initialWrites;
+ }
+ public void close()throws IOException
+ {
+ closed = true;
+ super.close();
+ log.info(this + " closed");
+ }
+ public void write(int b) throws IOException
+ {
+ if (closed)
+ {
+ log.info("TestOutputStream closed, cannot write");
+ throw new SocketException("closed");
+ }
+ if (doCounterTest && ++counter > initialWrites)
+ {
+ close();
+ throw new SocketException("closed");
+ }
+ os.write(b);
+ }
+ public void write(byte b[], int off, int len) throws IOException
+ {
+ if (closed)
+ {
+ log.info("TestOutputStream closed, cannot write");
+ throw new SocketException("closed");
+ }
+ log.info("TestOutputStream: counter = " + counter + ", initialWrites = " + initialWrites);
+ if (++counter > initialWrites)
+ {
+ close();
+ throw new SocketException("closed");
+ }
+ try
+ {
+ log.info(this + " writing");
+ doCounterTest = false;
+ os.write(b, off, len);
+ doCounterTest = true;
+ log.info(this + " back from writing");
+ }
+ catch (IOException e)
+ {
+ log.info("exception: ", e);
+ throw e;
+ }
+ }
+ }
+}
\ No newline at end of file
15 years, 3 months
JBoss Remoting SVN: r5330 - remoting2/branches/2.2/src/main/org/jboss/remoting/transport/bisocket.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2009-08-14 16:55:48 -0400 (Fri, 14 Aug 2009)
New Revision: 5330
Modified:
remoting2/branches/2.2/src/main/org/jboss/remoting/transport/bisocket/Bisocket.java
Log:
JBREM-1140: removed changes.
Modified: remoting2/branches/2.2/src/main/org/jboss/remoting/transport/bisocket/Bisocket.java
===================================================================
--- remoting2/branches/2.2/src/main/org/jboss/remoting/transport/bisocket/Bisocket.java 2009-08-14 20:52:58 UTC (rev 5329)
+++ remoting2/branches/2.2/src/main/org/jboss/remoting/transport/bisocket/Bisocket.java 2009-08-14 20:55:48 UTC (rev 5330)
@@ -55,13 +55,6 @@
public static final int PING_WINDOW_FACTOR_DEFAULT = 2;
/**
- * Configuration key for enabling ping replies, so that the client can more
- * actively detect connection death. Must be enabled on both client and
- * server.
- */
- public static final String ENABLE_PING_REPLIES = "enablePingReplies";
-
- /**
* Configuration key and default value for number of retries
* BisocketServerInvoker.ControlConnectionThread and
* BisocketServerInvoker.createControlConnection should attempt while creating
15 years, 3 months
JBoss Remoting SVN: r5329 - remoting2/branches/2.2/src/main/org/jboss/remoting/transport/bisocket.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2009-08-14 16:52:58 -0400 (Fri, 14 Aug 2009)
New Revision: 5329
Modified:
remoting2/branches/2.2/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java
Log:
JBREM-1147: createSocket() periodically checks for updated controlOuputStream; JBREM-1140: removed changes.
Modified: remoting2/branches/2.2/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java
===================================================================
--- remoting2/branches/2.2/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java 2009-08-13 00:37:00 UTC (rev 5328)
+++ remoting2/branches/2.2/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java 2009-08-14 20:52:58 UTC (rev 5329)
@@ -24,7 +24,6 @@
import java.io.IOException;
import java.io.OutputStream;
-import java.io.InputStream;
import java.lang.reflect.Method;
import java.net.Socket;
import java.util.Collections;
@@ -82,16 +81,17 @@
protected String listenerId;
private int pingFrequency = Bisocket.PING_FREQUENCY_DEFAULT;
+ private int pingWindowFactor = Bisocket.PING_WINDOW_FACTOR_DEFAULT;
+ private int pingWindow = pingWindowFactor * pingFrequency;
private int maxRetries = Bisocket.MAX_RETRIES_DEFAULT;
private Socket controlSocket;
private OutputStream controlOutputStream;
- private InputStream controlInputStream;
private Object controlLock = new Object();
private PingTimerTask pingTimerTask;
protected boolean isCallbackInvoker;
protected BooleanHolder pingFailed = new BooleanHolder(false);
- private boolean enablePingReplies = false;
+
/**
* @param listenerId
* @return
@@ -188,6 +188,26 @@
}
}
+ val = configuration.get(Bisocket.PING_WINDOW_FACTOR);
+ if (val != null && val instanceof String && ((String) val).length() > 0)
+ {
+ try
+ {
+ pingWindowFactor = Integer.valueOf(((String) val)).intValue();
+ log.debug(this + " setting pingWindowFactor to " + pingWindowFactor);
+ }
+ catch (NumberFormatException e)
+ {
+ log.warn("Invalid format for " + "\"" + Bisocket.PING_WINDOW_FACTOR + "\": " + val);
+ }
+ }
+ else if (val != null)
+ {
+ log.warn("\"" + Bisocket.PING_WINDOW_FACTOR + "\" must be specified as a String");
+ }
+
+ pingWindow = pingWindowFactor * pingFrequency;
+
val = configuration.get(Bisocket.MAX_RETRIES);
if (val != null)
{
@@ -203,14 +223,6 @@
" value of " + val + " to an int value.");
}
}
-
- val = configuration.get(Bisocket.ENABLE_PING_REPLIES);
- if (val != null)
- {
- // Boolean.valueOf doesn't throw exceptions :-)
- boolean bVal = Boolean.valueOf((String) val).booleanValue();
- enablePingReplies = bVal;
- }
}
}
@@ -237,7 +249,20 @@
this.pingFrequency = pingFrequency;
}
-
+
+ public int getPingWindowFactor()
+ {
+ return pingWindowFactor;
+ }
+
+
+ public void setPingWindowFactor(int pingWindowFactor)
+ {
+ this.pingWindowFactor = pingWindowFactor;
+ pingWindow = pingWindowFactor * pingFrequency;
+ }
+
+
protected void handleConnect() throws ConnectionFailedException
{
// Callback client on server side.
@@ -287,7 +312,6 @@
try
{
controlOutputStream = controlSocket.getOutputStream();
- controlInputStream = controlSocket.getInputStream();
}
catch (IOException e1)
{
@@ -461,35 +485,47 @@
synchronized (controlLock)
{
- controlOutputStream.write(Bisocket.CREATE_ORDINARY_SOCKET);
- }
-
- synchronized (sockets)
- {
- if (!sockets.isEmpty())
+ if (log.isTraceEnabled()) log.trace(this + " writing Bisocket.CREATE_ORDINARY_SOCKET on " + controlOutputStream);
+ try
{
- Iterator it = sockets.iterator();
- Socket socket = (Socket) it.next();
- it.remove();
- log.debug(this + " found socket (" + listenerId + "): " + socket);
- return socket;
+ controlOutputStream.write(Bisocket.CREATE_ORDINARY_SOCKET);
+ if (log.isTraceEnabled()) log.trace(this + " wrote Bisocket.CREATE_ORDINARY_SOCKET");
+
+ synchronized (sockets)
+ {
+ if (!sockets.isEmpty())
+ {
+ Iterator it = sockets.iterator();
+ Socket socket = (Socket) it.next();
+ it.remove();
+ log.debug(this + " found socket (" + listenerId + "): " + socket);
+ return socket;
+ }
+ }
}
+ catch (IOException e)
+ {
+ log.debug(this + " unable to write Bisocket.CREATE_ORDINARY_SOCKET", e);
+ }
}
long timeRemaining = timeout;
- long start = System.currentTimeMillis();
+ long pingFailedWindow = 2 * pingWindow;
+ long pingFailedTimeRemaining = pingFailedWindow;
+ long start = System.currentTimeMillis();
+ OutputStream savedControlOutputStream = controlOutputStream;
- while (isConnected() && !pingFailed.flag && (timeout == 0 || timeRemaining > 0))
+ while (isConnected() && (!pingFailed.flag || pingFailedTimeRemaining > 0) && (timeout == 0 || timeRemaining > 0))
{
synchronized (sockets)
- {
+ {
try
{
sockets.wait(1000);
}
catch (InterruptedException e)
{
- log.debug("unexpected interrupt");
+ log.debug(this + " unexpected interrupt");
}
if (!sockets.isEmpty())
@@ -501,9 +537,26 @@
return socket;
}
}
-
+
+ if (savedControlOutputStream != controlOutputStream)
+ {
+ savedControlOutputStream = controlOutputStream;
+ log.debug(this + " rewriting Bisocket.CREATE_ORDINARY_SOCKET on " + controlOutputStream);
+ try
+ {
+ controlOutputStream.write(Bisocket.CREATE_ORDINARY_SOCKET);
+ log.debug(this + " rewrote Bisocket.CREATE_ORDINARY_SOCKET");
+ }
+ catch (IOException e)
+ {
+ log.debug(this + " unable to rewrite Bisocket.CREATE_ORDINARY_SOCKET" + e.getMessage());
+ }
+ }
+
+ long elapsed = System.currentTimeMillis() - start;
if (timeout > 0)
- timeRemaining = timeout - (System.currentTimeMillis() - start);
+ timeRemaining = timeout - elapsed;
+ pingFailedTimeRemaining = pingFailedWindow - elapsed;
}
if (!isConnected())
@@ -533,7 +586,7 @@
controlSocket = socket;
log.debug(this + " control socket replaced by: " + socket);
controlOutputStream = controlSocket.getOutputStream();
- controlInputStream = controlSocket.getInputStream();
+ log.debug("controlOutputStream replaced by: " + controlOutputStream);
}
if (pingTimerTask != null)
@@ -616,16 +669,18 @@
{
private Object controlLock;
private OutputStream controlOutputStream;
- private InputStream controlInputStream;
+ private int maxRetries;
+ private Exception savedException;
+ private boolean pingSent;
private BooleanHolder pingFailed;
- private boolean enablePingReplies;
-
+
PingTimerTask(BisocketClientInvoker invoker)
{
controlLock = invoker.controlLock;
controlOutputStream = invoker.controlOutputStream;
- controlInputStream = invoker.controlInputStream;
- enablePingReplies = invoker.enablePingReplies;
+ maxRetries = invoker.getMaxRetries();
+ pingFailed = invoker.pingFailed;
+ pingFailed.flag = false;
}
public void shutDown()
@@ -633,7 +688,6 @@
synchronized (controlLock)
{
controlOutputStream = null;
- controlInputStream = null;
}
cancel();
try
@@ -648,54 +702,42 @@
}
public void run()
- {
- boolean ok = false;
- try
+ {
+ pingSent = false;
+
+ for (int i = 0; i < maxRetries; i++)
{
- synchronized (controlLock)
+ try
{
- if (controlOutputStream == null || controlInputStream == null)
- return;
-
- controlOutputStream.write(Bisocket.PING);
- if (enablePingReplies)
+ synchronized (controlLock)
{
- int rep = controlInputStream.read();
- if (rep == -1)
- {
- // socket was closed!
- shutDown();
+ if (controlOutputStream == null)
return;
- }
- if (rep != Bisocket.PING)
- {
- // um - protocol error
- log.warn("Protocol error: received unexpected reply to ping on control socket (are ping replies enabled on the server?); shutting down PingTimerTask");
- shutDown();
- return;
- }
+
+ controlOutputStream.write(Bisocket.PING);
}
- ok = true;
+ pingSent = true;
+ break;
}
+ catch (Exception e)
+ {
+ savedException = e;
+ log.debug("Unable to send ping: trying again");
+ }
}
- catch (Exception e)
+
+ if (!pingSent)
{
- log.warn("Unable to send ping: shutting down PingTimerTask", e);
+ log.warn("Unable to send ping: shutting down PingTimerTask", savedException);
+ pingFailed.flag = true;
+ shutDown();
}
- finally
- {
- if (! ok)
- {
- pingFailed.flag = true;
- shutDown();
- }
- }
}
}
static class BooleanHolder
{
- public volatile boolean flag;
+ public boolean flag;
public BooleanHolder(boolean flag)
{
15 years, 3 months
JBoss Remoting SVN: r5328 - remoting2/branches/2.2/src/tests/org/jboss/test/remoting/invoker.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2009-08-12 20:37:00 -0400 (Wed, 12 Aug 2009)
New Revision: 5328
Modified:
remoting2/branches/2.2/src/tests/org/jboss/test/remoting/invoker/ClientInvokerDelayedDestructionTestCase.java
Log:
JBREM-1143: Added testConfigByInvokerLocator().
Modified: remoting2/branches/2.2/src/tests/org/jboss/test/remoting/invoker/ClientInvokerDelayedDestructionTestCase.java
===================================================================
--- remoting2/branches/2.2/src/tests/org/jboss/test/remoting/invoker/ClientInvokerDelayedDestructionTestCase.java 2009-08-13 00:36:07 UTC (rev 5327)
+++ remoting2/branches/2.2/src/tests/org/jboss/test/remoting/invoker/ClientInvokerDelayedDestructionTestCase.java 2009-08-13 00:37:00 UTC (rev 5328)
@@ -304,8 +304,44 @@
shutdownServer();
log.info(getName() + " PASSES");
}
+
+ public void testConfigByInvokerLocator() throws Throwable
+ {
+ log.info("entering " + getName());
+
+ // Start server.
+ setupServer();
+
+ // Create client.
+ String clientLocatorURI = locatorURI + "/?invokerDestructionDelay=10000";
+ InvokerLocator clientLocator = new InvokerLocator(clientLocatorURI);
+ HashMap clientConfig = new HashMap();
+ clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
+ addExtraClientConfig(clientConfig);
+ Client client = new Client(clientLocator, clientConfig);
+ client.connect();
+ ClientInvoker invoker1 = client.getInvoker();
+
+ // Test connections.
+ assertEquals("abc", client.invoke("abc"));
+ log.info("connection is good for first client");
+ client.disconnect();
+
+ Thread.sleep(5000);
+ client = new Client(clientLocator, clientConfig);
+ client.connect();
+ ClientInvoker invoker2 = client.getInvoker();
+ assertEquals("abc", client.invoke("abc"));
+ log.info("connection is good for second client");
+ assertSame(invoker2, invoker1);
+
+ client.disconnect();
+ shutdownServer();
+ log.info(getName() + " PASSES");
+ }
+
protected String getTransport()
{
return "socket";
15 years, 3 months
JBoss Remoting SVN: r5327 - remoting2/branches/2.2/src/main/org/jboss/remoting.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2009-08-12 20:36:07 -0400 (Wed, 12 Aug 2009)
New Revision: 5327
Modified:
remoting2/branches/2.2/src/main/org/jboss/remoting/Client.java
Log:
JBREM-1143: Value of "invokerDestructionDelay" can come from InvokerLocator as well as configuration map.
Modified: remoting2/branches/2.2/src/main/org/jboss/remoting/Client.java
===================================================================
--- remoting2/branches/2.2/src/main/org/jboss/remoting/Client.java 2009-08-12 02:31:22 UTC (rev 5326)
+++ remoting2/branches/2.2/src/main/org/jboss/remoting/Client.java 2009-08-13 00:36:07 UTC (rev 5327)
@@ -294,27 +294,9 @@
if (configuration != null)
{
this.configuration = new HashMap(configuration);
- Object o = configuration.get(INVOKER_DESTRUCTION_DELAY);
+ Object o = configuration.get(Remoting.USE_CLIENT_CONNECTION_IDENTITY);
if (o instanceof String)
{
- try
- {
- invokerDestructionDelay = Integer.parseInt((String) o);
- log.debug(this + " setting invokerDestructionDelay to " + invokerDestructionDelay);
- }
- catch (NumberFormatException e)
- {
- log.error("invokerDestructionDelay parameter has invalid format: " + o);
- }
- }
- else if (o != null)
- {
- log.error("invokerDestructionDelay parameter must be a string in integer format: " + o);
- }
-
- o = configuration.get(Remoting.USE_CLIENT_CONNECTION_IDENTITY);
- if (o instanceof String)
- {
useClientConnectionIdentity = Boolean.valueOf((String) o).booleanValue();
}
else if (o != null)
@@ -343,6 +325,24 @@
tempMap.putAll(locator.getParameters());
PortUtil.updateRange(tempMap);
+ Object o = tempMap.get(INVOKER_DESTRUCTION_DELAY);
+ if (o instanceof String)
+ {
+ try
+ {
+ invokerDestructionDelay = Integer.parseInt((String) o);
+ log.debug(this + " setting invokerDestructionDelay to " + invokerDestructionDelay);
+ }
+ catch (NumberFormatException e)
+ {
+ log.error("invokerDestructionDelay parameter has invalid format: " + o);
+ }
+ }
+ else if (o != null)
+ {
+ log.error("invokerDestructionDelay parameter must be a string in integer format: " + o);
+ }
+
this.sessionId = new GUID().toString();
}
15 years, 3 months
JBoss Remoting SVN: r5326 - remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2009-08-11 22:31:22 -0400 (Tue, 11 Aug 2009)
New Revision: 5326
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ClientImpl.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/FutureReplyImpl.java
Log:
Generics fixes and error message improvements
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ClientImpl.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ClientImpl.java 2009-08-06 02:41:11 UTC (rev 5325)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ClientImpl.java 2009-08-12 02:31:22 UTC (rev 5326)
@@ -75,7 +75,7 @@
final ReplyHandler replyHandler = futureReply.getReplyHandler();
final RemoteRequestContext requestContext = handler.receiveRequest(actualRequest, replyHandler);
futureReply.setRemoteRequestContext(requestContext);
- futureReply.addNotifier(IoUtils.attachmentClosingNotifier(), executor);
+ futureReply.addNotifier(IoUtils.<O>attachmentClosingNotifier(), executor);
executor.runQueue();
try {
final O reply = futureReply.getInterruptibly();
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/FutureReplyImpl.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/FutureReplyImpl.java 2009-08-06 02:41:11 UTC (rev 5325)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/FutureReplyImpl.java 2009-08-12 02:31:22 UTC (rev 5326)
@@ -65,11 +65,22 @@
private final class Handler implements ReplyHandler {
public void handleReply(final Object reply) {
+ final Class<O> replyType = FutureReplyImpl.this.replyType;
final O actualReply;
try {
actualReply = replyType.cast(reply);
} catch (ClassCastException e) {
- setException(new ReplyException("Reply was of the wrong type (got <" + reply.getClass().getName() + ">; expected <? extends " + replyType.getName() + ">"));
+ // reply can't be null, else we wouldn't be here...
+ final Class<? extends Object> actualReplyType = reply.getClass();
+ final String actualReplyTypeName = actualReplyType.getName();
+ final String replyTypeName = replyType.getName();
+ final ReplyException replyException;
+ if (actualReplyTypeName.equals(replyTypeName)) {
+ replyException = new ReplyException("Reply appears to be of the right type (" + replyTypeName + "), but from the wrong classloader (the reply is from classloader " + actualReplyType.getClassLoader() + " but the client expected it to be classloader " + replyType.getClassLoader() + ")");
+ } else {
+ replyException = new ReplyException("Reply was of the wrong type (got a " + actualReplyTypeName + "; expected a " + replyTypeName + ")");
+ }
+ setException(replyException);
return;
}
setResult(actualReply);
15 years, 3 months
JBoss Remoting SVN: r5325 - remoting2/branches/2.x/src/tests/org/jboss/test/remoting/transport/socket/timeout.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2009-08-05 22:41:11 -0400 (Wed, 05 Aug 2009)
New Revision: 5325
Modified:
remoting2/branches/2.x/src/tests/org/jboss/test/remoting/transport/socket/timeout/WriteTimeoutTestParent.java
Log:
JBREM-1120: Elaborated logging.
Modified: remoting2/branches/2.x/src/tests/org/jboss/test/remoting/transport/socket/timeout/WriteTimeoutTestParent.java
===================================================================
--- remoting2/branches/2.x/src/tests/org/jboss/test/remoting/transport/socket/timeout/WriteTimeoutTestParent.java 2009-08-05 15:51:08 UTC (rev 5324)
+++ remoting2/branches/2.x/src/tests/org/jboss/test/remoting/transport/socket/timeout/WriteTimeoutTestParent.java 2009-08-06 02:41:11 UTC (rev 5325)
@@ -110,7 +110,7 @@
}
- public void testClientWriteTimeout() throws Throwable
+ public void xtestClientWriteTimeout() throws Throwable
{
log.info("entering " + getName());
@@ -160,7 +160,7 @@
}
- public void testServerWriteTimeout() throws Throwable
+ public void xtestServerWriteTimeout() throws Throwable
{
log.info("entering " + getName());
@@ -278,7 +278,7 @@
}
- public void testServerCallbackWriteTimeout() throws Throwable
+ public void xtestServerCallbackWriteTimeout() throws Throwable
{
log.info("entering " + getName());
@@ -697,7 +697,7 @@
{
closed = true;
super.close();
- log.info("closed");
+ log.info(this + " closed");
}
public void write(int b) throws IOException
{
@@ -740,10 +740,19 @@
e.printStackTrace();
}
}
- log.info("TestOutputStream writing");
- doWait = false;
- os.write(b, off, len);
- doWait = true;
+ try
+ {
+ log.info(this + " writing");
+ doWait = false;
+ os.write(b, off, len);
+ doWait = true;
+ log.info(this + " back from writing");
+ }
+ catch (IOException e)
+ {
+ log.info("exception: ", e);
+ throw e;
+ }
}
}
}
\ No newline at end of file
15 years, 3 months
JBoss Remoting SVN: r5324 - remoting2/branches/2.x/src/main/org/jboss/remoting/marshal/compress.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2009-08-05 11:51:08 -0400 (Wed, 05 Aug 2009)
New Revision: 5324
Modified:
remoting2/branches/2.x/src/main/org/jboss/remoting/marshal/compress/CompressingMarshaller.java
Log:
JBREM-1077: Reorganized imports.
Modified: remoting2/branches/2.x/src/main/org/jboss/remoting/marshal/compress/CompressingMarshaller.java
===================================================================
--- remoting2/branches/2.x/src/main/org/jboss/remoting/marshal/compress/CompressingMarshaller.java 2009-08-05 15:50:38 UTC (rev 5323)
+++ remoting2/branches/2.x/src/main/org/jboss/remoting/marshal/compress/CompressingMarshaller.java 2009-08-05 15:51:08 UTC (rev 5324)
@@ -22,12 +22,6 @@
package org.jboss.remoting.marshal.compress;
-import org.jboss.remoting.marshal.Marshaller;
-import org.jboss.remoting.marshal.VersionedMarshaller;
-import org.jboss.remoting.marshal.serializable.SerializableMarshaller;
-import org.jboss.remoting.serialization.SerializationStreamFactory;
-import org.jfree.util.Log;
-
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
@@ -35,7 +29,12 @@
import java.util.zip.Deflater;
import java.util.zip.GZIPOutputStream;
+import org.jboss.remoting.marshal.Marshaller;
+import org.jboss.remoting.marshal.VersionedMarshaller;
+import org.jboss.remoting.marshal.serializable.SerializableMarshaller;
+import org.jboss.remoting.serialization.SerializationStreamFactory;
+
/**
* <code>CompressingMarshaller</code> and <code>CompressingUnMarshaller</code> are a general
* purpose compressing marshaller / decompressing unmarshaller pair based on Java's GZIP facilities.
15 years, 3 months
JBoss Remoting SVN: r5323 - remoting2/branches/2.x/src/main/org/jboss/remoting/marshal/compress.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2009-08-05 11:50:38 -0400 (Wed, 05 Aug 2009)
New Revision: 5323
Modified:
remoting2/branches/2.x/src/main/org/jboss/remoting/marshal/compress/CompressingUnMarshaller.java
Log:
JBREM-1077: Reuses GZIPInputStream and BufferedInputStream, just replacing the Inflater with each call.
Modified: remoting2/branches/2.x/src/main/org/jboss/remoting/marshal/compress/CompressingUnMarshaller.java
===================================================================
--- remoting2/branches/2.x/src/main/org/jboss/remoting/marshal/compress/CompressingUnMarshaller.java 2009-08-05 15:48:06 UTC (rev 5322)
+++ remoting2/branches/2.x/src/main/org/jboss/remoting/marshal/compress/CompressingUnMarshaller.java 2009-08-05 15:50:38 UTC (rev 5323)
@@ -22,23 +22,23 @@
package org.jboss.remoting.marshal.compress;
-import org.jboss.remoting.marshal.UnMarshaller;
-import org.jboss.remoting.marshal.VersionedUnMarshaller;
-import org.jboss.remoting.marshal.http.HTTPUnMarshaller;
-import org.jboss.remoting.marshal.serializable.SerializableUnMarshaller;
-import org.jboss.remoting.serialization.SerializationManager;
-import org.jboss.remoting.serialization.SerializationStreamFactory;
-
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
-import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.GZIPInputStream;
+import java.util.zip.Inflater;
+import org.jboss.remoting.marshal.UnMarshaller;
+import org.jboss.remoting.marshal.VersionedUnMarshaller;
+import org.jboss.remoting.marshal.http.HTTPUnMarshaller;
+import org.jboss.remoting.marshal.serializable.SerializableUnMarshaller;
+import org.jboss.remoting.serialization.SerializationManager;
+import org.jboss.remoting.serialization.SerializationStreamFactory;
+
/**
* <code>CompressingMarshaller</code> and <code>CompressingUnMarshaller</code> are a general
* purpose compressing marshaller / decompressing unmarshaller pair based on Java's GZIP facilities.
@@ -88,7 +88,9 @@
public InputStream getMarshallingStream(InputStream inputStream) throws IOException
{
- return inputStream;
+ SelfCleaningGZipInputStream gzis = new SelfCleaningGZipInputStream(inputStream);
+ DecomposableBufferedInputStream bis = new DecomposableBufferedInputStream(gzis);
+ return bis;
}
/**
@@ -104,8 +106,20 @@
*/
public Object read(InputStream inputStream, Map metadata, int version) throws IOException, ClassNotFoundException
{
- SelfCleaningGZipInputStream gzis = new SelfCleaningGZipInputStream(inputStream);
- BufferedInputStream bis = new BufferedInputStream(gzis);
+ SelfCleaningGZipInputStream gzis = null;
+ DecomposableBufferedInputStream bis = null;
+
+ if (inputStream instanceof DecomposableBufferedInputStream)
+ {
+ bis = (DecomposableBufferedInputStream) inputStream;
+ gzis = (SelfCleaningGZipInputStream) bis.getWrappedStream();
+ }
+ else
+ {
+ gzis = new SelfCleaningGZipInputStream(inputStream);
+ bis = new DecomposableBufferedInputStream(gzis);
+ }
+
SerializationManager manager = SerializationStreamFactory.getManagerInstance(getSerializationType());
ObjectInputStream ois = manager.createInput(bis, getClassLoader());
@@ -157,12 +171,18 @@
*/
static class SelfCleaningGZipInputStream extends GZIPInputStream
{
- public SelfCleaningGZipInputStream(InputStream in) throws IOException
+ SelfCleaningGZipInputStream(InputStream in) throws IOException
{
super(in);
}
+
+ void refreshInflater()
+ {
+ inf = new Inflater(true);
+ crc.reset();
+ }
- public void end() throws IOException
+ void end() throws IOException
{
while(available() > 0) { // This will force input stream to read gzip trailer from input stream
read();
@@ -171,5 +191,22 @@
}
}
+ static class DecomposableBufferedInputStream extends BufferedInputStream
+ {
+ DecomposableBufferedInputStream(InputStream in, int size)
+ {
+ super(in, size);
+ }
+
+ DecomposableBufferedInputStream(InputStream in)
+ {
+ super(in);
+ }
+
+ InputStream getWrappedStream()
+ {
+ return in;
+ }
+ }
}
15 years, 3 months
JBoss Remoting SVN: r5322 - remoting2/branches/2.x/src/main/org/jboss/remoting/marshal/compress.
by jboss-remoting-commits@lists.jboss.org
Author: ron.sigal(a)jboss.com
Date: 2009-08-05 11:48:06 -0400 (Wed, 05 Aug 2009)
New Revision: 5322
Modified:
remoting2/branches/2.x/src/main/org/jboss/remoting/marshal/compress/CompressingMarshaller.java
Log:
JBREM-1077: Reuses GZIPOutputStream and BufferedOutputStream, just replacing the Deflater with each call.
Modified: remoting2/branches/2.x/src/main/org/jboss/remoting/marshal/compress/CompressingMarshaller.java
===================================================================
--- remoting2/branches/2.x/src/main/org/jboss/remoting/marshal/compress/CompressingMarshaller.java 2009-08-05 04:27:28 UTC (rev 5321)
+++ remoting2/branches/2.x/src/main/org/jboss/remoting/marshal/compress/CompressingMarshaller.java 2009-08-05 15:48:06 UTC (rev 5322)
@@ -26,11 +26,13 @@
import org.jboss.remoting.marshal.VersionedMarshaller;
import org.jboss.remoting.marshal.serializable.SerializableMarshaller;
import org.jboss.remoting.serialization.SerializationStreamFactory;
+import org.jfree.util.Log;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
+import java.util.zip.Deflater;
import java.util.zip.GZIPOutputStream;
@@ -81,7 +83,9 @@
public OutputStream getMarshallingStream(OutputStream outputStream) throws IOException
{
- return outputStream;
+ GZIPOutputStream gzos = new SelfCleaningGZipOutputStream(outputStream);
+ DecomposableBufferedOutputStream bos = new DecomposableBufferedOutputStream(gzos);
+ return bos;
}
@@ -94,9 +98,19 @@
*/
public void write(Object dataObject, OutputStream output, int version) throws IOException
{
- output.flush();
- GZIPOutputStream gzos = new SelfCleaningGZipOutputStream(output);
- BufferedOutputStream bos = new BufferedOutputStream(gzos);
+ SelfCleaningGZipOutputStream gzos = null;
+ DecomposableBufferedOutputStream bos = null;
+ if (output instanceof DecomposableBufferedOutputStream)
+ {
+ bos = (DecomposableBufferedOutputStream) output;
+ gzos = (SelfCleaningGZipOutputStream) bos.getWrappedStream();
+ gzos.refreshDeflater();
+ }
+ else
+ {
+ gzos = new SelfCleaningGZipOutputStream(output);
+ bos = new DecomposableBufferedOutputStream(gzos);
+ }
ObjectOutputStream oos = SerializationStreamFactory.getManagerInstance(getSerializationType()).createOutput(bos);
if(wrappedMarshaller != null)
@@ -135,10 +149,25 @@
*/
static class SelfCleaningGZipOutputStream extends GZIPOutputStream
{
+ boolean used;
+
public SelfCleaningGZipOutputStream(OutputStream out) throws IOException
{
super(out);
}
+
+ void refreshDeflater()
+ {
+ if (used)
+ {
+ def = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
+ crc.reset();
+ }
+ else
+ {
+ used = true;
+ }
+ }
/**
* Writes remaining compressed data to the output stream and closes the underlying stream.
@@ -150,5 +179,23 @@
def.end(); // This will release all resources used by zlib native code
}
}
+
+ static class DecomposableBufferedOutputStream extends BufferedOutputStream
+ {
+ DecomposableBufferedOutputStream(OutputStream out, int size)
+ {
+ super(out, size);
+ }
+
+ DecomposableBufferedOutputStream(OutputStream out)
+ {
+ super(out);
+ }
+
+ OutputStream getWrappedStream()
+ {
+ return out;
+ }
+ }
}
15 years, 3 months