[teiid-commits] teiid SVN: r3755 - in branches/7.6.x: common-core/src/test/java/org/teiid/core/types and 1 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Tue Dec 20 17:40:37 EST 2011


Author: shawkins
Date: 2011-12-20 17:40:36 -0500 (Tue, 20 Dec 2011)
New Revision: 3755

Modified:
   branches/7.6.x/common-core/src/main/java/org/teiid/core/types/BlobType.java
   branches/7.6.x/common-core/src/main/java/org/teiid/core/types/ClobType.java
   branches/7.6.x/common-core/src/main/java/org/teiid/core/types/Streamable.java
   branches/7.6.x/common-core/src/main/java/org/teiid/core/types/XMLType.java
   branches/7.6.x/common-core/src/test/java/org/teiid/core/types/TestClobValue.java
   branches/7.6.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
Log:
TEIID-1879 moved the proactive check for source closure out of the doneProducingBatches method and added protection in the streamable logic so that the socket won't close

Modified: branches/7.6.x/common-core/src/main/java/org/teiid/core/types/BlobType.java
===================================================================
--- branches/7.6.x/common-core/src/main/java/org/teiid/core/types/BlobType.java	2011-12-20 22:07:09 UTC (rev 3754)
+++ branches/7.6.x/common-core/src/main/java/org/teiid/core/types/BlobType.java	2011-12-20 22:40:36 UTC (rev 3755)
@@ -22,6 +22,7 @@
 
 package org.teiid.core.types;
 
+import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInput;
@@ -147,7 +148,7 @@
 	}
 	
 	@Override
