[teiid-commits] teiid SVN: r3700 - in trunk: engine/src/main/java/org/teiid/query and 1 other directory.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Mon Nov 28 13:05:45 EST 2011


Author: shawkins
Date: 2011-11-28 13:05:45 -0500 (Mon, 28 Nov 2011)
New Revision: 3700

Modified:
   trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
   trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java
Log:
TEIID-1837 a check is performed for both initial and partial state transfers to determine the pull target

Modified: trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
===================================================================
--- trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java	2011-11-23 19:43:50 UTC (rev 3699)
+++ trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java	2011-11-28 18:05:45 UTC (rev 3700)
@@ -23,8 +23,6 @@
 package org.teiid.replication.jboss;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.io.Serializable;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
@@ -33,6 +31,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -57,18 +56,22 @@
 import org.jgroups.util.Promise;
 import org.jgroups.util.Rsp;
 import org.jgroups.util.RspList;
-import org.jgroups.util.Util;
 import org.teiid.Replicated;
 import org.teiid.Replicated.ReplicationMode;
+import org.teiid.core.TeiidRuntimeException;
 import org.teiid.logging.LogConstants;
 import org.teiid.logging.LogManager;
 import org.teiid.query.ObjectReplicator;
 import org.teiid.query.ReplicatedObject;
 
