[jbosscache-commits] JBoss Cache SVN: r6960 - in core/branches/flat/src: main/java/org/jboss/starobrno and 12 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Wed Oct 15 15:41:27 EDT 2008


Author: mircea.markus
Date: 2008-10-15 15:41:25 -0400 (Wed, 15 Oct 2008)
New Revision: 6960

Added:
   core/branches/flat/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor_Legacy.java
   core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/InvalidateCommand.java
   core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvalidationInterceptor.java
Removed:
   core/branches/flat/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
Modified:
   core/branches/flat/src/main/java/org/jboss/starobrno/Cache.java
   core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java
   core/branches/flat/src/main/java/org/jboss/starobrno/commands/AbstractVisitor.java
   core/branches/flat/src/main/java/org/jboss/starobrno/commands/CommandsFactory.java
   core/branches/flat/src/main/java/org/jboss/starobrno/commands/CommandsFactoryImpl.java
   core/branches/flat/src/main/java/org/jboss/starobrno/commands/Visitor.java
   core/branches/flat/src/main/java/org/jboss/starobrno/container/DataContainer.java
   core/branches/flat/src/main/java/org/jboss/starobrno/container/UnsortedDataContainer.java
   core/branches/flat/src/main/java/org/jboss/starobrno/factories/InterceptorChainFactory.java
   core/branches/flat/src/main/java/org/jboss/starobrno/manager/CacheManager.java
   core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CacheMarshallerStarobrno.java
   core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java
   core/branches/flat/src/main/java/org/jboss/starobrno/notifications/Notifier.java
   core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java
   core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java
   core/branches/flat/src/test/java/org/jboss/starobrno/BasicTest.java
   core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java
   core/branches/flat/src/test/resources/log4j.xml
Log:
progress on cache replication

Deleted: core/branches/flat/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java	2008-10-15 18:13:06 UTC (rev 6959)
+++ core/branches/flat/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java	2008-10-15 19:41:25 UTC (rev 6960)
@@ -1,392 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.cache.interceptors;
-
-import org.jboss.cache.Fqn;
-import org.jboss.cache.InvocationContext;
-import org.jboss.cache.commands.AbstractVisitor;
-import org.jboss.cache.commands.CommandsFactory;
-import org.jboss.cache.commands.VisitableCommand;
-import org.jboss.cache.commands.WriteCommand;
-import org.jboss.cache.commands.tx.CommitCommand;
-import org.jboss.cache.commands.tx.PrepareCommand;
-import org.jboss.cache.commands.tx.RollbackCommand;
-import org.jboss.cache.commands.write.ClearDataCommand;
-import org.jboss.cache.commands.write.MoveCommand;
-import org.jboss.cache.commands.write.PutDataMapCommand;
-import org.jboss.cache.commands.write.PutForExternalReadCommand;
-import org.jboss.cache.commands.write.PutKeyValueCommand;
-import org.jboss.cache.commands.write.RemoveKeyCommand;
-import org.jboss.cache.commands.write.RemoveNodeCommand;
-import org.jboss.cache.jmx.annotations.ManagedAttribute;
-import org.jboss.cache.jmx.annotations.ManagedOperation;
-import org.jboss.cache.transaction.GlobalTransaction;
-import org.jboss.cache.transaction.TransactionContext;
-import org.jboss.cache.transaction.TransactionTable;
-import org.jboss.starobrno.config.Option;
-import org.jboss.starobrno.factories.annotations.Inject;
-import org.jboss.starobrno.factories.annotations.Start;
-
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * This interceptor acts as a replacement to the replication interceptor when
- * the CacheImpl is configured with ClusteredSyncMode as INVALIDATE.
- * <p/>
- * The idea is that rather than replicating changes to all caches in a cluster
- * when CRUD (Create, Remove, Update, Delete) methods are called, simply call
- * evict(Fqn) on the remote caches for each changed node.  This allows the
- * remote node to look up the value in a shared cache loader which would have
- * been updated with the changes.
- *
- * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
- */
-public class InvalidationInterceptor extends BaseRpcInterceptor
-{
-   private long invalidations = 0;
-   protected Map<GlobalTransaction, List<WriteCommand>> txMods;
-   protected boolean optimistic;
-   private CommandsFactory commandsFactory;
-   private boolean statsEnabled;
-
-   @Inject
-   public void injectDependencies(CommandsFactory commandsFactory)
-   {
-      this.commandsFactory = commandsFactory;
-   }
-
-   @Start
-   private void initTxMap()
-   {
-      optimistic = false;
-      if (optimistic) txMods = new ConcurrentHashMap<GlobalTransaction, List<WriteCommand>>();
-      this.setStatisticsEnabled(configuration.getExposeManagementStatistics());
-   }
-
-   @Override
-   public Object visitPutDataMapCommand(InvocationContext ctx, PutDataMapCommand command) throws Throwable
-   {
-      return handleWriteMethod(ctx, command.getFqn(), null, command);
-   }
-
-   @Override
-   public Object visitPutForExternalReadCommand(InvocationContext ctx, PutForExternalReadCommand command) throws Throwable
-   {
-      // these are always local mode, as far as invalidation is concerned
-      if (ctx.getTransaction() != null) ctx.getTransactionContext().addLocalModification(command);
-      return invokeNextInterceptor(ctx, command);
-   }
-
-   @Override
-   public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
-   {
-      return handleWriteMethod(ctx, command.getFqn(), null, command);
-   }
-
-   @Override
-   public Object visitRemoveNodeCommand(InvocationContext ctx, RemoveNodeCommand command) throws Throwable
-   {
-      return handleWriteMethod(ctx, command.getFqn(), null, command);
-   }
-
-   @Override
-   public Object visitRemoveKeyCommand(InvocationContext ctx, RemoveKeyCommand command) throws Throwable
-   {
-      return handleWriteMethod(ctx, command.getFqn(), null, command);
-   }
-
-   @Override
-   public Object visitMoveCommand(InvocationContext ctx, MoveCommand command) throws Throwable
-   {
-      return handleWriteMethod(ctx, command.getTo(), command.getFqn(), command);
-   }
-
-   @Override
-   public Object visitClearDataCommand(InvocationContext ctx, ClearDataCommand command) throws Throwable
-   {
-      return handleWriteMethod(ctx, command.getFqn(), null, command);
-   }
-
-   @Override
-   public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command) throws Throwable
-   {
-      Object retval = invokeNextInterceptor(ctx, command);
-      Transaction tx = ctx.getTransaction();
-      if (tx != null)
-      {
-         if (trace) log.trace("Entering InvalidationInterceptor's prepare phase");
-         // fetch the modifications before the transaction is committed (and thus removed from the txTable)
-         GlobalTransaction gtx = ctx.getGlobalTransaction();
-         TransactionContext transactionContext = ctx.getTransactionContext();
-         if (transactionContext == null)
-            throw new IllegalStateException("cannot find transaction transactionContext for " + gtx);
-
-         if (transactionContext.hasModifications())
-         {
-            List<WriteCommand> mods;
-            if (transactionContext.hasLocalModifications())
-            {
-               mods = new ArrayList<WriteCommand>(command.getModifications());
-               mods.removeAll(transactionContext.getLocalModifications());
-            }
-            else
-            {
-               mods = command.getModifications();
-            }
-            broadcastInvalidate(mods, tx, ctx);
-         }
-         else
-         {
-            if (trace) log.trace("Nothing to invalidate - no modifications in the transaction.");
-         }
-      }
-      return retval;
-   }
-
-   @Override
-   public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
-   {
-      Object retval = invokeNextInterceptor(ctx, command);
-      Transaction tx = ctx.getTransaction();
-      if (tx != null && optimistic)
-      {
-         GlobalTransaction gtx = ctx.getGlobalTransaction();
-         List<WriteCommand> modifications = txMods.remove(gtx);
-         broadcastInvalidate(modifications, tx, ctx);
-         if (trace) log.trace("Committing.  Broadcasting invalidations.");
-      }
-      return retval;
-   }
-
-   @Override
-   public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command) throws Throwable
-   {
-      Object retval = invokeNextInterceptor(ctx, command);
-      Transaction tx = ctx.getTransaction();
-      if (tx != null && optimistic)
-      {
-         GlobalTransaction gtx = ctx.getGlobalTransaction();
-         txMods.remove(gtx);
-         log.debug("Caught a rollback.  Clearing modification in txMods");
-      }
-      return retval;
-   }
-
-   /**
-    * @param from    is only present for move operations, else pass it in as null
-    * @param command the command being handled; it is passed on to the next interceptor
-    */
-   private Object handleWriteMethod(InvocationContext ctx, Fqn targetFqn, Fqn from, VisitableCommand command)
-         throws Throwable
-   {
-      Object retval = invokeNextInterceptor(ctx, command);
-      Transaction tx = ctx.getTransaction();
-      Option optionOverride = ctx.getOptionOverrides();
-      if (log.isDebugEnabled()) log.debug("Is a CRUD method");
-      Set<Fqn> fqns = new HashSet<Fqn>();
-      if (from != null)
-      {
-         fqns.add(from);
-      }
-      fqns.add(targetFqn);
-      if (!fqns.isEmpty())
-      {
-         // could be potentially TRANSACTIONAL.  Ignore if it is, until we see a prepare().
-         if (tx == null || !TransactionTable.isValid(tx))
-         {
-            // the no-tx case:
-            //replicate an evict call.
-            for (Fqn fqn : fqns) invalidateAcrossCluster(fqn, null, isSynchronous(optionOverride), ctx);
-         }
-         else
-         {
-            if (isLocalModeForced(ctx)) ctx.getTransactionContext().addLocalModification((WriteCommand) command);
-         }
-      }
-      return retval;
-   }
-
-   private void broadcastInvalidate(List<WriteCommand> modifications, Transaction tx, InvocationContext ctx) throws Throwable
-   {
-      if (ctx.getTransaction() != null && !isLocalModeForced(ctx))
-      {
-         if (modifications == null || modifications.isEmpty()) return;
-         InvalidationFilterVisitor filterVisitor = new InvalidationFilterVisitor(modifications.size());
-         filterVisitor.visitCollection(null, modifications);
-
-         if (filterVisitor.containsPutForExternalRead)
-         {
-            log.debug("Modification list contains a putForExternalRead operation.  Not invalidating.");
-         }
-         else
-         {
-            try
-            {
-               for (Fqn fqn : filterVisitor.result) invalidateAcrossCluster(fqn, null, defaultSynchronous, ctx);
-            }
-            catch (Throwable t)
-            {
-               log.warn("Unable to broadcast evicts as a part of the prepare phase.  Rolling back.", t);
-               try
-               {
-                  tx.setRollbackOnly();
-               }
-               catch (SystemException se)
-               {
-                  throw new RuntimeException("setting tx rollback failed ", se);
-               }
-               if (t instanceof RuntimeException)
-                  throw (RuntimeException) t;
-               else
-                  throw new RuntimeException("Unable to broadcast invalidation messages", t);
-            }
-         }
-      }
-   }
-
-   public static class InvalidationFilterVisitor extends AbstractVisitor
-   {
-      Set<Fqn> result;
-      public boolean containsPutForExternalRead;
-
-      public InvalidationFilterVisitor(int maxSetSize)
-      {
-         result = new HashSet<Fqn>(maxSetSize);
-      }
-
-      @Override
-      public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
-      {
-         result.add(command.getFqn());
-         return null;
-      }
-
-      @Override
-      public Object visitPutForExternalReadCommand(InvocationContext ctx, PutForExternalReadCommand command) throws Throwable
-      {
-         containsPutForExternalRead = true;
-         return null;
-      }
-
-      @Override
-      public Object visitPutDataMapCommand(InvocationContext ctx, PutDataMapCommand command) throws Throwable
-      {
-         result.add(command.getFqn());
-         return null;
-      }
-
-      @Override
-      public Object visitRemoveNodeCommand(InvocationContext ctx, RemoveNodeCommand command) throws Throwable
-      {
-         result.add(command.getFqn());
-         return null;
-      }
-
-      @Override
-      public Object visitClearDataCommand(InvocationContext ctx, ClearDataCommand command) throws Throwable
-      {
-         result.add(command.getFqn());
-         return null;
-      }
-
-      @Override
-      public Object visitRemoveKeyCommand(InvocationContext ctx, RemoveKeyCommand command) throws Throwable
-      {
-         result.add(command.getFqn());
-         return null;
-      }
-
-      @Override
-      public Object visitMoveCommand(InvocationContext ctx, MoveCommand command) throws Throwable
-      {
-         result.add(command.getFqn());
-         // now if this is a "move" operation, then we also have another Fqn -
-         Object le = command.getFqn().getLastElement();
-         Fqn parent = command.getTo();
-         result.add(Fqn.fromRelativeElements(parent, le));
-         return null;
-      }
-   }
-
-
-   protected void invalidateAcrossCluster(Fqn fqn, Object workspace, boolean synchronous, InvocationContext ctx) throws Throwable
-   {
-      /*
-      if (!isLocalModeForced(ctx))
-      {
-         // increment invalidations counter if statistics maintained
-         incrementInvalidations();
-         InvalidateCommand command = commandsFactory.buildInvalidateCommand(fqn);
-         DataVersion dataVersion = getNodeVersion(workspace, fqn);
-         if (dataVersion != null) ((VersionedInvalidateCommand) command).setDataVersion(dataVersion);
-         if (log.isDebugEnabled()) log.debug("Cache [" + rpcManager.getLocalAddress() + "] replicating " + command);
-         // voila, invalidated!
-         replicateCall(ctx, command, synchronous, ctx.getOptionOverrides());
-      }
-      */
-   }
-
-   private void incrementInvalidations()
-   {
-      if (getStatisticsEnabled()) invalidations++;
-   }
-
-   @ManagedOperation
-   public void resetStatistics()
-   {
-      invalidations = 0;
-   }
-
-   @ManagedOperation
-   public Map<String, Object> dumpStatistics()
-   {
-      Map<String, Object> retval = new HashMap<String, Object>();
-      retval.put("Invalidations", invalidations);
-      return retval;
-   }
-
-   @ManagedAttribute
-   public boolean getStatisticsEnabled()
-   {
-      return this.statsEnabled;
-   }
-
-   @ManagedAttribute
-   public void setStatisticsEnabled(boolean enabled)
-   {
-      this.statsEnabled = enabled;
-   }
-
-   @ManagedAttribute(description = "number of invalidations")
-   public long getInvalidations()
-   {
-      return invalidations;
-   }
-}

