[jboss-cvs] JBossAS SVN: r105346 - in projects/cluster/ha-server-cache-jbc/branches/Session_Ownership: src/main/java/org/jboss/ha/cachemanager and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri May 28 18:59:29 EDT 2010


Author: bstansberry at jboss.com
Date: 2010-05-28 18:59:28 -0400 (Fri, 28 May 2010)
New Revision: 105346

Added:
   projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/ha/cachemanager/MuxHandlerChannel.java
   projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/SessionOwnershipCache.java
   projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/SessionOwnershipCacheManager.java
   projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/SessionOwnershipPojoCache.java
Modified:
   projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/pom.xml
   projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/AbstractJBossCacheService.java
   projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/Util.java
Log:
First cut on integrating session ownership

Modified: projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/pom.xml
===================================================================
--- projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/pom.xml	2010-05-28 22:58:38 UTC (rev 105345)
+++ projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/pom.xml	2010-05-28 22:59:28 UTC (rev 105346)
@@ -32,6 +32,7 @@
   <properties>
     <version.jboss.ha.server.cache.spi>0.0.1-SNAPSHOT</version.jboss.ha.server.cache.spi>
     <version.jboss.ha.server.api>2.0.0-SNAPSHOT</version.jboss.ha.server.api>
+    <version.jboss.ha.server.core>1.0.0-SNAPSHOT</version.jboss.ha.server.core>
     <version.jboss.common.core>2.2.17.GA</version.jboss.common.core>
     <version.jboss.logging.spi>2.0.5.GA</version.jboss.logging.spi>
     <version.jboss.cache>3.2.5.GA</version.jboss.cache>
@@ -105,6 +106,12 @@
     </dependency>
     
     <dependency>
+      <groupId>org.jboss.cluster</groupId>
+      <artifactId>jboss-ha-server-core</artifactId>
+      <version>${version.jboss.ha.server.core}</version>
+    </dependency>
+    
+    <dependency>
       <groupId>org.jboss</groupId>
       <artifactId>jboss-common-core</artifactId>
       <version>${version.jboss.common.core}</version>

