[teiid-commits] teiid SVN: r654 - in trunk: client/src/main/java/com/metamatrix/common/comm/platform/socket/client and 4 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Wed Mar 25 13:55:48 EDT 2009


Author: shawkins
Date: 2009-03-25 13:55:48 -0400 (Wed, 25 Mar 2009)
New Revision: 654

Modified:
   trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMServerConnection.java
   trunk/client/src/main/java/com/metamatrix/common/comm/platform/client/ServerAdminFactory.java
   trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnection.java
   trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java
   trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstance.java
   trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java
   trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerConnection.java
   trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SocketVMController.java
   trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketListener.java
Log:
TEIID-297 misc comm clean ups, ensuring that serverinstances get properly cycled.

Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/client/ServerAdminFactory.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/client/ServerAdminFactory.java	2009-03-25 16:01:36 UTC (rev 653)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/client/ServerAdminFactory.java	2009-03-25 17:55:48 UTC (rev 654)
@@ -40,6 +40,7 @@
 import com.metamatrix.common.comm.platform.socket.client.SocketServerConnection;
 import com.metamatrix.common.comm.platform.socket.client.SocketServerConnectionFactory;
 import com.metamatrix.common.util.MetaMatrixProductNames;
+import com.metamatrix.common.util.PropertiesUtils;
 import com.metamatrix.core.MetaMatrixRuntimeException;
 
 /** 
@@ -205,10 +206,11 @@
     	return createAdmin(p);
     }
 
-	public ServerAdmin createAdmin(final Properties p)
+	public ServerAdmin createAdmin(Properties p)
 			throws AdminComponentException, AdminException {
+		p = PropertiesUtils.clone(p);
 		p.setProperty(MMURL.CONNECTION.PRODUCT_NAME, MetaMatrixProductNames.Platform.PRODUCT_NAME);
-    	
+    	p.setProperty(MMURL.CONNECTION.AUTO_FAILOVER, Boolean.TRUE.toString());
 		ServerAdmin serverAdmin = (ServerAdmin)Proxy.newProxyInstance(Thread.currentThread()
 				.getContextClassLoader(), new Class[] { ServerAdmin.class }, new ReconnectingProxy(p));
     	

Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnection.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnection.java	2009-03-25 16:01:36 UTC (rev 653)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnection.java	2009-03-25 17:55:48 UTC (rev 654)
@@ -30,6 +30,7 @@
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
+import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
@@ -54,7 +55,6 @@
 import com.metamatrix.common.comm.exception.ConnectionException;
 import com.metamatrix.common.comm.exception.SingleInstanceCommunicationException;
 import com.metamatrix.common.comm.platform.CommPlatformPlugin;
-import com.metamatrix.dqp.client.ClientSideDQP;
 import com.metamatrix.platform.security.api.ILogon;
 import com.metamatrix.platform.security.api.LogonResult;
 
@@ -78,6 +78,7 @@
     private ILogon logon;
     private Timer pingTimer;
     private boolean closed;
+	private boolean failOver;
     
 	public SocketServerConnection(
 			SocketServerInstanceFactory connectionFactory, boolean secure,
@@ -88,6 +89,7 @@
 		this.connProps = connProps;
 		this.secure = secure;
 		this.logon = this.getService(ILogon.class);
+		this.failOver = Boolean.valueOf(connProps.getProperty(MMURL.CONNECTION.AUTO_FAILOVER)).booleanValue();
 		
         authenticate(); 
         
@@ -111,7 +113,7 @@
 			if (this.serverInstance.isOpen()) {
 				return this.serverInstance;
 			}
-			this.serverInstance = null;
+			closeServerInstance();
 		}
 		List<HostInfo> hostKeys = new ArrayList<HostInfo>(this.serverDiscovery.getKnownHosts());
 		List<HostInfo> hostCopy = new ArrayList<HostInfo>(hostKeys);
@@ -157,7 +159,7 @@
         try {
             this.logonResult = logon.logon(connProps);
             if (this.serverDiscovery.setLogonResult(this.logonResult)) {
-            	selectNewServerInstance();
+            	closeServerInstance();
             }
             return;
         } catch (LogonException e) {
@@ -194,20 +196,15 @@
 	class ServerConnectionInvocationHandler implements InvocationHandler {
 		
 		private Class<?> targetClass;
-		private SocketServerInstance instance;
 		private Object target;
-		private boolean failOver;
 		
 		public ServerConnectionInvocationHandler(Class<?> targetClass) {
 			this.targetClass = targetClass;
-			this.failOver = Boolean.valueOf(connProps.getProperty(MMURL.CONNECTION.AUTO_FAILOVER)).booleanValue()
-					|| !ClientSideDQP.class.isAssignableFrom(targetClass);
 		}
 		
 		private synchronized Object getTarget() throws CommunicationException {
 			if (this.target == null) {
-				this.instance = selectServerInstance();
-				this.target = this.instance.getService(targetClass);
+				this.target = selectServerInstance().getService(targetClass);
 			}
 			return this.target;
 		}
@@ -240,13 +237,6 @@
 		private synchronized void invalidateTarget() {
 			this.target = null;
 		}
-
-		synchronized SocketServerInstance getInstance() throws CommunicationException {
-			if (instance == null) {
-				getTarget();
-			}
-			return instance;
-		}
 	    
 	}
 
@@ -274,10 +264,7 @@
 			//ignore
 		}
 		
-		if (this.serverInstance != null) {
-			this.serverInstance.shutdown();
-			this.serverInstance = null;
-		}
+		closeServerInstance();
 
 		this.closed = true;
 		this.serverDiscovery.shutdown();
@@ -298,25 +285,29 @@
 		return logonResult;
 	}
 	
-	synchronized void selectNewServerInstance() {
-		this.serverInstance = null;
+	synchronized void closeServerInstance() {
+		if (this.serverInstance != null) {
+			this.serverInstance.shutdown();
+			this.serverInstance = null;
+		}
 	}
 	
-	public static boolean isSameInstance(Object service, Object otherService) throws CommunicationException {
-		try {
-			ServerConnectionInvocationHandler handler = (ServerConnectionInvocationHandler)Proxy.getInvocationHandler(service);
-			ServerConnectionInvocationHandler otherHandler = (ServerConnectionInvocationHandler)Proxy.getInvocationHandler(otherService);
-			return handler.getInstance().getHostInfo().getInetAddress().equals(otherHandler.getInstance().getHostInfo().getInetAddress());
-		} catch (IllegalArgumentException e) {
+	public boolean isSameInstance(SocketServerConnection otherService) throws CommunicationException {
+		SocketAddress address = selectServerInstance().getRemoteAddress();
+		if (address == null) {
 			return false;
-		} catch (UnknownHostException e) {
-			throw new CommunicationException(e);
 		}
+		return address.equals(otherService.selectServerInstance().getRemoteAddress());
 	}
 	
-	public static void selectNewServerInstance(Object service) {
+	public void selectNewServerInstance(Object service) {
+		closeServerInstance();
 		ServerConnectionInvocationHandler handler = (ServerConnectionInvocationHandler)Proxy.getInvocationHandler(service);
 		handler.invalidateTarget();
 	}
+	
+	public void setFailOver(boolean failOver) {
+		this.failOver = failOver;
+	}
 
 }
\ No newline at end of file

Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java	2009-03-25 16:01:36 UTC (rev 653)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java	2009-03-25 17:55:48 UTC (rev 654)
@@ -54,6 +54,14 @@
 import com.metamatrix.core.util.ReflectionHelper;
 import com.metamatrix.platform.security.api.ILogon;
 
+/**
+ * Responsible for creating socket based connections
+ * 
+ * The comm approach is object based and layered.  Connections manage failover and identity.  
+ * ServerInstances represent the service layer to a particular cluster member.  ObjectChannels
+ * abstract the underlying IO.
+ * 
+ */
 public class SocketServerConnectionFactory implements ServerConnectionFactory, SocketServerInstanceFactory {
 
 	private static final String URL = "URL"; //$NON-NLS-1$

Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstance.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstance.java	2009-03-25 16:01:36 UTC (rev 653)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstance.java	2009-03-25 17:55:48 UTC (rev 654)
@@ -22,7 +22,8 @@
 
 package com.metamatrix.common.comm.platform.socket.client;
 
-import com.metamatrix.common.api.HostInfo;
+import java.net.SocketAddress;
+
 import com.metamatrix.common.util.crypto.Cryptor;
 
 public interface SocketServerInstance {
@@ -31,8 +32,8 @@
 
 	void shutdown();
 
-	HostInfo getHostInfo();
-
+	SocketAddress getRemoteAddress();
+	
 	boolean isOpen();
 	
 	Cryptor getCryptor();

Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java	2009-03-25 16:01:36 UTC (rev 653)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java	2009-03-25 17:55:48 UTC (rev 654)
@@ -30,6 +30,7 @@
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
 import java.util.Iterator;
 import java.util.Map;
@@ -104,11 +105,9 @@
         }
     }
     
