[teiid-commits] teiid SVN: r3482 - in branches/7.1.x/runtime/src/main/java/org/teiid: transport and 1 other directory.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Tue Sep 13 19:19:51 EDT 2011
Author: loleary
Date: 2011-09-13 19:19:51 -0400 (Tue, 13 Sep 2011)
New Revision: 3482
Modified:
branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCClientRemote.java
branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemote.java
branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java
branches/7.1.x/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java
Log:
TEIID-1652, TEIID-1653
Modified: branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCClientRemote.java
===================================================================
--- branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCClientRemote.java 2011-09-13 23:19:40 UTC (rev 3481)
+++ branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCClientRemote.java 2011-09-13 23:19:51 UTC (rev 3482)
@@ -21,6 +21,7 @@
*/
package org.teiid.odbc;
+import java.io.IOException;
import java.nio.charset.Charset;
import java.sql.ParameterMetaData;
import java.sql.ResultSet;
@@ -65,7 +66,16 @@
// DataRow (B)
// CommandComplete (B)
+ // if count = -1 send all
void sendResults(String sql, ResultSet rs, boolean describeRows);
+
+ void sendCursorResults(ResultSet rs, int rowCount);
+
+ void sendPortalResults(String sql, ResultSet rs, int rowCount, boolean portal);
+
+ void sendMoveCursor(ResultSet rs, int rowCount);
+
+ void sendCommandComplete(String sql, int updateCount) throws IOException;
// CommandComplete (B)
void sendUpdateCount(String sql, int updateCount);
@@ -100,9 +110,4 @@
// NoticeResponse (B)
// NotificationResponse (B)
-
- // PortalSuspended (B)
-
-
-
}
Modified: branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemote.java
===================================================================
--- branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemote.java 2011-09-13 23:19:40 UTC (rev 3481)
+++ branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemote.java 2011-09-13 23:19:51 UTC (rev 3482)
@@ -55,6 +55,11 @@
void unsupportedOperation(String msg);
void flush();
+
+ void cursorExecute(String prepareName, String sql);
+ void cursorFetch(String prepareName, int rows);
+ void cursorMove(String prepareName, int rows);
+ void cursorClose(String prepareName);
// unimplemented frontend messages
// CopyData (F & B)
Modified: branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java
===================================================================
--- branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java 2011-09-13 23:19:40 UTC (rev 3481)
+++ branches/7.1.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java 2011-09-13 23:19:51 UTC (rev 3482)
@@ -137,11 +137,16 @@
private static Pattern preparedAutoIncrement = Pattern.compile("select 1 \\s*from pg_catalog.pg_attrdef \\s*where adrelid = \\$1 AND adnum = \\$2 " + //$NON-NLS-1$
"\\s*and pg_catalog.pg_get_expr\\(adbin, adrelid\\) \\s*like '%nextval\\(%'", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
- private static Pattern deallocatePattern = Pattern.compile("DEALLOCATE \"(\\w+\\d+_*)\""); //$NON-NLS-1$
- private static Pattern releasePattern = Pattern.compile("RELEASE (\\w+\\d+_*)"); //$NON-NLS-1$
- private static Pattern savepointPattern = Pattern.compile("SAVEPOINT (\\w+\\d+_*)"); //$NON-NLS-1$
- private static Pattern rollbackPattern = Pattern.compile("ROLLBACK\\s*(to)*\\s*(\\w+\\d+_*)*"); //$NON-NLS-1$
+ private static Pattern cursorSelectPattern = Pattern.compile("DECLARE \"(\\w+)\" CURSOR(\\s(WITH HOLD|SCROLL))? FOR (.*)", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+ private static Pattern fetchPattern = Pattern.compile("FETCH (\\d+) IN \"(\\w+)\".*", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+ private static Pattern movePattern = Pattern.compile("MOVE (\\d+) IN \"(\\w+)\".*", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+ private static Pattern closePattern = Pattern.compile("CLOSE \"(\\w+)\"", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+ private static Pattern deallocatePattern = Pattern.compile("DEALLOCATE \"(\\w+\\d+_*)\"", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+ private static Pattern releasePattern = Pattern.compile("RELEASE (\\w+\\d?_*)", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+ private static Pattern savepointPattern = Pattern.compile("SAVEPOINT (\\w+\\d?_*)", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+ private static Pattern rollbackPattern = Pattern.compile("ROLLBACK\\s*(to)*\\s*(\\w+\\d+_*)*", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+
private ODBCClientRemote client;
private Properties props;
private AuthenticationType authType;
@@ -150,6 +155,7 @@
// TODO: this is unbounded map; need to define some boundaries as to how many stmts each session can have
private Map<String, Prepared> preparedMap = Collections.synchronizedMap(new HashMap<String, Prepared>());
private Map<String, Portal> portalMap = Collections.synchronizedMap(new HashMap<String, Portal>());
+ private Map<String, Cursor> cursorMap = Collections.synchronizedMap(new HashMap<String, Cursor>());
public ODBCServerRemoteImpl(ODBCClientRemote client, AuthenticationType authType) {
this.client = client;
@@ -189,6 +195,76 @@
}
@Override
+ public void cursorExecute(String cursorName, String sql) {
+ if (this.connection != null) {
+ if (sql != null) {
+ String modfiedSQL = sql.replaceAll("\\$\\d+", "?");//$NON-NLS-1$ //$NON-NLS-2$
+ try {
+ // close if the name is already used or the unnamed prepare; otherwise
+ // stmt is alive until session ends.
+ Prepared previous = this.preparedMap.remove(cursorName);
+ if (previous != null) {
+ previous.stmt.close();
+ }
+
+ PreparedStatement stmt = this.connection.prepareStatement(modfiedSQL);
+ boolean hasResults = stmt.execute();
+ this.cursorMap.put(cursorName, new Cursor(cursorName, sql, stmt, null, hasResults?stmt.getResultSet():null));
+ this.client.sendCommandComplete("DECLARE CURSOR", 0); //$NON-NLS-1$
+ } catch (SQLException e) {
+ this.client.errorOccurred(e);
+ } catch (IOException e) {
+ this.client.errorOccurred(e);
+ }
+ }
+ }
+ else {
+ this.client.errorOccurred(RuntimePlugin.Util.getString("no_active_connection")); //$NON-NLS-1$
+ }
+
+ }
+ @Override
+ public void cursorFetch(String cursorName, int rows) {
+ Cursor cursor = this.cursorMap.get(cursorName);
+ if (cursor != null) {
+ cursor.fetchSize = rows;
+ this.client.sendCursorResults(cursor.rs, rows);
+ }
+ else {
+ this.client.errorOccurred(RuntimePlugin.Util.getString("not_bound", cursorName)); //$NON-NLS-1$
+ return;
+ }
+ }
+
+ @Override
+ public void cursorMove(String prepareName, int rows) {
+ Cursor cursor = this.cursorMap.get(prepareName);
+ if (cursor != null) {
+ this.client.sendMoveCursor(cursor.rs, rows);
+ }
+ else {
+ this.client.errorOccurred(RuntimePlugin.Util.getString("not_bound", prepareName)); //$NON-NLS-1$
+ return;
+ }
+ }
+
+ @Override
+ public void cursorClose(String prepareName) {
+ Cursor cursor = this.cursorMap.remove(prepareName);
+ if (cursor != null) {
+ try {
+ cursor.rs.close();
+ cursor.stmt.close();
+ this.client.sendCommandComplete("CLOSE CURSOR", 0); //$NON-NLS-1$
+ } catch (SQLException e) {
+ this.client.errorOccurred(e);
+ } catch (IOException e) {
+ this.client.errorOccurred(e);
+ }
+ }
+ }
+
+ @Override
public void prepare(String prepareName, String sql, int[] paramType) {
if (this.connection != null) {
@@ -263,43 +339,50 @@
bindName = UNNAMED;
}
- Portal query = this.portalMap.get(bindName);
- if (query == null) {
- this.client.errorOccurred(RuntimePlugin.Util.getString("not_bound", bindName)); //$NON-NLS-1$
- sync();
- }
+ Cursor cursor = this.cursorMap.get(bindName);
+ if (cursor != null) {
+ this.client.sendPortalResults(cursor.sql, cursor.rs, cursor.fetchSize, true);
+ }
else {
- if (query.sql.trim().isEmpty()) {
- this.client.emptyQueryReceived();
- return;
+ Portal query = this.portalMap.get(bindName);
+ if (query == null) {
+ this.client.errorOccurred(RuntimePlugin.Util.getString("not_bound", bindName)); //$NON-NLS-1$
+ sync();
+ }
+ else {
+ if (query.sql.trim().isEmpty()) {
+ this.client.emptyQueryReceived();
+ return;
+ }
+
+ PreparedStatement stmt = query.stmt;
+ try {
+ // maxRows = 0, means unlimited.
+ if (maxRows != 0) {
+ stmt.setMaxRows(maxRows);
+ }
+
+ boolean result = stmt.execute();
+ if (result) {
+ try {
+ ResultSet rs = stmt.getResultSet();
+ this.client.sendResults(query.sql, rs, true);
+ } catch (SQLException e) {
+ this.client.errorOccurred(e);
+ }
+ } else {
+ this.client.sendUpdateCount(query.sql, stmt.getUpdateCount());
+ }
+ } catch (SQLException e) {
+ this.client.errorOccurred(e);
+ }
}
-
- PreparedStatement stmt = query.stmt;
- try {
- // maxRows = 0, means unlimited.
- if (maxRows != 0) {
- stmt.setMaxRows(maxRows);
- }
-
- boolean result = stmt.execute();
- if (result) {
- try {
- ResultSet rs = stmt.getResultSet();
- this.client.sendResults(query.sql, rs, true);
- } catch (SQLException e) {
- this.client.errorOccurred(e);
- }
- } else {
- this.client.sendUpdateCount(query.sql, stmt.getUpdateCount());
- }
- } catch (SQLException e) {
- this.client.errorOccurred(e);
- }
}
}
private String fixSQL(String sql) {
String modified = sql;
+ Matcher m = null;
// select current_schema()
// set client_encoding to 'WIN1252'
if (sql != null) {
@@ -307,8 +390,7 @@
String sqlLower = sql.toLowerCase();
if (sqlLower.startsWith("select")) { //$NON-NLS-1$
modified = sql.replace('\n', ' ');
-
- Matcher m = null;
+
if ((m = pkPattern.matcher(modified)).matches()) {
modified = new StringBuffer("SELECT k.Name AS attname, convert(Position, short) AS attnum, TableName AS relname, SchemaName AS nspname, TableName AS relname") //$NON-NLS-1$
.append(" FROM SYS.KeyColumns k") //$NON-NLS-1$
@@ -367,7 +449,7 @@
modified = "select 63"; //$NON-NLS-1$
}
else {
- Matcher m = setPattern.matcher(sql);
+ m = setPattern.matcher(sql);
if (m.matches()) {
if (m.group(2).equalsIgnoreCase("client_encoding")) { //$NON-NLS-1$
this.client.setEncoding(PGCharsetConverter.getCharset(m.group(4)));
@@ -383,16 +465,6 @@
else if ((m = rollbackPattern.matcher(modified)).matches()) {
modified = "ROLLBACK"; //$NON-NLS-1$
}
- else if ((m = savepointPattern.matcher(modified)).matches()) {
- modified = "SELECT 'SAVEPOINT'"; //$NON-NLS-1$
- }
- else if ((m = releasePattern.matcher(modified)).matches()) {
- modified = "SELECT 'RELEASE'"; //$NON-NLS-1$
- }
- else if ((m = deallocatePattern.matcher(modified)).matches()) {
- closePreparedStatement(m.group(1));
- modified = "SELECT 'DEALLOCATE'"; //$NON-NLS-1$
- }
}
if (modified != null && !modified.equalsIgnoreCase(sql)) {
LogManager.logDetail(LogConstants.CTX_ODBC, "Modified Query:"+modified); //$NON-NLS-1$
@@ -417,39 +489,71 @@
try {
ScriptReader reader = new ScriptReader(new StringReader(query));
String sql = reader.readStatement();
- String s = fixSQL(sql);
- while (s != null) {
- Statement stmt = null;
- try {
- stmt = this.connection.createStatement();
- boolean result = stmt.execute(s);
- if (result) {
- this.client.sendResults(sql, stmt.getResultSet(), true);
- } else {
- this.client.sendUpdateCount(sql, stmt.getUpdateCount());
- }
- sql = reader.readStatement();
- s = fixSQL(sql);
- } catch (SQLException e) {
- this.client.errorOccurred(e);
- break;
- } finally {
- try {
- if (stmt != null) {
- stmt.close();
- }
- } catch (SQLException e) {
- this.client.errorOccurred(e);
+ while (sql != null) {
+ Matcher m = null;
+ if ((m = cursorSelectPattern.matcher(sql)).matches()){
+ cursorExecute(m.group(1), fixSQL(m.group(4)));
+ }
+ else if ((m = fetchPattern.matcher(sql)).matches()){
+ cursorFetch(m.group(2), Integer.parseInt(m.group(1)));
+ }
+ else if ((m = movePattern.matcher(sql)).matches()){
+ cursorMove(m.group(2), Integer.parseInt(m.group(1)));
+ }
+ else if ((m = closePattern.matcher(sql)).matches()){
+ cursorClose(m.group(1));
+ }
+ else if ((m = savepointPattern.matcher(sql)).matches()) {
+ this.client.sendCommandComplete("SAVEPOINT", 0); //$NON-NLS-1$
+ }
+ else if ((m = releasePattern.matcher(sql)).matches()) {
+ this.client.sendCommandComplete("RELEASE", 0); //$NON-NLS-1$
+ }
+ else if ((m = deallocatePattern.matcher(sql)).matches()) {
+ closePreparedStatement(m.group(1));
+ this.client.sendCommandComplete("DEALLOCATE", 0); //$NON-NLS-1$
+ }
+
+ else {
+ if (!executeAndSend(fixSQL(sql))) {
break;
}
- }
+ }
+ sql = reader.readStatement();
}
+ sync();
} catch(IOException e) {
this.client.errorOccurred(e);
}
- sync();
}
+ private boolean executeAndSend(String sql) {
+ boolean sucess = true;
+ Statement stmt = null;
+ try {
+ stmt = this.connection.createStatement();
+ boolean result = stmt.execute(sql);
+ if (result) {
+ this.client.sendResults(sql, stmt.getResultSet(), true);
+ } else {
+ this.client.sendUpdateCount(sql, stmt.getUpdateCount());
+ }
+ } catch (SQLException e) {
+ this.client.errorOccurred(e);
+ sucess = false;
+ } finally {
+ try {
+ if (stmt != null) {
+ stmt.close();
+ }
+ } catch (SQLException e) {
+ this.client.errorOccurred(e);
+ sucess = false;
+ }
+ }
+ return sucess;
+ }
+
@Override
public void getParameterDescription(String prepareName) {
if (prepareName == null || prepareName.length() == 0) {
@@ -625,6 +729,16 @@
*/
int[] paramType;
}
+
+ static class Cursor extends Prepared {
+ ResultSet rs;
+ int fetchSize = 1000;
+
+ public Cursor (String name, String sql, PreparedStatement stmt, int[] paramType, ResultSet rs) {
+ super(name, sql, stmt, paramType);
+ this.rs = rs;
+ }
+ }
/**
* Represents a PostgreSQL Portal object.
Modified: branches/7.1.x/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java
===================================================================
--- branches/7.1.x/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java 2011-09-13 23:19:40 UTC (rev 3481)
+++ branches/7.1.x/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java 2011-09-13 23:19:51 UTC (rev 3482)
@@ -241,6 +241,61 @@
}
@Override
+ public void sendCursorResults(ResultSet rs, int rowCount) {
+ try {
+ try {
+ ResultSetMetaData meta = rs.getMetaData();
+ sendRowDescription(meta, rs.getStatement());
+ int rowsSent = sendDataRows(rs, rowCount);
+ sendCommandComplete("FETCH", rowsSent);
+ } catch (SQLException e) {
+ sendErrorResponse(e);
+ }
+ } catch (IOException e) {
+ terminate(e);
+ }
+ }
+
+ @Override
+ public void sendPortalResults(String sql, ResultSet rs, int rowCount, boolean portal) {
+ try {
+ try {
+ int rowsSent = sendDataRows(rs, rowCount);
+ if (rowsSent < rowCount) {
+ sendCommandComplete(sql, 0);
+ }
+ else {
+ sendPortalSuspended();
+ }
+ } catch (SQLException e) {
+ sendErrorResponse(e);
+ }
+ } catch (IOException e) {
+ terminate(e);
+ }
+ }
+
+ @Override
+ public void sendMoveCursor(ResultSet rs, int rowCount) {
+ try {
+ try {
+ int rowsMoved = 0;
+ for (int i = 0; i < rowCount; i++) {
+ if (!rs.next()) {
+ break;
+ }
+ rowsMoved++;
+ }
+ sendCommandComplete("MOVE", rowsMoved);
+ } catch (SQLException e) {
+ sendErrorResponse(e);
+ }
+ } catch (IOException e) {
+ terminate(e);
+ }
+ }
+
+ @Override
public void sendResults(String sql, ResultSet rs, boolean describeRows) {
try {
try {
@@ -248,9 +303,7 @@
ResultSetMetaData meta = rs.getMetaData();
sendRowDescription(meta, rs.getStatement());
}
- while (rs.next()) {
- sendDataRow(rs);
- }
+ sendDataRows(rs, -1);
sendCommandComplete(sql, 0);
} catch (SQLException e) {
sendErrorResponse(e);
@@ -311,8 +364,9 @@
startMessage('I');
sendMessage();
}
-
- private void sendCommandComplete(String sql, int updateCount) throws IOException {
+
+ @Override
+ public void sendCommandComplete(String sql, int updateCount) throws IOException {
startMessage('C');
sql = sql.trim().toUpperCase();
// TODO remove remarks at the beginning
@@ -331,22 +385,64 @@
tag = "COMMIT";
} else if (sql.startsWith("ROLLBACK")) {
tag = "ROLLBACK";
- } else {
- trace("Check command tag: " + sql);
- tag = "UPDATE " + updateCount;
+ } else if (sql.startsWith("DECLARE CURSOR")) {
+ tag = "DECLARE CURSOR";
+ } else if (sql.startsWith("CLOSE CURSOR")) {
+ tag = "CLOSE CURSOR";
+ } else if (sql.startsWith("FETCH")) {
+ tag = "FETCH "+updateCount;
+ } else if (sql.startsWith("MOVE")) {
+ tag = "MOVE "+updateCount;
}
+ else {
+ tag = sql;
+ }
writeString(tag);
sendMessage();
}
-
- private void sendDataRow(ResultSet rs) throws SQLException, IOException {
+
+ // 300k
+ static int ODBC_SOCKET_BUFF_SIZE = Integer.parseInt(System.getProperty("ODBCPacketSize", "307200"));
+
+ private int sendDataRows(ResultSet rs, int rowsToSend) throws SQLException, IOException {
+ int avgRowsize = -1;
+ int rowCount = 0;
+ ChannelBuffer buffer = ChannelBuffers.directBuffer(ODBC_SOCKET_BUFF_SIZE);
int columns = rs.getMetaData().getColumnCount();
- String[] values = new String[columns];
- for (int i = 0; i < columns; i++) {
- values[i] = rs.getString(i + 1);
+ int rowsSent = 0;
+
+ while(rs.next()) {
+ String[] values = new String[columns];
+ for (int i = 0; i < columns; i++) {
+ values[i] = rs.getString(i + 1);
+ }
+
+ rowCount++;
+
+ buildDataRow(values, buffer);
+ avgRowsize = buffer.readableBytes()/rowCount;
+
+ if (buffer.writableBytes() < (avgRowsize*2)) {
+ Channels.write(this.ctx, this.message.getFuture(), buffer, this.message.getRemoteAddress());
+ rowCount = 0;
+ buffer= ChannelBuffers.directBuffer(ODBC_SOCKET_BUFF_SIZE);
+ }
+
+ rowsSent++;
+ if (rowsSent == rowsToSend) {
+ break;
+ }
}
+
+ if (rowCount > 0) {
+ Channels.write(this.ctx, this.message.getFuture(), buffer, this.message.getRemoteAddress());
+ }
+ return rowsSent;
+ }
+
+ private void buildDataRow(String[] values, ChannelBuffer buffer) throws IOException {
startMessage('D');
- writeShort(columns);
+ writeShort(values.length);
for (String s : values) {
if (s == null) {
writeInt(-1);
@@ -357,7 +453,16 @@
write(d2);
}
}
- sendMessage();
+
+ byte[] buff = outBuffer.toByteArray();
+ int len = buff.length;
+ this.outBuffer = null;
+ this.dataOut = null;
+
+ // now build the wire contents.
+ buffer.writeByte((byte)this.messageType);
+ buffer.writeInt(len+4);
+ buffer.writeBytes(buff);
}
private void sendErrorResponse(Throwable t) throws IOException {
@@ -474,6 +579,11 @@
startMessage('2');
sendMessage();
}
+
+ private void sendPortalSuspended() {
+ startMessage('s');
+ sendMessage();
+ }
private void sendAuthenticationCleartextPassword() throws IOException {
startMessage('R');
More information about the teiid-commits mailing list