[jboss-cvs] JBossAS SVN: r105447 - in projects/cluster/ha-server-cache-jbc/trunk: src/main/java/org/jboss/ha/cachemanager and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon May 31 23:06:45 EDT 2010


Author: bstansberry at jboss.com
Date: 2010-05-31 23:06:45 -0400 (Mon, 31 May 2010)
New Revision: 105447

Added:
   projects/cluster/ha-server-cache-jbc/trunk/src/main/java/org/jboss/ha/cachemanager/MuxHandlerChannel.java
Modified:
   projects/cluster/ha-server-cache-jbc/trunk/pom.xml
   projects/cluster/ha-server-cache-jbc/trunk/src/main/java/org/jboss/ha/cachemanager/CacheManager.java
   projects/cluster/ha-server-cache-jbc/trunk/src/main/java/org/jboss/ha/framework/server/HAPartitionCacheHandlerImpl.java
Log:
Utilize ha-server-core MuxRpcDispatcher stuff

Modified: projects/cluster/ha-server-cache-jbc/trunk/pom.xml
===================================================================
--- projects/cluster/ha-server-cache-jbc/trunk/pom.xml	2010-06-01 03:05:10 UTC (rev 105446)
+++ projects/cluster/ha-server-cache-jbc/trunk/pom.xml	2010-06-01 03:06:45 UTC (rev 105447)
@@ -32,6 +32,7 @@
   <properties>
     <version.jboss.ha.server.cache.spi>2.2.0.Final</version.jboss.ha.server.cache.spi>
     <version.jboss.ha.server.api>2.0.0.Alpha3</version.jboss.ha.server.api>
+    <version.jboss.ha.server.core>1.0.0-SNAPSHOT</version.jboss.ha.server.core>
     <version.jboss.common.core>2.2.17.GA</version.jboss.common.core>
     <version.jboss.logging.spi>2.0.5.GA</version.jboss.logging.spi>
     <version.jboss.cache>3.2.5.GA</version.jboss.cache>
@@ -105,6 +106,12 @@
     </dependency>
     
     <dependency>
+      <groupId>org.jboss.cluster</groupId>
+      <artifactId>jboss-ha-server-core</artifactId>
+      <version>${version.jboss.ha.server.core}</version>
+    </dependency>
+    
+    <dependency>
       <groupId>org.jboss</groupId>
       <artifactId>jboss-common-core</artifactId>
       <version>${version.jboss.common.core}</version>

Modified: projects/cluster/ha-server-cache-jbc/trunk/src/main/java/org/jboss/ha/cachemanager/CacheManager.java
===================================================================
--- projects/cluster/ha-server-cache-jbc/trunk/src/main/java/org/jboss/ha/cachemanager/CacheManager.java	2010-06-01 03:05:10 UTC (rev 105446)
+++ projects/cluster/ha-server-cache-jbc/trunk/src/main/java/org/jboss/ha/cachemanager/CacheManager.java	2010-06-01 03:06:45 UTC (rev 105447)
@@ -45,6 +45,7 @@
 import org.jboss.cache.DefaultCacheFactory;
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.config.ConfigurationRegistry;
+import org.jboss.cache.config.RuntimeConfig;
 import org.jboss.cache.config.XmlParsingConfigurationRegistry;
 import org.jboss.cache.notifications.annotation.CacheListener;
 import org.jboss.cache.notifications.annotation.CacheStarted;
@@ -59,7 +60,9 @@
 import org.jboss.ha.framework.server.PojoCacheManagerLocator;
 import org.jboss.logging.Logger;
 import org.jboss.util.naming.NonSerializableFactory;
+import org.jgroups.Channel;
 import org.jgroups.ChannelFactory;
+import org.jgroups.JChannel;
 
 /**
  * JBoss AS specific implementation of {@link CacheManager}. Extends the core JBoss Cache
@@ -613,9 +616,11 @@
     * @param config the Configuration for the cache
     * @return the PojoCache
     */
-   protected PojoCache createPojoCache(Configuration config)
+   protected PojoCache createPojoCache(Configuration config) throws Exception
    {
-       return PojoCacheFactory.createCache(config, false);
+      PojoCache pc =  PojoCacheFactory.createCache(config, false);
+      configureMuxUpHandlerChannel(pc.getCache());
+      return pc;
    }
 
    /**
@@ -626,9 +631,11 @@
     * @param config the Configuration for the cache
     * @return the Cache
     */