-    /**
-     * Return identifier of the server VM this ServerInstance is associated with. 
-     */
-    public HostInfo getHostInfo() {
-        return this.hostInfo;
+    @Override
+    public SocketAddress getRemoteAddress() {
+    	return this.socketChannel.getRemoteAddress();
     }
     
     static String getVersionInfo() {

Modified: trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerConnection.java
===================================================================
--- trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerConnection.java	2009-03-25 16:01:36 UTC (rev 653)
+++ trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerConnection.java	2009-03-25 17:55:48 UTC (rev 654)
@@ -26,6 +26,7 @@
 package com.metamatrix.common.comm.platform.socket.client;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.Properties;
 
 import junit.framework.TestCase;
@@ -41,7 +42,6 @@
 import com.metamatrix.common.comm.exception.ConnectionException;
 import com.metamatrix.common.comm.exception.SingleInstanceCommunicationException;
 import com.metamatrix.common.util.crypto.NullCryptor;
-import com.metamatrix.dqp.client.ClientSideDQP;
 import com.metamatrix.dqp.client.ResultsFuture;
 import com.metamatrix.platform.security.api.ILogon;
 import com.metamatrix.platform.security.api.LogonResult;
@@ -146,6 +146,7 @@
 	 */
 	public void testRetry() throws Exception {
 		SocketServerConnection connection = createConnection(new SingleInstanceCommunicationException());
+		connection.setFailOver(true);
 		ILogon logon = connection.getService(ILogon.class);
 		logon.ping();
 	}
@@ -160,9 +161,21 @@
 			
 		}
 	}
+	
+	public void testImmediateFail1() throws Exception {
+		SocketServerConnection connection = createConnection(new CommunicationException());
+		connection.setFailOver(true);
+		ILogon logon = connection.getService(ILogon.class);
+		try {
+			logon.ping();
+			fail("expected exception"); //$NON-NLS-1$
+		} catch (MetaMatrixComponentException e) {
+			
+		}
+	}
 
 	private SocketServerConnection createConnection(final Throwable throwException) throws CommunicationException, ConnectionException {
-		return createConnection(throwException, new HostInfo("foo", 1)); //$NON-NLS-1$
+		return createConnection(throwException, new HostInfo("0.0.0.2", 1)); //$NON-NLS-1$
 	}
 	
 	private SocketServerConnection createConnection(final Throwable t, HostInfo hostInfo)
@@ -175,7 +188,7 @@
 					boolean ssl) throws CommunicationException, IOException {
 				SocketServerInstance instance = Mockito.mock(SocketServerInstance.class);
 				Mockito.stub(instance.getCryptor()).toReturn(new NullCryptor());
-				Mockito.stub(instance.getHostInfo()).toReturn(info);
+				Mockito.stub(instance.getRemoteAddress()).toReturn(new InetSocketAddress(info.getInetAddress(), info.getPortNumber()));
 				FakeILogon logon = new FakeILogon();
 				logon.t = t;
 				Mockito.stub(instance.getService(ILogon.class)).toReturn(logon);
@@ -191,11 +204,8 @@
 		SocketServerConnection conn = createConnection(null, new HostInfo("0.0.0.0", 1)); //$NON-NLS-1$
 		SocketServerConnection conn1 = createConnection(null, new HostInfo("0.0.0.1", 1)); //$NON-NLS-1$
 		
-		ClientSideDQP dqp = conn.getService(ClientSideDQP.class);
-		ClientSideDQP dqp1 = conn1.getService(ClientSideDQP.class);
-		
-		assertFalse(SocketServerConnection.isSameInstance(dqp, dqp1));
-		assertTrue(SocketServerConnection.isSameInstance(dqp, dqp));
+		assertFalse(conn.isSameInstance(conn1));
+		assertTrue(conn.isSameInstance(conn));
 	}
 
 }

