[teiid-commits] teiid SVN: r3269 - in branches/7.1.1.CP3/runtime/src/main/java/org/teiid: transport and 1 other directory.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Thu Jun 23 15:56:53 EDT 2011


Author: rareddy
Date: 2011-06-23 15:56:53 -0400 (Thu, 23 Jun 2011)
New Revision: 3269

Modified:
   branches/7.1.1.CP3/runtime/src/main/java/org/teiid/odbc/ODBCClientRemote.java
   branches/7.1.1.CP3/runtime/src/main/java/org/teiid/odbc/ODBCServerRemote.java
   branches/7.1.1.CP3/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java
   branches/7.1.1.CP3/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java
Log:
TEIID-1652, TEIID-1653

Modified: branches/7.1.1.CP3/runtime/src/main/java/org/teiid/odbc/ODBCClientRemote.java
===================================================================
--- branches/7.1.1.CP3/runtime/src/main/java/org/teiid/odbc/ODBCClientRemote.java	2011-06-23 19:02:26 UTC (rev 3268)
+++ branches/7.1.1.CP3/runtime/src/main/java/org/teiid/odbc/ODBCClientRemote.java	2011-06-23 19:56:53 UTC (rev 3269)
@@ -21,6 +21,7 @@
  */
 package org.teiid.odbc;
 
+import java.io.IOException;
 import java.nio.charset.Charset;
 import java.sql.ParameterMetaData;
 import java.sql.ResultSet;
@@ -65,7 +66,16 @@
 	
 	//	DataRow (B)
 	//	CommandComplete (B)
+	// if count = -1 send all
 	void sendResults(String sql, ResultSet rs, boolean describeRows);
+	
+	void sendCursorResults(ResultSet rs, int rowCount);
+	
+	void sendPortalResults(String sql, ResultSet rs, int rowCount, boolean portal);
+	
+	void sendMoveCursor(ResultSet rs, int rowCount);
+	
+	void sendCommandComplete(String sql, int updateCount) throws IOException;
 
 	//	CommandComplete (B)
 	void sendUpdateCount(String sql, int updateCount);
@@ -100,9 +110,4 @@
 	
 	//	NoticeResponse (B)
 	//	NotificationResponse (B)
-	
-	//	PortalSuspended (B)
-	
-		
-
 }

Modified: branches/7.1.1.CP3/runtime/src/main/java/org/teiid/odbc/ODBCServerRemote.java
===================================================================
--- branches/7.1.1.CP3/runtime/src/main/java/org/teiid/odbc/ODBCServerRemote.java	2011-06-23 19:02:26 UTC (rev 3268)
+++ branches/7.1.1.CP3/runtime/src/main/java/org/teiid/odbc/ODBCServerRemote.java	2011-06-23 19:56:53 UTC (rev 3269)
@@ -55,6 +55,11 @@
 	void unsupportedOperation(String msg);
 
 	void flush();
+	
+	void cursorExecute(String prepareName, String sql);
+	void cursorFetch(String prepareName, int rows);
+	void cursorMove(String prepareName, int rows);
+	void cursorClose(String prepareName);
 
 	//  unimplemented frontend messages
 	//	CopyData (F & B)

