[jboss-cvs] JBossAS SVN: r105447 - in projects/cluster/ha-server-cache-jbc/trunk: src/main/java/org/jboss/ha/cachemanager and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon May 31 23:06:45 EDT 2010
Author: bstansberry at jboss.com
Date: 2010-05-31 23:06:45 -0400 (Mon, 31 May 2010)
New Revision: 105447
Added:
projects/cluster/ha-server-cache-jbc/trunk/src/main/java/org/jboss/ha/cachemanager/MuxHandlerChannel.java
Modified:
projects/cluster/ha-server-cache-jbc/trunk/pom.xml
projects/cluster/ha-server-cache-jbc/trunk/src/main/java/org/jboss/ha/cachemanager/CacheManager.java
projects/cluster/ha-server-cache-jbc/trunk/src/main/java/org/jboss/ha/framework/server/HAPartitionCacheHandlerImpl.java
Log:
Utilize ha-server-core MuxRpcDispatcher stuff
Modified: projects/cluster/ha-server-cache-jbc/trunk/pom.xml
===================================================================
--- projects/cluster/ha-server-cache-jbc/trunk/pom.xml 2010-06-01 03:05:10 UTC (rev 105446)
+++ projects/cluster/ha-server-cache-jbc/trunk/pom.xml 2010-06-01 03:06:45 UTC (rev 105447)
@@ -32,6 +32,7 @@
<properties>
<version.jboss.ha.server.cache.spi>2.2.0.Final</version.jboss.ha.server.cache.spi>
<version.jboss.ha.server.api>2.0.0.Alpha3</version.jboss.ha.server.api>
+ <version.jboss.ha.server.core>1.0.0-SNAPSHOT</version.jboss.ha.server.core>
<version.jboss.common.core>2.2.17.GA</version.jboss.common.core>
<version.jboss.logging.spi>2.0.5.GA</version.jboss.logging.spi>
<version.jboss.cache>3.2.5.GA</version.jboss.cache>
@@ -105,6 +106,12 @@
</dependency>
<dependency>
+ <groupId>org.jboss.cluster</groupId>
+ <artifactId>jboss-ha-server-core</artifactId>
+ <version>${version.jboss.ha.server.core}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.jboss</groupId>
<artifactId>jboss-common-core</artifactId>
<version>${version.jboss.common.core}</version>
Modified: projects/cluster/ha-server-cache-jbc/trunk/src/main/java/org/jboss/ha/cachemanager/CacheManager.java
===================================================================
--- projects/cluster/ha-server-cache-jbc/trunk/src/main/java/org/jboss/ha/cachemanager/CacheManager.java 2010-06-01 03:05:10 UTC (rev 105446)
+++ projects/cluster/ha-server-cache-jbc/trunk/src/main/java/org/jboss/ha/cachemanager/CacheManager.java 2010-06-01 03:06:45 UTC (rev 105447)
@@ -45,6 +45,7 @@
import org.jboss.cache.DefaultCacheFactory;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.ConfigurationRegistry;
+import org.jboss.cache.config.RuntimeConfig;
import org.jboss.cache.config.XmlParsingConfigurationRegistry;
import org.jboss.cache.notifications.annotation.CacheListener;
import org.jboss.cache.notifications.annotation.CacheStarted;
@@ -59,7 +60,9 @@
import org.jboss.ha.framework.server.PojoCacheManagerLocator;
import org.jboss.logging.Logger;
import org.jboss.util.naming.NonSerializableFactory;
+import org.jgroups.Channel;
import org.jgroups.ChannelFactory;
+import org.jgroups.JChannel;
/**
* JBoss AS specific implementation of {@link CacheManager}. Extends the core JBoss Cache
@@ -613,9 +616,11 @@
* @param config the Configuration for the cache
* @return the PojoCache
*/
- protected PojoCache createPojoCache(Configuration config)
+ protected PojoCache createPojoCache(Configuration config) throws Exception
{
- return PojoCacheFactory.createCache(config, false);
+ PojoCache pc = PojoCacheFactory.createCache(config, false);
+ configureMuxUpHandlerChannel(pc.getCache());
+ return pc;
}
/**
@@ -626,9 +631,11 @@
* @param config the Configuration for the cache
* @return the Cache
*/
- protected Cache<Object, Object> createCache(Configuration config)
+ protected Cache<Object, Object> createCache(Configuration config) throws Exception
{
- return new DefaultCacheFactory<Object, Object>().createCache(config, false);
+ Cache<Object, Object> cache = new DefaultCacheFactory<Object, Object>().createCache(config, false);
+ configureMuxUpHandlerChannel(cache);
+ return cache;
}
@@ -817,6 +824,44 @@
return base + ":" + attributesBase + configName;
}
+   /**
+    * Ensures the given cache's {@link RuntimeConfig} holds a
+    * {@link MuxHandlerChannel}, so that whatever <code>UpHandler</code> is
+    * later set on the channel gets wrapped in a <code>MuxUpHandler</code>.
+    * <p>
+    * If no channel has been injected, one is created from the injected
+    * <code>ChannelFactory</code> using the configured mux stack name and the
+    * cluster name, then wrapped and stored in the runtime config. If a
+    * channel has been injected and has no <code>UpHandler</code> yet, it is
+    * wrapped in place. In every other case the configuration is left as it
+    * was found and, where useful, the reason is logged at debug level.
+    *
+    * @param cache the newly created, not yet started cache
+    * @throws Exception if the channel cannot be created or wrapped
+    */
+   private void configureMuxUpHandlerChannel(Cache<?, ?> cache) throws Exception
+   {
+      Configuration config = cache.getConfiguration();
+      RuntimeConfig rc = config.getRuntimeConfig();
+      Channel channel = rc.getChannel();
+
+      if (channel instanceof MuxHandlerChannel)
+      {
+         // Already wrapped. Wrapping again would put one MuxUpHandler
+         // inside another once an UpHandler is set.
+         return;
+      }
+
+      if (channel == null)
+      {
+         // TODO we could deal with other JBC mechanisms of configuring Channels, but in
+         // reality the AS use cases that want MuxUpHandler shouldn't configure their
+         // JBC that way
+         ChannelFactory cf = rc.getMuxChannelFactory();
+         if (cf == null)
+         {
+            log.debug("Cache " + config.getClusterName() +
+                  " does not have a ChannelFactory injected so MuxUpHandler cannot be integrated");
+         }
+         String stack = config.getMuxStackName();
+         if (stack == null)
+         {
+            log.debug("Cache " + config.getClusterName() +
+                  " does not have a MuxStackName configured so MuxUpHandler cannot be integrated");
+         }
+         if (cf != null && stack != null)
+         {
+            // This doesn't result in JMX reg of channel
+            //channel = cf.createChannel(stack);
+            channel = cf.createMultiplexerChannel(stack, config.getClusterName());
+            if (channel instanceof JChannel)
+            {
+               rc.setChannel(new MuxHandlerChannel((JChannel) channel));
+            }
+            else
+            {
+               // MuxHandlerChannel can only wrap a JChannel. Hand the channel we
+               // just created to the cache unwrapped rather than dropping it.
+               log.debug("Cache " + config.getClusterName() +
+                     " was given a Channel that is not a JChannel so MuxUpHandler cannot be integrated");
+               rc.setChannel(channel);
+            }
+         }
+      }
+      else if (channel.getUpHandler() == null)
+      {
+         if (channel instanceof JChannel)
+         {
+            // replace
+            rc.setChannel(new MuxHandlerChannel((JChannel) channel));
+         }
+         else
+         {
+            // Leave the injected channel alone instead of failing on the cast
+            log.debug("Cache " + config.getClusterName() +
+                  " has an injected Channel that is not a JChannel so MuxUpHandler cannot be integrated");
+         }
+      }
+      // else the Channel was injected and already had a handler -- shouldn't happen
+   }
+
@CacheListener
public static class StartStopListener
{
Added: projects/cluster/ha-server-cache-jbc/trunk/src/main/java/org/jboss/ha/cachemanager/MuxHandlerChannel.java
===================================================================
--- projects/cluster/ha-server-cache-jbc/trunk/src/main/java/org/jboss/ha/cachemanager/MuxHandlerChannel.java (rev 0)
+++ projects/cluster/ha-server-cache-jbc/trunk/src/main/java/org/jboss/ha/cachemanager/MuxHandlerChannel.java 2010-06-01 03:06:45 UTC (rev 105447)
@@ -0,0 +1,543 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.ha.cachemanager;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.jboss.ha.core.jgroups.blocks.mux.MuxUpHandler;
+import org.jgroups.Address;
+import org.jgroups.ChannelClosedException;
+import org.jgroups.ChannelException;
+import org.jgroups.ChannelListener;
+import org.jgroups.ChannelNotConnectedException;
+import org.jgroups.Event;
+import org.jgroups.JChannel;
+import org.jgroups.Message;
+import org.jgroups.Receiver;
+import org.jgroups.TimeoutException;
+import org.jgroups.UpHandler;
+import org.jgroups.View;
+import org.jgroups.logging.Log;
+import org.jgroups.logging.LogFactory;
+import org.jgroups.stack.ProtocolStack;
+
+/**
+ * JGroups <code>JChannel</code> subclass that delegates everything to a
+ * wrapped ordinary <code>JChannel</code> except for calls related to the
+ * {@link #setUpHandler(UpHandler) upHandler} property.
+ *
+ * @author Brian Stansberry
+ *
+ * @version $Revision$
+ */
+public class MuxHandlerChannel extends JChannel
+{
+ private final Log log=LogFactory.getLog(MuxHandlerChannel.class);
+ private final JChannel delegate;
+ private MuxUpHandler upHandler;
+
+ public MuxHandlerChannel(JChannel delegate) throws ChannelException
+ {
+ super(false);
+ if (delegate.getUpHandler() != null)
+ {
+ throw new IllegalStateException("delegate Channel already has an UpHandler");
+ }
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void blockOk()
+ {
+ delegate.blockOk();
+ }
+
+ @Override
+ public void close()
+ {
+ delegate.close();
+ }
+
+ @Override
+ public void connect(String clusterName) throws ChannelException
+ {
+ delegate.connect(clusterName);
+ }
+
+ @Override
+ public void connect(String clusterName, Address target, String stateId, long timeout) throws ChannelException
+ {
+ delegate.connect(clusterName, target, stateId, timeout);
+ }
+
+ @Override
+ public void disconnect()
+ {
+ delegate.disconnect();
+ }
+
+ @Override
+ public Map<String, Object> dumpStats()
+ {
+ return delegate.dumpStats();
+ }
+
+ @Override
+ public boolean flushSupported()
+ {
+ return delegate.flushSupported();
+ }
+
+ @Override
+ public Address getAddress()
+ {
+ return delegate.getAddress();
+ }
+
+ @Override
+ public boolean getAllStates(Vector targets, long timeout) throws ChannelNotConnectedException,
+ ChannelClosedException
+ {
+ return delegate.getAllStates(targets, timeout);
+ }
+
+ @Override
+ public String getChannelName()
+ {
+ return delegate.getChannelName();
+ }
+
+ @Override
+ public String getClusterName()
+ {
+ return delegate.getClusterName();
+ }
+
+ @Override
+ public Map<String, Object> getInfo()
+ {
+ return delegate.getInfo();
+ }
+
+ @Override
+ public Address getLocalAddress()
+ {
+ return delegate.getLocalAddress();
+ }
+
+ @Override
+ protected Log getLog()
+ {
+ return log;
+ }
+
+ @Override
+ public String getName()
+ {
+ return delegate.getName();
+ }
+
+ @Override
+ public String getName(Address member)
+ {
+ return delegate.getName(member);
+ }
+
+ @Override
+ public Object getOpt(int option)
+ {
+ return delegate.getOpt(option);
+ }
+
+ @Override
+ public ProtocolStack getProtocolStack()
+ {
+ return delegate.getProtocolStack();
+ }
+
+ @Override
+ public boolean getState(Address target, long timeout) throws ChannelNotConnectedException, ChannelClosedException
+ {
+ return delegate.getState(target, timeout);
+ }
+
+ @Override
+ public boolean getState(Address target, String stateId, long timeout) throws ChannelNotConnectedException,
+ ChannelClosedException
+ {
+ return delegate.getState(target, stateId, timeout);
+ }
+
+ @Override
+ public View getView()
+ {
+ return delegate.getView();
+ }
+
+ @Override
+ public boolean isConnected()
+ {
+ return delegate.isConnected();
+ }
+
+ @Override
+ public boolean isOpen()
+ {
+ return delegate.isOpen();
+ }
+
+ @Override
+ public Object peek(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException
+ {
+ return delegate.peek(timeout);
+ }
+
+ @Override
+ public Object receive(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException
+ {
+ return delegate.receive(timeout);
+ }
+
+ @Override
+ public void returnState(byte[] state)
+ {
+ delegate.returnState(state);
+ }
+
+ @Override
+ public void returnState(byte[] state, String stateId)
+ {
+ delegate.returnState(state, stateId);
+ }
+
+ @Override
+ public void send(Message msg) throws ChannelNotConnectedException, ChannelClosedException
+ {
+ delegate.send(msg);
+ }
+
+ @Override
+ public void send(Address dst, Address src, Serializable obj) throws ChannelNotConnectedException,
+ ChannelClosedException
+ {
+ delegate.send(dst, src, obj);
+ }
+
+ @Override
+ public void send(Address dst, Address src, byte[] buf) throws ChannelNotConnectedException, ChannelClosedException
+ {
+ delegate.send(dst, src, buf);
+ }
+
+ @Override
+ public void send(Address dst, Address src, byte[] buf, int offset, int length) throws ChannelNotConnectedException,
+ ChannelClosedException
+ {
+ delegate.send(dst, src, buf, offset, length);
+ }
+
+ @Override
+ public void setInfo(String key, Object value)
+ {
+ delegate.setInfo(key, value);
+ }
+
+ @Override
+ public void setName(String name)
+ {
+ delegate.setName(name);
+ }
+
+ @Override
+ public void setOpt(int option, Object value)
+ {
+ delegate.setOpt(option, value);
+ }
+
+ @Override
+ public void shutdown()
+ {
+ delegate.shutdown();
+ }
+
+ @Override
+ public boolean startFlush(boolean automaticResume)
+ {
+ return delegate.startFlush(automaticResume);
+ }
+
+ @Override
+ public boolean startFlush(List<Address> flushParticipants, boolean automaticResume)
+ {
+ return delegate.startFlush(flushParticipants, automaticResume);
+ }
+
+ @Override
+ public boolean startFlush(long timeout, boolean automaticResume)
+ {
+ return delegate.startFlush(timeout, automaticResume);
+ }
+
+ @Override
+ public void stopFlush()
+ {
+ delegate.stopFlush();
+ }
+
+ @Override
+ public void stopFlush(List<Address> flushParticipants)
+ {
+ delegate.stopFlush(flushParticipants);
+ }
+
+ @Override
+ public synchronized void addChannelListener(ChannelListener listener)
+ {
+ delegate.addChannelListener(listener);
+ }
+
+ @Override
+ public synchronized void clearChannelListeners()
+ {
+ delegate.clearChannelListeners();
+ }
+
+ @Override
+ public void down(Event evt)
+ {
+ delegate.down(evt);
+ }
+
+ @Override
+ public Object downcall(Event evt)
+ {
+ return delegate.downcall(evt);
+ }
+
+ @Override
+ public String dumpQueue()
+ {
+ return delegate.dumpQueue();
+ }
+
+ @Override
+ public int getNumMessages()
+ {
+ return delegate.getNumMessages();
+ }
+
+ @Override
+ public String getProperties()
+ {
+ return delegate.getProperties();
+ }
+
+ @Override
+ public Receiver getReceiver()
+ {
+ return delegate.getReceiver();
+ }
+
+ @Override
+ public UpHandler getUpHandler()
+ {
+ return delegate.getUpHandler();
+ }
+
+ @Override
+ public void open() throws ChannelException
+ {
+ delegate.open();
+ }
+
+ @Override
+ public synchronized void removeChannelListener(ChannelListener listener)
+ {
+ delegate.removeChannelListener(listener);
+ }
+
+ @Override
+ public void setChannelListener(ChannelListener channelListener)
+ {
+ delegate.setChannelListener(channelListener);
+ }
+
+ @Override
+ public void setReceiver(Receiver r)
+ {
+ delegate.setReceiver(r);
+ }
+
+ @Override
+ public void setUpHandler(UpHandler upHandler)
+ {
+ if (this.upHandler != null)
+ {
+ throw new IllegalStateException("UpHandler already set");
+ }
+ this.upHandler = new MuxUpHandler(upHandler);
+ delegate.setUpHandler(this.upHandler);
+ }
+
+
+
+
+
+
+
+
+
+
+
+ @Override
+ public synchronized void connect(String arg0, Address arg1, String arg2, long arg3, boolean arg4)
+ throws ChannelException
+ {
+ delegate.connect(arg0, arg1, arg2, arg3, arg4);
+ }
+
+ @Override
+ public synchronized void connect(String arg0, boolean arg1) throws ChannelException
+ {
+ delegate.connect(arg0, arg1);
+ }
+
+ @Override
+ public Map<String, Object> dumpStats(String protocolName)
+ {
+ return delegate.dumpStats(protocolName);
+ }
+
+ @Override
+ public String dumpTimerQueue()
+ {
+ return delegate.dumpTimerQueue();
+ }
+
+ @Override
+ public void enableStats(boolean stats)
+ {
+ delegate.enableStats(stats);
+ }
+
+ @Override
+ public String getAddressAsString()
+ {
+ return delegate.getAddressAsString();
+ }
+
+ @Override
+ public String getAddressAsUUID()
+ {
+ return delegate.getAddressAsUUID();
+ }
+
+ @Override
+ public int getNumberOfTasksInTimer()
+ {
+ return delegate.getNumberOfTasksInTimer();
+ }
+
+ @Override
+ public long getReceivedBytes()
+ {
+ return delegate.getReceivedBytes();
+ }
+
+ @Override
+ public long getReceivedMessages()
+ {
+ return delegate.getReceivedMessages();
+ }
+
+ @Override
+ public long getSentBytes()
+ {
+ return delegate.getSentBytes();
+ }
+
+ @Override
+ public long getSentMessages()
+ {
+ return delegate.getSentMessages();
+ }
+
+ @Override
+ public boolean getState(Address target, String stateId, long timeout, boolean useFlushIfPresent)
+ throws ChannelNotConnectedException, ChannelClosedException
+ {
+ return delegate.getState(target, stateId, timeout, useFlushIfPresent);
+ }
+
+ @Override
+ public int getTimerThreads()
+ {
+ return delegate.getTimerThreads();
+ }
+
+ @Override
+ public String getViewAsString()
+ {
+ return delegate.getViewAsString();
+ }
+
+ @Override
+ public String printProtocolSpec(boolean includeProperties)
+ {
+ return delegate.printProtocolSpec(includeProperties);
+ }
+
+ @Override
+ public void resetStats()
+ {
+ delegate.resetStats();
+ }
+
+ @Override
+ public boolean statsEnabled()
+ {
+ return delegate.statsEnabled();
+ }
+
+ @Override
+ public String toString(boolean details)
+ {
+ return delegate.toString(details);
+ }
+
+ @Override
+ public Object up(Event arg0)
+ {
+ return delegate.up(arg0);
+ }
+
+ public MuxUpHandler getMuxUpHandler()
+ {
+ return upHandler;
+ }
+
+
+}
Modified: projects/cluster/ha-server-cache-jbc/trunk/src/main/java/org/jboss/ha/framework/server/HAPartitionCacheHandlerImpl.java
===================================================================
--- projects/cluster/ha-server-cache-jbc/trunk/src/main/java/org/jboss/ha/framework/server/HAPartitionCacheHandlerImpl.java 2010-06-01 03:05:10 UTC (rev 105446)
+++ projects/cluster/ha-server-cache-jbc/trunk/src/main/java/org/jboss/ha/framework/server/HAPartitionCacheHandlerImpl.java 2010-06-01 03:06:45 UTC (rev 105447)
@@ -28,6 +28,7 @@
import org.jboss.cache.CacheManager;
import org.jboss.cache.CacheStatus;
import org.jboss.ha.framework.server.spi.HAPartitionCacheHandler;
+import org.jgroups.Channel;
import org.jgroups.ChannelFactory;
/**
@@ -41,6 +42,7 @@
private String cacheConfigName;
private Cache<Object, Object> cache;
private AtomicInteger acquireCount = new AtomicInteger();
+ private boolean startCacheInStart = false;
// CacheHandler ------------------------------------------------------------
@@ -103,6 +105,19 @@
return cache.getConfiguration().getRuntimeConfig().getMuxChannelFactory();
}
+   /**
+    * Gets the JGroups channel held by the acquired cache's runtime
+    * configuration.
+    *
+    * @return the channel from the cache's <code>RuntimeConfig</code>
+    * @throws IllegalStateException if no cache has been acquired yet, or the
+    *                               acquired cache is not in the
+    *                               <code>STARTED</code> state
+    */
+   public synchronized Channel getCacheChannel()
+   {
+      final Cache<Object, Object> current = cache;
+      if (current == null)
+      {
+         throw new IllegalStateException("Must acquire cache before getting cache channel");
+      }
+
+      final boolean started = (current.getCacheStatus() == CacheStatus.STARTED);
+      if (!started)
+      {
+         throw new IllegalStateException("Must start cache before getting cache channel");
+      }
+
+      return current.getConfiguration().getRuntimeConfig().getChannel();
+   }
+
// Properties -------------------------------------------------------------
public synchronized void setCacheConfigName(String cacheConfigName)
@@ -124,7 +139,39 @@
{
return cache;
}
+
+   /**
+    * Reports whether {@link #start()} acquires and starts the cache itself.
+    * <p>
+    * Synchronized like this class's other accessors, because the flag is
+    * read by <code>start()</code> and <code>stop()</code> while they hold
+    * this object's monitor.
+    *
+    * @return <code>true</code> if the cache is started in
+    *         <code>start()</code>; <code>false</code> (the default) otherwise
+    */
+   public synchronized boolean isStartCacheInStart()
+   {
+      return startCacheInStart;
+   }
+
+   /**
+    * Sets whether {@link #start()} should acquire and start the cache, and
+    * {@link #stop()} should release it.
+    * <p>
+    * Synchronized like this class's other accessors, because the flag is
+    * read by <code>start()</code> and <code>stop()</code> while they hold
+    * this object's monitor.
+    *
+    * @param startCacheInStart <code>true</code> to have the cache started
+    *                          in <code>start()</code>
+    */
+   public synchronized void setStartCacheInStart(boolean startCacheInStart)
+   {
+      this.startCacheInStart = startCacheInStart;
+   }
+ // Public ------------------------------------------------------------------
+
+   /**
+    * Lifecycle start. Does nothing unless <code>startCacheInStart</code> is
+    * set; otherwise acquires the cache if none is currently held and then
+    * starts it.
+    *
+    * @throws Exception if acquiring or starting the cache fails
+    */
+   public synchronized void start() throws Exception
+   {
+      if (!startCacheInStart)
+      {
+         return;
+      }
+
+      // Only acquire when nothing has been acquired already
+      if (cache == null)
+      {
+         acquireCache();
+      }
+      startCache();
+   }
+
+   /**
+    * Lifecycle stop. Does nothing unless <code>startCacheInStart</code> is
+    * set; otherwise releases the cache.
+    * <p>
+    * NOTE(review): the release happens unconditionally, whereas
+    * <code>start()</code> only calls <code>acquireCache()</code> when no
+    * cache was held at that point -- confirm that acquire and release
+    * calls stay balanced in that case.
+    *
+    * @throws Exception if releasing the cache fails
+    */
+   public synchronized void stop() throws Exception
+   {
+      if (!startCacheInStart)
+      {
+         return;
+      }
+      releaseCache();
+   }
+
// Private -----------------------------------------------------------------
private void validateState()
More information about the jboss-cvs-commits
mailing list