Author: shawkins
Date: 2011-11-18 12:54:46 -0500 (Fri, 18 Nov 2011)
New Revision: 3671
Modified:
trunk/api/src/main/java/org/teiid/Replicated.java
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStore.java
trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java
Log:
TEIID-1720 switching resultset caching to use cluster pull, instead of push semantics.
Modified: trunk/api/src/main/java/org/teiid/Replicated.java
===================================================================
--- trunk/api/src/main/java/org/teiid/Replicated.java 2011-11-18 02:09:05 UTC (rev 3670)
+++ trunk/api/src/main/java/org/teiid/Replicated.java 2011-11-18 17:54:46 UTC (rev 3671)
@@ -37,6 +37,13 @@
@Inherited
@Documented
public @interface Replicated {
+
+ enum ReplicationMode {
+ PUSH,
+ PULL,
+ NONE
+ }
+
/**
* @return true if members should be called asynchronously. asynch methods should be
void.
*/
@@ -50,9 +57,12 @@
*/
boolean remoteOnly() default false;
/**
- * @return true if the remote members should have a partial state replication called
using the first argument as the state after
- * the local method has been invoked. should not be used with remoteOnly.
+ * Should not be used with remoteOnly.
+ *
+ * @return PUSH if the remote members should have a partial state replication called
using the first argument as the state after
+ * the local method has been invoked, or PULL if the local member should initiate a
partial state pull using the first argument
+ * as the state after the local method returns null
*/
- boolean replicateState() default false;
+ ReplicationMode replicateState() default ReplicationMode.NONE;
}
\ No newline at end of file
Modified:
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
===================================================================
---
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2011-11-18
02:09:05 UTC (rev 3670)
+++
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2011-11-18
17:54:46 UTC (rev 3671)
@@ -29,21 +29,34 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.jboss.as.clustering.jgroups.ChannelFactory;
-import org.jgroups.*;
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.ExtendedReceiverAdapter;
+import org.jgroups.Message;
+import org.jgroups.View;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.MethodCall;
import org.jgroups.blocks.MethodLookup;
+import org.jgroups.blocks.RequestOptions;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.util.Promise;
import org.jgroups.util.RspList;
import org.jgroups.util.Util;
import org.teiid.Replicated;
+import org.teiid.Replicated.ReplicationMode;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.query.ObjectReplicator;
@@ -89,6 +102,11 @@
private final HashMap<Method, Short> methodMap;
protected Vector<Address> remoteMembers = new Vector<Address>();
protected final transient Promise<Boolean> state_promise=new
Promise<Boolean>();
+ protected transient ThreadLocal<Promise<Boolean>> threadLocalPromise =
new ThreadLocal<Promise<Boolean>>() {
+ protected org.jgroups.util.Promise<Boolean> initialValue() {
+ return new Promise<Boolean>();
+ }
+ };
private ReplicatedInvocationHandler(S object,
HashMap<Method, Short> methodMap) {
@@ -119,7 +137,7 @@
}
try {
Replicated annotation = method.getAnnotation(Replicated.class);
- if (annotation.replicateState()) {
+ if (annotation.replicateState() != ReplicationMode.NONE) {
Object result = null;
try {
result = method.invoke(object, args);
@@ -132,13 +150,34 @@
}
ReplicatedObject ro = (ReplicatedObject)object;
String stateId = (String)args[0];
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "replicating
state", stateId); //$NON-NLS-1$
- JGroupsOutputStream oStream = new JGroupsOutputStream(disp, dests, stateId,
(short)(methodMap.size() - 3));
- try {
- ro.getState(stateId, oStream);
- } finally {
- oStream.close();
+ if (annotation.replicateState() == ReplicationMode.PUSH) {
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "replicating
state", stateId); //$NON-NLS-1$
+ JGroupsOutputStream oStream = new JGroupsOutputStream(disp, dests, stateId,
(short)(methodMap.size() - 3));
+ try {
+ ro.getState(stateId, oStream);
+ } finally {
+ oStream.close();
+ }
+ LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state",
stateId); //$NON-NLS-1$
+ return result;
}
+ if (result != null) {
+ return result;
+ }
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulling state",
stateId); //$NON-NLS-1$
+ long timeout = annotation.timeout();
+ threadLocalPromise.set(new Promise<Boolean>());
+ boolean getState = this.disp.getChannel().getState(null, stateId, timeout);
+ if (getState) {
+ Boolean loaded = threadLocalPromise.get().getResult(timeout);
+ if (Boolean.TRUE.equals(loaded)) {
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "loaded",
stateId); //$NON-NLS-1$
+ } else {
+ LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " load error or
timeout " + stateId); //$NON-NLS-1$
+ }
+ } else {
+ LogManager.logInfo(LogConstants.CTX_RUNTIME, object + " first member or
timeout exceeded " + stateId); //$NON-NLS-1$
+ }
LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state",
stateId); //$NON-NLS-1$
return result;
}
@@ -149,7 +188,7 @@
dests = new Vector<Address>(remoteMembers);
}
}
- RspList responses = disp.callRemoteMethods(dests, call,
annotation.asynch()?GroupRequest.GET_NONE:GroupRequest.GET_ALL, annotation.timeout());
+ RspList responses = disp.callRemoteMethods(dests, call, new
RequestOptions().setMode(annotation.asynch()?GroupRequest.GET_NONE:GroupRequest.GET_ALL).setTimeout(annotation.timeout()));
if (annotation.asynch()) {
return null;
}
@@ -184,7 +223,7 @@
}
remoteMembers.clear();
remoteMembers.addAll(newView.getMembers());
- remoteMembers.remove(this.disp.getChannel().getLocalAddress());
+ remoteMembers.remove(this.disp.getChannel().getAddress());
}
}
}
@@ -214,6 +253,32 @@
Util.close(ostream);
}
}
+
+ @Override
+ public void setState(String stateId, InputStream istream) {
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "loading state");
//$NON-NLS-1$
+ try {
+ ((ReplicatedObject)object).setState(stateId, istream);
+ threadLocalPromise.get().setResult(Boolean.TRUE);
+ } catch (Exception e) {
+ threadLocalPromise.get().setResult(Boolean.FALSE);
+ LogManager.logError(LogConstants.CTX_RUNTIME, e, "error loading state");
//$NON-NLS-1$
+ } finally {
+ Util.close(istream);
+ }
+ }
+
+ @Override
+ public void getState(String stateId, OutputStream ostream) {
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "getting state");
//$NON-NLS-1$
+ try {
+ ((ReplicatedObject)object).getState(stateId, ostream);
+ } catch (Exception e) {
+ LogManager.logError(LogConstants.CTX_RUNTIME, e, "error gettting state");
//$NON-NLS-1$
+ } finally {
+ Util.close(ostream);
+ }
+ }
}
private interface Streaming {
@@ -222,13 +287,10 @@
void finishState(String id);
}
-
- private String clusterName;
//TODO: this should be configurable, or use a common executor
private transient Executor executor = Executors.newCachedThreadPool();
- public JGroupsObjectReplicator(String clusterName) {
- this.clusterName = clusterName;
+ public JGroupsObjectReplicator(@SuppressWarnings("unused") String clusterName)
{
}
public abstract ChannelFactory getChannelFactory();
@@ -368,14 +430,14 @@
try {
channel.connect(mux_id);
if (object instanceof ReplicatedObject) {
- ((ReplicatedObject)object).setLocalAddress(channel.getLocalAddress());
+ ((ReplicatedObject)object).setLocalAddress(channel.getAddress());
boolean getState = channel.getState(null, startTimeout);
if (getState) {
- boolean loaded = proxy.state_promise.getResult(startTimeout);
- if (loaded) {
+ Boolean loaded = proxy.state_promise.getResult(startTimeout);
+ if (Boolean.TRUE.equals(loaded)) {
LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "loaded");
//$NON-NLS-1$
} else {
- LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " load
timeout"); //$NON-NLS-1$
+ LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " load error or
timeout"); //$NON-NLS-1$
}
} else {
LogManager.logInfo(LogConstants.CTX_RUNTIME, object + " first member or timeout
exceeded"); //$NON-NLS-1$
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java 2011-11-18
02:09:05 UTC (rev 3670)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java 2011-11-18
17:54:46 UTC (rev 3671)
@@ -25,6 +25,7 @@
import java.util.List;
import org.teiid.Replicated;
+import org.teiid.Replicated.ReplicationMode;
import org.teiid.core.TeiidComponentException;
import org.teiid.query.sql.symbol.Expression;
@@ -126,6 +127,7 @@
void addTupleBuffer(TupleBuffer tb);
+ @Replicated(replicateState=ReplicationMode.PULL)
TupleBuffer getTupleBuffer(String id);
/**
@@ -141,6 +143,5 @@
*/
int reserveAdditionalBuffers(int additional);
- @Replicated(replicateState=true)
void distributeTupleBuffer(String uuid, TupleBuffer tb);
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-11-18
02:09:05 UTC (rev 3670)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-11-18
17:54:46 UTC (rev 3671)
@@ -975,21 +975,6 @@
@Override
public void getState(OutputStream ostream) {
- try {
- ObjectOutputStream out = new ObjectOutputStream(ostream);
- for(String id:this.tupleBufferMap.keySet()) {
- TupleReference tr = this.tupleBufferMap.get(id);
- TupleBuffer tb = tr.get();
- if (tb != null) {
- out.writeObject(tb.getId());
- getTupleBufferState(out, tb);
- }
- }
- } catch (TeiidComponentException e) {
- throw new TeiidRuntimeException(e);
- } catch (IOException e) {
- throw new TeiidRuntimeException(e);
- }
}
@Override
@@ -1019,27 +1004,6 @@
@Override
public void setState(InputStream istream) {
- try {
- ObjectInputStream in = new ObjectInputStream(istream);
- while (true) {
- String state_id = null;
- try {
- state_id = (String)in.readObject();
- } catch (IOException e) {
- break;
- }
- if (state_id != null) {
- setTupleBufferState(state_id, in);
- }
- }
- } catch (IOException e) {
- throw new TeiidRuntimeException(e);
- } catch(ClassNotFoundException e) {
- throw new TeiidRuntimeException(e);
- } catch(TeiidComponentException e) {
- throw new TeiidRuntimeException(e);
- }
-
}
@Override
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStore.java 2011-11-18
02:09:05 UTC (rev 3670)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStore.java 2011-11-18
17:54:46 UTC (rev 3671)
@@ -26,6 +26,7 @@
import java.util.List;
import org.teiid.Replicated;
+import org.teiid.Replicated.ReplicationMode;
import org.teiid.api.exception.query.QueryMetadataException;
import org.teiid.api.exception.query.QueryResolverException;
import org.teiid.api.exception.query.QueryValidatorException;
@@ -62,7 +63,7 @@
boolean needsLoading(String matTableName, Serializable loadingAddress,
boolean firstPass, boolean refresh, boolean invalidate);
- @Replicated(replicateState=true)
+ @Replicated(replicateState=ReplicationMode.PUSH)
void loaded(String matTableName, TempTable table);
}
Modified:
trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java
===================================================================
---
trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java 2011-11-18
02:09:05 UTC (rev 3670)
+++
trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java 2011-11-18
17:54:46 UTC (rev 3671)
@@ -22,9 +22,7 @@
package org.teiid.dqp.service.buffer;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -34,9 +32,9 @@
import org.junit.Test;
import org.teiid.common.buffer.BufferManager;
-import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.common.buffer.TupleBatch;
import org.teiid.common.buffer.TupleBuffer;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.common.buffer.impl.BufferFrontedFileStoreCache;
import org.teiid.common.buffer.impl.BufferManagerImpl;
import org.teiid.common.buffer.impl.FileStorageManager;
@@ -125,23 +123,21 @@
es2.setType(DataTypeManager.getDataTypeClass(DefaultDataTypes.INTEGER));
schema.add(es2);
- for (int i = 0; i < 5; i++) {
- TupleBuffer buffer = mgr.createTupleBuffer(schema, "cached",
TupleSourceType.FINAL); //$NON-NLS-1$
- buffer.setBatchSize(50);
- buffer.setId("state_id"+i);
-
- for (int batch=0; batch<3; batch++) {
- for (int row = 0; row < 50; row++) {
- int val = (i*150)+(batch*50)+row;
- buffer.addTuple(Arrays.asList(new Object[] {"String"+val, new
Integer(val)}));
- }
+ TupleBuffer buffer = mgr.createTupleBuffer(schema, "cached",
TupleSourceType.FINAL); //$NON-NLS-1$
+ buffer.setBatchSize(50);
+ buffer.setId("state_id");
+
+ for (int batch=0; batch<3; batch++) {
+ for (int row = 0; row < 50; row++) {
+ int val = (batch*50)+row;
+ buffer.addTuple(Arrays.asList(new Object[] {"String"+val, new
Integer(val)}));
}
- buffer.close();
- mgr.distributeTupleBuffer(buffer.getId(), buffer);
}
+ buffer.close();
+ mgr.distributeTupleBuffer(buffer.getId(), buffer);
FileOutputStream fo = new
FileOutputStream(UnitTestUtil.getTestScratchPath()+"/teiid/statetest");
- ((BufferManagerImpl)mgr).getState(fo);
+ ((BufferManagerImpl)mgr).getState(buffer.getId(), fo);
fo.close();
svc.stop();
@@ -153,20 +149,18 @@
BufferManagerImpl mgr2 = svc2.getBufferManager();
FileInputStream fis = new
FileInputStream(UnitTestUtil.getTestScratchPath()+"/teiid/statetest");
- mgr2.setState(fis);
+ mgr2.setState(buffer.getId(), fis);
fis.close();
- for (int i = 0; i < 5; i++) {
- String id = "state_id"+i;
- TupleBuffer buffer = mgr2.getTupleBuffer(id);
- for (int batch=0; batch<3; batch++) {
- TupleBatch tb = buffer.getBatch((batch*50)+1);
- List[] rows = tb.getAllTuples();
- for (int row = 0; row < 50; row++) {
- int val = (i*150)+(batch*50)+row;
- assertEquals("String"+val, rows[row].get(0));
- assertEquals(val, rows[row].get(1));
- }
+ String id = "state_id";
+ buffer = mgr2.getTupleBuffer(id);
+ for (int batch=0; batch<3; batch++) {
+ TupleBatch tb = buffer.getBatch((batch*50)+1);
+ List[] rows = tb.getAllTuples();
+ for (int row = 0; row < 50; row++) {
+ int val = (batch*50)+row;
+ assertEquals("String"+val, rows[row].get(0));
+ assertEquals(val, rows[row].get(1));
}
}
svc2.stop();