[teiid-commits] teiid SVN: r4454 - in trunk: jboss-integration/src/main/java/org/teiid/replication/jboss and 8 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Mon Sep 17 14:31:06 EDT 2012


Author: rareddy
Date: 2012-09-17 14:31:05 -0400 (Mon, 17 Sep 2012)
New Revision: 4454

Added:
   trunk/runtime/src/main/java/org/teiid/replication/
   trunk/runtime/src/main/java/org/teiid/replication/jgroups/
   trunk/runtime/src/main/java/org/teiid/replication/jgroups/AddressWrapper.java
   trunk/runtime/src/main/java/org/teiid/replication/jgroups/ChannelFactory.java
   trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsInputStream.java
   trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsObjectReplicator.java
   trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsOutputStream.java
Removed:
   trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/AddressWrapper.java
   trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java
   trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
   trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java
Modified:
   trunk/jboss-integration/src/main/java/org/teiid/jboss/IntegrationPlugin.java
   trunk/jboss-integration/src/main/java/org/teiid/jboss/JGroupsObjectReplicatorService.java
   trunk/jboss-integration/src/main/java/org/teiid/jboss/TeiidAdd.java
   trunk/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties
   trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedConfiguration.java
   trunk/runtime/src/main/java/org/teiid/runtime/RuntimePlugin.java
   trunk/runtime/src/main/resources/org/teiid/runtime/i18n.properties
   trunk/test-integration/common/pom.xml
   trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java
Log:
TEIID-2215: refactored and moved code that is part of jboss-integration into runtime module to support the jgroups based object replication for clustered embedded Teiid.

Modified: trunk/jboss-integration/src/main/java/org/teiid/jboss/IntegrationPlugin.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/jboss/IntegrationPlugin.java	2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/jboss-integration/src/main/java/org/teiid/jboss/IntegrationPlugin.java	2012-09-17 18:31:05 UTC (rev 4454)
@@ -56,9 +56,7 @@
     	TEIID50017, // vdb.xml parse exception
     	TEIID50018, // failed VDB dependency processing
     	TEIID50019, // redeploying VDB
-    	TEIID50020, // replication error failed to pull
     	TEIID50021, // vdb defined translator not found
-    	TEIID50022, // replication error timeout during the pull
     	TEIID50023, // replication failed
     	TEIID50024, // failed metadata load
     	TEIID50025, // VDB deployed
@@ -72,7 +70,6 @@
     	TEIID50039, // socket_disabled
     	TEIID50040, // odbc_disabled
     	TEIID50041, // embedded disabled
-    	TEIID50042, // error state
     	TEIID50043,
     	TEIID50044, // vdb save failed
     	TEIID50047,
@@ -82,7 +79,6 @@
     	TEIID50055,
     	TEIID50056,
     	TEIID50057,
-    	TEIID50067,
     	TEIID50069,
     	TEIID50070,
     	TEIID50071,

Modified: trunk/jboss-integration/src/main/java/org/teiid/jboss/JGroupsObjectReplicatorService.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/jboss/JGroupsObjectReplicatorService.java	2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/jboss-integration/src/main/java/org/teiid/jboss/JGroupsObjectReplicatorService.java	2012-09-17 18:31:05 UTC (rev 4454)
@@ -21,23 +21,32 @@
  */
 package org.teiid.jboss;
 
+import java.util.concurrent.Executor;
+
 import org.jboss.as.clustering.jgroups.ChannelFactory;
 import org.jboss.msc.service.Service;
 import org.jboss.msc.service.StartContext;
 import org.jboss.msc.service.StartException;
 import org.jboss.msc.service.StopContext;
 import org.jboss.msc.value.InjectedValue;
-import org.teiid.replication.jboss.JGroupsObjectReplicator;
+import org.jgroups.Channel;
+import org.teiid.replication.jgroups.JGroupsObjectReplicator;
 
 class JGroupsObjectReplicatorService implements Service<JGroupsObjectReplicator> {
 
 	public final InjectedValue<ChannelFactory> channelFactoryInjector = new InjectedValue<ChannelFactory>();
+	final InjectedValue<Executor> executorInjector = new InjectedValue<Executor>();
 	private JGroupsObjectReplicator replicator; 
 	
 	
 	@Override
 	public void start(StartContext context) throws StartException {
-		this.replicator = new JGroupsObjectReplicator(channelFactoryInjector.getValue());
+		this.replicator = new JGroupsObjectReplicator(new org.teiid.replication.jgroups.ChannelFactory() {
+			@Override
+			public Channel createChannel(String id) throws Exception {
+				return channelFactoryInjector.getValue().createChannel(id);
+			}
+		}, executorInjector.getValue());
 	}
 
 	@Override

Modified: trunk/jboss-integration/src/main/java/org/teiid/jboss/TeiidAdd.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/jboss/TeiidAdd.java	2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/jboss-integration/src/main/java/org/teiid/jboss/TeiidAdd.java	2012-09-17 18:31:05 UTC (rev 4454)
@@ -83,7 +83,7 @@
 import org.teiid.logging.LogManager;
 import org.teiid.query.ObjectReplicator;
 import org.teiid.query.function.SystemFunctionManager;
-import org.teiid.replication.jboss.JGroupsObjectReplicator;
+import org.teiid.replication.jgroups.JGroupsObjectReplicator;
 import org.teiid.services.InternalEventDistributorFactory;
 
 class TeiidAdd extends AbstractAddStepHandler implements DescriptionProvider {
@@ -247,6 +247,7 @@
     		JGroupsObjectReplicatorService replicatorService = new JGroupsObjectReplicatorService();
 			ServiceBuilder<JGroupsObjectReplicator> serviceBuilder = target.addService(TeiidServiceNames.OBJECT_REPLICATOR, replicatorService);
 			serviceBuilder.addDependency(ServiceName.JBOSS.append("jgroups", "stack", stack), ChannelFactory.class, replicatorService.channelFactoryInjector); //$NON-NLS-1$ //$NON-NLS-2$
+			serviceBuilder.addDependency(TeiidServiceNames.executorServiceName(asyncThreadPoolName), Executor.class,  replicatorService.executorInjector);
 			newControllers.add(serviceBuilder.install());
 			LogManager.logInfo(LogConstants.CTX_RUNTIME, IntegrationPlugin.Util.gs(IntegrationPlugin.Event.TEIID50003)); 
     	} else {

Deleted: trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/AddressWrapper.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/AddressWrapper.java	2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/AddressWrapper.java	2012-09-17 18:31:05 UTC (rev 4454)
@@ -1,86 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.replication.jboss;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-
-import org.jgroups.Address;
-import org.teiid.core.util.ReflectionHelper;
-
-/**
- * Allows JGroups {@link Address} objects to be serializable
- */
-public final class AddressWrapper implements Externalizable {
-	
-	Address address;
-	
-	public AddressWrapper() {
-		
-	}
-	
-	public AddressWrapper(Address address) {
-		this.address = address;
-	}
-	
-	@Override
-	public int hashCode() {
-		return address.hashCode();
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj == this) {
-			return true;
-		}
-		if (!(obj instanceof AddressWrapper)) {
-			return false;
-		}
-		return address.equals(((AddressWrapper)obj).address);
-	}
-	
-	@Override
-	public void readExternal(ObjectInput in) throws IOException,
-			ClassNotFoundException {
-		String className = in.readUTF();
-		try {
-			this.address = (Address) ReflectionHelper.create(className, null, Thread.currentThread().getContextClassLoader());
-			this.address.readFrom(in);
-		} catch (Exception e) {
-			throw new IOException(e);
-		}
-	}
-	
-	@Override
-	public void writeExternal(ObjectOutput out) throws IOException {
-		out.writeUTF(address.getClass().getName());
-		try {
-			address.writeTo(out);
-		} catch (Exception e) {
-			throw new IOException(e);
-		}
-	}
-	
-}
\ No newline at end of file

Deleted: trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java	2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java	2012-09-17 18:31:05 UTC (rev 4454)
@@ -1,117 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.replication.jboss;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-public class JGroupsInputStream extends InputStream {
-	
-	private long timeout = 15000;
-    private volatile byte[] buf;
-    private volatile int index=0;
-    private ReentrantLock lock = new ReentrantLock();
-    private Condition write = lock.newCondition();
-    private Condition doneReading = lock.newCondition();
-    
-    public JGroupsInputStream(long timeout) {
-    	this.timeout = timeout;
-	}
-    
-    @Override
-    public int read() throws IOException {
-        if (index < 0) {
-        	return -1;
-        }
-        if (buf == null) {
-        	lock.lock();
-            try {
-            	long waitTime = TimeUnit.MILLISECONDS.toNanos(timeout);
-            	while (buf == null) {
-            		waitTime = write.awaitNanos(waitTime);
-					if (waitTime <= 0) {
-	            		throw new IOException(new TimeoutException());
-	            	}
-            	}
-                if (index < 0) {
-                	return -1;
-                }
-            } catch(InterruptedException e) {
-            	throw new IOException(e);
-            } finally {
-            	lock.unlock();
-            }
-        }
-        if (index == buf.length) {
-        	lock.lock();
-        	try {
-	        	buf = null;
-	        	index = 0;
-	        	doneReading.signal();
-        	} finally {
-        		lock.unlock();
-        	}
-        	return read();
-        }
-        return buf[index++] & 0xff;
-    }
-    
-    @Override
-    public void close() {
-    	lock.lock();
-    	try {
-    		buf = null;
-    		index = -1;
-    		doneReading.signal();
-    	} finally {
-    		lock.unlock();
-    	}
-    }
-    
-    public void receive(byte[] bytes) throws InterruptedException {
-    	lock.lock();
-    	try {	
-    		if (index == -1) {
-    			return;
-    		}
-    		while (buf != null) {
-    			doneReading.await();
-    		}
-    		if (index == -1) {
-    			return;
-    		}
-    		buf = bytes;
-    		if (bytes == null) {
-    			index = -1;
-    		}
-    		write.signal();
-    	} finally {
-    		lock.unlock();
-    	}
-    }
-
-}
\ No newline at end of file

Deleted: trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java	2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java	2012-09-17 18:31:05 UTC (rev 4454)
@@ -1,615 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.replication.jboss;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
-import org.jboss.as.clustering.jgroups.ChannelFactory;
-import org.jgroups.Address;
-import org.jgroups.Channel;
-import org.jgroups.MembershipListener;
-import org.jgroups.Message;
-import org.jgroups.MessageListener;
-import org.jgroups.ReceiverAdapter;
-import org.jgroups.View;
-import org.jgroups.blocks.MethodCall;
-import org.jgroups.blocks.MethodLookup;
-import org.jgroups.blocks.RequestOptions;
-import org.jgroups.blocks.ResponseMode;
-import org.jgroups.blocks.RpcDispatcher;
-import org.jgroups.util.Buffer;
-import org.jgroups.util.Promise;
-import org.jgroups.util.Rsp;
-import org.jgroups.util.RspList;
-import org.teiid.Replicated;
-import org.teiid.Replicated.ReplicationMode;
-import org.teiid.core.TeiidRuntimeException;
-import org.teiid.core.util.ObjectInputStreamWithClassloader;
-import org.teiid.jboss.IntegrationPlugin;
-import org.teiid.logging.LogConstants;
-import org.teiid.logging.LogManager;
-import org.teiid.query.ObjectReplicator;
-import org.teiid.query.ReplicatedObject;
-
- at SuppressWarnings("unchecked")
-public class JGroupsObjectReplicator implements ObjectReplicator, Serializable {
-
-	private static final int IO_TIMEOUT = 15000;
-
-	private final class ReplicatorRpcDispatcher<S> extends RpcDispatcher {
-		private final S object;
-		private boolean initialized;
-		private final HashMap<Method, Short> methodMap;
-		private final ArrayList<Method> methodList;
-		Map<List<?>, JGroupsInputStream> inputStreams = new ConcurrentHashMap<List<?>, JGroupsInputStream>();
-
-		private ReplicatorRpcDispatcher(Channel channel, MessageListener l,
-				MembershipListener l2, Object serverObj, S object,
-				HashMap<Method, Short> methodMap, ArrayList<Method> methodList) {
-			super(channel, l, l2, serverObj);
-			this.object = object;
-			this.methodMap = methodMap;
-			this.methodList = methodList;
-			setMarshaller(new ContextAwareMarshaller(getClass().getClassLoader()));
-		}
-
-		@Override
-		public Object handle(Message req) {
-			Object      body=null;
-
-		    if(req == null || req.getLength() == 0) {
-		        if(log.isErrorEnabled()) log.error("message or message buffer is null"); //$NON-NLS-1$
-		        return null;
-		    }
-		    
-		    try {
-		        body=req_marshaller != null?
-		                req_marshaller.objectFromBuffer(req.getBuffer(), req.getOffset(), req.getLength())
-		                : req.getObject();
-		    }
-		    catch(Throwable e) {
-		        if(log.isErrorEnabled()) log.error("exception marshalling object", e); //$NON-NLS-1$
-		        return e;
-		    }
-
-		    if(!(body instanceof MethodCall)) {
-		        if(log.isErrorEnabled()) log.error("message does not contain a MethodCall object"); //$NON-NLS-1$
-
-		        // create an exception to represent this and return it
-		        return  new IllegalArgumentException("message does not contain a MethodCall object") ; //$NON-NLS-1$
-		    }
-
-		    final MethodCall method_call=(MethodCall)body;
-
-		    try {
-		        if(log.isTraceEnabled())
-		            log.trace("[sender=" + req.getSrc() + "], method_call: " + method_call); //$NON-NLS-1$ //$NON-NLS-2$
-
-		        if (method_call.getId() >= methodList.size() - 5 && req.getSrc().equals(local_addr)) {
-		        	return null;
-		        }
-
-		        if (method_call.getId() >= methodList.size() - 3) {
-		        	Serializable address = new AddressWrapper(req.getSrc());
-		        	Serializable stateId = (Serializable)method_call.getArgs()[0];
-		        	List<?> key = Arrays.asList(stateId, address);
-		        	JGroupsInputStream is = inputStreams.get(key);
-		        	if (method_call.getId() == methodList.size() - 3) {
-		        		LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "create state", stateId); //$NON-NLS-1$
-		        		if (is != null) {
-		        			is.receive(null);
-		        		}
-		        		is = new JGroupsInputStream(IO_TIMEOUT);
-		        		this.inputStreams.put(key, is);
-		        		executor.execute(new StreamingRunner(object, stateId, is, null));
-		        	} else if (method_call.getId() == methodList.size() - 2) {
-		        		LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "building state", stateId); //$NON-NLS-1$
-		        		if (is != null) {
-		        			is.receive((byte[])method_call.getArgs()[1]);
-		        		}
-		        	} else if (method_call.getId() == methodList.size() - 1) {
-		        		LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "finished state", stateId); //$NON-NLS-1$
-		        		if (is != null) {
-		        			is.receive(null);
-		        		}
-		        		this.inputStreams.remove(key);
-		        	}  
-		        	return null;
-		        } else if (method_call.getId() == methodList.size() - 5) {
-		        	//hasState
-		        	ReplicatedObject ro = (ReplicatedObject)object;
-		        	Serializable stateId = (Serializable)method_call.getArgs()[0];
-		        	
-		        	if (stateId == null) {
-		        		synchronized (this) {
-							if (initialized) {
-								return Boolean.TRUE;
-							}
-							return null;
-						}
-		        	}
-		        	
-		        	if (ro.hasState(stateId)) {
-		        		return Boolean.TRUE;
-		        	}
-		        	return null;
-		        } else if (method_call.getId() == methodList.size() - 4) {
-		        	//sendState
-		        	ReplicatedObject ro = (ReplicatedObject)object;
-		        	String stateId = (String)method_call.getArgs()[0];
-		        	AddressWrapper dest = (AddressWrapper)method_call.getArgs()[1];
-		        	
-		        	JGroupsOutputStream oStream = new JGroupsOutputStream(this, Arrays.asList(dest.address), stateId, (short)(methodMap.size() - 3), false);
-					try {
-						if (stateId == null) {
-							ro.getState(oStream);
-						} else {
-							ro.getState(stateId, oStream);
-						}
-					} finally {
-						oStream.close();
-					}
-					LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state", stateId); //$NON-NLS-1$
-			        return null;
-		        }
-		        
-		        Method m=method_lookup.findMethod(method_call.getId());
-		        if(m == null)
-		            throw new Exception("no method found for " + method_call.getId()); //$NON-NLS-1$
-		        method_call.setMethod(m);
-		        
-		    	return method_call.invoke(server_obj);
-		    }
-		    catch(Throwable x) {
-		        return x;
-		    }
-		}
-	}
-
-	private static final long serialVersionUID = -6851804958313095166L;
-	private static final String HAS_STATE = "hasState"; //$NON-NLS-1$
-	private static final String SEND_STATE = "sendState"; //$NON-NLS-1$
-	private static final String CREATE_STATE = "createState"; //$NON-NLS-1$
-	private static final String BUILD_STATE = "buildState"; //$NON-NLS-1$
-	private static final String FINISH_STATE = "finishState"; //$NON-NLS-1$
-
-	private final static class StreamingRunner implements Runnable {
-		private final Object object;
-		private final Serializable stateId;
-		private final JGroupsInputStream is;
-		private Promise<Boolean> promise;
-
-		private StreamingRunner(Object object, Serializable stateId, JGroupsInputStream is, Promise<Boolean> promise) {
-			this.object = object;
-			this.stateId = stateId;
-			this.is = is;
-			this.promise = promise;
-		}
-
-		@Override
-		public void run() {
-			try {
-				if (stateId == null) {
-					((ReplicatedObject<?>)object).setState(is);
-				} else {
-					((ReplicatedObject)object).setState(stateId, is);
-				}
-				if (promise != null) { 
-					promise.setResult(Boolean.TRUE);
-				}
-				LogManager.logDetail(LogConstants.CTX_RUNTIME, "state set", stateId); //$NON-NLS-1$
-			} catch (Exception e) {
-				if (promise != null) {
-					promise.setResult(Boolean.FALSE);
-				}
-				LogManager.logError(LogConstants.CTX_RUNTIME, e, IntegrationPlugin.Util.gs(IntegrationPlugin.Event.TEIID50042,stateId));
-			} finally {
-				is.close();
-			}
-		}
-	}
-
-	private final class ReplicatedInvocationHandler<S> extends ReceiverAdapter implements
-			InvocationHandler, Serializable {
-		
-		private static final int PULL_RETRIES = 3;
-		private static final long serialVersionUID = -2943462899945966103L;
-		private final S object;
-		private transient ReplicatorRpcDispatcher<S> disp;
-		private final HashMap<Method, Short> methodMap;
-	    protected List<Address> remoteMembers = Collections.synchronizedList(new ArrayList<Address>());
-		private Map<Serializable, Promise<Boolean>> loadingStates = new HashMap<Serializable, Promise<Boolean>>();
-	    
-		private ReplicatedInvocationHandler(S object,HashMap<Method, Short> methodMap) {
-			this.object = object;
-			this.methodMap = methodMap;
-		}
-		
-		List<Address> getRemoteMembersCopy() {
-			synchronized (remoteMembers) {
-				return new ArrayList<Address>(remoteMembers);
-			}
-		}
-		
-		public void setDisp(ReplicatorRpcDispatcher<S> disp) {
-			this.disp = disp;
-		}
-		
-		@Override
-		public Object invoke(Object proxy, Method method, Object[] args)
-				throws Throwable {
-			Short methodNum = methodMap.get(method);
-			if (methodNum == null || remoteMembers.isEmpty()) {
-				if (methodNum != null) {
-			    	Replicated annotation = method.getAnnotation(Replicated.class);
-			    	if (annotation != null && annotation.remoteOnly()) {
-			    		return null;
-			    	}
-				}
-				try {
-					return method.invoke(object, args);
-				} catch (InvocationTargetException e) {
-					throw e.getCause();
-				}
-			}
-		    try {
-		    	Replicated annotation = method.getAnnotation(Replicated.class);
-		    	if (annotation.replicateState() != ReplicationMode.NONE) {
-		    		return handleReplicateState(method, args, annotation);
-				}
-		        MethodCall call=new MethodCall(methodNum, args);
-		        List<Address> dests = null;
-		        if (annotation.remoteOnly()) {
-		        	dests = getRemoteMembersCopy();
-		        	if (dests.isEmpty()) {
-		        		return null;
-		        	}
-		        }
-		        RspList<Object> responses = disp.callRemoteMethods(dests, call, new RequestOptions().setMode(annotation.asynch()?ResponseMode.GET_NONE:ResponseMode.GET_ALL).setTimeout(annotation.timeout()).setAnycasting(dests != null));
-		        if (annotation.asynch()) {
-			        return null;
-		        }
-		        List<Object> results = responses.getResults();
-		        if (method.getReturnType() == boolean.class) {
-		        	for (Object o : results) {
-						if (!Boolean.TRUE.equals(o)) {
-							return false;
-						}
-					}
-		        	return true;
-		        } else if (method.getReturnType() == Collection.class) {
-		        	ArrayList<Object> result = new ArrayList<Object>();
-		        	for (Object o : results) {
-		        		result.addAll((Collection)o);
-					}
-		        	return results;
-		        }
-	        	return null;
-		    } catch(Exception e) {
-		        throw new RuntimeException(method + " " + args + " failed", e); //$NON-NLS-1$ //$NON-NLS-2$
-		    }
-		}
-		
-		protected Address whereIsState(Serializable stateId, long timeout) throws Exception {
-			if (remoteMembers.isEmpty()) {
-				return null;
-			}
-			RspList<Boolean> resp = this.disp.callRemoteMethods(getRemoteMembersCopy(), new MethodCall((short)(methodMap.size() - 5), new Object[]{stateId}), new RequestOptions(ResponseMode.GET_ALL, timeout));
-			Collection<Rsp<Boolean>> values = resp.values();
-			Rsp<Boolean> rsp = null;
-			for (Rsp<Boolean> response : values) {
-				if (Boolean.TRUE.equals(response.getValue())) {
-					rsp = response;
-					break;
-				}
-			}
-			if (rsp == null) {
-				return null;
-			}
-			return rsp.getSender();
-		}
-
-		private Object handleReplicateState(Method method, Object[] args,
-				Replicated annotation) throws IllegalAccessException,
-				Throwable, IOException, IllegalStateException, Exception {
-			Object result = null;
-			try {
-				result = method.invoke(object, args);
-			} catch (InvocationTargetException e) {
-				throw e.getCause();
-			}
-			ReplicatedObject ro = (ReplicatedObject)object;
-			Serializable stateId = (Serializable)args[0];
-			if (annotation.replicateState() == ReplicationMode.PUSH) {
-				if (!remoteMembers.isEmpty()) {
-					LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "replicating state", stateId); //$NON-NLS-1$
-					JGroupsOutputStream oStream = new JGroupsOutputStream(disp, null, stateId, (short)(methodMap.size() - 3), true);
-					try {
-						ro.getState(stateId, oStream);
-					} finally {
-						oStream.close();
-					}
-					LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state", stateId); //$NON-NLS-1$
-				}
-			    return result;
-			}
-			if (result != null) {
-				return result;
-			}
-			long timeout = annotation.timeout();
-			return pullState(method, args, stateId, timeout);
-		}
-
-		/**
-		 * Pull the remote state.  The method and args are optional
-		 * to determine if the state has been made available.
-		 */
-		Object pullState(Method method, Object[] args, Serializable stateId,
-				long timeout) throws Throwable {
-			Object result = null;
-			for (int i = 0; i < PULL_RETRIES; i++) {
-				Promise<Boolean> p = null;
-				boolean wait = true;
-				synchronized (loadingStates) {
-					p = loadingStates.get(stateId);
-					if (p == null) {
-						wait = false;
-						if (method != null) {
-							try {
-								result = method.invoke(object, args);
-							} catch (InvocationTargetException e) {
-								throw e.getCause();
-							}
-							if (result != null) {
-								return result;
-							}
-						}
-						p = new Promise<Boolean>();
-						loadingStates.put(stateId, p);
-					}
-				}
-				if (wait) {
-					p.getResult(timeout);
-					continue;
-				}
-				try {
-					LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulling state", stateId); //$NON-NLS-1$
-					Address addr = whereIsState(stateId, timeout);
-					if (addr == null) {
-						LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "timeout exceeded or first member"); //$NON-NLS-1$	
-						break;
-					}
-					JGroupsInputStream is = new JGroupsInputStream(IO_TIMEOUT);
-					StreamingRunner runner = new StreamingRunner(object, stateId, is, p);
-					List<?> key = Arrays.asList(stateId, new AddressWrapper(addr));
-					disp.inputStreams.put(key, is);
-					executor.execute(runner);
-					
-					this.disp.callRemoteMethod(addr, new MethodCall((short)(methodMap.size() - 4), stateId, new AddressWrapper(this.disp.getChannel().getAddress())), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(true));
-					
-					Boolean fetched = p.getResult(timeout);
-
-					if (fetched != null) {
-						if (fetched) {
-							LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulled state", stateId); //$NON-NLS-1$
-							if (method !=null) {
-								try {
-									result = method.invoke(object, args);
-								} catch (InvocationTargetException e) {
-									throw e.getCause();
-								}
-								if (result != null) {
-									return result;
-								}
-							}
-							break;
-						} 
-						LogManager.logWarning(LogConstants.CTX_RUNTIME, IntegrationPlugin.Util.gs(IntegrationPlugin.Event.TEIID50020, object, stateId));
-					} else {
-						LogManager.logWarning(LogConstants.CTX_RUNTIME, IntegrationPlugin.Util.gs(IntegrationPlugin.Event.TEIID50022, object, stateId));
-					}
-				} finally {
-					synchronized (loadingStates) {
-						loadingStates.remove(stateId);
-					}
-				}
-			}
-			return null; //could not fetch the remote state
-		}
-		
-		@Override
-		public void viewAccepted(View newView) {
-			if (newView.getMembers() != null) {
-				synchronized (remoteMembers) {
-					remoteMembers.removeAll(newView.getMembers());
-					if (object instanceof ReplicatedObject<?> && !remoteMembers.isEmpty()) {
-						HashSet<Serializable> dropped = new HashSet<Serializable>();
-						for (Address address : remoteMembers) {
-							dropped.add(new AddressWrapper(address));
-						}
-						((ReplicatedObject<?>)object).droppedMembers(dropped);
-					}
-					remoteMembers.clear();
-					remoteMembers.addAll(newView.getMembers());
-					remoteMembers.remove(this.disp.getChannel().getAddress());
-				}
-			}
-		}
-	}
-	
-	private interface Streaming {
-		void sendState(Serializable id, AddressWrapper dest);
-		void createState(Serializable id);
-		void buildState(Serializable id, byte[] bytes);
-		void finishState(Serializable id);
-	}
-
-	//TODO: this should be configurable, or use a common executor
-	private transient Executor executor = Executors.newCachedThreadPool();
-	private transient ChannelFactory channelFactory;
-
-	public JGroupsObjectReplicator(ChannelFactory channelFactory) {
-		this.channelFactory = channelFactory;
-	}
-	
-	public void stop(Object object) {
-		if (object == null || !Proxy.isProxyClass(object.getClass())) {
-			return;
-		}
-		ReplicatedInvocationHandler<?> handler = (ReplicatedInvocationHandler<?>) Proxy.getInvocationHandler(object);
-		Channel c = handler.disp.getChannel();
-		handler.disp.stop();
-		c.close();
-	}
-	
-	@Override
-	public <T, S> T replicate(String mux_id,
-			Class<T> iface, final S object, long startTimeout) throws Exception {
-		Channel channel = channelFactory.createChannel(mux_id);
-		
-		// To keep the order of methods same at all the nodes.
-		TreeMap<String, Method> methods = new TreeMap<String, Method>();
-		for (Method method : iface.getMethods()) {
-			if (method.getAnnotation(Replicated.class) == null) {
-				continue;
-			}
-			methods.put(method.toGenericString(), method);
-		}
-		
-		final HashMap<Method, Short> methodMap = new HashMap<Method, Short>();
-		final ArrayList<Method> methodList = new ArrayList<Method>();
-		
-		for (String method : methods.keySet()) {
-			methodList.add(methods.get(method));
-			methodMap.put(methods.get(method), (short)(methodList.size() - 1));
-		}
-		
-		Method hasState = ReplicatedObject.class.getMethod(HAS_STATE, new Class<?>[] {Serializable.class});
-		methodList.add(hasState);
-		methodMap.put(hasState, (short)(methodList.size() - 1));
-		
-		Method sendState = JGroupsObjectReplicator.Streaming.class.getMethod(SEND_STATE, new Class<?>[] {Serializable.class, AddressWrapper.class});
-		methodList.add(sendState);
-		methodMap.put(sendState, (short)(methodList.size() - 1));
-		
-		//add in streaming methods
-		Method createState = JGroupsObjectReplicator.Streaming.class.getMethod(CREATE_STATE, new Class<?>[] {Serializable.class});
-		methodList.add(createState);
-		methodMap.put(createState, (short)(methodList.size() - 1));
-		Method buildState = JGroupsObjectReplicator.Streaming.class.getMethod(BUILD_STATE, new Class<?>[] {Serializable.class, byte[].class});
-		methodList.add(buildState);
-		methodMap.put(buildState, (short)(methodList.size() - 1));
-		Method finishState = JGroupsObjectReplicator.Streaming.class.getMethod(FINISH_STATE, new Class<?>[] {Serializable.class});
-		methodList.add(finishState);
-		methodMap.put(finishState, (short)(methodList.size() - 1));
-		
-        ReplicatedInvocationHandler<S> proxy = new ReplicatedInvocationHandler<S>(object, methodMap);
-        /*
-         * TODO: could have an object implement streaming
-         * Override the normal handle method to support streaming
-         */
-        ReplicatorRpcDispatcher disp = new ReplicatorRpcDispatcher<S>(channel, proxy, proxy, object, object, methodMap, methodList);
-		
-		proxy.setDisp(disp);
-        disp.setMethodLookup(new MethodLookup() {
-            public Method findMethod(short id) {
-                return methodList.get(id);
-            }
-        });
-        
-		T replicatedProxy = (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] {iface}, proxy);
-		boolean success = false;
-		try {
-			channel.connect(mux_id);
-			if (object instanceof ReplicatedObject) {
-				((ReplicatedObject)object).setAddress(new AddressWrapper(channel.getAddress()));
-				proxy.pullState(null, null, null, startTimeout);
-			}
-			success = true;
-			return replicatedProxy;
-		} catch (Throwable e) {
-			if (e instanceof Exception) {
-				throw (Exception)e;
-			}
-			 throw new TeiidRuntimeException(IntegrationPlugin.Event.TEIID50067, e);
-		} finally {
-			if (!success) {
-				channel.close();
-			} else {
-				synchronized (disp) {
-					//mark as initialized so that state can be pulled if needed
-					disp.initialized = true;
-				}
-			}
-		}
-	}	
-	
-	// This class is used so that the objects are loaded with the current classes class loader
-	// rather than foreign class loader
-	static class ContextAwareMarshaller implements RpcDispatcher.Marshaller {
-		private ClassLoader classloader;
-		
-		public ContextAwareMarshaller(ClassLoader classloader) {
-			this.classloader = classloader;
-		}
-		
-		@Override
-		public Buffer objectToBuffer(Object obj) throws Exception {
-			ByteArrayOutputStream baos = new ByteArrayOutputStream();
-			ObjectOutputStream out = new ObjectOutputStream(baos);
-			out.writeObject(obj);
-			out.close();
-			return new Buffer(baos.toByteArray());
-		}
-
-		@Override
-		public Object objectFromBuffer(byte[] buf, int offset, int length) throws Exception {
-			ObjectInputStream in = new ObjectInputStreamWithClassloader(new ByteArrayInputStream(buf, offset, length), this.classloader);
-			Object anObj = in.readObject();
-			in.close();
-			return anObj;
-		}
-	}
-	
-}