Copied: core/branches/flat/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor_Legacy.java (from rev 6936, core/branches/flat/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor_Legacy.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor_Legacy.java	2008-10-15 19:41:25 UTC (rev 6960)
@@ -0,0 +1,392 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.cache.interceptors;
+
+import org.jboss.cache.Fqn;
+import org.jboss.cache.InvocationContext;
+import org.jboss.cache.commands.AbstractVisitor;
+import org.jboss.cache.commands.CommandsFactory;
+import org.jboss.cache.commands.VisitableCommand;
+import org.jboss.cache.commands.WriteCommand;
+import org.jboss.cache.commands.tx.CommitCommand;
+import org.jboss.cache.commands.tx.PrepareCommand;
+import org.jboss.cache.commands.tx.RollbackCommand;
+import org.jboss.cache.commands.write.ClearDataCommand;
+import org.jboss.cache.commands.write.MoveCommand;
+import org.jboss.cache.commands.write.PutDataMapCommand;
+import org.jboss.cache.commands.write.PutForExternalReadCommand;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
+import org.jboss.cache.commands.write.RemoveKeyCommand;
+import org.jboss.cache.commands.write.RemoveNodeCommand;
+import org.jboss.cache.jmx.annotations.ManagedAttribute;
+import org.jboss.cache.jmx.annotations.ManagedOperation;
+import org.jboss.cache.transaction.GlobalTransaction;
+import org.jboss.cache.transaction.TransactionContext;
+import org.jboss.cache.transaction.TransactionTable;
+import org.jboss.starobrno.config.Option;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.factories.annotations.Start;
+
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This interceptor acts as a replacement to the replication interceptor when
+ * the CacheImpl is configured with ClusteredSyncMode as INVALIDATE.
+ * <p/>
+ * The idea is that rather than replicating changes to all caches in a cluster
+ * when CRUD (Create, Remove, Update, Delete) methods are called, simply call
+ * evict(Fqn) on the remote caches for each changed node.  This allows the
+ * remote node to look up the value in a shared cache loader which would have
+ * been updated with the changes.
+ *
+ * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
+ */
+public class InvalidationInterceptor_Legacy extends BaseRpcInterceptor
+{
+   private long invalidations = 0;
+   protected Map<GlobalTransaction, List<WriteCommand>> txMods;
+   protected boolean optimistic;
+   private CommandsFactory commandsFactory;
+   private boolean statsEnabled;
+
+   @Inject
+   public void injectDependencies(CommandsFactory commandsFactory)
+   {
+      this.commandsFactory = commandsFactory;
+   }
+
+   @Start
+   private void initTxMap()
+   {
+      optimistic = false;
+      if (optimistic) txMods = new ConcurrentHashMap<GlobalTransaction, List<WriteCommand>>();
+      this.setStatisticsEnabled(configuration.getExposeManagementStatistics());
+   }
+
+   @Override
+   public Object visitPutDataMapCommand(InvocationContext ctx, PutDataMapCommand command) throws Throwable
+   {
+      return handleWriteMethod(ctx, command.getFqn(), null, command);
+   }
+
+   @Override
+   public Object visitPutForExternalReadCommand(InvocationContext ctx, PutForExternalReadCommand command) throws Throwable
+   {
+      // these are always local mode, as far as invalidation is concerned
+      if (ctx.getTransaction() != null) ctx.getTransactionContext().addLocalModification(command);
+      return invokeNextInterceptor(ctx, command);
+   }
+
+   @Override
+   public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
+   {
+      return handleWriteMethod(ctx, command.getFqn(), null, command);
+   }
+
+   @Override
+   public Object visitRemoveNodeCommand(InvocationContext ctx, RemoveNodeCommand command) throws Throwable
+   {
+      return handleWriteMethod(ctx, command.getFqn(), null, command);
+   }
+
+   @Override
+   public Object visitRemoveKeyCommand(InvocationContext ctx, RemoveKeyCommand command) throws Throwable
+   {
+      return handleWriteMethod(ctx, command.getFqn(), null, command);
+   }
+
+   @Override
+   public Object visitMoveCommand(InvocationContext ctx, MoveCommand command) throws Throwable
+   {
+      return handleWriteMethod(ctx, command.getTo(), command.getFqn(), command);
+   }
+
+   @Override
+   public Object visitClearDataCommand(InvocationContext ctx, ClearDataCommand command) throws Throwable
+   {
+      return handleWriteMethod(ctx, command.getFqn(), null, command);
+   }
+
+   @Override
+   public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command) throws Throwable
+   {
+      Object retval = invokeNextInterceptor(ctx, command);
+      Transaction tx = ctx.getTransaction();
+      if (tx != null)
+      {
+         if (trace) log.trace("Entering InvalidationInterceptor_Legacy's prepare phase");
+         // fetch the modifications before the transaction is committed (and thus removed from the txTable)
+         GlobalTransaction gtx = ctx.getGlobalTransaction();
+         TransactionContext transactionContext = ctx.getTransactionContext();
+         if (transactionContext == null)
+            throw new IllegalStateException("cannot find transaction transactionContext for " + gtx);
+
+         if (transactionContext.hasModifications())
+         {
+            List<WriteCommand> mods;
+            if (transactionContext.hasLocalModifications())
+            {
+               mods = new ArrayList<WriteCommand>(command.getModifications());
+               mods.removeAll(transactionContext.getLocalModifications());
+            }
+            else
+            {
+               mods = command.getModifications();
+            }
+            broadcastInvalidate(mods, tx, ctx);
+         }
+         else
+         {
+            if (trace) log.trace("Nothing to invalidate - no modifications in the transaction.");
+         }
+      }
+      return retval;
+   }
+
+   @Override
+   public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
+   {
+      Object retval = invokeNextInterceptor(ctx, command);
+      Transaction tx = ctx.getTransaction();
+      if (tx != null && optimistic)
+      {
+         GlobalTransaction gtx = ctx.getGlobalTransaction();
+         List<WriteCommand> modifications = txMods.remove(gtx);
+         broadcastInvalidate(modifications, tx, ctx);
+         if (trace) log.trace("Committing.  Broadcasting invalidations.");
+      }
+      return retval;
+   }
+
+   @Override
+   public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command) throws Throwable
+   {
+      Object retval = invokeNextInterceptor(ctx, command);
+      Transaction tx = ctx.getTransaction();
+      if (tx != null && optimistic)
+      {
+         GlobalTransaction gtx = ctx.getGlobalTransaction();
+         txMods.remove(gtx);
+         log.debug("Caught a rollback.  Clearing modification in txMods");
+      }
+      return retval;
+   }
+
+   /**
+    * @param from    is only present for move operations, else pass it in as null
+    * @param command the command being handled; it is passed on to the next interceptor
+    */
+   private Object handleWriteMethod(InvocationContext ctx, Fqn targetFqn, Fqn from, VisitableCommand command)
+         throws Throwable
+   {
+      Object retval = invokeNextInterceptor(ctx, command);
+      Transaction tx = ctx.getTransaction();
+      Option optionOverride = ctx.getOptionOverrides();
+      if (log.isDebugEnabled()) log.debug("Is a CRUD method");
+      Set<Fqn> fqns = new HashSet<Fqn>();
+      if (from != null)
+      {
+         fqns.add(from);
+      }
+      fqns.add(targetFqn);
+      if (!fqns.isEmpty())
+      {
+         // could be potentially TRANSACTIONAL.  Ignore if it is, until we see a prepare().
+         if (tx == null || !TransactionTable.isValid(tx))
+         {
+            // the no-tx case:
+            //replicate an evict call.
+            for (Fqn fqn : fqns) invalidateAcrossCluster(fqn, null, isSynchronous(optionOverride), ctx);
+         }
+         else
+         {
+            if (isLocalModeForced(ctx)) ctx.getTransactionContext().addLocalModification((WriteCommand) command);
+         }
+      }
+      return retval;
+   }
+
+   private void broadcastInvalidate(List<WriteCommand> modifications, Transaction tx, InvocationContext ctx) throws Throwable
+   {
+      if (ctx.getTransaction() != null && !isLocalModeForced(ctx))
+      {
+         if (modifications == null || modifications.isEmpty()) return;
+         InvalidationFilterVisitor filterVisitor = new InvalidationFilterVisitor(modifications.size());
+         filterVisitor.visitCollection(null, modifications);
+
+         if (filterVisitor.containsPutForExternalRead)
+         {
+            log.debug("Modification list contains a putForExternalRead operation.  Not invalidating.");
+         }
+         else
+         {
+            try
+            {
+               for (Fqn fqn : filterVisitor.result) invalidateAcrossCluster(fqn, null, defaultSynchronous, ctx);
+            }
+            catch (Throwable t)
+            {
+               log.warn("Unable to broadcast evicts as a part of the prepare phase.  Rolling back.", t);
+               try
+               {
+                  tx.setRollbackOnly();
+               }
+               catch (SystemException se)
+               {
+                  throw new RuntimeException("setting tx rollback failed ", se);
+               }
+               if (t instanceof RuntimeException)
+                  throw (RuntimeException) t;
+               else
+                  throw new RuntimeException("Unable to broadcast invalidation messages", t);
+            }
+         }
+      }
+   }
+
+   public static class InvalidationFilterVisitor extends AbstractVisitor
+   {
+      Set<Fqn> result;
+      public boolean containsPutForExternalRead;
+
+      public InvalidationFilterVisitor(int maxSetSize)
+      {
+         result = new HashSet<Fqn>(maxSetSize);
+      }
+
+      @Override
+      public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
+      {
+         result.add(command.getFqn());
+         return null;
+      }
+
+      @Override
+      public Object visitPutForExternalReadCommand(InvocationContext ctx, PutForExternalReadCommand command) throws Throwable
+      {
+         containsPutForExternalRead = true;
+         return null;
+      }
+
+      @Override
+      public Object visitPutDataMapCommand(InvocationContext ctx, PutDataMapCommand command) throws Throwable
+      {
+         result.add(command.getFqn());
+         return null;
+      }
+
+      @Override
+      public Object visitRemoveNodeCommand(InvocationContext ctx, RemoveNodeCommand command) throws Throwable
+      {
+         result.add(command.getFqn());
+         return null;
+      }
+
+      @Override
+      public Object visitClearDataCommand(InvocationContext ctx, ClearDataCommand command) throws Throwable
+      {
+         result.add(command.getFqn());
+         return null;
+      }
+
+      @Override
+      public Object visitRemoveKeyCommand(InvocationContext ctx, RemoveKeyCommand command) throws Throwable
+      {
+         result.add(command.getFqn());
+         return null;
+      }
+
+      @Override
+      public Object visitMoveCommand(InvocationContext ctx, MoveCommand command) throws Throwable
+      {
+         result.add(command.getFqn());
+         // now if this is a "move" operation, then we also have another Fqn -
+         Object le = command.getFqn().getLastElement();
+         Fqn parent = command.getTo();
+         result.add(Fqn.fromRelativeElements(parent, le));
+         return null;
+      }
+   }
+
+
+   protected void invalidateAcrossCluster(Fqn fqn, Object workspace, boolean synchronous, InvocationContext ctx) throws Throwable
+   {
+      /*
+      if (!isLocalModeForced(ctx))
+      {
+         // increment invalidations counter if statistics maintained
+         incrementInvalidations();
+         InvalidateCommand command = commandsFactory.buildInvalidateCommand(fqn);
+         DataVersion dataVersion = getNodeVersion(workspace, fqn);
+         if (dataVersion != null) ((VersionedInvalidateCommand) command).setDataVersion(dataVersion);
+         if (log.isDebugEnabled()) log.debug("Cache [" + rpcManager.getLocalAddress() + "] replicating " + command);
+         // voila, invalidated!
+         replicateCall(ctx, command, synchronous, ctx.getOptionOverrides());
+      }
+      */
+   }
+
+   private void incrementInvalidations()
+   {
+      if (getStatisticsEnabled()) invalidations++;
+   }
+
+   @ManagedOperation
+   public void resetStatistics()
+   {
+      invalidations = 0;
+   }
+
+   @ManagedOperation
+   public Map<String, Object> dumpStatistics()
+   {
+      Map<String, Object> retval = new HashMap<String, Object>();
+      retval.put("Invalidations", invalidations);
+      return retval;
+   }
+
+   @ManagedAttribute
+   public boolean getStatisticsEnabled()
+   {
+      return this.statsEnabled;
+   }
+
+   @ManagedAttribute
+   public void setStatisticsEnabled(boolean enabled)
+   {
+      this.statsEnabled = enabled;
+   }
+
+   @ManagedAttribute(description = "number of invalidations")
+   public long getInvalidations()
+   {
+      return invalidations;
+   }
+}