-	protected void writeReference(final ObjectOutput out) throws IOException {
+	protected void writeReference(final DataOutput out) throws IOException {
 		try {
 			writeBinary(out, getBinaryStream(), (int)length);
 		} catch (SQLException e) {
@@ -155,7 +156,7 @@
 		}
 	}
 
-	static void writeBinary(final ObjectOutput out, InputStream is, int length) throws IOException {
+	static void writeBinary(final DataOutput out, InputStream is, int length) throws IOException {
 		OutputStream os = new OutputStream() {
 			
 			@Override

Modified: branches/7.6.x/common-core/src/main/java/org/teiid/core/types/ClobType.java
===================================================================
--- branches/7.6.x/common-core/src/main/java/org/teiid/core/types/ClobType.java	2011-12-20 22:07:09 UTC (rev 3754)
+++ branches/7.6.x/common-core/src/main/java/org/teiid/core/types/ClobType.java	2011-12-20 22:40:36 UTC (rev 3755)
@@ -22,10 +22,10 @@
 
 package org.teiid.core.types;
 
+import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInput;
-import java.io.ObjectOutput;
 import java.io.OutputStream;
 import java.io.Reader;
 import java.io.StringWriter;
@@ -230,7 +230,7 @@
 	 * These clobs should be small, so the wasted space should be minimal.
 	 */
 	@Override
-	protected void writeReference(final ObjectOutput out) throws IOException {
+	protected void writeReference(final DataOutput out) throws IOException {
 		Writer w = new Writer() {
 			
 			@Override

Modified: branches/7.6.x/common-core/src/main/java/org/teiid/core/types/Streamable.java
===================================================================
--- branches/7.6.x/common-core/src/main/java/org/teiid/core/types/Streamable.java	2011-12-20 22:07:09 UTC (rev 3754)
+++ branches/7.6.x/common-core/src/main/java/org/teiid/core/types/Streamable.java	2011-12-20 22:40:36 UTC (rev 3755)
@@ -22,6 +22,8 @@
 
 package org.teiid.core.types;
 
+import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
@@ -29,8 +31,11 @@
 import java.nio.charset.Charset;
 import java.sql.SQLException;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.teiid.core.CorePlugin;
+import org.teiid.core.util.AccessibleByteArrayOutputStream;
 
 
 
@@ -42,6 +47,8 @@
  * this is the ID that client needs to reference to get the chunk of data.
  */
 public abstract class Streamable<T> implements Externalizable {
+	
+	private static final Logger logger = Logger.getLogger(Streamable.class.getName());
 
 	private static final long serialVersionUID = -8252488562134729374L;
 	
@@ -112,7 +119,7 @@
     public void readExternal(ObjectInput in) throws IOException,
     		ClassNotFoundException {
     	length = in.readLong();
-    	referenceStreamId = (String)in.readObject();
+    	this.referenceStreamId = (String)in.readObject();
     	if (referenceStreamId == null) {
     		//we expect the data inline
     		readReference(in);
@@ -128,12 +135,27 @@
 		} catch (SQLException e) {
 		}
     	out.writeLong(length);
-    	out.writeObject(referenceStreamId);
+    	boolean writeBuffer = false;
+    	AccessibleByteArrayOutputStream baos = null;
     	if (referenceStreamId == null) {
-    		writeReference(out);
+    		//TODO: detect when this buffering is not necessary
+    		baos = new AccessibleByteArrayOutputStream();
+    		DataOutputStream dataOutput = new DataOutputStream(baos);
+    		try {
+    			writeReference(dataOutput);
+    			dataOutput.close();
+    			writeBuffer = true;
+    		} catch (IOException e) {
+    			logger.log(Level.WARNING, e.getMessage());
+    			referenceStreamId = "error"; //$NON-NLS-1$
+    		}
     	}
+    	out.writeObject(referenceStreamId);
+		if (writeBuffer) {
+			out.write(baos.getBuffer(), 0, baos.getCount());
+		}
     }
     
-    protected abstract void writeReference(ObjectOutput out) throws IOException;
+    protected abstract void writeReference(DataOutput out) throws IOException;
     
 }

Modified: branches/7.6.x/common-core/src/main/java/org/teiid/core/types/XMLType.java
===================================================================
--- branches/7.6.x/common-core/src/main/java/org/teiid/core/types/XMLType.java	2011-12-20 22:07:09 UTC (rev 3754)
+++ branches/7.6.x/common-core/src/main/java/org/teiid/core/types/XMLType.java	2011-12-20 22:40:36 UTC (rev 3755)
@@ -22,6 +22,7 @@
 
 package org.teiid.core.types;
 
+import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInput;
@@ -228,7 +229,7 @@
 	}
 	
 	@Override
-	protected void writeReference(final ObjectOutput out) throws IOException {
+	protected void writeReference(final DataOutput out) throws IOException {
 		try {
 			BlobType.writeBinary(out, getBinaryStream(), (int)length);
 		} catch (SQLException e) {

Modified: branches/7.6.x/common-core/src/test/java/org/teiid/core/types/TestClobValue.java
===================================================================
--- branches/7.6.x/common-core/src/test/java/org/teiid/core/types/TestClobValue.java	2011-12-20 22:07:09 UTC (rev 3754)
+++ branches/7.6.x/common-core/src/test/java/org/teiid/core/types/TestClobValue.java	2011-12-20 22:40:36 UTC (rev 3755)
@@ -28,6 +28,7 @@
 import java.io.Reader;
 
 import javax.sql.rowset.serial.SerialClob;
+import javax.sql.rowset.serial.SerialException;
 
 import org.junit.Test;
 import org.teiid.core.util.UnitTestUtil;
@@ -77,6 +78,27 @@
         assertEquals(testString, read.getSubString(1, testString.length()));
     }
     
+    @SuppressWarnings("serial")
+	@Test public void testReferencePersistenceError() throws Exception {
+    	String testString = "this is test clob"; //$NON-NLS-1$
+        SerialClob clob = new SerialClob(testString.toCharArray()) {
+        	@Override
+        	public Reader getCharacterStream() throws SerialException {
+        		throw new SerialException();
+        	}
+        };
+        
+        ClobType cv = new ClobType(clob);
+        cv.setReferenceStreamId(null);
+        
+        // now force to serialize
+        ClobType read = UnitTestUtil.helpSerialize(cv);
+        
+        assertTrue(read.length() > 0);
+        assertNotNull(read.getReferenceStreamId());
+        assertNull(read.getReference());
+    }
+    
     @Test public void testClobSubstring() throws Exception {
     	ClobImpl clob = new ClobImpl() {
     		public java.io.Reader getCharacterStream() throws java.sql.SQLException {

Modified: branches/7.6.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- branches/7.6.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-12-20 22:07:09 UTC (rev 3754)
+++ branches/7.6.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-12-20 22:40:36 UTC (rev 3755)
@@ -370,6 +370,12 @@
 			this.resultsBuffer = collector.collectTuples();
 			if (!doneProducingBatches) {
 				doneProducingBatches();
+				//TODO: we could perform more tracking to know what source lobs are in use
+				if (this.resultsBuffer.getLobCount() == 0) {
+					for (DataTierTupleSource connectorRequest : getConnectorRequests()) {
+						connectorRequest.fullyCloseSource();
+				    }
+				}
 				addToCache();
 			}
 		}
@@ -519,8 +525,8 @@
 				}
 				if (batch.getTerminationFlag()) {
 					doneProducingBatches();
+					addToCache();
 				}
-				addToCache();
 				synchronized (lobStreams) {
 					if (resultsBuffer.isLobs()) {
 						super.flushBatchDirect(batch, false);
@@ -919,12 +925,6 @@
 
 	private void doneProducingBatches() {
 		this.doneProducingBatches = true;
-		//TODO: we could perform more tracking to know what source lobs are in use
-		if (this.resultsBuffer.getLobCount() == 0) {
-			for (DataTierTupleSource connectorRequest : getConnectorRequests()) {
-				connectorRequest.fullyCloseSource();
-		    }
-		}
 		dqpCore.finishProcessing(this);
 	}
 	



More information about the teiid-commits mailing list