Deleted: trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java	2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java	2012-09-17 18:31:05 UTC (rev 4454)
@@ -1,104 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.replication.jboss;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
-import org.jgroups.Address;
-import org.jgroups.blocks.MethodCall;
-import org.jgroups.blocks.RequestOptions;
-import org.jgroups.blocks.ResponseMode;
-import org.jgroups.blocks.RpcDispatcher;
-import org.teiid.core.types.Streamable;
-
-public class JGroupsOutputStream extends OutputStream {
-	
-	static final int CHUNK_SIZE=Streamable.STREAMING_BATCH_SIZE_IN_BYTES;
-    
-	protected final RpcDispatcher disp;
-    protected final List<Address> dests;
-    protected final Serializable stateId;
-    protected final short methodOffset;
-	
-    private volatile boolean closed=false;
-    private final byte[] buffer=new byte[CHUNK_SIZE];
-    private int index=0;
-
-    public JGroupsOutputStream(RpcDispatcher disp, List<Address> dests, Serializable stateId, short methodOffset, boolean sendCreate) throws IOException {
-        this.disp=disp;
-        this.dests=dests;
-        this.stateId=stateId;
-        this.methodOffset = methodOffset;
-        if (sendCreate) {
-	        try {
-	        	disp.callRemoteMethods(this.dests, new MethodCall(methodOffset, new Object[] {stateId}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(dests != null));
-	        } catch(Exception e) {
-	        	throw new IOException(e);
-	        }
-        }
-    }
-
-    public void close() throws IOException {
-        if(closed) {
-            return;
-        }
-        flush();
-        try {
-        	disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 2), new Object[] {stateId}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(dests != null));
-        } catch(Exception e) {
-        }
-        closed=true;
-    }
-
-    public void flush() throws IOException {
-        checkClosed();
-        try {
-            if(index == 0) {
-                return;
-            }
-        	disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 1), new Object[] {stateId, Arrays.copyOf(buffer, index)}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(dests != null));
-            index=0;
-        } catch(Exception e) {
-        	throw new IOException(e);
-        }
-    }
-
-	private void checkClosed() throws IOException {
-		if(closed) {
-            throw new IOException("output stream is closed"); //$NON-NLS-1$
-		}
-	}
-
-    public void write(int b) throws IOException {
-        checkClosed();
-        if(index >= buffer.length) {
-            flush();
-        }
-        buffer[index++]=(byte)b;
-    }
-
-}
\ No newline at end of file

