teiid SVN: r3703 - in trunk: documentation/admin-guide/src/main/docbook/en-US/content and 2 other directories.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2011-11-28 15:04:04 -0500 (Mon, 28 Nov 2011)
New Revision: 3703
Modified:
trunk/common-core/src/main/java/org/teiid/core/util/TimestampWithTimezone.java
trunk/documentation/admin-guide/src/main/docbook/en-US/content/appendix-c.xml
trunk/engine/src/main/java/org/teiid/query/function/FunctionMethods.java
trunk/engine/src/test/java/org/teiid/query/function/TestFunctionMethods.java
Log:
TEIID-1841 adding system property to drive iso 8601 behavior
Modified: trunk/common-core/src/main/java/org/teiid/core/util/TimestampWithTimezone.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/util/TimestampWithTimezone.java 2011-11-28 18:50:08 UTC (rev 3702)
+++ trunk/common-core/src/main/java/org/teiid/core/util/TimestampWithTimezone.java 2011-11-28 20:04:04 UTC (rev 3703)
@@ -46,9 +46,12 @@
*/
public class TimestampWithTimezone {
+ public static final String ISO8601_WEEK_PROP = "org.teiid.iso8601Week"; //$NON-NLS-1$
+ public static boolean ISO8601_WEEK = PropertiesUtils.getBooleanProperty(System.getProperties(), ISO8601_WEEK_PROP, false);
+
private static ThreadLocal<Calendar> CALENDAR = new ThreadLocal<Calendar>() {
protected Calendar initialValue() {
- return Calendar.getInstance();
+ return initialCalendar();
}
};
@@ -58,8 +61,17 @@
public static void resetCalendar(TimeZone tz) {
TimeZone.setDefault(tz);
- CALENDAR.set(Calendar.getInstance());
+ CALENDAR.set(initialCalendar());
}
+
+ static Calendar initialCalendar() {
+ Calendar result = Calendar.getInstance();
+ if (ISO8601_WEEK) {
+ result.setMinimalDaysInFirstWeek(4);
+ result.setFirstDayOfWeek(Calendar.MONDAY);
+ }
+ return result;
+ }
public static Object create(java.util.Date date, TimeZone initial, Calendar target, Class<?> type) {
if (type.equals(DataTypeManager.DefaultDataClasses.TIME)) {
Modified: trunk/documentation/admin-guide/src/main/docbook/en-US/content/appendix-c.xml
===================================================================
--- trunk/documentation/admin-guide/src/main/docbook/en-US/content/appendix-c.xml 2011-11-28 18:50:08 UTC (rev 3702)
+++ trunk/documentation/admin-guide/src/main/docbook/en-US/content/appendix-c.xml 2011-11-28 20:04:04 UTC (rev 3703)
@@ -35,5 +35,10 @@
Target size in bytes of the ODBC results buffer. This is not a hard maximum, lobs and wide rows may use larger buffers.
</para>
</listitem>
+ <listitem>
+ <para><emphasis>org.teiid.iso8601Week</emphasis> - defaults to false.
+ Set to true to use ISO 8601 rules for week calculations regardless of the locale. When true the dayOfWeek function will begin with 1 for MONDAY rather than SUNDAY, and the week function will require that week 1 of a year contains the year's first Thursday.
+ </para>
+ </listitem>
</itemizedlist>
</appendix>
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/query/function/FunctionMethods.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/function/FunctionMethods.java 2011-11-28 18:50:08 UTC (rev 3702)
+++ trunk/engine/src/main/java/org/teiid/query/function/FunctionMethods.java 2011-11-28 20:04:04 UTC (rev 3703)
@@ -390,8 +390,16 @@
// ================== Function = dayofweek =====================
- public static Object dayOfWeek(Date x) {
- return Integer.valueOf(getField(x, Calendar.DAY_OF_WEEK));
+ public static int dayOfWeek(Date x) {
+ int result = getField(x, Calendar.DAY_OF_WEEK);
+ if (TimestampWithTimezone.ISO8601_WEEK) {
+ result -= 1;
+ if (result == 0) {
+ return 7;
+ }
+ return result;
+ }
+ return getField(x, Calendar.DAY_OF_WEEK);
}
// ================== Function = dayofyear =====================
@@ -436,8 +444,8 @@
// ================== Function = week =====================
- public static Object week(Date x) {
- return Integer.valueOf(getField(x, Calendar.WEEK_OF_YEAR));
+ public static int week(Date x) {
+ return getField(x, Calendar.WEEK_OF_YEAR);
}
// ================== Function = year =====================
Modified: trunk/engine/src/test/java/org/teiid/query/function/TestFunctionMethods.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/function/TestFunctionMethods.java 2011-11-28 18:50:08 UTC (rev 3702)
+++ trunk/engine/src/test/java/org/teiid/query/function/TestFunctionMethods.java 2011-11-28 20:04:04 UTC (rev 3703)
@@ -24,11 +24,25 @@
import static org.junit.Assert.*;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.teiid.core.util.TimestampWithTimezone;
+import org.teiid.query.unittest.TimestampUtil;
@SuppressWarnings("nls")
public class TestFunctionMethods {
+ @BeforeClass public static void oneTimeSetup() {
+ TimestampWithTimezone.ISO8601_WEEK = true;
+ TimestampWithTimezone.resetCalendar(null);
+ }
+
+ @AfterClass public static void oneTimeTearDown() {
+ TimestampWithTimezone.ISO8601_WEEK = false;
+ TimestampWithTimezone.resetCalendar(null);
+ }
+
@Test public void testUnescape() {
assertEquals("a\t\n\n%6", FunctionMethods.unescape("a\\t\\n\\012\\456"));
}
@@ -36,5 +50,18 @@
@Test public void testUnescape1() {
assertEquals("a\u45AA'", FunctionMethods.unescape("a\\u45Aa\'"));
}
+
+ @Test public void testIso8601Week() {
+ assertEquals(53, FunctionMethods.week(TimestampUtil.createDate(105, 0, 1)));
+ }
+
+ @Test public void testIso8601Week1() {
+ assertEquals(52, FunctionMethods.week(TimestampUtil.createDate(106, 0, 1)));
+ }
+
+ @Test public void testIso8601Week2() {
+ assertEquals(1, FunctionMethods.dayOfWeek(TimestampUtil.createDate(111, 10, 28)));
+ }
+
}
13 years, 5 months
teiid SVN: r3702 - trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2011-11-28 13:50:08 -0500 (Mon, 28 Nov 2011)
New Revision: 3702
Modified:
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java
Log:
TEIID-1673 update to any cast logic and removing unnecessary remote calls
Modified: trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
===================================================================
--- trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2011-11-28 18:49:16 UTC (rev 3701)
+++ trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2011-11-28 18:50:08 UTC (rev 3702)
@@ -117,8 +117,9 @@
if(log.isTraceEnabled())
log.trace("[sender=" + req.getSrc() + "], method_call: " + method_call); //$NON-NLS-1$ //$NON-NLS-2$
- if(method_lookup == null)
- throw new Exception("MethodCall uses ID=" + method_call.getId() + ", but method_lookup has not been set"); //$NON-NLS-1$ //$NON-NLS-2$
+ if (method_call.getId() >= methodList.size() - 5 && req.getSrc().equals(local_addr)) {
+ return null;
+ }
if (method_call.getId() >= methodList.size() - 3) {
Serializable address = new AddressWrapper(req.getSrc());
@@ -292,6 +293,9 @@
List<Address> dests = null;
if (annotation.remoteOnly()) {
dests = getRemoteMembersCopy();
+ if (dests.isEmpty()) {
+ return null;
+ }
}
RspList<Object> responses = disp.callRemoteMethods(dests, call, new RequestOptions().setMode(annotation.asynch()?ResponseMode.GET_NONE:ResponseMode.GET_ALL).setTimeout(annotation.timeout()).setAnycasting(dests != null));
if (annotation.asynch()) {
@@ -346,18 +350,19 @@
} catch (InvocationTargetException e) {
throw e.getCause();
}
- List<Address> dests = getRemoteMembersCopy();
ReplicatedObject ro = (ReplicatedObject)object;
Serializable stateId = (Serializable)args[0];
if (annotation.replicateState() == ReplicationMode.PUSH) {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "replicating state", stateId); //$NON-NLS-1$
- JGroupsOutputStream oStream = new JGroupsOutputStream(disp, dests, stateId, (short)(methodMap.size() - 3), true);
- try {
- ro.getState(stateId, oStream);
- } finally {
- oStream.close();
+ if (!remoteMembers.isEmpty()) {
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "replicating state", stateId); //$NON-NLS-1$
+ JGroupsOutputStream oStream = new JGroupsOutputStream(disp, null, stateId, (short)(methodMap.size() - 3), true);
+ try {
+ ro.getState(stateId, oStream);
+ } finally {
+ oStream.close();
+ }
+ LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state", stateId); //$NON-NLS-1$
}
- LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state", stateId); //$NON-NLS-1$
return result;
}
if (result != null) {
Modified: trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java
===================================================================
--- trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java 2011-11-28 18:49:16 UTC (rev 3701)
+++ trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java 2011-11-28 18:50:08 UTC (rev 3702)
@@ -55,7 +55,7 @@
this.methodOffset = methodOffset;
if (sendCreate) {
try {
- disp.callRemoteMethods(this.dests, new MethodCall(methodOffset, new Object[] {stateId}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(true));
+ disp.callRemoteMethods(this.dests, new MethodCall(methodOffset, new Object[] {stateId}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(dests != null));
} catch(Exception e) {
throw new IOException(e);
}
@@ -68,7 +68,7 @@
}
flush();
try {
- disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 2), new Object[] {stateId}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(true));
+ disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 2), new Object[] {stateId}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(dests != null));
} catch(Exception e) {
}
closed=true;
@@ -80,7 +80,7 @@
if(index == 0) {
return;
}
- disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 1), new Object[] {stateId, Arrays.copyOf(buffer, index)}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(true));
+ disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 1), new Object[] {stateId, Arrays.copyOf(buffer, index)}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(dests != null));
index=0;
} catch(Exception e) {
throw new IOException(e);
13 years, 5 months
teiid SVN: r3701 - branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2011-11-28 13:49:16 -0500 (Mon, 28 Nov 2011)
New Revision: 3701
Modified:
branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java
Log:
TEIID-1673 update to any cast logic and removing unnecessary remote calls
Modified: branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
===================================================================
--- branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2011-11-28 18:05:45 UTC (rev 3700)
+++ branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2011-11-28 18:49:16 UTC (rev 3701)
@@ -140,30 +140,31 @@
} catch (InvocationTargetException e) {
throw e.getCause();
}
- Vector<Address> dests = null;
- synchronized (remoteMembers) {
- dests = new Vector<Address>(remoteMembers);
+ if (!remoteMembers.isEmpty()) {
+ ReplicatedObject ro = (ReplicatedObject)object;
+ String stateId = (String)args[0];
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "replicating state", stateId); //$NON-NLS-1$
+ JGroupsOutputStream oStream = new JGroupsOutputStream(disp, null, stateId, (short)(methodMap.size() - 3));
+ try {
+ ro.getState(stateId, oStream);
+ } finally {
+ oStream.close();
+ }
+ LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state", stateId); //$NON-NLS-1$
}
- ReplicatedObject ro = (ReplicatedObject)object;
- String stateId = (String)args[0];
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "replicating state", stateId); //$NON-NLS-1$
- JGroupsOutputStream oStream = new JGroupsOutputStream(disp, dests, stateId, (short)(methodMap.size() - 3));
- try {
- ro.getState(stateId, oStream);
- } finally {
- oStream.close();
- }
- LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state", stateId); //$NON-NLS-1$
return result;
}
MethodCall call=new MethodCall(methodNum, args);
Vector<Address> dests = null;
if (annotation.remoteOnly()) {
synchronized (remoteMembers) {
+ if (remoteMembers.isEmpty()) {
+ return null;
+ }
dests = new Vector<Address>(remoteMembers);
}
}
- RspList responses = disp.callRemoteMethods(dests, call, annotation.asynch()?GroupRequest.GET_NONE:GroupRequest.GET_ALL, annotation.timeout());
+ RspList responses = disp.callRemoteMethods(dests, call, annotation.asynch()?GroupRequest.GET_NONE:GroupRequest.GET_ALL, annotation.timeout(), dests != null);
if (annotation.asynch()) {
return null;
}
@@ -184,7 +185,7 @@
}
return null;
} catch(Exception e) {
- throw new RuntimeException(method + " " + args + " failed"); //$NON-NLS-1$ //$NON-NLS-2$
+ throw new RuntimeException(method + " " + args + " failed", e); //$NON-NLS-1$ //$NON-NLS-2$
}
}
@@ -374,10 +375,10 @@
if(log.isTraceEnabled())
log.trace("[sender=" + req.getSrc() + "], method_call: " + method_call); //$NON-NLS-1$ //$NON-NLS-2$
- if(method_lookup == null)
- throw new Exception("MethodCall uses ID=" + method_call.getId() + ", but method_lookup has not been set"); //$NON-NLS-1$ //$NON-NLS-2$
-
if (method_call.getId() >= methodList.size() - 3) {
+ if (req.getSrc().equals(local_addr)) {
+ return null;
+ }
Serializable address = req.getSrc();
String stateId = (String)method_call.getArgs()[0];
List<?> key = Arrays.asList(stateId, address);
Modified: branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java
===================================================================
--- branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java 2011-11-28 18:05:45 UTC (rev 3700)
+++ branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java 2011-11-28 18:49:16 UTC (rev 3701)
@@ -51,7 +51,7 @@
this.dests=dests;
this.stateId=stateId;
this.methodOffset = methodOffset;
- disp.callRemoteMethods(this.dests, new MethodCall(methodOffset, new Object[] {stateId}), GroupRequest.GET_NONE, 0);
+ disp.callRemoteMethods(this.dests, new MethodCall(methodOffset, new Object[] {stateId}), GroupRequest.GET_NONE, 0, dests != null);
}
public void close() throws IOException {
@@ -60,7 +60,7 @@
}
flush();
try {
- disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 2), new Object[] {stateId}), GroupRequest.GET_NONE, 0);
+ disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 2), new Object[] {stateId}), GroupRequest.GET_NONE, 0, dests != null);
} catch(Exception e) {
}
closed=true;
@@ -72,7 +72,7 @@
if(index == 0) {
return;
}
- disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 1), new Object[] {stateId, Arrays.copyOf(buffer, index)}), GroupRequest.GET_NONE, 0);
+ disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 1), new Object[] {stateId, Arrays.copyOf(buffer, index)}), GroupRequest.GET_NONE, 0, dests != null);
index=0;
} catch(Exception e) {
throw new IOException(e);
13 years, 5 months
teiid SVN: r3700 - in trunk: engine/src/main/java/org/teiid/query and 1 other directory.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2011-11-28 13:05:45 -0500 (Mon, 28 Nov 2011)
New Revision: 3700
Modified:
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java
Log:
TEIID-1837 a check is performed for both initial and partial state transfers to determine the pull target
Modified: trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
===================================================================
--- trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2011-11-23 19:43:50 UTC (rev 3699)
+++ trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2011-11-28 18:05:45 UTC (rev 3700)
@@ -23,8 +23,6 @@
package org.teiid.replication.jboss;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
@@ -33,6 +31,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -57,18 +56,22 @@
import org.jgroups.util.Promise;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
-import org.jgroups.util.Util;
import org.teiid.Replicated;
import org.teiid.Replicated.ReplicationMode;
+import org.teiid.core.TeiidRuntimeException;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.query.ObjectReplicator;
import org.teiid.query.ReplicatedObject;
+@SuppressWarnings("unchecked")
public class JGroupsObjectReplicator implements ObjectReplicator, Serializable {
-
+
+ private static final int IO_TIMEOUT = 15000;
+
private final class ReplicatorRpcDispatcher<S> extends RpcDispatcher {
private final S object;
+ private boolean initialized;
private final HashMap<Method, Short> methodMap;
private final ArrayList<Method> methodList;
Map<List<?>, JGroupsInputStream> inputStreams = new ConcurrentHashMap<List<?>, JGroupsInputStream>();
@@ -127,7 +130,7 @@
if (is != null) {
is.receive(null);
}
- is = new JGroupsInputStream(15000);
+ is = new JGroupsInputStream(IO_TIMEOUT);
this.inputStreams.put(key, is);
executor.execute(new StreamingRunner(object, stateId, is, null));
} else if (method_call.getId() == methodList.size() - 2) {
@@ -148,6 +151,15 @@
ReplicatedObject ro = (ReplicatedObject)object;
Serializable stateId = (Serializable)method_call.getArgs()[0];
+ if (stateId == null) {
+ synchronized (this) {
+ if (initialized) {
+ return Boolean.TRUE;
+ }
+ return null;
+ }
+ }
+
if (ro.hasState(stateId)) {
return Boolean.TRUE;
}
@@ -160,7 +172,11 @@
JGroupsOutputStream oStream = new JGroupsOutputStream(this, Arrays.asList(dest.address), stateId, (short)(methodMap.size() - 3), false);
try {
- ro.getState(stateId, oStream);
+ if (stateId == null) {
+ ro.getState(oStream);
+ } else {
+ ro.getState(stateId, oStream);
+ }
} finally {
oStream.close();
}
@@ -204,11 +220,15 @@
@Override
public void run() {
try {
- ((ReplicatedObject)object).setState(stateId, is);
+ if (stateId == null) {
+ ((ReplicatedObject<?>)object).setState(is);
+ } else {
+ ((ReplicatedObject)object).setState(stateId, is);
+ }
if (promise != null) {
promise.setResult(Boolean.TRUE);
}
- LogManager.logDetail(LogConstants.CTX_RUNTIME, "state set " + stateId); //$NON-NLS-1$
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, "state set", stateId); //$NON-NLS-1$
} catch (Exception e) {
if (promise != null) {
promise.setResult(Boolean.FALSE);
@@ -228,8 +248,7 @@
private final S object;
private transient ReplicatorRpcDispatcher<S> disp;
private final HashMap<Method, Short> methodMap;
- protected List<Address> remoteMembers = new ArrayList<Address>();
- protected final transient Promise<Boolean> state_promise=new Promise<Boolean>();
+ protected List<Address> remoteMembers = Collections.synchronizedList(new ArrayList<Address>());
private Map<Serializable, Promise<Boolean>> loadingStates = new HashMap<Serializable, Promise<Boolean>>();
private ReplicatedInvocationHandler(S object,HashMap<Method, Short> methodMap) {
@@ -237,6 +256,12 @@
this.methodMap = methodMap;
}
+ List<Address> getRemoteMembersCopy() {
+ synchronized (remoteMembers) {
+ return new ArrayList<Address>(remoteMembers);
+ }
+ }
+
public void setDisp(ReplicatorRpcDispatcher<S> disp) {
this.disp = disp;
}
@@ -264,11 +289,9 @@
return handleReplicateState(method, args, annotation);
}
MethodCall call=new MethodCall(methodNum, args);
- ArrayList<Address> dests = null;
+ List<Address> dests = null;
if (annotation.remoteOnly()) {
- synchronized (remoteMembers) {
- dests = new ArrayList<Address>(remoteMembers);
- }
+ dests = getRemoteMembersCopy();
}
RspList<Object> responses = disp.callRemoteMethods(dests, call, new RequestOptions().setMode(annotation.asynch()?ResponseMode.GET_NONE:ResponseMode.GET_ALL).setTimeout(annotation.timeout()).setAnycasting(dests != null));
if (annotation.asynch()) {
@@ -294,6 +317,25 @@
throw new RuntimeException(method + " " + args + " failed", e); //$NON-NLS-1$ //$NON-NLS-2$
}
}
+
+ protected Address whereIsState(Serializable stateId, long timeout) throws Exception {
+ if (remoteMembers.isEmpty()) {
+ return null;
+ }
+ RspList<Boolean> resp = this.disp.callRemoteMethods(getRemoteMembersCopy(), new MethodCall((short)(methodMap.size() - 5), new Object[]{stateId}), new RequestOptions(ResponseMode.GET_ALL, timeout));
+ Collection<Rsp<Boolean>> values = resp.values();
+ Rsp<Boolean> rsp = null;
+ for (Rsp<Boolean> response : values) {
+ if (Boolean.TRUE.equals(response.getValue())) {
+ rsp = response;
+ break;
+ }
+ }
+ if (rsp == null) {
+ return null;
+ }
+ return rsp.getSender();
+ }
private Object handleReplicateState(Method method, Object[] args,
Replicated annotation) throws IllegalAccessException,
@@ -304,10 +346,7 @@
} catch (InvocationTargetException e) {
throw e.getCause();
}
- List<Address> dests = null;
- synchronized (remoteMembers) {
- dests = new ArrayList<Address>(remoteMembers);
- }
+ List<Address> dests = getRemoteMembersCopy();
ReplicatedObject ro = (ReplicatedObject)object;
Serializable stateId = (Serializable)args[0];
if (annotation.replicateState() == ReplicationMode.PUSH) {
@@ -324,9 +363,17 @@
if (result != null) {
return result;
}
- if (!(object instanceof ReplicatedObject)) {
- throw new IllegalStateException("A non-ReplicatedObject cannot use state pulling."); //$NON-NLS-1$
- }
+ long timeout = annotation.timeout();
+ return pullState(method, args, stateId, timeout);
+ }
+
+ /**
+ * Pull the remote state. The method and args are optional
+ * to determine if the state has been made available.
+ */
+ Object pullState(Method method, Object[] args, Serializable stateId,
+ long timeout) throws Throwable {
+ Object result = null;
for (int i = 0; i < PULL_RETRIES; i++) {
Promise<Boolean> p = null;
boolean wait = true;
@@ -334,61 +381,60 @@
p = loadingStates.get(stateId);
if (p == null) {
wait = false;
- try {
- result = method.invoke(object, args);
- } catch (InvocationTargetException e) {
- throw e.getCause();
+ if (method != null) {
+ try {
+ result = method.invoke(object, args);
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ if (result != null) {
+ return result;
+ }
}
- if (result != null) {
- return result;
- }
p = new Promise<Boolean>();
loadingStates.put(stateId, p);
}
}
- long timeout = annotation.timeout();
if (wait) {
p.getResult(timeout);
continue;
}
try {
LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulling state", stateId); //$NON-NLS-1$
- RspList<Boolean> resp = this.disp.callRemoteMethods(null, new MethodCall((short)(methodMap.size() - 5), stateId), new RequestOptions(ResponseMode.GET_ALL, timeout));
- Collection<Rsp<Boolean>> values = resp.values();
- Rsp<Boolean> rsp = null;
- for (Rsp<Boolean> response : values) {
- if (Boolean.TRUE.equals(response.getValue())) {
- rsp = response;
- break;
- }
- }
- if (rsp == null || this.disp.getChannel().getAddress().equals(rsp.getSender())) {
+ Address addr = whereIsState(stateId, timeout);
+ if (addr == null) {
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "timeout exceeded or first member"); //$NON-NLS-1$
break;
}
- JGroupsInputStream is = new JGroupsInputStream(15000);
+ JGroupsInputStream is = new JGroupsInputStream(IO_TIMEOUT);
StreamingRunner runner = new StreamingRunner(object, stateId, is, p);
- List<?> key = Arrays.asList(stateId, new AddressWrapper(rsp.getSender()));
+ List<?> key = Arrays.asList(stateId, new AddressWrapper(addr));
disp.inputStreams.put(key, is);
executor.execute(runner);
- this.disp.callRemoteMethod(rsp.getSender(), new MethodCall((short)(methodMap.size() - 4), stateId, new AddressWrapper(this.disp.getChannel().getAddress())), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(true));
+ this.disp.callRemoteMethod(addr, new MethodCall((short)(methodMap.size() - 4), stateId, new AddressWrapper(this.disp.getChannel().getAddress())), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(true));
Boolean fetched = p.getResult(timeout);
if (fetched != null) {
if (fetched) {
LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulled state", stateId); //$NON-NLS-1$
- } else {
- LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " failed to pull " + stateId); //$NON-NLS-1$
- }
+ if (method !=null) {
+ try {
+ result = method.invoke(object, args);
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ if (result != null) {
+ return result;
+ }
+ }
+ break;
+ }
+ LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " failed to pull " + stateId); //$NON-NLS-1$
} else {
LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " timeout pulling " + stateId); //$NON-NLS-1$
}
- try {
- result = method.invoke(object, args);
- } catch (InvocationTargetException e) {
- throw e.getCause();
- }
} finally {
synchronized (loadingStates) {
loadingStates.remove(stateId);
@@ -403,12 +449,12 @@
if (newView.getMembers() != null) {
synchronized (remoteMembers) {
remoteMembers.removeAll(newView.getMembers());
- if (object instanceof ReplicatedObject && !remoteMembers.isEmpty()) {
+ if (object instanceof ReplicatedObject<?> && !remoteMembers.isEmpty()) {
HashSet<Serializable> dropped = new HashSet<Serializable>();
for (Address address : remoteMembers) {
dropped.add(new AddressWrapper(address));
}
- ((ReplicatedObject)object).droppedMembers(dropped);
+ ((ReplicatedObject<?>)object).droppedMembers(dropped);
}
remoteMembers.clear();
remoteMembers.addAll(newView.getMembers());
@@ -416,33 +462,6 @@
}
}
}
-
- @Override
- public void setState(InputStream istream) {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "loading initial state"); //$NON-NLS-1$
- try {
- ((ReplicatedObject)object).setState(istream);
- state_promise.setResult(Boolean.TRUE);
- } catch (Exception e) {
- state_promise.setResult(Boolean.FALSE);
- LogManager.logError(LogConstants.CTX_RUNTIME, e, "error loading initial state"); //$NON-NLS-1$
- } finally {
- Util.close(istream);
- }
- }
-
- @Override
- public void getState(OutputStream ostream) {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "getting initial state"); //$NON-NLS-1$
- try {
- ((ReplicatedObject)object).getState(ostream);
- } catch (Exception e) {
- LogManager.logError(LogConstants.CTX_RUNTIME, e, "error gettting initial state"); //$NON-NLS-1$
- } finally {
- Util.close(ostream);
- }
- }
-
}
private interface Streaming {
@@ -470,7 +489,6 @@
c.close();
}
- @SuppressWarnings("unchecked")
@Override
public <T, S> T replicate(String mux_id,
Class<T> iface, final S object, long startTimeout) throws Exception {
@@ -528,23 +546,24 @@
channel.connect(mux_id);
if (object instanceof ReplicatedObject) {
((ReplicatedObject)object).setAddress(new AddressWrapper(channel.getAddress()));
- channel.getState(null, startTimeout);
- Boolean loaded = proxy.state_promise.getResult(1);
- if (loaded == null) {
- LogManager.logInfo(LogConstants.CTX_RUNTIME, object + " timeout exceeded or first member"); //$NON-NLS-1$
- } else if (loaded) {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "loaded"); //$NON-NLS-1$
- } else {
- LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " load error"); //$NON-NLS-1$
- }
+ proxy.pullState(null, null, null, startTimeout);
}
success = true;
return replicatedProxy;
+ } catch (Throwable e) {
+ if (e instanceof Exception) {
+ throw (Exception)e;
+ }
+ throw new TeiidRuntimeException(e);
} finally {
if (!success) {
channel.close();
+ } else {
+ synchronized (disp) {
+ //mark as initialized so that state can be pulled if needed
+ disp.initialized = true;
+ }
}
}
}
-
}
Modified: trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java 2011-11-23 19:43:50 UTC (rev 3699)
+++ trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java 2011-11-28 18:05:45 UTC (rev 3700)
@@ -75,6 +75,11 @@
*/
void droppedMembers(Collection<Serializable> addresses);
+ /**
+ * Return true if the object has the given state
+ * @param state_id
+ * @return
+ */
boolean hasState(K state_id);
}
13 years, 5 months
teiid SVN: r3699 - in trunk: cache-jbosscache/src/main/java/org/teiid/cache/jboss and 11 other directories.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2011-11-23 14:43:50 -0500 (Wed, 23 Nov 2011)
New Revision: 3699
Added:
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/AddressWrapper.java
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java
Removed:
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java
Modified:
trunk/api/src/main/java/org/teiid/Replicated.java
trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/ClusterableCacheFactory.java
trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java
trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java
trunk/engine/src/main/java/org/teiid/cache/Cachable.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedPlan.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java
trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestSessionAwareCache.java
trunk/jboss-integration/src/main/java/org/teiid/jboss/BufferManagerService.java
trunk/jboss-integration/src/main/java/org/teiid/jboss/JGroupsObjectReplicatorService.java
trunk/jboss-integration/src/main/java/org/teiid/jboss/TeiidAdd.java
trunk/test-integration/common/pom.xml
trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
Log:
TEIID-1720 next refinement of replication logic
Modified: trunk/api/src/main/java/org/teiid/Replicated.java
===================================================================
--- trunk/api/src/main/java/org/teiid/Replicated.java 2011-11-23 18:30:27 UTC (rev 3698)
+++ trunk/api/src/main/java/org/teiid/Replicated.java 2011-11-23 19:43:50 UTC (rev 3699)
@@ -61,7 +61,7 @@
*
* @return PUSH if the remote members should have a partial state replication called using the first argument as the state after
* the local method has been invoked, or PULL if the local member should initial a partial state pull using the first argument
- * as the state after the local method returns null
+ * as the state after the local method returns null. PULL cannot be asynch.
*/
ReplicationMode replicateState() default ReplicationMode.NONE;
Modified: trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/ClusterableCacheFactory.java
===================================================================
--- trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/ClusterableCacheFactory.java 2011-11-23 18:30:27 UTC (rev 3698)
+++ trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/ClusterableCacheFactory.java 2011-11-23 19:43:50 UTC (rev 3699)
@@ -27,6 +27,7 @@
import javax.naming.InitialContext;
import javax.naming.NamingException;
+import org.infinispan.manager.CacheContainer;
import org.teiid.cache.Cache;
import org.teiid.cache.CacheConfiguration;
import org.teiid.cache.CacheFactory;
@@ -49,7 +50,7 @@
}
else {
try {
- this.delegate = new JBossCacheFactory(this.resultsetCacheName, cacheManager);
+ this.delegate = new JBossCacheFactory(this.resultsetCacheName, (CacheContainer) cacheManager);
} catch (Exception e) {
throw new TeiidRuntimeException("Failed to obtain the clusted cache"); //$NON-NLS-1$
}
Modified: trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java
===================================================================
--- trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java 2011-11-23 18:30:27 UTC (rev 3698)
+++ trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCache.java 2011-11-23 19:43:50 UTC (rev 3699)
@@ -56,7 +56,10 @@
@Override
public V put(String key, V value, Long ttl) {
- return this.cacheStore.put(fqn(key), value, ttl, TimeUnit.SECONDS);
+ if (ttl != null) {
+ return this.cacheStore.put(fqn(key), value, ttl, TimeUnit.MILLISECONDS);
+ }
+ return this.cacheStore.put(fqn(key), value);
}
@Override
Modified: trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java
===================================================================
--- trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java 2011-11-23 18:30:27 UTC (rev 3698)
+++ trunk/cache-jbosscache/src/main/java/org/teiid/cache/jboss/JBossCacheFactory.java 2011-11-23 19:43:50 UTC (rev 3699)
@@ -37,9 +37,8 @@
private volatile boolean destroyed = false;
- public JBossCacheFactory(String name, Object cm) throws Exception {
- CacheContainer cachemanager = (CacheContainer)cm;
- this.cacheStore = cachemanager.getCache(name);
+ public JBossCacheFactory(String name, CacheContainer cm) {
+ this.cacheStore = cm.getCache(name);
}
/**
Added: trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/AddressWrapper.java
===================================================================
--- trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/AddressWrapper.java (rev 0)
+++ trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/AddressWrapper.java 2011-11-23 19:43:50 UTC (rev 3699)
@@ -0,0 +1,86 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.replication.jboss;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+import org.jgroups.Address;
+import org.teiid.core.util.ReflectionHelper;
+
+/**
+ * Allows JGroups {@link Address} objects to be serializable
+ */
+public final class AddressWrapper implements Externalizable {
+
+ Address address;
+
+ public AddressWrapper() {
+
+ }
+
+ public AddressWrapper(Address address) {
+ this.address = address;
+ }
+
+ @Override
+ public int hashCode() {
+ return address.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof AddressWrapper)) {
+ return false;
+ }
+ return address.equals(((AddressWrapper)obj).address);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ String className = in.readUTF();
+ try {
+ this.address = (Address) ReflectionHelper.create(className, null, Thread.currentThread().getContextClassLoader());
+ this.address.readFrom(in);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeUTF(address.getClass().getName());
+ try {
+ address.writeTo(out);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+}
\ No newline at end of file
Property changes on: trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/AddressWrapper.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java
===================================================================
--- trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java 2011-11-23 18:30:27 UTC (rev 3698)
+++ trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsInputStream.java 2011-11-23 19:43:50 UTC (rev 3699)
@@ -31,14 +31,17 @@
public class JGroupsInputStream extends InputStream {
- static long TIME_OUT = 15000; //TODO make configurable
-
+ private long timeout = 15000;
private volatile byte[] buf;
private volatile int index=0;
private ReentrantLock lock = new ReentrantLock();
private Condition write = lock.newCondition();
private Condition doneReading = lock.newCondition();
+ public JGroupsInputStream(long timeout) {
+ this.timeout = timeout;
+ }
+
@Override
public int read() throws IOException {
if (index < 0) {
@@ -47,7 +50,7 @@
if (buf == null) {
lock.lock();
try {
- write.await(TIME_OUT, TimeUnit.MILLISECONDS);
+ write.await(timeout, TimeUnit.MILLISECONDS);
if (index < 0) {
return -1;
}
Modified: trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
===================================================================
--- trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2011-11-23 18:30:27 UTC (rev 3698)
+++ trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2011-11-23 19:43:50 UTC (rev 3699)
@@ -22,11 +22,8 @@
package org.teiid.replication.jboss;
-import java.io.Externalizable;
import java.io.IOException;
import java.io.InputStream;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
@@ -40,7 +37,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -48,7 +44,9 @@
import org.jboss.as.clustering.jgroups.ChannelFactory;
import org.jgroups.Address;
import org.jgroups.Channel;
+import org.jgroups.MembershipListener;
import org.jgroups.Message;
+import org.jgroups.MessageListener;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.jgroups.blocks.MethodCall;
@@ -57,92 +55,164 @@
import org.jgroups.blocks.ResponseMode;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.util.Promise;
+import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
import org.jgroups.util.Util;
import org.teiid.Replicated;
import org.teiid.Replicated.ReplicationMode;
-import org.teiid.core.util.ReflectionHelper;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.query.ObjectReplicator;
import org.teiid.query.ReplicatedObject;
-public abstract class JGroupsObjectReplicator implements ObjectReplicator, Serializable {
+public class JGroupsObjectReplicator implements ObjectReplicator, Serializable {
- public static final class AddressWrapper implements Externalizable {
-
- private Address address;
-
- public AddressWrapper() {
-
+ private final class ReplicatorRpcDispatcher<S> extends RpcDispatcher {
+ private final S object;
+ private final HashMap<Method, Short> methodMap;
+ private final ArrayList<Method> methodList;
+ Map<List<?>, JGroupsInputStream> inputStreams = new ConcurrentHashMap<List<?>, JGroupsInputStream>();
+
+ private ReplicatorRpcDispatcher(Channel channel, MessageListener l,
+ MembershipListener l2, Object serverObj, S object,
+ HashMap<Method, Short> methodMap, ArrayList<Method> methodList) {
+ super(channel, l, l2, serverObj);
+ this.object = object;
+ this.methodMap = methodMap;
+ this.methodList = methodList;
}
-
- public AddressWrapper(Address address) {
- this.address = address;
- }
-
+
@Override
- public int hashCode() {
- return address.hashCode();
+ public Object handle(Message req) {
+ Object body=null;
+
+ if(req == null || req.getLength() == 0) {
+ if(log.isErrorEnabled()) log.error("message or message buffer is null"); //$NON-NLS-1$
+ return null;
+ }
+
+ try {
+ body=req_marshaller != null?
+ req_marshaller.objectFromBuffer(req.getBuffer(), req.getOffset(), req.getLength())
+ : req.getObject();
+ }
+ catch(Throwable e) {
+ if(log.isErrorEnabled()) log.error("exception marshalling object", e); //$NON-NLS-1$
+ return e;
+ }
+
+ if(!(body instanceof MethodCall)) {
+ if(log.isErrorEnabled()) log.error("message does not contain a MethodCall object"); //$NON-NLS-1$
+
+ // create an exception to represent this and return it
+ return new IllegalArgumentException("message does not contain a MethodCall object") ; //$NON-NLS-1$
+ }
+
+ final MethodCall method_call=(MethodCall)body;
+
+ try {
+ if(log.isTraceEnabled())
+ log.trace("[sender=" + req.getSrc() + "], method_call: " + method_call); //$NON-NLS-1$ //$NON-NLS-2$
+
+ if(method_lookup == null)
+ throw new Exception("MethodCall uses ID=" + method_call.getId() + ", but method_lookup has not been set"); //$NON-NLS-1$ //$NON-NLS-2$
+
+ if (method_call.getId() >= methodList.size() - 3) {
+ Serializable address = new AddressWrapper(req.getSrc());
+ Serializable stateId = (Serializable)method_call.getArgs()[0];
+ List<?> key = Arrays.asList(stateId, address);
+ JGroupsInputStream is = inputStreams.get(key);
+ if (method_call.getId() == methodList.size() - 3) {
+ LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "create state", stateId); //$NON-NLS-1$
+ if (is != null) {
+ is.receive(null);
+ }
+ is = new JGroupsInputStream(15000);
+ this.inputStreams.put(key, is);
+ executor.execute(new StreamingRunner(object, stateId, is, null));
+ } else if (method_call.getId() == methodList.size() - 2) {
+ LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "building state", stateId); //$NON-NLS-1$
+ if (is != null) {
+ is.receive((byte[])method_call.getArgs()[1]);
+ }
+ } else if (method_call.getId() == methodList.size() - 1) {
+ LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "finished state", stateId); //$NON-NLS-1$
+ if (is != null) {
+ is.receive(null);
+ }
+ this.inputStreams.remove(key);
+ }
+ return null;
+ } else if (method_call.getId() == methodList.size() - 5) {
+ //hasState
+ ReplicatedObject ro = (ReplicatedObject)object;
+ Serializable stateId = (Serializable)method_call.getArgs()[0];
+
+ if (ro.hasState(stateId)) {
+ return Boolean.TRUE;
+ }
+ return null;
+ } else if (method_call.getId() == methodList.size() - 4) {
+ //sendState
+ ReplicatedObject ro = (ReplicatedObject)object;
+ String stateId = (String)method_call.getArgs()[0];
+ AddressWrapper dest = (AddressWrapper)method_call.getArgs()[1];
+
+ JGroupsOutputStream oStream = new JGroupsOutputStream(this, Arrays.asList(dest.address), stateId, (short)(methodMap.size() - 3), false);
+ try {
+ ro.getState(stateId, oStream);
+ } finally {
+ oStream.close();
+ }
+ LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state", stateId); //$NON-NLS-1$
+ return null;
+ }
+
+ Method m=method_lookup.findMethod(method_call.getId());
+ if(m == null)
+ throw new Exception("no method found for " + method_call.getId()); //$NON-NLS-1$
+ method_call.setMethod(m);
+
+ return method_call.invoke(server_obj);
+ }
+ catch(Throwable x) {
+ return x;
+ }
}
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
- if (!(obj instanceof AddressWrapper)) {
- return false;
- }
- return address.equals(((AddressWrapper)obj).address);
- }
-
- @Override
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
- String className = in.readUTF();
- try {
- this.address = (Address) ReflectionHelper.create(className, null, Thread.currentThread().getContextClassLoader());
- this.address.readFrom(in);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeUTF(address.getClass().getName());
- try {
- address.writeTo(out);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
}
-
+
private static final long serialVersionUID = -6851804958313095166L;
+ private static final String HAS_STATE = "hasState"; //$NON-NLS-1$
+ private static final String SEND_STATE = "sendState"; //$NON-NLS-1$
private static final String CREATE_STATE = "createState"; //$NON-NLS-1$
private static final String BUILD_STATE = "buildState"; //$NON-NLS-1$
private static final String FINISH_STATE = "finishState"; //$NON-NLS-1$
- private final class StreamingRunner implements Runnable {
+ private final static class StreamingRunner implements Runnable {
private final Object object;
- private final String stateId;
+ private final Serializable stateId;
private final JGroupsInputStream is;
+ private Promise<Boolean> promise;
- private StreamingRunner(Object object, String stateId, JGroupsInputStream is) {
+ private StreamingRunner(Object object, Serializable stateId, JGroupsInputStream is, Promise<Boolean> promise) {
this.object = object;
this.stateId = stateId;
this.is = is;
+ this.promise = promise;
}
@Override
public void run() {
try {
((ReplicatedObject)object).setState(stateId, is);
+ if (promise != null) {
+ promise.setResult(Boolean.TRUE);
+ }
LogManager.logDetail(LogConstants.CTX_RUNTIME, "state set " + stateId); //$NON-NLS-1$
} catch (Exception e) {
+ if (promise != null) {
+ promise.setResult(Boolean.FALSE);
+ }
LogManager.logError(LogConstants.CTX_RUNTIME, e, "error setting state " + stateId); //$NON-NLS-1$
} finally {
is.close();
@@ -150,28 +220,24 @@
}
}
- private final static class ReplicatedInvocationHandler<S> extends ReceiverAdapter implements
+ private final class ReplicatedInvocationHandler<S> extends ReceiverAdapter implements
InvocationHandler, Serializable {
+ private static final int PULL_RETRIES = 3;
private static final long serialVersionUID = -2943462899945966103L;
private final S object;
- private RpcDispatcher disp;
+ private transient ReplicatorRpcDispatcher<S> disp;
private final HashMap<Method, Short> methodMap;
protected List<Address> remoteMembers = new ArrayList<Address>();
protected final transient Promise<Boolean> state_promise=new Promise<Boolean>();
+ private Map<Serializable, Promise<Boolean>> loadingStates = new HashMap<Serializable, Promise<Boolean>>();
- protected transient ThreadLocal<Promise<Boolean>> threadLocalPromise = new ThreadLocal<Promise<Boolean>>() {
- protected org.jgroups.util.Promise<Boolean> initialValue() {
- return new Promise<Boolean>();
- }
- };
-
private ReplicatedInvocationHandler(S object,HashMap<Method, Short> methodMap) {
this.object = object;
this.methodMap = methodMap;
}
- public void setDisp(RpcDispatcher disp) {
+ public void setDisp(ReplicatorRpcDispatcher<S> disp) {
this.disp = disp;
}
@@ -195,57 +261,16 @@
try {
Replicated annotation = method.getAnnotation(Replicated.class);
if (annotation.replicateState() != ReplicationMode.NONE) {
- Object result = null;
- try {
- result = method.invoke(object, args);
- } catch (InvocationTargetException e) {
- throw e.getCause();
- }
- List<Address> dests = null;
- synchronized (remoteMembers) {
- dests = new ArrayList<Address>(remoteMembers);
- }
- ReplicatedObject ro = (ReplicatedObject)object;
- String stateId = (String)args[0];
- if (annotation.replicateState() == ReplicationMode.PUSH) {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "replicating state", stateId); //$NON-NLS-1$
- JGroupsOutputStream oStream = new JGroupsOutputStream(disp, dests, stateId, (short)(methodMap.size() - 3));
- try {
- ro.getState(stateId, oStream);
- } finally {
- oStream.close();
- }
- LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state", stateId); //$NON-NLS-1$
- return result;
- }
- if (result != null) {
- return result;
- }
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulling state", stateId); //$NON-NLS-1$
- long timeout = annotation.timeout();
- threadLocalPromise.set(new Promise<Boolean>());
- /*boolean getState = this.disp.getChannel().getState(null, stateId, timeout);
- if (getState) {
- Boolean loaded = threadLocalPromise.get().getResult(timeout);
- if (Boolean.TRUE.equals(loaded)) {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "loaded", stateId); //$NON-NLS-1$
- } else {
- LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " load error or timeout " + stateId); //$NON-NLS-1$
- }
- } else {
- LogManager.logInfo(LogConstants.CTX_RUNTIME, object + " first member or timeout exceeded " + stateId); //$NON-NLS-1$
- }*/
- LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state", stateId); //$NON-NLS-1$
- return result;
+ return handleReplicateState(method, args, annotation);
}
MethodCall call=new MethodCall(methodNum, args);
- Vector<Address> dests = null;
+ ArrayList<Address> dests = null;
if (annotation.remoteOnly()) {
synchronized (remoteMembers) {
- dests = new Vector<Address>(remoteMembers);
+ dests = new ArrayList<Address>(remoteMembers);
}
}
- RspList<Object> responses = disp.callRemoteMethods(dests, call, new RequestOptions().setMode(annotation.asynch()?ResponseMode.GET_NONE:ResponseMode.GET_ALL).setTimeout(annotation.timeout()));
+ RspList<Object> responses = disp.callRemoteMethods(dests, call, new RequestOptions().setMode(annotation.asynch()?ResponseMode.GET_NONE:ResponseMode.GET_ALL).setTimeout(annotation.timeout()).setAnycasting(dests != null));
if (annotation.asynch()) {
return null;
}
@@ -266,9 +291,112 @@
}
return null;
} catch(Exception e) {
- throw new RuntimeException(method + " " + args + " failed"); //$NON-NLS-1$ //$NON-NLS-2$
+ throw new RuntimeException(method + " " + args + " failed", e); //$NON-NLS-1$ //$NON-NLS-2$
}
}
+
+ private Object handleReplicateState(Method method, Object[] args,
+ Replicated annotation) throws IllegalAccessException,
+ Throwable, IOException, IllegalStateException, Exception {
+ Object result = null;
+ try {
+ result = method.invoke(object, args);
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ List<Address> dests = null;
+ synchronized (remoteMembers) {
+ dests = new ArrayList<Address>(remoteMembers);
+ }
+ ReplicatedObject ro = (ReplicatedObject)object;
+ Serializable stateId = (Serializable)args[0];
+ if (annotation.replicateState() == ReplicationMode.PUSH) {
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "replicating state", stateId); //$NON-NLS-1$
+ JGroupsOutputStream oStream = new JGroupsOutputStream(disp, dests, stateId, (short)(methodMap.size() - 3), true);
+ try {
+ ro.getState(stateId, oStream);
+ } finally {
+ oStream.close();
+ }
+ LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state", stateId); //$NON-NLS-1$
+ return result;
+ }
+ if (result != null) {
+ return result;
+ }
+ if (!(object instanceof ReplicatedObject)) {
+ throw new IllegalStateException("A non-ReplicatedObject cannot use state pulling."); //$NON-NLS-1$
+ }
+ for (int i = 0; i < PULL_RETRIES; i++) {
+ Promise<Boolean> p = null;
+ boolean wait = true;
+ synchronized (loadingStates) {
+ p = loadingStates.get(stateId);
+ if (p == null) {
+ wait = false;
+ try {
+ result = method.invoke(object, args);
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ if (result != null) {
+ return result;
+ }
+ p = new Promise<Boolean>();
+ loadingStates.put(stateId, p);
+ }
+ }
+ long timeout = annotation.timeout();
+ if (wait) {
+ p.getResult(timeout);
+ continue;
+ }
+ try {
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulling state", stateId); //$NON-NLS-1$
+ RspList<Boolean> resp = this.disp.callRemoteMethods(null, new MethodCall((short)(methodMap.size() - 5), stateId), new RequestOptions(ResponseMode.GET_ALL, timeout));
+ Collection<Rsp<Boolean>> values = resp.values();
+ Rsp<Boolean> rsp = null;
+ for (Rsp<Boolean> response : values) {
+ if (Boolean.TRUE.equals(response.getValue())) {
+ rsp = response;
+ break;
+ }
+ }
+ if (rsp == null || this.disp.getChannel().getAddress().equals(rsp.getSender())) {
+ break;
+ }
+ JGroupsInputStream is = new JGroupsInputStream(15000);
+ StreamingRunner runner = new StreamingRunner(object, stateId, is, p);
+ List<?> key = Arrays.asList(stateId, new AddressWrapper(rsp.getSender()));
+ disp.inputStreams.put(key, is);
+ executor.execute(runner);
+
+ this.disp.callRemoteMethod(rsp.getSender(), new MethodCall((short)(methodMap.size() - 4), stateId, new AddressWrapper(this.disp.getChannel().getAddress())), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(true));
+
+ Boolean fetched = p.getResult(timeout);
+
+ if (fetched != null) {
+ if (fetched) {
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulled state", stateId); //$NON-NLS-1$
+ } else {
+ LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " failed to pull " + stateId); //$NON-NLS-1$
+ }
+ } else {
+ LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " timeout pulling " + stateId); //$NON-NLS-1$
+ }
+ try {
+ result = method.invoke(object, args);
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ } finally {
+ synchronized (loadingStates) {
+ loadingStates.remove(stateId);
+ }
+ }
+ }
+ return null; //could not fetch the remote state
+ }
@Override
public void viewAccepted(View newView) {
@@ -315,46 +443,23 @@
}
}
- public void setState(String stateId, InputStream istream) {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "loading state"); //$NON-NLS-1$
- try {
- ((ReplicatedObject)object).setState(stateId, istream);
- threadLocalPromise.get().setResult(Boolean.TRUE);
- } catch (Exception e) {
- threadLocalPromise.get().setResult(Boolean.FALSE);
- LogManager.logError(LogConstants.CTX_RUNTIME, e, "error loading state"); //$NON-NLS-1$
- } finally {
- Util.close(istream);
- }
- }
-
- public void getState(String stateId, OutputStream ostream) {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "getting state"); //$NON-NLS-1$
- try {
- ((ReplicatedObject)object).getState(stateId, ostream);
- } catch (Exception e) {
- LogManager.logError(LogConstants.CTX_RUNTIME, e, "error gettting state"); //$NON-NLS-1$
- } finally {
- Util.close(ostream);
- }
- }
}
private interface Streaming {
- void createState(String id);
- void buildState(String id, byte[] bytes);
- void finishState(String id);
+ void sendState(Serializable id, AddressWrapper dest);
+ void createState(Serializable id);
+ void buildState(Serializable id, byte[] bytes);
+ void finishState(Serializable id);
}
//TODO: this should be configurable, or use a common executor
private transient Executor executor = Executors.newCachedThreadPool();
+ private transient ChannelFactory channelFactory;
- public JGroupsObjectReplicator(@SuppressWarnings("unused") String clusterName) {
+ public JGroupsObjectReplicator(ChannelFactory channelFactory) {
+ this.channelFactory = channelFactory;
}
- public abstract ChannelFactory getChannelFactory();
-
-
public void stop(Object object) {
if (!Proxy.isProxyClass(object.getClass())) {
return;
@@ -369,7 +474,7 @@
@Override
public <T, S> T replicate(String mux_id,
Class<T> iface, final S object, long startTimeout) throws Exception {
- Channel channel = getChannelFactory().createChannel(mux_id);
+ Channel channel = channelFactory.createChannel(mux_id);
Method[] methods = iface.getMethods();
final HashMap<Method, Short> methodMap = new HashMap<Method, Short>();
@@ -383,14 +488,22 @@
methodMap.put(method, (short)(methodList.size() - 1));
}
+ Method hasState = ReplicatedObject.class.getMethod(HAS_STATE, new Class<?>[] {Serializable.class});
+ methodList.add(hasState);
+ methodMap.put(hasState, (short)(methodList.size() - 1));
+
+ Method sendState = JGroupsObjectReplicator.Streaming.class.getMethod(SEND_STATE, new Class<?>[] {Serializable.class, AddressWrapper.class});
+ methodList.add(sendState);
+ methodMap.put(sendState, (short)(methodList.size() - 1));
+
//add in streaming methods
- Method createState = JGroupsObjectReplicator.Streaming.class.getMethod(CREATE_STATE, new Class<?>[] {String.class});
+ Method createState = JGroupsObjectReplicator.Streaming.class.getMethod(CREATE_STATE, new Class<?>[] {Serializable.class});
methodList.add(createState);
methodMap.put(createState, (short)(methodList.size() - 1));
- Method buildState = JGroupsObjectReplicator.Streaming.class.getMethod(BUILD_STATE, new Class<?>[] {String.class, byte[].class});
+ Method buildState = JGroupsObjectReplicator.Streaming.class.getMethod(BUILD_STATE, new Class<?>[] {Serializable.class, byte[].class});
methodList.add(buildState);
methodMap.put(buildState, (short)(methodList.size() - 1));
- Method finishState = JGroupsObjectReplicator.Streaming.class.getMethod(FINISH_STATE, new Class<?>[] {String.class});
+ Method finishState = JGroupsObjectReplicator.Streaming.class.getMethod(FINISH_STATE, new Class<?>[] {Serializable.class});
methodList.add(finishState);
methodMap.put(finishState, (short)(methodList.size() - 1));
@@ -399,87 +512,8 @@
* TODO: could have an object implement streaming
* Override the normal handle method to support streaming
*/
- RpcDispatcher disp = new RpcDispatcher(channel, proxy, proxy, object) {
- Map<List<?>, JGroupsInputStream> inputStreams = new ConcurrentHashMap<List<?>, JGroupsInputStream>();
- @Override
- public Object handle(Message req) {
- Object body=null;
-
- if(req == null || req.getLength() == 0) {
- if(log.isErrorEnabled()) log.error("message or message buffer is null"); //$NON-NLS-1$
- return null;
- }
-
- if (req.getSrc().equals(local_addr)) {
- return null;
- }
-
- try {
- body=req_marshaller != null?
- req_marshaller.objectFromBuffer(req.getBuffer(), req.getOffset(), req.getLength())
- : req.getObject();
- }
- catch(Throwable e) {
- if(log.isErrorEnabled()) log.error("exception marshalling object", e); //$NON-NLS-1$
- return e;
- }
-
- if(!(body instanceof MethodCall)) {
- if(log.isErrorEnabled()) log.error("message does not contain a MethodCall object"); //$NON-NLS-1$
-
- // create an exception to represent this and return it
- return new IllegalArgumentException("message does not contain a MethodCall object") ; //$NON-NLS-1$
- }
-
- final MethodCall method_call=(MethodCall)body;
-
- try {
- if(log.isTraceEnabled())
- log.trace("[sender=" + req.getSrc() + "], method_call: " + method_call); //$NON-NLS-1$ //$NON-NLS-2$
-
- if(method_lookup == null)
- throw new Exception("MethodCall uses ID=" + method_call.getId() + ", but method_lookup has not been set"); //$NON-NLS-1$ //$NON-NLS-2$
-
- if (method_call.getId() >= methodList.size() - 3) {
- Serializable address = new AddressWrapper(req.getSrc());
- String stateId = (String)method_call.getArgs()[0];
- List<?> key = Arrays.asList(stateId, address);
- JGroupsInputStream is = inputStreams.get(key);
- if (method_call.getId() == methodList.size() - 3) {
- LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "create state", stateId); //$NON-NLS-1$
- if (is != null) {
- is.receive(null);
- }
- is = new JGroupsInputStream();
- this.inputStreams.put(key, is);
- executor.execute(new StreamingRunner(object, stateId, is));
- } else if (method_call.getId() == methodList.size() - 2) {
- LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "building state", stateId); //$NON-NLS-1$
- if (is != null) {
- is.receive((byte[])method_call.getArgs()[1]);
- }
- } else if (method_call.getId() == methodList.size() - 1) {
- LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "finished state", stateId); //$NON-NLS-1$
- if (is != null) {
- is.receive(null);
- }
- this.inputStreams.remove(key);
- }
- return null;
- }
-
- Method m=method_lookup.findMethod(method_call.getId());
- if(m == null)
- throw new Exception("no method found for " + method_call.getId()); //$NON-NLS-1$
- method_call.setMethod(m);
-
- return method_call.invoke(server_obj);
- }
- catch(Throwable x) {
- return x;
- }
- }
- };
+ ReplicatorRpcDispatcher disp = new ReplicatorRpcDispatcher<S>(channel, proxy, proxy, object,
+ object, methodMap, methodList);
proxy.setDisp(disp);
disp.setMethodLookup(new MethodLookup() {
Modified: trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java
===================================================================
--- trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java 2011-11-23 18:30:27 UTC (rev 3698)
+++ trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsOutputStream.java 2011-11-23 19:43:50 UTC (rev 3699)
@@ -24,6 +24,7 @@
import java.io.IOException;
import java.io.OutputStream;
+import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
@@ -40,22 +41,24 @@
protected final RpcDispatcher disp;
protected final List<Address> dests;
- protected final String stateId;
+ protected final Serializable stateId;
protected final short methodOffset;
private volatile boolean closed=false;
private final byte[] buffer=new byte[CHUNK_SIZE];
private int index=0;
- public JGroupsOutputStream(RpcDispatcher disp, List<Address> dests, String stateId, short methodOffset) throws IOException {
+ public JGroupsOutputStream(RpcDispatcher disp, List<Address> dests, Serializable stateId, short methodOffset, boolean sendCreate) throws IOException {
this.disp=disp;
this.dests=dests;
this.stateId=stateId;
this.methodOffset = methodOffset;
- try {
- disp.callRemoteMethods(this.dests, new MethodCall(methodOffset, new Object[] {stateId}), new RequestOptions(ResponseMode.GET_NONE, 0));
- } catch(Exception e) {
- throw new IOException(e);
+ if (sendCreate) {
+ try {
+ disp.callRemoteMethods(this.dests, new MethodCall(methodOffset, new Object[] {stateId}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(true));
+ } catch(Exception e) {
+ throw new IOException(e);
+ }
}
}
@@ -65,7 +68,7 @@
}
flush();
try {
- disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 2), new Object[] {stateId}), new RequestOptions(ResponseMode.GET_NONE, 0));
+ disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 2), new Object[] {stateId}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(true));
} catch(Exception e) {
}
closed=true;
@@ -77,7 +80,7 @@
if(index == 0) {
return;
}
- disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 1), new Object[] {stateId, Arrays.copyOf(buffer, index)}), new RequestOptions(ResponseMode.GET_NONE, 0));
+ disp.callRemoteMethods(dests, new MethodCall((short)(methodOffset + 1), new Object[] {stateId, Arrays.copyOf(buffer, index)}), new RequestOptions(ResponseMode.GET_NONE, 0).setAnycasting(true));
index=0;
} catch(Exception e) {
throw new IOException(e);
Modified: trunk/engine/src/main/java/org/teiid/cache/Cachable.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/cache/Cachable.java 2011-11-23 18:30:27 UTC (rev 3698)
+++ trunk/engine/src/main/java/org/teiid/cache/Cachable.java 2011-11-23 19:43:50 UTC (rev 3699)
@@ -26,9 +26,9 @@
public interface Cachable {
- boolean prepare(Cache cache, BufferManager bufferManager);
+ boolean prepare(BufferManager bufferManager);
- boolean restore(Cache cache, BufferManager bufferManager);
+ boolean restore(BufferManager bufferManager);
AccessInfo getAccessInfo();
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-11-23 18:30:27 UTC (rev 3698)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-11-23 19:43:50 UTC (rev 3699)
@@ -74,7 +74,7 @@
*
* TODO: add a pre-fetch for tuplebuffers or some built-in correlation logic with the queue.
*/
-public class BufferManagerImpl implements BufferManager, StorageManager, ReplicatedObject {
+public class BufferManagerImpl implements BufferManager, StorageManager, ReplicatedObject<String> {
/**
* Asynch cleaner attempts to age out old entries and to reduce the memory size when
@@ -1069,8 +1069,14 @@
public void setCache(Cache cache) {
this.cache = cache;
}
+
public int getMemoryCacheEntries() {
return memoryEntries.size();
}
+ @Override
+ public boolean hasState(String stateId) {
+ return this.getTupleBuffer(stateId) != null;
+ }
+
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java 2011-11-23 18:30:27 UTC (rev 3698)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/CachedResults.java 2011-11-23 19:43:50 UTC (rev 3699)
@@ -27,7 +27,6 @@
import org.teiid.api.exception.query.QueryParserException;
import org.teiid.api.exception.query.QueryResolverException;
import org.teiid.cache.Cachable;
-import org.teiid.cache.Cache;
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.TupleBuffer;
import org.teiid.core.TeiidComponentException;
@@ -94,14 +93,14 @@
}
@Override
- public boolean prepare(Cache cache, BufferManager bufferManager) {
+ public boolean prepare(BufferManager bufferManager) {
Assertion.assertTrue(!this.results.isForwardOnly());
bufferManager.distributeTupleBuffer(this.results.getId(), results);
return true;
}
@Override
- public synchronized boolean restore(Cache cache, BufferManager bufferManager) {
+ public synchronized boolean restore(BufferManager bufferManager) {
if (this.results == null) {
if (this.hasLobs) {
return false; //the lob store is local only and not distributed
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedPlan.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedPlan.java 2011-11-23 18:30:27 UTC (rev 3698)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/PreparedPlan.java 2011-11-23 19:43:50 UTC (rev 3699)
@@ -25,7 +25,6 @@
import java.util.List;
import org.teiid.cache.Cachable;
-import org.teiid.cache.Cache;
import org.teiid.common.buffer.BufferManager;
import org.teiid.query.analysis.AnalysisRecord;
import org.teiid.query.processor.ProcessorPlan;
@@ -115,12 +114,12 @@
}
@Override
- public boolean prepare(Cache cache, BufferManager bufferManager) {
+ public boolean prepare(BufferManager bufferManager) {
return true; //no remotable actions
}
@Override
- public boolean restore(Cache cache, BufferManager bufferManager) {
+ public boolean restore(BufferManager bufferManager) {
return true; //no remotable actions
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java 2011-11-23 18:30:27 UTC (rev 3698)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/SessionAwareCache.java 2011-11-23 19:43:50 UTC (rev 3699)
@@ -61,7 +61,6 @@
private Cache<CacheID, T> localCache;
private Cache<CacheID, T> distributedCache;
- private Cache tupleBatchCache;
private int maxSize = DEFAULT_MAX_SIZE_TOTAL;
private long modTime;
@@ -93,12 +92,6 @@
else {
String location = config.getLocation()+"/"+type.name(); //$NON-NLS-1$
this.distributedCache = cacheFactory.get(location, config);
- if (type == Type.RESULTSET) {
- this.tupleBatchCache = cacheFactory.get(location+"/batches", config); //$NON-NLS-1$
- }
- else {
- this.tupleBatchCache = this.distributedCache;
- }
}
this.modTime = config.getMaxStaleness()*1000;
this.type = type;
@@ -124,7 +117,7 @@
if (result instanceof Cachable) {
Cachable c = (Cachable)result;
- if (!c.restore(this.tupleBatchCache, this.bufferManager)) {
+ if (!c.restore(this.bufferManager)) {
result = null;
}
}
@@ -188,7 +181,7 @@
if (t instanceof Cachable) {
Cachable c = (Cachable)t;
- insert = c.prepare(this.tupleBatchCache, this.bufferManager);
+ insert = c.prepare(this.bufferManager);
}
if (insert) {
Modified: trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java 2011-11-23 18:30:27 UTC (rev 3698)
+++ trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java 2011-11-23 19:43:50 UTC (rev 3699)
@@ -31,7 +31,7 @@
* Optional interface to be implemented by a replicated object to support full and partial state transfer.
*
*/
-public interface ReplicatedObject {
+public interface ReplicatedObject<K extends Serializable> {
/**
* Allows an application to write a state through a provided OutputStream.
@@ -46,7 +46,7 @@
* @param state_id id of the partial state requested
* @param ostream the OutputStream
*/
- void getState(String state_id, OutputStream ostream);
+ void getState(K state_id, OutputStream ostream);
/**
* Allows an application to read a state through a provided InputStream.
@@ -61,7 +61,7 @@
* @param state_id id of the partial state requested
* @param istream the InputStream
*/
- void setState(String state_id, InputStream istream);
+ void setState(K state_id, InputStream istream);
/**
* Allows the replicator to set the local address from the channel
@@ -75,4 +75,6 @@
*/
void droppedMembers(Collection<Serializable> addresses);
+ boolean hasState(K state_id);
+
}
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java 2011-11-23 18:30:27 UTC (rev 3698)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java 2011-11-23 19:43:50 UTC (rev 3699)
@@ -63,7 +63,7 @@
import org.teiid.query.sql.symbol.GroupSymbol;
import org.teiid.query.tempdata.TempTableStore.TransactionMode;
-public class GlobalTableStoreImpl implements GlobalTableStore, ReplicatedObject {
+public class GlobalTableStoreImpl implements GlobalTableStore, ReplicatedObject<String> {
public enum MatState {
NEEDS_LOADING,
@@ -484,5 +484,10 @@
}
}
}
+
+ @Override
+ public boolean hasState(String stateId) {
+ return this.tableStore.getTempTable(stateId) != null;
+ }
}
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java 2011-11-23 18:30:27 UTC (rev 3698)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestCachedResults.java 2011-11-23 19:43:50 UTC (rev 3699)
@@ -94,7 +94,7 @@
cache.put(results.getId()+","+row, tb.getBatch(row), null); //$NON-NLS-1$
}
- results.prepare(cache, bm);
+ results.prepare(bm);
//simulate distribute
TupleBuffer distributedTb = bm.getTupleBuffer(results.getId());
@@ -106,7 +106,7 @@
BufferManager bm2 = fbs.getBufferManager();
bm2.distributeTupleBuffer(results.getId(), distributedTb);
- assertTrue(cachedResults.restore(cache, bm2));
+ assertTrue(cachedResults.restore(bm2));
// since restored, simulate a async cache flush
cache.clear();
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestSessionAwareCache.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestSessionAwareCache.java 2011-11-23 18:30:27 UTC (rev 3698)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestSessionAwareCache.java 2011-11-23 19:43:50 UTC (rev 3699)
@@ -30,7 +30,6 @@
import org.mockito.Mockito;
import org.teiid.adminapi.impl.SessionMetadata;
import org.teiid.cache.Cachable;
-import org.teiid.cache.Cache;
import org.teiid.common.buffer.BufferManager;
import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
import org.teiid.metadata.FunctionMethod.Determinism;
@@ -54,11 +53,11 @@
// make sure that in the case of session specific; we do not call prepare
// as session is local only call for distributed
- Mockito.verify(result, times(0)).prepare((Cache)anyObject(), (BufferManager)anyObject());
+ Mockito.verify(result, times(0)).prepare((BufferManager)anyObject());
Object c = cache.get(id);
- Mockito.verify(result, times(0)).restore((Cache)anyObject(), (BufferManager)anyObject());
+ Mockito.verify(result, times(0)).restore((BufferManager)anyObject());
assertTrue(result==c);
}
@@ -71,20 +70,20 @@
CacheID id = new CacheID(buildWorkContext(), new ParseInfo(), "SELECT * FROM FOO");
Cachable result = Mockito.mock(Cachable.class);
- Mockito.stub(result.prepare((Cache)anyObject(), (BufferManager)anyObject())).toReturn(true);
- Mockito.stub(result.restore((Cache)anyObject(), (BufferManager)anyObject())).toReturn(true);
+ Mockito.stub(result.prepare((BufferManager)anyObject())).toReturn(true);
+ Mockito.stub(result.restore((BufferManager)anyObject())).toReturn(true);
cache.put(id, Determinism.USER_DETERMINISTIC, result, null);
// make sure that in the case of session specific; we do not call prepare
// as session is local only call for distributed
- Mockito.verify(result, times(1)).prepare((Cache)anyObject(), (BufferManager)anyObject());
+ Mockito.verify(result, times(1)).prepare((BufferManager)anyObject());
id = new CacheID(buildWorkContext(), new ParseInfo(), "SELECT * FROM FOO");
Object c = cache.get(id);
- Mockito.verify(result, times(1)).restore((Cache)anyObject(), (BufferManager)anyObject());
+ Mockito.verify(result, times(1)).restore((BufferManager)anyObject());
assertTrue(result==c);
}
@@ -97,20 +96,20 @@
CacheID id = new CacheID(buildWorkContext(), new ParseInfo(), "SELECT * FROM FOO");
Cachable result = Mockito.mock(Cachable.class);
- Mockito.stub(result.prepare((Cache)anyObject(), (BufferManager)anyObject())).toReturn(true);
- Mockito.stub(result.restore((Cache)anyObject(), (BufferManager)anyObject())).toReturn(true);
+ Mockito.stub(result.prepare((BufferManager)anyObject())).toReturn(true);
+ Mockito.stub(result.restore((BufferManager)anyObject())).toReturn(true);
cache.put(id, Determinism.VDB_DETERMINISTIC, result, null);
// make sure that in the case of session specific; we do not call prepare
// as session is local only call for distributed
- Mockito.verify(result, times(1)).prepare((Cache)anyObject(), (BufferManager)anyObject());
+ Mockito.verify(result, times(1)).prepare((BufferManager)anyObject());
id = new CacheID(buildWorkContext(), new ParseInfo(), "SELECT * FROM FOO");
Object c = cache.get(id);
- Mockito.verify(result, times(1)).restore((Cache)anyObject(), (BufferManager)anyObject());
+ Mockito.verify(result, times(1)).restore((BufferManager)anyObject());
assertTrue(result==c);
}
@@ -123,8 +122,8 @@
CacheID id = new CacheID(buildWorkContext(), new ParseInfo(), "SELECT * FROM FOO");
Cachable result = Mockito.mock(Cachable.class);
- Mockito.stub(result.prepare((Cache)anyObject(), (BufferManager)anyObject())).toReturn(true);
- Mockito.stub(result.restore((Cache)anyObject(), (BufferManager)anyObject())).toReturn(true);
+ Mockito.stub(result.prepare((BufferManager)anyObject())).toReturn(true);
+ Mockito.stub(result.restore((BufferManager)anyObject())).toReturn(true);
id = new CacheID(buildWorkContext(), new ParseInfo(), "SELECT * FROM FOO");
cache.put(id, Determinism.VDB_DETERMINISTIC, result, null);
Modified: trunk/jboss-integration/src/main/java/org/teiid/jboss/BufferManagerService.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/jboss/BufferManagerService.java 2011-11-23 18:30:27 UTC (rev 3698)
+++ trunk/jboss-integration/src/main/java/org/teiid/jboss/BufferManagerService.java 2011-11-23 19:43:50 UTC (rev 3699)
@@ -26,30 +26,53 @@
import org.jboss.msc.service.StartException;
import org.jboss.msc.service.StopContext;
import org.jboss.msc.value.InjectedValue;
+import org.teiid.common.buffer.BufferManager;
+import org.teiid.dqp.service.BufferService;
+import org.teiid.query.ObjectReplicator;
import org.teiid.services.BufferServiceImpl;
-class BufferManagerService implements Service<BufferServiceImpl> {
+class BufferManagerService implements Service<BufferService>, BufferService {
private BufferServiceImpl bufferMgr;
+ private ObjectReplicator replicator;
public final InjectedValue<String> pathInjector = new InjectedValue<String>();
+ private BufferManager manager;
- public BufferManagerService(BufferServiceImpl buffer) {
+ public BufferManagerService(BufferServiceImpl buffer, ObjectReplicator replicator) {
this.bufferMgr = buffer;
+ this.replicator = replicator;
}
@Override
public void start(StartContext context) throws StartException {
bufferMgr.setDiskDirectory(pathInjector.getValue());
bufferMgr.start();
+ manager = bufferMgr.getBufferManager();
+ if (replicator != null) {
+ try {
+ //use a mux name that will not conflict with any vdb
+ manager = this.replicator.replicate("$BM$", BufferManager.class, this.manager, 0); //$NON-NLS-1$
+ } catch (Exception e) {
+ throw new StartException(e);
+ }
+ }
}
@Override
public void stop(StopContext context) {
bufferMgr.stop();
+ if (this.replicator != null) {
+ this.replicator.stop(bufferMgr);
+ }
}
+
+ @Override
+ public BufferManager getBufferManager() {
+ return manager;
+ }
@Override
- public BufferServiceImpl getValue() throws IllegalStateException,IllegalArgumentException {
+ public BufferService getValue() throws IllegalStateException,IllegalArgumentException {
return this.bufferMgr;
}
Modified: trunk/jboss-integration/src/main/java/org/teiid/jboss/JGroupsObjectReplicatorService.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/jboss/JGroupsObjectReplicatorService.java 2011-11-23 18:30:27 UTC (rev 3698)
+++ trunk/jboss-integration/src/main/java/org/teiid/jboss/JGroupsObjectReplicatorService.java 2011-11-23 19:43:50 UTC (rev 3699)
@@ -27,34 +27,22 @@
import org.jboss.msc.service.StartException;
import org.jboss.msc.service.StopContext;
import org.jboss.msc.value.InjectedValue;
-import org.teiid.common.buffer.BufferManager;
import org.teiid.replication.jboss.JGroupsObjectReplicator;
class JGroupsObjectReplicatorService implements Service<JGroupsObjectReplicator> {
public final InjectedValue<ChannelFactory> channelFactoryInjector = new InjectedValue<ChannelFactory>();
private JGroupsObjectReplicator replicator;
- private String clusterName;
- private BufferManager buffermanager;
+ /**
+ * @param clusterName TODO see if this is still useful
+ */
public JGroupsObjectReplicatorService(String clusterName){
- this.clusterName = clusterName;
}
@Override
public void start(StartContext context) throws StartException {
- this.replicator = new JGroupsObjectReplicator(this.clusterName) {
- @Override
- public ChannelFactory getChannelFactory() {
- return channelFactoryInjector.getValue();
- }
- };
-
- try {
- this.replicator.replicate(clusterName, BufferManager.class, this.buffermanager, 0);
- } catch (Exception e) {
- throw new StartException(e);
- }
+ this.replicator = new JGroupsObjectReplicator(channelFactoryInjector.getValue());
}
@Override
@@ -66,8 +54,4 @@
return replicator;
}
- public void setBufferManager(BufferManager buffermanager) {
- this.buffermanager = buffermanager;
- }
-
}
Modified: trunk/jboss-integration/src/main/java/org/teiid/jboss/TeiidAdd.java
===================================================================
--- trunk/jboss-integration/src/main/java/org/teiid/jboss/TeiidAdd.java 2011-11-23 18:30:27 UTC (rev 3698)
+++ trunk/jboss-integration/src/main/java/org/teiid/jboss/TeiidAdd.java 2011-11-23 19:43:50 UTC (rev 3699)
@@ -22,10 +22,7 @@
package org.teiid.jboss;
-import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.ADD;
-import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.DESCRIPTION;
-import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.OPERATION_NAME;
-import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.REQUEST_PROPERTIES;
+import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.*;
import java.util.List;
import java.util.Locale;
@@ -52,17 +49,17 @@
import org.jboss.modules.ModuleIdentifier;
import org.jboss.modules.ModuleLoadException;
import org.jboss.msc.service.ServiceBuilder;
-import org.jboss.msc.service.ServiceBuilder.DependencyType;
import org.jboss.msc.service.ServiceContainer;
import org.jboss.msc.service.ServiceController;
import org.jboss.msc.service.ServiceName;
import org.jboss.msc.service.ServiceTarget;
import org.jboss.msc.service.ValueService;
+import org.jboss.msc.service.ServiceBuilder.DependencyType;
import org.jboss.msc.value.InjectedValue;
import org.teiid.PolicyDecider;
import org.teiid.cache.CacheConfiguration;
-import org.teiid.cache.CacheConfiguration.Policy;
import org.teiid.cache.DefaultCacheFactory;
+import org.teiid.cache.CacheConfiguration.Policy;
import org.teiid.cache.jboss.ClusterableCacheFactory;
import org.teiid.common.buffer.BufferManager;
import org.teiid.deployers.SystemVDBDeployer;
@@ -76,6 +73,7 @@
import org.teiid.dqp.internal.process.DefaultAuthorizationValidator;
import org.teiid.dqp.internal.process.PreparedPlan;
import org.teiid.dqp.internal.process.SessionAwareCache;
+import org.teiid.dqp.service.BufferService;
import org.teiid.jboss.deployers.RuntimeEngineDeployer;
import org.teiid.query.ObjectReplicator;
import org.teiid.query.function.SystemFunctionManager;
@@ -231,13 +229,29 @@
ServiceBuilder<ObjectSerializer> objectSerializerService = target.addService(TeiidServiceNames.OBJECT_SERIALIZER, serializer);
objectSerializerService.addDependency(TeiidServiceNames.DATA_DIR, String.class, serializer.getPathInjector());
newControllers.add(objectSerializerService.install());
+
+ // Object Replicator
+ JGroupsObjectReplicatorService replicatorService = null;
+ if (Element.OR_STACK_ATTRIBUTE.isDefined(operation)) {
+ String stack = Element.OR_STACK_ATTRIBUTE.asString(operation);
+
+ String clusterName = "teiid-rep"; //$NON-NLS-1$
+ if (Element.OR_CLUSTER_NAME_ATTRIBUTE.isDefined(operation)) {
+ clusterName = Element.OR_CLUSTER_NAME_ATTRIBUTE.asString(operation);
+ }
+
+ replicatorService = new JGroupsObjectReplicatorService(clusterName);
+ ServiceBuilder<JGroupsObjectReplicator> serviceBuilder = target.addService(TeiidServiceNames.OBJECT_REPLICATOR, replicatorService);
+ serviceBuilder.addDependency(ServiceName.JBOSS.append("jgroups", "stack", stack), ChannelFactory.class, replicatorService.channelFactoryInjector); //$NON-NLS-1$ //$NON-NLS-2$
+ newControllers.add(serviceBuilder.install());
+ }
// TODO: remove verbose service by moving the buffer service from runtime project
newControllers.add(RelativePathService.addService(TeiidServiceNames.BUFFER_DIR, "teiid-buffer", "jboss.server.temp.dir", target)); //$NON-NLS-1$ //$NON-NLS-2$
- final BufferServiceImpl bufferManager = buildBufferManager(operation);
- BufferManagerService bufferService = new BufferManagerService(bufferManager);
- ServiceBuilder<BufferServiceImpl> bufferServiceBuilder = target.addService(TeiidServiceNames.BUFFER_MGR, bufferService);
+ BufferManagerService bufferService = new BufferManagerService(buildBufferManager(operation), replicatorService.getValue());
+ ServiceBuilder<BufferService> bufferServiceBuilder = target.addService(TeiidServiceNames.BUFFER_MGR, bufferService);
bufferServiceBuilder.addDependency(TeiidServiceNames.BUFFER_DIR, String.class, bufferService.pathInjector);
+ bufferServiceBuilder.addDependency(TeiidServiceNames.BUFFER_DIR, String.class, bufferService.pathInjector);
newControllers.add(bufferServiceBuilder.install());
PolicyDecider policyDecider;
@@ -272,7 +286,7 @@
newControllers.add(target.addService(TeiidServiceNames.AUTHORIZATION_VALIDATOR, authValidatorService).install());
// resultset cache
- final SessionAwareCache<CachedResults> resultsetCache = buildResultsetCache(operation, bufferManager.getBufferManager());
+ final SessionAwareCache<CachedResults> resultsetCache = buildResultsetCache(operation, bufferService.getValue().getBufferManager());
ValueService<SessionAwareCache<CachedResults>> resultSetService = new ValueService<SessionAwareCache<CachedResults>>(new org.jboss.msc.value.Value<SessionAwareCache<CachedResults>>() {
@Override
public SessionAwareCache<CachedResults> getValue() throws IllegalStateException, IllegalArgumentException {
@@ -282,7 +296,7 @@
newControllers.add(target.addService(TeiidServiceNames.CACHE_RESULTSET, resultSetService).install());
// prepared-plan cache
- final SessionAwareCache<PreparedPlan> preparedPlanCache = buildPreparedPlanCache(operation, bufferManager.getBufferManager());
+ final SessionAwareCache<PreparedPlan> preparedPlanCache = buildPreparedPlanCache(operation, bufferService.getValue().getBufferManager());
ValueService<SessionAwareCache<PreparedPlan>> preparedPlanService = new ValueService<SessionAwareCache<PreparedPlan>>(new org.jboss.msc.value.Value<SessionAwareCache<PreparedPlan>>() {
@Override
public SessionAwareCache<PreparedPlan> getValue() throws IllegalStateException, IllegalArgumentException {
@@ -291,22 +305,6 @@
});
newControllers.add(target.addService(TeiidServiceNames.CACHE_PREPAREDPLAN, preparedPlanService).install());
- // Object Replicator
- if (Element.OR_STACK_ATTRIBUTE.isDefined(operation)) {
- String stack = Element.OR_STACK_ATTRIBUTE.asString(operation);
-
- String clusterName = "teiid-rep"; //$NON-NLS-1$
- if (Element.OR_CLUSTER_NAME_ATTRIBUTE.isDefined(operation)) {
- clusterName = Element.OR_CLUSTER_NAME_ATTRIBUTE.asString(operation);
- }
-
- JGroupsObjectReplicatorService replicatorService = new JGroupsObjectReplicatorService(clusterName);
- replicatorService.setBufferManager(bufferManager.getBufferManager());
- ServiceBuilder<JGroupsObjectReplicator> serviceBuilder = target.addService(TeiidServiceNames.OBJECT_REPLICATOR, replicatorService);
- serviceBuilder.addDependency(ServiceName.JBOSS.append("jgroups", "stack", stack), ChannelFactory.class, replicatorService.channelFactoryInjector); //$NON-NLS-1$ //$NON-NLS-2$
- newControllers.add(serviceBuilder.install());
- }
-
// Query Engine
final RuntimeEngineDeployer engine = buildQueryEngine(operation);
String workManager = "default"; //$NON-NLS-1$
@@ -367,7 +365,7 @@
}
- private BufferServiceImpl buildBufferManager(ModelNode node) {
+ private BufferServiceImpl buildBufferManager(ModelNode node) {
BufferServiceImpl bufferManger = new BufferServiceImpl();
if (node == null) {
@@ -417,7 +415,6 @@
private SessionAwareCache<CachedResults> buildResultsetCache(ModelNode node, BufferManager bufferManager) {
CacheConfiguration cacheConfig = new CacheConfiguration();
- // these settings are not really used; they are defined by infinispan
cacheConfig.setMaxEntries(1024);
cacheConfig.setMaxAgeInSeconds(7200);
cacheConfig.setType(Policy.EXPIRATION.name());
Modified: trunk/test-integration/common/pom.xml
===================================================================
--- trunk/test-integration/common/pom.xml 2011-11-23 18:30:27 UTC (rev 3698)
+++ trunk/test-integration/common/pom.xml 2011-11-23 19:43:50 UTC (rev 3699)
@@ -47,11 +47,10 @@
<version>1.0</version>
</dependency>
<dependency>
- <groupId>org.jgroups</groupId>
- <artifactId>jgroups</artifactId>
- <version>3.0.0.CR5</version>
- <scope>test</scope>
- </dependency>
+ <groupId>org.jboss.as</groupId>
+ <artifactId>jboss-as-clustering-jgroups</artifactId>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<profiles>
<profile>
Modified: trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java 2011-11-23 18:30:27 UTC (rev 3698)
+++ trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java 2011-11-23 19:43:50 UTC (rev 3699)
@@ -22,26 +22,39 @@
package org.teiid.jdbc;
import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import javax.security.auth.Subject;
import javax.security.auth.login.LoginException;
import org.mockito.Mockito;
+import org.teiid.Replicated;
+import org.teiid.Replicated.ReplicationMode;
import org.teiid.adminapi.AdminException;
import org.teiid.adminapi.VDB;
import org.teiid.adminapi.impl.ModelMetaData;
import org.teiid.adminapi.impl.VDBMetaData;
+import org.teiid.cache.Cache;
import org.teiid.cache.CacheConfiguration;
+import org.teiid.cache.DefaultCacheFactory;
import org.teiid.cache.CacheConfiguration.Policy;
-import org.teiid.cache.DefaultCacheFactory;
import org.teiid.client.DQP;
import org.teiid.client.security.ILogon;
+import org.teiid.common.buffer.BufferManager;
+import org.teiid.common.buffer.BufferManagerFactory;
+import org.teiid.core.TeiidRuntimeException;
import org.teiid.core.util.UnitTestUtil;
import org.teiid.deployers.CompositeVDB;
import org.teiid.deployers.MetadataStoreGroup;
@@ -57,7 +70,7 @@
import org.teiid.dqp.internal.process.DQPCore;
import org.teiid.dqp.internal.process.PreparedPlan;
import org.teiid.dqp.internal.process.SessionAwareCache;
-import org.teiid.dqp.service.FakeBufferService;
+import org.teiid.dqp.service.BufferService;
import org.teiid.metadata.FunctionMethod;
import org.teiid.metadata.MetadataRepository;
import org.teiid.metadata.MetadataStore;
@@ -67,6 +80,7 @@
import org.teiid.net.CommunicationException;
import org.teiid.net.ConnectionException;
import org.teiid.query.ObjectReplicator;
+import org.teiid.query.ReplicatedObject;
import org.teiid.query.function.SystemFunctionManager;
import org.teiid.query.metadata.TransformationMetadata;
import org.teiid.query.metadata.TransformationMetadata.Resource;
@@ -84,9 +98,108 @@
import org.teiid.transport.LocalServerConnection;
import org.teiid.transport.LogonImpl;
-@SuppressWarnings({"nls", "serial"})
+@SuppressWarnings({"nls"})
public class FakeServer extends ClientServiceRegistryImpl implements ConnectionProfile {
+
+ public interface ReplicatedCache<K, V> extends Cache<K, V> {
+
+ @Replicated(replicateState=ReplicationMode.PULL)
+ public V get(K key);
+ @Replicated(replicateState=ReplicationMode.PUSH)
+ V put(K key, V value, Long ttl);
+
+ @Replicated()
+ V remove(K key);
+
+ }
+
+ public static class ReplicatedCacheImpl<K extends Serializable, V> implements ReplicatedCache<K, V>, ReplicatedObject<K> {
+ private Cache<K, V> cache;
+
+ public ReplicatedCacheImpl(Cache<K, V> cache) {
+ this.cache = cache;
+ }
+
+ public void clear() {
+ cache.clear();
+ }
+
+ public V get(K key) {
+ return cache.get(key);
+ }
+
+ public String getName() {
+ return cache.getName();
+ }
+
+ public Set<K> keys() {
+ return cache.keys();
+ }
+
+ public V put(K key, V value, Long ttl) {
+ return cache.put(key, value, ttl);
+ }
+
+ public V remove(K key) {
+ return cache.remove(key);
+ }
+
+ public int size() {
+ return cache.size();
+ }
+
+ @Override
+ public void getState(K stateId, OutputStream ostream) {
+ V value = get(stateId);
+ if (value != null) {
+ try {
+ ObjectOutputStream oos = new ObjectOutputStream(ostream);
+ oos.writeObject(value);
+ oos.close();
+ } catch (IOException e) {
+ throw new TeiidRuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public void setState(K stateId, InputStream istream) {
+ try {
+ ObjectInputStream ois = new ObjectInputStream(istream);
+ V value = (V) ois.readObject();
+ this.put(stateId, value, null);
+ } catch (IOException e) {
+ throw new TeiidRuntimeException(e);
+ } catch (ClassNotFoundException e) {
+ throw new TeiidRuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean hasState(K stateId) {
+ return cache.get(stateId) != null;
+ }
+
+ @Override
+ public void droppedMembers(Collection<Serializable> addresses) {
+ }
+
+ @Override
+ public void getState(OutputStream ostream) {
+ }
+
+ @Override
+ public void setAddress(Serializable address) {
+ }
+
+ @Override
+ public void setState(InputStream istream) {
+ }
+
+
+ }
+
SessionServiceImpl sessionService = new SessionServiceImpl() {
@Override
protected TeiidLoginContext authenticate(String userName,
@@ -113,10 +226,17 @@
}
public FakeServer(DQPConfiguration config) {
- this(config, false);
+ start(config, false);
}
- public FakeServer(DQPConfiguration config, boolean realBufferMangaer) {
+ public FakeServer(boolean start) {
+ if (start) {
+ start(new DQPConfiguration(), false);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public void start(DQPConfiguration config, boolean realBufferMangaer) {
sessionService.setSecurityHelper(Mockito.mock(SecurityHelper.class));
sessionService.setSecurityDomains(Arrays.asList("somedomain"));
@@ -147,20 +267,57 @@
this.repo.start();
this.sessionService.setVDBRepository(repo);
+ BufferService bs = null;
if (!realBufferMangaer) {
- this.dqp.setBufferService(new FakeBufferService());
+ bs = new BufferService() {
+
+ @Override
+ public BufferManager getBufferManager() {
+ return BufferManagerFactory.createBufferManager();
+ }
+ };
} else {
BufferServiceImpl bsi = new BufferServiceImpl();
bsi.setDiskDirectory(UnitTestUtil.getTestScratchPath());
- this.dqp.setBufferService(bsi);
bsi.start();
+ bs = bsi;
}
+ if (replicator != null) {
+ try {
+ final BufferManager bm = replicator.replicate("$BM$", BufferManager.class, bs.getBufferManager(), 0);
+ bs = new BufferService() {
+
+ @Override
+ public BufferManager getBufferManager() {
+ return bm;
+ }
+ };
+ } catch (Exception e) {
+ throw new TeiidRuntimeException(e);
+ }
+ }
+ this.dqp.setBufferService(bs);
+
+ //TODO: wire in an infinispan cluster rather than this dummy replicated cache
DefaultCacheFactory dcf = new DefaultCacheFactory() {
- @Override
public boolean isReplicated() {
- return true; //pretend to be replicated for matview tests
+ return true;
}
- };
+
+ @Override
+ public <K, V> Cache<K, V> get(String location,
+ CacheConfiguration config) {
+ Cache<K, V> result = super.get(location, config);
+ if (replicator != null) {
+ try {
+ return (Cache<K, V>) replicator.replicate("$RS$", ReplicatedCache.class, new ReplicatedCacheImpl(result), 0);
+ } catch (Exception e) {
+ throw new TeiidRuntimeException(e);
+ }
+ }
+ return result;
+ }
+ };
SessionAwareCache rs = new SessionAwareCache<CachedResults>(dcf, SessionAwareCache.Type.RESULTSET, new CacheConfiguration(Policy.LRU, 60, 250, "resultsetcache"));
SessionAwareCache ppc = new SessionAwareCache<PreparedPlan>(dcf, SessionAwareCache.Type.PREPAREDPLAN, new CacheConfiguration());
rs.setBufferManager(this.dqp.getBufferManager());
Deleted: trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java 2011-11-23 18:30:27 UTC (rev 3698)
+++ trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java 2011-11-23 19:43:50 UTC (rev 3699)
@@ -1,138 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.systemmodel;
-
-import static org.junit.Assert.*;
-
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-
-import org.jboss.as.clustering.jgroups.ChannelFactory;
-import org.jgroups.Channel;
-import org.jgroups.JChannel;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.teiid.core.types.DataTypeManager;
-import org.teiid.core.util.UnitTestUtil;
-import org.teiid.jdbc.FakeServer;
-import org.teiid.metadata.FunctionMethod;
-import org.teiid.metadata.FunctionParameter;
-import org.teiid.metadata.FunctionMethod.Determinism;
-import org.teiid.metadata.FunctionMethod.PushDown;
-import org.teiid.replication.jboss.JGroupsObjectReplicator;
-
-@SuppressWarnings("nls")
-public class TestMatViewReplication {
-
- private static final String MATVIEWS = "matviews";
- private static final boolean DEBUG = false;
-
- @BeforeClass public static void oneTimeSetup() {
- System.setProperty("jgroups.bind_addr", "127.0.0.1");
- }
-
- @Test public void testReplication() throws Exception {
- if (DEBUG) {
- UnitTestUtil.enableTraceLogging("org.teiid");
- }
-
- FakeServer server1 = createServer();
-
- Connection c1 = server1.createConnection("jdbc:teiid:matviews");
- Statement stmt = c1.createStatement();
- stmt.execute("select * from TEST.RANDOMVIEW");
- ResultSet rs = stmt.getResultSet();
- assertTrue(rs.next());
- double d1 = rs.getDouble(1);
- double d2 = rs.getDouble(2);
-
- FakeServer server2 = createServer();
- Connection c2 = server2.createConnection("jdbc:teiid:matviews");
- Statement stmt2 = c2.createStatement();
- ResultSet rs2 = stmt2.executeQuery("select * from matviews where name = 'RandomView'");
- assertTrue(rs2.next());
- assertEquals("LOADED", rs2.getString("loadstate"));
- assertEquals(true, rs2.getBoolean("valid"));
- stmt2.execute("select * from TEST.RANDOMVIEW");
- rs2 = stmt2.getResultSet();
- assertTrue(rs2.next());
- assertEquals(d1, rs2.getDouble(1), 0);
- assertEquals(d2, rs2.getDouble(2), 0);
-
- rs2 = stmt2.executeQuery("select * from (call refreshMatView('TEST.RANDOMVIEW', false)) p");
-
- Thread.sleep(1000);
-
- //make sure we're still valid and the same
- stmt.execute("select * from TEST.RANDOMVIEW");
- rs = stmt.getResultSet();
- assertTrue(rs.next());
- d1 = rs.getDouble(1);
- d2 = rs.getDouble(2);
- stmt2.execute("select * from TEST.RANDOMVIEW");
- rs2 = stmt2.getResultSet();
- assertTrue(rs2.next());
- assertEquals(d1, rs2.getDouble(1), 0);
- assertEquals(d2, rs2.getDouble(2), 0);
-
- //ensure a lookup is usable on each side
- rs2 = stmt2.executeQuery("select lookup('sys.schemas', 'VDBName', 'name', 'SYS')");
- Thread.sleep(1000);
-
- rs = stmt.executeQuery("select lookup('sys.schemas', 'VDBName', 'name', 'SYS')");
- rs.next();
- assertEquals("matviews", rs.getString(1));
-
- server1.stop();
- server2.stop();
- }
-
- @SuppressWarnings("serial")
- private FakeServer createServer() throws Exception {
- FakeServer server = new FakeServer();
-
- JGroupsObjectReplicator jor = new JGroupsObjectReplicator("demo") {
- @Override
- public ChannelFactory getChannelFactory() {
- return new ChannelFactory() {
- @Override
- public Channel createChannel(String id) throws Exception {
- return new JChannel(this.getClass().getClassLoader().getResource("tcp.xml"));
- }
- };
- }
-
- };
-
- server.setReplicator(jor);
- HashMap<String, Collection<FunctionMethod>> udfs = new HashMap<String, Collection<FunctionMethod>>();
- udfs.put("funcs", Arrays.asList(new FunctionMethod("pause", null, null, PushDown.CANNOT_PUSHDOWN, TestMatViews.class.getName(), "pause", null, new FunctionParameter("return", DataTypeManager.DefaultDataTypes.INTEGER), false, Determinism.NONDETERMINISTIC)));
- server.deployVDB(MATVIEWS, UnitTestUtil.getTestDataPath() + "/matviews.vdb", udfs);
- return server;
- }
-
-}
Copied: trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java (from rev 3695, trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java)
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java (rev 0)
+++ trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java 2011-11-23 19:43:50 UTC (rev 3699)
@@ -0,0 +1,146 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.systemmodel;
+
+import static org.junit.Assert.*;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+
+import org.jboss.as.clustering.jgroups.ChannelFactory;
+import org.jgroups.Channel;
+import org.jgroups.JChannel;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.util.UnitTestUtil;
+import org.teiid.dqp.internal.process.DQPConfiguration;
+import org.teiid.jdbc.FakeServer;
+import org.teiid.metadata.FunctionMethod;
+import org.teiid.metadata.FunctionParameter;
+import org.teiid.metadata.FunctionMethod.Determinism;
+import org.teiid.metadata.FunctionMethod.PushDown;
+import org.teiid.replication.jboss.JGroupsObjectReplicator;
+
+@SuppressWarnings("nls")
+public class TestReplication {
+
+ private static final String MATVIEWS = "matviews";
+ private static final boolean DEBUG = false;
+
+ @BeforeClass public static void oneTimeSetup() {
+ System.setProperty("jgroups.bind_addr", "127.0.0.1");
+ }
+
+ @Test public void testReplication() throws Exception {
+ if (DEBUG) {
+ UnitTestUtil.enableTraceLogging("org.teiid");
+ }
+
+ FakeServer server1 = createServer();
+
+ Connection c1 = server1.createConnection("jdbc:teiid:matviews");
+ Statement stmt = c1.createStatement();
+ stmt.execute("select * from TEST.RANDOMVIEW");
+ ResultSet rs = stmt.getResultSet();
+ assertTrue(rs.next());
+ double d1 = rs.getDouble(1);
+ double d2 = rs.getDouble(2);
+
+ FakeServer server2 = createServer();
+ Connection c2 = server2.createConnection("jdbc:teiid:matviews");
+ Statement stmt2 = c2.createStatement();
+ ResultSet rs2 = stmt2.executeQuery("select * from matviews where name = 'RandomView'");
+ assertTrue(rs2.next());
+ assertEquals("LOADED", rs2.getString("loadstate"));
+ assertEquals(true, rs2.getBoolean("valid"));
+ stmt2.execute("select * from TEST.RANDOMVIEW");
+ rs2 = stmt2.getResultSet();
+ assertTrue(rs2.next());
+ assertEquals(d1, rs2.getDouble(1), 0);
+ assertEquals(d2, rs2.getDouble(2), 0);
+
+ rs2 = stmt2.executeQuery("select * from (call refreshMatView('TEST.RANDOMVIEW', false)) p");
+
+ Thread.sleep(1000);
+
+ //make sure we're still valid and the same
+ stmt.execute("select * from TEST.RANDOMVIEW");
+ rs = stmt.getResultSet();
+ assertTrue(rs.next());
+ d1 = rs.getDouble(1);
+ d2 = rs.getDouble(2);
+ stmt2.execute("select * from TEST.RANDOMVIEW");
+ rs2 = stmt2.getResultSet();
+ assertTrue(rs2.next());
+ assertEquals(d1, rs2.getDouble(1), 0);
+ assertEquals(d2, rs2.getDouble(2), 0);
+
+ //ensure a lookup is usable on each side
+ rs2 = stmt2.executeQuery("select lookup('sys.schemas', 'VDBName', 'name', 'SYS')");
+ Thread.sleep(1000);
+
+ rs = stmt.executeQuery("select lookup('sys.schemas', 'VDBName', 'name', 'SYS')");
+ rs.next();
+ assertEquals("matviews", rs.getString(1));
+
+ //result set cache replication
+
+ rs = stmt.executeQuery("/*+ cache(scope:vdb) */ select rand()"); //$NON-NLS-1$
+ assertTrue(rs.next());
+ d1 = rs.getDouble(1);
+
+ //no wait is needed as we perform a synch pull
+ rs2 = stmt2.executeQuery("/*+ cache(scope:vdb) */ select rand()"); //$NON-NLS-1$
+ assertTrue(rs2.next());
+ d2 = rs2.getDouble(1);
+
+ assertEquals(d1, d2, 0);
+
+ server1.stop();
+ server2.stop();
+ }
+
+ private FakeServer createServer() throws Exception {
+ FakeServer server = new FakeServer(false);
+
+ JGroupsObjectReplicator jor = new JGroupsObjectReplicator(new ChannelFactory() {
+ @Override
+ public Channel createChannel(String id) throws Exception {
+ return new JChannel(this.getClass().getClassLoader().getResource("tcp.xml"));
+ }
+ });
+
+ server.setReplicator(jor);
+ server.start(new DQPConfiguration(), true);
+ HashMap<String, Collection<FunctionMethod>> udfs = new HashMap<String, Collection<FunctionMethod>>();
+ udfs.put("funcs", Arrays.asList(new FunctionMethod("pause", null, null, PushDown.CANNOT_PUSHDOWN, TestMatViews.class.getName(), "pause", null, new FunctionParameter("return", DataTypeManager.DefaultDataTypes.INTEGER), false, Determinism.NONDETERMINISTIC)));
+ server.deployVDB(MATVIEWS, UnitTestUtil.getTestDataPath() + "/matviews.vdb", udfs);
+ return server;
+ }
+
+}
Property changes on: trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestReplication.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
13 years, 5 months
teiid SVN: r3698 - in branches/7.6.x: client/src/main/resources/org/teiid/jdbc and 1 other directories.
by teiid-commits@lists.jboss.org
Author: rareddy
Date: 2011-11-23 13:30:27 -0500 (Wed, 23 Nov 2011)
New Revision: 3698
Modified:
branches/7.6.x/client/src/main/java/org/teiid/gss/MakeGSS.java
branches/7.6.x/client/src/main/resources/org/teiid/jdbc/i18n.properties
branches/7.6.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java
Log:
TEIID-1687, TEIID-1610: making the ODBC connection not use pass-through; added some error messages for jdbc configuration part if they are missing
Modified: branches/7.6.x/client/src/main/java/org/teiid/gss/MakeGSS.java
===================================================================
--- branches/7.6.x/client/src/main/java/org/teiid/gss/MakeGSS.java 2011-11-23 17:30:26 UTC (rev 3697)
+++ branches/7.6.x/client/src/main/java/org/teiid/gss/MakeGSS.java 2011-11-23 18:30:27 UTC (rev 3698)
@@ -67,8 +67,42 @@
Object result = null;
- String jaasApplicationName = props.getProperty(TeiidURL.CONNECTION.JAAS_NAME, "teiid"); //$NON-NLS-1$
- String kerberosPrincipalName = props.getProperty(TeiidURL.CONNECTION.KERBEROS_SERVICE_PRINCIPLE_NAME, "teiid"); //$NON-NLS-1$
+ StringBuilder errors = new StringBuilder();
+ String jaasApplicationName = props.getProperty(TeiidURL.CONNECTION.JAAS_NAME);
+ String nl = System.getProperty("line.separator");//$NON-NLS-1$
+ if (jaasApplicationName == null) {
+ errors.append(JDBCPlugin.Util.getString("client_prop_missing", TeiidURL.CONNECTION.JAAS_NAME)); //$NON-NLS-1$
+ errors.append(nl);
+ }
+
+ String kerberosPrincipalName = props.getProperty(TeiidURL.CONNECTION.KERBEROS_SERVICE_PRINCIPLE_NAME);
+ if (kerberosPrincipalName == null) {
+ errors.append(JDBCPlugin.Util.getString("client_prop_missing", TeiidURL.CONNECTION.KERBEROS_SERVICE_PRINCIPLE_NAME)); //$NON-NLS-1$
+ errors.append(nl);
+ }
+
+ String realm = System.getProperty("java.security.krb5.realm"); //$NON-NLS-1$
+ if (realm == null) {
+ errors.append(JDBCPlugin.Util.getString("system_prop_missing", "java.security.krb5.realm")); //$NON-NLS-1$ //$NON-NLS-2$
+ errors.append(nl);
+ }
+
+ String kdc = System.getProperty("java.security.krb5.kdc"); //$NON-NLS-1$
+ if (kdc == null) {
+ errors.append(JDBCPlugin.Util.getString("system_prop_missing", "java.security.krb5.kdc")); //$NON-NLS-1$ //$NON-NLS-2$
+ errors.append(nl);
+ }
+
+ String config = System.getProperty("java.security.auth.login.config"); //$NON-NLS-1$
+ if (config == null) {
+ errors.append(JDBCPlugin.Util.getString("system_prop_missing", "java.security.auth.login.config")); //$NON-NLS-1$ //$NON-NLS-2$
+ errors.append(nl);
+ }
+
+ if (errors.length() > 0) {
+ throw new LogonException(errors.toString());
+ }
+
String user = props.getProperty(TeiidURL.CONNECTION.USER_NAME);
String password = props.getProperty(TeiidURL.CONNECTION.PASSWORD);
Modified: branches/7.6.x/client/src/main/resources/org/teiid/jdbc/i18n.properties
===================================================================
--- branches/7.6.x/client/src/main/resources/org/teiid/jdbc/i18n.properties 2011-11-23 17:30:26 UTC (rev 3697)
+++ branches/7.6.x/client/src/main/resources/org/teiid/jdbc/i18n.properties 2011-11-23 18:30:27 UTC (rev 3698)
@@ -155,3 +155,5 @@
no_krb_ticket=No cached kerberos ticket found and/or no password supplied
gss_auth_failed=GSS Authentication failed
setup_failed=Protocol error. Session setup failed.
+client_prop_missing=Client URL connection property missing "{0}". Please add the property to connection URL.
+system_prop_missing=System property "{0}" missing, please add using -D option on the VM startup script.
\ No newline at end of file
Modified: branches/7.6.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java
===================================================================
--- branches/7.6.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java 2011-11-23 17:30:26 UTC (rev 3697)
+++ branches/7.6.x/runtime/src/main/java/org/teiid/odbc/ODBCServerRemoteImpl.java 2011-11-23 18:30:27 UTC (rev 3698)
@@ -201,7 +201,6 @@
info.put("user", user); //$NON-NLS-1$
String password = null;
- String passthroughAuthentication = ""; //$NON-NLS-1$
if (authType.equals(AuthenticationType.CLEARTEXT)) {
password = data.readString();
}
@@ -210,7 +209,6 @@
LogonResult result = this.logon.neogitiateGssLogin(this.props, serviceToken, false);
serviceToken = (byte[])result.getProperty(ILogon.KRB5TOKEN);
if (Boolean.TRUE.equals(result.getProperty(ILogon.KRB5_ESTABLISHED))) {
- passthroughAuthentication = ";PassthroughAuthentication=true;authenticationType=KRB5"; //$NON-NLS-1$
info.put(ILogon.KRB5TOKEN, serviceToken);
}
else {
@@ -220,7 +218,7 @@
}
// this is local connection
- String url = "jdbc:teiid:"+databaseName+";ApplicationName=ODBC"+passthroughAuthentication; //$NON-NLS-1$ //$NON-NLS-2$
+ String url = "jdbc:teiid:"+databaseName+";ApplicationName=ODBC"; //$NON-NLS-1$ //$NON-NLS-2$
if (password != null) {
info.put("password", password); //$NON-NLS-1$
13 years, 5 months
teiid SVN: r3697 - branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2011-11-23 12:30:26 -0500 (Wed, 23 Nov 2011)
New Revision: 3697
Modified:
branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
Log:
TEIID-1673 undoing ignoring local messages.
Modified: branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
===================================================================
--- branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2011-11-23 16:19:12 UTC (rev 3696)
+++ branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2011-11-23 17:30:26 UTC (rev 3697)
@@ -351,10 +351,6 @@
return null;
}
- if (req.getSrc().equals(local_addr)) {
- return null;
- }
-
try {
body=req_marshaller != null?
req_marshaller.objectFromByteBuffer(req.getBuffer(), req.getOffset(), req.getLength())
13 years, 5 months
teiid SVN: r3696 - branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2011-11-23 11:19:12 -0500 (Wed, 23 Nov 2011)
New Revision: 3696
Modified:
branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
Log:
TEIID-1673 ignoring local messages
Modified: branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
===================================================================
--- branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2011-11-23 02:22:18 UTC (rev 3695)
+++ branches/7.6.x/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2011-11-23 16:19:12 UTC (rev 3696)
@@ -350,6 +350,10 @@
if(log.isErrorEnabled()) log.error("message or message buffer is null"); //$NON-NLS-1$
return null;
}
+
+ if (req.getSrc().equals(local_addr)) {
+ return null;
+ }
try {
body=req_marshaller != null?
13 years, 5 months
teiid SVN: r3695 - in trunk: engine/src/main/java/org/teiid/common/buffer/impl and 3 other directories.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2011-11-22 21:22:18 -0500 (Tue, 22 Nov 2011)
New Revision: 3695
Modified:
trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java
trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStore.java
trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java
Log:
TEIID-1720 upgrading to new jgroups. however partial state is not corrected yet and initial state transfer may have to be rewritten
Modified: trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java
===================================================================
--- trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2011-11-23 02:18:58 UTC (rev 3694)
+++ trunk/cache-jbosscache/src/main/java/org/teiid/replication/jboss/JGroupsObjectReplicator.java 2011-11-23 02:22:18 UTC (rev 3695)
@@ -22,7 +22,11 @@
package org.teiid.replication.jboss;
+import java.io.Externalizable;
+import java.io.IOException;
import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
@@ -44,13 +48,9 @@
import org.jboss.as.clustering.jgroups.ChannelFactory;
import org.jgroups.Address;
import org.jgroups.Channel;
-import org.jgroups.MembershipListener;
import org.jgroups.Message;
-import org.jgroups.MessageListener;
-import org.jgroups.Receiver;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
-import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.MethodCall;
import org.jgroups.blocks.MethodLookup;
import org.jgroups.blocks.RequestOptions;
@@ -61,6 +61,7 @@
import org.jgroups.util.Util;
import org.teiid.Replicated;
import org.teiid.Replicated.ReplicationMode;
+import org.teiid.core.util.ReflectionHelper;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.query.ObjectReplicator;
@@ -68,6 +69,58 @@
public abstract class JGroupsObjectReplicator implements ObjectReplicator, Serializable {
+ public static final class AddressWrapper implements Externalizable {
+
+ private Address address;
+
+ public AddressWrapper() {
+
+ }
+
+ public AddressWrapper(Address address) {
+ this.address = address;
+ }
+
+ @Override
+ public int hashCode() {
+ return address.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof AddressWrapper)) {
+ return false;
+ }
+ return address.equals(((AddressWrapper)obj).address);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ String className = in.readUTF();
+ try {
+ this.address = (Address) ReflectionHelper.create(className, null, Thread.currentThread().getContextClassLoader());
+ this.address.readFrom(in);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeUTF(address.getClass().getName());
+ try {
+ address.writeTo(out);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ }
+
private static final long serialVersionUID = -6851804958313095166L;
private static final String CREATE_STATE = "createState"; //$NON-NLS-1$
private static final String BUILD_STATE = "buildState"; //$NON-NLS-1$
@@ -97,9 +150,8 @@
}
}
- private final static class ReplicatedInvocationHandler<S> implements
- InvocationHandler, Serializable, MessageListener, Receiver,
- MembershipListener {
+ private final static class ReplicatedInvocationHandler<S> extends ReceiverAdapter implements
+ InvocationHandler, Serializable {
private static final long serialVersionUID = -2943462899945966103L;
private final S object;
@@ -172,7 +224,7 @@
LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "pulling state", stateId); //$NON-NLS-1$
long timeout = annotation.timeout();
threadLocalPromise.set(new Promise<Boolean>());
- boolean getState = this.disp.getChannel().getState(null, stateId, timeout);
+ /*boolean getState = this.disp.getChannel().getState(null, stateId, timeout);
if (getState) {
Boolean loaded = threadLocalPromise.get().getResult(timeout);
if (Boolean.TRUE.equals(loaded)) {
@@ -182,7 +234,7 @@
}
} else {
LogManager.logInfo(LogConstants.CTX_RUNTIME, object + " first member or timeout exceeded " + stateId); //$NON-NLS-1$
- }
+ }*/
LogManager.logTrace(LogConstants.CTX_RUNTIME, object, "sent state", stateId); //$NON-NLS-1$
return result;
}
@@ -193,7 +245,7 @@
dests = new Vector<Address>(remoteMembers);
}
}
- RspList responses = disp.callRemoteMethods(dests, call, new RequestOptions().setMode(annotation.asynch()?ResponseMode.GET_NONE:ResponseMode.GET_ALL).setTimeout(annotation.timeout()));
+ RspList<Object> responses = disp.callRemoteMethods(dests, call, new RequestOptions().setMode(annotation.asynch()?ResponseMode.GET_NONE:ResponseMode.GET_ALL).setTimeout(annotation.timeout()));
if (annotation.asynch()) {
return null;
}
@@ -224,7 +276,11 @@
synchronized (remoteMembers) {
remoteMembers.removeAll(newView.getMembers());
if (object instanceof ReplicatedObject && !remoteMembers.isEmpty()) {
- ((ReplicatedObject)object).droppedMembers(new HashSet<Serializable>(remoteMembers));
+ HashSet<Serializable> dropped = new HashSet<Serializable>();
+ for (Address address : remoteMembers) {
+ dropped.add(new AddressWrapper(address));
+ }
+ ((ReplicatedObject)object).droppedMembers(dropped);
}
remoteMembers.clear();
remoteMembers.addAll(newView.getMembers());
@@ -353,10 +409,14 @@
if(log.isErrorEnabled()) log.error("message or message buffer is null"); //$NON-NLS-1$
return null;
}
+
+ if (req.getSrc().equals(local_addr)) {
+ return null;
+ }
try {
body=req_marshaller != null?
- req_marshaller.objectFromByteBuffer(req.getBuffer(), req.getOffset(), req.getLength())
+ req_marshaller.objectFromBuffer(req.getBuffer(), req.getOffset(), req.getLength())
: req.getObject();
}
catch(Throwable e) {
@@ -381,7 +441,7 @@
throw new Exception("MethodCall uses ID=" + method_call.getId() + ", but method_lookup has not been set"); //$NON-NLS-1$ //$NON-NLS-2$
if (method_call.getId() >= methodList.size() - 3) {
- Serializable address = req.getSrc();
+ Serializable address = new AddressWrapper(req.getSrc());
String stateId = (String)method_call.getArgs()[0];
List<?> key = Arrays.asList(stateId, address);
JGroupsInputStream is = inputStreams.get(key);
@@ -433,17 +493,15 @@
try {
channel.connect(mux_id);
if (object instanceof ReplicatedObject) {
- ((ReplicatedObject)object).setLocalAddress(channel.getAddress());
- boolean getState = channel.getState(null, startTimeout);
- if (getState) {
- Boolean loaded = proxy.state_promise.getResult(startTimeout);
- if (Boolean.TRUE.equals(loaded)) {
- LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "loaded"); //$NON-NLS-1$
- } else {
- LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " load error or timeout"); //$NON-NLS-1$
- }
+ ((ReplicatedObject)object).setAddress(new AddressWrapper(channel.getAddress()));
+ channel.getState(null, startTimeout);
+ Boolean loaded = proxy.state_promise.getResult(1);
+ if (loaded == null) {
+ LogManager.logInfo(LogConstants.CTX_RUNTIME, object + " timeout exceeded or first member"); //$NON-NLS-1$
+ } else if (loaded) {
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, object, "loaded"); //$NON-NLS-1$
} else {
- LogManager.logInfo(LogConstants.CTX_RUNTIME, object + " first member or timeout exceeded"); //$NON-NLS-1$
+ LogManager.logWarning(LogConstants.CTX_RUNTIME, object + " load error"); //$NON-NLS-1$
}
}
success = true;
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-11-23 02:18:58 UTC (rev 3694)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-11-23 02:22:18 UTC (rev 3695)
@@ -1051,7 +1051,7 @@
}
@Override
- public void setLocalAddress(Serializable address) {
+ public void setAddress(Serializable address) {
}
@Override
Modified: trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java 2011-11-23 02:18:58 UTC (rev 3694)
+++ trunk/engine/src/main/java/org/teiid/query/ReplicatedObject.java 2011-11-23 02:22:18 UTC (rev 3695)
@@ -67,7 +67,7 @@
* Allows the replicator to set the local address from the channel
* @param address
*/
- void setLocalAddress(Serializable address);
+ void setAddress(Serializable address);
/**
* Called when members are dropped
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStore.java 2011-11-23 02:18:58 UTC (rev 3694)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStore.java 2011-11-23 02:22:18 UTC (rev 3695)
@@ -49,7 +49,7 @@
TempTableStore getTempTableStore();
- Serializable getLocalAddress();
+ Serializable getAddress();
List<?> updateMatViewRow(String matTableName, List<?> tuple, boolean delete) throws TeiidComponentException;
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java 2011-11-23 02:18:58 UTC (rev 3694)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java 2011-11-23 02:22:18 UTC (rev 3695)
@@ -352,12 +352,12 @@
//begin replication methods
@Override
- public void setLocalAddress(Serializable address) {
+ public void setAddress(Serializable address) {
this.localAddress = address;
}
@Override
- public Serializable getLocalAddress() {
+ public Serializable getAddress() {
return localAddress;
}
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2011-11-23 02:18:58 UTC (rev 3694)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java 2011-11-23 02:22:18 UTC (rev 3695)
@@ -284,7 +284,7 @@
String matTableName = metadata.getFullName(matTableId);
LogManager.logDetail(LogConstants.CTX_MATVIEWS, "processing refreshmatview for", matViewName); //$NON-NLS-1$
boolean invalidate = Boolean.TRUE.equals(((Constant)proc.getParameter(2).getExpression()).getValue());
- boolean needsLoading = globalStore.needsLoading(matTableName, globalStore.getLocalAddress(), true, true, invalidate);
+ boolean needsLoading = globalStore.needsLoading(matTableName, globalStore.getAddress(), true, true, invalidate);
if (!needsLoading) {
return CollectionTupleSource.createUpdateCountTupleSource(-1);
}
@@ -367,9 +367,9 @@
final MatTableInfo info = globalStore.getMatTableInfo(tableName);
boolean load = false;
while (!info.isUpToDate()) {
- load = globalStore.needsLoading(tableName, globalStore.getLocalAddress(), true, false, false);
+ load = globalStore.needsLoading(tableName, globalStore.getAddress(), true, false, false);
if (load) {
- load = globalStore.needsLoading(tableName, globalStore.getLocalAddress(), false, false, false);
+ load = globalStore.needsLoading(tableName, globalStore.getAddress(), false, false, false);
if (load) {
break;
}
Modified: trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java 2011-11-23 02:18:58 UTC (rev 3694)
+++ trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java 2011-11-23 02:22:18 UTC (rev 3695)
@@ -22,8 +22,7 @@
package org.teiid.systemmodel;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -33,22 +32,20 @@
import java.util.HashMap;
import org.jboss.as.clustering.jgroups.ChannelFactory;
-import org.jboss.as.clustering.jgroups.JChannelFactory;
import org.jgroups.Channel;
+import org.jgroups.JChannel;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.teiid.core.types.DataTypeManager;
import org.teiid.core.util.UnitTestUtil;
import org.teiid.jdbc.FakeServer;
import org.teiid.metadata.FunctionMethod;
+import org.teiid.metadata.FunctionParameter;
import org.teiid.metadata.FunctionMethod.Determinism;
import org.teiid.metadata.FunctionMethod.PushDown;
-import org.teiid.metadata.FunctionParameter;
import org.teiid.replication.jboss.JGroupsObjectReplicator;
@SuppressWarnings("nls")
-@Ignore
public class TestMatViewReplication {
private static final String MATVIEWS = "matviews";
@@ -114,6 +111,7 @@
server2.stop();
}
+ @SuppressWarnings("serial")
private FakeServer createServer() throws Exception {
FakeServer server = new FakeServer();
@@ -123,9 +121,7 @@
return new ChannelFactory() {
@Override
public Channel createChannel(String id) throws Exception {
- JChannelFactory jcf = new JChannelFactory();
- jcf.setMultiplexerConfig(this.getClass().getClassLoader().getResource("stacks.xml")); //$NON-NLS-1$
- return jcf.createMultiplexerChannel("tcp", id);
+ return new JChannel(this.getClass().getClassLoader().getResource("tcp.xml"));
}
};
}
13 years, 5 months
teiid SVN: r3694 - trunk/engine/src/test/java/org/teiid/query/optimizer.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2011-11-22 21:18:58 -0500 (Tue, 22 Nov 2011)
New Revision: 3694
Modified:
trunk/engine/src/test/java/org/teiid/query/optimizer/TestJoinOptimization.java
Log:
TEIID-1022 updating indexing so that -1 can be used as a proper default cardinality
Modified: trunk/engine/src/test/java/org/teiid/query/optimizer/TestJoinOptimization.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/optimizer/TestJoinOptimization.java 2011-11-22 20:25:10 UTC (rev 3693)
+++ trunk/engine/src/test/java/org/teiid/query/optimizer/TestJoinOptimization.java 2011-11-23 02:18:58 UTC (rev 3694)
@@ -1034,7 +1034,7 @@
QueryMetadataInterface metadata = RealMetadataFactory.exampleBQT();
RealMetadataFactory.setCardinality("bqt1.smallb", 1800, metadata); //$NON-NLS-1$
- RealMetadataFactory.setCardinality("bqt1.smalla", 0, metadata); //$NON-NLS-1$
+ RealMetadataFactory.setCardinality("bqt1.smalla", -1, metadata); //$NON-NLS-1$
RealMetadataFactory.setCardinality("bqt2.smallb", 15662, metadata); //$NON-NLS-1$
TestOptimizer.helpPlan(
13 years, 5 months