Added: projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/ha/cachemanager/MuxHandlerChannel.java
===================================================================
--- projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/ha/cachemanager/MuxHandlerChannel.java	                        (rev 0)
+++ projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/ha/cachemanager/MuxHandlerChannel.java	2010-05-28 22:59:28 UTC (rev 105346)
@@ -0,0 +1,419 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.ha.cachemanager;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.jboss.ha.core.framework.server.MuxUpHandler;
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.ChannelClosedException;
+import org.jgroups.ChannelException;
+import org.jgroups.ChannelListener;
+import org.jgroups.ChannelNotConnectedException;
+import org.jgroups.Event;
+import org.jgroups.Message;
+import org.jgroups.Receiver;
+import org.jgroups.TimeoutException;
+import org.jgroups.UpHandler;
+import org.jgroups.View;
+import org.jgroups.logging.Log;
+import org.jgroups.logging.LogFactory;
+import org.jgroups.stack.ProtocolStack;
+
+/**
+ *
+ *
+ * @author Brian Stansberry
+ * 
+ * @version $Revision$
+ */
+public class MuxHandlerChannel extends Channel
+{
+   private final Log log=LogFactory.getLog(MuxHandlerChannel.class);
+   private final Channel delegate;
+   private MuxUpHandler upHandler;
+   
+   public MuxHandlerChannel(Channel delegate)
+   {
+      this.delegate = delegate;
+      UpHandler uh = delegate.getUpHandler();
+      if (uh == null)
+      {
+         this.upHandler = new MuxUpHandler();
+         this.delegate.setUpHandler(upHandler);
+      }
+      else if (uh instanceof MuxUpHandler)
+      {
+         this.upHandler = (MuxUpHandler) uh;
+      }
+      else
+      {
+         throw new IllegalStateException("delegate Channel already has a non-Mux UpHandler");
+      }
+   }
+   
+   @Override
+   public void blockOk()
+   {
+      delegate.blockOk();
+   }
+
+   @Override
+   public void close()
+   {
+      delegate.close();
+   }
+
+   @Override
+   public void connect(String clusterName) throws ChannelException
+   {
+      delegate.connect(clusterName);
+   }
+
+   @Override
+   public void connect(String clusterName, Address target, String stateId, long timeout) throws ChannelException
+   {
+      delegate.connect(clusterName, target, stateId, timeout);
+   }
+
+   @Override
+   public void disconnect()
+   {
+      delegate.disconnect();
+   }
+
+   @Override
+   public Map<String, Object> dumpStats()
+   {
+      return delegate.dumpStats();
+   }
+
+   @Override
+   public boolean flushSupported()
+   {
+      return delegate.flushSupported();
+   }
+
+   @Override
+   public Address getAddress()
+   {
+      return delegate.getAddress();
+   }
+
+   @Override
+   public boolean getAllStates(Vector targets, long timeout) throws ChannelNotConnectedException,
+         ChannelClosedException
+   {
+      return delegate.getAllStates(targets, timeout);
+   }
+
+   @Override
+   public String getChannelName()
+   {
+      return delegate.getChannelName();
+   }
+
+   @Override
+   public String getClusterName()
+   {
+      return delegate.getClusterName();
+   }
+
+   @Override
+   public Map<String, Object> getInfo()
+   {
+      return delegate.getInfo();
+   }
+
+   @Override
+   public Address getLocalAddress()
+   {
+      return delegate.getLocalAddress();
+   }
+
+   @Override
+   protected Log getLog()
+   {
+      return log;
+   }
+
+   @Override
+   public String getName()
+   {
+      return delegate.getName();
+   }
+
+   @Override
+   public String getName(Address member)
+   {
+      return delegate.getName(member);
+   }
+
+   @Override
+   public Object getOpt(int option)
+   {
+      return delegate.getOpt(option);
+   }
+
+   @Override
+   public ProtocolStack getProtocolStack()
+   {
+      return delegate.getProtocolStack();
+   }
+
+   @Override
+   public boolean getState(Address target, long timeout) throws ChannelNotConnectedException, ChannelClosedException
+   {
+      return delegate.getState(target, timeout);
+   }
+
+   @Override
+   public boolean getState(Address target, String stateId, long timeout) throws ChannelNotConnectedException,
+         ChannelClosedException
+   {
+      return delegate.getState(target, stateId, timeout);
+   }
+
+   @Override
+   public View getView()
+   {
+      return delegate.getView();
+   }
+
+   @Override
+   public boolean isConnected()
+   {
+      return delegate.isConnected();
+   }
+
+   @Override
+   public boolean isOpen()
+   {
+      return delegate.isOpen();
+   }
+
+   @Override
+   public Object peek(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException
+   {
+      return delegate.peek(timeout);
+   }
+
+   @Override
+   public Object receive(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException
+   {
+      return delegate.receive(timeout);
+   }
+
+   @Override
+   public void returnState(byte[] state)
+   {
+      delegate.returnState(state);
+   }
+
+   @Override
+   public void returnState(byte[] state, String stateId)
+   {
+      delegate.returnState(state, stateId);
+   }
+
+   @Override
+   public void send(Message msg) throws ChannelNotConnectedException, ChannelClosedException
+   {
+      delegate.send(msg);
+   }
+
+   @Override
+   public void send(Address dst, Address src, Serializable obj) throws ChannelNotConnectedException,
+         ChannelClosedException
+   {
+      delegate.send(dst, src, obj);
+   }
+
+   @Override
+   public void send(Address dst, Address src, byte[] buf) throws ChannelNotConnectedException, ChannelClosedException
+   {
+      delegate.send(dst, src, buf);
+   }
+
+   @Override
+   public void send(Address dst, Address src, byte[] buf, int offset, int length) throws ChannelNotConnectedException,
+         ChannelClosedException
+   {
+      delegate.send(dst, src, buf, offset, length);
+   }
+
+   @Override
+   public void setInfo(String key, Object value)
+   {
+      delegate.setInfo(key, value);
+   }
+
+   @Override
+   public void setName(String name)
+   {
+      delegate.setName(name);
+   }
+
+   @Override
+   public void setOpt(int option, Object value)
+   {
+      delegate.setOpt(option, value);
+   }
+
+   @Override
+   public void shutdown()
+   {
+      delegate.shutdown();
+   }
+
+   @Override
+   public boolean startFlush(boolean automaticResume)
+   {
+      return delegate.startFlush(automaticResume);
+   }
+
+   @Override
+   public boolean startFlush(List<Address> flushParticipants, boolean automaticResume)
+   {
+      return delegate.startFlush(flushParticipants, automaticResume);
+   }
+
+   @Override
+   public boolean startFlush(long timeout, boolean automaticResume)
+   {
+      return delegate.startFlush(timeout, automaticResume);
+   }
+
+   @Override
+   public void stopFlush()
+   {
+      delegate.stopFlush();
+   }
+
+   @Override
+   public void stopFlush(List<Address> flushParticipants)
+   {
+      delegate.stopFlush(flushParticipants);
+   }
+
+   @Override
+   public synchronized void addChannelListener(ChannelListener listener)
+   {
+      delegate.addChannelListener(listener);
+   }
+
+   @Override
+   public synchronized void clearChannelListeners()
+   {
+      delegate.clearChannelListeners();
+   }
+
+   @Override
+   public void down(Event evt)
+   {
+      delegate.down(evt);
+   }
+
+   @Override
+   public Object downcall(Event evt)
+   {
+      return delegate.downcall(evt);
+   }
+
+   @Override
+   public String dumpQueue()
+   {
+      return delegate.dumpQueue();
+   }
+
+   @Override
+   public int getNumMessages()
+   {
+      return delegate.getNumMessages();
+   }
+
+   @Override
+   public String getProperties()
+   {
+      return delegate.getProperties();
+   }
+
+   @Override
+   public Receiver getReceiver()
+   {
+      return delegate.getReceiver();
+   }
+
+   @Override
+   public UpHandler getUpHandler()
+   {
+      // We hide the upHandler from callers so they will set it. Yuck!
+      return null;
+   }
+
+   @Override
+   public void open() throws ChannelException
+   {
+      delegate.open();
+   }
+
+   @Override
+   public synchronized void removeChannelListener(ChannelListener listener)
+   {
+      delegate.removeChannelListener(listener);
+   }
+
+   @Override
+   public void setChannelListener(ChannelListener channelListener)
+   {
+      delegate.setChannelListener(channelListener);
+   }
+
+   @Override
+   public void setReceiver(Receiver r)
+   {
+      delegate.setReceiver(r);
+   }
+
+   @Override
+   public void setUpHandler(UpHandler upHandler)
+   {
+      if (this.upHandler != null)
+      {
+         throw new IllegalStateException("UpHandler already set");
+      }
+      this.upHandler = new MuxUpHandler(upHandler);
+      delegate.setUpHandler(this.upHandler);
+   }
+   
+   public MuxUpHandler getMuxUpHandler()
+   {
+      return upHandler;
+   }
+   
+
+}

Modified: projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/AbstractJBossCacheService.java
===================================================================
--- projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/AbstractJBossCacheService.java	2010-05-28 22:58:38 UTC (rev 105345)
+++ projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/AbstractJBossCacheService.java	2010-05-28 22:59:28 UTC (rev 105346)
@@ -49,6 +49,8 @@
 import org.jboss.cache.transaction.BatchModeTransactionManager;
 import org.jboss.ha.framework.server.MarshalledValueHelper;
 import org.jboss.ha.framework.server.SimpleCachableMarshalledValue;
+import org.jboss.ha.framework.server.lock.SharedLocalYieldingClusterLockManager;
+import org.jboss.ha.framework.server.lock.TimeoutException;
 import org.jboss.logging.Logger;
 import org.jboss.util.loading.ContextClassLoaderSwitcher;
 import org.jboss.web.tomcat.service.session.distributedcache.spi.BatchingManager;
@@ -92,6 +94,11 @@
       return Fqn.fromElements(SESSION, contextHostPath, sessionId);
    }
    
+   public static String getSessionLockName(String contextHostPath, String sessionId)
+   {
+      return contextHostPath + "_" + sessionId;
+   }
+   
    @Deprecated
    public static Fqn<String> getBuddyBackupSessionFqn(String dataOwner, String contextHostPath, String sessionId)
    {
@@ -352,36 +359,62 @@
       }
       
       Fqn<String> fqn = getSessionFqn(combinedPath_, realId);
-      Map<Object, Object> sessionData =  cacheWrapper_.getData(fqn, true);
       
-      if (sessionData == null) {
-         // Requested session is no longer in the cache; return null
-         return null;
-      }
-      
-      if (initialLoad)
+      boolean ourBatch = false;
+      boolean loadCompleted = false;
+      Map<Object, Object> sessionData = null;
+      try
       {
-         setupSessionRegion(fqn);
-      }
+         // We need batching so any data gravitation replication 
+         // is sent in batch.
+         // Don't do anything if there is already a batch
+         // associated with this thread.
+         ourBatch = ensureBatchInProgress();
+         
+         sessionData =  cacheWrapper_.getData(fqn, true);
       
-      IncomingDistributableSessionData dsd = null;
-      
-      try
-      {
-         dsd = getDistributableSessionData(realId, sessionData, true);
+         if (sessionData == null) {
+            // Requested session is no longer in the cache; return null
+            return null;
+         }
+         
+         if (initialLoad)
+         {
+            setupSessionRegion(fqn);
+         }
+         
+         IncomingDistributableSessionData dsd = null;
+         
+         try
+         {
+            dsd = getDistributableSessionData(realId, sessionData, true);
+         }
+         catch (Exception e)
+         {
+            String masked = Util.maskId(realId);
+            log_.warn("Problem accessing session data for session " + masked + " (" + 
+                  e.getClass().getName() + ") -- existing session cannot be used");
+            log_.debug("Details on problem accessing session data for " + masked, e);
+            // Clean up
+            removeSessionLocal(realId);
+            return null;
+         }
+         
+         loadCompleted = true;
+         
+         return dsd;
       }
       catch (Exception e)
       {
-         String masked = Util.maskId(realId);
-         log_.warn("Problem accessing session data for session " + masked + " (" + 
-               e.getClass().getName() + ") -- existing session cannot be used");
-         log_.debug("Details on problem accessing session data for " + masked, e);
-         // Clean up
-         removeSessionLocal(realId);
-         return null;
+         throw handleBatchException(ourBatch, realId, e);
       }
-      
-      return dsd;
+      finally
+      {
+         if (ourBatch)
+         {
+            finishBatch(realId, loadCompleted);
+         }
+      }
    }
 
    public void storeSessionData(T sessionData)
