Author: shawkins
Date: 2010-08-20 18:18:46 -0400 (Fri, 20 Aug 2010)
New Revision: 2482
Modified:
branches/7.1.x/client/src/main/java/org/teiid/adminapi/AdminFactory.java
branches/7.1.x/client/src/main/java/org/teiid/client/security/ILogon.java
branches/7.1.x/client/src/main/java/org/teiid/jdbc/ConnectionImpl.java
branches/7.1.x/client/src/main/java/org/teiid/jdbc/XAConnectionImpl.java
branches/7.1.x/client/src/main/java/org/teiid/net/HostInfo.java
branches/7.1.x/client/src/main/java/org/teiid/net/ServerConnection.java
branches/7.1.x/client/src/main/java/org/teiid/net/socket/AdminApiServerDiscovery.java
branches/7.1.x/client/src/main/java/org/teiid/net/socket/Handshake.java
branches/7.1.x/client/src/main/java/org/teiid/net/socket/SocketServerConnection.java
branches/7.1.x/client/src/main/java/org/teiid/net/socket/SocketServerConnectionFactory.java
branches/7.1.x/client/src/main/java/org/teiid/net/socket/SocketServerInstance.java
branches/7.1.x/client/src/main/java/org/teiid/net/socket/SocketServerInstanceFactory.java
branches/7.1.x/client/src/main/java/org/teiid/net/socket/SocketServerInstanceImpl.java
branches/7.1.x/client/src/test/java/org/teiid/net/socket/TestAdminApiServerDiscovery.java
branches/7.1.x/client/src/test/java/org/teiid/net/socket/TestSocketServerConnection.java
branches/7.1.x/client/src/test/java/org/teiid/net/socket/TestSocketServerInstanceImpl.java
branches/7.1.x/runtime/src/main/java/org/teiid/transport/LocalServerConnection.java
branches/7.1.x/runtime/src/main/java/org/teiid/transport/LogonImpl.java
branches/7.1.x/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java
branches/7.1.x/runtime/src/main/java/org/teiid/transport/PgFrontendProtocol.java
branches/7.1.x/runtime/src/main/java/org/teiid/transport/SSLAwareChannelHandler.java
branches/7.1.x/runtime/src/main/java/org/teiid/transport/SocketClientInstance.java
branches/7.1.x/runtime/src/main/java/org/teiid/transport/SocketListener.java
branches/7.1.x/runtime/src/test/java/org/teiid/transport/TestCommSockets.java
branches/7.1.x/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java
Log:
TEIID-1211 TEIID-1204 reimplementing loadbalancing and failover logic. this initial
checkin mainly addresses failover
Modified: branches/7.1.x/client/src/main/java/org/teiid/adminapi/AdminFactory.java
===================================================================
--- branches/7.1.x/client/src/main/java/org/teiid/adminapi/AdminFactory.java 2010-08-19
15:30:38 UTC (rev 2481)
+++ branches/7.1.x/client/src/main/java/org/teiid/adminapi/AdminFactory.java 2010-08-20
22:18:46 UTC (rev 2482)
@@ -27,11 +27,9 @@
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Properties;
-import java.util.concurrent.ExecutionException;
import org.teiid.client.security.LogonException;
import org.teiid.client.util.ExceptionUtil;
-import org.teiid.client.util.ResultsFuture;
import org.teiid.core.TeiidRuntimeException;
import org.teiid.core.util.PropertiesUtils;
import org.teiid.net.CommunicationException;
@@ -51,42 +49,23 @@
private static final int DEFAULT_BOUNCE_WAIT = 2000;
- private final class ReconnectingProxy implements InvocationHandler {
+ private final class AdminProxy implements InvocationHandler {
private Admin target;
private ServerConnection registry;
private Properties p;
private boolean closed;
- public ReconnectingProxy(Properties p) throws ConnectionException,
CommunicationException {
+ public AdminProxy(Properties p) throws ConnectionException, CommunicationException
{
this.p = p;
this.registry = serverConnectionFactory.getConnection(p);
this.target = registry.getService(Admin.class);
}
- private synchronized Admin getTarget() throws AdminComponentException,
CommunicationException {
+ private synchronized Admin getTarget() throws AdminComponentException {
if (closed) {
throw new
AdminComponentException(NetPlugin.Util.getString("ERR.014.001.0001"));
//$NON-NLS-1$
}
- if (target != null) {
- ResultsFuture<?> ping = registry.isOpen();
- if (ping != null) {
- try {
- ping.get();
- return target;
- } catch (InterruptedException e) {
- throw new CommunicationException(e);
- } catch (ExecutionException e) {
- //assume recoverable
- }
- }
- }
- try {
- registry = serverConnectionFactory.getConnection(p);
- } catch (ConnectionException e) {
- throw new AdminComponentException(e);
- }
- target = registry.getService(Admin.class);
return target;
}
@@ -97,7 +76,6 @@
close();
return null;
}
- Throwable t = null;
try {
return method.invoke(getTarget(), args);
} catch (InvocationTargetException e) {
@@ -109,10 +87,7 @@
}
}
throw e.getTargetException();
- } catch (CommunicationException e) {
- t = e;
}
- throw t;
}
public synchronized void close() {
@@ -231,7 +206,7 @@
p.setProperty(TeiidURL.CONNECTION.ADMIN, Boolean.TRUE.toString());
try {
- Admin serverAdmin = (Admin)Proxy.newProxyInstance(this.getClass().getClassLoader(),
new Class[] { Admin.class }, new ReconnectingProxy(p));
+ Admin serverAdmin = (Admin)Proxy.newProxyInstance(this.getClass().getClassLoader(),
new Class[] { Admin.class }, new AdminProxy(p));
return serverAdmin;
} catch (ConnectionException e) {
throw new AdminComponentException(e);
Modified: branches/7.1.x/client/src/main/java/org/teiid/client/security/ILogon.java
===================================================================
--- branches/7.1.x/client/src/main/java/org/teiid/client/security/ILogon.java 2010-08-19
15:30:38 UTC (rev 2481)
+++ branches/7.1.x/client/src/main/java/org/teiid/client/security/ILogon.java 2010-08-20
22:18:46 UTC (rev 2482)
@@ -21,11 +21,13 @@
*/
package org.teiid.client.security;
+import java.util.Collection;
import java.util.Properties;
import org.teiid.client.util.ResultsFuture;
import org.teiid.core.ComponentNotFoundException;
import org.teiid.core.TeiidComponentException;
+import org.teiid.net.CommunicationException;
/**
@@ -33,7 +35,7 @@
*/
public interface ILogon {
LogonResult logon(Properties connectionProperties)
- throws LogonException, TeiidComponentException;
+ throws LogonException, TeiidComponentException, CommunicationException;
/**
* Ping the server to see if the client-server connection is alive.
@@ -41,8 +43,10 @@
* @throws ComponentNotFoundException if can't find the Session service.
*/
ResultsFuture<?> ping()
- throws InvalidSessionException, TeiidComponentException;
+ throws InvalidSessionException, TeiidComponentException, CommunicationException;
+ ResultsFuture<?> ping(Collection<String> sessions)
+ throws TeiidComponentException, CommunicationException;
/**
* Log off the specified session.
@@ -51,5 +55,5 @@
*/
ResultsFuture<?> logoff() throws InvalidSessionException,
TeiidComponentException;
- void assertIdentity(SessionToken sessionId) throws InvalidSessionException,
TeiidComponentException;
+ void assertIdentity(SessionToken sessionId) throws InvalidSessionException,
TeiidComponentException, CommunicationException;
}
Modified: branches/7.1.x/client/src/main/java/org/teiid/jdbc/ConnectionImpl.java
===================================================================
--- branches/7.1.x/client/src/main/java/org/teiid/jdbc/ConnectionImpl.java 2010-08-19
15:30:38 UTC (rev 2481)
+++ branches/7.1.x/client/src/main/java/org/teiid/jdbc/ConnectionImpl.java 2010-08-20
22:18:46 UTC (rev 2482)
@@ -45,9 +45,6 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -878,20 +875,7 @@
}
public boolean isValid(int timeout) throws SQLException {
- ResultsFuture<?> future = this.getServerConnection().isOpen();
- if (future == null) {
- return false;
- }
- try {
- future.get(timeout, TimeUnit.SECONDS);
- return true;
- } catch (InterruptedException e) {
- return false;
- } catch (ExecutionException e) {
- return false;
- } catch (TimeoutException e) {
- return false;
- }
+ return this.getServerConnection().isOpen(timeout * 1000);
}
public void recycleConnection() {
@@ -918,7 +902,7 @@
//perform load balancing
if (this.serverConn instanceof SocketServerConnection) {
- ((SocketServerConnection)this.serverConn).selectNewServerInstance(this.getDQP());
+ ((SocketServerConnection)this.serverConn).selectNewServerInstance();
}
}
Modified: branches/7.1.x/client/src/main/java/org/teiid/jdbc/XAConnectionImpl.java
===================================================================
--- branches/7.1.x/client/src/main/java/org/teiid/jdbc/XAConnectionImpl.java 2010-08-19
15:30:38 UTC (rev 2481)
+++ branches/7.1.x/client/src/main/java/org/teiid/jdbc/XAConnectionImpl.java 2010-08-20
22:18:46 UTC (rev 2482)
@@ -80,7 +80,7 @@
ex = ExceptionUtil.getExceptionOfType(e, CommunicationException.class);
if (ex instanceof SingleInstanceCommunicationException) {
ServerConnection sc = proxiedConnection.getServerConnection();
- if (sc.isOpen() != null) {
+ if (!sc.isOpen(ServerConnection.PING_INTERVAL)) {
ex = null;
}
}
Modified: branches/7.1.x/client/src/main/java/org/teiid/net/HostInfo.java
===================================================================
--- branches/7.1.x/client/src/main/java/org/teiid/net/HostInfo.java 2010-08-19 15:30:38
UTC (rev 2481)
+++ branches/7.1.x/client/src/main/java/org/teiid/net/HostInfo.java 2010-08-20 22:18:46
UTC (rev 2482)
@@ -23,6 +23,7 @@
package
org.teiid.net;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import org.teiid.core.util.ArgCheck;
@@ -38,12 +39,12 @@
private String hostName;
private int portNumber = 0;
private InetAddress inetAddress;
-
- public InetAddress getInetAddress() throws UnknownHostException {
- if (inetAddress != null) {
- return inetAddress;
- }
- return InetAddress.getByName(this.hostName);
+ private boolean ssl;
+
+ public HostInfo(String hostName, InetSocketAddress addr) {
+ this.hostName = hostName;
+ this.portNumber = addr.getPort();
+ this.inetAddress = addr.getAddress();
}
public HostInfo (String host, int port) {
@@ -61,6 +62,13 @@
}
}
+ public InetAddress getInetAddress() throws UnknownHostException {
+ if (inetAddress != null) {
+ return inetAddress;
+ }
+ return InetAddress.getByName(this.hostName);
+ }
+
public String getHostName() {
return hostName;
}
@@ -87,7 +95,13 @@
return false;
}
HostInfo hostInfo = (HostInfo) obj;
- return hostName.equals(hostInfo.getHostName()) && portNumber ==
hostInfo.getPortNumber();
+ if (portNumber != hostInfo.getPortNumber()) {
+ return false;
+ }
+ if (inetAddress != null && hostInfo.inetAddress != null) {
+ return inetAddress.equals(hostInfo.inetAddress);
+ }
+ return hostName.equals(hostInfo.getHostName());
}
/**
@@ -98,5 +112,17 @@
int hc = HashCodeUtil.hashCode(0, hostName);
return HashCodeUtil.hashCode(hc, portNumber);
}
+
+ public boolean isResolved() {
+ return this.inetAddress != null;
+ }
+
+ public boolean isSsl() {
+ return ssl;
+ }
+
+ public void setSsl(boolean ssl) {
+ this.ssl = ssl;
+ }
}
Modified: branches/7.1.x/client/src/main/java/org/teiid/net/ServerConnection.java
===================================================================
--- branches/7.1.x/client/src/main/java/org/teiid/net/ServerConnection.java 2010-08-19
15:30:38 UTC (rev 2481)
+++ branches/7.1.x/client/src/main/java/org/teiid/net/ServerConnection.java 2010-08-20
22:18:46 UTC (rev 2482)
@@ -23,7 +23,6 @@
package
org.teiid.net;
import org.teiid.client.security.LogonResult;
-import org.teiid.client.util.ResultsFuture;
public interface ServerConnection {
@@ -33,7 +32,7 @@
void close();
- ResultsFuture<?> isOpen();
+ boolean isOpen(long msToTest);
LogonResult getLogonResult();
Modified:
branches/7.1.x/client/src/main/java/org/teiid/net/socket/AdminApiServerDiscovery.java
===================================================================
---
branches/7.1.x/client/src/main/java/org/teiid/net/socket/AdminApiServerDiscovery.java 2010-08-19
15:30:38 UTC (rev 2481)
+++
branches/7.1.x/client/src/main/java/org/teiid/net/socket/AdminApiServerDiscovery.java 2010-08-20
22:18:46 UTC (rev 2482)
@@ -82,7 +82,7 @@
&& (info.lastDiscoveryTime < System.currentTimeMillis() -
DISCOVERY_TIMEOUT || info.knownHosts.isEmpty())) {
Admin serverAdmin = instance.getService(Admin.class);
try {
- Collection<ProcessObject> processes =
serverAdmin.getProcesses("*");
+ Collection<ProcessObject> processes = serverAdmin.getProcesses("*");
//$NON-NLS-1$
info.knownHosts.clear();
for (ProcessObject processObject : processes) {
if (!processObject.isEnabled() || !processObject.isRunning()) {
Modified: branches/7.1.x/client/src/main/java/org/teiid/net/socket/Handshake.java
===================================================================
--- branches/7.1.x/client/src/main/java/org/teiid/net/socket/Handshake.java 2010-08-19
15:30:38 UTC (rev 2481)
+++ branches/7.1.x/client/src/main/java/org/teiid/net/socket/Handshake.java 2010-08-20
22:18:46 UTC (rev 2482)
@@ -50,8 +50,8 @@
/**
* @param version The version to set.
*/
- public void setVersion(String version) {
- this.version = version;
+ public void setVersion() {
+ this.version = ApplicationInfo.getInstance().getMajorReleaseNumber();
}
/**
Modified:
branches/7.1.x/client/src/main/java/org/teiid/net/socket/SocketServerConnection.java
===================================================================
---
branches/7.1.x/client/src/main/java/org/teiid/net/socket/SocketServerConnection.java 2010-08-19
15:30:38 UTC (rev 2481)
+++
branches/7.1.x/client/src/main/java/org/teiid/net/socket/SocketServerConnection.java 2010-08-20
22:18:46 UTC (rev 2482)
@@ -26,22 +26,18 @@
package org.teiid.net.socket;
import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
-import java.net.SocketAddress;
+import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -52,6 +48,7 @@
import org.teiid.client.util.ExceptionUtil;
import org.teiid.client.util.ResultsFuture;
import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidException;
import org.teiid.net.CommunicationException;
import org.teiid.net.ConnectionException;
import org.teiid.net.HostInfo;
@@ -66,8 +63,7 @@
*/
public class SocketServerConnection implements ServerConnection {
- private static final int RETRY_COUNT = 3;
-
+ private static final int FAILOVER_PING_INTERVAL = 1000;
private SocketServerInstanceFactory connectionFactory;
private ServerDiscovery serverDiscovery;
private static Logger log = Logger.getLogger("org.teiid.client.sockets");
//$NON-NLS-1$
@@ -76,72 +72,44 @@
private Properties connProps;
private SocketServerInstance serverInstance;
- private volatile LogonResult logonResult;
+ private LogonResult logonResult;
+ private Map<HostInfo, LogonResult> logonResults = new
ConcurrentHashMap<HostInfo, LogonResult>();
private ILogon logon;
- private Timer pingTimer;
private boolean closed;
private boolean failOver;
+ private long lastPing = System.currentTimeMillis();
+ private int pingFailOverInterval = FAILOVER_PING_INTERVAL;
public SocketServerConnection(
SocketServerInstanceFactory connectionFactory, boolean secure,
- ServerDiscovery serverDiscovery, Properties connProps,
- Timer pingTimer) throws CommunicationException, ConnectionException {
+ ServerDiscovery serverDiscovery, Properties connProps) throws CommunicationException,
ConnectionException {
this.connectionFactory = connectionFactory;
this.serverDiscovery = serverDiscovery;
this.connProps = connProps;
this.secure = secure;
+ //ILogon that is allowed to failover
this.logon = this.getService(ILogon.class);
this.failOver =
Boolean.valueOf(connProps.getProperty(TeiidURL.CONNECTION.AUTO_FAILOVER)).booleanValue();
-
- authenticate();
-
- this.pingTimer = pingTimer;
- schedulePing();
+ this.failOver |=
Boolean.valueOf(connProps.getProperty(TeiidURL.CONNECTION.ADMIN)).booleanValue();
+ selectServerInstance();
}
-
- private void schedulePing() {
- if (this.pingTimer != null) {
- this.pingTimer.schedule(new TimerTask() {
-
- private ResultsFuture<?> ping;
-
- @Override
- public void run() {
- if (ping == null) {
- ping = isOpen();
- }
- if (ping != null) {
- try {
- ping.get(1, TimeUnit.SECONDS);
- ping = null;
- return;
- } catch (TimeoutException e) {
- return;
- } catch (Throwable e) {
- handlePingError(e);
- }
- }
- this.cancel();
- }
-
- }, PING_INTERVAL, PING_INTERVAL);
- }
- }
/**
* Implements a sticky random selection policy
* TODO: make this customizable
* TODO: put more information on hostinfo as to process response time, last successful
connect, etc.
+ * @throws ConnectionException
*/
public synchronized SocketServerInstance selectServerInstance()
- throws CommunicationException {
+ throws CommunicationException, ConnectionException {
if (closed) {
throw new
CommunicationException(NetPlugin.Util.getString("SocketServerConnection.closed"));
//$NON-NLS-1$
}
if (this.serverInstance != null && (!failOver || this.serverInstance.isOpen()))
{
return this.serverInstance;
}
- List<HostInfo> hostKeys = new
ArrayList<HostInfo>(this.serverDiscovery.getKnownHosts(logonResult,
this.serverInstance));
+ List<HostInfo> hostKeys = new
ArrayList<HostInfo>(this.serverDiscovery.getKnownHosts(logonResult, null));
+ boolean discoverHosts = true;
closeServerInstance();
List<HostInfo> hostCopy = new ArrayList<HostInfo>(hostKeys);
int knownHosts = hostKeys.size();
@@ -150,23 +118,38 @@
Exception ex = null;
try {
- SocketServerInstance instance = connectionFactory.getServerInstance(hostInfo,
secure);
- this.serverInstance = instance;
- if (this.logonResult != null) {
- ILogon newLogon = instance.getService(ILogon.class);
- newLogon.assertIdentity(logonResult.getSessionToken());
+ ILogon newLogon = connect(hostInfo);
+ if (this.logonResult == null) {
+ try {
+ this.logonResult = newLogon.logon(connProps);
+ this.logonResults.put(this.serverInstance.getHostInfo(),
this.logonResult);
+ this.connectionFactory.connected(this.serverInstance,
this.logonResult.getSessionToken());
+ this.serverDiscovery.connectionSuccessful(hostInfo);
+ if (discoverHosts) {
+ List<HostInfo> updatedHosts =
this.serverDiscovery.getKnownHosts(logonResult, this.serverInstance);
+ if (updatedHosts.size() > 1 && new
HashSet<HostInfo>(updatedHosts).size() > new
HashSet<HostInfo>(hostCopy).size()) {
+ hostKeys = updatedHosts;
+ closeServerInstance();
+ discoverHosts = false;
+ continue;
+ }
+ }
+ } catch (LogonException e) {
+ // Propagate the original message as it contains the message we want
+ // to give to the user
+ throw new ConnectionException(e, e.getMessage());
+ } catch (TeiidComponentException e) {
+ if (e.getCause() instanceof CommunicationException) {
+ throw (CommunicationException)e.getCause();
+ }
+ throw new CommunicationException(e,
NetPlugin.Util.getString("PlatformServerConnectionFactory.Unable_to_find_a_component_used_in_logging_on_to"));
//$NON-NLS-1$
+ }
}
- this.serverDiscovery.connectionSuccessful(hostInfo);
return this.serverInstance;
} catch (IOException e) {
ex = e;
- } catch (InvalidSessionException e) {
- shutdown(false);
- throw new
CommunicationException(e,NetPlugin.Util.getString("SocketServerInstance.Connection_Error.Connect_Failed",
hostInfo.getHostName(), String.valueOf(hostInfo.getPortNumber()), e.getMessage()));
//$NON-NLS-1$
} catch (SingleInstanceCommunicationException e) {
ex = e;
- } catch (TeiidComponentException e) {
- ex = e;
}
this.serverDiscovery.markInstanceAsBad(hostInfo);
if (knownHosts == 1) { //just a single host, use the exception
@@ -179,133 +162,127 @@
}
throw new
CommunicationException(NetPlugin.Util.getString("SocketServerInstancePool.No_valid_host_available",
hostCopy.toString())); //$NON-NLS-1$
}
-
- public synchronized void authenticate() throws ConnectionException,
CommunicationException {
- this.logonResult = null;
- // Log on to server
- try {
- this.logonResult = logon.logon(connProps);
- List<HostInfo> knownHosts =
this.serverDiscovery.getKnownHosts(logonResult, this.serverInstance);
- if (knownHosts.size() > 1 && !new
HashSet<HostInfo>(knownHosts).equals(new
HashSet<HostInfo>(this.serverDiscovery.getKnownHosts(logonResult, null)))) {
- //if there are multiple instances, allow for load-balancing
- closeServerInstance();
- }
- return;
- } catch (LogonException e) {
- // Propagate the original message as it contains the message we want
- // to give to the user
- throw new ConnectionException(e, e.getMessage());
- } catch (TeiidComponentException e) {
- if (e.getCause() instanceof CommunicationException) {
- throw (CommunicationException)e.getCause();
- }
- throw new CommunicationException(e,
NetPlugin.Util.getString("PlatformServerConnectionFactory.Unable_to_find_a_component_used_in_logging_on_to"));
//$NON-NLS-1$
- }
- }
-
- class ServerConnectionInvocationHandler implements InvocationHandler {
-
- private Class<?> targetClass;
- private Object target;
-
- public ServerConnectionInvocationHandler(Class<?> targetClass) {
- this.targetClass = targetClass;
+
+ private ILogon connect(HostInfo hostInfo) throws CommunicationException,
+ IOException {
+ if (!hostInfo.isResolved()) {
+ hostInfo = new HostInfo(hostInfo.getHostName(), new
InetSocketAddress(hostInfo.getInetAddress(), hostInfo.getPortNumber()));
}
-
- private synchronized Object getTarget() throws CommunicationException {
- if (this.target == null) {
- this.target = selectServerInstance().getService(targetClass);
+ hostInfo.setSsl(secure);
+ this.serverInstance = connectionFactory.getServerInstance(hostInfo);
+ this.logonResult = logonResults.get(hostInfo);
+ ILogon newLogon = this.serverInstance.getService(ILogon.class);
+ if (this.logonResult != null) {
+ try {
+ newLogon.assertIdentity(logonResult.getSessionToken());
+ } catch (TeiidException e) {
+ // session is no longer valid
+ disconnect();
}
- return this.target;
}
-
- public Object invoke(Object proxy, Method method, Object[] args)
- throws Throwable {
- Throwable exception = null;
- for (int i = 0; i < RETRY_COUNT; i++) {
- try {
- return method.invoke(getTarget(), args);
- } catch (InvocationTargetException t) {
- exception = t.getTargetException();
- } catch (Throwable t) {
- exception = t;
- }
- if (!failOver || ExceptionUtil.getExceptionOfType(exception,
SingleInstanceCommunicationException.class) == null) {
- break;
- }
- invalidateTarget();
- //TODO: look for invalid session exception
- }
- throw ExceptionUtil.convertException(method, exception);
- }
-
- private synchronized void invalidateTarget() {
- this.target = null;
- }
-
+ return newLogon;
}
-
+
public <T> T getService(Class<T> iface) {
- return iface.cast(Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]
{iface}, new ServerConnectionInvocationHandler(iface)));
+ return iface.cast(Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]
{iface}, new SocketServerInstanceImpl.RemoteInvocationHandler(iface) {
+ @Override
+ protected SocketServerInstance getInstance() throws CommunicationException {
+ if (failOver && System.currentTimeMillis() - lastPing >
pingFailOverInterval) {
+ try {
+ ResultsFuture<?> future =
selectServerInstance().getService(ILogon.class).ping();
+ future.get();
+ } catch (SingleInstanceCommunicationException e) {
+ closeServerInstance();
+ } catch (CommunicationException e) {
+ throw e;
+ } catch (InvalidSessionException e) {
+ disconnect();
+ closeServerInstance();
+ } catch (Exception e) {
+ closeServerInstance();
+ }
+ }
+ lastPing = System.currentTimeMillis();
+ try {
+ return selectServerInstance();
+ } catch (ConnectionException e) {
+ throw new CommunicationException(e);
+ }
+ }
+
+ public Object invoke(Object proxy, Method method, Object[] args)
+ throws Throwable {
+ try {
+ return super.invoke(proxy, method, args);
+ } catch (Exception e) {
+ if (ExceptionUtil.getExceptionOfType(e, InvalidSessionException.class) != null) {
+ disconnect();
+ }
+ throw e;
+ }
+ }
+
+ }));
}
+
public synchronized void close() {
- shutdown(true);
- }
- private synchronized void shutdown(boolean logoff) {
if (this.closed) {
return;
}
- if (logoff) {
+ if (this.serverInstance != null) {
+ logoff();
+ }
+
+ for (Map.Entry<HostInfo, LogonResult> logonEntry : logonResults.entrySet()) {
try {
- //make a best effort to send the logoff
- Future<?> writeFuture = this.logon.logoff();
- writeFuture.get(5000, TimeUnit.MILLISECONDS);
- } catch (InvalidSessionException e) {
- //ignore
- } catch (InterruptedException e) {
- //ignore
- } catch (ExecutionException e) {
- //ignore
- } catch (TimeoutException e) {
- //ignore
- } catch (TeiidComponentException e) {
- //ignore
+ connect(logonEntry.getKey());
+ logoff();
+ } catch (Exception e) {
+
}
}
- closeServerInstance();
-
this.closed = true;
this.serverDiscovery.shutdown();
}
+
+ private void logoff() {
+ disconnect();
+ try {
+ //make a best effort to send the logoff
+ Future<?> writeFuture = this.serverInstance.getService(ILogon.class).logoff();
+ writeFuture.get(5000, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ //ignore
+ }
+ closeServerInstance();
+ }
+
+ private void disconnect() {
+ this.logonResults.remove(this.serverInstance.getHostInfo());
+ if (this.logonResult != null) {
+ this.connectionFactory.disconnected(this.serverInstance,
this.logonResult.getSessionToken());
+ }
+ }
- public synchronized ResultsFuture<?> isOpen() {
+ private synchronized ResultsFuture<?> isOpen() throws CommunicationException,
InvalidSessionException, TeiidComponentException {
if (this.closed) {
- return null;
+ throw new CommunicationException();
}
+ return logon.ping();
+ }
+
+ public boolean isOpen(long msToTest) {
try {
- if (!selectServerInstance().isOpen()) {
- return null;
- }
- } catch (CommunicationException e) {
- return null;
- }
- try {
- return logon.ping();
+ ResultsFuture<?> future = isOpen();
+ future.get(msToTest, TimeUnit.MILLISECONDS);
+ return true;
} catch (Throwable th) {
- return null;
+ return false;
}
}
- private void handlePingError(Throwable th) {
- if (ExceptionUtil.getExceptionOfType(th, InvalidSessionException.class) != null) {
- shutdown(false);
- } else {
- close();
- }
- }
-
public LogonResult getLogonResult() {
return logonResult;
}
@@ -321,20 +298,22 @@
if (!(otherService instanceof SocketServerConnection)) {
return false;
}
- SocketAddress address = selectServerInstance().getRemoteAddress();
- if (address == null) {
- return false;
+ try {
+ return
selectServerInstance().getHostInfo().equals(((SocketServerConnection)otherService).selectServerInstance().getHostInfo());
+ } catch (ConnectionException e) {
+ throw new CommunicationException(e);
}
- return
address.equals(((SocketServerConnection)otherService).selectServerInstance().getRemoteAddress());
}
- public void selectNewServerInstance(Object service) {
+ public void selectNewServerInstance() {
closeServerInstance();
- ServerConnectionInvocationHandler handler =
(ServerConnectionInvocationHandler)Proxy.getInvocationHandler(service);
- handler.invalidateTarget();
}
public void setFailOver(boolean failOver) {
this.failOver = failOver;
}
+
+ public void setFailOverPingInterval(int pingFailOverInterval) {
+ this.pingFailOverInterval = pingFailOverInterval;
+ }
}
\ No newline at end of file
Modified:
branches/7.1.x/client/src/main/java/org/teiid/net/socket/SocketServerConnectionFactory.java
===================================================================
---
branches/7.1.x/client/src/main/java/org/teiid/net/socket/SocketServerConnectionFactory.java 2010-08-19
15:30:38 UTC (rev 2481)
+++
branches/7.1.x/client/src/main/java/org/teiid/net/socket/SocketServerConnectionFactory.java 2010-08-20
22:18:46 UTC (rev 2482)
@@ -30,11 +30,16 @@
import java.lang.reflect.Proxy;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -42,12 +47,15 @@
import java.util.logging.Logger;
import org.teiid.client.security.ILogon;
+import org.teiid.client.security.InvalidSessionException;
+import org.teiid.client.security.SessionToken;
import org.teiid.core.TeiidException;
import org.teiid.core.util.PropertiesUtils;
import org.teiid.core.util.ReflectionHelper;
import org.teiid.net.CommunicationException;
import org.teiid.net.ConnectionException;
import org.teiid.net.HostInfo;
+import org.teiid.net.ServerConnection;
import org.teiid.net.ServerConnectionFactory;
import org.teiid.net.TeiidURL;
@@ -106,13 +114,11 @@
private static class CachedInstance {
HostInfo info;
Integer instance;
- boolean ssl;
SocketServerInstance actual;
SocketServerInstance proxy;
- public CachedInstance(HostInfo info, boolean ssl) {
+ public CachedInstance(HostInfo info) {
this.info = info;
- this.ssl = ssl;
}
@Override
@@ -129,7 +135,7 @@
return false;
}
CachedInstance other = (CachedInstance) obj;
- if (!info.equals(other.info) || ssl != other.ssl) {
+ if (!info.equals(other.info)) {
return false;
}
if (instance == null || other.instance == null) {
@@ -142,6 +148,8 @@
private ObjectChannelFactory channelFactory;
private Timer pingTimer;
+ private HashMap<HostInfo, Set<SessionToken>> sessions = new
HashMap<HostInfo, Set<SessionToken>>();
+
//instance pooling
private AtomicInteger instanceCount = new AtomicInteger();
private Map<CachedInstance, CachedInstance> instancePool = new
LinkedHashMap<CachedInstance, CachedInstance>();
@@ -180,15 +188,58 @@
public void initialize(Properties info) {
PropertiesUtils.setBeanProperties(this, info, "org.teiid.sockets");
//$NON-NLS-1$
this.pingTimer = new Timer("SocketPing", true); //$NON-NLS-1$
+ this.pingTimer.schedule(new TimerTask() {
+
+ @Override
+ public void run() {
+ Set<Map.Entry<HostInfo, Set<SessionToken>>> sessionEntries = null;
+ synchronized (sessions) {
+ sessionEntries = new HashSet<Map.Entry<HostInfo,
Set<SessionToken>>>(sessions.entrySet());
+ }
+ for (Map.Entry<HostInfo, Set<SessionToken>> entry : sessionEntries) {
+ SocketServerInstance instance = null;
+ HashSet<SessionToken> entries = null;
+ synchronized (sessions) {
+ entries = new HashSet<SessionToken>(entry.getValue());
+ }
+ try {
+ instance = getServerInstance(entry.getKey());
+ ILogon logon = instance.getService(ILogon.class);
+ if ("7.1.1".compareTo(instance.getServerVersion()) > 0) {
//$NON-NLS-1$
+ for (SessionToken session : entries) {
+ try {
+ logon.assertIdentity(session);
+ logon.ping();
+ } catch (InvalidSessionException e) {
+ }
+ }
+ } else {
+ ArrayList<String> sessionStrings = new
ArrayList<String>(entry.getValue().size());
+ for (SessionToken session : entries) {
+ sessionStrings.add(session.getSessionID());
+ }
+ logon.ping(sessionStrings);
+ }
+ } catch (Exception e) {
+ log.log(Level.WARNING, "Error performing keep-alive ping", e);
//$NON-NLS-1$
+ } finally {
+ if (instance != null) {
+ instance.shutdown();
+ }
+ }
+ }
+ }
+ }, ServerConnection.PING_INTERVAL, ServerConnection.PING_INTERVAL);
this.channelFactory = new OioOjbectChannelFactory(info);
}
-
- public SocketServerInstance getServerInstance(HostInfo info, boolean ssl) throws
CommunicationException, IOException {
+
+ @Override
+ public SocketServerInstance getServerInstance(HostInfo info) throws
CommunicationException, IOException {
CachedInstance key = null;
- CachedInstance instance = null;
boolean useCache = this.maxCachedInstances > 0;
if (useCache) {
- key = new CachedInstance(info, ssl);
+ CachedInstance instance = null;
+ key = new CachedInstance(info);
synchronized (instancePool) {
instance = instancePool.remove(key);
}
@@ -197,7 +248,7 @@
boolean valid = false;
try {
Future<?> success = logon.ping();
- success.get(this.channelFactory.getSoTimeout(), TimeUnit.MICROSECONDS);
+ success.get(this.channelFactory.getSoTimeout(), TimeUnit.MILLISECONDS);
valid = true;
} catch (Exception e) {
log.log(Level.FINE, "Error performing ping, will select another instance",
e); //$NON-NLS-1$
@@ -219,7 +270,7 @@
}
}
}
- SocketServerInstanceImpl ssii = new SocketServerInstanceImpl(info, ssl,
getSynchronousTtl());
+ SocketServerInstanceImpl ssii = new SocketServerInstanceImpl(info,
getSynchronousTtl());
ssii.connect(this.channelFactory);
if (useCache) {
key.actual = ssii;
@@ -256,7 +307,7 @@
discovery.init(url, connectionProperties);
- return new SocketServerConnection(this, url.isUsingSSL(), discovery,
connectionProperties, pingTimer);
+ return new SocketServerConnection(this, url.isUsingSSL(), discovery,
connectionProperties);
}
static void updateConnectionProperties(Properties connectionProperties) {
@@ -284,5 +335,30 @@
public void setMaxCachedInstances(int maxCachedInstances) {
this.maxCachedInstances = maxCachedInstances;
}
+
+ @Override
+ public void connected(SocketServerInstance instance, SessionToken session) {
+ synchronized (sessions) {
+ Set<SessionToken> instanceSessions = sessions.get(instance.getHostInfo());
+ if (instanceSessions == null) {
+ instanceSessions = new HashSet<SessionToken>();
+ sessions.put(instance.getHostInfo(), instanceSessions);
+ }
+ instanceSessions.add(session);
+ }
+ }
+
+ @Override
+ public void disconnected(SocketServerInstance instance, SessionToken session) {
+ synchronized (sessions) {
+ Set<SessionToken> instanceSessions = sessions.get(instance.getHostInfo());
+ if (instanceSessions != null) {
+ instanceSessions.remove(session);
+ if (instanceSessions.isEmpty()) {
+ sessions.remove(instance.getHostInfo());
+ }
+ }
+ }
+ }
}
Modified:
branches/7.1.x/client/src/main/java/org/teiid/net/socket/SocketServerInstance.java
===================================================================
---
branches/7.1.x/client/src/main/java/org/teiid/net/socket/SocketServerInstance.java 2010-08-19
15:30:38 UTC (rev 2481)
+++
branches/7.1.x/client/src/main/java/org/teiid/net/socket/SocketServerInstance.java 2010-08-20
22:18:46 UTC (rev 2482)
@@ -22,9 +22,14 @@
package org.teiid.net.socket;
-import java.net.SocketAddress;
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.teiid.client.util.ResultsFuture;
+import org.teiid.client.util.ResultsReceiver;
import org.teiid.core.crypto.Cryptor;
+import org.teiid.net.CommunicationException;
import org.teiid.net.HostInfo;
@@ -33,8 +38,6 @@
<T> T getService(Class<T> iface);
void shutdown();
-
- SocketAddress getRemoteAddress();
HostInfo getHostInfo();
@@ -42,4 +45,12 @@
Cryptor getCryptor();
+ long getSynchTimeout();
+
+ void send(Message message, ResultsReceiver<Object> receiver, Serializable key)
throws CommunicationException, InterruptedException;
+
+ void read(long timeout, TimeUnit unit, ResultsFuture<?> resultsFuture) throws
TimeoutException, InterruptedException;
+
+ String getServerVersion();
+
}
\ No newline at end of file
Modified:
branches/7.1.x/client/src/main/java/org/teiid/net/socket/SocketServerInstanceFactory.java
===================================================================
---
branches/7.1.x/client/src/main/java/org/teiid/net/socket/SocketServerInstanceFactory.java 2010-08-19
15:30:38 UTC (rev 2481)
+++
branches/7.1.x/client/src/main/java/org/teiid/net/socket/SocketServerInstanceFactory.java 2010-08-20
22:18:46 UTC (rev 2482)
@@ -24,12 +24,17 @@
import java.io.IOException;
+import org.teiid.client.security.SessionToken;
import org.teiid.net.CommunicationException;
import org.teiid.net.HostInfo;
public interface SocketServerInstanceFactory {
- SocketServerInstance getServerInstance(HostInfo info, boolean ssl) throws
CommunicationException, IOException;
+ SocketServerInstance getServerInstance(HostInfo info) throws CommunicationException,
IOException;
+
+ void connected(SocketServerInstance instance, SessionToken session);
+
+ void disconnected(SocketServerInstance instance, SessionToken session);
}
\ No newline at end of file
Modified:
branches/7.1.x/client/src/main/java/org/teiid/net/socket/SocketServerInstanceImpl.java
===================================================================
---
branches/7.1.x/client/src/main/java/org/teiid/net/socket/SocketServerInstanceImpl.java 2010-08-19
15:30:38 UTC (rev 2481)
+++
branches/7.1.x/client/src/main/java/org/teiid/net/socket/SocketServerInstanceImpl.java 2010-08-20
22:18:46 UTC (rev 2482)
@@ -30,7 +30,6 @@
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
-import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.util.Iterator;
import java.util.Map;
@@ -53,7 +52,6 @@
import org.teiid.core.crypto.Cryptor;
import org.teiid.core.crypto.DhKeyGenerator;
import org.teiid.core.crypto.NullCryptor;
-import org.teiid.core.util.ApplicationInfo;
import org.teiid.net.CommunicationException;
import org.teiid.net.HostInfo;
import org.teiid.net.NetPlugin;
@@ -69,31 +67,28 @@
static final int HANDSHAKE_RETRIES = 10;
private static Logger log = Logger.getLogger("org.teiid.client.sockets");
//$NON-NLS-1$
- private AtomicInteger MESSAGE_ID = new AtomicInteger();
+ private static AtomicInteger MESSAGE_ID = new AtomicInteger();
private Map<Serializable, ResultsReceiver<Object>> asynchronousListeners
= new ConcurrentHashMap<Serializable, ResultsReceiver<Object>>();
- private HostInfo hostInfo;
- private boolean ssl;
private long synchTimeout;
+ private HostInfo info;
private ObjectChannel socketChannel;
private Cryptor cryptor;
+ private String serverVersion;
private boolean hasReader;
- public SocketServerInstanceImpl() {
-
- }
-
- public SocketServerInstanceImpl(final HostInfo host, boolean ssl, long synchTimeout)
{
- this.hostInfo = host;
- this.ssl = ssl;
+ public SocketServerInstanceImpl(HostInfo info, long synchTimeout) {
+ if (!info.isResolved()) {
+ throw new AssertionError("Expected HostInfo to be resolved");
//$NON-NLS-1$
+ }
+ this.info = info;
this.synchTimeout = synchTimeout;
}
public synchronized void connect(ObjectChannelFactory channelFactory) throws
CommunicationException, IOException {
- InetSocketAddress address = new InetSocketAddress(hostInfo.getInetAddress(),
hostInfo.getPortNumber());
- this.socketChannel = channelFactory.createObjectChannel(address, ssl);
+ this.socketChannel = channelFactory.createObjectChannel(new
InetSocketAddress(info.getInetAddress(), info.getPortNumber()), info.isSsl());
try {
doHandshake();
} catch (CommunicationException e) {
@@ -105,20 +100,11 @@
}
}
- @Override
+ @Override
public HostInfo getHostInfo() {
- return this.hostInfo;
+ return info;
}
- @Override
- public SocketAddress getRemoteAddress() {
- return this.socketChannel.getRemoteAddress();
- }
-
- static String getVersionInfo() {
- return ApplicationInfo.getInstance().getMajorReleaseNumber();
- }
-
private void doHandshake() throws IOException, CommunicationException {
Handshake handshake = null;
for (int i = 0; i < HANDSHAKE_RETRIES; i++) {
@@ -143,9 +129,9 @@
/*if (!getVersionInfo().equals(handshake.getVersion())) {
throw new
CommunicationException(NetPlugin.Util.getString("SocketServerInstanceImpl.version_mismatch",
getVersionInfo(), handshake.getVersion())); //$NON-NLS-1$
}*/
+ serverVersion = handshake.getVersion();
+ handshake.setVersion();
- handshake.setVersion(getVersionInfo());
-
byte[] serverPublicKey = handshake.getPublicKey();
if (serverPublicKey != null) {
@@ -162,12 +148,17 @@
throw new CommunicationException(err);
}
}
+
+ @Override
+ public String getServerVersion() {
+ return serverVersion;
+ }
public boolean isOpen() {
return socketChannel.isOpen();
}
- protected void send(Message message, ResultsReceiver<Object> listener,
Serializable messageKey)
+ public void send(Message message, ResultsReceiver<Object> listener,
Serializable messageKey)
throws CommunicationException, InterruptedException {
if (listener != null) {
asynchronousListeners.put(messageKey, listener);
@@ -227,6 +218,7 @@
listener.receiveResults(messagePacket.getContents());
}
} else {
+ //TODO: could ping back
log.log(Level.FINE, "packet ignored:" + packet); //$NON-NLS-1$
}
}
@@ -242,7 +234,7 @@
return this.cryptor;
}
- void read(long timeout, TimeUnit unit, ResultsFuture<?> future) throws
TimeoutException, InterruptedException {
+ public void read(long timeout, TimeUnit unit, ResultsFuture<?> future) throws
TimeoutException, InterruptedException {
long timeoutMillis = (int)Math.min(unit.toMillis(timeout), Integer.MAX_VALUE);
long start = System.currentTimeMillis();
while (!future.isDone()) {
@@ -280,15 +272,24 @@
}
}
}
-
+
@SuppressWarnings("unchecked")
@Override
public <T> T getService(Class<T> iface) {
- return (T)Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[] {iface},
new RemoteInvocationHandler(iface));
+ return (T)Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[] {iface},
new RemoteInvocationHandler(iface) {
+ @Override
+ protected SocketServerInstanceImpl getInstance() {
+ return SocketServerInstanceImpl.this;
+ }
+ });
}
- public class RemoteInvocationHandler implements InvocationHandler {
+ public long getSynchTimeout() {
+ return synchTimeout;
+ }
+ public static abstract class RemoteInvocationHandler implements InvocationHandler {
+
private boolean secure;
private Class<?> targetClass;
@@ -302,17 +303,18 @@
throws Throwable {
Throwable t = null;
try {
+ final SocketServerInstance instance = getInstance();
Message message = new Message();
message.setContents(new ServiceInvocationStruct(args, method.getName(),
targetClass));
if (secure) {
- message.setContents(getCryptor().sealObject(message.getContents()));
+ message.setContents(instance.getCryptor().sealObject(message.getContents()));
}
ResultsFuture<Object> results = new ResultsFuture<Object>() {
@Override
protected Object convertResult() throws ExecutionException {
try {
- Object result = getCryptor().unsealObject((Serializable) super.convertResult());
+ Object result = instance.getCryptor().unsealObject(super.convertResult());
if (result instanceof ExceptionHolder) {
throw new ExecutionException(((ExceptionHolder)result).getException());
}
@@ -342,17 +344,17 @@
public Object get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException,
TimeoutException {
- read(timeout, unit, this);
+ instance.read(timeout, unit, this);
return super.get(timeout, unit);
}
};
final ResultsReceiver<Object> receiver = results.getResultsReceiver();
- send(message, receiver, Integer.valueOf(MESSAGE_ID.getAndIncrement()));
+ instance.send(message, receiver, Integer.valueOf(MESSAGE_ID.getAndIncrement()));
if (ResultsFuture.class.isAssignableFrom(method.getReturnType())) {
return results;
}
- return results.get(synchTimeout, TimeUnit.MILLISECONDS);
+ return results.get(instance.getSynchTimeout(), TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
t = e.getCause();
} catch (TimeoutException e) {
@@ -362,6 +364,8 @@
}
throw ExceptionUtil.convertException(method, t);
}
+
+ protected abstract SocketServerInstance getInstance() throws CommunicationException;
}
Modified:
branches/7.1.x/client/src/test/java/org/teiid/net/socket/TestAdminApiServerDiscovery.java
===================================================================
---
branches/7.1.x/client/src/test/java/org/teiid/net/socket/TestAdminApiServerDiscovery.java 2010-08-19
15:30:38 UTC (rev 2481)
+++
branches/7.1.x/client/src/test/java/org/teiid/net/socket/TestAdminApiServerDiscovery.java 2010-08-20
22:18:46 UTC (rev 2482)
@@ -35,7 +35,6 @@
import org.teiid.client.security.LogonResult;
import org.teiid.net.HostInfo;
import org.teiid.net.TeiidURL;
-import org.teiid.net.socket.SocketServerInstance;
public class TestAdminApiServerDiscovery extends TestCase {
@@ -65,7 +64,7 @@
processes.add(p2);
Mockito.stub(serverAdmin.getProcesses("*")).toReturn(processes);
//$NON-NLS-1$
Mockito.stub(instance.getService(Admin.class)).toReturn(serverAdmin);
- Mockito.stub(instance.getHostInfo()).toReturn(knownHost);
+ Mockito.stub(instance.getHostInfo().getHostName()).toReturn("foo");
//$NON-NLS-1$
discovery.connectionSuccessful(knownHost);
List<HostInfo> knownHosts = discovery.getKnownHosts(new LogonResult(),
instance);
Modified:
branches/7.1.x/client/src/test/java/org/teiid/net/socket/TestSocketServerConnection.java
===================================================================
---
branches/7.1.x/client/src/test/java/org/teiid/net/socket/TestSocketServerConnection.java 2010-08-19
15:30:38 UTC (rev 2481)
+++
branches/7.1.x/client/src/test/java/org/teiid/net/socket/TestSocketServerConnection.java 2010-08-20
22:18:46 UTC (rev 2482)
@@ -25,29 +25,30 @@
*/
package org.teiid.net.socket;
+import static org.junit.Assert.*;
+
import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.io.Serializable;
+import java.util.Collection;
import java.util.Properties;
-import junit.framework.TestCase;
-
+import org.junit.Test;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.teiid.client.security.ILogon;
import org.teiid.client.security.InvalidSessionException;
import org.teiid.client.security.LogonException;
import org.teiid.client.security.LogonResult;
import org.teiid.client.security.SessionToken;
import org.teiid.client.util.ResultsFuture;
+import org.teiid.client.util.ResultsReceiver;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.crypto.NullCryptor;
import org.teiid.net.CommunicationException;
import org.teiid.net.ConnectionException;
import org.teiid.net.HostInfo;
import org.teiid.net.TeiidURL;
-import org.teiid.net.socket.SocketServerConnection;
-import org.teiid.net.socket.SocketServerConnectionFactory;
-import org.teiid.net.socket.SocketServerInstance;
-import org.teiid.net.socket.SocketServerInstanceFactory;
/**
@@ -55,12 +56,16 @@
* @see SocketServerConnection
* @since Westport
*/
-public class TestSocketServerConnection extends TestCase {
+public class TestSocketServerConnection {
private static final class FakeILogon implements ILogon {
Throwable t;
+ public FakeILogon(Throwable t) {
+ this.t = t;
+ }
+
@Override
public void assertIdentity(SessionToken sessionId)
throws InvalidSessionException, TeiidComponentException {
@@ -83,15 +88,25 @@
@Override
public ResultsFuture<?> ping()
- throws InvalidSessionException,
- TeiidComponentException {
+ throws TeiidComponentException, CommunicationException {
if (t != null) {
+ if (t instanceof CommunicationException) {
+ CommunicationException ce = (CommunicationException)t;
+ t = null;
+ throw ce;
+ }
TeiidComponentException e = new TeiidComponentException(t);
t = null;
throw e;
}
- return null;
+ return ResultsFuture.NULL_FUTURE;
}
+
+ @Override
+ public ResultsFuture<?> ping(Collection<String> sessions)
+ throws TeiidComponentException, CommunicationException {
+ return ping();
+ }
}
/**
@@ -109,7 +124,7 @@
*
* @since Westport
*/
- public void testSocketServerConnection_PropertiesClientHost() throws Throwable {
+ @Test public void testSocketServerConnection_PropertiesClientHost() throws Throwable {
Properties p = new Properties();
SocketServerConnectionFactory.updateConnectionProperties(p);
@@ -118,25 +133,20 @@
assertTrue(p.containsKey(TeiidURL.CONNECTION.CLIENT_IP_ADDRESS));
}
- public void testLogonFailsWithMultipleHosts() throws Exception {
+ @Test public void testLogonFailsWithMultipleHosts() throws Exception {
Properties p = new Properties();
- SocketServerInstanceFactory instanceFactory = new SocketServerInstanceFactory() {
- @Override
- public SocketServerInstance getServerInstance(HostInfo info,
- boolean ssl) throws CommunicationException, IOException {
- throw new SingleInstanceCommunicationException();
- }
- };
+ SocketServerInstanceFactory instanceFactory =
Mockito.mock(SocketServerInstanceFactory.class);
+ Mockito.stub(instanceFactory.getServerInstance((HostInfo)Mockito.anyObject())).toThrow(new
SingleInstanceCommunicationException());
ServerDiscovery discovery = new UrlServerDiscovery(new
TeiidURL("mm://host1:1,host2:2")); //$NON-NLS-1$
try {
- new SocketServerConnection(instanceFactory, false, discovery, p, null);
+ new SocketServerConnection(instanceFactory, false, discovery, p);
fail("exception expected"); //$NON-NLS-1$
} catch (CommunicationException e) {
assertEquals("No valid host available. Attempted connections to: [host1:1,
host2:2]", e.getMessage()); //$NON-NLS-1$
}
}
- public void testLogon() throws Exception {
+ @Test public void testLogon() throws Exception {
SocketServerConnection connection = createConnection(null);
assertEquals(String.valueOf(1), connection.getLogonResult().getSessionID());
}
@@ -144,63 +154,83 @@
/**
* Since the original instance is still open, this will be a transparent retry
*/
- public void testRetry() throws Exception {
+ @Test public void testRetry() throws Exception {
SocketServerConnection connection = createConnection(new
SingleInstanceCommunicationException());
connection.setFailOver(true);
+ connection.setFailOverPingInterval(50);
ILogon logon = connection.getService(ILogon.class);
+ Thread.sleep(70);
logon.ping();
}
- public void testImmediateFail() throws Exception {
+ @Test(expected=CommunicationException.class) public void testImmediateFail() throws
Exception {
SocketServerConnection connection = createConnection(new CommunicationException());
ILogon logon = connection.getService(ILogon.class);
- try {
- logon.ping();
- fail("expected exception"); //$NON-NLS-1$
- } catch (TeiidComponentException e) {
-
- }
+ logon.ping();
}
- public void testImmediateFail1() throws Exception {
+ @Test(expected=CommunicationException.class) public void testImmediateFail1() throws
Exception {
SocketServerConnection connection = createConnection(new CommunicationException());
connection.setFailOver(true);
ILogon logon = connection.getService(ILogon.class);
- try {
- logon.ping();
- fail("expected exception"); //$NON-NLS-1$
- } catch (TeiidComponentException e) {
-
- }
+ logon.ping();
}
private SocketServerConnection createConnection(final Throwable throwException) throws
CommunicationException, ConnectionException {
return createConnection(throwException, new HostInfo("0.0.0.2", 1));
//$NON-NLS-1$
}
- private SocketServerConnection createConnection(final Throwable t, HostInfo hostInfo)
+ private SocketServerConnection createConnection(final Throwable t, final HostInfo
hostInfo)
throws CommunicationException, ConnectionException {
Properties p = new Properties();
ServerDiscovery discovery = new UrlServerDiscovery(new TeiidURL(hostInfo.getHostName(),
hostInfo.getPortNumber(), false));
SocketServerInstanceFactory instanceFactory = new SocketServerInstanceFactory() {
+ FakeILogon logon = new FakeILogon(t);
+
@Override
- public SocketServerInstance getServerInstance(final HostInfo info,
- boolean ssl) throws CommunicationException, IOException {
+ public SocketServerInstance getServerInstance(HostInfo info)
+ throws CommunicationException, IOException {
SocketServerInstance instance = Mockito.mock(SocketServerInstance.class);
Mockito.stub(instance.getCryptor()).toReturn(new NullCryptor());
- Mockito.stub(instance.getRemoteAddress()).toReturn(new
InetSocketAddress(info.getInetAddress(), info.getPortNumber()));
- FakeILogon logon = new FakeILogon();
- logon.t = t;
+ Mockito.stub(instance.getHostInfo()).toReturn(hostInfo);
Mockito.stub(instance.getService(ILogon.class)).toReturn(logon);
+ if (t != null) {
+ try {
+ Mockito.doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation)
+ throws Throwable {
+ if (logon.t == null) {
+ return null;
+ }
+ throw logon.t;
+ }
+ }).when(instance).send((Message)Mockito.anyObject(),
(ResultsReceiver<Object>)Mockito.anyObject(), (Serializable)Mockito.anyObject());
+ } catch (Exception e) {
+
+ }
+ }
Mockito.stub(instance.isOpen()).toReturn(true);
return instance;
}
+
+ @Override
+ public void connected(SocketServerInstance instance,
+ SessionToken session) {
+
+ }
+
+ @Override
+ public void disconnected(SocketServerInstance instance,
+ SessionToken session) {
+
+ }
};
- SocketServerConnection connection = new SocketServerConnection(instanceFactory, false,
discovery, p, null);
+ SocketServerConnection connection = new SocketServerConnection(instanceFactory, false,
discovery, p);
return connection;
}
- public void testIsSameInstance() throws Exception {
+ @Test public void testIsSameInstance() throws Exception {
SocketServerConnection conn = createConnection(null, new HostInfo("0.0.0.0",
1)); //$NON-NLS-1$
SocketServerConnection conn1 = createConnection(null, new HostInfo("0.0.0.1",
1)); //$NON-NLS-1$
Modified:
branches/7.1.x/client/src/test/java/org/teiid/net/socket/TestSocketServerInstanceImpl.java
===================================================================
---
branches/7.1.x/client/src/test/java/org/teiid/net/socket/TestSocketServerInstanceImpl.java 2010-08-19
15:30:38 UTC (rev 2481)
+++
branches/7.1.x/client/src/test/java/org/teiid/net/socket/TestSocketServerInstanceImpl.java 2010-08-20
22:18:46 UTC (rev 2482)
@@ -34,17 +34,14 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
-import org.junit.Ignore;
import org.junit.Test;
import org.teiid.client.security.ILogon;
import org.teiid.client.util.ResultsFuture;
import org.teiid.core.TeiidComponentException;
import org.teiid.net.CommunicationException;
import org.teiid.net.HostInfo;
-import org.teiid.net.socket.ObjectChannelFactory;
-import org.teiid.net.socket.SocketServerInstanceImpl;
-
+@SuppressWarnings("nls")
public class TestSocketServerInstanceImpl {
private static class FakeObjectChannel implements ObjectChannel, ObjectChannelFactory {
@@ -127,7 +124,8 @@
private SocketServerInstanceImpl createInstance(ObjectChannelFactory channelFactory)
throws CommunicationException, IOException {
- SocketServerInstanceImpl ssii = new SocketServerInstanceImpl(new
HostInfo("0.0.0.0", 1), false, 1); //$NON-NLS-1$
+ HostInfo info = new HostInfo("0.0.0.0", 1);
+ SocketServerInstanceImpl ssii = new SocketServerInstanceImpl(info, 1);
ssii.connect(channelFactory);
return ssii;
}
@@ -147,17 +145,4 @@
}
}
- @Ignore
- @Test public void testVersionMismatch() throws Exception {
- Handshake h = new Handshake();
- h.setVersion("foo"); //$NON-NLS-1$
- final FakeObjectChannel channel = new FakeObjectChannel(Arrays.asList(h));
- try {
- createInstance(channel);
- fail("exception expected"); //$NON-NLS-1$
- } catch (CommunicationException e) {
- assertTrue(e.getMessage().startsWith("Handshake failed due to version
mismatch")); //$NON-NLS-1$
- }
- }
-
}
Modified:
branches/7.1.x/runtime/src/main/java/org/teiid/transport/LocalServerConnection.java
===================================================================
---
branches/7.1.x/runtime/src/main/java/org/teiid/transport/LocalServerConnection.java 2010-08-19
15:30:38 UTC (rev 2481)
+++
branches/7.1.x/runtime/src/main/java/org/teiid/transport/LocalServerConnection.java 2010-08-20
22:18:46 UTC (rev 2482)
@@ -123,11 +123,11 @@
}
@Override
- public ResultsFuture<?> isOpen() {
+ public boolean isOpen(long msToTest) {
if (shutdown) {
- return null;
+ return false;
}
- return ResultsFuture.NULL_FUTURE;
+ return true;
}
public void close() {
Modified: branches/7.1.x/runtime/src/main/java/org/teiid/transport/LogonImpl.java
===================================================================
--- branches/7.1.x/runtime/src/main/java/org/teiid/transport/LogonImpl.java 2010-08-19
15:30:38 UTC (rev 2481)
+++ branches/7.1.x/runtime/src/main/java/org/teiid/transport/LogonImpl.java 2010-08-20
22:18:46 UTC (rev 2482)
@@ -22,6 +22,7 @@
package org.teiid.transport;
+import java.util.Collection;
import java.util.Properties;
import javax.security.auth.login.LoginException;
@@ -41,6 +42,7 @@
import org.teiid.dqp.service.SessionServiceException;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
+import org.teiid.net.CommunicationException;
import org.teiid.net.TeiidURL;
import org.teiid.security.Credentials;
@@ -107,6 +109,18 @@
LogManager.logTrace(LogConstants.CTX_SECURITY, "Ping", id); //$NON-NLS-1$
return ResultsFuture.NULL_FUTURE;
}
+
+ @Override
+ public ResultsFuture<?> ping(Collection<String> sessions)
+ throws TeiidComponentException, CommunicationException {
+ for (String string : sessions) {
+ try {
+ this.service.pingServer(string);
+ } catch (InvalidSessionException e) {
+ }
+ }
+ return ResultsFuture.NULL_FUTURE;
+ }
@Override
public void assertIdentity(SessionToken checkSession) throws InvalidSessionException,
TeiidComponentException {
Modified: branches/7.1.x/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java
===================================================================
---
branches/7.1.x/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java 2010-08-19
15:30:38 UTC (rev 2481)
+++
branches/7.1.x/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java 2010-08-20
22:18:46 UTC (rev 2482)
@@ -39,7 +39,6 @@
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.teiid.core.util.ReflectionHelper;
@@ -54,7 +53,6 @@
* Some parts of this code is taken from H2's implementation of ODBC
*/
@SuppressWarnings("nls")
-@ChannelPipelineCoverage("one")
public class PgBackendProtocol implements ChannelDownstreamHandler, ODBCClientRemote {
private static final int PG_TYPE_VARCHAR = 1043;
Modified:
branches/7.1.x/runtime/src/main/java/org/teiid/transport/PgFrontendProtocol.java
===================================================================
---
branches/7.1.x/runtime/src/main/java/org/teiid/transport/PgFrontendProtocol.java 2010-08-19
15:30:38 UTC (rev 2481)
+++
branches/7.1.x/runtime/src/main/java/org/teiid/transport/PgFrontendProtocol.java 2010-08-20
22:18:46 UTC (rev 2482)
@@ -34,7 +34,6 @@
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
@@ -46,7 +45,6 @@
* Some parts of this code is taken from H2's implementation of ODBC
*/
@SuppressWarnings("nls")
-@ChannelPipelineCoverage("one")
public class PgFrontendProtocol extends FrameDecoder {
private int maxObjectSize;
Modified:
branches/7.1.x/runtime/src/main/java/org/teiid/transport/SSLAwareChannelHandler.java
===================================================================
---
branches/7.1.x/runtime/src/main/java/org/teiid/transport/SSLAwareChannelHandler.java 2010-08-19
15:30:38 UTC (rev 2481)
+++
branches/7.1.x/runtime/src/main/java/org/teiid/transport/SSLAwareChannelHandler.java 2010-08-20
22:18:46 UTC (rev 2482)
@@ -42,13 +42,13 @@
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.DefaultChannelPipeline;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.handler.stream.ChunkedWriteHandler;
import org.teiid.common.buffer.StorageManager;
@@ -62,7 +62,7 @@
* Main class for creating Netty Nio Channels
*/
-(a)ChannelPipelineCoverage(ChannelPipelineCoverage.ALL)
+@Sharable
public class SSLAwareChannelHandler extends SimpleChannelHandler implements
ChannelPipelineFactory {
public class ObjectChannelImpl implements ObjectChannel {
Modified:
branches/7.1.x/runtime/src/main/java/org/teiid/transport/SocketClientInstance.java
===================================================================
---
branches/7.1.x/runtime/src/main/java/org/teiid/transport/SocketClientInstance.java 2010-08-19
15:30:38 UTC (rev 2481)
+++
branches/7.1.x/runtime/src/main/java/org/teiid/transport/SocketClientInstance.java 2010-08-20
22:18:46 UTC (rev 2482)
@@ -94,7 +94,6 @@
public void onConnection() throws CommunicationException {
Handshake handshake = new Handshake();
- handshake.setVersion(SocketListener.getVersionInfo());
if (usingEncryption) {
keyGen = new DhKeyGenerator();
Modified: branches/7.1.x/runtime/src/main/java/org/teiid/transport/SocketListener.java
===================================================================
---
branches/7.1.x/runtime/src/main/java/org/teiid/transport/SocketListener.java 2010-08-19
15:30:38 UTC (rev 2481)
+++
branches/7.1.x/runtime/src/main/java/org/teiid/transport/SocketListener.java 2010-08-20
22:18:46 UTC (rev 2482)
@@ -31,7 +31,6 @@
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.teiid.common.buffer.StorageManager;
-import org.teiid.core.util.ApplicationInfo;
import org.teiid.core.util.NamedThreadFactory;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
@@ -99,10 +98,6 @@
return ((InetSocketAddress)this.serverChanel.getLocalAddress()).getPort();
}
- static String getVersionInfo() {
- return ApplicationInfo.getInstance().getMajorReleaseNumber();
- }
-
public void stop() {
this.serverChanel.close();
this.nettyPool.shutdownNow();
Modified: branches/7.1.x/runtime/src/test/java/org/teiid/transport/TestCommSockets.java
===================================================================
---
branches/7.1.x/runtime/src/test/java/org/teiid/transport/TestCommSockets.java 2010-08-19
15:30:38 UTC (rev 2481)
+++
branches/7.1.x/runtime/src/test/java/org/teiid/transport/TestCommSockets.java 2010-08-20
22:18:46 UTC (rev 2482)
@@ -37,6 +37,7 @@
import org.teiid.client.security.ILogon;
import org.teiid.client.security.LogonException;
import org.teiid.client.security.LogonResult;
+import org.teiid.client.security.SessionToken;
import org.teiid.common.buffer.BufferManagerFactory;
import org.teiid.core.ComponentNotFoundException;
import org.teiid.core.crypto.NullCryptor;
@@ -69,19 +70,14 @@
}
}
- @Test public void testFailedConnect() throws Exception {
+ @Test(expected=CommunicationException.class) public void testFailedConnect() throws
Exception {
SSLConfiguration config = new SSLConfiguration();
listener = new SocketListener(addr.getPort(), addr.getAddress().getHostAddress(),1024,
1024, 1, config, null, BufferManagerFactory.getStandaloneBufferManager());
- try {
- Properties p = new Properties();
- String url = new TeiidURL(addr.getHostName(), listener.getPort() - 1,
false).getAppServerURL();
- p.setProperty(TeiidURL.CONNECTION.SERVER_URL, url); //wrong port
- SocketServerConnectionFactory.getInstance().getConnection(p);
- fail("exception expected"); //$NON-NLS-1$
- } catch (CommunicationException e) {
-
- }
+ Properties p = new Properties();
+ String url = new TeiidURL(addr.getHostName(), listener.getPort() - 1,
false).getAppServerURL();
+ p.setProperty(TeiidURL.CONNECTION.SERVER_URL, url); //wrong port
+ SocketServerConnectionFactory.getInstance().getConnection(p);
}
@Test public void testConnectWithoutPooling() throws Exception {
@@ -157,7 +153,7 @@
@Override
public LogonResult logon(Properties connProps)
throws LogonException, ComponentNotFoundException {
- return new LogonResult();
+ return new LogonResult(new SessionToken("dummy"), "x", 1,
"z");
}
}, null);
Modified:
branches/7.1.x/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java
===================================================================
---
branches/7.1.x/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java 2010-08-19
15:30:38 UTC (rev 2481)
+++
branches/7.1.x/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java 2010-08-20
22:18:46 UTC (rev 2482)
@@ -29,6 +29,10 @@
import java.io.Reader;
import java.io.Serializable;
import java.io.StringReader;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -60,7 +64,7 @@
import org.teiid.net.socket.SocketServerInstanceImpl;
import org.teiid.net.socket.UrlServerDiscovery;
-
+@SuppressWarnings("nls")
public class TestSocketRemoting {
public interface FakeService {
@@ -104,15 +108,11 @@
ClientServiceRegistryImpl server;
private ResultsReceiver<Object> listener;
- public FakeClientServerInstance(ClientServiceRegistryImpl server) {
- super();
+ public FakeClientServerInstance(ClientServiceRegistryImpl server) throws
UnknownHostException {
+ super(new HostInfo("foo", new InetSocketAddress(InetAddress.getLocalHost(),
1)), 1000);
this.server = server;
}
-
- public HostInfo getHostInfo() {
- return new HostInfo("fake", 1); //$NON-NLS-1$
- }
-
+
public boolean isOpen() {
return true;
}
@@ -182,6 +182,12 @@
}
@Override
+ public ResultsFuture<?> ping(Collection<String> sessions)
+ throws TeiidComponentException, CommunicationException {
+ return null;
+ }
+
+ @Override
public void assertIdentity(SessionToken sessionId)
throws InvalidSessionException,
TeiidComponentException {
@@ -226,12 +232,24 @@
SocketServerConnection connection = new SocketServerConnection(new
SocketServerInstanceFactory() {
@Override
- public SocketServerInstance getServerInstance(HostInfo info,
- boolean ssl) throws CommunicationException, IOException {
+ public SocketServerInstance getServerInstance(HostInfo info)
+ throws CommunicationException, IOException {
return serverInstance;
}
- }, false, new UrlServerDiscovery(new TeiidURL("foo", 1, false)), new
Properties(), null); //$NON-NLS-1$
+ @Override
+ public void connected(SocketServerInstance instance,
+ SessionToken session) {
+
+ }
+
+ @Override
+ public void disconnected(SocketServerInstance instance,
+ SessionToken session) {
+
+ }
+
+ }, false, new UrlServerDiscovery(new TeiidURL("0.0.0.0", 1, false)), new
Properties()); //$NON-NLS-1$
return connection;
}