[teiid-commits] teiid SVN: r3123 - in trunk/runtime/src/main/java/org/teiid: transport and 1 other directory.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Tue Apr 26 21:26:25 EDT 2011
Author: shawkins
Date: 2011-04-26 21:26:25 -0400 (Tue, 26 Apr 2011)
New Revision: 3123
Modified:
trunk/runtime/src/main/java/org/teiid/odbc/ODBCClientRemote.java
trunk/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java
trunk/runtime/src/main/java/org/teiid/transport/ODBCClientInstance.java
trunk/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java
Log:
TEIID-1176: Further refinement of ODBC handling. Messages are now queued, and extended queries properly handle error conditions.
Modified: trunk/runtime/src/main/java/org/teiid/odbc/ODBCClientRemote.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/odbc/ODBCClientRemote.java 2011-04-26 20:04:35 UTC (rev 3122)
+++ trunk/runtime/src/main/java/org/teiid/odbc/ODBCClientRemote.java 2011-04-27 01:26:25 UTC (rev 3123)
@@ -26,6 +26,7 @@
import java.sql.Statement;
import java.util.Properties;
+import org.teiid.client.util.ResultsFuture;
import org.teiid.jdbc.ResultSetImpl;
public interface ODBCClientRemote {
@@ -65,7 +66,7 @@
// DataRow (B)
// CommandComplete (B)
- void sendResults(String sql, ResultSetImpl rs, boolean describeRows);
+ void sendResults(String sql, ResultSetImpl rs, ResultsFuture<Void> result, boolean describeRows);
// CommandComplete (B)
void sendUpdateCount(String sql, int updateCount);
Modified: trunk/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java 2011-04-26 20:04:35 UTC (rev 3122)
+++ trunk/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java 2011-04-27 01:26:25 UTC (rev 3123)
@@ -29,6 +29,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -140,8 +141,8 @@
private Properties props;
private AuthenticationType authType;
private ConnectionImpl connection;
- private boolean shouldSynch;
- private boolean synchCalled;
+ private boolean executing;
+ private boolean errorOccurred;
private volatile ResultsFuture<Boolean> executionFuture;
@@ -179,9 +180,9 @@
this.connection = (ConnectionImpl)driver.connect(url, info);
int hash = this.connection.getConnectionId().hashCode();
this.client.authenticationSucess(hash, hash);
- ready(true);
+ ready();
} catch (SQLException e) {
- this.client.errorOccurred(e);
+ errorOccurred(e);
terminate();
}
}
@@ -208,12 +209,12 @@
this.preparedMap.put(prepareName, new Prepared(prepareName, sql, stmt, paramType));
this.client.prepareCompleted(prepareName);
} catch (SQLException e) {
- this.client.errorOccurred(e);
+ errorOccurred(e);
}
}
}
else {
- this.client.errorOccurred(RuntimePlugin.Util.getString("no_active_connection")); //$NON-NLS-1$
+ errorOccurred(RuntimePlugin.Util.getString("no_active_connection")); //$NON-NLS-1$
}
}
@@ -229,7 +230,7 @@
Prepared previous = this.preparedMap.get(prepareName);
if (previous == null) {
- this.client.errorOccurred(RuntimePlugin.Util.getString("bad_binding", prepareName)); //$NON-NLS-1$
+ errorOccurred(RuntimePlugin.Util.getString("bad_binding", prepareName)); //$NON-NLS-1$
return;
}
@@ -242,7 +243,7 @@
previous.stmt.setObject(i+1, params[i]);
}
} catch (SQLException e) {
- this.client.errorOccurred(e);
+ errorOccurred(e);
}
this.portalMap.put(bindName, new Portal(bindName, prepareName, previous.sql, previous.stmt, resultColumnFormat));
@@ -251,13 +252,13 @@
@Override
public void unsupportedOperation(String msg) {
- this.client.errorOccurred(msg);
- ready(true);
+ errorOccurred(msg);
}
@Override
public void execute(String bindName, int maxRows) {
- if (isAwaitingAsynch()) {
+ if (beginExecution()) {
+ errorOccurred("Awaiting asynch result"); //$NON-NLS-1$
return;
}
if (bindName == null || bindName.length() == 0) {
@@ -266,8 +267,7 @@
final Portal query = this.portalMap.get(bindName);
if (query == null) {
- this.client.errorOccurred(RuntimePlugin.Util.getString("not_bound", bindName)); //$NON-NLS-1$
- ready(true);
+ errorOccurred(RuntimePlugin.Util.getString("not_bound", bindName)); //$NON-NLS-1$
}
else {
if (query.sql.trim().isEmpty()) {
@@ -288,23 +288,34 @@
public void onCompletion(ResultsFuture<Boolean> future) {
executionFuture = null;
try {
- if (future.get()) {
- client.sendResults(query.sql, stmt.getResultSet(), true);
- } else {
- client.sendUpdateCount(query.sql, stmt.getUpdateCount());
- setEncoding();
- }
+ ResultsFuture<Void> result = null;
+ if (future.get()) {
+ result = new ResultsFuture<Void>();
+ client.sendResults(query.sql, stmt.getResultSet(), result, true);
+ } else {
+ result = ResultsFuture.NULL_FUTURE;
+ client.sendUpdateCount(query.sql, stmt.getUpdateCount());
+ setEncoding();
+ }
+ result.addCompletionListener(new ResultsFuture.CompletionListener<Void>() {
+ public void onCompletion(ResultsFuture<Void> future) {
+ try {
+ future.get();
+ doneExecuting();
+ } catch (InterruptedException e) {
+ throw new AssertionError(e);
+ } catch (ExecutionException e) {
+ errorOccurred(e.getCause());
+ }
+ };
+ });
} catch (Throwable e) {
- client.errorOccurred(e);
+ errorOccurred(e);
}
- if (query.closeRequested) {
- closeBoundStatement(query.name);
- }
- ready(false);
}
});
} catch (SQLException e) {
- this.client.errorOccurred(e);
+ errorOccurred(e);
}
}
}
@@ -410,7 +421,9 @@
@Override
public void executeQuery(final String query) {
- if (isAwaitingAsynch()) {
+ if (beginExecution()) {
+ this.client.errorOccurred("Awaiting asynch result"); //$NON-NLS-1$
+ ready();
return;
}
//46.2.3 Note that a simple Query message also destroys the unnamed portal.
@@ -419,27 +432,30 @@
if (query.trim().length() == 0) {
this.client.emptyQueryReceived();
- ready(false);
+ ready();
return;
}
QueryWorkItem r = new QueryWorkItem(query);
r.run();
}
- /**
- * Just a sanity check. Should never happen
- */
- private boolean isAwaitingAsynch() {
+ private boolean beginExecution() {
if (this.executionFuture != null) {
- this.client.errorOccurred("Awaiting asynch result"); //$NON-NLS-1$
- ready(true);
return true;
}
synchronized (this) {
- this.shouldSynch = false;
+ this.executing = true;
}
return false;
}
+
+ public boolean isExecuting() {
+ return executing;
+ }
+
+ public boolean isErrorOccurred() {
+ return errorOccurred;
+ }
@Override
public void getParameterDescription(String prepareName) {
@@ -448,17 +464,32 @@
}
Prepared query = this.preparedMap.get(prepareName);
if (query == null) {
- this.client.errorOccurred(RuntimePlugin.Util.getString("no_stmt_found", prepareName)); //$NON-NLS-1$
- ready(true);
+ errorOccurred(RuntimePlugin.Util.getString("no_stmt_found", prepareName)); //$NON-NLS-1$
}
else {
try {
this.client.sendParameterDescription(query.stmt.getParameterMetaData(), query.paramType);
} catch (SQLException e) {
- this.client.errorOccurred(e);
+ errorOccurred(e);
}
}
}
+
+ private void errorOccurred(String error) {
+ this.client.errorOccurred(error);
+ synchronized (this) {
+ this.errorOccurred = true;
+ doneExecuting();
+ }
+ }
+
+ private void errorOccurred(Throwable error) {
+ this.client.errorOccurred(error);
+ synchronized (this) {
+ this.errorOccurred = true;
+ doneExecuting();
+ }
+ }
@Override
public void getResultSetMetaDataDescription(String bindName) {
@@ -467,40 +498,27 @@
}
Portal query = this.portalMap.get(bindName);
if (query == null) {
- this.client.errorOccurred(RuntimePlugin.Util.getString("not_bound", bindName)); //$NON-NLS-1$
+ errorOccurred(RuntimePlugin.Util.getString("not_bound", bindName)); //$NON-NLS-1$
}
else {
try {
this.client.sendResultSetDescription(query.stmt.getMetaData(), query.stmt);
} catch (SQLException e) {
- this.client.errorOccurred(e);
+ errorOccurred(e);
}
}
}
@Override
public void sync() {
- boolean ready = false;
- synchronized (this) {
- synchCalled = true;
- ready = this.shouldSynch;
- }
- if (ready) {
- ready(true);
- }
+ ready();
}
- private void ready(boolean sendAlways) {
- synchronized (this) {
- if (!sendAlways) {
- shouldSynch = true;
- if (!synchCalled) {
- return;
- }
- }
- shouldSynch = true;
- synchCalled = false;
- }
+ protected synchronized void doneExecuting() {
+ executing = false;
+ }
+
+ private void ready() {
boolean inTxn = false;
boolean failedTxn = false;
try {
@@ -510,6 +528,9 @@
} catch (SQLException e) {
failedTxn = true;
}
+ synchronized (this) {
+ this.errorOccurred = false;
+ }
this.client.ready(inTxn, failedTxn);
}
@@ -523,18 +544,9 @@
if (bindName == null || bindName.length() == 0) {
bindName = UNNAMED;
}
- Portal query = this.portalMap.get(bindName);
- if (query != null) {
- if (this.executionFuture != null) {
- synchronized(query) {
- query.closeRequested = true;
- }
- return;
- }
- }
- query = this.portalMap.remove(bindName);
+ Portal query = this.portalMap.remove(bindName);
if (query == null) {
- this.client.errorOccurred(RuntimePlugin.Util.getString("not_bound", bindName)); //$NON-NLS-1$
+ errorOccurred(RuntimePlugin.Util.getString("not_bound", bindName)); //$NON-NLS-1$
}
else {
try {
@@ -559,7 +571,7 @@
}
Prepared query = this.preparedMap.remove(preparedName);
if (query == null) {
- this.client.errorOccurred(RuntimePlugin.Util.getString("no_stmt_found", preparedName)); //$NON-NLS-1$
+ errorOccurred(RuntimePlugin.Util.getString("no_stmt_found", preparedName)); //$NON-NLS-1$
}
else {
// Close all the bound messages off of this prepared
@@ -570,7 +582,7 @@
query.stmt.close();
this.client.statementClosed();
} catch (SQLException e) {
- this.client.errorOccurred(RuntimePlugin.Util.getString("error_closing_stmt", preparedName)); //$NON-NLS-1$
+ errorOccurred(RuntimePlugin.Util.getString("error_closing_stmt", preparedName)); //$NON-NLS-1$
}
}
}
@@ -611,8 +623,7 @@
@Override
public void functionCall(int oid) {
- this.client.errorOccurred(RuntimePlugin.Util.getString("lo_not_supported")); //$NON-NLS-1$
- ready(true);
+ errorOccurred(RuntimePlugin.Util.getString("lo_not_supported")); //$NON-NLS-1$
}
@Override
@@ -661,29 +672,45 @@
public void onCompletion(ResultsFuture<Boolean> future) {
executionFuture = null;
try {
+ ResultsFuture<Void> result = null;
if (future.get()) {
- client.sendResults(sql, stmt.getResultSet(), true);
+ result = new ResultsFuture<Void>();
+ client.sendResults(sql, stmt.getResultSet(), result, true);
} else {
+ result = ResultsFuture.NULL_FUTURE;
client.sendUpdateCount(sql, stmt.getUpdateCount());
setEncoding();
}
- sql = reader.readStatement();
- modfiedSQL = fixSQL(sql);
+ result.addCompletionListener(new ResultsFuture.CompletionListener<Void>() {
+ public void onCompletion(ResultsFuture<Void> future) {
+ try {
+ future.get();
+ sql = reader.readStatement();
+ modfiedSQL = fixSQL(sql);
+ } catch (InterruptedException e) {
+ throw new AssertionError(e);
+ } catch (IOException e) {
+ client.errorOccurred(e);
+ return;
+ } catch (ExecutionException e) {
+ client.errorOccurred(e.getCause());
+ return;
+ } finally {
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ LogManager.logDetail(LogConstants.CTX_ODBC, e, "Error closing statement"); //$NON-NLS-1$
+ }
+ }
+ QueryWorkItem.this.run(); //continue processing
+ };
+ });
} catch (Throwable e) {
client.errorOccurred(e);
- ready(true);
return;
- } finally {
- try {
- stmt.close();
- } catch (SQLException e) {
- LogManager.logDetail(LogConstants.CTX_ODBC, e, "Error closing statement"); //$NON-NLS-1$
- }
}
- QueryWorkItem.this.run(); //continue processing
}
});
- ready(false);
return; //wait for the execution to finish
} catch (SQLException e) {
client.errorOccurred(e);
@@ -693,8 +720,10 @@
} catch(IOException e) {
client.errorOccurred(e);
}
- sync();
+ doneExecuting();
+ ready();
}
+
}
/**
@@ -764,9 +793,6 @@
* The prepared statement.
*/
PreparedStatementImpl stmt;
-
- boolean closeRequested;
}
-
}
Modified: trunk/runtime/src/main/java/org/teiid/transport/ODBCClientInstance.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/transport/ODBCClientInstance.java 2011-04-26 20:04:35 UTC (rev 3122)
+++ trunk/runtime/src/main/java/org/teiid/transport/ODBCClientInstance.java 2011-04-27 01:26:25 UTC (rev 3123)
@@ -26,6 +26,7 @@
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.teiid.core.util.ReflectionHelper;
import org.teiid.jdbc.TeiidDriver;
@@ -45,6 +46,7 @@
private ODBCClientRemote client;
private ODBCServerRemoteImpl server;
private ReflectionHelper serverProxy = new ReflectionHelper(ODBCServerRemote.class);
+ private ConcurrentLinkedQueue<PGRequest> messageQueue = new ConcurrentLinkedQueue<PGRequest>();
public ODBCClientInstance(final ObjectChannel channel, ODBCServerRemote.AuthenticationType authType, TeiidDriver driver) {
this.client = (ODBCClientRemote)Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[] {ODBCClientRemote.class}, new InvocationHandler() {
@@ -58,7 +60,21 @@
return null;
}
});
- this.server = new ODBCServerRemoteImpl(this, authType, driver);
+ this.server = new ODBCServerRemoteImpl(this, authType, driver) {
+ @Override
+ protected synchronized void doneExecuting() {
+ super.doneExecuting();
+ while (!server.isExecuting()) {
+ PGRequest request = messageQueue.poll();
+ if (request == null) {
+ break;
+ }
+ if (!server.isErrorOccurred() || request.struct.methodName.equals("sync")) { //$NON-NLS-1$
+ processMessage(request.struct);
+ }
+ }
+ }
+ };
}
public ODBCClientRemote getClient() {
@@ -83,6 +99,17 @@
public void receivedMessage(Object msg) throws CommunicationException {
if (msg instanceof PGRequest) {
PGRequest request = (PGRequest)msg;
+ synchronized (server) {
+ if (server.isExecuting()) {
+ //queue until done
+ messageQueue.add(request);
+ return;
+ }
+ if (server.isErrorOccurred() && !request.struct.methodName.equals("sync")) { //$NON-NLS-1$
+ //discard until sync
+ return;
+ }
+ }
processMessage(request.struct);
}
}
Modified: trunk/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java 2011-04-26 20:04:35 UTC (rev 3122)
+++ trunk/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java 2011-04-27 01:26:25 UTC (rev 3123)
@@ -72,12 +72,14 @@
private final List<PgColInfo> cols;
private final String sql;
private final ResultSetImpl rs;
+ private final ResultsFuture<Void> result;
private ResultsWorkItem(List<PgColInfo> cols, String sql,
- ResultSetImpl rs) {
+ ResultSetImpl rs, ResultsFuture<Void> result) {
this.cols = cols;
this.sql = sql;
this.rs = rs;
+ this.result = result;
}
@Override
@@ -101,11 +103,7 @@
break;
}
} catch (Throwable t) {
- try {
- sendErrorResponse(t);
- } catch (IOException e) {
- terminate(e);
- }
+ result.getResultsReceiver().exceptionOccurred(t);
}
}
}
@@ -118,14 +116,11 @@
sendDataRow(rs, cols);
} else {
sendCommandComplete(sql, 0);
+ result.getResultsReceiver().receiveResults(null);
processNext = false;
}
} catch (Throwable t) {
- try {
- sendErrorResponse(t);
- } catch (IOException e) {
- terminate(e);
- }
+ result.getResultsReceiver().exceptionOccurred(t);
return false;
}
return processNext;
@@ -167,6 +162,8 @@
private MessageEvent message;
private int maxLobSize = (2*1024*1024); // 2 MB
+ private volatile ResultsFuture<Boolean> nextFuture;
+
public PgBackendProtocol(int maxLobSize) {
this.maxLobSize = maxLobSize;
}
@@ -328,25 +325,22 @@
}
}
- private volatile ResultsFuture<Boolean> nextFuture;
-
@Override
- public void sendResults(final String sql, final ResultSetImpl rs, boolean describeRows) {
+ public void sendResults(final String sql, final ResultSetImpl rs, ResultsFuture<Void> result, boolean describeRows) {
try {
if (nextFuture != null) {
sendErrorResponse(new IllegalStateException("Pending results have not been sent")); //$NON-NLS-1$
}
- try {
- ResultSetMetaData meta = rs.getMetaData();
- List<PgColInfo> cols = getPgColInfo(meta, rs.getStatement());
- if (describeRows) {
- sendRowDescription(cols);
- }
- Runnable r = new ResultsWorkItem(cols, sql, rs);
- r.run();
- } catch (SQLException e) {
- sendErrorResponse(e);
- }
+
+ ResultSetMetaData meta = rs.getMetaData();
+ List<PgColInfo> cols = getPgColInfo(meta, rs.getStatement());
+ if (describeRows) {
+ sendRowDescription(cols);
+ }
+ Runnable r = new ResultsWorkItem(cols, sql, rs, result);
+ r.run();
+ } catch (SQLException e) {
+ result.getResultsReceiver().exceptionOccurred(e);
} catch (IOException e) {
terminate(e);
}
More information about the teiid-commits mailing list