[jbosscache-commits] JBoss Cache SVN: r7091 - in core/branches/flat/src/main/java/org/jboss/starobrno: delta and 1 other directory.
jbosscache-commits at lists.jboss.org
Thu Nov 6 14:32:17 EST 2008
Author: manik.surtani at jboss.com
Date: 2008-11-06 14:32:17 -0500 (Thu, 06 Nov 2008)
New Revision: 7091
Added:
core/branches/flat/src/main/java/org/jboss/starobrno/delta/
core/branches/flat/src/main/java/org/jboss/starobrno/delta/Delta.java
core/branches/flat/src/main/java/org/jboss/starobrno/delta/DeltaHashMap.java
Log:
delta interface
Added: core/branches/flat/src/main/java/org/jboss/starobrno/delta/Delta.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/delta/Delta.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/delta/Delta.java 2008-11-06 19:32:17 UTC (rev 7091)
@@ -0,0 +1,97 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.delta;
+
+import java.io.Externalizable;
+
+/**
+ * This interface allows implementations to be placed as values in the cache and to report what has changed. This
+ * allows for finer-grained replication messages.
+ * <p/>
+ * Basically, the first time a <tt>Delta</tt> object is placed in the cache, it is cached in its entirety. When a
+ * <tt>Delta</tt> is removed, similarly, it is removed in its entirety.
+ * <p/>
+ * Special behavior is seen when a <tt>Delta</tt> object is changed and replicated though. For example, consider the
+ * following code:
+ * <pre>
+ * Person p = cache.get("personName");
+ * p.setAge(p.getAge() + 1);
+ * cache.put("personName", p);
+ * </pre>
+ * <p/>
+ * where <tt>Person</tt> implements Delta.
+ * <p/>
+ * Normally, the put() call would involve serializing and replicating the entire <tt>Person</tt> instance, but since
+ * <tt>Person</tt> implements <tt>Delta</tt>, the marshaller instead expects only the changes to be marshalled (when
+ * calling the {@link Externalizable} methods). As such, implementations can safely marshal only modified fields when
+ * {@link Externalizable#writeExternal(java.io.ObjectOutput)} is called, thereby reducing the time taken serializing
+ * and replicating changes.
+ * <p/>
+ * On the receiving end, changes are applied as follows. In a {@link org.jboss.starobrno.commands.write.PutKeyValueCommand},
+ * if the value being put is a <tt>Delta</tt> and there is already a <tt>Delta</tt> under the same key, a merge operation
+ * is called by invoking {@link Delta#merge(Delta)} on the <i>replicated</i> <tt>Delta</tt> value and passing in the
+ * <i>original</i> <tt>Delta</tt> instance. It would then be up to the implementation to efficiently and correctly merge
+ * in state, and return a coherent and complete <tt>Delta</tt> instance that could be placed in the cache.
+ * <p/>
+ * If the command does not find anything under the key - or more importantly, finds an object that does not implement
+ * <tt>Delta</tt> - it passes a null into the {@link #merge(Delta)} method.
+ * <p/>
+ * It is important to note that {@link #merge(Delta)} is <b>only</b> called on remote nodes, after replication when
+ * merging state. This is never used on the local cache instance.
+ * <p/>
+ * In addition to {@link #merge(Delta)}, another important method on this interface is {@link #rollback()}. If running
+ * within a JTA transaction scope, and a rollback occurs, no replication takes place so nothing needs to happen on
+ * remote instances. However, on your local cache instance, you may have changed internal state of a reference
+ * which already exists in the cache, such as in the above example. To correctly deal with JTA transaction rollbacks,
+ * all {@link #rollback()} methods on <tt>Delta</tt> instances involved in a transaction will be called and it is
+ * up to the implementation to reset internal state. There is a corresponding {@link #commit()} method, at which point
+ * changes should be considered accepted and flushed, and rollback information can be cleared.
+ * <p/>
+ * For an example of a <tt>Delta</tt> implementation, and for typical use cases, please refer to {@link org.jboss.starobrno.delta.DeltaHashMap}.
+ *
+ * @author Manik Surtani (<a href="mailto:manik AT jboss DOT org">manik AT jboss DOT org</a>)
+ * @see org.jboss.starobrno.delta.DeltaHashMap
+ */
+public interface Delta extends Externalizable
+{
+ /**
+ * Invoked if a transaction involving modifying state on this instance has rolled back, and state needs to be reverted.
+ */
+ void rollback();
+
+ /**
+ * Invoked if a transaction involving modifying state on this instance completes successfully, and any recorded
+ * rollback information can safely be discarded.
+ */
+ void commit();
+
+ /**
+ * Merges changes. Important to note that this method is always called on remote nodes, after replication, and is
+ * called on the de-serialized instance read off the wire. What is passed in is an existing <tt>Delta</tt> instance
+ * to merge with, or a null. This method should return a fully coherent <tt>Delta</tt> instance that can be placed
+ * in the cache.
+ *
+ * @param toMergeInto the existing <tt>Delta</tt> instance to merge into, or null if there is none
+ * @return a fully coherent <tt>Delta</tt> instance
+ */
+ Delta merge(Delta toMergeInto);
+}
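
The Javadoc above describes the Person use case only in prose. A minimal
sketch of what such an implementation might look like follows. It is not part
of this commit: the Person class, its dirty flag, its committed copy and the
marshal/unmarshal helpers are all hypothetical, and the Delta interface is
redeclared locally so the example compiles on its own. The helpers call
writeExternal/readExternal directly instead of going through the cache
marshaller, which is an assumption about how the wire format would be driven.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Local stand-in with the same three methods as the Delta interface in this commit.
interface Delta extends Externalizable {
    void rollback();
    void commit();
    Delta merge(Delta toMergeInto);
}

// Hypothetical value class; the dirty flag and committed copy are assumptions.
class Person implements Delta {
    private String name;
    private int age;
    private int committedAge;   // rollback information
    private boolean ageDirty;   // true once age has changed since the last commit

    Person() { }

    Person(String name, int age) {
        this.name = name;
        this.age = age;
        this.committedAge = age;
    }

    String getName() { return name; }
    int getAge() { return age; }

    void setAge(int age) {
        this.age = age;
        ageDirty = true;
    }

    public void rollback() { age = committedAge; ageDirty = false; }

    public void commit() { committedAge = age; ageDirty = false; }

    // Called on the instance read off the wire; copies only the changed field.
    public Delta merge(Delta toMergeInto) {
        if (toMergeInto == null) { commit(); return this; }
        if (!(toMergeInto instanceof Person))
            throw new IllegalArgumentException("Cannot merge into " + toMergeInto.getClass());
        Person target = (Person) toMergeInto;
        if (ageDirty) target.age = age;
        target.commit();
        return target;
    }

    // Only the modified field goes on the wire; name is never sent in this sketch.
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeBoolean(ageDirty);
        if (ageDirty) out.writeInt(age);
    }

    public void readExternal(ObjectInput in) throws IOException {
        ageDirty = in.readBoolean();
        if (ageDirty) age = in.readInt();
    }
}

class DeltaSketchDemo {
    // Hypothetical helper: writes only the delta into a byte array.
    static byte[] marshal(Delta d) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            d.writeExternal(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Hypothetical helper: reads a delta into a freshly constructed instance.
    static void unmarshal(Delta target, byte[] wire) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(wire))) {
            target.readExternal(in);
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Person local = new Person("Alice", 30);
        Person remote = new Person("Alice", 30);   // copy already held by the remote node
        local.setAge(local.getAge() + 1);

        Person replicated = new Person();
        unmarshal(replicated, marshal(local));     // only the age travels
        Person merged = (Person) replicated.merge(remote);
        System.out.println(merged.getName() + " is " + merged.getAge());
    }
}
```

Note that merge(null) in this sketch returns an instance without a name, since
only changed fields were sent. How the first, complete copy of a Delta reaches
remote nodes is not covered here.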
Added: core/branches/flat/src/main/java/org/jboss/starobrno/delta/DeltaHashMap.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/delta/DeltaHashMap.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/delta/DeltaHashMap.java 2008-11-06 19:32:17 UTC (rev 7091)
@@ -0,0 +1,301 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.delta;
+
+import org.jboss.starobrno.util.FastCopyHashMap;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+
+public class DeltaHashMap<K, V> implements Delta, Map<K, V>
+{
+ FastCopyHashMap<K, V> delegate;
+ List<Operation> changelog = new LinkedList<Operation>();
+
+ /**
+ * Constructs an empty <tt>HashMap</tt> with the specified initial
+ * capacity and load factor.
+ *
+ * @param initialCapacity The initial capacity.
+ * @param loadFactor The load factor.
+ * @throws IllegalArgumentException if the initial capacity is negative
+ * or the load factor is nonpositive.
+ */
+ public DeltaHashMap(int initialCapacity, float loadFactor)
+ {
+ delegate = new FastCopyHashMap<K, V>(initialCapacity, loadFactor);
+ }
+
+ /**
+ * Constructs an empty <tt>HashMap</tt> with the specified initial
+ * capacity and the default load factor (0.75).
+ *
+ * @param initialCapacity the initial capacity.
+ * @throws IllegalArgumentException if the initial capacity is negative.
+ */
+ public DeltaHashMap(int initialCapacity)
+ {
+ delegate = new FastCopyHashMap<K, V>(initialCapacity);
+ }
+
+ /**
+ * Constructs an empty <tt>HashMap</tt> with the default initial capacity
+ * (16) and the default load factor (0.75).
+ */
+ public DeltaHashMap()
+ {
+ delegate = new FastCopyHashMap<K, V>();
+ }
+
+ /**
+ * Constructs a new <tt>HashMap</tt> with the same mappings as the
+ * specified <tt>Map</tt>. The <tt>HashMap</tt> is created with
+ * default load factor (0.75) and an initial capacity sufficient to
+ * hold the mappings in the specified <tt>Map</tt>.
+ *
+ * @param m the map whose mappings are to be placed in this map.
+ * @throws NullPointerException if the specified map is null.
+ */
+ public DeltaHashMap(Map<? extends K, ? extends V> m)
+ {
+ delegate = new FastCopyHashMap<K, V>(m);
+ }
+
+
+ public void rollback()
+ {
+ // replay reversal operations on the changelog, in reverse order
+ ListIterator<Operation> li = changelog.listIterator(changelog.size());
+ while (li.hasPrevious()) li.previous().rollback(delegate);
+ }
+
+ public void commit()
+ {
+ changelog.clear();
+ }
+
+ public Delta merge(Delta toMergeInto)
+ {
+ if (toMergeInto != null)
+ {
+ if (toMergeInto instanceof DeltaHashMap)
+ {
+ DeltaHashMap other = (DeltaHashMap) toMergeInto;
+ for (Operation o : changelog) o.replay(other.delegate);
+ commit();
+ other.commit();
+ return toMergeInto;
+ }
+ else
+ {
+ throw new IllegalArgumentException("This instance of " + getClass().getSimpleName() + " can only merge with other instances of the same type. Don't know how to deal with " + toMergeInto.getClass());
+ }
+ }
+ else
+ {
+ // use the current instance, since there is nothing to merge into.
+ for (Operation o : changelog) o.replay(delegate);
+ commit();
+ return this;
+ }
+ }
+
+ public void writeExternal(ObjectOutput out) throws IOException
+ {
+ out.writeObject(changelog);
+ }
+
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+ {
+ changelog = (List<Operation>) in.readObject();
+ }
+
+ public int size()
+ {
+ return delegate.size();
+ }
+
+ public boolean isEmpty()
+ {
+ return delegate.isEmpty();
+ }
+
+ public boolean containsKey(Object key)
+ {
+ return delegate.containsKey(key);
+ }
+
+ public boolean containsValue(Object value)
+ {
+ return delegate.containsValue(value);
+ }
+
+ public V get(Object key)
+ {
+ return delegate.get(key);
+ }
+
+ public V put(K key, V value)
+ {
+ PutOperation<K, V> op = new PutOperation<K, V>();
+ op.key = key;
+ op.newValue = value;
+ op.oldValue = delegate.put(key, value);
+ changelog.add(op);
+ return op.oldValue;
+ }
+
+ public V remove(Object key)
+ {
+ RemoveOperation<K, V> op = new RemoveOperation<K, V>();
+ op.key = (K) key;
+ op.oldValue = delegate.remove(key);
+ changelog.add(op);
+ return op.oldValue;
+ }
+
+ public void putAll(Map<? extends K, ? extends V> t)
+ {
+ // TODO: inefficient - records one PutOperation per entry; needs a bulk operation
+ for (Entry<? extends K, ? extends V> e : t.entrySet()) put(e.getKey(), e.getValue());
+ }
+
+ public void clear()
+ {
+ ClearOperation<K, V> op = new ClearOperation<K, V>();
+ op.originalEntries = (FastCopyHashMap<K, V>) delegate.clone();
+ changelog.add(op);
+ delegate.clear();
+ }
+
+ public Set<K> keySet()
+ {
+ return delegate.keySet();
+ }
+
+ public Collection<V> values()
+ {
+ return delegate.values();
+ }
+
+ public Set<Entry<K, V>> entrySet()
+ {
+ return delegate.entrySet();
+ }
+
+ private static abstract class Operation<K, V> implements Externalizable
+ {
+ abstract void rollback(Map<K, V> delegate);
+
+ abstract void replay(Map<K, V> delegate);
+
+ public void writeExternal(ObjectOutput out) throws IOException
+ {
+ }
+
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+ {
+ }
+ }
+
+ private static class PutOperation<K, V> extends Operation<K, V>
+ {
+ K key;
+ V oldValue;
+ V newValue;
+
+ void rollback(Map<K, V> delegate)
+ {
+ if (oldValue == null)
+ delegate.remove(key);
+ else
+ delegate.put(key, oldValue);
+ }
+
+ void replay(Map<K, V> delegate)
+ {
+ delegate.put(key, newValue);
+ }
+
+ public void writeExternal(ObjectOutput out) throws IOException
+ {
+ // don't bother writing out the old value since it will never be rolled back
+ out.writeObject(key);
+ out.writeObject(newValue);
+ }
+
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+ {
+ key = (K) in.readObject();
+ newValue = (V) in.readObject();
+ }
+ }
+
+ private static class RemoveOperation<K, V> extends Operation<K, V>
+ {
+ K key;
+ V oldValue;
+
+ void rollback(Map<K, V> delegate)
+ {
+ if (oldValue != null) delegate.put(key, oldValue);
+ }
+
+ void replay(Map<K, V> delegate)
+ {
+ delegate.remove(key);
+ }
+
+ public void writeExternal(ObjectOutput out) throws IOException
+ {
+ out.writeObject(key);
+ }
+
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+ {
+ key = (K) in.readObject();
+ }
+ }
+
+ private static class ClearOperation<K, V> extends Operation<K, V>
+ {
+ FastCopyHashMap<K, V> originalEntries;
+
+ void rollback(Map<K, V> delegate)
+ {
+ if (!originalEntries.isEmpty()) delegate.putAll(originalEntries);
+ }
+
+ void replay(Map<K, V> delegate)
+ {
+ delegate.clear();
+ }
+ }
+}
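
The changelog technique used by DeltaHashMap (record each mutation, undo in
reverse order on rollback, replay in order on a remote copy) can be sketched
without FastCopyHashMap or serialization. The class below is a hypothetical,
simplified stand-in built on java.util.HashMap, not the committed code; the
names ChangelogMapSketch, replayInto and view are made up for illustration.
Two choices differ from the commit and are assumptions of the sketch: the
changelog is cleared after a rollback, and containsKey is used to tell an
absent key from a key mapped to null.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for DeltaHashMap; not the committed class.
class ChangelogMapSketch<K, V> {
    // One recorded change: how to undo it locally and how to replay it elsewhere.
    private static final class Op<K, V> {
        final Consumer<Map<K, V>> undo;
        final Consumer<Map<K, V>> redo;

        Op(Consumer<Map<K, V>> undo, Consumer<Map<K, V>> redo) {
            this.undo = undo;
            this.redo = redo;
        }
    }

    private final Map<K, V> delegate = new HashMap<>();
    private final List<Op<K, V>> changelog = new ArrayList<>();

    V put(K key, V value) {
        boolean existed = delegate.containsKey(key);
        V old = delegate.put(key, value);
        changelog.add(new Op<K, V>(
            m -> { if (existed) m.put(key, old); else m.remove(key); },
            m -> m.put(key, value)));
        return old;
    }

    V remove(K key) {
        boolean existed = delegate.containsKey(key);
        V old = delegate.remove(key);
        changelog.add(new Op<K, V>(
            m -> { if (existed) m.put(key, old); },
            m -> m.remove(key)));
        return old;
    }

    void clear() {
        Map<K, V> snapshot = new HashMap<>(delegate);   // kept for rollback only
        delegate.clear();
        changelog.add(new Op<K, V>(m -> m.putAll(snapshot), Map::clear));
    }

    // Undo recorded changes newest-first, then forget them.
    void rollback() {
        ListIterator<Op<K, V>> it = changelog.listIterator(changelog.size());
        while (it.hasPrevious()) it.previous().undo.accept(delegate);
        changelog.clear();
    }

    void commit() {
        changelog.clear();
    }

    // Apply the recorded changes, oldest-first, to another copy of the data.
    void replayInto(Map<K, V> target) {
        for (Op<K, V> op : changelog) op.redo.accept(target);
    }

    Map<K, V> view() {
        return Collections.unmodifiableMap(delegate);
    }
}

class ChangelogMapDemo {
    public static void main(String[] args) {
        ChangelogMapSketch<String, Integer> map = new ChangelogMapSketch<>();
        map.put("a", 1);
        map.commit();        // {a=1} is now the committed state

        map.put("a", 2);
        map.put("b", 3);
        map.clear();
        map.rollback();      // undoes clear, then put(b), then put(a)
        System.out.println(map.view());   // prints {a=1}
    }
}
```

Undoing in reverse order matters here: the clear must be reverted before the
two puts, otherwise the restored snapshot would overwrite the reverted values.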