Modified: trunk/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties
===================================================================
--- trunk/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties	2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties	2012-09-17 18:31:05 UTC (rev 4454)
@@ -55,7 +55,7 @@
 TEIID50029=VDB {0}.{1} model "{2}" metadata is currently being loaded. Start Time: {3}
 TEIID50030=VDB {0}.{1} model "{2}" metadata loaded. End Time: {3}
 TEIID50036=VDB {0}.{1} model "{2}" metadata failed to load. Reason:{3}
-TEIID50042=error setting state {0}
+
 TEIID50043=Invalid metadata file found at {0}; delete this file and restart server.
 TEIID50069=Failed to load module {0}
 TEIID50089=Failed to find any services of type {0} from module {1}
@@ -188,8 +188,6 @@
 TEIID50005=Clearing cache {0} for vdb {1}.{2}
 TEIID50021=VDB {0}.{1} deployed in inactive state due to unavailability of data sources {2}
 TEIID50016=Invalid VDB file deployment failed {0}
-TEIID50020= {0} Failed to Pull {1}
-TEIID50022={0} timeout pulling {1}
 TEIID50078=Translator not found {0}
 no_vdb_found=VDB {0}.{1} not found or VDB is not in ACTIVE status
 no_model_found= VDB {0}.{1} does not have model with name {2}.

Copied: trunk/runtime/src/main/java/org/teiid/replication/jgroups/AddressWrapper.java (from rev 4449, trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/AddressWrapper.java)
===================================================================
--- trunk/runtime/src/main/java/org/teiid/replication/jgroups/AddressWrapper.java	                        (rev 0)
+++ trunk/runtime/src/main/java/org/teiid/replication/jgroups/AddressWrapper.java	2012-09-17 18:31:05 UTC (rev 4454)
@@ -0,0 +1,86 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.replication.jgroups;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+import org.jgroups.Address;
+import org.teiid.core.util.ReflectionHelper;
+
+/**
+ * Allows JGroups {@link Address} objects to be serializable
+ */
+public final class AddressWrapper implements Externalizable {
+	
+	Address address;
+	
+	public AddressWrapper() {
+		
+	}
+	
+	public AddressWrapper(Address address) {
+		this.address = address;
+	}
+	
+	@Override
+	public int hashCode() {
+		return address.hashCode();
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		}
+		if (!(obj instanceof AddressWrapper)) {
+			return false;
+		}
+		return address.equals(((AddressWrapper)obj).address);
+	}
+	
+	@Override
+	public void readExternal(ObjectInput in) throws IOException,
+			ClassNotFoundException {
+		String className = in.readUTF();
+		try {
+			this.address = (Address) ReflectionHelper.create(className, null, Thread.currentThread().getContextClassLoader());
+			this.address.readFrom(in);
+		} catch (Exception e) {
+			throw new IOException(e);
+		}
+	}
+	
+	@Override
+	public void writeExternal(ObjectOutput out) throws IOException {
+		out.writeUTF(address.getClass().getName());
+		try {
+			address.writeTo(out);
+		} catch (Exception e) {
+			throw new IOException(e);
+		}
+	}
+	
+}
\ No newline at end of file


