[teiid-commits] teiid SVN: r3123 - in trunk/runtime/src/main/java/org/teiid: transport and 1 other directory.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Tue Apr 26 21:26:25 EDT 2011


Author: shawkins
Date: 2011-04-26 21:26:25 -0400 (Tue, 26 Apr 2011)
New Revision: 3123

Modified:
   trunk/runtime/src/main/java/org/teiid/odbc/ODBCClientRemote.java
   trunk/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java
   trunk/runtime/src/main/java/org/teiid/transport/ODBCClientInstance.java
   trunk/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java
Log:
TEIID-1176 further refinement of odbc handling.  messages are queued and extended queries will properly handle error conditions

Modified: trunk/runtime/src/main/java/org/teiid/odbc/ODBCClientRemote.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/odbc/ODBCClientRemote.java	2011-04-26 20:04:35 UTC (rev 3122)
+++ trunk/runtime/src/main/java/org/teiid/odbc/ODBCClientRemote.java	2011-04-27 01:26:25 UTC (rev 3123)
@@ -26,6 +26,7 @@
 import java.sql.Statement;
 import java.util.Properties;
 
+import org.teiid.client.util.ResultsFuture;
 import org.teiid.jdbc.ResultSetImpl;
 
 public interface ODBCClientRemote {
@@ -65,7 +66,7 @@
 	
 	//	DataRow (B)
 	//	CommandComplete (B)
-	void sendResults(String sql, ResultSetImpl rs, boolean describeRows);
+	void sendResults(String sql, ResultSetImpl rs, ResultsFuture<Void> result, boolean describeRows);
 
 	//	CommandComplete (B)
 	void sendUpdateCount(String sql, int updateCount);

Modified: trunk/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java	2011-04-26 20:04:35 UTC (rev 3122)
+++ trunk/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java	2011-04-27 01:26:25 UTC (rev 3123)
@@ -29,6 +29,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -140,8 +141,8 @@
 	private Properties props;
 	private AuthenticationType authType;
 	private ConnectionImpl connection;
-	private boolean shouldSynch;
-	private boolean synchCalled;
+	private boolean executing;
+	private boolean errorOccurred;
 	
 	private volatile ResultsFuture<Boolean> executionFuture;
 	
@@ -179,9 +180,9 @@
 			this.connection =  (ConnectionImpl)driver.connect(url, info);
 			int hash = this.connection.getConnectionId().hashCode();
 			this.client.authenticationSucess(hash, hash);
-			ready(true);
+			ready();
 		} catch (SQLException e) {
-			this.client.errorOccurred(e);
+			errorOccurred(e);
 			terminate();
 		} 
 	}	
@@ -208,12 +209,12 @@
 					this.preparedMap.put(prepareName, new Prepared(prepareName, sql, stmt, paramType));
 					this.client.prepareCompleted(prepareName);
 				} catch (SQLException e) {
-					this.client.errorOccurred(e);
+					errorOccurred(e);
 				}
 			}
 		}
 		else {
-			this.client.errorOccurred(RuntimePlugin.Util.getString("no_active_connection")); //$NON-NLS-1$
+			errorOccurred(RuntimePlugin.Util.getString("no_active_connection")); //$NON-NLS-1$
 		}
 	}	
 	
@@ -229,7 +230,7 @@
 		
 		Prepared previous = this.preparedMap.get(prepareName);
 		if (previous == null) {
-			this.client.errorOccurred(RuntimePlugin.Util.getString("bad_binding", prepareName)); //$NON-NLS-1$
+			errorOccurred(RuntimePlugin.Util.getString("bad_binding", prepareName)); //$NON-NLS-1$
 			return;
 		}		
 		
@@ -242,7 +243,7 @@
 				previous.stmt.setObject(i+1, params[i]);
 			}
 		} catch (SQLException e) {
-			this.client.errorOccurred(e);
+			errorOccurred(e);
 		}
 		
 		this.portalMap.put(bindName, new Portal(bindName, prepareName, previous.sql, previous.stmt, resultColumnFormat));