Property changes on: core/branches/flat/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor_Legacy.java
___________________________________________________________________
Name: svn:keywords
   + Author Date Id Revision
Name: svn:eol-style
   + native

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/Cache.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/Cache.java	2008-10-15 18:13:06 UTC (rev 6959)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/Cache.java	2008-10-15 19:41:25 UTC (rev 6960)
@@ -25,8 +25,10 @@
 import org.jboss.starobrno.config.Configuration;
 import org.jboss.starobrno.context.InvocationContext;
 import org.jboss.starobrno.lifecycle.Lifecycle;
+import org.jgroups.Address;
 
 import java.util.Set;
+import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 
 /**
@@ -53,4 +55,6 @@
    public void startBatch();
 
    public void endBatch(boolean successful);
+
+   List<Address> getMembers();
 }

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java	2008-10-15 18:13:06 UTC (rev 6959)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java	2008-10-15 19:41:25 UTC (rev 6960)
@@ -49,6 +49,7 @@
 import org.jboss.starobrno.notifications.Notifier;
 import org.jboss.starobrno.transaction.GlobalTransaction;
 import org.jboss.starobrno.transaction.TransactionTable;
+import org.jgroups.Address;
 
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
@@ -71,6 +72,7 @@
    protected BatchContainer batchContainer;
    protected ComponentRegistry componentRegistry;
    protected TransactionManager transactionManager;
+   protected RPCManager rpcManager;
 
 
    @Inject
@@ -81,7 +83,8 @@
                                    Notifier notifier,
                                    ComponentRegistry componentRegistry,
                                    TransactionManager transactionManager,
-                                   BatchContainer batchContainer)
+                                   BatchContainer batchContainer,
+                                   RPCManager rpcManager)
    {
       this.invocationContextContainer = invocationContextContainer;
       this.commandsFactory = commandsFactory;
@@ -91,6 +94,7 @@
       this.componentRegistry = componentRegistry;
       this.transactionManager = transactionManager;
       this.batchContainer = batchContainer;
+      this.rpcManager = rpcManager;
    }
 
    public V putIfAbsent(K key, V value)
@@ -293,7 +297,7 @@
 
    public RPCManager getRPCManager()
    {
-      return null;  //TODO: Autogenerated.  Implement me properly
+      return rpcManager;
    }
 
    public StateTransferManager getStateTransferManager()
@@ -349,4 +353,9 @@
          throw new ConfigurationException("Invocation batching not enabled in current configuration!  Please use the <invocationBatching /> element.");
       batchContainer.endBatch(successful);
    }
+
+   public List<Address> getMembers()
+   {
+      return rpcManager.getMembers();      
+   }
 }

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/AbstractVisitor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/AbstractVisitor.java	2008-10-15 18:13:06 UTC (rev 6959)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/AbstractVisitor.java	2008-10-15 19:41:25 UTC (rev 6960)
@@ -27,12 +27,7 @@
 import org.jboss.starobrno.commands.tx.CommitCommand;
 import org.jboss.starobrno.commands.tx.PrepareCommand;
 import org.jboss.starobrno.commands.tx.RollbackCommand;
-import org.jboss.starobrno.commands.write.ClearCommand;
-import org.jboss.starobrno.commands.write.EvictCommand;
-import org.jboss.starobrno.commands.write.PutKeyValueCommand;
-import org.jboss.starobrno.commands.write.PutMapCommand;
-import org.jboss.starobrno.commands.write.RemoveCommand;
-import org.jboss.starobrno.commands.write.ReplaceCommand;
+import org.jboss.starobrno.commands.write.*;
 import org.jboss.starobrno.context.InvocationContext;
 
 import java.util.Collection;
@@ -112,6 +107,10 @@
       return handleDefault(ctx, command);
    }
 
+   public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand invalidateCommand) throws Throwable
+   {
+      return handleDefault(ctx, invalidateCommand);
+   }
 
    /**
     * A default handler for all commands visited.  This is called for any visit method called, unless a visit command is

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/CommandsFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/CommandsFactory.java	2008-10-15 18:13:06 UTC (rev 6959)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/CommandsFactory.java	2008-10-15 19:41:25 UTC (rev 6960)
@@ -26,12 +26,7 @@
 import org.jboss.starobrno.commands.tx.CommitCommand;
 import org.jboss.starobrno.commands.tx.PrepareCommand;
 import org.jboss.starobrno.commands.tx.RollbackCommand;
-import org.jboss.starobrno.commands.write.ClearCommand;
-import org.jboss.starobrno.commands.write.EvictCommand;
-import org.jboss.starobrno.commands.write.PutKeyValueCommand;
-import org.jboss.starobrno.commands.write.PutMapCommand;
-import org.jboss.starobrno.commands.write.RemoveCommand;
-import org.jboss.starobrno.commands.write.ReplaceCommand;
+import org.jboss.starobrno.commands.write.*;
 import org.jboss.starobrno.commands.remote.ReplicateCommand;
 import org.jboss.starobrno.transaction.GlobalTransaction;
 import org.jgroups.Address;
@@ -71,4 +66,6 @@
    ReplicateCommand buildReplicateCommand(List<ReplicableCommand> toReplicate);
 
    ReplicateCommand buildReplicateCommand(ReplicableCommand call);
+
+   InvalidateCommand buildInvalidateCommand(Object fqn);
 }

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/CommandsFactoryImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/CommandsFactoryImpl.java	2008-10-15 18:13:06 UTC (rev 6959)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/CommandsFactoryImpl.java	2008-10-15 19:41:25 UTC (rev 6960)
@@ -22,17 +22,13 @@
 package org.jboss.starobrno.commands;
 
 import org.jboss.starobrno.CacheException;
+import org.jboss.starobrno.interceptors.InterceptorChain;
 import org.jboss.starobrno.commands.read.GetKeyValueCommand;
 import org.jboss.starobrno.commands.read.SizeCommand;
 import org.jboss.starobrno.commands.tx.CommitCommand;
 import org.jboss.starobrno.commands.tx.PrepareCommand;
 import org.jboss.starobrno.commands.tx.RollbackCommand;
-import org.jboss.starobrno.commands.write.ClearCommand;
-import org.jboss.starobrno.commands.write.EvictCommand;
-import org.jboss.starobrno.commands.write.PutKeyValueCommand;
-import org.jboss.starobrno.commands.write.PutMapCommand;
-import org.jboss.starobrno.commands.write.RemoveCommand;
-import org.jboss.starobrno.commands.write.ReplaceCommand;
+import org.jboss.starobrno.commands.write.*;
 import org.jboss.starobrno.commands.remote.ReplicateCommand;
 import org.jboss.starobrno.container.DataContainer;
 import org.jboss.starobrno.factories.annotations.Inject;
@@ -50,12 +46,14 @@
 {
    private DataContainer dataContainer;
    private Notifier notifier;
+   private InterceptorChain interceptorChain;
 
    @Inject
-   private void setupDependencies(DataContainer container, Notifier notifier)
+   private void setupDependencies(DataContainer container, Notifier notifier, InterceptorChain interceptorChain)
    {
       this.dataContainer = container;
       this.notifier = notifier;
+      this.interceptorChain = interceptorChain;
    }
 
    public PutKeyValueCommand buildPutKeyValueCommand(Object key, Object value)
@@ -185,11 +183,24 @@
             command = c;
             break;
          }
+         case ReplicateCommand.MULTIPLE_METHOD_ID:
+         case org.jboss.cache.commands.remote.ReplicateCommand.SINGLE_METHOD_ID:
+         {
+            ReplicateCommand c = new ReplicateCommand();
+            c.initialize(interceptorChain);
+            command = c;
+            break;
+         }
+
          default:
             throw new CacheException("Unknown command id " + id + "!");
       }
-
       command.setParameters(id, parameters);
       return command;
    }
+
+   public InvalidateCommand buildInvalidateCommand(Object fqn)
+   {
+      throw new UnsupportedOperationException("Not implemented");//todo please implement!
+   }
 }

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/Visitor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/Visitor.java	2008-10-15 18:13:06 UTC (rev 6959)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/Visitor.java	2008-10-15 19:41:25 UTC (rev 6960)
@@ -27,12 +27,7 @@
 import org.jboss.starobrno.commands.tx.CommitCommand;
 import org.jboss.starobrno.commands.tx.PrepareCommand;
 import org.jboss.starobrno.commands.tx.RollbackCommand;
-import org.jboss.starobrno.commands.write.ClearCommand;
-import org.jboss.starobrno.commands.write.EvictCommand;
-import org.jboss.starobrno.commands.write.PutKeyValueCommand;
-import org.jboss.starobrno.commands.write.PutMapCommand;
-import org.jboss.starobrno.commands.write.RemoveCommand;
-import org.jboss.starobrno.commands.write.ReplaceCommand;
+import org.jboss.starobrno.commands.write.*;
 import org.jboss.starobrno.context.InvocationContext;
 
 public interface Visitor
@@ -67,4 +62,5 @@
 
    Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable;
 
+   Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand invalidateCommand) throws Throwable;
 }
\ No newline at end of file

Copied: core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/InvalidateCommand.java (from rev 6936, core/branches/flat/src/main/java/org/jboss/cache/commands/write/InvalidateCommand.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/InvalidateCommand.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/InvalidateCommand.java	2008-10-15 19:41:25 UTC (rev 6960)
@@ -0,0 +1,159 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.commands.write;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.starobrno.commands.read.AbstractDataCommand;
+import org.jboss.starobrno.commands.Visitor;
+import org.jboss.starobrno.CacheSPI;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.container.DataContainer;
+import org.jboss.starobrno.notifications.Notifier;
+import org.jboss.cache.NodeSPI;
+import org.jboss.cache.Fqn;
+
+
+/**
+ * Removes a node's content from memory - never removes the node.
+ * It also cleans up data for resident nodes, which are not touched by eviction.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 2.2
+ */
+public class InvalidateCommand extends AbstractDataCommand
+{
+   public static final int METHOD_ID = 47;
+   private static final Log log = LogFactory.getLog(InvalidateCommand.class);
+   private static final boolean trace = log.isTraceEnabled();
+
+   /* dependencies*/
+   protected CacheSPI spi;
+   protected Notifier notifier;
+   protected DataContainer dataContainer;
+
+   public InvalidateCommand(Object key)
+   {
+      this.key = key;
+   }
+
+   public InvalidateCommand()
+   {
+   }
+
+   public void initialize(CacheSPI cacheSpi, DataContainer dataContainer, Notifier notifier)
+   {
+      this.spi = cacheSpi;
+      this.dataContainer = dataContainer;
+      this.notifier = notifier;
+   }
+
+   /**
+    * Performs an invalidation on a specified node
+    *
+    * @param ctx invocation context
+    * @return null
+    */
+   public Object perform(InvocationContext ctx)
+   {
+      Object value = enforceNodeLoading();
+      if (trace) log.trace("Invalidating key:" + key);
+      if (value == null)
+      {
+         return null;
+      }
+      evictNode(key, ctx);
+//      dataContainer.
+      return null;
+   }
+
+   boolean evictNode(Object key, InvocationContext ctx)
+   {
+      notifier.notifyNodeInvalidated(key, true, ctx);
+      try
+      {
+         return dataContainer.evict(key);
+      }
+      finally
+      {
+         notifier.notifyNodeInvalidated(key, false, ctx);
+      }
+   }
+
+
+   /**
+    * //TODO: 2.2.0: rather than using CacheSPI this should use peek().  The other interceptors should obtain locks and load nodes if necessary for this InvalidateCommand.
+    * //Even better - this can be handled in the interceptors before the call interceptor
+    */
+   protected Object enforceNodeLoading()
+   {
+      return spi.get(key);
+   }
+
+
+   /**
+    * mark the node to be removed (and all children) as invalid so anyone holding a direct reference to it will
+    * be aware that it is no longer valid.
+    */
+   protected void invalidateNode(NodeSPI node)
+   {
+      node.setValid(false, true);
+      // root nodes can never be invalid
+//      if (fqn.isRoot()) node.setValid(true, false); // non-recursive.
+   }
+
+
+   public Object acceptVisitor(InvocationContext ctx, Visitor visitor) throws Throwable
+   {
+      return visitor.visitInvalidateCommand(ctx, this);
+   }
+
+   public byte getCommandId()
+   {
+      return METHOD_ID;
+   }
+
+   @Override
+   public String toString()
+   {
+      return "InvalidateCommand{" +
+            "key=" + key +
+            '}';
+   }
+
+   @Override
+   public Object[] getParameters()
+   {
+      return new Object[]{key};
+   }
+
+   @Override
+   public void setParameters(int commandId, Object[] args)
+   {
+      key = args[0];
+   }
+
+   void setFqn(Fqn newFqn)
+   {
+      this.key = newFqn;
+   }
+}
\ No newline at end of file


Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/InvalidateCommand.java
___________________________________________________________________
Name: svn:mergeinfo
   + 

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/container/DataContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/container/DataContainer.java	2008-10-15 18:13:06 UTC (rev 6959)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/container/DataContainer.java	2008-10-15 19:41:25 UTC (rev 6960)
@@ -44,4 +44,6 @@
    void clear();
 
    Set<K> keySet();
