Author: shawkins
Date: 2011-11-22 21:22:18 -0500 (Tue, 22 Nov 2011)
New Revision: 3695
Modified:
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java
trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStore.java
trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java
Log:
TEIID-1720 Upgrading to the new JGroups. However, partial state is not corrected yet, and
the initial state transfer may have to be rewritten.
Modified:
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
===================================================================
---
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2011-11-23
02:18:58 UTC (rev 3694)
+++
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2011-11-23
02:22:18 UTC (rev 3695)
@@ -22,7 +22,11 @@
package org.teiid.replication.jboss;
+import java.io.Externalizable;
+import java.io.IOException;
import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
@@ -44,13 +48,9 @@
import org.jboss.as.clustering.jgroups.ChannelFactory;
import org.jgroups.Address;
import org.jgroups.Channel;
-import org.jgroups.MembershipListener;
import org.jgroups.Message;
-import org.jgroups.MessageListener;
-import org.jgroups.Receiver;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
-import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.MethodCall;
import org.jgroups.blocks.MethodLookup;
import org.jgroups.blocks.RequestOptions;
@@ -61,6 +61,7 @@
import org.jgroups.util.Util;
import org.teiid.Replicated;
import org.teiid.Replicated.ReplicationMode;
+import org.teiid.core.util.ReflectionHelper;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.query.ObjectReplicator;
@@ -68,6 +69,58 @@
public abstract class JGroupsObjectReplicator implements ObjectReplicator, Serializable
{
+ public static final class AddressWrapper implements Externalizable {
+ // Externalizable holder for a JGroups Address; equality and hashing delegate to the wrapped address.
+ private Address address; // stays null after the no-arg constructor until readExternal assigns it
+ // Public no-arg constructor, as java.io.Externalizable requires for deserialization.
+ public AddressWrapper() {
+
+ }
+ // Wraps the given address; no null check is performed here.
+ public AddressWrapper(Address address) {
+ this.address = address;
+ }
+ // Hash of the wrapped address; throws NullPointerException if address has not been set.
+ @Override
+ public int hashCode() {
+ return address.hashCode();
+ }
+ // Equal only to another AddressWrapper whose wrapped address equals this one's.
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof AddressWrapper)) {
+ return false;
+ }
+ return address.equals(((AddressWrapper)obj).address);
+ }
+ // Reads the class name written by writeExternal, creates that Address type, then lets it read its own state.
+ @Override
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ String className = in.readUTF(); // concrete Address class name, written first by writeExternal
+ try {
+ this.address = (Address) ReflectionHelper.create(className, null,
Thread.currentThread().getContextClassLoader());
+ this.address.readFrom(in); // NOTE(review): presumably the counterpart of Address.writeTo - confirm against the JGroups version in use
+ } catch (Exception e) {
+ throw new IOException(e); // any reflection or read failure is surfaced as an IOException with the cause kept
+ }
+ }
+ // Writes the concrete Address class name followed by the address's own serialized form.
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeUTF(address.getClass().getName()); // lets readExternal re-create the same Address implementation
+ try {
+ address.writeTo(out);
+ } catch (Exception e) {
+ throw new IOException(e); // wrap so the method only throws the declared IOException
+ }
+ }
+
+ }
+
private static final long serialVersionUID = -6851804958313095166L;
private static final String CREATE_STATE = "createState"; //$NON-NLS-1$
private static final String BUILD_STATE = "buildState"; //$NON-NLS-1$
@@ -97,9 +150,8 @@
}
}
- private final static class ReplicatedInvocationHandler<S> implements
- InvocationHandler, Serializable, MessageListener, Receiver,
- MembershipListener {
+ private final static class ReplicatedInvocationHandler<S> extends ReceiverAdapter
implements
+ InvocationHandler, Serializable {
private static final long serialVersionUID = -2943462899945966103L;
private final S object;
@@ -172,7 +224,7 @@
LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulling state",
stateId); //$NON-NLS-1$
long timeout = annotation.timeout();
threadLocalPromise.set(new Promise<Boolean>());
- boolean getState = this.disp.getChannel().getState(null, stateId, timeout);
+ /*boolean getState = this.disp.getChannel().getState(null, stateId, timeout);
if (getState) {
Boolean loaded = threadLocalPromise.get().getResult(timeout);
if (Boolean.TRUE.equals(loaded)) {
@@ -182,7 +234,7 @@
}
} else {
LogManager.logInfo(LogConstants.CTX_RUNTIME, object + " first member or
timeout exceeded " + stateId); //$NON-NLS-1$
- }
+ }*/
LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state",
stateId); //$NON-NLS-1$
return result;
}
@@ -193,7 +245,7 @@
dests = new Vector<Address>(remoteMembers);
}
}
- RspList responses = disp.callRemoteMethods(dests, call, new
RequestOptions().setMode(annotation.asynch()?ResponseMode.GET_NONE:ResponseMode.GET_ALL).setTimeout(annotation.timeout()));
+ RspList<Object> responses = disp.callRemoteMethods(dests, call, new
RequestOptions().setMode(annotation.asynch()?ResponseMode.GET_NONE:ResponseMode.GET_ALL).setTimeout(annotation.timeout()));
if (annotation.asynch()) {
return null;
}
@@ -224,7 +276,11 @@
synchronized (remoteMembers) {
remoteMembers.removeAll(newView.getMembers());
if (object instanceof ReplicatedObject && !remoteMembers.isEmpty()) {
- ((ReplicatedObject)object).droppedMembers(new
HashSet<Serializable>(remoteMembers));
+ HashSet<Serializable> dropped = new HashSet<Serializable>();
+ for (Address address : remoteMembers) {
+ dropped.add(new AddressWrapper(address));
+ }
+ ((ReplicatedObject)object).droppedMembers(dropped);
}
remoteMembers.clear();
remoteMembers.addAll(newView.getMembers());
@@ -353,10 +409,14 @@
if(log.isErrorEnabled()) log.error("message or message buffer is
null"); //$NON-NLS-1$
return null;
}
+
+ if (req.getSrc().equals(local_addr)) {
+ return null;
+ }
try {
body=req_marshaller != null?
- req_marshaller.objectFromByteBuffer(req.getBuffer(),
req.getOffset(), req.getLength())
+ req_marshaller.objectFromBuffer(req.getBuffer(), req.getOffset(),
req.getLength())
: req.getObject();
}
catch(Throwable e) {
@@ -381,7 +441,7 @@
throw new Exception("MethodCall uses ID=" +
method_call.getId() + ", but method_lookup has not been set"); //$NON-NLS-1$
//$NON-NLS-2$
if (method_call.getId() >= methodList.size() - 3) {
- Serializable address = req.getSrc();
+ Serializable address = new AddressWrapper(req.getSrc());
String stateId = (String)method_call.getArgs()[0];
List<?> key = Arrays.asList(stateId, address);
JGroupsInputStream is = inputStreams.get(key);
@@ -433,17 +493,15 @@
try {
channel.connect(mux_id);
if (object instanceof ReplicatedObject) {
- ((ReplicatedObject)object).setLocalAddress(channel.getAddress());
- boolean getState = channel.getState(null, startTimeout);
- if (getState) {
- Boolean loaded = proxy.state_promise.getResult(startTimeout);
- if (Boolean.TRUE.equals(loaded)) {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "loaded");
//$NON-NLS-1$
- } else {
- LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " load error or
timeout"); //$NON-NLS-1$
- }
+ ((ReplicatedObject)object).setAddress(new AddressWrapper(channel.getAddress()));
+ channel.getState(null, startTimeout);
+ Boolean loaded = proxy.state_promise.getResult(1);
+ if (loaded == null) {
+ LogManager.logInfo(LogConstants.CTX_RUNTIME, object + " timeout exceeded or
first member"); //$NON-NLS-1$
+ } else if (loaded) {
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "loaded");
//$NON-NLS-1$
} else {
- LogManager.logInfo(LogConstants.CTX_RUNTIME, object + " first member or timeout
exceeded"); //$NON-NLS-1$
+ LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " load error");
//$NON-NLS-1$
}
}
success = true;
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-11-23
02:18:58 UTC (rev 3694)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-11-23
02:22:18 UTC (rev 3695)
@@ -1051,7 +1051,7 @@
}
@Override
- public void setLocalAddress(Serializable address) {
+ public void setAddress(Serializable address) {
}
@Override
Modified: trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java 2011-11-23 02:18:58
UTC (rev 3694)
+++ trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java 2011-11-23 02:22:18
UTC (rev 3695)
@@ -67,7 +67,7 @@
* Allows the replicator to set the local address from the channel
* @param address
*/
- void setLocalAddress(Serializable address);
+ void setAddress(Serializable address);
/**
* Called when members are dropped
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStore.java 2011-11-23
02:18:58 UTC (rev 3694)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStore.java 2011-11-23
02:22:18 UTC (rev 3695)
@@ -49,7 +49,7 @@
TempTableStore getTempTableStore();
- Serializable getLocalAddress();
+ Serializable getAddress();
List<?> updateMatViewRow(String matTableName, List<?> tuple, boolean delete)
throws TeiidComponentException;
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java 2011-11-23
02:18:58 UTC (rev 3694)
+++
trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java 2011-11-23
02:22:18 UTC (rev 3695)
@@ -352,12 +352,12 @@
//begin replication methods
@Override
- public void setLocalAddress(Serializable address) {
+ public void setAddress(Serializable address) {
this.localAddress = address;
}
@Override
- public Serializable getLocalAddress() {
+ public Serializable getAddress() {
return localAddress;
}
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2011-11-23
02:18:58 UTC (rev 3694)
+++
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2011-11-23
02:22:18 UTC (rev 3695)
@@ -284,7 +284,7 @@
String matTableName = metadata.getFullName(matTableId);
LogManager.logDetail(LogConstants.CTX_MATVIEWS, "processing refreshmatview
for", matViewName); //$NON-NLS-1$
boolean invalidate =
Boolean.TRUE.equals(((Constant)proc.getParameter(2).getExpression()).getValue());
- boolean needsLoading = globalStore.needsLoading(matTableName,
globalStore.getLocalAddress(), true, true, invalidate);
+ boolean needsLoading = globalStore.needsLoading(matTableName,
globalStore.getAddress(), true, true, invalidate);
if (!needsLoading) {
return CollectionTupleSource.createUpdateCountTupleSource(-1);
}
@@ -367,9 +367,9 @@
final MatTableInfo info = globalStore.getMatTableInfo(tableName);
boolean load = false;
while (!info.isUpToDate()) {
- load = globalStore.needsLoading(tableName, globalStore.getLocalAddress(), true,
false, false);
+ load = globalStore.needsLoading(tableName, globalStore.getAddress(), true, false,
false);
if (load) {
- load = globalStore.needsLoading(tableName, globalStore.getLocalAddress(), false,
false, false);
+ load = globalStore.needsLoading(tableName, globalStore.getAddress(), false, false,
false);
if (load) {
break;
}
Modified:
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java
===================================================================
---
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java 2011-11-23
02:18:58 UTC (rev 3694)
+++
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java 2011-11-23
02:22:18 UTC (rev 3695)
@@ -22,8 +22,7 @@
package org.teiid.systemmodel;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -33,22 +32,20 @@
import java.util.HashMap;
import org.jboss.as.clustering.jgroups.ChannelFactory;
-import org.jboss.as.clustering.jgroups.JChannelFactory;
import org.jgroups.Channel;
+import org.jgroups.JChannel;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.teiid.core.types.DataTypeManager;
import org.teiid.core.util.UnitTestUtil;
import org.teiid.jdbc.FakeServer;
import org.teiid.metadata.FunctionMethod;
+import org.teiid.metadata.FunctionParameter;
import org.teiid.metadata.FunctionMethod.Determinism;
import org.teiid.metadata.FunctionMethod.PushDown;
-import org.teiid.metadata.FunctionParameter;
import org.teiid.replication.jboss.JGroupsObjectReplicator;
@SuppressWarnings("nls")
-@Ignore
public class TestMatViewReplication {
private static final String MATVIEWS = "matviews";
@@ -114,6 +111,7 @@
server2.stop();
}
+ @SuppressWarnings("serial")
private FakeServer createServer() throws Exception {
FakeServer server = new FakeServer();
@@ -123,9 +121,7 @@
return new ChannelFactory() {
@Override
public Channel createChannel(String id) throws Exception {
- JChannelFactory jcf = new JChannelFactory();
-
jcf.setMultiplexerConfig(this.getClass().getClassLoader().getResource("stacks.xml"));
//$NON-NLS-1$
- return jcf.createMultiplexerChannel("tcp", id);
+ return new
JChannel(this.getClass().getClassLoader().getResource("tcp.xml"));
}
};
}