@@ -251,13 +252,13 @@
 
 	@Override
 	public void unsupportedOperation(String msg) {
-		this.client.errorOccurred(msg);
-		ready(true);
+		errorOccurred(msg);
 	}
 
 	@Override
 	public void execute(String bindName, int maxRows) {
-		if (isAwaitingAsynch()) {
+		if (beginExecution()) {
+			errorOccurred("Awaiting asynch result"); //$NON-NLS-1$
 			return;
 		}
 		if (bindName == null || bindName.length() == 0) {
@@ -266,8 +267,7 @@
 		
 		final Portal query = this.portalMap.get(bindName);
 		if (query == null) {
-			this.client.errorOccurred(RuntimePlugin.Util.getString("not_bound", bindName)); //$NON-NLS-1$
-			ready(true);
+			errorOccurred(RuntimePlugin.Util.getString("not_bound", bindName)); //$NON-NLS-1$
 		}				
 		else {
 			if (query.sql.trim().isEmpty()) {
@@ -288,23 +288,34 @@
 	        		public void onCompletion(ResultsFuture<Boolean> future) {
 	        			executionFuture = null;
                         try {
-		        			if (future.get()) {
-	                            client.sendResults(query.sql, stmt.getResultSet(), true);
-		                    } else {
-		                    	client.sendUpdateCount(query.sql, stmt.getUpdateCount());
-		                    	setEncoding();
-		                    }
+                        	ResultsFuture<Void> result = null;
+			                if (future.get()) {
+			                	result = new ResultsFuture<Void>();
+	                            client.sendResults(query.sql, stmt.getResultSet(), result, true);
+			                } else {
+			                	result = ResultsFuture.NULL_FUTURE;
+			                	client.sendUpdateCount(query.sql, stmt.getUpdateCount());
+			                	setEncoding();
+			                }
+		        			result.addCompletionListener(new ResultsFuture.CompletionListener<Void>() {
+                            	public void onCompletion(ResultsFuture<Void> future) {
+                            		try {
+										future.get();
+	                            		doneExecuting();
+									} catch (InterruptedException e) {
+										throw new AssertionError(e);
+									} catch (ExecutionException e) {
+										errorOccurred(e.getCause());
+									}
+                            	};
+							});
                         } catch (Throwable e) {
-                            client.errorOccurred(e);
+                            errorOccurred(e);
                         }
-                        if (query.closeRequested) {
-                        	closeBoundStatement(query.name);
-                        }
-                    	ready(false);
 	        		}
 				});
             } catch (SQLException e) {
-            	this.client.errorOccurred(e);
+            	errorOccurred(e);
             }			
 		}
 	}
@@ -410,7 +421,9 @@
 
 	@Override
 	public void executeQuery(final String query) {
-		if (isAwaitingAsynch()) {
+		if (beginExecution()) {
+			this.client.errorOccurred("Awaiting asynch result"); //$NON-NLS-1$
+			ready();
 			return;
 		}
 		//46.2.3 Note that a simple Query message also destroys the unnamed portal.
@@ -419,27 +432,30 @@
 		
 		if (query.trim().length() == 0) {
     		this.client.emptyQueryReceived();
-    		ready(false);
+    		ready();
     		return;
     	}
         QueryWorkItem r = new QueryWorkItem(query);
 		r.run();
 	}
 
-	/**
-	 * Just a sanity check.  Should never happen
-	 */
-	private boolean isAwaitingAsynch() {
+	private boolean beginExecution() {
 		if (this.executionFuture != null) {
-			this.client.errorOccurred("Awaiting asynch result"); //$NON-NLS-1$
-			ready(true);
 			return true;
 		}
 		synchronized (this) {
-			this.shouldSynch = false;
+			this.executing = true;			
 		}
 		return false;
 	}
+	
+	public boolean isExecuting() {
+		return executing;
+	}
+	
+	public boolean isErrorOccurred() {
+		return errorOccurred;
+	}
 
 	@Override
 	public void getParameterDescription(String prepareName) {
@@ -448,17 +464,32 @@
 		}		
 		Prepared query = this.preparedMap.get(prepareName);
 		if (query == null) {
-			this.client.errorOccurred(RuntimePlugin.Util.getString("no_stmt_found", prepareName)); //$NON-NLS-1$
-			ready(true);
+			errorOccurred(RuntimePlugin.Util.getString("no_stmt_found", prepareName)); //$NON-NLS-1$
 		}
 		else {
 			try {
 				this.client.sendParameterDescription(query.stmt.getParameterMetaData(), query.paramType);
 			} catch (SQLException e) {
-				this.client.errorOccurred(e);
+				errorOccurred(e);
 			}
 		}
 	}
+	
+	private void errorOccurred(String error) {
+		this.client.errorOccurred(error);
+		synchronized (this) {
+			this.errorOccurred = true;
+			doneExecuting();
+		}
+	}
+	
+	private void errorOccurred(Throwable error) {
+		this.client.errorOccurred(error);
+		synchronized (this) {
+			this.errorOccurred = true;
+			doneExecuting();
+		}
+	}
 
 	@Override
 	public void getResultSetMetaDataDescription(String bindName) {
@@ -467,40 +498,27 @@
 		}		
 		Portal query = this.portalMap.get(bindName);
 		if (query == null) {
-			this.client.errorOccurred(RuntimePlugin.Util.getString("not_bound", bindName)); //$NON-NLS-1$
+			errorOccurred(RuntimePlugin.Util.getString("not_bound", bindName)); //$NON-NLS-1$
 		}
 		else {
 			try {
 				this.client.sendResultSetDescription(query.stmt.getMetaData(), query.stmt);
 			} catch (SQLException e) {
-				this.client.errorOccurred(e);
+				errorOccurred(e);
 			}
 		}
 	}
 
 	@Override
 	public void sync() {
-		boolean ready = false;
-		synchronized (this) {
-			synchCalled = true;
-			ready = this.shouldSynch;
-		}
-		if (ready) {
-			ready(true);
-		}
+		ready();
 	}
 	
