[jboss-cvs] JBossAS SVN: r75270 - in trunk/cluster/src/main/org/jboss/ha/hasessionstate: server and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jul 1 11:26:34 EDT 2008


Author: pferraro
Date: 2008-07-01 11:26:34 -0400 (Tue, 01 Jul 2008)
New Revision: 75270

Modified:
   trunk/cluster/src/main/org/jboss/ha/hasessionstate/interfaces/PackagedSession.java
   trunk/cluster/src/main/org/jboss/ha/hasessionstate/server/HASessionStateImpl.java
   trunk/cluster/src/main/org/jboss/ha/hasessionstate/server/PackagedSessionImpl.java
Log:
[JBAS-5436] Ensure HASessionState can handle concurrent JGroups requests
Refactored PackagedSession mutex into PackagedSession itself, instead of maintaining a parallel map.
Replaced EDU.oswego.cs.dl.util.concurrent.Mutex with java.util.concurrent.locks.ReentrantLock.

Modified: trunk/cluster/src/main/org/jboss/ha/hasessionstate/interfaces/PackagedSession.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/hasessionstate/interfaces/PackagedSession.java	2008-07-01 14:16:20 UTC (rev 75269)
+++ trunk/cluster/src/main/org/jboss/ha/hasessionstate/interfaces/PackagedSession.java	2008-07-01 15:26:34 UTC (rev 75270)
@@ -23,13 +23,14 @@
 
 
 import java.io.Serializable;
+import java.util.concurrent.locks.Lock;
 
 /**
  *   Information about a session that is shared by nodes in the same subpartition
  *
  *   @see HASessionState
  *   @see org.jboss.ha.hasessionstate.server.PackagedSessionImpl
- *   
+ * 
  *   @author sacha.labourey at cogito-info.ch
  *   @version $Revision$
  *
@@ -40,44 +41,49 @@
 {
    /** The serialVersionUID
     * @since 1.2
-    */ 
+    */
    static final long serialVersionUID = 689622988452110553L;
    /*
     * Stored state
     */
-   public byte[] getState ();
-   public boolean setState (byte[] state);
+   public byte[] getState();
+   public boolean setState(byte[] state);
    
    /*
     * Stored state
-    */   
-   public boolean isStateIdentical (byte[] state);
+    */
+   public boolean isStateIdentical(byte[] state);
    
    /*
     * Update the state and content of this PackagedSession from the content of another
     * PackagedSession.
     */
-   public void update (PackagedSession clone);
+   public void update(PackagedSession clone);
    
    /*
     * Owner node of the state
     */
-   public String getOwner ();
-   public void setOwner (String owner);
+   public String getOwner();
+   public void setOwner(String owner);
    
    /*
     * Version number of this state
     */
-   public long getVersion ();
+   public long getVersion();
    
    /*
     * Key identifier associated with this state
     */
-   public Serializable getKey ();
-   public void setKey (Serializable key);
+   public Serializable getKey();
    
    /*
     * Number of miliseconds since when this state has not been modified in this VM
     */
-   public long unmodifiedExistenceInVM ();
+   public long unmodifiedExistenceInVM();
+   
+   /**
+    * Returns the lock used to prevent concurrent calls on this session.
+    * @return a mutex
+    */
+   public Lock getLock();
 }

