[teiid-commits] teiid SVN: r3389 - in branches/7.4.x: documentation/admin-guide/src/main/docbook/en-US/content and 2 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Wed Aug 17 10:23:37 EDT 2011


Author: shawkins
Date: 2011-08-17 10:23:34 -0400 (Wed, 17 Aug 2011)
New Revision: 3389

Modified:
   branches/7.4.x/common-core/src/main/java/org/teiid/core/util/ObjectConverterUtil.java
   branches/7.4.x/documentation/admin-guide/src/main/docbook/en-US/content/appendix-c.xml
   branches/7.4.x/runtime/src/main/java/org/teiid/transport/ODBCSocketListener.java
   branches/7.4.x/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java
   branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestODBCSocketTransport.java
Log:
TEIID-1715 simplifying odbc buffering logic

Modified: branches/7.4.x/common-core/src/main/java/org/teiid/core/util/ObjectConverterUtil.java
===================================================================
--- branches/7.4.x/common-core/src/main/java/org/teiid/core/util/ObjectConverterUtil.java	2011-08-17 01:23:42 UTC (rev 3388)
+++ branches/7.4.x/common-core/src/main/java/org/teiid/core/util/ObjectConverterUtil.java	2011-08-17 14:23:34 UTC (rev 3389)
@@ -118,6 +118,7 @@
 	        while ((l_nbytes = is.read(l_buffer, 0, readLength)) != -1) {
 	        	if (length != -1 && writen > length - l_nbytes) {
 		        	out.write(l_buffer, 0, writen + l_nbytes - length); 
+		        	writen = length;
 		        	break;
 	        	}
 	        	out.write(l_buffer,0,l_nbytes); 
@@ -143,7 +144,7 @@
     	return write(out, is, new byte[DEFAULT_READING_SIZE], length, close); // buffer holding bytes to be transferred
     }
     