Property changes on: trunk/runtime/src/main/java/org/teiid/replication/jgroups/AddressWrapper.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain
Added: svn:mergeinfo
   + /branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/AddressWrapper.java:3673-3711,3754-3769
/branches/7.7.x/jboss-integration/src/main/java/org/teiid/replication/jboss/AddressWrapper.java:3816-3868,3876-3915,3920-3983
/branches/8.0.x/jboss-integration/src/main/java/org/teiid/replication/jboss/AddressWrapper.java:4002-4046,4048-4051
/branches/8.1.x/jboss-integration/src/main/java/org/teiid/replication/jboss/AddressWrapper.java:4319-4342
/trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/AddressWrapper.java:3507-3666

Added: trunk/runtime/src/main/java/org/teiid/replication/jgroups/ChannelFactory.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/replication/jgroups/ChannelFactory.java	                        (rev 0)
+++ trunk/runtime/src/main/java/org/teiid/replication/jgroups/ChannelFactory.java	2012-09-17 18:31:05 UTC (rev 4454)
@@ -0,0 +1,33 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+package org.teiid.replication.jgroups;
+
+import org.jgroups.Channel;
+
+public interface ChannelFactory {
+    /**
+     * Creates a JGroups channel
+     * @return a JGroups channel
+     * @throws Exception if there was a failure setting up the protocol stack
+     */
+    Channel createChannel(String id) throws Exception; 
+}


Property changes on: trunk/runtime/src/main/java/org/teiid/replication/jgroups/ChannelFactory.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Copied: trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsInputStream.java (from rev 4449, trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java)
===================================================================
--- trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsInputStream.java	                        (rev 0)
+++ trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsInputStream.java	2012-09-17 18:31:05 UTC (rev 4454)
@@ -0,0 +1,117 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.replication.jgroups;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class JGroupsInputStream extends InputStream {
+	
+	private long timeout = 15000;
+    private volatile byte[] buf;
+    private volatile int index=0;
+    private ReentrantLock lock = new ReentrantLock();
+    private Condition write = lock.newCondition();
+    private Condition doneReading = lock.newCondition();
+    
+    public JGroupsInputStream(long timeout) {
+    	this.timeout = timeout;
+	}
+    
+    @Override
+    public int read() throws IOException {
+        if (index < 0) {
+        	return -1;
+        }
+        if (buf == null) {
+        	lock.lock();
+            try {
+            	long waitTime = TimeUnit.MILLISECONDS.toNanos(timeout);
+            	while (buf == null) {
+            		waitTime = write.awaitNanos(waitTime);
+					if (waitTime <= 0) {
+	            		throw new IOException(new TimeoutException());
+	            	}
+            	}
+                if (index < 0) {
+                	return -1;
+                }
+            } catch(InterruptedException e) {
+            	throw new IOException(e);
+            } finally {
+            	lock.unlock();
+            }
+        }
+        if (index == buf.length) {
+        	lock.lock();
+        	try {
+	        	buf = null;
+	        	index = 0;
+	        	doneReading.signal();
+        	} finally {
+        		lock.unlock();
+        	}
+        	return read();
+        }
+        return buf[index++] & 0xff;
+    }
+    
+    @Override
+    public void close() {
+    	lock.lock();
+    	try {
+    		buf = null;
+    		index = -1;
+    		doneReading.signal();
+    	} finally {
+    		lock.unlock();
+    	}
+    }
+    
+    public void receive(byte[] bytes) throws InterruptedException {
+    	lock.lock();
+    	try {	
+    		if (index == -1) {
+    			return;
+    		}
+    		while (buf != null) {
+    			doneReading.await();
+    		}
+    		if (index == -1) {
+    			return;
+    		}
+    		buf = bytes;
+    		if (bytes == null) {
+    			index = -1;
+    		}
+    		write.signal();
+    	} finally {
+    		lock.unlock();
+    	}
+    }
+
+}
\ No newline at end of file


Property changes on: trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsInputStream.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain
Added: svn:mergeinfo
   + /branches/7.4.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java:3535-3555
/branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java:3673-3711,3754-3769
/branches/7.7.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java:3816-3868,3876-3915,3920-3983
/branches/8.0.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java:4002-4046,4048-4051
/branches/8.1.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java:4319-4342
/trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java:3507-3666

