[teiid-commits] teiid SVN: r3290 - in branches/7.4.x: runtime/src/main/java/org/teiid/odbc and 3 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Tue Jun 28 21:10:23 EDT 2011


Author: rareddy
Date: 2011-06-28 21:10:22 -0400 (Tue, 28 Jun 2011)
New Revision: 3290

Added:
   branches/7.4.x/runtime/src/main/java/org/teiid/odbc/PGUtil.java
Modified:
   branches/7.4.x/build/kits/jboss-container/teiid-releasenotes.html
   branches/7.4.x/runtime/src/main/java/org/teiid/odbc/ODBCClientRemote.java
   branches/7.4.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java
   branches/7.4.x/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java
   branches/7.4.x/runtime/src/main/resources/org/teiid/runtime/i18n.properties
   branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestODBCSocketTransport.java
Log:
TEIID-1652, TEIID-1653

Modified: branches/7.4.x/build/kits/jboss-container/teiid-releasenotes.html
===================================================================
--- branches/7.4.x/build/kits/jboss-container/teiid-releasenotes.html	2011-06-29 00:24:02 UTC (rev 3289)
+++ branches/7.4.x/build/kits/jboss-container/teiid-releasenotes.html	2011-06-29 01:10:22 UTC (rev 3290)
@@ -62,6 +62,7 @@
 	<LI><B>Pluggable Authorization</B> - an alternative PolicyDecider can be defined in the teiid-jboss-beans.xml file to customize authorization decisions.
 	<LI><B>Streaming XQuery</B> - in situations where document projection applies if the XMLQUERY/XMLTABLE path expressions meet certain conditions, then the incoming document will not only be projected, but the independent subtrees will be processed without loading the entire document.  This allows for nearly arbitrarily large XML documents to be processed.  See the Reference for more.
 	<LI><B>Logging Procedures</B> - added SYSADMIN.isLoggable and SYSADMIN.logMsg to aid in debugging procedure logic.
+	<LI><B>ODBC Cursors</B> - Added the capability to use "UseDeclareFetch" with ODBC. This enables the user to read results in batches, which is especially useful when dealing with results that have a large row count.
 </UL>
 
 <h2><a name="Compatibility">Compatibility Issues</a></h2>

Modified: branches/7.4.x/runtime/src/main/java/org/teiid/odbc/ODBCClientRemote.java
===================================================================
--- branches/7.4.x/runtime/src/main/java/org/teiid/odbc/ODBCClientRemote.java	2011-06-29 00:24:02 UTC (rev 3289)
+++ branches/7.4.x/runtime/src/main/java/org/teiid/odbc/ODBCClientRemote.java	2011-06-29 01:10:22 UTC (rev 3290)
@@ -21,13 +21,14 @@
  */
 package org.teiid.odbc;
 
+import java.io.IOException;
 import java.sql.ParameterMetaData;
-import java.sql.ResultSetMetaData;
-import java.sql.Statement;
+import java.util.List;
 import java.util.Properties;
 
 import org.teiid.client.util.ResultsFuture;
 import org.teiid.jdbc.ResultSetImpl;
+import org.teiid.odbc.PGUtil.PgColInfo;
 
 public interface ODBCClientRemote {
 	
@@ -62,11 +63,19 @@
 
 	//	RowDescription (B)
 	//	NoData (B)
-	void sendResultSetDescription(ResultSetMetaData metaData, Statement stmt);
+	void sendResultSetDescription(List<PgColInfo> cols);
 	
 	//	DataRow (B)
 	//	CommandComplete (B)
-	void sendResults(String sql, ResultSetImpl rs, ResultsFuture<Void> result, boolean describeRows);
+	void sendResults(String sql, ResultSetImpl rs, List<PgColInfo> cols, ResultsFuture<Integer> result, boolean describeRows);
+	
+	void sendCursorResults(ResultSetImpl rs, List<PgColInfo> cols, ResultsFuture<Integer> result, int rowCount);
+	
+	void sendPortalResults(String sql, ResultSetImpl rs, List<PgColInfo> cols, ResultsFuture<Integer> result, int rowCount, boolean portal);
+	
+	void sendMoveCursor(ResultSetImpl rs, int rowCount, ResultsFuture<Integer> results);
+	
+	void sendCommandComplete(String sql, int updateCount) throws IOException;	
 
 	//	CommandComplete (B)
 	void sendUpdateCount(String sql, int updateCount);
@@ -106,8 +115,5 @@
 	//	NoticeResponse (B)
 	//	NotificationResponse (B)
 	
-	//	PortalSuspended (B)
-	
-		
-
+	void sendPortalSuspended();
 }

Modified: branches/7.4.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java
===================================================================
--- branches/7.4.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java	2011-06-29 00:24:02 UTC (rev 3289)
+++ branches/7.4.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java	2011-06-29 01:10:22 UTC (rev 3290)
@@ -21,14 +21,19 @@
  */
 package org.teiid.odbc;
 
+import static org.teiid.odbc.PGUtil.convertType;
+
 import java.io.IOException;
 import java.io.StringReader;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
@@ -40,10 +45,12 @@
 import org.teiid.core.util.StringUtil;
 import org.teiid.jdbc.ConnectionImpl;
 import org.teiid.jdbc.PreparedStatementImpl;
+import org.teiid.jdbc.ResultSetImpl;
 import org.teiid.jdbc.StatementImpl;
 import org.teiid.jdbc.TeiidDriver;
 import org.teiid.logging.LogConstants;
 import org.teiid.logging.LogManager;
+import org.teiid.odbc.PGUtil.PgColInfo;
 import org.teiid.runtime.RuntimePlugin;
 import org.teiid.transport.ODBCClientInstance;
 