-	private void ready(boolean sendAlways) {
-		synchronized (this) {
-			if (!sendAlways) {
-				shouldSynch = true;
-				if (!synchCalled) {
-					return;
-				}
-			}
-			shouldSynch = true;
-			synchCalled = false;
-		}
+	protected synchronized void doneExecuting() {
+		executing = false;
+	}
+	
+	private void ready() {
 		boolean inTxn = false;
 		boolean failedTxn = false;
 		try {
@@ -510,6 +528,9 @@
 		} catch (SQLException e) {
 			failedTxn = true;
 		}
+		synchronized (this) {
+			this.errorOccurred = false;
+		}
 		this.client.ready(inTxn, failedTxn);
 	}
 	
@@ -523,18 +544,9 @@
 		if (bindName == null || bindName.length() == 0) {
 			bindName  = UNNAMED;
 		}		
-		Portal query = this.portalMap.get(bindName);
-		if (query != null) {
-			if (this.executionFuture != null) {
-				synchronized(query) {
-					query.closeRequested = true;
-				}
-				return;
-			}
-		}
-		query = this.portalMap.remove(bindName);
+		Portal query = this.portalMap.remove(bindName);
 		if (query == null) {
-			this.client.errorOccurred(RuntimePlugin.Util.getString("not_bound", bindName)); //$NON-NLS-1$
+			errorOccurred(RuntimePlugin.Util.getString("not_bound", bindName)); //$NON-NLS-1$
 		}
 		else {
 			try {
@@ -559,7 +571,7 @@
 		}		
 		Prepared query = this.preparedMap.remove(preparedName);
 		if (query == null) {
-			this.client.errorOccurred(RuntimePlugin.Util.getString("no_stmt_found", preparedName)); //$NON-NLS-1$
+			errorOccurred(RuntimePlugin.Util.getString("no_stmt_found", preparedName)); //$NON-NLS-1$
 		}
 		else {
 			// Close all the bound messages off of this prepared
@@ -570,7 +582,7 @@
 				query.stmt.close();
 				this.client.statementClosed();
 			} catch (SQLException e) {
-				this.client.errorOccurred(RuntimePlugin.Util.getString("error_closing_stmt", preparedName)); //$NON-NLS-1$
+				errorOccurred(RuntimePlugin.Util.getString("error_closing_stmt", preparedName)); //$NON-NLS-1$
 			}			
 		}	
 	}
@@ -611,8 +623,7 @@
 	
 	@Override
 	public void functionCall(int oid) {
-		this.client.errorOccurred(RuntimePlugin.Util.getString("lo_not_supported")); //$NON-NLS-1$
-		ready(true);
+		errorOccurred(RuntimePlugin.Util.getString("lo_not_supported")); //$NON-NLS-1$
 	}
 	
 	@Override
