Author: rareddy
Date: 2012-09-17 14:31:05 -0400 (Mon, 17 Sep 2012)
New Revision: 4454
Added:
trunk/runtime/src/main/java/org/teiid/replication/
trunk/runtime/src/main/java/org/teiid/replication/jgroups/
trunk/runtime/src/main/java/org/teiid/replication/jgroups/AddressWrapper.java
trunk/runtime/src/main/java/org/teiid/replication/jgroups/ChannelFactory.java
trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsInputStream.java
trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsObjectReplicator.java
trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsOutputStream.java
Removed:
trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/AddressWrapper.java
trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java
trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java
Modified:
trunk/jboss-integration/src/main/java/org/teiid/jboss/IntegrationPlugin.java
trunk/jboss-integration/src/main/java/org/teiid/jboss/JGroupsObjectReplicatorService.java
trunk/jboss-integration/src/main/java/org/teiid/jboss/TeiidAdd.java
trunk/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties
trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedConfiguration.java
trunk/runtime/src/main/java/org/teiid/runtime/RuntimePlugin.java
trunk/runtime/src/main/resources/org/teiid/runtime/i18n.properties
trunk/test-integration/common/pom.xml
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java
Log:
TEIID-2215: refactored and moved code that is part of jboss-integration into runtime
module to support the jgroups based object replication for clustered embedded Teiid.
Modified: trunk/jboss-integration/src/main/java/org/teiid/jboss/IntegrationPlugin.java
===================================================================
---
trunk/jboss-integration/src/main/java/org/teiid/jboss/IntegrationPlugin.java 2012-09-17
18:26:52 UTC (rev 4453)
+++
trunk/jboss-integration/src/main/java/org/teiid/jboss/IntegrationPlugin.java 2012-09-17
18:31:05 UTC (rev 4454)
@@ -56,9 +56,7 @@
TEIID50017, // vdb.xml parse exception
TEIID50018, // failed VDB dependency processing
TEIID50019, // redeploying VDB
- TEIID50020, // replication error failed to pull
TEIID50021, // vdb defined translator not found
- TEIID50022, // replication error timeout during the pull
TEIID50023, // replication failed
TEIID50024, // failed metadata load
TEIID50025, // VDB deployed
@@ -72,7 +70,6 @@
TEIID50039, // socket_disabled
TEIID50040, // odbc_disabled
TEIID50041, // embedded disabled
- TEIID50042, // error state
TEIID50043,
TEIID50044, // vdb save failed
TEIID50047,
@@ -82,7 +79,6 @@
TEIID50055,
TEIID50056,
TEIID50057,
- TEIID50067,
TEIID50069,
TEIID50070,
TEIID50071,
Modified:
trunk/jboss-integration/src/main/java/org/teiid/jboss/JGroupsObjectReplicatorService.java
===================================================================
---
trunk/jboss-integration/src/main/java/org/teiid/jboss/JGroupsObjectReplicatorService.java 2012-09-17
18:26:52 UTC (rev 4453)
+++
trunk/jboss-integration/src/main/java/org/teiid/jboss/JGroupsObjectReplicatorService.java 2012-09-17
18:31:05 UTC (rev 4454)
@@ -21,23 +21,32 @@
*/
package org.teiid.jboss;
+import java.util.concurrent.Executor;
+
import org.jboss.as.clustering.jgroups.ChannelFactory;
import org.jboss.msc.service.Service;
import org.jboss.msc.service.StartContext;
import org.jboss.msc.service.StartException;
import org.jboss.msc.service.StopContext;
import org.jboss.msc.value.InjectedValue;
-import org.teiid.replication.jboss.JGroupsObjectReplicator;
+import org.jgroups.Channel;
+import org.teiid.replication.jgroups.JGroupsObjectReplicator;
class JGroupsObjectReplicatorService implements Service<JGroupsObjectReplicator> {
public final InjectedValue<ChannelFactory> channelFactoryInjector = new
InjectedValue<ChannelFactory>();
+ final InjectedValue<Executor> executorInjector = new
InjectedValue<Executor>();
private JGroupsObjectReplicator replicator;
@Override
public void start(StartContext context) throws StartException {
- this.replicator = new JGroupsObjectReplicator(channelFactoryInjector.getValue());
+ this.replicator = new JGroupsObjectReplicator(new
org.teiid.replication.jgroups.ChannelFactory() {
+ @Override
+ public Channel createChannel(String id) throws Exception {
+ return channelFactoryInjector.getValue().createChannel(id);
+ }
+ }, executorInjector.getValue());
}
@Override
Modified: trunk/jboss-integration/src/main/java/org/teiid/jboss/TeiidAdd.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/jboss/TeiidAdd.java 2012-09-17
18:26:52 UTC (rev 4453)
+++ trunk/jboss-integration/src/main/java/org/teiid/jboss/TeiidAdd.java 2012-09-17
18:31:05 UTC (rev 4454)
@@ -83,7 +83,7 @@
import org.teiid.logging.LogManager;
import org.teiid.query.ObjectReplicator;
import org.teiid.query.function.SystemFunctionManager;
-import org.teiid.replication.jboss.JGroupsObjectReplicator;
+import org.teiid.replication.jgroups.JGroupsObjectReplicator;
import org.teiid.services.InternalEventDistributorFactory;
class TeiidAdd extends AbstractAddStepHandler implements DescriptionProvider {
@@ -247,6 +247,7 @@
JGroupsObjectReplicatorService replicatorService = new
JGroupsObjectReplicatorService();
ServiceBuilder<JGroupsObjectReplicator> serviceBuilder =
target.addService(TeiidServiceNames.OBJECT_REPLICATOR, replicatorService);
serviceBuilder.addDependency(ServiceName.JBOSS.append("jgroups",
"stack", stack), ChannelFactory.class,
replicatorService.channelFactoryInjector); //$NON-NLS-1$ //$NON-NLS-2$
+ serviceBuilder.addDependency(TeiidServiceNames.executorServiceName(asyncThreadPoolName),
Executor.class, replicatorService.executorInjector);
newControllers.add(serviceBuilder.install());
LogManager.logInfo(LogConstants.CTX_RUNTIME,
IntegrationPlugin.Util.gs(IntegrationPlugin.Event.TEIID50003));
} else {
Deleted:
trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/AddressWrapper.java
===================================================================
---
trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/AddressWrapper.java 2012-09-17
18:26:52 UTC (rev 4453)
+++
trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/AddressWrapper.java 2012-09-17
18:31:05 UTC (rev 4454)
@@ -1,86 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.replication.jboss;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-
-import org.jgroups.Address;
-import org.teiid.core.util.ReflectionHelper;
-
-/**
- * Allows JGroups {@link Address} objects to be serializable
- */
-public final class AddressWrapper implements Externalizable {
-
- Address address;
-
- public AddressWrapper() {
-
- }
-
- public AddressWrapper(Address address) {
- this.address = address;
- }
-
- @Override
- public int hashCode() {
- return address.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
- if (!(obj instanceof AddressWrapper)) {
- return false;
- }
- return address.equals(((AddressWrapper)obj).address);
- }
-
- @Override
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
- String className = in.readUTF();
- try {
- this.address = (Address) ReflectionHelper.create(className, null,
Thread.currentThread().getContextClassLoader());
- this.address.readFrom(in);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeUTF(address.getClass().getName());
- try {
- address.writeTo(out);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
-}
\ No newline at end of file
Deleted:
trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java
===================================================================
---
trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java 2012-09-17
18:26:52 UTC (rev 4453)
+++
trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java 2012-09-17
18:31:05 UTC (rev 4454)
@@ -1,117 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.replication.jboss;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-public class JGroupsInputStream extends InputStream {
-
- private long timeout = 15000;
- private volatile byte[] buf;
- private volatile int index=0;
- private ReentrantLock lock = new ReentrantLock();
- private Condition write = lock.newCondition();
- private Condition doneReading = lock.newCondition();
-
- public JGroupsInputStream(long timeout) {
- this.timeout = timeout;
- }
-
- @Override
- public int read() throws IOException {
- if (index < 0) {
- return -1;
- }
- if (buf == null) {
- lock.lock();
- try {
- long waitTime = TimeUnit.MILLISECONDS.toNanos(timeout);
- while (buf == null) {
- waitTime = write.awaitNanos(waitTime);
- if (waitTime <= 0) {
- throw new IOException(new TimeoutException());
- }
- }
- if (index < 0) {
- return -1;
- }
- } catch(InterruptedException e) {
- throw new IOException(e);
- } finally {
- lock.unlock();
- }
- }
- if (index == buf.length) {
- lock.lock();
- try {
- buf = null;
- index = 0;
- doneReading.signal();
- } finally {
- lock.unlock();
- }
- return read();
- }
- return buf[index++] & 0xff;
- }
-
- @Override
- public void close() {
- lock.lock();
- try {
- buf = null;
- index = -1;
- doneReading.signal();
- } finally {
- lock.unlock();
- }
- }
-
- public void receive(byte[] bytes) throws InterruptedException {
- lock.lock();
- try {
- if (index == -1) {
- return;
- }
- while (buf != null) {
- doneReading.await();
- }
- if (index == -1) {
- return;
- }
- buf = bytes;
- if (bytes == null) {
- index = -1;
- }
- write.signal();
- } finally {
- lock.unlock();
- }
- }
-
-}
\ No newline at end of file
Deleted:
trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
===================================================================
---
trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2012-09-17
18:26:52 UTC (rev 4453)
+++
trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2012-09-17
18:31:05 UTC (rev 4454)
@@ -1,615 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.replication.jboss;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
-import org.jboss.as.clustering.jgroups.ChannelFactory;
-import org.jgroups.Address;
-import org.jgroups.Channel;
-import org.jgroups.MembershipListener;
-import org.jgroups.Message;
-import org.jgroups.MessageListener;
-import org.jgroups.ReceiverAdapter;
-import org.jgroups.View;
-import org.jgroups.blocks.MethodCall;
-import org.jgroups.blocks.MethodLookup;
-import org.jgroups.blocks.RequestOptions;
-import org.jgroups.blocks.ResponseMode;
-import org.jgroups.blocks.RpcDispatcher;
-import org.jgroups.util.Buffer;
-import org.jgroups.util.Promise;
-import org.jgroups.util.Rsp;
-import org.jgroups.util.RspList;
-import org.teiid.Replicated;
-import org.teiid.Replicated.ReplicationMode;
-import org.teiid.core.TeiidRuntimeException;
-import org.teiid.core.util.ObjectInputStreamWithClassloader;
-import org.teiid.jboss.IntegrationPlugin;
-import org.teiid.logging.LogConstants;
-import org.teiid.logging.LogManager;
-import org.teiid.query.ObjectReplicator;
-import org.teiid.query.ReplicatedObject;
-
-@SuppressWarnings("unchecked")
-public class JGroupsObjectReplicator implements ObjectReplicator, Serializable {
-
- private static final int IO_TIMEOUT = 15000;
-
- private final class ReplicatorRpcDispatcher<S> extends RpcDispatcher {
- private final S object;
- private boolean initialized;
- private final HashMap<Method, Short> methodMap;
- private final ArrayList<Method> methodList;
- Map<List<?>, JGroupsInputStream> inputStreams = new
ConcurrentHashMap<List<?>, JGroupsInputStream>();
-
- private ReplicatorRpcDispatcher(Channel channel, MessageListener l,
- MembershipListener l2, Object serverObj, S object,
- HashMap<Method, Short> methodMap, ArrayList<Method> methodList) {
- super(channel, l, l2, serverObj);
- this.object = object;
- this.methodMap = methodMap;
- this.methodList = methodList;
- setMarshaller(new ContextAwareMarshaller(getClass().getClassLoader()));
- }
-
- @Override
- public Object handle(Message req) {
- Object body=null;
-
- if(req == null || req.getLength() == 0) {
- if(log.isErrorEnabled()) log.error("message or message buffer is
null"); //$NON-NLS-1$
- return null;
- }
-
- try {
- body=req_marshaller != null?
- req_marshaller.objectFromBuffer(req.getBuffer(), req.getOffset(),
req.getLength())
- : req.getObject();
- }
- catch(Throwable e) {
- if(log.isErrorEnabled()) log.error("exception marshalling object",
e); //$NON-NLS-1$
- return e;
- }
-
- if(!(body instanceof MethodCall)) {
- if(log.isErrorEnabled()) log.error("message does not contain a MethodCall
object"); //$NON-NLS-1$
-
- // create an exception to represent this and return it
- return new IllegalArgumentException("message does not contain a
MethodCall object") ; //$NON-NLS-1$
- }
-
- final MethodCall method_call=(MethodCall)body;
-
- try {
- if(log.isTraceEnabled())
- log.trace("[sender=" + req.getSrc() + "], method_call:
" + method_call); //$NON-NLS-1$ //$NON-NLS-2$
-
- if (method_call.getId() >= methodList.size() - 5 &&
req.getSrc().equals(local_addr)) {
- return null;
- }
-
- if (method_call.getId() >= methodList.size() - 3) {
- Serializable address = new AddressWrapper(req.getSrc());
- Serializable stateId = (Serializable)method_call.getArgs()[0];
- List<?> key = Arrays.asList(stateId, address);
- JGroupsInputStream is = inputStreams.get(key);
- if (method_call.getId() == methodList.size() - 3) {
- LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "create
state", stateId); //$NON-NLS-1$
- if (is != null) {
- is.receive(null);
- }
- is = new JGroupsInputStream(IO_TIMEOUT);
- this.inputStreams.put(key, is);
- executor.execute(new StreamingRunner(object, stateId, is, null));
- } else if (method_call.getId() == methodList.size() - 2) {
- LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "building
state", stateId); //$NON-NLS-1$
- if (is != null) {
- is.receive((byte[])method_call.getArgs()[1]);
- }
- } else if (method_call.getId() == methodList.size() - 1) {
- LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "finished
state", stateId); //$NON-NLS-1$
- if (is != null) {
- is.receive(null);
- }
- this.inputStreams.remove(key);
- }
- return null;
- } else if (method_call.getId() == methodList.size() - 5) {
- //hasState
- ReplicatedObject ro = (ReplicatedObject)object;
- Serializable stateId = (Serializable)method_call.getArgs()[0];
-
- if (stateId == null) {
- synchronized (this) {
- if (initialized) {
- return Boolean.TRUE;
- }
- return null;
- }
- }
-
- if (ro.hasState(stateId)) {
- return Boolean.TRUE;
- }
- return null;
- } else if (method_call.getId() == methodList.size() - 4) {
- //sendState
- ReplicatedObject ro = (ReplicatedObject)object;
- String stateId = (String)method_call.getArgs()[0];
- AddressWrapper dest = (AddressWrapper)method_call.getArgs()[1];
-
- JGroupsOutputStream oStream = new JGroupsOutputStream(this,
Arrays.asList(dest.address), stateId, (short)(methodMap.size() - 3), false);
- try {
- if (stateId == null) {
- ro.getState(oStream);
- } else {
- ro.getState(stateId, oStream);
- }
- } finally {
- oStream.close();
- }
- LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state",
stateId); //$NON-NLS-1$
- return null;
- }
-
- Method m=method_lookup.findMethod(method_call.getId());
- if(m == null)
- throw new Exception("no method found for " +
method_call.getId()); //$NON-NLS-1$
- method_call.setMethod(m);
-
- return method_call.invoke(server_obj);
- }
- catch(Throwable x) {
- return x;
- }
- }
- }
-
- private static final long serialVersionUID = -6851804958313095166L;
- private static final String HAS_STATE = "hasState"; //$NON-NLS-1$
- private static final String SEND_STATE = "sendState"; //$NON-NLS-1$
- private static final String CREATE_STATE = "createState"; //$NON-NLS-1$
- private static final String BUILD_STATE = "buildState"; //$NON-NLS-1$
- private static final String FINISH_STATE = "finishState"; //$NON-NLS-1$
-
- private final static class StreamingRunner implements Runnable {
- private final Object object;
- private final Serializable stateId;
- private final JGroupsInputStream is;
- private Promise<Boolean> promise;
-
- private StreamingRunner(Object object, Serializable stateId, JGroupsInputStream is,
Promise<Boolean> promise) {
- this.object = object;
- this.stateId = stateId;
- this.is = is;
- this.promise = promise;
- }
-
- @Override
- public void run() {
- try {
- if (stateId == null) {
- ((ReplicatedObject<?>)object).setState(is);
- } else {
- ((ReplicatedObject)object).setState(stateId, is);
- }
- if (promise != null) {
- promise.setResult(Boolean.TRUE);
- }
- LogManager.logDetail(LogConstants.CTX_RUNTIME, "state set", stateId);
//$NON-NLS-1$
- } catch (Exception e) {
- if (promise != null) {
- promise.setResult(Boolean.FALSE);
- }
- LogManager.logError(LogConstants.CTX_RUNTIME, e,
IntegrationPlugin.Util.gs(IntegrationPlugin.Event.TEIID50042,stateId));
- } finally {
- is.close();
- }
- }
- }
-
- private final class ReplicatedInvocationHandler<S> extends ReceiverAdapter
implements
- InvocationHandler, Serializable {
-
- private static final int PULL_RETRIES = 3;
- private static final long serialVersionUID = -2943462899945966103L;
- private final S object;
- private transient ReplicatorRpcDispatcher<S> disp;
- private final HashMap<Method, Short> methodMap;
- protected List<Address> remoteMembers = Collections.synchronizedList(new
ArrayList<Address>());
- private Map<Serializable, Promise<Boolean>> loadingStates = new
HashMap<Serializable, Promise<Boolean>>();
-
- private ReplicatedInvocationHandler(S object,HashMap<Method, Short> methodMap) {
- this.object = object;
- this.methodMap = methodMap;
- }
-
- List<Address> getRemoteMembersCopy() {
- synchronized (remoteMembers) {
- return new ArrayList<Address>(remoteMembers);
- }
- }
-
- public void setDisp(ReplicatorRpcDispatcher<S> disp) {
- this.disp = disp;
- }
-
- @Override
- public Object invoke(Object proxy, Method method, Object[] args)
- throws Throwable {
- Short methodNum = methodMap.get(method);
- if (methodNum == null || remoteMembers.isEmpty()) {
- if (methodNum != null) {
- Replicated annotation = method.getAnnotation(Replicated.class);
- if (annotation != null && annotation.remoteOnly()) {
- return null;
- }
- }
- try {
- return method.invoke(object, args);
- } catch (InvocationTargetException e) {
- throw e.getCause();
- }
- }
- try {
- Replicated annotation = method.getAnnotation(Replicated.class);
- if (annotation.replicateState() != ReplicationMode.NONE) {
- return handleReplicateState(method, args, annotation);
- }
- MethodCall call=new MethodCall(methodNum, args);
- List<Address> dests = null;
- if (annotation.remoteOnly()) {
- dests = getRemoteMembersCopy();
- if (dests.isEmpty()) {
- return null;
- }
- }
- RspList<Object> responses = disp.callRemoteMethods(dests, call, new
RequestOptions().setMode(annotation.asynch()?ResponseMode.GET_NONE:ResponseMode.GET_ALL).setTimeout(annotation.timeout()).setAnycasting(dests
!= null));
- if (annotation.asynch()) {
- return null;
- }
- List<Object> results = responses.getResults();
- if (method.getReturnType() == boolean.class) {
- for (Object o : results) {
- if (!Boolean.TRUE.equals(o)) {
- return false;
- }
- }
- return true;
- } else if (method.getReturnType() == Collection.class) {
- ArrayList<Object> result = new ArrayList<Object>();
- for (Object o : results) {
- result.addAll((Collection)o);
- }
- return results;
- }
- return null;
- } catch(Exception e) {
- throw new RuntimeException(method + " " + args + " failed",
e); //$NON-NLS-1$ //$NON-NLS-2$
- }
- }
-
- protected Address whereIsState(Serializable stateId, long timeout) throws Exception {
- if (remoteMembers.isEmpty()) {
- return null;
- }
- RspList<Boolean> resp = this.disp.callRemoteMethods(getRemoteMembersCopy(), new
MethodCall((short)(methodMap.size() - 5), new Object[]{stateId}), new
RequestOptions(ResponseMode.GET_ALL, timeout));
- Collection<Rsp<Boolean>> values = resp.values();
- Rsp<Boolean> rsp = null;
- for (Rsp<Boolean> response : values) {
- if (Boolean.TRUE.equals(response.getValue())) {
- rsp = response;
- break;
- }
- }
- if (rsp == null) {
- return null;
- }
- return rsp.getSender();
- }
-
- private Object handleReplicateState(Method method, Object[] args,
- Replicated annotation) throws IllegalAccessException,
- Throwable, IOException, IllegalStateException, Exception {
- Object result = null;
- try {
- result = method.invoke(object, args);
- } catch (InvocationTargetException e) {
- throw e.getCause();
- }
- ReplicatedObject ro = (ReplicatedObject)object;
- Serializable stateId = (Serializable)args[0];
- if (annotation.replicateState() == ReplicationMode.PUSH) {
- if (!remoteMembers.isEmpty()) {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "replicating
state", stateId); //$NON-NLS-1$
- JGroupsOutputStream oStream = new JGroupsOutputStream(disp, null, stateId,
(short)(methodMap.size() - 3), true);
- try {
- ro.getState(stateId, oStream);
- } finally {
- oStream.close();
- }
- LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state",
stateId); //$NON-NLS-1$
- }
- return result;
- }
- if (result != null) {
- return result;
- }
- long timeout = annotation.timeout();
- return pullState(method, args, stateId, timeout);
- }
-
- /**
- * Pull the remote state. The method and args are optional
- * to determine if the state has been made available.
- */
- Object pullState(Method method, Object[] args, Serializable stateId,
- long timeout) throws Throwable {
- Object result = null;
- for (int i = 0; i < PULL_RETRIES; i++) {
- Promise<Boolean> p = null;
- boolean wait = true;
- synchronized (loadingStates) {
- p = loadingStates.get(stateId);
- if (p == null) {
- wait = false;
- if (method != null) {
- try {
- result = method.invoke(object, args);
- } catch (InvocationTargetException e) {
- throw e.getCause();
- }
- if (result != null) {
- return result;
- }
- }
- p = new Promise<Boolean>();
- loadingStates.put(stateId, p);
- }
- }
- if (wait) {
- p.getResult(timeout);
- continue;
- }
- try {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulling state",
stateId); //$NON-NLS-1$
- Address addr = whereIsState(stateId, timeout);
- if (addr == null) {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "timeout exceeded or
first member"); //$NON-NLS-1$
- break;
- }
- JGroupsInputStream is = new JGroupsInputStream(IO_TIMEOUT);
- StreamingRunner runner = new StreamingRunner(object, stateId, is, p);
- List<?> key = Arrays.asList(stateId, new AddressWrapper(addr));
- disp.inputStreams.put(key, is);
- executor.execute(runner);
-
- this.disp.callRemoteMethod(addr, new MethodCall((short)(methodMap.size() - 4),
stateId, new AddressWrapper(this.disp.getChannel().getAddress())), new
RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(true));
-
- Boolean fetched = p.getResult(timeout);
-
- if (fetched != null) {
- if (fetched) {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulled state",
stateId); //$NON-NLS-1$
- if (method !=null) {
- try {
- result = method.invoke(object, args);
- } catch (InvocationTargetException e) {
- throw e.getCause();
- }
- if (result != null) {
- return result;
- }
- }
- break;
- }
- LogManager.logWarning(LogConstants.CTX_RUNTIME,
IntegrationPlugin.Util.gs(IntegrationPlugin.Event.TEIID50020, object, stateId));
- } else {
- LogManager.logWarning(LogConstants.CTX_RUNTIME,
IntegrationPlugin.Util.gs(IntegrationPlugin.Event.TEIID50022, object, stateId));
- }
- } finally {
- synchronized (loadingStates) {
- loadingStates.remove(stateId);
- }
- }
- }
- return null; //could not fetch the remote state
- }
-
- @Override
- public void viewAccepted(View newView) {
- if (newView.getMembers() != null) {
- synchronized (remoteMembers) {
- remoteMembers.removeAll(newView.getMembers());
- if (object instanceof ReplicatedObject<?> && !remoteMembers.isEmpty())
{
- HashSet<Serializable> dropped = new HashSet<Serializable>();
- for (Address address : remoteMembers) {
- dropped.add(new AddressWrapper(address));
- }
- ((ReplicatedObject<?>)object).droppedMembers(dropped);
- }
- remoteMembers.clear();
- remoteMembers.addAll(newView.getMembers());
- remoteMembers.remove(this.disp.getChannel().getAddress());
- }
- }
- }
- }
-
- private interface Streaming {
- void sendState(Serializable id, AddressWrapper dest);
- void createState(Serializable id);
- void buildState(Serializable id, byte[] bytes);
- void finishState(Serializable id);
- }
-
- //TODO: this should be configurable, or use a common executor
- private transient Executor executor = Executors.newCachedThreadPool();
- private transient ChannelFactory channelFactory;
-
- public JGroupsObjectReplicator(ChannelFactory channelFactory) {
- this.channelFactory = channelFactory;
- }
-
- public void stop(Object object) {
- if (object == null || !Proxy.isProxyClass(object.getClass())) {
- return;
- }
- ReplicatedInvocationHandler<?> handler = (ReplicatedInvocationHandler<?>)
Proxy.getInvocationHandler(object);
- Channel c = handler.disp.getChannel();
- handler.disp.stop();
- c.close();
- }
-
- @Override
- public <T, S> T replicate(String mux_id,
- Class<T> iface, final S object, long startTimeout) throws Exception {
- Channel channel = channelFactory.createChannel(mux_id);
-
- // To keep the order of methods same at all the nodes.
- TreeMap<String, Method> methods = new TreeMap<String, Method>();
- for (Method method : iface.getMethods()) {
- if (method.getAnnotation(Replicated.class) == null) {
- continue;
- }
- methods.put(method.toGenericString(), method);
- }
-
- final HashMap<Method, Short> methodMap = new HashMap<Method, Short>();
- final ArrayList<Method> methodList = new ArrayList<Method>();
-
- for (String method : methods.keySet()) {
- methodList.add(methods.get(method));
- methodMap.put(methods.get(method), (short)(methodList.size() - 1));
- }
-
- Method hasState = ReplicatedObject.class.getMethod(HAS_STATE, new Class<?>[]
{Serializable.class});
- methodList.add(hasState);
- methodMap.put(hasState, (short)(methodList.size() - 1));
-
- Method sendState = JGroupsObjectReplicator.Streaming.class.getMethod(SEND_STATE, new
Class<?>[] {Serializable.class, AddressWrapper.class});
- methodList.add(sendState);
- methodMap.put(sendState, (short)(methodList.size() - 1));
-
- //add in streaming methods
- Method createState = JGroupsObjectReplicator.Streaming.class.getMethod(CREATE_STATE,
new Class<?>[] {Serializable.class});
- methodList.add(createState);
- methodMap.put(createState, (short)(methodList.size() - 1));
- Method buildState = JGroupsObjectReplicator.Streaming.class.getMethod(BUILD_STATE, new
Class<?>[] {Serializable.class, byte[].class});
- methodList.add(buildState);
- methodMap.put(buildState, (short)(methodList.size() - 1));
- Method finishState = JGroupsObjectReplicator.Streaming.class.getMethod(FINISH_STATE,
new Class<?>[] {Serializable.class});
- methodList.add(finishState);
- methodMap.put(finishState, (short)(methodList.size() - 1));
-
- ReplicatedInvocationHandler<S> proxy = new
ReplicatedInvocationHandler<S>(object, methodMap);
- /*
- * TODO: could have an object implement streaming
- * Override the normal handle method to support streaming
- */
- ReplicatorRpcDispatcher disp = new ReplicatorRpcDispatcher<S>(channel,
proxy, proxy, object, object, methodMap, methodList);
-
- proxy.setDisp(disp);
- disp.setMethodLookup(new MethodLookup() {
- public Method findMethod(short id) {
- return methodList.get(id);
- }
- });
-
- T replicatedProxy = (T)
Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]
{iface}, proxy);
- boolean success = false;
- try {
- channel.connect(mux_id);
- if (object instanceof ReplicatedObject) {
- ((ReplicatedObject)object).setAddress(new AddressWrapper(channel.getAddress()));
- proxy.pullState(null, null, null, startTimeout);
- }
- success = true;
- return replicatedProxy;
- } catch (Throwable e) {
- if (e instanceof Exception) {
- throw (Exception)e;
- }
- throw new TeiidRuntimeException(IntegrationPlugin.Event.TEIID50067, e);
- } finally {
- if (!success) {
- channel.close();
- } else {
- synchronized (disp) {
- //mark as initialized so that state can be pulled if needed
- disp.initialized = true;
- }
- }
- }
- }
-
- // This class is used so that the objects are loaded with the current classes class
loader
- // rather than foreign class loader
- static class ContextAwareMarshaller implements RpcDispatcher.Marshaller {
- private ClassLoader classloader;
-
- public ContextAwareMarshaller(ClassLoader classloader) {
- this.classloader = classloader;
- }
-
- @Override
- public Buffer objectToBuffer(Object obj) throws Exception {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream out = new ObjectOutputStream(baos);
- out.writeObject(obj);
- out.close();
- return new Buffer(baos.toByteArray());
- }
-
- @Override
- public Object objectFromBuffer(byte[] buf, int offset, int length) throws Exception {
- ObjectInputStream in = new ObjectInputStreamWithClassloader(new
ByteArrayInputStream(buf, offset, length), this.classloader);
- Object anObj = in.readObject();
- in.close();
- return anObj;
- }
- }
-
-}
Deleted:
trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java
===================================================================
---
trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java 2012-09-17
18:26:52 UTC (rev 4453)
+++
trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java 2012-09-17
18:31:05 UTC (rev 4454)
@@ -1,104 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.replication.jboss;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
-import org.jgroups.Address;
-import org.jgroups.blocks.MethodCall;
-import org.jgroups.blocks.RequestOptions;
-import org.jgroups.blocks.ResponseMode;
-import org.jgroups.blocks.RpcDispatcher;
-import org.teiid.core.types.Streamable;
-
-public class JGroupsOutputStream extends OutputStream {
-
- static final int CHUNK_SIZE=Streamable.STREAMING_BATCH_SIZE_IN_BYTES;
-
- protected final RpcDispatcher disp;
- protected final List<Address> dests;
- protected final Serializable stateId;
- protected final short methodOffset;
-
- private volatile boolean closed=false;
- private final byte[] buffer=new byte[CHUNK_SIZE];
- private int index=0;
-
- public JGroupsOutputStream(RpcDispatcher disp, List<Address> dests,
Serializable stateId, short methodOffset, boolean sendCreate) throws IOException {
- this.disp=disp;
- this.dests=dests;
- this.stateId=stateId;
- this.methodOffset = methodOffset;
- if (sendCreate) {
- try {
- disp.callRemoteMethods(this.dests, new MethodCall(methodOffset, new Object[]
{stateId}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(dests != null));
- } catch(Exception e) {
- throw new IOException(e);
- }
- }
- }
-
- public void close() throws IOException {
- if(closed) {
- return;
- }
- flush();
- try {
- disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 2), new
Object[] {stateId}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(dests !=
null));
- } catch(Exception e) {
- }
- closed=true;
- }
-
- public void flush() throws IOException {
- checkClosed();
- try {
- if(index == 0) {
- return;
- }
- disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 1), new
Object[] {stateId, Arrays.copyOf(buffer, index)}), new
RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(dests != null));
- index=0;
- } catch(Exception e) {
- throw new IOException(e);
- }
- }
-
- private void checkClosed() throws IOException {
- if(closed) {
- throw new IOException("output stream is closed"); //$NON-NLS-1$
- }
- }
-
- public void write(int b) throws IOException {
- checkClosed();
- if(index >= buffer.length) {
- flush();
- }
- buffer[index++]=(byte)b;
- }
-
-}
\ No newline at end of file
Modified: trunk/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties
===================================================================
--- trunk/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties 2012-09-17
18:26:52 UTC (rev 4453)
+++ trunk/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties 2012-09-17
18:31:05 UTC (rev 4454)
@@ -55,7 +55,7 @@
TEIID50029=VDB {0}.{1} model "{2}" metadata is currently being loaded. Start
Time: {3}
TEIID50030=VDB {0}.{1} model "{2}" metadata loaded. End Time: {3}
TEIID50036=VDB {0}.{1} model "{2}" metadata failed to load. Reason:{3}
-TEIID50042=error setting state {0}
+
TEIID50043=Invalid metadata file found at {0}; delete this file and restart server.
TEIID50069=Failed to load module {0}
TEIID50089=Failed to find any services of type {0} from module {1}
@@ -188,8 +188,6 @@
TEIID50005=Clearing cache {0} for vdb {1}.{2}
TEIID50021=VDB {0}.{1} deployed in inactive state due to unavailability of data sources
{2}
TEIID50016=Invalid VDB file deployment failed {0}
-TEIID50020= {0} Failed to Pull {1}
-TEIID50022={0} timeout pulling {1}
TEIID50078=Translator not found {0}
no_vdb_found=VDB {0}.{1} not found or VDB is not in ACTIVE status
no_model_found= VDB {0}.{1} does not have model with name {2}.
Copied: trunk/runtime/src/main/java/org/teiid/replication/jgroups/AddressWrapper.java
(from rev 4449,
trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/AddressWrapper.java)
===================================================================
--- trunk/runtime/src/main/java/org/teiid/replication/jgroups/AddressWrapper.java
(rev 0)
+++
trunk/runtime/src/main/java/org/teiid/replication/jgroups/AddressWrapper.java 2012-09-17
18:31:05 UTC (rev 4454)
@@ -0,0 +1,86 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.replication.jgroups;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+import org.jgroups.Address;
+import org.teiid.core.util.ReflectionHelper;
+
+/**
+ * Allows JGroups {@link Address} objects to be serializable
+ */
+public final class AddressWrapper implements Externalizable {
+
+ Address address;
+
+ public AddressWrapper() {
+
+ }
+
+ public AddressWrapper(Address address) {
+ this.address = address;
+ }
+
+ @Override
+ public int hashCode() {
+ return address.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof AddressWrapper)) {
+ return false;
+ }
+ return address.equals(((AddressWrapper)obj).address);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ String className = in.readUTF();
+ try {
+ this.address = (Address) ReflectionHelper.create(className, null,
Thread.currentThread().getContextClassLoader());
+ this.address.readFrom(in);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeUTF(address.getClass().getName());
+ try {
+ address.writeTo(out);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+}
\ No newline at end of file
Property changes on:
trunk/runtime/src/main/java/org/teiid/replication/jgroups/AddressWrapper.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: svn:mergeinfo
+
/branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/AddressWrapper.java:3673-3711,3754-3769
/branches/7.7.x/jboss-integration/src/main/java/org/teiid/replication/jboss/AddressWrapper.java:3816-3868,3876-3915,3920-3983
/branches/8.0.x/jboss-integration/src/main/java/org/teiid/replication/jboss/AddressWrapper.java:4002-4046,4048-4051
/branches/8.1.x/jboss-integration/src/main/java/org/teiid/replication/jboss/AddressWrapper.java:4319-4342
/trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/AddressWrapper.java:3507-3666
Added: trunk/runtime/src/main/java/org/teiid/replication/jgroups/ChannelFactory.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/replication/jgroups/ChannelFactory.java
(rev 0)
+++
trunk/runtime/src/main/java/org/teiid/replication/jgroups/ChannelFactory.java 2012-09-17
18:31:05 UTC (rev 4454)
@@ -0,0 +1,33 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+package org.teiid.replication.jgroups;
+
+import org.jgroups.Channel;
+
+public interface ChannelFactory {
+ /**
+ * Creates a JGroups channel
+ * @return a JGroups channel
+ * @throws Exception if there was a failure setting up the protocol stack
+ */
+ Channel createChannel(String id) throws Exception;
+}
Property changes on:
trunk/runtime/src/main/java/org/teiid/replication/jgroups/ChannelFactory.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Copied: trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsInputStream.java
(from rev 4449,
trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java)
===================================================================
--- trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsInputStream.java
(rev 0)
+++
trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsInputStream.java 2012-09-17
18:31:05 UTC (rev 4454)
@@ -0,0 +1,117 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.replication.jgroups;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class JGroupsInputStream extends InputStream {
+
+ private long timeout = 15000;
+ private volatile byte[] buf;
+ private volatile int index=0;
+ private ReentrantLock lock = new ReentrantLock();
+ private Condition write = lock.newCondition();
+ private Condition doneReading = lock.newCondition();
+
+ public JGroupsInputStream(long timeout) {
+ this.timeout = timeout;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (index < 0) {
+ return -1;
+ }
+ if (buf == null) {
+ lock.lock();
+ try {
+ long waitTime = TimeUnit.MILLISECONDS.toNanos(timeout);
+ while (buf == null) {
+ waitTime = write.awaitNanos(waitTime);
+ if (waitTime <= 0) {
+ throw new IOException(new TimeoutException());
+ }
+ }
+ if (index < 0) {
+ return -1;
+ }
+ } catch(InterruptedException e) {
+ throw new IOException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+ if (index == buf.length) {
+ lock.lock();
+ try {
+ buf = null;
+ index = 0;
+ doneReading.signal();
+ } finally {
+ lock.unlock();
+ }
+ return read();
+ }
+ return buf[index++] & 0xff;
+ }
+
+ @Override
+ public void close() {
+ lock.lock();
+ try {
+ buf = null;
+ index = -1;
+ doneReading.signal();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void receive(byte[] bytes) throws InterruptedException {
+ lock.lock();
+ try {
+ if (index == -1) {
+ return;
+ }
+ while (buf != null) {
+ doneReading.await();
+ }
+ if (index == -1) {
+ return;
+ }
+ buf = bytes;
+ if (bytes == null) {
+ index = -1;
+ }
+ write.signal();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+}
\ No newline at end of file
Property changes on:
trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsInputStream.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: svn:mergeinfo
+
/branches/7.4.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java:3535-3555
/branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java:3673-3711,3754-3769
/branches/7.7.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java:3816-3868,3876-3915,3920-3983
/branches/8.0.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java:4002-4046,4048-4051
/branches/8.1.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java:4319-4342
/trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java:3507-3666
Copied:
trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsObjectReplicator.java
(from rev 4449,
trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java)
===================================================================
---
trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsObjectReplicator.java
(rev 0)
+++
trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsObjectReplicator.java 2012-09-17
18:31:05 UTC (rev 4454)
@@ -0,0 +1,614 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.replication.jgroups;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.MembershipListener;
+import org.jgroups.Message;
+import org.jgroups.MessageListener;
+import org.jgroups.ReceiverAdapter;
+import org.jgroups.View;
+import org.jgroups.blocks.MethodCall;
+import org.jgroups.blocks.MethodLookup;
+import org.jgroups.blocks.RequestOptions;
+import org.jgroups.blocks.ResponseMode;
+import org.jgroups.blocks.RpcDispatcher;
+import org.jgroups.util.Buffer;
+import org.jgroups.util.Promise;
+import org.jgroups.util.Rsp;
+import org.jgroups.util.RspList;
+import org.teiid.Replicated;
+import org.teiid.Replicated.ReplicationMode;
+import org.teiid.core.TeiidRuntimeException;
+import org.teiid.core.util.ObjectInputStreamWithClassloader;
+import org.teiid.logging.LogConstants;
+import org.teiid.logging.LogManager;
+import org.teiid.query.ObjectReplicator;
+import org.teiid.query.ReplicatedObject;
+import org.teiid.runtime.RuntimePlugin;
+
+@SuppressWarnings("unchecked")
+public class JGroupsObjectReplicator implements ObjectReplicator, Serializable {
+
+ private static final int IO_TIMEOUT = 15000;
+
+ private final class ReplicatorRpcDispatcher<S> extends RpcDispatcher {
+ private final S object;
+ private boolean initialized;
+ private final HashMap<Method, Short> methodMap;
+ private final ArrayList<Method> methodList;
+ Map<List<?>, JGroupsInputStream> inputStreams = new
ConcurrentHashMap<List<?>, JGroupsInputStream>();
+
+ private ReplicatorRpcDispatcher(Channel channel, MessageListener l,
+ MembershipListener l2, Object serverObj, S object,
+ HashMap<Method, Short> methodMap, ArrayList<Method> methodList) {
+ super(channel, l, l2, serverObj);
+ this.object = object;
+ this.methodMap = methodMap;
+ this.methodList = methodList;
+ setMarshaller(new ContextAwareMarshaller(getClass().getClassLoader()));
+ }
+
+ @Override
+ public Object handle(Message req) {
+ Object body=null;
+
+ if(req == null || req.getLength() == 0) {
+ if(log.isErrorEnabled()) log.error("message or message buffer is
null"); //$NON-NLS-1$
+ return null;
+ }
+
+ try {
+ body=req_marshaller != null?
+ req_marshaller.objectFromBuffer(req.getBuffer(), req.getOffset(),
req.getLength())
+ : req.getObject();
+ }
+ catch(Throwable e) {
+ if(log.isErrorEnabled()) log.error("exception marshalling object",
e); //$NON-NLS-1$
+ return e;
+ }
+
+ if(!(body instanceof MethodCall)) {
+ if(log.isErrorEnabled()) log.error("message does not contain a MethodCall
object"); //$NON-NLS-1$
+
+ // create an exception to represent this and return it
+ return new IllegalArgumentException("message does not contain a
MethodCall object") ; //$NON-NLS-1$
+ }
+
+ final MethodCall method_call=(MethodCall)body;
+
+ try {
+ if(log.isTraceEnabled())
+ log.trace("[sender=" + req.getSrc() + "], method_call:
" + method_call); //$NON-NLS-1$ //$NON-NLS-2$
+
+ if (method_call.getId() >= methodList.size() - 5 &&
req.getSrc().equals(local_addr)) {
+ return null;
+ }
+
+ if (method_call.getId() >= methodList.size() - 3) {
+ Serializable address = new AddressWrapper(req.getSrc());
+ Serializable stateId = (Serializable)method_call.getArgs()[0];
+ List<?> key = Arrays.asList(stateId, address);
+ JGroupsInputStream is = inputStreams.get(key);
+ if (method_call.getId() == methodList.size() - 3) {
+ LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "create
state", stateId); //$NON-NLS-1$
+ if (is != null) {
+ is.receive(null);
+ }
+ is = new JGroupsInputStream(IO_TIMEOUT);
+ this.inputStreams.put(key, is);
+ executor.execute(new StreamingRunner(object, stateId, is, null));
+ } else if (method_call.getId() == methodList.size() - 2) {
+ LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "building
state", stateId); //$NON-NLS-1$
+ if (is != null) {
+ is.receive((byte[])method_call.getArgs()[1]);
+ }
+ } else if (method_call.getId() == methodList.size() - 1) {
+ LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "finished
state", stateId); //$NON-NLS-1$
+ if (is != null) {
+ is.receive(null);
+ }
+ this.inputStreams.remove(key);
+ }
+ return null;
+ } else if (method_call.getId() == methodList.size() - 5) {
+ //hasState
+ ReplicatedObject ro = (ReplicatedObject)object;
+ Serializable stateId = (Serializable)method_call.getArgs()[0];
+
+ if (stateId == null) {
+ synchronized (this) {
+ if (initialized) {
+ return Boolean.TRUE;
+ }
+ return null;
+ }
+ }
+
+ if (ro.hasState(stateId)) {
+ return Boolean.TRUE;
+ }
+ return null;
+ } else if (method_call.getId() == methodList.size() - 4) {
+ //sendState
+ ReplicatedObject ro = (ReplicatedObject)object;
+ String stateId = (String)method_call.getArgs()[0];
+ AddressWrapper dest = (AddressWrapper)method_call.getArgs()[1];
+
+ JGroupsOutputStream oStream = new JGroupsOutputStream(this,
Arrays.asList(dest.address), stateId, (short)(methodMap.size() - 3), false);
+ try {
+ if (stateId == null) {
+ ro.getState(oStream);
+ } else {
+ ro.getState(stateId, oStream);
+ }
+ } finally {
+ oStream.close();
+ }
+ LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state",
stateId); //$NON-NLS-1$
+ return null;
+ }
+
+ Method m=method_lookup.findMethod(method_call.getId());
+ if(m == null)
+ throw new Exception("no method found for " +
method_call.getId()); //$NON-NLS-1$
+ method_call.setMethod(m);
+
+ return method_call.invoke(server_obj);
+ }
+ catch(Throwable x) {
+ return x;
+ }
+ }
+ }
+
+ private static final long serialVersionUID = -6851804958313095166L;
+ private static final String HAS_STATE = "hasState"; //$NON-NLS-1$
+ private static final String SEND_STATE = "sendState"; //$NON-NLS-1$
+ private static final String CREATE_STATE = "createState"; //$NON-NLS-1$
+ private static final String BUILD_STATE = "buildState"; //$NON-NLS-1$
+ private static final String FINISH_STATE = "finishState"; //$NON-NLS-1$
+
+ private final static class StreamingRunner implements Runnable {
+ private final Object object;
+ private final Serializable stateId;
+ private final JGroupsInputStream is;
+ private Promise<Boolean> promise;
+
+ private StreamingRunner(Object object, Serializable stateId, JGroupsInputStream is,
Promise<Boolean> promise) {
+ this.object = object;
+ this.stateId = stateId;
+ this.is = is;
+ this.promise = promise;
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (stateId == null) {
+ ((ReplicatedObject<?>)object).setState(is);
+ } else {
+ ((ReplicatedObject)object).setState(stateId, is);
+ }
+ if (promise != null) {
+ promise.setResult(Boolean.TRUE);
+ }
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, "state set", stateId);
//$NON-NLS-1$
+ } catch (Exception e) {
+ if (promise != null) {
+ promise.setResult(Boolean.FALSE);
+ }
+ LogManager.logError(LogConstants.CTX_RUNTIME, e,
RuntimePlugin.Util.gs(RuntimePlugin.Event.TEIID40101, stateId));
+ } finally {
+ is.close();
+ }
+ }
+ }
+
+ private final class ReplicatedInvocationHandler<S> extends ReceiverAdapter
implements
+ InvocationHandler, Serializable {
+
+ private static final int PULL_RETRIES = 3;
+ private static final long serialVersionUID = -2943462899945966103L;
+ private final S object;
+ private transient ReplicatorRpcDispatcher<S> disp;
+ private final HashMap<Method, Short> methodMap;
+ protected List<Address> remoteMembers = Collections.synchronizedList(new
ArrayList<Address>());
+ private Map<Serializable, Promise<Boolean>> loadingStates = new
HashMap<Serializable, Promise<Boolean>>();
+
+ private ReplicatedInvocationHandler(S object,HashMap<Method, Short> methodMap) {
+ this.object = object;
+ this.methodMap = methodMap;
+ }
+
+ List<Address> getRemoteMembersCopy() {
+ synchronized (remoteMembers) {
+ return new ArrayList<Address>(remoteMembers);
+ }
+ }
+
+ public void setDisp(ReplicatorRpcDispatcher<S> disp) {
+ this.disp = disp;
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args)
+ throws Throwable {
+ Short methodNum = methodMap.get(method);
+ if (methodNum == null || remoteMembers.isEmpty()) {
+ if (methodNum != null) {
+ Replicated annotation = method.getAnnotation(Replicated.class);
+ if (annotation != null && annotation.remoteOnly()) {
+ return null;
+ }
+ }
+ try {
+ return method.invoke(object, args);
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ }
+ try {
+ Replicated annotation = method.getAnnotation(Replicated.class);
+ if (annotation.replicateState() != ReplicationMode.NONE) {
+ return handleReplicateState(method, args, annotation);
+ }
+ MethodCall call=new MethodCall(methodNum, args);
+ List<Address> dests = null;
+ if (annotation.remoteOnly()) {
+ dests = getRemoteMembersCopy();
+ if (dests.isEmpty()) {
+ return null;
+ }
+ }
+ RspList<Object> responses = disp.callRemoteMethods(dests, call, new
RequestOptions().setMode(annotation.asynch()?ResponseMode.GET_NONE:ResponseMode.GET_ALL).setTimeout(annotation.timeout()).setAnycasting(dests
!= null));
+ if (annotation.asynch()) {
+ return null;
+ }
+ List<Object> results = responses.getResults();
+ if (method.getReturnType() == boolean.class) {
+ for (Object o : results) {
+ if (!Boolean.TRUE.equals(o)) {
+ return false;
+ }
+ }
+ return true;
+ } else if (method.getReturnType() == Collection.class) {
+ ArrayList<Object> result = new ArrayList<Object>();
+ for (Object o : results) {
+ result.addAll((Collection)o);
+ }
+ return results;
+ }
+ return null;
+ } catch(Exception e) {
+ throw new RuntimeException(method + " " + args + " failed",
e); //$NON-NLS-1$ //$NON-NLS-2$
+ }
+ }
+
+ protected Address whereIsState(Serializable stateId, long timeout) throws Exception {
+ if (remoteMembers.isEmpty()) {
+ return null;
+ }
+ RspList<Boolean> resp = this.disp.callRemoteMethods(getRemoteMembersCopy(), new
MethodCall((short)(methodMap.size() - 5), new Object[]{stateId}), new
RequestOptions(ResponseMode.GET_ALL, timeout));
+ Collection<Rsp<Boolean>> values = resp.values();
+ Rsp<Boolean> rsp = null;
+ for (Rsp<Boolean> response : values) {
+ if (Boolean.TRUE.equals(response.getValue())) {
+ rsp = response;
+ break;
+ }
+ }
+ if (rsp == null) {
+ return null;
+ }
+ return rsp.getSender();
+ }
+
+ private Object handleReplicateState(Method method, Object[] args,
+ Replicated annotation) throws IllegalAccessException,
+ Throwable, IOException, IllegalStateException, Exception {
+ Object result = null;
+ try {
+ result = method.invoke(object, args);
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ ReplicatedObject ro = (ReplicatedObject)object;
+ Serializable stateId = (Serializable)args[0];
+ if (annotation.replicateState() == ReplicationMode.PUSH) {
+ if (!remoteMembers.isEmpty()) {
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "replicating
state", stateId); //$NON-NLS-1$
+ JGroupsOutputStream oStream = new JGroupsOutputStream(disp, null, stateId,
(short)(methodMap.size() - 3), true);
+ try {
+ ro.getState(stateId, oStream);
+ } finally {
+ oStream.close();
+ }
+ LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state",
stateId); //$NON-NLS-1$
+ }
+ return result;
+ }
+ if (result != null) {
+ return result;
+ }
+ long timeout = annotation.timeout();
+ return pullState(method, args, stateId, timeout);
+ }
+
+ /**
+ * Pull the remote state. The method and args are optional
+ * to determine if the state has been made available.
+ */
+ Object pullState(Method method, Object[] args, Serializable stateId,
+ long timeout) throws Throwable {
+ Object result = null;
+ for (int i = 0; i < PULL_RETRIES; i++) {
+ Promise<Boolean> p = null;
+ boolean wait = true;
+ synchronized (loadingStates) {
+ p = loadingStates.get(stateId);
+ if (p == null) {
+ wait = false;
+ if (method != null) {
+ try {
+ result = method.invoke(object, args);
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ if (result != null) {
+ return result;
+ }
+ }
+ p = new Promise<Boolean>();
+ loadingStates.put(stateId, p);
+ }
+ }
+ if (wait) {
+ p.getResult(timeout);
+ continue;
+ }
+ try {
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulling state",
stateId); //$NON-NLS-1$
+ Address addr = whereIsState(stateId, timeout);
+ if (addr == null) {
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "timeout exceeded or
first member"); //$NON-NLS-1$
+ break;
+ }
+ JGroupsInputStream is = new JGroupsInputStream(IO_TIMEOUT);
+ StreamingRunner runner = new StreamingRunner(object, stateId, is, p);
+ List<?> key = Arrays.asList(stateId, new AddressWrapper(addr));
+ disp.inputStreams.put(key, is);
+ executor.execute(runner);
+
+ this.disp.callRemoteMethod(addr, new MethodCall((short)(methodMap.size() - 4),
stateId, new AddressWrapper(this.disp.getChannel().getAddress())), new
RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(true));
+
+ Boolean fetched = p.getResult(timeout);
+
+ if (fetched != null) {
+ if (fetched) {
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulled state",
stateId); //$NON-NLS-1$
+ if (method !=null) {
+ try {
+ result = method.invoke(object, args);
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ if (result != null) {
+ return result;
+ }
+ }
+ break;
+ }
+ LogManager.logWarning(LogConstants.CTX_RUNTIME,
RuntimePlugin.Util.gs(RuntimePlugin.Event.TEIID40102, object, stateId));
+ } else {
+ LogManager.logWarning(LogConstants.CTX_RUNTIME,
RuntimePlugin.Util.gs(RuntimePlugin.Event.TEIID40103, object, stateId));
+ }
+ } finally {
+ synchronized (loadingStates) {
+ loadingStates.remove(stateId);
+ }
+ }
+ }
+ return null; //could not fetch the remote state
+ }
+
+ @Override
+ public void viewAccepted(View newView) {
+ if (newView.getMembers() != null) {
+ synchronized (remoteMembers) {
+ remoteMembers.removeAll(newView.getMembers());
+ if (object instanceof ReplicatedObject<?> && !remoteMembers.isEmpty())
{
+ HashSet<Serializable> dropped = new HashSet<Serializable>();
+ for (Address address : remoteMembers) {
+ dropped.add(new AddressWrapper(address));
+ }
+ ((ReplicatedObject<?>)object).droppedMembers(dropped);
+ }
+ remoteMembers.clear();
+ remoteMembers.addAll(newView.getMembers());
+ remoteMembers.remove(this.disp.getChannel().getAddress());
+ }
+ }
+ }
+ }
+
+ private interface Streaming {
+ void sendState(Serializable id, AddressWrapper dest);
+ void createState(Serializable id);
+ void buildState(Serializable id, byte[] bytes);
+ void finishState(Serializable id);
+ }
+
+ //TODO: this should be configurable, or use a common executor
+ private transient Executor executor;
+ private transient ChannelFactory channelFactory;
+
+ public JGroupsObjectReplicator(ChannelFactory channelFactory, Executor executor) {
+ this.channelFactory = channelFactory;
+ this.executor = executor;
+ }
+
+ public void stop(Object object) {
+ if (object == null || !Proxy.isProxyClass(object.getClass())) {
+ return;
+ }
+ ReplicatedInvocationHandler<?> handler = (ReplicatedInvocationHandler<?>)
Proxy.getInvocationHandler(object);
+ Channel c = handler.disp.getChannel();
+ handler.disp.stop();
+ c.close();
+ }
+
+ @Override
+ public <T, S> T replicate(String mux_id,
+ Class<T> iface, final S object, long startTimeout) throws Exception {
+ Channel channel = channelFactory.createChannel(mux_id);
+
+ // To keep the order of methods same at all the nodes.
+ TreeMap<String, Method> methods = new TreeMap<String, Method>();
+ for (Method method : iface.getMethods()) {
+ if (method.getAnnotation(Replicated.class) == null) {
+ continue;
+ }
+ methods.put(method.toGenericString(), method);
+ }
+
+ final HashMap<Method, Short> methodMap = new HashMap<Method, Short>();
+ final ArrayList<Method> methodList = new ArrayList<Method>();
+
+ for (String method : methods.keySet()) {
+ methodList.add(methods.get(method));
+ methodMap.put(methods.get(method), (short)(methodList.size() - 1));
+ }
+
+ Method hasState = ReplicatedObject.class.getMethod(HAS_STATE, new Class<?>[]
{Serializable.class});
+ methodList.add(hasState);
+ methodMap.put(hasState, (short)(methodList.size() - 1));
+
+ Method sendState = JGroupsObjectReplicator.Streaming.class.getMethod(SEND_STATE, new
Class<?>[] {Serializable.class, AddressWrapper.class});
+ methodList.add(sendState);
+ methodMap.put(sendState, (short)(methodList.size() - 1));
+
+ //add in streaming methods
+ Method createState = JGroupsObjectReplicator.Streaming.class.getMethod(CREATE_STATE,
new Class<?>[] {Serializable.class});
+ methodList.add(createState);
+ methodMap.put(createState, (short)(methodList.size() - 1));
+ Method buildState = JGroupsObjectReplicator.Streaming.class.getMethod(BUILD_STATE, new
Class<?>[] {Serializable.class, byte[].class});
+ methodList.add(buildState);
+ methodMap.put(buildState, (short)(methodList.size() - 1));
+ Method finishState = JGroupsObjectReplicator.Streaming.class.getMethod(FINISH_STATE,
new Class<?>[] {Serializable.class});
+ methodList.add(finishState);
+ methodMap.put(finishState, (short)(methodList.size() - 1));
+
+ ReplicatedInvocationHandler<S> proxy = new
ReplicatedInvocationHandler<S>(object, methodMap);
+ /*
+ * TODO: could have an object implement streaming
+ * Override the normal handle method to support streaming
+ */
+ ReplicatorRpcDispatcher disp = new ReplicatorRpcDispatcher<S>(channel,
proxy, proxy, object, object, methodMap, methodList);
+
+ proxy.setDisp(disp);
+ disp.setMethodLookup(new MethodLookup() {
+ public Method findMethod(short id) {
+ return methodList.get(id);
+ }
+ });
+
+ T replicatedProxy = (T)
Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]
{iface}, proxy);
+ boolean success = false;
+ try {
+ channel.connect(mux_id);
+ if (object instanceof ReplicatedObject) {
+ ((ReplicatedObject)object).setAddress(new AddressWrapper(channel.getAddress()));
+ proxy.pullState(null, null, null, startTimeout);
+ }
+ success = true;
+ return replicatedProxy;
+ } catch (Throwable e) {
+ if (e instanceof Exception) {
+ throw (Exception)e;
+ }
+ throw new TeiidRuntimeException(RuntimePlugin.Event.TEIID40104, e);
+ } finally {
+ if (!success) {
+ channel.close();
+ } else {
+ synchronized (disp) {
+ //mark as initialized so that state can be pulled if needed
+ disp.initialized = true;
+ }
+ }
+ }
+ }
+
+ // This class is used so that the objects are loaded with the current classes class
loader
+ // rather than foreign class loader
+ static class ContextAwareMarshaller implements RpcDispatcher.Marshaller {
+ private ClassLoader classloader;
+
+ public ContextAwareMarshaller(ClassLoader classloader) {
+ this.classloader = classloader;
+ }
+
+ @Override
+ public Buffer objectToBuffer(Object obj) throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(baos);
+ out.writeObject(obj);
+ out.close();
+ return new Buffer(baos.toByteArray());
+ }
+
+ @Override
+ public Object objectFromBuffer(byte[] buf, int offset, int length) throws Exception {
+ ObjectInputStream in = new ObjectInputStreamWithClassloader(new
ByteArrayInputStream(buf, offset, length), this.classloader);
+ Object anObj = in.readObject();
+ in.close();
+ return anObj;
+ }
+ }
+
+}
Property changes on:
trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsObjectReplicator.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: svn:mergeinfo
+
/branches/7.4.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java:3535-3555
/branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java:3673-3711,3754-3769
/branches/7.7.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java:3816-3868,3876-3915,3920-3983
/branches/8.0.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java:4002-4046,4048-4051
/branches/8.1.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java:4319-4342
/trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java:3507-3666
Copied: trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsOutputStream.java
(from rev 4449,
trunk/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java)
===================================================================
--- trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsOutputStream.java
(rev 0)
+++
trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsOutputStream.java 2012-09-17
18:31:05 UTC (rev 4454)
@@ -0,0 +1,104 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.replication.jgroups;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+import org.jgroups.Address;
+import org.jgroups.blocks.MethodCall;
+import org.jgroups.blocks.RequestOptions;
+import org.jgroups.blocks.ResponseMode;
+import org.jgroups.blocks.RpcDispatcher;
+import org.teiid.core.types.Streamable;
+
+public class JGroupsOutputStream extends OutputStream {
+
+ static final int CHUNK_SIZE=Streamable.STREAMING_BATCH_SIZE_IN_BYTES;
+
+ protected final RpcDispatcher disp;
+ protected final List<Address> dests;
+ protected final Serializable stateId;
+ protected final short methodOffset;
+
+ private volatile boolean closed=false;
+ private final byte[] buffer=new byte[CHUNK_SIZE];
+ private int index=0;
+
+ public JGroupsOutputStream(RpcDispatcher disp, List<Address> dests,
Serializable stateId, short methodOffset, boolean sendCreate) throws IOException {
+ this.disp=disp;
+ this.dests=dests;
+ this.stateId=stateId;
+ this.methodOffset = methodOffset;
+ if (sendCreate) {
+ try {
+ disp.callRemoteMethods(this.dests, new MethodCall(methodOffset, new Object[]
{stateId}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(dests != null));
+ } catch(Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ public void close() throws IOException {
+ if(closed) {
+ return;
+ }
+ flush();
+ try {
+ disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 2), new
Object[] {stateId}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(dests !=
null));
+ } catch(Exception e) {
+ }
+ closed=true;
+ }
+
+ public void flush() throws IOException {
+ checkClosed();
+ try {
+ if(index == 0) {
+ return;
+ }
+ disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 1), new
Object[] {stateId, Arrays.copyOf(buffer, index)}), new
RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(dests != null));
+ index=0;
+ } catch(Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ private void checkClosed() throws IOException {
+ if(closed) {
+ throw new IOException("output stream is closed"); //$NON-NLS-1$
+ }
+ }
+
+ public void write(int b) throws IOException {
+ checkClosed();
+ if(index >= buffer.length) {
+ flush();
+ }
+ buffer[index++]=(byte)b;
+ }
+
+}
\ No newline at end of file
Property changes on:
trunk/runtime/src/main/java/org/teiid/replication/jgroups/JGroupsOutputStream.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: svn:mergeinfo
+
/branches/7.4.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java:3535-3555
/branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java:3673-3711,3754-3769
/branches/7.7.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java:3816-3868,3876-3915,3920-3983
/branches/8.0.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java:4002-4046,4048-4051
/branches/8.1.x/jboss-integration/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java:4319-4342
/trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java:3507-3666
Modified: trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedConfiguration.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedConfiguration.java 2012-09-17
18:26:52 UTC (rev 4453)
+++ trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedConfiguration.java 2012-09-17
18:31:05 UTC (rev 4454)
@@ -24,11 +24,14 @@
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.Executors;
import javax.resource.spi.work.WorkManager;
import javax.transaction.TransactionManager;
import org.infinispan.manager.DefaultCacheManager;
+import org.jgroups.Channel;
+import org.jgroups.JChannel;
import org.teiid.cache.CacheFactory;
import org.teiid.cache.infinispan.InfinispanCacheFactory;
import org.teiid.core.TeiidRuntimeException;
@@ -36,6 +39,8 @@
import org.teiid.dqp.internal.process.TeiidExecutor;
import org.teiid.dqp.internal.process.ThreadReuseExecutor;
import org.teiid.query.ObjectReplicator;
+import org.teiid.replication.jgroups.ChannelFactory;
+import org.teiid.replication.jgroups.JGroupsObjectReplicator;
import org.teiid.security.SecurityHelper;
public class EmbeddedConfiguration extends DQPConfiguration {
@@ -50,6 +55,7 @@
private CacheFactory cacheFactory;
private int maxResultSetCacheStaleness = 60;
private String infinispanConfigFile = "infinispan-config.xml"; //$NON-NLS-1$
+ private String jgroupsConfigFile = "tcp.xml"; //$NON-NLS-1$
public SecurityHelper getSecurityHelper() {
return securityHelper;
@@ -76,12 +82,23 @@
public void setTransactionManager(TransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
+
public ObjectReplicator getObjectReplicator() {
+ if (this.objectReplicator == null) {
+ this.objectReplicator = new JGroupsObjectReplicator(new ChannelFactory() {
+ @Override
+ public Channel createChannel(String id) throws Exception {
+ return new
JChannel(this.getClass().getClassLoader().getResource(getJgroupsConfigFile()));
+ }
+ }, Executors.newCachedThreadPool());
+ }
return objectReplicator;
}
+
public void setObjectReplicator(ObjectReplicator objectReplicator) {
this.objectReplicator = objectReplicator;
}
+
/**
* Sets the {@link WorkManager} to be used instead of a {@link ThreadReuseExecutor}.
* This means that Teiid will not own the processing threads and will not necessarily
be
@@ -151,5 +168,11 @@
}
public void setMaxResultSetCacheStaleness(int maxResultSetCacheStaleness) {
this.maxResultSetCacheStaleness = maxResultSetCacheStaleness;
+ }
+ public String getJgroupsConfigFile() {
+ return jgroupsConfigFile;
+ }
+ public void setJgroupsConfigFile(String jgroupsConfigFile) {
+ this.jgroupsConfigFile = jgroupsConfigFile;
}
}
Modified: trunk/runtime/src/main/java/org/teiid/runtime/RuntimePlugin.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/runtime/RuntimePlugin.java 2012-09-17 18:26:52
UTC (rev 4453)
+++ trunk/runtime/src/main/java/org/teiid/runtime/RuntimePlugin.java 2012-09-17 18:31:05
UTC (rev 4454)
@@ -112,5 +112,9 @@
TEIID40098,
TEIID40099,
TEIID40100,
+ TEIID40101,
+ TEIID40102,
+ TEIID40103,
+ TEIID40104
}
}
Modified: trunk/runtime/src/main/resources/org/teiid/runtime/i18n.properties
===================================================================
--- trunk/runtime/src/main/resources/org/teiid/runtime/i18n.properties 2012-09-17 18:26:52
UTC (rev 4453)
+++ trunk/runtime/src/main/resources/org/teiid/runtime/i18n.properties 2012-09-17 18:31:05
UTC (rev 4454)
@@ -100,4 +100,8 @@
TEIID40096=Waited {0} for VDB {1}.{2} to be deployed, but it never was. Please check to
see if the deployment is missing or is in error.
TEIID40097=Waited {0} for VDB {1}.{2} to be ACTIVE, but it never was. Please check
it's sources - {3}.
TEIID40098=Reached end of results; use hasNext() call to check if there are more results
before calling next()
-TEIID40099=Cache system has been shutdown
\ No newline at end of file
+TEIID40099=Cache system has been shutdown
+
+TEIID40101=error setting state {0}
+TEIID40102= {0} Failed to Pull {1}
+TEIID40103={0} timeout pulling {1}
\ No newline at end of file
Modified: trunk/test-integration/common/pom.xml
===================================================================
--- trunk/test-integration/common/pom.xml 2012-09-17 18:26:52 UTC (rev 4453)
+++ trunk/test-integration/common/pom.xml 2012-09-17 18:31:05 UTC (rev 4454)
@@ -39,18 +39,6 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
- <!--
- <dependency>
- <groupId>javax.enterprise</groupId>
- <artifactId>cdi-api</artifactId>
- <scope>test</scope>
- </dependency>
- -->
- <dependency>
- <groupId>org.jboss.as</groupId>
- <artifactId>jboss-as-clustering-jgroups</artifactId>
- <scope>provided</scope>
- </dependency>
</dependencies>
<profiles>
Modified:
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java
===================================================================
---
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java 2012-09-17
18:26:52 UTC (rev 4453)
+++
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java 2012-09-17
18:31:05 UTC (rev 4454)
@@ -22,7 +22,8 @@
package org.teiid.systemmodel;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -30,9 +31,8 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.concurrent.Executors;
-import org.jboss.as.clustering.jgroups.ChannelFactory;
-import org.jboss.as.server.ServerEnvironment;
import org.jgroups.Channel;
import org.jgroups.JChannel;
import org.junit.BeforeClass;
@@ -42,10 +42,11 @@
import org.teiid.jdbc.FakeServer;
import org.teiid.jdbc.FakeServer.DeployVDBParameter;
import org.teiid.metadata.FunctionMethod;
-import org.teiid.metadata.FunctionParameter;
import org.teiid.metadata.FunctionMethod.Determinism;
import org.teiid.metadata.FunctionMethod.PushDown;
-import org.teiid.replication.jboss.JGroupsObjectReplicator;
+import org.teiid.metadata.FunctionParameter;
+import org.teiid.replication.jgroups.ChannelFactory;
+import org.teiid.replication.jgroups.JGroupsObjectReplicator;
import org.teiid.runtime.EmbeddedConfiguration;
@SuppressWarnings("nls")
@@ -129,21 +130,8 @@
private FakeServer createServer() throws Exception {
FakeServer server = new FakeServer(false);
-
- JGroupsObjectReplicator jor = new JGroupsObjectReplicator(new ChannelFactory() {
- @Override
- public Channel createChannel(String id) throws Exception {
- return new
JChannel(this.getClass().getClassLoader().getResource("tcp.xml"));
- }
- @Override
- public ServerEnvironment getServerEnvironment() {
- return null;
- }
- });
-
EmbeddedConfiguration config = new EmbeddedConfiguration();
- config.setObjectReplicator(jor);
config.setInfinispanConfigFile(UnitTestUtil.getTestDataPath()+"/infinispan-replicated-config.xml");
server.start(config, true);