Modified: trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMServerConnection.java
===================================================================
--- trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMServerConnection.java	2009-03-25 16:01:36 UTC (rev 653)
+++ trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMServerConnection.java	2009-03-25 17:55:48 UTC (rev 654)
@@ -108,7 +108,7 @@
 		super.recycleConnection();
 		//perform load balancing
 		if (this.serverConn instanceof SocketServerConnection) {
-			SocketServerConnection.selectNewServerInstance(this.getDQP());
+			((SocketServerConnection)this.serverConn).selectNewServerInstance(this.getDQP());
 		}
 	}
 
@@ -117,7 +117,7 @@
 		if (conn instanceof MMServerConnection
 				&& this.serverConn instanceof SocketServerConnection
 				&& conn.serverConn instanceof SocketServerConnection) {
-			return SocketServerConnection.isSameInstance(this.getDQP(), conn.getDQP());
+			return ((SocketServerConnection)this.serverConn).isSameInstance((SocketServerConnection)conn.serverConn);
 		}
 		return false;
 	}

Modified: trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SocketVMController.java
===================================================================
--- trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SocketVMController.java	2009-03-25 16:01:36 UTC (rev 653)
+++ trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SocketVMController.java	2009-03-25 17:55:48 UTC (rev 654)
@@ -68,8 +68,8 @@
     private static final int DEFAULT_MAX_THREADS = 15;
     private static final long DEFAULT_TIMETOLIVE = 15000;
     private static final long DEFAULT_WAITFORSERVICES = 500;
-    private static final int DEFAULT_INPUT_BUFFER_SIZE = 102400;
-    private static final int DEFAULT_OUTPUT_BUFFER_SIZE = 102400;
+    private static final int DEFAULT_INPUT_BUFFER_SIZE = 0;
+    private static final int DEFAULT_OUTPUT_BUFFER_SIZE = 0;
 
     private static final String SOCKET_WORKER_POOL_NAME = "SocketWorkerQueue"; //$NON-NLS-1$
     private SocketListener listener; 

Modified: trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketListener.java
===================================================================
--- trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketListener.java	2009-03-25 16:01:36 UTC (rev 653)
+++ trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketListener.java	2009-03-25 17:55:48 UTC (rev 654)
@@ -89,14 +89,17 @@
 				new SynchronousQueue<Runnable>(),
 				new WorkerPoolFactory.DefaultThreadFactory("ServerNio")); //$NON-NLS-1$
         
-        ChannelFactory factory =
-            new NioServerSocketChannelFactory(executor, executor, Runtime.getRuntime().availableProcessors() * 2);
+        ChannelFactory factory = new NioServerSocketChannelFactory(executor, executor);
         
         ServerBootstrap bootstrap = new ServerBootstrap(factory);
         this.channelHandler = new SSLAwareChannelHandler(this, engine, Thread.currentThread().getContextClassLoader());
         bootstrap.setPipelineFactory(channelHandler);
-        bootstrap.setOption("receiveBufferSize", new Integer(inputBufferSize)); //$NON-NLS-1$
-        bootstrap.setOption("sendBufferSize", new Integer(outputBufferSize)); //$NON-NLS-1$
+        if (inputBufferSize != 0) {
+        	bootstrap.setOption("receiveBufferSize", new Integer(inputBufferSize)); //$NON-NLS-1$
+        }
+        if (outputBufferSize != 0) {
+        	bootstrap.setOption("sendBufferSize", new Integer(outputBufferSize)); //$NON-NLS-1$
+        }
         bootstrap.setOption("keepAlive", Boolean.TRUE); //$NON-NLS-1$
         
         this.serverChanel = bootstrap.bind(new InetSocketAddress(bindAddress, port));




More information about the teiid-commits mailing list