[jbosscache-commits] JBoss Cache SVN: r7091 - in core/branches/flat/src/main/java/org/jboss/starobrno: delta and 1 other directory.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Thu Nov 6 14:32:17 EST 2008


Author: manik.surtani at jboss.com
Date: 2008-11-06 14:32:17 -0500 (Thu, 06 Nov 2008)
New Revision: 7091

Added:
   core/branches/flat/src/main/java/org/jboss/starobrno/delta/
   core/branches/flat/src/main/java/org/jboss/starobrno/delta/Delta.java
   core/branches/flat/src/main/java/org/jboss/starobrno/delta/DeltaHashMap.java
Log:
delta interface

Added: core/branches/flat/src/main/java/org/jboss/starobrno/delta/Delta.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/delta/Delta.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/delta/Delta.java	2008-11-06 19:32:17 UTC (rev 7091)
@@ -0,0 +1,97 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.delta;
+
+import java.io.Externalizable;
+
+/**
+ * This interface allows for implementations to be palced as values in the cache, and report what has changed.  This
+ * allows for finer grained replication messages.
+ * <p/>
+ * Basically, the first time a <tt>Delta</tt> object is placed in the cache, it is cached in its entirity.  When a
+ * <tt>Delta</tt> is removed, similarly, it is removed in its entirity.
+ * <p/>
+ * Special behavior is seen when a <tt>Delta</tt> object is changed and replicated though.  For example, consider the
+ * following code:
+ * <code>
+ * Person p = cache.get("personName");
+ * p.setAge(p.getAge() + 1);
+ * cache.put("personName", p);
+ * </code>
+ * <p/>
+ * where <tt>Person</tt> implements Delta.
+ * <p/>
+ * Normally, the put() call would involve serializing and replicating the entire <tt>Person</tt> instance, but since
+ * <tt>Person</tt> implements <tt>Delta</tt>, instead the marshaller will only expect the changes to be marshalled (when
+ * calling the {@link Externalizable} interfaces.  As such, implementations could safely only marshall modified fields when
+ * {@link Externalizable#writeExternal(java.io.ObjectOutput)} is called, thereby reducing the time taken serializing
+ * and replicating changes.
+ * <p/>
+ * On the receiving end, changes are applied as such.  In a {@link org.jboss.starobrno.commands.write.PutKeyValueCommand},
+ * if the value being put is a <tt>Delta</tt> and there is already a <tt>Delta</tt> under the same key, a merge operation
+ * is called by invoking {@link Delta#merge(Delta)} on the <i>replicated</i> <tt>Delta</tt> value and passing in the
+ * <i>original</i> <tt>Delta</tt> instance.  It would then be up to the implementation to efficiently and correctly merge
+ * in state, and return a coherent and complete <tt>Delta</tt> instance that could be placed in the cache.
+ * <p/>
+ * If the command does not find anything under the key - or emor importantly, finds an object that does not implement
+ * <tt>Delta</tt> - it would pass a null into the {@link #merge(Delta)} method.
+ * <p/>
+ * It is important to note that {@link #merge(Delta)} is <b>only</b> called on remote nodes, after replication when
+ * merging state.  This is never used on the local cache instance.
+ * <p/>
+ * In addition to {@link #merge(Delta)}, other important methods on this interface are {@link #rollback()}.  If running
+ * within a JTA transaction scope, and a rollback occurs, no replication takes place so nothing needs to happen on
+ * remote instances.  However, on your local cache instance, you may have changed internal state of a reference
+ * which already exists in the cache, such as in the above example.  To correctly deal with JTA transaction rollbacks,
+ * all {@link #rollback()} methods on <tt>Delta</tt> instances involved in a transaction will be called and it is
+ * up to the implementation to reset internal state.  There is a corresponding {@link #commit()} method where changes
+ * should be considered as accepted, are flushed and rollback information can be cleared.
+ * <p/>
+ * For an example of a <tt>Delta</tt> implementation, and for typical use cases, please refer to {@link org.jboss.starobrno.delta.DeltaHashMap}.
+ *
+ * @author Manik Surtani (<a href="mailto:manik AT jboss DOT org">manik AT jboss DOT org</a>)
+ * @see org.jboss.starobrno.delta.DeltaHashMap
+ */
+public interface Delta extends Externalizable
+{
+   /**
+    * Invoked if a transaction involving modifying state on this instance has rolled back, and state needs to be reverted.
+    */
+   void rollback();
+
+   /**
+    * Invoked if a transaction involving modifying state on this instance completes successfully, and any recorded
+    * rollback information can safely be discarded.
+    */
+   void commit();
+
+   /**
+    * Merges changes.  Important to note that this method is always called on remote nodes, after replication, and is
+    * called on the de-serialized instance read off the wire.  What is passed in is an extisting <tt>Delta</tt> instance
+    * to merge with, or a null.  This method should return a fully coherent <tt>Delta</tt> instance than can be placed
+    * in the cache
+    *
+    * @param toMergeInto
+    * @return a fully coherent <tt>Delta</tt> instance
+    */
+   Delta merge(Delta toMergeInto);
+}