@@ -133,11 +140,17 @@
 	private static Pattern preparedAutoIncrement = Pattern.compile("select 1 \\s*from pg_catalog.pg_attrdef \\s*where adrelid = \\$1 AND adnum = \\$2 " + //$NON-NLS-1$
 			"\\s*and pg_catalog.pg_get_expr\\(adbin, adrelid\\) \\s*like '%nextval\\(%'", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
 	
-	private static Pattern deallocatePattern = Pattern.compile("DEALLOCATE \"(\\w+\\d+_*)\""); //$NON-NLS-1$
-	private static Pattern releasePattern = Pattern.compile("RELEASE (\\w+\\d+_*)"); //$NON-NLS-1$
-	private static Pattern savepointPattern = Pattern.compile("SAVEPOINT (\\w+\\d+_*)"); //$NON-NLS-1$
-	private static Pattern rollbackPattern = Pattern.compile("ROLLBACK\\s*(to)*\\s*(\\w+\\d+_*)*"); //$NON-NLS-1$
+	private static Pattern cursorSelectPattern = Pattern.compile("DECLARE \"(\\w+)\" CURSOR(\\s(WITH HOLD|SCROLL))? FOR (.*)", Pattern.CASE_INSENSITIVE|Pattern.DOTALL); //$NON-NLS-1$
+	private static Pattern fetchPattern = Pattern.compile("FETCH (\\d+) IN \"(\\w+)\".*", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+	private static Pattern movePattern = Pattern.compile("MOVE (\\d+) IN \"(\\w+)\".*", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+	private static Pattern closePattern = Pattern.compile("CLOSE \"(\\w+)\"", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
 	
+	private static Pattern deallocatePattern = Pattern.compile("DEALLOCATE \"(\\w+\\d+_*)\"", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+	private static Pattern releasePattern = Pattern.compile("RELEASE (\\w+\\d?_*)", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+	private static Pattern savepointPattern = Pattern.compile("SAVEPOINT (\\w+\\d?_*)", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+	private static Pattern rollbackPattern = Pattern.compile("ROLLBACK\\s*(to)*\\s*(\\w+\\d+_*)*", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+	
+	
 	private TeiidDriver driver;
 	private ODBCClientRemote client;
 	private Properties props;
@@ -151,6 +164,7 @@
 	// TODO: this is unbounded map; need to define some boundaries as to how many stmts each session can have
 	private Map<String, Prepared> preparedMap = Collections.synchronizedMap(new HashMap<String, Prepared>());
 	private Map<String, Portal> portalMap = Collections.synchronizedMap(new HashMap<String, Portal>());
+	private Map<String, Cursor> cursorMap = Collections.synchronizedMap(new HashMap<String, Cursor>());
 	
 	public ODBCServerRemoteImpl(ODBCClientInstance client, AuthenticationType authType, TeiidDriver driver) {
 		this.driver = driver;
@@ -196,6 +210,155 @@
 		} 
 	}	
 	
+	/**
+	 * Declares a cursor: prepares <code>sql</code>, submits it for asynchronous execution and,
+	 * when the execution reports results, stores a {@link Cursor} (statement, result set and
+	 * computed column metadata) in <code>cursorMap</code> under <code>cursorName</code> and sends
+	 * a "DECLARE CURSOR" command completion to the client.  Otherwise an "execution_failed"
+	 * error is sent to the client.  Without an active connection a "no_active_connection"
+	 * error is reported.
+	 * 
+	 * @param cursorName name the cursor is registered under
+	 * @param sql query backing the cursor; when null the call does nothing
+	 * @param completion receives 1 once the execution outcome was handled without an exception
+	 */
+	private void cursorExecute(final String cursorName, final String sql, final ResultsFuture<Integer> completion) {
+		if (this.connection != null) {
+			if (sql != null) {
+				try {
+					// close if the name is already used or the unnamed prepare; otherwise
+					// stmt is alive until session ends.
+					// NOTE(review): the lookup is done in preparedMap, not cursorMap, so a cursor that is
+					// already registered under this name is replaced below without being closed - confirm intended.
+					Prepared previous = this.preparedMap.remove(cursorName);
+					if (previous != null) {
+						previous.stmt.close();
+					}
+					
+					final PreparedStatementImpl stmt = this.connection.prepareStatement(sql);
+	                this.executionFuture = stmt.submitExecute();
+	                this.executionFuture.addCompletionListener(new ResultsFuture.CompletionListener<Boolean>() {
+		        		@Override
+		        		public void onCompletion(ResultsFuture<Boolean> future) {
+		        			// the execution is over; drop the reference to the pending future
+		        			executionFuture = null;
+	                        try {
+	                        	// presumably true means a result set is available - getResultSet() is dereferenced unchecked
+				                if (future.get()) {
+				                	List<PgColInfo> cols = getPgColInfo(stmt.getResultSet().getMetaData());
+				                	// cursors are registered without parameter types (null)
+		                            cursorMap.put(cursorName, new Cursor(cursorName, sql, stmt, null, stmt.getResultSet(), cols));
+		        					client.sendCommandComplete("DECLARE CURSOR", 0); //$NON-NLS-1$		                            
+				                }
+				                else {
+				                	client.errorOccurred(RuntimePlugin.Util.getString("execution_failed")); //$NON-NLS-1$
+				                }
+				                // signalled on both branches above, but not when an exception is thrown
+				                completion.getResultsReceiver().receiveResults(1);
+				                doneExecuting();
+	                        } catch (Throwable e) {
+	                            errorOccurred(e);
+	                        }
+		        		}
+					});					
+				} catch (SQLException e) {
+					errorOccurred(e);
+				} 
+			}
+			// NOTE(review): when sql is null nothing is sent and completion is never signalled
+		}
+		else {
+			// NOTE(review): completion is not signalled on this path either
+			errorOccurred(RuntimePlugin.Util.getString("no_active_connection")); //$NON-NLS-1$
+		}
+		
+	}
+	
+	/**
+	 * Serves a FETCH on a declared cursor: records <code>rows</code> as the cursor's fetch size,
+	 * hands the cursor's result set and column metadata to <code>client.sendCursorResults</code>
+	 * and, once that send completes, sends a "FETCH" command completion with the reported row count.
+	 * Reports a "not_bound" error when no cursor is registered under <code>cursorName</code>.
+	 * 
+	 * @param cursorName name of the cursor to read from
+	 * @param rows row count passed on to the client send call
+	 * @param completion receives the row count reported by the send
+	 */
+	private void cursorFetch(String cursorName, int rows, final ResultsFuture<Integer> completion) {
+		Cursor cursor = this.cursorMap.get(cursorName);
+		if (cursor != null) {
+			// remembered on the cursor; sendCursorResults(Cursor) reads it when the cursor is executed as a portal
+			cursor.fetchSize = rows;
+			ResultsFuture<Integer> result = new ResultsFuture<Integer>();
+			this.client.sendCursorResults(cursor.rs, cursor.columnMetadata, result, rows);
+			result.addCompletionListener(new ResultsFuture.CompletionListener<Integer>() {
+            	public void onCompletion(ResultsFuture<Integer> future) {
+            		try {
+						int rowsSent = future.get();
+						client.sendCommandComplete("FETCH", rowsSent); //$NON-NLS-1$
+						completion.getResultsReceiver().receiveResults(rowsSent);
+					} catch (InterruptedException e) {
+						// presumably unreachable: the listener runs on completion, so get() should not block
+						throw new AssertionError(e);
+					} catch (ExecutionException e) {
+						// report the underlying failure rather than the wrapper
+						errorOccurred(e.getCause());
+					} catch (IOException e) {
+						errorOccurred(e);
+					}
+            	};
+			});
+		}
+		else {
+			errorOccurred(RuntimePlugin.Util.getString("not_bound", cursorName)); //$NON-NLS-1$
+		}
+	}
+	
+	/**
+	 * Serves a MOVE on a declared cursor: hands the cursor's result set and <code>rows</code> to
+	 * <code>client.sendMoveCursor</code> and, once that completes, sends a "MOVE" command
+	 * completion with the reported row count.  Reports a "not_bound" error when no cursor is
+	 * registered under <code>prepareName</code>.
+	 * 
+	 * @param prepareName name of the cursor to move
+	 * @param rows row count passed on to the client move call
+	 * @param completion receives the row count reported by the move
+	 */
+	private void cursorMove(String prepareName, int rows, final ResultsFuture<Integer> completion) {
+		Cursor cursor = this.cursorMap.get(prepareName);
+		if (cursor != null) {
+			ResultsFuture<Integer> result = new ResultsFuture<Integer>();
+			this.client.sendMoveCursor(cursor.rs, rows, result);
+			result.addCompletionListener(new ResultsFuture.CompletionListener<Integer>() {
+            	public void onCompletion(ResultsFuture<Integer> future) {
+            		try {
+						int rowsMoved = future.get();
+						client.sendCommandComplete("MOVE", rowsMoved); //$NON-NLS-1$
+						completion.getResultsReceiver().receiveResults(rowsMoved);
+					} catch (InterruptedException e) {
+						// presumably unreachable: the listener runs on completion, so get() should not block
+						throw new AssertionError(e);
+					} catch (ExecutionException e) {
+						// report the underlying failure rather than the wrapper
+						errorOccurred(e.getCause());
+					} catch (IOException e) {
+						errorOccurred(e);
+					}
+            	};
+			});			
+		}
+		else {
+			errorOccurred(RuntimePlugin.Util.getString("not_bound", prepareName)); //$NON-NLS-1$
+		}
+	}	
+	
+	/**
+	 * Closes the cursor registered under <code>prepareName</code>: removes it from the cursor map,
+	 * closes its result set and its statement and sends a "CLOSE CURSOR" command completion.
+	 * Does nothing when no cursor is registered under that name.
+	 * 
+	 * @param prepareName name of the cursor to close
+	 * @throws SQLException if closing the result set or the statement fails
+	 * @throws IOException if the command completion cannot be sent
+	 */
+	private void cursorClose(String prepareName) throws SQLException, IOException {
+		Cursor cursor = this.cursorMap.remove(prepareName);
+		if (cursor != null) {
+			try {
+				cursor.rs.close();
+			} finally {
+				// the cursor is already gone from the map, so this is the last chance to
+				// release the statement, even when closing the result set failed
+				cursor.stmt.close();
+			}
+			this.client.sendCommandComplete("CLOSE CURSOR", 0); //$NON-NLS-1$
+		}
+	}	
+	
+    /**
+     * Executes one SQL statement of a simple query.  The text is rewritten with fixSQL and
+     * submitted asynchronously on a new statement.  A result set is handed to
+     * <code>client.sendResults</code>; otherwise an update count is sent.  When the response has
+     * been produced the statement is closed and <code>completion</code> is signalled with 1.
+     * 
+     * @param sql the statement text as received; the unmodified text is used in the client responses
+     * @param completion signalled with 1 after the response was sent and the statement closed
+     * @throws SQLException declared for creating and submitting the statement
+     */
+    private void sqlExecute(final String sql, final ResultsFuture<Integer> completion) throws SQLException {
+    	String modfiedSQL = fixSQL(sql); 
+    	final StatementImpl stmt = connection.createStatement();
+        executionFuture = stmt.submitExecute(modfiedSQL);
+        executionFuture.addCompletionListener(new ResultsFuture.CompletionListener<Boolean>() {
+    		@Override
+    		public void onCompletion(ResultsFuture<Boolean> future) {
+    			// the execution is over; drop the reference to the pending future
+    			executionFuture = null;
+    			try {
+    				// completed by sendResults, or directly below when there are no rows to send
+    				ResultsFuture<Integer> result = new ResultsFuture<Integer>();
+	                if (future.get()) {
+	                	if (stmt.getResultSet() != null) {
+	                		List<PgColInfo> cols = getPgColInfo(stmt.getResultSet().getMetaData());
+                            client.sendResults(sql, stmt.getResultSet(), cols, result, true);
+	                	}
+	                	else {
+	                		// handles the "SET" commands.
+		                	client.sendUpdateCount(sql, 0);
+		                	result.getResultsReceiver().receiveResults(1);
+	                	}					                	
+	                } else {
+	                	client.sendUpdateCount(sql, stmt.getUpdateCount());
+	                	setEncoding();
+	                	result.getResultsReceiver().receiveResults(1);
+	                }
+	                // presumably the listener also fires when the future is already complete -
+	                // the update-count branches above complete it before it is registered
+	                result.addCompletionListener(new ResultsFuture.CompletionListener<Integer>() {
+	                	public void onCompletion(ResultsFuture<Integer> future) {
+							try {
+								stmt.close();
+							} catch (SQLException e) {
+								// a failed close is only logged; the next statement is still processed
+								LogManager.logDetail(LogConstants.CTX_ODBC, e, "Error closing statement"); //$NON-NLS-1$
+							}
+							completion.getResultsReceiver().receiveResults(1);
+	                	}
+	                });
+    			} catch (Throwable e) {
+    				// NOTE(review): on this path the statement is not closed and completion is never signalled
+    				client.errorOccurred(e);
+    				return;
+    			}
+    		}
+		});    	
+    }	
+	
 	@Override
 	public void prepare(String prepareName, String sql, int[] paramType) {
 		if (this.connection != null) {
@@ -215,7 +378,8 @@
 					}
 					
 					PreparedStatementImpl stmt = this.connection.prepareStatement(modfiedSQL);
-					this.preparedMap.put(prepareName, new Prepared(prepareName, sql, stmt, paramType));
+					List<PgColInfo> cols = getPgColInfo(stmt.getMetaData());
+					this.preparedMap.put(prepareName, new Prepared(prepareName, sql, stmt, paramType, cols));
 					this.client.prepareCompleted(prepareName);
 				} catch (SQLException e) {
 					errorOccurred(e);
@@ -255,7 +419,7 @@
 			errorOccurred(e);
 		}
 		
-		this.portalMap.put(bindName, new Portal(bindName, prepareName, previous.sql, previous.stmt, resultColumnFormat));
+		this.portalMap.put(bindName, new Portal(bindName, prepareName, previous.sql, previous.stmt, resultColumnFormat, previous.columnMetadata));
 		this.client.bindComplete();
 	}
 
@@ -274,60 +438,95 @@
 			bindName  = UNNAMED;
 		}		
 		
+		// special case cursor execution through portal
+		final Cursor cursor = this.cursorMap.get(bindName);
+		if (cursor != null) {
+			sendCursorResults(cursor);
+			return;
+		}		
+		
 		final Portal query = this.portalMap.get(bindName);
 		if (query == null) {
 			errorOccurred(RuntimePlugin.Util.getString("not_bound", bindName)); //$NON-NLS-1$
-		}				
-		else {
-			if (query.sql.trim().isEmpty()) {
-				this.client.emptyQueryReceived();
-				return;
-			}
-			
-            final PreparedStatementImpl stmt = query.stmt;
-            try {
-            	// maxRows = 0, means unlimited.
-            	if (maxRows != 0) {
-            		stmt.setMaxRows(maxRows);
-            	}
-            	
-                this.executionFuture = stmt.submitExecute();
-                executionFuture.addCompletionListener(new ResultsFuture.CompletionListener<Boolean>() {
-	        		@Override
-	        		public void onCompletion(ResultsFuture<Boolean> future) {
-	        			executionFuture = null;
-                        try {
-                        	ResultsFuture<Void> result = null;
-			                if (future.get()) {
-			                	result = new ResultsFuture<Void>();
-	                            client.sendResults(query.sql, stmt.getResultSet(), result, true);
-			                } else {
-			                	result = ResultsFuture.NULL_FUTURE;
-			                	client.sendUpdateCount(query.sql, stmt.getUpdateCount());
-			                	setEncoding();
-			                }
-		        			result.addCompletionListener(new ResultsFuture.CompletionListener<Void>() {
-                            	public void onCompletion(ResultsFuture<Void> future) {
-                            		try {
-										future.get();
-	                            		doneExecuting();
-									} catch (InterruptedException e) {
-										throw new AssertionError(e);
-									} catch (ExecutionException e) {
-										errorOccurred(e.getCause());
-									}
-                            	};
-							});
-                        } catch (Throwable e) {
-                            errorOccurred(e);
-                        }
-	        		}
-				});
-            } catch (SQLException e) {
-            	errorOccurred(e);
-            }			
+			return;
+		}	
+		
+		if (query.sql.trim().isEmpty()) {
+			this.client.emptyQueryReceived();
+			return;
 		}
+		
+        sendPortalResults(maxRows, query);			
 	}
+
+	/**
+	 * Executes the prepared statement of a portal asynchronously and relays the outcome to the
+	 * client: <code>client.sendResults</code> when the execution reports results, otherwise the
+	 * update count.  After the response has been produced doneExecuting() is called; failures
+	 * are passed to errorOccurred.
+	 * 
+	 * @param maxRows row limit applied to the statement; 0 means unlimited and leaves the statement's limit untouched
+	 * @param query the portal whose statement is executed
+	 */
+	private void sendPortalResults(int maxRows, final Portal query) {
+		final PreparedStatementImpl stmt = query.stmt;
+        try {
+        	// maxRows = 0, means unlimited.
+        	if (maxRows != 0) {
+        		stmt.setMaxRows(maxRows);
+        	}
+        	
+            this.executionFuture = stmt.submitExecute();
+            executionFuture.addCompletionListener(new ResultsFuture.CompletionListener<Boolean>() {
+        		@Override
+        		public void onCompletion(ResultsFuture<Boolean> future) {
+        			// the execution is over; drop the reference to the pending future
+        			executionFuture = null;
+                    try {
+                    	ResultsFuture<Integer> result = new ResultsFuture<Integer>();
+		                if (future.get()) {
+		                	// the column metadata computed at prepare time is reused here
+                            client.sendResults(query.sql, stmt.getResultSet(), query.columnMetadata, result, true);
+		                } else {
+		                	// no rows to send: report the update count and complete the future right away
+		                	client.sendUpdateCount(query.sql, stmt.getUpdateCount());
+		                	setEncoding();
+		                	result.getResultsReceiver().receiveResults(1);
+		                }
+	        			result.addCompletionListener(new ResultsFuture.CompletionListener<Integer>() {
+                        	public void onCompletion(ResultsFuture<Integer> future) {
+                        		try {
+                        			// get() is called only to surface a failure of the send
+									future.get();
+                            		doneExecuting();
+								} catch (InterruptedException e) {
+									// presumably unreachable: the listener runs on completion, so get() should not block
+									throw new AssertionError(e);
+								} catch (ExecutionException e) {
+									// report the underlying failure rather than the wrapper
+									errorOccurred(e.getCause());
+								}
+                        	};
+						});
+                    } catch (Throwable e) {
+                        errorOccurred(e);
+                    }
+        		}
+			});
+        } catch (SQLException e) {
+        	errorOccurred(e);
+        }
+	}
+
+	/**
+	 * Serves an execute request whose portal name matches a declared cursor: hands the cursor's
+	 * result set, column metadata and current fetch size to <code>client.sendPortalResults</code>.
+	 * When fewer rows than the fetch size were sent a command completion is sent, otherwise a
+	 * portal-suspended message.
+	 * 
+	 * @param cursor the cursor to read from
+	 */
+	private void sendCursorResults(final Cursor cursor) {
+		ResultsFuture<Integer> result = new ResultsFuture<Integer>();
+		this.client.sendPortalResults(cursor.sql, cursor.rs, cursor.columnMetadata, result, cursor.fetchSize, true);
+		result.addCompletionListener(new ResultsFuture.CompletionListener<Integer>() {
+			public void onCompletion(ResultsFuture<Integer> future) {
+				try {
+					int rowsSent = future.get();
+					// a short batch is treated as the end of the results
+					if (rowsSent < cursor.fetchSize) {
+						client.sendCommandComplete(cursor.sql, 0);
+					}
+					else {
+						// a full batch was sent, so the portal is reported as suspended instead of complete
+						client.sendPortalSuspended();
+					}
+				} catch (InterruptedException e) {
+					// presumably unreachable: the listener runs on completion, so get() should not block
+					throw new AssertionError(e);
+				} catch (ExecutionException e) {
+					// report the underlying failure rather than the wrapper
+					errorOccurred(e.getCause());
+				} catch (IOException e) {
+					errorOccurred(e);
+				}
+			};
+		});
+	}
 	
 	private String fixSQL(String sql) {
 		String modified = modifySQL(sql);
@@ -409,16 +608,6 @@
 			}
 			else if ((m = rollbackPattern.matcher(modified)).matches()) {
 				return "ROLLBACK"; //$NON-NLS-1$
-			}	
-			else if ((m = savepointPattern.matcher(modified)).matches()) {
-				return "SELECT 'SAVEPOINT'"; //$NON-NLS-1$
-			}
-			else if ((m = releasePattern.matcher(modified)).matches()) {
-				return "SELECT 'RELEASE'"; //$NON-NLS-1$
-			}		
-			else if ((m = deallocatePattern.matcher(modified)).matches()) {
-				closePreparedStatement(m.group(1));
-				return "SELECT 'DEALLOCATE'"; //$NON-NLS-1$
 			}					
 		}
 		modified = sql;
@@ -431,7 +620,7 @@
 	@Override
 	public void executeQuery(final String query) {
 		if (beginExecution()) {
-			this.client.errorOccurred("Awaiting asynch result"); //$NON-NLS-1$
+			errorOccurred("Awaiting asynch result"); //$NON-NLS-1$
 			ready();
 			return;
 		}
@@ -482,7 +671,7 @@
 				
 				// followed by a RowDescription message describing the rows that will be returned when the statement  
 				// is eventually executed (or a NoData message if the statement will not return rows).
-				this.client.sendResultSetDescription(query.stmt.getMetaData(), query.stmt);
+				this.client.sendResultSetDescription(query.columnMetadata);
 			} catch (SQLException e) {
 				errorOccurred(e);
 			}
@@ -515,11 +704,7 @@
 			errorOccurred(RuntimePlugin.Util.getString("not_bound", bindName)); //$NON-NLS-1$
 		}
 		else {
-			try {
-				this.client.sendResultSetDescription(query.stmt.getMetaData(), query.stmt);
-			} catch (SQLException e) {
-				errorOccurred(e);
-			}
+			this.client.sendResultSetDescription(query.columnMetadata);
 		}
 	}
 
@@ -622,6 +807,9 @@
 			
 		try {			
 			if (this.connection != null) {
+				if (!this.connection.getAutoCommit()) {
+					this.connection.rollback(false);
+				}
 				this.connection.close();
 			}
 		} catch (SQLException e) {
@@ -663,75 +851,71 @@
 	
     private final class QueryWorkItem implements Runnable {
 		private final ScriptReader reader;
-		String modfiedSQL;
 		String sql;
 
 		private QueryWorkItem(String query) {
-			this.reader = new ScriptReader(new StringReader(query));
+			this.reader = new ScriptReader(new StringReader(query));		
 		}
 
 		@Override
 		public void run() {
 			try {
-				if (modfiedSQL == null) {
+				if (sql == null) {
 					sql = reader.readStatement();
-			        modfiedSQL = fixSQL(sql);
 				}
-		        while (modfiedSQL != null) {
+		        while (sql != null) {
 		            try {
-		            	final StatementImpl stmt = connection.createStatement();
-		                executionFuture = stmt.submitExecute(modfiedSQL);
-		                executionFuture.addCompletionListener(new ResultsFuture.CompletionListener<Boolean>() {
-			        		@Override
-			        		public void onCompletion(ResultsFuture<Boolean> future) {
-			        			executionFuture = null;
-			        			try {
-			        				ResultsFuture<Void> result = null;
-					                if (future.get()) {
-					                	if (stmt.getResultSet() != null) {
-						                	result = new ResultsFuture<Void>();
-				                            client.sendResults(sql, stmt.getResultSet(), result, true);
-					                	}
-					                	else {
-					                		// handles the "SET" commands.
-						                	result = ResultsFuture.NULL_FUTURE;
-						                	client.sendUpdateCount(sql, 0);
-					                	}					                	
-					                } else {
-					                	result = ResultsFuture.NULL_FUTURE;
-					                	client.sendUpdateCount(sql, stmt.getUpdateCount());
-					                	setEncoding();
-					                }
-					                result.addCompletionListener(new ResultsFuture.CompletionListener<Void>() {
-		                            	public void onCompletion(ResultsFuture<Void> future) {
-		                            		try {
-												future.get();
-								                sql = reader.readStatement();
-								                modfiedSQL = fixSQL(sql);
-											} catch (InterruptedException e) {
-												throw new AssertionError(e);
-											} catch (IOException e) {
-												client.errorOccurred(e);
-												return;
-											} catch (ExecutionException e) {
-												client.errorOccurred(e.getCause());
-												return;
-											} finally {
-												try {
-													stmt.close();
-												} catch (SQLException e) {
-													LogManager.logDetail(LogConstants.CTX_ODBC, e, "Error closing statement"); //$NON-NLS-1$
-												}
-											}
-						        			QueryWorkItem.this.run(); //continue processing
-		                            	};
-									});
-			        			} catch (Throwable e) {
-			        				client.errorOccurred(e);
-			        				return;
-			        			}
-			        		}
-						});
+		    			
+		            	ResultsFuture<Integer> results = new ResultsFuture<Integer>();
+		    			results.addCompletionListener(new ResultsFuture.CompletionListener<Integer>() {
+		                	public void onCompletion(ResultsFuture<Integer> future) {
+		                		try {
+		    						future.get();
+		    		                sql = reader.readStatement();
+		    					} catch (InterruptedException e) {
+		    						throw new AssertionError(e);
+		    					} catch (IOException e) {
+		    						client.errorOccurred(e);
+		    						return;
+		    					} catch (ExecutionException e) {
+		    						client.errorOccurred(e.getCause());
+		    						return;
+		    					} finally {
+		    					}
+		            			QueryWorkItem.this.run(); //continue processing
+		                	};
+		    			});	
+		    			
+		            	Matcher m = null;
+		    	        if ((m = cursorSelectPattern.matcher(sql)).matches()){
+		    				cursorExecute(m.group(1), fixSQL(m.group(4)), results);
+		    			}
+		    			else if ((m = fetchPattern.matcher(sql)).matches()){
+		    				cursorFetch(m.group(2), Integer.parseInt(m.group(1)), results);
+		    			}
+		    			else if ((m = movePattern.matcher(sql)).matches()){
+		    				cursorMove(m.group(2), Integer.parseInt(m.group(1)), results);
+		    			}
+		    			else if ((m = closePattern.matcher(sql)).matches()){
+		    				cursorClose(m.group(1));
+		    				results.getResultsReceiver().receiveResults(1);
+		    			}
+		    			else if ((m = savepointPattern.matcher(sql)).matches()) {
+		    				client.sendCommandComplete("SAVEPOINT", 0); //$NON-NLS-1$
+		    				results.getResultsReceiver().receiveResults(1);
+		    			}
+		    			else if ((m = releasePattern.matcher(sql)).matches()) {
+		    				client.sendCommandComplete("RELEASE", 0); //$NON-NLS-1$
+		    				results.getResultsReceiver().receiveResults(1);
+		    			}		
+		    			else if ((m = deallocatePattern.matcher(sql)).matches()) { 
+		    				closePreparedStatement(m.group(1));
+		    				client.sendCommandComplete("DEALLOCATE", 0); //$NON-NLS-1$
+		    				results.getResultsReceiver().receiveResults(1);
+		    			}
+		    			else {
+		    				sqlExecute(sql, results);
+		    			}
 		                return; //wait for the execution to finish
 		            } catch (SQLException e) {
 		                client.errorOccurred(e);
@@ -744,19 +928,52 @@
 			doneExecuting();
 			ready();
 		}
-
 	}
-
+    
+	/**
+	 * Builds the column descriptions sent to the client for a result set.  Name (lower-cased),
+	 * type (mapped through convertType) and display size come from the metadata.  For columns
+	 * that report a schema, the relation oid, attribute number and - when not null - a more
+	 * specific type oid are looked up in matpg_relatt.  Every column of the metadata yields
+	 * exactly one entry, whether or not it reports a schema.
+	 * 
+	 * @param meta metadata of the result set; may be null
+	 * @return one entry per column, in column order; an empty list when meta is null
+	 * @throws SQLException if the metadata cannot be read or the catalog lookup fails
+	 */
+	private List<PgColInfo> getPgColInfo(ResultSetMetaData meta)
+			throws SQLException {
+		if (meta == null) {
+			// NOTE(review): presumably a statement without a result set; confirm that callers
+			// treat an empty description as "no data"
+			return new ArrayList<PgColInfo>(0);
+		}
+		int columns = meta.getColumnCount();
+		final ArrayList<PgColInfo> result = new ArrayList<PgColInfo>(columns);
+		for (int i = 1; i <= columns; i++) {
+			final PgColInfo info = new PgColInfo();
+			String name = meta.getColumnName(i);
+			info.name = name.toLowerCase();
+			info.type = convertType(meta.getColumnType(i));
+			info.precision = meta.getColumnDisplaySize(i);
+			String table = meta.getTableName(i);
+			String schema = meta.getSchemaName(i);
+			if (schema != null) {
+				final PreparedStatementImpl ps = this.connection.prepareStatement("select attrelid, attnum, typoid from matpg_relatt where attname = ? and relname = ? and nspname = ?"); //$NON-NLS-1$
+				try {
+					ps.setString(1, name);
+					ps.setString(2, table);
+					ps.setString(3, schema);	
+					ResultSet rs = ps.executeQuery();
+					if (rs.next()) {
+						info.reloid = rs.getInt(1);
+						info.attnum = rs.getShort(2);
+						int specificType = rs.getInt(3);
+						// keep the converted JDBC type when the catalog has no type oid
+						if (!rs.wasNull()) {
+							info.type = specificType;
+						}
+					}
+				} finally {
+					// one statement is prepared per column; closing it also releases its result set
+					ps.close();
+				}
+			}
+			// columns without a schema must be described as well, otherwise the
+			// description has fewer columns than the rows that follow it
+			result.add(info);
+		}
+		return result;
+	}  
+    
 	/**
      * Represents a PostgreSQL Prepared object.
      */
     static class Prepared {
 
-    	public Prepared (String name, String sql, PreparedStatementImpl stmt, int[] paramType) {
+    	public Prepared (String name, String sql, PreparedStatementImpl stmt, int[] paramType, List<PgColInfo> colMetadata) {
     		this.name = name;
     		this.sql = sql;
     		this.stmt = stmt;
     		this.paramType = paramType;
+    		this.columnMetadata = colMetadata;
     	}
     	
         /**
@@ -778,6 +995,11 @@
          * The list of parameter types (if set).
          */
         int[] paramType;
+        
+        /**
+         * calculated column metadata
+         */
+        List<PgColInfo> columnMetadata;
     }
 
     /**
@@ -785,12 +1007,13 @@
      */
     static class Portal {
 
-    	public Portal(String name, String preparedName, String sql, PreparedStatementImpl stmt, int[] resultColumnformat) {
+    	public Portal(String name, String preparedName, String sql, PreparedStatementImpl stmt, int[] resultColumnformat, List<PgColInfo> colMetadata) {
     		this.name = name;
     		this.preparedName = preparedName;
     		this.sql = sql;
     		this.stmt = stmt;
     		this.resultColumnFormat = resultColumnformat;
+    		this.columnMetadata = colMetadata;
     	}
         /**
          * The portal name.
@@ -814,6 +1037,22 @@
          * The prepared statement.
          */
         PreparedStatementImpl stmt;
+        
+        /**
+         * calculated column metadata
+         */
+        List<PgColInfo> columnMetadata;        
     }
+    
+    /**
+     * Represents a declared cursor: a {@link Prepared} statement together with the result
+     * set that fetch and move requests operate on.
+     */
+    static class Cursor extends Prepared {
+
+        /**
+         * The result set backing the cursor.
+         */
+    	ResultSetImpl rs;
+
+        /**
+         * The fetch size; 1000 until a fetch request assigns another value.
+         */
+    	int fetchSize;
+
+    	public Cursor (String name, String sql, PreparedStatementImpl stmt, int[] paramType, ResultSetImpl rs, List<PgColInfo> colMetadata) {
+    		super(name, sql, stmt, paramType, colMetadata);
+    		this.fetchSize = 1000;
+    		this.rs = rs;
+    	}
+    }    
 
 }

Added: branches/7.4.x/runtime/src/main/java/org/teiid/odbc/PGUtil.java
===================================================================
--- branches/7.4.x/runtime/src/main/java/org/teiid/odbc/PGUtil.java	                        (rev 0)
+++ branches/7.4.x/runtime/src/main/java/org/teiid/odbc/PGUtil.java	2011-06-29 01:10:22 UTC (rev 3290)
@@ -0,0 +1,114 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+package org.teiid.odbc;
+
+import java.sql.Types;
+
+import org.teiid.deployers.PgCatalogMetadataStore;
+
+public class PGUtil {
+	// PostgreSQL type codes (PG_TYPE_*) and the java.sql.Types mapping used by the ODBC transport code.
+	public static final int PG_TYPE_VARCHAR = 1043;
+
+	public static final int PG_TYPE_BOOL = 16;
+	public static final int PG_TYPE_BYTEA = 17;
+	public static final int PG_TYPE_BPCHAR = 1042;
+	public static final int PG_TYPE_INT8 = 20;
+	public static final int PG_TYPE_INT2 = 21;
+	public static final int PG_TYPE_INT4 = 23;
+	public static final int PG_TYPE_TEXT = 25;
+    //private static final int PG_TYPE_OID = 26;
+	public static final int PG_TYPE_FLOAT4 = 700;
+	public static final int PG_TYPE_FLOAT8 = 701;
+	public static final int PG_TYPE_UNKNOWN = 705;
+	// vector/array codes are taken from PgCatalogMetadataStore so that both classes share one definition
+	public static final int PG_TYPE_OIDVECTOR = PgCatalogMetadataStore.PG_TYPE_OIDVECTOR;
+	public static final int PG_TYPE_OIDARRAY = PgCatalogMetadataStore.PG_TYPE_OIDARRAY;
+	public static final int PG_TYPE_CHARARRAY = PgCatalogMetadataStore.PG_TYPE_CHARARRAY;
+	public static final int PG_TYPE_TEXTARRAY = PgCatalogMetadataStore.PG_TYPE_TEXTARRAY;
+	// date/time and numeric codes
+	public static final int PG_TYPE_DATE = 1082;
+	public static final int PG_TYPE_TIME = 1083;
+	public static final int PG_TYPE_TIMESTAMP_NO_TMZONE = 1114;
+	public static final int PG_TYPE_NUMERIC = 1700;
+    //private static final int PG_TYPE_LO = 14939;
+	// Holder for the description of one result column: public mutable fields, no accessors.
+	public static class PgColInfo {
+		public String name;
+		public int reloid;
+		public short attnum;
+		public int type;
+		public int precision;
+	}
+		
+	/**
+	 * Maps a java.sql.Types code to a PG_TYPE_* code. Types.ARRAY is not supported: like any other unlisted code it yields PG_TYPE_UNKNOWN.
+	 */
+	public static int convertType(final int type) {
+        switch (type) {
+        case Types.BIT:
+        case Types.BOOLEAN:
+            return PG_TYPE_BOOL;
+        case Types.VARCHAR:
+            return PG_TYPE_VARCHAR;        
+        case Types.CHAR:
+            return PG_TYPE_BPCHAR;
+        case Types.TINYINT:
+        case Types.SMALLINT:
+        	return PG_TYPE_INT2;
+        case Types.INTEGER:
+            return PG_TYPE_INT4;
+        case Types.BIGINT:
+            return PG_TYPE_INT8;
+        case Types.NUMERIC:
+        case Types.DECIMAL:
+            return PG_TYPE_NUMERIC;
+        case Types.FLOAT:
+        case Types.REAL:
+            return PG_TYPE_FLOAT4;
+        case Types.DOUBLE:
+            return PG_TYPE_FLOAT8;
+        case Types.TIME:
+            return PG_TYPE_TIME;
+        case Types.DATE:
+            return PG_TYPE_DATE;
+        case Types.TIMESTAMP:
+            return PG_TYPE_TIMESTAMP_NO_TMZONE;
+        // every binary type is reported as BYTEA
+        case Types.BLOB:            
+        case Types.BINARY:
+        case Types.VARBINARY:
+        case Types.LONGVARBINARY:
+        	return PG_TYPE_BYTEA;
+        // long character data is reported as TEXT
+        case Types.LONGVARCHAR:
+        case Types.CLOB:            
+        	return PG_TYPE_TEXT;
+        // XML is reported as TEXT as well
+        case Types.SQLXML:        	
+            return PG_TYPE_TEXT;
+        // anything not listed above
+        default:
+            return PG_TYPE_UNKNOWN;
+        }
+	}	
+}


Property changes on: branches/7.4.x/runtime/src/main/java/org/teiid/odbc/PGUtil.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: branches/7.4.x/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java
===================================================================
--- branches/7.4.x/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java	2011-06-29 00:24:02 UTC (rev 3289)
+++ branches/7.4.x/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java	2011-06-29 01:10:22 UTC (rev 3290)
@@ -21,6 +21,26 @@
  * 02110-1301 USA.
  */package org.teiid.transport;
 
+import static org.teiid.odbc.PGUtil.PG_TYPE_BOOL;
+import static org.teiid.odbc.PGUtil.PG_TYPE_BPCHAR;
+import static org.teiid.odbc.PGUtil.PG_TYPE_BYTEA;
+import static org.teiid.odbc.PGUtil.PG_TYPE_CHARARRAY;
+import static org.teiid.odbc.PGUtil.PG_TYPE_DATE;
+import static org.teiid.odbc.PGUtil.PG_TYPE_FLOAT4;
+import static org.teiid.odbc.PGUtil.PG_TYPE_FLOAT8;
+import static org.teiid.odbc.PGUtil.PG_TYPE_INT2;
+import static org.teiid.odbc.PGUtil.PG_TYPE_INT4;
+import static org.teiid.odbc.PGUtil.PG_TYPE_INT8;
+import static org.teiid.odbc.PGUtil.PG_TYPE_NUMERIC;
+import static org.teiid.odbc.PGUtil.PG_TYPE_OIDARRAY;
+import static org.teiid.odbc.PGUtil.PG_TYPE_OIDVECTOR;
+import static org.teiid.odbc.PGUtil.PG_TYPE_TEXT;
+import static org.teiid.odbc.PGUtil.PG_TYPE_TEXTARRAY;
+import static org.teiid.odbc.PGUtil.PG_TYPE_TIME;
+import static org.teiid.odbc.PGUtil.PG_TYPE_TIMESTAMP_NO_TMZONE;
+import static org.teiid.odbc.PGUtil.PG_TYPE_UNKNOWN;
+import static org.teiid.odbc.PGUtil.PG_TYPE_VARCHAR;
+
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -32,13 +52,9 @@
 import java.sql.Blob;
 import java.sql.Clob;
 import java.sql.ParameterMetaData;
-import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.sql.Types;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
@@ -58,16 +74,15 @@
 import org.teiid.core.util.ObjectConverterUtil;
 import org.teiid.core.util.ReaderInputStream;
 import org.teiid.core.util.ReflectionHelper;
-import org.teiid.deployers.PgCatalogMetadataStore;
 import org.teiid.jdbc.ResultSetImpl;
 import org.teiid.jdbc.TeiidSQLException;
 import org.teiid.logging.LogConstants;
 import org.teiid.logging.LogManager;
 import org.teiid.net.socket.ServiceInvocationStruct;
 import org.teiid.odbc.ODBCClientRemote;
+import org.teiid.odbc.PGUtil.PgColInfo;
 import org.teiid.runtime.RuntimePlugin;
 import org.teiid.transport.pg.PGbytea;
-
 /**
  * Represents the messages going from Server --> PG ODBC Client  
  * Some parts of this code is taken from H2's implementation of ODBC
@@ -92,19 +107,24 @@
 			}
 		}
 	}
-
+	
+    // Capacity in bytes of each row buffer; 307200 (300k) unless the "ODBCPacketSize" system property overrides it. A non-numeric value makes Integer.parseInt throw.
+	static int ODBC_SOCKET_BUFF_SIZE = Integer.parseInt(System.getProperty("ODBCPacketSize", "307200"));
+	
 	private final class ResultsWorkItem implements Runnable {
 		private final List<PgColInfo> cols;
-		private final String sql;
 		private final ResultSetImpl rs;
-		private final ResultsFuture<Void> result;
+		private final ResultsFuture<Integer> result;
+		private int rows2Send;
+		private int rowsSent = 0;
+		private int rowsInBuffer = 0;
+		private ChannelBuffer buffer = ChannelBuffers.directBuffer(ODBC_SOCKET_BUFF_SIZE);
 
-		private ResultsWorkItem(List<PgColInfo> cols, String sql,
-				ResultSetImpl rs, ResultsFuture<Void> result) {
+		private ResultsWorkItem(List<PgColInfo> cols, ResultSetImpl rs, ResultsFuture<Integer> result, int rows2Send) {
 			this.cols = cols;
-			this.sql = sql;
 			this.rs = rs;
 			this.result = result;
+			this.rows2Send = rows2Send;
 		}
 
 		@Override
@@ -117,8 +137,10 @@
 				    		@Override
 				    		public void onCompletion(ResultsFuture<Boolean> future) {
 				    			if (processRow(future)) {
-				    				//this can be recursive, but ideally won't be called many times 
-				    				ResultsWorkItem.this.run();
+				    				if (rowsSent != rows2Send) {
+				    					//this can be recursive, but ideally won't be called many times 
+				    					ResultsWorkItem.this.run();
+				    				}
 				    			}
 				    		}
 						});
@@ -138,10 +160,18 @@
 			boolean processNext = true;
 			try {
     			if (future.get()) {
-    				sendDataRow(rs, cols);
+    				sendDataRow(rs, cols, buffer);
+    				rowsSent++;
+    				rowsInBuffer++;
+    				boolean done = rowsSent == rows2Send;
+    				flushResults(done);
+    				processNext = !done;
+    				if (done) {
+    					result.getResultsReceiver().receiveResults(rowsSent);
+    				}
     			} else {
-    				sendCommandComplete(sql, 0);
-    				result.getResultsReceiver().receiveResults(null);
+    				sendContents(buffer);
+    				result.getResultsReceiver().receiveResults(rowsSent);
     				processNext = false;
     			}
 			} catch (Throwable t) {
@@ -150,33 +180,17 @@
 			}
 			return processNext;
 		}
+		
+		private void flushResults(boolean force) { // sends the buffered rows when forced or when the buffer is nearly full
+			int avgRowsize = buffer.readableBytes()/rowsInBuffer; // processRow increments rowsInBuffer before calling, so it is at least 1 here
+			if (force || buffer.writableBytes() < (avgRowsize*2)) { // flush once fewer than two average-sized rows still fit
+				sendContents(buffer);
+				buffer= ChannelBuffers.directBuffer(ODBC_SOCKET_BUFF_SIZE); // continue with a fresh buffer; the sent one is not reused
+				rowsInBuffer = 0;
+			}			
+		}
 	}
-
-	private static final int PG_TYPE_VARCHAR = 1043;
-
-    private static final int PG_TYPE_BOOL = 16;
-    private static final int PG_TYPE_BYTEA = 17;
-    private static final int PG_TYPE_BPCHAR = 1042;
-    private static final int PG_TYPE_INT8 = 20;
-    private static final int PG_TYPE_INT2 = 21;
-    private static final int PG_TYPE_INT4 = 23;
-    private static final int PG_TYPE_TEXT = 25;
-    //private static final int PG_TYPE_OID = 26;
-    private static final int PG_TYPE_FLOAT4 = 700;
-    private static final int PG_TYPE_FLOAT8 = 701;
-    private static final int PG_TYPE_UNKNOWN = 705;
     
-    private static final int PG_TYPE_OIDVECTOR = PgCatalogMetadataStore.PG_TYPE_OIDVECTOR;
-    private static final int PG_TYPE_OIDARRAY = PgCatalogMetadataStore.PG_TYPE_OIDARRAY;
-    private static final int PG_TYPE_CHARARRAY = PgCatalogMetadataStore.PG_TYPE_CHARARRAY;
-    private static final int PG_TYPE_TEXTARRAY = PgCatalogMetadataStore.PG_TYPE_TEXTARRAY;
-    
-    private static final int PG_TYPE_DATE = 1082;
-    private static final int PG_TYPE_TIME = 1083;
-    private static final int PG_TYPE_TIMESTAMP_NO_TMZONE = 1114;
-    private static final int PG_TYPE_NUMERIC = 1700;
-    //private static final int PG_TYPE_LO = 14939;
-    
     private DataOutputStream dataOut;
     private ByteArrayOutputStream outBuffer;
     private char messageType;
@@ -191,7 +205,7 @@
 
 	private SSLConfiguration config;
 
-    public PgBackendProtocol(int maxLobSize, SSLConfiguration config) {
+	public PgBackendProtocol(int maxLobSize, SSLConfiguration config) {
     	this.maxLobSize = maxLobSize;
     	this.config = config;
     }
@@ -340,35 +354,65 @@
 	}
 
 	@Override
-	public void sendResultSetDescription(ResultSetMetaData metaData, Statement stmt) {
+	public void sendResultSetDescription(List<PgColInfo> cols) {
 		try {
+			sendRowDescription(cols);
+		} catch (IOException e) {
+			terminate(e);
+		}
+	}
+	
+	@Override
+	public void sendCursorResults(ResultSetImpl rs, List<PgColInfo> cols, ResultsFuture<Integer> result, int rowCount) {
+		try { // the row description is always written before any row
+        	sendRowDescription(cols);
+        	// the work item stops after rowCount rows or at the end of rs and reports the rows sent through result
+        	ResultsWorkItem r = new ResultsWorkItem(cols, rs, result, rowCount);
+        	r.run();  	        					
+		} catch (IOException e) {
+			terminate(e);
+		}
+	}
+	
+	@Override
+	public void sendPortalResults(String sql, ResultSetImpl rs, List<PgColInfo> cols, ResultsFuture<Integer> result, int rowCount, boolean portal) {
+    	ResultsWorkItem r = new ResultsWorkItem(cols, rs, result, rowCount); // sql and portal are unused here; no row description is written
+    	r.run();	        	
+	}
+	
+	@Override
+	public void sendMoveCursor(ResultSetImpl rs, int rowCount, ResultsFuture<Integer> results) {
+		try {
 			try {
-        		List<PgColInfo> cols = getPgColInfo(metaData, stmt);
-				sendRowDescription(cols);
+				int rowsMoved = 0;
+				for (int i = 0; i < rowCount; i++) {
+					if (!rs.next()) {
+						break;
+					}
+					rowsMoved++;
+				}				
+				results.getResultsReceiver().receiveResults(rowsMoved);
 			} catch (SQLException e) {
-				sendErrorResponse(e);				
-			}			
+				sendErrorResponse(e);
+			}
 		} catch (IOException e) {
 			terminate(e);
 		}
-	}
+	}		
 	
 	@Override
-	public void sendResults(final String sql, final ResultSetImpl rs, ResultsFuture<Void> result, boolean describeRows) {
+	public void sendResults(final String sql, final ResultSetImpl rs, List<PgColInfo> cols, ResultsFuture<Integer> result, boolean describeRows) {
 		try {
 			if (nextFuture != null) {
 				sendErrorResponse(new IllegalStateException("Pending results have not been sent")); //$NON-NLS-1$
 			}
         	
-			ResultSetMetaData meta = rs.getMetaData();
-    		List<PgColInfo> cols = getPgColInfo(meta, rs.getStatement());
         	if (describeRows) {
         		sendRowDescription(cols);
         	}
-        	Runnable r = new ResultsWorkItem(cols, sql, rs, result);
+        	ResultsWorkItem r = new ResultsWorkItem(cols, rs, result, -1);
         	r.run();    
-		} catch (SQLException e) {
-			result.getResultsReceiver().exceptionOccurred(e);
+        	sendCommandComplete(sql, 0);
 		} catch (IOException e) {
 			terminate(e);
 		}
@@ -425,8 +469,9 @@
 		startMessage('I');
 		sendMessage();
 	}
-		
-	private void sendCommandComplete(String sql, int updateCount) throws IOException {
+
+	@Override
+	public void sendCommandComplete(String sql, int updateCount) throws IOException {
 		startMessage('C');
 		sql = sql.trim().toUpperCase();
 		// TODO remove remarks at the beginning
@@ -439,7 +484,7 @@
 			tag = "UPDATE " + updateCount;
 		} else if (sql.startsWith("SELECT") || sql.startsWith("CALL")) {
 			tag = "SELECT";
-		} else if (sql.startsWith("BEGIN")) {
+		} else if (sql.startsWith("BEGIN") || sql.startsWith("START TRANSACTION")) {
 			tag = "BEGIN";
 		} else if (sql.startsWith("COMMIT")) {
 			tag = "COMMIT";
@@ -447,15 +492,23 @@
 			tag = "ROLLBACK";
 		} else if (sql.startsWith("SET ")) {
 			tag = "SET";
-		}else {
-			trace("Check command tag:", sql);
-			tag = "UPDATE " + updateCount;
+		}  else if (sql.startsWith("DECLARE CURSOR")) {
+			tag = "DECLARE CURSOR";
+		} else if (sql.startsWith("CLOSE CURSOR")) {
+			tag = "CLOSE CURSOR";
+		} else if (sql.startsWith("FETCH")) {
+			tag = "FETCH "+ updateCount;
+		}  else if (sql.startsWith("MOVE")) {
+			tag = "MOVE "+ updateCount;
 		}
+		else {
+			tag = sql;
+		}
 		writeString(tag);
 		sendMessage();
 	}
 
-	private void sendDataRow(ResultSet rs, List<PgColInfo> cols) throws SQLException, IOException {
+	private void sendDataRow(ResultSet rs, List<PgColInfo> cols, ChannelBuffer buffer) throws SQLException, IOException {
 		startMessage('D');
 		writeShort(cols.size());
 		for (int i = 0; i < cols.size(); i++) {
@@ -467,9 +520,18 @@
 				write(bytes);
 			}
 		}
-		sendMessage();
+		
+		byte[] buff = outBuffer.toByteArray();
+		int len = buff.length;
+		this.outBuffer = null;
+		this.dataOut = null;
+		
+		// now build the wire contents.
+		buffer.writeByte((byte)this.messageType);
+		buffer.writeInt(len+4);
+		buffer.writeBytes(buff);
 	}
-
+	
 	private byte[] getContent(ResultSet rs, PgColInfo col, int column) throws SQLException, TeiidSQLException, IOException {
 		byte[] bytes = null;
 		switch (col.type) {
@@ -612,20 +674,7 @@
 		write(0);
 		sendMessage();
 	}
-
-	private void sendNoData() {
-		startMessage('n');
-		sendMessage();
-	}
 	
-	private static class PgColInfo {
-		String name;
-		int reloid;
-		short attnum;
-		int type;
-		int precision;
-	}
-
 	private void sendRowDescription(List<PgColInfo> cols) throws IOException {
 		startMessage('T');
 		writeShort(cols.size());
@@ -647,46 +696,6 @@
 		sendMessage();
 	}
 
-	private List<PgColInfo> getPgColInfo(ResultSetMetaData meta, Statement stmt)
-			throws SQLException {
-		int columns = meta.getColumnCount();
-		ArrayList<PgColInfo> result = new ArrayList<PgColInfo>(columns);
-		for (int i = 1; i < columns + 1; i++) {
-	 		PgColInfo info = new PgColInfo();
-			info.name = meta.getColumnName(i).toLowerCase();
-			info.type = meta.getColumnType(i);
-			info.type = convertType(info.type);
-			info.precision = meta.getColumnDisplaySize(i);
-			String name = meta.getColumnName(i);
-			String table = meta.getTableName(i);
-			String schema = meta.getSchemaName(i);
-			if (schema != null) {
-				PreparedStatement ps = null;
-				try {
-					ps = stmt.getConnection().prepareStatement("select attrelid, attnum, typoid from matpg_relatt where attname = ? and relname = ? and nspname = ?");
-					ps.setString(1, name);
-					ps.setString(2, table);
-					ps.setString(3, schema);
-					ResultSet rs = ps.executeQuery();
-					if (rs.next()) {
-						info.reloid = rs.getInt(1);
-						info.attnum = rs.getShort(2);
-						int specificType = rs.getInt(3);
-						if (!rs.wasNull()) {
-							info.type = specificType;
-						}
-					}
-				} finally {
-					if (ps != null) {
-						ps.close();
-					}
-				}
-			}
-			result.add(info);
-		}
-		return result;
-	}
-
 	private int getTypeSize(int pgType, int precision) {
 		switch (pgType) {
 		case PG_TYPE_VARCHAR:
@@ -728,6 +737,12 @@
 		startMessage('2');
 		sendMessage();
 	}
+	
+	@Override
+	public void sendPortalSuspended() {
+		startMessage('s'); // message of type 's' with no body fields
+		sendMessage();
+	}	
 
 	private void sendAuthenticationCleartextPassword() throws IOException {
 		startMessage('R');
@@ -844,6 +859,10 @@
 		buffer.writeBytes(buff);
 		Channels.write(this.ctx, this.message.getFuture(), buffer, this.message.getRemoteAddress());
 	}
+	
+	private void sendContents(ChannelBuffer buffer) { // writes the buffer to the channel as-is; sendDataRow has already framed each row in it
+		Channels.write(this.ctx, this.message.getFuture(), buffer, this.message.getRemoteAddress());
+	}
 
 	private static void trace(String... msg) {
 		LogManager.logTrace(LogConstants.CTX_ODBC, (Object[])msg);

Modified: branches/7.4.x/runtime/src/main/resources/org/teiid/runtime/i18n.properties
===================================================================
--- branches/7.4.x/runtime/src/main/resources/org/teiid/runtime/i18n.properties	2011-06-29 00:24:02 UTC (rev 3289)
+++ branches/7.4.x/runtime/src/main/resources/org/teiid/runtime/i18n.properties	2011-06-29 01:10:22 UTC (rev 3290)
@@ -92,5 +92,6 @@
 ambigious_name=Ambiguous VDB name specified. Only single occurrence of the "." is allowed in the VDB name. Also, when version based vdb name is specified, then a separate "version" connection option is not allowed:{0}.{1} 
 lo_not_supported=LO functions are not supported
 SSLConfiguration.no_anonymous=The anonymous cipher suite TLS_DH_anon_WITH_AES_128_CBC_SHA is not available.  Please change the transport to be non-SSL or use non-anonymous SSL.
+execution_failed=Cursor execution failed
 
 PgBackendProtocol.ssl_error=Could not initialize ODBC SSL.  non-SSL connections will still be allowed.
\ No newline at end of file

Modified: branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestODBCSocketTransport.java
===================================================================
--- branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestODBCSocketTransport.java	2011-06-29 00:24:02 UTC (rev 3289)
+++ branches/7.4.x/test-integration/common/src/test/java/org/teiid/transport/TestODBCSocketTransport.java	2011-06-29 01:10:22 UTC (rev 3290)
@@ -255,6 +255,13 @@
 		assertEquals("oid", rs.getArray("proargtypes").getBaseTypeName());
 	}
 	
+	// Not annotated with @Test, so it is not run: this does not work as JDBC always sends the queries in prepared form.
+	public void testPgDeclareCursor() throws Exception {
+		Statement stmt = conn.createStatement();
+		ResultSet rs = stmt.executeQuery("begin;declare \"foo\" cursor for select * from pg_proc;fetch 10 in \"foo\"; close \"foo\"");
+		rs.next();		
+	}	
+	
 	@Test public void testPgProcedure() throws Exception {
 		Statement stmt = conn.createStatement();
 		ResultSet rs = stmt.executeQuery("select has_function_privilege(100, 'foo')");



More information about the teiid-commits mailing list