@@ -661,29 +672,45 @@
 			        		public void onCompletion(ResultsFuture<Boolean> future) {
 			        			executionFuture = null;
 			        			try {
+			        				ResultsFuture<Void> result = null;
 					                if (future.get()) {
-					                	client.sendResults(sql, stmt.getResultSet(), true);
+					                	result = new ResultsFuture<Void>();
+			                            client.sendResults(sql, stmt.getResultSet(), result, true);
 					                } else {
+					                	result = ResultsFuture.NULL_FUTURE;
 					                	client.sendUpdateCount(sql, stmt.getUpdateCount());
 					                	setEncoding();
 					                }
-					                sql = reader.readStatement();
-					                modfiedSQL = fixSQL(sql);
+					                result.addCompletionListener(new ResultsFuture.CompletionListener<Void>() {
+		                            	public void onCompletion(ResultsFuture<Void> future) {
+		                            		try {
+												future.get();
+								                sql = reader.readStatement();
+								                modfiedSQL = fixSQL(sql);
+											} catch (InterruptedException e) {
+												throw new AssertionError(e);
+											} catch (IOException e) {
+												client.errorOccurred(e);
+												return;
+											} catch (ExecutionException e) {
+												client.errorOccurred(e.getCause());
+												return;
+											} finally {
+												try {
+													stmt.close();
+												} catch (SQLException e) {
+													LogManager.logDetail(LogConstants.CTX_ODBC, e, "Error closing statement"); //$NON-NLS-1$
+												}
+											}
+						        			QueryWorkItem.this.run(); //continue processing
+		                            	};
+									});
 			        			} catch (Throwable e) {
 			        				client.errorOccurred(e);
-			        				ready(true);
 			        				return;
-			        			} finally {
-			        				try {
-										stmt.close();
-									} catch (SQLException e) {
-										LogManager.logDetail(LogConstants.CTX_ODBC, e, "Error closing statement"); //$NON-NLS-1$
-									}
 			        			}
-			        			QueryWorkItem.this.run(); //continue processing
 			        		}
 						});
-		                ready(false);
 		                return; //wait for the execution to finish
 		            } catch (SQLException e) {
 		                client.errorOccurred(e);
@@ -693,8 +720,10 @@
 			} catch(IOException e) {
 				client.errorOccurred(e);
 			}
-			sync();
+			doneExecuting();
+			ready();
 		}
+
 	}
 
 	/**
@@ -764,9 +793,6 @@
          * The prepared statement.
          */
         PreparedStatementImpl stmt;
-        
-        boolean closeRequested;
     }
 
-
 }

Modified: trunk/runtime/src/main/java/org/teiid/transport/ODBCClientInstance.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/transport/ODBCClientInstance.java	2011-04-26 20:04:35 UTC (rev 3122)
+++ trunk/runtime/src/main/java/org/teiid/transport/ODBCClientInstance.java	2011-04-27 01:26:25 UTC (rev 3123)
@@ -26,6 +26,7 @@
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.util.Arrays;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.teiid.core.util.ReflectionHelper;
 import org.teiid.jdbc.TeiidDriver;
@@ -45,6 +46,7 @@
 	private ODBCClientRemote client;
 	private ODBCServerRemoteImpl server;
 	private ReflectionHelper serverProxy = new ReflectionHelper(ODBCServerRemote.class);
+	private ConcurrentLinkedQueue<PGRequest> messageQueue = new ConcurrentLinkedQueue<PGRequest>();
 	
 	public ODBCClientInstance(final ObjectChannel channel, ODBCServerRemote.AuthenticationType authType, TeiidDriver driver) {
 		this.client = (ODBCClientRemote)Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[] {ODBCClientRemote.class}, new InvocationHandler() {
@@ -58,7 +60,21 @@
 				return null;
 			}
 		});
