[teiid-commits] teiid SVN: r565 - in trunk: client and 22 other directories.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Tue Mar 17 10:25:36 EDT 2009
Author: shawkins
Date: 2009-03-17 10:25:36 -0400 (Tue, 17 Mar 2009)
New Revision: 565
Added:
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/ObjectInputStreamWithClassloader.java
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java
trunk/client/src/main/java/org/
trunk/client/src/main/java/org/jboss/
trunk/client/src/main/java/org/jboss/netty/
trunk/client/src/main/java/org/jboss/netty/handler/
trunk/client/src/main/java/org/jboss/netty/handler/codec/
trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/
trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/CompactObjectInputStream.java
trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/CompactObjectOutputStream.java
trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectDecoderInputStream.java
trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectEncoderOutputStream.java
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/ChannelListener.java
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SSLAwareChannelHandler.java
Removed:
trunk/client/src/main/java/com/metamatrix/common/comm/api/MessageListener.java
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/SSLAwareChannelHandler.java
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/NioObjectChannelFactory.java
Modified:
trunk/client/pom.xml
trunk/client/src/main/java/com/metamatrix/common/api/HostInfo.java
trunk/client/src/main/java/com/metamatrix/common/api/MMURL.java
trunk/client/src/main/java/com/metamatrix/common/comm/api/Message.java
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/Handshake.java
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/ObjectChannel.java
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/SocketUtil.java
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/AdminApiServerDiscovery.java
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/ObjectChannelFactory.java
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnection.java
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceFactory.java
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java
trunk/client/src/main/java/com/metamatrix/dqp/client/ResultsFuture.java
trunk/client/src/main/resources/teiid-client-settings.properties
trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerConnection.java
trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerInstanceImpl.java
trunk/client/src/test/java/com/metamatrix/common/util/TestMMURL.java
trunk/common-internal/src/main/java/com/metamatrix/common/util/VMNaming.java
trunk/engine/src/main/java/com/metamatrix/dqp/internal/process/DQPCore.java
trunk/engine/src/main/java/com/metamatrix/dqp/internal/process/DQPWorkContext.java
trunk/pom.xml
trunk/server/pom.xml
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/ServerWorkItem.java
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketClientInstance.java
trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketListener.java
trunk/server/src/main/java/com/metamatrix/common/net/ServerSocketConfiguration.java
trunk/server/src/test/java/com/metamatrix/common/comm/platform/socket/server/TestCommSockets.java
trunk/server/src/test/java/com/metamatrix/common/comm/platform/socket/server/TestSocketRemoting.java
Log:
TEIID-292 changed back to synch io on the client.
Modified: trunk/client/pom.xml
===================================================================
--- trunk/client/pom.xml 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/pom.xml 2009-03-17 14:25:36 UTC (rev 565)
@@ -18,10 +18,6 @@
<artifactId>teiid-common-core</artifactId>
</dependency>
<dependency>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- </dependency>
- <dependency>
<groupId>org.jboss.teiid</groupId>
<artifactId>teiid-common-core</artifactId>
<type>test-jar</type>
Modified: trunk/client/src/main/java/com/metamatrix/common/api/HostInfo.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/api/HostInfo.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/api/HostInfo.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -26,93 +26,52 @@
import java.net.UnknownHostException;
import com.metamatrix.common.util.NetUtils;
+import com.metamatrix.core.util.ArgCheck;
import com.metamatrix.core.util.HashCodeUtil;
/**
- * HostInfo is used internally by Metamatirx to store the host information used in creating the Service Object reference
- *
+ * Defines the hostname/port or {@link InetAddress} to connect to a host.
* @since 4.2
*/
public class HostInfo {
// Host Name and Port Number
private String hostName;
+ private int portNumber = 0;
private InetAddress inetAddress;
- private int portNumber = 0;
- public HostInfo(String host, String port) {
- this(host, parsePort(port));
+ public InetAddress getInetAddress() throws UnknownHostException {
+ if (inetAddress != null) {
+ return inetAddress;
+ }
+ return NetUtils.resolveHostByName(hostName);
}
-
- private static int parsePort(String port) {
- try {
- return Integer.parseInt(port);
- } catch (NumberFormatException nfe) {
- throw new IllegalArgumentException("port must be numeric:" + port); //$NON-NLS-1$
- }
- }
public HostInfo (String host, int port) {
- this(host, port, null);
- }
-
- /**
- * @since 4.2
- */
- public HostInfo(String host, int port, InetAddress inetAddress) {
- if (host == null || host.equals("")) { //$NON-NLS-1$
- throw new IllegalArgumentException("hostname can't be null"); //$NON-NLS-1$
- }
- if( host.equalsIgnoreCase("localhost")) { //$NON-NLS-1$
- try {
- InetAddress addr = NetUtils.getInstance().getInetAddress();
- this.hostName = addr.getCanonicalHostName();
- } catch (UnknownHostException e) {
- this.hostName = host.toLowerCase();
+ ArgCheck.isNotNull(host);
+ this.hostName = host.toLowerCase();
+ this.portNumber = port;
+
+ //only cache inetaddresses if they represent the ip.
+ try {
+ InetAddress addr = NetUtils.resolveHostByName(hostName);
+ if (addr.getHostAddress().equalsIgnoreCase(hostName)) {
+ this.inetAddress = addr;
}
- } else {
- this.hostName = host.toLowerCase();
- }
- if (inetAddress == null) {
- try {
- this.inetAddress = InetAddress.getByName(hostName);
- } catch (UnknownHostException e) {
- //ignore
- }
- } else {
- this.inetAddress = inetAddress;
- }
- portNumber = port;
-
- if (portNumber < 0 || portNumber > 0xFFFF) {
- throw new IllegalArgumentException("port out of range:" + portNumber); //$NON-NLS-1$
- }
- if (hostName == null) {
- throw new IllegalArgumentException("hostname can't be null"); //$NON-NLS-1$
- }
+ } catch (UnknownHostException e) {
+ }
}
public String getHostName() {
return hostName;
}
- public void setPortNumber(int thePortNumber) {
- portNumber = thePortNumber;
- }
-
public int getPortNumber() {
return portNumber;
}
-
- public InetAddress getInetAddress() {
- return inetAddress;
- }
-
- public String toString() {
+
+ public String toString() {
StringBuffer sb = new StringBuffer();
sb.append(hostName).append(":").append(portNumber); //$NON-NLS-1$
- if (inetAddress != null) {
- sb.append(inetAddress);
- }
return sb.toString();
}
@@ -128,10 +87,7 @@
return false;
}
HostInfo hostInfo = (HostInfo) obj;
- if (inetAddress == null || hostInfo.getInetAddress() == null) {
- return hostName.equals(hostInfo.getHostName()) && portNumber == hostInfo.getPortNumber();
- }
- return inetAddress.equals(hostInfo.getInetAddress()) && portNumber == hostInfo.getPortNumber();
+ return hostName.equals(hostInfo.getHostName()) && portNumber == hostInfo.getPortNumber();
}
/**
@@ -139,16 +95,8 @@
* @since 4.2
*/
public int hashCode() {
- int hc = 0;
-
- if (inetAddress != null) {
- hc = HashCodeUtil.hashCode(hc, inetAddress.getHostAddress());
- } else {
- hc = HashCodeUtil.hashCode(hc, hostName);
- }
- hc = HashCodeUtil.hashCode(hc, portNumber);
-
- return hc;
+ int hc = HashCodeUtil.hashCode(0, hostName);
+ return HashCodeUtil.hashCode(hc, portNumber);
}
}
Modified: trunk/client/src/main/java/com/metamatrix/common/api/MMURL.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/api/MMURL.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/api/MMURL.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -29,7 +29,7 @@
import java.util.NoSuchElementException;
import java.util.StringTokenizer;
-import com.metamatrix.core.CorePlugin;
+import com.metamatrix.common.comm.platform.CommPlatformPlugin;
/**
* Class to encapsulate URL to a Clustered Metamatrix Server with multiple host
@@ -92,7 +92,7 @@
public static final String FORMAT_SERVER = "mm[s]://server1:port1[,server2:port2]"; //$NON-NLS-1$
- public static final String INVALID_FORMAT_SERVER = CorePlugin.Util.getString("MMURL.INVALID_FORMAT", new Object[] {FORMAT_SERVER}); //$NON-NLS-1$
+ public static final String INVALID_FORMAT_SERVER = CommPlatformPlugin.Util.getString("MMURL.INVALID_FORMAT", new Object[] {FORMAT_SERVER}); //$NON-NLS-1$
@@ -234,7 +234,19 @@
try {
String host = st2.nextToken().trim();
String port = st2.nextToken().trim();
- HostInfo hostInfo = new HostInfo(host, port);
+ if (host.equals("")) { //$NON-NLS-1$
+ throw new IllegalArgumentException("hostname can't be empty"); //$NON-NLS-1$
+ }
+ int portNumber;
+ try {
+ portNumber = Integer.parseInt(port);
+ } catch (NumberFormatException nfe) {
+ throw new IllegalArgumentException("port must be numeric:" + port); //$NON-NLS-1$
+ }
+ if (portNumber < 0 || portNumber > 0xFFFF) {
+ throw new IllegalArgumentException("port out of range:" + portNumber); //$NON-NLS-1$
+ }
+ HostInfo hostInfo = new HostInfo(host, portNumber);
hosts.add(hostInfo);
} catch (NoSuchElementException nsee) {
throw new IllegalArgumentException(exceptionMessage);
Modified: trunk/client/src/main/java/com/metamatrix/common/comm/api/Message.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/api/Message.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/api/Message.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -48,13 +48,7 @@
public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
- try {
- this.contents = (Serializable) in.readObject();
- } catch (IOException t) {
- throw t;
- } catch (Throwable t) {
- this.contents = t;
- }
+ this.contents = (Serializable) in.readObject();
this.messageKey = (Serializable) in.readObject();
}
Deleted: trunk/client/src/main/java/com/metamatrix/common/comm/api/MessageListener.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/api/MessageListener.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/api/MessageListener.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -1,43 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.common.comm.api;
-
-import java.io.Serializable;
-
-/**
- * <p>The MessageListener is used for asynchronous message callbacks and
- * will receive a message with a messageKey that was sent with the
- * message originally. Typically the messageKey is a unique key generated
- * by the client so that it can distinguish between return messages. The
- * MessageListener is typically implemented by the application.</p>
- */
-public interface MessageListener {
-
- /**
- * Deliver a message to the listener.
- * @param message The message being delivered
- * @param messageKey The key identifying the message, may be null depending on the application
- */
- void deliverMessage(Message message, Serializable messageKey);
-
-}
Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/Handshake.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/Handshake.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/Handshake.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -31,7 +31,6 @@
*/
public class Handshake implements Serializable {
- public static final int HANDSHAKE_TIMEOUT = 3000; //3 seconds
private static final long serialVersionUID = 7839271224736355515L;
private String version = MetaMatrixProductVersion.VERSION_NUMBER;
Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/ObjectChannel.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/ObjectChannel.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/ObjectChannel.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -22,27 +22,16 @@
package com.metamatrix.common.comm.platform.socket;
+import java.io.IOException;
+import java.net.SocketAddress;
import java.util.concurrent.Future;
-import com.metamatrix.common.comm.exception.CommunicationException;
-
-
-
public interface ObjectChannel {
- public interface ChannelListenerFactory {
- ChannelListener createChannelListener(ObjectChannel channel);
- }
+ Object read() throws IOException, ClassNotFoundException;
- public interface ChannelListener {
+ SocketAddress getRemoteAddress();
- void receivedMessage(Object msg) throws CommunicationException;
-
- void exceptionOccurred(Throwable t);
-
- void onConnection() throws CommunicationException;
- }
-
Future<?> write(Object msg);
boolean isOpen();
Added: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/ObjectInputStreamWithClassloader.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/ObjectInputStreamWithClassloader.java (rev 0)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/ObjectInputStreamWithClassloader.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -0,0 +1,50 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package com.metamatrix.common.comm.platform.socket;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+
+public final class ObjectInputStreamWithClassloader extends
+ ObjectInputStream {
+ private final ClassLoader cl;
+
+ public ObjectInputStreamWithClassloader(InputStream in,
+ ClassLoader cl) throws IOException {
+ super(in);
+ this.cl = cl;
+ }
+
+ @Override
+ protected Class<?> resolveClass(ObjectStreamClass desc)
+ throws IOException, ClassNotFoundException {
+ //see java bug id 6434149
+ try {
+ return Class.forName(desc.getName(), false, cl);
+ } catch (ClassNotFoundException e) {
+ return super.resolveClass(desc);
+ }
+ }
+}
\ No newline at end of file
Property changes on: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/ObjectInputStreamWithClassloader.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Deleted: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/SSLAwareChannelHandler.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/SSLAwareChannelHandler.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/SSLAwareChannelHandler.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -1,240 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-/**
- *
- */
-package com.metamatrix.common.comm.platform.socket;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.net.ssl.SSLEngine;
-
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.DefaultChannelPipeline;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
-import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
-import org.jboss.netty.handler.ssl.SslHandler;
-
-import com.metamatrix.common.comm.exception.SingleInstanceCommunicationException;
-import com.metamatrix.common.comm.platform.CommPlatformPlugin;
-import com.metamatrix.common.comm.platform.socket.ObjectChannel.ChannelListener;
-import com.metamatrix.common.comm.platform.socket.ObjectChannel.ChannelListenerFactory;
-
-/**
- * Main class for creating Netty Nio Channels
- */
-
- at ChannelPipelineCoverage(ChannelPipelineCoverage.ALL)
-public class SSLAwareChannelHandler extends SimpleChannelHandler implements ChannelPipelineFactory {
-
- public class ObjectChannelImpl implements ObjectChannel {
- private final Channel channel;
-
- public ObjectChannelImpl(Channel channel) {
- this.channel = channel;
- }
-
- public void close() {
- channel.close();
- }
-
- public boolean isOpen() {
- return channel.isOpen();
- }
-
- public Future<?> write(Object msg) {
- final ChannelFuture future = channel.write(msg);
- future.addListener(completionListener);
- return new Future() {
-
- @Override
- public boolean cancel(boolean arg0) {
- return future.cancel();
- }
-
- @Override
- public Object get() throws InterruptedException,
- ExecutionException {
- future.await();
- if (!future.isSuccess()) {
- throw new ExecutionException(future.getCause());
- }
- return null;
- }
-
- @Override
- public Object get(long arg0, TimeUnit arg1)
- throws InterruptedException, ExecutionException,
- TimeoutException {
- if (future.await(arg0, arg1)) {
- if (!future.isSuccess()) {
- throw new ExecutionException(future.getCause());
- }
- return null;
- }
- throw new TimeoutException();
- }
-
- @Override
- public boolean isCancelled() {
- return future.isCancelled();
- }
-
- @Override
- public boolean isDone() {
- return future.isDone();
- }
- };
- }
- }
-
- private final ChannelListenerFactory listenerFactory;
- private final SSLEngine engine;
- private final ClassLoader classLoader;
- private Map<Channel, ChannelListener> listeners = Collections.synchronizedMap(new HashMap<Channel, ChannelListener>());
- private AtomicLong objectsRead = new AtomicLong(0);
- private AtomicLong objectsWritten = new AtomicLong(0);
- private volatile int maxChannels;
-
- private ChannelFutureListener completionListener = new ChannelFutureListener() {
-
- @Override
- public void operationComplete(ChannelFuture arg0)
- throws Exception {
- if (arg0.isSuccess()) {
- objectsWritten.getAndIncrement();
- }
- }
-
- };
-
- public SSLAwareChannelHandler(ChannelListenerFactory listenerFactory,
- SSLEngine engine, ClassLoader classloader) {
- this.listenerFactory = listenerFactory;
- this.engine = engine;
- this.classLoader = classloader;
- }
-
- @Override
- public void channelConnected(ChannelHandlerContext ctx,
- final ChannelStateEvent e) throws Exception {
- ChannelListener listener = this.listenerFactory.createChannelListener(new ObjectChannelImpl(e.getChannel()));
- synchronized (this.listeners) {
- this.listeners.put(e.getChannel(), listener);
- maxChannels = Math.max(maxChannels, this.listeners.size());
- }
- if (engine != null) {
- SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
- sslHandler.handshake(e.getChannel()).addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture arg0)
- throws Exception {
- onConnection(e.getChannel());
- }
- });
- } else {
- onConnection(e.getChannel());
- }
- }
-
- private void onConnection(Channel channel) throws Exception {
- ChannelListener listener = this.listeners.get(channel);
- if (listener != null) {
- listener.onConnection();
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx,
- ExceptionEvent e) throws Exception {
- ChannelListener listener = this.listeners.get(e.getChannel());
- if (listener != null) {
- listener.exceptionOccurred(e.getCause());
- }
- e.getChannel().close();
- }
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx,
- MessageEvent e) throws Exception {
- objectsRead.getAndIncrement();
- ChannelListener listener = this.listeners.get(e.getChannel());
- if (listener != null) {
- listener.receivedMessage(e.getMessage());
- }
- }
-
- @Override
- public void channelDisconnected(ChannelHandlerContext ctx,
- ChannelStateEvent e) throws Exception {
- ChannelListener listener = this.listeners.remove(e.getChannel());
- if (listener != null) {
- listener.exceptionOccurred(new SingleInstanceCommunicationException(CommPlatformPlugin.Util.getString("SSLAwareChannelHandler.channel_closed"))); //$NON-NLS-1$
- }
- }
-
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline pipeline = new DefaultChannelPipeline();
-
- if (engine != null) {
- pipeline.addLast("ssl", new SslHandler(engine)); //$NON-NLS-1$
- }
- pipeline.addLast("decoder", new ObjectDecoder(2 << 24, this.classLoader)); //$NON-NLS-1$
- pipeline.addLast("encoder", new ObjectEncoder()); //$NON-NLS-1$
- pipeline.addLast("handler", this); //$NON-NLS-1$
- return pipeline;
- }
-
- public long getObjectsRead() {
- return this.objectsRead.get();
- }
-
- public long getObjectsWritten() {
- return this.objectsWritten.get();
- }
-
- public int getConnectedChannels() {
- return this.listeners.size();
- }
-
- public int getMaxConnectedChannels() {
- return this.maxChannels;
- }
-
-}
\ No newline at end of file
Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/SocketUtil.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/SocketUtil.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/SocketUtil.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -26,6 +26,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
+import java.net.Socket;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.KeyStoreException;
@@ -37,7 +38,7 @@
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLSocket;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
@@ -68,17 +69,17 @@
public static final String ANON_CIPHER_SUITE = "TLS_DH_anon_WITH_AES_128_CBC_SHA"; //$NON-NLS-1$
public static final String ANON_PROTOCOL = "TLS"; //$NON-NLS-1$
- public static class SSLEngineFactory {
+ public static class SSLSocketFactory {
private boolean isAnon;
- private SSLContext context;
+ private javax.net.ssl.SSLSocketFactory factory;
- public SSLEngineFactory(SSLContext context, boolean isAnon) {
- this.context = context;
+ public SSLSocketFactory(SSLContext context, boolean isAnon) {
+ this.factory = context.getSocketFactory();
this.isAnon = isAnon;
}
- public SSLEngine getSSLEngine() {
- SSLEngine result = context.createSSLEngine();
+ public synchronized Socket getSocket() throws IOException {
+ SSLSocket result = (SSLSocket)factory.createSocket();
result.setUseClientMode(true);
if (isAnon) {
addCipherSuite(result, ANON_CIPHER_SUITE);
@@ -87,7 +88,7 @@
}
}
- public static SSLEngineFactory getSSLEngineFactory(Properties props) throws IOException, NoSuchAlgorithmException{
+ public static SSLSocketFactory getSSLSocketFactory(Properties props) throws IOException, GeneralSecurityException{
// -Dcom.metamatrix.ssl.keyStore
String keystore = props.getProperty(KEYSTORE_FILENAME);
// -Dcom.metamatrix.ssl.keyStorePassword
@@ -123,11 +124,12 @@
} else {
result = SSLContext.getDefault();
}
- return new SSLEngineFactory(result, anon);
+ return new SSLSocketFactory(result, anon);
}
/**
* create socket factory for the client socket.
+ * @throws GeneralSecurityException
*/
static SSLContext getClientSSLContext(String keystore,
String password,
@@ -135,11 +137,11 @@
String truststorePassword,
String algorithm,
String keystoreType,
- String protocol) throws IOException {
+ String protocol) throws IOException, GeneralSecurityException {
return getSSLContext(keystore, password, truststore, truststorePassword, algorithm, keystoreType, protocol);
}
- public static void addCipherSuite(SSLEngine engine, String cipherSuite) {
+ public static void addCipherSuite(SSLSocket engine, String cipherSuite) {
Assertion.assertTrue(Arrays.asList(engine.getSupportedCipherSuites()).contains(cipherSuite));
String[] suites = engine.getEnabledCipherSuites();
@@ -152,7 +154,7 @@
engine.setEnabledCipherSuites(newSuites);
}
- public static SSLContext getAnonSSLContext() throws IOException {
+ public static SSLContext getAnonSSLContext() throws IOException, GeneralSecurityException {
return getSSLContext(null, null, null, null, null, null, ANON_PROTOCOL);
}
@@ -162,43 +164,37 @@
String truststorePassword,
String algorithm,
String keystoreType,
- String protocol) throws IOException {
+ String protocol) throws IOException, GeneralSecurityException {
- try {
- if (algorithm == null) {
- algorithm = KeyManagerFactory.getDefaultAlgorithm();
- }
- // Configure the Keystore Manager
- KeyManager[] keyManagers = null;
- if (keystore != null) {
- KeyStore ks = loadKeyStore(keystore, password, keystoreType);
- if (ks != null) {
- KeyManagerFactory kmf = KeyManagerFactory.getInstance(algorithm);
- kmf.init(ks, password.toCharArray());
- keyManagers = kmf.getKeyManagers();
- }
+ if (algorithm == null) {
+ algorithm = KeyManagerFactory.getDefaultAlgorithm();
+ }
+ // Configure the Keystore Manager
+ KeyManager[] keyManagers = null;
+ if (keystore != null) {
+ KeyStore ks = loadKeyStore(keystore, password, keystoreType);
+ if (ks != null) {
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(algorithm);
+ kmf.init(ks, password.toCharArray());
+ keyManagers = kmf.getKeyManagers();
}
-
- // Configure the Trust Store Manager
- TrustManager[] trustManagers = null;
- if (truststore != null) {
- KeyStore ks = loadKeyStore(truststore, truststorePassword, keystoreType);
- if (ks != null) {
- TrustManagerFactory tmf = TrustManagerFactory.getInstance(algorithm);
- tmf.init(ks);
- trustManagers = tmf.getTrustManagers();
- }
- }
-
- // Configure the SSL
- SSLContext sslc = SSLContext.getInstance(protocol);
- sslc.init(keyManagers, trustManagers, null);
- return sslc;
- } catch (GeneralSecurityException err) {
- IOException exception = new IOException(err.getMessage());
- exception.initCause(err);
- throw exception;
+ }
+
+ // Configure the Trust Store Manager
+ TrustManager[] trustManagers = null;
+ if (truststore != null) {
+ KeyStore ks = loadKeyStore(truststore, truststorePassword, keystoreType);
+ if (ks != null) {
+ TrustManagerFactory tmf = TrustManagerFactory.getInstance(algorithm);
+ tmf.init(ks);
+ trustManagers = tmf.getTrustManagers();
+ }
}
+
+ // Configure the SSL
+ SSLContext sslc = SSLContext.getInstance(protocol);
+ sslc.init(keyManagers, trustManagers, null);
+ return sslc;
}
/**
Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/AdminApiServerDiscovery.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/AdminApiServerDiscovery.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/AdminApiServerDiscovery.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -90,11 +90,7 @@
if (!processObject.isEnabled()) {
continue;
}
- if (useUrlHost) {
- this.knownHosts.add(new HostInfo(lastHostInfo.getHostName(), processObject.getPort()));
- } else {
- this.knownHosts.add(new HostInfo(processObject.getInetAddress().getHostName(), processObject.getPort(), processObject.getInetAddress()));
- }
+ this.knownHosts.add(new HostInfo(useUrlHost?lastHostInfo.getHostName():processObject.getInetAddress().getHostName(), processObject.getPort()));
}
discoveredHosts = true;
} catch (AdminException e) {
Deleted: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/NioObjectChannelFactory.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/NioObjectChannelFactory.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/NioObjectChannelFactory.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -1,92 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.common.comm.platform.socket.client;
-
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.ssl.SSLEngine;
-
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-
-import com.metamatrix.common.comm.exception.CommunicationException;
-import com.metamatrix.common.comm.exception.SingleInstanceCommunicationException;
-import com.metamatrix.common.comm.platform.socket.SSLAwareChannelHandler;
-import com.metamatrix.common.comm.platform.socket.ObjectChannel.ChannelListenerFactory;
-import com.metamatrix.core.util.NamedThreadFactory;
-
-public class NioObjectChannelFactory implements ObjectChannelFactory {
-
- private int inputBufferSize;
- private int outputBufferSize;
- private boolean conserveBandwidth;
- private ClassLoader classLoader;
- private ChannelFactory channlFactory;
-
- public NioObjectChannelFactory(boolean conserveBandwidth, int inputBufferSize, int outputBufferSize, ClassLoader classLoader, int workerCount) {
- this.conserveBandwidth = conserveBandwidth;
- this.inputBufferSize = inputBufferSize;
- this.outputBufferSize = outputBufferSize;
- this.classLoader = classLoader;
- ThreadPoolExecutor executor = new ThreadPoolExecutor(0,
- Integer.MAX_VALUE, 2, TimeUnit.MINUTES,
- new SynchronousQueue<Runnable>(),
- new NamedThreadFactory("Nio")); //$NON-NLS-1$
- this.channlFactory = new NioClientSocketChannelFactory(executor, executor, workerCount);
- }
-
- public void createObjectChannel(SocketAddress address, SSLEngine engine,
- ChannelListenerFactory listener) throws IOException,
- CommunicationException {
- ClientBootstrap bootstrap = new ClientBootstrap(channlFactory);
-
- final SSLAwareChannelHandler handler = new SSLAwareChannelHandler(listener, engine, this.classLoader);
-
- bootstrap.setPipelineFactory(handler);
-
- if (!conserveBandwidth) {
- bootstrap.setOption("tcpNoDelay", Boolean.TRUE); //$NON-NLS-1$
- }
- bootstrap.setOption("receiveBufferSize", new Integer(inputBufferSize)); //$NON-NLS-1$
- bootstrap.setOption("sendBufferSize", new Integer(outputBufferSize)); //$NON-NLS-1$
- bootstrap.setOption("keepAlive", Boolean.TRUE); //$NON-NLS-1$
-
- ChannelFuture future = bootstrap.connect(address);
-
- //connections have no timeout
- future.awaitUninterruptibly();
-
- if (!future.isSuccess()) {
- if (future.getCause() instanceof IOException) {
- throw (IOException)future.getCause();
- }
- throw new SingleInstanceCommunicationException(future.getCause());
- }
- }
-}
Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/ObjectChannelFactory.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/ObjectChannelFactory.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/ObjectChannelFactory.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -25,15 +25,12 @@
import java.io.IOException;
import java.net.SocketAddress;
-import javax.net.ssl.SSLEngine;
-
import com.metamatrix.common.comm.exception.CommunicationException;
import com.metamatrix.common.comm.platform.socket.ObjectChannel;
public interface ObjectChannelFactory {
- void createObjectChannel(SocketAddress address, SSLEngine engine,
- ObjectChannel.ChannelListenerFactory listenerFactory) throws IOException,
+ ObjectChannel createObjectChannel(SocketAddress address, boolean ssl) throws IOException,
CommunicationException;
}
\ No newline at end of file
Added: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java (rev 0)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -0,0 +1,183 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package com.metamatrix.common.comm.platform.socket.client;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.security.GeneralSecurityException;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.logging.Logger;
+
+import org.jboss.netty.handler.codec.serialization.ObjectDecoderInputStream;
+import org.jboss.netty.handler.codec.serialization.ObjectEncoderOutputStream;
+
+import com.metamatrix.common.comm.exception.CommunicationException;
+import com.metamatrix.common.comm.platform.socket.ObjectChannel;
+import com.metamatrix.common.comm.platform.socket.SocketUtil;
+import com.metamatrix.common.comm.platform.socket.SocketUtil.SSLSocketFactory;
+import com.metamatrix.dqp.client.ResultsFuture;
+
+final class OioOjbectChannelFactory implements ObjectChannelFactory {
+
+ private final static int STREAM_BUFFER_SIZE = 1<<15;
+ private final static int SO_TIMEOUT = 3000;
+
+ private static Logger log = Logger.getLogger("org.teiid.client.sockets"); //$NON-NLS-1$
+
+ final static class OioObjectChannel implements ObjectChannel {
+ private final Socket socket;
+ private ObjectOutputStream outputStream;
+ private ObjectInputStream inputStream;
+ private Object readLock = new Object();
+
+ private OioObjectChannel(Socket socket) throws IOException {
+ log.fine("creating new OioObjectChannel"); //$NON-NLS-1$
+ this.socket = socket;
+ socket.setSoTimeout(SO_TIMEOUT);
+ BufferedOutputStream bos = new BufferedOutputStream( socket.getOutputStream(), STREAM_BUFFER_SIZE);
+ outputStream = new ObjectEncoderOutputStream( new DataOutputStream(bos), 512);
+ //The output stream must be flushed on creation in order to write some initialization data
+ //through the buffered stream to the input stream on the other side
+ outputStream.flush();
+ final ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ BufferedInputStream bis = new BufferedInputStream(socket.getInputStream(), STREAM_BUFFER_SIZE);
+ inputStream = new ObjectDecoderInputStream(new DataInputStream(bis), cl, 1 << 25);
+ }
+
+ @Override
+ public void close() {
+ log.finer("closing socket"); //$NON-NLS-1$
+ try {
+ outputStream.flush();
+ } catch (IOException e) {
+ // ignore
+ }
+ try {
+ outputStream.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ try {
+ inputStream.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ try {
+ socket.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+
+ @Override
+ public SocketAddress getRemoteAddress() {
+ return socket.getRemoteSocketAddress();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return !socket.isClosed();
+ }
+
+ @Override
+ public Object read() throws IOException, ClassNotFoundException {
+ log.finer("reading message from socket"); //$NON-NLS-1$
+ synchronized (readLock) {
+ try {
+ return inputStream.readObject();
+ } catch (SocketTimeoutException e) {
+ throw e;
+ } catch (IOException e) {
+ close();
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public synchronized Future<?> write(Object msg) {
+ log.finer("writing message to socket"); //$NON-NLS-1$
+ ResultsFuture<Void> result = new ResultsFuture<Void>();
+ try {
+ outputStream.writeObject(msg);
+ outputStream.flush();
+ outputStream.reset();
+ result.getResultsReceiver().receiveResults(null);
+ } catch (IOException e) {
+ close();
+ result.getResultsReceiver().exceptionOccurred(e);
+ }
+ return result;
+ }
+ }
+
+ private Properties props;
+ private int inputBufferSize;
+ private int outputBufferSize;
+ private boolean conserveBandwidth;
+ private volatile SSLSocketFactory sslSocketFactory;
+
+ public OioOjbectChannelFactory(boolean conserveBandwidth,
+ int inputBufferSize, int outputBufferSize, Properties props) {
+ this.conserveBandwidth = conserveBandwidth;
+ this.inputBufferSize = inputBufferSize;
+ this.outputBufferSize = outputBufferSize;
+ this.props = props;
+ }
+
+ @Override
+ public ObjectChannel createObjectChannel(SocketAddress address, boolean ssl) throws IOException,
+ CommunicationException {
+ final Socket socket;
+ if (ssl) {
+ if (this.sslSocketFactory == null) {
+ try {
+ sslSocketFactory = SocketUtil.getSSLSocketFactory(props);
+ } catch (GeneralSecurityException e) {
+ throw new CommunicationException(e);
+ }
+ }
+ socket = sslSocketFactory.getSocket();
+ } else {
+ socket = new Socket();
+ }
+ if (inputBufferSize > 0) {
+ socket.setReceiveBufferSize(inputBufferSize);
+ }
+ if (outputBufferSize > 0) {
+ socket.setSendBufferSize(outputBufferSize);
+ }
+ socket.setTcpNoDelay(!conserveBandwidth); // enable Nagle's algorithm to conserve bandwidth
+ socket.connect(address);
+ return new OioObjectChannel(socket);
+ }
+}
\ No newline at end of file
Property changes on: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/OioOjbectChannelFactory.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnection.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnection.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnection.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -40,6 +40,8 @@
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -69,7 +71,7 @@
private Map<HostInfo, SocketServerInstance> existingConnections = new HashMap<HostInfo, SocketServerInstance>();
private SocketServerInstanceFactory connectionFactory;
private ServerDiscovery serverDiscovery;
- private static Logger log = Logger.getLogger("org.teiid.client.sockets");
+ private static Logger log = Logger.getLogger("org.teiid.client.sockets"); //$NON-NLS-1$
private boolean secure;
private Properties connProps;
@@ -78,7 +80,7 @@
private volatile LogonResult logonResult;
private ILogon logon;
private Timer pingTimer;
- private volatile boolean closed;
+ private boolean closed;
public SocketServerConnection(
SocketServerInstanceFactory connectionFactory, boolean secure,
@@ -131,7 +133,7 @@
}
Exception ex = null;
try {
- instance = connectionFactory.createServerInstance(hostInfo, secure);
+ instance = connectionFactory.getServerInstance(hostInfo, secure);
if (this.logonResult != null) {
ILogon newLogon = instance.getService(ILogon.class);
newLogon.assertIdentity(logonResult.getSessionID());
@@ -236,12 +238,9 @@
}
if (exception instanceof SingleInstanceCommunicationException
|| exception.getCause() instanceof SingleInstanceCommunicationException) {
- if (!isOpen()) {
+ if (!failOver || !isOpen()) {
break;
}
- if (!failOver) {
- break;
- }
invalidateTarget();
} else {
break;
@@ -275,7 +274,7 @@
try {
//make a best effort to send the logoff
Future<?> writeFuture = this.logon.logoff();
- writeFuture.get();
+ writeFuture.get(5000, TimeUnit.MILLISECONDS);
} catch (InvalidSessionException e) {
//ignore
} catch (MetaMatrixComponentException e) {
@@ -284,6 +283,8 @@
//ignore
} catch (ExecutionException e) {
//ignore
+ } catch (TimeoutException e) {
+ //ignore
}
for (SocketServerInstance instance : existingConnections.values()) {
@@ -294,7 +295,7 @@
this.serverDiscovery.shutdown();
}
- public boolean isOpen() {
+ public synchronized boolean isOpen() {
if (this.closed) {
return false;
}
Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -26,20 +26,17 @@
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.Timer;
-import javax.net.ssl.SSLEngine;
-
import com.metamatrix.common.api.HostInfo;
import com.metamatrix.common.api.MMURL;
import com.metamatrix.common.comm.api.ServerConnectionFactory;
import com.metamatrix.common.comm.exception.CommunicationException;
import com.metamatrix.common.comm.exception.ConnectionException;
-import com.metamatrix.common.comm.platform.socket.Handshake;
-import com.metamatrix.common.comm.platform.socket.SocketUtil;
-import com.metamatrix.common.comm.platform.socket.SocketUtil.SSLEngineFactory;
import com.metamatrix.common.util.NetUtils;
import com.metamatrix.common.util.PropertiesUtils;
import com.metamatrix.core.MetaMatrixCoreException;
@@ -65,8 +62,8 @@
public static final int DEFAULT_MAX_THREADS = 15;
public static final long DEFAULT_TTL = 120000L;
public static final long DEFAULT_SYNCH_TTL = 120000L;
- public static final int DEFAULT_SOCKET_INPUT_BUFFER_SIZE = 102400;
- public static final int DEFAULT_SOCKET_OUTPUT_BUFFER_SIZE = 102400;
+ public static final int DEFAULT_SOCKET_INPUT_BUFFER_SIZE = 0;
+ public static final int DEFAULT_SOCKET_OUTPUT_BUFFER_SIZE = 0;
private static final String URL = "URL"; //$NON-NLS-1$
@@ -75,7 +72,6 @@
private ObjectChannelFactory channelFactory;
private Timer pingTimer;
private Properties props;
- private SSLEngineFactory sslEngineFactory;
public static synchronized SocketServerConnectionFactory getInstance() {
if (INSTANCE == null) {
@@ -95,7 +91,7 @@
}
}
}
- INSTANCE.init(props, true);
+ INSTANCE.init(props);
}
return INSTANCE;
}
@@ -104,33 +100,15 @@
}
- public void init(Properties props, boolean usePing) {
+ public void init(final Properties props) {
this.props = props;
this.pingTimer = new Timer("SocketPing", true); //$NON-NLS-1$
- this.channelFactory = new NioObjectChannelFactory(
- getConserveBandwidth(), getInputBufferSize(),
- getOutputBufferSize(), Thread.currentThread()
- .getContextClassLoader(), getMaxThreads());
+ this.channelFactory = new OioOjbectChannelFactory(getConserveBandwidth(), getInputBufferSize(), getOutputBufferSize(), props);
}
- public SocketServerInstance createServerInstance(HostInfo info, boolean ssl) throws CommunicationException, IOException {
- SSLEngine sslEngine = null;
- if (ssl) {
- synchronized (this) {
- if (this.sslEngineFactory == null) {
- try {
- this.sslEngineFactory = SocketUtil.getSSLEngineFactory(this.props);
- } catch (NoSuchAlgorithmException e) {
- throw new CommunicationException(e);
- } catch (IOException e) {
- throw new CommunicationException(e);
- }
- }
- sslEngine = this.sslEngineFactory.getSSLEngine();
- }
- }
- SocketServerInstanceImpl ssii = new SocketServerInstanceImpl(info, sslEngine, getSynchronousTTL());
- ssii.connect(this.channelFactory, Handshake.HANDSHAKE_TIMEOUT);
+ public SocketServerInstance getServerInstance(HostInfo info, boolean ssl) throws CommunicationException, IOException {
+ SocketServerInstanceImpl ssii = new SocketServerInstanceImpl(info, ssl, getSynchronousTTL());
+ ssii.connect(this.channelFactory);
return ssii;
}
Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceFactory.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceFactory.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceFactory.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -27,12 +27,8 @@
import com.metamatrix.common.api.HostInfo;
import com.metamatrix.common.comm.exception.CommunicationException;
-
-/**
- * Sockets implementation of the communication framework ServerConnectionFactory interface.
- */
public interface SocketServerInstanceFactory {
- SocketServerInstance createServerInstance(HostInfo info, boolean ssl) throws CommunicationException, IOException;
+ SocketServerInstance getServerInstance(HostInfo info, boolean ssl) throws CommunicationException, IOException;
}
\ No newline at end of file
Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -30,6 +30,7 @@
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@@ -37,16 +38,14 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
-import javax.net.ssl.SSLEngine;
-
import com.metamatrix.client.ExceptionUtil;
import com.metamatrix.common.api.HostInfo;
import com.metamatrix.common.comm.api.Message;
-import com.metamatrix.common.comm.api.MessageListener;
import com.metamatrix.common.comm.api.ResultsReceiver;
import com.metamatrix.common.comm.exception.CommunicationException;
import com.metamatrix.common.comm.exception.ExceptionHolder;
@@ -54,8 +53,6 @@
import com.metamatrix.common.comm.platform.CommPlatformPlugin;
import com.metamatrix.common.comm.platform.socket.Handshake;
import com.metamatrix.common.comm.platform.socket.ObjectChannel;
-import com.metamatrix.common.comm.platform.socket.ObjectChannel.ChannelListener;
-import com.metamatrix.common.comm.platform.socket.ObjectChannel.ChannelListenerFactory;
import com.metamatrix.common.util.crypto.CryptoException;
import com.metamatrix.common.util.crypto.Cryptor;
import com.metamatrix.common.util.crypto.DhKeyGenerator;
@@ -69,77 +66,42 @@
* On construction this class will create a channel and exchange a handshake.
* That handshake will establish a {@link Cryptor} to be used for secure traffic.
*/
-public class SocketServerInstanceImpl implements ChannelListener, SocketServerInstance {
+public class SocketServerInstanceImpl implements SocketServerInstance {
private AtomicInteger MESSAGE_ID = new AtomicInteger();
private HostInfo hostInfo;
- private SSLEngine engine;
+ private boolean ssl;
private ObjectChannel socketChannel;
- private Logger log = Logger.getLogger("org.teiid.client.sockets");
+ private static Logger log = Logger.getLogger("org.teiid.client.sockets"); //$NON-NLS-1$
private long synchTimeout;
private Cryptor cryptor;
- private Map<Serializable, MessageListener> asynchronousListeners = new ConcurrentHashMap<Serializable, MessageListener>();
+ private Map<Serializable, ResultsReceiver<Object>> asynchronousListeners = new ConcurrentHashMap<Serializable, ResultsReceiver<Object>>();
- private boolean handshakeCompleted;
- private CommunicationException handshakeError;
-
public SocketServerInstanceImpl() {
}
- public SocketServerInstanceImpl(final HostInfo host, SSLEngine engine, long synchTimeout) {
+ public SocketServerInstanceImpl(final HostInfo host, boolean ssl, long synchTimeout) {
this.hostInfo = host;
- this.engine = engine;
+ this.ssl = ssl;
this.synchTimeout = synchTimeout;
}
- public void connect(ObjectChannelFactory channelFactory, long handShakeTimeout) throws CommunicationException, IOException {
- InetSocketAddress address = null;
- if (hostInfo.getInetAddress() != null) {
- address = new InetSocketAddress(hostInfo.getInetAddress(), hostInfo.getPortNumber());
- } else {
- address = new InetSocketAddress(hostInfo.getHostName(), hostInfo.getPortNumber());
+ public void connect(ObjectChannelFactory channelFactory) throws CommunicationException, IOException {
+ InetSocketAddress address = new InetSocketAddress(hostInfo.getInetAddress(), hostInfo.getPortNumber());
+ this.socketChannel = channelFactory.createObjectChannel(address, ssl);
+ try {
+ doHandshake();
+ } catch (CommunicationException e) {
+ this.socketChannel.close();
+ throw e;
+ } catch (IOException e) {
+ this.socketChannel.close();
+ throw e;
}
- channelFactory.createObjectChannel(address, engine, new ChannelListenerFactory() {
-
- public ChannelListener createChannelListener(
- ObjectChannel channel) {
- synchronized (SocketServerInstanceImpl.this) {
- if (SocketServerInstanceImpl.this.handshakeError != null) {
- channel.close();
- }
- SocketServerInstanceImpl.this.socketChannel = channel;
- }
- return SocketServerInstanceImpl.this;
- }
-
- });
- synchronized (this) {
- long endTime = System.currentTimeMillis() + handShakeTimeout;
- while (!this.handshakeCompleted && this.handshakeError == null) {
- long remainingTimeout = endTime - System.currentTimeMillis();
- if (remainingTimeout <= 0) {
- break;
- }
- try {
- this.wait(remainingTimeout);
- } catch (InterruptedException e) {
- break;
- }
- }
- if (!this.handshakeCompleted || this.handshakeError != null) {
- if (this.socketChannel != null) {
- this.socketChannel.close();
- }
- if (this.handshakeError == null) {
- this.handshakeError = new SingleInstanceCommunicationException(CommPlatformPlugin.Util.getString("SocketServerInstanceImpl.handshake_timeout")); //$NON-NLS-1$
- }
- throw this.handshakeError;
- }
- }
}
/**
@@ -153,11 +115,22 @@
return MetaMatrixProductVersion.VERSION_NUMBER;
}
- private synchronized void receivedHahdshake(Handshake handshake) {
+ private void doHandshake() throws IOException, CommunicationException {
+ final Handshake handshake;
try {
+ Object obj = this.socketChannel.read();
+
+ if (!(obj instanceof Handshake)) {
+ throw new CommunicationException(CommPlatformPlugin.Util.getString("SocketServerInstanceImpl.handshake_error")); //$NON-NLS-1$
+ }
+ handshake = (Handshake)obj;
+ } catch (ClassNotFoundException e1) {
+ throw new CommunicationException(e1);
+ }
+
+ try {
if (!getVersionInfo().equals(handshake.getVersion())) {
- this.handshakeError = new CommunicationException(CommPlatformPlugin.Util.getString("SocketServerInstanceImpl.version_mismatch", getVersionInfo(), handshake.getVersion())); //$NON-NLS-1$
- return;
+ throw new CommunicationException(CommPlatformPlugin.Util.getString("SocketServerInstanceImpl.version_mismatch", getVersionInfo(), handshake.getVersion())); //$NON-NLS-1$
}
handshake.setVersion(getVersionInfo());
@@ -174,11 +147,8 @@
}
this.socketChannel.write(handshake);
- this.handshakeCompleted = true;
} catch (CryptoException err) {
- this.handshakeError = new CommunicationException(err);
- } finally {
- this.notify();
+ throw new CommunicationException(err);
}
}
@@ -186,7 +156,7 @@
return socketChannel.isOpen();
}
- public void send(Message message, MessageListener listener, Serializable messageKey)
+ public void send(Message message, ResultsReceiver<Object> listener, Serializable messageKey)
throws CommunicationException, InterruptedException {
if (listener != null) {
asynchronousListeners.put(messageKey, listener);
@@ -213,54 +183,42 @@
public void exceptionOccurred(Throwable e) {
if (e instanceof CommunicationException) {
if (e.getCause() instanceof InvalidClassException) {
- log.log(Level.SEVERE, "Unknown class or incorrect class version:", e); //$NON-NLS-1$ //$NON-NLS-2$
+ log.log(Level.SEVERE, "Unknown class or incorrect class version:", e); //$NON-NLS-1$
} else {
- log.log(Level.FINE, "Unable to read: socket was already closed.", e); //$NON-NLS-1$ //$NON-NLS-2$
+ log.log(Level.FINE, "Unable to read: socket was already closed.", e); //$NON-NLS-1$
}
} else if (e instanceof EOFException) {
- log.log(Level.FINE, "Unable to read: socket was already closed.", e); //$NON-NLS-1$ //$NON-NLS-2$
+ log.log(Level.FINE, "Unable to read: socket was already closed.", e); //$NON-NLS-1$
} else {
- log.log(Level.WARNING, "Unable to read: unexpected exception", e); //$NON-NLS-1$ //$NON-NLS-2$
+ log.log(Level.WARNING, "Unable to read: unexpected exception", e); //$NON-NLS-1$
}
-
- synchronized (this) {
- if (!handshakeCompleted) {
- this.handshakeError = new SingleInstanceCommunicationException(e, CommPlatformPlugin.Util.getString(engine!=null?"SocketServerInstanceImpl.secure_error_during_handshake":"SocketServerInstanceImpl.error_during_handshake")); //$NON-NLS-1$ //$NON-NLS-2$
- this.notify();
- }
- }
-
- Message messageHolder = new Message();
- messageHolder.setContents(e instanceof SingleInstanceCommunicationException?e:new SingleInstanceCommunicationException(e));
+
+ if (!(e instanceof SingleInstanceCommunicationException)) {
+ e = new SingleInstanceCommunicationException(e);
+ }
- Set<Map.Entry<Serializable, MessageListener>> entries = this.asynchronousListeners.entrySet();
- for (Iterator<Map.Entry<Serializable, MessageListener>> iterator = entries.iterator(); iterator.hasNext();) {
- Map.Entry<Serializable, MessageListener> entry = iterator.next();
+ Set<Map.Entry<Serializable, ResultsReceiver<Object>>> entries = this.asynchronousListeners.entrySet();
+ for (Iterator<Map.Entry<Serializable, ResultsReceiver<Object>>> iterator = entries.iterator(); iterator.hasNext();) {
+ Map.Entry<Serializable, ResultsReceiver<Object>> entry = iterator.next();
iterator.remove();
- entry.getValue().deliverMessage(messageHolder, entry.getKey());
+ entry.getValue().exceptionOccurred(e);
}
}
public void receivedMessage(Object packet) {
- log.log(Level.FINE, "reading packet"); //$NON-NLS-1$ //$NON-NLS-2$
+ log.log(Level.FINE, "reading packet"); //$NON-NLS-1$
if (packet instanceof Message) {
Message messagePacket = (Message)packet;
- processAsynchronousPacket(messagePacket);
- } else if (packet instanceof Handshake) {
- receivedHahdshake((Handshake)packet);
+ Serializable messageKey = messagePacket.getMessageKey();
+ log.log(Level.FINE, "read asynch message:" + messageKey); //$NON-NLS-1$
+ ResultsReceiver<Object> listener = asynchronousListeners.remove(messageKey);
+ if (listener != null) {
+ listener.receiveResults(messagePacket.getContents());
+ }
} else {
- log.log(Level.FINE, "packet ignored:" + packet); //$NON-NLS-1$ //$NON-NLS-2$
+ log.log(Level.FINE, "packet ignored:" + packet); //$NON-NLS-1$
}
}
-
- private void processAsynchronousPacket(Message message) {
- Serializable messageKey = message.getMessageKey();
- log.log(Level.FINE, "read asynch message:" + messageKey); //$NON-NLS-1$ //$NON-NLS-2$
- MessageListener listener = asynchronousListeners.remove(messageKey);
- if (listener != null) {
- listener.deliverMessage(message, messageKey);
- }
- }
public void shutdown() {
socketChannel.close();
@@ -273,10 +231,7 @@
return this.cryptor;
}
- public void onConnection() {
-
- }
-
+ @SuppressWarnings("unchecked")
@Override
public <T> T getService(Class<T> iface) {
return (T)Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] {iface}, new RemoteInvocationHandler(iface));
@@ -303,7 +258,7 @@
if (secure) {
message.setContents(getCryptor().sealObject(message.getContents()));
}
- ResultsFuture results = new ResultsFuture() {
+ ResultsFuture<Object> results = new ResultsFuture<Object>() {
@Override
protected Object convertResult() throws ExecutionException {
try {
@@ -319,24 +274,53 @@
throw new ExecutionException(e);
}
}
+
+ @Override
+ public synchronized Object get()
+ throws InterruptedException, ExecutionException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * get calls are overridden to provide a thread in which to perform
+ * the actual reads.
+ */
+ @Override
+ public synchronized Object get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException,
+ TimeoutException {
+ int timeoutMillis = (int)Math.min(unit.toMillis(timeout), Integer.MAX_VALUE);
+ while (!isDone()) {
+ if (timeoutMillis <= 0) {
+ throw new TimeoutException();
+ }
+ long start = System.currentTimeMillis();
+ try {
+ receivedMessage(socketChannel.read());
+ } catch (IOException e) {
+ if (e instanceof SocketTimeoutException) {
+ timeoutMillis -= (System.currentTimeMillis() - start);
+ continue;
+ }
+ exceptionOccurred(e);
+ } catch (ClassNotFoundException e) {
+ exceptionOccurred(e);
+ }
+ }
+ return super.get(timeout, unit);
+ }
};
- final ResultsReceiver receiver = results.getResultsReceiver();
+ final ResultsReceiver<Object> receiver = results.getResultsReceiver();
- send(message, new MessageListener() {
-
- public void deliverMessage(Message responseMessage,
- Serializable messageKey) {
- Serializable result = responseMessage.getContents();
- receiver.receiveResults(result);
- }
-
- }, Integer.valueOf(MESSAGE_ID.getAndIncrement()));
+ send(message, receiver, Integer.valueOf(MESSAGE_ID.getAndIncrement()));
if (ResultsFuture.class.isAssignableFrom(method.getReturnType())) {
return results;
}
return results.get(synchTimeout, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
t = e.getCause();
+ } catch (TimeoutException e) {
+ t = new SingleInstanceCommunicationException(e);
} catch (Throwable e) {
t = e;
}
Modified: trunk/client/src/main/java/com/metamatrix/dqp/client/ResultsFuture.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/dqp/client/ResultsFuture.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/java/com/metamatrix/dqp/client/ResultsFuture.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -30,6 +30,10 @@
import com.metamatrix.common.comm.api.ResultsReceiver;
+/**
+ * Implements a call back based future that can also have
+ * completion listeners.
+ */
public class ResultsFuture<T> implements Future<T> {
public interface CompletionListener<T> {
@@ -41,36 +45,37 @@
private T result;
private Throwable exception;
private boolean done;
+ private ResultsReceiver<T> resultsReceiver = new ResultsReceiver<T> () {
+ public void exceptionOccurred(Throwable e) {
+ synchronized (ResultsFuture.this) {
+ if (done) {
+ throw new IllegalStateException("Already sent results"); //$NON-NLS-1$
+ }
+ exception = e;
+ done = true;
+ ResultsFuture.this.notifyAll();
+ }
+ done();
+ }
+ public void receiveResults(T results) {
+ synchronized (ResultsFuture.this) {
+ if (done) {
+ throw new IllegalStateException("Already sent results"); //$NON-NLS-1$
+ }
+ result = results;
+ done = true;
+ ResultsFuture.this.notifyAll();
+ }
+ done();
+ }
+ };
public ResultsFuture() {
}
public ResultsReceiver<T> getResultsReceiver() {
- return new ResultsReceiver<T> () {
- public void exceptionOccurred(Throwable e) {
- synchronized (ResultsFuture.this) {
- if (done) {
- throw new IllegalStateException("Already sent results"); //$NON-NLS-1$
- }
- exception = e;
- done = true;
- ResultsFuture.this.notifyAll();
- }
- done();
- }
- public void receiveResults(T results) {
- synchronized (ResultsFuture.this) {
- if (done) {
- throw new IllegalStateException("Already sent results"); //$NON-NLS-1$
- }
- result = results;
- done = true;
- ResultsFuture.this.notifyAll();
- }
- done();
- }
- };
+ return resultsReceiver;
}
public boolean cancel(boolean mayInterruptIfRunning) {
Added: trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/CompactObjectInputStream.java
===================================================================
--- trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/CompactObjectInputStream.java (rev 0)
+++ trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/CompactObjectInputStream.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -0,0 +1,98 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @author tags. See the COPYRIGHT.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.netty.handler.codec.serialization;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+import java.io.StreamCorruptedException;
+
+/**
+ * @author The Netty Project (netty-dev at lists.jboss.org)
+ * @author Trustin Lee (tlee at redhat.com)
+ *
+ * @version $Rev: 381 $, $Date: 2008-10-01 06:06:18 -0500 (Wed, 01 Oct 2008) $
+ *
+ */
+class CompactObjectInputStream extends ObjectInputStream {
+
+ private final ClassLoader classLoader;
+
+ CompactObjectInputStream(InputStream in) throws IOException {
+ this(in, null);
+ }
+
+ CompactObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
+ super(in);
+ this.classLoader = classLoader;
+ }
+
+ @Override
+ protected void readStreamHeader() throws IOException,
+ StreamCorruptedException {
+ int version = readByte() & 0xFF;
+ if (version != STREAM_VERSION) {
+ throw new StreamCorruptedException(
+ "Unsupported version: " + version); //$NON-NLS-1$
+ }
+ }
+
+ @Override
+ protected ObjectStreamClass readClassDescriptor()
+ throws IOException, ClassNotFoundException {
+ int type = read();
+ if (type < 0) {
+ throw new EOFException();
+ }
+ switch (type) {
+ case CompactObjectOutputStream.TYPE_PRIMITIVE:
+ return super.readClassDescriptor();
+ case CompactObjectOutputStream.TYPE_NON_PRIMITIVE:
+ String className = readUTF();
+ Class<?> clazz;
+ if (classLoader == null) {
+ clazz = Class.forName(
+ className, true,
+ Thread.currentThread().getContextClassLoader());
+ } else {
+ clazz = Class.forName(className, true, classLoader);
+ }
+ return ObjectStreamClass.lookup(clazz);
+ default:
+ throw new StreamCorruptedException(
+ "Unexpected class descriptor type: " + type); //$NON-NLS-1$
+ }
+ }
+
+ @Override
+ protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
+ String name = desc.getName();
+ try {
+ return Class.forName(name, false, classLoader);
+ } catch (ClassNotFoundException ex) {
+ return super.resolveClass(desc);
+ }
+ }
+}
Property changes on: trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/CompactObjectInputStream.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Added: trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/CompactObjectOutputStream.java
===================================================================
--- trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/CompactObjectOutputStream.java (rev 0)
+++ trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/CompactObjectOutputStream.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -0,0 +1,61 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @author tags. See the COPYRIGHT.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.netty.handler.codec.serialization;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
+import java.io.OutputStream;
+
+/**
+ * @author The Netty Project (netty-dev at lists.jboss.org)
+ * @author Trustin Lee (tlee at redhat.com)
+ *
+ * @version $Rev: 6 $, $Date: 2008-08-07 20:40:10 -0500 (Thu, 07 Aug 2008) $
+ *
+ */
+class CompactObjectOutputStream extends ObjectOutputStream {
+
+ static final int TYPE_PRIMITIVE = 0;
+ static final int TYPE_NON_PRIMITIVE = 1;
+
+ CompactObjectOutputStream(OutputStream out) throws IOException {
+ super(out);
+ }
+
+ @Override
+ protected void writeStreamHeader() throws IOException {
+ writeByte(STREAM_VERSION);
+ }
+
+ @Override
+ protected void writeClassDescriptor(ObjectStreamClass desc) throws IOException {
+ if (desc.forClass().isPrimitive()) {
+ write(TYPE_PRIMITIVE);
+ super.writeClassDescriptor(desc);
+ } else {
+ write(TYPE_NON_PRIMITIVE);
+ writeUTF(desc.getName());
+ }
+ }
+}
Property changes on: trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/CompactObjectOutputStream.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Added: trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectDecoderInputStream.java
===================================================================
--- trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectDecoderInputStream.java (rev 0)
+++ trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectDecoderInputStream.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -0,0 +1,74 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @author tags. See the COPYRIGHT.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.netty.handler.codec.serialization;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.StreamCorruptedException;
+
+/**
+ * An {@link ObjectInput} which is interoperable with {@link ObjectEncoder}
+ * and {@link ObjectEncoderOutputStream}.
+ *
+ * @author The Netty Project (netty-dev at lists.jboss.org)
+ * @author Trustin Lee (tlee at redhat.com)
+ *
+ * @version $Rev: 628 $, $Date: 2009-01-05 20:06:00 -0600 (Mon, 05 Jan 2009) $
+ *
+ */
+public class ObjectDecoderInputStream extends ObjectInputStream {
+
+ private final DataInputStream in;
+ private final ClassLoader classLoader;
+ private final int maxObjectSize;
+
+ public ObjectDecoderInputStream(DataInputStream in, ClassLoader classLoader, int maxObjectSize) throws SecurityException, IOException {
+ super();
+ this.in = in;
+ this.classLoader = classLoader;
+ this.maxObjectSize = maxObjectSize;
+ }
+
+ @Override
+ protected final Object readObjectOverride() throws IOException,
+ ClassNotFoundException {
+ int dataLen = in.readInt();
+ if (dataLen <= 0) {
+ throw new StreamCorruptedException("invalid data length: " + dataLen); //$NON-NLS-1$
+ }
+ if (dataLen > maxObjectSize) {
+ throw new StreamCorruptedException(
+ "data length too big: " + dataLen + " (max: " + maxObjectSize + ')'); //$NON-NLS-1$ //$NON-NLS-2$
+ }
+
+ return new CompactObjectInputStream(in, classLoader).readObject();
+ }
+
+ @Override
+ public void close() throws IOException {
+ in.close();
+ }
+
+}
Property changes on: trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectDecoderInputStream.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Added: trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectEncoderOutputStream.java
===================================================================
--- trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectEncoderOutputStream.java (rev 0)
+++ trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectEncoderOutputStream.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -0,0 +1,78 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @author tags. See the COPYRIGHT.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.netty.handler.codec.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+
+/**
+ * An {@link ObjectOutput} which is interoperable with {@link ObjectDecoder}
+ * and {@link ObjectDecoderInputStream}.
+ *
+ * @author The Netty Project (netty-dev at lists.jboss.org)
+ * @author Trustin Lee (tlee at redhat.com)
+ *
+ * @version $Rev: 595 $, $Date: 2008-12-08 03:02:33 -0600 (Mon, 08 Dec 2008) $
+ *
+ */
+public class ObjectEncoderOutputStream extends ObjectOutputStream {
+
+ private final DataOutputStream out;
+ private final int estimatedLength;
+
+ public ObjectEncoderOutputStream(DataOutputStream out, int estimatedLength) throws SecurityException, IOException {
+ super();
+ this.out = out;
+ this.estimatedLength = estimatedLength;
+ }
+
+ @Override
+ final protected void writeObjectOverride(Object obj) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(estimatedLength);
+ ObjectOutputStream oout = new CompactObjectOutputStream(baos);
+ oout.writeObject(obj);
+ oout.flush();
+ oout.close();
+
+ out.writeInt(baos.size());
+ out.write(baos.toByteArray());
+ }
+
+ @Override
+ public void close() throws IOException {
+ out.close();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ out.flush();
+ }
+
+ @Override
+ public void reset() throws IOException {
+ }
+
+}
Property changes on: trunk/client/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectEncoderOutputStream.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: trunk/client/src/main/resources/teiid-client-settings.properties
===================================================================
--- trunk/client/src/main/resources/teiid-client-settings.properties 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/main/resources/teiid-client-settings.properties 2009-03-17 14:25:36 UTC (rev 565)
@@ -86,16 +86,16 @@
metamatrix.synchronous.sockets.ttl=120000
#
-# Set the input buffer size
+# Set the socket receive buffer size
#
-metamatrix.sockets.inputBufferSize=102400
+metamatrix.sockets.inputBufferSize=0
#
-# Set the output buffer size
+# Set the socket send buffer size
#
-metamatrix.sockets.outputBufferSize=102400
+metamatrix.sockets.outputBufferSize=0
#
# Set to true to enable Nagle's algorithm to conserve bandwidth
Modified: trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerConnection.java
===================================================================
--- trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerConnection.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerConnection.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -78,7 +78,7 @@
Properties connectionProperties)
throws LogonException,
MetaMatrixComponentException {
- return new LogonResult(new MetaMatrixSessionID(1), "fooUser", new Properties(), 1, "fake"); //$NON-NLS-1$
+ return new LogonResult(new MetaMatrixSessionID(1), "fooUser", new Properties(), 1, "fake"); //$NON-NLS-1$ //$NON-NLS-2$
}
@Override
@@ -122,23 +122,23 @@
Properties p = new Properties();
SocketServerInstanceFactory instanceFactory = new SocketServerInstanceFactory() {
@Override
- public SocketServerInstance createServerInstance(HostInfo info,
+ public SocketServerInstance getServerInstance(HostInfo info,
boolean ssl) throws CommunicationException, IOException {
throw new SingleInstanceCommunicationException();
}
};
- ServerDiscovery discovery = new UrlServerDiscovery(new MMURL("mm://host1:1,host2:2"));
+ ServerDiscovery discovery = new UrlServerDiscovery(new MMURL("mm://host1:1,host2:2")); //$NON-NLS-1$
try {
new SocketServerConnection(instanceFactory, false, discovery, p, null);
- fail("exception expected");
+ fail("exception expected"); //$NON-NLS-1$
} catch (CommunicationException e) {
- assertEquals("No valid host available. Attempted connections to: [host1:1, host2:2]", e.getMessage());
+ assertEquals("No valid host available. Attempted connections to: [host1:1, host2:2]", e.getMessage()); //$NON-NLS-1$
}
}
public void testLogon() throws Exception {
SocketServerConnection connection = createConnection(null);
- assertEquals("00000000-0000-0001-0000-000000000001", connection.getLogonResult().getSessionID().toString());
+ assertEquals("00000000-0000-0001-0000-000000000001", connection.getLogonResult().getSessionID().toString()); //$NON-NLS-1$
}
/**
@@ -155,14 +155,14 @@
ILogon logon = connection.getService(ILogon.class);
try {
logon.ping();
- fail("expected exception");
+ fail("expected exception"); //$NON-NLS-1$
} catch (MetaMatrixComponentException e) {
}
}
private SocketServerConnection createConnection(final Throwable throwException) throws CommunicationException, ConnectionException {
- return createConnection(throwException, new HostInfo("foo", 1));
+ return createConnection(throwException, new HostInfo("foo", 1)); //$NON-NLS-1$
}
private SocketServerConnection createConnection(final Throwable t, HostInfo hostInfo)
@@ -171,7 +171,7 @@
ServerDiscovery discovery = new UrlServerDiscovery(new MMURL(hostInfo.getHostName(), hostInfo.getPortNumber(), false));
SocketServerInstanceFactory instanceFactory = new SocketServerInstanceFactory() {
@Override
- public SocketServerInstance createServerInstance(final HostInfo info,
+ public SocketServerInstance getServerInstance(final HostInfo info,
boolean ssl) throws CommunicationException, IOException {
SocketServerInstance instance = Mockito.mock(SocketServerInstance.class);
Mockito.stub(instance.getCryptor()).toReturn(new NullCryptor());
@@ -188,8 +188,8 @@
}
public void testIsSameInstance() throws Exception {
- SocketServerConnection conn = createConnection(null, new HostInfo("foo", 1));
- SocketServerConnection conn1 = createConnection(null, new HostInfo("bar", 1));
+ SocketServerConnection conn = createConnection(null, new HostInfo("foo", 1)); //$NON-NLS-1$
+ SocketServerConnection conn1 = createConnection(null, new HostInfo("bar", 1)); //$NON-NLS-1$
ClientSideDQP dqp = conn.getService(ClientSideDQP.class);
ClientSideDQP dqp1 = conn1.getService(ClientSideDQP.class);
Modified: trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerInstanceImpl.java
===================================================================
--- trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerInstanceImpl.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/test/java/com/metamatrix/common/comm/platform/socket/client/TestSocketServerInstanceImpl.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -24,30 +24,34 @@
import java.io.IOException;
import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
-import javax.net.ssl.SSLEngine;
-
import junit.framework.TestCase;
-import org.mockito.Mockito;
-
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.common.api.HostInfo;
import com.metamatrix.common.comm.exception.CommunicationException;
import com.metamatrix.common.comm.platform.socket.Handshake;
import com.metamatrix.common.comm.platform.socket.ObjectChannel;
-import com.metamatrix.common.comm.platform.socket.ObjectChannel.ChannelListener;
-import com.metamatrix.common.comm.platform.socket.ObjectChannel.ChannelListenerFactory;
import com.metamatrix.dqp.client.ResultsFuture;
import com.metamatrix.platform.security.api.ILogon;
public class TestSocketServerInstanceImpl extends TestCase {
- private static final class FakeObjectChannel implements ObjectChannel {
- Object msg;
+ private static class FakeObjectChannel implements ObjectChannel, ObjectChannelFactory {
+ List<Object> msgs = new ArrayList<Object>();
+ List<? extends Object> readMsgs;
+ int readCount;
+
+ public FakeObjectChannel(List<? extends Object> readMsgs) {
+ this.readMsgs = readMsgs;
+ }
@Override
public void close() {
@@ -61,71 +65,80 @@
@Override
public Future<?> write(Object msg) {
- ResultsFuture<?> result = new ResultsFuture();
- this.msg = msg;
+ msgs.add(msg);
+ ResultsFuture<?> result = new ResultsFuture<Void>();
result.getResultsReceiver().receiveResults(null);
return result;
}
+
+ @Override
+ public Object read() throws IOException,
+ ClassNotFoundException {
+ Object msg = readMsgs.get(readCount++);
+ if (msg instanceof IOException) {
+ if (msg instanceof SocketTimeoutException) {
+ try {
+ Thread.sleep(5);
+ } catch (InterruptedException e) {
+ }
+ }
+ throw (IOException)msg;
+ }
+ return msg;
+ }
+
+ @Override
+ public SocketAddress getRemoteAddress() {
+ return null;
+ }
+
+ @Override
+ public ObjectChannel createObjectChannel(SocketAddress address,
+ boolean ssl) throws IOException, CommunicationException {
+ return this;
+ }
+
}
public void testHandshakeTimeout() throws Exception {
- ObjectChannelFactory channelFactory = Mockito.mock(ObjectChannelFactory.class);
+ final FakeObjectChannel channel = new FakeObjectChannel(Arrays.asList(new SocketTimeoutException()));
+
try {
- createInstance(channelFactory);
+ createInstance(channel);
fail("Exception expected"); //$NON-NLS-1$
- } catch (CommunicationException e) {
- assertEquals("Handshake timeout", e.getMessage()); //$NON-NLS-1$
+ } catch (IOException e) {
+
}
}
private SocketServerInstanceImpl createInstance(ObjectChannelFactory channelFactory)
throws CommunicationException, IOException {
- SocketServerInstanceImpl ssii = new SocketServerInstanceImpl(new HostInfo("foo", 1), null, 1); //$NON-NLS-1$
- ssii.connect(channelFactory, 1);
+ SocketServerInstanceImpl ssii = new SocketServerInstanceImpl(new HostInfo("0.0.0.0", 1), false, 1); //$NON-NLS-1$
+ ssii.connect(channelFactory);
return ssii;
}
public void testSuccessfulHandshake() throws Exception {
- final FakeObjectChannel channel = new FakeObjectChannel();
- ObjectChannelFactory channelFactory = new ObjectChannelFactory() {
- @Override
- public void createObjectChannel(SocketAddress address,
- SSLEngine engine, ChannelListenerFactory listenerFactory)
- throws IOException, CommunicationException {
- assertNull(engine);
- ChannelListener listener = listenerFactory.createChannelListener(channel);
- listener.receivedMessage(new Handshake());
- assertTrue(channel.msg instanceof Handshake);
- }
- };
- SocketServerInstanceImpl instance = createInstance(channelFactory);
+ final FakeObjectChannel channel = new FakeObjectChannel(Arrays.asList(new Handshake(), new SocketTimeoutException()));
+ SocketServerInstanceImpl instance = createInstance(channel);
+
//no remote server is hooked up, so this will timeout
ILogon logon = instance.getService(ILogon.class);
try {
logon.logon(new Properties());
fail("Exception expected"); //$NON-NLS-1$
} catch (MetaMatrixComponentException e) {
- assertTrue(e.getCause() instanceof TimeoutException);
+ assertTrue(e.getCause().getCause() instanceof TimeoutException);
}
}
public void testVersionMismatch() throws Exception {
- final FakeObjectChannel channel = new FakeObjectChannel();
- ObjectChannelFactory channelFactory = new ObjectChannelFactory() {
- @Override
- public void createObjectChannel(SocketAddress address,
- SSLEngine engine, ChannelListenerFactory listenerFactory)
- throws IOException, CommunicationException {
- assertNull(engine);
- ChannelListener listener = listenerFactory.createChannelListener(channel);
- Handshake h = new Handshake();
- h.setVersion("foo"); //$NON-NLS-1$
- listener.receivedMessage(h);
- }
- };
+ Handshake h = new Handshake();
+ h.setVersion("foo"); //$NON-NLS-1$
+ final FakeObjectChannel channel = new FakeObjectChannel(Arrays.asList(h));
try {
- createInstance(channelFactory);
+ createInstance(channel);
fail("exception expected"); //$NON-NLS-1$
} catch (CommunicationException e) {
e.printStackTrace();
@@ -133,5 +146,4 @@
}
}
-
}
Modified: trunk/client/src/test/java/com/metamatrix/common/util/TestMMURL.java
===================================================================
--- trunk/client/src/test/java/com/metamatrix/common/util/TestMMURL.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/client/src/test/java/com/metamatrix/common/util/TestMMURL.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -217,14 +217,14 @@
}
public final void testHostInfoEquals() {
- HostInfo expectedResults = new HostInfo("localhost","31000"); //$NON-NLS-1$//$NON-NLS-2$
+ HostInfo expectedResults = new HostInfo("localhost",31000); //$NON-NLS-1$
MMURL url = new MMURL("mm://localhost:31000"); //$NON-NLS-1$
HostInfo actualResults = url.getHostInfo().get(0);
assertEquals(expectedResults,actualResults);
}
public final void testWithEmbeddedSpaces() {
- HostInfo expectedResults = new HostInfo("localhost","12345"); //$NON-NLS-1$ //$NON-NLS-2$
+ HostInfo expectedResults = new HostInfo("localhost",12345); //$NON-NLS-1$
MMURL url = new MMURL("mm://localhost : 12345"); //$NON-NLS-1$
List hosts = url.getHostInfo();
@@ -235,7 +235,7 @@
}
public final void testHostPortConstructor() {
- HostInfo expectedResults = new HostInfo("myhost","12345"); //$NON-NLS-1$ //$NON-NLS-2$
+ HostInfo expectedResults = new HostInfo("myhost", 12345); //$NON-NLS-1$
MMURL url = new MMURL("myhost", 12345, false); //$NON-NLS-1$
List hosts = url.getHostInfo();
@@ -247,7 +247,7 @@
}
public final void testHostPortConstructorSSL() {
- HostInfo expectedResults = new HostInfo("myhost","12345"); //$NON-NLS-1$ //$NON-NLS-2$
+ HostInfo expectedResults = new HostInfo("myhost",12345); //$NON-NLS-1$
MMURL url = new MMURL("myhost", 12345, true); //$NON-NLS-1$
List hosts = url.getHostInfo();
@@ -258,12 +258,4 @@
assertEquals("mms://myhost:12345", url.getAppServerURL()); //$NON-NLS-1$
}
- public final void testHostPortConstructor_NoHost() {
- try {
- new MMURL("", 12345, false); //$NON-NLS-1$
- fail("Should have failed."); //$NON-NLS-1$
- } catch (Exception e) {
- // Success
- }
- }
}
Modified: trunk/common-internal/src/main/java/com/metamatrix/common/util/VMNaming.java
===================================================================
--- trunk/common-internal/src/main/java/com/metamatrix/common/util/VMNaming.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/common-internal/src/main/java/com/metamatrix/common/util/VMNaming.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -36,8 +36,7 @@
/*
* HOST_ADDRESS refers to to the host-name/ip, that is given to clients to connect where
- * the server is. So, in case of Firewall, this may be firewall name, but the bind_address
- * would be the physical address of the server
+ * the server is.
*/
private static InetAddress HOST_ADDRESS = null;
@@ -79,23 +78,22 @@
boolean bindAddressDefined = (bindAddress != null && bindAddress.length() > 0);
boolean hostNameDefined = (hostName != null && hostName.length() > 0);
+ if (hostNameDefined) {
+ HOST_ADDRESS = NetUtils.resolveHostByName(hostName);
+ }
- if (bindAddressDefined && hostNameDefined) {
+ if (bindAddressDefined) {
BIND_ADDRESS = bindAddress;
- HOST_ADDRESS = NetUtils.resolveHostByName(hostName);
+
+ if (!hostNameDefined) {
+ HOST_ADDRESS = InetAddress.getByName(bindAddress);
+ }
}
- else if (bindAddressDefined && !hostNameDefined) {
- BIND_ADDRESS = bindAddress;
- HOST_ADDRESS = InetAddress.getByAddress(BIND_ADDRESS.getBytes());
- }
- else if (!bindAddressDefined && hostNameDefined) {
- HOST_ADDRESS = NetUtils.resolveHostByName(hostName);
- BIND_ADDRESS = HOST_ADDRESS.getCanonicalHostName();
- }
else {
- InetAddress addr = NetUtils.getInstance().getInetAddress();
- BIND_ADDRESS = addr.getHostAddress();
- HOST_ADDRESS = addr;
+ if (!hostNameDefined) {
+ HOST_ADDRESS = NetUtils.getInstance().getInetAddress();
+ }
+ BIND_ADDRESS = HOST_ADDRESS.getHostAddress();
}
}
Modified: trunk/engine/src/main/java/com/metamatrix/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/dqp/internal/process/DQPCore.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/engine/src/main/java/com/metamatrix/dqp/internal/process/DQPCore.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -452,9 +452,7 @@
public ResultsFuture<?> closeRequest(long requestId) throws MetaMatrixProcessingException {
DQPWorkContext workContext = DQPWorkContext.getWorkContext();
closeRequest(workContext.getRequestID(requestId));
- ResultsFuture<Void> resultsFuture = new ResultsFuture<Void>();
- resultsFuture.getResultsReceiver().receiveResults(null);
- return resultsFuture;
+ return null;
}
/**
Modified: trunk/engine/src/main/java/com/metamatrix/dqp/internal/process/DQPWorkContext.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/dqp/internal/process/DQPWorkContext.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/engine/src/main/java/com/metamatrix/dqp/internal/process/DQPWorkContext.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -22,17 +22,13 @@
package com.metamatrix.dqp.internal.process;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
import java.io.Serializable;
import com.metamatrix.dqp.message.RequestID;
import com.metamatrix.platform.security.api.MetaMatrixSessionID;
import com.metamatrix.platform.security.api.SessionToken;
-public class DQPWorkContext implements Externalizable {
+public class DQPWorkContext implements Serializable {
private static final long serialVersionUID = -6389893410233192977L;
@@ -50,7 +46,6 @@
CONTEXTS.set(context);
}
- private String connectionID;
private MetaMatrixSessionID sessionId;
private String userName;
private Serializable trustedPayload;
@@ -58,6 +53,8 @@
private String vdbVersion;
private String appName;
private SessionToken sessionToken;
+ private String clientAddress;
+ private String clientHostname;
public DQPWorkContext() {
}
@@ -119,7 +116,7 @@
}
public String getConnectionID() {
- return connectionID;
+ return this.sessionId!=null?this.sessionId.toString():null;
}
public MetaMatrixSessionID getSessionId() {
@@ -128,7 +125,6 @@
public void setSessionId(MetaMatrixSessionID sessionId) {
this.sessionId = sessionId;
- this.connectionID = sessionId.toString();
}
public void setAppName(String appName) {
@@ -151,25 +147,20 @@
return sessionToken;
}
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
- this.setSessionId((MetaMatrixSessionID)in.readObject());
- this.setUserName((String)in.readObject());
- this.setTrustedPayload((Serializable)in.readObject());
- this.setVdbName((String)in.readObject());
- this.setVdbVersion((String)in.readObject());
- this.setAppName((String)in.readObject());
- this.setSessionToken((SessionToken)in.readObject());
+ public void setClientAddress(String clientAddress) {
+ this.clientAddress = clientAddress;
}
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(sessionId);
- out.writeObject(userName);
- out.writeObject(trustedPayload);
- out.writeObject(vdbName);
- out.writeObject(vdbVersion);
- out.writeObject(appName);
- out.writeObject(sessionToken);
+ public String getClientAddress() {
+ return clientAddress;
}
+ public void setClientHostname(String clientHostname) {
+ this.clientHostname = clientHostname;
+ }
+
+ public String getClientHostname() {
+ return clientHostname;
+ }
+
}
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/pom.xml 2009-03-17 14:25:36 UTC (rev 565)
@@ -70,8 +70,6 @@
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
- <forkMode>once</forkMode>
- <argLine>-XX:MaxPermSize=128m</argLine>
<includes>
<include>**/*TestCase.java</include>
<include>**/*Test.java</include>
Modified: trunk/server/pom.xml
===================================================================
--- trunk/server/pom.xml 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/server/pom.xml 2009-03-17 14:25:36 UTC (rev 565)
@@ -1,100 +1,93 @@
<?xml version="1.0" encoding="UTF-8"?>
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
- <parent>
- <artifactId>teiid</artifactId>
- <groupId>org.jboss.teiid</groupId>
- <version>6.0.0-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
- <artifactId>teiid-server</artifactId>
- <name>Server</name>
- <description>Standalone server/cluster infrastructure.</description>
-
- <build>
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <parent>
+ <artifactId>teiid</artifactId>
+ <groupId>org.jboss.teiid</groupId>
+ <version>6.0.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>teiid-server</artifactId>
+ <name>Server</name>
+ <description>Standalone server/cluster infrastructure.</description>
+ <build>
<!-- Zips all the .sql files into single file and adds as artifact to the maven repository -->
- <plugins>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>2.2-beta-2</version>
- <configuration>
- <descriptors>
- <descriptor>src/assembly/repository-sql.xml</descriptor>
- </descriptors>
- <outputDirectory>target/distribution</outputDirectory>
- <workDirectory>target/assembly/work</workDirectory>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>attached</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
- <dependencies>
-
- <dependency>
- <groupId>org.jboss.teiid</groupId>
- <artifactId>teiid-engine</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.jboss.teiid</groupId>
- <artifactId>teiid-engine</artifactId>
- <type>test-jar</type>
- </dependency>
-
- <dependency>
- <groupId>org.jboss.teiid</groupId>
- <artifactId>teiid-metadata</artifactId>
- </dependency>
-
- <!-- this dependency can easily be removed -->
- <dependency>
- <groupId>org.jboss.teiid</groupId>
- <artifactId>teiid-txn-jbossts</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.jboss.teiid</groupId>
- <artifactId>teiid-common-core</artifactId>
- <type>test-jar</type>
- </dependency>
-
- <dependency>
- <groupId>org.jboss.teiid</groupId>
- <artifactId>teiid-common-internal</artifactId>
- <type>test-jar</type>
- </dependency>
-
- <dependency>
- <groupId>org.jboss.teiid</groupId>
- <artifactId>teiid-client</artifactId>
- <type>test-jar</type>
- </dependency>
-
- <dependency>
- <groupId>org.jboss.teiid</groupId>
- <artifactId>teiid-metadata</artifactId>
- <type>test-jar</type>
- </dependency>
-
- <dependency>
- <groupId>com.google.code.guice</groupId>
- <artifactId>guice</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.jboss.teiid</groupId>
- <artifactId>teiid-cache-jbosscache</artifactId>
- </dependency>
-
- </dependencies>
-
-
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2-beta-2</version>
+ <configuration>
+ <descriptors>
+ <descriptor>src/assembly/repository-sql.xml</descriptor>
+ </descriptors>
+ <outputDirectory>target/distribution</outputDirectory>
+ <workDirectory>target/assembly/work</workDirectory>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>attached</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.jboss.teiid</groupId>
+ <artifactId>teiid-engine</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.teiid</groupId>
+ <artifactId>teiid-engine</artifactId>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.teiid</groupId>
+ <artifactId>teiid-metadata</artifactId>
+ </dependency>
+ <dependency>
+ <!-- this dependency can easily be removed -->
+ <groupId>org.jboss.teiid</groupId>
+ <artifactId>teiid-txn-jbossts</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.teiid</groupId>
+ <artifactId>teiid-common-core</artifactId>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.teiid</groupId>
+ <artifactId>teiid-common-internal</artifactId>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.teiid</groupId>
+ <artifactId>teiid-client</artifactId>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.teiid</groupId>
+ <artifactId>teiid-metadata</artifactId>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.teiid</groupId>
+ <artifactId>teiid-cache-jbosscache</artifactId>
+ </dependency>
+
+ <!-- External dependencies -->
+ <dependency>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.guice</groupId>
+ <artifactId>guice</artifactId>
+ </dependency>
+ </dependencies>
</project>
\ No newline at end of file
Added: trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/ChannelListener.java
===================================================================
--- trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/ChannelListener.java (rev 0)
+++ trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/ChannelListener.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package com.metamatrix.common.comm.platform.socket;
+
+import com.metamatrix.common.comm.exception.CommunicationException;
+
+public interface ChannelListener {
+
+ public interface ChannelListenerFactory {
+ ChannelListener createChannelListener(ObjectChannel channel);
+ }
+
+ void receivedMessage(Object msg) throws CommunicationException;
+
+ void exceptionOccurred(Throwable t);
+
+ void onConnection() throws CommunicationException;
+}
\ No newline at end of file
Property changes on: trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/ChannelListener.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Copied: trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SSLAwareChannelHandler.java (from rev 555, trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/SSLAwareChannelHandler.java)
===================================================================
--- trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SSLAwareChannelHandler.java (rev 0)
+++ trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SSLAwareChannelHandler.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -0,0 +1,250 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+/**
+ *
+ */
+package com.metamatrix.common.comm.platform.socket;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.net.ssl.SSLEngine;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.DefaultChannelPipeline;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
+import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
+import org.jboss.netty.handler.ssl.SslHandler;
+
+import com.metamatrix.common.comm.exception.SingleInstanceCommunicationException;
+import com.metamatrix.common.comm.platform.CommPlatformPlugin;
+
+/**
+ * Main class for creating Netty Nio Channels
+ */
+
+ at ChannelPipelineCoverage(ChannelPipelineCoverage.ALL)
+public class SSLAwareChannelHandler extends SimpleChannelHandler implements ChannelPipelineFactory {
+
+ public class ObjectChannelImpl implements ObjectChannel {
+ private final Channel channel;
+
+ public ObjectChannelImpl(Channel channel) {
+ this.channel = channel;
+ }
+
+ public void close() {
+ channel.close();
+ }
+
+ public boolean isOpen() {
+ return channel.isOpen();
+ }
+
+ public SocketAddress getRemoteAddress() {
+ return channel.getRemoteAddress();
+ }
+
+ @Override
+ public Object read() throws IOException,
+ ClassNotFoundException {
+ throw new UnsupportedOperationException();
+ }
+
+ public Future<?> write(Object msg) {
+ final ChannelFuture future = channel.write(msg);
+ future.addListener(completionListener);
+ return new Future<Void>() {
+
+ @Override
+ public boolean cancel(boolean arg0) {
+ return future.cancel();
+ }
+
+ @Override
+ public Void get() throws InterruptedException,
+ ExecutionException {
+ future.await();
+ if (!future.isSuccess()) {
+ throw new ExecutionException(future.getCause());
+ }
+ return null;
+ }
+
+ @Override
+ public Void get(long arg0, TimeUnit arg1)
+ throws InterruptedException, ExecutionException,
+ TimeoutException {
+ if (future.await(arg0, arg1)) {
+ if (!future.isSuccess()) {
+ throw new ExecutionException(future.getCause());
+ }
+ return null;
+ }
+ throw new TimeoutException();
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return future.isCancelled();
+ }
+
+ @Override
+ public boolean isDone() {
+ return future.isDone();
+ }
+ };
+ }
+ }
+
+ private final ChannelListener.ChannelListenerFactory listenerFactory;
+ private final SSLEngine engine;
+ private final ClassLoader classLoader;
+ private Map<Channel, ChannelListener> listeners = Collections.synchronizedMap(new HashMap<Channel, ChannelListener>());
+ private AtomicLong objectsRead = new AtomicLong(0);
+ private AtomicLong objectsWritten = new AtomicLong(0);
+ private volatile int maxChannels;
+
+ private ChannelFutureListener completionListener = new ChannelFutureListener() {
+
+ @Override
+ public void operationComplete(ChannelFuture arg0)
+ throws Exception {
+ if (arg0.isSuccess()) {
+ objectsWritten.getAndIncrement();
+ }
+ }
+
+ };
+
+ public SSLAwareChannelHandler(ChannelListener.ChannelListenerFactory listenerFactory,
+ SSLEngine engine, ClassLoader classloader) {
+ this.listenerFactory = listenerFactory;
+ this.engine = engine;
+ this.classLoader = classloader;
+ }
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx,
+ final ChannelStateEvent e) throws Exception {
+ ChannelListener listener = this.listenerFactory.createChannelListener(new ObjectChannelImpl(e.getChannel()));
+ synchronized (this.listeners) {
+ this.listeners.put(e.getChannel(), listener);
+ maxChannels = Math.max(maxChannels, this.listeners.size());
+ }
+ if (engine != null) {
+ SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
+ sslHandler.handshake(e.getChannel()).addListener(new ChannelFutureListener() {
+ public void operationComplete(ChannelFuture arg0)
+ throws Exception {
+ onConnection(e.getChannel());
+ }
+ });
+ } else {
+ onConnection(e.getChannel());
+ }
+ }
+
+ private void onConnection(Channel channel) throws Exception {
+ ChannelListener listener = this.listeners.get(channel);
+ if (listener != null) {
+ listener.onConnection();
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx,
+ ExceptionEvent e) throws Exception {
+ ChannelListener listener = this.listeners.get(e.getChannel());
+ if (listener != null) {
+ listener.exceptionOccurred(e.getCause());
+ }
+ e.getChannel().close();
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx,
+ MessageEvent e) throws Exception {
+ objectsRead.getAndIncrement();
+ ChannelListener listener = this.listeners.get(e.getChannel());
+ if (listener != null) {
+ listener.receivedMessage(e.getMessage());
+ }
+ }
+
+ @Override
+ public void channelDisconnected(ChannelHandlerContext ctx,
+ ChannelStateEvent e) throws Exception {
+ ChannelListener listener = this.listeners.remove(e.getChannel());
+ if (listener != null) {
+ listener.exceptionOccurred(new SingleInstanceCommunicationException(CommPlatformPlugin.Util.getString("SSLAwareChannelHandler.channel_closed"))); //$NON-NLS-1$
+ }
+ }
+
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline pipeline = new DefaultChannelPipeline();
+
+ if (engine != null) {
+ pipeline.addLast("ssl", new SslHandler(engine)); //$NON-NLS-1$
+ }
+ pipeline.addLast("decoder", new ObjectDecoder(1 << 24, classLoader)); //$NON-NLS-1$
+ pipeline.addLast("encoder", new ObjectEncoder()); //$NON-NLS-1$
+ pipeline.addLast("handler", this); //$NON-NLS-1$
+ return pipeline;
+ }
+
+ public long getObjectsRead() {
+ return this.objectsRead.get();
+ }
+
+ public long getObjectsWritten() {
+ return this.objectsWritten.get();
+ }
+
+ public int getConnectedChannels() {
+ return this.listeners.size();
+ }
+
+ public int getMaxConnectedChannels() {
+ return this.maxChannels;
+ }
+
+}
\ No newline at end of file
Property changes on: trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/SSLAwareChannelHandler.java
___________________________________________________________________
Name: svn:mergeinfo
+
Modified: trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/ServerWorkItem.java
===================================================================
--- trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/ServerWorkItem.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/ServerWorkItem.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -117,14 +117,14 @@
throw e.getCause();
}
if (ResultsFuture.class.isAssignableFrom(m.getReturnType()) && methodResult != null) {
- ResultsFuture future = (ResultsFuture) methodResult;
- future.addCompletionListener(new ResultsFuture.CompletionListener() {
+ ResultsFuture<Serializable> future = (ResultsFuture<Serializable>) methodResult;
+ future.addCompletionListener(new ResultsFuture.CompletionListener<Serializable>() {
public void onCompletion(
- ResultsFuture completedFuture) {
+ ResultsFuture<Serializable> completedFuture) {
Message asynchResult = new Message();
try {
- asynchResult.setContents((Serializable) completedFuture.get());
+ asynchResult.setContents(completedFuture.get());
} catch (InterruptedException e) {
asynchResult.setContents(processException(e, serviceStruct.targetClass));
} catch (ExecutionException e) {
Modified: trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketClientInstance.java
===================================================================
--- trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketClientInstance.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketClientInstance.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -23,15 +23,17 @@
package com.metamatrix.common.comm.platform.socket.server;
import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import com.metamatrix.common.comm.ClientServiceRegistry;
import com.metamatrix.common.comm.api.Message;
import com.metamatrix.common.comm.exception.CommunicationException;
import com.metamatrix.common.comm.platform.CommPlatformPlugin;
+import com.metamatrix.common.comm.platform.socket.ChannelListener;
import com.metamatrix.common.comm.platform.socket.Handshake;
import com.metamatrix.common.comm.platform.socket.ObjectChannel;
import com.metamatrix.common.comm.platform.socket.SocketVMController;
-import com.metamatrix.common.comm.platform.socket.ObjectChannel.ChannelListener;
import com.metamatrix.common.log.LogManager;
import com.metamatrix.common.queue.WorkerPool;
import com.metamatrix.common.util.crypto.CryptoException;
@@ -67,6 +69,12 @@
this.server = server;
this.usingEncryption = isClientEncryptionEnabled;
this.sessionService = sessionService;
+ SocketAddress address = this.objectSocket.getRemoteAddress();
+ if (address instanceof InetSocketAddress) {
+ InetSocketAddress addr = (InetSocketAddress)address;
+ this.workContext.setClientAddress(addr.getAddress().getHostAddress());
+ this.workContext.setClientHostname(addr.getHostName());
+ }
}
public void send(Message message, Serializable messageKey) {
Modified: trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketListener.java
===================================================================
--- trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketListener.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/server/src/main/java/com/metamatrix/common/comm/platform/socket/server/SocketListener.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -35,11 +35,11 @@
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import com.metamatrix.common.comm.ClientServiceRegistry;
+import com.metamatrix.common.comm.platform.socket.ChannelListener;
import com.metamatrix.common.comm.platform.socket.ObjectChannel;
import com.metamatrix.common.comm.platform.socket.SSLAwareChannelHandler;
import com.metamatrix.common.comm.platform.socket.SocketVMController;
-import com.metamatrix.common.comm.platform.socket.ObjectChannel.ChannelListener;
-import com.metamatrix.common.comm.platform.socket.ObjectChannel.ChannelListenerFactory;
+import com.metamatrix.common.comm.platform.socket.ChannelListener.ChannelListenerFactory;
import com.metamatrix.common.log.LogManager;
import com.metamatrix.common.queue.WorkerPool;
import com.metamatrix.common.queue.WorkerPoolFactory;
Modified: trunk/server/src/main/java/com/metamatrix/common/net/ServerSocketConfiguration.java
===================================================================
--- trunk/server/src/main/java/com/metamatrix/common/net/ServerSocketConfiguration.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/server/src/main/java/com/metamatrix/common/net/ServerSocketConfiguration.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -23,6 +23,7 @@
package com.metamatrix.common.net;
import java.io.IOException;
+import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.Properties;
@@ -121,7 +122,7 @@
sslProtocol = props.getProperty(SSL_PROTOCOL, DEFAULT_SSL_PROTOCOL);
}
- public SSLEngine getServerSSLEngine() throws IOException {
+ public SSLEngine getServerSSLEngine() throws IOException, GeneralSecurityException {
if (!isServerSSLEnabled()) {
return null;
}
Modified: trunk/server/src/test/java/com/metamatrix/common/comm/platform/socket/server/TestCommSockets.java
===================================================================
--- trunk/server/src/test/java/com/metamatrix/common/comm/platform/socket/server/TestCommSockets.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/server/src/test/java/com/metamatrix/common/comm/platform/socket/server/TestCommSockets.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -22,6 +22,9 @@
*/
package com.metamatrix.common.comm.platform.socket.server;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import java.net.InetSocketAddress;
@@ -29,7 +32,8 @@
import javax.net.ssl.SSLEngine;
-import junit.framework.TestCase;
+import org.junit.Before;
+import org.junit.Test;
import com.metamatrix.api.exception.ComponentNotFoundException;
import com.metamatrix.api.exception.security.LogonException;
@@ -49,18 +53,17 @@
import com.metamatrix.platform.security.api.service.SessionServiceInterface;
import com.metamatrix.platform.vm.controller.SocketListenerStats;
-public class TestCommSockets extends TestCase {
+public class TestCommSockets {
SocketListener listener;
- @Override
- protected void tearDown() throws Exception {
+ @Before public void tearDown() throws Exception {
if (listener != null) {
listener.stop();
}
}
- public void testFailedConnect() throws Exception {
+ @Test public void testFailedConnect() throws Exception {
InetSocketAddress addr = new InetSocketAddress(0);
ClientServiceRegistry csr = new ClientServiceRegistry();
SessionServiceInterface sessionService = mock(SessionServiceInterface.class);
@@ -80,7 +83,7 @@
}
}
- public void testConnect() throws Exception {
+ @Test public void testConnect() throws Exception {
SocketServerConnection conn = helpEstablishConnection(false, null);
SocketListenerStats stats = listener.getStats();
assertEquals(2, stats.objectsRead); // handshake response, logon,
@@ -98,7 +101,7 @@
assertEquals(0, stats.sockets);
}
- public void testConnectWithoutClientEncryption() throws Exception {
+ @Test public void testConnectWithoutClientEncryption() throws Exception {
SocketServerConnection conn = helpEstablishConnection(false, null, false, new Properties());
assertTrue(((SocketServerInstanceImpl) conn
.selectServerInstance()).getCryptor() instanceof NullCryptor);
@@ -138,11 +141,11 @@
secure).getAppServerURL());
p.setProperty(MMURL.CONNECTION.DISCOVERY_STRATEGY, UrlServerDiscovery.class.getName());
SocketServerConnectionFactory sscf = new SocketServerConnectionFactory();
- sscf.init(socketConfig, false);
+ sscf.init(socketConfig);
return sscf.createConnection(p);
}
- public void testSSLConnectWithNonSSLServer() throws Exception {
+ @Test public void testSSLConnectWithNonSSLServer() throws Exception {
try {
helpEstablishConnection(true, null);
fail("exception expected"); //$NON-NLS-1$
@@ -151,7 +154,7 @@
}
}
- public void testAnonSSLConnect() throws Exception {
+ @Test public void testAnonSSLConnect() throws Exception {
SSLEngine engine = SocketUtil.getAnonSSLContext().createSSLEngine();
engine.setUseClientMode(false);
engine.setEnabledCipherSuites(new String[] { SocketUtil.ANON_CIPHER_SUITE });
@@ -160,5 +163,5 @@
SocketServerConnection conn = helpEstablishConnection(true, engine, true, p);
conn.shutdown();
}
-
+
}
Modified: trunk/server/src/test/java/com/metamatrix/common/comm/platform/socket/server/TestSocketRemoting.java
===================================================================
--- trunk/server/src/test/java/com/metamatrix/common/comm/platform/socket/server/TestSocketRemoting.java 2009-03-16 19:37:58 UTC (rev 564)
+++ trunk/server/src/test/java/com/metamatrix/common/comm/platform/socket/server/TestSocketRemoting.java 2009-03-17 14:25:36 UTC (rev 565)
@@ -40,7 +40,7 @@
import com.metamatrix.common.api.MMURL;
import com.metamatrix.common.comm.ClientServiceRegistry;
import com.metamatrix.common.comm.api.Message;
-import com.metamatrix.common.comm.api.MessageListener;
+import com.metamatrix.common.comm.api.ResultsReceiver;
import com.metamatrix.common.comm.exception.CommunicationException;
import com.metamatrix.common.comm.exception.ConnectionException;
import com.metamatrix.common.comm.platform.socket.client.SocketServerConnection;
@@ -73,7 +73,7 @@
private static class FakeServiceImpl implements FakeService {
public ResultsFuture<Integer> asynchResult() {
- ResultsFuture<Integer> result = new ResultsFuture();
+ ResultsFuture<Integer> result = new ResultsFuture<Integer>();
result.getResultsReceiver().receiveResults(new Integer(5));
return result;
}
@@ -87,9 +87,9 @@
private static class FakeClientServerInstance extends SocketServerInstanceImpl implements ClientInstance {
ClientServiceRegistry clientServiceRegistry;
- private MessageListener listener;
+ private ResultsReceiver<Object> listener;
- public FakeClientServerInstance(ClientServiceRegistry clientServiceRegistry) throws CommunicationException, IOException {
+ public FakeClientServerInstance(ClientServiceRegistry clientServiceRegistry) {
super();
this.clientServiceRegistry = clientServiceRegistry;
}
@@ -101,9 +101,11 @@
public boolean isOpen() {
return true;
}
-
- public void send(Message message, MessageListener listener,
- Serializable messageKey) throws CommunicationException {
+
+ @Override
+ public void send(Message message, ResultsReceiver<Object> listener,
+ Serializable messageKey) throws CommunicationException,
+ InterruptedException {
ServerWorkItem workItem = new ServerWorkItem(this, messageKey, message, clientServiceRegistry, SimpleMock.createSimpleMock(SessionServiceInterface.class));
this.listener = listener;
workItem.run();
@@ -122,7 +124,7 @@
}
public void send(Message message, Serializable messageKey) {
- this.listener.deliverMessage(message, messageKey);
+ this.listener.receiveResults(message.getContents());
}
}
@@ -208,7 +210,7 @@
SocketServerConnection connection = new SocketServerConnection(new SocketServerInstanceFactory() {
@Override
- public SocketServerInstance createServerInstance(HostInfo info,
+ public SocketServerInstance getServerInstance(HostInfo info,
boolean ssl) throws CommunicationException, IOException {
return serverInstance;
}
More information about the teiid-commits
mailing list