+
+   boolean evict(Object key);
 }

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/container/UnsortedDataContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/container/UnsortedDataContainer.java	2008-10-15 18:13:06 UTC (rev 6959)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/container/UnsortedDataContainer.java	2008-10-15 19:41:25 UTC (rev 6960)
@@ -69,4 +69,9 @@
    {
       return data.keySet();
    }
+
+   public boolean evict(Object key)
+   {
+      throw new UnsupportedOperationException("Not implemented");//todo please implement!
+   }
 }

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/InterceptorChainFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/InterceptorChainFactory.java	2008-10-15 18:13:06 UTC (rev 6959)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/InterceptorChainFactory.java	2008-10-15 19:41:25 UTC (rev 6960)
@@ -26,15 +26,7 @@
 import org.jboss.starobrno.config.ConfigurationException;
 import org.jboss.starobrno.config.CustomInterceptorConfig;
 import org.jboss.starobrno.factories.annotations.DefaultFactoryFor;
-import org.jboss.starobrno.interceptors.BatchingInterceptor;
-import org.jboss.starobrno.interceptors.CacheMgmtInterceptor;
-import org.jboss.starobrno.interceptors.CallInterceptor;
-import org.jboss.starobrno.interceptors.InterceptorChain;
-import org.jboss.starobrno.interceptors.InvocationContextInterceptor;
-import org.jboss.starobrno.interceptors.LockingInterceptor;
-import org.jboss.starobrno.interceptors.MarshalledValueInterceptor;
-import org.jboss.starobrno.interceptors.NotificationInterceptor;
-import org.jboss.starobrno.interceptors.TxInterceptor;
+import org.jboss.starobrno.interceptors.*;
 import org.jboss.starobrno.interceptors.base.CommandInterceptor;
 
 import java.util.List;