-		this.server = new ODBCServerRemoteImpl(this, authType, driver);
+		this.server = new ODBCServerRemoteImpl(this, authType, driver) {
+			@Override
+			protected synchronized void doneExecuting() {
+				super.doneExecuting();
+				while (!server.isExecuting()) {
+					PGRequest request = messageQueue.poll();
+					if (request == null) {
+						break;
+					}
+	        		if (!server.isErrorOccurred() || request.struct.methodName.equals("sync")) { //$NON-NLS-1$
+	        			processMessage(request.struct);
+	        		}
+				}
+			}
+		};
 	}
 	
 	public ODBCClientRemote getClient() {
@@ -83,6 +99,17 @@
 	public void receivedMessage(Object msg) throws CommunicationException {
         if (msg instanceof PGRequest) {
         	PGRequest request = (PGRequest)msg;
+        	synchronized (server) {
+        		if (server.isExecuting()) {
+        			//queue until done
+        			messageQueue.add(request);
+        			return;
+        		}
+        		if (server.isErrorOccurred() && !request.struct.methodName.equals("sync")) { //$NON-NLS-1$
+        			//discard until sync
+        			return;
+        		}
+			}
             processMessage(request.struct);
         }
 	}

Modified: trunk/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java	2011-04-26 20:04:35 UTC (rev 3122)
+++ trunk/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java	2011-04-27 01:26:25 UTC (rev 3123)
@@ -72,12 +72,14 @@
 		private final List<PgColInfo> cols;
 		private final String sql;
 		private final ResultSetImpl rs;
+		private final ResultsFuture<Void> result;
 
 		private ResultsWorkItem(List<PgColInfo> cols, String sql,
-				ResultSetImpl rs) {
+				ResultSetImpl rs, ResultsFuture<Void> result) {
 			this.cols = cols;
 			this.sql = sql;
 			this.rs = rs;
+			this.result = result;
 		}
 
 		@Override
@@ -101,11 +103,7 @@
 			    		break;
 			    	}
 				} catch (Throwable t) {
-					try {
-						sendErrorResponse(t);
-					} catch (IOException e) {
-						terminate(e);
-					}
+					result.getResultsReceiver().exceptionOccurred(t);
 				}
 			}
 		}
@@ -118,14 +116,11 @@
     				sendDataRow(rs, cols);
     			} else {
     				sendCommandComplete(sql, 0);
+    				result.getResultsReceiver().receiveResults(null);
     				processNext = false;
     			}
 			} catch (Throwable t) {
-				try {
-					sendErrorResponse(t);
-				} catch (IOException e) {
-					terminate(e);
-				}
+				result.getResultsReceiver().exceptionOccurred(t);
 				return false;
 			}
 			return processNext;
@@ -167,6 +162,8 @@
     private MessageEvent message;
     private int maxLobSize = (2*1024*1024); // 2 MB
     
+	private volatile ResultsFuture<Boolean> nextFuture;
+
     public PgBackendProtocol(int maxLobSize) {
     	this.maxLobSize = maxLobSize;
     }
@@ -328,25 +325,22 @@
 		}
 	}
 	
-	private volatile ResultsFuture<Boolean> nextFuture;
-
 	@Override
-	public void sendResults(final String sql, final ResultSetImpl rs, boolean describeRows) {
+	public void sendResults(final String sql, final ResultSetImpl rs, ResultsFuture<Void> result, boolean describeRows) {
 		try {
 			if (nextFuture != null) {
 				sendErrorResponse(new IllegalStateException("Pending results have not been sent")); //$NON-NLS-1$
 			}
-            try {
-    			ResultSetMetaData meta = rs.getMetaData();
-        		List<PgColInfo> cols = getPgColInfo(meta, rs.getStatement());
-            	if (describeRows) {
-            		sendRowDescription(cols);
-            	}
-            	Runnable r = new ResultsWorkItem(cols, sql, rs);
-            	r.run();                
-			} catch (SQLException e) {
-				sendErrorResponse(e);
-			}			
+        	
+			ResultSetMetaData meta = rs.getMetaData();
+    		List<PgColInfo> cols = getPgColInfo(meta, rs.getStatement());
+        	if (describeRows) {
+        		sendRowDescription(cols);
+        	}
+        	Runnable r = new ResultsWorkItem(cols, sql, rs, result);
+        	r.run();    
+		} catch (SQLException e) {
+			result.getResultsReceiver().exceptionOccurred(e);
 		} catch (IOException e) {
 			terminate(e);
 		}



More information about the teiid-commits mailing list