-    public static void write(final Writer out, final Reader is, int length) throws IOException {
+    public static int write(final Writer out, final Reader is, int length, boolean close) throws IOException {
     	int writen = 0;
         try {
 	        char[] l_buffer = new char[DEFAULT_READING_SIZE]; // buffer holding bytes to be transferred
@@ -151,16 +152,20 @@
 	        while ((l_nbytes = is.read(l_buffer)) != -1) {
 	        	if (length != -1 && writen > length - l_nbytes) {
 		        	out.write(l_buffer, 0, writen + l_nbytes - length); 
+		        	writen = length;
 		        	break;
 	        	}
 	        	out.write(l_buffer,0,l_nbytes); 
 	        	writen += l_nbytes;
 	        }
+	        return writen;
         } finally {
-        	try {
-        		is.close();
-        	} finally {
-        		out.close();
+        	if (close) {
+	        	try {
+	        		is.close();
+	        	} finally {
+	        		out.close();
+	        	}
         	}
         }
     }
@@ -192,7 +197,7 @@
     public static void write(final Reader reader, final File f) throws IOException {
     	f.getParentFile().mkdirs();
     	FileWriter fw = new FileWriter(f);        
-        write(fw, reader, -1);   
+        write(fw, reader, -1, true);   
     }
 
     public static void write(final InputStream is, final File f) throws IOException {
@@ -290,7 +295,7 @@
 
     public static char[] convertToCharArray(Reader reader, int length) throws IOException {
         StringWriter sb = new StringWriter();     
-        write(sb, reader, length);
+        write(sb, reader, length, true);
         return sb.toString().toCharArray();
     }
 

Modified: branches/7.4.x/documentation/admin-guide/src/main/docbook/en-US/content/appendix-c.xml
===================================================================
--- branches/7.4.x/documentation/admin-guide/src/main/docbook/en-US/content/appendix-c.xml	2011-08-17 01:23:42 UTC (rev 3388)
+++ branches/7.4.x/documentation/admin-guide/src/main/docbook/en-US/content/appendix-c.xml	2011-08-17 14:23:34 UTC (rev 3389)
@@ -30,5 +30,10 @@
 			If a traditional join is not possible (such as with NOT IN) a merge join version of the semijoin or antijoin will be considered by upon the costing information available. 
 			</para>
 		</listitem>
+		<listitem>
+			<para><emphasis>org.teiid.ODBCPacketSize</emphasis> - defaults to 307200.  
+			Target size in bytes of the ODBC results buffer.  This is not a hard maximum, lobs and wide rows may use larger buffers.
+			</para>
+		</listitem>
 	</itemizedlist>
 </appendix>
\ No newline at end of file

Modified: branches/7.4.x/runtime/src/main/java/org/teiid/transport/ODBCSocketListener.java
===================================================================
--- branches/7.4.x/runtime/src/main/java/org/teiid/transport/ODBCSocketListener.java	2011-08-17 01:23:42 UTC (rev 3388)
+++ branches/7.4.x/runtime/src/main/java/org/teiid/transport/ODBCSocketListener.java	2011-08-17 14:23:34 UTC (rev 3389)
@@ -34,6 +34,8 @@
 import org.teiid.odbc.ODBCServerRemote;
 
 public class ODBCSocketListener extends SocketListener {
+	
+	private int maxBufferSize = Integer.parseInt(System.getProperty("org.teiid.ODBCPacketSize", "307200")); //$NON-NLS-1$ //$NON-NLS-2$
 	private ODBCServerRemote.AuthenticationType authType = ODBCServerRemote.AuthenticationType.CLEARTEXT;
 	private int maxLobSize;
 	private TeiidDriver driver;
@@ -56,6 +58,10 @@
 	public void setDriver(TeiidDriver driver) {
 		this.driver = driver;
 	}
+	
+	public void setMaxBufferSize(int maxBufferSize) {
+		this.maxBufferSize = maxBufferSize;
+	}
 
 	@Override
 	protected SSLAwareChannelHandler createChannelPipelineFactory(final SSLConfiguration config, final StorageManager storageManager) {
@@ -64,7 +70,7 @@
 				ChannelPipeline pipeline = new DefaultChannelPipeline();
 
 			    pipeline.addLast("odbcFrontendProtocol", new PgFrontendProtocol(1 << 20)); //$NON-NLS-1$
-			    pipeline.addLast("odbcBackendProtocol", new PgBackendProtocol(maxLobSize, config)); //$NON-NLS-1$
+			    pipeline.addLast("odbcBackendProtocol", new PgBackendProtocol(maxLobSize, maxBufferSize, config)); //$NON-NLS-1$
 			    pipeline.addLast("handler", this); //$NON-NLS-1$
 			    return pipeline;
 			}			

Modified: branches/7.4.x/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java
===================================================================
--- branches/7.4.x/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java	2011-08-17 01:23:42 UTC (rev 3388)
+++ branches/7.4.x/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java	2011-08-17 14:23:34 UTC (rev 3389)
@@ -1,4 +1,3 @@
-
 /*
  * JBoss, Home of Professional Open Source.
  * See the COPYRIGHT.txt file distributed with this work for information
@@ -21,30 +20,13 @@
  * 02110-1301 USA.
  */package org.teiid.transport;
 
-import static org.teiid.odbc.PGUtil.PG_TYPE_BOOL;
-import static org.teiid.odbc.PGUtil.PG_TYPE_BPCHAR;
-import static org.teiid.odbc.PGUtil.PG_TYPE_BYTEA;
-import static org.teiid.odbc.PGUtil.PG_TYPE_CHARARRAY;
-import static org.teiid.odbc.PGUtil.PG_TYPE_DATE;
-import static org.teiid.odbc.PGUtil.PG_TYPE_FLOAT4;
-import static org.teiid.odbc.PGUtil.PG_TYPE_FLOAT8;
-import static org.teiid.odbc.PGUtil.PG_TYPE_INT2;
-import static org.teiid.odbc.PGUtil.PG_TYPE_INT4;
-import static org.teiid.odbc.PGUtil.PG_TYPE_INT8;
-import static org.teiid.odbc.PGUtil.PG_TYPE_NUMERIC;
-import static org.teiid.odbc.PGUtil.PG_TYPE_OIDARRAY;
-import static org.teiid.odbc.PGUtil.PG_TYPE_OIDVECTOR;
-import static org.teiid.odbc.PGUtil.PG_TYPE_TEXT;
-import static org.teiid.odbc.PGUtil.PG_TYPE_TEXTARRAY;
-import static org.teiid.odbc.PGUtil.PG_TYPE_TIME;
-import static org.teiid.odbc.PGUtil.PG_TYPE_TIMESTAMP_NO_TMZONE;
-import static org.teiid.odbc.PGUtil.PG_TYPE_UNKNOWN;
-import static org.teiid.odbc.PGUtil.PG_TYPE_VARCHAR;
+import static org.teiid.odbc.PGUtil.*;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
 import java.io.StreamCorruptedException;
+import java.io.Writer;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.nio.charset.Charset;
@@ -61,6 +43,7 @@
 import javax.net.ssl.SSLEngine;
 
 import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.ChannelDownstreamHandler;
 import org.jboss.netty.channel.ChannelEvent;
@@ -72,7 +55,6 @@
 import org.jboss.netty.handler.ssl.SslHandler;
 import org.teiid.client.util.ResultsFuture;
 import org.teiid.core.util.ObjectConverterUtil;
-import org.teiid.core.util.ReaderInputStream;
 import org.teiid.core.util.ReflectionHelper;
 import org.teiid.jdbc.ResultSetImpl;
 import org.teiid.jdbc.TeiidSQLException;
@@ -108,9 +90,6 @@
 		}
 	}
 	
-    // 300k
-	static int ODBC_SOCKET_BUFF_SIZE = Integer.parseInt(System.getProperty("ODBCPacketSize", "307200"));
-	
 	private final class ResultsWorkItem implements Runnable {
 		private final List<PgColInfo> cols;
 		private final ResultSetImpl rs;
@@ -118,13 +97,13 @@
 		private int rows2Send;
 		private int rowsSent = 0;
 		private int rowsInBuffer = 0;
-		private ChannelBuffer buffer = ChannelBuffers.directBuffer(ODBC_SOCKET_BUFF_SIZE);
 
 		private ResultsWorkItem(List<PgColInfo> cols, ResultSetImpl rs, ResultsFuture<Integer> result, int rows2Send) {
 			this.cols = cols;
 			this.rs = rs;
 			this.result = result;
 			this.rows2Send = rows2Send;
+			initBuffer(maxBufferSize / 8);
 		}
 
 		@Override
@@ -160,7 +139,7 @@
 			boolean processNext = true;
 			try {
     			if (future.get()) {
-    				sendDataRow(rs, cols, buffer);
+    				sendDataRow(rs, cols);
     				rowsSent++;
     				rowsInBuffer++;
     				boolean done = rowsSent == rows2Send;
@@ -170,7 +149,7 @@
     					result.getResultsReceiver().receiveResults(rowsSent);
     				}
     			} else {
-    				sendContents(buffer);
+    				sendContents();
     				result.getResultsReceiver().receiveResults(rowsSent);
     				processNext = false;
     			}
@@ -182,31 +161,33 @@
 		}
 		
 		private void flushResults(boolean force) {
-			int avgRowsize = buffer.readableBytes()/rowsInBuffer;
-			if (force || buffer.writableBytes() < (avgRowsize*2)) {
-				sendContents(buffer);
-				buffer= ChannelBuffers.directBuffer(ODBC_SOCKET_BUFF_SIZE);
+			int avgRowsize = dataOut.writerIndex()/rowsInBuffer;
+			if (force || (maxBufferSize - dataOut.writerIndex()) < (avgRowsize*2)) {
+				sendContents();
+				initBuffer(maxBufferSize / 8);
 				rowsInBuffer = 0;
 			}			
 		}
 	}
     
-    private DataOutputStream dataOut;
-    private ByteArrayOutputStream outBuffer;
-    private char messageType;
+    private ChannelBuffer dataOut;
+	private OutputStreamWriter writer;
+
     private Properties props;    
     private Charset encoding = Charset.forName("UTF-8");
     private ReflectionHelper clientProxy = new ReflectionHelper(ODBCClientRemote.class);
     private ChannelHandlerContext ctx;
     private MessageEvent message;
     private int maxLobSize = (2*1024*1024); // 2 MB
+	private final int maxBufferSize;
     
 	private volatile ResultsFuture<Boolean> nextFuture;
 
 	private SSLConfiguration config;
 
-	public PgBackendProtocol(int maxLobSize, SSLConfiguration config) {
+	public PgBackendProtocol(int maxLobSize, int maxBufferSize, SSLConfiguration config) {
     	this.maxLobSize = maxLobSize;
+    	this.maxBufferSize = maxBufferSize;
     	this.config = config;
     }
     
@@ -246,42 +227,34 @@
 	
 	@Override
 	public void useClearTextAuthentication() {
-		try {
-			sendAuthenticationCleartextPassword();
-		} catch (IOException e) {
-			terminate(e);
-		}
+		sendAuthenticationCleartextPassword();
 	}
 	
 	@Override
 	public void authenticationSucess(int processId, int screctKey) {
-		try {
-			sendAuthenticationOk();
-			// server_version, server_encoding, client_encoding, application_name, 
-			// is_superuser, session_authorization, DateStyle, IntervalStyle, TimeZone, 
-			// integer_datetimes, and standard_conforming_strings. 
-			// (server_encoding, TimeZone, and integer_datetimes were not reported 
-			// by releases before 8.0; standard_conforming_strings was not reported by 
-			// releases before 8.1; IntervalStyle was not reported by releases before 8.4; 
-			// application_name was not reported by releases before 9.0.)
-			
-			sendParameterStatus("client_encoding", PGCharsetConverter.getEncoding(this.encoding));
-			sendParameterStatus("DateStyle", this.props.getProperty("DateStyle", "ISO"));
-			sendParameterStatus("integer_datetimes", "off");
-			sendParameterStatus("is_superuser", "off");
-			sendParameterStatus("server_encoding", "SQL_ASCII");
-			sendParameterStatus("server_version", "8.1.4");
-			sendParameterStatus("session_authorization", this.props.getProperty("user"));
-			sendParameterStatus("standard_conforming_strings", "off");
-			sendParameterStatus("application_name", this.props.getProperty("application_name", "ODBCClient"));
-			
-			// TODO PostgreSQL TimeZone
-			sendParameterStatus("TimeZone", "CET");
-			
-			sendBackendKeyData(processId, screctKey);
-		} catch (IOException e) {
-			terminate(e);
-		}
+		sendAuthenticationOk();
+		// server_version, server_encoding, client_encoding, application_name, 
+		// is_superuser, session_authorization, DateStyle, IntervalStyle, TimeZone, 
+		// integer_datetimes, and standard_conforming_strings. 
+		// (server_encoding, TimeZone, and integer_datetimes were not reported 
+		// by releases before 8.0; standard_conforming_strings was not reported by 
+		// releases before 8.1; IntervalStyle was not reported by releases before 8.4; 
+		// application_name was not reported by releases before 9.0.)
+		
+		sendParameterStatus("client_encoding", PGCharsetConverter.getEncoding(this.encoding));
+		sendParameterStatus("DateStyle", this.props.getProperty("DateStyle", "ISO"));
+		sendParameterStatus("integer_datetimes", "off");
+		sendParameterStatus("is_superuser", "off");
+		sendParameterStatus("server_encoding", "SQL_ASCII");
+		sendParameterStatus("server_version", "8.1.4");
+		sendParameterStatus("session_authorization", this.props.getProperty("user"));
+		sendParameterStatus("standard_conforming_strings", "off");
+		sendParameterStatus("application_name", this.props.getProperty("application_name", "ODBCClient"));
+		
+		// TODO PostgreSQL TimeZone
+		sendParameterStatus("TimeZone", "CET");
+		
+		sendBackendKeyData(processId, screctKey);
 	}
 
 	@Override
@@ -296,29 +269,17 @@
 
 	@Override
 	public void errorOccurred(String msg) {
-		try {
-			sendErrorResponse(msg);
-		} catch (IOException e) {
-			terminate(e);
-		}
+		sendErrorResponse(msg);
 	}
 
 	@Override
 	public void errorOccurred(Throwable t) {
-		try {
-			sendErrorResponse(t);
-		} catch (IOException e) {
-			terminate(e);
-		}
+		sendErrorResponse(t);
 	}
 
 	@Override
 	public void ready(boolean inTransaction, boolean failedTransaction) {
-		try {
-			sendReadyForQuery(inTransaction, failedTransaction);
-		} catch (IOException e) {
-			terminate(e);
-		}
+		sendReadyForQuery(inTransaction, failedTransaction);
 	}
 	
 	public void setEncoding(String value) {
@@ -331,47 +292,35 @@
 	@Override
 	public void sendParameterDescription(ParameterMetaData meta, int[] paramType) {
 		try {
-			try {
-				int count = meta.getParameterCount();
-				startMessage('t');
-				writeShort(count);
-				for (int i = 0; i < count; i++) {
-					int type;
-					if (paramType != null && paramType[i] != 0) {
-						type = paramType[i];
-					} else {
-						type = convertType(meta.getParameterType(i+1));
-					}
-					writeInt(type);
+			int count = meta.getParameterCount();
+			startMessage('t');
+			writeShort(count);
+			for (int i = 0; i < count; i++) {
+				int type;
+				if (paramType != null && paramType[i] != 0) {
+					type = paramType[i];
+				} else {
+					type = convertType(meta.getParameterType(i+1));
 				}
-				sendMessage();
-			} catch (SQLException e) {
-				sendErrorResponse(e);
-			}			
-		} catch (IOException e) {
-			terminate(e);
-		}
+				writeInt(type);
+			}
+			sendMessage();
+		} catch (SQLException e) {
+			sendErrorResponse(e);
+		}			
 	}
 
 	@Override
 	public void sendResultSetDescription(List<PgColInfo> cols) {
-		try {
-			sendRowDescription(cols);
-		} catch (IOException e) {
-			terminate(e);
-		}
+		sendRowDescription(cols);
 	}
 	
 	@Override
 	public void sendCursorResults(ResultSetImpl rs, List<PgColInfo> cols, ResultsFuture<Integer> result, int rowCount) {
-		try {
-        	sendRowDescription(cols);
+    	sendRowDescription(cols);
 
-        	ResultsWorkItem r = new ResultsWorkItem(cols, rs, result, rowCount);
-        	r.run();  	        					
-		} catch (IOException e) {
-			terminate(e);
-		}
+    	ResultsWorkItem r = new ResultsWorkItem(cols, rs, result, rowCount);
+    	r.run();  	        					
 	}
 	
 	@Override
@@ -383,48 +332,36 @@
 	@Override
 	public void sendMoveCursor(ResultSetImpl rs, int rowCount, ResultsFuture<Integer> results) {
 		try {
-			try {
-				int rowsMoved = 0;
-				for (int i = 0; i < rowCount; i++) {
-					if (!rs.next()) {
-						break;
-					}
-					rowsMoved++;
-				}				
-				results.getResultsReceiver().receiveResults(rowsMoved);
-			} catch (SQLException e) {
-				sendErrorResponse(e);
-			}
-		} catch (IOException e) {
-			terminate(e);
+			int rowsMoved = 0;
+			for (int i = 0; i < rowCount; i++) {
+				if (!rs.next()) {
+					break;
+				}
+				rowsMoved++;
+			}				
+			results.getResultsReceiver().receiveResults(rowsMoved);
+		} catch (SQLException e) {
+			sendErrorResponse(e);
 		}
 	}		
 	
 	@Override
 	public void sendResults(final String sql, final ResultSetImpl rs, List<PgColInfo> cols, ResultsFuture<Integer> result, boolean describeRows) {
-		try {
-			if (nextFuture != null) {
-				sendErrorResponse(new IllegalStateException("Pending results have not been sent")); //$NON-NLS-1$
-			}
-        	
-        	if (describeRows) {
-        		sendRowDescription(cols);
-        	}
-        	ResultsWorkItem r = new ResultsWorkItem(cols, rs, result, -1);
-        	r.run();    
-        	sendCommandComplete(sql, 0);
-		} catch (IOException e) {
-			terminate(e);
+		if (nextFuture != null) {
+			sendErrorResponse(new IllegalStateException("Pending results have not been sent")); //$NON-NLS-1$
 		}
+    	
+    	if (describeRows) {
+    		sendRowDescription(cols);
+    	}
+    	ResultsWorkItem r = new ResultsWorkItem(cols, rs, result, -1);
+    	r.run();    
+    	sendCommandComplete(sql, 0);
 	}
 
 	@Override
 	public void sendUpdateCount(String sql, int updateCount) {
-		try {
-			sendCommandComplete(sql, updateCount);
-		} catch (IOException e) {
-			terminate(e);
-		}
+		sendCommandComplete(sql, updateCount);
 	}
 
 	@Override
@@ -435,24 +372,16 @@
 
 	@Override
 	public void terminated() {
-		try {
-			trace("channel being terminated");
-			this.sendNoticeResponse("Connection closed");
-			this.ctx.getChannel().close();
-		} catch (IOException e) {
-			trace(e.getMessage());
-		}
+		trace("channel being terminated");
+		this.sendNoticeResponse("Connection closed");
+		this.ctx.getChannel().close();
 	}
 	
 	@Override
 	public void flush() {
-		try {
-			this.dataOut.flush();
-			this.dataOut = null;
-			Channels.write(this.ctx.getChannel(), null);
-		} catch (IOException e) {
-			terminate(e);
-		}		
+		this.dataOut = null;
+		this.writer = null;
+		Channels.write(this.ctx.getChannel(), null);
 	}
 
 	@Override
@@ -471,7 +400,7 @@
 	}
 
 	@Override
-	public void sendCommandComplete(String sql, int updateCount) throws IOException {
+	public void sendCommandComplete(String sql, int updateCount) {
 		startMessage('C');
 		sql = sql.trim().toUpperCase();
 		// TODO remove remarks at the beginning
@@ -508,32 +437,24 @@
 		sendMessage();
 	}
 
-	private void sendDataRow(ResultSet rs, List<PgColInfo> cols, ChannelBuffer buffer) throws SQLException, IOException {
-		startMessage('D');
+	private void sendDataRow(ResultSet rs, List<PgColInfo> cols) throws SQLException, IOException {
+		startMessage('D', -1);
+		int lengthIndex = this.dataOut.writerIndex() - 4;
 		writeShort(cols.size());
 		for (int i = 0; i < cols.size(); i++) {
-			byte[] bytes = getContent(rs, cols.get(i), i+1);			
-			if (bytes == null) {
-				writeInt(-1);
-			} else {
-				writeInt(bytes.length);
-				write(bytes);
+			int dataBytesIndex = this.dataOut.writerIndex();
+			writeInt(-1);
+			getContent(rs, cols.get(i), i+1);
+			writer.flush();
+			if (!rs.wasNull()) {
+				int bytes = this.dataOut.writerIndex() - dataBytesIndex - 4;
+				this.dataOut.setInt(dataBytesIndex, bytes);
 			}
 		}
-		
-		byte[] buff = outBuffer.toByteArray();
-		int len = buff.length;
-		this.outBuffer = null;
-		this.dataOut = null;
-		
-		// now build the wire contents.
-		buffer.writeByte((byte)this.messageType);
-		buffer.writeInt(len+4);
-		buffer.writeBytes(buff);
+		this.dataOut.setInt(lengthIndex, this.dataOut.writerIndex() - lengthIndex);
 	}
 	
-	private byte[] getContent(ResultSet rs, PgColInfo col, int column) throws SQLException, TeiidSQLException, IOException {
-		byte[] bytes = null;
+	private void getContent(ResultSet rs, PgColInfo col, int column) throws SQLException, TeiidSQLException, IOException {
 		switch (col.type) {
 			case PG_TYPE_BOOL:
 			case PG_TYPE_BPCHAR:
@@ -549,14 +470,19 @@
 		    case PG_TYPE_VARCHAR:
 		    	String value = rs.getString(column);
 		    	if (value != null) {
-		    		bytes = value.getBytes(this.encoding);
+			    	writer.write(value);
 		    	}
 		    	break;
 		    
 		    case PG_TYPE_TEXT:
 		    	Clob clob = rs.getClob(column);
 		    	if (clob != null) {
-		    		bytes = ObjectConverterUtil.convertToByteArray(new ReaderInputStream(clob.getCharacterStream(), this.encoding), this.maxLobSize);
+		    		Reader r = clob.getCharacterStream();
+		    		try {
+		    			ObjectConverterUtil.write(writer, r, this.maxLobSize, false);
+		    		} finally {
+		    			r.close();
+		    		}
 		    	}		        	
 		    	break;
 		    	
@@ -564,7 +490,8 @@
 		    	Blob blob = rs.getBlob(column);
 		    	if (blob != null) {
 		    		try {
-			    		bytes = PGbytea.toPGString(ObjectConverterUtil.convertToByteArray(blob.getBinaryStream(), this.maxLobSize)).getBytes(this.encoding);
+			    		String blobString = PGbytea.toPGString(ObjectConverterUtil.convertToByteArray(blob.getBinaryStream(), this.maxLobSize));
+			    		writer.write(blobString);
 		    		} catch(OutOfMemoryError e) {
 		    			throw new StreamCorruptedException("data too big: " + e.getMessage()); //$NON-NLS-1$ 
 		    		}
@@ -577,25 +504,23 @@
 		    	{
 		    	Object[] obj = (Object[])rs.getObject(column);
 		    	if (obj != null) {
-		    		StringBuilder sb = new StringBuilder();	
-			    	sb.append("{");
+		    		writer.append("{");
 			    	boolean first = true;
 			    	for (Object o:obj) {
 			    		if (!first) {
-			    			sb.append(",");
+			    			writer.append(",");
 			    		}
 			    		else {
 			    			first = false;
 			    		}
 			    		if (col.type == PG_TYPE_TEXTARRAY) {
-			    			escapeQuote(sb, o.toString());
+			    			escapeQuote(writer, o.toString());
 			    		}
 			    		else {
-			    			sb.append(o.toString());
+			    			writer.append(o.toString());
 			    		}
 			    	}
-			    	sb.append("}");
-			    	bytes = sb.toString().getBytes(this.encoding);
+			    	writer.append("}");
 		    	}
 		    	}
 		    	break;
@@ -604,18 +529,16 @@
 		    	{
 		    	Object[] obj = (Object[])rs.getObject(column);
 		    	if (obj != null) {
-		    		StringBuilder sb = new StringBuilder();	
 			    	boolean first = true;
 			    	for (Object o:obj) {
 			    		if (!first) {
-			    			sb.append(" ");
+			    			writer.append(" ");
 			    		}
 			    		else {
 			    			first = false;
 			    		}
-			    		sb.append(o);
+			    		writer.append(o.toString());
 			    	}
-			    	bytes = sb.toString().getBytes(this.encoding);
 		    	}	
 		    	}
 		    	break;
@@ -623,10 +546,9 @@
 		    default:
 		    	throw new TeiidSQLException("unknown datatype failed to convert"); 
 		}
-		return bytes;
 	}
 	
-	public static void escapeQuote(StringBuilder sb, String s) {
+	public static void escapeQuote(Writer sb, String s) throws IOException {
 		sb.append('"');
 		for (int i = 0; i < s.length(); i++) {
 			char c = s.charAt(i);
@@ -649,7 +571,7 @@
 		} catch (GeneralSecurityException e) {
 			LogManager.logError(LogConstants.CTX_ODBC, e, RuntimePlugin.Util.getString("PgBackendProtocol.ssl_error"));
 		}
-		ChannelBuffer buffer = ChannelBuffers.directBuffer(1);
+		ChannelBuffer buffer = ctx.getChannel().getConfig().getBufferFactory().getBuffer(1);
 		if (engine == null) {
 			buffer.writeByte('N');
 		} else {
@@ -659,7 +581,7 @@
 		Channels.write(this.ctx, this.message.getFuture(), buffer, this.message.getRemoteAddress());
 	}
 	
-	private void sendErrorResponse(Throwable t) throws IOException {
+	private void sendErrorResponse(Throwable t) {
 		trace(t.getMessage());
 		SQLException e = TeiidSQLException.create(t);
 		startMessage('E');
@@ -675,7 +597,7 @@
 		sendMessage();
 	}
 	
-	private void sendRowDescription(List<PgColInfo> cols) throws IOException {
+	private void sendRowDescription(List<PgColInfo> cols) {
 		startMessage('T');
 		writeShort(cols.size());
 		for (PgColInfo info : cols) {
@@ -705,7 +627,7 @@
 		}
 	}
 
-	private void sendErrorResponse(String message) throws IOException {
+	private void sendErrorResponse(String message) {
 		trace("Exception:", message);
 		startMessage('E');
 		write('S');
@@ -718,7 +640,7 @@
 		sendMessage();
 	}
 	
-	private void sendNoticeResponse(String message) throws IOException {
+	private void sendNoticeResponse(String message) {
 		trace("notice:", message);
 		startMessage('N');
 		write('S');
@@ -744,19 +666,19 @@
 		sendMessage();
 	}	
 
-	private void sendAuthenticationCleartextPassword() throws IOException {
+	private void sendAuthenticationCleartextPassword() {
 		startMessage('R');
 		writeInt(3);
 		sendMessage();
 	}
 
-	private void sendAuthenticationOk() throws IOException {
+	private void sendAuthenticationOk() {
 		startMessage('R');
 		writeInt(0);
 		sendMessage();
 	}
 
-	private void sendReadyForQuery(boolean inTransaction, boolean failedTransaction) throws IOException {
+	private void sendReadyForQuery(boolean inTransaction, boolean failedTransaction) {
 		startMessage('Z');
 		char c;
 		if (failedTransaction) {
@@ -776,14 +698,14 @@
 		sendMessage();
 	}
 
-	private void sendBackendKeyData(int processId, int screctKey) throws IOException {
+	private void sendBackendKeyData(int processId, int screctKey) {
 		startMessage('K');
 		writeInt(processId);
 		writeInt(screctKey);
 		sendMessage();
 	}
 
-	private void sendParameterStatus(String param, String value)	throws IOException {
+	private void sendParameterStatus(String param, String value) {
 		startMessage('S');
 		writeString(param);
 		writeString(value);
@@ -792,76 +714,74 @@
 	
 	@Override
 	public void functionCallResponse(byte[] data) {
-		try {
-			startMessage('V');
-			if (data == null) {
-				writeInt(-1);
-			}
-			else {
-				writeInt(data.length);
-				write(data);
-			}
-			sendMessage();
-		} catch (IOException e) {
-			terminate(e);
-		}		
+		startMessage('V');
+		if (data == null) {
+			writeInt(-1);
+		}
+		else {
+			writeInt(data.length);
+			write(data);
+		}
+		sendMessage();
 	}
 	
 	@Override
 	public void functionCallResponse(int data) {
-		try {
-			startMessage('V');
-			writeInt(4);
-			writeInt(data);
-			sendMessage();
-		} catch (IOException e) {
-			terminate(e);
-		}		
+		startMessage('V');
+		writeInt(4);
+		writeInt(data);
+		sendMessage();
 	}
 
-	private void writeString(String s) throws IOException {
+	private void writeString(String s) {
 		write(s.getBytes(this.encoding));
 		write(0);
 	}
 
-	private void writeInt(int i) throws IOException {
+	private void writeInt(int i) {
 		dataOut.writeInt(i);
 	}
 
-	private void writeShort(int i) throws IOException {
+	private void writeShort(int i) {
 		dataOut.writeShort(i);
 	}
 
-	private void write(byte[] data) throws IOException {
-		dataOut.write(data);
+	private void write(byte[] data) {
+		dataOut.writeBytes(data);
 	}
 
-	private void write(int b) throws IOException {
-		dataOut.write(b);
+	private void write(int b) {
+		dataOut.writeByte(b);
 	}
 
 	private void startMessage(char newMessageType) {
-		this.messageType = newMessageType;
-		this.outBuffer = new ByteArrayOutputStream();
-		this.dataOut = new DataOutputStream(this.outBuffer);
+		startMessage(newMessageType, 32);
 	}
 
+	private void startMessage(char newMessageType, int estimatedLength) {
+		if (estimatedLength > -1) {
+			initBuffer(estimatedLength);
+		}
+		this.dataOut.writeByte((byte)newMessageType);
+		this.dataOut.writerIndex(this.dataOut.writerIndex() + 4);
+	}
+
+	private void initBuffer(int estimatedLength) {
+		this.dataOut = ChannelBuffers.dynamicBuffer(estimatedLength);
+		ChannelBufferOutputStream cbos = new ChannelBufferOutputStream(this.dataOut);
+		this.writer = new OutputStreamWriter(cbos, this.encoding);
+	}
+
 	private void sendMessage() {
-		byte[] buff = outBuffer.toByteArray();
-		int len = buff.length;
-		this.outBuffer = null;
-		this.dataOut = null;
-		
-		// now build the wire contents.
-		ChannelBuffer buffer = ChannelBuffers.directBuffer(len+5);
-		buffer.writeByte((byte)this.messageType);
-		buffer.writeInt(len+4);
-		buffer.writeBytes(buff);
-		Channels.write(this.ctx, this.message.getFuture(), buffer, this.message.getRemoteAddress());
+		int pos = this.dataOut.writerIndex();
+		this.dataOut.setInt(1, pos - 1);
+		sendContents();
 	}
 	
-	private void sendContents(ChannelBuffer buffer) {
-		Channels.write(this.ctx, this.message.getFuture(), buffer, this.message.getRemoteAddress());
+	private void sendContents() {
+		ChannelBuffer cb = this.dataOut;
+		this.dataOut = null;
+		Channels.write(this.ctx, this.message.getFuture(), cb, this.message.getRemoteAddress());
 	}
 
 	private static void trace(String... msg) {

Modified: branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestODBCSocketTransport.java
===================================================================
--- branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestODBCSocketTransport.java	2011-08-17 01:23:42 UTC (rev 3388)
+++ branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestODBCSocketTransport.java	2011-08-17 14:23:34 UTC (rev 3389)
@@ -128,7 +128,7 @@
 			config.setBindAddress(addr.getHostName());
 			config.setPortNumber(0);
 			odbcTransport = new ODBCSocketListener(config, BufferManagerFactory.getStandaloneBufferManager(), 0, 100000);
-			
+			odbcTransport.setMaxBufferSize(100); //set to a small size to ensure buffering over the limit works
 			FakeServer server = new FakeServer();
 			server.setUseCallingThread(false);
 			server.deployVDB("parts", UnitTestUtil.getTestDataPath() + "/PartsSupplier.vdb");



More information about the teiid-commits mailing list