Author: mircea.markus
Date: 2008-10-15 15:41:25 -0400 (Wed, 15 Oct 2008)
New Revision: 6960
Added:
core/branches/flat/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor_Legacy.java
core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/InvalidateCommand.java
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvalidationInterceptor.java
Removed:
core/branches/flat/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/Cache.java
core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java
core/branches/flat/src/main/java/org/jboss/starobrno/commands/AbstractVisitor.java
core/branches/flat/src/main/java/org/jboss/starobrno/commands/CommandsFactory.java
core/branches/flat/src/main/java/org/jboss/starobrno/commands/CommandsFactoryImpl.java
core/branches/flat/src/main/java/org/jboss/starobrno/commands/Visitor.java
core/branches/flat/src/main/java/org/jboss/starobrno/container/DataContainer.java
core/branches/flat/src/main/java/org/jboss/starobrno/container/UnsortedDataContainer.java
core/branches/flat/src/main/java/org/jboss/starobrno/factories/InterceptorChainFactory.java
core/branches/flat/src/main/java/org/jboss/starobrno/manager/CacheManager.java
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CacheMarshallerStarobrno.java
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java
core/branches/flat/src/main/java/org/jboss/starobrno/notifications/Notifier.java
core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java
core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java
core/branches/flat/src/test/java/org/jboss/starobrno/BasicTest.java
core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java
core/branches/flat/src/test/resources/log4j.xml
Log:
progress on cache replication
Deleted:
core/branches/flat/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java 2008-10-15
18:13:06 UTC (rev 6959)
+++
core/branches/flat/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java 2008-10-15
19:41:25 UTC (rev 6960)
@@ -1,392 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.jboss.cache.interceptors;
-
-import org.jboss.cache.Fqn;
-import org.jboss.cache.InvocationContext;
-import org.jboss.cache.commands.AbstractVisitor;
-import org.jboss.cache.commands.CommandsFactory;
-import org.jboss.cache.commands.VisitableCommand;
-import org.jboss.cache.commands.WriteCommand;
-import org.jboss.cache.commands.tx.CommitCommand;
-import org.jboss.cache.commands.tx.PrepareCommand;
-import org.jboss.cache.commands.tx.RollbackCommand;
-import org.jboss.cache.commands.write.ClearDataCommand;
-import org.jboss.cache.commands.write.MoveCommand;
-import org.jboss.cache.commands.write.PutDataMapCommand;
-import org.jboss.cache.commands.write.PutForExternalReadCommand;
-import org.jboss.cache.commands.write.PutKeyValueCommand;
-import org.jboss.cache.commands.write.RemoveKeyCommand;
-import org.jboss.cache.commands.write.RemoveNodeCommand;
-import org.jboss.cache.jmx.annotations.ManagedAttribute;
-import org.jboss.cache.jmx.annotations.ManagedOperation;
-import org.jboss.cache.transaction.GlobalTransaction;
-import org.jboss.cache.transaction.TransactionContext;
-import org.jboss.cache.transaction.TransactionTable;
-import org.jboss.starobrno.config.Option;
-import org.jboss.starobrno.factories.annotations.Inject;
-import org.jboss.starobrno.factories.annotations.Start;
-
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * This interceptor acts as a replacement to the replication interceptor when
- * the CacheImpl is configured with ClusteredSyncMode as INVALIDATE.
- * <p/>
- * The idea is that rather than replicating changes to all caches in a cluster
- * when CRUD (Create, Remove, Update, Delete) methods are called, simply call
- * evict(Fqn) on the remote caches for each changed node. This allows the
- * remote node to look up the value in a shared cache loader which would have
- * been updated with the changes.
- *
- * @author <a href="mailto:manik@jboss.org">Manik Surtani
(manik(a)jboss.org)</a>
- */
-public class InvalidationInterceptor extends BaseRpcInterceptor
-{
- private long invalidations = 0;
- protected Map<GlobalTransaction, List<WriteCommand>> txMods;
- protected boolean optimistic;
- private CommandsFactory commandsFactory;
- private boolean statsEnabled;
-
- @Inject
- public void injectDependencies(CommandsFactory commandsFactory)
- {
- this.commandsFactory = commandsFactory;
- }
-
- @Start
- private void initTxMap()
- {
- optimistic = false;
- if (optimistic) txMods = new ConcurrentHashMap<GlobalTransaction,
List<WriteCommand>>();
- this.setStatisticsEnabled(configuration.getExposeManagementStatistics());
- }
-
- @Override
- public Object visitPutDataMapCommand(InvocationContext ctx, PutDataMapCommand command)
throws Throwable
- {
- return handleWriteMethod(ctx, command.getFqn(), null, command);
- }
-
- @Override
- public Object visitPutForExternalReadCommand(InvocationContext ctx,
PutForExternalReadCommand command) throws Throwable
- {
- // these are always local more, as far as invalidation is concerned
- if (ctx.getTransaction() != null)
ctx.getTransactionContext().addLocalModification(command);
- return invokeNextInterceptor(ctx, command);
- }
-
- @Override
- public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand
command) throws Throwable
- {
- return handleWriteMethod(ctx, command.getFqn(), null, command);
- }
-
- @Override
- public Object visitRemoveNodeCommand(InvocationContext ctx, RemoveNodeCommand command)
throws Throwable
- {
- return handleWriteMethod(ctx, command.getFqn(), null, command);
- }
-
- @Override
- public Object visitRemoveKeyCommand(InvocationContext ctx, RemoveKeyCommand command)
throws Throwable
- {
- return handleWriteMethod(ctx, command.getFqn(), null, command);
- }
-
- @Override
- public Object visitMoveCommand(InvocationContext ctx, MoveCommand command) throws
Throwable
- {
- return handleWriteMethod(ctx, command.getTo(), command.getFqn(), command);
- }
-
- @Override
- public Object visitClearDataCommand(InvocationContext ctx, ClearDataCommand command)
throws Throwable
- {
- return handleWriteMethod(ctx, command.getFqn(), null, command);
- }
-
- @Override
- public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command)
throws Throwable
- {
- Object retval = invokeNextInterceptor(ctx, command);
- Transaction tx = ctx.getTransaction();
- if (tx != null)
- {
- if (trace) log.trace("Entering InvalidationInterceptor's prepare
phase");
- // fetch the modifications before the transaction is committed (and thus removed
from the txTable)
- GlobalTransaction gtx = ctx.getGlobalTransaction();
- TransactionContext transactionContext = ctx.getTransactionContext();
- if (transactionContext == null)
- throw new IllegalStateException("cannot find transaction
transactionContext for " + gtx);
-
- if (transactionContext.hasModifications())
- {
- List<WriteCommand> mods;
- if (transactionContext.hasLocalModifications())
- {
- mods = new ArrayList<WriteCommand>(command.getModifications());
- mods.removeAll(transactionContext.getLocalModifications());
- }
- else
- {
- mods = command.getModifications();
- }
- broadcastInvalidate(mods, tx, ctx);
- }
- else
- {
- if (trace) log.trace("Nothing to invalidate - no modifications in the
transaction.");
- }
- }
- return retval;
- }
-
- @Override
- public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws
Throwable
- {
- Object retval = invokeNextInterceptor(ctx, command);
- Transaction tx = ctx.getTransaction();
- if (tx != null && optimistic)
- {
- GlobalTransaction gtx = ctx.getGlobalTransaction();
- List<WriteCommand> modifications = txMods.remove(gtx);
- broadcastInvalidate(modifications, tx, ctx);
- if (trace) log.trace("Committing. Broadcasting invalidations.");
- }
- return retval;
- }
-
- @Override
- public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command)
throws Throwable
- {
- Object retval = invokeNextInterceptor(ctx, command);
- Transaction tx = ctx.getTransaction();
- if (tx != null && optimistic)
- {
- GlobalTransaction gtx = ctx.getGlobalTransaction();
- txMods.remove(gtx);
- log.debug("Caught a rollback. Clearing modification in txMods");
- }
- return retval;
- }
-
- /**
- * @param from is only present for move operations, else pass it in as null
- * @param command
- */
- private Object handleWriteMethod(InvocationContext ctx, Fqn targetFqn, Fqn from,
VisitableCommand command)
- throws Throwable
- {
- Object retval = invokeNextInterceptor(ctx, command);
- Transaction tx = ctx.getTransaction();
- Option optionOverride = ctx.getOptionOverrides();
- if (log.isDebugEnabled()) log.debug("Is a CRUD method");
- Set<Fqn> fqns = new HashSet<Fqn>();
- if (from != null)
- {
- fqns.add(from);
- }
- fqns.add(targetFqn);
- if (!fqns.isEmpty())
- {
- // could be potentially TRANSACTIONAL. Ignore if it is, until we see a
prepare().
- if (tx == null || !TransactionTable.isValid(tx))
- {
- // the no-tx case:
- //replicate an evict call.
- for (Fqn fqn : fqns) invalidateAcrossCluster(fqn, null,
isSynchronous(optionOverride), ctx);
- }
- else
- {
- if (isLocalModeForced(ctx))
ctx.getTransactionContext().addLocalModification((WriteCommand) command);
- }
- }
- return retval;
- }
-
- private void broadcastInvalidate(List<WriteCommand> modifications, Transaction
tx, InvocationContext ctx) throws Throwable
- {
- if (ctx.getTransaction() != null && !isLocalModeForced(ctx))
- {
- if (modifications == null || modifications.isEmpty()) return;
- InvalidationFilterVisitor filterVisitor = new
InvalidationFilterVisitor(modifications.size());
- filterVisitor.visitCollection(null, modifications);
-
- if (filterVisitor.containsPutForExternalRead)
- {
- log.debug("Modification list contains a putForExternalRead operation.
Not invalidating.");
- }
- else
- {
- try
- {
- for (Fqn fqn : filterVisitor.result) invalidateAcrossCluster(fqn, null,
defaultSynchronous, ctx);
- }
- catch (Throwable t)
- {
- log.warn("Unable to broadcast evicts as a part of the prepare phase.
Rolling back.", t);
- try
- {
- tx.setRollbackOnly();
- }
- catch (SystemException se)
- {
- throw new RuntimeException("setting tx rollback failed ",
se);
- }
- if (t instanceof RuntimeException)
- throw (RuntimeException) t;
- else
- throw new RuntimeException("Unable to broadcast invalidation
messages", t);
- }
- }
- }
- }
-
- public static class InvalidationFilterVisitor extends AbstractVisitor
- {
- Set<Fqn> result;
- public boolean containsPutForExternalRead;
-
- public InvalidationFilterVisitor(int maxSetSize)
- {
- result = new HashSet<Fqn>(maxSetSize);
- }
-
- @Override
- public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand
command) throws Throwable
- {
- result.add(command.getFqn());
- return null;
- }
-
- @Override
- public Object visitPutForExternalReadCommand(InvocationContext ctx,
PutForExternalReadCommand command) throws Throwable
- {
- containsPutForExternalRead = true;
- return null;
- }
-
- @Override
- public Object visitPutDataMapCommand(InvocationContext ctx, PutDataMapCommand
command) throws Throwable
- {
- result.add(command.getFqn());
- return null;
- }
-
- @Override
- public Object visitRemoveNodeCommand(InvocationContext ctx, RemoveNodeCommand
command) throws Throwable
- {
- result.add(command.getFqn());
- return null;
- }
-
- @Override
- public Object visitClearDataCommand(InvocationContext ctx, ClearDataCommand
command) throws Throwable
- {
- result.add(command.getFqn());
- return null;
- }
-
- @Override
- public Object visitRemoveKeyCommand(InvocationContext ctx, RemoveKeyCommand
command) throws Throwable
- {
- result.add(command.getFqn());
- return null;
- }
-
- @Override
- public Object visitMoveCommand(InvocationContext ctx, MoveCommand command) throws
Throwable
- {
- result.add(command.getFqn());
- // now if this is a "move" operation, then we also have another Fqn -
- Object le = command.getFqn().getLastElement();
- Fqn parent = command.getTo();
- result.add(Fqn.fromRelativeElements(parent, le));
- return null;
- }
- }
-
-
- protected void invalidateAcrossCluster(Fqn fqn, Object workspace, boolean synchronous,
InvocationContext ctx) throws Throwable
- {
- /*
- if (!isLocalModeForced(ctx))
- {
- // increment invalidations counter if statistics maintained
- incrementInvalidations();
- InvalidateCommand command = commandsFactory.buildInvalidateCommand(fqn);
- DataVersion dataVersion = getNodeVersion(workspace, fqn);
- if (dataVersion != null) ((VersionedInvalidateCommand)
command).setDataVersion(dataVersion);
- if (log.isDebugEnabled()) log.debug("Cache [" +
rpcManager.getLocalAddress() + "] replicating " + command);
- // voila, invalidated!
- replicateCall(ctx, command, synchronous, ctx.getOptionOverrides());
- }
- */
- }
-
- private void incrementInvalidations()
- {
- if (getStatisticsEnabled()) invalidations++;
- }
-
- @ManagedOperation
- public void resetStatistics()
- {
- invalidations = 0;
- }
-
- @ManagedOperation
- public Map<String, Object> dumpStatistics()
- {
- Map<String, Object> retval = new HashMap<String, Object>();
- retval.put("Invalidations", invalidations);
- return retval;
- }
-
- @ManagedAttribute
- public boolean getStatisticsEnabled()
- {
- return this.statsEnabled;
- }
-
- @ManagedAttribute
- public void setStatisticsEnabled(boolean enabled)
- {
- this.statsEnabled = enabled;
- }
-
- @ManagedAttribute(description = "number of invalidations")
- public long getInvalidations()
- {
- return invalidations;
- }
-}
Copied:
core/branches/flat/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor_Legacy.java
(from rev 6936,
core/branches/flat/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java)
===================================================================
---
core/branches/flat/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor_Legacy.java
(rev 0)
+++
core/branches/flat/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor_Legacy.java 2008-10-15
19:41:25 UTC (rev 6960)
@@ -0,0 +1,392 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.cache.interceptors;
+
+import org.jboss.cache.Fqn;
+import org.jboss.cache.InvocationContext;
+import org.jboss.cache.commands.AbstractVisitor;
+import org.jboss.cache.commands.CommandsFactory;
+import org.jboss.cache.commands.VisitableCommand;
+import org.jboss.cache.commands.WriteCommand;
+import org.jboss.cache.commands.tx.CommitCommand;
+import org.jboss.cache.commands.tx.PrepareCommand;
+import org.jboss.cache.commands.tx.RollbackCommand;
+import org.jboss.cache.commands.write.ClearDataCommand;
+import org.jboss.cache.commands.write.MoveCommand;
+import org.jboss.cache.commands.write.PutDataMapCommand;
+import org.jboss.cache.commands.write.PutForExternalReadCommand;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
+import org.jboss.cache.commands.write.RemoveKeyCommand;
+import org.jboss.cache.commands.write.RemoveNodeCommand;
+import org.jboss.cache.jmx.annotations.ManagedAttribute;
+import org.jboss.cache.jmx.annotations.ManagedOperation;
+import org.jboss.cache.transaction.GlobalTransaction;
+import org.jboss.cache.transaction.TransactionContext;
+import org.jboss.cache.transaction.TransactionTable;
+import org.jboss.starobrno.config.Option;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.factories.annotations.Start;
+
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This interceptor acts as a replacement to the replication interceptor when
+ * the CacheImpl is configured with ClusteredSyncMode as INVALIDATE.
+ * <p/>
+ * The idea is that rather than replicating changes to all caches in a cluster
+ * when CRUD (Create, Remove, Update, Delete) methods are called, simply call
+ * evict(Fqn) on the remote caches for each changed node. This allows the
+ * remote node to look up the value in a shared cache loader which would have
+ * been updated with the changes.
+ *
+ * @author <a href="mailto:manik@jboss.org">Manik Surtani
(manik(a)jboss.org)</a>
+ */
+public class InvalidationInterceptor_Legacy extends BaseRpcInterceptor
+{
+ private long invalidations = 0;
+ protected Map<GlobalTransaction, List<WriteCommand>> txMods;
+ protected boolean optimistic;
+ private CommandsFactory commandsFactory;
+ private boolean statsEnabled;
+
+ @Inject
+ public void injectDependencies(CommandsFactory commandsFactory)
+ {
+ this.commandsFactory = commandsFactory;
+ }
+
+ @Start
+ private void initTxMap()
+ {
+ optimistic = false;
+ if (optimistic) txMods = new ConcurrentHashMap<GlobalTransaction,
List<WriteCommand>>();
+ this.setStatisticsEnabled(configuration.getExposeManagementStatistics());
+ }
+
+ @Override
+ public Object visitPutDataMapCommand(InvocationContext ctx, PutDataMapCommand command)
throws Throwable
+ {
+ return handleWriteMethod(ctx, command.getFqn(), null, command);
+ }
+
+ @Override
+ public Object visitPutForExternalReadCommand(InvocationContext ctx,
PutForExternalReadCommand command) throws Throwable
+ {
+ // these are always local more, as far as invalidation is concerned
+ if (ctx.getTransaction() != null)
ctx.getTransactionContext().addLocalModification(command);
+ return invokeNextInterceptor(ctx, command);
+ }
+
+ @Override
+ public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand
command) throws Throwable
+ {
+ return handleWriteMethod(ctx, command.getFqn(), null, command);
+ }
+
+ @Override
+ public Object visitRemoveNodeCommand(InvocationContext ctx, RemoveNodeCommand command)
throws Throwable
+ {
+ return handleWriteMethod(ctx, command.getFqn(), null, command);
+ }
+
+ @Override
+ public Object visitRemoveKeyCommand(InvocationContext ctx, RemoveKeyCommand command)
throws Throwable
+ {
+ return handleWriteMethod(ctx, command.getFqn(), null, command);
+ }
+
+ @Override
+ public Object visitMoveCommand(InvocationContext ctx, MoveCommand command) throws
Throwable
+ {
+ return handleWriteMethod(ctx, command.getTo(), command.getFqn(), command);
+ }
+
+ @Override
+ public Object visitClearDataCommand(InvocationContext ctx, ClearDataCommand command)
throws Throwable
+ {
+ return handleWriteMethod(ctx, command.getFqn(), null, command);
+ }
+
+ @Override
+ public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command)
throws Throwable
+ {
+ Object retval = invokeNextInterceptor(ctx, command);
+ Transaction tx = ctx.getTransaction();
+ if (tx != null)
+ {
+ if (trace) log.trace("Entering InvalidationInterceptor_Legacy's prepare
phase");
+ // fetch the modifications before the transaction is committed (and thus removed
from the txTable)
+ GlobalTransaction gtx = ctx.getGlobalTransaction();
+ TransactionContext transactionContext = ctx.getTransactionContext();
+ if (transactionContext == null)
+ throw new IllegalStateException("cannot find transaction
transactionContext for " + gtx);
+
+ if (transactionContext.hasModifications())
+ {
+ List<WriteCommand> mods;
+ if (transactionContext.hasLocalModifications())
+ {
+ mods = new ArrayList<WriteCommand>(command.getModifications());
+ mods.removeAll(transactionContext.getLocalModifications());
+ }
+ else
+ {
+ mods = command.getModifications();
+ }
+ broadcastInvalidate(mods, tx, ctx);
+ }
+ else
+ {
+ if (trace) log.trace("Nothing to invalidate - no modifications in the
transaction.");
+ }
+ }
+ return retval;
+ }
+
+ @Override
+ public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws
Throwable
+ {
+ Object retval = invokeNextInterceptor(ctx, command);
+ Transaction tx = ctx.getTransaction();
+ if (tx != null && optimistic)
+ {
+ GlobalTransaction gtx = ctx.getGlobalTransaction();
+ List<WriteCommand> modifications = txMods.remove(gtx);
+ broadcastInvalidate(modifications, tx, ctx);
+ if (trace) log.trace("Committing. Broadcasting invalidations.");
+ }
+ return retval;
+ }
+
+ @Override
+ public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command)
throws Throwable
+ {
+ Object retval = invokeNextInterceptor(ctx, command);
+ Transaction tx = ctx.getTransaction();
+ if (tx != null && optimistic)
+ {
+ GlobalTransaction gtx = ctx.getGlobalTransaction();
+ txMods.remove(gtx);
+ log.debug("Caught a rollback. Clearing modification in txMods");
+ }
+ return retval;
+ }
+
+ /**
+ * @param from is only present for move operations, else pass it in as null
+ * @param command
+ */
+ private Object handleWriteMethod(InvocationContext ctx, Fqn targetFqn, Fqn from,
VisitableCommand command)
+ throws Throwable
+ {
+ Object retval = invokeNextInterceptor(ctx, command);
+ Transaction tx = ctx.getTransaction();
+ Option optionOverride = ctx.getOptionOverrides();
+ if (log.isDebugEnabled()) log.debug("Is a CRUD method");
+ Set<Fqn> fqns = new HashSet<Fqn>();
+ if (from != null)
+ {
+ fqns.add(from);
+ }
+ fqns.add(targetFqn);
+ if (!fqns.isEmpty())
+ {
+ // could be potentially TRANSACTIONAL. Ignore if it is, until we see a
prepare().
+ if (tx == null || !TransactionTable.isValid(tx))
+ {
+ // the no-tx case:
+ //replicate an evict call.
+ for (Fqn fqn : fqns) invalidateAcrossCluster(fqn, null,
isSynchronous(optionOverride), ctx);
+ }
+ else
+ {
+ if (isLocalModeForced(ctx))
ctx.getTransactionContext().addLocalModification((WriteCommand) command);
+ }
+ }
+ return retval;
+ }
+
+ private void broadcastInvalidate(List<WriteCommand> modifications, Transaction
tx, InvocationContext ctx) throws Throwable
+ {
+ if (ctx.getTransaction() != null && !isLocalModeForced(ctx))
+ {
+ if (modifications == null || modifications.isEmpty()) return;
+ InvalidationFilterVisitor filterVisitor = new
InvalidationFilterVisitor(modifications.size());
+ filterVisitor.visitCollection(null, modifications);
+
+ if (filterVisitor.containsPutForExternalRead)
+ {
+ log.debug("Modification list contains a putForExternalRead operation.
Not invalidating.");
+ }
+ else
+ {
+ try
+ {
+ for (Fqn fqn : filterVisitor.result) invalidateAcrossCluster(fqn, null,
defaultSynchronous, ctx);
+ }
+ catch (Throwable t)
+ {
+ log.warn("Unable to broadcast evicts as a part of the prepare phase.
Rolling back.", t);
+ try
+ {
+ tx.setRollbackOnly();
+ }
+ catch (SystemException se)
+ {
+ throw new RuntimeException("setting tx rollback failed ",
se);
+ }
+ if (t instanceof RuntimeException)
+ throw (RuntimeException) t;
+ else
+ throw new RuntimeException("Unable to broadcast invalidation
messages", t);
+ }
+ }
+ }
+ }
+
+ public static class InvalidationFilterVisitor extends AbstractVisitor
+ {
+ Set<Fqn> result;
+ public boolean containsPutForExternalRead;
+
+ public InvalidationFilterVisitor(int maxSetSize)
+ {
+ result = new HashSet<Fqn>(maxSetSize);
+ }
+
+ @Override
+ public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand
command) throws Throwable
+ {
+ result.add(command.getFqn());
+ return null;
+ }
+
+ @Override
+ public Object visitPutForExternalReadCommand(InvocationContext ctx,
PutForExternalReadCommand command) throws Throwable
+ {
+ containsPutForExternalRead = true;
+ return null;
+ }
+
+ @Override
+ public Object visitPutDataMapCommand(InvocationContext ctx, PutDataMapCommand
command) throws Throwable
+ {
+ result.add(command.getFqn());
+ return null;
+ }
+
+ @Override
+ public Object visitRemoveNodeCommand(InvocationContext ctx, RemoveNodeCommand
command) throws Throwable
+ {
+ result.add(command.getFqn());
+ return null;
+ }
+
+ @Override
+ public Object visitClearDataCommand(InvocationContext ctx, ClearDataCommand
command) throws Throwable
+ {
+ result.add(command.getFqn());
+ return null;
+ }
+
+ @Override
+ public Object visitRemoveKeyCommand(InvocationContext ctx, RemoveKeyCommand
command) throws Throwable
+ {
+ result.add(command.getFqn());
+ return null;
+ }
+
+ @Override
+ public Object visitMoveCommand(InvocationContext ctx, MoveCommand command) throws
Throwable
+ {
+ result.add(command.getFqn());
+ // now if this is a "move" operation, then we also have another Fqn -
+ Object le = command.getFqn().getLastElement();
+ Fqn parent = command.getTo();
+ result.add(Fqn.fromRelativeElements(parent, le));
+ return null;
+ }
+ }
+
+
+ protected void invalidateAcrossCluster(Fqn fqn, Object workspace, boolean synchronous,
InvocationContext ctx) throws Throwable
+ {
+ /*
+ if (!isLocalModeForced(ctx))
+ {
+ // increment invalidations counter if statistics maintained
+ incrementInvalidations();
+ InvalidateCommand command = commandsFactory.buildInvalidateCommand(fqn);
+ DataVersion dataVersion = getNodeVersion(workspace, fqn);
+ if (dataVersion != null) ((VersionedInvalidateCommand)
command).setDataVersion(dataVersion);
+ if (log.isDebugEnabled()) log.debug("Cache [" +
rpcManager.getLocalAddress() + "] replicating " + command);
+ // voila, invalidated!
+ replicateCall(ctx, command, synchronous, ctx.getOptionOverrides());
+ }
+ */
+ }
+
+ private void incrementInvalidations()
+ {
+ if (getStatisticsEnabled()) invalidations++;
+ }
+
+ @ManagedOperation
+ public void resetStatistics()
+ {
+ invalidations = 0;
+ }
+
+ @ManagedOperation
+ public Map<String, Object> dumpStatistics()
+ {
+ Map<String, Object> retval = new HashMap<String, Object>();
+ retval.put("Invalidations", invalidations);
+ return retval;
+ }
+
+ @ManagedAttribute
+ public boolean getStatisticsEnabled()
+ {
+ return this.statsEnabled;
+ }
+
+ @ManagedAttribute
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ this.statsEnabled = enabled;
+ }
+
+ @ManagedAttribute(description = "number of invalidations")
+ public long getInvalidations()
+ {
+ return invalidations;
+ }
+}
Property changes on:
core/branches/flat/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor_Legacy.java
___________________________________________________________________
Name: svn:keywords
+ Author Date Id Revision
Name: svn:eol-style
+ native
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/Cache.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/Cache.java 2008-10-15 18:13:06
UTC (rev 6959)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/Cache.java 2008-10-15 19:41:25
UTC (rev 6960)
@@ -25,8 +25,10 @@
import org.jboss.starobrno.config.Configuration;
import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.lifecycle.Lifecycle;
+import org.jgroups.Address;
import java.util.Set;
+import java.util.List;
import java.util.concurrent.ConcurrentMap;
/**
@@ -53,4 +55,6 @@
public void startBatch();
public void endBatch(boolean successful);
+
+ List<Address> getMembers();
}
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java 2008-10-15
18:13:06 UTC (rev 6959)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java 2008-10-15
19:41:25 UTC (rev 6960)
@@ -49,6 +49,7 @@
import org.jboss.starobrno.notifications.Notifier;
import org.jboss.starobrno.transaction.GlobalTransaction;
import org.jboss.starobrno.transaction.TransactionTable;
+import org.jgroups.Address;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
@@ -71,6 +72,7 @@
protected BatchContainer batchContainer;
protected ComponentRegistry componentRegistry;
protected TransactionManager transactionManager;
+ protected RPCManager rpcManager;
@Inject
@@ -81,7 +83,8 @@
Notifier notifier,
ComponentRegistry componentRegistry,
TransactionManager transactionManager,
- BatchContainer batchContainer)
+ BatchContainer batchContainer,
+ RPCManager rpcManager)
{
this.invocationContextContainer = invocationContextContainer;
this.commandsFactory = commandsFactory;
@@ -91,6 +94,7 @@
this.componentRegistry = componentRegistry;
this.transactionManager = transactionManager;
this.batchContainer = batchContainer;
+ this.rpcManager = rpcManager;
}
public V putIfAbsent(K key, V value)
@@ -293,7 +297,7 @@
public RPCManager getRPCManager()
{
- return null; //TODO: Autogenerated. Implement me properly
+ return rpcManager;
}
public StateTransferManager getStateTransferManager()
@@ -349,4 +353,9 @@
throw new ConfigurationException("Invocation batching not enabled in
current configuration! Please use the <invocationBatching /> element.");
batchContainer.endBatch(successful);
}
+
+ public List<Address> getMembers()
+ {
+ return rpcManager.getMembers();
+ }
}
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/commands/AbstractVisitor.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/commands/AbstractVisitor.java 2008-10-15
18:13:06 UTC (rev 6959)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/commands/AbstractVisitor.java 2008-10-15
19:41:25 UTC (rev 6960)
@@ -27,12 +27,7 @@
import org.jboss.starobrno.commands.tx.CommitCommand;
import org.jboss.starobrno.commands.tx.PrepareCommand;
import org.jboss.starobrno.commands.tx.RollbackCommand;
-import org.jboss.starobrno.commands.write.ClearCommand;
-import org.jboss.starobrno.commands.write.EvictCommand;
-import org.jboss.starobrno.commands.write.PutKeyValueCommand;
-import org.jboss.starobrno.commands.write.PutMapCommand;
-import org.jboss.starobrno.commands.write.RemoveCommand;
-import org.jboss.starobrno.commands.write.ReplaceCommand;
+import org.jboss.starobrno.commands.write.*;
import org.jboss.starobrno.context.InvocationContext;
import java.util.Collection;
@@ -112,6 +107,10 @@
return handleDefault(ctx, command);
}
+ public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand
invalidateCommand) throws Throwable
+ {
+ return handleDefault(ctx, invalidateCommand);
+ }
/**
* A default handler for all commands visited. This is called for any visit method
called, unless a visit command is
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/commands/CommandsFactory.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/commands/CommandsFactory.java 2008-10-15
18:13:06 UTC (rev 6959)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/commands/CommandsFactory.java 2008-10-15
19:41:25 UTC (rev 6960)
@@ -26,12 +26,7 @@
import org.jboss.starobrno.commands.tx.CommitCommand;
import org.jboss.starobrno.commands.tx.PrepareCommand;
import org.jboss.starobrno.commands.tx.RollbackCommand;
-import org.jboss.starobrno.commands.write.ClearCommand;
-import org.jboss.starobrno.commands.write.EvictCommand;
-import org.jboss.starobrno.commands.write.PutKeyValueCommand;
-import org.jboss.starobrno.commands.write.PutMapCommand;
-import org.jboss.starobrno.commands.write.RemoveCommand;
-import org.jboss.starobrno.commands.write.ReplaceCommand;
+import org.jboss.starobrno.commands.write.*;
import org.jboss.starobrno.commands.remote.ReplicateCommand;
import org.jboss.starobrno.transaction.GlobalTransaction;
import org.jgroups.Address;
@@ -71,4 +66,6 @@
ReplicateCommand buildReplicateCommand(List<ReplicableCommand> toReplicate);
ReplicateCommand buildReplicateCommand(ReplicableCommand call);
+
+ InvalidateCommand buildInvalidateCommand(Object fqn);
}
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/commands/CommandsFactoryImpl.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/commands/CommandsFactoryImpl.java 2008-10-15
18:13:06 UTC (rev 6959)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/commands/CommandsFactoryImpl.java 2008-10-15
19:41:25 UTC (rev 6960)
@@ -22,17 +22,13 @@
package org.jboss.starobrno.commands;
import org.jboss.starobrno.CacheException;
+import org.jboss.starobrno.interceptors.InterceptorChain;
import org.jboss.starobrno.commands.read.GetKeyValueCommand;
import org.jboss.starobrno.commands.read.SizeCommand;
import org.jboss.starobrno.commands.tx.CommitCommand;
import org.jboss.starobrno.commands.tx.PrepareCommand;
import org.jboss.starobrno.commands.tx.RollbackCommand;
-import org.jboss.starobrno.commands.write.ClearCommand;
-import org.jboss.starobrno.commands.write.EvictCommand;
-import org.jboss.starobrno.commands.write.PutKeyValueCommand;
-import org.jboss.starobrno.commands.write.PutMapCommand;
-import org.jboss.starobrno.commands.write.RemoveCommand;
-import org.jboss.starobrno.commands.write.ReplaceCommand;
+import org.jboss.starobrno.commands.write.*;
import org.jboss.starobrno.commands.remote.ReplicateCommand;
import org.jboss.starobrno.container.DataContainer;
import org.jboss.starobrno.factories.annotations.Inject;
@@ -50,12 +46,14 @@
{
private DataContainer dataContainer;
private Notifier notifier;
+ private InterceptorChain interceptorChain;
@Inject
- private void setupDependencies(DataContainer container, Notifier notifier)
+ private void setupDependencies(DataContainer container, Notifier notifier,
InterceptorChain interceptorChain)
{
this.dataContainer = container;
this.notifier = notifier;
+ this.interceptorChain = interceptorChain;
}
public PutKeyValueCommand buildPutKeyValueCommand(Object key, Object value)
@@ -185,11 +183,24 @@
command = c;
break;
}
+ case ReplicateCommand.MULTIPLE_METHOD_ID:
+ case org.jboss.cache.commands.remote.ReplicateCommand.SINGLE_METHOD_ID:
+ {
+ ReplicateCommand c = new ReplicateCommand();
+ c.initialize(interceptorChain);
+ command = c;
+ break;
+ }
+
default:
throw new CacheException("Unknown command id " + id +
"!");
}
-
command.setParameters(id, parameters);
return command;
}
+
+ public InvalidateCommand buildInvalidateCommand(Object fqn)
+ {
+ throw new UnsupportedOperationException("Not implemented");//todo please
implement!
+ }
}
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/Visitor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/Visitor.java 2008-10-15
18:13:06 UTC (rev 6959)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/Visitor.java 2008-10-15
19:41:25 UTC (rev 6960)
@@ -27,12 +27,7 @@
import org.jboss.starobrno.commands.tx.CommitCommand;
import org.jboss.starobrno.commands.tx.PrepareCommand;
import org.jboss.starobrno.commands.tx.RollbackCommand;
-import org.jboss.starobrno.commands.write.ClearCommand;
-import org.jboss.starobrno.commands.write.EvictCommand;
-import org.jboss.starobrno.commands.write.PutKeyValueCommand;
-import org.jboss.starobrno.commands.write.PutMapCommand;
-import org.jboss.starobrno.commands.write.RemoveCommand;
-import org.jboss.starobrno.commands.write.ReplaceCommand;
+import org.jboss.starobrno.commands.write.*;
import org.jboss.starobrno.context.InvocationContext;
public interface Visitor
@@ -67,4 +62,5 @@
Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws
Throwable;
+ Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand
invalidateCommand) throws Throwable;
}
\ No newline at end of file
Copied:
core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/InvalidateCommand.java
(from rev 6936,
core/branches/flat/src/main/java/org/jboss/cache/commands/write/InvalidateCommand.java)
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/InvalidateCommand.java
(rev 0)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/InvalidateCommand.java 2008-10-15
19:41:25 UTC (rev 6960)
@@ -0,0 +1,159 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.starobrno.commands.write;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.starobrno.commands.read.AbstractDataCommand;
+import org.jboss.starobrno.commands.Visitor;
+import org.jboss.starobrno.CacheSPI;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.container.DataContainer;
+import org.jboss.starobrno.notifications.Notifier;
+import org.jboss.cache.NodeSPI;
+import org.jboss.cache.Fqn;
+
+
+/**
+ * Removes a node's content from memory - never removes the node.
+ * It also clenups data for resident nodes - which are not being touched by eviction.
+ *
+ * @author Mircea.Markus(a)jboss.com
+ * @since 2.2
+ */
+public class InvalidateCommand extends AbstractDataCommand
+{
+ public static final int METHOD_ID = 47;
+ private static final Log log = LogFactory.getLog(InvalidateCommand.class);
+ private static final boolean trace = log.isTraceEnabled();
+
+ /* dependencies*/
+ protected CacheSPI spi;
+ protected Notifier notifier;
+ protected DataContainer dataContainer;
+
+ public InvalidateCommand(Object key)
+ {
+ this.key = key;
+ }
+
+ public InvalidateCommand()
+ {
+ }
+
+ public void initialize(CacheSPI cacheSpi, DataContainer dataContainer, Notifier
notifier)
+ {
+ this.spi = cacheSpi;
+ this.dataContainer = dataContainer;
+ this.notifier = notifier;
+ }
+
+ /**
+ * Performs an invalidation on a specified node
+ *
+ * @param ctx invocation context
+ * @return null
+ */
+ public Object perform(InvocationContext ctx)
+ {
+ Object value = enforceNodeLoading();
+ if (trace) log.trace("Invalidating key:" + key);
+ if (value == null)
+ {
+ return null;
+ }
+ evictNode(key, ctx);
+// dataContainer.
+ return null;
+ }
+
+ boolean evictNode(Object key, InvocationContext ctx)
+ {
+ notifier.notifyNodeInvalidated(key, true, ctx);
+ try
+ {
+ return dataContainer.evict(key);
+ }
+ finally
+ {
+ notifier.notifyNodeInvalidated(key, false, ctx);
+ }
+ }
+
+
+ /**
+ * //TODO: 2.2.0: rather than using CacheSPI this should use peek(). The other
interceptors should obtain locks and load nodes if necessary for this InvalidateCommand.
+ * //Even better - this can be handles in the interceptors before call interceptor
+ */
+ protected Object enforceNodeLoading()
+ {
+ return spi.get(key);
+ }
+
+
+ /**
+ * mark the node to be removed (and all children) as invalid so anyone holding a
direct reference to it will
+ * be aware that it is no longer valid.
+ */
+ protected void invalidateNode(NodeSPI node)
+ {
+ node.setValid(false, true);
+ // root nodes can never be invalid
+// if (fqn.isRoot()) node.setValid(true, false); // non-recursive.
+ }
+
+
+ public Object acceptVisitor(InvocationContext ctx, Visitor visitor) throws Throwable
+ {
+ return visitor.visitInvalidateCommand(ctx, this);
+ }
+
+ public byte getCommandId()
+ {
+ return METHOD_ID;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "InvalidateCommand{" +
+ "key=" + key +
+ '}';
+ }
+
+ @Override
+ public Object[] getParameters()
+ {
+ return new Object[]{key};
+ }
+
+ @Override
+ public void setParameters(int commandId, Object[] args)
+ {
+ key = args[0];
+ }
+
+ void setFqn(Fqn newFqn)
+ {
+ this.key = newFqn;
+ }
+}
\ No newline at end of file
Property changes on:
core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/InvalidateCommand.java
___________________________________________________________________
Name: svn:mergeinfo
+
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/container/DataContainer.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/container/DataContainer.java 2008-10-15
18:13:06 UTC (rev 6959)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/container/DataContainer.java 2008-10-15
19:41:25 UTC (rev 6960)
@@ -44,4 +44,6 @@
void clear();
Set<K> keySet();
+
+ boolean evict(Object key);
}
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/container/UnsortedDataContainer.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/container/UnsortedDataContainer.java 2008-10-15
18:13:06 UTC (rev 6959)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/container/UnsortedDataContainer.java 2008-10-15
19:41:25 UTC (rev 6960)
@@ -69,4 +69,9 @@
{
return data.keySet();
}
+
+ public boolean evict(Object key)
+ {
+ throw new UnsupportedOperationException("Not implemented");//todo please
implement!
+ }
}
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/factories/InterceptorChainFactory.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/factories/InterceptorChainFactory.java 2008-10-15
18:13:06 UTC (rev 6959)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/factories/InterceptorChainFactory.java 2008-10-15
19:41:25 UTC (rev 6960)
@@ -26,15 +26,7 @@
import org.jboss.starobrno.config.ConfigurationException;
import org.jboss.starobrno.config.CustomInterceptorConfig;
import org.jboss.starobrno.factories.annotations.DefaultFactoryFor;
-import org.jboss.starobrno.interceptors.BatchingInterceptor;
-import org.jboss.starobrno.interceptors.CacheMgmtInterceptor;
-import org.jboss.starobrno.interceptors.CallInterceptor;
-import org.jboss.starobrno.interceptors.InterceptorChain;
-import org.jboss.starobrno.interceptors.InvocationContextInterceptor;
-import org.jboss.starobrno.interceptors.LockingInterceptor;
-import org.jboss.starobrno.interceptors.MarshalledValueInterceptor;
-import org.jboss.starobrno.interceptors.NotificationInterceptor;
-import org.jboss.starobrno.interceptors.TxInterceptor;
+import org.jboss.starobrno.interceptors.*;
import org.jboss.starobrno.interceptors.base.CommandInterceptor;
import java.util.List;
@@ -111,20 +103,20 @@
interceptorChain.appendIntereceptor(createInterceptor(NotificationInterceptor.class));
- // TODO: Uncomment once the Repl and Inval interceptors has been moved to
Starobrno
-// switch (configuration.getCacheMode())
-// {
-// case REPL_SYNC:
-// case REPL_ASYNC:
-//
interceptorChain.appendIntereceptor(createInterceptor(ReplicationInterceptor.class));
-// break;
-// case INVALIDATION_SYNC:
-// case INVALIDATION_ASYNC:
-//
interceptorChain.appendIntereceptor(createInterceptor(InvalidationInterceptor.class));
-// break;
-// case LOCAL:
-// //Nothing...
-// }
+// TODO: Uncomment once the Repl and Inval interceptors has been moved to
Starobrno
+ switch (configuration.getCacheMode())
+ {
+ case REPL_SYNC:
+ case REPL_ASYNC:
+
interceptorChain.appendIntereceptor(createInterceptor(ReplicationInterceptor.class));
+ break;
+ case INVALIDATION_SYNC:
+ case INVALIDATION_ASYNC:
+
interceptorChain.appendIntereceptor(createInterceptor(InvalidationInterceptor.class));
+ break;
+ case LOCAL:
+ //Nothing...
+ }
// TODO: Uncomment once the CacheLoader has been moved to Starobrno
// if (configuration.isUsingCacheLoaders())
Copied:
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvalidationInterceptor.java
(from rev 6936,
core/branches/flat/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java)
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvalidationInterceptor.java
(rev 0)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvalidationInterceptor.java 2008-10-15
19:41:25 UTC (rev 6960)
@@ -0,0 +1,273 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.starobrno.interceptors;
+
+import org.jboss.starobrno.commands.AbstractVisitor;
+import org.jboss.starobrno.commands.CommandsFactory;
+import org.jboss.starobrno.commands.DataCommand;
+import org.jboss.starobrno.commands.VisitableCommand;
+import org.jboss.starobrno.commands.tx.PrepareCommand;
+import org.jboss.starobrno.commands.write.ClearCommand;
+import org.jboss.starobrno.commands.write.InvalidateCommand;
+import org.jboss.starobrno.commands.write.PutKeyValueCommand;
+import org.jboss.starobrno.commands.write.RemoveCommand;
+import org.jboss.starobrno.config.Option;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.context.TransactionContext;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.factories.annotations.Start;
+import org.jboss.starobrno.interceptors.base.BaseRpcInterceptor;
+import org.jboss.starobrno.jmx.annotations.ManagedAttribute;
+import org.jboss.starobrno.jmx.annotations.ManagedOperation;
+import org.jboss.starobrno.transaction.GlobalTransaction;
+import org.jboss.starobrno.transaction.TransactionTable;
+
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import java.util.*;
+
+
+/**
+ * This interceptor acts as a replacement to the replication interceptor when
+ * the CacheImpl is configured with ClusteredSyncMode as INVALIDATE.
+ * <p/>
+ * The idea is that rather than replicating changes to all caches in a cluster
+ * when CRUD (Create, Remove, Update, Delete) methods are called, simply call
+ * evict(Fqn) on the remote caches for each changed node. This allows the
+ * remote node to look up the value in a shared cache loader which would have
+ * been updated with the changes.
+ *
+ * @author <a href="mailto:manik@jboss.org">Manik Surtani
(manik(a)jboss.org)</a>
+ */
+public class InvalidationInterceptor extends BaseRpcInterceptor
+{
+ private long invalidations = 0;
+ protected Map<GlobalTransaction, List<VisitableCommand>> txMods;
+ private CommandsFactory commandsFactory;
+ private boolean statsEnabled;
+
+ @Inject
+ public void injectDependencies(CommandsFactory commandsFactory)
+ {
+ this.commandsFactory = commandsFactory;
+ }
+
+ @Start
+ private void initTxMap()
+ {
+ this.setStatisticsEnabled(configuration.getExposeManagementStatistics());
+ }
+
+ @Override
+ public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand
command) throws Throwable
+ {
+ return handleWriteMethod(ctx, command, command);
+ }
+
+ @Override
+ public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws
Throwable
+ {
+ return handleWriteMethod(ctx, command.getKey(), command);
+ }
+
+ @Override
+ public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws
Throwable
+ {
+// return handleWriteMethod(ctx, command.getKey(), command);
+ //todo handle this - should perfor a remote invalidation aswell!!!
+ return null;
+ }
+
+ @Override
+ public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command)
throws Throwable
+ {
+ Object retval = invokeNextInterceptor(ctx, command);
+ Transaction tx = ctx.getTransaction();
+ if (tx != null)
+ {
+ if (trace) log.trace("Entering InvalidationInterceptor_Legacy's prepare
phase");
+ // fetch the modifications before the transaction is committed (and thus removed
from the txTable)
+ GlobalTransaction gtx = ctx.getGlobalTransaction();
+ TransactionContext transactionContext = ctx.getTransactionContext();
+ if (transactionContext == null)
+ throw new IllegalStateException("cannot find transaction
transactionContext for " + gtx);
+
+ if (transactionContext.hasModifications())
+ {
+ List<DataCommand> mods;
+ if (transactionContext.hasLocalModifications())
+ {
+ mods = new ArrayList<DataCommand>(command.getModifications());
+ mods.removeAll(transactionContext.getLocalModifications());
+ }
+ else
+ {
+ mods = command.getModifications();
+ }
+ broadcastInvalidate(mods, tx, ctx);
+ }
+ else
+ {
+ if (trace) log.trace("Nothing to invalidate - no modifications in the
transaction.");
+ }
+ }
+ return retval;
+ }
+
+ private Object handleWriteMethod(InvocationContext ctx, Object key, VisitableCommand
command)
+ throws Throwable
+ {
+ Object retval = invokeNextInterceptor(ctx, command);
+ Transaction tx = ctx.getTransaction();
+ Option optionOverride = ctx.getOptionOverrides();
+ if (log.isDebugEnabled()) log.debug("Is a CRUD method");
+ if (key != null)
+ {
+ // could be potentially TRANSACTIONAL. Ignore if it is, until we see a
prepare().
+ if (tx == null || !TransactionTable.isValid(tx))
+ {
+ // the no-tx case:
+ //replicate an evict call.
+ invalidateAcrossCluster(key, null, isSynchronous(optionOverride), ctx);
+ }
+ else
+ {
+ if (isLocalModeForced(ctx))
ctx.getTransactionContext().addLocalModification(command);
+ }
+ }
+ return retval;
+ }
+
+ private void broadcastInvalidate(List<DataCommand> modifications, Transaction
tx, InvocationContext ctx) throws Throwable
+ {
+ if (ctx.getTransaction() != null && !isLocalModeForced(ctx))
+ {
+ if (modifications == null || modifications.isEmpty()) return;
+ InvalidationFilterVisitor filterVisitor = new
InvalidationFilterVisitor(modifications.size());
+ filterVisitor.visitCollection(null, modifications);
+
+ if (filterVisitor.containsPutForExternalRead)
+ {
+ log.debug("Modification list contains a putForExternalRead operation.
Not invalidating.");
+ }
+ else
+ {
+ try
+ {
+ for (Object key : filterVisitor.result) invalidateAcrossCluster(key, null,
defaultSynchronous, ctx);
+ }
+ catch (Throwable t)
+ {
+ log.warn("Unable to broadcast evicts as a part of the prepare phase.
Rolling back.", t);
+ try
+ {
+ tx.setRollbackOnly();
+ }
+ catch (SystemException se)
+ {
+ throw new RuntimeException("setting tx rollback failed ",
se);
+ }
+ if (t instanceof RuntimeException)
+ throw (RuntimeException) t;
+ else
+ throw new RuntimeException("Unable to broadcast invalidation
messages", t);
+ }
+ }
+ }
+ }
+
+ public static class InvalidationFilterVisitor extends AbstractVisitor
+ {
+ Set<Object> result;
+ public boolean containsPutForExternalRead;
+
+ public InvalidationFilterVisitor(int maxSetSize)
+ {
+ result = new HashSet<Object>(maxSetSize);
+ }
+
+ @Override
+ public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand
command) throws Throwable
+ {
+ result.add(command.getKey());
+ return null;
+ }
+
+ @Override
+ public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command)
throws Throwable
+ {
+ result.add(command.getKey());
+ return null;
+ }
+ }
+
+
+ protected void invalidateAcrossCluster(Object fqn, Object workspace, boolean
synchronous, InvocationContext ctx) throws Throwable
+ {
+ if (!isLocalModeForced(ctx))
+ {
+ // increment invalidations counter if statistics maintained
+ incrementInvalidations();
+ InvalidateCommand command = commandsFactory.buildInvalidateCommand(fqn);
+ if (log.isDebugEnabled()) log.debug("Cache [" +
rpcManager.getLocalAddress() + "] replicating " + command);
+ // voila, invalidated!
+ replicateCall(ctx, command, synchronous, ctx.getOptionOverrides());
+ }
+ }
+
+ private void incrementInvalidations()
+ {
+ if (getStatisticsEnabled()) invalidations++;
+ }
+
+ @ManagedOperation
+ public void resetStatistics()
+ {
+ invalidations = 0;
+ }
+
+ @ManagedOperation
+ public Map<String, Object> dumpStatistics()
+ {
+ Map<String, Object> retval = new HashMap<String, Object>();
+ retval.put("Invalidations", invalidations);
+ return retval;
+ }
+
+ @ManagedAttribute
+ public boolean getStatisticsEnabled()
+ {
+ return this.statsEnabled;
+ }
+
+ @ManagedAttribute
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ this.statsEnabled = enabled;
+ }
+
+ @ManagedAttribute(description = "number of invalidations")
+ public long getInvalidations()
+ {
+ return invalidations;
+ }
+}
\ No newline at end of file
Property changes on:
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvalidationInterceptor.java
___________________________________________________________________
Name: svn:keywords
+ Author Date Id Revision
Name: svn:mergeinfo
+
Name: svn:eol-style
+ native
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/manager/CacheManager.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/manager/CacheManager.java 2008-10-15
18:13:06 UTC (rev 6959)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/manager/CacheManager.java 2008-10-15
19:41:25 UTC (rev 6960)
@@ -42,7 +42,15 @@
public CacheManager(Configuration c)
{
- this.c = c;
+ try
+ {
+ //if a config is shared between multiple managers, then each registers it's
+ // own chnnel in runtime
+ this.c = c.clone();
+ } catch (CloneNotSupportedException e)
+ {
+ throw new RuntimeException(e);
+ }
}
public void start()
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CacheMarshallerStarobrno.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CacheMarshallerStarobrno.java 2008-10-15
18:13:06 UTC (rev 6959)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CacheMarshallerStarobrno.java 2008-10-15
19:41:25 UTC (rev 6960)
@@ -25,6 +25,7 @@
import org.apache.commons.logging.LogFactory;
import org.jboss.starobrno.CacheException;
import org.jboss.starobrno.io.ByteBuffer;
+import org.jboss.starobrno.io.ExposedByteArrayOutputStream;
import org.jboss.starobrno.commands.CommandsFactory;
import org.jboss.starobrno.commands.ReplicableCommand;
import org.jboss.starobrno.config.Configuration;
@@ -33,14 +34,13 @@
import org.jboss.starobrno.util.FastCopyHashMap;
import org.jboss.starobrno.util.Immutables;
import org.jboss.util.NotImplementedException;
+import org.jboss.util.stream.MarshalledValueInputStream;
+import org.jboss.cache.marshall.Marshaller;
import org.jgroups.Address;
import org.jgroups.stack.IpAddress;
import org.jgroups.util.Buffer;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.InputStream;
+import java.io.*;
import java.lang.reflect.Array;
import java.util.*;
@@ -138,7 +138,144 @@
marshallArray(o, out, refMap);
} else
{
- marshallObject(o, out, refMap);
+ if (o == null)
+ {
+ out.writeByte(MAGICNUMBER_NULL);
+ }
+ else if (useRefs && refMap.containsKey(o))// see if this object has been
marshalled before.
+ {
+ out.writeByte(MAGICNUMBER_REF);
+ writeReference(out, refMap.get(o));
+ }
+ else if (o instanceof ReplicableCommand)
+ {
+ ReplicableCommand command = (ReplicableCommand) o;
+
+ if (command.getCommandId() > -1)
+ {
+ out.writeByte(MAGICNUMBER_METHODCALL);
+ marshallCommand(command, out, refMap);
+ }
+ else
+ {
+ throw new IllegalArgumentException("MethodCall does not have a valid
method id. Was this method call created with MethodCallFactory?");
+ }
+ }
+ else if (o instanceof org.jgroups.blocks.MethodCall)
+ {
+ throw new IllegalArgumentException("Usage of a legacy MethodCall
object!!");
+ }
+ else if (o instanceof MarshalledValue)
+ {
+ out.writeByte(MAGICNUMBER_MARSHALLEDVALUE);
+ ((MarshalledValue) o).writeExternal(out);
+ }
+ else if (o instanceof GlobalTransaction)
+ {
+ out.writeByte(MAGICNUMBER_GTX);
+ if (useRefs) writeReference(out, createReference(o, refMap));
+ marshallGlobalTransaction((GlobalTransaction) o, out, refMap);
+ }
+ else if (o instanceof IpAddress)
+ {
+ out.writeByte(MAGICNUMBER_IPADDRESS);
+ marshallIpAddress((IpAddress) o, out);
+ }
+ else if (o.getClass().equals(ArrayList.class))
+ {
+ out.writeByte(MAGICNUMBER_ARRAY_LIST);
+ marshallCollection((Collection) o, out, refMap);
+ }
+ else if (o.getClass().equals(LinkedList.class))
+ {
+ out.writeByte(MAGICNUMBER_LINKED_LIST);
+ marshallCollection((Collection) o, out, refMap);
+ }
+ else if (o.getClass().equals(HashMap.class))
+ {
+ out.writeByte(MAGICNUMBER_HASH_MAP);
+ marshallMap((Map) o, out, refMap);
+ }
+ else if (o.getClass().equals(TreeMap.class))
+ {
+ out.writeByte(MAGICNUMBER_TREE_MAP);
+ marshallMap((Map) o, out, refMap);
+ }
+ else if (o.getClass().equals(FastCopyHashMap.class))
+ {
+ out.writeByte(MAGICNUMBER_FASTCOPY_HASHMAP);
+ marshallMap((Map) o, out, refMap);
+ }
+ else if (o instanceof Map && Immutables.isImmutable(o))
+ {
+ out.writeByte(MAGICNUMBER_IMMUTABLE_MAPCOPY);
+ marshallMap((Map) o, out, refMap);
+ }
+ else if (o.getClass().equals(HashSet.class))
+ {
+ out.writeByte(MAGICNUMBER_HASH_SET);
+ marshallCollection((Collection) o, out, refMap);
+ }
+ else if (o.getClass().equals(TreeSet.class))
+ {
+ out.writeByte(MAGICNUMBER_TREE_SET);
+ marshallCollection((Collection) o, out, refMap);
+ }
+ else if (o instanceof Boolean)
+ {
+ out.writeByte(MAGICNUMBER_BOOLEAN);
+ out.writeBoolean(((Boolean) o).booleanValue());
+ }
+ else if (o instanceof Integer)
+ {
+ out.writeByte(MAGICNUMBER_INTEGER);
+ out.writeInt(((Integer) o).intValue());
+ }
+ else if (o instanceof Long)
+ {
+ out.writeByte(MAGICNUMBER_LONG);
+ out.writeLong(((Long) o).longValue());
+ }
+ else if (o instanceof Short)
+ {
+ out.writeByte(MAGICNUMBER_SHORT);
+ out.writeShort(((Short) o).shortValue());
+ }
+ else if (o instanceof String)
+ {
+ out.writeByte(MAGICNUMBER_STRING);
+ if (useRefs) writeReference(out, createReference(o, refMap));
+ marshallString((String) o, out);
+ }
+ else if (o instanceof NodeDataMarker)
+ {
+ out.writeByte(MAGICNUMBER_NODEDATA_MARKER);
+ ((Externalizable) o).writeExternal(out);
+ }
+ else if (o instanceof NodeDataExceptionMarker)
+ {
+ out.writeByte(MAGICNUMBER_NODEDATA_EXCEPTION_MARKER);
+ ((Externalizable) o).writeExternal(out);
+ }
+ else if (o instanceof NodeData)
+ {
+ out.writeByte(MAGICNUMBER_NODEDATA);
+ ((Externalizable) o).writeExternal(out);
+ }
+ else if (o instanceof Serializable)
+ {
+ if (trace)
+ {
+ log.trace("Warning: using object serialization for " +
o.getClass());
+ }
+ out.writeByte(MAGICNUMBER_SERIALIZABLE);
+ if (useRefs) writeReference(out, createReference(o, refMap));
+ out.writeObject(o);
+ }
+ else
+ {
+ throw new Exception("Don't know how to marshall object of type
" + o.getClass());
+ }
}
}
@@ -162,6 +299,12 @@
}
}
+ private int createReference(Object o, Map<Object, Integer> refMap)
+ {
+ int reference = refMap.size();
+ refMap.put(o, reference);
+ return reference;
+ }
private void marshallGlobalTransaction(GlobalTransaction globalTransaction,
ObjectOutputStream out, Map<Object, Integer> refMap) throws Exception
{
@@ -787,12 +930,20 @@
public ByteBuffer objectToBuffer(Object o) throws Exception
{
- throw new RuntimeException("Needs to be overridden!");
+ ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(128);
+ ObjectOutputStream out = new ObjectOutputStream(baos);
+
+ //now marshall the contents of the object
+ objectToObjectStream(o, out);
+ out.close();
+ // and return bytes.
+ return new ByteBuffer(baos.getRawBuffer(), 0, baos.size());
}
public Object objectFromByteBuffer(byte[] buf, int offset, int length) throws
Exception
{
- throw new RuntimeException("Needs to be overridden!");
+ ObjectInputStream in = new MarshalledValueInputStream(new ByteArrayInputStream(buf,
offset, length));
+ return objectFromObjectStream(in);
}
public Object objectFromByteBuffer(byte[] bytes) throws Exception
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java 2008-10-15
18:13:06 UTC (rev 6959)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/marshall/CommandAwareRpcDispatcher.java 2008-10-15
19:41:25 UTC (rev 6960)
@@ -130,12 +130,6 @@
protected boolean isValid(Message req)
{
- if (server_obj == null)
- {
- log.error("no method handler is registered. Discarding request.");
- return false;
- }
-
if (req == null || req.getLength() == 0)
{
log.error("message or message buffer is null");
@@ -160,8 +154,8 @@
}
if (trace)
- log.trace(new StringBuilder("dests=").append(dests).append(",
command=").append(command).
- append(", mode=").append(mode).append(",
timeout=").append(timeout));
+ log.trace(new StringBuilder("dests=").append(dests).append(",
command=").append(command).append(", mode=").
+ append(mode).append(", timeout=").append(timeout));
ReplicationTask replicationTask = new ReplicationTask(command, oob, dests, mode,
timeout, false, filter);
Future<RspList> response = replicationProcessor.submit(replicationTask);
@@ -292,7 +286,6 @@
// a null response is 99% likely to be due to a marshalling problem - we throw a
NSE, this needs to be changed when
// JGroups supports
http://jira.jboss.com/jira/browse/JGRP-193
// the serialization problem could be on the remote end and this is why we
cannot catch this above, when marshalling.
-
if (retval == null)
throw new NotSerializableException("RpcDispatcher returned a null. This
is most often caused by args for " + command.getClass().getSimpleName() + " not
being serializable.");
return retval;
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/notifications/Notifier.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/notifications/Notifier.java 2008-10-15
18:13:06 UTC (rev 6959)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/notifications/Notifier.java 2008-10-15
19:41:25 UTC (rev 6960)
@@ -120,15 +120,11 @@
/**
* Adds a cache listener to the list of cache listeners registered.
- *
- * @param listener
*/
void addCacheListener(Object listener);
/**
* Removes a cache listener from the list of cache listeners registered.
- *
- * @param listener
*/
void removeCacheListener(Object listener);
@@ -136,4 +132,6 @@
* @return Retrieves an (unmodifiable) set of cache listeners registered.
*/
Set<Object> getCacheListeners();
+
+ void notifyNodeInvalidated(Object key, boolean pre, InvocationContext ctx);
}
\ No newline at end of file
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java 2008-10-15
18:13:06 UTC (rev 6959)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java 2008-10-15
19:41:25 UTC (rev 6960)
@@ -639,4 +639,9 @@
if (list == null) throw new CacheException("Unknown listener annotation:
" + annotation);
return list;
}
+
+ public void notifyNodeInvalidated(Object key, boolean pre, InvocationContext ctx)
+ {
+ throw new UnsupportedOperationException("Not implemented");//todo please
implement!
+ }
}
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java 2008-10-15
18:13:06 UTC (rev 6959)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java 2008-10-15
19:41:25 UTC (rev 6960)
@@ -115,13 +115,12 @@
public void setupDependencies(ChannelMessageListener messageListener, Configuration
configuration, Notifier notifier,
ExtendedMarshaller extendedMarshaller, TransactionTable
txTable,
TransactionManager txManager, InvocationContextContainer
container, InterceptorChain interceptorChain,
- ComponentRegistry componentRegistry, LockManager
lockManager)
+ ComponentRegistry componentRegistry, LockManager
lockManager, CacheSPI spi)
{
this.messageListener = messageListener;
this.configuration = configuration;
this.notifier = notifier;
- // TODO: Inject cacheSPI when we are ready
-// this.spi = spi;
+ this.spi = spi;
this.extendedMarshaller = extendedMarshaller;
this.txManager = txManager;
this.txTable = txTable;
@@ -392,7 +391,6 @@
throw new TimeoutException("State retrieval timed out waiting for
flush unblock.");
}
useOutOfBandMessage = false;
- // todo fix me!!
RspList rsps = rpcDispatcher.invokeRemoteCommands(recipients, command,
modeToUse, timeout, useOutOfBandMessage, responseFilter);
if (mode == GroupRequest.GET_NONE) return Collections.emptyList();// async case
if (trace)
Modified: core/branches/flat/src/test/java/org/jboss/starobrno/BasicTest.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/BasicTest.java 2008-10-15
18:13:06 UTC (rev 6959)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/BasicTest.java 2008-10-15
19:41:25 UTC (rev 6960)
@@ -23,7 +23,10 @@
import org.jboss.starobrno.config.Configuration;
import org.jboss.starobrno.manager.CacheManager;
+import org.jboss.starobrno.util.TestingUtil;
import org.testng.annotations.Test;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
@Test(groups = "functional")
public class BasicTest
@@ -62,6 +65,7 @@
}
}
+ public static final Log log = LogFactory.getLog(BasicTest.class);
public void testBasicReplication()
{
Configuration configuration = new Configuration();
@@ -75,10 +79,14 @@
firstManager.start();
secondManager.start();
- Cache firstCache = firstManager.createCache("test");
- Cache secondCache = firstManager.createCache("test");
+ CacheSPI firstCache = (CacheSPI) firstManager.createCache("test");
+ CacheSPI secondCache = (CacheSPI) secondManager.createCache("test");
+ TestingUtil.blockUntilViewReceived(secondCache, 2, 3000);
+
+
firstCache.put("key","value");
+
assert secondCache.get("key").equals("value");
assert firstCache.get("key").equals("value");
secondCache.put("key", "value2");
Modified: core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java 2008-10-15
18:13:06 UTC (rev 6959)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java 2008-10-15
19:41:25 UTC (rev 6960)
@@ -324,7 +324,7 @@
public static boolean isCacheViewComplete(Cache c, int memberCount, boolean
barfIfTooManyMembers)
{
CacheSPI cache = (CacheSPI) c;
- List members = cache.getRPCManager().getMembers();
+ List members = cache.getMembers();
if (members == null || memberCount > members.size())
{
return false;
Modified: core/branches/flat/src/test/resources/log4j.xml
===================================================================
--- core/branches/flat/src/test/resources/log4j.xml 2008-10-15 18:13:06 UTC (rev 6959)
+++ core/branches/flat/src/test/resources/log4j.xml 2008-10-15 19:41:25 UTC (rev 6960)
@@ -49,6 +49,10 @@
<priority value="TRACE"/>
</category>
+ <category name="org.jboss.starobrno.factories.ComponentRegistry">
+ <priority value="WARN"/>
+ </category>
+
<category name="org.jboss.cache.factories">
<priority value="TRACE"/>
</category>