Copied: trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsObjectReplicator.java (from rev 4449, trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java)
===================================================================
--- trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsObjectReplicator.java	                        (rev 0)
+++ trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsObjectReplicator.java	2012-09-17 18:31:05 UTC (rev 4454)
@@ -0,0 +1,614 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.replication.jgroups;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.MembershipListener;
+import org.jgroups.Message;
+import org.jgroups.MessageListener;
+import org.jgroups.ReceiverAdapter;
+import org.jgroups.View;
+import org.jgroups.blocks.MethodCall;
+import org.jgroups.blocks.MethodLookup;
+import org.jgroups.blocks.RequestOptions;
+import org.jgroups.blocks.ResponseMode;
+import org.jgroups.blocks.RpcDispatcher;
+import org.jgroups.util.Buffer;
+import org.jgroups.util.Promise;
+import org.jgroups.util.Rsp;
+import org.jgroups.util.RspList;
+import org.teiid.Replicated;
+import org.teiid.Replicated.ReplicationMode;
+import org.teiid.core.TeiidRuntimeException;
+import org.teiid.core.util.ObjectInputStreamWithClassloader;
+import org.teiid.logging.LogConstants;
+import org.teiid.logging.LogManager;
+import org.teiid.query.ObjectReplicator;
+import org.teiid.query.ReplicatedObject;
+import org.teiid.runtime.RuntimePlugin;
+
+ at SuppressWarnings("unchecked")
+public class JGroupsObjectReplicator implements ObjectReplicator, Serializable {
+
+	private static final int IO_TIMEOUT = 15000;
+
+	private final class ReplicatorRpcDispatcher<S> extends RpcDispatcher {
+		private final S object;
+		private boolean initialized;
+		private final HashMap<Method, Short> methodMap;
+		private final ArrayList<Method> methodList;
+		Map<List<?>, JGroupsInputStream> inputStreams = new ConcurrentHashMap<List<?>, JGroupsInputStream>();
+
+		private ReplicatorRpcDispatcher(Channel channel, MessageListener l,
+				MembershipListener l2, Object serverObj, S object,
+				HashMap<Method, Short> methodMap, ArrayList<Method> methodList) {
+			super(channel, l, l2, serverObj);
+			this.object = object;
+			this.methodMap = methodMap;
+			this.methodList = methodList;
+			setMarshaller(new ContextAwareMarshaller(getClass().getClassLoader()));
+		}
+
+		@Override
+		public Object handle(Message req) {
+			Object      body=null;
+
+		    if(req == null || req.getLength() == 0) {
+		        if(log.isErrorEnabled()) log.error("message or message buffer is null"); //$NON-NLS-1$
+		        return null;
+		    }
+		    
+		    try {
+		        body=req_marshaller != null?
+		                req_marshaller.objectFromBuffer(req.getBuffer(), req.getOffset(), req.getLength())
+		                : req.getObject();
+		    }
+		    catch(Throwable e) {
+		        if(log.isErrorEnabled()) log.error("exception marshalling object", e); //$NON-NLS-1$
+		        return e;
+		    }
+
+		    if(!(body instanceof MethodCall)) {
+		        if(log.isErrorEnabled()) log.error("message does not contain a MethodCall object"); //$NON-NLS-1$
+
+		        // create an exception to represent this and return it
+		        return  new IllegalArgumentException("message does not contain a MethodCall object") ; //$NON-NLS-1$
+		    }
+
+		    final MethodCall method_call=(MethodCall)body;
+
+		    try {
+		        if(log.isTraceEnabled())
+		            log.trace("[sender=" + req.getSrc() + "], method_call: " + method_call); //$NON-NLS-1$ //$NON-NLS-2$
+
+		        if (method_call.getId() >= methodList.size() - 5 && req.getSrc().equals(local_addr)) {
+		        	return null;
+		        }
+
+		        if (method_call.getId() >= methodList.size() - 3) {
+		        	Serializable address = new AddressWrapper(req.getSrc());
+		        	Serializable stateId = (Serializable)method_call.getArgs()[0];
+		        	List<?> key = Arrays.asList(stateId, address);
+		        	JGroupsInputStream is = inputStreams.get(key);
+		        	if (method_call.getId() == methodList.size() - 3) {
+		        		LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "create state", stateId); //$NON-NLS-1$
+		        		if (is != null) {
+		        			is.receive(null);
+		        		}
+		        		is = new JGroupsInputStream(IO_TIMEOUT);
+		        		this.inputStreams.put(key, is);
+		        		executor.execute(new StreamingRunner(object, stateId, is, null));
+		        	} else if (method_call.getId() == methodList.size() - 2) {
+		        		LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "building state", stateId); //$NON-NLS-1$
+		        		if (is != null) {
+		        			is.receive((byte[])method_call.getArgs()[1]);
+		        		}
+		        	} else if (method_call.getId() == methodList.size() - 1) {
+		        		LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "finished state", stateId); //$NON-NLS-1$
+		        		if (is != null) {
+		        			is.receive(null);
+		        		}
+		        		this.inputStreams.remove(key);
+		        	}  
+		        	return null;
+		        } else if (method_call.getId() == methodList.size() - 5) {
+		        	//hasState
+		        	ReplicatedObject ro = (ReplicatedObject)object;
+		        	Serializable stateId = (Serializable)method_call.getArgs()[0];
+		        	
+		        	if (stateId == null) {
+		        		synchronized (this) {
+							if (initialized) {
+								return Boolean.TRUE;
+							}
+							return null;
+						}
+		        	}
+		        	
+		        	if (ro.hasState(stateId)) {
+		        		return Boolean.TRUE;
+		        	}
+		        	return null;
+		        } else if (method_call.getId() == methodList.size() - 4) {
+		        	//sendState
+		        	ReplicatedObject ro = (ReplicatedObject)object;
+		        	String stateId = (String)method_call.getArgs()[0];
+		        	AddressWrapper dest = (AddressWrapper)method_call.getArgs()[1];
+		        	
+		        	JGroupsOutputStream oStream = new JGroupsOutputStream(this, Arrays.asList(dest.address), stateId, (short)(methodMap.size() - 3), false);
+					try {
+						if (stateId == null) {
+							ro.getState(oStream);
+						} else {
+							ro.getState(stateId, oStream);
+						}
+					} finally {
+						oStream.close();
+					}
+					LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state", stateId); //$NON-NLS-1$
+			        return null;
+		        }
+		        
+		        Method m=method_lookup.findMethod(method_call.getId());
+		        if(m == null)
+		            throw new Exception("no method found for " + method_call.getId()); //$NON-NLS-1$
+		        method_call.setMethod(m);
+		        
+		    	return method_call.invoke(server_obj);
+		    }
+		    catch(Throwable x) {
+		        return x;
+		    }
+		}
+	}
+
+	private static final long serialVersionUID = -6851804958313095166L;
+	private static final String HAS_STATE = "hasState"; //$NON-NLS-1$
+	private static final String SEND_STATE = "sendState"; //$NON-NLS-1$
+	private static final String CREATE_STATE = "createState"; //$NON-NLS-1$
+	private static final String BUILD_STATE = "buildState"; //$NON-NLS-1$
+	private static final String FINISH_STATE = "finishState"; //$NON-NLS-1$
+
+	private final static class StreamingRunner implements Runnable {
+		private final Object object;
+		private final Serializable stateId;
+		private final JGroupsInputStream is;
+		private Promise<Boolean> promise;
+
+		private StreamingRunner(Object object, Serializable stateId, JGroupsInputStream is, Promise<Boolean> promise) {
+			this.object = object;
+			this.stateId = stateId;
+			this.is = is;
+			this.promise = promise;
+		}
+
+		@Override
+		public void run() {
+			try {
+				if (stateId == null) {
+					((ReplicatedObject<?>)object).setState(is);
+				} else {
+					((ReplicatedObject)object).setState(stateId, is);
+				}
+				if (promise != null) { 
+					promise.setResult(Boolean.TRUE);
+				}
+				LogManager.logDetail(LogConstants.CTX_RUNTIME, "state set", stateId); //$NON-NLS-1$
+			} catch (Exception e) {
+				if (promise != null) {
+					promise.setResult(Boolean.FALSE);
+				}
+				LogManager.logError(LogConstants.CTX_RUNTIME, e, RuntimePlugin.Util.gs(RuntimePlugin.Event.TEIID40101, stateId));
+			} finally {
+				is.close();
+			}
+		}
+	}
+
+	private final class ReplicatedInvocationHandler<S> extends ReceiverAdapter implements
+			InvocationHandler, Serializable {
+		
+		private static final int PULL_RETRIES = 3;
+		private static final long serialVersionUID = -2943462899945966103L;
+		private final S object;
+		private transient ReplicatorRpcDispatcher<S> disp;
+		private final HashMap<Method, Short> methodMap;
+	    protected List<Address> remoteMembers = Collections.synchronizedList(new ArrayList<Address>());
+		private Map<Serializable, Promise<Boolean>> loadingStates = new HashMap<Serializable, Promise<Boolean>>();
+	    
+		private ReplicatedInvocationHandler(S object,HashMap<Method, Short> methodMap) {
+			this.object = object;
+			this.methodMap = methodMap;
+		}
+		
+		List<Address> getRemoteMembersCopy() {
+			synchronized (remoteMembers) {
+				return new ArrayList<Address>(remoteMembers);
+			}
+		}
+		
+		public void setDisp(ReplicatorRpcDispatcher<S> disp) {
+			this.disp = disp;
+		}
+		
+		@Override
+		public Object invoke(Object proxy, Method method, Object[] args)
+				throws Throwable {
+			Short methodNum = methodMap.get(method);
+			if (methodNum == null || remoteMembers.isEmpty()) {
+				if (methodNum != null) {
+			    	Replicated annotation = method.getAnnotation(Replicated.class);
+			    	if (annotation != null && annotation.remoteOnly()) {
+			    		return null;
+			    	}
+				}
+				try {
+					return method.invoke(object, args);
+				} catch (InvocationTargetException e) {
+					throw e.getCause();
+				}
+			}
+		    try {
+		    	Replicated annotation = method.getAnnotation(Replicated.class);
+		    	if (annotation.replicateState() != ReplicationMode.NONE) {
+		    		return handleReplicateState(method, args, annotation);
+				}
+		        MethodCall call=new MethodCall(methodNum, args);
+		        List<Address> dests = null;
+		        if (annotation.remoteOnly()) {
+		        	dests = getRemoteMembersCopy();
+		        	if (dests.isEmpty()) {
+		        		return null;
+		        	}
+		        }
+		        RspList<Object> responses = disp.callRemoteMethods(dests, call, new RequestOptions().setMode(annotation.asynch()?ResponseMode.GET_NONE:ResponseMode.GET_ALL).setTimeout(annotation.timeout()).setAnycasting(dests != null));
+		        if (annotation.asynch()) {
+			        return null;
+		        }
+		        List<Object> results = responses.getResults();
+		        if (method.getReturnType() == boolean.class) {
+		        	for (Object o : results) {
+						if (!Boolean.TRUE.equals(o)) {
+							return false;
+						}
+					}
+		        	return true;
+		        } else if (method.getReturnType() == Collection.class) {
+		        	ArrayList<Object> result = new ArrayList<Object>();
+		        	for (Object o : results) {
+		        		result.addAll((Collection)o);
+					}
+		        	return results;
+		        }
+	        	return null;
+		    } catch(Exception e) {
+		        throw new RuntimeException(method + " " + args + " failed", e); //$NON-NLS-1$ //$NON-NLS-2$
+		    }
+		}
+		
+		protected Address whereIsState(Serializable stateId, long timeout) throws Exception {
+			if (remoteMembers.isEmpty()) {
+				return null;
+			}
+			RspList<Boolean> resp = this.disp.callRemoteMethods(getRemoteMembersCopy(), new MethodCall((short)(methodMap.size() - 5), new Object[]{stateId}), new RequestOptions(ResponseMode.GET_ALL, timeout));
+			Collection<Rsp<Boolean>> values = resp.values();
+			Rsp<Boolean> rsp = null;
+			for (Rsp<Boolean> response : values) {
+				if (Boolean.TRUE.equals(response.getValue())) {
+					rsp = response;
+					break;
+				}
+			}
+			if (rsp == null) {
+				return null;
+			}
+			return rsp.getSender();
+		}
+
+		private Object handleReplicateState(Method method, Object[] args,
+				Replicated annotation) throws IllegalAccessException,
+				Throwable, IOException, IllegalStateException, Exception {
+			Object result = null;
+			try {
+				result = method.invoke(object, args);
+			} catch (InvocationTargetException e) {
+				throw e.getCause();
+			}
+			ReplicatedObject ro = (ReplicatedObject)object;
+			Serializable stateId = (Serializable)args[0];
+			if (annotation.replicateState() == ReplicationMode.PUSH) {
+				if (!remoteMembers.isEmpty()) {
+					LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "replicating state", stateId); //$NON-NLS-1$
+					JGroupsOutputStream oStream = new JGroupsOutputStream(disp, null, stateId, (short)(methodMap.size() - 3), true);
+					try {
+						ro.getState(stateId, oStream);
+					} finally {
+						oStream.close();
+					}
+					LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state", stateId); //$NON-NLS-1$
+				}
+			    return result;
+			}
+			if (result != null) {
+				return result;
+			}
+			long timeout = annotation.timeout();
+			return pullState(method, args, stateId, timeout);
+		}
+
+		/**
+		 * Pull the remote state.  The method and args are optional
+		 * to determine if the state has been made available.
+		 */
+		Object pullState(Method method, Object[] args, Serializable stateId,
+				long timeout) throws Throwable {
+			Object result = null;
+			for (int i = 0; i < PULL_RETRIES; i++) {
+				Promise<Boolean> p = null;
+				boolean wait = true;
+				synchronized (loadingStates) {
+					p = loadingStates.get(stateId);
+					if (p == null) {
+						wait = false;
+						if (method != null) {
+							try {
+								result = method.invoke(object, args);
+							} catch (InvocationTargetException e) {
+								throw e.getCause();
+							}
+							if (result != null) {
+								return result;
+							}
+						}
+						p = new Promise<Boolean>();
+						loadingStates.put(stateId, p);
+					}
+				}
+				if (wait) {
+					p.getResult(timeout);
+					continue;
+				}
+				try {
+					LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulling state", stateId); //$NON-NLS-1$
+					Address addr = whereIsState(stateId, timeout);
+					if (addr == null) {
+						LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "timeout exceeded or first member"); //$NON-NLS-1$	
+						break;
+					}
+					JGroupsInputStream is = new JGroupsInputStream(IO_TIMEOUT);
+					StreamingRunner runner = new StreamingRunner(object, stateId, is, p);
+					List<?> key = Arrays.asList(stateId, new AddressWrapper(addr));
+					disp.inputStreams.put(key, is);
+					executor.execute(runner);
+					
+					this.disp.callRemoteMethod(addr, new MethodCall((short)(methodMap.size() - 4), stateId, new AddressWrapper(this.disp.getChannel().getAddress())), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(true));
+					
+					Boolean fetched = p.getResult(timeout);
+
+					if (fetched != null) {
+						if (fetched) {
+							LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulled state", stateId); //$NON-NLS-1$
+							if (method !=null) {
+								try {
+									result = method.invoke(object, args);
+								} catch (InvocationTargetException e) {
+									throw e.getCause();
+								}
+								if (result != null) {
+									return result;
+								}
+							}
+							break;
+						} 
+						LogManager.logWarning(LogConstants.CTX_RUNTIME, RuntimePlugin.Util.gs(RuntimePlugin.Event.TEIID40102, object, stateId));
+					} else {
+						LogManager.logWarning(LogConstants.CTX_RUNTIME, RuntimePlugin.Util.gs(RuntimePlugin.Event.TEIID40103, object, stateId));
+					}
+				} finally {
+					synchronized (loadingStates) {
+						loadingStates.remove(stateId);
+					}
+				}
+			}
+			return null; //could not fetch the remote state
+		}
+		
+		@Override
+		public void viewAccepted(View newView) {
+			if (newView.getMembers() != null) {
+				synchronized (remoteMembers) {
+					remoteMembers.removeAll(newView.getMembers());
+					if (object instanceof ReplicatedObject<?> && !remoteMembers.isEmpty()) {
+						HashSet<Serializable> dropped = new HashSet<Serializable>();
+						for (Address address : remoteMembers) {
+							dropped.add(new AddressWrapper(address));
+						}
+						((ReplicatedObject<?>)object).droppedMembers(dropped);
+					}
+					remoteMembers.clear();
+					remoteMembers.addAll(newView.getMembers());
+					remoteMembers.remove(this.disp.getChannel().getAddress());
+				}
+			}
+		}
+	}
+	
+	private interface Streaming {
+		void sendState(Serializable id, AddressWrapper dest);
+		void createState(Serializable id);
+		void buildState(Serializable id, byte[] bytes);
+		void finishState(Serializable id);
+	}
+
+	//TODO: this should be configurable, or use a common executor
+	private transient Executor executor;
+	private transient ChannelFactory channelFactory;
+
+	public JGroupsObjectReplicator(ChannelFactory channelFactory, Executor executor) {
+		this.channelFactory = channelFactory;
+		this.executor = executor;
+	}
+	
+	public void stop(Object object) {
+		if (object == null || !Proxy.isProxyClass(object.getClass())) {
+			return;
+		}
+		ReplicatedInvocationHandler<?> handler = (ReplicatedInvocationHandler<?>) Proxy.getInvocationHandler(object);
+		Channel c = handler.disp.getChannel();
+		handler.disp.stop();
+		c.close();
+	}
+	
+	@Override
+	public <T, S> T replicate(String mux_id,
+			Class<T> iface, final S object, long startTimeout) throws Exception {
+		Channel channel = channelFactory.createChannel(mux_id);
+		
+		// To keep the order of methods same at all the nodes.
+		TreeMap<String, Method> methods = new TreeMap<String, Method>();
+		for (Method method : iface.getMethods()) {
+			if (method.getAnnotation(Replicated.class) == null) {
+				continue;
+			}
+			methods.put(method.toGenericString(), method);
+		}
+		
+		final HashMap<Method, Short> methodMap = new HashMap<Method, Short>();
+		final ArrayList<Method> methodList = new ArrayList<Method>();
+		
+		for (String method : methods.keySet()) {
+			methodList.add(methods.get(method));
+			methodMap.put(methods.get(method), (short)(methodList.size() - 1));
+		}
+		
+		Method hasState = ReplicatedObject.class.getMethod(HAS_STATE, new Class<?>[] {Serializable.class});
+		methodList.add(hasState);
+		methodMap.put(hasState, (short)(methodList.size() - 1));
+		
+		Method sendState = JGroupsObjectReplicator.Streaming.class.getMethod(SEND_STATE, new Class<?>[] {Serializable.class, AddressWrapper.class});
+		methodList.add(sendState);
+		methodMap.put(sendState, (short)(methodList.size() - 1));
+		
+		//add in streaming methods
+		Method createState = JGroupsObjectReplicator.Streaming.class.getMethod(CREATE_STATE, new Class<?>[] {Serializable.class});
+		methodList.add(createState);
+		methodMap.put(createState, (short)(methodList.size() - 1));
+		Method buildState = JGroupsObjectReplicator.Streaming.class.getMethod(BUILD_STATE, new Class<?>[] {Serializable.class, byte[].class});
+		methodList.add(buildState);
+		methodMap.put(buildState, (short)(methodList.size() - 1));
+		Method finishState = JGroupsObjectReplicator.Streaming.class.getMethod(FINISH_STATE, new Class<?>[] {Serializable.class});
+		methodList.add(finishState);
+		methodMap.put(finishState, (short)(methodList.size() - 1));
+		
+        ReplicatedInvocationHandler<S> proxy = new ReplicatedInvocationHandler<S>(object, methodMap);
+        /*
+         * TODO: could have an object implement streaming
+         * Override the normal handle method to support streaming
+         */
+        ReplicatorRpcDispatcher disp = new ReplicatorRpcDispatcher<S>(channel, proxy, proxy, object, object, methodMap, methodList);
+		
+		proxy.setDisp(disp);
+        disp.setMethodLookup(new MethodLookup() {
+            public Method findMethod(short id) {
+                return methodList.get(id);
+            }
+        });
+        
+		T replicatedProxy = (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] {iface}, proxy);
+		boolean success = false;
+		try {
+			channel.connect(mux_id);
+			if (object instanceof ReplicatedObject) {
+				((ReplicatedObject)object).setAddress(new AddressWrapper(channel.getAddress()));
+				proxy.pullState(null, null, null, startTimeout);
+			}
+			success = true;
+			return replicatedProxy;
+		} catch (Throwable e) {
+			if (e instanceof Exception) {
+				throw (Exception)e;
+			}
+			 throw new TeiidRuntimeException(RuntimePlugin.Event.TEIID40104, e);
+		} finally {
+			if (!success) {
+				channel.close();
+			} else {
+				synchronized (disp) {
+					//mark as initialized so that state can be pulled if needed
+					disp.initialized = true;
+				}
+			}
+		}
+	}	
+	
+	// This class is used so that the objects are loaded with the current classes class loader
+	// rather than foreign class loader
+	static class ContextAwareMarshaller implements RpcDispatcher.Marshaller {
+		private ClassLoader classloader;
+		
+		public ContextAwareMarshaller(ClassLoader classloader) {
+			this.classloader = classloader;
+		}
+		
+		@Override
+		public Buffer objectToBuffer(Object obj) throws Exception {
+			ByteArrayOutputStream baos = new ByteArrayOutputStream();
+			ObjectOutputStream out = new ObjectOutputStream(baos);
+			out.writeObject(obj);
+			out.close();
+			return new Buffer(baos.toByteArray());
+		}
+
+		@Override
+		public Object objectFromBuffer(byte[] buf, int offset, int length) throws Exception {
+			ObjectInputStream in = new ObjectInputStreamWithClassloader(new ByteArrayInputStream(buf, offset, length), this.classloader);
+			Object anObj = in.readObject();
+			in.close();
+			return anObj;
+		}
+	}
+	
+}