+@SuppressWarnings("unchecked")
 public class JGroupsObjectReplicator implements ObjectReplicator, Serializable {
-	
+
+	private static final int IO_TIMEOUT = 15000;
+
 	private final class ReplicatorRpcDispatcher<S> extends RpcDispatcher {
 		private final S object;
+		private boolean initialized;
 		private final HashMap<Method, Short> methodMap;
 		private final ArrayList<Method> methodList;
 		Map<List<?>, JGroupsInputStream> inputStreams = new ConcurrentHashMap<List<?>, JGroupsInputStream>();
@@ -127,7 +130,7 @@
 		        		if (is != null) {
 		        			is.receive(null);
 		        		}
-		        		is = new JGroupsInputStream(15000);
+		        		is = new JGroupsInputStream(IO_TIMEOUT);
 		        		this.inputStreams.put(key, is);
 		        		executor.execute(new StreamingRunner(object, stateId, is, null));
 		        	} else if (method_call.getId() == methodList.size() - 2) {
@@ -148,6 +151,15 @@
 		        	ReplicatedObject ro = (ReplicatedObject)object;
 		        	Serializable stateId = (Serializable)method_call.getArgs()[0];
 		        	
+		        	if (stateId == null) {
+		        		synchronized (this) {
+							if (initialized) {
+								return Boolean.TRUE;
+							}
+							return null;
+						}
+		        	}
+		        	
 		        	if (ro.hasState(stateId)) {
 		        		return Boolean.TRUE;
 		        	}
@@ -160,7 +172,11 @@
 		        	
 		        	JGroupsOutputStream oStream = new JGroupsOutputStream(this, Arrays.asList(dest.address), stateId, (short)(methodMap.size() - 3), false);
 					try {
-						ro.getState(stateId, oStream);
+						if (stateId == null) {
+							ro.getState(oStream);
+						} else {
+							ro.getState(stateId, oStream);
+						}
 					} finally {
 						oStream.close();
 					}
@@ -204,11 +220,15 @@
 		@Override
 		public void run() {
 			try {
-				((ReplicatedObject)object).setState(stateId, is);
+				if (stateId == null) {
+					((ReplicatedObject<?>)object).setState(is);
+				} else {
+					((ReplicatedObject)object).setState(stateId, is);
+				}
 				if (promise != null) { 
 					promise.setResult(Boolean.TRUE);
 				}
-				LogManager.logDetail(LogConstants.CTX_RUNTIME, "state set " + stateId); //$NON-NLS-1$
+				LogManager.logDetail(LogConstants.CTX_RUNTIME, "state set", stateId); //$NON-NLS-1$
 			} catch (Exception e) {
 				if (promise != null) {
 					promise.setResult(Boolean.FALSE);
@@ -228,8 +248,7 @@
 		private final S object;
 		private transient ReplicatorRpcDispatcher<S> disp;
 		private final HashMap<Method, Short> methodMap;
-	    protected List<Address> remoteMembers = new ArrayList<Address>();
-	    protected final transient Promise<Boolean> state_promise=new Promise<Boolean>();
+	    protected List<Address> remoteMembers = Collections.synchronizedList(new ArrayList<Address>());
 		private Map<Serializable, Promise<Boolean>> loadingStates = new HashMap<Serializable, Promise<Boolean>>();
 	    
 		private ReplicatedInvocationHandler(S object,HashMap<Method, Short> methodMap) {
@@ -237,6 +256,12 @@
 			this.methodMap = methodMap;
 		}
 		
+		List<Address> getRemoteMembersCopy() {
+			synchronized (remoteMembers) {
+				return new ArrayList<Address>(remoteMembers);
+			}
+		}
+		
 		public void setDisp(ReplicatorRpcDispatcher<S> disp) {
 			this.disp = disp;
 		}
@@ -264,11 +289,9 @@
 		    		return handleReplicateState(method, args, annotation);
 				}
 		        MethodCall call=new MethodCall(methodNum, args);
-		        ArrayList<Address> dests = null;
+		        List<Address> dests = null;
 		        if (annotation.remoteOnly()) {
-					synchronized (remoteMembers) {
-						dests = new ArrayList<Address>(remoteMembers);
-					}
+		        	dests = getRemoteMembersCopy();
 		        }
 		        RspList<Object> responses = disp.callRemoteMethods(dests, call, new RequestOptions().setMode(annotation.asynch()?ResponseMode.GET_NONE:ResponseMode.GET_ALL).setTimeout(annotation.timeout()).setAnycasting(dests != null));
 		        if (annotation.asynch()) {
@@ -294,6 +317,25 @@
 		        throw new RuntimeException(method + " " + args + " failed", e); //$NON-NLS-1$ //$NON-NLS-2$
 		    }
 		}
+		
+		protected Address whereIsState(Serializable stateId, long timeout) throws Exception {
+			if (remoteMembers.isEmpty()) {
+				return null;
+			}
+			RspList<Boolean> resp = this.disp.callRemoteMethods(getRemoteMembersCopy(), new MethodCall((short)(methodMap.size() - 5), new Object[]{stateId}), new RequestOptions(ResponseMode.GET_ALL, timeout));
+			Collection<Rsp<Boolean>> values = resp.values();
+			Rsp<Boolean> rsp = null;
+			for (Rsp<Boolean> response : values) {
+				if (Boolean.TRUE.equals(response.getValue())) {
+					rsp = response;
+					break;
+				}
+			}
+			if (rsp == null) {
+				return null;
+			}
+			return rsp.getSender();
+		}
 
 		private Object handleReplicateState(Method method, Object[] args,
 				Replicated annotation) throws IllegalAccessException,
@@ -304,10 +346,7 @@
 			} catch (InvocationTargetException e) {
 				throw e.getCause();
 			}
-			List<Address> dests = null;
-			synchronized (remoteMembers) {
-				dests = new ArrayList<Address>(remoteMembers);
-			}
+			List<Address> dests = getRemoteMembersCopy();
 			ReplicatedObject ro = (ReplicatedObject)object;
 			Serializable stateId = (Serializable)args[0];
 			if (annotation.replicateState() == ReplicationMode.PUSH) {
@@ -324,9 +363,17 @@
 			if (result != null) {
 				return result;
 			}
-			if (!(object instanceof ReplicatedObject)) {
-				throw new IllegalStateException("A non-ReplicatedObject cannot use state pulling."); //$NON-NLS-1$
-			}
+			long timeout = annotation.timeout();
+			return pullState(method, args, stateId, timeout);
+		}
+
+		/**
+		 * Pull the remote state.  The method and args are optional
+		 * to determine if the state has been made available.
+		 */
+		Object pullState(Method method, Object[] args, Serializable stateId,
+				long timeout) throws Throwable {
+			Object result = null;
 			for (int i = 0; i < PULL_RETRIES; i++) {
 				Promise<Boolean> p = null;
 				boolean wait = true;
@@ -334,61 +381,60 @@
 					p = loadingStates.get(stateId);
 					if (p == null) {
 						wait = false;
-						try {
-							result = method.invoke(object, args);
-						} catch (InvocationTargetException e) {
-							throw e.getCause();
+						if (method != null) {
+							try {
+								result = method.invoke(object, args);
+							} catch (InvocationTargetException e) {
+								throw e.getCause();
+							}
+							if (result != null) {
+								return result;
+							}
 						}
-						if (result != null) {
-							return result;
-						}
 						p = new Promise<Boolean>();
 						loadingStates.put(stateId, p);
 					}
 				}
-				long timeout = annotation.timeout();
 				if (wait) {
 					p.getResult(timeout);
 					continue;
 				}
 				try {
 					LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulling state", stateId); //$NON-NLS-1$
-					RspList<Boolean> resp = this.disp.callRemoteMethods(null, new MethodCall((short)(methodMap.size() - 5), stateId), new RequestOptions(ResponseMode.GET_ALL, timeout));
-					Collection<Rsp<Boolean>> values = resp.values();
-					Rsp<Boolean> rsp = null;
-					for (Rsp<Boolean> response : values) {
-						if (Boolean.TRUE.equals(response.getValue())) {
-							rsp = response;
-							break;
-						}
-					}
-					if (rsp == null || this.disp.getChannel().getAddress().equals(rsp.getSender())) {
+					Address addr = whereIsState(stateId, timeout);
+					if (addr == null) {
+						LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "timeout exceeded or first member"); //$NON-NLS-1$	
 						break;
 					}
-					JGroupsInputStream is = new JGroupsInputStream(15000);
+					JGroupsInputStream is = new JGroupsInputStream(IO_TIMEOUT);
 					StreamingRunner runner = new StreamingRunner(object, stateId, is, p);
-					List<?> key = Arrays.asList(stateId, new AddressWrapper(rsp.getSender()));
+					List<?> key = Arrays.asList(stateId, new AddressWrapper(addr));
 					disp.inputStreams.put(key, is);
 					executor.execute(runner);
 					
-					this.disp.callRemoteMethod(rsp.getSender(), new MethodCall((short)(methodMap.size() - 4), stateId, new AddressWrapper(this.disp.getChannel().getAddress())), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(true));
+					this.disp.callRemoteMethod(addr, new MethodCall((short)(methodMap.size() - 4), stateId, new AddressWrapper(this.disp.getChannel().getAddress())), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(true));
 					
 					Boolean fetched = p.getResult(timeout);
 
 					if (fetched != null) {
 						if (fetched) {
 							LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulled state", stateId); //$NON-NLS-1$
-						} else {
-							LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " failed to pull " + stateId); //$NON-NLS-1$
-						}
+							if (method !=null) {
+								try {
+									result = method.invoke(object, args);
+								} catch (InvocationTargetException e) {
+									throw e.getCause();
+								}
+								if (result != null) {
+									return result;
+								}
+							}
+							break;
+						} 
+						LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " failed to pull " + stateId); //$NON-NLS-1$
 					} else {
 						LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " timeout pulling " + stateId); //$NON-NLS-1$
 					}
-					try {
-						result = method.invoke(object, args);
-					} catch (InvocationTargetException e) {
-						throw e.getCause();
-					}
 				} finally {
 					synchronized (loadingStates) {
 						loadingStates.remove(stateId);
@@ -403,12 +449,12 @@
 			if (newView.getMembers() != null) {
 				synchronized (remoteMembers) {
 					remoteMembers.removeAll(newView.getMembers());
-					if (object instanceof ReplicatedObject && !remoteMembers.isEmpty()) {
+					if (object instanceof ReplicatedObject<?> && !remoteMembers.isEmpty()) {
 						HashSet<Serializable> dropped = new HashSet<Serializable>();
 						for (Address address : remoteMembers) {
 							dropped.add(new AddressWrapper(address));
 						}
-						((ReplicatedObject)object).droppedMembers(dropped);
+						((ReplicatedObject<?>)object).droppedMembers(dropped);
 					}
 					remoteMembers.clear();
 					remoteMembers.addAll(newView.getMembers());
@@ -416,33 +462,6 @@
 				}
 			}
 		}
-		
-		@Override
-		public void setState(InputStream istream) {
-			LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "loading initial state"); //$NON-NLS-1$
-			try {
-				((ReplicatedObject)object).setState(istream);
-				state_promise.setResult(Boolean.TRUE);
-			} catch (Exception e) {
-				state_promise.setResult(Boolean.FALSE);
-				LogManager.logError(LogConstants.CTX_RUNTIME, e, "error loading initial state"); //$NON-NLS-1$
-			} finally {
-				Util.close(istream);
-			}
-		}
-		
-		@Override
-		public void getState(OutputStream ostream) {
-			LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "getting initial state"); //$NON-NLS-1$
-			try {
-				((ReplicatedObject)object).getState(ostream);
-			} catch (Exception e) {
-				LogManager.logError(LogConstants.CTX_RUNTIME, e, "error gettting initial state"); //$NON-NLS-1$
-			} finally {
-				Util.close(ostream);
-			}
-		}
-		
 	}
 	
 	private interface Streaming {
@@ -470,7 +489,6 @@
 		c.close();
 	}
 	
