JBoss Cache SVN: r6950 - core/tags.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-10-14 18:53:36 -0400 (Tue, 14 Oct 2008)
New Revision: 6950
Added:
core/tags/2.2.1.CR2/
Log:
Copied: core/tags/2.2.1.CR2 (from rev 6949, core/branches/2.2.X)
17 years, 2 months
JBoss Cache SVN: r6949 - in core/branches/flat/src/main/java/org/jboss/cache: marshall and 1 other directory.
by jbosscache-commits@lists.jboss.org
Author: mircea.markus
Date: 2008-10-14 13:54:41 -0400 (Tue, 14 Oct 2008)
New Revision: 6949
Added:
core/branches/flat/src/main/java/org/jboss/cache/interceptors/MarshalledValueInterceptor.java
core/branches/flat/src/main/java/org/jboss/cache/marshall/MarshalledValueMap.java
Log:
Added: core/branches/flat/src/main/java/org/jboss/cache/interceptors/MarshalledValueInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/cache/interceptors/MarshalledValueInterceptor.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/cache/interceptors/MarshalledValueInterceptor.java 2008-10-14 17:54:41 UTC (rev 6949)
@@ -0,0 +1,210 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.cache.interceptors;
+
+import org.jboss.cache.InvocationContext;
+import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.commands.read.GetChildrenNamesCommand;
+import org.jboss.cache.commands.read.GetDataMapCommand;
+import org.jboss.cache.commands.read.GetKeyValueCommand;
+import org.jboss.cache.commands.read.GetKeysCommand;
+import org.jboss.cache.commands.read.GetNodeCommand;
+import org.jboss.cache.commands.write.ClearDataCommand;
+import org.jboss.cache.commands.write.PutDataMapCommand;
+import org.jboss.cache.commands.write.PutForExternalReadCommand;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
+import org.jboss.cache.commands.write.RemoveKeyCommand;
+import org.jboss.cache.interceptors.base.CommandInterceptor;
+import org.jboss.starobrno.marshall.MarshalledValue;
+import org.jboss.starobrno.marshall.MarshalledValueHelper;
+import org.jboss.cache.marshall.MarshalledValueMap;
+
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Interceptor that handles the wrapping and unwrapping of cached data using {@link org.jboss.starobrno.marshall.MarshalledValue}s.
+ * Known "excluded" types are not wrapped/unwrapped, which at this time include {@link String}, Java primitives
+ * and their Object wrappers, as well as arrays of excluded types.
+ * <p/>
+ * The {@link org.jboss.starobrno.marshall.MarshalledValue} wrapper handles lazy deserialization from byte array representations.
+ *
+ * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
+ * @see org.jboss.starobrno.marshall.MarshalledValue
+ * @since 2.1.0
+ */
+public class MarshalledValueInterceptor extends CommandInterceptor
+{
+
+ @Override
+ public Object visitPutDataMapCommand(InvocationContext ctx, PutDataMapCommand command) throws Throwable
+ {
+ Set<MarshalledValue> marshalledValues = new HashSet<MarshalledValue>();
+ command.setData(wrapMap(command.getData(), marshalledValues, ctx));
+ Object retVal = invokeNextInterceptor(ctx, command);
+ return compactAndProcessRetVal(command, marshalledValues, retVal);
+ }
+
+ @Override
+ public Object visitGetDataMapCommand(InvocationContext ctx, GetDataMapCommand command) throws Throwable
+ {
+ Object retVal = invokeNextInterceptor(ctx, command);
+ if (retVal instanceof Map)
+ {
+ if (trace) log.trace("Return value is a Map and we're retrieving data. Wrapping as a MarshalledValueMap.");
+ Map retValMap = (Map) retVal;
+ if (!retValMap.isEmpty()) retVal = new MarshalledValueMap(retValMap);
+ }
+ return retVal;
+ }
+
+ @Override
+ public Object visitPutForExternalReadCommand(InvocationContext ctx, PutForExternalReadCommand command) throws Throwable
+ {
+ return visitPutKeyValueCommand(ctx, command);
+ }
+
+ @Override
+ public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
+ {
+ Set<MarshalledValue> marshalledValues = new HashSet<MarshalledValue>();
+ if (!MarshalledValueHelper.isTypeExcluded(command.getKey().getClass()))
+ {
+ Object newKey = createAndAddMarshalledValue(command.getKey(), marshalledValues, ctx);
+ command.setKey(newKey);
+ }
+ if (!MarshalledValueHelper.isTypeExcluded(command.getValue().getClass()))
+ {
+ Object value = createAndAddMarshalledValue(command.getValue(), marshalledValues, ctx);
+ command.setValue(value);
+ }
+ Object retVal = invokeNextInterceptor(ctx, command);
+ return compactAndProcessRetVal(command, marshalledValues, retVal);
+ }
+
+ @Override
+ public Object visitGetNodeCommand(InvocationContext ctx, GetNodeCommand command) throws Throwable
+ {
+ Object retVal = invokeNextInterceptor(ctx, command);
+ return processRetVal(retVal);
+ }
+
+ @Override
+ public Object visitClearDataCommand(InvocationContext ctx, ClearDataCommand command) throws Throwable
+ {
+ Object retVal = invokeNextInterceptor(ctx, command);
+ return processRetVal(retVal);
+ }
+
+ @Override
+ public Object visitRemoveKeyCommand(InvocationContext ctx, RemoveKeyCommand command) throws Throwable
+ {
+ Set<MarshalledValue> marshalledValues = new HashSet<MarshalledValue>();
+ if (!MarshalledValueHelper.isTypeExcluded(command.getKey().getClass()))
+ {
+ Object value = createAndAddMarshalledValue(command.getKey(), marshalledValues, ctx);
+ command.setKey(value);
+ }
+ Object retVal = invokeNextInterceptor(ctx, command);
+ return compactAndProcessRetVal(command, marshalledValues, retVal);
+ }
+
+ @Override
+ public Object visitGetChildrenNamesCommand(InvocationContext ctx, GetChildrenNamesCommand command) throws Throwable
+ {
+ Object retVal = invokeNextInterceptor(ctx, command);
+ return processRetVal(retVal);
+ }
+
+ @Override
+ public Object visitGetKeysCommand(InvocationContext ctx, GetKeysCommand command) throws Throwable
+ {
+ Object retVal = invokeNextInterceptor(ctx, command);
+ return processRetVal(retVal);
+ }
+
+ @Override
+ public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable
+ {
+ Set<MarshalledValue> marshalledValues = new HashSet<MarshalledValue>();
+ if (!MarshalledValueHelper.isTypeExcluded(command.getKey().getClass()))
+ {
+ Object value = createAndAddMarshalledValue(command.getKey(), marshalledValues, ctx);
+ command.setKey(value);
+ }
+ Object retVal = invokeNextInterceptor(ctx, command);
+ return compactAndProcessRetVal(command, marshalledValues, retVal);
+ }
+
+ private Object compactAndProcessRetVal(ReplicableCommand command, Set<MarshalledValue> marshalledValues, Object retVal)
+ throws IOException, ClassNotFoundException
+ {
+ if (trace) log.trace("Compacting MarshalledValues created");
+ for (MarshalledValue mv : marshalledValues) mv.compact(false, false);
+
+ return processRetVal(retVal);
+ }
+
+ private Object processRetVal(Object retVal)
+ throws IOException, ClassNotFoundException
+ {
+ if (retVal instanceof MarshalledValue)
+ {
+ if (trace) log.trace("Return value is a MarshalledValue. Unwrapping.");
+ retVal = ((MarshalledValue) retVal).get();
+ }
+ return retVal;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected Map wrapMap(Map<Object, Object> m, Set<MarshalledValue> marshalledValues, InvocationContext ctx) throws NotSerializableException
+ {
+ if (m == null)
+ {
+ if (trace) log.trace("Map is nul; returning an empty map.");
+ return Collections.emptyMap();
+ }
+ if (trace) log.trace("Wrapping map contents of argument " + m);
+ Map copy = new HashMap();
+ for (Map.Entry me : m.entrySet())
+ {
+ Object key = me.getKey();
+ Object value = me.getValue();
+ copy.put((key == null || MarshalledValueHelper.isTypeExcluded(key.getClass())) ? key : createAndAddMarshalledValue(key, marshalledValues, ctx),
+ (value == null || MarshalledValueHelper.isTypeExcluded(value.getClass())) ? value : createAndAddMarshalledValue(value, marshalledValues, ctx));
+ }
+ return copy;
+ }
+
+ protected MarshalledValue createAndAddMarshalledValue(Object toWrap, Set<MarshalledValue> marshalledValues, InvocationContext ctx) throws NotSerializableException
+ {
+ MarshalledValue mv = new MarshalledValue(toWrap);
+ marshalledValues.add(mv);
+ if (!ctx.isOriginLocal()) mv.setEqualityPreferenceForInstance(false);
+ return mv;
+ }
+}
Added: core/branches/flat/src/main/java/org/jboss/cache/marshall/MarshalledValueMap.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/cache/marshall/MarshalledValueMap.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/cache/marshall/MarshalledValueMap.java 2008-10-14 17:54:41 UTC (rev 6949)
@@ -0,0 +1,182 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.cache.marshall;
+
+import net.jcip.annotations.Immutable;
+import org.jboss.starobrno.CacheException;
+import org.jboss.starobrno.marshall.MarshalledValue;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A Map that is able to wrap/unwrap MarshalledValues in keys or values. Note that calling keySet(), entrySet() or values()
+ * could be expensive if this map is large!!
+ * <p/>
+ * Also note that this is an immutable Map.
+ * <p/>
+ *
+ * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
+ * @see org.jboss.starobrno.marshall.MarshalledValue
+ * @since 2.1.0
+ */
+@Immutable
+public class MarshalledValueMap implements Map, Externalizable
+{
+ Map delegate;
+ Map<Object, Object> unmarshalled;
+
+ public MarshalledValueMap()
+ {
+ // for externalization
+ }
+
+ public MarshalledValueMap(Map delegate)
+ {
+ this.delegate = delegate;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected synchronized Map getUnmarshalledMap()
+ {
+ if (unmarshalled == null) unmarshalled = unmarshalledMap(delegate.entrySet());
+ return unmarshalled;
+ }
+
+ public int size()
+ {
+ return delegate.size();
+ }
+
+ public boolean isEmpty()
+ {
+ return delegate.isEmpty();
+ }
+
+ public boolean containsKey(Object key)
+ {
+ return getUnmarshalledMap().containsKey(key);
+ }
+
+ public boolean containsValue(Object value)
+ {
+ return getUnmarshalledMap().containsValue(value);
+ }
+
+ public Object get(Object key)
+ {
+ return getUnmarshalledMap().get(key);
+ }
+
+ public Object put(Object key, Object value)
+ {
+ throw new UnsupportedOperationException("This is an immutable map!");
+ }
+
+ public Object remove(Object key)
+ {
+ throw new UnsupportedOperationException("This is an immutable map!");
+ }
+
+ public void putAll(Map t)
+ {
+ throw new UnsupportedOperationException("This is an immutable map!");
+ }
+
+ public void clear()
+ {
+ throw new UnsupportedOperationException("This is an immutable map!");
+ }
+
+ public Set keySet()
+ {
+ return getUnmarshalledMap().keySet();
+ }
+
+ public Collection values()
+ {
+ return getUnmarshalledMap().values();
+ }
+
+ public Set entrySet()
+ {
+ return getUnmarshalledMap().entrySet();
+ }
+
+ @SuppressWarnings("unchecked")
+ protected Map unmarshalledMap(Set entries)
+ {
+ if (entries == null || entries.isEmpty()) return Collections.emptyMap();
+ Map map = new HashMap(entries.size());
+ for (Object e : entries)
+ {
+ Map.Entry entry = (Map.Entry) e;
+ map.put(getUnmarshalledValue(entry.getKey()), getUnmarshalledValue(entry.getValue()));
+ }
+ return map;
+ }
+
+ private Object getUnmarshalledValue(Object o)
+ {
+ try
+ {
+ return o instanceof MarshalledValue ? ((MarshalledValue) o).get() : o;
+ }
+ catch (Exception e)
+ {
+ throw new CacheException("Unable to unmarshall value", e);
+ }
+ }
+
+ @Override
+ public boolean equals(Object other)
+ {
+ if (other instanceof Map)
+ {
+ return getUnmarshalledMap().equals(other);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return getUnmarshalledMap().hashCode();
+ }
+
+ public void writeExternal(ObjectOutput out) throws IOException
+ {
+ out.writeObject(delegate);
+ }
+
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+ {
+ delegate = (Map) in.readObject();
+ }
+}
17 years, 2 months
JBoss Cache SVN: r6948 - in core/branches/flat/src: main/java/org/jboss/starobrno/marshall and 3 other directories.
by jbosscache-commits@lists.jboss.org
Author: mircea.markus
Date: 2008-10-14 13:54:06 -0400 (Tue, 14 Oct 2008)
New Revision: 6948
Removed:
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/MarshalledValue2.java
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/MarshalledValueHelper2.java
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/MarshalledValueInterceptor.java
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CacheMarshallerStarobrno.java
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/MarshalledValueMap.java
core/branches/flat/src/test/java/org/jboss/starobrno/BasicTest.java
core/branches/flat/src/test/java/org/jboss/starobrno/api/mvcc/LockAssert.java
core/branches/flat/src/test/java/org/jboss/starobrno/api/mvcc/LockTestBase.java
core/branches/flat/src/test/java/org/jboss/starobrno/lock/LockContainerHashingTest.java
Log:
added support for replication
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/MarshalledValueInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/MarshalledValueInterceptor.java 2008-10-14 17:50:19 UTC (rev 6947)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/MarshalledValueInterceptor.java 2008-10-14 17:54:06 UTC (rev 6948)
@@ -27,9 +27,9 @@
import org.jboss.starobrno.commands.write.RemoveCommand;
import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.interceptors.base.CommandInterceptor;
-import org.jboss.starobrno.marshall.MarshalledValue2;
-import org.jboss.starobrno.marshall.MarshalledValueHelper2;
-//import org.jboss.starobrno.marshall.MarshalledValueHelper2;
+import org.jboss.starobrno.marshall.MarshalledValue;
+import org.jboss.starobrno.marshall.MarshalledValueHelper;
+//import org.jboss.starobrno.marshall.MarshalledValueHelper;
import java.io.IOException;
import java.io.NotSerializableException;
@@ -40,14 +40,14 @@
import java.util.Set;
/**
- * Interceptor that handles the wrapping and unwrapping of cached data using {@link org.jboss.starobrno.marshall.MarshalledValue2}s.
+ * Interceptor that handles the wrapping and unwrapping of cached data using {@link org.jboss.starobrno.marshall.MarshalledValue}s.
* Known "excluded" types are not wrapped/unwrapped, which at this time include {@link String}, Java primitives
* and their Object wrappers, as well as arrays of excluded types.
* <p/>
- * The {@link org.jboss.starobrno.marshall.MarshalledValue2} wrapper handles lazy deserialization from byte array representations.
+ * The {@link org.jboss.starobrno.marshall.MarshalledValue} wrapper handles lazy deserialization from byte array representations.
*
* @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
- * @see org.jboss.starobrno.marshall.MarshalledValue2
+ * @see org.jboss.starobrno.marshall.MarshalledValue
* @since 2.1.0
*/
public class MarshalledValueInterceptor extends CommandInterceptor
@@ -55,7 +55,7 @@
@Override
public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable
{
- Set<MarshalledValue2> marshalledValues = new HashSet<MarshalledValue2>();
+ Set<MarshalledValue> marshalledValues = new HashSet<MarshalledValue>();
command.setMap(wrapMap(command.getMap(), marshalledValues, ctx));
Object retVal = invokeNextInterceptor(ctx, command);
@@ -65,13 +65,13 @@
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
{
- Set<MarshalledValue2> marshalledValues = new HashSet<MarshalledValue2>();
- if (!MarshalledValueHelper2.isTypeExcluded(command.getKey().getClass()))
+ Set<MarshalledValue> marshalledValues = new HashSet<MarshalledValue>();
+ if (!MarshalledValueHelper.isTypeExcluded(command.getKey().getClass()))
{
Object newKey = createAndAddMarshalledValue(command.getKey(), marshalledValues, ctx);
command.setKey(newKey);
}
- if (!MarshalledValueHelper2.isTypeExcluded(command.getValue().getClass()))
+ if (!MarshalledValueHelper.isTypeExcluded(command.getValue().getClass()))
{
Object value = createAndAddMarshalledValue(command.getValue(), marshalledValues, ctx);
command.setValue(value);
@@ -83,8 +83,8 @@
@Override
public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable
{
- Set<MarshalledValue2> marshalledValues = new HashSet<MarshalledValue2>();
- if (!MarshalledValueHelper2.isTypeExcluded(command.getKey().getClass()))
+ Set<MarshalledValue> marshalledValues = new HashSet<MarshalledValue>();
+ if (!MarshalledValueHelper.isTypeExcluded(command.getKey().getClass()))
{
Object value = createAndAddMarshalledValue(command.getKey(), marshalledValues, ctx);
command.setKey(value);
@@ -96,8 +96,8 @@
@Override
public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable
{
- Set<MarshalledValue2> marshalledValues = new HashSet<MarshalledValue2>();
- if (!MarshalledValueHelper2.isTypeExcluded(command.getKey().getClass()))
+ Set<MarshalledValue> marshalledValues = new HashSet<MarshalledValue>();
+ if (!MarshalledValueHelper.isTypeExcluded(command.getKey().getClass()))
{
Object value = createAndAddMarshalledValue(command.getKey(), marshalledValues, ctx);
command.setKey(value);
@@ -106,11 +106,11 @@
return compactAndProcessRetVal(marshalledValues, retVal);
}
- private Object compactAndProcessRetVal(Set<MarshalledValue2> marshalledValues, Object retVal)
+ private Object compactAndProcessRetVal(Set<MarshalledValue> marshalledValues, Object retVal)
throws IOException, ClassNotFoundException
{
if (trace) log.trace("Compacting MarshalledValues created");
- for (MarshalledValue2 mv : marshalledValues) mv.compact(false, false);
+ for (MarshalledValue mv : marshalledValues) mv.compact(false, false);
return processRetVal(retVal);
}
@@ -118,16 +118,16 @@
private Object processRetVal(Object retVal)
throws IOException, ClassNotFoundException
{
- if (retVal instanceof MarshalledValue2)
+ if (retVal instanceof MarshalledValue)
{
- if (trace) log.trace("Return value is a MarshalledValue2. Unwrapping.");
- retVal = ((MarshalledValue2) retVal).get();
+ if (trace) log.trace("Return value is a MarshalledValue. Unwrapping.");
+ retVal = ((MarshalledValue) retVal).get();
}
return retVal;
}
@SuppressWarnings("unchecked")
- protected Map wrapMap(Map<Object, Object> m, Set<MarshalledValue2> marshalledValues, InvocationContext ctx) throws NotSerializableException
+ protected Map wrapMap(Map<Object, Object> m, Set<MarshalledValue> marshalledValues, InvocationContext ctx) throws NotSerializableException
{
if (m == null)
{
@@ -140,15 +140,15 @@
{
Object key = me.getKey();
Object value = me.getValue();
- copy.put((key == null || MarshalledValueHelper2.isTypeExcluded(key.getClass())) ? key : createAndAddMarshalledValue(key, marshalledValues, ctx),
- (value == null || MarshalledValueHelper2.isTypeExcluded(value.getClass())) ? value : createAndAddMarshalledValue(value, marshalledValues, ctx));
+ copy.put((key == null || MarshalledValueHelper.isTypeExcluded(key.getClass())) ? key : createAndAddMarshalledValue(key, marshalledValues, ctx),
+ (value == null || MarshalledValueHelper.isTypeExcluded(value.getClass())) ? value : createAndAddMarshalledValue(value, marshalledValues, ctx));
}
return copy;
}
- protected MarshalledValue2 createAndAddMarshalledValue(Object toWrap, Set<MarshalledValue2> marshalledValues, InvocationContext ctx) throws NotSerializableException
+ protected MarshalledValue createAndAddMarshalledValue(Object toWrap, Set<MarshalledValue> marshalledValues, InvocationContext ctx) throws NotSerializableException
{
- MarshalledValue2 mv = new MarshalledValue2(toWrap);
+ MarshalledValue mv = new MarshalledValue(toWrap);
marshalledValues.add(mv);
if (!ctx.isOriginLocal()) mv.setEqualityPreferenceForInstance(false);
return mv;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CacheMarshallerStarobrno.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CacheMarshallerStarobrno.java 2008-10-14 17:50:19 UTC (rev 6947)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CacheMarshallerStarobrno.java 2008-10-14 17:54:06 UTC (rev 6948)
@@ -243,7 +243,7 @@
if (useRefs) refMap.putReferencedObject(reference, retVal);
return retVal;
case MAGICNUMBER_MARSHALLEDVALUE:
- MarshalledValue2 mv = new MarshalledValue2();
+ MarshalledValue mv = new MarshalledValue();
mv.readExternal(in);
return mv;
case MAGICNUMBER_METHODCALL:
Deleted: core/branches/flat/src/main/java/org/jboss/starobrno/marshall/MarshalledValue2.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/marshall/MarshalledValue2.java 2008-10-14 17:50:19 UTC (rev 6947)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/marshall/MarshalledValue2.java 2008-10-14 17:54:06 UTC (rev 6948)
@@ -1,229 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.starobrno.marshall;
-
-import org.jboss.starobrno.CacheException;
-import org.jboss.util.stream.MarshalledValueInputStream;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.NotSerializableException;
-import java.io.ObjectInput;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.util.Arrays;
-
-/**
- * Wrapper that wraps cached data, providing lazy deserialization using the calling thread's context class loader.
- * <p/>
- * The {@link org.jboss.cache.interceptors.MarshalledValueInterceptor} handles transparent
- * wrapping/unwrapping of cached data.
- * <p/>
- *
- * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
- * @see org.jboss.cache.interceptors.MarshalledValueInterceptor
- * @since 2.1.0
- */
-public class MarshalledValue2 implements Externalizable
-{
- protected Object instance;
- protected byte[] raw;
- private int cachedHashCode = 0;
- // by default equals() will test on the istance rather than the byte array if conversion is required.
- private transient boolean equalityPreferenceForInstance = true;
-
- public MarshalledValue2(Object instance) throws NotSerializableException
- {
- if (instance == null) throw new NullPointerException("Null values cannot be wrapped as MarshalledValues!");
-
- if (instance instanceof Serializable)
- this.instance = instance;
- else
- throw new NotSerializableException("Marshalled values can only wrap Objects that are serializable! Instance of " + instance.getClass() + " won't Serialize.");
- }
-
- public MarshalledValue2()
- {
- // empty ctor for serialization
- }
-
- public void setEqualityPreferenceForInstance(boolean equalityPreferenceForInstance)
- {
- this.equalityPreferenceForInstance = equalityPreferenceForInstance;
- }
-
- public synchronized void serialize()
- {
- if (raw == null)
- {
- try
- {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(instance);
- oos.close();
- baos.close();
- // Do NOT set instance to null over here, since it may be used elsewhere (e.g., in a cache listener).
- // this will be compacted by the MarshalledValueInterceptor when the call returns.
-// instance = null;
- raw = baos.toByteArray();
- }
- catch (Exception e)
- {
- throw new CacheException("Unable to marshall value " + instance, e);
- }
- }
- }
-
- public synchronized void deserialize()
- {
- if (instance == null)
- {
- try
- {
- ByteArrayInputStream bais = new ByteArrayInputStream(raw);
- // use a MarshalledValueInputStream since it needs to be aware of any context class loaders on the current thread.
- ObjectInputStream ois = new MarshalledValueInputStream(bais);
- instance = ois.readObject();
- ois.close();
- bais.close();
-// raw = null;
- }
- catch (Exception e)
- {
- throw new CacheException("Unable to unmarshall value", e);
- }
- }
- }
-
- /**
- * Compacts the references held by this class to a single reference. If only one representation exists this method
- * is a no-op unless the 'force' parameter is used, in which case the reference held is forcefully switched to the
- * 'preferred representation'.
- * <p/>
- * Either way, a call to compact() will ensure that only one representation is held.
- * <p/>
- *
- * @param preferSerializedRepresentation if true and both representations exist, the serialized representation is favoured. If false, the deserialized representation is preferred.
- * @param force ensures the preferred representation is maintained and the other released, even if this means serializing or deserializing.
- */
- public void compact(boolean preferSerializedRepresentation, boolean force)
- {
- // reset the equalityPreference
- equalityPreferenceForInstance = true;
- if (force)
- {
- if (preferSerializedRepresentation && raw == null) serialize();
- else if (!preferSerializedRepresentation && instance == null) deserialize();
- }
-
- if (instance != null && raw != null)
- {
- // need to lose one representation!
-
- if (preferSerializedRepresentation)
- {
- instance = null;
- }
- else
- {
- raw = null;
- }
- }
- }
-
- public void writeExternal(ObjectOutput out) throws IOException
- {
- if (raw == null) serialize();
- out.writeInt(raw.length);
- out.write(raw);
- out.writeInt(hashCode());
- }
-
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
- {
- int size = in.readInt();
- raw = new byte[size];
- cachedHashCode = 0;
- in.readFully(raw);
- cachedHashCode = in.readInt();
- }
-
- public Object get() throws IOException, ClassNotFoundException
- {
- if (instance == null) deserialize();
- return instance;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- MarshalledValue2 that = (MarshalledValue2) o;
-
- // if both versions are serialized or deserialized, just compare the relevant representations.
- if (raw != null && that.raw != null) return Arrays.equals(raw, that.raw);
- if (instance != null && that.instance != null) return instance.equals(that.instance);
-
- // if conversion of one representation to the other is necessary, then see which we prefer converting.
- if (equalityPreferenceForInstance)
- {
- if (instance == null) deserialize();
- if (that.instance == null) that.deserialize();
- return instance.equals(that.instance);
- }
- else
- {
- if (raw == null) serialize();
- if (that.raw == null) that.serialize();
- return Arrays.equals(raw, that.raw);
- }
- }
-
- @Override
- public int hashCode()
- {
- if (cachedHashCode == 0)
- {
- // always calculate the hashcode based on the instance since this is where we're getting the equals()
- if (instance == null) deserialize();
- cachedHashCode = instance.hashCode();
- if (cachedHashCode == 0) // degenerate case
- {
- cachedHashCode = 0xFEED;
- }
- }
- return cachedHashCode;
- }
-
- @Override
- public String toString()
- {
- return "MarshalledValue2(cachedHashCode=" + cachedHashCode + "; serialized=" + (raw != null) + ")";
- }
-}
Deleted: core/branches/flat/src/main/java/org/jboss/starobrno/marshall/MarshalledValueHelper2.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/marshall/MarshalledValueHelper2.java 2008-10-14 17:50:19 UTC (rev 6947)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/marshall/MarshalledValueHelper2.java 2008-10-14 17:54:06 UTC (rev 6948)
@@ -1,56 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.starobrno.marshall;
-
-import org.jboss.cache.Fqn;
-import org.jboss.cache.commands.ReplicableCommand;
-import org.jboss.cache.transaction.GlobalTransaction;
-import org.jboss.starobrno.marshall.MarshalledValue2;
-import org.jgroups.Address;
-
-/**
- * Common functionality used by the {@link org.jboss.cache.interceptors.MarshalledValueInterceptor} and the {@link org.jboss.cache.marshall.MarshalledValueMap}.
- *
- * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
- * @see MarshalledValue2
- * @see org.jboss.cache.interceptors.MarshalledValueInterceptor
- * @see org.jboss.cache.marshall.MarshalledValueMap
- * @since 2.1.0
- */
-public class MarshalledValueHelper2
-{
- /**
- * Tests whether the type should be excluded from MarshalledValue wrapping.
- *
- * @param type type to test. Should not be null.
- * @return true if it should be excluded from MarshalledValue wrapping.
- */
- public static boolean isTypeExcluded(Class type)
- {
- return type.equals(String.class) || type.isPrimitive() ||
- type.equals(Void.class) || type.equals(Boolean.class) || type.equals(Character.class) ||
- type.equals(Byte.class) || type.equals(Short.class) || type.equals(Integer.class) ||
- type.equals(Long.class) || type.equals(Float.class) || type.equals(Double.class) ||
- (type.isArray() && isTypeExcluded(type.getComponentType())) || type.equals(Fqn.class) || type.equals(GlobalTransaction.class) || type.equals(Address.class) ||
- ReplicableCommand.class.isAssignableFrom(type) || type.equals(MarshalledValue2.class);
- }
-}
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/marshall/MarshalledValueMap.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/marshall/MarshalledValueMap.java 2008-10-14 17:50:19 UTC (rev 6947)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/marshall/MarshalledValueMap.java 2008-10-14 17:54:06 UTC (rev 6948)
@@ -42,7 +42,7 @@
* <p/>
*
* @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
- * @see MarshalledValue2
+ * @see MarshalledValue
* @since 2.1.0
*/
@Immutable
@@ -145,7 +145,7 @@
{
try
{
- return o instanceof MarshalledValue2 ? ((MarshalledValue2) o).get() : o;
+ return o instanceof MarshalledValue ? ((MarshalledValue) o).get() : o;
}
catch (Exception e)
{
Modified: core/branches/flat/src/test/java/org/jboss/starobrno/BasicTest.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/BasicTest.java 2008-10-14 17:50:19 UTC (rev 6947)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/BasicTest.java 2008-10-14 17:54:06 UTC (rev 6948)
@@ -61,4 +61,44 @@
cm.stop();
}
}
+
+ public void testBasicReplication()
+ {
+ Configuration configuration = new Configuration();
+ configuration.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+ // Replication check: a write made through one manager's cache must be readable through the other manager's cache.
+ CacheManager firstManager = new CacheManager(configuration);
+ CacheManager secondManager = new CacheManager(configuration);
+ // Both managers share the same REPL_SYNC configuration and the same cache name ("test").
+ try
+ {
+ firstManager.start();
+ secondManager.start();
+ // Each cache must come from its own manager; otherwise secondManager is never involved in the assertions below.
+ Cache firstCache = firstManager.createCache("test");
+ Cache secondCache = secondManager.createCache("test");
+
+ firstCache.put("key","value");
+ assert secondCache.get("key").equals("value");
+ assert firstCache.get("key").equals("value");
+ secondCache.put("key", "value2");
+ assert firstCache.get("key").equals("value2");
+ firstCache.remove("key");
+ assert secondCache.get("key") == null;
+ } finally
+ {
+ firstManager.destroyCache("test");
+ secondManager.destroyCache("test");
+ }
+ }
+
+ public void concurrentMapMethodTest()
+ {
+ // TODO: placeholder - the body is empty, so nothing is exercised or asserted yet.
+ }
+
+ public void transactionalTest()
+ {
+ // TODO: placeholder - the body is empty, so nothing is exercised or asserted yet.
+ }
}
Modified: core/branches/flat/src/test/java/org/jboss/starobrno/api/mvcc/LockAssert.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/api/mvcc/LockAssert.java 2008-10-14 17:50:19 UTC (rev 6947)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/api/mvcc/LockAssert.java 2008-10-14 17:54:06 UTC (rev 6948)
@@ -1,9 +1,9 @@
package org.jboss.starobrno.api.mvcc;
-import org.jboss.cache.util.concurrent.locks.LockContainer;
import org.jboss.starobrno.invocation.InvocationContextContainer;
import org.jboss.starobrno.lock.LockManager;
import org.jboss.starobrno.util.TestingUtil;
+import org.jboss.starobrno.util.concurrent.locks.LockContainer;
/**
* Helper class to assert lock status in MVCC
Modified: core/branches/flat/src/test/java/org/jboss/starobrno/api/mvcc/LockTestBase.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/api/mvcc/LockTestBase.java 2008-10-14 17:50:19 UTC (rev 6947)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/api/mvcc/LockTestBase.java 2008-10-14 17:54:06 UTC (rev 6948)
@@ -4,12 +4,12 @@
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.DefaultCacheFactory;
import org.jboss.cache.lock.IsolationLevel;
-import org.jboss.cache.lock.TimeoutException;
import org.jboss.cache.transaction.DummyTransactionManagerLookup;
import org.jboss.starobrno.Cache;
import org.jboss.starobrno.config.Configuration;
import org.jboss.starobrno.invocation.InvocationContextContainer;
import org.jboss.starobrno.lock.LockManager;
+import org.jboss.starobrno.lock.TimeoutException;
import org.jboss.starobrno.util.TestingUtil;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Modified: core/branches/flat/src/test/java/org/jboss/starobrno/lock/LockContainerHashingTest.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/lock/LockContainerHashingTest.java 2008-10-14 17:50:19 UTC (rev 6947)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/lock/LockContainerHashingTest.java 2008-10-14 17:54:06 UTC (rev 6948)
@@ -21,8 +21,8 @@
*/
package org.jboss.starobrno.lock;
-import org.jboss.cache.util.concurrent.locks.LockContainer;
-import org.jboss.cache.util.concurrent.locks.ReentrantLockContainer;
+import org.jboss.starobrno.util.concurrent.locks.LockContainer;
+import org.jboss.starobrno.util.concurrent.locks.ReentrantLockContainer;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -69,10 +69,10 @@
// cannot be larger than the number of locks
System.out.println("dist size: " + distribution.size());
- System.out.println("num shared locks: " + stripedLock.getSize());
- assert distribution.size() <= stripedLock.getSize();
+ System.out.println("num shared locks: " + stripedLock.getNumLocksHeld());
+ assert distribution.size() <= stripedLock.getNumLocksHeld();
// assume at least a 2/3rd spread
- assert distribution.size() * 1.5 >= stripedLock.getSize();
+ assert distribution.size() * 1.5 >= stripedLock.getNumLocksHeld();
}
private List<String> createRandomKeys(int number)
17 years, 2 months
JBoss Cache SVN: r6947 - core/branches/flat/src/main/java/org/jboss/starobrno/interceptors.
by jbosscache-commits@lists.jboss.org
Author: mircea.markus
Date: 2008-10-14 13:50:19 -0400 (Tue, 14 Oct 2008)
New Revision: 6947
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/MarshalledValueInterceptor.java
Log:
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/MarshalledValueInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/MarshalledValueInterceptor.java 2008-10-14 17:50:03 UTC (rev 6946)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/MarshalledValueInterceptor.java 2008-10-14 17:50:19 UTC (rev 6947)
@@ -27,9 +27,9 @@
import org.jboss.starobrno.commands.write.RemoveCommand;
import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.interceptors.base.CommandInterceptor;
-import org.jboss.starobrno.marshall.MarshalledValue;
-import org.jboss.starobrno.marshall.MarshalledValueHelper;
-//import org.jboss.starobrno.marshall.MarshalledValueHelper;
+import org.jboss.starobrno.marshall.MarshalledValue2;
+import org.jboss.starobrno.marshall.MarshalledValueHelper2;
+//import org.jboss.starobrno.marshall.MarshalledValueHelper2;
import java.io.IOException;
import java.io.NotSerializableException;
@@ -40,14 +40,14 @@
import java.util.Set;
/**
- * Interceptor that handles the wrapping and unwrapping of cached data using {@link MarshalledValue}s.
+ * Interceptor that handles the wrapping and unwrapping of cached data using {@link org.jboss.starobrno.marshall.MarshalledValue2}s.
* Known "excluded" types are not wrapped/unwrapped, which at this time include {@link String}, Java primitives
* and their Object wrappers, as well as arrays of excluded types.
* <p/>
- * The {@link MarshalledValue} wrapper handles lazy deserialization from byte array representations.
+ * The {@link org.jboss.starobrno.marshall.MarshalledValue2} wrapper handles lazy deserialization from byte array representations.
*
* @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
- * @see MarshalledValue
+ * @see org.jboss.starobrno.marshall.MarshalledValue2
* @since 2.1.0
*/
public class MarshalledValueInterceptor extends CommandInterceptor
@@ -55,7 +55,7 @@
@Override
public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable
{
- Set<MarshalledValue> marshalledValues = new HashSet<MarshalledValue>();
+ Set<MarshalledValue2> marshalledValues = new HashSet<MarshalledValue2>();
command.setMap(wrapMap(command.getMap(), marshalledValues, ctx));
Object retVal = invokeNextInterceptor(ctx, command);
@@ -65,13 +65,13 @@
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
{
- Set<MarshalledValue> marshalledValues = new HashSet<MarshalledValue>();
- if (!MarshalledValueHelper.isTypeExcluded(command.getKey().getClass()))
+ Set<MarshalledValue2> marshalledValues = new HashSet<MarshalledValue2>();
+ if (!MarshalledValueHelper2.isTypeExcluded(command.getKey().getClass()))
{
Object newKey = createAndAddMarshalledValue(command.getKey(), marshalledValues, ctx);
command.setKey(newKey);
}
- if (!MarshalledValueHelper.isTypeExcluded(command.getValue().getClass()))
+ if (!MarshalledValueHelper2.isTypeExcluded(command.getValue().getClass()))
{
Object value = createAndAddMarshalledValue(command.getValue(), marshalledValues, ctx);
command.setValue(value);
@@ -83,8 +83,8 @@
@Override
public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable
{
- Set<MarshalledValue> marshalledValues = new HashSet<MarshalledValue>();
- if (!MarshalledValueHelper.isTypeExcluded(command.getKey().getClass()))
+ Set<MarshalledValue2> marshalledValues = new HashSet<MarshalledValue2>();
+ if (!MarshalledValueHelper2.isTypeExcluded(command.getKey().getClass()))
{
Object value = createAndAddMarshalledValue(command.getKey(), marshalledValues, ctx);
command.setKey(value);
@@ -96,8 +96,8 @@
@Override
public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable
{
- Set<MarshalledValue> marshalledValues = new HashSet<MarshalledValue>();
- if (!MarshalledValueHelper.isTypeExcluded(command.getKey().getClass()))
+ Set<MarshalledValue2> marshalledValues = new HashSet<MarshalledValue2>();
+ if (!MarshalledValueHelper2.isTypeExcluded(command.getKey().getClass()))
{
Object value = createAndAddMarshalledValue(command.getKey(), marshalledValues, ctx);
command.setKey(value);
@@ -106,11 +106,11 @@
return compactAndProcessRetVal(marshalledValues, retVal);
}
- private Object compactAndProcessRetVal(Set<MarshalledValue> marshalledValues, Object retVal)
+ private Object compactAndProcessRetVal(Set<MarshalledValue2> marshalledValues, Object retVal)
throws IOException, ClassNotFoundException
{
if (trace) log.trace("Compacting MarshalledValues created");
- for (MarshalledValue mv : marshalledValues) mv.compact(false, false);
+ for (MarshalledValue2 mv : marshalledValues) mv.compact(false, false);
return processRetVal(retVal);
}
@@ -118,16 +118,16 @@
private Object processRetVal(Object retVal)
throws IOException, ClassNotFoundException
{
- if (retVal instanceof MarshalledValue)
+ if (retVal instanceof MarshalledValue2)
{
- if (trace) log.trace("Return value is a MarshalledValue. Unwrapping.");
- retVal = ((MarshalledValue) retVal).get();
+ if (trace) log.trace("Return value is a MarshalledValue2. Unwrapping.");
+ retVal = ((MarshalledValue2) retVal).get();
}
return retVal;
}
@SuppressWarnings("unchecked")
- protected Map wrapMap(Map<Object, Object> m, Set<MarshalledValue> marshalledValues, InvocationContext ctx) throws NotSerializableException
+ protected Map wrapMap(Map<Object, Object> m, Set<MarshalledValue2> marshalledValues, InvocationContext ctx) throws NotSerializableException
{
if (m == null)
{
@@ -140,15 +140,15 @@
{
Object key = me.getKey();
Object value = me.getValue();
- copy.put((key == null || MarshalledValueHelper.isTypeExcluded(key.getClass())) ? key : createAndAddMarshalledValue(key, marshalledValues, ctx),
- (value == null || MarshalledValueHelper.isTypeExcluded(value.getClass())) ? value : createAndAddMarshalledValue(value, marshalledValues, ctx));
+ copy.put((key == null || MarshalledValueHelper2.isTypeExcluded(key.getClass())) ? key : createAndAddMarshalledValue(key, marshalledValues, ctx),
+ (value == null || MarshalledValueHelper2.isTypeExcluded(value.getClass())) ? value : createAndAddMarshalledValue(value, marshalledValues, ctx));
}
return copy;
}
- protected MarshalledValue createAndAddMarshalledValue(Object toWrap, Set<MarshalledValue> marshalledValues, InvocationContext ctx) throws NotSerializableException
+ protected MarshalledValue2 createAndAddMarshalledValue(Object toWrap, Set<MarshalledValue2> marshalledValues, InvocationContext ctx) throws NotSerializableException
{
- MarshalledValue mv = new MarshalledValue(toWrap);
+ MarshalledValue2 mv = new MarshalledValue2(toWrap);
marshalledValues.add(mv);
if (!ctx.isOriginLocal()) mv.setEqualityPreferenceForInstance(false);
return mv;
17 years, 2 months
JBoss Cache SVN: r6946 - in core/branches/flat/src/main/java/org/jboss/starobrno: transaction and 3 other directories.
by jbosscache-commits@lists.jboss.org
Author: mircea.markus
Date: 2008-10-14 13:50:03 -0400 (Tue, 14 Oct 2008)
New Revision: 6946
Added:
core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java
core/branches/flat/src/main/java/org/jboss/starobrno/util/
core/branches/flat/src/main/java/org/jboss/starobrno/util/FastCopyHashMap.java
core/branches/flat/src/main/java/org/jboss/starobrno/util/Immutables.java
core/branches/flat/src/main/java/org/jboss/starobrno/util/ReflectionUtil.java
core/branches/flat/src/main/java/org/jboss/starobrno/util/concurrent/
core/branches/flat/src/main/java/org/jboss/starobrno/util/concurrent/locks/
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java
core/branches/flat/src/main/java/org/jboss/starobrno/CacheException.java
core/branches/flat/src/main/java/org/jboss/starobrno/CacheSPI.java
core/branches/flat/src/main/java/org/jboss/starobrno/transaction/TransactionTable.java
core/branches/flat/src/main/java/org/jboss/starobrno/util/concurrent/locks/LockContainer.java
core/branches/flat/src/main/java/org/jboss/starobrno/util/concurrent/locks/OwnableReentrantLock.java
core/branches/flat/src/main/java/org/jboss/starobrno/util/concurrent/locks/OwnableReentrantLockContainer.java
core/branches/flat/src/main/java/org/jboss/starobrno/util/concurrent/locks/ReentrantLockContainer.java
Log:
enabling replication
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java 2008-10-14 17:49:31 UTC (rev 6945)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java 2008-10-14 17:50:03 UTC (rev 6946)
@@ -26,7 +26,7 @@
import org.jboss.cache.buddyreplication.GravitateResult;
import org.jboss.cache.loader.CacheLoaderManager;
import org.jboss.cache.marshall.Marshaller;
-import org.jboss.cache.statetransfer.StateTransferManager;
+import org.jboss.starobrno.statetransfer.StateTransferManager;
import org.jboss.starobrno.batch.BatchContainer;
import org.jboss.starobrno.commands.CommandsFactory;
import org.jboss.starobrno.commands.read.GetKeyValueCommand;
@@ -47,7 +47,6 @@
import org.jboss.starobrno.interceptors.base.CommandInterceptor;
import org.jboss.starobrno.invocation.InvocationContextContainer;
import org.jboss.starobrno.notifications.Notifier;
-import org.jboss.starobrno.remoting.RPCManager;
import org.jboss.starobrno.transaction.GlobalTransaction;
import org.jboss.starobrno.transaction.TransactionTable;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/CacheException.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/CacheException.java 2008-10-14 17:49:31 UTC (rev 6945)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/CacheException.java 2008-10-14 17:50:03 UTC (rev 6946)
@@ -24,7 +24,7 @@
/**
* Thrown when operations on {@link Cache} or {@link org.jboss.cache.Node} fail unexpectedly.
* <p/>
- * Specific subclasses such as {@link org.jboss.cache.lock.TimeoutException}, {@link org.jboss.cache.config.ConfigurationException} and {@link org.jboss.cache.lock.LockingException}
+ * Specific subclasses such as {@link org.jboss.starobrno.lock.TimeoutException}, {@link org.jboss.cache.config.ConfigurationException} and {@link org.jboss.cache.lock.LockingException}
* have more specific uses.
*
* @author <a href="mailto:bela@jboss.org">Bela Ban</a>
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/CacheSPI.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/CacheSPI.java 2008-10-14 17:49:31 UTC (rev 6945)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/CacheSPI.java 2008-10-14 17:50:03 UTC (rev 6946)
@@ -26,12 +26,11 @@
import org.jboss.cache.buddyreplication.GravitateResult;
import org.jboss.cache.loader.CacheLoaderManager;
import org.jboss.cache.marshall.Marshaller;
-import org.jboss.cache.statetransfer.StateTransferManager;
+import org.jboss.starobrno.statetransfer.StateTransferManager;
import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.factories.ComponentRegistry;
import org.jboss.starobrno.interceptors.base.CommandInterceptor;
import org.jboss.starobrno.notifications.Notifier;
-import org.jboss.starobrno.remoting.RPCManager;
import org.jboss.starobrno.transaction.GlobalTransaction;
import org.jboss.starobrno.transaction.TransactionTable;
@@ -176,7 +175,7 @@
* From 2.1.0, Interceptor authors should obtain this by injection rather than this method. See the
* {@link org.jboss.cache.factories.annotations.Inject} annotation.
*
- * @return the current {@link org.jboss.cache.statetransfer.StateTransferManager}
+ * @return the current {@link org.jboss.starobrno.statetransfer.StateTransferManager}
*/
StateTransferManager getStateTransferManager();
Added: core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java 2008-10-14 17:50:03 UTC (rev 6946)
@@ -0,0 +1,135 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno;
+
+import org.jboss.starobrno.commands.ReplicableCommand;
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.blocks.RspFilter;
+
+import java.util.List;
+import java.util.Vector;
+
+/**
+ * Provides a mechanism for communicating with other caches in the cluster. For now this is based on JGroups as an underlying
+ * transport, and in future more transport options may become available.
+ * <p/>
+ * Implementations have a simple lifecycle:
+ * <ul>
+ * <li>start() - starts the underlying channel based on configuration options injected, and connects the channel</li>
+ * <li>disconnect() - disconnects the channel</li>
+ * <li>stop() - stops the dispatcher and releases resources</li>
+ * </ul>
+ *
+ * @author Manik Surtani
+ * @since 2.1.0
+ */
+public interface RPCManager
+{
+ /**
+ * Disconnects and closes the underlying JGroups channel.
+ */
+ void disconnect();
+
+ /**
+ * Stops the RPCDispatcher and frees resources. Closes and disconnects the underlying JGroups channel if this is
+ * still open/connected.
+ */
+ void stop();
+
+ /**
+ * Starts the RPCManager by connecting the underlying JGroups channel (if configured for replication). Connecting
+ * the channel may also involve state transfer (if configured) so the interceptor chain should be started and
+ * available before this method is called.
+ */
+ void start();
+
+ /**
+ * Invokes an RPC call on other caches in the cluster.
+ *
+ * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to the entire cluster.
+ * @param cacheCommand the cache command to invoke
+ * @param mode the group request mode to use. See {@link org.jgroups.blocks.GroupRequest}.
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param responseFilter a response filter with which to filter out failed/unwanted/invalid responses.
+ * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue. See JGroups docs for more info.
+ * @return a list of responses from each member contacted.
+ * @throws Exception in the event of problems.
+ */
+ List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, int mode, long timeout, RspFilter responseFilter, boolean useOutOfBandMessage) throws Exception;
+
+ /**
+ * Invokes an RPC call on other caches in the cluster.
+ *
+ * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to the entire cluster.
+ * @param cacheCommand the cache command to invoke
+ * @param mode the group request mode to use. See {@link org.jgroups.blocks.GroupRequest}.
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue. See JGroups docs for more info.
+ * @return a list of responses from each member contacted.
+ * @throws Exception in the event of problems.
+ */
+ List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, int mode, long timeout, boolean useOutOfBandMessage) throws Exception;
+
+ /**
+ * Invokes an RPC call on other caches in the cluster.
+ *
+ * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to the entire cluster.
+ * @param cacheCommand the cache command to invoke
+ * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue. See JGroups docs for more info.
+ * @return a list of responses from each member contacted.
+ * @throws Exception in the event of problems.
+ */
+ List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, boolean synchronous, long timeout, boolean useOutOfBandMessage) throws Exception;
+
+ /**
+ * @return true if the current Channel is the coordinator of the cluster.
+ */
+ boolean isCoordinator();
+
+ /**
+ * @return the Address of the current coordinator.
+ */
+ Address getCoordinator();
+
+ /**
+ * Retrieves the local JGroups channel's address
+ *
+ * @return an Address
+ */
+ Address getLocalAddress();
+
+ /**
+ * Returns a defensively copied list of members in the current cluster view.
+ */
+ List<Address> getMembers();
+
+
+ /**
+ * Retrieves the Channel
+ *
+ * @return a channel
+ */
+ Channel getChannel();
+}
\ No newline at end of file
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/transaction/TransactionTable.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/transaction/TransactionTable.java 2008-10-14 17:49:31 UTC (rev 6945)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/transaction/TransactionTable.java 2008-10-14 17:50:03 UTC (rev 6946)
@@ -24,12 +24,12 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.starobrno.CacheException;
+import org.jboss.starobrno.RPCManager;
import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.context.TransactionContext;
import org.jboss.starobrno.factories.annotations.Inject;
import org.jboss.starobrno.factories.annotations.NonVolatile;
import org.jboss.starobrno.factories.context.ContextFactory;
-import org.jboss.starobrno.remoting.RPCManager;
import org.jgroups.Address;
import javax.transaction.Status;
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/util/FastCopyHashMap.java (from rev 6897, core/branches/flat/src/main/java/org/jboss/cache/util/FastCopyHashMap.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/util/FastCopyHashMap.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/util/FastCopyHashMap.java 2008-10-14 17:50:03 UTC (rev 6946)
@@ -0,0 +1,836 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.util;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.AbstractCollection;
+import java.util.AbstractMap;
+import java.util.AbstractSet;
+import java.util.Collection;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * A HashMap that is optimized for fast shallow copies.
+ *
+ * @author Jason T. Greene
+ */
+public class FastCopyHashMap<K, V> extends AbstractMap<K, V> implements Map<K, V>, Cloneable, Serializable
+{
+ /**
+ * Sentinel stored in the table in place of a null key (see maskNull/unmaskNull).
+ */
+ private static final Object NULL = new Object();
+
+ /**
+ * Serialization ID
+ */
+ private static final long serialVersionUID = 10929568968762L;
+
+ /**
+ * Default initial capacity (8). Table lengths must be a power of 2 because
+ * index() masks the hash with (length - 1).
+ */
+ private static final int DEFAULT_CAPACITY = 8;
+
+ /**
+ * Largest table length allowed (2^30); resize() will not grow past it.
+ */
+ private static final int MAXIMUM_CAPACITY = 1 << 30;
+
+ /**
+ * Default load factor of 67% (per the original author, mirroring IdentityHashMap)
+ */
+ private static final float DEFAULT_LOAD_FACTOR = 0.67f;
+
+ /**
+ * The open-addressed table. Transient: writeObject/readObject stream the
+ * entries individually instead.
+ */
+ private transient Entry<K, V>[] table;
+
+ /**
+ * The current number of key-value pairs
+ */
+ private transient int size;
+
+ /**
+ * The size at which put() triggers the next resize
+ */
+ private transient int threshold;
+
+ /**
+ * The user defined load factor which defines when to resize
+ */
+ private final float loadFactor;
+
+ /**
+ * Counter used to detect changes made outside of an iterator
+ */
+ private transient int modCount;
+
+ // Cached views, created lazily by keySet(), values() and entrySet()
+ private transient KeySet keySet;
+ private transient Values values;
+ private transient EntrySet entrySet;
+
+ /**
+ * Creates an empty map with the requested initial capacity and load factor.
+ *
+ * @param initialCapacity requested capacity; values above MAXIMUM_CAPACITY are clamped
+ * @param loadFactor fill ratio that triggers a resize, must lie in (0, 1]
+ * @throws IllegalArgumentException if initialCapacity is negative or loadFactor is out of range
+ */
+ public FastCopyHashMap(int initialCapacity, float loadFactor)
+ {
+    if (initialCapacity < 0)
+    {
+       throw new IllegalArgumentException("Can not have a negative size table!");
+    }
+
+    boolean loadFactorInRange = loadFactor > 0F && loadFactor <= 1F;
+    if (!loadFactorInRange)
+    {
+       throw new IllegalArgumentException("Load factor must be greater than 0 and less than or equal to 1");
+    }
+
+    this.loadFactor = loadFactor;
+    init(initialCapacity > MAXIMUM_CAPACITY ? MAXIMUM_CAPACITY : initialCapacity, loadFactor);
+ }
+
+ /**
+ * Copy constructor. Another FastCopyHashMap is copied by cloning its table
+ * array (the immutable Entry objects are shared); any other Map is copied
+ * entry by entry using the default load factor.
+ *
+ * @param map the map whose mappings are copied
+ */
+ @SuppressWarnings("unchecked")
+ public FastCopyHashMap(Map<? extends K, ? extends V> map)
+ {
+    if (!(map instanceof FastCopyHashMap))
+    {
+       // Generic path: size a fresh table, then insert every mapping
+       this.loadFactor = DEFAULT_LOAD_FACTOR;
+       init(map.size(), this.loadFactor);
+       putAll(map);
+    }
+    else
+    {
+       // Fast path: a shallow array clone is the whole copy
+       FastCopyHashMap<? extends K, ? extends V> source = (FastCopyHashMap<? extends K, ? extends V>) map;
+       this.table = (Entry<K, V>[]) source.table.clone();
+       this.loadFactor = source.loadFactor;
+       this.size = source.size;
+       this.threshold = source.threshold;
+    }
+ }
+
+ /**
+ * Allocates the table and computes the resize threshold.
+ *
+ * @param initialCapacity requested capacity, rounded up to the next power of 2
+ * @param loadFactor load factor used to derive the threshold
+ */
+ @SuppressWarnings("unchecked")
+ private void init(int initialCapacity, float loadFactor)
+ {
+    // Smallest power of two that is >= initialCapacity (at least 1)
+    int capacity = 1;
+    while (capacity < initialCapacity)
+    {
+       capacity <<= 1;
+    }
+
+    this.table = (Entry<K, V>[]) new Entry[capacity];
+    this.threshold = (int) (capacity * loadFactor);
+ }
+
+ /**
+ * Creates an empty map with the given initial capacity and the default load factor.
+ *
+ * @param initialCapacity requested initial capacity
+ */
+ public FastCopyHashMap(int initialCapacity)
+ {
+ this(initialCapacity, DEFAULT_LOAD_FACTOR);
+ }
+
+ /**
+ * Creates an empty map with the default capacity and the default load factor.
+ */
+ public FastCopyHashMap()
+ {
+ this(DEFAULT_CAPACITY);
+ }
+
+ /**
+ * Spreads the bits of the key's hashCode so that the low-order bits used by
+ * index() depend on the higher-order bits as well.
+ *
+ * @param key a non-null (already masked) key
+ * @return the spread hash
+ */
+ private static final int hash(Object key)
+ {
+    int spread = key.hashCode();
+    spread ^= (spread >>> 20) ^ (spread >>> 12);
+    spread ^= (spread >>> 7) ^ (spread >>> 4);
+    return spread;
+ }
+
+ /**
+ * Replaces a null key with the NULL sentinel so it can be hashed and stored.
+ *
+ * @param key the caller's key, possibly null
+ * @return the key itself, or the NULL sentinel if the key was null
+ */
+ @SuppressWarnings("unchecked")
+ private static final <K> K maskNull(K key)
+ {
+    if (key != null)
+    {
+       return key;
+    }
+    return (K) NULL;
+ }
+
+ /**
+ * Inverse of maskNull: turns the NULL sentinel back into a null key.
+ *
+ * @param key a key as stored in the table
+ * @return null if the key is the NULL sentinel, otherwise the key itself
+ */
+ private static final <K> K unmaskNull(K key)
+ {
+    if (key == NULL)
+    {
+       return null;
+    }
+    return key;
+ }
+
+ /**
+ * Returns the slot that follows index when probing, wrapping to 0 at the end of the table.
+ *
+ * @param index the current slot
+ * @param length the table length
+ * @return the next slot to probe
+ */
+ private int nextIndex(int index, int length)
+ {
+    if (index >= length - 1)
+    {
+       return 0;
+    }
+    return index + 1;
+ }
+
+ /**
+ * Null-safe equality: true if both references are identical (including both
+ * null) or if o1 is non-null and o1.equals(o2).
+ */
+ private static final boolean eq(Object o1, Object o2)
+ {
+    if (o1 == o2)
+    {
+       return true;
+    }
+    return o1 != null && o1.equals(o2);
+ }
+
+ /**
+ * Maps a hash to a table slot by masking with (length - 1); this only covers
+ * every slot when length is a power of 2.
+ *
+ * @param hashCode the (spread) hash of the key
+ * @param length the table length
+ * @return the preferred slot for that hash
+ */
+ private static final int index(int hashCode, int length)
+ {
+ return hashCode & (length - 1);
+ }
+
+ /**
+ * @return the number of key-value pairs currently held
+ */
+ public int size()
+ {
+ return size;
+ }
+
+ /**
+ * @return true if the map holds no key-value pairs
+ */
+ public boolean isEmpty()
+ {
+ return size == 0;
+ }
+
+ /**
+ * Looks up the value mapped to the key by probing linearly from the key's
+ * preferred slot until a match or an empty slot is found.
+ *
+ * @param key the key to look up (may be null)
+ * @return the mapped value, or null if there is no mapping
+ */
+ public V get(Object key)
+ {
+    final Object lookup = maskNull(key);
+    final int hash = hash(lookup);
+    final int length = table.length;
+
+    int slot = index(hash, length);
+    for (Entry<K, V> e = table[slot]; e != null; e = table[slot])
+    {
+       if (e.hash == hash && eq(lookup, e.key))
+       {
+          return e.value;
+       }
+       slot = nextIndex(slot, length);
+    }
+
+    // Hit an empty slot: the key is not present
+    return null;
+ }
+
+ /**
+ * Tests whether a mapping exists for the key, using the same linear probe as get().
+ *
+ * @param key the key to test (may be null)
+ * @return true if the key is mapped, even when it is mapped to null
+ */
+ public boolean containsKey(Object key)
+ {
+    final Object lookup = maskNull(key);
+    final int hash = hash(lookup);
+    final int length = table.length;
+
+    int slot = index(hash, length);
+    for (Entry<K, V> e = table[slot]; e != null; e = table[slot])
+    {
+       if (e.hash == hash && eq(lookup, e.key))
+       {
+          return true;
+       }
+       slot = nextIndex(slot, length);
+    }
+
+    return false;
+ }
+
+ /**
+ * Scans every slot of the table for an entry whose value equals the argument.
+ *
+ * @param value the value to search for (may be null)
+ * @return true if at least one key maps to the value
+ */
+ public boolean containsValue(Object value)
+ {
+    final Entry<K, V>[] slots = table;
+    for (int i = 0; i < slots.length; i++)
+    {
+       Entry<K, V> candidate = slots[i];
+       if (candidate == null)
+       {
+          continue;
+       }
+       if (eq(value, candidate.value))
+       {
+          return true;
+       }
+    }
+
+    return false;
+ }
+
+ /**
+ * Associates the value with the key, replacing any previous mapping.
+ *
+ * @param key the key (may be null)
+ * @param value the value (may be null)
+ * @return the value previously mapped to the key, or null if there was none
+ * @throws IllegalStateException if the probe wraps all the way around without finding a free slot
+ */
+ public V put(K key, V value)
+ {
+ key = maskNull(key);
+
+ Entry<K, V>[] table = this.table;
+ int hash = hash(key);
+ int length = table.length;
+ int start = index(hash, length);
+ int index = start;
+
+
+ // Probe linearly until we find the key or a free slot
+ for (; ;)
+ {
+ Entry<K, V> e = table[index];
+ if (e == null)
+ break;
+
+ if (e.hash == hash && eq(key, e.key))
+ {
+ // Entry fields are final, so an update swaps in a fresh Entry; an
+ // Entry shared with a cloned table is never mutated. Replacing a
+ // value does not change modCount.
+ table[index] = new Entry<K, V>(e.key, e.hash, value);
+ return e.value;
+ }
+
+ index = nextIndex(index, length);
+ if (index == start)
+ throw new IllegalStateException("Table is full!");
+ }
+
+ // New key: structural modification
+ modCount++;
+ table[index] = new Entry<K, V>(key, hash, value);
+ if (++size >= threshold)
+ resize(length);
+
+ return null;
+ }
+
+
+ /**
+ * Replaces the table with one of length (from * 2) and re-inserts every
+ * existing entry. Does nothing if the doubled length would exceed
+ * MAXIMUM_CAPACITY or overflows.
+ *
+ * @param from the length to double; note the new length is derived from this
+ * argument, not from the current table length
+ */
+ @SuppressWarnings("unchecked")
+ private void resize(int from)
+ {
+ int newLength = from << 1;
+
+ // Can't get any bigger
+ if (newLength > MAXIMUM_CAPACITY || newLength <= from)
+ return;
+
+ Entry<K, V>[] newTable = new Entry[newLength];
+ Entry<K, V>[] old = table;
+
+ for (Entry<K, V> e : old)
+ {
+ if (e == null)
+ continue;
+
+ // Re-probe using the cached hash; the existing Entry objects are reused
+ int index = index(e.hash, newLength);
+ while (newTable[index] != null)
+ index = nextIndex(index, newLength);
+
+ newTable[index] = e;
+ }
+
+ threshold = (int) (loadFactor * newLength);
+ table = newTable;
+ }
+
+ /**
+ * Copies every mapping of the given map into this one via put().
+ * If the incoming map has more entries than the current threshold, the table
+ * is grown once up front.
+ *
+ * @param map the mappings to add
+ */
+ public void putAll(Map<? extends K, ? extends V> map)
+ {
+ int size = map.size();
+ if (size == 0)
+ return;
+
+ if (size > threshold)
+ {
+ if (size > MAXIMUM_CAPACITY)
+ size = MAXIMUM_CAPACITY;
+
+ // Find the first power-of-two multiple of the current length that is >= size;
+ // resize() then doubles that value.
+ int length = table.length;
+ for (; length < size; length <<= 1) ;
+
+ resize(length);
+ }
+
+ for (Map.Entry<? extends K, ? extends V> e : map.entrySet())
+ put(e.getKey(), e.getValue());
+ }
+
+ /**
+ * Removes the mapping for the key, if present, and closes the resulting gap
+ * in the probe sequence via relocate().
+ *
+ * @param key the key to remove (may be null)
+ * @return the value that was mapped to the key, or null if there was no mapping
+ */
+ public V remove(Object key)
+ {
+ key = maskNull(key);
+
+ Entry<K, V>[] table = this.table;
+ int length = table.length;
+ int hash = hash(key);
+ int start = index(hash, length);
+
+ for (int index = start; ;)
+ {
+ Entry<K, V> e = table[index];
+ if (e == null)
+ return null;
+
+ if (e.hash == hash && eq(key, e.key))
+ {
+ table[index] = null;
+ // Shift later entries of the same probe run back so lookups still find them
+ relocate(index);
+ modCount++;
+ size--;
+ return e.value;
+ }
+
+ index = nextIndex(index, length);
+ // Wrapped all the way around a full table without a match
+ if (index == start)
+ return null;
+ }
+
+
+ }
+
+ /**
+ * Closes the gap left at slot start after a removal: walks forward from the
+ * gap until an empty slot is reached, moving back any entry that a lookup
+ * probing from its preferred slot would otherwise no longer reach.
+ *
+ * @param start the slot that has just been emptied
+ */
+ private void relocate(int start)
+ {
+ Entry<K, V>[] table = this.table;
+ int length = table.length;
+ int current = nextIndex(start, length);
+
+ for (; ;)
+ {
+ Entry<K, V> e = table[current];
+ if (e == null)
+ return;
+
+ // A Doug Lea variant of Knuth's Section 6.4 Algorithm R.
+ // This provides a non-recursive method of relocating
+ // entries to their optimal positions once a gap is created.
+ int prefer = index(e.hash, length);
+ // True when the gap lies (cyclically) between the entry's preferred
+ // slot and its current slot; the first clause handles wrap-around.
+ if ((current < prefer && (prefer <= start || start <= current))
+ || (prefer <= start && start <= current))
+ {
+ table[start] = e;
+ table[current] = null;
+ // The moved entry's old slot becomes the new gap
+ start = current;
+ }
+
+ current = nextIndex(current, length);
+ }
+ }
+
+ public void clear()
+ {
+ modCount++;
+ Entry<K, V>[] table = this.table;
+ for (int i = 0; i < table.length; i++)
+ table[i] = null;
+
+ size = 0;
+ }
+
+ /**
+ * Produces a shallow copy: the table array is cloned while the immutable
+ * Entry objects are shared. The cached views are reset so that the copy
+ * creates its own.
+ *
+ * @return the cloned map
+ */
+ @SuppressWarnings("unchecked")
+ public Object clone()
+ {
+    final FastCopyHashMap<K, V> copy;
+    try
+    {
+       copy = (FastCopyHashMap<K, V>) super.clone();
+    }
+    catch (CloneNotSupportedException e)
+    {
+       // should never happen
+       throw new IllegalStateException(e);
+    }
+
+    copy.table = table.clone();
+    copy.keySet = null;
+    copy.values = null;
+    copy.entrySet = null;
+    return copy;
+ }
+
+ public void printDebugStats()
+ {
+ int optimal = 0;
+ int total = 0;
+ int totalSkew = 0;
+ int maxSkew = 0;
+ for (int i = 0; i < table.length; i++)
+ {
+ Entry<K, V> e = table[i];
+ if (e != null)
+ {
+
+ total++;
+ int target = index(e.hash, table.length);
+ if (i == target)
+ optimal++;
+ else
+ {
+ int skew = Math.abs(i - target);
+ if (skew > maxSkew) maxSkew = skew;
+ totalSkew += skew;
+ }
+
+ }
+ }
+
+ System.out.println(" Size: " + size);
+ System.out.println(" Real Size: " + total);
+ System.out.println(" Optimal: " + optimal + " (" + (float) optimal * 100 / total + "%)");
+ System.out.println(" Average Distnce: " + ((float) totalSkew / (total - optimal)));
+ System.out.println(" Max Distance: " + maxSkew);
+ }
+
+ /**
+ * @return the entry view of this map, created on first use and cached
+ */
+ public Set<Map.Entry<K, V>> entrySet()
+ {
+    EntrySet view = entrySet;
+    if (view == null)
+    {
+       view = new EntrySet();
+       entrySet = view;
+    }
+    return view;
+ }
+
+ /**
+ * @return the key view of this map, created on first use and cached
+ */
+ public Set<K> keySet()
+ {
+    KeySet view = keySet;
+    if (view == null)
+    {
+       view = new KeySet();
+       keySet = view;
+    }
+    return view;
+ }
+
+ /**
+ * @return the value view of this map, created on first use and cached
+ */
+ public Collection<V> values()
+ {
+    Values view = values;
+    if (view == null)
+    {
+       view = new Values();
+       values = view;
+    }
+    return view;
+ }
+
+ /**
+ * Restores the map from the form written by writeObject: the default fields
+ * (which include loadFactor), the entry count, then each key/value pair.
+ *
+ * @param s the stream to read from
+ * @throws IOException if the stream cannot be read or holds invalid data
+ * @throws ClassNotFoundException if the class of a serialized key or value cannot be found
+ */
+ @SuppressWarnings("unchecked")
+ private void readObject(java.io.ObjectInputStream s) throws IOException, ClassNotFoundException
+ {
+    s.defaultReadObject();
+
+    if (!(loadFactor > 0F && loadFactor <= 1F))
+       throw new java.io.InvalidObjectException("Invalid load factor: " + loadFactor);
+
+    int count = s.readInt();
+    if (count < 0)
+       throw new java.io.InvalidObjectException("Negative entry count: " + count);
+
+    // Size the table from the load factor rather than the raw count. A table
+    // of exactly 'count' slots can end up completely full, and the probe
+    // loops in get()/containsKey() only stop at a match or an empty slot.
+    long needed = (long) (count / loadFactor);
+    int capacity = needed >= MAXIMUM_CAPACITY ? MAXIMUM_CAPACITY : (int) needed + 1;
+    init(capacity, loadFactor);
+
+    for (int i = 0; i < count; i++)
+    {
+       K key = (K) s.readObject();
+       V value = (V) s.readObject();
+       putForCreate(key, value);
+    }
+
+    // The size field is transient and putForCreate() does not maintain it,
+    // so it must be restored explicitly.
+    this.size = count;
+ }
+
+ @SuppressWarnings("unchecked")
+ private void putForCreate(K key, V value)
+ {
+ key = maskNull(key);
+
+ Entry<K, V>[] table = this.table;
+ int hash = hash(key);
+ int length = table.length;
+ int index = index(hash, length);
+
+ Entry<K, V> e = table[index];
+ while (e != null)
+ {
+ index = nextIndex(index, length);
+ e = table[index];
+ }
+
+ table[index] = new Entry<K, V>(key, hash, value);
+ }
+
+ private void writeObject(java.io.ObjectOutputStream s) throws IOException
+ {
+ s.defaultWriteObject();
+ s.writeInt(size);
+
+ for (Entry<K, V> e : table)
+ {
+ if (e != null)
+ {
+ s.writeObject(unmaskNull(e.key));
+ s.writeObject(e.value);
+ }
+ }
+ }
+
+ /**
+ * Immutable table cell holding a (masked) key, its cached spread hash and
+ * the value. All fields are final, so a value update replaces the Entry.
+ */
+ private static final class Entry<K, V>
+ {
+ final K key;
+ final int hash;
+ final V value;
+
+ Entry(K key, int hash, V value)
+ {
+ this.key = key;
+ this.hash = hash;
+ this.value = value;
+ }
+ }
+
+ private abstract class FasyCopyHashMapIterator<E> implements Iterator<E>
+ {
+ private int next = 0;
+ private int expectedCount = modCount;
+ private int current = -1;
+ private boolean hasNext;
+ Entry<K, V> table[] = FastCopyHashMap.this.table;
+
+ /**
+ * Advances the cursor to the next occupied slot of the iterator's table.
+ *
+ * @return true if another entry is available; the result is cached until nextEntry() consumes it
+ */
+ public boolean hasNext()
+ {
+    if (hasNext)
+    {
+       return true;
+    }
+
+    final Entry<K, V>[] slots = this.table;
+    int cursor = next;
+    while (cursor < slots.length && slots[cursor] == null)
+    {
+       cursor++;
+    }
+
+    if (cursor < slots.length)
+    {
+       next = cursor;
+       hasNext = true;
+       return true;
+    }
+
+    // Ran off the end: park the cursor at the table length
+    next = slots.length;
+    return false;
+ }
+
+ protected Entry<K, V> nextEntry()
+ {
+ if (modCount != expectedCount)
+ throw new ConcurrentModificationException();
+
+ if (!hasNext && !hasNext())
+ throw new NoSuchElementException();
+
+ current = next++;
+ hasNext = false;
+
+ return table[current];
+ }
+
+ /**
+ * Removes the entry most recently returned by nextEntry().
+ *
+ * If this iterator is still walking the map's live table, the entry is
+ * deleted in place and the following probe run is compacted here (the same
+ * relocation rule as FastCopyHashMap.relocate). If compaction would move an
+ * entry this iterator has already returned into the region still to be
+ * visited, the unvisited tail of the table is first copied into a private
+ * snapshot so the entry is not returned twice. Once a snapshot is in use,
+ * removals are delegated to FastCopyHashMap.remove().
+ *
+ * @throws ConcurrentModificationException if the map was structurally modified outside this iterator
+ * @throws IllegalStateException if there is no current entry to remove
+ */
+ @SuppressWarnings("unchecked")
+ public void remove()
+ {
+ if (modCount != expectedCount)
+ throw new ConcurrentModificationException();
+
+ int current = this.current;
+ int delete = current;
+
+ if (current == -1)
+ throw new IllegalStateException();
+
+ // Invalidate current (prevents multiple remove)
+ this.current = -1;
+
+ // Start where we relocate
+ next = delete;
+
+ Entry<K, V>[] table = this.table;
+ if (table != FastCopyHashMap.this.table)
+ {
+ // Iterating over a snapshot: delete from the real map by key, clear the
+ // snapshot slot, and resync with the modCount bump done by remove()
+ FastCopyHashMap.this.remove(table[delete].key);
+ table[delete] = null;
+ expectedCount = modCount;
+ return;
+ }
+
+
+ int length = table.length;
+ int i = delete;
+
+ // In-place delete on the live table; note that modCount is left untouched here
+ table[delete] = null;
+ size--;
+
+ for (; ;)
+ {
+ i = nextIndex(i, length);
+ Entry<K, V> e = table[i];
+ if (e == null)
+ break;
+
+ int prefer = index(e.hash, length);
+ if ((i < prefer && (prefer <= delete || delete <= i))
+ || (prefer <= delete && delete <= i))
+ {
+ // Snapshot the unseen portion of the table if we have
+ // to relocate an entry that was already seen by this iterator
+ if (i < current && current <= delete && table == FastCopyHashMap.this.table)
+ {
+ int remaining = length - current;
+ Entry<K, V>[] newTable = (Entry<K, V>[]) new Entry[remaining];
+ System.arraycopy(table, current, newTable, 0, remaining);
+
+ // Replace iterator's table.
+ // Leave table local var pointing to the real table
+ this.table = newTable;
+ next = 0;
+ }
+
+ // Do the swap on the real table
+ table[delete] = e;
+ table[i] = null;
+ delete = i;
+ }
+ }
+ }
+ }
+
+
+ /**
+ * Iterator over the map's keys; masked null keys are returned as null.
+ */
+ private class KeyIterator extends FasyCopyHashMapIterator<K>
+ {
+ public K next()
+ {
+ return unmaskNull(nextEntry().key);
+ }
+ }
+
+ /**
+ * Iterator over the map's values.
+ */
+ private class ValueIterator extends FasyCopyHashMapIterator<V>
+ {
+ public V next()
+ {
+ return nextEntry().value;
+ }
+ }
+
+ /**
+ * Iterator over the map's entries. Table entries are immutable, so each
+ * call to next() hands out a separate WriteThroughEntry whose setValue()
+ * updates the backing map.
+ */
+ private class EntryIterator extends FasyCopyHashMapIterator<Map.Entry<K, V>>
+ {
+    private class WriteThroughEntry extends SimpleEntry<K, V>
+    {
+       WriteThroughEntry(K key, V value)
+       {
+          super(key, value);
+       }
+
+       /**
+        * Stores the new value in the backing map and in this entry.
+        *
+        * @param value the new value
+        * @return the value this entry held before the call
+        */
+       public V setValue(V value)
+       {
+          // Always write through. This entry is a detached copy of the
+          // immutable table Entry, so skipping the put() while the iterator
+          // is on the live table (as the old condition did) left the map
+          // unchanged. put() on an existing key replaces the slot in place
+          // and does not touch modCount, so iteration can continue.
+          FastCopyHashMap.this.put(getKey(), value);
+
+          return super.setValue(value);
+       }
+    }
+
+    public Map.Entry<K, V> next()
+    {
+       Entry<K, V> e = nextEntry();
+       return new WriteThroughEntry(unmaskNull(e.key), e.value);
+    }
+
+ }
+
+ /**
+ * Live key view of the enclosing map; every operation delegates to the map.
+ */
+ private class KeySet extends AbstractSet<K>
+ {
+    public int size()
+    {
+       return FastCopyHashMap.this.size();
+    }
+
+    public boolean contains(Object o)
+    {
+       return containsKey(o);
+    }
+
+    public Iterator<K> iterator()
+    {
+       return new KeyIterator();
+    }
+
+    public boolean remove(Object o)
+    {
+       // A key was removed exactly when the size went down
+       final int before = size();
+       FastCopyHashMap.this.remove(o);
+       final int after = size();
+       return after < before;
+    }
+
+    public void clear()
+    {
+       FastCopyHashMap.this.clear();
+    }
+ }
+
+ /**
+ * Live value view of the enclosing map; size() and clear() delegate to the
+ * map, everything else is inherited from AbstractCollection and uses the iterator.
+ */
+ private class Values extends AbstractCollection<V>
+ {
+ public Iterator<V> iterator()
+ {
+ return new ValueIterator();
+ }
+
+ public void clear()
+ {
+ FastCopyHashMap.this.clear();
+ }
+
+ public int size()
+ {
+ return FastCopyHashMap.this.size();
+ }
+ }
+
+ /**
+ * Live entry view of the enclosing map.
+ */
+ private class EntrySet extends AbstractSet<Map.Entry<K, V>>
+ {
+    public Iterator<Map.Entry<K, V>> iterator()
+    {
+       return new EntryIterator();
+    }
+
+    /**
+     * @param o the candidate entry
+     * @return true if o is a Map.Entry whose key is mapped in this map to an equal value
+     */
+    public boolean contains(Object o)
+    {
+       if (!(o instanceof Map.Entry))
+          return false;
+
+       Map.Entry<?, ?> entry = (Map.Entry<?, ?>) o;
+       Object key = entry.getKey();
+       Object value = get(key);
+
+       // get() returns null both for "no mapping" and for "mapped to null";
+       // without this check an entry (absentKey, null) would be reported
+       // as contained.
+       if (value == null && !containsKey(key))
+          return false;
+
+       return eq(entry.getValue(), value);
+    }
+
+    public void clear()
+    {
+       FastCopyHashMap.this.clear();
+    }
+
+    public boolean isEmpty()
+    {
+       return FastCopyHashMap.this.isEmpty();
+    }
+
+    public int size()
+    {
+       return FastCopyHashMap.this.size();
+    }
+ }
+
+ /**
+ * Plain mutable Map.Entry holding a key and a value. setValue() only
+ * changes this object; subclasses add write-through behaviour.
+ */
+ protected static class SimpleEntry<K, V> implements Map.Entry<K, V>
+ {
+    private K key;
+    private V value;
+
+    SimpleEntry(K key, V value)
+    {
+       this.key = key;
+       this.value = value;
+    }
+
+    SimpleEntry(Map.Entry<K, V> entry)
+    {
+       this.key = entry.getKey();
+       this.value = entry.getValue();
+    }
+
+    public K getKey()
+    {
+       return key;
+    }
+
+    public V getValue()
+    {
+       return value;
+    }
+
+    /**
+     * @param value the new value
+     * @return the value held before the call
+     */
+    public V setValue(V value)
+    {
+       V old = this.value;
+       this.value = value;
+       return old;
+    }
+
+    public boolean equals(Object o)
+    {
+       if (this == o)
+          return true;
+
+       if (!(o instanceof Map.Entry))
+          return false;
+       Map.Entry<?, ?> e = (Map.Entry<?, ?>) o;
+       return eq(key, e.getKey()) && eq(value, e.getValue());
+    }
+
+    /**
+     * Implements the hash code defined by Map.Entry: the XOR of the key's
+     * and the value's hashCode, with null counting as 0. The key handed to
+     * this class is already unmasked and may be null, and AbstractMap sums
+     * entry hash codes, so the result must follow the Map.Entry formula to
+     * keep equal maps hashing equally.
+     */
+    public int hashCode()
+    {
+       return (key == null ? 0 : key.hashCode()) ^
+             (value == null ? 0 : value.hashCode());
+    }
+
+    public String toString()
+    {
+       return getKey() + "=" + getValue();
+    }
+ }
+}
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/util/Immutables.java (from rev 6897, core/branches/flat/src/main/java/org/jboss/cache/util/Immutables.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/util/Immutables.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/util/Immutables.java 2008-10-14 17:50:03 UTC (rev 6946)
@@ -0,0 +1,586 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.util;
+
+import org.jboss.starobrno.util.FastCopyHashMap;
+import org.jboss.cache.util.ImmutableListCopy;
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Factory for generating immutable type wrappers.
+ *
+ * @author Jason T. Greene
+ */
+public class Immutables
+{
+ /**
+ * Whether or not this collection type is immutable. The check is purely
+ * whether the object implements the {@link Immutable} marker interface, so
+ * only wrappers that carry that marker are recognised.
+ *
+ * @param o a Collection, Set, List, or Map
+ * @return true if immutable, false if not
+ */
+ public static boolean isImmutable(Object o)
+ {
+ return o instanceof Immutable;
+ }
+
+ /**
+ * Converts a Collection to an immutable List by copying it.
+ *
+ * @param <T> the element type of the resulting list
+ * @param source the collection to convert
+ * @return a copied/converted immutable list
+ */
+ public static <T> List<T> immutableListConvert(Collection<? extends T> source)
+ {
+ return new ImmutableListCopy<T>(source);
+ }
+
+ /**
+ * Creates an immutable copy of the list.
+ *
+ * @param <T> the element type of the resulting list
+ * @param list the list to copy
+ * @return the immutable copy
+ */
+ public static <T> List<T> immutableListCopy(List<? extends T> list)
+ {
+ return new ImmutableListCopy<T>(list);
+ }
+
+ /**
+ * Wraps an array with an immutable list. There is no copying involved.
+ * NOTE(review): the no-copy claim depends on the ImmutableListCopy array
+ * constructor, which is not visible here - confirm.
+ *
+ * @param <T> the element type of the array and of the resulting list
+ * @param array the array to wrap
+ * @return a list containing the array
+ */
+ public static <T> List<T> immutableListWrap(T... array)
+ {
+ return new ImmutableListCopy<T>(array);
+ }
+
+ /**
+ * Creates a new immutable list containing the union (combined entries) of both lists.
+ *
+ * @param <T> the element type of the resulting list
+ * @param list1 contains the first elements of the new list
+ * @param list2 contains the successor elements of the new list
+ * @return a new immutable merged copy of list1 and list2
+ */
+ public static <T> List<T> immutableListMerge(List<? extends T> list1, List<? extends T> list2)
+ {
+ return new ImmutableListCopy<T>(list1, list2);
+ }
+
+ /**
+ * Converts a Collection into an immutable Set by copying it into a new
+ * HashSet and wrapping that copy.
+ *
+ * @param <T> the element type of the resulting set
+ * @param collection the collection to convert/copy
+ * @return a new immutable set containing the elements in collection
+ */
+ public static <T> Set<T> immutableSetConvert(Collection<? extends T> collection)
+ {
+ return immutableSetWrap(new HashSet<T>(collection));
+ }
+
+ /**
+ * Wraps a set with an immutable set. There is no copying involved, so later
+ * changes to the original set are visible through the wrapper.
+ *
+ * @param <T> the element type of the resulting set
+ * @param set the set to wrap
+ * @return an immutable set wrapper that delegates to the original set
+ */
+ public static <T> Set<T> immutableSetWrap(Set<? extends T> set)
+ {
+ return new ImmutableSetWrapper<T>(set);
+ }
+
+ /**
+ * Creates an immutable copy of the specified set. The copy is obtained by
+ * the first strategy that succeeds: cloning a known JDK set type, calling a
+ * public clone() method, calling a Collection copy constructor, and finally
+ * copying into a HashSet.
+ *
+ * @param <T> the element type of the resulting set
+ * @param set the set to copy from
+ * @return an immutable set copy
+ */
+ public static <T> Set<T> immutableSetCopy(Set<? extends T> set)
+ {
+    Set<? extends T> copy = attemptKnownSetCopy(set);
+    if (copy == null)
+       // The clone result used to be discarded, so this strategy never took effect
+       copy = attemptClone(set);
+    if (copy == null)
+       // Set uses Collection copy-ctor
+       copy = attemptCopyConstructor(set, Collection.class);
+    if (copy == null)
+       copy = new HashSet<T>(set);
+
+    return new ImmutableSetWrapper<T>(copy);
+ }
+
+
+ /**
+ * Wraps a map with an immutable map. There is no copying involved, so later
+ * changes to the original map are visible through the wrapper.
+ *
+ * @param <K> the key type of the resulting map
+ * @param <V> the value type of the resulting map
+ * @param map the map to wrap
+ * @return an immutable map wrapper that delegates to the original map
+ */
+ public static <K, V> Map<K, V> immutableMapWrap(Map<? extends K, ? extends V> map)
+ {
+ return new ImmutableMapWrapper<K, V>(map);
+ }
+
+ /**
+ * Creates an immutable copy of the specified map. The copy is obtained by
+ * the first strategy that succeeds: cloning a known map type, calling a
+ * public clone() method, calling a Map copy constructor, and finally
+ * copying into a HashMap.
+ *
+ * @param <K> the key type of the resulting map
+ * @param <V> the value type of the resulting map
+ * @param map the map to copy from
+ * @return an immutable map copy
+ */
+ public static <K, V> Map<K, V> immutableMapCopy(Map<? extends K, ? extends V> map)
+ {
+    Map<? extends K, ? extends V> copy = attemptKnownMapCopy(map);
+
+    if (copy == null)
+       // The clone result used to be discarded, so this strategy never took effect
+       copy = attemptClone(map);
+    if (copy == null)
+       copy = attemptCopyConstructor(map, Map.class);
+    if (copy == null)
+       copy = new HashMap<K, V>(map);
+
+    return new ImmutableMapWrapper<K, V>(copy);
+ }
+
+ /**
+ * Creates a new immutable copy of the specified Collection. Copy strategies
+ * are tried in order: known JDK set types, a public clone() method, a
+ * Collection copy constructor, and finally an ArrayList copy.
+ *
+ * @param collection the collection to copy
+ * @return an immutable copy
+ */
+ public static <T> Collection<T> immutableCollectionCopy(Collection<? extends T> collection)
+ {
+    Collection<? extends T> duplicate = attemptKnownSetCopy(collection);
+    if (duplicate == null)
+    {
+       duplicate = attemptClone(collection);
+    }
+    if (duplicate == null)
+    {
+       duplicate = attemptCopyConstructor(collection, Collection.class);
+    }
+    if (duplicate == null)
+    {
+       duplicate = new ArrayList<T>(collection);
+    }
+
+    return new ImmutableCollectionWrapper<T>(duplicate);
+ }
+
+ /**
+ * Clones the map directly if it is one of the map types known to have a
+ * public clone().
+ *
+ * @param map the map to copy
+ * @return the clone, or null if the map is not one of the known types
+ */
+ @SuppressWarnings("unchecked")
+ private static <T extends Map> T attemptKnownMapCopy(T map)
+ {
+    Object copy = null;
+
+    if (map instanceof FastCopyHashMap)
+    {
+       copy = ((FastCopyHashMap) map).clone();
+    }
+    else if (map instanceof HashMap)
+    {
+       // Also reached for LinkedHashMap, which is a HashMap
+       copy = ((HashMap) map).clone();
+    }
+    else if (map instanceof LinkedHashMap)
+    {
+       copy = ((LinkedHashMap) map).clone();
+    }
+    else if (map instanceof TreeMap)
+    {
+       copy = ((TreeMap) map).clone();
+    }
+
+    return (T) copy;
+ }
+
+ /**
+ * Clones the collection directly if it is one of the JDK set types known to
+ * have a public clone().
+ *
+ * @param set the collection to copy
+ * @return the clone, or null if the collection is not one of the known types
+ */
+ @SuppressWarnings("unchecked")
+ private static <T extends Collection> T attemptKnownSetCopy(T set)
+ {
+    Object copy = null;
+
+    if (set instanceof HashSet)
+    {
+       // Also reached for LinkedHashSet, which is a HashSet
+       copy = ((HashSet) set).clone();
+    }
+    else if (set instanceof LinkedHashSet)
+    {
+       copy = ((LinkedHashSet) set).clone();
+    }
+    else if (set instanceof TreeSet)
+    {
+       copy = ((TreeSet) set).clone();
+    }
+
+    return (T) copy;
+ }
+
+ /**
+ * Best-effort copy through a public no-arg clone() method, invoked reflectively.
+ *
+ * @param source the object to clone
+ * @return the clone, or null if source is not Cloneable or the reflective call fails
+ */
+ @SuppressWarnings("unchecked")
+ private static <T> T attemptClone(T source)
+ {
+    if (!(source instanceof Cloneable))
+    {
+       return null;
+    }
+
+    try
+    {
+       return (T) source.getClass().getMethod("clone").invoke(source);
+    }
+    catch (Exception ignored)
+    {
+       // Deliberately ignored: the caller falls back to another copy strategy
+       return null;
+    }
+ }
+
+ /**
+ * Best-effort copy through a public single-argument constructor of the
+ * source's runtime class that accepts the given parameter type.
+ *
+ * @param source the object to copy
+ * @param clazz the declared parameter type of the copy constructor to look up
+ * @return the copy, or null if no such constructor exists or invoking it fails
+ */
+ @SuppressWarnings("unchecked")
+ private static <T> T attemptCopyConstructor(T source, Class<? super T> clazz)
+ {
+    T copy = null;
+    try
+    {
+       copy = (T) source.getClass().getConstructor(clazz).newInstance(source);
+    }
+    catch (Exception ignored)
+    {
+       // Deliberately ignored: the caller falls back to another copy strategy
+    }
+
+    return copy;
+ }
+
+
+ /**
+ * Marker interface implemented by the immutable wrappers in this class;
+ * isImmutable() tests for it.
+ */
+ public interface Immutable
+ {
+ }
+
+ /*
+ * Immutable wrapper types.
+ *
+ * We have to re-implement Collections.unmodifiableXXX, since it is not
+ * simple to detect them (the class names are JDK dependent).
+ */
+
+ /**
+ * Iterator decorator that forwards hasNext() and next() to the wrapped
+ * iterator and rejects remove().
+ */
+ private static class ImmutableIteratorWrapper<E> implements Iterator<E>
+ {
+ private Iterator<? extends E> iterator;
+
+ public ImmutableIteratorWrapper(Iterator<? extends E> iterator)
+ {
+ this.iterator = iterator;
+ }
+
+ public boolean hasNext()
+ {
+ return iterator.hasNext();
+ }
+
+ public E next()
+ {
+ return iterator.next();
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+ * Read-only view of a Collection: query methods delegate to the wrapped
+ * collection, every mutator throws UnsupportedOperationException, and the
+ * iterator is wrapped so that it cannot remove elements. The wrapped
+ * collection is not copied.
+ */
+ private static class ImmutableCollectionWrapper<E> implements Collection<E>, Serializable, Immutable
+ {
+ private static final long serialVersionUID = 6777564328198393535L;
+
+ // The wrapped collection; package-visible so subclasses can read it
+ Collection<? extends E> collection;
+
+ public ImmutableCollectionWrapper(Collection<? extends E> collection)
+ {
+ this.collection = collection;
+ }
+
+ public boolean add(E o)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean addAll(Collection<? extends E> c)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void clear()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean contains(Object o)
+ {
+ return collection.contains(o);
+ }
+
+ public boolean containsAll(Collection<?> c)
+ {
+ return collection.containsAll(c);
+ }
+
+ // Equality and hashing are those of the wrapped collection
+ public boolean equals(Object o)
+ {
+ return collection.equals(o);
+ }
+
+ public int hashCode()
+ {
+ return collection.hashCode();
+ }
+
+ public boolean isEmpty()
+ {
+ return collection.isEmpty();
+ }
+
+ public Iterator<E> iterator()
+ {
+ return new ImmutableIteratorWrapper<E>(collection.iterator());
+ }
+
+ public boolean remove(Object o)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean removeAll(Collection<?> c)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean retainAll(Collection<?> c)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public int size()
+ {
+ return collection.size();
+ }
+
+ public Object[] toArray()
+ {
+ return collection.toArray();
+ }
+
+ public <T> T[] toArray(T[] a)
+ {
+ return collection.toArray(a);
+ }
+
+ public String toString()
+ {
+ return collection.toString();
+ }
+ }
+
+
+ /**
+ * Read-only view of a Set; all behaviour is inherited from
+ * ImmutableCollectionWrapper, this class only adds the Set type.
+ */
+ private static class ImmutableSetWrapper<E> extends ImmutableCollectionWrapper<E> implements Set<E>, Serializable, Immutable
+ {
+ private static final long serialVersionUID = 7991492805176142615L;
+
+ public ImmutableSetWrapper(Set<? extends E> set)
+ {
+ super(set);
+ }
+ }
+
+
+ /**
+ * Snapshot of a Map.Entry taken at construction time: key, value and the
+ * source entry's hash code are copied, and setValue() is rejected.
+ */
+ static class ImmutableEntry<K, V> implements Entry<K, V>
+ {
+ private K key;
+ private V value;
+ // Hash code of the source entry, captured once in the constructor
+ private int hash;
+
+ ImmutableEntry(Entry<? extends K, ? extends V> entry)
+ {
+ this.key = entry.getKey();
+ this.value = entry.getValue();
+ this.hash = entry.hashCode();
+ }
+
+ public K getKey()
+ {
+ return key;
+ }
+
+ public V getValue()
+ {
+ return value;
+ }
+
+ public V setValue(V value)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ // Null-safe equality helper
+ private static boolean eq(Object o1, Object o2)
+ {
+ return o1 == o2 || (o1 != null && o1.equals(o2));
+ }
+
+ @SuppressWarnings("unchecked")
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof Entry))
+ return false;
+
+ Entry<K, V> entry = (Entry<K, V>) o;
+ return eq(entry.getKey(), key) && eq(entry.getValue(), value);
+ }
+
+ public int hashCode()
+ {
+ return hash;
+ }
+
+ public String toString()
+ {
+ return getKey() + "=" + getValue();
+ }
+ }
+
+ /**
+ * Read-only view of a map's entry set. Besides blocking the set mutators,
+ * every entry handed out (through the iterator or either toArray method) is
+ * an ImmutableEntry copy, so callers cannot use setValue() on it.
+ */
+ private static class ImmutableEntrySetWrapper<K, V> extends ImmutableSetWrapper<Entry<K, V>>
+ {
+    private static final long serialVersionUID = 6378667653889667692L;
+
+    @SuppressWarnings("unchecked")
+    public ImmutableEntrySetWrapper(Set<? extends Entry<? extends K, ? extends V>> set)
+    {
+       super((Set<Entry<K, V>>) set);
+    }
+
+    /**
+     * @return a new array holding an ImmutableEntry for each entry of the set
+     */
+    public Object[] toArray()
+    {
+       Object[] array = new Object[collection.size()];
+       int i = 0;
+       for (Entry<K, V> entry : this)
+          array[i++] = entry;
+       return array;
+    }
+
+    /**
+     * Fills the given array (or a new one of the same component type if it is
+     * too small) with an ImmutableEntry for each entry of the set.
+     *
+     * @param array the array to fill if it is large enough
+     * @return the filled array
+     */
+    @SuppressWarnings("unchecked")
+    public <T> T[] toArray(T[] array)
+    {
+       int size = collection.size();
+       if (array.length < size)
+          array = (T[]) Array.newInstance(array.getClass().getComponentType(), size);
+
+       int i = 0;
+       Object[] result = array;
+       for (Entry<K, V> entry : this)
+          result[i++] = entry;
+
+       // Collection.toArray(T[]) contract: when the array has room to spare,
+       // the slot right after the last element is set to null.
+       if (i < result.length)
+          result[i] = null;
+
+       return array;
+    }
+
+    public Iterator<Entry<K, V>> iterator()
+    {
+       return new ImmutableIteratorWrapper<Entry<K, V>>(collection.iterator())
+       {
+          public Entry<K, V> next()
+          {
+             return new ImmutableEntry<K, V>(super.next());
+          }
+       };
+    }
+ }
+
+ /**
+ * Read-only view of a Map: query methods delegate to the wrapped map, every
+ * mutator throws UnsupportedOperationException, and the key, value and entry
+ * views are handed out inside immutable wrappers. The wrapped map is not copied.
+ */
+ private static class ImmutableMapWrapper<K, V> implements Map<K, V>, Serializable, Immutable
+ {
+ private static final long serialVersionUID = 708144227046742221L;
+
+ private Map<? extends K, ? extends V> map;
+
+ public ImmutableMapWrapper(Map<? extends K, ? extends V> map)
+ {
+ this.map = map;
+ }
+
+ public void clear()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean containsKey(Object key)
+ {
+ return map.containsKey(key);
+ }
+
+ public boolean containsValue(Object value)
+ {
+ return map.containsValue(value);
+ }
+
+ // A new wrapper is created on every call; entries are copied into ImmutableEntry on access
+ public Set<Entry<K, V>> entrySet()
+ {
+ return new ImmutableEntrySetWrapper<K, V>(map.entrySet());
+ }
+
+ // Equality and hashing are those of the wrapped map
+ public boolean equals(Object o)
+ {
+ return map.equals(o);
+ }
+
+ public V get(Object key)
+ {
+ return map.get(key);
+ }
+
+ public int hashCode()
+ {
+ return map.hashCode();
+ }
+
+ public boolean isEmpty()
+ {
+ return map.isEmpty();
+ }
+
+ public Set<K> keySet()
+ {
+ return new ImmutableSetWrapper<K>(map.keySet());
+ }
+
+ public V put(K key, V value)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void putAll(Map<? extends K, ? extends V> t)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public V remove(Object key)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public int size()
+ {
+ return map.size();
+ }
+
+ public Collection<V> values()
+ {
+ return new ImmutableCollectionWrapper<V>(map.values());
+ }
+
+ public String toString()
+ {
+ return map.toString();
+ }
+ }
+}
\ No newline at end of file
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/util/ReflectionUtil.java (from rev 6897, core/branches/flat/src/main/java/org/jboss/cache/util/reflect/ReflectionUtil.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/util/ReflectionUtil.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/util/ReflectionUtil.java 2008-10-14 17:50:03 UTC (rev 6946)
@@ -0,0 +1,153 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.starobrno.CacheException;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Basic reflection utilities to enhance what the JDK provides.
+ *
+ * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
+ * @since 2.1.0
+ */
+public class ReflectionUtil
+{
+ private static final Log log = LogFactory.getLog(ReflectionUtil.class);
+
+ /**
+ * Collects every method carrying the given annotation that is declared on the class or on any of its
+ * superclasses, irrespective of visibility (public, protected, package or private). When the same signature
+ * is annotated at several levels of the hierarchy, only one of those declarations is reported.
+ *
+ * @param c class to inspect
+ * @param annotationType the type of annotation to look for
+ * @return a new list holding the annotated methods that were found
+ */
+ public static List<Method> getAllMethods(Class c, Class<? extends Annotation> annotationType)
+ {
+ final List<Method> matches = new LinkedList<Method>();
+ inspectRecursively(c, matches, annotationType);
+ return matches;
+ }
+
+ /**
+ * Inspects a class and its superclasses (all the way up to {@link Object}) for methods that carry a given
+ * annotation. Private, package and protected methods are identified as well, not just public ones.
+ * <p/>
+ * Superclasses are visited before the class itself, and a method is skipped when a method with the same name
+ * and parameter types has already been collected. An annotated superclass method therefore stays in the list
+ * even when a subclass overrides it, and the overriding declaration is not added.
+ *
+ * @param c class to inspect; <code>null</code> is ignored
+ * @param s list to which matching methods are appended
+ * @param annotationType the type of annotation to look for
+ */
+ private static void inspectRecursively(Class c, List<Method> s, Class<? extends Annotation> annotationType)
+ {
+ // Class.getSuperclass() returns null for interfaces, primitives and void, so the walk up the hierarchy
+ // has to stop on null as well as on Object.
+ if (c == null) return;
+
+ // Superclass first
+ if (!c.equals(Object.class)) inspectRecursively(c.getSuperclass(), s, annotationType);
+
+ for (Method m : c.getDeclaredMethods())
+ {
+ // skip methods whose signature has already been collected further up the hierarchy
+ if (!alreadyFound(m, s) && m.isAnnotationPresent(annotationType))
+ {
+ s.add(m);
+ }
+ }
+ }
+
+ /**
+ * Checks whether the collection already holds a method with the same signature as the one given, where
+ * "same signature" means an equal name and equal parameter types.
+ *
+ * @param m method to look for
+ * @param s methods collected so far
+ * @return true if a method with the same name and parameter types is already in the collection
+ */
+ private static boolean alreadyFound(Method m, Collection<Method> s)
+ {
+ for (Method candidate : s)
+ {
+ // a different name rules this candidate out without comparing parameter types
+ if (!m.getName().equals(candidate.getName())) continue;
+ if (Arrays.equals(m.getParameterTypes(), candidate.getParameterTypes())) return true;
+ }
+ return false;
+ }
+
+ public static void setValue(Object instance, String fieldName, Object value)
+ {
+ try
+ {
+ Field f = findFieldRecursively(instance.getClass(), fieldName);
+ if (f == null)
+ throw new NoSuchMethodException("Cannot find field " + fieldName + " on " + instance.getClass() + " or superclasses");
+ f.setAccessible(true);
+ f.set(instance, value);
+ }
+ catch (Exception e)
+ {
+ log.error("Unable to set value!", e);
+ }
+ }
+
+ /**
+ * Looks up a declared field by name, starting at the given class and moving up through its superclasses.
+ *
+ * @param c class at which to start the search
+ * @param fieldName name of the field to find
+ * @return the first matching field, or null if no class up to and including {@link Object} declares it
+ */
+ private static Field findFieldRecursively(Class c, String fieldName)
+ {
+ Class current = c;
+ while (true)
+ {
+ try
+ {
+ return current.getDeclaredField(fieldName);
+ }
+ catch (NoSuchFieldException e)
+ {
+ // not declared at this level; give up once Object itself has been checked
+ if (current.equals(Object.class)) return null;
+ current = current.getSuperclass();
+ }
+ }
+ }
+
+ /**
+ * Makes the given method accessible via {@link Method#setAccessible(boolean)} and then invokes it
+ * reflectively. Any value returned by the invoked method is discarded.
+ *
+ * @param instance instance on which to execute the method
+ * @param method method to execute
+ * @param parameters parameters to pass to the method
+ * @throws CacheException wrapping any exception raised while making the method accessible or invoking it
+ */
+ public static void invokeAccessibly(Object instance, Method method, Object[] parameters)
+ {
+ try
+ {
+ method.setAccessible(true);
+ method.invoke(instance, parameters);
+ }
+ catch (Exception e)
+ {
+ // the target instance itself is not included in the message
+ String message = "Unable to invoke method " + method + " on object ";
+ if (parameters != null) message += " with parameters " + Arrays.asList(parameters);
+ throw new CacheException(message, e);
+ }
+ }
+
+}
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/util/concurrent/locks (from rev 6897, core/branches/flat/src/main/java/org/jboss/cache/util/concurrent/locks)
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/util/concurrent/locks/LockContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/cache/util/concurrent/locks/LockContainer.java 2008-10-09 13:22:09 UTC (rev 6897)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/util/concurrent/locks/LockContainer.java 2008-10-14 17:50:03 UTC (rev 6946)
@@ -19,7 +19,7 @@
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-package org.jboss.cache.util.concurrent.locks;
+package org.jboss.starobrno.util.concurrent.locks;
import net.jcip.annotations.ThreadSafe;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/util/concurrent/locks/OwnableReentrantLock.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantLock.java 2008-10-09 13:22:09 UTC (rev 6897)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/util/concurrent/locks/OwnableReentrantLock.java 2008-10-14 17:50:03 UTC (rev 6946)
@@ -19,7 +19,7 @@
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-package org.jboss.cache.util.concurrent.locks;
+package org.jboss.starobrno.util.concurrent.locks;
import net.jcip.annotations.ThreadSafe;
import org.jboss.starobrno.invocation.InvocationContextContainer;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/util/concurrent/locks/OwnableReentrantLockContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantLockContainer.java 2008-10-09 13:22:09 UTC (rev 6897)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/util/concurrent/locks/OwnableReentrantLockContainer.java 2008-10-14 17:50:03 UTC (rev 6946)
@@ -19,7 +19,7 @@
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-package org.jboss.cache.util.concurrent.locks;
+package org.jboss.starobrno.util.concurrent.locks;
import net.jcip.annotations.ThreadSafe;
import org.jboss.starobrno.invocation.InvocationContextContainer;
@@ -27,11 +27,11 @@
import java.util.Arrays;
/**
- * A LockContainer that holds {@link org.jboss.cache.util.concurrent.locks.OwnableReentrantLock}s.
+ * A LockContainer that holds {@link org.jboss.starobrno.util.concurrent.locks.OwnableReentrantLock}s.
*
* @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
- * @see org.jboss.cache.util.concurrent.locks.ReentrantLockContainer
- * @see org.jboss.cache.util.concurrent.locks.OwnableReentrantLock
+ * @see org.jboss.starobrno.util.concurrent.locks.ReentrantLockContainer
+ * @see org.jboss.starobrno.util.concurrent.locks.OwnableReentrantLock
* @since 3.0
*/
@ThreadSafe
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/util/concurrent/locks/ReentrantLockContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/cache/util/concurrent/locks/ReentrantLockContainer.java 2008-10-09 13:22:09 UTC (rev 6897)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/util/concurrent/locks/ReentrantLockContainer.java 2008-10-14 17:50:03 UTC (rev 6946)
@@ -19,7 +19,7 @@
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-package org.jboss.cache.util.concurrent.locks;
+package org.jboss.starobrno.util.concurrent.locks;
import net.jcip.annotations.ThreadSafe;
@@ -30,7 +30,7 @@
* A LockContainer that holds ReentrantLocks
*
* @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
- * @see org.jboss.cache.util.concurrent.locks.OwnableReentrantLockContainer
+ * @see org.jboss.starobrno.util.concurrent.locks.OwnableReentrantLockContainer
* @since 3.0
*/
@ThreadSafe
17 years, 2 months
JBoss Cache SVN: r6945 - in core/branches/flat/src/main/java/org/jboss/starobrno: statetransfer and 1 other directory.
by jbosscache-commits@lists.jboss.org
Author: mircea.markus
Date: 2008-10-14 13:49:31 -0400 (Tue, 14 Oct 2008)
New Revision: 6945
Added:
core/branches/flat/src/main/java/org/jboss/starobrno/statetransfer/
core/branches/flat/src/main/java/org/jboss/starobrno/statetransfer/DefaultStateTransferManager.java
core/branches/flat/src/main/java/org/jboss/starobrno/statetransfer/StateTransferManager.java
Log:
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/statetransfer/DefaultStateTransferManager.java (from rev 6897, core/branches/flat/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/statetransfer/DefaultStateTransferManager.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/statetransfer/DefaultStateTransferManager.java 2008-10-14 17:49:31 UTC (rev 6945)
@@ -0,0 +1,203 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.statetransfer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.NodeSPI;
+import org.jboss.cache.RegionManager;
+import org.jboss.cache.statetransfer.StateTransferIntegrator;
+import org.jboss.cache.statetransfer.StateTransferGenerator;
+import org.jboss.cache.loader.CacheLoaderManager;
+import org.jboss.cache.marshall.Marshaller;
+import org.jboss.starobrno.marshall.NodeData;
+import org.jboss.starobrno.marshall.NodeDataMarker;
+import org.jboss.starobrno.CacheSPI;
+import org.jboss.starobrno.statetransfer.StateTransferManager;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.factories.annotations.Start;
+
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * The default state transfer manager to be used when using MVCC locking.
+ */
+public class DefaultStateTransferManager implements StateTransferManager
+{
+ protected final static Log log = LogFactory.getLog(DefaultStateTransferManager.class);
+ protected static final boolean trace = log.isTraceEnabled();
+
+ public static final NodeData STREAMING_DELIMITER_NODE = new NodeDataMarker();
+
+ public static final String PARTIAL_STATE_DELIMITER = "_PARTIAL_STATE_DELIMITER";
+
+ protected CacheSPI cache;
+ protected Marshaller marshaller;
+ protected RegionManager regionManager;
+ protected Configuration configuration;
+ private CacheLoaderManager cacheLoaderManager;
+ boolean fetchTransientState;
+ boolean fetchPersistentState;
+ protected long stateRetrievalTimeout;
+ protected StateTransferIntegrator integrator;
+ protected StateTransferGenerator generator;
+
+
+ /**
+ * Injection hook for this component's collaborators.
+ * <p/>
+ * NOTE(review): the method declares no parameters, so every statement below assigns a field to itself and
+ * none of the fields is populated here. This looks like an unfinished port - the collaborators were
+ * presumably meant to be passed in as parameters. Confirm how these fields are expected to be set before
+ * relying on them; <code>integrator</code>, for instance, is dereferenced when state is integrated.
+ */
+ @Inject
+ public void injectDependencies()
+ {
+ this.cache = cache;
+ this.regionManager = regionManager;
+ this.marshaller = marshaller;
+ this.configuration = configuration;
+ this.cacheLoaderManager = cacheLoaderManager;
+ this.integrator = integrator;
+ this.generator = generator;
+ }
+
+ @Start(priority = 14)
+ public void start()
+ {
+// fetchTransientState = configuration.isFetchInMemoryState();
+// fetchPersistentState = cacheLoaderManager != null && cacheLoaderManager.isFetchPersistentState();
+// stateRetrievalTimeout = configuration.getStateRetrievalTimeout();
+ }
+
+ public void getState(ObjectOutputStream out, Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Exception
+ {
+ throw new UnsupportedOperationException("Implement me properly!");
+ /*
+
+ // can't give state for regions currently being activated/inactivated
+ boolean canProvideState = (!regionManager.isInactive(fqn) && cache.peek(fqn, false) != null);
+ if (trace) log.trace("Can provide state? " + canProvideState);
+ if (canProvideState && (fetchPersistentState || fetchTransientState))
+ {
+ marshaller.objectToObjectStream(true, out);
+ long startTime = System.currentTimeMillis();
+ InternalNode subtreeRoot = fqn.isRoot() ? cache.getRoot().getDelegationTarget() : cache.getNode(fqn).getDelegationTarget();
+
+ // we don't need READ locks for MVCC based state transfer!
+ if (log.isDebugEnabled())
+ log.debug("Generating in-memory (transient) state for subtree " + fqn);
+
+ generator.generateState(out, subtreeRoot, fetchTransientState, fetchPersistentState, suppressErrors);
+
+ if (log.isDebugEnabled())
+ log.debug("Successfully generated state in " + (System.currentTimeMillis() - startTime) + " msec");
+ }
+ else
+ {
+ marshaller.objectToObjectStream(false, out);
+ Exception e = null;
+ if (!canProvideState)
+ {
+ String exceptionMessage = "Cache instance at " + cache.getLocalAddress() + " cannot provide state for fqn " + fqn + ".";
+
+ if (regionManager.isInactive(fqn))
+ {
+ exceptionMessage += " Region for fqn " + fqn + " is inactive.";
+ e = new InactiveRegionException(exceptionMessage);
+ }
+ // this is not really an exception. Just provide empty state. The exception is just a signal. Yes, lousy. - JBCACHE-1349
+ if (cache.peek(fqn, false, false) == null)
+ {
+ e = new RegionEmptyException();
+ }
+ }
+ if (!fetchPersistentState && !fetchTransientState)
+ {
+ e = new CacheException("Cache instance at " + cache.getLocalAddress() + " is not configured to provide state");
+ }
+ marshaller.objectToObjectStream(e, out);
+ if (e != null) throw e;
+ }
+ */
+ }
+
+ public void setState(ObjectInputStream in, Fqn targetRoot) throws Exception
+ {
+ throw new UnsupportedOperationException("fix me!");
+ /*
+ cache.getInvocationContext().getOptionOverrides().setSkipCacheStatusCheck(true);
+ NodeSPI target = cache.getNode(targetRoot);
+ if (target == null)
+ {
+ // Create the integration root, but do not replicate
+ cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+
+ //needed for BR state transfers
+ cache.getInvocationContext().getOptionOverrides().setSkipCacheStatusCheck(true);
+ cache.put(targetRoot, null);
+ cache.getInvocationContext().getOptionOverrides().setSkipCacheStatusCheck(true);
+ target = cache.getNode(targetRoot);
+ }
+ Object o = marshaller.objectFromObjectStream(in);
+ Boolean hasState = (Boolean) o;
+ if (hasState)
+ {
+ setState(in, target);
+ }
+ else
+ {
+ throw new CacheException("Cache instance at " + cache.getLocalAddress()
+ + " cannot integrate state since state provider could not provide state due to " + marshaller.objectFromObjectStream(in));
+ }
+ */
+ }
+
+ /**
+ * Integrates the state read from the given stream into the portion of the cache rooted in
+ * <code>targetRoot</code>, by delegating to the {@link StateTransferIntegrator} held by this manager.
+ * The start and the duration of the integration are logged at debug level.
+ * <p/>
+ * <strong>NOTE:</strong> This method itself performs no locking of nodes; it
+ * is up to the caller to lock <code>targetRoot</code> before calling
+ * this method.
+ *
+ * @param state stream from which the state to integrate is read
+ * @param targetRoot node into which the state should be integrated
+ * @throws Exception in event of error
+ */
+ protected void setState(ObjectInputStream state, NodeSPI targetRoot) throws Exception
+ {
+ long startTime = System.currentTimeMillis();
+ /*
+ * Vladimir/Manik/Brian (Dec 7,2006)
+ *
+ * integrator.integrateState(in,targetRoot, cl) will call cache.put for each
+ * node read from stream. Having option override below allows nodes read
+ * to be directly stored into a tree since we bypass interceptor chain.
+ *
+ * NOTE(review): no option override is set in this method, and the call below takes
+ * different arguments from the ones mentioned in this historical note - confirm whether
+ * the note still applies.
+ */
+ if (log.isDebugEnabled())
+ log.debug("starting state integration at node " + targetRoot + ". Fetch Persistent State = " + fetchPersistentState);
+ integrator.integrateState(state, targetRoot.getDelegationTarget(), targetRoot.getFqn(), fetchPersistentState);
+
+ if (log.isDebugEnabled())
+ log.debug("successfully integrated state in " + (System.currentTimeMillis() - startTime) + " msec");
+ }
+}
Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/statetransfer/DefaultStateTransferManager.java
___________________________________________________________________
Name: svn:keywords
+ Author Date Id Revision
Name: svn:eol-style
+ native
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/statetransfer/StateTransferManager.java (from rev 6897, core/branches/flat/src/main/java/org/jboss/cache/statetransfer/StateTransferManager.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/statetransfer/StateTransferManager.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/statetransfer/StateTransferManager.java 2008-10-14 17:49:31 UTC (rev 6945)
@@ -0,0 +1,79 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.statetransfer;
+
+import org.jboss.cache.Fqn;
+
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * This interface handles requests to generate or integrate state from neighbouring caches in a cluster.
+ * <p/>
+ * This has existed prior to 3.0.0 as a concrete class. An interface was introduced in 3.0.0 to provide more flexibility
+ * in state transfer implementations.
+ * <p/>
+ *
+ * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
+ * @since 3.0
+ */
+public interface StateTransferManager
+{
+ /**
+ * Writes the state for the portion of the tree named by <code>fqn</code> to
+ * the provided output stream.
+ * <p/>
+ * <p/>
+ *
+ * @param out stream to write state to
+ * @param fqn Fqn indicating the uppermost node in the
+ * portion of the tree whose state should be returned.
+ * @param timeout max number of millis this method should wait to acquire
+ * any locks, if necessary, on the nodes being transferred
+ * @param force if locks are needed and cannot be acquired after
+ * <code>timeout</code> millis, should the lock acquisition
+ * be forced, and any existing transactions holding locks
+ * on the nodes be rolled back?
+ * @param suppressErrors if true, all exceptions are logged but not propagated.
+ * @throws Exception in event of error
+ */
+ void getState(ObjectOutputStream out, Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Exception;
+
+ /**
+ * Set the portion of the cache rooted in <code>targetRoot</code>
+ * to match the given state. Updates the contents of <code>targetRoot</code>
+ * to reflect the state read from <code>in</code>.
+ * <p/>
+ * <strong>NOTE:</strong> This method performs no locking of nodes; it
+ * is up to the caller to lock <code>targetRoot</code> before calling
+ * this method.
+ * <p/>
+ * This method will use any {@link ClassLoader} needed as defined by the active {@link org.jboss.cache.Region}
+ * in the {@link org.jboss.cache.RegionManager}, pertaining to the targetRoot passed in.
+ *
+ * @param in an input stream containing the state
+ * @param targetRoot fqn of the node into which the state should be integrated
+ * @throws Exception In event of error
+ */
+ void setState(ObjectInputStream in, Fqn targetRoot) throws Exception;
+
+}
17 years, 2 months
JBoss Cache SVN: r6944 - core/branches/flat/src/main/java/org/jboss/starobrno/remoting.
by jbosscache-commits@lists.jboss.org
Author: mircea.markus
Date: 2008-10-14 13:49:09 -0400 (Tue, 14 Oct 2008)
New Revision: 6944
Added:
core/branches/flat/src/main/java/org/jboss/starobrno/remoting/ChannelMessageListener.java
core/branches/flat/src/main/java/org/jboss/starobrno/remoting/SuspectException.java
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java
Log:
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/remoting/ChannelMessageListener.java (from rev 6936, core/branches/flat/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/remoting/ChannelMessageListener.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/remoting/ChannelMessageListener.java 2008-10-14 17:49:09 UTC (rev 6944)
@@ -0,0 +1,407 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.remoting;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.Fqn;
+import org.jboss.starobrno.io.ExposedByteArrayOutputStream;
+import org.jboss.starobrno.statetransfer.DefaultStateTransferManager;
+import org.jboss.starobrno.statetransfer.StateTransferManager;
+import org.jboss.starobrno.CacheException;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.factories.annotations.NonVolatile;
+import org.jboss.util.stream.MarshalledValueInputStream;
+import org.jboss.util.stream.MarshalledValueOutputStream;
+import org.jgroups.ExtendedMessageListener;
+import org.jgroups.Message;
+import org.jgroups.util.Util;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * JGroups MessageListener
+ *
+ * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
+ * @since 2.1.0
+ */
+@NonVolatile
+public class ChannelMessageListener implements ExtendedMessageListener
+{
+ /**
+ * Reference to an exception that was raised during
+ * state installation on this node.
+ */
+ protected volatile Exception setStateException;
+ private final Object stateLock = new Object();
+ private static final Log log = LogFactory.getLog(ChannelMessageListener.class);
+ private static final boolean trace = log.isTraceEnabled();
+ private StateTransferManager stateTransferManager;
+ private Configuration configuration;
+ /**
+ * True if state was initialized during start-up.
+ */
+ private volatile boolean isStateSet = false;
+
+
+ @Inject
+ private void injectDependencies(StateTransferManager stateTransferManager, Configuration configuration)
+ {
+ this.stateTransferManager = stateTransferManager;
+ this.configuration = configuration;
+ }
+
+ public boolean isStateSet()
+ {
+ return isStateSet;
+ }
+
+ public void setStateSet(boolean stateSet)
+ {
+ isStateSet = stateSet;
+ }
+
+ /**
+ * Blocks the calling thread until state has been set, i.e. until {@link #isStateSet()} returns true.
+ * <p/>
+ * If an exception was recorded while state was being installed, that exception is thrown instead of
+ * waiting any further. Interruption does not end the wait; the interrupt is remembered and the thread's
+ * interrupt status is restored before this method returns or throws.
+ *
+ * @throws Exception the exception recorded during state installation, if any
+ */
+ public void waitForState() throws Exception
+ {
+ boolean interrupted = false;
+ try
+ {
+ synchronized (stateLock)
+ {
+ while (!isStateSet)
+ {
+ if (setStateException != null)
+ {
+ throw setStateException;
+ }
+
+ try
+ {
+ stateLock.wait();
+ }
+ catch (InterruptedException iex)
+ {
+ // keep waiting, but do not lose the interrupt: it is re-asserted in the finally block.
+ // Re-interrupting right here would make the next wait() throw immediately.
+ interrupted = true;
+ }
+ }
+ }
+ }
+ finally
+ {
+ if (interrupted) Thread.currentThread().interrupt();
+ }
+ }
+
+ protected void stateReceivedSuccess()
+ {
+ isStateSet = true;
+ setStateException = null;
+ }
+
+ /**
+ * Records a failure to integrate received state. A {@link CacheException} is logged at debug level and
+ * anything else at error level; the failure is then stored in <code>setStateException</code>, wrapped in a
+ * new {@link Exception} if it is not already one.
+ *
+ * @param t the failure that was caught
+ */
+ protected void stateReceivingFailed(Throwable t)
+ {
+ if (!(t instanceof CacheException))
+ {
+ log.error("failed setting state", t);
+ }
+ else
+ {
+ log.debug(t);
+ }
+ setStateException = t instanceof Exception ? (Exception) t : new Exception(t);
+ }
+
+ protected void stateProducingFailed(Throwable t)
+ {
+ if (t instanceof CacheException)
+ {
+ log.debug(t);
+ }
+ else
+ {
+ log.error("Caught " + t.getClass().getName()
+ + " while responding to state transfer request", t);
+ }
+ }
+
+ /**
+ * Callback, does nothing.
+ */
+ public void receive(Message msg)
+ {
+ }
+
+ /**
+ * Generates the state for the whole cache (rooted at <code>Fqn.ROOT</code>) into an in-memory stream and
+ * returns that stream's buffer.
+ * <p/>
+ * Failures while generating the state are handed to {@link #stateProducingFailed(Throwable)} rather than
+ * propagated; the buffer is returned in that case too, with whatever had been written up to that point.
+ *
+ * @return the buffer obtained from <code>ExposedByteArrayOutputStream.getRawBuffer()</code>
+ */
+ public byte[] getState()
+ {
+ MarshalledValueOutputStream out = null;
+ ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
+ try
+ {
+ out = new MarshalledValueOutputStream(baos);
+
+ stateTransferManager.getState(out, Fqn.ROOT, configuration.getStateRetrievalTimeout(), true, true);
+ }
+ catch (Throwable t)
+ {
+ stateProducingFailed(t);
+ }
+ finally
+ {
+ // Close (and thereby flush) the object stream before the buffer is taken, so that bytes still held
+ // by the object stream reach baos and the buffer reference is read after any resulting growth.
+ Util.close(out);
+ }
+ return baos.getRawBuffer();
+ }
+
+ public void setState(byte[] new_state)
+ {
+ if (new_state == null)
+ {
+ log.debug("transferred state is null (may be first member in cluster)");
+ return;
+ }
+ ByteArrayInputStream bais = new ByteArrayInputStream(new_state);
+ MarshalledValueInputStream in = null;
+ try
+ {
+ in = new MarshalledValueInputStream(bais);
+ stateTransferManager.setState(in, Fqn.ROOT);
+ stateReceivedSuccess();
+ }
+ catch (Throwable t)
+ {
+ stateReceivingFailed(t);
+ }
+ finally
+ {
+ Util.close(in);
+ synchronized (stateLock)
+ {
+ // Notify wait that state has been set.
+ stateLock.notifyAll();
+ }
+ }
+ }
+
+ /**
+ * Generates partial state into an in-memory stream and returns that stream's buffer.
+ * <p/>
+ * The state id is either the source root itself or, when it contains
+ * {@link DefaultStateTransferManager#PARTIAL_STATE_DELIMITER}, a source root followed by the delimiter and
+ * further text; only the part before the delimiter is used here.
+ * <p/>
+ * Failures while generating the state are handed to {@link #stateProducingFailed(Throwable)} rather than
+ * propagated; the buffer is returned in that case too, with whatever had been written up to that point.
+ *
+ * @param state_id identifies the subtree whose state is requested
+ * @return the buffer obtained from <code>ExposedByteArrayOutputStream.getRawBuffer()</code>
+ */
+ public byte[] getState(String state_id)
+ {
+ if (trace) log.trace("Getting state for state id " + state_id);
+ MarshalledValueOutputStream out = null;
+ String sourceRoot = state_id;
+
+ boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
+ if (hasDifferentSourceAndIntegrationRoots)
+ {
+ sourceRoot = state_id.split(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER)[0];
+ }
+
+ ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
+ try
+ {
+ out = new MarshalledValueOutputStream(baos);
+
+ stateTransferManager.getState(out, Fqn.fromString(sourceRoot),
+ configuration.getStateRetrievalTimeout(), true, true);
+ }
+ catch (Throwable t)
+ {
+ stateProducingFailed(t);
+ }
+ finally
+ {
+ // Close (and thereby flush) the object stream before the buffer is taken, so that bytes still held
+ // by the object stream reach baos and the buffer reference is read after any resulting growth.
+ Util.close(out);
+ }
+ return baos.getRawBuffer();
+ }
+
+ public void getState(OutputStream ostream)
+ {
+ MarshalledValueOutputStream out = null;
+ try
+ {
+ out = new MarshalledValueOutputStream(ostream);
+ stateTransferManager.getState(out, Fqn.ROOT, configuration.getStateRetrievalTimeout(), true, true);
+ }
+ catch (Throwable t)
+ {
+ stateProducingFailed(t);
+ }
+ finally
+ {
+ Util.close(out);
+ }
+ }
+
+ public void getState(String state_id, OutputStream ostream)
+ {
+ if (trace) log.trace("Getting state for state id " + state_id);
+ String sourceRoot = state_id;
+ MarshalledValueOutputStream out = null;
+ boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
+ if (hasDifferentSourceAndIntegrationRoots)
+ {
+ sourceRoot = state_id.split(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER)[0];
+ }
+ try
+ {
+ out = new MarshalledValueOutputStream(ostream);
+ stateTransferManager.getState(out, Fqn.fromString(sourceRoot), configuration.getStateRetrievalTimeout(), true, true);
+ }
+ catch (Throwable t)
+ {
+ stateProducingFailed(t);
+ }
+ finally
+ {
+ Util.close(out);
+ }
+ }
+
+ public void setState(InputStream istream)
+ {
+ if (istream == null)
+ {
+ log.debug("stream is null (may be first member in cluster)");
+ return;
+ }
+ MarshalledValueInputStream in = null;
+ try
+ {
+ in = new MarshalledValueInputStream(istream);
+ stateTransferManager.setState(in, Fqn.ROOT);
+ stateReceivedSuccess();
+ }
+ catch (Throwable t)
+ {
+ stateReceivingFailed(t);
+ }
+ finally
+ {
+ Util.close(in);
+ synchronized (stateLock)
+ {
+ // Notify wait that state has been set.
+ stateLock.notifyAll();
+ }
+ }
+ }
+
+ public void setState(String state_id, byte[] state)
+ {
+ if (trace) log.trace("Receiving state for " + state_id);
+ if (state == null)
+ {
+ log.debug("partial transferred state is null");
+ return;
+ }
+
+ MarshalledValueInputStream in = null;
+ String targetRoot = state_id;
+ boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
+ if (hasDifferentSourceAndIntegrationRoots)
+ {
+ targetRoot = state_id.split(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER)[1];
+ }
+ try
+ {
+ log.debug("Setting received partial state for subroot " + state_id);
+ Fqn subroot = Fqn.fromString(targetRoot);
+// Region region = regionManager.getRegion(subroot, false);
+// ClassLoader cl = null;
+// if (region != null)
+// {
+// // If a classloader is registered for the node's region, use it
+// cl = region.getClassLoader();
+// }
+ ByteArrayInputStream bais = new ByteArrayInputStream(state);
+ in = new MarshalledValueInputStream(bais);
+ //getStateTransferManager().setState(in, subroot, cl);
+ stateTransferManager.setState(in, subroot);
+ stateReceivedSuccess();
+ }
+ catch (Throwable t)
+ {
+ stateReceivingFailed(t);
+ }
+ finally
+ {
+ Util.close(in);
+ synchronized (stateLock)
+ {
+ // Notify wait that state has been set.
+ stateLock.notifyAll();
+ }
+ }
+ }
+
+ public void setState(String stateId, InputStream istream)
+ {
+ if (trace) log.trace("Receiving state for " + stateId);
+ String targetRoot = stateId;
+ MarshalledValueInputStream in = null;
+ boolean hasDifferentSourceAndIntegrationRoots = stateId.indexOf(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
+ if (hasDifferentSourceAndIntegrationRoots)
+ {
+ targetRoot = stateId.split(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER)[1];
+ }
+ if (istream == null)
+ {
+ log.debug("stream is null (may be first member in cluster). State is not set");
+ return;
+ }
+
+ try
+ {
+ log.debug("Setting received partial state for subroot " + stateId);
+ in = new MarshalledValueInputStream(istream);
+ Fqn subroot = Fqn.fromString(targetRoot);
+// Region region = regionManager.getRegion(subroot, false);
+// ClassLoader cl = null;
+// if (region != null)
+// {
+// // If a classloader is registered for the node's region, use it
+// cl = region.getClassLoader();
+// }
+ //getStateTransferManager().setState(in, subroot, cl);
+ stateTransferManager.setState(in, subroot);
+ stateReceivedSuccess();
+ }
+ catch (Throwable t)
+ {
+ if (log.isTraceEnabled()) log.trace("Unknown error while integrating state", t);
+ stateReceivingFailed(t);
+ }
+ finally
+ {
+ Util.close(in);
+ synchronized (stateLock)
+ {
+ // Notify wait that state has been set.
+ stateLock.notifyAll();
+ }
+ }
+ }
+}
\ No newline at end of file
Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/remoting/ChannelMessageListener.java
___________________________________________________________________
Name: svn:mergeinfo
+
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java 2008-10-14 17:47:27 UTC (rev 6943)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java 2008-10-14 17:49:09 UTC (rev 6944)
@@ -23,21 +23,10 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.Fqn;
-import org.jboss.cache.NodeSPI;
-import org.jboss.cache.SuspectException;
-import org.jboss.cache.jmx.annotations.MBean;
-import org.jboss.cache.jmx.annotations.ManagedAttribute;
-import org.jboss.cache.jmx.annotations.ManagedOperation;
-import org.jboss.cache.lock.TimeoutException;
-import org.jboss.cache.marshall.CommandAwareRpcDispatcher;
-import org.jboss.cache.marshall.Marshaller;
-import org.jboss.cache.remoting.jgroups.ChannelMessageListener;
-import org.jboss.cache.statetransfer.DefaultStateTransferManager;
-import org.jboss.cache.util.concurrent.ReclosableLatch;
-import org.jboss.cache.util.reflect.ReflectionUtil;
import org.jboss.starobrno.CacheException;
import org.jboss.starobrno.CacheSPI;
+import org.jboss.starobrno.RPCManager;
+import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.commands.ReplicableCommand;
import org.jboss.starobrno.config.Configuration;
import org.jboss.starobrno.config.RuntimeConfig;
@@ -47,18 +36,19 @@
import org.jboss.starobrno.factories.annotations.Stop;
import org.jboss.starobrno.interceptors.InterceptorChain;
import org.jboss.starobrno.invocation.InvocationContextContainer;
+import org.jboss.starobrno.jmx.annotations.MBean;
+import org.jboss.starobrno.jmx.annotations.ManagedAttribute;
+import org.jboss.starobrno.jmx.annotations.ManagedOperation;
import org.jboss.starobrno.lock.LockManager;
+import org.jboss.starobrno.lock.TimeoutException;
+import org.jboss.starobrno.marshall.CommandAwareRpcDispatcher;
+import org.jboss.starobrno.marshall.ExtendedMarshaller;
import org.jboss.starobrno.notifications.Notifier;
-import org.jboss.starobrno.transaction.GlobalTransaction;
+import org.jboss.starobrno.remoting.ChannelMessageListener;
import org.jboss.starobrno.transaction.TransactionTable;
-import org.jgroups.Address;
-import org.jgroups.Channel;
-import org.jgroups.ChannelException;
-import org.jgroups.ChannelFactory;
-import org.jgroups.ExtendedMembershipListener;
-import org.jgroups.JChannel;
-import org.jgroups.StateTransferException;
-import org.jgroups.View;
+import org.jboss.starobrno.util.ReflectionUtil;
+import org.jboss.cache.util.concurrent.ReclosableLatch;
+import org.jgroups.*;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.RspFilter;
import org.jgroups.protocols.TP;
@@ -70,10 +60,7 @@
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Set;
import java.util.Vector;
import java.util.concurrent.TimeUnit;
@@ -115,19 +102,18 @@
private CacheSPI spi;
private InvocationContextContainer invocationContextContainer;
private final boolean trace = log.isTraceEnabled();
- private Marshaller marshaller;
+ private ExtendedMarshaller extendedMarshaller;
private TransactionManager txManager;
private TransactionTable txTable;
private InterceptorChain interceptorChain;
- private boolean isUsingBuddyReplication;
private boolean isInLocalMode;
private ComponentRegistry componentRegistry;
private LockManager lockManager;
@Inject
public void setupDependencies(ChannelMessageListener messageListener, Configuration configuration, Notifier notifier,
- Marshaller marshaller, TransactionTable txTable,
+ ExtendedMarshaller extendedMarshaller, TransactionTable txTable,
TransactionManager txManager, InvocationContextContainer container, InterceptorChain interceptorChain,
ComponentRegistry componentRegistry, LockManager lockManager)
{
@@ -136,7 +122,7 @@
this.notifier = notifier;
// TODO: Inject cacheSPI when we are ready
// this.spi = spi;
- this.marshaller = marshaller;
+ this.extendedMarshaller = extendedMarshaller;
this.txManager = txManager;
this.txTable = txTable;
this.invocationContextContainer = container;
@@ -150,67 +136,27 @@
@Start(priority = 15)
public void start()
{
- switch (configuration.getCacheMode())
+ if (configuration.getCacheMode().equals(Configuration.CacheMode.LOCAL))
{
- case LOCAL:
- log.debug("cache mode is local, will not create the channel");
- isInLocalMode = true;
- isUsingBuddyReplication = false;
- break;
- case REPL_SYNC:
- case REPL_ASYNC:
- case INVALIDATION_ASYNC:
- case INVALIDATION_SYNC:
- isInLocalMode = false;
- isUsingBuddyReplication = configuration.getBuddyReplicationConfig() != null && configuration.getBuddyReplicationConfig().isEnabled();
- if (log.isDebugEnabled()) log.debug("Cache mode is " + configuration.getCacheMode());
+ log.debug("cache mode is local, will not create the channel");
+ isInLocalMode = true;
+ return;
+ }
+ isInLocalMode = false;
+ if (log.isDebugEnabled()) log.debug("Cache mode is " + configuration.getCacheMode());
- boolean fetchState = shouldFetchStateOnStartup();
- initialiseChannelAndRpcDispatcher(fetchState);
+ initialiseChannelAndRpcDispatcher();
- if (fetchState)
- {
- try
- {
- long start = System.currentTimeMillis();
- // connect and state transfer
- channel.connect(configuration.getClusterName(), null, null, configuration.getStateRetrievalTimeout());
- //if I am not the only and the first member than wait for a state to arrive
- if (getMembers().size() > 1) messageListener.waitForState();
-
- if (log.isDebugEnabled())
- log.debug("connected, state was retrieved successfully (in " + (System.currentTimeMillis() - start) + " milliseconds)");
- }
- catch (StateTransferException ste)
- {
- // make sure we disconnect from the channel before we throw this exception!
- // JBCACHE-761
- disconnect();
- throw new CacheException("Unable to fetch state on startup", ste);
- }
- catch (ChannelException e)
- {
- throw new CacheException("Unable to connect to JGroups channel", e);
- }
- catch (Exception ex)
- {
- throw new CacheException("Unable to fetch state on startup", ex);
- }
- }
- else
- {
- //otherwise just connect
- try
- {
- channel.connect(configuration.getClusterName());
- }
- catch (ChannelException e)
- {
- throw new CacheException("Unable to connect to JGroups channel", e);
- }
- }
- if (log.isInfoEnabled()) log.info("Cache local address is " + getLocalAddress());
+ //otherwise just connect
+ try
+ {
+ channel.connect(configuration.getClusterName());
+ }
+ catch (ChannelException e)
+ {
+ throw new CacheException("Unable to connect to JGroups channel", e);
}
+ if (log.isInfoEnabled()) log.info("Cache local address is " + getLocalAddress());
}
public void disconnect()
@@ -250,17 +196,24 @@
rpcDispatcher = null;
}
- /**
- * @return true if we need to fetch state on startup. I.e., initiate a state transfer.
- */
- private boolean shouldFetchStateOnStartup()
+ @SuppressWarnings("deprecation")
+ private void initialiseChannelAndRpcDispatcher() throws CacheException
{
- boolean loaderFetch = configuration.getCacheLoaderConfig() != null && configuration.getCacheLoaderConfig().isFetchPersistentState();
- return !configuration.isInactiveOnStartup() && !isUsingBuddyReplication && (configuration.isFetchInMemoryState() || loaderFetch);
+ buildChannel();
+ // Channel.LOCAL *must* be set to false so we don't see our own messages - otherwise invalidations targeted at
+ // remote instances will be received by self.
+ channel.setOpt(Channel.LOCAL, false);
+ channel.setOpt(Channel.AUTO_RECONNECT, true);
+ channel.setOpt(Channel.AUTO_GETSTATE, false);
+ channel.setOpt(Channel.BLOCK, true);
+ rpcDispatcher = new CommandAwareRpcDispatcher(channel, messageListener, new MembershipListenerAdaptor(),
+ invocationContextContainer, invocationContextContainer, interceptorChain, componentRegistry);
+ checkAppropriateConfig();
+ rpcDispatcher.setRequestMarshaller(extendedMarshaller);
+ rpcDispatcher.setResponseMarshaller(extendedMarshaller);
}
- @SuppressWarnings("deprecation")
- private void initialiseChannelAndRpcDispatcher(boolean fetchState) throws CacheException
+ private void buildChannel()
{
channel = configuration.getRuntimeConfig().getChannel();
if (channel == null)
@@ -301,29 +254,6 @@
configuration.getRuntimeConfig().setChannel(channel);
}
-
- // Channel.LOCAL *must* be set to false so we don't see our own messages - otherwise invalidations targeted at
- // remote instances will be received by self.
- channel.setOpt(Channel.LOCAL, false);
- channel.setOpt(Channel.AUTO_RECONNECT, true);
- channel.setOpt(Channel.AUTO_GETSTATE, fetchState);
- channel.setOpt(Channel.BLOCK, true);
- // todo fix me
- /*
- if (configuration.isUseRegionBasedMarshalling())
- {
- rpcDispatcher = new InactiveRegionAwareRpcDispatcher(channel, messageListener, new MembershipListenerAdaptor(),
- spi, invocationContextContainer, interceptorChain, componentRegistry);
- }
- else
- {
- rpcDispatcher = new CommandAwareRpcDispatcher(channel, messageListener, new MembershipListenerAdaptor(),
- invocationContextContainer, invocationContextContainer, interceptorChain, componentRegistry);
- }
- */
- checkAppropriateConfig();
- rpcDispatcher.setRequestMarshaller(marshaller);
- rpcDispatcher.setResponseMarshaller(marshaller);
}
public Channel getChannel()
@@ -357,37 +287,37 @@
@Deprecated
- private void removeLocksForDeadMembers(NodeSPI node, List deadMembers)
+ private void removeLocksForDeadMembers(List deadMembers)
{
- Set<GlobalTransaction> deadOwners = new HashSet<GlobalTransaction>();
- Object owner = lockManager.getOwner(node);
-
- // todo fix me
- /*
- if (isLockOwnerDead(owner, deadMembers)) deadOwners.add((GlobalTransaction) owner);
-
-
- for (Object readOwner : lockManager.getReadOwners(node))
- {
- if (isLockOwnerDead(readOwner, deadMembers)) deadOwners.add((GlobalTransaction) readOwner);
- }
- */
-
- for (GlobalTransaction deadOwner : deadOwners)
- {
- boolean localTx = deadOwner.getAddress().equals(getLocalAddress());
- // TODO: Fix me!!!
+// Set<GlobalTransaction> deadOwners = new HashSet<GlobalTransaction>();
+// Object owner = lockManager.getOwner(node);
+//
+// todo fix me
+// /*
+// if (isLockOwnerDead(owner, deadMembers)) deadOwners.add((GlobalTransaction) owner);
+//
+//
+// for (Object readOwner : lockManager.getReadOwners(node))
+// {
+// if (isLockOwnerDead(readOwner, deadMembers)) deadOwners.add((GlobalTransaction) readOwner);
+// }
+// */
+//
+// for (GlobalTransaction deadOwner : deadOwners)
+// {
+// boolean localTx = deadOwner.getAddress().equals(getLocalAddress());
+// TODO: Fix me!!!
// boolean broken = LockUtil.breakTransactionLock(node.getFqn(), lockManager, deadOwner, localTx, txTable, txManager);
- boolean broken = true;
-
- if (broken && trace) log.trace("Broke lock for node " + node.getFqn() + " held by " + deadOwner);
- }
-
- // Recursively unlock children
- for (Object child : node.getChildrenDirect())
- {
- removeLocksForDeadMembers((NodeSPI) child, deadMembers);
- }
+// boolean broken = true;
+//
+// if (broken && trace) log.trace("Broke lock for node " + node.getFqn() + " held by " + deadOwner);
+// }
+//
+// Recursively unlock children
+// for (Object child : node.getChildrenDirect())
+// {
+// removeLocksForDeadMembers((NodeSPI) child, deadMembers);
+// }
}
@@ -463,7 +393,7 @@
}
useOutOfBandMessage = false;
// todo fix me!!
- RspList rsps = null;//rpcDispatcher.invokeRemoteCommands(recipients, command, modeToUse, timeout, isUsingBuddyReplication, useOutOfBandMessage, responseFilter);
+ RspList rsps = rpcDispatcher.invokeRemoteCommands(recipients, command, modeToUse, timeout, useOutOfBandMessage, responseFilter);
if (mode == GroupRequest.GET_NONE) return Collections.emptyList();// async case
if (trace)
log.trace("(" + getLocalAddress() + "): responses for method " + command.getClass().getSimpleName() + ":\n" + rsps);
@@ -512,91 +442,6 @@
}
}
- // ------------ START: Partial state transfer methods ------------
-
- public void fetchPartialState(List<Address> sources, Fqn sourceTarget, Fqn integrationTarget) throws Exception
- {
- String encodedStateId = sourceTarget + DefaultStateTransferManager.PARTIAL_STATE_DELIMITER + integrationTarget;
- fetchPartialState(sources, encodedStateId);
- }
-
- public void fetchPartialState(List<Address> sources, Fqn subtree) throws Exception
- {
- if (subtree == null)
- {
- throw new IllegalArgumentException("Cannot fetch partial state. Null subtree.");
- }
- fetchPartialState(sources, subtree.toString());
- }
-
- private void fetchPartialState(List<Address> sources, String stateId) throws Exception
- {
- if (sources == null || sources.isEmpty() || stateId == null)
- {
- // should this really be throwing an exception? Are there valid use cases where partial state may not be available? - Manik
- // Yes -- cache is configured LOCAL but app doesn't know it -- Brian
- //throw new IllegalArgumentException("Cannot fetch partial state, targets are " + sources + " and stateId is " + stateId);
- if (log.isWarnEnabled())
- log.warn("Cannot fetch partial state, targets are " + sources + " and stateId is " + stateId);
- return;
- }
-
- List<Address> targets = new LinkedList<Address>(sources);
-
- //skip *this* node as a target
- targets.remove(getLocalAddress());
-
- if (targets.isEmpty())
- {
- // Definitely no exception here -- this happens every time the 1st node in the
- // cluster activates a region!! -- Brian
- if (log.isDebugEnabled()) log.debug("Cannot fetch partial state. There are no target members specified");
- return;
- }
-
- if (log.isDebugEnabled())
- log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from members " + targets);
- boolean successfulTransfer = false;
- for (Address target : targets)
- {
- try
- {
- if (log.isDebugEnabled())
- log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target);
- messageListener.setStateSet(false);
- successfulTransfer = channel.getState(target, stateId, configuration.getStateRetrievalTimeout());
- if (successfulTransfer)
- {
- try
- {
- messageListener.waitForState();
- }
- catch (Exception transferFailed)
- {
- if (log.isTraceEnabled()) log.trace("Error while fetching state", transferFailed);
- successfulTransfer = false;
- }
- }
- if (log.isDebugEnabled())
- log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target + (successfulTransfer ? " successful" : " failed"));
- if (successfulTransfer) break;
- }
- catch (IllegalStateException ise)
- {
- // thrown by the JGroups channel if state retrieval fails.
- if (log.isInfoEnabled())
- log.info("Channel problems fetching state. Continuing on to next provider. ", ise);
- }
- }
-
- if (!successfulTransfer)
- {
- if (log.isDebugEnabled())
- log.debug("Node " + getLocalAddress() + " could not fetch partial state " + stateId + " from any member " + targets);
- }
-
- }
-
// ------------ END: Partial state transfer methods ------------
// ------------ START: Informational methods ------------
@@ -671,12 +516,12 @@
removed.removeAll(newMembers);
spi.getInvocationContext().getOptionOverrides().setSkipCacheStatusCheck(true);
// todo fix me
- NodeSPI root = null; // spi.getRoot();
- if (root != null)
- {
- // todo fix me
- //removeLocksForDeadMembers(root.getDelegationTarget(), removed);
- }
+// NodeSPI root = null; // spi.getRoot();
+// if (root != null)
+// {
+ // todo fix me
+ //removeLocksForDeadMembers(root.getDelegationTarget(), removed);
+// }
}
members = new ArrayList<Address>(newMembers); // defensive copy.
@@ -690,10 +535,8 @@
// now notify listeners - *after* updating the coordinator. - JBCACHE-662
if (needNotification && notifier != null)
{
- // TODO: Fix me when we have repl working
- throw new UnsupportedOperationException("Fix me!");
-// InvocationContext ctx = spi.getInvocationContext();
-// notifier.notifyViewChange(newView, ctx);
+ InvocationContext ctx = invocationContextContainer.get();
+ notifier.notifyViewChange(newView, ctx);
}
// Wake up any threads that are waiting to know about who the coordinator is
Added: core/branches/flat/src/main/java/org/jboss/starobrno/remoting/SuspectException.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/remoting/SuspectException.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/remoting/SuspectException.java 2008-10-14 17:49:09 UTC (rev 6944)
@@ -0,0 +1,51 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.remoting;
+
+import org.jboss.starobrno.CacheException;
+
+/**
+ * Signals that a member was suspected while a remote method invocation was in progress.
+ *
+ * @author Bela Ban
+ * @version $Id: SuspectException.java 6886 2008-10-08 16:29:32Z manik.surtani(a)jboss.com $
+ */
+public class SuspectException extends CacheException
+{
+   private static final long serialVersionUID = -2965599037371850141L;
+
+   /**
+    * Creates an exception carrying neither a message nor a cause.
+    */
+   public SuspectException()
+   {
+   }
+
+   /**
+    * Creates an exception carrying a message only.
+    *
+    * @param msg description of the failure
+    */
+   public SuspectException(String msg)
+   {
+      super(msg);
+   }
+
+   /**
+    * Creates an exception carrying a message and the underlying cause.
+    *
+    * @param msg   description of the failure
+    * @param cause the throwable that led to this exception
+    */
+   public SuspectException(String msg, Throwable cause)
+   {
+      super(msg, cause);
+   }
+}
17 years, 2 months
JBoss Cache SVN: r6943 - core/branches/flat/src/main/java/org/jboss/starobrno/marshall.
by jbosscache-commits@lists.jboss.org
Author: mircea.markus
Date: 2008-10-14 13:47:27 -0400 (Tue, 14 Oct 2008)
New Revision: 6943
Added:
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CacheMarshallerStarobrno.java
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/ExtendedMarshaller.java
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/MarshalledValue2.java
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/MarshalledValueHelper2.java
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/NodeData.java
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/NodeDataExceptionMarker.java
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/NodeDataMarker.java
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/UnmarshalledReferences.java
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/MarshalledValueMap.java
Log:
enabling replication
Added: core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CacheMarshallerStarobrno.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CacheMarshallerStarobrno.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CacheMarshallerStarobrno.java 2008-10-14 17:47:27 UTC (rev 6943)
@@ -0,0 +1,802 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.marshall;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.starobrno.CacheException;
+import org.jboss.starobrno.io.ByteBuffer;
+import org.jboss.starobrno.commands.CommandsFactory;
+import org.jboss.starobrno.commands.ReplicableCommand;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.transaction.GlobalTransaction;
+import org.jboss.starobrno.util.FastCopyHashMap;
+import org.jboss.starobrno.util.Immutables;
+import org.jboss.util.NotImplementedException;
+import org.jgroups.Address;
+import org.jgroups.stack.IpAddress;
+import org.jgroups.util.Buffer;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.InputStream;
+import java.lang.reflect.Array;
+import java.util.*;
+
+/**
+ * Marshaller for JBoss Cache; a concrete implementation of {@link ExtendedMarshaller}.
+ *
+ * @author <a href="mailto:manik@jboss.org">Manik Surtani (manik(a)jboss.org)</a>
+ */
+public class CacheMarshallerStarobrno implements ExtendedMarshaller
+{
+ // magic numbers
+ protected static final int MAGICNUMBER_METHODCALL = 1;
+ protected static final int MAGICNUMBER_FQN = 2;
+ protected static final int MAGICNUMBER_GTX = 3;
+ protected static final int MAGICNUMBER_IPADDRESS = 4;
+ protected static final int MAGICNUMBER_ARRAY_LIST = 5;
+ protected static final int MAGICNUMBER_INTEGER = 6;
+ protected static final int MAGICNUMBER_LONG = 7;
+ protected static final int MAGICNUMBER_BOOLEAN = 8;
+ protected static final int MAGICNUMBER_STRING = 9;
+ protected static final int MAGICNUMBER_DEFAULT_DATA_VERSION = 10;
+ protected static final int MAGICNUMBER_LINKED_LIST = 11;
+ protected static final int MAGICNUMBER_HASH_MAP = 12;
+ protected static final int MAGICNUMBER_TREE_MAP = 13;
+ protected static final int MAGICNUMBER_HASH_SET = 14;
+ protected static final int MAGICNUMBER_TREE_SET = 15;
+ protected static final int MAGICNUMBER_NODEDATA_MARKER = 16;
+ protected static final int MAGICNUMBER_NODEDATA_EXCEPTION_MARKER = 17;
+ protected static final int MAGICNUMBER_NODEDATA = 18;
+ protected static final int MAGICNUMBER_GRAVITATERESULT = 19;
+ protected static final int MAGICNUMBER_SHORT = 20;
+ protected static final int MAGICNUMBER_IMMUTABLE_MAPCOPY = 21;
+ protected static final int MAGICNUMBER_MARSHALLEDVALUE = 22;
+ protected static final int MAGICNUMBER_FASTCOPY_HASHMAP = 23;
+ protected static final int MAGICNUMBER_ARRAY = 24;
+ protected static final int MAGICNUMBER_BYTE = 25;
+ protected static final int MAGICNUMBER_CHAR = 26;
+ protected static final int MAGICNUMBER_FLOAT = 27;
+ protected static final int MAGICNUMBER_DOUBLE = 28;
+ protected static final int MAGICNUMBER_OBJECT = 29;
+ protected static final int MAGICNUMBER_NULL = 99;
+ protected static final int MAGICNUMBER_SERIALIZABLE = 100;
+
+ protected static final int MAGICNUMBER_REF = 101;
+
+ public CacheMarshallerStarobrno() // sets up the logger and switches reference tracking off
+ {
+ initLogger(); // overridable (protected) method invoked during construction
+ // NOTE(review): this comment used to read "enabled, since this is always enabled in JBC 2.0.0", yet the assignment below DISABLES reference tracking - confirm which is intended.
+ useRefs = false;
+ }
+
+ protected Log log;
+ protected boolean trace;
+
+ protected Configuration configuration;
+ protected ClassLoader defaultClassLoader;
+ protected boolean useRefs = false;
+
+ /**
+  * Stores the components this marshaller depends on.
+  *
+  * @param configuration      cache configuration to retain
+  * @param defaultClassLoader class loader to retain as the default class loader
+  */
+ @Inject
+ void injectDependencies(Configuration configuration, ClassLoader defaultClassLoader)
+ {
+    this.configuration = configuration;
+    this.defaultClassLoader = defaultClassLoader;
+ }
+
+ /**
+  * Creates the logger for the runtime class of this instance and caches whether trace logging is enabled.
+  */
+ protected void initLogger()
+ {
+    Log classLogger = LogFactory.getLog(getClass());
+    log = classLogger;
+    trace = classLogger.isTraceEnabled();
+ }
+
+ /**
+  * Marshalls an object and returns the marshalled form as a byte array of exactly the marshalled length.
+  * Implements the basic contract set in RPCDispatcher.AbstractMarshaller.
+  *
+  * @param obj object to marshall
+  * @return a fresh array holding only the bytes between the buffer's offset and offset + length
+  * @throws Exception propagated from {@code objectToBuffer}
+  */
+ public byte[] objectToByteBuffer(Object obj) throws Exception
+ {
+    Buffer marshalled = objectToBuffer(obj);
+    int length = marshalled.getLength();
+    byte[] exactCopy = new byte[length];
+    System.arraycopy(marshalled.getBuf(), marshalled.getOffset(), exactCopy, 0, length);
+    return exactCopy;
+ }
+
+ protected CommandsFactory commandsFactory;
+
+
+ @Inject
+ public void injectCommandsFactory(CommandsFactory commandsFactory) // stores the factory that unmarshallCommand uses to rebuild commands from a stream
+ {
+ this.commandsFactory = commandsFactory;
+ }
+
+ protected void marshallObject(Object o, ObjectOutputStream out, Map<Object, Integer> refMap) throws Exception // writes o to out; arrays whose component type is "known" are routed to marshallArray
+ {
+ if (o != null && o.getClass().isArray() && isKnownType(o.getClass().getComponentType()))
+ {
+ marshallArray(o, out, refMap);
+ } else
+ {
+ marshallObject(o, out, refMap); // FIXME(review): calls this same method with the same arguments, so null and every value that is not an array of a known type recurses until StackOverflowError; presumably a different delegate was intended - confirm
+ }
+ }
+
+
+ protected void marshallString(String s, ObjectOutputStream out) throws Exception // writes s with ObjectOutputStream.writeObject; unmarshallString reads it back with readObject
+ {
+ //StringUtil.saveString(out, s);
+ out.writeObject(s);
+ }
+
+ /**
+  * Writes a command as its command id (a short), its parameter count (a single signed byte) and then each
+  * parameter in order via {@code marshallObject}.
+  * <p/>
+  * The parameter count is validated before anything is written: a count above {@link Byte#MAX_VALUE} would wrap
+  * around when narrowed to a byte, so the parameters would be silently dropped and the stream corrupted.
+  *
+  * @param command command to write
+  * @param out     stream to write to
+  * @param refMap  reference map handed through to {@code marshallObject}
+  * @throws IllegalArgumentException if the command has more than {@link Byte#MAX_VALUE} parameters
+  * @throws Exception                propagated from the stream or from {@code marshallObject}
+  */
+ private void marshallCommand(ReplicableCommand command, ObjectOutputStream out, Map<Object, Integer> refMap) throws Exception
+ {
+    Object[] args = command.getParameters();
+    int numArgs = args == null ? 0 : args.length;
+    if (numArgs > Byte.MAX_VALUE)
+    {
+       throw new IllegalArgumentException("Command " + command.getCommandId() + " has " + numArgs
+             + " parameters, but at most " + Byte.MAX_VALUE + " can be marshalled");
+    }
+
+    out.writeShort(command.getCommandId());
+    out.writeByte(numArgs);
+
+    for (int i = 0; i < numArgs; i++)
+    {
+       marshallObject(args[i], out, refMap);
+    }
+ }
+
+
+ /**
+  * Writes a global transaction as its id (a long) followed by its address, the latter via {@code marshallObject}.
+  *
+  * @param globalTransaction transaction to write
+  * @param out               stream to write to
+  * @param refMap            reference map handed through to {@code marshallObject}
+  * @throws Exception propagated from the stream or from {@code marshallObject}
+  */
+ private void marshallGlobalTransaction(GlobalTransaction globalTransaction, ObjectOutputStream out, Map<Object, Integer> refMap) throws Exception
+ {
+    long transactionId = globalTransaction.getId();
+    out.writeLong(transactionId);
+
+    marshallObject(globalTransaction.getAddress(), out, refMap);
+ }
+
+ private void marshallIpAddress(IpAddress ipAddress, ObjectOutputStream out) throws Exception // delegates to the address's own writeExternal; unmarshallIpAddress mirrors this with readExternal
+ {
+ ipAddress.writeExternal(out);
+ }
+
+ /**
+  * Writes a collection as its size (via {@code writeUnsignedInt}) followed by every element in iteration order,
+  * each written via {@code marshallObject}.
+  *
+  * @param c      collection to write
+  * @param out    stream to write to
+  * @param refMap reference map handed through to {@code marshallObject}
+  * @throws Exception propagated from the stream or from {@code marshallObject}
+  */
+ @SuppressWarnings("unchecked")
+ private void marshallCollection(Collection c, ObjectOutputStream out, Map refMap) throws Exception
+ {
+    writeUnsignedInt(out, c.size());
+
+    Iterator elements = c.iterator();
+    while (elements.hasNext())
+    {
+       marshallObject(elements.next(), out, refMap);
+    }
+ }
+
+ /**
+  * Writes a map as its size (via {@code writeUnsignedInt}) followed by each entry's key and then value, both
+  * written via {@code marshallObject}. For an empty map only the size is written.
+  *
+  * @param map    map to write
+  * @param out    stream to write to
+  * @param refMap reference map handed through to {@code marshallObject}
+  * @throws Exception propagated from the stream or from {@code marshallObject}
+  */
+ @SuppressWarnings("unchecked")
+ private void marshallMap(Map map, ObjectOutputStream out, Map<Object, Integer> refMap) throws Exception
+ {
+    int entryCount = map.size();
+    writeUnsignedInt(out, entryCount);
+
+    if (entryCount > 0)
+    {
+       for (Map.Entry entry : (Set<Map.Entry>) map.entrySet())
+       {
+          marshallObject(entry.getKey(), out, refMap);
+          marshallObject(entry.getValue(), out, refMap);
+       }
+    }
+ }
+
+ // --------- Unmarshalling methods
+
+ /**
+  * Reads one object from the stream, optionally with the given class loader installed as the calling thread's
+  * context class loader for the duration of the read.
+  * <p/>
+  * The loader is only installed when the caller asks for an override or when the thread has no context class
+  * loader at all; in that case the previous value is put back afterwards, even if reading fails.
+  *
+  * @param in                                 stream to read from
+  * @param loader                             class loader to install; if null the thread is left untouched
+  * @param refMap                             references unmarshalled so far
+  * @param overrideContextClassloaderOnThread whether to replace a context class loader that is already set
+  * @return the unmarshalled object
+  * @throws Exception propagated from the two-argument {@code unmarshallObject}
+  */
+ protected Object unmarshallObject(ObjectInputStream in, ClassLoader loader, UnmarshalledReferences refMap, boolean overrideContextClassloaderOnThread) throws Exception
+ {
+    if (loader == null)
+    {
+       return unmarshallObject(in, refMap);
+    }
+
+    Thread thread = Thread.currentThread();
+    ClassLoader previous = thread.getContextClassLoader();
+    // only swap if we were told to, or if no context class loader has been set elsewhere.
+    boolean swapLoader = overrideContextClassloaderOnThread || previous == null;
+    try
+    {
+       if (swapLoader) thread.setContextClassLoader(loader);
+       return unmarshallObject(in, refMap);
+    }
+    finally
+    {
+       if (swapLoader) thread.setContextClassLoader(previous);
+    }
+ }
+
+ protected Object unmarshallObject(ObjectInputStream in, UnmarshalledReferences refMap) throws Exception // reads one magic-number byte and decodes the value that follows it
+ {
+ byte magicNumber = in.readByte();
+ int reference = 0; // only assigned from the stream when useRefs is true
+ Object retVal;
+ switch (magicNumber)
+ {
+ case MAGICNUMBER_NULL:
+ return null;
+ case MAGICNUMBER_REF: // back-reference to an object registered in refMap earlier
+ if (useRefs)
+ {
+ reference = readReference(in);
+ return refMap.getReferencedObject(reference);
+ } else break; // with references disabled this leaves the switch and hits the throw at the end of the method
+ case MAGICNUMBER_SERIALIZABLE: // plain Java serialization; registered in refMap when useRefs is true
+ if (useRefs) reference = readReference(in);
+ retVal = in.readObject();
+ if (useRefs) refMap.putReferencedObject(reference, retVal);
+ return retVal;
+ case MAGICNUMBER_MARSHALLEDVALUE:
+ MarshalledValue2 mv = new MarshalledValue2();
+ mv.readExternal(in);
+ return mv;
+ case MAGICNUMBER_METHODCALL:
+ retVal = unmarshallCommand(in, refMap);
+ return retVal;
+ case MAGICNUMBER_GTX: // registered in refMap when useRefs is true
+ if (useRefs) reference = readReference(in);
+ retVal = unmarshallGlobalTransaction(in, refMap);
+ if (useRefs) refMap.putReferencedObject(reference, retVal);
+ return retVal;
+ case MAGICNUMBER_IPADDRESS:
+ retVal = unmarshallIpAddress(in);
+ return retVal;
+ case MAGICNUMBER_ARRAY:
+ return unmarshallArray(in, refMap);
+ case MAGICNUMBER_ARRAY_LIST:
+ return unmarshallArrayList(in, refMap);
+ case MAGICNUMBER_LINKED_LIST:
+ return unmarshallLinkedList(in, refMap);
+ case MAGICNUMBER_HASH_MAP:
+ return unmarshallHashMap(in, refMap);
+ case MAGICNUMBER_TREE_MAP:
+ return unmarshallTreeMap(in, refMap);
+ case MAGICNUMBER_HASH_SET:
+ return unmarshallHashSet(in, refMap);
+ case MAGICNUMBER_TREE_SET:
+ return unmarshallTreeSet(in, refMap);
+ case MAGICNUMBER_IMMUTABLE_MAPCOPY:
+ return unmarshallMapCopy(in, refMap);
+ case MAGICNUMBER_FASTCOPY_HASHMAP:
+ return unmarshallFastCopyHashMap(in, refMap);
+ case MAGICNUMBER_BOOLEAN:
+ return in.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
+ case MAGICNUMBER_INTEGER:
+ return in.readInt();
+ case MAGICNUMBER_LONG:
+ return in.readLong();
+ case MAGICNUMBER_SHORT:
+ return in.readShort();
+ case MAGICNUMBER_STRING: // registered in refMap when useRefs is true
+ if (useRefs) reference = readReference(in);
+ retVal = unmarshallString(in);
+ if (useRefs) refMap.putReferencedObject(reference, retVal);
+ return retVal;
+ case MAGICNUMBER_NODEDATA_MARKER:
+ retVal = new NodeDataMarker();
+ ((NodeDataMarker) retVal).readExternal(in);
+ return retVal;
+ case MAGICNUMBER_NODEDATA_EXCEPTION_MARKER:
+ retVal = new NodeDataExceptionMarker();
+ ((NodeDataExceptionMarker) retVal).readExternal(in);
+ return retVal;
+ case MAGICNUMBER_NODEDATA:
+ retVal = new NodeData();
+ ((NodeData) retVal).readExternal(in);
+ return retVal;
+ default: // any value without a case in this switch, including declared constants such as MAGICNUMBER_FQN or MAGICNUMBER_BYTE
+ if (log.isErrorEnabled())
+ {
+ log.error("Unknown Magic Number " + magicNumber);
+ }
+ throw new Exception("Unknown magic number " + magicNumber);
+ }
+ throw new Exception("Unknown magic number " + magicNumber); // reached only through the break in the MAGICNUMBER_REF case
+ }
+
+ /**
+  * Reads a size-prefixed sequence of key/value pairs into a new {@link FastCopyHashMap}.
+  *
+  * @param in     stream to read from
+  * @param refMap references unmarshalled so far
+  * @return the populated map
+  * @throws Exception propagated from {@code populateFromStream}
+  */
+ private FastCopyHashMap unmarshallFastCopyHashMap(ObjectInputStream in, UnmarshalledReferences refMap) throws Exception
+ {
+    FastCopyHashMap result = new FastCopyHashMap();
+    populateFromStream(in, refMap, result);
+    return result;
+ }
+
+ protected String unmarshallString(ObjectInputStream in) throws Exception // reads the next object from the stream and casts it to String; counterpart of marshallString
+ {
+ return (String) in.readObject();
+ }
+
+ /**
+  * Reads a command: a short command id, a single-byte parameter count and then that many parameters, each read
+  * via {@code unmarshallObject}. The command itself is built by the injected {@code commandsFactory}.
+  * <p/>
+  * If the count is not positive, the factory receives a null parameter array.
+  *
+  * @param in     stream to read from
+  * @param refMap references unmarshalled so far
+  * @return the command produced by the commands factory
+  * @throws Exception propagated from the stream, from {@code unmarshallObject} or from the factory
+  */
+ private ReplicableCommand unmarshallCommand(ObjectInputStream in, UnmarshalledReferences refMap) throws Exception
+ {
+    short commandId = in.readShort();
+    byte parameterCount = in.readByte();
+
+    Object[] parameters = null;
+    if (parameterCount > 0)
+    {
+       parameters = new Object[parameterCount];
+       int index = 0;
+       while (index < parameterCount)
+       {
+          parameters[index] = unmarshallObject(in, refMap);
+          index++;
+       }
+    }
+
+    // NOTE(review): the id is read as a short but narrowed to a byte here - confirm no command id exceeds the byte range.
+    return commandsFactory.fromStream((byte) commandId, parameters);
+ }
+
+
+ /**
+  * Reads a global transaction: its id as a long, followed by its address read via {@code unmarshallObject}.
+  *
+  * @param in     stream to read from
+  * @param refMap references unmarshalled so far
+  * @return a new transaction carrying the id and address that were read
+  * @throws Exception propagated from the stream or from {@code unmarshallObject}
+  */
+ private GlobalTransaction unmarshallGlobalTransaction(ObjectInputStream in, UnmarshalledReferences refMap) throws Exception
+ {
+    GlobalTransaction result = new GlobalTransaction();
+
+    long transactionId = in.readLong();
+    Object origin = unmarshallObject(in, refMap);
+
+    result.setId(transactionId);
+    result.setAddress((Address) origin);
+    return result;
+ }
+
+ /**
+  * Reads an {@link IpAddress} by letting a fresh instance populate itself through {@code readExternal}.
+  *
+  * @param in stream to read from
+  * @return the populated address
+  * @throws Exception propagated from {@code readExternal}
+  */
+ private IpAddress unmarshallIpAddress(ObjectInputStream in) throws Exception
+ {
+    IpAddress result = new IpAddress();
+    result.readExternal(in);
+    return result;
+ }
+
+ /**
+  * Reads a size-prefixed sequence of elements into a new {@link ArrayList} presized to that size.
+  *
+  * @param in     stream to read from
+  * @param refMap references unmarshalled so far
+  * @return the populated list
+  * @throws Exception propagated from the stream or from {@code populateFromStream}
+  */
+ private List unmarshallArrayList(ObjectInputStream in, UnmarshalledReferences refMap) throws Exception
+ {
+    int elementCount = readUnsignedInt(in);
+    List result = new ArrayList(elementCount);
+    populateFromStream(in, refMap, result, elementCount);
+    return result;
+ }
+
+ /**
+  * Reads a size-prefixed sequence of elements into a new {@link LinkedList}.
+  *
+  * @param in     stream to read from
+  * @param refMap references unmarshalled so far
+  * @return the populated list
+  * @throws Exception propagated from the stream or from {@code populateFromStream}
+  */
+ private List unmarshallLinkedList(ObjectInputStream in, UnmarshalledReferences refMap) throws Exception
+ {
+    List result = new LinkedList();
+    int elementCount = readUnsignedInt(in);
+    populateFromStream(in, refMap, result, elementCount);
+    return result;
+ }
+
+ /**
+  * Reads a size-prefixed sequence of key/value pairs into a new {@link HashMap}.
+  *
+  * @param in     stream to read from
+  * @param refMap references unmarshalled so far
+  * @return the populated map
+  * @throws Exception propagated from {@code populateFromStream}
+  */
+ private Map unmarshallHashMap(ObjectInputStream in, UnmarshalledReferences refMap) throws Exception
+ {
+    Map result = new HashMap();
+    populateFromStream(in, refMap, result);
+    return result;
+ }
+
+ /**
+  * Unmarshalls a map as a {@link HashMap} and hands it to <tt>Immutables.immutableMapWrap</tt>.
+  *
+  * @param in     stream to read from
+  * @param refMap reference table handed through to the entry unmarshalling
+  * @return the result of wrapping the map read
+  * @throws Exception propagated from the stream
+  */
+ @SuppressWarnings("unchecked")
+ private Map unmarshallMapCopy(ObjectInputStream in, UnmarshalledReferences refMap) throws Exception
+ {
+    // the entries are read into a plain HashMap first and wrapped afterwards
+    Map backing = unmarshallHashMap(in, refMap);
+    return Immutables.immutableMapWrap(backing);
+ }
+
+ /**
+  * Unmarshalls a map from the stream into a new {@link TreeMap}.
+  *
+  * @param in     stream to read from
+  * @param refMap reference table handed through to the entry unmarshalling
+  * @return the populated map
+  * @throws Exception propagated from the stream
+  */
+ private Map unmarshallTreeMap(ObjectInputStream in, UnmarshalledReferences refMap) throws Exception
+ {
+    Map result = new TreeMap();
+    populateFromStream(in, refMap, result);
+    return result;
+ }
+
+ /**
+  * Unmarshalls a set from the stream into a new {@link HashSet}.
+  *
+  * @param in     stream to read from
+  * @param refMap reference table handed through to the element unmarshalling
+  * @return the populated set
+  * @throws Exception propagated from the stream
+  */
+ private Set unmarshallHashSet(ObjectInputStream in, UnmarshalledReferences refMap) throws Exception
+ {
+    Set result = new HashSet();
+    populateFromStream(in, refMap, result);
+    return result;
+ }
+
+ /**
+  * Unmarshalls a set from the stream into a new {@link TreeSet}.
+  *
+  * @param in     stream to read from
+  * @param refMap reference table handed through to the element unmarshalling
+  * @return the populated set
+  * @throws Exception propagated from the stream
+  */
+ private Set unmarshallTreeSet(ObjectInputStream in, UnmarshalledReferences refMap) throws Exception
+ {
+    Set result = new TreeSet();
+    populateFromStream(in, refMap, result);
+    return result;
+ }
+
+ /**
+  * Fills a map from the stream: a variable-length entry count followed by alternating keys and values.
+  *
+  * @param in            stream to read from
+  * @param refMap        reference table handed through to the key and value unmarshalling
+  * @param mapToPopulate map receiving the entries
+  * @throws Exception propagated from the stream
+  */
+ @SuppressWarnings("unchecked")
+ private void populateFromStream(ObjectInputStream in, UnmarshalledReferences refMap, Map mapToPopulate) throws Exception
+ {
+    int entryCount = readUnsignedInt(in);
+    for (int idx = 0; idx < entryCount; idx++)
+    {
+       // the key precedes its value in the stream
+       Object key = unmarshallObject(in, refMap);
+       Object value = unmarshallObject(in, refMap);
+       mapToPopulate.put(key, value);
+    }
+ }
+
+ /**
+  * Fills a set from the stream: a variable-length element count followed by the elements.
+  *
+  * @param in            stream to read from
+  * @param refMap        reference table handed through to the element unmarshalling
+  * @param setToPopulate set receiving the elements
+  * @throws Exception propagated from the stream
+  */
+ @SuppressWarnings("unchecked")
+ private void populateFromStream(ObjectInputStream in, UnmarshalledReferences refMap, Set setToPopulate) throws Exception
+ {
+    int elementCount = readUnsignedInt(in);
+    for (int idx = 0; idx < elementCount; idx++)
+    {
+       Object element = unmarshallObject(in, refMap);
+       setToPopulate.add(element);
+    }
+ }
+
+ /**
+  * Appends <tt>listSize</tt> unmarshalled elements to a list. Unlike the map and set variants, the element
+  * count is supplied by the caller and not read here.
+  *
+  * @param in             stream to read from
+  * @param refMap         reference table handed through to the element unmarshalling
+  * @param listToPopulate list receiving the elements
+  * @param listSize       number of elements to read
+  * @throws Exception propagated from the stream
+  */
+ @SuppressWarnings("unchecked")
+ private void populateFromStream(ObjectInputStream in, UnmarshalledReferences refMap, List listToPopulate, int listSize) throws Exception
+ {
+    int remaining = listSize;
+    while (remaining-- > 0)
+    {
+       listToPopulate.add(unmarshallObject(in, refMap));
+    }
+ }
+
+ /**
+  * Writes a reference using the variable-length int encoding. Written to solve JBCACHE-1211, where references
+  * are encoded as ints rather than shorts.
+  *
+  * @param out       stream to write to
+  * @param reference reference to write
+  * @throws java.io.IOException propagated from the output stream
+  * @see <a href="http://jira.jboss.org/jira/browse/JBCACHE-1211">JBCACHE-1211</a>
+  */
+ protected void writeReference(ObjectOutputStream out, int reference) throws IOException
+ {
+    // references share the encoding used for all other unsigned ints
+    writeUnsignedInt(out, reference);
+ }
+
+ /**
+  * Reads a reference stored in the variable-length int encoding. Written to solve JBCACHE-1211, where references
+  * are encoded as ints rather than shorts.
+  *
+  * @param in stream to read from
+  * @return reference
+  * @throws java.io.IOException propagated from the input stream
+  * @see <a href="http://jira.jboss.org/jira/browse/JBCACHE-1211">JBCACHE-1211</a>
+  */
+ protected int readReference(ObjectInputStream in) throws IOException
+ {
+    int reference = readUnsignedInt(in);
+    return reference;
+ }
+
+ /**
+  * Reads an int stored in variable-length format: seven payload bits per byte, least significant group first,
+  * with the high bit of a byte flagging that another byte follows. Reads between one and five bytes. Smaller
+  * values take fewer bytes. Negative numbers are not supported.
+  *
+  * @param in stream to read from
+  * @return the decoded value
+  * @throws IOException propagated from the stream, or thrown if a fifth byte still flags a continuation
+  */
+ protected int readUnsignedInt(ObjectInputStream in) throws IOException
+ {
+    byte b = in.readByte();
+    int i = b & 0x7F;
+    for (int shift = 7; (b & 0x80) != 0; shift += 7)
+    {
+       // five groups of seven bits already cover all 32 bits, so a longer sequence can only be a corrupt stream
+       if (shift > 28) throw new IOException("Malformed variable-length int: more than 5 bytes");
+       b = in.readByte();
+       i |= (b & 0x7F) << shift;
+    }
+    return i;
+ }
+
+ /**
+  * Writes an int in a variable-length format: seven bits per byte, least significant group first, with the
+  * high bit set on every byte except the last. Writes between one and five bytes. Smaller values take fewer
+  * bytes. Negative numbers are not supported.
+  *
+  * @param out stream to write to
+  * @param i   int to write
+  * @throws IOException propagated from the stream
+  */
+ protected void writeUnsignedInt(ObjectOutputStream out, int i) throws IOException
+ {
+    int remaining = i;
+    while ((remaining & ~0x7F) != 0)
+    {
+       // writeByte() keeps only the low eight bits: seven payload bits plus the continuation flag
+       out.writeByte((remaining & 0x7F) | 0x80);
+       remaining >>>= 7;
+    }
+    out.writeByte(remaining);
+ }
+
+
+ /**
+  * Reads a long stored in variable-length format: seven payload bits per byte, least significant group first,
+  * with the high bit of a byte flagging that another byte follows. Non-negative values take between one and
+  * nine bytes. Smaller values take fewer bytes. Negative numbers are not supported; a tenth byte is still
+  * accepted because that is what {@link #writeUnsignedLong(ObjectOutputStream, long)} emits for them.
+  *
+  * @param in stream to read from
+  * @return the decoded value
+  * @throws IOException propagated from the stream, or thrown if a tenth byte still flags a continuation
+  */
+ protected long readUnsignedLong(ObjectInputStream in) throws IOException
+ {
+    byte b = in.readByte();
+    long i = b & 0x7F;
+    for (int shift = 7; (b & 0x80) != 0; shift += 7)
+    {
+       // ten groups of seven bits already cover all 64 bits, so a longer sequence can only be a corrupt stream
+       if (shift > 63) throw new IOException("Malformed variable-length long: more than 10 bytes");
+       b = in.readByte();
+       i |= (b & 0x7FL) << shift;
+    }
+    return i;
+ }
+
+ /**
+  * Writes a long in a variable-length format: seven bits per byte, least significant group first, with the
+  * high bit set on every byte except the last. Non-negative values take between one and nine bytes. Smaller
+  * values take fewer bytes. Negative numbers are not supported.
+  *
+  * @param out stream to write to
+  * @param i   long to write
+  * @throws IOException propagated from the stream
+  */
+ protected void writeUnsignedLong(ObjectOutputStream out, long i) throws IOException
+ {
+    long remaining = i;
+    while ((remaining & ~0x7FL) != 0)
+    {
+       // writeByte() keeps only the low eight bits: seven payload bits plus the continuation flag
+       out.writeByte((int) ((remaining & 0x7F) | 0x80));
+       remaining >>>= 7;
+    }
+    out.writeByte((int) remaining);
+ }
+
+ /**
+  * Unmarshalls an array. The stream holds a variable-length element count, a magic number naming the component
+  * type and, for every type except <tt>MAGICNUMBER_OBJECT</tt>, a boolean telling whether the array holds
+  * primitives (<tt>true</tt>) or their wrapper objects (<tt>false</tt>), followed by the elements themselves.
+  *
+  * @param in   stream to read from
+  * @param refs reference table handed through when unmarshalling the elements of an <tt>Object[]</tt>
+  * @return the array, typed according to the component type and primitive flag found in the stream
+  * @throws Exception propagated from the stream (an EOFException if it ends early); a CacheException if the
+  *                   component type is not recognised
+  */
+ protected Object unmarshallArray(ObjectInputStream in, UnmarshalledReferences refs) throws Exception
+ {
+    int sz = readUnsignedInt(in);
+    byte type = in.readByte();
+    switch (type)
+    {
+       case MAGICNUMBER_BOOLEAN:
+       {
+          boolean isPrim = in.readBoolean();
+          if (isPrim)
+          {
+             boolean[] a = new boolean[sz];
+             for (int i = 0; i < sz; i++) a[i] = in.readBoolean();
+             return a;
+          }
+          else
+          {
+             Boolean[] a = new Boolean[sz];
+             for (int i = 0; i < sz; i++) a[i] = in.readBoolean();
+             return a;
+          }
+       }
+       case MAGICNUMBER_INTEGER:
+       {
+          boolean isPrim = in.readBoolean();
+          if (isPrim)
+          {
+             int[] a = new int[sz];
+             for (int i = 0; i < sz; i++) a[i] = in.readInt();
+             return a;
+          }
+          else
+          {
+             Integer[] a = new Integer[sz];
+             for (int i = 0; i < sz; i++) a[i] = in.readInt();
+             return a;
+          }
+       }
+       case MAGICNUMBER_LONG:
+       {
+          boolean isPrim = in.readBoolean();
+          if (isPrim)
+          {
+             long[] a = new long[sz];
+             for (int i = 0; i < sz; i++) a[i] = in.readLong();
+             return a;
+          }
+          else
+          {
+             Long[] a = new Long[sz];
+             for (int i = 0; i < sz; i++) a[i] = in.readLong();
+             return a;
+          }
+       }
+       case MAGICNUMBER_CHAR:
+       {
+          boolean isPrim = in.readBoolean();
+          if (isPrim)
+          {
+             char[] a = new char[sz];
+             for (int i = 0; i < sz; i++) a[i] = in.readChar();
+             return a;
+          }
+          else
+          {
+             Character[] a = new Character[sz];
+             for (int i = 0; i < sz; i++) a[i] = in.readChar();
+             return a;
+          }
+       }
+       case MAGICNUMBER_BYTE:
+       {
+          boolean isPrim = in.readBoolean();
+          if (isPrim)
+          {
+             byte[] a = new byte[sz];
+             // readFully() blocks until the whole array is filled and throws an EOFException on a truncated
+             // stream; a plain read() signals end-of-stream with -1, which must never be added to an offset
+             in.readFully(a);
+             return a;
+          }
+          else
+          {
+             Byte[] a = new Byte[sz];
+             for (int i = 0; i < sz; i++) a[i] = in.readByte();
+             return a;
+          }
+       }
+       case MAGICNUMBER_SHORT:
+       {
+          boolean isPrim = in.readBoolean();
+          if (isPrim)
+          {
+             short[] a = new short[sz];
+             for (int i = 0; i < sz; i++) a[i] = in.readShort();
+             return a;
+          }
+          else
+          {
+             Short[] a = new Short[sz];
+             for (int i = 0; i < sz; i++) a[i] = in.readShort();
+             return a;
+          }
+       }
+       case MAGICNUMBER_FLOAT:
+       {
+          boolean isPrim = in.readBoolean();
+          if (isPrim)
+          {
+             float[] a = new float[sz];
+             for (int i = 0; i < sz; i++) a[i] = in.readFloat();
+             return a;
+          }
+          else
+          {
+             Float[] a = new Float[sz];
+             for (int i = 0; i < sz; i++) a[i] = in.readFloat();
+             return a;
+          }
+       }
+       case MAGICNUMBER_DOUBLE:
+       {
+          boolean isPrim = in.readBoolean();
+          if (isPrim)
+          {
+             double[] a = new double[sz];
+             for (int i = 0; i < sz; i++) a[i] = in.readDouble();
+             return a;
+          }
+          else
+          {
+             Double[] a = new Double[sz];
+             for (int i = 0; i < sz; i++) a[i] = in.readDouble();
+             return a;
+          }
+       }
+       case MAGICNUMBER_OBJECT:
+       {
+          // no primitive flag is stored for Object arrays; every element is a fully marshalled object
+          Object[] a = new Object[sz];
+          for (int i = 0; i < sz; i++) a[i] = unmarshallObject(in, refs);
+          return a;
+       }
+       default:
+          throw new CacheException("Unknown array type");
+    }
+ }
+
+ protected void marshallArray(Object o, ObjectOutputStream out, Map<Object, Integer> refMap) throws Exception
+ {
+ out.writeByte(MAGICNUMBER_ARRAY);
+ Class arrayTypeClass = o.getClass().getComponentType();
+ int sz = Array.getLength(o);
+ writeUnsignedInt(out, sz);
+ boolean isPrim = arrayTypeClass.isPrimitive();
+
+ if (!isPrim && arrayTypeClass.equals(Object.class))
+ {
+ out.writeByte(MAGICNUMBER_OBJECT);
+ for (int i = 0; i < sz; i++) marshallObject(Array.get(o, i), out, refMap);
+ } else if (arrayTypeClass.equals(byte.class) || arrayTypeClass.equals(Byte.class))
+ {
+ out.writeByte(MAGICNUMBER_BYTE);
+ out.writeBoolean(isPrim);
+ if (isPrim)
+ out.write((byte[]) o);
+ else
+ for (int i = 0; i < sz; i++) out.writeByte((Byte) Array.get(o, i));
+ } else if (arrayTypeClass.equals(int.class) || arrayTypeClass.equals(Integer.class))
+ {
+ out.writeByte(MAGICNUMBER_INTEGER);
+ out.writeBoolean(isPrim);
+ if (isPrim)
+ for (int i = 0; i < sz; i++) out.writeInt(Array.getInt(o, i));
+ else
+ for (int i = 0; i < sz; i++) out.writeInt((Integer) Array.get(o, i));
+ } else if (arrayTypeClass.equals(long.class) || arrayTypeClass.equals(Long.class))
+ {
+ out.writeByte(MAGICNUMBER_LONG);
+ out.writeBoolean(isPrim);
+ if (isPrim)
+ for (int i = 0; i < sz; i++) out.writeLong(Array.getLong(o, i));
+ else
+ for (int i = 0; i < sz; i++) out.writeLong((Long) Array.get(o, i));
+ } else if (arrayTypeClass.equals(boolean.class) || arrayTypeClass.equals(Boolean.class))
+ {
+ out.writeByte(MAGICNUMBER_BOOLEAN);
+ out.writeBoolean(isPrim);
+ if (isPrim)
+ for (int i = 0; i < sz; i++) out.writeBoolean(Array.getBoolean(o, i));
+ else
+ for (int i = 0; i < sz; i++) out.writeBoolean((Boolean) Array.get(o, i));
+ } else if (arrayTypeClass.equals(char.class) || arrayTypeClass.equals(Character.class))
+ {
+ out.writeByte(MAGICNUMBER_CHAR);
+ out.writeBoolean(isPrim);
+ if (isPrim)
+ for (int i = 0; i < sz; i++) out.writeChar(Array.getChar(o, i));
+ else
+ for (int i = 0; i < sz; i++) out.writeChar((Character) Array.get(o, i));
+ } else if (arrayTypeClass.equals(short.class) || arrayTypeClass.equals(Short.class))
+ {
+ out.writeByte(MAGICNUMBER_SHORT);
+ out.writeBoolean(isPrim);
+ if (isPrim)
+ for (int i = 0; i < sz; i++) out.writeShort(Array.getShort(o, i));
+ else
+ for (int i = 0; i < sz; i++) out.writeShort((Short) Array.get(o, i));
+ } else if (arrayTypeClass.equals(float.class) || arrayTypeClass.equals(Float.class))
+ {
+ out.writeByte(MAGICNUMBER_FLOAT);
+ out.writeBoolean(isPrim);
+ if (isPrim)
+ for (int i = 0; i < sz; i++) out.writeFloat(Array.getFloat(o, i));
+ else
+ for (int i = 0; i < sz; i++) out.writeFloat((Float) Array.get(o, i));
+ } else if (arrayTypeClass.equals(double.class) || arrayTypeClass.equals(Double.class))
+ {
+ out.writeByte(MAGICNUMBER_DOUBLE);
+ out.writeBoolean(isPrim);
+ if (isPrim)
+ for (int i = 0; i < sz; i++) out.writeDouble(Array.getDouble(o, i));
+ else
+ for (int i = 0; i < sz; i++) out.writeDouble((Double) Array.get(o, i));
+ } else throw new CacheException("Unknown array type!");
+ }
+
+ /**
+  * Tells whether a class is <tt>Object</tt>, a primitive type or one of the eight primitive wrapper classes.
+  *
+  * @param c class to test; must not be null
+  * @return true for <tt>Object</tt>, primitives and primitive wrappers, false for everything else
+  */
+ protected boolean isKnownType(Class c)
+ {
+    if (c.isPrimitive()) return true;
+
+    // Class does not override equals(), so identity comparison is the same test
+    return c == Object.class ||
+          c == Character.class || c == Integer.class || c == Long.class ||
+          c == Byte.class || c == Boolean.class || c == Short.class ||
+          c == Float.class || c == Double.class;
+ }
+
+ /**
+  * Marshalls an object to the stream. While marshalling, the thread's context class loader is the one already
+  * set on the thread or, if there is none, <tt>defaultClassLoader</tt>; the previous value is restored afterwards.
+  *
+  * @param o   object to marshall
+  * @param out stream to marshall to
+  * @throws Exception propagated from <tt>marshallObject</tt>
+  */
+ public void objectToObjectStream(Object o, ObjectOutputStream out) throws Exception
+ {
+    // a reference table is only kept when reference tracking is switched on
+    Map<Object, Integer> references = useRefs ? new IdentityHashMap<Object, Integer>() : null;
+    Thread thread = Thread.currentThread();
+    ClassLoader previous = thread.getContextClassLoader();
+    ClassLoader effective = previous != null ? previous : defaultClassLoader;
+
+    try
+    {
+       thread.setContextClassLoader(effective);
+       marshallObject(o, out, references);
+    }
+    finally
+    {
+       thread.setContextClassLoader(previous);
+    }
+ }
+
+ /**
+  * Unmarshalls an object from the stream using <tt>defaultClassLoader</tt>.
+  *
+  * @param in stream to unmarshall from
+  * @return the unmarshalled object
+  * @throws Exception propagated from <tt>unmarshallObject</tt>
+  */
+ public Object objectFromObjectStream(ObjectInputStream in) throws Exception
+ {
+    // a reference table is only kept when reference tracking is switched on
+    UnmarshalledReferences references = null;
+    if (useRefs) references = new UnmarshalledReferences();
+
+    Object result = unmarshallObject(in, defaultClassLoader, references, false);
+    if (trace)
+    {
+       log.trace("Unmarshalled object " + result);
+    }
+    return result;
+ }
+
+ /**
+  * Not supported by this class; always throws.
+  *
+  * @param is ignored
+  * @return never returns normally
+  * @throws Exception always, in the form of a NotImplementedException
+  */
+ public Object objectFromStream(InputStream is) throws Exception
+ {
+    throw new NotImplementedException("not implemented");
+ }
+
+ /**
+  * Placeholder that always throws; as the message says, this needs to be overridden.
+  *
+  * @param o ignored
+  * @return never returns normally
+  * @throws Exception always, in the form of a RuntimeException
+  */
+ public ByteBuffer objectToBuffer(Object o) throws Exception
+ {
+    throw new RuntimeException("Needs to be overridden!");
+ }
+
+ /**
+  * Placeholder that always throws; as the message says, this needs to be overridden.
+  *
+  * @param buf    ignored
+  * @param offset ignored
+  * @param length ignored
+  * @return never returns normally
+  * @throws Exception always, in the form of a RuntimeException
+  */
+ public Object objectFromByteBuffer(byte[] buf, int offset, int length) throws Exception
+ {
+    throw new RuntimeException("Needs to be overridden!");
+ }
+
+ public Object objectFromByteBuffer(byte[] bytes) throws Exception
+ {
+ return objectFromByteBuffer(bytes, 0, bytes.length);
+ }
+}
\ No newline at end of file
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java (from rev 6897, core/branches/flat/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java 2008-10-14 17:47:27 UTC (rev 6943)
@@ -0,0 +1,301 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.marshall;
+
+import org.jboss.cache.util.concurrent.WithinThreadExecutor;
+import org.jboss.starobrno.commands.ReplicableCommand;
+import org.jboss.starobrno.commands.VisitableCommand;
+import org.jboss.starobrno.commands.remote.AnnounceBuddyPoolNameCommand;
+import org.jboss.starobrno.commands.remote.AssignToBuddyGroupCommand;
+import org.jboss.starobrno.commands.remote.RemoveFromBuddyGroupCommand;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.factories.ComponentRegistry;
+import org.jboss.starobrno.interceptors.InterceptorChain;
+import org.jboss.starobrno.invocation.InvocationContextContainer;
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.MembershipListener;
+import org.jgroups.Message;
+import org.jgroups.MessageListener;
+import org.jgroups.blocks.RpcDispatcher;
+import org.jgroups.blocks.RspFilter;
+import org.jgroups.util.Buffer;
+import org.jgroups.util.Rsp;
+import org.jgroups.util.RspList;
+
+import java.io.NotSerializableException;
+import java.util.Vector;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A JGroups RPC dispatcher that knows how to deal with {@link ReplicableCommand}s.
+ * <p/>
+ * Outgoing commands are marshalled and sent by a replication processor. Depending on the configuration read in
+ * the constructor this is either the calling thread itself or a separate executor.
+ *
+ * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
+ * @since 2.2.0
+ */
+public class CommandAwareRpcDispatcher extends RpcDispatcher
+{
+   protected InvocationContextContainer invocationContextContainer;
+   protected InterceptorChain interceptorChain;
+   protected ComponentRegistry componentRegistry;
+   protected boolean trace;
+   // runs the ReplicationTasks; stays null when the no-arg constructor is used
+   private ExecutorService replicationProcessor;
+   // numbers the threads of the executor created in the constructor
+   private AtomicInteger replicationProcessorCount;
+   // true when commands are handed to a separate executor and responses are not awaited
+   private boolean asyncSerial;
+
+   public CommandAwareRpcDispatcher()
+   {
+   }
+
+   /**
+    * Creates a dispatcher and picks the replication processor: the calling thread for synchronous cache modes or
+    * when neither an executor nor a pool size has been configured, otherwise the injected executor or a newly
+    * created fixed thread pool.
+    */
+   public CommandAwareRpcDispatcher(Channel channel, MessageListener l, MembershipListener l2, Object serverObj,
+                                    InvocationContextContainer container, InterceptorChain interceptorChain,
+                                    ComponentRegistry componentRegistry)
+   {
+      super(channel, l, l2, serverObj);
+      this.invocationContextContainer = container;
+      this.componentRegistry = componentRegistry;
+      this.interceptorChain = interceptorChain;
+      trace = log.isTraceEnabled();
+
+      // what sort of a repl processor do we need?
+      Configuration c = componentRegistry.getComponent(Configuration.class);
+      replicationProcessor = c.getRuntimeConfig().getAsyncSerializationExecutor();
+      if (c.getCacheMode().isSynchronous() ||
+            (replicationProcessor == null && c.getSerializationExecutorPoolSize() < 1)) // no executor has been injected and no pool size is configured
+      {
+         // in-process thread.  Not async.
+         replicationProcessor = new WithinThreadExecutor();
+         asyncSerial = false;
+      }
+      else
+      {
+         asyncSerial = true;
+         if (replicationProcessor == null)
+         {
+            replicationProcessorCount = new AtomicInteger(0);
+            // a single thread is used when a replication queue is in use
+            replicationProcessor = Executors.newFixedThreadPool(c.isUseReplQueue() ? 1 : c.getSerializationExecutorPoolSize(),
+                  new ThreadFactory()
+                  {
+                     public Thread newThread(Runnable r)
+                     {
+                        return new Thread(r, "AsyncReplicationProcessor-" + replicationProcessorCount.incrementAndGet());
+                     }
+                  }
+            );
+         }
+      }
+   }
+
+   /**
+    * Shuts down the replication processor, waits up to 60 seconds for it to terminate and then stops the
+    * underlying dispatcher.
+    */
+   @Override
+   public void stop()
+   {
+      // the no-arg constructor leaves the processor unset; there is nothing to shut down in that case
+      if (replicationProcessor != null)
+      {
+         replicationProcessor.shutdownNow();
+         try
+         {
+            replicationProcessor.awaitTermination(60, TimeUnit.SECONDS);
+         }
+         catch (InterruptedException e)
+         {
+            Thread.currentThread().interrupt();
+         }
+      }
+      super.stop();
+   }
+
+   /**
+    * Checks that a request can be handled at all: a server object must be registered and the message must be
+    * non-null and non-empty. Failures are logged as errors.
+    *
+    * @param req request to check
+    * @return true if the request may be processed
+    */
+   protected boolean isValid(Message req)
+   {
+      if (server_obj == null)
+      {
+         log.error("no method handler is registered. Discarding request.");
+         return false;
+      }
+
+      if (req == null || req.getLength() == 0)
+      {
+         log.error("message or message buffer is null");
+         return false;
+      }
+
+      return true;
+   }
+
+   /**
+    * Similar to {@link #callRemoteMethods(java.util.Vector, org.jgroups.blocks.MethodCall, int, long, boolean, boolean, org.jgroups.blocks.RspFilter)} except that this version
+    * is aware of {@link ReplicableCommand} objects.
+    *
+    * @return an empty list if <tt>dests</tt> is an empty list; null when the command is processed asynchronously
+    *         or when every response is a received, non-suspected null; otherwise the responses
+    */
+   public RspList invokeRemoteCommands(Vector<Address> dests, ReplicableCommand command, int mode, long timeout,
+                                       boolean oob, RspFilter filter) throws NotSerializableException, ExecutionException, InterruptedException
+   {
+      if (dests != null && dests.isEmpty())
+      {
+         // don't send if dest list is empty
+         if (trace) log.trace("Destination list is empty: no need to send message");
+         return new RspList();
+      }
+
+      if (trace)
+         log.trace(new StringBuilder("dests=").append(dests).append(", command=").append(command).
+               append(", mode=").append(mode).append(", timeout=").append(timeout));
+
+      ReplicationTask replicationTask = new ReplicationTask(command, oob, dests, mode, timeout, false, filter);
+      Future<RspList> response = replicationProcessor.submit(replicationTask);
+      if (asyncSerial)
+      {
+         // don't care about the response.  return.
+         return null;
+      }
+      else
+      {
+         RspList retval = response.get();
+         if (retval.isEmpty() || containsOnlyNulls(retval))
+            return null;
+         else
+            return retval;
+      }
+   }
+
+   /**
+    * Tells whether every response in the list was received from a non-suspected member and carries a null value.
+    */
+   private boolean containsOnlyNulls(RspList l)
+   {
+      for (Rsp r : l.values())
+      {
+         if (r.getValue() != null || !r.wasReceived() || r.wasSuspected()) return false;
+      }
+      return true;
+   }
+
+   /**
+    * Message contains a Command. Execute it against *this* object and return result.
+    * <p/>
+    * Anything thrown while unmarshalling or executing the command is returned as the result instead of being
+    * propagated. Invalid requests yield null.
+    */
+   @Override
+   public Object handle(Message req)
+   {
+      if (isValid(req))
+      {
+         try
+         {
+            return executeCommand((ReplicableCommand) req_marshaller.objectFromByteBuffer(req.getBuffer(), req.getOffset(), req.getLength()), req);
+         }
+         catch (Throwable x)
+         {
+            if (trace) log.trace("Problems invoking command.", x);
+            return x;
+         }
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+   /**
+    * Executes a command received from a remote node. Visitable commands are passed through the interceptor chain
+    * with the invocation context marked as not locally originated; all other commands are performed directly.
+    *
+    * @param cmd command to execute; must not be null
+    * @param req message the command arrived in, used for diagnostics
+    * @return the command's result, or null if the component registry does not currently allow invocations
+    * @throws Throwable whatever the command or the interceptor chain throws
+    */
+   protected Object executeCommand(ReplicableCommand cmd, Message req) throws Throwable
+   {
+      if (cmd == null) throw new NullPointerException("Unable to execute a null command!  Message was " + req);
+      if (trace) log.trace("Executing command: " + cmd + " [sender=" + req.getSrc() + "]");
+
+      if (cmd instanceof VisitableCommand)
+      {
+         InvocationContext ctx = invocationContextContainer.get();
+         ctx.setOriginLocal(false);
+         if (!componentRegistry.invocationsAllowed(false))
+         {
+            return null;
+         }
+         return interceptorChain.invoke(ctx, (VisitableCommand) cmd);
+      }
+      else
+      {
+         if (trace) log.trace("This is a non-visitable command - so performing directly and not via the invoker.");
+
+         // need to check cache status for all except buddy replication commands.
+         if (!(cmd instanceof AnnounceBuddyPoolNameCommand ||
+               cmd instanceof AssignToBuddyGroupCommand ||
+               cmd instanceof RemoveFromBuddyGroupCommand)
+               && !componentRegistry.invocationsAllowed(false))
+         {
+            return null;
+         }
+         return cmd.perform(null);
+      }
+   }
+
+   @Override
+   public String toString()
+   {
+      return getClass().getSimpleName() + "[Outgoing marshaller: " + req_marshaller + "; incoming marshaller: " + rsp_marshaller + "]";
+   }
+
+   /**
+    * Marshalls a single command and casts it to its destinations. Instances are run by the replication processor.
+    */
+   private class ReplicationTask implements Callable<RspList>
+   {
+      private ReplicableCommand command;
+      private boolean oob;
+      private Vector<Address> dests;
+      private int mode;
+      private long timeout;
+      private boolean anycasting;
+      private RspFilter filter;
+
+      private ReplicationTask(ReplicableCommand command, boolean oob, Vector<Address> dests, int mode, long timeout, boolean anycasting, RspFilter filter)
+      {
+         this.command = command;
+         this.oob = oob;
+         this.dests = dests;
+         this.mode = mode;
+         this.timeout = timeout;
+         this.anycasting = anycasting;
+         this.filter = filter;
+      }
+
+      public RspList call() throws Exception
+      {
+         Buffer buf;
+         try
+         {
+            buf = req_marshaller.objectToBuffer(command);
+         }
+         catch (Exception e)
+         {
+            throw new RuntimeException("Failure to marshal argument(s)", e);
+         }
+
+         Message msg = new Message();
+         msg.setBuffer(buf);
+         if (oob) msg.setFlag(Message.OOB);
+         RspList retval = castMessage(dests, msg, mode, timeout, anycasting, filter);
+         if (trace) log.trace("responses: " + retval);
+
+         // a null response is 99% likely to be due to a marshalling problem - we throw a NSE, this needs to be changed when
+         // JGroups supports http://jira.jboss.com/jira/browse/JGRP-193
+         // the serialization problem could be on the remote end and this is why we cannot catch this above, when marshalling.
+
+         if (retval == null)
+            throw new NotSerializableException("RpcDispatcher returned a null.  This is most often caused by args for " + command.getClass().getSimpleName() + " not being serializable.");
+         return retval;
+      }
+   }
+}
\ No newline at end of file
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/marshall/ExtendedMarshaller.java (from rev 6897, core/branches/flat/src/main/java/org/jboss/cache/marshall/Marshaller.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/marshall/ExtendedMarshaller.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/marshall/ExtendedMarshaller.java 2008-10-14 17:47:27 UTC (rev 6943)
@@ -0,0 +1,89 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.marshall;
+
+import org.jboss.starobrno.io.ByteBuffer;
+import org.jgroups.blocks.RpcDispatcher;
+
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * A marshaller is a class that is able to marshall and unmarshall objects efficiently.
+ * <p/>
+ * The reason why this is implemented specially in JBoss Cache rather than resorting to
+ * Java serialization or even the more efficient JBoss serialization is that a lot of efficiency
+ * can be gained when a majority of the serialization that occurs has to do with a small set
+ * of known types such as {@link org.jboss.cache.Fqn} or {@link org.jboss.cache.commands.ReplicableCommand}, and class type information
+ * can be replaced with simple magic numbers.
+ * <p/>
+ * Unknown types (typically user data) fall back to JBoss serialization.
+ * <p/>
+ * In addition, using a marshaller allows adding additional data to the byte stream, such as context
+ * class loader information on which class loader to use to deserialize the object stream, or versioning
+ * information to allow streams to interoperate between different versions of JBoss Cache (see {@link org.jboss.cache.marshall.VersionAwareMarshaller}).
+ * <p/>
+ * This interface extends the JGroups building-block interface {@link org.jgroups.blocks.RpcDispatcher.Marshaller2} which
+ * is used to marshall {@link org.jboss.cache.commands.ReplicableCommand}s, their parameters and their response values.
+ * <p/>
+ * The interface is also used by the {@link org.jboss.cache.loader.CacheLoader} framework to efficiently serialize data to be persisted, as well as
+ * the {@link org.jboss.starobrno.statetransfer.StateTransferManager} when serializing the cache for transferring state en-masse.
+ *
+ * @author <a href="mailto://manik@jboss.org">Manik Surtani</a>
+ * @since 2.0.0
+ */
+public interface ExtendedMarshaller extends RpcDispatcher.Marshaller2
+{
+ /**
+ * Marshalls an object to a given {@link java.io.ObjectOutputStream}
+ *
+ * @param obj object to marshall
+ * @param out stream to marshall to
+ * @throws Exception if the object cannot be marshalled
+ */
+ void objectToObjectStream(Object obj, ObjectOutputStream out) throws Exception;
+
+ /**
+ * Unmarshalls an object from an {@link java.io.ObjectInputStream}
+ *
+ * @param in stream to unmarshall from
+ * @return Object from stream passed in.
+ * @throws Exception if the object cannot be unmarshalled
+ */
+ Object objectFromObjectStream(ObjectInputStream in) throws Exception;
+
+ /**
+ * Unmarshalls an object from an {@link java.io.InputStream}
+ *
+ * @param is stream to unmarshall from
+ * @return Object from stream passed in.
+ * @throws Exception if the object cannot be unmarshalled
+ */
+ Object objectFromStream(InputStream is) throws Exception;
+
+ /**
+ * A specialized form of {@link org.jgroups.blocks.RpcDispatcher.Marshaller2#objectToBuffer(Object)} that returns an instance
+ * of {@link org.jboss.starobrno.io.ByteBuffer} instead of {@link org.jgroups.util.Buffer}.
+ *
+ * @param o object to marshall
+ * @return a ByteBuffer
+ * @throws Exception if the object cannot be marshalled
+ */
+ ByteBuffer objectToBuffer(Object o) throws Exception;
+}
\ No newline at end of file
Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/marshall/ExtendedMarshaller.java
___________________________________________________________________
Name: svn:keywords
+ Author Date Id Revision
Name: svn:eol-style
+ native
Added: core/branches/flat/src/main/java/org/jboss/starobrno/marshall/MarshalledValue2.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/marshall/MarshalledValue2.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/marshall/MarshalledValue2.java 2008-10-14 17:47:27 UTC (rev 6943)
@@ -0,0 +1,229 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.marshall;
+
+import org.jboss.starobrno.CacheException;
+import org.jboss.util.stream.MarshalledValueInputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * Wrapper that wraps cached data, providing lazy deserialization using the calling thread's context class loader.
+ * <p/>
+ * The {@link org.jboss.cache.interceptors.MarshalledValueInterceptor} handles transparent
+ * wrapping/unwrapping of cached data.
+ * <p/>
+ *
+ * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
+ * @see org.jboss.cache.interceptors.MarshalledValueInterceptor
+ * @since 2.1.0
+ */
+public class MarshalledValue2 implements Externalizable
+{
+ protected Object instance;
+ protected byte[] raw;
+ private int cachedHashCode = 0;
+ // by default equals() will test on the istance rather than the byte array if conversion is required.
+ private transient boolean equalityPreferenceForInstance = true;
+
+ public MarshalledValue2(Object instance) throws NotSerializableException
+ {
+ if (instance == null) throw new NullPointerException("Null values cannot be wrapped as MarshalledValues!");
+
+ if (instance instanceof Serializable)
+ this.instance = instance;
+ else
+ throw new NotSerializableException("Marshalled values can only wrap Objects that are serializable! Instance of " + instance.getClass() + " won't Serialize.");
+ }
+
+ public MarshalledValue2()
+ {
+ // empty ctor for serialization
+ }
+
+ public void setEqualityPreferenceForInstance(boolean equalityPreferenceForInstance)
+ {
+ this.equalityPreferenceForInstance = equalityPreferenceForInstance;
+ }
+
+ public synchronized void serialize()
+ {
+ if (raw == null)
+ {
+ try
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(instance);
+ oos.close();
+ baos.close();
+ // Do NOT set instance to null over here, since it may be used elsewhere (e.g., in a cache listener).
+ // this will be compacted by the MarshalledValueInterceptor when the call returns.
+// instance = null;
+ raw = baos.toByteArray();
+ }
+ catch (Exception e)
+ {
+ throw new CacheException("Unable to marshall value " + instance, e);
+ }
+ }
+ }
+
+ public synchronized void deserialize()
+ {
+ if (instance == null)
+ {
+ try
+ {
+ ByteArrayInputStream bais = new ByteArrayInputStream(raw);
+ // use a MarshalledValueInputStream since it needs to be aware of any context class loaders on the current thread.
+ ObjectInputStream ois = new MarshalledValueInputStream(bais);
+ instance = ois.readObject();
+ ois.close();
+ bais.close();
+// raw = null;
+ }
+ catch (Exception e)
+ {
+ throw new CacheException("Unable to unmarshall value", e);
+ }
+ }
+ }
+
+ /**
+ * Compacts the references held by this class to a single reference. If only one representation exists this method
+ * is a no-op unless the 'force' parameter is used, in which case the reference held is forcefully switched to the
+ * 'preferred representation'.
+ * <p/>
+ * Either way, a call to compact() will ensure that only one representation is held.
+ * <p/>
+ *
+ * @param preferSerializedRepresentation if true and both representations exist, the serialized representation is favoured. If false, the deserialized representation is preferred.
+ * @param force ensures the preferred representation is maintained and the other released, even if this means serializing or deserializing.
+ */
+ public void compact(boolean preferSerializedRepresentation, boolean force)
+ {
+ // reset the equalityPreference
+ equalityPreferenceForInstance = true;
+ if (force)
+ {
+ if (preferSerializedRepresentation && raw == null) serialize();
+ else if (!preferSerializedRepresentation && instance == null) deserialize();
+ }
+
+ if (instance != null && raw != null)
+ {
+ // need to lose one representation!
+
+ if (preferSerializedRepresentation)
+ {
+ instance = null;
+ }
+ else
+ {
+ raw = null;
+ }
+ }
+ }
+
+ public void writeExternal(ObjectOutput out) throws IOException
+ {
+ if (raw == null) serialize();
+ out.writeInt(raw.length);
+ out.write(raw);
+ out.writeInt(hashCode());
+ }
+
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+ {
+ int size = in.readInt();
+ raw = new byte[size];
+ cachedHashCode = 0;
+ in.readFully(raw);
+ cachedHashCode = in.readInt();
+ }
+
+ public Object get() throws IOException, ClassNotFoundException
+ {
+ if (instance == null) deserialize();
+ return instance;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ MarshalledValue2 that = (MarshalledValue2) o;
+
+ // if both versions are serialized or deserialized, just compare the relevant representations.
+ if (raw != null && that.raw != null) return Arrays.equals(raw, that.raw);
+ if (instance != null && that.instance != null) return instance.equals(that.instance);
+
+ // if conversion of one representation to the other is necessary, then see which we prefer converting.
+ if (equalityPreferenceForInstance)
+ {
+ if (instance == null) deserialize();
+ if (that.instance == null) that.deserialize();
+ return instance.equals(that.instance);
+ }
+ else
+ {
+ if (raw == null) serialize();
+ if (that.raw == null) that.serialize();
+ return Arrays.equals(raw, that.raw);
+ }
+ }
+
+ @Override
+ public int hashCode()
+ {
+ if (cachedHashCode == 0)
+ {
+ // always calculate the hashcode based on the instance since this is where we're getting the equals()
+ if (instance == null) deserialize();
+ cachedHashCode = instance.hashCode();
+ if (cachedHashCode == 0) // degenerate case
+ {
+ cachedHashCode = 0xFEED;
+ }
+ }
+ return cachedHashCode;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "MarshalledValue2(cachedHashCode=" + cachedHashCode + "; serialized=" + (raw != null) + ")";
+ }
+}
Added: core/branches/flat/src/main/java/org/jboss/starobrno/marshall/MarshalledValueHelper2.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/marshall/MarshalledValueHelper2.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/marshall/MarshalledValueHelper2.java 2008-10-14 17:47:27 UTC (rev 6943)
@@ -0,0 +1,56 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.marshall;
+
+import org.jboss.cache.Fqn;
+import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.transaction.GlobalTransaction;
+import org.jboss.starobrno.marshall.MarshalledValue2;
+import org.jgroups.Address;
+
+/**
+ * Common functionality used by the {@link org.jboss.cache.interceptors.MarshalledValueInterceptor} and the {@link org.jboss.cache.marshall.MarshalledValueMap}.
+ *
+ * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
+ * @see MarshalledValue2
+ * @see org.jboss.cache.interceptors.MarshalledValueInterceptor
+ * @see org.jboss.cache.marshall.MarshalledValueMap
+ * @since 2.1.0
+ */
+public class MarshalledValueHelper2
+{
+   /**
+    * Tests whether the type should be excluded from MarshalledValue wrapping.
+    * <p/>
+    * Excluded types are: primitives and their wrapper classes, {@link Void}, {@link String},
+    * {@link Fqn}, {@link GlobalTransaction}, {@link MarshalledValue2}, any type assignable to
+    * {@link Address} or {@link ReplicableCommand}, and arrays whose component type is itself excluded.
+    *
+    * @param type type to test. Should not be null.
+    * @return true if it should be excluded from MarshalledValue wrapping.
+    * @throws NullPointerException if type is null
+    */
+   public static boolean isTypeExcluded(Class<?> type)
+   {
+      // an array type can match none of the checks below, so it is decided purely by its component type.
+      if (type.isArray()) return isTypeExcluded(type.getComponentType());
+
+      return type.equals(String.class) || type.isPrimitive() ||
+            type.equals(Void.class) || type.equals(Boolean.class) || type.equals(Character.class) ||
+            type.equals(Byte.class) || type.equals(Short.class) || type.equals(Integer.class) ||
+            type.equals(Long.class) || type.equals(Float.class) || type.equals(Double.class) ||
+            type.equals(Fqn.class) || type.equals(GlobalTransaction.class) ||
+            // matched by assignability: an equality test against Address.class only succeeds for that exact
+            // type, never for the runtime class of an address object.
+            // NOTE(review): assumes callers pass value.getClass() - confirm against the interceptor.
+            Address.class.isAssignableFrom(type) ||
+            ReplicableCommand.class.isAssignableFrom(type) || type.equals(MarshalledValue2.class);
+   }
+}
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/marshall/MarshalledValueMap.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/marshall/MarshalledValueMap.java 2008-10-14 17:44:04 UTC (rev 6942)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/marshall/MarshalledValueMap.java 2008-10-14 17:47:27 UTC (rev 6943)
@@ -42,7 +42,7 @@
* <p/>
*
* @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
- * @see MarshalledValue
+ * @see MarshalledValue2
* @since 2.1.0
*/
@Immutable
@@ -145,7 +145,7 @@
{
try
{
- return o instanceof MarshalledValue ? ((MarshalledValue) o).get() : o;
+ return o instanceof MarshalledValue2 ? ((MarshalledValue2) o).get() : o;
}
catch (Exception e)
{
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/marshall/NodeData.java (from rev 6897, core/branches/flat/src/main/java/org/jboss/cache/marshall/NodeData.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/marshall/NodeData.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/marshall/NodeData.java 2008-10-14 17:47:27 UTC (rev 6943)
@@ -0,0 +1,145 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.marshall;
+
+import org.jboss.cache.Fqn;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Serializable holder for the data of a single node: its {@link Fqn} and its attribute map.
+ *
+ * @author Bela Ban
+ * @version $Id$
+ */
+// TODO: 3.0.0: remove Externalizable and rely on the CacheMarshaller.
+public class NodeData<K, V> implements Externalizable
+{
+   static final long serialVersionUID = -7571995794010294485L;
+
+   private Fqn fqn;
+   private Map<K, V> attrs;
+
+   /**
+    * Creates an empty instance with neither Fqn nor attributes.
+    */
+   public NodeData()
+   {
+   }
+
+   /**
+    * Creates an instance for the given Fqn with no attributes.
+    *
+    * @param fqn Fqn of the node
+    */
+   public NodeData(Fqn fqn)
+   {
+      this.fqn = fqn;
+   }
+
+   /**
+    * Creates an instance for the given Fqn and attributes.
+    *
+    * @param fqn     Fqn of the node
+    * @param attrs   attributes of the node, may be null
+    * @param mapSafe if true the passed map is stored as is; if false a {@link HashMap} copy of it is stored
+    */
+   public NodeData(Fqn fqn, Map<K, V> attrs, boolean mapSafe)
+   {
+      this.fqn = fqn;
+      final boolean storeAsIs = mapSafe || attrs == null;
+      this.attrs = storeAsIs ? attrs : new HashMap<K, V>(attrs);
+   }
+
+   /**
+    * Same as {@link #NodeData(Fqn, Map, boolean)}, with the Fqn obtained from {@link Fqn#fromString(String)}.
+    *
+    * @param fqn     string form of the node's Fqn
+    * @param attrs   attributes of the node, may be null
+    * @param mapSafe if true the passed map is stored as is; if false a copy of it is stored
+    */
+   public NodeData(String fqn, Map<K, V> attrs, boolean mapSafe)
+   {
+      this(Fqn.fromString(fqn), attrs, mapSafe);
+   }
+
+   /**
+    * @return the attribute map held by this instance, possibly null
+    */
+   public Map<K, V> getAttributes()
+   {
+      return attrs;
+   }
+
+   /**
+    * @return the Fqn held by this instance, possibly null
+    */
+   public Fqn getFqn()
+   {
+      return fqn;
+   }
+
+   /**
+    * @return always false in this class
+    */
+   public boolean isMarker()
+   {
+      return false;
+   }
+
+   /**
+    * @return always false in this class
+    */
+   public boolean isExceptionMarker()
+   {
+      return false;
+   }
+
+   // TODO: 3.0.0: Remove and replace with marshallNodeData/unmarshallNodeData methods in the CacheMarshaller so that we can use the same marshalling framework for Fqns.
+   /**
+    * Writes the Fqn, then a boolean telling whether attributes follow, then the attributes if present.
+    */
+   public void writeExternal(ObjectOutput out) throws IOException
+   {
+      out.writeObject(fqn);
+      final boolean hasAttrs = attrs != null;
+      out.writeBoolean(hasAttrs);
+      if (hasAttrs)
+      {
+         out.writeObject(attrs);
+      }
+   }
+
+   // TODO: 3.0.0: Remove and replace with marshallNodeData/unmarshallNodeData methods in the CacheMarshaller so that we can use the same marshalling framework for Fqns.
+   /**
+    * Reads the state in the order written by {@link #writeExternal(ObjectOutput)}.
+    */
+   @SuppressWarnings("unchecked")
+   public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+   {
+      fqn = (Fqn) in.readObject();
+      final boolean hasAttrs = in.readBoolean();
+      if (hasAttrs)
+      {
+         attrs = (Map<K, V>) in.readObject();
+      }
+   }
+
+   @Override
+   public String toString()
+   {
+      return "NodeData {fqn: " + fqn + ", attrs=" + attrs + "}";
+   }
+
+   /**
+    * Two instances are equal when they are of the same class and hold equal attributes and equal Fqns.
+    */
+   @Override
+   public boolean equals(Object o)
+   {
+      if (o == this) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      final NodeData<?, ?> other = (NodeData<?, ?>) o;
+
+      final boolean sameAttrs = attrs == null ? other.attrs == null : attrs.equals(other.attrs);
+      if (!sameAttrs) return false;
+
+      return fqn == null ? other.fqn == null : fqn.equals(other.fqn);
+   }
+
+   @Override
+   public int hashCode()
+   {
+      final int fqnHash = fqn == null ? 0 : fqn.hashCode();
+      final int attrsHash = attrs == null ? 0 : attrs.hashCode();
+      return 31 * fqnHash + attrsHash;
+   }
+}
Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/marshall/NodeData.java
___________________________________________________________________
Name: svn:keywords
+ Author Date Id Revision
Name: svn:eol-style
+ native
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/marshall/NodeDataExceptionMarker.java (from rev 6897, core/branches/flat/src/main/java/org/jboss/cache/marshall/NodeDataExceptionMarker.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/marshall/NodeDataExceptionMarker.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/marshall/NodeDataExceptionMarker.java 2008-10-14 17:47:27 UTC (rev 6943)
@@ -0,0 +1,85 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.marshall;
+
+import org.jboss.starobrno.marshall.NodeData;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * A {@link NodeData} that additionally carries a {@link Throwable} and an object identifying a cache node.
+ * {@link #isExceptionMarker()} returns true for instances of this class.
+ */
+public class NodeDataExceptionMarker extends NodeData
+{
+   private static final long serialVersionUID = 240199474174502551L;
+
+   private Throwable cause;
+   private Object cacheNodeIdentity;
+
+   /**
+    * Creates a marker with no cause and no node identity.
+    */
+   public NodeDataExceptionMarker()
+   {
+      super();
+   }
+
+   /**
+    * Creates a marker for the given cause and node identity.
+    *
+    * @param t    the throwable to carry
+    * @param node the object stored as the cache node identity
+    */
+   public NodeDataExceptionMarker(Throwable t, Object node)
+   {
+      this.cause = t;
+      this.cacheNodeIdentity = node;
+   }
+
+   /**
+    * @return the throwable carried by this marker, possibly null
+    */
+   public Throwable getCause()
+   {
+      return this.cause;
+   }
+
+   /**
+    * @return the node identity carried by this marker, possibly null
+    */
+   public Object getCacheNodeIdentity()
+   {
+      return this.cacheNodeIdentity;
+   }
+
+   /**
+    * @return always true
+    */
+   @Override
+   public boolean isExceptionMarker()
+   {
+      return true;
+   }
+
+   /**
+    * Writes the superclass state, followed by the cause and then the node identity.
+    */
+   @Override
+   public void writeExternal(ObjectOutput out) throws IOException
+   {
+      super.writeExternal(out);
+      out.writeObject(this.cause);
+      out.writeObject(this.cacheNodeIdentity);
+   }
+
+   /**
+    * Reads the state in the order written by {@link #writeExternal(ObjectOutput)}.
+    */
+   @Override
+   public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+   {
+      super.readExternal(in);
+      this.cause = (Throwable) in.readObject();
+      this.cacheNodeIdentity = in.readObject();
+   }
+
+   @Override
+   public String toString()
+   {
+      return "NodeDataExceptionMarker";
+   }
+}
Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/marshall/NodeDataExceptionMarker.java
___________________________________________________________________
Name: svn:keywords
+ Author Date Id Revision
Name: svn:eol-style
+ native
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/marshall/NodeDataMarker.java (from rev 6897, core/branches/flat/src/main/java/org/jboss/cache/marshall/NodeDataMarker.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/marshall/NodeDataMarker.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/marshall/NodeDataMarker.java 2008-10-14 17:47:27 UTC (rev 6943)
@@ -0,0 +1,42 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.marshall;
+
+import org.jboss.starobrno.marshall.NodeData;
+
+/**
+ * A {@link NodeData} carrying no extra state, for which {@link #isMarker()} returns true.
+ */
+public class NodeDataMarker extends NodeData
+{
+   private static final long serialVersionUID = 4851793846346021014L;
+
+   /**
+    * @return always true
+    */
+   @Override
+   public boolean isMarker()
+   {
+      final boolean marker = true;
+      return marker;
+   }
+
+   @Override
+   public String toString()
+   {
+      return "NodeDataMarker";
+   }
+}
Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/marshall/NodeDataMarker.java
___________________________________________________________________
Name: svn:keywords
+ Author Date Id Revision
Name: svn:eol-style
+ native
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/marshall/UnmarshalledReferences.java (from rev 6897, core/branches/flat/src/main/java/org/jboss/cache/marshall/UnmarshalledReferences.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/marshall/UnmarshalledReferences.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/marshall/UnmarshalledReferences.java 2008-10-14 17:47:27 UTC (rev 6943)
@@ -0,0 +1,73 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.marshall;
+
+import org.jboss.starobrno.CacheException;
+
+import java.util.ArrayList;
+
+/**
+ * An efficient array-based list of referenced objects, using the reference id as a subscript for the array.
+ *
+ * @author <a href="mailto:manik@jboss.org">Manik Surtani (manik(a)jboss.org)</a>
+ */
+public class UnmarshalledReferences
+{
+   private final ArrayList<Object> referencedObjects = new ArrayList<Object>();
+
+   /**
+    * Retrieves an object referenced by an id.
+    *
+    * @param ref reference id; must be non-negative and smaller than the number of slots currently held
+    * @return the object stored under that id. This is null if null was stored, or if the slot was only
+    *         created as padding by {@link #putReferencedObject(int, Object)}.
+    * @throws CacheException if the id is negative or lies beyond the slots currently held
+    */
+   public Object getReferencedObject(int ref)
+   {
+      // without this guard a negative id would surface as a raw IndexOutOfBoundsException from the list.
+      if (ref < 0)
+         throw new CacheException("Attempting to look up a negative ref: " + ref);
+      if (ref >= referencedObjects.size())
+         throw new CacheException("Attempting to look up a ref that hasn't been inserted yet");
+      return referencedObjects.get(ref);
+   }
+
+   /**
+    * Adds a referenced object to the list of references. An existing slot is overwritten; a slot beyond
+    * the current end is created after padding the gap with nulls.
+    *
+    * @param ref reference id; must be non-negative
+    * @param o   object
+    * @throws CacheException if the id is negative
+    */
+   public void putReferencedObject(int ref, Object o)
+   {
+      if (ref < 0)
+         throw new CacheException("Attempting to store a negative ref: " + ref);
+
+      int sz = referencedObjects.size();
+      // if we are not adding the object to the end of the list, make sure we use a specific position
+      if (ref < sz)
+      {
+         referencedObjects.set(ref, o);
+         return;
+      }
+
+      // if we are adding the reference to a position beyond the end of the list, make sure we expand the list first.
+      // this can happen, weirdly enough, since marshallObject() can be called recursively, such as from marshallFqn().
+      // (when ref == sz the loop does not run and the object is simply appended.)
+      for (int i = sz; i < ref; i++) referencedObjects.add(null);
+      referencedObjects.add(o);
+   }
+}
17 years, 2 months
JBoss Cache SVN: r6942 - in core/branches/flat/src/main/java/org/jboss/starobrno: io and 2 other directories.
by jbosscache-commits@lists.jboss.org
Author: mircea.markus
Date: 2008-10-14 13:44:04 -0400 (Tue, 14 Oct 2008)
New Revision: 6942
Added:
core/branches/flat/src/main/java/org/jboss/starobrno/io/
core/branches/flat/src/main/java/org/jboss/starobrno/lock/TimeoutException.java
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/io/ByteBuffer.java
core/branches/flat/src/main/java/org/jboss/starobrno/io/ExposedByteArrayOutputStream.java
core/branches/flat/src/main/java/org/jboss/starobrno/loader/CacheLoader.java
core/branches/flat/src/main/java/org/jboss/starobrno/lock/StripedLockManager.java
Log:
enabling replication
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/io (from rev 6897, core/branches/flat/src/main/java/org/jboss/cache/io)
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/io/ByteBuffer.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/cache/io/ByteBuffer.java 2008-10-09 13:22:09 UTC (rev 6897)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/io/ByteBuffer.java 2008-10-14 17:44:04 UTC (rev 6942)
@@ -19,7 +19,7 @@
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-package org.jboss.cache.io;
+package org.jboss.starobrno.io;
import org.jgroups.util.Buffer;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/io/ExposedByteArrayOutputStream.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/cache/io/ExposedByteArrayOutputStream.java 2008-10-09 13:22:09 UTC (rev 6897)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/io/ExposedByteArrayOutputStream.java 2008-10-14 17:44:04 UTC (rev 6942)
@@ -19,7 +19,7 @@
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-package org.jboss.cache.io;
+package org.jboss.starobrno.io;
import net.jcip.annotations.NotThreadSafe;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/loader/CacheLoader.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/loader/CacheLoader.java 2008-10-14 17:43:45 UTC (rev 6941)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/loader/CacheLoader.java 2008-10-14 17:44:04 UTC (rev 6942)
@@ -189,7 +189,7 @@
*
* @param os ObjectOutputStream to write state
* @see AbstractCacheLoader#loadEntireState(ObjectOutputStream)
- * @see org.jboss.cache.marshall.NodeData
+ * @see org.jboss.starobrno.marshall.NodeData
*/
void loadEntireState(ObjectOutputStream os) throws Exception;
@@ -209,7 +209,7 @@
*
* @param is ObjectInputStream to read state
* @see AbstractCacheLoader#storeEntireState(ObjectInputStream)
- * @see org.jboss.cache.marshall.NodeData
+ * @see org.jboss.starobrno.marshall.NodeData
*/
void storeEntireState(ObjectInputStream is) throws Exception;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/lock/StripedLockManager.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/lock/StripedLockManager.java 2008-10-14 17:43:45 UTC (rev 6941)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/lock/StripedLockManager.java 2008-10-14 17:44:04 UTC (rev 6942)
@@ -23,10 +23,10 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.util.concurrent.locks.LockContainer;
-import org.jboss.cache.util.concurrent.locks.OwnableReentrantLock;
-import org.jboss.cache.util.concurrent.locks.OwnableReentrantLockContainer;
-import org.jboss.cache.util.concurrent.locks.ReentrantLockContainer;
+import org.jboss.starobrno.util.concurrent.locks.LockContainer;
+import org.jboss.starobrno.util.concurrent.locks.OwnableReentrantLock;
+import org.jboss.starobrno.util.concurrent.locks.OwnableReentrantLockContainer;
+import org.jboss.starobrno.util.concurrent.locks.ReentrantLockContainer;
import org.jboss.starobrno.config.Configuration;
import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.factories.annotations.Inject;
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/lock/TimeoutException.java (from rev 6897, core/branches/flat/src/main/java/org/jboss/cache/lock/TimeoutException.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/lock/TimeoutException.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/lock/TimeoutException.java 2008-10-14 17:44:04 UTC (rev 6942)
@@ -0,0 +1,66 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.lock;
+
+import org.jboss.starobrno.CacheException;
+
+
+/**
+ * Thrown when a timeout occurred. Used by operations with timeouts, e.g. lock
+ * acquisition, or waiting for responses from all members.
+ *
+ * @author <a href="mailto:bela@jboss.org">Bela Ban</a>.
+ * @version $Revision$
+ * <p/>
+ * <p><b>Revisions:</b>
+ * <p/>
+ * <p>Dec 28 2002 Bela Ban: first implementation
+ */
+public class TimeoutException extends CacheException
+{
+   /**
+    * The serialVersionUID
+    */
+   private static final long serialVersionUID = -8096787619908687038L;
+
+   /**
+    * Creates a timeout exception without a message or cause.
+    */
+   public TimeoutException()
+   {
+      // the superclass no-arg constructor is invoked implicitly.
+   }
+
+   /**
+    * @param msg message passed on to the superclass
+    */
+   public TimeoutException(String msg)
+   {
+      super(msg);
+   }
+
+   /**
+    * @param msg   message passed on to the superclass
+    * @param cause cause passed on to the superclass
+    */
+   public TimeoutException(String msg, Throwable cause)
+   {
+      super(msg, cause);
+   }
+
+   /**
+    * Delegates to the superclass implementation unchanged.
+    */
+   @Override
+   public String toString()
+   {
+      final String description = super.toString();
+      return description;
+   }
+}
Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/lock/TimeoutException.java
___________________________________________________________________
Name: svn:keywords
+ Author Date Id Revision
Name: svn:eol-style
+ native
17 years, 2 months
JBoss Cache SVN: r6941 - in core/branches/flat/src/main/java/org/jboss/starobrno: eviction and 3 other directories.
by jbosscache-commits@lists.jboss.org
Author: mircea.markus
Date: 2008-10-14 13:43:45 -0400 (Tue, 14 Oct 2008)
New Revision: 6941
Added:
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/ReplicationInterceptor.java
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java
core/branches/flat/src/main/java/org/jboss/starobrno/eviction/BaseEvictionAlgorithm.java
core/branches/flat/src/main/java/org/jboss/starobrno/factories/ComponentRegistry.java
core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java
core/branches/flat/src/main/java/org/jboss/starobrno/factories/ReplicationQueueFactory.java
core/branches/flat/src/main/java/org/jboss/starobrno/factories/RuntimeConfigAwareFactory.java
core/branches/flat/src/main/java/org/jboss/starobrno/factories/StateTransferManagerFactory.java
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/MarshalledValueInterceptor.java
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/TxInterceptor.java
Log:
enabling replication
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java 2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java 2008-10-14 17:43:45 UTC (rev 6941)
@@ -23,7 +23,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.lock.TimeoutException;
+import org.jboss.starobrno.lock.TimeoutException;
import org.jboss.starobrno.config.Configuration;
import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.factories.EntryFactory;
@@ -164,7 +164,7 @@
* @param fqn Fqn to lock
* @return true if a lock was needed and acquired, false if it didn't need to acquire the lock (i.e., lock was already held)
* @throws InterruptedException if interrupted
- * @throws org.jboss.cache.lock.TimeoutException
+ * @throws org.jboss.starobrno.lock.TimeoutException
* if we are unable to acquire the lock after a specified timeout.
*/
private boolean acquireLock(InvocationContext ctx, Object key) throws InterruptedException, TimeoutException
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/eviction/BaseEvictionAlgorithm.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/eviction/BaseEvictionAlgorithm.java 2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/eviction/BaseEvictionAlgorithm.java 2008-10-14 17:43:45 UTC (rev 6941)
@@ -25,7 +25,7 @@
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.CacheSPI_Legacy;
import org.jboss.cache.Fqn;
-import org.jboss.cache.lock.TimeoutException;
+import org.jboss.starobrno.lock.TimeoutException;
import org.jboss.starobrno.config.Configuration;
import org.jboss.starobrno.config.EvictionAlgorithmConfig;
import org.jboss.starobrno.eviction.EvictionEvent.Type;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/ComponentRegistry.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/ComponentRegistry.java 2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/ComponentRegistry.java 2008-10-14 17:43:45 UTC (rev 6941)
@@ -26,7 +26,7 @@
import org.jboss.cache.CacheStatus;
import org.jboss.cache.Version;
import org.jboss.cache.util.BeanUtils;
-import org.jboss.cache.util.reflect.ReflectionUtil;
+import org.jboss.starobrno.util.ReflectionUtil;
import org.jboss.starobrno.CacheException;
import org.jboss.starobrno.CacheSPI;
import org.jboss.starobrno.config.Configuration;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java 2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java 2008-10-14 17:43:45 UTC (rev 6941)
@@ -22,14 +22,6 @@
package org.jboss.starobrno.factories;
-import org.jboss.cache.RegionRegistry;
-import org.jboss.cache.buddyreplication.BuddyFqnTransformer;
-import org.jboss.cache.invocation.CacheInvocationDelegate;
-import org.jboss.cache.loader.CacheLoaderManager;
-import org.jboss.cache.lock.LockStrategyFactory;
-import org.jboss.cache.marshall.Marshaller;
-import org.jboss.cache.marshall.VersionAwareMarshaller;
-import org.jboss.cache.remoting.jgroups.ChannelMessageListener;
import org.jboss.starobrno.batch.BatchContainer;
import org.jboss.starobrno.commands.CommandsFactory;
import org.jboss.starobrno.config.ConfigurationException;
@@ -39,6 +31,16 @@
import org.jboss.starobrno.invocation.InvocationContextContainer;
import org.jboss.starobrno.notifications.Notifier;
import org.jboss.starobrno.transaction.TransactionTable;
+import org.jboss.starobrno.remoting.RPCManagerImpl;
+import org.jboss.starobrno.remoting.ChannelMessageListener;
+import org.jboss.starobrno.marshall.ExtendedMarshaller;
+import org.jboss.starobrno.marshall.CacheMarshallerStarobrno;
+import org.jboss.starobrno.RPCManager;
+import org.jboss.cache.loader.CacheLoaderManager;
+import org.jboss.cache.invocation.CacheInvocationDelegate;
+import org.jboss.cache.lock.LockStrategyFactory;
+import org.jboss.cache.buddyreplication.BuddyFqnTransformer;
+import org.jboss.cache.RegionRegistry;
/**
* Simple factory that just uses reflection and an empty constructor of the component type.
@@ -47,7 +49,7 @@
* @since 2.1.0
*/
@DefaultFactoryFor(classes = {Notifier.class, RegionRegistry.class,
- ChannelMessageListener.class, CacheLoaderManager.class, Marshaller.class, InvocationContextContainer.class,
+ ChannelMessageListener.class, CacheLoaderManager.class, ExtendedMarshaller.class, InvocationContextContainer.class,
CacheInvocationDelegate.class, TransactionTable.class, MVCCEntryCreator.class,
LockStrategyFactory.class, BuddyFqnTransformer.class, BatchContainer.class,
ContextFactory.class, EntryFactory.class, CommandsFactory.class})
@@ -61,16 +63,20 @@
if (componentType.isInterface())
{
Class componentImpl;
- if (componentType.equals(Marshaller.class))
+ if (componentType.equals(ExtendedMarshaller.class))
{
- componentImpl = VersionAwareMarshaller.class;
+ componentImpl = CacheMarshallerStarobrno.class;
}
+ else
+ if (componentType.equals(RPCManager.class))
+ {
+ componentImpl = RPCManagerImpl.class;
+ }
else
{
// add an "Impl" to the end of the class name and try again
componentImpl = getClass().getClassLoader().loadClass(componentType.getName() + "Impl");
}
-
return componentType.cast(componentImpl.newInstance());
}
else
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/ReplicationQueueFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/ReplicationQueueFactory.java 2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/ReplicationQueueFactory.java 2008-10-14 17:43:45 UTC (rev 6941)
@@ -21,7 +21,7 @@
*/
package org.jboss.starobrno.factories;
-import org.jboss.cache.cluster.ReplicationQueue;
+import org.jboss.starobrno.cluster.ReplicationQueue;
import org.jboss.starobrno.config.Configuration;
import org.jboss.starobrno.factories.annotations.DefaultFactoryFor;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/RuntimeConfigAwareFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/RuntimeConfigAwareFactory.java 2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/RuntimeConfigAwareFactory.java 2008-10-14 17:43:45 UTC (rev 6941)
@@ -22,10 +22,10 @@
package org.jboss.starobrno.factories;
import org.jboss.cache.util.BeanUtils;
+import org.jboss.starobrno.RPCManager;
import org.jboss.starobrno.config.ConfigurationException;
import org.jboss.starobrno.config.RuntimeConfig;
import org.jboss.starobrno.factories.annotations.DefaultFactoryFor;
-import org.jboss.starobrno.remoting.RPCManager;
import java.lang.reflect.Method;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/StateTransferManagerFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/StateTransferManagerFactory.java 2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/StateTransferManagerFactory.java 2008-10-14 17:43:45 UTC (rev 6941)
@@ -21,12 +21,12 @@
*/
package org.jboss.starobrno.factories;
-import org.jboss.cache.statetransfer.DefaultStateTransferManager;
-import org.jboss.cache.statetransfer.StateTransferManager;
+import org.jboss.starobrno.statetransfer.DefaultStateTransferManager;
+import org.jboss.starobrno.statetransfer.StateTransferManager;
import org.jboss.starobrno.factories.annotations.DefaultFactoryFor;
/**
- * Constructs {@link org.jboss.cache.statetransfer.StateTransferManager} instances.
+ * Constructs {@link org.jboss.starobrno.statetransfer.StateTransferManager} instances.
*
* @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
* @since 3.0
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java 2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java 2008-10-14 17:43:45 UTC (rev 6941)
@@ -34,9 +34,9 @@
import org.jboss.starobrno.config.Option;
import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.factories.annotations.Inject;
-import org.jboss.starobrno.remoting.RPCManager;
import org.jboss.starobrno.transaction.GlobalTransaction;
import org.jboss.starobrno.transaction.TransactionTable;
+import org.jboss.starobrno.RPCManager;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/MarshalledValueInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/MarshalledValueInterceptor.java 2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/MarshalledValueInterceptor.java 2008-10-14 17:43:45 UTC (rev 6941)
@@ -29,6 +29,7 @@
import org.jboss.starobrno.interceptors.base.CommandInterceptor;
import org.jboss.starobrno.marshall.MarshalledValue;
import org.jboss.starobrno.marshall.MarshalledValueHelper;
+//import org.jboss.starobrno.marshall.MarshalledValueHelper;
import java.io.IOException;
import java.io.NotSerializableException;
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/ReplicationInterceptor.java (from rev 6895, core/branches/flat/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/ReplicationInterceptor.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/ReplicationInterceptor.java 2008-10-14 17:43:45 UTC (rev 6941)
@@ -0,0 +1,164 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.interceptors;
+
+import org.jboss.starobrno.commands.VisitableCommand;
+import org.jboss.starobrno.commands.tx.CommitCommand;
+import org.jboss.starobrno.commands.tx.PrepareCommand;
+import org.jboss.starobrno.commands.tx.RollbackCommand;
+import org.jboss.starobrno.commands.write.*;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.context.TransactionContext;
+import org.jboss.starobrno.interceptors.base.BaseRpcInterceptor;
+import org.jboss.starobrno.transaction.GlobalTransaction;
+
+/**
+ * Takes care of replicating modifications to other nodes in a cluster. Also
+ * listens for prepare(), commit() and rollback() messages which are received
+ * 'side-ways' (see docs/design/Refactoring.txt).
+ *
+ * @author Bela Ban
+ * @version $Id$
+ */
+public class ReplicationInterceptor extends BaseRpcInterceptor
+{
+
+ @Override
+ public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
+ {
+ if (!skipReplicationOfTransactionMethod(ctx))
+ replicateCall(ctx, command, configuration.isSyncCommitPhase(), ctx.getOptionOverrides(), true);
+ return invokeNextInterceptor(ctx, command);
+ }
+
+ /**
+  * Lets the rest of the chain process the prepare first and then runs the remote prepare
+  * phase.  If the transaction context holds local modifications, they are removed from a
+  * copy of the command and that copy is what gets replicated.
+  *
+  * @return the value returned by the next interceptor in the chain
+  */
+ @Override
+ public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command) throws Throwable
+ {
+    Object result = invokeNextInterceptor(ctx, command);
+
+    TransactionContext txContext = ctx.getTransactionContext();
+    PrepareCommand toReplicate = command;
+    if (txContext.hasLocalModifications())
+    {
+       // make sure any "local" modifications are stripped from the command we send out
+       toReplicate = command.copy();
+       toReplicate.removeModifications(txContext.getLocalModifications());
+    }
+
+    if (!skipReplicationOfTransactionMethod(ctx))
+    {
+       runPreparePhase(toReplicate, toReplicate.getGlobalTransaction(), ctx);
+    }
+    return result;
+ }
+
+ @Override
+ public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command) throws Throwable
+ {
+ if (!skipReplicationOfTransactionMethod(ctx) && !ctx.isLocalRollbackOnly())
+ {
+ replicateCall(ctx, command, configuration.isSyncRollbackPhase(), ctx.getOptionOverrides());
+ }
+ return invokeNextInterceptor(ctx, command);
+ }
+
+ /**
+  * Routes the command through {@code handleCrudMethod}, which decides whether and how
+  * it is replicated.
+  */
+ @Override
+ public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
+ {
+    Object result = handleCrudMethod(ctx, command);
+    return result;
+ }
+
+ /**
+  * Routes the command through {@code handleCrudMethod}, which decides whether and how
+  * it is replicated.
+  */
+ @Override
+ public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable
+ {
+    Object result = handleCrudMethod(ctx, command);
+    return result;
+ }
+
+ /**
+  * Routes the command through {@code handleCrudMethod}, which decides whether and how
+  * it is replicated.
+  */
+ @Override
+ public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable
+ {
+    Object result = handleCrudMethod(ctx, command);
+    return result;
+ }
+
+ /**
+  * Routes the command through {@code handleCrudMethod}, which decides whether and how
+  * it is replicated.
+  */
+ @Override
+ public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable
+ {
+    Object result = handleCrudMethod(ctx, command);
+    return result;
+ }
+
+ /**
+  * Routes the command through {@code handleCrudMethod}, which decides whether and how
+  * it is replicated.
+  */
+ @Override
+ public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable
+ {
+    Object result = handleCrudMethod(ctx, command);
+    return result;
+ }
+
+ /**
+  * Common handling for the write commands visited by this interceptor.
+  * <ul>
+  * <li>LOCAL mode forced and no transaction: the command is only passed down the chain.</li>
+  * <li>No transaction and the call originated locally: the command is passed down the chain
+  * first and, if that did not throw, replicated straight away.</li>
+  * <li>Otherwise nothing is replicated here; if LOCAL mode was forced the command is recorded
+  * on the transaction context as a local modification.</li>
+  * </ul>
+  *
+  * @return the value returned by the next interceptor in the chain
+  */
+ private Object handleCrudMethod(InvocationContext ctx, VisitableCommand command)
+       throws Throwable
+ {
+    boolean localModeForced = isLocalModeForced(ctx);
+    if (localModeForced && ctx.getTransaction() == null)
+    {
+       return invokeNextInterceptor(ctx, command);
+    }
+
+    // Pass the call up the chain FIRST; replication is only attempted if this did not throw.
+    Object result = invokeNextInterceptor(ctx, command);
+
+    boolean replicateNow = ctx.getTransaction() == null && ctx.isOriginLocal();
+    if (!replicateNow)
+    {
+       if (localModeForced) ctx.getTransactionContext().addLocalModification(command);
+       return result;
+    }
+
+    if (trace)
+    {
+       log.trace("invoking method " + command.getClass().getSimpleName()
+             + ", members=" + rpcManager.getMembers()
+             + ", mode=" + configuration.getCacheMode()
+             + ", exclude_self=" + true
+             + ", timeout=" + configuration.getSyncReplTimeout());
+    }
+
+    replicateCall(ctx, command, isSynchronous(ctx.getOptionOverrides()), ctx.getOptionOverrides());
+    return result;
+ }
+
+ /**
+  * Replicates the given prepare command to the other members via {@code replicateCall}.
+  * The call is made synchronously unless the configured cache mode is {@code REPL_ASYNC}.
+  * <br/>
+  * For a synchronous call, a member that failed to prepare reports an exception as its
+  * response and that exception is rethrown to the caller.
+  * <br/>
+  * NOTE(review): the original notes state that the rethrown exception marks the current
+  * transaction as rolled back, so that the afterCompletion(int) callback sees a status of
+  * <tt>MARKED_ROLLBACK</tt> and rolls back, and that on success afterCompletion(int)
+  * triggers the commit phase.  None of that is visible in this class - confirm against
+  * the transaction handling code.
+  *
+  * @param prepareMethod the prepare command to replicate
+  * @param gtx           the global transaction being prepared; only used for trace logging here
+  * @param ctx           the current invocation context, which supplies the option overrides
+  * @throws Throwable anything thrown by {@code replicateCall}
+  */
+ protected void runPreparePhase(PrepareCommand prepareMethod, GlobalTransaction gtx, InvocationContext ctx) throws Throwable
+ {
+    boolean asyncMode = Configuration.CacheMode.REPL_ASYNC == configuration.getCacheMode();
+    if (trace)
+    {
+       log.trace("(" + rpcManager.getLocalAddress() + "): running remote prepare for global tx " + gtx + " with async mode=" + asyncMode);
+    }
+
+    // per the original note: this returns immediately if we're the only member (exclude_self=true)
+    boolean sync = !asyncMode;
+    replicateCall(ctx, prepareMethod, sync, ctx.getOptionOverrides());
+ }
+}
\ No newline at end of file
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/TxInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/TxInterceptor.java 2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/TxInterceptor.java 2008-10-14 17:43:45 UTC (rev 6941)
@@ -23,6 +23,7 @@
import org.jboss.cache.util.concurrent.ConcurrentHashSet;
import org.jboss.starobrno.CacheException;
+import org.jboss.starobrno.RPCManager;
import org.jboss.starobrno.commands.CommandsFactory;
import org.jboss.starobrno.commands.ReplicableCommand;
import org.jboss.starobrno.commands.VisitableCommand;
@@ -40,7 +41,7 @@
import org.jboss.starobrno.jmx.annotations.ManagedOperation;
import org.jboss.starobrno.lock.LockManager;
import org.jboss.starobrno.notifications.Notifier;
-import org.jboss.starobrno.remoting.RPCManager;
+
import org.jboss.starobrno.remoting.ReplicationException;
import org.jboss.starobrno.transaction.GlobalTransaction;
import org.jboss.starobrno.transaction.OrderedSynchronizationHandler;
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java (from rev 6895, core/branches/flat/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java 2008-10-14 17:43:45 UTC (rev 6941)
@@ -0,0 +1,208 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.interceptors.base;
+
+import org.jboss.starobrno.RPCManager;
+import org.jboss.starobrno.cluster.ReplicationQueue;
+import org.jboss.starobrno.commands.CommandsFactory;
+import org.jboss.starobrno.commands.ReplicableCommand;
+import org.jboss.starobrno.config.Option;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.context.TransactionContext;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.factories.annotations.Start;
+import org.jboss.starobrno.transaction.GlobalTransaction;
+import org.jboss.starobrno.transaction.TransactionTable;
+import org.jgroups.Address;
+
+import javax.transaction.Transaction;
+import java.util.List;
+import java.util.Vector;
+
+/**
+ * Acts as a base for all RPC calls - subclassed by
+ * {@link org.jboss.starobrno.interceptors.ReplicationInterceptor} and {@link OptimisticReplicationInterceptor}.
+ *
+ * @author <a href="mailto:manik@jboss.org">Manik Surtani (manik(a)jboss.org)</a>
+ */
+public abstract class BaseRpcInterceptor extends CommandInterceptor
+{
+ private ReplicationQueue replicationQueue;
+ protected TransactionTable txTable;
+ private CommandsFactory commandsFactory;
+
+ protected RPCManager rpcManager;
+ protected boolean defaultSynchronous;
+
+ /**
+  * Stores the collaborators handed in through the {@code @Inject} annotated call.
+  *
+  * @param rpcManager       used to make the remote calls
+  * @param replicationQueue queue used for asynchronous calls; checked for {@code null} before use
+  * @param txTable          transaction table, kept for subclasses
+  * @param commandsFactory  used to wrap commands in replicate commands
+  */
+ @Inject
+ public void injectComponents(RPCManager rpcManager, ReplicationQueue replicationQueue,
+       TransactionTable txTable, CommandsFactory commandsFactory)
+ {
+    this.commandsFactory = commandsFactory;
+    this.txTable = txTable;
+    this.replicationQueue = replicationQueue;
+    this.rpcManager = rpcManager;
+ }
+
+ @Start
+ public void init()
+ {
+ defaultSynchronous = configuration.getCacheMode().isSynchronous();
+ }
+
+ /**
+  * Scans the responses in order and rethrows the first one that is a {@link Throwable}.
+  * A {@code null} list, {@code null} entries and non-Throwable entries are ignored.
+  *
+  * @param rsps the responses collected from the remote call, may be {@code null}
+  * @throws Throwable the first Throwable found among the responses
+  */
+ protected void checkResponses(List rsps) throws Throwable
+ {
+    if (rsps == null) return;
+
+    for (Object response : rsps)
+    {
+       // instanceof is false for null, so null responses are skipped as well
+       if (!(response instanceof Throwable)) continue;
+
+       Throwable remoteFailure = (Throwable) response;
+       // log the stack trace before rethrowing
+       if (log.isDebugEnabled())
+       {
+          log.debug("Received Throwable from remote node", remoteFailure);
+       }
+       throw remoteFailure;
+    }
+ }
+
+ protected void replicateCall(InvocationContext ctx, ReplicableCommand call, boolean sync, Option o, boolean useOutOfBandMessage) throws Throwable
+ {
+ replicateCall(ctx, null, call, sync, o, useOutOfBandMessage);
+ }
+
+ protected void replicateCall(InvocationContext ctx, ReplicableCommand call, boolean sync, Option o) throws Throwable
+ {
+ replicateCall(ctx, null, call, sync, o, false);
+ }
+
+ /**
+  * Works out the effective sync flag and timeout for a call and then delegates to the
+  * seven-argument overload, always asking for the command to be wrapped in a replicate command.
+  * <p/>
+  * The sync flag starts as {@code sync}, may be overridden by the invocation option
+  * (force-async is checked before force-sync) and finally, when a transaction is present,
+  * by the transaction context.  The timeout is the configured sync replication timeout
+  * unless the option carries a positive value.
+  *
+  * @param recipients          members to send to, may be {@code null}
+  * @param c                   the command to replicate
+  * @param sync                requested mode before overrides are applied
+  * @param o                   invocation option overrides, may be {@code null}
+  * @param useOutOfBandMessage passed through unchanged
+  */
+ protected void replicateCall(InvocationContext ctx, Vector<Address> recipients, ReplicableCommand c, boolean sync, Option o, boolean useOutOfBandMessage) throws Throwable
+ {
+    long timeout = configuration.getSyncReplTimeout();
+    boolean effectiveSync = sync;
+
+    // invocation-level option overrides
+    if (o != null)
+    {
+       if (o.isForceAsynchronous())
+       {
+          effectiveSync = false;
+       }
+       else if (o.isForceSynchronous())
+       {
+          effectiveSync = true;
+       }
+
+       long overrideTimeout = o.getSyncReplTimeout();
+       if (overrideTimeout > 0) timeout = overrideTimeout;
+    }
+
+    // tx-level overrides are more important, so they are applied last
+    TransactionContext txContext = ctx.getTransaction() == null ? null : ctx.getTransactionContext();
+    if (txContext != null)
+    {
+       if (txContext.isForceAsyncReplication())
+       {
+          effectiveSync = false;
+       }
+       else if (txContext.isForceSyncReplication())
+       {
+          effectiveSync = true;
+       }
+    }
+
+    replicateCall(recipients, c, effectiveSync, true, useOutOfBandMessage, false, timeout);
+ }
+
+ /**
+  * Sends a command to other members.
+  * <p/>
+  * An asynchronous call is placed on the replication queue (wrapped in a replicate command)
+  * when a queue is available.  Every other call goes through
+  * {@code rpcManager.callRemoteMethods}; for a synchronous call the responses are then
+  * checked and any Throwable among them is rethrown.
+  *
+  * @param recipients                        members to send to; a {@code null} list is handed to the RPC manager as is
+  * @param call                              the command to send
+  * @param sync                              whether to make the call synchronously
+  * @param wrapCacheCommandInReplicateMethod whether to wrap {@code call} in a replicate command for the direct RPC path
+  * @param useOutOfBandMessage               passed through to the RPC manager
+  * @param isBroadcast                       not used by this implementation
+  * @param timeout                           timeout passed to the RPC manager
+  * @throws Throwable a Throwable returned by a remote member on a synchronous call, or anything thrown by the RPC manager
+  */
+ protected void replicateCall(Vector<Address> recipients, ReplicableCommand call, boolean sync, boolean wrapCacheCommandInReplicateMethod, boolean useOutOfBandMessage, boolean isBroadcast, long timeout) throws Throwable
+ {
+    if (trace) log.trace("Broadcasting call " + call + " to recipient list " + recipients);
+
+    if (!sync && replicationQueue != null)
+    {
+       if (log.isDebugEnabled()) log.debug("Putting call " + call + " on the replication queue.");
+       replicationQueue.add(commandsFactory.buildReplicateCommand(call));
+       return;
+    }
+
+    // There is nothing to substitute for a null recipient list here, so it is passed on unchanged.
+    if (recipients == null && trace)
+    {
+       log.trace("Recipient list passed in is null; passing null on to the RPC manager.");
+    }
+
+    ReplicableCommand toCall = wrapCacheCommandInReplicateMethod ? commandsFactory.buildReplicateCommand(call) : call;
+
+    List rsps = rpcManager.callRemoteMethods(recipients,
+          toCall,
+          sync, // is synchronised?
+          timeout,
+          useOutOfBandMessage
+    );
+    if (trace) log.trace("responses=" + rsps);
+    if (sync) checkResponses(rsps);
+ }
+
+ /**
+  * It does not make sense replicating a transaction method (commit, rollback, prepare) if one of the following is true:
+  * <pre>
+  *    - there is no transaction. Why broadcast a commit or rollback if there is no transaction going on?
+  *    - call was not initiated here, but on other member of the cluster (no global transaction, or a remote one)
+  *    - the invocation's option overrides force cache mode LOCAL
+  *    - the current transaction did not modify any data
+  * </pre>
+  *
+  * @param ctx the current invocation context
+  * @return {@code true} if the transaction method should not be replicated
+  */
+ protected boolean skipReplicationOfTransactionMethod(InvocationContext ctx)
+ {
+    GlobalTransaction gtx = ctx.getGlobalTransaction();
+    if (ctx.getTransaction() == null || gtx == null || gtx.isRemote()) return true;
+
+    // Option overrides are null-checked by isLocalModeForced() and isSynchronous(); guard here
+    // as well so a context without overrides does not cause a NullPointerException.
+    Option overrides = ctx.getOptionOverrides();
+    if (overrides != null && overrides.isCacheModeLocal()) return true;
+
+    return !ctx.getTransactionContext().hasModifications();
+ }
+
+ /**
+  * Tells whether the call runs in a transaction that was initiated on this node.
+  *
+  * @return {@code true} if the context has a global transaction that is not remote and also
+  *         has a transaction; {@code false} otherwise
+  */
+ protected boolean isTransactionalAndLocal(InvocationContext ctx)
+ {
+    GlobalTransaction globalTx = ctx.getGlobalTransaction();
+    if (globalTx == null || globalTx.isRemote())
+    {
+       // not initiated here
+       return false;
+    }
+    return ctx.getTransaction() != null;
+ }
+
+ /**
+  * Resolves whether a call should be synchronous.  A force-synchronous option wins over a
+  * force-asynchronous one; without either (or without an option at all) the default derived
+  * from the configured cache mode applies.
+  *
+  * @param option the invocation's option overrides, may be {@code null}
+  */
+ protected boolean isSynchronous(Option option)
+ {
+    if (option == null)
+    {
+       return defaultSynchronous;
+    }
+    if (option.isForceSynchronous())
+    {
+       return true;
+    }
+    if (option.isForceAsynchronous())
+    {
+       return false;
+    }
+    return defaultSynchronous;
+ }
+
+ /**
+  * Tells whether the invocation's option overrides force cache mode LOCAL.  When they do,
+  * a debug message is logged.
+  *
+  * @return {@code true} if option overrides are present and request cache mode LOCAL
+  */
+ protected boolean isLocalModeForced(InvocationContext ctx)
+ {
+    boolean forced = ctx.getOptionOverrides() != null && ctx.getOptionOverrides().isCacheModeLocal();
+    if (!forced)
+    {
+       return false;
+    }
+
+    if (log.isDebugEnabled())
+    {
+       log.debug("LOCAL mode forced on invocation. Suppressing clustered events.");
+    }
+    return true;
+ }
+}
\ No newline at end of file
Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java
___________________________________________________________________
Name: svn:keywords
+ Author Date Id Revision
Name: svn:eol-style
+ native
17 years, 2 months