[teiid-commits] teiid SVN: r3700 - in trunk: engine/src/main/java/org/teiid/query and 1 other directory.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Mon Nov 28 13:05:45 EST 2011
Author: shawkins
Date: 2011-11-28 13:05:45 -0500 (Mon, 28 Nov 2011)
New Revision: 3700
Modified:
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java
Log:
TEIID-1837 a check is performed for both initial and partial state transfers to determine the pull target
Modified: trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
===================================================================
--- trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2011-11-23 19:43:50 UTC (rev 3699)
+++ trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2011-11-28 18:05:45 UTC (rev 3700)
@@ -23,8 +23,6 @@
package org.teiid.replication.jboss;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
@@ -33,6 +31,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -57,18 +56,22 @@
import org.jgroups.util.Promise;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
-import org.jgroups.util.Util;
import org.teiid.Replicated;
import org.teiid.Replicated.ReplicationMode;
+import org.teiid.core.TeiidRuntimeException;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.query.ObjectReplicator;
import org.teiid.query.ReplicatedObject;
+@SuppressWarnings("unchecked")
public class JGroupsObjectReplicator implements ObjectReplicator, Serializable {
-
+
+ private static final int IO_TIMEOUT = 15000;
+
private final class ReplicatorRpcDispatcher<S> extends RpcDispatcher {
private final S object;
+ private boolean initialized;
private final HashMap<Method, Short> methodMap;
private final ArrayList<Method> methodList;
Map<List<?>, JGroupsInputStream> inputStreams = new ConcurrentHashMap<List<?>, JGroupsInputStream>();
@@ -127,7 +130,7 @@
if (is != null) {
is.receive(null);
}
- is = new JGroupsInputStream(15000);
+ is = new JGroupsInputStream(IO_TIMEOUT);
this.inputStreams.put(key, is);
executor.execute(new StreamingRunner(object, stateId, is, null));
} else if (method_call.getId() == methodList.size() - 2) {
@@ -148,6 +151,15 @@
ReplicatedObject ro = (ReplicatedObject)object;
Serializable stateId = (Serializable)method_call.getArgs()[0];
+ if (stateId == null) {
+ synchronized (this) {
+ if (initialized) {
+ return Boolean.TRUE;
+ }
+ return null;
+ }
+ }
+
if (ro.hasState(stateId)) {
return Boolean.TRUE;
}
@@ -160,7 +172,11 @@
JGroupsOutputStream oStream = new JGroupsOutputStream(this, Arrays.asList(dest.address), stateId, (short)(methodMap.size() - 3), false);
try {
- ro.getState(stateId, oStream);
+ if (stateId == null) {
+ ro.getState(oStream);
+ } else {
+ ro.getState(stateId, oStream);
+ }
} finally {
oStream.close();
}
@@ -204,11 +220,15 @@
@Override
public void run() {
try {
- ((ReplicatedObject)object).setState(stateId, is);
+ if (stateId == null) {
+ ((ReplicatedObject<?>)object).setState(is);
+ } else {
+ ((ReplicatedObject)object).setState(stateId, is);
+ }
if (promise != null) {
promise.setResult(Boolean.TRUE);
}
- LogManager.logDetail(LogConstants.CTX_RUNTIME, "state set " + stateId); //$NON-NLS-1$
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, "state set", stateId); //$NON-NLS-1$
} catch (Exception e) {
if (promise != null) {
promise.setResult(Boolean.FALSE);
@@ -228,8 +248,7 @@
private final S object;
private transient ReplicatorRpcDispatcher<S> disp;
private final HashMap<Method, Short> methodMap;
- protected List<Address> remoteMembers = new ArrayList<Address>();
- protected final transient Promise<Boolean> state_promise=new Promise<Boolean>();
+ protected List<Address> remoteMembers = Collections.synchronizedList(new ArrayList<Address>());
private Map<Serializable, Promise<Boolean>> loadingStates = new HashMap<Serializable, Promise<Boolean>>();
private ReplicatedInvocationHandler(S object,HashMap<Method, Short> methodMap) {
@@ -237,6 +256,12 @@
this.methodMap = methodMap;
}
+ List<Address> getRemoteMembersCopy() {
+ synchronized (remoteMembers) {
+ return new ArrayList<Address>(remoteMembers);
+ }
+ }
+
public void setDisp(ReplicatorRpcDispatcher<S> disp) {
this.disp = disp;
}
@@ -264,11 +289,9 @@
return handleReplicateState(method, args, annotation);
}
MethodCall call=new MethodCall(methodNum, args);
- ArrayList<Address> dests = null;
+ List<Address> dests = null;
if (annotation.remoteOnly()) {
- synchronized (remoteMembers) {
- dests = new ArrayList<Address>(remoteMembers);
- }
+ dests = getRemoteMembersCopy();
}
RspList<Object> responses = disp.callRemoteMethods(dests, call, new RequestOptions().setMode(annotation.asynch()?ResponseMode.GET_NONE:ResponseMode.GET_ALL).setTimeout(annotation.timeout()).setAnycasting(dests != null));
if (annotation.asynch()) {
@@ -294,6 +317,25 @@
throw new RuntimeException(method + " " + args + " failed", e); //$NON-NLS-1$ //$NON-NLS-2$
}
}
+
+ protected Address whereIsState(Serializable stateId, long timeout) throws Exception {
+ if (remoteMembers.isEmpty()) {
+ return null;
+ }
+ RspList<Boolean> resp = this.disp.callRemoteMethods(getRemoteMembersCopy(), new MethodCall((short)(methodMap.size() - 5), new Object[]{stateId}), new RequestOptions(ResponseMode.GET_ALL, timeout));
+ Collection<Rsp<Boolean>> values = resp.values();
+ Rsp<Boolean> rsp = null;
+ for (Rsp<Boolean> response : values) {
+ if (Boolean.TRUE.equals(response.getValue())) {
+ rsp = response;
+ break;
+ }
+ }
+ if (rsp == null) {
+ return null;
+ }
+ return rsp.getSender();
+ }
private Object handleReplicateState(Method method, Object[] args,
Replicated annotation) throws IllegalAccessException,
@@ -304,10 +346,7 @@
} catch (InvocationTargetException e) {
throw e.getCause();
}
- List<Address> dests = null;
- synchronized (remoteMembers) {
- dests = new ArrayList<Address>(remoteMembers);
- }
+ List<Address> dests = getRemoteMembersCopy();
ReplicatedObject ro = (ReplicatedObject)object;
Serializable stateId = (Serializable)args[0];
if (annotation.replicateState() == ReplicationMode.PUSH) {
@@ -324,9 +363,17 @@
if (result != null) {
return result;
}
- if (!(object instanceof ReplicatedObject)) {
- throw new IllegalStateException("A non-ReplicatedObject cannot use state pulling."); //$NON-NLS-1$
- }
+ long timeout = annotation.timeout();
+ return pullState(method, args, stateId, timeout);
+ }
+
+ /**
+ * Pull the remote state. The method and args are optional
+ * to determine if the state has been made available.
+ */
+ Object pullState(Method method, Object[] args, Serializable stateId,
+ long timeout) throws Throwable {
+ Object result = null;
for (int i = 0; i < PULL_RETRIES; i++) {
Promise<Boolean> p = null;
boolean wait = true;
@@ -334,61 +381,60 @@
p = loadingStates.get(stateId);
if (p == null) {
wait = false;
- try {
- result = method.invoke(object, args);
- } catch (InvocationTargetException e) {
- throw e.getCause();
+ if (method != null) {
+ try {
+ result = method.invoke(object, args);
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ if (result != null) {
+ return result;
+ }
}
- if (result != null) {
- return result;
- }
p = new Promise<Boolean>();
loadingStates.put(stateId, p);
}
}
- long timeout = annotation.timeout();
if (wait) {
p.getResult(timeout);
continue;
}
try {
LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulling state", stateId); //$NON-NLS-1$
- RspList<Boolean> resp = this.disp.callRemoteMethods(null, new MethodCall((short)(methodMap.size() - 5), stateId), new RequestOptions(ResponseMode.GET_ALL, timeout));
- Collection<Rsp<Boolean>> values = resp.values();
- Rsp<Boolean> rsp = null;
- for (Rsp<Boolean> response : values) {
- if (Boolean.TRUE.equals(response.getValue())) {
- rsp = response;
- break;
- }
- }
- if (rsp == null || this.disp.getChannel().getAddress().equals(rsp.getSender())) {
+ Address addr = whereIsState(stateId, timeout);
+ if (addr == null) {
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "timeout exceeded or first member"); //$NON-NLS-1$
break;
}
- JGroupsInputStream is = new JGroupsInputStream(15000);
+ JGroupsInputStream is = new JGroupsInputStream(IO_TIMEOUT);
StreamingRunner runner = new StreamingRunner(object, stateId, is, p);
- List<?> key = Arrays.asList(stateId, new AddressWrapper(rsp.getSender()));
+ List<?> key = Arrays.asList(stateId, new AddressWrapper(addr));
disp.inputStreams.put(key, is);
executor.execute(runner);
- this.disp.callRemoteMethod(rsp.getSender(), new MethodCall((short)(methodMap.size() - 4), stateId, new AddressWrapper(this.disp.getChannel().getAddress())), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(true));
+ this.disp.callRemoteMethod(addr, new MethodCall((short)(methodMap.size() - 4), stateId, new AddressWrapper(this.disp.getChannel().getAddress())), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(true));
Boolean fetched = p.getResult(timeout);
if (fetched != null) {
if (fetched) {
LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulled state", stateId); //$NON-NLS-1$
- } else {
- LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " failed to pull " + stateId); //$NON-NLS-1$
- }
+ if (method !=null) {
+ try {
+ result = method.invoke(object, args);
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ if (result != null) {
+ return result;
+ }
+ }
+ break;
+ }
+ LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " failed to pull " + stateId); //$NON-NLS-1$
} else {
LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " timeout pulling " + stateId); //$NON-NLS-1$
}
- try {
- result = method.invoke(object, args);
- } catch (InvocationTargetException e) {
- throw e.getCause();
- }
} finally {
synchronized (loadingStates) {
loadingStates.remove(stateId);
@@ -403,12 +449,12 @@
if (newView.getMembers() != null) {
synchronized (remoteMembers) {
remoteMembers.removeAll(newView.getMembers());
- if (object instanceof ReplicatedObject && !remoteMembers.isEmpty()) {
+ if (object instanceof ReplicatedObject<?> && !remoteMembers.isEmpty()) {
HashSet<Serializable> dropped = new HashSet<Serializable>();
for (Address address : remoteMembers) {
dropped.add(new AddressWrapper(address));
}
- ((ReplicatedObject)object).droppedMembers(dropped);
+ ((ReplicatedObject<?>)object).droppedMembers(dropped);
}
remoteMembers.clear();
remoteMembers.addAll(newView.getMembers());
@@ -416,33 +462,6 @@
}
}
}
-
- @Override
- public void setState(InputStream istream) {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "loading initial state"); //$NON-NLS-1$
- try {
- ((ReplicatedObject)object).setState(istream);
- state_promise.setResult(Boolean.TRUE);
- } catch (Exception e) {
- state_promise.setResult(Boolean.FALSE);
- LogManager.logError(LogConstants.CTX_RUNTIME, e, "error loading initial state"); //$NON-NLS-1$
- } finally {
- Util.close(istream);
- }
- }
-
- @Override
- public void getState(OutputStream ostream) {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "getting initial state"); //$NON-NLS-1$
- try {
- ((ReplicatedObject)object).getState(ostream);
- } catch (Exception e) {
- LogManager.logError(LogConstants.CTX_RUNTIME, e, "error gettting initial state"); //$NON-NLS-1$
- } finally {
- Util.close(ostream);
- }
- }
-
}
private interface Streaming {
@@ -470,7 +489,6 @@
c.close();
}
- @SuppressWarnings("unchecked")
@Override
public <T, S> T replicate(String mux_id,
Class<T> iface, final S object, long startTimeout) throws Exception {
@@ -528,23 +546,24 @@
channel.connect(mux_id);
if (object instanceof ReplicatedObject) {
((ReplicatedObject)object).setAddress(new AddressWrapper(channel.getAddress()));
- channel.getState(null, startTimeout);
- Boolean loaded = proxy.state_promise.getResult(1);
- if (loaded == null) {
- LogManager.logInfo(LogConstants.CTX_RUNTIME, object + " timeout exceeded or first member"); //$NON-NLS-1$
- } else if (loaded) {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "loaded"); //$NON-NLS-1$
- } else {
- LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " load error"); //$NON-NLS-1$
- }
+ proxy.pullState(null, null, null, startTimeout);
}
success = true;
return replicatedProxy;
+ } catch (Throwable e) {
+ if (e instanceof Exception) {
+ throw (Exception)e;
+ }
+ throw new TeiidRuntimeException(e);
} finally {
if (!success) {
channel.close();
+ } else {
+ synchronized (disp) {
+ //mark as initialized so that state can be pulled if needed
+ disp.initialized = true;
+ }
}
}
}
-
}
Modified: trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java 2011-11-23 19:43:50 UTC (rev 3699)
+++ trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java 2011-11-28 18:05:45 UTC (rev 3700)
@@ -75,6 +75,11 @@
*/
void droppedMembers(Collection<Serializable> addresses);
+ /**
+ * Return true if the object has the given state
+ * @param state_id
+ * @return
+ */
boolean hasState(K state_id);
}
More information about the teiid-commits
mailing list