Author: shawkins
Date: 2011-12-20 17:40:36 -0500 (Tue, 20 Dec 2011)
New Revision: 3755
Modified:
branches/7.6.x/common-core/src/main/java/org/teiid/core/types/BlobType.java
branches/7.6.x/common-core/src/main/java/org/teiid/core/types/ClobType.java
branches/7.6.x/common-core/src/main/java/org/teiid/core/types/Streamable.java
branches/7.6.x/common-core/src/main/java/org/teiid/core/types/XMLType.java
branches/7.6.x/common-core/src/test/java/org/teiid/core/types/TestClobValue.java
branches/7.6.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
Log:
TEIID-1879 moved the proactive check for source closure out of the doneProducingBatches
method and added protection in the streamable logic so that the socket won't close
Modified: branches/7.6.x/common-core/src/main/java/org/teiid/core/types/BlobType.java
===================================================================
--- branches/7.6.x/common-core/src/main/java/org/teiid/core/types/BlobType.java 2011-12-20
22:07:09 UTC (rev 3754)
+++ branches/7.6.x/common-core/src/main/java/org/teiid/core/types/BlobType.java 2011-12-20
22:40:36 UTC (rev 3755)
@@ -22,6 +22,7 @@
package org.teiid.core.types;
+import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInput;
@@ -147,7 +148,7 @@
}
@Override
- protected void writeReference(final ObjectOutput out) throws IOException {
+ protected void writeReference(final DataOutput out) throws IOException {
try {
writeBinary(out, getBinaryStream(), (int)length);
} catch (SQLException e) {
@@ -155,7 +156,7 @@
}
}
- static void writeBinary(final ObjectOutput out, InputStream is, int length) throws
IOException {
+ static void writeBinary(final DataOutput out, InputStream is, int length) throws
IOException {
OutputStream os = new OutputStream() {
@Override
Modified: branches/7.6.x/common-core/src/main/java/org/teiid/core/types/ClobType.java
===================================================================
--- branches/7.6.x/common-core/src/main/java/org/teiid/core/types/ClobType.java 2011-12-20
22:07:09 UTC (rev 3754)
+++ branches/7.6.x/common-core/src/main/java/org/teiid/core/types/ClobType.java 2011-12-20
22:40:36 UTC (rev 3755)
@@ -22,10 +22,10 @@
package org.teiid.core.types;
+import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInput;
-import java.io.ObjectOutput;
import java.io.OutputStream;
import java.io.Reader;
import java.io.StringWriter;
@@ -230,7 +230,7 @@
* These clobs should be small, so the wasted space should be minimal.
*/
@Override
- protected void writeReference(final ObjectOutput out) throws IOException {
+ protected void writeReference(final DataOutput out) throws IOException {
Writer w = new Writer() {
@Override
Modified: branches/7.6.x/common-core/src/main/java/org/teiid/core/types/Streamable.java
===================================================================
---
branches/7.6.x/common-core/src/main/java/org/teiid/core/types/Streamable.java 2011-12-20
22:07:09 UTC (rev 3754)
+++
branches/7.6.x/common-core/src/main/java/org/teiid/core/types/Streamable.java 2011-12-20
22:40:36 UTC (rev 3755)
@@ -22,6 +22,8 @@
package org.teiid.core.types;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
@@ -29,8 +31,11 @@
import java.nio.charset.Charset;
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.teiid.core.CorePlugin;
+import org.teiid.core.util.AccessibleByteArrayOutputStream;
@@ -42,6 +47,8 @@
* this is the ID that client needs to reference to get the chunk of data.
*/
public abstract class Streamable<T> implements Externalizable {
+
+ private static final Logger logger = Logger.getLogger(Streamable.class.getName());
private static final long serialVersionUID = -8252488562134729374L;
@@ -112,7 +119,7 @@
public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
length = in.readLong();
- referenceStreamId = (String)in.readObject();
+ this.referenceStreamId = (String)in.readObject();
if (referenceStreamId == null) {
//we expect the data inline
readReference(in);
@@ -128,12 +135,27 @@
} catch (SQLException e) {
}
out.writeLong(length);
- out.writeObject(referenceStreamId);
+ boolean writeBuffer = false;
+ AccessibleByteArrayOutputStream baos = null;
if (referenceStreamId == null) {
- writeReference(out);
+ //TODO: detect when this buffering is not necessary
+ baos = new AccessibleByteArrayOutputStream();
+ DataOutputStream dataOutput = new DataOutputStream(baos);
+ try {
+ writeReference(dataOutput);
+ dataOutput.close();
+ writeBuffer = true;
+ } catch (IOException e) {
+ logger.log(Level.WARNING, e.getMessage());
+ referenceStreamId = "error"; //$NON-NLS-1$
+ }
}
+ out.writeObject(referenceStreamId);
+ if (writeBuffer) {
+ out.write(baos.getBuffer(), 0, baos.getCount());
+ }
}
- protected abstract void writeReference(ObjectOutput out) throws IOException;
+ protected abstract void writeReference(DataOutput out) throws IOException;
}
Modified: branches/7.6.x/common-core/src/main/java/org/teiid/core/types/XMLType.java
===================================================================
--- branches/7.6.x/common-core/src/main/java/org/teiid/core/types/XMLType.java 2011-12-20
22:07:09 UTC (rev 3754)
+++ branches/7.6.x/common-core/src/main/java/org/teiid/core/types/XMLType.java 2011-12-20
22:40:36 UTC (rev 3755)
@@ -22,6 +22,7 @@
package org.teiid.core.types;
+import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInput;
@@ -228,7 +229,7 @@
}
@Override
- protected void writeReference(final ObjectOutput out) throws IOException {
+ protected void writeReference(final DataOutput out) throws IOException {
try {
BlobType.writeBinary(out, getBinaryStream(), (int)length);
} catch (SQLException e) {
Modified:
branches/7.6.x/common-core/src/test/java/org/teiid/core/types/TestClobValue.java
===================================================================
---
branches/7.6.x/common-core/src/test/java/org/teiid/core/types/TestClobValue.java 2011-12-20
22:07:09 UTC (rev 3754)
+++
branches/7.6.x/common-core/src/test/java/org/teiid/core/types/TestClobValue.java 2011-12-20
22:40:36 UTC (rev 3755)
@@ -28,6 +28,7 @@
import java.io.Reader;
import javax.sql.rowset.serial.SerialClob;
+import javax.sql.rowset.serial.SerialException;
import org.junit.Test;
import org.teiid.core.util.UnitTestUtil;
@@ -77,6 +78,27 @@
assertEquals(testString, read.getSubString(1, testString.length()));
}
+ @SuppressWarnings("serial")
+ @Test public void testReferencePersistenceError() throws Exception {
+ String testString = "this is test clob"; //$NON-NLS-1$
+ SerialClob clob = new SerialClob(testString.toCharArray()) {
+ @Override
+ public Reader getCharacterStream() throws SerialException {
+ throw new SerialException();
+ }
+ };
+
+ ClobType cv = new ClobType(clob);
+ cv.setReferenceStreamId(null);
+
+ // now force to serialize
+ ClobType read = UnitTestUtil.helpSerialize(cv);
+
+ assertTrue(read.length() > 0);
+ assertNotNull(read.getReferenceStreamId());
+ assertNull(read.getReference());
+ }
+
@Test public void testClobSubstring() throws Exception {
ClobImpl clob = new ClobImpl() {
public java.io.Reader getCharacterStream() throws java.sql.SQLException {
Modified:
branches/7.6.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
---
branches/7.6.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-12-20
22:07:09 UTC (rev 3754)
+++
branches/7.6.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-12-20
22:40:36 UTC (rev 3755)
@@ -370,6 +370,12 @@
this.resultsBuffer = collector.collectTuples();
if (!doneProducingBatches) {
doneProducingBatches();
+ //TODO: we could perform more tracking to know what source lobs are in use
+ if (this.resultsBuffer.getLobCount() == 0) {
+ for (DataTierTupleSource connectorRequest : getConnectorRequests()) {
+ connectorRequest.fullyCloseSource();
+ }
+ }
addToCache();
}
}
@@ -519,8 +525,8 @@
}
if (batch.getTerminationFlag()) {
doneProducingBatches();
+ addToCache();
}
- addToCache();
synchronized (lobStreams) {
if (resultsBuffer.isLobs()) {
super.flushBatchDirect(batch, false);
@@ -919,12 +925,6 @@
private void doneProducingBatches() {
this.doneProducingBatches = true;
- //TODO: we could perform more tracking to know what source lobs are in use
- if (this.resultsBuffer.getLobCount() == 0) {
- for (DataTierTupleSource connectorRequest : getConnectorRequests()) {
- connectorRequest.fullyCloseSource();
- }
- }
dqpCore.finishProcessing(this);
}