@@ -111,20 +103,20 @@
 
       interceptorChain.appendIntereceptor(createInterceptor(NotificationInterceptor.class));
 
-      // TODO: Uncomment once the Repl and Inval interceptors has been moved to Starobrno
-//      switch (configuration.getCacheMode())
-//      {
-//         case REPL_SYNC:
-//         case REPL_ASYNC:
-//            interceptorChain.appendIntereceptor(createInterceptor(ReplicationInterceptor.class));
-//            break;
-//         case INVALIDATION_SYNC:
-//         case INVALIDATION_ASYNC:
-//            interceptorChain.appendIntereceptor(createInterceptor(InvalidationInterceptor.class));
-//            break;
-//         case LOCAL:
-//            //Nothing...
-//      }
+//       TODO: Uncomment once the Repl and Inval interceptors have been moved to Starobrno
+      switch (configuration.getCacheMode())
+      {
+         case REPL_SYNC:
+         case REPL_ASYNC:
+            interceptorChain.appendIntereceptor(createInterceptor(ReplicationInterceptor.class));
+            break;
+         case INVALIDATION_SYNC:
+         case INVALIDATION_ASYNC:
+            interceptorChain.appendIntereceptor(createInterceptor(InvalidationInterceptor.class));
+            break;
+         case LOCAL:
+            //Nothing...
+      }
 
       // TODO: Uncomment once the CacheLoader has been moved to Starobrno
 //      if (configuration.isUsingCacheLoaders())

