[teiid-commits] teiid SVN: r3695 - in trunk: engine/src/main/java/org/teiid/common/buffer/impl and 3 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Tue Nov 22 21:22:18 EST 2011


Author: shawkins
Date: 2011-11-22 21:22:18 -0500 (Tue, 22 Nov 2011)
New Revision: 3695

Modified:
   trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java
   trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStore.java
   trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java
   trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
   trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java
Log:
TEIID-1720 upgrading to new JGroups. However, partial state is not corrected yet, and the initial state transfer may have to be rewritten.

Modified: trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
===================================================================
--- trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java	2011-11-23 02:18:58 UTC (rev 3694)
+++ trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java	2011-11-23 02:22:18 UTC (rev 3695)
@@ -22,7 +22,11 @@
 
 package org.teiid.replication.jboss;
 
+import java.io.Externalizable;
+import java.io.IOException;
 import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.lang.reflect.InvocationHandler;
@@ -44,13 +48,9 @@
 import org.jboss.as.clustering.jgroups.ChannelFactory;
 import org.jgroups.Address;
 import org.jgroups.Channel;
-import org.jgroups.MembershipListener;
 import org.jgroups.Message;
-import org.jgroups.MessageListener;
-import org.jgroups.Receiver;
 import org.jgroups.ReceiverAdapter;
 import org.jgroups.View;
-import org.jgroups.blocks.GroupRequest;
 import org.jgroups.blocks.MethodCall;
 import org.jgroups.blocks.MethodLookup;
 import org.jgroups.blocks.RequestOptions;
@@ -61,6 +61,7 @@
 import org.jgroups.util.Util;
 import org.teiid.Replicated;
 import org.teiid.Replicated.ReplicationMode;
+import org.teiid.core.util.ReflectionHelper;
 import org.teiid.logging.LogConstants;
 import org.teiid.logging.LogManager;
 import org.teiid.query.ObjectReplicator;
@@ -68,6 +69,58 @@
 
 public abstract class JGroupsObjectReplicator implements ObjectReplicator, Serializable {
 	
+	public static final class AddressWrapper implements Externalizable {
+		
+		private Address address;
+		
+		public AddressWrapper() {
+			
+		}
+		
+		public AddressWrapper(Address address) {
+			this.address = address;
+		}
+		
+		@Override
+		public int hashCode() {
+			return address.hashCode();
+		}
+		
+		@Override
+		public boolean equals(Object obj) {
+			if (obj == this) {
+				return true;
+			}
+			if (!(obj instanceof AddressWrapper)) {
+				return false;
+			}
+			return address.equals(((AddressWrapper)obj).address);
+		}
+		
+		@Override
+		public void readExternal(ObjectInput in) throws IOException,
+				ClassNotFoundException {
+			String className = in.readUTF();
+			try {
+				this.address = (Address) ReflectionHelper.create(className, null, Thread.currentThread().getContextClassLoader());
+				this.address.readFrom(in);
+			} catch (Exception e) {
+				throw new IOException(e);
+			}
+		}
+		
+		@Override
+		public void writeExternal(ObjectOutput out) throws IOException {
+			out.writeUTF(address.getClass().getName());
+			try {
+				address.writeTo(out);
+			} catch (Exception e) {
+				throw new IOException(e);
+			}
+		}
+		
+	}
+	
 	private static final long serialVersionUID = -6851804958313095166L;
 	private static final String CREATE_STATE = "createState"; //$NON-NLS-1$
 	private static final String BUILD_STATE = "buildState"; //$NON-NLS-1$
@@ -97,9 +150,8 @@
 		}
 	}
 
-	private final static class ReplicatedInvocationHandler<S> implements
-			InvocationHandler, Serializable, MessageListener, Receiver,
-			MembershipListener {
+	private final static class ReplicatedInvocationHandler<S> extends ReceiverAdapter implements
+			InvocationHandler, Serializable {
 		
 		private static final long serialVersionUID = -2943462899945966103L;
 		private final S object;
@@ -172,7 +224,7 @@
 					LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulling state", stateId); //$NON-NLS-1$
 					long timeout = annotation.timeout();
 					threadLocalPromise.set(new Promise<Boolean>());
-					boolean getState = this.disp.getChannel().getState(null, stateId, timeout);
+					/*boolean getState = this.disp.getChannel().getState(null, stateId, timeout);
 					if (getState) {
 						Boolean loaded = threadLocalPromise.get().getResult(timeout);
 						if (Boolean.TRUE.equals(loaded)) {
@@ -182,7 +234,7 @@
 						}
 					} else {
 						LogManager.logInfo(LogConstants.CTX_RUNTIME, object + " first member or timeout exceeded " + stateId); //$NON-NLS-1$
-					}
+					}*/
 					LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state", stateId); //$NON-NLS-1$
 			        return result;
 				}
@@ -193,7 +245,7 @@
 						dests = new Vector<Address>(remoteMembers);
 					}
 		        }
