Author: rareddy
Date: 2010-01-22 17:49:05 -0500 (Fri, 22 Jan 2010)
New Revision: 1776
Added:
branches/JCA/client/src/main/java/com/metamatrix/dqp/client/DQPManagement.java
branches/JCA/client/src/main/java/org/teiid/transport/
branches/JCA/client/src/main/java/org/teiid/transport/LocalServerConnection.java
Removed:
branches/JCA/runtime/src/main/java/org/teiid/transport/LocalServerConnection.java
Modified:
branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMConnection.java
branches/JCA/client-jdbc/src/main/java/org/teiid/jdbc/EmbeddedProfile.java
branches/JCA/client-jdbc/src/test/java/com/metamatrix/jdbc/util/TestMMJDBCURL.java
branches/JCA/client/src/main/java/com/metamatrix/common/comm/api/ServerConnection.java
branches/JCA/client/src/main/java/com/metamatrix/common/comm/api/ServerConnectionFactory.java
branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnection.java
branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java
branches/JCA/client/src/main/java/com/metamatrix/platform/security/api/SessionToken.java
branches/JCA/client/src/main/resources/com/metamatrix/common/comm/i18n.properties
branches/JCA/client/src/main/resources/com/metamatrix/common/comm/platform/i18n.properties
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DQPManagementView.java
branches/JCA/jboss-integration/src/main/java/org/teiid/adminapi/jboss/Admin.java
branches/JCA/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
branches/JCA/runtime/src/main/java/org/teiid/TeiidConnectionFactory.java
branches/JCA/runtime/src/main/java/org/teiid/TeiidManagedConnection.java
branches/JCA/runtime/src/main/java/org/teiid/WrappedConnection.java
branches/JCA/runtime/src/main/java/org/teiid/transport/LogonImpl.java
branches/JCA/runtime/src/main/java/org/teiid/transport/ServerWorkItem.java
branches/JCA/runtime/src/main/java/org/teiid/transport/SocketClientInstance.java
branches/JCA/runtime/src/main/java/org/teiid/transport/SocketListener.java
branches/JCA/runtime/src/main/java/org/teiid/transport/SocketTransport.java
branches/JCA/runtime/src/test/java/org/teiid/transport/TestCommSockets.java
branches/JCA/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java
Log:
TEIID-833: modified the socket layer and the local layer to use the same connection into the runtime.
Removed the threaded nature of the socket worker item, so that Netty threads directly
submit work to the process workers.
Modified:
branches/JCA/client/src/main/java/com/metamatrix/common/comm/api/ServerConnection.java
===================================================================
---
branches/JCA/client/src/main/java/com/metamatrix/common/comm/api/ServerConnection.java 2010-01-22
17:27:53 UTC (rev 1775)
+++
branches/JCA/client/src/main/java/com/metamatrix/common/comm/api/ServerConnection.java 2010-01-22
22:49:05 UTC (rev 1776)
@@ -33,8 +33,6 @@
void close();
- void reallyClose();
-
boolean isOpen();
LogonResult getLogonResult();
Modified:
branches/JCA/client/src/main/java/com/metamatrix/common/comm/api/ServerConnectionFactory.java
===================================================================
---
branches/JCA/client/src/main/java/com/metamatrix/common/comm/api/ServerConnectionFactory.java 2010-01-22
17:27:53 UTC (rev 1775)
+++
branches/JCA/client/src/main/java/com/metamatrix/common/comm/api/ServerConnectionFactory.java 2010-01-22
22:49:05 UTC (rev 1776)
@@ -42,5 +42,6 @@
<T> T getService(Class<T> clazz);
- <T> void setService(Class<T> type, T instance);
+ <T> void registerClientService(Class<T> type, T instance, String
loggingContext);
+ <T> String getLoggingContextForService(Class<T> type);
}
Modified:
branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnection.java
===================================================================
---
branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnection.java 2010-01-22
17:27:53 UTC (rev 1775)
+++
branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnection.java 2010-01-22
22:49:05 UTC (rev 1776)
@@ -190,7 +190,7 @@
if (e.getCause() instanceof CommunicationException) {
throw (CommunicationException)e.getCause();
}
- throw new CommunicationException(e,
CommPlatformPlugin.Util.getString("PlatformServerConnectionFactory.Unable_to_find_a_component_used_in_logging_on_to_MetaMatrix"));
//$NON-NLS-1$
+ throw new CommunicationException(e,
CommPlatformPlugin.Util.getString("PlatformServerConnectionFactory.Unable_to_find_a_component_used_in_logging_on_to"));
//$NON-NLS-1$
}
}
@@ -316,10 +316,4 @@
public void setFailOver(boolean failOver) {
this.failOver = failOver;
}
-
- @Override
- public void reallyClose() {
- close();
- }
-
}
\ No newline at end of file
Modified:
branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java
===================================================================
---
branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java 2010-01-22
17:27:53 UTC (rev 1775)
+++
branches/JCA/client/src/main/java/com/metamatrix/common/comm/platform/socket/client/SocketServerConnectionFactory.java 2010-01-22
22:49:05 UTC (rev 1776)
@@ -291,8 +291,11 @@
public <T> T getService(Class<T> clazz) {
return null;
}
-
@Override
- public <T> void setService(Class<T> type, T instance) {
+ public <T> void registerClientService(Class<T> type, T instance, String
loggingContext){
}
+ @Override
+ public <T> String getLoggingContextForService(Class<T> type) {
+ return null;
+ }
}
Added: branches/JCA/client/src/main/java/com/metamatrix/dqp/client/DQPManagement.java
===================================================================
--- branches/JCA/client/src/main/java/com/metamatrix/dqp/client/DQPManagement.java
(rev 0)
+++
branches/JCA/client/src/main/java/com/metamatrix/dqp/client/DQPManagement.java 2010-01-22
22:49:05 UTC (rev 1776)
@@ -0,0 +1,45 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+package com.metamatrix.dqp.client;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.teiid.adminapi.AdminException;
+import org.teiid.adminapi.impl.RequestMetadata;
+import org.teiid.adminapi.impl.SessionMetadata;
+import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
+
+
+public interface DQPManagement {
+ List<RequestMetadata> getRequestsForSession(long sessionId) ;
+ List<RequestMetadata> getRequests();
+ WorkerPoolStatisticsMetadata getWorkManagerStatistics(String identifier);
+ void terminateSession(long terminateeId);
+ boolean cancelRequest(long sessionId, long requestId) throws AdminException;
+ Collection<String> getCacheTypes();
+ void clearCache(String cacheType);
+ Collection<SessionMetadata> getActiveSessions() throws AdminException;
+ int getActiveSessionsCount() throws AdminException;
+ Collection<org.teiid.adminapi.Transaction> getTransactions();
+ void terminateTransaction(String xid) throws AdminException ;
+}
Modified:
branches/JCA/client/src/main/java/com/metamatrix/platform/security/api/SessionToken.java
===================================================================
---
branches/JCA/client/src/main/java/com/metamatrix/platform/security/api/SessionToken.java 2010-01-22
17:27:53 UTC (rev 1775)
+++
branches/JCA/client/src/main/java/com/metamatrix/platform/security/api/SessionToken.java 2010-01-22
22:49:05 UTC (rev 1776)
@@ -33,10 +33,23 @@
* transit if sent to the client. Also it should only be sent to the client
* who creates the session.
*/
-public class SessionToken implements Serializable,
- Cloneable {
+public class SessionToken implements Serializable, Cloneable {
public final static long serialVersionUID = -2853708320435636107L;
+ private static ThreadLocal<SessionToken> CONTEXTS = new
ThreadLocal<SessionToken>() {
+ protected SessionToken initialValue() {
+ return null;
+ }
+ };
+
+ public static SessionToken getSession() {
+ return CONTEXTS.get();
+ }
+
+ public static void setSession(SessionToken context) {
+ CONTEXTS.set(context);
+ }
+
/** The session ID */
private long sessionID;
private String userName;
Copied: branches/JCA/client/src/main/java/org/teiid/transport/LocalServerConnection.java
(from rev 1743,
branches/JCA/runtime/src/main/java/org/teiid/transport/LocalServerConnection.java)
===================================================================
--- branches/JCA/client/src/main/java/org/teiid/transport/LocalServerConnection.java
(rev 0)
+++
branches/JCA/client/src/main/java/org/teiid/transport/LocalServerConnection.java 2010-01-22
22:49:05 UTC (rev 1776)
@@ -0,0 +1,152 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.transport;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import com.metamatrix.admin.api.exception.security.InvalidSessionException;
+import com.metamatrix.api.exception.MetaMatrixComponentException;
+import com.metamatrix.api.exception.security.LogonException;
+import com.metamatrix.client.ExceptionUtil;
+import com.metamatrix.common.comm.CommonCommPlugin;
+import com.metamatrix.common.comm.api.ServerConnection;
+import com.metamatrix.common.comm.api.ServerConnectionFactory;
+import com.metamatrix.common.comm.exception.CommunicationException;
+import com.metamatrix.common.comm.exception.ConnectionException;
+import com.metamatrix.common.comm.platform.CommPlatformPlugin;
+import com.metamatrix.platform.security.api.ILogon;
+import com.metamatrix.platform.security.api.LogonResult;
+import com.metamatrix.platform.security.api.SessionToken;
+
+public class LocalServerConnection implements ServerConnection {
+ private static final String TEIID_RUNTIME = "java:teiid/runtime-engine";
+
+ private final LogonResult result;
+ private boolean shutdown;
+
+ public LocalServerConnection(Properties connectionProperties) throws
CommunicationException, ConnectionException{
+ this.result = authenticate(connectionProperties);
+ }
+
+ public synchronized LogonResult authenticate(Properties connProps) throws
ConnectionException, CommunicationException {
+ try {
+ connProps.setProperty("localConnection", "true");
+ LogonResult logonResult = this.getService(ILogon.class).logon(connProps);
+ return logonResult;
+ } catch (LogonException e) {
+ // Propagate the original message as it contains the message we want
+ // to give to the user
+ throw new ConnectionException(e, e.getMessage());
+ } catch (MetaMatrixComponentException e) {
+ if (e.getCause() instanceof CommunicationException) {
+ throw (CommunicationException)e.getCause();
+ }
+ throw new CommunicationException(e,
CommPlatformPlugin.Util.getString("PlatformServerConnectionFactory.Unable_to_find_a_component_used_in_logging_on_to_MetaMatrix"));
//$NON-NLS-1$
+ }
+ }
+
+
+ @SuppressWarnings("unchecked")
+ public <T> T getService(final Class<T> iface) {
+ return (T) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]
{iface}, new InvocationHandler() {
+
+ public Object invoke(Object arg0, Method arg1, Object[] arg2) throws Throwable {
+ if (!isOpen()) {
+ throw ExceptionUtil.convertException(arg1, new
MetaMatrixComponentException(CommonCommPlugin.Util.getString("LocalTransportHandler.Transport_shutdown")));
//$NON-NLS-1$
+ }
+ try {
+ ServerConnectionFactory scf = lookup(TEIID_RUNTIME);
+ T service = scf.getService(iface);
+
+ if (!(iface.equals(ILogon.class))) {
+ SessionToken.setSession(result.getSessionToken());
+ }
+
+ return arg1.invoke(service, arg2);
+ } catch(NamingException e){
+ throw ExceptionUtil.convertException(arg1, new
MetaMatrixComponentException(CommonCommPlugin.Util.getString("LocalTransportHandler.Transport_shutdown")));
//$NON-NLS-1$
+ } finally {
+ SessionToken.setSession(null);
+ }
+ }
+ });
+
+ }
+
+ public boolean isOpen() {
+ return !shutdown;
+ }
+
+ public void close() {
+ shutdown(true);
+ }
+
+ private void shutdown(boolean logoff) {
+ if (shutdown) {
+ return;
+ }
+
+ if (logoff) {
+ try {
+ //make a best effort to send the logoff
+ Future<?> writeFuture = this.getService(ILogon.class).logoff();
+ if (writeFuture != null) {
+ writeFuture.get(5000, TimeUnit.MILLISECONDS);
+ }
+ } catch (InvalidSessionException e) {
+ //ignore
+ } catch (InterruptedException e) {
+ //ignore
+ } catch (ExecutionException e) {
+ //ignore
+ } catch (TimeoutException e) {
+ //ignore
+ }
+ }
+ this.shutdown = true;
+ }
+
+ public LogonResult getLogonResult() {
+ return result;
+ }
+
+ @Override
+ public boolean isSameInstance(ServerConnection conn) throws CommunicationException {
+ return (conn instanceof LocalServerConnection);
+ }
+
+ public static <T> T lookup(String jndiName) throws NamingException {
+ InitialContext ic = new InitialContext();
+ return (T)ic.lookup(jndiName);
+ }
+}
Property changes on:
branches/JCA/client/src/main/java/org/teiid/transport/LocalServerConnection.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified:
branches/JCA/client/src/main/resources/com/metamatrix/common/comm/i18n.properties
===================================================================
---
branches/JCA/client/src/main/resources/com/metamatrix/common/comm/i18n.properties 2010-01-22
17:27:53 UTC (rev 1775)
+++
branches/JCA/client/src/main/resources/com/metamatrix/common/comm/i18n.properties 2010-01-22
22:49:05 UTC (rev 1776)
@@ -27,4 +27,6 @@
TrustedSessionToken.token_null = The trusted token for a session token may not be null.
StreamImpl.Unable_to_read_data_from_stream=Unable to read data from the stream: {0}
-RequestMessage.invalid_txnAutoWrap=''{0}'' is an invalid transaction
autowrap mode.
\ No newline at end of file
+RequestMessage.invalid_txnAutoWrap=''{0}'' is an invalid transaction
autowrap mode.
+
+LocalTransportHandler.Transport_shutdown=Tranport has been shutdown.
Modified:
branches/JCA/client/src/main/resources/com/metamatrix/common/comm/platform/i18n.properties
===================================================================
---
branches/JCA/client/src/main/resources/com/metamatrix/common/comm/platform/i18n.properties 2010-01-22
17:27:53 UTC (rev 1775)
+++
branches/JCA/client/src/main/resources/com/metamatrix/common/comm/platform/i18n.properties 2010-01-22
22:49:05 UTC (rev 1776)
@@ -33,8 +33,8 @@
PlatformServerConnectionFactory.Missing_required_property=Missing required property:
PlatformServerConnectionFactory.Error_encrypting_user_password=Error encrypting user
password
PlatformServerConnectionFactory.Error_communicating_with_app_server=Error communicating
with app server
-PlatformServerConnectionFactory.Error_logging_on_to_MetaMatrix=Error logging on to
MetaMatrix: {0}
-PlatformServerConnectionFactory.Unable_to_find_a_component_used_in_logging_on_to_MetaMatrix=Unable
to find a component used in logging on to MetaMatrix
+PlatformServerConnectionFactory.Error_logging_on_to_MetaMatrix=Error logging on to Teiid:
{0}
+PlatformServerConnectionFactory.Unable_to_find_a_component_used_in_logging_on_to=Unable
to find a component used authenticate on to Teiid
PlatformServerConnectionFactory.Unable_to_get_a_PlatformServerConnection=Unable to get a
PlatformServerConnection
PlatformServerConnectionFactory.Error_comunicating_with_LogonAPI=Error communicating with
LogonAPI
PlatformServerConnectionFactory.JNDI_library_mismatch_plugin=Client library may not match
server vendor or version for server {1}. Client code loaded from plugin: {2}, initial
context factory: {0}.
Modified: branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMConnection.java
===================================================================
---
branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMConnection.java 2010-01-22
17:27:53 UTC (rev 1775)
+++
branches/JCA/client-jdbc/src/main/java/com/metamatrix/jdbc/MMConnection.java 2010-01-22
22:49:05 UTC (rev 1776)
@@ -57,8 +57,6 @@
import javax.transaction.xa.Xid;
-import org.teiid.adminapi.Admin;
-
import com.metamatrix.common.api.MMURL;
import com.metamatrix.common.comm.api.ServerConnection;
import com.metamatrix.common.comm.exception.CommunicationException;
@@ -279,7 +277,7 @@
} catch (SQLException se) {
firstException = se;
} finally {
- this.serverConn.reallyClose();
+ this.serverConn.close();
if ( firstException != null )
throw (SQLException)firstException;
}
Modified: branches/JCA/client-jdbc/src/main/java/org/teiid/jdbc/EmbeddedProfile.java
===================================================================
--- branches/JCA/client-jdbc/src/main/java/org/teiid/jdbc/EmbeddedProfile.java 2010-01-22
17:27:53 UTC (rev 1775)
+++ branches/JCA/client-jdbc/src/main/java/org/teiid/jdbc/EmbeddedProfile.java 2010-01-22
22:49:05 UTC (rev 1776)
@@ -32,14 +32,12 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
+import org.teiid.transport.LocalServerConnection;
-import com.metamatrix.common.comm.api.ServerConnection;
-import com.metamatrix.common.comm.api.ServerConnectionFactory;
import com.metamatrix.common.comm.exception.CommunicationException;
import com.metamatrix.common.comm.exception.ConnectionException;
import com.metamatrix.common.util.PropertiesUtils;
+import com.metamatrix.core.MetaMatrixRuntimeException;
import com.metamatrix.jdbc.BaseDataSource;
import com.metamatrix.jdbc.JDBCPlugin;
import com.metamatrix.jdbc.MMConnection;
@@ -47,7 +45,7 @@
final class EmbeddedProfile {
- private static final String TEIID_RUNTIME = "java:teiid/runtime-engine";
+
private static final String BUNDLE_NAME = "com.metamatrix.jdbc.basic_i18n";
//$NON-NLS-1$
/**
@@ -90,13 +88,8 @@
// and make sure we have all the properties we need.
validateProperties(info);
try {
- InitialContext ic = new InitialContext();
- ServerConnectionFactory scf = (ServerConnectionFactory)ic.lookup(TEIID_RUNTIME);
- ServerConnection conn = scf.getConnection(info);
- // this close has no effect; it only closes the managed connection of server
connection
- conn.close();
- return new MMConnection(conn, info, url);
- } catch (NamingException e) {
+ return new MMConnection(new LocalServerConnection(info), info, url);
+ } catch (MetaMatrixRuntimeException e) {
throw new SQLException(e);
} catch (ConnectionException e) {
throw new SQLException(e);
Modified:
branches/JCA/client-jdbc/src/test/java/com/metamatrix/jdbc/util/TestMMJDBCURL.java
===================================================================
---
branches/JCA/client-jdbc/src/test/java/com/metamatrix/jdbc/util/TestMMJDBCURL.java 2010-01-22
17:27:53 UTC (rev 1775)
+++
branches/JCA/client-jdbc/src/test/java/com/metamatrix/jdbc/util/TestMMJDBCURL.java 2010-01-22
22:49:05 UTC (rev 1776)
@@ -244,8 +244,8 @@
}
try {
+ // in embedded situation there is no connection url
new MMJDBCURL("myVDB", " ", null); //$NON-NLS-1$
//$NON-NLS-2$
- fail("Should have failed."); //$NON-NLS-1$
} catch (Exception e) {
}
Modified:
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DQPManagementView.java
===================================================================
---
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DQPManagementView.java 2010-01-22
17:27:53 UTC (rev 1775)
+++
branches/JCA/engine/src/main/java/org/teiid/dqp/internal/process/DQPManagementView.java 2010-01-22
22:49:05 UTC (rev 1776)
@@ -32,6 +32,7 @@
import org.jboss.managed.api.annotation.ManagementProperties;
import org.jboss.managed.api.annotation.ManagementProperty;
import org.jboss.managed.api.annotation.ViewUse;
+import org.teiid.adminapi.AdminComponentException;
import org.teiid.adminapi.AdminException;
import org.teiid.adminapi.impl.RequestMetadata;
import org.teiid.adminapi.impl.SessionMetadata;
@@ -41,6 +42,7 @@
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.api.exception.security.SessionServiceException;
+import com.metamatrix.dqp.client.DQPManagement;
/**
* Since DQPCore can not be made into a management bean itself as it has life cycle
dependencies associated with
@@ -49,7 +51,7 @@
* what ever objects it has access to.
*/
@ManagementObject(isRuntime=true,
componentType=@ManagementComponent(type="teiid",subtype="dqp"),
properties=ManagementProperties.EXPLICIT)
-public class DQPManagementView {
+public class DQPManagementView implements DQPManagement {
private DQPCore dqp;
private ConnectorManagerRepository connectorManagerRepository;
@@ -66,16 +68,19 @@
this.connectorManagerRepository = repo;
}
+ @Override
@ManagementOperation(description="Requests for perticular session",
impact=Impact.ReadOnly,params={@ManagementParameter(name="sessionId",description="The
session Identifier")})
public List<RequestMetadata> getRequestsForSession(long sessionId) {
return this.dqp.getRequestsForSession(sessionId);
}
+ @Override
@ManagementOperation(description="Active requests",
impact=Impact.ReadOnly)
public List<RequestMetadata> getRequests() {
return this.dqp.getRequests();
}
+ @Override
@ManagementOperation(description="Get Runtime workmanager statistics",
impact=Impact.ReadOnly,params={@ManagementParameter(name="identifier",description="Use
\"runtime\" for engine, or connector name for connector")})
public WorkerPoolStatisticsMetadata getWorkManagerStatistics(String identifier) {
if ("runtime".equalsIgnoreCase(identifier)) {
@@ -88,41 +93,61 @@
return null;
}
+ @Override
@ManagementOperation(description="Terminate a
Session",params={@ManagementParameter(name="terminateeId",description="The
session to be terminated")})
public void terminateSession(long terminateeId) {
this.dqp.terminateSession(terminateeId);
}
+ @Override
@ManagementOperation(description="Cancel a
Request",params={@ManagementParameter(name="sessionId",description="The
session Identifier"),
@ManagementParameter(name="requestId",description="The request
Identifier")})
- public boolean cancelRequest(long sessionId, long requestId) throws
MetaMatrixComponentException {
- return this.dqp.cancelRequest(sessionId, requestId);
+ public boolean cancelRequest(long sessionId, long requestId) throws AdminException {
+ try {
+ return this.dqp.cancelRequest(sessionId, requestId);
+ } catch (MetaMatrixComponentException e) {
+ throw new AdminComponentException(e);
+ }
}
+ @Override
@ManagementOperation(description="Get Cache types in the system",
impact=Impact.ReadOnly)
public Collection<String> getCacheTypes(){
return this.dqp.getCacheTypes();
}
+ @Override
@ManagementOperation(description="Clear the caches in the system",
impact=Impact.ReadOnly)
public void clearCache(String cacheType) {
this.dqp.clearCache(cacheType);
}
+ @Override
@ManagementOperation(description="Active sessions", impact=Impact.ReadOnly)
- public Collection<SessionMetadata> getActiveSessions() throws
SessionServiceException {
- return this.dqp.getActiveSessions();
+ public Collection<SessionMetadata> getActiveSessions() throws AdminException {
+ try {
+ return this.dqp.getActiveSessions();
+ } catch (SessionServiceException e) {
+ throw new AdminComponentException(e);
+ }
}
+ @Override
@ManagementProperty(description="Active session count",
use={ViewUse.STATISTIC}, readOnly=true)
- public int getActiveSessionsCount() throws SessionServiceException{
- return this.dqp.getActiveSessionsCount();
+ public int getActiveSessionsCount() throws AdminException{
+ try {
+ return this.dqp.getActiveSessionsCount();
+ } catch (SessionServiceException e) {
+ throw new AdminComponentException(e);
+ }
}
+ @Override
@ManagementOperation(description="Active Transactions",
impact=Impact.ReadOnly)
public Collection<org.teiid.adminapi.Transaction> getTransactions() {
return this.dqp.getTransactions();
}
+ @Override
@ManagementOperation(description="Clear the caches in the system",
impact=Impact.ReadOnly)
public void terminateTransaction(String xid) throws AdminException {
this.dqp.terminateTransaction(xid);
Modified:
branches/JCA/jboss-integration/src/main/java/org/teiid/adminapi/jboss/Admin.java
===================================================================
---
branches/JCA/jboss-integration/src/main/java/org/teiid/adminapi/jboss/Admin.java 2010-01-22
17:27:53 UTC (rev 1775)
+++
branches/JCA/jboss-integration/src/main/java/org/teiid/adminapi/jboss/Admin.java 2010-01-22
22:49:05 UTC (rev 1776)
@@ -120,6 +120,23 @@
return this.deploymentMgr;
}
+// private DQPManagement getDQPManagement() throws Exception {
+// final ManagedComponent mc = getView().getComponent(DQPManagementView.class.getName(),
DQPTYPE);
+//
+// return (DQPManagement)Proxy.newProxyInstance(this.getClass().getClassLoader(), new
Class[] {DQPManagement.class}, new InvocationHandler() {
+// @Override
+// public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+//
+// MetaValue value = ManagedUtil.executeOperation(mc, method.getName());
+// Class returnType = method.getReturnType();
+// if (returnType.equals(Void.class)) {
+// return value;
+// }
+// return null;
+// }
+// });
+// }
+
@Override
public Collection<ConnectorBinding> getConnectorBindings() throws AdminException
{
ArrayList<ConnectorBinding> bindings = new ArrayList<ConnectorBinding>();
Modified:
branches/JCA/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java
===================================================================
---
branches/JCA/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2010-01-22
17:27:53 UTC (rev 1775)
+++
branches/JCA/jboss-integration/src/main/java/org/teiid/jboss/deployers/RuntimeEngineDeployer.java 2010-01-22
22:49:05 UTC (rev 1776)
@@ -46,8 +46,6 @@
import org.teiid.transport.SocketConfiguration;
import org.teiid.transport.SocketTransport;
-import com.metamatrix.common.comm.ClientServiceRegistry;
-import com.metamatrix.common.comm.ClientServiceRegistryImpl;
import com.metamatrix.common.comm.api.ServerConnectionFactory;
import com.metamatrix.common.log.LogManager;
import com.metamatrix.core.log.MessageLevel;
@@ -60,11 +58,11 @@
import com.metamatrix.platform.security.api.service.SessionService;
public class RuntimeEngineDeployer extends
AbstractSimpleRealDeployer<ManagedConnectionFactoryDeploymentGroup> {
+ private static final String TEIID_RUNTIME = "java:teiid/runtime-engine";
protected Logger log = Logger.getLogger(getClass());
private ContainerHelper containerHelper;
private SocketTransport socketTransport;
private SocketConfiguration socketConfiguration;
- private ClientServiceRegistry clientServiceRegistry;
public RuntimeEngineDeployer() {
super(ManagedConnectionFactoryDeploymentGroup.class);
@@ -79,8 +77,7 @@
String connectorDefinition = data.getConnectionDefinition();
if
(connectorDefinition.equals("com.metamatrix.common.comm.api.ServerConnectionFactory"))
{
- ServerConnectionFactory scf =
(ServerConnectionFactory)ContainerUtil.lookup("java:teiid/runtime-engine");
- startEngine(scf);
+ startEngine();
log.info("Teiid Engine Started = " + new
Date(System.currentTimeMillis()).toString()); //$NON-NLS-1$
}
@@ -110,14 +107,14 @@
this.socketConfiguration = socketConfig;
}
- private void startEngine(ServerConnectionFactory scf) {
- DQPConfiguration config = scf.getService(DQPConfiguration.class);
- ClientServiceRegistry services = createClientServices(config,
scf.getService(WorkManager.class), scf.getService(XATerminator.class));
- scf.setService(ClientServiceRegistry.class, services);
-
- this.clientServiceRegistry = services;
-
+ private void startEngine() {
+ ServerConnectionFactory scf = ContainerUtil.lookup(TEIID_RUNTIME);
+
+ // create the necessary services
+ createClientServices(scf);
+
// Start the socket transport
+ DQPConfiguration config = scf.getService(DQPConfiguration.class);
if (config.getBindAddress() != null) {
this.socketConfiguration.setBindAddress(config.getBindAddress());
}
@@ -126,17 +123,17 @@
}
this.socketTransport = new SocketTransport(this.socketConfiguration);
- this.socketTransport.setClientServiceRegistry(services);
this.socketTransport.setWorkManager(scf.getService(WorkManager.class));
this.socketTransport.start();
}
private void stopEngine() {
+ ServerConnectionFactory scf = ContainerUtil.lookup(TEIID_RUNTIME);
// Stop DQP
- DQPCore dqp =
(DQPCore)this.clientServiceRegistry.getClientService(ClientSideDQP.class);
- dqp.stop();
+ ClientSideDQP dqp = scf.getService(ClientSideDQP.class);
+ ((DQPCore)dqp).stop();
// Stop socket transport
if (this.socketTransport != null) {
@@ -145,26 +142,21 @@
}
}
- private ClientServiceRegistry createClientServices(DQPConfiguration config, WorkManager
workMgr, XATerminator terminator) {
-
+ private void createClientServices(ServerConnectionFactory scf) {
DQPCore dqp = new DQPCore();
- dqp.setTransactionService(getTransactionService("localhost",
terminator));
- dqp.setWorkManager(workMgr);
+ dqp.setTransactionService(getTransactionService("localhost",
scf.getService(XATerminator.class)));
+ dqp.setWorkManager(scf.getService(WorkManager.class));
dqp.setAuthorizationService(this.containerHelper.getService(AuthorizationService.class));
dqp.setBufferService(this.containerHelper.getService(BufferService.class));
dqp.setSessionService(this.containerHelper.getService(SessionService.class));
dqp.setConnectorManagerRepository(this.containerHelper.getService(ConnectorManagerRepository.class));
- dqp.start(config);
+ dqp.start(scf.getService(DQPConfiguration.class));
DQPManagementView holder = this.containerHelper.getService(DQPManagementView.class);
holder.setDQP(dqp);
- ClientServiceRegistry services = new ClientServiceRegistryImpl();
- services.registerClientService(ILogon.class, new LogonImpl(dqp.getSessionService(),
"teiid-cluster"), com.metamatrix.common.util.LogConstants.CTX_SERVER);
-
- services.registerClientService(ClientSideDQP.class, dqp,
LogConstants.CTX_QUERY_SERVICE);
-
- return services;
+ scf.registerClientService(ILogon.class, new LogonImpl(dqp.getSessionService(),
"teiid-cluster"), com.metamatrix.common.util.LogConstants.CTX_SERVER);
+ scf.registerClientService(ClientSideDQP.class, dqp,
LogConstants.CTX_QUERY_SERVICE);
}
private TransactionService getTransactionService(String processName, XATerminator
terminator) {
Modified: branches/JCA/runtime/src/main/java/org/teiid/TeiidConnectionFactory.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/TeiidConnectionFactory.java 2010-01-22
17:27:53 UTC (rev 1775)
+++ branches/JCA/runtime/src/main/java/org/teiid/TeiidConnectionFactory.java 2010-01-22
22:49:05 UTC (rev 1776)
@@ -22,7 +22,12 @@
package org.teiid;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionManager;
@@ -30,8 +35,9 @@
import javax.resource.spi.work.WorkManager;
import org.teiid.dqp.internal.process.DQPConfiguration;
+import org.teiid.dqp.internal.process.DQPWorkContext;
-import com.metamatrix.common.comm.ClientServiceRegistry;
+import com.metamatrix.client.ExceptionUtil;
import com.metamatrix.common.comm.api.ServerConnection;
import com.metamatrix.common.comm.api.ServerConnectionFactory;
import com.metamatrix.common.comm.exception.CommunicationException;
@@ -39,6 +45,8 @@
import com.metamatrix.common.log.LogManager;
import com.metamatrix.jdbc.LogConfigurationProvider;
import com.metamatrix.jdbc.LogListernerProvider;
+import com.metamatrix.platform.security.api.ILogon;
+import com.metamatrix.platform.security.api.SessionToken;
/**
@@ -51,7 +59,8 @@
private TeiidResourceAdapter ra;
private TeiidManagedConnectionFactory mcf;
private ConnectionManager cxManager;
- private ClientServiceRegistry clientServices = null;
+ private ConcurrentHashMap<Class, Object> clientServices = new
ConcurrentHashMap<Class, Object>();
+ private ConcurrentHashMap<Class, String> loggingContext = new
ConcurrentHashMap<Class, String>();
public TeiidConnectionFactory(TeiidResourceAdapter ra, TeiidManagedConnectionFactory
mcf, ConnectionManager cxmanager) {
@@ -66,7 +75,8 @@
@Override
public ServerConnection getConnection(Properties connectionProperties) throws
CommunicationException, ConnectionException {
try {
- return (ServerConnection)cxManager.allocateConnection(this.mcf, new
ConnectionInfo(connectionProperties, this.clientServices));
+ // this code will not be invoked as teiid does not use managed connection.
+ return (ServerConnection)cxManager.allocateConnection(this.mcf, new
ConnectionInfo(connectionProperties, null));
} catch (ResourceException e) {
throw new ConnectionException(e);
}
@@ -85,15 +95,51 @@
return type.cast(mcf);
}
- throw new IllegalArgumentException(type + " Sevice is not available");
+ // see if there are any client services.
+ Object service = this.clientServices.get(type);
+ if (service != null) {
+ return type.cast(proxyService(type, type.cast(service)));
+ }
+
+ return null;
}
- public <T> void setService(Class<T> type, T instance) {
- if (type.equals(ClientServiceRegistry.class)) {
- this.clientServices = (ClientServiceRegistry)instance;
- }
+ @Override
+ public <T> void registerClientService(Class<T> type, T instance, String
loggingContext) {
+ this.clientServices.put(type, instance);
+ this.loggingContext.put(type, loggingContext);
}
+ @Override
+ public <T> String getLoggingContextForService(Class<T> type) {
+ return this.loggingContext.get(type);
+ }
+
+ private <T> T proxyService(final Class<T> iface, final T instance) {
+
+ return (T) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]
{iface}, new InvocationHandler() {
+
+ public Object invoke(Object arg0, Method arg1, Object[] arg2) throws Throwable {
+
+ Throwable exception = null;
+ ClassLoader current = Thread.currentThread().getContextClassLoader();
+ try {
+ if (!(iface.equals(ILogon.class))) {
+ ((ILogon)clientServices.get(ILogon.class)).assertIdentity(SessionToken.getSession());
+ }
+ return arg1.invoke(instance, arg2);
+ } catch (InvocationTargetException e) {
+ exception = e.getTargetException();
+ } catch(Throwable t){
+ exception = t;
+ } finally {
+ Thread.currentThread().setContextClassLoader(current);
+ DQPWorkContext.releaseWorkContext();
+ }
+ throw ExceptionUtil.convertException(arg1, exception);
+ }
+ });
+ }
// public MMProcess getProcess() {
//
Modified: branches/JCA/runtime/src/main/java/org/teiid/TeiidManagedConnection.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/TeiidManagedConnection.java 2010-01-22
17:27:53 UTC (rev 1775)
+++ branches/JCA/runtime/src/main/java/org/teiid/TeiidManagedConnection.java 2010-01-22
22:49:05 UTC (rev 1776)
@@ -76,18 +76,10 @@
@Override
public Object getConnection(Subject arg0, ConnectionRequestInfo arg1) throws
ResourceException {
ConnectionInfo ci = (ConnectionInfo)arg1;
- try {
- this.conn = new WrappedConnection(new LocalServerConnection(ci.properties,
ci.clientServices));
- this.conn.setManagedConnection(this);
- return this.conn;
- } catch (CommunicationException e) {
- throw new ResourceException(e);
- } catch (ConnectionException e) {
- if (e.getCause() instanceof ResourceException) {
- throw (ResourceException)e.getCause();
- }
- throw new ResourceException(e);
- }
+ // This code will not be invoked as managed connection is not used anywhere.
+ this.conn = new WrappedConnection(null);
+ this.conn.setManagedConnection(this);
+ return this.conn;
}
@Override
Modified: branches/JCA/runtime/src/main/java/org/teiid/WrappedConnection.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/WrappedConnection.java 2010-01-22
17:27:53 UTC (rev 1775)
+++ branches/JCA/runtime/src/main/java/org/teiid/WrappedConnection.java 2010-01-22
22:49:05 UTC (rev 1776)
@@ -69,13 +69,4 @@
void setManagedConnection(TeiidManagedConnection teiidManagedConnection) {
this.mc = teiidManagedConnection;
}
-
- @Override
- public void reallyClose() {
- close();
- if (this.delegate != null) {
- delegate.close();
- delegate = null;
- }
- }
}
Deleted:
branches/JCA/runtime/src/main/java/org/teiid/transport/LocalServerConnection.java
===================================================================
---
branches/JCA/runtime/src/main/java/org/teiid/transport/LocalServerConnection.java 2010-01-22
17:27:53 UTC (rev 1775)
+++
branches/JCA/runtime/src/main/java/org/teiid/transport/LocalServerConnection.java 2010-01-22
22:49:05 UTC (rev 1776)
@@ -1,161 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.transport;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.teiid.dqp.internal.process.DQPWorkContext;
-
-import com.metamatrix.admin.api.exception.security.InvalidSessionException;
-import com.metamatrix.api.exception.MetaMatrixComponentException;
-import com.metamatrix.api.exception.security.LogonException;
-import com.metamatrix.client.ExceptionUtil;
-import com.metamatrix.common.comm.ClientServiceRegistry;
-import com.metamatrix.common.comm.api.ServerConnection;
-import com.metamatrix.common.comm.exception.CommunicationException;
-import com.metamatrix.common.comm.exception.ConnectionException;
-import com.metamatrix.common.comm.platform.CommPlatformPlugin;
-import com.metamatrix.jdbc.JDBCPlugin;
-import com.metamatrix.platform.security.api.ILogon;
-import com.metamatrix.platform.security.api.LogonResult;
-
-public class LocalServerConnection implements ServerConnection {
-
- private final LogonResult result;
- private boolean shutdown;
- private DQPWorkContext workContext;
- private ClientServiceRegistry clientServices;
- private ILogon logon;
-
- public LocalServerConnection(Properties connectionProperties, ClientServiceRegistry
clientServices) throws CommunicationException, ConnectionException{
- this.clientServices = clientServices;
- this.workContext = new DQPWorkContext();
- DQPWorkContext.setWorkContext(this.workContext);
- this.logon = this.getService(ILogon.class);
- this.result = authenticate(connectionProperties);
- }
-
- public synchronized LogonResult authenticate(Properties connProps) throws
ConnectionException, CommunicationException {
- try {
- connProps.setProperty("localConnection", "true");
- LogonResult logonResult = this.logon.logon(connProps);
- return logonResult;
- } catch (LogonException e) {
- // Propagate the original message as it contains the message we want
- // to give to the user
- throw new ConnectionException(e, e.getMessage());
- } catch (MetaMatrixComponentException e) {
- if (e.getCause() instanceof CommunicationException) {
- throw (CommunicationException)e.getCause();
- }
- throw new CommunicationException(e,
CommPlatformPlugin.Util.getString("PlatformServerConnectionFactory.Unable_to_find_a_component_used_in_logging_on_to_MetaMatrix"));
//$NON-NLS-1$
- }
- }
-
-
- @SuppressWarnings("unchecked")
- public <T> T getService(final Class<T> iface) {
-
- return (T) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]
{iface}, new InvocationHandler() {
-
- public Object invoke(Object arg0, Method arg1, Object[] arg2) throws Throwable {
- if (!isOpen()) {
- throw ExceptionUtil.convertException(arg1, new
MetaMatrixComponentException(JDBCPlugin.Util.getString("LocalTransportHandler.session_inactive")));
//$NON-NLS-1$
- }
- Throwable exception = null;
- ClassLoader current = Thread.currentThread().getContextClassLoader();
- try {
- DQPWorkContext.setWorkContext(workContext);
- if (!(iface.equals(ILogon.class))) {
- logon.assertIdentity(result.getSessionToken());
- }
- return arg1.invoke(clientServices.getClientService(iface), arg2);
- } catch (InvocationTargetException e) {
- exception = e.getTargetException();
- } catch(Throwable t){
- exception = t;
- } finally {
- Thread.currentThread().setContextClassLoader(current);
- DQPWorkContext.releaseWorkContext();
- }
- throw ExceptionUtil.convertException(arg1, exception);
- }
- });
- }
-
- public boolean isOpen() {
- return !shutdown;
- }
-
- public void close() {
- // no-op managed connection close
- }
-
- @Override
- public void reallyClose() {
- shutdown(true);
- }
-
- private void shutdown(boolean logoff) {
- if (shutdown) {
- return;
- }
-
- if (logoff) {
- try {
- //make a best effort to send the logoff
- Future<?> writeFuture = this.logon.logoff();
- if (writeFuture != null) {
- writeFuture.get(5000, TimeUnit.MILLISECONDS);
- }
- } catch (InvalidSessionException e) {
- //ignore
- } catch (InterruptedException e) {
- //ignore
- } catch (ExecutionException e) {
- //ignore
- } catch (TimeoutException e) {
- //ignore
- }
- }
- this.workContext = null;
- this.shutdown = true;
- }
-
- public LogonResult getLogonResult() {
- return result;
- }
-
- @Override
- public boolean isSameInstance(ServerConnection conn) throws CommunicationException {
- return (conn instanceof LocalServerConnection);
- }
-}
Modified: branches/JCA/runtime/src/main/java/org/teiid/transport/LogonImpl.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/transport/LogonImpl.java 2010-01-22
17:27:53 UTC (rev 1775)
+++ branches/JCA/runtime/src/main/java/org/teiid/transport/LogonImpl.java 2010-01-22
22:49:05 UTC (rev 1776)
@@ -89,6 +89,9 @@
long sessionID = s.getSessionId();
DQPWorkContext workContext = DQPWorkContext.getWorkContext();
+ if (workContext == null) {
+ workContext = new DQPWorkContext();
+ }
workContext.setSessionToken(s.getAttachment(SessionToken.class));
workContext.setAppName(s.getApplicationName());
@@ -105,6 +108,7 @@
workContext.setVdbVersion(vdb.getVersion());
workContext.setVdb(vdb);
}
+ DQPWorkContext.setWorkContext(workContext);
return sessionID;
}
Modified: branches/JCA/runtime/src/main/java/org/teiid/transport/ServerWorkItem.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/transport/ServerWorkItem.java 2010-01-22
17:27:53 UTC (rev 1775)
+++ branches/JCA/runtime/src/main/java/org/teiid/transport/ServerWorkItem.java 2010-01-22
22:49:05 UTC (rev 1776)
@@ -38,8 +38,8 @@
import com.metamatrix.api.exception.ComponentNotFoundException;
import com.metamatrix.api.exception.ExceptionHolder;
import com.metamatrix.api.exception.MetaMatrixProcessingException;
-import com.metamatrix.common.comm.ClientServiceRegistry;
import com.metamatrix.common.comm.api.Message;
+import com.metamatrix.common.comm.api.ServerConnectionFactory;
import com.metamatrix.common.comm.platform.socket.client.ServiceInvocationStruct;
import com.metamatrix.common.log.LogManager;
import com.metamatrix.common.util.LogConstants;
@@ -49,30 +49,29 @@
import com.metamatrix.dqp.client.ResultsFuture;
import com.metamatrix.dqp.embedded.DQPEmbeddedPlugin;
import com.metamatrix.platform.security.api.ILogon;
+import com.metamatrix.platform.security.api.SessionToken;
-public class ServerWorkItem implements Runnable {
+public class ServerWorkItem {
+
private final ClientInstance socketClientInstance;
private final Serializable messageKey;
private final Message message;
- private final ClientServiceRegistry server;
+ private ServerConnectionFactory scf;
- public ServerWorkItem(ClientInstance socketClientInstance,
- Serializable messageKey, Message message,
- ClientServiceRegistry server) {
+ public ServerWorkItem(ClientInstance socketClientInstance, Serializable messageKey,
Message message, ServerConnectionFactory server) {
this.socketClientInstance = socketClientInstance;
this.messageKey = messageKey;
this.message = message;
- this.server = server;
+ this.scf = server;
}
/**
* main entry point for remote method calls. encryption/decryption is
* handled here so that it won't be done by the io thread
*/
- public void run() {
- DQPWorkContext.setWorkContext(this.socketClientInstance.getWorkContext());
+ public void process() {
Message result = null;
- String service = null;
+ DQPWorkContext.setWorkContext(this.socketClientInstance.getWorkContext());
final boolean encrypt = message.getContents() instanceof SealedObject;
try {
message.setContents(this.socketClientInstance.getCryptor().unsealObject(message.getContents()));
@@ -81,16 +80,14 @@
throw new AssertionError("unknown message contents"); //$NON-NLS-1$
}
final ServiceInvocationStruct serviceStruct =
(ServiceInvocationStruct)message.getContents();
- Object instance = server.getClientService(serviceStruct.targetClass);
+ final Class type = Class.forName(serviceStruct.targetClass);
+ Object instance = this.scf.getService(type);
if (instance == null) {
throw new
ComponentNotFoundException(DQPEmbeddedPlugin.Util.getString("ServerWorkItem.Component_Not_Found",
serviceStruct.targetClass)); //$NON-NLS-1$
}
if (!(instance instanceof ILogon)) {
- DQPWorkContext workContext = this.socketClientInstance.getWorkContext();
- ILogon logonModule = server.getClientService(ILogon.class);
- logonModule.assertIdentity(workContext.getSessionToken());
+ SessionToken.setSession(this.socketClientInstance.getWorkContext().getSessionToken());
}
- service = serviceStruct.targetClass;
ReflectionHelper helper = new ReflectionHelper(instance.getClass());
Method m = helper.findBestMethodOnTarget(serviceStruct.methodName,
serviceStruct.args);
Object methodResult;
@@ -109,9 +106,9 @@
try {
asynchResult.setContents(completedFuture.get());
} catch (InterruptedException e) {
- asynchResult.setContents(processException(e, serviceStruct.targetClass));
+ asynchResult.setContents(processException(e,
scf.getLoggingContextForService(type)));
} catch (ExecutionException e) {
- asynchResult.setContents(processException(e.getCause(),
serviceStruct.targetClass));
+ asynchResult.setContents(processException(e.getCause(),
scf.getLoggingContextForService(type)));
}
sendResult(asynchResult, encrypt);
}
@@ -124,11 +121,12 @@
}
} catch (Throwable t) {
Message holder = new Message();
- holder.setContents(processException(t, service));
+ holder.setContents(processException(t, null));
result = holder;
} finally {
DQPWorkContext.releaseWorkContext();
}
+
if (result != null) {
sendResult(result, encrypt);
}
@@ -145,11 +143,7 @@
socketClientInstance.send(result, messageKey);
}
- private Serializable processException(Throwable e, String service) {
- String context = null;
- if (service != null) {
- context = this.server.getLoggingContextForService(service);
- }
+ private Serializable processException(Throwable e, String context) {
if (context == null) {
context = LogConstants.CTX_SERVER;
}
@@ -176,5 +170,4 @@
LogManager.logDetail(context, e, "Processing exception for session",
this.socketClientInstance.getWorkContext().getConnectionID()); //$NON-NLS-1$
LogManager.logWarning(context,
DQPEmbeddedPlugin.Util.getString("ServerWorkItem.processing_error",
e.getMessage(), this.socketClientInstance.getWorkContext().getConnectionID(),
e.getClass().getName(), elem)); //$NON-NLS-1$
}
-
}
\ No newline at end of file
Modified:
branches/JCA/runtime/src/main/java/org/teiid/transport/SocketClientInstance.java
===================================================================
---
branches/JCA/runtime/src/main/java/org/teiid/transport/SocketClientInstance.java 2010-01-22
17:27:53 UTC (rev 1775)
+++
branches/JCA/runtime/src/main/java/org/teiid/transport/SocketClientInstance.java 2010-01-22
22:49:05 UTC (rev 1776)
@@ -28,14 +28,13 @@
import org.teiid.dqp.internal.process.DQPWorkContext;
-import com.metamatrix.common.comm.ClientServiceRegistry;
import com.metamatrix.common.comm.api.Message;
+import com.metamatrix.common.comm.api.ServerConnectionFactory;
import com.metamatrix.common.comm.exception.CommunicationException;
import com.metamatrix.common.comm.platform.CommPlatformPlugin;
import com.metamatrix.common.comm.platform.socket.Handshake;
import com.metamatrix.common.comm.platform.socket.ObjectChannel;
import com.metamatrix.common.log.LogManager;
-import com.metamatrix.common.queue.WorkerPool;
import com.metamatrix.common.util.LogConstants;
import com.metamatrix.common.util.crypto.CryptoException;
import com.metamatrix.common.util.crypto.Cryptor;
@@ -54,16 +53,14 @@
public class SocketClientInstance implements ChannelListener, ClientInstance {
private final ObjectChannel objectSocket;
- private final WorkerPool workerPool;
- private final ClientServiceRegistry server;
private Cryptor cryptor;
+ private ServerConnectionFactory server;
private boolean usingEncryption;
private DhKeyGenerator keyGen;
private DQPWorkContext workContext = new DQPWorkContext();
- public SocketClientInstance(ObjectChannel objectSocket, WorkerPool workerPool,
ClientServiceRegistry server, boolean isClientEncryptionEnabled) {
+ public SocketClientInstance(ObjectChannel objectSocket, ServerConnectionFactory
server, boolean isClientEncryptionEnabled) {
this.objectSocket = objectSocket;
- this.workerPool = workerPool;
this.server = server;
this.usingEncryption = isClientEncryptionEnabled;
SocketAddress address = this.objectSocket.getRemoteAddress();
@@ -142,7 +139,8 @@
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_SERVER, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_SERVER, "processing message:" +
packet); //$NON-NLS-1$
}
- workerPool.execute(new ServerWorkItem(this, packet.getMessageKey(), packet,
this.server));
+ ServerWorkItem work = new ServerWorkItem(this, packet.getMessageKey(), packet,
this.server);
+ work.process();
}
public void shutdown() throws CommunicationException {
Modified: branches/JCA/runtime/src/main/java/org/teiid/transport/SocketListener.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/transport/SocketListener.java 2010-01-22
17:27:53 UTC (rev 1775)
+++ branches/JCA/runtime/src/main/java/org/teiid/transport/SocketListener.java 2010-01-22
22:49:05 UTC (rev 1776)
@@ -25,6 +25,8 @@
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
import javax.net.ssl.SSLEngine;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
@@ -38,7 +40,7 @@
import org.teiid.transport.ChannelListener.ChannelListenerFactory;
import com.metamatrix.common.CommonPlugin;
-import com.metamatrix.common.comm.ClientServiceRegistry;
+import com.metamatrix.common.comm.api.ServerConnectionFactory;
import com.metamatrix.common.comm.platform.socket.ObjectChannel;
import com.metamatrix.common.log.LogManager;
import com.metamatrix.common.queue.WorkerPool;
@@ -51,8 +53,8 @@
* Server-side class to listen for new connection requests and create a
SocketClientConnection for each connection request.
*/
public class SocketListener implements ChannelListenerFactory {
- private ClientServiceRegistry server;
- private SSLAwareChannelHandler channelHandler;
+ private static final String TEIID_RUNTIME = "java:teiid/runtime-engine";
+ private SSLAwareChannelHandler channelHandler;
private Channel serverChanel;
private boolean isClientEncryptionEnabled;
private WorkerPool workerPool;
@@ -68,14 +70,13 @@
* @param workerPool
* @param engine null if SSL is disabled
*/
- public SocketListener(int port, String bindAddress, ClientServiceRegistry server, int
inputBufferSize,
+ public SocketListener(int port, String bindAddress, int inputBufferSize,
int outputBufferSize, int maxWorkers, SSLEngine engine, boolean
isClientEncryptionEnabled, WorkManager workManager) {
this.isClientEncryptionEnabled = isClientEncryptionEnabled;
if (port < 0 || port > 0xFFFF) {
throw new IllegalArgumentException("port out of range:" + port);
//$NON-NLS-1$
}
- this.server = server;
this.executor = new WorkManagerExecutor(workManager);
this.workerPool = WorkerPoolFactory.newWorkerPool("SocketWorker",
maxWorkers, this.executor); //$NON-NLS-1$
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_SERVER,
MessageLevel.DETAIL)) {
@@ -125,7 +126,7 @@
}
public ChannelListener createChannelListener(ObjectChannel channel) {
- return new SocketClientInstance(channel, this.workerPool, this.server,
this.isClientEncryptionEnabled);
+ return new SocketClientInstance(channel, getServer(), this.isClientEncryptionEnabled);
}
static class WorkManagerExecutor implements Executor{
@@ -153,4 +154,13 @@
}
}
}
+
+ protected ServerConnectionFactory getServer() {
+ try {
+ InitialContext ic = new InitialContext();
+ return (ServerConnectionFactory)ic.lookup(TEIID_RUNTIME);
+ } catch (NamingException e) {
+ return null;
+ }
+ }
}
\ No newline at end of file
Modified: branches/JCA/runtime/src/main/java/org/teiid/transport/SocketTransport.java
===================================================================
--- branches/JCA/runtime/src/main/java/org/teiid/transport/SocketTransport.java 2010-01-22
17:27:53 UTC (rev 1775)
+++ branches/JCA/runtime/src/main/java/org/teiid/transport/SocketTransport.java 2010-01-22
22:49:05 UTC (rev 1776)
@@ -29,7 +29,6 @@
import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
-import com.metamatrix.common.comm.ClientServiceRegistry;
import com.metamatrix.common.log.LogManager;
import com.metamatrix.common.util.LogConstants;
import com.metamatrix.core.MetaMatrixRuntimeException;
@@ -42,7 +41,6 @@
public class SocketTransport {
private SocketListener listener;
- private ClientServiceRegistry clientServices;
private WorkManager workManager;
private SocketConfiguration config;
@@ -56,7 +54,7 @@
try {
if (this.config.isEnabled()) {
LogManager.logDetail(LogConstants.CTX_SERVER,
DQPEmbeddedPlugin.Util.getString("SocketTransport.1", new Object[] {bindAddress,
String.valueOf(this.config.getPortNumber())})); //$NON-NLS-1$
- this.listener = new SocketListener(this.config.getPortNumber(), bindAddress,
this.clientServices, this.config.getInputBufferSize(), this.config.getOutputBufferSize(),
this.config.getMaxSocketThreads(), this.config.getSSLConfiguration().getServerSSLEngine(),
this.config.getSSLConfiguration().isClientEncryptionEnabled(), this.workManager);
+ this.listener = new SocketListener(this.config.getPortNumber(), bindAddress,
this.config.getInputBufferSize(), this.config.getOutputBufferSize(),
this.config.getMaxSocketThreads(), this.config.getSSLConfiguration().getServerSSLEngine(),
this.config.getSSLConfiguration().isClientEncryptionEnabled(), this.workManager);
}
else {
LogManager.logDetail(LogConstants.CTX_SERVER,
DQPEmbeddedPlugin.Util.getString("SocketTransport.3")); //$NON-NLS-1$
@@ -87,12 +85,7 @@
return this.listener.getStats();
}
- public void setClientServiceRegistry(ClientServiceRegistry services) {
- this.clientServices = services;
- }
-
public void setWorkManager(WorkManager mgr) {
this.workManager = mgr;
}
-
}
Modified: branches/JCA/runtime/src/test/java/org/teiid/transport/TestCommSockets.java
===================================================================
--- branches/JCA/runtime/src/test/java/org/teiid/transport/TestCommSockets.java 2010-01-22
17:27:53 UTC (rev 1775)
+++ branches/JCA/runtime/src/test/java/org/teiid/transport/TestCommSockets.java 2010-01-22
22:49:05 UTC (rev 1776)
@@ -35,6 +35,7 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import org.teiid.dqp.internal.datamgr.impl.FakeWorkManager;
import com.metamatrix.api.exception.ComponentNotFoundException;
@@ -42,6 +43,7 @@
import com.metamatrix.common.api.MMURL;
import com.metamatrix.common.comm.ClientServiceRegistry;
import com.metamatrix.common.comm.ClientServiceRegistryImpl;
+import com.metamatrix.common.comm.api.ServerConnectionFactory;
import com.metamatrix.common.comm.exception.CommunicationException;
import com.metamatrix.common.comm.exception.ConnectionException;
import com.metamatrix.common.comm.platform.socket.SocketUtil;
@@ -73,8 +75,7 @@
ClientServiceRegistry csr = new ClientServiceRegistryImpl();
SessionService sessionService = mock(SessionService.class);
csr.registerClientService(ILogon.class, new LogonImpl(sessionService,
"fakeCluster"), "foo"); //$NON-NLS-1$ //$NON-NLS-2$
- listener = new SocketListener(addr.getPort(), addr.getAddress().getHostAddress(),
- csr, 1024, 1024, 1, null, true, new FakeWorkManager());
+ listener = new SocketListener(addr.getPort(), addr.getAddress().getHostAddress(),1024,
1024, 1, null, true, new FakeWorkManager());
try {
Properties p = new Properties();
@@ -141,17 +142,20 @@
SSLEngine serverSSL, boolean isClientEncryptionEnabled, Properties socketConfig)
throws CommunicationException,
ConnectionException {
if (listener == null) {
- SessionService sessionService = mock(SessionService.class);
- ClientServiceRegistry csr = new ClientServiceRegistryImpl();
- csr.registerClientService(ILogon.class, new LogonImpl(sessionService,
"fakeCluster") { //$NON-NLS-1$
- @Override
- public LogonResult logon(Properties connProps)
- throws LogonException, ComponentNotFoundException {
- return new LogonResult();
+ listener = new SocketListener(addr.getPort(), addr.getAddress().getHostAddress(),
1024, 1024, 1, serverSSL, isClientEncryptionEnabled, new FakeWorkManager()) {
+ protected ServerConnectionFactory getServer() {
+ ServerConnectionFactory server = Mockito.mock(ServerConnectionFactory.class);
+ Mockito.stub(server.getService(ILogon.class)).toReturn(new
LogonImpl(mock(SessionService.class), "fakeCluster") { //$NON-NLS-1$
+ @Override
+ public LogonResult logon(Properties connProps)
+ throws LogonException, ComponentNotFoundException {
+ return new LogonResult();
+ }
+ });
+ return server;
}
- }, "foo"); //$NON-NLS-1$
- listener = new SocketListener(addr.getPort(), addr.getAddress().getHostAddress(),
- csr, 1024, 1024, 1, serverSSL, isClientEncryptionEnabled, new FakeWorkManager());
+ };
+
SocketListenerStats stats = listener.getStats();
assertEquals(0, stats.maxSockets);
assertEquals(0, stats.objectsRead);
Modified: branches/JCA/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java
===================================================================
---
branches/JCA/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java 2010-01-22
17:27:53 UTC (rev 1775)
+++
branches/JCA/runtime/src/test/java/org/teiid/transport/TestSocketRemoting.java 2010-01-22
22:49:05 UTC (rev 1776)
@@ -22,6 +22,11 @@
package org.teiid.transport;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.io.IOException;
import java.io.Serializable;
import java.util.Properties;
@@ -29,8 +34,13 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import junit.framework.TestCase;
+import javax.resource.spi.ConnectionManager;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.teiid.TeiidConnectionFactory;
+import org.teiid.TeiidManagedConnectionFactory;
+import org.teiid.TeiidResourceAdapter;
import org.teiid.dqp.internal.process.DQPWorkContext;
import com.metamatrix.admin.api.exception.security.InvalidSessionException;
@@ -39,10 +49,9 @@
import com.metamatrix.api.exception.security.LogonException;
import com.metamatrix.common.api.HostInfo;
import com.metamatrix.common.api.MMURL;
-import com.metamatrix.common.comm.ClientServiceRegistry;
-import com.metamatrix.common.comm.ClientServiceRegistryImpl;
import com.metamatrix.common.comm.api.Message;
import com.metamatrix.common.comm.api.ResultsReceiver;
+import com.metamatrix.common.comm.api.ServerConnectionFactory;
import com.metamatrix.common.comm.exception.CommunicationException;
import com.metamatrix.common.comm.exception.ConnectionException;
import com.metamatrix.common.comm.platform.socket.client.SocketServerConnection;
@@ -59,7 +68,7 @@
import com.metamatrix.platform.security.api.LogonResult;
import com.metamatrix.platform.security.api.SessionToken;
-public class TestSocketRemoting extends TestCase {
+public class TestSocketRemoting {
public interface FakeService {
@@ -85,12 +94,12 @@
private static class FakeClientServerInstance extends SocketServerInstanceImpl
implements ClientInstance {
- ClientServiceRegistry clientServiceRegistry;
+ ServerConnectionFactory server;
private ResultsReceiver<Object> listener;
- public FakeClientServerInstance(ClientServiceRegistry clientServiceRegistry) {
+ public FakeClientServerInstance(ServerConnectionFactory server) {
super();
- this.clientServiceRegistry = clientServiceRegistry;
+ this.server = server;
}
public HostInfo getHostInfo() {
@@ -105,9 +114,9 @@
public void send(Message message, ResultsReceiver<Object> listener,
Serializable messageKey) throws CommunicationException,
InterruptedException {
- ServerWorkItem workItem = new ServerWorkItem(this, messageKey, message,
clientServiceRegistry);
+ ServerWorkItem workItem = new ServerWorkItem(this, messageKey, message, server);
this.listener = listener;
- workItem.run();
+ workItem.process();
}
public void shutdown() {
@@ -131,18 +140,20 @@
/**
* No server was supplied, will throw an NPE under the covers
*/
+ @Test
public void testUnckedException() throws Exception {
FakeClientServerInstance serverInstance = new FakeClientServerInstance(null);
try {
createFakeConnection(serverInstance);
fail("expected exception"); //$NON-NLS-1$
} catch (CommunicationException e) {
- assertEquals("Unable to find a component used in logging on to MetaMatrix",
e.getMessage()); //$NON-NLS-1$
+ assertEquals("Unable to find a component used in logging on to Teiid",
e.getMessage()); //$NON-NLS-1$
}
}
+ @Test
public void testMethodInvocation() throws Exception {
- ClientServiceRegistry csr = new ClientServiceRegistryImpl();
+ TeiidConnectionFactory csr = new TeiidConnectionFactory(new TeiidResourceAdapter(), new
TeiidManagedConnectionFactory(), Mockito.mock(ConnectionManager.class));
csr.registerClientService(ILogon.class, new ILogon() {
public ResultsFuture<?> logoff()