Copied: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvalidationInterceptor.java (from rev 6936, core/branches/flat/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvalidationInterceptor.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvalidationInterceptor.java	2008-10-15 19:41:25 UTC (rev 6960)
@@ -0,0 +1,273 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.interceptors;
+
+import org.jboss.starobrno.commands.AbstractVisitor;
+import org.jboss.starobrno.commands.CommandsFactory;
+import org.jboss.starobrno.commands.DataCommand;
+import org.jboss.starobrno.commands.VisitableCommand;
+import org.jboss.starobrno.commands.tx.PrepareCommand;
+import org.jboss.starobrno.commands.write.ClearCommand;
+import org.jboss.starobrno.commands.write.InvalidateCommand;
+import org.jboss.starobrno.commands.write.PutKeyValueCommand;
+import org.jboss.starobrno.commands.write.RemoveCommand;
+import org.jboss.starobrno.config.Option;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.context.TransactionContext;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.factories.annotations.Start;
+import org.jboss.starobrno.interceptors.base.BaseRpcInterceptor;
+import org.jboss.starobrno.jmx.annotations.ManagedAttribute;
+import org.jboss.starobrno.jmx.annotations.ManagedOperation;
+import org.jboss.starobrno.transaction.GlobalTransaction;
+import org.jboss.starobrno.transaction.TransactionTable;
+
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import java.util.*;
+
+
+/**
+ * This interceptor acts as a replacement to the replication interceptor when
+ * the CacheImpl is configured with ClusteredSyncMode as INVALIDATE.
+ * <p/>
+ * The idea is that rather than replicating changes to all caches in a cluster
+ * when CRUD (Create, Remove, Update, Delete) methods are called, simply call
+ * evict(Fqn) on the remote caches for each changed node.  This allows the
+ * remote node to look up the value in a shared cache loader which would have
+ * been updated with the changes.
+ *
+ * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
+ */
+public class InvalidationInterceptor extends BaseRpcInterceptor
+{
+   private long invalidations = 0;
+   protected Map<GlobalTransaction, List<VisitableCommand>> txMods;
+   private CommandsFactory commandsFactory;
+   private boolean statsEnabled;
+
+   @Inject
+   public void injectDependencies(CommandsFactory commandsFactory)
+   {
+      this.commandsFactory = commandsFactory;
+   }
+
+   @Start
+   private void initTxMap()
+   {
+      this.setStatisticsEnabled(configuration.getExposeManagementStatistics());
+   }
+
+   @Override
+   public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
+   {
+      return handleWriteMethod(ctx, command, command);
+   }
+
+   @Override
+   public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable
+   {
+      return handleWriteMethod(ctx, command.getKey(), command);
+   }
+
+   @Override
+   public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable
+   {
+//      return handleWriteMethod(ctx, command.getKey(), command);
+      //todo handle this - should perform a remote invalidation as well!!!
+      return null;
+   }
+
+   @Override
+   public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command) throws Throwable
+   {
+      Object retval = invokeNextInterceptor(ctx, command);
+      Transaction tx = ctx.getTransaction();
+      if (tx != null)
+      {
+         if (trace) log.trace("Entering InvalidationInterceptor_Legacy's prepare phase");
+         // fetch the modifications before the transaction is committed (and thus removed from the txTable)
+         GlobalTransaction gtx = ctx.getGlobalTransaction();
+         TransactionContext transactionContext = ctx.getTransactionContext();
+         if (transactionContext == null)
+            throw new IllegalStateException("cannot find transaction transactionContext for " + gtx);
+
+         if (transactionContext.hasModifications())
+         {
+            List<DataCommand> mods;
+            if (transactionContext.hasLocalModifications())
+            {
+               mods = new ArrayList<DataCommand>(command.getModifications());
+               mods.removeAll(transactionContext.getLocalModifications());
+            }
+            else
+            {
+               mods = command.getModifications();
+            }
+            broadcastInvalidate(mods, tx, ctx);
+         }
+         else
+         {
+            if (trace) log.trace("Nothing to invalidate - no modifications in the transaction.");
+         }
+      }
+      return retval;
+   }
+
+   private Object handleWriteMethod(InvocationContext ctx, Object key, VisitableCommand command)
+         throws Throwable
+   {
+      Object retval = invokeNextInterceptor(ctx, command);
+      Transaction tx = ctx.getTransaction();
+      Option optionOverride = ctx.getOptionOverrides();
+      if (log.isDebugEnabled()) log.debug("Is a CRUD method");
+      if (key != null)
+      {
+         // could be potentially TRANSACTIONAL.  Ignore if it is, until we see a prepare().
+         if (tx == null || !TransactionTable.isValid(tx))
+         {
+            // the no-tx case:
+            //replicate an evict call.
+            invalidateAcrossCluster(key, null, isSynchronous(optionOverride), ctx);
+         }
+         else
+         {
+            if (isLocalModeForced(ctx)) ctx.getTransactionContext().addLocalModification(command);
+         }
+      }
+      return retval;
+   }
+
+   private void broadcastInvalidate(List<DataCommand> modifications, Transaction tx, InvocationContext ctx) throws Throwable
+   {
+      if (ctx.getTransaction() != null && !isLocalModeForced(ctx))
+      {
+         if (modifications == null || modifications.isEmpty()) return;
+         InvalidationFilterVisitor filterVisitor = new InvalidationFilterVisitor(modifications.size());
+         filterVisitor.visitCollection(null, modifications);
+
+         if (filterVisitor.containsPutForExternalRead)
+         {
+            log.debug("Modification list contains a putForExternalRead operation.  Not invalidating.");
+         }
+         else
+         {
+            try
+            {
+               for (Object key : filterVisitor.result) invalidateAcrossCluster(key, null, defaultSynchronous, ctx);
+            }
+            catch (Throwable t)
+            {
+               log.warn("Unable to broadcast evicts as a part of the prepare phase.  Rolling back.", t);
+               try
+               {
+                  tx.setRollbackOnly();
+               }
+               catch (SystemException se)
+               {
+                  throw new RuntimeException("setting tx rollback failed ", se);
+               }
+               if (t instanceof RuntimeException)
+                  throw (RuntimeException) t;
+               else
+                  throw new RuntimeException("Unable to broadcast invalidation messages", t);
+            }
+         }
+      }
+   }
+
+   public static class InvalidationFilterVisitor extends AbstractVisitor
+   {
+      Set<Object> result;
+      public boolean containsPutForExternalRead;
+
+      public InvalidationFilterVisitor(int maxSetSize)
+      {
+         result = new HashSet<Object>(maxSetSize);
+      }
+
+      @Override
+      public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
+      {
+         result.add(command.getKey());
+         return null;
+      }
+
+      @Override
+      public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable
+      {
+         result.add(command.getKey());
+         return null;
+      }
+   }
+
+
+   protected void invalidateAcrossCluster(Object fqn, Object workspace, boolean synchronous, InvocationContext ctx) throws Throwable
+   {
+      if (!isLocalModeForced(ctx))
+      {
+         // increment invalidations counter if statistics maintained
+         incrementInvalidations();
+         InvalidateCommand command = commandsFactory.buildInvalidateCommand(fqn);
+         if (log.isDebugEnabled()) log.debug("Cache [" + rpcManager.getLocalAddress() + "] replicating " + command);
+         // voila, invalidated!
+         replicateCall(ctx, command, synchronous, ctx.getOptionOverrides());
+      }
+   }
+
+   private void incrementInvalidations()
+   {
+      if (getStatisticsEnabled()) invalidations++;
+   }
+
+   @ManagedOperation
+   public void resetStatistics()
+   {
+      invalidations = 0;
+   }
+
+   @ManagedOperation
+   public Map<String, Object> dumpStatistics()
+   {
+      Map<String, Object> retval = new HashMap<String, Object>();
+      retval.put("Invalidations", invalidations);
+      return retval;
+   }
+
+   @ManagedAttribute
+   public boolean getStatisticsEnabled()
+   {
+      return this.statsEnabled;
+   }
+
+   @ManagedAttribute
+   public void setStatisticsEnabled(boolean enabled)
+   {
+      this.statsEnabled = enabled;
+   }
+
+   @ManagedAttribute(description = "number of invalidations")
+   public long getInvalidations()
+   {
+      return invalidations;
+   }
+}
\ No newline at end of file


Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvalidationInterceptor.java
___________________________________________________________________
Name: svn:keywords
   + Author Date Id Revision