@@ -566,26 +599,45 @@
    
    public IncomingDistributableSessionData getSessionData(String realId, String dataOwner, boolean includeAttributes)
    {
-      Map<Object, Object> distributedCacheData = null;
-      if (dataOwner == null)
+      boolean ourBatch = false;
+      boolean loadCompleted = false;
+      try
       {
-         Fqn<String> fqn = getSessionFqn(combinedPath_, realId); 
-         distributedCacheData = cacheWrapper_.getData(fqn, false);
-      }
-      else
-      {
-         List<Fqn<Object>> fqns = getBuddyBackupSessionFqns(dataOwner, combinedPath_, realId);      
-         for (Fqn<Object> fqn : fqns)
+         ourBatch = ensureBatchInProgress();
+
+         Map<Object, Object> distributedCacheData = null;
+         if (dataOwner == null)
          {
+            Fqn<String> fqn = getSessionFqn(combinedPath_, realId); 
             distributedCacheData = cacheWrapper_.getData(fqn, false);
-            if (distributedCacheData != null && distributedCacheData.size() > 0)
+         }
+         else
+         {
+            List<Fqn<Object>> fqns = getBuddyBackupSessionFqns(dataOwner, combinedPath_, realId);      
+            for (Fqn<Object> fqn : fqns)
             {
-               break;
+               distributedCacheData = cacheWrapper_.getData(fqn, false);
+               if (distributedCacheData != null && distributedCacheData.size() > 0)
+               {
+                  break;
+               }
             }
          }
+         IncomingDistributableSessionData result = distributedCacheData == null ? null : getDistributableSessionData(realId, distributedCacheData, includeAttributes);
+         loadCompleted = true;
+         return result;
       }
-      
-      return distributedCacheData == null ? null : getDistributableSessionData(realId, distributedCacheData, includeAttributes);
+      catch (Exception e)
+      {         
+         throw handleBatchException(ourBatch, realId, e);
+      }
+      finally
+      {
+         if (ourBatch)
+         {
+            finishBatch(realId, loadCompleted);
+         }
+      } 
    }
 
    /**
@@ -714,6 +766,48 @@
     */
    protected abstract Map<String, Object> getSessionAttributes(String realId, Map<Object, Object> distributedCacheData);
    
+   public boolean getSupportsSessionOwnership()
+   {
+      return true;
+   }
+   
+   public LockResult acquireSessionOwnership(String realId, boolean newLock) throws TimeoutException, InterruptedException
+   {
+      String lockName = getSessionLockName(this.cacheConfigName_, realId);
+      SharedLocalYieldingClusterLockManager.LockResult result = getLockManager().lock(lockName, getLockTimeout(), newLock);
+      LockResult ourResult = null;
+      switch (result)
+      {
+         case ACQUIRED_FROM_CLUSTER:
+            ourResult = LockResult.ACQUIRED_FROM_CLUSTER;
+            break;
+         case ALREADY_HELD:
+            ourResult = LockResult.ALREADY_HELD;
+            break;
+         case NEW_LOCK:
+            ourResult = LockResult.NEW_LOCK;
+      }
+      return ourResult;
+   }
+
+   public void relinquishSessionOwnership(String realId, boolean remove)
+   {
+      String lockName = getSessionLockName(this.cacheConfigName_, realId);
+      getLockManager().unlock(lockName, remove);
+   }
+   
+   private SharedLocalYieldingClusterLockManager getLockManager()
+   {
+      // FIXME
+      throw new UnsupportedOperationException("implement me");
+   }
+
+   private long getLockTimeout()
+   {
+      // FIXME
+      return 30000;
+   }
+
    protected void releaseCacheToManager(String cacheConfigName)
    {      
       try
@@ -825,4 +919,55 @@
       }
    }
 
+   private boolean ensureBatchInProgress() throws Exception
+   {
+      boolean newBatch = false;
+      if (batchingManager.isBatchInProgress() == false)
+      {
+         batchingManager.startBatch();
+         newBatch = true;
+      }
+      return newBatch;
+   }
+
+   private RuntimeException handleBatchException(boolean ourBatch, String realId, Exception e)
+   {
+      try
+      {
+//          if(ourBatch)
+         // Let's set it no matter what.
+         batchingManager.setBatchRollbackOnly();
+      }
+      catch (Exception exn)
+      {
+         log_.error("Caught exception rolling back transaction", exn);
+      }
+      
+      return Util.getRuntimeException("Failed to load session " + Util.maskId(realId), e);
+   }
+
+   private void finishBatch(String realId, boolean loadCompleted)
+   {
+      try
+      {
+         batchingManager.endBatch();
+      }
+      catch (Exception e)
+      {
+         if (loadCompleted)
+         {
+            // We read the data successfully but then failed in commit?
+            // That indicates a JBC data gravitation where the replication of
+            // the gravitated data to our buddy failed. We can ignore that
+            // and count on this request updating the cache.                               // 
+            log_.warn("Problem ending batch after loading session " + Util.maskId(realId) + " -- " + e.getLocalizedMessage() + " However session data was successful loaded.");
+            log_.debug("Failure cause", e);
+         }
+         else
+         {
+            throw Util.getRuntimeException("Failed to load session " + Util.maskId(realId), e);
+         }
+      }
+   }
+
 }

Added: projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/SessionOwnershipCache.java
===================================================================
--- projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/SessionOwnershipCache.java	                        (rev 0)
+++ projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/SessionOwnershipCache.java	2010-05-28 22:59:28 UTC (rev 105346)
@@ -0,0 +1,57 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.web.tomcat.service.session.distributedcache.impl.jbc;
+
+import org.jboss.cache.Cache;
+import org.jboss.ha.framework.server.lock.SharedLocalYieldingClusterLockManager;
+
+/**
+ *
+ *
+ * @author Brian Stansberry
+ * 
+ * @version $Revision$
+ */
+public class SessionOwnershipCache
+{
+   private final Cache<?, ?> cache;
+   private final SharedLocalYieldingClusterLockManager lockManager;
+   
+   public SessionOwnershipCache(Cache<?, ?> cache, SharedLocalYieldingClusterLockManager lockManager)
+   {
+      this.cache = cache;
+      this.lockManager = lockManager;
+   }
+
+   public Cache<?, ?> getCache()
+   {
+      return cache;
+   }
+
+   public SharedLocalYieldingClusterLockManager getLockManager()
+   {
+      return lockManager;
+   }
+   
+   
+}

Added: projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/SessionOwnershipCacheManager.java
===================================================================
--- projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/SessionOwnershipCacheManager.java	                        (rev 0)
+++ projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/SessionOwnershipCacheManager.java	2010-05-28 22:59:28 UTC (rev 105346)
@@ -0,0 +1,158 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.web.tomcat.service.session.distributedcache.impl.jbc;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.jboss.cache.Cache;
+import org.jboss.cache.CacheManager;
+import org.jboss.cache.CacheStatus;
+import org.jboss.cache.config.RuntimeConfig;
+import org.jboss.cache.pojo.PojoCache;
+import org.jboss.ha.cachemanager.MuxHandlerChannel;
+import org.jboss.ha.framework.interfaces.GroupMembershipNotifier;
+import org.jboss.ha.framework.interfaces.GroupRpcDispatcher;
+import org.jboss.ha.framework.server.PojoCacheManager;
+import org.jboss.ha.framework.server.lock.SharedLocalYieldingClusterLockManager;
+import org.jgroups.Channel;
+import org.jgroups.ChannelFactory;
+import org.jgroups.blocks.mux.Muxer;
+
+/**
+ *
+ *
+ * @author Brian Stansberry
+ * 
+ * @version $Revision$
+ */
+public class SessionOwnershipCacheManager
+{
+   private final CacheManager plainCacheManager;
+   private final PojoCacheManager pojoCacheManager;
+   
+   private final Map<String, SessionOwnershipCache> managers = new ConcurrentHashMap<String, SessionOwnershipCache>();
+   
+   public SessionOwnershipCacheManager(CacheManager plainCacheManager, PojoCacheManager pojoCacheManager)
+   {
+      this.plainCacheManager = plainCacheManager;
+      this.pojoCacheManager = pojoCacheManager;
+   }
+   
+   public synchronized SessionOwnershipCache getSessionOwnershipCache(String configName) throws Exception
+   {
+      SessionOwnershipCache soc = managers.get(configName);
+      if (soc == null)
+      {
+         Cache<?, ?> plainCache = plainCacheManager.getCache(configName, false);
+         SharedLocalYieldingClusterLockManager lockManager = configureCache(null, plainCache, configName);
+         soc = new SessionOwnershipCache(plainCache, lockManager);
+         managers.put(configName, soc);
+      }
+      return soc;
+   }
+
+   public synchronized SessionOwnershipPojoCache getSessionOwnershipPojoCache(String configName) throws Exception
+   {
+      SessionOwnershipCache soc = managers.get(configName);
+      if (!(soc instanceof SessionOwnershipPojoCache))
+      {
+         PojoCache pojoCache = pojoCacheManager.getPojoCache(configName, false);
+         SharedLocalYieldingClusterLockManager lockManager = configureCache(pojoCache, pojoCache.getCache(), configName);
+         soc = new SessionOwnershipPojoCache(pojoCache, lockManager);
+         managers.put(configName, soc);
+         pojoCache.start();
+         
+      }
+      return (SessionOwnershipPojoCache) soc;
+   }
+   
+   private SharedLocalYieldingClusterLockManager configureCache(PojoCache pojoCache, Cache<?, ?> cache, String configName) throws Exception
+   {
+      RuntimeConfig rc = cache.getConfiguration().getRuntimeConfig();
+      Channel channel = rc.getChannel();
+      if (cache.getCacheStatus() == CacheStatus.STARTED)
+      {
+         if (channel == null)
+         {
+            // TODO create and use a Channel just for locking???
+            throw new IllegalStateException("Cache is started but has no Channel");
+         }
+      }
+      else {
+         if (channel == null)      
+         {
+            ChannelFactory cf = rc.getMuxChannelFactory();
+            if (cf == null)
+            {
+               throw new IllegalStateException("Cache configuration " + configName + " does not have a ChannelFactory injected");
+            }
+            String stack = cache.getConfiguration().getMuxStackName();
+            if (stack == null)
+            {
+               throw new IllegalStateException("Cache configuration " + configName + " does not have a MuxStackName configured");
+            }
+            channel = cf.createChannel(stack);
+            rc.setChannel(new MuxHandlerChannel(channel));
+         }
+         else if (channel.getUpHandler() == null)
+         {
+            // replace
+            rc.setChannel(new MuxHandlerChannel(channel));
+         }
+         // else the Channel was injected and already had a handler -- shouldn't happen
+         
+         // Start the Cache so it injects it's UpHandler into MuxHandlerChannel
+         if (pojoCache != null)
+         {
+            pojoCache.start();            
+         }
+         else if (cache != null)
+         {
+            if (cache.getCacheStatus() != CacheStatus.CREATED)
+            {
+               cache.create();
+            }
+            cache.start();
+         }
+      }
+      
+      // At this point the Channel should have a Muxer as its UpHandler, either because
+      // it came that way or we used MuxHandlerChannel to set it
+      if (!(channel.getUpHandler() instanceof Muxer<?>))
+      {
+         throw new IllegalStateException("Cache " + configName + " has a Channel that does not use a Muxer as its UpHandler");
+      }
+      
+      // TODO create the GroupRpcDispatcher/GroupMembershipNotifier
+      GroupRpcDispatcher dispatcher = createGroupRpcDispatcher();
+      GroupMembershipNotifier notifier = (GroupMembershipNotifier) dispatcher;
+      
+      return new SharedLocalYieldingClusterLockManager("HTTPSESSIONOWNER", dispatcher, notifier);
+   }
+
+   private GroupRpcDispatcher createGroupRpcDispatcher()
+   {
+      throw new UnsupportedOperationException("Implement me");
+   }
+}

Added: projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/SessionOwnershipPojoCache.java
===================================================================
--- projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/SessionOwnershipPojoCache.java	                        (rev 0)
+++ projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/SessionOwnershipPojoCache.java	2010-05-28 22:59:28 UTC (rev 105346)
@@ -0,0 +1,49 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.web.tomcat.service.session.distributedcache.impl.jbc;
+
+import org.jboss.cache.pojo.PojoCache;
+import org.jboss.ha.framework.server.lock.SharedLocalYieldingClusterLockManager;
+
+/**
+ *
+ *
+ * @author Brian Stansberry
+ * 
+ * @version $Revision$
+ */
+public class SessionOwnershipPojoCache extends SessionOwnershipCache
+{
+   private final PojoCache pojoCache;
+   
+   public SessionOwnershipPojoCache(PojoCache cache, SharedLocalYieldingClusterLockManager lockManager)
+   {
+      super(cache.getCache(), lockManager);
+      this.pojoCache = cache;
+   }
+
+   public PojoCache getPojoCache()
+   {
+      return pojoCache;
+   }
+}

Modified: projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/Util.java
===================================================================
--- projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/Util.java	2010-05-28 22:58:38 UTC (rev 105345)
+++ projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/Util.java	2010-05-28 22:59:28 UTC (rev 105346)
@@ -281,6 +281,26 @@
    }
    
    /**
+    * Returns either an <code>e</code> or a wrapper RuntimeException, depending on the type
+    * of <code>e</code>. If <code>e</code> is an Ea RuntimeException, it
+    * is simply returned, otherwise <code>e</code> is wrapped in a RuntimeException
+    * which is returned.
+    * 
+    * @param wrapperMsg an optional message for the wrapper RuntimeException, if
+    *                   one is needed. If not provided, the classname of <code>e</code>
+    *                   and it's localized message is used
+    * @param e the underlying exception
+    */
+   public static RuntimeException getRuntimeException(String wrapperMsg, Exception e)
+   {   
+      if (e instanceof RuntimeException)
+         return (RuntimeException) e;
+      
+      wrapperMsg = (wrapperMsg != null) ? wrapperMsg : e.getClass().getName() + " -- " + e.getLocalizedMessage();
+      throw new RuntimeException(wrapperMsg, e);
+   }
+   
+   /**
     * Prevent instantiation.
     */
    private Util() {}




More information about the jboss-cvs-commits mailing list