[teiid-commits] teiid SVN: r1762 - in trunk/client/src: main/java/org/teiid/netty/handler/codec/serialization and 1 other directory.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Wed Jan 20 22:49:48 EST 2010
Author: shawkins
Date: 2010-01-20 22:49:48 -0500 (Wed, 20 Jan 2010)
New Revision: 1762
Modified:
trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java
trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectDecoderInputStream.java
trunk/client/src/test/java/org/teiid/netty/handler/codec/serialization/TestObjectDecoderInputStream.java
Log:
TEIID-916 fix for stream corruption during a timeout.
Modified: trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java 2010-01-20 20:25:36 UTC (rev 1761)
+++ trunk/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerInstanceImpl.java 2010-01-21 03:49:48 UTC (rev 1762)
@@ -41,7 +41,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -84,7 +83,7 @@
private Map<Serializable, ResultsReceiver<Object>> asynchronousListeners = new ConcurrentHashMap<Serializable, ResultsReceiver<Object>>();
- private ReentrantLock readLock = new ReentrantLock();
+ private boolean hasReader;
public SocketServerInstanceImpl() {
@@ -318,18 +317,29 @@
TimeoutException {
long timeoutMillis = (int)Math.min(unit.toMillis(timeout), Integer.MAX_VALUE);
long start = System.currentTimeMillis();
- boolean reading = false;
while (!isDone()) {
- try {
- if ((reading = readLock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) == true && !isDone()) {
- receivedMessage(socketChannel.read());
+ boolean reading = false;
+ synchronized (SocketServerInstanceImpl.this) {
+ if (!hasReader) {
+ hasReader = true;
+ reading = true;
+ } else if (!isDone()) {
+ SocketServerInstanceImpl.this.wait(Math.max(1, timeoutMillis));
}
- } catch (SocketTimeoutException e) {
- } catch (Exception e) {
- exceptionOccurred(e);
- } finally {
- if (reading) {
- readLock.unlock();
+ }
+ if (reading) {
+ try {
+ if (!isDone()) {
+ receivedMessage(socketChannel.read());
+ }
+ } catch (SocketTimeoutException e) {
+ } catch (Exception e) {
+ exceptionOccurred(e);
+ } finally {
+ synchronized (SocketServerInstanceImpl.this) {
+ hasReader = false;
+ SocketServerInstanceImpl.this.notifyAll();
+ }
}
}
if (!isDone()) {
Modified: trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectDecoderInputStream.java
===================================================================
--- trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectDecoderInputStream.java 2010-01-20 20:25:36 UTC (rev 1761)
+++ trunk/client/src/main/java/org/teiid/netty/handler/codec/serialization/ObjectDecoderInputStream.java 2010-01-21 03:49:48 UTC (rev 1762)
@@ -65,7 +65,7 @@
buffer = new byte[4];
}
fillBuffer();
- int dataLen = ((buffer[0] & 0xff << 24) + (buffer[1] & 0xff << 16) + (buffer[2] & 0xff << 8) + (buffer[3] & 0xff << 0));
+ int dataLen = getIntFromBytes(buffer);
if (dataLen <= 0) {
throw new StreamCorruptedException("invalid data length: " + dataLen); //$NON-NLS-1$
}
@@ -87,6 +87,10 @@
return new CompactObjectInputStream(bais, classLoader).readObject();
}
+ static int getIntFromBytes(byte[] buffer) {
+ return ((buffer[0] & 0xff) << 24) + ((buffer[1] & 0xff) << 16) + ((buffer[2] & 0xff) << 8) + (buffer[3] & 0xff);
+ }
+
private void fillBuffer() throws IOException, EOFException {
while (count < buffer.length) {
int read = in.read(buffer, count, buffer.length - count);
Modified: trunk/client/src/test/java/org/teiid/netty/handler/codec/serialization/TestObjectDecoderInputStream.java
===================================================================
--- trunk/client/src/test/java/org/teiid/netty/handler/codec/serialization/TestObjectDecoderInputStream.java 2010-01-20 20:25:36 UTC (rev 1761)
+++ trunk/client/src/test/java/org/teiid/netty/handler/codec/serialization/TestObjectDecoderInputStream.java 2010-01-21 03:49:48 UTC (rev 1762)
@@ -67,4 +67,13 @@
assertEquals(obj, result);
}
+ @Test public void testLargeIntConversion() throws Exception {
+ int testValue = 204503404;
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ dos.writeInt(testValue);
+ dos.close();
+ assertEquals(testValue, ObjectDecoderInputStream.getIntFromBytes(baos.toByteArray()));
+ }
+
}
More information about the teiid-commits
mailing list