Added: core/branches/flat/src/main/java/org/jboss/starobrno/delta/DeltaHashMap.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/delta/DeltaHashMap.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/delta/DeltaHashMap.java	2008-11-06 19:32:17 UTC (rev 7091)
@@ -0,0 +1,301 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.delta;
+
+import org.jboss.starobrno.util.FastCopyHashMap;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+
+public class DeltaHashMap<K, V> implements Delta, Map<K, V>
+{
+   FastCopyHashMap<K, V> delegate;
+   List<Operation> changelog = new LinkedList<Operation>();
+
+   /**
+    * Constructs an empty <tt>HashMap</tt> with the specified initial
+    * capacity and load factor.
+    *
+    * @param initialCapacity The initial capacity.
+    * @param loadFactor      The load factor.
+    * @throws IllegalArgumentException if the initial capacity is negative
+    *                                  or the load factor is nonpositive.
+    */
+   public DeltaHashMap(int initialCapacity, float loadFactor)
+   {
+      delegate = new FastCopyHashMap(initialCapacity, loadFactor);
+   }
+
+   /**
+    * Constructs an empty <tt>HashMap</tt> with the specified initial
+    * capacity and the default load factor (0.75).
+    *
+    * @param initialCapacity the initial capacity.
+    * @throws IllegalArgumentException if the initial capacity is negative.
+    */
+   public DeltaHashMap(int initialCapacity)
+   {
+      delegate = new FastCopyHashMap<K, V>(initialCapacity);
+   }
+
+   /**
+    * Constructs an empty <tt>HashMap</tt> with the default initial capacity
+    * (16) and the default load factor (0.75).
+    */
+   public DeltaHashMap()
+   {
+      delegate = new FastCopyHashMap<K, V>();
+   }
+
+   /**
+    * Constructs a new <tt>HashMap</tt> with the same mappings as the
+    * specified <tt>Map</tt>.  The <tt>HashMap</tt> is created with
+    * default load factor (0.75) and an initial capacity sufficient to
+    * hold the mappings in the specified <tt>Map</tt>.
+    *
+    * @param m the map whose mappings are to be placed in this map.
+    * @throws NullPointerException if the specified map is null.
+    */
+   public DeltaHashMap(Map<? extends K, ? extends V> m)
+   {
+      delegate = new FastCopyHashMap<K, V>(m);
+   }
+
+
+   public void rollback()
+   {
+      // replay reversal operations on the changelog, in reverse order
+      ListIterator<Operation> li = changelog.listIterator(changelog.size());
+      while (li.hasPrevious()) li.previous().rollback(delegate);
+   }
+
+   public void commit()
+   {
+      changelog.clear();
+   }
+
+   public Delta merge(Delta toMergeInto)
+   {
+      if (toMergeInto != null)
+      {
+         if (toMergeInto instanceof DeltaHashMap)
+         {
+            DeltaHashMap other = (DeltaHashMap) toMergeInto;
+            for (Operation o : changelog) o.replay(other.delegate);
+            commit();
+            other.commit();
+            return toMergeInto;
+         }
+         else
+         {
+            throw new IllegalArgumentException("This instance of " + getClass().getSimpleName() + " can only merge with other instances of the same type.  Don't know how to deal with " + toMergeInto.getClass());
+         }
+      }
+      else
+      {
+         // use the current instance, since there is nothing to merge into.
+         for (Operation o : changelog) o.replay(delegate);
+         commit();
+         return this;
+      }
+   }
+
+   public void writeExternal(ObjectOutput out) throws IOException
+   {
+      out.writeObject(changelog);
+   }
+
+   public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+   {
+      changelog = (List<Operation>) in.readObject();
+   }
+
+   public int size()
+   {
+      return delegate.size();
+   }
+
+   public boolean isEmpty()
+   {
+      return delegate.isEmpty();
+   }
+
+   public boolean containsKey(Object key)
+   {
+      return delegate.containsKey(key);
+   }
+
+   public boolean containsValue(Object value)
+   {
+      return delegate.containsValue(value);
+   }
+
+   public V get(Object key)
+   {
+      return delegate.get(key);
+   }
+
+   public V put(K key, V value)
+   {
+      PutOperation<K, V> op = new PutOperation<K, V>();
+      op.key = key;
+      op.newValue = value;
+      op.oldValue = delegate.put(key, value);
+      changelog.add(op);
+      return op.oldValue;
+   }
+
+   public V remove(Object key)
+   {
+      RemoveOperation<K, V> op = new RemoveOperation<K, V>();
+      op.key = (K) key;
+      op.oldValue = delegate.remove(key);
+      changelog.add(op);
+      return op.oldValue;
+   }
+
+   public void putAll(Map<? extends K, ? extends V> t)
+   {
+      // this is crappy - need to do this more efficiently!
+      for (Entry<? extends K, ? extends V> e : t.entrySet()) put(e.getKey(), e.getValue());
+   }
+
+   public void clear()
+   {
+      ClearOperation<K, V> op = new ClearOperation<K, V>();
+      op.originalEntries = (FastCopyHashMap<K, V>) delegate.clone();
+      changelog.add(op);
+      delegate.clear();
+   }
+
+   public Set<K> keySet()
+   {
+      return delegate.keySet();
+   }
+
+   public Collection<V> values()
+   {
+      return delegate.values();
+   }
+
+   public Set<Entry<K, V>> entrySet()
+   {
+      return delegate.entrySet();
+   }
+
+   private static abstract class Operation<K, V> implements Externalizable
+   {
+      abstract void rollback(Map<K, V> delegate);
+
+      abstract void replay(Map<K, V> delegate);
+
+      public void writeExternal(ObjectOutput out) throws IOException
+      {
+      }
+
+      public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+      {
+      }
+   }
+
+   private static class PutOperation<K, V> extends Operation<K, V>
+   {
+      K key;
+      V oldValue;
+      V newValue;
+
+      void rollback(Map<K, V> delegate)
+      {
+         if (oldValue == null)
+            delegate.remove(key);
+         else
+            delegate.put(key, oldValue);
+      }
+
+      void replay(Map<K, V> delegate)
+      {
+         delegate.put(key, newValue);
+      }
+
+      public void writeExternal(ObjectOutput out) throws IOException
+      {
+         // don't bother writing out the old value since it will never be rolled back
+         out.writeObject(key);
+         out.writeObject(newValue);
+      }
+
+      public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+      {
+         key = (K) in.readObject();
+         newValue = (V) in.readObject();
+      }
+   }
+
+   private static class RemoveOperation<K, V> extends Operation<K, V>
+   {
+      K key;
+      V oldValue;
+
+      void rollback(Map<K, V> delegate)
+      {
+         if (oldValue != null) delegate.put(key, oldValue);
+      }
+
+      void replay(Map<K, V> delegate)
+      {
+         delegate.remove(key);
+      }
+
+      public void writeExternal(ObjectOutput out) throws IOException
+      {
+         out.writeObject(key);
+      }
+
+      public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+      {
+         key = (K) in.readObject();
+      }
+   }
+
+   private static class ClearOperation<K, V> extends Operation<K, V>
+   {
+      FastCopyHashMap<K, V> originalEntries;
+
+      void rollback(Map<K, V> delegate)
+      {
+         if (!originalEntries.isEmpty()) delegate.putAll(originalEntries);
+      }
+
+      void replay(Map<K, V> delegate)
+      {
+         delegate.clear();
+      }
+   }
+}




More information about the jbosscache-commits mailing list