-		        RspList responses = disp.callRemoteMethods(dests, call, new RequestOptions().setMode(annotation.asynch()?ResponseMode.GET_NONE:ResponseMode.GET_ALL).setTimeout(annotation.timeout()));
+		        RspList<Object> responses = disp.callRemoteMethods(dests, call, new RequestOptions().setMode(annotation.asynch()?ResponseMode.GET_NONE:ResponseMode.GET_ALL).setTimeout(annotation.timeout()));
 		        if (annotation.asynch()) {
 			        return null;
 		        }
@@ -224,7 +276,11 @@
 				synchronized (remoteMembers) {
 					remoteMembers.removeAll(newView.getMembers());
 					if (object instanceof ReplicatedObject && !remoteMembers.isEmpty()) {
-						((ReplicatedObject)object).droppedMembers(new HashSet<Serializable>(remoteMembers));
+						HashSet<Serializable> dropped = new HashSet<Serializable>();
+						for (Address address : remoteMembers) {
+							dropped.add(new AddressWrapper(address));
+						}
+						((ReplicatedObject)object).droppedMembers(dropped);
 					}
 					remoteMembers.clear();
 					remoteMembers.addAll(newView.getMembers());
@@ -353,10 +409,14 @@
 		            if(log.isErrorEnabled()) log.error("message or message buffer is null"); //$NON-NLS-1$
 		            return null;
 		        }
+		        
+		        if (req.getSrc().equals(local_addr)) {
+		        	return null;
+		        }
 
 		        try {
 		            body=req_marshaller != null?
-		                    req_marshaller.objectFromByteBuffer(req.getBuffer(), req.getOffset(), req.getLength())
+		                    req_marshaller.objectFromBuffer(req.getBuffer(), req.getOffset(), req.getLength())
 		                    : req.getObject();
 		        }
 		        catch(Throwable e) {
@@ -381,7 +441,7 @@
 	                    throw new Exception("MethodCall uses ID=" + method_call.getId() + ", but method_lookup has not been set"); //$NON-NLS-1$ //$NON-NLS-2$
 
 		            if (method_call.getId() >= methodList.size() - 3) {
-		            	Serializable address = req.getSrc();
+		            	Serializable address = new AddressWrapper(req.getSrc());
 		            	String stateId = (String)method_call.getArgs()[0];
 		            	List<?> key = Arrays.asList(stateId, address);
 		            	JGroupsInputStream is = inputStreams.get(key);
@@ -433,17 +493,15 @@
 		try {
 			channel.connect(mux_id);
 			if (object instanceof ReplicatedObject) {
-				((ReplicatedObject)object).setLocalAddress(channel.getAddress());
-				boolean getState = channel.getState(null, startTimeout);
-				if (getState) {
-					Boolean loaded = proxy.state_promise.getResult(startTimeout);
-					if (Boolean.TRUE.equals(loaded)) {
-						LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "loaded"); //$NON-NLS-1$
-					} else {
-						LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " load error or timeout"); //$NON-NLS-1$
-					}
+				((ReplicatedObject)object).setAddress(new AddressWrapper(channel.getAddress()));
+				channel.getState(null, startTimeout);
+				Boolean loaded = proxy.state_promise.getResult(1);
+				if (loaded == null) {
+					LogManager.logInfo(LogConstants.CTX_RUNTIME, object + " timeout exceeded or first member"); //$NON-NLS-1$
+				} else if (loaded) {
+					LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "loaded"); //$NON-NLS-1$
 				} else {
-					LogManager.logInfo(LogConstants.CTX_RUNTIME, object + " first member or timeout exceeded"); //$NON-NLS-1$
+					LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " load error"); //$NON-NLS-1$
 				}
 			}
 			success = true;

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-11-23 02:18:58 UTC (rev 3694)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-11-23 02:22:18 UTC (rev 3695)
@@ -1051,7 +1051,7 @@
 	}
 
 	@Override
-	public void setLocalAddress(Serializable address) {
+	public void setAddress(Serializable address) {
 	}
 
 	@Override

Modified: trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java	2011-11-23 02:18:58 UTC (rev 3694)
+++ trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java	2011-11-23 02:22:18 UTC (rev 3695)
@@ -67,7 +67,7 @@
 	 * Allows the replicator to set the local address from the channel
 	 * @param address
 	 */
-	void setLocalAddress(Serializable address);
+	void setAddress(Serializable address);
 
 	/**
 	 * Called when members are dropped

Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStore.java	2011-11-23 02:18:58 UTC (rev 3694)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStore.java	2011-11-23 02:22:18 UTC (rev 3695)
@@ -49,7 +49,7 @@
 	
 	TempTableStore getTempTableStore();
 
-	Serializable getLocalAddress();
+	Serializable getAddress();
 	
 	List<?> updateMatViewRow(String matTableName, List<?> tuple, boolean delete) throws TeiidComponentException;
 

Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java	2011-11-23 02:18:58 UTC (rev 3694)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java	2011-11-23 02:22:18 UTC (rev 3695)
@@ -352,12 +352,12 @@
 	//begin replication methods
 	
 	@Override
-	public void setLocalAddress(Serializable address) {
+	public void setAddress(Serializable address) {
 		this.localAddress = address;
 	}
 	
 	@Override
-	public Serializable getLocalAddress() {
+	public Serializable getAddress() {
 		return localAddress;
 	}
 	

Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java	2011-11-23 02:18:58 UTC (rev 3694)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java	2011-11-23 02:22:18 UTC (rev 3695)
@@ -284,7 +284,7 @@
 			String matTableName = metadata.getFullName(matTableId);
 			LogManager.logDetail(LogConstants.CTX_MATVIEWS, "processing refreshmatview for", matViewName); //$NON-NLS-1$
 			boolean invalidate = Boolean.TRUE.equals(((Constant)proc.getParameter(2).getExpression()).getValue());
-			boolean needsLoading = globalStore.needsLoading(matTableName, globalStore.getLocalAddress(), true, true, invalidate);
+			boolean needsLoading = globalStore.needsLoading(matTableName, globalStore.getAddress(), true, true, invalidate);
 			if (!needsLoading) {
 				return CollectionTupleSource.createUpdateCountTupleSource(-1);
 			}
@@ -367,9 +367,9 @@
 			final MatTableInfo info = globalStore.getMatTableInfo(tableName);
 			boolean load = false;
 			while (!info.isUpToDate()) {
-				load = globalStore.needsLoading(tableName, globalStore.getLocalAddress(), true, false, false);
+				load = globalStore.needsLoading(tableName, globalStore.getAddress(), true, false, false);
 				if (load) {
-					load = globalStore.needsLoading(tableName, globalStore.getLocalAddress(), false, false, false);
+					load = globalStore.needsLoading(tableName, globalStore.getAddress(), false, false, false);
 					if (load) {
 						break;
 					}

Modified: trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java	2011-11-23 02:18:58 UTC (rev 3694)
+++ trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java	2011-11-23 02:22:18 UTC (rev 3695)
@@ -22,8 +22,7 @@
 
 package org.teiid.systemmodel;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -33,22 +32,20 @@
 import java.util.HashMap;
 
 import org.jboss.as.clustering.jgroups.ChannelFactory;
-import org.jboss.as.clustering.jgroups.JChannelFactory;
 import org.jgroups.Channel;
+import org.jgroups.JChannel;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.teiid.core.types.DataTypeManager;
 import org.teiid.core.util.UnitTestUtil;
 import org.teiid.jdbc.FakeServer;
 import org.teiid.metadata.FunctionMethod;
+import org.teiid.metadata.FunctionParameter;
 import org.teiid.metadata.FunctionMethod.Determinism;
 import org.teiid.metadata.FunctionMethod.PushDown;
-import org.teiid.metadata.FunctionParameter;
 import org.teiid.replication.jboss.JGroupsObjectReplicator;
 
 @SuppressWarnings("nls")
-@Ignore
 public class TestMatViewReplication {
 	
     private static final String MATVIEWS = "matviews";
@@ -114,6 +111,7 @@
 		server2.stop();
     }
 
+	@SuppressWarnings("serial")
 	private FakeServer createServer() throws Exception {
 		FakeServer server = new FakeServer();
 		
@@ -123,9 +121,7 @@
 				return new ChannelFactory() {
 					@Override
 					public Channel createChannel(String id) throws Exception {
-				        JChannelFactory jcf = new JChannelFactory();
-				        jcf.setMultiplexerConfig(this.getClass().getClassLoader().getResource("stacks.xml")); //$NON-NLS-1$
-				        return jcf.createMultiplexerChannel("tcp", id);
+						return new JChannel(this.getClass().getClassLoader().getResource("tcp.xml"));
 					}
 				};
 			}



More information about the teiid-commits mailing list