[jboss-cvs] JBossAS SVN: r75270 - in trunk/cluster/src/main/org/jboss/ha/hasessionstate: server and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jul 1 11:26:34 EDT 2008
Author: pferraro
Date: 2008-07-01 11:26:34 -0400 (Tue, 01 Jul 2008)
New Revision: 75270
Modified:
trunk/cluster/src/main/org/jboss/ha/hasessionstate/interfaces/PackagedSession.java
trunk/cluster/src/main/org/jboss/ha/hasessionstate/server/HASessionStateImpl.java
trunk/cluster/src/main/org/jboss/ha/hasessionstate/server/PackagedSessionImpl.java
Log:
[JBAS-5436] Ensure HASessionState can handle concurrent JGroups requests
Refactored PackagedSession mutex into PackagedSession itself, instead of maintaining a parallel map.
Replaced EDU.oswego.cs.dl.util.concurrent.Mutex with java.util.concurrent.locks.ReentrantLock.
Modified: trunk/cluster/src/main/org/jboss/ha/hasessionstate/interfaces/PackagedSession.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/hasessionstate/interfaces/PackagedSession.java 2008-07-01 14:16:20 UTC (rev 75269)
+++ trunk/cluster/src/main/org/jboss/ha/hasessionstate/interfaces/PackagedSession.java 2008-07-01 15:26:34 UTC (rev 75270)
@@ -23,13 +23,14 @@
import java.io.Serializable;
+import java.util.concurrent.locks.Lock;
/**
* Information about a session that is shared by nodes in the same subpartition
*
* @see HASessionState
* @see org.jboss.ha.hasessionstate.server.PackagedSessionImpl
- *
+ *
* @author sacha.labourey at cogito-info.ch
* @version $Revision$
*
@@ -40,44 +41,49 @@
{
/** The serialVersionUID
* @since 1.2
- */
+ */
static final long serialVersionUID = 689622988452110553L;
/*
* Stored state
*/
- public byte[] getState ();
- public boolean setState (byte[] state);
+ public byte[] getState();
+ public boolean setState(byte[] state);
/*
* Stored state
- */
- public boolean isStateIdentical (byte[] state);
+ */
+ public boolean isStateIdentical(byte[] state);
/*
* Update the state and content of this PackagedSession from the content of another
* PackagedSession.
*/
- public void update (PackagedSession clone);
+ public void update(PackagedSession clone);
/*
* Owner node of the state
*/
- public String getOwner ();
- public void setOwner (String owner);
+ public String getOwner();
+ public void setOwner(String owner);
/*
* Version number of this state
*/
- public long getVersion ();
+ public long getVersion();
/*
* Key identifier associated with this state
*/
- public Serializable getKey ();
- public void setKey (Serializable key);
+ public Serializable getKey();
/*
* Number of miliseconds since when this state has not been modified in this VM
*/
- public long unmodifiedExistenceInVM ();
+ public long unmodifiedExistenceInVM();
+
+ /**
+ * Returns the lock used to prevent concurrent calls on this session.
+ * @return a mutex
+ */
+ public Lock getLock();
}
Modified: trunk/cluster/src/main/org/jboss/ha/hasessionstate/server/HASessionStateImpl.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/hasessionstate/server/HASessionStateImpl.java 2008-07-01 14:16:20 UTC (rev 75269)
+++ trunk/cluster/src/main/org/jboss/ha/hasessionstate/server/HASessionStateImpl.java 2008-07-01 15:26:34 UTC (rev 75270)
@@ -1,24 +1,24 @@
/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
package org.jboss.ha.hasessionstate.server;
import java.io.ByteArrayInputStream;
@@ -28,12 +28,13 @@
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Enumeration;
import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.Iterator;
import java.util.Map;
-import java.util.Vector;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.Lock;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
@@ -50,16 +51,16 @@
import org.jboss.ha.hasessionstate.interfaces.HASessionState;
import org.jboss.ha.hasessionstate.interfaces.PackagedSession;
import org.jboss.logging.Logger;
+import org.jboss.metadata.ClusterConfigMetaData;
import org.jboss.naming.NonSerializableFactory;
-import EDU.oswego.cs.dl.util.concurrent.Mutex;
-
/**
* Default implementation of HASessionState
*
* @see org.jboss.ha.hasessionstate.interfaces.HASessionState
* @author sacha.labourey at cogito-info.ch
* @author <a href="bill at burkecentral.com">Bill Burke</a>
+ * @author Paul Ferraro
* @version $Revision$
*
* <p><b>Revisions:</b><br>
@@ -73,23 +74,24 @@
public class HASessionStateImpl
implements HASessionState, HAPartition.HAPartitionStateTransfer
{
- private String _sessionStateName;
- private Logger log;
- private HAPartition hapGeneral;
- private String sessionStateIdentifier;
- private String myNodeName;
-
- long beanCleaningDelay;
-
private static final long MAX_DELAY_BEFORE_CLEANING_UNRECLAIMED_STATE = 30L * 60L * 1000L; // 30 minutes... should be set externally or use cache settings
private static final String HA_SESSION_STATE_STATE_TRANSFER = "HASessionStateTransfer";
private static final Class<?>[] SET_OWNERSHIP_TYPES = new Class[] { String.class, Object.class, String.class, Long.class };
- private static final Class<?>[] REMOVE_SESSION_TYPES = new Class[]{ String.class, Object.class };
+ private static final Class<?>[] REMOVE_SESSION_TYPES = new Class[] { String.class, Object.class };
private static final Class<?>[] SET_STATE_TYPES = new Class[] { String.class, PackagedSession.class };
- private Map<String, Map<Object, Mutex>> locks = new HashMap<String, Map<Object, Mutex>>();
+ protected Map<String, ConcurrentMap<Object, PackagedSession>> appSessionMap = new HashMap<String, ConcurrentMap<Object, PackagedSession>>();
+ protected ConcurrentMap<String, Set<HASessionStateListener>> appListenerMap = new ConcurrentHashMap<String, Set<HASessionStateListener>>();
+
+ long beanCleaningDelay;
+ private String _sessionStateName;
+ private Logger log;
+ private HAPartition partition;
+ private String sessionStateIdentifier;
+ private String myNodeName;
+
public HASessionStateImpl(String sessionStateName, HAPartition partition, long beanCleaningDelay)
{
if (partition == null)
@@ -97,40 +99,26 @@
throw new IllegalArgumentException("HAPartition must not be null when constructing HASessionImpl");
}
- this.hapGeneral = partition;
+ this.partition = partition;
- if (sessionStateName == null)
- {
- this._sessionStateName = org.jboss.metadata.ClusterConfigMetaData.DEFAULT_SESSION_STATE_NAME;
- }
- else
- {
- this._sessionStateName = sessionStateName;
- }
+ this._sessionStateName = (sessionStateName != null) ? sessionStateName : ClusterConfigMetaData.DEFAULT_SESSION_STATE_NAME;
this.sessionStateIdentifier = "SessionState-'" + this._sessionStateName + "'";
+
+ this.beanCleaningDelay = (beanCleaningDelay > 0) ? beanCleaningDelay : MAX_DELAY_BEFORE_CLEANING_UNRECLAIMED_STATE;
- if (beanCleaningDelay > 0)
- {
- this.beanCleaningDelay = beanCleaningDelay;
- }
- else
- {
- this.beanCleaningDelay = MAX_DELAY_BEFORE_CLEANING_UNRECLAIMED_STATE;
- }
+ this.log = Logger.getLogger(HASessionStateImpl.class.getName() + "." + this._sessionStateName);
}
public void init() throws Exception
{
- this.log = Logger.getLogger(HASessionStateImpl.class.getName() + "." + this._sessionStateName);
-
- this.hapGeneral.registerRPCHandler(this.sessionStateIdentifier, this);
- this.hapGeneral.subscribeToStateTransferEvents(HA_SESSION_STATE_STATE_TRANSFER, this);
+ this.partition.registerRPCHandler(this.sessionStateIdentifier, this);
+ this.partition.subscribeToStateTransferEvents(HA_SESSION_STATE_STATE_TRANSFER, this);
}
public void start() throws Exception
{
- this.myNodeName = this.hapGeneral.getNodeName();
+ this.myNodeName = this.partition.getNodeName();
this.log.debug("HASessionState node name : " + this.myNodeName );
// BES 4/7/06 clean up lifecycle; move this to start, as it can't be
@@ -188,8 +176,8 @@
public void destroy() throws Exception
{
// Remove ref to ourself from HAPartition
- this.hapGeneral.unregisterRPCHandler(this.sessionStateIdentifier, this);
- this.hapGeneral.unsubscribeFromStateTransferEvents(HA_SESSION_STATE_STATE_TRANSFER, this);
+ this.partition.unregisterRPCHandler(this.sessionStateIdentifier, this);
+ this.partition.unsubscribeFromStateTransferEvents(HA_SESSION_STATE_STATE_TRANSFER, this);
}
public String getNodeName()
@@ -203,27 +191,20 @@
{
this.log.debug ("Building and returning state of HASessionState");
- if (this.appSessions == null)
+ synchronized (this.appSessionMap)
{
- this.appSessions = new Hashtable<String, Hashtable<Object, PackagedSession>>();
- }
-
- Serializable result = null;
-
- synchronized (this.lockAppSession)
- {
this.purgeState();
try
{
- result = this.deflate(this.appSessions);
+ return this.deflate(this.appSessionMap);
}
- catch (Exception e)
+ catch (IOException e)
{
this.log.error("operation failed", e);
+ return null;
}
}
- return result;
}
@SuppressWarnings("unchecked")
@@ -231,43 +212,35 @@
{
this.log.debug("Receiving state of HASessionState");
- if (this.appSessions == null)
+ try
{
- this.appSessions = new Hashtable<String, Hashtable<Object, PackagedSession>>();
- }
-
- synchronized (this.lockAppSession)
- {
- try
+ Map<String, ConcurrentMap<Object, PackagedSession>> map = (Map) this.inflate((byte[]) newState);
+
+ synchronized (this.appSessionMap)
{
- this.appSessions.clear(); // hope to facilitate the job of the GC
- this.appSessions = (Hashtable<String, Hashtable<Object, PackagedSession>>) this.inflate((byte[]) newState);
+ this.appSessionMap.clear();
+ this.appSessionMap.putAll(map);
}
- catch (Exception e)
- {
- this.log.error("operation failed", e);
- }
}
+ catch (IOException e)
+ {
+ this.log.error("operation failed", e);
+ }
}
public void purgeState()
{
- synchronized (this.lockAppSession)
+ synchronized (this.appSessionMap)
{
- for (Enumeration<String> keyEnum = this.appSessions.keys(); keyEnum.hasMoreElements();)
+ long currentTime = System.currentTimeMillis();
+
+ for (Map<Object, PackagedSession> map: this.appSessionMap.values())
{
- // trip in apps..
- //
- String key = keyEnum.nextElement();
- Hashtable<Object, PackagedSession> value = this.appSessions.get(key);
- long currentTime = System.currentTimeMillis();
-
- for (Iterator<PackagedSession> iterSessions = value.values().iterator(); iterSessions.hasNext();)
+ for (PackagedSession session: map.values())
{
- PackagedSession ps = iterSessions.next();
- if ((currentTime - ps.unmodifiedExistenceInVM()) > this.beanCleaningDelay)
+ if ((currentTime - session.unmodifiedExistenceInVM()) > this.beanCleaningDelay)
{
- iterSessions.remove();
+ map.remove(session.getKey());
}
}
}
@@ -306,134 +279,93 @@
throw new IOException(e.toString());
}
}
-
- protected Hashtable<String, Hashtable<Object, PackagedSession>> appSessions = new Hashtable<String, Hashtable<Object, PackagedSession>>();
- protected Object lockAppSession = new Object();
-
- protected Hashtable<Object, PackagedSession> getHashtableForApp(String appName)
+
+ protected ConcurrentMap<Object, PackagedSession> getAppMap(String appName)
{
- if (this.appSessions == null)
+ synchronized (this.appSessionMap)
{
- this.appSessions = new Hashtable<String, Hashtable<Object, PackagedSession>>(); // should never happen though...
- }
-
- Hashtable<Object, PackagedSession> result = null;
-
- synchronized (this.lockAppSession)
- {
- result = this.appSessions.get(appName);
- if (result == null)
+ ConcurrentMap<Object, PackagedSession> map = this.appSessionMap.get(appName);
+
+ if (map == null)
{
- result = new Hashtable<Object, PackagedSession>();
- this.appSessions.put(appName, result);
+ map = new ConcurrentHashMap<Object, PackagedSession>();
+
+ this.appSessionMap.put(appName, map);
}
+
+ return map;
}
- return result;
}
-
+
public void createSession(String appName, Object keyId)
{
this._createSession(appName, keyId);
}
- public PackagedSessionImpl _createSession(String appName, Object keyId)
+ public PackagedSession _createSession(String appName, Object keyId)
{
- Hashtable<Object, PackagedSession> app = this.getHashtableForApp(appName);
- PackagedSessionImpl result = new PackagedSessionImpl((Serializable) keyId, null, this.myNodeName);
- app.put(keyId, result);
- return result;
+ PackagedSession session = this.createSession(keyId);
+
+ this.getAppMap(appName).put(keyId, session);
+
+ return session;
}
+ private PackagedSession createSession(Object keyId)
+ {
+ return new PackagedSessionImpl((Serializable) keyId, null, this.myNodeName);
+ }
+
public void setState(String appName, Object keyId, byte[] state)
throws java.rmi.RemoteException
{
- Hashtable<Object, PackagedSession> app = this.getHashtableForApp(appName);
- PackagedSession ps = app.get(keyId);
+ PackagedSession session = this.createSession(keyId);
+ PackagedSession existing = this.getAppMap(appName).putIfAbsent(keyId, session);
- if (ps == null)
+ if (existing != null)
{
- ps = this._createSession(appName, keyId);
+ session = existing;
}
-
- boolean isStateIdentical = false;
- Mutex mtx = this.getLock(appName, keyId);
- try
+ Lock lock = session.getLock();
+
+ if (!lock.tryLock())
{
- if (!mtx.attempt(0))
- {
- throw new java.rmi.RemoteException ("Concurent calls on session object.");
- }
+ throw new java.rmi.RemoteException("Concurent calls on session object.");
}
- catch (InterruptedException ie)
- {
- this.log.info(ie);
- return;
- }
try
{
- isStateIdentical = ps.setState(state);
+ boolean isStateIdentical = session.setState(state);
+
if (!isStateIdentical)
{
- Object[] args = { appName, ps };
+ Object[] args = { appName, session };
- try
- {
- this.hapGeneral.callMethodOnCluster(this.sessionStateIdentifier,
- "_setState",
- args,
- SET_STATE_TYPES,
- true);
- }
- catch (Exception e)
- {
- this.log.error("operation failed", e);
- }
+ this.partition.callMethodOnCluster(this.sessionStateIdentifier, "_setState", args, SET_STATE_TYPES, true);
}
}
+ catch (Exception e)
+ {
+ this.log.error("operation failed", e);
+ }
finally
{
- mtx.release();
+ lock.unlock();
}
}
- /*
- public void _setStates(String appName, Hashtable packagedSessions)
- {
- synchronized (this.lockAppSession)
- {
- Hashtable<Object, PackagedSession> app = this.getHashtableForApp(appName);
-
- if (app == null)
- {
- app = new Hashtable<Object, PackagedSession>(packagedSessions.size());
- this.appSessions.put(appName, app);
- }
- app.putAll(packagedSessions);
- }
- }*/
-
public void _setState(String appName, PackagedSession session)
{
- Hashtable<Object, PackagedSession> app = this.getHashtableForApp(appName);
- PackagedSession ps = app.get(session.getKey());
+ PackagedSession existing = this.getAppMap(appName).putIfAbsent(session.getKey(), session);
- if (ps == null)
+ if (existing != null)
{
- ps = session;
- synchronized (app)
- {
- app.put(ps.getKey(), ps);
- }
- }
- else
- {
- Mutex mtx = this.getLock(appName, session.getKey());
+ Lock lock = existing.getLock();
try
{
- mtx.acquire();
+ lock.lockInterruptibly();
}
catch (InterruptedException ie)
{
@@ -443,25 +375,25 @@
try
{
- if (ps.getOwner().equals(this.myNodeName))
+ if (existing.getOwner().equals(this.myNodeName))
{
// a modification has occured externally while we were the owner
//
- this.ownedObjectExternallyModified(appName, session.getKey(), ps, session);
+ this.ownedObjectExternallyModified(appName, session.getKey(), existing, session);
}
- ps.update(session);
+
+ existing.update(session);
}
finally
{
- mtx.release();
+ lock.unlock();
}
}
}
public PackagedSession getState(String appName, Object keyId)
{
- Hashtable<Object, PackagedSession> app = this.getHashtableForApp(appName);
- return app.get(keyId);
+ return this.getAppMap(appName).get(keyId);
}
public PackagedSession getStateWithOwnership(String appName, Object keyId) throws java.rmi.RemoteException
@@ -471,45 +403,32 @@
public PackagedSession localTakeOwnership(String appName, Object keyId) throws java.rmi.RemoteException
{
- Hashtable<Object, PackagedSession> app = this.getHashtableForApp(appName);
- PackagedSession ps = app.get(keyId);
+ PackagedSession session = this.getAppMap(appName).get(keyId);
// if the session is not yet available, we simply return null. The persistence manager
// will have to take an action accordingly
//
- if (ps == null)
+ if (session == null)
{
return null;
}
- Mutex mtx = this.getLock (appName, keyId);
+ Lock lock = session.getLock();
- try
+ if (!lock.tryLock())
{
- if (!mtx.attempt(0))
- {
- throw new java.rmi.RemoteException("Concurent calls on session object.");
- }
+ throw new java.rmi.RemoteException("Concurent calls on session object.");
}
- catch (InterruptedException ie)
- {
- this.log.info(ie);
- return null;
- }
try
{
- if (!ps.getOwner().equals(this.myNodeName))
+ if (!session.getOwner().equals(this.myNodeName))
{
- Object[] args = { appName, keyId, this.myNodeName, new Long(ps.getVersion()) };
+ Object[] args = { appName, keyId, this.myNodeName, new Long(session.getVersion()) };
ArrayList<?> answers = null;
try
{
- answers = this.hapGeneral.callMethodOnCluster(this.sessionStateIdentifier,
- "_setOwnership",
- args,
- SET_OWNERSHIP_TYPES,
- true);
+ answers = this.partition.callMethodOnCluster(this.sessionStateIdentifier, "_setOwnership", args, SET_OWNERSHIP_TYPES, true);
}
catch (Exception e)
{
@@ -521,72 +440,61 @@
throw new java.rmi.RemoteException("Concurent calls on session object.");
}
- ps.setOwner(this.myNodeName);
- return ps;
+ session.setOwner(this.myNodeName);
+ return session;
}
- return ps;
+ return session;
}
finally
{
- mtx.release();
+ lock.unlock();
}
}
public Boolean _setOwnership(String appName, Object keyId, String newOwner, Long remoteVersion)
{
- Hashtable<Object, PackagedSession> app = this.getHashtableForApp(appName);
- PackagedSession ps = app.get(keyId);
- Boolean answer = Boolean.TRUE;
- Mutex mtx = this.getLock(appName, keyId);
+ PackagedSession session = this.getAppMap(appName).get(keyId);
- try
+ Lock lock = session.getLock();
+
+ if (!lock.tryLock())
{
- if (!mtx.attempt(0))
- {
- return Boolean.FALSE;
- }
- }
- catch (InterruptedException ie)
- {
- this.log.info(ie);
return Boolean.FALSE;
}
try
{
- if (!ps.getOwner().equals(this.myNodeName))
+ if (!session.getOwner().equals(this.myNodeName))
{
// this is not our business... we don't care
// we do not update the owner of ps as another host may refuse the _setOwnership call
// anyway, the update will be sent to us later if state is modified
//
- //ps.setOwner (newOwner);
- answer = Boolean.TRUE;
+ return Boolean.TRUE;
}
- else if (ps.getVersion() > remoteVersion.longValue())
+ else if (session.getVersion() > remoteVersion.longValue())
{
// we are concerned and our version is more recent than the one of the remote host!
// it means that we have concurrent calls on the same state that has not yet been updated
// this means we will need to raise a java.rmi.RemoteException
//
- answer = Boolean.FALSE;
+ return Boolean.FALSE;
}
- else
- {
- // the remote host has the same version as us (or more recent? possible?)
- // we need to update the ownership. We can do this because we know that no other
- // node can refuse the _setOwnership call
- ps.setOwner(newOwner);
- this.ownedObjectExternallyModified(appName, keyId, ps, ps);
- answer = Boolean.TRUE;
- }
+
+ // the remote host has the same version as us (or more recent? possible?)
+ // we need to update the ownership. We can do this because we know that no other
+ // node can refuse the _setOwnership call
+ session.setOwner(newOwner);
+
+ this.ownedObjectExternallyModified(appName, keyId, session, session);
+
+ return Boolean.TRUE;
}
finally
{
- mtx.release();
+ lock.unlock();
}
- return answer;
}
public void takeOwnership(String appName, Object keyId) throws java.rmi.RemoteException
@@ -596,81 +504,65 @@
public void removeSession(String appName, Object keyId)
{
- Hashtable<Object, PackagedSession> app = this.getHashtableForApp(appName);
- if (app != null)
+ if (this.getAppMap(appName).remove(keyId) != null)
{
- PackagedSession ps = app.remove(keyId);
- if (ps != null)
+ Object[] args = { appName, keyId };
+
+ try
{
- this.removeLock(appName, keyId);
- Object[] args = { appName, keyId };
- try
- {
- this.hapGeneral.callMethodOnCluster (this.sessionStateIdentifier,
- "_removeSession",
- args,
- REMOVE_SESSION_TYPES,
- true);
- }
- catch (Exception e)
- {
- this.log.error("operation failed", e);
- }
+ this.partition.callMethodOnCluster(this.sessionStateIdentifier, "_removeSession", args, REMOVE_SESSION_TYPES, true);
}
+ catch (Exception e)
+ {
+ this.log.error("operation failed", e);
+ }
}
}
public void _removeSession(String appName, Object keyId)
{
- Hashtable<Object, PackagedSession> app = this.getHashtableForApp(appName);
- PackagedSession ps = app.remove(keyId);
- if (ps != null && ps.getOwner().equals(this.myNodeName))
+ PackagedSession session = this.getAppMap(appName).remove(keyId);
+
+ if ((session != null) && session.getOwner().equals(this.myNodeName))
{
- this.ownedObjectExternallyModified(appName, keyId, ps, ps);
+ this.ownedObjectExternallyModified(appName, keyId, session, session);
}
-
- this.removeLock(appName, keyId);
}
- protected Hashtable<String, Vector<HASessionStateListener>> listeners = new Hashtable<String, Vector<HASessionStateListener>>();
-
- public synchronized void subscribe(String appName, HASessionStateListener listener)
+ public void subscribe(String appName, HASessionStateListener listener)
{
- Vector<HASessionStateListener> members = this.listeners.get(appName);
- if (members == null)
- {
- members = new Vector<HASessionStateListener>();
- this.listeners.put(appName, members);
- }
- if (!members.contains(listener))
- {
- members.add(listener);
- }
+ Set<HASessionStateListener> set = new CopyOnWriteArraySet<HASessionStateListener>();
+
+ Set<HASessionStateListener> existing = this.appListenerMap.putIfAbsent(appName, set);
+
+ ((existing != null) ? existing : set).add(listener);
}
- public synchronized void unsubscribe(String appName, HASessionStateListener listener)
+ public void unsubscribe(String appName, HASessionStateListener listener)
{
- Vector<HASessionStateListener> members = this.listeners.get(appName);
- if ((members != null) && members.contains(listener))
+ Set<HASessionStateListener> listeners = this.appListenerMap.get(appName);
+
+ if (listeners != null)
{
- members.remove(listener);
+ listeners.remove(listener);
}
}
public void ownedObjectExternallyModified(String appName, Object key, PackagedSession oldSession, PackagedSession newSession)
{
- Vector<HASessionStateListener> members = this.listeners.get(appName);
- if (members != null)
+ Set<HASessionStateListener> listeners = this.appListenerMap.get(appName);
+
+ if (listeners != null)
{
- for (int i = 0; i < members.size(); i++)
+ for (HASessionStateListener listener: listeners)
{
try
{
- members.elementAt(i).sessionExternallyModified(newSession);
+ listener.sessionExternallyModified(newSession);
}
- catch (Throwable t)
+ catch (Throwable e)
{
- this.log.debug(t);
+ this.log.debug(e);
}
}
}
@@ -678,56 +570,6 @@
public HAPartition getCurrentHAPartition()
{
- return this.hapGeneral;
+ return this.partition;
}
-
-
- protected boolean lockExists(String appName, Object key)
- {
- synchronized (this.locks)
- {
- Map<Object, Mutex> ls = this.locks.get(appName);
- if (ls == null)
- {
- return false;
- }
-
- return (ls.get(key) != null);
- }
- }
-
- protected Mutex getLock(String appName, Object key)
- {
- synchronized (this.locks)
- {
- Map<Object, Mutex> ls = this.locks.get(appName);
- if (ls == null)
- {
- ls = new HashMap<Object, Mutex>();
- this.locks.put(appName, ls);
- }
-
- Mutex mutex = ls.get(key);
- if (mutex == null)
- {
- mutex = new Mutex();
- ls.put(key, mutex);
- }
-
- return mutex;
- }
- }
-
- protected void removeLock(String appName, Object key)
- {
- synchronized (this.locks)
- {
- Map<Object, Mutex> ls = this.locks.get(appName);
- if (ls == null)
- {
- return;
- }
- ls.remove(key);
- }
- }
}
Modified: trunk/cluster/src/main/org/jboss/ha/hasessionstate/server/PackagedSessionImpl.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/hasessionstate/server/PackagedSessionImpl.java 2008-07-01 14:16:20 UTC (rev 75269)
+++ trunk/cluster/src/main/org/jboss/ha/hasessionstate/server/PackagedSessionImpl.java 2008-07-01 15:26:34 UTC (rev 75270)
@@ -25,6 +25,8 @@
import java.io.IOException;
import java.io.Serializable;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
/**
* Default implementation of PackagedSession
@@ -41,79 +43,89 @@
*/
private static final long serialVersionUID = 4162160242862877223L;
- protected byte[] state;
- protected long versionId;
- protected String owner;
- protected Serializable key;
- protected transient long lastModificationTimeInVM;
+ private Serializable key;
- public PackagedSessionImpl ()
- {
- this.lastModificationTimeInVM = System.currentTimeMillis ();
- }
+ private volatile byte[] state;
+ private volatile long versionId;
+ private volatile String owner;
- public PackagedSessionImpl (Serializable key, byte[] state, String owner)
+ private transient volatile long lastModificationTimeInVM;
+ private transient Lock lock;
+
+ public PackagedSessionImpl(Serializable key, byte[] state, String owner)
{
this.key = key;
- this.setState (state);
+ this.setState(state);
this.owner = owner;
- this.lastModificationTimeInVM = System.currentTimeMillis ();
+ this.lock = new ReentrantLock();
}
- public byte[] getState ()
+ public byte[] getState()
{
return this.state;
}
- public boolean setState (byte[] state)
+ public boolean setState(byte[] state)
{
- this.lastModificationTimeInVM = System.currentTimeMillis ();
- if (isStateIdentical (state))
- return true;
- else
- {
- this.state = state;
- this.versionId++;
- return false;
- }
+ this.lastModificationTimeInVM = System.currentTimeMillis();
+ if (isStateIdentical(state)) return true;
+
+ this.state = state;
+ this.versionId++;
+ return false;
}
- public boolean isStateIdentical (byte[] state)
+ public boolean isStateIdentical(byte[] state)
{
- return java.util.Arrays.equals (state, this.state);
+ return java.util.Arrays.equals(state, this.state);
}
- public void update (PackagedSession clone)
+ public void update(PackagedSession clone)
{
- this.state = (byte[])clone.getState().clone();
- this.versionId = clone.getVersion ();
- this.owner = clone.getOwner ();
+ this.state = clone.getState().clone();
+ this.versionId = clone.getVersion();
+ this.owner = clone.getOwner();
this.lastModificationTimeInVM = System.currentTimeMillis();
}
- public String getOwner ()
- { return this.owner; }
- public void setOwner (String owner)
- { this.owner = owner; }
+ public String getOwner()
+ {
+ return this.owner;
+ }
- public long getVersion ()
- { return this.versionId; }
+ public void setOwner(String owner)
+ {
+ this.owner = owner;
+ }
- public Serializable getKey ()
- { return this.key; }
- public void setKey (Serializable key)
- { this.key = key; }
+ public long getVersion()
+ {
+ return this.versionId;
+ }
+
+ public Serializable getKey()
+ {
+ return this.key;
+ }
- public long unmodifiedExistenceInVM ()
+ public long unmodifiedExistenceInVM()
{
return this.lastModificationTimeInVM;
}
+
+ /**
+ * @see org.jboss.ha.hasessionstate.interfaces.PackagedSession#getLock()
+ */
+ public Lock getLock()
+ {
+ return this.lock;
+ }
// JBAS-3545 -- have to set the mod time after deserializing
- private void readObject(java.io.ObjectInputStream in)
- throws IOException, ClassNotFoundException
+ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException
{
in.defaultReadObject();
this.lastModificationTimeInVM = System.currentTimeMillis();
+ this.lock = new ReentrantLock();
}
}
More information about the jboss-cvs-commits
mailing list