Property changes on: trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsObjectReplicator.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain
Added: svn:mergeinfo
   + /branches/7.4.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java:3535-3555
/branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java:3673-3711,3754-3769
/branches/7.7.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java:3816-3868,3876-3915,3920-3983
/branches/8.0.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java:4002-4046,4048-4051
/branches/8.1.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java:4319-4342
/trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java:3507-3666

Copied: trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsOutputStream.java (from rev 4449, trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java)
===================================================================
--- trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsOutputStream.java	                        (rev 0)
+++ trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsOutputStream.java	2012-09-17 18:31:05 UTC (rev 4454)
@@ -0,0 +1,104 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.replication.jgroups;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+import org.jgroups.Address;
+import org.jgroups.blocks.MethodCall;
+import org.jgroups.blocks.RequestOptions;
+import org.jgroups.blocks.ResponseMode;
+import org.jgroups.blocks.RpcDispatcher;
+import org.teiid.core.types.Streamable;
+
+public class JGroupsOutputStream extends OutputStream {
+	
+	static final int CHUNK_SIZE=Streamable.STREAMING_BATCH_SIZE_IN_BYTES;
+    
+	protected final RpcDispatcher disp;
+    protected final List<Address> dests;
+    protected final Serializable stateId;
+    protected final short methodOffset;
+	
+    private volatile boolean closed=false;
+    private final byte[] buffer=new byte[CHUNK_SIZE];
+    private int index=0;
+
+    public JGroupsOutputStream(RpcDispatcher disp, List<Address> dests, Serializable stateId, short methodOffset, boolean sendCreate) throws IOException {
+        this.disp=disp;
+        this.dests=dests;
+        this.stateId=stateId;
+        this.methodOffset = methodOffset;
+        if (sendCreate) {
+	        try {
+	        	disp.callRemoteMethods(this.dests, new MethodCall(methodOffset, new Object[] {stateId}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(dests != null));
+	        } catch(Exception e) {
+	        	throw new IOException(e);
+	        }
+        }
+    }
+
+    public void close() throws IOException {
+        if(closed) {
+            return;
+        }
+        flush();
+        try {
+        	disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 2), new Object[] {stateId}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(dests != null));
+        } catch(Exception e) {
+        }
+        closed=true;
+    }
+
+    public void flush() throws IOException {
+        checkClosed();
+        try {
+            if(index == 0) {
+                return;
+            }
+        	disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 1), new Object[] {stateId, Arrays.copyOf(buffer, index)}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(dests != null));
+            index=0;
+        } catch(Exception e) {
+        	throw new IOException(e);
+        }
+    }
+
+	private void checkClosed() throws IOException {
+		if(closed) {
+            throw new IOException("output stream is closed"); //$NON-NLS-1$
+		}
+	}
+
+    public void write(int b) throws IOException {
+        checkClosed();
+        if(index >= buffer.length) {
+            flush();
+        }
+        buffer[index++]=(byte)b;
+    }
+
+}
\ No newline at end of file


Property changes on: trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsOutputStream.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain
Added: svn:mergeinfo
   + /branches/7.4.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java:3535-3555
/branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java:3673-3711,3754-3769
/branches/7.7.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java:3816-3868,3876-3915,3920-3983
/branches/8.0.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java:4002-4046,4048-4051
/branches/8.1.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java:4319-4342
/trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java:3507-3666

Modified: trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedConfiguration.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedConfiguration.java	2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedConfiguration.java	2012-09-17 18:31:05 UTC (rev 4454)
@@ -24,11 +24,14 @@
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.Executors;
 
 import javax.resource.spi.work.WorkManager;
 import javax.transaction.TransactionManager;
 
 import org.infinispan.manager.DefaultCacheManager;
+import org.jgroups.Channel;
+import org.jgroups.JChannel;
 import org.teiid.cache.CacheFactory;
 import org.teiid.cache.infinispan.InfinispanCacheFactory;
 import org.teiid.core.TeiidRuntimeException;
@@ -36,6 +39,8 @@
 import org.teiid.dqp.internal.process.TeiidExecutor;
 import org.teiid.dqp.internal.process.ThreadReuseExecutor;
 import org.teiid.query.ObjectReplicator;
+import org.teiid.replication.jgroups.ChannelFactory;
+import org.teiid.replication.jgroups.JGroupsObjectReplicator;
 import org.teiid.security.SecurityHelper;
 
 public class EmbeddedConfiguration extends DQPConfiguration {
@@ -50,6 +55,7 @@
 	private CacheFactory cacheFactory;
 	private int maxResultSetCacheStaleness = 60;
 	private String infinispanConfigFile = "infinispan-config.xml"; //$NON-NLS-1$
+	private String jgroupsConfigFile = "tcp.xml"; //$NON-NLS-1$
 	
 	public SecurityHelper getSecurityHelper() {
 		return securityHelper;
@@ -76,12 +82,23 @@
 	public void setTransactionManager(TransactionManager transactionManager) {
 		this.transactionManager = transactionManager;
 	}
+	
 	public ObjectReplicator getObjectReplicator() {
+		if (this.objectReplicator == null) {
+			this.objectReplicator = new JGroupsObjectReplicator(new ChannelFactory() {
+				@Override
+				public Channel createChannel(String id) throws Exception {
+					return new JChannel(this.getClass().getClassLoader().getResource(getJgroupsConfigFile()));
+				}
+			}, Executors.newCachedThreadPool());			
+		}
 		return objectReplicator;
 	}
+	
 	public void setObjectReplicator(ObjectReplicator objectReplicator) {
 		this.objectReplicator = objectReplicator;
 	}
+	
 	/**
 	 * Sets the {@link WorkManager} to be used instead of a {@link ThreadReuseExecutor}.
 	 * This means that Teiid will not own the processing threads and will not necessarily be
@@ -151,5 +168,11 @@
 	}
 	public void setMaxResultSetCacheStaleness(int maxResultSetCacheStaleness) {
 		this.maxResultSetCacheStaleness = maxResultSetCacheStaleness;
+	}
+	public String getJgroupsConfigFile() {
+		return jgroupsConfigFile;
+	}
+	public void setJgroupsConfigFile(String jgroupsConfigFile) {
+		this.jgroupsConfigFile = jgroupsConfigFile;
 	}	
 }

Modified: trunk/runtime/src/main/java/org/teiid/runtime/RuntimePlugin.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/runtime/RuntimePlugin.java	2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/runtime/src/main/java/org/teiid/runtime/RuntimePlugin.java	2012-09-17 18:31:05 UTC (rev 4454)
@@ -112,5 +112,9 @@
     	TEIID40098,
     	TEIID40099,
     	TEIID40100,
+    	TEIID40101,
+    	TEIID40102,
+    	TEIID40103,
+    	TEIID40104
     }
 }

Modified: trunk/runtime/src/main/resources/org/teiid/runtime/i18n.properties
===================================================================
--- trunk/runtime/src/main/resources/org/teiid/runtime/i18n.properties	2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/runtime/src/main/resources/org/teiid/runtime/i18n.properties	2012-09-17 18:31:05 UTC (rev 4454)
@@ -100,4 +100,8 @@
 TEIID40096=Waited {0} for VDB {1}.{2} to be deployed, but it never was.  Please check to see if the deployment is missing or is in error.
 TEIID40097=Waited {0} for VDB {1}.{2} to be ACTIVE, but it never was.  Please check it's sources - {3}.
 TEIID40098=Reached end of results; use hasNext() call to check if there are more results before calling next()
-TEIID40099=Cache system has been shutdown
\ No newline at end of file
+TEIID40099=Cache system has been shutdown
+
+TEIID40101=error setting state {0}
+TEIID40102= {0} Failed to Pull {1}
+TEIID40103={0} timeout pulling {1}
\ No newline at end of file

Modified: trunk/test-integration/common/pom.xml
===================================================================
--- trunk/test-integration/common/pom.xml	2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/test-integration/common/pom.xml	2012-09-17 18:31:05 UTC (rev 4454)
@@ -39,18 +39,6 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
-        <!-- 
-        <dependency>
-           <groupId>javax.enterprise</groupId>
-           <artifactId>cdi-api</artifactId>
-           <scope>test</scope>
-        </dependency> 
-         -->  
-        <dependency>
-          <groupId>org.jboss.as</groupId>
-          <artifactId>jboss-as-clustering-jgroups</artifactId>
-          <scope>provided</scope>
-        </dependency>
 	</dependencies>
     
     <profiles>

Modified: trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java	2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java	2012-09-17 18:31:05 UTC (rev 4454)
@@ -22,7 +22,8 @@
 
 package org.teiid.systemmodel;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -30,9 +31,8 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.concurrent.Executors;
 
-import org.jboss.as.clustering.jgroups.ChannelFactory;
-import org.jboss.as.server.ServerEnvironment;
 import org.jgroups.Channel;
 import org.jgroups.JChannel;
 import org.junit.BeforeClass;
@@ -42,10 +42,11 @@
 import org.teiid.jdbc.FakeServer;
 import org.teiid.jdbc.FakeServer.DeployVDBParameter;
 import org.teiid.metadata.FunctionMethod;
-import org.teiid.metadata.FunctionParameter;
 import org.teiid.metadata.FunctionMethod.Determinism;
 import org.teiid.metadata.FunctionMethod.PushDown;
-import org.teiid.replication.jboss.JGroupsObjectReplicator;
+import org.teiid.metadata.FunctionParameter;
+import org.teiid.replication.jgroups.ChannelFactory;
+import org.teiid.replication.jgroups.JGroupsObjectReplicator;
 import org.teiid.runtime.EmbeddedConfiguration;
 
 @SuppressWarnings("nls")
@@ -129,21 +130,8 @@
 
 	private FakeServer createServer() throws Exception {
 		FakeServer server = new FakeServer(false);
-		
-		JGroupsObjectReplicator jor = new JGroupsObjectReplicator(new ChannelFactory() {
-					@Override
-					public Channel createChannel(String id) throws Exception {
-						return new JChannel(this.getClass().getClassLoader().getResource("tcp.xml"));
-					}
 
-					@Override
-					public ServerEnvironment getServerEnvironment() {
-						return null;
-					}
-				});
-
 		EmbeddedConfiguration config = new EmbeddedConfiguration();
-		config.setObjectReplicator(jor);
 		config.setInfinispanConfigFile(UnitTestUtil.getTestDataPath()+"/infinispan-replicated-config.xml");
 		
 		server.start(config, true);



More information about the teiid-commits mailing list