Author: shawkins
Date: 2009-03-25 13:55:48 -0400 (Wed, 25 Mar 2009)
New Revision: 654
Modified:
trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMServerConnection.java
trunk/client/src/main/java/com/metamatrix/common/comm/platform/client/ServerAdminFactory.java
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnection.java
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstance.java
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java
trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerConnection.java
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SocketVMController.java
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketListener.java
Log:
TEIID-297 misc comm clean ups, ensuring that serverinstances get properly cycled.
Modified:
trunk/client/src/main/java/com/metamatrix/common/comm/platform/client/ServerAdminFactory.java
===================================================================
---
trunk/client/src/main/java/com/metamatrix/common/comm/platform/client/ServerAdminFactory.java 2009-03-25
16:01:36 UTC (rev 653)
+++
trunk/client/src/main/java/com/metamatrix/common/comm/platform/client/ServerAdminFactory.java 2009-03-25
17:55:48 UTC (rev 654)
@@ -40,6 +40,7 @@
import com.metamatrix.common.comm.platform.socket.client.SocketServerConnection;
import com.metamatrix.common.comm.platform.socket.client.SocketServerConnectionFactory;
import com.metamatrix.common.util.MetaMatrixProductNames;
+import com.metamatrix.common.util.PropertiesUtils;
import com.metamatrix.core.MetaMatrixRuntimeException;
/**
@@ -205,10 +206,11 @@
return createAdmin(p);
}
- public ServerAdmin createAdmin(final Properties p)
+ public ServerAdmin createAdmin(Properties p)
throws AdminComponentException, AdminException {
+ p = PropertiesUtils.clone(p);
p.setProperty(MMURL.CONNECTION.PRODUCT_NAME,
MetaMatrixProductNames.Platform.PRODUCT_NAME);
-
+ p.setProperty(MMURL.CONNECTION.AUTO_FAILOVER, Boolean.TRUE.toString());
ServerAdmin serverAdmin = (ServerAdmin)Proxy.newProxyInstance(Thread.currentThread()
.getContextClassLoader(), new Class[] { ServerAdmin.class }, new
ReconnectingProxy(p));
Modified:
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnection.java
===================================================================
---
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnection.java 2009-03-25
16:01:36 UTC (rev 653)
+++
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnection.java 2009-03-25
17:55:48 UTC (rev 654)
@@ -30,6 +30,7 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
+import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
@@ -54,7 +55,6 @@
import com.metamatrix.common.comm.exception.ConnectionException;
import com.metamatrix.common.comm.exception.SingleInstanceCommunicationException;
import com.metamatrix.common.comm.platform.CommPlatformPlugin;
-import com.metamatrix.dqp.client.ClientSideDQP;
import com.metamatrix.platform.security.api.ILogon;
import com.metamatrix.platform.security.api.LogonResult;
@@ -78,6 +78,7 @@
private ILogon logon;
private Timer pingTimer;
private boolean closed;
+ private boolean failOver;
public SocketServerConnection(
SocketServerInstanceFactory connectionFactory, boolean secure,
@@ -88,6 +89,7 @@
this.connProps = connProps;
this.secure = secure;
this.logon = this.getService(ILogon.class);
+ this.failOver =
Boolean.valueOf(connProps.getProperty(MMURL.CONNECTION.AUTO_FAILOVER)).booleanValue();
authenticate();
@@ -111,7 +113,7 @@
if (this.serverInstance.isOpen()) {
return this.serverInstance;
}
- this.serverInstance = null;
+ closeServerInstance();
}
List<HostInfo> hostKeys = new
ArrayList<HostInfo>(this.serverDiscovery.getKnownHosts());
List<HostInfo> hostCopy = new ArrayList<HostInfo>(hostKeys);
@@ -157,7 +159,7 @@
try {
this.logonResult = logon.logon(connProps);
if (this.serverDiscovery.setLogonResult(this.logonResult)) {
- selectNewServerInstance();
+ closeServerInstance();
}
return;
} catch (LogonException e) {
@@ -194,20 +196,15 @@
class ServerConnectionInvocationHandler implements InvocationHandler {
private Class<?> targetClass;
- private SocketServerInstance instance;
private Object target;
- private boolean failOver;
public ServerConnectionInvocationHandler(Class<?> targetClass) {
this.targetClass = targetClass;
- this.failOver =
Boolean.valueOf(connProps.getProperty(MMURL.CONNECTION.AUTO_FAILOVER)).booleanValue()
- || !ClientSideDQP.class.isAssignableFrom(targetClass);
}
private synchronized Object getTarget() throws CommunicationException {
if (this.target == null) {
- this.instance = selectServerInstance();
- this.target = this.instance.getService(targetClass);
+ this.target = selectServerInstance().getService(targetClass);
}
return this.target;
}
@@ -240,13 +237,6 @@
private synchronized void invalidateTarget() {
this.target = null;
}
-
- synchronized SocketServerInstance getInstance() throws CommunicationException {
- if (instance == null) {
- getTarget();
- }
- return instance;
- }
}
@@ -274,10 +264,7 @@
//ignore
}
- if (this.serverInstance != null) {
- this.serverInstance.shutdown();
- this.serverInstance = null;
- }
+ closeServerInstance();
this.closed = true;
this.serverDiscovery.shutdown();
@@ -298,25 +285,29 @@
return logonResult;
}
- synchronized void selectNewServerInstance() {
- this.serverInstance = null;
+ synchronized void closeServerInstance() {
+ if (this.serverInstance != null) {
+ this.serverInstance.shutdown();
+ this.serverInstance = null;
+ }
}
- public static boolean isSameInstance(Object service, Object otherService) throws
CommunicationException {
- try {
- ServerConnectionInvocationHandler handler =
(ServerConnectionInvocationHandler)Proxy.getInvocationHandler(service);
- ServerConnectionInvocationHandler otherHandler =
(ServerConnectionInvocationHandler)Proxy.getInvocationHandler(otherService);
- return
handler.getInstance().getHostInfo().getInetAddress().equals(otherHandler.getInstance().getHostInfo().getInetAddress());
- } catch (IllegalArgumentException e) {
+ public boolean isSameInstance(SocketServerConnection otherService) throws
CommunicationException {
+ SocketAddress address = selectServerInstance().getRemoteAddress();
+ if (address == null) {
return false;
- } catch (UnknownHostException e) {
- throw new CommunicationException(e);
}
+ return address.equals(otherService.selectServerInstance().getRemoteAddress());
}
- public static void selectNewServerInstance(Object service) {
+ public void selectNewServerInstance(Object service) {
+ closeServerInstance();
ServerConnectionInvocationHandler handler =
(ServerConnectionInvocationHandler)Proxy.getInvocationHandler(service);
handler.invalidateTarget();
}
+
+ public void setFailOver(boolean failOver) {
+ this.failOver = failOver;
+ }
}
\ No newline at end of file
Modified:
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java
===================================================================
---
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java 2009-03-25
16:01:36 UTC (rev 653)
+++
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java 2009-03-25
17:55:48 UTC (rev 654)
@@ -54,6 +54,14 @@
import com.metamatrix.core.util.ReflectionHelper;
import com.metamatrix.platform.security.api.ILogon;
+/**
+ * Responsible for creating socket based connections
+ *
+ * The comm approach is object based and layered. Connections manage failover and
identity.
+ * ServerInstances represent the service layer to a particular cluster member.
ObjectChannels
+ * abstract the underlying IO.
+ *
+ */
public class SocketServerConnectionFactory implements ServerConnectionFactory,
SocketServerInstanceFactory {
private static final String URL = "URL"; //$NON-NLS-1$
Modified:
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstance.java
===================================================================
---
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstance.java 2009-03-25
16:01:36 UTC (rev 653)
+++
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstance.java 2009-03-25
17:55:48 UTC (rev 654)
@@ -22,7 +22,8 @@
package com.metamatrix.common.comm.platform.socket.client;
-import com.metamatrix.common.api.HostInfo;
+import java.net.SocketAddress;
+
import com.metamatrix.common.util.crypto.Cryptor;
public interface SocketServerInstance {
@@ -31,8 +32,8 @@
void shutdown();
- HostInfo getHostInfo();
-
+ SocketAddress getRemoteAddress();
+
boolean isOpen();
Cryptor getCryptor();
Modified:
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java
===================================================================
---
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java 2009-03-25
16:01:36 UTC (rev 653)
+++
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java 2009-03-25
17:55:48 UTC (rev 654)
@@ -30,6 +30,7 @@
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.util.Iterator;
import java.util.Map;
@@ -104,11 +105,9 @@
}
}
- /**
- * Return identifier of the server VM this ServerInstance is associated with.
- */
- public HostInfo getHostInfo() {
- return this.hostInfo;
+ @Override
+ public SocketAddress getRemoteAddress() {
+ return this.socketChannel.getRemoteAddress();
}
static String getVersionInfo() {
Modified:
trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerConnection.java
===================================================================
---
trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerConnection.java 2009-03-25
16:01:36 UTC (rev 653)
+++
trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerConnection.java 2009-03-25
17:55:48 UTC (rev 654)
@@ -26,6 +26,7 @@
package com.metamatrix.common.comm.platform.socket.client;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.Properties;
import junit.framework.TestCase;
@@ -41,7 +42,6 @@
import com.metamatrix.common.comm.exception.ConnectionException;
import com.metamatrix.common.comm.exception.SingleInstanceCommunicationException;
import com.metamatrix.common.util.crypto.NullCryptor;
-import com.metamatrix.dqp.client.ClientSideDQP;
import com.metamatrix.dqp.client.ResultsFuture;
import com.metamatrix.platform.security.api.ILogon;
import com.metamatrix.platform.security.api.LogonResult;
@@ -146,6 +146,7 @@
*/
public void testRetry() throws Exception {
SocketServerConnection connection = createConnection(new
SingleInstanceCommunicationException());
+ connection.setFailOver(true);
ILogon logon = connection.getService(ILogon.class);
logon.ping();
}
@@ -160,9 +161,21 @@
}
}
+
+ public void testImmediateFail1() throws Exception {
+ SocketServerConnection connection = createConnection(new CommunicationException());
+ connection.setFailOver(true);
+ ILogon logon = connection.getService(ILogon.class);
+ try {
+ logon.ping();
+ fail("expected exception"); //$NON-NLS-1$
+ } catch (MetaMatrixComponentException e) {
+
+ }
+ }
private SocketServerConnection createConnection(final Throwable throwException) throws
CommunicationException, ConnectionException {
- return createConnection(throwException, new HostInfo("foo", 1));
//$NON-NLS-1$
+ return createConnection(throwException, new HostInfo("0.0.0.2", 1));
//$NON-NLS-1$
}
private SocketServerConnection createConnection(final Throwable t, HostInfo hostInfo)
@@ -175,7 +188,7 @@
boolean ssl) throws CommunicationException, IOException {
SocketServerInstance instance = Mockito.mock(SocketServerInstance.class);
Mockito.stub(instance.getCryptor()).toReturn(new NullCryptor());
- Mockito.stub(instance.getHostInfo()).toReturn(info);
+ Mockito.stub(instance.getRemoteAddress()).toReturn(new
InetSocketAddress(info.getInetAddress(), info.getPortNumber()));
FakeILogon logon = new FakeILogon();
logon.t = t;
Mockito.stub(instance.getService(ILogon.class)).toReturn(logon);
@@ -191,11 +204,8 @@
SocketServerConnection conn = createConnection(null, new HostInfo("0.0.0.0",
1)); //$NON-NLS-1$
SocketServerConnection conn1 = createConnection(null, new HostInfo("0.0.0.1",
1)); //$NON-NLS-1$
- ClientSideDQP dqp = conn.getService(ClientSideDQP.class);
- ClientSideDQP dqp1 = conn1.getService(ClientSideDQP.class);
-
- assertFalse(SocketServerConnection.isSameInstance(dqp, dqp1));
- assertTrue(SocketServerConnection.isSameInstance(dqp, dqp));
+ assertFalse(conn.isSameInstance(conn1));
+ assertTrue(conn.isSameInstance(conn));
}
}
Modified: trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMServerConnection.java
===================================================================
--- trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMServerConnection.java 2009-03-25
16:01:36 UTC (rev 653)
+++ trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMServerConnection.java 2009-03-25
17:55:48 UTC (rev 654)
@@ -108,7 +108,7 @@
super.recycleConnection();
//perform load balancing
if (this.serverConn instanceof SocketServerConnection) {
- SocketServerConnection.selectNewServerInstance(this.getDQP());
+ ((SocketServerConnection)this.serverConn).selectNewServerInstance(this.getDQP());
}
}
@@ -117,7 +117,7 @@
if (conn instanceof MMServerConnection
&& this.serverConn instanceof SocketServerConnection
&& conn.serverConn instanceof SocketServerConnection) {
- return SocketServerConnection.isSameInstance(this.getDQP(), conn.getDQP());
+ return
((SocketServerConnection)this.serverConn).isSameInstance((SocketServerConnection)conn.serverConn);
}
return false;
}
Modified:
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SocketVMController.java
===================================================================
---
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SocketVMController.java 2009-03-25
16:01:36 UTC (rev 653)
+++
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SocketVMController.java 2009-03-25
17:55:48 UTC (rev 654)
@@ -68,8 +68,8 @@
private static final int DEFAULT_MAX_THREADS = 15;
private static final long DEFAULT_TIMETOLIVE = 15000;
private static final long DEFAULT_WAITFORSERVICES = 500;
- private static final int DEFAULT_INPUT_BUFFER_SIZE = 102400;
- private static final int DEFAULT_OUTPUT_BUFFER_SIZE = 102400;
+ private static final int DEFAULT_INPUT_BUFFER_SIZE = 0;
+ private static final int DEFAULT_OUTPUT_BUFFER_SIZE = 0;
private static final String SOCKET_WORKER_POOL_NAME = "SocketWorkerQueue";
//$NON-NLS-1$
private SocketListener listener;
Modified:
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketListener.java
===================================================================
---
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketListener.java 2009-03-25
16:01:36 UTC (rev 653)
+++
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketListener.java 2009-03-25
17:55:48 UTC (rev 654)
@@ -89,14 +89,17 @@
new SynchronousQueue<Runnable>(),
new WorkerPoolFactory.DefaultThreadFactory("ServerNio")); //$NON-NLS-1$
- ChannelFactory factory =
- new NioServerSocketChannelFactory(executor, executor,
Runtime.getRuntime().availableProcessors() * 2);
+ ChannelFactory factory = new NioServerSocketChannelFactory(executor, executor);
ServerBootstrap bootstrap = new ServerBootstrap(factory);
this.channelHandler = new SSLAwareChannelHandler(this, engine,
Thread.currentThread().getContextClassLoader());
bootstrap.setPipelineFactory(channelHandler);
- bootstrap.setOption("receiveBufferSize", new Integer(inputBufferSize));
//$NON-NLS-1$
- bootstrap.setOption("sendBufferSize", new Integer(outputBufferSize));
//$NON-NLS-1$
+ if (inputBufferSize != 0) {
+ bootstrap.setOption("receiveBufferSize", new
Integer(inputBufferSize)); //$NON-NLS-1$
+ }
+ if (outputBufferSize != 0) {
+ bootstrap.setOption("sendBufferSize", new Integer(outputBufferSize));
//$NON-NLS-1$
+ }
bootstrap.setOption("keepAlive", Boolean.TRUE); //$NON-NLS-1$
this.serverChanel = bootstrap.bind(new InetSocketAddress(bindAddress, port));