[teiid-commits] teiid SVN: r1762 - in trunk/client/src: main/java/org/teiid/netty/handler/codec/serialization and 1 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Wed Jan 20 22:49:48 EST 2010


Author: shawkins
Date: 2010-01-20 22:49:48 -0500 (Wed, 20 Jan 2010)
New Revision: 1762

Modified:
   trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java
   trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectDecoderInputStream.java
   trunk/client/src/test/java/org/teiid/netty/handler/codec/serialization/TestObjectDecoderInputStream.java
Log:
TEIID-916 fix for stream corruption during a timeout.

Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java	2010-01-20 20:25:36 UTC (rev 1761)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java	2010-01-21 03:49:48 UTC (rev 1762)
@@ -41,7 +41,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -84,7 +83,7 @@
     
     private Map<Serializable, ResultsReceiver<Object>> asynchronousListeners = new ConcurrentHashMap<Serializable, ResultsReceiver<Object>>();
     
-    private ReentrantLock readLock = new ReentrantLock();
+    private boolean hasReader;
     
     public SocketServerInstanceImpl() {
     	
@@ -318,18 +317,29 @@
 							TimeoutException {
 						long timeoutMillis = (int)Math.min(unit.toMillis(timeout), Integer.MAX_VALUE);
 						long start = System.currentTimeMillis();
-						boolean reading = false;
 						while (!isDone()) {
-							try {
-								if ((reading = readLock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) == true && !isDone()) {
-									receivedMessage(socketChannel.read());
+							boolean reading = false;
+							synchronized (SocketServerInstanceImpl.this) {
+								if (!hasReader) {
+									hasReader = true;
+									reading = true;
+								} else if (!isDone()) {
+									SocketServerInstanceImpl.this.wait(Math.max(1, timeoutMillis));
 								}
-							} catch (SocketTimeoutException e) {
-							} catch (Exception e) {
-								exceptionOccurred(e);
-							} finally {
-								if (reading) {
-									readLock.unlock();
+							} 
+							if (reading) {
+								try {
+									if (!isDone()) {
+										receivedMessage(socketChannel.read());
+									}
+								} catch (SocketTimeoutException e) {
+								} catch (Exception e) {
+									exceptionOccurred(e);
+								} finally {
+									synchronized (SocketServerInstanceImpl.this) {
+										hasReader = false;
+										SocketServerInstanceImpl.this.notifyAll();
+									}
 								}
 							}
 							if (!isDone()) {

Modified: trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectDecoderInputStream.java
===================================================================
--- trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectDecoderInputStream.java	2010-01-20 20:25:36 UTC (rev 1761)
+++ trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectDecoderInputStream.java	2010-01-21 03:49:48 UTC (rev 1762)
@@ -65,7 +65,7 @@
         		buffer = new byte[4];
         	}
             fillBuffer();
-    		int dataLen = ((buffer[0] & 0xff << 24) + (buffer[1] & 0xff << 16) + (buffer[2] & 0xff << 8) + (buffer[3] & 0xff << 0));
+    		int dataLen = getIntFromBytes(buffer);
 	        if (dataLen <= 0) {
 	            throw new StreamCorruptedException("invalid data length: " + dataLen); //$NON-NLS-1$
 	        }
@@ -87,6 +87,10 @@
         return new CompactObjectInputStream(bais, classLoader).readObject();
     }
 
+	static int getIntFromBytes(byte[] buffer) {
+		return ((buffer[0] & 0xff) << 24) + ((buffer[1] & 0xff) << 16) + ((buffer[2] & 0xff) << 8) + (buffer[3] & 0xff);
+	}
+
 	private void fillBuffer() throws IOException, EOFException {
 		while (count < buffer.length) {
 	        int read = in.read(buffer, count, buffer.length - count);

Modified: trunk/client/src/test/java/org/teiid/netty/handler/codec/serialization/TestObjectDecoderInputStream.java
===================================================================
--- trunk/client/src/test/java/org/teiid/netty/handler/codec/serialization/TestObjectDecoderInputStream.java	2010-01-20 20:25:36 UTC (rev 1761)
+++ trunk/client/src/test/java/org/teiid/netty/handler/codec/serialization/TestObjectDecoderInputStream.java	2010-01-21 03:49:48 UTC (rev 1762)
@@ -67,4 +67,13 @@
 		assertEquals(obj, result);
 	}
 	
+	@Test public void testLargeIntConversion() throws Exception {
+		int testValue = 204503404;
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		DataOutputStream dos = new DataOutputStream(baos);
+		dos.writeInt(testValue);
+		dos.close();
+		assertEquals(testValue, ObjectDecoderInputStream.getIntFromBytes(baos.toByteArray()));
+	}
+	
 }



More information about the teiid-commits mailing list