Name: svn:mergeinfo
   + 
Name: svn:eol-style
   + native

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/manager/CacheManager.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/manager/CacheManager.java	2008-10-15 18:13:06 UTC (rev 6959)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/manager/CacheManager.java	2008-10-15 19:41:25 UTC (rev 6960)
@@ -42,7 +42,15 @@
 
    public CacheManager(Configuration c)
    {
-      this.c = c;
+      try
+      {
+         //if a config is shared between multiple managers, then each registers its
+         // own channel at runtime
+         this.c = c.clone();
+      } catch (CloneNotSupportedException e)
+      {
+         throw new RuntimeException(e);
+      }
    }
 
    public void start()

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CacheMarshallerStarobrno.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CacheMarshallerStarobrno.java	2008-10-15 18:13:06 UTC (rev 6959)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CacheMarshallerStarobrno.java	2008-10-15 19:41:25 UTC (rev 6960)
@@ -25,6 +25,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.jboss.starobrno.CacheException;
 import org.jboss.starobrno.io.ByteBuffer;
+import org.jboss.starobrno.io.ExposedByteArrayOutputStream;
 import org.jboss.starobrno.commands.CommandsFactory;
 import org.jboss.starobrno.commands.ReplicableCommand;
 import org.jboss.starobrno.config.Configuration;
@@ -33,14 +34,13 @@
 import org.jboss.starobrno.util.FastCopyHashMap;
 import org.jboss.starobrno.util.Immutables;
 import org.jboss.util.NotImplementedException;
+import org.jboss.util.stream.MarshalledValueInputStream;
+import org.jboss.cache.marshall.Marshaller;
 import org.jgroups.Address;
 import org.jgroups.stack.IpAddress;
 import org.jgroups.util.Buffer;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.InputStream;
+import java.io.*;
 import java.lang.reflect.Array;
 import java.util.*;
 
@@ -138,7 +138,144 @@
          marshallArray(o, out, refMap);
       } else
       {
-         marshallObject(o, out, refMap);
+         if (o == null)
+         {
+            out.writeByte(MAGICNUMBER_NULL);
+         }
+         else if (useRefs && refMap.containsKey(o))// see if this object has been marshalled before.
+         {
+            out.writeByte(MAGICNUMBER_REF);
+            writeReference(out, refMap.get(o));
+         }
+         else if (o instanceof ReplicableCommand)
+         {
+            ReplicableCommand command = (ReplicableCommand) o;
+
+            if (command.getCommandId() > -1)
+            {
+               out.writeByte(MAGICNUMBER_METHODCALL);
+               marshallCommand(command, out, refMap);
+            }
+            else
+            {
+               throw new IllegalArgumentException("MethodCall does not have a valid method id.  Was this method call created with MethodCallFactory?");
+            }
+         }
+         else if (o instanceof org.jgroups.blocks.MethodCall)
+         {
+            throw new IllegalArgumentException("Usage of a legacy MethodCall object!!");
+         }
+         else if (o instanceof MarshalledValue)
+         {
+            out.writeByte(MAGICNUMBER_MARSHALLEDVALUE);
+            ((MarshalledValue) o).writeExternal(out);
+         }
+         else if (o instanceof GlobalTransaction)
+         {
+            out.writeByte(MAGICNUMBER_GTX);
+            if (useRefs) writeReference(out, createReference(o, refMap));
+            marshallGlobalTransaction((GlobalTransaction) o, out, refMap);
+         }
+         else if (o instanceof IpAddress)
+         {
+            out.writeByte(MAGICNUMBER_IPADDRESS);
+            marshallIpAddress((IpAddress) o, out);
+         }
+         else if (o.getClass().equals(ArrayList.class))
+         {
+            out.writeByte(MAGICNUMBER_ARRAY_LIST);
+            marshallCollection((Collection) o, out, refMap);
+         }
+         else if (o.getClass().equals(LinkedList.class))
+         {
+            out.writeByte(MAGICNUMBER_LINKED_LIST);
+            marshallCollection((Collection) o, out, refMap);
+         }
+         else if (o.getClass().equals(HashMap.class))
+         {
+            out.writeByte(MAGICNUMBER_HASH_MAP);
+            marshallMap((Map) o, out, refMap);
+         }
+         else if (o.getClass().equals(TreeMap.class))
+         {
+            out.writeByte(MAGICNUMBER_TREE_MAP);
+            marshallMap((Map) o, out, refMap);
+         }
+         else if (o.getClass().equals(FastCopyHashMap.class))
+         {
+            out.writeByte(MAGICNUMBER_FASTCOPY_HASHMAP);
+            marshallMap((Map) o, out, refMap);
+         }
+         else if (o instanceof Map && Immutables.isImmutable(o))
+         {
+            out.writeByte(MAGICNUMBER_IMMUTABLE_MAPCOPY);
+            marshallMap((Map) o, out, refMap);
+         }
+         else if (o.getClass().equals(HashSet.class))
+         {
+            out.writeByte(MAGICNUMBER_HASH_SET);
+            marshallCollection((Collection) o, out, refMap);
+         }
+         else if (o.getClass().equals(TreeSet.class))
+         {
+            out.writeByte(MAGICNUMBER_TREE_SET);
+            marshallCollection((Collection) o, out, refMap);
+         }
+         else if (o instanceof Boolean)
+         {
+            out.writeByte(MAGICNUMBER_BOOLEAN);
+            out.writeBoolean(((Boolean) o).booleanValue());
+         }
+         else if (o instanceof Integer)
+         {
+            out.writeByte(MAGICNUMBER_INTEGER);
+            out.writeInt(((Integer) o).intValue());
+         }
+         else if (o instanceof Long)
+         {
+            out.writeByte(MAGICNUMBER_LONG);
+            out.writeLong(((Long) o).longValue());
+         }
+         else if (o instanceof Short)
+         {
+            out.writeByte(MAGICNUMBER_SHORT);
+            out.writeShort(((Short) o).shortValue());
+         }
+         else if (o instanceof String)
+         {
+            out.writeByte(MAGICNUMBER_STRING);
+            if (useRefs) writeReference(out, createReference(o, refMap));
+            marshallString((String) o, out);
+         }
+         else if (o instanceof NodeDataMarker)
+         {
+            out.writeByte(MAGICNUMBER_NODEDATA_MARKER);
+            ((Externalizable) o).writeExternal(out);
+         }
+         else if (o instanceof NodeDataExceptionMarker)
+         {
+            out.writeByte(MAGICNUMBER_NODEDATA_EXCEPTION_MARKER);
+            ((Externalizable) o).writeExternal(out);
+         }
+         else if (o instanceof NodeData)
+         {
+            out.writeByte(MAGICNUMBER_NODEDATA);
+            ((Externalizable) o).writeExternal(out);
+         }
+         else if (o instanceof Serializable)
+         {
+            if (trace)
+            {
+               log.trace("Warning: using object serialization for " + o.getClass());
+            }
+            out.writeByte(MAGICNUMBER_SERIALIZABLE);
+            if (useRefs) writeReference(out, createReference(o, refMap));
+            out.writeObject(o);
+         }
+         else
+         {
+            throw new Exception("Don't know how to marshall object of type " + o.getClass());
+         }
       }
    }
 
