Author: shawkins
Date: 2012-09-18 15:18:44 -0400 (Tue, 18 Sep 2012)
New Revision: 4456
Added:
trunk/runtime/src/main/resources/infinispan-replicated-config.xml
trunk/runtime/src/main/resources/tcp-shared.xml
trunk/test-integration/common/src/test/resources/infinispan-replicated-config-1.xml
trunk/test-integration/common/src/test/resources/tcp-shared-1.xml
Removed:
trunk/test-integration/common/src/test/resources/infinispan-replicated-config.xml
Modified:
trunk/client/src/main/java/org/teiid/jdbc/ConnectionImpl.java
trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedConfiguration.java
trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java
Log:
TEIID-2215 making non-clustered by default, using a shared transport, and adding cleanup
Modified: trunk/client/src/main/java/org/teiid/jdbc/ConnectionImpl.java
===================================================================
--- trunk/client/src/main/java/org/teiid/jdbc/ConnectionImpl.java 2012-09-18 11:57:37 UTC
(rev 4455)
+++ trunk/client/src/main/java/org/teiid/jdbc/ConnectionImpl.java 2012-09-18 19:18:44 UTC
(rev 4456)
@@ -27,7 +27,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -484,22 +483,14 @@
public int getTransactionIsolation() throws SQLException {
return this.transactionIsolation;
- }
-
+ }
+
+ @Override
+ public Map<String, Class<?>> getTypeMap() throws SQLException {
+ return Collections.emptyMap();
+ }
+
/**
- * Retreives the type map associated with this Connection object. The type map
- * contains entries for undefined types. This method always returns an empty
- * map since it is not possible to add entries to this type map
- * @return map containing undefined types(empty)
- * @throws SQLException, should never occur
- */
- public Map getTypeMap() throws SQLException {
- //Check to see the connection is open
- checkConnection();
- return new HashMap();
- }
-
- /**
* <p>This method will return the first warning reported by calls on this
connection,
* or null if none exist.</p>
* @return A SQLWarning object if there are any warnings.
Modified: trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedConfiguration.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedConfiguration.java 2012-09-18
11:57:37 UTC (rev 4455)
+++ trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedConfiguration.java 2012-09-18
19:18:44 UTC (rev 4456)
@@ -23,7 +23,10 @@
package org.teiid.runtime;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.WeakHashMap;
import java.util.concurrent.Executors;
import javax.resource.spi.work.WorkManager;
@@ -31,6 +34,7 @@
import org.infinispan.manager.DefaultCacheManager;
import org.jgroups.Channel;
+import org.jgroups.ChannelListener;
import org.jgroups.JChannel;
import org.teiid.cache.CacheFactory;
import org.teiid.cache.infinispan.InfinispanCacheFactory;
@@ -45,6 +49,37 @@
public class EmbeddedConfiguration extends DQPConfiguration {
+ private final class SimpleChannelFactory implements ChannelFactory, ChannelListener {
+ private final Map<Channel, String> channels = new WeakHashMap<Channel,
String>();
+
+ @Override
+ public Channel createChannel(String id) throws Exception {
+ JChannel channel = new
JChannel(this.getClass().getClassLoader().getResource(getJgroupsConfigFile()));
+ channels.put(channel, id);
+ channel.addChannelListener(this);
+ return channel;
+ }
+
+ @Override
+ public void channelClosed(Channel channel) {
+ channels.remove(channel);
+ }
+
+ @Override
+ public void channelConnected(Channel channel) {
+ }
+
+ @Override
+ public void channelDisconnected(Channel channel) {
+ }
+
+ void stop() {
+ for (Channel c : new ArrayList<Channel>(channels.keySet())) {
+ c.close();
+ }
+ }
+ }
+
private SecurityHelper securityHelper;
private List<String> securityDomains;
private TransactionManager transactionManager;
@@ -55,8 +90,14 @@
private CacheFactory cacheFactory;
private int maxResultSetCacheStaleness = 60;
private String infinispanConfigFile = "infinispan-config.xml"; //$NON-NLS-1$
- private String jgroupsConfigFile = "tcp.xml"; //$NON-NLS-1$
+ private String jgroupsConfigFile;
+ private DefaultCacheManager manager;
+ private SimpleChannelFactory channelFactory;
+
+ public EmbeddedConfiguration() {
+ }
+
public SecurityHelper getSecurityHelper() {
return securityHelper;
}
@@ -84,13 +125,9 @@
}
public ObjectReplicator getObjectReplicator() {
- if (this.objectReplicator == null) {
- this.objectReplicator = new JGroupsObjectReplicator(new ChannelFactory() {
- @Override
- public Channel createChannel(String id) throws Exception {
- return new
JChannel(this.getClass().getClassLoader().getResource(getJgroupsConfigFile()));
- }
- }, Executors.newCachedThreadPool());
+ if (this.objectReplicator == null && jgroupsConfigFile != null) {
+ channelFactory = new SimpleChannelFactory();
+ this.objectReplicator = new JGroupsObjectReplicator(channelFactory,
Executors.newCachedThreadPool());
}
return objectReplicator;
}
@@ -150,7 +187,7 @@
public CacheFactory getCacheFactory() {
if (this.cacheFactory == null) {
try {
- DefaultCacheManager manager = new DefaultCacheManager(this.infinispanConfigFile,
true);
+ manager = new DefaultCacheManager(this.infinispanConfigFile, true);
manager.startCaches(manager.getCacheNames().toArray(new
String[manager.getCacheNames().size()]));
this.cacheFactory = new InfinispanCacheFactory(manager,
this.getClass().getClassLoader());
} catch (IOException e) {
@@ -175,4 +212,13 @@
public void setJgroupsConfigFile(String jgroupsConfigFile) {
this.jgroupsConfigFile = jgroupsConfigFile;
}
+
+ protected void stop() {
+ if (manager != null) {
+ manager.stop();
+ }
+ if (channelFactory != null) {
+ channelFactory.stop();
+ }
+ }
}
Modified: trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java 2012-09-18 11:57:37
UTC (rev 4455)
+++ trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java 2012-09-18 19:18:44
UTC (rev 4456)
@@ -264,6 +264,7 @@
//to allow teiid to start a request transaction under an existing thread bound
transaction
protected boolean detectTransactions = true;
private Boolean running;
+ private EmbeddedConfiguration config;
public EmbeddedServer() {
@@ -274,10 +275,11 @@
this.connectionFactoryProviders.put(name, connectionFactoryProvider);
}
- public synchronized void start(EmbeddedConfiguration config) {
+ public synchronized void start(@SuppressWarnings("hiding")
EmbeddedConfiguration config) {
if (running != null) {
throw new IllegalStateException();
}
+ this.config = config;
this.eventDistributorFactoryService.start();
this.dqp.setEventDistributor(this.eventDistributorFactoryService.getReplicatedEventDistributor());
this.replicator = config.getObjectReplicator();
@@ -505,6 +507,9 @@
* Stops the server. Once stopped it cannot be restarted.
*/
public synchronized void stop() {
+ if (config != null) {
+ config.stop();
+ }
dqp.stop();
eventDistributorFactoryService.stop();
bufferService = null;
Added: trunk/runtime/src/main/resources/infinispan-replicated-config.xml
===================================================================
--- trunk/runtime/src/main/resources/infinispan-replicated-config.xml
(rev 0)
+++ trunk/runtime/src/main/resources/infinispan-replicated-config.xml 2012-09-18 19:18:44
UTC (rev 4456)
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<infinispan
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:infinispan:config:5.1
http://www.infinispan.org/schemas/infinispan-config-5.1.xsd"
xmlns="urn:infinispan:config:5.1">
+
+ <global>
+ <transport clusterName="teiid-cluster" machineId="m1"
rackId="r1" nodeName="Node-A">
+ <properties>
+ <property name="configurationFile"
value="tcp-shared.xml"/>
+ </properties>
+ </transport>
+ </global>
+
+ <namedCache name="resultset">
+ <transaction transactionMode="TRANSACTIONAL"/>
+ <eviction maxEntries="1024" strategy="LIRS" />
+ <expiration lifespan="7200000"/>
+ <clustering mode="local"/>
+ </namedCache>
+
+ <namedCache name="resultset-repl">
+ <transaction transactionMode="TRANSACTIONAL"/>
+ <eviction maxEntries="1024" strategy="LIRS" />
+ <expiration lifespan="7200000"/>
+ <clustering mode="repl">
+ <sync/>
+ </clustering>
+ </namedCache>
+
+ <namedCache name="preparedplan">
+ <eviction maxEntries="512" strategy="LIRS"/>
+ <expiration lifespan="28800"/>
+ <clustering mode="local"/>
+ </namedCache>
+
+</infinispan>
\ No newline at end of file
Property changes on: trunk/runtime/src/main/resources/infinispan-replicated-config.xml
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: trunk/runtime/src/main/resources/tcp-shared.xml
===================================================================
--- trunk/runtime/src/main/resources/tcp-shared.xml (rev 0)
+++ trunk/runtime/src/main/resources/tcp-shared.xml 2012-09-18 19:18:44 UTC (rev 4456)
@@ -0,0 +1,70 @@
+<!--
+ TCP based stack, with flow control and message bundling. This is usually used when
IP
+ multicasting cannot be used in a network, e.g. because it is disabled (routers
discard multicast).
+ Note that TCP.bind_addr and TCPPING.initial_hosts should be set, possibly via system
properties, e.g.
+ -Djgroups.bind_addr=192.168.5.2 and
-Djgroups.tcpping.initial_hosts=192.168.5.2[7800]
+ author: Bela Ban
+-->
+<config xmlns="urn:org:jgroups"
+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:org:jgroups
http://www.jgroups.org/schema/JGroups-3.0.xsd">
+ <TCP bind_port="7800"
+ loopback="false"
+ recv_buf_size="${tcp.recv_buf_size:20M}"
+ send_buf_size="${tcp.send_buf_size:640K}"
+ discard_incompatible_packets="true"
+ max_bundle_size="64K"
+ max_bundle_timeout="30"
+ enable_bundling="true"
+ use_send_queues="true"
+ sock_conn_timeout="300"
+
+ timer_type="new"
+ timer.min_threads="4"
+ timer.max_threads="10"
+ timer.keep_alive_time="3000"
+ timer.queue_max_size="500"
+
+ thread_pool.enabled="true"
+ thread_pool.min_threads="1"
+ thread_pool.max_threads="10"
+ thread_pool.keep_alive_time="5000"
+ thread_pool.queue_enabled="false"
+ thread_pool.queue_max_size="100"
+ thread_pool.rejection_policy="discard"
+
+ oob_thread_pool.enabled="true"
+ oob_thread_pool.min_threads="1"
+ oob_thread_pool.max_threads="8"
+ oob_thread_pool.keep_alive_time="5000"
+ oob_thread_pool.queue_enabled="false"
+ oob_thread_pool.queue_max_size="100"
+ oob_thread_pool.rejection_policy="discard"
+ singleton_name="tcp-teiid"/>
+
+ <TCPPING timeout="3000"
+
initial_hosts="${jgroups.tcpping.initial_hosts:localhost[7800],localhost[7801]}"
+ port_range="1"
+ num_initial_members="3"/>
+ <MERGE2 min_interval="10000"
+ max_interval="30000"/>
+ <FD_SOCK/>
+ <FD timeout="3000" max_tries="3" />
+ <VERIFY_SUSPECT timeout="1500" />
+ <BARRIER />
+ <pbcast.NAKACK use_mcast_xmit="false"
+ exponential_backoff="500"
+ discard_delivered_msgs="true"/>
+ <UNICAST />
+ <pbcast.STABLE stability_delay="1000"
desired_avg_gossip="50000"
+ max_bytes="4M"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+
+ view_bundling="true"/>
+ <UFC max_credits="2M"
+ min_threshold="0.4"/>
+ <MFC max_credits="2M"
+ min_threshold="0.4"/>
+ <FRAG2 frag_size="60K" />
+ <pbcast.STATE_TRANSFER/>
+</config>
\ No newline at end of file
Property changes on: trunk/runtime/src/main/resources/tcp-shared.xml
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified:
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java
===================================================================
---
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java 2012-09-18
11:57:37 UTC (rev 4455)
+++
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java 2012-09-18
19:18:44 UTC (rev 4456)
@@ -22,8 +22,7 @@
package org.teiid.systemmodel;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -31,10 +30,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
-import java.util.concurrent.Executors;
-import org.jgroups.Channel;
-import org.jgroups.JChannel;
import org.junit.BeforeClass;
import org.junit.Test;
import org.teiid.core.types.DataTypeManager;
@@ -42,11 +38,9 @@
import org.teiid.jdbc.FakeServer;
import org.teiid.jdbc.FakeServer.DeployVDBParameter;
import org.teiid.metadata.FunctionMethod;
+import org.teiid.metadata.FunctionParameter;
import org.teiid.metadata.FunctionMethod.Determinism;
import org.teiid.metadata.FunctionMethod.PushDown;
-import org.teiid.metadata.FunctionParameter;
-import org.teiid.replication.jgroups.ChannelFactory;
-import org.teiid.replication.jgroups.JGroupsObjectReplicator;
import org.teiid.runtime.EmbeddedConfiguration;
@SuppressWarnings("nls")
@@ -63,8 +57,7 @@
if (DEBUG) {
UnitTestUtil.enableTraceLogging("org.teiid");
}
-
- FakeServer server1 = createServer();
+ FakeServer server1 = createServer("infinispan-replicated-config.xml",
"tcp-shared.xml");
Connection c1 = server1.createConnection("jdbc:teiid:matviews");
Statement stmt = c1.createStatement();
@@ -74,7 +67,7 @@
double d1 = rs.getDouble(1);
double d2 = rs.getDouble(2);
- FakeServer server2 = createServer();
+ FakeServer server2 = createServer("infinispan-replicated-config-1.xml",
"tcp-shared-1.xml");
Connection c2 = server2.createConnection("jdbc:teiid:matviews");
Statement stmt2 = c2.createStatement();
ResultSet rs2 = stmt2.executeQuery("select * from matviews where name =
'RandomView'");
@@ -128,12 +121,12 @@
server2.stop();
}
- private FakeServer createServer() throws Exception {
+ private FakeServer createServer(String ispn, String jgroups) throws Exception {
FakeServer server = new FakeServer(false);
EmbeddedConfiguration config = new EmbeddedConfiguration();
- config.setInfinispanConfigFile(UnitTestUtil.getTestDataPath()+"/infinispan-replicated-config.xml");
-
+ config.setInfinispanConfigFile(ispn);
+ config.setJgroupsConfigFile(jgroups);
server.start(config, true);
HashMap<String, Collection<FunctionMethod>> udfs = new
HashMap<String, Collection<FunctionMethod>>();
udfs.put("funcs", Arrays.asList(new FunctionMethod("pause",
null, null, PushDown.CANNOT_PUSHDOWN, TestMatViews.class.getName(), "pause",
null, new FunctionParameter("return", DataTypeManager.DefaultDataTypes.INTEGER),
true, Determinism.NONDETERMINISTIC)));
Copied:
trunk/test-integration/common/src/test/resources/infinispan-replicated-config-1.xml (from
rev 4454,
trunk/test-integration/common/src/test/resources/infinispan-replicated-config.xml)
===================================================================
--- trunk/test-integration/common/src/test/resources/infinispan-replicated-config-1.xml
(rev 0)
+++
trunk/test-integration/common/src/test/resources/infinispan-replicated-config-1.xml 2012-09-18
19:18:44 UTC (rev 4456)
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<infinispan
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:infinispan:config:5.1
http://www.infinispan.org/schemas/infinispan-config-5.1.xsd"
xmlns="urn:infinispan:config:5.1">
+
+ <global>
+ <transport clusterName="teiid-cluster" machineId="m1"
rackId="r1" nodeName="Node-A">
+ <properties>
+ <property name="configurationFile"
value="tcp-shared-1.xml"/>
+ </properties>
+ </transport>
+ </global>
+
+ <namedCache name="resultset">
+ <transaction transactionMode="TRANSACTIONAL"/>
+ <eviction maxEntries="1024" strategy="LIRS" />
+ <expiration lifespan="7200000"/>
+ <clustering mode="local"/>
+ </namedCache>
+
+ <namedCache name="resultset-repl">
+ <transaction transactionMode="TRANSACTIONAL"/>
+ <eviction maxEntries="1024" strategy="LIRS" />
+ <expiration lifespan="7200000"/>
+ <clustering mode="repl">
+ <sync/>
+ </clustering>
+ </namedCache>
+
+ <namedCache name="preparedplan">
+ <eviction maxEntries="512" strategy="LIRS"/>
+ <expiration lifespan="28800"/>
+ <clustering mode="local"/>
+ </namedCache>
+
+</infinispan>
\ No newline at end of file
Property changes on:
trunk/test-integration/common/src/test/resources/infinispan-replicated-config-1.xml
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Deleted:
trunk/test-integration/common/src/test/resources/infinispan-replicated-config.xml
===================================================================
---
trunk/test-integration/common/src/test/resources/infinispan-replicated-config.xml 2012-09-18
11:57:37 UTC (rev 4455)
+++
trunk/test-integration/common/src/test/resources/infinispan-replicated-config.xml 2012-09-18
19:18:44 UTC (rev 4456)
@@ -1,35 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<infinispan
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:infinispan:config:5.1
http://www.infinispan.org/schemas/infinispan-config-5.1.xsd"
xmlns="urn:infinispan:config:5.1">
-
- <global>
- <transport clusterName="teiid-cluster" machineId="m1"
rackId="r1" nodeName="Node-A">
- <properties>
- <property name="configurationFile"
value="tcp.xml"/>
- </properties>
- </transport>
- </global>
-
- <namedCache name="resultset">
- <transaction transactionMode="TRANSACTIONAL"/>
- <eviction maxEntries="1024" strategy="LIRS" />
- <expiration lifespan="7200000"/>
- <clustering mode="local"/>
- </namedCache>
-
- <namedCache name="resultset-repl">
- <transaction transactionMode="TRANSACTIONAL"/>
- <eviction maxEntries="1024" strategy="LIRS" />
- <expiration lifespan="7200000"/>
- <clustering mode="repl">
- <sync/>
- </clustering>
- </namedCache>
-
- <namedCache name="preparedplan">
- <eviction maxEntries="512" strategy="LIRS"/>
- <expiration lifespan="28800"/>
- <clustering mode="local"/>
- </namedCache>
-
-</infinispan>
\ No newline at end of file
Added: trunk/test-integration/common/src/test/resources/tcp-shared-1.xml
===================================================================
--- trunk/test-integration/common/src/test/resources/tcp-shared-1.xml
(rev 0)
+++ trunk/test-integration/common/src/test/resources/tcp-shared-1.xml 2012-09-18 19:18:44
UTC (rev 4456)
@@ -0,0 +1,70 @@
+<!--
+ TCP based stack, with flow control and message bundling. This is usually used when
IP
+ multicasting cannot be used in a network, e.g. because it is disabled (routers
discard multicast).
+ Note that TCP.bind_addr and TCPPING.initial_hosts should be set, possibly via system
properties, e.g.
+ -Djgroups.bind_addr=192.168.5.2 and
-Djgroups.tcpping.initial_hosts=192.168.5.2[7800]
+ author: Bela Ban
+-->
+<config xmlns="urn:org:jgroups"
+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:org:jgroups
http://www.jgroups.org/schema/JGroups-3.0.xsd">
+ <TCP bind_port="7800"
+ loopback="false"
+ recv_buf_size="${tcp.recv_buf_size:20M}"
+ send_buf_size="${tcp.send_buf_size:640K}"
+ discard_incompatible_packets="true"
+ max_bundle_size="64K"
+ max_bundle_timeout="30"
+ enable_bundling="true"
+ use_send_queues="true"
+ sock_conn_timeout="300"
+
+ timer_type="new"
+ timer.min_threads="4"
+ timer.max_threads="10"
+ timer.keep_alive_time="3000"
+ timer.queue_max_size="500"
+
+ thread_pool.enabled="true"
+ thread_pool.min_threads="1"
+ thread_pool.max_threads="10"
+ thread_pool.keep_alive_time="5000"
+ thread_pool.queue_enabled="false"
+ thread_pool.queue_max_size="100"
+ thread_pool.rejection_policy="discard"
+
+ oob_thread_pool.enabled="true"
+ oob_thread_pool.min_threads="1"
+ oob_thread_pool.max_threads="8"
+ oob_thread_pool.keep_alive_time="5000"
+ oob_thread_pool.queue_enabled="false"
+ oob_thread_pool.queue_max_size="100"
+ oob_thread_pool.rejection_policy="discard"
+ singleton_name="tcp-teiid-1"/>
+
+ <TCPPING timeout="3000"
+
initial_hosts="${jgroups.tcpping.initial_hosts:localhost[7800],localhost[7801]}"
+ port_range="1"
+ num_initial_members="3"/>
+ <MERGE2 min_interval="10000"
+ max_interval="30000"/>
+ <FD_SOCK/>
+ <FD timeout="3000" max_tries="3" />
+ <VERIFY_SUSPECT timeout="1500" />
+ <BARRIER />
+ <pbcast.NAKACK use_mcast_xmit="false"
+ exponential_backoff="500"
+ discard_delivered_msgs="true"/>
+ <UNICAST />
+ <pbcast.STABLE stability_delay="1000"
desired_avg_gossip="50000"
+ max_bytes="4M"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+
+ view_bundling="true"/>
+ <UFC max_credits="2M"
+ min_threshold="0.4"/>
+ <MFC max_credits="2M"
+ min_threshold="0.4"/>
+ <FRAG2 frag_size="60K" />
+ <pbcast.STATE_TRANSFER/>
+</config>
\ No newline at end of file
Property changes on: trunk/test-integration/common/src/test/resources/tcp-shared-1.xml
___________________________________________________________________
Added: svn:mime-type
+ text/plain