-   protected Cache<Object, Object> createCache(Configuration config)
+   protected Cache<Object, Object> createCache(Configuration config) throws Exception
    {
-      return new DefaultCacheFactory<Object, Object>().createCache(config, false);
+      Cache<Object, Object> cache = new DefaultCacheFactory<Object, Object>().createCache(config, false);
+      configureMuxUpHandlerChannel(cache);
+      return cache;
    }
    
    
@@ -817,6 +824,44 @@
       return base + ":" + attributesBase + configName;
    }
    
+   private void configureMuxUpHandlerChannel(Cache<?, ?> cache) throws Exception
+   {
+      RuntimeConfig rc = cache.getConfiguration().getRuntimeConfig();
+      Channel channel = rc.getChannel();
+      
+         if (channel == null)      
+         {
+            // TODO we could deal with other JBC mechanisms of configuring Channels, but in
+            // reality the AS use cases that want MuxUpHandler shouldn't configure their
+            // JBC that way
+            ChannelFactory cf = rc.getMuxChannelFactory();
+            if (cf == null)
+            {
+               log.debug("Cache " + cache.getConfiguration().getClusterName() + 
+                     " does not have a ChannelFactory injected so MuxUpHandler cannot be integrated");
+            }
+            String stack = cache.getConfiguration().getMuxStackName();
+            if (stack == null)
+            {
+               log.debug("Cache " + cache.getConfiguration().getClusterName() + 
+                     " does not have a MuxStackName configured so MuxUpHandler cannot be integrated");
+            }
+            if (cf != null && stack != null)
+            {
+               // This doesn't result in JMX reg of channel               
+               //channel = cf.createChannel(stack);
+               channel = cf.createMultiplexerChannel(stack, cache.getConfiguration().getClusterName());
+               rc.setChannel(new MuxHandlerChannel((JChannel) channel));
+            }
+         }
+         else if (channel.getUpHandler() == null)
+         {
+            // replace
+            rc.setChannel(new MuxHandlerChannel((JChannel) channel));
+         }
+         // else the Channel was injected and already had a handler -- shouldn't happen
+   }
+   
    @CacheListener
    public static class StartStopListener
    {

Added: projects/cluster/ha-server-cache-jbc/trunk/src/main/java/org/jboss/ha/cachemanager/MuxHandlerChannel.java
===================================================================
--- projects/cluster/ha-server-cache-jbc/trunk/src/main/java/org/jboss/ha/cachemanager/MuxHandlerChannel.java	                        (rev 0)
+++ projects/cluster/ha-server-cache-jbc/trunk/src/main/java/org/jboss/ha/cachemanager/MuxHandlerChannel.java	2010-06-01 03:06:45 UTC (rev 105447)
@@ -0,0 +1,543 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.ha.cachemanager;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.jboss.ha.core.jgroups.blocks.mux.MuxUpHandler;
+import org.jgroups.Address;
+import org.jgroups.ChannelClosedException;
+import org.jgroups.ChannelException;
+import org.jgroups.ChannelListener;
+import org.jgroups.ChannelNotConnectedException;
+import org.jgroups.Event;
+import org.jgroups.JChannel;
+import org.jgroups.Message;
+import org.jgroups.Receiver;
+import org.jgroups.TimeoutException;
+import org.jgroups.UpHandler;
+import org.jgroups.View;
+import org.jgroups.logging.Log;
+import org.jgroups.logging.LogFactory;
+import org.jgroups.stack.ProtocolStack;
+
+/**
+ * JGroups <code>JChannel</code> subclass that delegates everything to a 
+ * wrapped ordinary <code>JChannel</code> except for calls related to the 
+ * {@link #setUpHandler(UpHandler) upHandler} property.
+ *
+ * @author Brian Stansberry
+ * 
+ * @version $Revision$
+ */
+public class MuxHandlerChannel extends JChannel
+{
+   private final Log log=LogFactory.getLog(MuxHandlerChannel.class);
+   private final JChannel delegate;
+   private MuxUpHandler upHandler;
+   
+   public MuxHandlerChannel(JChannel delegate) throws ChannelException
+   {
+      super(false);
+      if (delegate.getUpHandler() != null)
+      {
+         throw new IllegalStateException("delegate Channel already has an UpHandler");
+      }
+      this.delegate = delegate;
+   }
+   
+   @Override
+   public void blockOk()
+   {
+      delegate.blockOk();
+   }
+
+   @Override
+   public void close()
+   {
+      delegate.close();
+   }
+
+   @Override
+   public void connect(String clusterName) throws ChannelException
+   {
+      delegate.connect(clusterName);
+   }
+
+   @Override
+   public void connect(String clusterName, Address target, String stateId, long timeout) throws ChannelException
+   {
+      delegate.connect(clusterName, target, stateId, timeout);
+   }
+
+   @Override
+   public void disconnect()
+   {
+      delegate.disconnect();
+   }
+
+   @Override
+   public Map<String, Object> dumpStats()
+   {
+      return delegate.dumpStats();
+   }
+
+   @Override
+   public boolean flushSupported()
+   {
+      return delegate.flushSupported();
+   }
+
+   @Override
+   public Address getAddress()
+   {
+      return delegate.getAddress();
+   }
+
+   @Override
+   public boolean getAllStates(Vector targets, long timeout) throws ChannelNotConnectedException,
+         ChannelClosedException
+   {
+      return delegate.getAllStates(targets, timeout);
+   }
+
+   @Override
+   public String getChannelName()
+   {
+      return delegate.getChannelName();
+   }
+
+   @Override
+   public String getClusterName()
+   {
+      return delegate.getClusterName();
+   }
+
+   @Override
+   public Map<String, Object> getInfo()
+   {
+      return delegate.getInfo();
+   }
+
+   @Override
+   public Address getLocalAddress()
+   {
+      return delegate.getLocalAddress();
+   }
+
+   @Override
+   protected Log getLog()
+   {
+      return log;
+   }
+
+   @Override
+   public String getName()
+   {
+      return delegate.getName();
+   }
+
+   @Override
+   public String getName(Address member)
+   {
+      return delegate.getName(member);
+   }
+
+   @Override
+   public Object getOpt(int option)
+   {
+      return delegate.getOpt(option);
+   }
+
+   @Override
+   public ProtocolStack getProtocolStack()
+   {
+      return delegate.getProtocolStack();
+   }
+
+   @Override
+   public boolean getState(Address target, long timeout) throws ChannelNotConnectedException, ChannelClosedException
+   {
+      return delegate.getState(target, timeout);
+   }
+
+   @Override
+   public boolean getState(Address target, String stateId, long timeout) throws ChannelNotConnectedException,
+         ChannelClosedException
+   {
+      return delegate.getState(target, stateId, timeout);
+   }
+
+   @Override
+   public View getView()
+   {
+      return delegate.getView();
+   }
+
+   @Override
+   public boolean isConnected()
+   {
+      return delegate.isConnected();
+   }
+
+   @Override
+   public boolean isOpen()
+   {
+      return delegate.isOpen();
+   }
+
+   @Override
+   public Object peek(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException
+   {
+      return delegate.peek(timeout);
+   }
+
+   @Override
+   public Object receive(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException
+   {
+      return delegate.receive(timeout);
+   }
+
+   @Override
+   public void returnState(byte[] state)
+   {
+      delegate.returnState(state);
+   }
+
+   @Override
+   public void returnState(byte[] state, String stateId)
+   {
+      delegate.returnState(state, stateId);
+   }
+
+   @Override
+   public void send(Message msg) throws ChannelNotConnectedException, ChannelClosedException
+   {
+      delegate.send(msg);
+   }
+
+   @Override
+   public void send(Address dst, Address src, Serializable obj) throws ChannelNotConnectedException,
+         ChannelClosedException
+   {
+      delegate.send(dst, src, obj);
+   }
+
+   @Override
+   public void send(Address dst, Address src, byte[] buf) throws ChannelNotConnectedException, ChannelClosedException
+   {
+      delegate.send(dst, src, buf);
+   }
+
+   @Override
+   public void send(Address dst, Address src, byte[] buf, int offset, int length) throws ChannelNotConnectedException,
+         ChannelClosedException
+   {
+      delegate.send(dst, src, buf, offset, length);
+   }
+
+   @Override
+   public void setInfo(String key, Object value)
+   {
+      delegate.setInfo(key, value);
+   }
+
+   @Override
+   public void setName(String name)
+   {
+      delegate.setName(name);
+   }
+
+   @Override
+   public void setOpt(int option, Object value)
+   {
+      delegate.setOpt(option, value);
+   }
+
+   @Override
+   public void shutdown()
+   {
+      delegate.shutdown();
+   }
+
+   @Override
+   public boolean startFlush(boolean automaticResume)
+   {
+      return delegate.startFlush(automaticResume);
+   }
+
+   @Override
+   public boolean startFlush(List<Address> flushParticipants, boolean automaticResume)
+   {
+      return delegate.startFlush(flushParticipants, automaticResume);
+   }
+
+   @Override
+   public boolean startFlush(long timeout, boolean automaticResume)
+   {
+      return delegate.startFlush(timeout, automaticResume);
+   }
+
+   @Override
+   public void stopFlush()
+   {
+      delegate.stopFlush();
+   }
+
+   @Override
+   public void stopFlush(List<Address> flushParticipants)
+   {
+      delegate.stopFlush(flushParticipants);
+   }
+
+   @Override
+   public synchronized void addChannelListener(ChannelListener listener)
+   {
+      delegate.addChannelListener(listener);
+   }
+
+   @Override
+   public synchronized void clearChannelListeners()
+   {
+      delegate.clearChannelListeners();
+   }
+
+   @Override
+   public void down(Event evt)
+   {
+      delegate.down(evt);
+   }
+
+   @Override
+   public Object downcall(Event evt)
+   {
+      return delegate.downcall(evt);
+   }
+
+   @Override
+   public String dumpQueue()
+   {
+      return delegate.dumpQueue();
+   }
+
+   @Override
+   public int getNumMessages()
+   {
+      return delegate.getNumMessages();
+   }
+
+   @Override
+   public String getProperties()
+   {
+      return delegate.getProperties();
+   }
+
+   @Override
+   public Receiver getReceiver()
+   {
+      return delegate.getReceiver();
+   }
+
+   @Override
+   public UpHandler getUpHandler()
+   {
+      return delegate.getUpHandler();
+   }
+
+   @Override
+   public void open() throws ChannelException
+   {
+      delegate.open();
+   }
+
+   @Override
+   public synchronized void removeChannelListener(ChannelListener listener)
+   {
+      delegate.removeChannelListener(listener);
+   }
+
+   @Override
+   public void setChannelListener(ChannelListener channelListener)
+   {
+      delegate.setChannelListener(channelListener);
+   }
+
+   @Override
+   public void setReceiver(Receiver r)
+   {
+      delegate.setReceiver(r);
+   }
+
+   @Override
+   public void setUpHandler(UpHandler upHandler)
+   {
+      if (this.upHandler != null)
+      {
+         throw new IllegalStateException("UpHandler already set");
+      }
+      this.upHandler = new MuxUpHandler(upHandler);
+      delegate.setUpHandler(this.upHandler);
+   }
+   
+   
+   
+   
+   
+   
+   
+   
+   
+   
+   
+   @Override
+   public synchronized void connect(String arg0, Address arg1, String arg2, long arg3, boolean arg4)
+         throws ChannelException
+   {
+      delegate.connect(arg0, arg1, arg2, arg3, arg4);
+   }
+
+   @Override
+   public synchronized void connect(String arg0, boolean arg1) throws ChannelException
+   {
+      delegate.connect(arg0, arg1);
+   }
+
+   @Override
+   public Map<String, Object> dumpStats(String protocolName)
+   {
+      return delegate.dumpStats(protocolName);
+   }
+
+   @Override
+   public String dumpTimerQueue()
+   {
+      return delegate.dumpTimerQueue();
+   }
+
+   @Override
+   public void enableStats(boolean stats)
+   {
+      delegate.enableStats(stats);
+   }
+
+   @Override
+   public String getAddressAsString()
+   {
+      return delegate.getAddressAsString();
+   }
+
+   @Override
+   public String getAddressAsUUID()
+   {
+      return delegate.getAddressAsUUID();
+   }
+
+   @Override
+   public int getNumberOfTasksInTimer()
+   {
+      return delegate.getNumberOfTasksInTimer();
+   }
+
+   @Override
+   public long getReceivedBytes()
+   {
+      return delegate.getReceivedBytes();
+   }
+
+   @Override
+   public long getReceivedMessages()
+   {
+      return delegate.getReceivedMessages();
+   }
+
+   @Override
+   public long getSentBytes()
+   {
+      return delegate.getSentBytes();
+   }
+
+   @Override
+   public long getSentMessages()
+   {
+      return delegate.getSentMessages();
+   }
+
+   @Override
+   public boolean getState(Address target, String stateId, long timeout, boolean useFlushIfPresent)
+         throws ChannelNotConnectedException, ChannelClosedException
+   {
+      return delegate.getState(target, stateId, timeout, useFlushIfPresent);
+   }
+
+   @Override
+   public int getTimerThreads()
+   {
+      return delegate.getTimerThreads();
+   }
+
+   @Override
+   public String getViewAsString()
+   {
+      return delegate.getViewAsString();
+   }
+
+   @Override
+   public String printProtocolSpec(boolean includeProperties)
+   {
+      return delegate.printProtocolSpec(includeProperties);
+   }
+
+   @Override
+   public void resetStats()
+   {
+      delegate.resetStats();
+   }
+
+   @Override
+   public boolean statsEnabled()
+   {
+      return delegate.statsEnabled();
+   }
+
+   @Override
+   public String toString(boolean details)
+   {
+      return delegate.toString(details);
+   }
+
+   @Override
+   public Object up(Event arg0)
+   {
+      return delegate.up(arg0);
+   }
+
+   public MuxUpHandler getMuxUpHandler()
+   {
+      return upHandler;
+   }
+   
+
+}

Modified: projects/cluster/ha-server-cache-jbc/trunk/src/main/java/org/jboss/ha/framework/server/HAPartitionCacheHandlerImpl.java
===================================================================
--- projects/cluster/ha-server-cache-jbc/trunk/src/main/java/org/jboss/ha/framework/server/HAPartitionCacheHandlerImpl.java	2010-06-01 03:05:10 UTC (rev 105446)
+++ projects/cluster/ha-server-cache-jbc/trunk/src/main/java/org/jboss/ha/framework/server/HAPartitionCacheHandlerImpl.java	2010-06-01 03:06:45 UTC (rev 105447)
@@ -28,6 +28,7 @@
 import org.jboss.cache.CacheManager;
 import org.jboss.cache.CacheStatus;
 import org.jboss.ha.framework.server.spi.HAPartitionCacheHandler;
+import org.jgroups.Channel;
 import org.jgroups.ChannelFactory;
 
 /**
@@ -41,6 +42,7 @@
    private String cacheConfigName;
    private Cache<Object, Object> cache;
    private AtomicInteger acquireCount = new AtomicInteger();
+   private boolean startCacheInStart = false;
    
    // CacheHandler ------------------------------------------------------------
 
@@ -103,6 +105,19 @@
       return cache.getConfiguration().getRuntimeConfig().getMuxChannelFactory();
    }
    
+   public synchronized Channel getCacheChannel()
+   {
+      if (cache == null)
+      {
+         throw new IllegalStateException("Must acquire cache before getting cache channel");
+      }
+      if (cache.getCacheStatus() != CacheStatus.STARTED)
+      {
+         throw new IllegalStateException("Must start cache before getting cache channel");
+      }
+      return cache.getConfiguration().getRuntimeConfig().getChannel();
+   }
+   
    // Properties -------------------------------------------------------------
 
    public synchronized void setCacheConfigName(String cacheConfigName)
@@ -124,7 +139,39 @@
    {
       return cache;
    }
+
+   public boolean isStartCacheInStart()
+   {
+      return startCacheInStart;
+   }
+
+   public void setStartCacheInStart(boolean startCacheInStart)
+   {
+      this.startCacheInStart = startCacheInStart;
+   }
    
+   // Public ------------------------------------------------------------------
+   
+   public synchronized void start() throws Exception
+   {
+      if (startCacheInStart)
+      {
+         if (cache == null)
+         {
+            acquireCache();
+         }
+         startCache();
+      }
+   }
+   
+   public synchronized void stop() throws Exception
+   {
+      if (startCacheInStart)
+      {
+         releaseCache();
+      }
+   }
+   
    // Private -----------------------------------------------------------------
 
    private void validateState()




More information about the jboss-cvs-commits mailing list