-	@SuppressWarnings("unchecked")
 	@Override
 	public <T, S> T replicate(String mux_id,
 			Class<T> iface, final S object, long startTimeout) throws Exception {
@@ -528,23 +546,24 @@
 			channel.connect(mux_id);
 			if (object instanceof ReplicatedObject) {
 				((ReplicatedObject)object).setAddress(new AddressWrapper(channel.getAddress()));
-				channel.getState(null, startTimeout);
-				Boolean loaded = proxy.state_promise.getResult(1);
-				if (loaded == null) {
-					LogManager.logInfo(LogConstants.CTX_RUNTIME, object + " timeout exceeded or first member"); //$NON-NLS-1$
-				} else if (loaded) {
-					LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "loaded"); //$NON-NLS-1$
-				} else {
-					LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " load error"); //$NON-NLS-1$
-				}
+				proxy.pullState(null, null, null, startTimeout);
 			}
 			success = true;
 			return replicatedProxy;
+		} catch (Throwable e) {
+			if (e instanceof Exception) {
+				throw (Exception)e;
+			}
+			throw new TeiidRuntimeException(e);
 		} finally {
 			if (!success) {
 				channel.close();
+			} else {
+				synchronized (disp) {
+					//mark as initialized so that state can be pulled if needed
+					disp.initialized = true;
+				}
 			}
 		}
 	}
-	
 }

Modified: trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java	2011-11-23 19:43:50 UTC (rev 3699)
+++ trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java	2011-11-28 18:05:45 UTC (rev 3700)
@@ -75,6 +75,11 @@
 	 */
 	void droppedMembers(Collection<Serializable> addresses);
 	
+	/**
+	 * Return true if the object has the given state
+	 * @param state_id
+	 * @return
+	 */
 	boolean hasState(K state_id);
 	
 }



More information about the teiid-commits mailing list