Author: shawkins
Date: 2011-08-17 10:23:34 -0400 (Wed, 17 Aug 2011)
New Revision: 3389
Modified:
branches/7.4.x/common-core/src/main/java/org/teiid/core/util/ObjectConverterUtil.java
branches/7.4.x/documentation/admin-guide/src/main/docbook/en-US/content/appendix-c.xml
branches/7.4.x/runtime/src/main/java/org/teiid/transport/ODBCSocketListener.java
branches/7.4.x/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java
branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestODBCSocketTransport.java
Log:
TEIID-1715 simplifying odbc buffering logic
Modified:
branches/7.4.x/common-core/src/main/java/org/teiid/core/util/ObjectConverterUtil.java
===================================================================
---
branches/7.4.x/common-core/src/main/java/org/teiid/core/util/ObjectConverterUtil.java 2011-08-17
01:23:42 UTC (rev 3388)
+++
branches/7.4.x/common-core/src/main/java/org/teiid/core/util/ObjectConverterUtil.java 2011-08-17
14:23:34 UTC (rev 3389)
@@ -118,6 +118,7 @@
while ((l_nbytes = is.read(l_buffer, 0, readLength)) != -1) {
if (length != -1 && writen > length - l_nbytes) {
out.write(l_buffer, 0, writen + l_nbytes - length);
+ writen = length;
break;
}
out.write(l_buffer,0,l_nbytes);
@@ -143,7 +144,7 @@
return write(out, is, new byte[DEFAULT_READING_SIZE], length, close); // buffer
holding bytes to be transferred
}
- public static void write(final Writer out, final Reader is, int length) throws
IOException {
+ public static int write(final Writer out, final Reader is, int length, boolean close)
throws IOException {
int writen = 0;
try {
char[] l_buffer = new char[DEFAULT_READING_SIZE]; // buffer holding bytes to be
transferred
@@ -151,16 +152,20 @@
while ((l_nbytes = is.read(l_buffer)) != -1) {
if (length != -1 && writen > length - l_nbytes) {
out.write(l_buffer, 0, writen + l_nbytes - length);
+ writen = length;
break;
}
out.write(l_buffer,0,l_nbytes);
writen += l_nbytes;
}
+ return writen;
} finally {
- try {
- is.close();
- } finally {
- out.close();
+ if (close) {
+ try {
+ is.close();
+ } finally {
+ out.close();
+ }
}
}
}
@@ -192,7 +197,7 @@
public static void write(final Reader reader, final File f) throws IOException {
f.getParentFile().mkdirs();
FileWriter fw = new FileWriter(f);
- write(fw, reader, -1);
+ write(fw, reader, -1, true);
}
public static void write(final InputStream is, final File f) throws IOException {
@@ -290,7 +295,7 @@
public static char[] convertToCharArray(Reader reader, int length) throws IOException
{
StringWriter sb = new StringWriter();
- write(sb, reader, length);
+ write(sb, reader, length, true);
return sb.toString().toCharArray();
}
Modified:
branches/7.4.x/documentation/admin-guide/src/main/docbook/en-US/content/appendix-c.xml
===================================================================
---
branches/7.4.x/documentation/admin-guide/src/main/docbook/en-US/content/appendix-c.xml 2011-08-17
01:23:42 UTC (rev 3388)
+++
branches/7.4.x/documentation/admin-guide/src/main/docbook/en-US/content/appendix-c.xml 2011-08-17
14:23:34 UTC (rev 3389)
@@ -30,5 +30,10 @@
If a traditional join is not possible (such as with NOT IN) a merge join version of
the semijoin or antijoin will be considered by upon the costing information available.
</para>
</listitem>
+ <listitem>
+ <para><emphasis>org.teiid.ODBCPacketSize</emphasis> - defaults to
307200.
+ Target size in bytes of the ODBC results buffer. This is not a hard maximum, lobs and
wide rows may use larger buffers.
+ </para>
+ </listitem>
</itemizedlist>
</appendix>
\ No newline at end of file
Modified:
branches/7.4.x/runtime/src/main/java/org/teiid/transport/ODBCSocketListener.java
===================================================================
---
branches/7.4.x/runtime/src/main/java/org/teiid/transport/ODBCSocketListener.java 2011-08-17
01:23:42 UTC (rev 3388)
+++
branches/7.4.x/runtime/src/main/java/org/teiid/transport/ODBCSocketListener.java 2011-08-17
14:23:34 UTC (rev 3389)
@@ -34,6 +34,8 @@
import org.teiid.odbc.ODBCServerRemote;
public class ODBCSocketListener extends SocketListener {
+
+ private int maxBufferSize =
Integer.parseInt(System.getProperty("org.teiid.ODBCPacketSize",
"307200")); //$NON-NLS-1$ //$NON-NLS-2$
private ODBCServerRemote.AuthenticationType authType =
ODBCServerRemote.AuthenticationType.CLEARTEXT;
private int maxLobSize;
private TeiidDriver driver;
@@ -56,6 +58,10 @@
public void setDriver(TeiidDriver driver) {
this.driver = driver;
}
+
+ public void setMaxBufferSize(int maxBufferSize) {
+ this.maxBufferSize = maxBufferSize;
+ }
@Override
protected SSLAwareChannelHandler createChannelPipelineFactory(final SSLConfiguration
config, final StorageManager storageManager) {
@@ -64,7 +70,7 @@
ChannelPipeline pipeline = new DefaultChannelPipeline();
pipeline.addLast("odbcFrontendProtocol", new PgFrontendProtocol(1
<< 20)); //$NON-NLS-1$
- pipeline.addLast("odbcBackendProtocol", new
PgBackendProtocol(maxLobSize, config)); //$NON-NLS-1$
+ pipeline.addLast("odbcBackendProtocol", new
PgBackendProtocol(maxLobSize, maxBufferSize, config)); //$NON-NLS-1$
pipeline.addLast("handler", this); //$NON-NLS-1$
return pipeline;
}
Modified: branches/7.4.x/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java
===================================================================
---
branches/7.4.x/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java 2011-08-17
01:23:42 UTC (rev 3388)
+++
branches/7.4.x/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java 2011-08-17
14:23:34 UTC (rev 3389)
@@ -1,4 +1,3 @@
-
/*
* JBoss, Home of Professional Open Source.
* See the COPYRIGHT.txt file distributed with this work for information
@@ -21,30 +20,13 @@
* 02110-1301 USA.
*/package org.teiid.transport;
-import static org.teiid.odbc.PGUtil.PG_TYPE_BOOL;
-import static org.teiid.odbc.PGUtil.PG_TYPE_BPCHAR;
-import static org.teiid.odbc.PGUtil.PG_TYPE_BYTEA;
-import static org.teiid.odbc.PGUtil.PG_TYPE_CHARARRAY;
-import static org.teiid.odbc.PGUtil.PG_TYPE_DATE;
-import static org.teiid.odbc.PGUtil.PG_TYPE_FLOAT4;
-import static org.teiid.odbc.PGUtil.PG_TYPE_FLOAT8;
-import static org.teiid.odbc.PGUtil.PG_TYPE_INT2;
-import static org.teiid.odbc.PGUtil.PG_TYPE_INT4;
-import static org.teiid.odbc.PGUtil.PG_TYPE_INT8;
-import static org.teiid.odbc.PGUtil.PG_TYPE_NUMERIC;
-import static org.teiid.odbc.PGUtil.PG_TYPE_OIDARRAY;
-import static org.teiid.odbc.PGUtil.PG_TYPE_OIDVECTOR;
-import static org.teiid.odbc.PGUtil.PG_TYPE_TEXT;
-import static org.teiid.odbc.PGUtil.PG_TYPE_TEXTARRAY;
-import static org.teiid.odbc.PGUtil.PG_TYPE_TIME;
-import static org.teiid.odbc.PGUtil.PG_TYPE_TIMESTAMP_NO_TMZONE;
-import static org.teiid.odbc.PGUtil.PG_TYPE_UNKNOWN;
-import static org.teiid.odbc.PGUtil.PG_TYPE_VARCHAR;
+import static org.teiid.odbc.PGUtil.*;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
import java.io.StreamCorruptedException;
+import java.io.Writer;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.Charset;
@@ -61,6 +43,7 @@
import javax.net.ssl.SSLEngine;
import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelEvent;
@@ -72,7 +55,6 @@
import org.jboss.netty.handler.ssl.SslHandler;
import org.teiid.client.util.ResultsFuture;
import org.teiid.core.util.ObjectConverterUtil;
-import org.teiid.core.util.ReaderInputStream;
import org.teiid.core.util.ReflectionHelper;
import org.teiid.jdbc.ResultSetImpl;
import org.teiid.jdbc.TeiidSQLException;
@@ -108,9 +90,6 @@
}
}
- // 300k
- static int ODBC_SOCKET_BUFF_SIZE =
Integer.parseInt(System.getProperty("ODBCPacketSize", "307200"));
-
private final class ResultsWorkItem implements Runnable {
private final List<PgColInfo> cols;
private final ResultSetImpl rs;
@@ -118,13 +97,13 @@
private int rows2Send;
private int rowsSent = 0;
private int rowsInBuffer = 0;
- private ChannelBuffer buffer = ChannelBuffers.directBuffer(ODBC_SOCKET_BUFF_SIZE);
private ResultsWorkItem(List<PgColInfo> cols, ResultSetImpl rs,
ResultsFuture<Integer> result, int rows2Send) {
this.cols = cols;
this.rs = rs;
this.result = result;
this.rows2Send = rows2Send;
+ initBuffer(maxBufferSize / 8);
}
@Override
@@ -160,7 +139,7 @@
boolean processNext = true;
try {
if (future.get()) {
- sendDataRow(rs, cols, buffer);
+ sendDataRow(rs, cols);
rowsSent++;
rowsInBuffer++;
boolean done = rowsSent == rows2Send;
@@ -170,7 +149,7 @@
result.getResultsReceiver().receiveResults(rowsSent);
}
} else {
- sendContents(buffer);
+ sendContents();
result.getResultsReceiver().receiveResults(rowsSent);
processNext = false;
}
@@ -182,31 +161,33 @@
}
private void flushResults(boolean force) {
- int avgRowsize = buffer.readableBytes()/rowsInBuffer;
- if (force || buffer.writableBytes() < (avgRowsize*2)) {
- sendContents(buffer);
- buffer= ChannelBuffers.directBuffer(ODBC_SOCKET_BUFF_SIZE);
+ int avgRowsize = dataOut.writerIndex()/rowsInBuffer;
+ if (force || (maxBufferSize - dataOut.writerIndex()) < (avgRowsize*2)) {
+ sendContents();
+ initBuffer(maxBufferSize / 8);
rowsInBuffer = 0;
}
}
}
- private DataOutputStream dataOut;
- private ByteArrayOutputStream outBuffer;
- private char messageType;
+ private ChannelBuffer dataOut;
+ private OutputStreamWriter writer;
+
private Properties props;
private Charset encoding = Charset.forName("UTF-8");
private ReflectionHelper clientProxy = new ReflectionHelper(ODBCClientRemote.class);
private ChannelHandlerContext ctx;
private MessageEvent message;
private int maxLobSize = (2*1024*1024); // 2 MB
+ private final int maxBufferSize;
private volatile ResultsFuture<Boolean> nextFuture;
private SSLConfiguration config;
- public PgBackendProtocol(int maxLobSize, SSLConfiguration config) {
+ public PgBackendProtocol(int maxLobSize, int maxBufferSize, SSLConfiguration config) {
this.maxLobSize = maxLobSize;
+ this.maxBufferSize = maxBufferSize;
this.config = config;
}
@@ -246,42 +227,34 @@
@Override
public void useClearTextAuthentication() {
- try {
- sendAuthenticationCleartextPassword();
- } catch (IOException e) {
- terminate(e);
- }
+ sendAuthenticationCleartextPassword();
}
@Override
public void authenticationSucess(int processId, int screctKey) {
- try {
- sendAuthenticationOk();
- // server_version, server_encoding, client_encoding, application_name,
- // is_superuser, session_authorization, DateStyle, IntervalStyle, TimeZone,
- // integer_datetimes, and standard_conforming_strings.
- // (server_encoding, TimeZone, and integer_datetimes were not reported
- // by releases before 8.0; standard_conforming_strings was not reported by
- // releases before 8.1; IntervalStyle was not reported by releases before 8.4;
- // application_name was not reported by releases before 9.0.)
-
- sendParameterStatus("client_encoding",
PGCharsetConverter.getEncoding(this.encoding));
- sendParameterStatus("DateStyle",
this.props.getProperty("DateStyle", "ISO"));
- sendParameterStatus("integer_datetimes", "off");
- sendParameterStatus("is_superuser", "off");
- sendParameterStatus("server_encoding", "SQL_ASCII");
- sendParameterStatus("server_version", "8.1.4");
- sendParameterStatus("session_authorization",
this.props.getProperty("user"));
- sendParameterStatus("standard_conforming_strings", "off");
- sendParameterStatus("application_name",
this.props.getProperty("application_name", "ODBCClient"));
-
- // TODO PostgreSQL TimeZone
- sendParameterStatus("TimeZone", "CET");
-
- sendBackendKeyData(processId, screctKey);
- } catch (IOException e) {
- terminate(e);
- }
+ sendAuthenticationOk();
+ // server_version, server_encoding, client_encoding, application_name,
+ // is_superuser, session_authorization, DateStyle, IntervalStyle, TimeZone,
+ // integer_datetimes, and standard_conforming_strings.
+ // (server_encoding, TimeZone, and integer_datetimes were not reported
+ // by releases before 8.0; standard_conforming_strings was not reported by
+ // releases before 8.1; IntervalStyle was not reported by releases before 8.4;
+ // application_name was not reported by releases before 9.0.)
+
+ sendParameterStatus("client_encoding",
PGCharsetConverter.getEncoding(this.encoding));
+ sendParameterStatus("DateStyle",
this.props.getProperty("DateStyle", "ISO"));
+ sendParameterStatus("integer_datetimes", "off");
+ sendParameterStatus("is_superuser", "off");
+ sendParameterStatus("server_encoding", "SQL_ASCII");
+ sendParameterStatus("server_version", "8.1.4");
+ sendParameterStatus("session_authorization",
this.props.getProperty("user"));
+ sendParameterStatus("standard_conforming_strings", "off");
+ sendParameterStatus("application_name",
this.props.getProperty("application_name", "ODBCClient"));
+
+ // TODO PostgreSQL TimeZone
+ sendParameterStatus("TimeZone", "CET");
+
+ sendBackendKeyData(processId, screctKey);
}
@Override
@@ -296,29 +269,17 @@
@Override
public void errorOccurred(String msg) {
- try {
- sendErrorResponse(msg);
- } catch (IOException e) {
- terminate(e);
- }
+ sendErrorResponse(msg);
}
@Override
public void errorOccurred(Throwable t) {
- try {
- sendErrorResponse(t);
- } catch (IOException e) {
- terminate(e);
- }
+ sendErrorResponse(t);
}
@Override
public void ready(boolean inTransaction, boolean failedTransaction) {
- try {
- sendReadyForQuery(inTransaction, failedTransaction);
- } catch (IOException e) {
- terminate(e);
- }
+ sendReadyForQuery(inTransaction, failedTransaction);
}
public void setEncoding(String value) {
@@ -331,47 +292,35 @@
@Override
public void sendParameterDescription(ParameterMetaData meta, int[] paramType) {
try {
- try {
- int count = meta.getParameterCount();
- startMessage('t');
- writeShort(count);
- for (int i = 0; i < count; i++) {
- int type;
- if (paramType != null && paramType[i] != 0) {
- type = paramType[i];
- } else {
- type = convertType(meta.getParameterType(i+1));
- }
- writeInt(type);
+ int count = meta.getParameterCount();
+ startMessage('t');
+ writeShort(count);
+ for (int i = 0; i < count; i++) {
+ int type;
+ if (paramType != null && paramType[i] != 0) {
+ type = paramType[i];
+ } else {
+ type = convertType(meta.getParameterType(i+1));
}
- sendMessage();
- } catch (SQLException e) {
- sendErrorResponse(e);
- }
- } catch (IOException e) {
- terminate(e);
- }
+ writeInt(type);
+ }
+ sendMessage();
+ } catch (SQLException e) {
+ sendErrorResponse(e);
+ }
}
@Override
public void sendResultSetDescription(List<PgColInfo> cols) {
- try {
- sendRowDescription(cols);
- } catch (IOException e) {
- terminate(e);
- }
+ sendRowDescription(cols);
}
@Override
public void sendCursorResults(ResultSetImpl rs, List<PgColInfo> cols,
ResultsFuture<Integer> result, int rowCount) {
- try {
- sendRowDescription(cols);
+ sendRowDescription(cols);
- ResultsWorkItem r = new ResultsWorkItem(cols, rs, result, rowCount);
- r.run();
- } catch (IOException e) {
- terminate(e);
- }
+ ResultsWorkItem r = new ResultsWorkItem(cols, rs, result, rowCount);
+ r.run();
}
@Override
@@ -383,48 +332,36 @@
@Override
public void sendMoveCursor(ResultSetImpl rs, int rowCount, ResultsFuture<Integer>
results) {
try {
- try {
- int rowsMoved = 0;
- for (int i = 0; i < rowCount; i++) {
- if (!rs.next()) {
- break;
- }
- rowsMoved++;
- }
- results.getResultsReceiver().receiveResults(rowsMoved);
- } catch (SQLException e) {
- sendErrorResponse(e);
- }
- } catch (IOException e) {
- terminate(e);
+ int rowsMoved = 0;
+ for (int i = 0; i < rowCount; i++) {
+ if (!rs.next()) {
+ break;
+ }
+ rowsMoved++;
+ }
+ results.getResultsReceiver().receiveResults(rowsMoved);
+ } catch (SQLException e) {
+ sendErrorResponse(e);
}
}
@Override
public void sendResults(final String sql, final ResultSetImpl rs, List<PgColInfo>
cols, ResultsFuture<Integer> result, boolean describeRows) {
- try {
- if (nextFuture != null) {
- sendErrorResponse(new IllegalStateException("Pending results have not been
sent")); //$NON-NLS-1$
- }
-
- if (describeRows) {
- sendRowDescription(cols);
- }
- ResultsWorkItem r = new ResultsWorkItem(cols, rs, result, -1);
- r.run();
- sendCommandComplete(sql, 0);
- } catch (IOException e) {
- terminate(e);
+ if (nextFuture != null) {
+ sendErrorResponse(new IllegalStateException("Pending results have not been
sent")); //$NON-NLS-1$
}
+
+ if (describeRows) {
+ sendRowDescription(cols);
+ }
+ ResultsWorkItem r = new ResultsWorkItem(cols, rs, result, -1);
+ r.run();
+ sendCommandComplete(sql, 0);
}
@Override
public void sendUpdateCount(String sql, int updateCount) {
- try {
- sendCommandComplete(sql, updateCount);
- } catch (IOException e) {
- terminate(e);
- }
+ sendCommandComplete(sql, updateCount);
}
@Override
@@ -435,24 +372,16 @@
@Override
public void terminated() {
- try {
- trace("channel being terminated");
- this.sendNoticeResponse("Connection closed");
- this.ctx.getChannel().close();
- } catch (IOException e) {
- trace(e.getMessage());
- }
+ trace("channel being terminated");
+ this.sendNoticeResponse("Connection closed");
+ this.ctx.getChannel().close();
}
@Override
public void flush() {
- try {
- this.dataOut.flush();
- this.dataOut = null;
- Channels.write(this.ctx.getChannel(), null);
- } catch (IOException e) {
- terminate(e);
- }
+ this.dataOut = null;
+ this.writer = null;
+ Channels.write(this.ctx.getChannel(), null);
}
@Override
@@ -471,7 +400,7 @@
}
@Override
- public void sendCommandComplete(String sql, int updateCount) throws IOException {
+ public void sendCommandComplete(String sql, int updateCount) {
startMessage('C');
sql = sql.trim().toUpperCase();
// TODO remove remarks at the beginning
@@ -508,32 +437,24 @@
sendMessage();
}
- private void sendDataRow(ResultSet rs, List<PgColInfo> cols, ChannelBuffer buffer)
throws SQLException, IOException {
- startMessage('D');
+ private void sendDataRow(ResultSet rs, List<PgColInfo> cols) throws SQLException,
IOException {
+ startMessage('D', -1);
+ int lengthIndex = this.dataOut.writerIndex() - 4;
writeShort(cols.size());
for (int i = 0; i < cols.size(); i++) {
- byte[] bytes = getContent(rs, cols.get(i), i+1);
- if (bytes == null) {
- writeInt(-1);
- } else {
- writeInt(bytes.length);
- write(bytes);
+ int dataBytesIndex = this.dataOut.writerIndex();
+ writeInt(-1);
+ getContent(rs, cols.get(i), i+1);
+ writer.flush();
+ if (!rs.wasNull()) {
+ int bytes = this.dataOut.writerIndex() - dataBytesIndex - 4;
+ this.dataOut.setInt(dataBytesIndex, bytes);
}
}
-
- byte[] buff = outBuffer.toByteArray();
- int len = buff.length;
- this.outBuffer = null;
- this.dataOut = null;
-
- // now build the wire contents.
- buffer.writeByte((byte)this.messageType);
- buffer.writeInt(len+4);
- buffer.writeBytes(buff);
+ this.dataOut.setInt(lengthIndex, this.dataOut.writerIndex() - lengthIndex);
}
- private byte[] getContent(ResultSet rs, PgColInfo col, int column) throws SQLException,
TeiidSQLException, IOException {
- byte[] bytes = null;
+ private void getContent(ResultSet rs, PgColInfo col, int column) throws SQLException,
TeiidSQLException, IOException {
switch (col.type) {
case PG_TYPE_BOOL:
case PG_TYPE_BPCHAR:
@@ -549,14 +470,19 @@
case PG_TYPE_VARCHAR:
String value = rs.getString(column);
if (value != null) {
- bytes = value.getBytes(this.encoding);
+ writer.write(value);
}
break;
case PG_TYPE_TEXT:
Clob clob = rs.getClob(column);
if (clob != null) {
- bytes = ObjectConverterUtil.convertToByteArray(new
ReaderInputStream(clob.getCharacterStream(), this.encoding), this.maxLobSize);
+ Reader r = clob.getCharacterStream();
+ try {
+ ObjectConverterUtil.write(writer, r, this.maxLobSize, false);
+ } finally {
+ r.close();
+ }
}
break;
@@ -564,7 +490,8 @@
Blob blob = rs.getBlob(column);
if (blob != null) {
try {
- bytes =
PGbytea.toPGString(ObjectConverterUtil.convertToByteArray(blob.getBinaryStream(),
this.maxLobSize)).getBytes(this.encoding);
+ String blobString =
PGbytea.toPGString(ObjectConverterUtil.convertToByteArray(blob.getBinaryStream(),
this.maxLobSize));
+ writer.write(blobString);
} catch(OutOfMemoryError e) {
throw new StreamCorruptedException("data too big: " + e.getMessage());
//$NON-NLS-1$
}
@@ -577,25 +504,23 @@
{
Object[] obj = (Object[])rs.getObject(column);
if (obj != null) {
- StringBuilder sb = new StringBuilder();
- sb.append("{");
+ writer.append("{");
boolean first = true;
for (Object o:obj) {
if (!first) {
- sb.append(",");
+ writer.append(",");
}
else {
first = false;
}
if (col.type == PG_TYPE_TEXTARRAY) {
- escapeQuote(sb, o.toString());
+ escapeQuote(writer, o.toString());
}
else {
- sb.append(o.toString());
+ writer.append(o.toString());
}
}
- sb.append("}");
- bytes = sb.toString().getBytes(this.encoding);
+ writer.append("}");
}
}
break;
@@ -604,18 +529,16 @@
{
Object[] obj = (Object[])rs.getObject(column);
if (obj != null) {
- StringBuilder sb = new StringBuilder();
boolean first = true;
for (Object o:obj) {
if (!first) {
- sb.append(" ");
+ writer.append(" ");
}
else {
first = false;
}
- sb.append(o);
+ writer.append(o.toString());
}
- bytes = sb.toString().getBytes(this.encoding);
}
}
break;
@@ -623,10 +546,9 @@
default:
throw new TeiidSQLException("unknown datatype failed to convert");
}
- return bytes;
}
- public static void escapeQuote(StringBuilder sb, String s) {
+ public static void escapeQuote(Writer sb, String s) throws IOException {
sb.append('"');
for (int i = 0; i < s.length(); i++) {
char c = s.charAt(i);
@@ -649,7 +571,7 @@
} catch (GeneralSecurityException e) {
LogManager.logError(LogConstants.CTX_ODBC, e,
RuntimePlugin.Util.getString("PgBackendProtocol.ssl_error"));
}
- ChannelBuffer buffer = ChannelBuffers.directBuffer(1);
+ ChannelBuffer buffer = ctx.getChannel().getConfig().getBufferFactory().getBuffer(1);
if (engine == null) {
buffer.writeByte('N');
} else {
@@ -659,7 +581,7 @@
Channels.write(this.ctx, this.message.getFuture(), buffer,
this.message.getRemoteAddress());
}
- private void sendErrorResponse(Throwable t) throws IOException {
+ private void sendErrorResponse(Throwable t) {
trace(t.getMessage());
SQLException e = TeiidSQLException.create(t);
startMessage('E');
@@ -675,7 +597,7 @@
sendMessage();
}
- private void sendRowDescription(List<PgColInfo> cols) throws IOException {
+ private void sendRowDescription(List<PgColInfo> cols) {
startMessage('T');
writeShort(cols.size());
for (PgColInfo info : cols) {
@@ -705,7 +627,7 @@
}
}
- private void sendErrorResponse(String message) throws IOException {
+ private void sendErrorResponse(String message) {
trace("Exception:", message);
startMessage('E');
write('S');
@@ -718,7 +640,7 @@
sendMessage();
}
- private void sendNoticeResponse(String message) throws IOException {
+ private void sendNoticeResponse(String message) {
trace("notice:", message);
startMessage('N');
write('S');
@@ -744,19 +666,19 @@
sendMessage();
}
- private void sendAuthenticationCleartextPassword() throws IOException {
+ private void sendAuthenticationCleartextPassword() {
startMessage('R');
writeInt(3);
sendMessage();
}
- private void sendAuthenticationOk() throws IOException {
+ private void sendAuthenticationOk() {
startMessage('R');
writeInt(0);
sendMessage();
}
- private void sendReadyForQuery(boolean inTransaction, boolean failedTransaction) throws
IOException {
+ private void sendReadyForQuery(boolean inTransaction, boolean failedTransaction) {
startMessage('Z');
char c;
if (failedTransaction) {
@@ -776,14 +698,14 @@
sendMessage();
}
- private void sendBackendKeyData(int processId, int screctKey) throws IOException {
+ private void sendBackendKeyData(int processId, int screctKey) {
startMessage('K');
writeInt(processId);
writeInt(screctKey);
sendMessage();
}
- private void sendParameterStatus(String param, String value) throws IOException {
+ private void sendParameterStatus(String param, String value) {
startMessage('S');
writeString(param);
writeString(value);
@@ -792,76 +714,74 @@
@Override
public void functionCallResponse(byte[] data) {
- try {
- startMessage('V');
- if (data == null) {
- writeInt(-1);
- }
- else {
- writeInt(data.length);
- write(data);
- }
- sendMessage();
- } catch (IOException e) {
- terminate(e);
- }
+ startMessage('V');
+ if (data == null) {
+ writeInt(-1);
+ }
+ else {
+ writeInt(data.length);
+ write(data);
+ }
+ sendMessage();
}
@Override
public void functionCallResponse(int data) {
- try {
- startMessage('V');
- writeInt(4);
- writeInt(data);
- sendMessage();
- } catch (IOException e) {
- terminate(e);
- }
+ startMessage('V');
+ writeInt(4);
+ writeInt(data);
+ sendMessage();
}
- private void writeString(String s) throws IOException {
+ private void writeString(String s) {
write(s.getBytes(this.encoding));
write(0);
}
- private void writeInt(int i) throws IOException {
+ private void writeInt(int i) {
dataOut.writeInt(i);
}
- private void writeShort(int i) throws IOException {
+ private void writeShort(int i) {
dataOut.writeShort(i);
}
- private void write(byte[] data) throws IOException {
- dataOut.write(data);
+ private void write(byte[] data) {
+ dataOut.writeBytes(data);
}
- private void write(int b) throws IOException {
- dataOut.write(b);
+ private void write(int b) {
+ dataOut.writeByte(b);
}
private void startMessage(char newMessageType) {
- this.messageType = newMessageType;
- this.outBuffer = new ByteArrayOutputStream();
- this.dataOut = new DataOutputStream(this.outBuffer);
+ startMessage(newMessageType, 32);
}
+ private void startMessage(char newMessageType, int estimatedLength) {
+ if (estimatedLength > -1) {
+ initBuffer(estimatedLength);
+ }
+ this.dataOut.writeByte((byte)newMessageType);
+ this.dataOut.writerIndex(this.dataOut.writerIndex() + 4);
+ }
+
+ private void initBuffer(int estimatedLength) {
+ this.dataOut = ChannelBuffers.dynamicBuffer(estimatedLength);
+ ChannelBufferOutputStream cbos = new ChannelBufferOutputStream(this.dataOut);
+ this.writer = new OutputStreamWriter(cbos, this.encoding);
+ }
+
private void sendMessage() {
- byte[] buff = outBuffer.toByteArray();
- int len = buff.length;
- this.outBuffer = null;
- this.dataOut = null;
-
- // now build the wire contents.
- ChannelBuffer buffer = ChannelBuffers.directBuffer(len+5);
- buffer.writeByte((byte)this.messageType);
- buffer.writeInt(len+4);
- buffer.writeBytes(buff);
- Channels.write(this.ctx, this.message.getFuture(), buffer,
this.message.getRemoteAddress());
+ int pos = this.dataOut.writerIndex();
+ this.dataOut.setInt(1, pos - 1);
+ sendContents();
}
- private void sendContents(ChannelBuffer buffer) {
- Channels.write(this.ctx, this.message.getFuture(), buffer,
this.message.getRemoteAddress());
+ private void sendContents() {
+ ChannelBuffer cb = this.dataOut;
+ this.dataOut = null;
+ Channels.write(this.ctx, this.message.getFuture(), cb,
this.message.getRemoteAddress());
}
private static void trace(String... msg) {
Modified:
branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestODBCSocketTransport.java
===================================================================
---
branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestODBCSocketTransport.java 2011-08-17
01:23:42 UTC (rev 3388)
+++
branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestODBCSocketTransport.java 2011-08-17
14:23:34 UTC (rev 3389)
@@ -128,7 +128,7 @@
config.setBindAddress(addr.getHostName());
config.setPortNumber(0);
odbcTransport = new ODBCSocketListener(config,
BufferManagerFactory.getStandaloneBufferManager(), 0, 100000);
-
+ odbcTransport.setMaxBufferSize(100); //set to a small size to ensure buffering over
the limit works
FakeServer server = new FakeServer();
server.setUseCallingThread(false);
server.deployVDB("parts", UnitTestUtil.getTestDataPath() +
"/PartsSupplier.vdb");