Modified: branches/7.1.1.CP3/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java
===================================================================
--- branches/7.1.1.CP3/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java	2011-06-23 19:02:26 UTC (rev 3268)
+++ branches/7.1.1.CP3/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java	2011-06-23 19:56:53 UTC (rev 3269)
@@ -137,11 +137,16 @@
 	private static Pattern preparedAutoIncrement = Pattern.compile("select 1 \\s*from pg_catalog.pg_attrdef \\s*where adrelid = \\$1 AND adnum = \\$2 " + //$NON-NLS-1$
 			"\\s*and pg_catalog.pg_get_expr\\(adbin, adrelid\\) \\s*like '%nextval\\(%'", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
 	
-	private static Pattern deallocatePattern = Pattern.compile("DEALLOCATE \"(\\w+\\d+_*)\""); //$NON-NLS-1$
-	private static Pattern releasePattern = Pattern.compile("RELEASE (\\w+\\d+_*)"); //$NON-NLS-1$
-	private static Pattern savepointPattern = Pattern.compile("SAVEPOINT (\\w+\\d+_*)"); //$NON-NLS-1$
-	private static Pattern rollbackPattern = Pattern.compile("ROLLBACK\\s*(to)*\\s*(\\w+\\d+_*)*"); //$NON-NLS-1$
+	private static Pattern cursorSelectPattern = Pattern.compile("DECLARE \"(\\w+)\" CURSOR(\\s(WITH HOLD|SCROLL))? FOR (.*)", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+	private static Pattern fetchPattern = Pattern.compile("FETCH (\\d+) IN \"(\\w+)\".*", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+	private static Pattern movePattern = Pattern.compile("MOVE (\\d+) IN \"(\\w+)\".*", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+	private static Pattern closePattern = Pattern.compile("CLOSE \"(\\w+)\"", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
 	
+	private static Pattern deallocatePattern = Pattern.compile("DEALLOCATE \"(\\w+\\d+_*)\"", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+	private static Pattern releasePattern = Pattern.compile("RELEASE (\\w+\\d?_*)", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+	private static Pattern savepointPattern = Pattern.compile("SAVEPOINT (\\w+\\d?_*)", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+	private static Pattern rollbackPattern = Pattern.compile("ROLLBACK\\s*(to)*\\s*(\\w+\\d+_*)*", Pattern.CASE_INSENSITIVE); //$NON-NLS-1$
+	
 	private ODBCClientRemote client;
 	private Properties props;
 	private AuthenticationType authType;
@@ -150,6 +155,7 @@
 	// TODO: this is unbounded map; need to define some boundaries as to how many stmts each session can have
 	private Map<String, Prepared> preparedMap = Collections.synchronizedMap(new HashMap<String, Prepared>());
 	private Map<String, Portal> portalMap = Collections.synchronizedMap(new HashMap<String, Portal>());
+	private Map<String, Cursor> cursorMap = Collections.synchronizedMap(new HashMap<String, Cursor>());
 	
 	public ODBCServerRemoteImpl(ODBCClientRemote client, AuthenticationType authType) {
 		this.client = client;
@@ -189,6 +195,76 @@
 	}	
 	
 	@Override
+	public void cursorExecute(String cursorName, String sql) {
+		if (this.connection != null) {
+			if (sql != null) {
+				String modfiedSQL = sql.replaceAll("\\$\\d+", "?");//$NON-NLS-1$ //$NON-NLS-2$
+				try {
+					// close if the name is already used or the unnamed prepare; otherwise
+					// stmt is alive until session ends.
+					Prepared previous = this.preparedMap.remove(cursorName);
+					if (previous != null) {
+						previous.stmt.close();
+					}
+					
+					PreparedStatement stmt = this.connection.prepareStatement(modfiedSQL);
+					boolean hasResults = stmt.execute();
+					this.cursorMap.put(cursorName, new Cursor(cursorName, sql, stmt, null, hasResults?stmt.getResultSet():null));
+					this.client.sendCommandComplete("DECLARE CURSOR", 0); //$NON-NLS-1$
+				} catch (SQLException e) {
+					this.client.errorOccurred(e);
+				} catch (IOException e) {
+					this.client.errorOccurred(e);
+				}
+			}
+		}
+		else {
+			this.client.errorOccurred(RuntimePlugin.Util.getString("no_active_connection")); //$NON-NLS-1$
+		}
+		
+	}
+	@Override
+	public void cursorFetch(String cursorName, int rows) {
+		Cursor cursor = this.cursorMap.get(cursorName);
+		if (cursor != null) {
+			cursor.fetchSize = rows;
+			this.client.sendCursorResults(cursor.rs, rows);
+		}
+		else {
+			this.client.errorOccurred(RuntimePlugin.Util.getString("not_bound", cursorName)); //$NON-NLS-1$
+			return;
+		}
+	}
+	
+	@Override
+	public void cursorMove(String prepareName, int rows) {
+		Cursor cursor = this.cursorMap.get(prepareName);
+		if (cursor != null) {
+			this.client.sendMoveCursor(cursor.rs, rows);
+		}
+		else {
+			this.client.errorOccurred(RuntimePlugin.Util.getString("not_bound", prepareName)); //$NON-NLS-1$
+			return;
+		}
+	}	
+	
+	@Override
+	public void cursorClose(String prepareName) {
+		Cursor cursor = this.cursorMap.remove(prepareName);
+		if (cursor != null) {
+			try {
+				cursor.rs.close();
+				cursor.stmt.close();
+				this.client.sendCommandComplete("CLOSE CURSOR", 0); //$NON-NLS-1$
+			} catch (SQLException e) {
+				this.client.errorOccurred(e);
+			} catch (IOException e) {
+				this.client.errorOccurred(e);
+			}
+		}
+	}
+	
+	@Override
 	public void prepare(String prepareName, String sql, int[] paramType) {
 		if (this.connection != null) {
 			
@@ -263,43 +339,50 @@
 			bindName  = UNNAMED;
 		}		
 		
-		Portal query = this.portalMap.get(bindName);
-		if (query == null) {
-			this.client.errorOccurred(RuntimePlugin.Util.getString("not_bound", bindName)); //$NON-NLS-1$
-			sync();
-		}				
+		Cursor cursor = this.cursorMap.get(bindName);
+		if (cursor != null) {
+			this.client.sendPortalResults(cursor.sql, cursor.rs, cursor.fetchSize, true);
+		}
 		else {
-			if (query.sql.trim().isEmpty()) {
-				this.client.emptyQueryReceived();
-				return;
+			Portal query = this.portalMap.get(bindName);
+			if (query == null) {
+				this.client.errorOccurred(RuntimePlugin.Util.getString("not_bound", bindName)); //$NON-NLS-1$
+				sync();
+			}				
+			else {
+				if (query.sql.trim().isEmpty()) {
+					this.client.emptyQueryReceived();
+					return;
+				}
+				
+	            PreparedStatement stmt = query.stmt;
+	            try {
+	            	// maxRows = 0, means unlimited.
+	            	if (maxRows != 0) {
+	            		stmt.setMaxRows(maxRows);
+	            	}
+	            	
+	                boolean result = stmt.execute();
+	                if (result) {
+	                    try {
+	                        ResultSet rs = stmt.getResultSet();
+	                        this.client.sendResults(query.sql, rs, true);
+	                    } catch (SQLException e) {
+	                        this.client.errorOccurred(e);
+	                    }
+	                } else {
+	                	this.client.sendUpdateCount(query.sql, stmt.getUpdateCount());
+	                }
+	            } catch (SQLException e) {
+	            	this.client.errorOccurred(e);
+	            }			
 			}
-			
-            PreparedStatement stmt = query.stmt;
-            try {
-            	// maxRows = 0, means unlimited.
-            	if (maxRows != 0) {
-            		stmt.setMaxRows(maxRows);
-            	}
-            	
-                boolean result = stmt.execute();
-                if (result) {
-                    try {
-                        ResultSet rs = stmt.getResultSet();
-                        this.client.sendResults(query.sql, rs, true);
-                    } catch (SQLException e) {
-                        this.client.errorOccurred(e);
-                    }
-                } else {
-                	this.client.sendUpdateCount(query.sql, stmt.getUpdateCount());
-                }
-            } catch (SQLException e) {
-            	this.client.errorOccurred(e);
-            }			
 		}
 	}
 	
 	private String fixSQL(String sql) {
 		String modified = sql;
+		Matcher m = null;
 		// select current_schema()
 		// set client_encoding to 'WIN1252'
 		if (sql != null) {
@@ -307,8 +390,7 @@
 			String sqlLower = sql.toLowerCase();
 			if (sqlLower.startsWith("select")) { //$NON-NLS-1$
 				modified = sql.replace('\n', ' ');
-											
-				Matcher m = null;
+															
 				if ((m = pkPattern.matcher(modified)).matches()) {
 					modified = new StringBuffer("SELECT k.Name AS attname, convert(Position, short) AS attnum, TableName AS relname, SchemaName AS nspname, TableName AS relname") //$NON-NLS-1$
 				          .append(" FROM SYS.KeyColumns k") //$NON-NLS-1$ 
@@ -367,7 +449,7 @@
 				modified = "select 63"; //$NON-NLS-1$
 			}
 			else {
-				Matcher m = setPattern.matcher(sql);
+				m = setPattern.matcher(sql);
 				if (m.matches()) {
 					if (m.group(2).equalsIgnoreCase("client_encoding")) { //$NON-NLS-1$
 						this.client.setEncoding(PGCharsetConverter.getCharset(m.group(4)));
@@ -383,16 +465,6 @@
 				else if ((m = rollbackPattern.matcher(modified)).matches()) {
 					modified = "ROLLBACK"; //$NON-NLS-1$
 				}	
-				else if ((m = savepointPattern.matcher(modified)).matches()) {
-					modified = "SELECT 'SAVEPOINT'"; //$NON-NLS-1$
-				}
-				else if ((m = releasePattern.matcher(modified)).matches()) {
-					modified = "SELECT 'RELEASE'"; //$NON-NLS-1$
-				}		
-				else if ((m = deallocatePattern.matcher(modified)).matches()) {
-					closePreparedStatement(m.group(1));
-					modified = "SELECT 'DEALLOCATE'"; //$NON-NLS-1$
-				}					
 			}
 			if (modified != null && !modified.equalsIgnoreCase(sql)) {
 				LogManager.logDetail(LogConstants.CTX_ODBC, "Modified Query:"+modified); //$NON-NLS-1$
@@ -417,39 +489,71 @@
 		try {			
 	        ScriptReader reader = new ScriptReader(new StringReader(query));
 	        String sql =  reader.readStatement();
-	        String s = fixSQL(sql);
-	        while (s != null) {
-	            Statement stmt = null;
-	            try {
-	                stmt = this.connection.createStatement();
-	                boolean result = stmt.execute(s);
-	                if (result) {
-	                	this.client.sendResults(sql, stmt.getResultSet(), true);
-	                } else {
-	                	this.client.sendUpdateCount(sql, stmt.getUpdateCount());
-	                }
-	                sql = reader.readStatement();
-	                s = fixSQL(sql);
-	            } catch (SQLException e) {
-	                this.client.errorOccurred(e);
-	                break;
-	            } finally {
-	                try {
-	                	if (stmt != null) {
-	                		stmt.close();
-	                	}
-					} catch (SQLException e) {
-						this.client.errorOccurred(e);
+	        while (sql != null) {
+		        Matcher m = null;
+		        if ((m = cursorSelectPattern.matcher(sql)).matches()){
+					cursorExecute(m.group(1), fixSQL(m.group(4)));
+				}
+				else if ((m = fetchPattern.matcher(sql)).matches()){
+					cursorFetch(m.group(2), Integer.parseInt(m.group(1)));
+				}
+				else if ((m = movePattern.matcher(sql)).matches()){
+					cursorMove(m.group(2), Integer.parseInt(m.group(1)));
+				}
+				else if ((m = closePattern.matcher(sql)).matches()){
+					cursorClose(m.group(1));
+				}
+				else if ((m = savepointPattern.matcher(sql)).matches()) {
+					this.client.sendCommandComplete("SAVEPOINT", 0); //$NON-NLS-1$
+				}
+				else if ((m = releasePattern.matcher(sql)).matches()) {
+					this.client.sendCommandComplete("RELEASE", 0); //$NON-NLS-1$
+				}		
+				else if ((m = deallocatePattern.matcher(sql)).matches()) { 
+					closePreparedStatement(m.group(1));
+					this.client.sendCommandComplete("DEALLOCATE", 0); //$NON-NLS-1$
+				}					
+		        
+				else {
+					if (!executeAndSend(fixSQL(sql))) {
 						break;
 					}
-	            }
+				}
+	            sql = reader.readStatement();
 	        }
+	        sync();
 		} catch(IOException e) {
 			this.client.errorOccurred(e);
 		}
-		sync();
 	}
 
+	private boolean executeAndSend(String sql) {
+		boolean sucess = true;
+        Statement stmt = null;
+        try {
+            stmt = this.connection.createStatement();
+            boolean result = stmt.execute(sql);
+            if (result) {
+            	this.client.sendResults(sql, stmt.getResultSet(), true);
+            } else {
+            	this.client.sendUpdateCount(sql, stmt.getUpdateCount());
+            }
+        } catch (SQLException e) {
+            this.client.errorOccurred(e);
+            sucess = false;
+        } finally {
+            try {
+            	if (stmt != null) {
+            		stmt.close();
+            	}
+			} catch (SQLException e) {
+				this.client.errorOccurred(e);
+				sucess = false;
+			}
+        }
+        return sucess;
+	}
+
 	@Override
 	public void getParameterDescription(String prepareName) {
 		if (prepareName == null || prepareName.length() == 0) {
@@ -625,6 +729,16 @@
          */
         int[] paramType;
     }
+    
+    static class Cursor extends Prepared {
+    	ResultSet rs;
+    	int fetchSize = 1000;
+    	
+    	public Cursor (String name, String sql, PreparedStatement stmt, int[] paramType, ResultSet rs) {
+    		super(name, sql, stmt, paramType);
+    		this.rs = rs;
+    	}
+    }
 
     /**
      * Represents a PostgreSQL Portal object.

Modified: branches/7.1.1.CP3/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java
===================================================================
--- branches/7.1.1.CP3/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java	2011-06-23 19:02:26 UTC (rev 3268)
+++ branches/7.1.1.CP3/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java	2011-06-23 19:56:53 UTC (rev 3269)
@@ -241,6 +241,61 @@
 	}
 
 	@Override
+	public void sendCursorResults(ResultSet rs, int rowCount) {
+		try {
+			try {
+        		ResultSetMetaData meta = rs.getMetaData();
+        		sendRowDescription(meta, rs.getStatement());
+            	int rowsSent = sendDataRows(rs, rowCount);
+				sendCommandComplete("FETCH", rowsSent);
+			} catch (SQLException e) {
+				sendErrorResponse(e);
+			}
+		} catch (IOException e) {
+			terminate(e);
+		}
+	}
+	
+	@Override
+	public void sendPortalResults(String sql, ResultSet rs, int rowCount, boolean portal) {
+		try {
+			try {
+				int rowsSent = sendDataRows(rs, rowCount);
+				if (rowsSent < rowCount) {
+					sendCommandComplete(sql, 0);
+				}
+				else {
+					sendPortalSuspended();
+				}
+			} catch (SQLException e) {
+				sendErrorResponse(e);
+			}
+		} catch (IOException e) {
+			terminate(e);
+		} 
+	}
+	
+	@Override
+	public void sendMoveCursor(ResultSet rs, int rowCount) {
+		try {
+			try {
+				int rowsMoved = 0;
+				for (int i = 0; i < rowCount; i++) {
+					if (!rs.next()) {
+						break;
+					}
+					rowsMoved++;
+				}
+				sendCommandComplete("MOVE", rowsMoved);
+			} catch (SQLException e) {
+				sendErrorResponse(e);
+			}
+		} catch (IOException e) {
+			terminate(e);
+		}
+	}	
+	
+	@Override
 	public void sendResults(String sql, ResultSet rs, boolean describeRows) {
 		try {
             try {
@@ -248,9 +303,7 @@
             		ResultSetMetaData meta = rs.getMetaData();
             		sendRowDescription(meta, rs.getStatement());
             	}
-                while (rs.next()) {
-                    sendDataRow(rs);
-                }
+                sendDataRows(rs, -1);
                 sendCommandComplete(sql, 0);
 			} catch (SQLException e) {
 				sendErrorResponse(e);
@@ -311,8 +364,9 @@
 		startMessage('I');
 		sendMessage();
 	}
-		
-	private void sendCommandComplete(String sql, int updateCount) throws IOException {
+	
+	@Override
+	public void sendCommandComplete(String sql, int updateCount) throws IOException {
 		startMessage('C');
 		sql = sql.trim().toUpperCase();
 		// TODO remove remarks at the beginning
@@ -331,22 +385,64 @@
 			tag = "COMMIT";
 		} else if (sql.startsWith("ROLLBACK")) {
 			tag = "ROLLBACK";
-		} else {
-			trace("Check command tag: " + sql);
-			tag = "UPDATE " + updateCount;
+		} else if (sql.startsWith("DECLARE CURSOR")) {
+			tag = "DECLARE CURSOR";
+		} else if (sql.startsWith("CLOSE CURSOR")) {
+			tag = "CLOSE CURSOR";
+		} else if (sql.startsWith("FETCH")) {
+			tag = "FETCH "+updateCount;
+		}  else if (sql.startsWith("MOVE")) {
+			tag = "MOVE "+updateCount;
 		}
+		else {
+			tag = sql;
+		}
 		writeString(tag);
 		sendMessage();
 	}
-
-	private void sendDataRow(ResultSet rs) throws SQLException, IOException {
+	
+	// 300k
+	static int ODBC_SOCKET_BUFF_SIZE = Integer.parseInt(System.getProperty("ODBCPacketSize", "307200"));
+	
+	private int sendDataRows(ResultSet rs, int rowsToSend) throws SQLException, IOException {		
+		int avgRowsize = -1;
+		int rowCount = 0;
+		ChannelBuffer buffer = ChannelBuffers.directBuffer(ODBC_SOCKET_BUFF_SIZE);
 		int columns = rs.getMetaData().getColumnCount();
-		String[] values = new String[columns];
-		for (int i = 0; i < columns; i++) {
-			values[i] = rs.getString(i + 1);
+		int rowsSent = 0;
+		
+		while(rs.next()) {
+			String[] values = new String[columns];
+			for (int i = 0; i < columns; i++) {
+				values[i] = rs.getString(i + 1);
+			}
+			
+			rowCount++;
+			
+			buildDataRow(values, buffer);
+			avgRowsize = buffer.readableBytes()/rowCount;
+			
+			if (buffer.writableBytes() < (avgRowsize*2)) {
+				Channels.write(this.ctx, this.message.getFuture(), buffer, this.message.getRemoteAddress());
+				rowCount = 0;
+				buffer= ChannelBuffers.directBuffer(ODBC_SOCKET_BUFF_SIZE);
+			}
+			
+			rowsSent++;
+			if (rowsSent == rowsToSend) {
+				break;
+			}
 		}
+		
+		if (rowCount > 0) {
+			Channels.write(this.ctx, this.message.getFuture(), buffer, this.message.getRemoteAddress());
+		}
+		return rowsSent;
+	}	
+
+	private void buildDataRow(String[] values, ChannelBuffer buffer) throws IOException {
 		startMessage('D');
-		writeShort(columns);
+		writeShort(values.length);
 		for (String s : values) {
 			if (s == null) {
 				writeInt(-1);
@@ -357,7 +453,16 @@
 				write(d2);
 			}
 		}
-		sendMessage();
+		
+		byte[] buff = outBuffer.toByteArray();
+		int len = buff.length;
+		this.outBuffer = null;
+		this.dataOut = null;
+		
+		// now build the wire contents.
+		buffer.writeByte((byte)this.messageType);
+		buffer.writeInt(len+4);
+		buffer.writeBytes(buff);
 	}
 
 	private void sendErrorResponse(Throwable t) throws IOException {
@@ -474,6 +579,11 @@
 		startMessage('2');
 		sendMessage();
 	}
+	
+	private void sendPortalSuspended() {
+		startMessage('s');
+		sendMessage();
+	}
 
 	private void sendAuthenticationCleartextPassword() throws IOException {
 		startMessage('R');



More information about the teiid-commits mailing list