Author: shawkins
Date: 2011-11-23 14:43:50 -0500 (Wed, 23 Nov 2011)
New Revision: 3699
Added:
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/AddressWrapper.java
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java
Removed:
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java
Modified:
trunk/api/src/main/java/org/teiid/Replicated.java
trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/ClusterableCacheFactory.java
trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java
trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java
trunk/engine/src/main/java/org/teiid/cache/Cachable.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedPlan.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java
trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestSessionAwareCache.java
trunk/jboss-integration/src/main/java/org/teiid/jboss/BufferManagerService.java
trunk/jboss-integration/src/main/java/org/teiid/jboss/JGroupsObjectReplicatorService.java
trunk/jboss-integration/src/main/java/org/teiid/jboss/TeiidAdd.java
trunk/test-integration/common/pom.xml
trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
Log:
TEIID-1720 next refinement of replication logic
Modified: trunk/api/src/main/java/org/teiid/Replicated.java
===================================================================
--- trunk/api/src/main/java/org/teiid/Replicated.java 2011-11-23 18:30:27 UTC (rev 3698)
+++ trunk/api/src/main/java/org/teiid/Replicated.java 2011-11-23 19:43:50 UTC (rev 3699)
@@ -61,7 +61,7 @@
*
* @return PUSH if the remote members should have a partial state replication called
using the first argument as the state after
* the local method has been invoked, or PULL if the local member should initial a
partial state pull using the first argument
- * as the state after the local method returns null
+ * as the state after the local method returns null. PULL cannot be asynch.
*/
ReplicationMode replicateState() default ReplicationMode.NONE;
Modified:
trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/ClusterableCacheFactory.java
===================================================================
---
trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/ClusterableCacheFactory.java 2011-11-23
18:30:27 UTC (rev 3698)
+++
trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/ClusterableCacheFactory.java 2011-11-23
19:43:50 UTC (rev 3699)
@@ -27,6 +27,7 @@
import javax.naming.InitialContext;
import javax.naming.NamingException;
+import org.infinispan.manager.CacheContainer;
import org.teiid.cache.Cache;
import org.teiid.cache.CacheConfiguration;
import org.teiid.cache.CacheFactory;
@@ -49,7 +50,7 @@
}
else {
try {
- this.delegate = new JBossCacheFactory(this.resultsetCacheName, cacheManager);
+ this.delegate = new JBossCacheFactory(this.resultsetCacheName, (CacheContainer)
cacheManager);
} catch (Exception e) {
throw new TeiidRuntimeException("Failed to obtain the clusted cache");
//$NON-NLS-1$
}
Modified: trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java
===================================================================
--- trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java 2011-11-23
18:30:27 UTC (rev 3698)
+++ trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java 2011-11-23
19:43:50 UTC (rev 3699)
@@ -56,7 +56,10 @@
@Override
public V put(String key, V value, Long ttl) {
- return this.cacheStore.put(fqn(key), value, ttl, TimeUnit.SECONDS);
+ if (ttl != null) {
+ return this.cacheStore.put(fqn(key), value, ttl, TimeUnit.MILLISECONDS);
+ }
+ return this.cacheStore.put(fqn(key), value);
}
@Override
Modified:
trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java
===================================================================
---
trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java 2011-11-23
18:30:27 UTC (rev 3698)
+++
trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java 2011-11-23
19:43:50 UTC (rev 3699)
@@ -37,9 +37,8 @@
private volatile boolean destroyed = false;
- public JBossCacheFactory(String name, Object cm) throws Exception {
- CacheContainer cachemanager = (CacheContainer)cm;
- this.cacheStore = cachemanager.getCache(name);
+ public JBossCacheFactory(String name, CacheContainer cm) {
+ this.cacheStore = cm.getCache(name);
}
/**
Added:
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/AddressWrapper.java
===================================================================
--- trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/AddressWrapper.java
(rev 0)
+++
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/AddressWrapper.java 2011-11-23
19:43:50 UTC (rev 3699)
@@ -0,0 +1,86 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.replication.jboss;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+import org.jgroups.Address;
+import org.teiid.core.util.ReflectionHelper;
+
+/**
+ * Allows JGroups {@link Address} objects to be serializable
+ */
+public final class AddressWrapper implements Externalizable {
+
+ Address address;
+
+ public AddressWrapper() {
+
+ }
+
+ public AddressWrapper(Address address) {
+ this.address = address;
+ }
+
+ @Override
+ public int hashCode() {
+ return address.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof AddressWrapper)) {
+ return false;
+ }
+ return address.equals(((AddressWrapper)obj).address);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ String className = in.readUTF();
+ try {
+ this.address = (Address) ReflectionHelper.create(className, null,
Thread.currentThread().getContextClassLoader());
+ this.address.readFrom(in);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeUTF(address.getClass().getName());
+ try {
+ address.writeTo(out);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+}
\ No newline at end of file
Property changes on:
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/AddressWrapper.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified:
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java
===================================================================
---
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java 2011-11-23
18:30:27 UTC (rev 3698)
+++
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java 2011-11-23
19:43:50 UTC (rev 3699)
@@ -31,14 +31,17 @@
public class JGroupsInputStream extends InputStream {
- static long TIME_OUT = 15000; //TODO make configurable
-
+ private long timeout = 15000;
private volatile byte[] buf;
private volatile int index=0;
private ReentrantLock lock = new ReentrantLock();
private Condition write = lock.newCondition();
private Condition doneReading = lock.newCondition();
+ public JGroupsInputStream(long timeout) {
+ this.timeout = timeout;
+ }
+
@Override
public int read() throws IOException {
if (index < 0) {
@@ -47,7 +50,7 @@
if (buf == null) {
lock.lock();
try {
- write.await(TIME_OUT, TimeUnit.MILLISECONDS);
+ write.await(timeout, TimeUnit.MILLISECONDS);
if (index < 0) {
return -1;
}
Modified:
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
===================================================================
---
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2011-11-23
18:30:27 UTC (rev 3698)
+++
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2011-11-23
19:43:50 UTC (rev 3699)
@@ -22,11 +22,8 @@
package org.teiid.replication.jboss;
-import java.io.Externalizable;
import java.io.IOException;
import java.io.InputStream;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
@@ -40,7 +37,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -48,7 +44,9 @@
import org.jboss.as.clustering.jgroups.ChannelFactory;
import org.jgroups.Address;
import org.jgroups.Channel;
+import org.jgroups.MembershipListener;
import org.jgroups.Message;
+import org.jgroups.MessageListener;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.jgroups.blocks.MethodCall;
@@ -57,92 +55,164 @@
import org.jgroups.blocks.ResponseMode;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.util.Promise;
+import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
import org.jgroups.util.Util;
import org.teiid.Replicated;
import org.teiid.Replicated.ReplicationMode;
-import org.teiid.core.util.ReflectionHelper;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.query.ObjectReplicator;
import org.teiid.query.ReplicatedObject;
-public abstract class JGroupsObjectReplicator implements ObjectReplicator, Serializable
{
+public class JGroupsObjectReplicator implements ObjectReplicator, Serializable {
- public static final class AddressWrapper implements Externalizable {
-
- private Address address;
-
- public AddressWrapper() {
-
+ private final class ReplicatorRpcDispatcher<S> extends RpcDispatcher {
+ private final S object;
+ private final HashMap<Method, Short> methodMap;
+ private final ArrayList<Method> methodList;
+ Map<List<?>, JGroupsInputStream> inputStreams = new
ConcurrentHashMap<List<?>, JGroupsInputStream>();
+
+ private ReplicatorRpcDispatcher(Channel channel, MessageListener l,
+ MembershipListener l2, Object serverObj, S object,
+ HashMap<Method, Short> methodMap, ArrayList<Method> methodList) {
+ super(channel, l, l2, serverObj);
+ this.object = object;
+ this.methodMap = methodMap;
+ this.methodList = methodList;
}
-
- public AddressWrapper(Address address) {
- this.address = address;
- }
-
+
@Override
- public int hashCode() {
- return address.hashCode();
+ public Object handle(Message req) {
+ Object body=null;
+
+ if(req == null || req.getLength() == 0) {
+ if(log.isErrorEnabled()) log.error("message or message buffer is
null"); //$NON-NLS-1$
+ return null;
+ }
+
+ try {
+ body=req_marshaller != null?
+ req_marshaller.objectFromBuffer(req.getBuffer(), req.getOffset(),
req.getLength())
+ : req.getObject();
+ }
+ catch(Throwable e) {
+ if(log.isErrorEnabled()) log.error("exception marshalling object",
e); //$NON-NLS-1$
+ return e;
+ }
+
+ if(!(body instanceof MethodCall)) {
+ if(log.isErrorEnabled()) log.error("message does not contain a MethodCall
object"); //$NON-NLS-1$
+
+ // create an exception to represent this and return it
+ return new IllegalArgumentException("message does not contain a
MethodCall object") ; //$NON-NLS-1$
+ }
+
+ final MethodCall method_call=(MethodCall)body;
+
+ try {
+ if(log.isTraceEnabled())
+ log.trace("[sender=" + req.getSrc() + "], method_call:
" + method_call); //$NON-NLS-1$ //$NON-NLS-2$
+
+ if(method_lookup == null)
+ throw new Exception("MethodCall uses ID=" + method_call.getId() +
", but method_lookup has not been set"); //$NON-NLS-1$ //$NON-NLS-2$
+
+ if (method_call.getId() >= methodList.size() - 3) {
+ Serializable address = new AddressWrapper(req.getSrc());
+ Serializable stateId = (Serializable)method_call.getArgs()[0];
+ List<?> key = Arrays.asList(stateId, address);
+ JGroupsInputStream is = inputStreams.get(key);
+ if (method_call.getId() == methodList.size() - 3) {
+ LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "create
state", stateId); //$NON-NLS-1$
+ if (is != null) {
+ is.receive(null);
+ }
+ is = new JGroupsInputStream(15000);
+ this.inputStreams.put(key, is);
+ executor.execute(new StreamingRunner(object, stateId, is, null));
+ } else if (method_call.getId() == methodList.size() - 2) {
+ LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "building
state", stateId); //$NON-NLS-1$
+ if (is != null) {
+ is.receive((byte[])method_call.getArgs()[1]);
+ }
+ } else if (method_call.getId() == methodList.size() - 1) {
+ LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "finished
state", stateId); //$NON-NLS-1$
+ if (is != null) {
+ is.receive(null);
+ }
+ this.inputStreams.remove(key);
+ }
+ return null;
+ } else if (method_call.getId() == methodList.size() - 5) {
+ //hasState
+ ReplicatedObject ro = (ReplicatedObject)object;
+ Serializable stateId = (Serializable)method_call.getArgs()[0];
+
+ if (ro.hasState(stateId)) {
+ return Boolean.TRUE;
+ }
+ return null;
+ } else if (method_call.getId() == methodList.size() - 4) {
+ //sendState
+ ReplicatedObject ro = (ReplicatedObject)object;
+ String stateId = (String)method_call.getArgs()[0];
+ AddressWrapper dest = (AddressWrapper)method_call.getArgs()[1];
+
+ JGroupsOutputStream oStream = new JGroupsOutputStream(this,
Arrays.asList(dest.address), stateId, (short)(methodMap.size() - 3), false);
+ try {
+ ro.getState(stateId, oStream);
+ } finally {
+ oStream.close();
+ }
+ LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state",
stateId); //$NON-NLS-1$
+ return null;
+ }
+
+ Method m=method_lookup.findMethod(method_call.getId());
+ if(m == null)
+ throw new Exception("no method found for " +
method_call.getId()); //$NON-NLS-1$
+ method_call.setMethod(m);
+
+ return method_call.invoke(server_obj);
+ }
+ catch(Throwable x) {
+ return x;
+ }
}
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
- if (!(obj instanceof AddressWrapper)) {
- return false;
- }
- return address.equals(((AddressWrapper)obj).address);
- }
-
- @Override
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
- String className = in.readUTF();
- try {
- this.address = (Address) ReflectionHelper.create(className, null,
Thread.currentThread().getContextClassLoader());
- this.address.readFrom(in);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeUTF(address.getClass().getName());
- try {
- address.writeTo(out);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
}
-
+
private static final long serialVersionUID = -6851804958313095166L;
+ private static final String HAS_STATE = "hasState"; //$NON-NLS-1$
+ private static final String SEND_STATE = "sendState"; //$NON-NLS-1$
private static final String CREATE_STATE = "createState"; //$NON-NLS-1$
private static final String BUILD_STATE = "buildState"; //$NON-NLS-1$
private static final String FINISH_STATE = "finishState"; //$NON-NLS-1$
- private final class StreamingRunner implements Runnable {
+ private final static class StreamingRunner implements Runnable {
private final Object object;
- private final String stateId;
+ private final Serializable stateId;
private final JGroupsInputStream is;
+ private Promise<Boolean> promise;
- private StreamingRunner(Object object, String stateId, JGroupsInputStream is) {
+ private StreamingRunner(Object object, Serializable stateId, JGroupsInputStream is,
Promise<Boolean> promise) {
this.object = object;
this.stateId = stateId;
this.is = is;
+ this.promise = promise;
}
@Override
public void run() {
try {
((ReplicatedObject)object).setState(stateId, is);
+ if (promise != null) {
+ promise.setResult(Boolean.TRUE);
+ }
LogManager.logDetail(LogConstants.CTX_RUNTIME, "state set " + stateId);
//$NON-NLS-1$
} catch (Exception e) {
+ if (promise != null) {
+ promise.setResult(Boolean.FALSE);
+ }
LogManager.logError(LogConstants.CTX_RUNTIME, e, "error setting state " +
stateId); //$NON-NLS-1$
} finally {
is.close();
@@ -150,28 +220,24 @@
}
}
- private final static class ReplicatedInvocationHandler<S> extends ReceiverAdapter
implements
+ private final class ReplicatedInvocationHandler<S> extends ReceiverAdapter
implements
InvocationHandler, Serializable {
+ private static final int PULL_RETRIES = 3;
private static final long serialVersionUID = -2943462899945966103L;
private final S object;
- private RpcDispatcher disp;
+ private transient ReplicatorRpcDispatcher<S> disp;
private final HashMap<Method, Short> methodMap;
protected List<Address> remoteMembers = new ArrayList<Address>();
protected final transient Promise<Boolean> state_promise=new
Promise<Boolean>();
+ private Map<Serializable, Promise<Boolean>> loadingStates = new
HashMap<Serializable, Promise<Boolean>>();
- protected transient ThreadLocal<Promise<Boolean>> threadLocalPromise =
new ThreadLocal<Promise<Boolean>>() {
- protected org.jgroups.util.Promise<Boolean> initialValue() {
- return new Promise<Boolean>();
- }
- };
-
private ReplicatedInvocationHandler(S object,HashMap<Method, Short> methodMap) {
this.object = object;
this.methodMap = methodMap;
}
- public void setDisp(RpcDispatcher disp) {
+ public void setDisp(ReplicatorRpcDispatcher<S> disp) {
this.disp = disp;
}
@@ -195,57 +261,16 @@
try {
Replicated annotation = method.getAnnotation(Replicated.class);
if (annotation.replicateState() != ReplicationMode.NONE) {
- Object result = null;
- try {
- result = method.invoke(object, args);
- } catch (InvocationTargetException e) {
- throw e.getCause();
- }
- List<Address> dests = null;
- synchronized (remoteMembers) {
- dests = new ArrayList<Address>(remoteMembers);
- }
- ReplicatedObject ro = (ReplicatedObject)object;
- String stateId = (String)args[0];
- if (annotation.replicateState() == ReplicationMode.PUSH) {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "replicating
state", stateId); //$NON-NLS-1$
- JGroupsOutputStream oStream = new JGroupsOutputStream(disp, dests, stateId,
(short)(methodMap.size() - 3));
- try {
- ro.getState(stateId, oStream);
- } finally {
- oStream.close();
- }
- LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state",
stateId); //$NON-NLS-1$
- return result;
- }
- if (result != null) {
- return result;
- }
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulling state",
stateId); //$NON-NLS-1$
- long timeout = annotation.timeout();
- threadLocalPromise.set(new Promise<Boolean>());
- /*boolean getState = this.disp.getChannel().getState(null, stateId, timeout);
- if (getState) {
- Boolean loaded = threadLocalPromise.get().getResult(timeout);
- if (Boolean.TRUE.equals(loaded)) {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "loaded",
stateId); //$NON-NLS-1$
- } else {
- LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " load error or
timeout " + stateId); //$NON-NLS-1$
- }
- } else {
- LogManager.logInfo(LogConstants.CTX_RUNTIME, object + " first member or
timeout exceeded " + stateId); //$NON-NLS-1$
- }*/
- LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state",
stateId); //$NON-NLS-1$
- return result;
+ return handleReplicateState(method, args, annotation);
}
MethodCall call=new MethodCall(methodNum, args);
- Vector<Address> dests = null;
+ ArrayList<Address> dests = null;
if (annotation.remoteOnly()) {
synchronized (remoteMembers) {
- dests = new Vector<Address>(remoteMembers);
+ dests = new ArrayList<Address>(remoteMembers);
}
}
- RspList<Object> responses = disp.callRemoteMethods(dests, call, new
RequestOptions().setMode(annotation.asynch()?ResponseMode.GET_NONE:ResponseMode.GET_ALL).setTimeout(annotation.timeout()));
+ RspList<Object> responses = disp.callRemoteMethods(dests, call, new
RequestOptions().setMode(annotation.asynch()?ResponseMode.GET_NONE:ResponseMode.GET_ALL).setTimeout(annotation.timeout()).setAnycasting(dests
!= null));
if (annotation.asynch()) {
return null;
}
@@ -266,9 +291,112 @@
}
return null;
} catch(Exception e) {
- throw new RuntimeException(method + " " + args + "
failed"); //$NON-NLS-1$ //$NON-NLS-2$
+ throw new RuntimeException(method + " " + args + " failed",
e); //$NON-NLS-1$ //$NON-NLS-2$
}
}
+
+ private Object handleReplicateState(Method method, Object[] args,
+ Replicated annotation) throws IllegalAccessException,
+ Throwable, IOException, IllegalStateException, Exception {
+ Object result = null;
+ try {
+ result = method.invoke(object, args);
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ List<Address> dests = null;
+ synchronized (remoteMembers) {
+ dests = new ArrayList<Address>(remoteMembers);
+ }
+ ReplicatedObject ro = (ReplicatedObject)object;
+ Serializable stateId = (Serializable)args[0];
+ if (annotation.replicateState() == ReplicationMode.PUSH) {
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "replicating state",
stateId); //$NON-NLS-1$
+ JGroupsOutputStream oStream = new JGroupsOutputStream(disp, dests, stateId,
(short)(methodMap.size() - 3), true);
+ try {
+ ro.getState(stateId, oStream);
+ } finally {
+ oStream.close();
+ }
+ LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state",
stateId); //$NON-NLS-1$
+ return result;
+ }
+ if (result != null) {
+ return result;
+ }
+ if (!(object instanceof ReplicatedObject)) {
+ throw new IllegalStateException("A non-ReplicatedObject cannot use state
pulling."); //$NON-NLS-1$
+ }
+ for (int i = 0; i < PULL_RETRIES; i++) {
+ Promise<Boolean> p = null;
+ boolean wait = true;
+ synchronized (loadingStates) {
+ p = loadingStates.get(stateId);
+ if (p == null) {
+ wait = false;
+ try {
+ result = method.invoke(object, args);
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ if (result != null) {
+ return result;
+ }
+ p = new Promise<Boolean>();
+ loadingStates.put(stateId, p);
+ }
+ }
+ long timeout = annotation.timeout();
+ if (wait) {
+ p.getResult(timeout);
+ continue;
+ }
+ try {
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulling state",
stateId); //$NON-NLS-1$
+ RspList<Boolean> resp = this.disp.callRemoteMethods(null, new
MethodCall((short)(methodMap.size() - 5), stateId), new
RequestOptions(ResponseMode.GET_ALL, timeout));
+ Collection<Rsp<Boolean>> values = resp.values();
+ Rsp<Boolean> rsp = null;
+ for (Rsp<Boolean> response : values) {
+ if (Boolean.TRUE.equals(response.getValue())) {
+ rsp = response;
+ break;
+ }
+ }
+ if (rsp == null || this.disp.getChannel().getAddress().equals(rsp.getSender())) {
+ break;
+ }
+ JGroupsInputStream is = new JGroupsInputStream(15000);
+ StreamingRunner runner = new StreamingRunner(object, stateId, is, p);
+ List<?> key = Arrays.asList(stateId, new AddressWrapper(rsp.getSender()));
+ disp.inputStreams.put(key, is);
+ executor.execute(runner);
+
+ this.disp.callRemoteMethod(rsp.getSender(), new MethodCall((short)(methodMap.size()
- 4), stateId, new AddressWrapper(this.disp.getChannel().getAddress())), new
RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(true));
+
+ Boolean fetched = p.getResult(timeout);
+
+ if (fetched != null) {
+ if (fetched) {
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulled state",
stateId); //$NON-NLS-1$
+ } else {
+ LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " failed to pull
" + stateId); //$NON-NLS-1$
+ }
+ } else {
+ LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " timeout pulling
" + stateId); //$NON-NLS-1$
+ }
+ try {
+ result = method.invoke(object, args);
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ } finally {
+ synchronized (loadingStates) {
+ loadingStates.remove(stateId);
+ }
+ }
+ }
+ return null; //could not fetch the remote state
+ }
@Override
public void viewAccepted(View newView) {
@@ -315,46 +443,23 @@
}
}
- public void setState(String stateId, InputStream istream) {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "loading state");
//$NON-NLS-1$
- try {
- ((ReplicatedObject)object).setState(stateId, istream);
- threadLocalPromise.get().setResult(Boolean.TRUE);
- } catch (Exception e) {
- threadLocalPromise.get().setResult(Boolean.FALSE);
- LogManager.logError(LogConstants.CTX_RUNTIME, e, "error loading state");
//$NON-NLS-1$
- } finally {
- Util.close(istream);
- }
- }
-
- public void getState(String stateId, OutputStream ostream) {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "getting state");
//$NON-NLS-1$
- try {
- ((ReplicatedObject)object).getState(stateId, ostream);
- } catch (Exception e) {
- LogManager.logError(LogConstants.CTX_RUNTIME, e, "error gettting state");
//$NON-NLS-1$
- } finally {
- Util.close(ostream);
- }
- }
}
private interface Streaming {
- void createState(String id);
- void buildState(String id, byte[] bytes);
- void finishState(String id);
+ void sendState(Serializable id, AddressWrapper dest);
+ void createState(Serializable id);
+ void buildState(Serializable id, byte[] bytes);
+ void finishState(Serializable id);
}
//TODO: this should be configurable, or use a common executor
private transient Executor executor = Executors.newCachedThreadPool();
+ private transient ChannelFactory channelFactory;
- public JGroupsObjectReplicator(@SuppressWarnings("unused") String clusterName)
{
+ public JGroupsObjectReplicator(ChannelFactory channelFactory) {
+ this.channelFactory = channelFactory;
}
- public abstract ChannelFactory getChannelFactory();
-
-
public void stop(Object object) {
if (!Proxy.isProxyClass(object.getClass())) {
return;
@@ -369,7 +474,7 @@
@Override
public <T, S> T replicate(String mux_id,
Class<T> iface, final S object, long startTimeout) throws Exception {
- Channel channel = getChannelFactory().createChannel(mux_id);
+ Channel channel = channelFactory.createChannel(mux_id);
Method[] methods = iface.getMethods();
final HashMap<Method, Short> methodMap = new HashMap<Method, Short>();
@@ -383,14 +488,22 @@
methodMap.put(method, (short)(methodList.size() - 1));
}
+ Method hasState = ReplicatedObject.class.getMethod(HAS_STATE, new Class<?>[]
{Serializable.class});
+ methodList.add(hasState);
+ methodMap.put(hasState, (short)(methodList.size() - 1));
+
+ Method sendState = JGroupsObjectReplicator.Streaming.class.getMethod(SEND_STATE, new
Class<?>[] {Serializable.class, AddressWrapper.class});
+ methodList.add(sendState);
+ methodMap.put(sendState, (short)(methodList.size() - 1));
+
//add in streaming methods
- Method createState = JGroupsObjectReplicator.Streaming.class.getMethod(CREATE_STATE,
new Class<?>[] {String.class});
+ Method createState = JGroupsObjectReplicator.Streaming.class.getMethod(CREATE_STATE,
new Class<?>[] {Serializable.class});
methodList.add(createState);
methodMap.put(createState, (short)(methodList.size() - 1));
- Method buildState = JGroupsObjectReplicator.Streaming.class.getMethod(BUILD_STATE, new
Class<?>[] {String.class, byte[].class});
+ Method buildState = JGroupsObjectReplicator.Streaming.class.getMethod(BUILD_STATE, new
Class<?>[] {Serializable.class, byte[].class});
methodList.add(buildState);
methodMap.put(buildState, (short)(methodList.size() - 1));
- Method finishState = JGroupsObjectReplicator.Streaming.class.getMethod(FINISH_STATE,
new Class<?>[] {String.class});
+ Method finishState = JGroupsObjectReplicator.Streaming.class.getMethod(FINISH_STATE,
new Class<?>[] {Serializable.class});
methodList.add(finishState);
methodMap.put(finishState, (short)(methodList.size() - 1));
@@ -399,87 +512,8 @@
* TODO: could have an object implement streaming
* Override the normal handle method to support streaming
*/
- RpcDispatcher disp = new RpcDispatcher(channel, proxy, proxy, object) {
- Map<List<?>, JGroupsInputStream> inputStreams = new
ConcurrentHashMap<List<?>, JGroupsInputStream>();
- @Override
- public Object handle(Message req) {
- Object body=null;
-
- if(req == null || req.getLength() == 0) {
- if(log.isErrorEnabled()) log.error("message or message buffer is
null"); //$NON-NLS-1$
- return null;
- }
-
- if (req.getSrc().equals(local_addr)) {
- return null;
- }
-
- try {
- body=req_marshaller != null?
- req_marshaller.objectFromBuffer(req.getBuffer(), req.getOffset(),
req.getLength())
- : req.getObject();
- }
- catch(Throwable e) {
- if(log.isErrorEnabled()) log.error("exception marshalling
object", e); //$NON-NLS-1$
- return e;
- }
-
- if(!(body instanceof MethodCall)) {
- if(log.isErrorEnabled()) log.error("message does not contain a
MethodCall object"); //$NON-NLS-1$
-
- // create an exception to represent this and return it
- return new IllegalArgumentException("message does not contain a
MethodCall object") ; //$NON-NLS-1$
- }
-
- final MethodCall method_call=(MethodCall)body;
-
- try {
- if(log.isTraceEnabled())
- log.trace("[sender=" + req.getSrc() + "], method_call:
" + method_call); //$NON-NLS-1$ //$NON-NLS-2$
-
- if(method_lookup == null)
- throw new Exception("MethodCall uses ID=" +
method_call.getId() + ", but method_lookup has not been set"); //$NON-NLS-1$
//$NON-NLS-2$
-
- if (method_call.getId() >= methodList.size() - 3) {
- Serializable address = new AddressWrapper(req.getSrc());
- String stateId = (String)method_call.getArgs()[0];
- List<?> key = Arrays.asList(stateId, address);
- JGroupsInputStream is = inputStreams.get(key);
- if (method_call.getId() == methodList.size() - 3) {
- LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "create
state", stateId); //$NON-NLS-1$
- if (is != null) {
- is.receive(null);
- }
- is = new JGroupsInputStream();
- this.inputStreams.put(key, is);
- executor.execute(new StreamingRunner(object, stateId, is));
- } else if (method_call.getId() == methodList.size() - 2) {
- LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "building
state", stateId); //$NON-NLS-1$
- if (is != null) {
- is.receive((byte[])method_call.getArgs()[1]);
- }
- } else if (method_call.getId() == methodList.size() - 1) {
- LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "finished
state", stateId); //$NON-NLS-1$
- if (is != null) {
- is.receive(null);
- }
- this.inputStreams.remove(key);
- }
- return null;
- }
-
- Method m=method_lookup.findMethod(method_call.getId());
- if(m == null)
- throw new Exception("no method found for " +
method_call.getId()); //$NON-NLS-1$
- method_call.setMethod(m);
-
- return method_call.invoke(server_obj);
- }
- catch(Throwable x) {
- return x;
- }
- }
- };
+ ReplicatorRpcDispatcher disp = new ReplicatorRpcDispatcher<S>(channel,
proxy, proxy, object,
+ object, methodMap, methodList);
proxy.setDisp(disp);
disp.setMethodLookup(new MethodLookup() {
Modified:
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java
===================================================================
---
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java 2011-11-23
18:30:27 UTC (rev 3698)
+++
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java 2011-11-23
19:43:50 UTC (rev 3699)
@@ -24,6 +24,7 @@
import java.io.IOException;
import java.io.OutputStream;
+import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
@@ -40,22 +41,24 @@
protected final RpcDispatcher disp;
protected final List<Address> dests;
- protected final String stateId;
+ protected final Serializable stateId;
protected final short methodOffset;
private volatile boolean closed=false;
private final byte[] buffer=new byte[CHUNK_SIZE];
private int index=0;
- public JGroupsOutputStream(RpcDispatcher disp, List<Address> dests, String
stateId, short methodOffset) throws IOException {
+ public JGroupsOutputStream(RpcDispatcher disp, List<Address> dests,
Serializable stateId, short methodOffset, boolean sendCreate) throws IOException {
this.disp=disp;
this.dests=dests;
this.stateId=stateId;
this.methodOffset = methodOffset;
- try {
- disp.callRemoteMethods(this.dests, new MethodCall(methodOffset, new Object[]
{stateId}), new RequestOptions(ResponseMode.GET_NONE, 0));
- } catch(Exception e) {
- throw new IOException(e);
+ if (sendCreate) {
+ try {
+ disp.callRemoteMethods(this.dests, new MethodCall(methodOffset, new Object[]
{stateId}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(true));
+ } catch(Exception e) {
+ throw new IOException(e);
+ }
}
}
@@ -65,7 +68,7 @@
}
flush();
try {
- disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 2), new
Object[] {stateId}), new RequestOptions(ResponseMode.GET_NONE, 0));
+ disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 2), new
Object[] {stateId}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(true));
} catch(Exception e) {
}
closed=true;
@@ -77,7 +80,7 @@
if(index == 0) {
return;
}
- disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 1), new
Object[] {stateId, Arrays.copyOf(buffer, index)}), new
RequestOptions(ResponseMode.GET_NONE, 0));
+ disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 1), new
Object[] {stateId, Arrays.copyOf(buffer, index)}), new
RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(true));
index=0;
} catch(Exception e) {
throw new IOException(e);
Modified: trunk/engine/src/main/java/org/teiid/cache/Cachable.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/cache/Cachable.java 2011-11-23 18:30:27 UTC (rev
3698)
+++ trunk/engine/src/main/java/org/teiid/cache/Cachable.java 2011-11-23 19:43:50 UTC (rev
3699)
@@ -26,9 +26,9 @@
public interface Cachable {
- boolean prepare(Cache cache, BufferManager bufferManager);
+ boolean prepare(BufferManager bufferManager);
- boolean restore(Cache cache, BufferManager bufferManager);
+ boolean restore(BufferManager bufferManager);
AccessInfo getAccessInfo();
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-11-23
18:30:27 UTC (rev 3698)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-11-23
19:43:50 UTC (rev 3699)
@@ -74,7 +74,7 @@
*
* TODO: add a pre-fetch for tuplebuffers or some built-in correlation logic with the
queue.
*/
-public class BufferManagerImpl implements BufferManager, StorageManager, ReplicatedObject
{
+public class BufferManagerImpl implements BufferManager, StorageManager,
ReplicatedObject<String> {
/**
* Asynch cleaner attempts to age out old entries and to reduce the memory size when
@@ -1069,8 +1069,14 @@
public void setCache(Cache cache) {
this.cache = cache;
}
+
public int getMemoryCacheEntries() {
return memoryEntries.size();
}
+ @Override
+ public boolean hasState(String stateId) {
+ return this.getTupleBuffer(stateId) != null;
+ }
+
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java 2011-11-23
18:30:27 UTC (rev 3698)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java 2011-11-23
19:43:50 UTC (rev 3699)
@@ -27,7 +27,6 @@
import org.teiid.api.exception.query.QueryParserException;
import org.teiid.api.exception.query.QueryResolverException;
import org.teiid.cache.Cachable;
-import org.teiid.cache.Cache;
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.TupleBuffer;
import org.teiid.core.TeiidComponentException;
@@ -94,14 +93,14 @@
}
@Override
- public boolean prepare(Cache cache, BufferManager bufferManager) {
+ public boolean prepare(BufferManager bufferManager) {
Assertion.assertTrue(!this.results.isForwardOnly());
bufferManager.distributeTupleBuffer(this.results.getId(), results);
return true;
}
@Override
- public synchronized boolean restore(Cache cache, BufferManager bufferManager) {
+ public synchronized boolean restore(BufferManager bufferManager) {
if (this.results == null) {
if (this.hasLobs) {
return false; //the lob store is local only and not distributed
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedPlan.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedPlan.java 2011-11-23
18:30:27 UTC (rev 3698)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedPlan.java 2011-11-23
19:43:50 UTC (rev 3699)
@@ -25,7 +25,6 @@
import java.util.List;
import org.teiid.cache.Cachable;
-import org.teiid.cache.Cache;
import org.teiid.common.buffer.BufferManager;
import org.teiid.query.analysis.AnalysisRecord;
import org.teiid.query.processor.ProcessorPlan;
@@ -115,12 +114,12 @@
}
@Override
- public boolean prepare(Cache cache, BufferManager bufferManager) {
+ public boolean prepare(BufferManager bufferManager) {
return true; //no remotable actions
}
@Override
- public boolean restore(Cache cache, BufferManager bufferManager) {
+ public boolean restore(BufferManager bufferManager) {
return true; //no remotable actions
}
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java 2011-11-23
18:30:27 UTC (rev 3698)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java 2011-11-23
19:43:50 UTC (rev 3699)
@@ -61,7 +61,6 @@
private Cache<CacheID, T> localCache;
private Cache<CacheID, T> distributedCache;
- private Cache tupleBatchCache;
private int maxSize = DEFAULT_MAX_SIZE_TOTAL;
private long modTime;
@@ -93,12 +92,6 @@
else {
String location = config.getLocation()+"/"+type.name(); //$NON-NLS-1$
this.distributedCache = cacheFactory.get(location, config);
- if (type == Type.RESULTSET) {
- this.tupleBatchCache = cacheFactory.get(location+"/batches", config);
//$NON-NLS-1$
- }
- else {
- this.tupleBatchCache = this.distributedCache;
- }
}
this.modTime = config.getMaxStaleness()*1000;
this.type = type;
@@ -124,7 +117,7 @@
if (result instanceof Cachable) {
Cachable c = (Cachable)result;
- if (!c.restore(this.tupleBatchCache, this.bufferManager)) {
+ if (!c.restore(this.bufferManager)) {
result = null;
}
}
@@ -188,7 +181,7 @@
if (t instanceof Cachable) {
Cachable c = (Cachable)t;
- insert = c.prepare(this.tupleBatchCache, this.bufferManager);
+ insert = c.prepare(this.bufferManager);
}
if (insert) {
Modified: trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java 2011-11-23 18:30:27
UTC (rev 3698)
+++ trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java 2011-11-23 19:43:50
UTC (rev 3699)
@@ -31,7 +31,7 @@
* Optional interface to be implemented by a replicated object to support full and
partial state transfer.
*
*/
-public interface ReplicatedObject {
+public interface ReplicatedObject<K extends Serializable> {
/**
* Allows an application to write a state through a provided OutputStream.
@@ -46,7 +46,7 @@
* @param state_id id of the partial state requested
* @param ostream the OutputStream
*/
- void getState(String state_id, OutputStream ostream);
+ void getState(K state_id, OutputStream ostream);
/**
* Allows an application to read a state through a provided InputStream.
@@ -61,7 +61,7 @@
* @param state_id id of the partial state requested
* @param istream the InputStream
*/
- void setState(String state_id, InputStream istream);
+ void setState(K state_id, InputStream istream);
/**
* Allows the replicator to set the local address from the channel
@@ -75,4 +75,6 @@
*/
void droppedMembers(Collection<Serializable> addresses);
+ boolean hasState(K state_id);
+
}
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java 2011-11-23
18:30:27 UTC (rev 3698)
+++
trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java 2011-11-23
19:43:50 UTC (rev 3699)
@@ -63,7 +63,7 @@
import org.teiid.query.sql.symbol.GroupSymbol;
import org.teiid.query.tempdata.TempTableStore.TransactionMode;
-public class GlobalTableStoreImpl implements GlobalTableStore, ReplicatedObject {
+public class GlobalTableStoreImpl implements GlobalTableStore,
ReplicatedObject<String> {
public enum MatState {
NEEDS_LOADING,
@@ -484,5 +484,10 @@
}
}
}
+
+ @Override
+ public boolean hasState(String stateId) {
+ return this.tableStore.getTempTable(stateId) != null;
+ }
}
Modified:
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java 2011-11-23
18:30:27 UTC (rev 3698)
+++
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java 2011-11-23
19:43:50 UTC (rev 3699)
@@ -94,7 +94,7 @@
cache.put(results.getId()+","+row, tb.getBatch(row), null); //$NON-NLS-1$
}
- results.prepare(cache, bm);
+ results.prepare(bm);
//simulate distribute
TupleBuffer distributedTb = bm.getTupleBuffer(results.getId());
@@ -106,7 +106,7 @@
BufferManager bm2 = fbs.getBufferManager();
bm2.distributeTupleBuffer(results.getId(), distributedTb);
- assertTrue(cachedResults.restore(cache, bm2));
+ assertTrue(cachedResults.restore(bm2));
// since restored, simulate a async cache flush
cache.clear();
Modified:
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestSessionAwareCache.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestSessionAwareCache.java 2011-11-23
18:30:27 UTC (rev 3698)
+++
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestSessionAwareCache.java 2011-11-23
19:43:50 UTC (rev 3699)
@@ -30,7 +30,6 @@
import org.mockito.Mockito;
import org.teiid.adminapi.impl.SessionMetadata;
import org.teiid.cache.Cachable;
-import org.teiid.cache.Cache;
import org.teiid.common.buffer.BufferManager;
import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
import org.teiid.metadata.FunctionMethod.Determinism;
@@ -54,11 +53,11 @@
// make sure that in the case of session specific; we do not call prepare
// as session is local only call for distributed
- Mockito.verify(result, times(0)).prepare((Cache)anyObject(),
(BufferManager)anyObject());
+ Mockito.verify(result, times(0)).prepare((BufferManager)anyObject());
Object c = cache.get(id);
- Mockito.verify(result, times(0)).restore((Cache)anyObject(),
(BufferManager)anyObject());
+ Mockito.verify(result, times(0)).restore((BufferManager)anyObject());
assertTrue(result==c);
}
@@ -71,20 +70,20 @@
CacheID id = new CacheID(buildWorkContext(), new ParseInfo(), "SELECT * FROM
FOO");
Cachable result = Mockito.mock(Cachable.class);
- Mockito.stub(result.prepare((Cache)anyObject(),
(BufferManager)anyObject())).toReturn(true);
- Mockito.stub(result.restore((Cache)anyObject(),
(BufferManager)anyObject())).toReturn(true);
+ Mockito.stub(result.prepare((BufferManager)anyObject())).toReturn(true);
+ Mockito.stub(result.restore((BufferManager)anyObject())).toReturn(true);
cache.put(id, Determinism.USER_DETERMINISTIC, result, null);
// make sure that in the case of session specific; we do not call prepare
// as session is local only call for distributed
- Mockito.verify(result, times(1)).prepare((Cache)anyObject(),
(BufferManager)anyObject());
+ Mockito.verify(result, times(1)).prepare((BufferManager)anyObject());
id = new CacheID(buildWorkContext(), new ParseInfo(), "SELECT * FROM FOO");
Object c = cache.get(id);
- Mockito.verify(result, times(1)).restore((Cache)anyObject(),
(BufferManager)anyObject());
+ Mockito.verify(result, times(1)).restore((BufferManager)anyObject());
assertTrue(result==c);
}
@@ -97,20 +96,20 @@
CacheID id = new CacheID(buildWorkContext(), new ParseInfo(), "SELECT * FROM
FOO");
Cachable result = Mockito.mock(Cachable.class);
- Mockito.stub(result.prepare((Cache)anyObject(),
(BufferManager)anyObject())).toReturn(true);
- Mockito.stub(result.restore((Cache)anyObject(),
(BufferManager)anyObject())).toReturn(true);
+ Mockito.stub(result.prepare((BufferManager)anyObject())).toReturn(true);
+ Mockito.stub(result.restore((BufferManager)anyObject())).toReturn(true);
cache.put(id, Determinism.VDB_DETERMINISTIC, result, null);
// make sure that in the case of session specific; we do not call prepare
// as session is local only call for distributed
- Mockito.verify(result, times(1)).prepare((Cache)anyObject(),
(BufferManager)anyObject());
+ Mockito.verify(result, times(1)).prepare((BufferManager)anyObject());
id = new CacheID(buildWorkContext(), new ParseInfo(), "SELECT * FROM FOO");
Object c = cache.get(id);
- Mockito.verify(result, times(1)).restore((Cache)anyObject(),
(BufferManager)anyObject());
+ Mockito.verify(result, times(1)).restore((BufferManager)anyObject());
assertTrue(result==c);
}
@@ -123,8 +122,8 @@
CacheID id = new CacheID(buildWorkContext(), new ParseInfo(), "SELECT * FROM
FOO");
Cachable result = Mockito.mock(Cachable.class);
- Mockito.stub(result.prepare((Cache)anyObject(),
(BufferManager)anyObject())).toReturn(true);
- Mockito.stub(result.restore((Cache)anyObject(),
(BufferManager)anyObject())).toReturn(true);
+ Mockito.stub(result.prepare((BufferManager)anyObject())).toReturn(true);
+ Mockito.stub(result.restore((BufferManager)anyObject())).toReturn(true);
id = new CacheID(buildWorkContext(), new ParseInfo(), "SELECT * FROM FOO");
cache.put(id, Determinism.VDB_DETERMINISTIC, result, null);
Modified: trunk/jboss-integration/src/main/java/org/teiid/jboss/BufferManagerService.java
===================================================================
---
trunk/jboss-integration/src/main/java/org/teiid/jboss/BufferManagerService.java 2011-11-23
18:30:27 UTC (rev 3698)
+++
trunk/jboss-integration/src/main/java/org/teiid/jboss/BufferManagerService.java 2011-11-23
19:43:50 UTC (rev 3699)
@@ -26,30 +26,53 @@
import org.jboss.msc.service.StartException;
import org.jboss.msc.service.StopContext;
import org.jboss.msc.value.InjectedValue;
+import org.teiid.common.buffer.BufferManager;
+import org.teiid.dqp.service.BufferService;
+import org.teiid.query.ObjectReplicator;
import org.teiid.services.BufferServiceImpl;
-class BufferManagerService implements Service<BufferServiceImpl> {
+class BufferManagerService implements Service<BufferService>, BufferService {
private BufferServiceImpl bufferMgr;
+ private ObjectReplicator replicator;
public final InjectedValue<String> pathInjector = new
InjectedValue<String>();
+ private BufferManager manager;
- public BufferManagerService(BufferServiceImpl buffer) {
+ public BufferManagerService(BufferServiceImpl buffer, ObjectReplicator replicator) {
this.bufferMgr = buffer;
+ this.replicator = replicator;
}
@Override
public void start(StartContext context) throws StartException {
bufferMgr.setDiskDirectory(pathInjector.getValue());
bufferMgr.start();
+ manager = bufferMgr.getBufferManager();
+ if (replicator != null) {
+ try {
+ //use a mux name that will not conflict with any vdb
+ manager = this.replicator.replicate("$BM$", BufferManager.class,
this.manager, 0); //$NON-NLS-1$
+ } catch (Exception e) {
+ throw new StartException(e);
+ }
+ }
}
@Override
public void stop(StopContext context) {
bufferMgr.stop();
+ if (this.replicator != null) {
+ this.replicator.stop(bufferMgr);
+ }
}
+
+ @Override
+ public BufferManager getBufferManager() {
+ return manager;
+ }
@Override
- public BufferServiceImpl getValue() throws
IllegalStateException,IllegalArgumentException {
+ public BufferService getValue() throws IllegalStateException,IllegalArgumentException {
return this.bufferMgr;
}
Modified:
trunk/jboss-integration/src/main/java/org/teiid/jboss/JGroupsObjectReplicatorService.java
===================================================================
---
trunk/jboss-integration/src/main/java/org/teiid/jboss/JGroupsObjectReplicatorService.java 2011-11-23
18:30:27 UTC (rev 3698)
+++
trunk/jboss-integration/src/main/java/org/teiid/jboss/JGroupsObjectReplicatorService.java 2011-11-23
19:43:50 UTC (rev 3699)
@@ -27,34 +27,22 @@
import org.jboss.msc.service.StartException;
import org.jboss.msc.service.StopContext;
import org.jboss.msc.value.InjectedValue;
-import org.teiid.common.buffer.BufferManager;
import org.teiid.replication.jboss.JGroupsObjectReplicator;
class JGroupsObjectReplicatorService implements Service<JGroupsObjectReplicator> {
public final InjectedValue<ChannelFactory> channelFactoryInjector = new
InjectedValue<ChannelFactory>();
private JGroupsObjectReplicator replicator;
- private String clusterName;
- private BufferManager buffermanager;
+ /**
+ * @param clusterName TODO see if this is still useful
+ */
public JGroupsObjectReplicatorService(String clusterName){
- this.clusterName = clusterName;
}
@Override
public void start(StartContext context) throws StartException {
- this.replicator = new JGroupsObjectReplicator(this.clusterName) {
- @Override
- public ChannelFactory getChannelFactory() {
- return channelFactoryInjector.getValue();
- }
- };
-
- try {
- this.replicator.replicate(clusterName, BufferManager.class, this.buffermanager, 0);
- } catch (Exception e) {
- throw new StartException(e);
- }
+ this.replicator = new JGroupsObjectReplicator(channelFactoryInjector.getValue());
}
@Override
@@ -66,8 +54,4 @@
return replicator;
}
- public void setBufferManager(BufferManager buffermanager) {
- this.buffermanager = buffermanager;
- }
-
}
Modified: trunk/jboss-integration/src/main/java/org/teiid/jboss/TeiidAdd.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/jboss/TeiidAdd.java 2011-11-23
18:30:27 UTC (rev 3698)
+++ trunk/jboss-integration/src/main/java/org/teiid/jboss/TeiidAdd.java 2011-11-23
19:43:50 UTC (rev 3699)
@@ -22,10 +22,7 @@
package org.teiid.jboss;
-import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.ADD;
-import static
org.jboss.as.controller.descriptions.ModelDescriptionConstants.DESCRIPTION;
-import static
org.jboss.as.controller.descriptions.ModelDescriptionConstants.OPERATION_NAME;
-import static
org.jboss.as.controller.descriptions.ModelDescriptionConstants.REQUEST_PROPERTIES;
+import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.*;
import java.util.List;
import java.util.Locale;
@@ -52,17 +49,17 @@
import org.jboss.modules.ModuleIdentifier;
import org.jboss.modules.ModuleLoadException;
import org.jboss.msc.service.ServiceBuilder;
-import org.jboss.msc.service.ServiceBuilder.DependencyType;
import org.jboss.msc.service.ServiceContainer;
import org.jboss.msc.service.ServiceController;
import org.jboss.msc.service.ServiceName;
import org.jboss.msc.service.ServiceTarget;
import org.jboss.msc.service.ValueService;
+import org.jboss.msc.service.ServiceBuilder.DependencyType;
import org.jboss.msc.value.InjectedValue;
import org.teiid.PolicyDecider;
import org.teiid.cache.CacheConfiguration;
-import org.teiid.cache.CacheConfiguration.Policy;
import org.teiid.cache.DefaultCacheFactory;
+import org.teiid.cache.CacheConfiguration.Policy;
import org.teiid.cache.jboss.ClusterableCacheFactory;
import org.teiid.common.buffer.BufferManager;
import org.teiid.deployers.SystemVDBDeployer;
@@ -76,6 +73,7 @@
import org.teiid.dqp.internal.process.DefaultAuthorizationValidator;
import org.teiid.dqp.internal.process.PreparedPlan;
import org.teiid.dqp.internal.process.SessionAwareCache;
+import org.teiid.dqp.service.BufferService;
import org.teiid.jboss.deployers.RuntimeEngineDeployer;
import org.teiid.query.ObjectReplicator;
import org.teiid.query.function.SystemFunctionManager;
@@ -231,13 +229,29 @@
ServiceBuilder<ObjectSerializer> objectSerializerService =
target.addService(TeiidServiceNames.OBJECT_SERIALIZER, serializer);
objectSerializerService.addDependency(TeiidServiceNames.DATA_DIR, String.class,
serializer.getPathInjector());
newControllers.add(objectSerializerService.install());
+
+ // Object Replicator
+ JGroupsObjectReplicatorService replicatorService = null;
+ if (Element.OR_STACK_ATTRIBUTE.isDefined(operation)) {
+ String stack = Element.OR_STACK_ATTRIBUTE.asString(operation);
+
+ String clusterName = "teiid-rep"; //$NON-NLS-1$
+ if (Element.OR_CLUSTER_NAME_ATTRIBUTE.isDefined(operation)) {
+ clusterName = Element.OR_CLUSTER_NAME_ATTRIBUTE.asString(operation);
+ }
+
+ replicatorService = new JGroupsObjectReplicatorService(clusterName);
+ ServiceBuilder<JGroupsObjectReplicator> serviceBuilder =
target.addService(TeiidServiceNames.OBJECT_REPLICATOR, replicatorService);
+ serviceBuilder.addDependency(ServiceName.JBOSS.append("jgroups",
"stack", stack), ChannelFactory.class,
replicatorService.channelFactoryInjector); //$NON-NLS-1$ //$NON-NLS-2$
+ newControllers.add(serviceBuilder.install());
+ }
// TODO: remove verbose service by moving the buffer service from runtime project
newControllers.add(RelativePathService.addService(TeiidServiceNames.BUFFER_DIR,
"teiid-buffer", "jboss.server.temp.dir", target)); //$NON-NLS-1$
//$NON-NLS-2$
- final BufferServiceImpl bufferManager = buildBufferManager(operation);
- BufferManagerService bufferService = new BufferManagerService(bufferManager);
- ServiceBuilder<BufferServiceImpl> bufferServiceBuilder =
target.addService(TeiidServiceNames.BUFFER_MGR, bufferService);
+ BufferManagerService bufferService = new
BufferManagerService(buildBufferManager(operation), replicatorService.getValue());
+ ServiceBuilder<BufferService> bufferServiceBuilder =
target.addService(TeiidServiceNames.BUFFER_MGR, bufferService);
bufferServiceBuilder.addDependency(TeiidServiceNames.BUFFER_DIR, String.class,
bufferService.pathInjector);
+ bufferServiceBuilder.addDependency(TeiidServiceNames.BUFFER_DIR, String.class,
bufferService.pathInjector);
newControllers.add(bufferServiceBuilder.install());
PolicyDecider policyDecider;
@@ -272,7 +286,7 @@
newControllers.add(target.addService(TeiidServiceNames.AUTHORIZATION_VALIDATOR,
authValidatorService).install());
// resultset cache
- final SessionAwareCache<CachedResults> resultsetCache =
buildResultsetCache(operation, bufferManager.getBufferManager());
+ final SessionAwareCache<CachedResults> resultsetCache =
buildResultsetCache(operation, bufferService.getValue().getBufferManager());
ValueService<SessionAwareCache<CachedResults>> resultSetService = new
ValueService<SessionAwareCache<CachedResults>>(new
org.jboss.msc.value.Value<SessionAwareCache<CachedResults>>() {
@Override
public SessionAwareCache<CachedResults> getValue() throws IllegalStateException,
IllegalArgumentException {
@@ -282,7 +296,7 @@
newControllers.add(target.addService(TeiidServiceNames.CACHE_RESULTSET,
resultSetService).install());
// prepared-plan cache
- final SessionAwareCache<PreparedPlan> preparedPlanCache =
buildPreparedPlanCache(operation, bufferManager.getBufferManager());
+ final SessionAwareCache<PreparedPlan> preparedPlanCache =
buildPreparedPlanCache(operation, bufferService.getValue().getBufferManager());
ValueService<SessionAwareCache<PreparedPlan>> preparedPlanService = new
ValueService<SessionAwareCache<PreparedPlan>>(new
org.jboss.msc.value.Value<SessionAwareCache<PreparedPlan>>() {
@Override
public SessionAwareCache<PreparedPlan> getValue() throws IllegalStateException,
IllegalArgumentException {
@@ -291,22 +305,6 @@
});
newControllers.add(target.addService(TeiidServiceNames.CACHE_PREPAREDPLAN,
preparedPlanService).install());
- // Object Replicator
- if (Element.OR_STACK_ATTRIBUTE.isDefined(operation)) {
- String stack = Element.OR_STACK_ATTRIBUTE.asString(operation);
-
- String clusterName = "teiid-rep"; //$NON-NLS-1$
- if (Element.OR_CLUSTER_NAME_ATTRIBUTE.isDefined(operation)) {
- clusterName = Element.OR_CLUSTER_NAME_ATTRIBUTE.asString(operation);
- }
-
- JGroupsObjectReplicatorService replicatorService = new
JGroupsObjectReplicatorService(clusterName);
- replicatorService.setBufferManager(bufferManager.getBufferManager());
- ServiceBuilder<JGroupsObjectReplicator> serviceBuilder =
target.addService(TeiidServiceNames.OBJECT_REPLICATOR, replicatorService);
- serviceBuilder.addDependency(ServiceName.JBOSS.append("jgroups",
"stack", stack), ChannelFactory.class,
replicatorService.channelFactoryInjector); //$NON-NLS-1$ //$NON-NLS-2$
- newControllers.add(serviceBuilder.install());
- }
-
// Query Engine
final RuntimeEngineDeployer engine = buildQueryEngine(operation);
String workManager = "default"; //$NON-NLS-1$
@@ -367,7 +365,7 @@
}
- private BufferServiceImpl buildBufferManager(ModelNode node) {
+ private BufferServiceImpl buildBufferManager(ModelNode node) {
BufferServiceImpl bufferManger = new BufferServiceImpl();
if (node == null) {
@@ -417,7 +415,6 @@
private SessionAwareCache<CachedResults> buildResultsetCache(ModelNode node,
BufferManager bufferManager) {
CacheConfiguration cacheConfig = new CacheConfiguration();
- // these settings are not really used; they are defined by infinispan
cacheConfig.setMaxEntries(1024);
cacheConfig.setMaxAgeInSeconds(7200);
cacheConfig.setType(Policy.EXPIRATION.name());
Modified: trunk/test-integration/common/pom.xml
===================================================================
--- trunk/test-integration/common/pom.xml 2011-11-23 18:30:27 UTC (rev 3698)
+++ trunk/test-integration/common/pom.xml 2011-11-23 19:43:50 UTC (rev 3699)
@@ -47,11 +47,10 @@
<version>1.0</version>
</dependency>
<dependency>
- <groupId>org.jgroups</groupId>
- <artifactId>jgroups</artifactId>
- <version>3.0.0.CR5</version>
- <scope>test</scope>
- </dependency>
+ <groupId>org.jboss.as</groupId>
+ <artifactId>jboss-as-clustering-jgroups</artifactId>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<profiles>
<profile>
Modified: trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java 2011-11-23
18:30:27 UTC (rev 3698)
+++ trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java 2011-11-23
19:43:50 UTC (rev 3699)
@@ -22,26 +22,39 @@
package org.teiid.jdbc;
import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import javax.security.auth.Subject;
import javax.security.auth.login.LoginException;
import org.mockito.Mockito;
+import org.teiid.Replicated;
+import org.teiid.Replicated.ReplicationMode;
import org.teiid.adminapi.AdminException;
import org.teiid.adminapi.VDB;
import org.teiid.adminapi.impl.ModelMetaData;
import org.teiid.adminapi.impl.VDBMetaData;
+import org.teiid.cache.Cache;
import org.teiid.cache.CacheConfiguration;
+import org.teiid.cache.DefaultCacheFactory;
import org.teiid.cache.CacheConfiguration.Policy;
-import org.teiid.cache.DefaultCacheFactory;
import org.teiid.client.DQP;
import org.teiid.client.security.ILogon;
+import org.teiid.common.buffer.BufferManager;
+import org.teiid.common.buffer.BufferManagerFactory;
+import org.teiid.core.TeiidRuntimeException;
import org.teiid.core.util.UnitTestUtil;
import org.teiid.deployers.CompositeVDB;
import org.teiid.deployers.MetadataStoreGroup;
@@ -57,7 +70,7 @@
import org.teiid.dqp.internal.process.DQPCore;
import org.teiid.dqp.internal.process.PreparedPlan;
import org.teiid.dqp.internal.process.SessionAwareCache;
-import org.teiid.dqp.service.FakeBufferService;
+import org.teiid.dqp.service.BufferService;
import org.teiid.metadata.FunctionMethod;
import org.teiid.metadata.MetadataRepository;
import org.teiid.metadata.MetadataStore;
@@ -67,6 +80,7 @@
import org.teiid.net.CommunicationException;
import org.teiid.net.ConnectionException;
import org.teiid.query.ObjectReplicator;
+import org.teiid.query.ReplicatedObject;
import org.teiid.query.function.SystemFunctionManager;
import org.teiid.query.metadata.TransformationMetadata;
import org.teiid.query.metadata.TransformationMetadata.Resource;
@@ -84,9 +98,108 @@
import org.teiid.transport.LocalServerConnection;
import org.teiid.transport.LogonImpl;
-@SuppressWarnings({"nls", "serial"})
+@SuppressWarnings({"nls"})
public class FakeServer extends ClientServiceRegistryImpl implements ConnectionProfile {
+
+ public interface ReplicatedCache<K, V> extends Cache<K, V> {
+
+ @Replicated(replicateState=ReplicationMode.PULL)
+ public V get(K key);
+ @Replicated(replicateState=ReplicationMode.PUSH)
+ V put(K key, V value, Long ttl);
+
+ @Replicated()
+ V remove(K key);
+
+ }
+
+ public static class ReplicatedCacheImpl<K extends Serializable, V> implements
ReplicatedCache<K, V>, ReplicatedObject<K> {
+ private Cache<K, V> cache;
+
+ public ReplicatedCacheImpl(Cache<K, V> cache) {
+ this.cache = cache;
+ }
+
+ public void clear() {
+ cache.clear();
+ }
+
+ public V get(K key) {
+ return cache.get(key);
+ }
+
+ public String getName() {
+ return cache.getName();
+ }
+
+ public Set<K> keys() {
+ return cache.keys();
+ }
+
+ public V put(K key, V value, Long ttl) {
+ return cache.put(key, value, ttl);
+ }
+
+ public V remove(K key) {
+ return cache.remove(key);
+ }
+
+ public int size() {
+ return cache.size();
+ }
+
+ @Override
+ public void getState(K stateId, OutputStream ostream) {
+ V value = get(stateId);
+ if (value != null) {
+ try {
+ ObjectOutputStream oos = new ObjectOutputStream(ostream);
+ oos.writeObject(value);
+ oos.close();
+ } catch (IOException e) {
+ throw new TeiidRuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public void setState(K stateId, InputStream istream) {
+ try {
+ ObjectInputStream ois = new ObjectInputStream(istream);
+ V value = (V) ois.readObject();
+ this.put(stateId, value, null);
+ } catch (IOException e) {
+ throw new TeiidRuntimeException(e);
+ } catch (ClassNotFoundException e) {
+ throw new TeiidRuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean hasState(K stateId) {
+ return cache.get(stateId) != null;
+ }
+
+ @Override
+ public void droppedMembers(Collection<Serializable> addresses) {
+ }
+
+ @Override
+ public void getState(OutputStream ostream) {
+ }
+
+ @Override
+ public void setAddress(Serializable address) {
+ }
+
+ @Override
+ public void setState(InputStream istream) {
+ }
+
+
+ }
+
SessionServiceImpl sessionService = new SessionServiceImpl() {
@Override
protected TeiidLoginContext authenticate(String userName,
@@ -113,10 +226,17 @@
}
public FakeServer(DQPConfiguration config) {
- this(config, false);
+ start(config, false);
}
- public FakeServer(DQPConfiguration config, boolean realBufferMangaer) {
+ public FakeServer(boolean start) {
+ if (start) {
+ start(new DQPConfiguration(), false);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public void start(DQPConfiguration config, boolean realBufferMangaer) {
sessionService.setSecurityHelper(Mockito.mock(SecurityHelper.class));
sessionService.setSecurityDomains(Arrays.asList("somedomain"));
@@ -147,20 +267,57 @@
this.repo.start();
this.sessionService.setVDBRepository(repo);
+ BufferService bs = null;
if (!realBufferMangaer) {
- this.dqp.setBufferService(new FakeBufferService());
+ bs = new BufferService() {
+
+ @Override
+ public BufferManager getBufferManager() {
+ return BufferManagerFactory.createBufferManager();
+ }
+ };
} else {
BufferServiceImpl bsi = new BufferServiceImpl();
bsi.setDiskDirectory(UnitTestUtil.getTestScratchPath());
- this.dqp.setBufferService(bsi);
bsi.start();
+ bs = bsi;
}
+ if (replicator != null) {
+ try {
+ final BufferManager bm = replicator.replicate("$BM$", BufferManager.class,
bs.getBufferManager(), 0);
+ bs = new BufferService() {
+
+ @Override
+ public BufferManager getBufferManager() {
+ return bm;
+ }
+ };
+ } catch (Exception e) {
+ throw new TeiidRuntimeException(e);
+ }
+ }
+ this.dqp.setBufferService(bs);
+
+ //TODO: wire in an infinispan cluster rather than this dummy replicated cache
DefaultCacheFactory dcf = new DefaultCacheFactory() {
- @Override
public boolean isReplicated() {
- return true; //pretend to be replicated for matview tests
+ return true;
}
- };
+
+ @Override
+ public <K, V> Cache<K, V> get(String location,
+ CacheConfiguration config) {
+ Cache<K, V> result = super.get(location, config);
+ if (replicator != null) {
+ try {
+ return (Cache<K, V>) replicator.replicate("$RS$",
ReplicatedCache.class, new ReplicatedCacheImpl(result), 0);
+ } catch (Exception e) {
+ throw new TeiidRuntimeException(e);
+ }
+ }
+ return result;
+ }
+ };
SessionAwareCache rs = new SessionAwareCache<CachedResults>(dcf,
SessionAwareCache.Type.RESULTSET, new CacheConfiguration(Policy.LRU, 60, 250,
"resultsetcache"));
SessionAwareCache ppc = new SessionAwareCache<PreparedPlan>(dcf,
SessionAwareCache.Type.PREPAREDPLAN, new CacheConfiguration());
rs.setBufferManager(this.dqp.getBufferManager());
Deleted:
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java
===================================================================
---
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java 2011-11-23
18:30:27 UTC (rev 3698)
+++
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java 2011-11-23
19:43:50 UTC (rev 3699)
@@ -1,138 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.systemmodel;
-
-import static org.junit.Assert.*;
-
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-
-import org.jboss.as.clustering.jgroups.ChannelFactory;
-import org.jgroups.Channel;
-import org.jgroups.JChannel;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.teiid.core.types.DataTypeManager;
-import org.teiid.core.util.UnitTestUtil;
-import org.teiid.jdbc.FakeServer;
-import org.teiid.metadata.FunctionMethod;
-import org.teiid.metadata.FunctionParameter;
-import org.teiid.metadata.FunctionMethod.Determinism;
-import org.teiid.metadata.FunctionMethod.PushDown;
-import org.teiid.replication.jboss.JGroupsObjectReplicator;
-
-@SuppressWarnings("nls")
-public class TestMatViewReplication {
-
- private static final String MATVIEWS = "matviews";
- private static final boolean DEBUG = false;
-
- @BeforeClass public static void oneTimeSetup() {
- System.setProperty("jgroups.bind_addr", "127.0.0.1");
- }
-
- @Test public void testReplication() throws Exception {
- if (DEBUG) {
- UnitTestUtil.enableTraceLogging("org.teiid");
- }
-
- FakeServer server1 = createServer();
-
- Connection c1 = server1.createConnection("jdbc:teiid:matviews");
- Statement stmt = c1.createStatement();
- stmt.execute("select * from TEST.RANDOMVIEW");
- ResultSet rs = stmt.getResultSet();
- assertTrue(rs.next());
- double d1 = rs.getDouble(1);
- double d2 = rs.getDouble(2);
-
- FakeServer server2 = createServer();
- Connection c2 = server2.createConnection("jdbc:teiid:matviews");
- Statement stmt2 = c2.createStatement();
- ResultSet rs2 = stmt2.executeQuery("select * from matviews where name =
'RandomView'");
- assertTrue(rs2.next());
- assertEquals("LOADED", rs2.getString("loadstate"));
- assertEquals(true, rs2.getBoolean("valid"));
- stmt2.execute("select * from TEST.RANDOMVIEW");
- rs2 = stmt2.getResultSet();
- assertTrue(rs2.next());
- assertEquals(d1, rs2.getDouble(1), 0);
- assertEquals(d2, rs2.getDouble(2), 0);
-
- rs2 = stmt2.executeQuery("select * from (call
refreshMatView('TEST.RANDOMVIEW', false)) p");
-
- Thread.sleep(1000);
-
- //make sure we're still valid and the same
- stmt.execute("select * from TEST.RANDOMVIEW");
- rs = stmt.getResultSet();
- assertTrue(rs.next());
- d1 = rs.getDouble(1);
- d2 = rs.getDouble(2);
- stmt2.execute("select * from TEST.RANDOMVIEW");
- rs2 = stmt2.getResultSet();
- assertTrue(rs2.next());
- assertEquals(d1, rs2.getDouble(1), 0);
- assertEquals(d2, rs2.getDouble(2), 0);
-
- //ensure a lookup is usable on each side
- rs2 = stmt2.executeQuery("select lookup('sys.schemas', 'VDBName',
'name', 'SYS')");
- Thread.sleep(1000);
-
- rs = stmt.executeQuery("select lookup('sys.schemas', 'VDBName',
'name', 'SYS')");
- rs.next();
- assertEquals("matviews", rs.getString(1));
-
- server1.stop();
- server2.stop();
- }
-
- @SuppressWarnings("serial")
- private FakeServer createServer() throws Exception {
- FakeServer server = new FakeServer();
-
- JGroupsObjectReplicator jor = new JGroupsObjectReplicator("demo") {
- @Override
- public ChannelFactory getChannelFactory() {
- return new ChannelFactory() {
- @Override
- public Channel createChannel(String id) throws Exception {
- return new
JChannel(this.getClass().getClassLoader().getResource("tcp.xml"));
- }
- };
- }
-
- };
-
- server.setReplicator(jor);
- HashMap<String, Collection<FunctionMethod>> udfs = new
HashMap<String, Collection<FunctionMethod>>();
- udfs.put("funcs", Arrays.asList(new FunctionMethod("pause",
null, null, PushDown.CANNOT_PUSHDOWN, TestMatViews.class.getName(), "pause",
null, new FunctionParameter("return", DataTypeManager.DefaultDataTypes.INTEGER),
false, Determinism.NONDETERMINISTIC)));
- server.deployVDB(MATVIEWS, UnitTestUtil.getTestDataPath() +
"/matviews.vdb", udfs);
- return server;
- }
-
-}
Copied:
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java
(from rev 3695,
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java)
===================================================================
---
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java
(rev 0)
+++
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java 2011-11-23
19:43:50 UTC (rev 3699)
@@ -0,0 +1,146 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.systemmodel;
+
+import static org.junit.Assert.*;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+
+import org.jboss.as.clustering.jgroups.ChannelFactory;
+import org.jgroups.Channel;
+import org.jgroups.JChannel;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.util.UnitTestUtil;
+import org.teiid.dqp.internal.process.DQPConfiguration;
+import org.teiid.jdbc.FakeServer;
+import org.teiid.metadata.FunctionMethod;
+import org.teiid.metadata.FunctionParameter;
+import org.teiid.metadata.FunctionMethod.Determinism;
+import org.teiid.metadata.FunctionMethod.PushDown;
+import org.teiid.replication.jboss.JGroupsObjectReplicator;
+
+@SuppressWarnings("nls")
+public class TestReplication {
+
+ private static final String MATVIEWS = "matviews";
+ private static final boolean DEBUG = false;
+
+ @BeforeClass public static void oneTimeSetup() {
+ System.setProperty("jgroups.bind_addr", "127.0.0.1");
+ }
+
+ @Test public void testReplication() throws Exception {
+ if (DEBUG) {
+ UnitTestUtil.enableTraceLogging("org.teiid");
+ }
+
+ FakeServer server1 = createServer();
+
+ Connection c1 = server1.createConnection("jdbc:teiid:matviews");
+ Statement stmt = c1.createStatement();
+ stmt.execute("select * from TEST.RANDOMVIEW");
+ ResultSet rs = stmt.getResultSet();
+ assertTrue(rs.next());
+ double d1 = rs.getDouble(1);
+ double d2 = rs.getDouble(2);
+
+ FakeServer server2 = createServer();
+ Connection c2 = server2.createConnection("jdbc:teiid:matviews");
+ Statement stmt2 = c2.createStatement();
+ ResultSet rs2 = stmt2.executeQuery("select * from matviews where name =
'RandomView'");
+ assertTrue(rs2.next());
+ assertEquals("LOADED", rs2.getString("loadstate"));
+ assertEquals(true, rs2.getBoolean("valid"));
+ stmt2.execute("select * from TEST.RANDOMVIEW");
+ rs2 = stmt2.getResultSet();
+ assertTrue(rs2.next());
+ assertEquals(d1, rs2.getDouble(1), 0);
+ assertEquals(d2, rs2.getDouble(2), 0);
+
+ rs2 = stmt2.executeQuery("select * from (call
refreshMatView('TEST.RANDOMVIEW', false)) p");
+
+ Thread.sleep(1000);
+
+ //make sure we're still valid and the same
+ stmt.execute("select * from TEST.RANDOMVIEW");
+ rs = stmt.getResultSet();
+ assertTrue(rs.next());
+ d1 = rs.getDouble(1);
+ d2 = rs.getDouble(2);
+ stmt2.execute("select * from TEST.RANDOMVIEW");
+ rs2 = stmt2.getResultSet();
+ assertTrue(rs2.next());
+ assertEquals(d1, rs2.getDouble(1), 0);
+ assertEquals(d2, rs2.getDouble(2), 0);
+
+ //ensure a lookup is usable on each side
+ rs2 = stmt2.executeQuery("select lookup('sys.schemas', 'VDBName',
'name', 'SYS')");
+ Thread.sleep(1000);
+
+ rs = stmt.executeQuery("select lookup('sys.schemas', 'VDBName',
'name', 'SYS')");
+ rs.next();
+ assertEquals("matviews", rs.getString(1));
+
+ //result set cache replication
+
+ rs = stmt.executeQuery("/*+ cache(scope:vdb) */ select rand()");
//$NON-NLS-1$
+ assertTrue(rs.next());
+ d1 = rs.getDouble(1);
+
+ //no wait is needed as we perform a synch pull
+ rs2 = stmt2.executeQuery("/*+ cache(scope:vdb) */ select rand()");
//$NON-NLS-1$
+ assertTrue(rs2.next());
+ d2 = rs2.getDouble(1);
+
+ assertEquals(d1, d2, 0);
+
+ server1.stop();
+ server2.stop();
+ }
+
+ private FakeServer createServer() throws Exception {
+ FakeServer server = new FakeServer(false);
+
+ JGroupsObjectReplicator jor = new JGroupsObjectReplicator(new ChannelFactory() {
+ @Override
+ public Channel createChannel(String id) throws Exception {
+ return new
JChannel(this.getClass().getClassLoader().getResource("tcp.xml"));
+ }
+ });
+
+ server.setReplicator(jor);
+ server.start(new DQPConfiguration(), true);
+ HashMap<String, Collection<FunctionMethod>> udfs = new
HashMap<String, Collection<FunctionMethod>>();
+ udfs.put("funcs", Arrays.asList(new FunctionMethod("pause",
null, null, PushDown.CANNOT_PUSHDOWN, TestMatViews.class.getName(), "pause",
null, new FunctionParameter("return", DataTypeManager.DefaultDataTypes.INTEGER),
false, Determinism.NONDETERMINISTIC)));
+ server.deployVDB(MATVIEWS, UnitTestUtil.getTestDataPath() +
"/matviews.vdb", udfs);
+ return server;
+ }
+
+}
Property changes on:
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain