Author: shawkins
Date: 2011-05-09 13:24:36 -0400 (Mon, 09 May 2011)
New Revision: 3156
Added:
branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/TestLocalConnections.java
Modified:
branches/7.4.x/common-core/src/main/java/org/teiid/core/util/ReflectionHelper.java
branches/7.4.x/common-core/src/test/java/org/teiid/core/util/TestReflectionHelper.java
branches/7.4.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
branches/7.4.x/runtime/src/main/java/org/teiid/transport/SocketClientInstance.java
branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
Log:
TEIID-1579 general fix for ReflectionHelper create logic, and proper lock / queue
management with synch queries.
Modified:
branches/7.4.x/common-core/src/main/java/org/teiid/core/util/ReflectionHelper.java
===================================================================
---
branches/7.4.x/common-core/src/main/java/org/teiid/core/util/ReflectionHelper.java 2011-05-05
17:35:15 UTC (rev 3155)
+++
branches/7.4.x/common-core/src/main/java/org/teiid/core/util/ReflectionHelper.java 2011-05-09
17:24:36 UTC (rev 3156)
@@ -23,6 +23,7 @@
package org.teiid.core.util;
import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
@@ -175,26 +176,7 @@
}
for (Method method : methodsWithSameName) {
Class[] args = method.getParameterTypes();
- if ( args.length != argumentsClasses.size() ) {
- continue;
- }
- boolean allMatch = true; // assume all args match
- for ( int i=0; i<args.length && allMatch == true; ++i ) {
- Class<?> primitiveClazz = argumentsClassList.get(i);
- Class<?> objectClazz = argumentsClasses.get(i);
- if ( objectClazz != null ) {
- // Check for possible matches with (converted) primitive types
- // as well as the original Object type
- if ( ! args[i].equals(primitiveClazz) && !
args[i].isAssignableFrom(objectClazz) ) {
- allMatch = false; // found one that doesn't match
- }
- } else {
- // a null is assignable for everything except a primitive
- if ( args[i].isPrimitive() ) {
- allMatch = false; // found one that doesn't match
- }
- }
- }
+ boolean allMatch = argsMatch(argumentsClasses, argumentsClassList, args);
if ( allMatch ) {
if (result != null) {
throw new NoSuchMethodException(methodName + " Args: " +
argumentsClasses + " has multiple possible signatures."); //$NON-NLS-1$
//$NON-NLS-2$
@@ -209,6 +191,30 @@
throw new NoSuchMethodException(methodName + " Args: " +
argumentsClasses); //$NON-NLS-1$
}
+
+ /**
+ * Decide whether a declared parameter list can accept the supplied argument types.
+ * The lists are positional: entry <code>i</code> of both lists describes argument <code>i</code>.
+ *
+ * @param argumentsClasses the supplied argument types; a <code>null</code> entry is
+ * treated as a null argument, which is accepted by any non-primitive parameter
+ * @param argumentsClassList list parallel to <code>argumentsClasses</code> whose entries
+ * are compared with <code>equals</code> against the declared parameter type (callers in
+ * this class pass the result of convertArgumentClassesToPrimitives)
+ * @param args the declared parameter types of the candidate method or constructor
+ * @return <code>true</code> only if the lengths agree and every position is compatible
+ */
+ private static boolean argsMatch(List<Class<?>> argumentsClasses,
+ List<Class<?>> argumentsClassList, Class[] args) {
+ if ( args.length != argumentsClasses.size() ) {
+ return false;
+ }
+ for ( int i=0; i<args.length; ++i ) {
+ Class<?> primitiveClazz = argumentsClassList.get(i);
+ Class<?> objectClazz = argumentsClasses.get(i);
+ if ( objectClazz != null ) {
+ // Check for possible matches with (converted) primitive types
+ // as well as the original Object type
+ if ( ! args[i].equals(primitiveClazz) && !
args[i].isAssignableFrom(objectClazz) ) {
+ return false; // found one that doesn't match
+ }
+ } else {
+ // a null is assignable for everything except a primitive
+ if ( args[i].isPrimitive() ) {
+ return false; // found one that doesn't match
+ }
+ }
+ }
+ return true;
+ }
/**
* Convert any argument classes to primitives.
@@ -260,10 +266,10 @@
* @param classLoader the class loader to use; may be null if the current
* class loader is to be used
* @return Object is the instance of the class
- * @throws TeiidException if an error occurrs instantiating the class
+ * @throws TeiidException if an error occurs instantiating the class
*/
- public static final Object create(String className, Collection ctorObjs,
+ public static final Object create(String className, Collection<?> ctorObjs,
final ClassLoader classLoader) throws
TeiidException {
try {
int size = (ctorObjs == null ? 0 : ctorObjs.size());
@@ -272,10 +278,12 @@
int i = 0;
if (size > 0) {
- for (Iterator it=ctorObjs.iterator(); it.hasNext(); ) {
+ for (Iterator<?> it=ctorObjs.iterator(); it.hasNext(); ) {
Object obj = it.next();
- names[i] = loadClass(obj.getClass().getName(),classLoader);
- objArray[i] = obj;
+ if (obj != null) {
+ names[i] = obj.getClass();
+ objArray[i] = obj;
+ }
i++;
}
}
@@ -287,16 +295,39 @@
public static final Object create(String className, Object[] ctorObjs,
Class<?>[] argTypes,
final ClassLoader classLoader) throws TeiidException {
+ // Step 1: resolve the class. Any failure from loadClass is wrapped in a TeiidException.
+ Class<?> cls;
try {
- final Class<?> cls = loadClass(className,classLoader);
-
- Constructor<?> ctor = cls.getDeclaredConstructor(argTypes);
-
- return ctor.newInstance(ctorObjs);
-
+ cls = loadClass(className,classLoader);
} catch(Exception e) {
throw new TeiidException(e);
}
+ // Step 2: try an exact-signature lookup with the given argTypes.
+ Constructor<?> ctor = null;
+ try {
+ ctor = cls.getDeclaredConstructor(argTypes);
+ } catch (NoSuchMethodException e) {
+ // deliberately ignored: ctor stays null so the compatibility scan below can run
+ }
+
+ // Step 3 (fallback): scan the declared constructors and take the FIRST one that
+ // argsMatch accepts. Unlike the method lookup earlier in this class, no ambiguity
+ // check is made here; the order returned by getDeclaredConstructors decides.
+ if (ctor == null && argTypes != null && argTypes.length > 0)
{
+ List<Class<?>> argumentsClasses = Arrays.asList(argTypes);
+ List<Class<?>> argumentsClassList =
convertArgumentClassesToPrimitives(argumentsClasses);
+ for (Constructor<?> possible : cls.getDeclaredConstructors()) {
+ if (argsMatch(argumentsClasses, argumentsClassList,
possible.getParameterTypes())) {
+ ctor = possible;
+ break;
+ }
+ }
+ }
+
+ // Neither lookup found a constructor: report the class name and requested types.
+ if (ctor == null) {
+ throw new TeiidException(className + " Args: " +
Arrays.toString(argTypes)); //$NON-NLS-1$
+ }
+
+ // Step 4: instantiate; anything thrown by newInstance is wrapped in a TeiidException.
+ try {
+ return ctor.newInstance(ctorObjs);
+ } catch (Exception e) {
+ throw new TeiidException(e);
+ }
}
}
Modified:
branches/7.4.x/common-core/src/test/java/org/teiid/core/util/TestReflectionHelper.java
===================================================================
---
branches/7.4.x/common-core/src/test/java/org/teiid/core/util/TestReflectionHelper.java 2011-05-05
17:35:15 UTC (rev 3155)
+++
branches/7.4.x/common-core/src/test/java/org/teiid/core/util/TestReflectionHelper.java 2011-05-09
17:24:36 UTC (rev 3156)
@@ -27,6 +27,7 @@
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import org.junit.Test;
@@ -384,6 +385,10 @@
helpAssertSameMethodSignature("Found wrong method signature",
signatureExpected, signatureFound); //$NON-NLS-1$
}
+ /**
+ * Calls ReflectionHelper.create with a boxed Boolean argument (Arrays.asList(true))
+ * for SomeClass, whose only constructor declares a primitive boolean parameter.
+ * There is no explicit assertion: the test passes when create returns without throwing.
+ */
+ @Test public void testCreate() throws Exception {
+ ReflectionHelper.create(SomeClass.class.getName(), Arrays.asList(true), null);
+ }
+
/**
* Test base interface
*/
@@ -419,4 +424,9 @@
void method(Serializable arg1, Number arg2);
void method(Serializable arg1, Long arg2);
}
+
+ /**
+ * Fixture for testCreate: a class whose single constructor takes a primitive boolean.
+ */
+ public static class SomeClass {
+ // the argument is intentionally unused; only the signature matters to the test
+ public SomeClass(boolean primArg) {
+ }
+ }
}
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
---
branches/7.4.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-05-05
17:35:15 UTC (rev 3155)
+++
branches/7.4.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-05-09
17:24:36 UTC (rev 3156)
@@ -283,7 +283,7 @@
}
}
long count = readCount.incrementAndGet();
- LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, batchManager.id, id, "reading
batch from disk, total reads:", count); //$NON-NLS-1$
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, batchManager.id, id, "reading
batch from disk, total reads:", count); //$NON-NLS-1$
try {
this.batchManager.compactionLock.readLock().lock();
long[] info = batchManager.physicalMapping.get(this.id);
@@ -320,7 +320,7 @@
if (batch != null) {
if (!persistent) {
long count = writeCount.incrementAndGet();
- LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, batchManager.id, id, "writing
batch to disk, total writes: ", count); //$NON-NLS-1$
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, batchManager.id, id,
"writing batch to disk, total writes: ", count); //$NON-NLS-1$
long offset = 0;
if (lobManager != null) {
for (List<?> tuple : batch.getTuples()) {
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java
===================================================================
---
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java 2011-05-05
17:35:15 UTC (rev 3155)
+++
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/datamgr/ConnectorWorkItem.java 2011-05-09
17:24:36 UTC (rev 3156)
@@ -271,7 +271,7 @@
protected AtomicResultsMessage handleBatch() throws TranslatorException {
Assertion.assertTrue(!this.lastBatch);
- LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id,
"Sending results from connector"}); //$NON-NLS-1$
+ LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id,
"Getting results from connector"}); //$NON-NLS-1$
int batchSize = 0;
List<List> rows = new ArrayList<List>(batchSize/4);
@@ -320,7 +320,9 @@
}
}
LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id,
"Obtained last batch, total row count:", rowCount}); //$NON-NLS-1$\
- }
+ } else {
+ LogManager.logDetail(LogConstants.CTX_CONNECTOR, new Object[] {this.id,
"Obtained results from connector, current row count:", rowCount});
//$NON-NLS-1$
+ }
int currentRowCount = rows.size();
if ( !lastBatch && currentRowCount == 0 ) {
Modified: branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
---
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-05-05
17:35:15 UTC (rev 3155)
+++
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2011-05-09
17:24:36 UTC (rev 3156)
@@ -332,9 +332,10 @@
RequestWorkItem workItem = new RequestWorkItem(this, requestMsg, request,
resultsFuture.getResultsReceiver(), requestID, workContext);
logMMCommand(workItem, Event.NEW, null);
addRequest(requestID, workItem, state);
+ boolean runInThread = DQPWorkContext.getWorkContext().useCallingThread() ||
requestMsg.isSync();
synchronized (waitingPlans) {
- if (currentlyActivePlans < maxActivePlans ||
(!DQPWorkContext.getWorkContext().useCallingThread() && requestMsg.isSync())) {
- startActivePlan(workItem);
+ if (runInThread || currentlyActivePlans < maxActivePlans) {
+ startActivePlan(workItem, !runInThread);
} else {
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_DQP, "Queuing plan, since max
plans has been reached."); //$NON-NLS-1$
@@ -342,6 +343,9 @@
waitingPlans.add(workItem);
}
}
+ if (runInThread) {
+ workItem.run();
+ }
return resultsFuture;
}
@@ -362,15 +366,12 @@
state.addRequest(requestID);
}
- private void startActivePlan(RequestWorkItem workItem) {
+ private void startActivePlan(RequestWorkItem workItem, boolean addToQueue) {
+ // Marks the work item active and counts it in currentlyActivePlans.
+ // This method no longer calls workItem.run() itself.
workItem.active = true;
- if (workItem.getDqpWorkContext().useCallingThread() || workItem.requestMsg.isSync()) {
- this.currentlyActivePlans++;
- workItem.run();
- } else {
+ // addToQueue == false: the item is not handed to addWork; the request-submission
+ // code in this revision runs it directly after leaving synchronized (waitingPlans).
+ if (addToQueue) {
this.addWork(workItem);
- this.currentlyActivePlans++;
}
+ this.currentlyActivePlans++;
}
void finishProcessing(final RequestWorkItem workItem) {
@@ -381,7 +382,7 @@
workItem.active = false;
currentlyActivePlans--;
if (!waitingPlans.isEmpty()) {
- startActivePlan(waitingPlans.remove());
+ startActivePlan(waitingPlans.remove(), true);
}
}
}
Modified:
branches/7.4.x/runtime/src/main/java/org/teiid/transport/SocketClientInstance.java
===================================================================
---
branches/7.4.x/runtime/src/main/java/org/teiid/transport/SocketClientInstance.java 2011-05-05
17:35:15 UTC (rev 3155)
+++
branches/7.4.x/runtime/src/main/java/org/teiid/transport/SocketClientInstance.java 2011-05-09
17:24:36 UTC (rev 3156)
@@ -75,7 +75,7 @@
public void send(Message message, Serializable messageKey) {
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_TRANSPORT,
MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_TRANSPORT, " message: " +
message + " for request ID:" + messageKey); //$NON-NLS-1$ //$NON-NLS-2$
+ LogManager.logDetail(LogConstants.CTX_TRANSPORT, " message: " +
message + " for message:" + messageKey); //$NON-NLS-1$ //$NON-NLS-2$
}
message.setMessageKey(messageKey);
objectSocket.write(message);
Modified:
branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
===================================================================
---
branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java 2011-05-05
17:35:15 UTC (rev 3155)
+++
branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java 2011-05-09
17:24:36 UTC (rev 3156)
@@ -102,6 +102,10 @@
registerClientService(DQP.class, dqp, null);
}
+ /**
+ * Stops this fake server by delegating to <code>dqp.stop()</code>.
+ */
+ public void stop() {
+ this.dqp.stop();
+ }
+
public void setMetadataRepository(MetadataRepository metadataRepository) {
this.repo.setMetadataRepository(metadataRepository);
this.dqp.setMetadataRepository(metadataRepository);
Added:
branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/TestLocalConnections.java
===================================================================
---
branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/TestLocalConnections.java
(rev 0)
+++
branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/TestLocalConnections.java 2011-05-09
17:24:36 UTC (rev 3156)
@@ -0,0 +1,139 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.jdbc;
+
+import static org.junit.Assert.*;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.LinkedHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.jboss.netty.handler.timeout.TimeoutException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.teiid.core.types.DataTypeManager;
+import org.teiid.metadata.FunctionMethod;
+import org.teiid.metadata.FunctionParameter;
+import org.teiid.metadata.MetadataStore;
+import org.teiid.metadata.Schema;
+import org.teiid.metadata.FunctionMethod.PushDown;
+import org.teiid.query.function.metadata.FunctionCategoryConstants;
+import org.teiid.query.metadata.TransformationMetadata.Resource;
+
+@SuppressWarnings("nls")
+public class TestLocalConnections {
+
+    /**
+     * Remembers the exception that terminated the worker thread so that the
+     * test thread can rethrow it.
+     */
+    private static final class SimpleUncaughtExceptionHandler implements
+            UncaughtExceptionHandler {
+        // volatile: written by the dying worker thread, read by the test thread
+        volatile Throwable t;
+
+        @Override
+        public void uncaughtException(Thread arg0, Throwable arg1) {
+            t = arg1;
+        }
+    }
+
+    static ReentrantLock lock = new ReentrantLock();
+    /** Signalled by {@link #blocking()} once the function has been entered. */
+    static Condition waiting = lock.newCondition();
+    /** Signalled by the test once {@link #blocking()} may return. */
+    static Condition wait = lock.newCondition();
+
+    // State behind the two conditions, guarded by lock. Waiting on a flag rather
+    // than on the bare condition makes the hand-off immune to a signal that
+    // arrives before the waiter starts waiting, and to spurious wakeups.
+    static boolean functionEntered;
+    static boolean functionReleased;
+
+    /**
+     * User defined function registered as "foo". Announces that it has been
+     * entered and then blocks until the test releases it.
+     *
+     * @return always 1
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     * @throws TimeoutException if the test does not release the function within 2 seconds
+     */
+    public static int blocking() throws InterruptedException {
+        lock.lock();
+        try {
+            functionEntered = true;
+            waiting.signal();
+            long remaining = TimeUnit.SECONDS.toNanos(2);
+            while (!functionReleased) {
+                if (remaining <= 0) {
+                    throw new TimeoutException();
+                }
+                remaining = wait.awaitNanos(remaining);
+            }
+        } finally {
+            lock.unlock();
+        }
+        return 1;
+    }
+
+    static FakeServer server = new FakeServer();
+
+    /**
+     * Deploys a VDB "test" whose schema "test" exposes {@link #blocking()} as the
+     * non-deterministic function "foo"; the server is configured to use the calling thread.
+     */
+    @BeforeClass public static void oneTimeSetup() {
+        server.setUseCallingThread(true);
+        MetadataStore ms = new MetadataStore();
+        Schema s = new Schema();
+        s.setName("test");
+        FunctionMethod function = new FunctionMethod("foo", null,
+                FunctionCategoryConstants.MISCELLANEOUS, PushDown.CANNOT_PUSHDOWN,
+                TestLocalConnections.class.getName(), "blocking", new FunctionParameter[0],
+                new FunctionParameter("result", DataTypeManager.DefaultDataTypes.INTEGER), true,
+                FunctionMethod.Determinism.NONDETERMINISTIC);
+        s.addFunction(function);
+        ms.addSchema(s);
+        server.deployVDB("test", ms, new LinkedHashMap<String, Resource>());
+    }
+
+    @AfterClass public static void oneTimeTearDown() {
+        server.stop();
+    }
+
+    /**
+     * While one local connection is blocked inside "select foo()", a second
+     * local connection must still be able to execute a query.
+     */
+    @Test public void testConcurrentExection() throws Throwable {
+
+        Thread t = new Thread() {
+
+            public void run() {
+                try {
+                    Connection c = server.createConnection("jdbc:teiid:test");
+                    try {
+                        Statement s = c.createStatement();
+                        s.execute("select foo()");
+                    } finally {
+                        c.close();
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+        SimpleUncaughtExceptionHandler handler = new SimpleUncaughtExceptionHandler();
+        t.setUncaughtExceptionHandler(handler);
+        t.start();
+
+        // Wait until the worker is inside blocking(). The wait is bounded so that a
+        // worker that died before reaching the function fails the test instead of
+        // hanging it.
+        lock.lock();
+        try {
+            long remaining = TimeUnit.SECONDS.toNanos(10);
+            while (!functionEntered) {
+                if (handler.t != null) {
+                    throw handler.t;
+                }
+                if (remaining <= 0) {
+                    fail("worker never entered the blocking function");
+                }
+                remaining = waiting.awaitNanos(remaining);
+            }
+        } finally {
+            lock.unlock();
+        }
+
+        Connection c = server.createConnection("jdbc:teiid:test");
+        try {
+            Statement s = c.createStatement();
+            s.execute("select * from tables");
+        } finally {
+            c.close();
+        }
+
+        // Let the worker's function call return.
+        lock.lock();
+        try {
+            functionReleased = true;
+            wait.signal();
+        } finally {
+            lock.unlock();
+        }
+        t.join(2000);
+        if (t.isAlive()) {
+            fail("worker did not finish after being released");
+        }
+        if (handler.t != null) {
+            throw handler.t;
+        }
+    }
+
+}
Property changes on:
branches/7.4.x/test-integration/common/src/test/java/org/teiid/jdbc/TestLocalConnections.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain