[jboss-cvs] JBossAS SVN: r105346 - in projects/cluster/ha-server-cache-jbc/branches/Session_Ownership: src/main/java/org/jboss/ha/cachemanager and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri May 28 18:59:29 EDT 2010
Author: bstansberry at jboss.com
Date: 2010-05-28 18:59:28 -0400 (Fri, 28 May 2010)
New Revision: 105346
Added:
projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/ha/cachemanager/MuxHandlerChannel.java
projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/SessionOwnershipCache.java
projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/SessionOwnershipCacheManager.java
projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/SessionOwnershipPojoCache.java
Modified:
projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/pom.xml
projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/AbstractJBossCacheService.java
projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/Util.java
Log:
First cut on integrating session ownership
Modified: projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/pom.xml
===================================================================
--- projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/pom.xml 2010-05-28 22:58:38 UTC (rev 105345)
+++ projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/pom.xml 2010-05-28 22:59:28 UTC (rev 105346)
@@ -32,6 +32,7 @@
<properties>
<version.jboss.ha.server.cache.spi>0.0.1-SNAPSHOT</version.jboss.ha.server.cache.spi>
<version.jboss.ha.server.api>2.0.0-SNAPSHOT</version.jboss.ha.server.api>
+ <version.jboss.ha.server.core>1.0.0-SNAPSHOT</version.jboss.ha.server.core>
<version.jboss.common.core>2.2.17.GA</version.jboss.common.core>
<version.jboss.logging.spi>2.0.5.GA</version.jboss.logging.spi>
<version.jboss.cache>3.2.5.GA</version.jboss.cache>
@@ -105,6 +106,12 @@
</dependency>
<dependency>
+ <groupId>org.jboss.cluster</groupId>
+ <artifactId>jboss-ha-server-core</artifactId>
+ <version>${version.jboss.ha.server.core}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.jboss</groupId>
<artifactId>jboss-common-core</artifactId>
<version>${version.jboss.common.core}</version>
Added: projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/ha/cachemanager/MuxHandlerChannel.java
===================================================================
--- projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/ha/cachemanager/MuxHandlerChannel.java (rev 0)
+++ projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/ha/cachemanager/MuxHandlerChannel.java 2010-05-28 22:59:28 UTC (rev 105346)
@@ -0,0 +1,419 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.ha.cachemanager;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.jboss.ha.core.framework.server.MuxUpHandler;
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.ChannelClosedException;
+import org.jgroups.ChannelException;
+import org.jgroups.ChannelListener;
+import org.jgroups.ChannelNotConnectedException;
+import org.jgroups.Event;
+import org.jgroups.Message;
+import org.jgroups.Receiver;
+import org.jgroups.TimeoutException;
+import org.jgroups.UpHandler;
+import org.jgroups.View;
+import org.jgroups.logging.Log;
+import org.jgroups.logging.LogFactory;
+import org.jgroups.stack.ProtocolStack;
+
+/**
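+ * {@link Channel} decorator that forwards every call to a delegate channel and only
+ * ever installs a {@link MuxUpHandler} as that delegate's {@link UpHandler}.
+ *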
+ * @author Brian Stansberry
+ *
+ * @version $Revision$
+ */
+public class MuxHandlerChannel extends Channel
+{
+ private final Log log=LogFactory.getLog(MuxHandlerChannel.class);
+ private final Channel delegate;
+ private MuxUpHandler upHandler;
+
+   /**
+    * Creates a wrapper around the given channel.
+    * <p>
+    * If the delegate already uses a {@link MuxUpHandler}, that handler is adopted.
+    * If the delegate has no {@link UpHandler} at all, none is created here:
+    * {@link #setUpHandler(UpHandler)} rejects the call whenever a handler is
+    * already recorded, so eagerly creating one in this constructor would make
+    * every later {@code setUpHandler} call fail with "UpHandler already set".
+    * Until {@code setUpHandler} is invoked, {@link #getMuxUpHandler()} returns
+    * {@code null} in that case.
+    *
+    * @param delegate the channel all operations are forwarded to
+    * @throws IllegalStateException if the delegate already has an UpHandler
+    *         that is not a {@link MuxUpHandler}
+    */
+   public MuxHandlerChannel(Channel delegate)
+   {
+      this.delegate = delegate;
+      UpHandler existing = delegate.getUpHandler();
+      if (existing instanceof MuxUpHandler)
+      {
+         // Reuse the multiplexing handler that is already in place
+         this.upHandler = (MuxUpHandler) existing;
+      }
+      else if (existing != null)
+      {
+         throw new IllegalStateException("delegate Channel already has a non-Mux UpHandler");
+      }
+      // else: leave upHandler unset so setUpHandler(UpHandler) can wrap the
+      // caller's handler in a MuxUpHandler and install it on the delegate
+   }
+
+ @Override
+ public void blockOk()
+ {
+ delegate.blockOk();
+ }
+
+ @Override
+ public void close()
+ {
+ delegate.close();
+ }
+
+ @Override
+ public void connect(String clusterName) throws ChannelException
+ {
+ delegate.connect(clusterName);
+ }
+
+ @Override
+ public void connect(String clusterName, Address target, String stateId, long timeout) throws ChannelException
+ {
+ delegate.connect(clusterName, target, stateId, timeout);
+ }
+
+ @Override
+ public void disconnect()
+ {
+ delegate.disconnect();
+ }
+
+ @Override
+ public Map<String, Object> dumpStats()
+ {
+ return delegate.dumpStats();
+ }
+
+ @Override
+ public boolean flushSupported()
+ {
+ return delegate.flushSupported();
+ }
+
+ @Override
+ public Address getAddress()
+ {
+ return delegate.getAddress();
+ }
+
+ @Override
+ public boolean getAllStates(Vector targets, long timeout) throws ChannelNotConnectedException,
+ ChannelClosedException
+ {
+ return delegate.getAllStates(targets, timeout);
+ }
+
+ @Override
+ public String getChannelName()
+ {
+ return delegate.getChannelName();
+ }
+
+ @Override
+ public String getClusterName()
+ {
+ return delegate.getClusterName();
+ }
+
+ @Override
+ public Map<String, Object> getInfo()
+ {
+ return delegate.getInfo();
+ }
+
+ @Override
+ public Address getLocalAddress()
+ {
+ return delegate.getLocalAddress();
+ }
+
+ @Override
+ protected Log getLog()
+ {
+ return log;
+ }
+
+ @Override
+ public String getName()
+ {
+ return delegate.getName();
+ }
+
+ @Override
+ public String getName(Address member)
+ {
+ return delegate.getName(member);
+ }
+
+ @Override
+ public Object getOpt(int option)
+ {
+ return delegate.getOpt(option);
+ }
+
+ @Override
+ public ProtocolStack getProtocolStack()
+ {
+ return delegate.getProtocolStack();
+ }
+
+ @Override
+ public boolean getState(Address target, long timeout) throws ChannelNotConnectedException, ChannelClosedException
+ {
+ return delegate.getState(target, timeout);
+ }
+
+ @Override
+ public boolean getState(Address target, String stateId, long timeout) throws ChannelNotConnectedException,
+ ChannelClosedException
+ {
+ return delegate.getState(target, stateId, timeout);
+ }
+
+ @Override
+ public View getView()
+ {
+ return delegate.getView();
+ }
+
+ @Override
+ public boolean isConnected()
+ {
+ return delegate.isConnected();
+ }
+
+ @Override
+ public boolean isOpen()
+ {
+ return delegate.isOpen();
+ }
+
+ @Override
+ public Object peek(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException
+ {
+ return delegate.peek(timeout);
+ }
+
+ @Override
+ public Object receive(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException
+ {
+ return delegate.receive(timeout);
+ }
+
+ @Override
+ public void returnState(byte[] state)
+ {
+ delegate.returnState(state);
+ }
+
+ @Override
+ public void returnState(byte[] state, String stateId)
+ {
+ delegate.returnState(state, stateId);
+ }
+
+ @Override
+ public void send(Message msg) throws ChannelNotConnectedException, ChannelClosedException
+ {
+ delegate.send(msg);
+ }
+
+ @Override
+ public void send(Address dst, Address src, Serializable obj) throws ChannelNotConnectedException,
+ ChannelClosedException
+ {
+ delegate.send(dst, src, obj);
+ }
+
+ @Override
+ public void send(Address dst, Address src, byte[] buf) throws ChannelNotConnectedException, ChannelClosedException
+ {
+ delegate.send(dst, src, buf);
+ }
+
+ @Override
+ public void send(Address dst, Address src, byte[] buf, int offset, int length) throws ChannelNotConnectedException,
+ ChannelClosedException
+ {
+ delegate.send(dst, src, buf, offset, length);
+ }
+
+ @Override
+ public void setInfo(String key, Object value)
+ {
+ delegate.setInfo(key, value);
+ }
+
+ @Override
+ public void setName(String name)
+ {
+ delegate.setName(name);
+ }
+
+ @Override
+ public void setOpt(int option, Object value)
+ {
+ delegate.setOpt(option, value);
+ }
+
+ @Override
+ public void shutdown()
+ {
+ delegate.shutdown();
+ }
+
+ @Override
+ public boolean startFlush(boolean automaticResume)
+ {
+ return delegate.startFlush(automaticResume);
+ }
+
+ @Override
+ public boolean startFlush(List<Address> flushParticipants, boolean automaticResume)
+ {
+ return delegate.startFlush(flushParticipants, automaticResume);
+ }
+
+ @Override
+ public boolean startFlush(long timeout, boolean automaticResume)
+ {
+ return delegate.startFlush(timeout, automaticResume);
+ }
+
+ @Override
+ public void stopFlush()
+ {
+ delegate.stopFlush();
+ }
+
+ @Override
+ public void stopFlush(List<Address> flushParticipants)
+ {
+ delegate.stopFlush(flushParticipants);
+ }
+
+ @Override
+ public synchronized void addChannelListener(ChannelListener listener)
+ {
+ delegate.addChannelListener(listener);
+ }
+
+ @Override
+ public synchronized void clearChannelListeners()
+ {
+ delegate.clearChannelListeners();
+ }
+
+ @Override
+ public void down(Event evt)
+ {
+ delegate.down(evt);
+ }
+
+ @Override
+ public Object downcall(Event evt)
+ {
+ return delegate.downcall(evt);
+ }
+
+ @Override
+ public String dumpQueue()
+ {
+ return delegate.dumpQueue();
+ }
+
+ @Override
+ public int getNumMessages()
+ {
+ return delegate.getNumMessages();
+ }
+
+ @Override
+ public String getProperties()
+ {
+ return delegate.getProperties();
+ }
+
+ @Override
+ public Receiver getReceiver()
+ {
+ return delegate.getReceiver();
+ }
+
+ @Override
+ public UpHandler getUpHandler()
+ {
+ // Always report "no UpHandler" so that callers (e.g. the cache) install their own via setUpHandler(), which wraps it in a MuxUpHandler. Yuck!
+ return null;
+ }
+
+ @Override
+ public void open() throws ChannelException
+ {
+ delegate.open();
+ }
+
+ @Override
+ public synchronized void removeChannelListener(ChannelListener listener)
+ {
+ delegate.removeChannelListener(listener);
+ }
+
+ @Override
+ public void setChannelListener(ChannelListener channelListener)
+ {
+ delegate.setChannelListener(channelListener);
+ }
+
+ @Override
+ public void setReceiver(Receiver r)
+ {
+ delegate.setReceiver(r);
+ }
+
+   /**
+    * Wraps the given handler in a new {@link MuxUpHandler}, records it and
+    * installs it on the delegate channel.
+    *
+    * @param upHandler the handler passed to the {@link MuxUpHandler} constructor
+    * @throws IllegalStateException if this channel already holds a MuxUpHandler
+    */
+   @Override
+   public void setUpHandler(UpHandler upHandler)
+   {
+      if (this.upHandler == null)
+      {
+         MuxUpHandler mux = new MuxUpHandler(upHandler);
+         this.upHandler = mux;
+         delegate.setUpHandler(mux);
+         return;
+      }
+      throw new IllegalStateException("UpHandler already set");
+   }
+
+ public MuxUpHandler getMuxUpHandler()
+ {
+ return upHandler;
+ }
+
+
+}
Modified: projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/AbstractJBossCacheService.java
===================================================================
--- projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/AbstractJBossCacheService.java 2010-05-28 22:58:38 UTC (rev 105345)
+++ projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/AbstractJBossCacheService.java 2010-05-28 22:59:28 UTC (rev 105346)
@@ -49,6 +49,8 @@
import org.jboss.cache.transaction.BatchModeTransactionManager;
import org.jboss.ha.framework.server.MarshalledValueHelper;
import org.jboss.ha.framework.server.SimpleCachableMarshalledValue;
+import org.jboss.ha.framework.server.lock.SharedLocalYieldingClusterLockManager;
+import org.jboss.ha.framework.server.lock.TimeoutException;
import org.jboss.logging.Logger;
import org.jboss.util.loading.ContextClassLoaderSwitcher;
import org.jboss.web.tomcat.service.session.distributedcache.spi.BatchingManager;
@@ -92,6 +94,11 @@
return Fqn.fromElements(SESSION, contextHostPath, sessionId);
}
+   /**
+    * Builds a lock name by joining the two arguments with an underscore.
+    *
+    * @param contextHostPath first part of the name
+    * @param sessionId second part of the name
+    * @return <code>contextHostPath + "_" + sessionId</code>
+    */
+   public static String getSessionLockName(String contextHostPath, String sessionId)
+   {
+      StringBuilder lockName = new StringBuilder();
+      lockName.append(contextHostPath).append('_').append(sessionId);
+      return lockName.toString();
+   }
+
@Deprecated
public static Fqn<String> getBuddyBackupSessionFqn(String dataOwner, String contextHostPath, String sessionId)
{
@@ -352,36 +359,62 @@
}
Fqn<String> fqn = getSessionFqn(combinedPath_, realId);
- Map<Object, Object> sessionData = cacheWrapper_.getData(fqn, true);
- if (sessionData == null) {
- // Requested session is no longer in the cache; return null
- return null;
- }
-
- if (initialLoad)
+ boolean ourBatch = false;
+ boolean loadCompleted = false;
+ Map<Object, Object> sessionData = null;
+ try
{
- setupSessionRegion(fqn);
- }
+ // We need batching so any data gravitation replication
+ // is sent in batch.
+ // Don't do anything if there is already a batch
+ // associated with this thread.
+ ourBatch = ensureBatchInProgress();
+
+ sessionData = cacheWrapper_.getData(fqn, true);
- IncomingDistributableSessionData dsd = null;
-
- try
- {
- dsd = getDistributableSessionData(realId, sessionData, true);
+ if (sessionData == null) {
+ // Requested session is no longer in the cache; return null
+ return null;
+ }
+
+ if (initialLoad)
+ {
+ setupSessionRegion(fqn);
+ }
+
+ IncomingDistributableSessionData dsd = null;
+
+ try
+ {
+ dsd = getDistributableSessionData(realId, sessionData, true);
+ }
+ catch (Exception e)
+ {
+ String masked = Util.maskId(realId);
+ log_.warn("Problem accessing session data for session " + masked + " (" +
+ e.getClass().getName() + ") -- existing session cannot be used");
+ log_.debug("Details on problem accessing session data for " + masked, e);
+ // Clean up
+ removeSessionLocal(realId);
+ return null;
+ }
+
+ loadCompleted = true;
+
+ return dsd;
}
catch (Exception e)
{
- String masked = Util.maskId(realId);
- log_.warn("Problem accessing session data for session " + masked + " (" +
- e.getClass().getName() + ") -- existing session cannot be used");
- log_.debug("Details on problem accessing session data for " + masked, e);
- // Clean up
- removeSessionLocal(realId);
- return null;
+ throw handleBatchException(ourBatch, realId, e);
}
-
- return dsd;
+ finally
+ {
+ if (ourBatch)
+ {
+ finishBatch(realId, loadCompleted);
+ }
+ }
}
public void storeSessionData(T sessionData)
@@ -566,26 +599,45 @@
public IncomingDistributableSessionData getSessionData(String realId, String dataOwner, boolean includeAttributes)
{
- Map<Object, Object> distributedCacheData = null;
- if (dataOwner == null)
+ boolean ourBatch = false;
+ boolean loadCompleted = false;
+ try
{
- Fqn<String> fqn = getSessionFqn(combinedPath_, realId);
- distributedCacheData = cacheWrapper_.getData(fqn, false);
- }
- else
- {
- List<Fqn<Object>> fqns = getBuddyBackupSessionFqns(dataOwner, combinedPath_, realId);
- for (Fqn<Object> fqn : fqns)
+ ourBatch = ensureBatchInProgress();
+
+ Map<Object, Object> distributedCacheData = null;
+ if (dataOwner == null)
{
+ Fqn<String> fqn = getSessionFqn(combinedPath_, realId);
distributedCacheData = cacheWrapper_.getData(fqn, false);
- if (distributedCacheData != null && distributedCacheData.size() > 0)
+ }
+ else
+ {
+ List<Fqn<Object>> fqns = getBuddyBackupSessionFqns(dataOwner, combinedPath_, realId);
+ for (Fqn<Object> fqn : fqns)
{
- break;
+ distributedCacheData = cacheWrapper_.getData(fqn, false);
+ if (distributedCacheData != null && distributedCacheData.size() > 0)
+ {
+ break;
+ }
}
}
+ IncomingDistributableSessionData result = distributedCacheData == null ? null : getDistributableSessionData(realId, distributedCacheData, includeAttributes);
+ loadCompleted = true;
+ return result;
}
-
- return distributedCacheData == null ? null : getDistributableSessionData(realId, distributedCacheData, includeAttributes);
+ catch (Exception e)
+ {
+ throw handleBatchException(ourBatch, realId, e);
+ }
+ finally
+ {
+ if (ourBatch)
+ {
+ finishBatch(realId, loadCompleted);
+ }
+ }
}
/**
@@ -714,6 +766,48 @@
*/
protected abstract Map<String, Object> getSessionAttributes(String realId, Map<Object, Object> distributedCacheData);
+ public boolean getSupportsSessionOwnership()
+ {
+ return true;
+ }
+
+   /**
+    * Locks the named session via the lock manager and translates the lock
+    * manager's result into this API's {@code LockResult}.
+    * <p>
+    * Note that {@link #getLockManager()} is currently an unimplemented stub that
+    * throws {@link UnsupportedOperationException}.
+    *
+    * @param realId the session id used to build the lock name
+    * @param newLock passed through unchanged to the lock manager's {@code lock} call
+    * @return the {@code LockResult} constant matching the lock manager's result
+    * @throws TimeoutException declared by the lock call
+    * @throws InterruptedException declared by the lock call
+    * @throws IllegalStateException if the lock manager returns a result this
+    *         method has no mapping for
+    */
+   public LockResult acquireSessionOwnership(String realId, boolean newLock) throws TimeoutException, InterruptedException
+   {
+      // NOTE(review): the lock name is keyed on cacheConfigName_ whereas session
+      // Fqns are keyed on combinedPath_ -- confirm that is intended
+      String lockName = getSessionLockName(this.cacheConfigName_, realId);
+      SharedLocalYieldingClusterLockManager.LockResult result = getLockManager().lock(lockName, getLockTimeout(), newLock);
+      switch (result)
+      {
+         case ACQUIRED_FROM_CLUSTER:
+            return LockResult.ACQUIRED_FROM_CLUSTER;
+         case ALREADY_HELD:
+            return LockResult.ALREADY_HELD;
+         case NEW_LOCK:
+            return LockResult.NEW_LOCK;
+         default:
+            // Fail loudly instead of handing callers a null result
+            throw new IllegalStateException("Unexpected lock result " + result + " for session " + Util.maskId(realId));
+      }
+   }
+
+   /**
+    * Unlocks the lock named after the given session.
+    *
+    * @param realId the session id used to build the lock name
+    * @param remove passed through unchanged to the lock manager's {@code unlock} call
+    */
+   public void relinquishSessionOwnership(String realId, boolean remove)
+   {
+      getLockManager().unlock(getSessionLockName(this.cacheConfigName_, realId), remove);
+   }
+
+ private SharedLocalYieldingClusterLockManager getLockManager()
+ {
+ // FIXME
+ throw new UnsupportedOperationException("implement me");
+ }
+
+ private long getLockTimeout()
+ {
+ // FIXME
+ return 30000;
+ }
+
protected void releaseCacheToManager(String cacheConfigName)
{
try
@@ -825,4 +919,55 @@
}
}
+   /**
+    * Starts a batch unless one is already in progress.
+    *
+    * @return <code>true</code> if this call started a batch, <code>false</code>
+    *         if a batch was already in progress
+    */
+   private boolean ensureBatchInProgress() throws Exception
+   {
+      if (batchingManager.isBatchInProgress())
+      {
+         return false;
+      }
+      batchingManager.startBatch();
+      return true;
+   }
+
+   /**
+    * Marks the current batch rollback-only and converts the given failure into
+    * a RuntimeException for the caller to throw.
+    *
+    * @param ourBatch whether the caller started the batch; currently not consulted
+    * @param realId the session id, masked before it is put in the message
+    * @param e the failure that occurred while loading the session
+    * @return <code>e</code> itself or a RuntimeException wrapping it
+    */
+   private RuntimeException handleBatchException(boolean ourBatch, String realId, Exception e)
+   {
+      // Rollback-only is set no matter who started the batch
+      try
+      {
+         batchingManager.setBatchRollbackOnly();
+      }
+      catch (Exception rollbackFailure)
+      {
+         log_.error("Caught exception rolling back transaction", rollbackFailure);
+      }
+
+      String failureMsg = "Failed to load session " + Util.maskId(realId);
+      return Util.getRuntimeException(failureMsg, e);
+   }
+
+   /**
+    * Ends the current batch after a session load.
+    * <p>
+    * A failure while ending the batch is only logged when the load itself
+    * completed; otherwise it is rethrown as a RuntimeException.
+    *
+    * @param realId the session id, masked before it is logged
+    * @param loadCompleted whether the session data was read successfully
+    */
+   private void finishBatch(String realId, boolean loadCompleted)
+   {
+      try
+      {
+         batchingManager.endBatch();
+      }
+      catch (Exception e)
+      {
+         if (!loadCompleted)
+         {
+            throw Util.getRuntimeException("Failed to load session " + Util.maskId(realId), e);
+         }
+
+         // We read the data successfully but then failed in commit?
+         // That indicates a JBC data gravitation where the replication of
+         // the gravitated data to our buddy failed. We can ignore that
+         // and count on this request updating the cache.
+         log_.warn("Problem ending batch after loading session " + Util.maskId(realId) + " -- " + e.getLocalizedMessage() + "; however, session data was successfully loaded.");
+         log_.debug("Failure cause", e);
+      }
+   }
+
}
Added: projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/SessionOwnershipCache.java
===================================================================
--- projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/SessionOwnershipCache.java (rev 0)
+++ projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/SessionOwnershipCache.java 2010-05-28 22:59:28 UTC (rev 105346)
@@ -0,0 +1,57 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.web.tomcat.service.session.distributedcache.impl.jbc;
+
+import org.jboss.cache.Cache;
+import org.jboss.ha.framework.server.lock.SharedLocalYieldingClusterLockManager;
+
+/**
+ * Simple holder pairing a {@link Cache} with a {@link SharedLocalYieldingClusterLockManager}.
+ *
+ * @author Brian Stansberry
+ *
+ * @version $Revision$
+ */
+public class SessionOwnershipCache
+{
+ private final Cache<?, ?> cache;
+ private final SharedLocalYieldingClusterLockManager lockManager;
+
+ public SessionOwnershipCache(Cache<?, ?> cache, SharedLocalYieldingClusterLockManager lockManager)
+ {
+ this.cache = cache;
+ this.lockManager = lockManager;
+ }
+
+ public Cache<?, ?> getCache()
+ {
+ return cache;
+ }
+
+ public SharedLocalYieldingClusterLockManager getLockManager()
+ {
+ return lockManager;
+ }
+
+
+}
Added: projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/SessionOwnershipCacheManager.java
===================================================================
--- projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/SessionOwnershipCacheManager.java (rev 0)
+++ projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/SessionOwnershipCacheManager.java 2010-05-28 22:59:28 UTC (rev 105346)
@@ -0,0 +1,158 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.web.tomcat.service.session.distributedcache.impl.jbc;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.jboss.cache.Cache;
+import org.jboss.cache.CacheManager;
+import org.jboss.cache.CacheStatus;
+import org.jboss.cache.config.RuntimeConfig;
+import org.jboss.cache.pojo.PojoCache;
+import org.jboss.ha.cachemanager.MuxHandlerChannel;
+import org.jboss.ha.framework.interfaces.GroupMembershipNotifier;
+import org.jboss.ha.framework.interfaces.GroupRpcDispatcher;
+import org.jboss.ha.framework.server.PojoCacheManager;
+import org.jboss.ha.framework.server.lock.SharedLocalYieldingClusterLockManager;
+import org.jgroups.Channel;
+import org.jgroups.ChannelFactory;
+import org.jgroups.blocks.mux.Muxer;
+
+/**
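+ * Creates {@link SessionOwnershipCache} instances and keeps them in a map keyed by cache
+ * configuration name, wrapping a cache's Channel in a {@link MuxHandlerChannel} where needed.
+ *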
+ * @author Brian Stansberry
+ *
+ * @version $Revision$
+ */
+public class SessionOwnershipCacheManager
+{
+ private final CacheManager plainCacheManager;
+ private final PojoCacheManager pojoCacheManager;
+
+ private final Map<String, SessionOwnershipCache> managers = new ConcurrentHashMap<String, SessionOwnershipCache>();
+
+ public SessionOwnershipCacheManager(CacheManager plainCacheManager, PojoCacheManager pojoCacheManager)
+ {
+ this.plainCacheManager = plainCacheManager;
+ this.pojoCacheManager = pojoCacheManager;
+ }
+
+   /**
+    * Returns the SessionOwnershipCache registered for the given configuration
+    * name, creating, configuring and registering one if none exists yet.
+    *
+    * @param configName name of the cache configuration
+    * @return the existing or newly created SessionOwnershipCache
+    */
+   public synchronized SessionOwnershipCache getSessionOwnershipCache(String configName) throws Exception
+   {
+      SessionOwnershipCache existing = managers.get(configName);
+      if (existing != null)
+      {
+         return existing;
+      }
+
+      Cache<?, ?> plainCache = plainCacheManager.getCache(configName, false);
+      SharedLocalYieldingClusterLockManager lockManager = configureCache(null, plainCache, configName);
+      SessionOwnershipCache created = new SessionOwnershipCache(plainCache, lockManager);
+      managers.put(configName, created);
+      return created;
+   }
+
+ public synchronized SessionOwnershipPojoCache getSessionOwnershipPojoCache(String configName) throws Exception
+ {
+ SessionOwnershipCache soc = managers.get(configName);
+ if (!(soc instanceof SessionOwnershipPojoCache))
+ {
+ PojoCache pojoCache = pojoCacheManager.getPojoCache(configName, false);
+ SharedLocalYieldingClusterLockManager lockManager = configureCache(pojoCache, pojoCache.getCache(), configName);
+ soc = new SessionOwnershipPojoCache(pojoCache, lockManager);
+ managers.put(configName, soc);
+ pojoCache.start();
+
+ }
+ return (SessionOwnershipPojoCache) soc;
+ }
+
+ private SharedLocalYieldingClusterLockManager configureCache(PojoCache pojoCache, Cache<?, ?> cache, String configName) throws Exception
+ {
+ RuntimeConfig rc = cache.getConfiguration().getRuntimeConfig();
+ Channel channel = rc.getChannel();
+ if (cache.getCacheStatus() == CacheStatus.STARTED)
+ {
+ if (channel == null)
+ {
+ // TODO create and use a Channel just for locking???
+ throw new IllegalStateException("Cache is started but has no Channel");
+ }
+ }
+ else {
+ if (channel == null)
+ {
+ ChannelFactory cf = rc.getMuxChannelFactory();
+ if (cf == null)
+ {
+ throw new IllegalStateException("Cache configuration " + configName + " does not have a ChannelFactory injected");
+ }
+ String stack = cache.getConfiguration().getMuxStackName();
+ if (stack == null)
+ {
+ throw new IllegalStateException("Cache configuration " + configName + " does not have a MuxStackName configured");
+ }
+ channel = cf.createChannel(stack);
+ rc.setChannel(new MuxHandlerChannel(channel));
+ }
+ else if (channel.getUpHandler() == null)
+ {
+ // replace
+ rc.setChannel(new MuxHandlerChannel(channel));
+ }
+ // else the Channel was injected and already had a handler -- shouldn't happen
+
+ // Start the Cache so it injects its UpHandler into MuxHandlerChannel
+ if (pojoCache != null)
+ {
+ pojoCache.start();
+ }
+ else if (cache != null)
+ {
+ if (cache.getCacheStatus() != CacheStatus.CREATED)
+ {
+ cache.create();
+ }
+ cache.start();
+ }
+ }
+
+ // At this point the Channel should have a Muxer as its UpHandler, either because
+ // it came that way or we used MuxHandlerChannel to set it
+ if (!(channel.getUpHandler() instanceof Muxer<?>))
+ {
+ throw new IllegalStateException("Cache " + configName + " has a Channel that does not use a Muxer as its UpHandler");
+ }
+
+ // TODO create the GroupRpcDispatcher/GroupMembershipNotifier
+ GroupRpcDispatcher dispatcher = createGroupRpcDispatcher();
+ GroupMembershipNotifier notifier = (GroupMembershipNotifier) dispatcher;
+
+ return new SharedLocalYieldingClusterLockManager("HTTPSESSIONOWNER", dispatcher, notifier);
+ }
+
+ private GroupRpcDispatcher createGroupRpcDispatcher()
+ {
+ throw new UnsupportedOperationException("Implement me");
+ }
+}
Added: projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/SessionOwnershipPojoCache.java
===================================================================
--- projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/SessionOwnershipPojoCache.java (rev 0)
+++ projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/SessionOwnershipPojoCache.java 2010-05-28 22:59:28 UTC (rev 105346)
@@ -0,0 +1,49 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.web.tomcat.service.session.distributedcache.impl.jbc;
+
+import org.jboss.cache.pojo.PojoCache;
+import org.jboss.ha.framework.server.lock.SharedLocalYieldingClusterLockManager;
+
+/**
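+ * {@link SessionOwnershipCache} variant that also exposes the {@link PojoCache} whose
+ * underlying cache it holds.
+ *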
+ * @author Brian Stansberry
+ *
+ * @version $Revision$
+ */
+public class SessionOwnershipPojoCache extends SessionOwnershipCache
+{
+ private final PojoCache pojoCache;
+
+ public SessionOwnershipPojoCache(PojoCache cache, SharedLocalYieldingClusterLockManager lockManager)
+ {
+ super(cache.getCache(), lockManager);
+ this.pojoCache = cache;
+ }
+
+ public PojoCache getPojoCache()
+ {
+ return pojoCache;
+ }
+}
Modified: projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/Util.java
===================================================================
--- projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/Util.java 2010-05-28 22:58:38 UTC (rev 105345)
+++ projects/cluster/ha-server-cache-jbc/branches/Session_Ownership/src/main/java/org/jboss/web/tomcat/service/session/distributedcache/impl/jbc/Util.java 2010-05-28 22:59:28 UTC (rev 105346)
@@ -281,6 +281,26 @@
}
/**
+    * Returns either <code>e</code> itself or a wrapper RuntimeException, depending on the type
+    * of <code>e</code>. If <code>e</code> is a RuntimeException, it
+    * is simply returned; otherwise <code>e</code> is wrapped in a new RuntimeException
+    * which is returned. This method never throws the result itself; callers
+    * are expected to <code>throw</code> the returned exception.
+    *
+    * @param wrapperMsg an optional message for the wrapper RuntimeException, if
+    *                   one is needed. If <code>null</code>, the classname of <code>e</code>
+    *                   and its localized message are used
+    * @param e the underlying exception
+    * @return <code>e</code> if it is a RuntimeException, otherwise a new
+    *         RuntimeException whose cause is <code>e</code>
+    */
+   public static RuntimeException getRuntimeException(String wrapperMsg, Exception e)
+   {
+      if (e instanceof RuntimeException)
+      {
+         return (RuntimeException) e;
+      }
+
+      String msg = (wrapperMsg != null) ? wrapperMsg : e.getClass().getName() + " -- " + e.getLocalizedMessage();
+      // Return rather than throw, as the signature and every caller ("throw Util.getRuntimeException(...)") expect
+      return new RuntimeException(msg, e);
+   }
+
+ /**
* Prevent instantiation.
*/
private Util() {}
More information about the jboss-cvs-commits
mailing list