Modified: trunk/cluster/src/main/org/jboss/ha/hasessionstate/server/HASessionStateImpl.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/hasessionstate/server/HASessionStateImpl.java	2008-07-01 14:16:20 UTC (rev 75269)
+++ trunk/cluster/src/main/org/jboss/ha/hasessionstate/server/HASessionStateImpl.java	2008-07-01 15:26:34 UTC (rev 75270)
@@ -1,24 +1,24 @@
 /*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
 package org.jboss.ha.hasessionstate.server;
 
 import java.io.ByteArrayInputStream;
@@ -28,12 +28,13 @@
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Enumeration;
 import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.Iterator;
 import java.util.Map;
-import java.util.Vector;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.Lock;
 import java.util.zip.Deflater;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.InflaterInputStream;
@@ -50,16 +51,16 @@
 import org.jboss.ha.hasessionstate.interfaces.HASessionState;
 import org.jboss.ha.hasessionstate.interfaces.PackagedSession;
 import org.jboss.logging.Logger;
+import org.jboss.metadata.ClusterConfigMetaData;
 import org.jboss.naming.NonSerializableFactory;
 
-import EDU.oswego.cs.dl.util.concurrent.Mutex;
-
 /**
  *   Default implementation of HASessionState
  *
  *   @see org.jboss.ha.hasessionstate.interfaces.HASessionState
  *   @author sacha.labourey at cogito-info.ch
  *   @author <a href="bill at burkecentral.com">Bill Burke</a>
+ *   @author Paul Ferraro
  *   @version $Revision$
  *
  * <p><b>Revisions:</b><br>
@@ -73,23 +74,24 @@
 public class HASessionStateImpl
    implements HASessionState, HAPartition.HAPartitionStateTransfer
 {
-   private String _sessionStateName;
-   private Logger log;
-   private HAPartition hapGeneral;
-   private String sessionStateIdentifier;
-   private String myNodeName;
-   
-   long beanCleaningDelay;
-   
    private static final long MAX_DELAY_BEFORE_CLEANING_UNRECLAIMED_STATE = 30L * 60L * 1000L; // 30 minutes... should be set externally or use cache settings
    private static final String HA_SESSION_STATE_STATE_TRANSFER = "HASessionStateTransfer";
    
    private static final Class<?>[] SET_OWNERSHIP_TYPES = new Class[] { String.class, Object.class, String.class, Long.class };
-   private static final Class<?>[] REMOVE_SESSION_TYPES = new Class[]{ String.class, Object.class };
+   private static final Class<?>[] REMOVE_SESSION_TYPES = new Class[] { String.class, Object.class };
    private static final Class<?>[] SET_STATE_TYPES = new Class[] { String.class, PackagedSession.class };
    
-   private Map<String, Map<Object, Mutex>> locks = new HashMap<String, Map<Object, Mutex>>();
+   protected Map<String, ConcurrentMap<Object, PackagedSession>> appSessionMap = new HashMap<String, ConcurrentMap<Object, PackagedSession>>();
+   protected ConcurrentMap<String, Set<HASessionStateListener>> appListenerMap = new ConcurrentHashMap<String, Set<HASessionStateListener>>();
+      
+   long beanCleaningDelay;
    
+   private String _sessionStateName;
+   private Logger log;
+   private HAPartition partition;
+   private String sessionStateIdentifier;
+   private String myNodeName;
+   
    public HASessionStateImpl(String sessionStateName, HAPartition partition, long beanCleaningDelay)
    {
       if (partition == null)
@@ -97,40 +99,26 @@
          throw new IllegalArgumentException("HAPartition must not be null when constructing HASessionImpl");
       }
       
-      this.hapGeneral = partition;
+      this.partition = partition;
       
-      if (sessionStateName == null)
-      {
-         this._sessionStateName = org.jboss.metadata.ClusterConfigMetaData.DEFAULT_SESSION_STATE_NAME;
-      }
-      else
-      {
-         this._sessionStateName = sessionStateName;
-      }
+      this._sessionStateName = (sessionStateName != null) ? sessionStateName : ClusterConfigMetaData.DEFAULT_SESSION_STATE_NAME;
       
       this.sessionStateIdentifier = "SessionState-'" + this._sessionStateName + "'";
+
+      this.beanCleaningDelay = (beanCleaningDelay > 0) ? beanCleaningDelay : MAX_DELAY_BEFORE_CLEANING_UNRECLAIMED_STATE;
       
-      if (beanCleaningDelay > 0)
-      {
-         this.beanCleaningDelay = beanCleaningDelay;
-      }
-      else
-      {
-         this.beanCleaningDelay = MAX_DELAY_BEFORE_CLEANING_UNRECLAIMED_STATE;
-      }
+      this.log = Logger.getLogger(HASessionStateImpl.class.getName() + "." + this._sessionStateName);
    }
    
    public void init() throws Exception
    {
-      this.log = Logger.getLogger(HASessionStateImpl.class.getName() + "." + this._sessionStateName);
-      
-      this.hapGeneral.registerRPCHandler(this.sessionStateIdentifier, this);
-      this.hapGeneral.subscribeToStateTransferEvents(HA_SESSION_STATE_STATE_TRANSFER, this);
+      this.partition.registerRPCHandler(this.sessionStateIdentifier, this);
+      this.partition.subscribeToStateTransferEvents(HA_SESSION_STATE_STATE_TRANSFER, this);
    }
    
    public void start() throws Exception
    {
-      this.myNodeName = this.hapGeneral.getNodeName();
+      this.myNodeName = this.partition.getNodeName();
       this.log.debug("HASessionState node name : " + this.myNodeName );
       
       // BES 4/7/06 clean up lifecycle; move this to start, as it can't be
@@ -188,8 +176,8 @@
    public void destroy() throws Exception
    {
       // Remove ref to ourself from HAPartition
-      this.hapGeneral.unregisterRPCHandler(this.sessionStateIdentifier, this);
-      this.hapGeneral.unsubscribeFromStateTransferEvents(HA_SESSION_STATE_STATE_TRANSFER, this);
+      this.partition.unregisterRPCHandler(this.sessionStateIdentifier, this);
+      this.partition.unsubscribeFromStateTransferEvents(HA_SESSION_STATE_STATE_TRANSFER, this);
    }
    
    public String getNodeName()
@@ -203,27 +191,20 @@
    {
       this.log.debug ("Building and returning state of HASessionState");
       
-      if (this.appSessions == null)
+      synchronized (this.appSessionMap)
       {
-         this.appSessions = new Hashtable<String, Hashtable<Object, PackagedSession>>();
-      }
-      
-      Serializable result = null;
-      
-      synchronized (this.lockAppSession)
-      {
          this.purgeState();
          
          try
          {
-            result = this.deflate(this.appSessions);
+            return this.deflate(this.appSessionMap);
          }
-         catch (Exception e)
+         catch (IOException e)
          {
             this.log.error("operation failed", e);
+            return null;
          }
       }
-      return result;
    }
    
    @SuppressWarnings("unchecked")
@@ -231,43 +212,35 @@
    {
       this.log.debug("Receiving state of HASessionState");
       
-      if (this.appSessions == null)
+      try
       {
-         this.appSessions = new Hashtable<String, Hashtable<Object, PackagedSession>>();
-      }
-      
-      synchronized (this.lockAppSession)
-      {
-         try
+         Map<String, ConcurrentMap<Object, PackagedSession>> map = (Map) this.inflate((byte[]) newState);
+         
+         synchronized (this.appSessionMap)
          {
-            this.appSessions.clear(); // hope to facilitate the job of the GC
-            this.appSessions = (Hashtable<String, Hashtable<Object, PackagedSession>>) this.inflate((byte[]) newState);
+            this.appSessionMap.clear();
+            this.appSessionMap.putAll(map);
          }
-         catch (Exception e)
-         {
-            this.log.error("operation failed", e);
-         }
       }
+      catch (IOException e)
+      {
+         this.log.error("operation failed", e);
+      }
    }
    
    public void purgeState()
    {
-      synchronized (this.lockAppSession)
+      synchronized (this.appSessionMap)
       {
-         for (Enumeration<String> keyEnum = this.appSessions.keys(); keyEnum.hasMoreElements();)
+         long currentTime = System.currentTimeMillis();
+         
+         for (Map<Object, PackagedSession> map: this.appSessionMap.values())
          {
-            // trip in apps..
-            //
-            String key = keyEnum.nextElement();
-            Hashtable<Object, PackagedSession> value = this.appSessions.get(key);
-            long currentTime = System.currentTimeMillis();
-            
-            for (Iterator<PackagedSession> iterSessions = value.values().iterator(); iterSessions.hasNext();)
+            for (PackagedSession session: map.values())
             {
-               PackagedSession ps = iterSessions.next();
-               if ((currentTime - ps.unmodifiedExistenceInVM()) > this.beanCleaningDelay)
+               if ((currentTime - session.unmodifiedExistenceInVM()) > this.beanCleaningDelay)
                {
-                  iterSessions.remove();
+                  map.remove(session.getKey());
                }
             }
          }
@@ -306,134 +279,93 @@
          throw new IOException(e.toString());
       }
    }
-   
-   protected Hashtable<String, Hashtable<Object, PackagedSession>> appSessions = new Hashtable<String, Hashtable<Object, PackagedSession>>();
-   protected Object lockAppSession = new Object();
-   
-   protected Hashtable<Object, PackagedSession> getHashtableForApp(String appName)
+
+   protected ConcurrentMap<Object, PackagedSession> getAppMap(String appName)
    {
-      if (this.appSessions == null)
+      synchronized (this.appSessionMap)
       {
-         this.appSessions = new Hashtable<String, Hashtable<Object, PackagedSession>>(); // should never happen though...
-      }
-      
-      Hashtable<Object, PackagedSession> result = null;
-      
-      synchronized (this.lockAppSession)
-      {
-         result = this.appSessions.get(appName);
-         if (result == null)
+         ConcurrentMap<Object, PackagedSession> map = this.appSessionMap.get(appName);
+         
+         if (map == null)
          {
-            result = new Hashtable<Object, PackagedSession>();
-            this.appSessions.put(appName, result);
+            map = new ConcurrentHashMap<Object, PackagedSession>();
+            
+            this.appSessionMap.put(appName, map);
          }
+         
+         return map;
       }
-      return result;
    }
-   
+
    public void createSession(String appName, Object keyId)
    {
       this._createSession(appName, keyId);
    }
    
-   public PackagedSessionImpl _createSession(String appName, Object keyId)
+   public PackagedSession _createSession(String appName, Object keyId)
    {
-      Hashtable<Object, PackagedSession> app = this.getHashtableForApp(appName);
-      PackagedSessionImpl result = new PackagedSessionImpl((Serializable) keyId, null, this.myNodeName);
-      app.put(keyId, result);
-      return result;
+      PackagedSession session = this.createSession(keyId);
+      
+      this.getAppMap(appName).put(keyId, session);
+      
+      return session;
    }
    
+   private PackagedSession createSession(Object keyId)
+   {
+      return new PackagedSessionImpl((Serializable) keyId, null, this.myNodeName);
+   }
+   
    public void setState(String appName, Object keyId, byte[] state)
       throws java.rmi.RemoteException
    {
-      Hashtable<Object, PackagedSession> app = this.getHashtableForApp(appName);
-      PackagedSession ps = app.get(keyId);
+      PackagedSession session = this.createSession(keyId);
+      PackagedSession existing = this.getAppMap(appName).putIfAbsent(keyId, session);
       
-      if (ps == null)
+      if (existing != null)
       {
-         ps = this._createSession(appName, keyId);
+         session = existing;
       }
-            
-      boolean isStateIdentical = false;
       
-      Mutex mtx = this.getLock(appName, keyId);
-      try
+      Lock lock = session.getLock();
+
+      if (!lock.tryLock())
       {
-         if (!mtx.attempt(0))
-         {
-            throw new java.rmi.RemoteException ("Concurent calls on session object.");
-         }
+         throw new java.rmi.RemoteException("Concurent calls on session object.");
       }
-      catch (InterruptedException ie)
-      {
-         this.log.info(ie);
-         return;
-      }
       
       try
       {
-         isStateIdentical = ps.setState(state);
+         boolean isStateIdentical = session.setState(state);
+         
          if (!isStateIdentical)
          {
-            Object[] args = { appName, ps };
+            Object[] args = { appName, session };
             
-            try
-            {
-               this.hapGeneral.callMethodOnCluster(this.sessionStateIdentifier,
-                                                   "_setState",
-                                                   args,
-                                                   SET_STATE_TYPES,
-                                                   true);
-            }
-            catch (Exception e)
-            {
-               this.log.error("operation failed", e);
-            }
+            this.partition.callMethodOnCluster(this.sessionStateIdentifier, "_setState", args, SET_STATE_TYPES, true);
          }
       }
+      catch (Exception e)
+      {
+         this.log.error("operation failed", e);
+      }
       finally
       {
-         mtx.release();
+         lock.unlock();
       }
    }
    
-   /*
-   public void _setStates(String appName, Hashtable packagedSessions)
-   {
-      synchronized (this.lockAppSession)
-      {
-         Hashtable<Object, PackagedSession> app = this.getHashtableForApp(appName);
-         
-         if (app == null)
-         {
-            app = new Hashtable<Object, PackagedSession>(packagedSessions.size());
-            this.appSessions.put(appName, app);
-         }
-         app.putAll(packagedSessions);
-      }
-   }*/
-   
    public void _setState(String appName, PackagedSession session)
    {
-      Hashtable<Object, PackagedSession> app = this.getHashtableForApp(appName);
-      PackagedSession ps = app.get(session.getKey());
+      PackagedSession existing = this.getAppMap(appName).putIfAbsent(session.getKey(), session);
       
-      if (ps == null)
+      if (existing != null)
       {
-         ps = session;
-         synchronized (app)
-         {
-            app.put(ps.getKey(), ps);
-         }
-      }
-      else
-      {
-         Mutex mtx = this.getLock(appName, session.getKey());
+         Lock lock = existing.getLock();
          
          try
          {
-            mtx.acquire();
+            lock.lockInterruptibly();
          }
          catch (InterruptedException ie)
          {
@@ -443,25 +375,25 @@
          
          try
          {
-            if (ps.getOwner().equals(this.myNodeName))
+            if (existing.getOwner().equals(this.myNodeName))
             {
                // a modification has occured externally while we were the owner
                //
-               this.ownedObjectExternallyModified(appName, session.getKey(), ps, session);
+               this.ownedObjectExternallyModified(appName, session.getKey(), existing, session);
             }
-            ps.update(session);
+            
+            existing.update(session);
          }
          finally
          {
-            mtx.release();
+            lock.unlock();
          }
       }
    }
    
    public PackagedSession getState(String appName, Object keyId)
    {
-      Hashtable<Object, PackagedSession> app = this.getHashtableForApp(appName);
-      return app.get(keyId);
+      return this.getAppMap(appName).get(keyId);
    }
    
    public PackagedSession getStateWithOwnership(String appName, Object keyId) throws java.rmi.RemoteException
@@ -471,45 +403,32 @@
    
    public PackagedSession localTakeOwnership(String appName, Object keyId) throws java.rmi.RemoteException
    {
-      Hashtable<Object, PackagedSession> app = this.getHashtableForApp(appName);
-      PackagedSession ps = app.get(keyId);
+      PackagedSession session = this.getAppMap(appName).get(keyId);
       
       // if the session is not yet available, we simply return null. The persistence manager
       // will have to take an action accordingly
       //
-      if (ps == null)
+      if (session == null)
       {
          return null;
       }
       
-      Mutex mtx = this.getLock (appName, keyId);
+      Lock lock = session.getLock();
       
-      try
+      if (!lock.tryLock())
       {
-         if (!mtx.attempt(0))
-         {
-            throw new java.rmi.RemoteException("Concurent calls on session object.");
-         }
+         throw new java.rmi.RemoteException("Concurent calls on session object.");
       }
-      catch (InterruptedException ie)
-      {
-         this.log.info(ie);
-         return null;
-      }
       
       try
       {
-         if (!ps.getOwner().equals(this.myNodeName))
+         if (!session.getOwner().equals(this.myNodeName))
          {
-            Object[] args = { appName, keyId, this.myNodeName, new Long(ps.getVersion()) };
+            Object[] args = { appName, keyId, this.myNodeName, new Long(session.getVersion()) };
             ArrayList<?> answers = null;
             try
             {
-               answers = this.hapGeneral.callMethodOnCluster(this.sessionStateIdentifier,
-                                                             "_setOwnership",
-                                                             args,
-                                                             SET_OWNERSHIP_TYPES,
-                                                             true);
+               answers = this.partition.callMethodOnCluster(this.sessionStateIdentifier, "_setOwnership", args, SET_OWNERSHIP_TYPES, true);
             }
             catch (Exception e)
             {
@@ -521,72 +440,61 @@
                throw new java.rmi.RemoteException("Concurent calls on session object.");
             }
 
-            ps.setOwner(this.myNodeName);
-            return ps;
+            session.setOwner(this.myNodeName);
+            return session;
          }
 
-         return ps;
+         return session;
       }
       finally
       {
-         mtx.release();
+         lock.unlock();
       }
    }
    
    public Boolean _setOwnership(String appName, Object keyId, String newOwner, Long remoteVersion)
    {
-      Hashtable<Object, PackagedSession> app = this.getHashtableForApp(appName);
-      PackagedSession ps = app.get(keyId);
-      Boolean answer = Boolean.TRUE;
-      Mutex mtx = this.getLock(appName, keyId);
+      PackagedSession session = this.getAppMap(appName).get(keyId);
       
-      try
+      Lock lock = session.getLock();
+      
+      if (!lock.tryLock())
       {
-         if (!mtx.attempt(0))
-         {
-            return Boolean.FALSE;
-         }
-      }
-      catch (InterruptedException ie)
-      {
-         this.log.info(ie);
          return Boolean.FALSE;
       }
 
       try
       {
-         if (!ps.getOwner().equals(this.myNodeName))
+         if (!session.getOwner().equals(this.myNodeName))
          {
             // this is not our business... we don't care
             // we do not update the owner of ps as another host may refuse the _setOwnership call
             // anyway, the update will be sent to us later if state is modified
             //
-            //ps.setOwner (newOwner);
-            answer = Boolean.TRUE;
+            return Boolean.TRUE;
          }
-         else if (ps.getVersion() > remoteVersion.longValue())
+         else if (session.getVersion() > remoteVersion.longValue())
          {
             // we are concerned and our version is more recent than the one of the remote host!
             // it means that we have concurrent calls on the same state that has not yet been updated
             // this means we will need to raise a java.rmi.RemoteException
             //
-            answer = Boolean.FALSE;
+            return Boolean.FALSE;
          }
-         else
-         {
-            // the remote host has the same version as us (or more recent? possible?)
-            // we need to update the ownership. We can do this because we know that no other
-            // node can refuse the _setOwnership call
-            ps.setOwner(newOwner);
-            this.ownedObjectExternallyModified(appName, keyId, ps, ps);
-            answer = Boolean.TRUE;
-         }
+
+         // the remote host has the same version as us (or more recent? possible?)
+         // we need to update the ownership. We can do this because we know that no other
+         // node can refuse the _setOwnership call
+         session.setOwner(newOwner);
+
+         this.ownedObjectExternallyModified(appName, keyId, session, session);
+         
+         return Boolean.TRUE;
       }
       finally
       {
-         mtx.release();
+         lock.unlock();
       }
-      return answer;
    }
    
    public void takeOwnership(String appName, Object keyId) throws java.rmi.RemoteException
@@ -596,81 +504,65 @@
    
    public void removeSession(String appName, Object keyId)
    {
-      Hashtable<Object, PackagedSession> app = this.getHashtableForApp(appName);
-      if (app != null)
+      if (this.getAppMap(appName).remove(keyId) != null)
       {
-         PackagedSession ps = app.remove(keyId);
-         if (ps != null)
+         Object[] args = { appName, keyId };
+         
+         try
          {
-            this.removeLock(appName, keyId);
-            Object[] args = { appName, keyId };
-            try
-            {
-               this.hapGeneral.callMethodOnCluster (this.sessionStateIdentifier,
-                                                    "_removeSession",
-                                                    args,
-                                                    REMOVE_SESSION_TYPES,
-                                                    true);
-            }
-            catch (Exception e)
-            {
-               this.log.error("operation failed", e);
-            }
+            this.partition.callMethodOnCluster(this.sessionStateIdentifier, "_removeSession", args, REMOVE_SESSION_TYPES, true);
          }
+         catch (Exception e)
+         {
+            this.log.error("operation failed", e);
+         }
       }
    }
    
    public void _removeSession(String appName, Object keyId)
    {
-      Hashtable<Object, PackagedSession> app = this.getHashtableForApp(appName);
-      PackagedSession ps = app.remove(keyId);
-      if (ps != null && ps.getOwner().equals(this.myNodeName))
+      PackagedSession session = this.getAppMap(appName).remove(keyId);
+      
+      if ((session != null) && session.getOwner().equals(this.myNodeName))
       {
-         this.ownedObjectExternallyModified(appName, keyId, ps, ps);
+         this.ownedObjectExternallyModified(appName, keyId, session, session);
       }
-      
-      this.removeLock(appName, keyId);
    }
    
-   protected Hashtable<String, Vector<HASessionStateListener>> listeners = new Hashtable<String, Vector<HASessionStateListener>>();
-   
-   public synchronized void subscribe(String appName, HASessionStateListener listener)
+   public void subscribe(String appName, HASessionStateListener listener)
    {
-      Vector<HASessionStateListener> members = this.listeners.get(appName);
-      if (members == null)
-      {
-         members = new Vector<HASessionStateListener>();
-         this.listeners.put(appName, members);
-      }
-      if (!members.contains(listener))
-      {
-         members.add(listener);
-      }
+      Set<HASessionStateListener> set = new CopyOnWriteArraySet<HASessionStateListener>();
+      
+      Set<HASessionStateListener> existing = this.appListenerMap.putIfAbsent(appName, set);
+      
+      ((existing != null) ? existing : set).add(listener);
    }
    
-   public synchronized void unsubscribe(String appName, HASessionStateListener listener)
+   public void unsubscribe(String appName, HASessionStateListener listener)
    {
-      Vector<HASessionStateListener> members = this.listeners.get(appName);
-      if ((members != null) && members.contains(listener))
+      Set<HASessionStateListener> listeners = this.appListenerMap.get(appName);
+      
+      if (listeners != null)
       {
-         members.remove(listener);
+         listeners.remove(listener);
       }
    }
    
    public void ownedObjectExternallyModified(String appName, Object key, PackagedSession oldSession, PackagedSession newSession)
    {
-      Vector<HASessionStateListener> members = this.listeners.get(appName);
-      if (members != null)
+      Set<HASessionStateListener> listeners = this.appListenerMap.get(appName);
+      
+      if (listeners != null)
       {
-         for (int i = 0; i < members.size(); i++)
+         for (HASessionStateListener listener: listeners)
          {
             try
             {
-               members.elementAt(i).sessionExternallyModified(newSession);
+               listener.sessionExternallyModified(newSession);
             }
-            catch (Throwable t)
+            catch (Throwable e)
             {
-               this.log.debug(t);
+               this.log.debug(e);
             }
          }
       }
@@ -678,56 +570,6 @@
    
    public HAPartition getCurrentHAPartition()
    {
-      return this.hapGeneral;
+      return this.partition;
    }
-   
-   
-   protected boolean lockExists(String appName, Object key)
-   {
-      synchronized (this.locks)
-      {
-         Map<Object, Mutex> ls = this.locks.get(appName);
-         if (ls == null)
-         {
-            return false;
-         }
-         
-         return (ls.get(key) != null);
-      }
-   }
-
-   protected Mutex getLock(String appName, Object key)
-   {
-      synchronized (this.locks)
-      {
-         Map<Object, Mutex> ls = this.locks.get(appName);
-         if (ls == null)
-         {
-            ls = new HashMap<Object, Mutex>();
-            this.locks.put(appName, ls);
-         }
-          
-         Mutex mutex = ls.get(key);
-         if (mutex == null)
-         {
-            mutex = new Mutex();
-            ls.put(key, mutex);
-         }
-         
-         return mutex;
-      }
-   }
-
-   protected void removeLock(String appName, Object key)
-   {
-      synchronized (this.locks)
-      {
-         Map<Object, Mutex> ls = this.locks.get(appName);
-         if (ls == null)
-         {
-            return;
-         }
-         ls.remove(key);
-      }
-   }   
 }

Modified: trunk/cluster/src/main/org/jboss/ha/hasessionstate/server/PackagedSessionImpl.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/hasessionstate/server/PackagedSessionImpl.java	2008-07-01 14:16:20 UTC (rev 75269)
+++ trunk/cluster/src/main/org/jboss/ha/hasessionstate/server/PackagedSessionImpl.java	2008-07-01 15:26:34 UTC (rev 75270)
@@ -25,6 +25,8 @@
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  *   Default implementation of PackagedSession
@@ -41,79 +43,89 @@
     */ 
    private static final long serialVersionUID = 4162160242862877223L;
 
-   protected byte[] state;
-   protected long versionId;
-   protected String owner;
-   protected Serializable key;
-   protected transient long lastModificationTimeInVM;
+   private Serializable key;
    
-   public PackagedSessionImpl () 
-   {
-      this.lastModificationTimeInVM = System.currentTimeMillis ();
-   }
+   private volatile byte[] state;
+   private volatile long versionId;
+   private volatile String owner;
    
-   public PackagedSessionImpl (Serializable key, byte[] state, String owner)
+   private transient volatile long lastModificationTimeInVM;
+   private transient Lock lock;
+   
+   public PackagedSessionImpl(Serializable key, byte[] state, String owner)
    {
       this.key = key;
-      this.setState (state);
+      this.setState(state);
       this.owner = owner;
-      this.lastModificationTimeInVM = System.currentTimeMillis ();
+      this.lock = new ReentrantLock();
    }
    
-   public byte[] getState ()
+   public byte[] getState()
    {
       return this.state;
    }
    
-   public boolean setState (byte[] state)
+   public boolean setState(byte[] state)
    {
-      this.lastModificationTimeInVM = System.currentTimeMillis ();
-      if (isStateIdentical (state))
-         return true;
-      else
-      {
-         this.state = state;
-         this.versionId++;
-         return false;
-      }
+      this.lastModificationTimeInVM = System.currentTimeMillis();
+      if (isStateIdentical(state)) return true;
+
+      this.state = state;
+      this.versionId++;
+      return false;
    }
    
-   public boolean isStateIdentical (byte[] state)
+   public boolean isStateIdentical(byte[] state)
    {
-      return java.util.Arrays.equals (state, this.state);
+      return java.util.Arrays.equals(state, this.state);
    }
    
-   public void update (PackagedSession clone)
+   public void update(PackagedSession clone)
    {
-      this.state = (byte[])clone.getState().clone();
-      this.versionId = clone.getVersion ();
-      this.owner = clone.getOwner ();    
+      this.state = clone.getState().clone();
+      this.versionId = clone.getVersion();
+      this.owner = clone.getOwner();    
       this.lastModificationTimeInVM = System.currentTimeMillis();
    }
    
-   public String getOwner ()
-   { return this.owner; }
-   public void setOwner (String owner)
-   { this.owner = owner; }
+   public String getOwner()
+   {
+      return this.owner;
+   }
    
-   public long getVersion ()
-   { return this.versionId; }
+   public void setOwner(String owner)
+   {
+      this.owner = owner;
+   }
    
-   public Serializable getKey ()
-   { return this.key; }
-   public void setKey (Serializable key)
-   { this.key = key; }
+   public long getVersion()
+   {
+      return this.versionId;
+   }
+   
+   public Serializable getKey()
+   {
+      return this.key;
+   }
 
-   public long unmodifiedExistenceInVM ()
+   public long unmodifiedExistenceInVM()
    {
       return this.lastModificationTimeInVM;
    }
+
+   /**
+    * @see org.jboss.ha.hasessionstate.interfaces.PackagedSession#getLock()
+    */
+   public Lock getLock()
+   {
+      return this.lock;
+   }
    
    // JBAS-3545 -- have to set the mod time after deserializing
-   private void readObject(java.io.ObjectInputStream in)
-         throws IOException, ClassNotFoundException
+   private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException
    {
       in.defaultReadObject();
       this.lastModificationTimeInVM = System.currentTimeMillis();
+      this.lock = new ReentrantLock();
    }
 }




More information about the jboss-cvs-commits mailing list