@@ -162,6 +299,12 @@
       }
    }
 
+   private int createReference(Object o, Map<Object, Integer> refMap)
+   {
+      int reference = refMap.size();
+      refMap.put(o, reference);
+      return reference;
+   }
 
    private void marshallGlobalTransaction(GlobalTransaction globalTransaction, ObjectOutputStream out, Map<Object, Integer> refMap) throws Exception
    {
@@ -787,12 +930,20 @@
 
    public ByteBuffer objectToBuffer(Object o) throws Exception
    {
-      throw new RuntimeException("Needs to be overridden!");
+      ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(128);
+      ObjectOutputStream out = new ObjectOutputStream(baos);
+
+      //now marshall the contents of the object
+      objectToObjectStream(o, out);
+      out.close();
+      // and return bytes.
+      return new ByteBuffer(baos.getRawBuffer(), 0, baos.size());
    }
 
    public Object objectFromByteBuffer(byte[] buf, int offset, int length) throws Exception
    {
-      throw new RuntimeException("Needs to be overridden!");
+      ObjectInputStream in = new MarshalledValueInputStream(new ByteArrayInputStream(buf, offset, length));
+      return objectFromObjectStream(in);
    }
 
    public Object objectFromByteBuffer(byte[] bytes) throws Exception

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java	2008-10-15 18:13:06 UTC (rev 6959)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java	2008-10-15 19:41:25 UTC (rev 6960)
@@ -130,12 +130,6 @@
 
    protected boolean isValid(Message req)
    {
-      if (server_obj == null)
-      {
-         log.error("no method handler is registered. Discarding request.");
-         return false;
-      }
-
       if (req == null || req.getLength() == 0)
       {
          log.error("message or message buffer is null");
@@ -160,8 +154,8 @@
       }
 
       if (trace)
-         log.trace(new StringBuilder("dests=").append(dests).append(", command=").append(command).
-               append(", mode=").append(mode).append(", timeout=").append(timeout));
+         log.trace(new StringBuilder("dests=").append(dests).append(", command=").append(command).append(", mode=").
+               append(mode).append(", timeout=").append(timeout));
 
       ReplicationTask replicationTask = new ReplicationTask(command, oob, dests, mode, timeout, false, filter);
       Future<RspList> response = replicationProcessor.submit(replicationTask);
@@ -292,7 +286,6 @@
          // a null response is 99% likely to be due to a marshalling problem - we throw a NSE, this needs to be changed when
          // JGroups supports http://jira.jboss.com/jira/browse/JGRP-193
          // the serialization problem could be on the remote end and this is why we cannot catch this above, when marshalling.
-
          if (retval == null)
             throw new NotSerializableException("RpcDispatcher returned a null.  This is most often caused by args for " + command.getClass().getSimpleName() + " not being serializable.");
          return retval;

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/notifications/Notifier.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/notifications/Notifier.java	2008-10-15 18:13:06 UTC (rev 6959)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/notifications/Notifier.java	2008-10-15 19:41:25 UTC (rev 6960)
@@ -120,15 +120,11 @@
 
    /**
     * Adds a cache listener to the list of cache listeners registered.
-    *
-    * @param listener
     */
    void addCacheListener(Object listener);
 
    /**
     * Removes a cache listener from the list of cache listeners registered.
-    *
-    * @param listener
     */
    void removeCacheListener(Object listener);
 
@@ -136,4 +132,6 @@
     * @return Retrieves an (unmodifiable) set of cache listeners registered.
     */
    Set<Object> getCacheListeners();
+
+   void notifyNodeInvalidated(Object key, boolean pre, InvocationContext ctx);
 }
\ No newline at end of file

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java	2008-10-15 18:13:06 UTC (rev 6959)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java	2008-10-15 19:41:25 UTC (rev 6960)
@@ -639,4 +639,9 @@
       if (list == null) throw new CacheException("Unknown listener annotation: " + annotation);
       return list;
    }
+
+   public void notifyNodeInvalidated(Object key, boolean pre, InvocationContext ctx)
+   {
+      throw new UnsupportedOperationException("Not implemented");//todo please implement!
+   }
 }

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java	2008-10-15 18:13:06 UTC (rev 6959)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java	2008-10-15 19:41:25 UTC (rev 6960)
@@ -115,13 +115,12 @@
    public void setupDependencies(ChannelMessageListener messageListener, Configuration configuration, Notifier notifier,
                                  ExtendedMarshaller extendedMarshaller, TransactionTable txTable,
                                  TransactionManager txManager, InvocationContextContainer container, InterceptorChain interceptorChain,
-                                 ComponentRegistry componentRegistry, LockManager lockManager)
+                                 ComponentRegistry componentRegistry, LockManager lockManager, CacheSPI spi)
    {
       this.messageListener = messageListener;
       this.configuration = configuration;
       this.notifier = notifier;
-      // TODO: Inject cacheSPI when we are ready
-//      this.spi = spi;
+      this.spi = spi;
       this.extendedMarshaller = extendedMarshaller;
       this.txManager = txManager;
       this.txTable = txTable;
@@ -392,7 +391,6 @@
                throw new TimeoutException("State retrieval timed out waiting for flush unblock.");
          }
          useOutOfBandMessage = false;
-         // todo fix me!!
          RspList rsps = rpcDispatcher.invokeRemoteCommands(recipients, command, modeToUse, timeout, useOutOfBandMessage, responseFilter);
          if (mode == GroupRequest.GET_NONE) return Collections.emptyList();// async case
          if (trace)

Modified: core/branches/flat/src/test/java/org/jboss/starobrno/BasicTest.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/BasicTest.java	2008-10-15 18:13:06 UTC (rev 6959)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/BasicTest.java	2008-10-15 19:41:25 UTC (rev 6960)
@@ -23,7 +23,10 @@
 
 import org.jboss.starobrno.config.Configuration;
 import org.jboss.starobrno.manager.CacheManager;
+import org.jboss.starobrno.util.TestingUtil;
 import org.testng.annotations.Test;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 @Test(groups = "functional")
 public class BasicTest
@@ -62,6 +65,7 @@
       }
    }
 
+   public static final Log log = LogFactory.getLog(BasicTest.class);
    public void testBasicReplication()
    {
       Configuration configuration = new Configuration();
@@ -75,10 +79,14 @@
          firstManager.start();
          secondManager.start();
 
-         Cache firstCache = firstManager.createCache("test");
-         Cache secondCache = firstManager.createCache("test");
+         CacheSPI firstCache = (CacheSPI) firstManager.createCache("test");
+         CacheSPI secondCache = (CacheSPI) secondManager.createCache("test");
 
+         TestingUtil.blockUntilViewReceived(secondCache, 2, 3000);
+
+
          firstCache.put("key","value");
+
          assert secondCache.get("key").equals("value");
          assert firstCache.get("key").equals("value");
          secondCache.put("key", "value2");

Modified: core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java	2008-10-15 18:13:06 UTC (rev 6959)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java	2008-10-15 19:41:25 UTC (rev 6960)
@@ -324,7 +324,7 @@
    public static boolean isCacheViewComplete(Cache c, int memberCount, boolean barfIfTooManyMembers)
    {
       CacheSPI cache = (CacheSPI) c;
-      List members = cache.getRPCManager().getMembers();
+      List members = cache.getMembers();
       if (members == null || memberCount > members.size())
       {
          return false;

Modified: core/branches/flat/src/test/resources/log4j.xml
===================================================================
--- core/branches/flat/src/test/resources/log4j.xml	2008-10-15 18:13:06 UTC (rev 6959)
+++ core/branches/flat/src/test/resources/log4j.xml	2008-10-15 19:41:25 UTC (rev 6960)
@@ -49,6 +49,10 @@
       <priority value="TRACE"/>
    </category>
 
+   <category name="org.jboss.starobrno.factories.ComponentRegistry">
+      <priority value="WARN"/>
+   </category>
+
    <category name="org.jboss.cache.factories">
       <priority value="TRACE"/>
    </category>




More information about the jbosscache-commits mailing list